共计 4719 个字符,预计需要花费 12 分钟才能阅读完成。
Open-falcon 的 graph 组件应用 rrd 保留指标数据,rrd 因为是本地存储,单机故障容易引起数据失落,Open-falcon 提供了双写的逻辑 (transfer 双写 2 个 graph 节点) 以加强可用性。
graph 的指标数据的存储,可抉择不同的 TSDB,比方 InfluxDB、OpenTSDB 等。
本文次要联合源码,介绍 Open-falcon 如何应用 rrd 保留和查问数据。
1. rrd 文件的命名
rrd 文件的命名形式:
md5=md5sum(metric, endpoint, tags)
md5[0:2]_md5_dsType_step.rrd
看一下代码中的实现:
// RRDTOOL UTILS
// 监控数据对应的 rrd 文件名称
func RrdFileName(baseDir string, md5 string, dsType string, step int) string {return baseDir + "/" + md5[0:2] + "/" +
md5 + "_" + dsType + "_" + strconv.Itoa(step) + ".rrd"
}
md5 的计算:将 endpoint/metric/tags 拼成 string,而后 md5sum(string)
//MD5 的值
func Checksum(endpoint string, metric string, tags map[string]string) string {pk := PK(endpoint, metric, tags)
return Md5(pk)
}
func PK(endpoint, metric string, tags map[string]string) string {ret := bufferPool.Get().(*bytes.Buffer)
ret.Reset()
defer bufferPool.Put(ret)
if tags == nil || len(tags) == 0 {ret.WriteString(endpoint)
ret.WriteString("/")
ret.WriteString(metric)
return ret.String()}
ret.WriteString(endpoint)
ret.WriteString("/")
ret.WriteString(metric)
ret.WriteString("/")
ret.WriteString(SortedTags(tags))
return ret.String()}
比方:trovedev 这台机器的 agent.alive 的数据,该指标将被存储在 be/be3cdc0fce0d498e19e2c6e9f1f338b6_GAUGE_60.rrd 文件中:
# echo -n "trovedev/agent.alive" | md5sum
be3cdc0fce0d498e19e2c6e9f1f338b6 -
# ls -alh be/be3cdc0fce0d498e19e2c6e9f1f338b6_GAUGE_60.rrd
-rw-r--r-- 1 root root 70K 6 月 2 09:43 be/be3cdc0fce0d498e19e2c6e9f1f338b6_GAUGE_60.rrd
2. rrd 文件的创立
rrd 文件被创立时,除了在本地磁盘上新建了一个 rrd 文件,还指定了指标的归档策略,也就是指定保留多久、如何聚合:
比方:原始的数据 1min1 个点,保留 12hour;原始的数据每隔 5min 被聚合 (AVERAGE) 成 1 个点,原始的数据被删掉;
// modules/graph/rrdtool/rrdtool.go
const (
RRA1PointCnt = 720
RRA5PointCnt = 576
RRA20PointCnt = 504
RRA180PointCnt = 766
RRA720PointCnt = 730
)
func create(filename string, item *cmodel.GraphItem) error {now := time.Now()
start := now.Add(time.Duration(-24) * time.Hour)
step := uint(item.Step)
c := rrdlite.NewCreator(filename, start, step)
c.DS("metric", item.DsType, item.Heartbeat, item.Min, item.Max)
// 设置各种归档策略
// 1min1 个点,存 12hour
c.RRA("AVERAGE", 0, 1, RRA1PointCnt) //RRA1PointCnt=720: 1min1 个点,存 720 个点,即 12hour
// 5m1 个点,存 2d
c.RRA("AVERAGE", 0, 5, RRA5PointCnt) //RRA5PointCnt=576:5min1 个点,存 576 个点,即 2d
c.RRA("MAX", 0, 5, RRA5PointCnt)
c.RRA("MIN", 0, 5, RRA5PointCnt)
// 20m1 个点,存 7d
c.RRA("AVERAGE", 0, 20, RRA20PointCnt) //RRA20PointCnt=504: 20m1 个点,共 504 个点,即 7d
c.RRA("MAX", 0, 20, RRA20PointCnt)
c.RRA("MIN", 0, 20, RRA20PointCnt)
.......
return c.Create(true)
}
能够应用 rrdtool 查问 rrd 文件的信息:其中保留了配置的归档策略
# rrdtool info be/be3cdc0fce0d498e19e2c6e9f1f338b6_GAUGE_60.rrd
filename = "be/be3cdc0fce0d498e19e2c6e9f1f338b6_GAUGE_60.rrd"
rrd_version = "0003"
step = 60
last_update = 1591062120
header_size = 3080
ds[metric].index = 0
ds[metric].type = "GAUGE"
ds[metric].minimal_heartbeat = 120
ds[metric].min = NaN
ds[metric].max = NaN
ds[metric].last_ds = "1"
ds[metric].value = 0.0000000000e+00
ds[metric].unknown_sec = 0
rra[0].cf = "AVERAGE"
rra[0].rows = 720
rra[0].cur_row = 385
rra[0].pdp_per_row = 1
rra[0].xff = 0.0000000000e+00
rra[0].cdp_prep[0].value = NaN
rra[0].cdp_prep[0].unknown_datapoints = 0
rra[1].cf = "AVERAGE"
rra[1].rows = 576
rra[1].cur_row = 543
rra[1].pdp_per_row = 5
rra[1].xff = 0.0000000000e+00
rra[1].cdp_prep[0].value = 2.0000000000e+00
rra[1].cdp_prep[0].unknown_datapoints = 0
rra[2].cf = "MAX"
rra[2].rows = 576
rra[2].cur_row = 24
rra[2].pdp_per_row = 5
rra[2].xff = 0.0000000000e+00
rra[2].cdp_prep[0].value = 1.0000000000e+00
rra[2].cdp_prep[0].unknown_datapoints = 0
rra[3].cf = "MIN"
.....
3. 应用 rrd 文件查问数据
先看下如何间接查问 rrd 文件中的指标数据,比方查问 2020-06-02 09:30:46 ~ 2020-06-02 09:39:46 这段时间,每 60s1 个点的数据:
# rrdtool fetch be/be3cdc0fce0d498e19e2c6e9f1f338b6_GAUGE_60.rrd AVERAGE -r 60 -s 1591061446 -e 1591061986
metric
1591061460: 1.0000000000e+00
1591061520: 1.0000000000e+00
1591061580: 1.0000000000e+00
1591061640: 1.0000000000e+00
1591061700: 1.0000000000e+00
1591061760: 1.0000000000e+00
1591061820: 1.0000000000e+00
1591061880: 1.0000000000e+00
1591061940: 1.0000000000e+00
1591062000: 1.0000000000e+00
再看一下代码中如何查问的:
- filename: 指标所在的文件;
- cf: 指标聚合办法,比方 AVERAGE;
- start,end: 查问的起始 / 终止工夫;
- step: 指标的间隔时间;
// modules/graph/rrdtool/rrdtool.go
func fetch(filename string, cf string, start, end int64, step int) ([]*cmodel.RRDData, error) {start_t := time.Unix(start, 0)
end_t := time.Unix(end, 0)
step_t := time.Duration(step) * time.Second
fetchRes, err := rrdlite.Fetch(filename, cf, start_t, end_t, step_t)
if err != nil {return []*cmodel.RRDData{}, err}
defer fetchRes.FreeValues()
values := fetchRes.Values()
size := len(values)
ret := make([]*cmodel.RRDData, size)
start_ts := fetchRes.Start.Unix()
step_s := fetchRes.Step.Seconds()
for i, val := range values {ts := start_ts + int64(i+1)*int64(step_s)
d := &cmodel.RRDData{
Timestamp: ts,
Value: cmodel.JsonFloat(val),
}
ret[i] = d
}
return ret, nil
}
4. 保留数据到 rrd 文件
通过 rrdlite.Updater 来实现,先将指标数据写入 cache,而后一起 update 到 rrd 文件:
// modules/graph/rrdtool/rrdtool.go
func update(filename string, items []*cmodel.GraphItem) error {u := rrdlite.NewUpdater(filename)
for _, item := range items {v := math.Abs(item.Value)
if v > 1e+300 || (v < 1e-300 && v > 0) {continue}
if item.DsType == "DERIVE" || item.DsType == "COUNTER" {u.Cache(item.Timestamp, int(item.Value))
} else {u.Cache(item.Timestamp, item.Value)
}
}
return u.Update()}
正文完