序
本文主要研究一下 rocketmq-client-go 的 transactionProducer
transactionProducer
rocketmq-client-go-v2.0.0/producer/producer.go
// transactionProducer pairs a defaultProducer with a
// primitive.TransactionListener.
type transactionProducer struct {
// producer is the wrapped default producer.
producer *defaultProducer
// listener is the primitive.TransactionListener kept alongside the producer.
listener primitive.TransactionListener
}
- transactionProducer 定义了 producer 及 listener 属性
NewTransactionProducer
rocketmq-client-go-v2.0.0/producer/producer.go
// NewTransactionProducer creates a transactionProducer that wraps a default
// producer built from opts and uses listener for its transaction callbacks.
//
// If NewDefaultProducer fails, its error is wrapped and returned together
// with a nil producer.
func NewTransactionProducer(listener primitive.TransactionListener, opts ...Option) (*transactionProducer, error) {
	inner, err := NewDefaultProducer(opts...)
	if err != nil {
		return nil, errors.Wrap(err, "NewDefaultProducer failed.")
	}

	tp := &transactionProducer{
		listener: listener,
		producer: inner,
	}
	return tp, nil
}
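- NewTransactionProducer 方法先通过 NewDefaultProducer(opts...) 创建 defaultProducer，出错时用 errors.Wrap 包装后返回；成功则用该 producer 和传入的 listener 实例化 transactionProducer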
Start
rocketmq-client-go-v2.0.0/producer/producer.go
// Start launches checkTransactionState on its own goroutine, wrapped in
// primitive.WithRecover, and then starts the underlying producer.
//
// The goroutine is spawned before tp.producer.Start is called; the value
// returned is whatever tp.producer.Start returns.
func (tp *transactionProducer) Start() error {
	checker := tp.checkTransactionState
	go primitive.WithRecover(checker)

	return tp.producer.Start()
}
- Start 方法先通过 primitive.WithRecover 在新的 goroutine 中异步执行 tp.checkTransactionState()，然后执行 tp.producer.Start()
checkTransactionState
rocketmq-client-go-v2.0.0/producer/producer.go
// checkTransactionState receives from tp.producer.callbackCh until the
// channel is closed and handles every value it gets.
//
// For a *internal.CheckTransactionStateCallback it asks the listener's
// CheckLocalTransaction for the local state, builds an
// EndTransactionRequestHeader from the callback, and sends it to
// callback.Addr as a ReqENDTransaction command through
// tp.producer.client.InvokeOneWay. A send error is logged and not returned.
// A value of any other type is logged as unknown and skipped.
func (tp *transactionProducer) checkTransactionState() {
	for item := range tp.producer.callbackCh {
		cb, ok := item.(*internal.CheckTransactionStateCallback)
		if !ok {
			rlog.Error(fmt.Sprintf("unknown type %v", item), nil)
			continue
		}

		state := tp.listener.CheckLocalTransaction(cb.Msg)

		// Use the PropertyUniqueClientMessageIdKeyIndex property as the
		// message id, falling back to Msg.MsgId when it is empty.
		msgID := cb.Msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
		if msgID == "" {
			msgID = cb.Msg.MsgId
		}

		header := &internal.EndTransactionRequestHeader{
			ProducerGroup:        tp.producer.group,
			MsgID:                msgID,
			TransactionId:        cb.Header.TransactionId,
			CommitLogOffset:      cb.Header.CommitLogOffset,
			TranStateTableOffset: cb.Header.TranStateTableOffset,
			CommitOrRollback:     tp.transactionState(state),
			FromTransactionCheck: true,
		}

		cmd := remote.NewRemotingCommand(internal.ReqENDTransaction, header, nil)
		cmd.Remark = tp.errRemark(nil)

		err := tp.producer.client.InvokeOneWay(
			context.Background(),
			cb.Addr.String(),
			cmd,
			tp.producer.options.SendMsgTimeout,
		)
		if err == nil {
			continue
		}

		rlog.Error("send ReqENDTransaction to broker error", map[string]interface{}{
			"callback":               cb.Addr.String(),
			"request":                cmd.String(),
			rlog.LogKeyUnderlayError: err,
		})
	}
}
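- checkTransactionState 方法遍历 tp.producer.callbackCh，根据回调的类型分别处理；目前只支持 CheckTransactionStateCallback：它先执行 tp.listener.CheckLocalTransaction 得到本地事务状态，再构造 EndTransactionRequestHeader，通过 tp.producer.client.InvokeOneWay 发送 ReqENDTransaction 请求，发送失败只记录错误日志；其他类型则记录 unknown type 错误日志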
Shutdown
rocketmq-client-go-v2.0.0/producer/producer.go
// Shutdown shuts down the wrapped producer and returns the result of
// tp.producer.Shutdown unchanged.
func (tp *transactionProducer) Shutdown() error {
	inner := tp.producer
	return inner.Shutdown()
}
- Shutdown 方法执行 tp.producer.Shutdown()
SendMessageInTransaction
rocketmq-client-go-v2.0.0/producer/producer.go
// SendMessageInTransaction sets the PropertyTransactionPrepared and
// PropertyProducerGroup properties on msg, sends it with SendSync, derives a
// local transaction state from the send status, and passes that state to
// tp.endTransaction.
//
// An error from SendSync is returned as is, before the listener is invoked.
// Otherwise the state is chosen from the send status:
//   - SendOK: the value returned by the listener's ExecuteLocalTransaction;
//     anything other than CommitMessageState is logged as an error.
//   - SendFlushDiskTimeout, SendFlushSlaveTimeout, SendSlaveNotAvailable:
//     RollbackMessageState.
//   - any other status: UnknowState.
//
// The returned TransactionSendResult carries the send result and that state.
// The value returned by tp.endTransaction, if any, is not inspected here.
func (tp *transactionProducer) SendMessageInTransaction(ctx context.Context, msg *primitive.Message) (*primitive.TransactionSendResult, error) {
	msg.WithProperty(primitive.PropertyTransactionPrepared, "true")
	msg.WithProperty(primitive.PropertyProducerGroup, tp.producer.options.GroupName)

	sendResult, err := tp.producer.SendSync(ctx, msg)
	if err != nil {
		return nil, err
	}

	// Any status not matched below leaves the state at UnknowState.
	state := primitive.UnknowState
	switch sendResult.Status {
	case primitive.SendFlushDiskTimeout, primitive.SendFlushSlaveTimeout, primitive.SendSlaveNotAvailable:
		state = primitive.RollbackMessageState
	case primitive.SendOK:
		if sendResult.TransactionID != "" {
			msg.WithProperty("__transactionId__", sendResult.TransactionID)
		}
		// msg.TransactionId is taken from the
		// PropertyUniqueClientMessageIdKeyIndex property when that is set.
		if id := msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex); id != "" {
			msg.TransactionId = id
		}

		state = tp.listener.ExecuteLocalTransaction(msg)
		if state != primitive.CommitMessageState {
			rlog.Error("executeLocalTransaction but state unexpected", map[string]interface{}{
				"localState": state,
				"message":    msg,
			})
		}
	}

	tp.endTransaction(*sendResult, err, state)

	return &primitive.TransactionSendResult{
		SendResult: sendResult,
		State:      state,
	}, nil
}
- SendMessageInTransaction 方法先给 msg 设置 PropertyTransactionPrepared 及 PropertyProducerGroup 属性，再执行 tp.producer.SendSync(ctx, msg)，若返回 err 则直接返回；然后根据 rsp.Status 来做不同处理：对于 primitive.SendOK 执行 tp.listener.ExecuteLocalTransaction 来更新 localTransactionState；对于 primitive.SendFlushDiskTimeout、primitive.SendFlushSlaveTimeout、primitive.SendSlaveNotAvailable 则更新 localTransactionState 为 primitive.RollbackMessageState；其余状态保持初始值 primitive.UnknowState；最后执行 tp.endTransaction 并返回 TransactionSendResult
小结
transactionProducer 定义了 producer 及 listener 属性;它提供了 NewTransactionProducer、Start、Shutdown、SendMessageInTransaction 方法
doc
- producer