Rabbit SpringBoot高级用法

news/2024/7/23 23:20:41

Rabbit高级用法

  • 一、Rabbit Springboot集成
    • 1.1 引入依赖
    • 1.2 添加配置
    • 1.3 添加Config
    • 1.4 编写Consumer
    • 1.5 发送消息
  • 二、Rabbit 高级用法
    • 2.1 消息发送前置处理器
    • 2.2 消息发送确认机制
    • 2.3 消息接收后处理器
    • 2.4 事务消息

一、Rabbit Springboot集成

1.1 引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1.2 添加配置

server:
  port: 8080
spring:
  application:
    name: rabbitmq-test
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: test
    password: test
    virtual-host: /
    publisher-returns: true

1.3 添加Config

@Configuration
@EnableConfigurationProperties(MqProperties.class)
public class MqConfig {

    @Autowired
    private MqProperties mqProperties;

    @Bean
    public MessageConverter messageConverter() {
        // 设置消息转换器
        return new Jackson2JsonMessageConverter();
    }
	
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory());
        // 设置消息转换器
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(mqProperties.getHost());
        connectionFactory.setPort(mqProperties.getPort());
        connectionFactory.setUsername(mqProperties.getUsername());
        connectionFactory.setPassword(mqProperties.getPassword());
        connectionFactory.setVirtualHost(mqProperties.getVirtualHost());
        connectionFactory.setPublisherReturns(mqProperties.getPublisherReturns());
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        return connectionFactory;
    }
    
    @Bean
    public Queue queue() {
        return new Queue("test-queue", true, false, false);
    }

    @Bean
    public Exchange exchange() {
        return new DirectExchange("direct-exchange-test", true, false);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("direct-key").noargs();
    }
}

1.4 编写Consumer

@Component
public class DirectConsumer extends MessageListenerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(DirectConsumer.class);

    @Autowired
    private MessageConverter messageConverter;

    @Override
    @RabbitListener(queues = {"test-queue"}, ackMode = "MANUAL")
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            Map<String, String> msg = (Map<String, String>) messageConverter.fromMessage(message);
            // 获取 correlation id
            String id = (String) message.getMessageProperties().getHeaders().get(PublisherCallbackChannel.RETURNED_MESSAGE_CORRELATION_KEY);
            // String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            logger.info("directConsumer>>>>>>message={}", msg);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            logger.error("directConsumer>>>>>>exception", e);
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}

1.5 发送消息

@RestController
@RequestMapping("/mq")
public class MqController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{data}")
    public void test(@PathVariable(value = "data", required = false) String data) {
        Map<String, String> msg = new HashMap<>();
        msg.put("time", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        if (StringUtils.isEmpty(data)) {
            data = String.valueOf(System.currentTimeMillis());
        }
        msg.put("data", "this is data:" + data);
        rabbitTemplate.convertAndSend("direct-exchange-test", "direct-key", msg, new CorrelationData(UUID.randomUUID().toString()));
    }
}

二、Rabbit 高级用法

2.1 消息发送前置处理器

RabbitTemplate.setBeforePublishPostProcessors(new MessagePostProcessor() {....})

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    // 添加消息前置处理器。
    rabbitTemplate.setBeforePublishPostProcessors(new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            logger.info("-----------------rabbitmq before postProcess message={}", JSON.toJSONString(message));
            return message;
        }
    });
    rabbitTemplate.setConnectionFactory(connectionFactory());
    // 设置消息转换器
    rabbitTemplate.setMessageConverter(messageConverter());
    return rabbitTemplate;
}

通过前置处理器,可以修改消息、保存消息,设置通用header等。

2.2 消息发送确认机制

设置消息发送ConfirmCallback,消息发送成功 / 失败都会调用当前方法。


@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate();

    rabbitTemplate.setCorrelationDataPostProcessor(new CorrelationDataPostProcessor() {
        @Override
        public CorrelationData postProcess(Message message, CorrelationData correlationData) {
            logger.info("-----------------rabbitmq correlationData postProcess message={}, correlationData={}",
                    JSON.toJSONString(message), JSON.toJSONString(correlationData));
            return correlationData;
        }
    });
    rabbitTemplate.setConnectionFactory(connectionFactory());
    // 设置消息转换器
    rabbitTemplate.setMessageConverter(messageConverter());

    // 消息确认,需要配置 spring.rabbitmq.publisher-confirm-type = correlated
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            logger.info("setConfirmCallback>>>>>>correlationData={} ack={}, cause={}", JSON.toJSONString(correlationData), ack, cause);
        }
    });
//开启mandatory模式(开启失败回调)
    rabbitTemplate.setMandatory(true);
    //添加失败回调方法
    rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
        logger.info("setReturnCallback>>>>>消息发送队列不可达, message:{}, exchange:{}, routingKey:{}, 原因:{}", message, exchange, routingKey, replyText);
    });
    return rabbitTemplate;
}

通过 new RabbitTemplate.ConfirmCallback() 中的 confirm(CorrelationData correlationData, boolean ack, String cause) 判断消息是否发送成功。

  • ack=true:发送成功
  • ack=false:发送失败

2.3 消息接收后处理器

消息接收前执行。
方式1:

/**
* 添加SimpleRabbitListenerContainerFactory 
* 通过 @RabbitListener(queues = {"test-queue"}, containerFactory = "containerFactory", ackMode = "MANUAL") 设置
*/
@Bean("containerFactory")
public SimpleRabbitListenerContainerFactory containerFactory(
        MessageConverter messageConverter) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    // 并发消费者数,默认为 1
    factory.setConcurrentConsumers(5);
    // 最大并发消费者数,默认为 1
    factory.setMaxConcurrentConsumers(10);
    // 拒绝未确认的消息并重新将它们放回队列,默认为 true
    factory.setDefaultRequeueRejected(false);
    // 容器启动时是否自动启动,默认为 true
    factory.setAutoStartup(true);
    // 消息确认模式,默认为 AUTO
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    // 每个消费者在一次请求中预获取的消息数,默认为 1
    factory.setPrefetchCount(5);
    // 从队列中接收消息的超时时间,默认为 0,表示没有超时限制
    factory.setReceiveTimeout(0L);
    // 与容器一起使用的事务管理器。默认情况下,容器不会使用事务
    // factory.setTransactionManager(platformTransactionManager());
    // 消息转换器,用于将接收到的消息转换为 Java 对象或将 Java 对象转换为消息
    factory.setMessageConverter(messageConverter);
    // 用于异步消息处理的线程池。默认情况下,容器使用一个简单的 SimpleAsyncTaskExecutor
    factory.setTaskExecutor(new SimpleAsyncTaskExecutor());
    // 重试失败的消息之前等待的时间,默认为 5000 毫秒
    factory.setRecoveryInterval(5000L);
    // 如果消息处理器尝试监听不存在的队列,是否抛出异常。默认为 true
    factory.setMissingQueuesFatal(false);
    // 监听器容器连接工厂
    factory.setConnectionFactory(connectionFactory());
    // 设置后置处理器
    factory.setAfterReceivePostProcessors(new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            logger.info("-----------------rabbitmq after postProcess message={}", JSON.toJSONString(message));
            return message;
        }
    });
    return factory;
}

方式2:

@Bean
public SimpleMessageListenerContainer listenerContainer(
        @Qualifier("directConsumer") DirectConsumer directConsumer) {
    String queueName = "test-queue";

    SimpleMessageListenerContainer simpleMessageListenerContainer =
            new SimpleMessageListenerContainer(connectionFactory());
    simpleMessageListenerContainer.setQueueNames(queueName);
    simpleMessageListenerContainer.setMessageListener(directConsumer);
    return simpleMessageListenerContainer;
}

@Autowired(required = false)
private List<AbstractMessageListenerContainer> simpleMessageListenerContainers;

@PostConstruct
public void init() {
    if (CollectionUtils.isEmpty(simpleMessageListenerContainers)) {
        return;
    }
    for (AbstractMessageListenerContainer simpleMessageListenerContainer : simpleMessageListenerContainers) {
        simpleMessageListenerContainer.setAfterReceivePostProcessors(new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                logger.info("-----------------rabbitmq after postProcess message={}", JSON.toJSONString(message));
                return message;
            }
        });
        // 设置手动 ACK
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    }
}

2.4 事务消息

不建议使用 rabbitmq 事务消息,对性能非常影响。建议通过消息发送确认机制实现事务。


http://www.niftyadmin.cn/n/363492.html

相关文章

三位一体,新华三绿洲3.0数据平台聚焦五大提升

如何有效发挥出数据要素的价值&#xff1f;--这已成为行业用户在数字化转型和智能化升级中的一道必答题。 从2020年《关于构建更加完善的要素市场化配置体制机制的意见》首次明确“数据”成为五大生产要素之一&#xff0c;到去年底《中共中央、国务院关于构建数据基础制度更好…

VBA学习-循环语句

目录 一、基础知识学习 二、单元格格式 三、循环语句与判断语句 一、基础知识学习 1、对单元格赋值 Sub 赋值()对单元格进行赋值Range("A1").Value 100End Sub 2、多区域赋值 Sub 多个区域赋值() 多区域赋值Range("B1:C2").Value 200 End Sub 3、不…

wordpress常用标签、方法解释

<!-- t 9 r --><?php$the_query new Wp_Query(array(//列表页排序规则meta_key > wp_top_value,meta_value > 1,orderby > rand, order > DESC, //倒序posts_per_page > 9));?><?php if ( $the_query->have_posts() ) : ?><?ph…

复习vue

目录 1、computed和watch的理解 2、computed 与 watch的区别? 3.computed计算属性 4.组件中 data&#xff0c;computed 和 watch 的区别 5.vue的响应式原理中 object.defineproperty有什么缺陷? 6.面试题&#xff1a;react、vue中的key有什么作用&#xff1f;&#xff0…

【day 06】vue的组件

组件 组件就是把一个网页分割成独立的小的模块&#xff0c;然后通过把模块进行组合&#xff0c;构建成一个大型的应用 单文件组件 只有一个组件 html css js 都在这个文件内 非单文件组件 可有多个组件 全局注册 !! 得先注册子组件 再生成 vm实例对象 创建子组件 const …

WPF入门实例 WPF完整例子 WPF DEMO WPF学习完整例子 WPF实战例子 WPF sql实例应用

WPF 和 WinForms 都是用于创建 Windows 桌面应用程序的开发框架&#xff0c;它们有一些相似之处&#xff0c;但也有很多不同之处。 在开发速度方面&#xff0c;这取决于具体情况。如果您熟悉 WinForms 开发并且正在开发简单的界面应用程序&#xff0c;则可能会比使用 WPF 更快…

从代码审计的角度分析 Ruoyi v4.7.6 的任意文件下载漏洞

前言 Ruoyi 的 v4.7.6 是 2022 年 12 月 16 日发布的一个版本&#xff0c;而任意文件下载漏洞实际上 12 月底的时候就已经爆出了&#xff0c;也陆续有一些文章在写这个漏洞&#xff0c;但是 Ruoyi 一直没有更新修复。 上月中旬&#xff08;2023 年 5 月&#xff09;&#xff0c…

ChatGPT国内免费使用的方法有哪些?分享几个网内可用的免费的ChatGPT网页版

目录 一、ChatGpt是什么&#xff1f; 二、ChatGPT国内免费使用的方法&#xff1a; 第一点&#xff1a;电脑端 第二点&#xff1a;手机端 三、结语&#xff1a; 一、ChatGpt是什么&#xff1f; ChatGPt是美国OpenAI [1] 研发的聊天机器人程序 。更是人工智能技术驱动的自然语言…