一文学会目前最火热的大数据技术

8次阅读

共计 9215 个字符,预计需要花费 24 分钟才能阅读完成。

欢迎大家前往腾讯云 + 社区,获取更多腾讯海量技术实践干货哦~
本文由 michelmu 发表于云 + 社区专栏

Elasticsearch 是当前主流的分布式大数据存储和搜索引擎,可以为用户提供强大的全文本检索能力,广泛应用于日志检索,全站搜索等领域。Logstash 作为 Elasicsearch 常用的实时数据采集引擎,可以采集来自不同数据源的数据,并对数据进行处理后输出到多种输出源,是 Elastic Stack 的重要组成部分。本文从 Logstash 的工作原理,使用示例,部署方式及性能调优等方面入手,为大家提供一个快速入门 Logstash 的方式。文章最后也给出了一些深入了解 Logstash 的的链接,以方便大家根据需要详细了解。
Logstash 简介
1 Logstash 工作原理
1.1 处理过程
Logstash 处理过程
如上图,Logstash 的数据处理过程主要包括:Inputs, Filters, Outputs 三部分,另外在 Inputs 和 Outputs 中可以使用 Codecs 对数据格式进行处理。这四个部分均以插件形式存在,用户通过定义 pipeline 配置文件,设置需要使用的 input,filter,output, codec 插件,以实现特定的数据采集,数据处理,数据输出等功能

(1)Inputs:用于从数据源获取数据,常见的插件如 file, syslog, redis, beats 等[详细参考]
(2)Filters:用于处理数据如格式转换,数据派生等,常见的插件如 grok, mutate, drop, clone, geoip 等[详细参考]
(3)Outputs:用于数据输出,常见的插件如 elastcisearch,file, graphite, statsd 等[详细参考]
(4)Codecs:Codecs 不是一个单独的流程,而是在输入和输出等插件中用于数据转换的模块,用于对数据进行编码处理,常见的插件如 json,multiline[详细参考]

可以点击每个模块后面的详细参考链接了解该模块的插件列表及对应功能
1.2 执行模型:

(1)每个 Input 启动一个线程,从对应数据源获取数据
(2)Input 会将数据写入一个队列:默认为内存中的有界队列(意外停止会导致数据丢失)。为了防止数丢失 Logstash 提供了两个特性:Persistent Queues:通过磁盘上的 queue 来防止数据丢失 Dead Letter Queues:保存无法处理的 event(仅支持 Elasticsearch 作为输出源)
(3)Logstash 会有多个 pipeline worker, 每一个 pipeline worker 会从队列中取一批数据,然后执行 filter 和 output(worker 数目及每次处理的数据量均由配置确定)

2 Logstash 使用示例
2.1 Logstash Hello world
第一个示例 Logstash 将采用标准输入和标准输出作为 input 和 output,并且不指定 filter

(1)下载 Logstash 并解压(需要预先安装 JDK8)
(2)cd 到 Logstash 的根目录,并执行启动命令如下:

cd logstash-6.4.0
bin/logstash -e ‘input {stdin {} } output {stdout {} }’

(3)此时 Logstash 已经启动成功,- e 表示在启动时直接指定 pipeline 配置,当然也可以将该配置写入一个配置文件中,然后通过指定配置文件来启动
(4)在控制台输入:hello world,可以看到如下输出:

{
“@version” => “1”,
“host” => “localhost”,
“@timestamp” => 2018-09-18T12:39:38.514Z,
“message” => “hello world”
}
Logstash 会自动为数据添加 @version, host, @timestamp 等字段
在这个示例中 Logstash 从标准输入中获得数据,仅在数据中添加一些简单字段后将其输出到标准输出。
2.2 日志采集
这个示例将采用 Filebeat input 插件 (Elastic Stack 中的轻量级数据采集程序) 采集本地日志,然后将结果输出到标准输出

(1)下载示例使用的日志文件[地址],解压并将日志放在一个确定位置
(2)安装 filebeat,配置并启动[参考]

filebeat.yml 配置如下(paths 改为日志实际位置,不同版本 beats 配置可能略有变化,请根据情况调整)
filebeat.prospectors:
– input\_type: log
paths:
– /path/to/file/logstash-tutorial.log
output.logstash:
hosts: “localhost:5044”
启动命令:
./filebeat -e -c filebeat.yml -d “publish”
(3)配置 logstash 并启动
1)创建 first-pipeline.conf 文件内容如下(该文件为 pipeline 配置文件,用于指定 input,filter, output 等):
input {
beats {
port => “5044”
}
}
#filter {
#}
output {
stdout {codec => rubydebug}
}
codec => rubydebug 用于美化输出[参考]
2)验证配置(注意指定配置文件的路径):
./bin/logstash -f first-pipeline.conf –config.test_and_exit
3)启动命令:
./bin/logstash -f first-pipeline.conf –config.reload.automatic
–config.reload.automatic 选项启用动态重载配置功能
4)预期结果:
可以在 Logstash 的终端显示中看到,日志文件被读取并处理为如下格式的多条数据
{
“@timestamp” => 2018-10-09T12:22:39.742Z,
“offset” => 24464,
“@version” => “1”,
“input_type” => “log”,
“beat” => {
“name” => “VM_136_9_centos”,
“hostname” => “VM_136_9_centos”,
“version” => “5.6.10”
},
“host” => “VM_136_9_centos”,
“source” => “/data/home/michelmu/workspace/logstash-tutorial.log”,
“message” => “86.1.76.62 – – [04/Jan/2015:05:30:37 +0000] \”GET /style2.css HTTP/1.1\” 200 4877 \”http://www.semicomplete.com/projects/xdotool/\” \”Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\””,
“type” => “log”,
“tags” => [
[0] “beats_input_codec_plain_applied”
]
}
相对于示例 2.1,该示例使用了 filebeat input 插件从日志中获取一行记录,这也是 Elastic stack 获取日志数据最常见的一种方式。另外该示例还采用了 rubydebug codec 对输出的数据进行显示美化。
2.3 日志格式处理
可以看到虽然示例 2.2 使用 filebeat 从日志中读取数据,并将数据输出到标准输出,但是日志内容作为一个整体被存放在 message 字段中,这样对后续存储及查询都极为不便。可以为该 pipeline 指定一个 grok filter 来对日志格式进行处理
(1)在 first-pipeline.conf 中增加 filter 配置如下
input {
beats {
port => “5044”
}
}
filter {
grok {
match => {“message” => “%{COMBINEDAPACHELOG}”}
}
}
output {
stdout {codec => rubydebug}
}
(2)到 filebeat 的根目录下删除之前上报的数据历史(以便重新上报数据), 并重启 filebeat
sudo rm data/registry
sudo ./filebeat -e -c filebeat.yml -d “publish”
(3)由于之前启动 Logstash 设置了自动更新配置,因此 Logstash 不需要重新启动,这个时候可以获取到的日志数据如下:
{
“request” => “/style2.css”,
“agent” => “\”Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\””,
“offset” => 24464,
“auth” => “-“,
“ident” => “-“,
“input_type” => “log”,
“verb” => “GET”,
“source” => “/data/home/michelmu/workspace/logstash-tutorial.log”,
“message” => “86.1.76.62 – – [04/Jan/2015:05:30:37 +0000] \”GET /style2.css HTTP/1.1\” 200 4877 \”http://www.semicomplete.com/projects/xdotool/\” \”Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\””,
“type” => “log”,
“tags” => [
[0] “beats_input_codec_plain_applied”
],
“referrer” => “\”http://www.semicomplete.com/projects/xdotool/\””,
“@timestamp” => 2018-10-09T12:24:21.276Z,
“response” => “200”,
“bytes” => “4877”,
“clientip” => “86.1.76.62”,
“@version” => “1”,
“beat” => {
“name” => “VM_136_9_centos”,
“hostname” => “VM_136_9_centos”,
“version” => “5.6.10”
},
“host” => “VM_136_9_centos”,
“httpversion” => “1.1”,
“timestamp” => “04/Jan/2015:05:30:37 +0000”
}
可以看到 message 中的数据被详细解析出来了
2.4 数据派生和增强
Logstash 中的一些 filter 可以根据现有数据生成一些新的数据,如 geoip 可以根据 ip 生成经纬度信息
(1)在 first-pipeline.conf 中增加 geoip 配置如下
input {
beats {
port => “5044”
}
}
filter {
grok {
match => {“message” => “%{COMBINEDAPACHELOG}”}
}
geoip {
source => “clientip”
}
}
output {
stdout {codec => rubydebug}
}

(2)如 2.3 一样清空 filebeat 历史数据,并重启
(3)当然 Logstash 仍然不需要重启,可以看到输出变为如下:

{
“request” => “/style2.css”,
“agent” => “\”Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\””,
“geoip” => {
“timezone” => “Europe/London”,
“ip” => “86.1.76.62”,
“latitude” => 51.5333,
“continent_code” => “EU”,
“city_name” => “Willesden”,
“country_name” => “United Kingdom”,
“country_code2” => “GB”,
“country_code3” => “GB”,
“region_name” => “Brent”,
“location” => {
“lon” => -0.2333,
“lat” => 51.5333
},
“postal_code” => “NW10”,
“region_code” => “BEN”,
“longitude” => -0.2333
},
“offset” => 24464,
“auth” => “-“,
“ident” => “-“,
“input_type” => “log”,
“verb” => “GET”,
“source” => “/data/home/michelmu/workspace/logstash-tutorial.log”,
“message” => “86.1.76.62 – – [04/Jan/2015:05:30:37 +0000] \”GET /style2.css HTTP/1.1\” 200 4877 \”http://www.semicomplete.com/projects/xdotool/\” \”Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\””,
“type” => “log”,
“tags” => [
[0] “beats_input_codec_plain_applied”
],
“referrer” => “\”http://www.semicomplete.com/projects/xdotool/\””,
“@timestamp” => 2018-10-09T12:37:46.686Z,
“response” => “200”,
“bytes” => “4877”,
“clientip” => “86.1.76.62”,
“@version” => “1”,
“beat” => {
“name” => “VM_136_9_centos”,
“hostname” => “VM_136_9_centos”,
“version” => “5.6.10”
},
“host” => “VM_136_9_centos”,
“httpversion” => “1.1”,
“timestamp” => “04/Jan/2015:05:30:37 +0000”
}
可以看到根据 ip 派生出了许多地理位置信息数据
2.5 将数据导入 Elasticsearch
Logstash 作为 Elastic stack 的重要组成部分,其最常用的功能是将数据导入到 Elasticssearch 中。将 Logstash 中的数据导入到 Elasticsearch 中操作也非常的方便,只需要在 pipeline 配置文件中增加 Elasticsearch 的 output 即可。

(1)首先要有一个已经部署好的 Logstash,当然可以使用腾讯云快速创建一个 Elasticsearch 创建地址

(2)在 first-pipeline.conf 中增加 Elasticsearch 的配置,如下

input {
beats {
port => “5044”
}
}
filter {
grok {
match => {“message” => “%{COMBINEDAPACHELOG}”}
}
geoip {
source => “clientip”
}
}
output {
elasticsearch {
hosts => [“localhost:9200”]
}
}

(3)清理 filebeat 历史数据,并重启
(4)查询 Elasticsearch 确认数据是否正常上传(注意替换查询语句中的日期)

curl -XGET ‘http://172.16.16.17:9200/logstash-2018.10.09/_search?pretty&q=response=200’
(5)如果 Elasticsearch 关联了 Kibana 也可以使用 kibana 查看数据是否正常上报
kibana 图示
Logstash 提供了大量的 Input, filter, output, codec 的插件,用户可以根据自己的需要,使用一个或多个组件实现自己的功能,当然用户也可以自定义插件以实现更为定制化的功能。自定义插件可以参考[logstash input 插件开发]
3 部署 Logstash
演示过如何快速使用 Logstash 后,现在详细讲述一下 Logstash 的部署方式。
3.1 安装

安装 JDK:Logstash 采用 JRuby 编写,运行需要 JDK 环境,因此安装 Logstash 前需要先安装 JDK。(当前 6.4 仅支持 JDK8)

安装 Logstash:可以采用直接下载压缩包方式安装,也通过 APT 或 YUM 安装,另外 Logstash 支持安装到 Docker 中。[Logstash 安装参考]

安装 X -PACK:在 6.3 及之后版本 X -PACK 会随 Logstash 安装,在此之前需要手动安装[参考链接]

3.2 目录结构
logstash 的目录主要包括:根目录、bin 目录、配置目录、日志目录、插件目录、数据目录
不同安装方式各目录的默认位置参考[此处]
3.3 配置文件

Pipeline 配置文件,名称可以自定义,在启动 Logstash 时显式指定,编写方式可以参考前面示例,对于具体插件的配置方式参见具体插件的说明(使用 Logstash 时必须配置):用于定义一个 pipeline,数据处理方式和输出源
Settings 配置文件(可以使用默认配置):在使用 Logstash 时可以不用设置,用于性能调优,日志记录等 – logstash.yml:用于控制 logstash 的执行过程[参考链接] – pipelines.yml: 如果有多个 pipeline 时使用该配置来配置多 pipeline 执行[参考链接] – jvm.options:jvm 的配置 – log4j2.properties:log4j 2 的配置,用于记录 logstash 运行日志[参考链接] – startup.options: 仅适用于 Lniux 系统,用于设置系统启动项目!
为了保证敏感配置的安全性,logstash 提供了配置加密功能[参考链接]

3.4 启动关闭方式
3.4.1 启动

命令行启动
在 debian 和 rpm 上以服务形式启动

在 docker 中启动 3.4.2 关闭
关闭 Logstash
Logstash 的关闭时会先关闭 input 停止输入,然后处理完所有进行中的事件,然后才完全停止,以防止数据丢失,但这也导致停止过程出现延迟或失败的情况。

3.5 扩展 Logstash
当单个 Logstash 无法满足性能需求时,可以采用横向扩展的方式来提高 Logstash 的处理能力。横向扩展的多个 Logstash 相互独立,采用相同的 pipeline 配置,另外可以在这多个 Logstash 前增加一个 LoadBalance,以实现多个 Logstash 的负载均衡。
4 性能调优
[详细调优参考]

(1)Inputs 和 Outputs 的性能:当输入输出源的性能已经达到上限,那么性能瓶颈不在 Logstash,应优先对输入输出源的性能进行调优。

(2)
系统性能指标

CPU:确定 CPU 使用率是否过高,如果 CPU 过高则先查看 JVM 堆空间使用率部分,确认是否为 GC 频繁导致,如果 GC 正常,则可以通过调节 Logstash worker 相关配置来解决。

内存:由于 Logstash 运行在 JVM 上,因此注意调整 JVM 堆空间上限,以便其有足够的运行空间。另外注意 Logstash 所在机器上是否有其他应用占用了大量内存,导致 Logstash 内存磁盘交换频繁。

I/ O 使用率:1)磁盘 IO:磁盘 IO 饱和可能是因为使用了会导致磁盘 IO 饱和的创建(如 file output), 另外 Logstash 中出现错误产生大量错误日志时也会导致磁盘 IO 饱和。Linux 下可以通过 iostat, dstat 等查看磁盘 IO 情况 2)网络 IO:网络 IO 饱和一般发生在使用有大量网络操作的插件时。linux 下可以使用 dstat 或 iftop 等查看网络 IO 情况

(3)
JVM 堆检查

如果 JVM 堆大小设置过小会导致 GC 频繁,从而导致 CPU 使用率过高
快速验证这个问题的方法是 double 堆大小,看性能是否有提升。注意要给系统至少预留 1GB 的空间。
为了精确查找问题可以使用 jmap 或 VisualVM。[参考]
设置 Xms 和 Xmx 为相同值,防止堆大小在运行时调整,这个过程非常消耗性能。

(4)Logstash worker 设置:worker 相关配置在 logstash.yml 中,主要包括如下三个:– pipeline.workers:该参数用以指定 Logstash 中执行 filter 和 output 的线程数,当如果发现 CPU 使用率尚未达到上限,可以通过调整该参数,为 Logstash 提供更高的性能。建议将 Worker 数设置适当超过 CPU 核数可以减少 IO 等待时间对处理过程的影响。实际调优中可以先通过 - w 指定该参数,当确定好数值后再写入配置文件中。– pipeline.batch.size: 该指标用于指定单个 worker 线程一次性执行 flilter 和 output 的 event 批量数。增大该值可以减少 IO 次数,提高处理速度,但是也以为这增加内存等资源的消耗。当与 Elasticsearch 联用时,该值可以用于指定 Elasticsearch 一次 bluck 操作的大小。– pipeline.batch.delay: 该指标用于指定 worker 等待时间的超时时间,如果 worker 在该时间内没有等到 pipeline.batch.size 个事件,那么将直接开始执行 filter 和 output 而不再等待。

结束语
Logstash 作为 Elastic Stack 的重要组成部分,在 Elasticsearch 数据采集和处理过程中扮演着重要的角色。本文通过简单示例的演示和 Logstash 基础知识的铺陈,希望可以帮助初次接触 Logstash 的用户对 Logstash 有一个整体认识,并能较为快速上手。对于 Logstash 的高阶使用,仍需要用户在使用过程中结合实际情况查阅相关资源深入研究。当然也欢迎大家积极交流,并对文中的错误提出宝贵意见。
MORE:

Logstash 数据处理常见示例
Logstash 日志相关配置参考
Kibana 管理 Logstash pipeline 配置
LogstashModule
监控 Logstash

相关阅读大数据基础系列之 spark 的监控体系介绍 Neutron lbaas 代理 https 实践【每日课程推荐】机器学习实战!快速入门在线广告业务及 CTR 相应知识

正文完
 0