乐趣区

关于java:Datastream-开发打包问题

简介:Datastream 作业开发时往往会遇到一些 jar 包抵触等问题,本文次要解说作业开发时须要引入哪些依赖以及哪些须要被打包进作业的 jar 中,从而防止不必要的依赖被打入了作业 jar 中以及可能产生的依赖抵触。

Datastream 作业开发时往往会遇到一些 jar 包抵触等问题,本文次要解说作业开发时须要引入哪些依赖以及哪些须要被打包进作业的 jar 中,从而防止不必要的依赖被打入了作业 jar 中以及可能产生的依赖抵触。

一个 Datastream 作业次要波及下述依赖:

Flink 的外围依赖以及应用程序本身的依赖

每一个 Flink 应用程序都依赖于一系列相干的库,其中至多应该包含 Flink 的 API. 许多应用程序还依赖于连接器相干的库(如 Kafka, Cassandra 等). 在运行 Flink 应用程序时,无论是在运行在分布式的环境下还是在本地 IDE 进行测试,Flink 的运行时相干依赖都是必须的。

与大多数运行用户自定义应用程序的零碎一样,Flink 中有两大类依赖项:

  • Flink 外围依赖:Flink 自身由一组运行零碎所必须的类和依赖项组成,例如协调器、网络、检查点、容错、API、算子(例如窗口)、资源管理等。所有这些类和依赖项的汇合形成了 Flink 运行时的外围,在 Flink 应用程序启动时必须存在。这些外围类和依赖项都被打包在 flink-dist jar 中。它们是 Flink 的 lib 文件夹的一部分,也是 Flink 根底容器镜像的一部分。这些依赖之于 Flink 就像 Java 运行所需的蕴含 String 和 List 等类的外围库(rt.jar、charsets.jar 等)之于 Java。Flink 的外围依赖不蕴含任何连接器或扩大库(CEP、SQL、ML 等),这使得 Flink 的外围依赖尽可能小,以防止默认状况下类门路中有过多的依赖项,同时缩小依赖抵触。
  • 用户应用程序依赖项:指特定用户应用程序所需的所有连接器、Format 或扩大库。用户应用程序通常被打包成一个 jar 文件,其中蕴含利用程序代码以及所需的连接器和库依赖项。用户应用程序依赖项不应包含 Flink DataStream API 和运行时依赖项,因为这些曾经被蕴含在了 Flink 的外围依赖中。

依赖配置步骤

1. 增加根底依赖

每一个 Flink 应用程序的开发至多须要增加对相干 API 的根底依赖。

手动配置我的项目时,须要增加对 Java/Scala API 的依赖 (这里以 Maven 为例,在其余构建工具(Gradle,SBT 等) 中能够应用同样的依赖)。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.12.3</version>
  <scope>provided</scope>
</dependency>

重要提醒:请留神,所有这些依赖项都将其范畴设置为 ”provided”。这意味着须要对它们进行编译,但不应将它们打包到我的项目生成的应用程序 jar 文件中——这些依赖项是 Flink 外围依赖项,在理论运行时曾经被加载。

强烈建议将依赖项设置成 ”provided” 的范畴, 如果未将它们设置为 ”provided”,最好的状况下会导致生成的 jar 变得臃肿,因为它还蕴含所有 Flink 外围依赖项。而最怀的状况下,增加到应用程序 jar 文件中的 Flink 外围依赖项与您本人的一些依赖项会产生版本抵触(通常通过 Flink 的反向类加载机制来防止)。

对于 IntelliJ 的注意事项:为了使应用程序在 IntelliJ IDEA 中运行,有必要在运行配置中勾选 ”Include dependencies with “Provided” scope” 选项框。如果没有该选项(可能是因为应用较旧的 IntelliJ IDEA 版本),那么一个简略的解决办法是创立一个调用应用程序 main() 办法的测试用例。

2. 增加连接器和库的依赖

大多数应用程序的运行须要特定的连接器或库,例如 Kafka、Cassandra 等连接器。这些连接器不是 Flink 外围依赖项的一部分,必须作为额定依赖项增加到应用程序中。

下述代码是增加 Kafka 连接器依赖项的示例(Maven 语法):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.3</version>
</dependency>

咱们倡议将利用程序代码和它所有的依赖以 jar-with-dependencies 的模式打包到一个 application jar 中。这个应用程序 jar 包能够被提交到曾经存在的 Flink 集群下来,或者被退出到 Flink 应用程序的容器镜像中去。

从 Maven 作业模版 (见下文 Maven 作业模版局部) 创立的我的项目,通过 mvn clean package 命令会主动把依赖打到应用程序的 jar 包中去。对于没有应用模版进行配置的状况,倡议应用 Maven Shade Plugin (配置如附录所示) 来构建蕴含依赖的 jar 包。

重要提醒:对于 Maven(和其余构建工具)来说,要将依赖项正确打包到应用程序 jar 中,这些应用程序依赖项的 scope 必须指定为 ”compile”(与外围依赖项不同,外围依赖项的 scope 必须指定为 ”provided”)。

注意事项

Scala 版本

Scala 的不同版本 (2.11,2.12 等) 相互之间是不兼容的。因而,Scala 2.11 对应的 Flink 版本不能用于应用 Scala 2.12 的应用程序。

所有依赖 (或传递依赖) 于 Scala 的 Flink 依赖项都以构建它们的 Scala 版本作为后缀,例如 flink-streaming-scala_2.11。

只应用 Java 进行开发时能够抉择任何 Scala 版本,应用 Scala 开发时须要抉择与其应用程序的 Scala 版本匹配的 Flink 依赖版本。

注:2.12.8 之后的 Scala 版本与之前的 2.12.x 版本不兼容,因而 Flink 我的项目无奈将其 2.12.x 版本升级到 2.12.8 之后的版本。用户能够在本地本人编译对应 Scala 版本的 Flink。为了使其可能失常工作,须要增加 -Djapicmp.skip 以在构建时跳过二进制兼容性查看。

Hadoop 依赖

个别的规定: 永远不要将 Hadoop 相干依赖间接增加到应用程序中.(惟一的例外是将现有的 Hadoop 输出 / 输入 Format 与 Flink 的 Hadoop 兼容包一起应用时)

如果心愿将 Flink 与 Hadoop 联合应用,则须要蕴含 Hadoop 依赖的 Flink 启动项,而不是将 Hadoop 增加为应用程序依赖项。Flink 将应用 HADOOP_CLASSPATH 环境变量指定的 Hadoop 依赖项,可通过以下形式进行设置:

export HADOOP_CLASSPATH=hadoop classpath“

这种设计有两个次要起因:

  • 一些与 Hadoop 的交互可能产生在 Flink 的外围模块中,并且在用户应用程序启动之前,例如为检查点设置 HDFS、通过 Hadoop 的 Kerberos 令牌进行身份验证,或者在 YARN 上进行部署等。
  • Flink 的反向类加载机制从外围依赖项中暗藏了许多可传递的依赖项。这不仅实用于 Flink 本人的外围依赖项,而且实用于 Hadoop 的依赖项。这样,应用程序就能够应用雷同依赖项的不同版本,而不会产生依赖项抵触(置信咱们,这是一件小事,因为 Hadoop 依赖树十分宏大。)

如果在 IDE 外部的测试或开发过程中须要 Hadoop 依赖项(例如 HDFS 拜访),请将这些依赖项的 scope 配置为

test 或则 provided。

Transform table connector/format resources #
Flink 应用 Java 的 Service Provider Interfaces (SPI) 机制通过特定标识符加载 table 的 connector/format 工厂。因为每个 table 的 connector/format 的名为 org.apache.flink.table.factories.Factory 的 SPI 资源文件位于同一目录:META-INF/services 下,因而在构建应用多个 table connector/format 的我的项目的 uber jar 时,这些资源文件将互相笼罩,这将导致 Flink 无奈正确加载工厂类。

在这种状况下,举荐的办法是通过 maven shade 插件的 ServicesResourceTransformer 转换 META-INF/services 目录下的这些资源文件。给定示例的 pom.xml 文件内容如下,其中蕴含连接器 flink-sql-connector-hive-3.1.2 和 flink-parquet format。

    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>myProject</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!--  other project dependencies  ...-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-hive-3.1.2__2.11</artifactId>
            <version>1.13.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet__2.11<</artifactId>
            <version>1.13.0</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>shade</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers combine.children="append">
                                <!-- The service transformer is needed to merge META-INF/services files -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <!-- ... -->
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

在配置了 ServicesResourceTransformer 之后, 我的项目构建 uber-jar 时,META-INF/services 目录下的这些资源文件会被整合在一起而不是互相笼罩。

Maven 作业模版

强烈建议应用该形式进行配置,能够缩小很多反复的配置工作。

前置要求

惟一的环境要求是装置了 Maven 3.0.4(或更高版本)和 Java 8.x。

创立我的项目

应用以下两种形式中的一种创立我的项目:

  • 应用 Maven archetypes
$ mvn archetype:generate                               \
  -DarchetypeGroupId=org.apache.flink              \
  -DarchetypeArtifactId=flink-quickstart-java      \
  -DarchetypeVersion=1.12.3

这容许您命名新创建的我的项目。它将以交互方式要求您输出 groupId、artifactId 和包名。

  • 运行 quickstart 脚本
$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.12.3

咱们建议您将此我的项目导入 IDE 以开发和测试它。IntelliJ IDEA 原生反对 Maven 我的项目。如果应用 Eclipse,能够应用 m2e 插件导入 Maven 我的项目。默认状况下,某些 Eclipse 捆绑包蕴含该插件,否则须要您手动装置。

请留神:默认的 Java JVM heap size 对于 Flink 来说可能太小了。你必须手动减少它。在 Eclipse 中,抉择 RunConfigurations->Arguments 并写入 VM Arguments 框:-Xmx800m。在 IntelliJ IDEA 中,更改 JVM 选项的举荐办法是应用 Help | Edit Custom VM Options 选项菜单。细节见这篇文章.

构建我的项目

如果要生成 / 打包我的项目,请转到我的项目目录并运行 ”mvn clean package” 命令。执行后将会失去一个 JAR 文件:target/-.jar,其中蕴含您的应用程序,以及作为依赖项增加到应用程序的连接器和库。

留神:如果应用与 StreamingJob 不同的类作为应用程序的主类 / 入口点,咱们建议您相应地更改 pom.xml 文件中的 mainClass 设置。这样,Flink 就能够间接从 JAR 文件运行应用程序,而无需另外指定主类。

附录: 构建带依赖的 jar 包的模版

要构建蕴含连接器和库所需的所有依赖项的应用程序 JAR,能够应用以下 shade 插件定义:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>my.programs.main.clazz</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

原文链接
本文为阿里云原创内容,未经容许不得转载。

退出移动版