– 数据请求阶段 –
Part 1 – 简单查询
- 客户端发送 Query (‘Q’) 消息给服务端,其中包含一条字符串类型的 SQL 语句。
// query runs the SQL text in query with the bound values in args and
// returns the resulting rows. This is an excerpt: "..." marks code that
// has been elided.
//
// When args is empty the call is delegated to cn.simpleQuery.
func (cn *conn) query(query string, args []driver.Value) (_ *rows, err error) {
	...
	// Check to see if we can use the "simpleQuery" interface, which is
	// *much* faster than going through prepare/exec
	if len(args) == 0 {
		return cn.simpleQuery(query)
	}
	...
}
- 服务端收到 Query 消息,解析 SQL 语句,生成抽象语法树 (AST),并传给执行器执行,获得结果。
// serveImpl runs the per-connection message loop. This is an excerpt:
// "..." marks code that has been elided.
//
// Each iteration reads one typed client message from c.rd through
// c.readBuf.ReadTypedMsg and dispatches on the message type. A read error
// terminates the loop.
func (c *conn) serveImpl(
	ctx context.Context,
	draining func() bool,
	sqlServer *sql.Server,
	reserved mon.BoundAccount,
	stopper *stop.Stopper,
) error {
	...
Loop:
	for {
		typ, n, err = c.readBuf.ReadTypedMsg(&c.rd)
		if err != nil {
			// Stop serving as soon as a message cannot be read.
			break Loop
		}
		...
		// One case per client message type; the simple query protocol uses
		// ClientMsgSimpleQuery, the extended protocol uses the others.
		switch typ {
		case pgwirebase.ClientMsgSimpleQuery:
			...
		case pgwirebase.ClientMsgExecute:
			...
		case pgwirebase.ClientMsgParse:
			...
		case pgwirebase.ClientMsgDescribe:
			...
		case pgwirebase.ClientMsgBind:
			...
		case pgwirebase.ClientMsgSync:
			...
		}
	}
	...
}
- 服务端根据 SQL 结果,首先发送 RowDescription (B:‘T’) 消息,其中包含列的数量、列名、列的类型等参数。
// writeRowDescription builds a RowDescription message describing columns.
// This is an excerpt: "..." marks code that has been elided.
//
// The message starts with the column count as an int16, followed by one
// entry per column: its name, a table OID and column attribute ID (both
// written as 0 here), the type OID and the type size.
func (c *conn) writeRowDescription(
	ctx context.Context,
	columns []sqlbase.ResultColumn,
	formatCodes []pgwirebase.FormatCode,
	w io.Writer,
) error {
	c.msgBuilder.initMsg(pgwirebase.ServerMsgRowDescription)
	// Number of columns in the result.
	c.msgBuilder.putInt16(int16(len(columns)))
	for i, column := range columns {
		...
		c.msgBuilder.writeTerminatedString(column.Name)
		...
		c.msgBuilder.putInt32(0) // Table OID (optional).
		c.msgBuilder.putInt16(0) // Column attribute ID (optional).
		c.msgBuilder.putInt32(int32(typ.oid))
		c.msgBuilder.putInt16(int16(typ.size))
		...
	}
	...
}
- RowDescription 消息后面将跟着多个 DataRow (B:‘D’) 消息,每个 DataRow 消息包含一行的数据。
func(c *conn) bufferRow(
ctx context.Context,
row tree.Datums,
formatCodes []pgwirebase.FormatCode,
convsessiondata.DataConversionConfig,
types []*types.T,){c.msgBuilder.initMsg(pgwirebase.ServerMsgDataRow)
c.msgBuilder.putInt16(int16(len(row)))
for i, col :=range row {
...
switch fmtCode {
case pgwirebase.FormatText:
c.msgBuilder.writeTextDatum(ctx, col, conv, types[i])
case pgwirebase.FormatBinary:
c.msgBuilder.writeBinaryDatum(ctx, col, conv.Location, types[i])
...
}
if err := c.msgBuilder.finishMsg(&c.writerState.buf); err !=nil{panic(fmt.Sprintf("unexpected err from buffer: %s", err))
}
}
- 发送 CommandComplete (B:‘C’) 消息表示这个 SQL 请求执行结束了。
- 服务端发送 ReadyForQuery (‘Z’),通知客户端可以发送下一条 SQL 请求了。
// Close finalizes the result by buffering the protocol message that
// corresponds to r.typ. This is an excerpt: "..." marks code that has been
// elided.
//
// For readyForQuery the transaction status t is sent as a single byte and
// the connection is flushed up to r.pos.
func (r *commandResult) Close(ctx context.Context, t sql.TransactionStatusIndicator) {
	...
	switch r.typ {
	case commandComplete:
		// The tag is built into the connection's reusable tag buffer.
		tag := cookTag(
			r.cmdCompleteTag, r.conn.writerState.tagBuf[:0], r.stmtType, r.rowsAffected,
		)
		r.conn.bufferCommandComplete(tag)
	case parseComplete:
		r.conn.bufferParseComplete()
	case bindComplete:
		r.conn.bufferBindComplete()
	case closeComplete:
		r.conn.bufferCloseComplete()
	case readyForQuery:
		r.conn.bufferReadyForQuery(byte(t))
		// The error is saved on conn.err.
		_ /* err */ = r.conn.Flush(r.pos)
	...
	}
	...
}
- 客户端接收 SQL 请求的结果,并根据消息类型分别处理。
// simpleQuery sends q to the server as a single Query ('Q') message and
// then processes the response messages one at a time. This is an excerpt:
// "..." marks code that has been elided.
func (cn *conn) simpleQuery(q string) (res *rows, err error) {
	b := cn.writeBuf('Q')
	b.string(q)
	cn.send(b)
	for {
		t, r := cn.recv1()
		// Dispatch on the one-byte message type defined by the PostgreSQL
		// frontend/backend protocol.
		switch t {
		case 'C', 'I': // CommandComplete, EmptyQueryResponse
			...
		case 'Z': // ReadyForQuery
			...
		case 'E': // ErrorResponse
			...
		case 'D': // DataRow
			...
		case 'T': // RowDescription
			...
		}
	}
}
Part 2 – 扩展查询
- 客户端发送扩展查询请求,依次发送 Parse (F:‘P’), Bind (F:‘B’), Describe (F:‘D’), Execute (F:‘E’), Sync (F:‘S’) 消息。
// query runs the SQL text in query with the bound values in args and
// returns the resulting rows. This is an excerpt: "..." marks code that
// has been elided.
//
// When cn.binaryParameters is set, the whole request is sent in one batch
// by sendBinaryModeQuery and the responses are then read back in order:
// parse response, bind response, and the portal description, which
// becomes the header of the returned rows.
func (cn *conn) query(query string, args []driver.Value) (_ *rows, err error) {
	...
	if cn.binaryParameters {
		cn.sendBinaryModeQuery(query, args)
		cn.readParseResponse()
		cn.readBindResponse()
		rows := &rows{cn: cn}
		rows.rowsHeader = cn.readPortalDescribeResponse()
		cn.postExecuteWorkaround()
		return rows, nil
	}
	...
}
// sendBinaryModeQuery writes the full extended-query sequence, Parse,
// Bind, Describe, Execute and Sync, into one buffer and sends it to the
// server in a single call to cn.send.
func (cn *conn) sendBinaryModeQuery(query string, args []driver.Value) {
	// Parse ('P'): statement name, query text, number of declared
	// parameter types.
	b := cn.writeBuf('P')
	b.byte(0) // unnamed statement
	b.string(query)
	b.int16(0)

	// Bind ('B'): the two zero bytes are the empty portal name and the
	// empty statement name.
	b.next('B')
	b.int16(0) // unnamed portal and statement
	cn.sendBinaryParameters(b, args)
	// NOTE(review): presumably selects text format for all result
	// columns; confirm against the definition of colFmtDataAllText.
	b.bytes(colFmtDataAllText)

	// Describe ('D'): 'P' selects a portal rather than a statement.
	b.next('D')
	b.byte('P')
	b.byte(0) // unnamed portal

	// Execute ('E'): portal name, then the maximum row count, where the
	// protocol defines 0 as "no limit".
	b.next('E')
	b.byte(0)
	b.int32(0)

	// Sync ('S') closes the batch.
	b.next('S')
	cn.send(b)
}
- 服务端处理扩展查询请求。
- 服务端发送回应消息,ParseComplete (B:‘1’), BindComplete (B:‘2’), ParameterDescription (B:‘t’), CommandComplete (B:‘C’), CloseComplete (B:‘3’), ReadyForQuery (B:‘Z’)。
// Close finalizes the result by buffering the protocol message that
// corresponds to r.typ. This is an excerpt: "..." marks code that has been
// elided.
//
// For readyForQuery the transaction status t is sent as a single byte and
// the connection is flushed up to r.pos.
func (r *commandResult) Close(ctx context.Context, t sql.TransactionStatusIndicator) {
	...
	switch r.typ {
	case commandComplete:
		// The tag is built into the connection's reusable tag buffer.
		tag := cookTag(
			r.cmdCompleteTag, r.conn.writerState.tagBuf[:0], r.stmtType, r.rowsAffected,
		)
		r.conn.bufferCommandComplete(tag)
	case parseComplete:
		r.conn.bufferParseComplete()
	case bindComplete:
		r.conn.bufferBindComplete()
	case closeComplete:
		r.conn.bufferCloseComplete()
	case readyForQuery:
		r.conn.bufferReadyForQuery(byte(t))
		// The error is saved on conn.err.
		_ /* err */ = r.conn.Flush(r.pos)
	...
	}
	...
}