Redisson锁机制源码分析( 三 )

关注renewExpiration()方法
private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}// 创建一个定时任务去实现自动续约Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {// 获取当前锁的ExpirationEntry 对象 。ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}// 获取第一个线程IDLong threadId = ent.getFirstThreadId();if (threadId == null) {return;}// 锁续期CompletionStage<Boolean> future = renewExpirationAsync(threadId);future.whenComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getRawName() + " expiration", e);EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;}// 续约成功,递归自己无限续约下去if (res) {// reschedule itselfrenewExpiration();} else {// 续约失败,表示锁已释放,取消续约任务cancelExpirationRenewal(null);}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // internalLockLeaseTime / 3表示每隔锁时间的三分之一,去续约一次ee.setTimeout(task);}关注renewExpirationAsync方法
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.singletonList(getRawName()),internalLockLeaseTime, getLockName(threadId));}我们发现,又是一段lua脚本,还是复制出来,格式化后详细解释下代码 。
-- KEYS[1] 加锁的对象(也就是我们传入的的锁名称)-- ARGV[1] 表示锁的过期时间-- ARGV[2]:UUID+当前线程id-- 使用hexists判断锁是不是自己持有的,== 1表示是自己持有,== 0 表示被其他客户端持有if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then-- 重新设置过期时间redis.call('pexpire', KEYS[1], ARGV[1]);-- 返回1 表示续约成功return 1;end ;-- 返回0 表示续约失败,也意味着锁已经被释放或者被其他客户端获取了return 0;所以续约的逻辑就是,启动一个定时任务,每隔续约时间的三分之一次就执行一次 。尝试去续约,续约成功则会一直递归续约下去 。续约失败表示锁已被释放,则停止续约任务 。
而续约的操作就是,判断是否是自己持有锁,是的话就重新设置过期时间,并且返回1表示续约成功,否则返回0表示续约失败 。
2.3、关注RedissonLock.unlock()方法@Overridepublic void unlock() {try {// 其实就是调用了unlockAsync进行解锁get(unlockAsync(Thread.currentThread().getId()));} catch (RedisException e) {if (e.getCause() instanceof IllegalMonitorStateException) {throw (IllegalMonitorStateException) e.getCause();} else {throw e;}}//Future<Void> future = unlockAsync();//future.awaitUninterruptibly();//if (future.isSuccess()) {//return;//}//if (future.cause() instanceof IllegalMonitorStateException) {//throw (IllegalMonitorStateException)future.cause();//}//throw commandExecutor.convertException(future);}我们可以看到,会使用unlockAsync方法进行解锁,并且在这里传入了当前的线程ID 。
关注unlockAsync方法
@Overridepublic RFuture<Void> unlockAsync(long threadId) {// 调用unlockInnerAsync实现异步解锁RFuture<Boolean> future = unlockInnerAsync(threadId);// 释放之后再处理一些事情CompletionStage<Void> f = future.handle((opStatus, e) -> {// 取消(停止)续约任务,这里也会停止watch dogcancelExpirationRenewal(threadId);if (e != null) {throw new CompletionException(e);}if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);throw new CompletionException(cause);}return null;});return new CompletableFutureWrapper<>(f);}关注解锁的核心逻辑unlockInnerAsync方法
protected RFuture<Boolean> unlockInnerAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end; " +"return nil;",Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));}


推荐阅读