聊聊reactor-netty的AccessLog

5次阅读

共计 8900 个字符,预计需要花费 23 分钟才能阅读完成。


本文主要研究一下 reactor-netty 的 AccessLog
开启 access log

对于使用 tomcat 的 spring boot 应用,可以 server.tomcat.accesslog.enabled=true 来开启
对于使用 jetty 的 spring boot 应用,可以 server.jetty.accesslog.enabled=true 来开启
对于使用 undertow 的 spring boot 应用,可以 server.undertow.accesslog.enabled=true 来开启

对于使用 webflux 的应用,没有这么对应的配置,但是可以通过 -Dreactor.netty.http.server.accessLogEnabled=true 来开启
ReactorNetty
reactor-netty-0.8.5.RELEASE-sources.jar!/reactor/netty/ReactorNetty.java
/**
* Internal helpers for reactor-netty contracts
*
* @author Stephane Maldini
*/
public final class ReactorNetty {
//……

// System properties names

/**
* Default worker thread count, fallback to available processor
* (but with a minimum value of 4)
*/
public static final String IO_WORKER_COUNT = “reactor.netty.ioWorkerCount”;
/**
* Default selector thread count, fallback to -1 (no selector thread)
*/
public static final String IO_SELECT_COUNT = “reactor.netty.ioSelectCount”;
/**
* Default worker thread count for UDP, fallback to available processor
* (but with a minimum value of 4)
*/
public static final String UDP_IO_THREAD_COUNT = “reactor.netty.udp.ioThreadCount”;

/**
* Default value whether the native transport (epoll, kqueue) will be preferred,
* fallback it will be preferred when available
*/
public static final String NATIVE = “reactor.netty.native”;

/**
* Default max connections, if -1 will never wait to acquire before opening a new
* connection in an unbounded fashion. Fallback to
* available number of processors (but with a minimum value of 16)
*/
public static final String POOL_MAX_CONNECTIONS = “reactor.netty.pool.maxConnections”;
/**
* Default acquisition timeout (milliseconds) before error. If -1 will never wait to
* acquire before opening a new
* connection in an unbounded fashion. Fallback 45 seconds
*/
public static final String POOL_ACQUIRE_TIMEOUT = “reactor.netty.pool.acquireTimeout”;

/**
* Default SSL handshake timeout (milliseconds), fallback to 10 seconds
*/
public static final String SSL_HANDSHAKE_TIMEOUT = “reactor.netty.tcp.sslHandshakeTimeout”;
/**
* Default value whether the SSL debugging on the client side will be enabled/disabled,
* fallback to SSL debugging disabled
*/
public static final String SSL_CLIENT_DEBUG = “reactor.netty.tcp.ssl.client.debug”;
/**
* Default value whether the SSL debugging on the server side will be enabled/disabled,
* fallback to SSL debugging disabled
*/
public static final String SSL_SERVER_DEBUG = “reactor.netty.tcp.ssl.server.debug”;

/**
* Specifies whether the Http Server access log will be enabled.
* By default it is disabled.
*/
public static final String ACCESS_LOG_ENABLED = “reactor.netty.http.server.accessLogEnabled”;

//……
}
ReactorNetty 定义了 ACCESS_LOG_ENABLED 常量,其值为 reactor.netty.http.server.accessLogEnabled
HttpServerBind
reactor-netty-0.8.5.RELEASE-sources.jar!/reactor/netty/http/server/HttpServerBind.java
final class HttpServerBind extends HttpServer
implements Function<ServerBootstrap, ServerBootstrap> {

static final HttpServerBind INSTANCE = new HttpServerBind();

static final Function<DisposableServer, DisposableServer> CLEANUP_GLOBAL_RESOURCE = DisposableBind::new;

static final boolean ACCESS_LOG =
Boolean.parseBoolean(System.getProperty(ACCESS_LOG_ENABLED, “false”));

//……

static final class Http1Initializer
implements BiConsumer<ConnectionObserver, Channel> {

final int line;
final int header;
final int chunk;
final boolean validate;
final int buffer;
final int minCompressionSize;
final BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate;
final boolean forwarded;
final ServerCookieEncoder cookieEncoder;
final ServerCookieDecoder cookieDecoder;

Http1Initializer(int line,
int header,
int chunk,
boolean validate,
int buffer,
int minCompressionSize,
@Nullable BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate,
boolean forwarded,
ServerCookieEncoder encoder,
ServerCookieDecoder decoder) {
this.line = line;
this.header = header;
this.chunk = chunk;
this.validate = validate;
this.buffer = buffer;
this.minCompressionSize = minCompressionSize;
this.compressPredicate = compressPredicate;
this.forwarded = forwarded;
this.cookieEncoder = encoder;
this.cookieDecoder = decoder;
}

@Override
public void accept(ConnectionObserver listener, Channel channel) {
ChannelPipeline p = channel.pipeline();

p.addLast(NettyPipeline.HttpCodec, new HttpServerCodec(line, header, chunk, validate, buffer));

if (ACCESS_LOG) {
p.addLast(NettyPipeline.AccessLogHandler, new AccessLogHandler());
}

boolean alwaysCompress = compressPredicate == null && minCompressionSize == 0;

if (alwaysCompress) {
p.addLast(NettyPipeline.CompressionHandler,
new SimpleCompressionHandler());
}

p.addLast(NettyPipeline.HttpTrafficHandler,
new HttpTrafficHandler(listener, forwarded, compressPredicate, cookieEncoder, cookieDecoder));
}
}

//……
}
HttpServerBind 有个 ACCESS_LOG 属性,它读取 ReactorNetty 的 ACCESS_LOG_ENABLED(reactor.netty.http.server.accessLogEnabled) 的属性,读取不到默认为 false;HttpServerBind 有个 Http1Initializer 类,它的 accept 方法会判断 ACCESS_LOG 是否为 true,如果为 true 则会往 Channel 的 pipeline 添加名为 accessLogHandler(NettyPipeline.AccessLogHandler) 的 AccessLogHandler
AccessLogHandler
reactor-netty-0.8.5.RELEASE-sources.jar!/reactor/netty/http/server/AccessLogHandler.java
final class AccessLogHandler extends ChannelDuplexHandler {

AccessLog accessLog = new AccessLog();

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpRequest) {
final HttpRequest request = (HttpRequest) msg;
final SocketChannel channel = (SocketChannel) ctx.channel();

accessLog = new AccessLog()
.address(channel.remoteAddress().getHostString())
.port(channel.localAddress().getPort())
.method(request.method().name())
.uri(request.uri())
.protocol(request.protocolVersion().text());
}
super.channelRead(ctx, msg);
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof HttpResponse) {
final HttpResponse response = (HttpResponse) msg;
final HttpResponseStatus status = response.status();

if (status.equals(HttpResponseStatus.CONTINUE)) {
ctx.write(msg, promise);
return;
}

final boolean chunked = HttpUtil.isTransferEncodingChunked(response);
accessLog.status(status.codeAsText())
.chunked(chunked);
if (!chunked) {
accessLog.contentLength(HttpUtil.getContentLength(response, -1));
}
}
if (msg instanceof LastHttpContent) {
accessLog.increaseContentLength(((LastHttpContent) msg).content().readableBytes());
ctx.write(msg, promise)
.addListener(future -> {
if (future.isSuccess()) {
accessLog.log();
}
});
return;
}
if (msg instanceof ByteBuf) {
accessLog.increaseContentLength(((ByteBuf) msg).readableBytes());
}
if (msg instanceof ByteBufHolder) {
accessLog.increaseContentLength(((ByteBufHolder) msg).content().readableBytes());
}
ctx.write(msg, promise);
}
}
AccessLogHandler 继承了 ChannelDuplexHandler;在 channelRead 的时候创建了 AccessLog 对象,在 write 的时候更新 AccessLog 对象;当 msg 为 LastHttpContent 时,则添加了一个 listener,在成功回调时执行 accessLog.log()
AccessLog
reactor-netty-0.8.5.RELEASE-sources.jar!/reactor/netty/http/server/AccessLog.java
final class AccessLog {
static final Logger log = Loggers.getLogger(“reactor.netty.http.server.AccessLog”);
static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern(“dd/MMM/yyyy:HH:mm:ss Z”, Locale.US);
static final String COMMON_LOG_FORMAT =
“{} – {} [{}] \”{} {} {}\” {} {} {} {} ms”;
static final String MISSING = “-“;

final String zonedDateTime;

String address;
CharSequence method;
CharSequence uri;
String protocol;
String user = MISSING;
CharSequence status;
long contentLength;
boolean chunked;
long startTime = System.currentTimeMillis();
int port;

AccessLog() {
this.zonedDateTime = ZonedDateTime.now().format(DATE_TIME_FORMATTER);
}

AccessLog address(String address) {
this.address = Objects.requireNonNull(address, “address”);
return this;
}

AccessLog port(int port) {
this.port = port;
return this;
}

AccessLog method(CharSequence method) {
this.method = Objects.requireNonNull(method, “method”);
return this;
}

AccessLog uri(CharSequence uri) {
this.uri = Objects.requireNonNull(uri, “uri”);
return this;
}

AccessLog protocol(String protocol) {
this.protocol = Objects.requireNonNull(protocol, “protocol”);
return this;
}

AccessLog status(CharSequence status) {
this.status = Objects.requireNonNull(status, “status”);
return this;
}

AccessLog contentLength(long contentLength) {
this.contentLength = contentLength;
return this;
}

AccessLog increaseContentLength(long contentLength) {
if (chunked) {
this.contentLength += contentLength;
}
return this;
}

AccessLog chunked(boolean chunked) {
this.chunked = chunked;
return this;
}

long duration() {
return System.currentTimeMillis() – startTime;
}

void log() {
if (log.isInfoEnabled()) {
log.info(COMMON_LOG_FORMAT, address, user, zonedDateTime,
method, uri, protocol, status, (contentLength > -1 ? contentLength : MISSING), port, duration());
}
}
}
AccessLog 的 log 方法直接通过 logger 输出日志,其日志格式为 COMMON_LOG_FORMAT({} – {} [{}] “{} {} {}” {} {} {} {} ms),分别是 address, user, zonedDateTime, method, uri, protocol, status, contentLength, port, duration
小结

对于使用 webflux 的应用,可以通过 -Dreactor.netty.http.server.accessLogEnabled=true 来开启 access log
HttpServerBind 有个 ACCESS_LOG 属性,它读取 ReactorNetty 的 ACCESS_LOG_ENABLED(reactor.netty.http.server.accessLogEnabled) 的属性,读取不到默认为 false;HttpServerBind 有个 Http1Initializer 类,它的 accept 方法会判断 ACCESS_LOG 是否为 true,如果为 true 则会往 Channel 的 pipeline 添加名为 accessLogHandler(NettyPipeline.AccessLogHandler) 的 AccessLogHandler
AccessLogHandler 继承了 ChannelDuplexHandler;在 channelRead 的时候创建了 AccessLog 对象,在 write 的时候更新 AccessLog 对象;当 msg 为 LastHttpContent 时,则添加了一个 listener,在成功回调时执行 accessLog.log();AccessLog 的 log 方法直接通过 logger 输出日志,其日志格式为 COMMON_LOG_FORMAT({} – {} [{}] “{} {} {}” {} {} {} {} ms),分别是 address, user, zonedDateTime, method, uri, protocol, status, contentLength, port, duration

doc
Spring Boot Reactor Netty Configuration

正文完
 0