乐趣区

关于人工智能:Milvus-数据处理流程解剖

编者按:
本文具体解剖 Milvus 2.0 次要的数据处理流程以及拜访接入层(Access Layer)。

分享纲要:

  • 回顾 Milvus 2.0 的架构;
  • 介绍 Milvus 2.0 代码构造的组织形式;
  • 介绍次要的数据处理流程;
  • 具体介绍拜访接入层。

次要数据处理流程

Milvus 2.0 中次要的数据处理流程包含读写门路、建表等数据定义操作以及向量索引构建流程。

在《前所未有的 Milvus 源码架构解析》中,咱们曾经谈到,“Milvus 2.0 依赖 Pub/sub 零碎来做日志的存储和长久化”。Pub/sub 零碎是相似 Kafka 或者 Pulsar 的音讯队列,有这么一套零碎后,其余零碎的角色就变成了日志的消费者,这样保障 Milvus 自身是没有状态的,进而晋升故障复原速度。同时依赖 Kafka 或者 Pulsar 来做数据的可靠性。Pub/sub 零碎的引入能够保证系统的扩展性,Milvus 也能够与更多的零碎做集成。而和这些 Pub/sub 交互的重要接口封装就是 MsgStream。

文本后续行文中出些的诸如 Collection、Shard、Partition 和 Segment 等概念,本文不再赘述。如果读者敌人对这些概念不理解,请参考《前所未有的 Milvus 源码架构解析》这篇综述性文章。

MsgStream 接口

Milvus 2.0 中重要的接口之一就是 MsgStream。

MsgStream 的接口定义如上图左半局部所示。通过 Start 和 Close 能够开启和敞开 MsgStream 对象的后盾协程。一个 MsgStream 对象在被 Start 之后,后盾的 Go 协程会去解决将数据写入到音讯存储系统里或者从音讯存储系统订阅和读取数据等逻辑。

MsgStream 既能够作为生产者(producer)也能够作为消费者(consumer)。AsProducer 接口将该 MsgStream 对象定义为 producer。AsConsumer 接口将 MsgStream 定义为 consumer。留神到这两个接口都有名为 channels 的参数。后面咱们提到 collection 在创立时能够指定 shard 的数目。一个 shard 代表一个 virtual channel,每个 collection 能够有多个 virtual channel。对于 collection 的每一个 virutal channel 在音讯存储系统中都有一个 channel 与其对应,为了做辨别,咱们将音讯存储系统中的 channel 称之为 physical channel。AsProducer 和 AsConsumer 的 channels 参数代表的就是音讯存储系统中 physical channel 的名字列表。这些 channels 限定了 MsgStream 对象的写入或者生产的 physical channel 的范畴。

通过 Produce 办法将数据写入到音讯存储系统中的 physical channel 里。有两种写入模式:繁多写入模式和播送写入模式。繁多写入模式是通过写入数据中 entity 的主键 hash 值确定的 shard(virtual channel)进而决定数据写入的 physical channel。为播送写入模式是将数据写入到 channels 参数指定的所有的 physical channel 里。

Consume() 是一个阻塞式的接口。调用这个接口时如果 physical channel 里没有数据,协程会阻塞。

Chan() 返回的是 Go 语言定义的 channel,目标是提供一种非阻塞的生产数据的形式。比方应用 select 语句能够做到有数据可读时才会进入相应的数据读取和解决逻辑里,而当无数据可读时协程能够去解决其余逻辑而不必阻塞期待。

Seek() 服务于宕机复原。消费者生产到某个地位之后,会记录以后生产到的地位。这个地位须要写到 meta 里的,当新起一个节点来接管工作后,它是能够调这个 Seek 接口,传入宕机前生产的地位,接着上次的地位再接着生产。

写门路

接下来咱们来看一下写门路。这里写门路里流经的是写入到 collection 中的数据。写入的数据既能够是 insert 音讯也能够是 delete 音讯。这些音讯(entity) 会被写入到不同的 virtual channel(shard)里。对于这些 virtual channel,咱们也称之为 DmChannels(data manipulation channels).

须要指出的是,不同的 collection 可能会共享音讯存储系统中的 physical channel。一个 collection 在创立时能够指定很多个 shard(virtual channel),因此该 collection 的数据也就会流经音讯存储中的多个 physical channel。这样有个益处是能够在写的时候能够大量并通过依赖音讯存储系统的并发的个性进步写吞吐。咱们的初步设定是每一个 collection 能够在底层复用雷同的物理 channel,这样物理 channel 维持在一个固定大小,而后 collection 级别的 virtual channel 能够很多,而且不同 collection 之间也能够共用 physical channel。

这里须要指出的是 collection 在创立时不仅指定了 shard 的个数,也会确定 virutal channel 和音讯存储中 physical channel 之间的映射关系。

在写门路中,拜访接入层 proxy 作为生成者会通过 MsgStream 对象的 produce 接口将数据写入到音讯存储系统里,同时 data node 作为消费者生产数据之后,依照工夫窗口以及每个 segment 的阈值大小,定期将这些生产到的数据转换并存到对象存储中。同时存储的门路是一个 meta 信息,须要通过 RPC 去告诉 data coordinator,data coordinator 将这些 Binlog paths 记录到 etcd 里。

既然不同的 collection 可能共用音讯存储系统的 physical channel,那么 data node/query node 生产数据是须要辨别该 channel 中数据的归属问题。因而引入 flowgraph 这个对象,它能够负责对 physical channel 中的数据依据 collection 的 ID 做过滤。能够认为一个 flowgraph 负责相应 collection 中的一个 shard(virtual channel)中的数据流。

什么时候创立 MsgStream 呢?对于 proxy 来说,它是在解决 insert 申请时创立的。当 proxy 收到一个数据写入申请时,它首先询问 root coordinator 拿到 virtual channel 和 physical channel 的映射关系,而后结构一个 MsgStream 对象。

作为消费者,data node 创立 MsgStream 对象的机会则在 data node 启动之后。data coordinator 将 collection 的 virtual channel 在不同的 data node 做好调配后,会将调配信息写入 etcd。data node 启动之后能够读取这个 etcd 中的调配信息,就可知其负责的 virutal channel 及相应的 physical channel 而后创立 MsgStream 对象。以上图右半边所示,data node 1 负责 V1、C1、V2、C2,data node 2 负责 V3、C5、V4、C6。

读门路

Milvus 是一个典型的 MPP 架构的零碎。每个 query node 的搜寻是并行执行的,proxy 聚合最终的后果返回给客户端。在读门路中,查问申请通过 DqRequestChannel 进行播送,而查问后果通过 gRPC 汇总到 proxy。

proxy 作为生产者,将查问申请写入到 DqRequestChannel 中。query node 生产 DqRequestChannel 的形式比拟非凡:每个 query node 的都会订阅这个 channel,这样该 channel 中的每条音讯会播送给所有的 query node。

query node 收到申请之后,本地做查问,并以 segment 为粒度做一次聚合,将聚合后的后果通过 gRPC 发送给相应的 proxy。须要指出的是,在查问申请里有惟一的 ProxyID 标识查问的发起方。query node 据此将不同查问后果路由到相应的 proxy。

proxy 断定收集到所有 query node 的查问后果后,做一次全局的聚合失去最终的查问后果,并将查问后果返回给客户端。须要指出的是在查问申请和查问后果里有雷同且惟一 requestID 能够标记查问自身,proxy 据此辨别哪些查问后果归属于同一个查问申请。

Milvus 2.0 设计要求是流批对立摄取的,query node 等查问节点也须要从音讯存储中摄取实时流数据。因而 query node 同样须要引入 flowgraph 对象对数据做过滤,以对归属不同表的数据做隔离。

query node 是什么时候创立这个 MsgStream 对象的呢?

在 Milvus 的用户侧,提供了一个 load collection 的操作接口,其含意分两局部:第一是将批数据从对象存储中加载到 query node 中;第二是对接到 MsgStream 里可能接管流式数据。这样能够保证数据的完整性。表只有通过 load 之后能力执行读操作。

proxy 收到一个对于表的 load 申请后,会将该申请转发到 query coordinator。quary coordinator 来决策 shard(virtual channel)在不同的 query node 上的调配形式。这种调配信息以函数调用或者 RPC 的形式发送给 query node。query node 收到调配信息后,创立对应的 MsgStream 对象来生产数据。这些调配信息包含 vitural channel 名字及其和相应 physical channel 的映射关系。

在 query node 里,查问后果是来自两局部:第一局部是批量数据查问失去的后果。这些批量数据是从对象存储中加载所有的 sealed segment。第二局部来自于从音讯存储中生产的实时数据的查问后果。这些实时数据也会造成一些 segment,这些 segment 被称为 growing segment。query node 须要对这两部份查问后果做一个本地的聚合。本地聚合之后再将后果发送给相应的 proxy。

DDL 流程

DDL 示意的是 data definition language。针对元数据操作的申请也分为读和写两类,不过解决这些申请的流程是一样的,并不辨别读写。
读类型的元数据操作包含,查问表的 schema、查问索引信息等;写类型的元数据操作包含创立表、删表、建索引和删除索引等等。

客户端将 ddl 申请发送至 proxy,proxy 须要对这些申请做一个定序并打上工夫戳,而后将申请转发到 root coordinator 并期待其返回后果。这里的工夫戳指的是 root coordinator 调配的全局混合工夫戳。这意味着对于每个 ddl 的申请,proxy 都会从 root coordinator 申请一个工夫戳。proxy 对于每个 ddl 申请的解决是串行执行的,每次只解决一个 ddl 申请,以后 ddl 申请解决完并且收到反馈后果后才会执行下一个 ddl 申请。proxy 收到 root coordinator 的后果后,将其返回给客户端。

root coordinator 次要做的工作就是对申请做一些动静查看,查看通过后执行相应的逻辑。

须要重点留神的是,root coordinator 在设计上要确保 ddl 操作依照工夫戳升序程序执行。

举个例子,咱们能够看到上图里,root coordinator 的 task queue 包含 k 个操作,别离是 ddl1、ddl3…… ddlk,数字代表工夫戳。root coordinator 会对该 task queue 中的申请依照工夫戳递增的程序顺次执行,并且记录以后曾经执行结束的最大工夫戳。在分布式部署形式下,proxy 和 root coordinator 之间的通信是通过 gRPC,两个独立组件,申请达到的程序不肯定严格依照工夫戳先后。假如以后 task queue 中执行结束的最大工夫戳为 k,来自 proxy1 的 ddl(K-1) 达到时发现 ddlK 曾经被执行了,那这个时候 ddl(k-1) 就会被回绝进入 task queue,否则就会突破所有申请依照工夫戳递增程序执行的约定。而来自 proxy2 的 申请 ddl(k+5) 则被容许进入 task queue 中。

建索引流程

建索引的过程在 Milvus 零碎外部来看,是一个长期的异步的过程。
当客户端发动建索引的申请之后,proxy 收到该申请首先做一些动态查看,通过后将该申请转发到 root coordinator。root coordinator 将这些建索引的申请长久化到 KV 存储中,就立马返回给 proxy,proxy 返回给 SDK。既然是异步,那就须要有状态,以便须要查问索引建设的进度或者状态。

在用户的视角上,建索引针对的是向量 field,而向量 field 的数据在物理上是由一个个 segment 组成的。建索引是在 segment 粒度上进行的,因而 root coordinator 须要向 index coordinator 发动针对每个 sealed segment 的建索引申请。

下面这张图是每个 segment 其上的 Index 状态变动的一个过程。index coordinator 收到 root coordinator 发来的建索引的申请后,首先会将该工作长久化到 meta store 中。索引工作的初始状态是 Unissued。index coordinator 保护一个记录每个 index node 负载的优先级队列,抉择一个负载比拟低的 index node,将这个工作发送到 index node 去做。index node 建完索引后会把胜利 / 失败状态写入到 meta store 中。index coordinator 通过感知 meta store 中索引状态的变动。如果因为系统资源或者 index node 失活等可复原的失败起因,index coordinator 会从新触发这个流程,抉择另外一个 index node 从新做索引构建的工作。

index coordinator 还须要负责回收那些被标记删除的索引工作及其相应的索引文件。这里咱们能够看到 一个名为 recycleIndexFiles 接口,它的次要作用是将被标记删除的索引工作相应的索引文件从对象存储中删除。

当客户端发送索引的 drop 申请之后,root coordinator 会标记这个索引被 drop,而后立马返回给客户端。索引的 drop 也是一个异步的过程。root coordinator 告诉 index coordinator 蕴含属性 IndexID 的索引须要被标记删除。每个 segment 的索引都记录属性 IndexID,它惟一标识表中向量 field 上的索引。index coordinator 依据这个 IndexID 为过滤条件,将所有索引工作中匹配到属性 IndexID 的索引工作标记为删除。index coordinator 有一个后盾的协程,逐步将所有标记为删除的工作对应的索引文件从对象存储中删除,当该索引工作对应的 索引文件被全副删除后,再将改索引工作的 meta 信息从 meta store 中移除。

Access Layer 代码

proxy 把所有的申请分为三类:

  • DdRequest(data definition request)
  • DmRequest(data manipulation request)
  • DqRequest(data quary request)

proxy 针对每个具体的申请封装一个 task 类,实现通用的 preExecute、Execute、postExecute 三个规范流程,在规范流程里,实现动态查看、预处理等。同时,proxy 会对每一个申请调配工夫戳和全局 ID 标记申请。
上方图中左边展现了 proxy 和其余零碎所有次要组件的交互,以及交互中的数据。

proxy 的调度逻辑如下:proxy 把申请分为三类,每一类都有一个对应的 task queue;来自 SDK 的申请都会被封装成一个 task,并放入对应的 task queue 里;针对不同的 task queue 后盾有不同的调度逻辑。

对于 data definition request 类型申请的队列,其中的申请是串行执行的,流水线次要分为五个步骤。首先是进队(enqueue)操作,在这里须要设置一个工夫戳,给这个操作定序,同时设置 ID 惟一标识该申请,接着把它放入到一个待办的 unissuted tasks 列表里。而该 task queue 的 schedule 就产生在步骤 2 和 3 之间。

schedule 的过程就是将一个工作从 unissuted tasks 取出搁置到 active task 列表中。当工作搁置到 active task 之后,它外面的每个工作都会程序执行 preExecute、Execute、postExecute 三个操作,最初 从 active task 列表中删除。任何一个申请工作须要残缺地解决完,其中任何一个环节产生谬误,都会提前退出流水线并返回错误信息。

DmTaskQueue 的特点就是它能够并发执行。第一个 enqeue 的步骤和 DdTaskQueue 中 task 的 enque 逻辑雷同,也会经验设置工夫戳、设置 ID 等步骤,区别点在于步骤二和步骤三,针对该 DmTaskQueue 的调度是一次取出多个工作,每个协程解决一个工作的后续流水线步骤。

proxy 须要缓存一些重要的对象和数据,Cache 性能的实现位于 GlobalMetaCache 这个类。它次要缓存两大部分数据,第一局部是 name 到 ID 映射,客户端看到的是 name 而零碎中下游看到的都是相应对象的 ID,第二局部是每个 collection 的 schema 等重要元信息。proxy 须要大量做一些后期的动态查看,因而为了防止常常向 root coordinator 询问元数据,须要增加缓存。当然 Cache 也应该有清理机制,当 root coordinator 执行了一个表的元信息的更改操作,会告诉所有 proxy 其上对于该表的元信息缓存生效。

ChannelMgr 这个类次要保护了 virtual channel 到 physical channel 的映射,以及治理相应的 MsgStream 对象。上图右侧次要列出了 ChannelMgr 的次要接口。

曹镇山
Zilliz 高级研发工程师
Milvus 我的项目 maintainer
毕业于华中科技大学,次要趣味方向包含分布式系统、数据库和大数据处理。在 Milvus 社区次要负责多个外围模块的编写和保护,是 Milvus 我的项目的 maintainer。除了敲代码,他也喜爱钻研心理学(尤其是立功心理学和踊跃心理学)和爬山。他的 Github 账号:czs007。

退出移动版