MQ系列(八):RabbitMQ过期时间,死信队列,延迟队列,优先队列,持久化

RabbitMQ 提供了在声明队列时或发送消息时给消息设置过期时间。死信队列为处理过期或不能正确路由的消息提供了驻留场所,可以防止消息丢失,便于分析无法消费的原因。

消息过期时间配合死信队列可以实现延迟队列的功能。优先队列中优先级高的消息被优先消费。RabbitMQ 持久化支持交换器持久化,队列持久化,消息持久化。持久化可以提高 RabbitMQ 的可靠性,防止服务器重启或宕机导致数据丢失。

消息回退

使用 channel.basicPublish发布消息,有一个方法包含了 mandatoryimmediate 这两个参数,都是 boolean 值,默认都为 false。这两个参数指定如何将不可达的消息返回给生产者。

1
2
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
throws IOException;

mandatory参数

mandatory:关注队列不存在时的处理

  • true:交换器根据路由键找不到匹配的队列时,RabbitMQ 会调用Basic.Return命令将消息返回给生产者。
  • false:交换器根据路由键找不到匹配的队列时,消息直接被丢弃。

生产者可以通过调用 channel.addReturnListener 方法来添加返回监听器接收 Broker 返回的消息。两个方法如下:

1
2
void addReturnListener(ReturnListener listener);
ReturnListener addReturnListener(ReturnCallback returnCallback);

示例:

1
2
3
4
5
6
7
8
channel.basicPublish("exchangeName", "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, "msg".getBytes());
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("返回的消息:" + msg);
}
});

immediate参数

immediate:关注消费者不存在时的处理

  • true:交换器在消息路由到队列时,发现队列上不存在任何消费者,则这条消息不入队列。若与路由键匹配的所有队列都没有消费者时,RabbitMQ 会调用Basic.Return命令将消息返回给生产者。

    注:上面的 所有队列 指可能存在 topic 类型交换器绑定了多个模式匹配的队列。

  • false:交换器在消息路由到队列时,所有匹配的队列都没有消费者,消息被丢弃。

注意:RabbitMQ 3.0 版本开始去掉了对 immediate 参数的支持,官方解释是该参数会影响镜像队列的性能,增加了代码的复杂性,建议采用 TTL DLX 的方法代替。

备份交换器

Alternate Exchanges(AE,备份交换器)。当交换器无法将消息路由到目的地时(即因为没有绑定到交换器的队列,或没有匹配的队列),希望让客户端进行处理。

生产者在发送消息时如果不设置 mandatory 参数(默认 false),在消息无法路由到队列的情况下将会丢失;若设置了参数(true),Broker 返回的消息在生产者端需要添加 ReturnListener监听器来处理,这样就变得复杂且不灵活。

若不想丢失消息,也不想在生产者端增加返回监听器,可以使用 备用交换器,将未被路由的消息交由备份交换器存储,在需要的时候处理。

在声明交换器(channel.exchangeDeclare)时通过添加键为 alternate-exchange ,值为备份交换器名的属性来指定备份交换器,也可以通过 策略 的方法来实现,当两者同时使用,则代码声明的优先级更高,会覆盖掉 Policy 的设置。

示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 属性 map
Map<String, Object> args = new HashMap<String, Object>();
// 添加备份交换器属性 alternate-exchange
args.put("alternate-exchange", "my-ae");
// 声明交换器,传入参数 map
channel.exchangeDeclare("my-direct", "direct", false, false, args);
// 声明备份交换器
channel.exchangeDeclare("my-ae", "fanout");
// 声明队列
channel.queueDeclare("routed");
// 队列绑定交换器
channel.queueBind("routed", "my-direct", "key1");
// 声明队列
channel.queueDeclare("unrouted");
// 队列绑定备份交换器
channel.queueBind("unrouted", "my-ae", "");

上面代码示例,当发送到 my-direct 交换器的消息不能被正确路由与之绑定的任何队列时,此时消息就被转发到 my-ae 交换器再被路由到 unrouted 队列中。生产者可以订阅绑定到备份交换器的队列来处理未被正确路由的消息。

备份交换器实际与普通的交换器没有太大的区别,类型建议设置为 fanout,因为忽略路由键,直接投递;若设置成 directtopic 也是可以的,但要注意消息路由键与绑定键的匹配,若交换器没有匹配到合适的队列,则消息丢失。

还可以给 备份交换器 设置备份交换器,组成一个备份交换器链,无法被正确路由的消息会在整个备份交换器链中流转直到找到匹配的队列,只是较少这么使用。

使用备份交换器,注意以下情况:

  • 设置的备份交换器不存在,消息丢弃,客户端和服务器不会有异常。
  • 若备份交换器没有绑定任何队列,消息丢弃,客户端和服务器不会有异常。
  • 若备份交换器没有任何匹配的队列,消息丢弃,客户端和服务器不会有异常。
  • 若备份交换器与 mandatory 参数一起使用,则 mandatory 参数无效。

过期时间

TTL(Time-To-Live),即存活时间,或过期时间。RabbitMQ 允许对 消息队列 设置 TTL。

消息过期时间

可以通过使用策略设置message-ttl参数或在队列声明时指定相同的参数来为给定队列设置消息 TTL。

设置消息的 TTL 有两种方式:

  • 在声明队列时设置 x-message-ttl 属性参数,对队列中所有的消息起效(有相同的过期时间)。
  • 在发送消息时设置expiration属性参数,只对当前发送的消息起效。

如果上面两种方式都存在,则 TTL 在两者之间选择较小的值为准。消息在队列中生存时间超过配置的 TTL 值时,就会死亡,消费者可能无法收到该消息。

注意:使用声明队列时设置 TTL,则路由到多个队列的消息的 TTL 可能并不相同,可能在不同的时间过期,或根本不会过期,一个队列中消息的过期不会影响其他队列中同一条消息的生存时间。

声明队列设置 TTL

1
2
3
4
Map<String, Object> args = new HashMap<String, Object>();
//单位毫秒,下面设置生成 60 秒
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);

发送消息时设置TTL

expiration属性:必须是字符串非负整数(>=0)。单位毫秒。

1
2
3
4
5
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("60000") //过期时间 60 秒,单位毫秒
.build();
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);

RabbitMQ 保证不会使用 basic.deliver 命令发送死亡的消息给消费者,或将死亡的信息包含在 basic.get-ok响应中(用于一次性提取操作),此外,服务器将尝试在基于 TTL 的到期时间或之后删除消息。

TTL 参数或策略的值必须为非负整数( >= 0),单位是毫秒。如果 TTL 设置为 0 时,则表示立即将消息投递给消费者,否则直接过期。这为 RabbitMQ 不支持立即发布标志提供了替代方案,但不会返回 basic.returns命令,并且如果设置了死信交换器,消息将被死信

注意事项

通过发送消息设置 TTL ,一旦消息过期将被丢弃;通过队列设置消息 TTL ,即使消息过期,也不会立即从队列中删除,而是在每条消息到达队列头部时(即将被投递给消费者),才会被实际丢弃(或入死信队列)。

消费者不会收到过期的消息,但要记住,消息过期和传递给消者之间可能存在自然的竞争情况,例如,在将消息写入套接字之后,但在到达消费者之前,消息可能过期。

队列过期时间

也可以给队列设置 TTL,当队列未被使用的存活时间超过 TTL 值,队列就会过期被自动删除(声明队列时开启了自动删除)。

未被使用指该队列没有任何消费者,最近未被重新声明,并且在到期时间段内未调用basic.get命令。

可以在声明队列(qeueue.declare)时设置 x-expires 参数 或 设置过期策略来给队列设置过期时间。控制队列在被自动删除之前可以闲置多长时间。

服务器保证队列过期后将被删除,但不保证将如何讯速删除,若是持久队列,服务器重启后的租期重新开始。

x-expires参数 或 expires策略设置的 TTL 值以 毫秒 为单位,且必须是一个正整数,不能为 0(消息 TTL 可以为 0)。

声明队列设置过期时间

1
2
3
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);

死信交换器

Dead Letter Exchange(DLX,死信交换器)。对于 RabbitMQ ,通常所讲死信队列包含了死信交换器的概念。

当消息变成死信之后,可以被重新发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列称之为 死信队列。通常有以下几种情况:

  • 消息被消费者使用 basic.rejectbasic.nack 命令拒绝,并且拒绝方法的 requeue参数设置为 false
  • 消息过期
  • 队列达到最大长度

死信交换器(DLX)是一个普通交换器,可使用任何交换器类型,使用常规声明。

注意:队列到期不会使其中的消息失效。

声明死信交换器

声明死信队列,在声明队列时设置 DLX 参数,也可以使用策略在服务器中定义 DLX,当两者都存在时,声明队列指定的参数优先级更高,会覆盖掉 Policy 的设置。

声明队列指定 DLX 参数

为一个队列设置死信交换器,在声明队列时指定 x-dead-letter-exchange 参数选项。

1
2
3
4
5
6
7
channel.exchangeDeclare("some.exchange.name", "direct");

Map<String, Object> args = new HashMap<String, Object>();
// 将声明的交换器指定为死信交换器
args.put("x-dead-letter-exchange", "some.exchange.name");
// 声明队列时传入参数
channel.queueDeclare("myqueue", false, false, false, args);

上面代码,如果 myqueue 队列存在死信,将被重新转发给死信交换器 some.exchange.name再被路由到另一个队列(死信队列),开发者可以监听这个队列进行处理。

注意:在消息需要被死信时,死信交换器应该存在;如果不存在,则消息将被静默删除。

路由死信消息

死信消息被路由到他们的死信交换器中,也可以指定在对消息进行死信时要使用的路由规则。如果未设置,则将使用消息自身的路由键。

1
args.put("x-dead-letter-routing-key", "some-routing-key");

示例:如果发布消息到带有路由键 foo 的交换器中,消息变成死信,则它将被发布到带有路由键 foo 的死信交换器中。如果队列一开始就声明了 x-dead-letter-routing-keybar,则该消息将被转发到带有路由键 bar 的死信交换器中。

重新发布死信消息到死信队列,内部启用了 发布者确认,因此,消息最终到达 死信队列(DLX路由目标)必须在将消息从原始队列中删除之前对其进行确认。即 发布(消息过期的队列)队列不会在死信队列确认收到消息之前删除消息(参才保证 确认 详情)。

注意:如果 Broker 不干将地关闭,则相同的消息可能会在原始队列或死信队列上重复。

注意:可能形成消息死信的循环。例如,当队列将消息死信到默认交换而不指定死信路由键时,可能会发生这种情况。如果死信消息两次到达同一队列,将被丢弃。

RabbitMQ DLX

死信消息分析

死信一条消息会修改消息头:

  • 交换器名会被替换为最新的死信交换器
  • 路由键会被替换为执行死信的队列指定的路由键
  • 如果上面都发生,则 CC 头将被删除
  • BCC 头将按照发送者选择的分发删除

死信处理过程会在每个名为x-death的死信消息头中添加一个数组。 该数组包含每次死信事件的元素,由一对{queue,reason}标识。 每个这样的元素都是一个包含以下几个字段的表:

  • queue:消息死信前所在的队列名
  • reason:死信的原因
  • time:死信的日期时间,是一个 64 位的 AMQP 0-9-1 时间戳
  • exchange:消息被发布到的交换器(如果消息多次死信,则会是一个死信交换器)
  • routing-key:消息被发布所带的路由键(包含 CC 键,但不包含 BCC
  • count:消息在此列队因此原因被死信的次数
  • original-expiration(如何消息通过发布设置 TTL):消息原始的expiration属性值。在死信过程中,expiration 属性会被移除是为了防止它被路由到任何队列再次出现过期。

如果有新的死信事件元数,将插入到 x-death数组的开头。如果 x-death已经包含具有相同队列和死信原因的元素,则其 count 字段值加 1,并将此元素移至数组开头。

reason 描述消息被死信可能的情况:

  • rejected:属性requeue设置为 false的消息被拒绝(即被拒绝且不能重新排队的消息)。
  • expired:消息的 TTL 过期了
  • maxlen:队列达到最大长度了

为第一个死信事件添加了三个顶级的头信息:

  • x-first-death-reason

  • x-first-death-queue

  • x-first-death-exchange

他们与原始死信事件中的 reason,queue,exchange 字段的值相同,一旦添加这些头就永不被修改。

注意:数组是按最近优先排序的,所以最近的死信将被记录到第一个元素中。

延迟队列

延迟队列概念

RabbitMQ 自身并没有直接支持延迟队列的功能,但可以通过 DLX 配合 TTL 可以实现延迟队列。

延迟队列 指其中的消息带有延迟时间的队列。即指定延迟时间消息才能被消费者消费。

延迟队列应用场景:

  1. 订单系统中,用户下单 30 分种后若未完成执行,则取消订单。

    订单系统将订单数据入库,同时发送一条带有 30 分钟延时(TTL)的消息到队列,超时则消息被转发到死信交换器并路由到死信队列,订单系统开启监听死信队列,收到消息则去做订单检查。

  2. 物联网场景,远程摇控家里浴缸 20 分钟后开始放水。

RabbitMQ 中的死信队列可以看作延迟队列。见上面 路由死信消息 小节的图, 假设某个业务需要延迟 10 秒的消息,生产者发送 TTL 值为 10000 的消息到常规交换器(exchange.normal)并路由到常规队列(queue.normal),这个常规队列设置了死信交换器(exchange.dlx),声明了一个死信队列(queue.dlx)绑定了死信交换器,消费者订阅此死信队列,收到的恰好是延迟 10 秒的消息。

在实际应用中,延迟列队会根据延迟时间的长度分为多个等级,一般分为 5s,10s,30s,1m,5m,10m,30m,1h 这几个维度,当然可以根据自身业务需要进行细化,选择不同延迟等级的延迟队列进行消费。

延时队列插件

RabbitMQ 其中一个强大的功能是支持插件,可以从 RabbitMQ 插件列表 查找各种满足需求的插件。

延时队列除了使用 DLX 配合 TTL 来实现,还可使用延时队列插件(rabbitmq/rabbitmq-delayed-message-exchange)。

  1. 启用插件

    1
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  2. 声明一个x-delayed-message类型的交换器

    1
    2
    3
    4
    5
    // ... elided code ...
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-delayed-type", "direct");
    channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
    // ... more code ...
  3. 发布消息,带扩展头x-delay 的属性

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    // ... elided code ...
    byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
    Map<String, Object> headers = new HashMap<String, Object>();
    headers.put("x-delay", 5000);
    AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
    channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);

    byte[] messageBodyBytes2 = "more delayed payload".getBytes("UTF-8");
    Map<String, Object> headers2 = new HashMap<String, Object>();
    headers2.put("x-delay", 1000);
    AMQP.BasicProperties.Builder props2 = new AMQP.BasicProperties.Builder().headers(headers2);
    channel.basicPublish("my-exchange", "", props2.build(), messageBodyBytes2);
    // ... more code ...

优先队列

RabbitMQ 从 3.5.0 版本开始,提供了优先级队列实现。优先级队列:即优先级高的消息优先被消费。

要声明优先级队列时,在声明队列时使用参数 x-max-priority ,该参数值应为 1 至 255 之间的正整数,指示队列支持的最大优先级,建议使用 1 到 10 之间。

声明优先级队列

1
2
3
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
channel.queueDeclare("my-priority-queue", true, false, false, args);

发布者可以使用 basic.propertiespriority 字段发布优先级消息。数字越大表示优先级越高。

1
2
3
4
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
props.priority(5);
props.build();
channel.basicPublish("exchange.priority", "rk.priority", props, "msg".getBytes());

AMQP 0-9-1规范对于优先级如何工作有点含糊。它说所有队列必须至少支持 2 个优先级,并且可以支持多达 10 个优先级。它没有定义如何处理没有优先级属性的消息。

与 AMQP 0-9-1 规范相反,RabbitMQ 队列默认情况下不支持优先级。 创建优先级队列时,可以根据开发人员的需要选择最大优先级。 选择值时,必须考虑以下几点:

每个队列的每个优先级级别都有一些内存和磁盘成本,还有额外的 CPU 成本,尤其是在消费的时候,因此最好不要创建大量的优先级别。

消息优先级字段 priority 被定义为无符号的字节(byte),因此在实践中优先级应在 0 到 255 之间。没有优先级属性 priority 的消息将被视为其优先级为 0,即普通消息有个默认优先级是 0。若消息优先级高于队列允许的最大优先级,消息将被视为以最大优先级发布

如果需要优先级队列,建议优先级别使用 1 到 10 之间的队列。若使用更多的优先级别将通过使用 Erlange 进程来消耗更多的 CPU 资源,运行时调度(Runtime scheduling)也将受到影响。

默认情况下,Broker 可能会在消费者回传确认之前向消费者发送大量的消息,仅受网络带宽的限制;为了保证消息被消费的优先级顺序,在消息者端可以手动确认模式使用 basic.qos命令设置预取消息为 1(),以限制可以随时传递的消息数量,从而使消息具有优先级,但也会影响吞吐量。

1
2
3
boolean autoAck = false;
channel.basicQos(1);
channel.basicConsume(queueName, autoAck, new DefaultConsumer(channel) {...});

注意:如果消费者的消费速度大于生产者的速度,即队列中的消息不会积压,消息不会花费任何时间在队列中等待。在这种情况下,优先级队列就不会有任何机会对消息进行优先排序,消息的优先级也就没有意义。

持久化

持久化可以提高 RabbitMQ 的可靠性,以防服务器异常(重启,关闭,宕机)时数据丢失,以最大程度保证数据可恢复。

RabbitMQ 的持久化分为三个部分:交换器持久化,队列持久化,消息持久化

交换器持久化

交换器持久化:在声明交换器时将 durable属性设置为 true来启用持久化(保存交换器元数据到磁盘)。

持久化交换器在 broker 重启后会被重新声明恢复。若未持久化交换器,交换器的元数据会丢失。对于一个长期使用的交换器,建议置为持久化。

声明持久化交换器

1
2
3
4
//调用
channel.exchangeDeclare("exchange.name", "route.key", true);
//方法
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;

队列持久化

队列持久化:在声明队列时,通过设置参数 durabletrue来开启持久化。

1
2
3
4
5
//调用
channel.queueDeclare("queue.name", true, false, true, args);
//方法
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive,
boolean autoDelete, Map<String, Object> arguments) throws IOException;

持久队列将被保存到磁盘,因此在 broker 重启后队列仍然存在。非持久队列,在 broker 重启后,队列和队列中的消息都会丢失,也并不是所有场景都需要使用持久队列。

队列的持久性不会使该队列中的消息持久化。如果 broker 关闭后恢复,则在 broker 启动期间将重新声明持久队列(相同的队列元数据),但仅持久化消息将被恢复。

要想在 Broker 重启后能够恢复消息,队列持久化和消息持久化都必须开启。

消息持久化

消息持久化:持久消息将被保存到磁盘,只到它被正确消费,broker 才会将其从内存/磁盘中删除,要想 broker 重启后恢复消息,还必须将消息所在的队列持久化。

在发布消息时,将消息的投递模式(BasicProperties 中的 deliveryMode 属性)设置为 2。其中有个 MessageProperties 常量工具类,提供了一些默认了消息属性的方法。

设置消息的传输模式:

1
2
3
4
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
props.deliveryMode(2);//持久化
props.build();
channel.basicPublish("exchange.name", "route.key", props, "msg".getBytes());

MessageProperties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/** Empty basic properties, with only deliveryMode set to 2 (persistent) */
public static final BasicProperties MINIMAL_PERSISTENT_BASIC =
new BasicProperties(null, null, null, 2,
null, null, null, null,
null, null, null, null,
null, null);
/** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */
public static final BasicProperties PERSISTENT_TEXT_PLAIN =
new BasicProperties("text/plain",
null,
null,
2,
0, null, null, null,
null, null, null, null,
null, null);

将交换器,队列,消息都持久化后,并不能百分之百保证数据不丢失。

  • 如果消者端将 autoAck参数设置为 true,即开启了自动确认,消费者在接收到消信后或在处理消息中宕机,数据也算丢失。

    如果需要严格的消费者确认,将 autoAck 设置为 false,并进行手动确认。

  • 持久化到磁盘,RabbitMQ并不会为每条消息都进行同步写入磁盘,依赖操作系统的的处理,可能仅保存在操作系统的缓存之中,还未写入磁盘,当系统故障时,可能导致数据丢失。

    RabbitMQ 引入了镜像队列机制,相当于配置了副本,当主节挂了,自动切换到从节点,保证了高可用。

    在实际生产环境中,关键业务队列一般都会设置镜像队列。

还可以在发送端引入事务机制发送方确认机制 来保证消息已正确发送并存储到 RabbitMQ 中。

相关参考

  1. 官方:RabbitMQ Document
  2. 官方:Alternate Exchanges(备份交换器)
  3. 官方:Time-To-Live and Expiration(生存时间和过期)
  4. 官方:Dead Letter Exchanges(死信交换器)
  5. 官方:Queue Length Limits(队列长度限制)
  6. 官方:Priority Queue Support(优先队列支持)
  7. 官方:Persistence Configuration(持久化配置)
  8. 官方:Community Plugins(社区插件列表)
  9. Github > RabbitMQ/rabbitmq-delayed-message-exchange

MQ系列(八):RabbitMQ过期时间,死信队列,延迟队列,优先队列,持久化

http://blog.gxitsky.com/2020/02/20/MQ-08-RabbitMQ-ttl-dead-delay-per-prio/

作者

光星

发布于

2020-02-20

更新于

2022-06-07

许可协议

评论