共计 3616 个字符,预计需要花费 10 分钟才能阅读完成。
上一篇文章中咱们曾经实现了 channel 的设计,这里咱们持续实现 topic 的设计工作。
topic
字段设计
topic 的作用是接管客户端的音讯,而后同时发送给所有绑定的 channel 上,所以它的设计和 channel 很相似,蕴含的字段有:
- name:名称
- newChannelChan:新增 channel 的管道
- channelMap:保护的 channel 汇合
- incomingMessageChan:接管音讯的管道
- msgChan:有缓冲管道,相当于音讯的内存队列
- readSyncChan:和 routerSyncChan 配合应用保障 channelMap 的并发平安
- routerSyncChan:见上
- exitChan:接管退出信号的管道
- channelWriteStarted:是否已向 channel 发送音讯
topic 工厂
咱们须要保护一个全局的 topic map,在消费者订阅时生成新的 topic,相似于一个工厂,逻辑与第一篇中生成 uuid 相似:
注:Router 是 topic 的事件处理办法,详情见后文。
var (TopicMap = make(map[string]*Topic) | |
newTopicChan = make(chan util.ChanReq) | |
) | |
func NewTopic(name string, inMemSize int) *Topic { | |
topic := &Topic{ | |
name: name, | |
newChannelChan: make(chan util.ChanReq), | |
channelMap: make(map[string]*Channel), | |
incomingMessageChan: make(chan *Message), | |
msgChan: make(chan *Message, inMemSize), | |
readSyncChan: make(chan struct{}), | |
routerSyncChan: make(chan struct{}), | |
exitChan: make(chan util.ChanReq), | |
} | |
go topic.Router(inMemSize) | |
return topic | |
} | |
func GetTopic(name string) *Topic {topicChan := make(chan interface{}) | |
newTopicChan <- util.ChanReq{ | |
Variable: name, | |
RetChan: topicChan, | |
} | |
return (<-topicChan).(*Topic) | |
} | |
func TopicFactory(inMemSize int) { | |
var ( | |
topicReq util.ChanReq | |
name string | |
topic *Topic | |
ok bool | |
) | |
for { | |
topicReq = <-newTopicChan | |
name = topicReq.Variable.(string) | |
if topic, ok = TopicMap[name]; !ok {topic = NewTopic(name, inMemSize) | |
TopicMap[name] = topic | |
log.Printf("TOPIC %s CREATED", name) | |
} | |
topicReq.RetChan <- topic | |
} | |
} |
保护 channel
topic 保护 channel 的逻辑和 channel 保护消费者类似,也是“老熟人”chan + slice 的组合:
func (t *Topic) GetChannel(channelName string) *Channel {channelRet := make(chan interface{}) | |
t.newChannelChan <- util.ChanReq{ | |
Variable: channelName, | |
RetChan: channelRet, | |
} | |
return (<-channelRet).(*Channel) | |
} | |
func (t *Topic) Router(inMemSize int) { | |
for { | |
select { | |
case channelReq := <-t.newChannelChan: | |
channelName := channelReq.Variable.(string) | |
channel, ok := t.channelMap[channelName] | |
if !ok {channel = NewChannel(channelName, inMemSize) | |
t.channelMap[channelName] = channel | |
log.Printf("TOPIC(%s): new channel(%s)", t.name, channel.name) | |
} | |
channelReq.RetChan <- channel | |
} | |
} | |
} |
推送音讯给 channel
此处的逻辑仍然与 channel 中的设计相似,间接贴代码:
func (t *Topic) PutMessage(msg *Message) {t.incomingMessageChan <- msg} | |
func (t *Topic) MessagePump() { | |
var msg *Message | |
for { | |
select {case msg = <-t.msgChan:} | |
t.readSyncChan <- struct{}{} | |
for _, channel := range t.channelMap {go func(ch *Channel) {ch.PutMessage(msg) | |
}(channel) | |
} | |
t.routerSyncChan <- struct{}{} | |
} | |
} | |
func (t *Topic) Router(inMemSize int) { | |
var (msg *Message) | |
for { | |
select { | |
case channelReq := <-t.newChannelChan: | |
... | |
if !t.channelWriteStarted {go t.MessagePump(closeChan) | |
t.channelWriteStarted = true | |
} | |
case msg = <-t.incomingMessageChan: | |
select { | |
case t.msgChan <- msg: | |
log.Printf("TOPIC(%s) wrote message", t.name) | |
default: | |
} | |
case <-t.readSyncChan: | |
<-t.routerSyncChan | |
} | |
} | |
} |
咱们还是从 incomingMessageChan 中读取音讯,而后写入 msgChan,msgChan 缓冲区满了就抛弃(后续会加上长久化磁盘性能)。推送音讯到 channel 的协程是在增加 channel 时开启的,因为没有 channel 的话 topic 并不会推送音讯。
在向所有 channel 推送音讯的前后,咱们发现多了两个读管道的操作,这样做的目标是防止 map 的并发读写谬误。在 Go 语言中 map 是不反对并发读写的,因而咱们在遍历 channel 之前先读取 readSyncChan,确保咱们在遍历的时候调度协程是阻塞在 <-t.readSyncChan
这个 case 上,防止了对 map 的并发写操作。
敞开
敞开操作置信大家曾经一目了然了,无非就是监听推出信号的管道而后敞开 channel 和推送音讯的协程,代码如下:
func (t *Topic) MessagePump(closeChan <-chan struct{}) { | |
... | |
for { | |
select { | |
... | |
case <-closeChan: | |
return | |
} | |
... | |
} | |
} | |
func (t *Topic) Router(inMemSize int) { | |
var ( | |
msg *Message | |
closeChan = make(chan struct{}) | |
) | |
for { | |
select { | |
case channelReq := <-t.newChannelChan: | |
... | |
if !t.channelWriteStarted {go t.MessagePump(closeChan) | |
t.channelWriteStarted = true | |
} | |
... | |
case closeReq := <-t.exitChan: | |
log.Printf("TOPIC(%s): closing", t.name) | |
for _, channel := range t.channelMap {err := channel.Close() | |
if err != nil {log.Printf("ERROR: channel(%s) close - %s", channel.name, err.Error()) | |
} | |
} | |
close(closeChan) | |
closeReq.RetChan <- nil | |
} | |
} | |
} | |
func (t *Topic) Close() error {errChan := make(chan interface{}) | |
t.exitChan <- util.ChanReq{RetChan: errChan,} | |
err, _ := (<-errChan).(error) | |
return err | |
} |
topic 残缺代码:topic.go
到这里咱们的两个外围组件 topic 和 channel 就全副设计实现了,下一篇文章咱们持续实现协定和后盾队列的性能。
当前目录构造为: