关于java:Logstash收集日志数据到MySQL

75次阅读

共计 6090 个字符,预计需要花费 16 分钟才能阅读完成。

现有的需要是:从现有系统日志中采集局部数据,写入到 MySQL 中不便查问。

应用的版本为:Logstash-7.10.0,MySQL-5.7.22,jdk1.8

次要阐明三个局部:

  • 数据起源:微服务生成的所有日志文件,为了不便跨服务器收集多份日志,引入另外一个工具 Filebeat(不必也能够)。
  • 数据筛选:特定格局的日志内容或带有某些标识的日志内容,规定日志内容以及日志格局,不便 logstash filter 做过滤等操作。
  • 写入数据:将 logstash 中解决的数据写入 MySQL 中,假如咱们的表构造如下:
CREATE TABLE `logstash_log_temp` (`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `serve` char(16) NOT NULL COMMENT '服务名',
  `level` char(8) NOT NULL COMMENT '日志等级',
  `pid` int(8) NOT NULL COMMENT '过程号',
  `ip` int(16) NOT NULL COMMENT 'ip 地址',
  `content` varchar(512) NOT NULL COMMENT '内容',
  `logdate` datetime NOT NULL COMMENT '工夫',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;

第一步:装置

官网下载后将 Logstash-7.10.0.tar.gz 放到须要的服务器上,解压:
tar -zxvf Logstash-7.10.0.tar.gz

进入到 logstash-7.10.0/config,复制一个配置文件,并改名:
cp logstash-simple.conf logstash-mysql.conf

第二步:增加配置

第一个配置是 jvm.optios, 是用来增加 jvm 参数,这里不做过多阐明。
第二个是 logstash.yml 文件,基本上不须要改变。
第三个 logstash-mysql 的配置是收集数据和写入数据的重点。
这个配置分为三局部

1. 数据起源:

logstash 本人能够读取日志文件,这里咱们通过 Filebeat 来收集日志数据。上面贴出局部 logstash 读取日志的配置

input {
  file {# 多个日志能够写成数组模式 path => ['a.log','b.log']
    path => '/data/dev/logs/monitor.log'
    # 从日志最初开始读取数据
    start_position => 'end'
    # 设置新的行分隔符,默认为“\ n”。#delimiter => "\n"
    # 对多久之前的文件批改做疏忽,单位秒
    ignore_older => 0
    # 设置最多关上文件量,此值没有默认值,然而存在一个外部限度 4095
    #max_open_files => 4095
  }
}
2. 数据处理

数据处理是从日志数据中找到咱们须要的数据,并作一些不便咱们解决的操作,次要是在 filter 模块中做解决,其中还有一些非凡的解决模式

1. 其中 filter grok match 的匹配要严格对应日志格局,能够依据 logstash 现有提供的匹配正则来匹配日志格局,这里提供一个在线测试地址,成果如下:

上面是服务中 logback 的局部配置以及打印出的日志:

<property name="CONSOLE_LOG_PATTERN"
    value="%clr(MONITOR) ${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:-}){magenta} %clr([%ip]) %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>

注:其中退出了 MONITOR%ip,其中 MONITOR 示意以后服务编码,此处可替换为本人服务的编码或名称(%ip自定义日志字段可参考文章)。

MONITOR 2021-03-01 12:51:00.000  INFO 20147 [172.16.26.62] --- [scheduling-1] c.jason.monitor.schedule.ScheduledTask   : &MONITOR&={"modularName":"日志监控列表 -info","operationType":"R","operationLogin":"1","operationUserName":"admin","operationRoleName":"管理员","tenantId":"tenant_system","operationResult":"1","logInfo":"OK"}

通过 match 后的数据可依据需要删掉一些不须要的字段:

    ### 以下去掉多个字段,remove_field => "beat.version"
    remove_field => "beat.name"
    ### 也能够应用数组模式,两种模式可同时存在,以下这些字段仅为缩小 logstash 日志量
    remove_field => ["cloud","input","log","agent","host","ecs"]

:若存在多种日志格局,则配置多个 match 来匹配对应的日志;须要多少就写多少,当上一个 match 不匹配后会顺次用上面的 match 做解决,如果都不合乎,则这条数据会被抛弃。

match => ["message", "\[%{GREEDYDATA:serve}\]\[%{TIMESTAMP_ISO8601:logdate}\]\[%{LOGLEVEL:level}%{SPACE}\] %{GREEDYDATA:content}"]
match => ["message", "%{TIMESTAMP_ISO8601:logdate} %{SPACE}%{LOGLEVEL} %{POSINT} --- \[%{GREEDYDATA:thread}\] %{GREEDYDATA:path} : %{GREEDYDATA:content}"]

2. 依据咱们本人的需要,对数据做一些解决,咱们排除掉 DEBUG 以及没有 &MONITOR&= 非凡标识的日志后,将服务名大写,将日志中的非凡字段做去除等操作后,就会失去日志内容中的 json 字符串,而后咱们再将这部分数据做 json 转化,不便后续解决。最初再将咱们日志中的工夫做格式化。

filter {
  grok {match => {"message" => "%{WORD:serve} %{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:level} %{POSINT} \[%{IP:ip}\] --- \[%{GREEDYDATA}\] %{GREEDYDATA} : %{GREEDYDATA:content}"}

    ### 去掉 TIMESTAMP_ISO8601,GREEDYDATA 解析进去的多余字段
    remove_field => ["YEAR","MONTHNUM","MONTHDAY","HOUR","MINUTE","SECOND","SPACE","IPV4","IPV6","GREEDYDATA"]
  }

  ### DEBUG 级别的日志过滤掉
  if [level] == "DEBUG" {drop {}
  }

  ### content 中不蕴含这个字段的日志过滤掉
  if "&MONITOR&=" not in [content] {drop {}
  }

  mutate {
    ### 将 serve 中的值转化为大写
    uppercase => ["serve"]
    ### 将 pid 转化为 String 类型
    convert => {"pid" => "string"}
    ### 去掉 content 前后空格
    strip => ["content"]
    ### 将 content 中的 &MONITOR&= 替换为“”gsub => ["content", "&MONITOR&=", ""]
  }

  ### 将上一步中的 content 字段做 json 解析
  json {
    source => "content"
    ### 解析成 json 后将 IP 字段去掉
    remove_field => ["IP"]
  }

  date {
    ### 将工夫格式化
    match => ["logdate", "yyyy-MM-dd HH:mm:ss.SSS Z"]
    timezone => "Asia/Shanghai"
    target => "logdate"
  }
}
3. 写入 MySQL

须要配置 MySQL 的信息,此处配置了一个库,如果有多个库,则配置多个 jdbc 模块。

output {
  jdbc {
    ### mysql driver path
    driver_jar_path => "/data/dev/server/lib/mysql-connector-java-5.1.46.jar"
    driver_class => "com.mysql.jdbc.Driver"
    ### connection
    connection_string => "jdbc:mysql://localhost:3306/monitor?useUnicode=true&characterEncoding=utf8&useSSL=false"
    username => "root"
    password => "clover"
    ### sql
    statement => ["INSERT INTO logstash_log_temp(serve, level, pid, ip, content, logdate) VALUES(?, ?, ? ,INET_ATON(?), ? ,?)", "[serve]", "[level]", "[pid]", "[ip]", "[content]", "[logdate]"]
  }
}

** 注:** 写入到数据库须要 logstash-out-jdbc 插件;但这个插件不在默认的插件列表中,所以咱们须要手动的装置一下。

确保有网络的前提下,执行命令:
./logstash-plugin install logstash-output-jdbc

如果服务器没有网络环境,则能够在任何一台有网络,有 java 环境的电脑上创立一个插件,这里以 windows 为例:

  • 首先下载一个 windows 版本的 Logstash-7.10.0
  • bin 目录下执行命令 logstash-plugin install logstash-output-jdbc
  • 执行 logstash-plugin list,列表中有 logstash-out-jdbc 则装置胜利
  • 将插件剥离打包命令 logtash-plugin prepare-offline-pack --overwrite --output logstash-output-jdbc.zip logstash-output-jdbc
  • 将生成的 zip 放到服务器上,并在服务器 bin 目录下执行./logstash-plugin install file:///data/dev/software/logstash-output-jdbc.zip
  • 执行 logstash-plugin list 查看装置状况。

第三步:启动

配置完所有步骤后,启动 logstash
./logstash -f ../config/logstash-mysql.conf
查看数据库中数据

贴出残缺 logstash-mysql.conf 内容如下:

input {
  beats {port => 5044}
}

filter {
  grok {match => {"message" => "%{WORD:serve} %{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:level} %{POSINT} \[%{IP:ip}\] --- \[%{GREEDYDATA}\] %{GREEDYDATA} : %{GREEDYDATA:content}"}

    remove_field => ["cloud","input","log","agent","host","ecs"]
    ### 去掉 TIMESTAMP_ISO8601,GREEDYDATA 解析进去的多余字段
    remove_field => ["YEAR","MONTHNUM","MONTHDAY","HOUR","MINUTE","SECOND","SPACE","IPV4","IPV6","GREEDYDATA"]
  }

  ### DEBUG 级别的日志过滤掉
  if [level] == "DEBUG" {drop {}
  }

  ### content 中不蕴含这个字段的日志过滤掉
  if "&MONITOR&=" not in [content] {drop {}
  }

  mutate {
    ### 将 serve 中的值转化为大写
    uppercase => ["serve"]
    ### 将 pid 转化为 String 类型
    #convert => {
    #     "pid" => "string"
    #}
    ### 将 content 中的 &MONITOR&= 替换为空
    gsub => ["content", "&MONITOR&=", ""]
    ### 去掉某个字段的前后空格,比方 content
    #strip => ["content"]
  }

  ### 将上一步中的 content 字段做 json 解析
  json {source => "content"}

  date {
    ### 将工夫格式化
    match => ["logdate", "yyyy-MM-dd HH:mm:ss.SSS Z"]
    timezone => "Asia/Shanghai"
    target => "logdate"
  }
}

output {
  ### 多个库则声明多个 jdbc
  jdbc {
    ### mysql driver path
    driver_jar_path => "/data/dev/server/lib/mysql-connector-java-5.1.46.jar"
    driver_class => "com.mysql.jdbc.Driver"
    ### connection
    connection_string => "jdbc:mysql://localhsot:3306/monitor?useUnicode=true&characterEncoding=utf8&useSSL=false"
    username => "root"
    password => "clover"
    ### sql ip 地址用 INET_NTOA(`host_ip`)来获取
    statement => ["INSERT INTO logstash_log_temp(serve, level, pid, ip, content, logdate) VALUES(?, ?, ? ,INET_ATON(?), ? ,?)", "[serve]", "[level]", "[pid]", "[ip]", "[content]", "[logdate]"]
    ]
  }

  ### logstash 数据转化成 json 格局写入日志中
  stdout {codec => json_lines}
}

正文完
 0