延时队列之 Redis Sorted Set 实现

Redis 的 Sorted Set 数据类型是一个有序集合,不可重复集合。有序集合的每个元素都有一个用于排序的权重(score)属性。

此类型非常适合用于制作排行榜,同样可以借助权重(score)属性作为元素的延时时间来实现延时队列的功能。

Sorted Set

Redis 的 Sorted Set 数据类型是一个有序集合,不可重复集合。有序集合的每个元素都有一个用于排序的权重(score)属性。

此类型非常适合用于制作排行榜,同样可以借助权重(score)属性作为元素的延时时间来实现延时队列的功能。

相关命令

  • zadd key score member:添加元素,设置权重分。
  • *zrange key start stop [WITHSCORES]*:获取索引范围内的元素,按权重分从小到大排序。
  • *zrevrange key start stop [WITHSCORES]*:获取索引范围内的元素,按权重分由大到小排序。
  • *zrangebyscore key min max [WITHSCORES] [limit offset count]*:获取权重范围内的元素,按权重由大到小排序。
  • *zrevrangebyscore key max min [WITHSCORES] [limit offset count]*:获取权重范围内的元素,按权重由小到大排序。

命令示例:新增一个电影评分排行榜

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
192.168.50.128:6379[7]> zadd ranking:film 1431 gold_brother 690 fantanfengbao 516 diezhongdie3 293 jianghuernv
(integer) 4
192.168.50.128:6379[7]> zrange ranking:film 0 -1 #按索引范围获取,0到-1表示所有,从小到大输出
1) "jianghuernv"
2) "diezhongdie3"
3) "fantanfengbao"
4) "gold_brother"
192.168.50.128:6379[7]> zrange ranking:film 0 -1 withscores
1) "jianghuernv"
2) "293"
3) "diezhongdie3"
4) "516"
5) "fantanfengbao"
6) "690"
7) "gold_brother"
8) "1431"
192.168.50.128:6379[7]> zrange ranking:film 0 2 withscores
1) "jianghuernv"
2) "293"
3) "diezhongdie3"
4) "516"
5) "fantanfengbao"
6) "690"
192.168.50.128:6379[7]> zrevrange ranking:film 0 -1 withscores #按索引范围获取,0到-1表示所有,从大到小输出
1) "gold_brother"
2) "1431"
3) "fantanfengbao"
4) "690"
5) "diezhongdie3"
6) "516"
7) "jianghuernv"
8) "293"
192.168.50.128:6379[7]> zrevrange ranking:film 0 2 withscores
1) "gold_brother"
2) "1431"
3) "fantanfengbao"
4) "690"
5) "diezhongdie3"
6) "516"
192.168.50.128:6379[7]> zrangebyscore ranking:film 300 700 withscores #按权重范围获取,从小到大
1) "diezhongdie3"
2) "516"
3) "fantanfengbao"
4) "690"
192.168.50.128:6379[7]> zrevrangebyscore ranking:film 700 300 withscores #按权重范围获取,从大到小
1) "fantanfengbao"
2) "690"
3) "diezhongdie3"
4) "516"

优缺点

优点

  • 实现简单实用,Sorted Set 支持 score 排序。
  • Redis 数据是内存操作,性能非常好。
  • Redis 可以搭建集建,可以对键分片来提高处理速度和可靠性。
  • Redis 持久化机制保证了消息的可靠性。

缺点

  • 没有重试机制,异常重试和重试策略需要自己实现。

  • 没有ACK机制,如果客户端在拿到数据并删除成功后,处理业务崩溃了,则这条任务就丢失了。

    如果对消息可靠性要求较高,推荐使用 MQ 来实现。

实现示例

  • 使用 zadd key score member 命令添加元素,设置 score 作为延迟时间。

    如果延迟3分钟,就在当前时间上增加3分钟,轮询任务只处理 score 小于 当前时间的 Key。

  • 使用 zrange key start stop [WITHSCORES] 根据索引范围获取元素,需要在业务层判断是否过期。

    1
    Set<ZSetOperations.TypedTuple<Object>> typedTuples = redisTemplate.opsForZSet().rangeWithScores(KEY, 0, 100);
  • 或使用 zrangebyscore key min max [WITHSCORES] [limit offset count] 根据 score 范围获取元素,从小到大排序输出,取出的就是已经过期的元素。

    1
    Set<ZSetOperations.TypedTuple<Object>> typedTuples = redisTemplate.opsForZSet().rangeByScoreWithScores(KEY, 0, (System.currentTimeMillis() / 1000), 0, 100);

随应用启动开启一个线程从队列中获取任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import com.alibaba.fastjson.JSON;
import com.delay.queue.model.MessageTask;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Component;

import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @desc Sorted Set 实现延时队列
*/
@Component
public class DelayQueueManager implements CommandLineRunner {
private static final Logger logger = LogManager.getLogger(DelayQueueManager.class);

private static final String DELAY_MESSAGE_QUEUE_NAME = "MessageQueue:Task";

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Override
public void run(String... args) throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(this::executeTask);
}

/**
* 延时任务执行线程
*/
private void executeTask() {
while (true) {
// 这里演示用 while(true), 生产最好使用定时任务
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

// 方式一, 基于 rangeByScoreWithScores 方法
long currentSecond = (System.currentTimeMillis() / 1000);
Set<ZSetOperations.TypedTuple<Object>> typedTuples = redisTemplate.opsForZSet().rangeByScoreWithScores(DELAY_MESSAGE_QUEUE_NAME, 0, currentSecond, 0, 100);
if (CollectionUtils.isEmpty(typedTuples)) {
logger.info("-------------延时队列为空-------------");
continue;
}
for (ZSetOperations.TypedTuple<Object> typedTuple : typedTuples) {
MessageTask messageTask = JSON.parseObject(JSON.toJSONString(typedTuple.getValue()), MessageTask.class);
// 防止并发问题,执行先删除,再处理业务,删除成功返回1,并发已被删除的任务会删除失败就不执行任务
long result = redisTemplate.opsForZSet().remove("MessageQueue:Task", messageTask);
if (result == 1) {
System.out.println("处理业务:" + JSON.toJSONString(messageTask));
}
}


// 方式二,基于 rangeWithScores 方法,但在业务层需要判断是否过期
/* Set<ZSetOperations.TypedTuple<Object>> typedTuples = redisTemplate.opsForZSet().rangeWithScores(DELAY_MESSAGE_QUEUE_NAME, 0, 100);
if (CollectionUtils.isEmpty(typedTuples)) {
logger.info("-------------延时队列为空-------------");
continue;
}

for (ZSetOperations.TypedTuple<Object> typedTuple : typedTuples) {
long score = typedTuple.getScore().longValue();
long currentSecond = (System.currentTimeMillis() / 1000);
logger.info("score:{}, current:{}", score, currentSecond);
MessageTask messageTask = JSON.parseObject(JSON.toJSONString(typedTuple.getValue()), MessageTask.class);
if (score < currentSecond) {
// 防止并发问题,执行先删除,再处理业务,删除成功返回1,并发已被删除的任务会删除失败就不执行任务
long result = redisTemplate.opsForZSet().remove("MessageQueue:Task", messageTask);
if (result == 1) {
System.out.println("处理业务:" + JSON.toJSONString(messageTask));
}
} else {
// 未过期,直接跳出循环,此元素未过期,后续其它元素肯定未过期
break;
}
}*/
}
}
}

使用定时任务

入口类添加 @EnableScheduling注解启用 Spring 自带的定时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Component
public class SendMsgScheduledTask {
private static final Logger logger = LogManager.getLogger(SendMsgScheduledTask.class);

/**
* @desc: 发送预问诊消息
* [0 0/2 * * * ?]每2分种执行一次
* [0/5 * * * * ?]每5秒扫许一次
*/
@Scheduled(cron = "0 0/1 * * * ?")
public void sendMessageProcess() {
System.out.println("处理业务:" + JSON.toJSONString(messageTask));
// 与上面 DelayQueueManager 中处理逻辑一样
}
}

生产延时任务加入到延时队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import com.delay.queue.jdkdelayqueue.MessageTask;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class DelayQueueApplicationTests {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Test
void contextLoads() {

MessageTask messageTask = null;
long second = System.currentTimeMillis() / 1000;
// 插入500 条数据
for (int i = 0; i < 500; i++) {
// 延时i秒
long score = second + i;
messageTask = new MessageTask(String.valueOf(i), score);
redisTemplate.opsForZSet().add("MessageQueue:Task", messageTask, score);
}
}
}

并发优化

  1. 为防止并发问题,在查出任务后,要先删除任务,再处理任务,删除失败就不执行任务。

    基于 redisTemplate 的先查后删除操作不是原子的,可能存在 T1,T2都拿到任务数据,但只能给删除成功的线程处理业务。

    但这里仍有个小问题,一个任务被多个线程争抢,没抢到的线程就白取了一次任务,就浪费了。进一步优化。

  2. 使用 Lua 脚本,将 查询和删除放在脚本中执行以保证原子性,删除成功返回任务,否则返回空。

    脚本示例如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    String luaScript = "local resultArray = redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'limit' , 0, 1);" 
    + "if #resultArray > 0 then"
    + "if redis.call('zrem', KEYS[1], resultArray[1]) > 0 then"
    + "return resultArray[1];"
    + "else"
    + "return '';"
    + "end;"
    +"else"
    +"return '';"
    + "end;";
    -- redisTemplate
    Long releaseResult = redisTemplate.execute(luaScript, keyList, valueList);
    -- jedis
    jedis.eval(luaScript, ScriptOutputType.VALUE, new String[]{key}, String.valueOf(System.currentTimeMillis()));

    这段 Lua 脚本更多可以参考下 RedissonRedissonDelayedQueue 的构造方法中的一段 Lua 脚本。

分片优化

所有任务放在一个 Sorted Set 里,当数据量非常大的话,应会影响写入和读取,可以将 KEY 进行 HASH 路由到多个集合。

网上提供了一种基于 Redis 的延时处理优化方案:

Redis延时队列方案优化

设计思路

  1. 将延迟的消息任务通过 HASH 算法路由至不同的 Redis Key 上,这样做有两大好处:
    • 避免了当一个 KEY 在存储了较多的延时消息后,入队操作以及查询操作速度变慢的问题(两个操作的时间复杂度均为O(logN))。
    • 系统具有了更好的横向可扩展性,当数据量激增时,我们可以通过增加 Redis Key 的数量来快速的扩展整个系统,来抗住数据量的增长。
  2. 每个 Redis Key 都对应建立一个处理进程,称为 Event 进程,通过rangeWithScores 方法轮询 Key,查询是否有待处理的延迟消息。
  3. 所有的 Event 进程只负责分发消息,具体的业务逻辑通过一个额外的消息队列异步处理,这么做的好处有:
    • 一,Event 进程只负责分发消息,那么其处理消息的速度就会非常快,就不太会出现因为业务逻辑复杂而导致消息堆积的情况。
    • 二,采用一个额外的消息队列后,消息处理的可扩展性也会更好,我们可以通过增加消费者进程数量来扩展整个系统的消息处理能力。
  4. Event 进程采用 Zookeeper 选主单进程部署的方式,避免 Event 进程宕机后,Redis Key 中消息堆积的情况。一旦 Zookeeper 的 leader 主机宕机,Zookeeper 会自动选择新的 leader 主机来处理 Redis Key 中的消息。

从上述的思路中可以看出,通过 Redis Sorted Set 实现延迟队列是一种理解起来较为直观,可以快速落地的方案。并且可以依赖 Redis 自身的持久化来实现消息任务的持久化,使用 Redis 集群来支持高并发和高可用,是一种不错的延迟队列的实现方案。

相关参考

  1. 实现一个延时队列:模拟 DelayQueue 实现自定义的延时对列,对理解 DelayQueue 实现原理非常有帮助。
  2. 有赞延迟队列设计:基于 Redis 实现,把定时任务和消费进行了拆分。
  3. 延时队列实现思路:Redis,RabbitMQ,Kafka,Netty,DelayQueue,没有示例代码。
  4. 定时任务实现几种方式:@schedule 注解,Timer & TimerTask,Quartz,ScheduleExecutorService。
  5. 美图延时队列实现-LMSTFY:基于 Redis 实现,LMSTFY Github地址
  6. Redis实现消息队列:借助了 Redis 的 List 的 BLPOP 或 BRPOP 阻塞消费消息。
  7. Lua Guava-EventBus 实现延时队列,这个实现思路值得参考。
  8. 10种延迟任务实现方式:做了汇总,有示例代码,可参考。
  9. Redus 过期 Key 监听与发布订阅功能:有详情的代码示例参考。
  10. Spring Messaging with Redis:Spring 官方手册,基于 Redis 的 发布/订阅 来发送消息。
  11. Spring Messaging with RabbitMQ:Spring 官方手册,基于 RabbitMQ 的 发布/订阅 来发送消息。
作者

光星

发布于

2021-03-15

更新于

2022-07-12

许可协议

评论