About Logstash: Converting Windows Logs Collected by NXLog into Standardized ELK ECS Logs

1. Collecting Windows logs with NXLog

##########################################################################
# Basic configuration
##########################################################################
# 64-bit system
define ROOT C:\Program Files (x86)\nxlog
# 32-bit system
#define ROOT C:\Program Files\nxlog

Moduledir %ROOT%\modules
CacheDir %ROOT%\data
Pidfile %ROOT%\data\nxlog.pid
SpoolDir %ROOT%\data
LogFile %ROOT%\data\nxlog.log

# Enable the GELF extension and set the maximum message length
<Extension gelf>
    Module xm_gelf
    ShortMessageLength 65536
</Extension>

# Enable the JSON extension
<Extension json>
    Module xm_json
</Extension>

# Enable automatic character-set conversion
<Extension _charconv>
    Module xm_charconv
    AutodetectCharsets iso8859-2, utf-8, utf-16, utf-32
</Extension>

##########################################################################
# Input configuration: Windows event logs
##########################################################################
# The NXLog community edition can query at most 256 channels at a time, and
# Windows Server 2016 already has more than 300, so some logs would not be
# collected. Query the channels one by one so no single query exceeds 256.
<Input APP_Logs>
    # Windows 7 and later
    Module im_msvistalog
    # Windows 2003
    # Module im_mseventlog
    # Query the Application channel
    Query <QueryList><Query Id="0"><Select Path="Application">*</Select></Query></QueryList>
    # Drop all events whose type is VERBOSE
    Exec if $EventType == 'VERBOSE' drop();
    Exec $Hostname = hostname();
</Input>

<Input SYS_Logs>
    # Windows 7 and later
    Module im_msvistalog
    # Windows 2003
    # Module im_mseventlog
    # Query the System channel
    Query <QueryList><Query Id="0"><Select Path="System">*</Select></Query></QueryList>
    # Drop all events whose type is VERBOSE
    Exec if $EventType == 'VERBOSE' drop();
    Exec $Hostname = hostname();
</Input>

<Input SEC_Logs>
    # Windows 7 and later
    Module im_msvistalog
    # Windows 2003
    # Module im_mseventlog
    # Query the Security channel
    Query <QueryList><Query Id="0"><Select Path="Security">*</Select></Query></QueryList>
    # Drop all events whose type is VERBOSE
    Exec if $EventType == 'VERBOSE' drop();
    Exec $Hostname = hostname();
</Input>

##########################################################################
# Output configuration
##########################################################################
<Output Logstash>
    Module om_udp
    Host logstash-ip
    Port 5414
    OutputType GELF
</Output>

<Route APP>
    Path APP_Logs => Logstash
</Route>
<Route SYS>
    Path SYS_Logs => Logstash
</Route>
<Route SEC>
    Path SEC_Logs => Logstash
</Route>

2. Using Logstash to further normalize the Windows logs
Note: steps 2 and 3 can be combined, in which case Kafka is not needed ...
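The rest of the post is cut off, but as a rough sketch, the receiving side of step 2 could start from something like the following (assuming the logstash-input-gelf plugin, which ships with Logstash; the port matches the om_udp output above):

input {
  # Listen for the GELF/UDP stream that NXLog's om_udp output sends to port 5414
  gelf {
    port    => 5414
    use_udp => true
  }
}
output {
  # Print decoded events while verifying the setup; swap in elasticsearch afterwards
  stdout { codec => rubydebug }
}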

September 14, 2022 · 7 min · jiezi

About Logstash: A Logstash Configuration Sample

Place logstash.conf in the {logstash}/conf.d directory.

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
  # tcp {
  #   mode => "server"
  #   host => "0.0.0.0"
  #   port => 5044
  #   tcp_keep_alive => true
  #   codec => json_lines
  # }
  file {
    # Path of the files to watch
    path => ["/opt/logs"]
    # Delimiter that marks a new event
    delimiter => "\n"
    # How often to scan the directory for new files
    discover_interval => 15
    # How often to check whether a file has been modified
    stat_interval => 3
    # Where to start reading a file; the default is "end"
    start_position => "beginning"
    # How often to persist the read position (sincedb)
    sincedb_write_interval => 5
    codec => json_lines
  }
  http {
    host => "0.0.0.0"
    port => 8080
    additional_codecs => {"application/json" => "json"}
    codec => "json_lines"
    threads => 4
    ssl => false
  }
  # kafka {
  #   bootstrap_servers => ["x.x.x.x:9092"]
  #   auto_offset_reset => "latest"
  #   consumer_threads => 5
  #   decorate_events => false
  #   # Topics to subscribe to
  #   topics => ["spy_admin_topic", "sd_topic"]
  #   codec => json_lines
  # }
}
filter {
  # date {
  #   timezone => "Asia/Shanghai"
  #   match => ["timestamp", "yyyy-MM-dd HH:mm:ss S"]
  # }
  # https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html#plugins-filters-date-match
  date {
    match => [ "@timestamp", "ISO8601" ]
    locale => "cn"
    target => "@timestamp"
  }
  # If ad_id contains "_", split it and keep the second segment as the new ad_id
  if "_" in [ad_id] {
    mutate {
      split => { "ad_id" => "_" }
      add_field => { "t_ad_id" => "%{[ad_id][1]}" }
    }
    mutate {
      rename => {"t_ad_id" => "ad_id"}
    }
  }
  # Normalize field types and drop bookkeeping fields
  mutate {
    convert => {
      "client_id" => "integer"
      "@version" => "integer"
      "app_name" => "string"
      "file" => "string"
      "host" => "string"
      "status_code" => "integer"
      "duration" => "integer"
      "size" => "integer"
      "cnt7" => "integer"
      "cnt6" => "integer"
      "cnt5" => "integer"
      "cnt4" => "integer"
      "cnt3" => "integer"
      "cnt2" => "integer"
      "cnt1" => "integer"
    }
    remove_field => [ "@version", "_score", "_source", "_type", "timestamp", "level_value", "logger_name" ]
  }
}
output {
  # One index per application, named after app_name
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "%{[app_name]}"
    user => "elastic"
    password => "1qaz@WSX3edc"
    pool_max => 500
    pool_max_per_route => 2000
    retry_initial_interval => 4
    retry_max_interval => 16
  }
  #if [level] == "ERROR" {
  #  mongodb {
  #    collection => "%{[app_name]}"
  #    database => "logstash"
  #    isodate => true
  #    uri => "mongodb://spy_user:L2LRi7BAAP163fii@107.150.33.170:33017/?authSource=admin"
  #    bulk => true
  #    codec => json
  #  }
  #}
}
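Before deploying a pipeline file like this, it can be validated with Logstash's built-in config check, e.g. `bin/logstash -f conf.d/logstash.conf --config.test_and_exit`, which parses the configuration and exits without starting the pipeline.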

May 27, 2022 · 2 min · jiezi

About Logstash: "Expected ... at line 1, column 1 (byte 1)"

An error like the following indicates a character-encoding problem:

pipeline_id:main, :exception=>"LogStash::ConfigurationError",
:message=>"Expected one of [ \\t\\r\\n], \"#\", \"input\", \"filter\", \"output\" at line 1, column 1 (byte 1)",
:backtrace=>["/data/logstash-7.9.0/logstash-core/lib/logstash/compiler.rb:32:in `compile_imperative'",
"org/logstash/execution/AbstractPipelineExt.java:183:in `initialize'",
"org/logstash/execution/JavaBasePipelineExt.java:69:in `initialize'",
"/data/logstash-7.9.0/logstash-core/lib/logstash/java_pipeline.rb:44:in `initialize'",
"/data/logstash-7.9.0/logstash-core/lib/logstash/pipeline_action/create.rb:52:in `execute'",
"/data/logstash-7.9.0/logstash-core/lib/logstash/agent.rb:357:in `block in converge_state'"]}

In my case, the same Logstash started fine outside the container but failed inside it with the error above. The cause appears to be that the Docker container has no UTF-8 locale, so configuration files used inside Docker should not contain Chinese characters.
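If a pipeline file has to keep Chinese comments, a workaround worth trying is to re-encode it to UTF-8 before building the image, e.g. `iconv -f GBK -t UTF-8 logstash.conf > logstash-utf8.conf` (assuming the file was saved in a Chinese codepage such as GBK), or to install a UTF-8 locale in the image.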

March 25, 2022 · 1 min · jiezi

About Logstash: Splitting Logs with Logstash Grok Matching

[root@logstash-1 conf.d]# cat h3c_syslog-e.conf
input {
  kafka {
    bootstrap_servers => "10.26.x.2x:9092,10.26.5.2x:9092,10.2x.5.2x:9092"
    group_id => "h3c_syslog"
    topics => ["h3c_syslog"]
    codec => "json"
    consumer_threads => 5
    decorate_events => false
    auto_offset_reset => "earliest"   # latest = newest; earliest = from the beginning
  }
}
filter {
  if [fields][logtype] == "Test_Log1" {
    grok {
      patterns_dir => "/data/logstash-7.9.0/config/conf.d/patterns"
      match => [ "message", "%{Agent:datetime} %{Hostnamenetwork:Hostname} %{Agent:messages}" ]
      remove_field => [ "message","agent.hostname","ecs.version","topic","hostname","host.name","agent.ephemeral_id","agent.id","tags" ]
    }
  }
}
output {
  elasticsearch {
    hosts => ["10.26.5.xx:9200","10.xx.5.2x:9200","10.2x.5.2x:9200"]
    index => "h3c_syslog-%{+YYYY-MM-dd}"
    user => "elastic"
    password => "X666xxx8"
  }
}

[root@logstash-1 conf.d]# cat patterns
Hostnamenetwork \bAcc-\S+\b
Agent [\s\S]+
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
INT (?:[+-]?(?:[0-9]+))
BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))
NUMBER (?:%{BASE10NUM})
BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))
BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\b
POSINT \b(?:[1-9][0-9]*)\b
NONNEGINT \b(?:[0-9]+)\b
WORD \b\w+\b
NOTSPACE \S+
SPACE \s*
DATA .*?
GREEDYDATA .*
QUOTEDSTRING (?>(?<!\\)(?>"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``))
UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}
# Networking
MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})
IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?
IPV4 (?<![0-9])(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2}))(?![0-9])
IP (?:%{IPV6}|%{IPV4})
HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)
HOST %{HOSTNAME}
IPORHOST (?:%{HOSTNAME}|%{IP})
HOSTPORT %{IPORHOST}:%{POSINT}
# paths
PATH (?:%{UNIXPATH}|%{WINPATH})
UNIXPATH (?>/(?>[\w_%!$@:.,-]+|\\.)*)+
TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+))
WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+
URIPROTO [A-Za-z]+(\+[A-Za-z+]+)?
URIHOST %{IPORHOST}(?::%{POSINT:port})?
# uripath comes loosely from RFC1738, but mostly from what Firefox
# doesn't turn into %XX
URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_\-]*)+
#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]]*
URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?
# Months: January, Feb, 3, 03, 12, December
MONTH \b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\b
MONTHNUM (?:0?[1-9]|1[0-2])
MONTHNUM2 (?:0[1-9]|1[0-2])
MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])
# Days: Monday, Tue, Thu, etc...
DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)
# Years?
YEAR (?>\d\d){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
# '60' is a leap second in most time standards and thus is valid.
SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)
DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}
DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}
ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))
ISO8601_SECOND (?:%{SECOND}|60)
TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
DATE %{DATE_US}|%{DATE_EU}
DATESTAMP %{DATE}[- ]%{TIME}
TZ (?:[PMCE][SD]T|UTC)
DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}
DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE}
DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}
DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND}
# Syslog Dates: Month Day HH:MM:SS
SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
PROG (?:[\w._/%-]+)
SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])?
SYSLOGHOST %{IPORHOST}
SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>
HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}
# Shortcuts
QS %{QUOTEDSTRING}
# Log formats
SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}
# Log Levels
LOGLEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)
...
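To iterate on custom patterns like Hostnamenetwork and Agent without going through Kafka, a throwaway pipeline that reads test lines from stdin is convenient. This is a minimal sketch that reuses the patterns_dir and match expression from the config above:

input { stdin {} }   # paste raw log lines into the terminal to test
filter {
  grok {
    patterns_dir => "/data/logstash-7.9.0/config/conf.d/patterns"
    match => [ "message", "%{Agent:datetime} %{Hostnamenetwork:Hostname} %{Agent:messages}" ]
  }
}
output { stdout { codec => rubydebug } }   # print the extracted fields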

March 25, 2022 · 2 min · jiezi

About Logstash: Parsing Firewall Logs with Logstash Grok

1. Example: parsing a Huawei firewall log with Logstash

1. The firewall log:

"<190>Sep 18 2021 04:10:29 DJI-WL-FW-USG6620E-01 %%01POLICY/6/POLICYPERMIT(l):vsys=public, protocol=17, source-ip=192.99.19.56, source-port=50585, destination-ip=192.9.2.87, destination-port=8456, time=2021/9/18 12:10:29, source-zone=Kaifa_CT_01, destination-zone=Internal, application-name=firewall, rule-name=rule_370.\u0000"

2. The grok expression:

(?<time>%{MONTH}\s%{MONTHDAY}\s%{YEAR}\s%{TIME}) %{HOSTNAME:name} %%01POLICY/6/%{WORD:action}\(l\):vsys=%{WORD:vsys}, protocol=%{INT:protocol}, source-ip=%{IP:source_ip}, source-port=%{INT:source_port}, destination-ip=%{IP:destination_ip}, destination-port=%{INT:destination_port}, time=(?<session_time>%{YEAR}/%{MONTHNUM}/%{MONTHDAY}\s%{TIME}), source-zone=%{WORD:source_zone}, destination-zone=%{WORD:destinatione_zone}, (application-name=|application-name=%{WORD:application_name}), rule-name=%{WORD:rule_name}

3. The parsed result:

{
    "vsys": "public",
    "destination_port": "8456",
    "rule_name": "rule_370",
    "source_zone": "Kaifa_CT_01",
    "session_time": "2021/9/18 12:10:29",
    "source_ip": "192.99.19.56",
    "protocol": "17",
    "destination_ip": "192.9.2.87",
    "destinatione_zone": "Internal",
    "application_name": "firewall",
    "source_port": "50585",
    "name": "DJI-WL-FW-USG6620E-01",
    "action": "POLICYPERMIT",
    "time": "Sep 18 2021 04:10:29"
}
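Step 2 shows only the bare pattern; as a sketch, it would sit inside a grok filter along these lines (the expression and field names, including the destinatione_zone spelling, are kept exactly as in step 2):

filter {
  grok {
    # Same expression as in step 2 above
    match => { "message" => "(?<time>%{MONTH}\s%{MONTHDAY}\s%{YEAR}\s%{TIME}) %{HOSTNAME:name} %%01POLICY/6/%{WORD:action}\(l\):vsys=%{WORD:vsys}, protocol=%{INT:protocol}, source-ip=%{IP:source_ip}, source-port=%{INT:source_port}, destination-ip=%{IP:destination_ip}, destination-port=%{INT:destination_port}, time=(?<session_time>%{YEAR}/%{MONTHNUM}/%{MONTHDAY}\s%{TIME}), source-zone=%{WORD:source_zone}, destination-zone=%{WORD:destinatione_zone}, (application-name=|application-name=%{WORD:application_name}), rule-name=%{WORD:rule_name}" }
  }
}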

September 18, 2021 · 1 min · jiezi

About Logstash: A Brief Introduction to Logstash

Reference: https://www.zhihu.com/search?...

1. Introduction to Logstash

The earlier posts on installation and configuration gave a brief introduction to ES and Kibana; this one focuses on Logstash, which imports logs into ES indices so they can be searched with Kibana. Logstash appeared back in 2008. As the real-time data collection engine commonly used with Elasticsearch, it can collect data from different sources, process it, and output it to multiple destinations; it is an important part of the Elastic Stack.

2. The Logstash processing flow

Terms: Pipeline; Codecs (coder/decoder).

Logstash's data processing consists of three parts: Inputs, Filters, and Outputs. In addition, Codecs can be used within Inputs and Outputs to handle data formats.
1) Inputs: fetch data from a data source; common plugins include file, syslog, redis, and beats.
2) Filters: process the data, e.g. format conversion and field derivation; common plugins include grok, mutate, drop, clone, and geoip.
3) Outputs: emit the data; common plugins include elasticsearch, file, graphite, and statsd.
4) Codecs: not a separate stage, but modules used inside input and output plugins to encode and decode data; common plugins include json and multiline. (A minimal pipeline exercising all of these follows below.)

1. Execution model
(1) Each input starts a thread that fetches data from its source.
(2) Inputs write the data to a queue, by default a bounded in-memory queue (an unexpected shutdown loses its contents). To prevent data loss, Logstash provides two features:
Persistent Queues: a disk-backed queue that prevents data loss;
Dead Letter Queues: store events that could not be processed (supported only for the Elasticsearch output).
(3) Logstash runs multiple pipeline workers; each worker takes a batch of events from the queue and runs the filters and outputs on it (the number of workers and the batch size are both set in the configuration).

3. Example

1. Log collection
The tool adopted is the Filebeat input plugin, which collects local logs and then outputs the result. About Filebeat: it is integrated below to collect logs, and it can collect SQL, Redis, and IDE runtime-environment logs.
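As a concrete illustration (not from the original article), here is a minimal sketch of a pipeline that exercises all three stages plus codecs; it reads JSON lines from stdin and pretty-prints the events:

input {
  stdin { codec => json }      # Input stage: read events; the codec decodes JSON
}
filter {
  mutate { add_field => { "pipeline" => "demo" } }   # Filter stage: derive or modify fields
}
output {
  stdout { codec => rubydebug }   # Output stage: the codec pretty-prints each event
}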

March 30, 2021 · 1 min · jiezi

About Logstash: ELK in Practice

Overall architecture

Logical architecture: a system (project) contains multiple services (service), and each service has multiple replicas. The ELK stack needs to consolidate logs from different systems into indices named project-service-date, so that one system's logs can be viewed in one place instead of hunting through the logs of every replica. Filebeat declares which system, service, and log paths it collects and sends events to a project-service topic. Logstash subscribes to all Kafka topics whose names start with that project, adds date information after uniform parsing, and sends events to the project-service-date index. To collect another system's logs, change the project, service, and log paths in Filebeat and add the new data source (project-*) in Logstash.

Filebeat: collect logs and send them to Kafka
Input: configure the log paths and watch the *All.log files; error logs are not collected separately, to avoid duplicate collection. Define project and service. Configure multiline handling: lines that do not start with a date are merged with the preceding line, so stack traces are collected as single events.
Output: Kafka, with the topic set to project-service.
Deployment: bake the configuration into the image and deploy it in global mode on the Swarm cluster; map the watched log files into the container via docker-compose.

Logstash: parse the data and send it to ES (a hedged sketch follows below)
Input: Kafka. group_id: all instances in a distributed deployment should share the same group_id, which allows horizontal scaling. topics_pattern: use a regex to consume all topics of one project.
Filter: mutate, json, and grok. A log line contains the log time, thread id, log level, and message; the date is parsed out of the log time and used as the date part of the index name when writing to ES.
Output: events can be routed to different destinations based on conditions.
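To make the Logstash side concrete, here is a hedged sketch under stated assumptions: the broker address, group id, topic regex, timestamp format, and the project/service/log_time field names are illustrative placeholders, not the article's actual values:

input {
  kafka {
    bootstrap_servers => "kafka:9092"      # assumed broker address
    group_id          => "elk-logstash"    # same group_id on every instance for horizontal scaling
    topics_pattern    => "myproject-.*"    # regex covering all of one project's topics
    codec             => json
  }
}
filter {
  # Assume each event carries project/service fields and a timestamp like "2021-01-01 12:00:00.123"
  grok { match => { "message" => "^%{TIMESTAMP_ISO8601:log_time}" } }
  date { match => ["log_time", "yyyy-MM-dd HH:mm:ss.SSS"] target => "@timestamp" }
}
output {
  elasticsearch {
    hosts => ["http://es:9200"]                           # assumed ES address
    index => "%{[project]}-%{[service]}-%{+YYYY.MM.dd}"   # project-service-date index
  }
}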

February 24, 2021 · 1 min · jiezi