写这个货色也只是因为想简略把握下 TiDB 的源码,共事给了一些浏览思路,很赞。

有些中央如果了解的有问题还请批评教育,对 Go 语言了解的比拟无限。

如果不小心误导了读者,请见谅

TiDB 模块是应用 Go 语言开发的,应用 GoLand 编译器就能够了。

JetBrains出品

浏览源码,要寻找好的切入点,咱们抉择 main.go[1] 作为浏览源码的入口。

tidb-server/main.go 

这里的 main 函数能够 debug ,也是 TiDB 启动的结尾。

略微简化一下

func main() {   registerStores()   registerMetrics()   config.InitializeConfig(*configPath, *configCheck, *configStrict, reloadConfig, overrideConfig)   if config.GetGlobalConfig().OOMUseTmpStorage {      config.GetGlobalConfig().UpdateTempStoragePath()      initializeTempDir()   }   setCPUAffinity()   setupLog()   setupTracing()    setupBinlogClient()   setupMetrics()   createStoreAndDomain()   createServer()   runServer()}

能够看出,启动流程做的每个步骤都依照函数封装好了,大抵理解一下都做什么

// 注册storefunc registerStores() {    err := kvstore.Register("tikv", tikv.Driver{}) //注册TiKV    tikv.NewGCHandlerFunc = gcworker.NewGCWorker //为TiKV生成GCworker    err = kvstore.Register("mocktikv", mockstore.MockDriver{}) //注册默认存储引擎MockTiKV}//共注册100+ prometheus监控项,这里只表一项func RegisterMetrics() {     prometheus.MustRegister(AutoAnalyzeCounter)}// get全局configfunc InitializeConfig(confPath string, configCheck, configStrict bool, reloadFunc ConfReloadFunc, enforceCmdArgs func(*Config)) {    cfg := GetGlobalConfig()     StoreGlobalConfig(cfg)}

这个判断

if config.GetGlobalConfig().OOMUseTmpStorage {        config.GetGlobalConfig().UpdateTempStoragePath()        initializeTempDir()}
设置是否在单条 SQL 语句的内存应用超出 mem-quota-query [2] 限度时为某些算子启用长期磁盘。故若为 true ,则初始化 TempStoragePath 。
// 设置CPU亲和性func setCPUAffinity() { /    err := linux.SetAffinity(cpu)    runtime.GOMAXPROCS(len(cpu))}//配置零碎logfunc setupLog() {     err = logutil.InitLogger(cfg.Log.ToLogConfig()) // 这里配置格局、文件名、slowlog等}//注册分布式系统追踪链 jaegerfunc setupTracing() {    tracingCfg := cfg.OpenTracing.ToTracingConfig()    tracingCfg.ServiceName = "TiDB"    tracer, _, err := tracingCfg.NewTracer()    opentracing.SetGlobalTracer(tracer)}// 设置binlog信息func setupBinlogClient() {    if !cfg.Binlog.Enable { //若binlog.enable=false,则不开启binlog        return    }    if cfg.Binlog.IgnoreError { //若为true,则疏忽binlog报错        binloginfo.SetIgnoreError(true)    }    if len(cfg.Binlog.BinlogSocket) == 0 { //配置binlog输入网络地址        ...    }    binloginfo.SetPumpsClient(client) //配置binlog信息到pump客户端}// 配置监控func setupMetrics() {    runtime.SetMutexProfileFraction(10)// 对锁调用的跟踪    systimeErrHandler := func() {                // 示意TiDB的过程是否依然存在。                // 若10分钟内tidb_monitor_keep_alive_total                               // 次数<100,TiDB可能退出,此时会报警        metrics.TimeJumpBackCounter.Inc()     }    callBackCount := 0    sucessCallBack := func() {        callBackCount++        if callBackCount >= 5 {            callBackCount = 0            metrics.KeepAliveCounter.Inc() // KeepAlive监控         }    }}// 启动了一些重要的后盾过程func createStoreAndDomain() {    fullPath := fmt.Sprintf("%s://%s", cfg.Store, cfg.Path)    storage, err = kvstore.New(fullPath)    dom, err = session.BootstrapSession(storage)}}// 创立TiDB serverfunc createServer() {    driver := server.NewTiDBDriver(storage)    svr, err = server.NewServer(cfg, driver)    svr.SetDomain(dom)}//启动服务runServer() 

能够看到, runServer() 是启动TiDB流程中的的最初一步。

所以咱们跳转到 server.Run() 中,

这里有很多承受申请时的异样解决逻辑,在此不表。简化一下大略有四步,如下:

if s.cfg.Status.ReportStatus {    s.startStatusHTTP() //配置路由信息}for{    conn, err := s.listener.Accept()// 监听客户端申请    clientConn := s.newConn(conn)// 创立connection    go s.onConn(clientConn)// 应用connection解决申请}

首先配置了对于 TiDB 组件的很多路由信息,有趣味的能够看看。

server 一直监听网络申请,呈现新的客户端申请就创立一个新的 connection ,应用一个新 goroutine 来继续为它提供服务。

后续就是通过 go s.onConn(clientConn) 解决客户端申请,咱们来一探到底接下来的流程,简化下代码,大抵如下

func (s *Server) onConn(conn *clientConn) {    ctx := logutil.WithConnID(context.Background(), conn.connectionID)    if err := conn.handshake(ctx); err != nil {        if plugin.IsEnable(plugin.Audit) && conn.ctx != nil {            conn.ctx.GetSessionVars().ConnectionInfo = conn.connectInfo()            })        }    }    connectedTime := time.Now()    conn.Run(ctx)}

将建链 info 写入到 session 中,统计一下链路建设胜利的工夫,胜利后,通过 conn.Run(ctx) 解决客户端申请。

func (cc *clientConn) Run(ctx context.Context) {    for {        waitTimeout := cc.getSessionVarsWaitTimeout(ctx)        cc.pkt.setReadTimeout(time.Duration(waitTimeout) * time.Second)        data, err := cc.readPacket()        if err = cc.dispatch(ctx, data); err != nil {            ...        }    }}

简化后,解决逻辑大抵是,

不停的通过 cc.readPacket() 读取客户端发来的网络包。如果这个期间产生了期待网络包超时的景象,则 close connection 。

读取 data 胜利后,将它传入 cc.dispatch(ctx,data) ,这也是解决 SQL 申请的入口了。

当 SQL 来到 很正当 ,顾名思义,是对不同的 MySQL 协定进行调度,

server/conn.go

Then.

func (cc *clientConn) dispatch(ctx context.Context, data []byte) error {     cc.lastPacket = data    cmd := data[0]    data = data[1:]    dataStr := string(hack.String(data))}

客户端申请MySQL协定报文格式

TiDB 也要依据这个格局进行解析, data[0] 就是 data 的第一个 byte ,其余的是命令。

MySQL 申请报文的命令列表

0x00 COM_SLEEP 外部线程状态0x01 COM_QUIT 敞开连贯0x02 COM_INIT_DB 切换数据库0x03 COM_QUERY SQL查问申请0x04 COM_FIELD_LIST 获取数据表字段信息0x05 COM_CREATE_DB 创立数据库0x06 COM_DROP_DB 删除数据库0x07 COM_REFRESH 革除缓存0x08 COM_SHUTDOWN 进行服务器0x09 COM_STATISTICS 获取服务器统计信息0x0A COM_PROCESS_INFO 获取以后连贯的列表0x0B COM_CONNECT 外部线程状态0x0C COM_PROCESS_KILL 中断某个连贯0x0D COM_DEBUG 保留服务器调试信息0x0E COM_PING 测试连通性0x0F COM_TIME 外部线程状态0x10 COM_DELAYED_INSERT 外部线程状态0x11 COM_CHANGE_USER 从新登陆0x12 COM_BINLOG_DUMP 获取二进制日志信息0x13 COM_TABLE_DUMP 获取数据表构造信息0x14 COM_CONNECT_OUT 外部线程状态0x15 COM_REGISTER_SLAVE 从服务器向主服务器进行注册0x16 COM_STMT_PREPARE 预处理SQL语句0x17 COM_STMT_EXECUTE 执行预处理语句0x18 COM_STMT_SEND_LONG_DATA 发送BLOB类型的数据0x19 COM_STMT_CLOSE 销毁预处理语句0x1A COM_STMT_RESET 革除预处理语句参数缓存0x1B COM_SET_OPTION 设置语句选项0x1C COM_STMT_FETCH 获取预处理语句的执行后果

咱们看一下 dispatch 接下来的局部,也就是目前 TiDB 实现的局部 MySQL 协定了

switch cmd {    case mysql.ComPing,              mysql.ComStmtClose,              mysql.ComStmtSendLongData,              mysql.ComStmtReset,         mysql.ComSetOption,              mysql.ComChangeUser:        cc.ctx.SetProcessInfo("", t, cmd, 0)    case mysql.ComInitDB:        cc.ctx.SetProcessInfo("use "+dataStr, t, cmd, 0)    }switch cmd {    case mysql.ComSleep:    case mysql.ComQuit:    case mysql.ComQuery:    case mysql.ComPing:    case mysql.ComInitDB:    case mysql.ComFieldList:    case mysql.ComStmtPrepare:    case mysql.ComStmtExecute:    case mysql.ComStmtFetch:    case mysql.ComStmtClose:    case mysql.ComStmtSendLongData:    case mysql.ComStmtReset:    case mysql.ComSetOption:    case mysql.ComChangeUser    default: // other not support

目前 TiDB 的绝大部分 SQL 语句都是走的 ComQuery ,感兴趣的同学能够自行 debug 一下。

case mysql.ComQuery:     if len(data) > 0 && data[len(data)-1] == 0 {        data = data[:len(data)-1]        dataStr = string(hack.String(data))    }    return cc.handleQuery(ctx, dataStr)

于是咱们认真看看这个 case , 要去看下 cc.handleQuery(ctx , dataStr) 。

func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {     stmts, err := cc.ctx.Execute(ctx, sql)}

之后就要进入解析、优化 SQL 的局部 ,咱们还是要先简略了解下 Lex & Yacc。