Problem
When Flink is deployed on K8s, the checkpoint directory has to be configured against a specific NameNode host of the Hadoop cluster, e.g. `env.setStateBackend(new FsStateBackend("hdfs://prod02:3333/user/flink-checkpoints"));`. If it is instead configured in nameservice form, e.g. `env.setStateBackend(new FsStateBackend("hdfs://hztest/user/flink-checkpoints"));`, the job fails at startup with an error.
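For the nameservice URI to resolve at all, the HDFS client needs the HA settings normally found in hdfs-site.xml. A minimal sketch of those properties for the `hztest` nameservice follows; the NameNode hostnames `nn1-host`/`nn2-host` and the port are placeholders, not values from the original setup:

```xml
<!-- hdfs-site.xml: client-side HA settings for nameservice "hztest" -->
<!-- nn1-host/nn2-host are hypothetical placeholders -->
<property>
  <name>dfs.nameservices</name>
  <value>hztest</value>
</property>
<property>
  <name>dfs.ha.namenodes.hztest</name>
  <value>nn1,nn2</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.hztest.nn1</name>
  <value>nn1-host:8020</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.hztest.nn2</name>
  <value>nn2-host:8020</value>
</property>
<property>
  <name>dfs.client.failover.proxy.provider.hztest</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
```

Without these properties on the client side, `hdfs://hztest/...` cannot be resolved to an active NameNode, which is why loading the Hadoop configuration becomes the focus of the fix below.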
Troubleshooting Steps
- Hypothesis: Flink fails to load the Hadoop configuration files at startup. Currently the Hadoop configuration files are packaged inside the Flink job jar, on the classpath, so we tried loading external Hadoop configuration files instead.
- Copy the two Hadoop configuration files (typically core-site.xml and hdfs-site.xml) into the hadoopconf directory under the deployment directory.
- Modify the Dockerfile to add the Hadoop conf files to the image:

```dockerfile
FROM registry.cn-shanghai.aliyuncs.com/meow-cn/flink:1.13
COPY flink-hudi.jar /opt/flink-hudi.jar
COPY hadoopconf/* /opt/hadoopconf/
```
- Modify deployment.yaml to add the `HADOOP_CONF_DIR` environment variable pointing at the Hadoop configuration files:

```yaml
- name: flink-main-container
  imagePullPolicy: Always
  env:
    - name: HADOOP_CONF_DIR
      value: /opt/hadoopconf
```
- Flink then started normally, but checkpoints never executed:

```
2023-03-10 18:48:20,786 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Failed to trigger checkpoint for job 3110562c050f0943caca089038709804 since Checkpoint triggering task Source: Custom Source -> DataSteamToTable(stream=default_catalog.default_database.kafka_source, type=STRING, rowtime=false, watermark=false) -> Calc(select=[CAST(UUID()) AS uuid, f0 AS message, CAST(DATE_FORMAT(NOW(), _UTF-16LE'yyyy-MM-dd hh:mm:ss')) AS event_time, CAST(DATE_FORMAT(NOW(), _UTF-16LE'yyyyMMdd')) AS ds]) -> row_data_to_hoodie_record (1/2) of job 3110562c050f0943caca089038709804 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.
```
- Checking the pod list showed that the TaskManager pod belonging to the Flink pod could not start. Inspecting events with `kt get event | grep flink-hudi-event-log-taskmanager` revealed the TaskManager startup error:

```
MountVolume.SetUp failed for volume "hadoop-config-volume" : configmap "hadoop-config-flink-hudi-event-log" not found
```
- The TaskManager pods are not started from the same image as the Flink job pod, so they share neither its environment variables nor its files. According to the error message, the TaskManager loads the Hadoop configuration files from a ConfigMap, so create it:

```shell
kt create configmap hadoop-config-flink-hudi-event-log --from-file=/opt/flink-on-ack/flink-hudi/docker/hadoopconf/
```
- After restarting the pods, the problem was resolved.
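Equivalently, the ConfigMap can be kept declarative and applied alongside deployment.yaml. A sketch of what the TaskManager expects, assuming the two files are core-site.xml and hdfs-site.xml (their XML bodies are elided here):

```yaml
# Sketch: declarative form of the ConfigMap created above;
# file contents are elided placeholders.
apiVersion: v1
kind: ConfigMap
metadata:
  name: hadoop-config-flink-hudi-event-log
data:
  core-site.xml: |
    <!-- contents of core-site.xml -->
  hdfs-site.xml: |
    <!-- contents of hdfs-site.xml -->
```

Keeping the ConfigMap in a manifest avoids the imperative `create configmap` step getting lost when the cluster is rebuilt.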