本文对应 RabbitMQ 官方教程的第三个例子——Publish/Subscribe(发布/订阅),讲述怎么一次向多个消费者发送消息。
发布/订阅
在上一个教程中,我们创建了一个工作队列。 工作队列背后的假设是每个任务都交付给其中一个工作者(worker)。
在这一部分,我们将做一些完全不同的事情 - 我们将一个消息分发给多个消费者(consumers)。 此模式称为“发布/订阅”。
为了演示这种模式,我们将构建一个简单的日志记录系统。
它将包含两个程序 - 第一个将发出日志消息,第二个将接收和打印它们。
在我们的日志记录系统中,接收程序的每个运行实例都将获取消息。
这样我们就可以运行一个接收器并将日志定向到磁盘; 同时我们将能够运行另一个接收器并在屏幕上看到日志。
基本上,发布的日志消息将会被广播给所有接收者(receiver)。
交换器(Exchanges)
在本教程的前几部分中,我们向队列发送消息和从队列接收消息。
现在是时候介绍 RabbitMQ 完整的消息传递模型了。
让我们快速复习前面教程中介绍的内容:
- 发布者(producer) 是发布消息的应用程序。
- 队列(queue) 是用于消息存储的缓冲。
- 消费者(consumer) 是接收消息的应用程序。
RabbitMQ 中消息传递模型的核心思想是发布者(producer)永远不会将任何消息直接发送到队列(queue)。
实际上,生产者通常甚至不知道消息是否会被传递到任何队列。
事实上,发布者(producer)只能将消息发给交换器(exchange),
交换器非常简单,它一边从发布者方接收消息,一边把消息推入队列。
交换器必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。
这些规则是通过 exchange type 来定义的。
有几种交换器类型可供选择:direct(指定)
, topic(主题)
, headers(标题)
和 fanout(扇出)
。
我们将专注于最后一个 - fanout
。 让我们创建一个这种类型的交换,并将其称为log
:
译者补充
我第一次看到扇入扇出,有点难以理解,后来查了一些资料。fanin(扇入)
和fanout(扇出)
的确是计算机专业术语。
fanin(扇入)
如下图所示,是讲多个数据源的数据流向一个接收者。
fanout(扇出)
如下图所示,是讲一个数据源的数据流向多个接收者。而这里 MQ 中的
fanout(扇出)
的概念和计算机另外一个专业术语广播
相似。
另外,熟悉理解交换机类型的才能用好 RabbitMQ
1 | channel.ExchangeDeclare("logs", "fanout"); |
扇出(fanout)类型交换器(exchange)非常简单。
正如您可能从名称中猜到的那样——成扇形散开——它只是将收到的所有消息广播到它知道的所有队列中。
而这正是我们多个日志记录器都能接收同一个消息所需要的。
交换器列表——(Listing exchanges)
要列出服务器上的交换器(exchange),您可以运行rabbitmqctl
:
1 | sudo rabbitmqctl list_exchanges |
在此列表中将有一些 amq.*
交换和默认(未命名)交换。 这些是默认创建的,但目前您不太可能需要使用它们。
默认交换器——(The default exchange)
在本教程的前几部分中,我们对交换器(exchange)一无所知,
但仍然可以向队列发送消息,这是因为我们使用默认交换器(exchange)。
回想一下我们之前如何发布消息:
1 | var message = GetMessage(args); |
第一个参数是交换器(exchange)的名称。
空字符串表示默认或未命名交换器(exchange):消息通过 routingKey
指定的名称路由到队列(如果存在)。
现在,我们可以发布消息到我们的命名交换器(exchange):
1 | var message = GetMessage(args); |
你还记得之前我们使用的队列名吗(hello
和 task_queue
)?
给一个队列命名是很重要的——我们需要把工作者(workers)指向正确的队列。
如果你打算在发布者(producers)和消费者(consumers)之间共享同队列的话,给队列命名是十分重要的。
但是这并不适用于我们的日志系统。
我们打算接收所有的日志消息,而不仅仅是一小部分。我们关心的是最新的消息而不是旧的。
为了解决这些问题,我们需要做两件事情。
首先,当我们连接上RabbitMQ的时候,我们需要一个全新的、空的队列。
我们可以手动创建一个随机的队列名,或者让 RabbitMQ 为我们选择一个随机的队列名。
其次,一旦消费者断开,就应该自动删除它所对应的队列。
在 .NET 客户端中,当我们不向 QueueDeclare()
提供参数时,我们使用生成的名称创建一个非持久的,独占的自动删除队列:
1 | var queueName = channel.QueueDeclare().QueueName; |
您可以在队列指南中了解有关独占标志和其他队列属性的更多信息。
有关 exclusive
标签和队列其它参数的信息,可以阅读 guide on queues
此时,queueName
包含随机队列名称。 例如,它可能看起来像 amq.gen-JzTY20BRgKO-HjmUJj0wLg
。
绑定(Bindings)
我们已经创建了一个扇出交换器(fanout exchange)和一个队列(queue)。
现在我们需要告诉交换器(exchange)将消息发送到我们的队列。 交换和队列之间的关系称为绑定(Binding)。
1 | // 队列(queue) 绑定到指定 交换器(exchange) !!! |
现在,日志交换器(exchange)会将消息附加到我们的队列(queue)中去。
绑定列表——Listing bindings
你可以使用rabbitmqctl list_bindings
列出所有存在的绑定。
完整演示
为了更好地演示,下面的代码和官方教程的不同。
官方例子传送门
Publish(发布) 代码
1 | using System; |
Subscribe(订阅)
代码
1 | using System; |
运行 订阅者
三个控制台实例:
1 | \MqSubscribe\bin\Debug\netcoreapp2.1> dotnet MqSubscribe.dll 555 |
1 | \MqSubscribe\bin\Debug\netcoreapp2.1> dotnet MqSubscribe.dll 666 |
1 | \MqSubscribe\bin\Debug\netcoreapp2.1> dotnet MqSubscribe.dll 777 |
运行 发布者
一个控制台实例:
1 | \MqPublish\bin\Debug\netcoreapp2.1> dotnet MqPublish.dll |
发布者
发布消息:
1 | 大家好~ |
订阅者
都收到了消息:
1 | [Consumer-Subscriber(消费者-订阅者,编号:555)]:收到广播消息-> 大家好~ |
1 | [Consumer-Subscriber(消费者-订阅者,编号:666)]:收到广播消息-> 大家好~ |
1 | [Consumer-Subscriber(消费者-订阅者,编号:777)]:收到广播消息-> 大家好~ |
显示结果很直观:logs 交换器(exchange)把数据发送给两个系统命名的队列,这就是我们所期望的。
结束语
理解 MQ 多种工作模式是学好和用好 MQ 的关键,所以下一章介绍新的工作模式:Routing(路由选择)