关于sql:万字详解搜狐智能媒体基于-Zipkin-和-StarRocks-的微服务链路追踪实践

7次阅读

共计 20742 个字符,预计需要花费 52 分钟才能阅读完成。

作者:翟东波、叶书俊

在微服务体系架构下,搜狐智能媒体应用 Zipkin 进行服务链路追踪(Tracing)的埋点采集,将采集的 Trace 信息存储到 StarRocks 中。通过 StarRocks 弱小的 SQL 计算能力,对 Tracing 信息进行多维度的统计、剖析等操作,晋升了微服务监控能力,从简略统计的 Monitoring 回升到更多维度摸索剖析的 Observability。

全文次要分为三个局部:第一节次要介绍微服务下的罕用监控形式,其中链路追踪技术,能够串联整个服务调用链路,取得整体服务的要害信息,对微服务的监控有十分重要的意义。第二节次要介绍搜狐智能媒体是如何构建链路追踪剖析体系的,次要包含 Zipkin 的数据采集,StarRocks 的数据存储,以及依据利用场景对 StarRocks 进行剖析计算等三个局部。第三节次要介绍搜狐智能媒体通过引入 Zipkin 和 StarRocks 进行链路追踪剖析获得的一些实际成果。

01 微服务架构中的链路追踪

近年来,企业 IT 利用架构逐渐向微服务、云原生等分布式应用架构演进,在搜狐智能媒体外部,应用服务依照微服务、Docker、Kubernetes、Spring Cloud 等架构思维和技术计划进行研发运维,晋升部门整体工程效率

微服务架构晋升工程效率的同时,也带来了一些新的问题。微服务是一个分布式架构,它按业务划分服务单元,用户的每次申请不再是由某一个服务独立实现了,而是变成了多个服务一起配合实现。这些服务可能是由不同的团队、应用不同的编程语言实现,可能布在了不同的服务器、甚至不同的数据中心。如果用户申请呈现了谬误和异样,微服务分布式调用的个性决定了这些 故障难以定位,绝对于传统的单体架构,微服务监控面临着新的难题。

Logging、Metrics、Tracing

微服务监控能够蕴含很多形式,依照监测的数据类型次要划分为 Logging、Metrics 和 Tracing 三大畛域:

Logging

用户被动记录的离散事件,记录的信息个别是非结构化的文本内容,在用户进行问题分析判断时能够提供更为详尽的线索。

具备聚合属性的采集数据,旨在为用户展现某个指标在某个时段的运行状态,用于查看一些指标和趋势。

Tracing

记录一次申请调用的生命周期全过程,其中包含服务调用和解决时长等信息,含有申请上下文环境,由一个全局惟一的 Trace ID 来进行标识和串联整个调用链路,非常适合微服务架构的监控场景。


图 1

三者的关系如上图所示,这三者之间也是有重叠的,比方 Logging 能够聚合相干字段生成 Metrics 信息,关联相干字段生成 Tracing 信息;Tracing 能够聚合查问次数生成 Metrics 信息,能够记录业务日志生成 Logging 信息。个别状况下要在 Metrics 和 Logging 中减少字段串联微服务申请调用生命周期比拟艰难,通过 Tracing 获取 Metrics 和 Logging 则绝对容易很多。

另外,这三者对存储资源有着不同的需要,Metrics 是人造的压缩数据,最节俭资源;Logging 偏向于有限减少的,甚至会超出预期的容量;Tracing 的存储容量,个别介于 Metrics 和 Logging 两者之间,另外还可通过采样率进一步管制容量需要。

从 Monitoring 到 Observability

Monitoring tells you whether the system works. Observability lets you ask why it’s not working.

– Baron Schwarz
微服务监控从数据分析档次,能够简略分为 Monitoring 和 Observability。

Monitoring

通知你零碎是否在工作,对已知场景的预约义计算,对各种监控问题的事先假如。对应上图 Known Knowns 和 Known Unknowns,都是当时假如可能会产生的事件,包含曾经明确和不明确的事件。

Observability

能够让你询问零碎为什么不工作,对未知场景的摸索式剖析,对任意监控问题的预先剖析。对应上图 Unknown Knowns 和 Unknown Unknowns,都是事未觉察可能会产生的事件,包含曾经明确和不明确的事件。

很显然,通过事后假如所有可能产生事件进行 Monitoring 的形式,曾经不能满足微服务简单的监控场景,咱们须要可能提供摸索式剖析的 Observability 监控形式。在 Logging、Metrics 和 Tracing,Tracing 是目前能提供多维度监控剖析能力的最无效形式。

Tracing

链路追踪 Tracing Analysis 为分布式应用的开发者提供了残缺的调用链路还原、调用申请量统计、链路拓扑、利用依赖剖析等工具,能够帮忙开发者疾速剖析和诊断分布式应用架构下的性能瓶颈,进步微服务时代下的开发诊断效率。

Tracing 能够串联微服务中分布式申请的调用链路,在微服务监控体系中有着重要的作用。另外,Tracing 介于 Metrics 和 Logging 之间,既能够实现 Monitoring 的工作,也能够进行 Observability 的剖析,晋升监控体系建设效率。

零碎模型

链路追踪(Tracing)零碎,须要记录一次特定申请通过的上下游服务调用链路,以及各服务所实现的相干工作信息。

如下图所示的微服务零碎,用户向服务 A 发动一个申请,服务 A 会生成一个全局惟一的 Trace ID,服务 A 外部 Messaging 形式调用相干解决模块(比方跨线程异步调用等),服务 A 模块再通过 RPC 形式并行调用服务 B 和服务 C;服务 B 会即刻返回响应,但服务 C 会采纳串行形式,先用 RPC 调用服务 D,再用 RPC 调用服务 E,而后再响应服务 A 的调用申请;服务 A 在外部两个模块调用解决完后,会响应最后的用户申请。

最开始生成的 Trace ID 会在这一系列的服务外部或服务之间的申请调用中传递,从而将这些申请调用连接起来。另外,Tracing 零碎还会记录每一个申请调用解决的 Timestamp、服务名等等相干信息。

图 3(注:服务外部串行调用对系统性能有影响,个别采纳并行调用形式,后续章节将只思考并行调用场景。)

在 Tracing 零碎中,次要蕴含 Trace 和 Span 两个根底概念,下图展现了一个由 Span 形成的 Trace。

图 4

Trace 指一个内部申请通过的所有服务的调用链路,能够了解为一个有服务调用组成的树状构造,每条链路都有一个全局惟一的 ID 来标识。

Span 指服务外部或服务之间的一次调用,即 Trace 树中的节点,如下图所示的由 Span 形成的 Trace 树,树中的 Span 节点之间存在父子关系。Span 次要蕴含 Span 名称、Span ID、父 ID,以及 Timestamp、Dration(蕴含子节点调用解决的 duration)、业务数据等其余 log 信息。

Span 依据调用形式能够分为 RPC Span 和 Messaging Span:

RPC Span

由 RPC Tracing 生成,分为 Client 和 Server 两类 Span,别离由 RPC 服务调用的 Client 节点和 Server 节点记录生成,两者共享 Span ID、Parent Span ID 等信息,但要留神,这两个 Span 记录的工夫是有偏差,这个偏差是服务间的调用开销,个别是由网络传输开销、代理服务或服务接口音讯排队等状况引起的。

Messaging Span

由 Messaging Tracing 生成,个别用于 Tracing 服务外部调用,不同于 RPC Span,Messaging Span 之间不会共享 Span ID 等信息。

利用场景

依据 Tracing 的零碎模型,可取得服务响应等各类 Metric 信息,用于 Alerting、DashBoard 查问等;也可依据 Span 组成的链路,剖析单个或整体服务状况,发现服务性能瓶颈、网络传输开销、服务内异步调用设计等各种问题。如下图所示,相比于 Metrics 和 Logging,Tracing 能够同时涵盖监控的 Monitoring 和 Observability 场景,在监控体系中占据重要地位,Opentracing、Opencensus、Opentelemetry 等协会和组织都蕴含对 Tracing 的反对。

图 5

从微服务的角度,Tracing 记录的 Span 信息能够进行各种维度的统计和剖析。下图基于 HTTP API 设计的微服务零碎为例,用户查问 Service1 的 /1/api 接口,Service1 再申请 Service2 的 /2/api,Service2 外部异步并发调用 msg2.1 和 msg2.2,msg2.1 申请 Service3 的 /3/api 接口,msg2.2 申请 Service4 的 /4/api 接口,Service3 外部调用 msg3,Service4 再申请 Service5 的 /5/api,其中 Service5 没有进行 Tracing 埋点,无奈采集 Service5 的信息。

图 6

针对上图的微服务零碎,能够进行如下两大类的统计分析操作:

服务内剖析

关注单个服务运行状况,比方对外服务接口和上游接口查问的性能指标等,剖析场景次要有:

1、上游服务申请

如 Service1 提供的 /1/api ,Service4 提供的 /4/api 等,统计取得次数、QPS、耗时百分位数、出错率、超时率等等 metric 信息。

2、上游服务响应

如 Service1 申请的 /2/api、Service4 申请的 /5/api 等,统计查问次数、QPS、耗时百分位数、出错率、超时率等等 Metric 信息。

3、服务外部解决

服务对外接口在外部可能会被分拆为多个 Span,能够依照 Span Name 进行分组聚合统计,发现耗时最长的 Span 等,如 Service2 接口 /2/api,接口服务外部 Span 包含 /2/api 的 Server Span,call2.1 对应的 Span 和 call2.2 对应的 Span,通过 Span 之间的依赖关系能够算出这些 Span 本身的耗时 Duraion,进行各类统计分析。

服务间剖析

在进行微服务整体剖析时,咱们将单个服务看作黑盒,关注服务间的依赖、调用链路上的服务热点等,剖析场景次要有:

1、服务拓扑统计

能够依据服务间调用的 Client Span 和 Server Span,取得整个服务零碎的拓扑构造,以及服务之间调用申请次数、Duration 等统计信息。

2、调用链路性能瓶颈剖析

剖析某个对外申请接口的调用链路上的性能瓶颈,这个瓶颈可能是某个服务外部解决开销造成的,也可能是某两个服务间的网络调用开销等等起因造成的。

对于一次调用波及到数十个以上微服务的简单调用申请,每次呈现的性能瓶颈很可能都会不一样,此时就须要进行聚合统计,算出性能瓶颈呈现频次的排名,剖析出针对性能瓶颈热点的服务或服务间调用。

以上仅仅是列举的局部剖析场景,Tracing 提供的信息其实能够反对更多的 Metric 统计和摸索式剖析场景,本文不再一一例举。

02 基于 Zipkin 和 StarRocks 构建链路追踪剖析零碎

链路追踪零碎次要分为数据采集、数据存储和剖析计算三大部分,目前应用最宽泛的开源链路追踪零碎是 Zipkin,它次要包含数据采集和剖析计算两大部分,底层的存储依赖其余存储系统。搜狐智能媒体在构建链路追踪零碎时,最后采纳 Zipkin + ElasticSearch 得形式进行构建,后减少 StarRocks 作为底层存储系统,并基于 StarRocks 进行剖析统计,零碎总体架构如下图。

图 7

数据采集

Zipkin 反对客户端全自动埋点,只需将相干库引入应用程序中并简略配置,就能够实现 Span 信息主动生成,Span 信息通过 HTTP 或 Kafka 等形式主动进行上传。Zipkin 目前提供了绝大部分语言的埋点采集库,如 Java 语言的 Spring Cloud 提供了 Sleuth 与 Zipkin 进行深度绑定,对开发人员根本做到通明应用。为了解决存储空间,在应用时个别要设置 1/100 左右的采样率,Dapper 的论文中提到即使是 1/1000 的采样率,对于跟踪数据的通用应用层面上,也能够提供足够多的信息。

数据模型

对应 图 6,上面给出了 Zipkin Span 埋点采集示意图 (图 8),具体流程如下:

图 8

  1. 用户发送给 Service1 的 Request 中,不含有 Trace 和 Span 信息,Service1 会创立一个 Server Span,随机生成全局惟一的 TraceID(如图中的 X)和 SpanId(如图中的 A,此处的 X 和 A 会应用雷同的值),记录 Timestamp 等信息;Service1 在给用户返回 Response 时,Service1 会统计 Server Span 的解决耗时 Duration,会将蕴含 TraceID、SpanID、Timestamp、Duration 等信息的 Server Span 残缺信息进行上报。
  2. Service1 向 Service2 发送的申请,会创立一个 Client Span,应用 X 作为 Trace ID,随机生成全局惟一的 SpanID(如图中的 B), 记录 Timestamp 等信息,同时 Service1 会将 Trace ID(X)和 SpanID(B)传递给 Service2(如在 HTTP 协定的 HEADER 中增加 TraceID 和 SpanID 等相干字段);Service1 在收到 Service2 的响应后,Service1 会解决 Client Span 相干信息,并将 Client Span 进行上报
  3. Service2 收到 Service1 的 Request 中,蕴含 Trace(X)和 Span(B)等信息,Service2 会创立一个 Server Span,应用 X 作为 Trace ID,B 作为 SpanID,外部调用 msg2.1 和 msg2.2 同时,将 Trace ID(X)和 SpanID(B)传递给它们;Service2 在收到 msg2.1 和 msg2.2 的返回后,Service1 会解决 Server Span 相干信息,并将此 Server Span 进行上报
  4. Service2 的 msg2.1 和 msg2.2 会别离创立一个 Messaging Span,应用 X 作为 Trace ID,随机生成全局惟一的 SpanID(如图中的 C 和 F), 记录 Timestamp 等信息,别离向 Service3 和 Service4 发送申请;msg2.1 和 msg2.2 收到响应后,会别离解决 Messaging Span 相干信息,并将两个 Messaging Span 进行上报
  5. Service2 向 Service3 和 Service4 发送的申请,会各创立一个 Client Span,应用 X 作为 Trace ID,随机生成全局惟一的 SpanID(如图中的 D 和 G), 记录 Timestamp 等信息,同时 Service2 会将 Trace ID(X)和 SpanID(D 或 G)传递给 Service3 和 Service4;Service12 在收到 Service3 和 Service3 的响应后,Service2 会别离解决 Client Span 相干信息,并将两个 Client Span 进行上报
  6. Service3 收到 Service2 的 Request 中,蕴含 Trace(X)和 Span(D)等信息,Service3 会创立一个 Server Span,应用 X 作为 Trace ID,D 作为 SpanID,外部调用 msg3;Service3 在收到 msg3 的返回后,Service3 会解决此 Server Span 相干信息,并将此 Server Span 进行上报
  7. Service3 的 msg3 会别离创立一个 Messaging Span,应用 X 作为 Trace ID,随机生成全局惟一的 SpanID(如图中的 E), 记录 Timestamp 等信息,msg3 解决实现后,解决此 Messaging Span 相干信息,并将此 Messaging Span 进行上报
  8. Service4 收到 Service2 的 Request 中,蕴含 Trace(X)和 Span(G)等信息,Service4 会创立一个 Server Span,应用 X 作为 Trace ID,G 作为 SpanID,再向 Service5 发送申请;Service4 在收到 Service5 的响应后,Service4 会解决此 Server Span 相干信息,并将此 Server Span 进行上报
  9. Service4 向 Service5 发送的申请,会创立一个 Client Span,应用 X 作为 Trace ID,随机生成全局惟一的 SpanID(如图中的 H), 记录 Timestamp 等信息,同时 Service4 会将 Trace ID(X)和 SpanID(H)传递给 Service5;Service4 在收到 Service5 的响应后,Service4 会解决 Client Span 相干信息,并将此 Client Span 进行上报

下面整个 Trace X 调用链路会生成的 Span 记录如下图,每个 Span 次要会记录 Span Id、Parent Id、Kind(CLIENT 示意 RPC CLIENT 端 Span,SERVER 示意 RPC SERVER 端 SPAN,NULL 示意 Messaging SPAN),SN(Service Name),还会蕴含 Trace ID,工夫戳、Duration 等信息。Service5 没有进行 Zipkin 埋点采集,因而不会有 Service5 的 Span 记录。

图 9

数据格式

设置了 Zipkin 埋点的应用服务,默认会应用 Json 格局向 Kafka 上报 Span 信息,上报的信息次要有如下几个留神点:

每个应用服务每次会上报一组 Span,组成一个 Json 数组上报

Json 数组里蕴含不同 Trace 的 Span,即不是所有的 Trace ID 都 雷同

不同模式的接口(如 Http、Grpc、Dubbo 等),除了次要字段雷同外,在 tags 中会各自记录一些不同的字段

[
  {
    "traceId": "3112dd04c3112036",
    "id": "3112dd04c3112036",
    "kind": "SERVER",
    "name": "get /2/api",
    "timestamp": 1618480662355011,
    "duration": 12769,
    "localEndpoint": {
      "serviceName": "SERVICE2",
      "ipv4": "172.24.132.32"
    },
    "remoteEndpoint": {
      "ipv4": "111.25.140.166",
      "port": 50214
    },
    "tags": {
      "http.method": "GET",
      "http.path": "/2/api",
      "mvc.controller.class": "Controller",
      "mvc.controller.method": "get2Api"
    }
  },
  {
    "traceId": "3112dd04c3112036",
    "parentId": "3112dd04c3112036",
    "id": "b4bd9859c690160a",
    "name": "msg2.1",
    "timestamp": 1618480662357211,
    "duration": 11069,
    "localEndpoint": {"serviceName": "SERVICE2"},
    "tags": {
      "class": "MSG",
      "method": "msg2.1"
    }
  },
  {
    "traceId": "3112dd04c3112036",
    "parentId": "3112dd04c3112036",
    "id": "c31d9859c69a2b21",
    "name": "msg2.2",
    "timestamp": 1618480662357201,
    "duration": 10768,
    "localEndpoint": {"serviceName": "SERVICE2"},
    "tags": {
      "class": "MSG",
      "method": "msg2.2"
    }
  },
  {
    "traceId": "3112dd04c3112036",
    "parentId": "b4bd9859c690160a",
    "id": "f1659c981c0f4744",
    "kind": "CLIENT",
    "name": "get /3/api",
    "timestamp": 1618480662358201,
    "duration": 9206,
    "localEndpoint": {
      "serviceName": "SERVICE2",
      "ipv4": "172.24.132.32"
    },
    "tags": {
      "http.method": "GET",
      "http.path": "/3/api"
    }
  },
  {
    "traceId": "3112dd04c3112036",
    "parentId": "c31d9859c69a2b21",
    "id": "73cd1cab1d72a971",
    "kind": "CLIENT",
    "name": "get /4/api",
    "timestamp": 1618480662358211,
    "duration": 9349,
    "localEndpoint": {
      "serviceName": "SERVICE2",
      "ipv4": "172.24.132.32"
    },
    "tags": {
      "http.method": "GET",
      "http.path": "/4/api"
    }
  }
]

图 10

数据存储

Zipkin 反对 MySQL、Cassandra 和 ElasticSearch 三种数据存储,这三者都存在各自的毛病:

  • MySQL:采集的 Tracing 信息根本都在每天上亿行甚至百亿行以上,MySQL 无奈撑持这么大数据量。
  • Cassandra:能反对对单个 Trace 的 Span 信息剖析,但对聚合查问等数据统计分析场景反对不好
  • ElasticSearch:能反对单个 Trace 的剖析和简略的聚合查问剖析,但对于一些较简单的数据分析计算不能很好的反对,比方波及到 Join、窗口函数等等的计算需要,尤其是工作间依赖计算,Zipkin 目前还不能实时计算,须要通过离线跑 Spark 工作计算工作间依赖信息。

咱们在实践中也是首先应用 ElasticSearch,发现了下面提到的问题,比方 Zipkin 的服务依赖拓扑必须应用离线形式计算,便新增了 StarRocks 作为底层数据存储。将 Zipkin 的 trace 数据导入到 StarRocks 很不便,根本步骤只须要两步,CREATE TABLE + CREATE ROUTINE LOAD。

另外,在调用链路性能瓶颈剖析场景中,要将单个服务看作黑盒,只关注 RPC SPAN,屏蔽掉服务外部的 Messaging Span,应用了 Flink 对服务外部 span 进行 ParentID 溯源,即从 RPC Client SPAN,始终追溯到同一服务同一 Trace ID 的 RPC Server SPAN,用 RPC Server SPAN 的 ID 替换 RPC Client SPAN 的 parentId,最初通过 Flink-Connector-StarRocks 将转换后的数据实时写入 StarRocks。

基于 StarRocks 的数据存储架构流程如下图所示。

图 11

CREATE TABLE

建表语句示例参考如下,有如下几点留神点:

  • 包含 Zipkin 和 zipkin_trace_perf 两张表,zipkin_trace_perf 表只用于调用链路性能瓶颈剖析场景,其余统计分析都实用 Zipkin 表
  • 通过采集信息中的 Timestamp 字段,生成 dt、hr、min 工夫字段,便于后续统计分析
  • 采纳 DUPLICATE 模型、Bitmap 索引等设置,放慢查问速度
  • Zipkin 表应用 id 作为分桶字段,在查问服务拓扑时,查问打算会优化为 Colocate Join,晋升查问性能。

Zipkin

CREATE TABLE `zipkin` (`traceId` varchar(24) NULL COMMENT "",
  `id` varchar(24) NULL COMMENT "Span ID",
  `localEndpoint_serviceName` varchar(512) NULL COMMENT "",
  `dt` int(11) NULL COMMENT "",
  `parentId` varchar(24) NULL COMMENT "",
  `timestamp` bigint(20) NULL COMMENT "",
  `hr` int(11) NULL COMMENT "",
  `min` bigint(20) NULL COMMENT "",
  `kind` varchar(16) NULL COMMENT "",
  `duration` int(11) NULL COMMENT "",
  `name` varchar(300) NULL COMMENT "",
  `localEndpoint_ipv4` varchar(16) NULL COMMENT "",
  `remoteEndpoint_ipv4` varchar(16) NULL COMMENT "",
  `remoteEndpoint_port` varchar(16) NULL COMMENT "",
  `shared` int(11) NULL COMMENT "",
  `tag_error` int(11) NULL DEFAULT "0" COMMENT "",
  `error_msg` varchar(1024) NULL COMMENT "",
  `tags_http_path` varchar(2048) NULL COMMENT "",
  `tags_http_method` varchar(1024) NULL COMMENT "",
  `tags_controller_class` varchar(100) NULL COMMENT "",
  `tags_controller_method` varchar(1024) NULL COMMENT "",
  INDEX service_name_idx (`localEndpoint_serviceName`) USING BITMAP COMMENT ''
) ENGINE=OLAP 
DUPLICATE KEY(`traceId`, `parentId`, `id`, `timestamp`, `localEndpoint_serviceName`, `dt`)
COMMENT "OLAP"
PARTITION BY RANGE(`dt`)
(PARTITION p20220104 VALUES [("20220104"), ("20220105")),
 PARTITION p20220105 VALUES [("20220105"), ("20220106")))
DISTRIBUTED BY HASH(`id`) BUCKETS 100 
PROPERTIES (
"replication_num" = "3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-30",
"dynamic_partition.end" = "2",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "100",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);

zipkin_trace_perf

CREATE TABLE `zipkin_trace_perf` (`traceId` varchar(24) NULL COMMENT "",
  `id` varchar(24) NULL COMMENT "",
  `dt` int(11) NULL COMMENT "",
  `parentId` varchar(24) NULL COMMENT "",
  `localEndpoint_serviceName` varchar(512) NULL COMMENT "",
  `timestamp` bigint(20) NULL COMMENT "",
  `hr` int(11) NULL COMMENT "",
  `min` bigint(20) NULL COMMENT "",
  `kind` varchar(16) NULL COMMENT "",
  `duration` int(11) NULL COMMENT "",
  `name` varchar(300) NULL COMMENT "",
  `tag_error` int(11) NULL DEFAULT "0" COMMENT ""
) ENGINE=OLAP 
DUPLICATE KEY(`traceId`, `id`, `dt`, `parentId`, `localEndpoint_serviceName`)
COMMENT "OLAP"
PARTITION BY RANGE(`dt`)
(PARTITION p20220104 VALUES [("20220104"), ("20220105")),
 PARTITION p20220105 VALUES [("20220105"), ("20220106")))
DISTRIBUTED BY HASH(`traceId`) BUCKETS 32 
PROPERTIES (
"replication_num" = "3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-60",
"dynamic_partition.end" = "2",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "12",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);

ROUTINE LOAD

ROUTINE LOAD 创立语句示例如下:

CREATE ROUTINE LOAD zipkin_routine_load ON zipkin COLUMNS(
  id,
  kind,
  localEndpoint_serviceName,
  traceId,
  `name`,
  `timestamp`,
  `duration`,
  `localEndpoint_ipv4`,
  `remoteEndpoint_ipv4`,
  `remoteEndpoint_port`,
  `shared`,
  `parentId`,
  `tags_http_path`,
  `tags_http_method`,
  `tags_controller_class`,
  `tags_controller_method`,
  tmp_tag_error,
  tag_error = if(`tmp_tag_error` IS NULL, 0, 1),
  error_msg = tmp_tag_error,
  dt = from_unixtime(`timestamp` / 1000000, '%Y%m%d'),
  hr = from_unixtime(`timestamp` / 1000000, '%H'),
  `min` = from_unixtime(`timestamp` / 1000000, '%i')
) PROPERTIES (
  "desired_concurrent_number" = "3",
  "max_batch_interval" = "50",
  "max_batch_rows" = "300000",
  "max_batch_size" = "209715200",
  "max_error_number" = "1000000",
  "strict_mode" = "false",
  "format" = "json",
  "strip_outer_array" = "true",
  "jsonpaths" = "[\"$.id\",\"$.kind\",\"$.localEndpoint.serviceName\",\"$.traceId\",\"$.name\",\"$.timestamp\",\"$.duration\",\"$.localEndpoint.ipv4\",\"$.remoteEndpoint.ipv4\",\"$.remoteEndpoint.port\",\"$.shared\",\"$.parentId\",\"$.tags.\\\"http.path\\\"\",\"$.tags.\\\"http.method\\\"\",\"$.tags.\\\"mvc.controller.class\\\"\",\"$.tags.\\\"mvc.controller.method\\\"\",\"$.tags.error\"]"
)
FROM
  KAFKA (
    "kafka_broker_list" = "IP1:PORT1,IP2:PORT2,IP3:PORT3",
    "kafka_topic" = "XXXXXXXXX"
  );

Flink 溯源 Parent ID

针对调用链路性能瓶颈剖析场景中,应用 Flink 进行 Parent ID 溯源,代码示例如下:

env
  // 增加 kafka 数据源
  .addSource(getKafkaSource())
  // 将采集到的 Json 字符串转换为 JSONArray,// 这个 JSONArray 是从单个服务采集的信息,外面会蕴含多个 Trace 的 Span 信息
  .map(JSON.parseArray(_))
  // 将 JSONArray 转换为 JSONObject,每个 JSONObejct 就是一个 Span
  .flatMap(_.asScala.map(_.asInstanceOf[JSONObject]))
  // 将 Span 的 JSONObject 对象转换为 Bean 对象
  .map(jsonToBean(_))
  // 以 traceID+localEndpoint_serviceName 作为 key 对 span 进行分区生成 keyed stream
  .keyBy(span => keyOfTrace(span))
  // 应用会话窗口,将同一个 Trace 的不同服务上的所有 Span,散发到同一个固定距离的 processing-time 窗口
  // 这里为了实现简略,应用了 processing-time session 窗口,后续咱们会应用 starrocks 的 UDAF 函数进行优化,去掉对 Flink 的依赖
  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
  // 应用 Aggregate 窗口函数
  .aggregate(new TraceAggregateFunction)
  // 将通过溯源的 span 汇合开展,便于调用 flink-connector-starrocks
  .flatMap(spans => spans)
  // 应用 flink-connector-starrocks sink,将数据写入 starrocks 中
  .addSink(
    StarRocksSink.sink(StarRocksSinkOptions.builder().withProperty("XXX", "XXX").build()))

剖析计算

以 图 6 作为一个微服务零碎用例,给出各个统计分析场景对应的 StarRocks SQL 语句。

服务内剖析

上游服务申请指标统计

上面的 SQL 应用 Zipkin 表数据,计算服务 Service2 申请上游服务 Service3 和上游服务 Service4 的查问统计信息,按小时和接口分组统计查问指标

select
  hr,
  name,
  req_count,
  timeout / req_count * 100 as timeout_rate,
  error_count / req_count * 100 as error_rate,
  avg_duration,
  tp95,
  tp99
from
  (
    select
      hr,
      name,
      count(1) as req_count,
      AVG(duration) / 1000 as avg_duration,
      sum(if(duration > 200000, 1, 0)) as timeout,
      sum(tag_error) as error_count,
      percentile_approx(duration, 0.95) / 1000 AS tp95,
      percentile_approx(duration, 0.99) / 1000 AS tp99
    from
      zipkin
    where
      localEndpoint_serviceName = 'Service2'
      and kind = 'CLIENT'
      and dt = 20220105
    group by
      hr,
      name
  ) tmp
order by
  hr

上游服务响应指标统计

上面的 SQL 应用 Zipkin 表数据,计算服务 Service2 响应上游服务 Service1 的查问统计信息,按小时和接口分组统计查问指标。

select
  hr,
  name,
  req_count,
  timeout / req_count * 100 as timeout_rate,
  error_count / req_count * 100 as error_rate,
  avg_duration,
  tp95,
  tp99
from
  (
    select
      hr,
      name,
      count(1) as req_count,
      AVG(duration) / 1000 as avg_duration,
      sum(if(duration > 200000, 1, 0)) as timeout,
      sum(tag_error) as error_count,
      percentile_approx(duration, 0.95) / 1000 AS tp95,
      percentile_approx(duration, 0.99) / 1000 AS tp99
    from
      zipkin
    where
      localEndpoint_serviceName = 'Service2'
      and kind = 'SERVER'
      and dt = 20220105
    group by
      hr, 
      name
  ) tmp
order by
  hr

服务外部解决剖析

上面的 SQL 应用 Zipkin 表数据,查问服务 Service2 的接口 /2/api,按 Span Name 分组统计 Duration 等信息。

with 
spans as (select * from zipkin where dt = 20220105 and localEndpoint_serviceName = "Service2"),
api_spans as (
  select
    spans.id as id,
    spans.parentId as parentId,
    spans.name as name,
    spans.duration as duration
  from
    spans
    inner JOIN 
    (select * from spans where kind = "SERVER" and name = "/2/api") tmp 
    on spans.traceId = tmp.traceId
)
SELECT
  name,
  AVG(inner_duration) / 1000 as avg_duration,
  percentile_approx(inner_duration, 0.95) / 1000 AS tp95,
  percentile_approx(inner_duration, 0.99) / 1000 AS tp99
from
  (
    select
      l.name as name,
      (l.duration - ifnull(r.duration, 0)) as inner_duration
    from
      api_spans l
      left JOIN 
      api_spans r 
      on l.parentId = r.id
  ) tmp
GROUP BY
  name

服务间剖析

服务拓扑统计

上面的 SQL 应用 Zipkin 表数据,计算服务间的拓扑关系,以及服务间接口 Duration 的统计信息。

with tbl as (select * from zipkin where dt = 20220105)
select 
  client, 
  server, 
  name,
  AVG(duration) / 1000 as avg_duration,
  percentile_approx(duration, 0.95) / 1000 AS tp95,
  percentile_approx(duration, 0.99) / 1000 AS tp99
from
  (
    select
      c.localEndpoint_serviceName as client,
      s.localEndpoint_serviceName as server,
      c.name as name,
      c.duration as duration
    from
    (select * from tbl where kind = "CLIENT") c
    left JOIN 
    (select * from tbl where kind = "SERVER") s 
    on c.id = s.id and c.traceId = s.traceId
  ) as tmp
group by 
  client,  
  server,
  name

调用链路性能瓶颈剖析

上面的 SQL 应用 zipkin_trace_perf 表数据,针对某个服务接口响应超时的查问申请,统计出每次申请的调用链路中解决耗时最长的服务或服务间调用,进而剖析出性能热点是在某个服务或服务间调用。

select
  service,
  ROUND(count(1) * 100 / sum(count(1)) over(), 2) as percent
from
  (
    select
      traceId,
      service,
      duration,
      ROW_NUMBER() over(partition by traceId order by duration desc) as rank4
    from
      (
        with tbl as (
          SELECT
            l.traceId as traceId,
            l.id as id,
            l.parentId as parentId,
            l.kind as kind,
            l.duration as duration,
            l.localEndpoint_serviceName as localEndpoint_serviceName
          FROM
            zipkin_trace_perf l
            INNER JOIN 
            zipkin_trace_perf r 
            on l.traceId = r.traceId
              and l.dt = 20220105
              and r.dt = 20220105
              and r.tag_error = 0     -- 过滤掉出错的 trace
              and r.localEndpoint_serviceName = "Service1"
              and r.name = "/1/api"
              and r.kind = "SERVER"
              and r.duration > 200000  -- 过滤掉未超时的 trace
        )
        select
          traceId,
          id,
          service,
          duration
        from
          (
            select
              traceId,
              id,
              service,
              (c_duration - s_duration) as duration,
              ROW_NUMBER() over(partition by traceId order by (c_duration - s_duration) desc) as rank2
            from
              (
                select
                  c.traceId as traceId,
                  c.id as id,
                  concat(c.localEndpoint_serviceName, "=>", ifnull(s.localEndpoint_serviceName, "?")) as service,
                  c.duration as c_duration,
                  ifnull(s.duration, 0) as s_duration
                from
                  (select * from tbl where kind = "CLIENT") c
                  left JOIN 
                  (select * from tbl where kind = "SERVER") s 
                  on c.id = s.id and c.traceId = s.traceId
              ) tmp1
          ) tmp2
        where
          rank2 = 1
        union ALL
        select
          traceId,
          id,
          service,
          duration
        from
          (
            select
              traceId,
              id,
              service,
              (s_duration - c_duration) as duration,
              ROW_NUMBER() over(partition by traceId order by (s_duration - c_duration) desc) as rank2
            from
              (
                select
                  s.traceId as traceId,
                  s.id as id,
                  s.localEndpoint_serviceName as service,
                  s.duration as s_duration,
                  ifnull(c.duration, 0) as c_duration,
                  ROW_NUMBER() over(partition by s.traceId, s.id order by ifnull(c.duration, 0) desc) as rank
                from
                  (select * from tbl where kind = "SERVER") s
                  left JOIN 
                  (select * from tbl where kind = "CLIENT") c 
                  on s.id = c.parentId and s.traceId = c.traceId
              ) tmp1
            where
              rank = 1
          ) tmp2
        where
          rank2 = 1
      ) tmp3
  ) tmp4
where
  rank4 = 1
GROUP BY
  service
order by
  percent desc

SQL 查问的后果如下图所示,在超时的 Trace 申请中,性能瓶颈服务或服务间调用的比例散布。

图 12

03 实际成果

目前搜狐智能媒体已在 30+ 个服务中接入 Zipkin,涵盖上百个线上服务实例,1% 的采样率每天产生近 10 亿 多行的日志。

通过 Zipkin Server 查问 StarRocks,获取的 Trace 信息如下图所示:

图 13

通过 Zipkin Server 查问 StarRocks,获取的服务拓扑信息如下图所示:

图 14

基于 Zipkin StarRocks 的链路追踪体系实际过程中,显著晋升了微服务监控剖析能力和工程效率:

晋升微服务监控剖析能力

  • 在监控报警方面,能够基于 StarRocks 查问统计线上服务以后时刻的响应提早百分位数、错误率等指标,依据这些指标及时产生各类告警;
  • 在指标统计方面,能够基于 StarRocks 按天、小时、分钟等粒度统计服务响应提早的各项指标,更好的理解服务运行状况;
  • 在故障剖析方面,基于 StarRocks 弱小的 SQL 计算能力,能够进行服务、工夫、接口等多个维度的摸索式剖析查问,定位故障起因。

晋升微服务监控工程效率

Metric 和 Logging 数据采集,很多须要用户手动埋点和装置各种采集器 Agent,数据采集后存储到 ElasticSearch 等存储系统,每上一个业务,这些流程都要操作一遍,十分繁琐,且资源扩散不易治理。

而应用 Zipkin + StarRocks 的形式,只需在代码中引入对应库 SDK,设置上报的 Kafka 地址和采样率等大量配置信息,Tracing 便可主动埋点采集,通过 zikpin server 界面进行查问剖析,十分简便。

04 总结与瞻望

基于 Zipkin+StarRocks 构建链路追踪零碎,可能提供微服务监控的 Monitoring 和 Observability 能力,晋升微服务监控的剖析能力和工程效率。
后续有几个 优化点,能够进一步晋升链路追踪零碎的剖析能力和易用性:

  1. 应用 StarRocks 的 UDAF、窗口函数等性能,将 Parent ID 溯源下沉到 StarRocks 计算,通过计算后置的形式,勾销对 Flink 的依赖,进一步简化整个零碎架构。
  2. 目前对原始日志中的 tag s 等字段,并没有齐全采集,StarRocks 正在实现 Json 数据类型,可能更好的反对 tags 等嵌套数据类型。
  3. Zipkin Server 目前的界面还稍显简陋,咱们曾经买通了 Zipkin Server 查问 StarRokcs,后续会对 Zipkin Server 进行 U I 等优化,通过 StarRocks 弱小的计算能力实现更多的指标查问,进一步晋升用户体验。

05 参考文档

  1. 《云原生计算重塑企业 IT 架构 – 分布式应用架构》:
    https://developer.aliyun.com/article/717072
  2. What is Upstream and Downstream in Software Development?
    https://reflectoring.io/upstream-downstream/
  3. Metrics, tracing, and logging:
    https://peter.bourgon.org/blog/2017/02/21/metrics-tracing-and-logging.html
  4. The 3 pillars of system observability:logs, metrics and tracing:
    https://iamondemand.com/blog/the-3-pillars-of-system-observability-logs-metrics-and-tracing/
  5. observability 3 ways: logging, metrics and tracing:
    https://speakerdeck.com/adriancole/observability-3-ways-logging-metrics-and-tracing
  6. Dapper, a Large-Scale Distributed Systems Tracing Infrastructure:
    https://static.googleusercontent.com/media/research.google.com/en//archive/papers/dapper-2010-1.pdf
  7. Jaeger:www.jaegertracing.io
  8. Zipkin:https://zipkin.io/
  9. opentracing.io:
    https://opentracing.io/docs/
  10. opencensus.io:
    https://opencensus.io/
  11. opentelemetry.io:
    https://opentelemetry.io/docs/
  12. Microservice Observability, Part 1: Disambiguating Observability and Monitoring:
    https://bravenewgeek.com/microservice-observability-part-1-disambiguating-observability-and-monitoring/
  13. How to Build Observable Distributed Systems:
    https://www.infoq.com/presentations/observable-distributed-ststems/
  14. Monitoring and Observability:
    https://copyconstruct.medium.com/monitoring-and-observability-8417d1952e1c
  15. Monitoring Isn’t Observability:
    https://orangematter.solarwinds.com/2017/09/14/monitoring-isnt-observability/
  16. Spring Cloud Sleuth Documentation:
    https://docs.spring.io/spring-cloud-sleuth/docs/current-SNAPSHOT/reference/html/getting-started.html#getting-started
正文完
 0