共计 2793 个字符,预计需要花费 7 分钟才能阅读完成。
前言
明天来介绍 go-zero
生态的另一个组件 go-stash
。这是一个 logstash
的 Go 语言代替版,咱们用 go-stash
相比原先的 logstash
节俭了 2 / 3 的服务器资源。如果你在用 logstash
,无妨试试,也能够看看基于 go-zero
实现这样的工具是如许的容易,这个工具作者仅用了两天工夫。
整体架构
先从它的配置中,咱们来看看设计架构。
Clusters:
- Input:
Kafka:
# Kafka 配置 --> 联动 go-queue
Filters:
# filter action
- Action: drop
- Action: remove_field
- Action: transfer
Output:
ElasticSearch:
# es 配置 {host, index}
看配置名:kafka
是数据输入端,es
是数据输出端,filter
形象了数据处理过程。
对,整个 go-stash
就是如 config 配置中显示的,所见即所得。
启动
从 stash.go
的启动流程大抵分为几个局部。因为能够配置多个 cluster
,那从一个 cluster
剖析:
- 建设与
es
的连贯【传入es
配置】 - 构建
filter processors
【es
前置处理器,做数据过滤以及解决,能够设置多个】 - 欠缺对
es
中 索引配置,启动handle
,同时将filter
退出 handle【解决输入输出】 - 连贯上游的
kafka
,将下面创立的handle
传入,实现kafka
和es
之间的数据生产和数据写入
MessageHandler
在下面架构图中,两头的 filter
只是从 config 中看到,其实更具体是 MessageHandler
的一部分,做数据过滤和转换,上面来说说这块。
以下代码:https://github.com/tal-tech/g…
type MessageHandler struct {
writer *es.Writer
indexer *es.Index
filters []filter.FilterFunc}
这个就对应下面说的,filter
只是其中一部分,在结构上 MessageHandler
是对接上游 es
,然而没有看到对 kafka
的操作。
别急,从接口设计上 MessageHandler
实现了 go-queue
中 ConsumeHandler
接口。
这里,上下游就串联了:
MessageHandler
接管了es
的操作,负责数据处理到数据写入- 对上实现了
kafka
的Consume
操作。这样在生产过程中执行handler
的操作,从而写入es
实际上,Consume()
也是这么解决的:
func (mh *MessageHandler) Consume(_, val string) error {var m map[string]interface{}
// 反序列化从 kafka 中的音讯
if err := jsoniter.Unmarshal([]byte(val), &m); err != nil {return err}
// es 写入 index 配置
index := mh.indexer.GetIndex(m)
// filter 链式解决【因为没有泛型,整个解决都是 `map 进 map 出 `】for _, proc := range mh.filters {if m = proc(m); m == nil {return nil}
}
bs, err := jsoniter.Marshal(m)
if err != nil {return err}
// es 写入
return mh.writer.Write(index, string(bs))
}
数据流
说完了数据处理,以及上下游的连接点。然而数据要从 kafka -> es
,数据流出这个动作从 kafka
角度看,应该是由开发者被动 pull data from kafka
。
那么数据流是怎么动起来?咱们回到主程序 https://github.com/tal-tech/g…
其实 启动 整个流程中,其实就是一个组合模式:
func main() {
// 解析命令行参数,启动优雅退出
...
// service 组合模式
group := service.NewServiceGroup()
defer group.Stop()
for _, processor := range c.Clusters {
// 连贯 es
...
// filter processors 构建
...
// 筹备 es 的写入操作 {写入的 index, 写入器 writer}
handle := handler.NewHandler(writer, indexer)
handle.AddFilters(filters...)
handle.AddFilters(filter.AddUriFieldFilter("url", "uri"))
// 依照配置启动 kafka,并将生产操作传入,同时退出组合器
for _, k := range toKqConf(processor.Input.Kafka) {group.Add(kq.MustNewQueue(k, handle))
}
}
// 启动这个组合器
group.Start()}
整个数据流,就和这个 group
组合器无关了。
group.Start()
|- group.doStart()
|- [service.Start() for service in group.services]
那么阐明退出 group
的 service
都是实现 Start()
。也就是说 kafka
端的启动逻辑在 Start()
:
func (q *kafkaQueue) Start() {q.startConsumers()
q.startProducers()
q.producerRoutines.Wait()
close(q.channel)
q.consumerRoutines.Wait()}
- 启动
kafka
生产程序 - 启动
kafka
生产拉取端【可能会被名字蛊惑,实际上是从kafka
拉取音讯到q.channel
】 - 生产程序终止,收尾工作
而咱们传入 kafka
中的 handler
,上文说过其实是 Consume
,而这个办法就是在 q.startConsumers()
中执行的:
q.startConsumers()
|- [q.consumeOne(key, value) for msg in q.channel]
|- q.handler.Consume(key, value)
这样整个数据流就彻底串起来了:
总结
作为 go-stash
第一篇文章,本篇从架构和设计上整体介绍 go-stash
,无关性能和为什么咱们要开发一个这样的组件,咱们下篇文章逐步揭晓。
https://github.com/tal-tech/g…
对于 go-zero
更多的设计和实现文章,能够继续关注咱们。
https://github.com/tal-tech/g…
欢送应用 go-zero 并 star 反对咱们!
微信交换群
关注『微服务实际』公众号并回复 进群 获取社区群二维码。
go-zero 系列文章见『微服务实际』公众号