乐趣区

关于golang:极速精简-Go-版-Logstash

前言

明天来介绍 go-zero 生态的另一个组件 go-stash。这是一个 logstash 的 Go 语言代替版,咱们用 go-stash 相比原先的 logstash 节俭了 2 / 3 的服务器资源。如果你在用 logstash,无妨试试,也能够看看基于 go-zero 实现这样的工具是如许的容易,这个工具作者仅用了两天工夫。

整体架构

先从它的配置中,咱们来看看设计架构。

Clusters:
  - Input:
      Kafka:
        # Kafka 配置 --> 联动 go-queue
    Filters:
        # filter action
      - Action: drop            
      - Action: remove_field
      - Action: transfer      
    Output:
      ElasticSearch:
        # es 配置 {host, index}

看配置名:kafka 是数据输入端,es 是数据输出端,filter 形象了数据处理过程。

对,整个 go-stash 就是如 config 配置中显示的,所见即所得。

启动

stash.go 的启动流程大抵分为几个局部。因为能够配置多个 cluster,那从一个 cluster 剖析:

  1. 建设与 es 的连贯【传入 es 配置】
  2. 构建 filter processorses 前置处理器,做数据过滤以及解决,能够设置多个】
  3. 欠缺对 es 中 索引配置,启动 handle,同时将 filter 退出 handle【解决输入输出】
  4. 连贯上游的 kafka,将下面创立的 handle 传入,实现 kafkaes 之间的数据生产和数据写入

MessageHandler

在下面架构图中,两头的 filter 只是从 config 中看到,其实更具体是 MessageHandler 的一部分,做数据过滤和转换,上面来说说这块。

以下代码:https://github.com/tal-tech/g…

type MessageHandler struct {
    writer  *es.Writer
    indexer *es.Index
    filters []filter.FilterFunc}

这个就对应下面说的,filter 只是其中一部分,在结构上 MessageHandler 是对接上游 es,然而没有看到对 kafka 的操作。

别急,从接口设计上 MessageHandler 实现了 go-queueConsumeHandler 接口。

这里,上下游就串联了:

  1. MessageHandler 接管了 es 的操作,负责数据处理到数据写入
  2. 对上实现了 kafkaConsume 操作。这样在生产过程中执行 handler 的操作,从而写入 es

实际上,Consume() 也是这么解决的:

func (mh *MessageHandler) Consume(_, val string) error {var m map[string]interface{}
  // 反序列化从 kafka 中的音讯
    if err := jsoniter.Unmarshal([]byte(val), &m); err != nil {return err}
    // es 写入 index 配置
    index := mh.indexer.GetIndex(m)
  // filter 链式解决【因为没有泛型,整个解决都是 `map 进 map 出 `】for _, proc := range mh.filters {if m = proc(m); m == nil {return nil}
    }
    bs, err := jsoniter.Marshal(m)
    if err != nil {return err}
    // es 写入
    return mh.writer.Write(index, string(bs))
}

数据流

说完了数据处理,以及上下游的连接点。然而数据要从 kafka -> es,数据流出这个动作从 kafka 角度看,应该是由开发者被动 pull data from kafka

那么数据流是怎么动起来?咱们回到主程序 https://github.com/tal-tech/g…

其实 启动 整个流程中,其实就是一个组合模式:

func main() {
    // 解析命令行参数,启动优雅退出
    ...
  // service 组合模式
    group := service.NewServiceGroup()
    defer group.Stop()

    for _, processor := range c.Clusters {
        // 连贯 es
    ...
        // filter processors 构建
    ...
    // 筹备 es 的写入操作 {写入的 index, 写入器 writer}
        handle := handler.NewHandler(writer, indexer)
        handle.AddFilters(filters...)
        handle.AddFilters(filter.AddUriFieldFilter("url", "uri"))
    // 依照配置启动 kafka,并将生产操作传入,同时退出组合器
        for _, k := range toKqConf(processor.Input.Kafka) {group.Add(kq.MustNewQueue(k, handle))
        }
    }
    // 启动这个组合器
    group.Start()}

整个数据流,就和这个 group 组合器无关了。

group.Start()
    |- group.doStart()
        |- [service.Start() for service in group.services]

那么阐明退出 groupservice 都是实现 Start()。也就是说 kafka 端的启动逻辑在 Start()

func (q *kafkaQueue) Start() {q.startConsumers()
    q.startProducers()

    q.producerRoutines.Wait()
    close(q.channel)
    q.consumerRoutines.Wait()}
  1. 启动 kafka 生产程序
  2. 启动 kafka 生产拉取端【可能会被名字蛊惑,实际上是从 kafka 拉取音讯到 q.channel
  3. 生产程序终止,收尾工作

而咱们传入 kafka 中的 handler,上文说过其实是 Consume,而这个办法就是在 q.startConsumers() 中执行的:

q.startConsumers()
    |- [q.consumeOne(key, value) for msg in q.channel]
        |- q.handler.Consume(key, value)

这样整个数据流就彻底串起来了:

总结

作为 go-stash 第一篇文章,本篇从架构和设计上整体介绍 go-stash,无关性能和为什么咱们要开发一个这样的组件,咱们下篇文章逐步揭晓。

https://github.com/tal-tech/g…

对于 go-zero 更多的设计和实现文章,能够继续关注咱们。

https://github.com/tal-tech/g…

欢送应用 go-zero 并 star 反对咱们!

微信交换群

关注『微服务实际』公众号并回复 进群 获取社区群二维码。

go-zero 系列文章见『微服务实际』公众号

退出移动版