乐趣区

关于微服务:服务器注册发现在Go微服务中的使用

服务注册和发现

什么是服务发现和注册

  • 服务注册:将提供某个服务的模块信息(通常是这个服务的 IP 和端口)注册到 1 个公共的租价你下来(比方:zookeeper、consul、etcd)
  • 服务发现:新注册的这个服务模块可能及时的被其余的调用者发现,不论是服务新增和服务删减都能实现主动发现

服务注册与发现利用在哪里

咱们晓得随着互联网的迅速倒退,客户端的申请量正在迅速的减少。那么服务器的压力就会越来越大。那么咱们有什么计划去解决这个问题呢。

Web1.0

在 web1.0 的时代,所有的 client 调用的都是一台服务器。所有的服务也都在一个我的项目中(如下图)。在互联网起步阶段是能够满足过后的访问量的。然而随着客户端不停的减少单台服务器曾经无奈满足这么大的访问量了,那咱们如何解决呢?

Web2.0

通过鸡蛋和篮子实践,当所有鸡蛋都放到一个篮子里,然而鸡蛋确越来越多的状况下,咱们只能减少篮子了。把鸡蛋放到不同的篮子里 这样就能够解决这个问题。
这样咱们就进入了 web2.0 的时代。也就是咱们尝尝说道的服务器集群,所有的客户端会通过一个负载平衡把所有的申请通过不同的算法散发到集群中的机器里。这样就能够负载更多客户端的申请了。

微服务时代

随着时代的提高,咱们的业务从繁多的网站浏览变成了各种各样的 APP。用户的减少也就是咱们的性能变得多样化。那么在 Web2.0 中应用的单体服务集群部署的形式就会让我的项目变的非常的简单。给咱们的开发和保护造成了很大的困扰。那么再次依据鸡蛋篮子实践,咱们的解决方案是 把不同的鸡蛋放到不同的篮子里。简略的说就是把一个单体的服务杜绝业务的维度拆分成不同的服务,这些服务里的资源须要做到互相隔离。

微服务对于服务发现注册的需要

咱们能够想一下,一个电商服务咱们能够分成用户、商品、订单、库存等微服务。如果只是单单 5 个微服务咱们可能感觉关系并不是很简单能够应用配置文件或者其余的形式去解决服务和服务之间的调用配置。那么随着业务越来越简单,用户微服务又会被拆分成用户、注销、会员等等独立的微服务。这个时候微服务的数量将会变得非常的宏大。那么这么多的微服务指尖的错综合更简单的互相调用呢?这个时候服务注册和发现就能够解决这些问题。

能够设想一下,用户服务我只须要向服务中心注册一个名字为“User”的服务,而后把本人的 ip 和端口上传到注册中心里。那么调用方就能够简略依据这个服务的名称来找到 User 服务,调用方并不关怀有多少台机器在这个服务中这些都会是注册核心来解决的问题。那这么做有什么益处呢:

  • 能够随时减少微服务中服务器的数量
  • 能够通过对立的注册核心做心跳检测,来保障服务的可用性
  • 不用保护简单的服务名称和对应的 IP 端口等等
  • 能够做对立的注册发现零碎,做对立的预警或者报警机制,呈现问题能够被及时的发现

这就是为什么各个微服务框架中都会有服务发现注册这个模块了。

在 Kratos 如何应用服务发现注册

Kratos 本文就不做过多的解释了,是一个 B 站开源的微服务框架。在这个框架中有一个接口:

type Registrar interface {
    // 注册实例
    Register(ctx context.Context, service *ServiceInstance) error
    // 反注册实例
    Deregister(ctx context.Context, service *ServiceInstance) error
}

type Discovery interface {
    // 依据 serviceName 间接拉取实例列表
    GetService(ctx context.Context, serviceName string) ([]*ServiceInstance, error)
    // 依据 serviceName 阻塞式订阅一个服务的实例列表信息
    Watch(ctx context.Context, serviceName string) (Watcher, error)
}

只有咱们的对象实现了这两个接口就能够在框架中应用服务注册与发现了。
当然在 Kratos 中曾经默认实现了大多数支流的服务发现组件比方 consul、etcd、kubernetes、zookeeper 等等。
那咱们要如何应用呢?

服务注册

在服务注册中,咱们只须要创立一个 Registrar 对象,讲这个对象注入到 Kratos 中就能够实现服务的发现。当然这个服务须要有一个不反复的名称。
上面的代码中应用的时 consul 为例的代码。

import (
    consul "github.com/go-kratos/kratos/contrib/registry/consul/v2"
    "github.com/hashicorp/consul/api"
)

// new consul client
client, err := api.NewClient(api.DefaultConfig())
if err != nil {panic(err)
}
// new reg with consul client
reg := consul.New(client)

app := kratos.New(
    // service-name
    kratos.Name(Name),
    kratos.Version(Version),
    kratos.Metadata(map[string]string{}),
    kratos.Logger(logger),
    kratos.Server(
        hs,
        gs,
    ),
    // with registrar
    kratos.Registrar(reg),
)
外围代码

上面是 Kratos 中 consul 作为服务发现的外围代码,咱们看到第一个办法是对服务的一些解决和判断,留神这个服务里能够蕴含 http 协定的也包好 grpc 协定的。最终组成一个对象,调用 ServiceRegister 办法,办法中能够看到是真正的向 consul 发送一个 put 申请,注册了一个服务。
这个 put 申请是 consul 对外公开的注册服务的接口。其实 Kratos 是对这个接口封装了一层,通过配置读取主动的做服务注册。

// Register register service instance to consul
func (c *Client) Register(_ context.Context, svc *registry.ServiceInstance, enableHealthCheck bool) error {addresses := make(map[string]api.ServiceAddress)
    checkAddresses := make([]string, 0, len(svc.Endpoints))
    for _, endpoint := range svc.Endpoints {raw, err := url.Parse(endpoint)
        if err != nil {return err}
        addr := raw.Hostname()
        port, _ := strconv.ParseUint(raw.Port(), 10, 16)

        checkAddresses = append(checkAddresses, net.JoinHostPort(addr, strconv.FormatUint(port, 10)))
        addresses[raw.Scheme] = api.ServiceAddress{Address: endpoint, Port: int(port)}
    }
    asr := &api.AgentServiceRegistration{
        ID:              svc.ID,
        Name:            svc.Name,
        Meta:            svc.Metadata,
        Tags:            []string{fmt.Sprintf("version=%s", svc.Version)},
        TaggedAddresses: addresses,
    }
    if len(checkAddresses) > 0 {host, portRaw, _ := net.SplitHostPort(checkAddresses[0])
        port, _ := strconv.ParseInt(portRaw, 10, 32)
        asr.Address = host
        asr.Port = int(port)
    }
    if enableHealthCheck {
        for _, address := range checkAddresses {
            asr.Checks = append(asr.Checks, &api.AgentServiceCheck{
                TCP:                            address,
                Interval:                       fmt.Sprintf("%ds", c.healthcheckInterval),
                DeregisterCriticalServiceAfter: fmt.Sprintf("%ds", c.deregisterCriticalServiceAfter),
                Timeout:                        "5s",
            })
        }
    }
    if c.heartbeat {
        asr.Checks = append(asr.Checks, &api.AgentServiceCheck{
            CheckID:                        "service:" + svc.ID,
            TTL:                            fmt.Sprintf("%ds", c.healthcheckInterval*2),
            DeregisterCriticalServiceAfter: fmt.Sprintf("%ds", c.deregisterCriticalServiceAfter),
        })
    }

    err := c.cli.Agent().ServiceRegister(asr)
    if err != nil {return err}
    if c.heartbeat {go func() {time.Sleep(time.Second)
            err = c.cli.Agent().UpdateTTL("service:"+svc.ID, "pass", "pass")
            if err != nil {log.Errorf("[Consul]update ttl heartbeat to consul failed!err:=%v", err)
            }
            ticker := time.NewTicker(time.Second * time.Duration(c.healthcheckInterval))
            defer ticker.Stop()
            for {
                select {
                case <-ticker.C:
                    err = c.cli.Agent().UpdateTTL("service:"+svc.ID, "pass", "pass")
                    if err != nil {log.Errorf("[Consul]update ttl heartbeat to consul failed!err:=%v", err)
                    }
                case <-c.ctx.Done():
                    return
                }
            }
        }()}
    return nil
}

func (a *Agent) serviceRegister(service *AgentServiceRegistration, opts ServiceRegisterOpts) error {r := a.c.newRequest("PUT", "/v1/agent/service/register")
    r.obj = service
    r.ctx = opts.ctx
    if opts.ReplaceExistingChecks {r.params.Set("replace-existing-checks", "true")
    }
    _, resp, err := a.c.doRequest(r)
    if err != nil {return err}
    defer closeResponseBody(resp)
    if err := requireOK(resp); err != nil {return err}
    return nil
}

服务发现

参考文章:
深刻理解服务注册与发现

退出移动版