关于程序员:华为云MRS基于Hudi和HetuEngine构建实时数据湖最佳实践

35次阅读

共计 7947 个字符,预计需要花费 20 分钟才能阅读完成。

数据湖与实时数据湖是什么?

各个行业企业都在构建企业级数据湖,将企业内多种格局数据源汇聚的大数据平台,通过严格的数据权限和资源管控,将数据和算力凋谢给各种使用者。一份数据反对多种剖析,是数据湖最大的特点。如果数据湖的数据,从数据源产生后,能够在 1 分钟以内实时进入到数据湖存储,反对各种交互式剖析,这种数据湖通常叫做实时数据湖,如果能够做到 15 分钟之内,也可称为准实时数据湖。构建实时数据湖,正在成为 5G 和 IOT 时代,撑持各个企业实时剖析业务的数据湖新指标。

华为 MRS 实时数据湖计划介绍

  1. 生产库数据通过 CDC 工具(debezium)实时录入到 MRS 集群中 Kafka 的指定 topic 里;
  2. 在 MRS 集群启动一个 SparkStreaming 工作,实时读取 Kafka 指定 topic 里的数据;
  3. 同时该 SparkStreaming 工作将读取到的数据进行解析解决并写入到一张 hudi 表中;
  4. 写入 hudi 表的同时能够指定该数据也写入 hive 表;
  5. 通过 MRS 提供的交互式查问引擎 HetuEngine 对数据进行疾速的交互式查问。

应用华为 MRS 实时数据湖计划的劣势:

  1. ACID 事务能力得以保障,湖内一份数据满足所有的剖析业务需要,缩小数据搬迁,缩小数据冗余;
  2. 数据一致性保障,保障增量数据与入湖后数据一致性检测;
  3. 数据加工流转,在一个存储层内闭环,数据流动更高效;
  4. 基于 HetuEngine 引擎实现交互式查问,性能不升高。

上面会针对计划的三个要害组件:CDC 工具,数据存储引擎 Hudi, 交互式查问引擎 HetuEngine 进行具体的介绍

样例数据简介

生产库 MySQL 原始数据 (前 10 条,共 1000 条):

CDC 工具

简介

CDC(changed data capture)为动态数据抓取,常见的形式分为同步和异步。同步 CDC 次要是采纳触发器记录新增数据,根本可能做到实时增量抽取。而异步 CDC 则是通过剖析曾经 commit 的日志记录来失去增量数据信息。常见的 CDC 工具有 Canal, DataBus, Maxwell, Debezium, OGG 等。本计划采纳 debezium 作为 CDC 工具

对接步骤

具体参考:https://fusioninsight.github….

实现对接后,针对 MySQL 生产库别离做增、改、删除操作对应的 kafka 音讯

减少操作: insert into hudi.hudisource values (1001,“蒋语堂”,38,“女”,“图”,”《星球大战》”,28732);

对应 kafka 音讯体:

更改操作:UPDATE hudi.hudisource SET uname=‘Anne Marie’WHERE uid=1001;

对应 kafka 音讯体:

删除操作:delete from hudi.hudisource where uid=1001;

对应 kafka 音讯体:

Hudi

简介

Apache Hudi 是一个 Data Lakes 的开源计划,Hudi 是 Hadoop Updates and Incrementals 的简写。具备以下的个性

  • ACID 事务能力,反对实时入湖和批量入湖。
  • 多种视图能力(读优化视图 / 增量视图 / 实时视图),反对疾速数据分析。
  • MVCC 设计,反对数据版本回溯。
  • 主动治理文件大小和布局,以优化查问性能准实时摄取,为查问提供最新数据。
  • 反对并发读写,基于 snapshot 的隔离机制实现写入时可读取。
  • 反对原地转表,将存量的历史表转换为 Hudi 数据集。

样例代码解析

应用 Hudi 实时入湖的样例代码分三个局部

  • Kafka 数据生产
  • 数据内容解析、解决
  • 解析后数据的写入

Kafka 数据生产局部样例代码:

String savePath = "hdfs://hacluster/huditest2/";
String groupId = "group1";
System.out.println("groupID is:" + groupId);
String brokerList = "172.16.5.51:21005";
System.out.println("brokerList is:" + brokerList);
String topic = "hudisource";
System.out.println("topic is:" + topic);
String interval = "5";

HashMap<String, Object> kafkaParam = new HashMap<>();
kafkaParam.put("bootstrap.servers", brokerList);
kafkaParam.put("group.id", groupId);
kafkaParam.put("auto.offset.reset", "earliest");
kafkaParam.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParam.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

HashSet<String> topics = new HashSet<>();
topics.add(topic);

String[] topicArray = {topic};
Set<String> topicSet = new HashSet<String>(Arrays.asList(topicArray));
ConsumerStrategy consumerStrategy = ConsumerStrategies.Subscribe(topicSet, kafkaParam);

// 本地调试
SparkConf conf = new SparkConf()
        .setMaster("local[*]")
        .setAppName("hudi-java-demo");

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.streaming.kafka.maxRatePerPartition", "10");
conf.set("spark.streaming.backpressure.enabled", "true");

JavaInputDStream<ConsumerRecord<String, String>> directStream = KafkaUtils.createDirectStream(jssc,
        LocationStrategies.PreferConsistent(),
        consumerStrategy);

数据内容解析、解决局部样例代码:

JavaDStream<List> lines =
        directStream.filter(
                // 过滤空行和脏数据
                new Function<ConsumerRecord<String, String>, Boolean>() {public Boolean call(ConsumerRecord<String, String> v1) throws Exception {if (v1.value() == null) {return false;}
                        try{String op = debeziumJsonParser.getOP(v1.value());
                        }catch (Exception e){return false;}
                        return true;
                    }
                }
        ).map(new Function<ConsumerRecord<String, String>, List>() {public List call(ConsumerRecord<String, String> v1) throws Exception {
                        // 将 debezium 接进来的数据解析写进 List                       
                        String op = debeziumJsonParser.getOP(v1.value());
                        JSONObject json_obj = JSON.parseObject(v1.value());                     
                        Boolean is_delete = false;
                        String out_str = "";
                        if(op.equals("c")){out_str =  json_obj.getJSONObject("payload").get("after").toString();}
                        else if(op.equals("u")){out_str =   json_obj.getJSONObject("payload").get("after").toString();}
                        else {
                            is_delete = true;
                            out_str =   json_obj.getJSONObject("payload").get("before").toString();}
                        LinkedHashMap<String, String> jsonMap = JSON.parseObject(out_str, new TypeReference<LinkedHashMap<String, String>>() {});
                        int cnt =0;
                        List out_list = new ArrayList();
                        for (Map.Entry<String, String> entry : jsonMap.entrySet()) {out_list.add(entry.getValue());
                            cnt++;
                        }
                        out_list.add(is_delete);
                        String commitTime = Long.toString(System.currentTimeMillis());

                        out_list.add(commitTime);
                        System.out.println(out_list);

                        out_list.add(op);

                        return out_list;
                    }
                });

debezium 更新字段解析样例代码:

public class debeziumJsonParser {public static String getOP(String message){JSONObject json_obj = JSON.parseObject(message);
        String op = json_obj.getJSONObject("payload").get("op").toString();
        return  op;
    }
}

解析后数据的写入 hudi 表,hive 表样例代码:

lines.foreachRDD(new VoidFunction<JavaRDD<List>>() {
    @Override
    public void call(JavaRDD<List> stringJavaRDD) throws Exception {if (!stringJavaRDD.isEmpty()) {System.out.println("stringJavaRDD collect---"+stringJavaRDD.collect());
            List<Row> rowList =new ArrayList<>();
            // 把数据上一步数据写进 stringJavaRdd
            for(List row: stringJavaRDD.collect()){String uid = row.get(0).toString();
                String name = row.get(1).toString();
                String age = row.get(2).toString();
                String sex = row.get(3).toString();
                String mostlike = row.get(4).toString();
                String lastview = row.get(5).toString();
                String totalcost = row.get(6).toString();
                Boolean _hoodie_is_deleted = Boolean.valueOf(row.get(7).toString());      
                String commitTime = row.get(8).toString();
                String op = row.get(9).toString();
                Row returnRow = RowFactory.create(uid, name, age, sex, mostlike, lastview, totalcost, _hoodie_is_deleted, commitTime, op);
                rowList.add(returnRow);

            }
            JavaRDD<Row> stringJavaRdd = jsc.parallelize(rowList);
            // 写入表的字段 schema 设计
            List<StructField> fields = new ArrayList<>();
            fields.add(DataTypes.createStructField("uid", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("age", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("sex", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("mostlike", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("lastview", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("totalcost", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("_hoodie_is_deleted", DataTypes.BooleanType, true));
            fields.add(DataTypes.createStructField("commitTime", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("op", DataTypes.StringType, true));
            StructType schema = DataTypes.createStructType(fields);
            Dataset<Row> dataFrame = sqlContext.createDataFrame(stringJavaRdd, schema);
            Dataset<Row> rowDataset = dataFrame.withColumn("ts", dataFrame.col("commitTime"))
                    .withColumn("uuid", dataFrame.col("uid"));        

            // 将数据写入 hudi 表以及 hive 表
            rowDataset.write().format("org.apache.hudi")
                    .option("PRECOMBINE_FIELD_OPT_KEY", "ts")
                    .option("RECORDKEY_FIELD_OPT_KEY", "uuid")
                    .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator")                  
                    .option("hoodie.table.name", "huditesttable")
                    .option("hoodie.upsert.shuffle.parallelism", "10")                  
                    .option("hoodie.delete.shuffle.parallelism", "10")
                    .option("hoodie.insert.shuffle.parallelism", "10")
                    .option("hoodie.bulkinsert.shuffle.parallelism", "10")
                    .option("hoodie.finalize.write.parallelism", "10")
                    .option("hoodie.cleaner.parallelism", "10")
                    .option("hoodie.datasource.write.operation", "upsert")                  
                    .option("hoodie.datasource.hive_sync.enable", "true")                    
                    .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.NonPartitionedExtractor")
                    .option("hoodie.datasource.hive_sync.database", "default")
                    .option("hoodie.datasource.hive_sync.table", "hudidebezium")
                    .option("hoodie.datasource.hive_sync.use_jdbc", "false")
                    .mode(SaveMode.Append)
                    .save(savePath);
        }
    }
});

jssc.start();
jssc.awaitTermination();
jssc.close();

Hudi 工作提交命令

source /opt/client/bigdata_env
source /opt/client/Hudi/component_env
spark-submit --master yarn --deploy-mode client --jars /opt/hudi-demo4/fastjson-1.2.4.jar --class hudiIn /opt/hudi-demo4/HudiJavaDemo-1.0-SNAPSHOT.jar

HetuEngine

简介

HetuEngine 是华为 FusionInsight MRS 提供的高性能分布式 SQL 查问、数据虚拟化引擎。能与大数据生态无缝交融,实现海量数据秒级查问;反对多源异构协同,提供数据湖内一站式 SQL 交融剖析。

同时 HetuEngine 领有凋谢的接口,可能反对各报表、剖析软件对接,具体可参考生态地图:https://fusioninsight.github….

上面咱们以帆软 FineBI 为例进行查问、剖析。

配置 FineBI 对接 HetuEngine

JDBC URL: jdbc:presto://172.16.5.51:29860,172.16.5.52:29860/hive/default?serviceDiscoveryMode=hsbroker&tenant=default

查看初始同步数据:

通过 HetuEngine 查看增、改、删除操作

Mysql 减少操作对应 hive 表后果:

Mysql 更改操作对应 hive 表后果:

Mysql 删除操作对应 hive 表后果:

报表:

电影青睐度剖析:

电影标签青睐度剖析:

本文由华为云公布

正文完
 0