写这个货色也只是因为想简略把握下 TiDB 的源码,共事给了一些浏览思路,很赞。
有些中央如果了解的有问题还请批评教育,对 Go 语言了解的比拟无限。
如果不小心误导了读者,请见谅
TiDB 模块是应用 Go 语言开发的,应用 GoLand 编译器就能够了。
JetBrains 出品
浏览源码,要寻找好的切入点,咱们抉择 main.go[1] 作为浏览源码的入口。
tidb-server/main.go
这里的 main 函数能够 debug,也是 TiDB 启动的结尾。
略微简化一下
func main() {registerStores()
registerMetrics()
config.InitializeConfig(*configPath, *configCheck, *configStrict, reloadConfig, overrideConfig)
if config.GetGlobalConfig().OOMUseTmpStorage {config.GetGlobalConfig().UpdateTempStoragePath()
initializeTempDir()}
setCPUAffinity()
setupLog()
setupTracing()
setupBinlogClient()
setupMetrics()
createStoreAndDomain()
createServer()
runServer()}
能够看出,启动流程做的每个步骤都依照函数封装好了,大抵理解一下都做什么
// 注册 store
func registerStores() {err := kvstore.Register("tikv", tikv.Driver{}) // 注册 TiKV
tikv.NewGCHandlerFunc = gcworker.NewGCWorker // 为 TiKV 生成 GCworker
err = kvstore.Register("mocktikv", mockstore.MockDriver{}) // 注册默认存储引擎 MockTiKV
}
// 共注册 100+ prometheus 监控项,这里只表一项
func RegisterMetrics() {prometheus.MustRegister(AutoAnalyzeCounter)
}
// get 全局 config
func InitializeConfig(confPath string, configCheck, configStrict bool, reloadFunc ConfReloadFunc, enforceCmdArgs func(*Config)) {cfg := GetGlobalConfig()
StoreGlobalConfig(cfg)
}
这个判断
if config.GetGlobalConfig().OOMUseTmpStorage {config.GetGlobalConfig().UpdateTempStoragePath()
initializeTempDir()}
设置是否在单条 SQL 语句的内存应用超出 mem-quota-query [2] 限度时为某些算子启用长期磁盘。故若为 true,则初始化 TempStoragePath。
// 设置 CPU 亲和性
func setCPUAffinity() { /
err := linux.SetAffinity(cpu)
runtime.GOMAXPROCS(len(cpu))
}
// 配置零碎 log
func setupLog() {err = logutil.InitLogger(cfg.Log.ToLogConfig()) // 这里配置格局、文件名、slowlog 等
}
// 注册分布式系统追踪链 jaeger
func setupTracing() {tracingCfg := cfg.OpenTracing.ToTracingConfig()
tracingCfg.ServiceName = "TiDB"
tracer, _, err := tracingCfg.NewTracer()
opentracing.SetGlobalTracer(tracer)
}
// 设置 binlog 信息
func setupBinlogClient() {
if !cfg.Binlog.Enable { // 若 binlog.enable=false,则不开启 binlog
return
}
if cfg.Binlog.IgnoreError { // 若为 true,则疏忽 binlog 报错
binloginfo.SetIgnoreError(true)
}
if len(cfg.Binlog.BinlogSocket) == 0 { // 配置 binlog 输入网络地址
...
}
binloginfo.SetPumpsClient(client) // 配置 binlog 信息到 pump 客户端
}
// 配置监控
func setupMetrics() {runtime.SetMutexProfileFraction(10)// 对锁调用的跟踪
systimeErrHandler := func() {
// 示意 TiDB 的过程是否依然存在。// 若 10 分钟内 tidb_monitor_keep_alive_total
// 次数 <100,TiDB 可能退出,此时会报警
metrics.TimeJumpBackCounter.Inc()}
callBackCount := 0
sucessCallBack := func() {
callBackCount++
if callBackCount >= 5 {
callBackCount = 0
metrics.KeepAliveCounter.Inc() // KeepAlive 监控}
}
}
// 启动了一些重要的后盾过程
func createStoreAndDomain() {fullPath := fmt.Sprintf("%s://%s", cfg.Store, cfg.Path)
storage, err = kvstore.New(fullPath)
dom, err = session.BootstrapSession(storage)}
}
// 创立 TiDB server
func createServer() {driver := server.NewTiDBDriver(storage)
svr, err = server.NewServer(cfg, driver)
svr.SetDomain(dom)
}
// 启动服务
runServer()
能够看到,runServer() 是启动 TiDB 流程中的的最初一步。
所以咱们跳转到 server.Run() 中,
这里有很多承受申请时的异样解决逻辑,在此不表。简化一下大略有四步,如下:
if s.cfg.Status.ReportStatus {s.startStatusHTTP() // 配置路由信息
}
for{conn, err := s.listener.Accept()// 监听客户端申请
clientConn := s.newConn(conn)// 创立 connection
go s.onConn(clientConn)// 应用 connection 解决申请
}
首先配置了对于 TiDB 组件的很多路由信息,有趣味的能够看看。
server 一直监听网络申请,呈现新的客户端申请就创立一个新的 connection,应用一个新 goroutine 来继续为它提供服务。
后续就是通过 go s.onConn(clientConn) 解决客户端申请,咱们来一探到底接下来的流程,简化下代码,大抵如下
func (s *Server) onConn(conn *clientConn) {ctx := logutil.WithConnID(context.Background(), conn.connectionID)
if err := conn.handshake(ctx); err != nil {if plugin.IsEnable(plugin.Audit) && conn.ctx != nil {conn.ctx.GetSessionVars().ConnectionInfo = conn.connectInfo()})
}
}
connectedTime := time.Now()
conn.Run(ctx)
}
将建链 info 写入到 session 中,统计一下链路建设胜利的工夫,胜利后,通过 conn.Run(ctx) 解决客户端申请。
func (cc *clientConn) Run(ctx context.Context) {
for {waitTimeout := cc.getSessionVarsWaitTimeout(ctx)
cc.pkt.setReadTimeout(time.Duration(waitTimeout) * time.Second)
data, err := cc.readPacket()
if err = cc.dispatch(ctx, data); err != nil {...}
}
}
简化后,解决逻辑大抵是,
不停的通过 cc.readPacket() 读取客户端发来的网络包。如果这个期间产生了期待网络包超时的景象,则 close connection。
读取 data 胜利后,将它传入 cc.dispatch(ctx,data),这也是解决 SQL 申请的入口了。
当 SQL 来到 很正当,顾名思义,是对不同的 MySQL 协定进行调度,
server/conn.go
Then.
func (cc *clientConn) dispatch(ctx context.Context, data []byte) error {
cc.lastPacket = data
cmd := data[0]
data = data[1:]
dataStr := string(hack.String(data))
}
客户端申请 MySQL 协定报文格式
TiDB 也要依据这个格局进行解析,data[0] 就是 data 的第一个 byte,其余的是命令。
MySQL 申请报文的命令列表
0x00 COM_SLEEP 外部线程状态
0x01 COM_QUIT 敞开连贯
0x02 COM_INIT_DB 切换数据库
0x03 COM_QUERY SQL 查问申请
0x04 COM_FIELD_LIST 获取数据表字段信息
0x05 COM_CREATE_DB 创立数据库
0x06 COM_DROP_DB 删除数据库
0x07 COM_REFRESH 革除缓存
0x08 COM_SHUTDOWN 进行服务器
0x09 COM_STATISTICS 获取服务器统计信息
0x0A COM_PROCESS_INFO 获取以后连贯的列表
0x0B COM_CONNECT 外部线程状态
0x0C COM_PROCESS_KILL 中断某个连贯
0x0D COM_DEBUG 保留服务器调试信息
0x0E COM_PING 测试连通性
0x0F COM_TIME 外部线程状态
0x10 COM_DELAYED_INSERT 外部线程状态
0x11 COM_CHANGE_USER 从新登陆
0x12 COM_BINLOG_DUMP 获取二进制日志信息
0x13 COM_TABLE_DUMP 获取数据表构造信息
0x14 COM_CONNECT_OUT 外部线程状态
0x15 COM_REGISTER_SLAVE 从服务器向主服务器进行注册
0x16 COM_STMT_PREPARE 预处理 SQL 语句
0x17 COM_STMT_EXECUTE 执行预处理语句
0x18 COM_STMT_SEND_LONG_DATA 发送 BLOB 类型的数据
0x19 COM_STMT_CLOSE 销毁预处理语句
0x1A COM_STMT_RESET 革除预处理语句参数缓存
0x1B COM_SET_OPTION 设置语句选项
0x1C COM_STMT_FETCH 获取预处理语句的执行后果
咱们看一下 dispatch 接下来的局部,也就是目前 TiDB 实现的局部 MySQL 协定了
switch cmd {
case mysql.ComPing,
mysql.ComStmtClose,
mysql.ComStmtSendLongData,
mysql.ComStmtReset,
mysql.ComSetOption,
mysql.ComChangeUser:
cc.ctx.SetProcessInfo("", t, cmd, 0)
case mysql.ComInitDB:
cc.ctx.SetProcessInfo("use"+dataStr, t, cmd, 0)
}
switch cmd {
case mysql.ComSleep:
case mysql.ComQuit:
case mysql.ComQuery:
case mysql.ComPing:
case mysql.ComInitDB:
case mysql.ComFieldList:
case mysql.ComStmtPrepare:
case mysql.ComStmtExecute:
case mysql.ComStmtFetch:
case mysql.ComStmtClose:
case mysql.ComStmtSendLongData:
case mysql.ComStmtReset:
case mysql.ComSetOption:
case mysql.ComChangeUser
default: // other not support
目前 TiDB 的绝大部分 SQL 语句都是走的 ComQuery,感兴趣的同学能够自行 debug 一下。
case mysql.ComQuery:
if len(data) > 0 && data[len(data)-1] == 0 {data = data[:len(data)-1]
dataStr = string(hack.String(data))
}
return cc.handleQuery(ctx, dataStr)
于是咱们认真看看这个 case,要去看下 cc.handleQuery(ctx , dataStr)。
func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {stmts, err := cc.ctx.Execute(ctx, sql)
}
之后就要进入解析、优化 SQL 的局部,咱们还是要先简略了解下 Lex & Yacc。