概况
Micro 中的 Selector 是客户端级别的负载均衡的组件。当客户端调用服务端方法时,会根据 selector 组件中定义的负载均衡策略来选择 Register 中注册的服务器列表中的一个。默认的有随机策略使用的是随机策略。使用的是 cacheSelector 组件,当然我们可以根据需求来替换这个组件只要实现 Selector 的接口就可以随时替换组件。下面可以重新复习一下组件的接口。
type Selector interface {Init(opts ...Option) error
Options() Options
// Select returns a function which should return the next node
Select(service string, opts ...SelectOption) (Next, error)
// Mark sets the success/error against a node
Mark(service string, node *registry.Node, err error)
// Reset returns state back to zero for a service
Reset(service string)
// Close renders the selector unusable
Close() error
// Name of the selector
String() string}
主要方法和流程
- Client 调用 Call 方法
- Call 方法调用 selector 组件的 Select 方法,获取 next 函数
- call 匿名函数中调用 next 函数(默认为 CacheSelector 随机获取服务列表中的节点)返回 node
- 以 grpcClient 为例,调用 grpcClient.call
- call 函数中获取 conn,然后 Invoke 调用服务端函数
selector 源码
// Client 调用 call 函数
func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
// make a copy of call opts
callOpts := g.opts.CallOptions
for _, opt := range opts {opt(&callOpts)
}
// `1. Client 调用 Call 方法 `
next, err := g.next(req, callOpts)
if err != nil {return err}
// check if we already have a deadline
d, ok := ctx.Deadline()
if !ok {
// no deadline so we create a new one
ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
} else {
// got a deadline so no need to setup context
// but we need to set the timeout we pass along
opt := client.WithRequestTimeout(time.Until(d))
opt(&callOpts)
}
// should we noop right here?
select {case <-ctx.Done():
return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
default:
}
// 复制 call 函数 在下面的 goroutine 中使用
gcall := g.call
// wrap the call in reverse
for i := len(callOpts.CallWrappers); i > 0; i-- {gcall = callOpts.CallWrappers[i-1](gcall)
}
// return errors.New("go.micro.client", "request timeout", 408)
call := func(i int) error {
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, req, i)
if err != nil {return errors.InternalServerError("go.micro.client", err.Error())
}
// only sleep if greater than 0
if t.Seconds() > 0 {time.Sleep(t)
}
// 调用 next 方法,获取服务器节点
node, err := next()
if err != nil && err == selector.ErrNotFound {return errors.NotFound("go.micro.client", err.Error())
} else if err != nil {return errors.InternalServerError("go.micro.client", err.Error())
}
// 调用 grpccall 方法 正式发送数据
err = gcall(ctx, node, req, rsp, callOpts)
g.opts.Selector.Mark(req.Service(), node, err)
return err
}
// 初始化 channel 接受 call 返回的 error 用于重试
ch := make(chan error, callOpts.Retries+1)
var gerr error
for i := 0; i <= callOpts.Retries; i++ {go func(i int) {ch <- call(i)
}(i)
select {case <-ctx.Done():
return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
case err := <-ch:
// if the call succeeded lets bail early
if err == nil {return nil}
retry, rerr := callOpts.Retry(ctx, req, i, err)
if rerr != nil {return rerr}
if !retry {return err}
gerr = err
}
}
return gerr
}
// grpcClient next 方法 调用 selector 组件的 Select 方法,获取 next 函数
func (g *grpcClient) next(request client.Request, opts client.CallOptions) (selector.Next, error) {service := request.Service()
// get proxy
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {service = prx}
// get proxy address
if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 {opts.Address = []string{prx}
}
// return remote address
if len(opts.Address) > 0 {return func() (*registry.Node, error) {
return ®istry.Node{Address: opts.Address[0],
}, nil
}, nil
}
// get next nodes from the selector
next, err := g.opts.Selector.Select(service, opts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {return nil, errors.NotFound("go.micro.client", err.Error())
} else if err != nil {return nil, errors.InternalServerError("go.micro.client", err.Error())
}
return next, nil
}
// 随机获取 Registry 中服务列表中的一个 策略是从上层方法传进来的
func Random(services []*registry.Service) Next {var nodes []*registry.Node
for _, service := range services {nodes = append(nodes, service.Nodes...)
}
return func() (*registry.Node, error) {if len(nodes) == 0 {return nil, ErrNoneAvailable}
i := rand.Int() % len(nodes)
return nodes[i], nil
}
}
默认 registrySelector(CacheSelector)源码分析
主体方法
// 默认 selector 对象
type registrySelector struct {
so Options
rc cache.Cache
}
// Options 设置 只有一个 time to live 设置
type Options struct {
// TTL is the cache TTL
TTL time.Duration
}
// Cache 对象
type cache struct {
// Registry 对象如果过期则通过这个对象获取
registry.Registry
// Options 设置 TTL
opts Options
// 锁用来控制并发问题
sync.RWMutex
// 利用 map 缓存服务器节点 用 name 作为 key
cache map[string][]*registry.Service
// map 保存的服务器过期时间
ttls map[string]time.Time
watched map[string]bool
//
exit chan bool
}
// CacheSelector 获取服务节点核心方法
// 利用如果找到则直接返回如果找不到则请求 Registry 获取服务器节点列表
func (c *cache) get(service string) ([]*registry.Service, error) {
// read lock
c.RLock()
// 先获取缓存中的服务节点
services := c.cache[service]
// 获取服务节点的 ttl
ttl := c.ttls[service]
if c.isValid(services, ttl) {
// make a copy
cp := registry.Copy(services)
// unlock the read
c.RUnlock()
// return servics
return cp, nil
}
// get does the actual request for a service and cache it
get := func(service string) ([]*registry.Service, error) {
// 如果缓存不存在则
services, err := c.Registry.GetService(service)
if err != nil {return nil, err}
// cache results
c.Lock()
// 设置缓存 并同时设置 TTL
c.set(service, registry.Copy(services))
c.Unlock()
return services, nil
}
// watch service if not watched
if _, ok := c.watched[service]; !ok {go c.run(service)
}
// unlock the read lock
c.RUnlock()
// get and return services
return get(service)
}
Watch
在调用 cache.get 函数执行完成之后 会创建一个 goroutine 来监控 service 节点。调用的是 Register 中的 watch 方法,获取到 result 然后判断是否要添加、修改或者删除缓存中的服务器节点。
// get 方法调用 run 函数启动监控
if _, ok := c.watched[service]; !ok {go c.run(service)
}
func (c *cache) run(service string) {
// set watcher
c.Lock()
c.watched[service] = true
c.Unlock()
// delete watcher on exit
defer func() {c.Lock()
delete(c.watched, service)
c.Unlock()}()
var a, b int
for {
// exit early if already dead
if c.quit() {return}
// 设施隔多久检测服务是否可用
j := rand.Int63n(100)
time.Sleep(time.Duration(j) * time.Millisecond)
// 创建一个 watcher
w, err := c.Registry.Watch(registry.WatchService(service),
)
if err != nil {if c.quit() {return}
d := backoff(a)
if a > 3 {log.Log("rcache:", err, "backing off", d)
a = 0
}
time.Sleep(d)
a++
continue
}
// reset a
a = 0
// watch 循环下一个事件并调用 update
// update 函数根据 Register 返回的 result 来修改本地缓存中的节点信息
if err := c.watch(w); err != nil {if c.quit() {return}
d := backoff(b)
if b > 3 {log.Log("rcache:", err, "backing off", d)
b = 0
}
time.Sleep(d)
b++
continue
}
// reset b
b = 0
}
}