概述
Client 主要是用来执行请求服务和订阅发布事件。是对于 broker,Transort 的一种封装方便使用。
Init
初始化客户端函数
- 初始化连接池数量和连接池 TTL
- 调用注入的 opts 函数列表
- 最后初始化连接池
func (r *rpcClient) Init(opts ...Option) error {
size := r.opts.PoolSize
ttl := r.opts.PoolTTL
for _, o := range opts {o(&r.opts)
}
// update pool configuration if the options changed
if size != r.opts.PoolSize || ttl != r.opts.PoolTTL {r.pool.Lock()
r.pool.size = r.opts.PoolSize
r.pool.ttl = int64(r.opts.PoolTTL.Seconds())
r.pool.Unlock()}
return nil
}
==Call==
Call 是 Client 接口中最主要的方法,在之前 Go Micro Selector 源码分析
- Client 调用 Call 方法
- Call 方法调用 selector 组件的 Select 方法,获取 next 函数
- call 匿名函数中调用 next 函数(默认为 CacheSelector 随机获取服务列表中的节点, Go Micro Selector 源码分析)返回 node
- 以 grpcClient 为例,调用 grpcClient.call
- call 函数中获取 conn,然后 Invoke 调用服务端函数
func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
// 复制出 options
callOpts := g.opts.CallOptions
for _, opt := range opts {opt(&callOpts)
}
// 调用 next 函数 获取 selector
next, err := g.next(req, callOpts)
if err != nil {return err}
// 检查 context Deadline
d, ok := ctx.Deadline()
if !ok {
// 没有 deadline 创建一个新的
ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
} else {
// 获取到 deadline 设置 context
opt := client.WithRequestTimeout(time.Until(d))
opt(&callOpts)
}
// should we noop right here?
select {case <-ctx.Done():
return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
default:
}
// 复制 call 函数 在下面的 goroutine 中使用
gcall := g.call
// wrap the call in reverse
for i := len(callOpts.CallWrappers); i > 0; i-- {gcall = callOpts.CallWrappers[i-1](gcall)
}
// return errors.New("go.micro.client", "request timeout", 408)
call := func(i int) error {
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, req, i)
if err != nil {return errors.InternalServerError("go.micro.client", err.Error())
}
// only sleep if greater than 0
if t.Seconds() > 0 {time.Sleep(t)
}
// select next node
node, err := next()
if err != nil && err == selector.ErrNotFound {return errors.NotFound("go.micro.client", err.Error())
} else if err != nil {return errors.InternalServerError("go.micro.client", err.Error())
}
// 调用 call 正式调用服务端接口
err = gcall(ctx, node, req, rsp, callOpts)
g.opts.Selector.Mark(req.Service(), node, err)
return err
}
ch := make(chan error, callOpts.Retries+1)
var gerr error
// 重试
for i := 0; i <= callOpts.Retries; i++ {go func(i int) {
// 调动 call 返回 channel
ch <- call(i)
}(i)
select {case <-ctx.Done():
return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
case err := <-ch:
// if the call succeeded lets bail early
if err == nil {return nil}
retry, rerr := callOpts.Retry(ctx, req, i, err)
if rerr != nil {return rerr}
if !retry {return err}
gerr = err
}
}
return gerr
}
Stream
Stream 跟 call 的逻辑几乎是一样的,不过 stream 调用的是 rpc_client.stream 函数。这边就不过多的分析了
func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Stream, error) {
// make a copy of call opts
callOpts := r.opts.CallOptions
for _, opt := range opts {opt(&callOpts)
}
next, err := r.next(request, callOpts)
if err != nil {return nil, err}
// should we noop right here?
select {case <-ctx.Done():
return nil, errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err()))
default:
}
call := func(i int) (Stream, error) {
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, request, i)
if err != nil {return nil, errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())
}
// only sleep if greater than 0
if t.Seconds() > 0 {time.Sleep(t)
}
node, err := next()
if err != nil && err == selector.ErrNotFound {return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
} else if err != nil {return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
}
stream, err := r.stream(ctx, node, request, callOpts)
r.opts.Selector.Mark(request.Service(), node, err)
return stream, err
}
type response struct {
stream Stream
err error
}
ch := make(chan response, callOpts.Retries+1)
var grr error
for i := 0; i <= callOpts.Retries; i++ {go func(i int) {s, err := call(i)
ch <- response{s, err}
}(i)
select {case <-ctx.Done():
return nil, errors.Timeout("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()))
case rsp := <-ch:
// if the call succeeded lets bail early
if rsp.err == nil {return rsp.stream, nil}
retry, rerr := callOpts.Retry(ctx, request, i, rsp.err)
if rerr != nil {return nil, rerr}
if !retry {return nil, rsp.err}
grr = rsp.err
}
}
return nil, grr
}
Publish
Client 中的 Publish 主要是调用 broker 中的 publish:r.opts.Broker.Publish
然而在 client 的 publish 函数中,获取了 topic 准备了 body 最后调用 broker 的 publish
func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOption) error {
options := PublishOptions{Context: context.Background(),
}
for _, o := range opts {o(&options)
}
md, ok := metadata.FromContext(ctx)
if !ok {md = make(map[string]string)
}
id := uuid.New().String()
md["Content-Type"] = msg.ContentType()
md["Micro-Topic"] = msg.Topic()
md["Micro-Id"] = id
// set the topic
topic := msg.Topic()
// get proxy
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {options.Exchange = prx}
// get the exchange
if len(options.Exchange) > 0 {topic = options.Exchange}
// encode message body
cf, err := r.newCodec(msg.ContentType())
if err != nil {return errors.InternalServerError("go.micro.client", err.Error())
}
b := &buffer{bytes.NewBuffer(nil)}
if err := cf(b).Write(&codec.Message{
Target: topic,
Type: codec.Event,
Header: map[string]string{
"Micro-Id": id,
"Micro-Topic": msg.Topic(),},
}, msg.Payload()); err != nil {return errors.InternalServerError("go.micro.client", err.Error())
}
r.once.Do(func() {r.opts.Broker.Connect()
})
return r.opts.Broker.Publish(topic, &broker.Message{
Header: md,
Body: b.Bytes(),})
}