关于influxdb:ETL-是什么-ETL-工具有哪些-ETL-数据交换系统

ETL简介ETL是英文Extract-Transform-Load的缩写。用来形容将数据从源端通过抽取(extract)、转换(transform)、加载(load)至目标端的过程。它可能对各种散布的、异构的源数据(如关系数据)进行抽取。依照事后设计的规定将不残缺数据、反复数据以及谬误数据等“脏"数据内容进行荡涤。失去符合要求的“洁净”数据,并加载到数据仓库中进行存储。这些“洁净”数据就成为了数据分析、数据挖掘的基石。 ETL重要性ETL是实现商务智能(Business Intelligence,BI)的外围。个别状况下,ETL会破费整个BI我的项目三分之一的工夫,因而ETL设计得好坏间接影响BI我的项目的成败。ETL工具有哪些 datastage (免费) 最业余的ETL工具, 2005年被IBM收买,目前倒退到11.7版本。 informatica (免费)informatica创建于1993年,业余水平与Datastage旗鼓相当。 ODI (免费)oracle数据库厂商提供的工具,有局限性,与oracle数据库耦合太深。 kettle(收费)Kettle是一款国外开源的ETL工具,纯java编写,能够在Windows、Linux、Unix上运行,数据抽取高效稳固,但学习及保护老本太高。etl-engine (收费)用go语言实现的ETL工具,轻量级引擎、跨平台(windows,linux,unix,mac)、可嵌入go语言脚本并解析执行,不便集成到各种我的项目中收费下载(GitHub - hw2499/etl-engine: etl engine 轻量级 跨平台 ETL引擎 数据抽取-转换-装载)。

January 12, 2023 · 1 min · jiezi

关于influxdb:Centos-MacBook-Docker离线安装InfluxDB超级简单

CentOS离线装置InfluxDBInfluxDB和IotDB介绍与性能比照 InfluxDB官网下载地址 #解压tar -zxvf influxdb-1.7.10-static_linux_amd64.tar.gz#nohup后盾启动,指定配置文件,输入日志到log.file文件nohup /home/influxdb-1.7.10-1/influxd -config /home/influxdb-1.7.10-1/influxdb.conf >> /home/influxdb-1.7.10-1/log.file 2>&1 &#减少 开机自启动vim /etc/rc.d/rc.local#关上后, 最初增加如下命令nohup /home/influxdb-1.7.10-1/influxd -config /home/influxdb-1.7.10-1/influxdb.conf >> /home/influxdb-1.7.10-1/log.file 2>&1 &终端连贯InfluxDBcd /home/influxdb-1.7.10-1/#连贯InfluxDB 默认没有设置用户名和明码./influx#如果设置了用户名和明码时./influx -username 'admin' -password 'abcd_2021'#近程连贯其余InfluxDB./influx -host 192.168.1.2 -port 8086 -username admin -password abcd_2021开启udp批改配置文件: influxdb.conf [[udp]] enabled = true 开启udp Java调用时 influxDB.write(8089, builder.build()); MacBook离线装置InfluxDB#下载wget https://dl.influxdata.com/influxdb/releases/influxdb-1.6.4_darwin_amd64.tar.gz#解压tar -zxvf influxdb-1.6.4_darwin_amd64.tar.gz#进入目录cd software/influxdb-1.6.4-1/usr/bin #启动服务,默认配置启动./influxd#指定配置启动./influxd -config ../../etc/influxdb/influxdb.conf #连贯influxDB./influx#近程连贯其余InfluxDB./influx -host 192.168.1.2 -port 8086 -username admin -password abcd_2021Docker装置InfluxDBCentos7离线装置Docker #下载镜像docker pull influxdb#运行容器,把配置文件挂载到宿主机/data/influxdb目录,把数据文件也挂载进去,避免数据失落, 挂载时区,不然默认是0时区docker run -d --name my-influxdb \-p 8086:8086 \-p 8083:8083 \-p 2003:2003 \-e INFLUXDB_GRAPHITE_ENABLED=true \-v /data/influxdb/conf/influxdb.conf:/etc/influxdb/influxdb.conf \-v /data/influxdb:/var/lib/influxdb \-v /etc/localtime:/etc/localtime \influxdbInfluxDB应用默认是不必用户名和明码的, 是否开启认证,默认值:false ...

August 17, 2022 · 2 min · jiezi

关于influxdb:InfluxDB和IotDB介绍与性能对比

InfluxDB简介InfluxDB 是用Go语言编写的一个开源分布式时序、事件和指标数据库,无需内部依赖。用于存储和剖析工夫序列数据的开源数据库。适宜存储设备性能、日志、物联网传感器等带工夫戳的数据,其设计指标是实现分布式和程度伸缩扩大。InfluxDB 包含用于存储和查问数据,在后盾解决ETL或监督和警报目标,用户仪表板以及可视化和摸索数据等的API。 InfluxDB是一个由InfluxData开发的开源时序型数据。它由Go写成,着力于高性能地查问与存储时序型数据。InfluxDB被广泛应用于存储系统的监控数据,IoT行业的实时数据等场景。 相似的数据库有Elasticsearch、Graphite、IotDB等。 InfluxDB长处专为工夫序列数据编写的自定义高性能数据存储。 TSM引擎容许高摄取速度和数据压缩齐全用 Go 语言编写。 它编译成单个二进制文件,没有内部依赖项简略,高性能的写入和查问HTTP API插件反对其余数据提取协定,如Graphite,collectd和OpenTSDB专为相似SQL的查询语言量身定制,可轻松查问聚合数据标签容许对系列进行索引以实现疾速无效的查问保留策略无效地主动使过期数据过期间断查问主动计算聚合数据,以进步频繁查问的效率InfluxDB毛病InfluxDB 的开源版本只反对一个节点。开源版本没有集群性能,集群版本须要免费存在前后版本兼容问题存储引擎在变动IotDB简介Apache IoTDB 是用Java语言编写的, 是专为物联网时序数据打造的数据库,提供数据采集、存储、剖析的性能。IoTDB 提供端云一体化的解决方案,在云端,提供高性能的数据读写以及丰盛的查问能力,针对物联网场景定制高效的目录组织构造,并与 Apache Hadoop、Spark、Flink 等大数据系统无缝买通;在边缘端,提供轻量化的 TsFile 治理能力,端上的数据写到本地 TsFile,并提供肯定的根底查问能力,同时反对将 TsFile 数据同步到云端。 IotDB长处国产我的项目,当初国家鼎力推广自主研发,国产化. IotDB是清华自研工夫序列数据库,Apache 孵化我的项目,2014年我的项目启动,2018年11月18号 iotdb 正式进入 apache 孵化器,成为中国高校首个进入 apache 孵化器的我的项目。为用户提供数据收集、存储和剖析等特定的服务轻量级的构造、高性能和可用的个性,以及与Hadoop和Spark生态的无缝集成,满足了工业IoTDB畛域中海量数据存储、高吞吐量数据写入和简单数据分析的需要。灵便的部署策略。IoTDB为用户提供了一个在云平台或终端设备上的一键装置工具,以及一个连贯云平台和终端上的数据的数据同步工具。硬件成本低。IoTDB能够达到很高的磁盘存储压缩比。高效的目录构造。IoTDB反对智能网络设备对简单工夫序列数据结构的高效组织,同类设施对工夫序列数据的组织,海量简单工夫序列数据目录的含糊搜寻策略。高吞吐量读写。IoTDB反对数以百万计的低功耗设施的强连贯数据拜访、高速数据读写,实用于上述智能网络设备和混合设施。丰盛的查问语义。IoTDB反对跨设施和测量的工夫序列数据的工夫对齐、工夫序列字段的计算(频域转换)和工夫维度的丰盛聚合函数反对。学习老本非常低。IoTDB反对相似sql的语言、JDBC规范API和易于应用的导入/导出工具。与先进的开放源码生态系统的无缝集成。IoTDB反对剖析生态系统,如Hadoop、Spark和可视化工具(如Grafana)。IotDB毛病目前只有单节点版本,不过集群版本马上要公布了IoTDB TsFile 的构造,目前仅有 java 版本,资源占用方面对边缘轻量级设施不敌对,限度了其在端/设施侧的利用。存储上反对应用 HDFS 或 本地盘,通过应用 HDFS 来存储可保障存储层高可用,但计算层没有进一步的高可用保障。关系数据库VS键值数据库关系数据库键值数据库单表列数下限: MySQL InnoDB 为1017列可治理海量条工夫序列单表行数不易过多:小于1000万行查问受限(表达能力低)如下:程度、垂直分表;分库按工夫维度的查问,按值维度的查问,多序列的工夫对齐查问时序数据库基于关系数据库基于键值数据库原生时序数据库轻量级时序数据库TimescaleopentsdbKairosDBinfluxdbIodDB基于PG开发的插件基于Hbase/Cassandra基于LSM机制的时序库工业畛域千万条量级工夫序列治理 时序数据主动分区时序分区键专属文件构造单节点万亿数据点治理查问打算做优化定时工作构建索引专属查问优化单节点数十TB级工夫序列数据管理定制并行查问 反对Hadoop、Spark、Matlab、Grafana等多种生态随着导入工夫的减少导入速率一直降落压缩比低,查问慢长期历史数据查问性能降落高效的数据长久化,丰盛/低提早的数据查问InfluxDB和IotDB性能测试测试配置总数据量1500 亿 pointsclient10group num10device num100sensor num100每个 sensor1500 万 pointsencodingRLE继续高压力写入测试。 测试后果 elapseTime(h)elapseRate(points/s)accuTime(h)accuRate(points/s)IoTDB25.51632058.8213.23156988.07InfluxDB38.71077684.2225.81617748.51测试共生成 1500 亿个 points,InfluxDB 总耗时 38.7 小时,IoTDB 总耗时 25.5 小时。测试 过程中,InfluxDB 和 IoTDB 都放弃了安稳的插入速度,中途没有出现异常。 ...

August 17, 2022 · 1 min · jiezi

关于influxdb:influxdb-使用

在InfluxDB当中,并没有表(table)这个概念,取而代之的是MEASUREMENTS,MEASUREMENTS的性能与传统数据库中的表统一,因而咱们也能够将MEASUREMENTS称为InfluxDB中的表。create database test17 CREATE USER testuser WITH PASSWORD 'testpwd' ## 创立用户和设置明码GRANT ALL PRIVILEGES ON cadvisor TO testuser ## 受权数据库给指定用户CREATE RETENTION POLICY "cadvisor_retention" ON "cadvisor" DURATION 30d REPLICATION 1 DEFAULT ## 创立默认的数据保留策略,设置保留工夫30天,正本为1新建表 InfluxDB中没有显式的新建表的语句,只能通过insert数据的形式来建设新curl -i -XPOST 'http://localhost:8086/write?db=test' --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000' show retention policies on test; alter retention policy "autogen" on "test" duration 240h replication 1 default;

April 6, 2022 · 1 min · jiezi

关于influxdb:InfluxDB一键安装入门教程

InfluxDB是一个由InfluxData开发的开源时序型数据库,专一于海量时序数据的高性能读、高性能写、高效存储与实时剖析等,在DB-Engines Ranking时序型数据库排行榜上排名第一,广泛应用于DevOps监控、IoT监控、实时剖析等场景。InfluxDB部署简略、使用方便,在技术实现上充分利用了Go语言的个性,无需任何内部依赖即可独立部署。提供相似于SQL的查询语言,接口敌对,使用方便。丰盛的聚合运算和采样能力,提供灵便的数据保留策略(Retention Policy)来设置数据的保留工夫和正本数,在保障数据可靠性的同时,及时删除过期数据,开释存储空间,提供灵便的间断查问(Continues Query)来实现对海量数据的采样。反对协定品种多,除了HTTP、UDP等原生协定,还兼容CollectD、Graphite、OpenTSDB、Prometheus等组件的通信协定。次要个性有:1、内置HTTP接口,使用方便2、数据能够打标记,这样查问能够很灵便3、类SQL的查问语句4、装置治理很简略,并且读写数据很高效5、可能实时查问,数据在写入时被索引后就可能被立刻查出……上面咱们就来学习一下如何简略疾速的装置部署好influxdb服务1.找到influxdb的装置服务可点击试用。 2.装置部署增加节点-抉择版本-填写参数-部署胜利 装置部署过程简略又疾速,具体的装置教程如下:如何增加节点?https://www.bilibili.com/vide...如何装置部署influxdb?https://www.bilibili.com/vide...

November 2, 2021 · 1 min · jiezi

关于influxdb:Influxdb进程内存占用异常分析

景象07/09 15:00: 一波海量申请,向OS申请内存,拉高了heap_sys、heap_inuse;07/09 15:24: 流量峰值过来当前,heap_inuse降下来了(100MB),然而heap_idle还是很高(500MB),导致整体RSS很高; 背景常识Influxdb的内存要害指标: go_memstats_heap_sys_bytes:go过程从OS获取的heap内存;go_memstats_heap_idle_bytes: golang过程中暂未应用的heap内存;go_memstats_heap_inuse_bytes: golang过程以后heap理论应用的内存;go_memstats_heap_sys_bytes = go_memstats_heap_idle_bytes + go_memstats_heap_inuse_bytes从监控图上能够看出,申请过后heap_idle_bytes依然很高,阐明被heap占着没有开释。 起因剖析heap_idle为什么不立刻开释: heap_idle是指存在heap内,临时没有被应用,然而给runtime预留的空间;没有归还给OS是因为,应答前面app对内存的应用,不必再去OS申请了;当产生内存压力的时候,OS会开释掉这块内存,然而“感触到内存压力”可能因为判断不精确,导致OS内的其它过程因为申请不到内存而被OOM;Go开释内存的策略: Go底层用mmap申请内存,用madvise开释内存,参考go/src/runtime/mem_linux.go的代码:madvise将某段内存标记为不再应用时,有2种形式: MADV_DONTNEED标记过的内存如果再次应用,会触发缺页中断(page fault);MADV_FREE标记过的内存,内核会期待内存缓和时才会开释;在开释之前,这块内存仍然能够服用;(linux 4.5版本内核开始反对)显然,MADV_FREE是一种空间换工夫的优化;heap_idle没有被开释的起因是:Go madvise=MADV_FREE,内存被runtime预留没有开释。 解决办法madvise默认配置: Go1.12之前,linux平台下Go runtime应用madvise==MADV_DONTNEED;Go1.12之后,在MADV_FREE可用时默认优先应用MADV_FREE,当然,用户能够在执行程序前增加GODEBUG=madvdontneed=1来批改这一行为;Go1.16(含)之后,为了避免引起混同,Go官网将madvise默认批改为MADV_DONTNEED; commit: https://github.com/golang/go/...故能够选用的解决办法: 执行influx过程时,增加GODEBUG=madvdontneed=1参数,强制批改为MADV_DONTNEED;将influx降级为Go1.16,因为它默认=MADV_DONTNEED;参考https://zhuanlan.zhihu.com/p/...https://github.com/AlexiaChen...https://github.com/golang/go/...https://github.com/golang/go/...https://github.com/VictoriaMe...

October 31, 2021 · 1 min · jiezi

关于influxdb:InfluxDB-TSM存储引擎的读写操作

数据写入数据写入时,首先points按shard划分,归属于一个shard的points一起写入: //tsdb/store.go// WriteToShard writes a list of points to a shard identified by its ID.func (s *Store) WriteToShard(shardID uint64, points []models.Point) error { sh := s.shards[shardID] return sh.WritePoints(points)}//tsdb/shard.go// WritePoints will write the raw data points and any new metadata to the index in the shard.func (s *Shard) WritePoints(points []models.Point) error { ..... // Write to the engine. err := engine.WritePoints(points); .....}由tsm1.Engine负责写入points: 首先,结构数据,由points结构values=map[string][]Values,key=seriesKey+分隔符+fieldName, value=[]Value={timestamp,fieldValue}汇合;而后,将values写入cache;最初,将values写入WAL;//tsdb/engine/tsm1/engine.go// WritePoints writes metadata and point data into the engine.// It returns an error if new points are added to an existing key.func (e *Engine) WritePoints(points []models.Point) error { values := make(map[string][]Value, len(points)) for _, p := range points { keyBuf = append(keyBuf[:0], p.Key()...) keyBuf = append(keyBuf, keyFieldSeparator...) //一个Point中可能含多个field iter := p.FieldIterator() t := p.Time().UnixNano() for iter.Next() { keyBuf = append(keyBuf[:baseLen], iter.FieldKey()...) var v Value switch iter.Type() { case models.Float: fv, err := iter.FloatValue() if err != nil { return err } v = NewFloatValue(t, fv) ...... } values[string(keyBuf)] = append(values[string(keyBuf)], v) } } //先写到cache // first try to write to the cache if err := e.Cache.WriteMulti(values); err != nil { return err } //再写到WAL if e.WALEnabled { if _, err := e.WAL.WriteMulti(values); err != nil { return err } } return seriesErr}数据删除与LSM-Tree相似,influxdb应用标记删除的办法,待执行compactor的时候,再真正的将其删除。在data目录,有.tombstone文件,记录了哪个时间段的数据须要删除: ...

October 5, 2021 · 3 min · jiezi

关于influxdb:InfluxDB-TSM存储引擎的整体结构

TSM存储引擎的存储类tsdb.Store,蕴含索引区和数据区: tsdb.Store //索引 -- map[string]*Index -- map[string]*measurement -- map[string]*series //数据 -- map[uint64]*Shard -- Engine // 形象接口,可插拔设计,目前是 tsm1 存储引擎 -- *WAL -- *Cache -- *Compactor -- *FileStore索引区:map构造,databaseName-->Index构造; Index构造蕴含measurement和series元信息;数据区:map构造,shardId-->Shard构造,一个shard能够了解为独自的tsm引擎,其中蕴含wal、tsm file等数据; tsdb.Store整体构造Store是存储的顶层构造,蕴含索引和数据: //tsdb/store.gotype Store struct { path string // shared per-database indexes, only if using "inmem". indexes map[string]interface{} //key=databaseName, value理论是*Index //所有shards,key=shardId shards map[uint64]*Shard} Store::indexes索引区indexes定义为map[string]interface: key=databaseName,value=*Index。该对象缓存了influxdb中所有的database、retention policy、measurement、series等元信息,若series设计不合理,容易导致内存占用偏高。在influxdb启动时,将初始化该构造,从所有shards下的tsm file中加载index信息,获取其中的meaurement以及series信息,缓存到该构造中。 //tsdb/index/inmem/inmem.go// Index is the in memory index of a collection of measurements, time// series, and their tags.type Index struct { //数据库下measurementName-->*measurement measurements map[string]*measurement // measurement name to object and index //数据库下seriesKey-->*series series map[string]*series // map series key to the Series object database string //数据库名称}measurement构造保留某个measurement下series、tags等信息: ...

October 4, 2021 · 2 min · jiezi

关于influxdb:InfluxDB-存储引擎的演化

InfluxDB的存储引擎,通过3次的演变,最终应用基于LSM-Tree的TSM Tree计划: LSM Tree(LevelDB)-->mmap B+Tree(BoltDB)-->TSM Tree(tsm)LSM-TreeLSM-Tree:Log Structured Merge Tree常见的业务场景分两类: 读多写少:比方MySQL/ETCD等存储系统,底层大都采纳B-Tree及其变种;写多读少:比方时序数据库;LSM-Tree的核心思想是充分利用磁盘的程序写性能远高于随机写这一个性,将批量的随机写转化为一次性的程序写。LevelDB是LSM-Tree的一种实现。 因为写多读少并且是按工夫程序写的个性,使得InfluxDB非常适合LSM-Tree;然而InfluxDB在集成LevelDB中遇到了一些问题: 不反对热备份:须要停机备份;过期数据的批量删除反对不好:因为LSM-Tree的删除操作代价较高 为了解决这个问题,InfluxDB依据工夫将数据分为多个shard,每个shard作为一个LevelDB存储,过期时可间接删除Shard;随机数据量的减少,InfluxDB创立了越来越多的LevelDB数据库,产生大量的SSTable file,占用了大量的文件句柄,常常报错;mmap B+TreeBoltDB是mmap B+Tree的一种实现,它将每个数据库存储为1个文件,解决了LevelDB文件句柄有余的问题。 应用mmap B+Tree取得了较好的读性能,然而写性能经常出现高IOPS: 写入时序数据时,若key设置的不合理,容易变成随机写;(B+Tree的个性,参考MySQL)更新索引数据时,因为索引没有人造的排序字段,很容易随机写,导致性能有余;TSM-TreeTSM-Tree: Time Structured Merge TreeInfluxDB最终回归LSM-Tree,对其进行优化,转化为本人的数据引擎TSM-Tree: TSM-Tree实质还是LSM-Tree,InfluxDB对数据查问、数据合并压缩、数据革除做了优化;对数据查问:减少了数据索引和布隆过滤器以放慢查问速度; 数据索引:包含元数据索引、TSM File索引;布隆过滤器:疾速判断TSM File中是否蕴含特定的seriesKey;对数据压缩:依据不同的数据类型,采纳不同压缩算法;对数据革除:由shard存储一段时间内的数据,过期间接删除shard;参考:1.https://wingsxdu.com/post/dat...2.https://docs.influxdata.com/i...

October 2, 2021 · 1 min · jiezi

关于influxdb:InfluxDB集群-antientropy源码分析

data节点上的anti-entropy,会主动查看本节点上短少的shard,并主动从peer节点上copy短少的shard。 //services/anti-entropy/service.gofunc (s *Service) Open() error { if !s.cfg.Enabled { return nil } //查看 go s.CheckWorker() //修复 go s.ProcessMissingShards(s.ProcessMissingShard) return nil}查看短少的shard应用配置文件中的checkInterval定期检查: //services/anti-entropy/service.gofunc (s *Service) CheckWorker() { ticker := time.NewTicker(time.Duration(s.cfg.CheckInterval)) for { select { case <-s.closing: return case <-ticker.C: s.Check() } }}查找missShard的过程: 找到所有本机负责的并已不再写入数据的shard:now() > shard.EndTime;若本机上没有该shard信息并且shard文件不存在(每个shard1个文件),则找到1个短少的shard;查找该shard的peer节点,并产生1个missShard发送到channel,后续将从peer中copy shard的数据;//services/anti-entropy/service.gofunc (s *Service) Check() { dir := s.svrInfo.Path() //不再写入数据的shard,now() > endTime shards := s.MetaClient.ColdShardIDsByNode(s.Node.GetDataID()) for db, rp := range shards { for r, ss := range rp { for _, sh := range ss { shardDir := filepath.Join(dir, db, r, strconv.FormatUint(sh.ID, 10)) _, err := os.Stat(shardDir) //本机上短少的shard if storeSH := s.svrInfo.Shard(sh.ID); storeSH == nil || os.IsNotExist(err) { //找peer var peerOwners []meta.ShardOwner // remove my node id for i, owner := range sh.Owners { if owner.NodeID == s.Node.GetDataID() { peerOwners = append(sh.Owners[:i], sh.Owners[i+1:]...) break } } //产生1个missShard if len(sh.Owners) > 1 { s.missingShards <- ShardInfo{ID: sh.ID, Path: shardDir, Database: db, RetentionPolicy: r, PeerOwners: peerOwners} } } } } }}修复短少的shard修复时,先期待一段时间(11min),避免因shard还没写入tsm file而产生shard短少,而后确认本机上的shard文件是否短少,最初调用s.CopyShard()执行shard复制。 ...

September 30, 2021 · 2 min · jiezi

关于influxdb:InfluxDB集群-antientropy概念解析

influxdb的data节点是AP一致性,可用性优先,然而数据要最终统一。 anti-entropy意为“反熵”,熵简略的了解成shard间的不统一;anit-entropy运行在data节点上,通过比照shard间的不统一,进行shard数据修复,实现数据的最终统一。 anti-entropy的配置anti-entropy默认是敞开的: ### [anti-entropy]### Anti Entropy is used to check missing shards on data node.### If missing shards are found it will copy the shards from other replications.### Please don't enable anit-entropy if replication is 1.###[anti-entropy] enabled = false # check-interval = "30s" # The anti entropy service will check the missing shards very check-interval seconds.当被开启时,node上失落的分片会被主动修复,而内容不统一的分片须要手动(命令行)确认修复。 shard修复策略失落的分片: goroutine定期检查node上短少的分片(元数据中有,然而本地没有shard文件);对短少的每个分片,启动1个goroutine从peer节点copy这个分片;内容不统一的分片:分片熵的检测和修复流程如下 典型利用场景Data节点因为磁盘故障等起因不可用,则只需将新的data节点退出集群,新节点上的anti-entropy会主动从其它节点copy分片。 因为硬件降级等起因,替换正在运行的data节点,新节点上的anti-entropy会主动从其它节点copy分片。 anti-entropy与hinted-handoffanti-entropy和hinted-handoff都用来保障data节点AP一致性,最终达到数据的最终统一。 hh不能解决节点更换的场景; hh解决的是远端节点临时无响应时,先在本机缓存下来,待远端节点复原时,再将数据发往远端节点;anti-entropy能够主动修复整个分片短少的场景,当产生分片内容不统一时,须要手动命令行修复。

September 30, 2021 · 1 min · jiezi

关于influxdb:InfluxDB集群-hintedhandoff源码分析三points发送到远端节点

在本机节点上,给每个远端节点调配一个NodeProcessor对象,负责数据的写入和数据的读取。 NodeProcessor定期的读取本机队列中的数据,而后将其发送给远端节点,尝试写入远端shard。 // services/hh/node_processor.gofunc (n *NodeProcessor) run() { ...... for { case <-time.After(currInterval): limiter := NewRateLimiter(n.RetryRateLimit) //限流 for { c, err := n.SendWrite() ... limiter.Update(c) time.Sleep(limiter.Delay()) } }}读取和发送过程首先,ping远端节点,判断其是否沉闷,如果不沉闷,则间接返回;而后,读Head segment中的block,读操作均是从segment中的pos处开始;而后,将读到的block数据反序列化,发送给远端节点;最初,更新head segment中的pos;// services/hh/node_processor.go// SendWrite attempts to sent the current block of hinted data to the target node. If successful,// it returns the number of bytes it sent and advances to the next block. Otherwise returns EOF// when there is no more data or the node is inactive.func (n *NodeProcessor) SendWrite() (int, error) { //ping远端节点,判断是否沉闷 active, err := n.Active() if err != nil { return 0, err } //读head segment中:pos开始的block // Get the current block from the queue buf, err := n.queue.Current() //反序列化 // unmarshal the byte slice back to shard ID and points shardID, points, err := unmarshalWrite(buf) //写入远端节点 if err := n.writer.WriteShard(shardID, n.nodeID, points); err != nil { atomic.AddInt64(&n.stats.WriteNodeReqFail, 1) return 0, err } //更新head segment的pos,下次依然从pos读取 if err := n.queue.Advance(); err != nil { n.Logger.Info("failed to advance queue for node", zap.Uint64("nodeid", n.nodeID), zap.Error(err)) } return len(buf), nil}1. 判断节点是否沉闷先依据nodeId查问node信息(ip): ...

September 29, 2021 · 4 min · jiezi

关于influxdb:InfluxDB集群-hintedhandoff源码分析二points写入到本机队列

每个远端节点对应一个NodeProcessor,由NodeProcessor负责将远端写入失败的points,写入到本机队列: // services/hh/node_processor.go// WriteShard writes hinted-handoff data for the given shard and node. Since it may manipulate// hinted-handoff queues, and be called concurrently, it takes a lock during queue access.func (n *NodeProcessor) WriteShard(shardID uint64, points []models.Point) error { .... //序列化 b := marshalWrite(shardID, points) //写入本机队列 return n.queue.Append(b)}本机队列的格局每个NodeProcess有1个queue,对应于$data/hh的一个目录: 每个queue对应一个目录,默认最大1Gi;queue内蕴含若干个segment,从tail写入,从head读出;每个segment对应一个文件,默认最大1Mi;segment内蕴含若干个block,每个block含8byte length以及data组成;segment开端蕴含1个8byte的footer,保留了读取的地位,从header读出时更新; 写入过程:序列化写入队列的points,依照固定的格局序列化: 8byte: shardId;point.String() + "\n";point.String() + "\n";......func marshalWrite(shardID uint64, points []models.Point) []byte { b := make([]byte, 8) binary.BigEndian.PutUint64(b, shardID) for _, p := range points { b = append(b, []byte(p.String())...) b = append(b, '\n') } return b}写入过程:写segment从队列的tail segment写入: ...

September 29, 2021 · 2 min · jiezi

关于influxdb:InfluxDB集群-hintedhandoff源码分析一整体流程

influxdb集群场景下,1个shard有N个replcia,通常replica在不同的节点上;写shard时,所有的replica都要写入。 当远端的replica写入失败时,会先存储到本机的hinted-handoff队列;本机会定期的将hinted-handoff队列的内容发送给远端节点,达到数据的最终统一。 Hinted-handoff代码入口对于每个shard,都要向其shardOwner写入points: 远端节点因为网络起因可能写入失败,此时将本来发往远端节点的points,调用hinted-handoff.WriteShard()写入本机hh队列;// cluster/points_writer.go// writeToShards writes points to a shard and ensures a write consistency level has been met. If the write// partially succeeds, ErrPartialWrite is returned.func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string, consistency models.ConsistencyLevel, points []models.Point) error { ...... ch := make(chan *AsyncWriteResult, len(shard.Owners)) // 向shard owner写shards for _, owner := range shard.Owners { go func(shardID uint64, owner meta.ShardOwner, points []models.Point) { .... // 写shard err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points) if err != nil{ // 写入失败的话,入队hh hherr := w.HintedHandoff.WriteShard(shardID, owner.NodeID, points) .... if consistency == models.ConsistencyLevelAny { ch <- &AsyncWriteResult{owner, nil} return } } ch <- &AsyncWriteResult{owner, err} }(shard.ID, owner, points) } ......}Hinted-handoff写入数据在本机节点的$data/hh目录下,给每个远端节点创立了一个$id目录,其中缓存了写入远端节点失败的数据: ...

September 29, 2021 · 3 min · jiezi

关于influxdb:InfluxDB集群-write写入数据源码分析二

上文讲到,每个shard有N个replica,也就是N个replica owner,通常是不同的节点,否则就起不到高可用的作用了。 # influxd_ctl show-shardsID GroupID Database RetentionPolcy Replicas Owners StartTime EndTime44 44 prometheus one_week 2 node3:8088,node1:8088 2021-09-26 08:00:00 +0800 CST 2021-09-27 08:00:00 +0800 CST写shard时,要写入所有的replica owner,可能是本机节点,也可能是远端节点。 // cluster/points_writer.gofunc (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string, ... ch := make(chan *AsyncWriteResult, len(shard.Owners)) for _, owner := range shard.Owners { go func(shardID uint64, owner meta.ShardOwner, points []models.Point) { //以后节点写shard if w.Node.GetDataID() == owner.NodeID { err := w.TSDBStore.WriteToShard(shardID, points) if err == tsdb.ErrShardNotFound { err = w.TSDBStore.CreateShard(database, retentionPolicy, shardID, true) ... err = w.TSDBStore.WriteToShard(shardID, points) } ch <- &AsyncWriteResult{owner, err} return } //远端节点写shard err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points) ch <- &AsyncWriteResult{owner, err} }(shard.ID, owner, points) } var wrote int for range shard.Owners { select { case result := <-ch: wrote++ // 写入胜利的次数 >= required if wrote >= required { return nil } } } ... return ErrWriteFailed} 本地节点写shard本地节点写shard,最终调用的是存储引擎的WritePoints: ...

September 28, 2021 · 2 min · jiezi

关于influxdb:InfluxDB集群-write写入数据源码分析一

Client通过POST /write向influxdb集群写入时序数据: curl -i -XPOST 'http://localhost:8086/write?db=mydb' --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000'influxdb集群中的数据分shard在不同的节点上存储,client写入时序数据时(单条或批量): 有的数据须要写入以后节点;有的数据须要写入远端节点;在写入时,所有shard都写入胜利时,才认为该写入申请胜利。整体流程: node1在8086上接管/write申请,而后依据写入的数据,确定数据存储到shard1和shard2上;shard1和shard2都写入胜利,才算数据写入胜利;集群状况下,每个shard至多有2个replica,假如有2个replica,shard1存储到node1和node2上两份正本;shard1有2个replica,shard1写入胜利与request中传入的consistency无关; consistency: 写入的一致性级别consistency参数,由client在request中传入,标识了shard有N个replica的状况下,如何确定shard是否写入胜利。 如果client没有传入consistency参数,server端默认ConsistencyLevelOne,即只有一个replica写入OK,就返回client胜利。 consistency参数: all: 所有的replica都写入胜利则返回胜利;quorum: 大多数的replica写入胜利,则返回胜利;one: 任何一个replica写入胜利,则返回胜利;any: 任何一个replica写入胜利,或者被写入Hinted-Handoff缓存,则返回胜利;以3节点,2replica为例: Levelrequiredall2quorum2one1any1// cluster/points_writer.gofunc (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string, consistency models.ConsistencyLevel, points []models.Point) error { // The required number of writes to achieve the requested consistency level required := len(shard.Owners) switch consistency { case models.ConsistencyLevelAny, models.ConsistencyLevelOne: required = 1 case models.ConsistencyLevelQuorum: required = required/2 + 1 } ......}数据写入:代码流程POST /write的解决入口: ...

September 28, 2021 · 3 min · jiezi

关于influxdb:InfluxDB集群-query查询数据源码分析一

client通过GET /query查问influxdb集群中的指标数据: curl -G 'http://ops1:8086/query?pretty=true' --data-urlencode "db=falcon" --data-urlencode "q=SELECT * FROM \"cpu.user\" order by time desc limit 10"influxdb集群中的数据分shard在不同的节点上存储,client查问的指标数据,可能不在以后节点上,也可能以后节点和其它节点上都有,所以在查问时,即须要查问以后节点,也须要查问近程节点,而后将数据合并后返回client。 整体流程: node1在8086上接管/query申请,而后依据查问条件,确定指标数据在哪些节点上(node1&node2);依据查问条件,本机查问指标数据,失去localData;向远端节点node2发送查问申请,失去remoteData;将localData和remoteData合并返回client; HTTP handlerhttp handler入口: // services/httpd/handler.gofunc (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.User) { ...... // Execute query. results := h.QueryExecutor.ExecuteQuery(q, opts, closing) ......}执行查问,可能有多个查问语句: // query/executor.gofunc (e *Executor) ExecuteQuery(query *influxql.Query, opt ExecutionOptions, closing chan struct{}) <-chan *Result { results := make(chan *Result) go e.executeQuery(query, opt, closing, results) return results}func (e *Executor) executeQuery(query *influxql.Query, opt ExecutionOptions, closing <-chan struct{}, results chan *Result) { ...... for ; i < len(query.Statements); i++ { ..... err = e.StatementExecutor.ExecuteStatement(stmt, ctx) ..... } .....}本地和远端查问对于每个statement,其查问过程: ...

September 27, 2021 · 2 min · jiezi

关于influxdb:InfluxDB集群-移除DataNode源码分析

influxdb集群中,client在node1上执行remove dataNode node3的命令: influxd_ctl remove-data node3:8088整体流程如下: node1收到CLI命令,向本人的8084发送GET /remove-data申请,request body: {"node":"node3:8088"};node1通过metaClient向集群发送DeleteDataNode();node3收到/remove-data申请后: 先告诉集群删除数据节点:metaClient.DeleteDataNode();再删除该节点上的shards数据; CLI命令解决命令行解析的代码入口: // cmd/influxd_ctl/cli/cli.gofunc (c *CommandLine) Run() error { switch cmd { case "remove-data": return do_remove_data(c) }}向本机的集群治理端口8084,发送GET /remove-data: func do_remove_data(c *CommandLine) error { var node string force := "false" .... node = fs.Args()[len(fs.Args())-1] if o.Force { force = "true" } // 向本人的8084发送HTTP remove-data url := c.getURL("remove-data", map[string]string{"node": node, "force": force}) resp, err := http.Get(url) if err != nil { return err } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) ... return nil}node1上/remove-data的解决: ...

September 25, 2021 · 3 min · jiezi

关于influxdb:InfluxDB集群-添加DataNode源码分析

influxdb集群中,client在node1上执行add dataNode node3的命令: influxd_ctl add-data node3:8088整体流程如下: node1收到CLI命令,向本人的8084发送GET /add-data申请,request body: {"node": "node3:8088"};node1向node3的8088端口发送TCP音讯AddDataNode;node3收到AddDataNode后,被动将本人退出集群; CLI命令解决命令行解析的代码入口: // cmd/influxd_ctl/cli/cli.gofunc (c *CommandLine) Run() error { switch cmd { case "add-data": return do_add_data(c) }}向本机的集群治理端口8084,发送GET /add-data: func do_add_data(c *CommandLine) error { var node string node = c.CMD[1] // 向本机的8084发送http add-data url := c.getURL("add-data", map[string]string{"node": node}) resp, err := http.Get(url) if err != nil { return err } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) ...... return nil}node1向node3发送AddDataNodenode1解决GET /add-data的handler: ...

September 24, 2021 · 3 min · jiezi

关于influxdb:InfluxDB集群-移除MetaNode源码分析

influxdb集群中,client在node1上执行remove metaNode node3的命令: influxd_ctl remove-meta node3:8091整体流程如下: node1收到CLI命令,向本人8084端口发送GET /remove-meta的申请,request body: {"metahttp:": "node3:8091"};node1向node3发送GET http://node3:8091/ping,探测node3是否存活;node1执行metaclient.DeleteMetaNode():Raft中删除该节点信息;node向node3发送GET http://node3:8091/remove-meta,node3做一些清理操作; CLI命令解决命令行代码入口: // cmd/influxd_ctl/cli/cli.gofunc (c *CommandLine) Run() error { .... switch cmd { case "remove-meta": return do_remove_meta(c) }}向本人的8084发送GET /remove-meta: func do_remove_meta(c *CommandLine) error { // flag读命令行的参数 fs := flag.NewFlagSet("", flag.ExitOnError) o := RemoveMetaOptions{} fs.BoolVar(&o.Force, "force", false, "force remove meta node") fs.Parse(c.CMD[1:]) httpAddr := fs.Args()[len(fs.Args())-1] force := "false" if o.Force { force = "true" } // 向本身的8084发送remote-meta,body中带要删除节点的httpAddr url := c.getURL("remove-meta", map[string]string{"metahttp": httpAddr, "force": force}) resp, err := http.Get(url) if err != nil { return err } defer resp.Body.Close() .....}admin_cluster监听8084端口,负责集群的治理性能,/remove-meta的handler: ...

September 23, 2021 · 4 min · jiezi

关于influxdb:InfluxDB集群-查询database源码分析

curl向influxdb集群的任意meta节点发送查问database申请,都能够返回集群中所有database信息: curl -i -XPOST http://node-1:8086/query --data-urlencode "q=show databases"原理是每个meta节点都保护了最新的snapshot信息,当有查问申请时,返回本地的snapshot中的databases信息;snapshot信息由后盾goroutine定期的向leader查问失去。 整体流程: HTTP handler与create database的流程相似,咱们从ExecuteStatement开始: // cluster/statement_executor.gofunc (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error { ...... switch stmt := stmt.(type) { case *influxql.ShowDatabasesStatement: rows, err = e.executeShowDatabasesStatement(stmt, ctx) } .....}同样的,还是metaClient调用,这次是Database()函数: func (e *StatementExecutor) executeShowDatabasesStatement(q *influxql.ShowDatabasesStatement, ctx *query.ExecutionContext) (models.Rows, error) { dis := e.MetaClient.Databases() a := ctx.ExecutionOptions.Authorizer row := &models.Row{Name: "databases", Columns: []string{"name"}} for _, di := range dis { // Only include databases that the user is authorized to read or write. if a.AuthorizeDatabase(influxql.ReadPrivilege, di.Name) || a.AuthorizeDatabase(influxql.WritePrivilege, di.Name) { row.Values = append(row.Values, []interface{}{di.Name}) } } return []*models.Row{row}, nil}这里的Database()返回的是client保留的cacheData: ...

September 22, 2021 · 2 min · jiezi

关于influxdb:InfluxDB集群-创建database源码分析

创立database,能够通过CLI命令,也能够通过HTTP,两种形式走的是同一套逻辑流程。 本文以HTTP为例,剖析集群模式下创立database的源码流程。 curl -i -XPOST http://localhost:8086/query --data-urlencode "q=CREATE DATABASE mydb"database是集群的元信息,须要Raft强统一;create database的request若被发送给follower节点,则返回NotLeader的redirect响应(带leader的url),client从新向Leader节点发送request。 整体流程: HTTP handler8086是httpd的监听端口,其hander: // services/httpd/handler.gofunc NewHandler(c Config) *Handler { ...... h.AddRoutes([]Route{ { "query", // Query serving route. "POST", "/query", true, true, h.serveQuery, }, })}持续向下走: // services/httpd/handler.gofunc (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.User) { ... // Execute query. results := h.QueryExecutor.ExecuteQuery(q, opts, closing) ...}// query/executor.gofunc (e *Executor) executeQuery(query *influxql.Query, opt ExecutionOptions, closing <-chan struct{}, results chan *Result) { ...... // Send any other statements to the underlying statement executor. err = e.StatementExecutor.ExecuteStatement(stmt, ctx) ......}辨认到Create database语句: ...

September 22, 2021 · 3 min · jiezi

关于influxdb:InfluxDB集群-节点部署命令的源码分析

上文剖析到,InfluxDB集群的部署,波及到3个命令: influxd过程的启动;增加集群的data-nodes;查问集群的节点信息;本文联合源码,剖析每一步具体都是怎么实现的。 influxd过程启动命令: # influxd -config /etc/influxdb/influxdb.conf -join ops1:8091,ops2:8091,ops3:8091 --hostname ops3这里重点剖析-join参数。1.读取参数 //cmd/influxd/run/command.gofunc (cmd *Command) ParseFlags(args ...string) (Options, error) { ....... fs.StringVar(&options.Join, "join", "", "") return options, nil}// Run parses the config from args and runs the server.func (cmd *Command) Run(args ...string) error { // Propogate the top-level join options down to the meta config if config.Join != "" { config.Meta.JoinPeers = strings.Split(options.Join, ",") } ....}2.将peers退出Raft将joinPeers作为判断条件,找到所有的raftAddr,而后传入Raft-lib作为初始节点: //services/meta/store.go// open opens and initializes the raft store.func (s *store) open(raftln net.Listener) error { joinPeers = s.config.JoinPeers var initializePeers []string for { peers := c.peers() if !Peers(peers).Contains(s.raftAddr) { peers = append(peers, s.raftAddr) } if len(s.config.JoinPeers)-len(peers) == 0 { initializePeers = peers break } } // Open the raft store. if err := s.openRaft(initializePeers, raftln); err != nil ......}增加data nodes命令: ...

September 20, 2021 · 2 min · jiezi

关于influxdb:InfluxDB集群-节点部署过程

如果集群节点较少,一个node会同时负责meta和data,node上部署的过程同时具备meta和data的治理性能。 启动参数对于3节点的InfluxDB集群,其启动参数: //ops1节点# influxd -config /etc/influxdb/influxdb.conf -join ops1:8091,ops2:8091,ops3:8091 --hostname ops1//ops2节点# influxd -config /etc/influxdb/influxdb.conf -join ops1:8091,ops2:8091,ops3:8091 --hostname ops2//ops3节点# influxd -config /etc/influxdb/influxdb.conf -join ops1:8091,ops2:8091,ops3:8091 --hostname ops3启动结束后,应用集群治理命令,能够查问以后集群的节点: # influxd_ctl show-nodes============Meta Nodes============ID Host TCPHost1 ops2:8091 ops2:80882 ops3:8091 ops3:80883 ops1:8091 ops1:8088============Data Nodes============ID Host TCPHost能够看到,meta nodes均已被辨认。 增加data nodes在其中一个节点上执行: # influxd_ctl add-data ops1:8088# influxd_ctl add-data ops2:8088# influxd_ctl add-data ops3:8088再查问集群中以后节点: # influxd_ctl show-nodes============Meta Nodes============ID Host TCPHost1 ops2:8091 ops2:80882 ops3:8091 ops3:80883 ops1:8091 ops1:8088============Data Nodes============ID Host TCPHost1 178.104.163.69:8086 ops2:80882 178.104.163.239:8086 ops3:80883 178.104.163.205:8086 ops1:8088能够看到,data-nodes均已退出集群。 ...

September 20, 2021 · 1 min · jiezi

关于influxdb:InfluxDB集群-整体部署架构

influxdb从0.12版本开始不再将其cluster源码开源。 基于influxdb 1.6.0单机版源码分支,将0.12的cluster源码cherry-pick过去,能够失去v1.6.0版本的集群版源码。 分布式InfluxDB集群有两种节点:meta节点和data节点: meta节点负责管理集群的元数据信息,比方database/retention policy等;data节点负责保留集群的时序数据;分布式InfluxDB集群的一致性来说: meta节点要求强一致性,即在节点间始终保持统一,通过Raft实现;data节点要求最终一致性,即时序数据最终保障统一即可,通过hh,anti-entropy实现;在理论部署中,如果节点较少,通常1个节点同时负责meta和data的职责,看一下它的整体构造: 每个节点应用4个端口: 8086:负责接管解决client的/query和/write操作;8084:负责接管解决client的集群治理操作,比方增加/删除节点等;8088:负责底层Raft的RPC通信(TCP),传入Raft lib即可;8091:负责节点间HTTP查问和操作Raft信息,比方/join、/execute等;端口8086配置文件中指定: [http] # Determines whether HTTP endpoint is enabled. # enabled = true # The bind address used by the HTTP service. bind-address = "0.0.0.0:8086"代码中应用该端口: //services/httpd/service.go// NewService returns a new instance of Service.func NewService(c Config) *Service { s := &Service{ addr: c.BindAddress, https: c.HTTPSEnabled, cert: c.HTTPSCertificate, key: c.HTTPSPrivateKey, limit: c.MaxConnectionLimit, err: make(chan error), unixSocket: c.UnixSocketEnabled, unixSocketPerm: uint32(c.UnixSocketPermissions), bindSocket: c.BindSocket, Handler: NewHandler(c), Logger: zap.NewNop(), } .....}端口8084次要负责集群治理API,比方show-nodes/add-data/add-meta等:配置文件中指定: ...

September 20, 2021 · 2 min · jiezi

关于influxdb:InfluxDB-查询measurement的tag和field字段

influxdb的记录是以行协定存储的,行协定中蕴含measurement、tag set、field set和timestamp。 拿到measurement的数据当前,如何晓得哪些是tag字段,哪些是field字段呢? 以上面的时序数据为例: > select * from cpu_used_percent order by time limit 3name: cpu_used_percenttime dstype endpoint step value---- ------ -------- ---- -----2021-07-13T08:42:00Z GAUGE dfe16db6-719f-4394-ad08-1bafd072576b 60 8.5291198965230992021-07-13T08:42:00Z GAUGE dfe16db6-719f-4394-ad08-1bafd072576b 60 7.0762562238525962021-07-13T08:42:00Z GAUGE dfe16db6-719f-4394-ad08-1bafd072576b 60 7.157941185740853查问tag字段:> show tag keys from cpu_used_percent;name: cpu_used_percenttagKey------dstypeendpointstep能够看到dsType/endpoint/step是tag字段,tag的value是string。 查问field字段:> show field keys from cpu_used_percent;name: cpu_used_percentfieldKey fieldType-------- ---------value float能够看到value是field字段,因为value能够是float/int/string/bool等类型,这里的value是float。

September 19, 2021 · 1 min · jiezi

关于influxdb:InfluxDB-influxQL的group和fill

group是influxQL中常见的聚合函数,罕用于按工夫聚合(一段时间内的最大/最小/均匀);若在聚合时没有足够的数据点,可指定fill填充特定的值。 group和fill的语法SELECT <function>(<field_key>) FROM_clause WHERE <time_range> GROUP BY time(time_interval,[<offset_interval])[,tag_key] [fill(<fill_option>)]fill()用于填充没有数据的时序序列的值,其选项为: null: 默认,显示工夫戳但value=null的序列;none:在后果中不显示该工夫戳的序列;数值:fill(0),填充0;linear: 线性插入数值;previous: 填充前一个序列的值;demo原始数据:两头短少了09:05:00工夫点的数据 > select * from cpu_used_percent where endpoint='Datacenter_172.118.16.19' and time >= '2021-09-17T09:00:00Z' and time <= '2021-09-17T09:10:00Z';name: cpu_used_percenttime dstype endpoint step value---- ------ -------- ---- -----2021-09-17T09:00:00Z GAUGE Datacenter_172.118.16.19 60 2.23412737988669142021-09-17T09:01:00Z GAUGE Datacenter_172.118.16.19 60 2.70634968575972222021-09-17T09:02:00Z GAUGE Datacenter_172.118.16.19 60 2.8373020899093862021-09-17T09:03:00Z GAUGE Datacenter_172.118.16.19 60 2.19444483317467182021-09-17T09:04:00Z GAUGE Datacenter_172.118.16.19 60 2.4007940760771732021-09-17T09:06:00Z GAUGE Datacenter_172.118.16.19 60 2.0634924290250082021-09-17T09:07:00Z GAUGE Datacenter_172.118.16.19 60 2.1984130878458742021-09-17T09:08:00Z GAUGE Datacenter_172.118.16.19 60 2.2698416719275092021-09-17T09:09:00Z GAUGE Datacenter_172.118.16.19 60 2.4722226601588082021-09-17T09:10:00Z GAUGE Datacenter_172.118.16.19 60 1.9841273356009694默认:fill(null),显示value=null ...

September 18, 2021 · 2 min · jiezi

关于influxdb:InfluxDB-influxQL的模糊查询

influxQL查问measurement中的指标时,可应用含糊查问。 查问measurement中字段蕴含特定字符串: select * from cpu_used_percent where endpoint=~/192.168.0.1/ and time > '2021-09-11T00:00:00Z' order by time desc limit 10;查问measurement中字段以某个字符串起始: select * from cpu_used_percent where endpoint=~/^datacenter/ and time > '2021-09-11T00:00:00Z' order by time desc limit 10;查问meaurement中字段以某个字符串完结: select * from cpu_used_percent where endpoint=~/vm-$/ and time > '2021-09-11T00:00:00Z' order by time desc limit 10;//查问以Datacenter结尾、vm-40完结的时序数据select * from cpu_used_percent where endpoint=~/^Datacenter.*vm-40$/ order by time desc limit 10;

September 17, 2021 · 1 min · jiezi

关于influxdb:InfluxDB-数据的导入导出

influxdb反对将时序数据导出到文件,而后再将文件导入到数据库中,以此实现数据的迁徙。 导出导出命令的语法格局: # influx_inspect export --helpExports TSM files into InfluxDB line protocol format.Usage: influx_inspect export [flags] -compress Compress the output -database string Optional: the database to export -datadir string Data storage path (default "/root/.influxdb/data") -end string Optional: the end time to export (RFC3339 format) -out string Destination file to export to (default "/root/.influxdb/export") -retention string Optional: the retention policy to export (requires -database) -start string Optional: the start time to export (RFC3339 format) -waldir string WAL storage path (default "/root/.influxdb/wal")数据导出demo: ...

September 17, 2021 · 2 min · jiezi

关于influxdb:InfluxDB-Series解析

influxdb中,series是一个很重要的概念,它是retentionPolicy、measurement、tag set雷同的汇合,蕴含了监控对象的元数据信息,series的数量=RP*measurement*tag set。 一般来讲,监控对象稳固后,series根本是固定的;influxdb将series放在内存作为索引,放慢了数据查问,这使得series的数量不能太大,否则influxdb内存会被撑爆,默认单个database内series限度为<100W个。 // influxdb.conf# The maximum series allowed per database before writes are dropped. This limit can prevent# high cardinality issues at the database level. This limit can be disabled by setting it to# 0.# max-series-per-database = 1000000series汇合的例子:每一行代表一个series 序号retention policymeasuermenttag set1autogencpu.used.percentlocation=sh,node=node10012autogencpu.used.percentlocation=bj,node=node30013autogencpu.used.percentlocation=sz,node=5001查问series信息查问所有的series# influx -execute 'show series on opsultra'统计所有的series数量# influx -execute 'show series on opsultra' |wc -l将series导出到csv# influx -execute 'show series on opsultra' -format 'csv' > /tmp/series.csv查问series cardinalitycardinality反馈了series的维度,即不同的series的数量: 查问series的维度# influx -execute 'show series cardinality on opsultra'将series的维度导出到csv: 按measurement进行cardinality进行统计# influx -execute 'show series exact cardinality on opsultra' -format 'csv' > /tmp/series.csv参考:https://jasper-zhang1.gitbook...https://docs.influxdata.com/i... ...

September 17, 2021 · 1 min · jiezi

关于influxdb:InfluxDB-Retention-Policy解析

Retention Policy(RP)是数据保留工夫策略,超过了肯定的工夫后,老的数据会被主动删除。 联合CQ(Continuous Query)和RP,能够将历史数据保留为低精度,最近的数据保留为高精度,以升高存储用量。 RP的语法结构:CREATE RETENTION POLICY <retention_policy_name> ON <database_name> DURATION <duration> REPLICATION <n> [SHARD DURATION <duration>] [DEFAULT]其中: duration指定了数据保留的工夫,当超过了这个工夫后,数据被主动删除;replication指定每个shard的正本数,默认为1,集群场景须要>=2;shard duration实际上指定每个shardGroup保留数据的工夫长度,能够不传入,零碎会依据duration主动计算一个值;default指定是否默认的RP,若RP为默认,创立database未指定RP时,就应用默认的RP;influxdb内置了一个默认策略autogen: duration=0s示意永不过期;shardGroupDuration=168h示意1个shardGroup保留7天的数据;> show retention policies;name duration shardGroupDuration replicaN default---- -------- ------------------ -------- -------autogen 0s 168h0m0s 1 trueshardGroup vs shard:shardGroup蕴含若干个shard;shardGroup指定保留1段时间的数据,shardGroup下所有shard的数据都位于这个工夫范畴内; 看一下单机版influxdb,rp=autogen,replica=1的shards状况: > show shard groups;name: shard groupsid database retention_policy start_time end_time expiry_time-- -------- ---------------- ---------- -------- -----------25 falcon autogen 2020-04-27T00:00:00Z 2020-05-04T00:00:00Z 2020-05-04T00:00:00Z33 falcon autogen 2020-05-04T00:00:00Z 2020-05-11T00:00:00Z 2020-05-11T00:00:00Z42 falcon autogen 2020-05-11T00:00:00Z 2020-05-18T00:00:00Z 2020-05-18T00:00:00Z51 falcon autogen 2020-05-18T00:00:00Z 2020-05-25T00:00:00Z 2020-05-25T00:00:00Z60 falcon autogen 2020-05-25T00:00:00Z 2020-06-01T00:00:00Z 2020-06-01T00:00:00Z69 falcon autogen 2020-06-01T00:00:00Z 2020-06-08T00:00:00Z 2020-06-08T00:00:00Z78 falcon autogen 2020-06-08T00:00:00Z 2020-06-15T00:00:00Z 2020-06-15T00:00:00Z每个shardGroup保留7day的数据,1个shardGroup蕴含1个shard: ...

September 16, 2021 · 2 min · jiezi

关于influxdb:InfluxDB-Continuous-Query解析

Continuous Query(CQ)是外部周期运行的InfluxQL的查问,将查问后果写入指定的measurement,用它能够实现降采样downsampling。 联合CQ和RP(Retention Policy),能够将历史数据保留为低精度,最近的数据保留为高精度,以升高存储用量。 Continuous Query的语法结构:CREATE CONTINUOUS QUERY <cq_name> ON <database_name>BEGIN<cq_query>END其中的<cq_query>须要一个函数,一个INTO字句和一个GROUP BY time()字句,其语句构造: SELECT <function[s]> INTO <destination_measurement> FROM <measurement> [WHERE <stuff>] GROUP BY time(<interval>)[,<tag_key[s]>]举个简略的例子将cpu.user数据进行5m均匀: CREATE CONTINUOUS QUERY "cq_5" ON falcon BEGIN SELECT mean(value) INTO falcon.autogen."5_cpu.user" FROM falcon.autogen."cpu.user" GROUP BY time(5m), * END其中的falcon.autogen."5_cpu.user",falcon示意database,autogen示意retention policy,5_cpu.user示意新的table。 //查问cq> show continuous queries;name: falconname query---- -----cq_5 CREATE CONTINUOUS QUERY cq_5 ON falcon BEGIN SELECT mean(value) INTO falcon.autogen."5_cpu.user" FROM falcon.autogen."cpu.user" GROUP BY time(5m), * END//查问cq中的数据> select * from "5_cpu.user" order by time desc limit 10;name: 5_cpu.usertime endpoint mean---- -------- ----2020-06-09T04:55:00Z trovedev 6.8062524000384982020-06-09T04:50:00Z trovedev 13.7962891284793352020-06-09T04:45:00Z trovedev 16.572126105753662020-06-09T04:40:00Z trovedev 28.5775916914236352020-06-09T04:35:00Z trovedev 6.028823347020752020-06-09T04:30:00Z trovedev 4.6673982823260422020-06-09T04:25:00Z trovedev 4.7462371716753452020-06-09T04:20:00Z trovedev 5.0061695746605942020-06-09T04:15:00Z trovedev 5.0022800377362612020-06-09T04:10:00Z trovedev 4.775274987155538值得注意的是,CQ仅对实时的数据进行聚合,不会对历史数据聚合;也就是说,如果没有新的源数据,就不会产生新的聚合点。 ...

September 16, 2021 · 1 min · jiezi

关于influxdb:InfluxDB-行协议

influxdb行协定是读取和写入的时序数据格式,格局为: +-----------+--------+-+---------+-+---------+|measurement|,tag_set| |field_set| |timestamp|+-----------+--------+-+---------+-+---------+比方时序数据: weather,location=us-midwest temperature=82 1465839830100400200cpu_used_percent,dstype=GAUGE,endpoint=monitor_server,step=60 value=66 1614217980000000000参数定义必填数据类型Measurement表名Y字符串Tag-key/value标签key/valueNkey/value都是字符串Field-key/value字段key/valueYkey是字符串, value能够是字符串/浮点数/整型/布尔值Timestamp工夫戳YUnix工夫,ns精度Measurement指标名称,能够了解为MySQL中的table,例子中的measurement=weath,cpu_used_percent。 Tag-key/value标签key和value,均是字符串类型,多个tag用,分隔: <tag-key>=<tag-value>,<tag-key>=<tag-value>为放慢查问,influxdb在tag-key上创立了索引。 Field-key/value字段key/value,key是字符串类型,value能够是字符串/数值/布尔类型;多个field用,分隔: <field-key>=<field-value>,<field-key>=<field-value>当field value是字符串类型时,在执行influxQL时须要用""包裹字符串: INSERT weath,location=us-midwest temperature="hot"最罕用的是将value放入field,比方: insert cpu_used_percent,endpoint=monitor_server,step=60 value=66 Timestamp工夫戳是纳秒精度的Unix工夫戳;当INSERT未指定timestamp时,应用以后工夫。 参考:1.https://jasper-zhang1.gitbook...

September 16, 2021 · 1 min · jiezi

关于influxdb:influxdb基础1-安装使用

一、参考influxdb 学习系列目录 ——更新ing Install InfluxDB 二、装置下载链接 cd work/envmkdir influxdbcd influxdbmv ~/Desktop/influxdb2-2.0.8-darwin-amd64.tar.gz ./tar zxvf influxdb2-2.0.8-darwin-amd64.tar.gz三、根本应用3.1 服务端运行cd /Users/yz/work/env/influxdb/influxdb2-2.0.8-darwin-amd64./influxd3.2 应用golang客户端 写入数据依赖装置 go get github.com/influxdata/influxdb-client-go/v2

August 20, 2021 · 1 min · jiezi

关于influxdb:influxdb-学习系列目录-更新ing

一、根底(文档学习)elasticsearch根底(1)——概念

August 19, 2021 · 1 min · jiezi

关于influxdb:InfluxDb使用

<!--引入maven--><dependency> <groupId>org.influxdb</groupId> <artifactId>influxdb-java</artifactId> <version>2.15</version></dependency>/*** influxdb 连贯配置类*/@Configurationpublic class InfluxDbConfig { @Value("${spring.influx.url:''}") private String influxDBUrl; @Value("${spring.influx.user:''}") private String userName; @Value("${spring.influx.password:''}") private String password; @Value("${spring.influx.database:''}") private String database; @Value("${spring.influx.retentionPolicy:''}") private String retentionPolicy; public static String tabName; @Bean public InfluxDbUtils influxDbUtils() { return new InfluxDbUtils(userName, password, influxDBUrl, database, retentionPolicy); } @Value("${spring.influx.tabName:''}") public void setTabName(String tabName) { InfluxDbConfig.tabName = tabName; }}/*** 操作工具类*/@Data@Slf4jpublic class InfluxDbUtils { private String userName; private String password; private String url; public String database; public String tabName; // 保留策略 private String retentionPolicy; // InfluxDB实例 private InfluxDB influxDB; /** * @param userName 数据库用户名 * @param password 数据库明码 * @param url 数据库连贯 * @param database 数据库名称 * @param retentionPolicy 保留策略,默认autogen策略(数据保留7天,SHOW RETENTION POLICIES ON database) */ public InfluxDbUtils(String userName, String password, String url, String database, String retentionPolicy) { this.userName = userName; this.password = password; this.url = url; this.database = database; this.retentionPolicy = StringUtils.isBlank(retentionPolicy) ? "autogen" : retentionPolicy; this.influxDB = influxDbBuild(); } /** * 连贯数据库 ,若不存在则创立 * * @return influxDb实例 */ private InfluxDB influxDbBuild() { if (influxDB == null) { influxDB = InfluxDBFactory.connect(url, userName, password); } try { createDB(database); influxDB.setDatabase(database); } catch (Exception e) { log.error("create influx db failed, error: {}", e.getMessage()); } finally { influxDB.setRetentionPolicy(retentionPolicy); } influxDB.setLogLevel(InfluxDB.LogLevel.BASIC); return influxDB; } /**** * 创立数据库 * @param database */ private void createDB(String database) { influxDB.query(new Query("CREATE DATABASE " + database)); }//具体应用办法InfluxDbUtils influxDbUtils = SpringUtil.getBean(InfluxDbUtils.class);influxDB = influxDbUtils.getInfluxDB();Point.Builder point = Point.measurement(InfluxDbConfig.tabName);point.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS);point.tag("code","");point.addField("name","");influxDB.write(point.build());

May 22, 2021 · 2 min · jiezi

关于prometheus:可观测告警运维系统调研SLS告警与多款方案对比

简介:本文介绍比照多款告警监控运维平台计划,笼罩阿里云SLS、Azure、AWS、自建零碎(ELK、Prometheus、TICK)等计划。前言本篇是SLS新版告警系列宣传与培训的第三篇,后续咱们会推出20+系列直播与实战培训视频,敬请关注。系列目录(继续更新)• 一站式云原生智能告警运维平台——SLS新版告警公布!• 这才是可观测告警运维平台——20个SLS告警运维场景• 可观测告警运维零碎调研——SLS告警与多款计划比照(本篇)1. 什么是SLS告警运维零碎1.1. SLS(日志服务)是什么SLS是阿里云上云原生观测剖析平台,为Log/Metric/Trace等数据提供大规模、低成本、实时平台化服务。目前对内曾经是“阿里巴巴 + 蚂蚁金服”零碎的数据总线,数年稳固撑持双十一、双十二、新春红包流动。对外则曾经服务阿里云几十万企业客户。 1.2. SLS新版告警——一站式智能告警运维零碎SLS新版告警在中国站等公布公测(国内站预计4月公布),新版在SLS云原生可观测性平台上提供了一站式智能运维告警零碎。新版告警提供对日志、时序等各类数据的告警监控,亦可承受三方告警,对告警进行降噪、事件治理、告诉治理等,新增40+性能场景,充分考虑研发、运维、平安以及经营人员的告警监控运维需要。 1.3. 劣势应用SLS新版告警,能够无效缓解典型告警运维零碎的痛点,和其余自建、商业化或云厂商提供的计划比,具备如下5大劣势: 2. 与自建计划比照2.1. 与ELK X-Pack 告警 (Watcher/KibanaAlert)比照2.1.1. 简介自建计划ELK示意应用开源计划ElasticSearch + Logstash + Kibana组合,然而其不包含告警性能,须要额定购买X-Pack商业包,会领有2个告警性能,一个是ElasticSearch附带的Watcher,一个是Kibana 7.x+新增的Alert性能,留神:这两个告警性能相互独立,并不能协同和关联。2.1.2. 评估比照 2.2. 与Prometheus & Loki(含AlertManager)告警比照2.2.1. 简介自建计划Prometheus&Loki示意应用开源计划Prometheus + Loki + AlertManager组合,其中Prometheus Alert针对时序进行告警监控,Loki针对日志进行告警监控,两者独特将告警发送给Alert Manager进行告警治理。2.2.2. 评估比照 2.3. 与InfluxDB 2.0 告警(含Kapa 2.3.1. 简介自建计划InfluxDB示意应用开源计划InfluxDB OSS 2.0 + kapacitor组合搭建告警监控零碎,如果须要集群部署性能,还须要购买InfluxDB商业版本。留神,该计划只能反对针对时序数据的告警监控。2.3.2. 评估比照 3. 与其余云厂商计划比照3.1. 与AWSCloudWatch 告警 +SNS+SSM比照3.1.1. 简介AWS告警监控计划,须要依赖AWS CloudWatch告警加上多个其余AWS服务达到告警监控和治理的能力。个别应用CloudWatch Alarm + SNS + System Manager OpsCenter组合的形式实现对日志、时序的监控治理。CloudWatch Logs反对日志的采集,但理论监控告警时,须要先转换成时序才行。3.1.2. 评估比照![上传中...]()3.2. 与AzureMonitor 告警比照3.2.1. 简介Azure Monitor反对残缺的基于时序与日志的监控,并很好集成了上下游计划提供残缺的告警监控与告警治理告诉性能。3.2.2. 评估比照 ...

April 19, 2021 · 1 min · jiezi

关于influxdb:Centos-安装-influxdb

下载安装包拜访 https://portal.influxdata.com/downloads/influxdb/2.0/db/ 抉择平台下载安装包。 此处抉择生成命令行: wget https://dl.influxdata.com/influxdb/releases/influxdb2-2.0.4.x86_64.rpmsudo yum localinstall influxdb2-2.0.4.x86_64.rpm服务启动$ service influxdb start # 启动服务$ service influxdb status # 查看状态

March 29, 2021 · 1 min · jiezi

Influx-Sql系列教程一database-数据库

对于influxdb而言,database和我们更熟悉的mysql中的dababse没有什么特别的区别,可以将数据库简单理解为一堆表(measurement)的集合,接下来我们将看一下在influxdb中,database的常规操作 <!-- more --> 1. 查看当前数据库如果需要查询当前有哪些数据库,可以通过show语句来实现 show database 上面的_internal是内置的数据库 2. 创建数据库create database yhh创建一个名为yhh的数据库 3. 使用数据库如果需要查询某个measurement的数据时,首先需要进入到对应的数据库,直接使用use语句即可 use yhh 4. 删除数据库数据库的删除,需要慎重,因为会删除其中所有的数据,属于高危操作 drop database yhh II. 其他0. 系列博文190717-Influx Sql系列教程零:安装及influx-cli使用姿势介绍190509-InfluxDb之时间戳显示为日期格式190506-InfluxDB之配置修改190505-InfluxDB之权限管理180727-时序数据库InfluxDB之备份和恢复策略180726-InfluxDB基本概念小结180725-InfluxDB-v1.6.0安装和简单使用小结参考博文 Database management using InfluxQL1. 一灰灰Blog: https://liuyueyi.github.io/he...一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛 2. 声明尽信书则不如,已上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激 微博地址: 小灰灰BlogQQ: 一灰灰/33027978403. 扫描关注一灰灰blog

August 21, 2019 · 1 min · jiezi

Influx-Sql系列教程零安装及influxcli使用姿势介绍

influxdb 时序数据库,因为实际业务中使用到了,然而并没有发现有特别好的文章,完整的介绍influx sql的使用姿势,因此记录下实际开发中学习的体会,主要参考来自于官方文档 Influx Query Language (InfluxQL) <!-- more --> influx已经推出2.0beta版本,并没有使用,后面的所有都是以1.7版本进行说明 I. 安装安装教程,直接参考官网链接,installing-influxdb-oss,下面只介绍centos/macos两个操作系统的安装姿势 1. centos通过yum包管理方式实现安装最新的稳定版, 在终端中输入 cat <<EOF | sudo tee /etc/yum.repos.d/influxdb.repo[influxdb]name = InfluxDB Repository - RHEL \$releaseverbaseurl = https://repos.influxdata.com/rhel/\$releasever/\$basearch/stableenabled = 1gpgcheck = 1gpgkey = https://repos.influxdata.com/influxdb.keyEOF然后就可以按照常见的yum方式进行安装 sudo yum install influxdb# 启动sudo service influxdb start# 7+ 版本可以使用 systemctl 方式启动sudo systemctl start influxdb2. macosmac推荐通过homebrew方式进行安装,命令也比较简单 brew updatebrew install influxdb3. 相关配置一般安装完毕之后,如果作为测试的话,直接使用并没有啥问题;但是实际的成产环境中,铁定是需要修改默认配置的 如果需要开启权限校验,访问时需要用户名密码时,可以参考: 190505-InfluxDB之权限管理 如果需要修改数据的存储位置,访问端口号等,可以参考: 190506-InfluxDB之配置修改 II. influx-cli安装完毕之后,influx自带了一个控制台访问操作的工具: influx,在正式进入后面的influxsql之前,有必要了解一下这个工具如何使用,因为后面的sql,都是需要在它上面玩耍的 官方也给出了它的使用文档,有兴趣的可以参考: InfluxDB command line interface (CLI/shell) ...

August 21, 2019 · 1 min · jiezi

分布式时序数据库QTSDB的设计与实现

奇技指南现有的开源时序数据库influxdb只支持单机运行,在面临大量数据写入时,会出现查询慢,机器负载高,单机容量的限制。 为了解决这一问题,360基础架构团队在单机influxdb的基础上,开发了集群版——QTSDB QTSDB 简述QTSDB是一个分布式时间序列数据库,用于处理海量数据写入与查询。实现上,是基于开源单机时序数据库influxdb 1.7开发的分布式版本,除了具有influxdb本身的特性之外,还有容量扩展、副本容错等集群功能。 主要特点如下: 为时间序列数据专门编写的高性能数据存储, 兼顾写入性能和磁盘空间占用;类sql查询语句, 支持多种统计聚合函数;自动清理过期数据;内置连续查询,自动完成用户预设的聚合操作;Golang编写,没有其它的依赖, 部署运维简单;节点动态水平扩展,支持海量数据存储;副本冗余设计,自动故障转移,支持高可用;优化数据写入,支持高吞吐量;系统架构逻辑存储层次结构 influxdb架构层次最高是database,database下边根据数据保留时长不同分成了不同的retension policy,形成了database下面的多个存储容器,因为时序数据库与时间维度关联,所以将相同保留时长的内容存放到一起,便于到期删除。除此之外,在retension policy之下,将retension policy的保留时长继续细分,每个时间段的数据存储在一个shard group中,这样当某个分段的shard group到期之后,会将其整个删掉,避免从存储引擎内部抠出部分数据。例如,在database之下的数据,可能是30天保留时长,可能是7天保留时长,他们将存放在不同的retension policy之下。假设将7天的数据继续按1天进行划分,就将他们分别存放到7个shard group中,当第8天的数据生成时,会新建一个shard group写入,并将第 1天的shard group整个删除。 到此为止,同一个retension policy下,发来的当下时序数据只会落在当下的时间段,也就是只有最新的shard group有数据写入,为了提高并发量,一个shard group又分成了多个shard,这些shard全局唯一,分布于所有物理节点上,每个shard对应一个tsm存储引擎,负责存储数据。 在请求访问数据时,通过请求的信息可以锁定某个database和retension policy,然后根据请求中的时间段信息,锁定某个(些)shard group。对于写入的情况,每条写入的数据都对应一个serieskey(这个概念后面会介绍),通过对serieskey进行哈希取模就能锁定一个shard,进行写入。而shard是有副本的,在写入的时候会采用无主多写的策略同时写入到每个副本中。查询时,由于查询请求中没有serieskey的信息,所以只能将shard group内的shard都查询一遍,针对一个shard,会在其副本中选择一个可用的物理节点进行访问。 那么一个shard group要有多少shard呢,为了达到最大并发量,又不过分干扰数据整体的有序性,在物理节点数和副本数确定后,一个shard group内的shard数量是机器数除以副本数,保障了当下的数据可以均匀写入到所有的物理节点之上,也不至于因为shard过多影响查询效率。例如,图上data集群有6个物理节点,用户指定双副本,那么就有3个shard。 集群结构 整个系统分成三个部分:proxy、meta集群、data集群。proxy负责接收请求,无状态,其前可接lvs支持水平扩展。meta集群保存上面提到的逻辑存储层次及其与物理节点的对应关系,通过raft协议保障元数据的强一致,这里meta信息保存在内存中,日志和快照会持久化到磁盘。data集群是真正的数据存储节点,数据以shard为单位存储于其上,每个shard都对应一个tsm存储引擎。 请求到来的时候,经过lvs锁定一台proxy,proxy先根据database、retension policy和时间段到meta集群查找meta信息,最终得到一个shard到物理节点的映射,然后将这个映射关系转换为物理节点到shard的映射返回给proxy,最后根据这个映射关系,到data集群指定的物理节点中访问具体的shard,至于shard之下的数据访问后边会介绍。 数据访问语法格式 influxdb的查询提供类似于关系数据库的查询方式,展示出来类似一个关系表:measurement,时序数据库的时间作为一个永恒的列,除此之外的列分成两类: 1、field 一类是field,他们是时序数据最关键的数据部分,其值会随着时间的流动源源不断的追加,例如两台机器之间在每个时间点上的延迟。 2、tag 另一类是tag,他们是一个field值的一些标记,所以都是字符串类型,并且取值范围很有限。例如某个时间点的延迟field值是2ms,对应有两个标记属性,从哪台机器到哪台机器的延迟,因此可以设计两个tag:from、to。 measurement展示出来第一行是key,剩下的可以看成value,这样tag有tagkey,tagvalue,field有fieldkey和fieldvalue。 数据读写 当收到一行写入数据时,会转化为如下的格式: measurement+tagkey1+tagvalue1+tagkey2+tagvalue2+fieldkey+fieldvalue+time。 如果一行中存在多个field就会划分成多条这样的数据存储。influxdb的存储引擎可以理解为一个map,从measurement到fieldkey作为存储key,后边的fieldvalue和time是存储value,这些值会源源不断追加的,在存储引擎中,这些值会作为一列存储到一起,因为是随时间渐变的数据,将他们保存到一起可以提升压缩的效果。另外将存储key去掉fieldkey之后剩余部分就是上边提到的serieskey。 上边提到,访问请求在集群中如何锁定shard,这里介绍在一个shard内的访问。 influxdb的查询类似于sql语法,但是跟sql语句的零散信息无法直接查询存储引擎,所以需要一些策略将sql语句转换成存储key。influxdb通过构建倒排索引来将where后的tag信息转换为所有相关的serieskey的集合,然后将每个serieskey拼接上select后边的fieldkey就组成了存储key,这样就可以按列取出对应的数据了。 通过对tsm存储引擎中存储key内serieskey的分析,能够构建出倒排索引,新版本influxdb将倒排索引持久化到每个shard中,与存储数据的tsm存储引擎对应,叫做tsi存储引擎。倒排索引相当于一个三层的map,map的key是measurment,值是一个二层的map,这个二层的map的key是tagkey,对应的值是一个一层的map,这个一层map的key是tagval,对应的值是一个serieskey的集合,这个集合中的每个serieskey字串都包含了map索引路径上的measurement、tagkey和tagval。 这样可以分析查询sql,用from后的measurement查询倒排索引三级map获得一个二级map,然后再分析where之后多个过滤逻辑单元,以tagkey1=tagval1为例,将这两个信息作为二层map的key,查到最终的值:serieskey的集合,这个集合的每个serieskey字串都包含了measurment、tagkey1和tagval1,他们是满足当下过滤逻辑单元的serieskey。根据这些逻辑单元的与或逻辑,将其对应的serieskey的集合进行交并运算,最终根据sql的语义过滤出所有的符合其逻辑的serieskey的集合,然后将这些serieskey与select后边的fieldkey拼接起来,得到最终的存储·key,就可以读取数据了。 不带聚合函数的查询:如图,对于一个serieskey,需要拼接众多的fieldkey,进而取出多个列的数据,他们出来后面临的问题是怎么组合为一行的数据,influxdb行列约束比较松散,不能单纯按照列内偏移确定行。Influxdb把serieskey和time作为判断列数据为一行的依据,每一个serieskey对应的多列就汇集为一个以多行为粒度的数据流,多个serieskey对应的数据流按照一定顺序汇集为一个数据流,作为最终的结果集返回到客户端。 带聚合函数的查询:这种方式与上边的查询正好相反,这里是针对聚合函数参数field,拼接上众多的serieskey,当然最终目的都是一样,得到存储key,多个存储key可以读取多个数据流,这些数据流面临两种处理,先将他们按照一定的顺序汇集为一个数据流,然后按照一定的策略圈定这个数据流内相邻的一些数据进行聚合计算,进而得到最终聚合后的值。这里的顺序和策略来自于sql语句中group by后的聚合方式。 多数据流的合并聚合方式,也同样适用于shard之上的查询结果。 对于写入就比较简单了,直接更新数据存储引擎和倒排索引就可以了。 整个流程对于访问的整个流程上边都已经提到了,这里整体梳理一下:分成两个阶段,在shard之上的查询,在shard之下的查询。 首先访问请求通过lvs锁定到某个proxy,proxy到meta集群中查找meta信息,根据请求信息,锁定database,retension policy和shard group,进而得到众多的shard。 对于写入操作,根据写入时的serieskey,锁定一个shard进行写入,由于shard存在多副本,需要同时将数据写入到多个副本。对于查询,无法通过请求信息得到serieskey,因此需要查询所有的shard,针对每个shard选择一个可用的副本,进行访问。 经过上边的处理就获得shard到物理节点的映射,然后将其反转为物理节点到shard的映射,返回给proxy,proxy就可以在data集群的某个节点访问对应的shard了。 在shard之下的写入访问,需要拆解insert语句,组合为存储键值对存入tsm存储引擎,然后根据组合的serieskey更新倒排索引。 ...

May 31, 2019 · 1 min · jiezi

用cAdvisor InfluxDB Grafana监控docker容器的TcpState

问题搭建完cAdvisor InfluxDB Grafana监控集群后, 发现没有tcp相关的数据.源码版本:https://github.com/google/cad…git commit hash:9db8c7dee20a0c41627b208977ab192a0411bf93搭建cAdvisor InfluxDB Grafana参考https://botleg.com/stories/mo…定位过程是否cadvisor没有记录tcp state?容易搜索到, 因为cadvisor的高cpu占用, 需要–disable_metrics=““https://github.com/google/cad…实际上并非如此. 不带任何参数情况下, 本地启动cadvisor.~/gopath/src/github.com/google/cadvisor(master*) » sudo ./cadvisor -logtostderr 在浏览器中打开 http://127.0.0.1:8080/containers/ 可以看到response中, 带有TcpState.是否写入了influxdb?打开influx db shellInfluxDB shell 0.9.6.1> show databasesname: databases—————name_internalmydbcadvisor> use cadvisorUsing database cadvisor> show tag keysname: cpu_usage_system———————-tagKeycontainer_namemachine可以看到, 这些tagKey对应grafana中的select column.那么, 是否cadvisor没有写入influxdb呢?cadvisor/storage/influxdb/influxdb.go:174func (self *influxdbStorage) containerStatsToPoints( cInfo *info.ContainerInfo, stats *info.ContainerStats,) (points []*influxdb.Point) { // CPU usage: Total usage in nanoseconds points = append(points, makePoint(serCpuUsageTotal, stats.Cpu.Usage.Total)) // CPU usage: Time spend in system space (in nanoseconds) points = append(points, makePoint(serCpuUsageSystem, stats.Cpu.Usage.System)) // CPU usage: Time spent in user space (in nanoseconds) points = append(points, makePoint(serCpuUsageUser, stats.Cpu.Usage.User)) // CPU usage per CPU for i := 0; i < len(stats.Cpu.Usage.PerCpu); i++ { point := makePoint(serCpuUsagePerCpu, stats.Cpu.Usage.PerCpu[i]) tags := map[string]string{“instance”: fmt.Sprintf("%v”, i)} addTagsToPoint(point, tags) points = append(points, point) } // Load Average points = append(points, makePoint(serLoadAverage, stats.Cpu.LoadAverage)) // Memory Usage points = append(points, makePoint(serMemoryUsage, stats.Memory.Usage)) // Working Set Size points = append(points, makePoint(serMemoryWorkingSet, stats.Memory.WorkingSet)) // Network Stats points = append(points, makePoint(serRxBytes, stats.Network.RxBytes)) points = append(points, makePoint(serRxErrors, stats.Network.RxErrors)) points = append(points, makePoint(serTxBytes, stats.Network.TxBytes)) points = append(points, makePoint(serTxErrors, stats.Network.TxErrors)) self.tagPoints(cInfo, stats, points) return points}结论需要修改cadvisor代码, 将自己需要的metrics加上. ...

April 17, 2019 · 1 min · jiezi

InfluxDB 简介

InfluxDB 是一个时间序列数据库(TSDB), 被设计用来处理高写入、高查询负载,是 TICK 的一部分。TSDB 是针对时间戳或时间序列数据进行优化的数据库,专门为处理带有时间戳的度量和事件或度量而构建的。而时间序列数据可以是随时间跟踪、监视、下采样和聚合的度量或事件,如服务器指标、应用程序性能、网络数据、传感器数据以及许多其他类型的分析数据。关键特性能够高速读取和压缩时间序列数据使用 Go 编写,能够但文件运行,没有依赖提供了简单、高效的 HTTP 读写接口能够使用插件支持其他的数据协议,如: Graphite=, =collectd 和 OpenTSDB可轻松使用 SQL 语言查询聚合数据能够使用 Tag 进行快速高效的查询支持保留策略(Retention Policy), 能够自动清理旧数据支持持续查询,能够自动定期计算聚合数据,提高了查询的效率注意: 开源版本的 InfluxDB 只运行在单个节点上,如需更好的性能或避免单点故障,请使用企业版。安装deepin/Ubuntu/Debiansudo apt install influxdb influxdb-cliArchlinuxyaourt -S influxdb 或 sudo pacman -S influxdb其他请参见:Installing InfluxDB配置InfluxDB 的配置文件为: /etc/influxdb/influxdb.conf ,选项详情请参见:Configuration Settings,这里就不在赘述。基本操作服务相关启用/停止服务systemctl start/stop influxdb.service数据库连接数据库使用 influx 命令连接数据库,参看其帮组手册了解使用方法创建数据库CREATE DATABASE <name>删除数据库DROP DATABASE <name>列出数据库SHOW DATABASES选择数据库USE <name>写入查询InfluxDB 中使用 measurement 表示表, tags 表示表的元数据, fields 表示数据。表的 scheme 不用定义, null 值也不会被存储。tag 可理解为表中需要索引的列, field 是不需要索引的列, point 表示一条记录。tags 之间或 fields 之间使用 ‘,’ 分割, 而 tags 与 fields 之间使用空格分割。删除表DROP MEASUREMENTS <name>列出表SHOW MEASUREMENTS写入数据point 写入的语法如下:<measurement>[,<tag-key>=<tag-value>…] <field-key>=<field-value>[,<field2-key>=<field2-value>…] [unix-nano-timestamp]插入一条 cpu load 的数据: INSERT cpu_load,machine=001,region=ch value=0.56 ,这就向名为 cpu_load 的 measurement 中添加了 tags 为 machine 和 region , fields 为 value 的 point 。不指定 timestamp 时,默认会使用 本地的当前时间 作为 timestamp 。查询数据查询语法:SELECT <field_key>[,<field_key>,<tag_key>] FROM <measurement_name>[,<measurement_name>]查询语句中必须要有 field 存在 ,查询语句还支持 Go 风格的正则,下面给出一些例子。SELECT * FROM cpu_load查询 cpu_load 中的所有 fields 和 tagsSELECT ::field FROM cpu_load查询 cpu_load 中的所有 fieldsSELECT value,machine FROM cpu_load只查询 value 与 machineSELECT value::field,machine::tag FROM cpu_load只查询 value 与 machine ,并限定了类型,如果类型错误将返回 null ,如果所有查询字段的类型都错误将没有 point 返回SELECT * FROM /./查询所有表中的所有字段注意: WHERE 语句后的值不为数字的,必须引起来。更多用法参见: Data exploration using InfluxQL 。删除 PointInfluxDB 不支持 Point 的删除操作,但可以通过 Retention Policy 清理 Point 。SERIESSERIES 是 measurement,<tag1>,<tag2>… 的集合,如之前的写入的 SERIES 就是 cpu_load,machine,region查看语法:SHOW SERIES FROM [measurement],[tag1],[tag2]…FROM 可以不加,如:SHOW SERIES 显示数据库中所有的 seriesSHOW SERIES FROM cpu_load 显示表 cpu_load 中的所有 series删除DROPDROP 将删除所有的记录,并删除所有的索引,语法:DROP SERIES FROM <measurement> WHERE [condition]DELETEDELETE 将删除所有的记录,但不会删除索引,并支持在 WHERE 语句中使用 =timestamp=,语法:DELETE FROM <measurement_name> WHERE [<tag_key>=’<tag_value>’] | [<time interval>]~持续查询连续查询(Continuous Queries 简称 CQ)是 InfluxQL 对实时数据自动周期运行的查询,然后把查询结果写入到指定的 measurement 中。语法如下:CREATE CONTINUOUS QUERY <cq_name> ON <database_name>BEGIN <cq_query>END删除语法: DROP CONTINUOUS QUERY <cq_name> ON <database_name>cq_query 需要一个函数,一个 INTO 子句和一个 GROUP BY time() 子句:SELECT <function[s]> INTO <destination_measurement> FROM <measurement> [WHERE <stuff>] GROUP BY time(<interval>)[,<tag_key[s]>]注意: 在 WHERE 子句中, cq_query 不需要时间范围。 InfluxDB 在执行 CQ 时自动生成 cq_query 的时间范围。cq_query 的 WHERE 子句中的任何用户指定的时间范围将被系统忽略。如创建一个一分钟采样一次 cpu_load 并写入 cpu_load_1min 表的连续查询:CREATE CONTINUOUS QUERY “cpu_load_1min” ON “learn_test"BEGIN SELECT mean(“value”) INTO “cpu_load_1min” FROM “cpu_load” GROUP BY time(1m)ENDvalue 将以 mean 为名保存在 cpu_load_1min 中。更多高级用法参加: InfluxQL Continuous Queries保留策略InfluxDB 是没有提供直接删除数据记录的方法,但是提供数据保存策略,主要用于指定数据保留时间,超过指定时间,就删除这部分数据。可以有多个 RP 并存,但 default 表明默认策略。更多用法参见: Database management using InfluxQL 。列出SHOW RETENTION POLICY ON <database name>创建创建语法:CREATE RETENTION POLICY <retention_policy_name> ON <database_name> DURATION <duration> REPLICATION <n> [SHARD DURATION <duration>] [DEFAULT]REPLICATION 子句确定每个点在集群中存储多少个独立副本,其中 n 是数据节点的数量,对单节点实例无效。碎片持续时间子句确定碎片组覆盖的时间范围,是一个 duration 文字,不支持 INF (infinite) duration 。这个设置是可选的。默认情况下,碎片组的持续时间由保留策略的持续时间决定:RP DurationShard Duration< 2 days1 hour>= 2 days and <= 6 months1 day> 6 months7 days如果 RP Duration 大于 0s 小于 1 hour , Shard Duration 仍将设置为 1 hour 。删除DROP RETENTION POLICY <rp_name>修改ALTER RETENTION POLICY <rp_name> ON <database name> DURATION <duration> REPLICATION <n> [SHARD DURATION <duration>] DEFAULTHTTP 接口/query数据主要使用 query 接口查询,下面给出一些常见用法,而更多用法参见: Querying data with the HTTP API 。创建数据库POST 请求可用于创建数据库,如:curl -X POST http://localhost:8086/query –data-urlencode “q=CREATE DATABASE <database name>“查询curl -X GET http://localhost:8086/query?pretty=true –data-urlencode ‘db=<database name>’ –data-urlencode ‘q=SELECT “field1”,“tag1”… FROM <measurement> WHERE <condition>‘多个查询多个查询语句间用 ; 分割,如:curl -X GET http://localhost:8086/query?pretty=true –data-urlencode ‘db=<database name>’ –data-urlencode ‘q=SELECT “field1”,“tag1”… FROM <measurement> WHERE <condition>;SELECT fields FROM <measurement>‘最大行限制(max-row-limit) 允许使用者限制返回结果的数目,以保护InfluxDB不会在聚合结果的时候导致的内存耗尽。分块(chunking) 可以设置参数 chunked=true 开启分块,使返回的数据是流式的 batch ,而不是单个的返回。返回结果可以按 100 数据点被分块,为了改变这个返回最大的分块的大小,可以在查询的时候加上 chunk_size 参数,例如返回数据点是每 20000 为一个批次。curl -X GET ‘http://localhost:8086/query’ –data-urlencode “db=<name>” –data-urlencode “chunked=true” –data-urlencode “chunk_size=100” –data-urlencode “q=SELECT * FROM cpu_load”/write发送 POST 请求是写入数据的主要方式,,下面给出一些常见用法,而更多用法参见: Writing data with the HTTP API 。插入一条 Pointcurl -X POST http://localhost:8086/write?db=<database name> –data-binary “cpu_load,machine=001,region=cn value=0.56 1555164637838240795"必须指定 database name插入多条 Point多条 Point 之间用行分割,如:curl -X POST http://localhost:8086/write?db=<database name> –data-binary “cpu_load,machine=001,region=cn value=0.56 1555164637838240795cpu_load,machine=001,region=cn value=0.65 1555164637838340795cpu_load,machine=003,region-cn value=0.6 1555164637839240795"如果需要写入 Point 过多,可以将 Point 放入文件中,然后通过 POST 请求上传。文件(cpu_data.txt)内容如:cpu_load,machine=001,region=cn value=0.56 1555164637838240795cpu_load,machine=001,region=cn value=0.65 1555164637838340795cpu_load,machine=003,region-cn value=0.6 1555164637839240795然后上传:curl -X POST http://localhost:8086/write?db=<database name> –data-binary @cpu_data.txt ...

April 14, 2019 · 3 min · jiezi

go + influxdb + grafana 日志监控系统

docker 运行 influxdb grafanadocker 启动 influxdb# 启动 docker$ sudo docker run -d -p 8083:8083 -p8086:8086 –expose 8090 –expose 8099 –name indb -v /data/dockerdata/influxdb:/var/lib/influxdb docker.io/influxdb# 创建数据库和用户$ sudo docker exec -it indb /bin/bash> create User nginx with password ‘123456’> GRANT ALL PRIVILEGES ON monitor TO nginx > CREATE RETENTION POLICY “monitor_retention” ON “monitor” DURATION 30d REPLICATION 1 DEFAULT docker 启动 grafana# grafana 5.10 后创建数据卷需要传入权限, 使用 nginx 反代需要设置 server root# 使用 link 连接其他容器,访问该容器内容直接使用 容器名称sudo docker run -d \ -p 3000:3000 \ -e INFLUXDB_HOST=localhost \ -e INFLUXDB_PORT=8086 \ -e INFLUXDB_NAME=monitor \ -e INFLUXDB_USER=nginx \ -e INFLUXDB_PASS=123456 \ -e “GF_SECURITY_ADMIN_PASSWORD=123456” \ -e “GF_SERVER_ROOT_URL=https://www.amoyiki.com/monitor/” \ -v /data/dockerdata/grafana:/var/lib/grafana \ –link indb:indb \ –user root \ –name grafana \ grafana/grafana配置 grafana 数据源PS Access 使用 Server 即可go 项目编写 go 代码本代码完全照搬慕课网视频教程package mainimport ( “bufio” “encoding/json” “flag” “fmt” “github.com/influxdata/influxdb/client/v2” “io” “log” “net/http” “net/url” “os” “regexp” “strconv” “strings” “time”)type Reader interface { Read(rc chan []byte)}type Writer interface { Write(wc chan *Message)}type LogProcess struct { rc chan []byte // 读取 -> 解析 wc chan *Message // 解析 -> 写入 reader Reader writer Writer}type Message struct { TimeLocal time.Time ByteSent int Path, Method, Scheme, Status string RequestTime float64}type SystemInfo struct { HandleLine int json: "handleLine" // 总日志行数 Tps float64 json: "tps" // 系统吞吐量 ReadChanLen int json: "readChanLen" // read channel 长度 WriteChanLen int json: "wirteChanLen" // wirte channel 长度 RunTime string json:"runTime" // 总运行时间 ErrNum int json:"errTime" // 错误数}const ( TypeHandleLine = 0 TypeErrNum = 1)var TypeMonitorChan = make(chan int, 200)type Monitor struct { startTime time.Time data SystemInfo tpsSlic []int}func (m *Monitor) start(lp *LogProcess) { go func() { for n := range TypeMonitorChan { switch n { case TypeErrNum: m.data.ErrNum += 1 case TypeHandleLine: m.data.HandleLine += 1 } } }() ticker := time.NewTicker(time.Second * 5) go func() { for { <-ticker.C m.tpsSlic = append(m.tpsSlic, m.data.HandleLine) if len(m.tpsSlic) > 2 { m.tpsSlic = m.tpsSlic[1:] } } }() http.HandleFunc("/monitor", func(writer http.ResponseWriter, request *http.Request) { m.data.RunTime = time.Now().Sub(m.startTime).String() m.data.ReadChanLen = len(lp.rc) m.data.WriteChanLen = len(lp.wc) if len(m.tpsSlic) >= 2 { m.data.Tps = float64(m.tpsSlic[1]-m.tpsSlic[0]) / 5 } ret, _ := json.MarshalIndent(m.data, “”, “\t”) io.WriteString(writer, string(ret)) }) http.ListenAndServe(":9999", nil)}type ReadFromFile struct { path string // 读取文件的地址}func (r *ReadFromFile) Read(rc chan []byte) { // 打开文件 f, err := os.Open(r.path) if err != nil { panic(fmt.Sprintln(“open file error: %s”, err.Error())) } // 从文件末尾逐行读取文件内容 f.Seek(0, 2) rd := bufio.NewReader(f) for { line, err := rd.ReadBytes(’\n’) if err == io.EOF { time.Sleep(500 * time.Millisecond) continue } else if err != nil { panic(fmt.Sprintln(“read file error: %s”, err.Error())) } TypeMonitorChan <- TypeHandleLine rc <- line[:len(line)-1] }}type WriteToInfluxDB struct { influxDBsn string}func (w WriteToInfluxDB) Write(wc chan Message) { // 写入模块 infSli := strings.Split(w.influxDBsn, “&”) // Create a new HTTPClient c, err := client.NewHTTPClient(client.HTTPConfig{ Addr: infSli[0], Username: infSli[1], Password: infSli[2], }) if err != nil { log.Fatal(err) } defer c.Close() // Create a new point batch bp, err := client.NewBatchPoints(client.BatchPointsConfig{ Database: infSli[3], Precision: infSli[4], }) if err != nil { log.Fatal(err) } for v := range wc { // Create a point and add to batch tags := map[string]string{“Path”: v.Path, “Method”: v.Method, “Scheme”: v.Scheme, “Status”: v.Status} fields := map[string]interface{}{ “RequestTime”: v.RequestTime, “BytesSent”: v.ByteSent, } pt, err := client.NewPoint(“nginx_log”, tags, fields, v.TimeLocal) if err != nil { log.Fatal(err) } bp.AddPoint(pt) // Write the batch if err := c.Write(bp); err != nil { log.Fatal(err) } // Close client resources if err := c.Close(); err != nil { log.Fatal(err) } log.Println(“write influxdb success …”) }}func (l LogProcess) Process() { // 解析模块 / 139.199.10.130 - - [12/Dec/2018:16:02:34 +0800] “POST /wp-cron.php HTTP/1.1” 200 0 “https://xxx.exmple.com/wp-cron.php?doing_wp_cron=1544601753.9868400096893310546875" “WordPress/4.9.8; https://xxx.exmple.com” “-” “0.058” ([\d.]+)\s+([^ []+)\s+([^ []+)\s+[([^]]+)]\s+"([^”]+)"\s+(\d{3})\s+(\d+)\s+"([^"]+)"\s+"(.?)"\s+"([^"]+)"\s+"([^"]+)" */ r := regexp.MustCompile(([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([^"]+)\"\s+\"([^"]+)\") loc, _ := time.LoadLocation(“Asia/Shanghai”) for v := range l.rc { ret := r.FindStringSubmatch(string(v)) if len(ret) != 14 { TypeMonitorChan <- TypeErrNum log.Println(“FindStringSubmatch fail: “, string(v)) continue } message := &Message{ } t, err := time.ParseInLocation(“02/Jan/2006:15:04:05 +0000”, ret[4], loc) if err != nil { TypeMonitorChan <- TypeErrNum log.Println(“ParseInLocation failed: “, err.Error(), ret[4]) continue } message.TimeLocal = t byteSent, _ := strconv.Atoi(ret[8]) message.ByteSent = byteSent reqSli := strings.Split(ret[6], " “) if len(reqSli) != 3 { TypeMonitorChan <- TypeErrNum log.Println(“strings.Split fail”, ret[6]) continue } message.Method = reqSli[0] u, err := url.Parse(reqSli[1]) if err != nil { TypeMonitorChan <- TypeErrNum log.Println(“url parse fail: “, err) continue } message.Path = u.Path message.Scheme = ret[5] message.Status = ret[7] requestTime, _ := strconv.ParseFloat(ret[12], 64) message.RequestTime = requestTime l.wc <- message }}func main() { var path, influxDsn string // 利用命令行参数 传入配置 flag.StringVar(&path, “path”, “./access.log”, “read file path”) flag.StringVar(&influxDsn, “influxDsn”, “http://localhost:8086&root&123456&imooc&s”, “influx data source”) flag.Parse() r := &ReadFromFile{ path: path, } w := &WriteToInfluxDB{ influxDBsn: influxDsn, } lp := &LogProcess{ rc: make(chan []byte), wc: make(chan *Message), reader: r, writer: w, } go lp.reader.Read(lp.rc) go lp.Process() go lp.writer.Write(lp.wc) m := &Monitor{ startTime: time.Now(), data: SystemInfo{}, } m.start(lp)}编写启动脚本(稍后用 docker 部署时使用)./log_process –path “/var/log/nginx/access.log” –influxDsn “http://indb:8086&root&root&monitor&s” 编译 go 项目go build log_process.go编写DockerfileFROM golang:latest MAINTAINER amoyiki “amoyiki@gmail.com” WORKDIR $GOPATH/src/amoyiki.com/nginxlog ADD . $GOPATH/src/amoyiki.com/nginxlog EXPOSE 9999 ENTRYPOINT [“sh”, “./start_process.sh”] 编译并启动镜像$ sudo docker build -t log_process . # 指定数据卷方便容器读取 nginx 日志, 指定关联 influxdb 容器,确保 go 项目能连接到 influxdb 容器$ sudo docker run –name log1 -d -v /var/log/nginx:/var/log/nginx –link indb:indb log_process # 查看当前所有容器$ sudo docker ps -a最终结果更多文章请访问我的 Blog 四畳半神话大系 ...

December 13, 2018 · 4 min · jiezi