GoReplay 简介

随着应用程序的复杂度的增长,测试它所须要的工作量也呈指数级增长。 GoReplay 为咱们提供了复用现有流量进行测试的简略想法。GoReplay是一个用golang开发的简略的流量录制插件,反对多种形式的过滤,限流放大,重写等等个性。GoReplay 能够做到对代码齐全无侵入性,也不须要更改你的生产基础设施,并且与语言无关。它不是代理,而是间接监听网卡上的流量。

GoReplay 工作形式:listener server 捕捉流量,并将其发送至 replay server 或者保留至文件,或者保留到kafka。而后replay server 会将流量转移至配置的地址

应用过程

需要:接到算法侧的需要,须要录制实在的生产环境流量,并且随时回放到任意环境。

因为算法侧局部场景为非Java语言编写,现存的流量录制平台临时无奈反对,须要采纳新的录制组件来撑持压测需要,遂抉择goreplay 。

GoReplay反对将录制的数据存储到本地文件中,而后回放时从文件中读取。思考到每次录制回放时须要进行存储及下发文件的复杂度,咱们冀望应用更便捷的形式来治理数据。
GoReplay也是原生反对录制数据存储到kafka中的,然而在应用的时候,发现它有较大的限度;应用kafka存储数据时,必须是流量录制的同时进行流量回放,其架构图如下

流程1-4 无奈拆分,只能同时进行

这会显得流量录制回放性能很鸡肋,咱们须要录制好的数据任意时刻重放,并且也要反对将一份录制好的数据多次重放。既然它曾经将流量数据存储到了kafka,咱们就能够思考对GoReplay进行革新,以让他反对咱们的需要。

革新后的流量录制回放架构图:

图中,1-2 与 3-5 阶段是互相独立的

也就是说,流量录制过程与回放过程能够拆开。只须要在录制开始与完结的时候记录kafka的offset,就能够晓得这个录制工作蕴含了哪些数据,咱们能够轻松的将每一段录制数据,整顿成录制工作,而后在须要的时候进行流量回放。

革新与整合

kafka offset 反对革新

简要过程:
源码中的 InputKafkaConfig 的定义

type InputKafkaConfig struct {    producer sarama.AsyncProducer    consumer sarama.Consumer    Host     string `json:"input-kafka-host"`    Topic    string `json:"input-kafka-topic"`    UseJSON  bool   `json:"input-kafka-json-format"`}

批改后的 InputKafkaConfig 的定义

type InputKafkaConfig struct {    producer  sarama.AsyncProducer    consumer  sarama.Consumer    Host      string `json:"input-kafka-host"`    Topic     string `json:"input-kafka-topic"`    UseJSON   bool   `json:"input-kafka-json-format"`    StartOffset    int64  `json:"input-kafka-offset"`    EndOffset int64  `json:"input-kafka-end-offset"`}

源码中,从kafka读取数据的片段:
能够看到,它选取的offset 是 Newest

for index, partition := range partitions {        consumer, err := con.ConsumePartition(config.Topic, partition, sarama.OffsetNewest)        go func(consumer sarama.PartitionConsumer) {            defer consumer.Close()            for message := range consumer.Messages() {                i.messages <- message            }        }(consumer)    }

批改过后的从kafka读数据的片段:

for index, partition := range partitions {        consumer, err := con.ConsumePartition(config.Topic, partition, config.StartOffset)        offsetEnd := config.EndOffset - 1        go func(consumer sarama.PartitionConsumer) {            defer consumer.Close()            for message := range consumer.Messages() {                // 比拟音讯的offset, 当超过这一批数据的最大值的时候,敞开通道                if offsetFlag && message.Offset > offsetEnd {                    i.quit <- struct{}{}                    break                }                i.messages <- message            }        }(consumer)    }

此时,只有在启动回放工作时,指定kafka offset的范畴。就能够达到咱们想要的成果了。

整合到压测平台

通过页面简略的填写抉择操作,而后生成启动命令,来代替简短的命令编写

StringBuilder builder = new StringBuilder("nohup /opt/apps/gor/gor");// 拼接参数 组合命令builder.append(" --input-kafka-host ").append("'").append(kafkaServer).append("'");builder.append(" --input-kafka-topic ").append("'").append(kafkaTopic).append("'");builder.append(" --input-kafka-start-offset ").append(record.getStartOffset());builder.append(" --input-kafka-end-offset ").append(record.getEndOffset());builder.append(" --output-http ").append(replayDTO.getTargetAddress());builder.append(" --exit-after ").append(replayDTO.getMonitorTimes()).append("s");if (StringUtils.isNotBlank(replayDTO.getExtParam())) {  builder.append(" ").append(replayDTO.getExtParam());}builder.append(" > /opt/apps/gor/replay.log 2>&1 &");String completeParam = builder.toString();

压测平台通过 Java agent 裸露的接口来管制 GoReplay过程的启停

String sourceAddress = replayDTO.getSourceAddress();String[] split = sourceAddress.split(COMMA);for (String ip : split) {  String uri = String.format(HttpTrafficRecordServiceImpl.BASE_URL + "/gor/start", ip,                                                  HttpTrafficRecordServiceImpl.AGENT_PORT);  // 从新创建对象  GoreplayRequest request = new GoreplayRequest();  request.setConfig(replayDTO.getCompleteParam());  request.setType(0);  try {    restTemplate.postForObject(uri, request, String.class);  } catch (RestClientException e) {    LogUtil.error("start gor fail,please check it!", e);    MSException.throwException("start gor fail,please check it!", e);  }}