RabbitMQ 队列学习
RabbitMQ 队列学习
准备工作
如果没有安装RabbitMQ,请参考我写的另一篇文章《RabbitMQ安装(mac版)》。安装完成后启动
rabbitmq-server
然后在浏览器输入http://localhost:15672/,使用guest/guest登录。点击Admin,添加一个用户:name=admin,
password=123456,Tags设置为administrator;再新建一个虚拟主机(virtual host)/test_host,并为admin用户授予该虚拟主机的访问权限(否则后面的代码连接时会被拒绝)。
代码演示01(简单队列)
- 创建一个maven项目,修改pom文件如下
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven build descriptor for the RabbitMQ queue demos (artifact com.zyx:queue). -->
<modelVersion>4.0.0</modelVersion>
<groupId>com.zyx</groupId>
<artifactId>queue</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<!-- RabbitMQ Java client used by every demo class. -->
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
<!-- SLF4J logging facade. -->
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<!-- Log4j 1.x logging backend. -->
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- SLF4J-to-Log4j binding. -->
<!-- NOTE(review): this binding is test-scoped, so it is only on the test classpath; -->
<!-- the demo classes are started through main(), so confirm whether the scope should be removed. -->
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<!-- JUnit 4, available to test code only. -->
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
创建一个rabbitmq连接,代码如下
package com.zyx.queue;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Helper that opens connections to the local RabbitMQ broker used by the demos.
 */
public class RabbitMQUtils {

    /** Broker address and AMQP port. */
    private static final String BROKER_HOST = "127.0.0.1";
    private static final int BROKER_PORT = 5672;

    /** Virtual host and account the demos connect with. */
    private static final String VIRTUAL_HOST = "/test_host";
    private static final String USERNAME = "admin";
    private static final String PASSWORD = "123456";

    /**
     * Opens a new connection to the broker with the fixed demo settings.
     *
     * @return a freshly opened connection; the caller is responsible for closing it
     * @throws IOException if the connection cannot be established
     * @throws TimeoutException if establishing the connection times out
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(BROKER_HOST);
        factory.setPort(BROKER_PORT);
        factory.setVirtualHost(VIRTUAL_HOST);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        return factory.newConnection();
    }
}
- 创建生产者
package com.zyx.queue; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { //1.获取连接 Connection connection = RabbitMQUtils.getConnection(); //2.创建频道 Channel channel = connection.createChannel(); //3.声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4.创建要发送的消息 String msg = new String("hello rabbitmq"); //5.发送消息 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("Producer----send:" + msg); channel.close(); connection.close(); } }
创建消费者
package com.zyx.queue; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; public class Consumer { private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { //1.获取连接 Connection connection = RabbitMQUtils.getConnection(); //2.创建频道 Channel channel = connection.createChannel(); //3.声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4.创建消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { //事件模型 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("Consumer----receive:" + msg); } }; //5.监听消息 channel.basicConsume(QUEUE_NAME, true,consumer); } }
- 测试
//启动生产者发送消息 //启动消费者接收消息 //再次运行生产者发送消息,观察结果。
#### 代码演示02(work队列)
生产者
package com.zyx.queue.work; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.zyx.queue.RabbitMQUtils; public class Send { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //1.获取连接 Connection connection = RabbitMQUtils.getConnection(); //2.创建频道 Channel channel = connection.createChannel(); //3.声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 1; i <= 50; i++) { //4.创建要发送的消息 String msg = new String("hello rabbitmq" + i); //5.发送消息 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("Producer----send:" + msg); Thread.sleep(100); } channel.close(); connection.close(); } }
- 消费者1
package com.zyx.queue.work; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; import com.zyx.queue.RabbitMQUtils; public class Receive001 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { //1.获取连接 Connection connection = RabbitMQUtils.getConnection(); //2.创建频道 Channel channel = connection.createChannel(); //3.声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4.创建消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { //事件模型 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("Consumer[001]----receive:" + msg); try { Thread.sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }; //5.监听消息 channel.basicConsume(QUEUE_NAME, true,consumer); } }
消费者2
package com.zyx.queue.work; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; import com.zyx.queue.RabbitMQUtils; public class Receive002 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { //1.获取连接 Connection connection = RabbitMQUtils.getConnection(); //2.创建频道 Channel channel = connection.createChannel(); //3.声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4.创建消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { //事件模型 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("Consumer[002]----receive:" + msg); try { Thread.sleep(10); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }; //5.监听消息 channel.basicConsume(QUEUE_NAME, true,consumer); } }
- 测试
//启动生产者发送消息 //启动两个消费者接收消息 //采用轮询方式进行消息接收
RabbitMQ 常用命令
启动监控管理器:rabbitmq-plugins enable rabbitmq_management
关闭监控管理器:rabbitmq-plugins disable rabbitmq_management
启动rabbitmq:rabbitmq-service start(Windows服务方式;mac/Linux下使用 rabbitmq-server -detached)
关闭rabbitmq:rabbitmq-service stop(Windows服务方式;mac/Linux下使用 rabbitmqctl stop)
查看所有的队列:rabbitmqctl list_queues
清除所有的队列:rabbitmqctl reset(需先执行 rabbitmqctl stop_app;注意该命令会把节点恢复到初始状态,除队列外,用户、虚拟主机、权限等配置也会一并清除)
关闭应用:rabbitmqctl stop_app
启动应用:rabbitmqctl start_app
用户和权限设置(后面会用到)
添加用户:rabbitmqctl add_user username password
分配角色:rabbitmqctl set_user_tags username administrator
新增虚拟主机:rabbitmqctl add_vhost vhost_name
将新虚拟主机授权给新用户:rabbitmqctl set_permissions -p vhost_name username '.*' '.*' '.*'
角色说明
none 最小权限角色
management 普通管理者(可登录管理控制台,仅能查看和管理自己有权限的虚拟主机相关信息)
policymaker 决策者
monitoring 监控
administrator 超级管理员