现有的需要是:从现有系统日志中采集局部数据,写入到 MySQL 中不便查问。
应用的版本为:Logstash-7.10.0,MySQL-5.7.22,jdk1.8
次要阐明三个局部:
- 数据起源:微服务生成的所有日志文件,为了不便跨服务器收集多份日志,引入另外一个工具
Filebeat
(不必也能够)。 - 数据筛选:特定格局的日志内容或带有某些标识的日志内容,规定日志内容以及日志格局,不便
logstash filter
做过滤等操作。 - 写入数据:将 logstash 中解决的数据写入 MySQL 中,假如咱们的表构造如下:
CREATE TABLE `logstash_log_temp` (`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`serve` char(16) NOT NULL COMMENT '服务名',
`level` char(8) NOT NULL COMMENT '日志等级',
`pid` int(8) NOT NULL COMMENT '过程号',
`ip` int(16) NOT NULL COMMENT 'ip 地址',
`content` varchar(512) NOT NULL COMMENT '内容',
`logdate` datetime NOT NULL COMMENT '工夫',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
第一步:装置
官网下载后将 Logstash-7.10.0.tar.gz 放到须要的服务器上,解压:tar -zxvf Logstash-7.10.0.tar.gz
进入到 logstash-7.10.0/config,复制一个配置文件,并改名:cp logstash-simple.conf logstash-mysql.conf
第二步:增加配置
第一个配置是 jvm.optios, 是用来增加 jvm 参数,这里不做过多阐明。
第二个是 logstash.yml 文件,基本上不须要改变。
第三个 logstash-mysql 的配置是收集数据和写入数据的重点。
这个配置分为三局部
1. 数据起源:
logstash 本人能够读取日志文件,这里咱们通过 Filebeat
来收集日志数据。上面贴出局部 logstash 读取日志的配置
input {
file {# 多个日志能够写成数组模式 path => ['a.log','b.log']
path => '/data/dev/logs/monitor.log'
# 从日志最初开始读取数据
start_position => 'end'
# 设置新的行分隔符,默认为“\ n”。#delimiter => "\n"
# 对多久之前的文件批改做疏忽,单位秒
ignore_older => 0
# 设置最多关上文件量,此值没有默认值,然而存在一个外部限度 4095
#max_open_files => 4095
}
}
2. 数据处理
数据处理是从日志数据中找到咱们须要的数据,并作一些不便咱们解决的操作,次要是在 filter
模块中做解决,其中还有一些非凡的解决模式
1. 其中 filter grok match 的匹配要严格对应日志格局,能够依据 logstash 现有提供的匹配正则来匹配日志格局,这里提供一个在线测试地址,成果如下:
上面是服务中 logback 的局部配置以及打印出的日志:
<property name="CONSOLE_LOG_PATTERN"
value="%clr(MONITOR) ${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:-}){magenta} %clr([%ip]) %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>
注:其中退出了 MONITOR
和%ip
,其中 MONITOR
示意以后服务编码,此处可替换为本人服务的编码或名称(%ip
自定义日志字段可参考文章)。
MONITOR 2021-03-01 12:51:00.000 INFO 20147 [172.16.26.62] --- [scheduling-1] c.jason.monitor.schedule.ScheduledTask : &MONITOR&={"modularName":"日志监控列表 -info","operationType":"R","operationLogin":"1","operationUserName":"admin","operationRoleName":"管理员","tenantId":"tenant_system","operationResult":"1","logInfo":"OK"}
通过 match 后的数据可依据需要删掉一些不须要的字段:
### 以下去掉多个字段,remove_field => "beat.version"
remove_field => "beat.name"
### 也能够应用数组模式,两种模式可同时存在,以下这些字段仅为缩小 logstash 日志量
remove_field => ["cloud","input","log","agent","host","ecs"]
注:若存在多种日志格局,则配置多个 match 来匹配对应的日志;须要多少就写多少,当上一个 match 不匹配后会顺次用上面的 match 做解决,如果都不合乎,则这条数据会被抛弃。
match => ["message", "\[%{GREEDYDATA:serve}\]\[%{TIMESTAMP_ISO8601:logdate}\]\[%{LOGLEVEL:level}%{SPACE}\] %{GREEDYDATA:content}"]
match => ["message", "%{TIMESTAMP_ISO8601:logdate} %{SPACE}%{LOGLEVEL} %{POSINT} --- \[%{GREEDYDATA:thread}\] %{GREEDYDATA:path} : %{GREEDYDATA:content}"]
2. 依据咱们本人的需要,对数据做一些解决,咱们排除掉 DEBUG
以及没有 &MONITOR&=
非凡标识的日志后,将服务名大写,将日志中的非凡字段做去除等操作后,就会失去日志内容中的 json 字符串,而后咱们再将这部分数据做 json 转化,不便后续解决。最初再将咱们日志中的工夫做格式化。
filter {
grok {match => {"message" => "%{WORD:serve} %{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:level} %{POSINT} \[%{IP:ip}\] --- \[%{GREEDYDATA}\] %{GREEDYDATA} : %{GREEDYDATA:content}"}
### 去掉 TIMESTAMP_ISO8601,GREEDYDATA 解析进去的多余字段
remove_field => ["YEAR","MONTHNUM","MONTHDAY","HOUR","MINUTE","SECOND","SPACE","IPV4","IPV6","GREEDYDATA"]
}
### DEBUG 级别的日志过滤掉
if [level] == "DEBUG" {drop {}
}
### content 中不蕴含这个字段的日志过滤掉
if "&MONITOR&=" not in [content] {drop {}
}
mutate {
### 将 serve 中的值转化为大写
uppercase => ["serve"]
### 将 pid 转化为 String 类型
convert => {"pid" => "string"}
### 去掉 content 前后空格
strip => ["content"]
### 将 content 中的 &MONITOR&= 替换为“”gsub => ["content", "&MONITOR&=", ""]
}
### 将上一步中的 content 字段做 json 解析
json {
source => "content"
### 解析成 json 后将 IP 字段去掉
remove_field => ["IP"]
}
date {
### 将工夫格式化
match => ["logdate", "yyyy-MM-dd HH:mm:ss.SSS Z"]
timezone => "Asia/Shanghai"
target => "logdate"
}
}
3. 写入 MySQL
须要配置 MySQL 的信息,此处配置了一个库,如果有多个库,则配置多个 jdbc 模块。
output {
jdbc {
### mysql driver path
driver_jar_path => "/data/dev/server/lib/mysql-connector-java-5.1.46.jar"
driver_class => "com.mysql.jdbc.Driver"
### connection
connection_string => "jdbc:mysql://localhost:3306/monitor?useUnicode=true&characterEncoding=utf8&useSSL=false"
username => "root"
password => "clover"
### sql
statement => ["INSERT INTO logstash_log_temp(serve, level, pid, ip, content, logdate) VALUES(?, ?, ? ,INET_ATON(?), ? ,?)", "[serve]", "[level]", "[pid]", "[ip]", "[content]", "[logdate]"]
}
}
** 注:** 写入到数据库须要 logstash-out-jdbc
插件;但这个插件不在默认的插件列表中,所以咱们须要手动的装置一下。
确保有网络的前提下,执行命令:./logstash-plugin install logstash-output-jdbc
如果服务器没有网络环境,则能够在任何一台有网络,有 java 环境的电脑上创立一个插件,这里以 windows 为例:
- 首先下载一个 windows 版本的 Logstash-7.10.0
- bin 目录下执行命令
logstash-plugin install logstash-output-jdbc
- 执行
logstash-plugin list
,列表中有logstash-out-jdbc
则装置胜利 - 将插件剥离打包命令
logtash-plugin prepare-offline-pack --overwrite --output logstash-output-jdbc.zip logstash-output-jdbc
- 将生成的 zip 放到服务器上,并在服务器 bin 目录下执行
./logstash-plugin install file:///data/dev/software/logstash-output-jdbc.zip
- 执行
logstash-plugin list
查看装置状况。
第三步:启动
配置完所有步骤后,启动 logstash./logstash -f ../config/logstash-mysql.conf
查看数据库中数据
-
- –
贴出残缺 logstash-mysql.conf 内容如下:
input {
beats {port => 5044}
}
filter {
grok {match => {"message" => "%{WORD:serve} %{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:level} %{POSINT} \[%{IP:ip}\] --- \[%{GREEDYDATA}\] %{GREEDYDATA} : %{GREEDYDATA:content}"}
remove_field => ["cloud","input","log","agent","host","ecs"]
### 去掉 TIMESTAMP_ISO8601,GREEDYDATA 解析进去的多余字段
remove_field => ["YEAR","MONTHNUM","MONTHDAY","HOUR","MINUTE","SECOND","SPACE","IPV4","IPV6","GREEDYDATA"]
}
### DEBUG 级别的日志过滤掉
if [level] == "DEBUG" {drop {}
}
### content 中不蕴含这个字段的日志过滤掉
if "&MONITOR&=" not in [content] {drop {}
}
mutate {
### 将 serve 中的值转化为大写
uppercase => ["serve"]
### 将 pid 转化为 String 类型
#convert => {
# "pid" => "string"
#}
### 将 content 中的 &MONITOR&= 替换为空
gsub => ["content", "&MONITOR&=", ""]
### 去掉某个字段的前后空格,比方 content
#strip => ["content"]
}
### 将上一步中的 content 字段做 json 解析
json {source => "content"}
date {
### 将工夫格式化
match => ["logdate", "yyyy-MM-dd HH:mm:ss.SSS Z"]
timezone => "Asia/Shanghai"
target => "logdate"
}
}
output {
### 多个库则声明多个 jdbc
jdbc {
### mysql driver path
driver_jar_path => "/data/dev/server/lib/mysql-connector-java-5.1.46.jar"
driver_class => "com.mysql.jdbc.Driver"
### connection
connection_string => "jdbc:mysql://localhsot:3306/monitor?useUnicode=true&characterEncoding=utf8&useSSL=false"
username => "root"
password => "clover"
### sql ip 地址用 INET_NTOA(`host_ip`)来获取
statement => ["INSERT INTO logstash_log_temp(serve, level, pid, ip, content, logdate) VALUES(?, ?, ? ,INET_ATON(?), ? ,?)", "[serve]", "[level]", "[pid]", "[ip]", "[content]", "[logdate]"]
]
}
### logstash 数据转化成 json 格局写入日志中
stdout {codec => json_lines}
}