乐趣区

关于prometheus:thanos源码分析-sidecar-shipper数据到minio

sidecar 负责定期向远程存储发送本地 prometheus 的 block 数据,其运行参数:

/bin/thanos sidecar     
    --prometheus.url=http://localhost:9090/
    --tsdb.path=/prometheus
    --grpc-address=[$(POD_IP)]:10901
    --http-address=[$(POD_IP)]:10902
    --objstore.config=$(OBJSTORE_CONFIG)

其中环境变量:

Environment:
    POD_IP:            (v1:status.podIP)
    OBJSTORE_CONFIG:  <set to the key 'thanos.yaml' in secret 'thanos-objstore-config'>  Optional: false

POD_IP 是通过 downwardAPI 拿到的,OBJSTORE_CONFIG 是 secret 中保存的 thanos.yaml 内容,配置了 s3 远程存储 minio:

type: s3
config:
  bucket: thanos
  endpoint: minio.minio.svc.cluster.local:9000
  access_key: minio
  secret_key: minio
  insecure: true
  signature_version2: false

sidecar 源码入口

先找 sidecar 的入口:

// main is the thanos binary entry point (abridged excerpt; elided code is
// marked with dotted lines). It builds the kingpin application, registers the
// sidecar command on it and finally runs the run.Group.
func main() {
    ......
    app := extkingpin.NewApp(kingpin.New(filepath.Base(os.Args[0]), "A block storage based long-term storage for Prometheus.").Version(version.Print("thanos")))
    // Register the "sidecar" sub-command and its setup callback.
    registerSidecar(app)
    .......
    var g run.Group
    ......
    // Run the group; a non-nil error is logged and the process exits with code 1.
    if err := g.Run(); err != nil {
        // Use %+v for github.com/pkg/errors error to print with stack.
        level.Error(logger).Log("err", fmt.Sprintf("%+v", errors.Wrapf(err, "%s command failed", cmd)))
        os.Exit(1)
    }
}

registerSidecar()注册 sidecar 服务:

// cmd/thanos/sidecar.go
// registerSidecar adds the sidecar command to app, registers its flags on a
// fresh sidecarConfig and installs a setup callback that calls runSidecar.
// (Abridged excerpt; elided code is marked with a dotted line.)
func registerSidecar(app *extkingpin.App) {cmd := app.Command(component.Sidecar.String(), "Sidecar for Prometheus server.")
    conf := &sidecarConfig{}
    conf.registerFlag(cmd)
    cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
        ......
        // rl, grpcLogOpts and tagOpts are built in the elided part above.
        return runSidecar(g, logger, reg, tracer, rl, component.Sidecar, *conf, grpcLogOpts, tagOpts)
    // NOTE(review): the closing ")" of cmd.Setup( is not shown in this excerpt.
    }
}

sidecar 中 shipper 的启动流程如下:

  • 配置了远程存储时,uploads=true;
  • 将 shipper 作为 1 个后台 goroutine 运行;
  • shipper 运行过程中,每 30s 检查一次是否有新 block 产生,若有新 block,则执行 Sync()将其 ship 到远端存储;
// cmd/thanos/sidecar.go
// runSidecar wires the sidecar into the run.Group g (abridged excerpt; elided
// code is marked with dotted lines). The part shown here decides whether
// uploads are enabled and, if so, adds the shipper to g as an actor that calls
// s.Sync every 30 seconds.
func runSidecar(
    g *run.Group,
    logger log.Logger,
    reg *prometheus.Registry,
    tracer opentracing.Tracer,
    reloader *reloader.Reloader,
    comp component.Component,
    conf sidecarConfig,
    grpcLogOpts []grpc_logging.Option,
    tagOpts []tags.Option,) error {
    ......
    // uploads stays true only when an object storage configuration was provided.
    var uploads = true
    if len(confContentYaml) == 0 {level.Info(logger).Log("msg", "no supported bucket was configured, uploads will be disabled")
        uploads = false
    }
    ......
    if uploads {
        // The background shipper continuously scans the data directory and uploads
        // new blocks to Google Cloud Storage or an S3-compatible storage service.
        // NOTE(review): err returned by NewBucket is not checked in this excerpt.
        bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Sidecar.String())


        ctx, cancel := context.WithCancel(context.Background())
        // The shipper runs as a background actor of the run.Group.
        g.Add(func() error {
            ......
            s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource,
                conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc))
            .......
            // Run one sync pass every 30s; ctx.Done() is passed as the stop channel.
            // Sync and Timestamps errors are only logged, the callback always returns nil.
            return runutil.Repeat(30*time.Second, ctx.Done(), func() error {if uploaded, err := s.Sync(ctx); err != nil {level.Warn(logger).Log("err", err, "uploaded", uploaded)
                }
                minTime, _, err := s.Timestamps()
                if err != nil {level.Warn(logger).Log("msg", "reading timestamps failed", "err", err)
                    return nil
                }
                m.UpdateTimestamps(minTime, math.MaxInt64)
                return nil
            })
        }, func(error) {cancel()
        })
    }
}

这里用 run.Group 管理并发的任务,当有一个任务出错退出时,其它任务也退出。

shipper 的流程

  • 首先检查本地哪些 block 需要上传;
  • 然后将要上传的 block dir 用 hardlink 的方式保护起来;
  • 最后将 block dir 上传到 minio(minio API);

1. 检查哪些 block 需要上传

// Excerpt from runSidecar: create the shipper, then call s.Sync on a fixed interval.
s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource,
        conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc))
.......
// Run one sync pass every 30s; ctx.Done() is passed as the stop channel.
// Sync and Timestamps errors are only logged, the callback always returns nil.
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {if uploaded, err := s.Sync(ctx); err != nil {level.Warn(logger).Log("err", err, "uploaded", uploaded)
    }
    minTime, _, err := s.Timestamps()
    if err != nil {level.Warn(logger).Log("msg", "reading timestamps failed", "err", err)
        return nil
    }
    m.UpdateTimestamps(minTime, math.MaxInt64)
    return nil
})

重点在 s.Sync(ctx):

  • 已上传 block: 读 metafile,其中保存了已上传的 block 列表;
  • 当前的 block: 读 prometheus 的 data 目录,查询目前所有的 block;
  • 将 data 中没有 ship 的 block,通过 s.upload()上传到远程存储;
  • 最后,重写 metafile,将新 upload 的 block 写入 metafile(metafile: thanos.shipper.json);
// pkg/shipper/shipper.go
// Sync performs a single synchronization, which ensures all non-compacted local blocks have been uploaded
// to the object bucket once.
//
// The named result uploaded is incremented once for every block that s.upload ships successfully in this pass.
//
// It is not concurrency-safe, however it is compactor-safe (running concurrently with compactor is ok).
//
// NOTE(review): this is an abridged excerpt; error checks, the declaration of uploadErrs and the
// final return statement are not shown here.
func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {meta, err := ReadMetaFile(s.dir)    // Read the shipper meta file from s.dir.
    
    // Build a map of blocks we already uploaded.
    hasUploaded := make(map[ulid.ULID]struct{}, len(meta.Uploaded))
    for _, id := range meta.Uploaded {hasUploaded[id] = struct{}{}
    }

    // Block metas of the local blocks, as returned by blockMetasFromOldest.
    // NOTE(review): presumably meta.Uploaded is reset before this loop in the full source,
    // otherwise the appends below would duplicate entries; confirm against upstream.
    metas, err := s.blockMetasFromOldest()
    for _, m := range metas {
        // Do not sync a block if we already uploaded or ignored it. If it's no longer found in the bucket,
        // it was generally removed by the compaction process.
        if _, uploaded := hasUploaded[m.ULID]; uploaded {    // Already uploaded earlier.
            meta.Uploaded = append(meta.Uploaded, m.ULID)
            continue
        }
        if err := s.upload(ctx, m); err != nil {    // Upload the block; failures are counted and skipped.
            uploadErrs++
            continue
        }
        meta.Uploaded = append(meta.Uploaded, m.ULID)
        uploaded++
    }    
    
    if err := WriteMetaFile(s.logger, s.dir, meta); err != nil {   // Write the meta file: thanos.shipper.json
        level.Warn(s.logger).Log("msg", "updating meta file failed", "err", err)
    }    
}

metafile 实际是 data/thanos.shipper.json,保存了已上传的 block:

/prometheus $ cat thanos.shipper.json
{
        "version": 1,
        "uploaded": [
                "01FEYW9R0P134EWRCPQSQSCEZM",
                "01FEZ35F8Q1WBHSDCGBJGN52YN",
                "01FEZA16GMX4E1VZRQKMEJ7B5R",
                "01FEZGWXRT31P1M8BG5SMFARAJ"
        ]
}

2. 将要上传的 block dir 用 hardlink 的方式保护起来

将要上传的 block dir 进行 hardlink,hardlink 的文件被临时放置在 thanos 文件夹内,以防止其它操作对 dir 的修改;

/prometheus $ ls
01FEZGWXRT31P1M8BG5SMFARAJ  thanos
01FEZ35F8Q1WBHSDCGBJGN52YN  chunks_head                 thanos.shipper.json
01FEZA16GMX4E1VZRQKMEJ7B5R  queries.active              wal

实现代码:

// pkg/shipper/shipper.go
// upload uploads the block described by meta: it hard-links the block's files
// into <s.dir>/thanos/upload/<ULID> and then hands that directory to
// block.Upload. (Abridged excerpt; elided code is marked with dotted lines.)
func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error {level.Info(s.logger).Log("msg", "upload new block", "id", meta.ULID)

    // We hard-link the files into a temporary upload directory so we are not affected
    // by other operations happening against the TSDB directory.
    updir := filepath.Join(s.dir, "thanos", "upload", meta.ULID.String())    // Temporary upload directory.

    // Remove updir just in case.
    if err := os.RemoveAll(updir); err != nil {return errors.Wrap(err, "clean upload directory")
    }
    if err := os.MkdirAll(updir, 0750); err != nil {return errors.Wrap(err, "create upload dir")
    }
    .....
    // dir is the block's directory inside s.dir, named after its ULID.
    dir := filepath.Join(s.dir, meta.ULID.String())
    if err := hardlinkBlock(dir, updir); err != nil {return errors.Wrap(err, "hard link block")
    }
    ......
    // Upload from the hard-linked copy, not from the original block directory.
    return block.Upload(ctx, s.logger, s.bucket, updir, s.hashFunc)
}

因为 linux hardlink 不能作用于文件夹,于是新建了 1 个文件夹,再将原目录下需要上传的每个文件逐个 hardlink 过去。

每个 block 包含的文件如下:

/prometheus/01FEWYG6RK8JE9MY45XBJ0893G $ ls -alh
total 3M
drwxr-sr-x    3 1000     2000          68 Sep  6 07:00 .
drwxrwsrwx   18 root     2000        4.0K Sep  7 08:19 ..
drwxr-sr-x    2 1000     2000          20 Sep  6 07:00 chunks
-rw-r--r--    1 1000     2000        2.5M Sep  6 07:00 index
-rw-r--r--    1 1000     2000         280 Sep  6 07:00 meta.json
-rw-r--r--    1 1000     2000           9 Sep  6 07:00 tombstones
/prometheus/01FEWYG6RK8JE9MY45XBJ0893G $
/prometheus/01FEWYG6RK8JE9MY45XBJ0893G $ ls chunks/
000001

在 hardlink 目录时,遍历目录下的每个文件进行 hardlink:

// pkg/shipper/shipper.go
// hardlinkBlock hard-links the files of the block directory src into dst.
// It creates the chunks sub-directory under dst, then links every entry found
// in src's chunks directory followed by the meta file and the index file.
// The first failing step aborts the operation and its wrapped error is returned.
func hardlinkBlock(src, dst string) error {
    // Hard links cannot target directories, so the chunks directory is created explicitly.
    dstChunks := filepath.Join(dst, block.ChunksDirname)
    if err := os.MkdirAll(dstChunks, 0750); err != nil {
        return errors.Wrap(err, "create chunks dir")
    }

    entries, err := ioutil.ReadDir(filepath.Join(src, block.ChunksDirname))
    if err != nil {
        return errors.Wrap(err, "read chunk dir")
    }

    // Collect block-relative paths: chunk files first, then meta and index.
    relPaths := make([]string, 0, len(entries)+2)
    for _, entry := range entries {
        relPaths = append(relPaths, filepath.Join(block.ChunksDirname, entry.Name()))
    }
    relPaths = append(relPaths, block.MetaFilename, block.IndexFilename)

    // Link each collected file from src to the same relative location in dst.
    for _, rel := range relPaths {
        from := filepath.Join(src, rel)
        to := filepath.Join(dst, rel)
        if err := os.Link(from, to); err != nil {
            return errors.Wrapf(err, "hard link file %s", rel)
        }
    }
    return nil
}

3.upload 到远端存储

// pkg/block/block.go
// Upload uploads the TSDB block stored in bdir to the object storage bucket bkt.
// It verifies basic features of Thanos block. The work is delegated to upload
// with external label checking switched on.
func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc) error {
    const checkExternalLabels = true
    return upload(ctx, logger, bkt, bdir, hf, checkExternalLabels)
}

upload 会分别上传 block 目录中的每个文件:

// pkg/block/block.go
// upload sends the parts of the block directory bdir to bkt under the block's
// id: first the chunks directory, then the index file, then the meta file.
// (Abridged excerpt; elided code, including where id and metaEncoded come
// from, is marked with dotted lines.)
func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, checkExternalLabels bool) error {
    ......
    // Upload the chunks directory; on failure cleanUp is called with the wrapped error.
    if err := objstore.UploadDir(ctx, logger, bkt, path.Join(bdir, ChunksDirname), path.Join(id.String(), ChunksDirname)); err != nil {return cleanUp(logger, bkt, id, errors.Wrap(err, "upload chunks"))
    }
    // Upload the index file; on failure cleanUp is called with the wrapped error.
    if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, IndexFilename), path.Join(id.String(), IndexFilename)); err != nil {return cleanUp(logger, bkt, id, errors.Wrap(err, "upload index"))
    }
    // Upload the meta.json file last, from the in-memory metaEncoded buffer.
    if err := bkt.Upload(ctx, path.Join(id.String(), MetaFilename), strings.NewReader(metaEncoded.String())); err != nil {return errors.Wrap(err, "upload meta file")
    }
    ......
}

上传目录的函数 UploadDir()会遍历目录中的文件,然后逐个上传文件:

// pkg/objstore/objstore.go
// UploadDir walks srcdir and uploads every non-directory entry to bkt via
// UploadFile, placing it under dstdir at the same path relative to srcdir.
// The walk stops at the first error. (Abridged excerpt; elided code is marked
// with a dotted line.)
func UploadDir(ctx context.Context, logger log.Logger, bkt Bucket, srcdir, dstdir string) error {
    .......
    return filepath.Walk(srcdir, func(src string, fi os.FileInfo, err error) error {
        // Propagate walk errors as-is.
        if err != nil {return err}
        // Directories themselves are not uploaded, only the files inside them.
        if fi.IsDir() {return nil}
        // Destination is dstdir joined with src's path after the srcdir prefix.
        dst := filepath.Join(dstdir, strings.TrimPrefix(src, srcdir))
        return UploadFile(ctx, logger, bkt, src, dst)
    })
}

上传时,根据不同的对象存储,使用不同的接口;对 minio 来讲,它使用 s3 接口;
调用 minio 提供的 client 进行上传:

// pkg/objstore/s3/s3.go
// Upload the contents of the reader as an object into the bucket.
// The object is written to bucket b.name under the key name through the minio
// client's PutObject call. (Abridged excerpt; elided code, including error
// handling, is marked with dotted lines.)
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {sse, err := b.getServerSideEncryption(ctx)
    ...
    // Size of r as reported by objstore.TryToGetSize; passed on to PutObject.
    size, err := objstore.TryToGetSize(r)
    partSize := b.partSize
    // NOTE(review): as shown, "err :=" declares no new variable here; the full
    // source presumably differs - confirm against upstream.
    err := b.client.PutObject(    // minio client API
        ctx,
        b.name,
        name,
        r,
        size,
        minio.PutObjectOptions{
            PartSize:             partSize,
            ServerSideEncryption: sse,
            UserMetadata:         b.putUserMetadata,
        },
    )
    ...
}

参考

1.linux hard-link: https://linuxhandbook.com/har…

退出移动版