重学“消息队列”,详解 RabbitMQ 消息确认机制! – 废柴程序员 – 简书

沙海 2021年6月3日03:23:43Java评论27字数 4155阅读13分51秒阅读模式
摘要

重学“消息队列”,详解 RabbitMQ 消息确认机制!废柴程序员

重学“消息队列”,详解 RabbitMQ 消息确认机制!

文章源自JAVA秀-https://www.javaxiu.com/29058.html

废柴程序员文章源自JAVA秀-https://www.javaxiu.com/29058.html

简书作者文章源自JAVA秀-https://www.javaxiu.com/29058.html

文章源自JAVA秀-https://www.javaxiu.com/29058.html

0.2422021-05-26 13:11打开App文章源自JAVA秀-https://www.javaxiu.com/29058.html

引言

RabbitMQ的模型是生产者发送信息到 Broker (代理),消费者从 Broker 中取出信息。但是生产者怎么知道消息是否真的发送到 Broker 中了呢?Broker 又怎么知道消息到底有没有被消费者消费?文章源自JAVA秀-https://www.javaxiu.com/29058.html

如果由于网络原因出现故障,生产者生产的消息未到达 Broker 或者 Broker 的消息被虚假消费,而它们又不知道,就会产生很严重的问题,如重复消费等。文章源自JAVA秀-https://www.javaxiu.com/29058.html

image文章源自JAVA秀-https://www.javaxiu.com/29058.html

01 RabbitMQ的消息确认流程

image文章源自JAVA秀-https://www.javaxiu.com/29058.html

从图中可以看出:文章源自JAVA秀-https://www.javaxiu.com/29058.html

消息确认机制分为生产者确认和消费者确认文章源自JAVA秀-https://www.javaxiu.com/29058.html

  • ConfirmCallback 生产者
  • ReturnCallback 生产者
  • ACK 消费者

02 生产者确认

  • 消息到达RabbitMQ的Exchange:Exchange向生产者发送Confirm确认。成功抑或失败都会返回一个confirmCallback
  • 消息成功达到Exchange,但是从Exchange投递Queue失败:向生产者返回一个returnCallback。只有失败才会返回

03 消费者确认

  • 消费者收到消息后需要对 RabbitMQ Server 进行消息 ACK 确认,RabbitMQ 根据确认信息决定是删除队列中的该信息还是重新发送

04 代码实现

4.1 生产者确认

重点在于生产者重写下面两个方法文章源自JAVA秀-https://www.javaxiu.com/29058.html

  • rabbitMQTemplate.setConfirmCallback
  • rabbitMQTemplate.setReturnCallback

1.开启生产者消息确认文章源自JAVA秀-https://www.javaxiu.com/29058.html

spring: rabbitmq: host: localhost port: 5672 virtual-host: / username: root password: root # 开启两个模式的生产者消息确认 publisher-confirm-type: simple publisher-returns: true 

2.声明交换机、队列,绑定交换机和队列文章源自JAVA秀-https://www.javaxiu.com/29058.html

@Configuration public class RabbitMQConfig { private static final String SB_TOPIC_EXCHANGE="sb_topic_exchange"; private static final String SB_TOPIC_QUEUE="sb_topic_queue1"; // 注入交换机 topic类型 @Bean("topicExchange") public Exchange topicExchange(){ return ExchangeBuilder.topicExchange(SB_TOPIC_EXCHANGE).durable(true) .autoDelete().build(); } // 声明队列 @Bean public Queue queue1(){ return QueueBuilder.durable(SB_TOPIC_QUEUE).build(); } // 绑定队列和交换机 @Bean public Binding exchangQueue(@Qualifier("queue1") Queue queue, @Qualifier("topicExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("user.#").noargs(); } 

3.创建消费者文章源自JAVA秀-https://www.javaxiu.com/29058.html

@Component @RabbitListener(queues = "sb_topic_queue1") public class Consumer { @RabbitHandler public void testPublishConfirm(String msg) { System.out.println("收到的信息:"+msg); } } 

4.创建生产者文章源自JAVA秀-https://www.javaxiu.com/29058.html

创建生产者发送消息到消息队列,模拟两种异常情况文章源自JAVA秀-https://www.javaxiu.com/29058.html

@SpringBootTest class RabiitmqSpringbootApplicationTests { @Autowired RabbitTemplate template; @Test void testConfirmTrue() { // 设置confirm回调函数 template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, java.lang.String s) { if (b) System.out.println("消息发送成功"); else System.out.println("消息发送失败"); } }); // 模拟生产者发送信息--正常情况 template.convertAndSend("sb_topic_exchange","user.info","日志级别:info;日志模块:user;日志信息:*****"); } @Test void testConfirmFalse() { // 设置confirm回调函数 template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, java.lang.String s) { if (b) System.out.println("消息发送成功"); else System.out.println("消息发送失败"); } }); // 模拟生产者发送信息 // 不存在的交换机--异常情况 template.convertAndSend("sb_topic_exchange_noexist","user.info","日志级别:info;日志模块:user;日志信息:*****"); } @Test void testReturnFalse() { // 设置return回调函数 template.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int i, java.lang.String s, java.lang.String s1, java.lang.String s2) { System.out.println(message.toString()); System.out.println(s+"*********"); } }); template.setMandatory(true); // 模拟生产者发送信息 // 正确的交换机 错误的routekey -- 异常情况 template.convertAndSend("sb_topic_exchange","noexist.user.info","日志级别:info;日志模块:user;日志信息:*****"); } 

4.2 消费者确认

重点在于消费者的下面两个方法文章源自JAVA秀-https://www.javaxiu.com/29058.html

  • channel.basicAck 消费者签收
  • channel.basicNAck 消费者拒绝签收

1.开启消费者确认模式文章源自JAVA秀-https://www.javaxiu.com/29058.html

spring: rabbitmq: host: localhost port: 5672 virtual-host: / username: root password: root # 设置消费端手动签收 listener: direct: acknowledge-mode: manual simple: acknowledge-mode: manual 

2.创建消费者文章源自JAVA秀-https://www.javaxiu.com/29058.html

/** * 注入消费者--手动签到 */ @Component @RabbitListener(queues = "sb_topic_queue1") public class Consumer2 { @RabbitHandler public void testComsumer(String msg, Channel channel, Message message) throws InterruptedException, IOException { // 消费端设置手动签收代码 try { System.out.println(msg); // 正常签收,mq收到此消息被正常签收后即可从队列中删除vi信息 // 是哟了那个channel的方法 // 第一个参数是deliverytag 标识哪条信息 第二个参数是是否批量签收 // int i=2/0; 模拟异常 channel.basicAck(message.getMessageProperties().getDeliveryTag(),true); System.out.println("消费者签收了该信息,服务器你可以删了"); }catch (Exception e){ // 异常拒绝签收,让mq重发此信息 System.out.println("该信息丢了,给我重发"); channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true); // 该信息丢了,但是不需要你重发 // channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false); } } } 

3.创建生产者文章源自JAVA秀-https://www.javaxiu.com/29058.html

@SpringBootTest class RabiitmqSpringbootApplicationTests { @Autowired RabbitTemplate template; @Test void testConsumerAck() { template.convertAndSend("sb_topic_exchange","noexist.user.info","日志级别:info;日志模块:user;日志信息:*****"); } } 

原文链接:https://www.cnblogs.com/sang-bit/p/14793341.html文章源自JAVA秀-https://www.javaxiu.com/29058.html

© 著作权归作者所有,转载或内容合作请联系作者 文章源自JAVA秀-https://www.javaxiu.com/29058.html

继续阅读
速蛙云 - 极致体验,强烈推荐!!!购买套餐就免费送各大视频网站会员!快速稳定、独家福利社、流媒体稳定解锁!速度快,全球上网、视频、游戏加速、独立IP均支持!基础套餐性价比很高!这里不多说,我一直正在使用,推荐购买:https://www.javaxiu.com/59919.html
weinxin
资源分享QQ群
本站是JAVA秀团队的技术分享社区, 会经常分享资源和教程; 分享的时代, 请别再沉默!
沙海
匿名

发表评论

匿名网友 填写信息

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定