关于golang:TiDB源码阅读笔记三-TiDB-的在线-DDL

5次阅读

共计 8232 个字符,预计需要花费 21 分钟才能阅读完成。

在线 DDL 始终是数据库应用上的痛点。之前的工作中,有很多数据中心共事做 DDL 变更都很头疼,也吐槽过,审慎抉择工夫点进行。即便如此,面对简单庞杂的利用零碎和各类定时运维脚本,DDL 操作仍然可能烦扰到业务失常运行。

TiDB 的在线 DDL 是依据 Google F1 的在线异步 schema 变更算法实现

F1 中 schema 以非凡的 kv 对存储于 Spanner 中,同时每个 F1 服务器在运行过程中本身也保护一份拷贝。为了保障同一时刻最多只有 2 份 schema 失效,F1 约定了长度为数分钟的 schema 租约,所有 F1 服务器在租约到期后都要从新加载 schema。如果节点无奈从新实现续租,它将会主动终止服务并期待被集群治理设施重启。

简略来说,TiDB 的在线 DDL 和 MySQL 相比,次要有这些区别

MySQL 的数据和表构造是紧耦合的,想动表构造,势必会牵扯到数据。TiDB 的数据和表构造是宰割的,操作数据时会比对表构造,通过两个 version 来应答不同的 DML 语句。

具体的,能够参考这篇文章 ????

https://github.com/zimulala/builddatabase/blob/master/f1/schema-change.md​github.com

先介绍几个比拟重要的概念

Job:每个独自的 DDL 操作可看做一个 job。在一个 DDL 操作开始时,会将此操作封装成一个 job 并存放到 job queue,等此操作实现时,会将此 job 从 job queue 删除,并在存入 history job queue,便于查看历史 job。
Worker:每个节点都有一个 worker 用来解决 job。
Owner:整个零碎只有一个节点的 worker 能入选 owner 角色,每个节点都可能入选这个角色,入选 owner 后 worker 才有解决 job 的权力。owner 这个角色是有任期的,owner 的信息会存储在 KV 层中。worker 定期获取 KV 层中的 owner 信息,如果其中 ownerID 为空,或者以后的 owner 超过了任期,则 worker 能够尝试更新 KV 层中的 owner 信息(设置 ownerID 为本身的 workerID),如果更新胜利,则该 worker 成为 owner。在租期内这个用来确保整个零碎同一时间只有一个节点在解决 schema 变更。

总结一下,每个 TiDB 上有一个 Worker 线程,DDL 语句会封装为一个 Job,由 Worker 进行解决。Worker 分为 Owner 和 非 Owner,每个集群同时只能有一个 Owner,只有它能够解决队列中的 Job。咱们先去源码中看看 Worker 的样子


Worker

在 TiDB 源码浏览(一)TiDB 的入口 中,咱们提到了 main 函数中的 createStoreAndDomain 办法,这个办法初始化了一些重要的后盾过程,其中就包含 Worker,启动流程根本是如下的路数

func createStoreAndDomain() {dom, err = session.BootstrapSession(storage)
}
// 来到 BootstrapSession,办法比拟长,咱们只看关联到的中央,创立 session 
func BootstrapSession(store kv.Storage) (*domain.Domain, error) {se, err := createSession(store)
}
// 查看 createSession(store),再进一层就是 createSessionWithOpt
func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) {dom, err := domap.Get(store)
}
// 进 Get
func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) {d = domain.NewDomain(store, ddlLease, statisticLease, factory)
}

来到 domain.go,看 init 函数

do.ddl = ddl.NewDDL(
        ctx,
        ddl.WithEtcdClient(do.etcdClient),
        ddl.WithStore(do.store),
        ddl.WithInfoHandle(do.infoHandle),
        ddl.WithHook(callback),
        ddl.WithLease(ddlLease),
        ddl.WithResourcePool(sysCtxPool),
)

通过 ddl.NewDDL 进入 ddl.go,开始 start Worker

d.start(ctx, opt.ResourcePool)

这里有两种 Worker,一种专门负责 index 类型的 DDL,一种负责其余的

if RunWorker {d.workers[generalWorker] = newWorker(generalWorker, d.store, d.sessPool, d.delRangeMgr)
    d.workers[addIdxWorker] = newWorker(addIdxWorker, d.store, d.sessPool, d.delRangeMgr)
    for _, worker := range d.workers {
        w := worker
        go tidbutil.WithRecovery(func() {w.start(d.ddlCtx) },// 启动 worker
    }

这样 Worker 就启动了,启动 TiDB 也能看到这个日志打印:

[ddl_worker.go:130] ["[ddl] start DDL worker"] [worker="worker 1, tp general"]
[ddl_worker.go:130] ["[ddl] start DDL worker"] [worker="worker 2, tp add index"]

Worker 启动后,就开始轮询解决队列的 Job ⬇️

logutil.Logger(w.logCtx).Info("[ddl] start DDL worker")

// We use 4 * lease time to check owner's timeout, so here, we will update owner's status
// every 2 * lease time. If lease is 0, we will use default 1s.
// But we use etcd to speed up, normally it takes less than 1s now, so we use 1s as the max value.
checkTime := chooseLeaseTime(2*d.lease, 1*time.Second)
ticker := time.NewTicker(checkTime)
defer ticker.Stop()
for {err := w.handleDDLJobQueue(d) // 解决 Job 的流程,文章后半局部介绍
}

TiDB 接管 DDL 语句

从 parser.y 开始,以 alter table add columns 语句为例开展,先找 AlterTableStmt

AlterTableStmt:
    "ALTER" IgnoreOptional "TABLE" TableName AlterTableSpecListOpt AlterTablePartitionOpt
    {specs := $5.([]*ast.AlterTableSpec)
        if $6 != nil {specs = append(specs, $6.(*ast.AlterTableSpec))
        }
        $$ = &ast.AlterTableStmt{Table: $4.(*ast.TableName),
            Specs: specs,
        }
    }

// 看 AlterTableSpecListOpt 下
|    "ADD" ColumnKeywordOpt IfNotExists ColumnDef ColumnPosition
    {
        $$ = &ast.AlterTableSpec{IfNotExists: $3.(bool),
            Tp:          ast.AlterTableAddColumns,
            NewColumns:  []*ast.ColumnDef{$4.(*ast.ColumnDef)},
            Position:    $5.(*ast.ColumnPosition),
        }
    }

Token

IgnoreOptional
TableName
AlterTableSpecListOpt
ColumnKeywordOpt
IfNotExists
ColumnDef 
ColumnPosition

次要看下 ast.AlterTableAddColumns,来到 ddl_api.go

func (d *ddl) AlterTable(ctx sessionctx.Context, ident ast.Ident, specs []*ast.AlterTableSpec) (err error) {
     case ast.AlterTableAddColumns:
    err = d.AddColumn(ctx, ident, spec)
}

先进行了一系列的校验:

  1. 如果增加 auto increment、primary key,unique key 属性字段,返回不反对
  2. 查看字段属性
  3. 表中字段是否过多(刚来时听过 table 最多反对 512 个字段,就在这里,写死的 TableColumnCountLimit = uint32(512)
  4. 字段是否曾经存在
  5. 字段长度是否超过最大值
  6. 等等等

太长不看,来到这里

job := &model.Job{
        SchemaID:   schema.ID,
        TableID:    t.Meta().ID,
        SchemaName: schema.Name.L,
        Type:       model.ActionAddColumn,
        BinlogInfo: &model.HistoryInfo{},
        Args:       []interface{}{col, spec.Position, 0},
    }
err = d.doDDLJob(ctx, job)

进入 d.doDDLJob(ctx, job)

func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
    // 获取 DDL SQL
    job.Query, _ = ctx.Value(sessionctx.QueryString).(string)
    // 赋给 task
    task := &limitJobTask{job, make(chan error)}
    // 传入 limitJobCh,我了解就是队列,毕竟下一句都 true 了
    d.limitJobCh <- task
    ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true}
// 告诉 worker,worker 分为 addIdxWorker 和 generalWorker
d.asyncNotifyWorker(job.Type)

前面的那段代码我了解就是 for true 来查 Job 的状态,若失败就报错,若胜利就 log。这就是 TiDB 接管 DDL 语句后的大抵流程。


Worker 解决 Job

当初次要的工作就回到了 ddl_worker.go,也就是上文提到的 handleDDLJobQueue 函数中,这里开始循环解决队列中的 Job,这个办法有点绕,须要循环好几次,因为每个 Job 整个流程有好几种状态,依据不同状态做不同解决,这里我简略点说,有趣味的能够 debug

func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
    once := true 
    for {
    waitTime := 2 * d.lease // 2 个租约工夫 1min 30s
        // 开启事务,每次循环都会 commit
    err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {if !d.isOwner() {// 不是 owner 角色,就什么都不做
            return nil
        }
        job, err = w.getFirstDDLJob(t) // 获取队列中头部 Job
        if job == nil || err != nil { // 没有则返回
            return errors.Trace(err)
        }
                if once {// Job 的第一次循环都会走这个,但只是解决异常情况,// 在失常流程中,w.waitSchemaSynced 间接 return 到第二轮循环
                w.waitSchemaSynced(d, job, waitTime)
            once = false
            return nil
        }
                // 第二轮因为状态问题,不会走到这个分支
        if job.IsDone() || job.IsRollbackDone() {err = w.finishDDLJob(t, job)
        }
                // 我狐疑这个中央是操作 KV 层的,因为没找到实现,如果说错了请各位斧正
        d.mu.RLock()
        d.mu.hook.OnJobRunBefore(job)
        d.mu.RUnlock()

        tidbutil.WithRecovery(func() {
                        // runDDLJob 一看就是重要函数,上面会说,次要是更新 5 种状态
            schemaVer, runJobErr = w.runDDLJob(d, t, job)
        }
                // 如果 cancel 了 就 finish
        if job.IsCancelled() {err = w.finishDDLJob(t, job)
        }
                // 更新 Job
        err = w.updateDDLJob(t, job, runJobErr != nil)

        d.mu.RLock()
        d.mu.hook.OnJobUpdated(job)
        d.mu.RUnlock()}
}

w.runDDLJob

func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {timeStart := time.Now()
    if job.IsFinished() {return}
    if job.IsCancelling() {return convertJob2RollbackJob(w, d, t, job)
    }
    if !job.IsRollingback() && !job.IsCancelling() {job.State = model.JobStateRunning}
        
        // 下面是不同状态的解决
        // 这里依据不同的 type 走不同的函数,咱们就看 onAddColumn 
    switch job.Type {
    case model.ActionAddColumn:
        ver, err = onAddColumn(d, t, job)
    default:
        job.State = model.JobStateCancelled
        err = errInvalidDDLJob.GenWithStack("invalid ddl job type: %v", job.Type)
    }
    if err != nil {// 这里次要异常情况}
    return
}

onAddColumn

func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
    // 如果 job 是回滚状态,就走 drop colunm
    if job.IsRollingback() {ver, err = onDropColumn(t, job)
    }
        // checkAddColumn 这里解决了几种状况,比方字段信息曾经存在且是 public,就 cancel
    tblInfo, columnInfo, col, pos, offset, err := checkAddColumn(t, job)
    if columnInfo == nil {
        // 依据 after first 语法 创立 ColumnInfo
    columnInfo, offset, err = createColumnInfo(tblInfo, col, pos)
    }
        // 五种状态 none -> delete only -> write only -> reorganization -> public
    originalState := columnInfo.State
    switch columnInfo.State {
    case model.StateNone:
        // none -> delete only
        ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, originalState != columnInfo.State)
    case model.StateDeleteOnly:
        // delete only -> write only
        ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfo.State)
    case model.StateWriteOnly:
        // write only -> reorganization
        ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfo.State)
    case model.StateWriteReorganization:
        // reorganization -> public
        adjustColumnInfoInAddColumn(tblInfo, offset)
        columnInfo.State = model.StatePublic
        ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfo.State)
        // Finish this job.
        job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
        asyncNotifyEvent(d, &util.Event{Tp: model.ActionAddColumn, TableInfo: tblInfo, ColumnInfo: columnInfo})
    default:
        err = ErrInvalidDDLState.GenWithStackByArgs("column", columnInfo.State)
    }
}

FinishTableJob

startTime := time.Now()
err = t.AddHistoryDDLJob(job, updateRawArgs)

最初退出 historyDDLJob,供查问历史 DDL 操作。再做些清理工作,整个 DDL 流程就差不多走完了。

其余 TiDB 的同步

还记得 domai.go 吧,还是 init 函数,在这里 TiDB 的非 owner 会隔一个 lease 工夫去同步 ver,大略看一下

init@domain.go

if ddlLease > 0 {do.wg.Add(1)
   // Local store needs to get the change information for every DDL state in each session.
   go do.loadSchemaInLoop(ddlLease)
}

do.loadSchemaInLoop

这里是具体过程了,循环 reload,

func (do *Domain) loadSchemaInLoop(lease time.Duration) {ticker := time.NewTicker(lease / 2)
    for {
        select {
        case <-ticker.C:
            err := do.Reload()
        case _, ok := <-syncer.GlobalVersionCh():
            err := do.Reload()
        case <-syncer.Done():
            do.SchemaValidator.Stop()
            err := do.mustRestartSyncer()
            exitLoop := do.mustReload()
        case <-do.exit:
            return
        }
    }
}

这里大抵逻辑有这些:

  1. 获取 version 并同步,若 reload 工夫超过 lease/2,则可能报错。
  2. 集群同一时刻最多只能有两个版本的 schema,若同步过程,某个 TiDB 的版本为 1,其余的曾经是 3 了(距离了个 2)则这个 TiDB 进行服务。
  3. 若与 PD 断链,进行服务。
    • *

感觉整个流程波及的货色也不少,有些中央还是要多看几遍领悟,之后少不了勘误这篇了????

正文完
 0