RabbitMQ订阅模式

Publish/Subscribe模式

  1. 一个生产者,多个消费者。

  2. 每一个消费者拥有自己的队列。

  3. 生产者没有将消息放到队列,而是exchange。

  4. 每个消费者的队列都要绑定到exchange上。

  5. 生产者发送消息,经过exchange到达队列,实现订阅模式。

图例:

生产者

package com.zyx.queue.ps;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zyx.queue.RabbitMQUtils;

public class Send {
	private static final String EXCHANGE_NAME = "test_exchange";
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		//1.获取连接
		Connection connection = RabbitMQUtils.getConnection();
		//2.创建频道
		Channel channel = connection.createChannel();
		//3.声明队列
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//分发模式
		//4.创建要发送的消息
		String msg = new String("hello rabbitmq");
		//5.发送消息
		channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
		System.out.println("Producer----send:" + msg);
		channel.close();
		connection.close();
	}
}

消费者01

package com.zyx.queue.ps;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.zyx.queue.RabbitMQUtils;

public class Receive001 {
	private static final String EXCHANGE_NAME = "test_exchange";
	private static final String QUEUE_NAME = "test_exchange_queue001";
	public static void main(String[] args) throws IOException, TimeoutException {
		//1.获取连接
		Connection connection = RabbitMQUtils.getConnection();
		//2.创建频道
		Channel channel = connection.createChannel();
		//3.声明队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//3.2绑定exchange
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
		//4.创建消费者
		DefaultConsumer consumer = new DefaultConsumer(channel) {
			//事件模型
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msg = new String(body,"utf-8");
				System.out.println("Consumer[001]----receive:" + msg);
			}
		};
		//5.监听消息
		channel.basicConsume(QUEUE_NAME, true,consumer);
	}
}

消费者02

package com.zyx.queue.ps;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.zyx.queue.RabbitMQUtils;

public class Receive002 {
	private static final String EXCHANGE_NAME = "test_exchange";
	private static final String QUEUE_NAME = "test_exchange_queue002";
	public static void main(String[] args) throws IOException, TimeoutException {
		//1.获取连接
		Connection connection = RabbitMQUtils.getConnection();
		//2.创建频道
		Channel channel = connection.createChannel();
		//3.声明队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//3.2绑定exchange
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
		//4.创建消费者
		DefaultConsumer consumer = new DefaultConsumer(channel) {
			//事件模型
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msg = new String(body,"utf-8");
				System.out.println("Consumer[002]----receive:" + msg);
			}
		};
		//5.监听消息
		channel.basicConsume(QUEUE_NAME, true,consumer);
	}
}

Exchange(routingKey模式)

一方面接收生产者的消息,一方面向队列推送消息

不设置的话是匿名转发,设置为fanout就不处理routingKey。

当设置为direct时,就可以通过设置routingKey来进行路由转发,其实就是分类发送。

图例:

代码演示

生产者
package com.zyx.queue.routing;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zyx.queue.RabbitMQUtils;

public class Send {
	private static final String EXCHANGE_NAME = "test_exchange";
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		//1.获取连接
		Connection connection = RabbitMQUtils.getConnection();
		//2.创建频道
		Channel channel = connection.createChannel();
		//3.声明队列
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");//路由模式
		//4.创建要发送的消息
		String msg = new String("hello rabbitmq");
		//5.发送消息+设置路由
		String routingKey = "blue";
		channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
		System.out.println("Producer----send:" + msg);
		channel.close();
		connection.close();
	}
}
消费者1
package com.zyx.queue.routing;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.zyx.queue.RabbitMQUtils;

public class Receive001 {
	private static final String EXCHANGE_NAME = "test_exchange";
	private static final String QUEUE_NAME = "test_exchange_queue001";
	public static void main(String[] args) throws IOException, TimeoutException {
		//1.获取连接
		Connection connection = RabbitMQUtils.getConnection();
		//2.创建频道
		Channel channel = connection.createChannel();
		//3.声明队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//3.2绑定exchange+routingKey
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "red");
		//4.创建消费者
		DefaultConsumer consumer = new DefaultConsumer(channel) {
			//事件模型
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msg = new String(body,"utf-8");
				System.out.println("Consumer[001]----receive:" + msg);
			}
		};
		//5.监听消息
		channel.basicConsume(QUEUE_NAME, true,consumer);
	}
}

消费者2
package com.zyx.queue.routing;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.zyx.queue.RabbitMQUtils;

public class Receive002 {
	private static final String EXCHANGE_NAME = "test_exchange";
	private static final String QUEUE_NAME = "test_exchange_queue002";
	public static void main(String[] args) throws IOException, TimeoutException {
		//1.获取连接
		Connection connection = RabbitMQUtils.getConnection();
		//2.创建频道
		Channel channel = connection.createChannel();
		//3.声明队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//3.2绑定exchange+routingKey
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "red");
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "blue");
		//4.创建消费者
		DefaultConsumer consumer = new DefaultConsumer(channel) {
			//事件模型
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msg = new String(body,"utf-8");
				System.out.println("Consumer[002]----receive:" + msg);
			}
		};
		//5.监听消息
		channel.basicConsume(QUEUE_NAME, true,consumer);
	}
}
测试

当生产者将routingKey设置为blue时,只有消费者2能够接收到消息,设置为red时都可接收。

topic模式

将routingKey和某种模式匹配

’#’ 匹配零个或者多个单词

‘*’ 匹配一个单词

图例:

提示:如果消费者的routingKey为’person.#‘,或者’person.add’,那么生产者的routingKey为’person’时都不匹配,将会丢弃消息。

代码演示Topic

生产者
package com.zyx.queue.topic;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zyx.queue.RabbitMQUtils;

public class Send {
	private static final String EXCHANGE_NAME = "test_exchange_topic";
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		//1.获取连接
		Connection connection = RabbitMQUtils.getConnection();
		//2.创建频道
		Channel channel = connection.createChannel();
		//3.声明队列
		channel.exchangeDeclare(EXCHANGE_NAME, "topic");//主题模式
		//4.创建要发送的消息
		String msg = new String("hello rabbitmq");
		//5.发送消息+设置路由
		String routingKey = "person.update";
		channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
		System.out.println("Producer----send:" + msg);
		channel.close();
		connection.close();
	}
}
消费者1
package com.zyx.queue.topic;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.zyx.queue.RabbitMQUtils;

public class Receive001 {
	private static final String EXCHANGE_NAME = "test_exchange_topic";
	private static final String QUEUE_NAME = "test_exchange_queue_topic001";
	public static void main(String[] args) throws IOException, TimeoutException {
		//1.获取连接
		Connection connection = RabbitMQUtils.getConnection();
		//2.创建频道
		Channel channel = connection.createChannel();
		//3.声明队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//3.2绑定exchange+routingKey
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "person.add");
		//4.创建消费者
		DefaultConsumer consumer = new DefaultConsumer(channel) {
			//事件模型
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msg = new String(body,"utf-8");
				System.out.println("Consumer[001]----receive:" + msg);
			}
		};
		//5.监听消息
		channel.basicConsume(QUEUE_NAME, true,consumer);
	}
}
消费者2
package com.zyx.queue.topic;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.zyx.queue.RabbitMQUtils;

public class Receive002 {
	private static final String EXCHANGE_NAME = "test_exchange_topic";
	private static final String QUEUE_NAME = "test_exchange_queue_topic002";
	public static void main(String[] args) throws IOException, TimeoutException {
		//1.获取连接
		Connection connection = RabbitMQUtils.getConnection();
		//2.创建频道
		Channel channel = connection.createChannel();
		//3.声明队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//3.2绑定exchange+routingKey
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "person.#");
		//4.创建消费者
		DefaultConsumer consumer = new DefaultConsumer(channel) {
			//事件模型
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msg = new String(body,"utf-8");
				System.out.println("Consumer[002]----receive:" + msg);
			}
		};
		//5.监听消息
		channel.basicConsume(QUEUE_NAME, true,consumer);
	}
}

测试

当生产者将routingKey设置为person.update时,只有消费者2能够接收到消息,设置为person.add时都可接收。