关于sofa:SOFAJRaft-在同程旅游中的实践

40次阅读

共计 15709 个字符,预计需要花费 40 分钟才能阅读完成。

sofa-jraft 在同程游览中的实际

同程艺龙作为对 raft 钻研较早的公司,早在 14 年算法的 paper 刚公布的时候,便曾经对其进行了调研,同时也与 paxos、zab 等算法进行了具体的比照,并在公司外部的计数器、任务调度元信息存储等场景进行试点。不过晚期对于 raft 的尝试较多的是在 cs++ 技术栈试水,在 java 技术栈里却很少有波及。近期刚好有基于 etcd 的老我的项目因为须要自定义的数据结构须要重构,原有的 etcd 无奈在底层数据结构层面满足需要,因而决定采纳 java 技术栈联合开源我的项目 sofa-jraft 进行底层数据存储的开发,以期解决多节点数据强统一的问题。本文假如读者对 raft 及强统一的概念曾经有了较深的了解,具体介绍了公司外部如何应用 jraft 的进行老零碎的革新以及应用过程中遇到的工程问题,心愿其余对 raft 有趣味的同学能够一起探讨。

一、背景

公司外部本来存在一个零碎 mdb (metadata database),go 语言编写,用于治理所有的实例元数据信息,元数据的内容就是一个 map。该组件提供对元数据增删改查的接口,并且应用 go 语言编写,在检索数据时引入了 k8s selector 的包,应用 k8s selector 的解析规定筛选特定标签的元数据信息。数据长久化则是实用了强统一组件 etcd 进行存储,key 为元数据的 id,保障惟一,value 为具体的元信息,蕴含各个标签以及对应的值。

该零碎大体架构如图 - 1 所示:

图 -1:原来的架构

该架构的弊病:

  1. 每隔一段时间须要去拉取 etcd 的全量数据,放心单次申请数据量太大的状况,对数据 id 进行了 hash 分片,顺次获取每个分片下个 key,再通过 key 去获取 value,导致 etcd 的查问频率十分高
  2. 非 id 查问条件的一致性查问,和下面的问题一样,也须要做拉取全量数据的操作
  3. 更新删除操作也是一样,须要拉取全量数据再操作

剖析以上问题能够发现,应用 etcd 作为强统一存储,但 etcd 是基于 KV 存储的组件,并且解析组件 mdb 和 etcd 是拆散的,在须要保证数据最新的状况下,必须得去 etcd 拿最新的数据到本地再进行操作。而 etcd 基于 KV,就得拿到 etcd 的全量数据都给拉到本地后再做操作。

如果有一个组件,提供强统一的存储能力,又能间接去解析 k8s selector 的规定,存储的数据结构和元数据信息更加亲和,那么两头的那一层 mdb 就能够间接去掉了,由这个组件来解析对应的 crud 规定,将解析过后的规定间接在本地查问,那么以上问题就可能间接解决了。

二、革新

基于以上问题,咱们筹备本人开发一个强统一存储的组件,可能本人解析 k8s selector 的规定,并且将数据保留在本人本地。因为集体对 java 语言比拟理解,并且之前应用 nacos 时,对 sofa-jraft 也有肯定理解,最终抉择了 sofa-jraft 来构建强统一存储组件,将它命名为 mdb-store。

次要革新点:
  1. 应用 sofa-jraft 编程模型构建业务状态机,业务状态机中依据 raft log 中的 data 类型,进行 crud 的操作。
  2. mdb-store 提供与原来 mdb 雷同的 api,让革新对用户通明,革新实现后只须要切换域名对应的实例即可。
  3. 迁徙 k8s selector 的解析逻辑,这里用 java 写了一套和 go 版本 k8s selector 一样解析逻辑的组件 k8s-selector-java,目前该组件已开源 k8s-selector-java

革新过后的架构如图 - 2 所示:

图 -2:重构后的架构

通过革新过后,将 mdb 移除,让用户间接和 mdb-store 进行通信,两头的通信少了一跳,放慢了拜访效率。将 mdb 和 etcd 也进行了移除,缩小了整个零碎的组件,升高运维老本。

三、sofa-jraft 的具体应用

3.1 将写操作转换成 raft log

在 sofa-jraft 中,编程模型和通常的 Spring MVC 的编程模式不太一样。在 Spring MVC 中,一个申请达到了后端,通常会通过 Controller -> Service -> Processor 这么几层。Controller 负责本次 http 申请的资源映射,再由 Controller 去调用特定的 Service 办法,在 Service 层中,对参数进行一些解决和转换,再交由 Processor 层去对申请做真正的解决。

大体逻辑如图 - 3 所示

图 -3:通常的编程模型

在 sofa-jraft 中,所有的写操作都要通过状态机去执行(读操作不须要通过状态机)。须要将写操作转换成 task,状态机去利用 task 而后再进行业务解决。task 中蕴含两个属性是须要关注的,一个是 done,一个是 data。done 就是本次 task 被状态机解决实现后的回调,比方在 done 的回调逻辑中,将 response flush 到客户端。data 就是 raft log 中的具体数据,比方要执行一条插入元数据的命令。data 就会蕴含本次操作的类型(插入),以及本次操作的具体数据。

java
public class Task implements Serializable {
    private ByteBuffer        data             = LogEntry.EMPTY_DATA;
    private Closure           done;
    /// 省略局部代码
}

大体逻辑如图 - 4 所示

图 -4:sofa-jraft 的编程模型

所有的操作都形象成一个实体 Operation,在 Service 层,就依据业务把参数转换成不同的 Operation,而后再将 Operation 序列化,转换成 Task 实体,再由 Node 将 task 提交。这里能够将 task 看成是一个 raft log,一旦 raft log 被半数的机器给提交。状态机就会利用 raft log,利用实现之后就会触发 done 中的回调。

  • 形象过后的实体类
class Operation<T> {
      // 操作类型,比方增删改查
    int type;
        // 操作的哪个表,某些类型不须要此字段
    String table;
    // 具体的操作数据,依据 type 的不同,数据类型也会不同
    T params;
}
  • 构建 task 并通过 node 提交给状态机
final Task task = new Task();
// 定义回调的逻辑,当该 raft log 被状态机利用后,会进行回调
task.setDone(new StoreClosure(operation, status -> {StoreStatus storageStatus = (StoreStatus) status;
    closure.setThrowable(storageStatus.getThrowable());
    closure.setResponse(storageStatus.getResponse());
    closure.run(storageStatus);
}));
// 将 operation 进行序列化,在状态机中会将该值反序列化还原,再交给 processor 解决
task.setData(ByteBuffer.wrap(serializer.serialize(operation)));
node.apply(task);
3.2 状态机的实现
  • onApply

    onApply 是状态机的外围性能,其目标就是接支出参中的 raft log 以及 done,而后将 raft log 中的数据反序列化,交由本人的业务处理器去进行解决。解决实现之后,触发 done 的回调,这里就和 Node.apply(task) 关联上了。

                while (iter.hasNext()) {Status status = Status.OK();
                    try {if (iter.done() != null) {
                                // 阐明以后状态机是 Leader,能够间接从 closure 中获取操作数据
                            closure = (MdbStoreClosure) iter.done();
                            operation = closure.getOperation();} else {
                            // 以后状态机不是 Leader,通过对 raft log 中的数据进行反序列化操作,还原操作数据
                            ByteBuffer data = iter.getData();
                            operation = serializer.deserialize(data.array(), Operation.class);
                        }
                          // 业务处理器进行业务解决,业务处理器中会判断 operation 的类型,抉择不同的解决逻辑
                        OperationResponse result = mdbStoreProcessor.processOperation(operation);
                        // 将 result 序列化
                        GrpcResponse response = GrpcResponse.newBuilder().setSuccess(true)
                                .setData(ByteString.copyFrom(serializer.serialize(result))).build();
              
                        Optional.ofNullable(closure)
                                .ifPresent(closure1 -> closure1.setResponse(response));
                    } catch (Throwable e) {status.setError(RaftError.UNKNOWN, e.toString());
                        Optional.ofNullable(closure).ifPresent(closure1 -> closure1.setThrowable(e));
                        throw e;
                    } finally {
                        // 对 task 中的 done 进行回调
                        Optional.ofNullable(closure).ifPresent(closure1 -> closure1.run(status));
                    }
                      // 将 raft log 的生产地位 +1,示意以后这个 raft log 曾经被胜利利用了
                    iter.next();}
  • onSnapshotSave

    在初始化 sofa-jraft node 时,存在一个参数,NodeOptions#snapshotUri。该参数设置后就会开启 snapshot 机制,个别是举荐设置。开启实现之后,每隔 30min 就会进行一次 snapshot(这里须要留神的是,30 分钟内有 raft log 提交时,才会进行 snapshot)。在进行 snapshot 的时候,须要把以后的数据进行长久化操作。在 snapshot 实现后,就会将 snapshot 中最初一条 raft log 之前的 raft-loig 全副删除。其意义就是防止 raft log 始终减少,导致磁盘占用飙高。

snapshot 机制能够这么去了解,在 sofa-jraft 中,业务 processor 中的操作都是状态机驱动的,而状态机又是由 raft log 驱动。那么 processor 中数据的最终状态其实就是所有的 raft log 利用的总和。比方存在一个 raft log,其业务含意是 i++。10 条 raft log 被状态机利用后,驱动 processor 进行 10 次 i ++ 操作,最终的值就是为 10。利用就算解体重启后,重启时,他会去利用之前的 10 条 i ++ 的 raft log,processor 中的值也还是 10。应用 snapshot 机制,在进行 snapshot 时,把 processor 中的 10 进行长久化,长久化实现过后,将前 10 条 raft log 进行删除,后续再来 2 条 i++ 的 raft log,processor 的值变为 12,存在 2 条 i ++ 的 raft log。利用就算解体重启,那么它首先也会读取 snapshot 中的数据 10,再去利用 2 条 i ++ 的 raft log,最终数据也是 12,和解体之前保持一致。

Processor 的最终态 = snapshot + raft log

                MdbStoreStoreSnapshotFileImpl mdbStoreStoreSnapshotFile = (MdbStoreStoreSnapshotFileImpl) snapshotFile;
        String tempPath = snapshotPath + "_temp";
        File tempFile = new File(tempPath);
        FileUtils.deleteDirectory(tempFile);
        FileUtils.forceMkdir(tempFile);
                // 记录总共的 table 数量
        mdbStoreStoreSnapshotFile
                .writeToFile(tempPath, "tailIndex", new TailIndex(persistData.size()));
                // 将每一个 table 中的数据都进行长久化
        for (int i = 0; i < persistData.size(); i++) {mdbStoreStoreSnapshotFile.writeToFile(tempPath, combineDataFile(i),
                    new TablePersistence(persistData.get(i)));
        }
        File destinationPath = new File(snapshotPath);
        FileUtils.deleteDirectory(destinationPath);
        FileUtils.moveDirectory(tempFile, destinationPath);
  • onSnapshotLoad

    onSnapshotLoad 的几个触发场景。

    1. 当一个节点重新启动时。

      1. 当 Follower 中的 commit-index 都小于 Leader 中 snapshot 的最初一条 raft log 时(Follower 太落后了,Follower 须要的 raft log 曾经被 Leader 的 snapshot 机制删除了)

    onSnapshotLoad 和下面的 onSnapshotSave 是成对的,这里只须要把之前保留的文件中的内存读取,而后再进行反序列化,增加到 processor 中的数据容器即可。

                    MdbStoreStoreSnapshotFileImpl mdbStoreStoreSnapshotFile = (MdbStoreStoreSnapshotFileImpl) snapshotFile;
                    // 读取总共的文件数
            TailIndex tailIndex = mdbStoreStoreSnapshotFile
                    .readFromFile(snapshotPath, TAIL_INDEX, TailIndex.class);
    
            int size = tailIndex.data();
    
            for (int i = 0; i < size; i++) 
                  // 挨个读取文件,将文件内容进行反序列化
                TablePersistence tablePersistence = mdbStoreStoreSnapshotFile
                        .readFromFile(snapshotPath, combineDataFile(i), TablePersistence.class);
                TableDataDTO data = tablePersistence.data();
                            
                Table table = new Table(data.getName(), new HashSet<>(data.getIndexNames()),
                        data.getRetryCount());
                for (Record dataData : data.getDatas()) {table.addRecord(dataData);
                }
                  // 将数据丢给 processor 中的数据容器
                dataComponent.putData(table.getName(), table);
            }
  • 状态机的其余状态变更的办法

    一般来说,节点的状态是不会发生变化的,一旦发生变化,就须要去剖析利用的状态了,察看节点是否失常。StateMachine 提供了状态回调的接口,咱们在回调中对接外部的监控零碎,当状态机的节点状态发生变化时,会实时告诉到保护人员,保护人员再对利用进行剖析排查。

3.3 应用 read-index read 进行读操作

依照 raft 论文失常来说,读写操作都只能由 Leader 进行解决,这样可能保障读取的数据都是统一的。这样的话,读申请的吞吐就没方法减少。对于这个 case,sofa-jraft 提供了 read-index read,能够在 Follower 中进行读取操作,并且能保障在 Follower 中读的后果和在 Leader 中读的后果统一。对于 read-index read 能够参考 pingcap 的这篇博客 https://pingcap.com/zh/blog/l…。

com.alipay.sofa.jraft.Node#readIndex(final byte[] requestContext, final ReadIndexClosure done),第一个参数是发动 read-index read 时的上下文,能够在回调中应用。第二个参数就是具体的回调逻辑,须要在 run 办法中实现读取逻辑。

  • read-index read 编程模型
        CompletableFuture future = new CompletableFuture<>();    
                node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
                @Override
                public void run(Status status, long index, byte[] reqCtx) {
                      // 状态 ok,阐明能够通过 read-index 去进行读取
                    if (status.isOk()) {
                        try {
                            // 间接应用 processor 查问数据,不通过状态机
                            OperationResponse<T> res = (OperationResponse<T>) mdbStoreProcessor
                                    .processOperation(operation);
                            future.complete(res);
                        } catch (Throwable t) {
                            future.completeExceptionally(
                                    new IllegalStateException("Fail to read data from processor",
                                            t));
                        }
                    } else {
                          // 状态不 ok,可能是超时,也可能是状态机异样等其余起因
                        if (Operation.ALL_DATA == operation.getType()) {
                              // 这里判断是不是读取全量的数据,读取全量数据的话,须要疾速失败,不能转到 leader 走 raft log 读取,// 起因见 4.3
                            future.completeExceptionally(new IllegalStateException(
                                    "Fail to get all data by read-index read, status:" + status
                                            .getErrorMsg()));
                        } else {
                              // 通过将本次申请转发到 Leader 中,走 raft log,在 Leader 的状态机中把本条 raft log 利用后,再                                                         // 返回数据给 Follower
                            LOGGER.warn("ReadIndex read failed, status: {}, go to Leader read.",
                                    status.getErrorMsg());
                            readFromLeader(operation, future);
                        }
                    }
                }
            }
                       
        Object o = future.get(5_000L, TimeUnit.MILLISECONDS);
        if (o instanceof GrpcResponse) {
            // 返回类型的 GrpcResponse,阐明本次申请是通过 raft log 转到 Leader 解决并返回的,须要将数据反序列化
            return serializer
                    .deserialize(((GrpcResponse) o).getData().toByteArray(), OperationResponse.class);
        } else {
              // 间接在本地通过 read-index read 读本地内存
            return (OperationResponse<T>) o;
        }
3.4 Follower 申请转发

在 sofa-jraft 中,所有的写申请都只能由 Leader 节点进行解决,当写申请落到了 Follower 中,有两种形式进行解决。

  1. 间接回绝该申请,并将 Leader 节点的地址返回给客户端,让客户端从新发动申请。
  2. 将以后申请 hold 在服务端,并将该申请转发到 Leader 节点,Leader 节点解决实现后,将 response 返回给 Follower,Follower 再将之前 hold 住的申请返回给客户端。

这里应用第一种时,须要客户端也进行响应的革新,为了对客户端通明,咱们抉择了第二种,通过转发的形式,将申请转给 Leader

在 sofa-jraft 中,各个节点须要通过 Rpc 来进行通信,比方发送心跳,投票等。sofa-jraft 默认提供了两种通信形式,一种是 sofa-bolt,还有一种是 grpc,思考到组件的流行性,抉择了 grpc 来作为通信形式。在构建 server 时,应用 GrpcRaftRpcFactory 在创立 RpcServer。而后将 sofa-jraft 中自带的处理器(心跳处理器,投票处理器等)注册到 RpcServer 中。这些处理器都是实现了 RpcProcessor 接口,该接口的 handleRequest 办法会解决收到的申请。

应用 GrpcRaftRpcFactory 须要留神的是,须要引入依赖。

<dependency>
    <groupId>com.alipay.sofa</groupId>
    <artifactId>rpc-grpc-impl</artifactId>
    <version>${jraft.grpc.version}</version>
</dependency>

并且须要通过 spi 指定应用 GrpcRaftRpcFactory。文件门路 /resources/META-INF.services/com.alipay.sofa.jraft.rpc.RaftRpcFactory,文件内容 com.alipay.sofa.jraft.rpc.impl.GrpcRaftRpcFactory

这里,能够定义一个本人的处理器,实现 RpcProcessor 接口,将该 Processor 也注册到 RpcServer 中,复用同一个 RpcServer。

  • 创立 RpcServer 并注册处理器
                // 获取 GrpcRaftRpcFactory
                GrpcRaftRpcFactory raftRpcFactory = (GrpcRaftRpcFactory) RpcFactoryHelper.rpcFactory();
                //GrpcRequest 是本人的 Processor 通信应用,这里应用 proto 去生成 GrpcRequest 和 GrpcResponse
        raftRpcFactory.registerProtobufSerializer(GrpcRequest.class.getName(),
                GrpcRequest.getDefaultInstance());
        raftRpcFactory.registerProtobufSerializer(GrpcResponse.class.getName(),
                GrpcResponse.getDefaultInstance());
                
        MarshallerRegistry registry = raftRpcFactory.getMarshallerRegistry();
                 // 注册 GrpcRequest 对应的 response 的默认对象
        registry.registerResponseInstance(GrpcRequest.class.getName(),
                GrpcResponse.getDefaultInstance());
        // 创立 GrpcServer
        final RpcServer rpcServer = raftRpcFactory.createRpcServer(peerId.getEndpoint());
                 // 注册 sofa-jraft 中自带的处理器
        RaftRpcServerFactory.addRaftRequestProcessors(rpcServer, RaftExecutor.getRaftCoreExecutor(),
                RaftExecutor.getRaftCliServiceExecutor());
                // 注册本人业务的处理器
        rpcServer.registerProcessor(new GrpcRequestProcessor(server));

        return rpcServer;
  • proto file
syntax = "proto3";
option java_multiple_files = true;
package com.xxx.mdb.store.raft.entity;

message GrpcRequest {
  // 这里的 data 保留的就是 Operation 序列化过后的二进制流
  bytes data =1;
}

message GrpcResponse {
  // 这里的 data 保留的是业务 Processor 解决完 Operation 过后,并且通过序列化后的二进制流
  bytes data = 1;
  // 异样信息
  string errMsg = 2;
  // 标记位,申请是否 ok
  bool success = 3;
}
  • 本人的处理器,用于接管 Follower 过去的转发申请
                // 如果以后节点不是 Leader,不进行解决
        if (!jRaftServer.getNode().isLeader()) {return;}
        // 定义 done,状态机利用 raft log 后,会回调这个 done
        FailoverClosure done = new FailoverClosure() {

            GrpcResponse data;

            Throwable ex;

            @Override
            public void setResponse(GrpcResponse data) {
                  //Follwer 在状态机中执行胜利后,会将 result 封装成 GrpcResponse,而后在这里设置
                this.data = data;
            }

            @Override
            public void setThrowable(Throwable throwable) {
                // 在异样时,会进行调用
                this.ex = throwable;
            }

            @Override
            public void run(Status status) {if (Objects.nonNull(ex)) {LOGGER.error("execute has error", ex);
                      //ex 不为 null,阐明产生了异样,将异样返回给 Follower
                    rpcCtx.sendResponse(GrpcResponse.newBuilder().setErrMsg(ex.toString()).setSuccess(false)
                                    .build());
                } else {
                      // 将申请返回 Follower
                    rpcCtx.sendResponse(data);
                }
            }
        };
        // 将从 Follower 过去的申请提交给状态机,在外部会把 request 的 data 字段给反序列化为 Operation
        jRaftServer.applyOperation(jRaftServer.getNode(), request, done);
  • Follower 中的转发逻辑
                    try {
              // 将 operation 序列化成 byte 数组,而后构建 GrpcRequest.
            GrpcRequest request = GrpcRequest.newBuilder()
                    .setData(ByteString.copyFrom(serializer.serialize(operation))).build();
            // 从缓存获取以后 Leader 节点的地址,如果 Leader 为空,抛出异样。这里的 Leader 须要动静刷新,每隔 5 秒中就去刷新一次                                 //Leader,保障 Leader 是最新的。能够通过 RouteTable#refreshLeader 去定时刷新。final Endpoint leaderIp = Optional.ofNullable(getLeader())
                    .orElseThrow(() -> new IllegalStateException("Not find leader")).getEndpoint();
              // 通过 grpc 将申请发送给本人的处理器
            cliClientService.getRpcClient().invokeAsync(leaderIp, request, new InvokeCallback() {
                @Override
                public void complete(Object o, Throwable ex) {if (Objects.nonNull(ex)) {
                          // 存在异样,将异样进行回调
                        closure.setThrowable(ex);
                          // 进行 fail 的回调,回调中会将 exception 返回给客户端
                        closure.run(new Status(RaftError.UNKNOWN, ex.getMessage()));
                        return;
                    }
                      // 将 grpc response 设置给回调类
                    closure.setResponse((GrpcResponse) o);
                    // 进行 success 的回调,回调中会将数据返回给客户端
                    closure.run(Status.OK());
                }

                @Override
                public Executor executor() {return RaftExecutor.getRaftCliServiceExecutor();
                }
            }, timeoutMillis);
        } catch (Exception e) {closure.setThrowable(e);
            closure.run(new Status(RaftError.UNKNOWN, e.toString()));
        }

四、sofa-jraft 的一些实际

4.1 read-index read 返回数据量过大导致 oom

在咱们的业务场景中,有一个获取全量数据的接口,并且是通过 read-index read 去进行读数据的。在对这个接口进行压测时,会发现 CPU 飙高的状况,通过排查,是因为堆内存占满了,GC 线程始终在 work 导致的。通过 dump 堆内存后发现,是因为外部应用 Disruptor 导致的问题,该问题目前已被咱们修复,并且也已反馈给社区,在 1.3.8 版本中进行了解决。具体问题见 issue#618

4.2 read-index read 响应工夫较长

在测试同学进行压测,发现读取接口的最大耗时偶然会跑到 500ms,均匀响应耗时大略在 100ms 左右。通过重复排查以及浏览代码,最终发现这个问题和 election timeout 无关。在 sofa-jraft 中,election timeout 就是选举超时的工夫,一旦超过了 election timeout,Follwer 还没有收到 Leader 的心跳,Follower 认为以后集群中没有 Leader,本人发动投票来尝试入选 Leader。失常状况下,Leader 给 Follower 发心跳的频率是 election timeout / 10,也就是说在 election timeout 期间内,Leader 会给 Follower 发 10 次心跳,Follower 10 次都没有收到心跳的状况下,才会产生选举。而凑巧的是,我设置的 election timeout 刚好就是 5s,5s / 10 刚好就是 500ms。

于是进一步剖析 read-index read 的机制,当 Follower 应用 read-index read 时,首先要去 Leader 获取 Leader 以后的 commit index,而后须要期待 Follower 本人的状态机的 apply index 超过从 Leader 那边获取到的 commit index,而后才会进行 read-index read 的回调。而 Follower 的状态机的 apply 操作是通过 Leader 的心跳申请驱动的,Leader 中可能晓得 raft log 是否被半数提交了,一旦某一条 raft log 被半数提交,Leader 在下一次的心跳申请中就会发最新的 commit index 同步给 Follower,Follower 收到新的 commit index 后,才会驱动本人的状态机去 apply raft log。而心跳申请的频率又是 election timeout / 10,所有会存在 read-index read 偶然的响应工夫会是 election timeout / 10.

如何解决: 基于以上剖析,将 election timeout 的工夫调整为了 1s,心跳频率也就变成了 100ms,最大的响应耗时也就变低了,均匀响应耗时也升高到了 4ms 左右。

read-index read 大略逻辑如图 - 5 所示


图 -5:read-index read 解决逻辑

4.3 read-index read 大响应接口失败后转发申请到 leader 导致状态机阻塞

在一次排查问题的过程中,狐疑网络存在问题。于是分割运维同学,运维同学对执行 tcpdump 命令,对网络进行了抓包。整个集群分为 3 个机房,2+2+ 1 的模式进行部署,1 这个节点的网络偶然会存在稳定。在过后执行 tcpdump 过后 4 分钟,到 1 这个节点的读申请就开始产生 read-index timeout 了,而过后的逻辑是,只有 read-index read 回调状态不 ok,就将该申请转发到 Leader,走 raft log 来进行解决。

这里存在一个接口,是去读所有的数据,数据量比拟大。当 read-index read 超时时,会将这个申请转发到了 Leader 节点,走 raft log 去读数据,走 raft log 就会在状态机中去进行解决,而这个申请的 response 比拟大,导致在获取完数据后,去序列化数据时比拟耗时,大略须要耗费 1500ms,状态机中解决 raft log 的吞吐就升高了。并且 raft log 是会从 Leader 复制给 Follower 的,也就是说,Follower 的状态机也会去执行这个耗时 1500 ms 的 raft log,只是 Follower 不对 response 做解决而已。

在下面形容了 read-index read 的逻辑,Follower 要执行 read-index read,须要状态机的 apply-index 追上 Leader 的 commit index,当产生上述网络稳定时,这个大接口走 raft log 的形式,升高了状态机解决 raft log 的吞吐,导致 Follwer 的 apply index 更难追上 Leader 的 commit index 了。因而陷入了恶性循环,这个大接口统一通过 raft log 转向 Leader 去读取数据,而这个 raft log 解决十分耗时。最终导致状态机的 apply index 远远小于 commit index,所有的客户端的读操作和写操作全副都超时。

如何解决: 将这个大接口的读取操作改成疾速失败,一旦 read-index read 的回调不胜利,不把申请通过 raft log 转到 Leader 去,间接返回异样给客户端,让客户端重试。

4.4 snapshot 操作时,阻塞状态机利用 raft log,导致响应超时

零碎在压测时,跑着跑着客户端会偶然超时。通过重复排查,发现超时的工夫点和 snapshot 的工夫点重合,依据浏览代码发现,状态机的 apply 操作和 snapshot 操作默认是同步的,而 snapshot 比拟耗时,导致了状态机 apply raft log 工夫被缩短了,从而客户端申请超时。

如何解决: 在 snapshot 时,将 snapshot 的操作变为异步操作,应用 copy on write 把 snapshot 时的内存数据 copy 了一份,再异步进行长久化解决。这里须要留神的是,copy on write 会耗费 2 倍的内存,这里须要确保不要导致 OOM 了。不同的场景须要思考不同的异步 snapshot 的形式。

4.5 raft 中存在 raft log 和 snapshot file,须要文件系统保障有状态

sofa-jraft 须要保留 raft log 以及 snapshot file。在容器部署时,须要确保利用应用的 raft 目录是长久化的。

4.6 开启 metrics 以及利用 kill -s SIGUSR2 帮忙问题剖析

在 sofa-jraft 中,存在 node 参数 enableMetrics,是否开启 metrics 统计指标数据。咱们将它关上,并且将指标数据输入到一个独自的日志文件,归档的日志能够在剖析问题时提供线索。比方:有时候的读取申请响应工夫增大了,就能够通过观察指标数据 read-index 来帮忙剖析是否是线性读的机制导致申请响应飙升。

将指标输入到日志文件:

Node node = ...
NodeOptions nodeOpts =  ...
// 关上监控指标
nodeOpts.setEnableMetrics(true);
node.init(nodeOpts);

Slf4jReporter reporter = Slf4jReporter
         .forRegistry(node.getNodeMetrics().getMetricRegistry())
         // 获取到日志的输入对象
         .outputTo(LoggerFactory.getLogger("com.jraft.metrics"))
         .convertRatesTo(TimeUnit.SECONDS)
         .convertDurationsTo(TimeUnit.MILLISECONDS)
         .build();
reporter.start(30, TimeUnit.SECONDS);

除此之外,还能够利用 kill -s SIGUSR2 pid 给 jraft 过程发送信号量,过程收到信号量后,会在过程的启动目录中生成指标数据数据文件。这里我集体比拟关注 node_describe.log 中 log manager 的 diskId 和 appliedId,前者是 raft log 写到磁盘中的地位,后者是状态机以后利用到 raft log 的地位,能够通过比照这两个数据,用来察看状态机的吞吐是否失常一旦两者相差很多,阐明状态机出问题了。

五、后续演进

  • 引入 Learner 节点,减少整个集群的读吞吐量。
  • 继续关注社区,和社区独特倒退。

六、结语

以上就是 sofa-jraft 在咱们公司内的应用分享,有问题的小伙伴能够找到我的 github 间接邮箱和我沟通。感激 sofa-stack 提供的一个如此优良的 java 框架。respect!!!

「作者」

赵延: Github:horizonzy,同程艺龙高级开发,负责服务治理相干工作。关注 RPC、服务治理和分布式等畛域。

董春明: 同程艺龙架构师,负责服务治理及云原生布局演进相干工作,分布式领域专家,Paper 爱好者。

「参考」

  • alibaba nacos 中对于 sofa-jraft 的应用 nacos
  • jraft-rheakv 中对于 sofa-jraft 的应用 jraft

本周举荐浏览

技术风口上的限流

蚂蚁团体万级规模 k8s 集群 etcd 高可用建设之路

2021 年云原生技术倒退现状及将来趋势

正文完
 0