RabbitMQ 系列之:消息确认机制(Confirm)

消息的确认机制(Confirm)是实现 RabbitMQ 消息高可用投递的关键步骤,生产者在发送消息之后,如果 Broker 收到消息则会给生产端一个答复,用来确定这条消息是否正常发送。

Confirm 的另一个优点是它所处理的逻辑是异步的,生产者在发送一条消息之后可以进行异步监听通过回调来处理该消息,同时还可继续发送下一条,如果 RabbitMQ 因为系统内部错误导致消息丢失,回调会收到一条 nack 消息,用来处理失败的逻辑,否则会收到一条 ack 成功的消息。

Node.js 版实现

一个 channel 使用确认模式 “confirmation mode”,可参考官方文章 http://www.squaremobius.net/amqp.node/channel_api.html#confirmchannel

实现步骤

  1. 通过 connect.createConfirmChannel() 获取一个确认模式的 channel
  2. 使用 publish() 或者 sendToQueue() 接收回调做为附加参数

代码实现

主要变动生产者代码部分

  1. const amqp = require('amqplib');
  2. async function producer() {
  3. // 创建链接对象
  4. const connection = await amqp.connect('amqp://localhost:5672');
  5. // 获取通道
  6. const channel = await connection.createConfirmChannel();
  7. // 声明参数
  8. const exchangeName = 'confirm_exchange_name';
  9. const routingKey = 'confirm_routingKey';
  10. const msg = 'confirm_hello world';
  11. // 交换机
  12. await channel.assertExchange(exchangeName, 'direct', {
  13. durable: true,
  14. });
  15. // 发送消息
  16. channel.publish(exchangeName, routingKey, Buffer.from(msg), {}, function(err, ok){
  17. console.log(err, ok);
  18. if (err !== null) {
  19. console.warn('Message nacked!');
  20. } else {
  21. console.log('Message acked');
  22. }
  23. });
  24. // 关闭链接
  25. // await channel.close();
  26. // await connection.close();
  27. }
  28. producer();

源码地址

  1. https://github.com/Q-Angelo/project-training/tree/master/rabbitmq/helloworld-confirm

Java 版实现

实现步骤

  1. 开启确认模式 channel.confirmSelect()
  2. 添加监听 channel.addConfirmListener 监听失败或成功结果

代码实现

主要改动部分为生产者部分,并且取消了关闭了链接(channel.close()、connection.close())否则就无法监听

  1. public class Producer {
  2. public static void main(String[] args) throws Exception {
  3. // 创建链接工厂
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. connectionFactory.setHost("127.0.0.1");
  6. connectionFactory.setPort(5672);
  7. connectionFactory.setVirtualHost("/");
  8. // 通过链接工厂创建链接
  9. Connection connection = connectionFactory.newConnection();
  10. // 通过链接创建通道(channel)
  11. Channel channel = connection.createChannel();
  12. // 指定消息确认模式
  13. channel.confirmSelect();
  14. // 通过 channel 发送数据
  15. // exchange:交换机,如果不传默认为 AMQP default
  16. String confirmExchangeName = "confirm_exchange_name";
  17. String confirmRoutingKey = "confirm_routingKey";
  18. String confirmMsg = "confirm hello world";
  19. channel.basicPublish(confirmExchangeName, confirmRoutingKey, null, confirmMsg.getBytes());
  20. channel.addConfirmListener(new ConfirmListener() {
  21. @Override
  22. public void handleAck(long l, boolean b) throws IOException {
  23. System.out.println("--------handleAck-----------");
  24. System.out.print(l);
  25. System.out.print(b);
  26. }
  27. @Override
  28. public void handleNack(long l, boolean b) throws IOException {
  29. System.out.println("--------handleNack-----------");
  30. System.out.print(l);
  31. System.out.print(b);
  32. }
  33. });
  34. // 关闭链接
  35. // channel.close();
  36. // connection.close();
  37. }
  38. }

源码地址

  1. https://github.com/Q-Angelo/SpringBoot-Course/tree/master/chapter8/chapter8-1/src/main/java/com/may/rabbitmq/confirm/helloworld