In the VictoriaMetrics cluster version, -dedup.minScrapeInterval controls data deduplication. It can be passed as a startup flag to both vmselect and vmstorage:
On vmselect:
- VictoriaMetrics stores timestamps with millisecond precision, so when the same vminsert replicates data to different vmstorage nodes, the replicas carry identical millisecond timestamps;
- vmselect is therefore usually started with -dedup.minScrapeInterval=1ms, which de-duplicates the replicated samples returned by different nodes;
On vmstorage:
- When two vmagent instances push identical data, vmstorage is usually started with -dedup.minScrapeInterval=scrape_interval, which prevents a single node from storing duplicate samples;
VictoriaMetrics stores timestamps with millisecond precision, so -dedup.minScrapeInterval=1ms command-line flag must be passed to vmselect nodes when the replication is enabled, so they could de-duplicate replicated samples obtained from distinct vmstorage nodes during querying.
If duplicate data is pushed to VictoriaMetrics from identically configured vmagent instances or Prometheus instances, then the -dedup.minScrapeInterval must be set to scrape_interval from scrape configs according to deduplication docs.
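The vmselect side of this can be illustrated with a minimal sketch. This is not the real vmselect code path: `sample`, `dedupReplicas`, and the replica slices are hypothetical names, and the real merge works on blocks rather than flat slices. The sketch only shows why 1ms deduplication collapses replicated samples: both replicas return the same millisecond timestamps, so after a merge sort each timestamp appears twice and one copy per millisecond survives.

```go
package main

import (
	"fmt"
	"sort"
)

// sample is a hypothetical (timestamp, value) pair; VictoriaMetrics stores
// timestamps as int64 milliseconds.
type sample struct {
	ts    int64 // millisecond timestamp
	value float64
}

// dedupReplicas sorts the samples merged from several vmstorage replicas and
// keeps one sample per millisecond, which is what -dedup.minScrapeInterval=1ms
// achieves on the query path.
func dedupReplicas(merged []sample) []sample {
	sort.Slice(merged, func(i, j int) bool { return merged[i].ts < merged[j].ts })
	dst := merged[:0]
	for _, s := range merged {
		if len(dst) > 0 && dst[len(dst)-1].ts == s.ts {
			dst[len(dst)-1] = s // replicated sample: keep only one copy
			continue
		}
		dst = append(dst, s)
	}
	return dst
}

func main() {
	// The same vminsert wrote identical samples to two vmstorage replicas,
	// so both replicas return the same millisecond timestamps at query time.
	replicaA := []sample{{1000, 1.5}, {2000, 2.5}}
	replicaB := []sample{{1000, 1.5}, {2000, 2.5}}
	merged := append(append([]sample{}, replicaA...), replicaB...)
	fmt.Println(dedupReplicas(merged)) // the 4 merged samples collapse to 2
}
```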
1. vmselect
VictoriaMetrics stores timestamps with millisecond precision, so vmselect is usually configured with:
- -dedup.minScrapeInterval=1ms
- In a multi-replica setup, this de-duplicates the query results.
The relevant deduplication code on the query path:
```go
// app/vmselect/netstorage/netstorage.go
func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.TimeRange) error {
	...
	dedupInterval := storage.GetDedupInterval() // read the -dedup.minScrapeInterval flag
	mergeSortBlocks(dst, sbs, dedupInterval)    // merge (including deduplication)
	return nil
}

func mergeSortBlocks(dst *Result, sbh sortBlocksHeap, dedupInterval int64) {
	...
	// de-duplicate the timestamps and values using dedupInterval
	timestamps, values := storage.DeduplicateSamples(dst.Timestamps, dst.Values, dedupInterval)
	dedups := len(dst.Timestamps) - len(timestamps)
	dedupsDuringSelect.Add(dedups)
	dst.Timestamps = timestamps
	dst.Values = values
}
```
The deduplication logic itself:
- The timestamps are split into dedupInterval-sized buckets (tracked via tsNext), and only one sample is kept per bucket (the one with the largest ts in that bucket);
```go
// lib/storage/dedup.go
func DeduplicateSamples(srcTimestamps []int64, srcValues []float64, dedupInterval int64) ([]int64, []float64) {
	...
	tsNext := srcTimestamps[0] + dedupInterval - 1
	tsNext -= tsNext % dedupInterval
	dstTimestamps := srcTimestamps[:0]
	dstValues := srcValues[:0]
	for i, ts := range srcTimestamps[1:] {
		if ts <= tsNext {
			continue
		}
		dstTimestamps = append(dstTimestamps, srcTimestamps[i])
		dstValues = append(dstValues, srcValues[i])
		tsNext += dedupInterval
		if tsNext < ts {
			tsNext = ts + dedupInterval - 1
			tsNext -= tsNext % dedupInterval
		}
	}
	dstTimestamps = append(dstTimestamps, srcTimestamps[len(srcTimestamps)-1])
	dstValues = append(dstValues, srcValues[len(srcValues)-1])
	return dstTimestamps, dstValues
}
```
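To make the bucket-last-sample behavior concrete, here is a self-contained sketch of the same algorithm (a standalone copy of the DeduplicateSamples logic above, with guard clauses added so it runs on arbitrary input), fed with a 30s dedup interval:

```go
package main

import "fmt"

// deduplicateSamples mirrors lib/storage/dedup.go DeduplicateSamples:
// each dedupInterval-aligned bucket keeps only its last (highest-ts) sample.
// Timestamps are int64 milliseconds and must be sorted ascending.
func deduplicateSamples(srcTimestamps []int64, srcValues []float64, dedupInterval int64) ([]int64, []float64) {
	if len(srcTimestamps) == 0 || dedupInterval <= 0 {
		return srcTimestamps, srcValues
	}
	// tsNext is the inclusive upper boundary of the bucket holding srcTimestamps[0].
	tsNext := srcTimestamps[0] + dedupInterval - 1
	tsNext -= tsNext % dedupInterval
	dstTimestamps := srcTimestamps[:0]
	dstValues := srcValues[:0]
	for i, ts := range srcTimestamps[1:] {
		if ts <= tsNext {
			continue // still inside the current bucket
		}
		// srcTimestamps[i] is the last sample of the bucket that just ended.
		dstTimestamps = append(dstTimestamps, srcTimestamps[i])
		dstValues = append(dstValues, srcValues[i])
		tsNext += dedupInterval
		if tsNext < ts {
			// jump over empty buckets
			tsNext = ts + dedupInterval - 1
			tsNext -= tsNext % dedupInterval
		}
	}
	// The very last sample always survives.
	dstTimestamps = append(dstTimestamps, srcTimestamps[len(srcTimestamps)-1])
	dstValues = append(dstValues, srcValues[len(srcValues)-1])
	return dstTimestamps, dstValues
}

func main() {
	// 30s dedup interval, timestamps in ms. Two samples fall into the
	// (0, 30000] bucket and two into (30000, 60000]; the later of each pair wins.
	ts := []int64{10000, 20000, 40000, 50000}
	vs := []float64{1, 2, 3, 4}
	outTs, outVs := deduplicateSamples(ts, vs, 30000)
	fmt.Println(outTs, outVs) // prints [20000 50000] [2 4]
}
```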
2. vmstorage
The purpose of setting -dedup.minScrapeInterval on vmstorage is usually to de-duplicate data at storage time:
- multiple vmagent instances scrape the same targets and all push to VictoriaMetrics;
- multiple Prometheus instances with identical external_labels scrape the same targets and all push to VictoriaMetrics;
In these cases, -dedup.minScrapeInterval=scrape_interval can be configured to guarantee that only one sample is stored per interval.
At startup, vmstorage reads the value of -dedup.minScrapeInterval:
```go
// app/vmstorage/main.go
var (
	...
	minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Leave only the last sample in every time series per each discrete interval "+
		"equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication for details")
	...
)

func main() {
	...
	storage.SetDedupInterval(*minScrapeInterval)
	...
}
```
Deduplication is performed while in-memory data is merged into on-disk parts:
```go
// lib/storage/block_stream_writer.go
func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *uint64) {
	...
	b.deduplicateSamplesDuringMerge() // deduplication
	headerData, timestampsData, valuesData := b.MarshalData(bsw.timestampsBlockOffset, bsw.valuesBlockOffset)
	...
	fs.MustWriteData(bsw.timestampsWriter, timestampsData)
	fs.MustWriteData(bsw.valuesWriter, valuesData)
	...
}
```
The block data is first unmarshaled, then the timestamps and values are de-duplicated:
- if dedup.minScrapeInterval <= 0, no deduplication is needed;
- first, the block data is unmarshaled;
- then, the deduplication logic is applied to the timestamps and values;
```go
// lib/storage/block.go
func (b *Block) deduplicateSamplesDuringMerge() {
	if !isDedupEnabled() { // no dedup needed when dedup.minScrapeInterval <= 0
		// Deduplication is disabled
		return
	}
	// Unmarshal block if it isn't unmarshaled yet in order to apply the de-duplication to unmarshaled samples.
	// 1. unmarshal
	if err := b.UnmarshalData(); err != nil {
		logger.Panicf("FATAL: cannot unmarshal block: %s", err)
	}
	srcTimestamps := b.timestamps[b.nextIdx:]
	...
	dedupInterval := GetDedupInterval()
	...
	srcValues := b.values[b.nextIdx:]
	// 2. apply the deduplication logic
	timestamps, values := deduplicateSamplesDuringMerge(srcTimestamps, srcValues, dedupInterval)
	dedups := len(srcTimestamps) - len(timestamps)
	b.timestamps = b.timestamps[:b.nextIdx+len(timestamps)]
	b.values = b.values[:b.nextIdx+len(values)]
}
```
The deduplication logic itself is the same as in vmselect:
- only one data point is kept per dedupInterval bucket (the one with the largest ts in that bucket);
The difference is:
- vmselect de-duplicates query results, where the values are []float64;
- vmstorage de-duplicates data being merged to disk, where the values are still encoded as []int64;
```go
// lib/storage/dedup.go
func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64, dedupInterval int64) ([]int64, []int64) {
	...
	tsNext := srcTimestamps[0] + dedupInterval - 1
	tsNext -= tsNext % dedupInterval
	dstTimestamps := srcTimestamps[:0]
	dstValues := srcValues[:0]
	for i, ts := range srcTimestamps[1:] {
		if ts <= tsNext {
			continue
		}
		dstTimestamps = append(dstTimestamps, srcTimestamps[i])
		dstValues = append(dstValues, srcValues[i])
		tsNext += dedupInterval
		if tsNext < ts {
			tsNext = ts + dedupInterval - 1
			tsNext -= tsNext % dedupInterval
		}
	}
	dstTimestamps = append(dstTimestamps, srcTimestamps[len(srcTimestamps)-1])
	dstValues = append(dstValues, srcValues[len(srcValues)-1])
	return dstTimestamps, dstValues
}
```
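Both variants share the same tsNext arithmetic, which is worth isolating: it rounds a timestamp up to the end of its dedupInterval-aligned bucket. The sketch below (the helper name `bucketEnd` is mine, not from the source) shows which boundary several timestamps map to with a 30s interval; samples whose timestamps round to the same boundary land in the same bucket.

```go
package main

import "fmt"

// bucketEnd reproduces the tsNext computation from DeduplicateSamples and
// deduplicateSamplesDuringMerge: the smallest multiple of dedupInterval
// that is >= ts, i.e. the inclusive upper boundary of ts's bucket.
func bucketEnd(ts, dedupInterval int64) int64 {
	tsNext := ts + dedupInterval - 1
	tsNext -= tsNext % dedupInterval
	return tsNext
}

func main() {
	const d = 30000 // 30s in ms
	for _, ts := range []int64{1, 29999, 30000, 30001, 59999} {
		fmt.Println(ts, "->", bucketEnd(ts, d))
	}
	// prints:
	// 1 -> 30000
	// 29999 -> 30000
	// 30000 -> 30000
	// 30001 -> 60000
	// 59999 -> 60000
}
```

Note that the boundary itself belongs to the lower bucket: 30000 maps to 30000, while 30001 starts the next bucket. This matches the `ts <= tsNext` comparison in the loops above.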
References:
1.https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html...