乐趣区

Go-Redigo-源码分析三-执行命令

简单使用

简单使用 Do 函数获取单条和使用童丹请求多条获取多条数据。

func main() {
    // 1. 创建连接池
    // 2. 简单设置连接池的最大链接数等参数
    // 3. 注入拨号函数
    // 4. 调用 pool.Get() 获取连接
    pool := &redis.Pool{
        MaxIdle:   4,
        MaxActive: 4,
        Dial: func() (redis.Conn, error) {rc, err := redis.Dial("tcp", "127.0.0.1:6379")
            if err != nil {return nil, err}
            return rc, nil
        },
        IdleTimeout: time.Second,
        Wait:        true,
    }
    con := pool.Get()
    // 获取单条
    str, err := redis.String(con.Do("get", "aaa"))
    fmt.Println(str, err)
    // 通道 发送多条接受多条
    con.Send("get", "aaa")
    con.Send("get", "bbb")
    con.Send("get", "ccc")
    con.Flush()
    str, err = redis.String(con.Receive())
    fmt.Println("value:", str, "err:", err)
    str, err = redis.String(con.Receive())
    fmt.Println("value:", str, "err:", err)
    str, err = redis.String(con.Receive())
    fmt.Println("value:", str, "err:", err)
    con.Close()}

源码查看

上一篇看了 Get 方法获取连接池中的链接,获取到连接之后调用 Do 函数请求 redis 服务获取回复。现在我们就需要看 Do 函数的源码
1. Conn 接口 在 rediso 中有两个对象都实现了这个接口

type Conn interface {
    // Close closes the connection.
    Close() error

    // Err returns a non-nil value when the connection is not usable.
    Err() error

    // Do sends a command to the server and returns the received reply.
    Do(commandName string, args ...interface{}) (reply interface{}, err error)

    // Send writes the command to the client's output buffer.
    Send(commandName string, args ...interface{}) error

    // Flush flushes the output buffer to the Redis server.
    Flush() error

    // Receive receives a single reply from the Redis server
    Receive() (reply interface{}, err error)
}

// 连接池对外的连接对象
type activeConn struct {
    p     *Pool
    pc    *poolConn
    state int
}

// 连接对象 
type conn struct {
    //  锁
    mu      sync.Mutex
    pending int
    err     error
    // http 包中的 conn 对象
    conn    net.Conn

    // 读入过期时间
    readTimeout time.Duration
    // bufio reader 对象 用于读取 redis 服务返回的结果
    br          *bufio.Reader

    // 写入过期时间
    writeTimeout time.Duration
    // bufio writer 对象 带 buf 用于往服务端写命令
    bw           *bufio.Writer

    // Scratch space for formatting argument length.
    // '*' or '$', length, "\r\n"
    lenScratch [32]byte

    // Scratch space for formatting integers and floats.
    numScratch [40]byte
}

Do 函数
Do 函数最终调用的是 conn 对象的 DoWithTimeout 函数
DoWithTimeout 函数负责将请求的命令发送到 redis 服务 再从 redis 服务读取回复
writeCommand 函数是写入命令函数
readReply 函数是读取函数

// active conn Do 函数 设定请求状态用于关闭时候退出命令
func (ac *activeConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
    pc := ac.pc
    if pc == nil {return nil, errConnClosed}
    // 查看是否需要改变状态
    ci := lookupCommandInfo(commandName)
    ac.state = (ac.state | ci.Set) &^ ci.Clear
    return pc.c.Do(commandName, args...)
}

func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) {return c.DoWithTimeout(c.readTimeout, cmd, args...)
}

// conn 执行命令函数
func (c *conn) DoWithTimeout(readTimeout time.Duration, cmd string, args ...interface{}) (interface{}, error) {c.mu.Lock()
    pending := c.pending
    c.pending = 0
    c.mu.Unlock()

    if cmd == "" && pending == 0 {return nil, nil}
    // 设置下入超时时间
    if c.writeTimeout != 0 {c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
    }

    // 如果 cmd 不为空则写入 redis 命令
    if cmd != "" {
        // 写入命令道 buf 中
        if err := c.writeCommand(cmd, args); err != nil {return nil, c.fatal(err)
        }
    }
    // 把写入的 buf 的 command 写入 conn 中 正式发送到服务器
    if err := c.bw.Flush(); err != nil {return nil, c.fatal(err)
    }

    var deadline time.Time
    if readTimeout != 0 {deadline = time.Now().Add(readTimeout)
    }
    c.conn.SetReadDeadline(deadline)

    if cmd == "" {reply := make([]interface{}, pending)
        for i := range reply {r, e := c.readReply()
            if e != nil {return nil, c.fatal(e)
            }
            reply[i] = r
        }
        return reply, nil
    }

    var err error
    var reply interface{}
    for i := 0; i <= pending; i++ {
        var e error
        if reply, e = c.readReply(); e != nil {return nil, c.fatal(e)
        }
        if e, ok := reply.(Error); ok && err == nil {err = e}
    }
    return reply, err
}

// 把 command 写入到 conn 的 write 中
// 1. 先写入 * 号
// 2. 再写入 command
// 3. 最后写入参数
func (c *conn) writeCommand(cmd string, args []interface{}) error {c.writeLen('*', 1+len(args))
    if err := c.writeString(cmd); err != nil {return err}
    for _, arg := range args {if err := c.writeArg(arg, true); err != nil {return err}
    }
    return nil
}

// 读取 redis 回复 通过判断回复雷星星 + -:$ 来解析
func (c *conn) readReply() (interface{}, error) {line, err := c.readLine()
    if err != nil {return nil, err}
    if len(line) == 0 {return nil, protocolError("short response line")
    }
    switch line[0] {
    // 回复状态
    case '+':
        switch {case len(line) == 3 && line[1] == 'O' && line[2] == 'K':
            // Avoid allocation for frequent "+OK" response.
            return okReply, nil
        case len(line) == 5 && line[1] == 'P' && line[2] == 'O' && line[3] == 'N' && line[4] == 'G':
            // Avoid allocation in PING command benchmarks :)
            return pongReply, nil
        default:
            return string(line[1:]), nil
        }
    // 错误回复
    case '-':
        return Error(string(line[1:])), nil
    // 整数回复
    case ':':
        return parseInt(line[1:])
    // 批量回复
    case '$':
        n, err := parseLen(line[1:])
        if n < 0 || err != nil {return nil, err}
        p := make([]byte, n)
        _, err = io.ReadFull(c.br, p)
        if err != nil {return nil, err}
        if line, err := c.readLine(); err != nil {return nil, err} else if len(line) != 0 {return nil, protocolError("bad bulk string format")
        }
        return p, nil
        // 多条批量回复
    case '*':
        n, err := parseLen(line[1:])
        if n < 0 || err != nil {return nil, err}
        r := make([]interface{}, n)
        for i := range r {r[i], err = c.readReply()
            if err != nil {return nil, err}
        }
        return r, nil
    }
    return nil, protocolError("unexpected response line")
}

总结

为了方便些注释,Fork 一份加上了一些注释希望对于理解有帮助:连接地址
其实我们可以看到 Redigo 其实就是帮助我们把命令发送到 redis 服务中兵获取 redis 回复。总体的流程还是比较简单的。

退出移动版