关于数据分析:开课吧数据分析高薪培养计划精英班30期

33次阅读

共计 14456 个字符,预计需要花费 37 分钟才能阅读完成。

download:开课吧数据分析高薪造就打算精英班 30 期

部署 ELK+Kafka+Filebeat 日志收集剖析零碎

1. 环境规划

日志零碎架构
nginx—>filebeat—>kafka—>logstash—>elasticsearch—>kibana

2. 部署 elasticsearch 集群

2.1. 配置 es- 1 节点

1. 下载 elasticsearch7.6
[root@elk-1 ~]# wget https://mirrors.huaweicloud.com/elasticsearch/7.6.0/elasticsearch-7.6.0-x86_64.rpm
[root@elk-1 ~/soft]# rpm -ivh elasticsearch-7.6.0-x86_64.rpm 

2. 编辑配置文件,配置集群模式
[root@elk-1 ~]#  vim /etc/elasticsearch/elasticsearch.yml
cluster.name: elk-application
node.name: elk-1
path.data: /data/elasticsearch
path.logs: /var/log/elasticsearch
bootstrap.memory_lock: true
network.host: 192.168.81.210,127.0.0.1
http.port: 9200
cluster.initial_master_nodes: ["elk-1"]
discovery.zen.ping.unicast.hosts: ["192.168.81.210","192.168.81.220","192.168.81.230"]
discovery.zen.fd.ping_timeout: 120s
discovery.zen.fd.ping_retries: 6
discovery.zen.fd.ping_interval: 30s
http.cors.enabled: true
http.cors.allow-origin: "*"

3. 创立数据目录
[root@elk-1 ~]# mkdir /data/elasticsearch/ -p
[root@elk-1 ~]# chown -R elasticsearch.elasticsearch /data/elasticsearch/

4. 配置内存锁定
[root@elk-1 ~]# mkdir /etc/systemd/system/elasticsearch.service.d/
[root@elk-1 ~]# vim /etc/systemd/system/elasticsearch.service.d/override.conf
[Service]
LimitMEMLOCK=infinity

5. 启动 elasticsearch
[root@elk-1 ~]# systemctl daemon-reload 
[root@elk-1 ~]# systemctl start elasticsearch
[root@elk-1 ~]# systemctl enable elasticsearch

复制代码
2.2. 配置 es- 2 节点
只是配置文件中 node.name 和 network.host 不同,其余操作形式统一

[root@elk-2 ~]#  vim /etc/elasticsearch/elasticsearch.yml
cluster.name: elk-application
node.name: elk-2
path.data: /data/elasticsearch
path.logs: /var/log/elasticsearch
bootstrap.memory_lock: true
network.host: 192.168.81.220,127.0.0.1
http.port: 9200
cluster.initial_master_nodes: ["elk-1"]
discovery.zen.ping.unicast.hosts: ["192.168.81.210","192.168.81.220","192.168.81.230"]
discovery.zen.fd.ping_timeout: 120s
discovery.zen.fd.ping_retries: 6
discovery.zen.fd.ping_interval: 30s
http.cors.enabled: true
http.cors.allow-origin: "*"

复制代码
2.3. 配置 es- 3 节点
只是配置文件中 node.name 和 network.host 不同,其余操作形式统一

[root@elk-2 ~]#  vim /etc/elasticsearch/elasticsearch.yml
cluster.name: elk-application
node.name: elk-3
path.data: /data/elasticsearch
path.logs: /var/log/elasticsearch
bootstrap.memory_lock: true
network.host: 192.168.81.230,127.0.0.1
http.port: 9200
cluster.initial_master_nodes: ["elk-1"]
discovery.zen.ping.unicast.hosts: ["192.168.81.210","192.168.81.220","192.168.81.230"]
discovery.zen.fd.ping_timeout: 120s
discovery.zen.fd.ping_retries: 6
discovery.zen.fd.ping_interval: 30s
http.cors.enabled: true
http.cors.allow-origin: "*"

复制代码
2.4. 应用 es-head 插件查看集群状态

3. 部署 kibana
1. 下载 kibana rpm 包
[root@elk-1 ~]#  rpm -ivh kibana-7.6.0-x86_64.rpm 

2. 配置 kibana
[root@elk-1 ~]# vim /etc/kibana/kibana.yml
server.port: 5601                                    
server.host: "192.168.81.210"                                
server.name: "elk-application"                                                
elasticsearch.hosts: ["http://192.168.81.210:9200"]                
i18n.locale: "zh-CN"

[root@elk-1 ~]# systemctl restart kibana
[root@elk-1 ~]#  systemctl enable elasticsearch

复制代码
kibana 部署胜利

4. 部署 zookeeper

4.1. 配置 zookeeper- 1 节点

1. 下载软件
[root@elk-1 ~]# wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz

2. 解压并挪动 zookeeper
[root@elk-1 ~]# tar xf soft/zookeeper-3.4.13.tar.gz -C /data/
[root@elk-1 ~]# mv /data/zookeeper-3.4.13/ /data/zookeeper

3. 创立数据目录和日志目录
[root@elk-1 ~]# mkdir /data/zookeeper/{data,logs}

4. 筹备配置文件
[root@elk-1 ~]# cd /data/zookeeper/conf
[root@elk-1 /data/zookeeper/conf]# cp zoo_sample.cfg zoo.cfg
[root@elk-1 /data/zookeeper/conf]# vim zoo.cfg 
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
DataLogDir=/data/zookeeper/logs
clientPort=2181

server.1=192.168.81.210:2888:3888
server.2=192.168.81.220:2888:3888
server.3=192.168.81.230:2888:3888

5. 生成节点 id 文件
#节点 id 只能爱护数字
[root@elk-1 /data/zookeeper]# echo 1 > /data/zookeeper/data/myid

复制代码
4.2. 配置 zookeeper- 2 节点

与 zookeeper- 1 节点只有配置文件和节点 id 文件有点不同,其余全一样
[root@elk-2 /data/zookeeper/conf]# cat zoo.cfg 
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
DataLogDir=/data/zookeeper/logs
clientPort=2181

server.1=192.168.81.210:2888:3888
server.2=192.168.81.220:2888:3888
server.3=192.168.81.230:2888:3888

[root@elk-2 /data/zookeeper/conf]# echo 2 > /data/zookeeper/data/myid

复制代码
4.3. 配置 zookeeper- 3 节点

[root@elk-3 /data/zookeeper/conf]# cat zoo.cfg 
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
DataLogDir=/data/zookeeper/logs
clientPort=2181

server.1=192.168.81.210:2888:3888
server.2=192.168.81.220:2888:3888
server.3=192.168.81.230:2888:3888

[root@elk-3 /data/zookeeper/conf]# echo 3 > /data/zookeeper/data/myid

复制代码
4.4. 启动所有节点

zookeeper 集群必须保障有两个节点存活,也就是说必须同时要启动两个节点,否则集群将启动不胜利,因而要都批改好配置文件后,再对立启动
[root@elk-1 /data/zookeeper]# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zookeeper/bin/../conf/zoo.cfg
Mode: follower

[root@elk-2 /data/zookeeper]# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zookeeper/bin/../conf/zoo.cfg
Mode: follower

[root@elk-3 /data/zookeeper]# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zookeeper/bin/../conf/zoo.cfg
Mode: leader

复制代码

5. 部署 kafka

留神:不要应用 kafka2.11 版本,有重大的 bug,filebeat 无奈写入数据到 kafka 集群,写入的协定版本不同,存在问题

5.1. 配置 kafka- 1 节点
1. 下载二进制包
[root@elk-1 ~]# wget https://archive.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz

2. 装置 kafka
[root@elk-1 ~/soft]# tar xf kafka_2.13-2.4.0.tgz -C /data/
[root@elk-1 ~]# mv /data/kafka_2.13-2.4.0 /data/kafka

3. 批改配置文件
[root@elk-1 ~]# cd /data/kafka
[root@elk-1 /data/kafka]# vim config/server.properties 
broker.id=1
listeners=PLAINTEXT://192.168.81.210:9092
host.name=192.168.81.210
advertised.listeners=PLAINTEXT://192.168.81.210:9092
advertised.host.name=192.168.81.210
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=3
delete.topic.enable=true
auto.create.topics.enable=true 
replica.fetch.max.bytes=5242880
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
message.max.byte=5242880
log.cleaner.enable=true
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=15000
zookeeper.connect=192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0

4. 创立数据目录
[root@elk-3 ~]# mkdir /data/kafka/data

复制代码
5.2. 配置 kafka- 2 节点
只是配置文件不同,其余与 kafka- 1 节点操作统一
配置文件须要改的中央:broker.id 改成 2,示意第二个节点 listeners host.name advertised.listeners advertised.host.name 改老本机 ip 地址

[root@elk-2 /data/kafka]# cat config/server.properties 
broker.id=2
listeners=PLAINTEXT://192.168.81.220:9092
host.name=192.168.81.220
advertised.listeners=PLAINTEXT://192.168.81.220:9092
advertised.host.name=192.168.81.220
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=3
delete.topic.enable=true
auto.create.topics.enable=true 
replica.fetch.max.bytes=5242880
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
message.max.byte=5242880
log.cleaner.enable=true
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=15000
zookeeper.connect=192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0

复制代码
5.3. 配置 kafka- 3 节点
只是配置文件不同,其余与 kafka- 1 节点操作统一
配置文件须要改的中央:broker.id 改成 3,示意第三个节点 listeners host.name advertised.listeners advertised.host.name 改老本机 ip 地址

[root@elk-3 /data/kafka]# cat config/server.properties 
broker.id=3
listeners=PLAINTEXT://192.168.81.230:9092
host.name=192.168.81.230
advertised.listeners=PLAINTEXT://192.168.81.230:9092
advertised.host.name=192.168.81.230
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=3
delete.topic.enable=true
auto.create.topics.enable=true 
replica.fetch.max.bytes=5242880
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
message.max.byte=5242880
log.cleaner.enable=true
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=15000
zookeeper.connect=192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0

复制代码
5.4. 启动 kafka

[root@elk-1 ~]# /data/kafka/bin/kafka-server-start -daemon /data/kafka/config/server.properties
[root@elk-2 ~]# /data/kafka/bin/kafka-server-start -daemon /data/kafka/config/server.properties
[root@elk-3 ~]# /data/kafka/bin/kafka-server-start -daemon /data/kafka/config/server.properties

复制代码
6. 测试 kafka 与 zookeeper 连贯
kafka 可能产生数据并生产,整个集群就能够应用了

1. 创立一个 topic
[root@elk-1 /data/kafka]# ./bin/kafka-topics.sh --create --zookeeper 192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181 --replication-factor 1 --partitions 1 --topic testpic
Created topic "testpic".

2. 查看 topic
[root@elk-1 /data/kafka]# ./bin/kafka-topics.sh --list --zookeeper 192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181
testpic

3. 查看 topic 的形容信息
[root@elk-1 /data/kafka]# ./bin/kafka-topics.sh --describe --zookeeper 192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181 --topic testpic

4. 应用 kafka-console-producer 控制台生产数据
[root@elk-1 /data/kafka]# ./bin/kafka-console-producer.sh --broker-list 192.168.81.210:9092,192.168.81.220:9092,192.168.81.230:9092 --topic testpic
>test1
>test2
>test3
>test4
>test5
>test6
>test7
>test8
>test9
>test10


5. 应用 kafka-console-consumer 控制台生产数据
[root@elk-1 /data/kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.81.210:9092,192.168.81.220:9092,192.168.81.230:9092 --topic testpic --from-beginning
test1
test2
test3
test4
test5
test6
test7
test8
test9
test10


#删除一个 topic
[root@elk-1 /data/kafka]# ./bin/kafka-topics.sh --delete --zookeeper 192.168.81.210:2181  --topic testpic

复制代码

7. 配置 filebeat 收集 nginx、tomcat 日志并存储到 kafka 中

7.1. 装置并配置 nginx 服务

1. 装置 nginx
[root@elk-3 ~]# yum -y install nginx

2. 配置 nginx 日志格局
[root@elk-3 ~]# vim /etc/nginx/nginx.conf
http {
··············
    log_format  main '{" 工夫 ":"$time_iso8601",'
                       '"客户端外网地址":"$http_x_forwarded_for",''" 客户端内网地址 ":"$remote_addr",'
                       '"状态码":$status,''" 传输流量 ":$body_bytes_sent,'
                       '"跳转起源":"$http_referer",''"URL":"$request",'
                       '"浏览器":"$http_user_agent",''" 申请响应工夫 ":$request_time,'
                       '"后端地址":"$upstream_addr"}';

    access_log  /var/log/nginx/access.log  main;
··············
}

2. 启动 nginx
[root@elk-3 ~]# systemctl start nginx
[root@elk-3 ~]# systemctl enable nginx

4. 拜访产生日志查看成果
[root@elk-3 ~]# curl 127.0.0.1
[root@elk-3 ~]# tail /var/log/nginx/access.log 
{"工夫":"2021-07-12T11:29:33+08:00","客户端外网地址":"-","客户端内网地址":"127.0.0.1","状态码":200,"传输流量":4833,"跳转起源":"-","URL":"GET / HTTP/1.1","浏览器":"curl/7.29.0","申请响应工夫":0.000,"后端地址":"-"}

复制代码
7.2. 装置 tomcat 服务

[root@elk-3 ~]# tar xf apache-tomcat-8.5.12.tar.gz -C /data/
[root@elk-3 ~]# mv /data/apache-tomcat-8.5.12/ /data/tomcat
[root@elk-3 ~]# /data/tomcat/bin/startup.sh 
Using CATALINA_BASE:   /data/tomcat
Using CATALINA_HOME:   /data/tomcat
Using CATALINA_TMPDIR: /data/tomcat/temp
Using JRE_HOME:        /usr
Using CLASSPATH:       /data/tomcat/bin/bootstrap.jar:/data/tomcat/bin/tomcat-juli.jar
Tomcat started.

复制代码
7.3. 装置 filebeat 服务

[root@elk-3 ~]# rpm -ivh filebeat-7.6.0-x86_64.rpm ```

复制代码
7.4. 配置 filebeat 收集利用日志并存储到 kafka

1. 配置 filebeat
[root@elk-3 ~]# vim /etc/filebeat/filebeat.yml 
filebeat.inputs:
- type: log                                      #类型为 log
  enabled: true
  paths:                                        #指定日志所在的门路
    - /var/log/nginx/access.log
  json.keys_under_root: true                    #反对 json 格局的日志输入
  json.overwriite_keys: true
  fields:                                       #在日志中减少一个字段,字段为 log_topic,值为 nginx_access,logstash 依据带有这个字段的日志存储到指定的 es 索引库
    log_topic: nginx-access
  tail_files: true                              #开启日志监控,从日志的最初一行开始收集

- type: log
  enabled: true
  paths:
    - /data/tomcat/logs/catalina.out
  multiline.pattern: '^20'                      #收集 tomcat 谬误日志,从第一个 20 到下一个 20 之间的日志整合在一行中显示
  multiline.negate: true
  multiline.match: after
  fields:
    log_topic: tomcat-cata
  tail_files: true

output.kafka:                                   #输入到 kafka 零碎
  enabled: true
  hosts: ["192.168.81.210:9092","192.168.81.220:9092","192.168.81.230:9092"]                           #kafka 的地址
  topic: '%{[fields][log_topic]}'               #指定将日志存储到 kafka 集群的哪个 topic 中,这里的 topic 值是援用在 inputs 中定义的 fields,通过这种形式能够将不同门路的日志别离存储到不同的 topic 中
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

2. 启动 filebeat

[root@elk-3 ~]# systemctl start filebeat
[root@elk-3 ~]# systemctl enable filebeat

复制代码
7.5. 产生程序日志数据察看数据是否存储 kafka
1. 产生程序日志

1. 产生 nginx 日志
[root@elk-3 ~]# ab -n 1000 -c 100 http://127.0.0.1/index.html

2. 产生 tomcat 日志
[root@elk-3 ~]# /data/tomcat/bin/shutdown.sh
[root@elk-3 ~]# /data/tomcat/bin/startup.sh 

复制代码
2. 察看 kafka 中是否创立对应的 topic

[root@elk-1 /data/kafka]# ./bin/kafka-topics.sh --list --zookeeper 192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181
__consumer_offsets
nginx-access
testpic
tomcat-cata

#nginx-access 以及 tomcat-cata 的 topic 曾经创立胜利

复制代码
3. 察看 kafka 日志的输入

[root@elk-1 /data/kafka]# tail -f logs/kafkaServer.out

8. 配置 logstash 从 kafka 中读取数据并存储到 es 集群

部署 logstash,配置 logstash 从 kafka 中读取 topic 数据并存储到 es 集群

8.1. 部署 logstash 服务

1. 装置 logstash
[root@elk-3 ~]# rpm -ivh logstash-7.6.0.rpm
复制代码
8.2. 配置 logstash 从 kafka 读取数据存储到 es 集群
[root@elk-3 ~]# cat /etc/logstash/conf.d/in_kafka_to_es.conf 
#从 kafka 中读取日志数据
input {                #数据源端
    kafka {                #类型为 kafka
        bootstrap_servers => ["192.168.81.210:9092,192.168.81.220:9092,192.168.81.230:9092"]            #kafka 集群地址
        topics => ["nginx-access","tomcat-cata"]            #要读取那些 kafka topics
        codec => "json"                                        #解决 json 格局的数据
        auto_offset_reset => "latest"                        #只生产最新的 kafka 数据
    }
}

#解决数据,去掉没用的字段
filter {if[fields][log_topic] == "nginx-access" {            #如果 log_topic 字段为 nginx-access 则进行以下数据处理
        json {                    #json 格局数据处理
             source => "message"            #source 等于 message 的
             remove_field => ["@version","path","beat","input","log","offset","prospector","source","tags"]            #删除指定的字段
        }
        mutate {            #批改数据
             remove_field => ["_index","_id","_type","_version","_score","referer","agent"]            #删除没用的字段
        }
    }
    
    if[fields][log_topic] == "tomcat-cata" {        #如果 log_topic 字段为 tomcat-cata
        grok {                #解析格局
         match => {"message" => "(?< 工夫 >20[0-9]{2}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}) \[(?< 线程名称 >[^\s]{0,})\] (?< 日志等级 >\w+) (?< 类名称 >[^\s]{0,}) (?< 日志详情 >[\W\w]+)"             #将 message 的值减少上一些格局
         }
        }
        mutate {            #批改数据
                 remove_field => ["_index","_id","_type","_version","_score","referer","agent"]            #删除没用的字段
            }  
    }
}

#数据处理后存储 es 集群
output {                #指标端
    if[fields][log_topic] == "nginx-access" {            #如果 log_topic 的字段值为 nginx-access 就存到上面的 es 集群里
        elasticsearch {                        
        action => "index"            #类型为索引
        hosts => ["192.168.81.210:9200","192.168.81.220:9200","192.168.81.230:9200"]        #es 集群地址
        index => "nginx-access-%{+YYYY.MM.dd}"            #存储到 es 集群的哪个索引里
        codec => "json"                        #解决 json 格局的解析
        } 
    }
    
    if[fields][log_topic] == "tomcat-cata" {                #如果 log_topic 的字段值为 tomcat-cata 就存到上面的 es 集群里
        elasticsearch {
        action => "index"                #类型为索引
        hosts => ["192.168.81.210:9200","192.168.81.220:9200","192.168.81.230:9200"]            #es 集群地址
        index => "tomcat-cata-%{+YYYY.MM.dd}"            #存储到 es 集群的哪个索引里
        codec => "json"                        #解决 json 格局的解析
        } 
    }        
}

复制代码
8.3. 启动 logstash 并察看日志

[root@elk-3 ~]# nphup /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/in_kafka_to_es.conf &

8.4. 查看 elasticsearch 集群是否减少了对应的索引库
es 集群曾经生成了 tomcat-cata 以及 nginx-access 索引库
到此为止 logstash 曾经胜利从 kafka 集群读取到日志数据,而后传入到 elasticsearch 集群不同的索引库

9. 在 kibana 上关联 elasticsearch 索引库浏览日志数据

1)点击创立索引
2)填写索引名
3)增加一个工夫筛选字段
4)创立胜利

10. 报错合集

1.es 启动时报错无奈指定被申请的地址
解决办法:仔细检查配置文件,必定是某个地址配置错了,我的就是监听地址的 ip 写错了
2.filebeat 写入数据到 kafka api 版本报错
剖析解决思路:初步断定为 kafka2.11 版本问题导致的,换成 2.13 问题解决

正文完
 0