C#使用RabbitMQ发送和接收消息工具类

news/2024/7/24 11:02:14 标签: c#, rabbitmq

下面是一个简单的 C# RabbitMQ 发送和接收消息的封装工具类的示例代码:

工具类

通过NuGet安装RabbitMQ.Client

using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace WorkerService1
{
    public class RabbitMQHelper : IDisposable
    {
        private readonly ConnectionFactory _factory;
        private IConnection _connection;
        private IModel _channel;
        public RabbitMQHelper()
        {
            // 设置连接参数
            _factory = new ConnectionFactory() { HostName = "localhost", Port = 5672, UserName = "guest", Password = "guest" };
        }

        /// <summary>
        /// 发送消息
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="queueName"></param>
        /// <param name="message"></param>
        public void SendMessage<T>(string queueName, T message)
        {
            try
            {
                InitConnection();

                // 声明队列
                _channel.QueueDeclare(queue: queueName,
                    durable: true,// 设置为true表示队列是持久化的
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);

                var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));

                _channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);

            }
            catch (Exception ex)
            {
                Console.WriteLine("Failed to send message: {0}", ex.Message);
            }
        }

        /// <summary>
        /// 接收消息
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="queueName"></param>
        /// <param name="messageHandler"></param>
        public void ReceiveMessage<T>(string queueName, Action<T> messageHandler)
        {
            try
            {
                InitConnection();

                // 声明队列
                _channel.QueueDeclare(queue: queueName,
                    durable: true, // 设置为true表示队列是持久化的
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);

                //设置消费者数量(并发度),每个消费者每次只能处理一条消息
                _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                // 创建消费者
                var consumer = new EventingBasicConsumer(_channel);
                consumer.Received += (model, ea) =>
                {
                    try
                    {
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());

                        var convertedMessage = JsonConvert.DeserializeObject<T>(message);

                        //委托方法
                        messageHandler.Invoke(convertedMessage);

                        // 消息处理成功,确认消息
                        _channel.BasicAck(ea.DeliveryTag, false);
                    }
                    catch (Exception ex)
                    {
                        // 消息处理异常,确认消息
                        _channel.BasicAck(ea.DeliveryTag, false);
                    }
                };

                _channel.BasicConsume(queue: queueName,
                    autoAck: false,// 设置为true表示自动确认消息
                    consumer: consumer);

            }
            catch (Exception ex)
            {
                Console.WriteLine("Failed to receive message: {0}", ex.Message);
            }
        }

        /// <summary>
        /// 初始化链接
        /// </summary>
        private void InitConnection()
        {
            if (_connection == null || !_connection.IsOpen)
            {
                _connection = _factory.CreateConnection();
                _channel = _connection.CreateModel();
            }
        }

        /// <summary>
        /// 释放资源
        /// </summary>
        public void Dispose()
        {
            _channel?.Close();
            _channel?.Dispose();
            _connection?.Close();
            _connection?.Dispose();
        }
    }
}

使用示例

using System;
using System.Text;
using System.Threading.Tasks;
using WorkerService1;

public class Program
{
    private static string QueueName = "myqueue_key";
    public static void Main()
    {

        var rabbitMQHelper = new RabbitMQHelper();
        for (long i = 0; i < 30; i++)
        {
            rabbitMQHelper.SendMessage(QueueName, i);
        }

        rabbitMQHelper.ReceiveMessage<long>(QueueName, ReceivedHandle);

        Console.ReadLine();
    }

    /// <summary>
    /// 接收处理
    /// </summary>
    /// <param name="index"></param>
    private static void ReceivedHandle(long index)
    {
        try
        {
            Console.WriteLine($"第{index}次开始{DateTime.Now}");
            Thread.Sleep(2000);
            Console.WriteLine($"第{index}次结束{DateTime.Now}");
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}

http://www.niftyadmin.cn/n/1004228.html

相关文章

Golang syncMap 详解

Golang sync.Map 详解 在 Golang 中&#xff0c;sync.Map 是一个并发安全的 Map 实现&#xff0c;可以在多个 Goroutine 中安全地读写 Map。本文将对 sync.Map 进行详细的介绍&#xff0c;包括创建、初始化、添加元素、删除元素、遍历等操作&#xff0c;并提供相应的示例代码。…

劫持react组件

劫持props 假设我们有一个原组件&#xff0c;它接收一个 name prop&#xff0c;并显示一个问候语&#xff1a; // 原组件 function Greeting(props) {return <h1> Hello, {props.name}! </h1>; }我们可以定义一个高阶组件&#xff0c;它可以通过 props 传递一个 …

【云原生丶Kubernetes】从应用部署的发展看Kubernetes的前世今生

在了解Kubernetes之前&#xff0c;我们十分有必要先了解一下应用程序部署的发展历程&#xff0c;下面让我们一起来看看&#xff01; 应用部署的发展历程 我们先来看看应用程序部署的3个阶段&#xff1a;从物理机部署到虚拟机部署&#xff0c;再到容器化部署&#xff0c;他们之…

Linux下查看共享文件(so库)的编译是32位还是64位

方法一 采用file查看&#xff1a; file 命令可以用于查看文件的类型。它会根据文件的内容进行分析&#xff0c;给出详细的文件类型信息。 如&#xff1a; file libQtCore.so 显示如下 libQtCore.so: ELF 64-bit LSB shared object, x86-64, version 1 (GNU/Linux), dynamica…

Java基础---接口和抽象类的区别

目录 访问修饰符不同 职责不同 接口Interface 抽象类abstract class 访问修饰符不同 接口和抽象类&#xff0c;最明显的区别就是接口只是定义了一些方法而已&#xff0c;在不考虑Java8中default方法情况下&#xff0c;接口中是没有实现的代码的抽象类中的抽象方法可以有pub…

八数码、解华容道(bfs,全局择先,A*搜索)

【问题描述】 题目6&#xff1a;数阵问题 每个局面是三行三列的数字方阵&#xff0c;每个位置为0-8的一个数码且互不相同&#xff0c;求从初始局面&#xff08;自己设定&#xff09;如何“最快”移动到终止局面&#xff08;自己设定&#xff09;。 移动规则&#xff1a;每次只…

【推荐】win 安装 rust 1.70

目录 一、下载二、安装先决条件MinGW三、安装Rust四、配置国内镜像五、检查是否安装成功五、参考文章 一、下载 官网地址&#xff1a;https://www.rust-lang.org/zh-CN/ 二、安装先决条件MinGW win 安装 C运行环境 - MinGW 三、安装Rust 3.1首先设置安装路径和环境变量 配…

利用jmeter测试java请求

jmeter和loadrunner一样包含了测试脚本开发、测试执行、以及测试结果统计三个部分。只是jmeter没有脚本开发工具&#xff0c;因此测试java请求的脚本选择在eclipse中进行。 首先介绍如何用eclipse编写接口性能测试脚本。 针对"Java请求"类型的测试&#xff0c;需要…