关于golang:go-databasesql-连接的申请与释放超过最大连接数

10次阅读

共计 2088 个字符,预计需要花费 6 分钟才能阅读完成。

go 自带的 database/sql 领有连接池的能力,能够通过 SetMaxOpenConns 配置最大的连接数,当超过最大连贯时,database/sql 是如何解决的呢?

答案是将申请放入期待队列并阻塞,当有连贯开释时被唤醒,将开释的连贯给它。

申请连贯

申请连贯的流程:

  • 首先查看连接池是否有闲暇的连贯,若有,则返回该连贯;
  • 查看是否超过最大连接数:

    • 若超过最大连贯,则将该申请放入期待队列;
    • 否则,新建一个连贯对象返回;
// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
   ......
   // 先查看连接池是否有闲暇的连贯,若有,则返回该 conn
   ......

   // 如果超过最大连接数,将该连贯申请放入期待队列中并阻塞
   if db.maxOpen > 0 && db.numOpen >= db.maxOpen {req := make(chan connRequest, 1)
      reqKey := db.nextRequestKeyLocked()
      db.connRequests[reqKey] = req
      db.waitCount++
      
      select {case <-ctx.Done():
         ......
      case ret, ok := <-req:
         ......
         return ret.conn, ret.err
      }
   }

   // 新建一个连贯对象,返回
   ......
}

重点关注 超过最大连接数 的行为,将连贯申请放入期待队列 connRequests:map[unit64]chan connRequest 类型,key 是 unit64,value 是 chan connRequest,申请放入 map 的逻辑:

req := make(chan connRequest, 1)
reqKey := db.nextRequestKeyLocked()
db.connRequests[reqKey] = req
db.waitCount++

//reqKey 是递增的整数值
func (db *DB) nextRequestKeyLocked() uint64 {
   next := db.nextRequest
   db.nextRequest++
   return next
}

除了将 request 退出 map,还用 select 将申请阻塞:

// Timeout the connection request with the context.
select {//cxt.Done()
    case <-ctx.Done():
        ......
    //req 是 chann connRequest,也就是 channel 上有数据,就返回该连贯
    case ret, ok := <-req:
        return ret.conn, ret.err
}

这个阻塞的 select,期待 req channel 上有数据。

当有连贯开释的时候,会向 channel 上放数据,这里就不再阻塞,能够走上来了。

开释连贯

入口函数是 releaseConn:

func (dc *driverConn) releaseConn(err error) {dc.db.putConn(dc, err, true)
}

func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
    ......
    dc.inUse = false    
    added := db.putConnDBLocked(dc, nil)    // 开释连贯
    db.mu.Unlock()
    // 如果没有加到了闲暇列表中,则敞开 dc
    if !added {dc.Close()
    }
}

开释连贯的逻辑在 db.putConnDBLocked 中:

  • 首先查看 connection 期待队列中,是否有阻塞的申请;
  • 若有,则将该 connection 给它,向 channel 中发数据,阻塞的连贯申请就能够拿到连贯;
  • 否则,将连贯放至 freeConn 连接池中;
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
    // 期待队列中,有阻塞期待的申请
    if c := len(db.connRequests); c > 0 {        
        var req chan connRequest
        var reqKey uint64
        for reqKey, req = range db.connRequests {break}
        delete(db.connRequests, reqKey)     // 删除该申请
        // 给该申请一个数据,也就是向 channel 上放数据,这样的话,阻塞的申请就能够进行上来了
        req <- connRequest{
            conn: dc,
            err:  err,
        }
        return true
    } else if err == nil && !db.closed {    // 放入闲暇连接池
        if db.maxIdleConnsLocked() > len(db.freeConn) {db.freeConn = append(db.freeConn, dc)
            db.startCleanerLocked()
            return true
        }
        db.maxIdleClosed++
       }
    ......
}

正文完
 0