延时队列 这 Redisson DelayedQueue 实现
基于 Redisson 的最佳实践应用 Redisson 实现了延时队列(Delayed Queue )功能,可以直接拿来使用。
Redisson DelayedQueue
基于 Redisson 的最佳实践应用 Redisson 实现了延时队列(Delayed Queue )功能,可以直接拿来使用。
涉及三个队列,一个发布订阅通道,获取延时元素是通过阻塞队列实现的,发布订阅通道没看出具体的作用。
- 阻塞队列 List:KEY =
queueName
,执行BLPOP
命令从左端弹出元素,右端插入元素。 - 有序集合 Sorted Set:KEY =
redisson_delay_queue_timeout:{queueName}
,score 是元素的过期时间,按从小到大排序,
过期时间小于当前时间表示已过期,删除集合中的元素,并将元素添加到阻塞队列。 - 普通集合 List:KEY =
redisson_delay_queue:{DelayMessage}
,按顺序从右端添加元素,元素过期会被删除。 - 发布/订阅通道:
redisson_delay_queue_channel
,目前没看出具体用途。
应用示例
基于 Spring Boot + redisson-spring-boot-starter 实现示例
添加依赖
1 | <dependency> |
Redisson 的 starter 包自动配置默认注册了 RedisTemplate Bean,RedissonConnectionFactory Bean,RedissonClient Bean。
延时队列管理
1 | import org.redisson.api.RBlockingQueue; |
延时队列监听器
监听器抽象接口:
1 | /** |
具体监听器实现:
1 | import org.apache.logging.log4j.LogManager; |
添加延时任务
1 | package com.delay.queue.controller; |
快速发送两条请求:
发送请求1:http://localhost:8080/delayed/message?delay=20&value=AAAA
发送请求2:http://localhost:8080/delayed/message?delay=5&value=BBBB
监听结果
1 | BBBB |
源码分析
Redisson 入队操作使用是的 Lua 脚本,Redis 底层解析是会开启事务来执行脚本中的多条命令,可以确保原子性。
1 | public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelayedQueue<V> { |
RedissonDelayedQueue构造方法
添加元素,会先创建 RedissonDelayedQueue
实例,执行构造方法中的 Lua 脚本,目的是对已过期但未处理的任务进行处理。
Lua 脚本
1 | -- 从 Sorted Set 按score从小到大排序,拿出小于当前时间的 100 条数据(已过期的数据) |
脚本参数示例
1 | KEY "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" |
脚本转为执行的Redis命令示例
1 | # 从 Sorted Set 按score从小到大排序,拿出小于当前时间的 100 条数据 |
offerAsync添加元素
执行添加元素操作,执行 Lua 脚本,在 Redis 解析 Lua 脚本转换为 Redis 命令底层是开启事务执行多条命令。
Lua 脚本
1 | local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]); |
脚本参数示例
1 | KEYS "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" "redisson_delay_queue_channel:{DelayMessage}" |
脚本转为执行的Reids命令示例
添加元素的 offerAsync 方法中的 Lua 脚本最终执行的 Redis 命令如下:
1 | # 添加元素添加到 KEY 为 redisson_delay_queue_timeout:{DelayMessage} 的 Sorted Set 有序队列中 |
延时任务定时轮询
Redisson 的定时轮询是基于 Netty 的 时间轮(HashedWheelTimeout
) 实现的。轮询到过期的元素,则将期加入到阻塞队列,删除 Sorted Set 和 List 中的元素。
Netty 的 时间轮后续再开一篇文章详细描述。
底层逻辑
Redisson 的延时队列是基于 Redis 的 Sorted Set
和 List
+ Blocking List
实现的,底层是基于 Redis 的事务和相关命令实现。参考 【附文一】。
使用 redis-client
登录 Redis 服务,执行 monitor
命令开启监控,输出打印见 【附文二】。
添加元素
执行
MULTI
开启事务。执行
zdd
命令将元素加入到一个 KEY 为redisson_delay_queue_timeout:{queueName}
的Sorted Set
,权重
score
是当前时间 + 延迟时间
,Sorted Set 是一个按score
排序的有序集合。执行
rpush
命令将元素将入到一个 KEY 为redisson_delay_queue:{queueName}
的普通List
。该 List 是按插入的顺序保存元素。
执行
EXEC
命令提交。
消费元素
客户端随应该启动就运行,
Redisson 客户端会订阅一个 KEY 为
queueName
的Blocking List
(阻塞队列 )。使用
zrange
命令获取Sorted Set
中的第一个元素,如果小于等于当前时间,表示已过期。执行
MULTI
开启事务。拿到过期的元素,执行
rpush
命令,将元素插入到Blocking List
(阻塞队列)最右端。执行
lrem
命令从 KEY 为redisson_delay_queue:{queueName}
的普通 List 中删除元素。执行
zrem
命令从 KEY 为redisson_delay_queue_timeout:{DelayMessage}
的Sorted Set
中删除元素。执行
EXEC
命令提交。注意:这里是先删除成功后,再将元素加入到阻塞队列,这样可以防止并发执行的情况。
最后执行
LPOP
命令 从Blocking List
(阻塞队列)最左端弹出元素完成消费。如果阻塞队列中元素未被消费则不会消失,直到消费完。
相关参考
- 实现一个延时队列:模拟 DelayQueue 实现自定义的延时对列,对理解 DelayQueue 实现原理非常有帮助。
- 有赞延迟队列设计:基于 Redis 实现,把定时任务和消费进行了拆分。
- 延时队列实现思路:Redis,RabbitMQ,Kafka,Netty,DelayQueue,没有示例代码。
- 定时任务实现几种方式:@schedule 注解,Timer & TimerTask,Quartz,ScheduleExecutorService。
- 美图延时队列实现-LMSTFY:基于 Redis 实现,LMSTFY Github地址。
- Redis实现消息队列:借助了 Redis 的 List 的 BLPOP 或 BRPOP 阻塞消费消息。
- Lua Guava-EventBus 实现延时队列,这个实现思路值得参考。
- 10种延迟任务实现方式:做了汇总,有示例代码,可参考。
- Redus 过期 Key 监听与发布订阅功能:有详情的代码示例参考。
- Spring Messaging with Redis:Spring 官方手册,基于 Redis 的 发布/订阅 来发送消息。
- Spring Messaging with RabbitMQ:Spring 官方手册,基于 RabbitMQ 的 发布/订阅 来发送消息。
附文一
Redisson 延时队列执行 Redis 命令的 AOF 日志
1 | $6 |
附文二
使用 redis-client
登录 Redis 服务,执行 monitor
监控。
添加两个元素到延时队列,Redis 监控命令
1 | 1615736035.403628 [4 120.229.16.239:34559] "SUBSCRIBE" "redisson_delay_queue_channel:{DelayMessage}" |
过期监听处理的Redis命令
1 | 1615736443.935211 [4 120.229.16.239:34795] "EVAL" "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); if #expiredValues > 0 then for i, v in ipairs(expiredValues) do local randomId, value = struct.unpack('dLc0', v);redis.call('rpush', KEYS[1], value);redis.call('lrem', KEYS[3], 1, v);end; redis.call('zrem', KEYS[2], unpack(expiredValues));end; local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); if v[1] ~= nil then return v[2]; end return nil;" "3" "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" "1615736445889" "100" |
延时队列 这 Redisson DelayedQueue 实现
http://blog.gxitsky.com/2021/03/15/ArchitectureDesign-Distrbuted-Task-DelayQueue-Redisson/