图解 Janusgraph 系列 - 并发平安:锁机制(本地锁 + 分布式锁)剖析
大家好,我是 洋仔
,JanusGraph 图解系列文章, 实时更新
~
图数据库文章总目录:
- 整顿所有图相干文章,请移步 (超链): 图数据库系列 - 文章总目录
- 地址:https://liyangyang.blog.csdn.net/article/details/111031257
源码剖析相干可查看 github(码文不易,求个 star~)
:https://github.com/YYDreamer/janusgraph下述流程高清大图地址:https://www.processon.com/view/link/5f471b2e7d9c086b9903b629
版本:JanusGraph-0.5.2
转载文章请保留以下申明:
作者:洋仔聊编程
微信公众号:匠心 Java
原文地址:https://liyangyang.blog.csdn.net/
在分布式系统中,不免波及到对同一数据的并发操作,如何保障分布式系统中数据的并发平安呢?分布式锁!
一:分布式锁
罕用的分布式锁实现形式:
1、基于数据库实现分布式锁
针对于数据库实现的分布式锁,如 mysql 应用应用 for update
独特竞争一个行锁来实现;在 JanusGraph 中,也是基于数据库实现的分布式锁,这里的 数据库
指的是咱们以后应用的第三方backend storage
,具体的实现形式也和 mysql 有所不同,具体咱们会在下文剖析
2、基于 Redis 实现的分布式锁
基于 lua 脚本
+setNx
实现
3、基于 zk 实现的分布式锁
基于 znode
的有序性和 长期节点
+zk 的watcher
机制实现
4、MVCC 多版本并发管制乐观锁实现
本文次要介绍 Janusgraph 的锁机制,其余的实现机制就不在此做详解了
上面咱们来剖析一下 JanusGraph
的锁机制
实现~
二:JanusGraph 锁机制
在 JanusGraph 中应用的锁机制是:本地锁
+ 分布式锁
来实现的;
2.1 一致性行为
在 JanusGraph
中次要有三种 一致性修饰词 (Consistency Modifier)
来示意 3 种不同的 一致性行为
,来管制图库应用过程中的并发问题的管制水平;
public enum ConsistencyModifier {
DEFAULT,
LOCK,
FORK
}
源码中 ConsistencyModifier
枚举类次要作用:用于管制 JanusGraph 在 最终统一或其余非事务性后端系统
上的一致性行为!其作用别离为:
- DEFAULT:默认的一致性行为,不应用分布式锁进行管制,对配置的存储后端应用由关闭事务保障的默认一致性模型,一致性行为次要取决于存储后端的配置以及关闭事务的(可选)配置;无需显示配置即可应用
- LOCK:在存储后端反对锁的前提下,显示的获取分布式锁以保障一致性!确切的一致性保障取决于所配置的锁实现;需
management.setConsistency(element, ConsistencyModifier.LOCK);
语句进行配置 - FORK:只实用于
multi-edges
和list-properties
两种状况下应用;使 JanusGraph 批改数据时,采纳先删除后增加新的边 / 属性的形式,而不是笼罩现有的边 / 属性,从而防止潜在的并发写入抵触;需management.setConsistency(element, ConsistencyModifier.FORK);
进行配置
LOCK
在查问或者插入数据时,是否应用 分布式锁
进行并发管制,在图 shcema
的创立过程中,如上述能够通过配置 schema 元素
为ConsistencyModifier.LOCK
形式管制并发,则在应用过程中就会用 分布式锁
进行并发管制;
为了提高效率,JanusGraph 默认不应用锁定。因而,用户必须为定义 一致性束缚
的每个架构元素决定是否应用锁定。
应用 JanusGraphManagement.setConsistency(element,ConsistencyModifier.LOCK)
显式启用对架构元素的锁定
代码如下所示:
mgmt = graph.openManagement()
name = mgmt.makePropertyKey('consistentName').dataType(String.class).make()
index = mgmt.buildIndex('byConsistentName', Vertex.class).addKey(name).unique().buildCompositeIndex()
mgmt.setConsistency(name, ConsistencyModifier.LOCK) // Ensures only one name per vertex
mgmt.setConsistency(index, ConsistencyModifier.LOCK) // Ensures name uniqueness in the graph
mgmt.commit()
FORK
因为边缘作为单个记录存储在根底存储后端中,因而同时批改单个边缘将导致抵触。
FORK
就是为了代替LOCK
,能够将边缘标签配置为应用ConsistencyModifier.FORK
。
上面的示例创立一个新的 edge label,并将其设置为ConsistencyModifier.FORK
mgmt = graph.openManagement()
related = mgmt.makeEdgeLabel('related').make()
mgmt.setConsistency(related, ConsistencyModifier.FORK)
mgmt.commit()
通过上述配置后,批改标签配置为 FORK
的 edge 时,操作步骤为:
- 首先,删除该边
- 将批改后的边作为新边增加
因而,如果两个并发事务批改了同一边缘,则提交时将存在边缘的两个批改后的正本,能够在查问遍历期间依据须要解决这些正本。
留神 edge fork 仅实用于 MULTI edge。具备多重性束缚的边缘标签不能应用此策略,因为非 MULTI 的边缘标签定义中内置了一个唯一性束缚,该束缚须要显式锁定或应用根底存储后端的抵触解决机制
上面咱们具体来看一下 janusgrph
的锁机制
的实现:
2.2 LoackID
在介绍锁机制之前,先看一下锁应该锁什么货色呢?
咱们都晓得在 janusgraph
的底层存储中,vertexId 作为 Rowkey,属性和边存储在 cell 中,由 column+value 组成
当咱们批改 节点的属性和边
+ 边的属性时
,很显著只有锁住对应的Rowkey + Column
即可;
在 Janusgraph
中,这个锁的标识的根底局部就是LockID
:
LockID = RowKey + Column
源码如下:
KeyColumn lockID = new KeyColumn(key, column);
2.3 本地锁
本地锁
是在任何状况下都须要获取的一个锁,只有获取胜利后,才会进行下述 分布式锁
的获取!
本地锁
是基于 图实例
维度存在的;次要作用是保障以后图实例下的操作中无抵触!
本地锁的实现是通过 ConcurrentHashMap
数据结构来实现的,在图实例维度下惟一;
基于以后 事务
+lockId
来作为 锁标识
;
获取的次要流程:
联合源码如下:
上述图倡议按照源码一块剖析,源码在 LocalLockMediator
类中的下述办法,上面 源码剖析模块
会详细分析
public boolean lock(KeyColumn kc, T requester, Instant expires) {}
引入本地锁机制,次要目标: 在图实例维度来做一层锁判断,缩小分布式锁的并发抵触
,缩小分布式锁带来的性能耗费
2.4 分布式锁
在 本地锁
获取胜利之后才会去尝试获取 分布式锁
;
分布式锁的获取整体分为两局部流程:
分布式锁信息插入
分布式锁信息状态判断
分布式锁信息插入
该局部次要是通过 lockID
来结构要插入的 Rowkey 和 column
并将数据插入到 hbase
中;插入胜利即示意这部分解决胜利!
具体流程如下:
分布式锁信息状态判断
该局部在上一部分实现之后才会进行,次要是判断分布式锁是否获取胜利!
查问出以后 hbase 中对应Rowkey 的所有 column
,过滤未过期的 column 汇合,比对汇合的第一个 column 是否等于以后事务插入的 column;
等于则获取胜利!不等于则获取失败!
具体流程如下:
三:源码剖析 与 整体流程
源码剖析曾经 push 到 github:https://github.com/YYDreamer/…
1、获取锁的入口
public void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction txh) throws BackendException {
// locker 是一个一致性 key 锁对象
if (locker != null) {
// 获取以后事务对象
ExpectedValueCheckingTransaction tx = (ExpectedValueCheckingTransaction) txh;
// 判断:以后的获取锁操作是否以后事务的操作中存在增删改的操作
if (tx.isMutationStarted())
throw new PermanentLockingException("Attempted to obtain a lock after mutations had been persisted");
// 应用 key+column 组装为 lockID,供下述加锁应用!!!!!KeyColumn lockID = new KeyColumn(key, column);
log.debug("Attempting to acquireLock on {} ev={}", lockID, expectedValue);
// 获取本地以后 jvm 过程中的写锁(看下述的 1:写锁获取剖析)//(此处的获取锁只是将对应的 KLV 存储到 Hbase 中!存储胜利并不代表获取锁胜利)// 1. 获取胜利(等同于存储胜利)则继续执行
// 2. 获取失败(等同于存储失败),会抛出异样,抛出到最上层,打印谬误日志“Could not commit transaction ["+transactionId+"] due to exception”并抛出对应的异样,本次插入数据完结
locker.writeLock(lockID, tx.getConsistentTx());
// 执行前提:上述获取锁胜利!// 存储期望值,此处为了实现当雷同的 key + value + tx 多个加锁时,只解决第一个
// 存储在事务对象中,标识在 commit 判断锁是否获取胜利时,以后事务插入的是哪个锁信息
tx.storeExpectedValue(this, lockID, expectedValue);
} else {
// locker 为空状况下,间接抛出一个运行时异样,终止程序
store.acquireLock(key, column, expectedValue, unwrapTx(txh));
}
}
2、执行 locker.writeLock(lockID, tx.getConsistentTx()) 触发锁获取
public void writeLock(KeyColumn lockID, StoreTransaction tx) throws TemporaryLockingException, PermanentLockingException {if (null != tx.getConfiguration().getGroupName()) {MetricManager.INSTANCE.getCounter(tx.getConfiguration().getGroupName(), M_LOCKS, M_WRITE, M_CALLS).inc();}
// 判断以后事务是否在图实例的维度 曾经占据了 lockID 的锁
// 此处的 lockState 在一个事务胜利获取本地锁 + 分布式锁后,以事务为 key、value 为 map,其中 key 为 lockID,value 为加锁状态(开始工夫、过期工夫等)if (lockState.has(tx, lockID)) {log.debug("Transaction {} already wrote lock on {}", tx, lockID);
return;
}
// 以后事务没有占据 lockID 对应的锁
// 进行(lockLocally(lockID, tx)本地加锁锁定操作,if (lockLocally(lockID, tx)) {
boolean ok = false;
try {
// 在本地锁获取胜利的前提下:// 尝试获取基于 Hbase 实现的分布式锁;// 留神!!!(此处的获取锁只是将对应的 KLV 存储到 Hbase 中!存储胜利并不代表获取锁胜利)S stat = writeSingleLock(lockID, tx);
// 获取锁分布式锁胜利后(即写入胜利后),更新本地锁的过期工夫为分布式锁的过期工夫
lockLocally(lockID, stat.getExpirationTimestamp(), tx); // update local lock expiration time
// 将上述获取的锁,存储在标识以后存在锁的汇合中 Map<tx,Map<lockID,S>>,key 为事务、value 中的 map 为以后事务获取的锁,key 为 lockID,value 为以后获取分布式锁的 ConsistentKeyStatus(一致性密匙状态)对象
lockState.take(tx, lockID, stat);
ok = true;
} catch (TemporaryBackendException tse) {
// 在获取分布式锁失败后,捕捉该异样,并抛出该异样
throw new TemporaryLockingException(tse);
} catch (AssertionError ae) {
// Concession to ease testing with mocks & behavior verification
ok = true;
throw ae;
} catch (Throwable t) {
// 呈现底层存储谬误!则间接加锁失败!throw new PermanentLockingException(t);
} finally {
// 判断是否胜利获取锁,没有获分布式锁的,则开释本地锁
if (!ok) {
// 没有胜利获取锁,则开释本地锁
// lockState.release(tx, lockID); // has no effect
unlockLocally(lockID, tx);
if (null != tx.getConfiguration().getGroupName()) {MetricManager.INSTANCE.getCounter(tx.getConfiguration().getGroupName(), M_LOCKS, M_WRITE, M_EXCEPTIONS).inc();}
}
}
} else {
// 如果获取本地锁失败,则间接抛出异样,不进行从新本地争用
// Fail immediately with no retries on local contention
throw new PermanentLockingException("Local lock contention");
}
}
蕴含两个局部:
- 本地锁的获取
lockLocally(lockID, tx)
- 分布式锁的获取
writeSingleLock(lockID, tx)
留神此处只是将锁信息写入到 Hbase 中,并不代表获取分布式锁胜利,只是做了上述介绍的第一个阶段分布式锁信息插入
3、本地锁获取 lockLocally(lockID, tx)
public boolean lock(KeyColumn kc, T requester, Instant expires) {
assert null != kc;
assert null != requester;
final StackTraceElement[] acquiredAt = log.isTraceEnabled() ?
new Throwable("Lock acquisition by" + requester).getStackTrace() : null;
// map 的 value,以事务为外围
final AuditRecord<T> audit = new AuditRecord<>(requester, expires, acquiredAt);
// ConcurrentHashMap 实现 locks, 以 lockID 为 key,事务为外围 value
final AuditRecord<T> inMap = locks.putIfAbsent(kc, audit);
boolean success = false;
// 代表以后 map 中不存在 lockID,标识着锁没有被占用,胜利获取锁
if (null == inMap) {
// Uncontended lock succeeded
if (log.isTraceEnabled()) {log.trace("New local lock created: {} namespace={} txn={}",
kc, name, requester);
}
success = true;
} else if (inMap.equals(audit)) {
// 代表以后存在 lockID,比对旧 value 和新 value 中的事务对象是否是同一个
// requester has already locked kc; update expiresAt
// 上述判断后,事务对象为同一个,标识以后事务曾经获取这个 lockID 的锁;// 1. 这一步进行 cas 替换,作用是为了刷新过期工夫
// 2. 并发解决,如果因为锁过期被其余事务占据,则占用锁失败
success = locks.replace(kc, inMap, audit);
if (log.isTraceEnabled()) {if (success) {log.trace("Updated local lock expiration: {} namespace={} txn={} oldexp={} newexp={}",
kc, name, requester, inMap.expires, audit.expires);
} else {log.trace("Failed to update local lock expiration: {} namespace={} txn={} oldexp={} newexp={}",
kc, name, requester, inMap.expires, audit.expires);
}
}
} else if (0 > inMap.expires.compareTo(times.getTime())) {
// 比拟过期工夫,如果锁曾经过期,则以后事务能够占用该锁
// the recorded lock has expired; replace it
// 1. 以后事务占用锁
// 2. 并发解决,如果因为锁过期被其余事务占据,则占用锁失败
success = locks.replace(kc, inMap, audit);
if (log.isTraceEnabled()) {log.trace("Discarding expired lock: {} namespace={} txn={} expired={}",
kc, name, inMap.holder, inMap.expires);
}
} else {
// 标识:锁被其余事务占用,并且未过期,则占用锁失败
// we lost to a valid lock
if (log.isTraceEnabled()) {log.trace("Local lock failed: {} namespace={} txn={} (already owned by {})",
kc, name, requester, inMap);
log.trace("Owner stacktrace:\n {}", Joiner.on("\n").join(inMap.acquiredAt));
}
}
return success;
}
如上述介绍,本地锁的实现是通过 ConcurrentHashMap
数据结构来实现的,在图实例维度下惟一!
4、分布式锁获取第一个阶段:分布式锁信息插入
protected ConsistentKeyLockStatus writeSingleLock(KeyColumn lockID, StoreTransaction txh) throws Throwable {
// 组装插入 hbase 数据的 Rowkey
final StaticBuffer lockKey = serializer.toLockKey(lockID.getKey(), lockID.getColumn());
StaticBuffer oldLockCol = null;
// 进行尝试插入,默认尝试次数 3 次
for (int i = 0; i < lockRetryCount; i++) {
// 尝试将数据插入到 hbase 中;oldLockCol 示意要删除的 column 代表上一次尝试插入的数据
WriteResult wr = tryWriteLockOnce(lockKey, oldLockCol, txh);
// 如果插入胜利
if (wr.isSuccessful() && wr.getDuration().compareTo(lockWait) <= 0) {final Instant writeInstant = wr.getWriteTimestamp(); // 写入工夫
final Instant expireInstant = writeInstant.plus(lockExpire);// 过期工夫
return new ConsistentKeyLockStatus(writeInstant, expireInstant); // 返回插入对象
}
// 赋值以后的尝试插入的数据,要在下一次尝试时删除
oldLockCol = wr.getLockCol();
// 判断插入失败起因,长期异样进行尝试,非长期异样进行尝试!handleMutationFailure(lockID, lockKey, wr, txh);
}
// 解决在尝试了 3 次之后还是没插入胜利的状况,删除最初一次尝试插入的数据
tryDeleteLockOnce(lockKey, oldLockCol, txh);
// TODO log exception or successful too-slow write here
// 抛出异样,标识导入数据失败
throw new TemporaryBackendException("Lock write retry count exceeded");
}
上述只是将锁信息插入,插入胜利标识该流程完结
5、分布式锁获取第一个阶段:分布式锁锁定是否胜利断定
这一步,是在 commit
阶段进行的验证
public void commit() throws BackendException {
// 此办法内调用 checkSingleLock 查看分布式锁的获取后果
flushInternal();
tx.commit();}
最终会调用 checkSingleLock
办法,判断获取锁的状态!
protected void checkSingleLock(final KeyColumn kc, final ConsistentKeyLockStatus ls,
final StoreTransaction tx) throws BackendException, InterruptedException {
// 查看是否被查看过
if (ls.isChecked())
return;
// Slice the store
KeySliceQuery ksq = new KeySliceQuery(serializer.toLockKey(kc.getKey(), kc.getColumn()), LOCK_COL_START,
LOCK_COL_END);
// 此处从 hbase 中查问出锁定的行的所有列!默认查问重试次数 3
List<Entry> claimEntries = getSliceWithRetries(ksq, tx);
// 从每个返回条目标列中提取 timestamp 和 rid,而后过滤出带有过期工夫戳的 timestamp 对象
final Iterable<TimestampRid> iterable = Iterables.transform(claimEntries,
e -> serializer.fromLockColumn(e.getColumnAs(StaticBuffer.STATIC_FACTORY), times));
final List<TimestampRid> unexpiredTRs = new ArrayList<>(Iterables.size(iterable));
for (TimestampRid tr : iterable) { // 过滤获取未过期的锁!final Instant cutoffTime = now.minus(lockExpire);
if (tr.getTimestamp().isBefore(cutoffTime)) {...}
// 将还未过期的锁记录存储到一个汇合中
unexpiredTRs.add(tr);
}
// 判断以后 tx 是否胜利持有锁!如果咱们插入的列是读取的第一个列,或者后面的列只蕴含咱们本人的 rid(因为咱们是在第一局部的前提下获取的锁,第一局部咱们胜利获取了基于以后过程的锁,所以如果 rid 雷同,代表着咱们也胜利获取到了以后的分布式锁),那么咱们持有锁。否则,另一个过程持有该锁,咱们无奈取得锁
// 如果,获取锁失败,抛出 TemporaryLockingException 异样!!!!抛出到顶层的 mutator.commitStorage()处,最终导入失败进行事务回滚等操作
checkSeniority(kc, ls, unexpiredTRs);
// 如果上述步骤未抛出异样,则标识以后的 tx 曾经胜利获取锁!ls.setChecked();}
四:整体流程
总流程如下图:
整体流程为:
- 获取本地锁
-
获取分布式锁
- 插入分布式锁信息
- commit 阶段判断分布式锁获取是否胜利
- 获取失败,则重试
五:总结
JanusGraph 的锁机制次要是通过 本地锁 + 分布式锁
来实现分布式系统下的数据一致性;
分布式锁的管制维度为:property、vertex、edge、index 都能够;
JanusGraph
反对在数据导入时通过后面 一致性行为
局部所说的 LOCK
来开关分布式锁:
- LOCK:数据导入时开启分布式锁保障分布式一致性
- DEFAULT、FORK:数据导入时敞开分布式锁
是否开启分布式锁思考:
在开启分布式锁的状况下,数据导入开销十分大;如果是数据不是要求很高的一致性,并且数据量比拟大,咱们能够抉择敞开分布式锁相干,来进步导入速度;
而后,针对于小数据量的要求高一致性的数据,独自开启分布式锁来保障数据安全;
另外,咱们在不开启分布式锁定的状况下,能够通过针对于导入的 数据的充沛探查
来缩小抵触!
针对于图 schema 的元素开启还是敞开分布式锁,还是依据理论业务状况来决定。
本文有任何问题,可加博主微信或评论指出,感激!
码文不易,给个赞和 star 吧~
本文由博客群发一文多发等经营工具平台 OpenWrite 公布