作者:京东科技 张石磊

1 案例引入

名词简介:

资源:能够了解为一条内容,或者图+文字+链接的载体。

档位ID: 资源的分类组,资源必须归属于档位。

问题形容:当同一个档位下2条资源同时审批通过时,收到擎天审批零碎2条音讯,消费者利用部署了2台机器,此时正好由2台机器别离生产,在并发生产时,先更新资源状态,而后写缓存,每次取前100条资源,相似select * from resource where gear_id=xxx limit 100 order by id desc;

在写档位缓存,此时事务未提交,并发查问时依据档位Id查问时查问不到对方的数据,全量写缓存时导致后写的缓存笼罩了先写的缓存,即缓存被笼罩,导致投放资源缺失。

计划思考 :

计划1:一台机器生产mq–单点问题

计划2:将同档位ID的资源路由到同一个queue,须要审批零碎配合依据档位Id做路由,审批零碎发的音讯不只是cms审批数据,此计划不实用。

计划3:在档位级别加分布式锁。

经比拟,最终采纳计划3是适合的计划.

2 锁阐明和分布式锁抉择

synchronized锁的粒度是JVM过程维度,集群模式下,不能对共享资源加锁,此时须要跨JVM过程的分布式锁。

分布式锁形式外围实现形式长处毛病剖析

1 数据库:

乐观锁,lock

乐观锁,通过版本号实现version

实现简略,不依赖中间件

数据库IO瓶颈,性能差

单实例存在单点问题,主从架构存在数据不统一,主从切换时其余客户端可反复加锁。

2 zookeeper

创立长期节点

CP模型,可靠性高,不存在主从切换不统一问题

频繁创立和销毁长期节点,且

集群模式下,leader数据须要同步到follower才算加锁胜利,性能不如redis

主从切换服务不可用

3 redis集群

setnx+expire

性能高

有封装好的框架redission

反对超时主动删除锁

集群反对高可用,AP模型

主从切换时其余客户端可反复加锁。

R2M是基于开源的Redis cluster(Redis 3.0以上版本)构建的高性能分布式缓存零碎,咱们零碎始终在应用,3.2.5版本开始反对分布式锁。

3 r2m分布式锁原理

示例代码:

String lockKey = CacheKeyHelper.getGearLockKey(EnvEnum.getEnvFlagEnum(envCode),resource.getGearId());R2mLock lock = (R2mLock) r2mClusterClient.getLock(lockKey);//获取锁,锁的默认有效期30s,获取不到锁就阻塞lock.lock();try {    //业务代码    resourceService.afterApprovedHandle(resource);}  finally {    //开释锁    lock.unlock();}

1 加锁外围流程:

加锁流程图:

1):尝试获取锁,通过执行加锁Lua脚本来做;

2):若第一步未获取到锁,则去redis订阅解锁音讯

3):一旦持有锁的线程开释了锁,就会播送解锁音讯,其余线程自旋从新尝试获取锁。

外围加锁原理:应用lua脚本封装了hset和pexpire命令,保障是一个原子操作, KEYS[1]是加锁的key,argv[2]是加锁的客户端ID(UUID+线程ID),ARGV[1]是锁的有效期,默认30s.

private Object acquireInternal(List<String> args) {if (!this.setLocked() && this.getHolderId() != Thread.currentThread().getId()) {return -1L;} else {try {//hash构造,hash的key是加锁的key,键值对的key为客户端的UUID+线程id,value为锁的重入计数器值。return this.lockSha() != null ? this.executor.evalR2mLockSha(this.lockSha(),"if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;return -2;", Collections.singletonList(this.lockName), args) : this.executor. == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return -2;", Collections.singletonList(this.lockName), args);} catch (Exception var3) {this.setUnlocked();throw new R2mLockException("Failed to acquire lock " + this.lockName + ".", var3);}}}

args参数

private List<String> acquireArgs(long leaseTime) {        List<String> args = new ArrayList();        if (leaseTime != -1L) {            args.add(String.valueOf(leaseTime));        } else {             //默认30s            args.add(String.valueOf(this.internalLockLeaseTime()));        }        //UUID+以后线程id        args.add(this.currentThreadLockId(Thread.currentThread().getId()));        return args;    }

获取锁失败订阅锁的channel

//获取锁失败,订阅开释锁的音讯 private boolean failedAcquire() {            this.subLock();            return false;        }  private void subLock() {            CompletableFuture<Void> cf = R2mLock.this.executor.lockSubscribe(R2mLock.this.lockPubSub(), R2mLock.this.getLockChannelName(), R2mLock.this);            if (cf != null) {                cf.handleAsync(this::reSubIfEx);            }         }

锁开释后,订阅者通过自旋尝试获取锁。

//tryAcquire获取锁,!tryAcquire就是获取锁失败,锁开释后,告诉线程唤醒后返回false,而后通过自旋,尝试获取锁,public final void acquire(long arg) {        if (!tryAcquire(arg) &&            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            selfInterrupt();    }  final boolean acquireQueued(final Node node, long arg) {        boolean failed = true;        try {            boolean interrupted = false;            //外部自旋获取锁            for (;;) {                final Node p = node.predecessor();                if (p == head && tryAcquire(arg)) {                    setHead(node);                    p.next = null; // help GC                    failed = false;                    return interrupted;                }                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    interrupted = true;            }        } finally {            if (failed)                cancelAcquire(node);        }    }

2 开释锁外围逻辑:

1)删除分布式锁key(如果可重入锁计数为0)

  1. 发开释锁的播送音讯

3)勾销watchdog

private Object unlockInternal(List<String> args) {        logger.debug("{} trying to unlock.", Thread.currentThread().getId());         Object var2;        try {     //判断锁 key 是否存在,如果存在,而后递加hash的value值,当value值为0时再删除锁key,并且播送开释锁的音讯            if (this.unlockSha() == null) {                var2 = this.executor. == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.lockName, this.getLockChannelName()), args);                return var2;            }             var2 = this.executor.evalR2mLockSha(this.unlockSha(), "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.lockName, this.getLockChannelName()), args);        } catch (Exception var6) {            throw new R2mLockException("Failed to unlock " + this.lockName + ".", var6);        } finally {            this.finalizeRelease();        }         return var2;    } //勾销以后线程的watchdog private void finalizeRelease() {        long threadId = Thread.currentThread().getId();        R2mLock.ExpirableEntry entry = (R2mLock.ExpirableEntry)this.entryCache.get(threadId);        if (entry != null) {            entry.release(threadId);            if (entry.isReleased()) {                //勾销这个线程watchdog定时工作                entry.getExtTask().cancel();                this.expEntry.compareAndSet(entry, (Object)null);                //从缓存watchdog线程的map中删除该线程                this.entryCache.remove(threadId);            }        }     }

3 锁的健壮性思考

1 业务没执行完,锁超时过期怎么办?

客户端加锁默认有效期30s,超过有效期后如果业务没执行完,还须要持有这把锁,r2m客户端提供了续期机制,也就是watchdog机制。

watchdog原理:客户端线程维度(UUID+线程ID,客户端保护一个MAP,key就是UUID+线程ID)的后盾定时线程,获取锁胜利后,如果客户端还持有以后锁,每隔10s(this.internalLockLeaseTime() / 3L),去缩短锁的有效期(internalLockLeaseTime)

//watchdog外围机制 ,internalLockLeaseTime默认30sprivate void extendLock(long holderId) {        if (this.expEntry.get() != null) {            R2mLock.ExpirableEntry holderEntry = (R2mLock.ExpirableEntry)this.entryCache.get(holderId);            if (holderEntry != null) {                 //每隔10s,如果以后线程持有锁,则续期30s                if (this.expEntry.compareAndSet(holderEntry, holderEntry)) {                    Timeout task = this.timer().newTimeout((timeout) -> {                        if (this.extendLockInternal(holderId)) {                            this.extendLock(holderId);                        }                     }, this.internalLockLeaseTime() / 3L, TimeUnit.MILLISECONDS);                    if (this.expEntry.get() != null) {                        ((R2mLock.ExpirableEntry)this.expEntry.get()).setExtTask(task);                    }                }             }        }    }  //执行续期lua脚本 private boolean extendLockInternal(long threadId) {        Object result;        try {           //只续期            if (this.extendLockSha() != null) {                result = this.executor.evalR2mLockSha(this.extendLockSha(), "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.lockName), this.extendLockArgs(threadId));            } else {                result = this.executor. == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.lockName), this.extendLockArgs(threadId));            }        } catch (Exception var5) {            return false;        }         return Long.parseLong(result.toString()) == 1L;    }

2 客户端宕机,锁如何开释?

分布式锁是有效期的,客户端宕机后,watchdog机制生效,锁过期主动生效。

3 redis分布式锁集群模式下缺点

r2m集群模式,极其状况,master加锁胜利,宕机,还未来得及同步到slave,主从切换,slave切换成master,能够持续加锁,对于非及其严格加锁场景,该计划可满足,属于AP;对于严格场景下的分布式锁,可采纳基于zookeeper的分布式锁,属于CP,leader宕机,folllower选举时不可用。性能上redis更优。

4 锁的开释问题

留神锁的开释在finally中开释,必须由锁的持有者开释,不能由其余线程开释他人的锁,示例代码中lock放到try的里面。