共计 9109 个字符,预计需要花费 23 分钟才能阅读完成。
1 我的项目概览
Kindling collector 我的项目作为 Go 端分析器,应用相似 opentelmetry 的 pinpeline 进行数据分析。其中波及 5 个组件:
- UdsReceiver – Unix Domain Socket 接收器,接管探针事件并传递给后续的网络分析器。
- NetworkAnalyzer – 网络事件分析器,将接管的 Read / Write 事件匹配成单次申请后,依据协定解析申请内容生成要害指标,传递给后续的分析器。
- K8sMetadataProcessor – K8S 指标处理器,补充 K8S 指标并传递给后续的聚合处理器
- AggregateProcessor – 数据聚合处理器,将接管的指标数据生成 Trace 和 Metric,传递给给后续的转发器。
- OtelExporter – Opentelmetry 转发器,将 Trace / Metric 数据转发给 Prometheus 进行展现。
其中协定解析流程次要在 NetworkAnalyzer 组件中进行,将接管的申请 / 响应事件成对匹配后,交由 parseProtocols()函数解析出协定指标。
1.1 协定解析流程
NetworkAnnalyzer.parseProtocols()办法定义了整体解析流程,依据协定解析器别离解析申请和响应,当最终都胜利时输入指标。
1.2 协定解析设计思路
失常的协定解析只负责逐帧解析指标性能。
现已反对 5 种协定解析,当协定越来越多时,遍历引起的解析会越来越耗时,那么需引入 fastfail 疾速辨认协定
对于简单的多报文协定,如 Kafka 有不同的 API 报文,而雷同 API 也有不同的版本报文。将所有报文解析逻辑都写在一起会使整个类过于臃肿且不易保护。为此引入树形多报文构造用于疾速且低耦合地实现开发。
1.2.1 报文解析构造体
在树形报文解析过程中,有如下 2 个场景须要思考
- 当父协定解析了指标 A,子协定解析可能会用到 A 指标,那么父子协定解析的指标须要透传。
- 父协定已解析了局部报文长度的内容,那么子协定在开始解析时可间接跳过相应长度的内容进行解析,此处引入偏移量用于下一个协定跳过解析。
定义 PayloadMessage,封装报文内容、读取偏移量和指标存储的 Map。
type PayloadMessage struct {Data []byte
Offset int
attributeMap *model.AttributeMap
}
1.2.2 报文解析 API
因为引入协定树,协定解析过程 parse() (ok bool)将不再实用。协定树中的个协定的解析胜利不示意整个协定解析胜利,需解析整颗树的协定是否胜利,将 API 扩大为 parse() (ok bool, complete bool)。
- 对于单层协定(HTTP),返回为(ok, true)
基于以上几点需要,设计树形构造的报文解析器 PkgParser。PkgParser 定义了 fastFail(疾速辨认失败) 和 parser(解析单个报文)函数;每个协定只需注册本身的 PkgParser 即可接入整套流程。
fastFail(message *PayloadMessage) (fail bool)
- 申明协定是否辨认失败,用于疾速辨认协定
parser(message *PayloadMessage) (ok bool, complete bool)
- 解析协定,将解析出的指标存储到 message 的 Attributes 中
-
返回是 2 个参数:
- 是否解析胜利
- 是否解析实现 (默认为 true,当为 false 次要是用于嵌套解析过程,例如先定义一个头解析,再定义不同的音讯体解析)。
1.3 申请 / 响应解析
ProtocolParser 定义了申请和响应的解析器,并提供 ParseRequest()和 ParseResponse() API 用于解析申请和响应
其中 response 的 message 携带了 request 解析出的 attributes 指标,用于匹配。eg. kafka 的 correlationId request 和 response 报文需统一,且 response 报文解析用到了 request 的 key。
2 开发流程
2.1 增加协定名
const (
HTTP = "http"
...
XX = "xx" // 此处替换具体协定名
...
)
2.2 创立协定
analyzer/network/protocol 目录下创立文件夹 xx,xx 替换为具体协定,并创立 3 个文件 xx_parser.go、xx_request.go 和 xx_response.go
analyzer/network/protocol/xx
├── xx_parser.go 协定解析器
├── xx_request.go 实现申请解析流程
└── xx_response.go 实现响应解析流程
2.2.1 xx_request.go
实现 fastfail()和 parser()函数
func fastfailXXRequest() protocol.FastFailFn {return func(message *protocol.PayloadMessage) bool {// 依据报文实现具体的 fastFail()函数
return false
}
}
func parseXXRequest() protocol.ParsePkgFn {return func(message *protocol.PayloadMessage) (bool, bool) {
// 解析报文内容
contentKey := getContentKey(message.Data)
if contentKey == "" {
// 第一个参数 false 示意解析失败,第二个参数示意报文解析实现
return false, true
}
// 通过 AddStringAttribute() 或 AttIntAttribute() 存储解析出的属性
message.AddStringAttribute(constlabels.ContentKey, contentKey)
// 解析胜利
return true, true
}
}
2.2.2 xx_response.go
实现 fastfail()和 parser()函数
func fastfailXXResponse() protocol.FastFailFn {return func(message *protocol.PayloadMessage) bool {// 依据报文实现具体的 fastFail()函数
return false
}
}
func parseXXResponse() protocol.ParsePkgFn {return func(message *protocol.PayloadMessage) (bool, bool) {// 通过 GetStringAttribute() 或 GetIntAttribute() 读取 request 解析后的参数
contentKey := message.GetStringAttribute(constlabels.ContentKey)
// 解析响应报文
errorCode := getErrorCode(message)
if errorCode > 20 {
// 有 errorCode 或 errorMsg 等常见,需定义 IsError 为 true 用于后续 processor 生成 Metric
message.AddBoolAttribute(constlabels.IsError, true)
message.AddIntAttribute(constlabels.ErrorType, int64(constlabels.ProtocolError))
}
message.AddStringAttribute(constlabels.XXErrorCode, errorCode)
// 解析胜利
return true, true
}
}
2.2.3 xx_parser.go
定义协定解析器
func NewXXParser() *protocol.ProtocolParser {requestParser := protocol.CreatePkgParser(fastfailXXRequest(), parseXXRequest())
// 当存在嵌套的协定解析 eg. 解析头 + 解析各类不同报文
// 可通过 Add()增加子协定,生成一颗协定树解析,顶部是公共局部解析,分叉是各个不同报文解析
// Header
// / | \
// API1 API2 API3
// /|\
// v1 v2 v3
responseParser := protocol.CreatePkgParser(fastfailXXResponse(), parseXXResponse())
return protocol.NewProtocolParser(protocol.XX, requestParser, responseParser, nil)
}
2.2.4 factory.go
注册协定解析器
var (
...
xx_parser *protocol.ProtocolParser = xx.NewXXParser())
func GetParser(key string) *protocol.ProtocolParser {
switch key {
...
case protocol.XX:
return xx_parser
...
default:
return nil
}
}
2.2.5 kindling-collector-config.yml
配置新增协定
analyzers:
networkanalyzer:
protocol_parser: [http, mysql, dns, redis, kafka, xx]
3 开发案例 – Dubbo2 协定
3.1 dubbo2 协定剖析
依据官网提供的协定标准,解析网络抓包的数据。
- 前 2 个 byte 为魔数,可用于 fastfail()办法
- 第 3 个 byte 蕴含 Req/Resp、序列化形式等信息,可用于解析协定中判断是否非法报文。
- 第 4 个 byte 用于返回报文的错误码
- 第 16 个 byte 开始需通过指定的序列化形式解析报文内容,service name + method name 可用于 contentKey 标识该申请的 URL
3.2 申明协定名
const (
...
DUBBO2 = "dubbo2"
...
)
3.3 实现 dubbo2 解析
创立协定相干文件
kindling/collector/analyzer/network/protocol/dubbo2
├── dubbo2_parser.go Dubbo2 解析器
├── dubbo2_request.go 实现申请解析流程
├── dubbo2_response.go 实现响应解析流程
└── dubbo2_serialize.go Dubbo2 反序列化器
3.3.1 dubbo2_request.go
申明 request 申请的 fastFail 函数
- dubbo2 有魔数 0xdabb 可疾速辨认
func fastfailDubbo2Request() protocol.FastFailFn {return func(message *protocol.PayloadMessage) bool {return len(message.Data) < 16 || message.Data[0] != MagicHigh || message.Data[1] != MagicLow
}
}
申明 request 申请的解析函数
- 将解析出 服务 / 办法作为 相似于 URL 的 Key
- 存储报文内容
func parseDubbo2Request() protocol.ParsePkgFn {return func(message *protocol.PayloadMessage) (bool, bool) {contentKey := getContentKey(message.Data)
if contentKey == "" {return false, true}
message.AddStringAttribute(constlabels.ContentKey, contentKey)
message.AddStringAttribute(constlabels.Dubbo2RequestPayload, getAsciiString(message.GetData(16, protocol.GetDubbo2PayLoadLength())))
return true, true
}
}
解析 Dubbo2 申请
- 过滤非法申请
- 思考到 dubbo2 存在单向和心跳申请,这些申请不做解析
- 依据报文构造解析相应指标
func getContentKey(requestData []byte) string {serialID := requestData[2] & SerialMask
if serialID == Zero {return ""}
if (requestData[2] & FlagEvent) != Zero {return "Heartbeat"}
if (requestData[2] & FlagRequest) == Zero {
// Invalid Data
return ""
}
if (requestData[2] & FlagTwoWay) == Zero {
// Ignore Oneway Data
return "Oneway"
}
serializer := GetSerializer(serialID)
if serializer == serialUnsupport {
// Unsupport Serial. only support hessian and fastjson.
return "UnSupportSerialFormat"
}
var (
service string
method string
)
// version
offset := serializer.eatString(requestData, 16)
// service name
offset, service = serializer.getStringValue(requestData, offset)
// service version
offset = serializer.eatString(requestData, offset)
// method name
_, method = serializer.getStringValue(requestData, offset)
return service + "#" + method
}
3.3.2 dubbo2_serialize.go
因为 dubbo2 内置了多套序列化形式,先定义接口 dubbo2Serializer
type dubbo2Serializer interface {eatString(data []byte, offset int) int
getStringValue(data []byte, offset int) (int, string)
}
dubbo2 默认的序列化形式是 hessian2,此处实现 hessian2 形式
type dubbo2Hessian struct{}
func (dh *dubbo2Hessian) eatString(data []byte, offset int) int {dataLength := len(data)
if offset >= dataLength {return dataLength}
tag := data[offset]
if tag >= 0x30 && tag <= 0x33 {
if offset+1 == dataLength {return dataLength}
// [x30-x34] <utf8-data>
return offset + 2 + int(tag-0x30)<<8 + int(data[offset+1])
} else {return offset + 1 + int(tag)
}
}
func (dh *dubbo2Hessian) getStringValue(data []byte, offset int) (int, string) {dataLength := len(data)
if offset >= dataLength {return dataLength, ""}
var stringValueLength int
tag := data[offset]
if tag >= 0x30 && tag <= 0x33 {
if offset+1 == dataLength {return dataLength, ""}
// [x30-x34] <utf8-data>
stringValueLength = int(tag-0x30)<<8 + int(data[offset+1])
offset += 2
} else {stringValueLength = int(tag)
offset += 1
}
if offset+stringValueLength >= len(data) {return dataLength, string(data[offset:])
}
return offset + stringValueLength, string(data[offset : offset+stringValueLength])
}
对外裸露公共办法,用于获取序列化形式
var (serialHessian2 = &dubbo2Hessian{}
serialUnsupport = &dubbo2Unsupport{})
func GetSerializer(serialID byte) dubbo2Serializer {
switch serialID {
case SerialHessian2:
return serialHessian2
default:
return serialUnsupport
}
}
3.3.3 dubbo2_response.go
申明 response 响应的 fastFail 函数
- 与 request 相似,采纳魔数 0xdabb 可疾速辨认
func fastfailDubbo2Response() protocol.FastFailFn {return func(message *protocol.PayloadMessage) bool {return len(message.Data) < 16 || message.Data[0] != MagicHigh || message.Data[1] != MagicLow
}
}
申明 response 响应的解析函数
- 依据 status 解析出对应的 errorCode
- 存储报文内容
func parseDubbo2Response() protocol.ParsePkgFn {return func(message *protocol.PayloadMessage) (bool, bool) {errorCode := getErrorCode(message.Data)
if errorCode == -1 {return false, true}
message.AddIntAttribute(constlabels.Dubbo2ErrorCode, errorCode)
if errorCode > 20 {
// 有 errorCode 或 errorMsg 等常见,需定义 IsError 为 true 用于后续 processor 生成 Metric
message.AddBoolAttribute(constlabels.IsError, true)
message.AddIntAttribute(constlabels.ErrorType, int64(constlabels.ProtocolError))
}
message.AddStringAttribute(constlabels.Dubbo2ResponsePayload, getAsciiString(message.GetData(16, protocol.GetDubbo2PayLoadLength())))
return true, true
}
}
解析 Dubbo2 响应
- 过滤非法响应
- 依据报文构造解析相应指标
func getErrorCode(responseData []byte) int64 {SerialID := responseData[2] & SerialMask
if SerialID == Zero {return -1}
if (responseData[2] & FlagEvent) != Zero {return 20}
if (responseData[2] & FlagRequest) != Zero {
// Invalid Data
return -1
}
return int64(responseData[3])
}
3.3.4 dubbo2_parser.go
申明 dubbo2 解析器
- 通过 CreatePkgParser()别离定义 Reques / Response 解析器
- 通过 NewProtocolParser()将 Request / Response 解析器生成 Dubbo2 解析器
func NewDubbo2Parser() *protocol.ProtocolParser {requestParser := protocol.CreatePkgParser(fastfailDubbo2Request(), parseDubbo2Request())
responseParser := protocol.CreatePkgParser(fastfailDubbo2Response(), parseDubbo2Response())
return protocol.NewProtocolParser(protocol.DUBBO2, requestParser, responseParser, nil)
}
3.4 注册 dubbo2 解析器
在 factory.go 中注册 dubbo2 协定的解析器
var (
...
dubbo2_parser *protocol.ProtocolParser = dubbo2.NewDubbo2Parser())
func GetParser(key string) *protocol.ProtocolParser {
switch key {
...
case protocol.DUBBO2:
return dubbo2_parser
...
default:
return nil
}
}
3.5 申明反对协定
在 deploy/kindling-collector-config.yml 中申明 dubbo2 协定
analyzers:
networkanalyzer:
protocol_parser: [http, mysql, dns, redis, kafka, dubbo2]
protocol_config:
- key: "dubbo2"
payload_length: 200
Kindling 是一款基于 eBPF 的云原生可观测性开源工具,旨在帮忙用户更好更快的定界云原生零碎问题,并致力于打造云原生全故障域的定界能力。欢送这方面的使用者和爱好者与咱们分割。
退出咱们
关注咱们