共计 3108 个字符,预计需要花费 8 分钟才能阅读完成。
你在应用音讯队列的时候关注过吞吐量吗?
思考过吞吐量的影响因素吗?
思考过怎么进步吗?
总结过最佳实际吗?
本文带你一起探讨下音讯队列生产端高吞吐的 Go
框架实现。Let’s go!
对于吞吐量的一些思考
-
写入音讯队列吞吐量取决于以下两个方面
- 网络带宽
- 音讯队列(比方 Kafka)写入速度
最佳吞吐量是让其中之一打满,而个别状况下内网带宽都会十分高,不太可能被打满,所以天然就是讲音讯队列的写入速度打满,这就就有两个点须要均衡
- 批量写入的音讯量大小或者字节数多少
- 提早多久写入
go-zero 的
PeriodicalExecutor
和ChunkExecutor
就是为了这种状况设计的 -
从音讯队列里生产音讯的吞吐量取决于以下两个方面
- 音讯队列的读取速度,个别状况下音讯队列自身的读取速度相比于解决音讯的速度都是足够快的
- 处理速度,这个依赖于业务
这里有个外围问题是不能不思考业务处理速度,而读取过多的音讯到内存里,否则可能会引起两个问题:
- 内存占用过高,甚至呈现 OOM,
pod
也是有memory limit
的 - 进行
pod
时沉积的音讯来不及解决而导致音讯失落
解决方案和实现
借用一下 Rob Pike
的一张图,这个跟队列生产殊途同归。右边 4 个 gopher
从队列里取,左边 4 个 gopher
接过去解决。比拟现实的后果是右边和左边速率基本一致,没有谁节约,没有谁期待,两头替换处也没有沉积。
咱们来看看 go-zero
是怎么实现的:
Producer
端
for {
select {
case <-q.quit:
logx.Info("Quitting producer")
return
default:
if v, ok := q.produceOne(producer); ok {q.channel <- v}
}
}
没有退出事件就会通过 produceOne
去读取一个音讯,胜利后写入 channel
。利用 chan
就能够很好的解决读取和生产的连接问题。
Consumer
端
for {
select {
case message, ok := <-q.channel:
if ok {q.consumeOne(consumer, message)
} else {logx.Info("Task channel was closed, quitting consumer...")
return
}
case event := <-eventChan:
consumer.OnEvent(event)
}
}
这里如果拿到音讯就去解决,当 ok
为 false
的时候示意 channel
已被敞开,能够退出整个解决循环了。同时咱们还在 redis queue
上反对了 pause/resume
,咱们原来在社交场景里大量应用这样的队列,能够告诉 consumer
暂停和持续。
- 启动
queue
,有了这些咱们就能够通过管制producer/consumer
的数量来达到吞吐量的调优了
func (q *Queue) Start() {q.startProducers(q.producerCount)
q.startConsumers(q.consumerCount)
q.producerRoutineGroup.Wait()
close(q.channel)
q.consumerRoutineGroup.Wait()}
这里须要留神的是,先要停掉 producer
,再去等 consumer
解决完。
到这里外围控制代码根本就讲完了,其实看起来还是挺简略的,也能够到 https://github.com/tal-tech/go-zero/tree/master/core/queue 去看残缺实现。
如何应用
根本的应用流程:
- 创立
producer
或consumer
- 启动
queue
- 生产音讯 / 生产音讯
对应到 queue
中,大抵如下:
创立 queue
// 生产者创立工厂
producer := newMockedProducer()
// 消费者创立工厂
consumer := newMockedConsumer()
// 将生产者以及消费者的创立工厂函数传递给 NewQueue()
q := queue.NewQueue(func() (Producer, error) {return producer, nil}, func() (Consumer, error) {return consumer, nil})
咱们看看 NewQueue
须要什么参数:
producer
工厂办法consumer
工厂办法
将 producer & consumer
的工厂函数传递 queue
,由它去负责创立。框架提供了 Producer
和 Consumer
的接口以及工厂办法定义,而后整个流程的管制 queue
实现会主动实现。
生产 message
咱们通过自定义一个 mockedProducer
来模仿:
type mockedProducer struct {
total int32
count int32
// 应用 waitgroup 来模仿工作的实现
wait sync.WaitGroup
}
// 实现 Producer interface 的办法:Produce()
func (p *mockedProducer) Produce() (string, bool) {if atomic.AddInt32(&p.count, 1) <= p.total {p.wait.Done()
return "item", true
}
time.Sleep(time.Second)
return "", false
}
queue
中的生产者编写都必须实现:
Produce()
:由开发者编写生产音讯的逻辑AddListener()
:增加事件listener
生产 message
咱们通过自定义一个 mockedConsumer
来模仿:
type mockedConsumer struct {count int32}
func (c *mockedConsumer) Consume(string) error {atomic.AddInt32(&c.count, 1)
return nil
}
启动 queue
启动,而后验证咱们上述的生产者和消费者之间的数据是否传输胜利:
func main() {
// 创立 queue
q := NewQueue(func() (Producer, error) {return newMockedProducer(), nil
}, func() (Consumer, error) {return newMockedConsumer(), nil
})
// 启动 panic 了也能够确保 stop 被执行以清理资源
defer q.Stop()
// 启动
q.Start()}
以上就是 queue
最繁难的实现示例。咱们通过这个 core/queue
框架实现了基于 redis
和 kafka
等的音讯队列服务,在不同业务场景中通过了充沛的实际测验。你也能够依据本人的业务理论状况,实现本人的音讯队列服务。
整体设计
整体流程如上图:
- 整体的通信都由
channel
进行 Producer
和Consumer
的数量能够设定以匹配不同业务需要Produce
和Consume
具体实现由开发者定义,queue
负责整体流程
总结
本篇文章解说了如何通过 channel
来均衡从队列中读取和解决音讯的速度,以及如何实现一个通用的音讯队列解决框架,并通过 mock
示例简略展现了如何基于 core/queue
实现一个音讯队列解决服务。你能够通过相似的形式实现一个基于 rocketmq
等的音讯队列解决服务。
对于 go-zero
更多的设计和实现文章,能够关注『微服务实际』公众号。
我的项目地址
https://github.com/tal-tech/go-zero
欢送应用 go-zero 并 star 反对咱们!
微信交换群
关注『微服务实际 』公众号并点击 进群 获取社区群二维码。
go-zero 系列文章见『微服务实际』公众号