0 引言

在很多业务状况下,咱们都会在零碎中引入ElasticSearch搜索引擎作为做全文检索的优化计划。

如果数据库数据产生更新,这时候就须要在业务代码中写一段同步更新ElasticSearch的代码。

上面我会以一个blog文章治理为例来演示canal+RocketMQGolang实现MySQLElasticSearch的数据同步。

示例地址:https://gitee.com/thepoy/Rock...

尽量不要在 macOS 中应用,创立的容器多多少少会有问题,出问题时很难找到症结所在,而在 linux 零碎中应用则一切正常。

1 RocketMQ

RocketMQ是没有官网镜像的,所以须要在本地创立:

cd rocketMQdocker build --no-cache -f Dockerfile -t rocketmq:4.8.0 --build-arg version=4.8.0 .
可依据本人的需要对 Dockerfile 进行批改

批改环境变量文件.env中的主机地址为本人的 ip 地址,而后应用 rocketMQ 目录中的配置文件创立容器:

docker-compose --file compose.yml up

2 Canal

2.1 创立容器

应用我的项目根目录中的配置文件创立mysqlcanal-admincanal-server容器:

cd ..docker-compose --file compose.yml up

也有一个环境变量文件须要批改,另外,compos 文件中的信息也须要依据须要批改,如 mysql 的 root 明码。

2.2 为 canal 账号受权

创立 mysql 容器时也创立了 canal 账号,须要为这个账号受权。

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';FLUSH PRIVILEGES;

2.3 关上 canal 治理后盾

http://localhost:8089,关上后...admin账号登录,默认明码为123456,治理后盾的界面如下图所示:

因为 compose.yml 文件中曾经配置了 canal-server,所以在后盾中能看见曾经启动的一个 server。

2.4 配置实例 / Instance

点击侧边栏的Instance治理,抉择新建 Instance,抉择那个惟一的主机,再点击载入模板,批改上面的一些参数:

# 勾销第 3 行中 mysql slaveId 的正文,轻易批改为一个数字(不能是 1,因为 mysql 的 server_id=1)canal.instance.mysql.slaveId=1234# 批改 mysql 的地址,canal-admin 容器中也有一个 mysql 实例,咱们不应用这个 mysql,而应用独自的 mysql 容器canal.instance.master.address=192.168.31.129:3306# 改成本人的数据库信息(须要监听的数据库,新建一个 database 就能够),这一行须要增加canal.instance.defaultDatabaseName = blog# table regex 须要过滤的表 这里数据库的中所有表canal.instance.filter.regex = .\*\\..\*# MQ 配置 日志数据会发送到 blog_articles 这个 topic 上canal.mq.topic=blog_articles

实例名称轻易填一个就行。

创立好的新实例默认是进行状态,将其启动。

创立 database 和 table:

CREATE DATABASE IF NOT EXISTS `blog`;USE blog;CREATE TABLE IF NOT EXISTS `blog_articles` (    `id` INT AUTO_INCREMENT PRIMARY KEY NOT NULL,    `title` VARCHAR(100) NOT NULL UNIQUE,    `content` TEXT NOT NULL,    `created_date` VARCHAR(10) NOT NULL);

2.5 配置 canal-server

批改上面的参数:

# 默信是 tcp, 批改为 rocketMQcanal.serverMode = rocketMQ###########################################################             RocketMQ         ###############################################################rocketmq.producer.group = blogrocketmq.namesrv.addr = 192.168.31.129:9876

保留后 server 会重启,这时关上 rocketMQ 控制台,可能看到新减少了一个主题blog_articles

能够通过增加一行数据来测试是否胜利:

INSERT INTO blog.blog_articles(title, content, created_date)VALUES('test1', '这是第 1 个测试文章', '2020-01-01');

增加后,在 rocketMQ 控制台查看音讯:

能够看到,增加数据的音讯曾经产生期待生产。

3 Elasticsearch

elasticsearch 容器会在应用配置文件创立 Canal 时一起创立,须要留神的是,如果你想批改 elasticsearch 的 tag,能够在.env文件中批改ES_TAG的值。

我没有创立 Kibana 容器,有需要的话能够自行创立。

4 代码设计

当数据库发生变化时,Canal 会将变动信息发送到 RocketMQ 中,所以咱们只须要生产 RocketMQ 中的音讯就能够做到即时或很快地将变动的数据同步到 Elasticsearch 中。

4.1 RocketMQ

常量
const (    // topic 在 Canal 中曾经配置了,这里肯定不能写错    topic              string = "blog_articles"    // 消费者组能够自定义,但要与 2.5 节中设置的 rocketmq.producer.group 雷同    consumerGroup      string = "blog")

从环境变量中获取host,并生成server

var (    server string    Host   string)func init() {    Host = os.Getenv("HOST")    if Host == "" {        Host = "localhost"    }    server = Host + ":9876"}
构造体的设计

尽管代码中没有用到这个构造体,但我感觉须要拿进去聊一聊:

type ChangedData struct {    // 变动的文档汇合    Data []es.Document `json:"data"`    // 发生变化的数据库    Database string `json:"database"`    // 数据库内执行工夫    ES uint64 `json:"es"`    // 就是 id    ID uint `json:"id"`    // 是否为 DDL 语句,create database、create table、alter table    IsDDL bool `json:"isDdl"`    // 表构造的字段类型    MysqlType map[string]string `json:"mysqlType"`    // 主键名称    PrimaryKeyNames []string `json:"pkNames"`    // sql 语句    SQL string `json:"sql"`    // sql 语句类型    SqlType map[string]uint `json:"sqlType"`    // 表名称    Table string `json:"table"`    // 操作类型,(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等    Type string `json:"type"`    // 数据库内解析工夫    Timestamp uint `json:"ts"`    // 旧数据    Old []map[string]string `json:"old"`}

其中es.Document构造如下:

type Document struct {    ID          string `json:"id,omitempty"`    Title       string `json:"title,omitempty"`    Content     string `json:"content,omitempty"`    CreatedDate string `json:"created_date,omitempty"`}
应用第三方 json 库

这也是为什么没用到下面的构造体的起因。

应用 json 规范库解决音讯数据并同步到 es 中,齐全是小题大做,会节约很多的性能。

data := gjson.Get(string(msg.body), "data")

应用 gjson 库,能够不便地从 json 字符串中获取想要的数据,并进行后续解决,无需将整个 json 反序列化。

应用 context 阻塞或退出生产线程

启动生产订阅后,阻塞多久,就会生产多久,为了可能管制何时完结生产,这里应用contextcancle()函数管制:

    err = c.Start()    ...    select {    case <-ctx.Done():        fmt.Println(strings.Repeat("*", 60))        fmt.Println("shutdown consumer")        fmt.Println(strings.Repeat("*", 60))    }    err = c.Shutdown()    ...

4.2 Elasticsearch

es 的代码是通用的,没有特地阐明的意义,间接看代码即可。

4.3 二者联合

联合 RocketMQ 和 Elasticsearch 的代码,就能实现音讯的即时生产文档的即时更新

须要从音讯中取出的数据

下面的构造体对每个字段都有正文,此示例只取dataoldtype三个字段:

// 将音讯体解析成 gjson.Resultbody := gjson.Parse(string(msg.Body))// 从音讯体中取 datadata := body.Get("data").Array()// 从音讯体中取 oldold := body.Get("old").Array()// 从音讯体中取 typecanalTypeStr := body.Get("type").String()
依据不同的操作以不同的形式更新数据

本示例中的仅包含非 DDL 操作,仅限于根本的增、删、改,因为数据已同步到 es 中,所以 查 应该在 es 中进行。

switch canalType {    case canal.DELETE:    ...    case canal.UPDATE:    ...    case canal.INSERT:    ...    default:    log.Fatal("未知操作", canalType)}

5 操作后果

设置环境变量(可选操作):

export HOST=192.168.31.129

运行示例,示例我的项目在core目录中:

cd corego run main.go

而后在数据库中增加一篇文章:

INSERT INTO blog.blog_articles(title, content, created_date)VALUES('test9', '这是第 9 篇测试文章', '2020-01-01');

在终端中就能看见日志:

...2021/05/08 15:05:41 已创立新的文档: map[content:这是第 9 篇测试文章 created_date:2020-01-01 id:10 title:test9]...

在 es 中查问一下id=10的文档:

curl -X GET "http://localhost:9200/canal_es/_doc/10?pretty"

查问后果:

{  "_index" : "canal_es",  "_type" : "_doc",  "_id" : "10",  "_version" : 1,  "_seq_no" : 14,  "_primary_term" : 1,  "found" : true,  "_source" : {    "content" : "这是第 9 篇测试文章",    "created_date" : "2020-01-01",    "id" : "10",    "title" : "test9"  }}

在数据库中更新一下这篇文章的创立日期:

UPDATE blog.blog_articlesSET created_date='2009-04-15'WHERE id=10;

终端日志:

2021/05/08 15:15:08 文档已存在,行将更新...[200 OK] {"_index":"canal_es","_type":"_doc","_id":"10","_version":2,"result":"updated","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":15,"_primary_term":1}2021/05/08 15:15:08 已更新文档:id=10, new-data=map[created_date:2009-04-15]

再查问一下这篇文章信息,后果为:

{  "_index" : "canal_es",  "_type" : "_doc",  "_id" : "10",  "_version" : 2,  "_seq_no" : 15,  "_primary_term" : 1,  "found" : true,  "_source" : {    "content" : "这是第 9 篇测试文章",    "created_date" : "2009-04-15",    "id" : "10",    "title" : "test9"  }}

可见,创立日期曾经更新。

上面删除这篇文章:

DELETE FROM blog.blog_articlesWHERE id=10;

终端日志:

2021/05/08 15:18:03 行将删除文档  102021/05/08 15:18:03 已删除: {"id":"10","title":"test9","content":"这是第 9 篇测试文章","created_date":"2009-04-15"}

再查问一下这篇文档:

{  "_index" : "canal_es",  "_type" : "_doc",  "_id" : "10",  "found" : false}

es 中也已删除此文章。


示例完结。