共计 3825 个字符,预计需要花费 10 分钟才能阅读完成。
上一篇文章咱们做了 channel 的根底设计,这一篇文章咱们来实现音讯队列两个必备的性能:实现确认和队列重入。
实现确认
咱们都晓得,在分布式系统的消息传递过程中有三种语义,也就是三种的质量保证:
- At most once: 至少一次。音讯在传递时,最多会被送达一次。换一个说法就是,没什么音讯可靠性保障,容许丢音讯。
- At least once: 至多一次。音讯在传递时,至多会被送达一次。也就是说,不容许丢音讯,然而容许有大量反复音讯呈现。
- Exactly once:恰好一次。音讯在传递时,只会被送达一次,不容许失落也不容许反复。
显然 Exactly once 语义是咱们最心愿零碎可能提供的,然而它实现起来非常复杂,所以绝大部分音讯队列零碎提供的都是 At least once 语义,而后用幂等性来保障业务的正确性。而 At least once 语义最常见的实现形式就是接管方在接管到音讯后进行回复确认,相似 TCP 握手中的 ACK。
要达到这个目标,咱们首先要在 channel 中保护一个 map 用于存储曾经发送的音讯,以及一个管道来辅助写入:
type Channel struct { | |
... | |
inFlightMessageChan chan *Message | |
inFlightMessages map[string]*Message | |
} |
在发送音讯的时候,咱们也往 inFlightMessageChan 中写入,同时在事件处理函数 Router 中减少对该管道的监听,将接管到的音讯增加到 inFlightMessages 中:
func (c *Channel) pushInFlightMessage(msg *Message) {c.inFlightMessages[util.UuidToStr(msg.Uuid())] = msg | |
} | |
func (c *Channel) popInFlightMessage(uuidStr string) (*Message, error) {msg, ok := c.inFlightMessages[uuidStr] | |
if !ok {return nil, errors.New("UUID not in flight") | |
} | |
delete(c.inFlightMessages, uuidStr) | |
return msg, nil | |
} | |
// Router handles the events of Channel | |
func (c *Channel) Router() { | |
... | |
go c.RequeueRouter(closeChan) | |
go c.MessagePump(closeChan) | |
... | |
} | |
func (c *Channel) RequeueRouter(closeChan chan struct{}) { | |
for { | |
select { | |
case msg := <-c.inFlightMessageChan: | |
c.pushInFlightMessage(msg) | |
case <-closeChan: | |
return | |
} | |
} | |
} | |
func (c *Channel) MessagePump(closeChan chan struct{}) { | |
for { | |
... | |
if msg != nil {c.inFlightMessageChan <- msg} | |
c.clientMessageChan <- msg | |
} | |
} |
接下来编写实现确认相干逻辑,还是在 channel 构造体中增加一个管道 finishMessageChan,提供写入办法,并减少相干事件处理逻辑,代码如下:
type Channel struct { | |
... | |
finishMessageChan chan util.ChanReq | |
} | |
func (c *Channel) FinishMessage(uuidStr string) error {errChan := make(chan interface{}) | |
c.finishMessageChan <- util.ChanReq{ | |
Variable: uuidStr, | |
RetChan: errChan, | |
} | |
err, _ := (<-errChan).(error) | |
return err | |
} | |
func (c *Channel) RequeueRouter(closeChan chan struct{}) { | |
for { | |
select { | |
... | |
case finishReq := <-c.finishMessageChan: | |
uuidStr := finishReq.Variable.(string) | |
_, err := c.popInFlightMessage(uuidStr) | |
if err != nil {log.Printf("ERROR: failed to finish message(%s) - %s", uuidStr, err.Error()) | |
} | |
finishReq.RetChan <- err | |
case <-closeChan: | |
return | |
} | |
} | |
} |
重入队列
音讯从新入队也是很常见的性能,比方消费者想屡次生产同一条音讯的时候就须要用到,当初咱们来实现一下。
重入和下面的实现确认逻辑很相似,就间接贴代码了:
type Channel struct { | |
... | |
requeueMessageChan chan util.ChanReq | |
} | |
func (c *Channel) RequeueMessage(uuidStr string) error {errChan := make(chan interface{}) | |
c.requeueMessageChan <- util.ChanReq{ | |
Variable: uuidStr, | |
RetChan: errChan, | |
} | |
err, _ := (<-errChan).(error) | |
return err | |
} | |
func (c *Channel) RequeueRouter(closeChan chan struct{}) { | |
for { | |
select { | |
... | |
case requeueReq := <-c.requeueMessageChan: | |
uuidStr := requeueReq.Variable.(string) | |
msg, err := c.popInFlightMessage(uuidStr) | |
if err != nil {log.Printf("ERROR: failed to requeue message(%s) - %s", uuidStr, err.Error()) | |
} else {go func(msg *Message) {c.PutMessage(msg) | |
}(msg) | |
} | |
requeueReq.RetChan <- err | |
case finishReq := <-c.finishMessageChan: | |
... | |
} | |
} | |
} |
到这里性能就差不多曾经全实现了,不过还有一个问题,就是如果消费者迟迟不确认实现的话,音讯就会大量沉积在 inFlightMessages 中。咱们能够增加这样一个逻辑:在限定的工夫内如果音讯没有确认实现的话,咱们就将该音讯主动从新入队。能够在监听 inFlightMessageChan 的 case 中退出以下代码:
func (c *Channel) RequeueRouter(closeChan chan struct{}) { | |
for { | |
select { | |
case msg := <-c.inFlightMessageChan: | |
... | |
go func(msg *Message) { | |
select {case <-time.After(60 * time.Second): | |
log.Printf("CHANNEL(%s): auto requeue of message(%s)", c.name, util.UuidToStr(msg.Uuid())) | |
} | |
err := c.RequeueMessage(util.UuidToStr(msg.Uuid())) | |
if err != nil {log.Printf("ERROR: channel(%s) - %s", c.name, err.Error()) | |
} | |
}(msg) | |
... | |
} | |
} | |
} |
当然,当音讯确认实现后咱们也须要终止这个期待超时的逻辑,这里的解决方案是在音讯 message 构造体中减少一个管道,期待入队的同时也监听这个管道,确认实现时则向这个管道发送音讯。message 改变如下:
message.go
type Message struct { | |
... | |
timerChan chan struct{}} | |
... | |
func (m *Message) EndTimer() { | |
select {case m.timerChan <- struct{}{}: | |
default: | |
} | |
} |
channel 改变如下:
func (c *Channel) popInFlightMessage(uuidStr string) (*Message, error) { | |
... | |
msg.EndTimer() | |
return msg, nil | |
} | |
... | |
func (c *Channel) RequeueRouter(closeChan chan struct{}) { | |
for { | |
select { | |
case msg := <-c.inFlightMessageChan: | |
... | |
go func(msg *Message) { | |
select {case <-time.After(60 * time.Second): | |
log.Printf("CHANNEL(%s): auto requeue of message(%s)", c.name, util.UuidToStr(msg.Uuid())) | |
case <-msg.timerChan: | |
return | |
} | |
... | |
}(msg) | |
} | |
} |
channel 的残缺代码:channel.go。
到这里咱们的 channel 就设计得差不多了,下一篇咱们将介绍 topic 和协定的实现。