共计 2178 个字符,预计需要花费 6 分钟才能阅读完成。
最近在看公司的 redis queue
时,发现底层应用的是 go-zero
的 queue
。本篇文章来看看 queue
的设计,也心愿能够从外面理解到 mq
的最小型设计实际。
应用
联合其余 mq
的应用经验,根本的应用流程:
- 创立
producer
或consumer
- 启动
mq
- 生产音讯 / 生产音讯
对应到 queue
中,大抵也是这个:
创立 queue
// 生产者创立工厂
producer := newMockedProducer()
// 消费者创立工厂
consumer := newMockedConsumer()
// 将生产者以及消费者的创立工厂函数传递给 NewQueue()
q := queue.NewQueue(func() (Producer, error) {return producer, nil}, func() (Consumer, error) {return consumer, nil})
咱们看看 NewQueue
须要什么构建条件:
producer constructor
consumer constructor
将单方的工厂函数传递给 queue
,由它去执行以及重试。
这两个须要的目标是将生产者 / 消费者的构建和音讯生产 / 生产都封装在 mq
中,而且将生产者 / 消费者的整套逻辑交给开发者解决:
type (
// 开发者须要实现此接口
Producer interface {AddListener(listener ProduceListener)
Produce() (string, bool)
}
...
// ProducerFactory 定义了生成 Producer 的办法
ProducerFactory func() (Producer, error)
)
- 其实也就是将生产者的逻辑交个开发者本人实现,
mq
只负责生产者 / 消费者的消息传递和之间的调度。 - 工厂办法的设计,是将生产者自身和生产音讯,这两个工作都交给
queue
本人来做调度或者重试。
生产 msg
生产音讯当然要回到生产者自身:
type mockedProducer struct {
total int32
count int32
// 应用 waitgroup 来模仿工作的实现
wait sync.WaitGroup
}
// 实现 Producer interface 的办法:Produce()
func (p *mockedProducer) Produce() (string, bool) {if atomic.AddInt32(&p.count, 1) <= p.total {p.wait.Done()
return "item", true
}
time.Sleep(time.Second)
return "", false
}
queue
中的生产者编写都必须实现:
Produce()
:由开发者编写生产音讯的逻辑AddListener()
:生产者
生产 msg
和生产者相似:
type mockedConsumer struct {count int32}
func (c *mockedConsumer) Consume(string) error {atomic.AddInt32(&c.count, 1)
return nil
}
启动 queue
启动,而后验证咱们上述的生产者和消费者之间的数据是否传输胜利:
func TestQueue(t *testing.T) {producer := newMockedProducer(rounds)
consumer := newMockedConsumer()
// 创立 queue
q := NewQueue(func() (Producer, error) {return producer, nil}, func() (Consumer, error) {return consumer, nil})
// 当生产者生产结束,执行 Stop() 敞开生产端生产
go func() {producer.wait.Wait()
// mq 生产端进行生产,不是 mq 自身 Stop 运行
q.Stop()}()
// 启动
q.Start()
// 验证生产生产端是否音讯生产实现
assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
}
以上就是 queue
最繁难的入门应用代码。开发者能够依据本人的业务理论状况:自在定义生产者 / 消费者曾经生产 / 生产逻辑。
整体设计
整体流程如上图:
- 整体的通信都由
channel
进行 - 通过退出监听器
listener
,以及事件触发event
,相当于将触发器逻辑分离出来 - 生产者有
produceone
,这个是生产音讯的逻辑,然而其中的Produce()
是由开发者编写【下面的interface
中正是这个函数】 - 同理消费者,
Consume()
根本的音讯流动就入上图以及上述刻画的,具体的代码剖析咱们就留到下一篇,咱们😁剖析外面,尤其是如何管制 channel
是整个设计的外围。
总结
本篇文章从应用以及整个架构剖析上简略介绍了 queue
的设计。下篇咱们将深刻源码,剖析内部消息流转以及 channel
管制。
对于 go-zero
更多的设计和实现文章,能够继续关注咱们。欢送大家去关注和应用。
我的项目地址
https://github.com/tal-tech/go-zero
欢送应用 go-zero 并 star 反对咱们!
微信交换群
关注『微服务实际』公众号并回复 进群 获取社区群二维码。
go-zero 系列文章见『微服务实际』公众号
正文完