乐趣区

关于前端:ES日志存储以及备份压缩到COS

导语

为了满足用户日益增长的日志存储大小,不影响用户的写入和查问性能。满足不同用户写入流量。同时用户日志长期保留,日志存储比拟占用空间和老本。ES 集群规格配置高,耗费资源和老本。咱们基于 Go 语言设计了一个多用户多 ES 集群,日志备份到 cos 节省成本的计划。本篇实际基于 Go 语言编程。

索引设计

为了避免单个索引一直减少。影响 ES 集群查问写入性能,ES 集群的索引设计次要采取如下形式:

  1. 租户拆散:将索引依照租户进行拆散,防止不同租户之间的数据混同,进步 ES 集群的数据安全性和隔离性。
  2. 按月宰割:将索引依照每个月进行宰割,防止单个索引过大,进步 ES 集群的查问性能。
  3. 按大小固定 rollover:将索引依照固定大小进行 rollover,主动分索引,防止单个索引过大,进行索引治理,晋升集群性能,便于集群运维。

ES 集群设计

为了防止 ES 集群呈现单点问题,以及配置一直减少带来的运维危险,ES 集群设计时次要通过将不同规格的用户日志写入不同配置的 ES 集群,并且应用主动扩容技术来实现集群的可扩展性。
具体来说,能够将用户日志依照不同的规格(例如不同的数据量、拜访频率等)进行分类,而后将不同规格的用户日志写入不同配置的 ES 集群中。例如,能够应用较小的 ES 集群来解决低频拜访的用户日志,而应用较大的 ES 集群来解决高频拜访的用户日志。

ES 数据备份到 COS

创立备份流程
在 ES 备份流程中,为了保障备份的正确性和完整性,能够依照如下步骤执行:
创立一个仓库来存储备份数据。
创立一个快照工作,将 ES 中的数据备份到指定的仓库中。在创立快照工作时,须要指定备份的索引、仓库、快照名称等参数,并设置备份的间隔时间和保留工夫等策略。
查问快照工作是否实现。ES 提供了 API 接口来查问快照的状态和进度,能够依据查问后果来判断备份是否曾经实现。
调用 COS 提供的压缩函数对备份数据进行压缩,以减小备份数据所占用的存储空间。

通过备份和压缩到 COS 能够将 ES 存储的数据放大到 1/4 的大小,并且通过 COS 的海量分布式存储服务,能够提供低成本、弹性的云存储服务。

各项步骤对应的代码逻辑如下:
1. 创立仓库

func CreateRepository(Repository string, cosappid uint64, appId uint64, accessKeyId string, AccessKeySecret string, buckets string, region string) error {service := Client.SnapshotCreateRepository(Repository)
  service = service.Type("cos").Settings(map[string]interface{}{"app_id":            strconv.FormatUint(cosappid, 10),
    "access_key_id":     accessKeyId,
    "access_key_secret": AccessKeySecret,
    "bucket":            buckets,
    "region":            region,
    "compress":          true,
    "chunk_size":        "500mb",
    "base_path":         "backup/" + strconv.FormatUint(appId, 10) + "/" + Repository,
  })
  if err := service.Validate(); err != nil {logger.Errorf("SnapshotCreateRepository failed,app_id=%v,Repository=%s", appId, Repository)
    return err
  }
  _, err := service.Do(context.Background())
  if err != nil {logger.Errorf("SnapshotGetRepository do failed,app_id=%v,Repository=%s", appId, Repository)
  }
  return nil
}

2. 创立快照


func SnapshotCreate(Repository string, Snapshot string, appId uint64, indexname string) error {
  type IndicesSetting struct {Indices string `json:"indices"`}
  e1 := IndicesSetting{indexname}
  _, err := Client.SnapshotCreate(Repository, Snapshot).WaitForCompletion(false).BodyJson(e1).Pretty(true).Do(context.Background())
  if err != nil {logger.Errorf("SnapshotCreate failed,app_id=%v,Repository=%s Snapshot=%s", appId, Repository, Snapshot)
    return err
  }
  return nil
}

3. 查问快照状态

func SnapshotGet(Repository string, Snapshot string, appId uint64) (*elastic.SnapshotGetResponse, error) {service, err := Client.SnapshotGet(Repository).
    Snapshot(Snapshot).
    IgnoreUnavailable(true).
    Verbose(true).Pretty(true).Do(context.Background())
  if err != nil {logger.Errorf("SnapshotGet failed,app_id=%v,Repository=%s Snapshot=%s", appId, Repository, Snapshot)
    return nil, err
  } /*
    判断工作是否实现
    if service.Snapshots[0].State != "true" { }*/
  return service, nil
}

4. 创立压缩工作

压缩工作是腾讯云对象存储 COS 提供的压缩 API,须要先创立好压缩函数:
创立压缩函数参考如下:
https://cloud.tencent.com/document/product/436/58578

首先获取压缩文件,而后调用压缩函数进行压缩。获取压缩文件代码逻辑如下:


var coslist []string
  coslist, err = ext.GetCosDirCosKeyList(backupcosfile, cosClient)
  if err != nil {logger.Errorx("GetCosDirCosKeyList error", err)
    return err
  }
  scfClient, err := ext.GetScfClient(config.One.Backup.Region, true)
  if err != nil {logger.Errorx("GetScfClient error", err)
    return err
  }
  var invokeRequestId string
  invokeRequestId, err = ext.ScfCompression(scfClient, coslist, backupcosfilezip, config.One.Backup.ZipFunctionName)
  if err != nil {logger.Errorx("GetScfClient error", err)
    return err
  }

压缩函数模版如下:


func ScfCompression(scfCleint *scf.Client, cosKeyList []string, compressKey string, functionname string) (functionRequestId string, err error) {request := scf.NewInvokeRequest()

  request.FunctionName = common.StringPtr(functionname)
  scfSourceList := make([]ScfCompressionSource, 0, 1)
  for _, cosKey := range cosKeyList {
    scfSource := ScfCompressionSource{Url: fmt.Sprintf("https://%s.cos-internal.%s.tencentcos.cn/%s", config.One.Cos.GuangzhouCosBucket, config.One.Cos.CosRegion, cosKey),
    }
    scfSourceList = append(scfSourceList, scfSource)
  }
  scfClientContext := ScfCompressionClientContext{
    Bucket:     config.One.Cos.GuangzhouCosBucket,
    Region:     config.One.Cos.CosRegion,
    Key:        compressKey,
    Flatten:    false,
    SourceList: scfSourceList,
  }
  clientContext, err := json.Marshal(scfClientContext)
  if err != nil {logger.Errorx("ScfCompression Marshal", err)
    return "", err
  }
  request.ClientContext = common.StringPtr(string(clientContext))
  //fmt.Println(string(clientContext))

  response, err := scfCleint.Invoke(request)
  if _, ok := err.(*tencentError.TencentCloudSDKError); ok {logger.Errorx("An API error has returned:", err)
    return "", err
  }
  if err != nil {logger.Errorx("scfCleint.Invoke", err)
    return "", err
  }
  logger.Debugf("%s", response.ToJsonString())
  return *response.Response.Result.FunctionRequestId, nil
}

COS 数据恢复到 ES

创立复原流程
复原流程是备份流程的逆向流程,次要包含如下步骤:
创立 COS 解压缩函数
创立复原工作
查问复原工作是否实现
实现复原

  1. COS 解压缩工作
    解压缩工作是腾讯云对象存储 COS 提供的解压缩函数模板,须要先创立好解压缩函数:

创立解压函数参考如下:
https://cloud.tencent.com/document/product/436/67101

解压缩函数模板如下:


func ScfDecompression(scfCleint *scf.Client, compressKey string, cosTargetPrefix string, functionname string) (functionRequestId string, err error) {logger.Debug("ScfDecompression" + compressKey)
  request := scf.NewInvokeRequest()

  request.FunctionName = common.StringPtr(functionname)
  //key := fmt.Sprintf("https://%s.cos-internal.ap-guangzhou.myqcloud.com/%s", config.One.Cos.GuangzhouCosBucket, compressKey)
  scfClientContext := ScfDecompressionClientContext{
    Bucket:       config.One.Cos.GuangzhouCosBucket,
    Region:       config.One.Cos.CosRegion,
    Key:          compressKey,
    TargetBucket: config.One.Cos.GuangzhouCosBucket,
    TargetRegion: config.One.Cos.CosRegion,
    TargetPrefix: cosTargetPrefix,
  }
  clientContext, err := json.Marshal(scfClientContext)
  if err != nil {logger.Errorx("ScfCompression Marshal", err)
    return "", err
  }
  request.ClientContext = common.StringPtr(string(clientContext))
  //fmt.Println(string(clientContext))

  response, err := scfCleint.Invoke(request)
  if _, ok := err.(*tencentError.TencentCloudSDKError); ok {logger.Errorx("An API error has returned:", err)
    return "", err
  }
  if err != nil {logger.Errorx("scfCleint.Invoke", err)
    return "", err
  }
  logger.Debugf("%s", response.ToJsonString())
  return *response.Response.Result.FunctionRequestId, nil
}
  1. 实现解压缩后,创立复原工作

func SnapshotRestore(Repository string, Snapshot string, appId uint64, oldIndexName string, newIndexName string) error {_, err := Client.SnapshotRestore(Repository, Snapshot).RenamePattern(oldIndexName).
    RenameReplacement(newIndexName).Indices(oldIndexName).WaitForCompletion(false).Pretty(true).Do(context.Background())
  if err != nil {logger.Errorf("SnapshotRestore failed,app_id=%v,Repository=%s Snapshot=%s", appId, Repository, Snapshot)
    return err
  }
  return nil
}
退出移动版