sofa-jraft在同程游览中的实际
同程艺龙作为对 raft 钻研较早的公司,早在14年算法的 paper 刚公布的时候,便曾经对其进行了调研,同时也与 paxos 、zab 等算法进行了具体的比照,并在公司外部的计数器、任务调度元信息存储等场景进行试点。不过晚期对于 raft 的尝试较多的是在 cs++ 技术栈试水,在 java 技术栈里却很少有波及。近期刚好有基于 etcd 的老我的项目因为须要自定义的数据结构须要重构,原有的 etcd 无奈在底层数据结构层面满足需要,因而决定采纳 java 技术栈联合开源我的项目 sofa-jraft 进行底层数据存储的开发,以期解决多节点数据强统一的问题。本文假如读者对 raft 及强统一的概念曾经有了较深的了解,具体介绍了公司外部如何应用 jraft 的进行老零碎的革新以及应用过程中遇到的工程问题,心愿其余对 raft 有趣味的同学能够一起探讨。
一、背景
公司外部本来存在一个零碎 mdb (metadata database),go 语言编写,用于治理所有的实例元数据信息,元数据的内容就是一个 map。该组件提供对元数据增删改查的接口,并且应用 go 语言编写,在检索数据时引入了 k8s selector 的包,应用 k8s selector 的解析规定筛选特定标签的元数据信息。数据长久化则是实用了强统一组件 etcd 进行存储,key 为元数据的 id,保障惟一,value 为具体的元信息,蕴含各个标签以及对应的值。
该零碎大体架构如图-1所示:
图-1:原来的架构
该架构的弊病:
- 每隔一段时间须要去拉取 etcd 的全量数据,放心单次申请数据量太大的状况,对数据id进行了 hash 分片,顺次获取每个分片下个 key,再通过 key 去获取 value,导致 etcd 的查问频率十分高
- 非 id 查问条件的一致性查问,和下面的问题一样,也须要做拉取全量数据的操作
- 更新删除操作也是一样,须要拉取全量数据再操作
剖析以上问题能够发现,应用 etcd 作为强统一存储,但 etcd 是基于 KV 存储的组件,并且解析组件 mdb 和 etcd 是拆散的,在须要保证数据最新的状况下,必须得去 etcd 拿最新的数据到本地再进行操作。而 etcd 基于 KV,就得拿到 etcd 的全量数据都给拉到本地后再做操作。
如果有一个组件,提供强统一的存储能力,又能间接去解析 k8s selector 的规定,存储的数据结构和元数据信息更加亲和,那么两头的那一层 mdb 就能够间接去掉了,由这个组件来解析对应的 crud 规定,将解析过后的规定间接在本地查问,那么以上问题就可能间接解决了。
二、革新
基于以上问题,咱们筹备本人开发一个强统一存储的组件,可能本人解析 k8s selector 的规定,并且将数据保留在本人本地。因为集体对 java 语言比拟理解,并且之前应用 nacos 时,对 sofa-jraft 也有肯定理解,最终抉择了 sofa-jraft 来构建强统一存储组件,将它命名为 mdb-store。
次要革新点:
- 应用 sofa-jraft 编程模型构建业务状态机,业务状态机中依据 raft log 中的 data 类型,进行 crud 的操作。
- mdb-store 提供与原来 mdb 雷同的 api,让革新对用户通明,革新实现后只须要切换域名对应的实例即可。
- 迁徙 k8s selector 的解析逻辑,这里用 java 写了一套和 go 版本 k8s selector 一样解析逻辑的组件 k8s-selector-java,目前该组件已开源 k8s-selector-java
革新过后的架构如图-2所示:
图-2:重构后的架构
通过革新过后,将 mdb 移除,让用户间接和 mdb-store 进行通信,两头的通信少了一跳,放慢了拜访效率。将 mdb 和 etcd 也进行了移除,缩小了整个零碎的组件,升高运维老本。
三、sofa-jraft的具体应用
3.1 将写操作转换成 raft log
在 sofa-jraft 中,编程模型和通常的 Spring MVC 的编程模式不太一样。在 Spring MVC 中,一个申请达到了后端,通常会通过 Controller -> Service -> Processor 这么几层。Controller 负责本次 http 申请的资源映射, 再由 Controller 去调用特定的 Service 办法,在 Service 层中,对参数进行一些解决和转换,再交由 Processor 层去对申请做真正的解决。
大体逻辑如图-3所示
图-3:通常的编程模型
在 sofa-jraft 中,所有的写操作都要通过状态机去执行(读操作不须要通过状态机)。须要将写操作转换成 task,状态机去利用 task 而后再进行业务解决。task 中蕴含两个属性是须要关注的,一个是 done,一个是 data。done 就是本次 task 被状态机解决实现后的回调,比方在 done 的回调逻辑中,将 response flush 到客户端。data 就是 raft log 中的具体数据,比方要执行一条插入元数据的命令。data 就会蕴含本次操作的类型(插入),以及本次操作的具体数据。
javapublic class Task implements Serializable { private ByteBuffer data = LogEntry.EMPTY_DATA; private Closure done; /// 省略局部代码}
大体逻辑如图-4所示
图-4:sofa-jraft的编程模型
所有的操作都形象成一个实体 Operation,在 Service 层,就依据业务把参数转换成不同的 Operation,而后再将 Operation 序列化,转换成 Task 实体,再由 Node 将 task 提交。这里能够将 task 看成是一个 raft log,一旦 raft log 被半数的机器给提交。状态机就会利用 raft log,利用实现之后就会触发 done 中的回调。
- 形象过后的实体类
class Operation<T> { //操作类型,比方增删改查 int type; //操作的哪个表,某些类型不须要此字段 String table; //具体的操作数据,依据type的不同,数据类型也会不同 T params;}
- 构建task并通过node提交给状态机
final Task task = new Task();//定义回调的逻辑,当该 raft log 被状态机利用后,会进行回调task.setDone(new StoreClosure(operation, status -> { StoreStatus storageStatus = (StoreStatus) status; closure.setThrowable(storageStatus.getThrowable()); closure.setResponse(storageStatus.getResponse()); closure.run(storageStatus);}));//将operation进行序列化,在状态机中会将该值反序列化还原,再交给processor解决task.setData(ByteBuffer.wrap(serializer.serialize(operation)));node.apply(task);
3.2 状态机的实现
onApply
onApply 是状态机的外围性能,其目标就是接支出参中的 raft log 以及 done,而后将 raft log 中的数据反序列化,交由本人的业务处理器去进行解决。解决实现之后,触发 done 的回调,这里就和 Node.apply(task) 关联上了。
while (iter.hasNext()) { Status status = Status.OK(); try { if (iter.done() != null) { // 阐明以后状态机是 Leader,能够间接从 closure 中获取操作数据 closure = (MdbStoreClosure) iter.done(); operation = closure.getOperation(); } else { // 以后状态机不是 Leader,通过对 raft log 中的数据进行反序列化操作,还原操作数据 ByteBuffer data = iter.getData(); operation = serializer.deserialize(data.array(), Operation.class); } //业务处理器进行业务解决,业务处理器中会判断operation的类型,抉择不同的解决逻辑 OperationResponse result = mdbStoreProcessor.processOperation(operation); //将result序列化 GrpcResponse response = GrpcResponse.newBuilder().setSuccess(true) .setData(ByteString.copyFrom(serializer.serialize(result))).build(); Optional.ofNullable(closure) .ifPresent(closure1 -> closure1.setResponse(response)); } catch (Throwable e) { status.setError(RaftError.UNKNOWN, e.toString()); Optional.ofNullable(closure).ifPresent(closure1 -> closure1.setThrowable(e)); throw e; } finally { //对task中的done进行回调 Optional.ofNullable(closure).ifPresent(closure1 -> closure1.run(status)); } //将raft log的生产地位+1,示意以后这个raft log曾经被胜利利用了 iter.next(); }
onSnapshotSave
在初始化 sofa-jraft node 时,存在一个参数,NodeOptions#snapshotUri。该参数设置后就会开启snapshot 机制,个别是举荐设置。开启实现之后,每隔30min就会进行一次 snapshot(这里须要留神的是,30分钟内有raft log提交时,才会进行snapshot)。在进行 snapshot 的时候,须要把以后的数据进行长久化操作。在 snapshot 实现后,就会将 snapshot 中最初一条 raft log 之前的 raft-loig全副删除。其意义就是防止 raft log 始终减少,导致磁盘占用飙高。
snapshot 机制能够这么去了解,在 sofa-jraft 中,业务 processor 中的操作都是状态机驱动的,而状态机又是由 raft log 驱动。那么 processor 中数据的最终状态其实就是所有的 raft log 利用的总和。比方存在一个 raft log,其业务含意是 i++。10条 raft log 被状态机利用后,驱动 processor 进行10次i++操作,最终的值就是为10。利用就算解体重启后,重启时,他会去利用之前的10条i++的raft log,processor 中的值也还是10。应用 snapshot 机制,在进行 snapshot 时,把 processor 中的10进行长久化,长久化实现过后,将前10条 raft log 进行删除,后续再来2条 i++的 raft log,processor的值变为12,存在2条i++的 raft log。利用就算解体重启,那么它首先也会读取 snapshot 中的数据10,再去利用2条i++的 raft log,最终数据也是12,和解体之前保持一致。
Processor的最终态 = snapshot + raft log
MdbStoreStoreSnapshotFileImpl mdbStoreStoreSnapshotFile = (MdbStoreStoreSnapshotFileImpl) snapshotFile; String tempPath = snapshotPath + "_temp"; File tempFile = new File(tempPath); FileUtils.deleteDirectory(tempFile); FileUtils.forceMkdir(tempFile); //记录总共的table数量 mdbStoreStoreSnapshotFile .writeToFile(tempPath, "tailIndex", new TailIndex(persistData.size())); //将每一个table中的数据都进行长久化 for (int i = 0; i < persistData.size(); i++) { mdbStoreStoreSnapshotFile.writeToFile(tempPath, combineDataFile(i), new TablePersistence(persistData.get(i))); } File destinationPath = new File(snapshotPath); FileUtils.deleteDirectory(destinationPath); FileUtils.moveDirectory(tempFile, destinationPath);
onSnapshotLoad
onSnapshotLoad 的几个触发场景。
当一个节点重新启动时。
- 当 Follower 中的 commit-index 都小于 Leader 中 snapshot 的最初一条 raft log 时(Follower太落后了,Follower 须要的 raft log 曾经被 Leader 的 snapshot 机制删除了)
onSnapshotLoad 和下面的 onSnapshotSave 是成对的,这里只须要把之前保留的文件中的内存读取,而后再进行反序列化,增加到 processor 中的数据容器即可。
MdbStoreStoreSnapshotFileImpl mdbStoreStoreSnapshotFile = (MdbStoreStoreSnapshotFileImpl) snapshotFile; //读取总共的文件数 TailIndex tailIndex = mdbStoreStoreSnapshotFile .readFromFile(snapshotPath, TAIL_INDEX, TailIndex.class); int size = tailIndex.data(); for (int i = 0; i < size; i++) //挨个读取文件,将文件内容进行反序列化 TablePersistence tablePersistence = mdbStoreStoreSnapshotFile .readFromFile(snapshotPath, combineDataFile(i), TablePersistence.class); TableDataDTO data = tablePersistence.data(); Table table = new Table(data.getName(), new HashSet<>(data.getIndexNames()), data.getRetryCount()); for (Record dataData : data.getDatas()) { table.addRecord(dataData); } //将数据丢给 processor 中的数据容器 dataComponent.putData(table.getName(), table); }
状态机的其余状态变更的办法
一般来说,节点的状态是不会发生变化的,一旦发生变化,就须要去剖析利用的状态了,察看节点是否失常。StateMachine 提供了状态回调的接口,咱们在回调中对接外部的监控零碎,当状态机的节点状态发生变化时,会实时告诉到保护人员,保护人员再对利用进行剖析排查。
3.3 应用 read-index read 进行读操作
依照 raft 论文失常来说,读写操作都只能由 Leader 进行解决,这样可能保障读取的数据都是统一的。这样的话,读申请的吞吐就没方法减少。对于这个 case,sofa-jraft 提供了 read-index read,能够在 Follower 中进行读取操作,并且能保障在 Follower 中读的后果和在 Leader 中读的后果统一。对于read-index read 能够参考 pingcap 的这篇博客 https://pingcap.com/zh/blog/l...。
com.alipay.sofa.jraft.Node#readIndex(final byte[] requestContext, final ReadIndexClosure done) ,第一个参数是发动 read-index read时的上下文,能够在回调中应用。第二个参数就是具体的回调逻辑,须要在 run 办法中实现读取逻辑。
- read-index read 编程模型
CompletableFuture future = new CompletableFuture<>(); node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { @Override public void run(Status status, long index, byte[] reqCtx) { //状态ok,阐明能够通过 read-index 去进行读取 if (status.isOk()) { try { //间接应用 processor 查问数据,不通过状态机 OperationResponse<T> res = (OperationResponse<T>) mdbStoreProcessor .processOperation(operation); future.complete(res); } catch (Throwable t) { future.completeExceptionally( new IllegalStateException("Fail to read data from processor", t)); } } else { //状态不ok,可能是超时,也可能是状态机异样等其余起因 if (Operation.ALL_DATA == operation.getType()) { //这里判断是不是读取全量的数据,读取全量数据的话,须要疾速失败,不能转到 leader 走 raft log读取, //起因见 4.3 future.completeExceptionally(new IllegalStateException( "Fail to get all data by read-index read, status: " + status .getErrorMsg())); } else { //通过将本次申请转发到 Leader 中,走 raft log,在 Leader 的状态机中把本条 raft log 利用后,再 //返回数据给 Follower LOGGER.warn("ReadIndex read failed, status: {}, go to Leader read.", status.getErrorMsg()); readFromLeader(operation, future); } } } } Object o = future.get(5_000L, TimeUnit.MILLISECONDS); if (o instanceof GrpcResponse) { //返回类型的GrpcResponse,阐明本次申请是通过 raft log 转到 Leader 解决并返回的,须要将数据反序列化 return serializer .deserialize(((GrpcResponse) o).getData().toByteArray(), OperationResponse.class); } else { //间接在本地通过 read-index read 读本地内存 return (OperationResponse<T>) o; }
3.4 Follower 申请转发
在 sofa-jraft 中,所有的写申请都只能由 Leader 节点进行解决,当写申请落到了 Follower 中,有两种形式进行解决。
- 间接回绝该申请,并将 Leader 节点的地址返回给客户端,让客户端从新发动申请。
- 将以后申请 hold 在服务端,并将该申请转发到 Leader 节点,Leader 节点解决实现后,将 response 返回给 Follower,Follower 再将之前 hold 住的申请返回给客户端。
这里应用第一种时,须要客户端也进行响应的革新,为了对客户端通明,咱们抉择了第二种,通过转发的形式,将申请转给 Leader
在 sofa-jraft 中,各个节点须要通过 Rpc 来进行通信,比方发送心跳,投票等。sofa-jraft 默认提供了两种通信形式,一种是 sofa-bolt,还有一种是 grpc,思考到组件的流行性,抉择了grpc来作为通信形式。在构建 server 时,应用 GrpcRaftRpcFactory 在创立 RpcServer 。而后将 sofa-jraft 中自带的处理器(心跳处理器,投票处理器等)注册到 RpcServer中。这些处理器都是实现了 RpcProcessor 接口,该接口的 handleRequest 办法会解决收到的申请。
应用 GrpcRaftRpcFactory 须要留神的是,须要引入依赖。
<dependency> <groupId>com.alipay.sofa</groupId> <artifactId>rpc-grpc-impl</artifactId> <version>${jraft.grpc.version}</version></dependency>
并且须要通过 spi 指定应用 GrpcRaftRpcFactory。文件门路 /resources/META-INF.services/com.alipay.sofa.jraft.rpc.RaftRpcFactory,文件内容 com.alipay.sofa.jraft.rpc.impl.GrpcRaftRpcFactory
这里,能够定义一个本人的处理器,实现 RpcProcessor 接口,将该 Processor 也注册到 RpcServer 中,复用同一个 RpcServer。
- 创立 RpcServer 并注册处理器
//获取GrpcRaftRpcFactory GrpcRaftRpcFactory raftRpcFactory = (GrpcRaftRpcFactory) RpcFactoryHelper.rpcFactory(); //GrpcRequest 是本人的 Processor 通信应用,这里应用 proto 去生成 GrpcRequest 和 GrpcResponse raftRpcFactory.registerProtobufSerializer(GrpcRequest.class.getName(), GrpcRequest.getDefaultInstance()); raftRpcFactory.registerProtobufSerializer(GrpcResponse.class.getName(), GrpcResponse.getDefaultInstance()); MarshallerRegistry registry = raftRpcFactory.getMarshallerRegistry(); //注册GrpcRequest 对应的 response 的默认对象 registry.registerResponseInstance(GrpcRequest.class.getName(), GrpcResponse.getDefaultInstance()); //创立 GrpcServer final RpcServer rpcServer = raftRpcFactory.createRpcServer(peerId.getEndpoint()); //注册sofa-jraft中自带的处理器 RaftRpcServerFactory.addRaftRequestProcessors(rpcServer, RaftExecutor.getRaftCoreExecutor(), RaftExecutor.getRaftCliServiceExecutor()); //注册本人业务的处理器 rpcServer.registerProcessor(new GrpcRequestProcessor(server)); return rpcServer;
- proto file
syntax = "proto3";option java_multiple_files = true;package com.xxx.mdb.store.raft.entity;message GrpcRequest { //这里的data保留的就是 Operation 序列化过后的二进制流 bytes data =1;}message GrpcResponse { //这里的data保留的是业务 Processor 解决完 Operation过后,并且通过序列化后的二进制流 bytes data = 1; //异样信息 string errMsg = 2; //标记位,申请是否ok bool success = 3;}
- 本人的处理器,用于接管 Follower 过去的转发申请
//如果以后节点不是 Leader,不进行解决 if (!jRaftServer.getNode().isLeader()) { return; } //定义 done,状态机利用 raft log 后,会回调这个done FailoverClosure done = new FailoverClosure() { GrpcResponse data; Throwable ex; @Override public void setResponse(GrpcResponse data) { //Follwer在状态机中执行胜利后,会将 result 封装成 GrpcResponse,而后在这里设置 this.data = data; } @Override public void setThrowable(Throwable throwable) { //在异样时,会进行调用 this.ex = throwable; } @Override public void run(Status status) { if (Objects.nonNull(ex)) { LOGGER.error("execute has error", ex); //ex不为null,阐明产生了异样,将异样返回给 Follower rpcCtx.sendResponse( GrpcResponse.newBuilder().setErrMsg(ex.toString()).setSuccess(false) .build()); } else { //将申请返回 Follower rpcCtx.sendResponse(data); } } }; //将从 Follower 过去的申请提交给状态机,在外部会把 request 的 data 字段给反序列化为 Operation jRaftServer.applyOperation(jRaftServer.getNode(), request, done);
- Follower 中的转发逻辑
try { //将 operation 序列化成byte数组,而后构建GrpcRequest. GrpcRequest request = GrpcRequest.newBuilder() .setData(ByteString.copyFrom(serializer.serialize(operation))).build(); //从缓存获取以后 Leader 节点的地址,如果 Leader 为空,抛出异样。这里的 Leader 须要动静刷新,每隔5秒中就去刷新一次 //Leader,保障 Leader 是最新的。能够通过 RouteTable#refreshLeader 去定时刷新。 final Endpoint leaderIp = Optional.ofNullable(getLeader()) .orElseThrow(() -> new IllegalStateException("Not find leader")).getEndpoint(); //通过 grpc 将申请发送给本人的处理器 cliClientService.getRpcClient().invokeAsync(leaderIp, request, new InvokeCallback() { @Override public void complete(Object o, Throwable ex) { if (Objects.nonNull(ex)) { //存在异样,将异样进行回调 closure.setThrowable(ex); //进行 fail 的回调,回调中会将 exception 返回给客户端 closure.run(new Status(RaftError.UNKNOWN, ex.getMessage())); return; } //将 grpc response 设置给回调类 closure.setResponse((GrpcResponse) o); //进行 success 的回调,回调中会将数据返回给客户端 closure.run(Status.OK()); } @Override public Executor executor() { return RaftExecutor.getRaftCliServiceExecutor(); } }, timeoutMillis); } catch (Exception e) { closure.setThrowable(e); closure.run(new Status(RaftError.UNKNOWN, e.toString())); }
四、sofa-jraft 的一些实际
4.1 read-index read 返回数据量过大导致 oom
在咱们的业务场景中,有一个获取全量数据的接口,并且是通过 read-index read 去进行读数据的。在对这个接口进行压测时,会发现 CPU 飙高的状况,通过排查,是因为堆内存占满了,GC线程始终在 work 导致的。通过 dump 堆内存后发现,是因为外部应用 Disruptor 导致的问题,该问题目前已被咱们修复,并且也已反馈给社区,在1.3.8版本中进行了解决。具体问题见 issue#618
4.2 read-index read 响应工夫较长
在测试同学进行压测,发现读取接口的最大耗时偶然会跑到 500ms,均匀响应耗时大略在100ms左右。通过重复排查以及浏览代码,最终发现这个问题和 election timeout 无关。在 sofa-jraft 中,election timeout 就是选举超时的工夫,一旦超过了 election timeout,Follwer 还没有收到 Leader 的心跳,Follower 认为以后集群中没有 Leader,本人发动投票来尝试入选 Leader。失常状况下,Leader 给 Follower 发心跳的频率是 election timeout / 10,也就是说在 election timeout 期间内,Leader 会给 Follower 发10次心跳,Follower 10次都没有收到心跳的状况下,才会产生选举。而凑巧的是,我设置的 election timeout 刚好就是 5s,5s / 10 刚好就是 500ms。
于是进一步剖析 read-index read 的机制,当 Follower 应用 read-index read 时,首先要去 Leader 获取 Leader 以后的 commit index,而后须要期待 Follower 本人的状态机的 apply index 超过从 Leader 那边获取到的 commit index,而后才会进行 read-index read 的回调。而 Follower 的状态机的 apply 操作是通过 Leader 的心跳申请驱动的,Leader 中可能晓得 raft log 是否被半数提交了,一旦某一条 raft log 被半数提交,Leader 在下一次的心跳申请中就会发最新的 commit index 同步给 Follower,Follower 收到新的 commit index 后,才会驱动本人的状态机去 apply raft log。而心跳申请的频率又是 election timeout / 10,所有会存在 read-index read 偶然的响应工夫会是 election timeout / 10.
如何解决:基于以上剖析,将 election timeout 的工夫调整为了1s,心跳频率也就变成了 100ms,最大的响应耗时也就变低了,均匀响应耗时也升高到了4ms左右。
read-index read 大略逻辑如图-5所示
图-5:read-index read 解决逻辑
4.3 read-index read 大响应接口失败后转发申请到 leader 导致状态机阻塞
在一次排查问题的过程中,狐疑网络存在问题。于是分割运维同学,运维同学对执行 tcpdump 命令,对网络进行了抓包。整个集群分为3个机房,2+2+1的模式进行部署,1这个节点的网络偶然会存在稳定。在过后执行 tcpdump 过后4分钟,到1这个节点的读申请就开始产生 read-index timeout 了,而过后的逻辑是,只有 read-index read 回调状态不ok,就将该申请转发到 Leader,走 raft log 来进行解决。
这里存在一个接口,是去读所有的数据,数据量比拟大。当 read-index read 超时时,会将这个申请转发到了 Leader 节点,走 raft log 去读数据,走 raft log 就会在状态机中去进行解决,而这个申请的 response 比拟大,导致在获取完数据后,去序列化数据时比拟耗时,大略须要耗费1500ms,状态机中解决 raft log 的吞吐就升高了。并且 raft log 是会从 Leader 复制给 Follower 的,也就是说,Follower 的状态机也会去执行这个耗时 1500 ms的 raft log,只是 Follower 不对 response 做解决而已。
在下面形容了 read-index read 的逻辑,Follower 要执行 read-index read,须要状态机的 apply-index 追上 Leader 的 commit index,当产生上述网络稳定时,这个大接口走 raft log 的形式,升高了状态机解决 raft log 的吞吐,导致 Follwer 的 apply index 更难追上 Leader 的 commit index 了。因而陷入了恶性循环,这个大接口统一通过 raft log 转向 Leader 去读取数据,而这个 raft log 解决十分耗时。最终导致状态机的 apply index 远远小于 commit index,所有的客户端的读操作和写操作全副都超时。
如何解决:将这个大接口的读取操作改成疾速失败,一旦 read-index read 的回调不胜利,不把申请通过 raft log 转到 Leader 去,间接返回异样给客户端,让客户端重试。
4.4 snapshot 操作时,阻塞状态机利用 raft log,导致响应超时
零碎在压测时,跑着跑着客户端会偶然超时。通过重复排查,发现超时的工夫点和 snapshot 的工夫点重合,依据浏览代码发现,状态机的 apply 操作和 snapshot 操作默认是同步的,而 snapshot 比拟耗时,导致了状态机 apply raft log 工夫被缩短了,从而客户端申请超时。
如何解决:在 snapshot 时,将 snapshot 的操作变为异步操作,应用copy on write 把 snapshot 时的内存数据 copy 了一份,再异步进行长久化解决。这里须要留神的是,copy on write 会耗费2倍的内存,这里须要确保不要导致OOM了。不同的场景须要思考不同的异步 snapshot 的形式。
4.5 raft中存在 raft log 和 snapshot file,须要文件系统保障有状态
sofa-jraft 须要保留 raft log 以及 snapshot file。在容器部署时,须要确保利用应用的 raft 目录是长久化的。
4.6 开启 metrics 以及利用 kill -s SIGUSR2 帮忙问题剖析
在 sofa-jraft 中,存在 node 参数 enableMetrics
,是否开启 metrics 统计指标数据。咱们将它关上,并且将指标数据输入到一个独自的日志文件,归档的日志能够在剖析问题时提供线索。比方:有时候的读取申请响应工夫增大了,就能够通过观察指标数据 read-index 来帮忙剖析是否是线性读的机制导致申请响应飙升。
将指标输入到日志文件:
Node node = ...NodeOptions nodeOpts = ...//关上监控指标nodeOpts.setEnableMetrics(true);node.init(nodeOpts);Slf4jReporter reporter = Slf4jReporter .forRegistry(node.getNodeMetrics().getMetricRegistry()) //获取到日志的输入对象 .outputTo(LoggerFactory.getLogger("com.jraft.metrics")) .convertRatesTo(TimeUnit.SECONDS) .convertDurationsTo(TimeUnit.MILLISECONDS) .build();reporter.start(30, TimeUnit.SECONDS);
除此之外,还能够利用 kill -s SIGUSR2 pid 给 jraft 过程发送信号量,过程收到信号量后,会在过程的启动目录中生成指标数据数据文件。这里我集体比拟关注 node_describe.log 中 log manager 的 diskId 和 appliedId,前者是 raft log 写到磁盘中的地位,后者是状态机以后利用到 raft log 的地位,能够通过比照这两个数据,用来察看状态机的吞吐是否失常一旦两者相差很多,阐明状态机出问题了。
五、后续演进
- 引入 Learner 节点,减少整个集群的读吞吐量。
- 继续关注社区,和社区独特倒退。
六、结语
以上就是 sofa-jraft 在咱们公司内的应用分享,有问题的小伙伴能够找到我的 github 间接邮箱和我沟通。感激 sofa-stack 提供的一个如此优良的 java 框架。 respect!!!
「作者」
赵延: Github:horizonzy,同程艺龙高级开发,负责服务治理相干工作。关注RPC、服务治理和分布式等畛域。
董春明: 同程艺龙架构师,负责服务治理及云原生布局演进相干工作,分布式领域专家,Paper 爱好者。
「参考」
- alibaba nacos 中对于sofa-jraft的应用 nacos
- jraft-rheakv 中对于sofa-jraft的应用 jraft
本周举荐浏览
技术风口上的限流
蚂蚁团体万级规模 k8s 集群 etcd 高可用建设之路
2021 年云原生技术倒退现状及将来趋势