共计 5085 个字符,预计需要花费 13 分钟才能阅读完成。
Redigo 连接池的使用
大家都知道 go 语言中的 goroutine 虽然消耗资源很小,并且是一个用户线程。但是 goroutine 也不是无限开的,所以我们会有很多关于协程池的库,当然啊我们自己也可以完成一些简单的携程池。redis 也是相同的,redis 的链接也是不推荐无限制的打开,否则会造成 redis 负荷加重。
先看一下 Redigo 中的连接池的使用
package main
import (
"fmt"
"github.com/panlei/redigo/redis"
"time"
)
func main() {
pool := &redis.Pool{
MaxIdle: 4,
MaxActive: 4,
Dial: func() (redis.Conn, error) {rc, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {return nil, err}
return rc, nil
},
IdleTimeout: time.Second,
Wait: true,
}
con := pool.Get()
str, err := redis.String(con.Do("get", "aaa"))
con.Close()
fmt.Println("value:", str, "err:", err)
}
我们可以看到 Redigo 使用连接池还是很简单的步骤:
- 创建连接池
- 简单设置连接池的最大链接数等参数
- 注入拨号函数(设置 redis 地址 端口号等)
- 调用 pool.Get() 获取连接
- 使用连接 Do 函数请求 redis
- 关闭连接
源码
Pool conn 对象的定义
type Pool struct {
// 拨号函数 从外部注入
Dial func() (Conn, error)
// DialContext is an application supplied function for creating and configuring a
DialContext func(ctx context.Context) (Conn, error)
// 检测连接的可用性,从外部注入。如果返回 error 则直接关闭连接
TestOnBorrow func(c Conn, t time.Time) error
// 最大闲置连接数量
MaxIdle int
// 最大活动连接数
MaxActive int
// 闲置过期时间 在 get 函数中会有逻辑 删除过期的连接
IdleTimeout time.Duration
// 设置如果活动连接达到上限 再获取时候是等待还是返回错误
// 如果是 false 系统会返回 redigo: connection pool exhausted
// 如果是 true 会利用 p 的 ch 属性让线程等待 知道有连接释放出来
Wait bool
// 连接最长生存时间 如果超过时间会被从链表中删除
MaxConnLifetime time.Duration
// 判断 ch 是否被初始化了
chInitialized uint32 // set to 1 when field ch is initialized
// 锁
mu sync.Mutex // mu protects the following fields
closed bool // set to true when the pool is closed.
active int // the number of open connections in the pool
ch chan struct{} // limits open connections when p.Wait is true
// 存放闲置连接的链表
idle idleList // idle connections
// 等待获取连接的数量
waitCount int64 // total number of connections waited for.
waitDuration time.Duration // total time waited for new connections.
}
// 连接池中的具体连接对象
type conn struct {
// 锁
mu sync.Mutex
pending int
err error
// http 包中的 conn 对象
conn net.Conn
// 读入过期时间
readTimeout time.Duration
// bufio reader 对象 用于读取 redis 服务返回的结果
br *bufio.Reader
// 写入过期时间
writeTimeout time.Duration
// bufio writer 对象 带 buf 用于往服务端写命令
bw *bufio.Writer
// Scratch space for formatting argument length.
// '*' or '$', length, "\r\n"
lenScratch [32]byte
// Scratch space for formatting integers and floats.
numScratch [40]byte
}
我们可以看到,其中有几个关键性的字段比如最大活动连接数、最大闲置连接数、闲置链接过期时间、连接生存时间等。
Pool 的 Get Close 方法
我们知道 连接池最重要的就是两个方法,一个是获取连接,一个是关闭连接。这个跟 sync.Pool。我们来看一下代码:
GET:
func (p *Pool) get(ctx context.Context) (*poolConn, error) {
// 处理是否需要等待 pool Wait 如果是 true 则等待连接释放
var waited time.Duration
if p.Wait && p.MaxActive > 0 {
// 重新初始化 pool 的 ch channel
p.lazyInit()
// wait indicates if we believe it will block so its not 100% accurate
// however for stats it should be good enough.
wait := len(p.ch) == 0
var start time.Time
if wait {start = time.Now()
}
// 获取 pool 的 ch 通道,一旦有连接被 close 则可以继续返回连接
if ctx == nil {<-p.ch} else {
select {
case <-p.ch:
case <-ctx.Done():
return nil, ctx.Err()}
}
if wait {waited = time.Since(start)
}
}
p.mu.Lock()
// 等待数量加 1 增加等待时间
if waited > 0 {
p.waitCount++
p.waitDuration += waited
}
// Prune stale connections at the back of the idle list.
// 删除链表尾部的陈旧连接,删除超时的连接
// 连接 close 之后,连接会回到 pool 的 idle(闲置)链表中
if p.IdleTimeout > 0 {
n := p.idle.count
for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ {
pc := p.idle.back
p.idle.popBack()
p.mu.Unlock()
pc.c.Close()
p.mu.Lock()
p.active--
}
}
// Get idle connection from the front of idle list.
// 获取链表空闲连接 拿链表第一个
for p.idle.front != nil {
pc := p.idle.front
p.idle.popFront()
p.mu.Unlock()
// 调用验证函数如果返回错误不为 nil 关闭连接拿下一个
// 判断连接生存时间 大于生存时间则关闭拿下一个
if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) &&
(p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) {return pc, nil}
pc.c.Close()
p.mu.Lock()
p.active--
}
// Check for pool closed before dialing a new connection.
// 判断连接池是否被关闭 如果关闭则解锁报错
if p.closed {p.mu.Unlock()
return nil, errors.New("redigo: get on closed pool")
}
// Handle limit for p.Wait == false.
// 如果活动连接大于最大连接解锁 返回错误
if !p.Wait && p.MaxActive > 0 && p.active >= p.MaxActive {p.mu.Unlock()
return nil, ErrPoolExhausted
}
// 如果在链表中没有获取到可用的连接 并添加 active 数量添加
p.active++
p.mu.Unlock()
c, err := p.dial(ctx)
// 如果调用失败 则减少 active 数量
if err != nil {
c = nil
p.mu.Lock()
p.active--
if p.ch != nil && !p.closed {p.ch <- struct{}{}}
p.mu.Unlock()}
// 创建连接 设置创建时间
return &poolConn{c: c, created: nowFunc()}, err
}
Put:
// 关闭方法
func (ac *activeConn) Close() error {
pc := ac.pc
if pc == nil {return nil}
ac.pc = nil
// 判断连接的状态 发送取消事务 取消 watch
if ac.state&connectionMultiState != 0 {pc.c.Send("DISCARD")
ac.state &^= (connectionMultiState | connectionWatchState)
} else if ac.state&connectionWatchState != 0 {pc.c.Send("UNWATCH")
ac.state &^= connectionWatchState
}
if ac.state&connectionSubscribeState != 0 {pc.c.Send("UNSUBSCRIBE")
pc.c.Send("PUNSUBSCRIBE")
// To detect the end of the message stream, ask the server to echo
// a sentinel value and read until we see that value.
sentinelOnce.Do(initSentinel)
pc.c.Send("ECHO", sentinel)
pc.c.Flush()
for {p, err := pc.c.Receive()
if err != nil {break}
if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) {
ac.state &^= connectionSubscribeState
break
}
}
}
pc.c.Do("")
// 把连接放入链表
ac.p.put(pc, ac.state != 0 || pc.c.Err() != nil)
return nil
}
// 将连接 重新放入限制链表
func (p *Pool) put(pc *poolConn, forceClose bool) error {p.mu.Lock()
if !p.closed && !forceClose {pc.t = nowFunc()
p.idle.pushFront(pc)
if p.idle.count > p.MaxIdle {
pc = p.idle.back
p.idle.popBack()} else {pc = nil}
}
if pc != nil {p.mu.Unlock()
pc.c.Close()
p.mu.Lock()
p.active--
}
// 如果连接的 ch 不为空 并且连接池没有关闭 则给 channel 中输入一个 struct{}{}
// 如果在连接打到最大活动数量之后 再获取连接并且 pool 的 Wait 为 ture 会阻塞线程等待返回连接
if p.ch != nil && !p.closed {p.ch <- struct{}{}}
p.mu.Unlock()
return nil
}
总结
整个 Pool 整体流程,我大概画了一个图。
从初始化 =》获取 -》创建连接 =》返回连接 =》关闭连接 =》
其中还有一条线是 Pool.Wait = true 会一直阻塞 一直到有连接 Close 释放活动连接数 线程被唤醒返回闲置的连接
其实大部分的连接池都是类似的流程,比如 goroutine,redis。
正文完