Logstash-数据流引擎

作者 | WenasWei

一 Logstash

Logstash是具备实时流水线性能的开源数据收集引擎。Logstash能够动静对立来自不同起源的数据,并将数据标准化到您抉择的指标地位。革除所有数据并使其民主化,以用于各种高级上游剖析和可视化用例。

1.1 Logstash简介

Logstash 是一个数据流引擎:

  • 它是用于数据物流的开源流式 ETL(Extract-Transform-Load)引擎
  • 在几分钟内建设数据流管道
  • 具备程度可扩大及韧性且具备自适应缓冲
  • 不可知的数据源
  • 具备200多个集成和处理器的插件生态系统
  • 应用 Elastic Stack 监督和治理部署

官网介绍:Logstash is an open source data collection engine with real-time pipelining capabilities。简略来说logstash就是一根具备实时数据传输能力的管道,负责将数据信息从管道的输出端传输到管道的输入端;与此同时这根管道还能够让你依据本人的需要在两头加上滤网,Logstash提供里很多功能强大的滤网以满足你的各种利用场景。

1.2 数据处理

Logstash 是一个功能强大的工具,可与各种部署集成。 它提供了大量插件,可帮忙你解析,丰盛,转换和缓冲来自各种起源的数据。 如果你的数据须要 Beats 中没有的其余解决,则须要将 Logstash 增加到部署中。

当下最为风行的数据源:

Logstash 能够摄入日志,文件,指标或者网路实在数据。通过 Logstash 的解决,变为能够应用的 Web
Apps 能够耗费的数据,也能够存储于数据中心,或变为其它的流式数据:

  • Logstash 能够很不便地和 Beats一起单干,这也是被举荐的办法
  • Logstash 也能够和那些驰名的云厂商的服务一起单干解决它们的数据
  • 它也能够和最为同样的信息音讯队列,比方 redis 或 kafka 一起合作
  • Logstash 也能够应用 JDBC 来拜访 RDMS 数据
  • 它也能够和 IoT 设施一起解决它们的数据
  • Logstash 不仅仅能够把数据传送到 Elasticsearch,而且它还能够把数据发送至很多其它的目的地,并作为它们的输出源做进一步的解决

二 Logstash零碎架构

Logstash 蕴含3个次要局部: 输出(inputs),过滤器(filters)和输入(outputs)

Logstash的事件(logstash将数据流中等每一条数据称之为一个event)解决流水线有三个次要角色实现:inputs –> filters –> outputs:

  • inpust:必须,负责产生事件(Inputs generate events),罕用:File、syslog、redis、kakfa、beats(如:Filebeats)
  • filters:可选,负责数据处理与转换(filters modify them),罕用:grok、mutate、drop、clone、geoip
  • outpus:必须,负责数据输入(outputs ship them elsewhere),罕用:elasticsearch、file、graphite、kakfa、statsd

三 Logstash装置

3.1 环境清单
  • 操作系统:Linux #56-Ubuntu SMP Tue Jun 4 22:49:08 UTC 2019 x86_64
  • Logstash版本:logstash-6.2.4
  • Jdk版本:1.8.0_152
3.2 Linux装置JDK
3.2.1 解压缩并挪动到指定目录(约定的目录:/usr/local)
(1)解压缩
tar -zxvf jdk-8u152-linux-x64.tar.gz
(2)创立目录
mkdir -p /usr/local/java
(3)挪动安装包
mv jdk1.8.0_152/ /usr/local/java/
(4)设置所有者
chown -R root:root /usr/local/java/
3.2.2 配置环境变量
(1)配置零碎环境变量
vi /etc/environment
(2)增加如下语句
PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games"export JAVA_HOME=/usr/local/java/jdk1.8.0_152export JRE_HOME=/usr/local/java/jdk1.8.0_152/jreexport CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
(3)配置用户环境变量
nano /etc/profile
(4)增加如下语句(肯定要放两头)
if [ "$PS1" ]; then  if [ "$BASH" ] && [ "$BASH" != "/bin/sh" ]; then    # The file bash.bashrc already sets the default PS1.    # PS1='\h:\w\$ '    if [ -f /etc/bash.bashrc ]; then      . /etc/bash.bashrc    fi  else    if [ "`id -u`" -eq 0 ]; then      PS1='# '    else      PS1='$ '    fi  fifiexport JAVA_HOME=/usr/local/java/jdk1.8.0_152export JRE_HOME=/usr/local/java/jdk1.8.0_152/jreexport CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/libexport PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH:$HOME/binif [ -d /etc/profile.d ]; then  for i in /etc/profile.d/*.sh; do    if [ -r $i ]; then      . $i    fi  done  unset ifi
(5)使用户环境变量失效
source /etc/profile
(6)测试是否装置胜利
$ java -versionjava version "1.8.0_152"Java(TM) SE Runtime Environment (build 1.8.0_152-b16)Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode)
3.3 装置Logstash
3.3.1 创立装置目录
$ sudo mkdir /usr/local/logstash
3.3.2 下载Logstash安装文件
$ wget -P /usr/local/logstash https://artifacts.elastic.co/downloads/logstash/logstash-6.2.4.tar.gz
3.3.2 解压缩安装文件
$ cd /usr/local/logstash/$ sudo tar -zxvf logstash-6.2.4.tar.gz
3.3.3 测试装置是否胜利

测试: 疾速启动,规范输入输出作为input和output,没有filter

$ cd logstash-6.2.4/$ ./bin/logstash -e 'input { stdin {} } output { stdout {} }'Sending Logstash's logs to /usr/local/logstash/logstash-6.2.4/logs which is now configured via log4j2.properties[2021-05-27T00:22:28,729][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/usr/local/logstash/logstash-6.2.4/modules/fb_apache/configuration"}[2021-05-27T00:22:28,804][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/usr/local/logstash/logstash-6.2.4/modules/netflow/configuration"}[2021-05-27T00:22:29,827][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified[2021-05-27T00:22:30,979][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.2.4"}[2021-05-27T00:22:31,821][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}[2021-05-27T00:22:36,463][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}[2021-05-27T00:22:36,690][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x55a5abea run>"}The stdin plugin is now waiting for input:[2021-05-27T00:22:36,853][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}## 此时命令窗口停留在期待输出状态,键盘键入任意字符 ##hello world## 下方是Logstash输入到成果 ##{    "@timestamp" => 2021-05-26T16:22:52.527Z,          "host" => "*******",       "message" => "hello world",      "@version" => "1"}

四 Logstash参数与配置

4.1 罕用启动参数
参数阐明举例
-e立刻执行,应用命令行里的配置参数启动实例./bin/logstash -e ‘input {stdin {}} output {stdout {}}’
-f指定启动实例的配置文件./bin/logstash -f config/test.conf
-t测试配置文件的正确性./bin/logstash-f config/test.conf -t
-l指定日志文件名称./bin/logstash-f config/test.conf -l logs/test.log
-w指定filter线程数量,默认线程数是5./bin/logstash-f config/test.conf -w 8
4.2 配置文件构造及语法
(1)区段

Logstash通过{}来定义区域,区域内能够定义插件,一个区域内能够定义多个插件,如下:

input {    stdin {    }    beats {        port => 5044    }}
(2)数据类型

Logstash仅反对大量的数据类型:

  • Boolean:ssl_enable => true
  • Number:port => 33
  • String:name => “Hello world”
  • Commonts:# this is a comment
(3)字段援用

Logstash数据流中的数据被称之为Event对象,Event以JSON构造形成,Event的属性被称之为字段,如果你像在配置文件中援用这些字段,只须要把字段的名字写在中括号[]里就行了,如[type],对于嵌套字段每层字段名称都写在[]里就能够了,比方:tags;除此之外,对于Logstash的arrag类型反对下标与倒序下表,如:tags[0],tags[-1]。

(4)条件判断

Logstash反对上面的操作符:

  • equality:==, !=, <, >, <=, >=
  • regexp:=~, !~
  • inclusion:in, not in
  • boolean:and, or, nand, xor
  • unary:!

例如:

if EXPRESSION {  ...} else if EXPRESSION {  ...} else {  ...}
(5)环境变量援用

Logstash反对援用零碎环境变量,环境变量不存在时能够设置默认值,例如:

export TCP_PORT=12345input {  tcp {    port => "${TCP_PORT:54321}"  }}
4.3 罕用输出插件(Input plugin)

输出插件包含有以下多种,详情查看官网文档-罕用输出插件:

  • elasticsearch
  • exec
  • file
  • github
  • http
  • jdbc
  • jms
  • jmx
  • kafka
  • log4j
  • rabbitmq
  • redis
  • tcp
  • udp
  • unix
  • websocket
4.3.1 File读取插件

文件读取插件次要用来抓取文件的变动信息,将变动信息封装成Event过程解决或者传递。

  • 配置事例

    inputfile {  path => ["/var/log/*.log", "/var/log/message"]  type => "system"  start_position => "beginning"}}
  • 罕用参数
参数名称类型默认值形容信息
add_fieldhash{}用于向Event中增加字段
close_oldernumber3600设置文件多久秒内没有更新就关掉对文件的监听
codecstring“plain”输出数据之后对数据进行解码
delimiterstring“\n”文件内容的行分隔符,默认依照行进行Event封装
discover_intervalnumber15距离多少秒查看一下path匹配对门路下是否有新文件产生
enable_metricbooleantrue
excludearraypath匹配的文件中指定例外,如:path => “/var/log/“;exclude =>”.gz”
idstring辨别两个雷同类型的插件,比方两个filter,在应用Monitor API监控是能够辨别,倡议设置上ID
ignore_oldernumber疏忽历史批改,如果设置3600秒,logstash只会发现一小时内被批改过的文件,一小时之前批改的文件的变动不会被读取,如果再次批改该文件,所有的变动都会被读取,默认被禁用
max_open_filesnumberlogstash能够同时监控的文件个数(同时关上的file_handles个数),如果你须要解决多于这个数量多文件,能够应用“close_older”去敞开一些文件
patharray必须设置项,用于匹配被监控的文件,如“/var/log/.log”或者“/var/log//.log”,必须应用绝对路径
sincedb_pathstring文件读取记录,必须指定一个文件而不是目录,文件中保留没个被监控的文件等以后inode和byteoffset,默认寄存地位“$HOME/.sincedb*”
sincedb_write_intervalnumber15距离多少秒写一次sincedb文件
start_position“beginning”,“end”” end”从文件等结尾还是结尾读取文件内容,默认是结尾,如果须要导入文件中的老数据,能够设置为“beginning”,该选项只在第一次启动logstash时无效,如果文件曾经存在于sincedb的记录内,则此配置有效
stat_intervalnumber1距离多少秒检查一下文件是否被批改,加大此参数将升高零碎负载,然而减少了发现新日志的间隔时间
tagsarray能够在Event中减少标签,以便于在后续的解决流程中应用
typestringEvent的type字段,如果采纳elasticsearch做store,在默认状况下将作为elasticsearch的type
4.3.2 TCP监听插件

TCP插件有两种工作模式,“Client”和“Server”,别离用于发送网络数据和监听网络数据。

  • 配置事例

    tcp {  port => 41414}
  • 罕用参数(空 => 同上)
参数名称类型默认值形容信息
add_field
codec
enable_metric
host
id
mode“server”、“client”“server”“server”监听“client”的连贯申请,“client”连贯“server”
portnumber必须设置项,“server”模式时指定监听端口,“client”模式指定连贯端口
proxy_protocolbooleanfalseProxyprotocol support, only v1 is supported at this time
ssl_cert
ssl_enable
ssl_extra_chain_certs
ssl_key
ssl_key_passphrase
ssl_verify
tags
type
4.3.3 Redis读取插件

用于读取Redis中缓存的数据信息。

  • 配置事例

    input {redis {  host => "127.0.0.1"  port => 6379  data_type => "list"  key => "logstash-list"}}
  • 罕用参数(空 => 同上)
参数名称类型默认值形容信息
add_field
batch_countnumber125应用redis的batch个性,须要redis2.6.0或者更新的版本
codec
data_typelist,channel, pattern_channel必须设置项,依据设置不同,订阅redis应用不同的命令,顺次是:BLPOP、SUBSCRIBE、PSUBSCRIBE
dbnumber0指定应用的redis数据库
enable_metric
hoststring127.0.0.1redis服务地址
id
keystring必须设置项,reidslist或者channel的key名称
passwordstringredis明码
portnumber6379redis连贯端口号
tags
threadsnumber1
timeoutnumber5redis服务连贯超时工夫,单位:秒

留神:

data_type 须要留神的是“channel”和“pattern_channel”是播送类型,雷同的数据会同时发送给订阅了该channel的logstash,也就是说在logstash集群环境下会呈现数据反复,集群中的每一个节点都将收到同样的数据,然而在单节点状况下,“pattern_channel”能够同时定于满足pattern的多个key

4.3.4 Kafka读取插件

用于读取Kafka中推送的主题数据信息。

  • 配置事例

    input {kafka {  bootstrap_servers => "kafka-01:9092,kafka-02:9092,kafka-03:9092"  topics_pattern  => "elk-.*"  consumer_threads => 5  decorate_events => true  codec => "json"  auto_offset_reset => "latest"  group_id => "logstash1"##logstash 集群需雷同}}
  • 罕用参数:
参数名称类型默认值形容信息
bootstrap_serversstringlocalhost:9092Kafka列表,用于建设与集群的初始连贯
topics_patternstring要订阅的主题正则表达式模式。应用此配置时,主题配置将被疏忽。
consumer_threadsnumber并发线程数,现实状况下,您应该领有与分区数量一样多的线程
decorate_eventsstringnone可承受的值为:none/basic/extended/false
codeccodecplain用于输出数据的编解码器
auto_offset_resetstring当Kafka初始偏移量
group_idStringlogstash该消费者所属的组的标识符

留神:

  • auto_offset_reset: earliest-将偏移量主动重置为最早的偏移量;latest-主动将偏移量重置为最新偏移量;none-如果未找到消费者组的先前偏移量,则向消费者抛出异样;anything else-向消费者抛出异样。
  • decorate_events: none:未增加元数据,basic:增加了记录的属性,extended:记录的属性,增加题目,false:不倡议应用的别名 none,true:不倡议应用的别名 basic
4.4 罕用过滤插件(Filter plugin)

丰盛的过滤器插件的是 logstash威力如此弱小的重要因素,过滤器插件次要解决流经以后Logstash的事件信息,能够增加字段、移除字段、转换字段类型,通过正则表达式切分数据等,也能够依据条件判断来进行不同的数据处理形式,详情查看官网文档-罕用过滤插件

4.4.1 grok正则捕捉

grok 是Logstash中将非结构化数据解析成结构化数据以便于查问的最好工具,非常适合解析syslog logs,apache log, mysql log,以及一些其余的web log

(1)预约义表达式调用:
  • Logstash提供120个罕用正则表达式可供装置应用,装置之后你能够通过名称调用它们,语法如下:%{SYNTAX:SEMANTIC}
  • SYNTAX:示意曾经装置的正则表达式的名称
  • SEMANTIC:示意从Event中匹配到的内容的名称

例如:
Event的内容为“[debug] 127.0.0.1 - test log content”,匹配%{IP:client}将取得“client: 127.0.0.1”的后果,前提装置了IP表达式;如果你在捕捉数据时想进行数据类型转换能够应用%{NUMBER:num:int}这种语法,默认状况下,所有的返回后果都是string类型,以后Logstash所反对的转换类型仅有“int”和“float”;

一个略微残缺一点的事例:

  • 日志文件http.log内容:55.3.244.1 GET /index.html 15824 0.043
  • 表达式:%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
  • 配置文件内容:

    input {file {  path => "/var/log/http.log"}}filter {grok {  match => {"message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"}}}
  • 输入后果:

    client: 55.3.244.1method: GETrequest: /index.htmlbytes: 15824duration: 0.043
(2)自定义表达式调用

语法:(?<field_name>the pattern here)

举例:捕捉10或11和长度的十六进制queue_id能够应用表达式(?<queue_id>[0-9A-F]{10,11})
装置自定义表达式

与预约义表达式雷同,你也能够将自定义的表达式配置到Logstash中,而后就能够像于定义的表达式一样应用;以下是操作步骤阐明:

  • 1、在Logstash根目录下创立文件夹“patterns”,在“patterns”文件夹中创立文件“extra”(文件名称无所谓,可本人抉择有意义的文件名称);
  • 2、在文件“extra”中增加表达式,格局:patternName regexp,名称与表达式之间用空格隔开即可,如下:

    # contents of ./patterns/postfix:POSTFIX_QUEUEID [0-9A-F]{10,11}
  • 3、应用自定义的表达式时须要指定“patterns_dir”变量,变量内容指向表达式文件所在的目录,举例如下:

<1>日志内容

Jan  1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>

<2>Logstash配置

filter {  grok {    patterns_dir => ["./patterns"]    match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }  }}

<3>运行后果

timestamp: Jan 1 06:25:43logsource: mailserver14program: postfix/cleanuppid: 21403queue_id: BEF25A72965
(3)grok罕用配置参数(空 => 同上)
参数名称类型默认值形容信息
add_field
add_tag
break_on_matchbooleantruematch字段存在多个pattern时,当第一个匹配胜利后完结前面的匹配,如果想匹配所有的pattern,将此参数设置为false
enable_metric
id
keep_empty_capturesbooleanfalse如果为true,捕捉失败的字段奖设置为空值
matcharray{}设置pattern数组: match=> {“message” => [“Duration: %{NUMBER:duration}”,”Speed: %{NUMBER:speed}”]}
named_captures_onlybooleantrueIf true, only store named captures from grok.
overwritearray[]笼罩字段内容: match=> { “message” => “%{SYSLOGBASE} %{DATA:message}” } overwrite=> [ “message” ]
patterns_dirarray[]指定自定义的pattern文件寄存目录,Logstash在启动时会读取文件夹内patterns_files_glob 匹配的所有文件内容
patterns_files_globstring“*”用于匹配patterns_dir中的文件
periodic_flushbooleanfalse定期调用filter的flush办法
remove_fieldarray[]从Event中删除任意字段: remove_field=> [ “foo_%{somefield}” ]
remove_tagarray[]删除“tags”中的值: remove_tag=> [ “foo_%{somefield}” ]
tag_on_failurearray[“_grokparsefailure”]当没有匹配胜利时,将此array增加到“tags”字段内
tag_on_timeoutstring“_groktimeout”当匹配超时时,将此内容增加到“tags”字段内
timeout_millisnumber30000设置单个match到超时工夫,单位:毫秒,如果设置为0,则不启用超时设置
4.4.2 date工夫解决插件

该插件用于工夫字段的格局转换,比方将“Apr 17 09:32:01”(MMM dd HH:mm:ss)转换为“MM-dd HH:mm:ss”。而且通常状况下,Logstash会为主动给Event打上工夫戳,然而这个工夫戳是Event的解决工夫(次要是input接收数据的工夫),和日志记录时间会存在偏差(次要起因是buffer),咱们能够应用此插件用日志产生工夫替换掉默认是工夫戳的值。

罕用配置参数(空 => 同上)
参数名称类型默认值形容信息
add_field
add_tag
enable_metric
id
locale
matcharray[]工夫字段匹配,可自定多种格局,直到匹配到或匹配完结
periodic_flush
remove_field
remove_tag
tag_on_failure
targetstring“@timestamp”指定match匹配并且转换为date类型的存储地位(字段),默认笼罩到“@timestamp”
timezonestring指定工夫格式化的时区

留神:

match的格局:工夫字段匹配,可自定多种格局,直到匹配到或匹配完结,格局: [ field,formats…],如:match=>[ “logdate”,“MMM dd yyyy HH:mm:ss”,“MMM d yyyy HH:mm:ss”,“ISO8601”]

4.4.3 mutate数据批改插件

mutate 插件是 Logstash另一个重要插件。它提供了丰盛的根底类型数据处理能力。能够重命名,删除,替换和批改事件中的字段。

罕用配置参数(空 => 同上)
参数名称类型默认值形容信息
add_field
add_tag
converthash将指定字段转换为指定类型,字段内容是数组,则转换所有数组元素,如果字段内容是hash,则不做任何解决,目前反对的转换类型包含:integer,float, string, and boolean.例如: convert=> { “fieldname” => “integer” }
enable_metric
gsubarray相似replace办法,应用指定内容替换掉指标字符串的现有内容,前提是指标字段必须是字符串,否则不做任何解决,例如:[ “fieldname”, “/”, ““, “fieldname2”, “[\?#-]”, “.”],解释:应用“”替换掉“fieldname”中的所有“/”,应用“.”替换掉“fieldname2”中的所有“\”“?”、“#”和“-”
id
joinhash应用指定的符号将array字段的每个元素连接起来,对非array字段有效。例如: 应用“,”将array字段“fieldname”的每一个元素连接成一个字符串: join=> { “fieldname” => “,” }
lowercasearray将自定的字段值转换为小写
mergehash合并两个array或者hash,如果是字符串,将主动转换为一个单元素数组;将一个array和一个hash合并。例如: 将”added_field”合并到”dest_field”: merge=> { “dest_field” => “added_field” }
periodic_flush
remove_field
remove_tag
renamehash批改一个或者多个字段的名称。例如: 将”HOSTORIP”改名为”client_ip”: rename=> { “HOSTORIP” => “client_ip” }
replacehash应用新值残缺的替换掉指定字段的原内容,反对变量援用。例如: 应用字段“source_host”的内容拼接上字符串“: My new message”之后的后果替换“message”的值: replace=> { “message” => “%{source_host}: My new message” }
splithash依照自定的分隔符将字符串字段拆分成array字段,只能作用于string类型的字段。例如: 将“fieldname”的内容依照“,”拆分成数组: split=> { “fieldname” => “,” }
striparray去掉字段内容中间的空白字符。例如: 去掉“field1”和“field2”中间的空格: strip=> [“field1”, “field2”]
updatehash更新现有字段的内容,例如: 将“sample”字段的内容更新为“Mynew message”: update=> { “sample” => “My new message” }
uppercasearray将字符串转换为大写
4.4.4 JSON插件

JSON插件用于解码JSON格局的字符串,个别是一堆日志信息中,局部是JSON格局,局部不是的状况下

(1)配置事例
json {    source => ...}
  • 事例配置,message是JSON格局的字符串:"{\"uid\":3081609001,\"type\":\"signal\"}"

    filter {  json {      source => "message"      target => "jsoncontent"  }}
  • 输入后果:

    {  "@version": "1",  "@timestamp": "2014-11-18T08:11:33.000Z",  "host": "web121.mweibo.tc.sinanode.com",  "message": "{\"uid\":3081609001,\"type\":\"signal\"}",  "jsoncontent": {      "uid": 3081609001,      "type": "signal"  }}
  • 如果从事例配置中删除target,输入后果如下:

    {  "@version": "1",  "@timestamp": "2014-11-18T08:11:33.000Z",  "host": "web121.mweibo.tc.sinanode.com",  "message": "{\"uid\":3081609001,\"type\":\"signal\"}",  "uid": 3081609001,  "type": "signal"}
(2)罕用配置参数(空 => 同上)
参数名称类型默认值形容信息
add_field
add_tag
enable_metric
id
periodic_flush
remove_field
remove_tag
skip_on_invalid_jsonbooleanfalse是否跳过验证不通过的JSON
sourcestring必须设置项,指定须要解码的JSON字符串字段
tag_on_failure
targetstring解析之后的JSON对象所在的字段名称,如果没有,JSON对象的所有字段将挂在根节点下
4.4.5 elasticsearch查问过滤插件

用于查问Elasticsearch中的事件,可将查问后果利用于以后事件中

罕用配置参数(空 => 同上)
参数名称类型默认值形容信息
add_field
add_tag
ca_filestringSSL Certificate Authority file path
enable_sortbooleantrue是否对后果进行排序
fieldsarray{}从老事件中复制字段到新事件中,老事件来源于elasticsearch(用于查问更新)
hostsarray[“localhost:9200”]elasticsearch服务列表
indexstring“”用逗号分隔的elasticsearch索引列表,如果要操作所有所有应用“_all”或者“”,保留数据到elasticsearch时,如果索引不存在会主动以此创立
passwordstring明码
periodic_flush
querystring查问elasticsearch的查问字符串
remove_field
remove_tag
result_sizenumber1查问elasticsearch时,返回后果的数量
sortstring“@timestamp:desc”逗号分隔的“:”列表,用于查问后果排序
sslbooleanfalseSSL
tag_on_failure
userstring用户名
4.5 罕用输入插件(Output plugin)
4.5.1 ElasticSearch输入插件

用于将事件信息写入到Elasticsearch中,官网举荐插件,ELK必备插件

(1)配置事例
output {    elasticsearch {        hosts => ["127.0.0.1:9200"]        index => "filebeat-%{type}-%{+yyyy.MM.dd}"        template_overwrite => true    }}
(2)罕用配置参数(空 => 同上)
参数名称类型默认值形容信息
absolute_healthcheck_pathbooleanfalse当配置了“healthcheck_path”时,决定elasticsearch健康检查URL是否依照绝对路径配置。例如: elasticsearch拜访门路为:”http://localhost:9200/es“,“h... 以后参数为true时的拜访门路为:”http://localhost:9200/es/heal... 以后参数为false时的拜访门路为:”http://localhost:9200/health”
absolute_sniffing_pathbooleanfalse当配置了“sniffing_path”时,决定elasticsearch的sniffing拜访门路配置。例如: elasticsearch拜访门路为:“http://localhost:9200/es”,“s... 以后参数为true时的拜访门路为:“http://localhost:9200/es/_sni... 以后参数为false时的拜访门路为:“http://localhost:9200/_sniffing”
actionstring“index”对elasticsearch的操作类型,可用的操作类型: index:索引Logstash事件数据到elasticsearch; delete:依据id删除文档,id必须指定; delete:依据id删除文档,id必须指定; update:依据id更新文档
cacertstring.cer或者.pem证书文件门路,应用证书进行elasticsearch认证
codec
doc_as_upsertbooleanfalse使update启用upsert模式,即文档不存在时创立新文档
document_idstringelasticsearch中的文档id,用来笼罩曾经保留到elasticsearch中的文档
document_typestring指定存入elasticsearch中的文档的type,没有指定的状况下会应用Event信息中的“type”字段的值作为elasticsearch的type
enable_metric
failure_type_logging_whitelistarray[]elasricsearch报错白名单,白名单的异样信息不会被记入logstash的log中,比方你想疏忽掉所有的“document_already_exists_exception”异样
flush_size
healthcheck_pathstring“/”elasricsearch查看状态查看门路
hostsstring[//127.0.0.1]elasticsearch服务地址列表,如果配置多个将启用负载平衡
id
idle_flush_timenumber1距离多长时间将数据输入到elasticsearch中一次,次要用于较慢的事件
indexstring“logstash-%{+YYYY.MM.dd}”指定elasticsearch存储数据时的所有名称,反对变量援用,比方你能够按天创立索引,不便删除历史数据或者查问制订范畴内的数据
keystorestring用于指定密钥库门路,能够是.jks或者.p12
keystore_passwordstring密钥库明码
manage_templatebooleantrue是否启用elasticsearch模版,Logstash自带一个模版,然而只有名称匹配“logstash-*”的索引才会利用该默版
parametershash增加到elasticsearch URL前面的参数键值对
parentstring“nil”为文档子节点指定父节点的id
passwordstringelasticsearch集群拜访明码
pathstring当设置了elasticsearch代理时用此参数从定向HTTP API,如果“hosts”中曾经蕴含此门路,则不须要设置
pipelinestring“nil”设置Event管道
pool_maxnumber1000elasticsearch最大连接数
pool_max_per_routenumber100每个“endpoint”的最大连接数
proxystring代理URL
resurrect_delaynumber5查看挂掉的“endpoint”是否恢复正常的频率
retry_initial_intervalnumber2设置批量重试的工夫距离,重试到 “retry_max_interval”次
retry_max_intervalnumber64Setmax interval in seconds between bulk retries.
retry_on_conflictnumber1Thenumber of times Elasticsearch should internally retry an update/upserteddocument
routingstring指定Event路由
scriptstring“”设置“scriptedupdate”模式下的脚本名称
script_langstring“painless”设置脚本语言
script_type“inline”、“indexed”、 “file”[“inline”]Definethe type of script referenced by “script” variable inline :”script” contains inline script indexed : “script” containsthe name of script directly indexed in elasticsearch file : “script”contains the name of script stored in elasticseach’s config directory
script_var_namestring“event”Setvariable name passed to script (scripted update)
scripted_upsertbooleanfalseifenabled, script is in charge of creating non-existent document (scriptedupdate)
sniffing
sniffing_delay
sniffing_path
ssl
ssl_certificate_verification
templatestring设置自定义的默版寄存门路
template_namestring“logstash”设置应用的默版名称
template_overwritebooleanfalse是否始终笼罩现有模版
timeoutnumber60网络超时工夫
truststorestring“:truststore”或者“:cacert”证书库门路
truststore_passwordstring证书库明码
upsertstring“”Setupsert content for update mode.s Create a new document with this parameter asjson string if document_id doesn’texists
userstring“”elasticsearch用户名
validate_after_inactivitynumber10000距离多长时间放弃连贯可用
versionstring存入elasticsearch的文档的版本号
version_type“internal”、“external”、 “external_gt”、 “external_gte”、“force”
workersstring1whenwe no longer support the :legacy type This is hacky, but it can only be herne
4.5.2 Redis输入插件

用于将Event写入Redis中进行缓存,通常状况下Logstash的Filter解决比拟吃系统资源,简单的Filter解决会十分耗时,如果Event产生速度比拟快,能够应用Redis作为buffer应用

(1)配置事例
output {    redis {        host => "127.0.0.1"        port => 6379        data_type => "list"        key => "logstash-list"    }}
(2)罕用配置参数(空 => 同上)
参数名称类型默认值形容信息
batchbooleanfalse是否启用redis的batch模式,仅在data_type=”list”时无效
batch_eventsnumber50batch大小,batch达到此大小时执行“RPUSH”
batch_timeoutnumber5batch超时工夫,超过这个工夫执行“RPUSH”
codec
congestion_intervalnumber1距离多长时间查看阻塞,如果设置为0,则没个Event查看一次
congestion_thresholdnumber0
data_type“list”、“channel”存储在redis中的数据类型,如果应用“list”,将采纳“RPUSH”操作,如果是“channel”,将采纳“PUBLISH”操作
dbnumber0应用的redis数据库编号
enable_metric
hostarray[“127.0.0.1”]redis服务列表,如果配置多个,将随机抉择一个,如果以后的redis服务不可用,将抉择下一个
id
keystringThename of a Redis list or channel. Dynamic names are valid here, forexample logstash-%{type}.
passwordstringredis服务明码
portnumber6379redis服务监听端口
reconnect_intervalnumber1连贯失败时的重连距离
shuffle_hostsbooleantrueShufflethe host list during Logstash startup.
timeoutnumber5redis连贯超时工夫
workersnumber1whenwe no longer support the :legacy type This is hacky, but it can only be herne
4.5.3 File输入插件

用于将Event输入到文件内

(1)配置事例
output {    file {        path => ...        codec => line { format => "custom format: %{message}"}    }}
(2)罕用配置参数(空 => 同上)
参数名称类型默认值形容信息
codec
create_if_deletedbooleantrue如果指标文件被删除,则在写入事件时创立新文件
dir_modenumber-1设置目录的拜访权限,如果为“-1”,应用操作系统默认的拜访权限
enable_metric
file_modenumber-1设置文件的拜访权限,如果为“-1”,应用操作系统默认的拜访权限
filename_failurestring“_filepath_failures”如果指定的文件门路有效,这会在目录内创立这个文件并记录数据
flush_intervalnumber2flush距离
gzipbooleanfalse是否启用gzip压缩
id
pathstring必须设置项,文件输入门路,如:path =>”./test-%{+YYYY-MM-dd}.txt”
workersstring1whenwe no longer support the :legacy type This is hacky, but it can only be herne
4.5.4 Kafka输入插件

用于将Event输入到Kafka指定的Topic中,官网Kafka详情配置

(1)配置事例
output {    kafka {        bootstrap_servers => "localhost:9092"        topic_id => "test"        compression_type => "gzip"    }}
(2)罕用配置参数(空 => 同上)
参数名称类型默认值形容信息
bootstrap_serversstringKafka集群信息,格局为 host1:port1,host2:port2
topic_idstring产生音讯的主题
compression_typeStringnone生产者生成的所有数据的压缩类型。默认为无(即无压缩)。有效值为 none、gzip、snappy 或 lz4。
batch_sizenumber16384配置以字节为单位管制默认批处理大小
buffer_memorynumber33554432(32MB)生产者可用于缓冲期待发送到服务器的记录的总内存字节数
max_request_sizenumber1048576(1MB)申请的最大大小
flush_intervalnumber2flush距离
gzipbooleanfalse是否启用gzip压缩
id
pathstring必须设置项,文件输入门路,如:path =>”./test-%{+YYYY-MM-dd}.txt”
workersstring1whenwe no longer support the :legacy type This is hacky, but it can only be herne
4.6 罕用编码插件(Codec plugin)
4.6.1 JSON编码插件

间接输出预约义好的 JSON 数据,这样就能够省略掉 filter/grok 配置

  • 配置事例

    json {}
  • 罕用配置参数

    参数名称类型默认值形容信息
    charsetstring“UTF-8”字符集
    enable_metric
    id

五 Logstash实例

5.1 接管Filebeat事件,输入到Redis
input {    beats {        port => 5044    }}output {    redis {        host => "127.0.0.1"        port => 6379        data_type => "list"        key => "logstash-list"    }}
5.2 读取Redis数据,依据“type”判断,别离解决,输入到ES
input {    redis {        host => "127.0.0.1"        port => 6379        data_type => "list"        key => "logstash-list"    }}filter {    if [type] == "application" {        grok {            match => ["message", "(?m)-(?<systemName>.+?):(?<logTime>(?>\d\d){1,2}-(?:0?[1-9]|1[0-2])-(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) (?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]):(?:(?:[0-5][0-9]|60)(?:[:.,][0-9]+)?)) \[(?<level>(\b\w+\b)) *\] (?<thread>(\b\w+\b)) \((?<point>.*?)\) - (?<content>.*)"]        }        date {            match => ["logTime", "yyyy-MM-dd HH:mm:ss,SSS"]        }        json {            source => "message"        }        date {            match => ["timestamp", "yyyy-MM-dd HH:mm:ss,SSS"]        }    }    if [type] == "application_bizz" {        json {            source => "message"        }        date {            match => ["timestamp", "yyyy-MM-dd HH:mm:ss,SSS"]        }    }    mutate {        remove_field => ["@version", "beat", "logTime"]    }}output {    stdout{    }    elasticsearch {        hosts => ["127.0.0.1:9200"]        index => "filebeat-%{type}-%{+yyyy.MM.dd}"        document_type => "%{documentType}"        template_overwrite => true    }}

六 利用场景

6.1 以logstash作为日志搜索器

架构:logstash采集、解决、转发到elasticsearch存储,在kibana进行展现

特点:这种构造因为须要在各个服务器上部署 Logstash,而它比拟耗费 CPU 和内存资源,所以比拟适宜计算资源丰盛的服务器,否则容易造成服务器性能降落,甚至可能导致无奈失常工作。

6.2 音讯模式

音讯模式:Beats 还不反对输入到音讯队列(新版本除外:5.0版本及以上),所以在音讯队列前后两端只能是 Logstash 实例。logstash从各个数据源收集数据,不通过任何解决转换仅转收回到音讯队列(kafka、redis、rabbitMQ等),后logstash从音讯队列取数据进行转换剖析过滤,输入到elasticsearch,并在kibana进行图形化展现

架构(Logstash进行日志解析所在服务器性能各方面必须要足够好):

模式特点:这种架构适宜于日志规模比拟宏大的状况。但因为 Logstash 日志解析节点和 Elasticsearch 的负荷比拟重,可将他们配置为集群模式,以分担负荷。引入音讯队列,平衡了网络传输,从而升高了网络灵通,尤其是失落数据的可能性,但仍然存在 Logstash 占用系统资源过多的问题

工作流程:Filebeat采集—> logstash转发到kafka—> logstash解决从kafka缓存的数据进行剖析—> 输入到es—> 显示在kibana

6.3 logstash(非filebeat)进行文件采集,输入到kafka缓存,读取kafka数据并解决输入到文件或es
6.4 logstash同步mysql数据库数据到es(logstash5版本以上已集成jdbc插件,无需下载安装,间接应用)

七 Logstash和Flume比照

首先从构造比照,咱们会惊人的发现,两者是如许的类似!Logstash的Shipper、Broker、Indexer别离和Flume的Source、Channel、Sink各自对应!只不过是Logstash集成了,Broker能够不须要,而Flume须要独自配置,且缺一不可,但这再一次阐明了计算机的设计思维都是通用的!只是实现形式会不同而已。

从程序员的角度来说,上文也提到过了,Flume是真的很繁琐,你须要别离作source、channel、sink的手工配置,而且波及到简单的数据采集环境,你可能还要做多个配置,这在下面提过了,反过来说Logstash的配置就十分简洁清晰,三个局部的属性都定义好了,程序员本人去抉择就行,就算没有,也能够自行开发插件,十分不便。当然了,Flume的插件也很多,但Channel就只有内存和文件这两种(其实当初不止了,但罕用的也就两种)。读者能够看得出来,两者其实配置都是非常灵活的,只不过看场景取舍罢了。

其实从作者和历史背景来看,两者最后的设计目标就不太一样。Flume自身最后设计的目标是为了把数据传入HDFS中(并不是为了采集日志而设计,这和Logstash有基本的区别),所以理所应当侧重于数据的传输,程序员要十分分明整个数据的路由,并且比Logstash还多了一个可靠性策略,上文中的channel就是用于长久化目标,数据除非确认传输到下一地位了,否则不会删除,这一步是通过事务来管制的,这样的设计使得可靠性十分好。相同,Logstash则显著偏重对数据的预处理,因为日志的字段须要大量的预处理,为解析做铺垫。

回过来看我当初为什么先讲Logstash而后讲Flume?这外面有几个思考

  • 其一:Logstash其实更有点像通用的模型,所以对新人来说了解起来更简略,而Flume这样轻量级的线程,可能有肯定的计算机编程根底了解起来更好;
  • 其二:目前大部分的状况下,Logstash用的更加多,这个数据我本人没有统计过,然而依据教训判断,Logstash能够和ELK其余组件配合应用,开发、利用都会简略很多,技术成熟,应用场景宽泛。相同Flume组件就须要和其余很多工具配合应用,场景的针对性会比拟强,更不必提Flume的配置过于繁琐简单了。

最初总结下来,咱们能够这么了解他们的区别:

Logstash就像是买来的台式机,主板、电源、硬盘,机箱(Logstash)把外面的货色全副装好了,你能够间接用,当然也能够本人组装批改;

Flume就像提供给你一套残缺的主板,电源、硬盘,Flume没有打包,只是像说明书一样领导你如何组装,能力运行的起来。

参考文档:

  • [1] 迷途的攻城狮.CSDN: https://blog.csdn.net/chenlei... , 2017-06-22.
  • [2] Logstash 官网: https://www.elastic.co/cn/log...