本文主要研究一下gorm的读写分离

DBResolver

gorm.io/plugin/dbresolver@v1.1.0/dbresolver.go

type DBResolver struct {    *gorm.DB    configs          []Config    resolvers        map[string]*resolver    global           *resolver    prepareStmtStore map[gorm.ConnPool]*gorm.PreparedStmtDB    compileCallbacks []func(gorm.ConnPool) error}func (dr *DBResolver) Name() string {    return "gorm:db_resolver"}func (dr *DBResolver) Initialize(db *gorm.DB) error {    dr.DB = db    dr.registerCallbacks(db)    return dr.compile()}
DBResolver定义了resolvers;DBResolver实现了Plugin接口的Name、Initialize;Initialize方法执行了dr.registerCallbacks(db)、dr.compile()

registerCallbacks

gorm.io/plugin/dbresolver@v1.1.0/callbacks.go

func (dr *DBResolver) registerCallbacks(db *gorm.DB) {    dr.Callback().Create().Before("*").Register("gorm:db_resolver", dr.switchSource)    dr.Callback().Query().Before("*").Register("gorm:db_resolver", dr.switchReplica)    dr.Callback().Update().Before("*").Register("gorm:db_resolver", dr.switchSource)    dr.Callback().Delete().Before("*").Register("gorm:db_resolver", dr.switchSource)    dr.Callback().Row().Before("*").Register("gorm:db_resolver", dr.switchReplica)    dr.Callback().Raw().Before("*").Register("gorm:db_resolver", dr.switchGuess)}
registerCallbacks方法针对Create、Update、Delete方法注册了dr.switchSource;针对Query、Row注册了dr.switchReplica;针对Raw注册了dr.switchGuess

switchSource

gorm.io/plugin/dbresolver@v1.1.0/callbacks.go

// switchSource points the statement at a pool resolved for the Write
// operation. Nothing is changed when isTransaction reports true for the
// statement's current pool.
func (dr *DBResolver) switchSource(db *gorm.DB) {
	stmt := db.Statement
	if isTransaction(stmt.ConnPool) {
		return
	}
	stmt.ConnPool = dr.resolve(stmt, Write)
}
switchSource方法在当前连接没有开启事务时执行dr.resolve(db.Statement, Write)

switchReplica

gorm.io/plugin/dbresolver@v1.1.0/callbacks.go

func (dr *DBResolver) switchReplica(db *gorm.DB) {    if !isTransaction(db.Statement.ConnPool) {        if rawSQL := db.Statement.SQL.String(); len(rawSQL) > 0 {            dr.switchGuess(db)        } else {            _, locking := db.Statement.Clauses["FOR"]            if _, ok := db.Statement.Clauses[writeName]; ok || locking {                db.Statement.ConnPool = dr.resolve(db.Statement, Write)            } else {                db.Statement.ConnPool = dr.resolve(db.Statement, Read)            }        }    }}
switchReplica方法在当前连接没有开启事务时,在rawSQL长度大于0时执行switchGuess,否则判断是否有for语句,若Clauses中指定了write或者语句有for加锁则执行dr.resolve(db.Statement, Write),否则执行dr.resolve(db.Statement, Read)

switchGuess

gorm.io/plugin/dbresolver@v1.1.0/callbacks.go

// switchGuess selects the pool for a statement by looking at its SQL text.
//
// It leaves the statement untouched when isTransaction reports true for the
// current pool. A writeName clause always resolves for Write. Otherwise the
// trimmed SQL resolves for Read only if it is longer than 10 bytes, starts
// with "select" and does not end with "for update" (both compared
// case-insensitively); anything else resolves for Write.
func (dr *DBResolver) switchGuess(db *gorm.DB) {
	stmt := db.Statement
	if isTransaction(stmt.ConnPool) {
		return
	}

	if _, forcedWrite := stmt.Clauses[writeName]; forcedWrite {
		stmt.ConnPool = dr.resolve(stmt, Write)
		return
	}

	query := strings.TrimSpace(stmt.SQL.String())
	// The length test comes first so the slice expressions below stay in range.
	plainSelect := len(query) > 10 &&
		strings.EqualFold(query[:6], "select") &&
		!strings.EqualFold(query[len(query)-10:], "for update")

	if plainSelect {
		stmt.ConnPool = dr.resolve(stmt, Read)
		return
	}
	stmt.ConnPool = dr.resolve(stmt, Write)
}
switchGuess方法在当前连接没有开启事务时,先判断Clauses中是否指定了write,若有则执行dr.resolve(db.Statement, Write);否则判断rawSQL是否以select开头且不以for update结尾,是则执行dr.resolve(db.Statement, Read),否则执行dr.resolve(db.Statement, Write)

resolve

gorm.io/plugin/dbresolver@v1.1.0/dbresolver.go

func (dr *DBResolver) resolve(stmt *gorm.Statement, op Operation) gorm.ConnPool {    if len(dr.resolvers) > 0 {        if u, ok := stmt.Clauses[usingName].Expression.(using); ok && u.Use != "" {            if r, ok := dr.resolvers[u.Use]; ok {                return r.resolve(stmt, op)            }        }        if stmt.Table != "" {            if r, ok := dr.resolvers[stmt.Table]; ok {                return r.resolve(stmt, op)            }        }        if stmt.Schema != nil {            if r, ok := dr.resolvers[stmt.Schema.Table]; ok {                return r.resolve(stmt, op)            }        }        if rawSQL := stmt.SQL.String(); rawSQL != "" {            if r, ok := dr.resolvers[getTableFromRawSQL(rawSQL)]; ok {                return r.resolve(stmt, op)            }        }    }    if dr.global != nil {        return dr.global.resolve(stmt, op)    }    return stmt.ConnPool}
resolve方法查找对应的resolver执行,没有的话使用dr.global;若dr.global也为nil则直接返回stmt.ConnPool

dr.compile()

gorm.io/plugin/dbresolver@v1.1.0/dbresolver.go

func (dr *DBResolver) compile() error {    for _, config := range dr.configs {        if err := dr.compileConfig(config); err != nil {            return err        }    }    return nil}func (dr *DBResolver) compileConfig(config Config) (err error) {    var (        connPool = dr.DB.Config.ConnPool        r        = resolver{            dbResolver: dr,            policy:     config.Policy,        }    )    if preparedStmtDB, ok := connPool.(*gorm.PreparedStmtDB); ok {        connPool = preparedStmtDB.ConnPool    }    if len(config.Sources) == 0 {        r.sources = []gorm.ConnPool{connPool}    } else if r.sources, err = dr.convertToConnPool(config.Sources); err != nil {        return err    }    if len(config.Replicas) == 0 {        r.replicas = r.sources    } else if r.replicas, err = dr.convertToConnPool(config.Replicas); err != nil {        return err    }    if len(config.datas) > 0 {        for _, data := range config.datas {            if t, ok := data.(string); ok {                dr.resolvers[t] = &r            } else {                stmt := &gorm.Statement{DB: dr.DB}                if err := stmt.Parse(data); err == nil {                    dr.resolvers[stmt.Table] = &r                } else {                    return err                }            }        }    } else if dr.global == nil {        dr.global = &r    } else {        return errors.New("conflicted global resolver")    }    for _, fc := range dr.compileCallbacks {        if err = r.call(fc); err != nil {            return err        }    }    return nil}
compile方法遍历dr.configs,挨个执行dr.compileConfig(config),它会使用config.Policy创建resolver

resolver

gorm.io/plugin/dbresolver@v1.1.0/resolver.go

type resolver struct {    sources    []gorm.ConnPool    replicas   []gorm.ConnPool    policy     Policy    dbResolver *DBResolver}func (r *resolver) resolve(stmt *gorm.Statement, op Operation) (connPool gorm.ConnPool) {    if op == Read {        if len(r.replicas) == 1 {            connPool = r.replicas[0]        } else {            connPool = r.policy.Resolve(r.replicas)        }    } else if len(r.sources) == 1 {        connPool = r.sources[0]    } else {        connPool = r.policy.Resolve(r.sources)    }    if stmt.DB.PrepareStmt {        if preparedStmt, ok := r.dbResolver.prepareStmtStore[connPool]; ok {            return &gorm.PreparedStmtDB{                ConnPool: connPool,                Mux:      preparedStmt.Mux,                Stmts:    preparedStmt.Stmts,            }        }    }    return}
resolver的resolve在Operation为Read的时候,会使用r.replicas,若只有1个replica则直接返回,若有多个则使用r.policy.Resolve(r.replicas)选一个;若Operation为write时,判断sources,若只有一个source,则直接返回,若有多个source则通过r.policy.Resolve(r.sources)选择

Policy

gorm.io/plugin/dbresolver@v1.1.0/policy.go

type Policy interface {    Resolve([]gorm.ConnPool) gorm.ConnPool}type RandomPolicy struct {}func (RandomPolicy) Resolve(connPools []gorm.ConnPool) gorm.ConnPool {    return connPools[rand.Intn(len(connPools))]}
Policy接口定义了Resolve方法来选取数据源,默认提供了RandomPolicy,随机选取。

实例

func dbResolverDemo() {    db, _ := gorm.Open(mysql.Open("master_dsn"), &gorm.Config{})    dbResolverCfg := dbresolver.Config{        Sources:  []gorm.Dialector{mysql.Open("master_dsn")},        Replicas: []gorm.Dialector{mysql.Open("replica_a_dsn"), mysql.Open("replica_b_dsn")},        Policy:   dbresolver.RandomPolicy{}}    readWritePlugin := dbresolver.Register(dbResolverCfg)    db.Use(readWritePlugin)}

小结

gorm的dbresolver实现了Plugin接口,它针对Create、Update、Delete方法注册了dr.switchSource;针对Query、Row注册了dr.switchReplica;switchSource在当前连接没有开启事务时执行dr.resolve(db.Statement, Write),switchReplica在当前连接没有开启事务时动态判断Operation是Read还是Write;当前连接已开启事务时两者都不做切换,沿用原有的ConnPool;resolver的resolve根据Operation来进行数据源的切换。

doc

  • gorm