关于大数据:RocketMQ-Flink-Catalog-设计与实践

40次阅读

共计 5523 个字符,预计需要花费 14 分钟才能阅读完成。

摘要:本文为 RocketMQ Flink Catalog 使用指南。次要内容包含:

  1. Flink 和 Flink Catalog
  2. RocketMQ Flink Connector
  3. RocketMQ Flink Catalog

作者:李晓双,Apache RocketMQ Contributor

Mentor:蒋晓峰,Apache RocketMQ Committer

一、Flink 和 Flink Catalog

Flink 是一个分布式计算引擎,目前曾经实现批流一体,能够实现对有界数据和无界数据的解决。须要无效调配和治理计算资源能力执行流式应用程序。

目前 Flink API 共形象为四个局部:

  • 最顶层的形象为 SQL。SQL 形象与 Table API 形象之间的关联是十分严密的,并且 SQL 查问语句能够在 Table API 中定义的表上执行。
  • 第二层形象为 Table API。Table API 是以表(Table)为核心的申明式编程(DSL)API,例如在流式数据场景下,它能够示意一张正在动静扭转的表。
  • 第三层形象是 Core APIs。 许多程序可能应用不到最底层的 API 而是能够应用 Core APIs 进行编程:其中蕴含 DataStream API(利用于有界 / 无界数据流场景)和 DataSet API(利用于有界数据集场景)两局部。
  • 第四层形象为有状态的实时流解决。

Flink Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其余内部零碎中存储的函数和信息。Flink 对于元数据的治理分为长期的、长久化的两种。内置的 GenericInMemoryCatalog 是基于内存实现的 Catalog,所有元数据只在 session 的生命周期内可用。JdbcCatalog 和 HiveCatalog 就是能够长久化元数据的 Catalog。

Flink Catalog 是扩大的,反对用户自定义。为了在 Flink SQL 中应用自定义 Catalog,用户须要通过实现 CatalogFactory 接口来实现对应的 Catalog 工厂。该工厂是应用 Java 的服务提供者接口 (SPI) 发现的。能够将实现此接口的类增加到 META_INF/services/org.apache.flink.table.factories.FactoryJAR 文件中。

二、RocketMQ Flink Connector

RocketMQ 连接器为 Flink 提供从 RocketMQ Topic 中生产和写入数据的能力。Flink 的 Table API & SQL 程序能够连贯到其余内部零碎,用于读取和写入批处理和流式表。Source 提供对存储在内部零碎(例如数据库、键值存储、音讯队列或文件系统)中的数据的拜访。Sink 将数据发送到内部存储系统。

该项目标 Github 仓库是:https://github.com/apache/roc…

三、RocketMQ Flink Catalog

3.1 设计与实现

3.1.1 RocketMQ Flink Catalog 的设计次要分为两步

  • 实现一个 RocketMqCatalogFactory 基于字符串属性创立已配置 Catalog 实例的工厂。将此实现类增加到 META_INF/services/org.apache.flink.table.factories.Factory 中。
  • 继承 AbstractCatalog 实现 RocketMqCatalog,通过实现 Catalog 接口中的办法,实现对数据库、表、分区等信息的查问操作。

类图如下:

3.1.2 RocketMQ Flink Catalog 的存储

RocketMQ Flink Catalog 的底层存储应用的是 RocketMQ Schema Registry。Flink 调用 Catalog 的时候,在 AbstractCatalog 的实现类中通过 RocketMQ Schema Registry 的客户端和 RocketMQ Schema Registry 服务端进行交互。

  • Database : 返回默认的 default。
  • Table : 从 RocketMQ Schema Registry 获取对应的 Schema,而后解析 IDL 转换成 DataType。
  • Partition : 通过 DefaultMQAdminExt 从 RocketMQ 中获取到 Partition 相干信息。

RocketMQ Schema Registry 是一个 Topic Schema 的管理中心。它为 Topic(RocketMQ Topic)的注册、删除、更新、获取和援用模式提供了一个 RESTful 接口。New RocketMQ 客户端通过将 Schema 与 Subject 关联起来,能够间接发送结构化数据。用户不再须要关怀序列化和反序列化的细节。

3.1.3 RocketMQ Flink Catalog 反对的 API

目前 RocketMQ Flink Catalog 反对对 Database、Table、Partition 的查问和判断是否存在的操作,不反对创立、批改、删除。所以在应用之前须要通过 RocketMQ Schema Registry 来创立好对应的 Schema。

3.2 使用指南

表环境(TableEnvironment)是 Flink 中集成 Table API & SQL 的外围概念。它负责:

  • 在外部的 Catalog 中注册 Table。
  • 注册内部的 Catalog。
  • 加载可插拔模块。
  • 执行 SQL 查问。
  • 注册自定义函数(scalar、table 或 aggregation)。
  • 将 DataStream 或 DataSet 转换成 Table。
  • 持有对 ExecutionEnvironment 或 StreamExecutionEnvironment 的援用。

3.2.1 创立并注册 Catalog

Table API

RocketMQCatalog rocketMqCatalog = new RocketMQCatalog("rocketmq_catalog", "default", "http://localhost:9876", "http://localhost:8080");
tableEnvironment.registerCatalog("rocketmq_catalog", rocketMqCatalog);

SQL

TableResult tableResult = tableEnvironment.executeSql(
                "CREATE CATALOG rocketmq_catalog WITH (" +
                        "'type'='rocketmq_catalog',"+"'nameserver.address'='http://localhost:9876'," +
                        "'schema.registry.base.url'='http://localhost:8088');");

3.2.2 批改以后的 Catalog

Table API

tableEnvironment.useCatalog("rocketmq_catalog");

SQL

tableEnvironment.executeSql("USE CATALOG rocketmq_catalog");

3.2.3 列出可用的 Catalog

Table API

String[] catalogs = tableEnvironment.listCatalogs();

SQL

TableResult tableResult = tableEnvironment.executeSql("show catalogs");

3.2.4 列出可用的 Database

Table API

String[] databases = tableEnvironment.listDatabases();

SQL

TableResult tableResult = tableEnvironment.executeSql("show databases");

3.2.5 列出可用的 Table

Table API

String[] tables = tableEnvironment.listTables();

SQL

TableResult tableResult = tableEnvironment.executeSql("show tables");

3.3 Quick Start

须要提前准备可用的 RocketMQ、RocketMQ Schema Registry:

  • RocketMQ 部署:https://rocketmq.apache.org/docs/ 介绍 /02quickstart
  • RocketMQ Schema Registry 部署:https://github.com/apache/roc…

3.3.1 创立 Topic

创立两个 Topic,rocketmq_source 和 rocketmq_sink。

3.3.2 注册 Source Schema

curl -X POST -H "Content-Type: application/json" \
-d '{"schemaIdl":"{"type":"record","name":"rocketmq_source_schema","namespace":"namespace","fields":[{"name":"name","type":"string"}]}"}' \
http://localhost:8088/schema-registry/v1/subject/rocketmq_source/schema/rocketmq_source_schema

3.3.3 注册 Sink Schema

curl -X POST -H "Content-Type: application/json" \
-d '{"schemaIdl":"{"type":"record","name":"rocketmq_sink_schema","namespace":"namespace","fields":[{"name":"name","type":"string"}]}"}' \
http://localhost:8088/schema-registry/v1/subject/rocketmq_sink/schema/rocketmq_sink_schema

3.3.4 增加依赖

创立一个工作我的项目,增加 rocketmq-flink 的依赖 :

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-flink</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>

目前 RocketMQ Schema Registry 还没有公布正式的版本,只有快照版,如果发现 jar 找不到,能够尝试以下办法:

<repositories>
    <repository>
        <id>snapshot-repos</id>
        <name>Apache Snapshot Repository</name>
        <url>https://repository.apache.org/snapshots/</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
        <layout>default</layout>
    </repository>
</repositories>

3.3.5 创立工作

/**
 * @author lixiaoshuang
 */
public class RocketMqCatalog {public static void main(String[] args) {
        // 初始化表环境参数
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
        // 创立 table 环境
        TableEnvironment tableEnvironment = TableEnvironment.create(environmentSettings);


        // 注册 rocketmq catalog
        tableEnvironment.executeSql(
                "CREATE CATALOG rocketmq_catalog WITH (" +
                        "'type'='rocketmq_catalog',"+"'nameserver.address'='http://localhost:9876'," +
                        "'schema.registry.base.url'='http://localhost:8088');");
        tableEnvironment.executeSql("USE CATALOG rocketmq_catalog");

        // 从 rocketmq_source 中获取数据写入到 rocketmq_sink 中
        TableResult tableResult = tableEnvironment.executeSql("INSERT INTO rocketmq_sink /*+ OPTIONS" +
                "('producerGroup'='topic_producer_group') */ select * from rocketmq_source /*+ OPTIONS" +
                "('consumerGroup'='topic_consumer_group') */");
    }
}

启动工作并运行当前,关上 RocketMQ 控制台,往 rocketmq_source 这个 Topic 发送一条音讯。

而后再查看 rocketmq_sink 的状态,就会发现音讯曾经通过写入到 rocketmq_sink 中了。

点击查看更多技术内容


正文完
 0