In the VictoriaMetrics cluster version, -dedup.minScrapeInterval is used for data deduplication. It can be set as a startup flag on both vmselect and vmstorage:
Configured on vmselect:
- VictoriaMetrics stores timestamps with millisecond precision, so when the same vminsert sends data to different vmstorage nodes, the stored millisecond timestamps are identical;
- Therefore -dedup.minScrapeInterval=1ms is usually configured on vmselect, which deduplicates the replicated samples coming from different nodes;
Configured on vmstorage:
- When two vmagent instances push identical data, -dedup.minScrapeInterval=scrape_interval is usually configured on vmstorage, which avoids storing duplicate samples on a single node;
The official documentation puts it this way:
VictoriaMetrics stores timestamps with millisecond precision, so -dedup.minScrapeInterval=1ms command-line flag must be passed to vmselect nodes when the replication is enabled, so they could de-duplicate replicated samples obtained from distinct vmstorage nodes during querying.
If duplicate data is pushed to VictoriaMetrics from identically configured vmagent instances or Prometheus instances, then the -dedup.minScrapeInterval must be set to scrape_interval from scrape configs according to deduplication docs.
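As a concrete sketch of the two deployment patterns (binary paths, node addresses, ports, and the 30s interval below are placeholder values, not taken from this article):

```shell
# vmselect: deduplicate replicated samples from distinct vmstorage nodes at query time.
/path/to/vmselect \
  -storageNode=vmstorage-1:8401,vmstorage-2:8401 \
  -dedup.minScrapeInterval=1ms

# vmstorage: keep one sample per scrape interval when identically configured
# vmagent/Prometheus instances push duplicate data; 30s stands in for scrape_interval.
/path/to/vmstorage \
  -dedup.minScrapeInterval=30s
```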
1. vmselect
VictoriaMetrics stores timestamps with millisecond (ms) precision, so vmselect is usually configured with:
- -dedup.minScrapeInterval=1ms
- In a multi-replica setup this deduplicates the query results.
The relevant deduplication code in the query path:
// app/vmselect/netstorage/netstorage.go
func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.TimeRange) error {
    ...
    dedupInterval := storage.GetDedupInterval() // read the -dedup.minScrapeInterval flag
    mergeSortBlocks(dst, sbs, dedupInterval)    // merge (includes the dedup logic)
    return nil
}
func mergeSortBlocks(dst *Result, sbh sortBlocksHeap, dedupInterval int64) {
    ...
    // deduplicate timestamps and values using dedupInterval
    timestamps, values := storage.DeduplicateSamples(dst.Timestamps, dst.Values, dedupInterval)
    dedups := len(dst.Timestamps) - len(timestamps)
    dedupsDuringSelect.Add(dedups)
    dst.Timestamps = timestamps
    dst.Values = values
}
The deduplication logic in detail:
- the timeline is split into dedupInterval-sized buckets (tracked by tsNext), and only one sample is kept per bucket (the one with the largest ts in that bucket);
// lib/storage/dedup.go
func DeduplicateSamples(srcTimestamps []int64, srcValues []float64, dedupInterval int64) ([]int64, []float64) {
    ...
    tsNext := srcTimestamps[0] + dedupInterval - 1
    tsNext -= tsNext % dedupInterval
    dstTimestamps := srcTimestamps[:0]
    dstValues := srcValues[:0]
    for i, ts := range srcTimestamps[1:] {
        if ts <= tsNext {
            continue
        }
        dstTimestamps = append(dstTimestamps, srcTimestamps[i])
        dstValues = append(dstValues, srcValues[i])
        tsNext += dedupInterval
        if tsNext < ts {
            tsNext = ts + dedupInterval - 1
            tsNext -= tsNext % dedupInterval
        }
    }
    dstTimestamps = append(dstTimestamps, srcTimestamps[len(srcTimestamps)-1])
    dstValues = append(dstValues, srcValues[len(srcValues)-1])
    return dstTimestamps, dstValues
}
2. vmstorage
Configuring -dedup.minScrapeInterval on vmstorage usually aims to deduplicate at storage time, for cases such as:
- multiple vmagent instances scraping the same targets, all pushing to VictoriaMetrics;
- multiple Prometheus instances with identical external_labels scraping the same targets, all pushing to VictoriaMetrics;
In these cases, -dedup.minScrapeInterval=scrape_interval guarantees that only one sample is stored per interval.
At startup, vmstorage reads the value of dedup.minScrapeInterval:
// app/vmstorage/main.go
var (
    ...
    minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Leave only the last sample in every time series per each discrete interval "+
        "equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication for details")
    ...
)
func main() {
    ...
    storage.SetDedupInterval(*minScrapeInterval)
    ...
}
Deduplication is performed while merging in-memory data into on-disk parts:
// lib/storage/block_stream_writer.go
func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *uint64) {
    ...
    b.deduplicateSamplesDuringMerge() // deduplicate
    headerData, timestampsData, valuesData := b.MarshalData(bsw.timestampsBlockOffset, bsw.valuesBlockOffset)
    ...
    fs.MustWriteData(bsw.timestampsWriter, timestampsData)
    fs.MustWriteData(bsw.valuesWriter, valuesData)
    ...
}
The block data is unmarshaled first, then timestamps and values are deduplicated:
- when dedup.minScrapeInterval <= 0, no deduplication is needed;
- first, the block data is unmarshaled;
- then, the dedup logic is invoked on the timestamps and values;
// lib/storage/block.go
func (b *Block) deduplicateSamplesDuringMerge() {
    if !isDedupEnabled() { // no dedup needed when dedup.minScrapeInterval <= 0
        // Deduplication is disabled
        return
    }
    // Unmarshal block if it isn't unmarshaled yet in order to apply the de-duplication to unmarshaled samples.
    // 1. unmarshal the block data
    if err := b.UnmarshalData(); err != nil {
        logger.Panicf("FATAL: cannot unmarshal block: %s", err)
    }
    srcTimestamps := b.timestamps[b.nextIdx:]
    ...
    dedupInterval := GetDedupInterval()
    ...
    srcValues := b.values[b.nextIdx:]
    // 2. invoke the dedup logic
    timestamps, values := deduplicateSamplesDuringMerge(srcTimestamps, srcValues, dedupInterval)
    dedups := len(srcTimestamps) - len(timestamps)
    b.timestamps = b.timestamps[:b.nextIdx+len(timestamps)]
    b.values = b.values[:b.nextIdx+len(values)]
}
The dedup logic itself:
- is the same as on vmselect: only one sample is kept per dedupInterval bucket (the one with the largest ts in that bucket);
The difference:
- vmselect deduplicates query results, where the values are []float64;
- vmstorage deduplicates data being merged to disk, where the values are []int64;
// lib/storage/dedup.go
func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64, dedupInterval int64) ([]int64, []int64) {
    ...
    tsNext := srcTimestamps[0] + dedupInterval - 1
    tsNext -= tsNext % dedupInterval
    dstTimestamps := srcTimestamps[:0]
    dstValues := srcValues[:0]
    for i, ts := range srcTimestamps[1:] {
        if ts <= tsNext {
            continue
        }
        dstTimestamps = append(dstTimestamps, srcTimestamps[i])
        dstValues = append(dstValues, srcValues[i])
        tsNext += dedupInterval
        if tsNext < ts {
            tsNext = ts + dedupInterval - 1
            tsNext -= tsNext % dedupInterval
        }
    }
    dstTimestamps = append(dstTimestamps, srcTimestamps[len(srcTimestamps)-1])
    dstValues = append(dstValues, srcValues[len(srcValues)-1])
    return dstTimestamps, dstValues
}
References:
1. https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html…