1 我的项目概览

Kindling collector我的项目作为Go端分析器,应用相似opentelmetry的pinpeline进行数据分析。其中波及5个组件:

  • UdsReceiver - Unix Domain Socket接收器,接管探针事件并传递给后续的网络分析器。
  • NetworkAnalyzer - 网络事件分析器,将接管的Read / Write事件匹配成单次申请后,依据协定解析申请内容生成要害指标,传递给后续的分析器。
  • K8sMetadataProcessor - K8S指标处理器,补充K8S指标并传递给后续的聚合处理器
  • AggregateProcessor - 数据聚合处理器,将接管的指标数据生成Trace和Metric,传递给给后续的转发器。
  • OtelExporter - Opentelmetry转发器,将Trace / Metric数据转发给Prometheus进行展现。

其中协定解析流程次要在NetworkAnalyzer组件中进行,将接管的申请/响应事件成对匹配后,交由parseProtocols()函数解析出协定指标。

1.1 协定解析流程

NetworkAnnalyzer.parseProtocols()办法定义了整体解析流程,依据协定解析器别离解析申请和响应,当最终都胜利时输入指标。

1.2 协定解析设计思路

失常的协定解析只负责逐帧解析指标性能。

现已反对5种协定解析,当协定越来越多时,遍历引起的解析会越来越耗时,那么需引入fastfail疾速辨认协定

对于简单的多报文协定,如Kafka有不同的API报文,而雷同API也有不同的版本报文。将所有报文解析逻辑都写在一起会使整个类过于臃肿且不易保护。为此引入树形多报文构造用于疾速且低耦合地实现开发。

1.2.1 报文解析构造体

在树形报文解析过程中,有如下2个场景须要思考

  • 当父协定解析了指标A,子协定解析可能会用到A指标,那么父子协定解析的指标须要透传。
  • 父协定已解析了局部报文长度的内容,那么子协定在开始解析时可间接跳过相应长度的内容进行解析,此处引入偏移量用于下一个协定跳过解析。

定义PayloadMessage,封装报文内容、读取偏移量和指标存储的Map。

type PayloadMessage struct {    Data         []byte    Offset       int    attributeMap *model.AttributeMap}

1.2.2 报文解析API

因为引入协定树,协定解析过程parse() (ok bool)将不再实用。协定树中的个协定的解析胜利不示意整个协定解析胜利,需解析整颗树的协定是否胜利,将API扩大为parse() (ok bool, complete bool)。

  • 对于单层协定(HTTP),返回为(ok, true)


基于以上几点需要,设计树形构造的报文解析器PkgParser。PkgParser定义了fastFail(疾速辨认失败) 和parser(解析单个报文)函数;每个协定只需注册本身的PkgParser即可接入整套流程。
fastFail(message *PayloadMessage) (fail bool)

  • 申明协定是否辨认失败,用于疾速辨认协定

parser(message *PayloadMessage) (ok bool, complete bool)

  • 解析协定,将解析出的指标存储到message的Attributes中
  • 返回是2个参数:

    • 是否解析胜利
    • 是否解析实现 (默认为true,当为false次要是用于嵌套解析过程,例如先定义一个头解析,再定义不同的音讯体解析)。

1.3 申请 / 响应解析

ProtocolParser定义了申请和响应的解析器,并提供ParseRequest()和ParseResponse() API用于解析申请和响应
其中response的message携带了request解析出的attributes指标,用于匹配。eg. kafka的correlationId request和response报文需统一,且response报文解析用到了request的key。

2 开发流程

2.1 增加协定名

const (    HTTP      = "http"  ...    XX        = "xx" // 此处替换具体协定名    ...)

2.2 创立协定

analyzer/network/protocol目录下创立文件夹xx,xx替换为具体协定,并创立3个文件xx_parser.go、xx_request.go 和 xx_response.go

analyzer/network/protocol/xx├── xx_parser.go           协定解析器├── xx_request.go          实现申请解析流程└── xx_response.go          实现响应解析流程

2.2.1 xx_request.go

实现fastfail()和parser()函数

func fastfailXXRequest() protocol.FastFailFn {    return func(message *protocol.PayloadMessage) bool {        // 依据报文实现具体的fastFail()函数        return false    }}func parseXXRequest() protocol.ParsePkgFn {    return func(message *protocol.PayloadMessage) (bool, bool) {        // 解析报文内容        contentKey := getContentKey(message.Data)        if contentKey == "" {            // 第一个参数false 示意解析失败,第二个参数示意报文解析实现            return false, true        }        // 通过AddStringAttribute() 或 AttIntAttribute() 存储解析出的属性        message.AddStringAttribute(constlabels.ContentKey, contentKey)        // 解析胜利        return true, true    }}

2.2.2 xx_response.go

实现fastfail()和parser()函数

func fastfailXXResponse() protocol.FastFailFn {    return func(message *protocol.PayloadMessage) bool {        // 依据报文实现具体的fastFail()函数        return false    }}func parseXXResponse() protocol.ParsePkgFn {    return func(message *protocol.PayloadMessage) (bool, bool) {        // 通过GetStringAttribute() 或 GetIntAttribute() 读取request解析后的参数        contentKey := message.GetStringAttribute(constlabels.ContentKey)                // 解析响应报文        errorCode := getErrorCode(message)        if errorCode > 20 {            // 有errorCode或errorMsg等常见,需定义IsError为true用于后续processor生成Metric            message.AddBoolAttribute(constlabels.IsError, true)            message.AddIntAttribute(constlabels.ErrorType, int64(constlabels.ProtocolError))        }        message.AddStringAttribute(constlabels.XXErrorCode, errorCode)                // 解析胜利        return true, true    }}

2.2.3 xx_parser.go

定义协定解析器

func NewXXParser() *protocol.ProtocolParser {    requestParser := protocol.CreatePkgParser(fastfailXXRequest(), parseXXRequest())    // 当存在嵌套的协定解析 eg. 解析头 + 解析各类不同报文    // 可通过 Add()增加子协定,生成一颗协定树解析,顶部是公共局部解析,分叉是各个不同报文解析    //             Header    //             / | \    //         API1 API2 API3    //         /|\    //       v1 v2 v3    responseParser := protocol.CreatePkgParser(fastfailXXResponse(), parseXXResponse())    return protocol.NewProtocolParser(protocol.XX, requestParser, responseParser, nil)}

2.2.4 factory.go

注册协定解析器

var (    ...    xx_parser   *protocol.ProtocolParser = xx.NewXXParser())func GetParser(key string) *protocol.ProtocolParser {    switch key {        ...        case protocol.XX:            return xx_parser        ...        default:            return nil    }}

2.2.5 kindling-collector-config.yml

配置新增协定

analyzers:  networkanalyzer:    protocol_parser: [ http, mysql, dns, redis, kafka, xx ]

3 开发案例 - Dubbo2协定

3.1 dubbo2协定剖析


依据官网提供的协定标准,解析网络抓包的数据。

  • 前2个byte为魔数,可用于fastfail()办法
  • 第3个byte蕴含Req/Resp、序列化形式等信息,可用于解析协定中判断是否非法报文。
  • 第4个byte用于返回报文的错误码
  • 第16个byte开始需通过指定的序列化形式解析报文内容,service name + method name可用于contentKey标识该申请的URL

3.2 申明协定名

const (    ...    DUBBO2    = "dubbo2"    ...)

3.3 实现dubbo2解析

创立协定相干文件

kindling/collector/analyzer/network/protocol/dubbo2├── dubbo2_parser.go            Dubbo2解析器├── dubbo2_request.go           实现申请解析流程├── dubbo2_response.go          实现响应解析流程└── dubbo2_serialize.go         Dubbo2反序列化器

3.3.1 dubbo2_request.go

申明request申请的fastFail函数

  • dubbo2有魔数0xdabb可疾速辨认
func fastfailDubbo2Request() protocol.FastFailFn {    return func(message *protocol.PayloadMessage) bool {        return len(message.Data) < 16 || message.Data[0] != MagicHigh || message.Data[1] != MagicLow    }}

申明request申请的解析函数

  • 将解析出 服务/办法作为 相似于URL的Key
  • 存储报文内容
func parseDubbo2Request() protocol.ParsePkgFn {    return func(message *protocol.PayloadMessage) (bool, bool) {        contentKey := getContentKey(message.Data)        if contentKey == "" {            return false, true        }        message.AddStringAttribute(constlabels.ContentKey, contentKey)        message.AddStringAttribute(constlabels.Dubbo2RequestPayload, getAsciiString(message.GetData(16, protocol.GetDubbo2PayLoadLength())))        return true, true    }}

解析Dubbo2申请

  • 过滤非法申请
  • 思考到dubbo2存在单向和心跳申请,这些申请不做解析
  • 依据报文构造解析相应指标
func getContentKey(requestData []byte) string {    serialID := requestData[2] & SerialMask    if serialID == Zero {        return ""    }    if (requestData[2] & FlagEvent) != Zero {        return "Heartbeat"    }    if (requestData[2] & FlagRequest) == Zero {        // Invalid Data        return ""    }    if (requestData[2] & FlagTwoWay) == Zero {        // Ignore Oneway Data        return "Oneway"    }    serializer := GetSerializer(serialID)    if serializer == serialUnsupport {        // Unsupport Serial. only support hessian and fastjson.        return "UnSupportSerialFormat"    }    var (        service string        method  string    )    // version    offset := serializer.eatString(requestData, 16)    // service name    offset, service = serializer.getStringValue(requestData, offset)    // service version    offset = serializer.eatString(requestData, offset)    // method name    _, method = serializer.getStringValue(requestData, offset)    return service + "#" + method}

3.3.2 dubbo2_serialize.go

因为dubbo2内置了多套序列化形式,先定义接口dubbo2Serializer

type dubbo2Serializer interface {    eatString(data []byte, offset int) int    getStringValue(data []byte, offset int) (int, string)}

dubbo2默认的序列化形式是hessian2,此处实现hessian2形式

type dubbo2Hessian struct{}func (dh *dubbo2Hessian) eatString(data []byte, offset int) int {    dataLength := len(data)    if offset >= dataLength {        return dataLength    }    tag := data[offset]    if tag >= 0x30 && tag <= 0x33 {        if offset+1 == dataLength {            return dataLength        }        // [x30-x34] <utf8-data>        return offset + 2 + int(tag-0x30)<<8 + int(data[offset+1])    } else {        return offset + 1 + int(tag)    }}func (dh *dubbo2Hessian) getStringValue(data []byte, offset int) (int, string) {    dataLength := len(data)    if offset >= dataLength {        return dataLength, ""    }    var stringValueLength int    tag := data[offset]    if tag >= 0x30 && tag <= 0x33 {        if offset+1 == dataLength {            return dataLength, ""        }        // [x30-x34] <utf8-data>        stringValueLength = int(tag-0x30)<<8 + int(data[offset+1])        offset += 2    } else {        stringValueLength = int(tag)        offset += 1    }    if offset+stringValueLength >= len(data) {        return dataLength, string(data[offset:])    }    return offset + stringValueLength, string(data[offset : offset+stringValueLength])}

对外裸露公共办法,用于获取序列化形式

var (    serialHessian2  = &dubbo2Hessian{}    serialUnsupport = &dubbo2Unsupport{})func GetSerializer(serialID byte) dubbo2Serializer {    switch serialID {    case SerialHessian2:        return serialHessian2    default:        return serialUnsupport    }}

3.3.3 dubbo2_response.go

申明response响应的fastFail函数

  • 与request相似,采纳魔数0xdabb可疾速辨认
func fastfailDubbo2Response() protocol.FastFailFn {    return func(message *protocol.PayloadMessage) bool {        return len(message.Data) < 16 || message.Data[0] != MagicHigh || message.Data[1] != MagicLow    }}

申明response响应的解析函数

  • 依据 status解析出对应的errorCode
  • 存储报文内容
func parseDubbo2Response() protocol.ParsePkgFn {    return func(message *protocol.PayloadMessage) (bool, bool) {        errorCode := getErrorCode(message.Data)        if errorCode == -1 {            return false, true        }        message.AddIntAttribute(constlabels.Dubbo2ErrorCode, errorCode)        if errorCode > 20 {            // 有errorCode或errorMsg等常见,需定义IsError为true用于后续processor生成Metric            message.AddBoolAttribute(constlabels.IsError, true)            message.AddIntAttribute(constlabels.ErrorType, int64(constlabels.ProtocolError))        }        message.AddStringAttribute(constlabels.Dubbo2ResponsePayload, getAsciiString(message.GetData(16, protocol.GetDubbo2PayLoadLength())))        return true, true    }}

解析Dubbo2响应

  • 过滤非法响应
  • 依据报文构造解析相应指标
func getErrorCode(responseData []byte) int64 {    SerialID := responseData[2] & SerialMask    if SerialID == Zero {        return -1    }    if (responseData[2] & FlagEvent) != Zero {        return 20    }    if (responseData[2] & FlagRequest) != Zero {        // Invalid Data        return -1    }    return int64(responseData[3])}

3.3.4 dubbo2_parser.go

申明dubbo2解析器

  • 通过CreatePkgParser()别离定义Reques / Response解析器
  • 通过NewProtocolParser()将Request / Response解析器生成Dubbo2解析器
func NewDubbo2Parser() *protocol.ProtocolParser {    requestParser := protocol.CreatePkgParser(fastfailDubbo2Request(), parseDubbo2Request())    responseParser := protocol.CreatePkgParser(fastfailDubbo2Response(), parseDubbo2Response())    return protocol.NewProtocolParser(protocol.DUBBO2, requestParser, responseParser, nil)}

3.4 注册dubbo2解析器

在factory.go中注册dubbo2协定的解析器

var (    ...    dubbo2_parser   *protocol.ProtocolParser = dubbo2.NewDubbo2Parser())func GetParser(key string) *protocol.ProtocolParser {    switch key {        ...        case protocol.DUBBO2:            return dubbo2_parser        ...        default:            return nil    }}

3.5 申明反对协定

在deploy/kindling-collector-config.yml中申明dubbo2协定

analyzers:  networkanalyzer:    protocol_parser: [ http, mysql, dns, redis, kafka, dubbo2 ]    protocol_config:      - key: "dubbo2"        payload_length: 200

Kindling是一款基于eBPF的云原生可观测性开源工具,旨在帮忙用户更好更快的定界云原生零碎问题,并致力于打造云原生全故障域的定界能力。欢送这方面的使用者和爱好者与咱们分割。

退出咱们

关注咱们