MQ 消息中间件基本都支持延时消息或消息过期处理相关功能,可以使用此功能来实现消息队列。
使用 MQ 实现延时队列的基本逻辑,是将消息发送到普通队列,让消息自动过期被转发路由到死信队列,消费者订阅死信队列。
MQ实现延时队列
RabbitMQ实现延时队列
RabbitMQ 支持消息过期和死信列队,可以借助此功能实现延时队列。
基本逻辑是,将消息发送到普通队列,让消息自动过期被转发路由到死信队列,消费者订阅死信队列。
RabbitMQ 实现延时队列有两种方式:
在 RabbitMQ 可以对 Queue 设置 x-expires
过期时间或者对 message
设置超时时间x-message-ttl
。
注意:延时相同的消息要扔到同一个队列中,对于每一个延时要建立一个与之对应的队列,这是因为 RabbitMQ 的过期检测是惰性检测的,同样可以保证消息处理的顺序性。
使用 RabbitMQ 的 rabbitmq-delayed-message-exchange
插件来实现延时队列。
RocketMQ实现延时队列
RocketMQ 在发送延时消息时,是先把消息按照延迟时间段发送到指定的队列中(把延时时间段相同的消息放到同一个队列中,保证了消息过期能被及时顺序地处理,可以让同一个队列中消息延时时间是相同的,整个 RocketMQ 中延时消息时按照递增顺序排序,保证信息处理的先后顺序性)。
然后通过一个定时器来轮询处理这些队列里的信息,判断是否到期。对于到期的消息会发送到相应的处理队列中,进行业务处理。
注意 :目前 RocketMQ 只支持特定的延时时间段,1s,5s,10s,…2h,不能支持任意时间段的延时设置。
Kafka实现延时队
Kafka 基于时间轮算法自定义了一个用于实现延迟功能的定时器(SystemTimer
),Kafka中的时间轮(TimingWheel
)是一个存储定时任务的环形队列,可以进行相关的延时队列设置。
Kafka 这种消息队列存储来实现延时功能,每个队列的时间都需要创建一个单独的 topic(如: Q1-1s, Q1-2s..)。这种设计在延时时间比较固定的场景下问题不太大,但如果是延时时间变化比较大会导致 topic 数目过多,会把磁盘从顺序读写会变成随机读写从导致性能衰减,同时也会带来其他类似重启或者恢复时间过长的问题。
TTL+DLX实现
RabbitMQ 的过期时间和死信队列,可参考 RabbitMQ(四):过期时间,死信队列,延迟队列,优先队列,持久化。
基于 RabbitMQ 的 TTL + DLX 实现延迟队列的基本逻辑是:
- 生产者发送消息到普通队列(这里称为缓冲队列),消息设置了过期时间,给缓存冲队列绑定死信交换机。
- 让缓冲队列中的消息自动过期,死信消息被转发到死信交换机。
- 死信交换机再将死信消息路由到死信队列。
- 消费都监听死信队列进行处理。
TTL
TTL(Time To Live):消息的存活时间,即消息在队列的保留时间,等同于过期时间,单位为毫秒。
当消息在存活期间内没被消费,就变为 死信(Dead Letter)。如果一个队列和消息都设置了过期时间,则取小的。
设置消息的 TTL 有两种方式:
设置了消息过期时间,还需要设置 死信交换机,过期的消息要被路由到 死信交换。
消息转为死信的三种形式:
- 消息被拒绝。通过调用
basic.reject
或 basic.nack
,并且设置的 requeue
参数为 false
。
- 消息设置了 TTL 而过期。
- 消息进入了一条已达最大长度的队列。
DLX
Dead Letter Exchange 实际就是一种普通的 exchange
,可使用任何交换器类型,使用常规声明即可。
为一个队列设置死信交换器,在声明队列时指定 x-dead-letter-exchange
参数选项。
声明死信交换器:
1 2 3 4 5 6 7
| channel.exchangeDeclare("some.exchange.name", "direct");
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);
|
上面代码,如果 myqueue
队列存在死信,将被重新转发给死信交换器 some.exchange.name
再被路由到另一个队列(死信队列),开发者可以监听这个队列进行处理。
注意:在消息需要被死信时,死信交换器应该存在;如果不存在,则消息将被静默删除。
注意:相同过期时间的消息应发送到相同的队列,这样可以保证消息过期能被及时处理;队列消费是先进先出(FIFO),如果不同过期时间的消息存在同一个队列里,因为消息过期是隋性处理(即是在投递到消费者前判断的),如果当前队列消息积压严重,或队列头的消息过期时间远大于后面消息的过期时间,则已过期的消息可能在队列中存活很长时间而没有被及时处理。
RabbitMQ 的延时队列插件不存在此问题,插件是把消息缓存在交换器并做延时检测,再把过期的消息投递到队列。
实现示例
基于 Spring Boot + spring-boot-starter-amqp 实现
添加依赖
Maven pom.xml 添加 Spring Amqp 依赖:
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
配置连接
properties 文件配置 RabbitMQ 连接:
1 2 3 4 5 6 7 8 9 10
| spring.rabbitmq.host=192.168.0.120 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123456 spring.rabbitmq.virtual-host=/ spring.rabbitmq.listener.type=simple spring.rabbitmq.listener.simple.acknowledge-mode=manual
|
注意,Spring AMQP 默认配置的 RabbitMQ 的监听器类型是 simple
,消费者确认模式默认是自动(AUTO
),确认会删除消息,关闭 channel
。如果这个时候再调用手动确认(`channel.basicAck),因为消费已被删除会报错的:
1
| Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
|
这时需要设置监听器的确认模式为手动(manual
):
1 2 3 4
| spring.rabbitmq.listener.type=simple spring.rabbitmq.listener.simple.acknowledge-mode=manual
|
声明交换机和队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
| import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
import java.util.HashMap; import java.util.Map;
@Configuration public class RabbitMQConfig {
public static final String DELAY_EXCHANGE_NAME = "delay.exchange"; public static final String DELAY_QUEUE_NAME = "delay.queue"; public static final String DELAY_MESSAGE_ROUTE_KEY = "delay.route.key"; public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange"; public static final String DEAD_LETTER_QUEUE_NAME = "dead.letter.queue"; public static final String DEAD_LETTER_ROUTE_KEY = "dead.letter.route.key";
@Bean(name = "delayExchange") public DirectExchange delayExchange() { return new DirectExchange(DELAY_EXCHANGE_NAME); }
@Bean(name = "delayQueue") public Queue delayQueue() { Map<String, Object> args = new HashMap<>(2); args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME); args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTE_KEY); args.put("x-message-ttl", 10000); return QueueBuilder.durable(DELAY_QUEUE_NAME).withArguments(args).build(); }
@Bean(name = "deadLetterExchange") public DirectExchange deadLetterExchange() { return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME); }
@Bean(name = "deadLetterQueue") public Queue deadLetterQueue() { return QueueBuilder.nonDurable(DEAD_LETTER_QUEUE_NAME).build(); }
@Bean public Binding delayQueueBind(@Qualifier(value = "delayQueue") Queue delayQueue, @Qualifier(value = "delayExchange") DirectExchange delayExchange) { return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_MESSAGE_ROUTE_KEY); }
@Bean public Binding deadLetterQueueBind(@Qualifier(value = "deadLetterQueue") Queue deadLetterQueue, @Qualifier(value = "deadLetterExchange") DirectExchange deadLetterExchange) { return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(DEAD_LETTER_ROUTE_KEY); } }
|
创建生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Component public class MessageProduce {
@Autowired private RabbitTemplate rabbitTemplate;
public void sendDelayMessage(String msg) throws IOException { ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory(); Connection connection = connectionFactory.createConnection(); Channel channel = connection.createChannel(false); AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .expiration("6000") .build(); channel.basicPublish(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_MESSAGE_ROUTE_KEY, properties, msg.getBytes());
} }
|
创建消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| import com.rabbitmq.client.Channel; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
import java.io.IOException;
@Component public class MessageConsumer { private static final Logger logger = LogManager.getLogger(MessageConsumer.class);
@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE_NAME) public void receiveDelayMsg(Message message, Channel channel, String msgBody) { String msg = new String(message.getBody()); logger.info("receive message:{}, currentTime:{}, ", msgBody, System.currentTimeMillis() / 1000); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true); e.printStackTrace(); } } }
|
发送消息接口
1 2 3 4 5 6 7 8 9 10 11 12
| @RestController @RequestMapping("/rabbitmq") public class RabbitMQMsgController {
@Autowired private MessageProduce messageProduce;
@RequestMapping("/send") public void sendDelayMsg(String msg) { messageProduce.sendDelayMessage(msg + ":" + System.currentTimeMillis() / 1000); } }
|
收到的消息:
1 2 3 4 5
| receive message:hello1:1616512604, currentTime:1616512610, receive message:hello2:1616512606, currentTime:1616512612, receive message:hello3:1616512607, currentTime:1616512613, receive message:hello4:1616512609, currentTime:1616512615, receive message:hello5:1616512610, currentTime:1616512616,
|
延时队列插件
RabbitMQ Delayed Message Plugin:rabbitmq_delayed_message_exchange,该插件将延迟消息(或计划消息)添加到 RabbitMQ。
用户可以声明类型为x-delayed-message
的交换,然后发布带有自定义头为x-delay
的消息,该消息以毫秒为单位表示消息的延迟时间。该消息将在 x
延迟毫秒后被投递到相应的队列。
安装插件
查看插件列表:rabbitmq-plugins list
默认是没有安装 rabbitmq_delayed_message_exchange
插件的。
进入 RabbitMQ 的插件存放目录:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.14/plugins
下载 rabbitmq_delayed_message_exchange
插件,注意版本号的匹配
1
| wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.9/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez
|
启用插件
1
| rabbitmq-plugins enable rabbitmq_delayed_message_exchange
|
具体使用可参考官方文档,或 RabbitMQ(四):过期时间,死信队列,延迟队列,优先队列,持久化 > 延时队列,及下面实现示例。
实现示例
声明交换机和队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
import java.util.HashMap; import java.util.Map;
@Configuration public class RabbitMQConfig {
public static final String X_DELAYED_EXCHANGE_NAME = "x-delayed-exchange"; public static final String X_DELAYED_EXCHANGE_TYPE = "x-delayed-message"; public static final String X_DELAYED_QUEUE_NAME = "x-delayed-queue"; public static final String X_DELAYED_ROUTE_KEY = "x-delayed-route-key";
@Bean(name = "xDelayedQueue") public Queue xDelayedQueue() { return QueueBuilder.nonDurable(X_DELAYED_QUEUE_NAME).build(); }
@Bean public CustomExchange customExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(X_DELAYED_EXCHANGE_NAME, X_DELAYED_EXCHANGE_TYPE, true, false, args); }
@Bean public Binding bindingNotify(@Qualifier("xDelayedQueue") Queue queue, @Qualifier("customExchange") CustomExchange customExchange) { return BindingBuilder.bind(queue).to(customExchange).with(X_DELAYED_ROUTE_KEY).noargs(); }
}
|
创建生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.connection.Connection; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component;
import java.io.IOException; import java.nio.charset.Charset; import java.util.HashMap;
@Component public class MessageProduce {
@Autowired private RabbitTemplate rabbitTemplate;
public void xDelayedMessage(String msg) throws IOException {
rabbitTemplate.convertAndSend(RabbitMQConfig.X_DELAYED_EXCHANGE_NAME, RabbitMQConfig.X_DELAYED_ROUTE_KEY, msg, message -> { message.getMessageProperties().setDelay(6000);
return message; }); } }
|
创建消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| import com.rabbitmq.client.Channel; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class MessageConsumer { private static final Logger logger = LogManager.getLogger(MessageConsumer.class);
@RabbitListener(queues = RabbitMQConfig.X_DELAYED_QUEUE_NAME) public void xDelayedListener(Message message, Channel channel, String msgBody) { String msg = new String(message.getBody()); logger.info("receive message:{}, currentTime:{}, ", msgBody, System.currentTimeMillis() / 1000);
}
}
|
启动服务
启动服务,查看 RabbitMQ 的 Web 管理平台的交换器,存在一个类型为 x-delayed-message
的交换器。
TTL+DLX与插件区别
- TTL+DLX:先进先出(FIFO),如果不同过期的消息存放在同一个队列,可能因为先进的消息未过期而导致后进的消息已过期而不被立即处理
- 延时插件:消息能被及时处理。
相关资料
相关参考
- 一文带你搞定RabbitMQ延迟队列:详细
- 实现一个延时队列:模拟 DelayQueue 实现自定义的延时对列,对理解 DelayQueue 实现原理非常有帮助。
- 有赞延迟队列设计:基于 Redis 实现,把定时任务和消费进行了拆分。
- 延时队列实现思路:Redis,RabbitMQ,Kafka,Netty,DelayQueue,没有示例代码。
- 定时任务实现几种方式:@schedule 注解,Timer & TimerTask,Quartz,ScheduleExecutorService。
- 美图延时队列实现-LMSTFY:基于 Redis 实现,LMSTFY Github地址。
- Redis实现消息队列:借助了 Redis 的 List 的 BLPOP 或 BRPOP 阻塞消费消息。
- Lua Guava-EventBus 实现延时队列,这个实现思路值得参考。
- 10种延迟任务实现方式:做了汇总,有示例代码,可参考。
- Redus 过期 Key 监听与发布订阅功能:有详情的代码示例参考。
- Spring Messaging with Redis:Spring 官方手册,基于 Redis 的 发布/订阅 来发送消息。
- Spring Messaging with RabbitMQ:Spring 官方手册,基于 RabbitMQ 的 发布/订阅 来发送消息。