关于redis:如何用Redis实现分布式锁

11次阅读

共计 10936 个字符,预计需要花费 28 分钟才能阅读完成。

在开始提到 Redis 分布式锁之前,我想跟大家聊点 Redis 的基础知识。
说一下 Redis 的两个命令:
SETNX key value

setnx 是 SET if Not eXists(如果不存在,则 SET)的简写。

用法如图,如果不存在 set 胜利返回 int 的 1,这个 key 存在了返回 0。
SETEX key seconds value
复制代码
将值 value 关联到 key,并将 key 的生存工夫设为 seconds (以秒为单位)。
如果 key 曾经存在,setex 命令将覆写旧值。
有小伙伴必定会纳闷万一 set value 胜利 set time 失败,那不就傻了么,这啊 Redis 官网想到了。
setex 是一个原子性 (atomic) 操作,关联值和设置生存工夫两个动作会在同一时间内实现。

我设置了 10 秒的生效工夫,ttl 命令能够查看倒计时,负的阐明曾经到期了。
跟大家讲这两个命名也是有起因的,因为他们是 Redis 实现分布式锁的要害。
注释
开始前还是看看场景:

我仍然是创立了很多个线程去扣减库存 inventory,不出意外的库存扣减程序变了,最终的后果也是不对的。
单机加 synchronized 或者 Lock 这些惯例操作我就不说了好吧,后果必定是对的。

我先实现一个简略的 Redis 锁,而后咱们再实现分布式锁,可能更不便大家的了解。
还记得下面我说过的命令么,实现一个单机的其实比较简单,你们先思考一下,别往下看。
setnx

能够看到,第一个胜利了,没开释锁,前面的都失败了,至多程序问题问题是解决了,只有加锁,缩放前面的拿到,开释如此循环,就能保障依照程序执行。
然而你们也发现问题了,还是一样的,第一个仔 set 胜利了,然而忽然挂了,那锁就始终在那无奈失去开释,前面的线程也永远得不到锁,又死锁了。
所以 ….
setex
晓得我之前说这个命令的起因了吧,设置一个过期工夫,就算线程 1 挂了,也会在生效工夫到了,主动开释。
我这里就用到了 nx 和 px 的联合参数,就是 set 值并且加了过期工夫,这里我还设置了一个过期工夫,就是这工夫内如果第二个没拿到第一个的锁,就退出阻塞了,因为可能是客户端断连了。

加锁
整体加锁的逻辑比较简单,大家基本上都能看懂,不过我拿到以后工夫去减开始工夫的操作感觉有点笨,System.currentTimeMillis()耗费很大的。
/**
 * 加锁
 *
 * @param id
 * @return
 */
public boolean lock(String id) {
    Long start = System.currentTimeMillis();
    try {
        for (; ;) {
            //SET 命令返回 OK,则证实获取锁胜利
            String lock = jedis.set(LOCK_KEY, id, params);
            if (“OK”.equals(lock)) {
                return true;
            }
            // 否则循环期待,在 timeout 工夫内仍未获取到锁,则获取失败
            long l = System.currentTimeMillis() – start;
            if (l >= timeout) {
                return false;
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    } finally {
        jedis.close();
    }
}

System.currentTimeMillis 耗费大,每个线程进来都这样,我之前写代码,就会在服务器启动的时候,开一个线程一直去拿,调用方间接获取值就好了,不过也不是最优解,日期类还是有很多好办法的。
@Service
public class TimeServcie {
    private static long time;
    static {
        new Thread(new Runnable(){
            @Override
            public void run() {
                while (true){
                    try {
                        Thread.sleep(5);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    long cur = System.currentTimeMillis();
                    setTime(cur);
                }
            }
        }).start();
    }

    public static long getTime() {
        return time;
    }

    public static void setTime(long time) {
        TimeServcie.time = time;
    }
}

解锁
解锁的逻辑更加简略,就是一段 Lua 的拼装,把 Key 做了删除。
你们发现没,我下面加锁解锁都用了 UUID,这就是为了保障,谁加锁了谁解锁,要是你删掉了我的锁,那不乱套了嘛。
LUA 是原子性的,也比较简单,就是判断一下 Key 和咱们参数是否相等,是的话就删除,返回胜利 1,0 就是失败。
/**
 * 解锁
 *
 * @param id
 * @return
 */
public boolean unlock(String id) {
    String script =
            “if redis.call(‘get’,KEYS[1]) == ARGV[1] then” +
                    ”   return redis.call(‘del’,KEYS[1]) ” +
                    “else” +
                    ”   return 0 ” +
                    “end”;
    try {
        String result = jedis.eval(script, Collections.singletonList(LOCK_KEY), Collections.singletonList(id)).toString();
        return “1”.equals(result) ? true : false;
    } finally {
        jedis.close();
    }
}

验证
咱们能够用咱们写的 Redis 锁试试成果,能够看到都依照程序去执行了

思考
大家是不是感觉完满了,然而下面的锁,有不少瑕疵的,我没思考很多点,你或者能够思考一下,源码我都开源到我的 GItHub 了。
而且,锁个别都是须要可重入行的,下面的线程都是执行完了就开释了,无奈再次进入了,进去也是从新加锁了,对于一个锁的设计来说必定不是很正当的。
我不打算手写,因为都有现成的,他人帮咱们写好了。
redisson
redisson 的锁,就实现了可重入了,然而他的源码比拟艰涩难懂。
应用起来很简略,因为他们底层都封装好了,你连贯上你的 Redis 客户端,他帮你做了我下面写的所有,而后更完满。
简略看看他的应用吧,跟失常应用 Lock 没啥区别。
ThreadPoolExecutor threadPoolExecutor =
        new ThreadPoolExecutor(inventory, inventory, 10L, SECONDS, linkedBlockingQueue);
long start = System.currentTimeMillis();
Config config = new Config();
config.useSingleServer().setAddress(“redis://127.0.0.1:6379”);
final RedissonClient client = Redisson.create(config);
final RLock lock = client.getLock(“lock1”);

for (int i = 0; i <= NUM; i++) {
    threadPoolExecutor.execute(new Runnable() {
        public void run() {
            lock.lock();
            inventory–;
            System.out.println(inventory);
            lock.unlock();
        }
    });
}
long end = System.currentTimeMillis();
System.out.println(“ 执行线程数:” + NUM + ”   总耗时:” + (end – start) + ”  库存数为:” + inventory);

下面能够看到我用到了 getLock,其实就是获取一个锁的实例。
RedissionLock 也没做啥,就是相熟的初始化。
public RLock getLock(String name) {
    return new RedissonLock(connectionManager.getCommandExecutor(), name);
}

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    super(commandExecutor, name);
    // 命令执行器
    this.commandExecutor = commandExecutor;
    //UUID 字符串
    this.id = commandExecutor.getConnectionManager().getId();
    // 外部锁过期工夫
    this.internalLockLeaseTime = commandExecutor.
                getConnectionManager().getCfg().getLockWatchdogTimeout();
    this.entryName = id + “:” + name;
}

加锁
有没有发现很多跟 Lock 很多类似的中央呢?
尝试加锁,拿到以后线程,而后我结尾说的 ttl 也看到了,是不是一切都是那么相熟?
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    
    // 以后线程 ID
    long threadId = Thread.currentThread().getId();
    // 尝试获取锁
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // 如果 ttl 为空,则证实获取锁胜利
    if (ttl == null) {
        return;
    }
    // 如果获取锁失败,则订阅到对应这个锁的 channel
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    commandExecutor.syncSubscription(future);

    try {
        while (true) {
            // 再次尝试获取锁
            ttl = tryAcquire(leaseTime, unit, threadId);
            //ttl 为空,阐明胜利获取锁,返回
            if (ttl == null) {
                break;
            }
            //ttl 大于 0 则期待 ttl 工夫后持续尝试获取
            if (ttl >= 0) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().acquire();
            }
        }
    } finally {
        // 勾销对 channel 的订阅
        unsubscribe(future, threadId);
    }
    //get(lockAsync(leaseTime, unit));
}

获取锁
获取锁的时候,也比较简单,你能够看到,他也是一直刷新过期工夫,跟我下面一直去拿以后工夫,校验过期是一个情理,只是我比拟毛糙。
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {

    // 如果带有过期工夫,则依照一般形式获取锁
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    
    // 先依照 30 秒的过期工夫来执行获取锁的办法
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(
        commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
        TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        
    // 如果还持有这个锁,则开启定时工作一直刷新该锁的过期工夫
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }

            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

底层加锁逻辑
你可能会想这么多操作,在一起不是原子性不还是有问题么?
大佬们必定想得到呀,所以还是 LUA,他应用了 Hash 的数据结构。
次要是判断锁是否存在,存在就设置过期工夫,如果锁曾经存在了,那比照一下线程,线程是一个那就证实能够重入,锁在了,然而不是以后线程,证实他人还没开释,那就把剩余时间返回,加锁失败。
是不是有点绕,多了解一遍。
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit,     
                            long threadId, RedisStrictCommand<T> command) {

        // 过期工夫
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  // 如果锁不存在,则通过 hset 设置它的值,并设置过期工夫
                  “if (redis.call(‘exists’, KEYS[1]) == 0) then ” +
                      “redis.call(‘hset’, KEYS[1], ARGV[2], 1); ” +
                      “redis.call(‘pexpire’, KEYS[1], ARGV[1]); ” +
                      “return nil; ” +
                  “end; ” +
                  // 如果锁已存在,并且锁的是以后线程,则通过 hincrby 给数值递增 1
                  “if (redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1) then ” +
                      “redis.call(‘hincrby’, KEYS[1], ARGV[2], 1); ” +
                      “redis.call(‘pexpire’, KEYS[1], ARGV[1]); ” +
                      “return nil; ” +
                  “end; ” +
                  // 如果锁已存在,但并非本线程,则返回过期工夫 ttl
                  “return redis.call(‘pttl’, KEYS[1]);”,
        Collections.<Object>singletonList(getName()), 
                internalLockLeaseTime, getLockName(threadId));
    }

解锁
锁的开释次要是 publish 开释锁的信息,而后做校验,一样会判断是否以后线程,胜利就开释锁,还有个 hincrby 递加的操作,锁的值大于 0 阐明是可重入锁,那就刷新过期工夫。
如果值小于 0 了,那删掉 Key 开释锁。
是不是又和 AQS 很像了?
AQS 就是通过一个 volatile 润饰 status 去看锁的状态,也会看数值判断是否是可重入的。
所以我说代码的设计,最初就万剑归一,都是一样的。
public RFuture<Void> unlockAsync(final long threadId) {
    final RPromise<Void> result = new RedissonPromise<Void>();
    
    // 解锁办法
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    future.addListener(new FutureListener<Boolean>() {
        @Override
        public void operationComplete(Future<Boolean> future) throws Exception {
            if (!future.isSuccess()) {
                cancelExpirationRenewal(threadId);
                result.tryFailure(future.cause());
                return;
            }
            // 获取返回值
            Boolean opStatus = future.getNow();
            // 如果返回空,则证实解锁的线程和以后锁不是同一个线程,抛出异样
            if (opStatus == null) {
                IllegalMonitorStateException cause = 
                    new IllegalMonitorStateException(“
                        attempt to unlock lock, not locked by current thread by node id: “
                        + id + ” thread-id: ” + threadId);
                result.tryFailure(cause);
                return;
            }
            // 解锁胜利,勾销刷新过期工夫的那个定时工作
            if (opStatus) {
                cancelExpirationRenewal(null);
            }
            result.trySuccess(null);
        }
    });

    return result;
}

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL,
    
            // 如果锁曾经不存在,公布锁开释的音讯
            “if (redis.call(‘exists’, KEYS[1]) == 0) then ” +
                “redis.call(‘publish’, KEYS[2], ARGV[1]); ” +
                “return 1; ” +
            “end;” +
            // 如果开释锁的线程和已存在锁的线程不是同一个线程,返回 null
            “if (redis.call(‘hexists’, KEYS[1], ARGV[3]) == 0) then ” +
                “return nil;” +
            “end; ” +
            // 通过 hincrby 递加 1 的形式,开释一次锁
            // 若剩余次数大于 0,则刷新过期工夫
            “local counter = redis.call(‘hincrby’, KEYS[1], ARGV[3], -1); ” +
            “if (counter > 0) then ” +
                “redis.call(‘pexpire’, KEYS[1], ARGV[2]); ” +
                “return 0; ” +
            // 否则证实锁曾经开释,删除 key 并公布锁开释的音讯
            “else ” +
                “redis.call(‘del’, KEYS[1]); ” +
                “redis.call(‘publish’, KEYS[2], ARGV[1]); ” +
                “return 1; “+
            “end; ” +
            “return nil;”,
    Arrays.<Object>asList(getName(), getChannelName()), 
        LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

总结
这个写了比拟久,然而不是因为简单什么的,是因为集体工作的起因,最近事件很多嘛,还是那句话,程序员才是我的本职写文章只是个喜好,不能轻重倒置了。
大家会发现,你学懂一个技术栈之后,学新的会很快,而且也能发现他们的设计思维和技巧真的很奇妙,也总能找到类似点,和让你惊叹的点。
就拿 Doug Lea 写的 AbstractQueuedSynchronizer(AQS)来说,他写了一行代码,你可能看几天才能看懂,大佬们的思维是真的牛。
我看源码有时候也头疼,然而去谷歌一下,本人了解一下,忽然豁然开朗的时候感觉所有又很值。
学习就是一条时而郁郁寡欢,时而开环大笑的路,大家加油,咱们成长路上一起共勉。
一个在互联网得过且过的工具人。
注:如果本篇博客有任何谬误和倡议,欢送人才们留言,你快说句话啊!

正文完
 0