关于数据库:并发场景下的幂等问题分布式锁详解

8次阅读

共计 8449 个字符,预计需要花费 22 分钟才能阅读完成。

简介:本文从钉钉实人认证场景的一例数据反复问题登程,剖析了其起因是因为并发导致幂等生效,引出幂等的概念。针对并发场景下的幂等问题,提出了一种实现幂等可行的方法论,联合通讯录加人业务场景对数据库幂等问题进行了简略剖析,就分布式锁实现幂等办法开展了具体探讨。剖析了锁在分布式场景下存在的问题,包含单点故障、网络超时、谬误开释别人锁、提前开释锁以及分布式锁单点故障等,提出了对应的解决方案,介绍了对应计划的具体实现。

作者 | 百书
起源 | 阿里技术公众号

写在后面:本文探讨的幂等问题,均为并发场景下的幂等问题。即零碎本存在幂等设计,然而在并发场景下生效了。

一 摘要

本文从钉钉实人认证场景的一例数据反复问题登程,剖析了其起因是因为并发导致幂等生效,引出幂等的概念。

针对并发场景下的幂等问题,提出了一种实现幂等可行的方法论,联合通讯录加人业务场景对数据库幂等问题进行了简略剖析,就分布式锁实现幂等办法开展了具体探讨。

剖析了锁在分布式场景下存在的问题,包含单点故障、网络超时、谬误开释别人锁、提前开释锁以及分布式锁单点故障等,提出了对应的解决方案,介绍了对应计划的具体实现。

二 问题

钉钉实人认证业务存在数据反复的问题。

1 问题景象

失常状况下,数据库中应该只有一条实人认证胜利记录,然而实际上某用户有多条。

2 问题起因

并发导致了不幂等。

咱们先来回顾一下幂等的概念:

  • 幂等(idempotent、idempotence)是一个数学与计算机学概念,常见于抽象代数中。
  • 在编程中一个幂等操作的特点是其任意屡次执行所产生的影响均与一次执行的影响雷同。
  • – 来自百度百科

实人认证在业务上有幂等设计,其个别流程为:

1)用户抉择实人认证后会在服务端初始化一条记录;
2)用户在钉钉挪动端依照批示实现人脸比对;
3)比对实现后拜访服务端批改数据库状态。

在第 3 步中,在批改数据库状态之前,会判断「是否曾经初始化」、「是否曾经实人认证」以及「智科是否返回认证胜利」以保障幂等。仅当申请首次拜访服务端尝试批改数据库状态时,能力满足幂等的判断条件并批改数据库状态。其余任意次申请将间接返回,对数据库状态无影响。申请屡次拜访服务端所产生的后果,和申请首次拜访服务端统一。因而,在实人认证胜利的前提下,数据库该当有且仅有一条认证胜利的记录。

然而在理论过程中咱们发现,同一个申请会屡次批改数据库状态,零碎并未依照咱们预期的那样实现幂等。究其原因,是因为申请并发拜访,在首次申请实现批改服务端状态前,并发的其余申请和首次申请都通过了幂等判断,对数据库状态进行了屡次批改。

并发导致了原幂等设计生效。

并发导致了不幂等。

三 解决方案

解决并发场景下幂等问题的要害,是找到唯一性束缚,执行唯一性查看,雷同的数据保留一次,雷同的申请操作一次。

一次拜访服务端的申请,可能产生以下几种交互:

  • 与数据源交互,例如数据库状态变更等;
  • 与其余业务零碎交互,例如调用上游服务或发送音讯等;

一次申请能够只蕴含一次交互,也能够蕴含屡次交互。例如一次申请能够仅仅批改一次数据库状态,也能够在批改数据库状态后再发送一条数据库状态批改胜利的音讯。

于是咱们能够得出一个论断:并发场景下,如果一个零碎依赖的组件幂等,那么该零碎在人造幂等。

以数据库为例,如果一个申请对数据造成的影响是新增一条数据,那么惟一索引能够是幂等问题的解法。数据库会帮忙咱们执行唯一性查看,雷同数据不会反复落库。

钉钉通讯录加人就是通过数据库的惟一索引解决了幂等问题。以钉钉通讯录加人为例,在向数据库写数据之前,会先判断数据是否曾经存在于数据库之中,如果不存在,加人申请最终会向数据库的员工表插入一条数据。大量雷同的并发的通讯录加人申请让零碎的幂等设计生效成为可能。在一次加人申请中,(组织 ID,工号)能够惟一标记一个申请,在数据库中,也存在(组织 ID,工号)的惟一索引。因而咱们能够保障,屡次雷同的加人申请,只会批改一次数据库状态,即增加一条记录。

如果所依赖的组件人造幂等,那么问题就简略了,然而理论状况往往更加简单。并发场景下,如果零碎依赖的组件无奈幂等,咱们就须要应用额定的伎俩实现幂等。

一个罕用的伎俩就是应用分布式锁。分布式锁的实现形式有很多,比拟罕用的是缓存式分布式锁。

四 分布式锁

在 What is a Java distributed lock? 中有这样几段话:

  • In computer science, locks are mechanisms in a multithreaded environment to prevent different threads from operating on the same resource. When using locking, a resource is “locked” for access by a specific thread, and can only be accessed by a different thread once the resource has been released. Locks have several benefits: they stop two threads from doing the same work, and they prevent errors and data corruption when two threads try to use the same resource simultaneously.
  • Distributed locks in Java are locks that can work with not only multiple threads running on the same machine, but also threads running on clients on different machines in a distributed system. The threads on these separate machines must communicate and coordinate to make sure that none of them try to access a resource that has been locked up by another.

这几段话通知咱们,锁的实质是共享资源的互斥拜访,分布式锁解决了分布式系统中共享资源的互斥拜访的问题。

java.util.concurrent.locks 包提供了丰盛的锁实现,包含偏心锁 / 非偏心锁,阻塞锁 / 非阻塞锁,读写锁以及可重入锁等。

咱们要如何实现一个分布式锁呢?

计划一

分布式系统中常见有两个问题:

1)单点故障问题,即当持有锁的利用产生单点故障时,锁将被长期有效占有;
2)网络超时问题,即当客户端产生网络超时但实际上锁胜利时,咱们无奈再次正确的获取锁。

要解决问题 1,一个简略的计划是引入过期工夫(lease time),对锁的持有将是有时效的,当利用产生单点故障时,被其持有的锁能够主动开释。

要解决问题 2,一个简略的计划是反对可重入,咱们为每个获取锁的客户端都配置一个不会反复的身份标识(通常是 UUID),上锁胜利后锁将带有该客户端的身份标识。当实际上锁胜利而客户端超时重试时,咱们能够判断锁已被该客户端持有而返回胜利。

综上咱们给出了一个 lease-based distribute lock 计划。出于性能考量,应用缓存作为锁的存储介质,利用 MVCC(Multiversion concurrency control)机制解决共享资源互斥拜访问题,具体实现可见附录代码。

分布式锁的个别应用形式如下

● 初始化分布式锁的工厂
● 利用工厂生成一个分布式锁实例
● 应用该分布式实例上锁和解锁操作

@Test
public void testTryLock() {

    // 初始化工厂
    MdbDistributeLockFactory mdbDistributeLockFactory = new MdbDistributeLockFactory();
    mdbDistributeLockFactory.setNamespace(603);
    mdbDistributeLockFactory.setMtairManager(new MultiClusterTairManager());

    // 取得锁
    DistributeLock lock = mdbDistributeLockFactory.getLock("TestLock");

    // 上锁解锁操作
    boolean locked = lock.tryLock();
    if (!locked) {return;}
    try {//do something} finally {lock.unlock();
    }
}

该计划简略易用,然而问题也很显著。例如,开释锁的时候只是简略的将缓存中的 key 生效,所以存在谬误开释别人已持有锁问题。所幸只有锁的租期设置的足够长,该问题呈现几率就足够小。

咱们借用 Martin Kleppmann 在文章 How to do distributed locking 中的一张图阐明该问题。

构想一种状况,当占有锁的 Client 1 在开释锁之前,锁就曾经到期了,Client 2 将获取锁,此时锁被 Client 2 持有,然而 Client 1 可能会谬误的将其开释。一个更优良的计划,咱们给每个锁都设置一个身份标识,在开释锁的时候,1)首先查问锁是否是本人的,2)如果是本人的则开释锁。受限于实现形式,步骤 1 和步骤 2 不是原子操作,在步骤 1 和步骤 2 之间,如果锁到期被其余客户端获取,此时也会谬误的开释别人的锁。

计划二

借助 Redis 的 Lua 脚本,能够完满的解决存在谬误开释别人已持有锁问题的。在 Distributed locks with Redis 这篇文章的 Correct implementation with a single instance 这一节中,咱们能够失去咱们想要的答案——如何实现一个分布式锁。

当咱们想要获取锁时,咱们能够执行如下办法

SET resource_name my_random_value NX PX 30000

当咱们想要开释锁时,咱们能够执行如下的 Lua 脚本

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

计划三

在计划一和计划二的探讨过程中,有一个问题被咱们重复提及:锁的主动开释。

这是一把双刃剑:

1)一方面它很好的解决了持有锁的客户端单点故障的问题
2)另一方面,如果锁提前开释,就会呈现锁的谬误持有状态

这个时候,咱们能够引入 Watch Dog 主动续租机制,咱们能够参考以下 Redisson 是如何实现的。

在上锁胜利后,Redisson 会调用 renewExpiration()办法开启一个 Watch Dog 线程,为锁主动续期。每过 1 / 3 工夫续一次,胜利则持续下一次续期,失败勾销续期操作。

咱们能够再看看 Redisson 是如何续期的。renewExpiration()办法的第 17 行 renewExpirationAsync()办法是执行锁续期的要害操作,咱们进入到办法外部,能够看到 Redisson 也是应用 Lua 脚本进行锁续租的:1)判断锁是否存在,2)如果存在则重置过期工夫。

private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {return;}

    Timeout task = commandExecutor.getConnectionManager().newTimeout(timeout -> {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ent == null) {return;}
        Long threadId = ent.getFirstThreadId();
        if (threadId == null) {return;}

        RFuture<Boolean> future = renewExpirationAsync(threadId);
        future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock "+ getRawName() +" expiration", e);
                EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                return;
            }

            if (res) {
                // reschedule itself
                renewExpiration();} else {cancelExpirationRenewal(null);
            }
        });
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then" +
                          "redis.call('pexpire', KEYS[1], ARGV[1]);" +
                          "return 1;" +
                          "end;" +
                          "return 0;",
                          Collections.singletonList(getRawName()),
                          internalLockLeaseTime, getLockName(threadId));
}

计划四

借助 Redisson 的主动续期机制,咱们无需再放心锁的主动开释。然而探讨到这里,我还是不得不面对一个问题:分布式锁自身不是一个分布式应用。当 Redis 服务器故障无奈失常工作时,整个分布式锁也就无奈提供服务。

更进一步,咱们能够看看 Distributed locks with Redis 这篇文章中提到的 Redlock 算法及其实现。

Redlock 算法不是银弹,对于它的好与坏,也有很多争执:

How to do distributed locking:
https://martin.kleppmann.com/…

Is Redlock safe?:
http://antirez.com/news/101

Martin Kleppmann 和 Antirez 对于 Redlock 的辩论:
https://news.ycombinator.com/…

参考资料

What is a Java distributed lock?
https://redisson.org/glossary…

Distributed locks and synchronizers:
https://github.com/redisson/r…

Distributed locks with Redis:
https://redis.io/topics/distl…

附录

分布式锁

public class MdbDistributeLock implements DistributeLock {

    /**
     * 锁的命名空间
     */
    private final int namespace;

    /**
     * 锁对应的缓存 key
     */
    private final String lockName;

    /**
     * 锁的惟一标识,保障可重入,以应答 put 胜利,然而返回超时的状况
     */
    private final String lockId;

    /**
     * 是否持有锁。true:是
     */
    private boolean locked;

    /**
     * 缓存实例
     */
    private final TairManager tairManager;

    public MdbDistributeLock(TairManager tairManager, int namespace, String lockCacheKey) {

        this.tairManager = tairManager;
        this.namespace = namespace;
        this.lockName = lockCacheKey;
        this.lockId = UUID.randomUUID().toString();
    }

    @Override
    public boolean tryLock() {

        try {
            // 获取锁状态
            Result<DataEntry> getResult = null;
            ResultCode getResultCode = null;
            for (int cnt = 0; cnt < DEFAULT_RETRY_TIMES; cnt++) {getResult = tairManager.get(namespace, lockName);
                getResultCode = getResult == null ? null : getResult.getRc();
                if (noNeedRetry(getResultCode)) {break;}
            }

            // 重入,已持有锁,返回胜利
            if (ResultCode.SUCCESS.equals(getResultCode)
                && getResult.getValue() != null && lockId.equals(getResult.getValue().getValue())) {
                locked = true;
                return true;
            }

            // 不可获取锁,返回失败
            if (!ResultCode.DATANOTEXSITS.equals(getResultCode)) {log.error("tryLock fail code={} lock={} traceId={}", getResultCode, this, EagleEye.getTraceId());
                return false;
            }

            // 尝试获取锁
            ResultCode putResultCode = null;
            for (int cnt = 0; cnt < DEFAULT_RETRY_TIMES; cnt++) {
                putResultCode = tairManager.put(namespace, lockName, lockId, MDB_CACHE_VERSION,
                    DEFAULT_EXPIRE_TIME_SEC);
                if (noNeedRetry(putResultCode)) {break;}
            }
            if (!ResultCode.SUCCESS.equals(putResultCode)) {log.error("tryLock fail code={} lock={} traceId={}", getResultCode, this, EagleEye.getTraceId());
                return false;
            }
            locked = true;
            return true;

        } catch (Exception e) {log.error("DistributedLock.tryLock fail lock={}", this, e);
        }
        return false;
    }

    @Override
    public void unlock() {if (!locked) {return;}
        ResultCode resultCode = tairManager.invalid(namespace, lockName);
        if (!resultCode.isSuccess()) {log.error("DistributedLock.unlock fail lock={} resultCode={} traceId={}", this, resultCode,
                EagleEye.getTraceId());
        }
        locked = false;
    }

    /**
     * 判断是否须要重试
     *
     * @param resultCode 缓存的返回码
     * @return true:不必重试
     */
    private boolean noNeedRetry(ResultCode resultCode) {return resultCode != null && !ResultCode.CONNERROR.equals(resultCode) && !ResultCode.TIMEOUT.equals(resultCode) && !ResultCode.UNKNOW.equals(resultCode);
    }

}

分布式锁工厂

public class MdbDistributeLockFactory implements DistributeLockFactory {

    /**
     * 缓存的命名空间
     */
    @Setter
    private int namespace;

    @Setter
    private MultiClusterTairManager mtairManager;

    @Override
    public DistributeLock getLock(String lockName) {return new MdbDistributeLock(mtairManager, namespace, lockName);
    }
}

原文链接
本文为阿里云原创内容,未经容许不得转载。

正文完
 0