共计 9482 个字符,预计需要花费 24 分钟才能阅读完成。
前言
通过上两篇的学习 咱们曾经理解了 服务端本地服务的注册, 服务端配置, 协定 当初咱们开始写服务端的外围逻辑
https://github.com/dollarkill…
默认配置
咱们先看下默认的配置
func defaultOptions() *Options { | |
return &Options{ | |
Protocol: transport.TCP, // default TCP | |
Uri: "0.0.0.0:8397", | |
UseHttp: false, | |
readTimeout: time.Minute * 3, // 心跳包 默认 3min | |
writeTimeout: time.Second * 30, | |
ctx: context.Background(), // ctx 是管制服务退出的 | |
options: map[string]interface{}{"TCPKeepAlivePeriod": time.Minute * 3,}, | |
processChanSize: 1000, | |
Trace: false, | |
RSAPublicKey: []byte(`-----BEGIN PUBLIC KEY----- | |
-----END PUBLIC KEY-----`), | |
RSAPrivateKey: []byte(`-----BEGIN RSA PRIVATE KEY----- | |
-----END RSA PRIVATE KEY-----`), | |
Discovery: &discovery.SimplePeerToPeer{},} | |
} |
run
服务注册结束之后 调用 Run 办法 启动服务
func (s *Server) Run(options ...Option) error { | |
// 初始化 服务端配置 | |
for _, fn := range options {fn(s.options) | |
} | |
var err error | |
// 更具配置传入的 protocol 获取到 网络插件 (KCP UDP TCP) 咱们等下细讲 | |
s.options.nl, err = transport.Transport.Gen(s.options.Protocol, s.options.Uri) | |
if err != nil {return err} | |
log.Printf("LightRPC: %s %s \n", s.options.Protocol, s.options.Uri) | |
// 这里是服务注册 咱们这里先跳过 | |
if s.options.Discovery != nil { | |
// 读取服务配置文件 | |
sIdb, err := ioutil.ReadFile("./light.conf") | |
if err != nil { | |
// 如果没有 就生成 分布式 ID | |
id, err := utils.DistributedID() | |
if err != nil {return err} | |
sIdb = []byte(id) | |
} | |
// 进行服务注册 | |
sId := string(sIdb) | |
for k := range s.serviceMap { // 进行服务注册 | |
err := s.options.Discovery.Registry(k, s.options.registryAddr, s.options.weights, s.options.Protocol, s.options.MaximumLoad, &sId) | |
if err != nil {return err} | |
log.Printf("Discovery Registry: %s addr: %s SUCCESS", k, s.options.registryAddr) | |
} | |
ioutil.WriteFile("./light.conf", sIdb, 00666) | |
} | |
// 启动服务 | |
return s.run()} | |
func (s *Server) run() error { | |
loop: | |
for { | |
select {case <-s.options.ctx.Done(): // 查看是否须要退出服务 | |
break loop | |
default: | |
accept, err := s.options.nl.Accept() // 获取一个链接 | |
if err != nil {log.Println(err) | |
continue | |
} | |
if s.options.Trace {log.Println("connect:", accept.RemoteAddr()) | |
} | |
go s.process(accept) // 开一个协程去解决 该 链接 | |
} | |
} | |
return nil | |
} |
咱们先回顾一下 上章讲的 握手逻辑
- 建设链接 通过非对称加密 传输 aes 密钥给服务端 (携带 token)
- 服务端 验证 token 并记录 aes 密钥 前面与客户端交互 都采纳对称加密
具体解决 链接 process (重点!!!)
func (s *Server) process(conn net.Conn) {defer func() { | |
// 网络不牢靠 | |
if err := recover(); err != nil {utils.PrintStack() | |
log.Println("Recover Err:", err) | |
} | |
}() | |
// 每进来一个申请这里就 ADD | |
s.options.Discovery.Add(1) | |
defer func() {s.options.Discovery.Less(1) // 解决完 申请就退出 | |
// 退出 回收句柄 | |
err := conn.Close() | |
if err != nil {log.Println(err) | |
return | |
} | |
if s.options.Trace {log.Println("close connect:", conn.RemoteAddr()) | |
} | |
}() | |
// 这里定义一个 xChannel 用于拆散 申请和返回 | |
xChannel := utils.NewXChannel(s.options.processChanSize) | |
// 握手 | |
handshake := protocol.Handshake{} | |
err := handshake.Handshake(conn) | |
if err != nil {return} | |
// 非对称加密 解密 AES KEY | |
aesKey, err := cryptology.RsaDecrypt(handshake.Key, s.options.RSAPrivateKey) | |
if err != nil {encodeHandshake := protocol.EncodeHandshake([]byte(""), []byte(""), []byte(err.Error())) | |
conn.Write(encodeHandshake) | |
return | |
} | |
// 检测 AES KEY 是否正确 | |
if len(aesKey) != 32 && len(aesKey) != 16 {encodeHandshake := protocol.EncodeHandshake([]byte(""), []byte(""), []byte("aes key != 32 && key != 16")) | |
conn.Write(encodeHandshake) | |
return | |
} | |
// 解密 TOKEN | |
token, err := cryptology.RsaDecrypt(handshake.Token, s.options.RSAPrivateKey) | |
if err != nil {encodeHandshake := protocol.EncodeHandshake([]byte(""), []byte(""), []byte(err.Error())) | |
conn.Write(encodeHandshake) | |
return | |
} | |
// 对 TOKEN 进行校验 | |
if s.options.AuthFunc != nil {err := s.options.AuthFunc(light.DefaultCtx(), string(token)) | |
if err != nil {encodeHandshake := protocol.EncodeHandshake([]byte(""), []byte(""), []byte(err.Error())) | |
conn.Write(encodeHandshake) | |
return | |
} | |
} | |
// limit 限流 | |
if s.options.Discovery.Limit() { | |
// 熔断 | |
encodeHandshake := protocol.EncodeHandshake([]byte(""), []byte(""), []byte(pkg.ErrCircuitBreaker.Error())) | |
conn.Write(encodeHandshake) | |
log.Println(s.options.Discovery.Limit()) | |
return | |
} | |
// 如果握手没有问题 则返回握手胜利 | |
encodeHandshake := protocol.EncodeHandshake([]byte(""), []byte(""), []byte("")) | |
_, err = conn.Write(encodeHandshake) | |
if err != nil {return} | |
// send | |
go func() { | |
loop: | |
for { | |
select { | |
// 这就是刚刚的 xChannel 对读写进行拆散 | |
case msg, ex := <-xChannel.Ch: | |
if !ex { | |
if s.options.Trace {log.Printf("ip: %s close send server", conn.RemoteAddr()) | |
} | |
break loop | |
} | |
now := time.Now() | |
if s.options.writeTimeout > 0 {conn.SetWriteDeadline(now.Add(s.options.writeTimeout)) | |
} | |
// send message | |
_, err := conn.Write(msg) | |
if err != nil { | |
if s.options.Trace {log.Printf("ip: %s err: %s", conn.RemoteAddr(), err) | |
} | |
break loop | |
} | |
} | |
} | |
}() | |
defer func() {xChannel.Close() | |
}() | |
loop: | |
for { // 具体音讯获取 | |
now := time.Now() | |
if s.options.readTimeout > 0 {conn.SetReadDeadline(now.Add(s.options.readTimeout)) | |
} | |
proto := protocol.NewProtocol() | |
msg, err := proto.IODecode(conn) // 获取一个音讯 | |
if err != nil { | |
if err == io.EOF { | |
if s.options.Trace {log.Printf("ip: %s close", conn.RemoteAddr()) | |
} | |
break loop | |
} | |
// 遇到谬误敞开链接 | |
if s.options.Trace {log.Printf("ip: %s err: %s", conn.RemoteAddr(), err) | |
} | |
break loop | |
} | |
go s.processResponse(xChannel, msg, conn.RemoteAddr().String(), aesKey) | |
} | |
} |
具体解决(重点!!!)
留神此 RPC 传输音讯都是编码过的 要进行转码
- 第一层 为压缩编码
- 第二层 为加密编码
- 第三层 为序列化
func (s *Server) processResponse(xChannel *utils.XChannel, msg *protocol.Message, addr string, aesKey []byte) { | |
var err error | |
s.options.Discovery.Add(1) | |
defer func() {s.options.Discovery.Less(1) | |
if err != nil { | |
if s.options.Trace {log.Println("ProcessResponse Error:", err, "ID:", addr) | |
} | |
xChannel.Close()} | |
}() | |
// heartBeat 判断 | |
if msg.Header.RespType == byte(protocol.HeartBeat) { | |
// 心跳返回 | |
if s.options.Trace {log.Println("HeartBeat:", addr) | |
} | |
// 4. 打包 | |
_, message, err := protocol.EncodeMessage(msg.MagicNumber, []byte(msg.ServiceName), []byte(msg.ServiceMethod), []byte(""), byte(protocol.HeartBeat), msg.Header.CompressorType, msg.Header.SerializationType, []byte("")) | |
if err != nil {return} | |
// 5. 回写 | |
err = xChannel.Send(message) | |
if err != nil {return} | |
return | |
} | |
// 限流 | |
if s.options.Discovery.Limit() {serialization, _ := codes.SerializationManager.Get(codes.MsgPack) | |
metaData := make(map[string]string) | |
metaData["RespError"] = pkg.ErrCircuitBreaker.Error() | |
meta, err := serialization.Encode(metaData) | |
if err != nil {return} | |
decrypt, err := cryptology.AESDecrypt(aesKey, meta) | |
if err != nil {return} | |
_, message, err := protocol.EncodeMessage(msg.MagicNumber, []byte(msg.ServiceName), []byte(msg.ServiceMethod), decrypt, byte(protocol.Response), byte(codes.RawData), byte(codes.MsgPack), []byte("")) | |
if err != nil {return} | |
// 5. 回写 | |
err = xChannel.Send(message) | |
if err != nil {return} | |
log.Println(s.options.Discovery.Limit()) | |
log.Println("限流 /////////////") | |
return | |
} | |
// 1. 解压缩 | |
compressor, ex := codes.CompressorManager.Get(codes.CompressorType(msg.Header.CompressorType)) | |
if !ex {err = errors.New("compressor 404") | |
return | |
} | |
msg.MetaData, err = compressor.Unzip(msg.MetaData) | |
if err != nil {return} | |
msg.Payload, err = compressor.Unzip(msg.Payload) | |
if err != nil {return} | |
// 2. 解密 | |
msg.MetaData, err = cryptology.AESDecrypt(aesKey, msg.MetaData) | |
if err != nil {return} | |
msg.Payload, err = cryptology.AESDecrypt(aesKey, msg.Payload) | |
if err != nil {return} | |
// 3. 反序列化 | |
serialization, ex := codes.SerializationManager.Get(codes.SerializationType(msg.Header.SerializationType)) | |
if !ex {err = errors.New("serialization 404") | |
return | |
} | |
metaData := make(map[string]string) | |
err = serialization.Decode(msg.MetaData, &metaData) | |
if err != nil {return} | |
// 初始化 context | |
ctx := light.DefaultCtx() | |
ctx.SetMetaData(metaData) | |
// 1.3 auth | |
if s.options.AuthFunc != nil {auth := metaData["Light_AUTH"] | |
err := s.options.AuthFunc(ctx, auth) | |
if err != nil {ctx.SetValue("RespError", err.Error()) | |
var metaDataByte []byte | |
metaDataByte, _ = serialization.Encode(ctx.GetMetaData()) | |
metaDataByte, _ = cryptology.AESEncrypt(aesKey, metaDataByte) | |
metaDataByte, _ = compressor.Zip(metaDataByte) | |
// 4. 打包 | |
_, message, err := protocol.EncodeMessage(msg.MagicNumber, []byte(msg.ServiceName), []byte(msg.ServiceMethod), metaDataByte, byte(protocol.Response), msg.Header.CompressorType, msg.Header.SerializationType, []byte("")) | |
if err != nil {return} | |
// 5. 回写 | |
err = xChannel.Send(message) | |
if err != nil {return} | |
return | |
} | |
} | |
// 找到具体调用的服务 | |
ser, ex := s.serviceMap[msg.ServiceName] | |
if !ex {err = errors.New("service does not exist") | |
return | |
} | |
// 找到具体调用的办法 | |
method, ex := ser.methodType[msg.ServiceMethod] | |
if !ex {err = errors.New("method does not exist") | |
return | |
} | |
// 初始化 req, resp | |
req := utils.RefNew(method.RequestType) | |
resp := utils.RefNew(method.ResponseType) | |
err = serialization.Decode(msg.Payload, req) | |
if err != nil {return} | |
// 定义 ctx paht 为 服务名称. 服务办法 | |
path := fmt.Sprintf("%s.%s", msg.ServiceName, msg.ServiceMethod) | |
ctx.SetPath(path) | |
// 前置 middleware | |
if len(s.beforeMiddleware) != 0 { | |
for idx := range s.beforeMiddleware {err := s.beforeMiddleware[idx](ctx, req, resp) | |
if err != nil {return} | |
} | |
} | |
funcs, ex := s.beforeMiddlewarePath[path] | |
if ex {if len(funcs) != 0 { | |
for idx := range funcs {err := funcs[idx](ctx, req, resp) | |
if err != nil {return} | |
} | |
} | |
} | |
// 外围调用 | |
callErr := ser.call(ctx, method, reflect.ValueOf(req), reflect.ValueOf(resp)) | |
if callErr != nil {ctx.SetValue("RespError", callErr.Error()) | |
} | |
// 后置 middleware | |
if len(s.afterMiddleware) != 0 { | |
for idx := range s.afterMiddleware {err := s.afterMiddleware[idx](ctx, req, resp) | |
if err != nil {return} | |
} | |
} | |
funcs, ex = s.afterMiddlewarePath[path] | |
if ex {if len(funcs) != 0 { | |
for idx := range funcs {err := funcs[idx](ctx, req, resp) | |
if err != nil {return} | |
} | |
} | |
} | |
// response | |
// 1. 序列化 | |
var respBody []byte | |
respBody, err = serialization.Encode(resp) | |
var metaDataByte []byte | |
metaDataByte, _ = serialization.Encode(ctx.GetMetaData()) | |
// 2. 加密 | |
metaDataByte, err = cryptology.AESEncrypt(aesKey, metaDataByte) | |
if err != nil {return} | |
respBody, err = cryptology.AESEncrypt(aesKey, respBody) | |
if err != nil {return} | |
// 3. 压缩 | |
metaDataByte, err = compressor.Zip(metaDataByte) | |
if err != nil {return} | |
respBody, err = compressor.Zip(respBody) | |
if err != nil {return} | |
// 4. 打包 | |
_, message, err := protocol.EncodeMessage(msg.MagicNumber, []byte(msg.ServiceName), []byte(msg.ServiceMethod), metaDataByte, byte(protocol.Response), msg.Header.CompressorType, msg.Header.SerializationType, respBody) | |
if err != nil {return} | |
// 5. 回写 | |
err = xChannel.Send(message) | |
if err != nil {return} | |
} |
调用具体方法
func (s *service) call(ctx *light.Context, mType *methodType, request, response reflect.Value) (err error) { | |
// recover 捕捉堆栈音讯 | |
defer func() {if r := recover(); r != nil {buf := make([]byte, 4096) | |
n := runtime.Stack(buf, false) | |
buf = buf[:n] | |
err = fmt.Errorf("[painc service internal error]: %v, method: %s, argv: %+v, stack: %s", | |
r, mType.method.Name, request.Interface(), buf) | |
log.Println(err) | |
} | |
}() | |
fn := mType.method.Func | |
returnValue := fn.Call([]reflect.Value{s.refVal, reflect.ValueOf(ctx), request, response}) | |
errInterface := returnValue[0].Interface() | |
if errInterface != nil {return errInterface.(error) | |
} | |
return nil | |
} |
这里就实现了服务端的根底逻辑了
正文完