关于es6:使用-Apache-Flink-开发实时ETL

41次阅读

共计 6724 个字符,预计需要花费 17 分钟才能阅读完成。

Apache Flink 是大数据畛域又一新兴框架。它与 Spark 的不同之处在于,它是应用流式解决来模仿批量解决的,因而可能提供亚秒级的、合乎 Exactly-once 语义的实时处理能力。Flink 的应用场景之一是构建实时的数据通道,在不同的存储之间搬运和转换数据。本文将介绍如何应用 Flink 开发实时 ETL 程序,并介绍 Flink 是如何保障其 Exactly-once 语义的。

让咱们来编写一个从 Kafka 抽取数据到 HDFS 的程序。数据源是一组事件日志,其中蕴含了事件产生的工夫,以工夫戳的形式存储。咱们须要将这些日志按事件工夫别离寄存到不同的目录中,即按日分桶。工夫日志示例如下:

{“timestamp”:1545184226.432,”event”:”page_view”,”uuid”:”ac0e50bf-944c-4e2f-bbf5-a34b22718e0c”}
{“timestamp”:1545184602.640,”event”:”adv_click”,”uuid”:”9b220808-2193-44d1-a0e9-09b9743dec55″}
{“timestamp”:1545184608.969,”event”:”thumbs_up”,”uuid”:”b44c3137-4c91-4f36-96fb-80f56561c914″}

产生的目录构造为:
/user/flink/event_log/dt=20181219/part-0-1
/user/flink/event_log/dt=20181220/part-1-9

创立我的项目
Flink 应用程序须要应用 Java 8 编写,咱们能够应用 Maven 模板创立我的项目:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.7.0

将生成好的代码导入到 IDE 中,能够看到名为 StreamingJob 的文件,咱们由此开始编写程序。
Kafka 数据源
Flink 对 Kafka 数据源提供了原生反对,咱们须要抉择正确的 Kafka 依赖版本,将其增加到 POM 文件中:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

测试过程中,咱们须要一个可能运行的 Kafka 服务,读者能够参照官网文档 搭建本地服务。在 Flink 中初始化 Kafka 数据源时,传入服务器名和主题名就能够了:
Properties props = new Properties();
props.setProperty(“bootstrap.servers”, “localhost:9092”);
FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(

"flink_test", new SimpleStringSchema(), props);    

DataStream<String> stream = env.addSource(consumer);

Flink 会连贯本地的 Kafka 服务,读取 flink_test 主题中的数据,转换成字符串后返回。除了 SimpleStringSchema,Flink 还提供了其余内置的反序列化形式,如 JSON、Avro 等,咱们也能够编写自定义逻辑。

流式文件存储
StreamingFileSink 代替了先前的 BucketingSink,用来将上游数据存储到 HDFS 的不同目录中。它的外围逻辑是分桶,默认的分桶形式是 DateTimeBucketAssigner,即依照解决工夫分桶。解决工夫指的是音讯达到 Flink 程序的工夫,这点并不合乎咱们的需要。因而,咱们须要本人编写代码将事件工夫从音讯体中解析进去,按规定生成分桶的名称:

public class EventTimeBucketAssigner implements BucketAssigner<String, String> {
@Override
public String getBucketId(String element, Context context) {

JsonNode node = mapper.readTree(element);    
long date = (long) (node.path("timestamp").floatValue() * 1000);    
String partitionValue = new SimpleDateFormat("yyyyMMdd").format(new Date(date));    
return "dt=" + partitionValue;    

}
}

上述代码会应用 Jackson 库对音讯体进行解析,将工夫戳转换成日期字符串,增加前缀后返回。如此一来,StreamingFileSink 就能晓得应该将以后记录搁置到哪个目录中了。
StreamingFileSink<String> sink = StreamingFileSink

.forRowFormat(new Path("/tmp/kafka-loader"), new SimpleStringEncoder<String>())    
.withBucketAssigner(new EventTimeBucketAssigner())    
.build();    

stream.addSink(sink);

forRowFormat 示意输入的文件是按行存储的,对应的有 forBulkFormat,能够将输入后果用 Parquet 等格局进行压缩存储。

对于 StreamingFileSink 还有一点要留神,它只反对 Hadoop 2.7 以上的版本,因为须要用到高版本文件系统提供的 truncate 办法来实现故障复原,这点下文会详述。
开启检查点
代码编写到这里,其实曾经能够通过 env.execute() 来运行了。然而,它只能保障 At-least-once 语义,即音讯有可能会被反复解决。要做到 Exactly-once,咱们还须要开启 Flink 的检查点性能:
env.enableCheckpointing(60_000);
env.setStateBackend((StateBackend) new FsStateBackend(“/tmp/flink/checkpoints”));
env.getCheckpointConfig().enableExternalizedCheckpoints(

ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

检查点(Checkpoint)是 Flink 的故障复原机制,同样会在下文详述。代码中,咱们将状态存储形式由 MemoryStateBackend 批改为了 FsStateBackend,即应用内部文件系统,如 HDFS,来保留应用程序的中间状态,这样当 Flink JobManager 宕机时,也能够恢复过来。Flink 还反对 RocksDBStateBackend,用来寄存较大的中间状态,并能反对增量的状态更新。

提交与治理脚本
Flink 程序能够间接在 IDE 中调试。咱们也能够搭建一个本地的 Flink 游戏 集群,并通过 Flink CLI 命令行工具来提交脚本:
bin/flink run -c com.shzhangji.flinksandbox.kafka.KafkaLoader target/flink-sandbox-0.1.0.jar

脚本的运行状态能够在 Flink 仪表盘中查看:
640?wx_fmt=png

应用暂存点来进行和复原脚本
当须要暂停脚本、或对程序逻辑进行批改时,咱们须要用到 Flink 的暂存点机制(Savepoint)。暂存点和检查点相似,同样保留的是 Flink 各个算子的状态数据(Operator State)。不同的是,暂存点次要用于人为的脚本更替,而检查点则次要由 Flink 管制,用来实现故障复原。flink cancel -s 命令能够在进行脚本的同时创立一个暂存点:
$ bin/flink cancel -s /tmp/flink/savepoints 1253cc85e5c702dbe963dd7d8d279038
Cancelled job 1253cc85e5c702dbe963dd7d8d279038. Savepoint stored in file:/tmp/flink/savepoints/savepoint-1253cc-0df030f4f2ee.

在 YARN 上运行
要将脚本提交到 YARN 集群上运行,同样是应用 flink run 命令。首先将代码中指定文件目录的局部增加上 HDFS 前缀,如 hdfs://localhost:9000/,从新打包后执行下列命令:
$ export HADOOP_CONF_DIR=/path/to/hadoop/conf
$ bin/flink run -m yarn-cluster -c com.shzhangji.flinksandbox.kafka.KafkaLoader target/flink-sandbox-0.1.0.jar
Submitted application application_1545534487726_0001

Flink 仪表盘会在 YARN Application Master 中运行,咱们能够通过 ResourceManager 界面进入。返回的利用 ID 能够用来治理脚本,增加 -yid 参数即可:
bin/flink cancel -s hdfs://localhost:9000/tmp/flink/savepoints -yid application_1545534487726_0001 84de00a5e193f26c937f72a9dc97f386

Flink 如何保障 Exactly-once 语义
Flink 实时处理程序能够分为三个局部,数据源、解决流程、以及输入。不同的数据源和输入提供了不同的语义保障,Flink 统称为 连接器。解决流程则能提供 Exactly-once 或 At-least-once 语义,须要看检查点是否开启。

实时处理与检查点
Flink 的检查点机制是基于 Chandy-Lamport 算法的:Flink 会定时在数据流中安插轻量的标记信息(Barrier),将音讯流切割成一组组记录;当某个算子解决完一组记录后,就将以后状态保留为一个检查点,提交给 JobManager,该组的标记信息也会传递给上游;当末端的算子(通常是 www.sangpi.comSink)解决完这组记录并提交检查点后,这个检查点将被标记为“已实现”;当脚本呈现问题时,就会从最初一个“已实现”的检查点开始重放记录。

如果算子有多个上游,Flink 会应用一种称为“音讯对齐”的机制:如果某个上游呈现提早,以后算子会进行从其它上游生产音讯,直到提早的上游赶上进度,这样就保障了算子中的状态不会蕴含下一批次的记录。显然,这种形式会引入额定的提早,因而除了这种 EXACTLY_ONCE 模式,咱们也可将检查点配置为 AT_LEAST_ONCE,以取得更高的吞吐量。具体形式请参考 官网文档。

可重放的数据源
当出错的脚本须要从上一个检查点复原时,Flink 必须对数据进行重放,这就要求数据源反对这一性能。Kafka 是目前应用得较多的音讯队列,且反对从特定位点进行生产。具体来说,FlinkKafkaConsumer 类实现了 CheckpointedFunction 接口,会在查看点中寄存主题名、分区名、以及偏移量:

abstract class FlinkKafkaConsumerBase implements CheckpointedFunction {
public void initializeState(FunctionInitializationContext context) {

OperatorStateStore stateStore = context.getOperatorStateStore();    
this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(    
    OFFSETS_STATE_NAME,    
    TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));    


if (context.isRestored()) {for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {restoredState.put(kafkaOffset.f0, kafkaOffset.f1);    
  }    
}    

}

public void snapshotState(FunctionSnapshotContext context) {

unionOffsetStates.clear();    
for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {unionOffsetStates.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(),    
      kafkaTopicPartitionLongEntry.getValue()));    
}    

}
}

当数据源算子从检查点或暂存点复原时,咱们能够在 TaskManager 的日志中看到以下信息,表明以后生产的偏移量是从算子状态中复原进去的:
2018-12-23 10:56:47,380 INFO FlinkKafkaConsumerBase Consumer subtask 0 will start reading 2 partitions with offsets in restored state: {KafkaTopicPartition{topic=’flink_test’, partition=1}=725, KafkaTopicPartition{topic=’flink_test’, partition=0}=721}
1.

复原写入中的文件
程序运行过程中,StreamingFileSink 首先会将后果写入两头文件,以 . 结尾、in-progress 结尾。这些两头文件会在合乎肯定条件后更名为正式文件,取决于用户配置的 RollingPolicy,默认策略是基于工夫(60 秒)和基于大小(128 MB)。当脚本出错或重启时,两头文件会被间接敞开;在复原时,因为查看点中保留了两头文件名和胜利写入的长度,程序会从新关上这些文件,切割到指定长度(Truncate),而后持续写入。这样一来,文件中就不会蕴含检查点之后的记录了,从而实现 Exactly-once。

以 Hadoop 文件系统举例,复原的过程是在 HadoopRecoverableFsDataOutputStream 类的构造函数中进行的。它会接管一个 HadoopFsRecoverable 类型的构造,外面蕴含了两头文件的门路和长度。这个对象是 BucketState 的成员,会被保留在查看点中。
HadoopRecoverableFsDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable) {
this.tempFile = checkNotNull(recoverable.tempFile());
truncate(fs, tempFile, recoverable.offset());
out = fs.append(tempFile);
}

论断

Apache Flink 构建在实时处理之上,从设计之初就充分考虑了中间状态的保留,而且可能很好地与现有 Hadoop 生态环境联合,因此在大数据畛域十分有竞争力。它还在高速倒退之中,近期也引入了 Table API、流式 SQL、机器学习等性能,像阿里巴巴这样的公司也在大量应用和奉献代码。Flink 的利用场景泛滥,有很大的发展潜力,值得一试。

正文完
 0