共计 5324 个字符,预计需要花费 14 分钟才能阅读完成。
前言
在咱们日常开发中,难免会遇到要加锁的情景。例如扣除产品库存,首先要从数据库中取出库存,进行库存判断,再减去库存。这一波操作显著不合乎原子性,如果代码块不加锁,很容易因为并发导致超卖问题。咱们的零碎如果是单体架构,那咱们应用本地锁就能够解决问题。如果是分布式架构,就须要应用分布式锁。
计划
应用 SETNX 和 EXPIRE 命令
- SETNX key value
- EXPIRE key seconds
- DEL key
if (setnx("item_1_lock", 1)) {expire("item_1_lock", 30);
try {... 逻辑} catch {...} finally {del("item_1_lock");
}
}
这种办法看起来能够解决问题,然而有肯定的危险,因为 SETNX
和 EXPIRE
这波操作是非原子性的,如果 SETNX
胜利之后,呈现谬误,导致 EXPIRE
没有执行,导致锁没有设置超时工夫造成死锁。
针对这种状况,咱们能够应用 lua 脚本来放弃操作原子性,保障 SETNX
和 EXPIRE
两个操作要么都胜利,要么都不胜利。
if (redis.call('setnx', KEYS[1], ARGV[1]) < 1)
then return 0;
end;
redis.call('expire', KEYS[1], tonumber(ARGV[2]));
return 1;
通过这样的办法,咱们初步解决了竞争锁的原子性问题,尽管其余性能还未实现,然而应该不会造成死锁 ????????????。
Redis 2.6.12 以上可灵便应用 SET 命令
- SET key value NX EX 30
- DEL key
if (set("item_1_lock", 1, "NX", "EX", 30)) {
try {... 逻辑} catch {...} finally {del("item_1_lock");
}
}
改良后的办法不须要借助 lua 脚本就解决了 SETNX
和 EXPIRE
的原子性问题。当初咱们再认真推敲推敲,如果 A 拿到了锁顺利进入代码块执行逻辑,然而因为各种起因导致超时主动开释锁。在这之后 B 胜利拿到了锁进入代码块执行逻辑,但此时如果 A 执行逻辑结束再来开释锁,就会把 B 刚取得的锁开释了。就好比用本人家的钥匙开了别家的门,这是不可承受的。
为了解决这个问题咱们能够尝试在 SET
的时候设置一个锁标识,而后在 DEL
的时候验证以后锁是否为本人的锁。
String value = UUID.randomUUID().toString().replaceAll("-", "");
if (set("item_1_lock", value, "NX", "EX", 30)) {
try {... 逻辑} catch {...} finally {... lua 脚本保障原子性}
}
if (redis.call('get', KEYS[1]) == ARGV[1])
then return redis.call('del', KEYS[1])
else return 0
end
到这里,咱们终于解决了竞争锁的原子性问题和误删锁问题。然而锁个别还须要反对可重入、循环期待和超时主动续约等性能点。上面咱们学习应用一个十分好用的包来解决这些问题 ????????????。
入门 Redisson
Redission 的锁,实现了可重入和超时主动续约性能,它都帮咱们封装好了,咱们只有依照本人的需要调用它的 API 就能够轻松实现下面所提到的几个性能点。具体性能能够查看 Redisson 文档
在我的项目中装置 Redisson
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.2</version>
</dependency>
implementation 'org.redisson:redisson:3.13.2'
用 Maven 或者 Gradle 构建,目前最新版本为 3.13.2
,也能够在这里 Redisson 找到你须要的版本。
简略尝试
RedissonClient redissonClient = Redisson.create();
RLock lock = redissonClient.getLock("lock");
boolean res = lock.lock();
if (res) {
try {... 逻辑} finally {lock.unlock();
}
}
Redisson 将底层逻辑全副做了一个封装 ????,咱们无需关怀具体实现,几行代码就能应用一把完满的锁。上面咱们简略折腾折腾源码 ????????????。
加锁
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
// 获取以后线程 id
long threadId = Thread.currentThread().getId();
// 尝试获取锁
Long ttl = tryAcquire(leaseTime, unit, threadId);
// 获取胜利间接返回
if (ttl == null) {return;}
// 获取失败,订阅锁对应的频道
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {commandExecutor.syncSubscriptionInterrupted(future);
} else {commandExecutor.syncSubscription(future);
}
try {while (true) {
// 再次尝试获取锁
ttl = tryAcquire(leaseTime, unit, threadId);
// 获取胜利间接返回
if (ttl == null) {break;}
// 期待 ttl 工夫后持续获取
if (ttl >= 0) {
try {future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {if (interruptibly) {throw e;}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {if (interruptibly) {future.getNow().getLatch().acquire();
} else {future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
// 勾销频道订阅
unsubscribe(future, threadId);
}
}
获取锁
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
// 如果设置了锁过期工夫,则按一般形式获取锁
if (leaseTime != -1) {return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 如果没有设置锁过期工夫,则开启主动续约性能,先设置 30 秒过期工夫
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
// 有谬误间接返回
if (e != null) {return;}
// 获取锁
if (ttlRemaining == null) {
// 开启主动续约
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
/**
* 锁不存在,应用 hincrby 创立新 hash 表以及给锁计数自增 1,并设置过期工夫
* 锁存在并且属于以后线程,给锁计数自增 1,并设置过期工夫
* 锁存在然而不属于以后线程,返回锁过期工夫
**/
"if (redis.call('exists', KEYS[1]) == 0) then" +
"redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then" +
"redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
删除锁
public RFuture<Void> unlockAsync(long threadId) {RPromise<Void> result = new RedissonPromise<Void>();
// 解锁逻辑
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
// 勾销刷新过期工夫的定时工作
cancelExpirationRenewal(threadId);
if (e != null) {result.tryFailure(e);
return;
}
// 解锁线程和锁不是同一个线程,抛错
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id:"
+ id + "thread-id:" + threadId);
result.tryFailure(cause);
return;
}
result.trySuccess(null);
});
return result;
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
/**
* 判断锁是否属于以后线程,不属于间接返回
* 锁计数减去 1,如果锁计数还大于 0,则设置过期工夫,否则开释锁并公布锁开释音讯
**/
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then" +
"return nil;" +
"end;" +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);" +
"if (counter > 0) then" +
"redis.call('pexpire', KEYS[1], ARGV[2]);" +
"return 0;" +
"else" +
"redis.call('del', KEYS[1]);" +
"redis.call('publish', KEYS[2], ARGV[1]);" +
"return 1;" +
"end;" +
"return nil;",
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
总结
应用 Redis 做分布式锁来解决并发问题仍存在一些艰难,也有很多须要留神的点,咱们应该正确评估零碎的体量,不能为了应用某项技术而用。要齐全解决并发问题,仍须要在数据库层面做功夫。????????????