RabbitMQ事务处理
RabbitMQ事务处理
问题
生产者发送消息到RabbitMQ服务器,怎么保证消息发送成功呢,使用两种方式实现
- 事务机制
- confirm机制
事务机制代码主要代码演示
生产者
package com.zyx.queue.tx;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zyx.queue.RabbitMQUtils;
public class Producer {
private static final String QUEUE_NAME = "test_queue_tx";
public static void main(String[] args) throws IOException, TimeoutException {
//1.获取连接
Connection connection = RabbitMQUtils.getConnection();
//2.创建频道
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4.创建要发送的消息
String msg = new String("hello rabbitmq");
try {
channel.txSelect();//开启事务
//5.发送消息
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
int s = 1/0;//测试事务回滚
channel.txCommit();//提交事务
System.out.println("Producer----send:" + msg);
} catch (Exception e) {
channel.txRollback();//回滚事务
System.out.println("Producer----send:" + msg + " error");
}
channel.close();
connection.close();
}
}
消费者
package com.zyx.queue.tx;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.zyx.queue.RabbitMQUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
public class Consumer {
private static final String QUEUE_NAME = "test_queue_tx";
public static void main(String[] args) throws IOException, TimeoutException {
//1.获取连接
Connection connection = RabbitMQUtils.getConnection();
//2.创建频道
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4.创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
//事件模型
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body,"utf-8");
System.out.println("Consumer----receive:" + msg);
}
};
//5.监听消息
channel.basicConsume(QUEUE_NAME, true,consumer);
}
}
测试
打开和关闭代码 int s = 1/0;来查看消息发送情况。
confirm方式(1)
生产者
package com.zyx.queue.confirm;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zyx.queue.RabbitMQUtils;
public class Producer {
private static final String QUEUE_NAME = "test_queue_confirm01";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1.获取连接
Connection connection = RabbitMQUtils.getConnection();
//2.创建频道
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4.创建要发送的消息
String msg = new String("hello rabbitmq");
channel.confirmSelect();//开启事务
//发送多条实现批量提交,但是如果有一条失败就全部回滚
for (int i = 0; i < 10; i++) {
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
}
//5.确定消息是否发送成功
if (!channel.waitForConfirms()) {
System.out.println("Producer----send:" + msg + " error");
} else {
System.out.println("Producer----send:" + msg);
}
channel.close();
connection.close();
}
}
消费者
package com.zyx.queue.confirm;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.zyx.queue.RabbitMQUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
public class Consumer {
private static final String QUEUE_NAME = "test_queue_confirm01";
public static void main(String[] args) throws IOException, TimeoutException {
//1.获取连接
Connection connection = RabbitMQUtils.getConnection();
//2.创建频道
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4.创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
//事件模型
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body,"utf-8");
System.out.println("Consumer----receive:" + msg);
}
};
//5.监听消息
channel.basicConsume(QUEUE_NAME, true,consumer);
}
}
confirm方式(2)
生产者
package com.zyx.queue.confirm;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.zyx.queue.RabbitMQUtils;
public class Producer2 {
private static final String QUEUE_NAME = "test_queue_confirm02";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1.获取连接
Connection connection = RabbitMQUtils.getConnection();
//2.创建频道
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4.开启事务
channel.confirmSelect();
//5.设置发送消息生成的标示
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.addConfirmListener(new ConfirmListener() {
//发送失败
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
System.out.println("----handleNack------multiple");
confirmSet.headSet(deliveryTag+1).clear();
} else {
System.out.println("----handleNack------multiple---false");
confirmSet.remove(deliveryTag);
}
}
//发送成功
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
System.out.println("----handleAck------multiple");
confirmSet.headSet(deliveryTag+1).clear();
} else {
System.out.println("----handleAck------multiple---false");
confirmSet.remove(deliveryTag);
}
}
});
//4.创建要发送的消息
String msg = new String("hello rabbitmq");
while (true) {
long seqNo = channel.getNextPublishSeqNo();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
confirmSet.add(seqNo);
}
}
}
消费者
package com.zyx.queue.confirm;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.zyx.queue.RabbitMQUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
public class Consumer2 {
private static final String QUEUE_NAME = "test_queue_confirm02";
public static void main(String[] args) throws IOException, TimeoutException {
//1.获取连接
Connection connection = RabbitMQUtils.getConnection();
//2.创建频道
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4.创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
//事件模型
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body,"utf-8");
System.out.println("Consumer----receive:" + msg);
}
};
//5.监听消息
channel.basicConsume(QUEUE_NAME, true,consumer);
}
}