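Advanced RabbitMQ Usage
- 1. RabbitMQ Spring Boot Integration
- 1.1 Add the dependency
- 1.2 Add the configuration
- 1.3 Add the Config class
- 1.4 Write the consumer
- 1.5 Send a message
- 2. Advanced RabbitMQ Usage
- 2.1 Pre-publish message processor
- 2.2 Publisher confirms
- 2.3 Post-receive message processor
- 2.4 Transactional messages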
1. RabbitMQ Spring Boot Integration
1.1 Add the dependency
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1.2 Add the configuration
server:
  port: 8080
spring:
  application:
    name: rabbitmq-test
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: test
    password: test
    virtual-host: /
    publisher-returns: true
1.3 Add the Config class
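import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// MqProperties is the project's own configuration-properties class (not shown here).
@Configuration
@EnableConfigurationProperties(MqProperties.class)
public class MqConfig {
    @Autowired
    private MqProperties mqProperties;

    @Bean
    public MessageConverter messageConverter() {
        // Serialize message bodies as JSON
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory());
        // Use the JSON message converter
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(mqProperties.getHost());
        connectionFactory.setPort(mqProperties.getPort());
        connectionFactory.setUsername(mqProperties.getUsername());
        connectionFactory.setPassword(mqProperties.getPassword());
        connectionFactory.setVirtualHost(mqProperties.getVirtualHost());
        // Return unroutable messages to the publisher
        connectionFactory.setPublisherReturns(mqProperties.getPublisherReturns());
        // Enable publisher confirms correlated with CorrelationData
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        return connectionFactory;
    }

    @Bean
    public Queue queue() {
        // durable, not exclusive, not auto-deleted
        return new Queue("test-queue", true, false, false);
    }

    @Bean
    public Exchange exchange() {
        // durable, not auto-deleted
        return new DirectExchange("direct-exchange-test", true, false);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("direct-key").noargs();
    }
}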
1.4 Write the consumer
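import java.util.Map;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.PublisherCallbackChannel;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class DirectConsumer extends MessageListenerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(DirectConsumer.class);

    @Autowired
    private MessageConverter messageConverter;

    @Override
    @RabbitListener(queues = {"test-queue"}, ackMode = "MANUAL")
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            @SuppressWarnings("unchecked")
            Map<String, String> msg = (Map<String, String>) messageConverter.fromMessage(message);
            // Read the correlation id that the publisher attached as a header
            String id = (String) message.getMessageProperties().getHeaders().get(PublisherCallbackChannel.RETURNED_MESSAGE_CORRELATION_KEY);
            // Alternative: read the raw body as a string
            // String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            logger.info("directConsumer>>>>>>correlationId={}, message={}", id, msg);
            // Acknowledge this delivery only (multiple = false)
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            logger.error("directConsumer>>>>>>exception", e);
            // Reject and requeue (requeue = true); a message that always fails will be redelivered indefinitely
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}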
1.5 Send a message
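import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/mq")
public class MqController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Map both paths so that the optional path variable can actually be omitted
    @GetMapping({"/sendMsg", "/sendMsg/{data}"})
    public void test(@PathVariable(value = "data", required = false) String data) {
        Map<String, String> msg = new HashMap<>();
        msg.put("time", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        if (data == null || data.isEmpty()) {
            // Fall back to the current timestamp when no data is supplied
            data = String.valueOf(System.currentTimeMillis());
        }
        msg.put("data", "this is data:" + data);
        // The CorrelationData id links this message to its publisher confirm
        rabbitTemplate.convertAndSend("direct-exchange-test", "direct-key", msg, new CorrelationData(UUID.randomUUID().toString()));
    }
}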
2. Advanced RabbitMQ Usage
2.1 Pre-publish message processor
Register a pre-publish processor with RabbitTemplate.setBeforePublishPostProcessors(new MessagePostProcessor() {....}):
@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    // Register a processor that runs on every message before it is published
    rabbitTemplate.setBeforePublishPostProcessors(new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            logger.info("-----------------rabbitmq before postProcess message={}", JSON.toJSONString(message));
            return message;
        }
    });
    rabbitTemplate.setConnectionFactory(connectionFactory());
    // Use the JSON message converter
    rabbitTemplate.setMessageConverter(messageConverter());
    return rabbitTemplate;
}
A pre-publish processor can modify the message, persist it, set common headers, and so on.
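To make the "set common headers" use case concrete, here is a minimal plain-Java sketch of a processor chain. It does not use Spring AMQP: `Envelope`, `PrePublishProcessor`, and `PrePublishChain` are hypothetical stand-ins for `Message`, `MessagePostProcessor`, and the template's processor list, and the header names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a template that runs processors before publishing. */
final class PrePublishChain {
    private final List<PrePublishProcessor> processors;

    PrePublishChain(List<PrePublishProcessor> processors) {
        this.processors = new ArrayList<>(processors);
    }

    /** Runs every processor in registration order and returns the message to send. */
    Envelope apply(Envelope envelope) {
        Envelope current = envelope;
        for (PrePublishProcessor processor : processors) {
            current = processor.process(current);
        }
        return current;
    }

    public static void main(String[] args) {
        // Stamp every outgoing message with the sending application.
        PrePublishProcessor addSource = e -> {
            e.headers.put("x-source-app", "rabbitmq-test");
            return e;
        };
        // Fill in a default only when the caller did not set the header.
        PrePublishProcessor defaultVersion = e -> {
            e.headers.putIfAbsent("x-content-version", "1");
            return e;
        };
        PrePublishChain chain = new PrePublishChain(Arrays.asList(addSource, defaultVersion));
        Envelope sent = chain.apply(new Envelope("hello"));
        System.out.println(sent.headers);
    }
}

/** Hypothetical stand-in for an AMQP message: a body plus mutable headers. */
final class Envelope {
    final String body;
    final Map<String, Object> headers = new LinkedHashMap<>();

    Envelope(String body) {
        this.body = body;
    }
}

/** Hypothetical stand-in for a pre-publish processor. */
interface PrePublishProcessor {
    Envelope process(Envelope envelope);
}
```

With the real API, the same logic would presumably live inside `postProcessMessage` and write to the message's properties instead of a plain map.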
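2.2 Publisher confirms
Set a ConfirmCallback on the template. It is invoked once the broker has confirmed the message, whether the send succeeded or failed.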
@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    // Runs before each send and may create or amend the CorrelationData
    rabbitTemplate.setCorrelationDataPostProcessor(new CorrelationDataPostProcessor() {
        @Override
        public CorrelationData postProcess(Message message, CorrelationData correlationData) {
            logger.info("-----------------rabbitmq correlationData postProcess message={}, correlationData={}",
                    JSON.toJSONString(message), JSON.toJSONString(correlationData));
            return correlationData;
        }
    });
    rabbitTemplate.setConnectionFactory(connectionFactory());
    // Use the JSON message converter
    rabbitTemplate.setMessageConverter(messageConverter());
    // Publisher confirms; requires spring.rabbitmq.publisher-confirm-type=correlated
    // (or setPublisherConfirmType(ConfirmType.CORRELATED) on the connection factory, as in 1.3)
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            logger.info("setConfirmCallback>>>>>>correlationData={} ack={}, cause={}", JSON.toJSONString(correlationData), ack, cause);
        }
    });
    // Enable mandatory mode so that unroutable messages are handed to the return callback
    rabbitTemplate.setMandatory(true);
    // Register the return callback.
    // setReturnCallback is deprecated since Spring AMQP 2.3 in favor of setReturnsCallback(ReturnedMessage).
    rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
        logger.info("setReturnCallback>>>>>message could not be routed to a queue, message:{}, exchange:{}, routingKey:{}, reason:{}", message, exchange, routingKey, replyText);
    });
    return rabbitTemplate;
}
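The confirm(CorrelationData correlationData, boolean ack, String cause) method of RabbitTemplate.ConfirmCallback reports whether the broker accepted the message:
- ack=true: the send succeeded
- ack=false: the send failed; cause, if present, carries the reason
A message that reaches the exchange but cannot be routed to any queue is still confirmed with ack=true. That case is reported through the return callback, which is why mandatory mode is enabled above.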
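Logging the confirm is rarely enough; usually the sender wants to know which messages to resend. The following is a minimal plain-Java sketch of that bookkeeping, assuming each message is published with a unique correlation id. `ConfirmTracker` and its methods are hypothetical names, not part of Spring AMQP, and payloads are modelled as strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical helper: remembers each published payload by correlation id
 * until the broker confirms it, so that nacked messages can be resent.
 */
final class ConfirmTracker {
    // Published but not yet confirmed, keyed by correlation id.
    private final Map<String, String> pending = new ConcurrentHashMap<>();
    // Nacked by the broker, keyed by correlation id.
    private final Map<String, String> failed = new ConcurrentHashMap<>();

    /** Call right before publishing. */
    void beforeSend(String correlationId, String payload) {
        pending.put(correlationId, payload);
    }

    /** Call from the confirm callback with the values it receives. */
    void onConfirm(String correlationId, boolean ack, String cause) {
        String payload = pending.remove(correlationId);
        if (payload == null) {
            return; // unknown id, or this confirm was already handled
        }
        if (!ack) {
            failed.put(correlationId, payload);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    /** Removes and returns the payloads that should be published again. */
    List<String> drainFailed() {
        List<String> payloads = new ArrayList<>();
        for (String id : failed.keySet()) {
            String payload = failed.remove(id);
            if (payload != null) {
                payloads.add(payload);
            }
        }
        return payloads;
    }

    public static void main(String[] args) {
        ConfirmTracker tracker = new ConfirmTracker();
        tracker.beforeSend("id-1", "first");
        tracker.beforeSend("id-2", "second");
        tracker.onConfirm("id-1", true, null);          // broker accepted
        tracker.onConfirm("id-2", false, "simulated");  // broker rejected
        System.out.println("to resend: " + tracker.drainFailed());
    }
}
```

In a real application the pending entries would likely need a timeout as well, since a confirm can be lost when the connection drops.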
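2.3 Post-receive message processor
Runs after a message has been received from the broker and before it is passed to the listener.
Option 1: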
/**
 * Registers a SimpleRabbitListenerContainerFactory.
 * Select it with @RabbitListener(queues = {"test-queue"}, containerFactory = "containerFactory", ackMode = "MANUAL")
 */
@Bean("containerFactory")
public SimpleRabbitListenerContainerFactory containerFactory(
        MessageConverter messageConverter) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    // Number of concurrent consumers; the default is 1
    factory.setConcurrentConsumers(5);
    // Upper bound for scaling consumers on demand; unset by default, so the count stays at concurrentConsumers
    factory.setMaxConcurrentConsumers(10);
    // Whether messages rejected because the listener threw an exception are requeued; the default is true
    factory.setDefaultRequeueRejected(false);
    // Whether the container starts automatically; the default is true
    factory.setAutoStartup(true);
    // Acknowledge mode; the default is AUTO
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    // Unacknowledged messages the broker may push to each consumer; the default is 250 since Spring AMQP 2.0 (1 before that)
    factory.setPrefetchCount(5);
    // How long a consumer waits for the next message, in milliseconds; the default is 1000
    factory.setReceiveTimeout(1000L);
    // Transaction manager used by the container; by default the container does not use transactions
    // factory.setTransactionManager(platformTransactionManager());
    // Converter between message bodies and Java objects
    factory.setMessageConverter(messageConverter);
    // Executor that runs the consumer threads; the default is a SimpleAsyncTaskExecutor
    factory.setTaskExecutor(new SimpleAsyncTaskExecutor());
    // Interval between attempts to restart a failed consumer, in milliseconds; the default is 5000
    factory.setRecoveryInterval(5000L);
    // Whether the container fails when none of its queues are available; the default is true
    factory.setMissingQueuesFatal(false);
    // Connection factory used by the listener containers
    factory.setConnectionFactory(connectionFactory());
    // Register the post-receive processor
    factory.setAfterReceivePostProcessors(new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            logger.info("-----------------rabbitmq after postProcess message={}", JSON.toJSONString(message));
            return message;
        }
    });
    return factory;
}
Option 2:
@Bean
public SimpleMessageListenerContainer listenerContainer(
        @Qualifier("directConsumer") DirectConsumer directConsumer) {
    String queueName = "test-queue";
    SimpleMessageListenerContainer simpleMessageListenerContainer =
            new SimpleMessageListenerContainer(connectionFactory());
    simpleMessageListenerContainer.setQueueNames(queueName);
    simpleMessageListenerContainer.setMessageListener(directConsumer);
    return simpleMessageListenerContainer;
}

// Only containers declared as beans are injected here; containers created for
// @RabbitListener methods are not beans and are not included.
@Autowired(required = false)
private List<AbstractMessageListenerContainer> simpleMessageListenerContainers;

@PostConstruct
public void init() {
    if (CollectionUtils.isEmpty(simpleMessageListenerContainers)) {
        return;
    }
    for (AbstractMessageListenerContainer simpleMessageListenerContainer : simpleMessageListenerContainers) {
        // Register the post-receive processor on every container
        simpleMessageListenerContainer.setAfterReceivePostProcessors(new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                logger.info("-----------------rabbitmq after postProcess message={}", JSON.toJSONString(message));
                return message;
            }
        });
        // Switch to manual acknowledgement
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    }
}
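2.4 Transactional messages
RabbitMQ transactional messages are not recommended because they significantly reduce throughput. Use the publisher confirm mechanism from section 2.2 to get reliable, transaction-like delivery instead.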
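One possible reading of this advice is to have the sender block until the confirm for its message arrives, and to treat a nack or a timeout as a failed send that the caller can retry or roll back. The sketch below models that in plain Java, assuming one unique correlation id per message. `ConfirmWaiter` is a hypothetical helper, not a Spring AMQP class; Spring AMQP's CorrelationData also exposes a future that can serve a similar purpose.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper that lets a sender wait for the confirm of one message. */
final class ConfirmWaiter {
    // One future per message that is still waiting for its confirm.
    private final Map<String, CompletableFuture<Boolean>> inFlight = new ConcurrentHashMap<>();

    /** Call before publishing; the future completes when the confirm arrives. */
    CompletableFuture<Boolean> register(String correlationId) {
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        inFlight.put(correlationId, future);
        return future;
    }

    /** Call from the confirm callback. */
    void onConfirm(String correlationId, boolean ack) {
        CompletableFuture<Boolean> future = inFlight.remove(correlationId);
        if (future != null) {
            future.complete(ack);
        }
    }

    /** Returns true only if the broker acked within the timeout. */
    boolean awaitAck(String correlationId, CompletableFuture<Boolean> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return false; // no confirm in time: treat as not delivered
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            inFlight.remove(correlationId); // do not leak entries after a timeout
        }
    }

    int inFlightCount() {
        return inFlight.size();
    }

    public static void main(String[] args) {
        ConfirmWaiter waiter = new ConfirmWaiter();
        String id = "order-42";
        CompletableFuture<Boolean> confirm = waiter.register(id);
        // Simulate the broker's confirm arriving on another thread.
        new Thread(() -> waiter.onConfirm(id, true)).start();
        boolean delivered = waiter.awaitAck(id, confirm, 1000);
        System.out.println("delivered=" + delivered);
    }
}
```

Blocking per message gives up some of the throughput advantage that confirms have over transactions, so this pattern fits low-volume, high-value messages better than bulk traffic.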