共计 4221 个字符,预计需要花费 11 分钟才能阅读完成。
Catalog(目录)提供了对于数据库、表格和拜访数据所需的信息的元数据,以及对立的 API 来治理元数据,验证连贯,让元数据对 Sources(数据源)、Sinks(数据汇)和 Web 可拜访。
Catalog 让用户可能援用其数据系统中的现有元数据,并主动映射到 SeaTunnel 的对应元数据。总之,Catalog 大大简化了应用用户现有零碎开始应用 SeaTunnel 的步骤,并显著加强了用户体验。
Catalog 性能的重要性
目前,许多现有性能都是基于 Catalog 实现的,例如 CDC(变更数据捕捉)多表同步性能,咱们应用 Catalog 获取表格和字段列表。
Apache SeaTunnel 目前正在设计一个叫做 SaveMode 的性能,它是由连接器实现的,用于反对指标表中现有表格构造和数据的解决。这些性能也是基于 Catalog 实现的。
Catalog 是如何设计的?如何实现一个新的 Catalog?以下是具体介绍。
Catalog API
初始化操作
留神:目录名称目前没有被应用,预计会提供给 Web 后端进行保留和查问。
Java
public interface CatalogFactory extends Factory {String factoryIdentifier(); OptionRule optionRule(); Catalog createCatalog(String catalogName, ReadonlyConfig options); } public interface Catalog extends AutoCloseable {void open() throws CatalogException; void close() throws CatalogException;}
数据库操作
java
public interface Catalog extends AutoCloseable {// -------------------------------------------------------------------------------------------- // 数据库 // -------------------------------------------------------------------------------------------- String getDefaultDatabase() throws CatalogException; boolean databaseExists(String databaseName) throws CatalogException; List<String> listDatabases() throws CatalogException; void createDatabase(String databaseName, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException; void dropDatabase(String databaseName, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException; }
表格操作
java
public interface Catalog extends AutoCloseable {// -------------------------------------------------------------------------------------------- // 表格 // -------------------------------------------------------------------------------------------- List<String> listTables(String databaseName) throws CatalogException, DatabaseNotExistException; boolean tableExists(TablePath tablePath) throws CatalogException; CatalogTable getTable(TablePath tablePath) throws CatalogException, TableNotExistException; void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException; void dropTable(TablePath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException; }
这里是一个曾经实现的示例。
MySQL Catalog
MySQL Catalog 的应用形式:
- username [String] 连贯到数据库服务器时要应用的数据库名称。
- password [String] 连贯到数据库服务器时要应用的明码。
- base-url [String] URL 必须蕴含数据库,例如 “jdbc:mysql://localhost:5432/db” 或 “jdbc:mysql://localhost:5432/db?useSSL=true”。
- table-names [List] 要捕捉的数据库表格名称列表。表格名称须要包含数据库名称,例如:database_name.table_name。
- database-pattern [String] 要捕捉的数据库名称的正则表达式。
- table-pattern [String] 要捕捉的数据库表格名称的正则表达式。表格名称须要包含数据库名称,例如:database_.\.table_.。
配置文件配置
conf
{[connector-factory-id] {catalog { factory = "MySQL" username = "test" password = "123456" base-url = "jdbc:mysql://localhost:5432/db" table-names = [ "db.table"] } } }
如何应用 Catalog
对于反对 Catalog 的连接器,咱们将关上一个 Catalog 参数来配置所应用的 Catalog:
示例
sql
env {"job.mode"=STREAMING "job.name"="cdc_mysql_to_mysql" "checkpoint.interval"="2000" "custom_parameters"=""} source {MySQL-CDC { parallelism = 1 catalog { factory ="MySQL"# 默认状况下,Catalog 将应用与连接器同名的选项} username ="mysqluser"password ="mysqlpw"database-names = ["seatunnel-test"] table-pattern ="seatunnel-test\\.orders_\\d+"base-url ="jdbc:mysql://localhost:54508/seatunnel-test"} } sink {jdbc { url ="jdbc:mysql://localhost:4000/test"driver ="com.mysql.cj.jdbc.Driver"catalog { factory ="MySQL"username ="root"password ="" base-url = "jdbc:mysql://localhost:4000/test" table-pattern = "seatunnel-test2\\.orders_\\d+"} user = "root" password = ""query ="insert into sink(age, name) values(?,?)" } }
将来布局
目前,咱们只实现了局部 Catalog。将来,咱们打算扩充 Catalog 的实现范畴,包含更多反对 Catalog 的连接器,这将使更多的连接器反对 SaveMode 和主动表格创立等性能。
Apache SeaTunnel 是一个分布式、高性能、易扩大、用于海量数据(离线 & 实时)同步和转化的数据集成平台
- 仓库地址:https://github.com/apache/seatunnel
- 网址:https://seatunnel.apache.org/
- Proposal:https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunn…
- Apache SeaTunnel 下载地址:https://seatunnel.apache.org/download
衷心欢送更多人退出!
咱们置信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(凋谢合作)、「Meritocracy」(精英治理)、以及「多样性与共识决策」等 The Apache Way 的指引下,咱们将迎来更加多元化和容纳的社区生态,共建开源精力带来的技术提高!
咱们诚邀各位有志于让外乡开源立足寰球的搭档退出 SeaTunnel 贡献者小家庭,一起共建开源!
- 提交问题和倡议:https://github.com/apache/seatunnel/issues
- 奉献代码:https://github.com/apache/seatunnel/pulls
- 订阅社区开发邮件列表 : dev-subscribe@seatunnel.apache.org
- 开发邮件列表:dev@seatunnel.apache.org
- 退出 Slack: https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kc…
-
关注 Twitter: https://twitter.com/ASFSeaTunnel
本文由 白鲸开源科技 提供公布反对!