共计 10535 个字符,预计需要花费 27 分钟才能阅读完成。
参考: dapeng 日志的收集处理及查询应用
最近照着 dapeng 的日志管理系统, 想着自己也来搭建一个 EFK 试试. 在这里记录一下自己的踩坑经历, 也非常感谢 Ever_00、洋洋_3720 等大佬们的支持和帮助
技术选型
回归正题, 我们选取的依旧是 fluent-bit+fluentd+kafka+elasticsearch 作为日志系统的方案, 其中 dapeng 服务已经集成了单结点 fluent-bit 收集各个 docker 容器中的日志文件发往 fluentd, fluentd 做为中转收集所有的日志发往 kafak 用于削峰填谷,削峰后的数据再经由 fluentd 发送给 elasticsearch 进行存储. 此次搭建过程中没有使用 Kibana, 而是使用 elasticsearch-head 来做日志界面展示.
对于非 dapeng 服务需要自己修改 Dockerfile, 将修改过得 fluent-bit 打包到容器 服务启动时运行
sh /opt/fluent-bit/fluent-bit.sh
即可
fluent-bit 的日志收集配置
fluent-bit-dapeng.conf
[SERVICE]
Flush 5
Daemon On
Log_Level error
Log_File /fluent-bit/log/fluent-bit.log
Parsers_File parse_dapeng.conf
[INPUT]
Name tail
Path /dapeng-container/logs/*.log
Exclude_Path /dapeng-container/logs/fluent*.log,/dapeng-container/logs/console.log,/dapeng-container/logs/gc*.log
Tag dapeng
Multiline on
Buffer_Chunk_Size 2m
buffer_max_size 30m
Mem_Buf_Limit 32m
DB.Sync Normal
db_count 400
Parser_Firstline dapeng_multiline
db /fluent-bit/db/logs.db
[FILTER]
Name record_modifier
Match *
Record hostname ${soa_container_ip}
Record tag ${serviceName}
[OUTPUT]
Name Forward
Match *
Host fluentd
Port 24224
HostStandby fluentdStandby
PortStandby 24224
在 dapeng 服务中, 对于每个服务 serviceName
, soa_container_ip
, fluentd
, fluentdStandby
的配置必不可少, 其中的 Path, Exclude_Path 用来指定哪些日志需要收集, 哪些需要过滤, 可以通过环境变量来修改:
fluentBitLogPath=/dapeng-container/logs/*.log
fluentBitLogPathExclude=/dapeng-container/logs/fluent*.log,/dapeng-container/logs/console.log,/dapeng-container/logs/gc*.log
同时需要将上面的 fluent-bit-dapeng.conf
挂载到 /opt/fluent-bit/etc/fluent-bit.conf
environment:
- serviceName=payment
- container_ip=${host_ip}
- soa_container_port=${payment_port}
- soa_container_ip=${host_ip}
- host_ip=${host_ip}
- soa_service_timeout=60000
- JAVA_OPTS=-Dname=payment -Dfile.encoding=UTF-8 -Dsun.jun.encoding=UTF-8 -Dio.netty.leakDetectionLevel=advanced
- kafka_consumer_host=${kafka_host_ip}:9092
- kafka_producer_host=${kafka_host_ip}:9092
env_file:
- .envs/application.env
- .envs/common.env
volumes:
- "/data/logs/payment:/dapeng-container/logs"
- "/data/var/fluent/order/:/fluent-bit/db/"
- "./config/fluent-bit-dapeng.conf:/opt/fluent-bit/etc/fluent-bit.conf"
- "/data/var/shm:/data/shm"
ports:
- "${payment_port}:${payment_port}"
extra_hosts:
- "fluentd:${fluentd_host}"
- "fluentdStandby:${fluentdStandby_host}"
- "db-master:${mysql_host_ip}"
- "soa_zookeeper:${zookeeper_host_ip}"
- "redis_host:${redis_host_ip}"
同时在 dapeng 服务容器中可以看到 parse_dapeng.conf 如下
[PARSER]
Name dapeng_multiline
Format regex
Regex (?<logtime>\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2} \d{1,3}) (?<threadPool>.*) (?<level>.*) \[(?<sessionTid>.*)\] - (?<message>.*)
其中的 Regex 就是对日志进行正则匹配解析出我们需要的信息, 例如其中的 logtime, message 等
我们也可以通过环境变量来设置解析表达式
fluentbitParserRegex=(?<logtime>^\d{2}-\d{2} \d{2}:\d{2}:\d{2} \d{3}) (?<threadPool>[^]+|Check idle connection Thread) (?<level>[^]+) \[(?<sessionTid>\w*)\] - (?<message>.*)
注意: 虽然 dapeng 集成了 fluent-bit, 但是默认是不开启的, 需要修改环境变量:
fluent_bit_enable=true
fluentd 的镜像
首先是准备 fluentd 的镜像, 以下是 fluentd 的 Dockerfile
FROM fluent/fluentd:v1.2
#增加 es 插件, kafka 插件
RUN fluent-gem install fluent-plugin-elasticsearch
RUN fluent-gem install fluent-plugin-kafka
CMD exec fluentd -c /fluentd/etc/${FLUENTD_CONF} -p /fluentd/plugins $FLUENTD_OPT
- 打包 image (注意在 Dockerfile 所在目录,
.
即代表 Dockerfile 的上下文)docker build docker.****.com:80/basic/fluentd:v1.2 .
- push 到 docker 私服
docker push docker.****.com:80/basic/fluentd:v1.2
- dc-all.yml 文件中配置 fluentd (dapeng 的
source-compose
封装了docker-compose
)
fluentd:
container_name: fluentd
image: docker.****.com:80/basic/fluentd:v1.2
restart: on-failure:3
volumes:
- /data/var/fluentd/log:/fluentd/log
- /data/var/fluentd/etc:/fluentd/etc
environment:
- LANG=zh_CN.UTF-8
- TZ=CST-8
ports:
- "24224:24224"
labels:
- project.source=
- project.extra=public-image
- project.depends=
- project.owner=
对于 fluentd 的相关配置配置在 /data/var/fluentd/etc 下 fluent.conf
配置 fluentd 的转发器
理论上需要开启两个 fluentd, 分别做下面 1 和 2 的工作, 这里我们先合并到一个服务当中
# 1. 收集日志发送到 kafka, topic 为 efk
# 开启 8 个工作线程, 端口从 24225 往后累加
<system>
log_level error
flush_thread_count 8
workers 8
</system>
<source>
@type forward
port 24224
</source>
<source>
@type monitor_agent
port 24225
</source>
<match dapeng>
@type kafka_buffered
brokers kafak 服务器地址:9092
topic_key efk
buffer_type file
buffer_path /tmp/buffer
flush_interval 5s
default_topic efk
output_data_type json
compression_codec gzip
max_send_retries 3
required_acks -1
discard_kafka_delivery_failed true
</match>
# 1. 收集日志发送到 kafka, topic 为 efk 结尾
# 2. 消费 kafka 中的日志消息发送到 elasticsearch, topic 为 efk, group 为 efk-consumer
#<system>
# log_level error
# flush_thread_count 2
# workers 2
#</system>
#<source>
# @type monitor_agent
# port 24225
#</source>
<source>
@type kafka_group
brokers kafka 服务器地址:9092
consumer_group efk-consumer
topics efk
format json
start_from_beginning false
max_wait_time 5
max_bytes 1500000
</source>
<match>
@type elasticsearch
hosts elasticsearch 服务器地址:9200
index_name dapeng_log_index
type_name dapeng_log
#content_type application/x-ndjson
buffer_type file
buffer_path /tmp/buffer_file
buffer_chunk_limit 10m
buffer_queue_limit 512
flush_mode interval
flush_interval 5s
request_timeout 5s
flush_thread_count 2
reload_on_failure true
resurrect_after 30s
reconnect_on_error true
with_transporter_log true
logstash_format true
logstash_prefix dapeng_log_index
template_name dapeng_log_index
template_file /fluentd/etc/template.json
num_threads 2
utc_index false
</match>
# 2. 消费 kafka 中的日志消息发送到 elasticsearch 结尾
template.json
配置 elasticsearch 关于索引创建的模板
{
"template": "dapeng_log_index-*",
"mappings": {
"dapeng_log": {
"properties": {
"logtime": {
"type": "date",
"format": "MM-dd HH:mm:ss SSS"
},
"threadPool": {
"type": "keyword",
"norms": false,
"index_options": "docs"
},
"level": {
"type": "keyword",
"norms": false,
"index_options": "docs"
},
"tag": {
"type": "keyword",
"norms": false,
"index_options": "docs"
},
"message": {
"type": "keyword",
"ignore_above": 2048,
"norms": false,
"index_options": "docs"
},
"hostname": {
"type": "keyword",
"norms": false,
"index_options": "docs"
},
"sessionTid": {
"type": "keyword",
"norms": false,
"index_options": "docs"
},
"log": {
"type": "keyword",
"norms": false,
"index_options": "docs"
}
}
}
},
"settings": {
"index": {
"max_result_window": "100000000",
"number_of_shards": "3",
"number_of_replicas": "1",
"codec": "best_compression",
"translog": {
"sync_interval": "60s",
"durability": "async",
"flush_threshold_size": "1024mb"
},
"merge":{
"policy":{"max_merged_segment": "2gb"}
},
"refresh_interval": "10s"
}
},
"warmers": {}}
elasticsearch-head 的镜像准备
- 首先 clone elasticsearch-head 项目到 /data/workspace 目录下
git clone git://github.com/mobz/elasticsearch-head.git
- dc-all.yml 文件中配置 elasticsearch-head
elasticsearch-head:
image: mobz/elasticsearch-head:5
container_name: elasticsearch-head
restart: on-failure:3
environment:
- LANG=zh_CN.UTF-8
- TZ=CST-8
volumes:
- /data/workspace/elasticsearch-head/Gruntfile.js:/usr/src/app/Gruntfile.js
- /data/workspace/elasticsearch-head/_site/app.js:/usr/src/app/_site/app.js
ports:
- "9100:9100"
labels:
- project.source=
- project.extra=public-image
- project.depends=
- project.owner=
对于 Gruntfile.js
需要改动 97 行如下:
connect: {
server: {
options: {
hostname: '0.0.0.0',
port: 9100,
base: '.',
keepalive: true
}
}
}
对于 app.js
需要改动 4379 行: 修改 localhost 为 elasticsearch 集群地址
/** 修改 localhost 为 elasticsearch 集群地址,Docker 部署中,一般是 elasticsearch 宿主机地址 */
this.base_uri = this.config.base_uri || this.prefs.get("app-base_uri") || "http://elasticsearch 服务器地址:9200/";
elasticsearch 的镜像准备
- dc-all.yml 文件中关于 elasticsearch 的配置
elasticsearch:
image: elasticsearch:6.7.1
container_name: elasticsearch
restart: on-failure:3
environment:
- LANG=zh_CN.UTF-8
- TZ=CST-8
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- /data/var/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
ports:
- "9200:9200"
- "9300:9300"
labels:
- project.source=
- project.extra=public-image
- project.depends=
- project.owner=
elasticsearch.yml
配置启用 cors 跨域访问, 就可以通过 elasticsearch-head 来访问 elasticsearch 了
cluster.name: "docker-cluster"
network.host: 0.0.0.0
http.cors.enabled: true
http.cors.allow-origin: "*"
http.cors.allow-methods: OPTIONS, HEAD, GET, POST, PUT, DELETE
http.cors.allow-headers: "X-Requested-With, Content-Type, Content-Length, X-User"
elasticsearch 启动报错:
max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
即: elasticsearch 用户拥有的内存权限太小,至少需要 262144
sudo vi /etc/sysctl.conf
文件最后添加一行: vm.max_map_count=262144
然后 sudo sysctl -p
重新加载配置, 再重启 elasticsearch 即可
服务启动
将以上的服务全部启动之后, 访问 http://elasticsearch-head 服务器地址:9100/
, 可以看到如下界面(集群健康值为黄色, 是因为我没有做备份)
当然, 刚开始因为没有创建索引, 所以是看不到日志的, 我们可以加一个定时任务每天自动创建索引并处理掉之前的索引:autoIndex4DapengLog.sh
: 定期保存 7 天的索引, 打开最近三天的索引, 创建第二天的索引
#!/bin/bash
#
# 索引关闭及删除
# @date 2018 年 05 月 10 日 18:00:00
# @description Copyright (c) 2015, github.com/dapeng-soa All Rights Reserved.
date=`date -d "2 days ago" +%Y.%m.%d`
date1=`date -d "6 days ago" +%Y.%m.%d`
echo $date
echo $date1
#关闭索引
curl -H "Content-Type: application/json" -XPOST http://elasticsearch 服务器地址:9200/dapeng_log_index-$date/_close
#删除索引
curl -H "Content-Type: application/json" -XDELETE "http://elasticsearch 服务器地址:9200/dapeng_log_index-$date1"
#添加索引
tomorrow=`date -d tomorrow +%Y.%m.%d`
# 需要创建索引的 elasticsearch 服务器列表
ipList=(elasticsearch 服务器地址:9200)
for i in ${ipList[@]};do
curl -H "Content-Type: application/json" -XPUT http://$i/dapeng_log_index-$tomorrow -d'{"mappings": {"_default_": {"_all": {"enabled":"false"}
},
"dapeng_log": {
"properties": {
"logtime": {
"type": "date",
"format": "MM-dd HH:mm:ss SSS"
},
"threadPool": {
"type": "keyword",
"norms": false,
"index_options": "docs"
},
"level": {
"type": "keyword",
"norms": false,
"index_options": "docs"
},
"tag": {
"type": "keyword",
"norms": false,
"index_options": "docs"
},
"message": {
"type": "keyword",
"ignore_above": 2048,
"norms": false,
"index_options": "docs"
},
"hostname": {
"type": "keyword",
"norms": false,
"index_options": "docs"
},
"sessionTid": {
"type": "keyword",
"norms": false,
"index_options": "docs"
},
"log": {
"type": "keyword",
"norms": false,
"index_options": "docs"
}
}
}
},
"settings": {
"index": {
"max_result_window": "100000000",
"number_of_shards": "3",
"number_of_replicas": "1",
"codec": "best_compression",
"translog": {
"sync_interval": "60s",
"durability": "async",
"flush_threshold_size": "1024mb"
},
"merge":{
"policy":{"max_merged_segment": "2gb"}
},
"refresh_interval": "10s"
}
},
"warmers": {}}'response=`curl -H"Content-Type: application/json"-s"http://$i/_cat/indices?v" |grep open | grep dapeng_log_index-$tomorrow |wc -l`
echo -e "\n"
if ["$response" == 1];then
break
else
continue
fi
done;
crontab -e
将此命令加入定时任务, 每天 23:00 定时执行, 创建第二天的索引:
0 23 * * * (cd /data/workspace/elasticsearch-head/; sh autoIndex4DapengLog.sh) > /data/workspace/elasticsearch-head/autoIndex4DapengLog.log
现在就可以在 查看日志数据了
如果想去除 elasticsearch 自带的一些字段信息 (例如_index, _id, _score 等) 显示在表格中, 需要修改 elasticsearch-head/_site/app.js, 改动 2038 行如下:
_data_handler: function(store) {
// 去除结果集中无用字段
var customFields = ["logtime", "hostname", "tag", "sessionTid", "threadPool", "level", "message", "log"];
store.columns = customFields;
//store.columns = store.columns.filter(i => customFields.indexOf(i) > -1);
this.tools.text(store.summary);
this.headers.empty().append(this._header_template(store.columns));
this.body.empty().append(this._body_template(store.data, store.columns));
this._reflow();},
注意 customFields
中的字段和创建索引时的字段一致, 且其中部分字段是由 fluent-bit 解析得到的
todo
- 当然 这边图中也可以看到一些字段为空的数据, 将其中的 log 字段的值拷贝到 https://regex101.com/ 网站进行解析, 发现和之前的正则解析
Regex
不匹配, 所以部分字段没有解析到值, 无法解析的部分内容就在 log 中, 后续需要将这些内容过滤掉 - 基于现有的日志系统,开发生产故障实时告警系统