分布式锁理解及Redis实现方案
分布式架构中通常都会存在共享资源,在多个服务或一个服务多个实例同时对此共享资源进行读写操作情况下,会存在数据不一致性,就需要分布式锁来解决此问题。
实现分布式锁通常会借助外部组件,主要有四种实现方式,基于 Redis 实现,基于 Zookeeper 实现,直接使用 Redisson 已实现的分布式锁,Google 的 Chubby 服务。
锁
在计算机科学中,锁(lock)或互斥(mutex)是一种同步机制,用于在多线程环境中强制对共享资源的访问限制。锁旨在强制实施互斥排他、并发控制策略。
Java JDK 有 Lock,synchronized 提供锁功能。数据库 MySQL 有 for update
悲观锁,基于 CAS(Compare and Swap) 实现的乐观锁。
分布式锁
在分布式环境中,一个服务多个实例部署在不同机器上,多实例以不同的进程对共享资源进操作,为确保数据一致性,必须以互斥的方式执行,就需要借助外部组件实现分布式锁来保证不同进程之间的互斥性。
分布式锁是控制分布式系统之间同步访问共享资源的一种方式,用于协调不同服务对共享资源的写操作,通过互斥的方式来防止不同服务之间的干扰来保证一致性。
分布式锁理解
在同一进程里的多线程若要对共享资源进行操作,就需要加锁来让线程排队处理,最简单粗暴的处理是增加一个公共状态变量,线程一直循环该变量值判断是否执行操作,不同线程对该变量的判断是互斥的。
下面只是简单的伪代码演示:
1 | thread-1 |
分布式锁实际也是基于此思路来设计方案的,但因为是分布式环境,就需要把该 公共状态变量 放到外部存储,考虑性能问题,通常把此公共状态变量放到内存级的数据存储系统,如 Redis。通过设置和获取公共状态变量的值是判断是否执行后续的处理,相当于多进程之间是互斥的。
当然,分布锁还有其它特性是需要关注的,在实现上因为要考滤多种情况,实现逻辑会稍显复杂些,如下。
分布式锁特性
分布式锁方案必须考虑以下特性:
- 确保互斥:多线程或多服务实例之间必须是互斥的,锁的基本特性,否则就不叫锁了。
- 避免死锁:如果一个线程或服务获得资源锁,但后面挂了,并没有释放锁,导致其它线程或服务永远无法获得锁而进入无限等待,这就会出现死锁,必须做到避免死锁。
- 保证性能:高并发分布式环境中,线程互斥等待会成为性能瓶颈,在分布式锁实现的中间件和方案上需要保证性能。
- 锁扩展特性:基于分布式环境的复杂场景,分布式锁不能只是加锁,然后一直等待。最好实现 Java JDK Lock 的一些功能,如:锁判断,超时设置,可重入性等。
基于 Redis 实现的分布式锁,官方总结了三个属性,这些属性是有效使用分布式锁所需的最低保证:
- 安全属性:互斥性。在任何给定时刻,只有一个客户端可以持有锁。
- 活力属性A:无死锁。即使锁定资源的客户端崩溃或被分区,也始终可以获取锁定。
- 活力属性B:容错。只要大多数 Redis 节点启动,客户端就能够获取或释放锁。
分布式锁问题
在开发和使用分布式锁时,需要解决以下问题。
非原子操作:加锁和设置过期时间是分开的,非原子操作。
解决:使用 Lua 脚本 或 set 命令解决。
1
2
3SET key value [NX | XX] [GET] [EX seconds | PX milliseconds]
String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime);忘了释放锁:锁没有设置过期时间,一直不能被释放。
解决:使用 Lua 脚本 或 set 命令设置过期时间,set 命令是原子操作。
占用锁超时:确实因业务复杂,导致占用的锁超时被自动释放。
解决:加锁时同时启动一个守护线程执行定时任务来自动续期。
释放了别人的锁:锁超时没有续期被自动释放,当前线程结束业务释放了其它线程加的锁。
解决:释放锁时,判断是不是自己加的锁,通过判断 set 的 value 值,value 值具有线程唯一性。
大量请求加锁失败:大量请求加锁失败,这时不能直接返回失败。
解决:使用自旋锁,不断尝试加锁(死循环 + 短时休眠不断重试),设置获取锁超时时间,到了超时时间,还未加锁成功,返回失败。
或第一次加锁失败,就走消息队列,释放锁发消息到了队列,通知等待的线程,可以用 Redis 的 阻塞队列。
锁竞争问题:在高并发访问情况下,如果都竞争一把锁,则大大降低了系统性能和业务处理能力。
解决:解决锁性能问题,根本是降低锁的粒度,把锁的粒度变的更细。
方案一,读写锁分离,读读不加锁,只给读写,写写并发操作加锁实现互斥。
方案二,减小锁的粒度,将大锁 分段。例如,库存 100 个商品,分成 5 段,每段 20 个商品,按一定算法将请求路由到分段锁上。在秒杀场景,多线程同时竞争多把锁,减少了等待的线程,提高了系统的吞吐量。
注意:将锁分段提升了系统性能,同样给系统带来了不小的复杂度,因为它需要引入额外的路由算法,跨段统计等功能。这块需要作为一块单独的复杂功能来设计。
参考 Java 中
ConcurrentHashMap
,将数据分为16段
,每一段都有单独的锁,并且处于不同锁段的数据互不干扰,以此来提升锁的性能。重入锁问题:当前线程已获得了锁,再次加锁导致失败。
通常在递归逻辑里获取锁,或一个线程在未释放锁的情况下多次调用加锁的场景会出现。
解决:使用可重入锁,实现逻辑是,在加锁时判断当前线程是否已获取锁,如果是,则锁的计数 +1;释放锁时 -1。意味着有多次加锁,就需要多次释放锁。
如果 -1 后,计数值仍大于 0 ,表示还有引用,重置过期时间,直到计数值等于 0,则删除锁,发消息通知等待线程抢锁。
参考 Java 中的
ReentrantLock
可重入锁。主从复制问题:多实例 Redis 组成的主从、或哨兵模式下;写主节点,读从节点,当主节点故障,从节点升级为主节点。在主节点加的锁,因故障未同步到从节点,导致锁丢失,分布式锁失效。
解决:多实例环境下,给所有节点都加锁。采用类似于投票的思路,计算 N 个节点中 N/2 + 1 个节点加锁成功,则表示加锁成功,否则为加锁失败;如果加锁的总耗时超过待待时间,则认为加锁失败。
实现方案
Redisson
Redisson 是一款非常优秀的,功能非常强大的开源的 Redis Java 客户端,基于高性能异步和无锁 Java Redis 客户端和 Netty 框架。
Redisson 实现的分布式是也是 Redis 官方推荐使用的 Java 实现的分布式锁。Redisson - GitHub,Rediss Distributed Lock 文档。
集成 Redisson
目前新建的应用大多都会基于 Spring Boot 来开始,同时 Redisson 为 Spring Boot 提供了 Starter 包(redisson-spring-boot-starter),这里以此为示例。或直接在项目中引入 redisson
依赖,具体参考 Redisson ReadMe > Quick Start。
项目添加
redisson-spring-boot-starter
依赖Maven pom.xml
1
2
3
4
5<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.11.2</version>
</dependency>添加 Redis 属性设置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15# common spring boot settings
=
=
=
=
=
=
=
=
=
# Redisson settings
#path to redisson.yaml or redisson.json
classpath:redisson.yaml =使用实现了 RedissonClient 接口 或 RedisTemplate / ReactiveRedisTemplate 的 Spring Bean。
最简单使用
1
2
3RLock lock = redisson.getLock("anyLock");
// 最常见的使用方法
lock.lock();指定加锁时间
如果负责储存分布式锁的 Redisson 节点宕机以后,而且这个锁正好处于锁定的状态时,这个锁会出现锁死的状态。
为了避免这种情况的发生,Redisson 内部提供了一个监控锁的看门狗,它的作用是在 Redisson 实例被关闭前,不断的延长锁的有效期。
默认情况下,看门狗的检查锁的超时时间是 30 秒钟,也可以通过修改 Config.lockWatchdogTimeout 来另行指定。
另外 Redisson 还通过加锁的方法提供了
leaseTime
的参数来指定加锁的时间。超过这个时间后锁便自动解开了。1
2
3
4
5
6
7
8
9
10
11
12
13// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}Redisson 同时还为分布式锁提供了异步执行的相关方法:
1
2
3
4RLock lock = redisson.getLock("anyLock");
lock.lockAsync();
lock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);
Redisson 分布式锁类型
Redisson 还提供了其它类型的锁,具体可参考官方文档 Rediss Distributed Lock 文档。
公平锁(Fair Lock)
保证当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson 会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。
1
2
3RLock fairLock = redisson.getFairLock("anyLock");
// 最常见的使用方法
fairLock.lock();联锁(MultiLock)
基于 Redis 的 Redisson 分布式 联锁
RedissonMultiLock
对象可以将多个RLock
对象关联为一个联锁,每个RLock
对象实例可以来自于不同的 Redisson 实例。1
2
3
4
5
6
7
8
9
10RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();
...
lock.unlock();红锁(RedLock)
基于 Redis 的 Redisson 红锁
RedissonRedLock
对象实现了 Redlock 介绍的加锁算法。该对象也可以用来将多个RLock
对象关联为一个红锁,每个RLock
对象实例可以来自于不同的 Redisson 实例。1
2
3
4
5
6
7
8
9
10RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();读写锁(ReadWriteLock)
基于 Redis 的 Redisson 分布式**可重入读写锁 **
RReadWriteLock
Java对象实现了java.util.concurrent.locks.ReadWriteLock
接口。其中读锁和写锁都继承了 RLock 接口。分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
1
2
3
4
5RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();信号量(Semaphore)
基于Redis的Redisson的分布式信号量(Semaphore)Java对象
RSemaphore
采用了与java.util.concurrent.Semaphore
相似的接口和用法。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
//或
semaphore.tryAcquireAsync();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
//或
semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();
//或
semaphore.releaseAsync();可过期性信号量(PermitExpirableSemaphore)
基于 Redis 的 Redisson 可过期性信号量(PermitExpirableSemaphore)是在
RSemaphore
对象的基础上,为每个信号增加了一个过期时间。每个信号可以通过独立的ID来辨识,释放时只能通过提交这个ID才能释放。1
2
3
4
5
6RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
String permitId = semaphore.acquire();
// 获取一个信号,有效期只有2秒钟。
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
// ...
semaphore.release(permitId);闭锁(CountDownLatch)
基于 Redisson 的 Redisson 分布式闭锁(CountDownLatch)Java对象
RCountDownLatch
采用了与java.util.concurrent.CountDownLatch
相似的接口和用法。1
2
3
4
5
6
7RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();
RedissonLock 类图
RedissonLock 实现了 RLock,RLock 继承了 Java JDK 的 Lock 接口,并提供了锁过期策略处理。
Redis SET NX
基于 Redis 的 SET NX 命令实现分布锁,可以先了解下 setnx, expire, set/get
命令。
基于 Redis 实现的分布式锁是对 Redis SETNX 命令 和 SET key value [EX second] [PX milliseconds] [NX | XX] 命令的充分应用。
分布式锁核心步骤
Redis 实现分布式锁,依赖 SETNX 命令 或 SET 命令结合 NX 属性的使用。
加锁
最简单的方法是使用 setnx 命令。key 是锁的唯一标识,按业务来决定命名,例如给秒杀活动,将参于秒杀产品总数写入到 Redis,对减库存操作加锁, key 命名为
locak_sale_goodsId
,value 简单设置为 1。加锁伪代码如下:**setnx(key,1)**:当一个线程执行 setnx 返回 1 ,说明 key 原本不存在,则该线程成功获得锁(加锁);当返回 0 时,说明 key 已存在,该线程独得锁失败。
解锁
有加锁,就得有解锁。当得到锁的线程执行完任务,需要释放锁,以便其他线程可以进入。锁释放的最简单方式是执行 DEL 指令,伪代码如下:
del(key):释放锁之后,其他线程可以继续执行 setnx 命令来获得锁。
锁超时
如果一个得到锁的线程在执行任务过程中挂掉,来不及释放所持有的锁,则别的线程再也别想获得锁,就会无限等待,进入死锁状态。
所以 SETNX 的 KEY 必须设置一个超时时间,以保证即使没有被显式释放,这把锁也要在一定时间后自动释放。SETNX 命令不支持超时参数,就需要借助 EXPIRE(key,value) 命令,伪代码 **expire(key, 30)**。
综合以上操作的伪代码如下:
1
2
3
4
5
6
7
8if(setnx(key,1) == 1){
expire(key,30);
try{
do something......
}finally{
del(key)
}
}
分布式锁步骤完善
上面的基本步骤实现的分布式锁是存在缺陷的,也就不能满足分布式锁的特性了。
加锁由 setnx 和 exipre 两步操作,但是非原子性的。
一个极端情况是,在执行 setnx 时,成功得到了锁,但刚执行成功,正准备执行 exipre 命令,线程挂了,这把锁就没有设置过期时间,变得 长生不老,另的线程再也无法获得锁了。
解决:因 setnx 命令本身不支持传入超时时间,可以改为使用 set 命令,传入过期时间和 NX 参数,伪代码如下:
**set(key, 1, 30, NX)**:即把 setnx 和 expire 两条命令合为一条来执行。
备注:也可以使用 Lua 脚本来执行 setnx 和 expire 两条命令,Lua 脚本是原子性的。
del 导致误删除
另一个极端场景,假如某个线程A成功获得了锁,前且设置了超时时间,但因某些原因而执行的很慢,执行耗时已经达到了超时时间,这时候锁过期自动释放,其他线程B得到了锁。
随后线程 A 执行完任务,接着执行 del 指令来释放锁。但这时候线程B 还在执行任务,线程A 实际上删除的是线程B加的锁。
解决:为了避免这种情况,可以在 del 释放锁之前判断当前锁是不是自己加的锁。具体实现可以把当前的 线程ID 当做 value,在删除之前验证 key 对应的 value 是不是自己线程的ID。伪代码如下:
加锁:
1
2String threadId = Thread.currentThread().getId();
set(key,threadId,30,NX);解锁:
1
2
3if(threadId.equals(redisClient.get(key))){
del(key);
}这里又会隐含一个新的问题,判断和释放锁是两个独立操作,不是原子性的,所以这块改为用 Lua 脚本实现,Lua 脚本是原子性的。
1
2String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
redisClient.eval(luaScript , Collections.singletonList(key), Collections.singletonList(threadId));这样修改的话,验证和删除过程就是原子操作了。
出现并发的可能性
还是上面第二个问题场景,如果线程 A 执行需要较长时间,基于锁互斥性,锁应仍被线程A 占用,并发的线程B进来并不能获得锁,但 线程A 执行所需时长超过了过期时间,就需要对快过期的锁续期。
解决:可以让获得锁的线程开启一个守护线程,用于给快要过期的锁 续期。通常会在过期时间过去了 2/3 时间时(默认过期 30秒,2/3 时间即过去了 20秒),让守护线程执行 expire 指令,为这把锁续期 2/3 过期时长(续期 20 秒,等于重新设置过期时间),守护线程从第 2/3 时间开始执行(每次从第 20 秒 开始执行续期)。
另一种情况,如果节点 1 忽略崩溃或断电,由于线程A 和 守护线程在同一个进程,过护线程也会停下。锁到了超时时间,没有续期,也就会自动释放。
Redis分布式锁实现
基于上面描述的思路,基于 Redis 分布式锁就不难实现了。以下示例代码基于 Spring Boot 和 RedisTemplate 操作。
Redis 配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class RedisConfig {
/**
* redis默认使用jdk的二进制数据来序列化
* 以下自定义使用 FastJson 来序列化
*
* @param redisConnectionFactory
* @return RedisTemplate
*/
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
FastJsonRedisSerializer serializer = new FastJsonRedisSerializer(Object.class);
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(serializer);
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(serializer);
template.afterPropertiesSet();
return template;
}
}Redis 分布式锁操作类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class RedisLockUtil {
private static final Long RELEASE_SUCCESS = 1L;
private RedisTemplate<String, Object> redisTemplate;
/**
* 加锁
*
* @param lockKey 锁的 key
* @param lockValue 通常为 threadId 或 requestId
* @param expireTime 锁过期时间
* @return 是否加锁成功
*/
public boolean tryLock(String lockKey, String lockValue, int expireTime, TimeUnit timeUnit) {
//set(key,value,expire_time,nx),如果不存在则设置,成功返回true,表示加锁成功
Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, expireTime, timeUnit);
boolean result = !ObjectUtils.isEmpty(lock) ? lock : Boolean.FALSE;
if (result) {
RedisRenewalDaemonThread renewalDaemonThread = new RedisRenewalDaemonThread(redisTemplate, lockKey, expireTime);
renewalDaemonThread.setDaemon(true);
renewalDaemonThread.start();
}
return result;
}
/**
* 释放锁
*
* @param lockKey 锁的 key
* @param lockValue 通常为 threadId 或 requestId
* @return 是否释放成功
*/
public boolean unlock(String lockKey, String lockValue) {
//释放锁 lua 脚本,
String luaScript = "if (redis.call('get', KEYS[1]) == ARGV[1]) then return redis.call('del', KEYS[1]) else return 0 end";
//注意要设置脚本执行返回类型
RedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript, Long.class);
List<String> keyList = new ArrayList<>();
keyList.add(lockKey);
Long releaseResult = redisTemplate.execute(redisScript, keyList, lockValue);
return RELEASE_SUCCESS.equals(releaseResult);
}
}Redis 分布式锁 Key 过期续期守护线程类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40public class RedisRenewalDaemonThread extends Thread {
private String lockKey;
private long expireTime;
private RedisTemplate redisTemplate;
public RedisRenewalDaemonThread() {
}
public RedisRenewalDaemonThread(RedisTemplate redisTemplate, String lockKey, long expireTime) {
this.redisTemplate = redisTemplate;
this.lockKey = lockKey;
this.expireTime = expireTime;
}
public void run() {
long renew = this.expireTime / 3;
//如果过期时间很短,小于3秒
if (renew == 0) {
renew = 1L;
}
//System.out.println("rest expire: " + renew);
while (true) {
try {
long expire = redisTemplate.getExpire(lockKey);
//System.out.println(expire);
//如果剩余过期时间小于等于三分之一,则续期。
//注意,这里是小于等于,若只是小于,expire 可能出现负数
if (expire <= renew) {
redisTemplate.expire(lockKey, expireTime, TimeUnit.SECONDS);
}
//请求间隔,降低无效请求频率
Thread.sleep(renew * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}或开启一个定时任务执行续期,如下:
1
2
3
4
5
6
7Timer timer = new Timer();
timer.schedule(new TimerTask() {
public void run(Timeout timeout) throws Exception {
//自动续期逻辑
}
}, 10000, TimeUnit.MILLISECONDS);多线程并发测试分布式锁
Redis 里库存数据设置为 3 个
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145package com.gxitsky.lock;
import com.gxitsky.lock.config.RedisLockUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class RedisLockApplicationTests {
private static volatile boolean stock = true;
private StringRedisTemplate stringRedisTemplate;
private RedisTemplate<String, String> redisTemplate;
private RedisLockUtil redisLockUtil;
public void contextLoads() {
}
/**
* @desc 分布式锁多线程并发测试(手动创建线程)
* @author gxing
* @date 2022/1/9
*/
public void lockTest1() throws InterruptedException {
String key = "apple";
String LOCK_KEY = "LOCK_KEY:apple";
int maxNum = 8;
final CountDownLatch countDownLatch = new CountDownLatch(maxNum);
long startTime = System.currentTimeMillis();
for (int i = 0; i < maxNum; i++) {
Thread thread = new Thread(new Runnable() {
public void run() {
try {
tryLock(LOCK_KEY);
decrementStock(key, 1);
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.setName("Thread_Name_" + i);
thread.start();
}
countDownLatch.await();
System.out.println("end takeTime:" + (System.currentTimeMillis() - startTime));
}
/**
* @desc 分布式锁多线程并发测试(线程池)
* @author gxing
* @date 2022/1/9
*/
public void lockTest2() throws InterruptedException {
String key = "apple";
String LOCK_KEY = "LOCK_KEY:apple";
int maxNum = 5;
final CountDownLatch countDownLatch = new CountDownLatch(maxNum);
long startTime = System.currentTimeMillis();
ExecutorService executorService = Executors.newFixedThreadPool(50);
for (int i = 0; i < maxNum; i++) {
executorService.submit(() -> {
Thread thread = Thread.currentThread();
try {
tryLock(LOCK_KEY);
decrementStock(key, 1);
} catch (Exception e) {
e.printStackTrace();
} finally {
redisLockUtil.unlock(LOCK_KEY, String.valueOf(thread.getId()));
System.out.println(thread.getName() + ":释放锁---------------");
Thread.currentThread().interrupt();
}
countDownLatch.countDown();
});
}
executorService.shutdown();
countDownLatch.await();
System.out.println("end takeTime:" + (System.currentTimeMillis() - startTime));
}
/**
* @param lockKey 锁的Key
* @desc 获取锁
* @author gxing
* @date 2022/1/9
*/
public boolean tryLock(String lockKey) throws InterruptedException {
Thread thread = Thread.currentThread();
boolean lock = redisLockUtil.tryLock(lockKey, String.valueOf(thread.getId()), 300, TimeUnit.MILLISECONDS);
if (!lock) {
System.out.println(thread.getName() + ":获取锁失败");
Thread.sleep(300);
tryLock(lockKey);
} else {
System.out.println(thread.getName() + ":获得锁,开始处理业务---------------");
}
return lock;
}
/**
* @param skuId 商品 skuId
* @desc 减库存业务
* @author gxing
* @date 2022/1/9
*/
public void decrementStock(String skuId, long num) throws InterruptedException {
String name = Thread.currentThread().getName();
if (!stock) {
System.out.println(name + ":秒杀已结束");
return;
}
Thread.sleep(400);
String count = redisTemplate.opsForValue().get(skuId);
assert count != null;
if (Integer.parseInt(count) > 0) {
Long decrement = redisTemplate.opsForValue().decrement(skuId, num);
System.out.println(name + ":执行完减库存,剩余数量:" + decrement + "---------------");
} else {
stock = false;
System.out.println(name + ":库存不足---------------");
throw new RuntimeException(name + ":库存不足");
}
}
}测试结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44获取锁失败 :
获取锁失败 :
获取锁失败 :
获取锁失败 :
获得锁,开始处理业务--------------- :
获取锁失败 :
获取锁失败 :
获取锁失败 :
获取锁失败 :
执行完减库存,剩余数量:2--------------- :
释放锁--------------- :
获得锁,开始处理业务--------------- :
获取锁失败 :
获取锁失败 :
获取锁失败 :
获取锁失败 :
获取锁失败 :
获取锁失败 :
执行完减库存,剩余数量:1--------------- :
释放锁--------------- :
获得锁,开始处理业务--------------- :
获取锁失败 :
获取锁失败 :
获取锁失败 :
获取锁失败 :
执行完减库存,剩余数量:0--------------- :
释放锁--------------- :
获得锁,开始处理业务--------------- :
获取锁失败 :
获取锁失败 :
库存不足--------------- :
pool-1-thread-1:库存不足 :
at com.gxitsky.lock.RedisLockApplicationTests.decrementStock(RedisLockApplicationTests.java:142)
at com.gxitsky.lock.RedisLockApplicationTests.lambda$lockTest2$0(RedisLockApplicationTests.java:86)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
释放锁--------------- :
获得锁,开始处理业务--------------- :
秒杀已结束 :
释放锁--------------- :
Redis 解锁通知
在并发情况下,没有拿到锁的线程可能会采用自旋的方式(while(true))循环请求来获取锁,这种方式是会浪费 CPU 资源。
解决:
在释放锁后,同时发送锁释放通知( lpush )到一个 Redis List(队列),线程在加锁之前,先执行队列阻塞命令( brpop )从队列中获取锁释放通知,再去加锁。
使用阻塞命令必须指定超时时间,不能无限等待,超时到了同样去加锁。
Redis 阻塞队列,是谁先阻塞,就谁先执行,并且只被执行一次。