共计 4513 个字符,预计需要花费 12 分钟才能阅读完成。
简单使用
简单使用 Do 函数获取单条和使用童丹请求多条获取多条数据。
func main() {
// 1. 创建连接池
// 2. 简单设置连接池的最大链接数等参数
// 3. 注入拨号函数
// 4. 调用 pool.Get() 获取连接
pool := &redis.Pool{
MaxIdle: 4,
MaxActive: 4,
Dial: func() (redis.Conn, error) {rc, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {return nil, err}
return rc, nil
},
IdleTimeout: time.Second,
Wait: true,
}
con := pool.Get()
// 获取单条
str, err := redis.String(con.Do("get", "aaa"))
fmt.Println(str, err)
// 通道 发送多条接受多条
con.Send("get", "aaa")
con.Send("get", "bbb")
con.Send("get", "ccc")
con.Flush()
str, err = redis.String(con.Receive())
fmt.Println("value:", str, "err:", err)
str, err = redis.String(con.Receive())
fmt.Println("value:", str, "err:", err)
str, err = redis.String(con.Receive())
fmt.Println("value:", str, "err:", err)
con.Close()}
源码查看
上一篇看了 Get 方法获取连接池中的链接,获取到连接之后调用 Do 函数请求 redis 服务获取回复。现在我们就需要看 Do 函数的源码
1. Conn 接口 在 rediso 中有两个对象都实现了这个接口
type Conn interface {
// Close closes the connection.
Close() error
// Err returns a non-nil value when the connection is not usable.
Err() error
// Do sends a command to the server and returns the received reply.
Do(commandName string, args ...interface{}) (reply interface{}, err error)
// Send writes the command to the client's output buffer.
Send(commandName string, args ...interface{}) error
// Flush flushes the output buffer to the Redis server.
Flush() error
// Receive receives a single reply from the Redis server
Receive() (reply interface{}, err error)
}
// 连接池对外的连接对象
type activeConn struct {
p *Pool
pc *poolConn
state int
}
// 连接对象
type conn struct {
// 锁
mu sync.Mutex
pending int
err error
// http 包中的 conn 对象
conn net.Conn
// 读入过期时间
readTimeout time.Duration
// bufio reader 对象 用于读取 redis 服务返回的结果
br *bufio.Reader
// 写入过期时间
writeTimeout time.Duration
// bufio writer 对象 带 buf 用于往服务端写命令
bw *bufio.Writer
// Scratch space for formatting argument length.
// '*' or '$', length, "\r\n"
lenScratch [32]byte
// Scratch space for formatting integers and floats.
numScratch [40]byte
}
Do 函数
Do 函数最终调用的是 conn 对象的 DoWithTimeout 函数
DoWithTimeout 函数负责将请求的命令发送到 redis 服务 再从 redis 服务读取回复
writeCommand 函数是写入命令函数
readReply 函数是读取函数
// active conn Do 函数 设定请求状态用于关闭时候退出命令
func (ac *activeConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
pc := ac.pc
if pc == nil {return nil, errConnClosed}
// 查看是否需要改变状态
ci := lookupCommandInfo(commandName)
ac.state = (ac.state | ci.Set) &^ ci.Clear
return pc.c.Do(commandName, args...)
}
func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) {return c.DoWithTimeout(c.readTimeout, cmd, args...)
}
// conn 执行命令函数
func (c *conn) DoWithTimeout(readTimeout time.Duration, cmd string, args ...interface{}) (interface{}, error) {c.mu.Lock()
pending := c.pending
c.pending = 0
c.mu.Unlock()
if cmd == "" && pending == 0 {return nil, nil}
// 设置下入超时时间
if c.writeTimeout != 0 {c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
}
// 如果 cmd 不为空则写入 redis 命令
if cmd != "" {
// 写入命令道 buf 中
if err := c.writeCommand(cmd, args); err != nil {return nil, c.fatal(err)
}
}
// 把写入的 buf 的 command 写入 conn 中 正式发送到服务器
if err := c.bw.Flush(); err != nil {return nil, c.fatal(err)
}
var deadline time.Time
if readTimeout != 0 {deadline = time.Now().Add(readTimeout)
}
c.conn.SetReadDeadline(deadline)
if cmd == "" {reply := make([]interface{}, pending)
for i := range reply {r, e := c.readReply()
if e != nil {return nil, c.fatal(e)
}
reply[i] = r
}
return reply, nil
}
var err error
var reply interface{}
for i := 0; i <= pending; i++ {
var e error
if reply, e = c.readReply(); e != nil {return nil, c.fatal(e)
}
if e, ok := reply.(Error); ok && err == nil {err = e}
}
return reply, err
}
// 把 command 写入到 conn 的 write 中
// 1. 先写入 * 号
// 2. 再写入 command
// 3. 最后写入参数
func (c *conn) writeCommand(cmd string, args []interface{}) error {c.writeLen('*', 1+len(args))
if err := c.writeString(cmd); err != nil {return err}
for _, arg := range args {if err := c.writeArg(arg, true); err != nil {return err}
}
return nil
}
// 读取 redis 回复 通过判断回复雷星星 + -:$ 来解析
func (c *conn) readReply() (interface{}, error) {line, err := c.readLine()
if err != nil {return nil, err}
if len(line) == 0 {return nil, protocolError("short response line")
}
switch line[0] {
// 回复状态
case '+':
switch {case len(line) == 3 && line[1] == 'O' && line[2] == 'K':
// Avoid allocation for frequent "+OK" response.
return okReply, nil
case len(line) == 5 && line[1] == 'P' && line[2] == 'O' && line[3] == 'N' && line[4] == 'G':
// Avoid allocation in PING command benchmarks :)
return pongReply, nil
default:
return string(line[1:]), nil
}
// 错误回复
case '-':
return Error(string(line[1:])), nil
// 整数回复
case ':':
return parseInt(line[1:])
// 批量回复
case '$':
n, err := parseLen(line[1:])
if n < 0 || err != nil {return nil, err}
p := make([]byte, n)
_, err = io.ReadFull(c.br, p)
if err != nil {return nil, err}
if line, err := c.readLine(); err != nil {return nil, err} else if len(line) != 0 {return nil, protocolError("bad bulk string format")
}
return p, nil
// 多条批量回复
case '*':
n, err := parseLen(line[1:])
if n < 0 || err != nil {return nil, err}
r := make([]interface{}, n)
for i := range r {r[i], err = c.readReply()
if err != nil {return nil, err}
}
return r, nil
}
return nil, protocolError("unexpected response line")
}
总结
为了方便些注释,Fork 一份加上了一些注释希望对于理解有帮助:连接地址
其实我们可以看到 Redigo 其实就是帮助我们把命令发送到 redis 服务中兵获取 redis 回复。总体的流程还是比较简单的。
正文完