关于go:goalng中netrpc的使用

一、根本应用形式阐明

// server/server.go
package main

import (
    "net"
    "net/rpc"
)

type Args struct {
    A, B int
}

type Calculator int

func (t *Calculator) Add(args *Args, reply *int) error {
    *reply = args.A + args.B
    return nil
}

func (t *Calculator) Sub(args *Args, reply *int) error {
    *reply = args.A - args.B
    return nil
}

func main() {
    // 1. 创立 rpc 服务端
    rpcServer := rpc.NewServer()

    // 2. 注册服务
    // 待注册的服务办法必须是公开的,2 个参数都是与 client 约定好的固定类型,且为指针;
    // 第 1 个为 client 提交的参数,第 2 个是给 client 的返回值。
    _ = rpcServer.Register(new(Calculator))

    // 3. 开启监听指定的公开端口(比方此处的 8090)
    l, _ := net.Listen("tcp", ":8090")

    // 4. 周而复始同 client 建设 tcp 连贯,并开启一个 goroutine 解决
    // 调用了 go server.ServeConn(conn)
    rpcServer.Accept(l)

    // 5. server.ServeConn(conn) 周而复始 接管申请、解决申请

    // 6. 解决单个申请时,必然是以 gob 压缩数据, gob_encode(header) + gob_encode(body)
    //  header 为固定的数据结构 rpc.Request{},内容含 ServiceMethod、Seq(会返回给 client,client 可能会并发申请,依据返回的 Seq 辨别是哪个申请)
    //  1) gob 先解析出固定构造的 header,
    //  2) 依据 header 中的 ServiceMethod 找到注册的服务,
    //  3) 依据找到的服务确定同 client 约定好的该服务的 body 构造(client 提交的参数),
    //  4) gob 依据 body 构造解析出申请参数信息,
    //  5) ServiceMethod + 参数,解决工作,实现后返回,
    //  6) 返回信息同样是 gob_encode(header) + gob_encode(body),header(rpc.Response{})中含 Seq,body 为同客户端约定好的返回构造,body为约定好的reply构造体
}

// client/client.go
package main

import (
    "fmt"
    "net"
    "net/rpc"
    "sync"
)

type Args struct {
    A, B int
}

func main() {
    // 1. 建设 tcp 连贯
    conn, _ := net.Dial("tcp", "127.0.0.1:8090")

    // 2. 依据 tcp 连贯创立 client
    //    同时开启一个 goroutine 周而复始读取 server 返回的后果
    //    server 返回依照 header(rpc.Response{}: ServiceName+Seq) + body(具体服务约定好的返回构造)
    //    此步骤因为还没有发出请求,临时不会读取到数据
    client := rpc.NewClient(conn)

    wg := &sync.WaitGroup{}
    wg.Add(2)

    // 3. client 能够并发发动申请
    //    然而因为应用了同一个 tcp 连贯,为了不相互影响,是排队写入的
    //    通过加锁,写入一个残缺的申请后[ header(rpc.Request{}: ServiceName+Seq) + body(具体的参数构造) ],再另外写入一个申请
    //    server 读取是依照约定,先读取 header,确定 service,再读取 body(具体的参数)
    go func() {
        args := &Args{100, 20}
        reply := new(int)
        // client.Call() 办法应用了 channel 进行阻塞,直到步骤 2 中的读取到 server 返回的数据
        _ = client.Call("Calculator.Add", args, reply)
        fmt.Printf("Calculator.Add: %d + %d = %d\n", args.A, args.B, *reply)
        wg.Done()
    }()

    go func() {
        args := &Args{100, 20}
        reply := new(int)
        _ = client.Call("Calculator.Sub", args, reply)
        fmt.Printf("Calculator.Sub: %d - %d = %d\n", args.A, args.B, *reply)
        wg.Done()
    }()

    wg.Wait()
}

$ cd path/server
$ go run ./server.go

$ cd path/client
$ go run ./client.go
Calculator.Sub: 100 - 20 = 80
Calculator.Add: 100 + 20 = 120

二、利用已有的 DefaultServer 及 “http 转 rpc”

net/rpc 包已有一个初始化好的 DefaultServer

且提供了有先通过 http 连贯转 rpc 连贯的办法。

// server/server.go
package main

import (
    "net/http"
    "net/rpc"
)

type Args struct {
    A, B int
}

type Calculator int

func (t *Calculator) Add(args *Args, reply *int) error {
    *reply = args.A + args.B
    return nil
}

func (t *Calculator) Sub(args *Args, reply *int) error {
    *reply = args.A - args.B
    return nil
}

func main() {
    // 1. 将 Calculator 服务注册至默认的 rpc 服务器 DefaultServer
    _ = rpc.Register(new(Calculator))
    // 2. DefaultServer 注册至默认的 http 服务器 DefaultServeMux
    //    其注册的 http 地址为 /_goRPC_
    //    当 http 服务器收到拜访地址 /_goRPC_ 的 http 申请时,会启动一个 goroutine 调用 DefaultServer.ServeHTTP() 解决 http 申请
    //    DefaultServer.ServeHTTP() 同 client 进行完一轮 http 申请后,不会开释以后 tcp 连贯,而是转为一般的 rpc 申请
    rpc.HandleHTTP()
    // 3. http 服务器开始监听 "端口 8090、地址 /_goRPC_" 的 http 申请,解决完 http 申请(相当于校验)后,转为 rpc 申请
    _ = http.ListenAndServe(":8090", nil)
}

// client/client.go
package main

import (
    "fmt"
    "net/rpc"
    "sync"
)

type Args struct {
    A, B int
}

func main() {
    // 1. 发送 http 申请至 "端口 8090、地址 /_goRPC_",
    //    等到 http 胜利返回并校验胜利,将其转为 rpc 申请,并创立 client 返回
    client, _ := rpc.DialHTTP("tcp", "127.0.0.1:8090")

    wg := &sync.WaitGroup{}
    wg.Add(2)
    go func() {
        args := &Args{100, 20}
        reply := new(int)
        // client.Call() 办法应用了 channel 进行阻塞,直到步骤 2 中的读取到 server 返回的数据
        _ = client.Call("Calculator.Add", args, reply)
        fmt.Printf("Calculator.Add: %d + %d = %d\n", args.A, args.B, *reply)
        wg.Done()
    }()

    go func() {
        args := &Args{100, 20}
        reply := new(int)
        _ = client.Call("Calculator.Sub", args, reply)
        fmt.Printf("Calculator.Sub: %d - %d = %d\n", args.A, args.B, *reply)
        wg.Done()
    }()

    wg.Wait()
}


$ cd path/server
$ go run ./server.go

$ cd path/client
$ go run ./client.go
Calculator.Sub: 100 - 20 = 80
Calculator.Add: 100 + 20 = 120

三、基于前述http转rpc,退出token权限校验

// server/server.go
package main

import (
    "io"
    "net/http"
    "net/rpc"
)

type Args struct {
    A, B int
}

type Calculator int

func (t *Calculator) Add(args *Args, reply *int) error {
    *reply = args.A + args.B
    return nil
}

func (t *Calculator) Sub(args *Args, reply *int) error {
    *reply = args.A - args.B
    return nil
}

func main() {
    addr := ":8090"
    requestURI := "/_custom_http_to_rpc"
    token := "bb"

    rpcServer := rpc.NewServer()
    _ = rpcServer.Register(new(Calculator))
    http.Handle(requestURI, http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
        head := request.Header
        if head.Get("token") != token {
            writer.WriteHeader(http.StatusForbidden)
            _, _ = io.WriteString(writer, "403 Forbidden\n")
            return
        }
        rpcServer.ServeHTTP(writer, request)
    }))
    _ = http.ListenAndServe(addr, nil)
}


// client/client.go
package main

import (
    "bufio"
    "errors"
    "fmt"
    "io"
    "net"
    "net/http"
    "net/rpc"
    "sync"
)

type Args struct {
    A, B int
}

func dialHTTPPath(network, address, path, token string) (*rpc.Client, error) {
    connected := "200 Connected to Go RPC"
    conn, err := net.Dial(network, address)
    if err != nil {
        return nil, err
    }
    _, _ = io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\nToken: "+token+"\n\n")

    // Require successful HTTP response
    // before switching to RPC protocol.
    resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
    if err == nil && resp.Status == connected {
        return rpc.NewClient(conn), nil
    }
    if err == nil {
        err = errors.New("unexpected HTTP response: " + resp.Status)
    }
    _ = conn.Close()
    return nil, &net.OpError{
        Op:   "dial-http",
        Net:  network + " " + address,
        Addr: nil,
        Err:  err,
    }
}

func main() {
    addr := "127.0.0.1:8090"
    requestURI := "/_custom_http_to_rpc"
    token := "bb"

    client, err := dialHTTPPath("tcp", addr, requestURI, token)
    if err != nil {
        fmt.Println("创立客户端失败", err)
        return
    }
    wg := &sync.WaitGroup{}
    wg.Add(2)
    go func() {
        args := &Args{100, 20}
        reply := new(int)
        _ = client.Call("Calculator.Add", args, reply)
        fmt.Printf("Calculator.Add: %d + %d = %d\n", args.A, args.B, *reply)
        wg.Done()
    }()

    go func() {
        args := &Args{100, 20}
        reply := new(int)
        _ = client.Call("Calculator.Sub", args, reply)
        fmt.Printf("Calculator.Sub: %d - %d = %d\n", args.A, args.B, *reply)
        wg.Done()
    }()

    wg.Wait()
}

$ cd path/server
$ go run ./server.go

$ cd path/client
$ go run ./client.go
Calculator.Sub: 100 - 20 = 80
Calculator.Add: 100 + 20 = 120

【腾讯云】轻量 2核2G4M,首年65元

阿里云限时活动-云数据库 RDS MySQL  1核2G配置 1.88/月 速抢

本文由乐趣区整理发布,转载请注明出处,谢谢。

您可能还喜欢...

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据