简介:Datastream作业开发时往往会遇到一些jar包抵触等问题,本文次要解说作业开发时须要引入哪些依赖以及哪些须要被打包进作业的jar中,从而防止不必要的依赖被打入了作业jar中以及可能产生的依赖抵触。
Datastream作业开发时往往会遇到一些jar包抵触等问题,本文次要解说作业开发时须要引入哪些依赖以及哪些须要被打包进作业的jar中,从而防止不必要的依赖被打入了作业jar中以及可能产生的依赖抵触。
一个Datastream作业次要波及下述依赖:
Flink的外围依赖以及应用程序本身的依赖
每一个Flink应用程序都依赖于一系列相干的库,其中至多应该包含Flink的API. 许多应用程序还依赖于连接器相干的库(如 Kafka, Cassandra等).在运行Flink应用程序时,无论是在运行在分布式的环境下还是在本地IDE进行测试,Flink的运行时相干依赖都是必须的。
与大多数运行用户自定义应用程序的零碎一样,Flink 中有两大类依赖项:
- Flink外围依赖:Flink 自身由一组运行零碎所必须的类和依赖项组成,例如协调器、网络、检查点、容错、API、算子(例如窗口)、资源管理等。 所有这些类和依赖项的汇合形成了 Flink 运行时的外围,在 Flink 应用程序启动时必须存在。这些外围类和依赖项都被打包在 flink-dist jar 中。 它们是 Flink 的 lib 文件夹的一部分,也是Flink根底容器镜像的一部分。这些依赖之于Flink就像Java 运行所需的蕴含 String 和 List 等类的外围库(rt.jar、charsets.jar 等)之于Java。Flink的外围依赖不蕴含任何连接器或扩大库(CEP、SQL、ML等),这使得Flink的外围依赖尽可能小,以防止默认状况下类门路中有过多的依赖项,同时缩小依赖抵触。
- 用户应用程序依赖项:指特定用户应用程序所需的所有连接器、Format或扩大库。用户应用程序通常被打包成一个 jar文件,其中蕴含利用程序代码以及所需的连接器和库依赖项。用户应用程序依赖项不应包含 Flink DataStream API 和运行时依赖项,因为这些曾经被蕴含在了Flink 的外围依赖中。
依赖配置步骤
1.增加根底依赖
每一个Flink应用程序的开发至多须要增加对相干API的根底依赖。
手动配置我的项目时,须要增加对Java/Scala API的依赖(这里以Maven为例,在其余构建工具(Gradle,SBT等)中能够应用同样的依赖)。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.12.3</version>
<scope>provided</scope>
</dependency>
重要提醒:请留神,所有这些依赖项都将其范畴设置为”provided”。这意味着须要对它们进行编译,但不应将它们打包到我的项目生成的应用程序jar文件中——这些依赖项是Flink外围依赖项,在理论运行时曾经被加载。
强烈建议将依赖项设置成”provided”的范畴,如果未将它们设置为”provided”,最好的状况下会导致生成的jar变得臃肿,因为它还蕴含所有Flink外围依赖项。而最怀的状况下,增加到应用程序jar文件中的Flink外围依赖项与您本人的一些依赖项会产生版本抵触(通常通过Flink的反向类加载机制来防止)。
对于IntelliJ的注意事项:为了使应用程序在IntelliJ IDEA中运行,有必要在运行配置中勾选”Include dependencies with “Provided” scope”选项框。如果没有该选项(可能是因为应用较旧的IntelliJ IDEA版本),那么一个简略的解决办法是创立一个调用应用程序 main() 办法的测试用例。
2.增加连接器和库的依赖
大多数应用程序的运行须要特定的连接器或库,例如Kafka、Cassandra等连接器。这些连接器不是Flink外围依赖项的一部分,必须作为额定依赖项增加到应用程序中。
下述代码是增加Kafka连接器依赖项的示例(Maven语法):
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.12.3</version>
</dependency>
咱们倡议将利用程序代码和它所有的依赖以jar-with-dependencies 的模式打包到一个application jar中。这个应用程序jar包能够被提交到曾经存在的Flink集群下来,或者被退出到Flink应用程序的容器镜像中去。
从Maven作业模版(见下文Maven作业模版局部)创立的我的项目,通过mvn clean package命令会主动把依赖打到应用程序的jar包中去。对于没有应用模版进行配置的状况,倡议应用Maven Shade Plugin (配置如附录所示) 来构建蕴含依赖的jar包。
重要提醒:对于Maven(和其余构建工具)来说,要将依赖项正确打包到应用程序jar中,这些应用程序依赖项的scope必须指定为”compile”(与外围依赖项不同,外围依赖项的scope必须指定为”provided”)。
注意事项
Scala版本
Scala的不同版本(2.11,2.12等)相互之间是不兼容的。因而,Scala 2.11对应的Flink版本不能用于应用Scala 2.12的应用程序。
所有依赖(或传递依赖)于Scala的Flink依赖项都以构建它们的Scala版本作为后缀,例如flink-streaming-scala_2.11。
只应用Java进行开发时能够抉择任何Scala版本,应用Scala开发时须要抉择与其应用程序的Scala版本匹配的Flink依赖版本。
注:2.12.8之后的Scala版本与之前的2.12.x版本不兼容,因而Flink我的项目无奈将其2.12.x版本升级到2.12.8之后的版本。用户能够在本地本人编译对应Scala版本的Flink。为了使其可能失常工作,须要增加-Djapicmp.skip以在构建时跳过二进制兼容性查看。
Hadoop依赖
个别的规定: 永远不要将Hadoop相干依赖间接增加到应用程序中. (惟一的例外是将现有的Hadoop输出/输入Format与Flink的Hadoop兼容包一起应用时)
如果心愿将Flink与Hadoop联合应用,则须要蕴含Hadoop依赖的Flink启动项,而不是将Hadoop增加为应用程序依赖项。Flink将应用HADOOP_CLASSPATH环境变量指定的Hadoop依赖项,可通过以下形式进行设置:
export HADOOP_CLASSPATH=hadoop classpath“
这种设计有两个次要起因:
- 一些与Hadoop的交互可能产生在Flink的外围模块中,并且在用户应用程序启动之前,例如为检查点设置HDFS、通过Hadoop的Kerberos令牌进行身份验证,或者在YARN上进行部署等。
- Flink的反向类加载机制从外围依赖项中暗藏了许多可传递的依赖项。这不仅实用于Flink本人的外围依赖项,而且实用于Hadoop的依赖项。这样,应用程序就能够应用雷同依赖项的不同版本,而不会产生依赖项抵触(置信咱们,这是一件小事,因为Hadoop依赖树十分宏大。)
如果在IDE外部的测试或开发过程中须要Hadoop依赖项(例如HDFS拜访),请将这些依赖项的scope配置为
test 或则 provided。
Transform table connector/format resources #
Flink应用Java的Service Provider Interfaces (SPI) 机制通过特定标识符加载table的connector/format工厂。因为每个table的connector/format的名为org.apache.flink.table.factories.Factory的SPI资源文件位于同一目录:META-INF/services下,因而在构建应用多个table connector/format的我的项目的uber jar时,这些资源文件将互相笼罩,这将导致Flink无奈正确加载工厂类。
在这种状况下,举荐的办法是通过maven shade插件的ServicesResourceTransformer转换META-INF/services目录下的这些资源文件。给定示例的pom.xml文件内容如下,其中蕴含连接器flink-sql-connector-hive-3.1.2和flink-parquet format。
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>myProject</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- other project dependencies ...-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-hive-3.1.2__2.11</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet__2.11<</artifactId>
<version>1.13.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers combine.children="append">
<!-- The service transformer is needed to merge META-INF/services files -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<!-- ... -->
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
在配置了ServicesResourceTransformer之后, 我的项目构建uber-jar时,META-INF/services目录下的这些资源文件会被整合在一起而不是互相笼罩。
Maven作业模版
强烈建议应用该形式进行配置,能够缩小很多反复的配置工作。
前置要求
惟一的环境要求是装置了Maven 3.0.4(或更高版本)和Java 8.x。
创立我的项目
应用以下两种形式中的一种创立我的项目:
- 应用Maven archetypes
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.12.3
这容许您命名新创建的我的项目。它将以交互方式要求您输出groupId、artifactId和包名。
- 运行quickstart脚本
$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.12.3
咱们建议您将此我的项目导入IDE以开发和测试它。IntelliJ IDEA原生反对Maven我的项目。如果应用Eclipse,能够应用m2e插件导入Maven我的项目。默认状况下,某些Eclipse捆绑包蕴含该插件,否则须要您手动装置。
请留神:默认的Java JVM heap size对于Flink来说可能太小了。你必须手动减少它。在Eclipse中,抉择RunConfigurations->Arguments并写入VM Arguments框:-Xmx800m。在IntelliJ IDEA中,更改JVM选项的举荐办法是应用Help | Edit Custom VM Options选项菜单。细节见这篇文章.
构建我的项目
如果要生成/打包我的项目,请转到我的项目目录并运行”mvn clean package”命令。执行后将会失去一个JAR文件:target/-.jar,其中蕴含您的应用程序,以及作为依赖项增加到应用程序的连接器和库。
留神:如果应用与StreamingJob不同的类作为应用程序的主类/入口点,咱们建议您相应地更改pom.xml文件中的mainClass设置。这样,Flink就能够间接从JAR文件运行应用程序,而无需另外指定主类。
附录: 构建带依赖的jar包的模版
要构建蕴含连接器和库所需的所有依赖项的应用程序JAR,能够应用以下shade插件定义:
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>my.programs.main.clazz</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
原文链接
本文为阿里云原创内容,未经容许不得转载。