作者:京东科技 张石磊
1 案例引入
名词简介:
资源:能够了解为一条内容,或者图 + 文字 + 链接的载体。
档位 ID: 资源的分类组,资源必须归属于档位。
问题形容:当同一个档位下 2 条资源同时审批通过时,收到擎天审批零碎 2 条音讯,消费者利用部署了 2 台机器,此时正好由 2 台机器别离生产,在并发生产时,先更新资源状态,而后写缓存,每次取前 100 条资源,相似 select * from resource where gear_id=xxx limit 100 order by id desc;
在写档位缓存,此时事务未提交,并发查问时依据档位 Id 查问时查问不到对方的数据,全量写缓存时导致后写的缓存笼罩了先写的缓存,即缓存被笼罩,导致投放资源缺失。
计划思考:
计划 1:一台机器生产 mq–单点问题
计划 2:将同档位 ID 的资源路由到同一个 queue, 须要审批零碎配合依据档位 Id 做路由,审批零碎发的音讯不只是 cms 审批数据,此计划不实用。
计划 3:在档位级别加分布式锁。
经比拟,最终采纳计划 3 是适合的计划.
2 锁阐明和分布式锁抉择
synchronized 锁的粒度是 JVM 过程维度,集群模式下,不能对共享资源加锁,此时须要跨 JVM 过程的分布式锁。
分布式锁形式外围实现形式长处毛病剖析
1 数据库:
乐观锁,lock
乐观锁,通过版本号实现 version
实现简略,不依赖中间件
数据库 IO 瓶颈,性能差
单实例存在单点问题,主从架构存在数据不统一,主从切换时其余客户端可反复加锁。
2 zookeeper
创立长期节点
CP 模型,可靠性高,不存在主从切换不统一问题
频繁创立和销毁长期节点,且
集群模式下,leader 数据须要同步到 follower 才算加锁胜利,性能不如 redis
主从切换服务不可用
3 redis 集群
setnx+expire
性能高
有封装好的框架 redission
反对超时主动删除锁
集群反对高可用,AP 模型
主从切换时其余客户端可反复加锁。
R2M 是基于开源的 Redis cluster(Redis 3.0 以上版本) 构建的高性能分布式缓存零碎,咱们零碎始终在应用,3.2.5 版本开始反对分布式锁。
3 r2m 分布式锁原理
示例代码:
String lockKey = CacheKeyHelper.getGearLockKey(EnvEnum.getEnvFlagEnum(envCode),resource.getGearId());
R2mLock lock = (R2mLock) r2mClusterClient.getLock(lockKey);
// 获取锁,锁的默认有效期 30s, 获取不到锁就阻塞
lock.lock();
try {
// 业务代码
resourceService.afterApprovedHandle(resource);
} finally {
// 开释锁
lock.unlock();}
1 加锁外围流程:
加锁流程图:
1):尝试获取锁,通过执行加锁 Lua 脚本来做;
2):若第一步未获取到锁,则去 redis 订阅解锁音讯
3):一旦持有锁的线程开释了锁,就会播送解锁音讯,其余线程自旋从新尝试获取锁。
外围加锁原理:应用 lua 脚本封装了 hset 和 pexpire 命令,保障是一个原子操作,KEYS[1] 是加锁的 key,argv[2] 是加锁的客户端 ID(UUID+ 线程 ID),ARGV[1] 是锁的有效期,默认 30s.
private Object acquireInternal(List<String> args) {if (!this.setLocked() && this.getHolderId() != Thread.currentThread().getId()) {return -1L;} else {
try {
//hash 构造,hash 的 key 是加锁的 key, 键值对的 key 为客户端的 UUID+ 线程 id,value 为锁的重入计数器值。return this.lockSha() != null ? this.executor.evalR2mLockSha(this.lockSha(),
"if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;
return -2;", Collections.singletonList(this.lockName), args) : this.executor. == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return -2;", Collections.singletonList(this.lockName), args);
} catch (Exception var3) {this.setUnlocked();
throw new R2mLockException("Failed to acquire lock" + this.lockName + ".", var3);
}
}
}
args 参数
private List<String> acquireArgs(long leaseTime) {List<String> args = new ArrayList();
if (leaseTime != -1L) {args.add(String.valueOf(leaseTime));
} else {
// 默认 30s
args.add(String.valueOf(this.internalLockLeaseTime()));
}
//UUID+ 以后线程 id
args.add(this.currentThreadLockId(Thread.currentThread().getId()));
return args;
}
获取锁失败订阅锁的 channel
// 获取锁失败,订阅开释锁的音讯
private boolean failedAcquire() {this.subLock();
return false;
}
private void subLock() {CompletableFuture<Void> cf = R2mLock.this.executor.lockSubscribe(R2mLock.this.lockPubSub(), R2mLock.this.getLockChannelName(), R2mLock.this);
if (cf != null) {cf.handleAsync(this::reSubIfEx);
}
}
锁开释后,订阅者通过自旋尝试获取锁。
//tryAcquire 获取锁,!tryAcquire 就是获取锁失败,锁开释后,告诉线程唤醒后返回 false, 而后通过自旋,尝试获取锁,public final void acquire(long arg) {if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();}
final boolean acquireQueued(final Node node, long arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 外部自旋获取锁
for (;;) {final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {if (failed)
cancelAcquire(node);
}
}
2 开释锁外围逻辑:
1)删除分布式锁 key(如果可重入锁计数为 0)
- 发开释锁的播送音讯
3)勾销 watchdog
private Object unlockInternal(List<String> args) {logger.debug("{} trying to unlock.", Thread.currentThread().getId());
Object var2;
try {
// 判断锁 key 是否存在,如果存在,而后递加 hash 的 value 值,当 value 值为 0 时再删除锁 key,并且播送开释锁的音讯
if (this.unlockSha() == null) {var2 = this.executor. == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.lockName, this.getLockChannelName()), args);
return var2;
}
var2 = this.executor.evalR2mLockSha(this.unlockSha(), "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.lockName, this.getLockChannelName()), args);
} catch (Exception var6) {throw new R2mLockException("Failed to unlock" + this.lockName + ".", var6);
} finally {this.finalizeRelease();
}
return var2;
}
// 勾销以后线程的 watchdog
private void finalizeRelease() {long threadId = Thread.currentThread().getId();
R2mLock.ExpirableEntry entry = (R2mLock.ExpirableEntry)this.entryCache.get(threadId);
if (entry != null) {entry.release(threadId);
if (entry.isReleased()) {
// 勾销这个线程 watchdog 定时工作
entry.getExtTask().cancel();
this.expEntry.compareAndSet(entry, (Object)null);
// 从缓存 watchdog 线程的 map 中删除该线程
this.entryCache.remove(threadId);
}
}
}
3 锁的健壮性思考
1 业务没执行完,锁超时过期怎么办?
客户端加锁默认有效期 30s,超过有效期后如果业务没执行完,还须要持有这把锁,r2m 客户端提供了续期机制,也就是 watchdog 机制。
watchdog 原理:客户端线程维度(UUID+ 线程 ID, 客户端保护一个 MAP,key 就是 UUID+ 线程 ID)的后盾定时线程,获取锁胜利后,如果客户端还持有以后锁,每隔 10s(this.internalLockLeaseTime() / 3L),去缩短锁的有效期(internalLockLeaseTime)
//watchdog 外围机制,internalLockLeaseTime 默认 30s
private void extendLock(long holderId) {if (this.expEntry.get() != null) {R2mLock.ExpirableEntry holderEntry = (R2mLock.ExpirableEntry)this.entryCache.get(holderId);
if (holderEntry != null) {
// 每隔 10s, 如果以后线程持有锁,则续期 30s
if (this.expEntry.compareAndSet(holderEntry, holderEntry)) {Timeout task = this.timer().newTimeout((timeout) -> {if (this.extendLockInternal(holderId)) {this.extendLock(holderId);
}
}, this.internalLockLeaseTime() / 3L, TimeUnit.MILLISECONDS);
if (this.expEntry.get() != null) {((R2mLock.ExpirableEntry)this.expEntry.get()).setExtTask(task);
}
}
}
}
}
// 执行续期 lua 脚本
private boolean extendLockInternal(long threadId) {
Object result;
try {
// 只续期
if (this.extendLockSha() != null) {result = this.executor.evalR2mLockSha(this.extendLockSha(), "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.lockName), this.extendLockArgs(threadId));
} else {result = this.executor. == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.lockName), this.extendLockArgs(threadId));
}
} catch (Exception var5) {return false;}
return Long.parseLong(result.toString()) == 1L;
}
2 客户端宕机,锁如何开释?
分布式锁是有效期的,客户端宕机后,watchdog 机制生效,锁过期主动生效。
3 redis 分布式锁集群模式下缺点
r2m 集群模式,极其状况,master 加锁胜利,宕机,还未来得及同步到 slave, 主从切换,slave 切换成 master, 能够持续加锁,对于非及其严格加锁场景,该计划可满足,属于 AP;对于严格场景下的分布式锁,可采纳基于 zookeeper 的分布式锁,属于 CP,leader 宕机,folllower 选举时不可用。性能上 redis 更优。
4 锁的开释问题
留神锁的开释在 finally 中开释,必须由锁的持有者开释,不能由其余线程开释他人的锁,示例代码中 lock 放到 try 的里面。