本文对应 RabbitMQ 官方教程的第四个例子——Routing(路由选择),讲述怎么选择性的接收消息。
路由选择(Routing)
在上一篇的教程中,我们实现了一个简单的日志系统。可以把日志消息广播给多个接收者。
在本教程中,我们将为其添加一个功能 - 我们将只能订阅一部分消息。
例如,我们只能将关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
绑定(Bindings)
在之前的例子中,我们已经在创建绑定。 您可能会记得像:
1 | channel.QueueBind(queue: queueName, |
绑定(binding)是指交换器和队列(queue)的关系。
可以简单理解为:这个队列(queue)对这个交换器的消息感兴趣。
绑定的时候可以带上一个额外的 routingkey
参数。为了避免与 basicpublish
的参数混淆,我们把它叫做 binding key
。
以下是如何创建一个带 binding key
的绑定。
1 | // !机制:将 队列(queue) 绑定到指定 交换器(exchange) !!! |
binding key
的含义取决于交换器的类型。我们之前使用过的 fanout
类型会忽略这个值。
指定类型交换器
指定类型交换器——(Direct exchange)
我们上一个教程中的日志记录系统向所有消费者(consumers)广播所有消息。我们希望扩展它以允许根据消息的严重性过滤消息。
例如,我们可能希望将日志消息写入磁盘的脚本仅接收严重错误,而不是在警告或信息日志消息上浪费磁盘空间。
我们使用的是 fanout
类型交换器,它没有给我们太大的灵活性 - 它只能进行无意识的广播。
我们将会使用 direct
类型的交换器来代替。
路由的算法很简单——交换器将会对 binding key
和 routing key
进行精确匹配,从而确定消息该分发到哪个队列。
下图能够很好的描述这个场景:
在这个场景中,我们可以看到 direct
类型交换器 X
和两个队列绑定了。
第一个队列使用 orange
作为 binding key
,
第二个队列有两个绑定,一个使用 black
作为 binding key
,另外一个是 green
。
这样以来,当 routing key
为 orange
的消息发布到交换器,
就会被路由(route)到队列 Q1
。routing key
为 black
或者 green
的消息就会路由到Q2
。
其他的所有消息都将会被丢弃。
多个绑定
多个绑定——(Multiple bindings)
使用相同的绑定密钥绑定多个队列是完全合法的。 在我们的示例中,我们可以在X和Q1之间添加绑定键黑色的绑定。 在这种情况下,直接交换将表现得像扇出一样,并将消息广播到所有匹配的队列。 路由键为黑色的消息将传送到Q1和Q2。
多个队列使用相同的 binding key
是允许的。
我们的这个例子,我们可以添加一个X和Q1之间的绑定,使用 black
的 binding key
。
这样一来,direct
类型交换器就和 fanout
类型交换器的行为一样,将会广播消息到所有匹配的队列。
带有 routing key
为 black
的消息都会发送到 Q1
和 Q2
。
发送日志
我们将此模型用于我们的日志系统。 我们会将消息发送给 direct
类型交换器,而不是 fanout
类型交换器。
我们将提供日志级别作为 routing key
。 这样子负责接收处理的脚本就可以选择它想要处理的日志级别。
让我们首先关注发送日志的实现。
一如既往,我们需要先创建一个交换器:
1 | channel.ExchangeDeclare(exchange: "direct_logs", // 交换器名字 |
我们就可以准备发送消息了:
1 | var body = Encoding.UTF8.GetBytes(message); |
为简化起见,我们假设 severity
可以是 info
, warning
, error
之一。
订阅(Subscribing)
处理接收消息的方式和之前差不多,但是我们为每一个日志级别创建了一个新的绑定。
1 | var queueName = channel.QueueDeclare().QueueName; |
完整演示
为了更好地演示,下面的代码和官方教程的不同。
官方例子传送门
发送日志
的代码:
1 | using System; |
订阅日志
的代码:
1 | using System; |
启动两个 订阅日志
控制台实例:
1 | \MqReceiveLogsDirect\bin\Debug\netcoreapp2.1> dotnet MqReceiveLogsDirect.dll error |
1 | \MqReceiveLogsDirect\bin\Debug\netcoreapp2.1> dotnet MqReceiveLogsDirect.dll |
启动一个 发布日志
控制台实例:
1 | \MqEmitLogDirect\bin\Debug\netcoreapp2.1> dotnet MqEmitLogDirect.dll |
在 发布日志
控制台实例中输入内容:
1 | error 出现错误啦~ |
订阅日志
控制台实例得到消息:
1 | [Consumer(消费者),我关心日志级别是 error]:收到日志消息->error 出现错误啦~ |
1 | [Consumer(消费者),我关心日志级别是 info,warning,error]:收到日志消息->error 出现错误啦~ |
结束语
以下是官方的免责声明:
生产[非]适用性免责声明
请记住,这个和其他教程都是教程。
他们一次展示一个新概念,可能会故意过度简化某些事情而忽略其他事物。
例如,为了简洁起见,在很大程度上省略了诸如连接管理,错误处理,连接恢复,并发和度量收集之类的主题。
这种简化的代码不应用于生产环境。
要使用好 RabbitMQ,还有很多内容需要学习,所以继续下一章的 Topics(主题)