前言
在咱们日常开发中,难免会遇到要加锁的情景。例如扣除产品库存,首先要从数据库中取出库存,进行库存判断,再减去库存。这一波操作显著不合乎原子性,如果代码块不加锁,很容易因为并发导致超卖问题。咱们的零碎如果是单体架构,那咱们应用本地锁就能够解决问题。如果是分布式架构,就须要应用分布式锁。
计划
应用 SETNX 和 EXPIRE 命令
SETNX key value
EXPIRE key seconds
DEL key
if (setnx("item_1_lock", 1)) {expire("item_1_lock", 30);
try {... 逻辑} catch {...} finally {del("item_1_lock");
}
}
这种办法看起来能够解决问题,然而有肯定的危险,因为 SETNX 和 EXPIRE 这波操作是非原子性的,如果 SETNX 胜利之后,呈现谬误,导致 EXPIRE 没有执行,导致锁没有设置超时工夫造成死锁。
针对这种状况,咱们能够应用 lua 脚本来放弃操作原子性,保障 SETNX 和 EXPIRE 两个操作要么都胜利,要么都不胜利。
if (redis.call('setnx', KEYS[1], ARGV[1]) < 1)
then return 0;
end;
redis.call('expire', KEYS[1], tonumber(ARGV[2]));
return 1;
通过这样的办法,咱们初步解决了竞争锁的原子性问题,尽管其余性能还未实现,然而应该不会造成死锁。
Redis 2.6.12 以上可灵便应用 SET 命令
SET key value NX EX 30
DEL key
if (set("item_1_lock", 1, "NX", "EX", 30)) {
try {... 逻辑} catch {...} finally {del("item_1_lock");
}
}
改良后的办法不须要借助 lua 脚本就解决了 SETNX 和 EXPIRE 的原子性问题。当初咱们再认真推敲推敲,如果 A 拿到了锁顺利进入代码块执行逻辑,然而因为各种起因导致超时主动开释锁。
在这之后 B 胜利拿到了锁进入代码块执行逻辑,但此时如果 A 执行逻辑结束再来开释锁,就会把 B 刚取得的锁开释了。就好比用本人家的钥匙开了别家的门,这是不可承受的。
为了解决这个问题咱们能够尝试在 SET 的时候设置一个锁标识,而后在 DEL 的时候验证以后锁是否为本人的锁。
String value = UUID.randomUUID().toString().replaceAll("-", "");
if (set("item_1_lock", value, "NX", "EX", 30)) {
try {... 逻辑} catch {...} finally {... lua 脚本保障原子性}
}
if (redis.call('get', KEYS[1]) == ARGV[1])
then return redis.call('del', KEYS[1])
else return 0
end
到这里,咱们终于解决了竞争锁的原子性问题和误删锁问题。然而锁个别还须要反对可重入、循环期待和超时主动续约等性能点。上面咱们学习应用一个十分好用的包来解决这些问题。
入门 Redisson
Redission 的锁,实现了可重入和超时主动续约性能,它都帮咱们封装好了,咱们只有依照本人的需要调用它的 API 就能够轻松实现下面所提到的几个性能点。具体性能能够查看 Redisson 文档
在我的项目中装置 Redisson
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.2</version>
</dependency>
implementation 'org.redisson:redisson:3.13.2'
用 Maven 或者 Gradle 构建,目前最新版本为 3.13.2,也能够在这里 Redisson 找到你须要的版本。
简略尝试
RedissonClient redissonClient = Redisson.create();
RLock lock = redissonClient.getLock("lock");
boolean res = lock.lock();
if (res) {
try {... 逻辑} finally {lock.unlock();
}
}
Redisson 将底层逻辑全副做了一个封装 ????,咱们无需关怀具体实现,几行代码就能应用一把完满的锁。上面咱们简略折腾折腾源码。
加锁
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {return;}
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {commandExecutor.syncSubscriptionInterrupted(future);
} else {commandExecutor.syncSubscription(future);
}
try {while (true) {ttl = tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {break;}
if (ttl >= 0) {
try {future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {if (interruptibly) {throw e;}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {if (interruptibly) {future.getNow().getLatch().acquire();
} else {future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {unsubscribe(future, threadId);
}
}
获取锁
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1) {return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return;}
if (ttlRemaining == null) {scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then" +
"redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then" +
"redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
删除锁
public RFuture<Void> unlockAsync(long threadId) {RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {cancelExpirationRenewal(threadId);
if (e != null) {result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id:"
+ id + "thread-id:" + threadId);
result.tryFailure(cause);
return;
}
result.trySuccess(null);
});
return result;
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then" +
"return nil;" +
"end;" +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);" +
"if (counter > 0) then" +
"redis.call('pexpire', KEYS[1], ARGV[2]);" +
"return 0;" +
"else" +
"redis.call('del', KEYS[1]);" +
"redis.call('publish', KEYS[2], ARGV[1]);" +
"return 1;" +
"end;" +
"return nil;",
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
总结
应用 Redis 做分布式锁来解决并发问题仍存在一些艰难,也有很多须要留神的点,咱们应该正确评估零碎的体量,不能为了应用某项技术而用。要齐全解决并发问题,仍须要在数据库层面做功夫。
福利:豆花同学为大家精心整顿了一份对于 linux 和 python 的学习材料大合集!有须要的小伙伴们,关注豆花集体公众号:python 头条!回复关键词“材料合集”即可收费支付!