Logstash- 数据流引擎
作者 | WenasWei
一 Logstash
Logstash 是具备实时流水线性能的开源数据收集引擎。Logstash 能够动静对立来自不同起源的数据,并将数据标准化到您抉择的指标地位。革除所有数据并使其民主化,以用于各种高级上游剖析和可视化用例。
1.1 Logstash 简介
Logstash 是一个数据流引擎:
- 它是用于数据物流的开源流式 ETL(Extract-Transform-Load)引擎
- 在几分钟内建设数据流管道
- 具备程度可扩大及韧性且具备自适应缓冲
- 不可知的数据源
- 具备 200 多个集成和处理器的插件生态系统
- 应用 Elastic Stack 监督和治理部署
官网介绍:Logstash is an open source data collection engine with real-time pipelining capabilities。简略来说 logstash 就是一根具备实时数据传输能力的管道,负责将数据信息从管道的输出端传输到管道的输入端;与此同时这根管道还能够让你依据本人的需要在两头加上滤网,Logstash 提供里很多功能强大的滤网以满足你的各种利用场景。
1.2 数据处理
Logstash 是一个功能强大的工具,可与各种部署集成。它提供了大量插件,可帮忙你解析,丰盛,转换和缓冲来自各种起源的数据。如果你的数据须要 Beats 中没有的其余解决,则须要将 Logstash 增加到部署中。
当下最为风行的数据源:
Logstash 能够摄入日志,文件,指标或者网路实在数据。通过 Logstash 的解决,变为能够应用的 Web
Apps 能够耗费的数据,也能够存储于数据中心,或变为其它的流式数据:
- Logstash 能够很不便地和 Beats 一起单干,这也是被举荐的办法
- Logstash 也能够和那些驰名的云厂商的服务一起单干解决它们的数据
- 它也能够和最为同样的信息音讯队列,比方 redis 或 kafka 一起合作
- Logstash 也能够应用 JDBC 来拜访 RDMS 数据
- 它也能够和 IoT 设施一起解决它们的数据
- Logstash 不仅仅能够把数据传送到 Elasticsearch,而且它还能够把数据发送至很多其它的目的地,并作为它们的输出源做进一步的解决
二 Logstash 零碎架构
Logstash 蕴含 3 个次要局部:输出(inputs),过滤器(filters)和输入(outputs)
Logstash 的事件(logstash 将数据流中等每一条数据称之为一个 event)解决流水线有三个次要角色实现:inputs –> filters –> outputs:
- inpust:必须,负责产生事件(Inputs generate events),罕用:File、syslog、redis、kakfa、beats(如:Filebeats)
- filters:可选,负责数据处理与转换(filters modify them),罕用:grok、mutate、drop、clone、geoip
- outpus:必须,负责数据输入(outputs ship them elsewhere),罕用:elasticsearch、file、graphite、kakfa、statsd
三 Logstash 装置
3.1 环境清单
- 操作系统:Linux #56-Ubuntu SMP Tue Jun 4 22:49:08 UTC 2019 x86_64
- Logstash 版本:logstash-6.2.4
- Jdk 版本:1.8.0_152
3.2 Linux 装置 JDK
3.2.1 解压缩并挪动到指定目录(约定的目录:/usr/local)
(1)解压缩
tar -zxvf jdk-8u152-linux-x64.tar.gz
(2)创立目录
mkdir -p /usr/local/java
(3)挪动安装包
mv jdk1.8.0_152/ /usr/local/java/
(4)设置所有者
chown -R root:root /usr/local/java/
3.2.2 配置环境变量
(1)配置零碎环境变量
vi /etc/environment
(2)增加如下语句
PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games"
export JAVA_HOME=/usr/local/java/jdk1.8.0_152
export JRE_HOME=/usr/local/java/jdk1.8.0_152/jre
export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
(3)配置用户环境变量
nano /etc/profile
(4)增加如下语句(肯定要放两头)
if ["$PS1"]; then
if ["$BASH"] && ["$BASH" != "/bin/sh"]; then
# The file bash.bashrc already sets the default PS1.
# PS1='\h:\w\$'
if [-f /etc/bash.bashrc]; then
. /etc/bash.bashrc
fi
else
if ["`id -u`" -eq 0]; then
PS1='#'
else
PS1='$'
fi
fi
fi
export JAVA_HOME=/usr/local/java/jdk1.8.0_152
export JRE_HOME=/usr/local/java/jdk1.8.0_152/jre
export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH:$HOME/bin
if [-d /etc/profile.d]; then
for i in /etc/profile.d/*.sh; do
if [-r $i]; then
. $i
fi
done
unset i
fi
(5)使用户环境变量失效
source /etc/profile
(6)测试是否装置胜利
$ java -version
java version "1.8.0_152"
Java(TM) SE Runtime Environment (build 1.8.0_152-b16)
Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode)
3.3 装置 Logstash
3.3.1 创立装置目录
$ sudo mkdir /usr/local/logstash
3.3.2 下载 Logstash 安装文件
$ wget -P /usr/local/logstash https://artifacts.elastic.co/downloads/logstash/logstash-6.2.4.tar.gz
3.3.2 解压缩安装文件
$ cd /usr/local/logstash/
$ sudo tar -zxvf logstash-6.2.4.tar.gz
3.3.3 测试装置是否胜利
测试: 疾速启动,规范输入输出作为 input 和 output,没有 filter
$ cd logstash-6.2.4/
$ ./bin/logstash -e 'input {stdin {} } output {stdout {} }'
Sending Logstash's logs to /usr/local/logstash/logstash-6.2.4/logs which is now configured via log4j2.properties
[2021-05-27T00:22:28,729][INFO][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/usr/local/logstash/logstash-6.2.4/modules/fb_apache/configuration"}
[2021-05-27T00:22:28,804][INFO][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/usr/local/logstash/logstash-6.2.4/modules/netflow/configuration"}
[2021-05-27T00:22:29,827][WARN][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2021-05-27T00:22:30,979][INFO][logstash.runner] Starting Logstash {"logstash.version"=>"6.2.4"}
[2021-05-27T00:22:31,821][INFO][logstash.agent] Successfully started Logstash API endpoint {:port=>9600}
[2021-05-27T00:22:36,463][INFO][logstash.pipeline] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2021-05-27T00:22:36,690][INFO][logstash.pipeline] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x55a5abea run>"}
The stdin plugin is now waiting for input:
[2021-05-27T00:22:36,853][INFO][logstash.agent] Pipelines running {:count=>1, :pipelines=>["main"]}
## 此时命令窗口停留在期待输出状态,键盘键入任意字符 ##
hello world
## 下方是 Logstash 输入到成果 ##
{
"@timestamp" => 2021-05-26T16:22:52.527Z,
"host" => "*******",
"message" => "hello world",
"@version" => "1"
}
四 Logstash 参数与配置
4.1 罕用启动参数
参数 | 阐明 | 举例 |
---|---|---|
-e | 立刻执行,应用命令行里的配置参数启动实例 | ./bin/logstash -e‘input {stdin {}} output {stdout {}}’ |
-f | 指定启动实例的配置文件 | ./bin/logstash -f config/test.conf |
-t | 测试配置文件的正确性 | ./bin/logstash-f config/test.conf -t |
-l | 指定日志文件名称 | ./bin/logstash-f config/test.conf -l logs/test.log |
-w | 指定 filter 线程数量,默认线程数是 5 | ./bin/logstash-f config/test.conf -w 8 |
4.2 配置文件构造及语法
(1)区段
Logstash 通过 {} 来定义区域,区域内能够定义插件,一个区域内能够定义多个插件,如下:
input {stdin {}
beats {port => 5044}
}
(2)数据类型
Logstash 仅反对大量的数据类型:
- Boolean:ssl_enable => true
- Number:port => 33
- String:name =>“Hello world”
- Commonts:# this is a comment
(3)字段援用
Logstash 数据流中的数据被称之为 Event 对象,Event 以 JSON 构造形成,Event 的属性被称之为字段,如果你像在配置文件中援用这些字段,只须要把字段的名字写在中括号 [] 里就行了,如 [type],对于嵌套字段每层字段名称都写在[] 里就能够了,比方:tags;除此之外,对于 Logstash 的 arrag 类型反对下标与倒序下表,如:tags[0],tags[-1]。
(4)条件判断
Logstash 反对上面的操作符:
- equality:==, !=, <, >, <=, >=
- regexp:=~, !~
- inclusion:in, not in
- boolean:and, or, nand, xor
- unary:!
例如:
if EXPRESSION {...} else if EXPRESSION {...} else {...}
(5)环境变量援用
Logstash 反对援用零碎环境变量,环境变量不存在时能够设置默认值,例如:
export TCP_PORT=12345
input {
tcp {port => "${TCP_PORT:54321}"
}
}
4.3 罕用输出插件(Input plugin)
输出插件包含有以下多种,详情查看官网文档 - 罕用输出插件:
- elasticsearch
- exec
- file
- github
- http
- jdbc
- jms
- jmx
- kafka
- log4j
- rabbitmq
- redis
- tcp
- udp
- unix
- websocket
4.3.1 File 读取插件
文件读取插件次要用来抓取文件的变动信息,将变动信息封装成 Event 过程解决或者传递。
-
配置事例
input file {path => ["/var/log/*.log", "/var/log/message"] type => "system" start_position => "beginning" } }
- 罕用参数
参数名称 | 类型 | 默认值 | 形容信息 |
---|---|---|---|
add_field | hash | {} | 用于向 Event 中增加字段 |
close_older | number | 3600 | 设置文件多久秒内没有更新就关掉对文件的监听 |
codec | string | “plain” | 输出数据之后对数据进行解码 |
delimiter | string | “\n” | 文件内容的行分隔符,默认依照行进行 Event 封装 |
discover_interval | number | 15 | 距离多少秒查看一下 path 匹配对门路下是否有新文件产生 |
enable_metric | boolean | true | |
exclude | array | 无 | path 匹配的文件中指定例外,如:path =>“/var/log/“;exclude =>”.gz” |
id | string | 无 | 辨别两个雷同类型的插件,比方两个 filter,在应用 Monitor API 监控是能够辨别,倡议设置上 ID |
ignore_older | number | 无 | 疏忽历史批改,如果设置 3600 秒,logstash 只会发现一小时内被批改过的文件,一小时之前批改的文件的变动不会被读取,如果再次批改该文件,所有的变动都会被读取,默认被禁用 |
max_open_files | number | 无 | logstash 能够同时监控的文件个数(同时关上的 file_handles 个数),如果你须要解决多于这个数量多文件,能够应用“close_older”去敞开一些文件 |
path | array | 无 | 必须设置项,用于匹配被监控的文件,如“/var/log/.log”或者“/var/log//.log”,必须应用绝对路径 |
sincedb_path | string | 无 | 文件读取记录,必须指定一个文件而不是目录,文件中保留没个被监控的文件等以后 inode 和 byteoffset,默认寄存地位“$HOME/.sincedb*” |
sincedb_write_interval | number | 15 | 距离多少秒写一次 sincedb 文件 |
start_position | “beginning”,“end” | ”end” | 从文件等结尾还是结尾读取文件内容,默认是结尾,如果须要导入文件中的老数据,能够设置为“beginning”,该选项只在第一次启动 logstash 时无效,如果文件曾经存在于 sincedb 的记录内,则此配置有效 |
stat_interval | number | 1 | 距离多少秒检查一下文件是否被批改,加大此参数将升高零碎负载,然而减少了发现新日志的间隔时间 |
tags | array | 无 | 能够在 Event 中减少标签,以便于在后续的解决流程中应用 |
type | string | Event 的 type 字段,如果采纳 elasticsearch 做 store,在默认状况下将作为 elasticsearch 的 type |
4.3.2 TCP 监听插件
TCP 插件有两种工作模式,“Client”和“Server”,别离用于发送网络数据和监听网络数据。
-
配置事例
tcp {port => 41414}
- 罕用参数(空 => 同上)
参数名称 | 类型 | 默认值 | 形容信息 |
---|---|---|---|
add_field | |||
codec | |||
enable_metric | |||
host | |||
id | |||
mode | “server”、“client” | “server” | “server”监听“client”的连贯申请,“client”连贯“server” |
port | number | 无 | 必须设置项,“server”模式时指定监听端口,“client”模式指定连贯端口 |
proxy_protocol | boolean | false | Proxyprotocol support, only v1 is supported at this time |
ssl_cert | |||
ssl_enable | |||
ssl_extra_chain_certs | |||
ssl_key | |||
ssl_key_passphrase | |||
ssl_verify | |||
tags | |||
type |
4.3.3 Redis 读取插件
用于读取 Redis 中缓存的数据信息。
-
配置事例
input { redis { host => "127.0.0.1" port => 6379 data_type => "list" key => "logstash-list" } }
- 罕用参数(空 => 同上)
参数名称 | 类型 | 默认值 | 形容信息 |
---|---|---|---|
add_field | |||
batch_count | number | 125 | 应用 redis 的 batch 个性,须要 redis2.6.0 或者更新的版本 |
codec | |||
data_type | list,channel,pattern_channel | 无 | 必须设置项,依据设置不同,订阅 redis 应用不同的命令,顺次是:BLPOP、SUBSCRIBE、PSUBSCRIBE |
db | number | 0 | 指定应用的 redis 数据库 |
enable_metric | |||
host | string | 127.0.0.1 | redis 服务地址 |
id | |||
key | string | 无 | 必须设置项,reidslist 或者 channel 的 key 名称 |
password | string | 无 | redis 明码 |
port | number | 6379 | redis 连贯端口号 |
tags | |||
threads | number | 1 | |
timeout | number | 5 | redis 服务连贯超时工夫,单位:秒 |
留神:
data_type 须要留神的是“channel”和“pattern_channel”是播送类型,雷同的数据会同时发送给订阅了该 channel 的 logstash,也就是说在 logstash 集群环境下会呈现数据反复,集群中的每一个节点都将收到同样的数据,然而在单节点状况下,“pattern_channel”能够同时定于满足 pattern 的多个 key
4.3.4 Kafka 读取插件
用于读取 Kafka 中推送的主题数据信息。
-
配置事例
input { kafka { bootstrap_servers => "kafka-01:9092,kafka-02:9092,kafka-03:9092" topics_pattern => "elk-.*" consumer_threads => 5 decorate_events => true codec => "json" auto_offset_reset => "latest" group_id => "logstash1"##logstash 集群需雷同 } }
- 罕用参数:
参数名称 | 类型 | 默认值 | 形容信息 |
---|---|---|---|
bootstrap_servers | string | localhost:9092 | Kafka 列表,用于建设与集群的初始连贯 |
topics_pattern | string | 要订阅的主题正则表达式模式。应用此配置时,主题配置将被疏忽。 | |
consumer_threads | number | 并发线程数,现实状况下,您应该领有与分区数量一样多的线程 | |
decorate_events | string | none | 可承受的值为:none/basic/extended/false |
codec | codec | plain | 用于输出数据的编解码器 |
auto_offset_reset | string | 当 Kafka 初始偏移量 | |
group_id | String | logstash | 该消费者所属的组的标识符 |
留神:
- auto_offset_reset: earliest- 将偏移量主动重置为最早的偏移量;latest- 主动将偏移量重置为最新偏移量;none- 如果未找到消费者组的先前偏移量,则向消费者抛出异样;anything else- 向消费者抛出异样。
- decorate_events: none:未增加元数据,basic:增加了记录的属性,extended:记录的属性,增加题目,false:不倡议应用的别名 none,true:不倡议应用的别名 basic
4.4 罕用过滤插件(Filter plugin)
丰盛的过滤器插件的是 logstash 威力如此弱小的重要因素,过滤器插件次要解决流经以后 Logstash 的事件信息,能够增加字段、移除字段、转换字段类型,通过正则表达式切分数据等,也能够依据条件判断来进行不同的数据处理形式, 详情查看官网文档 - 罕用过滤插件
4.4.1 grok 正则捕捉
grok 是 Logstash 中将非结构化数据解析成结构化数据以便于查问的最好工具,非常适合解析 syslog logs,apache log,mysql log,以及一些其余的 web log
(1)预约义表达式调用:
- Logstash 提供 120 个罕用正则表达式可供装置应用,装置之后你能够通过名称调用它们,语法如下:%{SYNTAX:SEMANTIC}
- SYNTAX:示意曾经装置的正则表达式的名称
- SEMANTIC:示意从 Event 中匹配到的内容的名称
例如:
Event 的内容为“[debug] 127.0.0.1 – test log content”,匹配 %{IP:client}将取得“client: 127.0.0.1”的后果,前提装置了 IP 表达式;如果你在捕捉数据时想进行数据类型转换能够应用 %{NUMBER:num:int}这种语法,默认状况下,所有的返回后果都是 string 类型,以后 Logstash 所反对的转换类型仅有“int”和“float”;
一个略微残缺一点的事例:
- 日志文件 http.log 内容:55.3.244.1 GET /index.html 15824 0.043
- 表达式:%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
-
配置文件内容:
input { file {path => "/var/log/http.log"} } filter { grok {match => {"message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"} } }
-
输入后果:
client: 55.3.244.1 method: GET request: /index.html bytes: 15824 duration: 0.043
(2)自定义表达式调用
语法:(?<field_name>the pattern here)
举例:捕捉 10 或 11 和长度的十六进制 queue_id 能够应用表达式 (?<queue_id>[0-9A-F]{10,11})
装置自定义表达式
与预约义表达式雷同,你也能够将自定义的表达式配置到 Logstash 中,而后就能够像于定义的表达式一样应用;以下是操作步骤阐明:
- 1、在 Logstash 根目录下创立文件夹“patterns”,在“patterns”文件夹中创立文件“extra”(文件名称无所谓,可本人抉择有意义的文件名称);
-
2、在文件“extra”中增加表达式,格局:patternName regexp,名称与表达式之间用空格隔开即可,如下:
# contents of ./patterns/postfix: POSTFIX_QUEUEID [0-9A-F]{10,11}
- 3、应用自定义的表达式时须要指定“patterns_dir”变量,变量内容指向表达式文件所在的目录,举例如下:
<1> 日志内容
Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>
<2>Logstash 配置
filter {
grok {patterns_dir => ["./patterns"]
match => {"message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
}
}
<3> 运行后果
timestamp: Jan 1 06:25:43
logsource: mailserver14
program: postfix/cleanup
pid: 21403
queue_id: BEF25A72965
(3)grok 罕用配置参数(空 => 同上)
参数名称 | 类型 | 默认值 | 形容信息 |
---|---|---|---|
add_field | |||
add_tag | |||
break_on_match | boolean | true | match 字段存在多个 pattern 时,当第一个匹配胜利后完结前面的匹配,如果想匹配所有的 pattern,将此参数设置为 false |
enable_metric | |||
id | |||
keep_empty_captures | boolean | false | 如果为 true,捕捉失败的字段奖设置为空值 |
match | array | {} | 设置 pattern 数组: match=> {“message”=> [“Duration: %{NUMBER:duration}”,”Speed: %{NUMBER:speed}”]} |
named_captures_only | boolean | true | If true, only store named captures from grok. |
overwrite | array | [] | 笼罩字段内容: match=> {“message”=>“%{SYSLOGBASE} %{DATA:message}”} overwrite=> [“message”] |
patterns_dir | array | [] | 指定自定义的 pattern 文件寄存目录,Logstash 在启动时会读取文件夹内 patterns_files_glob 匹配的所有文件内容 |
patterns_files_glob | string | “*” | 用于匹配 patterns_dir 中的文件 |
periodic_flush | boolean | false | 定期调用 filter 的 flush 办法 |
remove_field | array | [] | 从 Event 中删除任意字段: remove_field=> [“foo_%{somefield}”] |
remove_tag | array | [] | 删除“tags”中的值: remove_tag=> [“foo_%{somefield}”] |
tag_on_failure | array | [“_grokparsefailure”] | 当没有匹配胜利时,将此 array 增加到“tags”字段内 |
tag_on_timeout | string | “_groktimeout” | 当匹配超时时,将此内容增加到“tags”字段内 |
timeout_millis | number | 30000 | 设置单个 match 到超时工夫,单位:毫秒,如果设置为 0,则不启用超时设置 |
4.4.2 date 工夫解决插件
该插件用于工夫字段的格局转换,比方将“Apr 17 09:32:01”(MMM dd HH:mm:ss)转换为“MM-dd HH:mm:ss”。而且通常状况下,Logstash 会为主动给 Event 打上工夫戳,然而这个工夫戳是 Event 的解决工夫(次要是 input 接收数据的工夫),和日志记录时间会存在偏差(次要起因是 buffer),咱们能够应用此插件用日志产生工夫替换掉默认是工夫戳的值。
罕用配置参数(空 => 同上)
参数名称 | 类型 | 默认值 | 形容信息 |
---|---|---|---|
add_field | |||
add_tag | |||
enable_metric | |||
id | |||
locale | |||
match | array | [] | 工夫字段匹配, 可自定多种格局,直到匹配到或匹配完结 |
periodic_flush | |||
remove_field | |||
remove_tag | |||
tag_on_failure | |||
target | string | “@timestamp” | 指定 match 匹配并且转换为 date 类型的存储地位(字段),默认笼罩到“@timestamp” |
timezone | string | 无 | 指定工夫格式化的时区 |
留神:
match 的格局: 工夫字段匹配, 可自定多种格局,直到匹配到或匹配完结,格局: [field,formats…],如:match=>[“logdate”,“MMM dd yyyy HH:mm:ss”,“MMM d yyyy HH:mm:ss”,“ISO8601”]
4.4.3 mutate 数据批改插件
mutate 插件是 Logstash 另一个重要插件。它提供了丰盛的根底类型数据处理能力。能够重命名,删除,替换和批改事件中的字段。
罕用配置参数(空 => 同上)
参数名称 | 类型 | 默认值 | 形容信息 |
---|---|---|---|
add_field | |||
add_tag | |||
convert | hash | 无 | 将指定字段转换为指定类型,字段内容是数组,则转换所有数组元素,如果字段内容是 hash,则不做任何解决,目前反对的转换类型包含:integer,float, string, and boolean. 例如:convert=> {“fieldname”=>“integer”} |
enable_metric | |||
gsub | array | 无 | 相似 replace 办法,应用指定内容替换掉指标字符串的现有内容,前提是指标字段必须是字符串,否则不做任何解决,例如:[“fieldname”,“/”,““,“fieldname2”,“[\?#-]”,“.”],解释:应用“”替换掉“fieldname”中的所有“/”,应用“.”替换掉“fieldname2”中的所有“\”“?”、“#”和“-” |
id | |||
join | hash | 无 | 应用指定的符号将 array 字段的每个元素连接起来,对非 array 字段有效。例如:应用“,”将 array 字段“fieldname”的每一个元素连接成一个字符串:join=> {“fieldname”=>“,”} |
lowercase | array | 无 | 将自定的字段值转换为小写 |
merge | hash | 无 | 合并两个 array 或者 hash,如果是字符串,将主动转换为一个单元素数组;将一个 array 和一个 hash 合并。例如:将”added_field”合并到”dest_field”:merge=> {“dest_field”=>“added_field”} |
periodic_flush | |||
remove_field | |||
remove_tag | |||
rename | hash | 无 | 批改一个或者多个字段的名称。例如:将”HOSTORIP”改名为”client_ip”:rename=> {“HOSTORIP”=>“client_ip”} |
replace | hash | 无 | 应用新值残缺的替换掉指定字段的原内容,反对变量援用。例如:应用字段“source_host”的内容拼接上字符串“: My new message”之后的后果替换“message”的值:replace=> {“message”=>“%{source_host}: My new message”} |
split | hash | 无 | 依照自定的分隔符将字符串字段拆分成 array 字段,只能作用于 string 类型的字段。例如:将“fieldname”的内容依照“,”拆分成数组:split=> {“fieldname”=>“,”} |
strip | array | 无 | 去掉字段内容中间的空白字符。例如:去掉“field1”和“field2”中间的空格:strip=> [“field1”,“field2”] |
update | hash | 无 | 更新现有字段的内容,例如:将“sample”字段的内容更新为“Mynew message”:update=> {“sample”=>“My new message”} |
uppercase | array | 无 | 将字符串转换为大写 |
4.4.4 JSON 插件
JSON 插件用于解码 JSON 格局的字符串,个别是一堆日志信息中,局部是 JSON 格局,局部不是的状况下
(1)配置事例
json {source => ...}
-
事例配置,message 是 JSON 格局的字符串:
"{\"uid\":3081609001,\"type\":\"signal\"}"
filter { json { source => "message" target => "jsoncontent" } }
-
输入后果:
{ "@version": "1", "@timestamp": "2014-11-18T08:11:33.000Z", "host": "web121.mweibo.tc.sinanode.com", "message": "{\"uid\":3081609001,\"type\":\"signal\"}", "jsoncontent": { "uid": 3081609001, "type": "signal" } }
-
如果从事例配置中删除
target
,输入后果如下:{ "@version": "1", "@timestamp": "2014-11-18T08:11:33.000Z", "host": "web121.mweibo.tc.sinanode.com", "message": "{\"uid\":3081609001,\"type\":\"signal\"}", "uid": 3081609001, "type": "signal" }
(2)罕用配置参数(空 => 同上)
参数名称 | 类型 | 默认值 | 形容信息 |
---|---|---|---|
add_field | |||
add_tag | |||
enable_metric | |||
id | |||
periodic_flush | |||
remove_field | |||
remove_tag | |||
skip_on_invalid_json | boolean | false | 是否跳过验证不通过的 JSON |
source | string | 无 | 必须设置项,指定须要解码的 JSON 字符串字段 |
tag_on_failure | |||
target | string | 无 | 解析之后的 JSON 对象所在的字段名称,如果没有,JSON 对象的所有字段将挂在根节点下 |
4.4.5 elasticsearch 查问过滤插件
用于查问 Elasticsearch 中的事件,可将查问后果利用于以后事件中
罕用配置参数(空 => 同上)
参数名称 | 类型 | 默认值 | 形容信息 |
---|---|---|---|
add_field | |||
add_tag | |||
ca_file | string | 无 | SSL Certificate Authority file path |
enable_sort | boolean | true | 是否对后果进行排序 |
fields | array | {} | 从老事件中复制字段到新事件中,老事件来源于 elasticsearch(用于查问更新) |
hosts | array | [“localhost:9200”] | elasticsearch 服务列表 |
index | string | “” | 用逗号分隔的 elasticsearch 索引列表,如果要操作所有所有应用“_all”或者“”,保留数据到 elasticsearch 时,如果索引不存在会主动以此创立 |
password | string | 无 | 明码 |
periodic_flush | |||
query | string | 无 | 查问 elasticsearch 的查问字符串 |
remove_field | |||
remove_tag | |||
result_size | number | 1 | 查问 elasticsearch 时,返回后果的数量 |
sort | string | “@timestamp:desc” | 逗号分隔的“:”列表,用于查问后果排序 |
ssl | boolean | false | SSL |
tag_on_failure | |||
user | string | 无 | 用户名 |
4.5 罕用输入插件(Output plugin)
4.5.1 ElasticSearch 输入插件
用于将事件信息写入到 Elasticsearch 中,官网举荐插件,ELK 必备插件
(1)配置事例
output {
elasticsearch {hosts => ["127.0.0.1:9200"]
index => "filebeat-%{type}-%{+yyyy.MM.dd}"
template_overwrite => true
}
}
(2)罕用配置参数(空 => 同上)
参数名称 | 类型 | 默认值 | 形容信息 |
---|---|---|---|
absolute_healthcheck_path | boolean | false | 当配置了“healthcheck_path”时,决定 elasticsearch 健康检查 URL 是否依照绝对路径配置。例如:elasticsearch 拜访门路为:”http://localhost:9200/es“,“h… 以后参数为 true 时的拜访门路为:”http://localhost:9200/es/heal… 以后参数为 false 时的拜访门路为:”http://localhost:9200/health” |
absolute_sniffing_path | boolean | false | 当配置了“sniffing_path”时,决定 elasticsearch 的 sniffing 拜访门路配置。例如:elasticsearch 拜访门路为:“http://localhost:9200/es”,“s… 以后参数为 true 时的拜访门路为:“http://localhost:9200/es/_sni… 以后参数为 false 时的拜访门路为:“http://localhost:9200/_sniffing” |
action | string | “index” | 对 elasticsearch 的操作类型,可用的操作类型:index:索引 Logstash 事件数据到 elasticsearch;delete:依据 id 删除文档,id 必须指定;delete:依据 id 删除文档,id 必须指定;update:依据 id 更新文档 |
cacert | string | 无 | .cer 或者.pem 证书文件门路,应用证书进行 elasticsearch 认证 |
codec | |||
doc_as_upsert | boolean | false | 使 update 启用 upsert 模式,即文档不存在时创立新文档 |
document_id | string | 无 | elasticsearch 中的文档 id,用来笼罩曾经保留到 elasticsearch 中的文档 |
document_type | string | 无 | 指定存入 elasticsearch 中的文档的 type,没有指定的状况下会应用 Event 信息中的“type”字段的值作为 elasticsearch 的 type |
enable_metric | |||
failure_type_logging_whitelist | array | [] | elasricsearch 报错白名单,白名单的异样信息不会被记入 logstash 的 log 中,比方你想疏忽掉所有的“document_already_exists_exception”异样 |
flush_size | |||
healthcheck_path | string | “/” | elasricsearch 查看状态查看门路 |
hosts | string | [//127.0.0.1] | elasticsearch 服务地址列表,如果配置多个将启用负载平衡 |
id | |||
idle_flush_time | number | 1 | 距离多长时间将数据输入到 elasticsearch 中一次,次要用于较慢的事件 |
index | string | “logstash-%{+YYYY.MM.dd}” | 指定 elasticsearch 存储数据时的所有名称,反对变量援用,比方你能够按天创立索引,不便删除历史数据或者查问制订范畴内的数据 |
keystore | string | 无 | 用于指定密钥库门路,能够是.jks 或者.p12 |
keystore_password | string | 无 | 密钥库明码 |
manage_template | boolean | true | 是否启用 elasticsearch 模版,Logstash 自带一个模版,然而只有名称匹配“logstash-*”的索引才会利用该默版 |
parameters | hash | 无 | 增加到 elasticsearch URL 前面的参数键值对 |
parent | string | “nil” | 为文档子节点指定父节点的 id |
password | string | 无 | elasticsearch 集群拜访明码 |
path | string | 无 | 当设置了 elasticsearch 代理时用此参数从定向 HTTP API,如果“hosts”中曾经蕴含此门路,则不须要设置 |
pipeline | string | “nil” | 设置 Event 管道 |
pool_max | number | 1000 | elasticsearch 最大连接数 |
pool_max_per_route | number | 100 | 每个“endpoint”的最大连接数 |
proxy | string | 无 | 代理 URL |
resurrect_delay | number | 5 | 查看挂掉的“endpoint”是否恢复正常的频率 |
retry_initial_interval | number | 2 | 设置批量重试的工夫距离,重试到“retry_max_interval”次 |
retry_max_interval | number | 64 | Setmax interval in seconds between bulk retries. |
retry_on_conflict | number | 1 | Thenumber of times Elasticsearch should internally retry an update/upserteddocument |
routing | string | 无 | 指定 Event 路由 |
script | string | “” | 设置“scriptedupdate”模式下的脚本名称 |
script_lang | string | “painless” | 设置脚本语言 |
script_type | “inline”、“indexed”、“file” | [“inline”] | Definethe type of script referenced by“script”variable inline :”script”contains inline script indexed :“script”containsthe name of script directly indexed in elasticsearch file :“script”contains the name of script stored in elasticseach’s config directory |
script_var_name | string | “event” | Setvariable name passed to script (scripted update) |
scripted_upsert | boolean | false | ifenabled, script is in charge of creating non-existent document (scriptedupdate) |
sniffing | |||
sniffing_delay | |||
sniffing_path | |||
ssl | |||
ssl_certificate_verification | |||
template | string | 无 | 设置自定义的默版寄存门路 |
template_name | string | “logstash” | 设置应用的默版名称 |
template_overwrite | boolean | false | 是否始终笼罩现有模版 |
timeout | number | 60 | 网络超时工夫 |
truststore | string | 无 | “:truststore”或者“:cacert”证书库门路 |
truststore_password | string | 无 | 证书库明码 |
upsert | string | “” | Setupsert content for update mode.s Create a new document with this parameter asjson string if document_id doesn’texists |
user | string | “” | elasticsearch 用户名 |
validate_after_inactivity | number | 10000 | 距离多长时间放弃连贯可用 |
version | string | 无 | 存入 elasticsearch 的文档的版本号 |
version_type | “internal”、“external”、“external_gt”、“external_gte”、“force” | 无 | |
workers | string | 1 | whenwe no longer support the :legacy type This is hacky, but it can only be herne |
4.5.2 Redis 输入插件
用于将 Event 写入 Redis 中进行缓存,通常状况下 Logstash 的 Filter 解决比拟吃系统资源,简单的 Filter 解决会十分耗时,如果 Event 产生速度比拟快,能够应用 Redis 作为 buffer 应用
(1)配置事例
output {
redis {
host => "127.0.0.1"
port => 6379
data_type => "list"
key => "logstash-list"
}
}
(2)罕用配置参数(空 => 同上)
参数名称 | 类型 | 默认值 | 形容信息 |
---|---|---|---|
batch | boolean | false | 是否启用 redis 的 batch 模式,仅在 data_type=”list”时无效 |
batch_events | number | 50 | batch 大小,batch 达到此大小时执行“RPUSH” |
batch_timeout | number | 5 | batch 超时工夫,超过这个工夫执行“RPUSH” |
codec | |||
congestion_interval | number | 1 | 距离多长时间查看阻塞,如果设置为 0,则没个 Event 查看一次 |
congestion_threshold | number | 0 | |
data_type | “list”、“channel” | 无 | 存储在 redis 中的数据类型,如果应用“list”,将采纳“RPUSH”操作,如果是“channel”,将采纳“PUBLISH”操作 |
db | number | 0 | 应用的 redis 数据库编号 |
enable_metric | |||
host | array | [“127.0.0.1”] | redis 服务列表,如果配置多个,将随机抉择一个,如果以后的 redis 服务不可用,将抉择下一个 |
id | |||
key | string | 无 | Thename of a Redis list or channel. Dynamic names are valid here, forexample logstash-%{type}. |
password | string | 无 | redis 服务明码 |
port | number | 6379 | redis 服务监听端口 |
reconnect_interval | number | 1 | 连贯失败时的重连距离 |
shuffle_hosts | boolean | true | Shufflethe host list during Logstash startup. |
timeout | number | 5 | redis 连贯超时工夫 |
workers | number | 1 | whenwe no longer support the :legacy type This is hacky, but it can only be herne |
4.5.3 File 输入插件
用于将 Event 输入到文件内
(1)配置事例
output {
file {
path => ...
codec => line {format => "custom format: %{message}"}
}
}
(2)罕用配置参数(空 => 同上)
参数名称 | 类型 | 默认值 | 形容信息 |
---|---|---|---|
codec | |||
create_if_deleted | boolean | true | 如果指标文件被删除,则在写入事件时创立新文件 |
dir_mode | number | -1 | 设置目录的拜访权限,如果为“-1”,应用操作系统默认的拜访权限 |
enable_metric | |||
file_mode | number | -1 | 设置文件的拜访权限,如果为“-1”,应用操作系统默认的拜访权限 |
filename_failure | string | “_filepath_failures” | 如果指定的文件门路有效,这会在目录内创立这个文件并记录数据 |
flush_interval | number | 2 | flush 距离 |
gzip | boolean | false | 是否启用 gzip 压缩 |
id | |||
path | string | 无 | 必须设置项,文件输入门路,如:path =>”./test-%{+YYYY-MM-dd}.txt” |
workers | string | 1 | whenwe no longer support the :legacy type This is hacky, but it can only be herne |
4.5.4 Kafka 输入插件
用于将 Event 输入到 Kafka 指定的 Topic 中, 官网 Kafka 详情配置
(1)配置事例
output {
kafka {
bootstrap_servers => "localhost:9092"
topic_id => "test"
compression_type => "gzip"
}
}
(2)罕用配置参数(空 => 同上)
参数名称 | 类型 | 默认值 | 形容信息 |
---|---|---|---|
bootstrap_servers | string | Kafka 集群信息,格局为 host1:port1,host2:port2 | |
topic_id | string | 产生音讯的主题 | |
compression_type | String | none | 生产者生成的所有数据的压缩类型。默认为无(即无压缩)。有效值为 none、gzip、snappy 或 lz4。 |
batch_size | number | 16384 | 配置以字节为单位管制默认批处理大小 |
buffer_memory | number | 33554432(32MB) | 生产者可用于缓冲期待发送到服务器的记录的总内存字节数 |
max_request_size | number | 1048576(1MB) | 申请的最大大小 |
flush_interval | number | 2 | flush 距离 |
gzip | boolean | false | 是否启用 gzip 压缩 |
id | |||
path | string | 无 | 必须设置项,文件输入门路,如:path =>”./test-%{+YYYY-MM-dd}.txt” |
workers | string | 1 | whenwe no longer support the :legacy type This is hacky, but it can only be herne |
4.6 罕用编码插件(Codec plugin)
4.6.1 JSON 编码插件
间接输出预约义好的 JSON 数据,这样就能够省略掉 filter/grok 配置
-
配置事例
json {}
-
罕用配置参数
参数名称 类型 默认值 形容信息 charset string “UTF-8” 字符集 enable_metric id
五 Logstash 实例
5.1 接管 Filebeat 事件,输入到 Redis
input {
beats {port => 5044}
}
output {
redis {
host => "127.0.0.1"
port => 6379
data_type => "list"
key => "logstash-list"
}
}
5.2 读取 Redis 数据,依据“type”判断,别离解决,输入到 ES
input {
redis {
host => "127.0.0.1"
port => 6379
data_type => "list"
key => "logstash-list"
}
}
filter {if [type] == "application" {
grok {match => ["message", "(?m)-(?<systemName>.+?):(?<logTime>(?>\d\d){1,2}-(?:0?[1-9]|1[0-2])-(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) (?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]):(?:(?:[0-5][0-9]|60)(?:[:.,][0-9]+)?)) \[(?<level>(\b\w+\b)) *\] (?<thread>(\b\w+\b)) \((?<point>.*?)\) - (?<content>.*)"]
}
date {match => ["logTime", "yyyy-MM-dd HH:mm:ss,SSS"]
}
json {source => "message"}
date {match => ["timestamp", "yyyy-MM-dd HH:mm:ss,SSS"]
}
}
if [type] == "application_bizz" {
json {source => "message"}
date {match => ["timestamp", "yyyy-MM-dd HH:mm:ss,SSS"]
}
}
mutate {remove_field => ["@version", "beat", "logTime"]
}
}
output {stdout{}
elasticsearch {hosts => ["127.0.0.1:9200"]
index => "filebeat-%{type}-%{+yyyy.MM.dd}"
document_type => "%{documentType}"
template_overwrite => true
}
}
六 利用场景
6.1 以 logstash 作为日志搜索器
架构:logstash 采集、解决、转发到 elasticsearch 存储,在 kibana 进行展现
特点:这种构造因为须要在各个服务器上部署 Logstash,而它比拟耗费 CPU 和内存资源,所以比拟适宜计算资源丰盛的服务器,否则容易造成服务器性能降落,甚至可能导致无奈失常工作。
6.2 音讯模式
音讯模式:Beats 还不反对输入到音讯队列(新版本除外:5.0 版本及以上),所以在音讯队列前后两端只能是 Logstash 实例。logstash 从各个数据源收集数据,不通过任何解决转换仅转收回到音讯队列(kafka、redis、rabbitMQ 等),后 logstash 从音讯队列取数据进行转换剖析过滤,输入到 elasticsearch,并在 kibana 进行图形化展现
架构(Logstash 进行日志解析所在服务器性能各方面必须要足够好):
模式特点:这种架构适宜于日志规模比拟宏大的状况。但因为 Logstash 日志解析节点和 Elasticsearch 的负荷比拟重,可将他们配置为集群模式,以分担负荷。引入音讯队列,平衡了网络传输,从而升高了网络灵通,尤其是失落数据的可能性,但仍然存在 Logstash 占用系统资源过多的问题
工作流程:Filebeat 采集—> logstash 转发到 kafka—> logstash 解决从 kafka 缓存的数据进行剖析—> 输入到 es—> 显示在 kibana
6.3 logstash(非 filebeat)进行文件采集,输入到 kafka 缓存,读取 kafka 数据并解决输入到文件或 es
6.4 logstash 同步 mysql 数据库数据到 es(logstash5 版本以上已集成 jdbc 插件,无需下载安装,间接应用)
七 Logstash 和 Flume 比照
首先从构造比照,咱们会惊人的发现,两者是如许的类似!Logstash 的 Shipper、Broker、Indexer 别离和 Flume 的 Source、Channel、Sink 各自对应!只不过是 Logstash 集成了,Broker 能够不须要,而 Flume 须要独自配置,且缺一不可,但这再一次阐明了计算机的设计思维都是通用的!只是实现形式会不同而已。
从程序员的角度来说,上文也提到过了,Flume 是真的很繁琐,你须要别离作 source、channel、sink 的手工配置,而且波及到简单的数据采集环境,你可能还要做多个配置,这在下面提过了,反过来说Logstash 的配置就十分简洁清晰,三个局部的属性都定义好了,程序员本人去抉择就行,就算没有,也能够自行开发插件,十分不便。当然了,Flume 的插件也很多,但 Channel 就只有内存和文件这两种(其实当初不止了,但罕用的也就两种)。读者能够看得出来,两者其实配置都是非常灵活的,只不过看场景取舍罢了。
其实从作者和历史背景来看,两者最后的设计目标就不太一样。Flume 自身最后设计的目标是为了把数据传入 HDFS 中(并不是为了采集日志而设计,这和 Logstash 有基本的区别),所以理所应当侧重于数据的传输,程序员要十分分明整个数据的路由,并且比 Logstash 还多了一个可靠性策略,上文中的 channel 就是用于长久化目标,数据除非确认传输到下一地位了,否则不会删除,这一步是通过事务来管制的,这样的设计使得可靠性十分好。相同,Logstash 则显著偏重对数据的预处理,因为日志的字段须要大量的预处理,为解析做铺垫。
回过来看我当初为什么先讲 Logstash 而后讲 Flume?这外面有几个思考
- 其一:Logstash 其实更有点像通用的模型,所以对新人来说了解起来更简略,而 Flume 这样轻量级的线程,可能有肯定的计算机编程根底了解起来更好;
- 其二:目前大部分的状况下,Logstash 用的更加多,这个数据我本人没有统计过,然而依据教训判断,Logstash 能够和 ELK 其余组件配合应用,开发、利用都会简略很多,技术成熟,应用场景宽泛。相同 Flume 组件就须要和其余很多工具配合应用,场景的针对性会比拟强,更不必提 Flume 的配置过于繁琐简单了。
最初总结下来,咱们能够这么了解他们的区别:
Logstash 就像是买来的台式机,主板、电源、硬盘,机箱(Logstash)把外面的货色全副装好了,你能够间接用,当然也能够本人组装批改;
Flume 就像提供给你一套残缺的主板,电源、硬盘,Flume 没有打包,只是像说明书一样领导你如何组装,能力运行的起来。
参考文档:
- [1] 迷途的攻城狮.CSDN: https://blog.csdn.net/chenlei… , 2017-06-22.
- [2] Logstash 官网: https://www.elastic.co/cn/log…