Data Synchronization from Startup to Shutdown: A SeaTunnel 2.1.1 Source Code Walkthrough


Light up a ⭐️ Star and light the way for open source

GitHub:https://github.com/apache/incubator-seatunnel

Table of Contents

This article is reprinted from Adobee Chen's blog on CSDN. Take a look and see whether it interests you!

If you spot any mistakes, corrections are welcome.

I. Startup Script Analysis

II. Source Code Analysis

01 Entry point

02 The core execute() method

  1. BaseSource, BaseTransform, and BaseSink are all interfaces that extend the Plugin interface; their implementation classes are the concrete plugin types
  2. execute() moves on to create an execution environment
  3. plugin.prepare(env) is called
  4. execution.start(sources, transforms, sinks) is launched
  5. The Flink program is executed
  6. Finally, everything is closed

I. Startup Script Analysis

The startup script lives at /bin/start-seatunnel-flink.sh:

#!/bin/bash

function usage() {
  echo "Usage: start-seatunnel-flink.sh [options]"
  echo "options:"
  echo "--config, -c FILE_PATH        Config file"
  echo "--variable, -i PROP=VALUE     Variable substitution, such as -i city=beijing, or -i date=20190318"
  echo "--check, -t                   Check config"
  echo "--help, -h                    Show this help message"
}

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]] || [[ $# -le 1 ]]; then
  usage
  exit 0
fi

is_exist() {
    if [ -z "$1" ]; then
      usage
      exit -1
    fi
}

PARAMS=""
while (( "$#" )); do
  case "$1" in
    -c|--config)
      CONFIG_FILE=$2
      is_exist ${CONFIG_FILE}
      shift 2
      ;;

    -i|--variable)
      variable=$2
      is_exist ${variable}
      java_property_value="-D${variable}"
      variables_substitution="${java_property_value} ${variables_substitution}"
      shift 2
      ;;

    *) # preserve positional arguments
      PARAMS="$PARAMS $1"
      shift
      ;;

  esac
done

if [ -z ${CONFIG_FILE} ]; then
  echo "Error: The following option is required: [-c | --config]"
  usage
  exit -1
fi

# set positional arguments in their proper place
eval set -- "$PARAMS"

BIN_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
APP_DIR=$(dirname ${BIN_DIR})
CONF_DIR=${APP_DIR}/config
PLUGINS_DIR=${APP_DIR}/lib
DEFAULT_CONFIG=${CONF_DIR}/application.conf
CONFIG_FILE=${CONFIG_FILE:-$DEFAULT_CONFIG}

assemblyJarName=$(find ${PLUGINS_DIR} -name seatunnel-core-flink*.jar)

if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
    source ${CONF_DIR}/seatunnel-env.sh
fi

string_trim() {
    echo $1 | awk '{$1=$1;print}'
}

export JVM_ARGS=$(string_trim "${variables_substitution}")


exec ${FLINK_HOME}/bin/flink run \
    ${PARAMS} \
    -c org.apache.seatunnel.SeatunnelFlink \
    ${assemblyJarName} --config ${CONFIG_FILE}

The startup script accepts --config, --variable, --check (not supported yet), and --help.

Any argument that is not a config or variable option is collected into PARAMS, and at the end the flink run command is executed with PARAMS passed through as Flink arguments. For example, `./bin/start-seatunnel-flink.sh --config config/application.conf -i date=20190318` turns the variable into -Ddate=20190318 and exports it via JVM_ARGS before submitting the job.

The class org.apache.seatunnel.SeatunnelFlink is the main entry point.

II. Source Code Analysis

01 Entry point

public class SeatunnelFlink {

    public static void main(String[] args) throws Exception {
        FlinkCommandArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
        Seatunnel.run(flinkArgs);
    }

}

The command-line arguments are parsed into FlinkCommandArgs:

public static FlinkCommandArgs parseFlinkArgs(String[] args) {
    FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
    JCommander.newBuilder()
        .addObject(flinkCommandArgs)
        .build()
        .parse(args);
    return flinkCommandArgs;
}
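JCommander binds each command-line flag to an annotated field on the object passed to addObject. A rough, illustrative sketch of what FlinkCommandArgs can look like (the field names and defaults here are assumptions, not the verbatim SeaTunnel class):

// Illustrative sketch of FlinkCommandArgs, not verbatim source
import com.beust.jcommander.Parameter;

import java.util.Collections;
import java.util.List;

public class FlinkCommandArgs {

    // -c / --config: path to the job config file
    @Parameter(names = {"-c", "--config"}, description = "config file", required = true)
    private String configFile;

    // -i / --variable: repeatable variable substitution, e.g. -i date=20190318
    @Parameter(names = {"-i", "--variable"}, description = "variable substitution")
    private List<String> variables = Collections.emptyList();

    // -t / --check: only validate the config instead of running the job
    @Parameter(names = {"-t", "--check"}, description = "check config")
    private boolean checkConfig = false;

    public String getConfigFile() {
        return configFile;
    }

    public boolean isCheckConfig() {
        return checkConfig;
    }
}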

Next, step into Seatunnel.run(flinkArgs).

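run() itself is a thin dispatcher. A minimal sketch of what it does (paraphrased, not the verbatim source):

// Simplified sketch of Seatunnel.run, not verbatim source
public static <T extends CommandArgs> void run(T commandArgs) throws Exception {
    // pick the Command implementation for the requested engine (Flink or Spark)
    Command<T> command = CommandFactory.createCommand(commandArgs);
    // execute it with the parsed arguments
    command.execute(commandArgs);
}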

It hands off to CommandFactory.createCommand(commandArgs), which selects a Command based on the engine type. We follow the Flink branch:

public static <T extends CommandArgs> Command<T> createCommand(T commandArgs) {
    switch (commandArgs.getEngineType()) {
        case FLINK:
            return (Command<T>) new FlinkCommandBuilder().buildCommand((FlinkCommandArgs) commandArgs);
        case SPARK:
            return (Command<T>) new SparkCommandBuilder().buildCommand((SparkCommandArgs) commandArgs);
        default:
            throw new RuntimeException(String.format("engine type: %s is not supported", commandArgs.getEngineType()));
    }
}

Inside buildCommand, the check-config flag decides which implementation we get:

public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs commandArgs) {
    return commandArgs.isCheckConfig() ? new FlinkConfValidateCommand() : new FlinkTaskExecuteCommand();
}

FlinkConfValidateCommand and FlinkTaskExecuteCommand both implement the Command interface, and each exposes a single execute() method:

public class FlinkConfValidateCommand implements Command<FlinkCommandArgs>
public class FlinkTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkCommandArgs, FlinkEnvironment>
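Both classes fulfil a contract along these lines (a sketch; the exact signature is assumed from how execute is invoked):

// Sketch of the Command contract, signature assumed, not verbatim source
public interface Command<T extends CommandArgs> {
    // each command does all of its work in this single method
    void execute(T commandArgs) throws Exception;
}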

Back in Seatunnel.run(flinkArgs), command.execute(commandArgs) is invoked.

Let's look first at the execute method of FlinkTaskExecuteCommand.

02 The core execute() method

public void execute(FlinkCommandArgs flinkCommandArgs) {
    // engine type: flink
    EngineType engine = flinkCommandArgs.getEngineType();
    // --config
    String configFile = flinkCommandArgs.getConfigFile();
    // turn the config file path into a Config object
    Config config = new ConfigBuilder<>(configFile, engine).getConfig();
    // build the execution context
    ExecutionContext<FlinkEnvironment> executionContext = new ExecutionContext<>(config, engine);
    // parse the source section
    List<BaseSource> sources = executionContext.getSources();
    // parse the transform section
    List<BaseTransform> transforms = executionContext.getTransforms();
    // parse the sink section
    List<BaseSink> sinks = executionContext.getSinks();

    baseCheckConfig(sources, transforms, sinks);
    showAsciiLogo();

    try (Execution<BaseSource,
            BaseTransform,
            BaseSink,
            FlinkEnvironment> execution = new ExecutionFactory<>(executionContext).createExecution()) {
        // prepare
        prepare(executionContext.getEnvironment(), sources, transforms, sinks);
        // start
        execution.start(sources, transforms, sinks);
        // close
        close(sources, transforms, sinks);
    } catch (Exception e) {
        throw new RuntimeException("Execute Flink task error", e);
    }
}

1. BaseSource, BaseTransform, and BaseSink are all interfaces, and all of them extend the Plugin interface; their implementation classes are the concrete plugin types.

If our source and sink are both Kafka, the corresponding source is KafkaTableStream and the sink is KafkaSink.
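For orientation, here is a simplified sketch of that hierarchy (the method set is an approximation of the 2.1.x API, not verbatim source):

// Simplified sketch of the plugin hierarchy, method set approximate
public interface Plugin<E extends RuntimeEnv> extends AutoCloseable {
    void setConfig(Config config);   // receive this plugin's block of the job config
    Config getConfig();
    CheckResult checkConfig();       // validate the config before the job runs
    void prepare(E env);             // one-time initialization, see step 3 below
    @Override
    default void close() throws Exception {
        // resources are released in step 6 below; plugins override as needed
    }
}

public interface BaseSource<E extends RuntimeEnv> extends Plugin<E> { }

public interface BaseTransform<E extends RuntimeEnv> extends Plugin<E> { }

public interface BaseSink<E extends RuntimeEnv> extends Plugin<E> { }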

2. execute() moves on to create an execution environment.

Step into createExecution() in ExecutionFactory:

public Execution<BaseSource, BaseTransform, BaseSink, ENVIRONMENT> createExecution() {
    Execution execution = null;
    switch (executionContext.getEngine()) {
        case SPARK:
            SparkEnvironment sparkEnvironment = (SparkEnvironment) executionContext.getEnvironment();
            switch (executionContext.getJobMode()) {
                case STREAMING:
                    execution = new SparkStreamingExecution(sparkEnvironment);
                    break;
                case STRUCTURED_STREAMING:
                    execution = new StructuredStreamingExecution(sparkEnvironment);
                    break;
                default:
                    execution = new SparkBatchExecution(sparkEnvironment);
            }
            break;
        case FLINK:
            FlinkEnvironment flinkEnvironment = (FlinkEnvironment) executionContext.getEnvironment();
            switch (executionContext.getJobMode()) {
                case STREAMING:
                    execution = new FlinkStreamExecution(flinkEnvironment);
                    break;
                default:
                    execution = new FlinkBatchExecution(flinkEnvironment);
            }
            break;
        default:
            throw new IllegalArgumentException("No suitable engine");
    }
    LOGGER.info("current execution is [{}]", execution.getClass().getName());
    return (Execution<BaseSource, BaseTransform, BaseSink, ENVIRONMENT>) execution;
}

Stepping into FlinkStreamExecution, you can see that it is ultimately backed by the Flink execution environment:

private final FlinkEnvironment flinkEnvironment;

public FlinkStreamExecution(FlinkEnvironment streamEnvironment) {
    this.flinkEnvironment = streamEnvironment;
}
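Inside FlinkEnvironment, the environment is built with the standard Flink entry points. A purely illustrative sketch (the wrapper's names are assumptions; only the Flink API calls are real):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Illustrative sketch of what a FlinkEnvironment-style wrapper sets up
public class FlinkEnvironmentSketch {

    private StreamExecutionEnvironment environment;
    private StreamTableEnvironment tableEnvironment;

    public void prepare() {
        // standard entry points for the DataStream and Table APIs
        environment = StreamExecutionEnvironment.getExecutionEnvironment();
        tableEnvironment = StreamTableEnvironment.create(environment);
    }

    public StreamExecutionEnvironment getStreamExecutionEnvironment() {
        return environment;
    }
}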

3. Calling plugin.prepare(env)

The base command iterates over every plugin list and prepares each plugin:

protected final void prepare(E env, List<? extends Plugin>... plugins) {
    for (List<? extends Plugin> pluginList : plugins) {
        pluginList.forEach(plugin -> plugin.prepare(env));
    }
}

Take kafka -> kafka as an example.

KafkaTableStream's prepare:

public void prepare(FlinkEnvironment env) {
    topic = config.getString(TOPICS);
    PropertiesUtil.setProperties(config, kafkaParams, consumerPrefix, false);
    tableName = config.getString(RESULT_TABLE_NAME);
    if (config.hasPath(ROWTIME_FIELD)) {
        rowTimeField = config.getString(ROWTIME_FIELD);
        if (config.hasPath(WATERMARK_VAL)) {
            watermark = config.getLong(WATERMARK_VAL);
        }
    }
    String schemaContent = config.getString(SCHEMA);
    format = FormatType.from(config.getString(SOURCE_FORMAT).trim().toLowerCase());
    schemaInfo = JSONObject.parse(schemaContent, Feature.OrderedField);
}

KafkaSink's prepare:

public void prepare(FlinkEnvironment env) {
    topic = config.getString("topics");
    if (config.hasPath("semantic")) {
        semantic = config.getString("semantic");
    }
    String producerPrefix = "producer.";
    PropertiesUtil.setProperties(config, kafkaParams, producerPrefix, false);
    kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    kafkaParams.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
}

4. Starting execution.start(sources, transforms, sinks)

From step 2 we already know that execution is created per engine and job mode; for our Kafka streaming job it is FlinkStreamExecution, so that is where we find the start() method.

5. Executing the Flink program

Here source.getData resolves to the getData method of KafkaTableStream, and the sink side ends in the outputStream method of KafkaSink:

public void start(List<FlinkStreamSource> sources, List<FlinkStreamTransform> transforms, List<FlinkStreamSink> sinks) throws Exception {
    List<DataStream> data = new ArrayList<>();

    for (FlinkStreamSource source : sources) {
        DataStream dataStream = source.getData(flinkEnvironment);
        data.add(dataStream);
        registerResultTable(source, dataStream);
    }

    DataStream input = data.get(0);

    for (FlinkStreamTransform transform : transforms) {
        DataStream stream = fromSourceTable(transform.getConfig()).orElse(input);
        input = transform.processStream(flinkEnvironment, stream);
        registerResultTable(transform, input);
        transform.registerFunction(flinkEnvironment);
    }

    for (FlinkStreamSink sink : sinks) {
        DataStream stream = fromSourceTable(sink.getConfig()).orElse(input);
        sink.outputStream(flinkEnvironment, stream);
    }

    try {
        LOGGER.info("Flink Execution Plan:{}", flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
        flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName());
    } catch (Exception e) {
        LOGGER.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName());
        throw e;
    }
}
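For reference, the stream-mode source and sink contracts exercised above look roughly like this (generics and exact signatures are assumptions, not verbatim source):

// Sketch of the stream-mode plugin contracts used by start() above
public interface FlinkStreamSource<T> extends BaseSource<FlinkEnvironment> {
    // produce the DataStream that feeds the rest of the pipeline
    DataStream<T> getData(FlinkEnvironment env);
}

public interface FlinkStreamSink<IN, OUT> extends BaseSink<FlinkEnvironment> {
    // attach this sink to the incoming DataStream
    DataStreamSink<OUT> outputStream(FlinkEnvironment env, DataStream<IN> dataStream);
}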

6. Finally, shutdown

close() leans on try-with-resources: since Plugin extends AutoCloseable, each plugin is closed in turn, and any failures are collected as suppressed exceptions on a single PluginClosedException:

protected final void close(List<? extends Plugin>... plugins) {
    PluginClosedException exceptionHolder = null;
    for (List<? extends Plugin> pluginList : plugins) {
        for (Plugin plugin : pluginList) {
            try (Plugin<?> closed = plugin) {
                // ignore
            } catch (Exception e) {
                exceptionHolder = exceptionHolder == null ?
                        new PluginClosedException("below plugins closed error:") : exceptionHolder;
                exceptionHolder.addSuppressed(new PluginClosedException(String.format("plugin %s closed error", plugin.getClass()), e));
            }
        }
    }
    if (exceptionHolder != null) {
        throw exceptionHolder;
    }
}

Apache SeaTunnel

Come and grow together with the community!

Apache SeaTunnel (Incubating) is a distributed, high-performance, easily extensible data integration platform for the synchronization and transformation of massive data, both offline and real-time.

Repository: https://github.com/apache/incubator-seatunnel

Website: https://seatunnel.apache.org/

**Proposal:** https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelProposal

**Apache SeaTunnel (Incubating) 2.1.0 download:** https://seatunnel.apache.org/download

We warmly welcome more people to join!

We believe that under the guidance of The Apache Way, with "Community Over Code", "Open and Cooperation", "Meritocracy", and diversity with consensus decision-making, we will grow a more diverse and inclusive community ecosystem and realize the technical progress the open source spirit brings!

We sincerely invite everyone committed to taking homegrown open source global to join the SeaTunnel contributor family and build open source together!

Submit issues and suggestions: https://github.com/apache/incubator-seatunnel/issues

Contribute code: https://github.com/apache/incubator-seatunnel/pulls

Subscribe to the community dev mailing list: dev-subscribe@seatunnel.apache.org

Dev mailing list: dev@seatunnel.apache.org

Join Slack: https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1cmonqu2q-ljomD6bY1PQ~oOzfbxxXWQ

Follow on Twitter: https://twitter.com/ASFSeaTunnel

< 🐬🐬 >
