共计 3181 个字符,预计需要花费 8 分钟才能阅读完成。
概要 Atlas 通过 AtlasEntityStoreV2.createOrUpdate 函数进行数据写入,了解 AtlasEntityStoreV2.createOrUpdate 这个重要函数有助于理解 Atlas 工作流程、优化写入性能。本文主要梳理 createOrUpdate 中各子模块的功能和逻辑。
源码解析函数格式:EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate, boolean replaceClassifications) 数据写入过程中的上下文信息存于 EntityGraphDiscoveryContext context 对象中,数据格式如下:
public final class EntityGraphDiscoveryContext {
private final AtlasTypeRegistry typeRegistry;
private final EntityStream entityStream;
private final List<String> referencedGuids = new ArrayList<>();
private final Set<AtlasObjectId> referencedByUniqAttribs = new HashSet<>();
private final Map<String, AtlasVertex> resolvedGuids = new HashMap<>();
private final Map<AtlasObjectId, AtlasVertex> resolvedIdsByUniqAttribs = new HashMap<>();
private final Set<String> localGuids = new HashSet<>();
step1:通过 preCreateOrUpdate 函数对 entityStream 预处理:找出 entityStream 中所有 entity 的 AtlasObjectId,判断 entity 需要进行创建还是更新操作,创建新 vertex
a) AtlasEntityGraphDiscoveryV2.discover,遍历所有 entity 的 AtlasObjectId,放入到 context 中:
情况 1)AtlasObject.guid!=null 时将 guid 放入 context.referencedGuids 中;
情况 2)若 AtlasObject.id 为 null,则表示该 entity 已经写入图数据库,将 AtlasObject 放入 context.referencedByUniqAttribs
b) AtlasEntityGraphDiscoveryV2.resolveReferences,判断 entity 是否在图数据库中存在:
情况 1)若 context.referencedGuids 中的 guid 在图数据库中存在对应 vertex,将 guid 和 vertex 放入 context.resolvedGuids 中;
情况 2)若 context.referencedGuids 中的 guid 在图数据库中不存在对应 vertex,将 guid 放入 context.localGuidReference 中;
情况 3)根据 context.referencedByUniqAttribs 中的 AtlasObjectId 找到对应顶点,将顶点放入 resolvedIdsByUniqAttribs 中,并将 AtlasObjectId 放入
c) 对不存在对应 vertex 的 entity 创建 vertex,并放入 resolvedGuids d) 将需要创建的 entity 和需要更新的 entity 分别放入 EntityMutationContext.entitiesCreated 和 EntityMutationContext.entitiesUpdated 中
step2:对比 entityStream 中的 entity 和图数据库中的 vertex,判断是否有属性发生变化,忽略不需要更新的 entity,核心代码:
for(AtlasAttribute attribute:entityType.getAllAttributes().values()){
if(!entity.getAttributes().containsKey(attribute.getName())){// if value is not provided, current value will not be updated
continue;
}
Object newVal=entity.getAttribute(attribute.getName());
Object currVal=entityRetriever.getEntityAttribute(vertex,attribute);
if(!attribute.getAttributeType().areEqualValues(currVal,newVal,context.getGuidAssignments())){
hasUpdates=true;
if(LOG.isDebugEnabled()){
LOG.debug(“found attribute update: entity(guid={}, typeName={}), attrName={}, currValue={}, newValue={}”,guid,entity.getTypeName(),attribute.getName(),currVal,newVal);
}
break;
}
}
注意:当 attribute 为 referred AtlasObjectId 时,客户端定义的 guid 是 UnAssigned 的,必然和数据库中存储的 guid 值不同,这会造成元数据的重复更新,这个目前 Atlas 有待改进的地方。举个例子,Hive_Table 的 db 属性为 Hive_Db 的 AtlasObjectId, 客户端向 Atlas Server 重复创建 Hive_Table,db 属性的值为 AtlasObjectId{typeName=hive_db, guid=-1554620941},每次 guid 值为当前时间戳而不同,造成 table 元数据重复更新。
step3:操作权限验证:通过 AtlasAuthorizationUtils.verifyAccess 函数验证发起请求的用户是否有权限对各个 entity 进行写操作
step4:EntityGraphMapper.mapAttributesAndClassifications 为 vertex 更新 attributes,关于 entity 和 vertex 属性的映射,可以参考文章元数据治理框架 Atlas 研究——JanusGraph 图数据库对象关系映射
step5:通过 entityChangeNotifier.onEntitiesMutated 为 vertex 创建全文索引,并通知 audit 模块记录所有的变更操作注:在整个数据写入过程中,创建全文索引这一步骤会占用超过 60% 的时间,如果在实际使用中不需要用全文索引的功能,可以修改源码注释掉相应 doFullTextMapping 函数
step6:整个数据写入过程中,我们并未提到 Atlas 如何调用 JanusGraph 的 api 来向图数据库写入数据。其实,Atlas 只需要通过 JanusGraph api 中的 vertex、edge 对象维护数据的图结构即可。Atlas 对数据读写函数都添加了 @GraphTransaction 注解,这个注解确保在函数运行结束后调用 graph.commit() 函数将当前事务内的变更提交图数据库。具体的实现可以见 GraphTransactionInterceptor 类。