关于logstash:将-nxlog-采集的-windows-日志转换为标准化-ELK-ECS-日志

一、用nxlog采集windows日志########################################################################### 根底配置 ############################################################################# 64零碎define ROOT C:\Program Files (x86)\nxlog# 32零碎#define ROOT C:\Program Files\nxlogModuledir %ROOT%\modulesCacheDir %ROOT%\dataPidfile %ROOT%\data\nxlog.pidSpoolDir %ROOT%\dataLogFile %ROOT%\data\nxlog.log# 开启GELF格局扩大,并定义最大日志长度<Extension gelf> Module xm_gelf ShortMessageLength 65536</Extension># 开启JSON扩大 <Extension json> Module xm_json</Extension># 开启主动转码<Extension _charconv> Module xm_charconv AutodetectCharsets iso8859-2, utf-8, utf-16, utf-32</Extension>########################################################################### 输出配置-windows日志 ############################################################################# 因为NXLOG社区版本最大只能发送256个Channel,而目前windows2016,曾经超过300个,因而局部日志采集不到,须要手动查问通道,来避免单次查问超过256<Input APP_Logs># WIN7 以上 Module im_msvistalog# WINDOWS 2003# Module im_mseventlog # 查问Application通道 Query <QueryList><Query Id="0"><Select Path="Application">*</Select></Query></QueryList> # 过滤所有类型为具体的事件类型 Exec if $EventType == 'VERBOSE' drop(); Exec $Hostname = hostname();</Input><Input SYS_Logs># WIN7 以上 Module im_msvistalog# WINDOWS 2003# Module im_mseventlog # 查问System通道 Query <QueryList><Query Id="0"><Select Path="System">*</Select></Query></QueryList> # 过滤所有类型为具体的事件类型 Exec if $EventType == 'VERBOSE' drop(); Exec $Hostname = hostname();</Input><Input SEC_Logs># WIN7 以上 Module im_msvistalog# WINDOWS 2003# Module im_mseventlog # 查问Security通道 Query <QueryList><Query Id="0"><Select Path="Security">*</Select></Query></QueryList> # 过滤所有类型为具体的事件类型 Exec if $EventType == 'VERBOSE' drop(); Exec $Hostname = hostname();</Input>########################################################################### 输入配置 ############################################################################<Output Logstash> Module om_udp Host logstash-ip Port 5414 OutputType GELF</Output><Route APP> Path APP_Logs => Logstash</Route><Route SYS> Path SYS_Logs => Logstash</Route><Route SYS> Path SEC_Logs => Logstash</Route>二、用logstash,将windows日志进一步整顿备注:二和三能够合并在一起,而无需kafka ...

September 14, 2022 · 7 min · jiezi

关于logstash:logstash配置样例

logstash.conf,放到{logstash}/conf.d目录下 # Sample Logstash configuration for creating a simple# Beats -> Logstash -> Elasticsearch pipeline.input { # tcp { # mode => "server" # host => "0.0.0.0" # port => 5044 # tcp_keep_alive => true # codec => json_lines # } file { #监听文件的门路 path => ["/opt/logs"] #设置新事件的标记 delimiter => "\n" #设置多长时间扫描目录,发现新文件 discover_interval => 15 #设置多长时间检测文件是否批改 stat_interval => 3 #监听文件的起始地位,默认是end start_position => beginning #设置多长时间会写入读取的地位信息 sincedb_write_interval => 5 codec => json_lines } http{ host => "0.0.0.0" port => 8080 additional_codecs => {"application/json"=>"json"} codec => "json_lines" threads => 4 ssl => false } # kafka{ # bootstrap_servers => ["x.x.x.x:9092"] # auto_offset_reset => "latest" # consumer_threads => 5 # decorate_events => false # 须要监听的topic # topics => ["spy_admin_topic", "sd_topic"] # codec => json_lines # }}filter {# date {# timezone => "Asia/Shanghai"# match => ["timestamp", "yyyy-MM-dd HH:mm:ss S"]# } # https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html#plugins-filters-date-match date { match => [ "@timestamp", "ISO8601" ] locale => "cn" target => "@timestamp" } if "_" in [ad_id] { mutate { split => { "ad_id" => "_" } add_field => { "t_ad_id" => "%{[ad_id][1]}" } } mutate { rename => {"t_ad_id" => "ad_id"} } } mutate { convert => { "client_id" => "integer" "@version" => "integer" "app_name" => "string" "file" => "string" "host" => "string" "status_code" => "integer" "duration" => "integer" "size" => "integer" "cnt7" => "integer" "cnt6" => "integer" "cnt5" => "integer" "cnt4" => "integer" "cnt3" => "integer" "cnt2" => "integer" "cnt1" => "integer" } remove_field => [ "@version", "_score", "_source", "_type", "timestamp", "level_value", "logger_name" ] }}output { elasticsearch { hosts => ["http://localhost:9200"] index => "%{[app_name]}" user => "elastic" password => "1qaz@WSX3edc" pool_max => 500 pool_max_per_route => 2000 retry_initial_interval => 4 retry_max_interval => 16 } #if [level] == "ERROR" { # mongodb { # collection => "%{[app_name]}" # database => "logstash" # isodate => true # uri => "mongodb://spy_user:L2LRi7BAAP163fii@107.150.33.170:33017/?authSource=admin" # bulk => true # codec => json # } #}}

May 27, 2022 · 2 min · jiezi

关于logstash:Expected-ut-at-line-1-column-1-byte-1

呈现如下问题阐明是字符编码问题pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"input\", \"filter\", \"output\" at line 1, column 1 (byte 1)", :backtrace=>["/data/logstash-7.9.0/logstash-core/lib/logstash/compiler.rb:32:in `compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:183:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:69:in `initialize'", "/data/logstash-7.9.0/logstash-core/lib/logstash/java_pipeline.rb:44:in `initialize'", "/data/logstash-7.9.0/logstash-core/lib/logstash/pipeline_action/create.rb:52:in `execute'", "/data/logstash-7.9.0/logstash-core/lib/logstash/agent.rb:357:in `block in converge_state'"]} 我呈现的状况是在容器里面的logstash启动没有问题,然而容器里就有问题,如上图问题应该说docker容器里没有utf8的编码,所以docker里的文件不要应用中文

March 25, 2022 · 1 min · jiezi

关于logstash:Logstash-grok-匹配拆分日志

[root@logstash-1 conf.d]# cat h3c_syslog-e.conf input { kafka { bootstrap_servers => "10.26.x.2x:9092,10.26.5.2x:9092,10.2x.5.2x:9092" group_id => "h3c_syslog" topics => ["h3c_syslog"] codec => "json" consumer_threads => 5 decorate_events => false auto_offset_reset => "earliest" #latest最新的;earliest最开始的 }}filter { if [fields][logtype] == "Test_Log1" { grok { patterns_dir => "/data/logstash-7.9.0/config/conf.d/patterns" match => [ "message", "%{Agent:datetime} %{Hostnamenetwork:Hostname} %{Agent:messages}"] remove_field => [ "message","agent.hostname","ecs.version","topic","hostname","host.name","agent.ephemeral_id","agent.id","tags" ] } }}output { elasticsearch { hosts => ["10.26.5.xx:9200","10.xx.5.2x:9200","10.2x.5.2x:9200"] index => "h3c_syslog-%{+YYYY-MM-dd}" user => elastic password => "X666xxx8" }}[root@logstash-1 conf.d]# cat patterns Hostnamenetwork \bAcc-\S+\bAgent [\s\S]+USERNAME [a-zA-Z0-9._-]+USER %{USERNAME}INT (?:[+-]?(?:[0-9]+))BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))NUMBER (?:%{BASE10NUM})BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\bPOSINT \b(?:[1-9][0-9]*)\bNONNEGINT \b(?:[0-9]+)\bWORD \b\w+\bNOTSPACE \S+SPACE \s*DATA .*?GREEDYDATA .*QUOTEDSTRING (?>(?<!\\)(?>"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``))UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}# NetworkingMAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?IPV4 (?<![0-9])(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2}))(?![0-9])IP (?:%{IPV6}|%{IPV4})HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)HOST %{HOSTNAME}IPORHOST (?:%{HOSTNAME}|%{IP})HOSTPORT %{IPORHOST}:%{POSINT}# pathsPATH (?:%{UNIXPATH}|%{WINPATH})UNIXPATH (?>/(?>[\w_%!$@:.,-]+|\\.)*)+TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+))WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+URIPROTO [A-Za-z]+(\+[A-Za-z+]+)?URIHOST %{IPORHOST}(?::%{POSINT:port})?# uripath comes loosely from RFC1738, but mostly from what Firefox# doesn't turn into %XXURIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_\-]*)+#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]]*URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?# Months: January, Feb, 3, 03, 12, DecemberMONTH \b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\bMONTHNUM (?:0?[1-9]|1[0-2])MONTHNUM2 (?:0[1-9]|1[0-2])MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])# Days: Monday, Tue, Thu, etc...DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)# Years?YEAR (?>\d\d){1,2}HOUR (?:2[0123]|[01]?[0-9])MINUTE (?:[0-5][0-9])# '60' is a leap second in most time standards and thus is valid.SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))ISO8601_SECOND (?:%{SECOND}|60)TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?DATE %{DATE_US}|%{DATE_EU}DATESTAMP %{DATE}[- ]%{TIME}TZ (?:[PMCE][SD]T|UTC)DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE}DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND}# Syslog Dates: Month Day HH:MM:SSSYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}PROG (?:[\w._/%-]+)SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])?SYSLOGHOST %{IPORHOST}SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}# ShortcutsQS %{QUOTEDSTRING}# Log formatsSYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}# Log LevelsLOGLEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?) ...

March 25, 2022 · 2 min · jiezi

关于logstash:Logstash-Grok-解析防火墙日志

一、Logstash解析华为防火墙日志示例1.防火墙日志: "<190>Sep 18 2021 04:10:29 DJI-WL-FW-USG6620E-01 %%01POLICY/6/POLICYPERMIT(l):vsys=public, protocol=17, source-ip=192.99.19.56, source-port=50585, destination-ip=192.9.2.87, destination-port=8456, time=2021/9/18 12:10:29, source-zone=Kaifa_CT_01, destination-zone=Internal, application-name=firewall, rule-name=rule_370.\u0000"2.grok 解析语法 (?<time>%{MONTH}\s%{MONTHDAY}\s%{YEAR}\s%{TIME}) %{HOSTNAME:name} %%01POLICY/6/%{WORD:action}\(l\):vsys=%{WORD:vsys}, protocol=%{INT:protocol}, source-ip=%{IP:source_ip}, source-port=%{INT:source_port}, destination-ip=%{IP:destination_ip}, destination-port=%{INT:destination_port}, time=(?<session_time>%{YEAR}/%{MONTHNUM}/%{MONTHDAY}\s%{TIME}), source-zone=%{WORD:source_zone}, destination-zone=%{WORD:destinatione_zone}, (application-name=|application-name=%{WORD:application_name}), rule-name=%{WORD:rule_name}3.解析后果 { "vsys": "public", "destination_port": "8456", "rule_name": "rule_370", "source_zone": "Kaifa_CT_01", "session_time": "2021/9/18 12:10:29", "source_ip": "192.99.19.56", "protocol": "17", "destination_ip": "192.9.2.87", "destinatione_zone": "Internal", "application_name": "firewall", "source_port": "50585", "name": "DJI-WL-FW-USG6620E-01", "action": "POLICYPERMIT", "time": "Sep 18 2021 04:10:29"}

September 18, 2021 · 1 min · jiezi

关于logstash:logstash-简单介绍

参考起源:https://www.zhihu.com/search?... 一、logstash简介 后面装置与配置中已近对es和kibana进行了简略的介绍,这里次要介绍logStash导入日志到es的索引中并应用kibana的搜寻,次要介绍logstash相干。logstash在2008年已近呈现了,Logstash作为Elasicsearch罕用的实时数据采集引擎,能够采集来自不同数据源的数据,并对数据进行解决后输入到多种输入源,是Elastic Stack 的重要组成部分。二、logstash解决流程 名词解析: Pipeline 管道 codecs 编解码器 如上图所示LogStash的数据处理过程次要包含:input.filters.Outputs三局部。 另外在Inputs和Outputs中能够应用Codecs对数据格式进行解决。1)Inputs:用于从数据源获取数据,常见的插件如file, syslog, redis, beats 等2)Filters:用于解决数据如格局转换,数据派生等,常见的插件如grok, mutate, drop, clone, geoip等3)Outputs:用于数据输入,常见的插件如elastcisearch,file, graphite, statsd等4)Codecs:Codecs不是一个独自的流程,而是在输出和输入等插件中用于数据转换的模块,用于对数据进行编码解决,常见的插件如json,multiline1、执行模型 (1)每个Input启动一个线程,从对应数据源获取数据(2)Input会将数据写入一个队列:默认为内存中的有界队列(意外进行会导致数据失落)。为了避免数失落Logstash提供了两个个性:dPersistent Queues:通过磁盘上的queue来避免数据失落Dead Letter Queues:保留无奈解决的event(仅反对Elastic(3)Logstash会有多个pipeline worker, 每一个pipeline worker会从队列中取一批数据,而后执行filter和output(worker数目及每次解决的数据量均由配置确定)三、实例 1、日志采集 采纳的工具 FileBeatinput 插件,采集本地日志,而后将后果输入。FileBeat介绍上面会整合FileBeat来采集日志,它能够采集sql、redis、以及ide运行环境的日志

March 30, 2021 · 1 min · jiezi

关于logstash:ELK实践

整体架构:逻辑架构: 一个零碎(project)蕴含多个服务(service),一个服务有多个正本,elk零碎须要将不同零碎的日志对立归集,按project-service-date归集到index便于查看系统日志,防止寻找零碎所有正本日志的繁琐。Filebeat定义所采集的零碎、服务及其日志地址,发送到project-service的topic。Logstash订阅kafka以该project结尾所有的topic,通过对立解析后增加日期信息,发往project-service-date的index中。如果须要采集其余系统日志,更改filebeat的project,service及日志地址,并在logstash增加新的数据源(project-*)即可。 filebeat:采集日志发往kafka input: 配置日志地址,监听*All.log文件,不采集error,防止日志反复采集定义project和service配置换行,如果非日期结尾,则合并至下一行,便于采集堆栈报错信息Output: kafka:topic为projectservice部署: 把配置打进镜像,在swarm集群中以global模式部署,Docker-compose中把监听的日志文件映射进容器Logstash:解析数据发往es input:kafka group_id:分布式环境下group_id应该雷同,可横向扩大topic_pattern,通过正则解决同一project的所有topicfilter: 通过mutate,json,grok一条日志中蕴含:日志工夫,线程号,日志级别,日志信息。把日志工夫里的日期解析进去,做为index里的date,插入es。output: 可依据不同的条件发送至不同的数据库

February 24, 2021 · 1 min · jiezi

如何通过Logstash同步多表关联数据至Elasticsearch

如果你对 使用Logstash保持Elasticsearch与数据库同步 方案还不是很熟悉,建议先花点时间精读它。上面的文章以单表同步场景为例,清楚讲述了如何通过JDBC同步数据至ES,而对于实际开发中经常出现的多表关联同步并未提及,以下是我针对多表关联同步的趟坑过程希望对你有所帮助。数据库表的约定原则同步单表时我们对于表字段的约定: 表中要有主键字段(如id),最近变更时间字段(如modification_time),软删除标记字段(如is_deleted),以便jdbc-input数据采集的轮询Job可以识别出增量变动的数据。提示:jdbc input轮询需要基于modification_time条件查询,所以给该字段加上索引。多表关联同步方案多表关联的情况下我们需要JOIN其他表查询得到结果,这个结果就是ES需要的打平后的宽表。ES新的版本中也增加了join操作,但这事不是ES擅长的,我们选择交给更擅长的数据库处理,让ES只存储打平后的单层索引。 如果你理解单表同步而困惑多表关联同步的话,试着将关联查询的复杂SQL想象(定义)为视图,是不是后续操作就跟单表没区别了! 我们来逐个看下多表关联的同步问题 (假设表a多对多关联表b): 单表的id字段绑定到ES document的_id,可以实现ES索引幂等性,不会出现job原因导致索引文档重复。那对于多表关联的情况呢,可以使用各表id的组合作为document的_id。如SELECT: concat(a.id, '_', b.id) AS docid(如果你不关注幂等,也可以用_id默认生成策略。) 单表基于modification_time就可以识别出自上次轮询后新的变化数据,多表关联的情况呢也类似: (CASE WHEN a.modification_time > b.modification_time THEN a.modification_time ELSE b.modification_time END) AS modification_time同理软删除字段is_deleted的处理逻辑: (CASE WHEN a.is_deleted=0 AND b.is_deleted=0 THEN 0 ELSE 1 END) AS is_deleted这样无论表a还是表b发生变更,都可以被logstash识别出来采集到。 如此我们就可以写出多表关联同步的SQL了,为了方便更新维护SQL及保持logstash-jdbc端conf配置文件的简洁,你可以把SQL定义成一张视图,conf文件中的SQL statement可以像写单表处理一样了。 示例conf: input { jdbc { jdbc_driver_library => "../drivers/mysql-connector-java-8.0.16.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/es_db?serverTimezone=UTC" jdbc_user => "usr" jdbc_password => "pwd" jdbc_paging_enabled => true tracking_column => "unix_ts_in_secs" use_column_value => true tracking_column_type => "numeric" schedule => "*/5 * * * * *" statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM esview WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC" }}filter { mutate { copy => { "docid" => "[@metadata][_id]"} remove_field => ["docid", "@version", "unix_ts_in_secs"] }}output { elasticsearch { index => "test_idx" document_id => "%{[@metadata][_id]}" }}diboot 简单高效的轻代码开发框架 (求star) ...

July 26, 2019 · 1 min · jiezi

Logstash-参考指南Elasticsearch输入插件

Elasticsearch输入插件插件版本:v4.3.0发布于:2019-02-04更新日志对于其他版本,请参阅版本化插件文档。 获取帮助有关插件的问题,请在讨论论坛中打开一个主题,对于错误或特性请求,请在Github中打开一个问题,关于Elastic支持的插件列表,请考虑Elastic支持矩阵。 描述兼容性说明从Elasticsearch 5.3开始,有一个名为http.content_type.required的HTTP设置,如果此选项设置为true,并且你使用的是Logstash 2.4到5.2,则需要将Elasticsearch输入插件更新到4.0.2或更高版本。 从Elasticsearch集群中读取搜索查询的结果,这对于重放测试日志、重建索引等非常有用。你可以使用cron语法定期调度摄取(请参阅调度设置)或运行查询一次以将数据加载到Logstash中。 示例: input { # 读取Elasticsearch中与给定查询匹配的所有文档 elasticsearch { hosts => "localhost" query => '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }' }}这将使用以下格式创建Elasticsearch查询: curl 'http://localhost:9200/logstash-*/_search?&scroll=1m&size=1000' -d '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ]}'调度可以调度此插件的输入根据特定调度定期运行,此调度语法由rufus-scheduler提供支持,语法类似cron,有一些特定于Rufus的扩展(例如时区支持)。 示例: * 5 * 1-3 *将从1月到3月的每天凌晨5点每分钟执行一次0 * * * *将在每天每小时的第0分钟执行0 6 * * * America/Chicago将在每天上午6:00(UTC/GMT-5)执行可以在此处找到描述此语法的更多文档。 Elasticsearch输入配置选项此插件支持以下配置选项以及稍后描述的通用选项。 设置输入类型必须ca_file一个有效的文件系统路径NodocinfobooleanNodocinfo_fields数组Nodocinfo_targetstringNohosts数组NoindexstringNopasswordstringNoquerystringNoschedulestringNoscrollstringNosizenumberNoslicesnumberNosslbooleanNouserstringNo另请参阅通用选项以获取所有输入插件支持的选项列表。 ...

June 5, 2019 · 1 min · jiezi

网络分析利器wireshark命令版4tshark结合ES

tshark是网络分析工具wireshark下的一个工具,主要用于命令行环境进行抓包、分析,尤其对协议深层解析时,tcpdump难以胜任的场景中。本系列文章将整理介绍tshark相关内容。本文将介绍与tshark相关的流量解决方案。 利用tshark,不仅可以对现有的pcap文件进行分析,由于可以输出其他格式,也就可以结合ES的强大搜索能力,达到对数据报文进行记录、分析处理的能力,可以实现回溯分析,结合kibana可视化工具,甚至达到实时可视化监控。 tshark + elastic stackelastic stack全家桶性能一直被诟病,后来另起炉灶,针对采集使用golang构建出一套beats,用于不同的采集场景。其中针对网络流量,开发出packetbeat。 packetbeat的优势是定制了elasticsearch的mapping、kibana一系列可视化图表,可以满足一般对tcp、dns、udp等常规报文的分析。基本达到开箱即用程度。 但packetbeat也有不足,对报文的分析采用会话分析,没有一个个报文单独分析,倾向于应用层,对网络层面分析不足(尤其是故障排查时),此外,支持的协议有限,仅常见协议与tshark的2000多种存在明显差距,当遇到不支持时,需要等待支持或手动写插件,难度极高。 离线导入elasticsearchtshark支持将pcap报文分析后生成json文件导入elasticsearch,同时支持elasticsearch的批量导入接口_bulk的格式,命令如下: tshark -r test_trace.pcap -T ek > test_trace.pcap.json之后可以将json文件通过curl导入。 curl -s -H "Content-Type: application/x-ndjson" -XPOST "localhost:9200/foo/_bulk" --data-binary "@/Users/test-elastic/test_trace.pcap.json"注: 导入时可能存在导入失败,由于_bulk接口对post的文件大小有限制,尽量不要超过15MB,最好在10MB以内。如果超过,建议使用tshark的输出条件生成多个json文件,使用curl依次导入。如果导入失败,可以在curl加-v查看提示信息。默认索引名为类似 packets-2019-04-23(报文记录的日期),可以导入后重新索引可以使用类似以下命令查看导入情况: curl 'http://127.0.0.1:9200/packets-2019-04-23/_search/?size=10&pretty=true'实时监控方案主要思路使用tshark实时抓取报文,并启用过滤策略使用tshark解析捕获的报文,提取指定的字段并写入csv文件中,或者使用json格式在下一步进行ETL writes captured wireless packets as .csv.使用filebeat持续检测csv文件,并发个logstash用于字段过滤之类(如无需过滤可以跳过logstash直接发给elastsearch)logstash对字段进行过滤,格式转化等,之后发到elasticsearch使用kibana进行数据可视化,对报文统计分析简单实现1. tshark部分tshark -a duration:600 -i phy0.mon -t ad -t ad -lT fields -E separator=, -E quote=d -e _ws.col.Time -e wlan.fc.type -e wlan.fc.type_subtype -e radiotap.dbm_antsignal -e frame.len -e radiotap.datarate > tshark.csv2. filebeat简单filebeat.yml配置文件 ...

May 1, 2019 · 2 min · jiezi

bulk写es超时问题

背景笔者维护的线上EFK集群,在每天早上8点创建新索引的时候,日志中总会出现如下的日志: failed to process cluster event (cluster_update_settings) within 30s[2019-04-13T08:00:38,213][DEBUG][o.e.a.b.TransportShardBulkAction] [logstash-felice-query-2019.04.13][2] failed to execute bulk item (index) BulkShardRequest [[logstash-felice-query-2019.04.13][2]] containing [7] requestsorg.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException: failed to process cluster event (put-mapping) within 30s at org.elasticsearch.cluster.service.MasterService$Batcher.lambda$onTimeout$0(MasterService.java:124) ~[elasticsearch-6.3.0.jar:6.3.0] at java.util.ArrayList.forEach(ArrayList.java:1257) ~[?:1.8.0_152] at org.elasticsearch.cluster.service.MasterService$Batcher.lambda$onTimeout$1(MasterService.java:123) ~[elasticsearch-6.3.0.jar:6.3.0] at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:625) ~[elasticsearch-6.3.0.jar:6.3.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_152] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_152] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_152]问题原因简言之:es的master处理不过来了,es的segment合并是一个非常耗时的操作。批量处理的超时时间默认设置为30s。可以通过以下命令查看pending的task: curl 'localhost:9200/_cat/pending_tasks?v'网上说法不一,出现这个问题很可能的原因可以总结如下: 分片数过多;单节点的分片数不要超过1000个(经验值);通过写入数据自动创建索引最容易出现这种情况;大批量写入数据refresh时间间隔太短;索引的字段数量太多(几十上百个)解决方案方案一:每天预创建索引虽然普通的EFK日志服务可以通过logstash的默认模板去创建索引,但是当索引个数比较大,并且索引的字段数量太多时,就非常容易出现超时。那么我们一般的做法是提前创建好索引,并且设定好每个索引的mapping。笔者采用了一种偷懒的操作,写了一个定时任务,每天去读取当天的索引的mapping,然后直接用这个mapping去创建新的索引。当然这种解决方案的前提是你自己已经对索引有了一个基本的管理。 方案二:减少每个节点的分片数es默认为每个索引设置的分片数是5。可以通过以下命令查看索引的settings: GET /logstash-sonofelice-2019.04.27可以在创建索引的时候,优化一下分片的个数。可以参考文章:https://www.cnblogs.com/gugul... 一句话:shard越少越好。shard越少,越能减少写入开销。 方案三:集群扩容如果是偶发性的写入超时,可能是流量高峰或者新创建索引的时候创建mapping新字段导致。但是如果经常性的出现超时,并且write_queue里面的数据太多,cpu使用率居高不下,那可能就是es集群出现了瓶颈,这时候就需要进行扩容,没有更好的办法。 参考资料https://github.com/elastic/el...

April 30, 2019 · 1 min · jiezi

EFK接入kafka消息队列

1 前言在笔者最开始维护的日志服务中,日质量较小,没有接入kafka。随着业务规模扩增,日质量不断增长,接入到日志服务的产品线不断增多,遇到流量高峰,写入到es的性能就会降低,cpu打满,随时都有集群宕机的风险。因此,接入消息队列,进行削峰填谷就迫在眉睫。本文主要介绍在EFK的基础上如何接入kafka,并做到向前兼容。 2 主要内容如何搭建kafka集群原有EFK升级3 搭建kafka集群3.1 搭建zookeeper集群主要参考文章:【zookeeper安装指南】由于是要线上搭建集群,为避免单点故障,就需要部署至少3个节点(取决于多数选举机制)。 3.1.1 下载进入要下载的版本的目录,选择.tar.gz文件下载 3.1.2 安装使用tar解压要安装的目录即可,以3.4.5版本为例这里以解压到/home/work/common,实际安装根据自己的想安装的目录修改(注意如果修改,那后边的命令和配置文件中的路径都要相应修改) tar -zxf zookeeper-3.4.5.tar.gz -C /home/work/common3.1.3 配置在主目录下创建data和logs两个目录用于存储数据和日志: cd /home/work/zookeeper-3.4.5mkdir data mkdir logs在conf目录下新建zoo.cfg文件,写入如下配置: tickTime=2000 dataDir=/home/work/common/zookeeper1/data dataLogDir=/home/work/common/zookeeper1/logs clientPort=2181 initLimit=5 syncLimit=2 server.1=192.168.220.128:2888:3888 server.2=192.168.222.128:2888:3888 server.3=192.168.223.128:2888:3888在zookeeper1的data/myid配置如下: echo '1' > data/myidzookeeper2的data/myid配置如下: echo '2' > data/myidzookeeper2的data/myid配置如下: echo '3' > data/myid3.1.4 启停进入bin目录,启动、停止、重启分和查看当前节点状态(包括集群中是何角色)别执行: ./zkServer.sh start ./zkServer.sh stop ./zkServer.sh restart./zkServer.sh statuszookeeper集群搭建完成之后,根据实际情况开始部署kafka。以部署2个broker为例。 3.2 搭建kafka broker集群3.2.1 安装下载并解压包: curl -L -O http://mirrors.cnnic.cn/apache/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz tar zxvf kafka_2.10-0.9.0.0.tgz 3.2.2 配置进入kafka安装工程根目录编辑config/server.properties #不同的broker对应的id不能重复broker.id=1delete.topic.enable=trueinter.broker.protocol.version=0.10.0.1log.message.format.version=0.10.0.1listeners=PLAINTEXT://:9092,SSL://:9093auto.create.topics.enable=falsessl.key.password=testssl.keystore.location=/home/work/certificate/server-keystore.jksssl.keystore.password=testssl.truststore.location=/home/work/certificate/server-truststore.jksssl.truststore.password=testnum.network.threads=3num.io.threads=8socket.send.buffer.bytes=102400socket.receive.buffer.bytes=102400socket.request.max.bytes=104857600log.dirs=/home/work/data/kafka/lognum.partitions=1num.recovery.threads.per.data.dir=1offsets.topic.replication.factor=1transaction.state.log.replication.factor=1transaction.state.log.min.isr=1log.retention.hours=72log.segment.bytes=1073741824log.retention.check.interval.ms=300000zookeeper.connect=192.168.220.128:2181,192.168.222.128:2181,192.168.223.128:2181zookeeper.connection.timeout.ms=6000group.initial.rebalance.delay.ms=03.2.3 启动kafka进入kafka的主目录 nohup sh bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &3.2.4 连通性测试首先创建一个topic:topic_1 ...

April 24, 2019 · 2 min · jiezi

centos7 安装ELK做日志收集(elasticsearch,logstash,kibana)

关于elk不用说,大家多多少少都听过,最近我搭建了一套用作收集日志,供大家参考:一.安装elasticsearch,logstash,kibana强烈建议安装去es的官网安装: 今天是2019.4.9,目前最新的版本是6.7.1,三个都装6.7.1版本(版本最好一致,否则会有各种疑难杂症)elasticsearch: https://www.elastic.co/downlo...logstash: https://www.elastic.co/cn/dow...kibana:https://www.elastic.co/cn/dow…其中es的安装教程我在这篇文章里面已经写了:https://segmentfault.com/a/11…,下面介绍logstash和kibana的安装过程1.logstash: 下载后##进入安装包所在目录,解压tar -xf logstash-6.7.1.tar.gz##切换到bin目录cd /logstash-6.7.1/bin##编辑或者创建一个启动的配置文件,到时候用这个配置文件启动vim input_flter_output.confinput { file{ path=> “/crawler/jenkins/.log” ##生成日志的目录 type=> “cml” ##索引的类型 start_position=> “beginning” ##一开始就输入原来的日志信息 } stdin{}}filter{ }output{ elasticsearch{ action=> “index” hosts=> “www.iamcrawler.cn:9500” ##输出到elasticsearch上面 index=> “log-%{+yyyy.MM.dd}” ##生成一个log-时间的索引 }} #最后保存退出 ##在bin目录下再编写一个启动bat vim run.bat sh logstash -f input_flter_output.conf & #最后保存退出 然后运行run.bat即可 [root@iamcrawler bin]# sh run.bat ##正常会出现以下情况: [root@iamcrawler bin]# Sending Logstash logs to /crawler/logstash/logstash-6.7.1/logs which is now configured via log4j2.properties [2019-04-09T13:11:40,120][WARN ][logstash.config.source.multilocal] Ignoring the ‘pipelines.yml’ file because modules or command line options are specified [2019-04-09T13:11:40,138][INFO ][logstash.runner ] Starting Logstash {“logstash.version”=>“6.7.1”} [2019-04-09T13:11:50,041][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>“main”, “pipeline.workers”=>2, “pipeline.batch.size”=>125, “pipeline.batch.delay”=>50} [2019-04-09T13:11:50,697][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://www.iamcrawler.cn:9500/]}} [2019-04-09T13:11:51,065][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>“http://www.iamcrawler.cn:9500/"} [2019-04-09T13:11:51,191][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>6} [2019-04-09T13:11:51,196][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the type event field won’t be used to determine the document _type {:es_version=>6} [2019-04-09T13:11:51,232][INFO ][logstash.outputs.elasticsearch] Using default mapping template [2019-04-09T13:11:51,253][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>“LogStash::Outputs::ElasticSearch”, :hosts=>[”//www.iamcrawler.cn:9500"]} [2019-04-09T13:11:51,287][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{“template”=>“logstash-”, “version”=>60001, “settings”=>{“index.refresh_interval”=>“5s”}, “mappings”=>{"default"=>{“dynamic_templates”=>[{“message_field”=>{“path_match”=>“message”, “match_mapping_type”=>“string”, “mapping”=>{“type”=>“text”, “norms”=>false}}}, {“string_fields”=>{“match”=>"", “match_mapping_type”=>“string”, “mapping”=>{“type”=>“text”, “norms”=>false, “fields”=>{“keyword”=>{“type”=>“keyword”, “ignore_above”=>256}}}}}], “properties”=>{"@timestamp"=>{“type”=>“date”}, “@version”=>{“type”=>“keyword”}, “geoip”=>{“dynamic”=>true, “properties”=>{“ip”=>{“type”=>“ip”}, “location”=>{“type”=>“geo_point”}, “latitude”=>{“type”=>“half_float”}, “longitude”=>{“type”=>“half_float”}}}}}}}} [2019-04-09T13:11:51,399][INFO ][logstash.outputs.elasticsearch] Installing elasticsearch template to _template/logstash [2019-04-09T13:11:51,783][INFO ][logstash.inputs.file ] No sincedb_path set, generating one based on the “path” setting {:sincedb_path=>"/crawler/logstash/logstash-6.7.1/data/plugins/inputs/file/.sincedb_6677650ec826fa62a735f6625357dead", :path=>["/crawler/jenkins/.log"]} [2019-04-09T13:11:51,896][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>“main”, :thread=>"#<Thread:0x729fdee9 run>"} [2019-04-09T13:11:52,010][INFO ][filewatch.observingtail ] START, creating Discoverer, Watch with file and sincedb collections [2019-04-09T13:11:52,033][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]} [2019-04-09T13:11:52,723][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600} 2.kibana的安装 kibana安装比较简单,下载后,进入安装目录的config目录,如: cd /crawler/kibana/kibana-6.7.1-linux-x86_64/configvim kibana.yml##添加如下命令server.host: 0.0.0.0elasticsearch.url: “http://localhost:9500” #这里是es的http地址##进入kibana目录,执行以下命令,后台运行kibana./bin/kibana &二.kibana的使用可以参照网上的很多教程,这里就不过多的描述了 ...

April 9, 2019 · 2 min · jiezi

如何实现 Logstash/Elasticsearch 与MySQL自动同步 更新操作 和 删除操作 ?

技术背景我们现在的同步, 是依靠 Logstash的 input-jdbc-plugin插件来实现的自动增量更新,这个的方案貌似只能 增量 添加数据而不能修改或者删除数据. 其实不然, 我们根据input-jdbc-plugin这个插件的一些配置, 是可以实现我们要的效果的.方案原理:用一个更新时间的字段来作为每次Logstash增量更新的tracking column, 这样Logstash每次增量更新就会根据上一次的最后的更新时间来作为标记.索引的document id必须是 主键, 这样在每次增量更新的时候, 才不会只是增加数据, 之前ID相同的数据就会被覆盖, 从而达到update的效果.删除是建立在上面更新的原理之上, 就是再加一个删除标记的字段, 也就是数据只能软删除, 不能直接删除.以上就是这个方案的实现原理, 缺点就是要多加一个更新时间的字段, 并且数据不能直接删除, 只能软删除, 所以这个方案有一定的局限性, 但是对于大部分操作, 应该都是可以妥协的.实施细节:第一步: 数据表设计你的表, 必须要有一个update_time或同样意思的字段, 表明这条数据修改的时间如果有删除操作的话, 是不可以直接删除数据的, 必须是软删除,就是还得有一个 delete_time或者is_delete或相同意思的字段第二步: 配置logstashinput 和outputinput { jdbc { … statement => “SELECT * FROM article WHERE update_time > :sql_last_value ORDER BY id ASC” tracking_column => ‘update_time’ … }}output { elasticsearch { … document_id => “%{id}” … }}

February 18, 2019 · 1 min · jiezi

ELK(Elasticsearch,Logstash,Kibana) 搭建 同步 MySQL 及 用户权限安全设置

本文章用的elastic相关组件版本: 6.4.1一, 准备ElasticsearchLogstashKibanajdk 8 (主要根据logstash的版本)下载mysql-connector-java.jar找对应MySQL的版本具体每个插件的安装方式请查看官方文档, 很简单的, WIN下的都是绿色版, Linux的安装目录基本是在/usr/share/对应软件,配置文件目录在/etc/对应软件下面.二, 配置1, Logstash的配置input的配置https://www.elastic.co/guide/…增量更新的主要几个参数use_column_value => true //是否使用自定义标记列tracking_column => “id” //指定的列record_last_run => true //是否记录最后运行的指标last_run_metadata_path => “[path]” //记录的指标存储路径, 当多个input的时候, 这个是必须要设置的, 否则多个input会共用一个jdbc_paging_enabled => true //是否启用分页查询jdbc_page_size => “[number]” //每次查询多少statement => “SELECT * FROM db_name WHERE id > :sql_last_value” //记住这里的 :sql_last_value多个input的时候, 需要这个配置 后面的output用用if else时候需要作为判断依据type => ““output配置mappingshttps://www.elastic.co/guide/…template => “mappings配置的路径, 一般json格式"template_name => ““template_overwrite => trueoutput的判断语法if EXPRESSION { //…} else if EXPRESSION { //…} else { //…}2, 安全设置(用户认证相关设置)修改Elasticsearch设置修改elasticsearch.yml, 添加两个配置项:xpack.security.enabled: truexpack.security.transport.ssl.enabled: true在下面更新license的时候, 请将以上设置为false(也可以不设置为false, 影响可能不大)X-PACK设置:不过在用这个命令之前, 是需要依赖x-pack模块的, Elastic 6.3.x后面的版本, 就内置了这个模块,这个模块的使用, 不是免费的, 是收费的, 免费和收费的区别, 网上有破解版, 我暂时用破解版做演示, 请大家还是使用正版吧. 破解版替换完原版文件后, 需要自己去官网申请一个basic授权的license文件, 这是一个json文件, 修改里面的type为platinum,expiry_date_in_millis为2855980923000, 然后再在Kibana的Management的License Management的地方上传修改后的License文件.初始化用户及密码命令:elastic/bin/elastic-set-password这个命令只有两个参数 auto 和 interactive 一个是自动, 一个是交互, 交互的方式就是可以自己设置密码, 自动的我没用过, 这个命令会设置5个用户的密码:elastic,kibana,logstash_system,beats_system,apm_system_users,其中elastic这个用户的权限最大.在完成以上步骤后, 记得重启Elasticsearch和Kibana, 在重启Kibana的时候, 会遇到一些warning和error,先不管error,有两个warning需要先解决, 后面的error自然就没有了. 这两个应该是关于xpack.reporting.encryptionKey和xpack.security.encryptionKey的.参考https://www.elastic.co/guide/… 和 https://www.elastic.co/guide/...xpack.reporting.encryptionKey: “a_random_string"xpack.security.encryptionKey: “something_at_least_32_characters"再重启应该就没有error了, 这个时候就可以用之前设置密码的那几个账号登录了, 用elastic账号登录, 还可以设置其他几个账号的权限了.三, 运行Logstashlogstash -f [指定配置文件的路径.conf] ...

February 14, 2019 · 1 min · jiezi

logstash常用插件介绍

前言在《使用EFK快速搭建安全可靠的日志服务》一文中,我们已经大致介绍了分布式日志服务的各个组件。但是对于很多实现细节,并未涵盖到。本文主要介绍logstash用到的插件,以及对插件的离线打包,避免每次手动安装插件。本文主要分为4部分:插件的离线打包input插件filter插件output插件在使用插件之前,我们先了解一个概念:事件。Logstash 每读取一次数据的行为叫做事件。1.插件的离线打包在生产环境中,logstash可能是部署到多台机器上,如果每次logstash的安装更新都需要手动更新一遍插件成本是非常高的。那我们可以对用到的插件进行offline打包。制作logstash-offline-plugins-x.x.x.zip说明:需要在一台已经安装了logstash的机器上,先给logstash安装好所需要的插件然后再导出插件,具体命令:bin/logstash-plugin update logstash-filter-mutate bin/logstash-plugin install logstash-filter-json_encode bin/logstash-plugin prepare-offline-pack logstash-filter-mutate logstash-filter-json_encode参考elastic的官方文档:https://www.elastic.co/guide/…在logstash安装时通过如下命令进行插件包的安装:bin/logstash-plugin install file://$(pwd)/logstash-offline-plugins.zip安装插件后要及时观察logstash的打印日志,避免因为插件版本冲突导致的logstash启动失败的情况。2.input插件输入插件允许一个特定的事件源可以读取到 Logstash 管道中,配置在 input {} 中,且可以设置多个。 在《使用EFK快速搭建安全可靠的日志服务》一文中我们是分别将input、filter和output三部分的配置放到了不同的配置文件中。input的配置在input.conf文件中。由于我们使用的是EFK,因此需要beats插件。可以参考官网:https://www.elastic.co/guide/…input { beats { port => 5044 client_inactivity_timeout => 600 ssl => true ssl_certificate_authorities => ["/home/work/certificate/chain-ca.pem"] ssl_certificate => “/home/work/certificate/server.crt.pem” ssl_key => “/home/work/certificate/server.key.pem” ssl_verify_mode => “force_peer” }}需要注意的是,在6.4版本以上的beats配置中,多了ssl_peer_metadata,可以拿到证书里的meta信息,可以便于我们后续在logstash上做一些鉴权校验。除了beats插件,inputs类型插件还有很多种,主要列举如下:elasticsearchfilelog4jkafka更多插件配置介绍参考官网:https://www.elastic.co/guide/…3.filter插件filter插件就比较多了。日志接入之后要进行日志的规则匹配、过滤等操作,配置filter是必不可少的。最长用到的filter插件有以下几种:grokdatemutategeoipsplituuidjsonjsonencode3.1 grok插件Grok是logstash最主要的过滤插件,grok是通过系统预定义的正则表达式或者通过自己定义正则表达式来匹配日志中的各个值,安装了logstash后默认会有一些常用软件的日志匹配正则,在使用时自己可以根据自己的日志格式或者对匹配正则进行调整或者直接调用。如果,自己想在其他目录定义正则规则匹配日志,在使用时需要指定正则的路径。对于以下一行日志:55.3.244.1 GET /index.html 15824 0.043使用如下的规则:%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}grok配置示例如下:filter { grok { match => { “message” => “%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}” } }}那么这行日志经过grok之后会得到以下的field:client: 55.3.244.1method: GETrequest: /index.htmlbytes: 15824duration: 0.043常用的配置选项:match:用来对字段的模式进行匹配patterns_dir:用来指定规则的匹配路劲,如果使用logstash自定义的规则时,不需要写此参数。Patterns_dir可以同时制定多个存放过滤规则的目录。语法格式:patterns_dir => [“/ opt / logstash / patterns”,“/ opt / logstash / extra_patterns”]remove_field:如果匹配到某个”日志字段,则将匹配的这个日志字段从这条日志中删除Grok过滤正则规则可以自己根据自己的日志格式自行编写,在编写grok过滤规则时容易出错,此时可以使用grokdebug网站对自己的日志及编写的正则规则进行调试,grokdebug网址为(https://grokdebug.herokuapp.com/)3.2 date插件date用于解析字段中的日期,然后使用该日期或时间戳作为事件的logstash时间戳。对于date插件的介绍,在https://segmentfault.com/a/11…(感觉是官网https://www.elastic.co/guide/… 的翻译版本)中有非常详细的介绍。我来列举一个自己在配置过程中遇到的示例:对于给定的时间格式:2019-01-10T18:11:28.699+08:00需要进行配置的pattern应该如下:date { match => ["[@metadata][log_timestamp]", “yyyy-MM-dd’T’HH:mm:ss.SSSZZ”] timezone => “Asia/Shanghai”}需要注意的是,对于filter.conf配置是可以热加载的,如果时间格式配置错误,会导致logstash进程挂掉,对于问题的排查,可以自行查询logstash的日志。3.3 mutate插件mutate 插件可以在字段上执行变换,包括重命名、删除、替换和修改。这个插件相当常用。比如:你已经根据 Grok 表达式将 Tomcat 日志的内容放到各个字段中,想把状态码、字节大小或是响应时间,转换成整型;你已经根据正则表达式将日志内容放到各个字段中,但是字段的值,大小写都有,这对于 Elasticsearch 的全文检索来说,显然用处不大,那么可以用该插件,将字段内容全部转换成小写。示例如下:filter { mutate { split => [“hostname”, “.”] add_field => { “shortHostname” => “%{hostname[0]}” } } mutate { rename => [“shortHostname”, “hostname” ] }}在笔者之前写的《使用EFK快速搭建安全可靠的日志服务》一文中,主要使用mutate插件来add_field, remove_field, convert,gsub等操作。其中covert可以将字符串类型转换为int/long/float等类型,便于进行聚合等操作。例如:mutate { convert => { “averageSliceQueryTime” => “integer” “maxSliceQueryTime” => “integer” }关于更多filter类型插件可以参考:https://www.elastic.co/guide/…4.output插件和前面介绍的input插件一样,output插件也是最基础最简单的输出插件。在《使用EFK快速搭建安全可靠的日志服务》一文中我们使用的elasticsearch作为最终的输出存储,因此output插件使用的是elasticsearch:output { if “_grokparsefailure” in [tags] or “_jsonparsefailure” in [tags] { file { path => “/home/work/logstash/failure_output/failure-%{+YYYY-MM-dd}.log” } } else { elasticsearch { hosts => [“https://es-host1:9920”,“https://es-host2:9920”,“https://es-host3:9920”] index => “logstash-%{[@metadata][index_name]}-%{+YYYY.MM.dd}” document_id => “%{[@metadata][document_id]}” ssl => true ssl_certificate_verification => true truststore => “/home/work/certificate/truststore.jks” truststore_password => “adofkoe” user => “logstash” password => “dafodmfkamkefadfg” } }}除了elasticsearch,还支持以下输出:emailfileinfluxdbmongodbredis更多可以参考官网:https://www.elastic.co/guide/…总结logstash的各种插件极大丰富了其功能。在实际使用中,我们可以根据自己的实际需求进行相关配置,来构建自己的日志服务系统。笔者此处主要是对自己使用过程中所用到内容的总结。遇到的很多实际问题,可以自行查询官网进行解决。 ...

February 14, 2019 · 1 min · jiezi