共计 9040 个字符,预计需要花费 23 分钟才能阅读完成。
sidecar 负责定期将本地 prometheus 的 block 数据上传到远程存储,其运行参数如下:
/bin/thanos sidecar
--prometheus.url=http://localhost:9090/
--tsdb.path=/prometheus
--grpc-address=[$(POD_IP)]:10901
--http-address=[$(POD_IP)]:10902
--objstore.config=$(OBJSTORE_CONFIG)
其中环境变量:
Environment:
POD_IP: (v1:status.podIP)
OBJSTORE_CONFIG: <set to the key 'thanos.yaml' in secret 'thanos-objstore-config'> Optional: false
POD_IP 通过 Downward API 获取,OBJSTORE_CONFIG 是 secret 中保存的 thanos.yaml 内容,其中配置了 s3 远程存储 minio:
type: s3
config:
  bucket: thanos
  endpoint: minio.minio.svc.cluster.local:9000
  access_key: minio
  secret_key: minio
  insecure: true
  signature_version2: false
sidecar 源码入口
先找 sidecar 的入口:
// main is the entry point of the thanos binary (excerpt; "......" marks elided code).
// It creates the kingpin-based application, registers the sidecar subcommand on it,
// and runs the actors collected in the run.Group g. If g.Run returns an error, the
// error is logged (with stack trace via %+v) and the process exits with status 1.
func main() {
......
app := extkingpin.NewApp(kingpin.New(filepath.Base(os.Args[0]), "A block storage based long-term storage for Prometheus.").Version(version.Print("thanos")))
registerSidecar(app)
.......
// NOTE(review): g is presumably populated in the elided code by the selected command's
// setup function (cmd.Setup receives a *run.Group) — confirm.
var g run.Group
......
if err := g.Run(); err != nil {
// Use %+v for github.com/pkg/errors error to print with stack.
level.Error(logger).Log("err", fmt.Sprintf("%+v", errors.Wrapf(err, "%s command failed", cmd)))
os.Exit(1)
}
}
registerSidecar()注册 sidecar 服务:
// cmd/thanos/sidecar.go
func registerSidecar(app *extkingpin.App) {cmd := app.Command(component.Sidecar.String(), "Sidecar for Prometheus server.")
conf := &sidecarConfig{}
conf.registerFlag(cmd)
cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
......
return runSidecar(g, logger, reg, tracer, rl, component.Sidecar, *conf, grpcLogOpts, tagOpts)
}
}
sidecar 中 shipper 的启动流程如下:
- 配置了远程存储时,uploads=true;
- 将 shipper 作为 1 个后台 goroutine 运行;
- shipper 运行过程中,每 30s 执行一次 Sync(),检查是否有新 block 产生,若有新 block,则将其 ship 到远端存储;
// cmd/thanos/sidecar.go
// runSidecar adds the sidecar's actors to g (excerpt; "......" marks elided code).
// When an object storage config is present (confContentYaml is non-empty), it adds a
// shipper actor that calls s.Sync every 30s to upload new blocks from conf.tsdb.path
// to the bucket; otherwise uploads are disabled and only an info message is logged.
func runSidecar(
g *run.Group,
logger log.Logger,
reg *prometheus.Registry,
tracer opentracing.Tracer,
reloader *reloader.Reloader,
comp component.Component,
conf sidecarConfig,
grpcLogOpts []grpc_logging.Option,
tagOpts []tags.Option,) error {
......
// uploads stays true only when an object storage config was provided.
var uploads = true
if len(confContentYaml) == 0 {level.Info(logger).Log("msg", "no supported bucket was configured, uploads will be disabled")
uploads = false
}
......
if uploads {
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
// NOTE(review): err from NewBucket is not checked in this excerpt; the check is
// presumably elided — confirm against the full source.
bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Sidecar.String())
ctx, cancel := context.WithCancel(context.Background())
// Run the shipper as a run.Group actor. Its interrupt function cancels ctx, which
// closes ctx.Done() and so stops the runutil.Repeat loop below.
g.Add(func() error {
......
s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource,
conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc))
.......
// Every 30s: upload new blocks with s.Sync (a failure is only logged, together with
// the uploaded count), then pass the shipper's minimum block time to
// m.UpdateTimestamps (m is defined in the elided code). A timestamp read failure is
// logged and the loop keeps running.
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {if uploaded, err := s.Sync(ctx); err != nil {level.Warn(logger).Log("err", err, "uploaded", uploaded)
}
minTime, _, err := s.Timestamps()
if err != nil {level.Warn(logger).Log("msg", "reading timestamps failed", "err", err)
return nil
}
m.UpdateTimestamps(minTime, math.MaxInt64)
return nil
})
}, func(error) {cancel()
})
}
}
这里用 run.Group 管理并发的任务:当其中任一任务退出(无论是否出错)时,其它任务都会被中断并随之退出。
shipper 的流程
- 首先检查本地哪些 block 需要上传;
- 然后将要上传的 block dir 以 hardlink 的方式保护起来;
- 最后将 block dir 上传到 minio(通过 minio API);
1. 检查哪些 block 需要上传
// Excerpt of the shipper actor: create a shipper over the TSDB directory conf.tsdb.path
// and the bucket bkt, then call s.Sync every 30s.
s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource,
conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc))
.......
// Every 30s: sync new blocks (a Sync error is only logged, with the uploaded count),
// then update m's timestamps with the shipper's minimum block time.
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {if uploaded, err := s.Sync(ctx); err != nil {level.Warn(logger).Log("err", err, "uploaded", uploaded)
}
minTime, _, err := s.Timestamps()
if err != nil {level.Warn(logger).Log("msg", "reading timestamps failed", "err", err)
return nil
}
m.UpdateTimestamps(minTime, math.MaxInt64)
return nil
})
重点在 s.Sync(ctx):
- 已上传的 block:读取 metafile,其中记录了已上传的 block;
- 当前的 block:读取 prometheus 的 data 目录,查询目前所有的 block;
- 将 data 中尚未 ship 的 block,通过 s.upload() 上传到远程存储;
- 最初,重写 metafile,将新 upload 的 block 写入 metafile(metafile: thanos.shipper.json);
// pkg/shipper/shipper.go
// Sync performs a single synchronization, which ensures all non-compacted local blocks have been uploaded
// to the object bucket once.
//
// It reads the shipper meta file (thanos.shipper.json) to learn which blocks were
// already uploaded, uploads every local block not listed there via s.upload, and
// rewrites the meta file so that it lists the local blocks known to be uploaded.
// uploaded is the number of blocks uploaded by this call.
//
// It is not concurrency-safe, however it is compactor-safe (running concurrently with compactor is ok).
//
// This is an excerpt: the error checks, metrics and the final return statement of the
// full function are not shown.
func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
	// Read the meta file that records which blocks have already been uploaded.
	// NOTE(review): err is not checked in this excerpt; if ReadMetaFile can return a nil
	// meta on failure, the accesses below would panic — confirm against the full source.
	meta, err := ReadMetaFile(s.dir)

	// Build a map of blocks we already uploaded.
	hasUploaded := make(map[ulid.ULID]struct{}, len(meta.Uploaded))
	for _, id := range meta.Uploaded {
		hasUploaded[id] = struct{}{}
	}

	// Reset the list so it is rebuilt below only from blocks that still exist locally.
	// Without this, every already-uploaded ID read from the file would be appended a
	// second time, and the meta file would accumulate duplicates on every sync.
	meta.Uploaded = nil

	// List the blocks currently present in the local TSDB directory.
	// NOTE(review): err is not checked in this excerpt either.
	metas, err := s.blockMetasFromOldest()
	for _, m := range metas {
		// Do not sync a block if we already uploaded or ignored it. If it's no longer found in the bucket,
		// it was generally removed by the compaction process.
		if _, uploaded := hasUploaded[m.ULID]; uploaded { // already uploaded: keep it listed
			meta.Uploaded = append(meta.Uploaded, m.ULID)
			continue
		}
		// Upload the block. On failure, count the error (uploadErrs is declared in code not
		// shown here) and leave the block out of the list, so a later Sync retries it.
		if err := s.upload(ctx, m); err != nil {
			uploadErrs++
			continue
		}
		meta.Uploaded = append(meta.Uploaded, m.ULID)
		uploaded++
	}
	// Persist the rebuilt list to the meta file (thanos.shipper.json); a write failure is
	// only logged.
	if err := WriteMetaFile(s.logger, s.dir, meta); err != nil {
		level.Warn(s.logger).Log("msg", "updating meta file failed", "err", err)
	}
}
metafile 实际是 data 目录下的 thanos.shipper.json,记录了已上传的 block:
/prometheus $ cat thanos.shipper.json
{
"version": 1,
"uploaded": [
"01FEYW9R0P134EWRCPQSQSCEZM",
"01FEZ35F8Q1WBHSDCGBJGN52YN",
"01FEZA16GMX4E1VZRQKMEJ7B5R",
"01FEZGWXRT31P1M8BG5SMFARAJ"
]
}
2. 将要上传的 block dir 以 hardlink 的方式保护起来
对要上传的 block dir 进行 hardlink,hardlink 出的文件被临时放置在 thanos 目录内(thanos/upload/<ULID>),以避免其它操作对原目录的修改影响上传;
/prometheus $ ls
01FEZGWXRT31P1M8BG5SMFARAJ thanos
01FEZ35F8Q1WBHSDCGBJGN52YN chunks_head thanos.shipper.json
01FEZA16GMX4E1VZRQKMEJ7B5R queries.active wal
实现代码:
// pkg/shipper/shipper.go
// upload hard-links the block directory s.dir/<ULID> into the temporary directory
// s.dir/thanos/upload/<ULID> and uploads that copy to s.bucket via block.Upload.
// Errors from cleaning or creating the temporary directory, or from hard-linking, are
// wrapped and returned; otherwise the result of block.Upload is returned.
// (Excerpt: parts of the body are elided with ".....".)
func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error {level.Info(s.logger).Log("msg", "upload new block", "id", meta.ULID)
// We hard-link the files into a temporary upload directory so we are not affected
// by other operations happening against the TSDB directory.
updir := filepath.Join(s.dir, "thanos", "upload", meta.ULID.String()) // temporary upload directory
// Remove updir just in case.
if err := os.RemoveAll(updir); err != nil {return errors.Wrap(err, "clean upload directory")
}
if err := os.MkdirAll(updir, 0750); err != nil {return errors.Wrap(err, "create upload dir")
}
.....
// Source block directory inside the TSDB data dir.
dir := filepath.Join(s.dir, meta.ULID.String())
if err := hardlinkBlock(dir, updir); err != nil {return errors.Wrap(err, "hard link block")
}
......
return block.Upload(ctx, s.logger, s.bucket, updir, s.hashFunc)
}
因为 Linux 的 hardlink 不能作用于目录,所以先新建目标目录,再将需要上传的文件(chunks 目录下的所有文件,以及 meta.json 和 index)逐个 hardlink 过去;这里并不是递归遍历,tombstones 等其它文件也不会被链接。
每个 block 包含的文件如下:
/prometheus/01FEWYG6RK8JE9MY45XBJ0893G $ ls -alh
total 3M
drwxr-sr-x 3 1000 2000 68 Sep 6 07:00 .
drwxrwsrwx 18 root 2000 4.0K Sep 7 08:19 ..
drwxr-sr-x 2 1000 2000 20 Sep 6 07:00 chunks
-rw-r--r-- 1 1000 2000 2.5M Sep 6 07:00 index
-rw-r--r-- 1 1000 2000 280 Sep 6 07:00 meta.json
-rw-r--r-- 1 1000 2000 9 Sep 6 07:00 tombstones
/prometheus/01FEWYG6RK8JE9MY45XBJ0893G $
/prometheus/01FEWYG6RK8JE9MY45XBJ0893G $ ls chunks/
000001
hardlink 目录时,遍历 chunks 目录下的每个文件,连同 meta.json 和 index 逐个进行 hardlink:
// pkg/shipper/shipper.go
// hardlinkBlock hard-links the files of the block directory src into dst: every entry
// of src/chunks plus meta.json and index. Directories cannot be hard-linked, so
// dst/chunks is created first and each file is linked one by one. Other files in src
// (for example tombstones) are not linked. The first failure is wrapped and returned.
func hardlinkBlock(src, dst string) error {
	if err := os.MkdirAll(filepath.Join(dst, block.ChunksDirname), 0750); err != nil {
		return errors.Wrap(err, "create chunks dir")
	}

	entries, err := ioutil.ReadDir(filepath.Join(src, block.ChunksDirname))
	if err != nil {
		return errors.Wrap(err, "read chunk dir")
	}

	// Paths relative to the block root of every file that must be linked.
	names := make([]string, 0, len(entries)+2)
	for _, e := range entries {
		names = append(names, filepath.Join(block.ChunksDirname, e.Name()))
	}
	names = append(names, block.MetaFilename, block.IndexFilename)

	for _, name := range names {
		from, to := filepath.Join(src, name), filepath.Join(dst, name)
		if err := os.Link(from, to); err != nil {
			return errors.Wrapf(err, "hard link file %s", name)
		}
	}
	return nil
}
3. upload 到远端存储
// pkg/block/block.go
// Upload uploads the TSDB block stored in directory bdir to the object storage bucket bkt.
// It is the exported entry point for upload, with external-label checking switched on.
func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc) error {
	const checkExternalLabels = true
	return upload(ctx, logger, bkt, bdir, hf, checkExternalLabels)
}
upload 会分别上传 block 目录中的各部分(chunks 目录、index 文件,最后是 meta.json):
// pkg/block/block.go
// upload uploads the block directory bdir to bkt: first the chunks directory, then the
// index file, and finally meta.json, each stored under the prefix id.String() (id is
// derived in the elided code, presumably the block ULID — confirm). A failure of the
// chunks or index upload is passed to cleanUp; a failure of the meta.json upload is
// returned directly without calling cleanUp.
// NOTE(review): hf and checkExternalLabels are only used in the elided part of this excerpt.
func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, checkExternalLabels bool) error {
......
// Upload the chunks directory.
if err := objstore.UploadDir(ctx, logger, bkt, path.Join(bdir, ChunksDirname), path.Join(id.String(), ChunksDirname)); err != nil {return cleanUp(logger, bkt, id, errors.Wrap(err, "upload chunks"))
}
// Upload the index file.
if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, IndexFilename), path.Join(id.String(), IndexFilename)); err != nil {return cleanUp(logger, bkt, id, errors.Wrap(err, "upload index"))
}
// Upload meta.json as the last object (metaEncoded is built in the elided code).
// NOTE(review): presumably uploaded last so that a prefix without meta.json can be
// treated as an incomplete upload — confirm.
if err := bkt.Upload(ctx, path.Join(id.String(), MetaFilename), strings.NewReader(metaEncoded.String())); err != nil {return errors.Wrap(err, "upload meta file")
}
......
}
上传目录的函数 UploadDir() 会遍历目录中的文件,然后逐个上传:
// pkg/objstore/objstore.go
// UploadDir walks srcdir recursively (filepath.Walk) and uploads every non-directory
// entry to bkt with UploadFile. Each object is named dstdir joined with the entry's
// path relative to srcdir. The first walk or upload error stops the walk and is
// returned. (Excerpt: the code before the walk is elided.)
func UploadDir(ctx context.Context, logger log.Logger, bkt Bucket, srcdir, dstdir string) error {
.......
return filepath.Walk(srcdir, func(src string, fi os.FileInfo, err error) error {
if err != nil {return err}
if fi.IsDir() {return nil}
// NOTE(review): filepath.Join uses the OS path separator, so on Windows the object
// name would contain backslashes; object-store keys are normally "/"-separated — confirm.
dst := filepath.Join(dstdir, strings.TrimPrefix(src, srcdir))
return UploadFile(ctx, logger, bkt, src, dst)
})
}
上传时,根据对象存储的类型使用不同的接口;对 minio 来说,使用的是 s3 接口;
调用 minio 提供的 client 进行上传:
// pkg/objstore/s3/s3.go
// Upload the contents of the reader as an object into the bucket.
//
// It resolves the server-side-encryption settings, tries to determine the size of r
// with objstore.TryToGetSize, and streams r to the object name in bucket b.name with
// the minio client's PutObject, using the configured part size and user metadata.
// (Excerpt: error handling between the steps is elided with "...".)
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
	sse, err := b.getServerSideEncryption(ctx)
	...
	size, err := objstore.TryToGetSize(r)
	partSize := b.partSize
	// minio API. PutObject returns (UploadInfo, error); only the error is kept. err is
	// already declared above, so it must be assigned with "=" ("err :=" has no new
	// variable on its left side and does not compile).
	_, err = b.client.PutObject(
		ctx,
		b.name,
		name,
		r,
		size,
		minio.PutObjectOptions{
			PartSize:             partSize,
			ServerSideEncryption: sse,
			UserMetadata:         b.putUserMetadata,
		},
	)
	...
}
参考
1.linux hard-link: https://linuxhandbook.com/har…
正文完
发表至: prometheus
2021-11-11