延时队列 之 MQ实现方案 及 RabbitMQ TTL+DLX 或插件实现

MQ 消息中间件基本都支持延时消息消息过期处理相关功能,可以使用此功能来实现消息队列。

使用 MQ 实现延时队列的基本逻辑,是将消息发送到普通队列,让消息自动过期被转发路由到死信队列,消费者订阅死信队列。

MQ实现延时队列

RabbitMQ实现延时队列

RabbitMQ 支持消息过期和死信列队,可以借助此功能实现延时队列。

基本逻辑是,将消息发送到普通队列,让消息自动过期被转发路由到死信队列,消费者订阅死信队列。

RabbitMQ 实现延时队列有两种方式:

  • 在 RabbitMQ 可以对 Queue 设置 x-expires 过期时间或者对 message设置超时时间x-message-ttl

    注意:延时相同的消息要扔到同一个队列中,对于每一个延时要建立一个与之对应的队列,这是因为 RabbitMQ 的过期检测是惰性检测的,同样可以保证消息处理的顺序性。

  • 使用 RabbitMQ 的 rabbitmq-delayed-message-exchange 插件来实现延时队列。

RocketMQ实现延时队列

RocketMQ 在发送延时消息时,是先把消息按照延迟时间段发送到指定的队列中(把延时时间段相同的消息放到同一个队列中,保证了消息过期能被及时顺序地处理,可以让同一个队列中消息延时时间是相同的,整个 RocketMQ 中延时消息时按照递增顺序排序,保证信息处理的先后顺序性)。

然后通过一个定时器来轮询处理这些队列里的信息,判断是否到期。对于到期的消息会发送到相应的处理队列中,进行业务处理。

注意 :目前 RocketMQ 只支持特定的延时时间段,1s,5s,10s,…2h,不能支持任意时间段的延时设置。

Kafka实现延时队

Kafka 基于时间轮算法自定义了一个用于实现延迟功能的定时器(SystemTimer),Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,可以进行相关的延时队列设置。

Kafka 这种消息队列存储来实现延时功能,每个队列的时间都需要创建一个单独的 topic(如: Q1-1s, Q1-2s..)。这种设计在延时时间比较固定的场景下问题不太大,但如果是延时时间变化比较大会导致 topic 数目过多,会把磁盘从顺序读写会变成随机读写从导致性能衰减,同时也会带来其他类似重启或者恢复时间过长的问题。

TTL+DLX实现

RabbitMQ 的过期时间死信队列,可参考 RabbitMQ(四):过期时间,死信队列,延迟队列,优先队列,持久化

基于 RabbitMQ 的 TTL + DLX 实现延迟队列的基本逻辑是:

  1. 生产者发送消息到普通队列(这里称为缓冲队列),消息设置了过期时间,给缓存冲队列绑定死信交换机。
  2. 让缓冲队列中的消息自动过期,死信消息被转发到死信交换机。
  3. 死信交换机再将死信消息路由到死信队列。
  4. 消费都监听死信队列进行处理。

延时队列逻辑

TTL

TTL(Time To Live):消息的存活时间,即消息在队列的保留时间,等同于过期时间,单位为毫秒

当消息在存活期间内没被消费,就变为 死信(Dead Letter)。如果一个队列消息都设置了过期时间,则取小的。

设置消息的 TTL 有两种方式:

  • 在声明队列时设置 x-message-ttl 属性参数,对队列中所有的消息起效(有相同的过期时间)。

    1
    2
    3
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-message-ttl", 6000);
    channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
  • 在发送消息时设置expiration属性参数,只对当前发送的消息起效。

    1
    2
    3
    4
    byte[] messageBodyBytes = "Hello, world!".getBytes();
    AMQP.BasicProperties properties = new AMQP.BasicProperties();
    properties.setExpiration("60000");
    channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);

设置了消息过期时间,还需要设置 死信交换机,过期的消息要被路由到 死信交换

消息转为死信的三种形式:

  • 消息被拒绝。通过调用 basic.rejectbasic.nack,并且设置的 requeue参数为 false
  • 消息设置了 TTL 而过期。
  • 消息进入了一条已达最大长度的队列。

DLX

Dead Letter Exchange 实际就是一种普通的 exchange,可使用任何交换器类型,使用常规声明即可。

为一个队列设置死信交换器,在声明队列时指定 x-dead-letter-exchange 参数选项。

声明死信交换器

1
2
3
4
5
6
7
channel.exchangeDeclare("some.exchange.name", "direct");

Map<String, Object> args = new HashMap<String, Object>();
// 将声明的交换器指定为死信交换器
args.put("x-dead-letter-exchange", "some.exchange.name");
// 声明队列时传入参数
channel.queueDeclare("myqueue", false, false, false, args);

上面代码,如果 myqueue 队列存在死信,将被重新转发给死信交换器 some.exchange.name再被路由到另一个队列(死信队列),开发者可以监听这个队列进行处理。

注意:在消息需要被死信时,死信交换器应该存在;如果不存在,则消息将被静默删除。

注意:相同过期时间的消息应发送到相同的队列,这样可以保证消息过期能被及时处理;队列消费是先进先出(FIFO),如果不同过期时间的消息存在同一个队列里,因为消息过期是隋性处理(即是在投递到消费者前判断的),如果当前队列消息积压严重,或队列头的消息过期时间远大于后面消息的过期时间,则已过期的消息可能在队列中存活很长时间而没有被及时处理。

RabbitMQ 的延时队列插件不存在此问题,插件是把消息缓存在交换器并做延时检测,再把过期的消息投递到队列。

实现示例

基于 Spring Boot + spring-boot-starter-amqp 实现

添加依赖

Maven pom.xml 添加 Spring Amqp 依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置连接

properties 文件配置 RabbitMQ 连接:

1
2
3
4
5
6
7
8
9
10
#===========Rabbit MQ=====================
spring.rabbitmq.host=192.168.0.120
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#spring.rabbitmq.listener.type=direct
#spring.rabbitmq.listener.direct.acknowledge-mode=manual

注意,Spring AMQP 默认配置的 RabbitMQ 的监听器类型是 simple,消费者确认模式默认是自动(AUTO),确认会删除消息,关闭 channel。如果这个时候再调用手动确认(`channel.basicAck),因为消费已被删除会报错的:

1
Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)

这时需要设置监听器的确认模式为手动(manual):

1
2
3
4
spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#spring.rabbitmq.listener.type=direct
#spring.rabbitmq.listener.direct.acknowledge-mode=manual

声明交换机和队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
* @desc: 延时队列 TTL+DLX
*/
@Configuration
public class RabbitMQConfig {

// 延时
public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
public static final String DELAY_QUEUE_NAME = "delay.queue";
public static final String DELAY_MESSAGE_ROUTE_KEY = "delay.route.key";
// 死信
public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";
public static final String DEAD_LETTER_QUEUE_NAME = "dead.letter.queue";
public static final String DEAD_LETTER_ROUTE_KEY = "dead.letter.route.key";

/**
* 声明延时交换器
*
* @return
*/
@Bean(name = "delayExchange")
public DirectExchange delayExchange() {
return new DirectExchange(DELAY_EXCHANGE_NAME);
}

/**
* 声明延时队列
*
* @return
*/
@Bean(name = "delayQueue")
public Queue delayQueue() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
// x-dead-letter-routing-key 声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTE_KEY);
// x-message-ttl 声明队列的TTL, 单位 毫秒
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(DELAY_QUEUE_NAME).withArguments(args).build();
}

/**
* 声明死信交换器
*
* @return
*/
@Bean(name = "deadLetterExchange")
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
}

/**
* 声明死信队列
*
* @return
*/
@Bean(name = "deadLetterQueue")
public Queue deadLetterQueue() {
return QueueBuilder.nonDurable(DEAD_LETTER_QUEUE_NAME).build();
}

/**
* 延时队列绑定延时交换器
*
* @param delayExchange
* @param delayExchange
* @return
*/
@Bean
public Binding delayQueueBind(@Qualifier(value = "delayQueue") Queue delayQueue,
@Qualifier(value = "delayExchange") DirectExchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_MESSAGE_ROUTE_KEY);
}

/**
* 死信队列绑定死信交换器
*
* @param deadLetterQueue
* @param deadLetterExchange
* @return
*/
@Bean
public Binding deadLetterQueueBind(@Qualifier(value = "deadLetterQueue") Queue deadLetterQueue,
@Qualifier(value = "deadLetterExchange") DirectExchange deadLetterExchange) {
return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(DEAD_LETTER_ROUTE_KEY);
}
}

创建生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Component
public class MessageProduce {

@Autowired
private RabbitTemplate rabbitTemplate;

public void sendDelayMessage(String msg) throws IOException {
ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory();
Connection connection = connectionFactory.createConnection();
Channel channel = connection.createChannel(false);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("6000") //设置消息过期时间 6 秒,单位毫秒
.build();
channel.basicPublish(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_MESSAGE_ROUTE_KEY, properties, msg.getBytes());
// rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_MESSAGE_ROUTE_KEY, msg);
}
}

创建消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import com.rabbitmq.client.Channel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
* @desc 消费者
*/
@Component
public class MessageConsumer {
private static final Logger logger = LogManager.getLogger(MessageConsumer.class);

/**
* 监听消息
*
* @param message
* @param channel
* @param msgBody
*/
@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE_NAME)
public void receiveDelayMsg(Message message, Channel channel, String msgBody) {
String msg = new String(message.getBody());
logger.info("receive message:{}, currentTime:{}, ", msgBody, System.currentTimeMillis() / 1000);
try {
//.....处理业务......
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
e.printStackTrace();
}
}
}

发送消息接口

1
2
3
4
5
6
7
8
9
10
11
12
@RestController
@RequestMapping("/rabbitmq")
public class RabbitMQMsgController {

@Autowired
private MessageProduce messageProduce;

@RequestMapping("/send")
public void sendDelayMsg(String msg) {
messageProduce.sendDelayMessage(msg + ":" + System.currentTimeMillis() / 1000);
}
}

收到的消息:

1
2
3
4
5
receive message:hello1:1616512604, currentTime:1616512610, 
receive message:hello2:1616512606, currentTime:1616512612,
receive message:hello3:1616512607, currentTime:1616512613,
receive message:hello4:1616512609, currentTime:1616512615,
receive message:hello5:1616512610, currentTime:1616512616,

延时队列插件

RabbitMQ Delayed Message Plugin:rabbitmq_delayed_message_exchange,该插件将延迟消息(或计划消息)添加到 RabbitMQ。

用户可以声明类型为x-delayed-message的交换,然后发布带有自定义头为x-delay的消息,该消息以毫秒为单位表示消息的延迟时间。该消息将在 x 延迟毫秒后被投递到相应的队列。

安装插件

  1. 查看插件列表:rabbitmq-plugins list

    默认是没有安装 rabbitmq_delayed_message_exchange 插件的。

  2. 进入 RabbitMQ 的插件存放目录:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.14/plugins

  3. 下载 rabbitmq_delayed_message_exchange 插件,注意版本号的匹配

    1
    wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.9/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez
  4. 启用插件

    1
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  5. 具体使用可参考官方文档,或 RabbitMQ(四):过期时间,死信队列,延迟队列,优先队列,持久化 > 延时队列,及下面实现示例。

实现示例

声明交换机和队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
* @desc: 延时队列 TTL+DLX
*/
@Configuration
public class RabbitMQConfig {

/*-------RabbitMQ Plugin:rabbitmq_delayed_message_exchange ----------------*/
public static final String X_DELAYED_EXCHANGE_NAME = "x-delayed-exchange";
public static final String X_DELAYED_EXCHANGE_TYPE = "x-delayed-message";
public static final String X_DELAYED_QUEUE_NAME = "x-delayed-queue";
public static final String X_DELAYED_ROUTE_KEY = "x-delayed-route-key";


/**
* 声明延时队列
*
* @return
*/
@Bean(name = "xDelayedQueue")
public Queue xDelayedQueue() {
return QueueBuilder.nonDurable(X_DELAYED_QUEUE_NAME).build();
}

/**
* 声明延时交换器
*
* @return
*/
@Bean
public CustomExchange customExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(X_DELAYED_EXCHANGE_NAME, X_DELAYED_EXCHANGE_TYPE, true, false, args);
}

/**
* 队列与交换器绑定
*
* @param queue
* @param customExchange
* @return
*/
@Bean
public Binding bindingNotify(@Qualifier("xDelayedQueue") Queue queue,
@Qualifier("customExchange") CustomExchange customExchange) {
return BindingBuilder.bind(queue).to(customExchange).with(X_DELAYED_ROUTE_KEY).noargs();
}

}

创建生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.HashMap;

/**
* @desc
*/
@Component
public class MessageProduce {

@Autowired
private RabbitTemplate rabbitTemplate;

public void xDelayedMessage(String msg) throws IOException {

// channel.basicPublish
/*
Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);;
HashMap<String, Object> headers = new HashMap<>();
headers.put("x-delay", 5000);
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(headers).build();
channel.basicPublish(RabbitMQConfig.X_DELAYED_EXCHANGE_NAME, RabbitMQConfig.X_DELAYED_ROUTE_KEY, props, msg.getBytes(Charset.defaultCharset()));
*/

// rabbitTemplate.convertAndSend
rabbitTemplate.convertAndSend(RabbitMQConfig.X_DELAYED_EXCHANGE_NAME, RabbitMQConfig.X_DELAYED_ROUTE_KEY, msg, message -> {
// 下面两都效果一样
message.getMessageProperties().setDelay(6000);
// message.getMessageProperties().setHeader("x-delay", 5000);
return message;
});
}
}

创建消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import com.rabbitmq.client.Channel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @desc 消费者
*/
@Component
public class MessageConsumer {
private static final Logger logger = LogManager.getLogger(MessageConsumer.class);

/**
* 监听消息
*
* @param message
* @param channel
* @param msgBody
*/
@RabbitListener(queues = RabbitMQConfig.X_DELAYED_QUEUE_NAME)
public void xDelayedListener(Message message, Channel channel, String msgBody) {
String msg = new String(message.getBody());
logger.info("receive message:{}, currentTime:{}, ", msgBody, System.currentTimeMillis() / 1000);
// try {
// //.....处理业务......
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// } catch (IOException e) {
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
// e.printStackTrace();
// }
}

}

启动服务

启动服务,查看 RabbitMQ 的 Web 管理平台的交换器,存在一个类型为 x-delayed-message 的交换器。

rabbitmq-plugin-delayed-exchange

TTL+DLX与插件区别

  • TTL+DLX:先进先出(FIFO),如果不同过期的消息存放在同一个队列,可能因为先进的消息未过期而导致后进的消息已过期而不被立即处理
  • 延时插件:消息能被及时处理。

相关资料

相关参考

  1. 一文带你搞定RabbitMQ延迟队列:详细
  2. 实现一个延时队列:模拟 DelayQueue 实现自定义的延时对列,对理解 DelayQueue 实现原理非常有帮助。
  3. 有赞延迟队列设计:基于 Redis 实现,把定时任务和消费进行了拆分。
  4. 延时队列实现思路:Redis,RabbitMQ,Kafka,Netty,DelayQueue,没有示例代码。
  5. 定时任务实现几种方式:@schedule 注解,Timer & TimerTask,Quartz,ScheduleExecutorService。
  6. 美图延时队列实现-LMSTFY:基于 Redis 实现,LMSTFY Github地址
  7. Redis实现消息队列:借助了 Redis 的 List 的 BLPOP 或 BRPOP 阻塞消费消息。
  8. Lua Guava-EventBus 实现延时队列,这个实现思路值得参考。
  9. 10种延迟任务实现方式:做了汇总,有示例代码,可参考。
  10. Redus 过期 Key 监听与发布订阅功能:有详情的代码示例参考。
  11. Spring Messaging with Redis:Spring 官方手册,基于 Redis 的 发布/订阅 来发送消息。
  12. Spring Messaging with RabbitMQ:Spring 官方手册,基于 RabbitMQ 的 发布/订阅 来发送消息。

延时队列 之 MQ实现方案 及 RabbitMQ TTL+DLX 或插件实现

http://blog.gxitsky.com/2021/03/24/ArchitectureDesign-Distrbuted-Task-DelayQueue-RabbitMQ/

作者

光星

发布于

2021-03-24

更新于

2022-07-12

许可协议

评论