Chapter6 发布确认高级
6.1 发布确认 SpringBoot 版本
6.1.1 说明
问题
RabbitMQ 宕机或重启期间,生产者发送的消息可能会丢失。如何保证消息的可靠投递?答案是使用发布确认机制。
代码架构
6.1.2 环境搭建
配置文件
spring.rabbitmq.publisher-confirm-type=correlated
- none: 禁用发布确认模式,默认值
- simple: 同步确认,发布消息后需要调用 RabbitTemplate 的 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 返回确认结果
- correlated: 消息发布到交换机后(无论成功还是失败)都会触发回调方法,并可通过 CorrelationData 关联到具体的消息
配置类
package com.atguigu.rabbitmq.springbootrabbitmq.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm.queue"; @Bean public DirectExchange confirmExchange(){ return new DirectExchange(CONFIRM_EXCHANGE_NAME); } @Bean public Queue confirmQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } @Bean public Binding queueBinding(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange){ return BindingBuilder.bind(confirmQueue).to(confirmExchange).with("key1"); } }
生产者
package com.atguigu.rabbitmq.springbootrabbitmq.controller; import com.atguigu.rabbitmq.springbootrabbitmq.config.ConfirmConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @Slf4j @RestController @RequestMapping("confirm") public class ProducerController { private final RabbitTemplate rabbitTemplate; @Autowired public ProducerController(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } // 发消息 @GetMapping("sendMessage/{message}") public void sendMsg(@PathVariable("message") String message){ CorrelationData correlationData = new CorrelationData("1"); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY,message, correlationData); log.info("发送消息内容:{}",message); // 路由错误,交换机能收到但队列收不到 CorrelationData correlationData2 = new CorrelationData("2"); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, "key2",message, correlationData2); log.info("发送消息内容:{}",message); } }
回调接口
package com.atguigu.rabbitmq.springbootrabbitmq.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Slf4j @Component public class MyCallBack implements RabbitTemplate.ConfirmCallback { private final RabbitTemplate rabbitTemplate; @Autowired public MyCallBack(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } // 注入 @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); } /** * 交换机确认回调方法 * @param correlationData 保存回调消息的ID及相关信息 * @param ack 交换机是否成功接收消息 * @param cause 失败原因(成功为null) */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : ""; if (ack){ log.info("交换机已经收到ID为:{}的消息",id); }else { log.info("交换机还未收到ID为:{}的消息,原因为:{}", id, cause); } } }
消费者
package com.atguigu.rabbitmq.springbootrabbitmq.consumer;

import com.atguigu.rabbitmq.springbootrabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * Listens on the confirm queue and logs every message it receives.
 */
@Slf4j
@Component
public class Consumer {

    /**
     * Logs the body of a message delivered from the confirm queue.
     *
     * @param message the delivered AMQP message
     */
    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void receiveConfirmMessage(Message message) {
        // Decode explicitly as UTF-8; the no-charset String constructor uses the
        // platform default and can garble non-ASCII payloads.
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("接收到队列confirm.queue的消息:{}", msg);
    }
}
收不到消息两种情况
- 交换机错误(如交换机名称错误)
- 队列错误(如路由错误)
6.2 回退消息
6.2.1 说明
- 问题
- 生产者确认机制下,交换机收不到消息可以通过回调方法通知到生产者
- 交换机接收到消息,如果发现该消息不可路由,那么消息会被直接丢弃,生产者不会被通知到
- Mandatory 参数
当消息在传递过程中无法到达目的地(即不可路由)时,将消息返回给生产者
6.2.2 代码
配置文件
spring.rabbitmq.publisher-returns=true
生产者
@Slf4j @RestController @RequestMapping("confirm") public class ProducerController { private final RabbitTemplate rabbitTemplate; private final MyCallBack myCallBack; @Autowired public ProducerController(RabbitTemplate rabbitTemplate, MyCallBack myCallBack) { this.rabbitTemplate = rabbitTemplate; this.myCallBack = myCallBack; } // 注入 @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(myCallBack); rabbitTemplate.setReturnsCallback(myCallBack); } // 发消息 @GetMapping("sendMessage/{message}") public void sendMsg(@PathVariable("message") String message){ CorrelationData correlationData = new CorrelationData("1"); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY,message, correlationData); log.info("发送消息内容:{}",message); CorrelationData correlationData2 = new CorrelationData("2"); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, "key2",message, correlationData2); log.info("发送消息内容:{}",message); } }
回调接口
package com.atguigu.rabbitmq.springbootrabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;

/**
 * Callback bean that logs publisher confirmations and returned
 * (unroutable) messages.
 */
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    /**
     * Exchange confirmation callback.
     *
     * @param correlationData carries the id and related information of the message; may be null
     * @param ack             whether the exchange received the message
     * @param cause           the failure reason (null on success)
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机已经收到ID为:{}的消息", id);
        } else {
            // A negative ack is a delivery failure; log at ERROR like returnedMessage does.
            log.error("交换机还未收到ID为:{}的消息,原因为:{}", id, cause);
        }
    }

    /**
     * Callback invoked when a message could not be routed.
     *
     * @param returnedMessage the returned message together with its exchange and routing key
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        String body = new String(returnedMessage.getMessage().getBody(), StandardCharsets.UTF_8);
        log.error("消息:{}被交换机:{}退回,路由为:{}",
                body, returnedMessage.getExchange(), returnedMessage.getRoutingKey());
    }
}
消费者
不变
6.3 备份交换机
6.3.1 说明
通过mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力
但这些消息需要手动处理,麻烦而且容易出错,也增加了生产者的复杂性
以上问题可以通过备份交换机来解决
- 当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理
- 备份交换机的类型为 fanout
- 在备份交换机下绑定一个队列,用于处理无法被路由的消息
- 建立一个报警队列,用独立的消费者来进行监测和报警
代码架构
6.3.2 代码
配置类
/** Name of the fanout exchange that receives unroutable messages. */
public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
/** Name of the queue that stores unroutable messages. */
public static final String BACKUP_QUEUE_NAME = "backup.queue";
/** Name of the queue read by the warning consumer. */
public static final String WARNING_QUEUE_NAME = "warning.queue";

/** Declares the fanout backup exchange. */
@Bean
public FanoutExchange backupExchange() {
    return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}

/** Declares the durable backup queue. */
@Bean
public Queue backupQueue() {
    return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}

/** Declares the durable warning queue. */
@Bean
public Queue warningQueue() {
    return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}

/** Binds the backup queue to the backup exchange. */
@Bean
public Binding backupBinding(@Qualifier("backupQueue") Queue backupQueue,
                             @Qualifier("backupExchange") FanoutExchange backupExchange) {
    BindingBuilder.DestinationConfigurer destination = BindingBuilder.bind(backupQueue);
    return destination.to(backupExchange);
}

/** Binds the warning queue to the backup exchange. */
@Bean
public Binding warningBinding(@Qualifier("warningQueue") Queue warningQueue,
                              @Qualifier("backupExchange") FanoutExchange backupExchange) {
    BindingBuilder.DestinationConfigurer destination = BindingBuilder.bind(warningQueue);
    return destination.to(backupExchange);
}

/**
 * Declares the durable confirm exchange with the backup exchange set as
 * its {@code alternate-exchange} argument.
 */
@Bean
public DirectExchange confirmExchange() {
    return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
            .durable(true)
            .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME)
            .build();
}
报警消费者
package com.atguigu.rabbitmq.springbootrabbitmq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * Listens on the warning queue and logs a warning for every message it receives.
 */
@Slf4j
@Component
public class WarningConsumer {

    /** Name of the queue this consumer listens on. */
    public static final String WARNING_QUEUE_NAME = "warning.queue";

    /**
     * Logs the UTF-8 decoded body of a message delivered from the warning queue.
     *
     * @param message the delivered AMQP message
     */
    @RabbitListener(queues = WARNING_QUEUE_NAME)
    public void receiveWarningMsg(Message message) {
        log.warn("报警发现不可路由消息{}", new String(message.getBody(), StandardCharsets.UTF_8));
    }
}
操作
- 删除原来的交换机 confirm.exchange(它的参数新增了 alternate-exchange,RabbitMQ 不允许用不同的参数重新声明已存在的同名交换机,否则启动时会报错)
- 启动