关于后端:grpc服务发现负载均衡

2次阅读

共计 8651 个字符,预计需要花费 22 分钟才能阅读完成。

构建高可用、高性能的通信服务,通常采纳服务注册与发现、负载平衡和容错解决等机制实现。依据负载平衡实现所在的地位不同,通常可分为以下三种解决方案:

1、集中式 LB(Proxy Model)

在服务消费者和服务提供者之间有一个独立的 LB,通常是专门的硬件设施如 F5,或者基于软件如 LVS,HAproxy 等实现。LB 上有所有服务的地址映射表,通常由运维配置注册,当服务生产方调用某个指标服务时,它向 LB 发动申请,由 LB 以某种策略,比方轮询(Round-Robin)做负载平衡后将申请转发到指标服务。LB 个别具备健康检查能力,能主动摘除不衰弱的服务实例。该计划次要问题:

  1. 单点问题,所有服务调用流量都通过 LB,当服务数量和调用量大的时候,LB 容易成为瓶颈,且一旦 LB 产生故障影响整个零碎;
  2. 服务生产方、提供方之间减少了一级,有肯定性能开销。

2、过程内 LB(Balancing-aware Client)

针对第一个计划的有余,此计划将 LB 的性能集成到服务生产方过程里,也被称为软负载或者客户端负载计划。服务提供方启动时,首先将服务地址注册到服务注册表,同时定期报心跳到服务注册表以表明服务的存活状态,相当于健康检查,服务生产方要拜访某个服务时,它通过内置的 LB 组件向服务注册表查问,同时缓存并定期刷新指标服务地址列表,而后以某种负载平衡策略抉择一个指标服务地址,最初向指标服务发动申请。LB 和服务发现能力被扩散到每一个服务消费者的过程外部,同时服务生产方和服务提供方之间是间接调用,没有额定开销,性能比拟好。该计划次要问题:

  1. 开发成本,该计划将服务调用方集成到客户端的过程外头,如果有多种不同的语言栈,就要配合开发多种不同的客户端,有肯定的研发和保护老本;
  2. 另外生产环境中,后续如果要对客户库进行降级,势必要求服务调用方批改代码并从新公布,降级较简单。

3、独立 LB 过程(External Load Balancing Service)

该计划是针对第二种计划的有余而提出的一种折中计划,原理和第二种计划根本相似。
不同之处是将 LB 和服务发现性能从过程内移出来,变成主机上的一个独立过程。主机上的一个或者多个服务要拜访指标服务时,他们都通过同一主机上的独立 LB 过程做服务发现和负载平衡。该计划也是一种分布式计划没有单点问题,一个 LB 过程挂了只影响该主机上的服务调用方,服务调用方和 LB 之间是过程内调用性能好,同时该计划还简化了服务调用方,不须要为不同语言开发客户库,LB 的降级不须要服务调用方改代码。
该计划次要问题:部署较简单,环节多,出错调试排查问题不不便。

gRPC 服务发现及负载平衡实现

gRPC 开源组件官网并未间接提供服务注册与发现的性能实现,但其设计文档已提供实现的思路,并在不同语言的 gRPC 代码 API 中已提供了命名解析和负载平衡接口供扩大。

其根本实现原理:

  1. 服务启动后 gRPC 客户端向命名服务器收回名称解析申请,名称将解析为一个或多个 IP 地址,每个 IP 地址标示它是服务器地址还是负载均衡器地址,以及标示要应用那个客户端负载平衡策略或服务配置。
  2. 客户端实例化负载平衡策略,如果解析返回的地址是负载均衡器地址,则客户端将应用 grpclb 策略,否则客户端应用服务配置申请的负载平衡策略。
  3. 负载平衡策略为每个服务器地址创立一个子通道(channel)。
  4. 当有 rpc 申请时,负载平衡策略决定那个子通道即 grpc 服务器将接管申请,当可用服务器为空时客户端的申请将被阻塞。
    依据 gRPC 官网提供的设计思路,基于过程内 LB 计划(即第 2 个案,阿里开源的服务框架 Dubbo 也是采纳相似机制),联合分布式统一的组件(如 Zookeeper、Consul、Etcd),可找到 gRPC 服务发现和负载平衡的可行解决方案。接下来以 GO 语言为例,简略介绍下基于 Etcd3 的要害代码实现:

1)命名解析实现:resolver.go
resolver.go

package etcdv3
import (
    "errors"
    "fmt"
    "strings"
    etcd3 "github.com/coreos/etcd/clientv3"
    "google.golang.org/grpc/naming"
)
// resolver is the implementaion of grpc.naming.Resolver
type resolver struct {serviceName string // service name to resolve}
// NewResolver return resolver with service name
func NewResolver(serviceName string) *resolver {return &resolver{serviceName: serviceName}
}
// Resolve to resolve the service from etcd, target is the dial address of etcd
// target example: "http://127.0.0.1:2379,http://127.0.0.1:12379,http://127.0.0.1:22379"
func (re *resolver) Resolve(target string) (naming.Watcher, error) {
    if re.serviceName == "" {return nil, errors.New("grpclb: no service name provided")
    }
    // generate etcd client
    client, err := etcd3.New(etcd3.Config{Endpoints: strings.Split(target, ","),
    })
    if err != nil {return nil, fmt.Errorf("grpclb: creat etcd3 client failed: %s", err.Error())
    }
    // Return watcher
    return &watcher{re: re, client: *client}, nil
}

2)服务发现实现:watcher.go
watcher.go

package etcdv3
import (
    "fmt"
    etcd3 "github.com/coreos/etcd/clientv3"
    "golang.org/x/net/context"
    "google.golang.org/grpc/naming"
    "github.com/coreos/etcd/mvcc/mvccpb"
)
// watcher is the implementaion of grpc.naming.Watcher
type watcher struct {
    re            *resolver // re: Etcd Resolver
    client        etcd3.Client
    isInitialized bool
}
// Close do nothing
func (w *watcher) Close() {}
// Next to return the updates
func (w *watcher) Next() ([]*naming.Update, error) {
    // prefix is the etcd prefix/value to watch
    prefix := fmt.Sprintf("/%s/%s/", Prefix, w.re.serviceName)
    // check if is initialized
    if !w.isInitialized {
        // query addresses from etcd
        resp, err := w.client.Get(context.Background(), prefix, etcd3.WithPrefix())
        w.isInitialized = true
        if err == nil {addrs := extractAddrs(resp)
            //if not empty, return the updates or watcher new dir
            if l := len(addrs); l != 0 {updates := make([]*naming.Update, l)
                for i := range addrs {updates[i] = &naming.Update{Op: naming.Add, Addr: addrs[i]}
                }
                return updates, nil
            }
        }
    }
    // generate etcd Watcher
    rch := w.client.Watch(context.Background(), prefix, etcd3.WithPrefix())
    for wresp := range rch {
        for _, ev := range wresp.Events {
            switch ev.Type {
            case mvccpb.PUT:
                return []*naming.Update{ {Op: naming.Add, Addr: string(ev.Kv.Value)} }, nil
            case mvccpb.DELETE:
                return []*naming.Update{ {Op: naming.Delete, Addr: string(ev.Kv.Value)} }, nil
            }
        }
    }
    return nil, nil
}
func extractAddrs(resp *etcd3.GetResponse) []string {addrs := []string{}
    if resp == nil || resp.Kvs == nil {return addrs}
    for i := range resp.Kvs {if v := resp.Kvs[i].Value; v != nil {addrs = append(addrs, string(v))
        }
    }
    return addrs
}

3)服务注册实现:register.go
register.go

package etcdv3
import (
    "fmt"
    "log"
    "strings"
    "time"
    etcd3 "github.com/coreos/etcd/clientv3"
    "golang.org/x/net/context"
    "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
)
// Prefix should start and end with no slash
var Prefix = "etcd3_naming"
var client etcd3.Client
var serviceKey string
var stopSignal = make(chan bool, 1)
// Register
func Register(name string, host string, port int, target string, interval time.Duration, ttl int) error {serviceValue := fmt.Sprintf("%s:%d", host, port)
    serviceKey = fmt.Sprintf("/%s/%s/%s", Prefix, name, serviceValue)
    // get endpoints for register dial address
    var err error
    client, err := etcd3.New(etcd3.Config{Endpoints: strings.Split(target, ","),
    })
    if err != nil {return fmt.Errorf("grpclb: create etcd3 client failed: %v", err)
    }
    go func() {
        // invoke self-register with ticker
        ticker := time.NewTicker(interval)
        for {
            // minimum lease TTL is ttl-second
            resp, _ := client.Grant(context.TODO(), int64(ttl))
            // should get first, if not exist, set it
            _, err := client.Get(context.Background(), serviceKey)
            if err != nil {
                if err == rpctypes.ErrKeyNotFound {if _, err := client.Put(context.TODO(), serviceKey, serviceValue, etcd3.WithLease(resp.ID)); err != nil {log.Printf("grpclb: set service'%s'with ttl to etcd3 failed: %s", name, err.Error())
                    }
                } else {log.Printf("grpclb: service'%s'connect to etcd3 failed: %s", name, err.Error())
                }
            } else {
                // refresh set to true for not notifying the watcher
                if _, err := client.Put(context.Background(), serviceKey, serviceValue, etcd3.WithLease(resp.ID)); err != nil {log.Printf("grpclb: refresh service'%s'with ttl to etcd3 failed: %s", name, err.Error())
                }
            }
            select {
            case <-stopSignal:
                return
            case <-ticker.C:
            }
        }
    }()
    return nil
}
// UnRegister delete registered service from etcd
func UnRegister() error {
    stopSignal <- true
    stopSignal = make(chan bool, 1) // just a hack to avoid multi UnRegister deadlock
    var err error;
    if _, err := client.Delete(context.Background(), serviceKey); err != nil {log.Printf("grpclb: deregister'%s'failed: %s", serviceKey, err.Error())
    } else {log.Printf("grpclb: deregister'%s'ok.", serviceKey)
    }
    return err
}

4)接口形容文件:helloworld.proto
helloworld.proto

syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.midea.jr.test.grpc";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";
package helloworld;
// The greeting service definition.
service Greeter {
    //   Sends a greeting
    rpc SayHello (HelloRequest) returns (HelloReply) {}}
// The request message containing the user's name.
message HelloRequest {string name = 1;}
// The response message containing the greetings
message HelloReply {string message = 1;}

5)实现服务端接口:helloworldserver.go
helloworldserver.go

package main
import (
    "flag"
    "fmt"
    "log"
    "net"
    "os"
    "os/signal"
    "syscall"
    "time"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
    grpclb "com.midea/jr/grpclb/naming/etcd/v3"
    "com.midea/jr/grpclb/example/pb"
)
var (serv = flag.String("service", "hello_service", "service name")
    port = flag.Int("port", 50001, "listening port")
    reg = flag.String("reg", "http://127.0.0.1:2379", "register etcd address")
)
func main() {flag.Parse()
    lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", *port))
    if err != nil {panic(err)
    }
    err = grpclb.Register(*serv, "127.0.0.1", *port, *reg, time.Second*10, 15)
    if err != nil {panic(err)
    }
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
    go func() {
        s := <-ch
        log.Printf("receive signal'%v'", s)
        grpclb.UnRegister()
        os.Exit(1)
    }()
    log.Printf("starting hello service at %d", *port)
    s := grpc.NewServer()
    pb.RegisterGreeterServer(s, &server{})
    s.Serve(lis)
}
// server is used to implement helloworld.GreeterServer.
type server struct{}
// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {fmt.Printf("%v: Receive is %s\n", time.Now(), in.Name)
    return &pb.HelloReply{Message: "Hello" + in.Name}, nil
}

6)实现客户端接口:helloworldclient.go
helloworldclient.go

package main
import (
    "flag"
    "fmt"
    "time"
    grpclb "com.midea/jr/grpclb/naming/etcd/v3"
    "com.midea/jr/grpclb/example/pb"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
    "strconv"
)
var (serv = flag.String("service", "hello_service", "service name")
    reg = flag.String("reg", "http://127.0.0.1:2379", "register etcd address")
)
func main() {flag.Parse()
    r := grpclb.NewResolver(*serv)
    b := grpc.RoundRobin(r)
    ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
    conn, err := grpc.DialContext(ctx, *reg, grpc.WithInsecure(), grpc.WithBalancer(b))
    if err != nil {panic(err)
    }
    ticker := time.NewTicker(1 * time.Second)
    for t := range ticker.C {client := pb.NewGreeterClient(conn)
        resp, err := client.SayHello(context.Background(), &pb.HelloRequest{Name: "world" + strconv.Itoa(t.Second())})
        if err == nil {fmt.Printf("%v: Reply is %s\n", t, resp.Message)
        }
    }
}

7)运行测试

1、运行 3 个服务端 S1、S2、S3,1 个客户端 C,察看各服务端接管的申请数是否相等?

2、敞开 1 个服务端 S1,察看申请是否会转移到另外 2 个服务端?

3、重新启动 S1 服务端,察看另外 2 个服务端申请是否会平均分配到 S1?

4、敞开 Etcd3 服务器,察看客户端与服务端通信是否失常?敞开通信依然失常,但新服务端不会注册进来,服务端掉线了也无奈摘除掉。

5、重新启动 Etcd3 服务器,服务端高低线可主动恢复正常。

6、敞开所有服务端,客户端申请将被阻塞。

参考
http://www.grpc.io/docs/
https://github.com/grpc/grpc/…

原文地址 https://segmentfault.com/a/11…

关注 获取更多好文

本文由 mdnice 多平台公布

正文完
 0