写在开篇
本文只分享各个链路环节的配置对接,对于环境的搭建,比方 kafka 集群、es 集群的搭建等请自行实现。还有,业务利用的日志能够是你的其余业务日志,心愿本文能够起到抛砖援用的成果。
测试架构
这个架构形容了一个将来自不同数据源的数据通过 Kafka 直达,而后应用 Logstash 将数据从 Kafka 中读取并解决,最终将解决后的数据再写回到 Kafka 中,以供 Elasticsearch 进行存储和剖析的过程。通过应用 Kafka 和 Logstash,能够将来自不同数据源的数据进行集中管理和解决,并将数据以牢靠的形式发送到 Elasticsearch 进行存储和剖析。这种架构具备高可用性和可伸缩性,并且能够在解决大量数据时提供良好的性能。同时,因为 Logstash 能够从多种起源读取数据,因而能够适应各种数据集成计划的需要。
留神:kafka 集群 a 接管来自 filebeat 的音讯,并由 logstash 进行生产。kafka 集群 b 接管来自 logstash 的音讯,并由 es 或者其余业务进行生产。
机器布局
主机名 | 角色 | IP | 备注 |
---|---|---|---|
srv-mysql8 | mysql+filebeat | 192.168.11.161 | 业务数据库,filebeat 读取 mysql 的日志 |
wordpress | nginx+filebeat | 192.168.11.170 | 业务 web 利用,filebeat 读取 nginx 的日志 |
kafka01 | kafka 节点 | 192.168.11.247 | kafka 集群 a,broker.id=1 |
kafka02 | kafka 节点 | 192.168.11.248 | kafka 集群 a,broker.id=2 |
kafka03 | kafka 节点 | 192.168.11.249 | kafka 集群 a,broker.id=3 |
logstash01 | logstash | 192.168.11.250 | 第 1 套 logstash |
kafka-b01 | kafka 节点 | 192.168.11.40 | kafka 集群 b,broker.id=10 |
kafka-b02 | kafka 节点 | 192.168.11.41 | kafka 集群 b,broker.id=20 |
kafka-b03 | kafka 节点 | 192.168.11.42 | kafka 集群 b,broker.id=30 |
logstash02 | logstash | 192.168.11.133 | 第 2 套 logstash |
es-node01 | es 节点(主) | 192.168.11.243 | es 集群 |
es-node02 | es 节点 | 192.168.11.244 | es 集群 |
es-node03 | es 节点 | 192.168.11.245 | es 集群 |
kibana-svr | kibana | 192.168.11.246 |
留神:在同一个网段中,两个 Kafka 集群必须要应用不同的 broker.id,否则会导致抵触。因为 broker.id 是 Kafka 集群中惟一标识一个 Broker 的参数,同一个网段中不能存在两个具备雷同 broker.id 的 Broker。
实战开撸
创立 kafka 主题
在 kafka 集群 a 中创立主题
bin/kafka-topics.sh --create --zookeeper 192.168.11.247:2181 --replication-factor 2 --partitions 3 --topic wordpress-nginx-log
装置和配置 filebeat
在装置了 wordpress web 的业务主机上进行装置
- 下载和装置
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.6.2-x86_64.rpm
rpm -ivh filebeat-8.6.2-x86_64.rpm
filebeat version
- 配置 filebeat 读取日志 关上和编辑 /etc/filebeat/filebeat.yml 配置文件,增加以下内容,使其读取 Nginx 拜访日志文件:
filebeat.inputs:
- type: log
enabled: true
paths:
- /usr/local/nginx/logs/wordpress.access.log
fields:
log_type: wordpress_access
output.kafka:
hosts: ["192.168.11.247:9092",192.168.11.248:9092,192.168.11.249:9092]
topic: "wordpress-nginx-log"
- 启动 filebeat
systemctl start filebeat
配置 logstash01,生产 kafka 集群 a 的音讯
在 logstash01 主机上配置 logstash,使其可能生产 kafka 集群 a 中主题为 ”wordpress-nginx-log” 的音讯。
- 装置 kafka 插件
bin/logstash-plugin install logstash-input-kafka
留神:如果装置的时候提醒:ERROR: Installation aborted, plugin ‘logstash-input-kafka’ is already provided by ‘logstash-integration-kafka’,这个谬误提醒 ’logstash-input-kafka’ 插件曾经被 Logstash 集成插件 ’logstash-integration-kafka’ 提供了,能够间接应用 logstash-integration-kafka 插件生产 Kafka 音讯
- 在 Logstash 的配置文件中应用 kafka 输出插件
配置之前,先阐明下我的 nginx 日志自定义的格局:
log_format my_log_format '$remote_addr - $remote_user [$time_local]"$request"''$status $body_bytes_sent "$http_referer" ''"$http_user_agent""$http_x_forwarded_for"';
创立和编辑配置文件 config/wordpress-logstash.conf,增加以下内容:
input {
kafka {
bootstrap_servers => "192.168.11.247:9092,192.168.11.248:9092,192.168.11.249:9092"
topics => ["wordpress-nginx-log"]
}
}
filter {
# 解析 Nginx 日志行
grok {match => { "message" => '%{IPORHOST:clientip} - %{USERNAME:remote_user} [%{HTTPDATE:timestamp}]"%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}"%{NUMBER:status} %{NUMBER:body_bytes_sent}"%{DATA:http_referer}""%{DATA:user_agent}"' }
}
# 将工夫戳转换为 ISO 8601 格局
date {match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
}
}
output {stdout { codec => rubydebug}
}
- 启动 Logstash
bin/logstash -f config/wordpress-logstash.conf
在这里,先让其输入到屏幕,看看是否失常。确保 logstash 生产 kafka 集群 a 的音讯没问题、以及确保过滤没问题能够失常打印到屏幕之后,就能够持续上面的步骤了。
配置 logstash01,过滤后的音讯写入到 kafka 集群 b
持续在 logstash01 上配置,从 kafka 集群 a 中生产数据并过滤,解决后写入到 kafka 集群 b 中的主题 wordpress-web-log。
- 在 kafka 集群 b 中创立主题
bin/kafka-topics.sh --create --zookeeper 192.168.11.40:2181 --replication-factor 2 --partitions 3 --topic wordpress-web-log
- 编辑配置文件 config/wordpress-logstash.conf,配置 output
input {
kafka {
bootstrap_servers => "192.168.11.247:9092,192.168.11.248:9092,192.168.11.249:9092"
topics => ["wordpress-nginx-log"]
}
}
filter {
# 解析 Nginx 日志行
grok {match => { "message" => '%{IPORHOST:clientip} - %{USERNAME:remote_user} [%{HTTPDATE:timestamp}]"%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}"%{NUMBER:status} %{NUMBER:body_bytes_sent}"%{DATA:http_referer}""%{DATA:user_agent}"' }
}
# 将工夫戳转换为 ISO 8601 格局
date {match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
}
}
output {
kafka {
bootstrap_servers => "192.168.11.40:9092,192.168.11.41:9092,192.168.11.42:9092"
topic_id => "wordpress-web-log"
}
}
编辑实现后,记得重启 logstash 哦。
- 长期启动一个消费者,验证从 kafka 集群 b 生产主题 wordpress-web-log 的音讯
bin/kafka-console-consumer.sh --bootstrap-server 192.168.11.40:9092 --topic wordpress-web-log
如果能失常生产,读取到的音讯打印到管制台上,就能够持续上面的步骤了。
配置 logstash02,生产 kafka 集群 a 的音讯
在 logstash02 主机上配置 logstash,使其可能生产 kafka 集群 b 中主题为 ”wordpress-web-log” 的音讯,并写入到 ES 集群
关上并编辑 config/logstash.conf,增加以下内容:
input {
kafka {
bootstrap_servers => "192.168.11.40:9092,192.168.11.41:9092,192.168.11.42:9092"
topics => ["wordpress-web-log"]
}
}
output {
elasticsearch {hosts => ["http://192.168.11.243:9200","http://192.168.11.244:9200","http://192.168.11.245:9200"]
index => "wordpress-web-log-%{+YYYY.MM.dd}"
}
}
写在最初
所有环节对接结束,看看最终成绩。
对于如何将 logstash 部署到 K8S,感兴趣?请放弃高度关注,有空了再分享。
本文转载于 WX 公众号:不背锅运维(喜爱的盆友关注咱们):https://mp.weixin.qq.com/s/5uGpFBEg67yxRCNxLpyUmw