1. start-cluster.sh
# 1. Get the directory containing this script
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

# 2. Load config.sh first; it defines shared functions, variables, and logic
. "$bin"/config.sh

# 3. Start the JobManager instance(s)
shopt -s nocasematch

# 4. Decide whether the JobManager runs in HA cluster mode or standalone mode,
#    based on high-availability.type in flink-conf.yaml
if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then # HA Mode

    # 5. In ZooKeeper-based HA mode, call readMasters
    readMasters

    echo "Starting HA cluster with ${#MASTERS[@]} masters."

    for ((i=0;i<${#MASTERS[@]};++i)); do
        master=${MASTERS[i]}
        webuiport=${WEBUIPORTS[i]}

        # 6. If all masters are local, start on this machine
        if [ ${MASTERS_ALL_LOCALHOST} = true ] ; then
            "${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}"
        else
            # 7. Otherwise start remotely in the background via SSH
            ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start ${master} ${webuiport} &"
        fi
    done

else
    echo "Starting cluster."

    # Start single JobManager on this machine
    # 8. Start the JobManager on the current node
    "$FLINK_BIN_DIR"/jobmanager.sh start
fi

shopt -u nocasematch

# Start TaskManager instance(s)
# 9. Start the TaskManagers
TMWorkers start
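As a side note, readMasters (defined in config.sh, section 2 below) parses conf/masters line by line, where each entry is host[:webui-port]. A hypothetical two-master HA layout might look like this (hostnames are made up for illustration):

# conf/masters: one JobManager per line; an optional web UI port follows a colon
master-node-1:8081
master-node-2:8082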
2. config.sh
# Build the Flink classpath
constructFlinkClassPath() {
    local FLINK_DIST
    local FLINK_CLASSPATH

    while read -d '' -r jarfile ; do
        if [[ "$jarfile" =~ .*/flink-dist[^/]*.jar$ ]]; then
            FLINK_DIST="$FLINK_DIST":"$jarfile"
        elif [[ "$FLINK_CLASSPATH" == "" ]]; then
            FLINK_CLASSPATH="$jarfile";
        else
            FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile"
        fi
    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0 | sort -z)

    local FLINK_DIST_COUNT
    FLINK_DIST_COUNT="$(echo "$FLINK_DIST" | tr -s ':' '\n' | grep -v '^$' | wc -l)"

    # If flink-dist*.jar cannot be resolved write error messages to stderr since stdout is stored
    # as the classpath and exit function with empty classpath to force process failure
    if [[ "$FLINK_DIST" == "" ]]; then
        (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")
        exit 1
    elif [[ "$FLINK_DIST_COUNT" -gt 1 ]]; then
        (>&2 echo "[ERROR] Multiple flink-dist*.jar found in $FLINK_LIB_DIR. Please resolve.")
        exit 1
    fi

    echo "$FLINK_CLASSPATH""$FLINK_DIST"
}

findFlinkDistJar() {
    local FLINK_DIST
    FLINK_DIST="$(find "$FLINK_LIB_DIR" -name 'flink-dist*.jar')"
    local FLINK_DIST_COUNT
    FLINK_DIST_COUNT="$(echo "$FLINK_DIST" | wc -l)"

    # If flink-dist*.jar cannot be resolved write error messages to stderr since stdout is stored
    # as the classpath and exit function with empty classpath to force process failure
    if [[ "$FLINK_DIST" == "" ]]; then
        (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")
        exit 1
    elif [[ "$FLINK_DIST_COUNT" -gt 1 ]]; then
        (>&2 echo "[ERROR] Multiple flink-dist*.jar found in $FLINK_LIB_DIR. Please resolve.")
        exit 1
    fi

    echo "$FLINK_DIST"
}

findSqlGatewayJar() {
    local SQL_GATEWAY
    SQL_GATEWAY="$(find "$FLINK_OPT_DIR" -name 'flink-sql-gateway*.jar')"
    local SQL_GATEWAY_COUNT
    SQL_GATEWAY_COUNT="$(echo "$SQL_GATEWAY" | wc -l)"

    # If flink-sql-gateway*.jar cannot be resolved write error messages to stderr since stdout is stored
    # as the classpath and exit function with empty classpath to force process failure
    if [[ "$SQL_GATEWAY" == "" ]]; then
        (>&2 echo "[ERROR] Flink sql gateway jar not found in $FLINK_OPT_DIR.")
        exit 1
    elif [[ "$SQL_GATEWAY_COUNT" -gt 1 ]]; then
        (>&2 echo "[ERROR] Multiple flink-sql-gateway*.jar found in $FLINK_OPT_DIR. Please resolve.")
        exit 1
    fi

    echo "$SQL_GATEWAY"
}

findFlinkPythonJar() {
    local FLINK_PYTHON
    FLINK_PYTHON="$(find "$FLINK_OPT_DIR" -name 'flink-python*.jar')"
    local FLINK_PYTHON_COUNT
    FLINK_PYTHON_COUNT="$(echo "$FLINK_PYTHON" | wc -l)"

    # If flink-python*.jar cannot be resolved write error messages to stderr since stdout is stored
    # as the classpath and exit function with empty classpath to force process failure
    if [[ "$FLINK_PYTHON" == "" ]]; then
        echo "[WARN] Flink python jar not found in $FLINK_OPT_DIR."
    elif [[ "$FLINK_PYTHON_COUNT" -gt 1 ]]; then
        (>&2 echo "[ERROR] Multiple flink-python*.jar found in $FLINK_OPT_DIR. Please resolve.")
        exit 1
    fi

    echo "$FLINK_PYTHON"
}

# These are used to mangle paths that are passed to java when using
# cygwin. Cygwin paths are like linux paths, i.e. /path/to/somewhere
# but the windows java version expects them in Windows Format, i.e. C:\bla\blub.
# "cygpath" can do the conversion.
manglePath() {
    UNAME=$(uname -s)
    if [ "${UNAME:0:6}" == "CYGWIN" ]; then
        echo `cygpath -w "$1"`
    else
        echo $1
    fi
}

manglePathList() {
    UNAME=$(uname -s)
    # a path list, for example a java classpath
    if [ "${UNAME:0:6}" == "CYGWIN" ]; then
        echo `cygpath -wp "$1"`
    else
        echo $1
    fi
}

# Looks up a config value by key from a simple YAML-style key-value map.
# $1: key to look up
# $2: default value to return if key does not exist
# $3: config file to read from
readFromConfig() {
    local key=$1
    local defaultValue=$2
    local configFile=$3

    # first extract the value with the given key (1st sed), then trim the result (2nd sed)
    # if a key exists multiple times, take the "last" one (tail)
    local value=`sed -n "s/^[ ]*${key}[ ]*: \([^#]*\).*$/\1/p" "${configFile}" | sed "s/^ *//;s/ *$//" | tail -n 1`

    [ -z "$value" ] && echo "$defaultValue" || echo "$value"
}

########################################################################################################################
# DEFAULT CONFIG VALUES: These values will be used when nothing has been specified in conf/flink-conf.yaml
# -or- the respective environment variables are not set.
########################################################################################################################

# WARNING !!! , these values are only used if there is nothing else is specified in
# conf/flink-conf.yaml

DEFAULT_ENV_PID_DIR="/tmp"                          # Directory to store *.pid files to
DEFAULT_ENV_LOG_MAX=10                              # Maximum number of old log files to keep
DEFAULT_ENV_JAVA_OPTS=""                            # Optional JVM args
DEFAULT_ENV_JAVA_OPTS_JM=""                         # Optional JVM args (JobManager)
DEFAULT_ENV_JAVA_OPTS_TM=""                         # Optional JVM args (TaskManager)
DEFAULT_ENV_JAVA_OPTS_HS=""                         # Optional JVM args (HistoryServer)
DEFAULT_ENV_JAVA_OPTS_CLI=""                        # Optional JVM args (Client)
DEFAULT_ENV_SSH_OPTS=""                             # Optional SSH parameters running in cluster mode
DEFAULT_YARN_CONF_DIR=""                            # YARN Configuration Directory, if necessary
DEFAULT_HADOOP_CONF_DIR=""                          # Hadoop Configuration Directory, if necessary
DEFAULT_HBASE_CONF_DIR=""                           # HBase Configuration Directory, if necessary

########################################################################################################################
# CONFIG KEYS: The default values can be overwritten by the following keys in conf/flink-conf.yaml
########################################################################################################################

KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa"

KEY_ENV_PID_DIR="env.pid.dir"
KEY_ENV_LOG_DIR="env.log.dir"
KEY_ENV_LOG_MAX="env.log.max"
KEY_ENV_YARN_CONF_DIR="env.yarn.conf.dir"
KEY_ENV_HADOOP_CONF_DIR="env.hadoop.conf.dir"
KEY_ENV_HBASE_CONF_DIR="env.hbase.conf.dir"
KEY_ENV_JAVA_HOME="env.java.home"
KEY_ENV_JAVA_OPTS="env.java.opts.all"
KEY_ENV_JAVA_OPTS_JM="env.java.opts.jobmanager"
KEY_ENV_JAVA_OPTS_TM="env.java.opts.taskmanager"
KEY_ENV_JAVA_OPTS_HS="env.java.opts.historyserver"
KEY_ENV_JAVA_OPTS_CLI="env.java.opts.client"
KEY_ENV_SSH_OPTS="env.ssh.opts"
KEY_HIGH_AVAILABILITY="high-availability.type"
KEY_ZK_HEAP_MB="zookeeper.heap.mb"

########################################################################################################################
# PATHS AND CONFIG
########################################################################################################################

target="$0"
# For the case, the executable has been directly symlinked, figure out
# the correct bin path by following its symlink up to an upper bound.
# Note: we can't use the readlink utility here if we want to be POSIX
# compatible.
iteration=0
while [ -L "$target" ]; do
    if [ "$iteration" -gt 100 ]; then
        echo "Cannot resolve path: You have a cyclic symlink in $target."
        break
    fi
    ls=`ls -ld -- "$target"`
    target=`expr "$ls" : '.* -> \(.*\)$'`
    iteration=$((iteration + 1))
done

# Convert relative path to absolute path and resolve directory symlinks
bin=`dirname "$target"`
SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P`

# Define the main directory of the flink installation
# If config.sh is called by pyflink-shell.sh in python bin directory(pip installed), then do not need to set the FLINK_HOME here.
if [ -z "$_FLINK_HOME_DETERMINED" ]; then
    FLINK_HOME=`dirname "$SYMLINK_RESOLVED_BIN"`
fi
if [ -z "$FLINK_LIB_DIR" ]; then FLINK_LIB_DIR=$FLINK_HOME/lib; fi
if [ -z "$FLINK_PLUGINS_DIR" ]; then FLINK_PLUGINS_DIR=$FLINK_HOME/plugins; fi
if [ -z "$FLINK_OPT_DIR" ]; then FLINK_OPT_DIR=$FLINK_HOME/opt; fi

# These need to be mangled because they are directly passed to java.
# The above lib path is used by the shell script to retrieve jars in a
# directory, so it needs to be unmangled.
FLINK_HOME_DIR_MANGLED=`manglePath "$FLINK_HOME"`
if [ -z "$FLINK_CONF_DIR" ]; then FLINK_CONF_DIR=$FLINK_HOME_DIR_MANGLED/conf; fi
FLINK_BIN_DIR=$FLINK_HOME_DIR_MANGLED/bin
DEFAULT_FLINK_LOG_DIR=$FLINK_HOME_DIR_MANGLED/log
FLINK_CONF_FILE="flink-conf.yaml"
YAML_CONF=${FLINK_CONF_DIR}/${FLINK_CONF_FILE}

### Exported environment variables ###
# Export a few environment variables
export FLINK_CONF_DIR
export FLINK_BIN_DIR
export FLINK_PLUGINS_DIR
# export /lib dir to access it during deployment of the Yarn staging files
export FLINK_LIB_DIR
# export /opt dir to access it for the SQL client
export FLINK_OPT_DIR

########################################################################################################################
# ENVIRONMENT VARIABLES
########################################################################################################################

# read JAVA_HOME from config with no default value
MY_JAVA_HOME=$(readFromConfig ${KEY_ENV_JAVA_HOME} "" "${YAML_CONF}")
# check if config specified JAVA_HOME
if [ -z "${MY_JAVA_HOME}" ]; then
    # config did not specify JAVA_HOME. Use system JAVA_HOME
    MY_JAVA_HOME="${JAVA_HOME}"
fi
# check if we have a valid JAVA_HOME and if java is not available
if [ -z "${MY_JAVA_HOME}" ] && ! type java > /dev/null 2> /dev/null; then
    echo "Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME."
    exit 1
else
    JAVA_HOME="${MY_JAVA_HOME}"
fi

UNAME=$(uname -s)
if [ "${UNAME:0:6}" == "CYGWIN" ]; then
    JAVA_RUN=java
else
    if [[ -d "$JAVA_HOME" ]]; then
        JAVA_RUN="$JAVA_HOME"/bin/java
    else
        JAVA_RUN=java
    fi
fi

# Define HOSTNAME if it is not already set
if [ -z "${HOSTNAME}" ]; then
    HOSTNAME=`hostname`
fi

IS_NUMBER="^[0-9]+$"

# Verify that NUMA tooling is available
command -v numactl >/dev/null 2>&1
if [[ $? -ne 0 ]]; then
    FLINK_TM_COMPUTE_NUMA="false"
else
    # Define FLINK_TM_COMPUTE_NUMA if it is not already set
    if [ -z "${FLINK_TM_COMPUTE_NUMA}" ]; then
        FLINK_TM_COMPUTE_NUMA=$(readFromConfig ${KEY_TASKM_COMPUTE_NUMA} "false" "${YAML_CONF}")
    fi
fi

if [ -z "${MAX_LOG_FILE_NUMBER}" ]; then
    MAX_LOG_FILE_NUMBER=$(readFromConfig ${KEY_ENV_LOG_MAX} ${DEFAULT_ENV_LOG_MAX} "${YAML_CONF}")
    export MAX_LOG_FILE_NUMBER
fi

if [ -z "${FLINK_LOG_DIR}" ]; then
    FLINK_LOG_DIR=$(readFromConfig ${KEY_ENV_LOG_DIR} "${DEFAULT_FLINK_LOG_DIR}" "${YAML_CONF}")
fi

if [ -z "${YARN_CONF_DIR}" ]; then
    YARN_CONF_DIR=$(readFromConfig ${KEY_ENV_YARN_CONF_DIR} "${DEFAULT_YARN_CONF_DIR}" "${YAML_CONF}")
fi

if [ -z "${HADOOP_CONF_DIR}" ]; then
    HADOOP_CONF_DIR=$(readFromConfig ${KEY_ENV_HADOOP_CONF_DIR} "${DEFAULT_HADOOP_CONF_DIR}" "${YAML_CONF}")
fi

if [ -z "${HBASE_CONF_DIR}" ]; then
    HBASE_CONF_DIR=$(readFromConfig ${KEY_ENV_HBASE_CONF_DIR} "${DEFAULT_HBASE_CONF_DIR}" "${YAML_CONF}")
fi

if [ -z "${FLINK_PID_DIR}" ]; then
    FLINK_PID_DIR=$(readFromConfig ${KEY_ENV_PID_DIR} "${DEFAULT_ENV_PID_DIR}" "${YAML_CONF}")
fi

if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
    FLINK_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS} "" "${YAML_CONF}")
    if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
        # try deprecated key
        FLINK_ENV_JAVA_OPTS=$(readFromConfig "env.java.opts" "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}")
    fi

    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS="$( echo "${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//' -e 's/"$//' )"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_JM}" ]; then
    FLINK_ENV_JAVA_OPTS_JM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_JM} "${DEFAULT_ENV_JAVA_OPTS_JM}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS_JM="$( echo "${FLINK_ENV_JAVA_OPTS_JM}" | sed -e 's/^"//' -e 's/"$//' )"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
    FLINK_ENV_JAVA_OPTS_TM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_TM} "${DEFAULT_ENV_JAVA_OPTS_TM}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS_TM="$( echo "${FLINK_ENV_JAVA_OPTS_TM}" | sed -e 's/^"//' -e 's/"$//' )"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_HS}" ]; then
    FLINK_ENV_JAVA_OPTS_HS=$(readFromConfig ${KEY_ENV_JAVA_OPTS_HS} "${DEFAULT_ENV_JAVA_OPTS_HS}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS_HS="$( echo "${FLINK_ENV_JAVA_OPTS_HS}" | sed -e 's/^"//' -e 's/"$//' )"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_CLI}" ]; then
    FLINK_ENV_JAVA_OPTS_CLI=$(readFromConfig ${KEY_ENV_JAVA_OPTS_CLI} "${DEFAULT_ENV_JAVA_OPTS_CLI}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS_CLI="$( echo "${FLINK_ENV_JAVA_OPTS_CLI}" | sed -e 's/^"//' -e 's/"$//' )"
fi

# -z is true when FLINK_SSH_OPTS has length 0; -n is true when the string is non-empty
if [ -z "${FLINK_SSH_OPTS}" ]; then
    FLINK_SSH_OPTS=$(readFromConfig ${KEY_ENV_SSH_OPTS} "${DEFAULT_ENV_SSH_OPTS}" "${YAML_CONF}")
fi

# Define ZK_HEAP if it is not already set
if [ -z "${ZK_HEAP}" ]; then
    ZK_HEAP=$(readFromConfig ${KEY_ZK_HEAP_MB} 0 "${YAML_CONF}")
fi

# High availability
if [ -z "${HIGH_AVAILABILITY}" ]; then
     HIGH_AVAILABILITY=$(readFromConfig ${KEY_HIGH_AVAILABILITY} "" "${YAML_CONF}")
     if [ -z "${HIGH_AVAILABILITY}" ]; then
        # Try deprecated value
        DEPRECATED_HA=$(readFromConfig "high-availability" "$(readFromConfig "recovery.mode" "" "${YAML_CONF}")" "${YAML_CONF}")
        if [ -z "${DEPRECATED_HA}" ]; then
            HIGH_AVAILABILITY="none"
        elif [ ${DEPRECATED_HA} == "standalone" ]; then
            # Standalone is now 'none'
            HIGH_AVAILABILITY="none"
        else
            HIGH_AVAILABILITY=${DEPRECATED_HA}
        fi
     fi
fi

# Arguments for the JVM. Used for job and task manager JVMs.
# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
# JobManagerOptions#TOTAL_PROCESS_MEMORY and TaskManagerOptions#TOTAL_PROCESS_MEMORY for that!
if [ -z "${JVM_ARGS}" ]; then
    JVM_ARGS=""
fi

# Check if deprecated HADOOP_HOME is set, and specify config path to HADOOP_CONF_DIR if it's empty.
if [ -z "$HADOOP_CONF_DIR" ]; then
    if [ -n "$HADOOP_HOME" ]; then
        # HADOOP_HOME is set. Check if its a Hadoop 1.x or 2.x HADOOP_HOME path
        if [ -d "$HADOOP_HOME/conf" ]; then
            # It's Hadoop 1.x
            HADOOP_CONF_DIR="$HADOOP_HOME/conf"
        fi
        if [ -d "$HADOOP_HOME/etc/hadoop" ]; then
            # It's Hadoop 2.2+
            HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
        fi
    fi
fi

# if neither HADOOP_CONF_DIR nor HADOOP_CLASSPATH are set, use some common default (if available)
if [ -z "$HADOOP_CONF_DIR" ] && [ -z "$HADOOP_CLASSPATH" ]; then
    if [ -d "/etc/hadoop/conf" ]; then
        echo "Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set."
        HADOOP_CONF_DIR="/etc/hadoop/conf"
    fi
fi

# Check if deprecated HBASE_HOME is set, and specify config path to HBASE_CONF_DIR if it's empty.
if [ -z "$HBASE_CONF_DIR" ]; then
    if [ -n "$HBASE_HOME" ]; then
        # HBASE_HOME is set.
        if [ -d "$HBASE_HOME/conf" ]; then
            HBASE_CONF_DIR="$HBASE_HOME/conf"
        fi
    fi
fi

# try and set HBASE_CONF_DIR to some common default if it's not set
if [ -z "$HBASE_CONF_DIR" ]; then
    if [ -d "/etc/hbase/conf" ]; then
        echo "Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set."
        HBASE_CONF_DIR="/etc/hbase/conf"
    fi
fi

INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"

if [ -n "${HBASE_CONF_DIR}" ]; then
    INTERNAL_HADOOP_CLASSPATHS="${INTERNAL_HADOOP_CLASSPATHS}:${HBASE_CONF_DIR}"
fi

# Auxiliary function which extracts the name of host from a line which
# also potentially includes topology information and the taskManager type
extractHostName() {
    # handle comments: extract first part of string (before first # character)
    WORKER=`echo $1 | cut -d'#' -f 1`

    # Extract the hostname from the network hierarchy
    if [[ "$WORKER" =~ ^.*/([0-9a-zA-Z.-]+)$ ]]; then
        WORKER=${BASH_REMATCH[1]}
    fi

    echo $WORKER
}

readMasters() {
    MASTERS_FILE="${FLINK_CONF_DIR}/masters"

    if [[ ! -f "${MASTERS_FILE}" ]]; then
        echo "No masters file. Please specify masters in 'conf/masters'."
        exit 1
    fi

    MASTERS=()
    WEBUIPORTS=()

    MASTERS_ALL_LOCALHOST=true
    GOON=true
    while $GOON; do
        read line || GOON=false
        HOSTWEBUIPORT=$( extractHostName $line)

        if [ -n "$HOSTWEBUIPORT" ]; then
            HOST=$(echo $HOSTWEBUIPORT | cut -f1 -d:)
            WEBUIPORT=$(echo $HOSTWEBUIPORT | cut -s -f2 -d:)
            MASTERS+=(${HOST})

            if [ -z "$WEBUIPORT" ]; then
                WEBUIPORTS+=(0)
            else
                WEBUIPORTS+=(${WEBUIPORT})
            fi

            if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
                MASTERS_ALL_LOCALHOST=false
            fi
        fi
    done < "$MASTERS_FILE"
}

# 12. Iterate over the nodes listed in the workers (formerly slaves) file
readWorkers() {
    WORKERS_FILE="${FLINK_CONF_DIR}/workers"

    if [[ ! -f "$WORKERS_FILE" ]]; then
        echo "No workers file. Please specify workers in 'conf/workers'."
        exit 1
    fi

    WORKERS=()

    WORKERS_ALL_LOCALHOST=true
    GOON=true
    while $GOON; do
        read line || GOON=false
        HOST=$( extractHostName $line)
        if [ -n "$HOST" ] ; then
            WORKERS+=(${HOST})
            if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
                WORKERS_ALL_LOCALHOST=false
            fi
        fi
    done < "$WORKERS_FILE"
}

# starts or stops TMs on all workers
# TMWorkers start|stop
# 10. Start (or stop) all TaskManagers
TMWorkers() {
    CMD=$1

    # 11. Read the workers config file
    readWorkers

    # 13. All workers are local: start directly on this machine
    if [ ${WORKERS_ALL_LOCALHOST} = true ] ; then
        # all-local setup
        for worker in ${WORKERS[@]}; do
            "${FLINK_BIN_DIR}"/taskmanager.sh "${CMD}"
        done
    else
        # 14. Remote startup
        # non-local setup
        # start/stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available
        command -v pdsh >/dev/null 2>&1
        if [[ $? -ne 0 ]]; then
            for worker in ${WORKERS[@]}; do
                ssh -n $FLINK_SSH_OPTS $worker -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &"
            done
        else
            PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${WORKERS[*]}") \
                "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\""
        fi
    fi
}

runBashJavaUtilsCmd() {
    local cmd=$1
    local conf_dir=$2
    local class_path=$3
    local dynamic_args=${@:4}
    class_path=`manglePathList "${class_path}"`

    local output=`"${JAVA_RUN}" -classpath "${class_path}" org.apache.flink.runtime.util.bash.BashJavaUtils ${cmd} --configDir "${conf_dir}" $dynamic_args 2>&1 | tail -n 1000`
    if [[ $? -ne 0 ]]; then
        echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}." 1>&2
        # Print the output in case the user redirect the log to console.
        echo "$output" 1>&2
        exit 1
    fi

    echo "$output"
}

extractExecutionResults() {
    local output="$1"
    local expected_lines="$2"
    local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"
    local execution_results
    local num_lines

    execution_results=$(echo "${output}" | grep ${EXECUTION_PREFIX})
    num_lines=$(echo "${execution_results}" | wc -l)
    # explicit check for empty result, because if execution_results is empty, then wc returns 1
    if [[ -z ${execution_results} ]]; then
        echo "[ERROR] The execution result is empty." 1>&2
        exit 1
    fi
    if [[ ${num_lines} -ne ${expected_lines} ]]; then
        echo "[ERROR] The execution results has unexpected number of lines, expected: ${expected_lines}, actual: ${num_lines}." 1>&2
        echo "[ERROR] An execution result line is expected following the prefix '${EXECUTION_PREFIX}'" 1>&2
        echo "$output" 1>&2
        exit 1
    fi

    echo "${execution_results//${EXECUTION_PREFIX}/}"
}

extractLoggingOutputs() {
    local output="$1"
    local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"

    echo "${output}" | grep -v ${EXECUTION_PREFIX}
}

parseResourceParamsAndExportLogs() {
  local cmd=$1
  java_utils_output=$(runBashJavaUtilsCmd ${cmd} "${FLINK_CONF_DIR}" "${FLINK_BIN_DIR}/bash-java-utils.jar:$(findFlinkDistJar)" "${@:2}")
  logging_output=$(extractLoggingOutputs "${java_utils_output}")
  params_output=$(extractExecutionResults "${java_utils_output}" 2)

  if [[ $? -ne 0 ]]; then
    echo "[ERROR] Could not get JVM parameters and dynamic configurations properly."
    echo "[ERROR] Raw output from BashJavaUtils:"
    echo "$java_utils_output"
    exit 1
  fi

  jvm_params=$(echo "${params_output}" | head -n1)
  export JVM_ARGS="${JVM_ARGS} ${jvm_params}"
  export DYNAMIC_PARAMETERS=$(IFS=" " echo "${params_output}" | tail -n1)
  export FLINK_INHERITED_LOGS="
$FLINK_INHERITED_LOGS

RESOURCE_PARAMS extraction logs:
jvm_params: $jvm_params
dynamic_configs: $DYNAMIC_PARAMETERS
logs: $logging_output
"
}

parseJmArgsAndExportLogs() {
  parseResourceParamsAndExportLogs GET_JM_RESOURCE_PARAMS "$@"
}

parseTmArgsAndExportLogs() {
  parseResourceParamsAndExportLogs GET_TM_RESOURCE_PARAMS "$@"
}
The key functions here are TMWorkers (starts the TaskManagers), readMasters (reads the masters configuration), and readWorkers (reads the worker configuration, fed by a conf/workers file like the sketch below); the script also defines a number of environment variables.
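For completeness, a hypothetical conf/workers file that readWorkers would consume (hostnames are illustrative; all-localhost entries keep WORKERS_ALL_LOCALHOST=true):

# conf/workers: one TaskManager host per line; '#' starts a comment
worker-node-1
worker-node-2
worker-node-3   # a third TaskManager host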
3. jobmanager.sh
# Start/stop a Flink JobManager.
USAGE="Usage: jobmanager.sh ((start|start-foreground) [args])|stop|stop-all"

STARTSTOP=$1

if [ -z $2 ] || [[ $2 == "-D" ]]; then
    # start [-D ...] args
    args=("${@:2}")
elif [ -z $3 ] || [[ $3 == "-D" ]]; then
    # legacy path: start <host> [-D ...]
    HOST=$2
    args=("${@:3}")
else
    # legacy path: start <host> <port> [-D ...]
    HOST=$2
    WEBUIPORT=$3
    args=("${@:4}")
fi

# 16. Validate the arguments
if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
  echo $USAGE
  exit 1
fi

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

# 17. Load config.sh
. "$bin"/config.sh

# 18. The entrypoint the JobManager is started with; this is important later
ENTRYPOINT=standalonesession

if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
    # Add JobManager-specific JVM options
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
    parseJmArgsAndExportLogs "${ARGS[@]}"

    args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster" "${args[@]}")
    if [ ! -z $HOST ]; then
        args+=("--host")
        args+=("${HOST}")
    fi

    if [ ! -z $WEBUIPORT ]; then
        args+=("--webui-port")
        args+=("${WEBUIPORT}")
    fi

    if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
        args=(${DYNAMIC_PARAMETERS[@]} "${args[@]}")
    fi
fi

if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}"
else
    # 19. Start the standalonesession entrypoint via the flink-daemon.sh script
    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
fi
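Given the argument parsing above, all three call shapes below are accepted and end up starting the standalonesession entrypoint; the host and port values are only examples:

./bin/jobmanager.sh start                               # plain start
./bin/jobmanager.sh start master-node-1 8081            # legacy path: host and web UI port
./bin/jobmanager.sh start -D jobmanager.rpc.port=6124   # start with dynamic -D options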
4. taskmanager.sh
# Start/stop a Flink TaskManager.
USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"

STARTSTOP=$1

ARGS=("${@:2}")

if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
  echo $USAGE
  exit 1
fi

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

# 20. Define the entrypoint variable: taskexecutor
ENTRYPOINT=taskexecutor

if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then

    # if no other JVM options are set, set the GC to G1
    if [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
        export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
    fi

    # Add TaskManager-specific JVM options
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"

    # Startup parameters
    parseTmArgsAndExportLogs "${ARGS[@]}"
    if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
        ARGS=(${DYNAMIC_PARAMETERS[@]} "${ARGS[@]}")
    fi
    ARGS=("--configDir" "${FLINK_CONF_DIR}" "${ARGS[@]}")
fi

if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
else
    if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
        # Start a single TaskManager
        "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
    else
        # Example output from `numactl --show` on an AWS c4.8xlarge:
        # policy: default
        # preferred node: current
        # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
        # cpubind: 0 1
        # nodebind: 0 1
        # membind: 0 1
        read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
        for NODE_ID in "${NODE_LIST[@]:1}"; do
            # Start a TaskManager for each NUMA node
            numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
        done
    fi
fi
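Unlike jobmanager.sh, taskmanager.sh takes no host argument; a typical manual run on a worker node would be:

./bin/taskmanager.sh start              # daemonized via flink-daemon.sh
./bin/taskmanager.sh start-foreground   # stays in the foreground via flink-console.sh
./bin/taskmanager.sh stop               # stops the most recently started TaskManager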
5. flink-daemon.sh
# Start/stop a Flink daemon.
USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|sql-gateway) [args]"

STARTSTOP=$1
DAEMON=$2
ARGS=("${@:3}") # get remaining arguments as array

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

# Load the configuration
. "$bin"/config.sh

case $DAEMON in
    (taskexecutor)
        # 21. The class the TaskExecutor is started with: TaskManagerRunner
        CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
    ;;

    (zookeeper)
        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
    ;;

    (historyserver)
        CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
    ;;

    (standalonesession)
        # 22. The JobManager startup class: StandaloneSessionClusterEntrypoint
        CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
    ;;

    (standalonejob)
        CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
    ;;

    (sql-gateway)
        CLASS_TO_RUN=org.apache.flink.table.gateway.SqlGateway
        SQL_GATEWAY_CLASSPATH="`findSqlGatewayJar`":"`findFlinkPythonJar`"
    ;;

    (*)
        echo "Unknown daemon '${DAEMON}'. $USAGE."
        exit 1
    ;;
esac

if [ "$FLINK_IDENT_STRING" = "" ]; then
    FLINK_IDENT_STRING="$USER"
fi

FLINK_TM_CLASSPATH=`constructFlinkClassPath`

pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-$DAEMON.pid

mkdir -p "$FLINK_PID_DIR"

# Log files for daemons are indexed from the process ID's position in the PID
# file. The following lock prevents a race condition during daemon startup
# when multiple daemons read, index, and write to the PID file concurrently.
# The lock is created on the PID directory since a lock file cannot be safely
# removed. The daemon is started with the lock closed and the lock remains
# active in this script until the script exits.
command -v flock >/dev/null 2>&1
if [[ $? -eq 0 ]]; then
    exec 200<"$FLINK_PID_DIR"
    flock 200
fi

# Ascending ID depending on number of lines in pid file.
# This allows us to start multiple daemon of each type.
id=$([ -f "$pid" ] && echo $(wc -l < "$pid") || echo "0")

FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-${DAEMON}-${id}-${HOSTNAME}"
log="${FLINK_LOG_PREFIX}.log"
out="${FLINK_LOG_PREFIX}.out"

log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml")

function guaranteed_kill {
  to_stop_pid=$1
  daemon=$2

  # send sigterm for graceful shutdown
  kill $to_stop_pid
  # if timeout exists, use it
  if command -v timeout &> /dev/null ; then
    # wait 10 seconds for process to stop. By default, Flink kills the JVM 5 seconds after sigterm.
    timeout 10 tail --pid=$to_stop_pid -f /dev/null &> /dev/null
    if [ "$?" -eq 124 ]; then
      echo "Daemon $daemon didn't stop within 10 seconds. Killing it."
      # send sigkill
      kill -9 $to_stop_pid
    fi
  fi
}

case $STARTSTOP in

    (start)
        # Print a warning if daemons are already running on host
        if [ -f "$pid" ]; then
          active=()
          while IFS='' read -r p || [[ -n "$p" ]]; do
            kill -0 $p >/dev/null 2>&1
            if [ $? -eq 0 ]; then
              active+=($p)
            fi
          done < "${pid}"

          count="${#active[@]}"

          if [ ${count} -gt 0 ]; then
            echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
          fi
        fi

        # Evaluate user options for local variable expansion
        FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})

        echo "Starting $DAEMON daemon on host $HOSTNAME."
        "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$SQL_GATEWAY_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null &

        mypid=$!

        # Add to pid file if successful start
        if [[ ${mypid} =~ ${IS_NUMBER} ]] && kill -0 $mypid > /dev/null 2>&1 ; then
            echo $mypid >> "$pid"
        else
            echo "Error starting $DAEMON daemon."
            exit 1
        fi
    ;;

    (stop)
        if [ -f "$pid" ]; then
            # Remove last in pid file
            to_stop=$(tail -n 1 "$pid")

            if [ -z $to_stop ]; then
                rm "$pid" # If all stopped, clean up pid file
                echo "No $DAEMON daemon to stop on host $HOSTNAME."
            else
                sed \$d "$pid" > "$pid.tmp" # all but last line

                # If all stopped, clean up pid file
                [ $(wc -l < "$pid.tmp") -eq 0 ] && rm "$pid" "$pid.tmp" || mv "$pid.tmp" "$pid"

                if kill -0 $to_stop > /dev/null 2>&1; then
                    echo "Stopping $DAEMON daemon (pid: $to_stop) on host $HOSTNAME."
                    guaranteed_kill $to_stop $DAEMON
                else
                    echo "No $DAEMON daemon (pid: $to_stop) is running anymore on $HOSTNAME."
                fi
            fi
        else
            echo "No $DAEMON daemon to stop on host $HOSTNAME."
        fi
    ;;

    (stop-all)
        if [ -f "$pid" ]; then
            mv "$pid" "${pid}.tmp"

            while read to_stop; do
                if kill -0 $to_stop > /dev/null 2>&1; then
                    echo "Stopping $DAEMON daemon (pid: $to_stop) on host $HOSTNAME."
                    guaranteed_kill $to_stop $DAEMON
                else
                    echo "Skipping $DAEMON daemon (pid: $to_stop), because it is not running anymore on $HOSTNAME."
                fi
            done < "${pid}.tmp"
            rm "${pid}.tmp"
        fi
    ;;

    (*)
        echo "Unexpected argument '$STARTSTOP'. $USAGE."
        exit 1
    ;;

esac
6. Summary
Starting the cluster loads config.sh, which reads the masters and workers (formerly slaves) configuration. Then:

- jobmanager.sh starts the JobManager
- taskmanager.sh starts the TaskManager

Both launch the JVM process through the flink-daemon.sh script, and reading flink-daemon.sh shows that:

- the JobManager's startup class is StandaloneSessionClusterEntrypoint
- the TaskManager's startup class is TaskManagerRunner
If you found this interesting, a like and a follow would be much appreciated. Thank you!