共计 3506 个字符,预计需要花费 9 分钟才能阅读完成。
分布式锁(2)- 基于 Redis 的实现
1. 使用 Redis 实现分布式锁的理由
- Redis 具有很高的性能;
- Redis 的命令对此支持较好,实现起来很方便;
2.Redis 命令介绍
SETNX
// 当且仅当 key 不存在时,set 一个 key 为 val 的字符串,返回 1;// 若 key 存在,则什么都不做,返回 0。SETNX key val;
expire
// 为 key 设置一个超时时间,单位为 second,超过这个时间锁会自动释放,避免死锁。expire key timeout;
delete
// 删除 key
delete key;
我们通过 Redis 实现分布式锁时,主要通过上面的这三个命令。
3. 分布式锁实现原理
3.1 加锁
最简单的方法是使用 setnx
命令。key
是锁的唯一标识,按业务来决定命名。比如想要给一种商品的秒杀活动加锁,可以给 key
命名为“lock_sale_商品 ID”。而 value
设置成什么呢?我们可以姑且设置成 1
。加锁的伪代码如下:
setnx(lock_sale_商品 ID, 1)
当一个线程执行 setnx
返回 1
,说明 key
原本不存在,该线程成功得到了锁;当一个线程执行 setnx
返回 0
,说明 key
已经存在,该线程抢锁失败。
3.2 解锁
有加锁就得有解锁。当得到锁的线程执行完任务,需要释放锁,以便其他线程可以进入。释放锁的最简单方式是执行 del
指令,伪代码如下:
del(lock_sale_商品 ID)
释放锁之后,其他线程就可以继续执行 setnx
命令来获得锁。
3.3 锁超时
锁超时是什么意思呢?如果一个得到锁的线程在执行任务的过程中挂掉,来不及显式地释放锁,这块资源将会永远被锁住(死锁),别的线程再也别想进来。所以,setnx
的 key
必须设置一个超时时间,以保证即使没有被显式释放,这把锁也要在一定时间后自动释放。setnx
不支持超时参数,所以需要额外的指令,伪代码如下:
expire(lock_sale_商品 ID, 30)
综合伪代码如下:
if(setnx(lock_sale_商品 ID,1) == 1){expire(lock_sale_商品 ID,30)
try {do something ......} finally {del(lock_sale_商品 ID)
}
}
4. 存在的问题
以上伪代码中存在三个致命问题
4.1 setnx
和 expire
的非原子性
设想一个极端场景,当某线程执行 setnx
,成功得到了锁:
setnx
刚执行成功,还未来得及执行 expire
指令,节点 1 挂掉了。
这样一来,这把锁就没有设置过期时间,变成 死锁,别的线程再也无法获得锁了。
怎么解决呢?setnx
指令本身是不支持传入超时时间的,set
指令增加了可选参数,伪代码如下:
set(lock_sale_商品 ID,1,30,NX)
这样就可以取代 setnx
指令。
4.2 del
导致误删
又是一个极端场景,假如某线程成功得到了锁,并且设置的超时时间是 30 秒。
如果某些原因导致线程 A 执行的很慢很慢,过了 30 秒都没执行完,这时候锁过期自动释放,线程 B 得到了锁。
随后,线程 A 执行完了任务,线程 A 接着执行 del
指令来释放锁。但这时候线程 B 还没执行完,线程 A 实际上 删除的是线程 B 加的锁
。
怎么避免这种情况呢?可以在 del
释放锁之前做一个判断,验证当前的锁是不是自己加的锁。至于具体的实现,可以在加锁的时候把当前的线程 ID 当做 value
,并在删除之前验证 key
对应的 value
是不是自己线程的 ID。
加锁:
String threadId = Thread.currentThread().getId()
set(key,threadId,30,NX)
解锁:
if(threadId .equals(redisClient.get(key))){del(key)
}
但是,这样做又隐含了一个新的问题,判断和释放锁是两个独立操作,不是原子性。
4.3 出现并发的可能性
还是刚才第二点所描述的场景,虽然我们避免了线程 A 误删掉 key
的情况,但是同一时间有 A,B 两个线程在访问代码块,仍然是不完美的。怎么办呢?我们可以让获得锁的线程开启一个 守护线程,用来给快要过期的锁“续航”。
当过去了 29 秒,线程 A 还没执行完,这时候守护线程会执行 expire
指令,为这把锁“续命”20 秒。守护线程从第 29 秒开始执行,每 20 秒执行一次。
当线程 A 执行完任务,会显式关掉守护线程。
另一种情况,如果节点 1 忽然断电,由于线程 A 和守护线程在同一个进程,守护线程也会停下。这把锁到了超时的时候,没人给它续命,也就自动释放了。
5. 代码实现
public class DistributedLock {
private final JedisPool jedisPool;
private final static String KEY_PREF = "lock:"; // 锁的前缀
public DistributedLock(JedisPool jedisPool) {this.jedisPool = jedisPool;}
/**
* 加锁
*
* @param lockName String 锁的名称(key)
* @param acquireTimeout long 获取超时时间
* @param timeout long 锁的超时时间
* @return 锁标识
*/
public String lockWithTimeout(String lockName, long acquireTimeout, long timeout) {
Jedis conn = null;
try {
// 获取连接
conn = jedisPool.getResource();
// 随机生成一个 value
String identifier = UUID.randomUUID().toString();
// 锁名, 即 key 值
String lockKey = KEY_PREF + lockName;
// 超时时间, 上锁后超过此时间则自动释放锁
int lockExpire = (int) (timeout / 1000);
// 获取锁的超时时间, 超过这个时间则放弃获取锁
long end = System.currentTimeMillis() + acquireTimeout;
while (System.currentTimeMillis() < end) {if (conn.setnx(lockKey, identifier) == 1) {conn.expire(lockKey, lockExpire);
// 返回 value 值, 用于释放锁时间确认
return identifier;
}
// 返回 - 1 代表 key 没有设置超时时间, 为 key 设置一个超时时间
if (conn.ttl(lockKey) == -1) {conn.expire(lockKey, lockExpire);
}
try {Thread.sleep(10);
} catch (InterruptedException ie) {Thread.currentThread().interrupt();}
}
} catch (JedisException e) {e.printStackTrace();
} finally {if (conn != null) {conn.close();
}
}
return null;
}
/**
* 释放锁
*
* @param lockName String 锁 key
* @param identifier String 释放锁的标识
* @return boolean
*/
public boolean releaseLock(String lockName, String identifier) {
Jedis conn = null;
String lockKey = KEY_PREF + lockName;
boolean retFlag = false;
try {conn = jedisPool.getResource();
while (true) {
// 监视 lock, 准备开始事务
conn.watch(lockKey);
// 通过前面返回的 value 值判断是不是该锁, 若时该锁, 则删除释放锁
if (identifier.equals(conn.get(lockKey))) {Transaction transaction = conn.multi();
transaction.del(lockKey);
List<Object> results = transaction.exec();
if (results == null) continue;
retFlag = true;
}
conn.unwatch();
break;
}
} catch (Exception e) {e.printStackTrace();
} finally {if (conn != null) {conn.close();
}
}
return retFlag;
}
}