序
本文主要研究一下rocketmq-client-go的defaultProducer
defaultProducer
rocketmq-client-go-v2.0.0/producer/producer.go
type defaultProducer struct { group string client internal.RMQClient state int32 options producerOptions publishInfo sync.Map callbackCh chan interface{} interceptor primitive.Interceptor}
- defaultProducer定义了group、client、state、options、publishInfo、callbackCh、interceptor
NewDefaultProducer
rocketmq-client-go-v2.0.0/producer/producer.go
func NewDefaultProducer(opts ...Option) (*defaultProducer, error) { defaultOpts := defaultProducerOptions() for _, apply := range opts { apply(&defaultOpts) } srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs) if err != nil { return nil, errors.Wrap(err, "new Namesrv failed.") } if !defaultOpts.Credentials.IsEmpty() { srvs.SetCredentials(defaultOpts.Credentials) } defaultOpts.Namesrv = srvs producer := &defaultProducer{ group: defaultOpts.GroupName, callbackCh: make(chan interface{}), options: defaultOpts, } producer.client = internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, producer.callbackCh) producer.interceptor = primitive.ChainInterceptors(producer.options.Interceptors...) return producer, nil}
- NewDefaultProducer方法先应用各Option得到配置,再通过internal.NewNamesrv根据NameServerAddrs创建Namesrv(Credentials不为空时设置到Namesrv上),之后实例化defaultProducer,然后通过internal.GetOrNewRocketMQClient获取client,并通过primitive.ChainInterceptors创建interceptor
Start
rocketmq-client-go-v2.0.0/producer/producer.go
func (p *defaultProducer) Start() error { atomic.StoreInt32(&p.state, int32(internal.StateRunning)) if len(p.options.NameServerAddrs) == 0 { p.options.Namesrv.UpdateNameServerAddress(p.options.NameServerDomain, p.options.InstanceName) } p.client.RegisterProducer(p.group, p) p.client.Start() return nil}
- Start方法先将state设置为StateRunning,在NameServerAddrs为空时执行p.options.Namesrv.UpdateNameServerAddress,之后执行p.client.RegisterProducer及p.client.Start()
Shutdown
rocketmq-client-go-v2.0.0/producer/producer.go
func (p *defaultProducer) Shutdown() error { atomic.StoreInt32(&p.state, int32(internal.StateShutdown)) p.client.UnregisterProducer(p.group) p.client.Shutdown() return nil}
- Shutdown方法执行p.client.UnregisterProducer及p.client.Shutdown()
SendSync
rocketmq-client-go-v2.0.0/producer/producer.go
func (p *defaultProducer) SendSync(ctx context.Context, msgs ...*primitive.Message) (*primitive.SendResult, error) { if err := p.checkMsg(msgs...); err != nil { return nil, err } msg := p.encodeBatch(msgs...) resp := new(primitive.SendResult) if p.interceptor != nil { primitive.WithMethod(ctx, primitive.SendSync) producerCtx := &primitive.ProducerCtx{ ProducerGroup: p.group, CommunicationMode: primitive.SendSync, BornHost: utils.LocalIP, Message: *msg, SendResult: resp, } ctx = primitive.WithProducerCtx(ctx, producerCtx) err := p.interceptor(ctx, msg, resp, func(ctx context.Context, req, reply interface{}) error { var err error realReq := req.(*primitive.Message) realReply := reply.(*primitive.SendResult) err = p.sendSync(ctx, realReq, realReply) return err }) return resp, err } err := p.sendSync(ctx, msg, resp) return resp, err}
- SendSync方法首先通过p.checkMsg校验消息,然后通过p.encodeBatch编码;若p.interceptor不为nil,则构造ProducerCtx并通过p.interceptor执行p.sendSync,否则直接执行p.sendSync(ctx, msg, resp)
sendSync
rocketmq-client-go-v2.0.0/producer/producer.go
func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message, resp *primitive.SendResult) error { retryTime := 1 + p.options.RetryTimes var ( err error ) if p.options.Namespace != "" { msg.Topic = p.options.Namespace + "%" + msg.Topic } var producerCtx *primitive.ProducerCtx for retryCount := 0; retryCount < retryTime; retryCount++ { mq := p.selectMessageQueue(msg) if mq == nil { err = fmt.Errorf("the topic=%s route info not found", msg.Topic) continue } addr := p.options.Namesrv.FindBrokerAddrByName(mq.BrokerName) if addr == "" { return fmt.Errorf("topic=%s route info not found", mq.Topic) } if p.interceptor != nil { producerCtx = primitive.GetProducerCtx(ctx) producerCtx.BrokerAddr = addr producerCtx.MQ = *mq } res, _err := p.client.InvokeSync(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second) if _err != nil { err = _err continue } return p.client.ProcessSendResponse(mq.BrokerName, res, resp, msg) } return err}
- sendSync最多尝试1+RetryTimes次,每次先通过p.selectMessageQueue(msg)选择mq,然后通过p.options.Namesrv.FindBrokerAddrByName寻找addr,最后执行p.client.InvokeSync(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second),成功后通过p.client.ProcessSendResponse处理响应
SendAsync
rocketmq-client-go-v2.0.0/producer/producer.go
func (p *defaultProducer) SendAsync(ctx context.Context, f func(context.Context, *primitive.SendResult, error), msgs ...*primitive.Message) error { if err := p.checkMsg(msgs...); err != nil { return err } msg := p.encodeBatch(msgs...) if p.interceptor != nil { primitive.WithMethod(ctx, primitive.SendAsync) return p.interceptor(ctx, msg, nil, func(ctx context.Context, req, reply interface{}) error { return p.sendAsync(ctx, msg, f) }) } return p.sendAsync(ctx, msg, f)}
- SendAsync方法主要是执行p.sendAsync(ctx, msg, f)
sendAsync
rocketmq-client-go-v2.0.0/producer/producer.go
func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message, h func(context.Context, *primitive.SendResult, error)) error { if p.options.Namespace != "" { msg.Topic = p.options.Namespace + "%" + msg.Topic } mq := p.selectMessageQueue(msg) if mq == nil { return errors.Errorf("the topic=%s route info not found", msg.Topic) } addr := p.options.Namesrv.FindBrokerAddrByName(mq.BrokerName) if addr == "" { return errors.Errorf("topic=%s route info not found", mq.Topic) } ctx, _ = context.WithTimeout(ctx, 3*time.Second) return p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), func(command *remote.RemotingCommand, err error) { resp := new(primitive.SendResult) if err != nil { h(ctx, nil, err) } else { p.client.ProcessSendResponse(mq.BrokerName, command, resp, msg) h(ctx, resp, nil) } })}
- sendAsync主要是执行p.client.InvokeAsync
SendOneWay
rocketmq-client-go-v2.0.0/producer/producer.go
func (p *defaultProducer) SendOneWay(ctx context.Context, msgs ...*primitive.Message) error { if err := p.checkMsg(msgs...); err != nil { return err } msg := p.encodeBatch(msgs...) if p.interceptor != nil { primitive.WithMethod(ctx, primitive.SendOneway) return p.interceptor(ctx, msg, nil, func(ctx context.Context, req, reply interface{}) error { return p.SendOneWay(ctx, msg) }) } return p.sendOneWay(ctx, msg)}
- SendOneWay主要是执行p.sendOneWay(ctx, msg)
sendOneWay
rocketmq-client-go-v2.0.0/producer/producer.go
func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message) error { retryTime := 1 + p.options.RetryTimes if p.options.Namespace != "" { msg.Topic = p.options.Namespace + "%" + msg.Topic } var err error for retryCount := 0; retryCount < retryTime; retryCount++ { mq := p.selectMessageQueue(msg) if mq == nil { err = fmt.Errorf("the topic=%s route info not found", msg.Topic) continue } addr := p.options.Namesrv.FindBrokerAddrByName(mq.BrokerName) if addr == "" { return fmt.Errorf("topic=%s route info not found", mq.Topic) } _err := p.client.InvokeOneWay(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second) if _err != nil { err = _err continue } return nil } return err}
- sendOneWay主要是重试执行p.client.InvokeOneWay
小结
defaultProducer定义了group、client、state、options、publishInfo、callbackCh、interceptor;它提供了NewDefaultProducer、Start、Shutdown、SendSync、SendAsync、SendOneWay方法
doc
- producer