关于java:实现一个-Redis-分布式锁

6次阅读

共计 5324 个字符,预计需要花费 14 分钟才能阅读完成。

前言

在咱们日常开发中,难免会遇到要加锁的情景。例如扣除产品库存,首先要从数据库中取出库存,进行库存判断,再减去库存。这一波操作显著不合乎原子性,如果代码块不加锁,很容易因为并发导致超卖问题。咱们的零碎如果是单体架构,那咱们应用本地锁就能够解决问题。如果是分布式架构,就须要应用分布式锁。

计划

应用 SETNX 和 EXPIRE 命令

  • SETNX key value
  • EXPIRE key seconds
  • DEL key
if (setnx("item_1_lock", 1)) {expire("item_1_lock", 30);
    try {... 逻辑} catch {...} finally {del("item_1_lock");
    }
}

这种办法看起来能够解决问题,然而有肯定的危险,因为 SETNXEXPIRE 这波操作是非原子性的,如果 SETNX 胜利之后,呈现谬误,导致 EXPIRE 没有执行,导致锁没有设置超时工夫造成死锁。

针对这种状况,咱们能够应用 lua 脚本来放弃操作原子性,保障 SETNXEXPIRE 两个操作要么都胜利,要么都不胜利。

if (redis.call('setnx', KEYS[1], ARGV[1]) < 1)
then return 0;
end;
redis.call('expire', KEYS[1], tonumber(ARGV[2]));
return 1;

通过这样的办法,咱们初步解决了竞争锁的原子性问题,尽管其余性能还未实现,然而应该不会造成死锁 ????????????。

Redis 2.6.12 以上可灵便应用 SET 命令

  • SET key value NX EX 30
  • DEL key
if (set("item_1_lock", 1, "NX", "EX", 30)) {
    try {... 逻辑} catch {...} finally {del("item_1_lock");
    }
}

改良后的办法不须要借助 lua 脚本就解决了 SETNXEXPIRE 的原子性问题。当初咱们再认真推敲推敲,如果 A 拿到了锁顺利进入代码块执行逻辑,然而因为各种起因导致超时主动开释锁。在这之后 B 胜利拿到了锁进入代码块执行逻辑,但此时如果 A 执行逻辑结束再来开释锁,就会把 B 刚取得的锁开释了。就好比用本人家的钥匙开了别家的门,这是不可承受的。

为了解决这个问题咱们能够尝试在 SET 的时候设置一个锁标识,而后在 DEL 的时候验证以后锁是否为本人的锁。

String value = UUID.randomUUID().toString().replaceAll("-", "");
if (set("item_1_lock", value, "NX", "EX", 30)) {
    try {... 逻辑} catch {...} finally {... lua 脚本保障原子性}
}
if (redis.call('get', KEYS[1]) == ARGV[1])
then return redis.call('del', KEYS[1])
else return 0
end

到这里,咱们终于解决了竞争锁的原子性问题和误删锁问题。然而锁个别还须要反对可重入、循环期待和超时主动续约等性能点。上面咱们学习应用一个十分好用的包来解决这些问题 ????????????。

入门 Redisson

Redission 的锁,实现了可重入和超时主动续约性能,它都帮咱们封装好了,咱们只有依照本人的需要调用它的 API 就能够轻松实现下面所提到的几个性能点。具体性能能够查看 Redisson 文档

在我的项目中装置 Redisson

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.2</version>
</dependency>
implementation 'org.redisson:redisson:3.13.2'

用 Maven 或者 Gradle 构建,目前最新版本为 3.13.2,也能够在这里 Redisson 找到你须要的版本。

简略尝试

RedissonClient redissonClient = Redisson.create();
RLock lock = redissonClient.getLock("lock");
boolean res = lock.lock();
if (res) {
   try {... 逻辑} finally {lock.unlock();
   }
}

Redisson 将底层逻辑全副做了一个封装 ????,咱们无需关怀具体实现,几行代码就能应用一把完满的锁。上面咱们简略折腾折腾源码 ????????????。

加锁

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    // 获取以后线程 id
    long threadId = Thread.currentThread().getId();

    // 尝试获取锁
    Long ttl = tryAcquire(leaseTime, unit, threadId);

    // 获取胜利间接返回
    if (ttl == null) {return;}

    // 获取失败,订阅锁对应的频道
    RFuture<RedissonLockEntry> future = subscribe(threadId);


    if (interruptibly) {commandExecutor.syncSubscriptionInterrupted(future);
    } else {commandExecutor.syncSubscription(future);
    }

    try {while (true) {
            // 再次尝试获取锁
            ttl = tryAcquire(leaseTime, unit, threadId);
            // 获取胜利间接返回
            if (ttl == null) {break;}

            // 期待 ttl 工夫后持续获取
            if (ttl >= 0) {
                try {future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {if (interruptibly) {throw e;}
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {if (interruptibly) {future.getNow().getLatch().acquire();
                } else {future.getNow().getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        // 勾销频道订阅
        unsubscribe(future, threadId);
    }
}

获取锁

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    // 如果设置了锁过期工夫,则按一般形式获取锁
    if (leaseTime != -1) {return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    
    // 如果没有设置锁过期工夫,则开启主动续约性能,先设置 30 秒过期工夫
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        // 有谬误间接返回
        if (e != null) {return;}

        // 获取锁
        if (ttlRemaining == null) {
            // 开启主动续约
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);

    return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            /**
             * 锁不存在,应用 hincrby 创立新 hash 表以及给锁计数自增 1,并设置过期工夫
             * 锁存在并且属于以后线程,给锁计数自增 1,并设置过期工夫
             * 锁存在然而不属于以后线程,返回锁过期工夫
             **/
            "if (redis.call('exists', KEYS[1]) == 0) then" +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
                    "redis.call('pexpire', KEYS[1], ARGV[1]);" +
                    "return nil;" +
                    "end;" +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then" +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
                    "redis.call('pexpire', KEYS[1], ARGV[1]);" +
                    "return nil;" +
                    "end;" +
                    "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

删除锁

public RFuture<Void> unlockAsync(long threadId) {RPromise<Void> result = new RedissonPromise<Void>();
    
    // 解锁逻辑
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    future.onComplete((opStatus, e) -> {
        // 勾销刷新过期工夫的定时工作
        cancelExpirationRenewal(threadId);
        
        if (e != null) {result.tryFailure(e);
            return;
        }
        
        // 解锁线程和锁不是同一个线程,抛错
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id:"
                    + id + "thread-id:" + threadId);
            result.tryFailure(cause);
            return;
        }

        result.trySuccess(null);
    });

    return result;
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            /**
             * 判断锁是否属于以后线程,不属于间接返回
             * 锁计数减去 1,如果锁计数还大于 0,则设置过期工夫,否则开释锁并公布锁开释音讯
             **/
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then" +
                    "return nil;" +
                    "end;" +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);" +
                    "if (counter > 0) then" +
                    "redis.call('pexpire', KEYS[1], ARGV[2]);" +
                    "return 0;" +
                    "else" +
                    "redis.call('del', KEYS[1]);" +
                    "redis.call('publish', KEYS[2], ARGV[1]);" +
                    "return 1;" +
                    "end;" +
                    "return nil;",
            Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

总结

应用 Redis 做分布式锁来解决并发问题仍存在一些艰难,也有很多须要留神的点,咱们应该正确评估零碎的体量,不能为了应用某项技术而用。要齐全解决并发问题,仍须要在数据库层面做功夫。????????????

正文完
 0