- 数据申请阶段 -
Part 1 - 简略查问
- 客户端发送 Query (‘Q’) 音讯给服务端,蕴含了一条字符串类型的 SQL 语句。
func(cn *conn) query(query string, args []driver.Value)(_ *rows, err error){ ... // Check to see if we can use the "simpleQuery"interface, which is // *much* faster than going through prepare/exec iflen(args)==0{ return cn.simpleQuery(query) } ...}
- 服务端收到 Query 音讯,解析 SQL 语句,生成形象语法树 (AST),并传给执行器执行,取得后果。
func(c *conn) serveImpl( ctx context.Context, draining func()bool, sqlServer *sql.Server, reserved mon.BoundAccount, stopper *stop.Stopper,)error{ ...Loop: for{ typ, n, err = c.readBuf.ReadTypedMsg(&c.rd) if err !=nil{ break Loop } ... switch typ { case pgwirebase.ClientMsgSimpleQuery: ... case pgwirebase.ClientMsgExecute: ... case pgwirebase.ClientMsgParse: ... case pgwirebase.ClientMsgDescribe: ... case pgwirebase.ClientMsgBind: ... case pgwirebase.ClientMsgSync: ... } } ...}
- 服务端依据 SQL 后果,首先发送 RowDescription(B:‘T’) 音讯,蕴含列的数量,列名,列的类型等参数。
func(c *conn) writeRowDescription( ctx context.Context, columns []sqlbase.ResultColumn, formatCodes []pgwirebase.FormatCode, w io.Writer,)error{ c.msgBuilder.initMsg(pgwirebase.ServerMsgRowDescription) c.msgBuilder.putInt16(int16(len(columns))) for i, column :=range columns { ... c.msgBuilder.writeTerminatedString(column.Name) ... c.msgBuilder.putInt32(0)//Table OID (optional). c.msgBuilder.putInt16(0)//Column attribute ID (optional). c.msgBuilder.putInt32(int32(typ.oid)) c.msgBuilder.putInt16(int16(typ.size)) ... } ...}
- RowDescription 音讯前面将跟着多个 DataRow(B:‘D’) 音讯,每个 DataRow 音讯蕴含一行的数据。
func(c *conn) bufferRow( ctx context.Context, row tree.Datums, formatCodes []pgwirebase.FormatCode, convsessiondata.DataConversionConfig, types []*types.T,){ c.msgBuilder.initMsg(pgwirebase.ServerMsgDataRow) c.msgBuilder.putInt16(int16(len(row))) for i, col :=range row { ... switch fmtCode { case pgwirebase.FormatText: c.msgBuilder.writeTextDatum(ctx, col, conv, types[i]) case pgwirebase.FormatBinary: c.msgBuilder.writeBinaryDatum(ctx, col, conv.Location, types[i]) ... } if err := c.msgBuilder.finishMsg(&c.writerState.buf); err !=nil{ panic(fmt.Sprintf("unexpected err from buffer: %s", err)) }}
- 发送 CommandComplete(B:‘C’) 音讯示意这个 SQL 申请执行完结了。
- 服务端发送 ReadyForQuery(‘Z’),告诉客户端能够发送下一条 SQL 申请了。
func(r *commandResult) Close(ctx context.Context, t sql.TransactionStatusIndicator){ ... switch r.typ { case commandComplete: tag := cookTag( r.cmdCompleteTag, r.conn.writerState.tagBuf[:0], r.stmtType, r.rowsAffected, ) r.conn.bufferCommandComplete(tag) case parseComplete: r.conn.bufferParseComplete() case bindComplete: r.conn.bufferBindComplete() case closeComplete: r.conn.bufferCloseComplete() case readyForQuery: r.conn.bufferReadyForQuery(byte(t)) // The error is saved on conn.err. _ /* err */= r.conn.Flush(r.pos) ... } ...}
- 客户端依据承受 SQL 申请的后果。
func(cn *conn) simpleQuery(q string)(res *rows, err error){ b := cn.writeBuf('Q') b.string(q) cn.send(b) for{ t, r := cn.recv1() switch t { case'C','I': ... case'Z': ... case'E': ... case'D': ... case'T': ... } }}
Part 2 - 扩大查问
- 客户端发送扩大查问申请,顺次发送 Parse (F:‘P’), Bind (F:‘B’), Describe (F:‘D’), Execute (F:‘E’), Sync(F:‘S’) 音讯。
func(cn *conn) query(query string, args []driver.Value)(_ *rows, err error){ ... if cn.binaryParameters { cn.sendBinaryModeQuery(query, args) cn.readParseResponse() cn.readBindResponse() rows :=&rows{cn: cn} rows.rowsHeader = cn.readPortalDescribeResponse() cn.postExecuteWorkaround() return rows,nil } ...}
func(cn *conn) sendBinaryModeQuery(query string, args []driver.Value){ b := cn.writeBuf('P') b.byte(0)//unnamed statement b.string(query) b.int16(0) b.next('B') b.int16(0)//unnamed portal and statement cn.sendBinaryParameters(b, args) b.bytes(colFmtDataAllText) b.next('D') b.byte('P') b.byte(0)//unnamed portal b.next('E') b.byte(0) b.int32(0) b.next('S') cn.send(b)}
- 服务端解决扩大查问申请。
- 服务端发送回应音讯,ParseComplete (B:‘1’), BindComplete (B:‘2’), ParameterDescription(B:‘t’), CommandComplete (B:‘C’), CloseComplete (B:‘3’), ReadyForQuery (B:‘Z’)。
func(r *commandResult) Close(ctx context.Context, t sql.TransactionStatusIndicator){ ... switch r.typ { case commandComplete: tag := cookTag( r.cmdCompleteTag, r.conn.writerState.tagBuf[:0], r.stmtType, r.rowsAffected, ) r.conn.bufferCommandComplete(tag) case parseComplete: r.conn.bufferParseComplete() case bindComplete: r.conn.bufferBindComplete() case closeComplete: r.conn.bufferCloseComplete() case readyForQuery: r.conn.bufferReadyForQuery(byte(t)) // The error is saved on conn.err. _ /* err */= r.conn.Flush(r.pos) ... } ...}