应用 binlog 的起因
近期须要重构一个老零碎,须要从几个服务中实时同步订单的批改到重构表里。
这里就面临两个抉择,
- 在每个服务的 mysql 操作前埋点,发送批改信息到队列或服务上。这种计划须要批改多个服务的代码并且测试对原零碎的影响,有额定开发和测试老本。
- 同步 mysql 的 binlog,依据表的 insert 和 update 更新新表。然而须要保护一个 binlog 同步的服务
本次抉择了 binlog 同步的形式。搭建的 binlog 服务也能够用在之后新零碎的缓存更新和同步 ES 索引上,绝对于埋点这种只有一次性作用的形式,性价比较高。
工具调研:canal 和 mysql-binlog-connector-java
1.canal
要实现 binlog 同步服务,应用较多的开源计划是 canal,运行比较稳定,而且性能也很丰盛。
然而在理论部署服务的时候,遇到了一些问题:
-
canal 采纳了 client-server 的模式,至多须要部署两个集群。咱们的我的项目都是应用公有云部署,为了稳固运行,就有额定的资源和保护开销。
- 起初发现 canal server 能够投递信息到 kafka。然而咱们的音讯队列是自研的,只能尝试去改源码。
- canal 的 server 是残缺独立的包,无奈间接用 springboot 嵌套。而咱们的根底组件都依赖于 springboot,比方监控,配置核心等。
- canal 的 HA 部署应用的是 zookeeper,很惋惜咱们并没有可用的 zookeper 集群,也没有资源重新部署一个。
- 单机部署的时候,曾经解决的 binlog postion 是保留在文件外面的,咱们用的公有云 docker,重启后全失落。
2.mysql-binlog-connector-java
调研同时也发现了另一个 binlog 同步工具,mysql-binlog-connector-java
。
这是一个开源的 binlog 同步工具,性能很简略,就是接管 binlog 信息。作为一个依赖 jar 能够很容易在 springboot 中应用。
然而没有对 binlog 的内容做格式化解决,应用很不不便。当然更没有保存信息和分布式部署性能。
自研工具包 binlogportal
基于这些问题,咱们须要一个具备以下个性的 binlog 同步工具:
-
能够应用 springboot 加载运行,具备较好的扩展性
- 说白了就是作为一个 jar 包,凋谢出接口能够自定义解决 binlog 信息的形式
- 能够应用 redis 实现 binlog position 的保留和分布式部署
为了满足这些条件,通过对 mysql-binlog-connector-java
封装后,实现了自研的工具 binlogportal。
- 提供了 binlogportal-spring-boot-starter 包,可应用 spring boot 疾速部署
- 应用 redis 保留 binlog position 信息,重启后可从上次 position 地位开始
- 以后反对 insert 和 update 的结构化
- 提供默认的 http 事件处理器。可通过实现 IEventHandler 接口,自定义事件处理器
- 应用 redis 作为分布式协调器,可多机部署实现高可用
应用阐明
Mysql 配置
- Mysql 须要开启 binlog 并设置为 row 模式
- 同步 binlog 应用的 mysql 账号,须要增加 REPLICATION 权限,示例如下:
CREATE USER binlogportal IDENTIFIED BY '123456';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'binlogportal'@'%';
GRANT ALL PRIVILEGES ON *.* TO 'binlogportal'@'%';
FLUSH PRIVILEGES;
通过 spring boot 构建我的项目
- 间接依赖 binlogportal-spring-boot-starter
<dependency>
<groupId>com.insistingon.binlogportal</groupId>
<artifactId>binlogportal-spring-boot-starter</artifactId>
<version>1.0.5</version>
</dependency>
- 通过 spring boot 的 application.yml 配置启动器
binlogportal:
enable: true # 是否启用 autoconfig
distributed-enable: true # 是否启用分布式部署
distributed-redis: # distributed-enable 为 true 时,要提供一个 redis 作为分布式协调器
host: 127.0.0.1
port: 6379
auth:
position-redis: # 保留 binlog position 的 redis,必须配置
host: 127.0.0.1
port: 6379
auth:
db-config: # 数据库配置,能够有多个,key 自定义即可
d1:
host: 0.0.0.0
port: 3306
user-name: binlogportal
password: 123456
handler-list: [logEventHandler] # 该数据库应用的事件处理器,名称为 spring 的 bean name
http-handler: # 启用自带的 http 事件处理器,可发送申请
url-list: [http://127.0.0.1:8988/testit] # 要发送的 url 列表,http 参数为对立的格局
result-callback: httpCallBack # 配置自定义的后果处理器,须要实现 IHttpCallback 接口,值为 bean name
-
Starter 启动
- spring boot autoconfig 启动胜利后,会把 BinlogPortalStarter 的实例注入到 IOC 中
- 我的项目中通过注入的形式获取 binlogPortalStarter 应用
- binlogPortalStarter.start()会为每个 mysql 库创立一个线程解决 binlog
- 上面是应用 CommandLineRunner 启动 starter 的一个例子
@Slf4j
@Component
public class BinlogSync implements CommandLineRunner {
@Resource
BinlogPortalStarter binlogPortalStarter;
public void run(String... args) throws Exception {
try {binlogPortalStarter.start();
} catch (BinlogPortalException e) {log.error(e.getMessage(), e);
}
}
}
非 spring boot 我的项目
- 非 spring boot 我的项目,能够应用根底包
<dependency>
<groupId>com.insistingon.binlogportal</groupId>
<artifactId>binlogportal</artifactId>
<version>1.0.5</version>
</dependency>
- 依赖后实现配置类
BinlogPortalConfig
和SyncConfig
,传入 Starter 中运行即可
public class TestClass{public static void main(String[] args) {SyncConfig syncConfig = new SyncConfig();
syncConfig.setHost("0.0.0.0");
syncConfig.setPort(3306);
syncConfig.setUserName("binlogportal");
syncConfig.setPassword("123456");
BinlogPortalConfig binlogPortalConfig = new BinlogPortalConfig();
binlogPortalConfig.addSyncConfig(syncConfig);
RedisConfig redisConfig = new RedisConfig("127.0.0.1", 6379);
RedisPositionHandler redisPositionHandler = new RedisPositionHandler(redisConfig);
binlogPortalConfig.setPositionHandler(redisPositionHandler);
binlogPortalConfig.setDistributedHandler(new RedisDistributedHandler(redisConfig));
BinlogPortalStarter binlogPortalStarter = new BinlogPortalStarter();
binlogPortalStarter.setBinlogPortalConfig(binlogPortalConfig);
try {binlogPortalStarter.start();
} catch (BinlogPortalException e) {e.printStackTrace();
}
}
}
2. 分布式部署实现
我的项目中高可用实现是基于 redis 的分布式锁。
每个实例都会加载全副数据库的配置,在创立 binlog 连贯之前,先要获取 redis 锁,获取锁后会定时刷新锁的过期工夫。所有实例会定时从新抢锁。
同一个 mysql 库的 binlog 文件和 position 会保留在 redis 里,如果一个实例宕机。新抢到锁的实例在初始化时,会应用上个实例已保留的 binlog 信息持续获取。
我的项目源码可在 Git 上查看。
我的项目的 Git 地址:https://github.com/dothetrick…
以上内容属集体学习总结,如有不当之处,欢送在评论中斧正