关于kafka:KafkaKafkaServerstartsh-启动脚本分析Ver-272

52次阅读

共计 36973 个字符,预计需要花费 93 分钟才能阅读完成。

Kafka-Server-start.sh

  
if [$# -lt 1];  
then  
 # 提醒命令应用办法
 echo "USAGE: $0 [-daemon] server.properties [--override property=value]*" exit 1  
fi  
base_dir=$(dirname $0)  
  
if ["x$KAFKA_LOG4J_OPTS" = "x"]; then  
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"  
fi  
  

if ["x$KAFKA_HEAP_OPTS" = "x"]; then  
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"  
fi  
  
EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}  
  
COMMAND=$1  
case $COMMAND in  
  -daemon)  
    EXTRA_ARGS="-daemon"$EXTRA_ARGS  
    shift  
    ;;  
  *)  
    ;;  
esac  
  
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
  1. 判断参数有没有,参数个数小于 1 就提醒用法;
  2. 获取脚本以后门路赋值给变量 base_dir;
  3. 判断日志参数 KAFKA_LOG4J_OPTS 是否为空,为空就给它一个值;
  4. 判断堆参数 KAFKA_HEAP_OPTS是否为空,为空就默认给它赋值为 “-Xmx1G -Xms1G”,默认的堆空间指定为 1G;
  5. 判断启动命令中第一个参数是否为 -daemon,如果是就以 守护过程启动(其实不是,是赋给另一个变量 EXTRA_ARGS);
  6. 执行命令。

最初一个脚本是执行另一个脚本:kafka-run-class.sh,这个脚本的内容比较复杂了。

kafka-run-class.sh

#!/bin/bash
if [$# -lt 1];
then
  echo "USAGE: $0 [-daemon] [-name servicename] [-loggc] classname [opts]"
  exit 1
fi

# CYGWIN == 1 if Cygwin is detected, else 0.
if [[$(uname -a) =~ "CYGWIN" ]]; then
  CYGWIN=1
else
  CYGWIN=0
fi

if [-z "$INCLUDE_TEST_JARS"]; then
  INCLUDE_TEST_JARS=false
fi

# Exclude jars not necessary for running commands.
regex="(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc)$"
should_include_file() {if [ "$INCLUDE_TEST_JARS" = true]; then
    return 0
  fi
  file=$1
  if [-z "$(echo"$file"| egrep"$regex")" ] ; then
    return 0
  else
    return 1
  fi
}

base_dir=$(dirname $0)/..

if [-z "$SCALA_VERSION"]; then
  SCALA_VERSION=2.13.3
  if [[-f "$base_dir/gradle.properties"]]; then
    SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2`
  fi
fi

if [-z "$SCALA_BINARY_VERSION"]; then
  SCALA_BINARY_VERSION=$(echo $SCALA_VERSION | cut -f 1-2 -d '.')
fi

# run ./gradlew copyDependantLibs to get all dependant jars in a local dir
shopt -s nullglob
if [-z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION"]; then
  for dir in "$base_dir"/core/build/dependant-libs-${SCALA_VERSION}*;
  do
    CLASSPATH="$CLASSPATH:$dir/*"
  done
fi

for file in "$base_dir"/examples/build/libs/kafka-examples*.jar;
do
  if should_include_file "$file"; then
    CLASSPATH="$CLASSPATH":"$file"
  fi
done

if [-z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION"]; then
  clients_lib_dir=$(dirname $0)/../clients/build/libs
  streams_lib_dir=$(dirname $0)/../streams/build/libs
  streams_dependant_clients_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}
else
  clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
  streams_lib_dir=$clients_lib_dir
  streams_dependant_clients_lib_dir=$streams_lib_dir
fi


for file in "$clients_lib_dir"/kafka-clients*.jar;
do
  if should_include_file "$file"; then
    CLASSPATH="$CLASSPATH":"$file"
  fi
done

for file in "$streams_lib_dir"/kafka-streams*.jar;
do
  if should_include_file "$file"; then
    CLASSPATH="$CLASSPATH":"$file"
  fi
done

if [-z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION"]; then
  for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
  do
    if should_include_file "$file"; then
      CLASSPATH="$CLASSPATH":"$file"
    fi
  done
else
  VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'`
  SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, ie, bug-fix number
  for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;
  do
    if should_include_file "$file"; then
      CLASSPATH="$file":"$CLASSPATH"
    fi
  done
  if ["$SHORT_VERSION_NO_DOTS" = "0100"]; then
    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.8.jar":"$CLASSPATH"
    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.6.jar":"$CLASSPATH"
  fi
  if ["$SHORT_VERSION_NO_DOTS" = "0101"]; then
    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.9.jar":"$CLASSPATH"
    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.8.jar":"$CLASSPATH"
  fi
fi

for file in "$streams_dependant_clients_lib_dir"/rocksdb*.jar;
do
  CLASSPATH="$CLASSPATH":"$file"
done

for file in "$streams_dependant_clients_lib_dir"/*hamcrest*.jar;
do
  CLASSPATH="$CLASSPATH":"$file"
done

for file in "$base_dir"/tools/build/libs/kafka-tools*.jar;
do
  if should_include_file "$file"; then
    CLASSPATH="$CLASSPATH":"$file"
  fi
done

for dir in "$base_dir"/tools/build/dependant-libs-${SCALA_VERSION}*;
do
  CLASSPATH="$CLASSPATH:$dir/*"
done

for cc_pkg in "api" "transforms" "runtime" "file" "mirror" "mirror-client" "json" "tools" "basic-auth-extension"
do
  for file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;
  do
    if should_include_file "$file"; then
      CLASSPATH="$CLASSPATH":"$file"
    fi
  done
  if [-d "$base_dir/connect/${cc_pkg}/build/dependant-libs" ] ; then
    CLASSPATH="$CLASSPATH:$base_dir/connect/${cc_pkg}/build/dependant-libs/*"
  fi
done

# classpath addition for release
for file in "$base_dir"/libs/*;
do
  if should_include_file "$file"; then
    CLASSPATH="$CLASSPATH":"$file"
  fi
done

for file in "$base_dir"/core/build/libs/kafka_${SCALA_BINARY_VERSION}*.jar;
do
  if should_include_file "$file"; then
    CLASSPATH="$CLASSPATH":"$file"
  fi
done
shopt -u nullglob

if [-z "$CLASSPATH"] ; then
  echo "Classpath is empty. Please build the project first e.g. by running'./gradlew jar -PscalaVersion=$SCALA_VERSION'"
  exit 1
fi

# JMX settings
if [-z "$KAFKA_JMX_OPTS"]; then
  KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false"
fi

# JMX port to use
if [$JMX_PORT]; then
  KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT"
fi

# Log directory to use
if ["x$LOG_DIR" = "x"]; then
  LOG_DIR="$base_dir/logs"
fi

# Log4j settings
if [-z "$KAFKA_LOG4J_OPTS"]; then
  # Log to console. This is a tool.
  LOG4J_DIR="$base_dir/config/tools-log4j.properties"
  # If Cygwin is detected, LOG4J_DIR is converted to Windows format.
  ((CYGWIN)) && LOG4J_DIR=$(cygpath --path --mixed "${LOG4J_DIR}")
  KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_DIR}"
else
  # create logs directory
  if [! -d "$LOG_DIR"]; then
    mkdir -p "$LOG_DIR"
  fi
fi

# If Cygwin is detected, LOG_DIR is converted to Windows format.
((CYGWIN)) && LOG_DIR=$(cygpath --path --mixed "${LOG_DIR}")
KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"

# Generic jvm settings you want to add
if [-z "$KAFKA_OPTS"]; then
  KAFKA_OPTS=""
fi

# Set Debug options if enabled
if ["x$KAFKA_DEBUG" != "x"]; then

    # Use default ports
    DEFAULT_JAVA_DEBUG_PORT="5005"

    if [-z "$JAVA_DEBUG_PORT"]; then
        JAVA_DEBUG_PORT="$DEFAULT_JAVA_DEBUG_PORT"
    fi

    # Use the defaults if JAVA_DEBUG_OPTS was not set
    DEFAULT_JAVA_DEBUG_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=${DEBUG_SUSPEND_FLAG:-n},address=$JAVA_DEBUG_PORT"
    if [-z "$JAVA_DEBUG_OPTS"]; then
        JAVA_DEBUG_OPTS="$DEFAULT_JAVA_DEBUG_OPTS"
    fi

    echo "Enabling Java debug options: $JAVA_DEBUG_OPTS"
    KAFKA_OPTS="$JAVA_DEBUG_OPTS $KAFKA_OPTS"
fi

# Which java to use
if [-z "$JAVA_HOME"]; then
  JAVA="java"
else
  JAVA="$JAVA_HOME/bin/java"
fi

# Memory options
if [-z "$KAFKA_HEAP_OPTS"]; then
  KAFKA_HEAP_OPTS="-Xmx256M"
fi

# JVM performance options
# MaxInlineLevel=15 is the default since JDK 14 and can be removed once older JDKs are no longer supported
if [-z "$KAFKA_JVM_PERFORMANCE_OPTS"]; then
  KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true"
fi

while [$# -gt 0]; do
  COMMAND=$1
  case $COMMAND in
    -name)
      DAEMON_NAME=$2
      CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.out
      shift 2
      ;;
    -loggc)
      if [-z "$KAFKA_GC_LOG_OPTS"]; then
        GC_LOG_ENABLED="true"
      fi
      shift
      ;;
    -daemon)
      DAEMON_MODE="true"
      shift
      ;;
    *)
      break
      ;;
  esac
done

# GC options    
GC_FILE_SUFFIX='-gc.log'
GC_LOG_FILE_NAME=''if ["x$GC_LOG_ENABLED"="xtrue"]; then
  GC_LOG_FILE_NAME=$DAEMON_NAME$GC_FILE_SUFFIX

  # The first segment of the version number, which is '1' for releases before Java 9
  # it then becomes '9', '10', ...
  # Some examples of the first line of `java --version`:
  # 8 -> java version "1.8.0_152"
  # 9.0.4 -> java version "9.0.4"
  # 10 -> java version "10" 2018-03-20
  # 10.0.1 -> java version "10.0.1" 2018-04-17
  # We need to match to the end of the line to prevent sed from printing the characters that do not match
  JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -E -n 's/.* version"([0-9]*).*$/\1/p')
  if [["$JAVA_MAJOR_VERSION" -ge "9"]] ; then
    KAFKA_GC_LOG_OPTS="-Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=100M"
  else
    KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
  fi
fi

# Remove a possible colon prefix from the classpath (happens at lines like `CLASSPATH="$CLASSPATH:$file"` when CLASSPATH is blank)
# Syntax used on the right side is native Bash string manipulation; for more details see
# http://tldp.org/LDP/abs/html/string-manipulation.html, specifically the section titled "Substring Removal"
CLASSPATH=${CLASSPATH#:}

# If Cygwin is detected, classpath is converted to Windows format.
((CYGWIN)) && CLASSPATH=$(cygpath --path --mixed "${CLASSPATH}")

# Launch mode
if ["x$DAEMON_MODE" = "xtrue"]; then
  nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
  exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@"
fi

脚本内容很长,然而实际上只有最初一部分才是真正在实现启动操作:

# Launch mode
if ["x$DAEMON_MODE" = "xtrue"]; then
  nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
  exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@"
fi

Launch modes

在脚本最初一段是无关启动形式的提醒。

# Launch mode
if ["x$DAEMON_MODE" = "xtrue"]; then
  nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
  exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@"
fi

这段脚本阐明了之前的一大堆脚本都是为了这里启动赋值进行的一系列操作,这里依据传递参数判断是否守护过程的形式启动。这里以应用比拟多的 守护过程 启动形式进行参数介绍(实际上两者差异不算很大)。

KAFKA_HEAP_OPTS

KAFKA_HEAP_OPTS 出自最结尾,判断堆参数 KAFKA_HEAP_OPTS是否为空,为空就默认给它赋值为 “-Xmx1G -Xms1G”。

KAFKA_JVM_PERFORMANCE_OPTS

这个值代表了 JVM 的启动参数。

# JVM performance options
# MaxInlineLevel=15 is the default since JDK 14 and can be removed once older JDKs are no longer supported
# MaxInlineLevel=15 是自 JDK 14 以来的默认值,一旦旧的 JDK 不再反对,就能够删除。if [-z "$KAFKA_JVM_PERFORMANCE_OPTS"]; then
  KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true"
fi

while [$# -gt 0]; do
  COMMAND=$1
  case $COMMAND in
    -name)
      DAEMON_NAME=$2
      CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.out
      shift 2
      ;;
    -loggc)
      if [-z "$KAFKA_GC_LOG_OPTS"]; then
        GC_LOG_ENABLED="true"
      fi
      shift
      ;;
    -daemon)
      DAEMON_MODE="true"
      shift
      ;;
    *)
      break
      ;;
  esac
done

G1 垃圾收集器

Kafka 默认应用 G1 的垃圾收集器,自身最低 JDK 版本要求就是 JDK1.8。

-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true

MaxGCPauseMillis

-XX:MaxGCPauseMillis:GC 最大的进展毫秒数,暂停工夫默认值 200ms,如果设置比这个小的值,G1 收集器会尽可能的达到这个预期设置。

因为 Kafka 是十分激进的高并发分布式音讯队列,为了获取更高的并发,应用 20ms 的极限值值尽可能的缩小 GC 工夫,最初用极短 GC 的代价换取高吞吐,当然后果会导致垃圾回收不洁净。

但 Kafka 对于 JVM 自身的堆内存占用并不是很多,默认 20ms 的进展工夫其实是能够放心使用的。

此外从 Kafka 的设计来看,更频繁的 GC 是为了尽可能的触发 Full Gc,因为 Full Gc 是回收 Direct Memory 的条件,而 Kafka 大量应用了页缓存进步数据的 Log 的读写速度,底层用的也是 Java 的 Direct Memory。

InitiatingHeapOccupancyPercent

这个参数实际上出入比拟大,依据源码剖析在 JDK8b12 版本之后,以及 JDK11 之前这个参数和官网的文档形容,这个值的含意是合乎“整堆”来计算是否触发 Mixed Gc,然而JDK8b12 版本之后更高的补丁,以及 JDK11 之后就变了,它变成依据 老年代占整堆的比重

这样的出入问题源自此参数的 源码 BUG,这部分波及源码的探讨就不探讨了,具体能够看对于 G1 收集器参数 InitiatingHeapOccupancyPercent 的正确认知 – 豆大侠的菜地 (doudaxia.club)这篇大佬的文章剖析。

这里间接给出论断:

  • 如果你应用的 JDK 版本在 8b12 之前,XX:InitiatingHeapOccupancyPercent 是 整个堆 使用量与 堆总体容量的 比值;
  • 如果你应用的 JDK 版本在 8b12 之后(包含大版本 9、10、11….),那么 XX:InitiatingHeapOccupancyPercent老年代大小与堆总体容量的比值 这种说法和批改之后的 JVM 源码合乎。

整体算是一个暗藏已久的 BUG,因为 G1 的垃圾收集器设计角度看,它更关怀的是 Old Region 占满整个堆空间之前提前尽可能的进行回收,而不是简略的看看残余空间在整个堆空间的占比,因为残余空间不是一个非常牢靠的掂量值。

为了验证上文大佬的说法,集体也去参阅 JDK8 的 Oracle 文档:java (oracle.com)

-XX:c=percent
    Sets the percentage of the heap occupancy (0 to 100) at which to start a concurrent GC cycle. It is used by garbage collectors that trigger a concurrent GC cycle based on the occupancy of the entire heap, not just one of the generations (for example, the G1 garbage collector).

    By default, the initiating value is set to 45%. A value of 0 implies nonstop GC cycles. The following example shows how to set the initiating heap occupancy to 75%:

    -XX:InitiatingHeapOccupancyPercent=75

关键字 entire heap,也就是简略的残余空间和整堆的占比。这里同样接着翻阅了一下,直到 JDK12 版本,这个形容还是和 JDK8 的版本统一的。直到浏览长期反对的 JDK17 的文档,发现外面的说法终于变了:

Garbage-First (G1) Garbage Collector (oracle.com

XX:InitiatingHeapOccupancyPercent determines the initial value as a percentage of the size of the current old generation as long as there aren’t enough observations to make a good prediction of the Initiating Heap Occupancy threshold. Turn off this behavior of G1 using the option-XX:-G1UseAdaptiveIHOP. In this case, the value of -XX:InitiatingHeapOccupancyPercent always determines this threshold.。
“XX:启动堆占用百分比”将初始值确定为 以后老一代大小的百分比 ,只有没有足够的观测值来很好地预测起始堆占用阈值。
应用选项 ’-XX:-G1UseAdaptiveIHOP‘ 敞开 G1 的此行为。在这种状况下-XX:InitiatingHeapOccupancyPercent 启动堆占用百分比 ’ 的值始终确定此阈值。

所以这个值的实在含意和应用的 JDK 版本无关,并且 JDK8 的后续补丁版本也修复了这个问题,所以最终倡议是降级 JDK8 的补丁版本,或者应用 JDK11 之后的版本。

-XX:+ExplicitGCInvokesConcurrent

看似简略的参数,实际上又是暗藏这十分多的“坑”和细节,这里咱们划分更多的大节缓缓细品。

简略了解

这个参数是指通过应用 System.gc() 申请 启用并发 GC 的调用 默认禁用。如果没有非凡的利用场景,大部分状况下这个参数都是被倡议禁用的,而并发 GC 实际上就是 CMS 的并发回收解决。

集体在官网文档中搜到相似的参数形容:Garbage-First Garbage Collector Tuning (oracle.com)。

Other causes than Allocation Failure for a Full GC typically indicate that either the application or some external tool causes a full heap collection. If the cause is , and there is no way to modify the application sources, the effect of Full GCs can be mitigated by using or let the VM completely ignore them by setting . External tools may still force Full GCs; they can be removed only by not requesting them.System.gc()-XX:+ExplicitGCInvokesConcurrent -XX:+DisableExplicitGC

下面一大段的话粗心指的是:阻止内部调用 Full GC(也就是 System.gc())要么间接设置-XX:+DisableExplicitGC,要么设置-XX:+ExplicitGCInvokesConcurrent 进步强制 Full Gc 的效率,浏览源码发现这两个参 数不能一起开启 ,因为-XX:+ExplicitGCInvokesConcurrent 须要敞开 -XX:+DisableExplicitGC 参数能力失效。

局部文章也解释仅仅倡议在 G1 的垃圾收集器中能够应用-XX:+ExplicitGCInvokesConcurrent。其余垃圾收集器不倡议应用。

Kafka 官网修复 BUG:-XX:+DisableExplicitGC 改为 -XX:+ExplicitGCInvokesConcurrent

为什么两者只能选其一应用,JDK 8 的 JVM 中存在相似的代码能够给予解释。

bool GenCollectedHeap::should_do_concurrent_full_gc(GCCause::Cause cause) {
    // 查看参数 -XX:+DisableExplicitGC 和 -XX:+ExplicitGCInvokesConcurrent
  return UseConcMarkSweepGC &&
         ((cause == GCCause::_gc_locker && GCLockerInvokesConcurrent) ||
         // -XX:+ExplicitGCInvokesConcurrent 须要满足不配置 -XX:+DisableExplicitGC 的条件,能力断定为 true
          (cause == GCCause::_java_lang_system_gc && ExplicitGCInvokesConcurrent));
}

void GenCollectedHeap::collect(GCCause::Cause cause) {
    // 查看参数 -XX:+DisableExplicitGC 和 -XX:+ExplicitGCInvokesConcurrent
  if (should_do_concurrent_full_gc(cause)) {
#ifndef SERIALGC
    // mostly concurrent full collection
    collect_mostly_concurrent(cause);
#else  // SERIALGC
    ShouldNotReachHere();
#endif // SERIALGC
  } else {
#ifdef ASSERT
    if (cause == GCCause::_scavenge_alot) {
      // minor collection only
      collect(cause, 0);
    } else {
      // Stop-the-world full collection
      // STW 进行 Full Gc
      collect(cause, n_gens() - 1);
    }
#else
    // Stop-the-world full collection
    collect(cause, n_gens() - 1);
#endif
  }
}

collect 里一结尾就有个判断,如果 should_do_concurrent_full_gc 返回 true,那会执行 collect_mostly_concurrent 做并行的回收。

回到 Kafka 的服务端参数,KafKa 最后的服务端启动脚本中,此参数理论为 -XX:+DisableExplicitGC,然而后续被指出会影响间接内存的回收性能,并且很可能会导致 间接内存无奈被回收!

为什么会有这么重大 ? 这里先不急着剖析,而是先看看作者的这个 issue 的提交:

KAFKA-5470: Replace -XX:+DisableExplicitGC with -XX:+ExplicitGCInvokesConcurrent in kafka-run-class by ijuma · Pull Request #3371 · apache/kafka (github.com)

提交者的原话是:

This is important because Bits.reserveMemory calls System.gc() hoping to free native
memory in order to avoid throwing an OutOfMemoryException. This call is currently
a no-op due to -XX:+DisableExplicitGC.

It’s worth mentioning that -XX:MaxDirectMemorySize can be used to increase the
amount of native memory available for allocation of direct byte buffers.

简略来说就是 Bits.reserveMemory 外面会有 System.gc() 调用,通过程序强制调用 Full Gc 来回收掉 native 内存,所以倡议在 JVM 参数中删掉 -XX:+DisableExplicitGC,开启System.gc(); 并且通过增加 -XX:+ExplicitGCInvokesConcurrentSystem.gc()调用效率更高一些。

另外大佬这里还提了一嘴 -XX:MaxDirectMemorySize 能够用来进步可用于调配间接字节缓冲区的本地内存的数量。
大佬一句话就是一个知识点,牛呀。

Bits#reserveMemory

既然提交者提到了 Bits#reserveMemory,这里就顺带贴一下官网 jdk8 的java.nio.Bits#reserveMemory 源码不便了解:

// These methods should be called whenever direct memory is allocated or
// freed.  They allow the user to control the amount of direct memory
// which a process may access.  All sizes are specified in bytes.

// 每当调配间接内存或开释。它们容许用户管制间接内存的数量过程能够拜访的内容。所有大小均以字节为单位指定。static void reserveMemory(long size, int cap) {if (!memoryLimitSet && VM.isBooted()) {maxMemory = VM.maxDirectMemory();
        memoryLimitSet = true;
    }

    // optimist!
    if (tryReserveMemory(size, cap)) {return;}

    final JavaLangRefAccess jlra = SharedSecrets.getJavaLangRefAccess();

    // retry while helping enqueue pending Reference objects
    // which includes executing pending Cleaner(s) which includes
    // Cleaner(s) that free direct buffer memory
    while (jlra.tryHandlePendingReference()) {if (tryReserveMemory(size, cap)) {return;}
    }

    // trigger VM's Reference processing
    System.gc();

    // a retry loop with exponential back-off delays
    // (this gives VM some time to do it's job)
    boolean interrupted = false;
    try {
        long sleepTime = 1;
        int sleeps = 0;
        while (true) {if (tryReserveMemory(size, cap)) {return;}
            if (sleeps >= MAX_SLEEPS) {break;}
            if (!jlra.tryHandlePendingReference()) {
                try {Thread.sleep(sleepTime);
                    sleepTime <<= 1;
                    sleeps++;
                } catch (InterruptedException e) {interrupted = true;}
            }
        }

        // no luck
        throw new OutOfMemoryError("Direct buffer memory");

    } finally {if (interrupted) {
            // don't swallow interrupts
            Thread.currentThread().interrupt();
        }
    }
}

咱们通过浏览 JDK8 的 Nio 包的这部分用于调配 DirectMememory 的一段代码,发现 每次Direct Mememory 进行理论的调配动作之前,都会调用这个办法检测是否有足够空间调配时都被调用,不过外面的逻辑奇奇怪怪的,初看的确有点摸不着头脑。

国外有网友间接痛骂了这一段代码是一坨 Shit:java.nio.Bits.reserveMemory uses a lock, calls System.gc, and is generally bad code… (google.com)

1.  ALL memory access requires a lock.  That's evil if you're allocating small chunks.

2.  The code to change the reserved memory counters is duplicated twice.  This is a great way to introduce bugs.  (how did this even get approved? do they not do code audits or require that commits be approved?)

3.  If you are out of memory we call System.gc... EVIL.  The entire way direct memory is reclaimed via GC is a horrible design.

4.  After GC they sleep 100ms.  What's that about?  Why 100ms?  Why not 1ms?  
  1. 所有的内存拜访都须要一个锁。如果你调配的是小块的内存几乎就是噩梦。
  2. 扭转保留内存计数器的代码反复了两次。这是个引入谬误的好办法。(难道他们不进行代码审计或要求提交的代码必须失去批准吗?)
  3. 如果你没有内存了,咱们就调用 System.gc… 通过 GC 回收间接内存的整个形式是一个可怕的设计。
  4. 在 GC 之后,他们会休眠 100ms。那是什么意思?为什么是 100ms?为什么不是 1ms?

集体并不感冒这些评论,这里拎出 System.gc() 这行代码来剖析具体用意。要看懂这一行代码的用意,咱们须要理解 DirectMemory 关联的本机内存是如何清理的,这里就间接给出答案了。

JVM 实际上是管不到 DirectMemory 的,须要依附非凡的形式回收掉 DirectMemory:

  • 手动调用 unsafe.freeMemory() 进行开释,nettyByteBuf.release() 就是这种形式实现的;
  • 利用 GC 机制 在 GC 的过程中主动调用 unsafe.freeMemory() 开释被援用的间接内存;

这段代码作者的用意显著是显示调用 System.gc(),尽可能回收不可达的DirectByteBuffer 对象,也只有通过 GC 才会主动触发 unsafe.freeMemory() 的调用,开释间接内存。

至于其余代码 ….. 这里不做过多评论。

Fix -XX:+DisableExplicitGC

基于以上种种原因,Kafka 官网最终提交了一个 Commit 修复这个问题:

Fix run class to work with Java 10 and use ExplicitGCInvokesConcurrent by ijuma · Pull Request #1329 · confluentinc/ksql (github.com)

具体的调整细节能够看上面的连贯,读者能够通过比照本人的下载 Kafka 启动脚本查看是否修复这个问题:

KAFKA-5470: Replace -XX:+DisableExplicitGC with -XX:+ExplicitGCInvokesConcurrent in kafka-run-class by ijuma · Pull Request #3371 · apache/kafka (github.com)

其余参考资料

上面的这些参考资料能够帮忙咱们更深刻的了解 -XX:+ExplicitGCInvokesConcurrent 参数附带的知识点:

-XX:+ExplicitGCInvokesConcurrent 的含意:What is JVM startup parameter: -XX:+ExplicitGCInvokesConcurrent? – yCrash Answers

官网的 G1 文档:Java HotSpot Garbage Collection (oracle.com)

为什么仅限 G1 能够开启此参数来进行健康检查,其余垃圾收集器倡议敞开此参数:Health Check: Explicit Garbage Collection | Jira | Atlassian Documentation

JVM 源码剖析之 SystemGC 齐全解读 | HeapDump 性能社区

  • 为什么不能同时设置 -XX:+DisableExplicitGC 以及 -XX:+ExplicitGCInvokesConcurrent
  • 为什么 CMS GC 下 -XX:+ExplicitGCInvokesConcurrent 这个参数加了之后会比真正的 Full GC 好?
  • 它如何做到暂停整个过程?
  • 堆外内存调配为什么有时候要配合 System.gc?
  • Netty 回收堆外内存的策略又是如何?
小结

笔者也没有想到一个简略的参数能牵扯出这么多内容,这里做一个大略的总结:

  • Kafka 官网已经禁用过System.gc()
  • 前面有大神剖析了脚本和 JDK 的 NIO 源码,发现禁用 System.gc() 这不是有问题嘛,你 Kafka 大量应用 Java 的间接内存,间接内存靠个别的 Gc 是回收不掉的,只能靠 Ful Gc 顺带回收,JDK 官网代码又是靠频繁调用 System.gc() 强制腾出间接内存空间的,你 System.gc() 禁用了不是“找死”么,于是连忙解释了一波 Bits#reserveMemory 写的“垃圾代码”来证实本人的观点,而后倡议启用System.gc(),并且为了进步 Full Gc 效率应用-XX:+ExplicitGCInvokesConcurrent
  • 官网发现这个问题连忙修复了一版并且提交了 issue。

MaxInlineLevel

java 有一个参数 -XX:MaxInlineLevel(JDK14 之前默认值为 9),这个值在 JDK14 之后默认值改为 15。这个值的批改能够参考 JDK 官网的申明 https://bugs.openjdk.org/browse/JDK-8234863。

上面的图 Oracle 官网对于 JDK14 版本之后批改 MaxInlineLevel=15 的 Push。

上面简明扼要源自网上收集的材料和集体了解,其实简略了解为古代硬件资源足以反对 -XX:MaxInlineLevel设置为 15,更大的内联深度能够让 JIT 编译出更多的本地代码从而进步 Java 代码的运行效率即可。

如果你的服务器还是古旧的四五年前的机器,或者生产机器的确渣的能够,那么还是倡议把这个参数 -XX:MaxInlineLevel改回 9 比拟得当。

简明扼要的局部:

链接 https://bugs.openjdk.org/browse/JDK-8234863 的申明指出,15 这个值在 scala 上的性能测试是被认为最优后果。这个值在古代处理器速度以及性能优化较好的明天最为适合,默认值 9 这个数字显得十分 过期

Kafka 作为激进压迫机器性能的榜样,也听从 JDK 官网的改变默认所有版本的 JDK 对立应用 15 这个默认值。

这里额定插一嘴,集体认为实际上这个值 Oracle 官网在 JDK11 就能够批改为 15。

MaxInlineLevel 自身的判断逻辑仿佛更引起宽广程序员的关注,StackFlow 上有一篇对于这个参数的探讨:https://stackoverflow.com/questions/32503669/why-does-the-jvm-have-a-maximum-inline-depth 比拟有意思。

在评论中有网友指出在比拟低的 JDK8 版本当中,MaxRecursiveInlineLevel 对 间接 间接 的递归调用都进行计数,编译后的代码应该在运行时放弃对整个内联树的跟踪(以便可能解压和去优化)。紧接着是其他人的一些个人观点,没人接这个人话茬 =-=,难堪。

持续翻阅,上面的评论有一位大佬解释了为什么会呈现 MaxInlineLevel 这个参数,简略易懂这里就间接贴过来了:

One reason is also that the inlining itself in the HotSpot JVM is implemented with recursion. Every time inlining of a method is started a new context is created on the native stack. Allowing an unlimited depth would eventually make the JIT-compiler crash when it runs out of stack.

(旧版激进的限度办法内联深度),其中一个起因是 HotSpot JVM 的内联自身是用递归实现的。每次对一个办法进行内联时,都会在本地堆栈中创立一个新的上下文。如果容许有限的深度,最终会使 JIT- 编译器在堆栈耗尽时解体。

在过来硬件资源缓和的状况下,适度的办法内联有可能会呈现比拟深的堆栈调用,非常耗费程序内存,然而古代内存动不动就是 32,64,128G 的明天,加上处理器的外围数量上来了之后,扩充默认的办法内联深度参数值的确十分有必要。

办法内联是 JVM 比拟底层的优化,能够通过周大神的《深刻了解 JVM 虚拟机第三版》理解。

如果不懂办法内联间接无脑设置 MaxInlineLevel=15 即可,没有为什么,官网都曾经在高版本 JDK 批改了默认值,JDK8 忠诚粉丝天然也能够这么干。

jdk14 hotspot 依赖的调整日志:https://hg.openjdk.org/jdk8u/jdk8u/hotspot/

-Djava.awt.headless=true

这个参数比拟奇怪,然而实际上在 SpringBoot 源码中也有同样的写法。

这算是一个不太被关注的优化参数,简略了解是 -Djava.awt.headless=true 能够屏蔽掉一些不必要的外置设施影响,告知程序以后没有外置设施,尽可能的让程序底层本人模仿,比方打印从图形显示变为控制台打印。

又是牵扯内容很多的一个点,具体解释能够看这篇文章:[[【Java】The Java Headless Mode]],篇幅无限,这里就不多解释了。

KAFKA_GC_LOG_OPTS

见名知意,就是 JVM 的日志参数配置,Kafka 最终的日志格局为:XXX-gc.log,日志配置这一块和大部分以 JAVA 为底层的开源组件大差不差,简略的扫一眼差不多了。

# Log directory to use
# 获取 log_dir,如果没配置就那 $base_dir 环境变量
if ["x$LOG_DIR" = "x"]; then
  # base_dir=$(dirname $0)/..
  LOG_DIR="$base_dir/logs"
fi


# GC options    
GC_FILE_SUFFIX='-gc.log'
GC_LOG_FILE_NAME=''if ["x$GC_LOG_ENABLED"="xtrue"]; then
  GC_LOG_FILE_NAME=$DAEMON_NAME$GC_FILE_SUFFIX

  # The first segment of the version number, which is '1' for releases before Java 9
  # it then becomes '9', '10', ...
  # Some examples of the first line of `java --version`:
  # 8 -> java version "1.8.0_152"
  # 9.0.4 -> java version "9.0.4"
  # 10 -> java version "10" 2018-03-20
  # 10.0.1 -> java version "10.0.1" 2018-04-17
  # We need to match to the end of the line to prevent sed from printing the characters that do not match
  JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -E -n 's/.* version"([0-9]*).*$/\1/p')
  if [["$JAVA_MAJOR_VERSION" -ge "9"]] ; then
    KAFKA_GC_LOG_OPTS="-Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=100M"
  else
    KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
  fi
fi

JAVA_MAJOR_VERSION就是通过正则去除 JDK 的主版本号。

  • 如果主版本号大于或者等于 JDK9,就应用 JDK9 新增的 -xlog:gc* 对立的日志参数作为启动参数,
  • 如果是 JDK8 之前的版本,就须要用一大堆旧版的日志参数,学习和应用老本比拟大:

    • GCLogFileSize=100M,限度 GC 日志文件大小为 100M。
    • NumberOfGCLogFiles=10,容许存在的 GC 日志文件数量为 10 个。
    • UseGCLogFileRotation,让 GC 日志一直循环,如果最初一个 GC 日志写满,将会从第一个文件从新开始写入
    • -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps 是旧版本凌乱的 GC 参数配置诞生的恶果,这些参数在 JDK9 之后被通通被-xlog:gc* 代替。

      • -verbose:gc -XX:+PrintGCDetails这两个参数常常在低版本 JDK 一起呈现,最大的区别是前者是稳固版本,后者则是被认为是不稳固的日志启动参数(强制和其余 GC 参数配合呈现显得不稳固)。
      • -XX:+PrintGCDateStamps:每行结尾显示以后相对的日期及工夫,打印 GC 产生时的工夫戳,搭配 -XX:+PrintGCDetails 应用,不能够独立应用。
      • -XX:+PrintGCTimeStamps 自从 JVM 启动 以来的工夫。

-XX:+PrintGCDateStamps-XX:+PrintGCTimeStamps 能够间接看上面的例子比照:

-XX:+PrintGCDateStamps
日志输入示例:2014-01-03T12:08:38.102-0100: [GC 66048K->53077K(251392K), 0,0959470 secs]
2014-01-03T12:08:38.239-0100: [GC 119125K->114661K(317440K), 0,1421720 secs]

-XX:+PrintGCTimeStamps
日志输入示例:0,185: [GC 66048K->53077K(251392K), 0,0977580 secs]
0,323: [GC 119125K->114661K(317440K), 0,1448850 secs]

因为 -XX:+PrintGCDetails 被标记为 manageable,所以能够通过如下三种形式批改:
1、com.sun.management.HotSpotDiagnosticMXBean API
2、JConsole
3、jinfo -flag

最初再把英文正文局部简略翻译一下:

  1. 第一个参数如果是 1 结尾,代表是 JDK9 之后的版本。
  2. java --version产生的后果如下:

    • 8 -> java version “1.8.0_152”
    • 9.0.4 -> java version “9.0.4”
    • 10 -> java version “10” 2018-03-20
    • 10.0.1 -> java version “10.0.1” 2018-04-17
  3. 通过正则表达式匹配到行尾,以避免 sed 打印出不匹配的字符

KAFKA_JMX_OPTS

JMX 全称 Java Management Extensions, 为 Java 利用提供治理扩大性能。在 JDK 5 的时候引入,Kafka 设置启动参数让 Kafka 应用程序取得 JMX 近程调用的反对。

# JMX settings
if [-z "$KAFKA_JMX_OPTS"]; then
  KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false"
fi

KAFKA_JMX_OPTS 对应的 value 的含意参考自:https://www.jianshu.com/p/414647c1179e, 此处列举一些无关 JMX 的相干参数:

参数名 类型 形容
-Dcom.sun.management.jmxremote 布尔 是否反对近程 JMX 拜访,默认 true
-Dcom.sun.management.jmxremote.port 数值 监听端口号,不便近程拜访
-Dcom.sun.management.jmxremote.authenticate 布尔 是否须要开启用户认证, 默认开启
-Dcom.sun.management.jmxremote.ssl 布尔 是否对连贯开启 SSL 加密,默认开启
-Dcom.sun.management.jmxremote.access.file 门路 对拜访用户的权限受权的文件的门路,默认门路JRE_HOME/lib/management/jmxremote.access
-Dcom.sun.management.jmxremote. password.file 门路 设置拜访用户的用户名和明码,默认门路JRE_HOME/lib/management/ jmxremote.password

KAFKA_LOG4J_OPTS

log4j 的日志配置地址。

if ["x$KAFKA_LOG4J_OPTS" = "x"]; then  
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"  
fi  

配置含意不须要记忆,在浏览的时候查阅相干材料即可:https://www.jianshu.com/p/ccafda45bcea,这里间接贴过来作为正文局部供读者参考。

# 根 Log
# 默认日志等级为 INFO 级别
# NFO、WARN、ERROR 和 FATAL 级别的日志信息都会输入
# 日志最终输入到 kafkaAppender
log4j.rootLogger=INFO, stdout, kafkaAppender  
# 控制台配置
log4j.appender.stdout=org.apache.log4j.ConsoleAppender  
# 布局模式应用能够灵便模式
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  
# 日志打印格局
# [%d] 输入日志工夫点的日期或工夫,默认格局为 ISO8601,也能够在其后指定格局,如:%d{yyyy/MM/dd HH:mm:ss,SSS}。# %m::输入代码中指定的具体日志信息。# %p:输入日志信息的优先级,即 DEBUG,INFO,WARN,ERROR,FATAL。# %c:输入日志信息所属的类目,通常就是所在类的全名。# %n:输入一个回车换行符,Windows 平台为 "\r\n",Unix 平台为 "\n"。log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n  

# DailyRollingFileAppender Kafka 默认服务端日志
log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender  
log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH  
# server.log 存储地位
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log  
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout  
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n  

# DailyRollingFileAppender 状态机变更日志
log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender  
# 依照每小时产生一个日志的形式
log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH   
log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log  
log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout  
log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n  

# 申请日志
log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender  
log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH  
log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log  
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout  
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n  

# Log 清理日志
log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender  
log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH  
log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log  
log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout  
log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n  

# Controller 日志
log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender  
log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH  
log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log  
log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout  
log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n  

# 验证日志
log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender  
log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH  
log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log  
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout  
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n  
  
# Change the line below to adjust ZK client logging  
# 批改上面的日志管制 ZK 的日志输入
log4j.logger.org.apache.zookeeper=INFO  
  
# Change the two lines below to adjust the general broker logging level (output to server.log and stdout)  
# 更改上面两行以调整个别代理日志记录级别(输入到 server.log 和 stdout)log4j.logger.kafka=INFO  
log4j.logger.org.apache.kafka=INFO  
  
# Change to DEBUG or TRACE to enable request logging  
# 批改日志级别为 DEBUG 和 TRACE 获取申请日志
log4j.logger.kafka.request.logger=WARN, requestAppender  
log4j.additivity.kafka.request.logger=false  
  
# Uncomment the lines below and change log4j.logger.kafka.network.RequestChannel$ to TRACE for additional output  
# 勾销正文上面的行并将 log4j.logger.kafka.network.RequestChannel$ 更改为 TRACE 以取得额定的输入
# related to the handling of requests  
# 与申请的解决相干
#log4j.logger.kafka.network.Processor=TRACE, requestAppender  
#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender  
#log4j.additivity.kafka.server.KafkaApis=false  

log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender  
log4j.additivity.kafka.network.RequestChannel$=false  
  
log4j.logger.kafka.controller=TRACE, controllerAppender  
log4j.additivity.kafka.controller=false  
  
log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender  
log4j.additivity.kafka.log.LogCleaner=false  
  
log4j.logger.state.change.logger=INFO, stateChangeAppender  
log4j.additivity.state.change.logger=false  
  
# Access denials are logged at INFO level, change to DEBUG to also log allowed accesses 
# 回绝拜访记录在 INFO 级别,更改为 DEBUG 以记录容许的拜访
log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender  
log4j.additivity.kafka.authorizer.logger=false

KAFKA_OPTS

KAFKA_OPTS 能够在这里设置本人的想要的通用配置:

# Generic jvm settings you want to add
if [-z "$KAFKA_OPTS"]; then
  KAFKA_OPTS=""
fi

UPGRADE_KAFKA_STREAMS_TEST_VERSION

变量名称翻译过去是“降级 kafka 流的测试版本”,这里大抵的意思是取出版本号进行一些判断之后设置到 ClassPath 当中。

说实话这部分内容看不太懂,然而不算是非常重要的货色,能够当前深刻之后回来理解,这里间接遗记这个设置即可。

if [-z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION"]; then
  for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
  do
    if should_include_file "$file"; then
      CLASSPATH="$CLASSPATH":"$file"
    fi
  done
else
  VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'`
  SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, ie, bug-fix number
  for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;
  do
    if should_include_file "$file"; then
      CLASSPATH="$file":"$CLASSPATH"
    fi
  done
  if ["$SHORT_VERSION_NO_DOTS" = "0100"]; then
    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.8.jar":"$CLASSPATH"
    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.6.jar":"$CLASSPATH"
  fi
  if ["$SHORT_VERSION_NO_DOTS" = "0101"]; then
    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.9.jar":"$CLASSPATH"
    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.8.jar":"$CLASSPATH"
  fi
fi

CLASSPATH

运行 ./gradlew copyDependantLibs 来获取本地目录下的所有依赖性 jar。

留神这里划分了很多个子模块,所以应用了 for 循环加载到 CLASSPATH 当中,这会导致最终产生的命令会十分长。

# run ./gradlew copyDependantLibs to get all dependant jars in a local dir
shopt -s nullglob
if [-z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION"]; then
  for dir in "$base_dir"/core/build/dependant-libs-${SCALA_VERSION}*;
  do
    CLASSPATH="$CLASSPATH:$dir/*"
  done
fi

for file in "$base_dir"/examples/build/libs/kafka-examples*.jar;
do
  if should_include_file "$file"; then
    CLASSPATH="$CLASSPATH":"$file"
  fi
done

集体尝试了一下正文介绍的 gradle copyDependantLibs 命令,本地执行后果如下,这个命令会在对应的模块构建依赖 jar 包:

$ gradle copyDependantLibs

> Configure project :
Building project 'core' with Scala version 2.13.3
Building project 'streams-scala' with Scala version 2.13.3

Deprecated Gradle features were used in this build, making it incompatible with Gradle 7.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See https://docs.gradle.org/6.6.1/userguide/command_line_interface.html#sec:command_line_warnings

BUILD SUCCESSFUL in 2s
55 actionable tasks: 3 executed, 52 up-to-date

集体是 win11 的电脑,通过 wox 查找 dependant-libs 后果如下:

对应的一堆依赖 jar 包

CONSOLE_OUTPUT_FILE

日志的打印输出地址文件地址设置,留神不是 GC 的日志。

  case $COMMAND in
    -name)
      DAEMON_NAME=$2
      CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.out
      shift 2
      ;;

这里翻阅了一些无关 shift 的材料:

对于 Shift 的作用能够参考:https://ss64.com/bash/shift.html。Linux 中通过 help shift 查看使用手册,然而会发现写的比拟潦草和形象。

shift: shift [n]
    Shift positional parameters.
    
    Rename the positional parameters $N+1,$N+2 ... to $1,$2 ...  If N is
    not given, it is assumed to be 1.
    
    Exit Status:
    Returns success unless N is negative or greater than $#

比方上面的程序:

#! /bin/bash

echo $1 
echo $2

shift 1

echo $1
echo $2

# 输入后果
[zxd@localhost ~]$ ./test.sh 1 2 3 5 6
1
2

2
3

shift 1 执行之后会弹出第一个参数,之后的运行参数会往前“推动”,$1 变为 $2 的值,$2 变为 $3 的值,以此类推。

& 后盾启动和 nohup 挂起过程

开端局部是设置 ClaassPath,用户本人自定义参数以及把规范输出和输入重定向到同一个地位,最初就是当前台模式启动并且最终通过 nohup 挂起整个过程。

-cp "$CLASSPATH" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &

这段脚本最后面把代替参数意义进行了替换。-cp是 nohup 命令的参数,接着是把输入的后果全副重定向到规范输入当中,这个地址对应CONSOLE_OUTPUT_FILE

上面了解最初局部的 nohup 和 &2>&1/dev/null这几个常见的服务端脚本启动参数的含意。

nohup 和 &

nohup:nohup 指令会疏忽所有挂断(SIGHUP)信号不挂断的运行。留神 nohup 命令自身并没有后盾运行的性能,须要配合 & 应用。它的实现原理是让命令不间断的运行实现挂机的成果。

& 是指在后盾运行,但当用户退出(解除挂起)的时候,命令主动也跟着退出,nohup& 这两个指令通常会放到一起应用。

/dev/null

这个空间属于 Linux 的一块非凡空间,UNIX 零碎中,它被称为空设施。以下内容摘自维基百科:

/dev/null(或称空设施)在类 Unix 零碎中是一个非凡的设施文件,它抛弃所有写入其中的数据(但报告写入操作胜利),读取它则会立刻失去一个 EOF[1]。

在程序员行话,尤其是 Unix 行话中,/dev/null 被称为比特桶或者 黑洞

2>&1 的问题

后面的第一个数字 2 通常对应上面几种含意:

  • 0 – stdin (standard input) 规范输出
  • 1 – stdout (standard output) 规范输入
  • 2 – stderr (standard error) 规范谬误输入

\> 是重定向符号,而数字 2 的含意是规范谬误输入,&1指的就是规范输入,三个符号组合到一起就是把规范谬误输入输出重定向到规范输入当中,这里能够了解为“合流”。

留神命令 2>&12>1 是存在区别的,这里 & 不能丢,后者的 1 代表输入代表谬误重定向到一个 文件 1 ,不代表规范输入,只有 &1 才代表规范输入。

如果想要抛弃所有的规范谬误输入和规范输入后果,上面是一个不错的例子:

nohup python3 getfile.py > /dev/null 2>&1 &

如果想要写入到指定的地位,上面是又一个不错的例子:

nohup python3 getfile.py > test.log 2>&1 &

最初是理论一点的例子:

0 9 \* \* \* /usr/bin/python3 /opt/getFile.py > /opt/file.log 2>&1

下面的命令含意是放在 crontab 中的定时工作,每天 9:00 启动这个 python 的脚本,并把执行后果写入日志文件 file.log 中

exec 运行

如果不是守护过程的执行,则是应用 exec 在以后的 shell 中进行失常模式启动,此时整个 shell 会挂起运行 kafka 服务端。

  exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@"

其余内容

Lauch modes 应用到的变量设置蕴含了启动整个 Kafka 服务端的外围局部,上面再列觉其余的依赖配置以及“辅助”内容。

Scala 版本抉择

Kafka 是应用 Java 和 Scala 混合编写的,依据不同的 Kafka 版本须要不同版本的 Scala 版本反对,这里官网做了一个版本抉择强制判断抉择出最合适的 Scala。

if [-z "$SCALA_VERSION"]; then
  SCALA_VERSION=2.13.3
  if [[-f "$base_dir/gradle.properties"]]; then
    SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2`
  fi
fi

gradle.properties

Kafka 我的项目是基于 gradle 构建的,gradle 集体平时根本没啥接触机会,这里做一个大抵配置理解。

group=org.apache.kafka  
# NOTE: When you change this version number, you should also make sure to update  
# the version numbers in  
#  - docs/js/templateData.js  
#  - tests/kafkatest/__init__.py  
#  - tests/kafkatest/version.py (variable DEV_VERSION)  
#  - kafka-merge-pr.py  
version=2.7.2  
scalaVersion=2.13.3  
task=build  
org.gradle.jvmargs=-Xmx2g -Xss4m -XX:+UseParallelGC

gradle 这里调配的是 2g 的堆内存,Xss4m 每个线程的堆栈大小为 4M,最初是应用 ParallelGC 垃圾收集器,也是 JDK8 的默认垃圾收集器。

DEBUG 模式

如果在启动参数外面设置了KAFKA_DEBUG,就能够开启 DEBUG 模式。

# Set Debug options if enabled
if ["x$KAFKA_DEBUG" != "x"]; then

    # Use default ports
    DEFAULT_JAVA_DEBUG_PORT="5005"

    if [-z "$JAVA_DEBUG_PORT"]; then
        JAVA_DEBUG_PORT="$DEFAULT_JAVA_DEBUG_PORT"
    fi

    # Use the defaults if JAVA_DEBUG_OPTS was not set
    DEFAULT_JAVA_DEBUG_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=${DEBUG_SUSPEND_FLAG:-n},address=$JAVA_DEBUG_PORT"
    if [-z "$JAVA_DEBUG_OPTS"]; then
        JAVA_DEBUG_OPTS="$DEFAULT_JAVA_DEBUG_OPTS"
    fi

    echo "Enabling Java debug options: $JAVA_DEBUG_OPTS"
    KAFKA_OPTS="$JAVA_DEBUG_OPTS $KAFKA_OPTS"
fi

咱们须要理解的是 JAVA_DEBUG_OPTS 命令的含意。起初尽管不是很懂上面的参数含意,然而能够晓得是 JAVA 调试应用程序用的。

-agentlib:jdwp=transport=dt_socket,server=y,suspend=${DEBUG_SUSPEND_FLAG:-n},address=$JAVA_DEBUG_PORT

咱们调试程序更多是在 IDE 外面,上面的内容来自网络材料整合参考和了解:

Debugging Java applications) 这篇文章大略介绍了如何在 JVM 启动之后调试 JAVA 程序,以及如何在应用 JDK 调试应用程序。

若要调试 Java 过程,能够应用 Java 调试器(JDB)利用过程或其余调试器,这些调试器通过应用 SDK 为操作系统提供的 Java™ 平台调试器体系结构(JPDA)进行通信。

在 Linux 零碎当中进行 JAVA 过程调试能够应用上面的命令。对于咱们来说这些写法照着写就行,不须要过分查究具体的含意。

java -agentlib:jdwp=transport=dt_socket,server=y,address=_<port>_ <class>

调试近程服务器运行的 JAVA 应用程序,在 Window 中和 Linux 中调试形式如下:

  • On Windows systems:
jdb -connect com.sun.jdi.SocketAttach:hostname=<host>,port=<port>
  • On other systems:
jdb -attach <host>:<port>

此外 Stack-Flow 上还有一个写的更棒的帖子,这篇帖子的参数和 Kafka 的脚本局部基本一致了。

debugging – What are Java command line options to set to allow JVM to be remotely debugged? – Stack Overflow

Before Java 5.0, use -Xdebug and -Xrunjdwp arguments. These options will still work in later versions, but it will run in interpreted mode instead of JIT, which will be slower.

JDK5 之前的版本这里能够间接疏忽。(晓得了也没啥用途)

From Java 5.0, it is better to use the -agentlib:jdwp single option:

JDK5 之后应用上面的命令格局:

-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=1044

Options on -Xrunjdwp or agentlib:jdwp arguments are :

  • transport=dt_socket : means the way used to connect to JVM (socket is a good choice, it can be used to debug a distant computer)
  • address=8000 : TCP/IP port exposed, to connect from the debugger,
  • suspend=y : if ‘y’, tell the JVM to wait until debugger is attached to begin execution, otherwise (if ‘n’), starts execution right away.
  • transport=dt_socket:示意用于连贯 JVM 的形式(socket 是一个不错的抉择,它能够用来调试近程计算机)
  • address=8000:TCP / IP 端口公开,从调试器连贯。
  • suspend=y:如果为“y”,则通知 JVM 等到连贯调试器后再开始执行,否则(如果为“n”),立刻开始执行。

最初比照一下 Kafka 的参数,恍然大悟。

-agentlib:jdwp=transport=dt_socket,server=y,suspend=${DEBUG_SUSPEND_FLAG:-n},address=$JAVA_DEBUG_PORT

Which java to use

如正文所言查找 java 命令在哪。

if [-z "$JAVA_HOME"]; then
  JAVA="java"
else
  JAVA="$JAVA_HOME/bin/java"
fi

Memory options

内存配置选项如下:

if [-z "$KAFKA_HEAP_OPTS"]; then
  KAFKA_HEAP_OPTS="-Xmx256M"
fi

-Xmxn:指定内存调配池的最大大小(以字节为单位)。此值的倍数必须大于 2MB,1024 的倍数。

这里设置最大的 HEAP 大小为 256M。

cc_pkg

同样是 jar 包依赖的查找和引入到 ClassPath 当中,这里同样不晓得干啥用的,简略了解是获取必要依赖项即可。

for cc_pkg in "api" "transforms" "runtime" "file" "mirror" "mirror-client" "json" "tools" "basic-auth-extension"
do
  for file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;
  do
    if should_include_file "$file"; then
      CLASSPATH="$CLASSPATH":"$file"
    fi
  done
  if [-d "$base_dir/connect/${cc_pkg}/build/dependant-libs" ] ; then
    CLASSPATH="$CLASSPATH:$base_dir/connect/${cc_pkg}/build/dependant-libs/*"
  fi
done

Exclude jars not necessary for running commands.

排除命令不须要的 jar 包,比方 test 和 javadoc 等。

regex="(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc)$"
should_include_file() {if [ "$INCLUDE_TEST_JARS" = true]; then
    return 0
  fi
  file=$1
  if [-z "$(echo"$file"| egrep"$regex")" ] ; then
    return 0
  else
    return 1
  fi
}

INCLUDE_TEST_JARS

判断是否开启了蕴含测试的 jar 包。

if [-z "$INCLUDE_TEST_JARS"]; then
  INCLUDE_TEST_JARS=false
fi

写在最初

不得不感叹学无止境,晓得的越多不晓得的也就更多,一个脚本外面竟然有这么多学识,本局部的外围毫无疑问是JVM 的启动参数,其余的参数或者配置以及奇怪的脚本写法看不懂 也没啥关系,这里仅仅对于一些集体关注的外围局部进行介绍,对于一些细枝末节不做过多的查究和钻牛角尖,读者感兴趣能够比照参考资料做更多理解。

正文完
 0