共计 9239 个字符,预计需要花费 24 分钟才能阅读完成。
本文分享次要是 ClickHouse 的数据导入形式,本文次要介绍如何应用 Flink、Spark、Kafka、MySQL、Hive 将数据导入 ClickHouse,具体内容包含:
- 应用 Flink 导入数据
- 应用 Spark 导入数据
- 从 Kafka 中导入数据
- 从 MySQL 中导入数据
- 从 Hive 中导入数据
应用 Flink 导入数据
本文介绍应用 flink-jdbc 将数据导入 ClickHouse,Maven 依赖为:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_${scala.binary.version}</artifactId>
<version>1.10.1</version>
</dependency>
示例
本示例应用 Kafka connector,通过 Flink 将 Kafka 数据实时导入到 ClickHouse
public class FlinkSinkClickHouse {public static void main(String[] args) throws Exception {
String url = "jdbc:clickhouse://192.168.10.203:8123/default";
String user = "default";
String passwd = "hOn0d9HT";
String driver = "ru.yandex.clickhouse.ClickHouseDriver";
int batchsize = 500; // 设置 batch size,测试的话能够设置小一点,这样能够立即看到数据被写入
// 创立执行环境
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
String kafkaSource11 = ""+"CREATE TABLE user_behavior ("+" `user_id` BIGINT, -- 用户 id\n"+" `item_id` BIGINT, -- 商品 id\n"+" `cat_id` BIGINT, -- 品类 id\n"+" `action` STRING, -- 用户行为 \n"+" `province` INT, -- 用户所在的省份 \n"+" `ts` BIGINT, -- 用户行为产生的工夫戳 \n"+" `proctime` AS PROCTIME(), -- 通过计算列产生一个解决工夫列 \n"+" `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件工夫 \n"+" WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- 在 eventTime 上定义 watermark\n"+") WITH ( 'connector' = 'kafka', -- 应用 kafka connector\n"+" 'topic' = 'user_behavior', -- kafka 主题 \n"+" 'scan.startup.mode' = 'earliest-offset', -- 偏移量,从起始 offset 开始读取 \n"+" 'properties.group.id' = 'group1', -- 消费者组 \n"+" 'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', -- kafka broker 地址 \n"+" 'format' = 'json', -- 数据源格局为 json\n"+" 'json.fail-on-missing-field' = 'true',\n"+" 'json.ignore-parse-errors' = 'false'" +
")";
// Kafka Source
tEnv.executeSql(kafkaSource11);
String query = "SELECT user_id,item_id,cat_id,action,province,ts FROM user_behavior";
Table table = tEnv.sqlQuery(query);
String insertIntoCkSql = "INSERT INTO behavior_mergetree(user_id,item_id,cat_id,action,province,ts)\n" +
"VALUES(?,?,?,?,?,?)";
// 将数据写入 ClickHouse Sink
JDBCAppendTableSink sink = JDBCAppendTableSink
.builder()
.setDrivername(driver)
.setDBUrl(url)
.setUsername(user)
.setPassword(passwd)
.setQuery(insertIntoCkSql)
.setBatchSize(batchsize)
.setParameterTypes(Types.LONG, Types.LONG,Types.LONG, Types.STRING,Types.INT,Types.LONG)
.build();
String[] arr = {"user_id","item_id","cat_id","action","province","ts"};
TypeInformation[] type = {Types.LONG, Types.LONG,Types.LONG, Types.STRING,Types.INT,Types.LONG};
tEnv.registerTableSink(
"sink",
arr,
type,
sink
);
tEnv.insertInto(table, "sink");
tEnv.execute("Flink Table API to ClickHouse Example");
}
}
Note:
- 因为 ClickHouse 单次插入的提早比拟高,咱们须要设置
BatchSize
来批量插入数据,进步性能。- 在 JDBCAppendTableSink 的实现中,若最初一批数据的数目有余
BatchSize
,则不会插入残余数据。
应用 Spark 导入数据
本文次要介绍如何通过 Spark 程序写入数据到 Clickhouse 中。
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
<!-- 如果报错:Caused by: java.lang.ClassNotFoundException: com.google.common.escape.Escapers,则增加上面的依赖 -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.0-jre</version>
</dependency>
示例
object Spark2ClickHouseExample {val properties = new Properties()
properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
properties.put("user", "default")
properties.put("password", "hOn0d9HT")
properties.put("batchsize", "1000")
properties.put("socket_timeout", "300000")
properties.put("numPartitions", "8")
properties.put("rewriteBatchedStatements", "true")
case class Person(name: String, age: Long)
private def runDatasetCreationExample(spark: SparkSession): Dataset[Person] = {
import spark.implicits._
// DataFrames 转成 DataSet
val path = "file:///e:/people.json"
val peopleDS = spark.read.json(path)
peopleDS.createOrReplaceTempView("people")
val ds = spark.sql("SELECT name,age FROM people").as[Person]
ds.show()
ds
}
def main(args: Array[String]) {
val url = "jdbc:clickhouse://kms-1:8123/default"
val table = "people"
val spark = SparkSession
.builder()
.appName("Spark Example")
.master("local") // 设置为本地运行
.getOrCreate()
val ds = runDatasetCreationExample(spark)
ds.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, properties)
spark.stop()}
}
从 Kafka 中导入数据
次要是应用 ClickHouse 的表引擎。
应用形式
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_row_delimiter = 'delimiter_symbol',]
[kafka_schema = '',]
[kafka_num_consumers = N,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0,]
[kafka_thread_per_consumer = 0]
kafka_broker_list
:逗号分隔的 brokers 地址 (localhost:9092).kafka_topic_list
:Kafka 主题列表,多个主题用逗号分隔.kafka_group_name
:消费者组.kafka_format
– Message format. 比方JSONEachRow
、JSON、CSV 等等
应用示例
在 kafka 中创立 user_behavior 主题,并向该主题写入数据,数据示例为:
{"user_id":63401,"item_id":6244,"cat_id":143,"action":"pv","province":3,"ts":1573445919}
{"user_id":9164,"item_id":2817,"cat_id":611,"action":"fav","province":28,"ts":1573420486}
{"user_id":63401,"item_id":6244,"cat_id":143,"action":"pv","province":3,"ts":1573445919}
在 ClickHouse 中创立表,抉择表引擎为 Kafka(),如下:
CREATE TABLE kafka_user_behavior (
user_id UInt64 COMMENT '用户 id',
item_id UInt64 COMMENT '商品 id',
cat_id UInt16 COMMENT '品类 id',
action String COMMENT '行为',
province UInt8 COMMENT '省份 id',
ts UInt64 COMMENT '工夫戳'
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'cdh04:9092',
kafka_topic_list = 'user_behavior',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow'
;
-- 查问
cdh04 :) select * from kafka_user_behavior ;
-- 再次查看数据,发现数据为空
cdh04 :) select count(*) from kafka_user_behavior;
SELECT count(*)
FROM kafka_user_behavior
┌─count()─┐
│ 0 │
└─────────┘
通过物化视图将 kafka 数据导入 ClickHouse
当咱们一旦查问结束之后,ClickHouse 会删除表内的数据,其实 Kafka 表引擎只是一个数据管道,咱们能够通过物化视图的形式拜访 Kafka 中的数据。
- 首先创立一张 Kafka 表引擎的表,用于从 Kafka 中读取数据
- 而后再创立一张一般表引擎的表,比方 MergeTree,面向终端用户应用
- 最初创立物化视图,用于将 Kafka 引擎表实时同步到终端用户所应用的表中
-- 创立 Kafka 引擎表
CREATE TABLE kafka_user_behavior_src (
user_id UInt64 COMMENT '用户 id',
item_id UInt64 COMMENT '商品 id',
cat_id UInt16 COMMENT '品类 id',
action String COMMENT '行为',
province UInt8 COMMENT '省份 id',
ts UInt64 COMMENT '工夫戳'
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'cdh04:9092',
kafka_topic_list = 'user_behavior',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow'
;
-- 创立一张终端用户应用的表
CREATE TABLE kafka_user_behavior (
user_id UInt64 COMMENT '用户 id',
item_id UInt64 COMMENT '商品 id',
cat_id UInt16 COMMENT '品类 id',
action String COMMENT '行为',
province UInt8 COMMENT '省份 id',
ts UInt64 COMMENT '工夫戳'
) ENGINE = MergeTree()
ORDER BY user_id
;
-- 创立物化视图,同步数据
CREATE MATERIALIZED VIEW user_behavior_consumer TO kafka_user_behavior
AS SELECT * FROM kafka_user_behavior_src ;
-- 查问,屡次查问,曾经被查问的数据仍然会被输入
cdh04 :) select * from kafka_user_behavior;
Note:
Kafka 生产表不能间接作为后果表应用。Kafka 生产表只是用来生产 Kafka 数据,没有真正的存储所有数据。
从 MySQL 中导入数据
同 kafka 中导入数据相似,ClickHouse 同样反对 MySQL 表引擎,即映射一张 MySQL 中的表到 ClickHouse 中。
数据类型对应关系
MySQL 中数据类型与 ClickHouse 类型映射关系如下表。
MySQL | ClickHouse |
---|---|
UNSIGNED TINYINT | UInt8 |
TINYINT | Int8 |
UNSIGNED SMALLINT | UInt16 |
SMALLINT | Int16 |
UNSIGNED INT, UNSIGNED MEDIUMINT | UInt32 |
INT, MEDIUMINT | Int32 |
UNSIGNED BIGINT | UInt64 |
BIGINT | Int64 |
FLOAT | Float32 |
DOUBLE | Float64 |
DATE | Date |
DATETIME, TIMESTAMP | DateTime |
BINARY | FixedString |
应用形式
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
...
) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);
应用示例
-- 连贯 MySQL 中 clickhouse 数据库的 test 表
CREATE TABLE mysql_users(
id Int32,
name String
) ENGINE = MySQL(
'192.168.10.203:3306',
'clickhouse',
'users',
'root',
'123qwe');
-- 查问数据
cdh04 :) SELECT * FROM mysql_users;
SELECT *
FROM mysql_users
┌─id─┬─name──┐
│ 1 │ tom │
│ 2 │ jack │
│ 3 │ lihua │
└────┴───────┘
-- 插入数据,会将数据插入 MySQL 对应的表中
-- 所以当查问 MySQL 数据时,会发现新增了一条数据
INSERT INTO users VALUES(4,'robin');
-- 再次查问
cdh04 :) select * from mysql_users;
SELECT *
FROM mysql_users
┌─id─┬─name──┐
│ 1 │ tom │
│ 2 │ jack │
│ 3 │ lihua │
│ 4 │ robin │
└────┴───────┘
留神:对于 MySQL 表引擎,不反对 UPDATE 和 DELETE 操作,比方执行上面命令时,会报错:
-- 执行更新
ALTER TABLE mysql_users UPDATE name = 'hanmeimei' WHERE id = 1;
-- 执行删除
ALTER TABLE mysql_users DELETE WHERE id = 1;
-- 报错
DB::Exception: Mutations are not supported by storage MySQL.
从 Hive 中导入数据
本文应用 Waterdrop 进行数据导入,Waterdrop 是一个十分易用,高性能,可能应答海量数据的实时数据处理产品,它构建在 Spark 之上。Waterdrop 领有着十分丰盛的插件,反对从 Kafka、HDFS、Kudu 中读取数据,进行各种各样的数据处理,并将后果写入 ClickHouse、Elasticsearch 或者 Kafka 中。
咱们仅须要编写一个 Waterdrop Pipeline 的配置文件即可实现数据的导入。配置文件包含四个局部,别离是 Spark、Input、filter 和 Output。
对于 Waterdrop 的装置,非常简略,只须要下载 ZIP 文件,解压即可。应用 Waterdrop 须要装置 Spark。
-
在 Waterdrop 装置目录的 config/ 文件夹下创立配置文件:hive_table_batch.conf,内容如下。次要包含四局部:Spark、Input、filter 和 Output。
- Spark 局部是 Spark 的相干配置,次要配置 Spark 执行时所需的资源大小。
- Input 局部是定义数据源,其中
pre_sql
是从 Hive 中读取数据 SQL,table_name
是将读取后的数据,注册成为 Spark 中长期表的表名,可为任意字段。 - filter 局部配置一系列的转化,比方过滤字段
- Output 局部是将解决好的结构化数据写入 ClickHouse,ClickHouse 的连贯配置。
须要留神的是,必须保障 hive 的 metastore 是在服务状态。
spark {
spark.app.name = "Waterdrop_Hive2ClickHouse"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
// 这个配置必须填写
spark.sql.catalogImplementation = "hive"
}
input {
hive {
pre_sql = "select * from default.users"
table_name = "hive_users"
}
}
filter {}
output {
clickhouse {
host = "kms-1:8123"
database = "default"
table = "users"
fields = ["id", "name"]
username = "default"
password = "hOn0d9HT"
}
}
- 执行工作
[kms@kms-1 waterdrop-1.5.1]$ bin/start-waterdrop.sh --config config/hive_table_batch.conf --master yarn --deploy-mode cluster
这样就会启动一个 Spark 作业执行数据的抽取,等执行实现之后,查看 ClickHouse 的数据。
总结
本文次要介绍了如何通过 Flink、Spark、Kafka、MySQL 以及 Hive,将数据导入到 ClickHouse,对每一种形式都出了具体的示例,心愿对你有所帮忙。
公众号『大数据技术与数仓』,回复『材料』支付大数据资料包