前言
后面几章咱们实现了 服务端的编写 当初开始客户端编写
https://github.com/dollarkill…
Client
type Client struct {options *Options}
func NewClient(discover discovery.Discovery, options ...Option) *Client {
client := &Client{options: defaultOptions(),
}
client.options.Discovery = discover
for _, fn := range options {fn(client.options)
}
return client
}
option
type Options struct {
Discovery discovery.Discovery // 服务发现插件
loadBalancing load_banlancing.LoadBalancing // 负载平衡插件
serializationType codes.SerializationType // 序列化插件
compressorType codes.CompressorType // 压缩插件
pool int // 连接池大小
cryptology cryptology.Cryptology
rsaPublicKey []byte
writeTimeout time.Duration
readTimeout time.Duration
heartBeat time.Duration
Trace bool
AUTH string // AUTH TOKEN
}
func defaultOptions() *Options {defaultPoolSize := runtime.NumCPU() * 4
if defaultPoolSize < 20 {defaultPoolSize = 20}
return &Options{
pool: defaultPoolSize,
serializationType: codes.MsgPack,
compressorType: codes.Snappy,
loadBalancing: load_banlancing.NewPolling(),
cryptology: cryptology.AES,
rsaPublicKey: []byte(`
-----BEGIN PUBLIC KEY-----
-----END PUBLIC KEY-----`),
writeTimeout: time.Minute,
readTimeout: time.Minute * 3,
heartBeat: time.Minute,
Trace: false,
AUTH: "",
}
}
具体每个链接
type Connect struct {
Client *Client
pool *connectPool
close chan struct{}
serverName string
}
func (c *Client) NewConnect(serverName string) (conn *Connect, err error) {
connect := &Connect{
Client: c,
serverName: serverName,
close: make(chan struct{}),
}
connect.pool, err = initPool(connect)
return connect, err
}
初始化连接池
func initPool(c *Connect) (*connectPool, error) {
cp := &connectPool{
connect: c,
pool: make(chan LightClient, c.Client.options.pool),
}
return cp, cp.initPool()}
func (c *connectPool) initPool() error {hosts, err := c.connect.Client.options.Discovery.Discovery(c.connect.serverName) // 调用服务发现 查看 发现具体服务
if err != nil {return err}
if len(hosts) == 0 {return errors.New(fmt.Sprintf("%s server 404", c.connect.serverName))
}
c.connect.Client.options.loadBalancing.InitBalancing(hosts) // 初始化 负载平衡插件
// 初始化连接池
for i := 0; i < c.connect.Client.options.pool; i++ {client, err := newBaseClient(c.connect.serverName, c.connect.Client.options) // 建设链接
if err != nil {return errors.WithStack(err)
}
c.pool <- client
}
return nil
}
// 连接池中获取一个链接
func (c *connectPool) Get(ctx context.Context) (LightClient, error) {
select {case <-ctx.Done():
return nil, errors.New("pool get timeout")
case r := <-c.pool:
return r, nil
}
}
// 放回一个链接
func (c *connectPool) Put(client LightClient) {if client.Error() == nil {
c.pool <- client
return
}
// 如果 client.Error() 有异样 须要新初始化一个链接 放入连接池
go func() {fmt.Println("The server starts to restore")
for {time.Sleep(time.Second)
hosts, err := c.connect.Client.options.Discovery.Discovery(c.connect.serverName)
if err != nil {log.Println(err)
continue
}
if len(hosts) == 0 {err := errors.New(fmt.Sprintf("%s server 404", c.connect.serverName))
log.Println(err)
continue
}
c.connect.Client.options.loadBalancing.InitBalancing(hosts)
baseClient, err := newBaseClient(c.connect.serverName, c.connect.Client.options)
if err != nil {log.Println(err)
continue
}
c.pool <- baseClient
fmt.Println("Service recovery success")
break
}
}()}
Connect 调用具体服务
func (c *Connect) Call(ctx *light.Context, serviceMethod string, request interface{}, response interface{}) error {ctxT, _ := context.WithTimeout(context.TODO(), time.Second*6)
var err error
// 连接池中获取一个链接
client, err := c.pool.Get(ctxT)
if err != nil {return errors.WithStack(err)
}
// 用完 放回链接
defer func() {c.pool.Put(client)
}()
// 设置 token
ctx.SetValue("Light_AUTH", c.Client.options.AUTH)
// 具体调用
err = client.Call(ctx, serviceMethod, request, response)
if err != nil {return errors.WithStack(err)
}
return nil
}
调用外围 重点
温习 s03 协定设计
/**
协定设计
起始符 : 版本号 : crc32 校验 : magicNumberSize: serverNameSize : serverMethodSize : metaDataSize : payloadSize: respType : compressorType : serializationType : magicNumber : serverName : serverMethod : metaData : payload
0x05 : 0x01 : 4 : 4 : 4 : 4 : 4 : 4 : 1 : 1 : 1 : xxx : xxx : xxx : xxx : xxx
*/
留神: 每一个申请都有一个 magicNumber 都有一个申请 ID
单个链接定义
type BaseClient struct {
conn net.Conn
options *Options
serverName string
aesKey []byte
serialization codes.Serialization
compressor codes.Compressor
respInterMap map[string]*respMessage
respInterRM sync.RWMutex // 返回构造锁
writeMu sync.Mutex // 写锁
err error // 谬误
close chan struct{} // 用于敞开服务}
type respMessage struct {response interface{}
ctx *light.Context
respChan chan error
}
初始化单个链接
func newBaseClient(serverName string, options *Options) (*BaseClient, error) {
// 服务发现用
service, err := options.loadBalancing.GetService()
if err != nil {return nil, err}
con, err := transport.Client.Gen(service.Protocol, service.Addr)
if err != nil {return nil, errors.WithStack(err)
}
serialization, ex := codes.SerializationManager.Get(options.serializationType)
if !ex {return nil, pkg.ErrSerialization404}
compressor, ex := codes.CompressorManager.Get(options.compressorType)
if !ex {return nil, pkg.ErrCompressor404}
// 握手
encrypt, err := cryptology.RsaEncrypt([]byte(options.AUTH), options.rsaPublicKey)
if err != nil {return nil, err}
aesKey := []byte(strings.ReplaceAll(uuid.New().String(), "-", ""))
// 替换秘钥
aesKey2, err := cryptology.RsaEncrypt(aesKey, options.rsaPublicKey)
if err != nil {return nil, err}
handshake := protocol.EncodeHandshake(aesKey2, encrypt, []byte(""))
_, err = con.Write(handshake)
if err != nil {con.Close()
return nil, err
}
hsk := &protocol.Handshake{}
err = hsk.Handshake(con)
if err != nil {con.Close()
return nil, err
}
if hsk.Error != nil && len(hsk.Error) > 0 {con.Close()
err := string(hsk.Error)
return nil, errors.New(err)
}
bc := &BaseClient{
serverName: serverName,
conn: con,
options: options,
serialization: serialization,
compressor: compressor,
respInterMap: map[string]*respMessage{},
aesKey: aesKey,
close: make(chan struct{}),
}
go bc.heartBeat() // 心跳服务
go bc.processMessageManager() // 返回音讯的解决
return bc, nil
}
heartBeat 心跳服务
func (b *BaseClient) heartBeat() {defer func() {fmt.Println("heartBeat Close")
}()
loop:
for {
select {
case <-b.close:
break loop
case <-time.After(b.options.heartBeat): // 定时发送心跳
_, i, err := protocol.EncodeMessage("x", []byte(""), []byte(""), []byte(""), byte(protocol.HeartBeat), byte(b.options.compressorType), byte(b.options.serializationType), []byte(""))
if err != nil {log.Println(err)
break
}
now := time.Now()
b.conn.SetDeadline(now.Add(b.options.writeTimeout))
b.conn.SetWriteDeadline(now.Add(b.options.writeTimeout))
b.writeMu.Lock()
_, err = b.conn.Write(i)
b.writeMu.Unlock()
if err != nil {
b.err = err
break loop
}
}
}
}
processMessageManager 返回音讯的解决服务 (留神这里能够并发的来)
func (b *BaseClient) processMessageManager() {defer func() {fmt.Println("processMessageManager Close")
}()
for {magic, respChan, err := b.processMessage() // 解决某个音讯
if err == nil && magic == "" {continue}
if err != nil && magic == "" {break}
if err != nil && magic != "" && respChan != nil {respChan <- err}
if err == nil && magic != "" && respChan != nil {close(respChan)
}
}
}
func (b *BaseClient) processMessage() (magic string, respChan chan error, err error) {
// 3. 封装回执
now := time.Now()
b.conn.SetReadDeadline(now.Add(b.options.readTimeout))
proto := protocol.NewProtocol()
msg, err := proto.IODecode(b.conn)
if err != nil {
b.err = err
close(b.close)
return "", nil, err
}
// heartbeat
if msg.Header.RespType == byte(protocol.HeartBeat) {
if b.options.Trace {log.Println("is HeartBeat")
}
return "", nil, nil
}
b.respInterRM.RLock()
message, ex := b.respInterMap[msg.MagicNumber]
b.respInterRM.RUnlock()
if !ex { // 不存在 代表音讯曾经生效
if b.options.Trace {log.Println("Not Ex", msg.MagicNumber)
}
return "", nil, nil
}
comp, ex := codes.CompressorManager.Get(codes.CompressorType(msg.Header.CompressorType))
if !ex {return "", nil, nil}
// 1. 解压缩
msg.MetaData, err = comp.Unzip(msg.MetaData)
if err != nil {return "", nil, err}
msg.Payload, err = comp.Unzip(msg.Payload)
if err != nil {return "", nil, err}
// 2. 解密
msg.MetaData, err = cryptology.AESDecrypt(b.aesKey, msg.MetaData)
if err != nil {if len(msg.MetaData) != 0 {return "", nil, err}
msg.Payload = []byte("")
}
msg.Payload, err = cryptology.AESDecrypt(b.aesKey, msg.Payload)
if err != nil {if len(msg.Payload) != 0 {return "", nil, err}
msg.Payload = []byte("")
}
// 3. 反序列化 RespError
mtData := make(map[string]string)
err = b.serialization.Decode(msg.MetaData, &mtData)
if err != nil {return "", nil, err}
message.ctx.SetMetaData(mtData)
value := message.ctx.Value("RespError")
if value != "" {return msg.MagicNumber, message.respChan, errors.New(value)
}
return msg.MagicNumber, message.respChan, b.serialization.Decode(msg.Payload, message.response)
}
服务调用
func (b *BaseClient) call(ctx *light.Context, serviceMethod string, request interface{}, response interface{}, respChan chan error) (magic string, err error) {metaData := ctx.GetMetaData() // 获取 ctx 进行根底编码
// 1. 结构申请
// 1.1 序列化
serviceNameByte := []byte(b.serverName)
serviceMethodByte := []byte(serviceMethod)
var metaDataBytes []byte
var requestBytes []byte
metaDataBytes, err = b.serialization.Encode(metaData)
if err != nil {return "", err}
requestBytes, err = b.serialization.Encode(request)
if err != nil {return "", err}
// 1.2 加密
metaDataBytes, err = cryptology.AESEncrypt(b.aesKey, metaDataBytes)
if err != nil {return "", err}
requestBytes, err = cryptology.AESEncrypt(b.aesKey, requestBytes)
if err != nil {return "", err}
compressorType := b.options.compressorType
if len(metaDataBytes) > compressorMin && len(metaDataBytes) < compressorMax {
// 1.3 压缩
metaDataBytes, err = b.compressor.Zip(metaDataBytes)
if err != nil {return "", err}
requestBytes, err = b.compressor.Zip(requestBytes)
if err != nil {return "", err}
} else {compressorType = codes.RawData}
// 1.4 封装音讯
magic, message, err := protocol.EncodeMessage("", serviceNameByte, serviceMethodByte, metaDataBytes, byte(protocol.Request), byte(compressorType), byte(b.options.serializationType), requestBytes)
if err != nil {return "", err}
// 2. 发送音讯
if b.options.writeTimeout > 0 {now := time.Now()
timeout := ctx.GetTimeout() // 如果 ctx 存在设置 则采纳 返之应用默认配置
if timeout > 0 {b.conn.SetDeadline(now.Add(timeout))
b.conn.SetWriteDeadline(now.Add(timeout))
} else {b.conn.SetDeadline(now.Add(b.options.writeTimeout))
b.conn.SetWriteDeadline(now.Add(b.options.writeTimeout))
}
}
// 写 MAP
b.respInterRM.Lock()
b.respInterMap[magic] = &respMessage{
response: response,
ctx: ctx,
respChan: respChan,
}
b.respInterRM.Unlock()
// 有点暴力呀 间接上锁
b.writeMu.Lock()
_, err = b.conn.Write(message)
b.writeMu.Unlock()
if err != nil {
if b.options.Trace {log.Println(err)
}
b.err = err
return "", errors.WithStack(err)
}
return magic, nil
}