关于数据迁移:技术分享-OMS-初识

12次阅读

共计 13310 个字符,预计需要花费 34 分钟才能阅读完成。

作者:高鹏

DBA,负责我的项目日常问题排查,广告位长期出租。

本文起源:原创投稿

* 爱可生开源社区出品,原创内容未经受权不得随便应用,转载请分割小编并注明起源。


本文次要贡献者:进行 OMS 源码剖析的 @操盛春(北分之光)

一、走进 OMS

本文以 OMS 社区版 3.3.1 为例

咱们能够从官网的地址上获取到它的架构图,这是它大略的样子:

能够看到一个 OMS 数据迁徙工具它蕴含了很多的组件,有 DBCat、Store、Connector、JDBCWriter、Checker 和 Supervisor 等,组件的各个性能这里就不 copy 官网的叙述了,毕竟有手就行。接下来说点官网上没有的。

之前领导让我进行 OMS 性能测试时要顺便打印火焰图看一下,剖析迁徙过程中工夫都用在了哪里,然而当我登录 OMS 容器后看到很多相干 java 过程,一时间分不清哪个过程是做什么的,那么接下里咱们就对这些过程逐个阐明

1.Ghana-endpoint

[ActionTech ~]# ps uax | grep Ghana-endpoint
root        61  3.1  0.5 20918816 1582384 pts/0 Sl  Feb07 1756:47 java -Dspring.config.location=/home/ds/ghana/config/application-oms.properties -server -Xms1g -Xmx2g -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:/home/admin/logs/ghana/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/home/admin/logs/ghana -jar /home/ds/ghana/boot/Ghana-endpoint-1.0.0-executable.jar

Ghana-endpoint 负责提供 OMS 后盾治理界面、调度 TaskJob 和 StructTaskJob 程序

tips:

StructTaskJob:构造迁徙任务调度程序

TaskJob:

  • TaskJob::scheduledTask(),负责正向切换步骤的子工作执行的调度
  • TaskJob::scheduleMigrationProject(),负责构造迁徙我的项目所有步骤初始化 & 工作执行进度监控的调度

2.commons-daemon(CM)

[ActionTech ~]# ps uax | grep commons-daemon
root        50  297  1.7 25997476 4711620 pts/0 Sl  Feb07 163685:09 java -cp /home/ds/cm/package/deployapp/lib/commons-daemon.jar:/home/ds/cm/package/jetty/start.jar -server -Xmx4g -Xms4g -Xmn4g -Dorg.eclipse.jetty.util.URI.charset=utf-8 -Dorg.eclipse.jetty.server.Request.maxFormContentSize=0 -Dorg.eclipse.jetty.server.Request.maxFormKeys=20000 -DSTOP.PORT=8089 -DSTOP.KEY=cm -Djetty.base=/home/ds/cm/package/deployapp org.eclipse.jetty.start.Main

CM 集群治理过程,为 OMS 治理后盾过程提供接口,用于创立拉取增量日志、全量迁徙、增量同步、全量校验等工作,以及获取这些工作的执行进度

3.oms-supervisor

[ActionTech ~]# ps uax | grep oms-supervisor
ds          63  1.0  0.3 11780820 985860 pts/0 Sl   Feb07 566:35 java -server -Xms1g -Xmx1g -Xmn512m -verbose:gc -Xloggc:./log/gc.log -XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Dserver.port=9000 -DconfigDir=/u01/ds/supervisor/config/ -Dspring.main.allow-circular-references=true -jar ./bin/oms-supervisor.jar

oms-supervisor 过程用于启动执行拉取增量日志、全量迁徙、增量同步、全量校验等工作的过程(组件),并监控这些过程的状态

4.store

store 增量日志拉取过程,它是多过程合作,间接抓取 store 过程可能会看到上面一坨,其根过程为 ./bin/store,该过程有子过程及多个后辈过程。

  • ./bin/store:模仿源节点的从库,从源节点接管增量日志
  • /u01/ds/store/store7100/bin/metadata_builder:进行过滤、转换,写入文件,并对 DDL 进行解决

这些过程会继续一直的把须要迁徙表的增量日志拉取到 OMS 服务器上存储起来,以供增量同步工作应用。

5. 全量迁徙和全量校验

[ActionTech ~]# ps -ef | grep VEngine
UID          PID    PPID  C STIME TTY          TIME       CMD
ds         32635       1 99 11:21 pts/0    00:00:02       /opt/alibaba/java/bin/java -server -Xms8g -Xmx8g -Xmn4g -Xss512k -XX:ErrorFile=/u01/ds/bin/..//run//10.186.17.106-9000:90230:0000000016/logs/hs_err_pid%p.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/u01/ds/bin/..//run//10.186.17.106-9000:90230:0000000016/logs -verbose:gc -Xloggc:/u01/ds/bin/..//run//10.186.17.106-9000:90230:0000000016/logs/gc_%p.log -XX:+PrintGCDateStamps -XX:+PrintGCDetails -classpath lib/verification-core-1.2.54.jar:lib/verification-OB05-1.2.54.jar:lib/verification-TIDB-1.2.54.jar:lib/verification-MySQL-1.2.54.jar:lib/verification-OB-Oracle-Mode-1.2.54.jar:lib/verification-OB10-1.2.54.jar:lib/verification-Oracle-1.2.54.jar:lib/verification-DB2-1.2.54.jar:lib/verification-Sybase-1.2.54.jar:lib/verification-common-1.2.54.jar:lib/apache-log4j-extras-1.2.17.jar:lib/slf4j-log4j12-1.7.21.jar:lib/oms-conditions-3.2.3.12-SNAPSHOT.jar:lib/oms-common-3.2.3.12-SNAPSHOT.jar:lib/log4j-1.2.17.jar:lib/dws-rule-1.1.6.jar:lib/dws-schema-1.1.6.jar:lib/oms-record-3.2.3.12-SNAPSHOT.jar:lib/commons-io-2.6.jar:lib/metrics-core-4.0.2.jar:lib/connect-api-2.1.0.jar:lib/kafka-clients-2.1.0.jar:lib/slf4j-api-1.7.25.jar:lib/dss-transformer-1.0.10.jar:lib/calcite-core-1.19.0.jar:lib/dss-record-1.0.0.jar:lib/avatica-core-1.13.0.jar:lib/jackson-datatype-jsr310-2.11.1.jar:lib/jackson-databind-2.11.1.jar:lib/esri-geometry-api-2.2.0.jar:lib/jackson-dataformat-yaml-2.9.8.jar:lib/jackson-core-2.11.1.jar:lib/jackson-annotations-2.11.1.jar:lib/mysql-connector-java-5.1.47.jar:lib/oceanbase-1.2.1.jar:lib/druid-1.1.11.jar:lib/etransfer-0.0.65-SNAPSHOT.jar:lib/commons-lang3-3.9.jar:lib/aggdesigner-algorithm-6.0.jar:lib/commons-lang-2.6.jar:lib/fastjson-1.2.72_noneautotype.jar:lib/commons-beanutils-1.7.0.jar:lib/log4j-1.2.15.jar:lib/mapdb-3.0.8.jar:lib/kotlin-stdlib-1.2.71.jar:lib/annotations-16.0.3.jar:lib/calcite-linq4j-1.19.0.jar:lib/guava-29.0-jre.jar:lib/maven-project-2.2.1.jar:lib/maven-artifact-manager-2.2.1.jar:lib/maven-reporting-api-3.0.jar:lib/doxia-sink-api-1.1.2.jar:lib/doxia-logging-api-1.1.2.jar:lib/maven-settings-2.2.1.jar:lib/maven-profile-2.2.1.jar:lib/maven-plugin-registry-2.2.1.jar:lib/plexus-container-default-1.0-alpha-30.jar:lib/groovy-test-2.5.5.jar:lib/plexus-classworlds-1.2-alpha-9.jar:lib/junit-4.12.jar:lib/commons-dbcp2-2.5.0.jar:lib/httpclient-4.5.6.jar:lib/commons-logging-1.2.jar:lib/commons-collections4-4.1.jar:lib/oms-operator-3.2.3.12-SNAPSHOT.jar:lib/retrofit-2.9.0.jar:lib/jsr305-3.0.2.jar:lib/servlet-api-2.5.jar:lib/org.osgi.core-4.3.1.jar:lib/protobuf-java-3.11.0.jar:lib/maven-plugin-api-2.2.1.jar:lib/okhttp-3.14.9.jar:lib/maven-artifact-2.2.1.jar:lib/wagon-provider-api-1.0-beta-6.jar:lib/maven-repository-metadata-2.2.1.jar:lib/maven-model-2.2.1.jar:lib/plexus-utils-3.0.16.jar:lib/javax.annotation-api-1.3.2.jar:lib/javassist-3.20.0-GA.jar:lib/xml-apis-1.3.03.jar:lib/error_prone_annotations-2.3.4.jar:lib/easy-random-core-4.2.0.jar:lib/objenesis-3.1.jar:lib/commons-collections-3.2.2.jar:lib/lombok-1.18.16.jar:lib/antlr4-runtime-4.9.1.jar:lib/ojdbc8-19.7.0.0.jar:lib/orai18n-19.3.0.0.jar:lib/oceanbase-client-1.1.10.jar:lib/db2jcc-db2jcc4.jar:lib/jtds-1.3.1.jar:lib/javax.ws.rs-api-2.1.1.jar:lib/groovy-ant-2.5.5.jar:lib/groovy-cli-commons-2.5.5.jar:lib/groovy-groovysh-2.5.5.jar:lib/groovy-console-2.5.5.jar:lib/groovy-groovydoc-2.5.5.jar:lib/groovy-docgenerator-2.5.5.jar:lib/groovy-cli-picocli-2.5.5.jar:lib/groovy-datetime-2.5.5.jar:lib/groovy-jmx-2.5.5.jar:lib/groovy-json-2.5.5.jar:lib/groovy-jsr223-2.5.5.jar:lib/groovy-macro-2.5.5.jar:lib/groovy-nio-2.5.5.jar:lib/groovy-servlet-2.5.5.jar:lib/groovy-sql-2.5.5.jar:lib/groovy-swing-2.5.5.jar:lib/groovy-templates-2.5.5.jar:lib/groovy-test-junit5-2.5.5.jar:lib/groovy-testng-2.5.5.jar:lib/groovy-xml-2.5.5.jar:lib/groovy-2.5.5.jar:lib/sketches-core-0.9.0.jar:lib/json-path-2.4.0.jar:lib/janino-3.0.11.jar:lib/commons-compiler-3.0.11.jar:lib/hamcrest-core-1.3.jar:lib/gson-2.8.5.jar:lib/httpcore-4.4.13.jar:lib/commons-codec-1.15.jar:lib/ini4j-0.5.2.jar:lib/backport-util-concurrent-3.1.jar:lib/plexus-interpolation-1.11.jar:lib/failureaccess-1.0.1.jar:lib/listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar:lib/checker-qual-2.11.1.jar:lib/j2objc-annotations-1.3.jar:lib/classgraph-4.8.65.jar:lib/zstd-jni-1.3.5-4.jar:lib/lz4-java-1.5.0.jar:lib/snappy-java-1.1.7.2.jar:lib/ant-junit-1.9.13.jar:lib/ant-1.9.13.jar:lib/ant-launcher-1.9.13.jar:lib/ant-antlr-1.9.13.jar:lib/commons-cli-1.4.jar:lib/picocli-3.7.0.jar:lib/qdox-1.12.1.jar:lib/jline-2.14.6.jar:lib/junit-platform-launcher-1.3.2.jar:lib/junit-jupiter-engine-5.3.2.jar:lib/testng-6.13.1.jar:lib/avatica-metrics-1.13.0.jar:lib/commons-pool2-2.6.0.jar:lib/snakeyaml-1.23.jar:lib/memory-0.9.0.jar:lib/eclipse-collections-forkjoin-11.0.0.jar:lib/eclipse-collections-11.0.0.jar:lib/eclipse-collections-api-11.0.0.jar:lib/lz4-1.3.0.jar:lib/elsa-3.0.0-M5.jar:lib/okio-1.17.2.jar:lib/junit-platform-engine-1.3.2.jar:lib/junit-jupiter-api-5.3.2.jar:lib/junit-platform-commons-1.3.2.jar:lib/apiguardian-api-1.0.0.jar:lib/jcommander-1.72.jar:lib/kotlin-stdlib-common-1.2.71.jar:lib/opentest4j-1.1.1.jar:conf com.alipay.light.VEngine -t 10.186.17.106-9000:90230:0000000016 -c /home/ds//run/10.186.17.106-9000:90230:0000000016/conf/checker.conf start

全量迁徙和全量校验这两个工作的启动命令雷同,但它其实是两个过程,所以咱们无奈从 ps 命令中分辨出该过程是全量迁徙还是全量校验

想要辨别须要看 /home/ds//run/10.186.17.106-9000:90230:0000000016/conf/checker.conf 文件,该文件就是上述启动命令中 -c 参数所指定的配置文件。

condition.whiteCondition=[{"name":"sakila","all":false,"attr":{"dml":"d,i,u"},"sub":[{"name":"city"},{"name":"country"}]}]
condition.blackCondition=[{"name":"sakila","all":false,"sub":[{"name":"DRC_TXN*","func":"fn"},{"name":"drc_txn*","func":"fn"}]}]
datasource.master.type=MYSQL
......
task.split.mode=false
task.type=verify
task.checker_jvm_param=-server -Xms8g -Xmx8g -Xmn4g -Xss512k
task.id=90230
task.subId=3
task.resume=false

配置文件中的 task.type 配置项的值代表了过程类型

  • task.type=migrate:全量迁徙过程
  • task.type=verify:全量校验过程

6. 增量同步过程

[ActionTech ~]# ps aux | grep "coordinator\.Bootstrap"
ds       58500 34.7  0.9 44600844 2575092 pts/0 Sl  Feb08 18483:51 java -server -XX:+DisableExplicitGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/u01/ds/bin/../run/10.186.17.104-9000:p_4gmn723gtt28_dest-000-0:0002000001 -Djava.library.path=/u01/ds/bin/../plugins/jdbc_connector/ -Duser.home=/home/ds -Dlogging.path=/u01/ds/bin/../run/10.186.17.104-9000:p_4gmn723gtt28_dest-000-0:0002000001/logs -cp jdbc_connector.jar:jdbc-source-store.jar:jdbc-sink-ob-mysql.jar com.oceanbase.oms.connector.jdbc.coordinator.Bootstrap -c conf -s com.oceanbase.oms.connector.jdbc.source.store.StoreSource -d com.oceanbase.oms.connector.jdbc.sink.obmysql.OBMySQLJdbcSink -t 10.186.17.104-9000:p_4gmn723gtt28_dest-000-0:0002000001 start

该过程用于读取增量日志,结构成 SQL 语句在指标节点进行回放

7. 过程之间关系

二、迁徙流程

上述是组成 OMS 的各个组件过程以及它们之间的关系。接下来咱们用一个例子来简述下 OMS 外部工作流程。

以增量迁徙工作为例:

通过共事对源码的剖析,可得出迁徙流程大抵如下:

那么咱们晓得了迁徙流程,接下来如何去分析 OMS 迁徙快慢的问题想必是每一个运维共事最为关怀的点。

家喻户晓 OMS 的操作界面是非常简洁,明明用到了 influxDB,然而展现进去的信息还是十分的少,简直帮不上什么忙。

登录 influxDB 中能够看到如下相干监控项

["checker.dest.rps"]
["checker.dest.rt"]
["checker.dest.write.iops"]
["checker.source.read.iops"]
["checker.source.rps"]
["checker.source.rt"]
["checker.verify.dest.read.iops"]
["checker.verify.dest.rps"]
["checker.verify.dest.rt"]
["checker.verify.source.read.iops"]
["checker.verify.source.rps"]
["checker.verify.source.rt"]
["jdbcwriter.delay"]
["jdbcwriter.iops"]
["jdbcwriter.rps"]
["store.conn"]
["store.delay"]
["store.iops"]
["store.rps"]

除此之外 OMS 还应用了 io.dropwizard.metrics5 做局部监控,由 coordinator 过程每 10 秒打印一次 metrics,metrics 的日志门路:/u01/ds/run/$test_id/logs/msg/metrics.log,日志内容为 json 格局,记录较为具体。

[2023-03-18 22:49:18.101] [{"jvm":{"JVM":"jvm:[heapMemory[max:30360MB, init:2048MB, used:677MB, committed:1980MB], noHeapMemory[max:0MB, init:2MB, used:58MB, committed:63MB], gc[gcName:ParNew, count:2643, time:64034ms;gcName:ConcurrentMarkSweep, count:8, time:624ms;], thread[count:88]]"},"sink":{"sink_worker_num":0,"sink_total_transaction":4.5654123E7,"rps":0.0,"tps":0.0,"iops":0.0,"sink_total_record":4.5654123E7,"sink_commit_time":0.0,"sink_worker_num_all":64,"sink_execute_time":0.0,"sink_total_bytes":2.8912799892E10},"source":{"StoreSource[0]source_delay":5,"p_4gmn723gtt28_source-000-0source_record_num":1.36962569E8,"p_4gmn723gtt28_source-000-0source_iops":198.98,"StoreSource[0]source_status":"running","p_4gmn723gtt28_source-000-0source_dml_num":4.5654123E7,"p_4gmn723gtt28_source-000-0source_dml_rps":0.0,"p_4gmn723gtt28_source-000-0source_rps":0.0},"dispatcher":{"wait_dispatch_record_size":0,"ready_execute_batch_size":0},"frame":{"SourceTaskManager.createdSourceSize":1,"queue_slot1.batchAccumulate":0,"forward_slot0.batchAccumulate":0,"forward_slot0.batchCount":4.7873212E7,"queue_slot1.batchCount":4.7873212E7,"queue_slot1.rps":0.6999300122261047,"SourceTaskManager.sourceTaskNum":0,"forward_slot0.recordAccumulate":0,"forward_slot0.rps":0.6999300122261047,"queue_slot1.tps":0.6999300122261047,"forward_slot0.tps":0.6999300122261047,"queue_slot1.recordAccumulate":0,"queue_slot1.recordCount":4.7873212E7,"forward_slot0.recordCount":4.7873212E7}}]

metrics.log 是剖析迁徙瓶颈的重要工具,重点取 source、dispatch、sink 三个局部的监控值进行剖析。

1.source 阶段

作用:次要用于连贯源端数据库,拉取增量日志,单线程运行,读取 SQL 以事务模式向上游输入,因为是单线程运行,因而整个动作只能串行实现。

TransactionAssembler::TransactionAssemblerInner::generateTransaction()将读到的 SQL 拼接成事务;

TransactionAssembler::TransactionAssemblerInner::putTransaction()发送到 QueuedSlot 队列中。

通过 metrics.log 能够获取到如下监控值:

  • source_record_num: Source 阶段, 解决的行记录的总数 (在解决行记录 (将行记录拼接为事务) 时统计, TransactionAssembler.notify)
  • source_rps: Source 阶段, 每秒解决的行记录数 (TransactionAssembler.notify)
  • source_iops: Source 阶段, 每秒解决的行记录的大小 (TransactionAssembler.notify)
  • source_dml_num: Source 阶段, 解决的 DML 的行记录的总数 (TransactionAssembler.notify)
  • source_dml_rps: Source 阶段, 每秒解决的 DML 的行记录数 (TransactionAssembler.notify)
  • StoreSource[0]source_status: Source 阶段的状态, running/stopped
  • StoreSource[0]source_delay: Source 阶段的提早, 解决数据的工夫

能够利用 influxDB 将其制作成监控展现图,如下所示(局部):

2.dispatcher 阶段

作用:协调生产和生产之间的速度,负责从 QueuedSlot 队列读取事务,而后写入事务调度队列(DefaultTransactionScheduler::readyQueue),队列大小默认值为 16384,单线程。

  • dispatcher_ready_transaction_size: DefaultTransactionScheduler::readyQueue 队列中积压的事务数量,期待 SinkTaskManager 线程解决。
  • coordinator_dispatch_record_size: DefaultTransactionScheduler::readyQueue 队列中积压的事务的记录数量之和,期待 SinkTaskManager 线程解决。

3.sink 阶段

作用:从上游的事务调度队列中获取数据,而后在指标数据库进行回放。多线程,线程数由 workerNum 配置项指定,默认 16。

  • sink_tps: 每秒回放实现多少事务(在事务回放实现时统计)
  • sink_total_transaction: 回放事务的总数
  • sink_worker_num: 工作线程中繁忙线程的数量
  • sink_worker_num_all: 工作线程的总数
  • sink_rps: 每秒回放实现多少行记录
  • sink_total_record: 回放的行记录总数
  • sink_execute_time: 行记录的均匀执行工夫 = 事务的 SQL 执行工夫 / 事务的记录数
  • sink_commit_time: 事务的提交阶段的均匀工夫
  • sink_iops: 每秒回放的事务的行记录大小, 单位为 kb/s
  • sink_total_bytes: 回放的事务的行记录大小总数

三、OMS 优化

OMS 留给运维的优化空间并不多,本期咱们只围绕迁徙链路进行优化,下文列举出了两个最间接的优化形式。

1.sink_worker_number

通过一组监控图来看查看 OMS 是否存在性能问题

在压测过程中,sink_worker_number 和 wait_dispatch_record 监控视图出现进去的曲线简直统一。很明的能够看出是因为上游的生产能力的有余导致事务全副积压在 dispatch 的队列中,并且将 dispatch 的队列都打满了。

找到问题起因后优化也就变得简略了,只须要调大 sink_worker_number 的线程数就能够,批改如下图所示。

运维监控 – 组件 – 更新

找到“JDBCWriter.worker_num”值,倡议批改成宿主机 CPU 逻辑核数,最大不超过 CPU 逻辑核数 *4(具体数值可依据环境动静调整)。

2. 迁徙选项

OMS 在创立迁徙工作时,如果迁徙工作中蕴含全量迁徙的话能够抉择“全量迁徙并发速度”,档位越高占用的系统资源也就越多,如果宿主机资源丰盛,能够抉择“疾速”。

如下是不同档位所须要的资源,这是官网自带的模板,也能够依据理论状况调整。

注:如果仅仅抉择增量迁徙是没有该选项的。

3. 其余优化形式只能具体起因具体分析了。

正文完
 0