This article is reproduced from the WeChat public account StreamCloudNative. The author, Xue Song, is a senior software engineer at Newland Software.
Editor: Jipai, StreamNative.
About Apache Pulsar
Apache Pulsar is a top-level project of the Apache Software Foundation and a next-generation cloud-native distributed messaging and streaming platform. It combines messaging, storage, and lightweight function-based computing, adopts a compute-storage separation architecture, and supports multi-tenancy, persistent storage, and cross-region replication across multiple data centers, offering streaming-data storage characteristics such as strong consistency, high throughput, low latency, and high scalability.
Many large Internet and traditional-industry companies at home and abroad have already adopted Apache Pulsar, with use cases spanning artificial intelligence, finance, telecom operators, live streaming and short video, IoT, retail and e-commerce, online education, and other industries — including the US cable giant Comcast, Yahoo!, Tencent, China Telecom, China Mobile, BIGO, and VIPKID.
Background
As a cloud-native distributed messaging system, Apache Pulsar consists of multiple components — ZooKeeper, bookie, broker, functions-worker, proxy, and so on — all deployed in a distributed fashion across multiple hosts, so each component's log files are likewise scattered across hosts. When a component misbehaves, checking each service one by one for errors is tedious because the logs are so dispersed. The usual approach is to run grep, awk, and similar commands directly against the log files to extract the information we need. However, as applications and services grow and the number of supporting nodes increases, this traditional approach exposes many problems: it is inefficient; how do you archive such a large volume of logs; what do you do when text search becomes too slow; how do you query across multiple dimensions; and so on. We therefore wanted to aggregate and monitor the logs so that errors in each Pulsar service can be found and investigated quickly, making operations more targeted, focused, and direct.
To solve the log-retrieval problem, our team decided to use a centralized log collection system to uniformly collect, manage, and access the logs on all Pulsar nodes.
A complete centralized logging system needs the following key capabilities:
- Collection: capture log data from multiple sources;
- Transport: reliably ship log data to a central system;
- Storage: store the log data;
- Analysis: support analysis through a UI;
- Alerting: provide error reporting and monitoring mechanisms.
ELK provides a complete, all-open-source solution whose components fit together seamlessly and efficiently cover many scenarios, making it one of today's mainstream logging stacks. Our company has a self-developed big-data management platform through which we deploy and manage ELK, and in production ELK already supports several business systems. ELK is an acronym for three open-source projects — Elasticsearch, Logstash, and Kibana. The latest versions have been renamed Elastic Stack, which also adds the Beats project, including Filebeat, a lightweight log collection agent. Filebeat consumes few resources and is well suited to collecting logs on each server and forwarding them to Logstash.
As the figure above shows, if Pulsar used this log collection model there would be two problems:
- Every host running a Pulsar service would also have to run a Filebeat service;
- Pulsar service logs would first have to be written to files on disk, consuming the host's disk I/O.
We therefore considered implementing fast log retrieval for Apache Pulsar with Log4j2 + Kafka + ELK. Log4j2 supports sending logs to Kafka out of the box: using the Log4j2 appender shipped with the Kafka client and adding the corresponding configuration to the Log4j2 configuration file, logs produced by Log4j2 can be sent to Kafka in real time.
As shown in the figure below:
Implementation
Below, using Pulsar 2.6.2 as an example, we walk through the concrete implementation of fast log retrieval for Apache Pulsar based on Log4j2 + Kafka + ELK.
I. Preparation
First, determine the fields that will be used in Kibana to search the logs; these fields can then be aggregated and queried across multiple dimensions. Elasticsearch tokenizes the search fields and builds the index accordingly.
As shown in the figure above, we defined 8 search fields for the Pulsar logs: cluster name, hostname, host IP, component name, log content, system time, log level, and cluster instance.
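For reference, an Elasticsearch index mapping for these 8 fields might look like the sketch below. The index name and field names are illustrative assumptions: the search fields are kept as exact-match `keyword` (or `ip`/`date`) types, while the log body is analyzed `text`:

```json
PUT /pulsar-log
{
  "mappings": {
    "properties": {
      "cluster":  { "type": "keyword" },
      "hostname": { "type": "keyword" },
      "hostip":   { "type": "ip" },
      "module":   { "type": "keyword" },
      "instance": { "type": "keyword" },
      "time":     { "type": "date", "format": "yyyy-MM-dd HH:mm:ss.SSS" },
      "level":    { "type": "keyword" },
      "message":  { "type": "text" }
    }
  }
}
```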
II. Implementation steps
Note: to keep Pulsar's original configuration files and scripts intact, we implement this solution by adding new configuration files and script files.
1. Add configuration files
Add the following two configuration files to the {PULSAR_HOME}/conf directory:
1) logenv.sh. This file passes the JVM options needed at startup into the Java process of each Pulsar service, for example:
KAFKA_CLUSTER=192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092
PULSAR_CLUSTER=pulsar_cluster
PULSAR_TOPIC=pulsar_topic
HOST_IP=192.168.0.1
PULSAR_MODULE_INSTANCE_ID=1
These fields mean:
- KAFKA_CLUSTER: the Kafka broker list;
- PULSAR_CLUSTER: the Pulsar cluster name;
- PULSAR_TOPIC: the Kafka topic that receives the Pulsar service logs;
- HOST_IP: the IP address of the Pulsar host;
- PULSAR_MODULE_INSTANCE_ID: the instance ID of the Pulsar service; a single host may run services belonging to multiple Pulsar clusters, which are distinguished by this ID.
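The mechanism is simply "environment variables in, -D system properties out". The sketch below mimics what the pulsar-kafka script (shown later) does with these variables; the file path and values here are examples only:

```shell
# Hypothetical copy of conf/logenv.sh (example values only)
cat > /tmp/logenv.sh <<'EOF'
KAFKA_CLUSTER=192.168.0.1:9092,192.168.0.2:9092
PULSAR_CLUSTER=pulsar_cluster
PULSAR_TOPIC=pulsar_topic
HOST_IP=192.168.0.1
PULSAR_MODULE_INSTANCE_ID=1
EOF

# Source it and build the -D system properties the same way pulsar-kafka does;
# log4j2-kafka.yaml later resolves these via ${sys:...} lookups
. /tmp/logenv.sh
OPTS="-Dpulsar.module.instanceid=${PULSAR_MODULE_INSTANCE_ID} -Dkafka.cluster=${KAFKA_CLUSTER} -Dpulsar.hostip=${HOST_IP} -Dpulsar.cluster=${PULSAR_CLUSTER} -Dpulsar.topic=${PULSAR_TOPIC}"
echo "$OPTS"
```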
2) log4j2-kafka.yaml
This file is copied from log4j2.yaml, with the following changes added on top. (Note: in the comparison screenshots below, the left side is log4j2.yaml and the right side is log4j2-kafka.yaml.)
- Add the Kafka cluster broker list, and define the format of the records log4j2 writes to Kafka: the 8 search fields of a message are separated by spaces, and Elasticsearch uses the space as the delimiter to tokenize the 8 search fields.
- Add the Kafka appender;
- Add a Failover appender;
- Switch Root and Logger under Loggers to asynchronous mode;
- The complete content of log4j2-kafka.yaml is as follows:
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
Configuration:
  status: INFO
  monitorInterval: 30
  name: pulsar
  packages: io.prometheus.client.log4j2

  Properties:
    Property:
      - name: "pulsar.log.dir"
        value: "logs"
      - name: "pulsar.log.file"
        value: "pulsar.log"
      - name: "pulsar.log.appender"
        value: "RoutingAppender"
      - name: "pulsar.log.root.level"
        value: "info"
      - name: "pulsar.log.level"
        value: "info"
      - name: "pulsar.routing.appender.default"
        value: "Console"
      - name: "kafkaBrokers"
        value: "${sys:kafka.cluster}"
      - name: "pattern"
        value: "${sys:pulsar.cluster} ${sys:pulsar.hostname} ${sys:pulsar.hostip} ${sys:pulsar.module.type} ${sys:pulsar.module.instanceid} %date{yyyy-MM-dd HH:mm:ss.SSS} [%thread] [%c{10}] %level , %msg%n"

  # Example: logger-filter script
  Scripts:
    ScriptFile:
      name: filter.js
      language: JavaScript
      path: ./conf/log4j2-scripts/filter.js
      charset: UTF-8

  Appenders:
    # Kafka
    Kafka:
      name: "pulsar_kafka"
      topic: "${sys:pulsar.topic}"
      ignoreExceptions: "false"
      PatternLayout:
        pattern: "${pattern}"
      Property:
        - name: "bootstrap.servers"
          value: "${kafkaBrokers}"
        - name: "max.block.ms"
          value: "2000"

    # Console
    Console:
      name: Console
      target: SYSTEM_OUT
      PatternLayout:
        Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"

    Failover:
      name: "Failover"
      primary: "pulsar_kafka"
      retryIntervalSeconds: "600"
      Failovers:
        AppenderRef:
          ref: "RollingFile"

    # Rolling file appender configuration
    RollingFile:
      name: RollingFile
      fileName: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}"
      filePattern: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}-%d{MM-dd-yyyy}-%i.log.gz"
      immediateFlush: false
      PatternLayout:
        Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
      Policies:
        TimeBasedTriggeringPolicy:
          interval: 1
          modulate: true
        SizeBasedTriggeringPolicy:
          size: 1 GB
      # Delete file older than 30days
      DefaultRolloverStrategy:
        Delete:
          basePath: ${sys:pulsar.log.dir}
          maxDepth: 2
          IfFileName:
            glob: "*/${sys:pulsar.log.file}*log.gz"
          IfLastModified:
            age: 30d

    Prometheus:
      name: Prometheus

    # Routing
    Routing:
      name: RoutingAppender
      Routes:
        pattern: "$${ctx:function}"
        Route:
          -
            Routing:
              name: InstanceRoutingAppender
              Routes:
                pattern: "$${ctx:instance}"
                Route:
                  -
                    RollingFile:
                      name: "Rolling-${ctx:function}"
                      fileName: "${sys:pulsar.log.dir}/functions/${ctx:function}/${ctx:functionname}-${ctx:instance}.log"
                      filePattern: "${sys:pulsar.log.dir}/functions/${sys:pulsar.log.file}-${ctx:instance}-%d{MM-dd-yyyy}-%i.log.gz"
                      PatternLayout:
                        Pattern: "%d{ABSOLUTE} %level{length=5} [%thread] [instance: %X{instance}] %logger{1} - %msg%n"
                      Policies:
                        TimeBasedTriggeringPolicy:
                          interval: 1
                          modulate: true
                        SizeBasedTriggeringPolicy:
                          size: "20MB"
                        # Trigger every day at midnight that also scan
                        # roll-over strategy that deletes older file
                        CronTriggeringPolicy:
                          schedule: "0 0 0 * * ?"
                      # Delete file older than 30days
                      DefaultRolloverStrategy:
                        Delete:
                          basePath: ${sys:pulsar.log.dir}
                          maxDepth: 2
                          IfFileName:
                            glob: "*/${sys:pulsar.log.file}*log.gz"
                          IfLastModified:
                            age: 30d
                  - ref: "${sys:pulsar.routing.appender.default}"
                    key: "${ctx:function}"
          - ref: "${sys:pulsar.routing.appender.default}"
            key: "${ctx:function}"

  Loggers:
    # Default root logger configuration
    AsyncRoot:
      level: "${sys:pulsar.log.root.level}"
      additivity: true
      AppenderRef:
        - ref: "Failover"
          level: "${sys:pulsar.log.level}"
        - ref: Prometheus
          level: info
    AsyncLogger:
      - name: org.apache.bookkeeper.bookie.BookieShell
        level: info
        additivity: false
        AppenderRef:
          - ref: Console
      - name: verbose
        level: info
        additivity: false
        AppenderRef:
          - ref: Console
    # Logger to inject filter script
    #  - name: org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl
    #    level: debug
    #    additivity: false
    #    AppenderRef:
    #      ref: "${sys:pulsar.log.appender}"
    #      ScriptFilter:
    #        onMatch: ACCEPT
    #        onMisMatch: DENY
    #        ScriptRef:
    #          ref: filter.js
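On the ELK side, a Logstash pipeline can consume the Kafka topic and split each record back into the search fields before indexing it into Elasticsearch. The sketch below is illustrative only: the broker and Elasticsearch addresses, topic, index name, and field names are assumptions, and the `dissect` mapping must line up with the space-separated PatternLayout defined above:

```conf
input {
  kafka {
    bootstrap_servers => "192.168.0.1:9092"
    topics => ["pulsar_topic"]
  }
}
filter {
  # One record = the space-separated search fields, then the log body
  dissect {
    mapping => {
      "message" => "%{cluster} %{hostname} %{hostip} %{module} %{instance} %{logdate} %{logtime} %{body}"
    }
  }
}
output {
  elasticsearch {
    hosts => ["192.168.0.1:9200"]
    index => "pulsar-log-%{+YYYY.MM.dd}"
  }
}
```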
Notes:
- Log shipping must be asynchronous and must never affect service performance;
- When a latency-sensitive system integrates with a third-party system, the dependency must be decoupled. Here the Failover appender decouples the dependency on Kafka: if Kafka crashes, logging fails over and is written locally instead;
- The default retryIntervalSeconds of the log4j2 Failover appender is 1 minute, and recovery is driven by exceptions, so the interval can reasonably be increased, e.g. to the 10 minutes used above;
- The Kafka appender's ignoreExceptions must be set to false, otherwise Failover will never be triggered;
- A major pitfall is the max.block.ms property. Its default in the Kafka client library is 60000 ms, so when Kafka is down, an attempted write takes a full minute to throw an exception before Failover kicks in. Under heavy traffic, the log4j2 queue fills up quickly and subsequent logging calls block, severely affecting the main service's responsiveness. So set max.block.ms short enough and make the queue long enough.
2. Add script files
Add the following two script files to the {PULSAR_HOME}/bin directory.
1) pulsar-kafka
This script is copied from the pulsar script, with the following changes added on top. (Note: in the comparison screenshots below, the left side is pulsar and the right side is pulsar-kafka.)
- Point the log configuration at log4j2-kafka.yaml;
- Read in logenv.sh;
- Add OPTS entries so that the JVM options are passed to the Java process when a Pulsar component is started through the pulsar-kafka or pulsar-daemon-kafka script;
- The complete content of the pulsar-kafka script is as follows:
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
BINDIR=$(dirname "$0")
export PULSAR_HOME=`cd -P $BINDIR/..;pwd`
DEFAULT_BROKER_CONF=$PULSAR_HOME/conf/broker.conf
DEFAULT_BOOKKEEPER_CONF=$PULSAR_HOME/conf/bookkeeper.conf
DEFAULT_ZK_CONF=$PULSAR_HOME/conf/zookeeper.conf
DEFAULT_CONFIGURATION_STORE_CONF=$PULSAR_HOME/conf/global_zookeeper.conf
DEFAULT_DISCOVERY_CONF=$PULSAR_HOME/conf/discovery.conf
DEFAULT_PROXY_CONF=$PULSAR_HOME/conf/proxy.conf
DEFAULT_STANDALONE_CONF=$PULSAR_HOME/conf/standalone.conf
DEFAULT_WEBSOCKET_CONF=$PULSAR_HOME/conf/websocket.conf
DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2-kafka.yaml
DEFAULT_PULSAR_PRESTO_CONF=${PULSAR_HOME}/conf/presto
# functions related variables
FUNCTIONS_HOME=$PULSAR_HOME/pulsar-functions
DEFAULT_WORKER_CONF=$PULSAR_HOME/conf/functions_worker.yml
DEFAULT_JAVA_INSTANCE_JAR=$PULSAR_HOME/instances/java-instance.jar
JAVA_INSTANCE_JAR=${PULSAR_JAVA_INSTANCE_JAR:-"${DEFAULT_JAVA_INSTANCE_JAR}"}
DEFAULT_PY_INSTANCE_FILE=$PULSAR_HOME/instances/python-instance/python_instance_main.py
PY_INSTANCE_FILE=${PULSAR_PY_INSTANCE_FILE:-"${DEFAULT_PY_INSTANCE_FILE}"}
DEFAULT_FUNCTIONS_EXTRA_DEPS_DIR=$PULSAR_HOME/instances/deps
FUNCTIONS_EXTRA_DEPS_DIR=${PULSAR_FUNCTIONS_EXTRA_DEPS_DIR:-"${DEFAULT_FUNCTIONS_EXTRA_DEPS_DIR}"}
SQL_HOME=$PULSAR_HOME/pulsar-sql
PRESTO_HOME=${PULSAR_HOME}/lib/presto
# Check bookkeeper env and load bkenv.sh
if [ -f "$PULSAR_HOME/conf/bkenv.sh" ]
then
    . "$PULSAR_HOME/conf/bkenv.sh"
fi

# Check pulsar env and load pulsar_env.sh
if [ -f "$PULSAR_HOME/conf/pulsar_env.sh" ]
then
    . "$PULSAR_HOME/conf/pulsar_env.sh"
fi

if [ -f "$PULSAR_HOME/conf/logenv.sh" ]
then
    . "$PULSAR_HOME/conf/logenv.sh"
fi

# Check for the java to use
if [[ -z $JAVA_HOME ]]; then
    JAVA=$(which java)
    if [ $? != 0 ]; then
        echo "Error: JAVA_HOME not set, and no java executable found in $PATH." 1>&2
        exit 1
    fi
else
    JAVA=$JAVA_HOME/bin/java
fi
# exclude tests jar
RELEASE_JAR=`ls $PULSAR_HOME/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
if [ $? == 0 ]; then
    PULSAR_JAR=$RELEASE_JAR
fi

# exclude tests jar
BUILT_JAR=`ls $PULSAR_HOME/pulsar-broker/target/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then
    echo "\nCouldn't find pulsar jar.";
    echo "Make sure you've run 'mvn package'\n";
    exit 1;
elif [ -e "$BUILT_JAR" ]; then
    PULSAR_JAR=$BUILT_JAR
fi
#
# find the instance locations for pulsar-functions
#

# find the java instance location
if [ ! -f "${JAVA_INSTANCE_JAR}" ]; then
    # didn't find a released jar, then search the built jar
    BUILT_JAVA_INSTANCE_JAR="${FUNCTIONS_HOME}/runtime-all/target/java-instance.jar"
    if [ -z "${BUILT_JAVA_INSTANCE_JAR}" ]; then
        echo "\nCouldn't find pulsar-functions java instance jar.";
        echo "Make sure you've run 'mvn package'\n";
        exit 1;
    fi
    JAVA_INSTANCE_JAR=${BUILT_JAVA_INSTANCE_JAR}
fi

# find the python instance location
if [ ! -f "${PY_INSTANCE_FILE}" ]; then
    # didn't find a released python instance, then search the built python instance
    BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py"
    if [ -z "${BUILT_PY_INSTANCE_FILE}" ]; then
        echo "\nCouldn't find pulsar-functions python instance.";
        echo "Make sure you've run 'mvn package'\n";
        exit 1;
    fi
    PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE}
fi

# find pulsar sql presto distribution location
check_presto_libraries() {
    if [ ! -d "${PRESTO_HOME}" ]; then
        BUILT_PRESTO_HOME="${SQL_HOME}/presto-distribution/target/pulsar-presto-distribution"
        if [ ! -d "${BUILT_PRESTO_HOME}" ]; then
            echo "\nCouldn't find presto distribution.";
            echo "Make sure you've run 'mvn package'\n";
            exit 1;
        fi
        PRESTO_HOME=${BUILT_PRESTO_HOME}
    fi
}
pulsar_help() {
cat <<EOF
Usage: pulsar <command>
where command is one of:
broker Run a broker server
bookie Run a bookie server
zookeeper Run a zookeeper server
configuration-store Run a configuration-store server
discovery Run a discovery server
proxy Run a pulsar proxy
websocket Run a web socket proxy server
functions-worker Run a functions worker server
sql-worker Run a sql worker server
sql Run sql CLI
standalone Run a broker server with local bookies and local zookeeper
initialize-cluster-metadata One-time metadata initialization
delete-cluster-metadata Delete a cluster's metadata
initialize-transaction-coordinator-metadata One-time transaction coordinator metadata initialization
initialize-namespace namespace initialization
compact-topic Run compaction against a topic
zookeeper-shell Open a ZK shell client
broker-tool CLI to operate a specific broker
tokens Utility to create authentication tokens
help This help message
or command is the full name of a class with a defined main() method.
Environment variables:
PULSAR_LOG_CONF Log4j configuration file (default $DEFAULT_LOG_CONF)
PULSAR_BROKER_CONF Configuration file for broker (default: $DEFAULT_BROKER_CONF)
PULSAR_BOOKKEEPER_CONF Configuration file for bookie (default: $DEFAULT_BOOKKEEPER_CONF)
PULSAR_ZK_CONF Configuration file for zookeeper (default: $DEFAULT_ZK_CONF)
PULSAR_CONFIGURATION_STORE_CONF Configuration file for global configuration store (default: $DEFAULT_CONFIGURATION_STORE_CONF)
PULSAR_DISCOVERY_CONF Configuration file for discovery service (default: $DEFAULT_DISCOVERY_CONF)
PULSAR_WEBSOCKET_CONF Configuration file for websocket proxy (default: $DEFAULT_WEBSOCKET_CONF)
PULSAR_PROXY_CONF Configuration file for Pulsar proxy (default: $DEFAULT_PROXY_CONF)
PULSAR_WORKER_CONF Configuration file for functions worker (default: $DEFAULT_WORKER_CONF)
PULSAR_STANDALONE_CONF Configuration file for standalone (default: $DEFAULT_STANDALONE_CONF)
PULSAR_PRESTO_CONF Configuration directory for Pulsar Presto (default: $DEFAULT_PULSAR_PRESTO_CONF)
PULSAR_EXTRA_OPTS Extra options to be passed to the jvm
PULSAR_EXTRA_CLASSPATH Add extra paths to the pulsar classpath
PULSAR_PID_DIR Folder where the pulsar server PID file should be stored
PULSAR_STOP_TIMEOUT Wait time before forcefully kill the pulsar server instance, if the stop is not successful
These variable can also be set in conf/pulsar_env.sh
EOF
}
add_maven_deps_to_classpath() {
    MVN="mvn"
    if [ "$MAVEN_HOME" != "" ]; then
        MVN=${MAVEN_HOME}/bin/mvn
    fi
    # Need to generate classpath from maven pom. This is costly so generate it
    # and cache it. Save the file into our target dir so a mvn clean will get
    # clean it up and force us create a new one.
    f="${PULSAR_HOME}/distribution/server/target/classpath.txt"
    if [ ! -f "${f}" ]
    then
        ${MVN} -f "${PULSAR_HOME}/pom.xml" dependency:build-classpath -DincludeScope=compile -Dmdep.outputFile="${f}" &> /dev/null
    fi
    PULSAR_CLASSPATH=${CLASSPATH}:`cat "${f}"`
}

if [ -d "$PULSAR_HOME/lib" ]; then
    PULSAR_CLASSPATH=$PULSAR_CLASSPATH:$PULSAR_HOME/lib/*
    ASPECTJ_AGENT_PATH=`ls -1 $PULSAR_HOME/lib/org.aspectj-aspectjweaver-*.jar`
else
    add_maven_deps_to_classpath
    ASPECTJ_VERSION=`grep '<aspectj.version>' $PULSAR_HOME/pom.xml | awk -F'>' '{print $2}' | awk -F'<' '{print $1}'`
    ASPECTJ_AGENT_PATH="$HOME/.m2/repository/org/aspectj/aspectjweaver/$ASPECTJ_VERSION/aspectjweaver-$ASPECTJ_VERSION.jar"
fi
ASPECTJ_AGENT="-javaagent:$ASPECTJ_AGENT_PATH"

# if no args specified, show usage
if [ $# = 0 ]; then
    pulsar_help;
    exit 1;
fi
# get arguments
COMMAND=$1
shift
if [ -z "$PULSAR_WORKER_CONF" ]; then
    PULSAR_WORKER_CONF=$DEFAULT_WORKER_CONF
fi
if [ -z "$PULSAR_BROKER_CONF" ]; then
    PULSAR_BROKER_CONF=$DEFAULT_BROKER_CONF
fi
if [ -z "$PULSAR_BOOKKEEPER_CONF" ]; then
    PULSAR_BOOKKEEPER_CONF=$DEFAULT_BOOKKEEPER_CONF
fi
if [ -z "$PULSAR_ZK_CONF" ]; then
    PULSAR_ZK_CONF=$DEFAULT_ZK_CONF
fi
if [ -z "$PULSAR_GLOBAL_ZK_CONF" ]; then
    PULSAR_GLOBAL_ZK_CONF=$DEFAULT_GLOBAL_ZK_CONF
fi
if [ -z "$PULSAR_CONFIGURATION_STORE_CONF" ]; then
    PULSAR_CONFIGURATION_STORE_CONF=$DEFAULT_CONFIGURATION_STORE_CONF
fi
if [ -z "$PULSAR_DISCOVERY_CONF" ]; then
    PULSAR_DISCOVERY_CONF=$DEFAULT_DISCOVERY_CONF
fi
if [ -z "$PULSAR_PROXY_CONF" ]; then
    PULSAR_PROXY_CONF=$DEFAULT_PROXY_CONF
fi
if [ -z "$PULSAR_WEBSOCKET_CONF" ]; then
    PULSAR_WEBSOCKET_CONF=$DEFAULT_WEBSOCKET_CONF
fi
if [ -z "$PULSAR_STANDALONE_CONF" ]; then
    PULSAR_STANDALONE_CONF=$DEFAULT_STANDALONE_CONF
fi
if [ -z "$PULSAR_LOG_CONF" ]; then
    PULSAR_LOG_CONF=$DEFAULT_LOG_CONF
fi
if [ -z "$PULSAR_PRESTO_CONF" ]; then
    PULSAR_PRESTO_CONF=$DEFAULT_PULSAR_PRESTO_CONF
fi
PULSAR_CLASSPATH="$PULSAR_JAR:$PULSAR_CLASSPATH:$PULSAR_EXTRA_CLASSPATH"
PULSAR_CLASSPATH="`dirname $PULSAR_LOG_CONF`:$PULSAR_CLASSPATH"
OPTS="$OPTS -Dlog4j.configurationFile=`basename $PULSAR_LOG_CONF`"
# Ensure we can read bigger content from ZK. (It might be
# rarely needed when trying to list many z-nodes under a
# directory)
OPTS="$OPTS -Djute.maxbuffer=10485760 -Djava.net.preferIPv4Stack=true"
OPTS="-cp $PULSAR_CLASSPATH $OPTS"
OPTS="$OPTS $PULSAR_EXTRA_OPTS $PULSAR_MEM $PULSAR_GC"
# log directory & file
PULSAR_LOG_DIR=${PULSAR_LOG_DIR:-"$PULSAR_HOME/logs"}
PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RoutingAppender"}
PULSAR_LOG_ROOT_LEVEL=${PULSAR_LOG_ROOT_LEVEL:-"info"}
PULSAR_LOG_LEVEL=${PULSAR_LOG_LEVEL:-"info"}
PULSAR_ROUTING_APPENDER_DEFAULT=${PULSAR_ROUTING_APPENDER_DEFAULT:-"Console"}
#Configure log configuration system properties
OPTS="$OPTS -Dpulsar.log.appender=$PULSAR_LOG_APPENDER"
OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR"
OPTS="$OPTS -Dpulsar.log.level=$PULSAR_LOG_LEVEL"
OPTS="$OPTS -Dpulsar.routing.appender.default=$PULSAR_ROUTING_APPENDER_DEFAULT"
# Functions related logging
OPTS="$OPTS -Dpulsar.functions.process.container.log.dir=$PULSAR_LOG_DIR"
# instance
OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}"
OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"
OPTS="$OPTS -Dpulsar.functions.extra.dependencies.dir=${FUNCTIONS_EXTRA_DEPS_DIR}"
OPTS="$OPTS -Dpulsar.functions.instance.classpath=${PULSAR_CLASSPATH}"
OPTS="$OPTS -Dpulsar.module.instanceid=${PULSAR_MODULE_INSTANCE_ID} -Dpulsar.module.type=$COMMAND -Dkafka.cluster=${KAFKA_CLUSTER} -Dpulsar.hostname=${HOSTNAME} -Dpulsar.hostip=${HOST_IP} -Dpulsar.cluster=${PULSAR_CLUSTER} -Dpulsar.topic=${PULSAR_TOPIC}"
ZK_OPTS="-Dzookeeper.4lw.commands.whitelist=* -Dzookeeper.snapshot.trust.empty=true"
#Change to PULSAR_HOME to support relative paths
cd "$PULSAR_HOME"
if [ $COMMAND == "broker" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-broker.log"}
    exec $JAVA $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarBrokerStarter --broker-conf $PULSAR_BROKER_CONF $@
elif [ $COMMAND == "bookie" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"bookkeeper.log"}
    # Pass BOOKIE_EXTRA_OPTS option defined in pulsar_env.sh
    OPTS="$OPTS $BOOKIE_EXTRA_OPTS"
    exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.bookkeeper.proto.BookieServer --conf $PULSAR_BOOKKEEPER_CONF $@
elif [ $COMMAND == "zookeeper" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"zookeeper.log"}
    exec $JAVA ${ZK_OPTS} $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ZooKeeperStarter $PULSAR_ZK_CONF $@
elif [ $COMMAND == "global-zookeeper" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"global-zookeeper.log"}
    # Allow global ZK to turn into read-only mode when it cannot reach the quorum
    OPTS="${OPTS} ${ZK_OPTS} -Dreadonlymode.enabled=true"
    exec $JAVA $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ConfigurationStoreStarter $PULSAR_GLOBAL_ZK_CONF $@
elif [ $COMMAND == "configuration-store" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"configuration-store.log"}
    # Allow global ZK to turn into read-only mode when it cannot reach the quorum
    OPTS="${OPTS} ${ZK_OPTS} -Dreadonlymode.enabled=true"
    exec $JAVA $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ConfigurationStoreStarter $PULSAR_CONFIGURATION_STORE_CONF $@
elif [ $COMMAND == "discovery" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"discovery.log"}
    exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.discovery.service.server.DiscoveryServiceStarter $PULSAR_DISCOVERY_CONF $@
elif [ $COMMAND == "proxy" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-proxy.log"}
    exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.proxy.server.ProxyServiceStarter --config $PULSAR_PROXY_CONF $@
elif [ $COMMAND == "websocket" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-websocket.log"}
    exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.websocket.service.WebSocketServiceStarter $PULSAR_WEBSOCKET_CONF $@
elif [ $COMMAND == "functions-worker" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-functions-worker.log"}
    exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.functions.worker.FunctionWorkerStarter -c $PULSAR_WORKER_CONF $@
elif [ $COMMAND == "standalone" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-standalone.log"}
    exec $JAVA $OPTS $ASPECTJ_AGENT ${ZK_OPTS} -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarStandaloneStarter --config $PULSAR_STANDALONE_CONF $@
elif [ $COMMAND == "initialize-cluster-metadata" ]; then
    exec $JAVA $OPTS org.apache.pulsar.PulsarClusterMetadataSetup $@
elif [ $COMMAND == "delete-cluster-metadata" ]; then
    exec $JAVA $OPTS org.apache.pulsar.PulsarClusterMetadataTeardown $@
elif [ $COMMAND == "initialize-transaction-coordinator-metadata" ]; then
    exec $JAVA $OPTS org.apache.pulsar.PulsarTransactionCoordinatorMetadataSetup $@
elif [ $COMMAND == "initialize-namespace" ]; then
    exec $JAVA $OPTS org.apache.pulsar.PulsarInitialNamespaceSetup $@
elif [ $COMMAND == "zookeeper-shell" ]; then
    exec $JAVA $OPTS org.apache.zookeeper.ZooKeeperMain $@
elif [ $COMMAND == "broker-tool" ]; then
    exec $JAVA $OPTS org.apache.pulsar.broker.tools.BrokerTool $@
elif [ $COMMAND == "compact-topic" ]; then
    exec $JAVA $OPTS org.apache.pulsar.compaction.CompactorTool --broker-conf $PULSAR_BROKER_CONF $@
elif [ $COMMAND == "sql" ]; then
    check_presto_libraries
    exec $JAVA -cp "${PRESTO_HOME}/lib/*" io.prestosql.cli.Presto --server localhost:8081 "${@}"
elif [ $COMMAND == "sql-worker" ]; then
    check_presto_libraries
    exec ${PRESTO_HOME}/bin/launcher --etc-dir ${PULSAR_PRESTO_CONF} "${@}"
elif [ $COMMAND == "tokens" ]; then
    exec $JAVA $OPTS org.apache.pulsar.utils.auth.tokens.TokensCliUtils $@
elif [ $COMMAND == "help" -o $COMMAND == "--help" -o $COMMAND == "-h" ]; then
    pulsar_help;
else
    echo ""
    echo "-- Invalid command '$COMMAND' -- Use '$0 help' to get a list of valid commands"
    echo ""
    exit 1
fi
2) pulsar-daemon-kafka
This script is copied from the pulsar-daemon script, with the following changes added on top. (Note: in the comparison screenshots below, the left side is pulsar-daemon and the right side is pulsar-daemon-kafka.)
- Read in logenv.sh;
- Invoke pulsar-kafka instead of pulsar;
- The complete content of the pulsar-daemon-kafka script is as follows:
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
usage() {
cat <<EOF
Usage: pulsar-daemon (start|stop) <command> <args...>
where command is one of:
broker Run a broker server
bookie Run a bookie server
zookeeper Run a zookeeper server
configuration-store Run a configuration-store server
discovery Run a discovery server
websocket Run a websocket proxy server
functions-worker Run a functions worker server
standalone Run a standalone Pulsar service
proxy Run a Proxy Pulsar service
where argument is one of:
-force (accepted only with stop command): Decides whether to stop the server forcefully if not stopped by normal shutdown
EOF
}
BINDIR=$(dirname "$0")
PULSAR_HOME=$(cd -P $BINDIR/..;pwd)
# Check bookkeeper env and load bkenv.sh
if [ -f "$PULSAR_HOME/conf/bkenv.sh" ]
then
    . "$PULSAR_HOME/conf/bkenv.sh"
fi

if [ -f "$PULSAR_HOME/conf/pulsar_env.sh" ]
then
    . "$PULSAR_HOME/conf/pulsar_env.sh"
fi

if [ -f "$PULSAR_HOME/conf/logenv.sh" ]
then
    . "$PULSAR_HOME/conf/logenv.sh"
fi

PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RollingFile"}
PULSAR_STOP_TIMEOUT=${PULSAR_STOP_TIMEOUT:-30}
PULSAR_PID_DIR=${PULSAR_PID_DIR:-$PULSAR_HOME/bin}

if [ $# = 0 ]; then
    usage
    exit 1
elif [ $# = 1 ]; then
    if [ $1 == "--help" -o $1 == "-h" ]; then
        usage
        exit 1
    else
        echo "Error: no enough arguments provided."
        usage
        exit 1
    fi
fi
startStop=$1
shift
command=$1
shift
case $command in
(broker)
echo "doing $startStop $command ..."
;;
(bookie)
echo "doing $startStop $command ..."
;;
(zookeeper)
echo "doing $startStop $command ..."
;;
(global-zookeeper)
echo "doing $startStop $command ..."
;;
(configuration-store)
echo "doing $startStop $command ..."
;;
(discovery)
echo "doing $startStop $command ..."
;;
(websocket)
echo "doing $startStop $command ..."
;;
(functions-worker)
echo "doing $startStop $command ..."
;;
(standalone)
echo "doing $startStop $command ..."
;;
(proxy)
echo "doing $startStop $command ..."
;;
(*)
echo "Error: unknown service name $command"
usage
exit 1
;;
esac
export PULSAR_LOG_DIR=$PULSAR_LOG_DIR
export PULSAR_LOG_APPENDER=$PULSAR_LOG_APPENDER
export PULSAR_LOG_FILE=pulsar-$command-$HOSTNAME.log
pid=$PULSAR_PID_DIR/pulsar-$command.pid
out=$PULSAR_LOG_DIR/pulsar-$command-$HOSTNAME.out
logfile=$PULSAR_LOG_DIR/$PULSAR_LOG_FILE
rotate_out_log ()
{
    log=$1;
    num=5;
    if [ -n "$2" ]; then
        num=$2
    fi
    if [ -f "$log" ]; then # rotate logs
        while [ $num -gt 1 ]; do
            prev=`expr $num - 1`
            [ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"
            num=$prev
        done
        mv "$log" "$log.$num";
    fi
}
mkdir -p "$PULSAR_LOG_DIR"
case $startStop in
  (start)
    if [ -f $pid ]; then
      if kill -0 `cat $pid` > /dev/null 2>&1; then
        echo $command running as process `cat $pid`. Stop it first.
        exit 1
      fi
    fi

    rotate_out_log $out
    echo starting $command, logging to $logfile
    echo Note: Set immediateFlush to true in conf/log4j2-kafka.yaml will guarantee the logging event is flushing to disk immediately. The default behavior is switched off due to performance considerations.
    pulsar=$PULSAR_HOME/bin/pulsar-kafka
    nohup $pulsar $command "$@" > "$out" 2>&1 < /dev/null &
    echo $! > $pid
    sleep 1; head $out
    sleep 2;
    if ! ps -p $! > /dev/null ; then
      exit 1
    fi
    ;;

  (stop)
    if [ -f $pid ]; then
      TARGET_PID=$(cat $pid)
      if kill -0 $TARGET_PID > /dev/null 2>&1; then
        echo "stopping $command"
        kill $TARGET_PID

        count=0
        location=$PULSAR_LOG_DIR
        while ps -p $TARGET_PID > /dev/null;
        do
          echo "Shutdown is in progress... Please wait..."
          sleep 1
          count=`expr $count + 1`
          if [ "$count" = "$PULSAR_STOP_TIMEOUT" ]; then
            break
          fi
        done
        if [ "$count" != "$PULSAR_STOP_TIMEOUT" ]; then
          echo "Shutdown completed."
        fi

        if kill -0 $TARGET_PID > /dev/null 2>&1; then
          fileName=$location/$command.out
          $JAVA_HOME/bin/jstack $TARGET_PID > $fileName
          echo "Thread dumps are taken for analysis at $fileName"
          if [ "$1" == "-force" ]
          then
            echo "forcefully stopping $command"
            kill -9 $TARGET_PID >/dev/null 2>&1
            echo Successfully stopped the process
          else
            echo "WARNNING : $command is not stopped completely."
            exit 1
          fi
        fi
      else
        echo "no $command to stop"
      fi
      rm $pid
    else
      echo no "$command to stop"
    fi
    ;;

  (*)
    usage
    exit 1
    ;;
esac
3. Add the jars required by the Kafka producer
Add the following 3 jars to the {PULSAR_HOME}/lib directory on every node of the Pulsar cluster:
connect-api-2.0.1.jar
disruptor-3.4.2.jar
kafka-clients-2.0.1.jar
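Before restarting the services it is worth confirming all three jars are in place on each node. A quick sketch of such a check is shown below; the real directory is simulated here with a temporary one, so in production point PULSAR_LIB at the actual {PULSAR_HOME}/lib:

```shell
# Simulate a lib directory containing the three jars named in the text
PULSAR_LIB=$(mktemp -d)
touch "$PULSAR_LIB/connect-api-2.0.1.jar" \
      "$PULSAR_LIB/disruptor-3.4.2.jar" \
      "$PULSAR_LIB/kafka-clients-2.0.1.jar"

# Verify each required jar exists; report anything missing
missing=0
for jar in connect-api-2.0.1.jar disruptor-3.4.2.jar kafka-clients-2.0.1.jar; do
  [ -f "$PULSAR_LIB/$jar" ] || { echo "missing: $jar"; missing=1; }
done
[ "$missing" -eq 0 ] && echo "all Kafka producer jars present"
```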
4. Start the Pulsar services
- To make sure the service logs are written to Kafka correctly, start in the foreground first with bin/pulsar-kafka; once no errors appear, start in the background with bin/pulsar-daemon-kafka.
- Taking the broker as an example, run:
bin/pulsar-daemon-kafka start broker
- Inspecting the broker process with ps shows the following:
As the figure above shows, the OPTS configured in logenv.sh have all been passed into the broker process. The sys: lookups in log4j2-kafka.yaml resolve these property values to instantiate a Kafka producer, through which the broker's logs are sent to the Kafka brokers.
5. Verify that Pulsar logs reach the Kafka brokers
Start a Kafka consumer subscribed to the topic log4j2 sends to. A consumed message looks like the following, with the search fields separated by spaces:
pulsar-cluster dapp21 192.168.0.1 broker 1 2020-12-26 17:40:14.363 [prometheus-stats-43-1] [org.eclipse.jetty.server.RequestLog] INFO - 192.168.0.1 - - [26/Dec/2020:17:40:14 +0800] "GET /metrics/ HTTP/1.1" 200 23445 "http://192.168.0.1:8080/metrics" "Prometheus/2.22.1" 4
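The space-delimited layout makes the fields easy to pull apart positionally: 1 = cluster, 2 = hostname, 3 = host IP, 4 = module, 5 = instance, 6-7 = date and time, 8 = [thread], 9 = [logger], 10 = level, remainder = log body. A quick sketch using the sample record above (with the request details shortened):

```shell
# Sample record in the PatternLayout format defined earlier
record='pulsar-cluster dapp21 192.168.0.1 broker 1 2020-12-26 17:40:14.363 [prometheus-stats-43-1] [org.eclipse.jetty.server.RequestLog] INFO - GET /metrics/ 200'

# Pull out individual search fields by position
cluster=$(echo "$record" | awk '{print $1}')
module=$(echo "$record" | awk '{print $4}')
level=$(echo "$record" | awk '{print $10}')
echo "cluster=$cluster module=$module level=$level"
```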
6. Searching the logs
Open Kibana and search on the tokenized fields, for example: cluster:"pulsar-cluster" AND hostname:"XXX" AND module:"broker" AND level:"INFO"
The figure above shows the search results for a given time range; you can also add Available fields to the results as needed. Developers and operators can thus use Kibana to analyze the cause of a Pulsar service anomaly quickly and effectively across multiple dimensions. This completes the end-to-end solution for fast log retrieval in Apache Pulsar based on Log4j2 + Kafka + ELK.
Summary
Distributed systems and microservices are popular technical directions today. In production, as the business keeps growing and the volume of applications and services expands rapidly, moving from a monolithic/vertical architecture to a distributed/microservice architecture is a natural choice, with benefits in reduced complexity, fault tolerance, independent deployment, and horizontal scaling. But it also brings new challenges, such as troubleshooting efficiency and the convenience of operational monitoring. Using Apache Pulsar as an example, this article has shown how a Java process can implement fast log retrieval for distributed, microservice-style systems with Log4j2 + Kafka + ELK, in service of better operability.