Give us a ⭐️ Star · Light the way for open source

GitHub: https://github.com/apache/incubator-seatunnel

Table of Contents

This article is reposted from Adobee Chen's blog on CSDN. See if anything in it interests you!

If anything is wrong, corrections are welcome.

1. Startup Script Analysis

2. Source Code Analysis

01 Entry Point

02 The core execute() method

  1. BaseSource, BaseTransform, and BaseSink are all interfaces that extend the Plugin interface; their implementation classes are the concrete plugin types.
  2. execute() moves on and creates an execution environment.
  3. plugin.prepare(env) is called.
  4. Then execution.start(sources, transforms, sinks) runs the Flink program.
  5. Finally, everything is closed.

1. Startup Script Analysis

In /bin/start-seatunnel-flink.sh:

```bash
#!/bin/bash

function usage() {
  echo "Usage: start-seatunnel-flink.sh [options]"
  echo "  options:"
  echo "    --config, -c FILE_PATH        Config file"
  echo "    --variable, -i PROP=VALUE     Variable substitution, such as -i city=beijing, or -i date=20190318"
  echo "    --check, -t                   Check config"
  echo "    --help, -h                    Show this help message"
}

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]] || [[ $# -le 1 ]]; then
  usage
  exit 0
fi

is_exist() {
    if [ -z $1 ]; then
      usage
      exit -1
    fi
}

PARAMS=""
while (( "$#" )); do
  case "$1" in
    -c|--config)
      CONFIG_FILE=$2
      is_exist ${CONFIG_FILE}
      shift 2
      ;;

    -i|--variable)
      variable=$2
      is_exist ${variable}
      java_property_value="-D${variable}"
      variables_substitution="${java_property_value} ${variables_substitution}"
      shift 2
      ;;

    *) # preserve positional arguments
      PARAMS="$PARAMS $1"
      shift
      ;;

  esac
done

if [ -z ${CONFIG_FILE} ]; then
  echo "Error: The following option is required: [-c | --config]"
  usage
  exit -1
fi

# set positional arguments in their proper place
eval set -- "$PARAMS"

BIN_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
APP_DIR=$(dirname ${BIN_DIR})
CONF_DIR=${APP_DIR}/config
PLUGINS_DIR=${APP_DIR}/lib
DEFAULT_CONFIG=${CONF_DIR}/application.conf
CONFIG_FILE=${CONFIG_FILE:-$DEFAULT_CONFIG}

assemblyJarName=$(find ${PLUGINS_DIR} -name seatunnel-core-flink*.jar)

if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
    source ${CONF_DIR}/seatunnel-env.sh
fi

string_trim() {
    echo $1 | awk '{$1=$1;print}'
}

export JVM_ARGS=$(string_trim "${variables_substitution}")

exec ${FLINK_HOME}/bin/flink run \
    ${PARAMS} \
    -c org.apache.seatunnel.SeatunnelFlink \
    ${assemblyJarName} --config ${CONFIG_FILE}
```

The options the startup script accepts are --config, --variable, --check (not yet supported), and --help.

Anything that is not a config or variable argument is collected into PARAMS; at the end the flink run command is executed, with PARAMS passed through as Flink arguments, as the example below shows.
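For example, a hypothetical invocation (paths and flags are illustrative):

```bash
# --config and -i are consumed by the script itself; -m yarn-cluster is not
# recognized, so it is preserved in PARAMS and forwarded to `flink run`.
./bin/start-seatunnel-flink.sh \
    --config ./config/flink.streaming.conf \
    -i date=20190318 \
    -m yarn-cluster
```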

The class org.apache.seatunnel.SeatunnelFlink is the main entry point.

2. Source Code Analysis

01 Entry Point

```java
public class SeatunnelFlink {

    public static void main(String[] args) throws Exception {
        FlinkCommandArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
        Seatunnel.run(flinkArgs);
    }

}
```

The command-line arguments are parsed into FlinkCommandArgs:

```java
public static FlinkCommandArgs parseFlinkArgs(String[] args) {
    FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
    JCommander.newBuilder()
        .addObject(flinkCommandArgs)
        .build()
        .parse(args);
    return flinkCommandArgs;
}
```
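FlinkCommandArgs itself is just a JCommander argument holder. A minimal sketch of how such a class might bind the flags seen in the startup script (an illustrative field set, not the actual class):

```java
import com.beust.jcommander.Parameter;

// Illustrative JCommander argument holder; field names are assumptions
// based on the getters used later (getConfigFile, isCheckConfig).
public class FlinkCommandArgs extends CommandArgs {

    @Parameter(names = {"-c", "--config"}, description = "config file", required = true)
    private String configFile;

    @Parameter(names = {"-t", "--check"}, description = "check config")
    private boolean checkConfig = false;

    public String getConfigFile() {
        return configFile;
    }

    public boolean isCheckConfig() {
        return checkConfig;
    }
}
```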

Next, step into Seatunnel.run(flinkArgs).

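A minimal sketch of run(), assuming it does no more than build the engine-specific Command (via the CommandFactory shown next) and execute it:

```java
// Minimal sketch of Seatunnel.run(), assuming it simply delegates:
public static <T extends CommandArgs> void run(T commandArgs) throws Exception {
    // picks FlinkConfValidateCommand / FlinkTaskExecuteCommand / ... by engine type
    Command<T> command = CommandFactory.createCommand(commandArgs);
    command.execute(commandArgs);
}
```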

Step into CommandFactory.createCommand(commandArgs).

A Command is chosen based on the engine type.

Here we follow the Flink command path.

```java
public static <T extends CommandArgs> Command<T> createCommand(T commandArgs) {
    switch (commandArgs.getEngineType()) {
        case FLINK:
            return (Command<T>) new FlinkCommandBuilder().buildCommand((FlinkCommandArgs) commandArgs);
        case SPARK:
            return (Command<T>) new SparkCommandBuilder().buildCommand((SparkCommandArgs) commandArgs);
        default:
            throw new RuntimeException(String.format("engine type: %s is not supported", commandArgs.getEngineType()));
    }
}
```

Step into buildCommand.

Depending on whether the config is to be checked, a different implementation class is chosen.

```java
public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs commandArgs) {
    return commandArgs.isCheckConfig() ? new FlinkConfValidateCommand() : new FlinkTaskExecuteCommand();
}
```

FlinkConfValidateCommand and FlinkTaskExecuteCommand both implement the Command interface, and each has a single execute() method:

```java
public class FlinkConfValidateCommand implements Command<FlinkCommandArgs>

public class FlinkTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkCommandArgs, FlinkEnvironment>
```
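For reference, a minimal sketch of the Command contract both classes fulfill (simplified; generics and declared exceptions may differ in the real interface):

```java
// Simplified sketch of the Command contract:
public interface Command<T extends CommandArgs> {
    void execute(T commandArgs);
}
```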

From Seatunnel.run(flinkArgs) we arrive at:

command.execute(commandArgs);

Let's look first at the execute() method of FlinkTaskExecuteCommand.

02 The core execute() method

```java
public void execute(FlinkCommandArgs flinkCommandArgs) {
    // engine type: flink
    EngineType engine = flinkCommandArgs.getEngineType();
    // the --config file path
    String configFile = flinkCommandArgs.getConfigFile();
    // turn the config file (String path) into a Config object
    Config config = new ConfigBuilder<>(configFile, engine).getConfig();
    // build the execution context
    ExecutionContext executionContext = new ExecutionContext<>(config, engine);
    // parse the source section
    List<BaseSource> sources = executionContext.getSources();
    // parse the transform section
    List<BaseTransform> transforms = executionContext.getTransforms();
    // parse the sink section
    List<BaseSink> sinks = executionContext.getSinks();

    baseCheckConfig(sources, transforms, sinks);
    showAsciiLogo();

    try (Execution<BaseSource,
            BaseTransform,
            BaseSink,
            FlinkEnvironment> execution = new ExecutionFactory<>(executionContext).createExecution()) {
        // prepare
        prepare(executionContext.getEnvironment(), sources, transforms, sinks);
        // start
        execution.start(sources, transforms, sinks);
        // close
        close(sources, transforms, sinks);
    } catch (Exception e) {
        throw new RuntimeException("Execute Flink task error", e);
    }
}
```

1. BaseSource, BaseTransform, and BaseSink are all interfaces, and all extend the Plugin interface. Their implementation classes are the concrete plugin types.

If our source and sink are Kafka, then the source is KafkaTableStream and the sink is KafkaSink.
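A rough sketch of that hierarchy, trimmed to the methods this walkthrough actually exercises (prepare from step 3, getConfig from start(), and AutoCloseable from close()):

```java
// Rough sketch of the plugin hierarchy (trimmed to what this post uses):
public interface Plugin<E> extends AutoCloseable {
    void prepare(E env);   // step 3: read the plugin's config section
    Config getConfig();    // used by fromSourceTable(...) in start()
}

public interface BaseSource<E> extends Plugin<E> { }      // e.g. KafkaTableStream
public interface BaseTransform<E> extends Plugin<E> { }
public interface BaseSink<E> extends Plugin<E> { }        // e.g. KafkaSink
```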

2. execute() moves on and creates an execution environment.

Step into createExecution() in ExecutionFactory:

```java
public Execution<BaseSource, BaseTransform, BaseSink, ENVIRONMENT> createExecution() {
    Execution execution = null;
    switch (executionContext.getEngine()) {
        case SPARK:
            SparkEnvironment sparkEnvironment = (SparkEnvironment) executionContext.getEnvironment();
            switch (executionContext.getJobMode()) {
                case STREAMING:
                    execution = new SparkStreamingExecution(sparkEnvironment);
                    break;
                case STRUCTURED_STREAMING:
                    execution = new StructuredStreamingExecution(sparkEnvironment);
                    break;
                default:
                    execution = new SparkBatchExecution(sparkEnvironment);
            }
            break;
        case FLINK:
            FlinkEnvironment flinkEnvironment = (FlinkEnvironment) executionContext.getEnvironment();
            switch (executionContext.getJobMode()) {
                case STREAMING:
                    execution = new FlinkStreamExecution(flinkEnvironment);
                    break;
                default:
                    execution = new FlinkBatchExecution(flinkEnvironment);
            }
            break;
        default:
            throw new IllegalArgumentException("No suitable engine");
    }
    LOGGER.info("current execution is [{}]", execution.getClass().getName());
    return (Execution<BaseSource, BaseTransform, BaseSink, ENVIRONMENT>) execution;
}
```

Step into FlinkStreamExecution; you can see that this is where the Flink execution environment is ultimately set up.

```java
private final FlinkEnvironment flinkEnvironment;

public FlinkStreamExecution(FlinkEnvironment streamEnvironment) {
    this.flinkEnvironment = streamEnvironment;
}
```
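FlinkStreamExecution itself only holds a reference; the actual Flink context lives in FlinkEnvironment. Presumably the streaming setup inside it looks roughly like this (a sketch, not the real class):

```java
// Hypothetical sketch of what FlinkEnvironment creates for streaming jobs:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
```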

3. plugin.prepare(env) is called

```java
protected final void prepare(E env, List<? extends Plugin<E>>... plugins) {
    for (List<? extends Plugin<E>> pluginList : plugins) {
        pluginList.forEach(plugin -> plugin.prepare(env));
    }
}
```

For example, Kafka -> Kafka:

KafkaTableStream.prepare():

```java
public void prepare(FlinkEnvironment env) {
    topic = config.getString(TOPICS);
    PropertiesUtil.setProperties(config, kafkaParams, consumerPrefix, false);
    tableName = config.getString(RESULT_TABLE_NAME);
    if (config.hasPath(ROWTIME_FIELD)) {
        rowTimeField = config.getString(ROWTIME_FIELD);
        if (config.hasPath(WATERMARK_VAL)) {
            watermark = config.getLong(WATERMARK_VAL);
        }
    }
    String schemaContent = config.getString(SCHEMA);
    format = FormatType.from(config.getString(SOURCE_FORMAT).trim().toLowerCase());
    schemaInfo = JSONObject.parse(schemaContent, Feature.OrderedField);
}
```

KafkaSink.prepare():

```java
public void prepare(FlinkEnvironment env) {
    topic = config.getString("topics");
    if (config.hasPath("semantic")) {
        semantic = config.getString("semantic");
    }
    String producerPrefix = "producer.";
    PropertiesUtil.setProperties(config, kafkaParams, producerPrefix, false);
    kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    kafkaParams.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
}
```
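Putting the two prepare() methods together, a Kafka-to-Kafka job config would look roughly like this. The key names are inferred from the constants and literals above, and `format.type` is an assumption for SOURCE_FORMAT; values are placeholders:

```hocon
# Illustrative kafka -> kafka config; key names partly assumed
source {
  KafkaTableStream {
    topics = "input_topic"
    consumer.bootstrap.servers = "localhost:9092"   # copied into kafkaParams via the consumer prefix
    consumer.group.id = "seatunnel"
    result_table_name = "kafka_source"              # RESULT_TABLE_NAME
    schema = "{\"id\":\"string\",\"msg\":\"string\"}"  # SCHEMA, parsed as ordered JSON
    format.type = "json"                            # assumed key for SOURCE_FORMAT
  }
}

sink {
  KafkaSink {
    topics = "output_topic"
    semantic = "exactly_once"
    producer.bootstrap.servers = "localhost:9092"   # copied into kafkaParams via "producer."
  }
}
```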

4. Start: execution.start(sources, transforms, sinks)

From step 2 we already know that execution is created differently per engine; for our Kafka streaming job it is FlinkStreamExecution, so we find the start() method in FlinkStreamExecution.

5. Run the Flink program

Here source.getData is the getData method in KafkaTableStream, and the sink side is the outputStream method in KafkaSink.

```java
public void start(List<FlinkStreamSource> sources, List<FlinkStreamTransform> transforms, List<FlinkStreamSink> sinks) throws Exception {
    List<DataStream> data = new ArrayList<>();

    for (FlinkStreamSource source : sources) {
        DataStream dataStream = source.getData(flinkEnvironment);
        data.add(dataStream);
        registerResultTable(source, dataStream);
    }

    DataStream input = data.get(0);

    for (FlinkStreamTransform transform : transforms) {
        DataStream stream = fromSourceTable(transform.getConfig()).orElse(input);
        input = transform.processStream(flinkEnvironment, stream);
        registerResultTable(transform, input);
        transform.registerFunction(flinkEnvironment);
    }

    for (FlinkStreamSink sink : sinks) {
        DataStream stream = fromSourceTable(sink.getConfig()).orElse(input);
        sink.outputStream(flinkEnvironment, stream);
    }

    try {
        LOGGER.info("Flink Execution Plan:{}", flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
        flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName());
    } catch (Exception e) {
        LOGGER.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName());
        throw e;
    }
}
```
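In short: each source's DataStream is registered as a table (presumably under the result_table_name set in prepare()); each transform and sink then either looks up its configured input table via fromSourceTable or falls back to the previous stream in the chain; finally, execute() submits the assembled job to Flink.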

6. Finally, close

```java
protected final void close(List<? extends Plugin>... plugins) {
    PluginClosedException exceptionHolder = null;
    for (List<? extends Plugin> pluginList : plugins) {
        for (Plugin plugin : pluginList) {
            try (Plugin<?> closed = plugin) {
                // ignore
            } catch (Exception e) {
                exceptionHolder = exceptionHolder == null ?
                        new PluginClosedException("below plugins closed error:") : exceptionHolder;
                exceptionHolder.addSuppressed(new PluginClosedException(
                        String.format("plugin %s closed error", plugin.getClass()), e));
            }
        }
    }
    if (exceptionHolder != null) {
        throw exceptionHolder;
    }
}
```
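Note the try-with-resources trick here: assigning each plugin to the resource variable makes the JVM call plugin.close() automatically when the empty block exits, while any failures are accumulated as suppressed exceptions on a single PluginClosedException that is thrown at the end.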

Apache SeaTunnel

Come grow together with the community!

Apache SeaTunnel (Incubating) is a distributed, high-performance, easily extensible data integration platform for synchronizing and transforming massive amounts of data, both offline and in real time.

Repository: https://github.com/apache/incubator-seatunnel

Website: https://seatunnel.apache.org/

**Proposal:** https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelProposal

**Apache SeaTunnel (Incubating) 2.1.0 download:** https://seatunnel.apache.org/download

We warmly welcome more people to join!

We believe that, guided by The Apache Way of "Community Over Code", "Open and Cooperation", "Meritocracy", and diversity with consensus decision making, we will build a more diverse and inclusive community ecosystem and advance the technical progress that the open-source spirit brings!

We sincerely invite everyone committed to making home-grown open source thrive globally to join the SeaTunnel contributor family and build open source together!

Submit issues and suggestions: https://github.com/apache/incubator-seatunnel/issues

Contribute code: https://github.com/apache/incubator-seatunnel/pulls

Subscribe to the community development mailing list: dev-subscribe@seatunnel.apache.org

Development mailing list: dev@seatunnel.apache.org

Join Slack: https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1cmonqu2q-ljomD6bY1PQ~oOzfbxxXWQ

Follow on Twitter: https://twitter.com/ASFSeaTunnel
