本文主要研究一下rocketmq-client-go的defaultProducer

defaultProducer

rocketmq-client-go-v2.0.0/producer/producer.go

type defaultProducer struct {    group       string    client      internal.RMQClient    state       int32    options     producerOptions    publishInfo sync.Map    callbackCh  chan interface{}    interceptor primitive.Interceptor}
  • defaultProducer定义了group、client、state、options、publishInfo、callbackCh、interceptor

NewDefaultProducer

rocketmq-client-go-v2.0.0/producer/producer.go

func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {    defaultOpts := defaultProducerOptions()    for _, apply := range opts {        apply(&defaultOpts)    }    srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)    if err != nil {        return nil, errors.Wrap(err, "new Namesrv failed.")    }    if !defaultOpts.Credentials.IsEmpty() {        srvs.SetCredentials(defaultOpts.Credentials)    }    defaultOpts.Namesrv = srvs    producer := &defaultProducer{        group:      defaultOpts.GroupName,        callbackCh: make(chan interface{}),        options:    defaultOpts,    }    producer.client = internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, producer.callbackCh)    producer.interceptor = primitive.ChainInterceptors(producer.options.Interceptors...)    return producer, nil}
  • NewDefaultProducer方法先依次应用各个Option得到配置,再通过internal.NewNamesrv根据NameServerAddrs创建Namesrv(Credentials非空时设置到Namesrv上),之后实例化defaultProducer,然后通过internal.GetOrNewRocketMQClient获取client,并通过primitive.ChainInterceptors将配置的Interceptors串成interceptor

Start

rocketmq-client-go-v2.0.0/producer/producer.go

func (p *defaultProducer) Start() error {    atomic.StoreInt32(&p.state, int32(internal.StateRunning))    if len(p.options.NameServerAddrs) == 0 {        p.options.Namesrv.UpdateNameServerAddress(p.options.NameServerDomain, p.options.InstanceName)    }    p.client.RegisterProducer(p.group, p)    p.client.Start()    return nil}
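  • Start方法先将state置为StateRunning,若未配置NameServerAddrs则通过p.options.Namesrv.UpdateNameServerAddress根据NameServerDomain更新NameServer地址,然后执行p.client.RegisterProducer及p.client.Start()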

Shutdown

rocketmq-client-go-v2.0.0/producer/producer.go

func (p *defaultProducer) Shutdown() error {    atomic.StoreInt32(&p.state, int32(internal.StateShutdown))    p.client.UnregisterProducer(p.group)    p.client.Shutdown()    return nil}
  • Shutdown方法执行p.client.UnregisterProducer及p.client.Shutdown()

SendSync

rocketmq-client-go-v2.0.0/producer/producer.go

func (p *defaultProducer) SendSync(ctx context.Context, msgs ...*primitive.Message) (*primitive.SendResult, error) {    if err := p.checkMsg(msgs...); err != nil {        return nil, err    }    msg := p.encodeBatch(msgs...)    resp := new(primitive.SendResult)    if p.interceptor != nil {        primitive.WithMethod(ctx, primitive.SendSync)        producerCtx := &primitive.ProducerCtx{            ProducerGroup:     p.group,            CommunicationMode: primitive.SendSync,            BornHost:          utils.LocalIP,            Message:           *msg,            SendResult:        resp,        }        ctx = primitive.WithProducerCtx(ctx, producerCtx)        err := p.interceptor(ctx, msg, resp, func(ctx context.Context, req, reply interface{}) error {            var err error            realReq := req.(*primitive.Message)            realReply := reply.(*primitive.SendResult)            err = p.sendSync(ctx, realReq, realReply)            return err        })        return resp, err    }    err := p.sendSync(ctx, msg, resp)    return resp, err}
  • SendSync方法首先通过p.checkMsg校验消息,然后通过p.encodeBatch编码;若p.interceptor不为nil,则构造ProducerCtx并执行p.interceptor(由其回调p.sendSync),否则直接执行p.sendSync(ctx, msg, resp)

sendSync

rocketmq-client-go-v2.0.0/producer/producer.go

func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message, resp *primitive.SendResult) error {    retryTime := 1 + p.options.RetryTimes    var (        err error    )    if p.options.Namespace != "" {        msg.Topic = p.options.Namespace + "%" + msg.Topic    }    var producerCtx *primitive.ProducerCtx    for retryCount := 0; retryCount < retryTime; retryCount++ {        mq := p.selectMessageQueue(msg)        if mq == nil {            err = fmt.Errorf("the topic=%s route info not found", msg.Topic)            continue        }        addr := p.options.Namesrv.FindBrokerAddrByName(mq.BrokerName)        if addr == "" {            return fmt.Errorf("topic=%s route info not found", mq.Topic)        }        if p.interceptor != nil {            producerCtx = primitive.GetProducerCtx(ctx)            producerCtx.BrokerAddr = addr            producerCtx.MQ = *mq        }        res, _err := p.client.InvokeSync(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second)        if _err != nil {            err = _err            continue        }        return p.client.ProcessSendResponse(mq.BrokerName, res, resp, msg)    }    return err}
  • sendSync最多尝试1+RetryTimes次,每次先通过p.selectMessageQueue(msg)选择mq,然后通过p.options.Namesrv.FindBrokerAddrByName寻找addr,最后执行p.client.InvokeSync(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second),成功后通过p.client.ProcessSendResponse处理响应

SendAsync

rocketmq-client-go-v2.0.0/producer/producer.go

func (p *defaultProducer) SendAsync(ctx context.Context, f func(context.Context, *primitive.SendResult, error), msgs ...*primitive.Message) error {    if err := p.checkMsg(msgs...); err != nil {        return err    }    msg := p.encodeBatch(msgs...)    if p.interceptor != nil {        primitive.WithMethod(ctx, primitive.SendAsync)        return p.interceptor(ctx, msg, nil, func(ctx context.Context, req, reply interface{}) error {            return p.sendAsync(ctx, msg, f)        })    }    return p.sendAsync(ctx, msg, f)}
  • SendAsync方法主要是执行p.sendAsync(ctx, msg, f)

sendAsync

rocketmq-client-go-v2.0.0/producer/producer.go

func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message, h func(context.Context, *primitive.SendResult, error)) error {    if p.options.Namespace != "" {        msg.Topic = p.options.Namespace + "%" + msg.Topic    }    mq := p.selectMessageQueue(msg)    if mq == nil {        return errors.Errorf("the topic=%s route info not found", msg.Topic)    }    addr := p.options.Namesrv.FindBrokerAddrByName(mq.BrokerName)    if addr == "" {        return errors.Errorf("topic=%s route info not found", mq.Topic)    }    ctx, _ = context.WithTimeout(ctx, 3*time.Second)    return p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), func(command *remote.RemotingCommand, err error) {        resp := new(primitive.SendResult)        if err != nil {            h(ctx, nil, err)        } else {            p.client.ProcessSendResponse(mq.BrokerName, command, resp, msg)            h(ctx, resp, nil)        }    })}
  • sendAsync主要是执行p.client.InvokeAsync

SendOneWay

rocketmq-client-go-v2.0.0/producer/producer.go

func (p *defaultProducer) SendOneWay(ctx context.Context, msgs ...*primitive.Message) error {    if err := p.checkMsg(msgs...); err != nil {        return err    }    msg := p.encodeBatch(msgs...)    if p.interceptor != nil {        primitive.WithMethod(ctx, primitive.SendOneway)        return p.interceptor(ctx, msg, nil, func(ctx context.Context, req, reply interface{}) error {            return p.SendOneWay(ctx, msg)        })    }    return p.sendOneWay(ctx, msg)}
  • SendOneWay主要是执行p.sendOneWay(ctx, msg)

sendOneWay

rocketmq-client-go-v2.0.0/producer/producer.go

func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message) error {    retryTime := 1 + p.options.RetryTimes    if p.options.Namespace != "" {        msg.Topic = p.options.Namespace + "%" + msg.Topic    }    var err error    for retryCount := 0; retryCount < retryTime; retryCount++ {        mq := p.selectMessageQueue(msg)        if mq == nil {            err = fmt.Errorf("the topic=%s route info not found", msg.Topic)            continue        }        addr := p.options.Namesrv.FindBrokerAddrByName(mq.BrokerName)        if addr == "" {            return fmt.Errorf("topic=%s route info not found", mq.Topic)        }        _err := p.client.InvokeOneWay(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second)        if _err != nil {            err = _err            continue        }        return nil    }    return err}
  • sendOneWay主要是重试执行p.client.InvokeOneWay

小结

defaultProducer定义了group、client、state、options、publishInfo、callbackCh、interceptor;它提供了NewDefaultProducer、Start、Shutdown、SendSync、SendAsync、SendOneWay方法

doc

  • producer