关于filebeat:filebeat简介

本篇次要来聊一聊filebeat的装置部署与应用。 概念如果你用过logstash,那么你就会十分疾速的相熟filebeat,它其实就做一件事件:从input读取事件源,通过相应的解析和解决之后,从output输入到指标存储库。 下载首先咱们要去下载安装包,这个比较简单,就不多介绍了 我这里抉择了rpm安装包。 装置我这里用的是linux操作系统,把上一步下载到的安装包放到对应的目录下,而后进入到该目录,执行rpm -ivh filebeat-6.6.0-x86_64.rpm,执行完之后就装置胜利了,咱们能够看到装置的相干门路 配置rpm形式装置完之后,它默认的配置文件门路在/etc/filebeat,如图 filebeat.yml是一个样例文件,咱们能够基于这个文件来改变本人想要的配置项,filebeat.reference.yml是蕴含了所有的配置项的文件,咱们能够从外面拷贝本人想要的项。这里介绍几个咱们罕用的配置项, filebeat.inputs 次要用来配置收集的信息起源,咱们用的比拟多的是如下这种形式来收集日志 type就是收集的类型,enabled就是是否启用,paths就是对应的日志文件。这个节点是反对多个类型的收集配置的,如 filebeat.modules 这个预计大家用的不多或者说不是很熟,其实这个能够了解为对中间件的日志解析模块,很多中间件输入的日志格局都是固定的,只有这里加了对应中间件的module就能够主动帮你解析。通过./filebeat modules list能够查看所有反对的module。 output 这里写output并不是说这是一个配置节点,是因为filebeat反对多种输入,咱们真正在配置的时候是配理论的输入形式,比方output.elasticsearch、output.file等。 更多的配置介绍能够查看官网的配置。 启动执行命令systemctl start filebeat.service就能够启动了。而后执行ps -ef|grep filebeat查看一下 能够看到曾经启动胜利了,如果你发现没有启动胜利,能够执行 cd /usr/bin,在这个目录下执行./filebeat -c /etc/filebeat/filebeat.yml -e,这样会提醒具体的错误信息。比方我这里filebeat用的8.1.2版本,es用的7.9版本,用启动的时候就提醒es版本太老,须要配置output.elasticsearch.allow_older_versions为true。而用systemctl start filebeat.service启动的时候没有任何提醒,连在 /var/log/filebeat/ 和 /var/lib/filebeat/registry/filebeat/ 都没找到错误信息,这里属实有点坑。 重新启动命令systemctl restart filebeat.service。 最初看下es的内容http://ip:9200/_cat/indices?v: 能够看到曾经有了,当然咱们能够自定义索引名称。 集成ElasticSearch如何应用filebeat将日志文件数据同步到es中去呢,这里基于例子来讲。我本地有两个日志文件,别离为/usr/local/log/ucs.20220412.0.log、/usr/local/log/cps-provider.20220412.0.log,filebeat.yml做如下配置: #输出filebeat.inputs:- type: log enabled: true paths: - /usr/local/log/ucs.20220412.0.log tags: ucs- type: log enabled: true paths: - /usr/local/log/cps-provider.20220412.0.log tags: cpssetup.template.name: "my-log"setup.template.pattern: "my-log-*"#输入output.elasticsearch: # Array of hosts to connect to. hosts: ["192.168.10.118:9200"] indices: - index: "ucs-%{+yyyy.MM.dd}" when.contains: tags: "ucs" - index: "cps-%{+yyyy.MM.dd}" when.contains: tags: "cps" #如果是es版本较老,则开启上面这项 #allow_older_versions: true配置实现之后启动filebeat,再拜访es地址http://ip:port/_cat/indices?v看下 ...

April 19, 2022 · 1 min · jiezi

关于filebeat:日志收集-实例

日志收集流程形容留神:当es集群重启后记得在kibana中执行 PUT /_cluster/settings{ "transient": { "cluster": { "max_shards_per_node":10000 } }}tomcat 日志收集filebeat conf[root@tomcat-prod_20 ~]# cd /data/work/filebeat-5.5.2/[root@tomcat-prod_20 filebeat-5.5.2]# cat filebeat.yml filebeat.prospectors:- input_type: log paths: - /data/WEBLOG/prod-ecommerce-app/catalina.out document_type: tykh_insurance_ecommerce-app_78pro multiline: pattern: '^\d{4}(\-|\/|\.)\d{1,2}(\-|\/|\.)\d{1,2}' negate: true match: after max_lines: 100 timeout: 3s fields: logtype: tykh_insurance_ecommerce-app_78protail_files: falseoutput.kafka: enabled: true hosts: ["10.100.20.1xx:9092","10.100.20.1x1:9092","10.100.20.1x2:9092"] topic: tykh-140 compression: gzip max_message_bytes: 1000000 required_acks: 1logstash[root@localhost conf.d]# cat insurace-140.conf input { kafka { bootstrap_servers => ["10.100.20.1xx:9092,10.100.20.1x1:9092,10.100.20.1x2:9092"] topics => ["tykh-140"] codec => "json" consumer_threads => 1 #auto_offset_reset => "earliest" auto_offset_reset => "latest" group_id => "tykh-140" decorate_events => true max_partition_fetch_bytes => "52428700" max_poll_records => "200" session_timeout_ms => "50000" request_timeout_ms => "510000" heartbeat_interval_ms => "1000" }}filter { grok { patterns_dir => [ "/etc/logstash/patterns.d" ] match => [ "message", "%{TIMESTAMP_ISO8601:log_time}\s+\[%{THREADID:threadId}\]\s+\[%{THREADNAME:traceid}\]\s+%{LOGLEVEL:level}\s+%{JAVACLASS:javaclass}\s+\-\s+%{JAVAMESSAGE:javameassage}","message", "%{TIMESTAMP_ISO8601:log_time}\s+\[%{THREADID_1:threadId}\]\s+%{LOGLEVEL:level}\s+%{JAVACLASS:javaclass}\s+\-\s+%{JAVAMESSAGE:javameassage}","message","%{TIMESTAMP_ISO8601:log_time}\s+%{TID:TID}\s+\[%{THREADID_1:threadId}\]\s+%{LOGLEVEL:level}\s+%{JAVACLASS:javaclass}\s+\-\s+%{JAVAMESSAGE:javameassage}"] remove_field => [ "message","beat","timestamp","topic","hostname","name","index","host","tags"] } ruby { code => "event.timestamp.time.localtime" } date {match=>["log_time","yyyy-MM-dd HH:mm:ss.SSS"]}}output { if [fields][logtype] == "tykh_insurance_ecommerce-app_78pro" { elasticsearch { hosts => ["10.100.20.1xx:9200","10.100.20.1xx:9200","10.100.20.1x8:9200"] index => "tykh_insurance_ecommerce-app_78pro%{+YYYY-MM-dd}" user => elasxxx password => "elasticsearcxxx" } stdout { codec => rubydebug } }}k8s logs (在jenkins )[root@insurace-24 ~]# cat /root/docker/scripts/install_logstash.sh#!/bin/bashconfpath=~/docker/scripts/confrepo=harborxx.reg/pre_jinfuapp=$1topics_pattern=$2profile=$3project=$4master_host=10.100.24.xxyaml_host=http://10.100.24.1x2:8889cd $confpathmkdir -p $app/$profileecho "---logstash-configmap.yaml---"cat logstash-configmap-template.yaml | sed "s|#topics_pattern#|$topics_pattern|g" | sed "s|#project#|$project|g" | sed "s|#profile#|$profile|g"cat logstash-configmap-template.yaml | sed "s|#topics_pattern#|$topics_pattern|g" | sed "s|#project#|$project|g" | sed "s|#profile#|$profile|g" > $app/$profile/logstash-configmap.yamlecho "---logstash.yaml---"cat logstash-template.yaml | sed "s|#topics_pattern#|$topics_pattern|g" | sed "s|#project#|$project|g" | sed "s|#profile#|$profile|g" cat logstash-template.yaml | sed "s|#topics_pattern#|$topics_pattern|g" | sed "s|#project#|$project|g" | sed "s|#profile#|$profile|g" > $app/$profile/logstash.yamlssh $master_host "kubectl apply -f $yaml_host/$app/$profile/logstash-configmap.yaml && kubectl apply -f $yaml_host/$app/$profile/logstash.yaml"logstash-template.yaml[root@insurace-24 conf]# cat logstash-template.yamlapiVersion: apps/v1kind: Deploymentmetadata: name: logstash-#topics_pattern#-#profile# namespace: defaultspec: selector: matchLabels: app: logstash-#topics_pattern#-#profile# template: metadata: labels: app: logstash-#topics_pattern#-#profile# spec: containers: - name: logstash-#topics_pattern#-#profile# image: harborxx.reg/library/logstash:7.6.2.1 imagePullPolicy: IfNotPresent command: - logstash - '-f' - '/etc/logstash_c/logstash-#project#-#topics_pattern#-#profile#.conf' volumeMounts: - name: config-volume mountPath: /etc/logstash_c/ resources: limits: cpu: 1000m memory: 1348Mi volumes: - name: config-volume configMap: name: logstash-#project#-#topics_pattern#-#profile# items: - key: logstash-#project#-#topics_pattern#-#profile#.conf path: logstash-#project#-#topics_pattern#-#profile#.conf/root/docker/scripts/install_logstash.sh prodpipeline-assessment-back e-assessment-back profile-a insurance---logstash-configmap.yaml---kind: ConfigMapapiVersion: v1metadata: name: logstash-insurance-e-assessment-back-profile-a namespace: defaultdata: logstash-insurance-e-assessment-back-profile-a.conf: | input { kafka { bootstrap_servers => ["10.100.24.xx:9092"] topics_pattern => "e-assessment-back.*" codec => "json" consumer_threads => 5 auto_offset_reset => "latest" group_id => "e-assessment-back" client_id => "e-assessment-back" decorate_events => true #auto_commit_interval_ms => 5000 } } filter { json { source => "message" } date { match => [ "timestamp" ,"dd/MMM/YYYY:HH:mm:ss Z" ] } mutate { remove_field => "timestamp" } if "_geoip_lookup_failure" in [tags] { drop { } } } output { elasticsearch { hosts => ["10.100.24.xx:9200"] index => "logstash-insurance-e-assessment-back-%{+YYYY-MM-dd}" user => elastic password => "Elasticsearch_Insuance24*#" } stdout { codec => rubydebug } }---logstash.yaml---apiVersion: apps/v1kind: Deploymentmetadata: name: logstash-e-assessment-back-profile-a namespace: defaultspec: selector: matchLabels: app: logstash-e-assessment-back-profile-a template: metadata: labels: app: logstash-e-assessment-back-profile-a spec: containers: - name: logstash-e-assessment-back-profile-a image: harborxx.reg/library/logstash:7.6.2.1 imagePullPolicy: IfNotPresent command: - logstash - '-f' - '/etc/logstash_c/logstash-insurance-e-assessment-back-profile-a.conf' volumeMounts: - name: config-volume mountPath: /etc/logstash_c/ resources: limits: cpu: 1000m memory: 1348Mi volumes: - name: config-volume configMap: name: logstash-insurance-e-assessment-back-profile-a items: - key: logstash-insurance-e-assessment-back-profile-a.conf path: logstash-insurance-e-assessment-back-profile-a.confconfigmap/logstash-insurance-e-assessment-back-profile-a createddeployment.apps/logstash-e-assessment-back-profile-a created

July 12, 2021 · 3 min · jiezi

关于filebeat:FileBeat-简单介绍并且收集本地日志数据传入到logStash-在传入到es中并且使用kibana进行日志查看

一、FileBeat插件 https://www.cnblogs.com/zsql/... 首先filebeat是Beats中的一员。 Filebeat是用于转发和集中日志数据的轻量级传送工具。Filebeat监督您指定的日志文件或地位,收集日志事件,并将它们转发到Elasticsearch或 Logstash进行索引。 logstash与filebeat的关系:因为logstash是jvm跑的,资源耗费比拟大,所以起初作者又用golang写了一个性能较少然而资源耗费也小的轻量级的logstash-forwarder。最新我的项目名就是filebeat了二、filebeat原理 1、形成 由两个组件形成,别离是inputs(输出)和harvesters(收集器),harvester负责读取单个文件的内容。harvester逐行读取每个文件,并将内容发送到输入。一个input负责管理harvesters和寻找所有起源读取。如果input类型是log,则input将查找驱动器上与定义的门路匹配的所有文件,并为每个文件启动一个harvester。每个input在它本人的Go过程中运行,Filebeat以后反对多种输出类型。每个输出类型能够定义屡次。日志输出查看每个文件,以查看是否须要启动harvester、是否曾经在运行harvester或是否能够疏忽该文件2、filebeat如何保留文件的状态 Filebeat保留每个文件的状态,并常常将状态刷新到磁盘中的注册表文件中。该状态用于记住harvester读取的最初一个偏移量,并确保发送所有日志行。3、filebeat何如保障至多一次数据生产 Filebeat保障事件将至多传递到配置的输入一次,并且不会失落数据。是因为它将每个事件的传递状态存储在注册表文件中。在已定义的输入被阻止且未确认所有事件的状况下,Filebeat将持续尝试发送事件,直到输入确认已接管到事件为止。如果Filebeat在发送事件的过程中敞开,它不会期待输入确认所有事件后再敞开。当Filebeat重新启动时,将再次将Filebeat敞开前未确认的所有事件发送到输入。这样能够确保每个事件至多发送一次,但最终可能会有反复的事件发送到输入。通过设置shutdown_timeout选项,能够将Filebeat配置为在关机前期待特定工夫三、装置 https://www.elastic.co/cn/downloads/past-releases/filebeat-7-6-2 装置windows版 四、执行操作具体流程如下 1、启动logstash 在logstash中创立stdin.confinput { beats { port => "5044" }} output { elasticsearch { hosts => ["es的ip地址:9200"] index => "es_index20210311" } stdout { codec => json_lines }}先启动logstash:./logstash -f stdin.conf --config.reload.automatic2、运行filebeat 批改filebeat.yml文件filebeat.inputs: - type: log enabled: true paths: - c:\Users\34683\AppData\Local\JetBrains\IntelliJIdea2020.3\log\idea.log output.logstash: # The Logstash hosts hosts: ["localhost:5044"]再启动:./filebeat -e -c filebeat.yml -d "publish"参考起源:https://www.cnblogs.com/peter...https://www.cnblogs.com/peter... 留神:呈现以下谬误不要缓和 Non-zero metrics in the last 30s {"monitoring": {"metrics": {"beat":{"cpu":{"system":{"ticks":359},"total":{"ticks":是input的文件没有变动了,只是一种提醒,是你已近操作了,信息记录在了data中,能够间接将data文件删除探后将es日志删掉,具体意思它指的是曾经将你要的数据导入到了es中,当日志没有更新就会始终刷这个。(嘿嘿,这是我本人认为的只是跑起来,具体没有深刻探索了) ...

March 30, 2021 · 6 min · jiezi

关于filebeat:ES关于windows环境下安装filebeat环境

一、下载filebeat下载地址:filebeat官网下载地址 WINDOWS 64-BITsha二、装置filebeat1.将压缩包放到C盘目录下进行解压2.批改配置文件filebeat.yml- type: log enabled: true encoding: utf-8 paths: - c:\programdata\elasticsearch\logs\* fields: logtype: test group: test server: test10 fields_under_root: true exclude_lines: ['poll','running','Content-Length']配置文件具体细节请参考:官网filebeat配置文件配置 3.后盾通过powershell启动filebeat服务管理员启动powershell,切换到filebeat目录;运行install-service-filebeat.ps1, 装置filebeat为服务;启动命令: start-Service filebeat 4.常见谬误如果遇到如上谬误(脚本执行策略的问题),解决形式如下: 以管理员身份运行PowerShell执行:get-ExecutionPolicy.查看PowerShell脚本运行权限政策执行:set-ExecutionPolicy RemoteSigned即可装置完当前再批改回来 Set-ExecutionPolicy Restricted5.也可通过以下命令测试filebeat运行测试命令:.\filebeat.exe -c .\filebeat.yml --configtest运行命令:.\filebeat.exe -e -c .\filebeat.yml参考附录windows增加filebeat收集日志Powershell-"无奈加载文件,因为在此零碎上禁止运行脚本"windows环境应用filebeat模块收集日志

March 16, 2021 · 1 min · jiezi

容器日志采集利器filebeat深度剖析与实践

在云原生时代和容器化浪潮中,容器的日志采集是一个看起来不起眼却又无法忽视的重要议题。对于容器日志采集我们常用的工具有filebeat和fluentd,两者对比各有优劣,相比基于ruby的fluentd,考虑到可定制性,我们一般默认选择golang技术栈的filbeat作为主力的日志采集agent。 相比较传统的日志采集方式,容器化下单节点会运行更多的服务,负载也会有更短的生命周期,而这些更容易对日志采集agent造成压力,虽然filebeat足够轻量级和高性能,但如果不了解filebeat的机制,不合理的配置filebeat,实际的生产环境使用中可能也会给我们带来意想不到的麻烦和难题。 整体架构日志采集的功能看起来不复杂,主要功能无非就是找到配置的日志文件,然后读取并处理,发送至相应的后端如elasticsearch,kafka等。 filebeat官网有张示意图,如下所示:针对每个日志文件,filebeat都会启动一个harvester协程,即一个goroutine,在该goroutine中不停的读取日志文件,直到文件的EOF末尾。一个最简单的表示采集目录的input配置大概如下所示: filebeat.inputs:- type: log # Paths that should be crawled and fetched. Glob based paths. paths: - /var/log/*.log不同的harvester goroutine采集到的日志数据都会发送至一个全局的队列queue中,queue的实现有两种:基于内存和基于磁盘的队列,目前基于磁盘的队列还是处于alpha阶段,filebeat默认启用的是基于内存的缓存队列。 每当队列中的数据缓存到一定的大小或者超过了定时的时间(默认1s),会被注册的client从队列中消费,发送至配置的后端。目前可以设置的client有kafka、elasticsearch、redis等。 虽然这一切看着挺简单,但在实际使用中,我们还是需要考虑更多的问题,例如: 日志文件是如何被filbebeat发现又是如何被采集的?filebeat是如何确保日志采集发送到远程的存储中,不丢失一条数据的?如果filebeat挂掉,下次采集如何确保从上次的状态开始而不会重新采集所有日志?filebeat的内存或者cpu占用过多,该如何分析解决?filebeat如何支持docker和kubernetes,如何配置容器化下的日志采集?想让filebeat采集的日志发送至的后端存储,如果原生不支持,怎样定制化开发?这些均需要对filebeat有更深入的理解,下面让我们跟随filebeat的源码一起探究其中的实现机制。 一条日志是如何被采集的filebeat源码归属于beats项目,而beats项目的设计初衷是为了采集各类的数据,所以beats抽象出了一个libbeat库,基于libbeat我们可以快速的开发实现一个采集的工具,除了filebeat,还有像metricbeat、packetbeat等官方的项目也是在beats工程中。 如果我们大致看一下代码就会发现,libbeat已经实现了内存缓存队列memqueue、几种output日志发送客户端,数据的过滤处理processor等通用功能,而filebeat只需要实现日志文件的读取等和日志相关的逻辑即可。 从代码的实现角度来看,filebeat大概可以分以下几个模块: input: 找到配置的日志文件,启动harvesterharvester: 读取文件,发送至spoolerspooler: 缓存日志数据,直到可以发送至publisherpublisher: 发送日志至后端,同时通知registrarregistrar: 记录日志文件被采集的状态1. 找到日志文件对于日志文件的采集和生命周期管理,filebeat抽象出一个Crawler的结构体,在filebeat启动后,crawler会根据配置创建,然后遍历并运行每个input: for _, inputConfig := range c.inputConfigs { err := c.startInput(pipeline, inputConfig, r.GetStates()) }在每个input运行的逻辑里,首先会根据配置获取匹配的日志文件,需要注意的是,这里的匹配方式并非正则,而是采用linux glob的规则,和正则还是有一些区别。 matches, err := filepath.Glob(path)获取到了所有匹配的日志文件之后,会经过一些复杂的过滤,例如如果配置了exclude_files则会忽略这类文件,同时还会查询文件的状态,如果文件的最近一次修改时间大于ignore_older的配置,也会不去采集该文件。 2. 读取日志文件匹配到最终需要采集的日志文件之后,filebeat会对每个文件启动harvester goroutine,在该goroutine中不停的读取日志,并发送给内存缓存队列memqueue。 在(h *Harvester) Run()方法中,我们可以看到这么一个无限循环,省略了一些逻辑的代码如下所示: for { message, err := h.reader.Next() if err != nil { switch err { case ErrFileTruncate: logp.Info("File was truncated. Begin reading file from offset 0: %s", h.state.Source) h.state.Offset = 0 filesTruncated.Add(1) case ErrRemoved: logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.state.Source) case ErrRenamed: logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.state.Source) case ErrClosed: logp.Info("Reader was closed: %s. Closing.", h.state.Source) case io.EOF: logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.state.Source) case ErrInactive: logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive) default: logp.Err("Read line error: %v; File: %v", err, h.state.Source) } return nil } ... if !h.sendEvent(data, forwarder) { return nil }}可以看到,reader.Next()方法会不停的读取日志,如果没有返回异常,则发送日志数据到缓存队列中。 返回的异常有几种类型,除了读取到EOF外,还会有例如文件一段时间不活跃等情况发生会使harvester goroutine退出,不再采集该文件,并关闭文件句柄。 filebeat为了防止占据过多的采集日志文件的文件句柄,默认的close_inactive参数为5min,如果日志文件5min内没有被修改,上面代码会进入ErrInactive的case,之后该harvester goroutine会被关闭。 这种场景下还需要注意的是,如果某个文件日志采集中被移除了,但是由于此时被filebeat保持着文件句柄,文件占据的磁盘空间会被保留直到harvester goroutine结束。 ...

July 10, 2019 · 3 min · jiezi

1Filebeat-环境搭建

Mac OS# 解压即用$ tar -zxvf ~/dev/doc/filebeat-6.7.1-darwin-x86_64.tar.gz -C ~/dev/tools/# 下面是个人习惯$ ln -s ~/dev/tools/filebeat-6.7.1-darwin-x86_64/ ~/dev/tools/filebeat$ vim ~/.bash_profile export FILEBEAT_HOME=/Users/baozi/dev/tools/filebeat export PATH=$PATH:$FILEBEAT_HOME$ source ~/.bash_profile$ filebeat -version

April 27, 2019 · 1 min · jiezi

2Filebeat-概述

Filebeat 简介Filebeat是一个轻量级的,用于转发与集中日志文件的数据采集程序。<br/>Filebeat监控日志文件或者你指定的目录,然后收集日志事件(数据),转发到Logstash或者Elasticsearch。<br/> Filebeat 工作原理Filebeat启动后,会启动一个或多个Input,查找指定的日志文件的位置。对于找到的每一个日志文件,都会启动一个Harvester。Harvester会将数据发送到libbeat。libbeat聚合事件(数据)并将数据发送到指定的输出。

April 27, 2019 · 1 min · jiezi

EFK接入kafka消息队列

1 前言在笔者最开始维护的日志服务中,日质量较小,没有接入kafka。随着业务规模扩增,日质量不断增长,接入到日志服务的产品线不断增多,遇到流量高峰,写入到es的性能就会降低,cpu打满,随时都有集群宕机的风险。因此,接入消息队列,进行削峰填谷就迫在眉睫。本文主要介绍在EFK的基础上如何接入kafka,并做到向前兼容。 2 主要内容如何搭建kafka集群原有EFK升级3 搭建kafka集群3.1 搭建zookeeper集群主要参考文章:【zookeeper安装指南】由于是要线上搭建集群,为避免单点故障,就需要部署至少3个节点(取决于多数选举机制)。 3.1.1 下载进入要下载的版本的目录,选择.tar.gz文件下载 3.1.2 安装使用tar解压要安装的目录即可,以3.4.5版本为例这里以解压到/home/work/common,实际安装根据自己的想安装的目录修改(注意如果修改,那后边的命令和配置文件中的路径都要相应修改) tar -zxf zookeeper-3.4.5.tar.gz -C /home/work/common3.1.3 配置在主目录下创建data和logs两个目录用于存储数据和日志: cd /home/work/zookeeper-3.4.5mkdir data mkdir logs在conf目录下新建zoo.cfg文件,写入如下配置: tickTime=2000 dataDir=/home/work/common/zookeeper1/data dataLogDir=/home/work/common/zookeeper1/logs clientPort=2181 initLimit=5 syncLimit=2 server.1=192.168.220.128:2888:3888 server.2=192.168.222.128:2888:3888 server.3=192.168.223.128:2888:3888在zookeeper1的data/myid配置如下: echo '1' > data/myidzookeeper2的data/myid配置如下: echo '2' > data/myidzookeeper2的data/myid配置如下: echo '3' > data/myid3.1.4 启停进入bin目录,启动、停止、重启分和查看当前节点状态(包括集群中是何角色)别执行: ./zkServer.sh start ./zkServer.sh stop ./zkServer.sh restart./zkServer.sh statuszookeeper集群搭建完成之后,根据实际情况开始部署kafka。以部署2个broker为例。 3.2 搭建kafka broker集群3.2.1 安装下载并解压包: curl -L -O http://mirrors.cnnic.cn/apache/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz tar zxvf kafka_2.10-0.9.0.0.tgz 3.2.2 配置进入kafka安装工程根目录编辑config/server.properties #不同的broker对应的id不能重复broker.id=1delete.topic.enable=trueinter.broker.protocol.version=0.10.0.1log.message.format.version=0.10.0.1listeners=PLAINTEXT://:9092,SSL://:9093auto.create.topics.enable=falsessl.key.password=testssl.keystore.location=/home/work/certificate/server-keystore.jksssl.keystore.password=testssl.truststore.location=/home/work/certificate/server-truststore.jksssl.truststore.password=testnum.network.threads=3num.io.threads=8socket.send.buffer.bytes=102400socket.receive.buffer.bytes=102400socket.request.max.bytes=104857600log.dirs=/home/work/data/kafka/lognum.partitions=1num.recovery.threads.per.data.dir=1offsets.topic.replication.factor=1transaction.state.log.replication.factor=1transaction.state.log.min.isr=1log.retention.hours=72log.segment.bytes=1073741824log.retention.check.interval.ms=300000zookeeper.connect=192.168.220.128:2181,192.168.222.128:2181,192.168.223.128:2181zookeeper.connection.timeout.ms=6000group.initial.rebalance.delay.ms=03.2.3 启动kafka进入kafka的主目录 nohup sh bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &3.2.4 连通性测试首先创建一个topic:topic_1 ...

April 24, 2019 · 2 min · jiezi

filebeat读取本地文件到ELK简单配置

filebeat读取本地文件到ELK简单配置前言阅读本文前请确保已经部署ELK,本文是基于window server如未部署请参考window环境下搭建简单ELK日志收集相比较之前用logback + redis + elk 收集日志的方式,filebeat的方式可以减少对系统的耦合性,不会对运行的系统造成干扰。依赖组件filebeat-5.6.14-windows-x86_64官网下载地址 最新版本6.6filebeat官方下载5.6到6.6 跨度太大,如果elk使用6.X的版本需要注意下参数配置,有些参数在6.X的版本已经取消了配置filebeat下载下来之后解压,修改配置文件 filebeat.yml配置文件里面其实已经有详细的说明,5.X跟6.X的版本不太一样,不过测试的时候发现5.X的配置在6.X的filebeat也能用,只是启动会提示你该参数已经过时,建议更换。首先需要定义输入源和输出源输入源那边定义文件夹地址,支持读取单个文件或者是多个文件,输出可以输出到redis、logstash和es可以参考官网的配置:filebeat配置filebeat.prospectors:# Each - is a prospector. Most options can be set at the prospector level, so# you can use different prospectors for various configurations.# Below are the prospector specific configurations.- input_type: log # Paths that should be crawled and fetched. Glob based paths. paths: - c:\programdata\elasticsearch\logs*.log # Optional additional fields. These field can be freely picked # to add additional information to the crawled log files for filtering # 这边这个自已定参数名,例如这边定义了个type名字叫service-debug # 在logstash那边可以用这个type参数作为es的索引名称 fields: type: service-debug # level: debug # review: 1 ##————————– es output ————————————#output.elasticsearch: # Array of hosts to connect to. #hosts: [“localhost:9200”] # Optional protocol and basic auth credentials. #protocol: “https” #username: “elastic” #password: “changeme” ##————————– logstash output —————————– output.logstash: # The Logstash hosts hosts: [“127.0.0.1:5044”] 如果需要通过redis输出,只需要把输出配置改成用redisoutput.redis: hosts: [“127.0.0.1”] port: 6379 #password: “test” db: 2 timeout: 5 key: “service-debug"修改logstash配置如果是filebeat直接输出logstash,配置如下,如果是通过redis,请参考上面文章input{ beats { port => 5044 }}filter {}output { if [fields][type]==“service-debug” { elasticsearch { hosts => [“127.0.0.1:9200”] index => “service-debug-%{+YYYY.MM.dd}” } } stdout{}}配置完成后启动filebeat,启动参数 filebeat.exe -e -c filebeat.yml启动完成后,当日志文件有发生变化时候,filebeat会把相关日志输出到logstashEND ...

March 15, 2019 · 1 min · jiezi