乐趣区

关于后端:Logstash数据流引擎

Logstash- 数据流引擎

作者 | WenasWei

一 Logstash

Logstash 是具备实时流水线性能的开源数据收集引擎。Logstash 能够动静对立来自不同起源的数据,并将数据标准化到您抉择的指标地位。革除所有数据并使其民主化,以用于各种高级上游剖析和可视化用例。

1.1 Logstash 简介

Logstash 是一个数据流引擎:

  • 它是用于数据物流的开源流式 ETL(Extract-Transform-Load)引擎
  • 在几分钟内建设数据流管道
  • 具备程度可扩大及韧性且具备自适应缓冲
  • 不可知的数据源
  • 具备 200 多个集成和处理器的插件生态系统
  • 应用 Elastic Stack 监督和治理部署

官网介绍:Logstash is an open source data collection engine with real-time pipelining capabilities。简略来说 logstash 就是一根具备实时数据传输能力的管道,负责将数据信息从管道的输出端传输到管道的输入端;与此同时这根管道还能够让你依据本人的需要在两头加上滤网,Logstash 提供里很多功能强大的滤网以满足你的各种利用场景。

1.2 数据处理

Logstash 是一个功能强大的工具,可与各种部署集成。它提供了大量插件,可帮忙你解析,丰盛,转换和缓冲来自各种起源的数据。如果你的数据须要 Beats 中没有的其余解决,则须要将 Logstash 增加到部署中。

当下最为风行的数据源:

Logstash 能够摄入日志,文件,指标或者网路实在数据。通过 Logstash 的解决,变为能够应用的 Web
Apps 能够耗费的数据,也能够存储于数据中心,或变为其它的流式数据:

  • Logstash 能够很不便地和 Beats 一起单干,这也是被举荐的办法
  • Logstash 也能够和那些驰名的云厂商的服务一起单干解决它们的数据
  • 它也能够和最为同样的信息音讯队列,比方 redis 或 kafka 一起合作
  • Logstash 也能够应用 JDBC 来拜访 RDMS 数据
  • 它也能够和 IoT 设施一起解决它们的数据
  • Logstash 不仅仅能够把数据传送到 Elasticsearch,而且它还能够把数据发送至很多其它的目的地,并作为它们的输出源做进一步的解决

二 Logstash 零碎架构

Logstash 蕴含 3 个次要局部:输出(inputs),过滤器(filters)和输入(outputs)

Logstash 的事件(logstash 将数据流中等每一条数据称之为一个 event)解决流水线有三个次要角色实现:inputs –> filters –> outputs:

  • inpust:必须,负责产生事件(Inputs generate events),罕用:File、syslog、redis、kakfa、beats(如:Filebeats)
  • filters:可选,负责数据处理与转换(filters modify them),罕用:grok、mutate、drop、clone、geoip
  • outpus:必须,负责数据输入(outputs ship them elsewhere),罕用:elasticsearch、file、graphite、kakfa、statsd

三 Logstash 装置

3.1 环境清单
  • 操作系统:Linux #56-Ubuntu SMP Tue Jun 4 22:49:08 UTC 2019 x86_64
  • Logstash 版本:logstash-6.2.4
  • Jdk 版本:1.8.0_152
3.2 Linux 装置 JDK
3.2.1 解压缩并挪动到指定目录(约定的目录:/usr/local)
(1)解压缩
tar -zxvf jdk-8u152-linux-x64.tar.gz
(2)创立目录
mkdir -p /usr/local/java
(3)挪动安装包
mv jdk1.8.0_152/ /usr/local/java/
(4)设置所有者
chown -R root:root /usr/local/java/
3.2.2 配置环境变量
(1)配置零碎环境变量
vi /etc/environment
(2)增加如下语句
PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games"
export JAVA_HOME=/usr/local/java/jdk1.8.0_152
export JRE_HOME=/usr/local/java/jdk1.8.0_152/jre
export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
(3)配置用户环境变量
nano /etc/profile
(4)增加如下语句(肯定要放两头)
if ["$PS1"]; then
  if ["$BASH"] && ["$BASH" != "/bin/sh"]; then
    # The file bash.bashrc already sets the default PS1.
    # PS1='\h:\w\$'
    if [-f /etc/bash.bashrc]; then
      . /etc/bash.bashrc
    fi
  else
    if ["`id -u`" -eq 0]; then
      PS1='#'
    else
      PS1='$'
    fi
  fi
fi

export JAVA_HOME=/usr/local/java/jdk1.8.0_152
export JRE_HOME=/usr/local/java/jdk1.8.0_152/jre
export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH:$HOME/bin

if [-d /etc/profile.d]; then
  for i in /etc/profile.d/*.sh; do
    if [-r $i]; then
      . $i
    fi
  done
  unset i
fi
(5)使用户环境变量失效
source /etc/profile
(6)测试是否装置胜利
$ java -version
java version "1.8.0_152"
Java(TM) SE Runtime Environment (build 1.8.0_152-b16)
Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode)
3.3 装置 Logstash
3.3.1 创立装置目录
$ sudo mkdir /usr/local/logstash
3.3.2 下载 Logstash 安装文件
$ wget -P /usr/local/logstash https://artifacts.elastic.co/downloads/logstash/logstash-6.2.4.tar.gz
3.3.2 解压缩安装文件
$ cd /usr/local/logstash/
$ sudo tar -zxvf logstash-6.2.4.tar.gz
3.3.3 测试装置是否胜利

测试: 疾速启动,规范输入输出作为 input 和 output,没有 filter

$ cd logstash-6.2.4/
$ ./bin/logstash -e 'input {stdin {} } output {stdout {} }'

Sending Logstash's logs to /usr/local/logstash/logstash-6.2.4/logs which is now configured via log4j2.properties
[2021-05-27T00:22:28,729][INFO][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/usr/local/logstash/logstash-6.2.4/modules/fb_apache/configuration"}
[2021-05-27T00:22:28,804][INFO][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/usr/local/logstash/logstash-6.2.4/modules/netflow/configuration"}
[2021-05-27T00:22:29,827][WARN][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2021-05-27T00:22:30,979][INFO][logstash.runner] Starting Logstash {"logstash.version"=>"6.2.4"}
[2021-05-27T00:22:31,821][INFO][logstash.agent] Successfully started Logstash API endpoint {:port=>9600}
[2021-05-27T00:22:36,463][INFO][logstash.pipeline] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2021-05-27T00:22:36,690][INFO][logstash.pipeline] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x55a5abea run>"}
The stdin plugin is now waiting for input:
[2021-05-27T00:22:36,853][INFO][logstash.agent] Pipelines running {:count=>1, :pipelines=>["main"]}

## 此时命令窗口停留在期待输出状态,键盘键入任意字符 ##

hello world

## 下方是 Logstash 输入到成果 ##

{
    "@timestamp" => 2021-05-26T16:22:52.527Z,
          "host" => "*******",
       "message" => "hello world",
      "@version" => "1"
}

四 Logstash 参数与配置

4.1 罕用启动参数
参数 阐明 举例
-e 立刻执行,应用命令行里的配置参数启动实例 ./bin/logstash -e‘input {stdin {}} output {stdout {}}’
-f 指定启动实例的配置文件 ./bin/logstash -f config/test.conf
-t 测试配置文件的正确性 ./bin/logstash-f config/test.conf -t
-l 指定日志文件名称 ./bin/logstash-f config/test.conf -l logs/test.log
-w 指定 filter 线程数量,默认线程数是 5 ./bin/logstash-f config/test.conf -w 8
4.2 配置文件构造及语法
(1)区段

Logstash 通过 {} 来定义区域,区域内能够定义插件,一个区域内能够定义多个插件,如下:

input {stdin {}
    beats {port => 5044}
}
(2)数据类型

Logstash 仅反对大量的数据类型:

  • Boolean:ssl_enable => true
  • Number:port => 33
  • String:name =>“Hello world”
  • Commonts:# this is a comment
(3)字段援用

Logstash 数据流中的数据被称之为 Event 对象,Event 以 JSON 构造形成,Event 的属性被称之为字段,如果你像在配置文件中援用这些字段,只须要把字段的名字写在中括号 [] 里就行了,如 [type],对于嵌套字段每层字段名称都写在[] 里就能够了,比方:tags;除此之外,对于 Logstash 的 arrag 类型反对下标与倒序下表,如:tags[0],tags[-1]。

(4)条件判断

Logstash 反对上面的操作符:

  • equality:==, !=, <, >, <=, >=
  • regexp:=~, !~
  • inclusion:in, not in
  • boolean:and, or, nand, xor
  • unary:!

例如:

if EXPRESSION {...} else if EXPRESSION {...} else {...}
(5)环境变量援用

Logstash 反对援用零碎环境变量,环境变量不存在时能够设置默认值,例如:

export TCP_PORT=12345

input {
  tcp {port => "${TCP_PORT:54321}"
  }
}
4.3 罕用输出插件(Input plugin)

输出插件包含有以下多种,详情查看官网文档 - 罕用输出插件:

  • elasticsearch
  • exec
  • file
  • github
  • http
  • jdbc
  • jms
  • jmx
  • kafka
  • log4j
  • rabbitmq
  • redis
  • tcp
  • udp
  • unix
  • websocket
4.3.1 File 读取插件

文件读取插件次要用来抓取文件的变动信息,将变动信息封装成 Event 过程解决或者传递。

  • 配置事例

    input
    file {path => ["/var/log/*.log", "/var/log/message"]
      type => "system"
      start_position => "beginning"
    }
    }
  • 罕用参数
参数名称 类型 默认值 形容信息
add_field hash {} 用于向 Event 中增加字段
close_older number 3600 设置文件多久秒内没有更新就关掉对文件的监听
codec string “plain” 输出数据之后对数据进行解码
delimiter string “\n” 文件内容的行分隔符,默认依照行进行 Event 封装
discover_interval number 15 距离多少秒查看一下 path 匹配对门路下是否有新文件产生
enable_metric boolean true
exclude array path 匹配的文件中指定例外,如:path =>“/var/log/“;exclude =>”.gz”
id string 辨别两个雷同类型的插件,比方两个 filter,在应用 Monitor API 监控是能够辨别,倡议设置上 ID
ignore_older number 疏忽历史批改,如果设置 3600 秒,logstash 只会发现一小时内被批改过的文件,一小时之前批改的文件的变动不会被读取,如果再次批改该文件,所有的变动都会被读取,默认被禁用
max_open_files number logstash 能够同时监控的文件个数(同时关上的 file_handles 个数),如果你须要解决多于这个数量多文件,能够应用“close_older”去敞开一些文件
path array 必须设置项,用于匹配被监控的文件,如“/var/log/.log”或者“/var/log//.log”,必须应用绝对路径
sincedb_path string 文件读取记录,必须指定一个文件而不是目录,文件中保留没个被监控的文件等以后 inode 和 byteoffset,默认寄存地位“$HOME/.sincedb*”
sincedb_write_interval number 15 距离多少秒写一次 sincedb 文件
start_position “beginning”,“end” ”end” 从文件等结尾还是结尾读取文件内容,默认是结尾,如果须要导入文件中的老数据,能够设置为“beginning”,该选项只在第一次启动 logstash 时无效,如果文件曾经存在于 sincedb 的记录内,则此配置有效
stat_interval number 1 距离多少秒检查一下文件是否被批改,加大此参数将升高零碎负载,然而减少了发现新日志的间隔时间
tags array 能够在 Event 中减少标签,以便于在后续的解决流程中应用
type string Event 的 type 字段,如果采纳 elasticsearch 做 store,在默认状况下将作为 elasticsearch 的 type
4.3.2 TCP 监听插件

TCP 插件有两种工作模式,“Client”和“Server”,别离用于发送网络数据和监听网络数据。

  • 配置事例

    tcp {port => 41414}
  • 罕用参数(空 => 同上)
参数名称 类型 默认值 形容信息
add_field
codec
enable_metric
host
id
mode “server”、“client” “server” “server”监听“client”的连贯申请,“client”连贯“server”
port number 必须设置项,“server”模式时指定监听端口,“client”模式指定连贯端口
proxy_protocol boolean false Proxyprotocol support, only v1 is supported at this time
ssl_cert
ssl_enable
ssl_extra_chain_certs
ssl_key
ssl_key_passphrase
ssl_verify
tags
type
4.3.3 Redis 读取插件

用于读取 Redis 中缓存的数据信息。

  • 配置事例

    input {
    redis {
      host => "127.0.0.1"
      port => 6379
      data_type => "list"
      key => "logstash-list"
    }
    }
  • 罕用参数(空 => 同上)
参数名称 类型 默认值 形容信息
add_field
batch_count number 125 应用 redis 的 batch 个性,须要 redis2.6.0 或者更新的版本
codec
data_type list,channel,pattern_channel 必须设置项,依据设置不同,订阅 redis 应用不同的命令,顺次是:BLPOP、SUBSCRIBE、PSUBSCRIBE
db number 0 指定应用的 redis 数据库
enable_metric
host string 127.0.0.1 redis 服务地址
id
key string 必须设置项,reidslist 或者 channel 的 key 名称
password string redis 明码
port number 6379 redis 连贯端口号
tags
threads number 1
timeout number 5 redis 服务连贯超时工夫,单位:秒

留神:

data_type 须要留神的是“channel”和“pattern_channel”是播送类型,雷同的数据会同时发送给订阅了该 channel 的 logstash,也就是说在 logstash 集群环境下会呈现数据反复,集群中的每一个节点都将收到同样的数据,然而在单节点状况下,“pattern_channel”能够同时定于满足 pattern 的多个 key

4.3.4 Kafka 读取插件

用于读取 Kafka 中推送的主题数据信息。

  • 配置事例

    input {
    kafka {
      bootstrap_servers => "kafka-01:9092,kafka-02:9092,kafka-03:9092"
      topics_pattern  => "elk-.*"
      consumer_threads => 5
      decorate_events => true
      codec => "json"
      auto_offset_reset => "latest"
      group_id => "logstash1"##logstash 集群需雷同
    }
    }
  • 罕用参数:
参数名称 类型 默认值 形容信息
bootstrap_servers string localhost:9092 Kafka 列表,用于建设与集群的初始连贯
topics_pattern string 要订阅的主题正则表达式模式。应用此配置时,主题配置将被疏忽。
consumer_threads number 并发线程数,现实状况下,您应该领有与分区数量一样多的线程
decorate_events string none 可承受的值为:none/basic/extended/false
codec codec plain 用于输出数据的编解码器
auto_offset_reset string 当 Kafka 初始偏移量
group_id String logstash 该消费者所属的组的标识符

留神:

  • auto_offset_reset: earliest- 将偏移量主动重置为最早的偏移量;latest- 主动将偏移量重置为最新偏移量;none- 如果未找到消费者组的先前偏移量,则向消费者抛出异样;anything else- 向消费者抛出异样。
  • decorate_events: none:未增加元数据,basic:增加了记录的属性,extended:记录的属性,增加题目,false:不倡议应用的别名 none,true:不倡议应用的别名 basic
4.4 罕用过滤插件(Filter plugin)

丰盛的过滤器插件的是 logstash 威力如此弱小的重要因素,过滤器插件次要解决流经以后 Logstash 的事件信息,能够增加字段、移除字段、转换字段类型,通过正则表达式切分数据等,也能够依据条件判断来进行不同的数据处理形式, 详情查看官网文档 - 罕用过滤插件

4.4.1 grok 正则捕捉

grok 是 Logstash 中将非结构化数据解析成结构化数据以便于查问的最好工具,非常适合解析 syslog logs,apache log,mysql log,以及一些其余的 web log

(1)预约义表达式调用:
  • Logstash 提供 120 个罕用正则表达式可供装置应用,装置之后你能够通过名称调用它们,语法如下:%{SYNTAX:SEMANTIC}
  • SYNTAX:示意曾经装置的正则表达式的名称
  • SEMANTIC:示意从 Event 中匹配到的内容的名称

例如:
Event 的内容为“[debug] 127.0.0.1 – test log content”,匹配 %{IP:client}将取得“client: 127.0.0.1”的后果,前提装置了 IP 表达式;如果你在捕捉数据时想进行数据类型转换能够应用 %{NUMBER:num:int}这种语法,默认状况下,所有的返回后果都是 string 类型,以后 Logstash 所反对的转换类型仅有“int”和“float”;

一个略微残缺一点的事例:

  • 日志文件 http.log 内容:55.3.244.1 GET /index.html 15824 0.043
  • 表达式:%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
  • 配置文件内容:

    input {
    file {path => "/var/log/http.log"}
    }
    filter {
    grok {match => {"message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"}
    }
    }
  • 输入后果:

    client: 55.3.244.1
    method: GET
    request: /index.html
    bytes: 15824
    duration: 0.043
(2)自定义表达式调用

语法:(?<field_name>the pattern here)

举例:捕捉 10 或 11 和长度的十六进制 queue_id 能够应用表达式 (?<queue_id>[0-9A-F]{10,11})
装置自定义表达式

与预约义表达式雷同,你也能够将自定义的表达式配置到 Logstash 中,而后就能够像于定义的表达式一样应用;以下是操作步骤阐明:

  • 1、在 Logstash 根目录下创立文件夹“patterns”,在“patterns”文件夹中创立文件“extra”(文件名称无所谓,可本人抉择有意义的文件名称);
  • 2、在文件“extra”中增加表达式,格局:patternName regexp,名称与表达式之间用空格隔开即可,如下:

    # contents of ./patterns/postfix:
    POSTFIX_QUEUEID [0-9A-F]{10,11}
  • 3、应用自定义的表达式时须要指定“patterns_dir”变量,变量内容指向表达式文件所在的目录,举例如下:

<1> 日志内容

Jan  1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>

<2>Logstash 配置

filter {
  grok {patterns_dir => ["./patterns"]
    match => {"message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
  }
}

<3> 运行后果

timestamp: Jan 1 06:25:43
logsource: mailserver14
program: postfix/cleanup
pid: 21403
queue_id: BEF25A72965
(3)grok 罕用配置参数(空 => 同上)
参数名称 类型 默认值 形容信息
add_field
add_tag
break_on_match boolean true match 字段存在多个 pattern 时,当第一个匹配胜利后完结前面的匹配,如果想匹配所有的 pattern,将此参数设置为 false
enable_metric
id
keep_empty_captures boolean false 如果为 true,捕捉失败的字段奖设置为空值
match array {} 设置 pattern 数组: match=> {“message”=> [“Duration: %{NUMBER:duration}”,”Speed: %{NUMBER:speed}”]}
named_captures_only boolean true If true, only store named captures from grok.
overwrite array [] 笼罩字段内容: match=> {“message”=>“%{SYSLOGBASE} %{DATA:message}”} overwrite=> [“message”]
patterns_dir array [] 指定自定义的 pattern 文件寄存目录,Logstash 在启动时会读取文件夹内 patterns_files_glob 匹配的所有文件内容
patterns_files_glob string “*” 用于匹配 patterns_dir 中的文件
periodic_flush boolean false 定期调用 filter 的 flush 办法
remove_field array [] 从 Event 中删除任意字段: remove_field=> [“foo_%{somefield}”]
remove_tag array [] 删除“tags”中的值: remove_tag=> [“foo_%{somefield}”]
tag_on_failure array [“_grokparsefailure”] 当没有匹配胜利时,将此 array 增加到“tags”字段内
tag_on_timeout string “_groktimeout” 当匹配超时时,将此内容增加到“tags”字段内
timeout_millis number 30000 设置单个 match 到超时工夫,单位:毫秒,如果设置为 0,则不启用超时设置
4.4.2 date 工夫解决插件

该插件用于工夫字段的格局转换,比方将“Apr 17 09:32:01”(MMM dd HH:mm:ss)转换为“MM-dd HH:mm:ss”。而且通常状况下,Logstash 会为主动给 Event 打上工夫戳,然而这个工夫戳是 Event 的解决工夫(次要是 input 接收数据的工夫),和日志记录时间会存在偏差(次要起因是 buffer),咱们能够应用此插件用日志产生工夫替换掉默认是工夫戳的值。

罕用配置参数(空 => 同上)
参数名称 类型 默认值 形容信息
add_field
add_tag
enable_metric
id
locale
match array [] 工夫字段匹配, 可自定多种格局,直到匹配到或匹配完结
periodic_flush
remove_field
remove_tag
tag_on_failure
target string “@timestamp” 指定 match 匹配并且转换为 date 类型的存储地位(字段),默认笼罩到“@timestamp”
timezone string 指定工夫格式化的时区

留神:

match 的格局: 工夫字段匹配, 可自定多种格局,直到匹配到或匹配完结,格局: [field,formats…],如:match=>[“logdate”,“MMM dd yyyy HH:mm:ss”,“MMM d yyyy HH:mm:ss”,“ISO8601”]

4.4.3 mutate 数据批改插件

mutate 插件是 Logstash 另一个重要插件。它提供了丰盛的根底类型数据处理能力。能够重命名,删除,替换和批改事件中的字段。

罕用配置参数(空 => 同上)
参数名称 类型 默认值 形容信息
add_field
add_tag
convert hash 将指定字段转换为指定类型,字段内容是数组,则转换所有数组元素,如果字段内容是 hash,则不做任何解决,目前反对的转换类型包含:integer,float, string, and boolean. 例如:convert=> {“fieldname”=>“integer”}
enable_metric
gsub array 相似 replace 办法,应用指定内容替换掉指标字符串的现有内容,前提是指标字段必须是字符串,否则不做任何解决,例如:[“fieldname”,“/”,““,“fieldname2”,“[\?#-]”,“.”],解释:应用“”替换掉“fieldname”中的所有“/”,应用“.”替换掉“fieldname2”中的所有“\”“?”、“#”和“-”
id
join hash 应用指定的符号将 array 字段的每个元素连接起来,对非 array 字段有效。例如:应用“,”将 array 字段“fieldname”的每一个元素连接成一个字符串:join=> {“fieldname”=>“,”}
lowercase array 将自定的字段值转换为小写
merge hash 合并两个 array 或者 hash,如果是字符串,将主动转换为一个单元素数组;将一个 array 和一个 hash 合并。例如:将”added_field”合并到”dest_field”:merge=> {“dest_field”=>“added_field”}
periodic_flush
remove_field
remove_tag
rename hash 批改一个或者多个字段的名称。例如:将”HOSTORIP”改名为”client_ip”:rename=> {“HOSTORIP”=>“client_ip”}
replace hash 应用新值残缺的替换掉指定字段的原内容,反对变量援用。例如:应用字段“source_host”的内容拼接上字符串“: My new message”之后的后果替换“message”的值:replace=> {“message”=>“%{source_host}: My new message”}
split hash 依照自定的分隔符将字符串字段拆分成 array 字段,只能作用于 string 类型的字段。例如:将“fieldname”的内容依照“,”拆分成数组:split=> {“fieldname”=>“,”}
strip array 去掉字段内容中间的空白字符。例如:去掉“field1”和“field2”中间的空格:strip=> [“field1”,“field2”]
update hash 更新现有字段的内容,例如:将“sample”字段的内容更新为“Mynew message”:update=> {“sample”=>“My new message”}
uppercase array 将字符串转换为大写
4.4.4 JSON 插件

JSON 插件用于解码 JSON 格局的字符串,个别是一堆日志信息中,局部是 JSON 格局,局部不是的状况下

(1)配置事例
json {source => ...}
  • 事例配置,message 是 JSON 格局的字符串:"{\"uid\":3081609001,\"type\":\"signal\"}"

    filter {
      json {
          source => "message"
          target => "jsoncontent"
      }
    }
  • 输入后果:

    {
      "@version": "1",
      "@timestamp": "2014-11-18T08:11:33.000Z",
      "host": "web121.mweibo.tc.sinanode.com",
      "message": "{\"uid\":3081609001,\"type\":\"signal\"}",
      "jsoncontent": {
          "uid": 3081609001,
          "type": "signal"
      }
    }
  • 如果从事例配置中删除target,输入后果如下:

    {
      "@version": "1",
      "@timestamp": "2014-11-18T08:11:33.000Z",
      "host": "web121.mweibo.tc.sinanode.com",
      "message": "{\"uid\":3081609001,\"type\":\"signal\"}",
      "uid": 3081609001,
      "type": "signal"
    }
(2)罕用配置参数(空 => 同上)
参数名称 类型 默认值 形容信息
add_field
add_tag
enable_metric
id
periodic_flush
remove_field
remove_tag
skip_on_invalid_json boolean false 是否跳过验证不通过的 JSON
source string 必须设置项,指定须要解码的 JSON 字符串字段
tag_on_failure
target string 解析之后的 JSON 对象所在的字段名称,如果没有,JSON 对象的所有字段将挂在根节点下
4.4.5 elasticsearch 查问过滤插件

用于查问 Elasticsearch 中的事件,可将查问后果利用于以后事件中

罕用配置参数(空 => 同上)
参数名称 类型 默认值 形容信息
add_field
add_tag
ca_file string SSL Certificate Authority file path
enable_sort boolean true 是否对后果进行排序
fields array {} 从老事件中复制字段到新事件中,老事件来源于 elasticsearch(用于查问更新)
hosts array [“localhost:9200”] elasticsearch 服务列表
index string “” 用逗号分隔的 elasticsearch 索引列表,如果要操作所有所有应用“_all”或者“”,保留数据到 elasticsearch 时,如果索引不存在会主动以此创立
password string 明码
periodic_flush
query string 查问 elasticsearch 的查问字符串
remove_field
remove_tag
result_size number 1 查问 elasticsearch 时,返回后果的数量
sort string “@timestamp:desc” 逗号分隔的“:”列表,用于查问后果排序
ssl boolean false SSL
tag_on_failure
user string 用户名
4.5 罕用输入插件(Output plugin)
4.5.1 ElasticSearch 输入插件

用于将事件信息写入到 Elasticsearch 中,官网举荐插件,ELK 必备插件

(1)配置事例
output {
    elasticsearch {hosts => ["127.0.0.1:9200"]
        index => "filebeat-%{type}-%{+yyyy.MM.dd}"
        template_overwrite => true
    }
}
(2)罕用配置参数(空 => 同上)
参数名称 类型 默认值 形容信息
absolute_healthcheck_path boolean false 当配置了“healthcheck_path”时,决定 elasticsearch 健康检查 URL 是否依照绝对路径配置。例如:elasticsearch 拜访门路为:”http://localhost:9200/es“,“h… 以后参数为 true 时的拜访门路为:”http://localhost:9200/es/heal… 以后参数为 false 时的拜访门路为:”http://localhost:9200/health”
absolute_sniffing_path boolean false 当配置了“sniffing_path”时,决定 elasticsearch 的 sniffing 拜访门路配置。例如:elasticsearch 拜访门路为:“http://localhost:9200/es”,“s… 以后参数为 true 时的拜访门路为:“http://localhost:9200/es/_sni… 以后参数为 false 时的拜访门路为:“http://localhost:9200/_sniffing”
action string “index” 对 elasticsearch 的操作类型,可用的操作类型:index:索引 Logstash 事件数据到 elasticsearch;delete:依据 id 删除文档,id 必须指定;delete:依据 id 删除文档,id 必须指定;update:依据 id 更新文档
cacert string .cer 或者.pem 证书文件门路,应用证书进行 elasticsearch 认证
codec
doc_as_upsert boolean false 使 update 启用 upsert 模式,即文档不存在时创立新文档
document_id string elasticsearch 中的文档 id,用来笼罩曾经保留到 elasticsearch 中的文档
document_type string 指定存入 elasticsearch 中的文档的 type,没有指定的状况下会应用 Event 信息中的“type”字段的值作为 elasticsearch 的 type
enable_metric
failure_type_logging_whitelist array [] elasricsearch 报错白名单,白名单的异样信息不会被记入 logstash 的 log 中,比方你想疏忽掉所有的“document_already_exists_exception”异样
flush_size
healthcheck_path string “/” elasricsearch 查看状态查看门路
hosts string [//127.0.0.1] elasticsearch 服务地址列表,如果配置多个将启用负载平衡
id
idle_flush_time number 1 距离多长时间将数据输入到 elasticsearch 中一次,次要用于较慢的事件
index string “logstash-%{+YYYY.MM.dd}” 指定 elasticsearch 存储数据时的所有名称,反对变量援用,比方你能够按天创立索引,不便删除历史数据或者查问制订范畴内的数据
keystore string 用于指定密钥库门路,能够是.jks 或者.p12
keystore_password string 密钥库明码
manage_template boolean true 是否启用 elasticsearch 模版,Logstash 自带一个模版,然而只有名称匹配“logstash-*”的索引才会利用该默版
parameters hash 增加到 elasticsearch URL 前面的参数键值对
parent string “nil” 为文档子节点指定父节点的 id
password string elasticsearch 集群拜访明码
path string 当设置了 elasticsearch 代理时用此参数从定向 HTTP API,如果“hosts”中曾经蕴含此门路,则不须要设置
pipeline string “nil” 设置 Event 管道
pool_max number 1000 elasticsearch 最大连接数
pool_max_per_route number 100 每个“endpoint”的最大连接数
proxy string 代理 URL
resurrect_delay number 5 查看挂掉的“endpoint”是否恢复正常的频率
retry_initial_interval number 2 设置批量重试的工夫距离,重试到“retry_max_interval”次
retry_max_interval number 64 Setmax interval in seconds between bulk retries.
retry_on_conflict number 1 Thenumber of times Elasticsearch should internally retry an update/upserteddocument
routing string 指定 Event 路由
script string “” 设置“scriptedupdate”模式下的脚本名称
script_lang string “painless” 设置脚本语言
script_type “inline”、“indexed”、“file” [“inline”] Definethe type of script referenced by“script”variable inline :”script”contains inline script indexed :“script”containsthe name of script directly indexed in elasticsearch file :“script”contains the name of script stored in elasticseach’s config directory
script_var_name string “event” Setvariable name passed to script (scripted update)
scripted_upsert boolean false ifenabled, script is in charge of creating non-existent document (scriptedupdate)
sniffing
sniffing_delay
sniffing_path
ssl
ssl_certificate_verification
template string 设置自定义的默版寄存门路
template_name string “logstash” 设置应用的默版名称
template_overwrite boolean false 是否始终笼罩现有模版
timeout number 60 网络超时工夫
truststore string “:truststore”或者“:cacert”证书库门路
truststore_password string 证书库明码
upsert string “” Setupsert content for update mode.s Create a new document with this parameter asjson string if document_id doesn’texists
user string “” elasticsearch 用户名
validate_after_inactivity number 10000 距离多长时间放弃连贯可用
version string 存入 elasticsearch 的文档的版本号
version_type “internal”、“external”、“external_gt”、“external_gte”、“force”
workers string 1 whenwe no longer support the :legacy type This is hacky, but it can only be herne
4.5.2 Redis 输入插件

用于将 Event 写入 Redis 中进行缓存,通常状况下 Logstash 的 Filter 解决比拟吃系统资源,简单的 Filter 解决会十分耗时,如果 Event 产生速度比拟快,能够应用 Redis 作为 buffer 应用

(1)配置事例
output {
    redis {
        host => "127.0.0.1"
        port => 6379
        data_type => "list"
        key => "logstash-list"
    }
}
(2)罕用配置参数(空 => 同上)
参数名称 类型 默认值 形容信息
batch boolean false 是否启用 redis 的 batch 模式,仅在 data_type=”list”时无效
batch_events number 50 batch 大小,batch 达到此大小时执行“RPUSH”
batch_timeout number 5 batch 超时工夫,超过这个工夫执行“RPUSH”
codec
congestion_interval number 1 距离多长时间查看阻塞,如果设置为 0,则没个 Event 查看一次
congestion_threshold number 0
data_type “list”、“channel” 存储在 redis 中的数据类型,如果应用“list”,将采纳“RPUSH”操作,如果是“channel”,将采纳“PUBLISH”操作
db number 0 应用的 redis 数据库编号
enable_metric
host array [“127.0.0.1”] redis 服务列表,如果配置多个,将随机抉择一个,如果以后的 redis 服务不可用,将抉择下一个
id
key string Thename of a Redis list or channel. Dynamic names are valid here, forexample logstash-%{type}.
password string redis 服务明码
port number 6379 redis 服务监听端口
reconnect_interval number 1 连贯失败时的重连距离
shuffle_hosts boolean true Shufflethe host list during Logstash startup.
timeout number 5 redis 连贯超时工夫
workers number 1 whenwe no longer support the :legacy type This is hacky, but it can only be herne
4.5.3 File 输入插件

用于将 Event 输入到文件内

(1)配置事例
output {
    file {
        path => ...
        codec => line {format => "custom format: %{message}"}
    }
}
(2)罕用配置参数(空 => 同上)
参数名称 类型 默认值 形容信息
codec
create_if_deleted boolean true 如果指标文件被删除,则在写入事件时创立新文件
dir_mode number -1 设置目录的拜访权限,如果为“-1”,应用操作系统默认的拜访权限
enable_metric
file_mode number -1 设置文件的拜访权限,如果为“-1”,应用操作系统默认的拜访权限
filename_failure string “_filepath_failures” 如果指定的文件门路有效,这会在目录内创立这个文件并记录数据
flush_interval number 2 flush 距离
gzip boolean false 是否启用 gzip 压缩
id
path string 必须设置项,文件输入门路,如:path =>”./test-%{+YYYY-MM-dd}.txt”
workers string 1 whenwe no longer support the :legacy type This is hacky, but it can only be herne
4.5.4 Kafka 输入插件

用于将 Event 输入到 Kafka 指定的 Topic 中, 官网 Kafka 详情配置

(1)配置事例
output {
    kafka {
        bootstrap_servers => "localhost:9092"
        topic_id => "test"
        compression_type => "gzip"
    }
}
(2)罕用配置参数(空 => 同上)
参数名称 类型 默认值 形容信息
bootstrap_servers string Kafka 集群信息,格局为 host1:port1,host2:port2
topic_id string 产生音讯的主题
compression_type String none 生产者生成的所有数据的压缩类型。默认为无(即无压缩)。有效值为 none、gzip、snappy 或 lz4。
batch_size number 16384 配置以字节为单位管制默认批处理大小
buffer_memory number 33554432(32MB) 生产者可用于缓冲期待发送到服务器的记录的总内存字节数
max_request_size number 1048576(1MB) 申请的最大大小
flush_interval number 2 flush 距离
gzip boolean false 是否启用 gzip 压缩
id
path string 必须设置项,文件输入门路,如:path =>”./test-%{+YYYY-MM-dd}.txt”
workers string 1 whenwe no longer support the :legacy type This is hacky, but it can only be herne
4.6 罕用编码插件(Codec plugin)
4.6.1 JSON 编码插件

间接输出预约义好的 JSON 数据,这样就能够省略掉 filter/grok 配置

  • 配置事例

    json {}
  • 罕用配置参数

    参数名称 类型 默认值 形容信息
    charset string “UTF-8” 字符集
    enable_metric
    id

五 Logstash 实例

5.1 接管 Filebeat 事件,输入到 Redis
input {
    beats {port => 5044}
}

output {
    redis {
        host => "127.0.0.1"
        port => 6379
        data_type => "list"
        key => "logstash-list"
    }
}
5.2 读取 Redis 数据,依据“type”判断,别离解决,输入到 ES
input {
    redis {
        host => "127.0.0.1"
        port => 6379
        data_type => "list"
        key => "logstash-list"
    }
}

filter {if [type] == "application" {
        grok {match => ["message", "(?m)-(?<systemName>.+?):(?<logTime>(?>\d\d){1,2}-(?:0?[1-9]|1[0-2])-(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) (?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]):(?:(?:[0-5][0-9]|60)(?:[:.,][0-9]+)?)) \[(?<level>(\b\w+\b)) *\] (?<thread>(\b\w+\b)) \((?<point>.*?)\) - (?<content>.*)"]
        }
        date {match => ["logTime", "yyyy-MM-dd HH:mm:ss,SSS"]
        }
        json {source => "message"}
        date {match => ["timestamp", "yyyy-MM-dd HH:mm:ss,SSS"]
        }
    }
    if [type] == "application_bizz" {
        json {source => "message"}
        date {match => ["timestamp", "yyyy-MM-dd HH:mm:ss,SSS"]
        }
    }
    mutate {remove_field => ["@version", "beat", "logTime"]
    }
}

output {stdout{}
    elasticsearch {hosts => ["127.0.0.1:9200"]
        index => "filebeat-%{type}-%{+yyyy.MM.dd}"
        document_type => "%{documentType}"
        template_overwrite => true
    }
}

六 利用场景

6.1 以 logstash 作为日志搜索器

架构:logstash 采集、解决、转发到 elasticsearch 存储,在 kibana 进行展现

特点:这种构造因为须要在各个服务器上部署 Logstash,而它比拟耗费 CPU 和内存资源,所以比拟适宜计算资源丰盛的服务器,否则容易造成服务器性能降落,甚至可能导致无奈失常工作。

6.2 音讯模式

音讯模式:Beats 还不反对输入到音讯队列(新版本除外:5.0 版本及以上),所以在音讯队列前后两端只能是 Logstash 实例。logstash 从各个数据源收集数据,不通过任何解决转换仅转收回到音讯队列(kafka、redis、rabbitMQ 等),后 logstash 从音讯队列取数据进行转换剖析过滤,输入到 elasticsearch,并在 kibana 进行图形化展现

架构(Logstash 进行日志解析所在服务器性能各方面必须要足够好):

模式特点:这种架构适宜于日志规模比拟宏大的状况。但因为 Logstash 日志解析节点和 Elasticsearch 的负荷比拟重,可将他们配置为集群模式,以分担负荷。引入音讯队列,平衡了网络传输,从而升高了网络灵通,尤其是失落数据的可能性,但仍然存在 Logstash 占用系统资源过多的问题

工作流程:Filebeat 采集—> logstash 转发到 kafka—> logstash 解决从 kafka 缓存的数据进行剖析—> 输入到 es—> 显示在 kibana

6.3 logstash(非 filebeat)进行文件采集,输入到 kafka 缓存,读取 kafka 数据并解决输入到文件或 es
6.4 logstash 同步 mysql 数据库数据到 es(logstash5 版本以上已集成 jdbc 插件,无需下载安装,间接应用)

七 Logstash 和 Flume 比照

首先从构造比照,咱们会惊人的发现,两者是如许的类似!Logstash 的 Shipper、Broker、Indexer 别离和 Flume 的 Source、Channel、Sink 各自对应!只不过是 Logstash 集成了,Broker 能够不须要,而 Flume 须要独自配置,且缺一不可,但这再一次阐明了计算机的设计思维都是通用的!只是实现形式会不同而已。

从程序员的角度来说,上文也提到过了,Flume 是真的很繁琐,你须要别离作 source、channel、sink 的手工配置,而且波及到简单的数据采集环境,你可能还要做多个配置,这在下面提过了,反过来说Logstash 的配置就十分简洁清晰,三个局部的属性都定义好了,程序员本人去抉择就行,就算没有,也能够自行开发插件,十分不便。当然了,Flume 的插件也很多,但 Channel 就只有内存和文件这两种(其实当初不止了,但罕用的也就两种)。读者能够看得出来,两者其实配置都是非常灵活的,只不过看场景取舍罢了。

其实从作者和历史背景来看,两者最后的设计目标就不太一样。Flume 自身最后设计的目标是为了把数据传入 HDFS 中(并不是为了采集日志而设计,这和 Logstash 有基本的区别),所以理所应当侧重于数据的传输,程序员要十分分明整个数据的路由,并且比 Logstash 还多了一个可靠性策略,上文中的 channel 就是用于长久化目标,数据除非确认传输到下一地位了,否则不会删除,这一步是通过事务来管制的,这样的设计使得可靠性十分好。相同,Logstash 则显著偏重对数据的预处理,因为日志的字段须要大量的预处理,为解析做铺垫。

回过来看我当初为什么先讲 Logstash 而后讲 Flume?这外面有几个思考

  • 其一:Logstash 其实更有点像通用的模型,所以对新人来说了解起来更简略,而 Flume 这样轻量级的线程,可能有肯定的计算机编程根底了解起来更好;
  • 其二:目前大部分的状况下,Logstash 用的更加多,这个数据我本人没有统计过,然而依据教训判断,Logstash 能够和 ELK 其余组件配合应用,开发、利用都会简略很多,技术成熟,应用场景宽泛。相同 Flume 组件就须要和其余很多工具配合应用,场景的针对性会比拟强,更不必提 Flume 的配置过于繁琐简单了。

最初总结下来,咱们能够这么了解他们的区别:

Logstash 就像是买来的台式机,主板、电源、硬盘,机箱(Logstash)把外面的货色全副装好了,你能够间接用,当然也能够本人组装批改;

Flume 就像提供给你一套残缺的主板,电源、硬盘,Flume 没有打包,只是像说明书一样领导你如何组装,能力运行的起来。

参考文档:

  • [1] 迷途的攻城狮.CSDN: https://blog.csdn.net/chenlei… , 2017-06-22.
  • [2] Logstash 官网: https://www.elastic.co/cn/log…
退出移动版