乐趣区

关于监控工具:Openfalcon-graph使用RRD保存指标数据

Open-falcon 的 graph 组件应用 rrd 保留指标数据,rrd 因为是本地存储,单机故障容易引起数据失落,Open-falcon 提供了双写的逻辑 (transfer 双写 2 个 graph 节点) 以加强可用性。

graph 的指标数据的存储,可抉择不同的 TSDB,比方 InfluxDB、OpenTSDB 等。

本文次要联合源码,介绍 Open-falcon 如何应用 rrd 保留和查问数据。

1. rrd 文件的命名

rrd 文件的命名形式:

md5=md5sum(metric, endpoint, tags)
md5[0:2]_md5_dsType_step.rrd

看一下代码中的实现:

// RRDTOOL UTILS
// 监控数据对应的 rrd 文件名称
func RrdFileName(baseDir string, md5 string, dsType string, step int) string {return baseDir + "/" + md5[0:2] + "/" +
        md5 + "_" + dsType + "_" + strconv.Itoa(step) + ".rrd"
}

md5 的计算:将 endpoint/metric/tags 拼成 string,而后 md5sum(string)

//MD5 的值
func Checksum(endpoint string, metric string, tags map[string]string) string {pk := PK(endpoint, metric, tags)
    return Md5(pk)
}

func PK(endpoint, metric string, tags map[string]string) string {ret := bufferPool.Get().(*bytes.Buffer)
    ret.Reset()
    defer bufferPool.Put(ret)

    if tags == nil || len(tags) == 0 {ret.WriteString(endpoint)
        ret.WriteString("/")
        ret.WriteString(metric)
        return ret.String()}
    ret.WriteString(endpoint)
    ret.WriteString("/")
    ret.WriteString(metric)
    ret.WriteString("/")
    ret.WriteString(SortedTags(tags))
    return ret.String()}

比方:trovedev 这台机器的 agent.alive 的数据,该指标将被存储在 be/be3cdc0fce0d498e19e2c6e9f1f338b6_GAUGE_60.rrd 文件中:

# echo -n "trovedev/agent.alive" | md5sum
be3cdc0fce0d498e19e2c6e9f1f338b6  -
# ls -alh be/be3cdc0fce0d498e19e2c6e9f1f338b6_GAUGE_60.rrd
-rw-r--r-- 1 root root 70K 6 月   2 09:43 be/be3cdc0fce0d498e19e2c6e9f1f338b6_GAUGE_60.rrd

2. rrd 文件的创立

rrd 文件被创立时,除了在本地磁盘上新建了一个 rrd 文件,还指定了指标的归档策略,也就是指定保留多久、如何聚合:
比方:原始的数据 1min1 个点,保留 12hour;原始的数据每隔 5min 被聚合 (AVERAGE) 成 1 个点,原始的数据被删掉;

// modules/graph/rrdtool/rrdtool.go
const (
    RRA1PointCnt   = 720 
    RRA5PointCnt   = 576 
    RRA20PointCnt  = 504 
    RRA180PointCnt = 766 
    RRA720PointCnt = 730 
)

func create(filename string, item *cmodel.GraphItem) error {now := time.Now()
    start := now.Add(time.Duration(-24) * time.Hour)
    step := uint(item.Step)

    c := rrdlite.NewCreator(filename, start, step)
    c.DS("metric", item.DsType, item.Heartbeat, item.Min, item.Max)

    // 设置各种归档策略
    // 1min1 个点,存 12hour
    c.RRA("AVERAGE", 0, 1, RRA1PointCnt)    //RRA1PointCnt=720: 1min1 个点,存 720 个点,即 12hour

    // 5m1 个点,存 2d
    c.RRA("AVERAGE", 0, 5, RRA5PointCnt)    //RRA5PointCnt=576:5min1 个点,存 576 个点,即 2d
    c.RRA("MAX", 0, 5, RRA5PointCnt)
    c.RRA("MIN", 0, 5, RRA5PointCnt)

    // 20m1 个点,存 7d
    c.RRA("AVERAGE", 0, 20, RRA20PointCnt)    //RRA20PointCnt=504: 20m1 个点,共 504 个点,即 7d
    c.RRA("MAX", 0, 20, RRA20PointCnt)
    c.RRA("MIN", 0, 20, RRA20PointCnt)

    .......

    return c.Create(true)
}

能够应用 rrdtool 查问 rrd 文件的信息:其中保留了配置的归档策略

# rrdtool info be/be3cdc0fce0d498e19e2c6e9f1f338b6_GAUGE_60.rrd
filename = "be/be3cdc0fce0d498e19e2c6e9f1f338b6_GAUGE_60.rrd"
rrd_version = "0003"
step = 60
last_update = 1591062120
header_size = 3080
ds[metric].index = 0
ds[metric].type = "GAUGE"
ds[metric].minimal_heartbeat = 120
ds[metric].min = NaN
ds[metric].max = NaN
ds[metric].last_ds = "1"
ds[metric].value = 0.0000000000e+00
ds[metric].unknown_sec = 0
rra[0].cf = "AVERAGE"
rra[0].rows = 720
rra[0].cur_row = 385
rra[0].pdp_per_row = 1
rra[0].xff = 0.0000000000e+00
rra[0].cdp_prep[0].value = NaN
rra[0].cdp_prep[0].unknown_datapoints = 0
rra[1].cf = "AVERAGE"
rra[1].rows = 576
rra[1].cur_row = 543
rra[1].pdp_per_row = 5
rra[1].xff = 0.0000000000e+00
rra[1].cdp_prep[0].value = 2.0000000000e+00
rra[1].cdp_prep[0].unknown_datapoints = 0
rra[2].cf = "MAX"
rra[2].rows = 576
rra[2].cur_row = 24
rra[2].pdp_per_row = 5
rra[2].xff = 0.0000000000e+00
rra[2].cdp_prep[0].value = 1.0000000000e+00
rra[2].cdp_prep[0].unknown_datapoints = 0
rra[3].cf = "MIN"
.....

3. 应用 rrd 文件查问数据

先看下如何间接查问 rrd 文件中的指标数据,比方查问 2020-06-02 09:30:46 ~ 2020-06-02 09:39:46 这段时间,每 60s1 个点的数据:

# rrdtool fetch be/be3cdc0fce0d498e19e2c6e9f1f338b6_GAUGE_60.rrd AVERAGE -r 60 -s 1591061446 -e 1591061986
                         metric
1591061460: 1.0000000000e+00
1591061520: 1.0000000000e+00
1591061580: 1.0000000000e+00
1591061640: 1.0000000000e+00
1591061700: 1.0000000000e+00
1591061760: 1.0000000000e+00
1591061820: 1.0000000000e+00
1591061880: 1.0000000000e+00
1591061940: 1.0000000000e+00
1591062000: 1.0000000000e+00

再看一下代码中如何查问的:

  • filename: 指标所在的文件;
  • cf: 指标聚合办法,比方 AVERAGE;
  • start,end: 查问的起始 / 终止工夫;
  • step: 指标的间隔时间;
// modules/graph/rrdtool/rrdtool.go
func fetch(filename string, cf string, start, end int64, step int) ([]*cmodel.RRDData, error) {start_t := time.Unix(start, 0)
    end_t := time.Unix(end, 0)
    step_t := time.Duration(step) * time.Second

    fetchRes, err := rrdlite.Fetch(filename, cf, start_t, end_t, step_t)
    if err != nil {return []*cmodel.RRDData{}, err}

    defer fetchRes.FreeValues()

    values := fetchRes.Values()
    size := len(values)
    ret := make([]*cmodel.RRDData, size)

    start_ts := fetchRes.Start.Unix()
    step_s := fetchRes.Step.Seconds()

    for i, val := range values {ts := start_ts + int64(i+1)*int64(step_s)
        d := &cmodel.RRDData{
            Timestamp: ts,
            Value:     cmodel.JsonFloat(val),
        }
        ret[i] = d
    }
    return ret, nil
}

4. 保留数据到 rrd 文件

通过 rrdlite.Updater 来实现,先将指标数据写入 cache,而后一起 update 到 rrd 文件:

// modules/graph/rrdtool/rrdtool.go
func update(filename string, items []*cmodel.GraphItem) error {u := rrdlite.NewUpdater(filename)

    for _, item := range items {v := math.Abs(item.Value)
        if v > 1e+300 || (v < 1e-300 && v > 0) {continue}
        if item.DsType == "DERIVE" || item.DsType == "COUNTER" {u.Cache(item.Timestamp, int(item.Value))
        } else {u.Cache(item.Timestamp, item.Value)
        }
    }

    return u.Update()}
退出移动版