关于kafka:Kafka实时数据即席查询应用与实践

9次阅读

共计 9858 个字符,预计需要花费 25 分钟才能阅读完成。

作者:vivo 互联网搜寻团队 - Deng Jie

Kafka 中的实时数据是以 Topic 的概念进行分类存储,而 Topic 的数据是有肯定时效性的,比方保留 24 小时、36 小时、48 小时等。而在定位一些实时数据的 Case 时,如果没有对实时数据进行历史归档,在排查问题时,没有日志追述,会很难定位是哪个环节的问题。

一、背景

Kafka 中的实时数据是以 Topic 的概念进行分类存储,而 Topic 的数据是有肯定时效性的,比方保留 24 小时、36 小时、48 小时等。而在定位一些实时数据的 Case 时,如果没有对实时数据进行历史归档,在排查问题时,没有日志追述,会很难定位是哪个环节的问题。因而,咱们须要对解决的这些实时数据进行记录归档并存储。

二、内容

2.1 案例剖析

这里以 i 视频和 vivo 短视频实时数据为例,之前存在这样的合作问题:

数据上游内容方提供实时 Topic(寄存 i 视频和 vivo 短视频相干实时数据),数据侧对实时数据进行逻辑解决后,发送给上游工程去建库实时索引,当工作执行一段时间后,工程侧建索引偶然会提出数据没有发送过来的 Case,后期因为没有对数据做存储,在定位问题的时候会比拟麻烦,常常需要查看实时日志,须要破费很长的工夫来剖析这些 Case 是呈现在哪个环节。

为了解决这个问题,咱们能够将实时 Topic 中的数据,在发送给其余 Topic 的时候,增加跟踪机制,进行数据分流,Sink 到存储介质(比方 HDFS、Hive 等)。这里,咱们抉择应用 Hive 来进行存储,次要是查问不便,反对 SQL 来疾速查问。如下图所示:

在实现优化后的计划时,有两种形式能够实现跟踪机制,它们别离是 Flink SQL 写 Hive、Flink DataStream 写 Hive。接下来,别离对这两种实现计划进行介绍和实际。

2.2 计划一:Flink SQL 写 Hive

这种形式比拟间接,能够在 Flink 工作外面间接操作实时 Topic 数据后,将生产后的数据进行分流跟踪,作为日志记录写入到 Hive 表中,具体实现步骤如下:

  • 结构 Hive Catalog;
  • 创立 Hive 表;
  • 写入实时数据到 Hive 表。

2.2.1 结构 Hive Catalog

在结构 Hive Catalog 时,须要初始化 Hive 的相干信息,局部代码片段如下所示:

// 设置执行环境
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
 StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,settings);
 
 // 结构 Hive Catalog 名称
 String name = "video-hive-catalog";
 // 初始化数据库名
 String defaultDatabase = "comsearch";
 // Hive 配置文件门路地址
 String hiveConfDir = "/appcom/hive/conf";
 // Hive 版本号
 String version = "3.1.2";
 
 // 实例化一个 HiveCatalog 对象
 HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
 // 注册 HiveCatalog
 tEnv.registerCatalog(name, hive);
 // 设定以后 HiveCatalog
 tEnv.useCatalog(name);
 // 设置执行 SQL 为 Hive
 tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
 // 应用数据库
 tEnv.useDatabase("db1");

在以上代码中,咱们首先设置了 Flink 的执行环境和表环境,而后创立了一个 HiveCatalog,并将其注册到表环境中。

2.2.2 创立 Hive 表

如果 Hive 表不存在,能够通过在程序中执行建表语句,具体 SQL 见表语句代码如下所示:

-- 创立表语句 
tEnv.executeSql("CREATE TABLE IF NOT EXISTS TABLE `xxx_table`(
  `content_id` string,
  `status` int)
PARTITIONED BY (
  `dt` string,
  `h` string,
  `m` string)
stored as ORC
TBLPROPERTIES (
  'auto-compaction'='true',
  'sink.partition-commit.policy.kind'='metastore,success-file',
  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00'
)")

在创立 Hive 表时咱们应用了 IF NOT EXISTS 关键字,如果 Hive 中该表不存在会主动在 Hive 上创立,也能够提前在 Hive 中创立好该表,Flink SQL 中就无需再执行建表 SQL,因为用了 Hive 的 Catalog,Flink SQL 运行时会找到表。这里,咱们设置了 auto-compaction 属性为 true,用来使小文件主动合并,1.12 版的新个性,解决了实时写 Hive 产生的小文件问题。同时,指定 metastore 值是专门用于写入 Hive 的,也须要指定 success-file 值,这样 CheckPoint 触发完数据写入磁盘后会创立_SUCCESS 文件以及 Hive metastore 上创立元数据,这样 Hive 才可能对这些写入的数据可查。

2.2.3 写入实时数据到 Hive 表

在筹备实现 2.2.1 和 2.2.2 中的步骤后,接下来就能够在 Flink 工作中通过 SQL 来对实时数据进行操作了,具体实现代码片段如下所示:

// 编写业务 SQL
 String insertSql = "insert into  xxx_table SELECT content_id, status," +
                    "DATE_FORMAT(ts,'yyyy-MM-dd'), DATE_FORMAT(ts,'HH'), DATE_FORMAT(ts,'mm') FROM xxx_rt";
 // 执行 Hive SQL
 tEnv.executeSql(insertSql);
 // 执行工作
 env.execute();

将生产后的数据进行分类,编写业务 SQL 语句,将生产的数据作为日志记录,发送到 Hive 表进行存储,这样 Kafka 中的实时数据就存储到 Hive 了,方便使用 Hive 来对 Kafka 数据进行即席剖析。

2.2.4 避坑技巧

应用这种形式在解决的过程中,如果配置应用的是 EventTime,在程序中配置 ’sink.partition-commit.trigger’=’partition-time’,最初会呈现无奈提交分区的状况。通过对源代码 PartitionTimeCommitTigger 的剖析,找到了呈现这种异常情况的起因。

咱们能够通过看

org.apache.flink.table.filesystem.stream.PartitionTimeCommitTigger#committablePartitionsorg.apache.flink.table.filesystem.stream.PartitionTimeCommitTigger#committablePartitions

中的一个函数,来阐明具体的问题,局部源代码片段如下:

// PartitionTimeCommitTigger 源代码函数代码片段
@Override
public List<String> committablePartitions(long checkpointId) {if (!watermarks.containsKey(checkpointId)) {
  throw new IllegalArgumentException(String.format("Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
    checkpointId, watermarks));
 }
 
 long watermark = watermarks.get(checkpointId);
 watermarks.headMap(checkpointId, true).clear();
 
 List<String> needCommit = new ArrayList<>();
 Iterator<String> iter = pendingPartitions.iterator();
 while (iter.hasNext()) {String partition = iter.next();
  // 通过分区的值来获取分区的工夫
  LocalDateTime partTime = extractor.extract(partitionKeys, extractPartitionValues(new Path(partition)));
  // 判断水印是否大于分区创立工夫 + 延迟时间
  if (watermark > toMills(partTime) + commitDelay) {needCommit.add(partition);
   iter.remove();}
 }
 return needCommit;
}

通过剖析上述代码片段,咱们能够晓得零碎通过分区值来抽取相应的分区来创立工夫,而后进行比对,比方咱们设置的工夫 pattern 是 ‘$dt $h:$m:00’ , 某一时刻咱们正在往 /2022-02-26/18/20/ 这个分区下写数据,那么程序依据分区值,失去的 pattern 将会是 2022-02-26 18:20:00,这个值在 SQL 中是依据 DATA_FORMAT 函数获取的。

而这个值是带有时区的,比方咱们的时区设置为东八区,2022-02-26 18:20:00 这个工夫是东八区的工夫,换成规范 UTC 工夫是减去 8 个小时,也就是 2022-02-26 10:20:00,而在源代码中的 toMills 函数在解决这个东八区的工夫时,并没有对时区进行解决,把这个其实应该是东八区的工夫当做了 UTC 工夫来解决,这样计算出来的值就比理论值大 8 小时,导致始终没有触发分区的提交。

如果咱们在数据源中结构的分区是 UTC 工夫,也就是不带分区的工夫,那么这个逻辑就是没有问题的,然而这样又不合乎咱们的理论状况,比方对于分区 2022-02-26 18:20:00,我心愿我的分区必定是东八区的工夫,而不是比东八区小 8 个小时的 UTC 工夫 2022-02-26 10:20:00。

在明确了起因之后,咱们就能够针对上述异常情况进行优化咱们的实现计划,比方自定义一个分区类、或者批改缺省的工夫分区类。比方,咱们应用 TimeZoneTableFunction 类来实现一个自定义时区,局部参考代码片段如下:

public class CustomTimeZoneTableFunction implements TimeZoneTableFunction {
 
  private transient DateTimeFormatter formatter;
  private String timeZoneId;
 
  public CustomTimeZoneTableFunction(String timeZoneId) {this.timeZoneId = timeZoneId;}
 
  @Override
  public void open(FunctionContext context) throws Exception {
    // 初始化 DateTimeFormatter 对象
    formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:00");
    formatter = formatter.withZone(ZoneId.of(timeZoneId));
  }
 
  @Override
  public void eval(Long timestamp, Collector<TimestampWithTimeZone> out) {
    // 将工夫戳转换为 LocalDateTime 对象
    LocalDateTime localDateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneOffset.UTC);
    // 将 LocalDateTime 对象转换为指定时区下的 LocalDateTime 对象
    LocalDateTime targetDateTime = localDateTime.atZone(ZoneId.of(timeZoneId)).toLocalDateTime();
    // 将 LocalDateTime 对象转换为 TimestampWithTimeZone 对象,并输入到上游
    out.collect(TimestampWithTimeZone.fromLocalDateTime(targetDateTime, ZoneId.of(timeZoneId)));
  }
}

2.3 计划二:Flink DataStream 写 Hive

在一些非凡的场景下,Flink SQL 如果无奈实现咱们简单的业务需要,那么咱们能够思考应用 Flink DataStream 写 Hive 这种实现计划。比方如下业务场景,当初须要实现这样一个业务需要,内容方将实时数据写入到 Kafka 音讯队列中,而后由数据侧通过 Flink 工作生产内容方提供的数据源,接着对生产的数据进行分流解决(这里的步骤和 Flink SQL 写 Hive 的步骤相似),每分钟进行存储到 HDFS(MapReduce 工作须要计算和重跑 HDFS 数据),而后通过 MapReduce 工作将 HDFS 上的这些日志数据生成 Hive 所须要格局,最初将这些 Hive 格局数据文件加载到 Hive 表中。实现 Kafka 数据到 Hive 的即席剖析性能,具体实现流程细节如下图所示:

具体外围实现步骤如下:

  • 生产内容方 Topic 实时数据;
  • 生成数据预处理策略;
  • 加载数据;
  • 应用 Hive SQL 对 Kafka 数据进行即席剖析。

2.3.1 生产内容方 Topic 实时数据

编写生产 Topic 的 Flink 代码,这里不对 Topic 中的数据做逻辑解决,在前面对立交给 MapReduce 来做数据预处理,间接生产并存储到 HDFS 上。具体实现代码如下所示:


public class Kafka2Hdfs {public static void main(String[] args) {
        // 判断参数是否无效
        if (args.length != 3) {LOG.error("kafka(server01:9092), hdfs(hdfs://cluster01/data/), flink(parallelism=2) must be exist.");
            return;
        }
        // 初始化 Kafka 连贯地址和 HDFS 存储地址以及 Flink 并行度
        String bootStrapServer = args[0];
        String hdfsPath = args[1];
        int parallelism = Integer.parseInt(args[2]);
 
        // 实例化一个 Flink 工作对象
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.setParallelism(parallelism);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
        // Flink 生产 Topic 中的数据
        DataStream<String> transction = env.addSource(new FlinkKafkaConsumer010<>("test_bll_topic", new SimpleStringSchema(), configByKafkaServer(bootStrapServer)));
 
        // 实例化一个 HDFS 存储对象
        BucketingSink<String> sink = new BucketingSink<>(hdfsPath);
 
        // 自定义存储到 HDFS 上的文件名,用小时和分钟来命名,不便前面算策略
        sink.setBucketer(new DateTimeBucketer<String>("HH-mm"));
 
        // 设置存储 HDFS 的文件大小和存储文件工夫频率
        sink.setBatchSize(1024 * 1024 * 4);
        sink.setBatchRolloverInterval(1000 * 30);
        transction.addSink(sink);
 
        env.execute("Kafka2Hdfs");
    }
 
    // 初始化 Kafka 对象连接信息
    private static Object configByKafkaServer(String bootStrapServer) {Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootStrapServer);
        props.setProperty("group.id", "test_bll_group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
 
}

注意事项:

  • 这里咱们把工夫窗口设置小一些,每 30s 做一次 Checkpoint,如果该批次的工夫窗口没有数据过去,就生成一个文件落地到 HDFS 上;
  • 另外,咱们重写了 Bucketer 为 DateTimeBucketer,逻辑并不简单,在原有的办法上加一个年 - 月 - 日 / 时 - 分的文件生成门路,例如在 HDFS 上的生成门路:xxxx/2022-02-26/00-00。

具体 DateTimeBucketer 实现代码如下所示:

public class DateMinuteBucketer implements Bucketer<String> {private SimpleDateFormat baseFormatDay = new SimpleDateFormat("yyyy-MM-dd");
    private SimpleDateFormat baseFormatMin = new SimpleDateFormat("HH-mm");
 
    @Override
    public Path getBucketPath(Clock clock, Path basePath, String element) {return new Path(basePath + "/" + baseFormatDay.format(new Date()) + "/" + baseFormatMin.format(new Date()));
    }
}

2.3.2 生成数据预处理策略

这里,咱们须要对落地到 HDFS 上的文件进行预处理,解决的逻辑是这样的。比方,当初是 2022-02-26 14:00,那么咱们须要将当天的 13:55,13:56,13:57,13:58,13:59 这最近 5 分钟的数据处理到一起,并加载到 Hive 的最近 5 分钟的一个分区外面去。那么,咱们须要生成这样一个逻辑策略汇合,用 HH-mm 作为 key,与之最近的 5 个文件作为 value,进行数据预处理合并。具体实现代码步骤如下:

  • 步骤一:获取小时循环策略;
  • 步骤二:获取分钟循环策略;
  • 步骤三:判断是否为 5 分钟的倍数;
  • 步骤四:对分钟级别小于 10 的数字做 0 补齐(比方 9 补齐后变成 09);
  • 步骤五:对小时级别小于 10 的数字做 0 补齐(比方 1 补齐后变成 01);
  • 步骤六:生成工夫范畴;
  • 步骤七:输入后果。

其中,次要的逻辑是在生成工夫范畴的过程中,依据小时和分钟数的不同状况,生成不同的工夫范畴,并输入后果。在生成工夫范畴时,须要留神前导 0 的解决,以及非凡状况(如小时为 0、分钟为 0 等)的解决。最初,将生成的工夫范畴输入即可。

根据上述步骤编写对应的实现代码,生成当天所有日期命名规定,预览局部后果如下:

须要留神的是,如果产生了第二天 00:00,那么咱们须要用到前一天的 00-00=>23-59,23-58,23-57,23-56,23-55 这 5 个文件中的数据来做预处理。

2.3.3 加载数据

在实现 2.3.1 和 2.3.2 外面的内容后,接下来,咱们能够应用 Hive 的 load 命令间接加载 HDFS 上预处理后的文件,把数据加载到对应的 Hive 表中,具体实现命令如下:

-- 加载数据到 Hive 表
load data inpath '<hdfs_path_hfile>' overwrite into table xxx.table partition(day='2022-02-26',hour='14',min='05')

2.3.4 即席剖析

之后,咱们应用 Hive SQL 来对 Kafka 数据进行即席剖析,示例 SQL 如下所示:

-- 查问某 5 分钟分区数据
select * from xxx.table where day='2022-02-26' and hour='14' and min='05'

2.4 Flink SQL 与 Flink DataStream 如何抉择

Flink SQL 和 Flink DataStream 都是 Flink 中用于解决数据的外围组件,咱们能够依据本人理论的业务场景来抉择应用哪一种组件。

Flink SQL 是一种基于 SQL 语言的数据处理引擎,它能够将 SQL 查问语句转换为 Flink 的数据流处理程序。相比于 Flink DataStream,Flink SQL 更加易于应用和保护,同时具备更快的开发速度和更高的代码复用性。Flink SQL 实用于须要疾速开发和部署数据处理工作的场景,比方数据仓库、实时报表、数据荡涤等。

Flink DataStream API 是 Flink 数据流解决规范 API,SQL 是 Flink 前期版本提供的新的数据处理操作接口。SQL 的引入为进步了 Flink 应用的灵活性。能够认为 Flink SQL 是一种通过字符串来定义数据流解决逻辑的描述语言。

因而,在抉择 Flink SQL 和 Flink DataStream 时,须要依据具体的业务需要和数据处理工作的特点来进行抉择。如果须要疾速开发和部署工作,能够抉择应用 Flink SQL;如果须要进行更为深刻和定制化的数据处理操作,能够抉择应用 Flink DataStream。同时,也能够依据理论状况,联合应用 Flink SQL 和 Flink DataStream 来实现简单的数据处理工作。

三、总结

在理论利用中,Kafka 实时数据即席查问能够用于多种场景,如实时监控、实时报警、实时统计、实时剖析等。具体利用和实际中,须要留神以下几点:

  • 数据品质:Kafka 实时数据即席查问须要保证数据品质,防止数据反复、失落或谬误等问题,须要进行数据品质监控和调优。
  • 零碎复杂性:Kafka 实时数据即席查问须要波及到多个零碎和组件,包含 Kafka、数据处理引擎(比方 Flink)、查问引擎(比方 Hive)等,须要对系统进行配置和治理,减少了零碎的复杂性。
  • 安全性:Kafka 实时数据即席查问须要增强数据安全性保障,防止数据泄露或数据篡改等平安问题,做好 Hive 的权限管控。
  • 性能优化:Kafka 实时数据即席查问须要对系统进行性能优化,包含优化数据处理引擎、查问引擎等,进步零碎的性能和效率。

参考:

  1. https://github.com/apache/flink
  2. https://flink.apache.org/
正文完
 0