关于influxdb:ETL-是什么-ETL-工具有哪些-ETL-数据交换系统

ETL简介ETL是英文Extract-Transform-Load的缩写。用来形容将数据从源端通过抽取(extract)、转换(transform)、加载(load)至目标端的过程。它可能对各种散布的、异构的源数据(如关系数据)进行抽取。依照事后设计的规定将不残缺数据、反复数据以及谬误数据等“脏"数据内容进行荡涤。失去符合要求的“洁净”数据,并加载到数据仓库中进行存储。这些“洁净”数据就成为了数据分析、数据挖掘的基石。 ETL重要性ETL是实现商务智能(Business Intelligence,BI)的外围。个别状况下,ETL会破费整个BI我的项目三分之一的工夫,因而ETL设计得好坏间接影响BI我的项目的成败。ETL工具有哪些 datastage (免费) 最业余的ETL工具, 2005年被IBM收买,目前倒退到11.7版本。 informatica (免费)informatica创建于1993年,业余水平与Datastage旗鼓相当。 ODI (免费)oracle数据库厂商提供的工具,有局限性,与oracle数据库耦合太深。 kettle(收费)Kettle是一款国外开源的ETL工具,纯java编写,能够在Windows、Linux、Unix上运行,数据抽取高效稳固,但学习及保护老本太高。etl-engine (收费)用go语言实现的ETL工具,轻量级引擎、跨平台(windows,linux,unix,mac)、可嵌入go语言脚本并解析执行,不便集成到各种我的项目中收费下载(GitHub - hw2499/etl-engine: etl engine 轻量级 跨平台 ETL引擎 数据抽取-转换-装载)。

January 12, 2023 · 1 min · jiezi

关于influxdb:Centos-MacBook-Docker离线安装InfluxDB超级简单

CentOS离线装置InfluxDBInfluxDB和IotDB介绍与性能比照 InfluxDB官网下载地址 #解压tar -zxvf influxdb-1.7.10-static_linux_amd64.tar.gz#nohup后盾启动,指定配置文件,输入日志到log.file文件nohup /home/influxdb-1.7.10-1/influxd -config /home/influxdb-1.7.10-1/influxdb.conf >> /home/influxdb-1.7.10-1/log.file 2>&1 &#减少 开机自启动vim /etc/rc.d/rc.local#关上后, 最初增加如下命令nohup /home/influxdb-1.7.10-1/influxd -config /home/influxdb-1.7.10-1/influxdb.conf >> /home/influxdb-1.7.10-1/log.file 2>&1 &终端连贯InfluxDBcd /home/influxdb-1.7.10-1/#连贯InfluxDB 默认没有设置用户名和明码./influx#如果设置了用户名和明码时./influx -username 'admin' -password 'abcd_2021'#近程连贯其余InfluxDB./influx -host 192.168.1.2 -port 8086 -username admin -password abcd_2021开启udp批改配置文件: influxdb.conf [[udp]] enabled = true 开启udp Java调用时 influxDB.write(8089, builder.build()); MacBook离线装置InfluxDB#下载wget https://dl.influxdata.com/influxdb/releases/influxdb-1.6.4_darwin_amd64.tar.gz#解压tar -zxvf influxdb-1.6.4_darwin_amd64.tar.gz#进入目录cd software/influxdb-1.6.4-1/usr/bin #启动服务,默认配置启动./influxd#指定配置启动./influxd -config ../../etc/influxdb/influxdb.conf #连贯influxDB./influx#近程连贯其余InfluxDB./influx -host 192.168.1.2 -port 8086 -username admin -password abcd_2021Docker装置InfluxDBCentos7离线装置Docker #下载镜像docker pull influxdb#运行容器,把配置文件挂载到宿主机/data/influxdb目录,把数据文件也挂载进去,避免数据失落, 挂载时区,不然默认是0时区docker run -d --name my-influxdb \-p 8086:8086 \-p 8083:8083 \-p 2003:2003 \-e INFLUXDB_GRAPHITE_ENABLED=true \-v /data/influxdb/conf/influxdb.conf:/etc/influxdb/influxdb.conf \-v /data/influxdb:/var/lib/influxdb \-v /etc/localtime:/etc/localtime \influxdbInfluxDB应用默认是不必用户名和明码的, 是否开启认证,默认值:false ...

August 17, 2022 · 2 min · jiezi

关于influxdb:InfluxDB和IotDB介绍与性能对比

InfluxDB简介InfluxDB 是用Go语言编写的一个开源分布式时序、事件和指标数据库,无需内部依赖。用于存储和剖析工夫序列数据的开源数据库。适宜存储设备性能、日志、物联网传感器等带工夫戳的数据,其设计指标是实现分布式和程度伸缩扩大。InfluxDB 包含用于存储和查问数据,在后盾解决ETL或监督和警报目标,用户仪表板以及可视化和摸索数据等的API。 InfluxDB是一个由InfluxData开发的开源时序型数据。它由Go写成,着力于高性能地查问与存储时序型数据。InfluxDB被广泛应用于存储系统的监控数据,IoT行业的实时数据等场景。 相似的数据库有Elasticsearch、Graphite、IotDB等。 InfluxDB长处专为工夫序列数据编写的自定义高性能数据存储。 TSM引擎容许高摄取速度和数据压缩齐全用 Go 语言编写。 它编译成单个二进制文件,没有内部依赖项简略,高性能的写入和查问HTTP API插件反对其余数据提取协定,如Graphite,collectd和OpenTSDB专为相似SQL的查询语言量身定制,可轻松查问聚合数据标签容许对系列进行索引以实现疾速无效的查问保留策略无效地主动使过期数据过期间断查问主动计算聚合数据,以进步频繁查问的效率InfluxDB毛病InfluxDB 的开源版本只反对一个节点。开源版本没有集群性能,集群版本须要免费存在前后版本兼容问题存储引擎在变动IotDB简介Apache IoTDB 是用Java语言编写的, 是专为物联网时序数据打造的数据库,提供数据采集、存储、剖析的性能。IoTDB 提供端云一体化的解决方案,在云端,提供高性能的数据读写以及丰盛的查问能力,针对物联网场景定制高效的目录组织构造,并与 Apache Hadoop、Spark、Flink 等大数据系统无缝买通;在边缘端,提供轻量化的 TsFile 治理能力,端上的数据写到本地 TsFile,并提供肯定的根底查问能力,同时反对将 TsFile 数据同步到云端。 IotDB长处国产我的项目,当初国家鼎力推广自主研发,国产化. IotDB是清华自研工夫序列数据库,Apache 孵化我的项目,2014年我的项目启动,2018年11月18号 iotdb 正式进入 apache 孵化器,成为中国高校首个进入 apache 孵化器的我的项目。为用户提供数据收集、存储和剖析等特定的服务轻量级的构造、高性能和可用的个性,以及与Hadoop和Spark生态的无缝集成,满足了工业IoTDB畛域中海量数据存储、高吞吐量数据写入和简单数据分析的需要。灵便的部署策略。IoTDB为用户提供了一个在云平台或终端设备上的一键装置工具,以及一个连贯云平台和终端上的数据的数据同步工具。硬件成本低。IoTDB能够达到很高的磁盘存储压缩比。高效的目录构造。IoTDB反对智能网络设备对简单工夫序列数据结构的高效组织,同类设施对工夫序列数据的组织,海量简单工夫序列数据目录的含糊搜寻策略。高吞吐量读写。IoTDB反对数以百万计的低功耗设施的强连贯数据拜访、高速数据读写,实用于上述智能网络设备和混合设施。丰盛的查问语义。IoTDB反对跨设施和测量的工夫序列数据的工夫对齐、工夫序列字段的计算(频域转换)和工夫维度的丰盛聚合函数反对。学习老本非常低。IoTDB反对相似sql的语言、JDBC规范API和易于应用的导入/导出工具。与先进的开放源码生态系统的无缝集成。IoTDB反对剖析生态系统,如Hadoop、Spark和可视化工具(如Grafana)。IotDB毛病目前只有单节点版本,不过集群版本马上要公布了IoTDB TsFile 的构造,目前仅有 java 版本,资源占用方面对边缘轻量级设施不敌对,限度了其在端/设施侧的利用。存储上反对应用 HDFS 或 本地盘,通过应用 HDFS 来存储可保障存储层高可用,但计算层没有进一步的高可用保障。关系数据库VS键值数据库关系数据库键值数据库单表列数下限: MySQL InnoDB 为1017列可治理海量条工夫序列单表行数不易过多:小于1000万行查问受限(表达能力低)如下:程度、垂直分表;分库按工夫维度的查问,按值维度的查问,多序列的工夫对齐查问时序数据库基于关系数据库基于键值数据库原生时序数据库轻量级时序数据库TimescaleopentsdbKairosDBinfluxdbIodDB基于PG开发的插件基于Hbase/Cassandra基于LSM机制的时序库工业畛域千万条量级工夫序列治理 时序数据主动分区时序分区键专属文件构造单节点万亿数据点治理查问打算做优化定时工作构建索引专属查问优化单节点数十TB级工夫序列数据管理定制并行查问 反对Hadoop、Spark、Matlab、Grafana等多种生态随着导入工夫的减少导入速率一直降落压缩比低,查问慢长期历史数据查问性能降落高效的数据长久化,丰盛/低提早的数据查问InfluxDB和IotDB性能测试测试配置总数据量1500 亿 pointsclient10group num10device num100sensor num100每个 sensor1500 万 pointsencodingRLE继续高压力写入测试。 测试后果 elapseTime(h)elapseRate(points/s)accuTime(h)accuRate(points/s)IoTDB25.51632058.8213.23156988.07InfluxDB38.71077684.2225.81617748.51测试共生成 1500 亿个 points,InfluxDB 总耗时 38.7 小时,IoTDB 总耗时 25.5 小时。测试 过程中,InfluxDB 和 IoTDB 都放弃了安稳的插入速度,中途没有出现异常。 ...

August 17, 2022 · 1 min · jiezi

关于influxdb:influxdb-使用

在InfluxDB当中,并没有表(table)这个概念,取而代之的是MEASUREMENTS,MEASUREMENTS的性能与传统数据库中的表统一,因而咱们也能够将MEASUREMENTS称为InfluxDB中的表。create database test17 CREATE USER testuser WITH PASSWORD 'testpwd' ## 创立用户和设置明码GRANT ALL PRIVILEGES ON cadvisor TO testuser ## 受权数据库给指定用户CREATE RETENTION POLICY "cadvisor_retention" ON "cadvisor" DURATION 30d REPLICATION 1 DEFAULT ## 创立默认的数据保留策略,设置保留工夫30天,正本为1新建表 InfluxDB中没有显式的新建表的语句,只能通过insert数据的形式来建设新curl -i -XPOST 'http://localhost:8086/write?db=test' --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000' show retention policies on test; alter retention policy "autogen" on "test" duration 240h replication 1 default;

April 6, 2022 · 1 min · jiezi

关于influxdb:InfluxDB一键安装入门教程

InfluxDB是一个由InfluxData开发的开源时序型数据库,专一于海量时序数据的高性能读、高性能写、高效存储与实时剖析等,在DB-Engines Ranking时序型数据库排行榜上排名第一,广泛应用于DevOps监控、IoT监控、实时剖析等场景。InfluxDB部署简略、使用方便,在技术实现上充分利用了Go语言的个性,无需任何内部依赖即可独立部署。提供相似于SQL的查询语言,接口敌对,使用方便。丰盛的聚合运算和采样能力,提供灵便的数据保留策略(Retention Policy)来设置数据的保留工夫和正本数,在保障数据可靠性的同时,及时删除过期数据,开释存储空间,提供灵便的间断查问(Continues Query)来实现对海量数据的采样。反对协定品种多,除了HTTP、UDP等原生协定,还兼容CollectD、Graphite、OpenTSDB、Prometheus等组件的通信协定。次要个性有:1、内置HTTP接口,使用方便2、数据能够打标记,这样查问能够很灵便3、类SQL的查问语句4、装置治理很简略,并且读写数据很高效5、可能实时查问,数据在写入时被索引后就可能被立刻查出……上面咱们就来学习一下如何简略疾速的装置部署好influxdb服务1.找到influxdb的装置服务可点击试用。 2.装置部署增加节点-抉择版本-填写参数-部署胜利 装置部署过程简略又疾速,具体的装置教程如下:如何增加节点?https://www.bilibili.com/vide...如何装置部署influxdb?https://www.bilibili.com/vide...

November 2, 2021 · 1 min · jiezi

关于influxdb:Influxdb进程内存占用异常分析

景象07/09 15:00: 一波海量申请,向OS申请内存,拉高了heap_sys、heap_inuse;07/09 15:24: 流量峰值过来当前,heap_inuse降下来了(100MB),然而heap_idle还是很高(500MB),导致整体RSS很高; 背景常识Influxdb的内存要害指标: go_memstats_heap_sys_bytes:go过程从OS获取的heap内存;go_memstats_heap_idle_bytes: golang过程中暂未应用的heap内存;go_memstats_heap_inuse_bytes: golang过程以后heap理论应用的内存;go_memstats_heap_sys_bytes = go_memstats_heap_idle_bytes + go_memstats_heap_inuse_bytes从监控图上能够看出,申请过后heap_idle_bytes依然很高,阐明被heap占着没有开释。 起因剖析heap_idle为什么不立刻开释: heap_idle是指存在heap内,临时没有被应用,然而给runtime预留的空间;没有归还给OS是因为,应答前面app对内存的应用,不必再去OS申请了;当产生内存压力的时候,OS会开释掉这块内存,然而“感触到内存压力”可能因为判断不精确,导致OS内的其它过程因为申请不到内存而被OOM;Go开释内存的策略: Go底层用mmap申请内存,用madvise开释内存,参考go/src/runtime/mem_linux.go的代码:madvise将某段内存标记为不再应用时,有2种形式: MADV_DONTNEED标记过的内存如果再次应用,会触发缺页中断(page fault);MADV_FREE标记过的内存,内核会期待内存缓和时才会开释;在开释之前,这块内存仍然能够服用;(linux 4.5版本内核开始反对)显然,MADV_FREE是一种空间换工夫的优化;heap_idle没有被开释的起因是:Go madvise=MADV_FREE,内存被runtime预留没有开释。 解决办法madvise默认配置: Go1.12之前,linux平台下Go runtime应用madvise==MADV_DONTNEED;Go1.12之后,在MADV_FREE可用时默认优先应用MADV_FREE,当然,用户能够在执行程序前增加GODEBUG=madvdontneed=1来批改这一行为;Go1.16(含)之后,为了避免引起混同,Go官网将madvise默认批改为MADV_DONTNEED; commit: https://github.com/golang/go/...故能够选用的解决办法: 执行influx过程时,增加GODEBUG=madvdontneed=1参数,强制批改为MADV_DONTNEED;将influx降级为Go1.16,因为它默认=MADV_DONTNEED;参考https://zhuanlan.zhihu.com/p/...https://github.com/AlexiaChen...https://github.com/golang/go/...https://github.com/golang/go/...https://github.com/VictoriaMe...

October 31, 2021 · 1 min · jiezi

关于influxdb:InfluxDB-TSM存储引擎的读写操作

数据写入数据写入时,首先points按shard划分,归属于一个shard的points一起写入: //tsdb/store.go// WriteToShard writes a list of points to a shard identified by its ID.func (s *Store) WriteToShard(shardID uint64, points []models.Point) error { sh := s.shards[shardID] return sh.WritePoints(points)}//tsdb/shard.go// WritePoints will write the raw data points and any new metadata to the index in the shard.func (s *Shard) WritePoints(points []models.Point) error { ..... // Write to the engine. err := engine.WritePoints(points); .....}由tsm1.Engine负责写入points: 首先,结构数据,由points结构values=map[string][]Values,key=seriesKey+分隔符+fieldName, value=[]Value={timestamp,fieldValue}汇合;而后,将values写入cache;最初,将values写入WAL;//tsdb/engine/tsm1/engine.go// WritePoints writes metadata and point data into the engine.// It returns an error if new points are added to an existing key.func (e *Engine) WritePoints(points []models.Point) error { values := make(map[string][]Value, len(points)) for _, p := range points { keyBuf = append(keyBuf[:0], p.Key()...) keyBuf = append(keyBuf, keyFieldSeparator...) //一个Point中可能含多个field iter := p.FieldIterator() t := p.Time().UnixNano() for iter.Next() { keyBuf = append(keyBuf[:baseLen], iter.FieldKey()...) var v Value switch iter.Type() { case models.Float: fv, err := iter.FloatValue() if err != nil { return err } v = NewFloatValue(t, fv) ...... } values[string(keyBuf)] = append(values[string(keyBuf)], v) } } //先写到cache // first try to write to the cache if err := e.Cache.WriteMulti(values); err != nil { return err } //再写到WAL if e.WALEnabled { if _, err := e.WAL.WriteMulti(values); err != nil { return err } } return seriesErr}数据删除与LSM-Tree相似,influxdb应用标记删除的办法,待执行compactor的时候,再真正的将其删除。在data目录,有.tombstone文件,记录了哪个时间段的数据须要删除: ...

October 5, 2021 · 3 min · jiezi

关于influxdb:InfluxDB-TSM存储引擎的整体结构

TSM存储引擎的存储类tsdb.Store,蕴含索引区和数据区: tsdb.Store //索引 -- map[string]*Index -- map[string]*measurement -- map[string]*series //数据 -- map[uint64]*Shard -- Engine // 形象接口,可插拔设计,目前是 tsm1 存储引擎 -- *WAL -- *Cache -- *Compactor -- *FileStore索引区:map构造,databaseName-->Index构造; Index构造蕴含measurement和series元信息;数据区:map构造,shardId-->Shard构造,一个shard能够了解为独自的tsm引擎,其中蕴含wal、tsm file等数据; tsdb.Store整体构造Store是存储的顶层构造,蕴含索引和数据: //tsdb/store.gotype Store struct { path string // shared per-database indexes, only if using "inmem". indexes map[string]interface{} //key=databaseName, value理论是*Index //所有shards,key=shardId shards map[uint64]*Shard} Store::indexes索引区indexes定义为map[string]interface: key=databaseName,value=*Index。该对象缓存了influxdb中所有的database、retention policy、measurement、series等元信息,若series设计不合理,容易导致内存占用偏高。在influxdb启动时,将初始化该构造,从所有shards下的tsm file中加载index信息,获取其中的meaurement以及series信息,缓存到该构造中。 //tsdb/index/inmem/inmem.go// Index is the in memory index of a collection of measurements, time// series, and their tags.type Index struct { //数据库下measurementName-->*measurement measurements map[string]*measurement // measurement name to object and index //数据库下seriesKey-->*series series map[string]*series // map series key to the Series object database string //数据库名称}measurement构造保留某个measurement下series、tags等信息: ...

October 4, 2021 · 2 min · jiezi

关于influxdb:InfluxDB-存储引擎的演化

InfluxDB的存储引擎,通过3次的演变,最终应用基于LSM-Tree的TSM Tree计划: LSM Tree(LevelDB)-->mmap B+Tree(BoltDB)-->TSM Tree(tsm)LSM-TreeLSM-Tree:Log Structured Merge Tree常见的业务场景分两类: 读多写少:比方MySQL/ETCD等存储系统,底层大都采纳B-Tree及其变种;写多读少:比方时序数据库;LSM-Tree的核心思想是充分利用磁盘的程序写性能远高于随机写这一个性,将批量的随机写转化为一次性的程序写。LevelDB是LSM-Tree的一种实现。 因为写多读少并且是按工夫程序写的个性,使得InfluxDB非常适合LSM-Tree;然而InfluxDB在集成LevelDB中遇到了一些问题: 不反对热备份:须要停机备份;过期数据的批量删除反对不好:因为LSM-Tree的删除操作代价较高 为了解决这个问题,InfluxDB依据工夫将数据分为多个shard,每个shard作为一个LevelDB存储,过期时可间接删除Shard;随机数据量的减少,InfluxDB创立了越来越多的LevelDB数据库,产生大量的SSTable file,占用了大量的文件句柄,常常报错;mmap B+TreeBoltDB是mmap B+Tree的一种实现,它将每个数据库存储为1个文件,解决了LevelDB文件句柄有余的问题。 应用mmap B+Tree取得了较好的读性能,然而写性能经常出现高IOPS: 写入时序数据时,若key设置的不合理,容易变成随机写;(B+Tree的个性,参考MySQL)更新索引数据时,因为索引没有人造的排序字段,很容易随机写,导致性能有余;TSM-TreeTSM-Tree: Time Structured Merge TreeInfluxDB最终回归LSM-Tree,对其进行优化,转化为本人的数据引擎TSM-Tree: TSM-Tree实质还是LSM-Tree,InfluxDB对数据查问、数据合并压缩、数据革除做了优化;对数据查问:减少了数据索引和布隆过滤器以放慢查问速度; 数据索引:包含元数据索引、TSM File索引;布隆过滤器:疾速判断TSM File中是否蕴含特定的seriesKey;对数据压缩:依据不同的数据类型,采纳不同压缩算法;对数据革除:由shard存储一段时间内的数据,过期间接删除shard;参考:1.https://wingsxdu.com/post/dat...2.https://docs.influxdata.com/i...

October 2, 2021 · 1 min · jiezi

关于influxdb:InfluxDB集群-antientropy源码分析

data节点上的anti-entropy,会主动查看本节点上短少的shard,并主动从peer节点上copy短少的shard。 //services/anti-entropy/service.gofunc (s *Service) Open() error { if !s.cfg.Enabled { return nil } //查看 go s.CheckWorker() //修复 go s.ProcessMissingShards(s.ProcessMissingShard) return nil}查看短少的shard应用配置文件中的checkInterval定期检查: //services/anti-entropy/service.gofunc (s *Service) CheckWorker() { ticker := time.NewTicker(time.Duration(s.cfg.CheckInterval)) for { select { case <-s.closing: return case <-ticker.C: s.Check() } }}查找missShard的过程: 找到所有本机负责的并已不再写入数据的shard:now() > shard.EndTime;若本机上没有该shard信息并且shard文件不存在(每个shard1个文件),则找到1个短少的shard;查找该shard的peer节点,并产生1个missShard发送到channel,后续将从peer中copy shard的数据;//services/anti-entropy/service.gofunc (s *Service) Check() { dir := s.svrInfo.Path() //不再写入数据的shard,now() > endTime shards := s.MetaClient.ColdShardIDsByNode(s.Node.GetDataID()) for db, rp := range shards { for r, ss := range rp { for _, sh := range ss { shardDir := filepath.Join(dir, db, r, strconv.FormatUint(sh.ID, 10)) _, err := os.Stat(shardDir) //本机上短少的shard if storeSH := s.svrInfo.Shard(sh.ID); storeSH == nil || os.IsNotExist(err) { //找peer var peerOwners []meta.ShardOwner // remove my node id for i, owner := range sh.Owners { if owner.NodeID == s.Node.GetDataID() { peerOwners = append(sh.Owners[:i], sh.Owners[i+1:]...) break } } //产生1个missShard if len(sh.Owners) > 1 { s.missingShards <- ShardInfo{ID: sh.ID, Path: shardDir, Database: db, RetentionPolicy: r, PeerOwners: peerOwners} } } } } }}修复短少的shard修复时,先期待一段时间(11min),避免因shard还没写入tsm file而产生shard短少,而后确认本机上的shard文件是否短少,最初调用s.CopyShard()执行shard复制。 ...

September 30, 2021 · 2 min · jiezi

关于influxdb:InfluxDB集群-antientropy概念解析

influxdb的data节点是AP一致性,可用性优先,然而数据要最终统一。 anti-entropy意为“反熵”,熵简略的了解成shard间的不统一;anit-entropy运行在data节点上,通过比照shard间的不统一,进行shard数据修复,实现数据的最终统一。 anti-entropy的配置anti-entropy默认是敞开的: ### [anti-entropy]### Anti Entropy is used to check missing shards on data node.### If missing shards are found it will copy the shards from other replications.### Please don't enable anit-entropy if replication is 1.###[anti-entropy] enabled = false # check-interval = "30s" # The anti entropy service will check the missing shards very check-interval seconds.当被开启时,node上失落的分片会被主动修复,而内容不统一的分片须要手动(命令行)确认修复。 shard修复策略失落的分片: goroutine定期检查node上短少的分片(元数据中有,然而本地没有shard文件);对短少的每个分片,启动1个goroutine从peer节点copy这个分片;内容不统一的分片:分片熵的检测和修复流程如下 典型利用场景Data节点因为磁盘故障等起因不可用,则只需将新的data节点退出集群,新节点上的anti-entropy会主动从其它节点copy分片。 因为硬件降级等起因,替换正在运行的data节点,新节点上的anti-entropy会主动从其它节点copy分片。 anti-entropy与hinted-handoffanti-entropy和hinted-handoff都用来保障data节点AP一致性,最终达到数据的最终统一。 hh不能解决节点更换的场景; hh解决的是远端节点临时无响应时,先在本机缓存下来,待远端节点复原时,再将数据发往远端节点;anti-entropy能够主动修复整个分片短少的场景,当产生分片内容不统一时,须要手动命令行修复。

September 30, 2021 · 1 min · jiezi

关于influxdb:InfluxDB集群-hintedhandoff源码分析三points发送到远端节点

在本机节点上,给每个远端节点调配一个NodeProcessor对象,负责数据的写入和数据的读取。 NodeProcessor定期的读取本机队列中的数据,而后将其发送给远端节点,尝试写入远端shard。 // services/hh/node_processor.gofunc (n *NodeProcessor) run() { ...... for { case <-time.After(currInterval): limiter := NewRateLimiter(n.RetryRateLimit) //限流 for { c, err := n.SendWrite() ... limiter.Update(c) time.Sleep(limiter.Delay()) } }}读取和发送过程首先,ping远端节点,判断其是否沉闷,如果不沉闷,则间接返回;而后,读Head segment中的block,读操作均是从segment中的pos处开始;而后,将读到的block数据反序列化,发送给远端节点;最初,更新head segment中的pos;// services/hh/node_processor.go// SendWrite attempts to sent the current block of hinted data to the target node. If successful,// it returns the number of bytes it sent and advances to the next block. Otherwise returns EOF// when there is no more data or the node is inactive.func (n *NodeProcessor) SendWrite() (int, error) { //ping远端节点,判断是否沉闷 active, err := n.Active() if err != nil { return 0, err } //读head segment中:pos开始的block // Get the current block from the queue buf, err := n.queue.Current() //反序列化 // unmarshal the byte slice back to shard ID and points shardID, points, err := unmarshalWrite(buf) //写入远端节点 if err := n.writer.WriteShard(shardID, n.nodeID, points); err != nil { atomic.AddInt64(&n.stats.WriteNodeReqFail, 1) return 0, err } //更新head segment的pos,下次依然从pos读取 if err := n.queue.Advance(); err != nil { n.Logger.Info("failed to advance queue for node", zap.Uint64("nodeid", n.nodeID), zap.Error(err)) } return len(buf), nil}1. 判断节点是否沉闷先依据nodeId查问node信息(ip): ...

September 29, 2021 · 4 min · jiezi

关于influxdb:InfluxDB集群-hintedhandoff源码分析二points写入到本机队列

每个远端节点对应一个NodeProcessor,由NodeProcessor负责将远端写入失败的points,写入到本机队列: // services/hh/node_processor.go// WriteShard writes hinted-handoff data for the given shard and node. Since it may manipulate// hinted-handoff queues, and be called concurrently, it takes a lock during queue access.func (n *NodeProcessor) WriteShard(shardID uint64, points []models.Point) error { .... //序列化 b := marshalWrite(shardID, points) //写入本机队列 return n.queue.Append(b)}本机队列的格局每个NodeProcess有1个queue,对应于$data/hh的一个目录: 每个queue对应一个目录,默认最大1Gi;queue内蕴含若干个segment,从tail写入,从head读出;每个segment对应一个文件,默认最大1Mi;segment内蕴含若干个block,每个block含8byte length以及data组成;segment开端蕴含1个8byte的footer,保留了读取的地位,从header读出时更新; 写入过程:序列化写入队列的points,依照固定的格局序列化: 8byte: shardId;point.String() + "\n";point.String() + "\n";......func marshalWrite(shardID uint64, points []models.Point) []byte { b := make([]byte, 8) binary.BigEndian.PutUint64(b, shardID) for _, p := range points { b = append(b, []byte(p.String())...) b = append(b, '\n') } return b}写入过程:写segment从队列的tail segment写入: ...

September 29, 2021 · 2 min · jiezi

关于influxdb:InfluxDB集群-hintedhandoff源码分析一整体流程

influxdb集群场景下,1个shard有N个replcia,通常replica在不同的节点上;写shard时,所有的replica都要写入。 当远端的replica写入失败时,会先存储到本机的hinted-handoff队列;本机会定期的将hinted-handoff队列的内容发送给远端节点,达到数据的最终统一。 Hinted-handoff代码入口对于每个shard,都要向其shardOwner写入points: 远端节点因为网络起因可能写入失败,此时将本来发往远端节点的points,调用hinted-handoff.WriteShard()写入本机hh队列;// cluster/points_writer.go// writeToShards writes points to a shard and ensures a write consistency level has been met. If the write// partially succeeds, ErrPartialWrite is returned.func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string, consistency models.ConsistencyLevel, points []models.Point) error { ...... ch := make(chan *AsyncWriteResult, len(shard.Owners)) // 向shard owner写shards for _, owner := range shard.Owners { go func(shardID uint64, owner meta.ShardOwner, points []models.Point) { .... // 写shard err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points) if err != nil{ // 写入失败的话,入队hh hherr := w.HintedHandoff.WriteShard(shardID, owner.NodeID, points) .... if consistency == models.ConsistencyLevelAny { ch <- &AsyncWriteResult{owner, nil} return } } ch <- &AsyncWriteResult{owner, err} }(shard.ID, owner, points) } ......}Hinted-handoff写入数据在本机节点的$data/hh目录下,给每个远端节点创立了一个$id目录,其中缓存了写入远端节点失败的数据: ...

September 29, 2021 · 3 min · jiezi

关于influxdb:InfluxDB集群-write写入数据源码分析二

上文讲到,每个shard有N个replica,也就是N个replica owner,通常是不同的节点,否则就起不到高可用的作用了。 # influxd_ctl show-shardsID GroupID Database RetentionPolcy Replicas Owners StartTime EndTime44 44 prometheus one_week 2 node3:8088,node1:8088 2021-09-26 08:00:00 +0800 CST 2021-09-27 08:00:00 +0800 CST写shard时,要写入所有的replica owner,可能是本机节点,也可能是远端节点。 // cluster/points_writer.gofunc (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string, ... ch := make(chan *AsyncWriteResult, len(shard.Owners)) for _, owner := range shard.Owners { go func(shardID uint64, owner meta.ShardOwner, points []models.Point) { //以后节点写shard if w.Node.GetDataID() == owner.NodeID { err := w.TSDBStore.WriteToShard(shardID, points) if err == tsdb.ErrShardNotFound { err = w.TSDBStore.CreateShard(database, retentionPolicy, shardID, true) ... err = w.TSDBStore.WriteToShard(shardID, points) } ch <- &AsyncWriteResult{owner, err} return } //远端节点写shard err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points) ch <- &AsyncWriteResult{owner, err} }(shard.ID, owner, points) } var wrote int for range shard.Owners { select { case result := <-ch: wrote++ // 写入胜利的次数 >= required if wrote >= required { return nil } } } ... return ErrWriteFailed} 本地节点写shard本地节点写shard,最终调用的是存储引擎的WritePoints: ...

September 28, 2021 · 2 min · jiezi

关于influxdb:InfluxDB集群-write写入数据源码分析一

Client通过POST /write向influxdb集群写入时序数据: curl -i -XPOST 'http://localhost:8086/write?db=mydb' --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000'influxdb集群中的数据分shard在不同的节点上存储,client写入时序数据时(单条或批量): 有的数据须要写入以后节点;有的数据须要写入远端节点;在写入时,所有shard都写入胜利时,才认为该写入申请胜利。整体流程: node1在8086上接管/write申请,而后依据写入的数据,确定数据存储到shard1和shard2上;shard1和shard2都写入胜利,才算数据写入胜利;集群状况下,每个shard至多有2个replica,假如有2个replica,shard1存储到node1和node2上两份正本;shard1有2个replica,shard1写入胜利与request中传入的consistency无关; consistency: 写入的一致性级别consistency参数,由client在request中传入,标识了shard有N个replica的状况下,如何确定shard是否写入胜利。 如果client没有传入consistency参数,server端默认ConsistencyLevelOne,即只有一个replica写入OK,就返回client胜利。 consistency参数: all: 所有的replica都写入胜利则返回胜利;quorum: 大多数的replica写入胜利,则返回胜利;one: 任何一个replica写入胜利,则返回胜利;any: 任何一个replica写入胜利,或者被写入Hinted-Handoff缓存,则返回胜利;以3节点,2replica为例: Levelrequiredall2quorum2one1any1// cluster/points_writer.gofunc (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string, consistency models.ConsistencyLevel, points []models.Point) error { // The required number of writes to achieve the requested consistency level required := len(shard.Owners) switch consistency { case models.ConsistencyLevelAny, models.ConsistencyLevelOne: required = 1 case models.ConsistencyLevelQuorum: required = required/2 + 1 } ......}数据写入:代码流程POST /write的解决入口: ...

September 28, 2021 · 3 min · jiezi

关于influxdb:InfluxDB集群-query查询数据源码分析一

client通过GET /query查问influxdb集群中的指标数据: curl -G 'http://ops1:8086/query?pretty=true' --data-urlencode "db=falcon" --data-urlencode "q=SELECT * FROM \"cpu.user\" order by time desc limit 10"influxdb集群中的数据分shard在不同的节点上存储,client查问的指标数据,可能不在以后节点上,也可能以后节点和其它节点上都有,所以在查问时,即须要查问以后节点,也须要查问近程节点,而后将数据合并后返回client。 整体流程: node1在8086上接管/query申请,而后依据查问条件,确定指标数据在哪些节点上(node1&node2);依据查问条件,本机查问指标数据,失去localData;向远端节点node2发送查问申请,失去remoteData;将localData和remoteData合并返回client; HTTP handlerhttp handler入口: // services/httpd/handler.gofunc (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.User) { ...... // Execute query. results := h.QueryExecutor.ExecuteQuery(q, opts, closing) ......}执行查问,可能有多个查问语句: // query/executor.gofunc (e *Executor) ExecuteQuery(query *influxql.Query, opt ExecutionOptions, closing chan struct{}) <-chan *Result { results := make(chan *Result) go e.executeQuery(query, opt, closing, results) return results}func (e *Executor) executeQuery(query *influxql.Query, opt ExecutionOptions, closing <-chan struct{}, results chan *Result) { ...... for ; i < len(query.Statements); i++ { ..... err = e.StatementExecutor.ExecuteStatement(stmt, ctx) ..... } .....}本地和远端查问对于每个statement,其查问过程: ...

September 27, 2021 · 2 min · jiezi

关于influxdb:InfluxDB集群-移除DataNode源码分析

influxdb集群中,client在node1上执行remove dataNode node3的命令: influxd_ctl remove-data node3:8088整体流程如下: node1收到CLI命令,向本人的8084发送GET /remove-data申请,request body: {"node":"node3:8088"};node1通过metaClient向集群发送DeleteDataNode();node3收到/remove-data申请后: 先告诉集群删除数据节点:metaClient.DeleteDataNode();再删除该节点上的shards数据; CLI命令解决命令行解析的代码入口: // cmd/influxd_ctl/cli/cli.gofunc (c *CommandLine) Run() error { switch cmd { case "remove-data": return do_remove_data(c) }}向本机的集群治理端口8084,发送GET /remove-data: func do_remove_data(c *CommandLine) error { var node string force := "false" .... node = fs.Args()[len(fs.Args())-1] if o.Force { force = "true" } // 向本人的8084发送HTTP remove-data url := c.getURL("remove-data", map[string]string{"node": node, "force": force}) resp, err := http.Get(url) if err != nil { return err } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) ... return nil}node1上/remove-data的解决: ...

September 25, 2021 · 3 min · jiezi

关于influxdb:InfluxDB集群-添加DataNode源码分析

influxdb集群中,client在node1上执行add dataNode node3的命令: influxd_ctl add-data node3:8088整体流程如下: node1收到CLI命令,向本人的8084发送GET /add-data申请,request body: {"node": "node3:8088"};node1向node3的8088端口发送TCP音讯AddDataNode;node3收到AddDataNode后,被动将本人退出集群; CLI命令解决命令行解析的代码入口: // cmd/influxd_ctl/cli/cli.gofunc (c *CommandLine) Run() error { switch cmd { case "add-data": return do_add_data(c) }}向本机的集群治理端口8084,发送GET /add-data: func do_add_data(c *CommandLine) error { var node string node = c.CMD[1] // 向本机的8084发送http add-data url := c.getURL("add-data", map[string]string{"node": node}) resp, err := http.Get(url) if err != nil { return err } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) ...... return nil}node1向node3发送AddDataNodenode1解决GET /add-data的handler: ...

September 24, 2021 · 3 min · jiezi

关于influxdb:InfluxDB集群-移除MetaNode源码分析

influxdb集群中,client在node1上执行remove metaNode node3的命令: influxd_ctl remove-meta node3:8091整体流程如下: node1收到CLI命令,向本人8084端口发送GET /remove-meta的申请,request body: {"metahttp:": "node3:8091"};node1向node3发送GET http://node3:8091/ping,探测node3是否存活;node1执行metaclient.DeleteMetaNode():Raft中删除该节点信息;node向node3发送GET http://node3:8091/remove-meta,node3做一些清理操作; CLI命令解决命令行代码入口: // cmd/influxd_ctl/cli/cli.gofunc (c *CommandLine) Run() error { .... switch cmd { case "remove-meta": return do_remove_meta(c) }}向本人的8084发送GET /remove-meta: func do_remove_meta(c *CommandLine) error { // flag读命令行的参数 fs := flag.NewFlagSet("", flag.ExitOnError) o := RemoveMetaOptions{} fs.BoolVar(&o.Force, "force", false, "force remove meta node") fs.Parse(c.CMD[1:]) httpAddr := fs.Args()[len(fs.Args())-1] force := "false" if o.Force { force = "true" } // 向本身的8084发送remote-meta,body中带要删除节点的httpAddr url := c.getURL("remove-meta", map[string]string{"metahttp": httpAddr, "force": force}) resp, err := http.Get(url) if err != nil { return err } defer resp.Body.Close() .....}admin_cluster监听8084端口,负责集群的治理性能,/remove-meta的handler: ...

September 23, 2021 · 4 min · jiezi

关于influxdb:InfluxDB集群-查询database源码分析

curl向influxdb集群的任意meta节点发送查问database申请,都能够返回集群中所有database信息: curl -i -XPOST http://node-1:8086/query --data-urlencode "q=show databases"原理是每个meta节点都保护了最新的snapshot信息,当有查问申请时,返回本地的snapshot中的databases信息;snapshot信息由后盾goroutine定期的向leader查问失去。 整体流程: HTTP handler与create database的流程相似,咱们从ExecuteStatement开始: // cluster/statement_executor.gofunc (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error { ...... switch stmt := stmt.(type) { case *influxql.ShowDatabasesStatement: rows, err = e.executeShowDatabasesStatement(stmt, ctx) } .....}同样的,还是metaClient调用,这次是Database()函数: func (e *StatementExecutor) executeShowDatabasesStatement(q *influxql.ShowDatabasesStatement, ctx *query.ExecutionContext) (models.Rows, error) { dis := e.MetaClient.Databases() a := ctx.ExecutionOptions.Authorizer row := &models.Row{Name: "databases", Columns: []string{"name"}} for _, di := range dis { // Only include databases that the user is authorized to read or write. if a.AuthorizeDatabase(influxql.ReadPrivilege, di.Name) || a.AuthorizeDatabase(influxql.WritePrivilege, di.Name) { row.Values = append(row.Values, []interface{}{di.Name}) } } return []*models.Row{row}, nil}这里的Database()返回的是client保留的cacheData: ...

September 22, 2021 · 2 min · jiezi

关于influxdb:InfluxDB集群-创建database源码分析

创立database,能够通过CLI命令,也能够通过HTTP,两种形式走的是同一套逻辑流程。 本文以HTTP为例,剖析集群模式下创立database的源码流程。 curl -i -XPOST http://localhost:8086/query --data-urlencode "q=CREATE DATABASE mydb"database是集群的元信息,须要Raft强统一;create database的request若被发送给follower节点,则返回NotLeader的redirect响应(带leader的url),client从新向Leader节点发送request。 整体流程: HTTP handler8086是httpd的监听端口,其hander: // services/httpd/handler.gofunc NewHandler(c Config) *Handler { ...... h.AddRoutes([]Route{ { "query", // Query serving route. "POST", "/query", true, true, h.serveQuery, }, })}持续向下走: // services/httpd/handler.gofunc (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.User) { ... // Execute query. results := h.QueryExecutor.ExecuteQuery(q, opts, closing) ...}// query/executor.gofunc (e *Executor) executeQuery(query *influxql.Query, opt ExecutionOptions, closing <-chan struct{}, results chan *Result) { ...... // Send any other statements to the underlying statement executor. err = e.StatementExecutor.ExecuteStatement(stmt, ctx) ......}辨认到Create database语句: ...

September 22, 2021 · 3 min · jiezi

关于influxdb:InfluxDB集群-节点部署命令的源码分析

上文剖析到,InfluxDB集群的部署,波及到3个命令: influxd过程的启动;增加集群的data-nodes;查问集群的节点信息;本文联合源码,剖析每一步具体都是怎么实现的。 influxd过程启动命令: # influxd -config /etc/influxdb/influxdb.conf -join ops1:8091,ops2:8091,ops3:8091 --hostname ops3这里重点剖析-join参数。1.读取参数 //cmd/influxd/run/command.gofunc (cmd *Command) ParseFlags(args ...string) (Options, error) { ....... fs.StringVar(&options.Join, "join", "", "") return options, nil}// Run parses the config from args and runs the server.func (cmd *Command) Run(args ...string) error { // Propogate the top-level join options down to the meta config if config.Join != "" { config.Meta.JoinPeers = strings.Split(options.Join, ",") } ....}2.将peers退出Raft将joinPeers作为判断条件,找到所有的raftAddr,而后传入Raft-lib作为初始节点: //services/meta/store.go// open opens and initializes the raft store.func (s *store) open(raftln net.Listener) error { joinPeers = s.config.JoinPeers var initializePeers []string for { peers := c.peers() if !Peers(peers).Contains(s.raftAddr) { peers = append(peers, s.raftAddr) } if len(s.config.JoinPeers)-len(peers) == 0 { initializePeers = peers break } } // Open the raft store. if err := s.openRaft(initializePeers, raftln); err != nil ......}增加data nodes命令: ...

September 20, 2021 · 2 min · jiezi

关于influxdb:InfluxDB集群-节点部署过程

如果集群节点较少,一个node会同时负责meta和data,node上部署的过程同时具备meta和data的治理性能。 启动参数对于3节点的InfluxDB集群,其启动参数: //ops1节点# influxd -config /etc/influxdb/influxdb.conf -join ops1:8091,ops2:8091,ops3:8091 --hostname ops1//ops2节点# influxd -config /etc/influxdb/influxdb.conf -join ops1:8091,ops2:8091,ops3:8091 --hostname ops2//ops3节点# influxd -config /etc/influxdb/influxdb.conf -join ops1:8091,ops2:8091,ops3:8091 --hostname ops3启动结束后,应用集群治理命令,能够查问以后集群的节点: # influxd_ctl show-nodes============Meta Nodes============ID Host TCPHost1 ops2:8091 ops2:80882 ops3:8091 ops3:80883 ops1:8091 ops1:8088============Data Nodes============ID Host TCPHost能够看到,meta nodes均已被辨认。 增加data nodes在其中一个节点上执行: # influxd_ctl add-data ops1:8088# influxd_ctl add-data ops2:8088# influxd_ctl add-data ops3:8088再查问集群中以后节点: # influxd_ctl show-nodes============Meta Nodes============ID Host TCPHost1 ops2:8091 ops2:80882 ops3:8091 ops3:80883 ops1:8091 ops1:8088============Data Nodes============ID Host TCPHost1 178.104.163.69:8086 ops2:80882 178.104.163.239:8086 ops3:80883 178.104.163.205:8086 ops1:8088能够看到,data-nodes均已退出集群。 ...

September 20, 2021 · 1 min · jiezi

关于influxdb:InfluxDB集群-整体部署架构

influxdb从0.12版本开始不再将其cluster源码开源。 基于influxdb 1.6.0单机版源码分支,将0.12的cluster源码cherry-pick过去,能够失去v1.6.0版本的集群版源码。 分布式InfluxDB集群有两种节点:meta节点和data节点: meta节点负责管理集群的元数据信息,比方database/retention policy等;data节点负责保留集群的时序数据;分布式InfluxDB集群的一致性来说: meta节点要求强一致性,即在节点间始终保持统一,通过Raft实现;data节点要求最终一致性,即时序数据最终保障统一即可,通过hh,anti-entropy实现;在理论部署中,如果节点较少,通常1个节点同时负责meta和data的职责,看一下它的整体构造: 每个节点应用4个端口: 8086:负责接管解决client的/query和/write操作;8084:负责接管解决client的集群治理操作,比方增加/删除节点等;8088:负责底层Raft的RPC通信(TCP),传入Raft lib即可;8091:负责节点间HTTP查问和操作Raft信息,比方/join、/execute等;端口8086配置文件中指定: [http] # Determines whether HTTP endpoint is enabled. # enabled = true # The bind address used by the HTTP service. bind-address = "0.0.0.0:8086"代码中应用该端口: //services/httpd/service.go// NewService returns a new instance of Service.func NewService(c Config) *Service { s := &Service{ addr: c.BindAddress, https: c.HTTPSEnabled, cert: c.HTTPSCertificate, key: c.HTTPSPrivateKey, limit: c.MaxConnectionLimit, err: make(chan error), unixSocket: c.UnixSocketEnabled, unixSocketPerm: uint32(c.UnixSocketPermissions), bindSocket: c.BindSocket, Handler: NewHandler(c), Logger: zap.NewNop(), } .....}端口8084次要负责集群治理API,比方show-nodes/add-data/add-meta等:配置文件中指定: ...

September 20, 2021 · 2 min · jiezi

关于influxdb:InfluxDB-查询measurement的tag和field字段

influxdb的记录是以行协定存储的,行协定中蕴含measurement、tag set、field set和timestamp。 拿到measurement的数据当前,如何晓得哪些是tag字段,哪些是field字段呢? 以上面的时序数据为例: > select * from cpu_used_percent order by time limit 3name: cpu_used_percenttime dstype endpoint step value---- ------ -------- ---- -----2021-07-13T08:42:00Z GAUGE dfe16db6-719f-4394-ad08-1bafd072576b 60 8.5291198965230992021-07-13T08:42:00Z GAUGE dfe16db6-719f-4394-ad08-1bafd072576b 60 7.0762562238525962021-07-13T08:42:00Z GAUGE dfe16db6-719f-4394-ad08-1bafd072576b 60 7.157941185740853查问tag字段:> show tag keys from cpu_used_percent;name: cpu_used_percenttagKey------dstypeendpointstep能够看到dsType/endpoint/step是tag字段,tag的value是string。 查问field字段:> show field keys from cpu_used_percent;name: cpu_used_percentfieldKey fieldType-------- ---------value float能够看到value是field字段,因为value能够是float/int/string/bool等类型,这里的value是float。

September 19, 2021 · 1 min · jiezi

关于influxdb:InfluxDB-influxQL的group和fill

group是influxQL中常见的聚合函数,罕用于按工夫聚合(一段时间内的最大/最小/均匀);若在聚合时没有足够的数据点,可指定fill填充特定的值。 group和fill的语法SELECT <function>(<field_key>) FROM_clause WHERE <time_range> GROUP BY time(time_interval,[<offset_interval])[,tag_key] [fill(<fill_option>)]fill()用于填充没有数据的时序序列的值,其选项为: null: 默认,显示工夫戳但value=null的序列;none:在后果中不显示该工夫戳的序列;数值:fill(0),填充0;linear: 线性插入数值;previous: 填充前一个序列的值;demo原始数据:两头短少了09:05:00工夫点的数据 > select * from cpu_used_percent where endpoint='Datacenter_172.118.16.19' and time >= '2021-09-17T09:00:00Z' and time <= '2021-09-17T09:10:00Z';name: cpu_used_percenttime dstype endpoint step value---- ------ -------- ---- -----2021-09-17T09:00:00Z GAUGE Datacenter_172.118.16.19 60 2.23412737988669142021-09-17T09:01:00Z GAUGE Datacenter_172.118.16.19 60 2.70634968575972222021-09-17T09:02:00Z GAUGE Datacenter_172.118.16.19 60 2.8373020899093862021-09-17T09:03:00Z GAUGE Datacenter_172.118.16.19 60 2.19444483317467182021-09-17T09:04:00Z GAUGE Datacenter_172.118.16.19 60 2.4007940760771732021-09-17T09:06:00Z GAUGE Datacenter_172.118.16.19 60 2.0634924290250082021-09-17T09:07:00Z GAUGE Datacenter_172.118.16.19 60 2.1984130878458742021-09-17T09:08:00Z GAUGE Datacenter_172.118.16.19 60 2.2698416719275092021-09-17T09:09:00Z GAUGE Datacenter_172.118.16.19 60 2.4722226601588082021-09-17T09:10:00Z GAUGE Datacenter_172.118.16.19 60 1.9841273356009694默认:fill(null),显示value=null ...

September 18, 2021 · 2 min · jiezi

关于influxdb:InfluxDB-influxQL的模糊查询

influxQL查问measurement中的指标时,可应用含糊查问。 查问measurement中字段蕴含特定字符串: select * from cpu_used_percent where endpoint=~/192.168.0.1/ and time > '2021-09-11T00:00:00Z' order by time desc limit 10;查问measurement中字段以某个字符串起始: select * from cpu_used_percent where endpoint=~/^datacenter/ and time > '2021-09-11T00:00:00Z' order by time desc limit 10;查问meaurement中字段以某个字符串完结: select * from cpu_used_percent where endpoint=~/vm-$/ and time > '2021-09-11T00:00:00Z' order by time desc limit 10;//查问以Datacenter结尾、vm-40完结的时序数据select * from cpu_used_percent where endpoint=~/^Datacenter.*vm-40$/ order by time desc limit 10;

September 17, 2021 · 1 min · jiezi

关于influxdb:InfluxDB-数据的导入导出

influxdb反对将时序数据导出到文件,而后再将文件导入到数据库中,以此实现数据的迁徙。 导出导出命令的语法格局: # influx_inspect export --helpExports TSM files into InfluxDB line protocol format.Usage: influx_inspect export [flags] -compress Compress the output -database string Optional: the database to export -datadir string Data storage path (default "/root/.influxdb/data") -end string Optional: the end time to export (RFC3339 format) -out string Destination file to export to (default "/root/.influxdb/export") -retention string Optional: the retention policy to export (requires -database) -start string Optional: the start time to export (RFC3339 format) -waldir string WAL storage path (default "/root/.influxdb/wal")数据导出demo: ...

September 17, 2021 · 2 min · jiezi

关于influxdb:InfluxDB-Series解析

influxdb中,series是一个很重要的概念,它是retentionPolicy、measurement、tag set雷同的汇合,蕴含了监控对象的元数据信息,series的数量=RP*measurement*tag set。 一般来讲,监控对象稳固后,series根本是固定的;influxdb将series放在内存作为索引,放慢了数据查问,这使得series的数量不能太大,否则influxdb内存会被撑爆,默认单个database内series限度为<100W个。 // influxdb.conf# The maximum series allowed per database before writes are dropped. This limit can prevent# high cardinality issues at the database level. This limit can be disabled by setting it to# 0.# max-series-per-database = 1000000series汇合的例子:每一行代表一个series 序号retention policymeasuermenttag set1autogencpu.used.percentlocation=sh,node=node10012autogencpu.used.percentlocation=bj,node=node30013autogencpu.used.percentlocation=sz,node=5001查问series信息查问所有的series# influx -execute 'show series on opsultra'统计所有的series数量# influx -execute 'show series on opsultra' |wc -l将series导出到csv# influx -execute 'show series on opsultra' -format 'csv' > /tmp/series.csv查问series cardinalitycardinality反馈了series的维度,即不同的series的数量: 查问series的维度# influx -execute 'show series cardinality on opsultra'将series的维度导出到csv: 按measurement进行cardinality进行统计# influx -execute 'show series exact cardinality on opsultra' -format 'csv' > /tmp/series.csv参考:https://jasper-zhang1.gitbook...https://docs.influxdata.com/i... ...

September 17, 2021 · 1 min · jiezi

关于influxdb:InfluxDB-Retention-Policy解析

Retention Policy(RP)是数据保留工夫策略,超过了肯定的工夫后,老的数据会被主动删除。 联合CQ(Continuous Query)和RP,能够将历史数据保留为低精度,最近的数据保留为高精度,以升高存储用量。 RP的语法结构:CREATE RETENTION POLICY <retention_policy_name> ON <database_name> DURATION <duration> REPLICATION <n> [SHARD DURATION <duration>] [DEFAULT]其中: duration指定了数据保留的工夫,当超过了这个工夫后,数据被主动删除;replication指定每个shard的正本数,默认为1,集群场景须要>=2;shard duration实际上指定每个shardGroup保留数据的工夫长度,能够不传入,零碎会依据duration主动计算一个值;default指定是否默认的RP,若RP为默认,创立database未指定RP时,就应用默认的RP;influxdb内置了一个默认策略autogen: duration=0s示意永不过期;shardGroupDuration=168h示意1个shardGroup保留7天的数据;> show retention policies;name duration shardGroupDuration replicaN default---- -------- ------------------ -------- -------autogen 0s 168h0m0s 1 trueshardGroup vs shard:shardGroup蕴含若干个shard;shardGroup指定保留1段时间的数据,shardGroup下所有shard的数据都位于这个工夫范畴内; 看一下单机版influxdb,rp=autogen,replica=1的shards状况: > show shard groups;name: shard groupsid database retention_policy start_time end_time expiry_time-- -------- ---------------- ---------- -------- -----------25 falcon autogen 2020-04-27T00:00:00Z 2020-05-04T00:00:00Z 2020-05-04T00:00:00Z33 falcon autogen 2020-05-04T00:00:00Z 2020-05-11T00:00:00Z 2020-05-11T00:00:00Z42 falcon autogen 2020-05-11T00:00:00Z 2020-05-18T00:00:00Z 2020-05-18T00:00:00Z51 falcon autogen 2020-05-18T00:00:00Z 2020-05-25T00:00:00Z 2020-05-25T00:00:00Z60 falcon autogen 2020-05-25T00:00:00Z 2020-06-01T00:00:00Z 2020-06-01T00:00:00Z69 falcon autogen 2020-06-01T00:00:00Z 2020-06-08T00:00:00Z 2020-06-08T00:00:00Z78 falcon autogen 2020-06-08T00:00:00Z 2020-06-15T00:00:00Z 2020-06-15T00:00:00Z每个shardGroup保留7day的数据,1个shardGroup蕴含1个shard: ...

September 16, 2021 · 2 min · jiezi

关于influxdb:InfluxDB-Continuous-Query解析

Continuous Query(CQ)是外部周期运行的InfluxQL的查问,将查问后果写入指定的measurement,用它能够实现降采样downsampling。 联合CQ和RP(Retention Policy),能够将历史数据保留为低精度,最近的数据保留为高精度,以升高存储用量。 Continuous Query的语法结构:CREATE CONTINUOUS QUERY <cq_name> ON <database_name>BEGIN<cq_query>END其中的<cq_query>须要一个函数,一个INTO字句和一个GROUP BY time()字句,其语句构造: SELECT <function[s]> INTO <destination_measurement> FROM <measurement> [WHERE <stuff>] GROUP BY time(<interval>)[,<tag_key[s]>]举个简略的例子将cpu.user数据进行5m均匀: CREATE CONTINUOUS QUERY "cq_5" ON falcon BEGIN SELECT mean(value) INTO falcon.autogen."5_cpu.user" FROM falcon.autogen."cpu.user" GROUP BY time(5m), * END其中的falcon.autogen."5_cpu.user",falcon示意database,autogen示意retention policy,5_cpu.user示意新的table。 //查问cq> show continuous queries;name: falconname query---- -----cq_5 CREATE CONTINUOUS QUERY cq_5 ON falcon BEGIN SELECT mean(value) INTO falcon.autogen."5_cpu.user" FROM falcon.autogen."cpu.user" GROUP BY time(5m), * END//查问cq中的数据> select * from "5_cpu.user" order by time desc limit 10;name: 5_cpu.usertime endpoint mean---- -------- ----2020-06-09T04:55:00Z trovedev 6.8062524000384982020-06-09T04:50:00Z trovedev 13.7962891284793352020-06-09T04:45:00Z trovedev 16.572126105753662020-06-09T04:40:00Z trovedev 28.5775916914236352020-06-09T04:35:00Z trovedev 6.028823347020752020-06-09T04:30:00Z trovedev 4.6673982823260422020-06-09T04:25:00Z trovedev 4.7462371716753452020-06-09T04:20:00Z trovedev 5.0061695746605942020-06-09T04:15:00Z trovedev 5.0022800377362612020-06-09T04:10:00Z trovedev 4.775274987155538值得注意的是,CQ仅对实时的数据进行聚合,不会对历史数据聚合;也就是说,如果没有新的源数据,就不会产生新的聚合点。 ...

September 16, 2021 · 1 min · jiezi

关于influxdb:InfluxDB-行协议

influxdb行协定是读取和写入的时序数据格式,格局为: +-----------+--------+-+---------+-+---------+|measurement|,tag_set| |field_set| |timestamp|+-----------+--------+-+---------+-+---------+比方时序数据: weather,location=us-midwest temperature=82 1465839830100400200cpu_used_percent,dstype=GAUGE,endpoint=monitor_server,step=60 value=66 1614217980000000000参数定义必填数据类型Measurement表名Y字符串Tag-key/value标签key/valueNkey/value都是字符串Field-key/value字段key/valueYkey是字符串, value能够是字符串/浮点数/整型/布尔值Timestamp工夫戳YUnix工夫,ns精度Measurement指标名称,能够了解为MySQL中的table,例子中的measurement=weath,cpu_used_percent。 Tag-key/value标签key和value,均是字符串类型,多个tag用,分隔: <tag-key>=<tag-value>,<tag-key>=<tag-value>为放慢查问,influxdb在tag-key上创立了索引。 Field-key/value字段key/value,key是字符串类型,value能够是字符串/数值/布尔类型;多个field用,分隔: <field-key>=<field-value>,<field-key>=<field-value>当field value是字符串类型时,在执行influxQL时须要用""包裹字符串: INSERT weath,location=us-midwest temperature="hot"最罕用的是将value放入field,比方: insert cpu_used_percent,endpoint=monitor_server,step=60 value=66 Timestamp工夫戳是纳秒精度的Unix工夫戳;当INSERT未指定timestamp时,应用以后工夫。 参考:1.https://jasper-zhang1.gitbook...

September 16, 2021 · 1 min · jiezi

关于influxdb:influxdb基础1-安装使用

一、参考influxdb 学习系列目录 ——更新ing Install InfluxDB 二、装置下载链接 cd work/envmkdir influxdbcd influxdbmv ~/Desktop/influxdb2-2.0.8-darwin-amd64.tar.gz ./tar zxvf influxdb2-2.0.8-darwin-amd64.tar.gz三、根本应用3.1 服务端运行cd /Users/yz/work/env/influxdb/influxdb2-2.0.8-darwin-amd64./influxd3.2 应用golang客户端 写入数据依赖装置 go get github.com/influxdata/influxdb-client-go/v2

August 20, 2021 · 1 min · jiezi

关于influxdb:influxdb-学习系列目录-更新ing

一、根底(文档学习)elasticsearch根底(1)——概念

August 19, 2021 · 1 min · jiezi

关于influxdb:InfluxDb使用

<!--引入maven--><dependency> <groupId>org.influxdb</groupId> <artifactId>influxdb-java</artifactId> <version>2.15</version></dependency>/*** influxdb 连贯配置类*/@Configurationpublic class InfluxDbConfig { @Value("${spring.influx.url:''}") private String influxDBUrl; @Value("${spring.influx.user:''}") private String userName; @Value("${spring.influx.password:''}") private String password; @Value("${spring.influx.database:''}") private String database; @Value("${spring.influx.retentionPolicy:''}") private String retentionPolicy; public static String tabName; @Bean public InfluxDbUtils influxDbUtils() { return new InfluxDbUtils(userName, password, influxDBUrl, database, retentionPolicy); } @Value("${spring.influx.tabName:''}") public void setTabName(String tabName) { InfluxDbConfig.tabName = tabName; }}/*** 操作工具类*/@Data@Slf4jpublic class InfluxDbUtils { private String userName; private String password; private String url; public String database; public String tabName; // 保留策略 private String retentionPolicy; // InfluxDB实例 private InfluxDB influxDB; /** * @param userName 数据库用户名 * @param password 数据库明码 * @param url 数据库连贯 * @param database 数据库名称 * @param retentionPolicy 保留策略,默认autogen策略(数据保留7天,SHOW RETENTION POLICIES ON database) */ public InfluxDbUtils(String userName, String password, String url, String database, String retentionPolicy) { this.userName = userName; this.password = password; this.url = url; this.database = database; this.retentionPolicy = StringUtils.isBlank(retentionPolicy) ? "autogen" : retentionPolicy; this.influxDB = influxDbBuild(); } /** * 连贯数据库 ,若不存在则创立 * * @return influxDb实例 */ private InfluxDB influxDbBuild() { if (influxDB == null) { influxDB = InfluxDBFactory.connect(url, userName, password); } try { createDB(database); influxDB.setDatabase(database); } catch (Exception e) { log.error("create influx db failed, error: {}", e.getMessage()); } finally { influxDB.setRetentionPolicy(retentionPolicy); } influxDB.setLogLevel(InfluxDB.LogLevel.BASIC); return influxDB; } /**** * 创立数据库 * @param database */ private void createDB(String database) { influxDB.query(new Query("CREATE DATABASE " + database)); }//具体应用办法InfluxDbUtils influxDbUtils = SpringUtil.getBean(InfluxDbUtils.class);influxDB = influxDbUtils.getInfluxDB();Point.Builder point = Point.measurement(InfluxDbConfig.tabName);point.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS);point.tag("code","");point.addField("name","");influxDB.write(point.build());

May 22, 2021 · 2 min · jiezi

关于prometheus:可观测告警运维系统调研SLS告警与多款方案对比

简介:本文介绍比照多款告警监控运维平台计划,笼罩阿里云SLS、Azure、AWS、自建零碎(ELK、Prometheus、TICK)等计划。前言本篇是SLS新版告警系列宣传与培训的第三篇,后续咱们会推出20+系列直播与实战培训视频,敬请关注。系列目录(继续更新)• 一站式云原生智能告警运维平台——SLS新版告警公布!• 这才是可观测告警运维平台——20个SLS告警运维场景• 可观测告警运维零碎调研——SLS告警与多款计划比照(本篇)1. 什么是SLS告警运维零碎1.1. SLS(日志服务)是什么SLS是阿里云上云原生观测剖析平台,为Log/Metric/Trace等数据提供大规模、低成本、实时平台化服务。目前对内曾经是“阿里巴巴 + 蚂蚁金服”零碎的数据总线,数年稳固撑持双十一、双十二、新春红包流动。对外则曾经服务阿里云几十万企业客户。 1.2. SLS新版告警——一站式智能告警运维零碎SLS新版告警在中国站等公布公测(国内站预计4月公布),新版在SLS云原生可观测性平台上提供了一站式智能运维告警零碎。新版告警提供对日志、时序等各类数据的告警监控,亦可承受三方告警,对告警进行降噪、事件治理、告诉治理等,新增40+性能场景,充分考虑研发、运维、平安以及经营人员的告警监控运维需要。 1.3. 劣势应用SLS新版告警,能够无效缓解典型告警运维零碎的痛点,和其余自建、商业化或云厂商提供的计划比,具备如下5大劣势: 2. 与自建计划比照2.1. 与ELK X-Pack 告警 (Watcher/KibanaAlert)比照2.1.1. 简介自建计划ELK示意应用开源计划ElasticSearch + Logstash + Kibana组合,然而其不包含告警性能,须要额定购买X-Pack商业包,会领有2个告警性能,一个是ElasticSearch附带的Watcher,一个是Kibana 7.x+新增的Alert性能,留神:这两个告警性能相互独立,并不能协同和关联。2.1.2. 评估比照 2.2. 与Prometheus & Loki(含AlertManager)告警比照2.2.1. 简介自建计划Prometheus&Loki示意应用开源计划Prometheus + Loki + AlertManager组合,其中Prometheus Alert针对时序进行告警监控,Loki针对日志进行告警监控,两者独特将告警发送给Alert Manager进行告警治理。2.2.2. 评估比照 2.3. 与InfluxDB 2.0 告警(含Kapa 2.3.1. 简介自建计划InfluxDB示意应用开源计划InfluxDB OSS 2.0 + kapacitor组合搭建告警监控零碎,如果须要集群部署性能,还须要购买InfluxDB商业版本。留神,该计划只能反对针对时序数据的告警监控。2.3.2. 评估比照 3. 与其余云厂商计划比照3.1. 与AWSCloudWatch 告警 +SNS+SSM比照3.1.1. 简介AWS告警监控计划,须要依赖AWS CloudWatch告警加上多个其余AWS服务达到告警监控和治理的能力。个别应用CloudWatch Alarm + SNS + System Manager OpsCenter组合的形式实现对日志、时序的监控治理。CloudWatch Logs反对日志的采集,但理论监控告警时,须要先转换成时序才行。3.1.2. 评估比照![上传中...]()3.2. 与AzureMonitor 告警比照3.2.1. 简介Azure Monitor反对残缺的基于时序与日志的监控,并很好集成了上下游计划提供残缺的告警监控与告警治理告诉性能。3.2.2. 评估比照 ...

April 19, 2021 · 1 min · jiezi

关于influxdb:Centos-安装-influxdb

下载安装包拜访 https://portal.influxdata.com/downloads/influxdb/2.0/db/ 抉择平台下载安装包。 此处抉择生成命令行: wget https://dl.influxdata.com/influxdb/releases/influxdb2-2.0.4.x86_64.rpmsudo yum localinstall influxdb2-2.0.4.x86_64.rpm服务启动$ service influxdb start # 启动服务$ service influxdb status # 查看状态

March 29, 2021 · 1 min · jiezi