RabbitMQ事务处理

问题

生产者发送消息到RabbitMQ服务器,怎么保证消息发送成功呢,使用两种方式实现

  1. 事务机制
  2. confirm机制

事务机制代码主要代码演示

生产者
package com.zyx.queue.tx;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zyx.queue.RabbitMQUtils;

public class Producer {
	private static final String QUEUE_NAME = "test_queue_tx";
	public static void main(String[] args) throws IOException, TimeoutException {
		//1.获取连接
		Connection connection = RabbitMQUtils.getConnection();
		//2.创建频道
		Channel channel = connection.createChannel();
		//3.声明队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		
		//4.创建要发送的消息
		String msg = new String("hello rabbitmq");
		try {
			channel.txSelect();//开启事务
			//5.发送消息
			channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
			int s = 1/0;//测试事务回滚
			channel.txCommit();//提交事务
			System.out.println("Producer----send:" + msg);
		} catch (Exception e) {
			channel.txRollback();//回滚事务
			System.out.println("Producer----send:" + msg + " error");
		}
		channel.close();
		connection.close();
	}
}
消费者
package com.zyx.queue.tx;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.zyx.queue.RabbitMQUtils;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Consumer {
	private static final String QUEUE_NAME = "test_queue_tx";
	public static void main(String[] args) throws IOException, TimeoutException {
		//1.获取连接
		Connection connection = RabbitMQUtils.getConnection();
		//2.创建频道
		Channel channel = connection.createChannel();
		//3.声明队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//4.创建消费者
		DefaultConsumer consumer = new DefaultConsumer(channel) {
			//事件模型
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msg = new String(body,"utf-8");
				System.out.println("Consumer----receive:" + msg);
			}
		};
		//5.监听消息
		channel.basicConsume(QUEUE_NAME, true,consumer);
	}
}
测试

打开和关闭代码 int s = 1/0;来查看消息发送情况。

confirm方式(1)

生产者
package com.zyx.queue.confirm;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zyx.queue.RabbitMQUtils;

public class Producer {
	private static final String QUEUE_NAME = "test_queue_confirm01";
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		//1.获取连接
		Connection connection = RabbitMQUtils.getConnection();
		//2.创建频道
		Channel channel = connection.createChannel();
		//3.声明队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//4.创建要发送的消息
		String msg = new String("hello rabbitmq");
		channel.confirmSelect();//开启事务
		//发送多条实现批量提交,但是如果有一条失败就全部回滚
		for (int i = 0; i < 10; i++) {
			channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
		}
		//5.确定消息是否发送成功
		if (!channel.waitForConfirms()) {
			System.out.println("Producer----send:" + msg + " error");
		} else {
			System.out.println("Producer----send:" + msg);
		}
		channel.close();
		connection.close();
	}
}
消费者
package com.zyx.queue.confirm;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.zyx.queue.RabbitMQUtils;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Consumer {
	private static final String QUEUE_NAME = "test_queue_confirm01";
	public static void main(String[] args) throws IOException, TimeoutException {
		//1.获取连接
		Connection connection = RabbitMQUtils.getConnection();
		//2.创建频道
		Channel channel = connection.createChannel();
		//3.声明队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//4.创建消费者
		DefaultConsumer consumer = new DefaultConsumer(channel) {
			//事件模型
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msg = new String(body,"utf-8");
				System.out.println("Consumer----receive:" + msg);
			}
		};
		//5.监听消息
		channel.basicConsume(QUEUE_NAME, true,consumer);
	}
}

confirm方式(2)

生产者
package com.zyx.queue.confirm;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.zyx.queue.RabbitMQUtils;

public class Producer2 {
	private static final String QUEUE_NAME = "test_queue_confirm02";
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		//1.获取连接
		Connection connection = RabbitMQUtils.getConnection();
		//2.创建频道
		Channel channel = connection.createChannel();
		//3.声明队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//4.开启事务
		channel.confirmSelect();
		//5.设置发送消息生成的标示
		final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
		channel.addConfirmListener(new ConfirmListener() {
			//发送失败
			@Override
			public void handleNack(long deliveryTag, boolean multiple) throws IOException {
				if (multiple) {
					System.out.println("----handleNack------multiple");
					confirmSet.headSet(deliveryTag+1).clear();
				} else {
					System.out.println("----handleNack------multiple---false");
					confirmSet.remove(deliveryTag);
				}
			}
			//发送成功
			@Override
			public void handleAck(long deliveryTag, boolean multiple) throws IOException {
				if (multiple) {
					System.out.println("----handleAck------multiple");
					confirmSet.headSet(deliveryTag+1).clear();
				} else {
					System.out.println("----handleAck------multiple---false");
					confirmSet.remove(deliveryTag);
				}
			}
		});
		//4.创建要发送的消息
		String msg = new String("hello rabbitmq");
		while (true) {
			long seqNo = channel.getNextPublishSeqNo();
			channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
			confirmSet.add(seqNo);
		}
		
	}
}
消费者
package com.zyx.queue.confirm;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.zyx.queue.RabbitMQUtils;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Consumer2 {
	private static final String QUEUE_NAME = "test_queue_confirm02";
	public static void main(String[] args) throws IOException, TimeoutException {
		//1.获取连接
		Connection connection = RabbitMQUtils.getConnection();
		//2.创建频道
		Channel channel = connection.createChannel();
		//3.声明队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//4.创建消费者
		DefaultConsumer consumer = new DefaultConsumer(channel) {
			//事件模型
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msg = new String(body,"utf-8");
				System.out.println("Consumer----receive:" + msg);
			}
		};
		//5.监听消息
		channel.basicConsume(QUEUE_NAME, true,consumer);
	}
}