用consul做grpc的服务发现

32次阅读

共计 12304 个字符,预计需要花费 31 分钟才能阅读完成。

用 consul 做 grpc 的服务发现与健康检查
consul
服务发现与负载均衡
当 server 端是集群部署时,client 调用 server 就需要用到服务发现与负载均衡。通常有两总方式:

一种方式是在 client 与 server 之间加代理,由代理来做负载均衡
一种方式是将服务注册到一个数据中心,client 通过数据中心查询到所有服务的节点信息,然后自己选择负载均衡的策略。

第一种方式常见的就是用 nginx 给 http 服务做负载均衡,client 端不直接与 server 交互,而是把请求并给 nginx,nginx 再转给后端的服务。这种方式的优点是:
client 和 server 无需做改造,client 看不到 server 的集群,就像单点一样调用就可以
这种方式有几个缺点:

所有的请求都必须经过代理,代理侧容易出现性能瓶颈
代理不能出故障,一旦代理挂了服务就没法访问了。

第二种方式可以参考 dubbo 的 rpc 方式,所有的服务都注册在 zookeeper 上,client 端从 zookeeper 订阅 server 的列表,然后自己选择把请求发送到哪个 server 上。对于上面提到的两个缺点,这种方式都很好的避免了:

client 与 server 端是直接交互的,server 可以做任意的水平扩展,不会出现性能瓶颈
注册中心 (zookeeper) 通过 raft 算法实现分布式高可用,不用担心注册中心挂了服务信息丢失的情况。

这种方式的缺点就是实现起来比较复杂。
用第一种方式做 grpc 的负载均衡时可以有以下的选择:

nginx grpc
traefik grpc

用第二种方式时,可以选择的数据中心中间件有:

zookeeper

etcd Etcd 是 Kubernetes 集群中的一个十分重要的组件
consul

他们都实现了 raft 算法,都可以用来做注册中心,本篇文章选择 consul 是因为 consul 的特点就是做服务发现,有现成的 api 可以用。
用 consul 给 golang 的 grpc 做服务注册与发现
grpc 的 resolver
grpc 的 Dial()和 DialContent()方法中都可以添加 Load-Balance 的选项,Dial 方法已经被废弃了,本篇文章介绍使用 DialContext 的方法。
grpc 官方实现了 [dns_resolver]() 用来做 dns 的负载均衡。我们通过例子看看 grpc client 端的代码是怎么写的,然后再理解 dns_resolver 的源码,最后参照 dns_resolver 来写自己的 consul_resovler。
dns 的负载均衡的例子:
package main
import (
“context”
“log”
“google.golang.org/grpc”
“google.golang.org/grpc/balancer/roundrobin”
pb “google.golang.org/grpc/examples/helloworld/helloworld”
“google.golang.org/grpc/resolver”
)
const (
address = “dns:///dns-record-name:443”
defaultName = “world”
)
func main() {
// The secret sauce
resolver.SetDefaultScheme(“dns”)
// Set up a connection to the server.

ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)

conn, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
log.Fatalf(“did not connect: %v”, err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)
// Contact the servers in round-robin manner.
for i := 0; i < 3; i++ {
ctx := context.Background()
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: defaultName})
if err != nil {
log.Fatalf(“could not greet: %v”, err)
}
log.Printf(“Greeting: %s”, r.Message)
}
}
DialContext 的定义如下:
func DialContext(ctx context.Context, target string, opts …DialOption) (conn *ClientConn, err error)
下面这行代码指明了用 dns_resolver,实际上也可以不写,grpc 会根据 DialContext 的第二个参数 target 来判断选用哪个 resolver,例子中传给 DialContext 的 target 是 dns:///dns-record-name:443,grpc 会自动选择 dns_resolver
resolver.SetDefaultScheme(“dns”)

下面的这个选项,指明了 grpc 用轮询做为负载均衡的策略
grpc.WithBalancerName(roundrobin.Name)

调用 grpc.DialContext 之后,grpc 会找到对应的 resovler,拿到服务的地址列表,然后在调用服务提供的接口时,根据指定的轮询策略选择一个服务。
gRPC Name Resolution 里面说了,可以实现自定义的 resolver 作为插件。
先看看 resolver.go 的源码,源码路径是 $GOPATH/src/google.golang.org/grpc/resolver/resolver.go
m = make(map[string]Builder) //scheme 到 Builder 的 map

func Register(b Builder) {// 用于 resolver 注册的接口,dns_resolver.go 的 init 方中调用了这个方法, 实际就是更新了 map
m[b.Scheme()] = b
}

type Resolver interface {
ResolveNow(ResolveNowOption) // 立即 resolve,重新查询服务信息
Close() // 关闭这个 Resolver
}

type Target struct {//uri 解析之后的对象, uri 的格式详见 RFC3986
Scheme string
Authority string
Endpoint string
}

type Address struct {// 描述一个服务的地址信息
Addr string // 格式是 host:port
Type AddressType
ServerName string
Metadata interface{}
}

type ClientConn interface {// 定义了两个 callback 函数,用于通知服务信息的更新
NewAddress(addresses []Address)
NewServiceConfig(serviceConfig string)
}

type Builder interface {
Build(target Target, cc ClientConn, opts BuildOption) (Resolver, error) // 返回一个 Resolver
Scheme() string // 返回 scheme 如 “dns”, “passthrough”, “consul”
}

func Get(scheme string) Builder {//grpc.ClientConn 会高用这个方法获取指定的 Builder 接口的实例
if b, ok := m[scheme]; ok {
return b
}
return nil
}

即使加了注释,估计也很难马上理解这个其中的具体含意,博主也是结合 dns_resolver.go,反复读了好几遍才理解 resolver.go。其大致的意思是,grpc.DialContext 方法调用之后:

解析 target(例如 dns:///dns-record-name:443)获取 scheme
调用 resolver.Get 方法根据 scheme 拿到对应的 Builder

调用 Builder.Build 方法

解析 target
获取服务地址的信息
调用 ClientConn.NewAddress 和 NewServiceConfig 这两个 callback 把服务信息传递给上层的调用方
返回 Resolver 接口实例给上层

上层可以通过 Resolver.ResolveNow 方法主动刷新服务信息

了解了 resolver 源码的意思之后,再看一下 dns_resolver.go 就比较清晰了
// 注册一个 Builder 到 resolver 的 map 里面
// 这个方法会被默认调用,了解 go 的 init 可以自行百度
func init() {
resolver.Register(NewBuilder())
}

func NewBuilder() resolver.Builder {// 创建一个 resolver.Builder 的实例
return &dnsBuilder{minFreq: defaultFreq}
}

func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
// 解析 target 拿到 ip 和端口
host, port, err := parseTarget(target.Endpoint, defaultPort)
if err != nil {
return nil, err
}

// IP address.
if net.ParseIP(host) != nil {
host, _ = formatIP(host)
addr := []resolver.Address{{Addr: host + “:” + port}}
i := &ipResolver{
cc: cc,
ip: addr,
rn: make(chan struct{}, 1),
q: make(chan struct{}),
}
cc.NewAddress(addr)
go i.watcher()
return i, nil
}

// DNS address (non-IP).
ctx, cancel := context.WithCancel(context.Background())
d := &dnsResolver{
freq: b.minFreq,
backoff: backoff.Exponential{MaxDelay: b.minFreq},
host: host,
port: port,
ctx: ctx,
cancel: cancel,
cc: cc,
t: time.NewTimer(0),
rn: make(chan struct{}, 1),
disableServiceConfig: opts.DisableServiceConfig,
}

if target.Authority == “” {
d.resolver = defaultResolver
} else {
d.resolver, err = customAuthorityResolver(target.Authority)
if err != nil {
return nil, err
}
}

d.wg.Add(1)
go d.watcher()// 起一个 goroutine,因为 watcher 这个方法是个死循环,当定时器
return d, nil
}

func (d *dnsResolver) watcher() {
defer d.wg.Done()
for {
// 这个 select 没有 default,当没有 case 满足时会一直阻塞
// 结束阻塞的条件是定时器超时 d.t.C,或者 d.rn 这个 channel 中有数据可读
select {
case <-d.ctx.Done():
return
case <-d.t.C:
case <-d.rn:
}
result, sc := d.lookup()
// Next lookup should happen within an interval defined by d.freq. It may be
// more often due to exponential retry on empty address list.
if len(result) == 0 {
d.retryCount++
d.t.Reset(d.backoff.Backoff(d.retryCount))
} else {
d.retryCount = 0
d.t.Reset(d.freq)
}
//resolver.ClientConn 的两个 callback 的调用,实现服务信息传入上层
d.cc.NewServiceConfig(sc)
d.cc.NewAddress(result)
}
}

// 向 channel 中写入,用于结束 watcher 中那个 select 的阻塞状态,后面的代码就是重新查询服务信息的逻辑
func (i *ipResolver) ResolveNow(opt resolver.ResolveNowOption) {
select {
case i.rn <- struct{}{}:
default:
}
}

实现 consul_resovler
上面我们了解了 grpc 的 resolver 的机制,接下来实现 consul_resolver, 我们先把代码的架子搭起来
init() // 返回一个 resolver.Builder 的实例

// 实现 resolver.Builder 的接口中的所有方法就是一个 resolver.Builder
type consulBuidler strcut {
}

func (cb *consulBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
//TODO 解析 target, 拿到 consul 的 ip 和端口

//TODO 用 consul 的 go api 连接 consul,查询服务结点信息,并且调用 resolver.ClientConn 的两个 callback
}

func (cb *consulBuilder) Scheme() string {
return “consul”
}

//ResolverNow 方法什么也不做,因为和 consul 保持了发布订阅的关系
// 不需要像 dns_resolver 那个定时的去刷新
func (cr *consulResolver) ResolveNow(opt resolver.ResolveNowOption) {
}

// 暂时先什么也不做吧
func (cr *consulResolver) Close() {
}

现在来看,实现 consul_resolver.go 最大的问题就是怎么用 consul 提供的 go api 了,参考这篇文章就可以了,然后 consul_resolver.go 的代码就出来了
package consul

import (
“errors”
“fmt”
“github.com/hashicorp/consul/api”
“google.golang.org/grpc/resolver”
“regexp”
“sync”
)

const (
defaultPort = “8500”
)

var (
errMissingAddr = errors.New(“consul resolver: missing address”)

errAddrMisMatch = errors.New(“consul resolver: invalied uri”)

errEndsWithColon = errors.New(“consul resolver: missing port after port-separator colon”)

regexConsul, _ = regexp.Compile(“^([A-z0-9.]+)(:[0-9]{1,5})?/([A-z_]+)$”)
)

func Init() {
fmt.Printf(“calling consul init\n”)
resolver.Register(NewBuilder())
}

type consulBuilder struct {
}

type consulResolver struct {
address string
wg sync.WaitGroup
cc resolver.ClientConn
name string
disableServiceConfig bool
lastIndex uint64
}

func NewBuilder() resolver.Builder {
return &consulBuilder{}
}

func (cb *consulBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {

fmt.Printf(“calling consul build\n”)
fmt.Printf(“target: %v\n”, target)
host, port, name, err := parseTarget(fmt.Sprintf(“%s/%s”, target.Authority, target.Endpoint))
if err != nil {
return nil, err
}

cr := &consulResolver{
address: fmt.Sprintf(“%s%s”, host, port),
name: name,
cc: cc,
disableServiceConfig: opts.DisableServiceConfig,
lastIndex: 0,
}

cr.wg.Add(1)
go cr.watcher()
return cr, nil

}

func (cr *consulResolver) watcher() {
fmt.Printf(“calling consul watcher\n”)
config := api.DefaultConfig()
config.Address = cr.address
client, err := api.NewClient(config)
if err != nil {
fmt.Printf(“error create consul client: %v\n”, err)
return
}

for {
services, metainfo, err := client.Health().Service(cr.name, cr.name, true, &api.QueryOptions{WaitIndex: cr.lastIndex})
if err != nil {
fmt.Printf(“error retrieving instances from Consul: %v”, err)
}

cr.lastIndex = metainfo.LastIndex
var newAddrs []resolver.Address
for _, service := range services {
addr := fmt.Sprintf(“%v:%v”, service.Service.Address, service.Service.Port)
newAddrs = append(newAddrs, resolver.Address{Addr: addr})
}
fmt.Printf(“adding service addrs\n”)
fmt.Printf(“newAddrs: %v\n”, newAddrs)
cr.cc.NewAddress(newAddrs)
cr.cc.NewServiceConfig(cr.name)
}

}

func (cb *consulBuilder) Scheme() string {
return “consul”
}

func (cr *consulResolver) ResolveNow(opt resolver.ResolveNowOption) {
}

func (cr *consulResolver) Close() {
}

func parseTarget(target string) (host, port, name string, err error) {

fmt.Printf(“target uri: %v\n”, target)
if target == “” {
return “”, “”, “”, errMissingAddr
}

if !regexConsul.MatchString(target) {
return “”, “”, “”, errAddrMisMatch
}

groups := regexConsul.FindStringSubmatch(target)
host = groups[1]
port = groups[2]
name = groups[3]
if port == “” {
port = defaultPort
}
return host, port, name, nil
}

到此,grpc 客户端服务发现就搞定了。
consul 的服务注册
服务注册直接用 consul 的 go api 就可以了,也是参考前一篇文章,简单的封装一下,consul_register.go 的代码如下:
package consul

import (
“fmt”
“github.com/hashicorp/consul/api”
“time”
)

type ConsulService struct {
IP string
Port int
Tag []string
Name string
}

func RegitserService(ca string, cs *ConsulService) {

//register consul
consulConfig := api.DefaultConfig()
consulConfig.Address = ca
client, err := api.NewClient(consulConfig)
if err != nil {
fmt.Printf(“NewClient error\n%v”, err)
return
}
agent := client.Agent()
interval := time.Duration(10) * time.Second
deregister := time.Duration(1) * time.Minute

reg := &api.AgentServiceRegistration{
ID: fmt.Sprintf(“%v-%v-%v”, cs.Name, cs.IP, cs.Port), // 服务节点的名称
Name: cs.Name, // 服务名称
Tags: cs.Tag, // tag,可以为空
Port: cs.Port, // 服务端口
Address: cs.IP, // 服务 IP
Check: &api.AgentServiceCheck{// 健康检查
Interval: interval.String(), // 健康检查间隔
GRPC: fmt.Sprintf(“%v:%v/%v”, cs.IP, cs.Port, cs.Name), // grpc 支持,执行健康检查的地址,service 会传到 Health.Check 函数中
DeregisterCriticalServiceAfter: deregister.String(), // 注销时间,相当于过期时间
},
}

fmt.Printf(“registing to %v\n”, ca)
if err := agent.ServiceRegister(reg); err != nil {
fmt.Printf(“Service Register error\n%v”, err)
return
}

}

改造一下 grpc 的 helloworld
把 grpc 的 helloworld 的 demo 改一下,用 consul 来做服务注册和发现。server 端代码:
package main

import (
“context”
“fmt”
“google.golang.org/grpc”
“google.golang.org/grpc/health/grpc_health_v1”
“log”
“net”
“server/internal/consul”
pb “server/proto/helloworld”
)

const (
port = “:50051”
)

// server is used to implement helloworld.GreeterServer.
type server struct{}

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
log.Printf(“Received: %v”, in.Name)
return &pb.HelloReply{Message: “Hello ” + in.Name}, nil
}

func RegisterToConsul() {
consul.RegitserService(“127.0.0.1:8500”, &consul.ConsulService{
Name: “helloworld”,
Tag: []string{“helloworld”},
IP: “127.0.0.1”,
Port: 50051,
})
}

//health
type HealthImpl struct{}

// Check 实现健康检查接口,这里直接返回健康状态,这里也可以有更复杂的健康检查策略,比如根据服务器负载来返回
func (h *HealthImpl) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
fmt.Print(“health checking\n”)
return &grpc_health_v1.HealthCheckResponse{
Status: grpc_health_v1.HealthCheckResponse_SERVING,
}, nil
}

func (h *HealthImpl) Watch(req *grpc_health_v1.HealthCheckRequest, w grpc_health_v1.Health_WatchServer) error {
return nil
}

func main() {
lis, err := net.Listen(“tcp”, port)
if err != nil {
log.Fatalf(“failed to listen: %v”, err)
}
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})
grpc_health_v1.RegisterHealthServer(s, &HealthImpl{})
RegisterToConsul()
if err := s.Serve(lis); err != nil {
log.Fatalf(“failed to serve: %v”, err)
}
}

client 端代码:
package main

import (
“client/internal/consul”
pb “client/proto/helloworld”
“context”
“google.golang.org/grpc”
“log”
“os”
“time”
)

const (
target = “consul://127.0.0.1:8500/helloworld”
defaultName = “world”
)

func main() {
consul.Init()
// Set up a connection to the server.
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
conn, err := grpc.DialContext(ctx, target, grpc.WithBlock(), grpc.WithInsecure(), grpc.WithBalancerName(“round_robin”))
if err != nil {
log.Fatalf(“did not connect: %v”, err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)

// Contact the server and print out its response.
name := defaultName
if len(os.Args) > 1 {
name = os.Args[1]
}
for {
ctx, _ := context.WithTimeout(context.Background(), time.Second)
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
if err != nil {
log.Fatalf(“could not greet: %v”, err)
}
log.Printf(“Greeting: %s”, r.Message)
time.Sleep(time.Second * 2)
}
}

运行一把
启动 consul
consul agent -dev

启动 hello server
cd server
go run cmd/main.go

启动 hello client
cd client
go run cmd/main.go

运行结果:
//client
2019/03/07 17:22:04 Greeting: Hello world
2019/03/07 17:22:06 Greeting: Hello world

//server
2019/03/07 17:22:04 Received: world
2019/03/07 17:22:06 Received: world

完整工程的 git 地址工程使用方法:
cd server
go mod tidy
go run cmd/main.go

cd client
go mod tidy
go run cmd/main.go

请自行解决防火墙的问题
参考文章

grpc 名称发现与负载均衡
golang consul-grpc 服务注册与发现
gRPC Client-Side Load Balancing in Go
godoc grpc
consul go api

正文完
 0