Kafka-Server-start.sh

  if [ $# -lt 1 ];  then   # 提醒命令应用办法 echo "USAGE: $0 [-daemon] server.properties [--override property=value]*" exit 1  fi  base_dir=$(dirname $0)    if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then      export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"  fi    if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then      export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"  fi    EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}    COMMAND=$1  case $COMMAND in    -daemon)      EXTRA_ARGS="-daemon "$EXTRA_ARGS      shift      ;;    *)      ;;  esac    exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
  1. 判断参数有没有,参数个数小于1就提醒用法;
  2. 获取脚本以后门路赋值给变量 base_dir;
  3. 判断日志参数 KAFKA_LOG4J_OPTS 是否为空,为空就给它一个值;
  4. 判断堆参数 KAFKA_HEAP_OPTS是否为空,为空就默认给它赋值为 "-Xmx1G -Xms1G",默认的堆空间指定为1G;
  5. 判断启动命令中第一个参数是否为 -daemon,如果是就以守护过程启动(其实不是,是赋给另一个变量 EXTRA_ARGS);
  6. 执行命令。

最初一个脚本是执行另一个脚本:kafka-run-class.sh,这个脚本的内容比较复杂了。

kafka-run-class.sh

#!/bin/bashif [ $# -lt 1 ];then  echo "USAGE: $0 [-daemon] [-name servicename] [-loggc] classname [opts]"  exit 1fi# CYGWIN == 1 if Cygwin is detected, else 0.if [[ $(uname -a) =~ "CYGWIN" ]]; then  CYGWIN=1else  CYGWIN=0fiif [ -z "$INCLUDE_TEST_JARS" ]; then  INCLUDE_TEST_JARS=falsefi# Exclude jars not necessary for running commands.regex="(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc)$"should_include_file() {  if [ "$INCLUDE_TEST_JARS" = true ]; then    return 0  fi  file=$1  if [ -z "$(echo "$file" | egrep "$regex")" ] ; then    return 0  else    return 1  fi}base_dir=$(dirname $0)/..if [ -z "$SCALA_VERSION" ]; then  SCALA_VERSION=2.13.3  if [[ -f "$base_dir/gradle.properties" ]]; then    SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2`  fifiif [ -z "$SCALA_BINARY_VERSION" ]; then  SCALA_BINARY_VERSION=$(echo $SCALA_VERSION | cut -f 1-2 -d '.')fi# run ./gradlew copyDependantLibs to get all dependant jars in a local dirshopt -s nullglobif [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then  for dir in "$base_dir"/core/build/dependant-libs-${SCALA_VERSION}*;  do    CLASSPATH="$CLASSPATH:$dir/*"  donefifor file in "$base_dir"/examples/build/libs/kafka-examples*.jar;do  if should_include_file "$file"; then    CLASSPATH="$CLASSPATH":"$file"  fidoneif [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then  clients_lib_dir=$(dirname $0)/../clients/build/libs  streams_lib_dir=$(dirname $0)/../streams/build/libs  streams_dependant_clients_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}else  clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs  streams_lib_dir=$clients_lib_dir  streams_dependant_clients_lib_dir=$streams_lib_dirfifor file in "$clients_lib_dir"/kafka-clients*.jar;do  if should_include_file "$file"; then    CLASSPATH="$CLASSPATH":"$file"  fidonefor file in "$streams_lib_dir"/kafka-streams*.jar;do  if should_include_file "$file"; then    CLASSPATH="$CLASSPATH":"$file"  fidoneif [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then  for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;  do    if should_include_file "$file"; then      CLASSPATH="$CLASSPATH":"$file"    fi  doneelse  VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'`  SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, ie, bug-fix number  for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;  do    if should_include_file "$file"; then      CLASSPATH="$file":"$CLASSPATH"    fi  done  if [ "$SHORT_VERSION_NO_DOTS" = "0100" ]; then    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.8.jar":"$CLASSPATH"    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.6.jar":"$CLASSPATH"  fi  if [ "$SHORT_VERSION_NO_DOTS" = "0101" ]; then    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.9.jar":"$CLASSPATH"    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.8.jar":"$CLASSPATH"  fififor file in "$streams_dependant_clients_lib_dir"/rocksdb*.jar;do  CLASSPATH="$CLASSPATH":"$file"donefor file in "$streams_dependant_clients_lib_dir"/*hamcrest*.jar;do  CLASSPATH="$CLASSPATH":"$file"donefor file in "$base_dir"/tools/build/libs/kafka-tools*.jar;do  if should_include_file "$file"; then    CLASSPATH="$CLASSPATH":"$file"  fidonefor dir in "$base_dir"/tools/build/dependant-libs-${SCALA_VERSION}*;do  CLASSPATH="$CLASSPATH:$dir/*"donefor cc_pkg in "api" "transforms" "runtime" "file" "mirror" "mirror-client" "json" "tools" "basic-auth-extension"do  for file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;  do    if should_include_file "$file"; then      CLASSPATH="$CLASSPATH":"$file"    fi  done  if [ -d "$base_dir/connect/${cc_pkg}/build/dependant-libs" ] ; then    CLASSPATH="$CLASSPATH:$base_dir/connect/${cc_pkg}/build/dependant-libs/*"  fidone# classpath addition for releasefor file in "$base_dir"/libs/*;do  if should_include_file "$file"; then    CLASSPATH="$CLASSPATH":"$file"  fidonefor file in "$base_dir"/core/build/libs/kafka_${SCALA_BINARY_VERSION}*.jar;do  if should_include_file "$file"; then    CLASSPATH="$CLASSPATH":"$file"  fidoneshopt -u nullglobif [ -z "$CLASSPATH" ] ; then  echo "Classpath is empty. Please build the project first e.g. by running './gradlew jar -PscalaVersion=$SCALA_VERSION'"  exit 1fi# JMX settingsif [ -z "$KAFKA_JMX_OPTS" ]; then  KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false "fi# JMX port to useif [  $JMX_PORT ]; then  KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "fi# Log directory to useif [ "x$LOG_DIR" = "x" ]; then  LOG_DIR="$base_dir/logs"fi# Log4j settingsif [ -z "$KAFKA_LOG4J_OPTS" ]; then  # Log to console. This is a tool.  LOG4J_DIR="$base_dir/config/tools-log4j.properties"  # If Cygwin is detected, LOG4J_DIR is converted to Windows format.  (( CYGWIN )) && LOG4J_DIR=$(cygpath --path --mixed "${LOG4J_DIR}")  KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_DIR}"else  # create logs directory  if [ ! -d "$LOG_DIR" ]; then    mkdir -p "$LOG_DIR"  fifi# If Cygwin is detected, LOG_DIR is converted to Windows format.(( CYGWIN )) && LOG_DIR=$(cygpath --path --mixed "${LOG_DIR}")KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"# Generic jvm settings you want to addif [ -z "$KAFKA_OPTS" ]; then  KAFKA_OPTS=""fi# Set Debug options if enabledif [ "x$KAFKA_DEBUG" != "x" ]; then    # Use default ports    DEFAULT_JAVA_DEBUG_PORT="5005"    if [ -z "$JAVA_DEBUG_PORT" ]; then        JAVA_DEBUG_PORT="$DEFAULT_JAVA_DEBUG_PORT"    fi    # Use the defaults if JAVA_DEBUG_OPTS was not set    DEFAULT_JAVA_DEBUG_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=${DEBUG_SUSPEND_FLAG:-n},address=$JAVA_DEBUG_PORT"    if [ -z "$JAVA_DEBUG_OPTS" ]; then        JAVA_DEBUG_OPTS="$DEFAULT_JAVA_DEBUG_OPTS"    fi    echo "Enabling Java debug options: $JAVA_DEBUG_OPTS"    KAFKA_OPTS="$JAVA_DEBUG_OPTS $KAFKA_OPTS"fi# Which java to useif [ -z "$JAVA_HOME" ]; then  JAVA="java"else  JAVA="$JAVA_HOME/bin/java"fi# Memory optionsif [ -z "$KAFKA_HEAP_OPTS" ]; then  KAFKA_HEAP_OPTS="-Xmx256M"fi# JVM performance options# MaxInlineLevel=15 is the default since JDK 14 and can be removed once older JDKs are no longer supportedif [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then  KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true"fiwhile [ $# -gt 0 ]; do  COMMAND=$1  case $COMMAND in    -name)      DAEMON_NAME=$2      CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.out      shift 2      ;;    -loggc)      if [ -z "$KAFKA_GC_LOG_OPTS" ]; then        GC_LOG_ENABLED="true"      fi      shift      ;;    -daemon)      DAEMON_MODE="true"      shift      ;;    *)      break      ;;  esacdone# GC options    GC_FILE_SUFFIX='-gc.log'GC_LOG_FILE_NAME=''if [ "x$GC_LOG_ENABLED" = "xtrue" ]; then  GC_LOG_FILE_NAME=$DAEMON_NAME$GC_FILE_SUFFIX  # The first segment of the version number, which is '1' for releases before Java 9  # it then becomes '9', '10', ...  # Some examples of the first line of `java --version`:  # 8 -> java version "1.8.0_152"  # 9.0.4 -> java version "9.0.4"  # 10 -> java version "10" 2018-03-20  # 10.0.1 -> java version "10.0.1" 2018-04-17  # We need to match to the end of the line to prevent sed from printing the characters that do not match  JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -E -n 's/.* version "([0-9]*).*$/\1/p')  if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then    KAFKA_GC_LOG_OPTS="-Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=100M"  else    KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"  fifi# Remove a possible colon prefix from the classpath (happens at lines like `CLASSPATH="$CLASSPATH:$file"` when CLASSPATH is blank)# Syntax used on the right side is native Bash string manipulation; for more details see# http://tldp.org/LDP/abs/html/string-manipulation.html, specifically the section titled "Substring Removal"CLASSPATH=${CLASSPATH#:}# If Cygwin is detected, classpath is converted to Windows format.(( CYGWIN )) && CLASSPATH=$(cygpath --path --mixed "${CLASSPATH}")# Launch modeif [ "x$DAEMON_MODE" = "xtrue" ]; then  nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &else  exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@"fi

脚本内容很长,然而实际上只有最初一部分才是真正在实现启动操作:

# Launch modeif [ "x$DAEMON_MODE" = "xtrue" ]; then  nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &else  exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@"fi

Launch modes

在脚本最初一段是无关启动形式的提醒。

# Launch modeif [ "x$DAEMON_MODE" = "xtrue" ]; then  nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &else  exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@"fi

这段脚本阐明了之前的一大堆脚本都是为了这里启动赋值进行的一系列操作,这里依据传递参数判断是否守护过程的形式启动。这里以应用比拟多的 守护过程启动形式进行参数介绍(实际上两者差异不算很大)。

KAFKA_HEAP_OPTS

KAFKA_HEAP_OPTS 出自最结尾,判断堆参数 KAFKA_HEAP_OPTS是否为空,为空就默认给它赋值为 "-Xmx1G -Xms1G"。

KAFKA_JVM_PERFORMANCE_OPTS

这个值代表了JVM的启动参数。

# JVM performance options# MaxInlineLevel=15 is the default since JDK 14 and can be removed once older JDKs are no longer supported# MaxInlineLevel=15 是自JDK 14以来的默认值,一旦旧的JDK不再反对,就能够删除。if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then  KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true"fiwhile [ $# -gt 0 ]; do  COMMAND=$1  case $COMMAND in    -name)      DAEMON_NAME=$2      CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.out      shift 2      ;;    -loggc)      if [ -z "$KAFKA_GC_LOG_OPTS" ]; then        GC_LOG_ENABLED="true"      fi      shift      ;;    -daemon)      DAEMON_MODE="true"      shift      ;;    *)      break      ;;  esacdone

G1垃圾收集器

Kafka默认应用G1的垃圾收集器,自身最低JDK版本要求就是JDK1.8。

-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true

MaxGCPauseMillis

-XX:MaxGCPauseMillis:GC最大的进展毫秒数,暂停工夫默认值200ms,如果设置比这个小的值,G1收集器会尽可能的达到这个预期设置。

因为Kafka是十分激进的高并发分布式音讯队列,为了获取更高的并发,应用20ms的极限值值尽可能的缩小GC工夫,最初用极短GC的代价换取高吞吐,当然后果会导致垃圾回收不洁净。

但Kafka对于JVM自身的堆内存占用并不是很多,默认20ms的进展工夫其实是能够放心使用的。

此外从Kafka的设计来看,更频繁的GC是为了尽可能的触发Full Gc,因为Full Gc是回收Direct Memory的条件,而Kafka大量应用了页缓存进步数据的Log的读写速度,底层用的也是Java的Direct Memory。

InitiatingHeapOccupancyPercent

这个参数实际上出入比拟大,依据源码剖析在JDK8b12版本之后,以及JDK11 之前这个参数和官网的文档形容,这个值的含意是合乎“整堆”来计算是否触发Mixed Gc,然而JDK8b12版本之后更高的补丁,以及JDK11之后就变了,它变成依据老年代占整堆的比重

这样的出入问题源自此参数的源码BUG,这部分波及源码的探讨就不探讨了,具体能够看对于G1收集器参数InitiatingHeapOccupancyPercent的正确认知 - 豆大侠的菜地 (doudaxia.club)这篇大佬的文章剖析。

这里间接给出论断:

  • 如果你应用的JDK版本在8b12之前,XX:InitiatingHeapOccupancyPercent是整个堆使用量与堆总体容量的比值;
  • 如果你应用的JDK版本在8b12之后(包含大版本9、10、11....),那么XX:InitiatingHeapOccupancyPercent老年代大小与堆总体容量的比值这种说法和批改之后的JVM源码合乎。

整体算是一个暗藏已久的BUG,因为G1的垃圾收集器设计角度看,它更关怀的是Old Region占满整个堆空间之前提前尽可能的进行回收,而不是简略的看看残余空间在整个堆空间的占比,因为残余空间不是一个非常牢靠的掂量值。

为了验证上文大佬的说法,集体也去参阅JDK8的Oracle文档:java (oracle.com)

-XX:c=percent    Sets the percentage of the heap occupancy (0 to 100) at which to start a concurrent GC cycle. It is used by garbage collectors that trigger a concurrent GC cycle based on the occupancy of the entire heap, not just one of the generations (for example, the G1 garbage collector).    By default, the initiating value is set to 45%. A value of 0 implies nonstop GC cycles. The following example shows how to set the initiating heap occupancy to 75%:    -XX:InitiatingHeapOccupancyPercent=75

关键字 entire heap,也就是简略的残余空间和整堆的占比。这里同样接着翻阅了一下,直到JDK12版本,这个形容还是和JDK8的版本统一的。直到浏览长期反对的JDK17的文档,发现外面的说法终于变了:

Garbage-First (G1) Garbage Collector (oracle.com

XX:InitiatingHeapOccupancyPercent determines the initial value as a percentage of the size of the current old generation as long as there aren't enough observations to make a good prediction of the Initiating Heap Occupancy threshold. Turn off this behavior of G1 using the option-XX:-G1UseAdaptiveIHOP. In this case, the value of -XX:InitiatingHeapOccupancyPercent always determines this threshold.。
“XX:启动堆占用百分比”将初始值确定为以后老一代大小的百分比,只有没有足够的观测值来很好地预测起始堆占用阈值。
应用选项'-XX:-G1UseAdaptiveIHOP'敞开G1的此行为。在这种状况下-XX:InitiatingHeapOccupancyPercent 启动堆占用百分比'的值始终确定此阈值。

所以这个值的实在含意和应用的JDK版本无关,并且JDK8的后续补丁版本也修复了这个问题,所以最终倡议是降级JDK8的补丁版本,或者应用JDK11之后的版本。

-XX:+ExplicitGCInvokesConcurrent

看似简略的参数,实际上又是暗藏这十分多的“坑”和细节,这里咱们划分更多的大节缓缓细品。

简略了解

这个参数是指通过应用System.gc()申请启用并发 GC 的调用默认禁用。如果没有非凡的利用场景,大部分状况下这个参数都是被倡议禁用的,而并发GC实际上就是CMS的并发回收解决。

集体在官网文档中搜到相似的参数形容:Garbage-First Garbage Collector Tuning (oracle.com)。

Other causes than Allocation Failure for a Full GC typically indicate that either the application or some external tool causes a full heap collection. If the cause is , and there is no way to modify the application sources, the effect of Full GCs can be mitigated by using or let the VM completely ignore them by setting . External tools may still force Full GCs; they can be removed only by not requesting them.System.gc()-XX:+ExplicitGCInvokesConcurrent -XX:+DisableExplicitGC

下面一大段的话粗心指的是:阻止内部调用Full GC(也就是System.gc())要么间接设置-XX:+DisableExplicitGC,要么设置-XX:+ExplicitGCInvokesConcurrent进步强制Full Gc的效率,浏览源码发现这两个参数不能一起开启,因为-XX:+ExplicitGCInvokesConcurrent须要敞开-XX:+DisableExplicitGC参数能力失效。

局部文章也解释仅仅倡议在G1的垃圾收集器中能够应用-XX:+ExplicitGCInvokesConcurrent。其余垃圾收集器不倡议应用。
Kafka官网修复BUG:-XX:+DisableExplicitGC 改为 -XX:+ExplicitGCInvokesConcurrent

为什么两者只能选其一应用,JDK 8 的JVM中存在相似的代码能够给予解释。

bool GenCollectedHeap::should_do_concurrent_full_gc(GCCause::Cause cause) {    // 查看参数 -XX:+DisableExplicitGC 和 -XX:+ExplicitGCInvokesConcurrent  return UseConcMarkSweepGC &&         ((cause == GCCause::_gc_locker && GCLockerInvokesConcurrent) ||         // -XX:+ExplicitGCInvokesConcurrent 须要满足不配置-XX:+DisableExplicitGC的条件,能力断定为true          (cause == GCCause::_java_lang_system_gc && ExplicitGCInvokesConcurrent));}void GenCollectedHeap::collect(GCCause::Cause cause) {    // 查看参数 -XX:+DisableExplicitGC 和 -XX:+ExplicitGCInvokesConcurrent  if (should_do_concurrent_full_gc(cause)) {#ifndef SERIALGC    // mostly concurrent full collection    collect_mostly_concurrent(cause);#else  // SERIALGC    ShouldNotReachHere();#endif // SERIALGC  } else {#ifdef ASSERT    if (cause == GCCause::_scavenge_alot) {      // minor collection only      collect(cause, 0);    } else {      // Stop-the-world full collection      // STW 进行Full Gc      collect(cause, n_gens() - 1);    }#else    // Stop-the-world full collection    collect(cause, n_gens() - 1);#endif  }}

collect里一结尾就有个判断,如果should_do_concurrent_full_gc返回true,那会执行collect_mostly_concurrent做并行的回收。

回到Kafka的服务端参数,KafKa最后的服务端启动脚本中,此参数理论为-XX:+DisableExplicitGC,然而后续被指出会影响间接内存的回收性能,并且很可能会导致间接内存无奈被回收!

为什么会有这么重大 ? 这里先不急着剖析,而是先看看作者的这个issue的提交:

KAFKA-5470: Replace -XX:+DisableExplicitGC with -XX:+ExplicitGCInvokesConcurrent in kafka-run-class by ijuma · Pull Request #3371 · apache/kafka (github.com)

提交者的原话是:

This is important because Bits.reserveMemory calls System.gc() hoping to free native
memory in order to avoid throwing an OutOfMemoryException. This call is currently
a no-op due to -XX:+DisableExplicitGC.

It's worth mentioning that -XX:MaxDirectMemorySize can be used to increase the
amount of native memory available for allocation of direct byte buffers.

简略来说就是Bits.reserveMemory外面会有System.gc()调用,通过程序强制调用Full Gc来回收掉native内存,所以倡议在JVM参数中删掉-XX:+DisableExplicitGC,开启System.gc();并且通过增加-XX:+ExplicitGCInvokesConcurrentSystem.gc()调用效率更高一些。

另外大佬这里还提了一嘴-XX:MaxDirectMemorySize能够用来进步可用于调配间接字节缓冲区的本地内存的数量。
大佬一句话就是一个知识点,牛呀。
Bits#reserveMemory

既然提交者提到了Bits#reserveMemory,这里就顺带贴一下官网jdk8的java.nio.Bits#reserveMemory源码不便了解:

// These methods should be called whenever direct memory is allocated or// freed.  They allow the user to control the amount of direct memory// which a process may access.  All sizes are specified in bytes.//每当调配间接内存或开释。 它们容许用户管制间接内存的数量过程能够拜访的内容。 所有大小均以字节为单位指定。static void reserveMemory(long size, int cap) {    if (!memoryLimitSet && VM.isBooted()) {        maxMemory = VM.maxDirectMemory();        memoryLimitSet = true;    }    // optimist!    if (tryReserveMemory(size, cap)) {        return;    }    final JavaLangRefAccess jlra = SharedSecrets.getJavaLangRefAccess();    // retry while helping enqueue pending Reference objects    // which includes executing pending Cleaner(s) which includes    // Cleaner(s) that free direct buffer memory    while (jlra.tryHandlePendingReference()) {        if (tryReserveMemory(size, cap)) {            return;        }    }    // trigger VM's Reference processing    System.gc();    // a retry loop with exponential back-off delays    // (this gives VM some time to do it's job)    boolean interrupted = false;    try {        long sleepTime = 1;        int sleeps = 0;        while (true) {            if (tryReserveMemory(size, cap)) {                return;            }            if (sleeps >= MAX_SLEEPS) {                break;            }            if (!jlra.tryHandlePendingReference()) {                try {                    Thread.sleep(sleepTime);                    sleepTime <<= 1;                    sleeps++;                } catch (InterruptedException e) {                    interrupted = true;                }            }        }        // no luck        throw new OutOfMemoryError("Direct buffer memory");    } finally {        if (interrupted) {            // don't swallow interrupts            Thread.currentThread().interrupt();        }    }}

咱们通过浏览JDK8的Nio包的这部分用于调配DirectMememory的一段代码,发现每次Direct Mememory进行理论的调配动作之前,都会调用这个办法检测是否有足够空间调配时都被调用,不过外面的逻辑奇奇怪怪的,初看的确有点摸不着头脑。

国外有网友间接痛骂了这一段代码是一坨Shit:java.nio.Bits.reserveMemory uses a lock, calls System.gc, and is generally bad code... (google.com)

1.  ALL memory access requires a lock.  That's evil if you're allocating small chunks.2.  The code to change the reserved memory counters is duplicated twice.  This is a great way to introduce bugs.  (how did this even get approved? do they not do code audits or require that commits be approved?)3.  If you are out of memory we call System.gc... EVIL.  The entire way direct memory is reclaimed via GC is a horrible design.4.  After GC they sleep 100ms.  What's that about?  Why 100ms?  Why not 1ms?  
  1. 所有的内存拜访都须要一个锁。 如果你调配的是小块的内存几乎就是噩梦。
  2. 扭转保留内存计数器的代码反复了两次。 这是个引入谬误的好办法。 (难道他们不进行代码审计或要求提交的代码必须失去批准吗?)
  3. 如果你没有内存了,咱们就调用System.gc... 通过GC回收间接内存的整个形式是一个可怕的设计。
  4. 在GC之后,他们会休眠100ms。 那是什么意思? 为什么是100ms? 为什么不是1ms?

集体并不感冒这些评论,这里拎出System.gc()这行代码来剖析具体用意。要看懂这一行代码的用意,咱们须要理解DirectMemory关联的本机内存是如何清理的,这里就间接给出答案了。

JVM实际上是管不到DirectMemory的,须要依附非凡的形式回收掉DirectMemory:

  • 手动调用unsafe.freeMemory()进行开释,nettyByteBuf.release()就是这种形式实现的;
  • 利用GC机制在GC的过程中主动调用unsafe.freeMemory()开释被援用的间接内存;

这段代码作者的用意显著是显示调用System.gc(),尽可能回收不可达的DirectByteBuffer对象,也只有通过GC才会主动触发unsafe.freeMemory()的调用,开释间接内存。

至于其余代码.....这里不做过多评论。

Fix -XX:+DisableExplicitGC

基于以上种种原因,Kafka官网最终提交了一个Commit修复这个问题:

Fix run class to work with Java 10 and use ExplicitGCInvokesConcurrent by ijuma · Pull Request #1329 · confluentinc/ksql (github.com)

具体的调整细节能够看上面的连贯,读者能够通过比照本人的下载Kafka启动脚本查看是否修复这个问题:

KAFKA-5470: Replace -XX:+DisableExplicitGC with -XX:+ExplicitGCInvokesConcurrent in kafka-run-class by ijuma · Pull Request #3371 · apache/kafka (github.com)

其余参考资料

上面的这些参考资料能够帮忙咱们更深刻的了解-XX:+ExplicitGCInvokesConcurrent参数附带的知识点:

-XX:+ExplicitGCInvokesConcurrent的含意:What is JVM startup parameter: -XX:+ExplicitGCInvokesConcurrent? - yCrash Answers

官网的G1文档:Java HotSpot Garbage Collection (oracle.com)

为什么仅限G1能够开启此参数来进行健康检查,其余垃圾收集器倡议敞开此参数:Health Check: Explicit Garbage Collection | Jira | Atlassian Documentation

JVM源码剖析之SystemGC齐全解读 | HeapDump性能社区

  • 为什么不能同时设置-XX:+DisableExplicitGC 以及 -XX:+ExplicitGCInvokesConcurrent
  • 为什么CMS GC下-XX:+ExplicitGCInvokesConcurrent这个参数加了之后会比真正的Full GC好?
  • 它如何做到暂停整个过程?
  • 堆外内存调配为什么有时候要配合System.gc?
  • Netty回收堆外内存的策略又是如何?
小结

笔者也没有想到一个简略的参数能牵扯出这么多内容,这里做一个大略的总结:

  • Kafka官网已经禁用过System.gc()
  • 前面有大神剖析了脚本和JDK的NIO源码,发现禁用System.gc()这不是有问题嘛,你Kafka大量应用Java的间接内存,间接内存靠个别的Gc是回收不掉的,只能靠Ful Gc顺带回收,JDK官网代码又是靠频繁调用System.gc()强制腾出间接内存空间的,你System.gc()禁用了不是“找死”么,于是连忙解释了一波 Bits#reserveMemory写的“垃圾代码”来证实本人的观点,而后倡议启用System.gc(),并且为了进步Full Gc效率应用-XX:+ExplicitGCInvokesConcurrent
  • 官网发现这个问题连忙修复了一版并且提交了issue。

MaxInlineLevel

java 有一个参数 -XX:MaxInlineLevel(JDK14之前默认值为 9),这个值在JDK14之后默认值改为15。这个值的批改能够参考JDK官网的申明 https://bugs.openjdk.org/browse/JDK-8234863。

上面的图Oracle官网对于JDK14版本之后批改MaxInlineLevel=15的Push。

上面简明扼要源自网上收集的材料和集体了解,其实简略了解为古代硬件资源足以反对 -XX:MaxInlineLevel设置为15,更大的内联深度能够让JIT编译出更多的本地代码从而进步Java代码的运行效率即可。

如果你的服务器还是古旧的四五年前的机器,或者生产机器的确渣的能够,那么还是倡议把这个参数 -XX:MaxInlineLevel改回 9 比拟得当。

简明扼要的局部:

链接https://bugs.openjdk.org/browse/JDK-8234863的申明指出,15这个值在scala上的性能测试是被认为最优后果。这个值在古代处理器速度以及性能优化较好的明天最为适合,默认值9这个数字显得十分过期

Kafka作为激进压迫机器性能的榜样,也听从JDK官网的改变默认所有版本的JDK对立应用15这个默认值。

这里额定插一嘴,集体认为实际上这个值Oracle官网在JDK11就能够批改为15。

MaxInlineLevel自身的判断逻辑仿佛更引起宽广程序员的关注,StackFlow上有一篇对于这个参数的探讨:https://stackoverflow.com/questions/32503669/why-does-the-jvm-have-a-maximum-inline-depth 比拟有意思。

在评论中有网友指出在比拟低的JDK8版本当中,MaxRecursiveInlineLevel对间接间接的递归调用都进行计数,编译后的代码应该在运行时放弃对整个内联树的跟踪(以便可能解压和去优化)。紧接着是其他人的一些个人观点,没人接这个人话茬=-=,难堪。

持续翻阅,上面的评论有一位大佬解释了为什么会呈现MaxInlineLevel这个参数,简略易懂这里就间接贴过来了:

One reason is also that the inlining itself in the HotSpot JVM is implemented with recursion. Every time inlining of a method is started a new context is created on the native stack. Allowing an unlimited depth would eventually make the JIT-compiler crash when it runs out of stack.

(旧版激进的限度办法内联深度),其中一个起因是HotSpot JVM的内联自身是用递归实现的。每次对一个办法进行内联时,都会在本地堆栈中创立一个新的上下文。如果容许有限的深度,最终会使JIT-编译器在堆栈耗尽时解体。

在过来硬件资源缓和的状况下,适度的办法内联有可能会呈现比拟深的堆栈调用,非常耗费程序内存,然而古代内存动不动就是32,64,128G 的明天,加上处理器的外围数量上来了之后,扩充默认的办法内联深度参数值的确十分有必要。

办法内联是JVM比拟底层的优化,能够通过周大神的《深刻了解JVM虚拟机第三版》理解。

如果不懂办法内联间接无脑设置MaxInlineLevel=15即可,没有为什么,官网都曾经在高版本JDK批改了默认值,JDK8忠诚粉丝天然也能够这么干。

jdk14 hotspot 依赖的调整日志:https://hg.openjdk.org/jdk8u/jdk8u/hotspot/

-Djava.awt.headless=true

这个参数比拟奇怪,然而实际上在SpringBoot源码中也有同样的写法。

这算是一个不太被关注的优化参数,简略了解是-Djava.awt.headless=true能够屏蔽掉一些不必要的外置设施影响,告知程序以后没有外置设施,尽可能的让程序底层本人模仿,比方打印从图形显示变为控制台打印。

又是牵扯内容很多的一个点,具体解释能够看这篇文章:[[【Java】The Java Headless Mode]],篇幅无限,这里就不多解释了。

KAFKA_GC_LOG_OPTS

见名知意,就是JVM的日志参数配置,Kafka最终的日志格局为:XXX-gc.log,日志配置这一块和大部分以JAVA为底层的开源组件大差不差,简略的扫一眼差不多了。

# Log directory to use# 获取log_dir,如果没配置就那 $base_dir 环境变量if [ "x$LOG_DIR" = "x" ]; then  # base_dir=$(dirname $0)/..  LOG_DIR="$base_dir/logs"fi# GC options    GC_FILE_SUFFIX='-gc.log'GC_LOG_FILE_NAME=''if [ "x$GC_LOG_ENABLED" = "xtrue" ]; then  GC_LOG_FILE_NAME=$DAEMON_NAME$GC_FILE_SUFFIX  # The first segment of the version number, which is '1' for releases before Java 9  # it then becomes '9', '10', ...  # Some examples of the first line of `java --version`:  # 8 -> java version "1.8.0_152"  # 9.0.4 -> java version "9.0.4"  # 10 -> java version "10" 2018-03-20  # 10.0.1 -> java version "10.0.1" 2018-04-17  # We need to match to the end of the line to prevent sed from printing the characters that do not match  JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -E -n 's/.* version "([0-9]*).*$/\1/p')  if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then    KAFKA_GC_LOG_OPTS="-Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=100M"  else    KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"  fifi

JAVA_MAJOR_VERSION就是通过正则去除JDK的主版本号。

  • 如果主版本号大于或者等于JDK9,就应用JDK9新增的 -xlog:gc* 对立的日志参数作为启动参数,
  • 如果是JDK8之前的版本,就须要用一大堆旧版的日志参数,学习和应用老本比拟大:

    • GCLogFileSize=100M,限度GC日志文件大小为100M。
    • NumberOfGCLogFiles=10,容许存在的GC日志文件数量为10个。
    • UseGCLogFileRotation,让GC日志一直循环,如果最初一个GC日志写满,将会从第一个文件从新开始写入
    • -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps 是旧版本凌乱的GC参数配置诞生的恶果,这些参数在JDK9之后被通通被-xlog:gc* 代替。

      • -verbose:gc -XX:+PrintGCDetails这两个参数常常在低版本JDK一起呈现,最大的区别是前者是稳固版本,后者则是被认为是不稳固的日志启动参数(强制和其余GC参数配合呈现显得不稳固)。
      • -XX:+PrintGCDateStamps:每行结尾显示以后相对的日期及工夫,打印GC产生时的工夫戳,搭配 -XX:+PrintGCDetails 应用,不能够独立应用。
      • -XX:+PrintGCTimeStamps 自从JVM启动以来的工夫。

-XX:+PrintGCDateStamps-XX:+PrintGCTimeStamps能够间接看上面的例子比照:

-XX:+PrintGCDateStamps日志输入示例:2014-01-03T12:08:38.102-0100: [GC 66048K->53077K(251392K), 0,0959470 secs]2014-01-03T12:08:38.239-0100: [GC 119125K->114661K(317440K), 0,1421720 secs]-XX:+PrintGCTimeStamps日志输入示例:0,185: [GC 66048K->53077K(251392K), 0,0977580 secs]0,323: [GC 119125K->114661K(317440K), 0,1448850 secs]
因为-XX:+PrintGCDetails被标记为manageable,所以能够通过如下三种形式批改:
1、com.sun.management.HotSpotDiagnosticMXBean API
2、JConsole
3、jinfo -flag

最初再把英文正文局部简略翻译一下:

  1. 第一个参数如果是1结尾,代表是JDK9之后的版本。
  2. java --version产生的后果如下:

    • 8 -> java version "1.8.0_152"
    • 9.0.4 -> java version "9.0.4"
    • 10 -> java version "10" 2018-03-20
    • 10.0.1 -> java version "10.0.1" 2018-04-17
  3. 通过正则表达式匹配到行尾,以避免sed打印出不匹配的字符

KAFKA_JMX_OPTS

JMX全称Java Management Extensions, 为Java利用提供治理扩大性能。在JDK 5的时候引入,Kafka设置启动参数让Kafka应用程序取得JMX近程调用的反对。

# JMX settingsif [ -z "$KAFKA_JMX_OPTS" ]; then  KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false "fi

KAFKA_JMX_OPTS对应的value的含意参考自:https://www.jianshu.com/p/414647c1179e,此处列举一些无关JMX的相干参数:

参数名类型形容
-Dcom.sun.management.jmxremote布尔是否反对近程JMX拜访,默认true
-Dcom.sun.management.jmxremote.port数值监听端口号,不便近程拜访
-Dcom.sun.management.jmxremote.authenticate布尔是否须要开启用户认证,默认开启
-Dcom.sun.management.jmxremote.ssl布尔是否对连贯开启SSL加密,默认开启
-Dcom.sun.management.jmxremote.access.file门路对拜访用户的权限受权的文件的门路,默认门路JRE_HOME/lib/management/jmxremote.access
-Dcom.sun.management.jmxremote. password.file门路设置拜访用户的用户名和明码,默认门路JRE_HOME/lib/management/ jmxremote.password

KAFKA_LOG4J_OPTS

log4j的日志配置地址。

if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then      export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"  fi  

配置含意不须要记忆,在浏览的时候查阅相干材料即可:https://www.jianshu.com/p/ccafda45bcea,这里间接贴过来作为正文局部供读者参考。

# 根Log# 默认日志等级为INFO级别# NFO、WARN、ERROR和FATAL级别的日志信息都会输入# 日志最终输入到kafkaAppenderlog4j.rootLogger=INFO, stdout, kafkaAppender  # 控制台配置log4j.appender.stdout=org.apache.log4j.ConsoleAppender  # 布局模式应用能够灵便模式log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  # 日志打印格局# [%d] 输入日志工夫点的日期或工夫,默认格局为ISO8601,也能够在其后指定格局,如:%d{yyyy/MM/dd HH:mm:ss,SSS}。# %m::输入代码中指定的具体日志信息。# %p:输入日志信息的优先级,即DEBUG,INFO,WARN,ERROR,FATAL。# %c:输入日志信息所属的类目,通常就是所在类的全名。# %n:输入一个回车换行符,Windows平台为"\r\n",Unix平台为"\n"。log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n  # DailyRollingFileAppender Kafka默认服务端日志log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender  log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH  # server.log 存储地位log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log  log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout  log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n  # DailyRollingFileAppender 状态机变更日志log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender  # 依照每小时产生一个日志的形式log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH   log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log  log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout  log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n  # 申请日志log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender  log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH  log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log  log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout  log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n  # Log清理日志log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender  log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH  log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log  log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout  log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n  # Controller 日志log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender  log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH  log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log  log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout  log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n  # 验证日志log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender  log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH  log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log  log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout  log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n    # Change the line below to adjust ZK client logging  # 批改上面的日志管制ZK的日志输入log4j.logger.org.apache.zookeeper=INFO    # Change the two lines below to adjust the general broker logging level (output to server.log and stdout)  # 更改上面两行以调整个别代理日志记录级别(输入到 server.log 和 stdout)log4j.logger.kafka=INFO  log4j.logger.org.apache.kafka=INFO    # Change to DEBUG or TRACE to enable request logging  # 批改日志级别为 DEBUG和TRACE获取申请日志log4j.logger.kafka.request.logger=WARN, requestAppender  log4j.additivity.kafka.request.logger=false    # Uncomment the lines below and change log4j.logger.kafka.network.RequestChannel$ to TRACE for additional output  # 勾销正文上面的行并将 log4j.logger.kafka.network.RequestChannel$ 更改为 TRACE 以取得额定的输入# related to the handling of requests  # 与申请的解决相干#log4j.logger.kafka.network.Processor=TRACE, requestAppender  #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender  #log4j.additivity.kafka.server.KafkaApis=false  log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender  log4j.additivity.kafka.network.RequestChannel$=false    log4j.logger.kafka.controller=TRACE, controllerAppender  log4j.additivity.kafka.controller=false    log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender  log4j.additivity.kafka.log.LogCleaner=false    log4j.logger.state.change.logger=INFO, stateChangeAppender  log4j.additivity.state.change.logger=false    # Access denials are logged at INFO level, change to DEBUG to also log allowed accesses # 回绝拜访记录在 INFO 级别,更改为 DEBUG 以记录容许的拜访log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender  log4j.additivity.kafka.authorizer.logger=false

KAFKA_OPTS

KAFKA_OPTS 能够在这里设置本人的想要的通用配置:

# Generic jvm settings you want to addif [ -z "$KAFKA_OPTS" ]; then  KAFKA_OPTS=""fi

UPGRADE_KAFKA_STREAMS_TEST_VERSION

变量名称翻译过去是“降级kafka流的测试版本”,这里大抵的意思是取出版本号进行一些判断之后设置到ClassPath当中。

说实话这部分内容看不太懂,然而不算是非常重要的货色,能够当前深刻之后回来理解,这里间接遗记这个设置即可。

if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then  for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;  do    if should_include_file "$file"; then      CLASSPATH="$CLASSPATH":"$file"    fi  doneelse  VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'`  SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, ie, bug-fix number  for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;  do    if should_include_file "$file"; then      CLASSPATH="$file":"$CLASSPATH"    fi  done  if [ "$SHORT_VERSION_NO_DOTS" = "0100" ]; then    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.8.jar":"$CLASSPATH"    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.6.jar":"$CLASSPATH"  fi  if [ "$SHORT_VERSION_NO_DOTS" = "0101" ]; then    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.9.jar":"$CLASSPATH"    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.8.jar":"$CLASSPATH"  fifi

CLASSPATH

运行 ./gradlew copyDependantLibs 来获取本地目录下的所有依赖性jar。

留神这里划分了很多个子模块,所以应用了for循环加载到CLASSPATH当中,这会导致最终产生的命令会十分长。

# run ./gradlew copyDependantLibs to get all dependant jars in a local dirshopt -s nullglobif [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then  for dir in "$base_dir"/core/build/dependant-libs-${SCALA_VERSION}*;  do    CLASSPATH="$CLASSPATH:$dir/*"  donefifor file in "$base_dir"/examples/build/libs/kafka-examples*.jar;do  if should_include_file "$file"; then    CLASSPATH="$CLASSPATH":"$file"  fidone

集体尝试了一下正文介绍的gradle copyDependantLibs命令,本地执行后果如下,这个命令会在对应的模块构建依赖jar包:

$ gradle copyDependantLibs> Configure project :Building project 'core' with Scala version 2.13.3Building project 'streams-scala' with Scala version 2.13.3Deprecated Gradle features were used in this build, making it incompatible with Gradle 7.0.Use '--warning-mode all' to show the individual deprecation warnings.See https://docs.gradle.org/6.6.1/userguide/command_line_interface.html#sec:command_line_warningsBUILD SUCCESSFUL in 2s55 actionable tasks: 3 executed, 52 up-to-date

集体是win11的电脑,通过wox查找dependant-libs后果如下:

对应的一堆依赖jar包

CONSOLE_OUTPUT_FILE

日志的打印输出地址文件地址设置,留神不是GC的日志。

  case $COMMAND in    -name)      DAEMON_NAME=$2      CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.out      shift 2      ;;

这里翻阅了一些无关shift的材料:

对于Shift的作用能够参考:https://ss64.com/bash/shift.html。Linux中通过help shift查看使用手册,然而会发现写的比拟潦草和形象。
shift: shift [n]    Shift positional parameters.        Rename the positional parameters $N+1,$N+2 ... to $1,$2 ...  If N is    not given, it is assumed to be 1.        Exit Status:    Returns success unless N is negative or greater than $#

比方上面的程序:

#! /bin/bashecho $1 echo $2shift 1echo $1echo $2# 输入后果[zxd@localhost ~]$ ./test.sh 1 2 3 5 61223

shift 1 执行之后会弹出第一个参数,之后的运行参数会往前“推动”,$1变为$2的值,$2变为$3的值,以此类推。

& 后盾启动和nohup挂起过程

开端局部是设置ClaassPath,用户本人自定义参数以及把规范输出和输入重定向到同一个地位,最初就是当前台模式启动并且最终通过nohup挂起整个过程。

-cp "$CLASSPATH" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &

这段脚本最后面把代替参数意义进行了替换。-cp是nohup命令的参数,接着是把输入的后果全副重定向到规范输入当中,这个地址对应CONSOLE_OUTPUT_FILE

上面了解最初局部的nohup和&2>&1/dev/null这几个常见的服务端脚本启动参数的含意。

nohup和&

nohup:nohup指令会疏忽所有挂断(SIGHUP)信号不挂断的运行。留神nohup命令自身并没有后盾运行的性能,须要配合&应用。它的实现原理是让命令不间断的运行实现挂机的成果。

& 是指在后盾运行,但当用户退出(解除挂起)的时候,命令主动也跟着退出,nohup&这两个指令通常会放到一起应用。

/dev/null

这个空间属于Linux的一块非凡空间,UNIX零碎中,它被称为空设施。以下内容摘自维基百科:

/dev/null(或称空设施)在类Unix零碎中是一个非凡的设施文件,它抛弃所有写入其中的数据(但报告写入操作胜利),读取它则会立刻失去一个EOF[1]。

在程序员行话,尤其是Unix行话中,/dev/null被称为比特桶或者黑洞

2>&1的问题

后面的第一个数字2通常对应上面几种含意:

  • 0 – stdin (standard input) 规范输出
  • 1 – stdout (standard output) 规范输入
  • 2 – stderr (standard error) 规范谬误输入

\> 是重定向符号,而数字2的含意是规范谬误输入,&1指的就是规范输入,三个符号组合到一起就是把规范谬误输入输出重定向到规范输入当中,这里能够了解为“合流”。

留神命令2>&12>1是存在区别的,这里&不能丢,后者的1代表输入代表谬误重定向到一个文件1,不代表规范输入,只有&1才代表规范输入。

如果想要抛弃所有的规范谬误输入和规范输入后果,上面是一个不错的例子:

nohup python3 getfile.py > /dev/null 2>&1 &

如果想要写入到指定的地位,上面是又一个不错的例子:

nohup python3 getfile.py > test.log 2>&1 &

最初是理论一点的例子:

0 9 \* \* \* /usr/bin/python3 /opt/getFile.py > /opt/file.log 2>&1
下面的命令含意是放在crontab中的定时工作,每天9:00启动这个python的脚本,并把执行后果写入日志文件file.log中

exec 运行

如果不是守护过程的执行,则是应用exec在以后的shell中进行失常模式启动,此时整个shell会挂起运行kafka服务端。

  exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@"

其余内容

Lauch modes 应用到的变量设置蕴含了启动整个Kafka服务端的外围局部,上面再列觉其余的依赖配置以及“辅助”内容。

Scala 版本抉择

Kafka是应用Java和Scala混合编写的,依据不同的Kafka版本须要不同版本的Scala版本反对,这里官网做了一个版本抉择强制判断抉择出最合适的Scala。

if [ -z "$SCALA_VERSION" ]; then  SCALA_VERSION=2.13.3  if [[ -f "$base_dir/gradle.properties" ]]; then    SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2`  fifi

gradle.properties

Kafka 我的项目是基于gradle构建的,gradle 集体平时根本没啥接触机会,这里做一个大抵配置理解。

group=org.apache.kafka  # NOTE: When you change this version number, you should also make sure to update  # the version numbers in  #  - docs/js/templateData.js  #  - tests/kafkatest/__init__.py  #  - tests/kafkatest/version.py (variable DEV_VERSION)  #  - kafka-merge-pr.py  version=2.7.2  scalaVersion=2.13.3  task=build  org.gradle.jvmargs=-Xmx2g -Xss4m -XX:+UseParallelGC

gradle这里调配的是2g的堆内存,Xss4m每个线程的堆栈大小为4M,最初是应用ParallelGC垃圾收集器,也是JDK8的默认垃圾收集器。

DEBUG模式

如果在启动参数外面设置了KAFKA_DEBUG,就能够开启DEBUG模式。

# Set Debug options if enabledif [ "x$KAFKA_DEBUG" != "x" ]; then    # Use default ports    DEFAULT_JAVA_DEBUG_PORT="5005"    if [ -z "$JAVA_DEBUG_PORT" ]; then        JAVA_DEBUG_PORT="$DEFAULT_JAVA_DEBUG_PORT"    fi    # Use the defaults if JAVA_DEBUG_OPTS was not set    DEFAULT_JAVA_DEBUG_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=${DEBUG_SUSPEND_FLAG:-n},address=$JAVA_DEBUG_PORT"    if [ -z "$JAVA_DEBUG_OPTS" ]; then        JAVA_DEBUG_OPTS="$DEFAULT_JAVA_DEBUG_OPTS"    fi    echo "Enabling Java debug options: $JAVA_DEBUG_OPTS"    KAFKA_OPTS="$JAVA_DEBUG_OPTS $KAFKA_OPTS"fi

咱们须要理解的是 JAVA_DEBUG_OPTS 命令的含意。起初尽管不是很懂上面的参数含意,然而能够晓得是JAVA调试应用程序用的。

-agentlib:jdwp=transport=dt_socket,server=y,suspend=${DEBUG_SUSPEND_FLAG:-n},address=$JAVA_DEBUG_PORT

咱们调试程序更多是在IDE外面,上面的内容来自网络材料整合参考和了解:

Debugging Java applications) 这篇文章大略介绍了如何在JVM启动之后调试JAVA程序,以及如何在应用JDK调试应用程序。

若要调试 Java 过程,能够应用 Java 调试器 (JDB) 利用过程或其余调试器,这些调试器通过应用 SDK 为操作系统提供的 Java™ 平台调试器体系结构 (JPDA) 进行通信。

在Linux零碎当中进行JAVA过程调试能够应用上面的命令。对于咱们来说这些写法照着写就行,不须要过分查究具体的含意。

java -agentlib:jdwp=transport=dt_socket,server=y,address=_<port>_ <class>

调试近程服务器运行的JAVA应用程序,在Window中和Linux中调试形式如下:

  • On Windows systems:
jdb -connect com.sun.jdi.SocketAttach:hostname=<host>,port=<port>
  • On other systems:
jdb -attach <host>:<port>

此外Stack-Flow上还有一个写的更棒的帖子,这篇帖子的参数和Kafka的脚本局部基本一致了。

debugging - What are Java command line options to set to allow JVM to be remotely debugged? - Stack Overflow

Before Java 5.0, use -Xdebug and -Xrunjdwp arguments. These options will still work in later versions, but it will run in interpreted mode instead of JIT, which will be slower.

JDK5之前的版本这里能够间接疏忽。(晓得了也没啥用途)

From Java 5.0, it is better to use the -agentlib:jdwp single option:

JDK5之后应用上面的命令格局:

-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=1044

Options on -Xrunjdwp or agentlib:jdwp arguments are :

  • transport=dt_socket : means the way used to connect to JVM (socket is a good choice, it can be used to debug a distant computer)
  • address=8000 : TCP/IP port exposed, to connect from the debugger,
  • suspend=y : if 'y', tell the JVM to wait until debugger is attached to begin execution, otherwise (if 'n'), starts execution right away.
  • transport=dt_socket :示意用于连贯JVM的形式(socket是一个不错的抉择,它能够用来调试近程计算机)
  • address=8000 : TCP / IP端口公开,从调试器连贯。
  • suspend=y :如果为“y”,则通知 JVM 等到连贯调试器后再开始执行,否则(如果为“n”),立刻开始执行。

最初比照一下Kafka的参数,恍然大悟。

-agentlib:jdwp=transport=dt_socket,server=y,suspend=${DEBUG_SUSPEND_FLAG:-n},address=$JAVA_DEBUG_PORT

Which java to use

如正文所言查找java命令在哪。

if [ -z "$JAVA_HOME" ]; then  JAVA="java"else  JAVA="$JAVA_HOME/bin/java"fi

Memory options

内存配置选项如下:

if [ -z "$KAFKA_HEAP_OPTS" ]; then  KAFKA_HEAP_OPTS="-Xmx256M"fi
-Xmxn:指定内存调配池的最大大小(以字节为单位)。此值的倍数必须大于 2MB,1024 的倍数。

这里设置最大的HEAP大小为256M。

cc_pkg

同样是jar包依赖的查找和引入到ClassPath当中,这里同样不晓得干啥用的,简略了解是获取必要依赖项即可。

for cc_pkg in "api" "transforms" "runtime" "file" "mirror" "mirror-client" "json" "tools" "basic-auth-extension"do  for file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;  do    if should_include_file "$file"; then      CLASSPATH="$CLASSPATH":"$file"    fi  done  if [ -d "$base_dir/connect/${cc_pkg}/build/dependant-libs" ] ; then    CLASSPATH="$CLASSPATH:$base_dir/connect/${cc_pkg}/build/dependant-libs/*"  fidone

Exclude jars not necessary for running commands.

排除命令不须要的jar包,比方test和javadoc等。

regex="(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc)$"should_include_file() {  if [ "$INCLUDE_TEST_JARS" = true ]; then    return 0  fi  file=$1  if [ -z "$(echo "$file" | egrep "$regex")" ] ; then    return 0  else    return 1  fi}

INCLUDE_TEST_JARS

判断是否开启了蕴含测试的jar包。

if [ -z "$INCLUDE_TEST_JARS" ]; then  INCLUDE_TEST_JARS=falsefi

写在最初

不得不感叹学无止境,晓得的越多不晓得的也就更多,一个脚本外面竟然有这么多学识,本局部的外围毫无疑问是JVM的启动参数,其余的参数或者配置以及奇怪的脚本写法看不懂 也没啥关系,这里仅仅对于一些集体关注的外围局部进行介绍,对于一些细枝末节不做过多的查究和钻牛角尖,读者感兴趣能够比照参考资料做更多理解。