波斯马BOSSMA Information Technology

消费者应答(ACK)和发布者确认(Confirm)

发布时间:2017年11月2日 / 分类:RabbitMQ / 13,144 次浏览 / 评论

这篇文章翻译自:http://www.rabbitmq.com/confirms.html

译注:这里将Consumer Acknowledgements翻译为消费者应答,将Acknowledgement简称为Ack,Acknowledgement本意是应答或者承认,肯定的应答实际产生确认的效果。Publisher Confirms翻译为发布者确认,Confirm本意是通过再次检查确认某些事是正确的,但是RabbitMQ代理也可能向发布者返回消息接收失败。

引言

如果系统使用类似RabbitMQ这种分布式的消息代理,因为协议方法(消息)发送不保证到达节点或者被节点成功处理,所以发布者和消费者都需要某种机制来对投递和处理进行确认。RabbitMQ的一些消息协议支持这些功能。这篇指南将涵盖AMQP 0-9-1中的这些功能,其想法与其它一些协议(STOMP, MQTT等等)大致相同。

在AMQP 0-9-1中从消费者到RabbitMQ的投递处理确认被称为应答,代理对发布者的确认是一个协议扩展,称为发布者确认。

(消费者)投递应答

当RabbitMQ投递一条消息到消费者时,它需要知道什么时候认定消息已经成功发送。什么样的逻辑是最优的取决于业务系统。因此它主要是一个应用的决策。在AMQP 0-9-1 中,这个决策发生在当一个消费者使用basic.consume方法注册时,或者通过basic.get方法获取一条消息时。

如果你想要更多实例和按部就班的资料,RabbitMQ教程#2中涵盖了消费者应答。

投递标识符:投递标签(Tag)

在继续讨论其它主题之前,需要重点说明投递是怎样被标识的(以及应答声明他们各自对应的投递)。当消费者(订阅)注册后,RabbitMQ将使用basic.deliver方法投递(推送)消息。这个方法携带一个投递标签,在每个通道上可以唯一的标识一次投递。因此投递标签的作用域是每个通道。

投递标签是单调增长的正整数,由客户端类库提供。应答投递的客户端类库方法也使用投递标签作为参数。

Ack模式

依据使用的Ack模式,RabbitMQ认定消息被成功投递的时机,既可以在消息一经发送之后(已写到TCP Socket),也可以在收到明确的(手动的)客户端ACK时。使用下边的某个协议方法,手动发送的Ack可以是肯定的或者否定的:

  • basic.ack用于肯定Ack
  • basic.nack用于否定Ack(注:这是RabbitMQ对AMQP 0-9-1的扩展)
  • basic.reject用于否定Ack,但是与basic.nack相比限制1条

肯定Ack简单的指示RabbitMQ记录一条消息已被投递。否定Ack使用basic.reject有相同的效果。不同的主要是语义:肯定Ack认为消息已经被成功处理,否定Ack则表明投递没有被处理但是仍然应该被删除。

一次应答多条投递

手动Ack可以采取批量处理的方式,以减少网络通信。这可以通过设置Ack方法中的multiple字段来完成。注意basic.reject中一直没有这个字段,这也是RabbitMQ将basic.nack作为一个协议扩展引入的原因。

multiple字段被设置为true,RabbitMQ将应答所有未完成的投递标签,这些标签直到并包括在Ack中指定的标签。像其它所有与Ack有关的一样,它的作用域是每个通道。例如,在通道Ch中有几个未应答的投递标签5、6、7、8,当通道中有一个设置了delivery_tag为8和multiple为true的Ack帧到达时,所有从5到8的标签都将被确认。如果multiple被设置为false,投递5、6、7将仍然是未被应答的。

通道预取设置(QoS)

因为消息发送(推送)到客户端是异步的,所以在通道中某个给定的时刻经常有超过1条的消息在处理。此外,客户端发起的手动Ack也是天然异步的。因此对于未应答的投递标签有一个滑动窗口。开发者往往希望为这个窗口的大小增加一个限制,以避免消费者端的无限缓冲问题。这可以通过使用basis.qos方法设置prefetch count的值来实现。这个值定义了每个通道上允许的未应答投递的最大数量。一旦数量达到这个配置的值,RabbitMQ将停止在这个通道上投递更多的消息,除非有一个未完成的消息被应答。

例如,在通道Ch上有几个未应答的投递标签5、6、7、8,通道Ch的prefetch count设置为4,RabbitMQ将不会在Ch上推送更多的投递,除非其中某个未完成的投递被应答。当通道中有一个设置了delivery_tag为8的Ack帧到达时,RabbitMQ将会通知并再次投递消息。

需要反复重申的是投递和客户端手动Ack的流程是完全异步的。因此如果已经有投递在处理时prefetch的值被更改,将会出现紊乱情况,通道上未应答的消息数量可能临时的超过prefetch的值。

QoS设置可以为通道或者消费者配置。详情看消费者预取

QoS设置对于使用basic.get(拉API)获取消息没有影响,即使是手动确认模式。

当消费者失败或者断开连接:自动重新入队

使用手动Ack的情况下,当投递使用的通道(或连接)被关闭时,任何没有被应答的投递(消息)将自动的重新入队列。这包括客户端丢失TCP连接,消费者应用(处理)故障,以及通道级的协议异常(下边会提到)。

注意检测到不可用的客户端将需要一段时间。

由于这些行为,消费者必须准备好处理重新投递,另外需要考虑实现幂等性。重新投递会有一个特殊的布尔值属性:redeliver,RabbitMQ会将其设置为true。首次投递时这个值将设置为false。注意消费者可能会收到之前投递向其它消费者的消息。

客户端错误:重复Ack和未知标签

假设客户端对同一个投递标签应答了不止一次,RabbitMQ将会产生一个通道错误,比如PRECONDITION_FAILED -?unknown delivery tag 100。如果使用了一个未知的投递标签,也会有同样的通道异常抛出。

发布者确认(Confirm)

使用标准的AMQP 0-9-1,只有一个方法可以保证消息不丢失:使用事务,让通道具有事务性,发布消息,然后提交。在这种情况下,事务过于笨重,吞吐量会降低250倍。为了补救这个问题,一个确认机制被引入。它模仿协议中已经存在的消费者Ack机制。

为了使确认可用,客户端需要发送confirm.select方法。根据no-wait是否设置,代理可能会响应一个confirm.select-ok。一旦在通道上使用confirm.select方法,就可以说是确认模式。一个事务性的通道不能进入确认模式,一个确认模式的通道也不能成为事务性的。

一旦通道进入确认模式,代理和客户端都将计算消息总数(计算从第一次confirm.select从1开始)。代理处理消息时将通过在同一通道上发送一个basic.ack来确认消息。delivery-tag字段包含确认消息的序列号。代理也可能设置basic.ack的multiple字段,以指示所有消息已经被处理,这些消息直到并包括有序列号的这个。

有一个Java的例子:使用确认模式发布大量的消息到通道,然后等待确认。点击这里查看

否定应答

在异常情况下,当代理不能成功处理消息时,代替basic.ack,代理将发送一个basic.nack。在此上下文中,basic.nack的字段与basic.ack相应的字段具有相同的含义,除了requeue字段应该被忽略。通过否定应答一条或多条消息,代理表示它不能处理这些消息,并且拒绝为这些消息负责;此时,客户端可能选择重新发布消息。

当通道进入确认模式,随后发布的所有消息都将被确认或者否定应答一次。不能保证消息确认的速度。消息将不能同时被确认和否定应答。

basic.nack将仅在负责某个队列的Erlang进程发生内部错误时被投递。

消息什么时候被确认?

对于不可路由的消息,代理将在交换机验证消息不能被路由到任何一个队列时(返回空的队列列表)发出一个确认。如果消息被发布为强制的,basic.return将在basic.ack之前发送到客户端。对于否定应答(basic.nack)也是如此。

对于可路由的消息,当消息被所有的队列接收后,代理将发送basic.ack。对于路由到持久化队列的持久化消息,这意味着持久化到磁盘。对于镜像队列,这意味着所有的镜像都已经接收了消息。

持久化消息的Ack延迟

对于路由到持久化队列的持久化消息,basic.ack将在持久化消息到磁盘后发送。RabbitMQ消息存储在某个时间间隔(几百毫秒)之后将消息批量保存到磁盘,以最大限度减少fsync(2)的调用次数,或者当队列空闲时。这意味着在恒定负荷下,basic.ack的延迟可以达到数百毫秒。为了提升吞吐量,强烈建议应用异步处理Ack,或者批量发布消息后等待未完成的确认。对于这些方式在客户端库中有明确的API。

发布者确认的排序考量

在大多数情况下,RabbitMQ将按照消息发布的顺序向发布者进行确认(这适用于消息都发布到一个通道的情况)。然而,发布者Ack是异步发出的,可以确认单条消息,也可以确认一组消息。确认发出的确切时刻取决于消息投递的模式(持久化还是临时的)和消息将如何路由的队列属性。也就是说不同的消息将在不同的时刻准备好Ack。这意味着Ack与对应的消息将以不同的顺序到达。应用应该尽可能不依赖Ack的顺序。

发布者确认和保证投递

如果代理在表示消息已经写入磁盘之前崩溃了,持久化消息将丢失。在某些条件下,这将导致代理出人意料的表现。

对于实例,考虑如下场景:

1、一个客户端发布一条持久化消息到持久化队列

2、一个客户端从队列中消费这条消息(注意这条消息是持久化的,这个队列也是持久化的),但是还没有Ack

3、代理挂掉并重新启动

4、客户端重新连接并启动消费消息

此时,客户端有理由假设消息将被再次投递。情况可能并非如此:重启将导致代理丢失这条消息。为了保证持久化,客户端应该使用确认。如果发布者的通道已经处于确认模式,发布者将不会收到这条丢失消息的Ack(因为消息还没有被写入磁盘)。

投递标签的最大值

投递标签是个64位的长整形值,因此最大值是9223372036854775807。因为投递标签的作用域是每个通道,实际上发布者或者消费者都不太可能超过这个值。

本博客所有文章如无特别注明均为原创。
复制或转载请以超链接形式注明转自波斯马,原文地址《消费者应答(ACK)和发布者确认(Confirm)

关键字:

建议订阅本站,及时阅读最新文章!
【上一篇】 【下一篇】

发表评论