RabbitMQ 路由选择

本文对应 RabbitMQ 官方教程的第四个例子——Routing(路由选择),讲述怎么选择性的接收消息。

路由选择(Routing)

在上一篇的教程中,我们实现了一个简单的日志系统。可以把日志消息广播给多个接收者。

在本教程中,我们将为其添加一个功能 - 我们将只能订阅一部分消息。
例如,我们只能将关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

绑定(Bindings)

在之前的例子中,我们已经在创建绑定。 您可能会记得像:

1
2
3
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");

绑定(binding)是指交换器和队列(queue)的关系。
可以简单理解为:这个队列(queue)对这个交换器的消息感兴趣。

绑定的时候可以带上一个额外的 routingkey 参数。为了避免与 basicpublish 的参数混淆,我们把它叫做 binding key
以下是如何创建一个带 binding key 的绑定。

1
2
3
4
5
6
// !机制:将 队列(queue) 绑定到指定 交换器(exchange) !!!
channel.QueueBind(queue: queueName, // 队列名
exchange: "direct_logs", // 交换器名
routingKey: "black"); // 路由键

// RabbitMQ 消息传递模型:消费者(接收者)使用一个队列,绑定到某个交换器,指定路由键

binding key 的含义取决于交换器的类型。我们之前使用过的 fanout 类型会忽略这个值。

指定类型交换器

指定类型交换器——(Direct exchange)

我们上一个教程中的日志记录系统向所有消费者(consumers)广播所有消息。我们希望扩展它以允许根据消息的严重性过滤消息。
例如,我们可能希望将日志消息写入磁盘的脚本仅接收严重错误,而不是在警告或信息日志消息上浪费磁盘空间。

我们使用的是 fanout 类型交换器,它没有给我们太大的灵活性 - 它只能进行无意识的广播。

我们将会使用 direct 类型的交换器来代替。
路由的算法很简单——交换器将会对 binding keyrouting key 进行精确匹配,从而确定消息该分发到哪个队列。

下图能够很好的描述这个场景:

direct 模式

在这个场景中,我们可以看到 direct 类型交换器 X 和两个队列绑定了。
第一个队列使用 orange 作为 binding key
第二个队列有两个绑定,一个使用 black 作为 binding key,另外一个是 green

这样以来,当 routing keyorange 的消息发布到交换器,
就会被路由(route)到队列 Q1routing keyblack 或者 green 的消息就会路由到Q2
其他的所有消息都将会被丢弃。

多个绑定

多个绑定——(Multiple bindings)

Multiple bindings

使用相同的绑定密钥绑定多个队列是完全合法的。 在我们的示例中,我们可以在X和Q1之间添加绑定键黑色的绑定。 在这种情况下,直接交换将表现得像扇出一样,并将消息广播到所有匹配的队列。 路由键为黑色的消息将传送到Q1和Q2。

多个队列使用相同的 binding key 是允许的。
我们的这个例子,我们可以添加一个X和Q1之间的绑定,使用 blackbinding key
这样一来,direct 类型交换器就和 fanout 类型交换器的行为一样,将会广播消息到所有匹配的队列。
带有 routing keyblack 的消息都会发送到 Q1Q2

发送日志

我们将此模型用于我们的日志系统。 我们会将消息发送给 direct 类型交换器,而不是 fanout 类型交换器。
我们将提供日志级别作为 routing key。 这样子负责接收处理的脚本就可以选择它想要处理的日志级别。
让我们首先关注发送日志的实现。

一如既往,我们需要先创建一个交换器:

1
2
channel.ExchangeDeclare(exchange: "direct_logs", // 交换器名字
type: "direct"); // 交换器类型

我们就可以准备发送消息了:

1
2
3
4
5
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs", // 交换器名字
routingKey: severity, // 路由键
basicProperties: null,
body: body);

为简化起见,我们假设 severity 可以是 info, warning, error之一。

订阅(Subscribing)

处理接收消息的方式和之前差不多,但是我们为每一个日志级别创建了一个新的绑定。

1
2
3
4
5
6
7
8
9
10
11
var queueName = channel.QueueDeclare().QueueName;

foreach(var severity in args)
{
// 队列(queue) 绑定到指定 交换器(exchange) !!!
channel.QueueBind(queue: queueName, // 队列名
exchange: "direct_logs", // 交换器名字
routingKey: severity); // 路由键

// RabbitMQ 消息传递模型:消费者(接收者)使用一个队列,绑定到某个交换器,指定路由键
}

完整演示

为了更好地演示,下面的代码和官方教程的不同。
官方例子传送门

路由例子

发送日志 的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
using System;
using System.Text;

using RabbitMQ.Client; // 请自行用 NuGet 添加依赖

namespace MqEmitLogDirect
{
class Program
{
// 这是 生产者(发布者,发送者) 的示例代码
static void Main(string[] args)
{
using (var mqHelper = new MqHelper())
{
Console.WriteLine(" 请输入日志消息,格式是[日志等级] [日志内容]");
Console.WriteLine(" 如:info 记得跟踪信息");
Console.WriteLine(" 如:error 系统出现错误和异常啦");
Console.WriteLine(" 如:warning 出现了不合理请看但不影响系统运行和使用");

while (true)
{
var log = Console.ReadLine().Split(" ");
var logLevel = log[0];
var logMsg = log[1];

if (string.IsNullOrWhiteSpace(logLevel))
logLevel = "info";

if (string.IsNullOrWhiteSpace(logMsg))
logMsg = "Hello World";

mqHelper.Publish(logLevel, logMsg);
}
}

}
}


class MqHelper : IDisposable
{
IConnection connection = null;
IModel channel = null;

private IModel CreateModel()
{
if (channel == null)
{
var factory = new ConnectionFactory();
factory.HostName = "192.168.86.128";// 这是我的 RabbitMQ 服务器地址
factory.Port = 5672;
factory.UserName = "admin";//用户名
factory.Password = "admin";//密码

connection = factory.CreateConnection();

channel = connection.CreateModel();

// 声明一个交换器
channel.ExchangeDeclare(exchange: "direct_logs",// 交换器名称
type: "direct"); // 交换器类型

// 和上一章教程中的例子不同的是,交换器类型不同了
// 熟悉 RabbitMQ 交换器类型,才能更好地使用 RabbitMQ
}

return channel;
}

public void Publish(string logLevel, string logMsg)
{
var channel = CreateModel();

var body = Encoding.UTF8.GetBytes(logMsg);

channel.BasicPublish(exchange: "direct_logs",// 交换器名称
routingKey: logLevel, // 路由键
basicProperties: null,
body: body);

Console.WriteLine($" [Producer(生产者)] 广播日志消息-> 级别:{logLevel} 内容:{logMsg}");
Console.WriteLine();
}
public void Dispose()
{
if (connection != null)
connection.Dispose();

if (channel != null)
channel.Dispose();
}
}
}

订阅日志 的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
using System;
using System.Text;

using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace MqReceiveLogsDirect
{
class Program
{
// 这是 消费者(接收者) 的示例代码
static void Main(string[] args)
{
string logLevel = string.Empty;
if (args.Length > 0)
logLevel = args[0];

var factory = new ConnectionFactory();
factory.HostName = "192.168.86.128";// 这是我的 RabbitMQ 服务器地址
factory.Port = 5672;
factory.UserName = "admin";//用户名
factory.Password = "admin";//密码

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{

// 声明一个交换器
channel.ExchangeDeclare(exchange: "direct_logs", // 交换器名字
type: "direct"); // 交换器类型


var queueName = channel.QueueDeclare().QueueName;

string careLogLevel = logLevel;

// 如果有指定只关心某个日志级别,那么...
if (!string.IsNullOrWhiteSpace(logLevel))
{
// 队列(queue) 绑定到指定 交换器(exchange)
// 并告诉该 交换器 本 队列 关心什么类型的消息
channel.QueueBind(queue: queueName, // 队列名
exchange: "direct_logs", // 交换器名称
routingKey: logLevel); // 路由键
}
else
{
// 队列(queue) 绑定到指定 交换器(exchange)
// 并告诉该 交换器 本 队列 关心什么类型的消息
channel.QueueBind(queue: queueName, // 队列名
exchange: "direct_logs", // 交换器名称
routingKey: "info"); // 路由键

// 队列(queue) 绑定到指定 交换器(exchange)
// 并告诉该 交换器 本 队列 关心什么类型的消息
channel.QueueBind(queue: queueName, // 队列名
exchange: "direct_logs", // 交换器名称
routingKey: "error"); // 路由键

// 队列(queue) 绑定到指定 交换器(exchange)
// 并告诉该 交换器 本 队列 关心什么类型的消息
channel.QueueBind(queue: queueName, // 队列名
exchange: "direct_logs", // 交换器名称
routingKey: "warning"); // 路由键

careLogLevel = "info,warning,error";
}



Console.WriteLine($" [Consumer(消费者),我关心日志级别是 {careLogLevel}]:我的心在等待,永远在等待...");
Console.WriteLine();

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var logLv = ea.RoutingKey;
var logMeg = Encoding.UTF8.GetString(ea.Body);

Console.WriteLine($" [Consumer(消费者),我关心日志级别是 {careLogLevel}]:收到日志消息->{logLv} {logMeg}");
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);

Console.ReadLine();
}
}
}
}

启动两个 订阅日志 控制台实例:

1
2
\MqReceiveLogsDirect\bin\Debug\netcoreapp2.1> dotnet MqReceiveLogsDirect.dll error
[Consumer(消费者),我关心日志级别是 error]:我的心在等待,永远在等待...
1
2
\MqReceiveLogsDirect\bin\Debug\netcoreapp2.1> dotnet MqReceiveLogsDirect.dll
[Consumer(消费者),我关心日志级别是 info,warning,error]:我的心在等待,永远在等待...

启动一个 发布日志 控制台实例:

1
2
3
4
5
\MqEmitLogDirect\bin\Debug\netcoreapp2.1> dotnet MqEmitLogDirect.dll
请输入日志消息,格式是[日志等级] [日志内容]
如:info 记得跟踪信息
如:error 系统出现错误和异常啦
如:warning 出现了不合理请看但不影响系统运行和使用

发布日志 控制台实例中输入内容:

1
2
3
4
5
6
7
8
error 出现错误啦~
[Producer(生产者)] 广播日志消息-> 级别:error 内容:出现错误啦~

warning 警告,警告~
[Producer(生产者)] 广播日志消息-> 级别:warning 内容:警告,警告~

info 跟踪信息是~
[Producer(生产者)] 广播日志消息-> 级别:info 内容:跟踪信息是~

订阅日志 控制台实例得到消息:

1
[Consumer(消费者),我关心日志级别是 error]:收到日志消息->error 出现错误啦~
1
2
3
[Consumer(消费者),我关心日志级别是 info,warning,error]:收到日志消息->error 出现错误啦~
[Consumer(消费者),我关心日志级别是 info,warning,error]:收到日志消息->warning 警告,警告~
[Consumer(消费者),我关心日志级别是 info,warning,error]:收到日志消息->info 跟踪信息是~

结束语

以下是官方的免责声明:

生产[非]适用性免责声明
请记住,这个和其他教程都是教程。
他们一次展示一个新概念,可能会故意过度简化某些事情而忽略其他事物。
例如,为了简洁起见,在很大程度上省略了诸如连接管理,错误处理,连接恢复,并发和度量收集之类的主题。
这种简化的代码不应用于生产环境。

要使用好 RabbitMQ,还有很多内容需要学习,所以继续下一章的 Topics(主题)

觉得文章对您有帮助,请我喝瓶肥宅快乐水可好 (๑•̀ㅂ•́)و✧
  • 本文作者: 阿彬~
  • 本文链接: https://iweixubin.github.io/posts/rabbitmq/routing/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
  • 免责声明:本媒体部分图片,版权归原作者所有。因条件限制,无法找到来源和作者未进行标注。
         如果侵犯到您的权益,请与我联系删除