RabbitMQ 工作队列

本文对应 RabbitMQ 官方教程的第二个例子——Work Queues(工作队列),讲述怎么任务分发(竞争消费者模式)。

工作队列

工作队列

在第一篇教程中,我们编写了程序来发送和接收来自命名队列的消息。
在这个例子中,我们将创建一个工作队列,用于在多个工作人员(Worker)之间分配耗时的任务。

工作队列(又称:任务队列——Task Queues)背后的主要思想是避免等待长时间运行的操作。
相反,我们安排任务(Task)稍后完成。 我们将任务封装为消息并将其发送到队列。
在后台运行的工作进程(Worker)将接收任务并执行。
当您运行许多工作程序时,任务可以按某种策略分配给它们。

在工人之间分配任务(竞争消费者模式)

此概念在 Web 应用程序中特别有用,在这些应用程序中,HTTP 请求在短暂期间内无法长时间地处理复杂任务。

准备

在之前的教程中,我们发送了一个包含 “Hello World” 的字符串消息。
现在,我们将发送一些字符串,把这些字符串当作复杂的任务。
我们没有使用真实的例子,例如图片缩放、pdf文件转换,所以使用 sleep() 函数来模拟这种情况。
我们在字符串中加上点号(.)来表示任务的复杂程度,一个点(.)将会耗时1秒钟。比如 “Hello…” 就会耗时3秒钟。

接下来的内容和官方教程差异比较大,
官方是先给出概念和代码段,然后再给出完整的代码,我是给出完整的代码运行起来,然后再解释概念和代码段。

消息生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
using System;
using System.Text;

using RabbitMQ.Client; // 请自行用 NuGet 添加依赖

namespace MqTask
{
class Program
{
// 这是 生产者(发布者,发送者) 的示例代码
static void Main()
{
using (var mqHelper = new MqHelper())
{
Console.WriteLine(" 请输入任务,结尾每一个 . 代表需要执行3秒,如:Hello... 表示需要执行9秒");

while (true)
{
var msg = Console.ReadLine();

mqHelper.Publish(msg);
}
}

}
}


class MqHelper : IDisposable
{
IConnection connection = null;
IModel channel = null;

private IModel CreateModel()
{
if (channel == null)
{
var factory = new ConnectionFactory();
factory.HostName = "192.168.86.128";// 这是我的 RabbitMQ 服务器地址
factory.Port = 5672;
factory.UserName = "admin";//用户名
factory.Password = "admin";//密码

connection = factory.CreateConnection();

channel = connection.CreateModel();

// 声明一个持久化的队列。请看 “消息持久化” 小节
channel.QueueDeclare(queue: "task_queue",
durable: true, // 开启持久化队列。请看 "消息持久化" 小节
exclusive: false,
autoDelete: false,
arguments: null);
}

return channel;
}

public void Publish(string message)
{
var channel = CreateModel();

// 特别注意
var properties = channel.CreateBasicProperties();
properties.Persistent = true;

var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);

Console.WriteLine(" [Producer(生产者)] 发送任务(消息): {0}", message);
Console.WriteLine();
}
public void Dispose()
{
if (connection != null)
connection.Dispose();

if (channel != null)
channel.Dispose();
}
}
}

消息消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
using System;
using System.Text;
using System.Threading;

using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace MqWorker
{
class Program
{
// 这是 消费者(接收者,工作者) 的示例代码
public static void Main(string[] args)
{
string workerNO = string.Empty;
if (args.Length > 0)
workerNO = args[0]; // 通过命令行传参数

var factory = new ConnectionFactory();
factory.HostName = "192.168.86.128";// 这是我的 RabbitMQ 服务器地址
factory.Port = 5672;
factory.UserName = "admin";//用户名
factory.Password = "admin";//密码

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
// 声明一个持久化的队列。请看 “消息持久化” 小节
channel.QueueDeclare(queue: "task_queue",
durable: true, // 开启持久化队列。请看 "消息持久化" 小节
exclusive: false,
autoDelete: false,
arguments: null);

// 声明一次只处理一个消息。请看 “公平分发” 小节
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

Console.WriteLine($" [Consumer-Worker(消费者-工作者,编号:{workerNO})]:我的心在等待,永远在等待...");
Console.WriteLine();

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [Consumer-Worker(消费者-工作者,编号:{workerNO})]:收到任务(消息)-> {message}");

int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000 *3);

Console.WriteLine($" [Consumer-Worker(消费者-工作者,编号:{workerNO})]:任务完成 ^_^ ");
Console.WriteLine();

// 任务完成,显示调用 消息确认。请看 “消息确认” 章节
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};

channel.BasicConsume(queue: "task_queue",
autoAck: false, // 关闭消息自动确认。请看 “消息确认” 章节
consumer: consumer);

Console.ReadLine();
}
}
}
}

为了实践本例子,
需要一个控制台(命令行窗口)运行 MqTask.dll

1
2
\MqTask\bin\Debug\netcoreapp2.1> dotnet MqTask.dll
请输入任务,结尾每一个 . 代表需要执行3秒,如:Hello... 表示需要执行9

两个以上的控制台(命令行窗口)运行 MqWorker.dll

1
2
\MqWorker\bin\Debug\netcoreapp2.1> dotnet MqWorker.dll 666
[Consumer-Worker(消费者-工作者,编号:666)]:我的心在等待,永远在等待...
1
2
\MqWorker\bin\Debug\netcoreapp2.1> dotnet MqWorker.dll 777
[Consumer-Worker(消费者-工作者,编号:777)]:我的心在等待,永远在等待...

MqTask 控制台发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Mession A ..
[Producer(生产者)] 发送任务(消息): Mession A ..

Mession B .
[Producer(生产者)] 发送任务(消息): Mession B .

Mession C ..
[Producer(生产者)] 发送任务(消息): Mession C ..

Mession D ...
[Producer(生产者)] 发送任务(消息): Mession D ...

Mession E ..
[Producer(生产者)] 发送任务(消息): Mession E ..

MqWorker.dll 666 控制台收到消息

1
2
3
4
5
6
7
8
[Consumer-Worker(消费者-工作者,编号:666)]:收到任务(消息)-> Mession A ..
[Consumer-Worker(消费者-工作者,编号:666)]:任务完成 ^_^

[Consumer-Worker(消费者-工作者,编号:666)]:收到任务(消息)-> Mession C ..
[Consumer-Worker(消费者-工作者,编号:666)]:任务完成 ^_^

[Consumer-Worker(消费者-工作者,编号:666)]:收到任务(消息)-> Mession E ..
[Consumer-Worker(消费者-工作者,编号:666)]:任务完成 ^_^

MqWorker.dll 777 控制台收到消息

1
2
3
4
5
[Consumer-Worker(消费者-工作者,编号:777)]:收到任务(消息)-> Mession B .
[Consumer-Worker(消费者-工作者,编号:777)]:任务完成 ^_^

[Consumer-Worker(消费者-工作者,编号:777)]:收到任务(消息)-> Mession D ...
[Consumer-Worker(消费者-工作者,编号:777)]:任务完成 ^_^

概念与术语

从上面的例子,我们可以看出。

轮询分发

轮询分发——(Round-robin dispatching)

使用任务队列的一个优点是能够轻松地并行工作。
如果我们积压了很多工作,那么我们可以添加更多工作进程,这样就可以轻松扩展。

默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者。
平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为循环法。
读者可以自行尝试三个甚至更多的消费者的情况。

消息确认

消息确认——(Message acknowledgment)

工作者在接收到消息后,完成他的业务代码需要一些时间,
你可能想知道在一个消费者接收到一个消息,然后执行业务代码到一部分的时候退出或崩溃了会怎么办。
之前的 “Hello World” 例子,RabbitMQ 把消息传递给消费者后就会把这些消息删除掉,
在这种情况下,如果你突然关闭掉一个工作者,我们就会失去这个工作者正在执行的,
以及所有 RabbitMQ 派发给他并且还没来得及执行的消息。

但是我们并不想失去任何的任务消息,
如果一个工作者出现异常,我们想把这个工作者的任务消息派发到其他的工作者。

为了确保消息永不丢失,RabbitMQ 支持消息确认。
消费者执行完任务后发回 ack(nowledgement) 通知 RabbitMQ,那么 RabbitMQ 则可以自由删除它。

如果一个消费者没有返回消息答复就挂掉了(信道关闭,连接关闭或者 TCP 链接丢失),
RabbitMQ 就会明白,这个消息还没有被完成,RabbitMQ 就会重新把这条消息放入队列,
如果在这个时间有其他的消费者在线,那么 RabbitMQ 就会迅速地把这条消息传递给其他的消费者,这样就确保了没有消息会丢失。

没有任何消息超时; 当消费者退出或崩溃时,RabbitMQ 将重新发送消息。 即使处理消息需要非常长的时间,也没关系。
(RabbitMQ 不会有超时设置?)

消息确认必须在收到交付的同一 channel 上发送。 尝试使用不同的 channel 进行确认将导致通道级协议异常。 有关确认的文档,请参阅了解更多信息。

忘记消息确认——(Forgotten acknowledgment)
忘记调用 BasicAck 是一个常见的错误。 这是一个简单的错误,但后果是严重的。
当客户端退出时,消息将被重新传递(这可能看起来像随机重新传递),但是 RabbitMQ 将会占用越来越多的内存,因为它无法释放任何未经确认的消息。

为了调试这类型错误你可以使用 rabbitmqctl 来打印 messages_unacknowledged 字段:

1
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
1
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

另外,可以使用 RabbitMQ 自带的 Web 管理界面来查看。

消息持久化

消息持久化——(Message durability)

我们已经学会了如何确保即使消费者退出或崩溃时,任务也不会丢失。
但是如果 RabbitMQ 服务器退出或崩溃时,那么我们的任务仍然会丢失。

当 RabbitMQ 退出或崩溃时,它将忘记队列消息,除非你告诉它不要。
确保消息不会丢失需要做两件事:我们需要将队列消息都标记为持久。

首先,我们需要确保RabbitMQ永远不会丢失我们的队列。 为此,我们需要声明它是持久的:

1
2
3
4
5
channel.QueueDeclare(queue: "task_queue",
durable: true, // 开启 队列 持久化
exclusive: false,
autoDelete: false,
arguments: null);

此时我们确信即使 RabbitMQ 重新启动,task_queue 队列 也不会丢失。
现在我们需要将消息标记为持久化 - 通过将 IBasicProperties.SetPersistent 设置为 true

1
2
var properties = channel.CreateBasicProperties();
properties.Persistent = true;

有关消息持久性的注释
消息标记为持久性并不能完全保证消息不会丢失。
虽然它告诉 RabbitMQ 将消息保存到磁盘,但是当 RabbitMQ 接受消息并且尚未保存消息时,仍然有一个短时间窗口。
此外,RabbitMQ 不会为每条消息执行 fsync(2) - 它可能只是保存到缓存而不是真正写入磁盘。
持久性保证不强,但对于我们简单的任务队列来说已经足够了。
如果您需要更强的保证,那么您可以使用发布者确认

公平分发

公平分发——(Fair dispatch)

轮询分发

如果之前 “Hello World” 的例子,采用轮询分发。
但如果有些任务很重,而有些任务很轻,那么则可能造成一个工作者很忙,而另外一个几乎不用做什么工作。

会发生这种情况是因为 RabbitMQ 只是在消息进入队列时调度消息。
它不会查看消费者未确认消息的数量。 它只是盲目地向第 n 个消费者发送每个第 n 个消息。

为了改变这种行为,我们可以使用 BasicQos 方法和 prefetchCount = 1 设置。
这告诉 RabbitMQ 一次只向一个工作者发送一条消息,换句话说,在处理并确认前一个消息之前,不要再向工作者发送新消息。

1
channel.BasicQos(0, 1, false);

关于队列大小的说明
如果所有工作进程都很忙,您的队列就会填满。
您将需要密切关注这一点,并可能添加更多工作人员,或者采取其他策略。

结束语

上一章的 Hello World,比较符合寄信人将信投递到邮箱,最后转交到收信人手中,但这个新的工作模式已经刷新了前面的类比,毕竟邮递员不会将信件轮询投递给人。
所以要使用好 RabbitMQ,还有很多内容需要学习,所以继续下一章的 Publish/Subscribe(发布/订阅)

觉得文章对您有帮助,请我喝瓶肥宅快乐水可好 (๑•̀ㅂ•́)و✧
  • 本文作者: 阿彬~
  • 本文链接: https://iweixubin.github.io/posts/rabbitmq/work-queues/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
  • 免责声明:本媒体部分图片,版权归原作者所有。因条件限制,无法找到来源和作者未进行标注。
         如果侵犯到您的权益,请与我联系删除