关于golang:聊聊gorm的读写分离

67次阅读

共计 6201 个字符,预计需要花费 16 分钟才能阅读完成。

本文主要研究一下 gorm 的读写分离

DBResolver

gorm.io/plugin/dbresolver@v1.1.0/dbresolver.go

// DBResolver holds the state of the read/write-splitting plugin. It embeds
// the *gorm.DB assigned in Initialize.
type DBResolver struct {
    *gorm.DB
    // configs are compiled one by one into resolvers by compile.
    configs          []Config
    // resolvers is keyed by a string datum or a parsed table name (filled in
    // by compileConfig) and is consulted first when picking a pool.
    resolvers        map[string]*resolver
    // global is the fallback resolver used when no keyed resolver matches.
    global           *resolver
    // prepareStmtStore is looked up by connection pool when prepared
    // statements are enabled on the statement's DB.
    prepareStmtStore map[gorm.ConnPool]*gorm.PreparedStmtDB
    // compileCallbacks are each passed to the new resolver's call method at
    // the end of compileConfig.
    compileCallbacks []func(gorm.ConnPool) error
}

// Name reports the plugin name, "gorm:db_resolver".
func (dr *DBResolver) Name() string {
	const pluginName = "gorm:db_resolver"
	return pluginName
}

// Initialize stores db on the resolver, registers the routing callbacks, and
// compiles the configured resolvers. It returns the error from compile, if any.
func (dr *DBResolver) Initialize(db *gorm.DB) error {
	dr.DB = db
	dr.registerCallbacks(db)

	if err := dr.compile(); err != nil {
		return err
	}
	return nil
}

DBResolver 定义了 resolvers;DBResolver 实现了 Plugin 接口的 Name、Initialize 方法;Initialize 方法执行了 dr.registerCallbacks(db)、dr.compile()

registerCallbacks

gorm.io/plugin/dbresolver@v1.1.0/callbacks.go

// registerCallbacks hooks a connection-switching callback in front of every
// other callback ("*") of each operation type. The callbacks are registered
// through dr.Callback(); the db argument itself is not used.
func (dr *DBResolver) registerCallbacks(db *gorm.DB) {
	const name = "gorm:db_resolver"

	// Create/Update/Delete use switchSource, Query/Row use switchReplica,
	// and Raw uses switchGuess.
	dr.Callback().Create().Before("*").Register(name, dr.switchSource)
	dr.Callback().Query().Before("*").Register(name, dr.switchReplica)
	dr.Callback().Update().Before("*").Register(name, dr.switchSource)
	dr.Callback().Delete().Before("*").Register(name, dr.switchSource)
	dr.Callback().Row().Before("*").Register(name, dr.switchReplica)
	dr.Callback().Raw().Before("*").Register(name, dr.switchGuess)
}

registerCallbacks 方法针对 Create、Update、Delete 注册了 dr.switchSource;针对 Query、Row 注册了 dr.switchReplica;针对 Raw 注册了 dr.switchGuess

switchSource

gorm.io/plugin/dbresolver@v1.1.0/callbacks.go

// switchSource points the statement at a connection pool resolved for a Write
// operation. Inside a transaction the current pool is left untouched.
func (dr *DBResolver) switchSource(db *gorm.DB) {
	if isTransaction(db.Statement.ConnPool) {
		return
	}
	db.Statement.ConnPool = dr.resolve(db.Statement, Write)
}

switchSource 方法在当前连接没有开启事务时执行 dr.resolve(db.Statement, Write)

switchReplica

gorm.io/plugin/dbresolver@v1.1.0/callbacks.go

// switchReplica picks the connection pool for Query/Row callbacks.
//
// Inside a transaction the pool is left untouched. If the statement already
// carries SQL text the decision is delegated to switchGuess. Otherwise the
// statement is resolved as a Read, unless it has the write clause or a "FOR"
// clause, in which case it is resolved as a Write.
func (dr *DBResolver) switchReplica(db *gorm.DB) {
	stmt := db.Statement
	if isTransaction(stmt.ConnPool) {
		return
	}

	if stmt.SQL.String() != "" {
		dr.switchGuess(db)
		return
	}

	op := Read
	_, locking := stmt.Clauses["FOR"]
	_, forcedWrite := stmt.Clauses[writeName]
	if forcedWrite || locking {
		op = Write
	}
	stmt.ConnPool = dr.resolve(stmt, op)
}

switchReplica 方法在当前连接没有开启事务时,若 rawSQL 长度大于 0 则执行 switchGuess;否则判断是否有 FOR 子句,若 Clauses 中指定了 write 或者语句带有 FOR 加锁,则执行 dr.resolve(db.Statement, Write),否则执行 dr.resolve(db.Statement, Read)

switchGuess

gorm.io/plugin/dbresolver@v1.1.0/callbacks.go

// switchGuess picks the connection pool by inspecting the statement's SQL text.
//
// Inside a transaction the pool is left untouched. The statement is resolved
// as a Read only when it has no write clause and its trimmed SQL is longer
// than 10 bytes, starts with "select" and does not end with "for update"
// (both compared case-insensitively). Everything else is resolved as a Write.
func (dr *DBResolver) switchGuess(db *gorm.DB) {
	stmt := db.Statement
	if isTransaction(stmt.ConnPool) {
		return
	}

	op := Write
	if _, forcedWrite := stmt.Clauses[writeName]; !forcedWrite {
		rawSQL := strings.TrimSpace(stmt.SQL.String())
		// The length guard keeps both slice expressions below in range.
		if len(rawSQL) > 10 {
			isSelect := strings.EqualFold(rawSQL[:6], "select")
			locking := strings.EqualFold(rawSQL[len(rawSQL)-10:], "for update")
			if isSelect && !locking {
				op = Read
			}
		}
	}
	stmt.ConnPool = dr.resolve(stmt, op)
}

switchGuess 方法在当前连接没有开启事务时,先判断 Clauses 中是否指定了 write,若有则执行 dr.resolve(db.Statement, Write);否则判断 rawSQL 是否以 select 开头且不以 for update 结尾,是则执行 dr.resolve(db.Statement, Read),否则执行 dr.resolve(db.Statement, Write)

resolve

gorm.io/plugin/dbresolver@v1.1.0/dbresolver.go

// resolve returns the connection pool to use for stmt and op.
//
// When keyed resolvers exist they are tried in this order: the name from the
// using clause, stmt.Table, stmt.Schema.Table, and the table name extracted
// from the raw SQL. The first key present in dr.resolvers wins. If none
// matches, the global resolver is used; without one, the statement's current
// pool is returned unchanged.
func (dr *DBResolver) resolve(stmt *gorm.Statement, op Operation) gorm.ConnPool {
	// byKey resolves through the resolver registered under key, if any.
	byKey := func(key string) (gorm.ConnPool, bool) {
		r, found := dr.resolvers[key]
		if !found {
			return nil, false
		}
		return r.resolve(stmt, op), true
	}

	if len(dr.resolvers) > 0 {
		if u, isUsing := stmt.Clauses[usingName].Expression.(using); isUsing && u.Use != "" {
			if pool, found := byKey(u.Use); found {
				return pool
			}
		}

		if stmt.Table != "" {
			if pool, found := byKey(stmt.Table); found {
				return pool
			}
		}

		if stmt.Schema != nil {
			if pool, found := byKey(stmt.Schema.Table); found {
				return pool
			}
		}

		if rawSQL := stmt.SQL.String(); rawSQL != "" {
			if pool, found := byKey(getTableFromRawSQL(rawSQL)); found {
				return pool
			}
		}
	}

	if fallback := dr.global; fallback != nil {
		return fallback.resolve(stmt, op)
	}

	return stmt.ConnPool
}

resolve 方法查找对应的 resolver 执行,没有的话使用 dr.global;若 dr.global 也为空,则直接返回 stmt.ConnPool

dr.compile()

gorm.io/plugin/dbresolver@v1.1.0/dbresolver.go

// compile runs compileConfig for every registered config, in order, and stops
// at the first error.
func (dr *DBResolver) compile() error {
	configs := dr.configs
	for i := range configs {
		if err := dr.compileConfig(configs[i]); err != nil {
			return err
		}
	}
	return nil
}

// compileConfig builds one resolver from config and registers it.
//
// Sources default to the DB's own connection pool (unwrapped if it is a
// *gorm.PreparedStmtDB), and replicas default to the sources. The resolver is
// registered under every entry of config.datas: strings are used as keys
// directly, other values are parsed and keyed by the resulting table name.
// A config without datas becomes the global resolver; a second such config is
// rejected with an error. Finally every compile callback is passed to the new
// resolver's call method.
func (dr *DBResolver) compileConfig(config Config) (err error) {
	connPool := dr.DB.Config.ConnPool
	if preparedStmtDB, ok := connPool.(*gorm.PreparedStmtDB); ok {
		connPool = preparedStmtDB.ConnPool
	}

	r := resolver{
		dbResolver: dr,
		policy:     config.Policy,
	}

	if len(config.Sources) > 0 {
		if r.sources, err = dr.convertToConnPool(config.Sources); err != nil {
			return err
		}
	} else {
		r.sources = []gorm.ConnPool{connPool}
	}

	if len(config.Replicas) > 0 {
		if r.replicas, err = dr.convertToConnPool(config.Replicas); err != nil {
			return err
		}
	} else {
		r.replicas = r.sources
	}

	switch {
	case len(config.datas) > 0:
		for _, data := range config.datas {
			if name, isName := data.(string); isName {
				dr.resolvers[name] = &r
				continue
			}

			stmt := &gorm.Statement{DB: dr.DB}
			if parseErr := stmt.Parse(data); parseErr != nil {
				return parseErr
			}
			dr.resolvers[stmt.Table] = &r
		}
	case dr.global == nil:
		dr.global = &r
	default:
		return errors.New("conflicted global resolver")
	}

	for _, fc := range dr.compileCallbacks {
		if err = r.call(fc); err != nil {
			return err
		}
	}

	return nil
}

compile 方法遍历 dr.configs,挨个执行 dr.compileConfig(config),它会使用 config.Policy 创建 resolver

resolver

gorm.io/plugin/dbresolver@v1.1.0/resolver.go

// resolver groups the connection pools of one config together with the policy
// used to choose among them.
type resolver struct {
	// sources are the pools used for non-Read operations.
	sources []gorm.ConnPool
	// replicas are the pools used for Read operations.
	replicas []gorm.ConnPool
	// policy chooses a pool when there is more than one candidate.
	policy Policy
	// dbResolver is the owning plugin; its prepareStmtStore is consulted in resolve.
	dbResolver *DBResolver
}

// resolve chooses a connection pool for op: from the replicas for Read, from
// the sources otherwise. A single candidate is returned directly; with any
// other count the policy decides.
//
// When prepared statements are enabled on stmt.DB and the owning DBResolver
// has a prepared-statement entry for the chosen pool, the pool is returned
// wrapped in a *gorm.PreparedStmtDB that reuses that entry's Mux and Stmts.
func (r *resolver) resolve(stmt *gorm.Statement, op Operation) (connPool gorm.ConnPool) {
	candidates := r.sources
	if op == Read {
		candidates = r.replicas
	}

	if len(candidates) == 1 {
		connPool = candidates[0]
	} else {
		connPool = r.policy.Resolve(candidates)
	}

	if !stmt.DB.PrepareStmt {
		return connPool
	}

	cached, found := r.dbResolver.prepareStmtStore[connPool]
	if !found {
		return connPool
	}

	return &gorm.PreparedStmtDB{
		ConnPool: connPool,
		Mux:      cached.Mux,
		Stmts:    cached.Stmts,
	}
}

resolver 的 resolve 在 Operation 为 Read 的时候,会使用 r.replicas,若只有 1 个 replica 则直接返回,若有多个则使用 r.policy.Resolve(r.replicas) 选一个;若 Operation 为 Write 时,判断 sources,若只有一个 source,则直接返回,若有多个 source 则通过 r.policy.Resolve(r.sources) 选择

Policy

gorm.io/plugin/dbresolver@v1.1.0/policy.go

// Policy selects the connection pool to use from a list of candidates.
type Policy interface {
	// Resolve returns the connection pool to use, given the candidate pools.
	Resolve([]gorm.ConnPool) gorm.ConnPool
}

// RandomPolicy is a stateless Policy whose Resolve picks a pool at random.
type RandomPolicy struct{}

// Resolve returns one of connPools, selected with rand.Intn.
// connPools must not be empty: rand.Intn panics for a zero argument.
func (RandomPolicy) Resolve(connPools []gorm.ConnPool) gorm.ConnPool {
	idx := rand.Intn(len(connPools))
	return connPools[idx]
}

Policy 接口定义了 Resolve 方法来选取数据源,默认提供了 RandomPolicy,随机选取。

实例

// dbResolverDemo opens a database on the master DSN and installs a dbresolver
// plugin configured with one source, two replicas and RandomPolicy.
//
// The errors returned by gorm.Open and db.Use were previously discarded, so
// setup failures went unnoticed. The function has no error return, so the
// demo now fails fast by panicking on either error.
func dbResolverDemo() {
	db, err := gorm.Open(mysql.Open("master_dsn"), &gorm.Config{})
	if err != nil {
		panic(err)
	}

	dbResolverCfg := dbresolver.Config{
		Sources:  []gorm.Dialector{mysql.Open("master_dsn")},
		Replicas: []gorm.Dialector{mysql.Open("replica_a_dsn"), mysql.Open("replica_b_dsn")},
		Policy:   dbresolver.RandomPolicy{},
	}
	readWritePlugin := dbresolver.Register(dbResolverCfg)

	// Use runs the plugin's Initialize, which compiles the config; surface
	// its error instead of dropping it.
	if err := db.Use(readWritePlugin); err != nil {
		panic(err)
	}
}

小结

gorm 的 dbresolver 实现了 Plugin 接口,它针对 Create、Update、Delete 方法注册了 dr.switchSource;针对 Query、Row 注册了 dr.switchReplica;switchSource 在当前连接没有开启事务时固定执行 dr.resolve(db.Statement, Write),switchReplica 在当前连接没有开启事务时动态判断 Operation 是 Read 还是 Write;已开启事务时二者都不做切换,沿用当前的 ConnPool;resolver 的 resolve 根据 Operation 来进行数据源的切换。

doc

  • gorm

正文完
 0