
Deploying an ELK + Kafka + Filebeat Log Collection and Analysis System

1. Environment Planning

Log system architecture:
nginx ---> filebeat ---> kafka ---> logstash ---> elasticsearch ---> kibana
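
For reference, the guide uses three hosts. The layout below is inferred from the IP addresses and shell prompts that appear in the later steps; the role assignment per host is an assumption drawn from where each command is run, not stated explicitly in the original:

elk-1  192.168.81.210  elasticsearch, kibana, zookeeper, kafka
elk-2  192.168.81.220  elasticsearch, zookeeper, kafka
elk-3  192.168.81.230  elasticsearch, zookeeper, kafka, nginx, tomcat, filebeat, logstash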

2. Deploying the Elasticsearch Cluster

2.1. Configuring the es-1 node

1. Download Elasticsearch 7.6
[root@elk-1 ~]# wget https://mirrors.huaweicloud.com/elasticsearch/7.6.0/elasticsearch-7.6.0-x86_64.rpm
[root@elk-1 ~/soft]# rpm -ivh elasticsearch-7.6.0-x86_64.rpm

2. Edit the configuration file and enable cluster mode
[root@elk-1 ~]# vim /etc/elasticsearch/elasticsearch.yml
cluster.name: elk-application
node.name: elk-1
path.data: /data/elasticsearch
path.logs: /var/log/elasticsearch
bootstrap.memory_lock: true
network.host: 192.168.81.210,127.0.0.1
http.port: 9200
cluster.initial_master_nodes: ["elk-1"]
discovery.zen.ping.unicast.hosts: ["192.168.81.210","192.168.81.220","192.168.81.230"]
discovery.zen.fd.ping_timeout: 120s
discovery.zen.fd.ping_retries: 6
discovery.zen.fd.ping_interval: 30s
http.cors.enabled: true
http.cors.allow-origin: "*"

3. Create the data directory
[root@elk-1 ~]# mkdir /data/elasticsearch/ -p
[root@elk-1 ~]# chown -R elasticsearch.elasticsearch /data/elasticsearch/

4. Configure memory locking
[root@elk-1 ~]# mkdir /etc/systemd/system/elasticsearch.service.d/
[root@elk-1 ~]# vim /etc/systemd/system/elasticsearch.service.d/override.conf
[Service]
LimitMEMLOCK=infinity

5. Start Elasticsearch
[root@elk-1 ~]# systemctl daemon-reload
[root@elk-1 ~]# systemctl start elasticsearch
[root@elk-1 ~]# systemctl enable elasticsearch

2.2. Configuring the es-2 node
Only node.name and network.host differ in the configuration file; all other steps are the same as on es-1.

[root@elk-2 ~]# vim /etc/elasticsearch/elasticsearch.yml
cluster.name: elk-application
node.name: elk-2
path.data: /data/elasticsearch
path.logs: /var/log/elasticsearch
bootstrap.memory_lock: true
network.host: 192.168.81.220,127.0.0.1
http.port: 9200
cluster.initial_master_nodes: ["elk-1"]
discovery.zen.ping.unicast.hosts: ["192.168.81.210","192.168.81.220","192.168.81.230"]
discovery.zen.fd.ping_timeout: 120s
discovery.zen.fd.ping_retries: 6
discovery.zen.fd.ping_interval: 30s
http.cors.enabled: true
http.cors.allow-origin: "*"

2.3. Configuring the es-3 node
Only node.name and network.host differ in the configuration file; all other steps are the same as on es-1.

[root@elk-3 ~]# vim /etc/elasticsearch/elasticsearch.yml
cluster.name: elk-application
node.name: elk-3
path.data: /data/elasticsearch
path.logs: /var/log/elasticsearch
bootstrap.memory_lock: true
network.host: 192.168.81.230,127.0.0.1
http.port: 9200
cluster.initial_master_nodes: ["elk-1"]
discovery.zen.ping.unicast.hosts: ["192.168.81.210","192.168.81.220","192.168.81.230"]
discovery.zen.fd.ping_timeout: 120s
discovery.zen.fd.ping_retries: 6
discovery.zen.fd.ping_interval: 30s
http.cors.enabled: true
http.cors.allow-origin: "*"

2.4. Checking the cluster status with the es-head plugin
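
If the es-head plugin is not available, the cluster state can also be verified directly with the Elasticsearch REST API (a quick sketch; point it at any node of the cluster):

# Should report three nodes and a "green" cluster status
[root@elk-1 ~]# curl http://192.168.81.210:9200/_cluster/health?pretty
[root@elk-1 ~]# curl http://192.168.81.210:9200/_cat/nodes?v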

3. Deploying Kibana

1. Install the Kibana RPM package
[root@elk-1 ~]# rpm -ivh kibana-7.6.0-x86_64.rpm

2. Configure Kibana
[root@elk-1 ~]# vim /etc/kibana/kibana.yml
server.port: 5601
server.host: "192.168.81.210"
server.name: "elk-application"
elasticsearch.hosts: ["http://192.168.81.210:9200"]
i18n.locale: "zh-CN"

3. Start Kibana
[root@elk-1 ~]# systemctl restart kibana
[root@elk-1 ~]# systemctl enable kibana

Kibana has been deployed successfully.

4. Deploying ZooKeeper

4.1. Configuring the zookeeper-1 node

1. Download the software
[root@elk-1 ~]# wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz

2. Extract and move ZooKeeper
[root@elk-1 ~]# tar xf soft/zookeeper-3.4.14.tar.gz -C /data/
[root@elk-1 ~]# mv /data/zookeeper-3.4.14/ /data/zookeeper

3. Create the data and log directories
[root@elk-1 ~]# mkdir /data/zookeeper/{data,logs}

4. Prepare the configuration file
[root@elk-1 ~]# cd /data/zookeeper/conf
[root@elk-1 /data/zookeeper/conf]# cp zoo_sample.cfg zoo.cfg
[root@elk-1 /data/zookeeper/conf]# vim zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/logs
clientPort=2181
server.1=192.168.81.210:2888:3888
server.2=192.168.81.220:2888:3888
server.3=192.168.81.230:2888:3888

5. Create the node id file
# The node id may only be a number
[root@elk-1 /data/zookeeper]# echo 1 > /data/zookeeper/data/myid

4.2. Configuring the zookeeper-2 node

Only the configuration file and the node id file differ slightly from zookeeper-1; everything else is identical.
[root@elk-2 /data/zookeeper/conf]# cat zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/logs
clientPort=2181
server.1=192.168.81.210:2888:3888
server.2=192.168.81.220:2888:3888
server.3=192.168.81.230:2888:3888
[root@elk-2 /data/zookeeper/conf]# echo 2 > /data/zookeeper/data/myid

4.3. Configuring the zookeeper-3 node

[root@elk-3 /data/zookeeper/conf]# cat zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/logs
clientPort=2181
server.1=192.168.81.210:2888:3888
server.2=192.168.81.220:2888:3888
server.3=192.168.81.230:2888:3888
[root@elk-3 /data/zookeeper/conf]# echo 3 > /data/zookeeper/data/myid

4.4. Starting all nodes

A ZooKeeper ensemble needs at least two live nodes, i.e. at least two nodes must be started at the same time or the cluster will not come up. So finish editing the configuration on every node first, then start them all together (see the sketch below), and finally check each node's status:
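
The start command itself is not shown in the original output; a minimal sketch, assuming the same /data/zookeeper install path on every node:

[root@elk-1 /data/zookeeper]# ./bin/zkServer.sh start
[root@elk-2 /data/zookeeper]# ./bin/zkServer.sh start
[root@elk-3 /data/zookeeper]# ./bin/zkServer.sh start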
[root@elk-1 /data/zookeeper]# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zookeeper/bin/../conf/zoo.cfg
Mode: follower
[root@elk-2 /data/zookeeper]# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zookeeper/bin/../conf/zoo.cfg
Mode: follower
[root@elk-3 /data/zookeeper]# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zookeeper/bin/../conf/zoo.cfg
Mode: leader


5. Deploying Kafka

Note: do not use the kafka_2.11 build. It has a serious bug that prevents Filebeat from writing data to the Kafka cluster because the protocol versions used for writes do not match.

5.1. Configuring the kafka-1 node

1. Download the binary package
[root@elk-1 ~]# wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.13-2.4.0.tgz

2. Install Kafka
[root@elk-1 ~/soft]# tar xf kafka_2.13-2.4.0.tgz -C /data/
[root@elk-1 ~]# mv /data/kafka_2.13-2.4.0 /data/kafka

3. Edit the configuration file
[root@elk-1 ~]# cd /data/kafka
[root@elk-1 /data/kafka]# vim config/server.properties
broker.id=1
listeners=PLAINTEXT://192.168.81.210:9092
host.name=192.168.81.210
advertised.listeners=PLAINTEXT://192.168.81.210:9092
advertised.host.name=192.168.81.210
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=3
delete.topic.enable=true
auto.create.topics.enable=true
replica.fetch.max.bytes=5242880
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
message.max.bytes=5242880
log.cleaner.enable=true
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=15000
zookeeper.connect=192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0

4. Create the data directory
[root@elk-1 ~]# mkdir /data/kafka/data

5.2. Configuring the kafka-2 node
Only the configuration file differs; all other steps are the same as on the kafka-1 node.
Changes required in the configuration file: set broker.id to 2 (the second node), and change listeners, host.name, advertised.listeners and advertised.host.name to this node's own IP address.

[root@elk-2 /data/kafka]# cat config/server.properties
broker.id=2
listeners=PLAINTEXT://192.168.81.220:9092
host.name=192.168.81.220
advertised.listeners=PLAINTEXT://192.168.81.220:9092
advertised.host.name=192.168.81.220
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=3
delete.topic.enable=true
auto.create.topics.enable=true
replica.fetch.max.bytes=5242880
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
message.max.bytes=5242880
log.cleaner.enable=true
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=15000
zookeeper.connect=192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0

5.3. Configuring the kafka-3 node
Only the configuration file differs; all other steps are the same as on the kafka-1 node.
Changes required in the configuration file: set broker.id to 3 (the third node), and change listeners, host.name, advertised.listeners and advertised.host.name to this node's own IP address.

[root@elk-3 /data/kafka]# cat config/server.properties
broker.id=3
listeners=PLAINTEXT://192.168.81.230:9092
host.name=192.168.81.230
advertised.listeners=PLAINTEXT://192.168.81.230:9092
advertised.host.name=192.168.81.230
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=3
delete.topic.enable=true
auto.create.topics.enable=true
replica.fetch.max.bytes=5242880
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
message.max.bytes=5242880
log.cleaner.enable=true
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=15000
zookeeper.connect=192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0

5.4. Starting Kafka

[root@elk-1 ~]# /data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties
[root@elk-2 ~]# /data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties
[root@elk-3 ~]# /data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties
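
As a quick sanity check (an optional addition, not part of the original steps), confirm on each node that the broker is listening on port 9092 before moving on:

# Run on each of the three nodes
[root@elk-1 ~]# ss -lntp | grep 9092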

6. Testing the Kafka and ZooKeeper connection
If Kafka can both produce and consume data, the whole cluster is ready for use.

1. Create a topic
[root@elk-1 /data/kafka]# ./bin/kafka-topics.sh --create --zookeeper 192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181 --replication-factor 1 --partitions 1 --topic testpic
Created topic "testpic".

2. List the topics
[root@elk-1 /data/kafka]# ./bin/kafka-topics.sh --list --zookeeper 192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181
testpic

3. Describe the topic
[root@elk-1 /data/kafka]# ./bin/kafka-topics.sh --describe --zookeeper 192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181 --topic testpic

4. Produce data from the console with kafka-console-producer
[root@elk-1 /data/kafka]# ./bin/kafka-console-producer.sh --broker-list 192.168.81.210:9092,192.168.81.220:9092,192.168.81.230:9092 --topic testpic
>test1
>test2
>test3
>test4
>test5
>test6
>test7
>test8
>test9
>test10

5. Consume data from the console with kafka-console-consumer
[root@elk-1 /data/kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.81.210:9092,192.168.81.220:9092,192.168.81.230:9092 --topic testpic --from-beginning
test1
test2
test3
test4
test5
test6
test7
test8
test9
test10

# Delete a topic
[root@elk-1 /data/kafka]# ./bin/kafka-topics.sh --delete --zookeeper 192.168.81.210:2181 --topic testpic


7. Configuring Filebeat to collect nginx and tomcat logs and store them in Kafka

7.1. Installing and configuring the nginx service

1. Install nginx
[root@elk-3 ~]# yum -y install nginx

2. Configure the nginx log format
[root@elk-3 ~]# vim /etc/nginx/nginx.conf
http {
··············
    log_format  main '{"工夫":"$time_iso8601",'
                      '"客户端外网地址":"$http_x_forwarded_for",'
                      '"客户端内网地址":"$remote_addr",'
                      '"状态码":$status,'
                      '"传输流量":$body_bytes_sent,'
                      '"跳转起源":"$http_referer",'
                      '"URL":"$request",'
                      '"浏览器":"$http_user_agent",'
                      '"申请响应工夫":$request_time,'
                      '"后端地址":"$upstream_addr"}';
    access_log  /var/log/nginx/access.log  main;
··············
}

3. Start nginx
[root@elk-3 ~]# systemctl start nginx
[root@elk-3 ~]# systemctl enable nginx

4. Send a request to generate a log entry and check the result
[root@elk-3 ~]# curl 127.0.0.1
[root@elk-3 ~]# tail /var/log/nginx/access.log
{"工夫":"2021-07-12T11:29:33+08:00","客户端外网地址":"-","客户端内网地址":"127.0.0.1","状态码":200,"传输流量":4833,"跳转起源":"-","URL":"GET / HTTP/1.1","浏览器":"curl/7.29.0","申请响应工夫":0.000,"后端地址":"-"}
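
If the log_format is adjusted again later, the configuration can be validated before reloading nginx (an optional check, not shown in the original walkthrough):

[root@elk-3 ~]# nginx -t
[root@elk-3 ~]# systemctl reload nginx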

7.2. Installing the tomcat service

[root@elk-3 ~]# tar xf apache-tomcat-8.5.12.tar.gz -C /data/
[root@elk-3 ~]# mv /data/apache-tomcat-8.5.12/ /data/tomcat
[root@elk-3 ~]# /data/tomcat/bin/startup.sh
Using CATALINA_BASE:   /data/tomcat
Using CATALINA_HOME:   /data/tomcat
Using CATALINA_TMPDIR: /data/tomcat/temp
Using JRE_HOME:        /usr
Using CLASSPATH:       /data/tomcat/bin/bootstrap.jar:/data/tomcat/bin/tomcat-juli.jar
Tomcat started.

7.3. Installing the filebeat service

[root@elk-3 ~]# rpm -ivh filebeat-7.6.0-x86_64.rpm

7.4. Configuring Filebeat to collect application logs and store them in Kafka

1. Configure Filebeat
[root@elk-3 ~]# vim /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log                                     # input type: log
  enabled: true
  paths:                                        # path of the log file to collect
    - /var/log/nginx/access.log
  json.keys_under_root: true                    # parse JSON log lines and place the keys at the root of the event
  json.overwrite_keys: true
  fields:                                       # add a log_topic field with the value nginx-access; logstash uses this field to route logs to the right ES index
    log_topic: nginx-access
  tail_files: true                              # start collecting from the end of the log file
- type: log
  enabled: true
  paths:
    - /data/tomcat/logs/catalina.out
  multiline.pattern: '^20'                      # merge tomcat error logs: everything between one line starting with "20" (a timestamp) and the next is combined into a single event
  multiline.negate: true
  multiline.match: after
  fields:
    log_topic: tomcat-cata
  tail_files: true
output.kafka:                                   # output to the Kafka cluster
  enabled: true
  hosts: ["192.168.81.210:9092","192.168.81.220:9092","192.168.81.230:9092"]   # Kafka broker addresses
  topic: '%{[fields][log_topic]}'               # topic to write to; the value references the log_topic field defined in the inputs, so logs from different paths end up in different topics
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
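
Before starting the service, Filebeat's own test subcommands can be used to validate the configuration and the connection to Kafka (an optional check, not part of the original steps):

[root@elk-3 ~]# filebeat test config -c /etc/filebeat/filebeat.yml
[root@elk-3 ~]# filebeat test output -c /etc/filebeat/filebeat.yml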

2. Start Filebeat

[root@elk-3 ~]# systemctl start filebeat
[root@elk-3 ~]# systemctl enable filebeat
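
If Filebeat fails to start, the systemd journal is the first place to look (assuming the RPM install runs under systemd and logs to journald):

[root@elk-3 ~]# journalctl -u filebeat -f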

7.5. Generating application log data and checking that it is stored in Kafka
1. Generate application logs

1. Generate nginx logs
[root@elk-3 ~]# ab -n 1000 -c 100 http://127.0.0.1/index.html

2. Generate tomcat logs
[root@elk-3 ~]# /data/tomcat/bin/shutdown.sh
[root@elk-3 ~]# /data/tomcat/bin/startup.sh

2. Check whether the corresponding topics have been created in Kafka

[root@elk-1 /data/kafka]# ./bin/kafka-topics.sh --list --zookeeper 192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181
__consumer_offsets
nginx-access
testpic
tomcat-cata
# The nginx-access and tomcat-cata topics have been created successfully
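
To confirm that actual log events are arriving, the console consumer shown earlier can also be pointed at one of the new topics (a quick sketch; press Ctrl+C to stop):

[root@elk-1 /data/kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.81.210:9092,192.168.81.220:9092,192.168.81.230:9092 --topic nginx-access --from-beginning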

3. Watch the Kafka log output

[root@elk-1 /data/kafka]# tail -f logs/kafkaServer.out

8. Configuring Logstash to read data from Kafka and store it in the ES cluster

Deploy Logstash and configure it to read the topic data from Kafka and store it in the Elasticsearch cluster.

8.1. Deploying the logstash service

1. Install Logstash
[root@elk-3 ~]# rpm -ivh logstash-7.6.0.rpm

8.2. Configuring Logstash to read data from Kafka and store it in the ES cluster

[root@elk-3 ~]# cat /etc/logstash/conf.d/in_kafka_to_es.conf
# Read log data from Kafka
input {                                             # data source
    kafka {                                         # input type: kafka
        bootstrap_servers => ["192.168.81.210:9092,192.168.81.220:9092,192.168.81.230:9092"]   # Kafka cluster addresses
        topics => ["nginx-access","tomcat-cata"]    # which Kafka topics to read
        codec => "json"                             # parse JSON-formatted data
        auto_offset_reset => "latest"               # only consume the newest Kafka data
    }
}

# Process the data and drop unused fields
filter {
    if [fields][log_topic] == "nginx-access" {      # if the log_topic field is nginx-access, apply the following processing
        json {                                      # parse the JSON data
            source => "message"                     # take the JSON from the message field
            remove_field => ["@version","path","beat","input","log","offset","prospector","source","tags"]   # remove the listed fields
        }
        mutate {                                    # modify the event
            remove_field => ["_index","_id","_type","_version","_score","referer","agent"]   # remove unused fields
        }
    }
    if [fields][log_topic] == "tomcat-cata" {       # if the log_topic field is tomcat-cata
        grok {                                      # parse the line with a grok pattern
            match => {
                "message" => "(?<工夫>20[0-9]{2}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}) \[(?<线程名称>[^\s]{0,})\] (?<日志等级>\w+) (?<类名称>[^\s]{0,}) (?<日志详情>[\W\w]+)"   # split the message value into named fields
            }
        }
        mutate {                                    # modify the event
            remove_field => ["_index","_id","_type","_version","_score","referer","agent"]   # remove unused fields
        }
    }
}

# Store the processed data in the ES cluster
output {                                            # destination
    if [fields][log_topic] == "nginx-access" {      # if log_topic is nginx-access, store it in the ES cluster below
        elasticsearch {
            action => "index"                       # action type: index
            hosts => ["192.168.81.210:9200","192.168.81.220:9200","192.168.81.230:9200"]   # ES cluster addresses
            index => "nginx-access-%{+YYYY.MM.dd}"  # ES index to store the data in
            codec => "json"                         # handle JSON parsing
        }
    }
    if [fields][log_topic] == "tomcat-cata" {       # if log_topic is tomcat-cata, store it in the ES cluster below
        elasticsearch {
            action => "index"                       # action type: index
            hosts => ["192.168.81.210:9200","192.168.81.220:9200","192.168.81.230:9200"]   # ES cluster addresses
            index => "tomcat-cata-%{+YYYY.MM.dd}"   # ES index to store the data in
            codec => "json"                         # handle JSON parsing
        }
    }
}
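
Before running it against live traffic, the pipeline file can be syntax-checked with Logstash's built-in config test (optional, not part of the original walkthrough):

[root@elk-3 ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/in_kafka_to_es.conf --config.test_and_exit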

8.3. Starting Logstash and watching its log

[root@elk-3 ~]# nohup /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/in_kafka_to_es.conf &
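
The startup output goes to nohup.out in the current directory; alternatively the Logstash log itself can be followed to confirm the pipeline started cleanly (the path assumes the default logging location of the RPM install):

[root@elk-3 ~]# tail -f nohup.out
# Path below assumes the default RPM logging configuration
[root@elk-3 ~]# tail -f /var/log/logstash/logstash-plain.log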

8.4. Checking that the corresponding indices were added to the Elasticsearch cluster
The ES cluster has created the tomcat-cata and nginx-access indices.
At this point, Logstash has successfully read the log data from the Kafka cluster and written it into the different indices in the Elasticsearch cluster.
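
The original relies on es-head screenshots for this check; the same verification can be done from the command line with the standard ES API (a quick sketch):

[root@elk-1 ~]# curl "http://192.168.81.210:9200/_cat/indices?v" | grep -E "nginx-access|tomcat-cata"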

9. Linking the Elasticsearch indices in Kibana to browse the log data

1) Click "Create index pattern"
2) Enter the index name
3) Add a time filter field
4) Created successfully

10. Troubleshooting

1. ES fails to start with "cannot assign requested address"
Solution: double-check the configuration file; one of the addresses is misconfigured. In my case the IP of the listen address was mistyped.
2. Filebeat reports an API version error when writing data to Kafka
Analysis and solution: this was traced to the kafka_2.11 build; switching to the 2.13 build solved the problem.