共计 1469 个字符,预计需要花费 4 分钟才能阅读完成。
需要背景
因为 Go-Cannal 常常挂掉,导致 MySQL binlog 同步到 ES 的链路故障。所以改用了另外一种同步计划。用某云的 DTS 生产 MySQL 的 binlong,用 DTS 的 Java 客户端生产 kafka 协定的音讯,失去 MySQL 的变更。Java 再将变更 Push 到 RocketMQ。革新原有的 Go 代码,生产 RocketMQ 音讯,实现业务逻辑(复用之前的逻辑代码,减小开发量)。
新计划采纳 MQ 长久化防止消费者 panic 导致数据失落,同时消费者能够分布式解决音讯进步吞吐率。
问题
Go-SDK 的官网文档:https://help.aliyun.com/document_detail/141783.html?spm=5176….
坑一,RocketMQ 的服务不能用 apache RocketMQ 的 Go client。必须应用官网的 sdk,且官网 sdk 只有 http 协定版本。
坑二,官网的 SDK 中的 token 字段并无用处,能够为空或者任意字符
坑三,官网的 SDK 中 GetConsumer(InstanceID, Topic, Consumer, "*")
第三个参数其实是 TopicName,或者必须写成 TopicName 一样的值。
坑四,官网的 SDK 调用之前必须配置好 Group,且 Group 必须是 HTTP 的。
示例代码
一个能够跑通的代码 demo 如下:
package main | |
import ( | |
"context" | |
"fmt" | |
"os" | |
"rocketmq-demo/mq_http_sdk" | |
"time" | |
) | |
var ( | |
AccessKey = "RAM 帐号的 key" | |
AccessSecret = "RAM 帐号的 Secret" | |
Endpoint = "必须是 http 协定的 endpoint" | |
Topic = "topicName" | |
GroupID = "必须是 http 协定的 group 且当时受权" | |
Token = "任意值" | |
InstanceID = "从 web 控制台复制" | |
) | |
func main() { | |
//client := client | |
cli := mq_http_sdk.NewAliyunMQClientWithTimeout(Endpoint, AccessKey, AccessSecret, Token, time.Second*5) | |
consumer := cli.GetConsumer(InstanceID, Topic, Topic, "*") | |
var responseChan = make(chan mq_http_sdk.ConsumeMessageResponse, 1) | |
var errChan = make(chan error, 1) | |
func(responseChan chan mq_http_sdk.ConsumeMessageResponse, errChan chan error) {consumer.ConsumeMessage(responseChan, errChan, 10, 30) | |
}(responseChan, errChan) | |
go func() { | |
for { | |
select { | |
case response := <-responseChan: | |
fmt.Println("Receive Message:", response.Messages) | |
// 生产胜利 | |
err := consumer.AckMessage([]string{"OK"}) | |
if err != nil {fmt.Println("Ack Error:", err) | |
} | |
case err := <-errChan: | |
fmt.Println("Error:", err) | |
} | |
} | |
}() | |
time.Sleep(10 * time.Second) | |
} |
正文完