Java redisson实现分布式锁原理详解

网友投稿 251 2023-05-10

Java redisson实现分布式锁原理详解

Redisson分布式锁

之前的基于注解的锁有一种锁是基本redis的分布式锁,锁的实现我是基于redisson组件提供的RLock,这篇来看看redisson是如何实现锁的。

不同版本实现锁的机制并不相同

引用的redisson最近发布的版本3.2.3,不同的版本可能实现锁的机制并不相同,早期版本好像是采用简单的setnx,getset等常规命令来配置完成,而后期由于redis支持了脚本Lua变更了实现原理。

org.redisson

redisson

3.2.3

setnx需要配合getset以及事务来完成,这样才能比较好的避免死锁问题,而新版本由于支持lua脚本,可以避免使用事务以及操作多个redis命令,语义表达更加清晰一些。

RLock接口的特点

继承标准接口Lock

拥有标准锁接口的所有特性,比如lock,unlock,trylock等等。

扩展标准接口Lock

展了很多方法,常用的主要有:强制锁释放,带有效期的锁,还有一组异步的方法。其中前面两个方法主要是解决标准lock可能造成的死锁问题。比如某个线程获取到锁之后,线程所在机器死机,此时获取了锁的线程无法正常释放锁导致其余的等待锁的线程一直等待下去。

可重入机制

各版本实现有差异,可重入主要考虑的是性能,同一线程在未释放锁时如果再次申请锁资源不需要走申请流http://程,只需要将已经获取的锁继续返回并且记录上已经重入的次数即可,与jdk里面的ReentrantLock功能类似。重入次数靠hincrby命令来配合使用,详细的参数下面的代码。

怎么判断是同一线程?

redisson的方案是,RedissonLock实例的一个guid再加当前线程的id,通过getLockName返回

public class RedissonLock extends RedissonExpirable implements RLock {

final UUID id;

protected RedissonLock(CommandExecutor commandExecutor, String name, UUID id) {

super(commandExecutor, name);

this.internalLockLeaseTime = TimeUnit.SECONDS.toMillis(30L);

this.commandExecutor = commandExecutor;

this.id = id;

}

String getLockName(long threadId) {

return this.id + ":" + threadId;

}

RLock获取锁的两种场景

这里拿tryLock的源码来看:tryAcquire方法是申请锁并返回锁有效期还剩余的时间,如果为空说明锁未被其它线程申请直接获取并返回,如果获取到时间,则进入等待竞争逻辑。

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {

long time = unit.toMillis(waitTime);

long current = System.currentTimeMillis();

final long threadId = Thread.currentThread().getId();

Long ttl = this.tryAcquire(leaseTime, unit);

if(ttl == null) {

//直接获取到锁

return true;

} else {

//有竞争的后续看

}

}

无竞争,直接获取锁

先看下首先获取锁并释放锁背后的redis都在做什么,可以利用redis的monitor来在后台监控redis的执行情况。当我们在方法了增加@RequestLockable之后,其实就是调用lock以及unlock,下面是redis命令:

加锁

由于高版本的redis支持lua脚本,所以redisson也对其进行了支持,采用了脚本模式,不熟悉lua脚本的可以去查找下。执行lua命令的逻辑如下:

RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {

this.internalLockLeaseTime = unit.toMillis(leaseTime);

return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call(\'exists\', KEYS[1]) == 0) then redis.call(\'hset\', KEYS[1], ARGV[2], 1); redis.call(\'pexpire\', KEYS[1], ARGV[1]); return nil; end; if (redis.call(\'hexists\', KEYS[1], ARGV[2]) == 1) then redis.call(\'hincrby\', KEYS[1], ARGV[2], 1); redis.call(\'pexpire\', KEYS[1], ARGV[1]); return nil; end; return redis.call(\http://'pttl\', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{Long.valueOf(this.internalLockLeaseTime), this.getLockName(threadId)});

}

加锁的流程:

判断lock键是否存在,不存在直接调用hset存储当前线程信息并且设置过期时间,返回nil,告诉客户端直接获取到锁。

判断lock键是否存在,存在则将重入次数加1,并重新设置过期时间,返回nil,告诉客户端直接获取到锁。

被其它线程已经锁定,返回锁有效期的剩余时间,告诉客户端需要等待。

"EVAL"

"if (redis.call('exists', KEYS[1]) == 0) then

redis.call('hset', KEYS[1], ARGV[2], 1);

redis.call('pexpire', KEYS[1], ARGV[1]);

return nil; end;

if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then

redis.call('hincrby', KEYS[1], ARGV[2], 1);

redis.call('pexpire', KEYS[1], ARGV[1]);

return nil; end;

return redis.call('pttl', KEYS[1]);"

"1" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0"

"1000" "346e1eb8-5bfd-4d49-9870-042df402f248:21"

上面的lua脚本会转换成真正的redis命令,下面的是经过lua脚本运算之后实际执行的redis命令。

1486642677.053488 [0 lua] "exists" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0"

1486642677.053515 [0 lua] "hset" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0"

"346e1eb8-5bfd-4d49-9870-042df402f248:21" "1"

1486642677.053540 [0 lua] "pexpire" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" "1000"

解锁

解锁的流程看起来复杂些:

如果lock键不存在,发消息说锁已经可用

如果锁不是被当前线程锁定,则返回nil

由于支持可重入,在解锁时将重入次数需要减1

如果计算后的重入次数>0,则重新设置过期时间

如果计算后的重入次数<=0,则发消息说锁已经可用

"EVAL"

"if (redis.call('exists', KEYS[1]) == 0) then

redis.call('publish', KEYS[2], ARGV[1]);

return 1; end;

if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then

return nil;end;

local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);

if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0;

else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end;

return nil;"

"2" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0"

"redisson_lock__channel:{lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0}"

"0" "1000"

"346e1eb8-5bfd-4d49-9870-042df402f248:21"

无竞争情况下解锁redis命令:

主要是发送一个解锁的消息,以此唤醒等待队列中的线程重新竞争锁。

1486642678.493691 [0 lua] "exists" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0"

1486642678.493712 [0 lua] "publish" "redisson_lock__channel:{lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0}" "0"

有竞争,等待

有竞争的情况在redis端的lua脚本是相同的,只是不同的条件执行不同的redis命令,复杂的在rediDvfIWsson的源码上。当通过tryAcquire发现锁被其它线程申请时,需要进入等待竞争逻辑中。

this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败

this.await返回true,进入循环尝试获取锁。

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {

long time = unit.toMillis(waitTime);

long current = System.currentTimeMillis();

final long threadId = Thread.currentThread().getId();

Long ttl = this.tryAcquire(leaseTime, unit);

if(ttl == null) {

return true;

} else {

//重点是这段

time -= System.currentTimeMillis() - current;

if(time <= 0L) {

return false;

} else {

current = System.currentTimeMillis();

final RFuture subscribeFuture = this.subscribe(threadId);

if(!this.await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {

if(!subscribeFuture.cancel(false)) {

subscribeFuture.addListener(new FutureListener() {

public void operationComplete(Future future) throws Exception {

if(subscribeFuture.isSuccess()) {

RedissonLock.this.unsubscribe(subscribeFuture, threadId);

}

}

});

}

return false;

} else {

boolean var16;

try {

time -= System.currentTimeMillis() - current;

if(time <= 0L) {

boolean currentTime1 = false;

return currentTime1;

}

do {

long currentTime = System.currentTimeMillis();

ttl = this.tryAcquire(leaseTime, unit);

if(ttl == null) {

var16 = true;

return var16;

}

time -= System.currentTimeMillis() - currentTime;

if(time <= 0L) {

var16 = false;

return var16;

}

currentTime = System.currentTimeMillis();

if(ttl.longValue() >= 0L && ttl.longValue() < time) {

this.getEntry(threadId).getLatch().tryAcquire(ttl.longValue(), TimeUnit.MILLISECONDS);

} else {

this.getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);

}

time -= System.currentTimeMillis() - currentTime;

} while(time > 0L);

var16 = false;

} finally {

this.unsubscribe(subscribeFuture, threadId);

}

return var16;

}

}

}

}

循环尝试一般有如下几种方法:

while循环,一次接着一次的尝试,这个方法的缺点是会造成大量无效的锁申请。

Thread.sleep,在上面的while方案中增加睡眠时间以降低锁申请次数,缺点是这个睡眠的时间设置比较难控制。

基于信息量,当锁被其它资源占用时,当前线程订阅锁的释放事件,一旦锁释放会发消息通知待等待的锁进行竞争,有效的解决了无效的锁申请情况。核心逻辑是this.getEntry(threadId).getLatch().tryAcquire,this.getEntry(threadId).getLatch()返回的是一个信号量,有兴趣可以再研究研究。

redisson依赖

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Springboot实现Java阿里短信发送代码实例
下一篇:Java RPC框架如何实现客户端限流配置
相关文章

 发表评论

暂时没有评论,来抢沙发吧~