redisson 实现分布式锁的源码解析

网友投稿 327 2022-08-08


redisson 实现分布式锁的源码解析

目录redisson测试代码加锁设计锁续期设计锁的自旋重试解锁设计撤销锁续期解锁成功唤排队线程

redisson

redisson 实现分布式锁的机制如下:

依赖版本

implementation 'org.redisson:redisson-spring-boot-starter:3.17.0'

测试代码

下面是模拟一个商品秒杀的场景,示例代码如下:

public class RedissonTest {

public static void main(String[] args) {

//1. 配置部分

Config config = new Config();

String address = "redis://127.0.0.1:6379";

SingleServerConfig serverConfig = config.useSingleServer();

serverConfig.setAddress(address);

serverConfig.setDatabase(0);

config.setLockWatchdogTimeout(5000);

Redisson redisson = (Redisson) Redisson.create(config);

RLock rLock = redisson.getLock("goods:1000:1");

//2. 加锁

rLock.lock();

try {

System.out.println("todo 逻辑处理 1000000.");

} finally {

if (rLock.isLocked() && rLock.isHeldByCurrentThread()) {

//3. 解锁

rLock.unlock();

}

}

}

}

加锁设计

rLock.lock();是加锁的核心代码,我们一起来看看调用栈

加锁的核心方法是:org.redisson.RedissonLock#tryLockInnerAsync

RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {

return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,

"if (redis.call('exists', KEYS[1]) == 0) then " +

"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +

"redis.call('pexpire', KEYS[1], ARGV[1]); " +

"return nil; " +

"end; " +

"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +

"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +

"redis.call('pexpire', KEYS[1], ARGV[1]); " +

"return nil; " +

"end; " +

"return redis.call('pttl', KEYS[1]);",

Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));

}

其实它的本质是调用一段 LUA 脚本进行加锁。

锁续期设计

锁的续期是在 org.redisson.RedissonLock#tryAcquireAsync方法中调用 scheduleExpirationRenewal实现的。

续期需要注意的是,看门狗是设置在主线程的延迟队列的线程中。

tryAcquireAsync 代码如下:

private RFuture tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {

RFuture ttlRemainingFuture;

if (leaseTime != -1) {

ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);

} else {

ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,

TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);

}

CompletionStage f = ttlRemainingFuture.thenApply(ttlRemaining -> {

// lock acquired

if (ttlRemaining == null) {

if (leaseTime != -1) {

internalLockLeaseTime = unit.toMillis(leaseTime);

} else {

// 锁过期时间续期

scheduleExpirationRenewal(threadId);

}

}

return ttlRemaining;

});

return new CompletableFutureWrapper<>(f);

}

锁续期 scheduleExpirationRenewal代码如下:

protected void scheduleExpirationRenewal(long threadId) {

ExpirationEntry entry = new ExpirationEntry();

ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);

if (oldEntry != null) {

oldEntry.addThreadId(threadId);

} else {

entry.addThreadId(threadId);

try {

renewExpiration();

} finally {

if (Thread.currentThread().isInterrupted()) {

cancelExpirationRenewal(threadId);

}

}

}

}

然后在调用 renewExpiration();执行续期逻辑

private void renewExpiration() {

ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());

if (ee == null) {

return;

}

// 创建延迟任务

Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {

@Override

public void run(Timeout timeout) throws Exception {

ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());

if (ent == null) {

return;

}

Long threadId = ent.getFirstThreadId();

if (threadId == null) {

return;

}

// 真正的续期,调用 LUA 脚本续期

RFuture future = renewExpirationAsync(threadId);

future.whenComplete((res, e) -> {

if (e != null) {

log.error("Can't update lock " + getRawName() + " expiration", e);

EXPIRATION_RENEWAL_MAP.remove(getEntryName());

return;

}

// 如果续期成功

if (res) {

// reschedule itself

renewExpiration();

} else {

cancelExpirationRenewal(null);

}

});

}

}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

ee.setTimeout(task);

}

renewExpirationAsync方法, 里面还是一段 LUA 脚本,进行重新设置锁的过期时间。

protected RFuture renewExpirationAsync(long threadId) {

return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,

"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +

"redis.call('pexpire', KEYS[1], ARGV[1]); " +

"return 1; " +

"end; " +

"return 0;",

Collections.singletonList(getRawName()),

internalLockLeaseTime, getLockName(threadId));

}

锁的自旋重试

org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean)在执行获取锁失败的时候,会进入重试。其实这里就会执行 18 行以后的 while (true) 逻辑

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {

long threadId = Thread.currentThread().getId();

Long ttl = tryAcquire(-1, leaseTime, unit, threadId);

// lock acquired

if (ttl == null) {

return;

}

CompletableFuture future = subscribe(threadId);

RedissonLockEntry entry;

if (interruptibly) {

entry = commandExecutor.getInterrupted(future);

} else {

entry = commandExecutor.get(future);

}

try {

while (true) {

ttl = tryAcquire(-1, leaseTime, unit, threadId);

// lock acquired

if (ttl == null) {

break;

}

// waiting for message

if (ttl >= 0) {

try {

// 阻塞锁的超时时间,等锁过期后再尝试加锁

entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);

} catch (InterruptedException e) {

if (interruptibly) {

throw e;

}

entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);

}

} else {

if (interruptibly) {

entry.getLatch().acquire();

} else {

entry.getLatch().acquireUninterruptibly();

}

}

}

} finally {

unsubscribe(entry, threnqjsxXaRHFadId);

}

// get(lockAsync(leaseTime, unit));

}

entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);其实这里就是一个间歇性自旋。 等到上次锁过期的时间,在唤醒进行抢锁 entry.getLatch().acquire();

还有一个逻辑就是

CompletableFuture future = subscribe(threadId);

这里其实是会订阅一个消息,如果解锁过后,会发布解锁的消息。

解锁设计

rLock.unlock(); 的核心就是释放锁,撤销续期和唤醒在等待加锁的线程(发布解锁成功消息)。

核心方法(解锁): org.redisson.RedissonLock#unlockInnerAsync

protected RFuture unlockInnerAsync(long threadId) {

return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,

"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +

"return nil;" +

"end; " +

"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +

"if (counter > 0) then " +

"redis.call('pexpire', KEYS[1], ARGV[2]); " +

"return 0; " +

"else " +

"redis.call('del', KEYS[1]); " +

// 发布解锁成功消息

"redis.call('publish', KEYS[2], ARGV[1]); " +

"return 1; " +

"end; " +

"return nil;",

Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

}

还是 LUA 的执行方式。

撤销锁续期

核心方法 org.redisson.RedissonBaseLock#unlockAsync(long)

@Override

public RFuture unlockAsync(long threadId) {

// 解锁

RFuture future = unlockInnerAsync(threadId);

// 撤销续期

CompletionStage f = future.handle((opStatus, e) -> {

cancelExpirationRenewal(threadId);

if (e != null) {

throw new CompletionException(e);

}

if (opStatus == null) {

IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "

+ id + " thread-id: " + threadId);

throw new CompletionException(cause);

}

return null;

});

return new CompletableFutureWrapper<>(f);

}

解锁成功唤排队线程

在 org.redisson.pubsub.LockPubSub#onMessage中回去唤醒阻塞的线程,让执行前面的锁自旋逻辑,具体代码如下:

@Override

protected void onMessage(RedissonLockEntry value, Long message) {

if (message.equals(UNLOCK_MESSAGE)) {

Runnable runnableToExecute = value.getListeners().poll();

if (runnableToExecute != null) {

runnableToExecute.run();

}

value.getLatch().release();

} else if (message.equals(READ_UNLOCK_MESSAGE)) {

while (true) {

Runnable runnableToExecute = value.getListeners().poll();

if (runnableToExecute == null) {

break;

}

runnableToExecute.run();

}

value.getLatch().release(value.getLatch().getQueueLength());

}

}


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:关于springboot的接口返回值统一标准格式(springboot 接口)
下一篇:Java对象的内存布局全流程(java对象内存大小)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~