关于clickhouse:篇五ClickHouse数据导入FlinkSparkKafkaMySQLHive

本文分享次要是ClickHouse的数据导入形式,本文次要介绍如何应用Flink、Spark、Kafka、MySQL、Hive将数据导入ClickHouse,具体内容包含:

  • 应用Flink导入数据
  • 应用Spark导入数据
  • 从Kafka中导入数据
  • 从MySQL中导入数据
  • 从Hive中导入数据

应用Flink导入数据

本文介绍应用 flink-jdbc将数据导入ClickHouse,Maven依赖为:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
    <version>1.10.1</version>
</dependency>

示例

本示例应用Kafka connector,通过Flink将Kafka数据实时导入到ClickHouse

public class FlinkSinkClickHouse {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:clickhouse://192.168.10.203:8123/default";
        String user = "default";
        String passwd = "hOn0d9HT";
        String driver = "ru.yandex.clickhouse.ClickHouseDriver";
        int batchsize = 500; // 设置batch size,测试的话能够设置小一点,这样能够立即看到数据被写入

        // 创立执行环境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        String kafkaSource11 = "" +
                "CREATE TABLE user_behavior ( " +
                " `user_id` BIGINT, -- 用户id\n" +
                " `item_id` BIGINT, -- 商品id\n" +
                " `cat_id` BIGINT, -- 品类id\n" +
                " `action` STRING, -- 用户行为\n" +
                " `province` INT, -- 用户所在的省份\n" +
                " `ts` BIGINT, -- 用户行为产生的工夫戳\n" +
                " `proctime` AS PROCTIME(), -- 通过计算列产生一个解决工夫列\n" +
                " `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件工夫\n" +
                " WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- 在eventTime上定义watermark\n" +
                ") WITH ( 'connector' = 'kafka', -- 应用 kafka connector\n" +
                " 'topic' = 'user_behavior', -- kafka主题\n" +
                " 'scan.startup.mode' = 'earliest-offset', -- 偏移量,从起始 offset 开始读取\n" +
                " 'properties.group.id' = 'group1', -- 消费者组\n" +
                " 'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', -- kafka broker 地址\n" +
                " 'format' = 'json', -- 数据源格局为 json\n" +
                " 'json.fail-on-missing-field' = 'true',\n" +
                " 'json.ignore-parse-errors' = 'false'" +
                ")";

        // Kafka Source
        tEnv.executeSql(kafkaSource11);
        String query = "SELECT user_id,item_id,cat_id,action,province,ts FROM user_behavior";
        Table table = tEnv.sqlQuery(query);

        String insertIntoCkSql = "INSERT INTO behavior_mergetree(user_id,item_id,cat_id,action,province,ts)\n" +
                "VALUES(?,?,?,?,?,?)";

        //将数据写入 ClickHouse Sink
        JDBCAppendTableSink sink = JDBCAppendTableSink
                .builder()
                .setDrivername(driver)
                .setDBUrl(url)
                .setUsername(user)
                .setPassword(passwd)
                .setQuery(insertIntoCkSql)
                .setBatchSize(batchsize)
                .setParameterTypes(Types.LONG, Types.LONG,Types.LONG, Types.STRING,Types.INT,Types.LONG)
                .build();

        String[] arr = {"user_id","item_id","cat_id","action","province","ts"};
        TypeInformation[] type = {Types.LONG, Types.LONG,Types.LONG, Types.STRING,Types.INT,Types.LONG};

        tEnv.registerTableSink(
                "sink",
                arr,
                type,
                sink
        );

        tEnv.insertInto(table, "sink");

        tEnv.execute("Flink Table API to ClickHouse Example");
    }

}

Note:

  • 因为 ClickHouse 单次插入的提早比拟高,咱们须要设置 BatchSize 来批量插入数据,进步性能。
  • 在 JDBCAppendTableSink 的实现中,若最初一批数据的数目有余 BatchSize,则不会插入残余数据。

应用Spark导入数据

本文次要介绍如何通过Spark程序写入数据到Clickhouse中。

<dependency>
      <groupId>ru.yandex.clickhouse</groupId>
      <artifactId>clickhouse-jdbc</artifactId>
      <version>0.2.4</version>
</dependency>
<!-- 如果报错:Caused by: java.lang.ClassNotFoundException: com.google.common.escape.Escapers,则增加上面的依赖 -->
<dependency>
          <groupId>com.google.guava</groupId>
          <artifactId>guava</artifactId>
          <version>28.0-jre</version>
</dependency>

示例

object Spark2ClickHouseExample {

  val properties = new Properties()
  properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
  properties.put("user", "default")
  properties.put("password", "hOn0d9HT")
  properties.put("batchsize", "1000")
  properties.put("socket_timeout", "300000")
  properties.put("numPartitions", "8")
  properties.put("rewriteBatchedStatements", "true")

  case class Person(name: String, age: Long)

  private def runDatasetCreationExample(spark: SparkSession): Dataset[Person] = {
    import spark.implicits._
    // DataFrames转成DataSet
    val path = "file:///e:/people.json"
    val peopleDS = spark.read.json(path)
    peopleDS.createOrReplaceTempView("people")
    val ds = spark.sql("SELECT name,age FROM people").as[Person]
    ds.show()
    ds
  }

  def main(args: Array[String]) {


    val url = "jdbc:clickhouse://kms-1:8123/default"
    val table = "people"

    val spark = SparkSession
      .builder()
      .appName("Spark  Example")
      .master("local") //设置为本地运行
      .getOrCreate()
    val ds = runDatasetCreationExample(spark)

    ds.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, properties)
    spark.stop()
  }
}

从Kafka中导入数据

次要是应用ClickHouse的表引擎。

应用形式

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_row_delimiter = 'delimiter_symbol',]
    [kafka_schema = '',]
    [kafka_num_consumers = N,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_thread_per_consumer = 0]
  • kafka_broker_list :逗号分隔的brokers地址 (localhost:9092).
  • kafka_topic_list :Kafka 主题列表,多个主题用逗号分隔.
  • kafka_group_name :消费者组.
  • kafka_format – Message format. 比方JSONEachRow、JSON、CSV等等

应用示例

在kafka中创立user_behavior主题,并向该主题写入数据,数据示例为:

{"user_id":63401,"item_id":6244,"cat_id":143,"action":"pv","province":3,"ts":1573445919}
{"user_id":9164,"item_id":2817,"cat_id":611,"action":"fav","province":28,"ts":1573420486}
{"user_id":63401,"item_id":6244,"cat_id":143,"action":"pv","province":3,"ts":1573445919}

在ClickHouse中创立表,抉择表引擎为Kafka(),如下:

 CREATE TABLE kafka_user_behavior (
    user_id UInt64 COMMENT '用户id',
    item_id UInt64 COMMENT '商品id',
    cat_id UInt16  COMMENT '品类id',
    action String  COMMENT '行为',
    province UInt8 COMMENT '省份id',
    ts UInt64      COMMENT '工夫戳'
  ) ENGINE = Kafka()
    SETTINGS
    kafka_broker_list = 'cdh04:9092',
    kafka_topic_list = 'user_behavior',
    kafka_group_name = 'group1',
    kafka_format = 'JSONEachRow'
;
-- 查问
cdh04 :) select * from kafka_user_behavior ;

-- 再次查看数据,发现数据为空
cdh04 :) select count(*) from kafka_user_behavior;

SELECT count(*)
FROM kafka_user_behavior

┌─count()─┐
│       0 │
└─────────┘

通过物化视图将kafka数据导入ClickHouse

当咱们一旦查问结束之后,ClickHouse会删除表内的数据,其实Kafka表引擎只是一个数据管道,咱们能够通过物化视图的形式拜访Kafka中的数据。

  • 首先创立一张Kafka表引擎的表,用于从Kafka中读取数据
  • 而后再创立一张一般表引擎的表,比方MergeTree,面向终端用户应用
  • 最初创立物化视图,用于将Kafka引擎表实时同步到终端用户所应用的表中
--  创立Kafka引擎表
 CREATE TABLE kafka_user_behavior_src (
    user_id UInt64 COMMENT '用户id',
    item_id UInt64 COMMENT '商品id',
    cat_id UInt16  COMMENT '品类id',
    action String  COMMENT '行为',
    province UInt8 COMMENT '省份id',
    ts UInt64      COMMENT '工夫戳'
  ) ENGINE = Kafka()
    SETTINGS
    kafka_broker_list = 'cdh04:9092',
    kafka_topic_list = 'user_behavior',
    kafka_group_name = 'group1',
    kafka_format = 'JSONEachRow'
;

-- 创立一张终端用户应用的表
 CREATE TABLE kafka_user_behavior (
    user_id UInt64 COMMENT '用户id',
    item_id UInt64 COMMENT '商品id',
    cat_id UInt16  COMMENT '品类id',
    action String  COMMENT '行为',
    province UInt8 COMMENT '省份id',
    ts UInt64      COMMENT '工夫戳'
  ) ENGINE = MergeTree()
    ORDER BY user_id
;
-- 创立物化视图,同步数据
CREATE MATERIALIZED VIEW user_behavior_consumer TO kafka_user_behavior
    AS SELECT * FROM kafka_user_behavior_src ;
-- 查问,屡次查问,曾经被查问的数据仍然会被输入
cdh04 :) select * from kafka_user_behavior;

Note:

Kafka生产表不能间接作为后果表应用。Kafka生产表只是用来生产Kafka数据,没有真正的存储所有数据。

从MySQL中导入数据

同kafka中导入数据相似,ClickHouse同样反对MySQL表引擎,即映射一张MySQL中的表到ClickHouse中。

数据类型对应关系

MySQL中数据类型与ClickHouse类型映射关系如下表。

MySQL ClickHouse
UNSIGNED TINYINT UInt8
TINYINT Int8
UNSIGNED SMALLINT UInt16
SMALLINT Int16
UNSIGNED INT, UNSIGNED MEDIUMINT UInt32
INT, MEDIUMINT Int32
UNSIGNED BIGINT UInt64
BIGINT Int64
FLOAT Float32
DOUBLE Float64
DATE Date
DATETIME, TIMESTAMP DateTime
BINARY FixedString

应用形式

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
    ...
) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);

应用示例

-- 连贯MySQL中clickhouse数据库的test表
CREATE TABLE mysql_users(
    id Int32,
    name String
) ENGINE = MySQL(
 '192.168.10.203:3306',
 'clickhouse',
 'users', 
 'root', 
 '123qwe');
-- 查问数据
cdh04 :) SELECT * FROM mysql_users;

SELECT *
FROM mysql_users

┌─id─┬─name──┐
│  1 │ tom   │
│  2 │ jack  │
│  3 │ lihua │
└────┴───────┘
-- 插入数据,会将数据插入MySQL对应的表中
-- 所以当查问MySQL数据时,会发现新增了一条数据
INSERT INTO users VALUES(4,'robin');
-- 再次查问
cdh04 :) select * from mysql_users;                

SELECT *
FROM mysql_users

┌─id─┬─name──┐
│  1 │ tom   │
│  2 │ jack  │
│  3 │ lihua │
│  4 │ robin │
└────┴───────┘

留神:对于MySQL表引擎,不反对UPDATE和DELETE操作,比方执行上面命令时,会报错:

-- 执行更新
ALTER TABLE mysql_users UPDATE name = 'hanmeimei' WHERE id = 1;
-- 执行删除
ALTER TABLE mysql_users DELETE WHERE id = 1;
-- 报错
DB::Exception: Mutations are not supported by storage MySQL.

从Hive中导入数据

本文应用Waterdrop进行数据导入,Waterdrop是一个十分易用,高性能,可能应答海量数据的实时数据处理产品,它构建在Spark之上。Waterdrop领有着十分丰盛的插件,反对从Kafka、HDFS、Kudu中读取数据,进行各种各样的数据处理,并将后果写入ClickHouse、Elasticsearch或者Kafka中。

咱们仅须要编写一个Waterdrop Pipeline的配置文件即可实现数据的导入。配置文件包含四个局部,别离是Spark、Input、filter和Output。

对于Waterdrop的装置,非常简略,只须要下载ZIP文件,解压即可。应用Waterdrop须要装置Spark。

  • 在Waterdrop装置目录的config/文件夹下创立配置文件:hive_table_batch.conf,内容如下。次要包含四局部:Spark、Input、filter和Output。

    • Spark局部是Spark的相干配置,次要配置Spark执行时所需的资源大小。
    • Input局部是定义数据源,其中pre_sql是从Hive中读取数据SQL,table_name是将读取后的数据,注册成为Spark中长期表的表名,可为任意字段。
    • filter局部配置一系列的转化,比方过滤字段
    • Output局部是将解决好的结构化数据写入ClickHouse,ClickHouse的连贯配置。

      须要留神的是,必须保障hive的metastore是在服务状态。

spark {
  spark.app.name = "Waterdrop_Hive2ClickHouse"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  // 这个配置必须填写
  spark.sql.catalogImplementation = "hive"
}
input {
    hive {
        pre_sql = "select * from default.users"
        table_name = "hive_users"
    }
}
filter {}
output {
    clickhouse {
        host = "kms-1:8123"
        database = "default"
        table = "users"
        fields = ["id", "name"]
        username = "default"
        password = "hOn0d9HT"
    }
}
  • 执行工作
[kms@kms-1 waterdrop-1.5.1]$ bin/start-waterdrop.sh  --config config/hive_table_batch.conf --master yarn --deploy-mode cluster

这样就会启动一个Spark作业执行数据的抽取,等执行实现之后,查看ClickHouse的数据。

总结

本文次要介绍了如何通过Flink、Spark、Kafka、MySQL以及Hive,将数据导入到ClickHouse,对每一种形式都出了具体的示例,心愿对你有所帮忙。

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理