1 前言
在笔者最开始维护的日志服务中,日质量较小,没有接入 kafka。随着业务规模扩增,日质量不断增长,接入到日志服务的产品线不断增多,遇到流量高峰,写入到 es 的性能就会降低,cpu 打满,随时都有集群宕机的风险。因此,接入消息队列,进行削峰填谷就迫在眉睫。本文主要介绍在 EFK 的基础上如何接入 kafka,并做到向前兼容。
2 主要内容
- 如何搭建 kafka 集群
- 原有 EFK 升级
3 搭建 kafka 集群
3.1 搭建 zookeeper 集群
主要参考文章:【zookeeper 安装指南】
由于是要线上搭建集群,为避免单点故障,就需要部署至少 3 个节点(取决于多数选举机制)。
3.1.1 下载
进入要下载的版本的目录,选择.tar.gz 文件下载
3.1.2 安装
使用 tar 解压要安装的目录即可,以 3.4.5 版本为例
这里以解压到 /home/work/common,实际安装根据自己的想安装的目录修改(注意如果修改,那后边的命令和配置文件中的路径都要相应修改)
tar -zxf zookeeper-3.4.5.tar.gz -C /home/work/common
3.1.3 配置
在主目录下创建 data 和 logs 两个目录用于存储数据和日志:
cd /home/work/zookeeper-3.4.5
mkdir data mkdir logs
在 conf 目录下新建 zoo.cfg 文件,写入如下配置:
tickTime=2000
dataDir=/home/work/common/zookeeper1/data dataLogDir=/home/work/common/zookeeper1/logs
clientPort=2181
initLimit=5
syncLimit=2
server.1=192.168.220.128:2888:3888
server.2=192.168.222.128:2888:3888
server.3=192.168.223.128:2888:3888
在 zookeeper1 的 data/myid 配置如下:
echo '1' > data/myid
zookeeper2 的 data/myid 配置如下:
echo '2' > data/myid
zookeeper2 的 data/myid 配置如下:
echo '3' > data/myid
3.1.4 启停
进入 bin 目录,启动、停止、重启分和查看当前节点状态(包括集群中是何角色)别执行:
./zkServer.sh start
./zkServer.sh stop
./zkServer.sh restart
./zkServer.sh status
zookeeper 集群搭建完成之后,根据实际情况开始部署 kafka。以部署 2 个 broker 为例。
3.2 搭建 kafka broker 集群
3.2.1 安装
下载并解压包:
curl -L -O http://mirrors.cnnic.cn/apache/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz
tar zxvf kafka_2.10-0.9.0.0.tgz
3.2.2 配置
进入 kafka 安装工程根目录编辑 config/server.properties
# 不同的 broker 对应的 id 不能重复
broker.id=1
delete.topic.enable=true
inter.broker.protocol.version=0.10.0.1
log.message.format.version=0.10.0.1
listeners=PLAINTEXT://:9092,SSL://:9093
auto.create.topics.enable=false
ssl.key.password=test
ssl.keystore.location=/home/work/certificate/server-keystore.jks
ssl.keystore.password=test
ssl.truststore.location=/home/work/certificate/server-truststore.jks
ssl.truststore.password=test
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/work/data/kafka/log
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=72
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.220.128:2181,192.168.222.128:2181,192.168.223.128:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
3.2.3 启动 kafka
进入 kafka 的主目录
nohup sh bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &
3.2.4 连通性测试
首先创建一个 topic:topic_1
sh bin/kafka-topics.sh --create --topic topic_1 --partitions 2 --replication-factor 2 --zookeeper 192.168.220.128:2181
可以先检查一下是否创建成功:
sh bin/kafka-topics.sh --list --zookeeper 192.168.220.128:2181
起两个终端,一个作为 producer,一个作为 consumer
生产消息:
bin/kafka-console-producer.sh --topic topic_1 --broker-list 192.168.220.128:9092,192.168.223.128:9092
消费消息:
sh bin/kafka-console-consumer.sh --bootstrap-server 192.168.220.128:9092,192.168.223.128:9092 --topic topic_1
好了,上面的调通了,万里长征第一步就走完了。
4 EFK 接入 kafka 向前兼容
4.1 准备证书
在之前的 EFK 中是通过证书进行安全加固的,所以要先为接入 kafka 准备一下相关的证书。要确保给 kafka 生成的证书和给 efk 生成的证书是同一个根证书。关于证书的生成,笔者会写文章专门介绍。主要包括:
- 服务端证书
- client 证书
那么作为 kafka 的输入(filebeat)和输出(logstash),都需要 kafka 的 client 证书,kafka 的 broker 需要的是服务端证书。
需要注意的是,filebeat 配置的是 pem 证书,kafka 和 logstash 的 kafka-input 插件用的是 jks 证书~~~ 因此,证书生成工具最好需要能够同时生成这两种证书。
4.2 filebeat 升级
4.2.1 input 日志收集文件
在 fields 中添加 log_topic 字段,指定写入的 topic
fields:
module: sonofelice
type: debug
log_topic: topic_1
language: java
4.2.2 filebeat.yml 文件
output.kafka:
hosts: ["192.168.220.128:9093","192.168.223.128:9093"]
topic: '%{[fields.log_topic]}'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
ssl.certificate_authorities: ["/home/work/filebeat/keys/root-ca.pem"]
ssl.certificate: "/home/work/filebeat/keys/kafka.crt.pem"
ssl.key: "/home/work/filebeat/keys/kafka.key.pem"
4.3 logstash 升级
input {
kafka {
bootstrap_servers => "10.100.27.199:9093,10.64.56.75:9093"
group_id => "consumer-group-01"
topics => ["topic_1"]
consumer_threads => 5
decorate_events => false
auto_offset_reset => "earliest"
security_protocol => "SSL"
ssl_keystore_password => "test"
ssl_keystore_location => "/home/work/certificate/kafka-keystore.jks"
ssl_keystore_password => "test"
ssl_truststore_password => "test"
ssl_truststore_location => "/home/work/cvca/certificate/truststore.jks"
codec => json {charset => "UTF-8"}
}
}
那为了向前兼容之前的 filebeat 日志收集,我们在 input 中同时保留 beats 配置,最终配置如下:
input {
kafka {
bootstrap_servers => "10.100.27.199:9093,10.64.56.75:9093"
group_id => "consumer-group-01"
topics => ["topic_1"]
consumer_threads => 5
decorate_events => false
auto_offset_reset => "earliest"
security_protocol => "SSL"
ssl_keystore_password => "test"
ssl_keystore_location => "/home/work/certificate/kafka-keystore.jks"
ssl_keystore_password => "test"
ssl_truststore_password => "test"
ssl_truststore_location => "/home/work/cvca/certificate/truststore.jks"
codec => json {charset => "UTF-8"}
}
beats {
port => 5044
client_inactivity_timeout => 600
ssl => true
ssl_certificate_authorities => ["/home/work/certificate/chain-ca.pem"]
ssl_certificate => "/home/work/certificate/server.crt.pem"
ssl_key => "/home/work/certificate/server.key.pem"
ssl_verify_mode => "force_peer"
}
}
需要特别注意的是,对于 kafka 的 input 来说,codec 并不是默认为 json 的,导致之前用 beats 能成功解析到 es 的字段都无法解析成功,所以务必加上 codec 的配置。
至此,改造升级的点应该没有太大的坑了,也能够向前兼容,接入端自行切换即可。