乐趣区

Go-Micro-Selector-源码分析

概况

Micro 中的 Selector 是客户端级别的负载均衡的组件。当客户端调用服务端方法时,会根据 selector 组件中定义的负载均衡策略来选择 Register 中注册的服务器列表中的一个。默认的有随机策略使用的是随机策略。使用的是 cacheSelector 组件,当然我们可以根据需求来替换这个组件只要实现 Selector 的接口就可以随时替换组件。下面可以重新复习一下组件的接口。

type Selector interface {Init(opts ...Option) error
    Options() Options
    // Select returns a function which should return the next node
    
    Select(service string, opts ...SelectOption) (Next, error)
    // Mark sets the success/error against a node
    
    Mark(service string, node *registry.Node, err error)
    // Reset returns state back to zero for a service
    
    Reset(service string)
    // Close renders the selector unusable
    
    Close() error
    // Name of the selector
    
    String() string}

主要方法和流程

  1. Client 调用 Call 方法
  2. Call 方法调用 selector 组件的 Select 方法,获取 next 函数
  3. call 匿名函数中调用 next 函数(默认为 CacheSelector 随机获取服务列表中的节点)返回 node
  4. 以 grpcClient 为例,调用 grpcClient.call
  5. call 函数中获取 conn,然后 Invoke 调用服务端函数

selector 源码

// Client 调用 call 函数
func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
    // make a copy of call opts
    callOpts := g.opts.CallOptions
    for _, opt := range opts {opt(&callOpts)
    }
    // `1. Client 调用 Call 方法 `
    next, err := g.next(req, callOpts)
    if err != nil {return err}

    // check if we already have a deadline
    d, ok := ctx.Deadline()
    if !ok {
        // no deadline so we create a new one
        ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
    } else {
        // got a deadline so no need to setup context
        // but we need to set the timeout we pass along
        opt := client.WithRequestTimeout(time.Until(d))
        opt(&callOpts)
    }

    // should we noop right here?
    select {case <-ctx.Done():
        return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
    default:
    }

    // 复制 call 函数 在下面的 goroutine 中使用
    gcall := g.call

    // wrap the call in reverse
    for i := len(callOpts.CallWrappers); i > 0; i-- {gcall = callOpts.CallWrappers[i-1](gcall)
    }

    // return errors.New("go.micro.client", "request timeout", 408)
    call := func(i int) error {
        // call backoff first. Someone may want an initial start delay
        t, err := callOpts.Backoff(ctx, req, i)
        if err != nil {return errors.InternalServerError("go.micro.client", err.Error())
        }

        // only sleep if greater than 0
        if t.Seconds() > 0 {time.Sleep(t)
        }

        // 调用 next 方法,获取服务器节点
        node, err := next()
        if err != nil && err == selector.ErrNotFound {return errors.NotFound("go.micro.client", err.Error())
        } else if err != nil {return errors.InternalServerError("go.micro.client", err.Error())
        }

        // 调用 grpccall 方法 正式发送数据
        err = gcall(ctx, node, req, rsp, callOpts)
        g.opts.Selector.Mark(req.Service(), node, err)
        return err
    }
    // 初始化 channel 接受 call 返回的 error 用于重试
    ch := make(chan error, callOpts.Retries+1)
    var gerr error

    for i := 0; i <= callOpts.Retries; i++ {go func(i int) {ch <- call(i)
        }(i)

        select {case <-ctx.Done():
            return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
        case err := <-ch:
            // if the call succeeded lets bail early
            if err == nil {return nil}

            retry, rerr := callOpts.Retry(ctx, req, i, err)
            if rerr != nil {return rerr}

            if !retry {return err}

            gerr = err
        }
    }

    return gerr
}

// grpcClient next 方法 调用 selector 组件的 Select 方法,获取 next 函数
func (g *grpcClient) next(request client.Request, opts client.CallOptions) (selector.Next, error) {service := request.Service()

    // get proxy
    if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {service = prx}

    // get proxy address
    if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 {opts.Address = []string{prx}
    }

    // return remote address
    if len(opts.Address) > 0 {return func() (*registry.Node, error) {
            return &registry.Node{Address: opts.Address[0],
            }, nil
        }, nil
    }

    // get next nodes from the selector
    next, err := g.opts.Selector.Select(service, opts.SelectOptions...)
    if err != nil && err == selector.ErrNotFound {return nil, errors.NotFound("go.micro.client", err.Error())
    } else if err != nil {return nil, errors.InternalServerError("go.micro.client", err.Error())
    }

    return next, nil
}

// 随机获取 Registry 中服务列表中的一个 策略是从上层方法传进来的
func Random(services []*registry.Service) Next {var nodes []*registry.Node

    for _, service := range services {nodes = append(nodes, service.Nodes...)
    }

    return func() (*registry.Node, error) {if len(nodes) == 0 {return nil, ErrNoneAvailable}

        i := rand.Int() % len(nodes)
        return nodes[i], nil
    }
}

默认 registrySelector(CacheSelector)源码分析

主体方法

// 默认 selector 对象
type registrySelector struct {
    so Options
    rc cache.Cache
}

// Options 设置 只有一个 time to live 设置
type Options struct {
    // TTL is the cache TTL
    TTL time.Duration
}

// Cache 对象
type cache struct {
    // Registry 对象如果过期则通过这个对象获取
    registry.Registry
    // Options 设置 TTL
    opts Options
    // 锁用来控制并发问题
    sync.RWMutex
    // 利用 map 缓存服务器节点 用 name 作为 key
    cache   map[string][]*registry.Service
    // map 保存的服务器过期时间
    ttls    map[string]time.Time
    watched map[string]bool
    //
    exit chan bool
}

// CacheSelector 获取服务节点核心方法
// 利用如果找到则直接返回如果找不到则请求 Registry 获取服务器节点列表
func (c *cache) get(service string) ([]*registry.Service, error) {
    // read lock
    c.RLock()

    // 先获取缓存中的服务节点
    services := c.cache[service]
    // 获取服务节点的 ttl
    ttl := c.ttls[service]

    if c.isValid(services, ttl) {
        // make a copy
        cp := registry.Copy(services)
        // unlock the read
        c.RUnlock()
        // return servics
        return cp, nil
    }

    // get does the actual request for a service and cache it
    get := func(service string) ([]*registry.Service, error) {
        // 如果缓存不存在则
        services, err := c.Registry.GetService(service)
        if err != nil {return nil, err}

        // cache results
        c.Lock()
        // 设置缓存 并同时设置 TTL
        c.set(service, registry.Copy(services))
        c.Unlock()

        return services, nil
    }

    // watch service if not watched
    if _, ok := c.watched[service]; !ok {go c.run(service)
    }

    // unlock the read lock
    c.RUnlock()

    // get and return services
    return get(service)
}

Watch

在调用 cache.get 函数执行完成之后 会创建一个 goroutine 来监控 service 节点。调用的是 Register 中的 watch 方法,获取到 result 然后判断是否要添加、修改或者删除缓存中的服务器节点。

// get 方法调用 run 函数启动监控
if _, ok := c.watched[service]; !ok {go c.run(service)
}

func (c *cache) run(service string) {
    // set watcher
    c.Lock()
    c.watched[service] = true
    c.Unlock()

    // delete watcher on exit
    defer func() {c.Lock()
        delete(c.watched, service)
        c.Unlock()}()

    var a, b int

    for {
        // exit early if already dead
        if c.quit() {return}

        // 设施隔多久检测服务是否可用
        j := rand.Int63n(100)
        time.Sleep(time.Duration(j) * time.Millisecond)

        // 创建一个 watcher
        w, err := c.Registry.Watch(registry.WatchService(service),
        )

        if err != nil {if c.quit() {return}

            d := backoff(a)

            if a > 3 {log.Log("rcache:", err, "backing off", d)
                a = 0
            }

            time.Sleep(d)
            a++

            continue
        }

        // reset a
        a = 0

        // watch 循环下一个事件并调用 update
        // update 函数根据 Register 返回的 result 来修改本地缓存中的节点信息
        if err := c.watch(w); err != nil {if c.quit() {return}

            d := backoff(b)

            if b > 3 {log.Log("rcache:", err, "backing off", d)
                b = 0
            }

            time.Sleep(d)
            b++

            continue
        }

        // reset b
        b = 0
    }
}    
退出移动版