乐趣区

关于golang:如何让消息队列达到最大吞吐量

你在应用音讯队列的时候关注过吞吐量吗?

思考过吞吐量的影响因素吗?

思考过怎么进步吗?

总结过最佳实际吗?

本文带你一起探讨下音讯队列生产端高吞吐的 Go 框架实现。Let’s go!

对于吞吐量的一些思考

  • 写入音讯队列吞吐量取决于以下两个方面

    • 网络带宽
    • 音讯队列(比方 Kafka)写入速度

    最佳吞吐量是让其中之一打满,而个别状况下内网带宽都会十分高,不太可能被打满,所以天然就是讲音讯队列的写入速度打满,这就就有两个点须要均衡

    • 批量写入的音讯量大小或者字节数多少
    • 提早多久写入

    go-zero 的 PeriodicalExecutorChunkExecutor 就是为了这种状况设计的

  • 从音讯队列里生产音讯的吞吐量取决于以下两个方面

    • 音讯队列的读取速度,个别状况下音讯队列自身的读取速度相比于解决音讯的速度都是足够快的
    • 处理速度,这个依赖于业务

    这里有个外围问题是不能不思考业务处理速度,而读取过多的音讯到内存里,否则可能会引起两个问题:

    • 内存占用过高,甚至呈现 OOM,pod 也是有 memory limit
    • 进行 pod 时沉积的音讯来不及解决而导致音讯失落

解决方案和实现

借用一下 Rob Pike 的一张图,这个跟队列生产殊途同归。右边 4 个 gopher 从队列里取,左边 4 个 gopher 接过去解决。比拟现实的后果是右边和左边速率基本一致,没有谁节约,没有谁期待,两头替换处也没有沉积。

咱们来看看 go-zero 是怎么实现的:

  • Producer
    for {
        select {
        case <-q.quit:
            logx.Info("Quitting producer")
            return
        default:
            if v, ok := q.produceOne(producer); ok {q.channel <- v}
        }
    }

没有退出事件就会通过 produceOne 去读取一个音讯,胜利后写入 channel。利用 chan 就能够很好的解决读取和生产的连接问题。

  • Consumer
    for {
        select {
        case message, ok := <-q.channel:
            if ok {q.consumeOne(consumer, message)
            } else {logx.Info("Task channel was closed, quitting consumer...")
                return
            }
        case event := <-eventChan:
            consumer.OnEvent(event)
        }
    }

这里如果拿到音讯就去解决,当 okfalse 的时候示意 channel 已被敞开,能够退出整个解决循环了。同时咱们还在 redis queue 上反对了 pause/resume,咱们原来在社交场景里大量应用这样的队列,能够告诉 consumer 暂停和持续。

  • 启动 queue,有了这些咱们就能够通过管制 producer/consumer 的数量来达到吞吐量的调优了
func (q *Queue) Start() {q.startProducers(q.producerCount)
    q.startConsumers(q.consumerCount)

    q.producerRoutineGroup.Wait()
    close(q.channel)
    q.consumerRoutineGroup.Wait()}

这里须要留神的是,先要停掉 producer,再去等 consumer 解决完。

到这里外围控制代码根本就讲完了,其实看起来还是挺简略的,也能够到 https://github.com/tal-tech/go-zero/tree/master/core/queue 去看残缺实现。

如何应用

根本的应用流程:

  1. 创立 producerconsumer
  2. 启动 queue
  3. 生产音讯 / 生产音讯

对应到 queue 中,大抵如下:

创立 queue

// 生产者创立工厂
producer := newMockedProducer()
// 消费者创立工厂
consumer := newMockedConsumer()
// 将生产者以及消费者的创立工厂函数传递给 NewQueue()
q := queue.NewQueue(func() (Producer, error) {return producer, nil}, func() (Consumer, error) {return consumer, nil})

咱们看看 NewQueue 须要什么参数:

  1. producer 工厂办法
  2. consumer 工厂办法

producer & consumer 的工厂函数传递 queue,由它去负责创立。框架提供了 ProducerConsumer 的接口以及工厂办法定义,而后整个流程的管制 queue 实现会主动实现。

生产 message

咱们通过自定义一个 mockedProducer 来模仿:

type mockedProducer struct {
    total int32
    count int32
  // 应用 waitgroup 来模仿工作的实现
    wait  sync.WaitGroup
}
// 实现 Producer interface 的办法:Produce()
func (p *mockedProducer) Produce() (string, bool) {if atomic.AddInt32(&p.count, 1) <= p.total {p.wait.Done()
        return "item", true
    }
    time.Sleep(time.Second)
    return "", false
}

queue 中的生产者编写都必须实现:

  • Produce():由开发者编写生产音讯的逻辑
  • AddListener():增加事件 listener

生产 message

咱们通过自定义一个 mockedConsumer 来模仿:

type mockedConsumer struct {count  int32}

func (c *mockedConsumer) Consume(string) error {atomic.AddInt32(&c.count, 1)
    return nil
}

启动 queue

启动,而后验证咱们上述的生产者和消费者之间的数据是否传输胜利:

func main() {
    // 创立 queue
    q := NewQueue(func() (Producer, error) {return newMockedProducer(), nil
    }, func() (Consumer, error) {return newMockedConsumer(), nil
    })
  // 启动 panic 了也能够确保 stop 被执行以清理资源
  defer q.Stop()
    // 启动
    q.Start()}

以上就是 queue 最繁难的实现示例。咱们通过这个 core/queue 框架实现了基于 rediskafka 等的音讯队列服务,在不同业务场景中通过了充沛的实际测验。你也能够依据本人的业务理论状况,实现本人的音讯队列服务。

整体设计

整体流程如上图:

  1. 整体的通信都由 channel 进行
  2. ProducerConsumer 的数量能够设定以匹配不同业务需要
  3. ProduceConsume 具体实现由开发者定义,queue 负责整体流程

总结

本篇文章解说了如何通过 channel 来均衡从队列中读取和解决音讯的速度,以及如何实现一个通用的音讯队列解决框架,并通过 mock 示例简略展现了如何基于 core/queue 实现一个音讯队列解决服务。你能够通过相似的形式实现一个基于 rocketmq 等的音讯队列解决服务。

对于 go-zero 更多的设计和实现文章,能够关注『微服务实际』公众号。

我的项目地址

https://github.com/tal-tech/go-zero

欢送应用 go-zero 并 star 反对咱们!

微信交换群

关注『微服务实际 』公众号并点击 进群 获取社区群二维码。

go-zero 系列文章见『微服务实际』公众号

退出移动版