RabbitMQ 主题

本文对应 RabbitMQ 官方教程的第五个例子——Topics(主题),讲述怎么基于模式(主题)来接收消息。

主题(Topics)

在上一个教程中,我们改进了日志系统,我们使用了 direct 交换器来代替 fanout 交换器,
因为fanout 交换器只能广播消息,而 direct 交换器可以实现选择性地接收消息。

虽然使用 direct 交换器 改进了我们的系统,但它仍然有局限性——它不能基于多个条件进行路由。

在我们的日志记录系统中,我们可能不仅要根据严重性来订阅日志,还要根据发出日志的源来订阅日志。
您可能从syslog unix工具中了解这个概念,
该工具根据 severity(info / warn / crit …)和 facility(auth / cron / kern …)来路由日志。

这会给我们带来很大的灵活性 - 我们可能想要听取来自 ‘cron’ 的关键错误以及来自 ‘kern’ 的所有日志。

要在我们的日志记录系统中实现这一点,我们需要了解更复杂的 topic 交换器。

主题交换器(Topic exchange)

发送到 topic 交换器的消息不能随意指定 routing key(路由键),它必须是一个由点分割的单词列表,
这些单词可以是任意内容,但通常会在其中指定一些与消息相关的特性。
请看一些合法的路由键示例:stock.usd.nysenyse.vmwquick.orange.rabbit
路由键可以包含任意数量的单词,但不能超过 255 个字节的上限。

binding key(绑定键) 也必须是相同的形式,topic 交换器的背后逻辑与 direct 交换器类似,
使用指定路由键发送的消息会被分发到与其绑定键匹配的所有队列中。
不过对于绑定键来说,有两个重要的特殊情况需要注意:

  • *(星号)可以代替一个单词。
  • #(哈希)可以代替零个或多个单词。

下图示例是对上述内容最简单的解释:

主题交换器

在这个例子中,我们将发送所有描述动物的消息。
消息将与包含三个单词(两个点)的路由键一起发送。
路由键中的第一个单词将描述速度,第二个是颜色,第三个是物种,
即:speed.colour.species

我们创建了三个绑定规则绑定到两个队列:
Q1 绑定了键 .orange.
Q2 绑定了键 *.*.rabbitlazy.#

这些绑定可以被概括为:

  • Q1 对所有橙色的动物感兴趣。
  • Q2 对兔子以及所有行动缓慢的动物感兴趣。

路由键为 quick.orange.rabbit 的消息会被发送到这两个队列。
路由键为 lazy.orange.elephant 也会被发送到这两个队列。
路由键为 quick.orange.fox 只会进入 Q1 队列。
路由键为 lazy.brown.fox 只会进入 Q2 队列。
路由键为 lazy.pink.rabbit 只会被发送到 Q2 队列一次,尽管它匹配了两个绑定(避免了消息重复)。
路由键为 quick.brown.fox 没有匹配的绑定,因此它将会被丢弃。

如果我们打破约定,发送使用一个或四个单词(例如:orangequick.orange.male.rabbit)作路由键的消息会发生什么?
答案是,这些消息因为没有匹配到任何绑定,将被丢弃。

但是,另外,例如路由键为 lazy.orange.male.rabbit 的消息,
尽管它有四个单词,也会匹配最后一个绑定,并将被发送到第二个队列。

Topics 交换器
topic交换器的功能是很强大的,它可以表现出一些其他交换器的行为。
当一个队列与键 绑定时, 它会忽略路由键,接收所有消息,这就像 fanout 交换器一样。
当特殊字符 * 未在绑定中使用时,topic 交换器的行为就像 direct 交换器一样。

完整演示

为了更好地演示,下面的代码和官方教程的不同。
官方例子传送门

派糖者代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
using System;
using System.Text;

using RabbitMQ.Client; // 请自行用 NuGet 添加依赖

namespace CandyKeeper
{
class Program
{
// 这是 生产者(发布者,发送者) 的示例代码
static void Main(string[] args)
{
using (var mqHelper = new MqHelper())
{
Console.WriteLine(" 请输入糖的特征与祝福语:软硬.颜色.味道 祝福语");
Console.WriteLine(" 如:软.红.草莓味 快乐成长");
Console.WriteLine(" 如:硬.灰.巧克力 身体健康");

while (true)
{
var gift = Console.ReadLine().Split(" ");
var candyFeature = gift[0];
var blessings = gift[1];

if (string.IsNullOrWhiteSpace(candyFeature))
candyFeature = "软.粉红.草莓味";

if (string.IsNullOrWhiteSpace(blessings))
blessings = "好好学习,天天向上";

mqHelper.Publish(candyFeature, blessings);
}
}

}
}


class MqHelper : IDisposable
{
IConnection connection = null;
IModel channel = null;

private IModel CreateModel()
{
if (channel == null)
{
var factory = new ConnectionFactory();
factory.HostName = "192.168.86.128";// 这是我的 RabbitMQ 服务器地址
factory.Port = 5672;
factory.UserName = "admin";//用户名
factory.Password = "admin";//密码

connection = factory.CreateConnection();

channel = connection.CreateModel();


// 声明一个交换器
channel.ExchangeDeclare(exchange: "candy_keeper", // 交换器名称
type: "topic"); // 交换器类型

}

return channel;
}

public void Publish(string candyFeature, string blessings)
{
var channel = CreateModel();

var body = Encoding.UTF8.GetBytes(blessings);

channel.BasicPublish(exchange: "candy_keeper",// 交换器名称
routingKey: candyFeature,// 路由键
basicProperties: null,
body: body);

Console.WriteLine($" [Producer(生产者,派糖者)] 派出一些礼物: 糖果:{candyFeature} 祝福语:{blessings}");
Console.WriteLine();
}
public void Dispose()
{
if (connection != null)
connection.Dispose();

if (channel != null)
channel.Dispose();
}
}
}

熊孩子代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
using System;
using System.Text;

using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace MqReceiveLogsDirect
{
class Program
{
// 这是 消费者(接收者) 的示例代码
static void Main(string[] args)
{
string favorite = string.Empty;
if (args.Length > 0)
favorite = args[0];


var factory = new ConnectionFactory();
factory.HostName = "192.168.86.128";// 这是我的 RabbitMQ 服务器地址
factory.Port = 5672;
factory.UserName = "admin";//用户名
factory.Password = "admin";//密码

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{

// 声明一个交换器
channel.ExchangeDeclare(exchange: "candy_keeper", // 交换器名称
type: "topic"); // 交换器类型


var queueName = channel.QueueDeclare().QueueName;

channel.QueueBind(queue: queueName, // 队列名
exchange: "candy_keeper", // 交换器名称
routingKey: favorite == "" ? "#" : favorite);// 路由键
//如果没有指定路由键,那么就什么口味都合适

Console.WriteLine($" [Consumer(消费者),我喜欢吃的糖果是 {favorite}]:我的心在等待,永远在等待...");
Console.WriteLine();

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var candy = ea.RoutingKey;
var blessings = Encoding.UTF8.GetString(ea.Body);

Console.WriteLine($" [Consumer(消费者),我喜欢吃的糖果是 {favorite}]:收到礼物->{candy} {blessings}");
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);

Console.ReadLine();
}
}
}
}

启动一个喜欢吃糖的熊孩子

1
2
\Child\bin\Debug\netcoreapp2.2> dotnet Child.dll 软.#
[Consumer(消费者),我喜欢吃的糖果是 软.#]:我的心在等待,永远在等待...

启动一个喜欢吃硬巧克力糖的熊孩子

1
2
\Child\bin\Debug\netcoreapp2.2> dotnet Child.dll 硬.*.巧克力
[Consumer(消费者),我喜欢吃的糖果是 硬.*.巧克力]:我的心在等待,永远在等待...

启动一个什么糖都喜欢吃的熊孩子

1
2
\Child\bin\Debug\netcoreapp2.2> dotnet Child.dll
[Consumer(消费者),我喜欢吃的糖果是 ]:我的心在等待,永远在等待...

启动派糖者

1
2
3
4
\CandyKeeper\bin\Debug\netcoreapp2.2> dotnet CandyKeeper.dll
请输入糖的特征与祝福语:软硬.颜色.味道 祝福语
如:软.红.草莓味 快乐成长
如:硬.灰.巧克力 身体健康

派糖者开始派糖:

1
2
软.粉红.巧克力 健康快乐
[Producer(生产者,派糖者)] 派出一些礼物: 糖果:软.粉红.巧克力 祝福语:健康快乐

哪些熊孩子会收到这些礼物呢?

结束语

下一章是将 MQ 中的 RPG,会有惊喜哦。

觉得文章对您有帮助,请我喝瓶肥宅快乐水可好 (๑•̀ㅂ•́)و✧
  • 本文作者: 阿彬~
  • 本文链接: https://iweixubin.github.io/posts/rabbitmq/topics/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
  • 免责声明:本媒体部分图片,版权归原作者所有。因条件限制,无法找到来源和作者未进行标注。
         如果侵犯到您的权益,请与我联系删除