延时队列之 Redis Sorted Set 实现
Redis 的 Sorted Set 数据类型是一个有序集合,不可重复集合。有序集合的每个元素都有一个用于排序的权重(score)属性。
此类型非常适合用于制作排行榜,同样可以借助权重(score)属性作为元素的延时时间来实现延时队列的功能。
Sorted Set
Redis 的 Sorted Set 数据类型是一个有序集合,不可重复集合。有序集合的每个元素都有一个用于排序的权重(score)属性。
此类型非常适合用于制作排行榜,同样可以借助权重(score)属性作为元素的延时时间来实现延时队列的功能。
相关命令
- zadd key score member:添加元素,设置权重分。
- *zrange key start stop [WITHSCORES]*:获取索引范围内的元素,按权重分从小到大排序。
- *zrevrange key start stop [WITHSCORES]*:获取索引范围内的元素,按权重分由大到小排序。
- *zrangebyscore key min max [WITHSCORES] [limit offset count]*:获取权重范围内的元素,按权重由大到小排序。
- *zrevrangebyscore key max min [WITHSCORES] [limit offset count]*:获取权重范围内的元素,按权重由小到大排序。
命令示例:新增一个电影评分排行榜
1 | 192.168.50.128:6379[7]> zadd ranking:film 1431 gold_brother 690 fantanfengbao 516 diezhongdie3 293 jianghuernv |
优缺点
优点
- 实现简单实用,Sorted Set 支持 score 排序。
- Redis 数据是内存操作,性能非常好。
- Redis 可以搭建集建,可以对键分片来提高处理速度和可靠性。
- Redis 持久化机制保证了消息的可靠性。
缺点
没有重试机制,异常重试和重试策略需要自己实现。
没有ACK机制,如果客户端在拿到数据并删除成功后,处理业务崩溃了,则这条任务就丢失了。
如果对消息可靠性要求较高,推荐使用 MQ 来实现。
实现示例
使用 zadd key score member 命令添加元素,设置 score 作为延迟时间。
如果延迟3分钟,就在当前时间上增加3分钟,轮询任务只处理 score 小于 当前时间的 Key。
使用 zrange key start stop [WITHSCORES] 根据索引范围获取元素,需要在业务层判断是否过期。
1
Set<ZSetOperations.TypedTuple<Object>> typedTuples = redisTemplate.opsForZSet().rangeWithScores(KEY, 0, 100);
或使用 zrangebyscore key min max [WITHSCORES] [limit offset count] 根据 score 范围获取元素,从小到大排序输出,取出的就是已经过期的元素。
1
Set<ZSetOperations.TypedTuple<Object>> typedTuples = redisTemplate.opsForZSet().rangeByScoreWithScores(KEY, 0, (System.currentTimeMillis() / 1000), 0, 100);
随应用启动开启一个线程从队列中获取任务:
1 | import com.alibaba.fastjson.JSON; |
使用定时任务
入口类添加 @EnableScheduling
注解启用 Spring 自带的定时任务
1 |
|
生产延时任务加入到延时队列:
1 | import com.delay.queue.jdkdelayqueue.MessageTask; |
并发优化
为防止并发问题,在查出任务后,要先删除任务,再处理任务,删除失败就不执行任务。
基于 redisTemplate 的先查后删除操作不是原子的,可能存在 T1,T2都拿到任务数据,但只能给删除成功的线程处理业务。
但这里仍有个小问题,一个任务被多个线程争抢,没抢到的线程就白取了一次任务,就浪费了。进一步优化。
使用 Lua 脚本,将 查询和删除放在脚本中执行以保证原子性,删除成功返回任务,否则返回空。
脚本示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14String luaScript = "local resultArray = redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'limit' , 0, 1);"
+ "if #resultArray > 0 then"
+ "if redis.call('zrem', KEYS[1], resultArray[1]) > 0 then"
+ "return resultArray[1];"
+ "else"
+ "return '';"
+ "end;"
+"else"
+"return '';"
+ "end;";
-- redisTemplate
Long releaseResult = redisTemplate.execute(luaScript, keyList, valueList);
-- jedis
jedis.eval(luaScript, ScriptOutputType.VALUE, new String[]{key}, String.valueOf(System.currentTimeMillis()));这段 Lua 脚本更多可以参考下 Redisson 的
RedissonDelayedQueue
的构造方法中的一段 Lua 脚本。
分片优化
所有任务放在一个 Sorted Set 里,当数据量非常大的话,应会影响写入和读取,可以将 KEY 进行 HASH 路由到多个集合。
网上提供了一种基于 Redis 的延时处理优化方案:
设计思路
- 将延迟的消息任务通过 HASH 算法路由至不同的 Redis Key 上,这样做有两大好处:
- 避免了当一个 KEY 在存储了较多的延时消息后,入队操作以及查询操作速度变慢的问题(两个操作的时间复杂度均为
O(logN)
)。 - 系统具有了更好的横向可扩展性,当数据量激增时,我们可以通过增加 Redis Key 的数量来快速的扩展整个系统,来抗住数据量的增长。
- 避免了当一个 KEY 在存储了较多的延时消息后,入队操作以及查询操作速度变慢的问题(两个操作的时间复杂度均为
- 每个 Redis Key 都对应建立一个处理进程,称为 Event 进程,通过
rangeWithScores
方法轮询 Key,查询是否有待处理的延迟消息。 - 所有的 Event 进程只负责分发消息,具体的业务逻辑通过一个额外的消息队列异步处理,这么做的好处有:
- 一,Event 进程只负责分发消息,那么其处理消息的速度就会非常快,就不太会出现因为业务逻辑复杂而导致消息堆积的情况。
- 二,采用一个额外的消息队列后,消息处理的可扩展性也会更好,我们可以通过增加消费者进程数量来扩展整个系统的消息处理能力。
- Event 进程采用
Zookeeper
选主单进程部署的方式,避免 Event 进程宕机后,Redis Key 中消息堆积的情况。一旦 Zookeeper 的 leader 主机宕机,Zookeeper 会自动选择新的 leader 主机来处理 Redis Key 中的消息。
从上述的思路中可以看出,通过 Redis Sorted Set 实现延迟队列是一种理解起来较为直观,可以快速落地的方案。并且可以依赖 Redis 自身的持久化来实现消息任务的持久化,使用 Redis 集群来支持高并发和高可用,是一种不错的延迟队列的实现方案。
相关参考
- 实现一个延时队列:模拟 DelayQueue 实现自定义的延时对列,对理解 DelayQueue 实现原理非常有帮助。
- 有赞延迟队列设计:基于 Redis 实现,把定时任务和消费进行了拆分。
- 延时队列实现思路:Redis,RabbitMQ,Kafka,Netty,DelayQueue,没有示例代码。
- 定时任务实现几种方式:@schedule 注解,Timer & TimerTask,Quartz,ScheduleExecutorService。
- 美图延时队列实现-LMSTFY:基于 Redis 实现,LMSTFY Github地址。
- Redis实现消息队列:借助了 Redis 的 List 的 BLPOP 或 BRPOP 阻塞消费消息。
- Lua Guava-EventBus 实现延时队列,这个实现思路值得参考。
- 10种延迟任务实现方式:做了汇总,有示例代码,可参考。
- Redus 过期 Key 监听与发布订阅功能:有详情的代码示例参考。
- Spring Messaging with Redis:Spring 官方手册,基于 Redis 的 发布/订阅 来发送消息。
- Spring Messaging with RabbitMQ:Spring 官方手册,基于 RabbitMQ 的 发布/订阅 来发送消息。
延时队列之 Redis Sorted Set 实现
http://blog.gxitsky.com/2021/03/15/ArchitectureDesign-Distrbuted-Task-DelayQueue-Redis-Sorted-Set/