RabbitMQ 发布-订阅

本文对应 RabbitMQ 官方教程的第三个例子——Publish/Subscribe(发布/订阅),讲述怎么一次向多个消费者发送消息。

发布/订阅

在上一个教程中,我们创建了一个工作队列。 工作队列背后的假设是每个任务都交付给其中一个工作者(worker)。
在这一部分,我们将做一些完全不同的事情 - 我们将一个消息分发给多个消费者(consumers)。 此模式称为“发布/订阅”。

为了演示这种模式,我们将构建一个简单的日志记录系统。
它将包含两个程序 - 第一个将发出日志消息,第二个将接收和打印它们。

在我们的日志记录系统中,接收程序的每个运行实例都将获取消息。
这样我们就可以运行一个接收器并将日志定向到磁盘; 同时我们将能够运行另一个接收器并在屏幕上看到日志。

基本上,发布的日志消息将会被广播给所有接收者(receiver)。

交换器(Exchanges)

在本教程的前几部分中,我们向队列发送消息和从队列接收消息。
现在是时候介绍 RabbitMQ 完整的消息传递模型了。

让我们快速复习前面教程中介绍的内容:

  • 发布者(producer) 是发布消息的应用程序。
  • 队列(queue) 是用于消息存储的缓冲。
  • 消费者(consumer) 是接收消息的应用程序。

RabbitMQ 中消息传递模型的核心思想是发布者(producer)永远不会将任何消息直接发送到队列(queue)。
实际上,生产者通常甚至不知道消息是否会被传递到任何队列。

事实上,发布者(producer)只能将消息发给交换器(exchange),
交换器非常简单,它一边从发布者方接收消息,一边把消息推入队列。
交换器必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。
这些规则是通过 exchange type 来定义的。

交换器(exchange)

有几种交换器类型可供选择:direct(指定)topic(主题)headers(标题)fanout(扇出)
我们将专注于最后一个 - fanout。 让我们创建一个这种类型的交换,并将其称为log

译者补充
我第一次看到扇入扇出,有点难以理解,后来查了一些资料。fanin(扇入)fanout(扇出)的确是计算机专业术语。

fanin(扇入)如下图所示,是讲多个数据源的数据流向一个接收者。
fanin

fanout(扇出)如下图所示,是讲一个数据源的数据流向多个接收者。
fanout

而这里 MQ 中的 fanout(扇出)的概念和计算机另外一个专业术语广播相似。
另外,熟悉理解交换机类型的才能用好 RabbitMQ

1
channel.ExchangeDeclare("logs", "fanout");

扇出(fanout)类型交换器(exchange)非常简单。
正如您可能从名称中猜到的那样——成扇形散开——它只是将收到的所有消息广播到它知道的所有队列中。
而这正是我们多个日志记录器都能接收同一个消息所需要的。

交换器列表——(Listing exchanges)
要列出服务器上的交换器(exchange),您可以运行 rabbitmqctl

1
sudo rabbitmqctl list_exchanges

在此列表中将有一些 amq.* 交换和默认(未命名)交换。 这些是默认创建的,但目前您不太可能需要使用它们。

默认交换器——(The default exchange)
在本教程的前几部分中,我们对交换器(exchange)一无所知,
但仍然可以向队列发送消息,这是因为我们使用默认交换器(exchange)。

回想一下我们之前如何发布消息:

1
2
3
4
5
6
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", // 空字符串表示使用默认交换器(exchange)
routingKey: "hello",
basicProperties: null,
body: body);

第一个参数是交换器(exchange)的名称。
空字符串表示默认或未命名交换器(exchange):消息通过 routingKey 指定的名称路由到队列(如果存在)。

现在,我们可以发布消息到我们的命名交换器(exchange):

1
2
3
4
5
6
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
routingKey: "",
basicProperties: null,
body: body);

你还记得之前我们使用的队列名吗(hellotask_queue)?
给一个队列命名是很重要的——我们需要把工作者(workers)指向正确的队列。
如果你打算在发布者(producers)和消费者(consumers)之间共享同队列的话,给队列命名是十分重要的。

但是这并不适用于我们的日志系统。
我们打算接收所有的日志消息,而不仅仅是一小部分。我们关心的是最新的消息而不是旧的。
为了解决这些问题,我们需要做两件事情。

首先,当我们连接上RabbitMQ的时候,我们需要一个全新的、空的队列。
我们可以手动创建一个随机的队列名,或者让 RabbitMQ 为我们选择一个随机的队列名。

其次,一旦消费者断开,就应该自动删除它所对应的队列。

在 .NET 客户端中,当我们不向 QueueDeclare() 提供参数时,我们使用生成的名称创建一个非持久的,独占的自动删除队列:

1
var queueName = channel.QueueDeclare().QueueName;

您可以在队列指南中了解有关独占标志和其他队列属性的更多信息。

有关 exclusive 标签和队列其它参数的信息,可以阅读 guide on queues

此时,queueName 包含随机队列名称。 例如,它可能看起来像 amq.gen-JzTY20BRgKO-HjmUJj0wLg

绑定(Bindings)

绑定

我们已经创建了一个扇出交换器(fanout exchange)和一个队列(queue)。
现在我们需要告诉交换器(exchange)将消息发送到我们的队列。 交换和队列之间的关系称为绑定(Binding)。

1
2
3
4
// 队列(queue) 绑定到指定 交换器(exchange) !!!
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");

现在,日志交换器(exchange)会将消息附加到我们的队列(queue)中去。

绑定列表——Listing bindings
你可以使用 rabbitmqctl list_bindings 列出所有存在的绑定。

完整演示

为了更好地演示,下面的代码和官方教程的不同。
官方例子传送门

Publish(发布) 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
using System;
using System.Text;

using RabbitMQ.Client; // 请自行用 NuGet 添加依赖

namespace MqPublish
{
class Program
{
// 这是 生产者(发布者,发送者) 的示例代码
static void Main(string[] args)
{
using (var mqHelper = new MqHelper())
{
Console.WriteLine(" 请输入消息,将会广播给所有订阅者(消费者,接收者)...");

while (true)
{
var msg = Console.ReadLine();

mqHelper.Publish(msg);
}
}

}
}


class MqHelper : IDisposable
{
IConnection connection = null;
IModel channel = null;

private IModel CreateModel()
{
if (channel == null)
{
var factory = new ConnectionFactory();
factory.HostName = "192.168.86.128";// 这是我的 RabbitMQ 服务器地址
factory.Port = 5672;
factory.UserName = "admin";//用户名
factory.Password = "admin";//密码

connection = factory.CreateConnection();

channel = connection.CreateModel();


// 声明一个交换器
channel.ExchangeDeclare(exchange: "logs", // 交换器名称
type: "fanout"); // 交换器类型

// 和前两章教程中的例子不同的是,这里不再声明队列了!
// 请在脑海中刷新 RabbitMQ 的模型!!!
}

return channel;
}

public void Publish(string message)
{
var channel = CreateModel();

var body = Encoding.UTF8.GetBytes(message);

// !机制:将 消息 发布到指定 交换器 !!!
channel.BasicPublish(exchange: "logs",// 交换器名称
routingKey: "", // 怎么路由会在下一个教程中讲解
basicProperties: null,
body: body);

Console.WriteLine(" [Producer(生产者)] 广播消息: {0}", message);
Console.WriteLine();
}
public void Dispose()
{
if (connection != null)
connection.Dispose();

if (channel != null)
channel.Dispose();
}
}
}

Subscribe(订阅) 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
using System;
using System.Text;

using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace MqSubscribe
{
class Program
{
// 这是 消费者(接收者,订阅者) 的示例代码
static void Main(string[] args)
{
string subscriberNO = string.Empty;
if (args.Length > 0)
subscriberNO = args[0];

var factory = new ConnectionFactory();
factory.HostName = "192.168.86.128";// 这是我的 RabbitMQ 服务器地址
factory.Port = 5672;
factory.UserName = "admin";//用户名
factory.Password = "admin";//密码

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
// 声明一个交换器
channel.ExchangeDeclare(exchange: "logs", // 交换器名称
type: "fanout"); // 交换器类型

// 让 RabbitMQ 分配一个队列名称
var queueName = channel.QueueDeclare().QueueName;

// !机制:将 队列(queue) 绑定到指定 交换器(exchange) !!!
channel.QueueBind(queue: queueName, // 队列名
exchange: "logs", // 交换器名称
routingKey: ""); // 怎么路由会在下一个教程中讲解

Console.WriteLine($" [Consumer-Subscriber(消费者-订阅者,编号:{subscriberNO})]:我的心在等待,永远在等待...");
Console.WriteLine();

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [Consumer-Subscriber(消费者-订阅者,编号:{subscriberNO})]:收到广播消息-> {message}");
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);

Console.ReadLine();
}
}
}
}

运行 订阅者 三个控制台实例:

1
2
\MqSubscribe\bin\Debug\netcoreapp2.1> dotnet MqSubscribe.dll 555
[Consumer-Subscriber(消费者-订阅者,编号:555)]:我的心在等待,永远在等待...
1
2
\MqSubscribe\bin\Debug\netcoreapp2.1> dotnet MqSubscribe.dll 666
[Consumer-Subscriber(消费者-订阅者,编号:666)]:我的心在等待,永远在等待...
1
2
\MqSubscribe\bin\Debug\netcoreapp2.1> dotnet MqSubscribe.dll 777
[Consumer-Subscriber(消费者-订阅者,编号:777)]:我的心在等待,永远在等待...

运行 发布者 一个控制台实例:

1
2
\MqPublish\bin\Debug\netcoreapp2.1> dotnet MqPublish.dll
请输入消息,将会广播给所有订阅者(消费者,接收者)...

发布者 发布消息:

1
2
3
4
5
大家好~
[Producer(生产者)] 广播消息: 大家好~

下面请听我说两句~
[Producer(生产者)] 广播消息: 下面请听我说两句~

订阅者 都收到了消息:

1
2
[Consumer-Subscriber(消费者-订阅者,编号:555)]:收到广播消息-> 大家好~
[Consumer-Subscriber(消费者-订阅者,编号:555)]:收到广播消息-> 下面请听我说两句~
1
2
[Consumer-Subscriber(消费者-订阅者,编号:666)]:收到广播消息-> 大家好~
[Consumer-Subscriber(消费者-订阅者,编号:666)]:收到广播消息-> 下面请听我说两句~
1
2
[Consumer-Subscriber(消费者-订阅者,编号:777)]:收到广播消息-> 大家好~
[Consumer-Subscriber(消费者-订阅者,编号:777)]:收到广播消息-> 下面请听我说两句~

显示结果很直观:logs 交换器(exchange)把数据发送给两个系统命名的队列,这就是我们所期望的。

结束语

理解 MQ 多种工作模式是学好和用好 MQ 的关键,所以下一章介绍新的工作模式:Routing(路由选择)

觉得文章对您有帮助,请我喝瓶肥宅快乐水可好 (๑•̀ㅂ•́)و✧
  • 本文作者: 阿彬~
  • 本文链接: https://iweixubin.github.io/posts/rabbitmq/publish-subscribe/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
  • 免责声明:本媒体部分图片,版权归原作者所有。因条件限制,无法找到来源和作者未进行标注。
         如果侵犯到您的权益,请与我联系删除