写在开篇

本文只分享各个链路环节的配置对接,对于环境的搭建,比方kafka集群、es集群的搭建等请自行实现。还有,业务利用的日志能够是你的其余业务日志,心愿本文能够起到抛砖援用的成果。

测试架构

这个架构形容了一个将来自不同数据源的数据通过 Kafka 直达,而后应用 Logstash 将数据从 Kafka 中读取并解决,最终将解决后的数据再写回到 Kafka 中,以供 Elasticsearch 进行存储和剖析的过程。通过应用 Kafka 和 Logstash,能够将来自不同数据源的数据进行集中管理和解决,并将数据以牢靠的形式发送到 Elasticsearch 进行存储和剖析。这种架构具备高可用性和可伸缩性,并且能够在解决大量数据时提供良好的性能。同时,因为 Logstash 能够从多种起源读取数据,因而能够适应各种数据集成计划的需要。

留神:kafka集群a接管来自filebeat的音讯,并由logstash进行生产。kafka集群b接管来自logstash的音讯,并由es或者其余业务进行生产。

机器布局

主机名角色IP备注
srv-mysql8mysql+filebeat192.168.11.161业务数据库,filebeat读取mysql的日志
wordpressnginx+filebeat192.168.11.170业务web利用,filebeat读取nginx的日志
kafka01kafka节点192.168.11.247kafka集群a,broker.id=1
kafka02kafka节点192.168.11.248kafka集群a,broker.id=2
kafka03kafka节点192.168.11.249kafka集群a,broker.id=3
logstash01logstash192.168.11.250第1套logstash
kafka-b01kafka节点192.168.11.40kafka集群b,broker.id=10
kafka-b02kafka节点192.168.11.41kafka集群b,broker.id=20
kafka-b03kafka节点192.168.11.42kafka集群b,broker.id=30
logstash02logstash192.168.11.133第2套logstash
es-node01es节点(主)192.168.11.243es集群
es-node02es节点192.168.11.244es集群
es-node03es节点192.168.11.245es集群
kibana-svrkibana192.168.11.246
留神:在同一个网段中,两个 Kafka 集群必须要应用不同的 broker.id,否则会导致抵触。因为 broker.id 是 Kafka 集群中惟一标识一个 Broker 的参数,同一个网段中不能存在两个具备雷同 broker.id 的 Broker。

实战开撸

创立kafka主题

在kafka集群a中创立主题
bin/kafka-topics.sh --create --zookeeper 192.168.11.247:2181 --replication-factor 2 --partitions 3 --topic wordpress-nginx-log

装置和配置filebeat

在装置了wordpress web的业务主机上进行装置
  1. 下载和装置
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.6.2-x86_64.rpmrpm -ivh filebeat-8.6.2-x86_64.rpm filebeat version
  1. 配置filebeat读取日志 关上和编辑/etc/filebeat/filebeat.yml配置文件,增加以下内容,使其读取Nginx拜访日志文件:
filebeat.inputs:- type: log  enabled: true  paths:    - /usr/local/nginx/logs/wordpress.access.log  fields:    log_type: wordpress_accessoutput.kafka:  hosts: ["192.168.11.247:9092",192.168.11.248:9092,192.168.11.249:9092]  topic: "wordpress-nginx-log"
  1. 启动filebeat
systemctl start filebeat

配置logstash01,生产kafka集群a的音讯

在logstash01主机上配置logstash,使其可能生产kafka集群a中主题为"wordpress-nginx-log"的音讯。

  1. 装置kafka插件
bin/logstash-plugin install logstash-input-kafka
留神:如果装置的时候提醒:ERROR: Installation aborted, plugin 'logstash-input-kafka' is already provided by 'logstash-integration-kafka',这个谬误提醒'logstash-input-kafka'插件曾经被Logstash集成插件'logstash-integration-kafka'提供了,能够间接应用 logstash-integration-kafka 插件生产 Kafka 音讯
  1. 在 Logstash 的配置文件中应用 kafka 输出插件

配置之前,先阐明下我的nginx日志自定义的格局:

log_format my_log_format '$remote_addr - $remote_user [$time_local] "$request" '                              '$status $body_bytes_sent "$http_referer" '                              '"$http_user_agent" "$http_x_forwarded_for"';

创立和编辑配置文件config/wordpress-logstash.conf,增加以下内容:

input {  kafka {    bootstrap_servers => "192.168.11.247:9092,192.168.11.248:9092,192.168.11.249:9092"    topics => ["wordpress-nginx-log"]  }}filter {  # 解析Nginx日志行  grok {    match => { "message" => '%{IPORHOST:clientip} - %{USERNAME:remote_user} [%{HTTPDATE:timestamp}] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:status} %{NUMBER:body_bytes_sent} "%{DATA:http_referer}" "%{DATA:user_agent}"' }  }  # 将工夫戳转换为ISO 8601格局  date {    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]    target => "@timestamp"  }}output {  stdout { codec => rubydebug }}
  1. 启动 Logstash
bin/logstash -f config/wordpress-logstash.conf 
在这里,先让其输入到屏幕,看看是否失常。确保logstash生产kafka集群a的音讯没问题、以及确保过滤没问题能够失常打印到屏幕之后,就能够持续上面的步骤了。

配置logstash01,过滤后的音讯写入到kafka集群b

持续在logstash01上配置,从kafka集群a中生产数据并过滤,解决后写入到kafka集群b中的主题wordpress-web-log。
  1. 在kafka集群b中创立主题
bin/kafka-topics.sh --create --zookeeper 192.168.11.40:2181 --replication-factor 2 --partitions 3 --topic wordpress-web-log
  1. 编辑配置文件config/wordpress-logstash.conf,配置output
input {  kafka {    bootstrap_servers => "192.168.11.247:9092,192.168.11.248:9092,192.168.11.249:9092"    topics => ["wordpress-nginx-log"]  }}filter {  # 解析Nginx日志行  grok {    match => { "message" => '%{IPORHOST:clientip} - %{USERNAME:remote_user} [%{HTTPDATE:timestamp}] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:status} %{NUMBER:body_bytes_sent} "%{DATA:http_referer}" "%{DATA:user_agent}"' }  }  # 将工夫戳转换为ISO 8601格局  date {    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]    target => "@timestamp"  }}output {  kafka {    bootstrap_servers => "192.168.11.40:9092,192.168.11.41:9092,192.168.11.42:9092"    topic_id => "wordpress-web-log"  }}
编辑实现后,记得重启logstash哦。
  1. 长期启动一个消费者,验证从kafka集群b生产主题wordpress-web-log的音讯
bin/kafka-console-consumer.sh --bootstrap-server 192.168.11.40:9092 --topic wordpress-web-log
如果能失常生产,读取到的音讯打印到管制台上,就能够持续上面的步骤了。

配置logstash02,生产kafka集群a的音讯

在logstash02主机上配置logstash,使其可能生产kafka集群b中主题为"wordpress-web-log"的音讯,并写入到ES集群

关上并编辑config/logstash.conf,增加以下内容:

input {  kafka {    bootstrap_servers => "192.168.11.40:9092,192.168.11.41:9092,192.168.11.42:9092"    topics => ["wordpress-web-log"]  }}output {  elasticsearch {    hosts => ["http://192.168.11.243:9200","http://192.168.11.244:9200","http://192.168.11.245:9200"]    index => "wordpress-web-log-%{+YYYY.MM.dd}"  }}

写在最初

所有环节对接结束,看看最终成绩。

对于如何将logstash部署到K8S,感兴趣?请放弃高度关注,有空了再分享。

本文转载于WX公众号:不背锅运维(喜爱的盆友关注咱们):https://mp.weixin.qq.com/s/5uGpFBEg67yxRCNxLpyUmw