通过上一篇文章对 dq 生产者的剖析,咱们晓得 dq 是基于 beanstalk 的封装。至于 生产者 咱们在后续的文章持续分享,本篇文章先来剖析一下 go-queue 中的 kq

kq 基于 kafka 封装,设计之初是为了使 kafka 的应用更人性化。那就来看看 kq 的应用。

上手应用

func main() {  // 1. 初始化    pusher := kq.NewPusher([]string{        "127.0.0.1:19092",        "127.0.0.1:19092",        "127.0.0.1:19092",    }, "kq")    ticker := time.NewTicker(time.Millisecond)    for round := 0; round < 3; round++ {        select {        case <-ticker.C:            count := rand.Intn(100)            m := message{                Key:     strconv.FormatInt(time.Now().UnixNano(), 10),                Value:   fmt.Sprintf("%d,%d", round, count),                Payload: fmt.Sprintf("%d,%d", round, count),            }            body, err := json.Marshal(m)            if err != nil {                log.Fatal(err)            }            fmt.Println(string(body))      // 2. 写入            if err := pusher.Push(string(body)); err != nil {                log.Fatal(err)            }        }    }}

kafka cluster 配置以及 topic 传入,你就失去一个操作 kafkapush operator

至于写入音讯,简略的调用 pusher.Push(msg) 就行。是的,就这么简略!

当然,目前只反对单个 msg 写入。可能有人会纳闷,那就持续往下看,为什么只能一条一条写入?

初始化

一起看看 pusher 初始化哪些步骤:

NewPusher(clusterAddrs, topic, opts...)    |- kafka.NewWriter(kfConfig)                                // 与 kf 之前的连贯    |- executor = executors.NewChunkExecutor()  // 设置外部写入的executor为字节数定量写入
  1. 建设与 kafka cluster 的连贯。此处必定就要传入 kafka config
  2. 设置外部暂存区的写入函数以及刷新规定。

应用 chunkExecutor 作用显而易见:将随机写 -> 批量写,缩小 I/O 耗费;同时保障单次写入不能超过默认的 1M 或者本人设定的最大写入字节数。

其实再往 chunkExecutor 外部看,其实每次触发插入有两个指标:

  • maxChunkSize:单次最大写入字节数
  • flushInterval:刷新暂存音讯插入的间隔时间

在触发写入,只有满足任意一个指标都会执行写入。同时在 executors 都有设置插入间隔时间,以防暂存区写入阻塞而暂存区内音讯始终不被刷新清空。

更多对于 executors 能够参看以下:https://zeromicro.github.io/g...

生产者插入

根据上述初始化对 executors 介绍,插入过程中也少不了它的配合:

func (p *Pusher) Push(v string) error {  // 1. 将 msg -> kafka 外部的 Message    msg := kafka.Message{        Key:   []byte(strconv.FormatInt(time.Now().UnixNano(), 10)),        Value: []byte(v),    }    // 应用 executor.Add() 插入外部的 container  // 当 executor 初始化失败或者是外部产生谬误,也会将 Message 直接插入 kafka    if p.executor != nil {        return p.executor.Add(msg, len(v))    } else {        return p.produer.WriteMessages(context.Background(), msg)    }}

过程其实很简略。那 executors.Add(msg, len(msg)) 是怎么把 msg 插入到 kafka 呢?

插入的逻辑其实在初始化中就申明了:

pusher.executor = executors.NewChunkExecutor(func(tasks []interface{}) {        chunk := make([]kafka.Message, len(tasks))      // 1        for i := range tasks {            chunk[i] = tasks[i].(kafka.Message)        }      // 2        if err := pusher.produer.WriteMessages(context.Background(), chunk...); err != nil {            logx.Error(err)        }    }, newOptions(opts)...)
  1. 触发插入时,将暂存区中存储的 []msg 顺次拿出,作为最终插入音讯汇合;
  2. 将上一步的音讯汇合,作为一个批次插入 kafkatopic

这样 pusher -> chunkExecutor -> kafka 一个链路就呈现了。上面用一张图形象表白一下:

框架地址

https://github.com/tal-tech/go-queue

同时在 go-queue 也大量应用 go-zero 的 批量解决工具库 executors

https://github.com/tal-tech/go-zero

欢送应用 go-zero & go-queuestar 反对咱们!一起构建 go-zero 生态!????

go-zero 系列文章见『微服务实际』公众号