最近在看公司的 redis queue 时,发现底层应用的是 go-zeroqueue 。本篇文章来看看 queue 的设计,也心愿能够从外面理解到 mq 的最小型设计实际。

应用

联合其余 mq 的应用经验,根本的应用流程:

  1. 创立 producerconsumer
  2. 启动 mq
  3. 生产音讯/生产音讯

对应到 queue 中,大抵也是这个:

创立 queue

// 生产者创立工厂producer := newMockedProducer()// 消费者创立工厂consumer := newMockedConsumer()// 将生产者以及消费者的创立工厂函数传递给 NewQueue()q := queue.NewQueue(func() (Producer, error) {  return producer, nil}, func() (Consumer, error) {  return consumer, nil})

咱们看看 NewQueue 须要什么构建条件:

  1. producer constructor
  2. consumer constructor

将单方的工厂函数传递给 queue ,由它去执行以及重试。

这两个须要的目标是将生产者/消费者的构建和音讯生产/生产都封装在 mq 中,而且将生产者/消费者的整套逻辑交给开发者解决:

type (    // 开发者须要实现此接口    Producer interface {        AddListener(listener ProduceListener)        Produce() (string, bool)    }    ...    // ProducerFactory定义了生成Producer的办法    ProducerFactory func() (Producer, error))
  1. 其实也就是将生产者的逻辑交个开发者本人实现,mq 只负责生产者/消费者的消息传递和之间的调度。
  2. 工厂办法的设计,是将生产者自身和生产音讯,这两个工作都交给 queue 本人来做调度或者重试。

生产msg

生产音讯当然要回到生产者自身:

type mockedProducer struct {    total int32    count int32  // 应用waitgroup来模仿工作的实现    wait  sync.WaitGroup}// 实现 Producer interface 的办法:Produce()func (p *mockedProducer) Produce() (string, bool) {    if atomic.AddInt32(&p.count, 1) <= p.total {        p.wait.Done()        return "item", true    }    time.Sleep(time.Second)    return "", false}

queue 中的生产者编写都必须实现:

  • Produce():由开发者编写生产音讯的逻辑
  • AddListener():生产者

生产msg

和生产者相似:

type mockedConsumer struct {    count  int32}func (c *mockedConsumer) Consume(string) error {    atomic.AddInt32(&c.count, 1)    return nil}

启动 queue

启动,而后验证咱们上述的生产者和消费者之间的数据是否传输胜利:

func TestQueue(t *testing.T) {    producer := newMockedProducer(rounds)    consumer := newMockedConsumer()    // 创立 queue    q := NewQueue(func() (Producer, error) {        return producer, nil    }, func() (Consumer, error) {        return consumer, nil    })    // 当生产者生产结束,执行 Stop() 敞开生产端生产    go func() {        producer.wait.Wait()    // mq生产端进行生产,不是mq自身 Stop 运行        q.Stop()    }()    // 启动    q.Start()    // 验证生产生产端是否音讯生产实现    assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))}

以上就是 queue 最繁难的入门应用代码。开发者能够依据本人的业务理论状况:自在定义生产者/消费者曾经生产/生产逻辑。

整体设计

整体流程如上图:

  1. 整体的通信都由 channel 进行
  2. 通过退出监听器 listener ,以及事件触发 event ,相当于将触发器逻辑分离出来
  3. 生产者有 produceone ,这个是生产音讯的逻辑,然而其中的 Produce() 是由开发者编写【下面的 interface 中正是这个函数】
  4. 同理消费者,Consume()

根本的音讯流动就入上图以及上述刻画的,具体的代码剖析咱们就留到下一篇,咱们剖析外面,尤其是如何管制 channel 是整个设计的外围。

总结

本篇文章从应用以及整个架构剖析上简略介绍了 queue 的设计。下篇咱们将深刻源码,剖析内部消息流转以及 channel 管制。

对于 go-zero 更多的设计和实现文章,能够继续关注咱们。欢送大家去关注和应用。

我的项目地址

https://github.com/tal-tech/go-zero

欢送应用 go-zero 并 star 反对咱们!

微信交换群

关注『微服务实际』公众号并回复 进群 获取社区群二维码。

go-zero 系列文章见『微服务实际』公众号