作者:张高迪(花名杳天),Hologres研发。

同传统MySQL数据库,Hologres反对Hologres binlog,记录数据库中所有数据的变动事件日志。通过Hologres binlog,能够十分不便灵便的实现数据之间的复制、同步。同时在大数据场景上,反对Flink间接生产Hologres Binlog,相较于传统数仓分层,Flink+Hologres Binlog能够实现残缺的事件驱动,实现ODS向DWD,DWD向DWS等的全实时加工作业,满足分层治理的前提下对立存储,晋升数据复用能力,并且大大缩短数据加工端到端提早,为用户提供一站式的实时数据仓库解决方案。在本文中,咱们将会介绍Hologres Binlog技术实现原理以及应用最佳实际。

Hologres Binlog介绍

1、什么是Binlog?

Binlog即二进制日志(Binary Log),这个概念常见于MySQL数据库,是用来记录数据库所有可能引起数据变动事件的日志,比方表构造变更(例如CREATE、ALTER TABLE…)以及表数据批改(INSERT、UPDATE、DELETE…)事件。

MySQL Binlog最后有两个主要用途:

  • 主从复制:主服务器将其Binlog文件中蕴含的事件发送到从服务器,从服务器执行这些事件以进行与主服务器雷同的数据更改,从而保障主从服务器之间的数据一致性。
  • 数据恢复:通过从新执行Binlog文件中记录的事件,使数据库复原到产生问题之前最新的状态。

Binlog也被用作流式的数据源。用户经常通过Debezium或者阿里开源的 Canal等数据采集工具,采集Binlog作为流式的数据源,从而实现数据从MySQL到其余类型数据库的数据同步,个别先将采集的数据输入到消息中间件如Kafka等,而后通过Flink引擎去生产数据并进行相应计算,最终写入到目标端。目标端能够是各种 DB,数据湖,实时数仓和离线数仓等。

2、Hologres Binlog与传统数据库Binlog的区别

Hologres Binlog与传统数据库Binlog的次要区别在于,前者个别只用于数据同步,而后者还利用于主从实例同步和数据恢复等高可用场景。因而两者的实现也有了肯定的差异,次要体现在以下方面:

Hologres的高可用通过基于WAL log复制实现的物理Replication来保障,详见技术揭秘:从双11看实时数仓Hologres高可用设计与实际。
  1. Hologres Binlog不会记录DDL操作。

Hologres的Binlog次要面向数据生产,不反对记录表构造变更(如CREATE TABLE、ALTER TABLE 等DDL)操作,只记录数据变更记录(INSERT、UPDATE、DELETE),记录形式相似MySQL Binlog的 ROW(row-based replication)模式,会残缺的记录每行数据的变更状况。

  1. Hologres Binlog较为灵便,是表级别的,能够按需关上和敞开,且能够为不同的表设置不同的TTL。

MySQL 开启Binlog是实例级别的,个别会占用比拟多的存储,并且关上Binlog往往须要重启集群;而Hologres 的Binlog更加细粒度,能够在应用中按需对某张表进行开启和敞开,并为不同的表设置不同的Binlog存活工夫,齐全不影响实例中的其余数据库和数据表,关上Binlog只会多出一份表级别的存储。

  1. Hologres Binlog能够十分不便的进行查问。

在应用中,用户能够通过在query中增加 hg_binlog_lsn,hg_binlog_event_type,hg_binlog_timetsamp_us 这三个字段来查问表的Binlog。如果查问的字段蕴含了上述三个字段之一,如select hg_binlog_lsn, * from test_message_src;便会主动路由到Binlog表进行查问。相对而言,用户在MySQL想要查看Binlog则不够直观,须要通过show binlog events命令或者MySQL Binlog工具去解析Binlog文件。须要留神的是,Hologres Binlog底层采纳行存表存储,因而在查问时,举荐过滤条件中蕴含hg_binlog_lsn字段来保障查问效率,依照其余业务字段查问会演变为全表扫描,表数据量大时查问会比较慢,应该尽量避免。

  1. Hologres作为分布式的实时数仓,Binlog同样也是分布式的。

Hologres Binlog与一般的Hologres表数据一样,散布在不同的shard上,读取Binlog等操作也以shard分片,相似kafka的partition,具体能够看下方的实现原理。

总的来说,Hologres Binlog能够简略的定义为:能够按需开关的,以行为单位记录Hologres某张表数据变更记录(INSERT、UPDATE、DELETE)的二进制日志。能够看到,Hologres Binlog自设计起便是为实时生产和用户应用而生的,相比MySQL Binlog领有的历史包袱,更加易用和灵便。

3、Hologres Binlog格局

Hologres 的Binlog蕴含如下字段。

字段阐明:

阐明:

  • UPDATE操作会产生两条Binlog记录,别离为更新前和更新后的记录。订阅Binlog性能会保障这两条记录是间断的且更新前的Binlog记录在前,更新后的Binlog记录在后。
  • 用户字段的程序与DDL定义的程序统一。

Hologres Binlog应用场景

1、数据实时复制、同步

Hologres通过Binlog实现了逻辑Replication,从而能够订阅Binlog进行数据的复制和同步。

典型的逻辑复制应用场景有:

1、把一张Hologres的行存表复制成一张列存表,行存反对点查点写,列存反对多维分析型需要。

比方在阿里CCO的应用场景中,写入Hologres行存表的数据,会通过Hologres Connector Binlog订阅,将公共层明细数据有抉择的进行二次计算,并写入回Hologres列存应用层明细表中,提供给不同的应用层剖析和汇总场景。Hologres 1.1版本开始反对了同一张表行列共存的模式,须要通过Binlog手动实现行存转列存的场景更少了。

2、部分更新之后通过Binlog驱动整行计算

一些场景中,DWD层的数据更新之后,用户上游的计算逻辑须要残缺的表字段,但传统数据仓库可能反对的不够欠缺。比方阿里CCO在之前的架构中应用Lindorm,想要拿到残缺的表字段,就须要通过Hlog订阅,触发流工作反查事实表,将宽表字段对齐之后再输入到上游。而Hologres的Binlog,即便只更新局部字段,也会生成一条蕴含整行数据的Binlog。依据这种个性,部分更新也能够驱动整行数据的计算,无需反查便能够供应用层生产。

3、实时数据打宽。

在某些场景下,用户数据被分隔成面向主题、维度的多张表,而最终剖析时须要将不同的表进行关联,便能够通过Hologres Binlog作为驱动进行双流join、维表关联等操作进行实时数据打宽,进而应用大宽表进行数据分析。

4、用于Hologres不同版本或不同实例之间的数据迁徙。

在开发中,很多用户都会有测试实例和业务实例,这个时候通过Binlog将局部业务数据实时的同步到测试实例进行新业务的开发就十分不便;有些用户在跨大版本升级时,也能够通过Binlog逻辑复制在高版本实例进行残缺的业务流程验证,保障实例降级版本之后稳固可用。

2、实时数仓分层

Hologres Binlog联合Flink CDC,能够实现事件驱动的加工开发,实现ODS向DWD,DWD向DWS等的全实时加工作业。满足分层治理的前提下对立存储,并且大大缩短数据加工端到端提早,为用户提供一站式的实时数据仓库解决方案。

以下是传统的数仓分层架构。

传统数仓分层架构通过Kafka驱动Flink,Flink 在计算过程中查问一些KeyValue 零碎(如HBase)做一些维表的关联,实现数据的拉宽。拉宽之后还会把这个后果从新写到 Kafka 另外一个 Topic 外面,而后做二次的聚合、汇总,生成一些DWS 或者ADS,最初把后果存在 OLAP/HBase 零碎。其中后果之所以须要存在KV和数仓两种零碎中,是因为查问和剖析业务的需要不同,一份数据可能须要存在多处,所以传统数仓分层架构常常会存在肯定的数据冗余。

上图是应用Flink联合Hologres实现的剖析服务一体化的实时数仓架构。 此架构不仅实现了剖析服务的交融,缩小了数据的割裂,而且精简了数据加工链路,通过Hologres Binlog来实现事件驱动,替换各个阶段的音讯队列服务(Kafka)。能够看到,剖析服务一体化的数仓分层架构,岂但能够无效的降低成本,而且简化了业务逻辑,从而大大提高开发效率,升高运维老本。

3、数据变动监控

在数据库应用尤其在新业务的开发中,可能会有想要晓得数据变更状况的场景,判断本身的业务逻辑是否精确,这种状况下也能够通过关上Binlog来不便的进行数据变动监控。

Hologres Binlog实现原理

1、Binlog表

须要提前理解的常识是,Hologres表存储构造次要分为行存和列存以及1.1版本引入的行列共存。列存表实用于OLAP场景,适宜各种简单查问、数据关联、扫描、过滤和统计;行存表实用于 KV(key-value)场景 ,适宜基于primary key的点查和扫描;行列共存则能够看作同时创立了一张行存表和一张列存表,holo会保证数据的一致性,从而达到既反对高效点查也反对OLAP剖析的成果。

Binlog表能够看作一张非凡的行存表

上面以一个行存表为例,讲述Binlog的实现原理。

一个一般的行存表如下,能够看到是KV模式的, key 为表的主键,value 为表的其余字段。

为某张表关上Binlog,能够了解为是新创建了一张以hg_binlog_lsn为key, 业务表原有字段、hg_binlog_event_type以及hg_binlog_timestamp_us字段则组合起来作为value的行存表, 能够看到Binlog表的字段是固定的,也能够说是强Schema的,程序与业务表DDL定义的程序统一。

下图是一张关上Binlog的行存表,留神不是在原有表的根底上加列,而是新建了一张外部非凡表(在holo中体现为原表的一个非凡index,所以对原表和Binlog表进行写入等操作都是原子的),两者同时存在。因而开启Binlog会占用更多的存储空间。

Binlog如何保障正确性

在首次揭秘云原生Hologres存储引擎一文中,介绍了Hologres的存储引擎,这里简略温习一下存储引擎架构以及单分片写入的过程。

  • 存储引擎架构

Hologres存储引擎的根本形象是分布式的表,为了让零碎可扩大,咱们把表切分为分片(Table Group Shard,简称Shard)。每个分片(Shard)形成了一个存储管理和复原的单元 (Recovery Unit)。上图显示了一个分片的根本架构。一个分片由多个tablet组成,这些tablet会共享一个预写式日志(Write-Ahead Log,WAL)。

Hologres存储引擎用WAL来保证数据的原子性和持久性。当INSERT、UPDATE、DELETE操作产生时,存储引擎先写WAL,再写到对应tablet的MemTable中,等到MemTable积攒到肯定的规模或者到了肯定的工夫,就会把这个MemTable切换为不可更改的flushing MemTable, 并新开一个 MemTable接管新的写入申请。 而这个不可更改的flushing MemTable就能够刷磁盘,变成不可更改的文件; 当不可更改的文件生成后,数据就能够算长久化。 当零碎产生谬误解体后,零碎重启时会去WAL读日志,复原还没有长久化的数据。

  • 单分片写入

上图展现了Hologres单分片写入的过程,这里咱们只关注前两个步骤:WAL管理器在接管到单分片写申请后,(1)为写申请调配一条Log Sequence Number (LSN,在holo表中能够通过暗藏字段hg_sequence_number查问),这个LSN是由工夫戳和递增的序号组成,并且(2)创立一条新的日志,并在文件系统中的长久化这条日志。这条日志蕴含了复原写操作所需的信息,在齐全保留这条日志后,才向tablet提交写入。

  • Binlog生成工夫

Binlog 数据生成在写WAL之前,即上方介绍的单分片写入中的第一步。其中hg_binlog_lsn间接复用了Log Sequence Number(在Binlog表中查问hg_sequence_number和hg_binlog_lsn两个字段,能够发现这两个字段完全相同)。在生成Log Sequence Number时应用零碎工夫作为hg_binlog_timestamp_us。判断写入类型之后才创立WAL日志实现写入。在holo中Binlog表体现为原表的一个非凡index,原表和Binlog表数据变更是同时产生的,从而保障了binlog数据和表原始数据肯定是完全一致的。

2、表Binlog的生成过程

上面别离以行存表和列存表为例,演示Binlog的生成过程。

开启Binlog的行存表的变更过程

图中进行了三次操作:

  1. 插入主键为id1的数据,生成相应的Binlog,evnet_type为5
  2. 插入主键为id2的数据,生成相应的Binlog,evnet_type为5
  3. 更新主键为id2的数据,能够看到更新操作会生成两条Binlog, 其event_type别离为3和7,并且有雷同的timestamp_us

开启Binlog的列存表的更新过程

一个一般的列存表如下,能够看到与行存表有显著区别,数据是依照列来进行存储的。

下图展现了对一个已有3条数据的行存表进行更新生成Binlog的过程,与行存表相似,更新操作会生成两条Binlog, 其event_type别离为3和7,并且有雷同的timestamp_us。须要留神的是,这次的update操作只更新了body这个字段,但因为Binlog记录的是整行所有字段的数据,因而在生成Binlog的过程中,须要通过id去点查列存表这行数据各个字段的值,这并非列存表善于的操作,绝对行存表会耗费更多的资源,因而个别举荐应用行存表开启Binlog。

3、Binlog更多实现细节

  • Binlog TTL

与表的TTL(time_to_live_in_seconds)不同,Binlog 有独自的ttl,能够看作是Binlog表的TTL。通过以下DDL语句为表独自设置Binlog TTL。Binlog会在生存工夫到期后被清理,倡议不要少于7天。

call set_table_property('test_message_src', 'binlog.ttl', '86400');--binlog.ttl,Binlog的TTL,单位为秒
  • Binlog 数据分布

Binlog表的shard数与原表的shard数雷同,并且也有雷同的Distribution key ,就是说一条数据和其Binlog数据肯定散布在同一个shard上。

  • Binlog 有序性

Binlog是单shard保序,单个shard上的hg_binlog_lsn枯燥递增,不同shard之间的hg_binlog_lsn可能雷同。除了更新操作生成的两条hg_binlog_timestamp_us完全相同外,单个shard上hg_binlog_timestamp_us也是递增的。

  • Binlog的数据量和表的更新频率成正比

Hologres Binlog的最佳实际

在理解了Binlog的实现原理后,咱们来看看应用Binlog的最佳实际。

1、建表并开启Binlog

举荐建表语句如下:

begin;create table test_message_src(  id int primary key,   title text not null,   body text);call set_table_property('test_message_src', 'orientation', 'row');--创立行存表test_message_srccall set_table_property('test_message_src', 'binlog.level', 'replica');--设置表属性开启Binlog性能call set_table_property('test_message_src', 'binlog.ttl', '864000');--binlog.ttl,Binlog的TTL,单位为秒,这里是7天commit;

上方的DDL中有几个须要留神的细节:

  1. 举荐应用行存表。绝对行存表,列存表生成Binlog会耗费更多的资源,因而举荐应用行存表来开启BInlog进行应用。
  2. 反对按需关上。binlog.level设置为replica即示意为此表关上Binlog,1.1版本之后无需从新建表,能够为已有表关上Binlog,对于无需关上Binlog的表,也能够通过设置此参数为none进行敞开。
  3. 按需设置Binlog TTL。binlog.ttl默认值为1个月,举荐不少于7天,用户能够依照业务需要设置,正当的TTL能够无效升高存储使用量。

2、生产Hologres Binlog

目前Hologres Binlog反对两种生产形式。

1)Flink生产Hologres Binlog

实时计算Flink版有丰盛的生产Hologres的Binlog的能力。以下是FLink定义Hologres Binlog源表的DDL。

create table binlog_source_table(  hg_binlog_lsn BIGINT,  hg_binlog_event_type BIGINT,  hg_binlog_timestamp_us BIGINT,  id INTEGER,  title VARCHAR,  body VARCHAR) with (  'connector'='hologres',  'dbname'='<yourDbname>',  'tablename'='<yourTablename>',  'username'='<yourAccessID>',  'password'='<yourAccessSecret>',  'endpoint'='<yourEndpoint>',  'binlog' = 'true',   -- 开启binlog生产  'cdcMode' = 'true',  -- cdc模式  'binlogStartUpMode' = 'initial', -- 先读取历史全量数据,再增量生产Binlog。);

Flink生产Binog有以下个性和应用细节:

1)反对CDC模式

CDC模式下生产的Binlog数据,将依据hg_binlog_event_type主动为每行数据设置精确的Flink RowKind类型(INTERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER),这样就能实现表的数据的镜像同步,相似MySQL和Postgres的CDC性能。

2)反对全增量一体化生产

全增量一体的生产会先读取数据库的历史全量数据,并平滑切换到Binlog读取增量数据。

3)举荐依据表shard数调整源表并发

Hologres的Binlog是Shard级别保序的,在应用实时计算生产Binlog时,举荐调整源表的并发数与Binlog表的Shard数雷同,从而能够每个Shard对应一个并发,保证数据的有序性。

2)JDBC生产Hologres Binlog

Hologres肯定水平上兼容了PostgreSQL的logical replication接口,能够通过相应接口应用JDBC生产Hologres的Binlog,玩转更多的场景。以下是一个简略的应用示例,权限以及清理创立的组件等详见JDBC生产Hologres Binlog。

应用之前的筹备步骤:

  1. 首先筹备一张关上Binlog的表,能够参考上方最佳实际。
  2. 关上Binlog的extension,extension是DB级别的,一个数据库只须要创立一次。
create extension hg_binlog;
  1. 为表创立Publication,与表是一对一的关系
create publication hg_publication_test_1 for table test_message_src;
  1. 创立Replication Slot并绑定到Publication上,一个publication能够绑定多个replication_slot

一个Replication Slot示意一个数据的更改流,该Replication Slot也与以后生产进度绑定,会保护Binlog生产的点位信息,使得生产端Failover之后能够从之前曾经Commit的点位进行复原,实现断点续传。

call hg_create_logical_replication_slot('replication_slot_name', 'hgoutput', 'publication_name');

应用JDBC依照以下形式生产Binlog:

// 创立PGReplicationStream并绑定至Replicaiton slotPGReplicationStream pgReplicationStream = pgConnection.getReplicationAPI().replicationStream()    .logical()    .withSlotName("hg_replication_slot_1") //指定Replication Slot的名称    .withSlotOption("parallel_index", "0") //并发序列号,对应表的shard id    .withSlotOption("batch_size", "1024")  //单次获取的Binlog最少量大小    .withSlotOption("start_time", "2021-01-01 00:00:00") //从某个工夫点位开始生产    .withSlotOption("start_lsn","0") //从某个lsn之后开始生产,优先级高于start_time    .start();// 创立holo-clientHoloConfig holoConfig = new HoloConfig();holoConfig.setJdbcUrl(url);holoConfig.setUsername(username);holoConfig.setPassword(password);HoloClient client = new HoloClient(holoConfig);// 创立Binlog decoder用于Decode binary数据,schema须要通过HoloClient获取TableSchema schema = client.getTableSchema("test_message_src", true);HoloBinlogDecoder decoder = new HoloBinlogDecoder(schema);// 生产数据ByteBuffer byteBuffer = pgReplicationStream.readPending();while (true) {    if (byteBuffer != null) {        List<BinlogRecord> records = decoder.decode(byteBuffer);        Long latestLsn = 0L;        for (BinlogRecord record : records) {            latestLsn = record.getBinlogLsn();            // Do Something            System.out.println( "lsn: " + latestLsn + ", record: " + Arrays.toString(record.getValues()));        }        // Commit Binlog 点位信息        pgReplicationStream.setFlushedLSN(LogSequenceNumber.valueOf(latestLsn));        pgReplicationStream.forceUpdateStatus();    }    byteBuffer = pgReplicationStream.readPending();}
3)应用Holo-Client生产Binlog

Holo-Client将JDBC生产Binlog能力进行了集成,能够不便的通过指定生产开始的工夫进行整表Binlog的生产,同时也反对为每个Shard独自设置启动位点,举荐应用Holo-client来生产Binlog。

建表等筹备操作与JDBC生产Hologres Binlog雷同,客户端生产示例如下。

import com.alibaba.hologres.client.BinlogShardGroupReader;import com.alibaba.hologres.client.HoloConfig;import com.alibaba.hologres.client.HoloClient;import com.alibaba.hologres.client.model.Record;import com.alibaba.hologres.client.model.TableSchema;import java.util.Arrays;public class HoloBinlogExample {    public static void main(String[] args) throws Exception {        String username = "";        String password = "";        String url = "jdbc:postgresql://ip:port/database";        String tableName = "test_message_src";        String slotName = "hg_replication_slot_1";        // 创立client的参数        HoloConfig holoConfig = new HoloConfig();        holoConfig.setJdbcUrl(url);        holoConfig.setUsername(username);        holoConfig.setPassword(password);        holoConfig.setBinlogReadBatchSize(128);        HoloClient client = new HoloClient(holoConfig);        // 生产binlog的申请,tableName和slotname为必要参数,Subscribe有StartTimeBuilder和OffsetBuilder两种,此处以前者为例        Subscribe subscribe = Subscribe.newStartTimeBuilder(tableName, slotName)                                       .setBinlogReadStartTime("2021-01-01 12:00:00+08")                                       .build();        // 创立binlog reader        BinlogShardGroupReader reader = client.binlogSubscribe(subscribe);        BinlogRecord record;        while ((record = reader.getBinlogRecord()) != null) {            //handle record        }    }}                    

应用Holo Client生产Binlog时能够指定如下参数。

参数是否必须默认值阐明
binlogReadBatchSize1024从每个Shard单次获取的Binlog最大批次大小,单位为行。
binlogHeartBeatIntervalMs-1binlogRead 发送BinlogHeartBeatRecord的距离。-1示意不发送。当binlog没有新数据,每距离binlogHeartBeatIntervalMs会下发一条BinlogHeartBeatRecord,此record的。timestamp示意截止到这个工夫的数据都曾经生产实现。
binlogIgnoreDeletefalse是否疏忽Delete类型的Binlog。
binlogIgnoreBeforeUpdatefalse是否疏忽BeforeUpdate类型的Binlog。
retryCount3生产失败时的重试次数,胜利生产时重试次数会被重置。

总结

Hologres的Binlog性能为生产而生,能够表粒度的按需开启,十分易用和灵便。Hologres Binlog联合Flink CDC,能够实现事件驱动的加工开发,实现ODS向DWD,DWD向DWS等的全实时加工作业。满足分层治理的前提下对立存储,并且大大缩短数据加工端到端提早,升高学习老本,晋升开发效率。

后续咱们将会陆续推出无关Hologres的技术底层原理揭秘系列,敬请继续关注!往期精彩内容:

  • 2020年VLDB的论文《 Alibaba Hologres: A cloud-Native Service for Hybrid Serving/Analytical Processing
  • Hologres揭秘:首次揭秘云原生Hologres存储引擎
  • Hologres揭秘:Hologres高效率分布式查问引擎
  • Hologres揭秘:高性能原生减速MaxCompute外围原理
  • Hologers揭秘:优化COPY,批量导入性能晋升5倍+
  • Hologres揭秘:如何反对超高QPS在线服务(点查)场景
  • Hologres揭秘:从双11看实时数仓Hologres高可用设计与 实际
  • Hologres揭秘:Hologres如何反对超大规模部署与运维
  • Hologres揭秘:湖仓一体,Hologres减速云数据湖DLF技术原理解析
  • Hologres揭秘:高性能写入与更新原理解析