图解Janusgraph系列-并发平安:锁机制(本地锁+分布式锁)剖析
大家好,我是洋仔
,JanusGraph图解系列文章,实时更新
~
图数据库文章总目录:
- 整顿所有图相干文章,请移步(超链):图数据库系列-文章总目录
- 地址:https://liyangyang.blog.csdn.net/article/details/111031257
源码剖析相干可查看github(码文不易,求个star~)
: https://github.com/YYDreamer/janusgraph下述流程高清大图地址:https://www.processon.com/view/link/5f471b2e7d9c086b9903b629
版本:JanusGraph-0.5.2
转载文章请保留以下申明:
作者:洋仔聊编程
微信公众号:匠心Java
原文地址:https://liyangyang.blog.csdn.net/
在分布式系统中,不免波及到对同一数据的并发操作,如何保障分布式系统中数据的并发平安呢?分布式锁!
一:分布式锁
罕用的分布式锁实现形式:
1、基于数据库实现分布式锁
针对于数据库实现的分布式锁,如mysql应用应用for update
独特竞争一个行锁来实现; 在JanusGraph中,也是基于数据库实现的分布式锁,这里的数据库
指的是咱们以后应用的第三方backend storage
,具体的实现形式也和mysql有所不同,具体咱们会在下文剖析
2、基于Redis实现的分布式锁
基于lua脚本
+setNx
实现
3、基于zk实现的分布式锁
基于znode
的有序性和长期节点
+zk的watcher
机制实现
4、MVCC多版本并发管制乐观锁实现
本文次要介绍Janusgraph的锁机制,其余的实现机制就不在此做详解了
上面咱们来剖析一下JanusGraph
的锁机制
实现~
二:JanusGraph锁机制
在JanusGraph中应用的锁机制是:本地锁
+ 分布式锁
来实现的;
2.1 一致性行为
在JanusGraph
中次要有三种一致性修饰词(Consistency Modifier)
来示意3种不同的一致性行为
,来管制图库应用过程中的并发问题的管制水平;
public enum ConsistencyModifier { DEFAULT, LOCK, FORK}
源码中ConsistencyModifier
枚举类次要作用:用于管制JanusGraph在最终统一或其余非事务性后端系统
上的一致性行为!其作用别离为:
- DEFAULT:默认的一致性行为,不应用分布式锁进行管制,对配置的存储后端应用由关闭事务保障的默认一致性模型,一致性行为次要取决于存储后端的配置以及关闭事务的(可选)配置;无需显示配置即可应用
- LOCK:在存储后端反对锁的前提下,显示的获取分布式锁以保障一致性!确切的一致性保障取决于所配置的锁实现;需
management.setConsistency(element, ConsistencyModifier.LOCK);
语句进行配置 - FORK:只实用于
multi-edges
和list-properties
两种状况下应用;使JanusGraph批改数据时,采纳先删除后增加新的边/属性的形式,而不是笼罩现有的边/属性,从而防止潜在的并发写入抵触;需management.setConsistency(element, ConsistencyModifier.FORK);
进行配置
LOCK
在查问或者插入数据时,是否应用分布式锁
进行并发管制,在图shcema
的创立过程中,如上述能够通过配置schema元素
为ConsistencyModifier.LOCK
形式管制并发,则在应用过程中就会用分布式锁
进行并发管制;
为了提高效率,JanusGraph默认不应用锁定。 因而,用户必须为定义一致性束缚
的每个架构元素决定是否应用锁定。
应用JanusGraphManagement.setConsistency(element,ConsistencyModifier.LOCK)
显式启用对架构元素的锁定
代码如下所示:
mgmt = graph.openManagement() name = mgmt.makePropertyKey('consistentName').dataType(String.class).make() index = mgmt.buildIndex('byConsistentName', Vertex.class).addKey(name).unique().buildCompositeIndex() mgmt.setConsistency(name, ConsistencyModifier.LOCK) // Ensures only one name per vertex mgmt.setConsistency(index, ConsistencyModifier.LOCK) // Ensures name uniqueness in the graph mgmt.commit()
FORK
因为边缘作为单个记录存储在根底存储后端中,因而同时批改单个边缘将导致抵触。
FORK
就是为了代替LOCK
,能够将边缘标签配置为应用ConsistencyModifier.FORK
。
上面的示例创立一个新的edge label,并将其设置为ConsistencyModifier.FORK
mgmt = graph.openManagement() related = mgmt.makeEdgeLabel('related').make() mgmt.setConsistency(related, ConsistencyModifier.FORK) mgmt.commit()
通过上述配置后,批改标签配置为FORK
的edge时,操作步骤为:
- 首先,删除该边
- 将批改后的边作为新边增加
因而,如果两个并发事务批改了同一边缘,则提交时将存在边缘的两个批改后的正本,能够在查问遍历期间依据须要解决这些正本。
留神edge fork仅实用于MULTI edge。 具备多重性束缚的边缘标签不能应用此策略,因为非MULTI的边缘标签定义中内置了一个唯一性束缚,该束缚须要显式锁定或应用根底存储后端的抵触解决机制
上面咱们具体来看一下janusgrph
的锁机制
的实现:
2.2 LoackID
在介绍锁机制之前,先看一下锁应该锁什么货色呢?
咱们都晓得在janusgraph
的底层存储中,vertexId作为Rowkey,属性和边存储在cell中,由column+value组成
当咱们批改节点的属性和边
+边的属性时
,很显著只有锁住对应的Rowkey + Column
即可;
在Janusgraph
中,这个锁的标识的根底局部就是LockID
:
LockID = RowKey + Column
源码如下:
KeyColumn lockID = new KeyColumn(key, column);
2.3 本地锁
本地锁
是在任何状况下都须要获取的一个锁,只有获取胜利后,才会进行下述分布式锁
的获取!
本地锁
是基于图实例
维度存在的;次要作用是保障以后图实例下的操作中无抵触!
本地锁的实现是通过ConcurrentHashMap
数据结构来实现的,在图实例维度下惟一;
基于以后事务
+lockId
来作为锁标识
;
获取的次要流程:
联合源码如下:
上述图倡议按照源码一块剖析,源码在LocalLockMediator
类中的下述办法,上面源码剖析模块
会详细分析
public boolean lock(KeyColumn kc, T requester, Instant expires) { }
引入本地锁机制,次要目标: 在图实例维度来做一层锁判断,缩小分布式锁的并发抵触
,缩小分布式锁带来的性能耗费
2.4 分布式锁
在本地锁
获取胜利之后才会去尝试获取分布式锁
;
分布式锁的获取整体分为两局部流程:
分布式锁信息插入
分布式锁信息状态判断
分布式锁信息插入
该局部次要是通过lockID
来结构要插入的Rowkey和column
并将数据插入到hbase
中;插入胜利即示意这部分解决胜利!
具体流程如下:
分布式锁信息状态判断
该局部在上一部分实现之后才会进行,次要是判断分布式锁是否获取胜利!
查问出以后hbase中对应Rowkey的所有column
,过滤未过期的column汇合,比对汇合的第一个column是否等于以后事务插入的column;
等于则获取胜利!不等于则获取失败!
具体流程如下:
三:源码剖析 与 整体流程
源码剖析曾经push到github:https://github.com/YYDreamer/...
1、获取锁的入口
public void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction txh) throws BackendException { // locker是一个一致性key锁对象 if (locker != null) { // 获取以后事务对象 ExpectedValueCheckingTransaction tx = (ExpectedValueCheckingTransaction) txh; // 判断:以后的获取锁操作是否以后事务的操作中存在增删改的操作 if (tx.isMutationStarted()) throw new PermanentLockingException("Attempted to obtain a lock after mutations had been persisted"); // 应用key+column组装为lockID,供下述加锁应用!!!!! KeyColumn lockID = new KeyColumn(key, column); log.debug("Attempting to acquireLock on {} ev={}", lockID, expectedValue); // 获取本地以后jvm过程中的写锁(看下述的 1:写锁获取剖析) // (此处的获取锁只是将对应的KLV存储到Hbase中!存储胜利并不代表获取锁胜利) // 1. 获取胜利(等同于存储胜利)则继续执行 // 2. 获取失败(等同于存储失败),会抛出异样,抛出到最上层,打印谬误日志“Could not commit transaction ["+transactionId+"] due to exception” 并抛出对应的异样,本次插入数据完结 locker.writeLock(lockID, tx.getConsistentTx()); // 执行前提:上述获取锁胜利! // 存储期望值,此处为了实现当雷同的key + value + tx多个加锁时,只解决第一个 // 存储在事务对象中,标识在commit判断锁是否获取胜利时,以后事务插入的是哪个锁信息 tx.storeExpectedValue(this, lockID, expectedValue); } else { // locker为空状况下,间接抛出一个运行时异样,终止程序 store.acquireLock(key, column, expectedValue, unwrapTx(txh)); } }
2、执行 locker.writeLock(lockID, tx.getConsistentTx()) 触发锁获取
public void writeLock(KeyColumn lockID, StoreTransaction tx) throws TemporaryLockingException, PermanentLockingException { if (null != tx.getConfiguration().getGroupName()) { MetricManager.INSTANCE.getCounter(tx.getConfiguration().getGroupName(), M_LOCKS, M_WRITE, M_CALLS).inc(); } // 判断以后事务是否在图实例的维度 曾经占据了lockID的锁 // 此处的lockState在一个事务胜利获取本地锁+分布式锁后,以事务为key、value为map,其中key为lockID,value为加锁状态(开始工夫、过期工夫等) if (lockState.has(tx, lockID)) { log.debug("Transaction {} already wrote lock on {}", tx, lockID); return; } // 以后事务没有占据lockID对应的锁 // 进行(lockLocally(lockID, tx) 本地加锁锁定操作, if (lockLocally(lockID, tx)) { boolean ok = false; try { // 在本地锁获取胜利的前提下: // 尝试获取基于Hbase实现的分布式锁; // 留神!!!(此处的获取锁只是将对应的KLV存储到Hbase中!存储胜利并不代表获取锁胜利) S stat = writeSingleLock(lockID, tx); // 获取锁分布式锁胜利后(即写入胜利后),更新本地锁的过期工夫为分布式锁的过期工夫 lockLocally(lockID, stat.getExpirationTimestamp(), tx); // update local lock expiration time // 将上述获取的锁,存储在标识以后存在锁的汇合中Map<tx,Map<lockID,S>>, key为事务、value中的map为以后事务获取的锁,key为lockID,value为以后获取分布式锁的ConsistentKeyStatus(一致性密匙状态)对象 lockState.take(tx, lockID, stat); ok = true; } catch (TemporaryBackendException tse) { // 在获取分布式锁失败后,捕捉该异样,并抛出该异样 throw new TemporaryLockingException(tse); } catch (AssertionError ae) { // Concession to ease testing with mocks & behavior verification ok = true; throw ae; } catch (Throwable t) { // 呈现底层存储谬误! 则间接加锁失败! throw new PermanentLockingException(t); } finally { // 判断是否胜利获取锁,没有获分布式锁的,则开释本地锁 if (!ok) { // 没有胜利获取锁,则开释本地锁 // lockState.release(tx, lockID); // has no effect unlockLocally(lockID, tx); if (null != tx.getConfiguration().getGroupName()) { MetricManager.INSTANCE.getCounter(tx.getConfiguration().getGroupName(), M_LOCKS, M_WRITE, M_EXCEPTIONS).inc(); } } } } else { // 如果获取本地锁失败,则间接抛出异样,不进行从新本地争用 // Fail immediately with no retries on local contention throw new PermanentLockingException("Local lock contention"); } }
蕴含两个局部:
- 本地锁的获取
lockLocally(lockID, tx)
- 分布式锁的获取
writeSingleLock(lockID, tx)
留神此处只是将锁信息写入到Hbase中,并不代表获取分布式锁胜利,只是做了上述介绍的第一个阶段分布式锁信息插入
3、本地锁获取 lockLocally(lockID, tx)
public boolean lock(KeyColumn kc, T requester, Instant expires) { assert null != kc; assert null != requester; final StackTraceElement[] acquiredAt = log.isTraceEnabled() ? new Throwable("Lock acquisition by " + requester).getStackTrace() : null; // map的value,以事务为外围 final AuditRecord<T> audit = new AuditRecord<>(requester, expires, acquiredAt); // ConcurrentHashMap实现locks, 以lockID为key,事务为外围value final AuditRecord<T> inMap = locks.putIfAbsent(kc, audit); boolean success = false; // 代表以后map中不存在lockID,标识着锁没有被占用,胜利获取锁 if (null == inMap) { // Uncontended lock succeeded if (log.isTraceEnabled()) { log.trace("New local lock created: {} namespace={} txn={}", kc, name, requester); } success = true; } else if (inMap.equals(audit)) { // 代表以后存在lockID,比对旧value和新value中的事务对象是否是同一个 // requester has already locked kc; update expiresAt // 上述判断后,事务对象为同一个,标识以后事务曾经获取这个lockID的锁; // 1. 这一步进行cas替换,作用是为了刷新过期工夫 // 2. 并发解决,如果因为锁过期被其余事务占据,则占用锁失败 success = locks.replace(kc, inMap, audit); if (log.isTraceEnabled()) { if (success) { log.trace("Updated local lock expiration: {} namespace={} txn={} oldexp={} newexp={}", kc, name, requester, inMap.expires, audit.expires); } else { log.trace("Failed to update local lock expiration: {} namespace={} txn={} oldexp={} newexp={}", kc, name, requester, inMap.expires, audit.expires); } } } else if (0 > inMap.expires.compareTo(times.getTime())) { // 比拟过期工夫,如果锁曾经过期,则以后事务能够占用该锁 // the recorded lock has expired; replace it // 1. 以后事务占用锁 // 2. 并发解决,如果因为锁过期被其余事务占据,则占用锁失败 success = locks.replace(kc, inMap, audit); if (log.isTraceEnabled()) { log.trace("Discarding expired lock: {} namespace={} txn={} expired={}", kc, name, inMap.holder, inMap.expires); } } else { // 标识:锁被其余事务占用,并且未过期,则占用锁失败 // we lost to a valid lock if (log.isTraceEnabled()) { log.trace("Local lock failed: {} namespace={} txn={} (already owned by {})", kc, name, requester, inMap); log.trace("Owner stacktrace:\n {}", Joiner.on("\n ").join(inMap.acquiredAt)); } } return success; }
如上述介绍,本地锁的实现是通过ConcurrentHashMap
数据结构来实现的,在图实例维度下惟一!
4、分布式锁获取第一个阶段:分布式锁信息插入
protected ConsistentKeyLockStatus writeSingleLock(KeyColumn lockID, StoreTransaction txh) throws Throwable { // 组装插入hbase数据的Rowkey final StaticBuffer lockKey = serializer.toLockKey(lockID.getKey(), lockID.getColumn()); StaticBuffer oldLockCol = null; // 进行尝试插入 ,默认尝试次数3次 for (int i = 0; i < lockRetryCount; i++) { // 尝试将数据插入到hbase中;oldLockCol示意要删除的column代表上一次尝试插入的数据 WriteResult wr = tryWriteLockOnce(lockKey, oldLockCol, txh); // 如果插入胜利 if (wr.isSuccessful() && wr.getDuration().compareTo(lockWait) <= 0) { final Instant writeInstant = wr.getWriteTimestamp(); // 写入工夫 final Instant expireInstant = writeInstant.plus(lockExpire);// 过期工夫 return new ConsistentKeyLockStatus(writeInstant, expireInstant); // 返回插入对象 } // 赋值以后的尝试插入的数据,要在下一次尝试时删除 oldLockCol = wr.getLockCol(); // 判断插入失败起因,长期异样进行尝试,非长期异样进行尝试! handleMutationFailure(lockID, lockKey, wr, txh); } // 解决在尝试了3次之后还是没插入胜利的状况,删除最初一次尝试插入的数据 tryDeleteLockOnce(lockKey, oldLockCol, txh); // TODO log exception or successful too-slow write here // 抛出异样,标识导入数据失败 throw new TemporaryBackendException("Lock write retry count exceeded"); }
上述只是将锁信息插入,插入胜利标识该流程完结
5、分布式锁获取第一个阶段:分布式锁锁定是否胜利断定
这一步,是在commit
阶段进行的验证
public void commit() throws BackendException { // 此办法内调用checkSingleLock 查看分布式锁的获取后果 flushInternal(); tx.commit(); }
最终会调用checkSingleLock
办法,判断获取锁的状态!
protected void checkSingleLock(final KeyColumn kc, final ConsistentKeyLockStatus ls, final StoreTransaction tx) throws BackendException, InterruptedException { // 查看是否被查看过 if (ls.isChecked()) return; // Slice the store KeySliceQuery ksq = new KeySliceQuery(serializer.toLockKey(kc.getKey(), kc.getColumn()), LOCK_COL_START, LOCK_COL_END); // 此处从hbase中查问出锁定的行的所有列! 默认查问重试次数3 List<Entry> claimEntries = getSliceWithRetries(ksq, tx); // 从每个返回条目标列中提取timestamp和rid,而后过滤出带有过期工夫戳的timestamp对象 final Iterable<TimestampRid> iterable = Iterables.transform(claimEntries, e -> serializer.fromLockColumn(e.getColumnAs(StaticBuffer.STATIC_FACTORY), times)); final List<TimestampRid> unexpiredTRs = new ArrayList<>(Iterables.size(iterable)); for (TimestampRid tr : iterable) { // 过滤获取未过期的锁! final Instant cutoffTime = now.minus(lockExpire); if (tr.getTimestamp().isBefore(cutoffTime)) { ... } // 将还未过期的锁记录存储到一个汇合中 unexpiredTRs.add(tr); } // 判断以后tx是否胜利持有锁! 如果咱们插入的列是读取的第一个列,或者后面的列只蕴含咱们本人的rid(因为咱们是在第一局部的前提下获取的锁,第一局部咱们胜利获取了基于以后过程的锁,所以如果rid雷同,代表着咱们也胜利获取到了以后的分布式锁),那么咱们持有锁。否则,另一个过程持有该锁,咱们无奈取得锁 // 如果,获取锁失败,抛出TemporaryLockingException异样!!!! 抛出到顶层的mutator.commitStorage()处,最终导入失败进行事务回滚等操作 checkSeniority(kc, ls, unexpiredTRs); // 如果上述步骤未抛出异样,则标识以后的tx曾经胜利获取锁! ls.setChecked(); }
四:整体流程
总流程如下图:
整体流程为:
- 获取本地锁
获取分布式锁
- 插入分布式锁信息
- commit阶段判断分布式锁获取是否胜利
- 获取失败,则重试
五:总结
JanusGraph的锁机制次要是通过本地锁+分布式锁
来实现分布式系统下的数据一致性;
分布式锁的管制维度为:property、vertex、edge、index都能够;
JanusGraph
反对在数据导入时通过后面一致性行为
局部所说的LOCK
来开关分布式锁:
- LOCK:数据导入时开启分布式锁保障分布式一致性
- DEFAULT、FORK:数据导入时敞开分布式锁
是否开启分布式锁思考:
在开启分布式锁的状况下,数据导入开销十分大;如果是数据不是要求很高的一致性,并且数据量比拟大,咱们能够抉择敞开分布式锁相干,来进步导入速度;
而后,针对于小数据量的要求高一致性的数据,独自开启分布式锁来保障数据安全;
另外,咱们在不开启分布式锁定的状况下,能够通过针对于导入的数据的充沛探查
来缩小抵触!
针对于图schema的元素开启还是敞开分布式锁,还是依据理论业务状况来决定。
本文有任何问题,可加博主微信或评论指出,感激!码文不易,给个赞和star吧~
本文由博客群发一文多发等经营工具平台 OpenWrite 公布