canal 是什么?
canal [kə’næl],译意为水道 / 管道 / 沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和生产
基于日志增量订阅和生产的业务包含
- 数据库镜像
- 数据库实时备份
- 索引构建和实时保护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
以后的 canal 反对源端 MySQL 版本包含 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
工作原理
[](https://github.com/alibaba/ca…
基于下面的解说,咱们在实现 canal 之前,先简略做一个主从复制。一主 一从
- 首先下载 mysql 镜像, 并启动
docker pull mysql:latest
docker run -itd --name mysql-1 -p 23306:3306 -e MYSQL_ROOT_PASSWORD=root mysql
docker run -itd --name mysql-2 -p 23307:3306 -e MYSQL_ROOT_PASSWORD=root mysql
- 相干命令再解释一下:
name xxx :xxx 为容器名
p 111:222 其中 111 是宿主机端口,222 是容器端口
MYSQL_ROOT_PASSWORD=root 设置 root 账户明码为 root
- 进入容器测试一下,一切正常
- 设置 mysql- 1 为主,mysql- 2 为从库
- 批改一下 mysql 的配置,装置 vim 编辑器
apt-get update
apt-get install vim
- 在主库 创立一个 mysql 账户给从库应用
CREATE USER 'slave'@'%' IDENTIFIED BY '123456';
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'slave'@'%';
FLUSH PRIVILEGES;
- 批改一下从服务器
[mysqld]
pid-file = /var/run/mysqld/mysqld.pid
socket = /var/run/mysqld/mysqld.sock
datadir = /var/lib/mysql
secure-file-priv= NULL
server_id=100
log-bin=mysql-slave-bin
relay_log=edu-mysql-relay-bin
- 退出重启从服务器 docker
- 进入从服务器 执行
mysql> change master to master_host='172.17.0.4', master_user='slave', master_password='123456', master_port=3306, master_log_file='edu-mysql-bin.000001', master_log_pos= 877, master_connect_retry=30;
相干命令解释
master_port:Master 的端口号,指的是容器的端口号
master_user:用于数据同步的用户
master_password:用于同步的用户的明码
master_log_file:指定 Slave 从哪个日志文件开始复制数据,即上文中提到的 File 字段的值
master_log_pos:从哪个 Position 开始读,即上文中提到的 Position 字段的值
master_connect_retry:如果连贯失败,重试的工夫距离,单位是秒,默认是 60 秒
在 Slave 中的 mysql 终端执行 show slave status \G; 用于查看主从同步状态。
- 呈现一下信息阐明配置胜利
- 接下来再主库写数据,从库同步胜利
- 简略的主从同步实现了,然而咱们要想,怎么实现的主从同步,对吧;
其实就是 通过 同步二进制日志文件,从服务器 会起一个 io 过程,读取二进制文件同步到 从服务器
- 简略看一下二进制文件的内容;
为什么再 将 canal 之前要先说主从复制呢,其实 canal 就是把本人伪装成了从服务器,从而读取日志,拿到数据;
应用 docker 部署 canal
参考链接
docker pull canal/canal-server:latest
# 下载脚本
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh
# 构建一个 destination name 为 test 的队列,address 对应的数据库 ip+ 端口,dbUsername 对应数据库用户名,dbPassword 对应数据库明码,留神批改为本人的
sh run.sh -e canal.auto.scan=false \
-e canal.destinations=test \
-e canal.instance.master.address=172.17.0.4:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false \
- 启动之后能够进入容器,看一下外面的 日志, 如果呈现了标红的信息,阐明胜利,否则就查看外面的报错信息吧!也不难
- 配合 php 查看数据变动(此处不限 php,java,go,python 等都有接口)
- 多语言连贯 https://github.com/alibaba/canal
php 监听数据变动
- canal-php
canal-php 是阿里巴巴开源我的项目 Canal 是阿里巴巴 mysql 数据库 binlog 的增量订阅 & 生产组件 的 php 客户端。为 php 开发者提供一个更敌对的应用 Canal 的形式。Canal 是 mysql 数据库 binlog 的增量订阅 & 生产组件。
基于日志增量订阅 & 生产反对的业务:
- 数据库镜像
- 数据库实时备份
- 多级索引 (卖家和买家各自分库索引)
- search build
- 业务 cache 刷新
- 价格变动等重要业务音讯
对于 Canal 的更多信息请拜访 https://github.com/alibaba/canal/wiki
- 利用场景
canal-php 作为 Canal 的客户端,其利用场景就是 Canal 的利用场景。对于利用场景在 Canal 介绍一节已有概述。举一些理论的应用例子:
1. 代替应用轮询数据库形式来监控数据库变更,无效改善轮询消耗数据库资源。
2. 依据数据库的变更实时更新搜索引擎,比方电商场景下商品信息产生变更,实时同步到商品搜索引擎 Elasticsearch、solr 等
3. 依据数据库的变更实时更新缓存,比方电商场景下商品价格、库存产生变更实时同步到 redis
4. 数据库异地备份、数据同步
5. 依据数据库变更触发某种业务,比方电商场景下,创立订单超过 xx 工夫未领取被主动勾销,咱们获取到这条订单数据的状态变更即可向用户推送音讯。
6. 将数据库变更整顿成本人的数据格式发送到 kafka 等音讯队列,供音讯队列的消费者进行生产。
- 工作原理
canal-php 是 Canal 的 php 客户端,它与 Canal 是采纳的 Socket 来进行通信的,传输协定是 TCP,交互协定采纳的是 Google Protocol Buffer 3.0。
- 工作流程
- 应用组件装置,此处我有一个 laravel 框架,间接在 laravel 外面装置应用了,相干代码贴出
# 装置组件 canal-php
composer require xingwenge/canal_php
# 编写脚本监听
<?php
namespace App\Console\Commands;
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\CanalConnectorFactory;
use xingwenge\canal_php\Fmt;
use Illuminate\Console\Command;
ini_set('display_errors', 'On');
error_reporting(E_ALL);
class CanalDemo extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'CanalDemo';
/**
* The console command description.
*
* @var string
*/
protected $description = '测试 canal';
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{parent::__construct();
}
/**
* Execute the console command.
*
* @return mixed
*/
public function handle()
{
try {$client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
# $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);
$client->connect("172.17.0.5", 11111);// 此处批改为本人的配置
$client->checkValid();
$client->subscribe("1001", "test", ".*\\..*");// 对应启动容器时 test 的队列名
while (true) {$message = $client->get(100);
if ($entries = $message->getEntries()) {foreach ($entries as $entry) {Fmt::println($entry);
}
}
sleep(1);
}
$client->disConnect();} catch (\Exception $e) {echo $e->getMessage(), PHP_EOL;
}
}
}
- 更改数据
- 监听后果
更多文章请微信搜寻公众号 < 老 A 技术联盟 > 或拜访博主网站易查网