乐趣区

关于nosql:图解Janusgraph系列并发安全锁机制本地锁分布式锁分析

图解 Janusgraph 系列 - 并发平安:锁机制(本地锁 + 分布式锁)剖析

大家好,我是 洋仔 ,JanusGraph 图解系列文章, 实时更新~

图数据库文章总目录:
  • 整顿所有图相干文章,请移步 (超链): 图数据库系列 - 文章总目录
  • 地址:https://liyangyang.blog.csdn.net/article/details/111031257

源码剖析相干可查看 github(码文不易,求个 star~):https://github.com/YYDreamer/janusgraph

下述流程高清大图地址:https://www.processon.com/view/link/5f471b2e7d9c086b9903b629

版本:JanusGraph-0.5.2

转载文章请保留以下申明:

作者:洋仔聊编程
微信公众号:匠心 Java
原文地址:https://liyangyang.blog.csdn.net/


在分布式系统中,不免波及到对同一数据的并发操作,如何保障分布式系统中数据的并发平安呢?分布式锁!

一:分布式锁

罕用的分布式锁实现形式:

1、基于数据库实现分布式锁

​ 针对于数据库实现的分布式锁,如 mysql 应用应用 for update 独特竞争一个行锁来实现;在 JanusGraph 中,也是基于数据库实现的分布式锁,这里的 数据库 指的是咱们以后应用的第三方backend storage,具体的实现形式也和 mysql 有所不同,具体咱们会在下文剖析

2、基于 Redis 实现的分布式锁

​ 基于 lua 脚本+setNx 实现

3、基于 zk 实现的分布式锁

​ 基于 znode 的有序性和 长期节点 +zk 的watcher 机制实现

4、MVCC 多版本并发管制乐观锁实现

本文次要介绍 Janusgraph 的锁机制,其余的实现机制就不在此做详解了

上面咱们来剖析一下 JanusGraph锁机制 实现~

二:JanusGraph 锁机制

在 JanusGraph 中应用的锁机制是:本地锁 + 分布式锁 来实现的;

2.1 一致性行为

JanusGraph 中次要有三种 一致性修饰词 (Consistency Modifier) 来示意 3 种不同的 一致性行为,来管制图库应用过程中的并发问题的管制水平;

public enum ConsistencyModifier {
    DEFAULT,
    LOCK,
    FORK
}

源码中 ConsistencyModifier 枚举类次要作用:用于管制 JanusGraph 在 最终统一或其余非事务性后端系统 上的一致性行为!其作用别离为:

  • DEFAULT:默认的一致性行为,不应用分布式锁进行管制,对配置的存储后端应用由关闭事务保障的默认一致性模型,一致性行为次要取决于存储后端的配置以及关闭事务的(可选)配置;无需显示配置即可应用
  • LOCK:在存储后端反对锁的前提下,显示的获取分布式锁以保障一致性!确切的一致性保障取决于所配置的锁实现;需 management.setConsistency(element, ConsistencyModifier.LOCK); 语句进行配置
  • FORK:只实用于 multi-edgeslist-properties两种状况下应用;使 JanusGraph 批改数据时,采纳先删除后增加新的边 / 属性的形式,而不是笼罩现有的边 / 属性,从而防止潜在的并发写入抵触;需 management.setConsistency(element, ConsistencyModifier.FORK); 进行配置
LOCK

在查问或者插入数据时,是否应用 分布式锁 进行并发管制,在图 shcema 的创立过程中,如上述能够通过配置 schema 元素ConsistencyModifier.LOCK形式管制并发,则在应用过程中就会用 分布式锁 进行并发管制;

为了提高效率,JanusGraph 默认不应用锁定。因而,用户必须为定义 一致性束缚 的每个架构元素决定是否应用锁定。

应用 JanusGraphManagement.setConsistency(element,ConsistencyModifier.LOCK) 显式启用对架构元素的锁定

代码如下所示:

mgmt = graph.openManagement() 
name = mgmt.makePropertyKey('consistentName').dataType(String.class).make() 
index = mgmt.buildIndex('byConsistentName', Vertex.class).addKey(name).unique().buildCompositeIndex() 
mgmt.setConsistency(name, ConsistencyModifier.LOCK) // Ensures only one name per vertex 
mgmt.setConsistency(index, ConsistencyModifier.LOCK) // Ensures name uniqueness in the graph 
mgmt.commit()
FORK

因为边缘作为单个记录存储在根底存储后端中,因而同时批改单个边缘将导致抵触。

FORK就是为了代替LOCK,能够将边缘标签配置为应用ConsistencyModifier.FORK

上面的示例创立一个新的 edge label,并将其设置为ConsistencyModifier.FORK

mgmt = graph.openManagement() 
related = mgmt.makeEdgeLabel('related').make() 
mgmt.setConsistency(related, ConsistencyModifier.FORK) 
mgmt.commit()

通过上述配置后,批改标签配置为 FORK 的 edge 时,操作步骤为:

  1. 首先,删除该边
  2. 将批改后的边作为新边增加

因而,如果两个并发事务批改了同一边缘,则提交时将存在边缘的两个批改后的正本,能够在查问遍历期间依据须要解决这些正本。

留神 edge fork 仅实用于 MULTI edge。具备多重性束缚的边缘标签不能应用此策略,因为非 MULTI 的边缘标签定义中内置了一个唯一性束缚,该束缚须要显式锁定或应用根底存储后端的抵触解决机制

上面咱们具体来看一下 janusgrph锁机制 的实现:

2.2 LoackID

在介绍锁机制之前,先看一下锁应该锁什么货色呢?

咱们都晓得在 janusgraph 的底层存储中,vertexId 作为 Rowkey,属性和边存储在 cell 中,由 column+value 组成

当咱们批改 节点的属性和边 + 边的属性时 ,很显著只有锁住对应的Rowkey + Column 即可;

Janusgraph 中,这个锁的标识的根底局部就是LockID

LockID = RowKey + Column

源码如下:

KeyColumn lockID = new KeyColumn(key, column);

2.3 本地锁

本地锁 是在任何状况下都须要获取的一个锁,只有获取胜利后,才会进行下述 分布式锁 的获取!

本地锁 是基于 图实例 维度存在的;次要作用是保障以后图实例下的操作中无抵触!

本地锁的实现是通过 ConcurrentHashMap 数据结构来实现的,在图实例维度下惟一;

基于以后 事务 +lockId 来作为 锁标识

获取的次要流程:

联合源码如下:

上述图倡议按照源码一块剖析,源码在 LocalLockMediator 类中的下述办法,上面 源码剖析模块 会详细分析

    public boolean lock(KeyColumn kc, T requester, Instant expires) {}

引入本地锁机制,次要目标: 在图实例维度来做一层锁判断,缩小分布式锁的并发抵触,缩小分布式锁带来的性能耗费

2.4 分布式锁

本地锁 获取胜利之后才会去尝试获取 分布式锁

分布式锁的获取整体分为两局部流程:

  1. 分布式锁信息插入
  2. 分布式锁信息状态判断
分布式锁信息插入

该局部次要是通过 lockID 来结构要插入的 Rowkey 和 column 并将数据插入到 hbase 中;插入胜利即示意这部分解决胜利!

具体流程如下:

分布式锁信息状态判断

该局部在上一部分实现之后才会进行,次要是判断分布式锁是否获取胜利!

查问出以后 hbase 中对应Rowkey 的所有 column,过滤未过期的 column 汇合,比对汇合的第一个 column 是否等于以后事务插入的 column;

等于则获取胜利!不等于则获取失败!

具体流程如下:

三:源码剖析 与 整体流程

源码剖析曾经 push 到 github:https://github.com/YYDreamer/…

1、获取锁的入口

    public void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction txh) throws BackendException {
        // locker 是一个一致性 key 锁对象
        if (locker != null) {
            // 获取以后事务对象
            ExpectedValueCheckingTransaction tx = (ExpectedValueCheckingTransaction) txh;
            // 判断:以后的获取锁操作是否以后事务的操作中存在增删改的操作
            if (tx.isMutationStarted())
                throw new PermanentLockingException("Attempted to obtain a lock after mutations had been persisted");
            // 应用 key+column 组装为 lockID,供下述加锁应用!!!!!KeyColumn lockID = new KeyColumn(key, column);
            log.debug("Attempting to acquireLock on {} ev={}", lockID, expectedValue);
            // 获取本地以后 jvm 过程中的写锁(看下述的 1:写锁获取剖析)//(此处的获取锁只是将对应的 KLV 存储到 Hbase 中!存储胜利并不代表获取锁胜利)// 1. 获取胜利(等同于存储胜利)则继续执行
            // 2. 获取失败(等同于存储失败),会抛出异样,抛出到最上层,打印谬误日志“Could not commit transaction ["+transactionId+"] due to exception”并抛出对应的异样,本次插入数据完结
            locker.writeLock(lockID, tx.getConsistentTx());
            // 执行前提:上述获取锁胜利!// 存储期望值,此处为了实现当雷同的 key + value + tx 多个加锁时,只解决第一个
            // 存储在事务对象中,标识在 commit 判断锁是否获取胜利时,以后事务插入的是哪个锁信息
            tx.storeExpectedValue(this, lockID, expectedValue);
        } else {
            // locker 为空状况下,间接抛出一个运行时异样,终止程序
            store.acquireLock(key, column, expectedValue, unwrapTx(txh));
        }
    }

2、执行 locker.writeLock(lockID, tx.getConsistentTx()) 触发锁获取

    public void writeLock(KeyColumn lockID, StoreTransaction tx) throws TemporaryLockingException, PermanentLockingException {if (null != tx.getConfiguration().getGroupName()) {MetricManager.INSTANCE.getCounter(tx.getConfiguration().getGroupName(), M_LOCKS, M_WRITE, M_CALLS).inc();}

        // 判断以后事务是否在图实例的维度 曾经占据了 lockID 的锁
        // 此处的 lockState 在一个事务胜利获取本地锁 + 分布式锁后,以事务为 key、value 为 map,其中 key 为 lockID,value 为加锁状态(开始工夫、过期工夫等)if (lockState.has(tx, lockID)) {log.debug("Transaction {} already wrote lock on {}", tx, lockID);
            return;
        }

        // 以后事务没有占据 lockID 对应的锁
        // 进行(lockLocally(lockID, tx)本地加锁锁定操作,if (lockLocally(lockID, tx)) {
            boolean ok = false;
            try {
                // 在本地锁获取胜利的前提下:// 尝试获取基于 Hbase 实现的分布式锁;// 留神!!!(此处的获取锁只是将对应的 KLV 存储到 Hbase 中!存储胜利并不代表获取锁胜利)S stat = writeSingleLock(lockID, tx);
                // 获取锁分布式锁胜利后(即写入胜利后),更新本地锁的过期工夫为分布式锁的过期工夫
                lockLocally(lockID, stat.getExpirationTimestamp(), tx); // update local lock expiration time
                // 将上述获取的锁,存储在标识以后存在锁的汇合中 Map<tx,Map<lockID,S>>,key 为事务、value 中的 map 为以后事务获取的锁,key 为 lockID,value 为以后获取分布式锁的 ConsistentKeyStatus(一致性密匙状态)对象
                lockState.take(tx, lockID, stat);
                ok = true;
            } catch (TemporaryBackendException tse) {
                // 在获取分布式锁失败后,捕捉该异样,并抛出该异样
                throw new TemporaryLockingException(tse);
            } catch (AssertionError ae) {
                // Concession to ease testing with mocks & behavior verification
                ok = true;
                throw ae;
            } catch (Throwable t) {
                // 呈现底层存储谬误!则间接加锁失败!throw new PermanentLockingException(t);
            } finally {
                // 判断是否胜利获取锁,没有获分布式锁的,则开释本地锁
                if (!ok) {
                    // 没有胜利获取锁,则开释本地锁
                    // lockState.release(tx, lockID); // has no effect
                    unlockLocally(lockID, tx);
                    if (null != tx.getConfiguration().getGroupName()) {MetricManager.INSTANCE.getCounter(tx.getConfiguration().getGroupName(), M_LOCKS, M_WRITE, M_EXCEPTIONS).inc();}
                }
            }
        } else {
            // 如果获取本地锁失败,则间接抛出异样,不进行从新本地争用

            // Fail immediately with no retries on local contention
            throw new PermanentLockingException("Local lock contention");
        }
    }

蕴含两个局部:

  1. 本地锁的获取lockLocally(lockID, tx)
  2. 分布式锁的获取 writeSingleLock(lockID, tx) 留神此处只是将锁信息写入到 Hbase 中,并不代表获取分布式锁胜利,只是做了上述介绍的第一个阶段 分布式锁信息插入

3、本地锁获取 lockLocally(lockID, tx)

  public boolean lock(KeyColumn kc, T requester, Instant expires) {
        assert null != kc;
        assert null != requester;

        final StackTraceElement[] acquiredAt = log.isTraceEnabled() ?
                new Throwable("Lock acquisition by" + requester).getStackTrace() : null;

        // map 的 value,以事务为外围
        final AuditRecord<T> audit = new AuditRecord<>(requester, expires, acquiredAt);
        //  ConcurrentHashMap 实现 locks, 以 lockID 为 key,事务为外围 value
        final AuditRecord<T> inMap = locks.putIfAbsent(kc, audit);

        boolean success = false;

        // 代表以后 map 中不存在 lockID,标识着锁没有被占用,胜利获取锁
        if (null == inMap) {
            // Uncontended lock succeeded
            if (log.isTraceEnabled()) {log.trace("New local lock created: {} namespace={} txn={}",
                    kc, name, requester);
            }
            success = true;
        } else if (inMap.equals(audit)) {
            // 代表以后存在 lockID,比对旧 value 和新 value 中的事务对象是否是同一个
            // requester has already locked kc; update expiresAt
            // 上述判断后,事务对象为同一个,标识以后事务曾经获取这个 lockID 的锁;// 1. 这一步进行 cas 替换,作用是为了刷新过期工夫
            // 2. 并发解决,如果因为锁过期被其余事务占据,则占用锁失败
            success = locks.replace(kc, inMap, audit);
            if (log.isTraceEnabled()) {if (success) {log.trace("Updated local lock expiration: {} namespace={} txn={} oldexp={} newexp={}",
                        kc, name, requester, inMap.expires, audit.expires);
                } else {log.trace("Failed to update local lock expiration: {} namespace={} txn={} oldexp={} newexp={}",
                        kc, name, requester, inMap.expires, audit.expires);
                }
            }
        } else if (0 > inMap.expires.compareTo(times.getTime())) {
            // 比拟过期工夫,如果锁曾经过期,则以后事务能够占用该锁

            // the recorded lock has expired; replace it
            // 1. 以后事务占用锁
            // 2. 并发解决,如果因为锁过期被其余事务占据,则占用锁失败
            success = locks.replace(kc, inMap, audit);
            if (log.isTraceEnabled()) {log.trace("Discarding expired lock: {} namespace={} txn={} expired={}",
                    kc, name, inMap.holder, inMap.expires);
            }
        } else {
            // 标识:锁被其余事务占用,并且未过期,则占用锁失败
            // we lost to a valid lock
            if (log.isTraceEnabled()) {log.trace("Local lock failed: {} namespace={} txn={} (already owned by {})",
                    kc, name, requester, inMap);
                log.trace("Owner stacktrace:\n        {}", Joiner.on("\n").join(inMap.acquiredAt));
            }
        }

        return success;
    }

如上述介绍,本地锁的实现是通过 ConcurrentHashMap 数据结构来实现的,在图实例维度下惟一!

4、分布式锁获取第一个阶段:分布式锁信息插入

    protected ConsistentKeyLockStatus writeSingleLock(KeyColumn lockID, StoreTransaction txh) throws Throwable {

        // 组装插入 hbase 数据的 Rowkey
        final StaticBuffer lockKey = serializer.toLockKey(lockID.getKey(), lockID.getColumn());
        StaticBuffer oldLockCol = null;

        // 进行尝试插入,默认尝试次数 3 次
        for (int i = 0; i < lockRetryCount; i++) {
            // 尝试将数据插入到 hbase 中;oldLockCol 示意要删除的 column 代表上一次尝试插入的数据
            WriteResult wr = tryWriteLockOnce(lockKey, oldLockCol, txh);
            // 如果插入胜利
            if (wr.isSuccessful() && wr.getDuration().compareTo(lockWait) <= 0) {final Instant writeInstant = wr.getWriteTimestamp(); // 写入工夫
                final Instant expireInstant = writeInstant.plus(lockExpire);// 过期工夫
                return new ConsistentKeyLockStatus(writeInstant, expireInstant); // 返回插入对象
            }
            // 赋值以后的尝试插入的数据,要在下一次尝试时删除
            oldLockCol = wr.getLockCol();
            // 判断插入失败起因,长期异样进行尝试,非长期异样进行尝试!handleMutationFailure(lockID, lockKey, wr, txh);
        }
        // 解决在尝试了 3 次之后还是没插入胜利的状况,删除最初一次尝试插入的数据
        tryDeleteLockOnce(lockKey, oldLockCol, txh);
        // TODO log exception or successful too-slow write here
        // 抛出异样,标识导入数据失败
        throw new TemporaryBackendException("Lock write retry count exceeded");
    }

上述只是将锁信息插入,插入胜利标识该流程完结

5、分布式锁获取第一个阶段:分布式锁锁定是否胜利断定

这一步,是在 commit 阶段进行的验证

    public void commit() throws BackendException {
        // 此办法内调用 checkSingleLock 查看分布式锁的获取后果
        flushInternal();
        tx.commit();}

最终会调用 checkSingleLock 办法,判断获取锁的状态!

    protected void checkSingleLock(final KeyColumn kc, final ConsistentKeyLockStatus ls,
                                   final StoreTransaction tx) throws BackendException, InterruptedException {

        // 查看是否被查看过
        if (ls.isChecked())
            return;

        // Slice the store
        KeySliceQuery ksq = new KeySliceQuery(serializer.toLockKey(kc.getKey(), kc.getColumn()), LOCK_COL_START,
            LOCK_COL_END);
        // 此处从 hbase 中查问出锁定的行的所有列!默认查问重试次数 3
        List<Entry> claimEntries = getSliceWithRetries(ksq, tx);

        // 从每个返回条目标列中提取 timestamp 和 rid,而后过滤出带有过期工夫戳的 timestamp 对象
        final Iterable<TimestampRid> iterable = Iterables.transform(claimEntries,
            e -> serializer.fromLockColumn(e.getColumnAs(StaticBuffer.STATIC_FACTORY), times));
        final List<TimestampRid> unexpiredTRs = new ArrayList<>(Iterables.size(iterable));
        for (TimestampRid tr : iterable) { // 过滤获取未过期的锁!final Instant cutoffTime = now.minus(lockExpire);
            if (tr.getTimestamp().isBefore(cutoffTime)) {...}
            // 将还未过期的锁记录存储到一个汇合中
            unexpiredTRs.add(tr);
        }
        // 判断以后 tx 是否胜利持有锁!如果咱们插入的列是读取的第一个列,或者后面的列只蕴含咱们本人的 rid(因为咱们是在第一局部的前提下获取的锁,第一局部咱们胜利获取了基于以后过程的锁,所以如果 rid 雷同,代表着咱们也胜利获取到了以后的分布式锁),那么咱们持有锁。否则,另一个过程持有该锁,咱们无奈取得锁
        // 如果,获取锁失败,抛出 TemporaryLockingException 异样!!!!抛出到顶层的 mutator.commitStorage()处,最终导入失败进行事务回滚等操作
        checkSeniority(kc, ls, unexpiredTRs);
        // 如果上述步骤未抛出异样,则标识以后的 tx 曾经胜利获取锁!ls.setChecked();}

四:整体流程

总流程如下图:

整体流程为:

  1. 获取本地锁
  2. 获取分布式锁

    1. 插入分布式锁信息
    2. commit 阶段判断分布式锁获取是否胜利
  3. 获取失败,则重试

五:总结

JanusGraph 的锁机制次要是通过 本地锁 + 分布式锁 来实现分布式系统下的数据一致性;

分布式锁的管制维度为:property、vertex、edge、index 都能够;

JanusGraph反对在数据导入时通过后面 一致性行为 局部所说的 LOCK 来开关分布式锁:

  • LOCK:数据导入时开启分布式锁保障分布式一致性
  • DEFAULT、FORK:数据导入时敞开分布式锁

是否开启分布式锁思考:

在开启分布式锁的状况下,数据导入开销十分大;如果是数据不是要求很高的一致性,并且数据量比拟大,咱们能够抉择敞开分布式锁相干,来进步导入速度;

而后,针对于小数据量的要求高一致性的数据,独自开启分布式锁来保障数据安全;

另外,咱们在不开启分布式锁定的状况下,能够通过针对于导入的 数据的充沛探查 来缩小抵触!

针对于图 schema 的元素开启还是敞开分布式锁,还是依据理论业务状况来决定。

本文有任何问题,可加博主微信或评论指出,感激!

码文不易,给个赞和 star 吧~

本文由博客群发一文多发等经营工具平台 OpenWrite 公布

退出移动版