最近在看公司的 redis queue
时,发现底层应用的是 go-zero
的 queue
。本篇文章来看看 queue
的设计,也心愿能够从外面理解到 mq
的最小型设计实际。
应用
联合其余 mq
的应用经验,根本的应用流程:
- 创立
producer
或consumer
- 启动
mq
- 生产音讯/生产音讯
对应到 queue
中,大抵也是这个:
创立 queue
// 生产者创立工厂producer := newMockedProducer()// 消费者创立工厂consumer := newMockedConsumer()// 将生产者以及消费者的创立工厂函数传递给 NewQueue()q := queue.NewQueue(func() (Producer, error) { return producer, nil}, func() (Consumer, error) { return consumer, nil})
咱们看看 NewQueue
须要什么构建条件:
producer constructor
consumer constructor
将单方的工厂函数传递给 queue
,由它去执行以及重试。
这两个须要的目标是将生产者/消费者的构建和音讯生产/生产都封装在 mq
中,而且将生产者/消费者的整套逻辑交给开发者解决:
type ( // 开发者须要实现此接口 Producer interface { AddListener(listener ProduceListener) Produce() (string, bool) } ... // ProducerFactory定义了生成Producer的办法 ProducerFactory func() (Producer, error))
- 其实也就是将生产者的逻辑交个开发者本人实现,
mq
只负责生产者/消费者的消息传递和之间的调度。 - 工厂办法的设计,是将生产者自身和生产音讯,这两个工作都交给
queue
本人来做调度或者重试。
生产msg
生产音讯当然要回到生产者自身:
type mockedProducer struct { total int32 count int32 // 应用waitgroup来模仿工作的实现 wait sync.WaitGroup}// 实现 Producer interface 的办法:Produce()func (p *mockedProducer) Produce() (string, bool) { if atomic.AddInt32(&p.count, 1) <= p.total { p.wait.Done() return "item", true } time.Sleep(time.Second) return "", false}
queue
中的生产者编写都必须实现:
Produce()
:由开发者编写生产音讯的逻辑AddListener()
:生产者
生产msg
和生产者相似:
type mockedConsumer struct { count int32}func (c *mockedConsumer) Consume(string) error { atomic.AddInt32(&c.count, 1) return nil}
启动 queue
启动,而后验证咱们上述的生产者和消费者之间的数据是否传输胜利:
func TestQueue(t *testing.T) { producer := newMockedProducer(rounds) consumer := newMockedConsumer() // 创立 queue q := NewQueue(func() (Producer, error) { return producer, nil }, func() (Consumer, error) { return consumer, nil }) // 当生产者生产结束,执行 Stop() 敞开生产端生产 go func() { producer.wait.Wait() // mq生产端进行生产,不是mq自身 Stop 运行 q.Stop() }() // 启动 q.Start() // 验证生产生产端是否音讯生产实现 assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))}
以上就是 queue
最繁难的入门应用代码。开发者能够依据本人的业务理论状况:自在定义生产者/消费者曾经生产/生产逻辑。
整体设计
整体流程如上图:
- 整体的通信都由
channel
进行 - 通过退出监听器
listener
,以及事件触发event
,相当于将触发器逻辑分离出来 - 生产者有
produceone
,这个是生产音讯的逻辑,然而其中的Produce()
是由开发者编写【下面的interface
中正是这个函数】 - 同理消费者,
Consume()
根本的音讯流动就入上图以及上述刻画的,具体的代码剖析咱们就留到下一篇,咱们剖析外面,尤其是如何管制 channel
是整个设计的外围。
总结
本篇文章从应用以及整个架构剖析上简略介绍了 queue
的设计。下篇咱们将深刻源码,剖析内部消息流转以及 channel
管制。
对于 go-zero
更多的设计和实现文章,能够继续关注咱们。欢送大家去关注和应用。
我的项目地址
https://github.com/tal-tech/go-zero
欢送应用 go-zero 并 star 反对咱们!
微信交换群
关注『微服务实际』公众号并回复 进群 获取社区群二维码。
go-zero 系列文章见『微服务实际』公众号