在开始提到Redis分布式锁之前,我想跟大家聊点Redis的基础知识。说一下Redis的两个命令:SETNX key value
setnx 是SET if Not eXists(如果不存在,则 SET)的简写。
用法如图,如果不存在set胜利返回int的1,这个key存在了返回0。SETEX key seconds value复制代码将值 value 关联到 key ,并将 key 的生存工夫设为 seconds (以秒为单位)。如果 key 曾经存在,setex命令将覆写旧值。有小伙伴必定会纳闷万一set value 胜利 set time失败,那不就傻了么,这啊Redis官网想到了。setex是一个原子性(atomic)操作,关联值和设置生存工夫两个动作会在同一时间内实现。
我设置了10秒的生效工夫,ttl命令能够查看倒计时,负的阐明曾经到期了。跟大家讲这两个命名也是有起因的,因为他们是Redis实现分布式锁的要害。注释开始前还是看看场景:
我仍然是创立了很多个线程去扣减库存inventory,不出意外的库存扣减程序变了,最终的后果也是不对的。单机加synchronized或者Lock这些惯例操作我就不说了好吧,后果必定是对的。
我先实现一个简略的Redis锁,而后咱们再实现分布式锁,可能更不便大家的了解。还记得下面我说过的命令么,实现一个单机的其实比较简单,你们先思考一下,别往下看。setnx
能够看到,第一个胜利了,没开释锁,前面的都失败了,至多程序问题问题是解决了,只有加锁,缩放前面的拿到,开释如此循环,就能保障依照程序执行。然而你们也发现问题了,还是一样的,第一个仔set胜利了,然而忽然挂了,那锁就始终在那无奈失去开释,前面的线程也永远得不到锁,又死锁了。所以....setex晓得我之前说这个命令的起因了吧,设置一个过期工夫,就算线程1挂了,也会在生效工夫到了,主动开释。我这里就用到了nx和px的联合参数,就是set值并且加了过期工夫,这里我还设置了一个过期工夫,就是这工夫内如果第二个没拿到第一个的锁,就退出阻塞了,因为可能是客户端断连了。
加锁整体加锁的逻辑比较简单,大家基本上都能看懂,不过我拿到以后工夫去减开始工夫的操作感觉有点笨, System.currentTimeMillis()耗费很大的。/** * 加锁 * * @param id * @return */public boolean lock(String id) { Long start = System.currentTimeMillis(); try { for (; ; ) { //SET命令返回OK ,则证实获取锁胜利 String lock = jedis.set(LOCK_KEY, id, params); if ("OK".equals(lock)) { return true; } //否则循环期待,在timeout工夫内仍未获取到锁,则获取失败 long l = System.currentTimeMillis() - start; if (l >= timeout) { return false; } try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } finally { jedis.close(); }}
System.currentTimeMillis耗费大,每个线程进来都这样,我之前写代码,就会在服务器启动的时候,开一个线程一直去拿,调用方间接获取值就好了,不过也不是最优解,日期类还是有很多好办法的。@Servicepublic class TimeServcie { private static long time; static { new Thread(new Runnable(){ @Override public void run() { while (true){ try { Thread.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } long cur = System.currentTimeMillis(); setTime(cur); } } }).start(); }
public static long getTime() { return time; }
public static void setTime(long time) { TimeServcie.time = time; }}
解锁解锁的逻辑更加简略,就是一段Lua的拼装,把Key做了删除。你们发现没,我下面加锁解锁都用了UUID,这就是为了保障,谁加锁了谁解锁,要是你删掉了我的锁,那不乱套了嘛。LUA是原子性的,也比较简单,就是判断一下Key和咱们参数是否相等,是的话就删除,返回胜利1,0就是失败。/** * 解锁 * * @param id * @return */public boolean unlock(String id) { String script = "if redis.call('get',KEYS[1]) == ARGV[1] then" + " return redis.call('del',KEYS[1]) " + "else" + " return 0 " + "end"; try { String result = jedis.eval(script, Collections.singletonList(LOCK_KEY), Collections.singletonList(id)).toString(); return "1".equals(result) ? true : false; } finally { jedis.close(); }}
验证咱们能够用咱们写的Redis锁试试成果,能够看到都依照程序去执行了
思考大家是不是感觉完满了,然而下面的锁,有不少瑕疵的,我没思考很多点,你或者能够思考一下,源码我都开源到我的GItHub了。而且,锁个别都是须要可重入行的,下面的线程都是执行完了就开释了,无奈再次进入了,进去也是从新加锁了,对于一个锁的设计来说必定不是很正当的。我不打算手写,因为都有现成的,他人帮咱们写好了。redissonredisson的锁,就实现了可重入了,然而他的源码比拟艰涩难懂。应用起来很简略,因为他们底层都封装好了,你连贯上你的Redis客户端,他帮你做了我下面写的所有,而后更完满。简略看看他的应用吧,跟失常应用Lock没啥区别。ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(inventory, inventory, 10L, SECONDS, linkedBlockingQueue);long start = System.currentTimeMillis();Config config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379");final RedissonClient client = Redisson.create(config);final RLock lock = client.getLock("lock1");
for (int i = 0; i <= NUM; i++) { threadPoolExecutor.execute(new Runnable() { public void run() { lock.lock(); inventory--; System.out.println(inventory); lock.unlock(); } });}long end = System.currentTimeMillis();System.out.println("执行线程数:" + NUM + " 总耗时:" + (end - start) + " 库存数为:" + inventory);
下面能够看到我用到了getLock,其实就是获取一个锁的实例。RedissionLock也没做啥,就是相熟的初始化。public RLock getLock(String name) { return new RedissonLock(connectionManager.getCommandExecutor(), name);}
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); //命令执行器 this.commandExecutor = commandExecutor; //UUID字符串 this.id = commandExecutor.getConnectionManager().getId(); //外部锁过期工夫 this.internalLockLeaseTime = commandExecutor. getConnectionManager().getCfg().getLockWatchdogTimeout(); this.entryName = id + ":" + name;}
加锁有没有发现很多跟Lock很多类似的中央呢?尝试加锁,拿到以后线程,而后我结尾说的ttl也看到了,是不是一切都是那么相熟?public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { //以后线程ID long threadId = Thread.currentThread().getId(); //尝试获取锁 Long ttl = tryAcquire(leaseTime, unit, threadId); // 如果ttl为空,则证实获取锁胜利 if (ttl == null) { return; } //如果获取锁失败,则订阅到对应这个锁的channel RFuture<RedissonLockEntry> future = subscribe(threadId); commandExecutor.syncSubscription(future);
try { while (true) { //再次尝试获取锁 ttl = tryAcquire(leaseTime, unit, threadId); //ttl为空,阐明胜利获取锁,返回 if (ttl == null) { break; } //ttl大于0 则期待ttl工夫后持续尝试获取 if (ttl >= 0) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().acquire(); } } } finally { //勾销对channel的订阅 unsubscribe(future, threadId); } //get(lockAsync(leaseTime, unit));}
获取锁获取锁的时候,也比较简单,你能够看到,他也是一直刷新过期工夫,跟我下面一直去拿以后工夫,校验过期是一个情理,只是我比拟毛糙。private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
//如果带有过期工夫,则依照一般形式获取锁 if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } //先依照30秒的过期工夫来执行获取锁的办法 RFuture<Long> ttlRemainingFuture = tryLockInnerAsync( commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); //如果还持有这个锁,则开启定时工作一直刷新该锁的过期工夫 ttlRemainingFuture.addListener(new FutureListener<Long>() { @Override public void operationComplete(Future<Long> future) throws Exception { if (!future.isSuccess()) { return; }
Long ttlRemaining = future.getNow(); // lock acquired if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture;}
底层加锁逻辑你可能会想这么多操作,在一起不是原子性不还是有问题么?大佬们必定想得到呀,所以还是LUA,他应用了Hash的数据结构。次要是判断锁是否存在,存在就设置过期工夫,如果锁曾经存在了,那比照一下线程,线程是一个那就证实能够重入,锁在了,然而不是以后线程,证实他人还没开释,那就把剩余时间返回,加锁失败。是不是有点绕,多了解一遍。<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
//过期工夫 internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, //如果锁不存在,则通过hset设置它的值,并设置过期工夫 "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + //如果锁已存在,并且锁的是以后线程,则通过hincrby给数值递增1 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + //如果锁已存在,但并非本线程,则返回过期工夫ttl "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
解锁锁的开释次要是publish开释锁的信息,而后做校验,一样会判断是否以后线程,胜利就开释锁,还有个hincrby递加的操作,锁的值大于0阐明是可重入锁,那就刷新过期工夫。如果值小于0了,那删掉Key开释锁。是不是又和AQS很像了?AQS就是通过一个volatile润饰status去看锁的状态,也会看数值判断是否是可重入的。所以我说代码的设计,最初就万剑归一,都是一样的。public RFuture<Void> unlockAsync(final long threadId) { final RPromise<Void> result = new RedissonPromise<Void>(); //解锁办法 RFuture<Boolean> future = unlockInnerAsync(threadId);
future.addListener(new FutureListener<Boolean>() { @Override public void operationComplete(Future<Boolean> future) throws Exception { if (!future.isSuccess()) { cancelExpirationRenewal(threadId); result.tryFailure(future.cause()); return; } //获取返回值 Boolean opStatus = future.getNow(); //如果返回空,则证实解锁的线程和以后锁不是同一个线程,抛出异样 if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException(" attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return; } //解锁胜利,勾销刷新过期工夫的那个定时工作 if (opStatus) { cancelExpirationRenewal(null); } result.trySuccess(null); } });
return result;}
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL, //如果锁曾经不存在, 公布锁开释的音讯 "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + //如果开释锁的线程和已存在锁的线程不是同一个线程,返回null "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + //通过hincrby递加1的形式,开释一次锁 //若剩余次数大于0 ,则刷新过期工夫 "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + //否则证实锁曾经开释,删除key并公布锁开释的音讯 "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
总结这个写了比拟久,然而不是因为简单什么的,是因为集体工作的起因,最近事件很多嘛,还是那句话,程序员才是我的本职写文章只是个喜好,不能轻重倒置了。大家会发现,你学懂一个技术栈之后,学新的会很快,而且也能发现他们的设计思维和技巧真的很奇妙,也总能找到类似点,和让你惊叹的点。就拿Doug Lea写的AbstractQueuedSynchronizer(AQS)来说,他写了一行代码,你可能看几天才能看懂,大佬们的思维是真的牛。我看源码有时候也头疼,然而去谷歌一下,本人了解一下,忽然豁然开朗的时候感觉所有又很值。学习就是一条时而郁郁寡欢,时而开环大笑的路,大家加油,咱们成长路上一起共勉。一个在互联网得过且过的工具人。注:如果本篇博客有任何谬误和倡议,欢送人才们留言,你快说句话啊!