摘要:本文介绍如何应用 Hudi 自带入湖工具 DeltaStreamer 进行数据的实时入湖。
本文分享自华为云社区《华为 FusionInsight MRS 实战 – Hudi 实时入湖之 DeltaStreamer 工具最佳实际》,作者:晋红轻。
背景
传统大数据平台的组织架构是针对离线数据处理需要设计的,罕用的数据导入形式为采纳 sqoop 定时作业批量导入。随着数据分析对实时性要求一直进步,按小时、甚至分钟级的数据同步越来越广泛。由此开展了基于 spark/flink 流解决机制的(准)实时同步零碎的开发。
然而实时同步从一开始就面临如下几个挑战:
- 小文件问题。不论是 spark 的 microbatch 模式,还是 flink 的逐条解决模式,每次写入 HDFS 时都是几 MB 甚至几十 KB 的文件。长时间下来产生的大量小文件,会对 HDFS namenode 产生微小的压力。
- 对 update 操作的反对。HDFS 零碎自身不反对数据的批改,无奈实现同步过程中对记录进行批改。
- 事务性。不论是追加数据还是批改数据,如何保障事务性。即数据只在流处理程序 commit 操作时一次性写入 HDFS,当程序 rollback 时,已写入或局部写入的数据能随之删除。
Hudi 就是针对以上问题的解决方案之一。应用 Hudi 自带的 DeltaStreamer 工具写数据到 Hudi,开启–enable-hive-sync 即可同步数据到 hive 表。
Hudi DeltaStreamer 写入工具介绍
DeltaStreamer 工具应用参考 https://hudi.apache.org/cn/do…
HoodieDeltaStreamer 实用工具 (hudi-utilities-bundle 中的一部分) 提供了从 DFS 或 Kafka 等不同起源进行摄取的形式,并具备以下性能。
- 从 Kafka 单次摄取新事件,从 Sqoop、HiveIncrementalPuller 输入或 DFS 文件夹中的多个文件
- 反对 json、avro 或自定义记录类型的传入数据
- 治理检查点,回滚和复原
- 利用 DFS 或 Confluent schema 注册表的 Avro 模式。
- 反对自定义转换操作
场景阐明
- 生产库数据通过 CDC 工具(debezium)实时录入到 MRS 集群中 Kafka 的指定 topic 里。
- 通过 Hudi 提供的 DeltaStreamer 工具,读取 Kafka 指定 topic 里的数据并解析解决。
- 同时应用 DeltaStreamer 工具将解决后的数据写入到 MRS 集群的 hive 里。
样例数据简介
生产库 MySQL 原始数据:
CDC 工具 debezium 简介
对接步骤具体参考:https://fusioninsight.github….
实现对接后,针对 MySQL 生产库别离做增、改、删除操作对应的 kafka 音讯
减少操作: insert into hudi.hudisource3 values (11,“蒋语堂”,“38”,“女”,“图”,“播放器”,“28732”);
对应 kafka 音讯体:
更改操作:UPDATE hudi.hudisource3 SET uname=‘Anne Marie333’WHERE uid=11;
对应 kafka 音讯体:
删除操作:delete from hudi.hudisource3 where uid=11;
对应 kafka 音讯体:
调试步骤
华为 MRS Hudi 样例工程获取
依据理论 MRS 版本登录 github 获取样例代码:https://github.com/huaweiclou…
关上工程 SparkOnHudiJavaExample
样例代码批改及介绍
1.debeziumJsonParser
阐明:对 debezium 的音讯体进行解析,获取到 op 字段。
源码如下:
package com.huawei.bigdata.hudi.examples;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
public class debeziumJsonParser {public static String getOP(String message){JSONObject json_obj = JSON.parseObject(message);
String op = json_obj.getJSONObject("payload").get("op").toString();
return op;
}
}
2.MyJsonKafkaSource
阐明:DeltaStreamer 默认应用 org.apache.hudi.utilities.sources.JsonKafkaSource 生产 kafka 指定 topic 的数据,如果生产阶段波及数据的解析操作,则须要重写 MyJsonKafkaSource 进行解决。
以下是源码,减少正文
package com.huawei.bigdata.hudi.examples;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonSource;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;
import java.util.Map;
/**
* Read json kafka data.
*/
public class MyJsonKafkaSource extends JsonSource {private static final Logger LOG = LogManager.getLogger(MyJsonKafkaSource.class);
private final KafkaOffsetGen offsetGen;
private final HoodieDeltaStreamerMetrics metrics;
public MyJsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {super(properties, sparkContext, sparkSession, schemaProvider);
HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder();
this.metrics = new HoodieDeltaStreamerMetrics(builder.withProperties(properties).build());
properties.put("key.deserializer", StringDeserializer.class);
properties.put("value.deserializer", StringDeserializer.class);
offsetGen = new KafkaOffsetGen(properties);
}
@Override
protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
LOG.info("About to read" + totalNewMsgs + "from Kafka for topic :" + offsetGen.getTopicName());
if (totalNewMsgs <= 0) {return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));
}
JavaRDD<String> newDataRDD = toRDD(offsetRanges);
return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));
}
private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {return KafkaUtils.createRDD(this.sparkContext, this.offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent()).filter((x)->{
// 过滤空行和脏数据
String msg = (String)x.value();
if (msg == null) {return false;}
try{String op = debeziumJsonParser.getOP(msg);
}catch (Exception e){return false;}
return true;
}).map((x) -> {
// 将 debezium 接进来的数据解析写进 map, 在返回 map 的 tostring, 这样构造改变最小
String msg = (String)x.value();
String op = debeziumJsonParser.getOP(msg);
JSONObject json_obj = JSON.parseObject(msg, Feature.OrderedField);
Boolean is_delete = false;
String out_str = "";
Object out_obj = new Object();
if(op.equals("c")){out_obj = json_obj.getJSONObject("payload").get("after");
}
else if(op.equals("u")){out_obj = json_obj.getJSONObject("payload").get("after");
}
else {
is_delete = true;
out_obj = json_obj.getJSONObject("payload").get("before");
}
Map out_map = (Map)out_obj;
out_map.put("_hoodie_is_deleted",is_delete);
out_map.put("op",op);
return out_map.toString();});
}
}
3.TransformerExample
阐明:入湖 hudi 表或者 hive 表时候须要指定的字段
以下是源码,减少正文
package com.huawei.bigdata.hudi.examples;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
* 性能形容
* 对获取的数据进行 format
*/
public class TransformerExample implements Transformer, Serializable {
/**
* format data
*
* @param JavaSparkContext jsc
* @param SparkSession sparkSession
* @param Dataset<Row> rowDataset
* @param TypedProperties properties
* @return Dataset<Row>
*/
@Override
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
TypedProperties properties) {JavaRDD<Row> rowJavaRdd = rowDataset.toJavaRDD();
List<Row> rowList = new ArrayList<>();
for (Row row : rowJavaRdd.collect()) {Row one_row = buildRow(row);
rowList.add(one_row);
}
JavaRDD<Row> stringJavaRdd = jsc.parallelize(rowList);
List<StructField> fields = new ArrayList<>();
builFields(fields);
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> dataFrame = sparkSession.createDataFrame(stringJavaRdd, schema);
return dataFrame;
}
private void builFields(List<StructField> fields) {fields.add(DataTypes.createStructField("uid", DataTypes.IntegerType, true));
fields.add(DataTypes.createStructField("uname", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("age", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("sex", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("mostlike", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("lastview", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("totalcost", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("_hoodie_is_deleted", DataTypes.BooleanType, true));
fields.add(DataTypes.createStructField("op", DataTypes.StringType, true));
}
private Row buildRow(Row row) {Integer uid = row.getInt(0);
String uname = row.getString(1);
String age = row.getString(2);
String sex = row.getString(3);
String mostlike = row.getString(4);
String lastview = row.getString(5);
String totalcost = row.getString(6);
Boolean _hoodie_is_deleted = row.getBoolean(7);
String op = row.getString(8);
Row returnRow = RowFactory.create(uid, uname, age, sex, mostlike, lastview, totalcost, _hoodie_is_deleted, op);
return returnRow;
}
}
4.DataSchemaProviderExample
阐明:别离指定 MyJsonKafkaSource 返回的数据格式为 source schema,TransformerExample 写入的数据格式为 target schema
以下是源码
package com.huawei.bigdata.hudi.examples;
import org.apache.avro.Schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.spark.api.java.JavaSparkContext;
/**
* 性能形容
* 提供 sorce 和 target 的 schema
*/
public class DataSchemaProviderExample extends SchemaProvider {public DataSchemaProviderExample(TypedProperties props, JavaSparkContext jssc) {super(props, jssc);
}
/**
* source schema
*
* @return Schema
*/
@Override
public Schema getSourceSchema() {Schema avroSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"hoodie_source\",\"fields\":[{\"name\":\"uid\",\"type\":\"int\"},{\"name\":\"uname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"sex\",\"type\":\"string\"},{\"name\":\"mostlike\",\"type\":\"string\"},{\"name\":\"lastview\",\"type\":\"string\"},{\"name\":\"totalcost\",\"type\":\"string\"},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\"},{\"name\":\"op\",\"type\":\"string\"}]}");
return avroSchema;
}
/**
* target schema
*
* @return Schema
*/
@Override
public Schema getTargetSchema() {Schema avroSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"mytest_record\",\"namespace\":\"hoodie.mytest\",\"fields\":[{\"name\":\"uid\",\"type\":\"int\"},{\"name\":\"uname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"sex\",\"type\":\"string\"},{\"name\":\"mostlike\",\"type\":\"string\"},{\"name\":\"lastview\",\"type\":\"string\"},{\"name\":\"totalcost\",\"type\":\"string\"},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\"},{\"name\":\"op\",\"type\":\"string\"}]}");
return avroSchema;
}
}
将工程打包(hudi-security-examples-0.7.0.jar)以及 json 解析包(fastjson-1.2.4.jar)上传至 MRS 客户端
DeltaStreamer 启动命令
登录客户端执行一下命令获取环境变量以及认证
source /opt/hadoopclient/bigdata_env
kinit developuser
source /opt/hadoopclient/Hudi/component_env
DeltaStreamer 启动命令如下:
spark-submit --master yarn-client \
--jars /opt/hudi-demo2/fastjson-1.2.4.jar,/opt/hudi-demo2/hudi-security-examples-0.7.0.jar \
--driver-class-path /opt/hadoopclient/Hudi/hudi/conf:/opt/hadoopclient/Hudi/hudi/lib/*:/opt/hadoopclient/Spark2x/spark/jars/*:/opt/hudi-demo2/hudi-security-examples-0.7.0.jar \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
spark-internal --props file:///opt/hudi-demo2/kafka-source.properties \
--target-base-path /tmp/huditest/delta_demo2 \
--table-type COPY_ON_WRITE \
--target-table delta_demo2 \
--source-ordering-field uid \
--source-class com.huawei.bigdata.hudi.examples.MyJsonKafkaSource \
--schemaprovider-class com.huawei.bigdata.hudi.examples.DataSchemaProviderExample \
--transformer-class com.huawei.bigdata.hudi.examples.TransformerExample \
--enable-hive-sync --continuous
kafka.properties 配置
// hudi 配置
hoodie.datasource.write.recordkey.field=uid
hoodie.datasource.write.partitionpath.field=
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
hoodie.datasource.write.hive_style_partitioning=true
hoodie.delete.shuffle.parallelism=10
hoodie.upsert.shuffle.parallelism=10
hoodie.bulkinsert.shuffle.parallelism=10
hoodie.insert.shuffle.parallelism=10
hoodie.finalize.write.parallelism=10
hoodie.cleaner.parallelism=10
hoodie.datasource.write.precombine.field=uid
hoodie.base.path = /tmp/huditest/delta_demo2
hoodie.timeline.layout.version = 1
// hive config
hoodie.datasource.hive_sync.table=delta_demo2
hoodie.datasource.hive_sync.partition_fields=
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor
hoodie.datasource.hive_sync.use_jdbc=false
// Kafka Source topic
hoodie.deltastreamer.source.kafka.topic=hudisource
// checkpoint
hoodie.deltastreamer.checkpoint.provider.path=hdfs://hacluster/tmp/delta_demo2/checkpoint/
// Kafka props
bootstrap.servers=172.16.9.117:21005
auto.offset.reset=earliest
group.id=a5
offset.rang.limit=10000
留神:kafka 服务端配置 allow.everyone.if.no.acl.found 为 true
应用 Spark 查问
spark-shell --master yarn
val roViewDF = spark.read.format("org.apache.hudi").load("/tmp/huditest/delta_demo2/*")
roViewDF.createOrReplaceTempView("hudi_ro_table")
spark.sql("select * from hudi_ro_table").show()
Mysql 减少操作对应 spark 中 hudi 表查问后果:
Mysql 更新操作对应 spark 中 hudi 表查问后果:
删除操作:
应用 Hive 查问
beeline
select * from delta_demo2;
Mysql 减少操作对应 hive 表中查问后果:
Mysql 更新操作对应 hive 表中查问后果:
Mysql 删除操作对应 hive 表中查问后果:
点击关注,第一工夫理解华为云陈腐技术~