RabbitMQ 消息确认机制 – SheHuan – 简书

沙海 2021年6月3日03:24:11Java评论32字数 5016阅读16分43秒阅读模式
摘要

智能摘要

智能摘要文章源自JAVA秀-https://www.javaxiu.com/29064.html

要测试消息不能成功发送到交换机的情况,只需要发送消息时指定一个不存在的交换机即可。这样消息自然不能成功的从交换机投放到队列。文章源自JAVA秀-https://www.javaxiu.com/29064.html

原文约 2050 | 图片 2 | 建议阅读 5 分钟 | 评价反馈文章源自JAVA秀-https://www.javaxiu.com/29064.html

RabbitMQ 消息确认机制

文章源自JAVA秀-https://www.javaxiu.com/29064.html

SheHuan文章源自JAVA秀-https://www.javaxiu.com/29064.html

RabbitMQ 消息确认机制 – SheHuan – 简书 简书优秀创作者 文章源自JAVA秀-https://www.javaxiu.com/29064.html

文章源自JAVA秀-https://www.javaxiu.com/29064.html

0.6312021-05-30 21:39打开App文章源自JAVA秀-https://www.javaxiu.com/29064.html

之前的文章我们已经介绍了 RabbitMQ 的基本使用,但是在默认情况下 RabbitMQ 并不能保证消息是否发送成功、以及是否被成功消费掉。消息在传递过程中存在丢失的可能。基于这样的现状,就有了消息的确认机制,来提高消息传递过程中的可靠性。文章源自JAVA秀-https://www.javaxiu.com/29064.html

RabbitMQ 中,消息的确认机制包含以下两个方面:文章源自JAVA秀-https://www.javaxiu.com/29064.html

  • 消息发送确认,生产者发送消息的确认包含两部分:文章源自JAVA秀-https://www.javaxiu.com/29064.html

    1、生产者发送的消息是否成功到达交换机文章源自JAVA秀-https://www.javaxiu.com/29064.html

    2、消息是否成功的从交换机投放到目标队列

  • 消息接收确认,消费者接收消息有三种不同的确认模式:文章源自JAVA秀-https://www.javaxiu.com/29064.html

    1、AcknowledgeMode.NONE:不确认,这是默认的模式,默认所有消息都被成功消费了,直接从队列删除消息。存在消息被消费过程中由于异常未被成功消费而掉丢失的风险。文章源自JAVA秀-https://www.javaxiu.com/29064.html

    2、AcknowledgeMode.AUTO:自动确认,根据消息被消费过程中是否发生异常来发送确认收到消息拒绝消息文章源自JAVA秀-https://www.javaxiu.com/29064.html

    的指令到 RabbitMQ 服务。这个确认时机开发人员是不可控的,同样存在消息丢失的风险。文章源自JAVA秀-https://www.javaxiu.com/29064.html

    3、AcknowledgeMode.MANUAL:手动确认,开发人员可以根据实际的业务,在合适的时机手动发送确认收到消息拒绝消息指令到 RabbitMQ 服务,整个过程开发人是可控的。这种模式也是我们要重点介绍的。

一、准备环境

创建 SpringBoot 项目,添加 RabbitMQ 依赖。文章源自JAVA秀-https://www.javaxiu.com/29064.html

这里将生产者和消费者放在一个项目。文章源自JAVA秀-https://www.javaxiu.com/29064.html

application.properties中添加连接 RabbitMQ 服务的配置,以及开启消息确认机制需要的配置:文章源自JAVA秀-https://www.javaxiu.com/29064.html

server.port=8080 # rabbitmq 相关配置 spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123456 spring.rabbitmq.virtual-host=/ # 开启消息是否已经发送到交换机的确认机制 spring.rabbitmq.publisher-confirm-type=correlated # 开启消息未成功投递到目标队列时将消息返回 spring.rabbitmq.publisher-returns=true # 设置消费者需要手动确认消息 spring.rabbitmq.listener.simple.acknowledge-mode=manual spring.rabbitmq.listener.direct.acknowledge-mode=manual 

创建交换机、队列,并完成绑定:文章源自JAVA秀-https://www.javaxiu.com/29064.html

@Configuration public class AckRabbitMQConfig { // Fanout交换机 @Bean FanoutExchange ackExchange() { return new FanoutExchange("ack.exchange", true, false); } // 消息队列 @Bean Queue ackQueue() { return new Queue("ack.queue", true); } // 绑定队列和交换机 @Bean Binding ackBinding() { return BindingBuilder.bind(ackQueue()).to(ackExchange()); } } 

二、消息发送确认

消息发送确认的第一部分,是确认消息是否已经成功发送到交换机,我们需要实现RabbitTemplate.ConfirmCallback接口:文章源自JAVA秀-https://www.javaxiu.com/29064.html

@Service public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback { /** * @param correlationData * @param ack true 表示消息成功发送到交换机,false 则发送失败 * @param cause 消息发送失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { System.out.println("消息已经发送到交换机!"); } else { System.out.println("消息发送到交换机失败:" + cause); } } } 

消息无论是否成功到达交换机都会调用confirm方法。文章源自JAVA秀-https://www.javaxiu.com/29064.html

消息发送确认的第二部分,就是消息是否成功的从交换机投放到目标队列,需要实现RabbitTemplate.ReturnsCallback接口:文章源自JAVA秀-https://www.javaxiu.com/29064.html

@Service public class ReturnCallbackService implements RabbitTemplate.ReturnsCallback { @Override public void returnedMessage(ReturnedMessage returned) { System.out.println("未成功投递到队列的消息:"+ returned.toString()); } } 

returnedMessage方法只会在消息未成功投递到目标队列时被调用ReturnedMessage就是投递失败的消息基本信息。文章源自JAVA秀-https://www.javaxiu.com/29064.html

定义好了两种消息发送确认服务,接下来就是配置消息发送确认服务,可以放在 RabbitMQ 配置类里进行全局配置:文章源自JAVA秀-https://www.javaxiu.com/29064.html

@Configuration public class AckRabbitMQConfig { @Autowired RabbitTemplate rabbitTemplate; @Autowired ConfirmCallbackService confirmCallbackService; @Autowired ReturnCallbackService returnCallbackService; @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(confirmCallbackService); rabbitTemplate.setReturnsCallback(returnCallbackService); } ...... ...... } 

也可以在发送消息时单独配置:文章源自JAVA秀-https://www.javaxiu.com/29064.html

@Service public class SendMessageService { @Autowired RabbitTemplate rabbitTemplate; @Autowired ConfirmCallbackService confirmCallbackService; @Autowired ReturnCallbackService returnCallbackService; public void send(String message) { rabbitTemplate.setConfirmCallback(confirmCallbackService); rabbitTemplate.setReturnsCallback(returnCallbackService); rabbitTemplate.convertAndSend("ack.exchange", "", message); System.out.println("生产者发送的消息:" + message); } } 

三、消息接收确认

消息接收确认的实现就相对简单一些:文章源自JAVA秀-https://www.javaxiu.com/29064.html

@Service public class ReceiveMessageService { @RabbitListener(queues = "ack.queue") public void receive(String msg, Channel channel, Message message) { try { // int i = 1/0; // 确认收到消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); System.out.println("消费者确认收到消息:" + msg); } catch (Exception e) { try { // 拒绝消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); System.out.println("消费者拒绝消息:" + msg); } catch (IOException ioException) { ioException.printStackTrace(); } } } } 

使用消息接收的手动确认模式时,接收消息的方法需要额外添加ChannelMessage两个类型的参数。文章源自JAVA秀-https://www.javaxiu.com/29064.html

Channel就是信道,在学习 Java client 操作 RabbitMQ 时,就是用它来发送接收消息的,不了解的可以复习一下。Message是 RabbitMQ 封装的消息类,里边包含了消息体、消息序号、以及交换机、队列等一些相关的信息。文章源自JAVA秀-https://www.javaxiu.com/29064.html

这样我们就可以根据实际的业务需求,在适当的时机告诉 RabbitMQ 服务,消息已经成功消费,或者被拒绝消费。文章源自JAVA秀-https://www.javaxiu.com/29064.html

这就涉及如下几个方法了:文章源自JAVA秀-https://www.javaxiu.com/29064.html

  • basicAck,确认收到消息,即消息消费成功,执行该方法后,消息会被从队列删除。该方法的参数含义如下:文章源自JAVA秀-https://www.javaxiu.com/29064.html

    1、deliveryTag文章源自JAVA秀-https://www.javaxiu.com/29064.html

    :消息投递的序号,就是1、2、3、4这样的递增整数。文章源自JAVA秀-https://www.javaxiu.com/29064.html

    2、multiple:是否批量确认消息,false 表示只确认当前 deliveryTag 对应的消息,true 表示会确认小于当前 deliveryTag 但还未被确认的消息。

  • basicNack,拒绝消息,由于发生异常等原因,消息没有被成功消费。和 basicAck 方法相比多了一个参数:文章源自JAVA秀-https://www.javaxiu.com/29064.html

    1、requeue:true 表示被拒绝的消息会重新进入队列头部。

  • basicReject,和 basicNack 方法的作用类似,但是少了 multiple 参数。

这里有两个问题需要注意:文章源自JAVA秀-https://www.javaxiu.com/29064.html

1、

如果拒绝消息时,设置requeuetrue,由于消息会重新进入队列头部,接下来又会被消费者处理,这样很可能陷入死循环,耗尽服务器资源,很危险的。所以在设置requeuetrue时,需要慎重考虑。文章源自JAVA秀-https://www.javaxiu.com/29064.html

拒绝消息时一般都是由于发生异常、或者业务上的错误,导致消费流程不能正常进行下去,可以考虑将此时的消息发送到死信队列,后续再单独处理。具体怎么实现,后期会有专门的文章介绍,目前先了解即可。文章源自JAVA秀-https://www.javaxiu.com/29064.html

2、

如果开启了消息接收的手动确认模式,但是消费消息时却没有做任何消息确认成功或拒绝的应答操作,则对应的消息会变成Unacked状态:文章源自JAVA秀-https://www.javaxiu.com/29064.html

如果消费者客户端不重启,则Unacked状态的消息会一直堆积,不会被删除,也不会被重新消费。文章源自JAVA秀-https://www.javaxiu.com/29064.html

如果消费者客户端重启,则消息会自动变为Ready状态,这样又会被重新消费一次。文章源自JAVA秀-https://www.javaxiu.com/29064.html

三、效果测试

可以通过如下接口来发送消息:文章源自JAVA秀-https://www.javaxiu.com/29064.html

@RestController public class SendMessageController { @Autowired private SendMessageService sendMessageService; @GetMapping("/send/{msg}") public void send(@PathVariable("msg") String msg) { sendMessageService.send(msg); } } 

要测试消息不能成功发送到交换机的情况,只需要发送消息时指定一个不存在的交换机即可。文章源自JAVA秀-https://www.javaxiu.com/29064.html

由于RabbitTemplate.ReturnsCallbackreturnedMessage方法只会在消息未成功投递到目标队列时被调用,所以要测试消息是否成功的从交换机投放到目标队列,可以注释掉AckRabbitMQConfig中交换机和队列绑定的代码,或者在后台进行交换机和队列的解绑:文章源自JAVA秀-https://www.javaxiu.com/29064.html

这样消息自然不能成功的从交换机投放到队列。文章源自JAVA秀-https://www.javaxiu.com/29064.html

至于消息接收确认,可以自行模拟不同的业务场景测试。文章源自JAVA秀-https://www.javaxiu.com/29064.html

本文完!文章源自JAVA秀-https://www.javaxiu.com/29064.html

© 著作权归作者所有,转载或内容合作请联系作者 文章源自JAVA秀-https://www.javaxiu.com/29064.html

继续阅读
速蛙云 - 极致体验,强烈推荐!!!购买套餐就免费送各大视频网站会员!快速稳定、独家福利社、流媒体稳定解锁!速度快,全球上网、视频、游戏加速、独立IP均支持!基础套餐性价比很高!这里不多说,我一直正在使用,推荐购买:https://www.javaxiu.com/59919.html
weinxin
资源分享QQ群
本站是JAVA秀团队的技术分享社区, 会经常分享资源和教程; 分享的时代, 请别再沉默!
沙海
匿名

发表评论

匿名网友 填写信息

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定