【消息队列】RabbitMQ “消息队列模式” 以及NET8集成

在 .NET 8 中集成 RabbitMQ 消息队列,可以使用官方推荐的 RabbitMQ.Client 库或封装好的 MassTransit/EasyNetQ 等高级库。以下是 RabbitMQ 的基本集成代码 和 常见消息模式 的实现。

RabbitMQ 本身并没有直接支持延时消息的功能,但是可以通过一些机制来实现延时消息的效果。以下是两种常用的方法:

  • TTL(Time To Live)+ 死信交换机(Dead Letter Exchange, DLX)

可以为队列或消息设置 TTL,当消息的 TTL 到期后,如果没有被消费,就会变成死信。
设置了死信交换机(DLX)的队列中的死信会被转发到指定的 DLX 上,然后可以由绑定到这个 DLX 的队列进行处理,这样就实现了延时消息的功能。

  • 使用插件 rabbitmq-delayed-message-exchange

RabbitMQ 提供了一个官方插件 rabbitmq-delayed-message-exchange,它允许你创建一个特殊的交换机类型,该交换机能够接受带有延迟时间的消息,并在指定的时间后将消息投递给相应的队列。
这个插件需要安装并启用,并且要求 Erlang/OPT 版本在 18.0 及以上。

一、. RabbitMQ 基础集成(.NET 8)

安装 NuGet 包

dotnet add package RabbitMQ.Client

配置 RabbitMQ 连接

csharp
using RabbitMQ.Client;public class RabbitMQService
{private readonly IConnection _connection;private readonly IModel _channel;public RabbitMQService(string hostname = "localhost", string username = "guest", string password = "guest"){var factory = new ConnectionFactory{HostName = hostname,UserName = username,Password = password,DispatchConsumersAsync = true // 启用异步消费};_connection = factory.CreateConnection();_channel = _connection.CreateModel();}public void Dispose(){_channel?.Close();_connection?.Close();}
}
二. RabbitMQ 常见消息模式
(1)、 简单队列(Simple Queue)

场景:生产者发送消息到队列,消费者从队列接收消息(一对一)。

  • 生产者(Producer)
csharp
public void SendMessage(string queueName, string message)
{_channel.QueueDeclare(queue: queueName,durable: true, // 持久化队列exclusive: false,autoDelete: false);var body = Encoding.UTF8.GetBytes(message);_channel.BasicPublish(exchange: "",routingKey: queueName,basicProperties: null,body: body);
}
  • 消费者(Consumer)
csharp
public void ReceiveMessages(string queueName)
{_channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);var consumer = new AsyncEventingBasicConsumer(_channel);consumer.Received += async (model, ea) =>{var body = Encoding.UTF8.GetString(ea.Body.Span);Console.WriteLine($"Received: {body}");await Task.Yield(); // 模拟异步处理_channel.BasicAck(ea.DeliveryTag, false); // 手动ACK};_channel.BasicConsume(queue: queueName,autoAck: false, // 关闭自动ACKconsumer: consumer);
}
(2)、 工作队列(Work Queue)

场景:多个消费者竞争消费同一个队列的消息(任务分发)。

  • 生产者
    同 简单队列 的 SendMessage 方法。

  • 消费者

csharp
// 启动多个消费者实例,RabbitMQ 会轮询分发消息
for (int i = 0; i < 3; i++) // 3个消费者
{Task.Run(() =>{using var service = new RabbitMQService();service.ReceiveMessages("task_queue");Console.WriteLine($"Consumer {i} started...");Thread.Sleep(Timeout.Infinite);});
}
(3)、 发布/订阅(Pub/Sub)【Fannout】

场景:一个生产者发送消息到交换机(Exchange),多个队列绑定到交换机,每个队列有自己的消费者(广播模式)。

  • 生产者(发送到交换机)
csharp
public void PublishToExchange(string exchangeName, string message)
{_channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout); // Fanout 广播var body = Encoding.UTF8.GetBytes(message);_channel.BasicPublish(exchangeName, "", null, body);
}
  • 消费者(绑定队列到交换机)
csharp
public void SubscribeToExchange(string exchangeName, string queueName)
{_channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);_channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);_channel.QueueBind(queueName, exchangeName, "");var consumer = new AsyncEventingBasicConsumer(_channel);consumer.Received += (model, ea) =>{var body = Encoding.UTF8.GetString(ea.Body.Span);Console.WriteLine($"Received: {body}");return Task.CompletedTask;};_channel.BasicConsume(queueName, autoAck: true, consumer);
}
(4) 、路由模式(Routing)

场景:根据 RoutingKey 定向投递消息到特定队列。

  • 生产者
csharp
public void SendWithRouting(string exchangeName, string routingKey, string message)
{_channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);var body = Encoding.UTF8.GetBytes(message);_channel.BasicPublish(exchangeName, routingKey, null, body);
}
  • 消费者
csharp
public void ReceiveWithRouting(string exchangeName, string queueName, string routingKey)
{_channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);_channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);_channel.QueueBind(queueName, exchangeName, routingKey);var consumer = new AsyncEventingBasicConsumer(_channel);consumer.Received += (model, ea) =>{Console.WriteLine($"Received {ea.RoutingKey}: {Encoding.UTF8.GetString(ea.Body.Span)}");return Task.CompletedTask;};_channel.BasicConsume(queueName, autoAck: true, consumer);
}
(5) 、主题模式(Topic)

场景:使用通配符(*、#)匹配 RoutingKey,实现灵活路由。
*(星号)匹配单个单词
#(井号)匹配多个单词

  • 生产者
csharp
public void SendWithTopic(string exchangeName, string topic, string message)
{_channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);var body = Encoding.UTF8.GetBytes(message);_channel.BasicPublish(exchangeName, topic, null, body);
}
  • 消费者
csharp
public void ReceiveWithTopic(string exchangeName, string queueName, string topicPattern)
{_channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);_channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);_channel.QueueBind(queueName, exchangeName, topicPattern);var consumer = new AsyncEventingBasicConsumer(_channel);consumer.Received += (model, ea) =>{Console.WriteLine($"Received {ea.RoutingKey}: {Encoding.UTF8.GetString(ea.Body.Span)}");return Task.CompletedTask;};_channel.BasicConsume(queueName, autoAck: true, consumer);
}
(6)、 RPC(远程过程调用)

场景:客户端发送请求并等待服务端响应(同步通信)。

  • 客户端(RPC Client)
public string Call(string message, string queueName = "rpc_queue")
{var correlationId = Guid.NewGuid().ToString();var replyQueueName = _channel.QueueDeclare().QueueName;var properties = _channel.CreateBasicProperties();properties.ReplyTo = replyQueueName;properties.CorrelationId = correlationId;var body = Encoding.UTF8.GetBytes(message);_channel.BasicPublish("", queueName, properties, body);var tcs = new TaskCompletionSource<string>();var consumer = new EventingBasicConsumer(_channel);consumer.Received += (model, ea) =>{if (ea.BasicProperties.CorrelationId == correlationId){var response = Encoding.UTF8.GetString(ea.Body.Span);tcs.SetResult(response);}};_channel.BasicConsume(replyQueueName, autoAck: true, consumer);return tcs.Task.Result; // 同步等待响应(生产环境建议用异步)
}
  • 服务端(RPC Server)
csharp
public void StartRpcServer(string queueName = "rpc_queue")
{_channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false);var consumer = new AsyncEventingBasicConsumer(_channel);consumer.Received += async (model, ea) =>{var message = Encoding.UTF8.GetString(ea.Body.Span);Console.WriteLine($"Received RPC call: {message}");var response = $"Response to: {message}";var responseBytes = Encoding.UTF8.GetBytes(response);var properties = ea.BasicProperties;var replyProps = _channel.CreateBasicProperties();replyProps.CorrelationId = properties.CorrelationId;_channel.BasicPublish("",properties.ReplyTo,replyProps,responseBytes);_channel.BasicAck(ea.DeliveryTag, false);await Task.CompletedTask;};_channel.BasicConsume(queueName, autoAck: false, consumer);
}

三. 完整示例(.NET 8 Worker Service)

  • 生产者项目
// Program.cs
var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();var rabbitMQ = new RabbitMQService();
rabbitMQ.SendMessage("hello_queue", "Hello, RabbitMQ!");app.Run();
  • 消费者项目(Worker Service)
csharp
// Program.cs
IHost host = Host.CreateDefaultBuilder(args).ConfigureServices(services =>{services.AddHostedService<Worker>();}).Build();await host.RunAsync();// Worker.cs
public class Worker : BackgroundService
{private readonly RabbitMQService _rabbitMQ;public Worker(){_rabbitMQ = new RabbitMQService();}protected override async Task ExecuteAsync(CancellationToken stoppingToken){_rabbitMQ.ReceiveMessages("hello_queue");while (!stoppingToken.IsCancellationRequested){await Task.Delay(1000, stoppingToken);}}
}
四. 总结

| 模式 | 适用场景 | 关键点 |
| 简单队列 | 一对一消息传递 | QueueDeclare + BasicPublish |
| 工作队列 | 任务分发(竞争消费) | 多个消费者监听同一队列 |
| 发布/订阅 | 广播消息 | ExchangeType.Fanout |
| 路由模式 | 定向路由 | ExchangeType.Direct + RoutingKey |
| 主题模式 | 灵活匹配路由 | ExchangeType.Topic + */# |
| RPC | 同步请求-响应 | ReplyTo + CorrelationId |

推荐实践
连接管理:使用 IHostedService 或单例模式管理 IConnection 和 IModel。
异常处理:监听 Connection.ConnectionShutdown 事件并重连。
性能优化:启用 DispatchConsumersAsync = true 支持异步消费。
高级封装:考虑使用 MassTransit 或 EasyNetQ 简化开发。
通过以上模式,可以灵活应对 异步任务处理、事件驱动架构、微服务通信 等场景。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/918450.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/918450.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker 镜像常见标签(如 `标准`、`slim`、`alpine` 和 `noble`)详细对比

以下是 Docker 镜像常见标签&#xff08;如 标准、slim、alpine 和 noble&#xff09;的详细对比&#xff0c;涵盖基础系统、体积、适用场景及注意事项&#xff1a;1. 标准镜像&#xff08;无后缀&#xff09; 基础系统&#xff1a;完整 Linux 发行版&#xff08;如 Debian、Ub…

(1-9-2)Java 工厂模式

目录 1.设计模式与分类 2. 工厂模式 2.1 工厂模式概述 2.2 简单工厂 2.3 学生推荐就业Demo 00.项目目录 01. 创建抽象接口Job 02. 创建实体类 03. 创建推荐工作工厂类 04. 创建学生推荐就业客户类 3. i18n国际化语言标题自适应 00. 实现效果 01. 创建抽象接口 02…

QT远程开发技巧

交叉编译时野火提供的文件 交叉编译器 GNU官网可以下载, QT库文件这里给的QT-everyWhere版本,是开源版本需要自行编译。(远程服务器通常是PC架构【AMD64】,直接apt-get install下载的qt也是Pc架构的,不能直接在板卡【ARM64】上运行,必须对源码交叉编译) 注意QT遵从GPL开源…

Linux操作系统从入门到实战(十九)进程状态

Linux操作系统从入门到实战&#xff08;十九&#xff09;进程状态前言一、什么是进程状态二、状态本质三、最核心的3种状态1. 就绪状态2. 运行状态3. 阻塞状态四、状态变化的核心1/两种资源如何影响状态&#xff1f;五、操作系统怎么管理这些状态&#xff1f;六、Linux里结构体…

容器技术之docker

容器技术之Docker一、什么是Docker二、为什么会出现Docker2.1 环境一致性问题2.2 虚拟化技术的局限性2.3 微服务架构的兴起三、重要概念3.1 什么是镜像3.2 什么是容器3.3 总结&#xff1a;3.4 Docker仓库四、安装Docker&#xff08;Ubuntu系统下&#xff09;1.卸载老的版本2.更…

数据结构与算法:树状数组

前言 太难了…… 一、树状数组使用场景 树状数组一般用来维护可差分的信息&#xff0c;比如累加和&#xff0c;累乘积等。举个例子&#xff0c;当整个数组的累加和为sum1&#xff0c;一个区间内的累加和为sum2&#xff0c;那么除了这个区间剩下部分的累加和就是sum1-sum2&am…

“一车一码一池一充”:GB 17761-2024新国标下电动自行车的安全革命

2025年9月1日&#xff0c;电动自行车行业将迎来一场深刻变革。随着强制性国家标准GB 17761-2024《电动自行车安全技术规范》的全面实施&#xff0c;我国超3.5亿电动自行车用户的安全出行将获得全新的技术保障。在这场安全升级中&#xff0c;“一车一码一池一充”的全链条管控机…

QT聊天项目DAY18

1.文件传输1.1 客户端采用分块传输(20MB/块)&#xff0c;以及MD5码校验并将读出的二进制数据采用Base64编码进行传输1.1.0 通信协议1.1.1 UI采用垂直布局&#xff0c;该布局大小为570 * 160&#xff0c;间隔全是0&#xff0c;UI方面不详细介绍了1.1.2 MainWindow头文件#ifndef …

centos系统sglang单节点本地部署大模型

前置工作 本地部署大模型的基本概念和前置工作-CSDN博客 模型部署 这里通过docker容器进行部署。我这里是h20*8,部署deepseek-v3-0324,这个配置和模型都比较大,大家根据自己的硬件对应调整 步骤一 我们要通过sglang部署模型,先拉取sglang的docker镜像,这里下载失败的…

【dij算法/最短路/分层图】P4568 [JLOI2011] 飞行路线

题目描述 Alice 和 Bob 现在要乘飞机旅行&#xff0c;他们选择了一家相对便宜的航空公司。该航空公司一共在 nnn 个城市设有业务&#xff0c;设这些城市分别标记为 000 到 n−1n-1n−1&#xff0c;一共有 mmm 种航线&#xff0c;每种航线连接两个城市&#xff0c;并且航线有一定…

告别传统,CVPR三论文用GNN动态图重塑视觉AI

本文选自gongzhonghao【图灵学术SCI论文辅导】关注我们&#xff0c;掌握更多顶会顶刊发文资讯今天&#xff0c;为大家推荐一个极具前沿价值与实用潜力的研究方向&#xff1a;图神经网络&#xff08;GNN&#xff09;。作为深度学习领域的新兴力量&#xff0c;图神经网络在近年顶…

HTTP/HTTPS代理,支持RSA和SM2算法

在日常工作和学习中&#xff0c;我们经常遇到HTTP和HTTPS的相关问题&#xff0c;要解决这些问题&#xff0c;有时就需要搭建各种实验环境&#xff0c;重现业务场景&#xff0c;比如&#xff1a; 将HTTP转为HTTPS。本地只能发送HTTP请求&#xff0c;但是远程服务器却只能接收HT…

如何提高AI写作论文的查重率?推荐七个AI写作论文工具

随着AI技术在学术领域的广泛应用&#xff0c;越来越多的学生和研究人员开始使用AI写作工具来提高写作效率&#xff0c;帮助完成毕业论文、科研论文等。然而&#xff0c;AI生成的内容是否会提高论文的查重率&#xff1f;是否能有效避免重复和提高通过率&#xff1f;这些问题成为…

跨平台、低延迟、可嵌入:实时音视频技术在 AI 控制系统中的进化之路

引言&#xff1a;面向未来的实时音视频基座 在万物互联与智能化加速落地的时代&#xff0c;实时音视频技术早已不再只是社交娱乐的附属功能&#xff0c;而是智慧城市、应急指挥、远程操控、工业智造、教育培训、安防监控等系统的“神经中枢”。一条高性能、可控、低延迟的视频…

Spring WebFlux开发指导

Spring WebFlux是一个响应式的web服务器端应用开发框架&#xff0c;响应式是指&#xff0c;当前端组件的状态发生变化&#xff0c;则生成事件通知&#xff0c;根据需求可异步或者同步地向服务器端接口发送请求&#xff0c;当服务器端网络IO组件的状态发生变化&#xff0c;则生成…

09-docker镜像手动制作

文章目录一.手动制作单服务的nginx镜像1.启动一个基础容器&#xff0c;此处我使用的是centos7镜像。2.修改容器中的软件源3.安装nginx服务并启动nginx服务4.修复nginx的首页文件5.退出容器6.将退出的容器提交为镜像7.测试镜像的可用性二.手动制作多服务的nginx sshd镜像1.启用…

Android.mk教程

语法 Android.mk 的必备三行 LOCAL_PATH : $(call my-dir) # Android.mk的目录&#xff0c;call调用函数include $(CLEAR_VARS) # 除了LOCAL_PATH清除所有LOCAL_XXXinclude $(BUILD_SHARED_LIBRARY) # BUILD_XXX, 指定构建类型 # BUILD_SHARED_LIBRARY → .so动态库 # BUILD…

稠密检索:基于神经嵌入的高效语义搜索范式

本文由「大千AI助手」原创发布&#xff0c;专注用真话讲AI&#xff0c;回归技术本质。拒绝神话或妖魔化。搜索「大千AI助手」关注我&#xff0c;一起撕掉过度包装&#xff0c;学习真实的AI技术&#xff01; 1. 背景与定义 稠密检索&#xff08;Dense Retrieval&#xff09;是一…

AI日报0807 | GPT-5或今晚1点来袭:四大版本全曝光

关注&#xff1a;未来世界2099每日分享&#xff1a;全球最新AI资讯【应用商业技术其他】服务&#xff1a;【学习Q】【资源Q】【学习资料】【行业报告】&#xff08;无限免费下载&#xff09;应用 1、讯飞星火代码画布震撼上线&#xff1a;动嘴就能开发&#xff0c;工作效率翻倍…

认识爬虫 —— 正则表达式提取

本质是对字符串的处理&#xff0c;正则表达式描述的是一种字符串匹配的模式。简而言之&#xff0c;用具备一定特征意义的表达式对字符串进行检查&#xff0c;将符合条件的子字符串提取出来。导入模块import re一、单字符匹配match(表达式&#xff0c;匹配对象)&#xff1a;匹配…