1、start-cluster.sh

# 1. Resolve the directory this script lives in
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

# 2. Load config.sh first; it defines shared functions, variables, and some common logic
. "$bin"/config.sh

# 3. Start the JobManager
# Start the JobManager instance(s)
shopt -s nocasematch

# 4. Decide how the JobManager starts (HA cluster vs. single node) by reading high-availability.type from flink-conf.yaml
if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
    # HA Mode
    # 5. In ZooKeeper-based HA mode, call readMasters
    readMasters

    echo "Starting HA cluster with ${#MASTERS[@]} masters."

    for ((i=0;i<${#MASTERS[@]};++i)); do
        master=${MASTERS[i]}
        webuiport=${WEBUIPORTS[i]}

        # 6. If all masters are localhost, start locally
        if [ ${MASTERS_ALL_LOCALHOST} = true ] ; then
            "${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}"
        else
            # 7. Otherwise start remotely in the background over SSH
            ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start ${master} ${webuiport} &"
        fi
    done
else
    echo "Starting cluster."

    # Start single JobManager on this machine
    # 8. Start the JobManager on the current node
    "$FLINK_BIN_DIR"/jobmanager.sh start
fi

shopt -u nocasematch

# Start TaskManager instance(s)
# 9. Start the TaskManagers
TMWorkers start
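To see why the HA check is wrapped in shopt -s nocasematch, here is a tiny self-contained sketch (the config value is a made-up example): the option makes [[ ... == ... ]] pattern matching case-insensitive, so a capitalized value in flink-conf.yaml still takes the HA branch.

#!/usr/bin/env bash
# Hypothetical value as it might appear in flink-conf.yaml
HIGH_AVAILABILITY="ZooKeeper"

shopt -s nocasematch
if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
    echo "HA branch taken (match is case-insensitive)"
fi
shopt -u nocasematch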

2、Loading config.sh

# Construct the Flink classpath
constructFlinkClassPath() {
    local FLINK_DIST
    local FLINK_CLASSPATH

    while read -d '' -r jarfile ; do
        if [[ "$jarfile" =~ .*/flink-dist[^/]*.jar$ ]]; then
            FLINK_DIST="$FLINK_DIST":"$jarfile"
        elif [[ "$FLINK_CLASSPATH" == "" ]]; then
            FLINK_CLASSPATH="$jarfile";
        else
            FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile"
        fi
    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0 | sort -z)

    local FLINK_DIST_COUNT
    FLINK_DIST_COUNT="$(echo "$FLINK_DIST" | tr -s ':' '\n' | grep -v '^$' | wc -l)"

    # If flink-dist*.jar cannot be resolved write error messages to stderr since stdout is stored
    # as the classpath and exit function with empty classpath to force process failure
    if [[ "$FLINK_DIST" == "" ]]; then
        (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")
        exit 1
    elif [[ "$FLINK_DIST_COUNT" -gt 1 ]]; then
        (>&2 echo "[ERROR] Multiple flink-dist*.jar found in $FLINK_LIB_DIR. Please resolve.")
        exit 1
    fi

    echo "$FLINK_CLASSPATH""$FLINK_DIST"
}

findFlinkDistJar() {
    local FLINK_DIST
    FLINK_DIST="$(find "$FLINK_LIB_DIR" -name 'flink-dist*.jar')"
    local FLINK_DIST_COUNT
    FLINK_DIST_COUNT="$(echo "$FLINK_DIST" | wc -l)"

    # If flink-dist*.jar cannot be resolved write error messages to stderr since stdout is stored
    # as the classpath and exit function with empty classpath to force process failure
    if [[ "$FLINK_DIST" == "" ]]; then
        (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")
        exit 1
    elif [[ "$FLINK_DIST_COUNT" -gt 1 ]]; then
        (>&2 echo "[ERROR] Multiple flink-dist*.jar found in $FLINK_LIB_DIR. Please resolve.")
        exit 1
    fi

    echo "$FLINK_DIST"
}

findSqlGatewayJar() {
    local SQL_GATEWAY
    SQL_GATEWAY="$(find "$FLINK_OPT_DIR" -name 'flink-sql-gateway*.jar')"
    local SQL_GATEWAY_COUNT
    SQL_GATEWAY_COUNT="$(echo "$SQL_GATEWAY" | wc -l)"

    # If flink-sql-gateway*.jar cannot be resolved write error messages to stderr since stdout is stored
    # as the classpath and exit function with empty classpath to force process failure
    if [[ "$SQL_GATEWAY" == "" ]]; then
        (>&2 echo "[ERROR] Flink sql gateway jar not found in $FLINK_OPT_DIR.")
        exit 1
    elif [[ "$SQL_GATEWAY_COUNT" -gt 1 ]]; then
        (>&2 echo "[ERROR] Multiple flink-sql-gateway*.jar found in $FLINK_OPT_DIR. Please resolve.")
        exit 1
    fi

    echo "$SQL_GATEWAY"
}

findFlinkPythonJar() {
    local FLINK_PYTHON
    FLINK_PYTHON="$(find "$FLINK_OPT_DIR" -name 'flink-python*.jar')"
    local FLINK_PYTHON_COUNT
    FLINK_PYTHON_COUNT="$(echo "$FLINK_PYTHON" | wc -l)"

    # If flink-python*.jar cannot be resolved write error messages to stderr since stdout is stored
    # as the classpath and exit function with empty classpath to force process failure
    if [[ "$FLINK_PYTHON" == "" ]]; then
        echo "[WARN] Flink python jar not found in $FLINK_OPT_DIR."
    elif [[ "$FLINK_PYTHON_COUNT" -gt 1 ]]; then
        (>&2 echo "[ERROR] Multiple flink-python*.jar found in $FLINK_OPT_DIR. Please resolve.")
        exit 1
    fi

    echo "$FLINK_PYTHON"
}

# These are used to mangle paths that are passed to java when using
# cygwin. Cygwin paths are like linux paths, i.e. /path/to/somewhere
# but the windows java version expects them in Windows Format, i.e. C:\bla\blub.
# "cygpath" can do the conversion.
manglePath() {
    UNAME=$(uname -s)
    if [ "${UNAME:0:6}" == "CYGWIN" ]; then
        echo `cygpath -w "$1"`
    else
        echo $1
    fi
}

manglePathList() {
    UNAME=$(uname -s)
    # a path list, for example a java classpath
    if [ "${UNAME:0:6}" == "CYGWIN" ]; then
        echo `cygpath -wp "$1"`
    else
        echo $1
    fi
}

# Looks up a config value by key from a simple YAML-style key-value map.
# $1: key to look up
# $2: default value to return if key does not exist
# $3: config file to read from
readFromConfig() {
    local key=$1
    local defaultValue=$2
    local configFile=$3

    # first extract the value with the given key (1st sed), then trim the result (2nd sed)
    # if a key exists multiple times, take the "last" one (tail)
    local value=`sed -n "s/^[ ]*${key}[ ]*: \([^#]*\).*$/\1/p" "${configFile}" | sed "s/^ *//;s/ *$//" | tail -n 1`

    [ -z "$value" ] && echo "$defaultValue" || echo "$value"
}

########################################################################################################################
# DEFAULT CONFIG VALUES: These values will be used when nothing has been specified in conf/flink-conf.yaml
# -or- the respective environment variables are not set.
########################################################################################################################

# WARNING !!! , these values are only used if there is nothing else is specified in
# conf/flink-conf.yaml

DEFAULT_ENV_PID_DIR="/tmp"                          # Directory to store *.pid files to
DEFAULT_ENV_LOG_MAX=10                              # Maximum number of old log files to keep
DEFAULT_ENV_JAVA_OPTS=""                            # Optional JVM args
DEFAULT_ENV_JAVA_OPTS_JM=""                         # Optional JVM args (JobManager)
DEFAULT_ENV_JAVA_OPTS_TM=""                         # Optional JVM args (TaskManager)
DEFAULT_ENV_JAVA_OPTS_HS=""                         # Optional JVM args (HistoryServer)
DEFAULT_ENV_JAVA_OPTS_CLI=""                        # Optional JVM args (Client)
DEFAULT_ENV_SSH_OPTS=""                             # Optional SSH parameters running in cluster mode
DEFAULT_YARN_CONF_DIR=""                            # YARN Configuration Directory, if necessary
DEFAULT_HADOOP_CONF_DIR=""                          # Hadoop Configuration Directory, if necessary
DEFAULT_HBASE_CONF_DIR=""                           # HBase Configuration Directory, if necessary

########################################################################################################################
# CONFIG KEYS: The default values can be overwritten by the following keys in conf/flink-conf.yaml
########################################################################################################################

KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa"

KEY_ENV_PID_DIR="env.pid.dir"
KEY_ENV_LOG_DIR="env.log.dir"
KEY_ENV_LOG_MAX="env.log.max"
KEY_ENV_YARN_CONF_DIR="env.yarn.conf.dir"
KEY_ENV_HADOOP_CONF_DIR="env.hadoop.conf.dir"
KEY_ENV_HBASE_CONF_DIR="env.hbase.conf.dir"
KEY_ENV_JAVA_HOME="env.java.home"
KEY_ENV_JAVA_OPTS="env.java.opts.all"
KEY_ENV_JAVA_OPTS_JM="env.java.opts.jobmanager"
KEY_ENV_JAVA_OPTS_TM="env.java.opts.taskmanager"
KEY_ENV_JAVA_OPTS_HS="env.java.opts.historyserver"
KEY_ENV_JAVA_OPTS_CLI="env.java.opts.client"
KEY_ENV_SSH_OPTS="env.ssh.opts"
KEY_HIGH_AVAILABILITY="high-availability.type"
KEY_ZK_HEAP_MB="zookeeper.heap.mb"

########################################################################################################################
# PATHS AND CONFIG
########################################################################################################################

target="$0"
# For the case, the executable has been directly symlinked, figure out
# the correct bin path by following its symlink up to an upper bound.
# Note: we can't use the readlink utility here if we want to be POSIX
# compatible.
iteration=0
while [ -L "$target" ]; do
    if [ "$iteration" -gt 100 ]; then
        echo "Cannot resolve path: You have a cyclic symlink in $target."
        break
    fi
    ls=`ls -ld -- "$target"`
    target=`expr "$ls" : '.* -> \(.*\)$'`
    iteration=$((iteration + 1))
done

# Convert relative path to absolute path and resolve directory symlinks
bin=`dirname "$target"`
SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P`

# Define the main directory of the flink installation
# If config.sh is called by pyflink-shell.sh in python bin directory(pip installed), then do not need to set the FLINK_HOME here.
if [ -z "$_FLINK_HOME_DETERMINED" ]; then
    FLINK_HOME=`dirname "$SYMLINK_RESOLVED_BIN"`
fi
if [ -z "$FLINK_LIB_DIR" ]; then FLINK_LIB_DIR=$FLINK_HOME/lib; fi
if [ -z "$FLINK_PLUGINS_DIR" ]; then FLINK_PLUGINS_DIR=$FLINK_HOME/plugins; fi
if [ -z "$FLINK_OPT_DIR" ]; then FLINK_OPT_DIR=$FLINK_HOME/opt; fi

# These need to be mangled because they are directly passed to java.
# The above lib path is used by the shell script to retrieve jars in a
# directory, so it needs to be unmangled.
FLINK_HOME_DIR_MANGLED=`manglePath "$FLINK_HOME"`
if [ -z "$FLINK_CONF_DIR" ]; then FLINK_CONF_DIR=$FLINK_HOME_DIR_MANGLED/conf; fi
FLINK_BIN_DIR=$FLINK_HOME_DIR_MANGLED/bin
DEFAULT_FLINK_LOG_DIR=$FLINK_HOME_DIR_MANGLED/log
FLINK_CONF_FILE="flink-conf.yaml"
YAML_CONF=${FLINK_CONF_DIR}/${FLINK_CONF_FILE}

### Exported environment variables ###
# Define a few exported environment variables
export FLINK_CONF_DIR
export FLINK_BIN_DIR
export FLINK_PLUGINS_DIR
# export /lib dir to access it during deployment of the Yarn staging files
export FLINK_LIB_DIR
# export /opt dir to access it for the SQL client
export FLINK_OPT_DIR

########################################################################################################################
# ENVIRONMENT VARIABLES
########################################################################################################################

# read JAVA_HOME from config with no default value
MY_JAVA_HOME=$(readFromConfig ${KEY_ENV_JAVA_HOME} "" "${YAML_CONF}")
# check if config specified JAVA_HOME
if [ -z "${MY_JAVA_HOME}" ]; then
    # config did not specify JAVA_HOME. Use system JAVA_HOME
    MY_JAVA_HOME="${JAVA_HOME}"
fi
# check if we have a valid JAVA_HOME and if java is not available
if [ -z "${MY_JAVA_HOME}" ] && ! type java > /dev/null 2> /dev/null; then
    echo "Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME."
    exit 1
else
    JAVA_HOME="${MY_JAVA_HOME}"
fi

UNAME=$(uname -s)
if [ "${UNAME:0:6}" == "CYGWIN" ]; then
    JAVA_RUN=java
else
    if [[ -d "$JAVA_HOME" ]]; then
        JAVA_RUN="$JAVA_HOME"/bin/java
    else
        JAVA_RUN=java
    fi
fi

# Define HOSTNAME if it is not already set
if [ -z "${HOSTNAME}" ]; then
    HOSTNAME=`hostname`
fi

IS_NUMBER="^[0-9]+$"

# Verify that NUMA tooling is available
command -v numactl >/dev/null 2>&1
if [[ $? -ne 0 ]]; then
    FLINK_TM_COMPUTE_NUMA="false"
else
    # Define FLINK_TM_COMPUTE_NUMA if it is not already set
    if [ -z "${FLINK_TM_COMPUTE_NUMA}" ]; then
        FLINK_TM_COMPUTE_NUMA=$(readFromConfig ${KEY_TASKM_COMPUTE_NUMA} "false" "${YAML_CONF}")
    fi
fi

if [ -z "${MAX_LOG_FILE_NUMBER}" ]; then
    MAX_LOG_FILE_NUMBER=$(readFromConfig ${KEY_ENV_LOG_MAX} ${DEFAULT_ENV_LOG_MAX} "${YAML_CONF}")
    export MAX_LOG_FILE_NUMBER
fi

if [ -z "${FLINK_LOG_DIR}" ]; then
    FLINK_LOG_DIR=$(readFromConfig ${KEY_ENV_LOG_DIR} "${DEFAULT_FLINK_LOG_DIR}" "${YAML_CONF}")
fi

if [ -z "${YARN_CONF_DIR}" ]; then
    YARN_CONF_DIR=$(readFromConfig ${KEY_ENV_YARN_CONF_DIR} "${DEFAULT_YARN_CONF_DIR}" "${YAML_CONF}")
fi

if [ -z "${HADOOP_CONF_DIR}" ]; then
    HADOOP_CONF_DIR=$(readFromConfig ${KEY_ENV_HADOOP_CONF_DIR} "${DEFAULT_HADOOP_CONF_DIR}" "${YAML_CONF}")
fi

if [ -z "${HBASE_CONF_DIR}" ]; then
    HBASE_CONF_DIR=$(readFromConfig ${KEY_ENV_HBASE_CONF_DIR} "${DEFAULT_HBASE_CONF_DIR}" "${YAML_CONF}")
fi

if [ -z "${FLINK_PID_DIR}" ]; then
    FLINK_PID_DIR=$(readFromConfig ${KEY_ENV_PID_DIR} "${DEFAULT_ENV_PID_DIR}" "${YAML_CONF}")
fi

if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
    FLINK_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS} "" "${YAML_CONF}")
    if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
      # try deprecated key
      FLINK_ENV_JAVA_OPTS=$(readFromConfig "env.java.opts" "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}")
    fi

    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS="$( echo "${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//'  -e 's/"$//' )"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_JM}" ]; then
    FLINK_ENV_JAVA_OPTS_JM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_JM} "${DEFAULT_ENV_JAVA_OPTS_JM}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS_JM="$( echo "${FLINK_ENV_JAVA_OPTS_JM}" | sed -e 's/^"//'  -e 's/"$//' )"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
    FLINK_ENV_JAVA_OPTS_TM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_TM} "${DEFAULT_ENV_JAVA_OPTS_TM}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS_TM="$( echo "${FLINK_ENV_JAVA_OPTS_TM}" | sed -e 's/^"//'  -e 's/"$//' )"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_HS}" ]; then
    FLINK_ENV_JAVA_OPTS_HS=$(readFromConfig ${KEY_ENV_JAVA_OPTS_HS} "${DEFAULT_ENV_JAVA_OPTS_HS}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS_HS="$( echo "${FLINK_ENV_JAVA_OPTS_HS}" | sed -e 's/^"//'  -e 's/"$//' )"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_CLI}" ]; then
    FLINK_ENV_JAVA_OPTS_CLI=$(readFromConfig ${KEY_ENV_JAVA_OPTS_CLI} "${DEFAULT_ENV_JAVA_OPTS_CLI}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS_CLI="$( echo "${FLINK_ENV_JAVA_OPTS_CLI}" | sed -e 's/^"//'  -e 's/"$//' )"
fi

# -z is true when the string has length 0; -n is true when it is non-empty
if [ -z "${FLINK_SSH_OPTS}" ]; then
    FLINK_SSH_OPTS=$(readFromConfig ${KEY_ENV_SSH_OPTS} "${DEFAULT_ENV_SSH_OPTS}" "${YAML_CONF}")
fi

# Define ZK_HEAP if it is not already set
if [ -z "${ZK_HEAP}" ]; then
    ZK_HEAP=$(readFromConfig ${KEY_ZK_HEAP_MB} 0 "${YAML_CONF}")
fi

# High availability
if [ -z "${HIGH_AVAILABILITY}" ]; then
     HIGH_AVAILABILITY=$(readFromConfig ${KEY_HIGH_AVAILABILITY} "" "${YAML_CONF}")
     if [ -z "${HIGH_AVAILABILITY}" ]; then
        # Try deprecated value
        DEPRECATED_HA=$(readFromConfig "high-availability" "$(readFromConfig "recovery.mode" "" "${YAML_CONF}")" "${YAML_CONF}")
        if [ -z "${DEPRECATED_HA}" ]; then
            HIGH_AVAILABILITY="none"
        elif [ ${DEPRECATED_HA} == "standalone" ]; then
            # Standalone is now 'none'
            HIGH_AVAILABILITY="none"
        else
            HIGH_AVAILABILITY=${DEPRECATED_HA}
        fi
     fi
fi

# Arguments for the JVM. Used for job and task manager JVMs.
# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
# JobManagerOptions#TOTAL_PROCESS_MEMORY and TaskManagerOptions#TOTAL_PROCESS_MEMORY for that!
if [ -z "${JVM_ARGS}" ]; then
    JVM_ARGS=""
fi

# Check if deprecated HADOOP_HOME is set, and specify config path to HADOOP_CONF_DIR if it's empty.
if [ -z "$HADOOP_CONF_DIR" ]; then
    if [ -n "$HADOOP_HOME" ]; then
        # HADOOP_HOME is set. Check if its a Hadoop 1.x or 2.x HADOOP_HOME path
        if [ -d "$HADOOP_HOME/conf" ]; then
            # It's Hadoop 1.x
            HADOOP_CONF_DIR="$HADOOP_HOME/conf"
        fi
        if [ -d "$HADOOP_HOME/etc/hadoop" ]; then
            # It's Hadoop 2.2+
            HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
        fi
    fi
fi

# if neither HADOOP_CONF_DIR nor HADOOP_CLASSPATH are set, use some common default (if available)
if [ -z "$HADOOP_CONF_DIR" ] && [ -z "$HADOOP_CLASSPATH" ]; then
    if [ -d "/etc/hadoop/conf" ]; then
        echo "Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set."
        HADOOP_CONF_DIR="/etc/hadoop/conf"
    fi
fi

# Check if deprecated HBASE_HOME is set, and specify config path to HBASE_CONF_DIR if it's empty.
if [ -z "$HBASE_CONF_DIR" ]; then
    if [ -n "$HBASE_HOME" ]; then
        # HBASE_HOME is set.
        if [ -d "$HBASE_HOME/conf" ]; then
            HBASE_CONF_DIR="$HBASE_HOME/conf"
        fi
    fi
fi

# try and set HBASE_CONF_DIR to some common default if it's not set
if [ -z "$HBASE_CONF_DIR" ]; then
    if [ -d "/etc/hbase/conf" ]; then
        echo "Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set."
        HBASE_CONF_DIR="/etc/hbase/conf"
    fi
fi

INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"

if [ -n "${HBASE_CONF_DIR}" ]; then
    INTERNAL_HADOOP_CLASSPATHS="${INTERNAL_HADOOP_CLASSPATHS}:${HBASE_CONF_DIR}"
fi

# Auxiliary function which extracts the name of host from a line which
# also potentially includes topology information and the taskManager type
extractHostName() {
    # handle comments: extract first part of string (before first # character)
    WORKER=`echo $1 | cut -d'#' -f 1`

    # Extract the hostname from the network hierarchy
    if [[ "$WORKER" =~ ^.*/([0-9a-zA-Z.-]+)$ ]]; then
            WORKER=${BASH_REMATCH[1]}
    fi

    echo $WORKER
}

readMasters() {
    MASTERS_FILE="${FLINK_CONF_DIR}/masters"

    if [[ ! -f "${MASTERS_FILE}" ]]; then
        echo "No masters file. Please specify masters in 'conf/masters'."
        exit 1
    fi

    MASTERS=()
    WEBUIPORTS=()

    MASTERS_ALL_LOCALHOST=true
    GOON=true
    while $GOON; do
        read line || GOON=false
        HOSTWEBUIPORT=$( extractHostName $line)
        if [ -n "$HOSTWEBUIPORT" ]; then
            HOST=$(echo $HOSTWEBUIPORT | cut -f1 -d:)
            WEBUIPORT=$(echo $HOSTWEBUIPORT | cut -s -f2 -d:)
            MASTERS+=(${HOST})

            if [ -z "$WEBUIPORT" ]; then
                WEBUIPORTS+=(0)
            else
                WEBUIPORTS+=(${WEBUIPORT})
            fi

            if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
                MASTERS_ALL_LOCALHOST=false
            fi
        fi
    done < "$MASTERS_FILE"
}

# 12. Iterate over the nodes listed in the workers file
readWorkers() {
    WORKERS_FILE="${FLINK_CONF_DIR}/workers"

    if [[ ! -f "$WORKERS_FILE" ]]; then
        echo "No workers file. Please specify workers in 'conf/workers'."
        exit 1
    fi

    WORKERS=()
    WORKERS_ALL_LOCALHOST=true
    GOON=true
    while $GOON; do
        read line || GOON=false
        HOST=$( extractHostName $line)
        if [ -n "$HOST" ] ; then
            WORKERS+=(${HOST})
            if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
                WORKERS_ALL_LOCALHOST=false
            fi
        fi
    done < "$WORKERS_FILE"
}

# starts or stops TMs on all workers
# TMWorkers start|stop
# 10. Start (or stop) all TaskManagers
TMWorkers() {
    CMD=$1

    # 11. Read the workers config file
    readWorkers

    # 13. All workers are local: start locally
    if [ ${WORKERS_ALL_LOCALHOST} = true ] ; then
        # all-local setup
        for worker in ${WORKERS[@]}; do
            "${FLINK_BIN_DIR}"/taskmanager.sh "${CMD}"
        done
    else
        # 14. Remote start
        # non-local setup
        # start/stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available
        command -v pdsh >/dev/null 2>&1
        if [[ $? -ne 0 ]]; then
            for worker in ${WORKERS[@]}; do
                ssh -n $FLINK_SSH_OPTS $worker -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &"
            done
        else
            PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${WORKERS[*]}") \
                "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\""
        fi
    fi
}

runBashJavaUtilsCmd() {
    local cmd=$1
    local conf_dir=$2
    local class_path=$3
    local dynamic_args=${@:4}
    class_path=`manglePathList "${class_path}"`

    local output=`"${JAVA_RUN}" -classpath "${class_path}" org.apache.flink.runtime.util.bash.BashJavaUtils ${cmd} --configDir "${conf_dir}" $dynamic_args 2>&1 | tail -n 1000`
    if [[ $? -ne 0 ]]; then
        echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}." 1>&2
        # Print the output in case the user redirect the log to console.
        echo "$output" 1>&2
        exit 1
    fi

    echo "$output"
}

extractExecutionResults() {
    local output="$1"
    local expected_lines="$2"
    local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"
    local execution_results
    local num_lines

    execution_results=$(echo "${output}" | grep ${EXECUTION_PREFIX})
    num_lines=$(echo "${execution_results}" | wc -l)
    # explicit check for empty result, because if execution_results is empty, then wc returns 1
    if [[ -z ${execution_results} ]]; then
        echo "[ERROR] The execution result is empty." 1>&2
        exit 1
    fi
    if [[ ${num_lines} -ne ${expected_lines} ]]; then
        echo "[ERROR] The execution results has unexpected number of lines, expected: ${expected_lines}, actual: ${num_lines}." 1>&2
        echo "[ERROR] An execution result line is expected following the prefix '${EXECUTION_PREFIX}'" 1>&2
        echo "$output" 1>&2
        exit 1
    fi

    echo "${execution_results//${EXECUTION_PREFIX}/}"
}

extractLoggingOutputs() {
    local output="$1"
    local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"

    echo "${output}" | grep -v ${EXECUTION_PREFIX}
}

parseResourceParamsAndExportLogs() {
  local cmd=$1
  java_utils_output=$(runBashJavaUtilsCmd ${cmd} "${FLINK_CONF_DIR}" "${FLINK_BIN_DIR}/bash-java-utils.jar:$(findFlinkDistJar)" "${@:2}")
  logging_output=$(extractLoggingOutputs "${java_utils_output}")
  params_output=$(extractExecutionResults "${java_utils_output}" 2)

  if [[ $? -ne 0 ]]; then
    echo "[ERROR] Could not get JVM parameters and dynamic configurations properly."
    echo "[ERROR] Raw output from BashJavaUtils:"
    echo "$java_utils_output"
    exit 1
  fi

  jvm_params=$(echo "${params_output}" | head -n1)
  export JVM_ARGS="${JVM_ARGS} ${jvm_params}"
  export DYNAMIC_PARAMETERS=$(IFS=" " echo "${params_output}" | tail -n1)

  export FLINK_INHERITED_LOGS="
$FLINK_INHERITED_LOGS

RESOURCE_PARAMS extraction logs:
jvm_params: $jvm_params
dynamic_configs: $DYNAMIC_PARAMETERS
logs: $logging_output
"
}

parseJmArgsAndExportLogs() {
  parseResourceParamsAndExportLogs GET_JM_RESOURCE_PARAMS "$@"
}

parseTmArgsAndExportLogs() {
  parseResourceParamsAndExportLogs GET_TM_RESOURCE_PARAMS "$@"
}
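As a quick illustration of readFromConfig, which nearly every variable above is initialized through, here is a minimal self-contained sketch that runs its sed pipeline against a throwaway config file (all keys and values here are made up for the demo):

#!/usr/bin/env bash
# Same implementation as in config.sh above
readFromConfig() {
    local key=$1 defaultValue=$2 configFile=$3
    local value=`sed -n "s/^[ ]*${key}[ ]*: \([^#]*\).*$/\1/p" "${configFile}" | sed "s/^ *//;s/ *$//" | tail -n 1`
    [ -z "$value" ] && echo "$defaultValue" || echo "$value"
}

conf=$(mktemp)
cat > "$conf" <<'EOF'
env.log.max: 5            # trailing comments are stripped
high-availability.type: zookeeper
high-availability.type: none
EOF

readFromConfig "env.log.max" 10 "$conf"              # prints 5
readFromConfig "high-availability.type" "" "$conf"   # prints none (last occurrence wins)
readFromConfig "env.pid.dir" "/tmp" "$conf"          # prints /tmp (key absent, default used)
rm -f "$conf"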

The key functions here are TMWorkers (starts the TaskManagers), readMasters (reads the masters list), and readWorkers (reads the workers list); the script also defines a number of environment variables.
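Both readMasters and readWorkers rely on extractHostName to clean up each line. Here is a small runnable sketch of what it does with the kinds of entries that may appear in conf/masters or conf/workers (the host names below are hypothetical):

#!/usr/bin/env bash
# Same implementation as in config.sh above
extractHostName() {
    # strip comments: keep only the part before the first '#'
    WORKER=`echo $1 | cut -d'#' -f 1`
    # strip an optional topology prefix such as "rack1/"
    if [[ "$WORKER" =~ ^.*/([0-9a-zA-Z.-]+)$ ]]; then
        WORKER=${BASH_REMATCH[1]}
    fi
    echo $WORKER
}

extractHostName "worker-1.example.com"       # prints worker-1.example.com
extractHostName "rack1/worker-2"             # prints worker-2
extractHostName "rack1/worker-3#a comment"   # prints worker-3
extractHostName "#commented-out-host"        # prints an empty line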

3、jobmanager.sh

# Start/stop a Flink JobManager.
USAGE="Usage: jobmanager.sh ((start|start-foreground) [args])|stop|stop-all"

STARTSTOP=$1

if [ -z $2 ] || [[ $2 == "-D" ]]; then
    # start [-D ...]
    args=("${@:2}")
elif [ -z $3 ] || [[ $3 == "-D" ]]; then
    # legacy path: start <host> [-D ...]
    HOST=$2
    args=("${@:3}")
else
    # legacy path: start <host> <port> [-D ...]
    HOST=$2
    WEBUIPORT=$3
    args=("${@:4}")
fi

# 16. Validate the arguments
if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
  echo $USAGE
  exit 1
fi

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

# 17. Load config.sh
. "$bin"/config.sh

# 18. The entrypoint identifier for the JobManager; this is important
ENTRYPOINT=standalonesession

if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
    # Add JobManager-specific JVM options
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
    parseJmArgsAndExportLogs "${args[@]}"

    args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster" "${args[@]}")

    if [ ! -z $HOST ]; then
        args+=("--host")
        args+=("${HOST}")
    fi

    if [ ! -z $WEBUIPORT ]; then
        args+=("--webui-port")
        args+=("${WEBUIPORT}")
    fi

    if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
        args=(${DYNAMIC_PARAMETERS[@]} "${args[@]}")
    fi
fi

if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}"
else
    # 19. Start the standalonesession daemon via flink-daemon.sh
    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
fi
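A quick trace of how hypothetical invocations fall through the three argument-parsing branches at the top of the script (master-1, 8081, and key=value are made-up examples):

# jobmanager.sh start
#   -> $2 is empty: first branch, args=()
# jobmanager.sh start -D key=value        ("-D" and "key=value" as separate tokens)
#   -> $2 == "-D": first branch, args=(-D key=value)
# jobmanager.sh start master-1
#   -> $3 is empty: legacy branch, HOST=master-1, args=()
# jobmanager.sh start master-1 8081
#   -> legacy branch, HOST=master-1, WEBUIPORT=8081, args=()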

4、taskmanager.sh

# Start/stop a Flink TaskManager.
USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"

STARTSTOP=$1

ARGS=("${@:2}")

if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
  echo $USAGE
  exit 1
fi

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

# 20. Define the entrypoint identifier: taskexecutor
ENTRYPOINT=taskexecutor

if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
    # if no other JVM options are set, set the GC to G1
    if [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
        export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
    fi

    # Add TaskManager-specific JVM options
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"

    # Startup parameters
    parseTmArgsAndExportLogs "${ARGS[@]}"
    if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
        ARGS=(${DYNAMIC_PARAMETERS[@]} "${ARGS[@]}")
    fi
    ARGS=("--configDir" "${FLINK_CONF_DIR}" "${ARGS[@]}")
fi

if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
else
    if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
        # Start a single TaskManager
        "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
    else
        # Example output from `numactl --show` on an AWS c4.8xlarge:
        # policy: default
        # preferred node: current
        # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
        # cpubind: 0 1
        # nodebind: 0 1
        # membind: 0 1
        read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
        for NODE_ID in "${NODE_LIST[@]:1}"; do
            # Start a TaskManager for each NUMA node
            numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
        done
    fi
fi
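The NUMA branch is the least obvious part. This runnable sketch replays its parsing step on a hard-coded nodebind line (the same one from the c4.8xlarge example in the comments) instead of calling numactl:

#!/usr/bin/env bash
# What `numactl --show | grep "^nodebind: "` would return on that machine
nodebind_line="nodebind: 0 1"

read -ra NODE_LIST <<< "$nodebind_line"
# NODE_LIST=("nodebind:" "0" "1"); the slice [@]:1 drops the "nodebind:" label
for NODE_ID in "${NODE_LIST[@]:1}"; do
    echo "would run: numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- flink-daemon.sh start taskexecutor ..."
done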

5、flink-daemon.sh

# Start/stop a Flink daemon.
USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|sql-gateway) [args]"

STARTSTOP=$1
DAEMON=$2
ARGS=("${@:3}") # get remaining arguments as array

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

# Load the configuration
. "$bin"/config.sh

case $DAEMON in
    (taskexecutor)
        # 21. The class the TaskExecutor runs: TaskManagerRunner
        CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
    ;;

    (zookeeper)
        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
    ;;

    (historyserver)
        CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
    ;;

    (standalonesession)
        # 22. The JobManager entrypoint class: StandaloneSessionClusterEntrypoint
        CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
    ;;

    (standalonejob)
        CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
    ;;

    (sql-gateway)
        CLASS_TO_RUN=org.apache.flink.table.gateway.SqlGateway
        SQL_GATEWAY_CLASSPATH="`findSqlGatewayJar`":"`findFlinkPythonJar`"
    ;;

    (*)
        echo "Unknown daemon '${DAEMON}'. $USAGE."
        exit 1
    ;;
esac

if [ "$FLINK_IDENT_STRING" = "" ]; then
    FLINK_IDENT_STRING="$USER"
fi

FLINK_TM_CLASSPATH=`constructFlinkClassPath`

pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-$DAEMON.pid

mkdir -p "$FLINK_PID_DIR"

# Log files for daemons are indexed from the process ID's position in the PID
# file. The following lock prevents a race condition during daemon startup
# when multiple daemons read, index, and write to the PID file concurrently.
# The lock is created on the PID directory since a lock file cannot be safely
# removed. The daemon is started with the lock closed and the lock remains
# active in this script until the script exits.
command -v flock >/dev/null 2>&1
if [[ $? -eq 0 ]]; then
    exec 200<"$FLINK_PID_DIR"
    flock 200
fi

# Ascending ID depending on number of lines in pid file.
# This allows us to start multiple daemon of each type.
id=$([ -f "$pid" ] && echo $(wc -l < "$pid") || echo "0")

FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-${DAEMON}-${id}-${HOSTNAME}"
log="${FLINK_LOG_PREFIX}.log"
out="${FLINK_LOG_PREFIX}.out"

log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml")

function guaranteed_kill {
  to_stop_pid=$1
  daemon=$2

  # send sigterm for graceful shutdown
  kill $to_stop_pid
  # if timeout exists, use it
  if command -v timeout &> /dev/null ; then
    # wait 10 seconds for process to stop. By default, Flink kills the JVM 5 seconds after sigterm.
    timeout 10 tail --pid=$to_stop_pid -f /dev/null &> /dev/null
    if [ "$?" -eq 124 ]; then
      echo "Daemon $daemon didn't stop within 10 seconds. Killing it."
      # send sigkill
      kill -9 $to_stop_pid
    fi
  fi
}

case $STARTSTOP in

    (start)
        # Print a warning if daemons are already running on host
        if [ -f "$pid" ]; then
          active=()
          while IFS='' read -r p || [[ -n "$p" ]]; do
            kill -0 $p >/dev/null 2>&1
            if [ $? -eq 0 ]; then
              active+=($p)
            fi
          done < "${pid}"

          count="${#active[@]}"

          if [ ${count} -gt 0 ]; then
            echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
          fi
        fi

        # Evaluate user options for local variable expansion
        FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})

        echo "Starting $DAEMON daemon on host $HOSTNAME."
        "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$SQL_GATEWAY_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null &

        mypid=$!

        # Add to pid file if successful start
        if [[ ${mypid} =~ ${IS_NUMBER} ]] && kill -0 $mypid > /dev/null 2>&1 ; then
            echo $mypid >> "$pid"
        else
            echo "Error starting $DAEMON daemon."
            exit 1
        fi
    ;;

    (stop)
        if [ -f "$pid" ]; then
            # Remove last in pid file
            to_stop=$(tail -n 1 "$pid")

            if [ -z $to_stop ]; then
                rm "$pid" # If all stopped, clean up pid file
                echo "No $DAEMON daemon to stop on host $HOSTNAME."
            else
                sed \$d "$pid" > "$pid.tmp" # all but last line

                # If all stopped, clean up pid file
                [ $(wc -l < "$pid.tmp") -eq 0 ] && rm "$pid" "$pid.tmp" || mv "$pid.tmp" "$pid"

                if kill -0 $to_stop > /dev/null 2>&1; then
                    echo "Stopping $DAEMON daemon (pid: $to_stop) on host $HOSTNAME."
                    guaranteed_kill $to_stop $DAEMON
                else
                    echo "No $DAEMON daemon (pid: $to_stop) is running anymore on $HOSTNAME."
                fi
            fi
        else
            echo "No $DAEMON daemon to stop on host $HOSTNAME."
        fi
    ;;

    (stop-all)
        if [ -f "$pid" ]; then
            mv "$pid" "${pid}.tmp"

            while read to_stop; do
                if kill -0 $to_stop > /dev/null 2>&1; then
                    echo "Stopping $DAEMON daemon (pid: $to_stop) on host $HOSTNAME."
                    guaranteed_kill $to_stop $DAEMON
                else
                    echo "Skipping $DAEMON daemon (pid: $to_stop), because it is not running anymore on $HOSTNAME."
                fi
            done < "${pid}.tmp"
            rm "${pid}.tmp"
        fi
    ;;

    (*)
        echo "Unexpected argument '$STARTSTOP'. $USAGE."
        exit 1
    ;;

esac
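One subtle detail worth demonstrating: the log file index id is simply the current number of lines in the pid file, so a second daemon of the same type on the same host gets index 1 instead of 0. A minimal sketch with fake PIDs (11111 and 22222 are made up):

#!/usr/bin/env bash
FLINK_PID_DIR=$(mktemp -d)   # throwaway directory instead of the real pid dir
DAEMON=taskexecutor
FLINK_IDENT_STRING=$USER
HOSTNAME=$(hostname)
pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-$DAEMON.pid

for fake_pid in 11111 22222; do
    # Same id derivation as in flink-daemon.sh above
    id=$([ -f "$pid" ] && echo $(wc -l < "$pid") || echo "0")
    echo "log file: flink-${FLINK_IDENT_STRING}-${DAEMON}-${id}-${HOSTNAME}.log"
    echo $fake_pid >> "$pid"
done
# prints ...-taskexecutor-0-<host>.log, then ...-taskexecutor-1-<host>.log

rm -rf "$FLINK_PID_DIR"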

6、Summary

Starting the cluster loads the config.sh file, which reads the Masters and Workers configuration, and then uses:

  • jobmanager.sh to start the JobManager
  • taskmanager.sh to start the TaskManager

Both launch the JVM process through the flink-daemon.sh script. Examining flink-daemon.sh shows that:

  • the JobManager's entrypoint class is StandaloneSessionClusterEntrypoint
  • the TaskManager's entrypoint class is TaskManagerRunner

If you found this interesting, a like and a follow would be much appreciated. Thank you!