proxy 启动
cmd/proxy/main.go 文件
解析配置文件之后重点是 proxy.New(config) 函数
该函数中,首先会创建一个 Proxy 结构体,如下:
type Proxy struct {
mu sync.Mutex
…
config *Config
router *Router //Router 中比较重要的是连接池和 slots
…
lproxy net.Listener //19000 端口的 Listener
ladmin net.Listener //11080 端口的 Listener
…
}
然后起两个协程, 分别处理 11080 和 19000 端口的请求
go s.serveAdmin()
go s.serveProxy()
我们重点看 s.serveProxy() 的处理流程,即 redis client 连接 19000 端口后 proxy 如何分发到 codis server 并且将结果返回到客户端
Proxy 处理
s.serverProxy 也启动了两个协程,一个协程对 router 中连接池中的连接进行连接可用性检测,另一个协程是一个死循环,accept lproxy 端口的连接,并且启动一个新的 Session 进行处理,代码流程如下:
go func(l net.Listener) (err error) {
defer func() {
eh <- err
}()
for {
c, err := s.acceptConn(l)//accept 连接
if err != nil {
return err
}
NewSession(c, s.config).Start(s.router)// 启动一个新的 session 进行处理
}
}(s.lproxy)// s 为 proxy,s.lproxy 即 19000 端口的监听
首先介绍一下 Request 结构体,该结构体会贯穿整个流程
type Request struct {
Multi []*redis.Resp // 保存请求命令, 按 redis 的 resp 协议类型将请求保存到 Multi 字段中
Batch *sync.WaitGroup // 返回响应时, 会在 Batch 处等待,r.Batch.Wait(), 所以可以做到当请求执行完成后才会执行返回函数
Group *sync.WaitGroup
Broken *atomic2.Bool
OpStr string
OpFlag
Database int32
UnixNano int64
*redis.Resp // 保存响应数据, 也是 redis 的 resp 协议类型
Err error
Coalesce func() error // 聚合函数, 适用于 mget/mset 等需要聚合响应的操作命令
}
Start 函数处理流程如下:
tasks := NewRequestChanBuffer(1024)//tasks 是一个指向 RequestChan 的指针,RequestChan 结构体中有一个 data 字段,data 字段是个数组,保存 1024 个指向 Request 的指针
go func() {
s.loopWriter(tasks)// 从 RequestChan 的 data 中取出请求并且返回给客户端,如果是 mget/mset 这种需要聚合相应的请求, 则会等待所有拆分的子请求执行完毕后执行聚合函数,然后将结果返回给客户端
decrSessions()
}()
go func() {
s.loopReader(tasks, d)// 首先根据 key 计算该 key 分配到哪个 slot. 在此步骤中只会将 slot 对应的连接取出,然后将请求放到连接的 input 字段中。
tasks.Close()
}()
可以看到,s.loopWriter 只是从 RequestChan 的 data 字段中取出请求并且返回给客户端,通过上文 Request 结构体的介绍,可以看到,通过在 request 的 Batch 执行 wait 操作,只有请求处理完成后 loopWriter 才会执行
下边我们看 loopReader 的执行流程
r := &Request{} // 新建一个 Request 结构体,该结构体会贯穿请求的始终,请求字段,响应字段都放在 Request 中
r.Multi = multi
r.Batch = &sync.WaitGroup{}
r.Database = s.database
r.UnixNano = start.UnixNano()
if err := s.handleRequest(r, d); err != nil {// 执行 handleRequest 函数,处理请求
r.Resp = redis.NewErrorf(“ERR handle request, %s”, err)
tasks.PushBack(r)
if breakOnFailure {
return err
}
} else {
tasks.PushBack(r) // 如果 handleRequest 执行成功,将请求 r 放入 tasks(即上文的 RequestChan) 的 data 字段中。loopWriter 会从该字段中获取请求并且返回给客户端
}
看 handleRequest 函数如何处理请求, 重点是 router 的 dispatch 函数
func (s *Router) dispatch(r *Request) error {
hkey := getHashKey(r.Multi, r.OpStr)//hkey 为请求的 key
var id = Hash(hkey) % MaxSlotNum //hash 请求的 key 之后对 1024 取模, 获取该 key 分配到哪个 slot
slot := &s.slots[id] //slot 都保存在 router 的 slots 数组中, 获取对应的 slot
return slot.forward(r, hkey)// 执行 slot 的 forward 函数
}
forward 函数调用 process 函数,返回一个 BackendConn 结构, 然后调用其 PushBack 函数将请求放入 bc.input 中
func (d *forwardSync) Forward(s *Slot, r *Request, hkey []byte) error {
s.lock.RLock()
bc, err := d.process(s, r, hkey) // 返回一个连接, 并且将请求放入 BackendConn 的 input 中
s.lock.RUnlock()
if err != nil {
return err
}
bc.PushBack(r)
return nil
}
bc.PushBack(r) 函数如下:
func (bc *BackendConn) PushBack(r *Request) {
if r.Batch != nil {
r.Batch.Add(1) // 将请求的 Batch 执行 add 1 的操作,注意前文中的 loopWriter 会在 Batch 处等待
}
bc.input <- r // 将请求放入 bc.input channel
}
至此可以看到,Proxy 的处理流程
loopWriter->RuquestChan 的 data 字段中读取请求并且返回。在 Batch 处等待
loopReader-> 将请求放入 RequestChan 的 data 字段中,并且将请求放入 bc.input channel 中。在 Batch 处加 1
很明显,Proxy 并没有真正处理请求, 肯定会有 goroutine 从 bc.input 中读取请求并且处理完成后在 Batch 处减 1,这样当请求执行完成后,loopWriter 就可以返回给客户端端响应了。
BackendConn 的处理流程
从上文得知,proxy 结构体中有一个 router 字段,类型为 Router, 结构体类型如下:
type Router struct {
mu sync.RWMutex
pool struct {
primary *sharedBackendConnPool // 连接池
replica *sharedBackendConnPool
}
slots [MaxSlotNum]Slot //slot
…
}
Router 的 pool 中管理连接池, 执行 fillSlot 时会真正生成连接,放入 Slot 结构体的 backend 字段的 bc 字段中,Slot 结构体如下:
type Slot struct {
id int
…
backend, migrate struct {
id int
bc *sharedBackendConn
}
…
method forwardMethod
}
我们看一下 bc 字段的结构体 sharedBackendConn:
type sharedBackendConn struct {
addr string //codis server 的地址
host []byte //codis server 主机名
port []byte //codis server 的端口
owner *sharedBackendConnPool // 属于哪个连接池
conns [][]*BackendConn // 二维数组, 一般 codis server 会有 16 个 db, 第一个维度为 0 -15 的数组, 每个 db 可以有多个 BackendConn 连接
single []*BackendConn // 如果每个 db 只有一个 BackendConn 连接,则直接放入 single 中。当每个 db 有多个连接时会从 conns 中选一个返回,而每个 db 只有一个连接时,直接从 single 中返回
refcnt int
}
每个 BackendConn 中有一个 input chan *Request 字段, 是一个 channel,channel 中的内容为 Request 指针。也就是第二章节 loopReader 选取一个 BackendConn 后,会将请求放入 input 中。
下边我们看看处理 BackendConn input 字段中数据的协程是如何启动并处理数据的。代码路径为 pkg/proxy/backend.go 的 newBackendConn 函数
func NewBackendConn(addr string, database int, config *Config) *BackendConn {
bc := &BackendConn{
addr: addr, config: config, database: database,
}
//1024 长度的管道, 存放 1024 个 *Request
bc.input = make(chan *Request, 1024)
bc.retry.delay = &DelayExp2{
Min: 50, Max: 5000,
Unit: time.Millisecond,
}
go bc.run()
return bc
}
可以看到,在此处创建的 BackendConn 结构,并且初始化 bc.input 字段。连接池的建立是在 proxy 初始化启动的时候就会建立好。继续看 bc.run() 函数的处理流程
func (bc *BackendConn) run() {
log.Warnf(“backend conn [%p] to %s, db-%d start service”,
bc, bc.addr, bc.database)
for round := 0; bc.closed.IsFalse(); round++ {
log.Warnf(“backend conn [%p] to %s, db-%d round-[%d]”,
bc, bc.addr, bc.database, round)
if err := bc.loopWriter(round); err != nil {// 执行 loopWriter 函数,此处的 loopWriter 和第二章节的 loopWriter 只是名称相同,是两个不同的处理函数
bc.delayBeforeRetry()
}
}
log.Warnf(“backend conn [%p] to %s, db-%d stop and exit”,
bc, bc.addr, bc.database)
}
func (bc *BackendConn) loopWriter(round int) (err error) {
…
c, tasks, err := bc.newBackendReader(round, bc.config) // 调用 newBackendReader 函数。注意此处的 tasks 也是一个存放 *Request 的 channel, 用来此处的 loopWriter 和 loopReader 交流信息
if err != nil {
return err
}
…
for r := range bc.input {// 可以看到, 此处的 loopWriter 会从 bc.input 中取出数据并且处理
…
if err := p.EncodeMultiBulk(r.Multi); err != nil {// 将请求编码并且发送到 codis server
return bc.setResponse(r, nil, fmt.Errorf(“backend conn failure, %s”, err))
}
if err := p.Flush(len(bc.input) == 0); err != nil {
return bc.setResponse(r, nil, fmt.Errorf(“backend conn failure, %s”, err))
} else {
tasks <- r // 将请求放入 tasks 这个 channel 中
}
}
return nil
}
注意此处的 loopWriter 会从 bc.input 中取出数据发送到 codis server,bc.newBackendReader 会起一个 loopReader, 从 codis server 中读取数据并且写到 request 结构体中,此处的 loopReader 和 loopWriter 通过 tasks 这个 channel 通信。
func (bc *BackendConn) newBackendReader(round int, config *Config) (*redis.Conn, chan<- *Request, error) {
…
tasks := make(chan *Request, config.BackendMaxPipeline)// 创建 task 这个 channel 并且返回给 loopWriter
go bc.loopReader(tasks, c, round)// 启动 loopReader
return c, tasks, nil
}
func (bc *BackendConn) loopReader(tasks <-chan *Request, c *redis.Conn, round int) (err error) {
…
for r := range tasks {// 从 tasks 中取出响应
resp, err := c.Decode()
if err != nil {
return bc.setResponse(r, nil, fmt.Errorf(“backend conn failure, %s”, err))
}
…
bc.setResponse(r, resp, nil)// 设置响应数据到 request 结构体中
}
return nil
}
func (bc *BackendConn) setResponse(r *Request, resp *redis.Resp, err error) error {
r.Resp, r.Err = resp, err //Request 的 Resp 字段设置为响应值
if r.Group != nil {
r.Group.Done()
}
if r.Batch != nil {
r.Batch.Done() // 注意此处会对 Batch 执行减 1 操作,这样 proxy 中的 loopWriter 可以聚合响应并返回
}
return err
}
总结一下,BackendConn 中的函数功能如下
loopWriter-> 从 bc.input 中取出请求并且发给 codis server, 并且将请求放到 tasks channel 中
loopReader-> 从 tasks 中取出请求,设置 codis server 的响应字段到 Request 的 Resp 字段中,并且将 Batch 执行减 1 操作
小结一图胜千言,图片版权归李老师,如下