RabbitMQ 队列学习

准备工作

如果没有安装RabbitMQ,请参考我写的另一篇文章《RabbitMQ安装(mac版)》。安装完成后启动

rabbitmq-server

然后在浏览器中打开 http://localhost:15672/,使用 guest/guest 登录。点击Admin,添加一个用户:name=admin,

password=123456,角色(Tags)为administrator;同时新建一个虚拟主机(virtual host)/test_host,并把该虚拟主机的访问权限授予admin用户(否则后面的代码无法连接)。

#### 代码演示01(简单队列)

  1. 创建一个maven项目,修改pom文件如下
   <project xmlns="http://maven.apache.org/POM/4.0.0"
   	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   	<modelVersion>4.0.0</modelVersion>
   	<groupId>com.zyx</groupId>
   	<artifactId>queue</artifactId>
   	<version>0.0.1-SNAPSHOT</version>
   	<dependencies>
   		<!-- RabbitMQ Java client used by every demo class. -->
   		<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
   		<dependency>
   			<groupId>com.rabbitmq</groupId>
   			<artifactId>amqp-client</artifactId>
   			<version>5.6.0</version>
   		</dependency>
   		<!-- Logging facade. -->
   		<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
   		<dependency>
   			<groupId>org.slf4j</groupId>
   			<artifactId>slf4j-api</artifactId>
   			<version>1.7.25</version>
   		</dependency>
   		<!-- Logging backend that the SLF4J binding below delegates to. -->
   		<!-- https://mvnrepository.com/artifact/log4j/log4j -->
   		<dependency>
   			<groupId>log4j</groupId>
   			<artifactId>log4j</artifactId>
   			<version>1.2.17</version>
   		</dependency>
   		<!-- SLF4J-to-log4j binding. It is deliberately NOT scoped "test": the demos are
   		     started through main(), and a test-scoped binding is absent from the runtime
   		     classpath, which leaves SLF4J without a logger implementation. -->
   		<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
   		<dependency>
   			<groupId>org.slf4j</groupId>
   			<artifactId>slf4j-log4j12</artifactId>
   			<version>1.7.25</version>
   		</dependency>
   		<!-- Unit-test framework, only needed on the test classpath. -->
   		<!-- https://mvnrepository.com/artifact/junit/junit -->
   		<dependency>
   			<groupId>junit</groupId>
   			<artifactId>junit</artifactId>
   			<version>4.12</version>
   			<scope>test</scope>
   		</dependency>
   	</dependencies>
   </project>
  1. 创建一个rabbitmq连接,代码如下

      package com.zyx.queue;
          
      import java.io.IOException;
      import java.util.concurrent.TimeoutException;
          
      import com.rabbitmq.client.Connection;
      import com.rabbitmq.client.ConnectionFactory;
          
      /**
       * Static helper that opens connections to the RabbitMQ broker used by the demos.
       *
       * <p>All connection settings are fixed constants; they must match the user and
       * virtual host created in the broker beforehand.
       */
      public class RabbitMQUtils {
        /** Broker host name or address. */
        private static final String HOST = "127.0.0.1";
        /** Broker port the client connects to. */
        private static final int PORT = 5672;
        /** Virtual host the demos work in. */
        private static final String VIRTUAL_HOST = "/test_host";
        /** Demo account user name. */
        private static final String USERNAME = "admin";
        /** Demo account password (tutorial value only; do not reuse outside local testing). */
        private static final String PASSWORD = "123456";

        /** Not instantiable: the class only exposes static helpers. */
        private RabbitMQUtils() {
        }

        /**
         * Creates a new connection using the fixed demo settings.
         *
         * @return a newly created connection; the caller is responsible for closing it
         * @throws IOException if {@code newConnection()} reports an I/O problem
         * @throws TimeoutException if {@code newConnection()} times out
         */
        public static Connection getConnection() throws IOException, TimeoutException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(HOST);
            connectionFactory.setPort(PORT);
            connectionFactory.setVirtualHost(VIRTUAL_HOST);
            connectionFactory.setUsername(USERNAME);
            connectionFactory.setPassword(PASSWORD);
            return connectionFactory.newConnection();
        }
      }
    
    1. 创建生产者
    package com.zyx.queue;
     import java.io.IOException;
     import java.util.concurrent.TimeoutException;
         
     import com.rabbitmq.client.Channel;
     import com.rabbitmq.client.Connection;
         
     /**
      * Simple-queue demo producer: publishes one text message to {@code test_queue}
      * and then releases its channel and connection.
      */
     public class Producer {
        private static final String QUEUE_NAME = "test_queue";

        /**
         * Declares the queue, publishes a single message and prints what was sent.
         *
         * @param args unused
         * @throws IOException if declaring, publishing or closing fails
         * @throws TimeoutException if opening the connection or closing the channel times out
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            // try-with-resources closes the channel first and then the connection,
            // even when declaring the queue or publishing throws.
            try (Connection connection = RabbitMQUtils.getConnection();
                    Channel channel = connection.createChannel()) {
                // Declare the queue with the same arguments the consumer uses.
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                String msg = "hello rabbitmq";
                // Encode explicitly as UTF-8: the consumer decodes with "utf-8", so relying on
                // the platform default charset here could corrupt non-ASCII text.
                byte[] payload = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
                // Publish with an empty exchange name and the queue name as routing key.
                channel.basicPublish("", QUEUE_NAME, null, payload);
                System.out.println("Producer----send:" + msg);
            }
        }
     }
    

  1. 创建消费者

     package com.zyx.queue;
         
     import java.io.IOException;
     import java.util.concurrent.TimeoutException;
         
     import com.rabbitmq.client.Channel;
     import com.rabbitmq.client.Connection;
     import com.rabbitmq.client.DefaultConsumer;
     import com.rabbitmq.client.Envelope;
     import com.rabbitmq.client.AMQP.BasicProperties;
         
     /**
      * Simple-queue demo consumer: subscribes to {@code test_queue} and prints every
      * message body it is handed.
      */
     public class Consumer {
        private static final String QUEUE_NAME = "test_queue";

        /**
         * Opens a connection and channel, declares the queue and registers a callback
         * that prints each delivery. Neither the channel nor the connection is closed here.
         *
         * @param args unused
         * @throws IOException if a broker operation fails
         * @throws TimeoutException if opening the connection times out
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection conn = RabbitMQUtils.getConnection();
            Channel ch = conn.createChannel();
            // Same declaration arguments as the producer side.
            ch.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Subscribe with autoAck = true; the callback below is invoked per delivery.
            ch.basicConsume(QUEUE_NAME, true, new DefaultConsumer(ch) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    // The body is decoded as UTF-8 text before printing.
                    System.out.println("Consumer----receive:" + new String(body, "utf-8"));
                }
            });
        }
     }
         
    
    1. 测试
    //启动生产者发送消息
    //启动消费者接收消息
    //再次运行生产者发送消息,观察结果。
    

#### 代码演示02(work队列)

  1. 生产者

      package com.zyx.queue.work;
          
      import java.io.IOException;
      import java.util.concurrent.TimeoutException;
          
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;
      import com.zyx.queue.RabbitMQUtils;
          
      /**
       * Work-queue demo producer: publishes 50 numbered messages to
       * {@code test_work_queue}, pausing 100 ms after each one.
       */
      public class Send {
        private static final String QUEUE_NAME = "test_work_queue";

        /**
         * Declares the queue and publishes the numbered messages, printing each one.
         *
         * @param args unused
         * @throws IOException if declaring, publishing or closing fails
         * @throws TimeoutException if opening the connection or closing the channel times out
         * @throws InterruptedException if the pause between messages is interrupted
         */
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            // try-with-resources closes the channel and then the connection on every exit
            // path, including an InterruptedException thrown by the sleep below.
            try (Connection connection = RabbitMQUtils.getConnection();
                    Channel channel = connection.createChannel()) {
                // Declare the queue with the same arguments the consumers use.
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                for (int i = 1; i <= 50; i++) {
                    String msg = "hello rabbitmq" + i;
                    // Encode explicitly as UTF-8 to match the "utf-8" decoding in the consumers.
                    byte[] payload = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
                    // Publish with an empty exchange name and the queue name as routing key.
                    channel.basicPublish("", QUEUE_NAME, null, payload);
                    System.out.println("Producer----send:" + msg);
                    // Pace the producer so the distribution across consumers is easy to watch.
                    Thread.sleep(100);
                }
            }
        }
      }
          
    
    1. 消费者1
      package com.zyx.queue.work;
          
      import java.io.IOException;
      import java.util.concurrent.TimeoutException;
          
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;
      import com.rabbitmq.client.DefaultConsumer;
      import com.rabbitmq.client.Envelope;
      import com.rabbitmq.client.AMQP.BasicProperties;
      import com.zyx.queue.RabbitMQUtils;
          
      /**
       * Work-queue demo consumer 001: prints every message received from
       * {@code test_work_queue} and then sleeps 100 ms to act as the slower worker.
       */
      public class Receive001 {
        private static final String QUEUE_NAME = "test_work_queue";

        /**
         * Opens a connection and channel, declares the queue and subscribes with autoAck
         * enabled. Neither the channel nor the connection is closed here.
         *
         * @param args unused
         * @throws IOException if a broker operation fails
         * @throws TimeoutException if opening the connection times out
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1. Open the connection.
            Connection connection = RabbitMQUtils.getConnection();
            // 2. Create a channel on it.
            Channel channel = connection.createChannel();
            // 3. Declare the queue with the same arguments the producer uses.
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 4. Build the callback that handles each delivery.
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msg = new String(body,"utf-8");
                    System.out.println("Consumer[001]----receive:" + msg);
                    try {
                        // Simulated processing time for this worker.
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag instead of swallowing it, so the thread
                        // running this callback can still observe the interruption.
                        Thread.currentThread().interrupt();
                    }
                }
            };
            // 5. Start consuming with autoAck = true.
            channel.basicConsume(QUEUE_NAME, true,consumer);
        }
      }
          
    
  2. 消费者2

      package com.zyx.queue.work;
          
      import java.io.IOException;
      import java.util.concurrent.TimeoutException;
          
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;
      import com.rabbitmq.client.DefaultConsumer;
      import com.rabbitmq.client.Envelope;
      import com.rabbitmq.client.AMQP.BasicProperties;
      import com.zyx.queue.RabbitMQUtils;
          
      /**
       * Work-queue demo consumer 002: prints every message received from
       * {@code test_work_queue} and then sleeps 10 ms to act as the faster worker.
       */
      public class Receive002 {
        private static final String QUEUE_NAME = "test_work_queue";

        /**
         * Opens a connection and channel, declares the queue and subscribes with autoAck
         * enabled. Neither the channel nor the connection is closed here.
         *
         * @param args unused
         * @throws IOException if a broker operation fails
         * @throws TimeoutException if opening the connection times out
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1. Open the connection.
            Connection connection = RabbitMQUtils.getConnection();
            // 2. Create a channel on it.
            Channel channel = connection.createChannel();
            // 3. Declare the queue with the same arguments the producer uses.
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 4. Build the callback that handles each delivery.
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msg = new String(body,"utf-8");
                    System.out.println("Consumer[002]----receive:" + msg);
                    try {
                        // Simulated processing time for this worker.
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag instead of swallowing it, so the thread
                        // running this callback can still observe the interruption.
                        Thread.currentThread().interrupt();
                    }
                }
            };
            // 5. Start consuming with autoAck = true.
            channel.basicConsume(QUEUE_NAME, true,consumer);
        }
      }
          
    
    1. 测试
      //启动生产者发送消息
      //启动两个消费者接收消息
      //观察结果:消息默认以轮询(round-robin)方式分发,虽然两个消费者处理速度不同,但各自收到的消息数量相同
    

RabbitMQ 常用命令

启动监控管理器:rabbitmq-plugins enable rabbitmq_management
关闭监控管理器:rabbitmq-plugins disable rabbitmq_management
启动rabbitmq:rabbitmq-server -detached(Windows服务方式:rabbitmq-service start)
关闭rabbitmq:rabbitmqctl stop(Windows服务方式:rabbitmq-service stop)
查看所有的队列:rabbitmqctl list_queues
清除所有的队列:rabbitmqctl reset(需先执行rabbitmqctl stop_app;注意该命令会重置整个节点,用户、虚拟主机、权限等数据也会一并清除)
关闭应用:rabbitmqctl stop_app
启动应用:rabbitmqctl start_app
用户和权限设置(后面会用到)

添加用户:rabbitmqctl add_user username password
分配角色:rabbitmqctl set_user_tags username administrator
新增虚拟主机:rabbitmqctl add_vhost  vhost_name
将新虚拟主机授权给新用户:rabbitmqctl set_permissions -p vhost_name username '.*' '.*' '.*'
角色说明

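none  无任何角色标签,不能访问管理插件(management plugin)
management  普通管理者,可登录管理界面,查看和操作自己有权限的虚拟主机
policymaker  策略制定者,在management的基础上可以管理policy和parameter
monitoring  监控者,在management的基础上可以查看所有虚拟主机、连接和节点的信息
administrator  超级管理员,拥有以上全部权限,并可管理用户、虚拟主机和权限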