

共计 5078 个字符,预计需要花费 13 分钟才能阅读完成。


Client 主要是用来执行请求服务和订阅发布事件。是对于 broker,Transort 的一种封装方便使用。



  1. 初始化连接池数量和连接池 TTL
  2. 调用注入的 opts 函数列表
  3. 最后初始化连接池
func (r *rpcClient) Init(opts ...Option) error {
    size := r.opts.PoolSize
    ttl := r.opts.PoolTTL

    for _, o := range opts {o(&r.opts)

    // update pool configuration if the options changed
    if size != r.opts.PoolSize || ttl != r.opts.PoolTTL {r.pool.Lock()
        r.pool.size = r.opts.PoolSize
        r.pool.ttl = int64(r.opts.PoolTTL.Seconds())

    return nil


Call 是 Client 接口中最主要的方法,在之前 Go Micro Selector 源码分析

  1. Client 调用 Call 方法
  2. Call 方法调用 selector 组件的 Select 方法,获取 next 函数
  3. call 匿名函数中调用 next 函数(默认为 CacheSelector 随机获取服务列表中的节点, Go Micro Selector 源码分析)返回 node
  4. 以 grpcClient 为例,调用 grpcClient.call
  5. call 函数中获取 conn,然后 Invoke 调用服务端函数
func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
    // 复制出 options
    callOpts := g.opts.CallOptions
    for _, opt := range opts {opt(&callOpts)
    // 调用 next 函数 获取 selector
    next, err := g.next(req, callOpts)
    if err != nil {return err}

    // 检查 context Deadline
    d, ok := ctx.Deadline()
    if !ok {
        // 没有 deadline 创建一个新的
        ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
    } else {
        // 获取到 deadline 设置 context 
        opt := client.WithRequestTimeout(time.Until(d))

    // should we noop right here?
    select {case <-ctx.Done():
        return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)

    // 复制 call 函数 在下面的 goroutine 中使用
    gcall := g.call

    // wrap the call in reverse
    for i := len(callOpts.CallWrappers); i > 0; i-- {gcall = callOpts.CallWrappers[i-1](gcall)

    // return errors.New("go.micro.client", "request timeout", 408)
    call := func(i int) error {
        // call backoff first. Someone may want an initial start delay
        t, err := callOpts.Backoff(ctx, req, i)
        if err != nil {return errors.InternalServerError("go.micro.client", err.Error())

        // only sleep if greater than 0
        if t.Seconds() > 0 {time.Sleep(t)

        // select next node
        node, err := next()
        if err != nil && err == selector.ErrNotFound {return errors.NotFound("go.micro.client", err.Error())
        } else if err != nil {return errors.InternalServerError("go.micro.client", err.Error())

        // 调用 call 正式调用服务端接口
        err = gcall(ctx, node, req, rsp, callOpts)
        g.opts.Selector.Mark(req.Service(), node, err)
        return err

    ch := make(chan error, callOpts.Retries+1)
    var gerr error
    // 重试 
    for i := 0; i <= callOpts.Retries; i++ {go func(i int) {
            // 调动 call 返回 channel 
            ch <- call(i)

        select {case <-ctx.Done():
            return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
        case err := <-ch:
            // if the call succeeded lets bail early
            if err == nil {return nil}

            retry, rerr := callOpts.Retry(ctx, req, i, err)
            if rerr != nil {return rerr}

            if !retry {return err}

            gerr = err

    return gerr


Stream 跟 call 的逻辑几乎是一样的,不过 stream 调用的是 rpc_client.stream 函数。这边就不过多的分析了

func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Stream, error) {
    // make a copy of call opts
    callOpts := r.opts.CallOptions
    for _, opt := range opts {opt(&callOpts)

    next, err := r.next(request, callOpts)
    if err != nil {return nil, err}

    // should we noop right here?
    select {case <-ctx.Done():
        return nil, errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err()))

    call := func(i int) (Stream, error) {
        // call backoff first. Someone may want an initial start delay
        t, err := callOpts.Backoff(ctx, request, i)
        if err != nil {return nil, errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())

        // only sleep if greater than 0
        if t.Seconds() > 0 {time.Sleep(t)

        node, err := next()
        if err != nil && err == selector.ErrNotFound {return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
        } else if err != nil {return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())

        stream, err := r.stream(ctx, node, request, callOpts)
        r.opts.Selector.Mark(request.Service(), node, err)
        return stream, err

    type response struct {
        stream Stream
        err    error

    ch := make(chan response, callOpts.Retries+1)
    var grr error

    for i := 0; i <= callOpts.Retries; i++ {go func(i int) {s, err := call(i)
            ch <- response{s, err}

        select {case <-ctx.Done():
            return nil, errors.Timeout("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()))
        case rsp := <-ch:
            // if the call succeeded lets bail early
            if rsp.err == nil {return rsp.stream, nil}

            retry, rerr := callOpts.Retry(ctx, request, i, rsp.err)
            if rerr != nil {return nil, rerr}

            if !retry {return nil, rsp.err}

            grr = rsp.err

    return nil, grr


Client 中的 Publish 主要是调用 broker 中的 publish:r.opts.Broker.Publish
然而在 client 的 publish 函数中,获取了 topic 准备了 body 最后调用 broker 的 publish

func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOption) error {
    options := PublishOptions{Context: context.Background(),
    for _, o := range opts {o(&options)

    md, ok := metadata.FromContext(ctx)
    if !ok {md = make(map[string]string)

    id := uuid.New().String()
    md["Content-Type"] = msg.ContentType()
    md["Micro-Topic"] = msg.Topic()
    md["Micro-Id"] = id

    // set the topic
    topic := msg.Topic()

    // get proxy
    if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {options.Exchange = prx}

    // get the exchange
    if len(options.Exchange) > 0 {topic = options.Exchange}

    // encode message body
    cf, err := r.newCodec(msg.ContentType())
    if err != nil {return errors.InternalServerError("go.micro.client", err.Error())
    b := &buffer{bytes.NewBuffer(nil)}
    if err := cf(b).Write(&codec.Message{
        Target: topic,
        Type:   codec.Event,
        Header: map[string]string{
            "Micro-Id":    id,
            "Micro-Topic": msg.Topic(),},
    }, msg.Payload()); err != nil {return errors.InternalServerError("go.micro.client", err.Error())
    r.once.Do(func() {r.opts.Broker.Connect()

    return r.opts.Broker.Publish(topic, &broker.Message{
        Header: md,
        Body:   b.Bytes(),})
