欢迎大家前往腾讯云 + 社区,获取更多腾讯海量技术实践干货哦~
本文由 michelmu 发表于云 + 社区专栏
Elasticsearch 作为当前主流的全文检索引擎,除了强大的全文检索能力和高扩展性之外,对多种数据源的兼容能力也是其成功的秘诀之一。而 Elasticsearch 强大的数据源兼容能力,主要来源于其核心组件之一的 Logstash, Logstash 通过插件的形式实现了对多种数据源的输入和输出。Kafka 是一种高吞吐量的分布式发布订阅消息系统,是一种常见的数据源,也是 Logstash 支持的众多输入输出源的其中一个。本文将从实践的角度,研究使用 Logstash Kafka Input 插件实现将 Kafka 中数据导入到 Elasticsearch 的过程。
使用 Logstash Kafka 插件连接 Kafka 和 Elasticsearch
1 Logstash Kafka input 插件简介
Logstash Kafka Input 插件使用 Kafka API 从 Kafka topic 中读取数据信息,使用时需要注意 Kafka 的版本及对应的插件版本是否一致。该插件支持通过 SSL 和 Kerveros SASL 方式连接 Kafka。另外该插件提供了 group 管理,并使用默认的 offset 管理策略来操作 Kafka topic。
Logstash 默认情况下会使用一个单独的 group 来订阅 Kafka 消息,每个 Logstash Kafka Consumer 会使用多个线程来增加吞吐量。当然也可以多个 Logstash 实例使用同一个 group_id,来均衡负载。另外建议把 Consumer 的个数设置为 Kafka 分区的大小,以提供更好的性能。
2 测试环境准备
2.1 创建 Elasticsearch 集群
为了简化搭建过程,本文使用了腾讯云 Elasticsearch service。腾讯云 Elasticsearch service 不仅可以实现 Elasticsearch 集群的快速搭建,还提供了内置 Kibana,集群监控,专用主节点,Ik 分词插件等功能,极大的简化了 Elasticsearch 集群的创建和管理工作。
2.2 创建 Kafka 服务
Kafka 服务的搭建采用腾讯云 CKafka 来完成。与 Elasticsearch Service 一样,腾讯云 CKafka 可以实现 Kafka 服务的快速创建,100% 兼容开源 Kafka API(0.9 版本)。
2.3 服务器
除了准备 Elasticsearch 和 Kafka,另外还需要准备一台服务器,用于运行 Logstash 以连接 Elasticsearch 和 Kafka。本文采用腾讯云 CVM 服务器
2.4 注意事项
1) 需要将 Elasticsearch、Kafka 和服务器创建在同一个网络下,以便实现网络互通。由于本文采用的是腾讯云相关的技术服务,因此只需要将 Elasticsearch service,CKafka 和 CVM 创建在同一个私有网路(VPC)下即可。
2) 注意获取 Elasticsearch serivce,CKafka 和 CVM 的内网地址和端口,以便后续服务使用
本次测试中:
服务
ip
port
Elasticsearch service
192.168.0.8
9200
Ckafka
192.168.13.10
9092
CVM
192.168.0.13
–
3 使用 Logstash 连接 Elasticsearch 和 Kafka
3.1 Kafka 准备
可以参考 [CKafka 使用入门]
按照上面的教程
1) 创建名为 kafka_es_test 的 topic
2) 安装 JDK
3) 安装 Kafka 工具包
4) 创建 producer 和 consumer 验证 kafka 功能
3.2 安装 Logstash
Logstash 的安装和使用可以参考 [一文快速上手 Logstash]
3.3 配置 Logstash Kafka input 插件
创建 kafka_test_pipeline.conf 文件内容如下:
input{
kafka{
bootstrap_servers=>”192.168.13.10:9092″
topics=>[“kafka_es_test”]
group_id=>”logstash_kafka_test”
}
}
output{
elasticsearch{
hosts=>[“192.168.0.8:9200″]
}
}
其中定义了一个 kafka 的 input 和一个 elasticsearch 的 output
对于 Kafka input 插件上述三个参数为必填参数,除此之外还有一些对插件行为进行调整的一些参数如:
auto_commit_interval_ms 用于设置 Consumer 提交 offset 给 Kafka 的时间间隔
consumer_threads 用于设置 Consumer 的线程数,默认为 1,实际中应设置与 Kafka Topic 分区数一致
fetch_max_wait_ms 用于指定 Consumer 等待一个 fetch 请求达到 fetch_min_bytes 的最长时间
fetch_min_bytes 用于指定 Consumer fetch 请求应返回的最小数据量
topics_pattern 用于通过正则订阅符合某一规则的一组 topic
更多参数参考:[Kafka Input Configuration Options]
3.4 启动 Logstash
以下操作在 Logstash 根目录中进行
1) 验证配置
./bin/logstash -f kafka_test_pipeline.conf –config.test_and_exit
如有错误,根据提示修改配置文件。若配置正确会得到如下结果
Sending Logstash’s logs to /root/logstash-5.6.13/logs which is now configured via log4j2.properties
[2018-11-11T15:24:01,598][INFO][logstash.modules.scaffold] Initializing module {:module_name=>”netflow”, :directory=>”/root/logstash-5.6.13/modules/netflow/configuration”}
[2018-11-11T15:24:01,603][INFO][logstash.modules.scaffold] Initializing module {:module_name=>”fb_apache”, :directory=>”/root/logstash-5.6.13/modules/fb_apache/configuration”}
Configuration OK
[2018-11-11T15:24:01,746][INFO][logstash.runner] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
2) 启动 Logstash
./bin/logstash -f kafka_test_pipeline.conf –config.reload.automatic
观察日志是否有错误提示,并及时处理
3.4 启动 Kafka Producer
以下操作在 Kafka 工具包根目录下进行
./bin/kafka-console-producer.sh –broker-list 192.168.13.10:9092 –topic kafka_es_test
写入测试数据
This is a message
3.5 Kibana 验证结果
登录 Elasticsearch 对应 Kibana, 在 Dev Tools 中进行如下操作
1) 查看索引
GET _cat/indices
可以看到一个名为 logstash-xxx.xx.xx 的索引被创建成功
green open .kibana QUw45tN0SHqeHbF9-QVU6A 1 1 1 0 5.5kb 2.7kb
green open logstash-2018.11.11 DejRdNJVQ1e1MwbyJjJjLw 5 1 1 0 8.7kb 4.3kb
2) 查看写入的数据
GET logstash-2018.11.11/_search
可以看到数据已经被成功写入
{
“took”: 0,
“timed_out”: false,
“_shards”: {
“total”: 5,
“successful”: 5,
“skipped”: 0,
“failed”: 0
},
“hits”: {
“total”: 1,
“max_score”: 1,
“hits”: [
{
“_index”: “logstash-2018.11.11”,
“_type”: “logs”,
“_id”: “AWcBsEegMu-Dkjm1ap3H”,
“_score”: 1,
“_source”: {
“message”: “This is a message”,
“@version”: “1”,
“@timestamp”: “2018-11-11T07:33:09.079Z”
}
}
]
}
}
4 总结
Logstash 作为 Elastic Stack 中数据采集和处理的核心组件,为 Elasticsearch 提供了强大的数据源兼容能力。从测试过程可以看出,使用 Logstash 实现 kafka 和 Elaticsearch 的连接过程相当简单方便。另外 Logstash 的数据处理功能,也使得采用该架构的系统对数据映射和处理有天然的优势。
然而,使用 Logstash 实现 Kafka 和 Elasticsearch 的连接,并不是连接 Kafka 和 Elasticsearch 的唯一方案,另一种常见的方案是使用 Kafka Connect, 可以参考“当 Elasticsearch 遇见 Kafka–Kafka Connect”
相关阅读【每日课程推荐】机器学习实战!快速入门在线广告业务及 CTR 相应知识