由前文知道每个 BSPServiceWorker 有一个 WorkerServer 对象,WorkerServer 对象里面又有 ServerData 对象,作为数据实 。ServerData 中包含该 Worker 的 partitionStore、edgeStore、incomingMessageStore、currentMessageStore、聚集值等。其中 incomingMessageStore 对象为 MessageStoreByPartition(接口) 类型,也就是说消息时按照分区来存储的。MessageStoreByPartition 接口的关系图如下:
在 SimpleMessageStore 抽象类中,有一个 ConcurrentMap<Integer,ConcurrentMap<I,T>> 类型的变量 map,用来存储消息。第一层是 pairtitionID 到发送到该 partition 消息的映射;第二层是 VertexID 到发送给该 Vertex 的消息队列。
《Giraph 通信模块分析》:http://my.oschina.net/skyaugu…
每个顶点的消息列表具体为 ExtendedDataOutput 类型,它继承 DataOutput 接口,增加了几个方法而已。每个消息是以字节形式写入到 ExtendedDataOutput 对象中的。
发送消息时,采用异步式通信。
图顶点的计算处理与消息通信并发执行,在计算过程中就可以发送消息,将大规模消息发送分散在不同的时间段,避免瞬时网络通信阻塞,但是接受端需要额外的空间,存储临时接收到的消息,相当于空间换时间。而集中式通信,图顶点的计算处理与消息通信串行进行,在计算完毕后,统一发送消息,控制和实现方式简单,可在发送端对消息进行最大程度优化,但容易造成瞬时间的网络通信阻塞以及增加发送端的消息存储开销。
不同 Worker 间的消息通信使用 RPC 方式,具体为 Netty。同一 Worker 内,连续两次迭代的消息直接通过内存操作,把要发送的消息直接复制到 Worker 的 incomingMessageStore 中。下面详述消息的存储格式和发送机制。
Giraph 使用 Cache 来缓存消息,当消息达到一定阈值后,一次性发送。
既按照 bulk 模式进行,不会一条一条信息发送。向某个顶点发送的消息是按照 <destVertexId,Message> pair 存储在 ByteArrayVertexIdData<I,T> 中(实际为 ByteArrayVertexIdMessages<I,M> 类型)。介绍如下:org.apache.giraph.utils.ByteArrayVertexIdData<I,T>
功能:把 < 顶点 ID,data> Pair 存储在一个 byte 数组中。里面有 ExtendedDataOutput 对象用来存储数据。
该类中还有一个内部类:VertexIdDataIterator,该内部类继承 VertexIdIterator 类。
org.apache.giraph.comm.SendCache 用来缓存发送的信息,然后以“Bulk”模式发送。在 Giraph 中,每个 Worker 上可以对应多个分区。消息缓存的阈值是以 Worker 为单位计算,而不是 Partition。
SendCache 中有 ByteArrayVertexIdData<I,T>[] dataCache 数组用来存储发送给每个 Partition 的消息;有 int[] dataSizes 数组用于记录向每个 Worker 发送的消息大小,若大于 MAX_MSG_REQUEST_SIZE(默认为 512KB)就把此 Worker 上的所有 Partition 缓存的消息发送到给该 Worker,同一 Worker 内消息也是如此缓存;有 int[] initBufferSizes 数组用于记录每个 Worker 上的每个 Partition 的初始化 ByteArrayVertexIdData 中 ExtendedDataOutput 对象的大小,同一 Worker 上的所有 Partition 初始值相同,该值为平均值。记 MAX_MSG_REQUEST_SIZE(message request size)值为 M,该 Worker 上有 P 个 partitions,ADDTITIONNAL_MSG_REQUEST_SIZE(比平均值大的因子)默认为 0.2f,记为 A。则每个 Partition 的初始大小为:M*(1+A) / P .
由前文知道,每个 Worker 都有一个 NettyWorkerClientRequestProcessor<I,V,E,M> 用来发送消息。该类中有 SendMessageCache 对象用来缓存向外发送的信息。NettyWorkerClientRequestProcessor 类中的 sendMessageRequest(I,M)
方法如下,用于向某个顶点 destVertexId 发送消息 message。
方法解释 :首先根据 destVertexId 得到对应的 partitionId 和 WorkerInfo,然后把消息 add 到 SendMessageCache 中,并返回向该顶点所属 Worker 发送的消息大小 workerMessageSize。若该值大于默认值 512KB,则把此 Worker 对应的所有 Partition 消息从 SendMessageCache 中删除,把删除的消息赋值给 workerMessages,其类型为 PairList<Integer,ByteArrayVertexIdMessages<I,M>>,key 为 partitionId,value 为发送给该 partition 的消息列表,最后调用 doRequest() 方法发送信息。doRequest()方法如下:
可以看到在发送消息时,先判断是否在同一 Worker 上。如果是的话,调用 SendWorkerMessagesRequest<T,M> 的 doRequest 发送消息;否则使用 WorkerClient(底层使用 Netty)进行消息发送。下面着重讨论同一 Worker 内的机制。
org.apache.giraph.comm.requests.SendWorkerMessagesRequest 类中的 doRequest 方法如下:
参数为该 Worker 的 ServerData,代码中的 partitionVertexData 实际为 PairList<Integer,ByteArrayVertexIdMessages<I,M>>workerMessages。遍历 <partitionID,对应的消息列表 > 来添加到 ServerData 中的 incomingMessageStore 中。
ByteArrayMessagesPerVertexStore 类中的 addPartitionMessages()方法如下:
当用户使用了 Combiner,incomingMessageStore 对应的类型则为 OneMessagePerVertexStore,该类为每个顶点只存储一个消息,而非消息队列。结构如下图:
当添加一条消息时,会把顶点已对应的消息和要添加的消息调用 combine()方法进行合并,然后存储在上述结构图中。addPartitionMessages()方法如下:
在 ComputeCallable 中的 call()方法调用 computePartition(Partition)计算完所有 Partition 上的顶点后,调用 WorkerClientRequestProcessor.flush()方法把所有剩余的消息发送出去。