现有的需要是:从现有系统日志中采集局部数据,写入到MySQL中不便查问。

应用的版本为:Logstash-7.10.0,MySQL-5.7.22,jdk1.8

次要阐明三个局部:

  • 数据起源:微服务生成的所有日志文件,为了不便跨服务器收集多份日志,引入另外一个工具 Filebeat(不必也能够)。
  • 数据筛选:特定格局的日志内容或带有某些标识的日志内容,规定日志内容以及日志格局,不便logstash filter做过滤等操作。
  • 写入数据:将logstash中解决的数据写入MySQL中,假如咱们的表构造如下:
CREATE TABLE `logstash_log_temp` (  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,  `serve` char(16) NOT NULL COMMENT '服务名',  `level` char(8) NOT NULL COMMENT '日志等级',  `pid` int(8) NOT NULL COMMENT '过程号',  `ip` int(16) NOT NULL COMMENT 'ip地址',  `content` varchar(512) NOT NULL COMMENT '内容',  `logdate` datetime NOT NULL COMMENT '工夫',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;

第一步:装置

官网下载后将Logstash-7.10.0.tar.gz放到须要的服务器上,解压:
tar -zxvf Logstash-7.10.0.tar.gz

进入到logstash-7.10.0/config,复制一个配置文件,并改名:
cp logstash-simple.conf logstash-mysql.conf

第二步:增加配置

第一个配置是jvm.optios,是用来增加jvm参数,这里不做过多阐明。
第二个是logstash.yml文件,基本上不须要改变。
第三个logstash-mysql的配置是收集数据和写入数据的重点。
这个配置分为三局部

1. 数据起源:

logstash本人能够读取日志文件,这里咱们通过Filebeat来收集日志数据。上面贴出局部logstash读取日志的配置

input {  file {    # 多个日志能够写成数组模式 path => ['a.log','b.log']    path => '/data/dev/logs/monitor.log'    # 从日志最初开始读取数据    start_position => 'end'    # 设置新的行分隔符,默认为“ \ n”。    #delimiter => "\n"    # 对多久之前的文件批改做疏忽,单位秒    ignore_older => 0    # 设置最多关上文件量,此值没有默认值,然而存在一个外部限度4095    #max_open_files => 4095  }}
2.数据处理

数据处理是从日志数据中找到咱们须要的数据,并作一些不便咱们解决的操作,次要是在filter模块中做解决,其中还有一些非凡的解决模式

1.其中 filter grok match 的匹配要严格对应日志格局,能够依据logstash现有提供的匹配正则来匹配日志格局,这里提供一个在线测试地址,成果如下:

上面是服务中logback的局部配置以及打印出的日志:

<property name="CONSOLE_LOG_PATTERN"    value="%clr(MONITOR) ${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr([%ip]) %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>

注:其中退出了MONITOR%ip,其中MONITOR示意以后服务编码,此处可替换为本人服务的编码或名称(%ip自定义日志字段可参考文章)。

MONITOR 2021-03-01 12:51:00.000  INFO 20147 [172.16.26.62] --- [   scheduling-1] c.jason.monitor.schedule.ScheduledTask   : &MONITOR&={"modularName":"日志监控列表-info","operationType":"R","operationLogin":"1","operationUserName":"admin","operationRoleName":"管理员","tenantId":"tenant_system","operationResult":"1","logInfo":"OK"}

通过match后的数据可依据需要删掉一些不须要的字段:

    ### 以下去掉多个字段,    remove_field => "beat.version"    remove_field => "beat.name"    ### 也能够应用数组模式,两种模式可同时存在,以下这些字段仅为缩小logstash日志量    remove_field => ["cloud","input","log","agent","host","ecs"]

:若存在多种日志格局,则配置多个match来匹配对应的日志;须要多少就写多少,当上一个match不匹配后会顺次用上面的match做解决,如果都不合乎,则这条数据会被抛弃。

match => ["message", "\[%{GREEDYDATA:serve}\]\[%{TIMESTAMP_ISO8601:logdate}\]\[%{LOGLEVEL:level}%{SPACE}\] %{GREEDYDATA:content}"]match => ["message", "%{TIMESTAMP_ISO8601:logdate} %{SPACE}%{LOGLEVEL} %{POSINT} --- \[%{GREEDYDATA:thread}\] %{GREEDYDATA:path} : %{GREEDYDATA:content}"]

2.依据咱们本人的需要,对数据做一些解决,咱们排除掉DEBUG以及没有&MONITOR&=非凡标识的日志后,将服务名大写,将日志中的非凡字段做去除等操作后,就会失去日志内容中的json字符串,而后咱们再将这部分数据做json转化,不便后续解决。最初再将咱们日志中的工夫做格式化。

filter {  grok {    match => {"message" => "%{WORD:serve} %{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:level} %{POSINT} \[%{IP:ip}\] --- \[%{GREEDYDATA}\] %{GREEDYDATA} : %{GREEDYDATA:content}"}    ### 去掉 TIMESTAMP_ISO8601, GREEDYDATA 解析进去的多余字段    remove_field => ["YEAR","MONTHNUM","MONTHDAY","HOUR","MINUTE","SECOND","SPACE","IPV4","IPV6","GREEDYDATA"]  }  ### DEBUG 级别的日志过滤掉  if [level] == "DEBUG" {    drop {}  }  ### content 中不蕴含这个字段的日志过滤掉  if "&MONITOR&=" not in [content] {    drop {}  }  mutate {    ### 将 serve 中的值转化为大写    uppercase => ["serve"]    ### 将 pid 转化为 String 类型    convert => {         "pid" => "string"    }    ### 去掉 content 前后空格    strip => ["content"]    ### 将content中的&MONITOR&=替换为“”    gsub => ["content", "&MONITOR&=", ""]  }  ### 将上一步中的 content 字段做 json 解析  json {    source => "content"    ### 解析成json后将 IP 字段去掉    remove_field => ["IP"]  }  date {    ### 将工夫格式化    match => ["logdate", "yyyy-MM-dd HH:mm:ss.SSS Z"]    timezone => "Asia/Shanghai"    target => "logdate"  }}
3.写入MySQL

须要配置MySQL的信息,此处配置了一个库,如果有多个库,则配置多个jdbc模块。

output {  jdbc {    ### mysql driver path    driver_jar_path => "/data/dev/server/lib/mysql-connector-java-5.1.46.jar"    driver_class => "com.mysql.jdbc.Driver"    ### connection    connection_string => "jdbc:mysql://localhost:3306/monitor?useUnicode=true&characterEncoding=utf8&useSSL=false"    username => "root"    password => "clover"    ### sql    statement => ["INSERT INTO logstash_log_temp(serve, level, pid, ip, content, logdate) VALUES(?, ?, ? ,INET_ATON(?), ? ,?)", "[serve]", "[level]", "[pid]", "[ip]", "[content]", "[logdate]"]  }}

**注:**写入到数据库须要logstash-out-jdbc插件;但这个插件不在默认的插件列表中,所以咱们须要手动的装置一下。

确保有网络的前提下,执行命令:
./logstash-plugin install logstash-output-jdbc

如果服务器没有网络环境,则能够在任何一台有网络,有java环境的电脑上创立一个插件,这里以windows为例:

  • 首先下载一个windows版本的Logstash-7.10.0
  • bin目录下执行命令 logstash-plugin install logstash-output-jdbc
  • 执行 logstash-plugin list,列表中有logstash-out-jdbc则装置胜利
  • 将插件剥离打包命令 logtash-plugin prepare-offline-pack --overwrite --output logstash-output-jdbc.zip logstash-output-jdbc
  • 将生成的zip放到服务器上,并在服务器bin目录下执行./logstash-plugin install file:///data/dev/software/logstash-output-jdbc.zip
  • 执行logstash-plugin list查看装置状况。

第三步:启动

配置完所有步骤后,启动logstash
./logstash -f ../config/logstash-mysql.conf
查看数据库中数据

    • -

贴出残缺logstash-mysql.conf内容如下:

input {  beats {    port => 5044  }}filter {  grok {    match => {"message" => "%{WORD:serve} %{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:level} %{POSINT} \[%{IP:ip}\] --- \[%{GREEDYDATA}\] %{GREEDYDATA} : %{GREEDYDATA:content}"}    remove_field => ["cloud","input","log","agent","host","ecs"]    ### 去掉 TIMESTAMP_ISO8601, GREEDYDATA 解析进去的多余字段    remove_field => ["YEAR","MONTHNUM","MONTHDAY","HOUR","MINUTE","SECOND","SPACE","IPV4","IPV6","GREEDYDATA"]  }  ### DEBUG 级别的日志过滤掉  if [level] == "DEBUG" {    drop {}  }  ### content 中不蕴含这个字段的日志过滤掉  if "&MONITOR&=" not in [content] {    drop {}  }  mutate {    ### 将 serve 中的值转化为大写    uppercase => ["serve"]    ### 将 pid 转化为 String 类型    #convert => {    #     "pid" => "string"    #}    ### 将content中的&MONITOR&=替换为空    gsub => ["content", "&MONITOR&=", ""]    ### 去掉某个字段的前后空格,比方content    #strip => ["content"]  }  ### 将上一步中的 content 字段做 json 解析  json {    source => "content"  }  date {    ### 将工夫格式化    match => ["logdate", "yyyy-MM-dd HH:mm:ss.SSS Z"]    timezone => "Asia/Shanghai"    target => "logdate"  }}output {  ### 多个库则声明多个jdbc  jdbc {    ### mysql driver path    driver_jar_path => "/data/dev/server/lib/mysql-connector-java-5.1.46.jar"    driver_class => "com.mysql.jdbc.Driver"    ### connection    connection_string => "jdbc:mysql://localhsot:3306/monitor?useUnicode=true&characterEncoding=utf8&useSSL=false"    username => "root"    password => "clover"    ### sql ip地址用INET_NTOA(`host_ip`)来获取    statement => ["INSERT INTO logstash_log_temp(serve, level, pid, ip, content, logdate) VALUES(?, ?, ? ,INET_ATON(?), ? ,?)", "[serve]", "[level]", "[pid]", "[ip]", "[content]", "[logdate]"]    ]  }  ### logstash数据转化成json格局写入日志中  stdout {    codec => json_lines  }}