延时队列 这 Redisson DelayedQueue 实现

基于 Redisson 的最佳实践应用 Redisson 实现了延时队列(Delayed Queue )功能,可以直接拿来使用。

Redisson DelayedQueue

基于 Redisson 的最佳实践应用 Redisson 实现了延时队列(Delayed Queue )功能,可以直接拿来使用。

涉及三个队列,一个发布订阅通道,获取延时元素是通过阻塞队列实现的,发布订阅通道没看出具体的作用。

  • 阻塞队列 List:KEY = queueName,执行 BLPOP 命令从左端弹出元素,右端插入元素。
  • 有序集合 Sorted Set:KEY = redisson_delay_queue_timeout:{queueName},score 是元素的过期时间,按从小到大排序,
    过期时间小于当前时间表示已过期,删除集合中的元素,并将元素添加到阻塞队列。
  • 普通集合 List:KEY = redisson_delay_queue:{DelayMessage},按顺序从右端添加元素,元素过期会被删除。
  • 发布/订阅通道:redisson_delay_queue_channel,目前没看出具体用途。

应用示例

基于 Spring Boot + redisson-spring-boot-starter 实现示例

添加依赖

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.15.1</version>
</dependency>

Redisson 的 starter 包自动配置默认注册了 RedisTemplate Bean,RedissonConnectionFactory Bean,RedissonClient Bean。

延时队列管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
* @desc 延时队列管理
*/
@Component
public class RedisDelayedQueueManager {

@Autowired
RedissonClient redissonClient;

/**
* 添加元素到延时队列
*
* @param t 队列成员
* @param delay 延时时间
* @param timeUnit 时间单位
* @param queueName 队列名称
* @param <T> 泛型
*/
public <T> void add(T t, long delay, TimeUnit timeUnit, String queueName) {
RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
delayedQueue.offer(t, delay, timeUnit);
delayedQueue.destroy();
}

/**
* 获取元素并删除
*
* @param queueName 队列名称
* @param delayedTaskListener 延时任务监听器
* @param <T> 泛型
*/
public <T> void take(String queueName, DelayedTaskListener delayedTaskListener) {
RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
while (true) {
try {
delayedTaskListener.invoke(blockingFairQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//由于此线程需要常驻,可以新建线程,不用交给线程池管理
/*((Runnable) () -> {
while (true) {
try {
Thread.sleep(1000);
T t = blockingFairQueue.take();
delayedTaskListener.invoke(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).run();*/
}
}

延时队列监听器

监听器抽象接口:

1
2
3
4
5
6
7
/**
* 延时任务监听器
*/
public interface DelayedTaskListener<T> {

void invoke(T t);
}

具体监听器实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

/**
* @desc 实现 CommandLineRunner 接口中的 run 方法,随应用启动而运行
*/
@Component
public class RedisDelayQueueListener implements CommandLineRunner {
private static final Logger logger = LogManager.getLogger(RedisDelayQueueListener.class);

@Autowired
RedisDelayedQueueManager redisDelayedQueueManager;
@Autowired
private RedissonClient redissonClient;

@Override
public void run(String... args) throws Exception {
logger.info("===============延时队列监听器启动==============");
//监听延迟队列
DelayedTaskListener<String> delayedTaskListener = new DelayedTaskListener<String>() {
@Override
public void invoke(String delayMessage) {
//这里调用你延迟之后的代码,在这里执行业务处理
System.out.println(delayMessage);
}
};
redisDelayedQueueManager.take("DelayMessage", delayedTaskListener);
}
}

添加延时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.delay.queue.controller;

import com.delay.queue.redisson.DelayMessage;
import com.delay.queue.redisson.RedisDelayedQueueManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.TimeUnit;

@RestController
@RequestMapping("/delayed")
public class DelayedTaskController {

@Autowired
private RedisDelayedQueueManager redisDelayedQueueManager;

@RequestMapping("/message")
public void addDelayedMessage(Integer delay, String value) {
redisDelayedQueueManager.add(value, delay, TimeUnit.SECONDS, "DelayMessage");
}
}

快速发送两条请求:

发送请求1:http://localhost:8080/delayed/message?delay=20&value=AAAA
发送请求2:http://localhost:8080/delayed/message?delay=5&value=BBBB

监听结果

1
2
BBBB
AAAA

源码分析

Redisson 入队操作使用是的 Lua 脚本,Redis 底层解析是会开启事务来执行脚本中的多条命令,可以确保原子性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelayedQueue<V> {

private final QueueTransferService queueTransferService;
private final String channelName;
private final String queueName;
private final String timeoutSetName;

protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
// 订阅通道名前缀
channelName = prefixName("redisson_delay_queue_channel", getName());
// 延时队列 List 前缀
queueName = prefixName("redisson_delay_queue", getName());
// 延时队列过期排序 Sorted Set 的 KEY 前缀
timeoutSetName = prefixName("redisson_delay_queue_timeout", getName());
// 创建一个订阅作为监听器
QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {

@Override
protected RFuture<Long> pushTaskAsync() {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
+ "if #expiredValues > 0 then "
+ "for i, v in ipairs(expiredValues) do "
+ "local randomId, value = struct.unpack('dLc0', v);"
+ "redis.call('rpush', KEYS[1], value);"
+ "redis.call('lrem', KEYS[3], 1, v);"
+ "end; "
+ "redis.call('zrem', KEYS[2], unpack(expiredValues));"
+ "end; "
// get startTime from scheduler queue head task
+ "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
+ "if v[1] ~= nil then "
+ "return v[2]; "
+ "end "
+ "return nil;",
Arrays.<Object>asList(getName(), timeoutSetName, queueName),
System.currentTimeMillis(), 100);//默认取 100条
}

@Override
protected RTopic getTopic() {
return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, channelName);
}
};
// 执行任务
queueTransferService.schedule(queueName, task);

this.queueTransferService = queueTransferService;
}

@Override
public void offer(V e, long delay, TimeUnit timeUnit) {
// 添加元素
get(offerAsync(e, delay, timeUnit));
}

@Override
public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
if (delay < 0) {
throw new IllegalArgumentException("Delay can't be negative");
}
// 延时时间转换为毫秒值
long delayInMs = timeUnit.toMillis(delay);
// 超时时间=当前时间毫秒值 + 延时时间毫秒值
long timeout = System.currentTimeMillis() + delayInMs;

long randomId = ThreadLocalRandom.current().nextLong();
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);"
+ "redis.call('zadd', KEYS[2], ARGV[1], value);"
+ "redis.call('rpush', KEYS[3], value);"
// if new object added to queue head when publish its startTime
// to all scheduler workers
+ "local v = redis.call('zrange', KEYS[2], 0, 0); "
+ "if v[1] == value then "
+ "redis.call('publish', KEYS[4], ARGV[1]); "
+ "end;",
Arrays.<Object>asList(getName(), timeoutSetName, queueName, channelName),
timeout, randomId, encode(e));
}
}

RedissonDelayedQueue构造方法

添加元素,会先创建 RedissonDelayedQueue 实例,执行构造方法中的 Lua 脚本,目的是对已过期但未处理的任务进行处理

Lua 脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
-- 从 Sorted Set 按score从小到大排序,拿出小于当前时间的 100 条数据(已过期的数据)
local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]);
if #expiredValues > 0
then
-- 循环遍历
for i, v in ipairs(expiredValues)
do local randomId, value = struct.unpack('dLc0', v);
-- 加入到阻塞队列 queueName
redis.call('rpush', KEYS[1], value);
-- 删除 redisson_delay_queue:{queueName} 队列(List)该元素
redis.call('lrem', KEYS[3], 1, v);
end;
-- 删除 Sorted Set 中的该元素
redis.call('zrem', KEYS[2], unpack(expiredValues));
end;

local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES');
if v[1] ~= nil
then return v[2];
end
return nil;

脚本参数示例

1
2
KEY "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" 
VALUE "1615773796322" "100"

脚本转为执行的Redis命令示例

1
2
3
4
# 从 Sorted Set 按score从小到大排序,拿出小于当前时间的 100 条数据
1615773628.031731 [4 lua] "zrangebyscore" "redisson_delay_queue_timeout:{DelayMessage}" "0" "1615773796322" "limit" "0" "100"
# 取出元 Sorted Set 中 score 最小的元素返回
1615773628.031752 [4 lua] "zrange" "redisson_delay_queue_timeout:{DelayMessage}" "0" "0" "WITHSCORES"

offerAsync添加元素

执行添加元素操作,执行 Lua 脚本,在 Redis 解析 Lua 脚本转换为 Redis 命令底层是开启事务执行多条命令。

Lua 脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);
-- 将元素加入到 Sorted Set, KEY 为 redisson_delay_queue_timeout:{queueName} 的延时队列, score 为延时时间
redis.call('zadd', KEYS[2], ARGV[1], value);
-- 将元素加入到 List, KEY 为 redisson_delay_queue:{queueName}, 从右侧加入, 这是个普通 List,
redis.call('rpush', KEYS[3], value);
-- 从 Sorted Set KEY 为 redisson_delay_queue_timeout:{queueName} 中从小到大排序, 取出第一个元素
local v = redis.call('zrange', KEYS[2], 0, 0);

if v[1] == value
then
-- 如果第一个元素为当前新增的元素, 发布到 channel 为 redisson_delay_queue_channel:{queueName}
redis.call('publish', KEYS[4], ARGV[1]);
end;

脚本参数示例

1
2
3
KEYS "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" "redisson_delay_queue_channel:{DelayMessage}" 

VALUES "1615773806323" "1493646126723946165" "\x04>\x04BBBB"

脚本转为执行的Reids命令示例

添加元素的 offerAsync 方法中的 Lua 脚本最终执行的 Redis 命令如下:

1
2
3
4
5
6
7
8
# 添加元素添加到 KEY 为 redisson_delay_queue_timeout:{DelayMessage} 的 Sorted Set 有序队列中
1615773628.032423 [4 lua] "zadd" "redisson_delay_queue_timeout:{DelayMessage}" "1615773806323" "\xd7\x84\x13=\x7f\xba\xb4C\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04BBBB"
# 将元素添加到 KEY 为 redisson_delay_queue:{DelayMessage} 的 普通 List 中,右侧插入
1615773628.032463 [4 lua] "rpush" "redisson_delay_queue:{DelayMessage}" "\xd7\x84\x13=\x7f\xba\xb4C\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04BBBB"
# 从 Sorted Set 按 socre 从小到大排序, 取第一个元素
1615773628.032491 [4 lua] "zrange" "redisson_delay_queue_timeout:{DelayMessage}" "0" "0"
# 如果从 Sorted Set 拿到的元素即为本次添加的元素,就将其发布到
1615773628.032510 [4 lua] "publish" "redisson_delay_queue_channel:{DelayMessage}" "1615773806323"

延时任务定时轮询

Redisson 的定时轮询是基于 Netty 的 时间轮(HashedWheelTimeout) 实现的。轮询到过期的元素,则将期加入到阻塞队列,删除 Sorted Set 和 List 中的元素。

Netty 的 时间轮后续再开一篇文章详细描述。

底层逻辑

Redisson 的延时队列是基于 Redis 的 Sorted SetList + Blocking List 实现的,底层是基于 Redis 的事务和相关命令实现。参考 【附文一】。

使用 redis-client登录 Redis 服务,执行 monitor 命令开启监控,输出打印见 【附文二】。

添加元素

  1. 执行 MULTI开启事务。

  2. 执行 zdd 命令将元素加入到一个 KEY 为 redisson_delay_queue_timeout:{queueName}Sorted Set

    权重 score当前时间 + 延迟时间,Sorted Set 是一个按 score 排序的有序集合。

  3. 执行 rpush命令将元素将入到一个 KEY 为 redisson_delay_queue:{queueName}的普通 List

    该 List 是按插入的顺序保存元素。

  4. 执行 EXEC 命令提交。

消费元素

客户端随应该启动就运行,

  1. Redisson 客户端会订阅一个 KEY 为 queueNameBlocking List(阻塞队列 )。

  2. 使用 zrange命令获取 Sorted Set中的第一个元素,如果小于等于当前时间,表示已过期。

  3. 执行 MULTI开启事务。

  4. 拿到过期的元素,执行 rpush 命令,将元素插入到 Blocking List (阻塞队列)最右端。

  5. 执行 lrem 命令从 KEY 为 redisson_delay_queue:{queueName}的普通 List 中删除元素。

  6. 执行 zrem命令从 KEY 为 redisson_delay_queue_timeout:{DelayMessage}Sorted Set 中删除元素。

  7. 执行 EXEC 命令提交。

    注意:这里是先删除成功后,再将元素加入到阻塞队列,这样可以防止并发执行的情况。

  8. 最后执行 LPOP 命令 从 Blocking List (阻塞队列)最左端弹出元素完成消费。

    如果阻塞队列中元素未被消费则不会消失,直到消费完。

相关参考

  1. 实现一个延时队列:模拟 DelayQueue 实现自定义的延时对列,对理解 DelayQueue 实现原理非常有帮助。
  2. 有赞延迟队列设计:基于 Redis 实现,把定时任务和消费进行了拆分。
  3. 延时队列实现思路:Redis,RabbitMQ,Kafka,Netty,DelayQueue,没有示例代码。
  4. 定时任务实现几种方式:@schedule 注解,Timer & TimerTask,Quartz,ScheduleExecutorService。
  5. 美图延时队列实现-LMSTFY:基于 Redis 实现,LMSTFY Github地址
  6. Redis实现消息队列:借助了 Redis 的 List 的 BLPOP 或 BRPOP 阻塞消费消息。
  7. Lua Guava-EventBus 实现延时队列,这个实现思路值得参考。
  8. 10种延迟任务实现方式:做了汇总,有示例代码,可参考。
  9. Redus 过期 Key 监听与发布订阅功能:有详情的代码示例参考。
  10. Spring Messaging with Redis:Spring 官方手册,基于 Redis 的 发布/订阅 来发送消息。
  11. Spring Messaging with RabbitMQ:Spring 官方手册,基于 RabbitMQ 的 发布/订阅 来发送消息。

附文一

Redisson 延时队列执行 Redis 命令的 AOF 日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
$6
SELECT
$1
4
*1
$5
MULTI
*4
$4
zadd
$43
redisson_delay_queue_timeout:{DelayMessage}
$13
1615696477945
$23
???薈 >AAAA
*3
$5
rpush
$35
redisson_delay_queue:{DelayMessage}
$23
???薈 >AAAA
*1
$4
EXEC
*1
$5
MULTI
*4
$4
zadd
$43
redisson_delay_queue_timeout:{DelayMessage}
$13
1615696543300
$23
3j 澁C >BBBB
*3
$5
rpush
$35
redisson_delay_queue:{DelayMessage}
$23
3j 澁C >BBBB
*1
$4
EXEC
*1
$5
MULTI
*4
$4
zadd
$43
redisson_delay_queue_timeout:{DelayMessage}
$13
1615696458376
$23
囤NM褚? >CCCC
*3
$5
rpush
$35
redisson_delay_queue:{DelayMessage}
$23
囤NM褚? >CCCC
*1
$4
EXEC
*1
$5
MULTI
*4
$4
zadd
$43
redisson_delay_queue_timeout:{DelayMessage}
$13
1615696526357
$23
:跜灿#幻 >DDDD
*3
$5
rpush
$35
redisson_delay_queue:{DelayMessage}
$23
:跜灿#幻 >DDDD
*1
$4
EXEC
*1
$5
MULTI
*3
$5
rpush
$12
DelayMessage
$7
>CCCC
*4
$4
lrem
$35
redisson_delay_queue:{DelayMessage}
$1
1
$23
囤NM褚? >CCCC
*3
$4
zrem
$43
redisson_delay_queue_timeout:{DelayMessage}
$23
囤NM褚? >CCCC
*1
$4
EXEC
*2
$4
LPOP
$12
DelayMessage
*1
$5
MULTI
*3
$5
rpush
$12
DelayMessage
$7
>AAAA
*4
$4
lrem
$35
redisson_delay_queue:{DelayMessage}
$1
1
$23
???薈 >AAAA
*3
$4
zrem
$43
redisson_delay_queue_timeout:{DelayMessage}
$23
???薈 >AAAA
*1
$4
EXEC
*2
$4
LPOP
$12
DelayMessage
*1
$5
MULTI
*3
$5
rpush
$12
DelayMessage
$7
>DDDD
*4
$4
lrem
$35
redisson_delay_queue:{DelayMessage}
$1
1
$23
:跜灿#幻 >DDDD
*3
$4
zrem
$43
redisson_delay_queue_timeout:{DelayMessage}
$23
:跜灿#幻 >DDDD
*1
$4
EXEC
*2
$4
LPOP
$12
DelayMessage
*1
$5
MULTI
*3
$5
rpush
$12
DelayMessage
$7
>BBBB
*4
$4
lrem
$35
redisson_delay_queue:{DelayMessage}
$1
1
$23
3j 澁C >BBBB
*3
$4
zrem
$43
redisson_delay_queue_timeout:{DelayMessage}
$23
3j 澁C >BBBB
*1
$4
EXEC
*2
$4
LPOP
$12
DelayMessage

附文二

使用 redis-client登录 Redis 服务,执行 monitor 监控。

添加两个元素到延时队列,Redis 监控命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
1615736035.403628 [4 120.229.16.239:34559] "SUBSCRIBE" "redisson_delay_queue_channel:{DelayMessage}"
1615736035.420899 [4 120.229.16.239:34562] "EVAL" "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); if #expiredValues > 0 then for i, v in ipairs(expiredValues) do local randomId, value = struct.unpack('dLc0', v);redis.call('rpush', KEYS[1], value);redis.call('lrem', KEYS[3], 1, v);end; redis.call('zrem', KEYS[2], unpack(expiredValues));end; local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); if v[1] ~= nil then return v[2]; end return nil;" "3" "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" "1615736037375" "100"
1615736035.420986 [4 lua] "zrangebyscore" "redisson_delay_queue_timeout:{DelayMessage}" "0" "1615736037375" "limit" "0" "100"
1615736035.421000 [4 lua] "zrange" "redisson_delay_queue_timeout:{DelayMessage}" "0" "0" "WITHSCORES"
1615736035.460491 [4 120.229.16.239:34557] "EVAL" "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);redis.call('zadd', KEYS[2], ARGV[1], value);redis.call('rpush', KEYS[3], value);local v = redis.call('zrange', KEYS[2], 0, 0); if v[1] == value then redis.call('publish', KEYS[4], ARGV[1]); end;" "4" "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" "redisson_delay_queue_channel:{DelayMessage}" "1615736097379" "1732831665336717572" "\x04>\x04AAAA"
1615736035.460573 [4 lua] "zadd" "redisson_delay_queue_timeout:{DelayMessage}" "1615736097379" "\x91\xf1\xfb7A\x0c\xb8C\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04AAAA"
1615736035.460593 [4 lua] "rpush" "redisson_delay_queue:{DelayMessage}" "\x91\xf1\xfb7A\x0c\xb8C\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04AAAA"
1615736035.460617 [4 lua] "zrange" "redisson_delay_queue_timeout:{DelayMessage}" "0" "0"
1615736035.460626 [4 lua] "publish" "redisson_delay_queue_channel:{DelayMessage}" "1615736097379"
1615736035.479613 [4 120.229.16.239:34559] "UNSUBSCRIBE" "redisson_delay_queue_channel:{DelayMessage}"
============================================================================
1615736039.976610 [4 120.229.16.239:34559] "SUBSCRIBE" "redisson_delay_queue_channel:{DelayMessage}"
1615736039.991013 [4 120.229.16.239:34561] "EVAL" "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); if #expiredValues > 0 then for i, v in ipairs(expiredValues) do local randomId, value = struct.unpack('dLc0', v);redis.call('rpush', KEYS[1], value);redis.call('lrem', KEYS[3], 1, v);end; redis.call('zrem', KEYS[2], unpack(expiredValues));end; local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); if v[1] ~= nil then return v[2]; end return nil;" "3" "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" "1615736041946" "100"
1615736039.991096 [4 lua] "zrangebyscore" "redisson_delay_queue_timeout:{DelayMessage}" "0" "1615736041946" "limit" "0" "100"
1615736039.991118 [4 lua] "zrange" "redisson_delay_queue_timeout:{DelayMessage}" "0" "0" "WITHSCORES"
1615736039.996328 [4 120.229.16.239:34566] "EVAL" "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);redis.call('zadd', KEYS[2], ARGV[1], value);redis.call('rpush', KEYS[3], value);local v = redis.call('zrange', KEYS[2], 0, 0); if v[1] == value then redis.call('publish', KEYS[4], ARGV[1]); end;" "4" "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" "redisson_delay_queue_channel:{DelayMessage}" "1615736061948" "-139067833826375740" "\x04>\x04BBBB"
1615736039.996401 [4 lua] "zadd" "redisson_delay_queue_timeout:{DelayMessage}" "1615736061948" "\x84w.T\x17\xe1~\xc3\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04BBBB"
1615736039.996419 [4 lua] "rpush" "redisson_delay_queue:{DelayMessage}" "\x84w.T\x17\xe1~\xc3\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04BBBB"
1615736039.996431 [4 lua] "zrange" "redisson_delay_queue_timeout:{DelayMessage}" "0" "0"
1615736039.996438 [4 lua] "publish" "redisson_delay_queue_channel:{DelayMessage}" "1615736061948"
1615736040.015530 [4 120.229.16.239:34559] "UNSUBSCRIBE" "redisson_delay_queue_channel:{DelayMessage}"

过期监听处理的Redis命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1615736443.935211 [4 120.229.16.239:34795] "EVAL" "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); if #expiredValues > 0 then for i, v in ipairs(expiredValues) do local randomId, value = struct.unpack('dLc0', v);redis.call('rpush', KEYS[1], value);redis.call('lrem', KEYS[3], 1, v);end; redis.call('zrem', KEYS[2], unpack(expiredValues));end; local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); if v[1] ~= nil then return v[2]; end return nil;" "3" "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" "1615736445889" "100"
1615736443.935305 [4 lua] "zrangebyscore" "redisson_delay_queue_timeout:{DelayMessage}" "0" "1615736445889" "limit" "0" "100"
1615736443.935335 [4 lua] "rpush" "DelayMessage" "\x04>\x04BBBB"
1615736443.935348 [4 lua] "lrem" "redisson_delay_queue:{DelayMessage}" "1" "\x19\xef\xa5\x0c9>\xcb\xc3\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04BBBB"
1615736443.935361 [4 lua] "zrem" "redisson_delay_queue_timeout:{DelayMessage}" "\x19\xef\xa5\x0c9>\xcb\xc3\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04BBBB"
1615736443.935373 [4 lua] "zrange" "redisson_delay_queue_timeout:{DelayMessage}" "0" "0" "WITHSCORES"
1615736491.636923 [4 120.229.16.239:34818] "PING"
===========================================================================
1615736491.637112 [4 120.229.16.239:34819] "PING"
1615736497.234518 [4 120.229.16.239:34797] "EVAL" "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); if #expiredValues > 0 then for i, v in ipairs(expiredValues) do local randomId, value = struct.unpack('dLc0', v);redis.call('rpush', KEYS[1], value);redis.call('lrem', KEYS[3], 1, v);end; redis.call('zrem', KEYS[2], unpack(expiredValues));end; local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); if v[1] ~= nil then return v[2]; end return nil;" "3" "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" "1615736499189" "100"
1615736497.234599 [4 lua] "zrangebyscore" "redisson_delay_queue_timeout:{DelayMessage}" "0" "1615736499189" "limit" "0" "100"
1615736497.234621 [4 lua] "rpush" "DelayMessage" "\x04>\x04AAAA"
1615736497.234630 [4 lua] "lrem" "redisson_delay_queue:{DelayMessage}" "1" "\x96Vrr[\x1b\xdaC\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04AAAA"
1615736497.234641 [4 lua] "zrem" "redisson_delay_queue_timeout:{DelayMessage}" "\x96Vrr[\x1b\xdaC\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04AAAA"
1615736497.234651 [4 lua] "zrange" "redisson_delay_queue_timeout:{DelayMessage}" "0" "0" "WITHSCORES"
作者

光星

发布于

2021-03-15

更新于

2022-07-12

许可协议

评论