上一篇文章咱们定义了音讯体和根底工具,这一篇咱们开始着手客户端的处理函数和 channel 的根底设计。

客户端处理函数

这里所谓的客户端指的是消费者,处理函数也就是解决消费者同咱们服务之间的 tcp 连贯。咱们定义一个构造体 Client,外面蕴含有连贯和状态字段,而后就是编写读写状态和 tcp 连贯的相干函数。

client.go

package serverimport (    "encoding/binary"    "io"    "log")type Client struct {    conn  io.ReadWriteCloser    name  string    state int}func NewClient(conn io.ReadWriteCloser, name string) *Client {    return &Client{conn, name, -1}}func (c *Client) String() string {    return c.name}func (c *Client) GetState() int {    return c.state}func (c *Client) SetState(state int) {    c.state = state}func (c *Client) Read(data []byte) (int, error) {    return c.conn.Read(data)}func (c *Client) Write(data []byte) (int, error) {    var err error    err = binary.Write(c.conn, binary.BigEndian, int32(len(data)))    if err != nil {        return 0, err    }    n, err := c.conn.Write(data)    if err != nil {        return 0, err    }    return n + 4, nil}func (c *Client) Close() {    log.Printf("CLIENT(%s): closing", c.String())    c.conn.Close()}

这里的逻辑比较简单,惟一值得一提的是 Write 办法。在给消费者写音讯之前,咱们先往连贯中写入音讯体的长度,固定为 4 个字节,这样客户端读取的时候就能够先读取长度,而后按长度读取音讯。

channel

从上篇文章中咱们能够晓得,channel 是咱们这个音讯队列中的外围数据结构之一,因而它的设计尤为重要。

保护消费者信息

首先,因为咱们的消费者是从 channel 中读取音讯的,所以 channel 中须要保护消费者的信息,并且能够增删消费者。因而咱们先在 channel 构造中保护一个 consumer 数组和两个管道用来接管增删 consumer 的音讯:

type Consumer interface {    Close()}type Channel struct {    name                string    addClientChan       chan util.ChanReq    removeClientChan    chan util.ChanReq    clients             []Consumer}func (c *Channel) AddClient(client Consumer) {    log.Printf("Channel(%s): adding client...", c.name)    doneChan := make(chan interface{})    c.addClientChan <- util.ChanReq{        Variable: client,        RetChan:  doneChan,    }    <-doneChan}func (c *Channel) RemoveClient(client Consumer) {    log.Printf("Channel(%s): removing client...", c.name)    doneChan := make(chan interface{})    c.removeClientChan <- util.ChanReq{        Variable: client,        RetChan:  doneChan,    }    <-doneChan}

值得注意的是,这里咱们没有间接绑定下面的 Client 构造体,而是形象出了一个 Consumer 接口。这样做的益处是倒转依赖关系,而且能够防止包循环援用。

既然有了接管音讯的管道,那么咱们须要一个常驻后盾的 goroutine 来解决这些音讯,能够称之为事件处理循环,也就是一个 for + select 组合:

// Router handles the events of Channelfunc (c *Channel) Router() {    var clientReq util.ChanReq    for {        select {        case clientReq = <-c.addClientChan:            client := clientReq.Variable.(Consumer)            c.clients = append(c.clients, client)            log.Printf("CHANNEL(%s) added client %#v", c.name, client)            clientReq.RetChan <- struct{}{}        case clientReq = <-c.removeClientChan:            client := clientReq.Variable.(Consumer)            indexToRemove := -1            for k, v := range c.clients {                if v == client {                    indexToRemove = k                    break                }            }            if indexToRemove == -1 {                log.Printf("ERROR: could not find client(%#v) in clients(%#v)", client, c.clients)            } else {                c.clients = append(c.clients[:indexToRemove], c.clients[indexToRemove+1:]...)                log.Printf("CHANNEL(%s) removed client %#v", c.name, client)            }            clientReq.RetChan <- struct{}{}        }    }}

收发音讯

对于收发音讯,这里咱们应用三个管道来实现:

  • msgChan:这是一个有缓冲管道,用来暂存音讯,超过长度则抛弃音讯(后续会加上长久化到磁盘的性能)
  • incomingMessageChan:用来接管生产者的音讯
  • clientMessageChan:音讯会被发送到这个管道,后续会由消费者拉取

代码如下:

type Channel struct {    ...    incomingMessageChan chan *Message    msgChan             chan *Message    clientMessageChan   chan *Message}func (c *Channel) PutMessage(msg *Message) {    c.incomingMessageChan <- msg}func (c *Channel) PullMessage() *Message {    return <-c.clientMessageChan}func (c *Channel) Router() {    var clientReq util.ChanReq    go c.MessagePump()    for {        select {        ...        case msg := <-c.incomingMessageChan:            // 避免因 msgChan 缓冲填满时造成阻塞,加上一个 default 分支间接抛弃音讯            select {            case c.msgChan <- msg:                log.Printf("CHANNEL(%s) wrote message", c.name)            default:            }        }    }}// MessagePump send messages to ClientMessageChanfunc (c *Channel) MessagePump() {    var msg *Message    for {        select {        case msg = <-c.msgChan:        }        c.clientMessageChan <- msg    }}

敞开

当 channel 敞开的时候,咱们须要做一些清理的工作,首先咱们减少一个接管敞开信号的管道,在接管到信号时敞开发送音讯的 MessagePump 协程和消费者连贯,代码如下:

type Channel struct {    ...    exitChan            chan util.ChanReq}func (c *Channel) Router() {    var (        ...        closeChan = make(chan struct{})    )    go c.MessagePump(closeChan)    for {        select {        ...        case closeReq := <-c.exitChan:            log.Printf("CHANNEL(%s) is closing", c.name)            close(closeChan)            for _, consumer := range c.clients {                consumer.Close()            }            closeReq.RetChan <- nil        }    }}// MessagePump send messages to ClientMessageChanfunc (c *Channel) MessagePump(closeChan chan struct{}) {    var msg *Message    for {        select {        ...        case <-closeChan:            return        }        ...    }}func (c *Channel) Close() error {    errChan := make(chan interface{})    c.exitChan <- util.ChanReq{        RetChan: errChan,    }    err, _ := (<-errChan).(error)    return err}

咱们在事件处理循环中初始化一个管道,并作为参数传递给 MessagePump 协程,当接管到敞开信号时敞开此管道,而后顺次敞开消费者连贯,敞开逻辑就完结了。

channel 的残缺代码如下:

channel.go

此时咱们的目录构造为: