图解 Fluent Bit 外部设计
本文摘录自我的开源书:《Mark’s DevOps 雜碎》中的 图解 Fluent Bit 外部设计。如果你看到图片不清晰,请转回原文。
前言
最近,因为工作须要,用老程序员在 🐑 羊之前最快的速度,通过文档、源码,学习了 Fluent Bit 的一些外围实现设计。正所谓,学得快,忘得更快。固作本笔记,以备未来学不动。
本文指标
本文并 <mark> 不是要介绍 Fluent Bit</mark>。也 <mark> 不是学习如何应用它 </mark>(当然,学习实现自身就是为更好地应用它的性能,同时更好地躲避它的问题。即:<mark> 让 3PP 的应用预期可控 </mark>)。本文次要是初步简略钻研 Fluent Bit 外部实现后的总结与笔记。次要是从繁冗的文档与代码中,找到一点全局观、根底概念,以让学习的人不至于迷路其中细节。
本文是包含学习到的局部设计。Fluent Bit 虽自称 lightweight,其实实现并不简略。
因为学习工夫无限,应用教训也不多,所以内容不免有错。应用请审慎!
对读者的假如
假如读者曾经理解过 Fluent Bit。想看看其实现机理,以提前发现可能潜在的坑。
互动图片
📢 本文的失常关上办法是,点击“用 Draw.io 关上 ”后,进入互动图片状态。图中很多元素提供链接到相干源码或文档。能够做穿插参考,是进一步深刻的入口,也是图可信性取证。
本文的大部分内容是放在图中了。看图比文字更重要。
什么是 Fluent Bit
Fluent Bit 官网 这样介绍本人:
Fluent Bit 是一种超疾速、轻量级且高度可扩大的日志记录和指标处理器和转发器。它是云和容器化环境的首选。
而在开源网站
Fluent Bit 是实用于 Linux、Windows、嵌入式 Linux、MacOS 和 BSD 系列操作系统的疾速日志处理器和转发器。它是 Graduated Fluentd 生态系统和 CNCF 子项目的一部分。Fluent Bit 容许从不同起源收集日志事件或指标,对其进行解决并将它们传送到不同的后端,例如 Fluentd、Elasticsearch、Splunk、DataDog、Kafka、New Relic、Azure 服务、AWS 服务、Google 服务、NATS、InfluxDB 或任何自定义 HTTP 端点。
概念
上面只介绍本文用到的局部概念。
Record 概念
能够简化认为,日志文件中的一行,就是一个 Record
。外部以 json 树模式来记录一个 Record
。
为进步内存中的 Record
数据密度,同时减速 json 构造树的拜访。Fluent Bit 外部应用了 MessagePack
格局在内存与硬盘中保留数据。所以,请留神不是用咱们日常见的明文 json 格局。可能如果要比拟精密评估 Fluent Bit 内存应用时,须要思考这一点。
Chunk 概念
为进步解决性能,Fluent Bit 每次以小批量的 Record
为单位解决数据。每个批叫 Chunk
。他是 Record
的汇合。
数据在由 Input Plugin 加载入内存时,就曾经是以批(Chunk
) 的模式了。加载后,经由 pipeline、最初再到 Output,均以 Chunk 为粒度去解决(这里我未齐全必定)。
上面阐明一下代码与存储的构造:
图:Chunk 定义 用 Draw.io 关上
Pipeline/Engine 概念
图:Engine 概念 用 Draw.io 关上
Input
形象的输出源。
Output
形象的输入源。
Tail Input
图:Tail Input 概述 用 Draw.io 关上
Tail Input 外部设计
图:Tail Input 外部设计 用 Draw.io 关上
图中曾经比拟具体了,这里只想补充一些基本概念。
对于毎一个 Tail Input 实例,均有以下协程 (Collector):
- watcher collector process
- static file collectior process
- pending file collector process
对于每一个 Tail Input 实例,还有以下协程:
- input path scan process
以下是一些揣测的流程:
input path scan process
的主要职责是按Tail Input
的path
配置要求,定时 (Refresh_Interval
) 扫描,发现文件的:新增等状况。而后把发现告诉到static file collectior
static file collectior
首先应用 inotify 去 watch 文件。而后尝试一次读完文件,如果因各种起因无奈一次实现(如内存不足),会告诉到pending file collector
去异步实现pending file collector
实现文件的读取- Linux Kernel 在监测到文件有写入 (
IN_MODIFY
) 时,发马上读取文件。当发现文件被删除 (IN_MOVE_SELF
) 时,会进行文件的监控、读取、并敞开 fd。
下面未剖析的,包含 rotate (rename) 的场景。
仔细如你,可能会放心下面的协程会否同时读取文件或更新状态,引动竞态条件(多线程)问题。这个曾经由上面的事件事件驱动与协程框架解决了。
事件驱动与协程
以下例子场景,应用了 Fluent Bit 1.99 与其
Tail Input
+Http Output
$ top -H -p $(pgrep fluent-bit)
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
27 226099 20 0 417804 67096 9240 S 0.000 0.069 0:02.13 fluent-bit
35 226099 20 0 417804 67096 9240 S 0.000 0.069 1:16.61 flb-pipeline
37 226099 20 0 417804 67096 9240 S 0.000 0.069 0:06.69 flb-logger
45 226099 20 0 417804 67096 9240 S 0.000 0.069 0:11.58 flb-out-http.0-
46 226099 20 0 417804 67096 9240 S 0.000 0.069 0:11.70 flb-out-http.0-
47 226099 20 0 417804 67096 9240 S 0.000 0.069 0:00.00 monkey: server
48 226099 20 0 417804 67096 9240 S 0.000 0.069 0:03.17 monkey: clock
49 226099 20 0 417804 67096 9240 S 0.000 0.069 0:23.82 monkey: wrk/0
用 top -H
能够看到 fluent bit 过程的原生线程列表。PID
列即系线程的 id,而起码的线程 PID 同时作为过程的 PID。其中比拟有意思的是 TIME+
字段。这示意花在这个线程上的 CPU 计算工夫。以下是揣测:
flb-pipeline
: 日志解决与输入monkey: wrk/0
: 日志文件读取
什么是 monkey ?
https://github.com/monkey/monkey
Monkey is a fast and lightweight Web Server for Linux. It has been designed to be very scalable with low memory and CPU consumption, the perfect solution for Embedded Linux and high end production environments.
Besides the common features as HTTP server, it expose a flexible C API which aims to behave as a fully HTTP development framework, so it can be extended as desired through the plugins interface.
For more details please refer to the official documentation.
Fluent Bit 中,次要是用了其协程和事件驱动封装的性能。协程的实现设计上有一点点相似 Golang。上图的线程名中 monkey: wrk/0
。可见,是在计算量大时,能够为协程减少必要的线程来反对计算。从代码看,仿佛协程的换出点(schedule) 是在 file descriptor(fd)
的读写点上,实现上 monkey 仿佛是应用了 epoll 去多路复用 fd 汇合。协程间的同步通信由 linux 的匿名 pipe + epoll 实现。即,线程事实上是期待在一个多路复用的 epoll 事件上。
查看各线程的内核 stack:
root@root-mylab-worker006:/proc/27/task> sudo cat ./35/stack
[<0>] ep_poll+0x3d4/0x4d0
[<0>] do_epoll_wait+0xab/0xc0
[<0>] __x64_sys_epoll_wait+0x1a/0x20
[<0>] do_syscall_64+0x5b/0x1e0
[<0>] entry_SYSCALL_64_after_hwframe+0x44/0xa9
root@root-mylab-worker006:/proc/27/task> sudo cat ./49/stack
[<0>] ep_poll+0x3d4/0x4d0
[<0>] do_epoll_wait+0xab/0xc0
[<0>] __x64_sys_epoll_wait+0x1a/0x20
[<0>] do_syscall_64+0x5b/0x1e0
[<0>] entry_SYSCALL_64_after_hwframe+0x44/0xa9
root@root-mylab-worker006:/proc/27/task> sudo cat ./48/stack
[<0>] hrtimer_nanosleep+0x9a/0x140
[<0>] common_nsleep+0x33/0x50
[<0>] __x64_sys_clock_nanosleep+0xc4/0x120
[<0>] do_syscall_64+0x5b/0x1e0
[<0>] entry_SYSCALL_64_after_hwframe+0x44/0xa9
文件 fd 即事件源
如果你足够好奇,能够看看过程的 fd 列表:
bash-4.4$ cd /proc/27
bash-4.4$ cd fd
bash-4.4$ ls -l
total 0
lr-x------ 1 226099 226099 64 Dec 13 19:39 0 -> /dev/null
l-wx------ 1 226099 226099 64 Dec 13 19:39 1 -> 'pipe:[1066519386]'
l-wx------ 1 226099 226099 64 Dec 13 19:39 10 -> 'pipe:[1066519390]'
lr-x------ 1 226099 226099 64 Dec 13 19:39 100 -> anon_inode:inotify
lr-x------ 1 226099 226099 64 Dec 13 19:39 101 -> 'pipe:[1066516725]'
l-wx------ 1 226099 226099 64 Dec 13 19:39 102 -> 'pipe:[1066516725]'
lr-x------ 1 226099 226099 64 Dec 13 19:39 103 -> 'pipe:[1066516726]'
l-wx------ 1 226099 226099 64 Dec 13 19:39 104 -> 'pipe:[1066516726]'
lr-x------ 1 226099 226099 64 Dec 13 19:39 105 -> 'pipe:[1066516727]'
l-wx------ 1 226099 226099 64 Dec 13 19:39 106 -> 'pipe:[1066516727]'
lrwx------ 1 226099 226099 64 Dec 13 19:39 107 -> /var/logstash/db/myapp_mysub_pv.db
lr-x------ 1 226099 226099 64 Dec 13 19:39 108 -> anon_inode:inotify <---- intofiy
lr-x------ 1 226099 226099 64 Dec 13 19:39 109 -> 'pipe:[1066516745]'
lrwx------ 1 226099 226099 64 Dec 13 19:39 11 -> 'anon_inode:[eventpoll]' <----- epoll
l-wx------ 1 226099 226099 64 Dec 13 19:39 110 -> 'pipe:[1066516745]'
lr-x------ 1 226099 226099 64 Dec 13 19:39 111 -> 'pipe:[1066516746]'
l-wx------ 1 226099 226099 64 Dec 13 19:39 112 -> 'pipe:[1066516746]'
lr-x------ 1 226099 226099 64 Dec 13 19:39 113 -> 'pipe:[1066516747]'
l-wx------ 1 226099 226099 64 Dec 13 19:39 114 -> 'pipe:[1066516747]'
lrwx------ 1 226099 226099 64 Dec 13 19:39 115 -> /var/logstash/db/myapp_mysub_pv_outbound.db
...
lrwx------ 1 226099 226099 64 Dec 13 19:39 363 -> /var/logstash/db/myapp_mysub2.db-wal
lrwx------ 1 226099 226099 64 Dec 13 19:39 364 -> /var/logstash/db/myapp_mysub2.db-shm
...
lrwx------ 1 226099 226099 64 Dec 13 19:39 485 -> 'anon_inode:[timerfd]'
lrwx------ 1 226099 226099 64 Dec 13 19:39 486 -> 'anon_inode:[timerfd]'
lrwx------ 1 226099 226099 64 Dec 13 19:39 487 -> 'anon_inode:[timerfd]'
lrwx------ 1 226099 226099 64 Dec 13 19:39 488 -> 'anon_inode:[timerfd]'
...
lrwx------ 1 226099 226099 64 Dec 13 19:39 681 -> 'socket:[1067595164]'
lr-x------ 1 226099 226099 64 Dec 13 19:39 685 -> /var/logstash/mylog/Txlog.mylogB_0.log.2022-12-13-18
lr-x------ 1 226099 226099 64 Dec 13 19:39 686 -> /var/logstash/mylog/Txlog.mylogA_0.log.2022-12-13-19
对于重试
Fluent Bit 的 Output 插件,在尝试投递一个 Chunk 日志后,都会通知引擎,
上过生产环境战场的程序员,都明确这两件事的重要性和难度,并深谙此道:
- timeout
- retry
这里只说 retry。retry 多了(甚至有限),会呈现因一个(或一批)不可修复的 item 而卡数据或业务流的状况。我就亲眼无过没解决好 retry 卡了大量订单的状况(因一字段超过了接管方的最大长度)。
如果你不想因一棵坏的树而损失了一个森林,抉择什么时候跳过 (skip) 是要害。
Retryable error / Non-retryable error
对于实现上,如果咱们能够分明,什么谬误是值得重试的,什么不值得,则能够无效缩小出错时对数据流呑吐量的影响。Fluent Bit 大部分 Output Plugin 均在投递失败时,返回 Retryable error / Non-retryable error 给引擎以让引擎决定是否立刻放弃重试,进而缩小出错时对数据流呑吐量的影响。不过,对于 http output plugin,如同只有 Retryable error:
https://github.com/fluent/fluent-bit/blob/v1.9.9/plugins/out_http/http.c#L240
static int http_post(struct flb_out_http *ctx,
const void *body, size_t body_len,
const char *tag, int tag_len,
char **headers)
{
int ret;
int out_ret = FLB_OK;
...
ret = flb_http_do(c, &b_sent);
if (ret == 0) {
/*
* Only allow the following HTTP status:
*
* - 200: OK
* - 201: Created
* - 202: Accepted
* - 203: no authorative resp
* - 204: No Content
* - 205: Reset content
*
*/
if (c->resp.status < 200 || c->resp.status > 205) {
if (ctx->log_response_payload &&
c->resp.payload && c->resp.payload_size > 0) {
flb_plg_error(ctx->ins, "%s:%i, HTTP status=%i\n%s",
ctx->host, ctx->port,
c->resp.status, c->resp.payload);
}
else {
flb_plg_error(ctx->ins, "%s:%i, HTTP status=%i",
ctx->host, ctx->port, c->resp.status);
}
out_ret = FLB_RETRY;
}
else {
if (ctx->log_response_payload &&
c->resp.payload && c->resp.payload_size > 0) {
flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i\n%s",
ctx->host, ctx->port,
c->resp.status, c->resp.payload);
}
else {
flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i",
ctx->host, ctx->port,
c->resp.status);
}
}
}
else {flb_plg_error(ctx->ins, "could not flush records to %s:%i (http_do=%i)",
ctx->host, ctx->port, ret);
out_ret = FLB_RETRY;
}
总结
以上只是对 Fluent Bit 的 Input
/ Output
框架 与 Tail Input Plugin
的一些学习笔记。心愿在当前的应用中,能有更深刻的理解能够和大家分享。