乐趣区

关于程序员:华为云FusionInsight-MRS实战-Hudi实时入湖之DeltaStreamer工具最佳实践

背景

传统大数据平台的组织架构是针对离线数据处理需要设计的,罕用的数据导入形式为采纳 sqoop 定时作业批量导入。随着数据分析对实时性要求一直进步,按小时、甚至分钟级的数据同步越来越广泛。由此开展了基于 spark/flink 流解决机制的(准)实时同步零碎的开发。

然而实时同步从一开始就面临如下几个挑战:

  • 小文件问题。不论是 spark 的 microbatch 模式,还是 flink 的逐条解决模式,每次写入 HDFS 时都是几 MB 甚至几十 KB 的文件。长时间下来产生的大量小文件,会对 HDFS namenode 产生微小的压力。
  • 对 update 操作的反对。HDFS 零碎自身不反对数据的批改,无奈实现同步过程中对记录进行批改。
  • 事务性。不论是追加数据还是批改数据,如何保障事务性。即数据只在流处理程序 commit 操作时一次性写入 HDFS,当程序 rollback 时,已写入或局部写入的数据能随之删除。

Hudi 就是针对以上问题的解决方案之一。应用 Hudi 自带的 DeltaStreamer 工具写数据到 Hudi,开启–enable-hive-sync 即可同步数据到 hive 表。

Hudi DeltaStreamer 写入工具介绍

HoodieDeltaStreamer 实用工具 (hudi-utilities-bundle 中的一部分) 提供了从 DFS 或 Kafka 等不同起源进行摄取的形式,并具备以下性能。

  • 从 Kafka 单次摄取新事件,从 Sqoop、HiveIncrementalPuller 输入或 DFS 文件夹中的多个文件
  • 反对 json、avro 或自定义记录类型的传入数据
  • 治理检查点,回滚和复原
  • 利用 DFS 或 Confluent schema 注册表的 Avro 模式。
  • 反对自定义转换操作

场景阐明

  1. 生产库数据通过 CDC 工具(debezium)实时录入到 MRS 集群中 Kafka 的指定 topic 里。
  2. 通过 Hudi 提供的 DeltaStreamer 工具,读取 Kafka 指定 topic 里的数据并解析解决。
  3. 同时应用 DeltaStreamer 工具将解决后的数据写入到 MRS 集群的 hive 里。

样例数据简介

生产库 MySQL 原始数据:

CDC 工具 debezium 简介

对接步骤具体参考:https://fusioninsight.github….

实现对接后,针对 MySQL 生产库别离做增、改、删除操作对应的 kafka 音讯

减少操作: insert into hudi.hudisource3 values (11,“蒋语堂”,“38”,“女”,“图”,“播放器”,“28732”);

对应 kafka 音讯体:

更改操作:UPDATE hudi.hudisource3 SET uname=‘Anne Marie333’WHERE uid=11;

对应 kafka 音讯体:

删除操作:delete from hudi.hudisource3 where uid=11;

对应 kafka 音讯体:

调试步骤

华为云 MRS Hudi 样例工程获取

依据理论 MRS 版本登录 github 获取样例代码:https://github.com/huaweiclou…

关上工程 SparkOnHudiJavaExample

样例代码批改及介绍

1.debeziumJsonParser

阐明:对 debezium 的音讯体进行解析,获取到 op 字段。

源码如下:

package com.huawei.bigdata.hudi.examples;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;

public class debeziumJsonParser {public static String getOP(String message){JSONObject json_obj = JSON.parseObject(message);
        String op = json_obj.getJSONObject("payload").get("op").toString();
        return  op;
    }
}

2.MyJsonKafkaSource

阐明:DeltaStreamer 默认应用 org.apache.hudi.utilities.sources.JsonKafkaSource 生产 kafka 指定 topic 的数据,如果生产阶段波及数据的解析操作,则须要重写 MyJsonKafkaSource 进行解决。

以下是源码,减少正文

package com.huawei.bigdata.hudi.examples;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonSource;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;
import java.util.Map;

/**
 * Read json kafka data.
 */
public class MyJsonKafkaSource extends JsonSource {private static final Logger LOG = LogManager.getLogger(MyJsonKafkaSource.class);

    private final KafkaOffsetGen offsetGen;

    private final HoodieDeltaStreamerMetrics metrics;

    public MyJsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,
                             SchemaProvider schemaProvider) {super(properties, sparkContext, sparkSession, schemaProvider);
        HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder();
        this.metrics = new HoodieDeltaStreamerMetrics(builder.withProperties(properties).build());
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        offsetGen = new KafkaOffsetGen(properties);
    }

    @Override
    protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
        long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
        LOG.info("About to read" + totalNewMsgs + "from Kafka for topic :" + offsetGen.getTopicName());
        if (totalNewMsgs <= 0) {return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));
        }
        JavaRDD<String> newDataRDD = toRDD(offsetRanges);
        return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));
    }

    private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {return KafkaUtils.createRDD(this.sparkContext, this.offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent()).filter((x)->{
            // 过滤空行和脏数据
            String msg = (String)x.value();
            if (msg == null) {return false;}
            try{String op = debeziumJsonParser.getOP(msg);
            }catch (Exception e){return false;}
            return true;
        }).map((x) -> {
            // 将 debezium 接进来的数据解析写进 map, 在返回 map 的 tostring, 这样构造改变最小
            String msg = (String)x.value();
            String op = debeziumJsonParser.getOP(msg);
            JSONObject json_obj = JSON.parseObject(msg, Feature.OrderedField);
            Boolean is_delete = false;
            String out_str = "";
            Object out_obj = new Object();
            if(op.equals("c")){out_obj =  json_obj.getJSONObject("payload").get("after");
            }
            else if(op.equals("u")){out_obj =   json_obj.getJSONObject("payload").get("after");
            }
            else {
                is_delete = true;
                out_obj =   json_obj.getJSONObject("payload").get("before");
            }
            Map out_map = (Map)out_obj;
            out_map.put("_hoodie_is_deleted",is_delete);
            out_map.put("op",op);

            return out_map.toString();});
    }
}

3.TransformerExample

阐明:入湖 hudi 表或者 hive 表时候须要指定的字段

以下是源码,减少正文

package com.huawei.bigdata.hudi.examples;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
 * 性能形容
 * 对获取的数据进行 format
 */
public class TransformerExample implements Transformer, Serializable {

    /**
     * format data
     *
     * @param JavaSparkContext jsc
     * @param SparkSession sparkSession
     * @param Dataset<Row> rowDataset
     * @param TypedProperties properties
     * @return Dataset<Row>
     */
    @Override
    public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
        TypedProperties properties) {JavaRDD<Row> rowJavaRdd = rowDataset.toJavaRDD();
        List<Row> rowList = new ArrayList<>();
        for (Row row : rowJavaRdd.collect()) {Row one_row = buildRow(row);
            rowList.add(one_row);
        }
        JavaRDD<Row> stringJavaRdd = jsc.parallelize(rowList);
        List<StructField> fields = new ArrayList<>();
        builFields(fields);
        StructType schema = DataTypes.createStructType(fields);
        Dataset<Row> dataFrame = sparkSession.createDataFrame(stringJavaRdd, schema);
        return dataFrame;
    }

    private void builFields(List<StructField> fields) {fields.add(DataTypes.createStructField("uid", DataTypes.IntegerType, true));
        fields.add(DataTypes.createStructField("uname", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("age", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("sex", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("mostlike", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("lastview", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("totalcost", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("_hoodie_is_deleted", DataTypes.BooleanType, true));
        fields.add(DataTypes.createStructField("op", DataTypes.StringType, true));
    }

    private Row buildRow(Row row) {Integer uid = row.getInt(0);
        String uname = row.getString(1);
        String age = row.getString(2);
        String sex = row.getString(3);
        String mostlike = row.getString(4);
        String lastview = row.getString(5);
        String totalcost = row.getString(6);
        Boolean _hoodie_is_deleted = row.getBoolean(7);
        String op = row.getString(8);
        Row returnRow = RowFactory.create(uid, uname, age, sex, mostlike, lastview, totalcost, _hoodie_is_deleted, op);
        return returnRow;
    }
}

4.DataSchemaProviderExample

阐明:别离指定 MyJsonKafkaSource 返回的数据格式为 source schema,TransformerExample 写入的数据格式为 target schema

以下是源码

package com.huawei.bigdata.hudi.examples;

import org.apache.avro.Schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * 性能形容
 * 提供 sorce 和 target 的 schema
 */
public class DataSchemaProviderExample extends SchemaProvider {public DataSchemaProviderExample(TypedProperties props, JavaSparkContext jssc) {super(props, jssc);
    }
    /**
     * source schema
     *
     * @return Schema
     */
    @Override
    public Schema getSourceSchema() {Schema avroSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"hoodie_source\",\"fields\":[{\"name\":\"uid\",\"type\":\"int\"},{\"name\":\"uname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"sex\",\"type\":\"string\"},{\"name\":\"mostlike\",\"type\":\"string\"},{\"name\":\"lastview\",\"type\":\"string\"},{\"name\":\"totalcost\",\"type\":\"string\"},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\"},{\"name\":\"op\",\"type\":\"string\"}]}");
        return avroSchema;
    }
    /**
     * target schema
     *
     * @return Schema
     */
    @Override
    public Schema getTargetSchema() {Schema avroSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"mytest_record\",\"namespace\":\"hoodie.mytest\",\"fields\":[{\"name\":\"uid\",\"type\":\"int\"},{\"name\":\"uname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"sex\",\"type\":\"string\"},{\"name\":\"mostlike\",\"type\":\"string\"},{\"name\":\"lastview\",\"type\":\"string\"},{\"name\":\"totalcost\",\"type\":\"string\"},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\"},{\"name\":\"op\",\"type\":\"string\"}]}");
        return avroSchema;
    }
}

将工程打包(hudi-security-examples-0.7.0.jar)以及 json 解析包(fastjson-1.2.4.jar)上传至 MRS 客户端

DeltaStreamer 启动命令

登录客户端执行一下命令获取环境变量以及认证

source /opt/hadoopclient/bigdata_env
kinit developuser
source /opt/hadoopclient/Hudi/component_env

DeltaStreamer 启动命令如下:

spark-submit --master yarn-client \
--jars /opt/hudi-demo2/fastjson-1.2.4.jar,/opt/hudi-demo2/hudi-security-examples-0.7.0.jar \
--driver-class-path /opt/hadoopclient/Hudi/hudi/conf:/opt/hadoopclient/Hudi/hudi/lib/*:/opt/hadoopclient/Spark2x/spark/jars/*:/opt/hudi-demo2/hudi-security-examples-0.7.0.jar \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
spark-internal --props file:///opt/hudi-demo2/kafka-source.properties \
--target-base-path /tmp/huditest/delta_demo2 \
--table-type COPY_ON_WRITE  \
--target-table delta_demo2  \
--source-ordering-field uid \
--source-class com.huawei.bigdata.hudi.examples.MyJsonKafkaSource \
--schemaprovider-class com.huawei.bigdata.hudi.examples.DataSchemaProviderExample \
--transformer-class com.huawei.bigdata.hudi.examples.TransformerExample \
--enable-hive-sync --continuous

kafka.properties 配置

// hudi 配置
hoodie.datasource.write.recordkey.field=uid
hoodie.datasource.write.partitionpath.field=
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
hoodie.datasource.write.hive_style_partitioning=true
hoodie.delete.shuffle.parallelism=10
hoodie.upsert.shuffle.parallelism=10
hoodie.bulkinsert.shuffle.parallelism=10
hoodie.insert.shuffle.parallelism=10
hoodie.finalize.write.parallelism=10
hoodie.cleaner.parallelism=10
hoodie.datasource.write.precombine.field=uid
hoodie.base.path = /tmp/huditest/delta_demo2
hoodie.timeline.layout.version = 1

// hive config
hoodie.datasource.hive_sync.table=delta_demo2
hoodie.datasource.hive_sync.partition_fields=
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor
hoodie.datasource.hive_sync.use_jdbc=false

// Kafka Source topic
hoodie.deltastreamer.source.kafka.topic=hudisource
// checkpoint
hoodie.deltastreamer.checkpoint.provider.path=hdfs://hacluster/tmp/delta_demo2/checkpoint/

// Kafka props
bootstrap.servers=172.16.9.117:21005
auto.offset.reset=earliest
group.id=a5
offset.rang.limit=10000

留神:kafka 服务端配置 allow.everyone.if.no.acl.found 为 true

应用 Spark 查问

spark-shell --master yarn

val roViewDF = spark.read.format("org.apache.hudi").load("/tmp/huditest/delta_demo2/*")
roViewDF.createOrReplaceTempView("hudi_ro_table")
spark.sql("select * from  hudi_ro_table").show()

Mysql 减少操作对应 spark 中 hudi 表查问后果:

Mysql 更新操作对应 spark 中 hudi 表查问后果:

删除操作:

应用 Hive 查问

beeline

select * from delta_demo2;

Mysql 减少操作对应 hive 表中查问后果:

Mysql 更新操作对应 hive 表中查问后果:

Mysql 删除操作对应 hive 表中查问后果:

本文由华为云公布

退出移动版