乐趣区

关于flink:Flink110集成Hive快速入门

Hive 是大数据畛域最早呈现的 SQL 引擎,倒退至今有着丰盛的性能和宽泛的用户根底。之后呈现的 SQL 引擎,如 Spark SQL、Impala 等,都在肯定水平上提供了与 Hive 集成的性能,从而不便用户应用现有的数据仓库、进行作业迁徙等。

Flink 从 1.9 开始反对集成 Hive,不过 1.9 版本为 beta 版,不举荐在生产环境中应用。在最新版 Flink1.10 版本,标记着对 Blink 的整合宣告实现,随着对 Hive 的生产级别集成,Hive 作为数据仓库零碎的相对外围,承当着绝大多数的离线数据 ETL 计算和数据管理,期待 Flink 将来对 Hive 的完满反对。

而 HiveCatalog 会与一个 Hive Metastore 的实例连贯,提供元数据长久化的能力。要应用 Flink 与 Hive 进行交互,用户须要配置一个 HiveCatalog,并通过 HiveCatalog 拜访 Hive 中的元数据。

增加依赖

要与 Hive 集成,须要在 Flink 的 lib 目录下增加额定的依赖 jar 包,以使集成在 Table API 程序或 SQL Client 中的 SQL 中起作用。或者,能够将这些依赖项放在文件夹中,并别离应用 Table API 程序或 SQL Client 的 -C-l 选项将它们增加到 classpath 中。本文应用第一种形式,行将 jar 包间接复制到 $FLINK_HOME/lib 目录下。本文应用的 Hive 版本为 2.3.4(对于不同版本的 Hive,能够参照官网抉择不同的 jar 包依赖),总共须要 3 个 jar 包,如下:

  • flink-connector-hive_2.11-1.10.0.jar
  • flink-shaded-hadoop-2-uber-2.7.5-8.0.jar
  • hive-exec-2.3.4.jar

其中 hive-exec-2.3.4.jar 在 hive 的 lib 文件夹下,另外两个须要自行下载,下载地址:flink-connector-hive_2.11-1.10.0.jar,flink-shaded-hadoop-2-uber-2.7.5-8.0.jar

切莫拔剑四顾心茫然,话不多说,间接上代码。

构建程序

增加 Maven 依赖

<!-- Flink Dependency -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>

<!-- Hive Dependency -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
    <scope>provided</scope>
</dependency>  

实例代码

package com.flink.sql.hiveintegration;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

/**
 *  @Created with IntelliJ IDEA.
 *  @author : jmx
 *  @Date: 2020/3/31
 *  @Time: 13:22
 *  
 */
public class FlinkHiveIntegration {public static void main(String[] args) throws Exception {

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner() // 应用 BlinkPlanner
                .inBatchMode() // Batch 模式,默认为 StreamingMode
                .build();

        // 应用 StreamingMode
       /* EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner() // 应用 BlinkPlanner
                .inStreamingMode() // StreamingMode
                .build();*/

        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String name = "myhive";      // Catalog 名称,定义一个惟一的名称示意
        String defaultDatabase = "qfbap_ods";  // 默认数据库名称
        String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf";  // hive-site.xml 门路
        String version = "2.3.4";       // Hive 版本号

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);

        tableEnv.registerCatalog("myhive", hive);
        tableEnv.useCatalog("myhive");
        // 创立数据库,目前不反对创立 hive 表
        String createDbSql = "CREATE DATABASE IF NOT EXISTS myhive.test123";

        tableEnv.sqlUpdate(createDbSql);  

    }
}

Flink SQL Client 集成 Hive

Flink 的表和 SQL API 能够解决用 SQL 语言编写的查问,然而这些查问须要嵌入到用 Java 或 Scala 编写的程序中。此外,这些程序在提交到集群之前须要与构建工具打包。这或多或少地限度了 Java/Scala 程序员对 Flink 的应用。

SQL 客户端旨在提供一种简略的形式,无需一行 Java 或 Scala 代码,即可将表程序编写、调试和提交到 Flink 集群。Flink SQL 客户端 CLI 容许通过命令行的模式运行分布式程序。应用 Flink SQL cli 拜访 Hive,须要配置 sql-client-defaults.yaml 文件。

sql-client-defaults.yaml 配置

目前 HiveTableSink 不反对流式写入(未实现 AppendStreamTableSink)。须要将执行模式改成 batch
模式,否则会报如下谬误:

org.apache.flink.table.api.TableException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.

须要批改的配置内容如下:

#... 省略的配置项...

#==============================================================================
# Catalogs
#==============================================================================
# 配置 catalogs, 能够配置多个.
catalogs: # empty list
  - name: myhive
    type: hive
    hive-conf-dir: /opt/modules/apache-hive-2.3.4-bin/conf
    hive-version: 2.3.4
    default-database: qfbap_ods

#... 省略的配置项...

#==============================================================================
# Execution properties
#==============================================================================

# Properties that change the fundamental execution behavior of a table program.

execution:
  # select the implementation responsible for planning table programs
  # possible values are 'blink' (used by default) or 'old'
  planner: blink
  # 'batch' or 'streaming' execution
  type: batch

启动 Flink SQL Cli

bin/sql-client.sh  embedded

启动之后,就能够在此 Cli 下执行 SQL 命令拜访 Hive 的表了,根本的操作如下:

-- 命令行帮忙
Flink SQL> help
-- 查看以后会话的 catalog,其中 myhive 为本人配置的,default_catalog 为默认的
Flink SQL> show catalogs;
default_catalog
myhive
-- 应用 catalog
Flink SQL> use catalog myhive;
-- 查看以后 catalog 的数据库
Flink SQL> show databases;
-- 创立数据库
Flink SQL> create database testdb;
-- 删除数据库
Flink SQL> drop database testdb;
-- 创立表
Flink SQL> create table tbl(id int,name string);
-- 删除表
Flink SQL> drop table tbl;
-- 查问表
Flink SQL> select * from  code_city;
-- 插入数据
Flink SQL> insert overwrite code_city select id,city,province,event_time from code_city_delta ;
Flink SQL> INSERT into code_city values(1,'南京','江苏','');

小结

本文以最新版本的 Flink 为例,对 Flink 集成 Hive 进行了实操。首先通过代码的形式与 Hive 进行集成,而后介绍了如何应用 Flink SQL 客户端拜访 Hive,并对其中会遇到的坑进行了形容,最初给出了 Flink SQL Cli 的具体应用。置信在将来的版本中 Flink SQL 会越来越欠缺,期待 Flink 将来对 Hive 的完满反对。

欢送增加我的公众号,随时随地理解更多精彩内容。

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

退出移动版