乐趣区

关于clickhouse:篇五ClickHouse数据导入FlinkSparkKafkaMySQLHive

本文分享次要是 ClickHouse 的数据导入形式,本文次要介绍如何应用 Flink、Spark、Kafka、MySQL、Hive 将数据导入 ClickHouse,具体内容包含:

  • 应用 Flink 导入数据
  • 应用 Spark 导入数据
  • 从 Kafka 中导入数据
  • 从 MySQL 中导入数据
  • 从 Hive 中导入数据

应用 Flink 导入数据

本文介绍应用 flink-jdbc 将数据导入 ClickHouse,Maven 依赖为:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
    <version>1.10.1</version>
</dependency>

示例

本示例应用 Kafka connector,通过 Flink 将 Kafka 数据实时导入到 ClickHouse

public class FlinkSinkClickHouse {public static void main(String[] args) throws Exception {
        String url = "jdbc:clickhouse://192.168.10.203:8123/default";
        String user = "default";
        String passwd = "hOn0d9HT";
        String driver = "ru.yandex.clickhouse.ClickHouseDriver";
        int batchsize = 500; // 设置 batch size,测试的话能够设置小一点,这样能够立即看到数据被写入

        // 创立执行环境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        String kafkaSource11 = ""+"CREATE TABLE user_behavior ("+" `user_id` BIGINT, -- 用户 id\n"+" `item_id` BIGINT, -- 商品 id\n"+" `cat_id` BIGINT, -- 品类 id\n"+" `action` STRING, -- 用户行为 \n"+" `province` INT, -- 用户所在的省份 \n"+" `ts` BIGINT, -- 用户行为产生的工夫戳 \n"+" `proctime` AS PROCTIME(), -- 通过计算列产生一个解决工夫列 \n"+" `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件工夫 \n"+" WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- 在 eventTime 上定义 watermark\n"+") WITH ( 'connector' = 'kafka', -- 应用 kafka connector\n"+" 'topic' = 'user_behavior', -- kafka 主题 \n"+" 'scan.startup.mode' = 'earliest-offset', -- 偏移量,从起始 offset 开始读取 \n"+" 'properties.group.id' = 'group1', -- 消费者组 \n"+" 'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', -- kafka broker 地址 \n"+" 'format' = 'json', -- 数据源格局为 json\n"+" 'json.fail-on-missing-field' = 'true',\n"+" 'json.ignore-parse-errors' = 'false'" +
                ")";

        // Kafka Source
        tEnv.executeSql(kafkaSource11);
        String query = "SELECT user_id,item_id,cat_id,action,province,ts FROM user_behavior";
        Table table = tEnv.sqlQuery(query);

        String insertIntoCkSql = "INSERT INTO behavior_mergetree(user_id,item_id,cat_id,action,province,ts)\n" +
                "VALUES(?,?,?,?,?,?)";

        // 将数据写入 ClickHouse Sink
        JDBCAppendTableSink sink = JDBCAppendTableSink
                .builder()
                .setDrivername(driver)
                .setDBUrl(url)
                .setUsername(user)
                .setPassword(passwd)
                .setQuery(insertIntoCkSql)
                .setBatchSize(batchsize)
                .setParameterTypes(Types.LONG, Types.LONG,Types.LONG, Types.STRING,Types.INT,Types.LONG)
                .build();

        String[] arr = {"user_id","item_id","cat_id","action","province","ts"};
        TypeInformation[] type = {Types.LONG, Types.LONG,Types.LONG, Types.STRING,Types.INT,Types.LONG};

        tEnv.registerTableSink(
                "sink",
                arr,
                type,
                sink
        );

        tEnv.insertInto(table, "sink");

        tEnv.execute("Flink Table API to ClickHouse Example");
    }

}

Note:

  • 因为 ClickHouse 单次插入的提早比拟高,咱们须要设置 BatchSize 来批量插入数据,进步性能。
  • 在 JDBCAppendTableSink 的实现中,若最初一批数据的数目有余 BatchSize,则不会插入残余数据。

应用 Spark 导入数据

本文次要介绍如何通过 Spark 程序写入数据到 Clickhouse 中。

<dependency>
      <groupId>ru.yandex.clickhouse</groupId>
      <artifactId>clickhouse-jdbc</artifactId>
      <version>0.2.4</version>
</dependency>
<!-- 如果报错:Caused by: java.lang.ClassNotFoundException: com.google.common.escape.Escapers,则增加上面的依赖 -->
<dependency>
          <groupId>com.google.guava</groupId>
          <artifactId>guava</artifactId>
          <version>28.0-jre</version>
</dependency>

示例

object Spark2ClickHouseExample {val properties = new Properties()
  properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
  properties.put("user", "default")
  properties.put("password", "hOn0d9HT")
  properties.put("batchsize", "1000")
  properties.put("socket_timeout", "300000")
  properties.put("numPartitions", "8")
  properties.put("rewriteBatchedStatements", "true")

  case class Person(name: String, age: Long)

  private def runDatasetCreationExample(spark: SparkSession): Dataset[Person] = {
    import spark.implicits._
    // DataFrames 转成 DataSet
    val path = "file:///e:/people.json"
    val peopleDS = spark.read.json(path)
    peopleDS.createOrReplaceTempView("people")
    val ds = spark.sql("SELECT name,age FROM people").as[Person]
    ds.show()
    ds
  }

  def main(args: Array[String]) {


    val url = "jdbc:clickhouse://kms-1:8123/default"
    val table = "people"

    val spark = SparkSession
      .builder()
      .appName("Spark  Example")
      .master("local") // 设置为本地运行
      .getOrCreate()
    val ds = runDatasetCreationExample(spark)

    ds.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, properties)
    spark.stop()}
}

从 Kafka 中导入数据

次要是应用 ClickHouse 的表引擎。

应用形式

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_row_delimiter = 'delimiter_symbol',]
    [kafka_schema = '',]
    [kafka_num_consumers = N,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_thread_per_consumer = 0]
  • kafka_broker_list:逗号分隔的 brokers 地址 (localhost:9092).
  • kafka_topic_list:Kafka 主题列表,多个主题用逗号分隔.
  • kafka_group_name:消费者组.
  • kafka_format – Message format. 比方JSONEachRow、JSON、CSV 等等

应用示例

在 kafka 中创立 user_behavior 主题,并向该主题写入数据,数据示例为:

{"user_id":63401,"item_id":6244,"cat_id":143,"action":"pv","province":3,"ts":1573445919}
{"user_id":9164,"item_id":2817,"cat_id":611,"action":"fav","province":28,"ts":1573420486}
{"user_id":63401,"item_id":6244,"cat_id":143,"action":"pv","province":3,"ts":1573445919}

在 ClickHouse 中创立表,抉择表引擎为 Kafka(),如下:

 CREATE TABLE kafka_user_behavior (
    user_id UInt64 COMMENT '用户 id',
    item_id UInt64 COMMENT '商品 id',
    cat_id UInt16  COMMENT '品类 id',
    action String  COMMENT '行为',
    province UInt8 COMMENT '省份 id',
    ts UInt64      COMMENT '工夫戳'
  ) ENGINE = Kafka()
    SETTINGS
    kafka_broker_list = 'cdh04:9092',
    kafka_topic_list = 'user_behavior',
    kafka_group_name = 'group1',
    kafka_format = 'JSONEachRow'
;
-- 查问
cdh04 :) select * from kafka_user_behavior ;

-- 再次查看数据,发现数据为空
cdh04 :) select count(*) from kafka_user_behavior;

SELECT count(*)
FROM kafka_user_behavior

┌─count()─┐
│       0 │
└─────────┘

通过物化视图将 kafka 数据导入 ClickHouse

当咱们一旦查问结束之后,ClickHouse 会删除表内的数据,其实 Kafka 表引擎只是一个数据管道,咱们能够通过物化视图的形式拜访 Kafka 中的数据。

  • 首先创立一张 Kafka 表引擎的表,用于从 Kafka 中读取数据
  • 而后再创立一张一般表引擎的表,比方 MergeTree,面向终端用户应用
  • 最初创立物化视图,用于将 Kafka 引擎表实时同步到终端用户所应用的表中
--  创立 Kafka 引擎表
 CREATE TABLE kafka_user_behavior_src (
    user_id UInt64 COMMENT '用户 id',
    item_id UInt64 COMMENT '商品 id',
    cat_id UInt16  COMMENT '品类 id',
    action String  COMMENT '行为',
    province UInt8 COMMENT '省份 id',
    ts UInt64      COMMENT '工夫戳'
  ) ENGINE = Kafka()
    SETTINGS
    kafka_broker_list = 'cdh04:9092',
    kafka_topic_list = 'user_behavior',
    kafka_group_name = 'group1',
    kafka_format = 'JSONEachRow'
;

-- 创立一张终端用户应用的表
 CREATE TABLE kafka_user_behavior (
    user_id UInt64 COMMENT '用户 id',
    item_id UInt64 COMMENT '商品 id',
    cat_id UInt16  COMMENT '品类 id',
    action String  COMMENT '行为',
    province UInt8 COMMENT '省份 id',
    ts UInt64      COMMENT '工夫戳'
  ) ENGINE = MergeTree()
    ORDER BY user_id
;
-- 创立物化视图,同步数据
CREATE MATERIALIZED VIEW user_behavior_consumer TO kafka_user_behavior
    AS SELECT * FROM kafka_user_behavior_src ;
-- 查问,屡次查问,曾经被查问的数据仍然会被输入
cdh04 :) select * from kafka_user_behavior;

Note:

Kafka 生产表不能间接作为后果表应用。Kafka 生产表只是用来生产 Kafka 数据,没有真正的存储所有数据。

从 MySQL 中导入数据

同 kafka 中导入数据相似,ClickHouse 同样反对 MySQL 表引擎,即映射一张 MySQL 中的表到 ClickHouse 中。

数据类型对应关系

MySQL 中数据类型与 ClickHouse 类型映射关系如下表。

MySQL ClickHouse
UNSIGNED TINYINT UInt8
TINYINT Int8
UNSIGNED SMALLINT UInt16
SMALLINT Int16
UNSIGNED INT, UNSIGNED MEDIUMINT UInt32
INT, MEDIUMINT Int32
UNSIGNED BIGINT UInt64
BIGINT Int64
FLOAT Float32
DOUBLE Float64
DATE Date
DATETIME, TIMESTAMP DateTime
BINARY FixedString

应用形式

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
    ...
) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);

应用示例

-- 连贯 MySQL 中 clickhouse 数据库的 test 表
CREATE TABLE mysql_users(
    id Int32,
    name String
) ENGINE = MySQL(
 '192.168.10.203:3306',
 'clickhouse',
 'users', 
 'root', 
 '123qwe');
-- 查问数据
cdh04 :) SELECT * FROM mysql_users;

SELECT *
FROM mysql_users

┌─id─┬─name──┐
│  1 │ tom   │
│  2 │ jack  │
│  3 │ lihua │
└────┴───────┘
-- 插入数据,会将数据插入 MySQL 对应的表中
-- 所以当查问 MySQL 数据时,会发现新增了一条数据
INSERT INTO users VALUES(4,'robin');
-- 再次查问
cdh04 :) select * from mysql_users;                

SELECT *
FROM mysql_users

┌─id─┬─name──┐
│  1 │ tom   │
│  2 │ jack  │
│  3 │ lihua │
│  4 │ robin │
└────┴───────┘

留神:对于 MySQL 表引擎,不反对 UPDATE 和 DELETE 操作,比方执行上面命令时,会报错:

-- 执行更新
ALTER TABLE mysql_users UPDATE name = 'hanmeimei' WHERE id = 1;
-- 执行删除
ALTER TABLE mysql_users DELETE WHERE id = 1;
-- 报错
DB::Exception: Mutations are not supported by storage MySQL.

从 Hive 中导入数据

本文应用 Waterdrop 进行数据导入,Waterdrop 是一个十分易用,高性能,可能应答海量数据的实时数据处理产品,它构建在 Spark 之上。Waterdrop 领有着十分丰盛的插件,反对从 Kafka、HDFS、Kudu 中读取数据,进行各种各样的数据处理,并将后果写入 ClickHouse、Elasticsearch 或者 Kafka 中。

咱们仅须要编写一个 Waterdrop Pipeline 的配置文件即可实现数据的导入。配置文件包含四个局部,别离是 Spark、Input、filter 和 Output。

对于 Waterdrop 的装置,非常简略,只须要下载 ZIP 文件,解压即可。应用 Waterdrop 须要装置 Spark。

  • 在 Waterdrop 装置目录的 config/ 文件夹下创立配置文件:hive_table_batch.conf,内容如下。次要包含四局部:Spark、Input、filter 和 Output。

    • Spark 局部是 Spark 的相干配置,次要配置 Spark 执行时所需的资源大小。
    • Input 局部是定义数据源,其中 pre_sql 是从 Hive 中读取数据 SQL,table_name是将读取后的数据,注册成为 Spark 中长期表的表名,可为任意字段。
    • filter 局部配置一系列的转化,比方过滤字段
    • Output 局部是将解决好的结构化数据写入 ClickHouse,ClickHouse 的连贯配置。

      须要留神的是,必须保障 hive 的 metastore 是在服务状态。

spark {
  spark.app.name = "Waterdrop_Hive2ClickHouse"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  // 这个配置必须填写
  spark.sql.catalogImplementation = "hive"
}
input {
    hive {
        pre_sql = "select * from default.users"
        table_name = "hive_users"
    }
}
filter {}
output {
    clickhouse {
        host = "kms-1:8123"
        database = "default"
        table = "users"
        fields = ["id", "name"]
        username = "default"
        password = "hOn0d9HT"
    }
}
  • 执行工作
[kms@kms-1 waterdrop-1.5.1]$ bin/start-waterdrop.sh  --config config/hive_table_batch.conf --master yarn --deploy-mode cluster

这样就会启动一个 Spark 作业执行数据的抽取,等执行实现之后,查看 ClickHouse 的数据。

总结

本文次要介绍了如何通过 Flink、Spark、Kafka、MySQL 以及 Hive,将数据导入到 ClickHouse,对每一种形式都出了具体的示例,心愿对你有所帮忙。

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

退出移动版