共计 6863 个字符,预计需要花费 18 分钟才能阅读完成。
Logstash 入门
Logstash 是什么
Logstash 就是一个开源的数据流工具,它会做三件事:
- 从数据源拉取数据
- 对数据进行过滤、转换等解决
- 将解决后的数据写入指标地
例如:
- 监听某个目录下的日志文件,读取文件内容,解决数据,写入 influxdb。
- 从 kafka 中生产音讯,解决数据,写入 elasticsearch。
为什么要用 Logstash?
不便省事。
假如你须要从 kafka 中生产数据,而后写入 elasticsearch,如果本人编码,你得去对接 kafka 和 elasticsearch 的 API 吧,如果你用 Logstash,这部分就不必本人去实现了,因为 Logstash 曾经为你封装了对应的 plugin
插件,你只须要写一个配置文件形如:
input {
kafka {# kafka consumer 配置}
}
filter {# 数据处理配置}
output {
elasticsearch {# elasticsearch 输入配置}
}
而后运行 logstash 就能够了。
Logstash 提供了两百多个封装好的 plugin
插件,这些插件被分为三类:
input plugin
: 从哪里拉取数据filter plugin
: 数据如何解决output plugin
: 数据写入何处
应用 logstash 你只有编写一个配置文件,在配置文件中筛选组合这些 plugin
插件,就能够轻松实现数据从输出源到输入源的实时流动。
装置 logstash
请参数:官网文档
第一个示例
假如你曾经装置好了 logstash,并且可执行文件的门路曾经退出到了 PATH 环境变量中。
上面开始咱们的第一个示例,编写 pipeline.conf
文件,内容为:
input {stdin {}
}
filter {
}
output {stdout {}
}
这个配置文件的含意是:
input
输出为stdin
(规范输出)filter
为空(也就是不进行数据的解决)output
输入为stdout
(规范输入)
执行命令:
logstash -f pipeline.conf
期待 logstash 启动结束,输出 hello world 而后回车, 你就会看到以下输入内容:
{
"message" => "hello world",
"@version" => "1",
"@timestamp" => 2020-11-01T08:25:10.987Z,
"host" => "local"
}
咱们输出的内容曾经存在于 message
字段中了。
当你输出其余内容后也会看到相似的输入。
至此,咱们的第一个示例曾经实现,正如配置文件中所定义的,Logstash 从 stdin 规范输出读取数据,不对源数据做任何解决,而后输入到 stdout 规范输入。
特定名词和字段
event
: 数据在 logstash 中被包装成event
事件的模式从 input 到 filter 再到 output 流转。@timestamp
: 非凡字段,标记 event 产生的工夫。@version
: 非凡字段,标记 event 的版本号。message
: 源数据内容。@metadata
: 元数据,key/value 的模式,是否有数据得看具体插件,例如 kafka 的 input 插件会在@metadata
里记录 topic、consumer_group、partition、offset 等一些元数据。tags
: 记录 tag 的字符串数组。
字段援用
在配置文件中,能够通过 [field]
的模式援用字段内容,如果在字符串中,则能够通过 %{[field]}
的形式进行援用。
示例:
input {
kafka {# kafka 配置}
}
filter {
# 援用 log_level 字段的内容进行判断
if [log_level] == "debug" {}}
output {
elasticsearch {# %{+yyyy.MM.dd} 来源于 @timestamp
index => "log-%{+yyyy.MM.dd}"
document_type => "_doc"
document_id => "%{[@metadata][kafka][key]}"
hosts => ["127.0.0.1:9200"]
}
}
Plugin 插件一览
用好 Logstash 的第一步就是相熟 plugin 插件,只有相熟了这些插件你能力疾速高效的建设数据管道。
Input plugin
Input
插件定义了数据源,即 logstash 从哪里拉取数据。
beats
: 从 Elastic Beats 框架中接收数据。
示例:
input {
beats {port => 5044}
}
dead_letter_queue
: 从 Logstash 本人的 dead letter queue 中拉取数据,目前 dead letter queue 只反对记录 output 为 elasticsearch 时写入 400 或 404 的数据。
示例:
input {
dead_letter_queue {
path => "/var/logstash/data/dead_letter_queue"
start_timestamp => "2017-04-04T23:40:37"
}
}
elasticsearch
: 从 elasticsearch 中读取 search query 的后果。
示例:
input {
elasticsearch {
hosts => "localhost"
query => '{"query": {"match": {"statuscode": 200} } }'
}
}
exec
: 定期执行一个 shell 命令,而后捕捉其输入。
示例:
input {
exec {
command => "ls"
interval => 30
}
}
file
: 从文件中流式读取内容。
示例:
input {
file {path => ["/var/log/*.log", "/var/log/message"]
start_position => "beginning"
}
}
generator
: 生成随机数据。
示例:
input {
generator {
count => 3
lines => [
"line 1",
"line 2",
"line 3"
]
}
}
github
: 从 github webhooks 中读取数据。graphite
: 承受 graphite 的 metrics 指标数据。heartbeat
: 生成心跳信息。这样做的个别目标是测试 Logstash 的性能和可用性。http
: Logstash 承受 http 申请作为数据。http_poller
: Logstash 发动 http 申请,读取响应数据。
示例:
input {
http_poller {
urls => {
test1 => "http://localhost:9200"
test2 => {
method => get
user => "AzureDiamond"
password => "hunter2"
url => "http://localhost:9200/_cluster/health"
headers => {Accept => "application/json"}
}
}
request_timeout => 60
schedule => {cron => "* * * * * UTC"}
codec => "json"
metadata_target => "http_poller_metadata"
}
}
imap
: 从 IMAP 服务器读取邮件。jdbc
: 通过 JDBC 接口导入数据库中的数据。
示例:
input {
jdbc {
jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
jdbc_user => "mysql"
parameters => {"favorite_artist" => "Beethoven"}
schedule => "* * * * *"
statement => "SELECT * from songs where artist = :favorite_artist"
}
}
kafka
: 生产 kafka 中的音讯。
示例:
input {
kafka {
bootstrap_servers => "127.0.0.1:9092"
group_id => "consumer_group"
topics => ["kafka_topic"]
enable_auto_commit => true
auto_commit_interval_ms => 5000
auto_offset_reset => "latest"
decorate_events => true
isolation_level => "read_uncommitted"
max_poll_records => 1000
}
}
rabbitmq
: 从 RabbitMQ 队列中拉取数据。redis
: 从 redis 中读取数据。stdin
: 从规范输出读取数据。syslog
: 读取 syslog 数据。tcp
: 通过 TCP socket 读取数据。udp
: 通过 udp 读取数据。unix
: 通过 UNIX socket 读取数据。websocket
: 通过 websocket 协定 读取数据。
Output plugin
Output
插件定义了数据的输出地,即 logstash 将数据写入何处。
csv
: 将数据写入 csv 文件。elasticsearch
: 写入 Elasticsearch。email
: 发送 email 邮件。exec
: 执行命令。file
: 写入磁盘文件。graphite
: 写入 Graphite。http
: 发送 http 申请。influxdb
: 写入 InfluxDB。kafka
: 写入 Kafka。mongodb
: 写入 MongoDB。opentsdb
: 写入 OpenTSDB。rabbitmq
: 写入 RabbitMQ。redis
: 应用 RPUSH 的形式写入到 Redis 队列。sink
: 将数据抛弃,不写入任何中央。syslog
: 将数据发送到 syslog 服务端。tcp
: 发送 TCP socket。udp
: 发送 UDP。webhdfs
: 通过 webhdfs REST API 写入 HDFS。websocket
: 推送 websocket 音讯。
Filter plugin
Filter
插件定义对数据进行如何解决。
aggregate
: 聚合数据。alter
: 批改数据。bytes
: 将存储大小如 “123 MB” 或 “5.6gb” 的字符串示意模式解析为以字节为单位的数值。cidr
: 查看 IP 地址是否在指定范畴内。
示例:
filter {
cidr {add_tag => [ "testnet"]
address => ["%{src_ip}", "%{dst_ip}" ]
network => ["192.0.2.0/24"]
}
}
cipher
: 对数据进行加密或解密。clone
: 复制 event 事件。csv
: 解析 CSV 格局的数据。date
: 解析字段中的日期数据。
示例,匹配输出的 timestamp 字段,而后替换 @timestamp:
filter {
date {match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss ZZ"]
target => "@timestamp"
}
}
dissect
: 应用%{}
的模式拆分字符串并提取出特定内容,比拟罕用,具体语法见 dissect 文档。drop
: 抛弃这个 event。
示例:
filter {if [loglevel] == "debug" {drop {}
}
}
elapsed
: 通过记录开始和完结工夫跟踪 event 的耗时。elasticsearch
: 在 elasticsearch 中进行搜寻,并将数据复制到以后 event 中。environment
: 将环境变量中的数据存储到 @metadata 字段中。extractnumbers
: 提取字符串中找到的所有数字。fingerprint
: 依据一个或多个字段的内容创立哈希值,并存储到新的字段中。geoip
: 应用绑定的 GeoLite2 数据库增加无关 IP 地址的地理位置的信息,这个插件十分有用,你能够依据 IP 地址失去对应的国家、省份、城市、经纬度等地理位置数据。
示例,通过 clent_ip 字段获取对应的地理位置信息:
filter {
geoip {
cache_size => 1000
default_database_type => "City"
source => "clent_ip"
target => "geo"
tag_on_failure => ["_geoip_city_fail"]
add_field => {"geo_country_name" => "%{[geo][country_name]}"
"geo_region_name" => "%{[geo][region_name]}"
"geo_city_name" => "%{[geo][city_name]}"
"geo_location" => "%{[geo][latitude]},%{[geo][longitude]}"
}
remove_field => ["geo"]
}
}
grok
: 通过正则表达式去解决字符串,比拟罕用,具体语法见 grok 文档。http
: 与内部 web services/REST APIs 集成。i18n
: 从字段中删除特殊字符。java_uuid
: 生成 UUID。jdbc_static
: 从近程数据库中读取数据,而后丰盛 event。jdbc_streaming
: 执行 SQL 查问而后将后果存储到指定字段。json
: 解析 json 字符串,生成 field 和 value。
示例:
filter {
json {
skip_on_invalid_json => true
source => "message"
}
}
如果输出的 message 字段是 json 字符串如 "{"a": 1,"b": 2}"
, 那么解析后就会减少两个字段,字段名别离是 a 和 b。
kv
: 解析 key=value 模式的数据。memcached
: 与内部 memcached 集成。metrics
: logstash 在内存中去聚合指标数据。mutate
: 对字段进行一些惯例更改。
示例:
filter {
mutate {split => ["hostname", "."]
add_field => {"shortHostname" => "%{hostname[0]}" }
}
mutate {rename => ["shortHostname", "hostname"]
}
}
prune
: 通过黑白名单的形式删除多余的字段。
示例:
filter {
prune {blacklist_names => [ "method", "(referrer|status)", "${some}_field" ]
}
}
ruby
: 执行 ruby 代码。
示例,解析 http://example.com/abc?q=haha
模式字符串中的 query 参数 q 的值:
filter {
ruby {code => "require'cgi'req = event.get('request_uri').split('?')
query = ''
if req.length > 1
query = req[1]
qh = CGI::parse(query)
event.set('search_q', qh['q'][0])
end
"
}
}
在 ruby 代码中,字段的获取和设置通过 event.get()
和 event.set()
办法进行操作。
sleep
: 休眠指定工夫。split
: 拆分字段。throttle
: 限流,限度 event 数量。translate
: 依据指定的字典文件将数据进行对应转换。
示例:
filter {
translate {field => "[http_status]"
destination => "[http_status_description]"
dictionary => {
"100" => "Continue"
"101" => "Switching Protocols"
"200" => "OK"
"500" => "Server Error"
}
fallback => "I'm a teapot"
}
}
truncate
: 将字段内容超出长度的局部裁剪掉。urldecode
: 对 urlencoded 的内容进行解码。useragent
: 解析 user-agent 的内容失去诸如设施、操作系统、版本等信息。
示例:
filter {
# ua_device : 设施
# ua_name : 浏览器
# ua_os : 操作系统
useragent {
lru_cache_size => 1000
source => "user_agent"
target => "ua"
add_field => {"ua_device" => "%{[ua][device]}"
"ua_name" => "%{[ua][name]}"
"ua_os" => "%{[ua][os_name]}"
}
remove_field => ["ua"]
}
}
uuid
: 生成 UUID。xml
: 解析 XML 格局的数据。
结语
Logstash 的插件除了本文提到的这些之外还有很多,想要具体的理解每个插件如何应用还是要去查阅官网文档。
得益于 Logstash 的插件体系,你只须要编写一个配置文件,申明应用哪些插件,就能够很轻松的构建数据管道。