上一篇文章中咱们曾经实现了 channel 的设计,这里咱们持续实现 topic 的设计工作。

topic

字段设计

topic 的作用是接管客户端的音讯,而后同时发送给所有绑定的 channel 上,所以它的设计和 channel 很相似,蕴含的字段有:

  • name:名称
  • newChannelChan:新增 channel 的管道
  • channelMap:保护的 channel 汇合
  • incomingMessageChan:接管音讯的管道
  • msgChan:有缓冲管道,相当于音讯的内存队列
  • readSyncChan:和 routerSyncChan 配合应用保障 channelMap 的并发平安
  • routerSyncChan:见上
  • exitChan:接管退出信号的管道
  • channelWriteStarted:是否已向 channel 发送音讯

topic 工厂

咱们须要保护一个全局的 topic map,在消费者订阅时生成新的 topic,相似于一个工厂,逻辑与第一篇中生成 uuid 相似:

注:Router 是 topic 的事件处理办法,详情见后文。

var (    TopicMap     = make(map[string]*Topic)    newTopicChan = make(chan util.ChanReq))func NewTopic(name string, inMemSize int) *Topic {    topic := &Topic{        name:                name,        newChannelChan:      make(chan util.ChanReq),        channelMap:          make(map[string]*Channel),        incomingMessageChan: make(chan *Message),        msgChan:             make(chan *Message, inMemSize),        readSyncChan:        make(chan struct{}),        routerSyncChan:      make(chan struct{}),        exitChan:            make(chan util.ChanReq),    }    go topic.Router(inMemSize)    return topic}func GetTopic(name string) *Topic {    topicChan := make(chan interface{})    newTopicChan <- util.ChanReq{        Variable: name,        RetChan:  topicChan,    }    return (<-topicChan).(*Topic)}func TopicFactory(inMemSize int) {    var (        topicReq util.ChanReq        name     string        topic    *Topic        ok       bool    )    for {        topicReq = <-newTopicChan        name = topicReq.Variable.(string)        if topic, ok = TopicMap[name]; !ok {            topic = NewTopic(name, inMemSize)            TopicMap[name] = topic            log.Printf("TOPIC %s CREATED", name)        }        topicReq.RetChan <- topic    }}

保护 channel

topic 保护 channel 的逻辑和 channel 保护消费者类似,也是“老熟人” chan + slice 的组合:

func (t *Topic) GetChannel(channelName string) *Channel {    channelRet := make(chan interface{})    t.newChannelChan <- util.ChanReq{        Variable: channelName,        RetChan:  channelRet,    }    return (<-channelRet).(*Channel)}func (t *Topic) Router(inMemSize int) {    for {        select {        case channelReq := <-t.newChannelChan:            channelName := channelReq.Variable.(string)            channel, ok := t.channelMap[channelName]            if !ok {                channel = NewChannel(channelName, inMemSize)                t.channelMap[channelName] = channel                log.Printf("TOPIC(%s): new channel(%s)", t.name, channel.name)            }            channelReq.RetChan <- channel        }    }}

推送音讯给 channel

此处的逻辑仍然与 channel 中的设计相似,间接贴代码:

func (t *Topic) PutMessage(msg *Message) {    t.incomingMessageChan <- msg}func (t *Topic) MessagePump() {    var msg *Message    for {        select {        case msg = <-t.msgChan:        }        t.readSyncChan <- struct{}{}        for _, channel := range t.channelMap {            go func(ch *Channel) {                ch.PutMessage(msg)            }(channel)        }        t.routerSyncChan <- struct{}{}    }}func (t *Topic) Router(inMemSize int) {    var (        msg *Message    )    for {        select {        case channelReq := <-t.newChannelChan:            ...            if !t.channelWriteStarted {                go t.MessagePump(closeChan)                t.channelWriteStarted = true            }        case msg = <-t.incomingMessageChan:            select {            case t.msgChan <- msg:                log.Printf("TOPIC(%s) wrote message", t.name)            default:            }        case <-t.readSyncChan:            <-t.routerSyncChan        }    }}

咱们还是从 incomingMessageChan 中读取音讯,而后写入 msgChan,msgChan 缓冲区满了就抛弃(后续会加上长久化磁盘性能)。推送音讯到 channel 的协程是在增加 channel 时开启的,因为没有 channel 的话 topic 并不会推送音讯。

在向所有 channel 推送音讯的前后,咱们发现多了两个读管道的操作,这样做的目标是防止 map 的并发读写谬误。在 Go 语言中 map 是不反对并发读写的,因而咱们在遍历 channel 之前先读取 readSyncChan,确保咱们在遍历的时候调度协程是阻塞在 <-t.readSyncChan 这个 case 上,防止了对 map 的并发写操作。

敞开

敞开操作置信大家曾经一目了然了,无非就是监听推出信号的管道而后敞开 channel 和推送音讯的协程,代码如下:

func (t *Topic) MessagePump(closeChan <-chan struct{}) {    ...    for {        select {        ...        case <-closeChan:            return        }        ...    }}func (t *Topic) Router(inMemSize int) {    var (        msg       *Message        closeChan = make(chan struct{})    )    for {        select {        case channelReq := <-t.newChannelChan:            ...            if !t.channelWriteStarted {                go t.MessagePump(closeChan)                t.channelWriteStarted = true            }            ...        case closeReq := <-t.exitChan:            log.Printf("TOPIC(%s): closing", t.name)            for _, channel := range t.channelMap {                err := channel.Close()                if err != nil {                    log.Printf("ERROR: channel(%s) close - %s", channel.name, err.Error())                }            }            close(closeChan)            closeReq.RetChan <- nil        }    }}func (t *Topic) Close() error {    errChan := make(chan interface{})    t.exitChan <- util.ChanReq{        RetChan: errChan,    }    err, _ := (<-errChan).(error)    return err}

topic 残缺代码:topic.go

到这里咱们的两个外围组件 topic 和 channel 就全副设计实现了,下一篇文章咱们持续实现协定和后盾队列的性能。

当前目录构造为: