MQ系列(九):RabbitMQ发布者确认,消费者(确认,预取,优先级,退出通知)

发布者确认解决了消息发送传输过程中因网络故章导致丢失消息的问题,RabbitMQ 为消息者确认提供了 自动确认手动确认 两种方式。

消费者预取可以限制在回传确认消息之前接收的消息数量,消费者优先级允许高优先级的消息者优先接收消息,当高优先级消费者阻塞时,消息只会传递给低优先级消费者。

发布者确认

在默认情况下,发布者在将消息发送出去之后,broker 是不会返回任何消息给生产者的,即发布者不知道消息是否正确到达 broker。发布者发布消息,可能会因为网络故障或不稳定导致消息丢失,或者到达服务器大大延迟。RabbitMQ 为解决此问题提供了两种方式:一是基于 AMQP 协议中的事务机制;二是把信道设置成确认模式。

在 AMQP 0-9-1 标准中,为确保消息不会丢失唯一的方法是使用事务 - 使通道具有事务性,然后对每条消息或一组消息进行发布,提交。但事务方式是不必要的重量级的,并使吞吐量减少 250倍。

为解决这个问题,RabbitMQ 引入了发送者确认机制。模仿了协议中已存在的消费者确认机制。

事务机制

事务 能解决消息发送方和 RabbitMQ 之间消息确认的问题,只有消息成功被 RabbitMQ 接收,事务才能提交成功,否则便在捕获异常之后进行事务回滚,同时可以进行消息重发。

事务命令

事务机制命令指行流程:

  1. 客户端使用 Tx.Select 命令开启事务,
  2. 服务器回传 Tx.Select-Ok指令,
  3. 再通过Basic.Publish 命令发送消息,
  4. 再使用 Tx.Commit命令提交消息,若提交成功,
  5. 服务器返回 Tx.Commit-Ok指令。
  6. 如果异常,客户端捕获异常并发送 Tx.Roolback 命令,
  7. 服务器回滚消息成功,返回 Tx.Roolback-Ok指令。

事务方法

通道事务机制有三个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public interface Channel extends ShutdownNotifier, AutoCloseable {
//.....其它省略.....

/**
* 此通道开启事务模式
*/
Tx.SelectOk txSelect() throws IOException;

/**
* 此道通提交事务
*/
Tx.CommitOk txCommit() throws IOException;

/**
* 此通道事务回滚
*/
Tx.RollbackOk txRollback() throws IOException;

//.....其它省略.....
}

使用事务

发布者事务使用

  1. 通过 channel.txSelect 方法开启事务后。
  2. 使用 channel.basicPublish 发送消息给 RabbitMQ 。
  3. 执行 channel.txCommit 提交事务
  • 示例代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    try {
    //开启事务
    channel.txSelect();
    //发送消息
    channel.basicPublish("exchangeName", "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, "msg".getBytes());
    //提交事务
    channel.txCommit();
    } catch (IOException e) {
    e.printStackTrace();
    //异常,事务回滚
    channel.txRollback();
    }
  • 在事务内发送多条消息

    开启事务后,循环发送多条消息,但每一条消息是独自提交,独自回滚的。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    //开启事务
    channel.txSelect();
    for (int i = 0; i < LOOP_TIMES; i++) {
    try {
    //发送消息
    channel.basicPublish("exchangeName", "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, ("msg" + i).getBytes());
    //提交事务
    channel.txCommit();
    } catch (IOException e) {
    e.printStackTrace();
    //异常,事务回滚
    channel.txRollback();
    }
    }

使用 事务机制 会严重降低 RabbitMQ 的性能,并使吞吐量降低了 250 倍,在 AMQP 协议层面上没有更好的办法,但 RabbitMQ 提供了一个改进方案,即发布者确认机制

发布者确认机制

事务机制在一条消息发送之后会使发送端阻塞,以等待 RabbitMQ 的回应,之后才能继续发送下一条消息。

发布者确认机制模仿的是 AMQP 0-9-1 标准中的消费者确认,其最大优势是确认是异步的,在等待返回消息确认之前仍可继续发送消息,相比事务机制,吞吐量大大提高。

确认机制

要开启 确认模式,生产者信道需要设置成 confirm 模式,即发送了 confirm.select 指令,broker 响应 confirm.select-ok 指令表示当前信息开启 confirm 模式成功。

1
2
3
4
// 信道调用 confirm
channel.confirmSelect();
// 方法
Confirm.SelectOk confirmSelect() throws IOException;

事务性通道无法进入确认模式,并且一旦通道处于确认模式,就无法使其变为事务性。

一旦通道开启确认模式后,broker 和 发送者都会对消息进行计数(在第一个 confirm.select 上,计数从 1 开始)。消息被投递到匹配的队列后,broker 就会在同一通道上回传一条 确认basic.ack指令) 消息给发布者,使得发布者知晓消息已经正确到达了目的地,broker 返回的确认消息包含消息的唯一 ID。如果消息和队列是持久化的,那么确认消息会在消息写入磁盘之后发出。

RabbitMQ 回传给生产者的确认消息中的 deliverTag包含了确认消息的序列号(唯一ID),还可以在确认消息中设置 multiple 参数,以指示所有消息已确认,表示ID消息之前的所有消息已确认。

1
2
3
4
// 调用
channel.basicAck(deliveryTag, multiple);
// 方法
void basicAck(long deliveryTag, boolean multiple) throws IOException;

发布者确认机制运用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
try {
//信道开启确认模式
channel.confirmSelect();
//发送消息
channel.basicPublish("exchange.name", "routing.key", null, "msg".getBytes());
//等到自上次调用以来发布的所有消息都被 broker 确认(ack)或 否定确认(nack)。
if (!channel.waitForConfirms()) {
// 如果是 nack 进入
System.out.println("Send message fail");
//...do something else....
}
} catch (InterruptedException e) {
e.printStackTrace();
}

如果是发布多条消息,只需将 channel.basicPublish()channel.waitForConfirms()包裹在循环里面即可,channel.confirmSelect()不需要包裹在循环内部。

否定确认

发布者确认机制最大的好处在于它是异步的,一旦发布一条消息,在等待信息返回确认的同时继续发送下一条消息,当消息最终得到确认之后,发布者可以通过回调方式来处理该确认消息。

在特殊情况下,当 Broker 无法成功处理消息,会发送一条 nackbasic.nack 指令),表示它无法处理这些消息并拒绝对此承担责任;此时,客户端可以选择重新发布消息,发布者同样可以在回调中处理该 nack 消息。

1
2
3
4
// 调用
channel.basicNack(deliveryTag, multiple, requeue);
// 方法
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

信道进入确认模式后,所有后续发布的消息都会被 acknack 一次,只能其一,RabbitMQ 不保证确认消息的时间(快慢)。

仅在负责队列的 Erlang 进程中发生内部错误时,才会传递 basic.nack

等待确认:waitForConfirms

channel.waitForConfirms 方法是接收 RabbitMQ 返回的确认,对于无参方法,其返回的条件是客户端收到 basic.ackbasic.nack 或者被中断。

1
2
3
4
5
6
7
8
9
10
11
// 等待确认, 如果不是确认模式调用,抛出 InterruptedException
boolean waitForConfirms() throws InterruptedException;

// 等待确认,增加了超时
boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;

// 等待确认, 如果任务消息RabbitMQ返回 Basic.Nack, 抛出 IOException
void waitForConfirmsOrDie() throws IOException, InterruptedException;

// 等待确认, 增加了超时, 如果任务消息RabbitMQ返回 Basic.Nack, 抛出 IOException
void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;

事务机制发布确认机制 确认消息能正确地发送到 RabbitMQ,具体是指消息被正确发送到 RabbitMQ 交换器,如果交换器没有匹配的队列,消息也会丢失。所以在使用这两种机制时要确保所涉及的交换器存在绑定的队列。更优的处理是,发送方要配合 mandatory参数或者备份交换器一起使用来提高消息传输的可靠性。

确认方式

  1. 同步确认

    每发送完一条消息后,就调用 channel.waitForConfirms 方法等待 Broker 返回确认消息,本质上就是串行确认方式。

    此方式的性能是最差的,略优先事务机制。但编程方式简单,不需要在客户端维护消息的 deliveryTag 及未确认的消息。

  2. 批量确认

    每发送完一批消息后,再调用 channel.waitForConfirms 方法等待 Broker 返回确认消息。basicAckchannel.basicNackmultiple 参数设置为true,开启多条消息确认。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    private static int BATCH_COUNT = 50;
    String msg = "msg";
    int msgCount = 0;
    try {
    channel.confirmSelect();
    while (true) {
    channel.basicPublish("exchange.name", "routing.key", null, msg.getBytes());
    //将发送出去的消息存入缓存中,缓存可以是一个 ArrayList 或者 BlockingQueue 之类
    if (++msgCount >= BATCH_COUNT) {
    msgCount = 0;
    try {
    if (channel.waitForConfirms()) {
    //将缓存中的消息清空
    continue;
    }
    //将缓存中的消息重发
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    } catch (IOException e) {
    e.printStackTrace();
    }

    在批量确认方式中,发布者需要定期或定量(达到多少条),亦或两者结合起来调用 channel.waitForConfirms()来等待 RabbitMQ 的确认返回。

    此方式存在个问题是,如果出现返回 basic.nack 或超时的情况,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且当消息经常丢失时,批量确认可能带来的不是性能升高,而是降低。

  3. 异步确认

    通过调用 addConfirmListener 方法注册回调,在 Broker 确认了一条或多条消息之后由客户端回调该方法。

    1
    2
    3
    4
    // 添加确认监听器
    void addConfirmListener(ConfirmListener listener);
    // 添加确认监听器, ack 回调, nack 回调
    ConfirmListener addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback);

    确认监听器:ConfirmListener

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    /**
    * 消息确认监听器
    * 实现此接口以便收到消息确认事件的通知
    * Acks 表示成功处理的消息; Nacks 表示代理丢失的消息.
    * 丢失的消息仍可能已传递给消费者, 但 broker 无法保证这一点
    */
    public interface ConfirmListener {
    // deliveryTag: 标记消息的唯一ID;
    // multiple: false 删除相应的一条, true 删除相应的多条
    void handleAck(long deliveryTag, boolean multiple)
    throws IOException;

    void handleNack(long deliveryTag, boolean multiple)
    throws IOException;
    }

    使用异步确认方式,发布者客户端需要维护一个 unconfirm 的消息号的集合,每发送一条消息,将该消息号加入到集合中。每当调用 ConfirmListener.handleAck 方法时,unconfirm 集合中删除掉相应的一条(multiple=false 时)或多条(multiple=true 时)记录。

    从程序运行效率上看,这个 unconfirm 最好采用有序 集合 SortedSet 的存储结构。事实上, Java 客户端 SDK 中的 waitForConfirms 方法也是通过 SortedSet 维护消息的序列号的。代码示例如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    SortedSet<Long> confirmSet = new TreeSet<>();
    //信道设置确认机制
    channel.confirmSelect();
    channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
    System.out.println("Ack, SeqNo:" + deliveryTag + ", multiple:" + multiple);
    if (multiple) {
    confirmSet.headSet(deliveryTag + 1).clear();
    } else {
    confirmSet.remove(deliveryTag);
    }
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
    System.out.println("NAck, SeqNo:" + deliveryTag + ", multiple:" + multiple);
    if (multiple) {
    confirmSet.headSet(deliveryTag + 1).clear();
    } else {
    confirmSet.remove(deliveryTag);
    }
    //注意,这里需要添加处理消息重发的场景
    }
    });

    while (true){
    long nextSeqNo = channel.getNextPublishSeqNo();
    channel.basicPublish("exchange.name", "routing.key", MessageProperties.PERSISTENT_TEXT_PLAIN, "msg".getBytes());
    confirmSet.add(nextSeqNo);
    }

    异步confirm 方式编程略有复杂,和 批量confirm 方式一样需要在客户端维护状态,但性能好很多。

何时确认发布

对于无法路由的消息,一旦交换器验证消息不会路由到任何队列(返回空列表),Broker 将发出确认。如果消息被发布为约束消息(mandatory=true:退回给发布者),则在basic.ack之前将basic.return发送到客户端,对于否定确认(basic.nack)也是如此。

对于可路由消息,当所有队列都接受了消息时,将发送basic.ack。 对于路由到持久队列持久消息,这意味着持久存储到磁盘成功后,发送basic.ack;对于镜像队列,这意味着所有镜像都已接受该消息后发送 basic.ack

持久消息的ACK延迟

将消息持久化到磁盘成功后,将发送持久消息的basic.ack。RabbitMQ 消息存储会在间隔时间(几百毫秒)后将消息分批保存到磁盘,以最大程度地减少fsync(2)调用的资数,队列空闲时。这意味着在恒定负载下,basic.ack的延迟可能会达到几百毫秒。

为了提高吞吐量,强烈建议应用程序异步处理确认(作为流)或发布批量消息并等待未完成的确认。客户端库之间对此的确切 API 有所不同。

发布确认顺序注意

在大多数情况下,RabbitMQ 将按照发布的顺序向发布者确认消息(这适用于在单个通道上发布的消息)。

但是,发布者确认是异步发出的,并且可以确认单个消息或一组消息。 发出确认的确切时间取决于消息的传递模式(持久还是 非持久)以及消息路由到的队列的属性。 也就是说,不同的消息可以被认为在不同的时间准备好确认。

这意味着,与各自的消息相比,确认可以以不同的顺序到达,在可能的情况下,应用程序不应依赖于确认的顺序

发布确认与传输保存

如果 RabbitMQ 节点在将消息写入磁盘之前失败,则可能会丢失持久消息。例如,考虑以下场景:

  1. 发布者发布一条持久化消息到持久化队列。
  2. 消费者从队列消费消息(注意,消息是持久的,队列是持久的),但没有确认。
  3. RabbitMQ 节点失败并重启。
  4. 消费者重连并开始消费消息。

此时,消费者端可以合理地假设该消息将再次传递。 情况并非如此:重新启动已导致 Broker 丢失消息,为了保证持久性,发布者应使用确认,如果发布者的通道处于确认模式,则发布者不会收到丢失消息的 ack(因为该消息尚未写入磁盘),发布者端可设置发布确认超时时间,在到达超时而没有收到确认消息,则重发消息或做相应的处理。

传递标签局限性

传递标签(Delivery tag)的长度为 64 位,因此最大值为 9223372036854754775807。由于传递标签是按通道划分的范围,因此实际上发行者或消费者不太可能会超过该值。

消费者确认

消费者可以通过 推模式拉模式 来获取并消费消息,当消息者处理完业务逻辑需要手动确认消息已被接收,这样 RabbitMQ 才能把当前消息从列表中移除。

如果消费者因为某些原因无法处理当前接收到的消息,可以通过 channel.basicNackchannel.basicReject来拒绝消息。

基本概念

根据定义,使用消息代理(如 RabbitMQ)的系统是分布式的。由于发送的协议方法(消息)不能保证到达消费者或消费者成功地处理,因此发布者和消费者都需要一种传递和处理确认的机制。RabbitMQ 支持的几个消息传递协议提供了这样的特性。本指南涵盖了 AMQP 0-9-1 中的特性,但在其他受支持的协议中,这一思想大体上是相同的。

从消费者到 RabbitMQ 的传递处理确认在消息传递协议中被称为确认。代理对发布者的确认是称为发布者确认(publisher confirms)的协议扩展。这两个特性都基于相同的思想,并受到 TCP 的启发。

它们对于从发布者RabbitMQ节点以及从 RabbitMQ节点消费者 的可靠交付都是必不可少的。换而言之,它们对于数据安全至关重要,因为应用程序与 RabbitMQ 节点一样负责数据安全。

消费者传递确认

当 RabbitMQ 将消息传递给消费者时,它需要知道何时考虑消息发送是成功的。 什么样的逻辑是最优的取决于系统,因此,它主要是一个应用程序决策。

在 AMQP 0-9-1 中,它是通过使用basic.consume方法注册消费者或使用basic.get方法按需获取消息的情况。

消息传递 ID

消息传递唯一标签(Delivery Tags):消费者注册时,RabbitMQ 将使用 basic.deliver方法传递(推送)消息。该方法会携带一个传递标签(delivery tags),作为通道上的传递的唯一标识(ID)。因此,传递标签的作用域是每个通道。

传递标签是单向增长的正整数,由客户端库表示。确认传递的客户端库方法将传递标签作为参数。

由于传递标识的作用域是每个通道,因此,必须在接收它们的同一通道上确认传递。在不同的通道上进行确认将导致 未知的传递标识 协议异常,并关闭该通道。

消费者确认模式

当节点将消息传递给使用者时,它必须决定该消息是否应该被使用者视为已处理(或至少已接收)。由于多种情况(客户端连接,消费者应用程序等)可能会失败,因此此决定是数据安全问题。

消息协议通常提供一种确认机制,该机制允许消费者确认向其连接的节点的传递。是否使用该机制由消费者订阅时决定。

根据所使用的确认模式,RabbitMQ 在消息发出(写入TCP套接字)后或在接收到显式(手动)客户端确认后,会立即认为消息已被成功传递。

手动发送的确认可以是ack的或nack的,并使用以下协议方法之一:

  • basic.ack 用于肯定确认,表示消息已成功接收并处理。
  • basic.nack 用于否定确认,表示消息处理失败。
  • basic.reject 用于否定确认,表示拒绝消息。

肯定的确认(ack)只是指示 RabbitMQ 将消息标记已传递,可以将其丢弃。 带有basic.reject的否定确认具有相同的效果。 区别主要在语义上:肯定的确认假定消息已成功处理,而否定的确认则表明未处理传递,但仍应删除。

自动确认模式下,消息被视为在发送后立即成功传递。此模式在较高的吞吐量(只要消费者可以跟上)之间进行权衡,从而会降低传递和消费者处理的安全性。此模式通常称为 fire-and-forget(即发即忘,一劳永逸)。 与手动确认模式不同,如果在成功传递之前关闭使用者的TCP 连接或通道,则服务器发送的消息将丢失。因此,自动消息确认应该被认为是不安全的,并且不适合所有工作负载。

使用自动确认模式时,另一个需要考虑的重要问题是消费者过载。手动确认模式通常与信道预取一起使用,该预取限制了通道上允许未完成确认的传递数量。但是,对于自动确认,从定义上讲,没有这种限制。因此,消费者的消息处理速度可能跟不上消息的投递速度,从而可能在内存持续积压,耗尽内存,或消费者进程被操作系统终止。

某些客户端库将应用 TCP 背压(停止从套接字读取,直到未处理的消息的积压量下降到某个限制之外)。因此,仅建议自动确认模式用于可以高效,稳定地处理传递的消费者。

ACK确认单条消息

用于传递确认的API方法通常公开为客户端库中某个通道上的操作。

Java客户端用户将使用Channel # basicAckChannel#basicNack分别执行basic.ackbasic.nack

消费者确认方法:

1
void basicAck(long deliveryTag, boolean multiple) throws IOException;

下面是一个Java客户端示例,显示出肯定的确认:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// this example assumes an existing channel instance

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// positively acknowledge a single delivery, the message will
// be discarded
channel.basicAck(deliveryTag, false);
}
});

ACK一次确认多条

可以批量手动确认以减少网络流量。这是通过将确认方法的multiple字段设置为 true来完成的。注意basic.reject历来没有该字段,这就是 RabbitMQ 引入 basic.nack 作为协议扩展的原因。

multiple设置为 true 时,RabbitMQ 将确认所有未完成的传递标签,包括确认中指定的标签。与其它所有与确认相关的内容一样,也是按通道划分的。

例如,假设在通道Ch上有未确认的交付标签5、6、78,当某个确认帧到达该通道时,delivery_tag 设置为 8multiple设置为 true,则将确认从 58 的所有标签 。 如果将multiple设置为false,则仍然无法确认交货 5、6 和 7。

下面示例开启批量确认,需将 Channel # basicAck中的multiple 设置为 true

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// this example assumes an existing channel instance

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// positively acknowledge all deliveries up to
// this delivery tag
channel.basicAck(deliveryTag, true);
}
});

NACK和重新排队

有时消费者无法立即传递的消息,但其他实例可能可以处理。在这种情况下,可能需要重新排队以让另一个消费者接收和处理它。 basic.rejectbasic.nack是用于此目的两种协议方法。

这些方法通常用于否定地确认传递(nack),这样的传递的消息可以被 Broker 丢弃或重新排队。此行为由重新排队(requeue )字段控制,当该字段为 true 时,代理将使用指定的传递标签重新排队传递(或多次传递)。

这两种方法通常在是客户端的通道上进行操作。Java 客户端用户将使用Channel#basicRejectChannel#basicNack分别执行basic.rejectbasic.nack

两个方法及参数:

1
2
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
void basicReject(long deliveryTag, boolean requeue) throws IOException;

拒绝并丢弃消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// this example assumes an existing channel instance

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// negatively acknowledge, the message will
// be discarded
channel.basicReject(deliveryTag, false);
}
});

拒绝但重新排队:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// this example assumes an existing channel instance

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// requeue the delivery
channel.basicReject(deliveryTag, true);
}
});

当消息被重新排队,将尽可能放置在其队列中的原始位置。否则,该消息将重新排队到更靠近队列头的位置(当多个消费者共享一个队列时,可能存在来自其他消费者的并发传递和确认,Broker 会删除确认的消息,重新排队的消息相当于前移)。

重新排队的消息可能立即准备好重新发送,具体取决于它们在队列中的位置以及活动的消费者使用的通道预取值。这意味着,如果所有消费者都因暂时情况而无法处理传递而重新排队,则他们将创建一个重新排队/重新传递循环,这样的循环在网络带宽和 CPU资源方面的代价可能很高。消费者实现可以跟踪重新传递的次数并永久拒绝消息(丢弃消息),或在延迟后安排重新排队。

使用basic.nack方法可以一次拒绝或重新排队多个消息。 这就是它与basic.reject区别的地方。 它接受一个附加参数:multiple。 下面是一个Java客户端示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// this example assumes an existing channel instance

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// requeue all unacknowledged deliveries up to
// this delivery tag
channel.basicNack(deliveryTag, true, true);
}
});

消息轮询分发

当 RabbitMQ 的队列有多个消费者时,队列收到的消息将以轮询(round-robin)的分发方式发送给消费者,每条消息只会发送给一个消费者,这种方式实现了最简单的负载均衡,非常适合扩展,并发情况下负载加重,只需创建更多的消费者实例来消费处理消息即可。

有时间轮询的分发机制也不是那么优雅。默认情况下,假如有 n 个消息者,那么 RabbitMQ 会将第 m 条消息分发给 第 m%n(取余的方式),RabbitMQ 不管消费者是否消费并已经确认(Basic.Ack)了消息。

假如某些消费者任务繁重,来不及消费那么多的消息,而某些消费者由于某些原因(如,业务逻辑简单,机器性能卓越)很快地处理完所分配的消息,进而进程空闲,这样就造成整体就用吞吐量的下降。

这里就可以启用通道预取机制(*channel.basicQos(int prefetchCount)*)了,限制信道上的消费者所能保持的最大未确认消息的数量。见下章节的 通道预取机制

通道预取机制

因为消息是异步发送(推送)给客户端的,所以在任何给定时刻,一个通道上通常会有多个消息在传输中。另外,来自客户端的手动确认本质上也是异步的。因此,存在一个未确认的传递标签滑动窗口,开发人员通常希望限制这个窗口的大小,以避免在消费者端出现无限制的缓冲区问题。

这是通过使用basic.qos方法设置预取计数值来完成的。该值定义通道上允许未确认的最大传递数。一旦数量达到配置的限制,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未完成的消息得到确认。

例如,假设在通道 Ch上有未确认的传递标签5、6、78,并且通道Ch的预取计数设置为4,RabbitMQ 将不会在 Ch上 推送任何传递,除非至少确认一个未完成的传递 。 当确认帧到达该通道,且 delivery_tag 设置为5(或6、7或8)时,RabbitMQ 将注意到并再发送另一条消息,一次确认多条消息将使多条消息可供发送。

值得重申的是,传递流程和手动客户确认流程完全是异步的。 因此,如果在已经有正在传递中同时更改了预取值,则会出现自然竞争的情况,并且在通道上可能会出现超过预取计数的未确认消息。

QoS 预取设置即使使用手动确认模式,也不会影响使用basic.getpull API)获取的消息(即 消息预取 对拉模式无效)。可以为通道或使用者配置 QoS 设置。 有关详细信息,请参见下面的消费者预取

消息顺序性

消息的顺序性是指消费者消费到的消息和发送者发布的消息的顺序是一致的。

使用了消息中间件的架构在消息处理上异步,要保存消息的顺序性是比较困难的,见上面 发布确认顺序注意 小节。

而通常所说的保障消息顺序是,是在不使用任何 RabbitMQ 高级特性,没有消息丢失,网络延迟或故障之类异常情况发生,且只有一个消费者的情况下,最好也只有一个生产者的情况下可以保证消息的顺序性。

如果有多个生产者同时发送消息,无法确定消息到达 Broker 的前后顺序,也就无法验证消息的顺序性。

消息顺序性被打破的情况:

  • 生产者启用了事务,消息发送后遇到异常进行了事务回滚,需要补偿发送。
  • 启用了发布确认,发生超时,中断或收到 RabbitMQ 的 Basic.Nack 命令,需要补偿发送。
  • 生产者发送消息设置了超时时间,也设置了死信队列,相当于一个延迟队列,消费的顺序必然与发送顺序一致。
  • 消息设置了优先级,消费者消费到的消息必然不是顺序的。

或者可理解为消息的顺序性保障是从存入队列之后开始的,而不是在发送的时候开始。

如果要保证消息的顺序性,需要业务方使用 RabbitMQ 做进一步的处理,比如在消息体内增加全局有序标识(类似 Sequence)来实现。

确认模式/预取/吞吐量

确认模式QoS 预取值对用户吞吐量有重大影响。通常,增加预取将提高向消费者传递消息的速度。自动确认模式可产生最佳的传送速度。但是,在两种情况下,已传递但尚未处理的消息的数量也会增加,从而增加了消费者的RAM消耗。

应谨慎使用具有无限预取功能的自动确认模式或手动确认模式。消费大量消息而未确认的消费者将导致他们所连接的节点上的内存消耗增加。 找到合适的预取值是一个反复尝试的问题,并且会因工作负载的不同而有所不同。 100300 范围内的值通常可提供最佳吞吐量,并且不会带来压跨消费者的重大风险。较高的价值经常会遇到 收益递减的规律

预取值为 1 是最保守的,这将显着降低吞吐量,特别是在消费者连接延迟较高的的环境中。 对于许多应用,较高的值将是合适的和最佳的。

消费失败/连接丢失

使用手动确认时,如果关闭了发生传递的通道(或连接),则会自动重新排列未确认的任何传递(消息)。 这包括客户端的 TCP 连接丢失,消费者应用程序(进程)故障和通道级协议异常(如下所述)。

注意检查到不可用的客户端是需要一段时间的。

由于这种行为,消费者必须做好处理重新传递的准备,否则在实现时必须考虑幂等性。 重新交付将具有特殊的布尔属性redeliver,由 RabbitMQ 设置为 true。 对于首次交付,它将设置为 false。 请注意,消费者可以接收到先前传递给另一个消费者的消息。

重复确认/未知标签

如果客户端多次确认同一传递标签(delivery tag),RabbitMQ 将导致通道错误。例如 PRECONDITION_FAILED - unknown delivery tag 100。如果使用未知传递标签,则会引发同一渠道异常。

Broker 将抱怨未知的交付标签的另一种场景是,尝试在与接收传递的通道不同的通道上确认(无论是 ack 还是 nack)。 必须在同一通道上确认传递。

消费者预取

消费者预取信道预取机制 的扩展。

AMQP 0-9-1 指定basic.qos方法,用于限制通道(或连接)上未确认消息的数量(即 预取计数,prefetch count)。

不幸的是,通道不是这方面的理想范围,因为一个通道可能会消费来自多个队列的消息,所以通道和队列需要针对发送的每个消息相互协调,以确保它们不会超出限制。但在单台计算机上很慢,且在群集中使用时非常慢。

此外,对于许多用途,更自然的做法是指定适用于每个消费者的预取计数。因此,RabbitMQ 重新定义了 basic.qos 方法中全局标志global的含义:

global AMQP 0-9-1 中的预取计数 prefetchCount RabbitMQ 中的预取计数 prefetchCount
false 信道上的所有消费者之间共享 作用于信首上的每个新消费者
true 连接上的所有消费者之间共享 信道上的所有消费者之间共享

注意:在大多数 API 中,global 标志默认值是 false

RabbitMQ 中的通道预取有三个带不同参的方法可调用:

1
2
3
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
void basicQos(int prefetchCount, boolean global) throws IOException;
void basicQos(int prefetchCount) throws IOException;
  • prefetchSize:允许通道上未确认消息的大小,以字节为单位,0 表示不限制。
  • prefetchCount:允许通道上未确认消息的预取个数,0 表示不限制。

单一消费者

下面 Java 基本示例:一次最多接收 10 条未确认的消息

1
2
3
4
Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue", false, consumer);

独立消费者

下面示例:在同一通道上启动两个消费者,每个消费者将独立地一次最多接收 10 条未确认的消息。

1
2
3
4
5
6
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

多消费者共享限制

AMQP 0-9-1规范没有解释如果多次使用不同的global值调用basic.qos会发生什么。

RabbitMQ 将此解释为这两个预取限制应彼此独立执行;只有在未达到对未确认消息的限制时,消费者才会接收新消息(即接收的未确认消息数量必须小于预取限制)。

示例:

1
2
3
4
5
6
7
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true); // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

这两个消费者之间将只会有 15 条未确认的消息(整个信道上的),每个消费者最多有 10 条未确认的消息。

由于在通道和队列之间进行协调以强制执行全局限制,因此这种方式会增加 RabbitMQ 负载。如无特殊需要,最好只使用 global 为 false 的设置,这也是默认设置。

消费者优先级

消费者优先级可以确保高优先级的消费者处于活跃状态时接收消息,而只有当高优先级的消费者阻塞时,消息才发送给较低优先级的消费者。

通常,连接到队列的活动消费者会以循环方式从队列接收消息。 当使用消费者优先级时,如果存在多个具有相同高优先级的活动消费者,则会循环传递消息。

活跃消费者

上述段落提到了消费者处于活跃状态阻塞状态。在任何时候,任何给定的消费者都会处于其中一个状态,活跃状态的消费者是无需等待即可接收消息的消费者。

如果消费者无法接收消息,则该消费者会被阻塞。因为其通道在发出basic.qos之后已达到未确认消息的最大数量,或者只是因为网络拥塞。

因此,对于每个队列,活跃者定义至少满足以下三个条件之一:

  1. 没有活跃的消费者
  2. 队列是空的
  3. 队列正忙于传递消息给消费者。

注意,消费者每秒可以在活动状态阻塞状态之间进行多次切换。 因此,难以通过管理插件或rabbitmqctl显示消费者是处于活动状态还是阻塞状态。

当使用消费者优先级时,可以期望优先级最高的消费者可以接收所有消息,直到它们被阻塞为止,此时优先级较低的使用者将开始接收某些消息。重要的是要理解RabbitMQ仍将优先传递消息,如果有一个活跃的低优先级消费者准备就绪,它就不会等待高优先级阻塞的消旨者变成非阻塞。

使用消费者优先级

basic.consume方法中的x-priority参数设置为整数值,未指定值的使用者的优先级为 0 。数字越大,优先级越高,可以使用正数和负数。

示例:创建一个优先级为 10 的新消费者

1
2
3
4
5
Channel channel = ...;
Consumer consumer = ...;
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-priority", 10);
channel.basicConsume("my-queue", false, args, consumer);

消费者退出通知

消费者退出概述

当某个信道正在从队列中消费时,有多种原因可能会导致消费停止。

其中之一是,如果客户端在同一通道上发出basic.cancel,这将导致消费者被取消,服务器将以basic.cancel-ok进行回复。其他事件,例如,队列正在被删除,或者在集群场景中,队列所在的节点发生故障,将导致取消消费者,但不会通知客户端信道,这通常是没有帮助的。

为解决这类问题,RabbitMQ 引入了一个扩展,如果发生意外的消费被被取消,Broker 将向客户端发送 basic.cancel。如果 Broker 从客户端收到 basic.cancel,则不会发送此消息。

默认情况下,AMQP 0-9-1 客户端不希望异步从 Broker 接收basic.cancel方法,因此要启用此行为,客户端必须在其client-properties中提供capabilities 表,其中包含一个键名为 consumer_cancel_notify,值为 true 的属性。详细参考 section on capabilities

RabbitMQ 支持的客户端默认情况下向 Broker 提供此能力,因此 Broker 将向客户端回调发送送异步basic.cancel方法。

例如,在 Java 客户端中,消费者(Consumer)接口有一个 handleCancel 回调,可以被子类DefaultConsumer类重写:

1
2
3
4
5
6
7
8
channel.queueDeclare(queue, false, true, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleCancel(String consumerTag) throws IOException {
// consumer has been cancelled unexpectedly
}
};
channel.basicConsume(queue, consumer);

对于已被意外取消(例如,由于队列删除)的消费者,客户端发送 basic.cancel并不是一个错误。

根据定义,在发出 basic.cancel 的客户端 与 发出异步通知的 Broker 之间可能存在竞争,在此情况下,Broker 在接收 basic.cancel 并正常返回 basic.cancel-ok 时不会出错。

消费者退出与镜像队列

当队列被删除或变得不可用时,将始终通知支持消费者退出通知的客户端。

当镜像队列故障转移时,消费者可以请求取消它们自身(请参阅镜像队列-mirrored queues 以了解为什么以及如何完成此操作)。

消息传输保障

消息可靠传输一般是业务系统接入消息中间件首要考虑的问题,一般消息中间件的消息传输保障分为三个层级:

  • At most once:最多一次。消息可能会丢失,但绝不会重复传输。
  • At least once:最少一次。消息绝不会丢失,但可能会重复传输。
  • Exactly once:恰好一次。每条消息肯定会被传输一次且仅传输一次。

RabbitMQ 支持其中的 最多一次,最少一次,其中最少一次发送实现需要考虑下以几个方面的内容:

  1. 消息生产者需要开启事务机制或者 publisher confirm 机制,以确保消息可以可靠地传输到 RabbitMQ 中。
  2. 消息生产者需要配合使用 mandatory 参数或者备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不会被丢弃。
  3. 消息和队列都需要进行持久化处理,以确保 RabbitMQ 服务器在遇到异常情况时不会造成消息丢失。
  4. 消费者在消费消息的同时需要将 autoAck 设置为 false,然后通过手动确认的方式去确认已经正在消费的消息,以避免消费端引起不必要的消息丢失。

恰好一次 是 RabbitMQ 目前无法保障的。一种情况,消费者端在消费完一条消息后向 RabbitMQ 发送确认命令,此时若因网强行断开或其它原因导致 RabbitMQ 没收到这条确认消息,客户端重新连接后还是会收到这条消息,就存在了重复消费;
或生产者使用 发布确认 机制时,发送完消息等待 RabbitMQ 返回确认通知,此时网络断开,生产者捕获异常,为了确保消息可靠性选择重新发送,这样 RabbitMQ 中就有两条同样的消息,消费者就会重复消费。

目前大多数主流的消息中间件都没有消息去重机制,也不保障 恰好一次。去重处理一般在业务层实现,比如引入 GUID 的概念。那么可能需要引入集中式缓存,必然会增加依赖复杂度。在实际生产环境中,业务方根据自身的业务特性进行去重。如,业务消息本身具备幂等性,或借助 Redis 等其它产品进行去重处理。

相关参考

  1. 官方:RabbitMQ Document
  2. 官方:Publisher Confirms and Consumer Acknowledgements
  3. 官方:Consumer Prefetch
  4. 官方:Consumer Priorities
  5. 官方:Consumer Cancellation Notifications
  6. 一文理解RabbitMQ原生版

MQ系列(九):RabbitMQ发布者确认,消费者(确认,预取,优先级,退出通知)

http://blog.gxitsky.com/2020/02/22/MQ-09-RabbitMQ-publisher-consumer-ack/

作者

光星

发布于

2020-02-22

更新于

2022-06-07

许可协议

评论