前言
后面几章咱们实现了 服务端的编写 当初开始客户端编写
https://github.com/dollarkill...
Client
type Client struct { options *Options}func NewClient(discover discovery.Discovery, options ...Option) *Client { client := &Client{ options: defaultOptions(), } client.options.Discovery = discover for _, fn := range options { fn(client.options) } return client}
option
type Options struct { Discovery discovery.Discovery // 服务发现插件 loadBalancing load_banlancing.LoadBalancing // 负载平衡插件 serializationType codes.SerializationType // 序列化插件 compressorType codes.CompressorType // 压缩插件 pool int // 连接池大小 cryptology cryptology.Cryptology rsaPublicKey []byte writeTimeout time.Duration readTimeout time.Duration heartBeat time.Duration Trace bool AUTH string // AUTH TOKEN}func defaultOptions() *Options { defaultPoolSize := runtime.NumCPU() * 4 if defaultPoolSize < 20 { defaultPoolSize = 20 } return &Options{ pool: defaultPoolSize, serializationType: codes.MsgPack, compressorType: codes.Snappy, loadBalancing: load_banlancing.NewPolling(), cryptology: cryptology.AES, rsaPublicKey: []byte(`-----BEGIN PUBLIC KEY----------END PUBLIC KEY-----`), writeTimeout: time.Minute, readTimeout: time.Minute * 3, heartBeat: time.Minute, Trace: false, AUTH: "", }}
具体每个链接
type Connect struct { Client *Client pool *connectPool close chan struct{} serverName string}func (c *Client) NewConnect(serverName string) (conn *Connect, err error) { connect := &Connect{ Client: c, serverName: serverName, close: make(chan struct{}), } connect.pool, err = initPool(connect) return connect, err}
初始化连接池
func initPool(c *Connect) (*connectPool, error) { cp := &connectPool{ connect: c, pool: make(chan LightClient, c.Client.options.pool), } return cp, cp.initPool()}func (c *connectPool) initPool() error { hosts, err := c.connect.Client.options.Discovery.Discovery(c.connect.serverName) // 调用服务发现 查看 发现具体服务 if err != nil { return err } if len(hosts) == 0 { return errors.New(fmt.Sprintf("%s server 404", c.connect.serverName)) } c.connect.Client.options.loadBalancing.InitBalancing(hosts) // 初始化 负载平衡插件 // 初始化连接池 for i := 0; i < c.connect.Client.options.pool; i++ { client, err := newBaseClient(c.connect.serverName, c.connect.Client.options) // 建设链接 if err != nil { return errors.WithStack(err) } c.pool <- client } return nil}// 连接池中获取一个链接func (c *connectPool) Get(ctx context.Context) (LightClient, error) { select { case <-ctx.Done(): return nil, errors.New("pool get timeout") case r := <-c.pool: return r, nil }}// 放回一个链接func (c *connectPool) Put(client LightClient) { if client.Error() == nil { c.pool <- client return } // 如果 client.Error() 有异样 须要新初始化一个链接 放入连接池 go func() { fmt.Println("The server starts to restore") for { time.Sleep(time.Second) hosts, err := c.connect.Client.options.Discovery.Discovery(c.connect.serverName) if err != nil { log.Println(err) continue } if len(hosts) == 0 { err := errors.New(fmt.Sprintf("%s server 404", c.connect.serverName)) log.Println(err) continue } c.connect.Client.options.loadBalancing.InitBalancing(hosts) baseClient, err := newBaseClient(c.connect.serverName, c.connect.Client.options) if err != nil { log.Println(err) continue } c.pool <- baseClient fmt.Println("Service recovery success") break } }()}
Connect 调用具体服务
func (c *Connect) Call(ctx *light.Context, serviceMethod string, request interface{}, response interface{}) error { ctxT, _ := context.WithTimeout(context.TODO(), time.Second*6) var err error // 连接池中获取一个链接 client, err := c.pool.Get(ctxT) if err != nil { return errors.WithStack(err) } // 用完 放回链接 defer func() { c.pool.Put(client) }() // 设置token ctx.SetValue("Light_AUTH", c.Client.options.AUTH) // 具体调用 err = client.Call(ctx, serviceMethod, request, response) if err != nil { return errors.WithStack(err) } return nil}
调用外围 重点
温习 s03 协定设计
/** 协定设计 起始符 : 版本号 : crc32校验 : magicNumberSize: serverNameSize : serverMethodSize : metaDataSize : payloadSize: respType : compressorType : serializationType : magicNumber : serverName : serverMethod : metaData : payload 0x05 : 0x01 : 4 : 4 : 4 : 4 : 4 : 4 : 1 : 1 : 1 : xxx : xxx : xxx : xxx : xxx*/
留神: 每一个申请都有一个 magicNumber 都有一个申请ID
单个链接定义
type BaseClient struct { conn net.Conn options *Options serverName string aesKey []byte serialization codes.Serialization compressor codes.Compressor respInterMap map[string]*respMessage respInterRM sync.RWMutex // 返回构造锁 writeMu sync.Mutex // 写锁 err error // 谬误 close chan struct{} // 用于敞开服务}type respMessage struct { response interface{} ctx *light.Context respChan chan error}
初始化单个链接
func newBaseClient(serverName string, options *Options) (*BaseClient, error) { // 服务发现用 service, err := options.loadBalancing.GetService() if err != nil { return nil, err } con, err := transport.Client.Gen(service.Protocol, service.Addr) if err != nil { return nil, errors.WithStack(err) } serialization, ex := codes.SerializationManager.Get(options.serializationType) if !ex { return nil, pkg.ErrSerialization404 } compressor, ex := codes.CompressorManager.Get(options.compressorType) if !ex { return nil, pkg.ErrCompressor404 } // 握手 encrypt, err := cryptology.RsaEncrypt([]byte(options.AUTH), options.rsaPublicKey) if err != nil { return nil, err } aesKey := []byte(strings.ReplaceAll(uuid.New().String(), "-", "")) // 替换秘钥 aesKey2, err := cryptology.RsaEncrypt(aesKey, options.rsaPublicKey) if err != nil { return nil, err } handshake := protocol.EncodeHandshake(aesKey2, encrypt, []byte("")) _, err = con.Write(handshake) if err != nil { con.Close() return nil, err } hsk := &protocol.Handshake{} err = hsk.Handshake(con) if err != nil { con.Close() return nil, err } if hsk.Error != nil && len(hsk.Error) > 0 { con.Close() err := string(hsk.Error) return nil, errors.New(err) } bc := &BaseClient{ serverName: serverName, conn: con, options: options, serialization: serialization, compressor: compressor, respInterMap: map[string]*respMessage{}, aesKey: aesKey, close: make(chan struct{}), } go bc.heartBeat() // 心跳服务 go bc.processMessageManager() // 返回音讯的解决 return bc, nil}
heartBeat 心跳服务
func (b *BaseClient) heartBeat() { defer func() { fmt.Println("heartBeat Close") }()loop: for { select { case <-b.close: break loop case <-time.After(b.options.heartBeat): // 定时发送心跳 _, i, err := protocol.EncodeMessage("x", []byte(""), []byte(""), []byte(""), byte(protocol.HeartBeat), byte(b.options.compressorType), byte(b.options.serializationType), []byte("")) if err != nil { log.Println(err) break } now := time.Now() b.conn.SetDeadline(now.Add(b.options.writeTimeout)) b.conn.SetWriteDeadline(now.Add(b.options.writeTimeout)) b.writeMu.Lock() _, err = b.conn.Write(i) b.writeMu.Unlock() if err != nil { b.err = err break loop } } }}
processMessageManager 返回音讯的解决服务 (留神这里能够并发的来)
func (b *BaseClient) processMessageManager() { defer func() { fmt.Println("processMessageManager Close") }() for { magic, respChan, err := b.processMessage() // 解决某个音讯 if err == nil && magic == "" { continue } if err != nil && magic == "" { break } if err != nil && magic != "" && respChan != nil { respChan <- err } if err == nil && magic != "" && respChan != nil { close(respChan) } }}func (b *BaseClient) processMessage() (magic string, respChan chan error, err error) { // 3.封装回执 now := time.Now() b.conn.SetReadDeadline(now.Add(b.options.readTimeout)) proto := protocol.NewProtocol() msg, err := proto.IODecode(b.conn) if err != nil { b.err = err close(b.close) return "", nil, err } // heartbeat if msg.Header.RespType == byte(protocol.HeartBeat) { if b.options.Trace { log.Println("is HeartBeat") } return "", nil, nil } b.respInterRM.RLock() message, ex := b.respInterMap[msg.MagicNumber] b.respInterRM.RUnlock() if !ex { // 不存在 代表音讯曾经生效 if b.options.Trace { log.Println("Not Ex", msg.MagicNumber) } return "", nil, nil } comp, ex := codes.CompressorManager.Get(codes.CompressorType(msg.Header.CompressorType)) if !ex { return "", nil, nil } // 1. 解压缩 msg.MetaData, err = comp.Unzip(msg.MetaData) if err != nil { return "", nil, err } msg.Payload, err = comp.Unzip(msg.Payload) if err != nil { return "", nil, err } // 2. 解密 msg.MetaData, err = cryptology.AESDecrypt(b.aesKey, msg.MetaData) if err != nil { if len(msg.MetaData) != 0 { return "", nil, err } msg.Payload = []byte("") } msg.Payload, err = cryptology.AESDecrypt(b.aesKey, msg.Payload) if err != nil { if len(msg.Payload) != 0 { return "", nil, err } msg.Payload = []byte("") } // 3. 反序列化 RespError mtData := make(map[string]string) err = b.serialization.Decode(msg.MetaData, &mtData) if err != nil { return "", nil, err } message.ctx.SetMetaData(mtData) value := message.ctx.Value("RespError") if value != "" { return msg.MagicNumber, message.respChan, errors.New(value) } return msg.MagicNumber, message.respChan, b.serialization.Decode(msg.Payload, message.response)}
服务调用
func (b *BaseClient) call(ctx *light.Context, serviceMethod string, request interface{}, response interface{}, respChan chan error) (magic string, err error) { metaData := ctx.GetMetaData() // 获取ctx 进行根底编码 // 1. 结构申请 // 1.1 序列化 serviceNameByte := []byte(b.serverName) serviceMethodByte := []byte(serviceMethod) var metaDataBytes []byte var requestBytes []byte metaDataBytes, err = b.serialization.Encode(metaData) if err != nil { return "", err } requestBytes, err = b.serialization.Encode(request) if err != nil { return "", err } // 1.2 加密 metaDataBytes, err = cryptology.AESEncrypt(b.aesKey, metaDataBytes) if err != nil { return "", err } requestBytes, err = cryptology.AESEncrypt(b.aesKey, requestBytes) if err != nil { return "", err } compressorType := b.options.compressorType if len(metaDataBytes) > compressorMin && len(metaDataBytes) < compressorMax { // 1.3 压缩 metaDataBytes, err = b.compressor.Zip(metaDataBytes) if err != nil { return "", err } requestBytes, err = b.compressor.Zip(requestBytes) if err != nil { return "", err } } else { compressorType = codes.RawData } // 1.4 封装音讯 magic, message, err := protocol.EncodeMessage("", serviceNameByte, serviceMethodByte, metaDataBytes, byte(protocol.Request), byte(compressorType), byte(b.options.serializationType), requestBytes) if err != nil { return "", err } // 2. 发送音讯 if b.options.writeTimeout > 0 { now := time.Now() timeout := ctx.GetTimeout() // 如果ctx 存在设置 则采纳 返之应用默认配置 if timeout > 0 { b.conn.SetDeadline(now.Add(timeout)) b.conn.SetWriteDeadline(now.Add(timeout)) } else { b.conn.SetDeadline(now.Add(b.options.writeTimeout)) b.conn.SetWriteDeadline(now.Add(b.options.writeTimeout)) } } // 写MAP b.respInterRM.Lock() b.respInterMap[magic] = &respMessage{ response: response, ctx: ctx, respChan: respChan, } b.respInterRM.Unlock() // 有点暴力呀 间接上锁 b.writeMu.Lock() _, err = b.conn.Write(message) b.writeMu.Unlock() if err != nil { if b.options.Trace { log.Println(err) } b.err = err return "", errors.WithStack(err) } return magic, nil}