图解Janusgraph系列-并发平安:锁机制(本地锁+分布式锁)剖析

大家好,我是洋仔,JanusGraph图解系列文章,实时更新~

图数据库文章总目录:
  • 整顿所有图相干文章,请移步(超链):图数据库系列-文章总目录
  • 地址:https://liyangyang.blog.csdn.net/article/details/111031257
源码剖析相干可查看github(码文不易,求个star~): https://github.com/YYDreamer/janusgraph

下述流程高清大图地址:https://www.processon.com/view/link/5f471b2e7d9c086b9903b629

版本:JanusGraph-0.5.2

转载文章请保留以下申明:

作者:洋仔聊编程
微信公众号:匠心Java
原文地址:https://liyangyang.blog.csdn.net/

在分布式系统中,不免波及到对同一数据的并发操作,如何保障分布式系统中数据的并发平安呢?分布式锁!

一:分布式锁

罕用的分布式锁实现形式:

1、基于数据库实现分布式锁

针对于数据库实现的分布式锁,如mysql应用应用for update独特竞争一个行锁来实现; 在JanusGraph中,也是基于数据库实现的分布式锁,这里的数据库指的是咱们以后应用的第三方backend storage,具体的实现形式也和mysql有所不同,具体咱们会在下文剖析

2、基于Redis实现的分布式锁

基于lua脚本+setNx实现

3、基于zk实现的分布式锁

基于znode的有序性和长期节点+zk的watcher机制实现

4、MVCC多版本并发管制乐观锁实现

本文次要介绍Janusgraph的锁机制,其余的实现机制就不在此做详解了

上面咱们来剖析一下JanusGraph锁机制实现~

二:JanusGraph锁机制

在JanusGraph中应用的锁机制是:本地锁 + 分布式锁来实现的;

2.1 一致性行为

JanusGraph中次要有三种一致性修饰词(Consistency Modifier)来示意3种不同的一致性行为,来管制图库应用过程中的并发问题的管制水平;

public enum ConsistencyModifier {    DEFAULT,    LOCK,    FORK}

源码中ConsistencyModifier枚举类次要作用:用于管制JanusGraph在最终统一或其余非事务性后端系统上的一致性行为!其作用别离为:

  • DEFAULT:默认的一致性行为,不应用分布式锁进行管制,对配置的存储后端应用由关闭事务保障的默认一致性模型,一致性行为次要取决于存储后端的配置以及关闭事务的(可选)配置;无需显示配置即可应用
  • LOCK:在存储后端反对锁的前提下,显示的获取分布式锁以保障一致性!确切的一致性保障取决于所配置的锁实现;需management.setConsistency(element, ConsistencyModifier.LOCK);语句进行配置
  • FORK:只实用于multi-edgeslist-properties两种状况下应用;使JanusGraph批改数据时,采纳先删除后增加新的边/属性的形式,而不是笼罩现有的边/属性,从而防止潜在的并发写入抵触;需management.setConsistency(element, ConsistencyModifier.FORK);进行配置
LOCK

在查问或者插入数据时,是否应用分布式锁进行并发管制,在图shcema的创立过程中,如上述能够通过配置schema元素ConsistencyModifier.LOCK形式管制并发,则在应用过程中就会用分布式锁进行并发管制;

为了提高效率,JanusGraph默认不应用锁定。 因而,用户必须为定义一致性束缚的每个架构元素决定是否应用锁定。

应用JanusGraphManagement.setConsistency(element,ConsistencyModifier.LOCK)显式启用对架构元素的锁定

代码如下所示:

mgmt = graph.openManagement() name = mgmt.makePropertyKey('consistentName').dataType(String.class).make() index = mgmt.buildIndex('byConsistentName', Vertex.class).addKey(name).unique().buildCompositeIndex() mgmt.setConsistency(name, ConsistencyModifier.LOCK) // Ensures only one name per vertex mgmt.setConsistency(index, ConsistencyModifier.LOCK) // Ensures name uniqueness in the graph mgmt.commit()
FORK

因为边缘作为单个记录存储在根底存储后端中,因而同时批改单个边缘将导致抵触。

FORK就是为了代替LOCK,能够将边缘标签配置为应用ConsistencyModifier.FORK

上面的示例创立一个新的edge label,并将其设置为ConsistencyModifier.FORK

mgmt = graph.openManagement() related = mgmt.makeEdgeLabel('related').make() mgmt.setConsistency(related, ConsistencyModifier.FORK) mgmt.commit()

通过上述配置后,批改标签配置为FORK的edge时,操作步骤为:

  1. 首先,删除该边
  2. 将批改后的边作为新边增加

因而,如果两个并发事务批改了同一边缘,则提交时将存在边缘的两个批改后的正本,能够在查问遍历期间依据须要解决这些正本。

留神edge fork仅实用于MULTI edge。 具备多重性束缚的边缘标签不能应用此策略,因为非MULTI的边缘标签定义中内置了一个唯一性束缚,该束缚须要显式锁定或应用根底存储后端的抵触解决机制

上面咱们具体来看一下janusgrph锁机制的实现:

2.2 LoackID

在介绍锁机制之前,先看一下锁应该锁什么货色呢?

咱们都晓得在janusgraph的底层存储中,vertexId作为Rowkey,属性和边存储在cell中,由column+value组成

当咱们批改节点的属性和边+边的属性时,很显著只有锁住对应的Rowkey + Column即可;

Janusgraph中,这个锁的标识的根底局部就是LockID

LockID = RowKey + Column

源码如下:

KeyColumn lockID = new KeyColumn(key, column);

2.3 本地锁

本地锁是在任何状况下都须要获取的一个锁,只有获取胜利后,才会进行下述分布式锁的获取!

本地锁是基于图实例维度存在的;次要作用是保障以后图实例下的操作中无抵触!

本地锁的实现是通过ConcurrentHashMap数据结构来实现的,在图实例维度下惟一;

基于以后事务+lockId来作为锁标识

获取的次要流程:

联合源码如下:

上述图倡议按照源码一块剖析,源码在LocalLockMediator类中的下述办法,上面源码剖析模块会详细分析

    public boolean lock(KeyColumn kc, T requester, Instant expires) {    }

引入本地锁机制,次要目标: 在图实例维度来做一层锁判断,缩小分布式锁的并发抵触,缩小分布式锁带来的性能耗费

2.4 分布式锁

本地锁获取胜利之后才会去尝试获取分布式锁

分布式锁的获取整体分为两局部流程:

  1. 分布式锁信息插入
  2. 分布式锁信息状态判断
分布式锁信息插入

该局部次要是通过lockID来结构要插入的Rowkey和column并将数据插入到hbase中;插入胜利即示意这部分解决胜利!

具体流程如下:

分布式锁信息状态判断

该局部在上一部分实现之后才会进行,次要是判断分布式锁是否获取胜利!

查问出以后hbase中对应Rowkey的所有column,过滤未过期的column汇合,比对汇合的第一个column是否等于以后事务插入的column;

等于则获取胜利!不等于则获取失败!

具体流程如下:

三:源码剖析 与 整体流程

源码剖析曾经push到github:https://github.com/YYDreamer/...

1、获取锁的入口

    public void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction txh) throws BackendException {        // locker是一个一致性key锁对象        if (locker != null) {            // 获取以后事务对象            ExpectedValueCheckingTransaction tx = (ExpectedValueCheckingTransaction) txh;            // 判断:以后的获取锁操作是否以后事务的操作中存在增删改的操作            if (tx.isMutationStarted())                throw new PermanentLockingException("Attempted to obtain a lock after mutations had been persisted");            // 应用key+column组装为lockID,供下述加锁应用!!!!!            KeyColumn lockID = new KeyColumn(key, column);            log.debug("Attempting to acquireLock on {} ev={}", lockID, expectedValue);            // 获取本地以后jvm过程中的写锁(看下述的 1:写锁获取剖析)            // (此处的获取锁只是将对应的KLV存储到Hbase中!存储胜利并不代表获取锁胜利)            // 1. 获取胜利(等同于存储胜利)则继续执行            // 2. 获取失败(等同于存储失败),会抛出异样,抛出到最上层,打印谬误日志“Could not commit transaction ["+transactionId+"] due to exception” 并抛出对应的异样,本次插入数据完结            locker.writeLock(lockID, tx.getConsistentTx());            // 执行前提:上述获取锁胜利!            // 存储期望值,此处为了实现当雷同的key + value + tx多个加锁时,只解决第一个            // 存储在事务对象中,标识在commit判断锁是否获取胜利时,以后事务插入的是哪个锁信息            tx.storeExpectedValue(this, lockID, expectedValue);        } else {            // locker为空状况下,间接抛出一个运行时异样,终止程序            store.acquireLock(key, column, expectedValue, unwrapTx(txh));        }    }

2、执行 locker.writeLock(lockID, tx.getConsistentTx()) 触发锁获取

    public void writeLock(KeyColumn lockID, StoreTransaction tx) throws TemporaryLockingException, PermanentLockingException {        if (null != tx.getConfiguration().getGroupName()) {            MetricManager.INSTANCE.getCounter(tx.getConfiguration().getGroupName(), M_LOCKS, M_WRITE, M_CALLS).inc();        }        // 判断以后事务是否在图实例的维度 曾经占据了lockID的锁        // 此处的lockState在一个事务胜利获取本地锁+分布式锁后,以事务为key、value为map,其中key为lockID,value为加锁状态(开始工夫、过期工夫等)        if (lockState.has(tx, lockID)) {            log.debug("Transaction {} already wrote lock on {}", tx, lockID);            return;        }        // 以后事务没有占据lockID对应的锁        // 进行(lockLocally(lockID, tx) 本地加锁锁定操作,        if (lockLocally(lockID, tx)) {            boolean ok = false;            try {                // 在本地锁获取胜利的前提下:                // 尝试获取基于Hbase实现的分布式锁;                // 留神!!!(此处的获取锁只是将对应的KLV存储到Hbase中!存储胜利并不代表获取锁胜利)                S stat = writeSingleLock(lockID, tx);                // 获取锁分布式锁胜利后(即写入胜利后),更新本地锁的过期工夫为分布式锁的过期工夫                lockLocally(lockID, stat.getExpirationTimestamp(), tx); // update local lock expiration time                // 将上述获取的锁,存储在标识以后存在锁的汇合中Map<tx,Map<lockID,S>>,  key为事务、value中的map为以后事务获取的锁,key为lockID,value为以后获取分布式锁的ConsistentKeyStatus(一致性密匙状态)对象                lockState.take(tx, lockID, stat);                ok = true;            } catch (TemporaryBackendException tse) {                // 在获取分布式锁失败后,捕捉该异样,并抛出该异样                throw new TemporaryLockingException(tse);            } catch (AssertionError ae) {                // Concession to ease testing with mocks & behavior verification                ok = true;                throw ae;            } catch (Throwable t) {                // 呈现底层存储谬误! 则间接加锁失败!                throw new PermanentLockingException(t);            } finally {                // 判断是否胜利获取锁,没有获分布式锁的,则开释本地锁                if (!ok) {                    // 没有胜利获取锁,则开释本地锁                    // lockState.release(tx, lockID); // has no effect                    unlockLocally(lockID, tx);                    if (null != tx.getConfiguration().getGroupName()) {                        MetricManager.INSTANCE.getCounter(tx.getConfiguration().getGroupName(), M_LOCKS, M_WRITE, M_EXCEPTIONS).inc();                    }                }            }        } else {            // 如果获取本地锁失败,则间接抛出异样,不进行从新本地争用            // Fail immediately with no retries on local contention            throw new PermanentLockingException("Local lock contention");        }    }

蕴含两个局部:

  1. 本地锁的获取lockLocally(lockID, tx)
  2. 分布式锁的获取writeSingleLock(lockID, tx) 留神此处只是将锁信息写入到Hbase中,并不代表获取分布式锁胜利,只是做了上述介绍的第一个阶段分布式锁信息插入

3、本地锁获取 lockLocally(lockID, tx)

  public boolean lock(KeyColumn kc, T requester, Instant expires) {        assert null != kc;        assert null != requester;        final StackTraceElement[] acquiredAt = log.isTraceEnabled() ?                new Throwable("Lock acquisition by " + requester).getStackTrace() : null;        // map的value,以事务为外围        final AuditRecord<T> audit = new AuditRecord<>(requester, expires, acquiredAt);        //  ConcurrentHashMap实现locks, 以lockID为key,事务为外围value        final AuditRecord<T> inMap = locks.putIfAbsent(kc, audit);        boolean success = false;        // 代表以后map中不存在lockID,标识着锁没有被占用,胜利获取锁        if (null == inMap) {            // Uncontended lock succeeded            if (log.isTraceEnabled()) {                log.trace("New local lock created: {} namespace={} txn={}",                    kc, name, requester);            }            success = true;        } else if (inMap.equals(audit)) {            // 代表以后存在lockID,比对旧value和新value中的事务对象是否是同一个            // requester has already locked kc; update expiresAt            // 上述判断后,事务对象为同一个,标识以后事务曾经获取这个lockID的锁;            // 1. 这一步进行cas替换,作用是为了刷新过期工夫            // 2. 并发解决,如果因为锁过期被其余事务占据,则占用锁失败            success = locks.replace(kc, inMap, audit);            if (log.isTraceEnabled()) {                if (success) {                    log.trace("Updated local lock expiration: {} namespace={} txn={} oldexp={} newexp={}",                        kc, name, requester, inMap.expires, audit.expires);                } else {                    log.trace("Failed to update local lock expiration: {} namespace={} txn={} oldexp={} newexp={}",                        kc, name, requester, inMap.expires, audit.expires);                }            }        } else if (0 > inMap.expires.compareTo(times.getTime())) {            // 比拟过期工夫,如果锁曾经过期,则以后事务能够占用该锁            // the recorded lock has expired; replace it            // 1. 以后事务占用锁            // 2. 并发解决,如果因为锁过期被其余事务占据,则占用锁失败            success = locks.replace(kc, inMap, audit);            if (log.isTraceEnabled()) {                log.trace("Discarding expired lock: {} namespace={} txn={} expired={}",                    kc, name, inMap.holder, inMap.expires);            }        } else {            // 标识:锁被其余事务占用,并且未过期,则占用锁失败            // we lost to a valid lock            if (log.isTraceEnabled()) {                log.trace("Local lock failed: {} namespace={} txn={} (already owned by {})",                    kc, name, requester, inMap);                log.trace("Owner stacktrace:\n        {}", Joiner.on("\n        ").join(inMap.acquiredAt));            }        }        return success;    }

如上述介绍,本地锁的实现是通过ConcurrentHashMap数据结构来实现的,在图实例维度下惟一!

4、分布式锁获取第一个阶段:分布式锁信息插入

    protected ConsistentKeyLockStatus writeSingleLock(KeyColumn lockID, StoreTransaction txh) throws Throwable {        // 组装插入hbase数据的Rowkey        final StaticBuffer lockKey = serializer.toLockKey(lockID.getKey(), lockID.getColumn());        StaticBuffer oldLockCol = null;        // 进行尝试插入 ,默认尝试次数3次        for (int i = 0; i < lockRetryCount; i++) {            // 尝试将数据插入到hbase中;oldLockCol示意要删除的column代表上一次尝试插入的数据            WriteResult wr = tryWriteLockOnce(lockKey, oldLockCol, txh);            // 如果插入胜利            if (wr.isSuccessful() && wr.getDuration().compareTo(lockWait) <= 0) {                final Instant writeInstant = wr.getWriteTimestamp(); // 写入工夫                final Instant expireInstant = writeInstant.plus(lockExpire);// 过期工夫                return new ConsistentKeyLockStatus(writeInstant, expireInstant); // 返回插入对象            }            // 赋值以后的尝试插入的数据,要在下一次尝试时删除            oldLockCol = wr.getLockCol();            // 判断插入失败起因,长期异样进行尝试,非长期异样进行尝试!            handleMutationFailure(lockID, lockKey, wr, txh);        }        // 解决在尝试了3次之后还是没插入胜利的状况,删除最初一次尝试插入的数据        tryDeleteLockOnce(lockKey, oldLockCol, txh);        // TODO log exception or successful too-slow write here        // 抛出异样,标识导入数据失败        throw new TemporaryBackendException("Lock write retry count exceeded");    }

上述只是将锁信息插入,插入胜利标识该流程完结

5、分布式锁获取第一个阶段:分布式锁锁定是否胜利断定

这一步,是在commit阶段进行的验证

    public void commit() throws BackendException {        // 此办法内调用checkSingleLock 查看分布式锁的获取后果        flushInternal();        tx.commit();    }

最终会调用checkSingleLock办法,判断获取锁的状态!

    protected void checkSingleLock(final KeyColumn kc, final ConsistentKeyLockStatus ls,                                   final StoreTransaction tx) throws BackendException, InterruptedException {        // 查看是否被查看过        if (ls.isChecked())            return;        // Slice the store        KeySliceQuery ksq = new KeySliceQuery(serializer.toLockKey(kc.getKey(), kc.getColumn()), LOCK_COL_START,            LOCK_COL_END);        // 此处从hbase中查问出锁定的行的所有列! 默认查问重试次数3        List<Entry> claimEntries = getSliceWithRetries(ksq, tx);        // 从每个返回条目标列中提取timestamp和rid,而后过滤出带有过期工夫戳的timestamp对象        final Iterable<TimestampRid> iterable = Iterables.transform(claimEntries,            e -> serializer.fromLockColumn(e.getColumnAs(StaticBuffer.STATIC_FACTORY), times));        final List<TimestampRid> unexpiredTRs = new ArrayList<>(Iterables.size(iterable));        for (TimestampRid tr : iterable) { // 过滤获取未过期的锁!            final Instant cutoffTime = now.minus(lockExpire);            if (tr.getTimestamp().isBefore(cutoffTime)) {                ...            }            // 将还未过期的锁记录存储到一个汇合中            unexpiredTRs.add(tr);        }        // 判断以后tx是否胜利持有锁! 如果咱们插入的列是读取的第一个列,或者后面的列只蕴含咱们本人的rid(因为咱们是在第一局部的前提下获取的锁,第一局部咱们胜利获取了基于以后过程的锁,所以如果rid雷同,代表着咱们也胜利获取到了以后的分布式锁),那么咱们持有锁。否则,另一个过程持有该锁,咱们无奈取得锁        // 如果,获取锁失败,抛出TemporaryLockingException异样!!!! 抛出到顶层的mutator.commitStorage()处,最终导入失败进行事务回滚等操作        checkSeniority(kc, ls, unexpiredTRs);        // 如果上述步骤未抛出异样,则标识以后的tx曾经胜利获取锁!        ls.setChecked();    }

四:整体流程

总流程如下图:

整体流程为:

  1. 获取本地锁
  2. 获取分布式锁

    1. 插入分布式锁信息
    2. commit阶段判断分布式锁获取是否胜利
  3. 获取失败,则重试

五:总结

JanusGraph的锁机制次要是通过本地锁+分布式锁来实现分布式系统下的数据一致性;

分布式锁的管制维度为:property、vertex、edge、index都能够;

JanusGraph反对在数据导入时通过后面一致性行为局部所说的LOCK来开关分布式锁:

  • LOCK:数据导入时开启分布式锁保障分布式一致性
  • DEFAULT、FORK:数据导入时敞开分布式锁

是否开启分布式锁思考:

在开启分布式锁的状况下,数据导入开销十分大;如果是数据不是要求很高的一致性,并且数据量比拟大,咱们能够抉择敞开分布式锁相干,来进步导入速度;

而后,针对于小数据量的要求高一致性的数据,独自开启分布式锁来保障数据安全;

另外,咱们在不开启分布式锁定的状况下,能够通过针对于导入的数据的充沛探查来缩小抵触!

针对于图schema的元素开启还是敞开分布式锁,还是依据理论业务状况来决定。

本文有任何问题,可加博主微信或评论指出,感激!

码文不易,给个赞和star吧~

本文由博客群发一文多发等经营工具平台 OpenWrite 公布