乐趣区

聊聊 rocketmq-client-go 的 transactionProducer

本文主要研究一下 rocketmq-client-go 的 transactionProducer

transactionProducer

rocketmq-client-go-v2.0.0/producer/producer.go

// transactionProducer pairs a defaultProducer with a TransactionListener.
// Its methods delegate sending and lifecycle calls to producer and consult
// listener for local transaction state.
type transactionProducer struct {
    // producer is the wrapped default producer used for Start, Shutdown and SendSync.
    producer *defaultProducer
    // listener supplies ExecuteLocalTransaction and CheckLocalTransaction.
    listener primitive.TransactionListener
}
  • transactionProducer 定义了 producer 及 listener 属性

NewTransactionProducer

rocketmq-client-go-v2.0.0/producer/producer.go

// NewTransactionProducer creates a default producer from opts and wraps it,
// together with listener, in a transactionProducer.
//
// When NewDefaultProducer fails, it returns nil and the error wrapped with
// additional context.
func NewTransactionProducer(listener primitive.TransactionListener, opts ...Option) (*transactionProducer, error) {
    base, err := NewDefaultProducer(opts...)
    if err != nil {
        return nil, errors.Wrap(err, "NewDefaultProducer failed.")
    }

    tp := &transactionProducer{producer: base, listener: listener}
    return tp, nil
}
  • NewTransactionProducer 方法实例化 transactionProducer

Start

rocketmq-client-go-v2.0.0/producer/producer.go

// Start launches checkTransactionState in a separate goroutine, wrapped by
// primitive.WithRecover, and then starts the underlying producer, returning
// whatever that Start call returns.
func (tp *transactionProducer) Start() error {
    checkLoop := func() {
        tp.checkTransactionState()
    }
    go primitive.WithRecover(checkLoop)

    return tp.producer.Start()
}
  • Start 方法先异步执行 tp.checkTransactionState(),然后执行 tp.producer.Start() 并返回其结果

checkTransactionState

rocketmq-client-go-v2.0.0/producer/producer.go

// checkTransactionState ranges over tp.producer.callbackCh and handles each
// received value.
//
// For a *internal.CheckTransactionStateCallback it asks the listener for the
// local transaction state, builds an EndTransactionRequestHeader from the
// callback, and sends it to callback.Addr as a one-way ReqENDTransaction
// command. A send failure is logged, not returned. Any other value type is
// logged as unknown and skipped.
func (tp *transactionProducer) checkTransactionState() {
    for item := range tp.producer.callbackCh {
        check, ok := item.(*internal.CheckTransactionStateCallback)
        if !ok {
            rlog.Error(fmt.Sprintf("unknown type %v", item), nil)
            continue
        }

        state := tp.listener.CheckLocalTransaction(check.Msg)

        // Prefer the unique client message id property; fall back to MsgId
        // when the property is empty.
        msgID := check.Msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
        if msgID == "" {
            msgID = check.Msg.MsgId
        }

        endHeader := &internal.EndTransactionRequestHeader{
            CommitLogOffset:      check.Header.CommitLogOffset,
            ProducerGroup:        tp.producer.group,
            TranStateTableOffset: check.Header.TranStateTableOffset,
            FromTransactionCheck: true,
            MsgID:                msgID,
            TransactionId:        check.Header.TransactionId,
            CommitOrRollback:     tp.transactionState(state),
        }

        endReq := remote.NewRemotingCommand(internal.ReqENDTransaction, endHeader, nil)
        endReq.Remark = tp.errRemark(nil)

        sendErr := tp.producer.client.InvokeOneWay(
            context.Background(),
            check.Addr.String(),
            endReq,
            tp.producer.options.SendMsgTimeout,
        )
        if sendErr == nil {
            continue
        }
        rlog.Error("send ReqENDTransaction to broker error", map[string]interface{}{
            "callback":               check.Addr.String(),
            "request":                endReq.String(),
            rlog.LogKeyUnderlayError: sendErr,
        })
    }
}
  • checkTransactionState 方法遍历 tp.producer.callbackCh,根据回调的类型分别处理;目前只支持 CheckTransactionStateCallback:它先调用 tp.listener.CheckLocalTransaction 获取本地事务状态,再构造 EndTransactionRequestHeader,最后通过 tp.producer.client.InvokeOneWay 发送给 broker;其他类型则只记录错误日志

Shutdown

rocketmq-client-go-v2.0.0/producer/producer.go

// Shutdown delegates to the wrapped producer's Shutdown and returns its
// error unchanged.
func (tp *transactionProducer) Shutdown() error {
    err := tp.producer.Shutdown()
    return err
}
  • Shutdown 方法执行 tp.producer.Shutdown()

SendMessageInTransaction

rocketmq-client-go-v2.0.0/producer/producer.go

// SendMessageInTransaction sends msg as a prepared transactional message and
// then ends the transaction according to the outcome.
//
// It tags msg with the transaction-prepared and producer-group properties and
// sends it with SendSync. If SendSync fails, that error is returned with a
// nil result. Otherwise the local transaction state is chosen from the send
// status:
//   - SendOK: the listener's ExecuteLocalTransaction decides the state; any
//     state other than CommitMessageState is logged.
//   - SendFlushDiskTimeout, SendFlushSlaveTimeout, SendSlaveNotAvailable:
//     RollbackMessageState.
//   - anything else: the state stays UnknowState.
//
// Finally tp.endTransaction is called with the send result and the state,
// and both are returned in a TransactionSendResult.
func (tp *transactionProducer) SendMessageInTransaction(ctx context.Context, msg *primitive.Message) (*primitive.TransactionSendResult, error) {
    msg.WithProperty(primitive.PropertyTransactionPrepared, "true")
    msg.WithProperty(primitive.PropertyProducerGroup, tp.producer.options.GroupName)

    sendResult, err := tp.producer.SendSync(ctx, msg)
    if err != nil {
        return nil, err
    }

    state := primitive.UnknowState
    switch sendResult.Status {
    case primitive.SendFlushDiskTimeout, primitive.SendFlushSlaveTimeout, primitive.SendSlaveNotAvailable:
        state = primitive.RollbackMessageState

    case primitive.SendOK:
        if len(sendResult.TransactionID) > 0 {
            msg.WithProperty("__transactionId__", sendResult.TransactionID)
        }
        // The message's TransactionId is taken from the unique client
        // message id property when that property is set.
        if uniqID := msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex); len(uniqID) > 0 {
            msg.TransactionId = uniqID
        }

        state = tp.listener.ExecuteLocalTransaction(msg)
        if state != primitive.CommitMessageState {
            rlog.Error("executeLocalTransaction but state unexpected", map[string]interface{}{
                "localState": state,
                "message":    msg,
            })
        }
    }

    tp.endTransaction(*sendResult, err, state)

    return &primitive.TransactionSendResult{
        SendResult: sendResult,
        State:      state,
    }, nil
}
  • SendMessageInTransaction 方法先执行 tp.producer.SendSync(ctx, msg),然后根据 rsp.Status 来做不同处理;对于 primitive.SendOK 执行 tp.listener.ExecuteLocalTransaction 来更新 localTransactionState;对于 primitive.SendFlushDiskTimeout、primitive.SendFlushSlaveTimeout、primitive.SendSlaveNotAvailable 则更新 localTransactionState 为 primitive.RollbackMessageState;最后执行 tp.endTransaction

小结

transactionProducer 定义了 producer 及 listener 属性;它提供了 NewTransactionProducer、Start、Shutdown、SendMessageInTransaction 方法

doc

  • producer
退出移动版