RabbitMQ订阅模式
RabbitMQ订阅模式
Publish/Subscribe模式
一个生产者,多个消费者。
每一个消费者拥有自己的队列。
生产者没有将消息放到队列,而是exchange。
每个消费者的队列都要绑定到exchange上。
生产者发送消息,经过exchange到达队列,实现订阅模式。
图例:
生产者
package com.zyx.queue.ps;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zyx.queue.RabbitMQUtils;
public class Send {
private static final String EXCHANGE_NAME = "test_exchange";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1.获取连接
Connection connection = RabbitMQUtils.getConnection();
//2.创建频道
Channel channel = connection.createChannel();
//3.声明队列
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//分发模式
//4.创建要发送的消息
String msg = new String("hello rabbitmq");
//5.发送消息
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
System.out.println("Producer----send:" + msg);
channel.close();
connection.close();
}
}
消费者01
package com.zyx.queue.ps;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.zyx.queue.RabbitMQUtils;
public class Receive001 {
private static final String EXCHANGE_NAME = "test_exchange";
private static final String QUEUE_NAME = "test_exchange_queue001";
public static void main(String[] args) throws IOException, TimeoutException {
//1.获取连接
Connection connection = RabbitMQUtils.getConnection();
//2.创建频道
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//3.2绑定exchange
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
//4.创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
//事件模型
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body,"utf-8");
System.out.println("Consumer[001]----receive:" + msg);
}
};
//5.监听消息
channel.basicConsume(QUEUE_NAME, true,consumer);
}
}
消费者02
package com.zyx.queue.ps;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.zyx.queue.RabbitMQUtils;
public class Receive002 {
private static final String EXCHANGE_NAME = "test_exchange";
private static final String QUEUE_NAME = "test_exchange_queue002";
public static void main(String[] args) throws IOException, TimeoutException {
//1.获取连接
Connection connection = RabbitMQUtils.getConnection();
//2.创建频道
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//3.2绑定exchange
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
//4.创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
//事件模型
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body,"utf-8");
System.out.println("Consumer[002]----receive:" + msg);
}
};
//5.监听消息
channel.basicConsume(QUEUE_NAME, true,consumer);
}
}
Exchange(routingKey模式)
一方面接收生产者的消息,一方面向队列推送消息
不设置的话是匿名转发,设置为fanout就不处理routingKey。
当设置为direct时,就可以通过设置routingKey来进行路由转发,其实就是分类发送。
图例:
代码演示
生产者
package com.zyx.queue.routing;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zyx.queue.RabbitMQUtils;
public class Send {
private static final String EXCHANGE_NAME = "test_exchange";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1.获取连接
Connection connection = RabbitMQUtils.getConnection();
//2.创建频道
Channel channel = connection.createChannel();
//3.声明队列
channel.exchangeDeclare(EXCHANGE_NAME, "direct");//路由模式
//4.创建要发送的消息
String msg = new String("hello rabbitmq");
//5.发送消息+设置路由
String routingKey = "blue";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
System.out.println("Producer----send:" + msg);
channel.close();
connection.close();
}
}
消费者1
package com.zyx.queue.routing;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.zyx.queue.RabbitMQUtils;
public class Receive001 {
private static final String EXCHANGE_NAME = "test_exchange";
private static final String QUEUE_NAME = "test_exchange_queue001";
public static void main(String[] args) throws IOException, TimeoutException {
//1.获取连接
Connection connection = RabbitMQUtils.getConnection();
//2.创建频道
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//3.2绑定exchange+routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "red");
//4.创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
//事件模型
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body,"utf-8");
System.out.println("Consumer[001]----receive:" + msg);
}
};
//5.监听消息
channel.basicConsume(QUEUE_NAME, true,consumer);
}
}
消费者2
package com.zyx.queue.routing;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.zyx.queue.RabbitMQUtils;
public class Receive002 {
private static final String EXCHANGE_NAME = "test_exchange";
private static final String QUEUE_NAME = "test_exchange_queue002";
public static void main(String[] args) throws IOException, TimeoutException {
//1.获取连接
Connection connection = RabbitMQUtils.getConnection();
//2.创建频道
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//3.2绑定exchange+routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "red");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "blue");
//4.创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
//事件模型
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body,"utf-8");
System.out.println("Consumer[002]----receive:" + msg);
}
};
//5.监听消息
channel.basicConsume(QUEUE_NAME, true,consumer);
}
}
测试
当生产者将routingKey设置为blue时,只有消费者2能够接收到消息,设置为red时都可接收。
topic模式
将routingKey和某种模式匹配
’#’ 匹配零个或者多个单词
‘*’ 匹配一个单词
图例:
提示:如果消费者的routingKey为’person.#‘,或者’person.add’,那么生产者的routingKey为’person’时都不匹配,将会丢弃消息。
代码演示Topic
生产者
package com.zyx.queue.topic;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zyx.queue.RabbitMQUtils;
public class Send {
private static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1.获取连接
Connection connection = RabbitMQUtils.getConnection();
//2.创建频道
Channel channel = connection.createChannel();
//3.声明队列
channel.exchangeDeclare(EXCHANGE_NAME, "topic");//主题模式
//4.创建要发送的消息
String msg = new String("hello rabbitmq");
//5.发送消息+设置路由
String routingKey = "person.update";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
System.out.println("Producer----send:" + msg);
channel.close();
connection.close();
}
}
消费者1
package com.zyx.queue.topic;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.zyx.queue.RabbitMQUtils;
public class Receive001 {
private static final String EXCHANGE_NAME = "test_exchange_topic";
private static final String QUEUE_NAME = "test_exchange_queue_topic001";
public static void main(String[] args) throws IOException, TimeoutException {
//1.获取连接
Connection connection = RabbitMQUtils.getConnection();
//2.创建频道
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//3.2绑定exchange+routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "person.add");
//4.创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
//事件模型
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body,"utf-8");
System.out.println("Consumer[001]----receive:" + msg);
}
};
//5.监听消息
channel.basicConsume(QUEUE_NAME, true,consumer);
}
}
消费者2
package com.zyx.queue.topic;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.zyx.queue.RabbitMQUtils;
public class Receive002 {
private static final String EXCHANGE_NAME = "test_exchange_topic";
private static final String QUEUE_NAME = "test_exchange_queue_topic002";
public static void main(String[] args) throws IOException, TimeoutException {
//1.获取连接
Connection connection = RabbitMQUtils.getConnection();
//2.创建频道
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//3.2绑定exchange+routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "person.#");
//4.创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
//事件模型
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body,"utf-8");
System.out.println("Consumer[002]----receive:" + msg);
}
};
//5.监听消息
channel.basicConsume(QUEUE_NAME, true,consumer);
}
}
测试
当生产者将routingKey设置为person.update时,只有消费者2能够接收到消息,设置为person.add时都可接收。