乐趣区

Gorm-源码分析一-databasesql

简介

Gorm 是 Go 语言开发用的比较多的一个 ORM。它的功能比较全:

  • 增删改查
  • 关联(包含一个,包含多个,属于,多对多,多种包含)
  • CallBacks(创建、保存、更新、删除、查询找)之前 之后都可以有 callback 函数
  • 预加载
  • 事务
  • 复合主键
  • 日志

database/sql 包

但是这篇文章中并不会直接看 Gorm 的源码,我们会先从 database/sql 分析。原因是 Gorm 也是基于这个包来封装的一些功能。所以只有先了解了 database/sql 包才能更加好的理解 Gorm 源码。
database/sql 其实也是一个对于 mysql 驱动的上层封装。”github.com/go-sql-driver/mysql” 就是一个对于 mysql 的驱动,database/sql 就是在这个基础上做的基本封装包含连接池的使用

使用例子

下面这个是最基本的增删改查操作
操作分下面几个步骤:

  1. 引入 github.com/go-sql-driver/mysql 包(包中的 init 方法会初始化 mysql 驱动的注册)
  2. 使用 sql.Open 初始化一个 sql.DB 结构
  3. 调用 Prepare Exec 执行 sql 语句

== 注意:==使用 Exec 函数无需释放调用完毕之后会自动释放,把连接放入连接池中

  使用 Query 返回的 sql.rows 需要手动释放连接 rows.Close()
package main

import (
    "database/sql"
    "fmt"
    _ "github.com/go-sql-driver/mysql"
    "strconv"
)

func main() {
    // 打开连接
    db, err := sql.Open("mysql", "root:feg@125800@tcp(47.100.245.167:3306)/artifact?charset=utf8&loc=Asia%2FShanghai&parseTime=True")
    if err != nil {fmt.Println("err:", err)
    }
    // 设置最大空闲连接数
    db.SetMaxIdleConns(1)
    // 设置最大链接数
    db.SetMaxOpenConns(1)
    query(db, 3)
}

// 修改
func update(db *sql.DB, id int, user string) {stmt, err := db.Prepare("update user set UserName=? where Id =?")
    if err != nil {fmt.Println(err)
    }
    res, err := stmt.Exec(user, id)
    updateId, err := res.LastInsertId()
    fmt.Println(updateId)
}

// 删除
func delete(db *sql.DB, id int) {stmt, err := db.Prepare("delete  from user where id = ?")
    if err != nil {fmt.Println(err)
    }
    res, err := stmt.Exec(1)
    updateId, err := res.LastInsertId()
    fmt.Println(updateId)
}

// 查询
func query(db *sql.DB, id int) {rows, err := db.Query("select * from user where  id =" + strconv.Itoa(id))
    if err != nil {fmt.Println(err)
        return
    }

    for rows.Next() {
        var id int
        var user string
        var pwd string
        rows.Scan(&id, &user, &pwd)
        fmt.Println("id:", id, "user:", user, "pwd:", pwd)
    }
    rows.Close()}

// 插入
func insert(db *sql.DB, user, pwd string) {stmt, err := db.Prepare("insert into user set UserName=?,Password=?")
    if err != nil {fmt.Println(err)
    }
    res, err := stmt.Exec("peter", "panlei")
    id, err := res.LastInsertId()
    fmt.Println(id)
}

连接池

因为 Gorm 的连接池就是使用 database/sql 包中的连接池,所以这里我们需要学习一下包里的连接池的源码实现。其实所有连接池最重要的就是连接池对象、获取函数、释放函数下面来看一下 database/sql 中的连接池。

DB 对象
type DB struct {
    // 数据库实现驱动
    driver driver.Driver
    dsn    string
    numClosed uint64
    // 锁
    mu           sync.Mutex // protects following fields
    // 空闲连接
    freeConn     []*driverConn
    // 阻塞请求队列,等连接数达到最大限制时,后续请求将插入此队列等待可用连接
    connRequests map[uint64]chan connRequest
    // 记录下一个 key 用于 connRequests map 的 key
    nextRequest  uint64 // Next key to use in connRequests.
    numOpen      int    // number of opened and pending open connections
    
    openerCh    chan struct{}
    closed      bool
    dep         map[finalCloser]depSet
    lastPut     map[*driverConn]string 
    // 最大空闲连接数
    maxIdle     int                    
    // 最大打开连接数
    maxOpen     int  
    // 连接最大存活时间
    maxLifetime time.Duration          
    cleanerCh   chan struct{}}
获取方法
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {db.mu.Lock()
    if db.closed {db.mu.Unlock()
        return nil, errDBClosed
    }
    // Check if the context is expired.
    select {
    default:
    case <-ctx.Done():
        db.mu.Unlock()
        return nil, ctx.Err()}
    lifetime := db.maxLifetime

    // 查看是否有空闲的连接 如果有则直接使用空闲连接
    numFree := len(db.freeConn)
    if strategy == cachedOrNewConn && numFree > 0 {
        // 取出数据第一个
        conn := db.freeConn[0]
        // 复制数组,去除第一个连接
        copy(db.freeConn, db.freeConn[1:])
        db.freeConn = db.freeConn[:numFree-1]
        conn.inUse = true
        db.mu.Unlock()
        if conn.expired(lifetime) {conn.Close()
            return nil, driver.ErrBadConn
        }
        return conn, nil
    }

    // 判断是否超出最大连接数 
    if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
        // 创建一个 chan 
        req := make(chan connRequest, 1)
        // 获取下一个 request 作为 map 中的 key
        reqKey := db.nextRequestKeyLocked()
        db.connRequests[reqKey] = req
        db.mu.Unlock()

        // Timeout the connection request with the context.
        select {case <-ctx.Done():
            // Remove the connection request and ensure no value has been sent
            // on it after removing.
            db.mu.Lock()
            delete(db.connRequests, reqKey)
            db.mu.Unlock()
            select {
            default:
            case ret, ok := <-req:
                if ok {db.putConn(ret.conn, ret.err)
                }
            }
            return nil, ctx.Err()
        // 如果没有取消则从 req chan 中获取数据 阻塞主一直等待有 conn 数据传入
        case ret, ok := <-req:
            if !ok {return nil, errDBClosed}
            // 判断超时 
            if ret.err == nil && ret.conn.expired(lifetime) {ret.conn.Close()
                return nil, driver.ErrBadConn
            }
            return ret.conn, ret.err
        }
    }
    
    db.numOpen++ // optimistically
    db.mu.Unlock()
    // 调用 driver 的 Open 方法建立连接
    ci, err := db.driver.Open(db.dsn)
    if err != nil {db.mu.Lock()
        db.numOpen-- // correct for earlier optimism
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        return nil, err
    }
    db.mu.Lock()
    dc := &driverConn{
        db:        db,
        createdAt: nowFunc(),
        ci:        ci,
        inUse:     true,
    }
    db.addDepLocked(dc, dc)
    db.mu.Unlock()
    return dc, nil
}
释放连接方法
// 释放连接
func (db *DB) putConn(dc *driverConn, err error) {db.mu.Lock()
    if !dc.inUse {
        if debugGetPut {fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
        }
        panic("sql: connection returned that was never out")
    }
    if debugGetPut {db.lastPut[dc] = stack()}
    // 设置已经在使用中
    dc.inUse = false

    for _, fn := range dc.onPut {fn()
    }
    dc.onPut = nil
    // 判断连接是否有错误 
    if err == driver.ErrBadConn {db.maybeOpenNewConnections()
        db.mu.Unlock()
        dc.Close()
        return
    }
    if putConnHook != nil {putConnHook(db, dc)
    }
    // 调用方法 释放连接
    added := db.putConnDBLocked(dc, nil)
    db.mu.Unlock()
    // 判断如果没有加到了空闲列表中 dc 关闭
    if !added {dc.Close()
    }
}

func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
    if db.closed {return false}
    if db.maxOpen > 0 && db.numOpen > db.maxOpen {return false}
    // 如果等待 chan 列表大于 0 
    if c := len(db.connRequests); c > 0 {
        var req chan connRequest
        var reqKey uint64
        // 获取 map 中 chan 和 key
        for reqKey, req = range db.connRequests {break}
        // 从列表中删除 chan 
        delete(db.connRequests, reqKey) // Remove from pending requests.
        if err == nil {dc.inUse = true}
        // 把连接传入 chan 中 让之前获取连接被阻塞的获取函数继续
        req <- connRequest{
            conn: dc,
            err:  err,
        }
        return true
    } else if err == nil && !db.closed && db.maxIdleConnsLocked() > len(db.freeConn) {
        // 如果没有等待列表,则把连接放到空闲列表中
        db.freeConn = append(db.freeConn, dc)
        db.startCleanerLocked()
        return true
    }
    return false
}

连接池的实现有很多方法,在 database/sql 包中使用的是 chan 阻塞 使用 map 记录等待列表,等到有连接释放的时候再把连接传入等待列表中的 chan 不在阻塞返回连接。
之前我们看到的 Redigo 是使用一个 chan 来阻塞,然后释放的时候放入空闲列表,在往这一个 chan 中传入 struct{}{},让程序继续 获取的时候再从空闲列表中获取。并且使用的是链表的结构来存储空闲列表。

总结

database/sql 是对于 mysql 驱动的封装,然而 Gorm 则是对于 database/sql 的再次封装。让我们可以更加简单的实现对于 mysql 数据库的操作。

退出移动版