关于spark:编译支持-spark-读写-osscdh-5x

54次阅读

共计 6725 个字符,预计需要花费 17 分钟才能阅读完成。

前言

背景:应用 spark 读取 hdfs 文件写入到 oss
hadoop : 2.6.0-cdh5.15.1
spark : 2.4.1
次要参考链接:https://blog.csdn.net/wankund…
减少了留神点和坑点

编译 hadoop-aliyun

hadoop 高版本曾经默认反对 aliyun-oss 的拜访,而本版本不反对,须要编译反对下

  • 拉取 hadoop trunk 分支代码,copy hadoop-tools/hadoop-aliyun 模块代码到 cdh 对应的我的项目模块中
  • 批改 hadoop-tools pom.xml

    • <module>hadoop-aliyun</module> 增加 hadoop-aliyun 子 module
  • 批改根 pom.xml 中的 java 版本为 1.8,hadoop-aliyun 应用了 1.8 的 lambda 语法,也能够间接批改代码反对
  • 批改 hadoop-aliyun pom.xml,批改 version,以及相干的 oss,http 依赖包,应用 shade 插件将相干依赖打进去
  • 代码批改

    • import org.apache.commons.lang3 改为 import org.apache.commons.lang
    • 复制(cdh 版本)hadoop-aws 模块下的 BlockingThreadPoolExecutorService 和 SemaphoredDelegatingExecutor 两个类 到 org.apache.hadoop.util 目录下
  • 编译模块 hadoop-aliyun

    • mvn clean package -pl hadoop-tools/hadoop-aliyun

最终的配置文件如下

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-project</artifactId>
    <version>2.6.0-cdh5.15.1</version>
    <relativePath>../../hadoop-project</relativePath>
  </parent>
  <artifactId>hadoop-aliyun</artifactId>
  <name>Apache Hadoop Aliyun OSS support</name>
  <packaging>jar</packaging>

  <properties>
    <file.encoding>UTF-8</file.encoding>
    <downloadSources>true</downloadSources>
  </properties>

  <profiles>
    <profile>
      <id>tests-off</id>
      <activation>
        <file>
          <missing>src/test/resources/auth-keys.xml</missing>
        </file>
      </activation>
      <properties>
        <maven.test.skip>true</maven.test.skip>
      </properties>
    </profile>
    <profile>
      <id>tests-on</id>
      <activation>
        <file>
          <exists>src/test/resources/auth-keys.xml</exists>
        </file>
      </activation>
      <properties>
        <maven.test.skip>false</maven.test.skip>
      </properties>
    </profile>
  </profiles>

  <build>
    <plugins>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>findbugs-maven-plugin</artifactId>
        <configuration>
          <findbugsXmlOutput>true</findbugsXmlOutput>
          <xmlOutput>true</xmlOutput>
          <excludeFilterFile>${basedir}/dev-support/findbugs-exclude.xml
          </excludeFilterFile>
          <effort>Max</effort>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <configuration>
          <forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
        <executions>
          <execution>
            <id>deplist</id>
            <phase>compile</phase>
            <goals>
              <goal>list</goal>
            </goals>
            <configuration>
              <!-- build a shellprofile -->
              <outputFile>
                ${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-optional.txt
              </outputFile>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.0</version>
        <executions>
          <execution>
            <id>shade-aliyun-sdk-oss</id>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <shadedArtifactAttached>false</shadedArtifactAttached>
              <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
              <createDependencyReducedPom>true</createDependencyReducedPom>
              <createSourcesJar>true</createSourcesJar>
              <relocations>
                <relocation>
                  <pattern>org.apache.http</pattern>
                  <shadedPattern>com.xxx.thirdparty.org.apache.http</shadedPattern>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>com.aliyun.oss</groupId>
      <artifactId>aliyun-sdk-oss</artifactId>
      <version>3.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpcore</artifactId>
      <version>4.4.1</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <exclusions>
        <exclusion>
          <groupId>org.apache.httpcomponents</groupId>
          <artifactId>httpclient</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.httpcomponents</groupId>
          <artifactId>httpcore</artifactId>
        </exclusion>
      </exclusions>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <scope>test</scope>
      <type>test-jar</type>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-distcp</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-distcp</artifactId>
      <scope>test</scope>
      <type>test-jar</type>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-server-tests</artifactId>
      <scope>test</scope>
      <type>test-jar</type>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-examples</artifactId>
      <scope>test</scope>
      <type>jar</type>
    </dependency>
  </dependencies>

</project>

spark 读写取 oss 文件

val inputPath = "hdfs:///xxx"
val outputPath = "oss://bucket/OSS_FILES"

val conf = new SparkConf()
    conf.set("spark.hadoop.fs.oss.endpoint", "oss-cn-xxx")
    conf.set("spark.hadoop.fs.oss.accessKeyId", "xxx")
    conf.set("spark.hadoop.fs.oss.accessKeySecret", "xxx")
    conf.set("spark.hadoop.fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem")
    conf.set("spark.hadoop.fs.oss.buffer.dir", "/tmp/oss")
    conf.set("spark.hadoop.fs.oss.connection.secure.enabled", "false")
    conf.set("spark.hadoop.fs.oss.connection.maximum", "2048")
    
spark.write.format("orc").mode("overwrite").save(outputPath)

其它以 spark sql 以及 hdfs 读取 oss 的形式能够参考前面的第三个链接

spark submit

spark-submit \
--class org.example.HdfsToOSS \
--master yarn \
--deploy-mode cluster \
--num-executors 2 \
--executor-cores 2 \
--executor-memory 3G \
--driver-cores 1  \
--driver-memory 3G \
--conf "spark.driver.extraClassPath=hadoop-common-2.6.0-cdh5.15.1.jar" \
--conf "spark.executor.extraClassPath=hadoop-common-2.6.0-cdh5.15.1.jar" \
--jars ./hadoop-aliyun-2.6.0-cdh5.15.1.jar,./hadoop-common-2.6.0-cdh5.15.1.jar \
./spark-2.4-worker-1.0-SNAPSHOT.jar

留神下 extraClassPath 这个参数,如果没有非凡的配置,spark 会默认加载本身的 hadoop-common 包,如果版本不对,可能会导致 ClassNotFound,须要 extraClassPath 指定,会优先加载


满腹经纶,如果有谬误的中央,欢送斧正

参考链接

  • Spark 依赖包加载程序和抵触解决方案
  • Spark Configuration
  • 批改 Spark 反对近程拜访 OSS 文件

正文完
 0