需要背景
因为 Go-Cannal 常常挂掉,导致 MySQL binlog 同步到 ES 的链路故障。所以改用了另外一种同步计划。用某云的 DTS 生产 MySQL 的 binlong,用 DTS 的 Java 客户端生产 kafka 协定的音讯,失去 MySQL 的变更。Java 再将变更 Push 到 RocketMQ。革新原有的 Go 代码,生产 RocketMQ 音讯,实现业务逻辑( 复用之前的逻辑代码,减小开发量 )。
新计划采纳 MQ 长久化防止消费者 panic 导致数据失落,同时消费者能够分布式解决音讯进步吞吐率。
问题
Go-SDK 的官网文档 : https://help.aliyun.com/document_detail/141783.html?spm=5176....
坑一,RocketMQ 的服务不能用 apache RocketMQ 的 Go client。必须应用官网的 sdk,且官网 sdk 只有 http 协定版本。
坑二,官网的 SDK 中的 token 字段并无用处,能够为空或者任意字符
坑三,官网的 SDK 中 GetConsumer(InstanceID, Topic, Consumer, "*")
第三个参数其实是 TopicName ,或者必须写成 TopicName 一样的值。
坑四,官网的 SDK 调用之前必须配置好 Group,且 Group 必须是 HTTP 的。
示例代码
一个能够跑通的代码 demo 如下:
package mainimport ( "context" "fmt" "os" "rocketmq-demo/mq_http_sdk" "time")var ( AccessKey = "RAM帐号的key" AccessSecret = "RAM帐号的Secret" Endpoint = "必须是http协定的endpoint" Topic = "topicName" GroupID = "必须是http协定的group且当时受权" Token = "任意值" InstanceID = "从web控制台复制")func main() { //client := client cli := mq_http_sdk.NewAliyunMQClientWithTimeout(Endpoint, AccessKey, AccessSecret, Token, time.Second*5) consumer := cli.GetConsumer(InstanceID, Topic, Topic, "*") var responseChan = make(chan mq_http_sdk.ConsumeMessageResponse, 1) var errChan = make(chan error, 1) func(responseChan chan mq_http_sdk.ConsumeMessageResponse, errChan chan error) { consumer.ConsumeMessage(responseChan, errChan, 10, 30) }(responseChan, errChan) go func() { for { select { case response := <-responseChan: fmt.Println("Receive Message: ", response.Messages) // 生产胜利 err := consumer.AckMessage([]string{"OK"}) if err != nil { fmt.Println("Ack Error: ", err) } case err := <-errChan: fmt.Println("Error: ", err) } } }() time.Sleep(10 * time.Second)}