分布式锁2-基于Redis的实现

44次阅读

共计 3506 个字符,预计需要花费 9 分钟才能阅读完成。

分布式锁(2)- 基于 Redis 的实现

1. 使用 Redis 实现分布式锁的理由

  1. Redis 具有很高的性能;
  2. Redis 的命令对此支持较好,实现起来很方便;

2.Redis 命令介绍

SETNX

// 当且仅当 key 不存在时,set 一个 key 为 val 的字符串,返回 1;// 若 key 存在,则什么都不做,返回 0。SETNX key val;

expire

// 为 key 设置一个超时时间,单位为 second,超过这个时间锁会自动释放,避免死锁。expire key timeout;

delete

// 删除 key
delete key;

我们通过 Redis 实现分布式锁时,主要通过上面的这三个命令。

3. 分布式锁实现原理

3.1 加锁

最简单的方法是使用 setnx 命令。key 是锁的唯一标识,按业务来决定命名。比如想要给一种商品的秒杀活动加锁,可以给 key 命名为“lock_sale_商品 ID”。而 value 设置成什么呢?我们可以姑且设置成 1。加锁的伪代码如下:

setnx(lock_sale_商品 ID, 1)

当一个线程执行 setnx 返回 1,说明 key 原本不存在,该线程成功得到了锁;当一个线程执行 setnx 返回 0,说明 key 已经存在,该线程抢锁失败。

3.2 解锁

有加锁就得有解锁。当得到锁的线程执行完任务,需要释放锁,以便其他线程可以进入。释放锁的最简单方式是执行 del 指令,伪代码如下:

del(lock_sale_商品 ID)

释放锁之后,其他线程就可以继续执行 setnx 命令来获得锁。

3.3 锁超时

锁超时是什么意思呢?如果一个得到锁的线程在执行任务的过程中挂掉,来不及显式地释放锁,这块资源将会永远被锁住(死锁),别的线程再也别想进来。所以,setnxkey 必须设置一个超时时间,以保证即使没有被显式释放,这把锁也要在一定时间后自动释放。setnx 不支持超时参数,所以需要额外的指令,伪代码如下:

expire(lock_sale_商品 ID, 30)

综合伪代码如下:

if(setnx(lock_sale_商品 ID,1) == 1){expire(lock_sale_商品 ID,30)
    try {do something ......} finally {del(lock_sale_商品 ID)
    }
}

4. 存在的问题

以上伪代码中存在三个致命问题

4.1 setnxexpire 的非原子性

设想一个极端场景,当某线程执行 setnx,成功得到了锁:

setnx 刚执行成功,还未来得及执行 expire 指令,节点 1 挂掉了。

这样一来,这把锁就没有设置过期时间,变成 死锁,别的线程再也无法获得锁了。

怎么解决呢?setnx 指令本身是不支持传入超时时间的,set 指令增加了可选参数,伪代码如下:

set(lock_sale_商品 ID,1,30,NX)

这样就可以取代 setnx 指令。

4.2 del 导致误删

又是一个极端场景,假如某线程成功得到了锁,并且设置的超时时间是 30 秒。

如果某些原因导致线程 A 执行的很慢很慢,过了 30 秒都没执行完,这时候锁过期自动释放,线程 B 得到了锁。

随后,线程 A 执行完了任务,线程 A 接着执行 del 指令来释放锁。但这时候线程 B 还没执行完,线程 A 实际上 删除的是线程 B 加的锁

怎么避免这种情况呢?可以在 del 释放锁之前做一个判断,验证当前的锁是不是自己加的锁。至于具体的实现,可以在加锁的时候把当前的线程 ID 当做 value,并在删除之前验证 key 对应的 value 是不是自己线程的 ID。

加锁:

String threadId = Thread.currentThread().getId()
set(key,threadId,30,NX)

解锁:

if(threadId .equals(redisClient.get(key))){del(key)
}

但是,这样做又隐含了一个新的问题,判断和释放锁是两个独立操作,不是原子性。

4.3 出现并发的可能性

还是刚才第二点所描述的场景,虽然我们避免了线程 A 误删掉 key 的情况,但是同一时间有 A,B 两个线程在访问代码块,仍然是不完美的。怎么办呢?我们可以让获得锁的线程开启一个 守护线程,用来给快要过期的锁“续航”。

当过去了 29 秒,线程 A 还没执行完,这时候守护线程会执行 expire 指令,为这把锁“续命”20 秒。守护线程从第 29 秒开始执行,每 20 秒执行一次。

当线程 A 执行完任务,会显式关掉守护线程。

另一种情况,如果节点 1 忽然断电,由于线程 A 和守护线程在同一个进程,守护线程也会停下。这把锁到了超时的时候,没人给它续命,也就自动释放了。

5. 代码实现

public class DistributedLock {

    private final JedisPool jedisPool;
    private final static String KEY_PREF = "lock:"; // 锁的前缀

    public DistributedLock(JedisPool jedisPool) {this.jedisPool = jedisPool;}

    /**
     * 加锁
     *
     * @param lockName       String 锁的名称(key)
     * @param acquireTimeout long 获取超时时间
     * @param timeout        long 锁的超时时间
     * @return 锁标识
     */
    public String lockWithTimeout(String lockName, long acquireTimeout, long timeout) {
        Jedis conn = null;

        try {
            // 获取连接
            conn = jedisPool.getResource();
            // 随机生成一个 value
            String identifier = UUID.randomUUID().toString();
            // 锁名, 即 key 值
            String lockKey = KEY_PREF + lockName;
            // 超时时间, 上锁后超过此时间则自动释放锁
            int lockExpire = (int) (timeout / 1000);

            // 获取锁的超时时间, 超过这个时间则放弃获取锁
            long end = System.currentTimeMillis() + acquireTimeout;
            while (System.currentTimeMillis() < end) {if (conn.setnx(lockKey, identifier) == 1) {conn.expire(lockKey, lockExpire);
                    // 返回 value 值, 用于释放锁时间确认
                    return identifier;
                }

                // 返回 - 1 代表 key 没有设置超时时间, 为 key 设置一个超时时间
                if (conn.ttl(lockKey) == -1) {conn.expire(lockKey, lockExpire);
                }

                try {Thread.sleep(10);
                } catch (InterruptedException ie) {Thread.currentThread().interrupt();}
            }
        } catch (JedisException e) {e.printStackTrace();
        } finally {if (conn != null) {conn.close();
            }
        }
        return null;
    }

    /**
     * 释放锁
     *
     * @param lockName   String 锁 key
     * @param identifier String 释放锁的标识
     * @return boolean
     */
    public boolean releaseLock(String lockName, String identifier) {
        Jedis conn = null;
        String lockKey = KEY_PREF + lockName;
        boolean retFlag = false;
        try {conn = jedisPool.getResource();
            while (true) {
                // 监视 lock, 准备开始事务
                conn.watch(lockKey);
                // 通过前面返回的 value 值判断是不是该锁, 若时该锁, 则删除释放锁
                if (identifier.equals(conn.get(lockKey))) {Transaction transaction = conn.multi();
                    transaction.del(lockKey);
                    List<Object> results = transaction.exec();
                    if (results == null) continue;

                    retFlag = true;
                }

                conn.unwatch();
                break;
            }
        } catch (Exception e) {e.printStackTrace();
        } finally {if (conn != null) {conn.close();
            }
        }
        return retFlag;
    }
}

正文完
 0