乐趣区

关于golang:一文带你理解最简消息队列实现

最近在看公司的 redis queue 时,发现底层应用的是 go-zeroqueue。本篇文章来看看 queue 的设计,也心愿能够从外面理解到 mq 的最小型设计实际。

应用

联合其余 mq 的应用经验,根本的应用流程:

  1. 创立 producerconsumer
  2. 启动 mq
  3. 生产音讯 / 生产音讯

对应到 queue 中,大抵也是这个:

创立 queue

// 生产者创立工厂
producer := newMockedProducer()
// 消费者创立工厂
consumer := newMockedConsumer()
// 将生产者以及消费者的创立工厂函数传递给 NewQueue()
q := queue.NewQueue(func() (Producer, error) {return producer, nil}, func() (Consumer, error) {return consumer, nil})

咱们看看 NewQueue 须要什么构建条件:

  1. producer constructor
  2. consumer constructor

将单方的工厂函数传递给 queue,由它去执行以及重试。

这两个须要的目标是将生产者 / 消费者的构建和音讯生产 / 生产都封装在 mq 中,而且将生产者 / 消费者的整套逻辑交给开发者解决:

type (
    // 开发者须要实现此接口
    Producer interface {AddListener(listener ProduceListener)
        Produce() (string, bool)
    }
    ...
    // ProducerFactory 定义了生成 Producer 的办法
    ProducerFactory func() (Producer, error)
)
  1. 其实也就是将生产者的逻辑交个开发者本人实现,mq 只负责生产者 / 消费者的消息传递和之间的调度。
  2. 工厂办法的设计,是将生产者自身和生产音讯,这两个工作都交给 queue 本人来做调度或者重试。

生产 msg

生产音讯当然要回到生产者自身:

type mockedProducer struct {
    total int32
    count int32
  // 应用 waitgroup 来模仿工作的实现
    wait  sync.WaitGroup
}
// 实现 Producer interface 的办法:Produce()
func (p *mockedProducer) Produce() (string, bool) {if atomic.AddInt32(&p.count, 1) <= p.total {p.wait.Done()
        return "item", true
    }
    time.Sleep(time.Second)
    return "", false
}

queue 中的生产者编写都必须实现:

  • Produce():由开发者编写生产音讯的逻辑
  • AddListener():生产者

生产 msg

和生产者相似:

type mockedConsumer struct {count  int32}

func (c *mockedConsumer) Consume(string) error {atomic.AddInt32(&c.count, 1)
    return nil
}

启动 queue

启动,而后验证咱们上述的生产者和消费者之间的数据是否传输胜利:

func TestQueue(t *testing.T) {producer := newMockedProducer(rounds)
    consumer := newMockedConsumer()
    // 创立 queue
    q := NewQueue(func() (Producer, error) {return producer, nil}, func() (Consumer, error) {return consumer, nil})
    // 当生产者生产结束,执行 Stop() 敞开生产端生产
    go func() {producer.wait.Wait()
    // mq 生产端进行生产,不是 mq 自身 Stop 运行
        q.Stop()}()
    // 启动
    q.Start()
    // 验证生产生产端是否音讯生产实现
    assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
}

以上就是 queue 最繁难的入门应用代码。开发者能够依据本人的业务理论状况:自在定义生产者 / 消费者曾经生产 / 生产逻辑。

整体设计

整体流程如上图:

  1. 整体的通信都由 channel 进行
  2. 通过退出监听器 listener,以及事件触发 event,相当于将触发器逻辑分离出来
  3. 生产者有 produceone,这个是生产音讯的逻辑,然而其中的 Produce() 是由开发者编写【下面的 interface 中正是这个函数】
  4. 同理消费者,Consume()

根本的音讯流动就入上图以及上述刻画的,具体的代码剖析咱们就留到下一篇,咱们😁剖析外面,尤其是如何管制 channel 是整个设计的外围。

总结

本篇文章从应用以及整个架构剖析上简略介绍了 queue 的设计。下篇咱们将深刻源码,剖析内部消息流转以及 channel 管制。

对于 go-zero 更多的设计和实现文章,能够继续关注咱们。欢送大家去关注和应用。

我的项目地址

https://github.com/tal-tech/go-zero

欢送应用 go-zero 并 star 反对咱们!

微信交换群

关注『微服务实际』公众号并回复 进群 获取社区群二维码。

go-zero 系列文章见『微服务实际』公众号

退出移动版