乐趣区

关于canal:基于canal实现mysql的数据同步

canal 是什么?

canal [kə’næl],译意为水道 / 管道 / 沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和生产

基于日志增量订阅和生产的业务包含

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时保护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

以后的 canal 反对源端 MySQL 版本包含 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

工作原理

[](https://github.com/alibaba/ca…

基于下面的解说,咱们在实现 canal 之前,先简略做一个主从复制。一主 一从

  • 首先下载 mysql 镜像, 并启动
docker pull mysql:latest
docker run -itd --name mysql-1 -p 23306:3306 -e MYSQL_ROOT_PASSWORD=root  mysql
docker run -itd --name mysql-2 -p 23307:3306 -e MYSQL_ROOT_PASSWORD=root  mysql
  • 相干命令再解释一下:
    name xxx :xxx 为容器名
    p 111:222 其中 111 是宿主机端口,222 是容器端口
    MYSQL_ROOT_PASSWORD=root 设置 root 账户明码为 root

  • 进入容器测试一下,一切正常

  • 设置 mysql- 1 为主,mysql- 2 为从库
  • 批改一下 mysql 的配置,装置 vim 编辑器
apt-get update
apt-get install vim
  • 在主库 创立一个 mysql 账户给从库应用
CREATE USER 'slave'@'%' IDENTIFIED BY '123456';
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'slave'@'%';
FLUSH PRIVILEGES;

  • 批改一下从服务器
[mysqld]
pid-file        = /var/run/mysqld/mysqld.pid
socket          = /var/run/mysqld/mysqld.sock
datadir         = /var/lib/mysql
secure-file-priv= NULL
server_id=100
log-bin=mysql-slave-bin
relay_log=edu-mysql-relay-bin
  • 退出重启从服务器 docker
  • 进入从服务器 执行
mysql> change master to master_host='172.17.0.4', master_user='slave', master_password='123456', master_port=3306, master_log_file='edu-mysql-bin.000001', master_log_pos= 877, master_connect_retry=30;

相干命令解释
master_port:Master 的端口号,指的是容器的端口号
master_user:用于数据同步的用户
master_password:用于同步的用户的明码
master_log_file:指定 Slave 从哪个日志文件开始复制数据,即上文中提到的 File 字段的值
master_log_pos:从哪个 Position 开始读,即上文中提到的 Position 字段的值
master_connect_retry:如果连贯失败,重试的工夫距离,单位是秒,默认是 60 秒
在 Slave 中的 mysql 终端执行 show slave status \G; 用于查看主从同步状态。

  • 呈现一下信息阐明配置胜利

  • 接下来再主库写数据,从库同步胜利

  • 简略的主从同步实现了,然而咱们要想,怎么实现的主从同步,对吧;

其实就是 通过 同步二进制日志文件,从服务器 会起一个 io 过程,读取二进制文件同步到 从服务器

  • 简略看一下二进制文件的内容;

为什么再 将 canal 之前要先说主从复制呢,其实 canal 就是把本人伪装成了从服务器,从而读取日志,拿到数据;

应用 docker 部署 canal

参考链接

docker pull canal/canal-server:latest
# 下载脚本
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh 

# 构建一个 destination name 为 test 的队列,address 对应的数据库 ip+ 端口,dbUsername 对应数据库用户名,dbPassword 对应数据库明码,留神批改为本人的
sh run.sh -e canal.auto.scan=false \
-e canal.destinations=test \
-e canal.instance.master.address=172.17.0.4:3306  \
-e canal.instance.dbUsername=canal  \
-e canal.instance.dbPassword=canal  \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false  \
  • 启动之后能够进入容器,看一下外面的 日志, 如果呈现了标红的信息,阐明胜利,否则就查看外面的报错信息吧!也不难

  • 配合 php 查看数据变动(此处不限 php,java,go,python 等都有接口)
  • 多语言连贯 https://github.com/alibaba/canal

php 监听数据变动

  • canal-php

canal-php 是阿里巴巴开源我的项目 Canal 是阿里巴巴 mysql 数据库 binlog 的增量订阅 & 生产组件 的 php 客户端。为 php 开发者提供一个更敌对的应用 Canal 的形式。Canal 是 mysql 数据库 binlog 的增量订阅 & 生产组件。

基于日志增量订阅 & 生产反对的业务:

  1. 数据库镜像
  2. 数据库实时备份
  3. 多级索引 (卖家和买家各自分库索引)
  4. search build
  5. 业务 cache 刷新
  6. 价格变动等重要业务音讯

对于 Canal 的更多信息请拜访 https://github.com/alibaba/canal/wiki

  • 利用场景

canal-php 作为 Canal 的客户端,其利用场景就是 Canal 的利用场景。对于利用场景在 Canal 介绍一节已有概述。举一些理论的应用例子:

1. 代替应用轮询数据库形式来监控数据库变更,无效改善轮询消耗数据库资源。

2. 依据数据库的变更实时更新搜索引擎,比方电商场景下商品信息产生变更,实时同步到商品搜索引擎 Elasticsearch、solr 等

3. 依据数据库的变更实时更新缓存,比方电商场景下商品价格、库存产生变更实时同步到 redis

4. 数据库异地备份、数据同步

5. 依据数据库变更触发某种业务,比方电商场景下,创立订单超过 xx 工夫未领取被主动勾销,咱们获取到这条订单数据的状态变更即可向用户推送音讯。

6. 将数据库变更整顿成本人的数据格式发送到 kafka 等音讯队列,供音讯队列的消费者进行生产。

  • 工作原理

canal-php 是 Canal 的 php 客户端,它与 Canal 是采纳的 Socket 来进行通信的,传输协定是 TCP,交互协定采纳的是 Google Protocol Buffer 3.0。

  • 工作流程

  • 应用组件装置,此处我有一个 laravel 框架,间接在 laravel 外面装置应用了,相干代码贴出
 # 装置组件 canal-php
 composer require xingwenge/canal_php
# 编写脚本监听
<?php

namespace App\Console\Commands;
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\CanalConnectorFactory;
use xingwenge\canal_php\Fmt;
use Illuminate\Console\Command;

ini_set('display_errors', 'On');
error_reporting(E_ALL);
class CanalDemo extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'CanalDemo';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = '测试 canal';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return mixed
     */
    public function handle()
    {
        try {$client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
            # $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);

            $client->connect("172.17.0.5", 11111);// 此处批改为本人的配置
            $client->checkValid();
            $client->subscribe("1001", "test", ".*\\..*");// 对应启动容器时 test 的队列名
            while (true) {$message = $client->get(100);
                if ($entries = $message->getEntries()) {foreach ($entries as $entry) {Fmt::println($entry);
                    }
                }
                sleep(1);
            }

            $client->disConnect();} catch (\Exception $e) {echo $e->getMessage(), PHP_EOL;
        }

    }

}
  • 更改数据

  • 监听后果

更多文章请微信搜寻公众号 < 老 A 技术联盟 > 或拜访博主网站易查网

退出移动版