共计 4439 个字符,预计需要花费 12 分钟才能阅读完成。
为便于查看散布在多个机器上的利用日志, 常须要聚合日志. 以下是集体的实现总结
市面上的做法
- elk (es, loglash, konlia): loglash 做日志聚合 (以 appender) 的模式, es 做存储, konlia 做可视化.
简略实现
思路: 音讯放 mq, 生产端有序生产, 记录日志
- 详细描述:
- web 入口能够通过 servlet filter 生成整个调用链的惟一 TraceId (能够由 申请 url, 业务端标识独特形成)
- 通过 dubbo 的 filter, 实现 调用开始的时候将 traceId 传入到 threadLocal. 生产端调用服务端时, 将 traceId. 作为额定属性传参, 服务端 filter 接管到参数时放入到 ThreadLocal 中从而达到标记同一个申请的目标.
- 自定义 logback 的 Appender, 拿到 ThreadLocal 中的 TraceId, 如果是雷同的则放入 RocketMq 的同一个队列, 达到程序生产的目标. 继而保障记录日志是有序的.
- 生产日志时, 依照队列循环打印, 因为每个队列外面的对应的是整个的一个调用链的日志, 所以依照队列打印是时序失常的. 更有利于查看日志
- 留神点:
- 对于音讯生产者而言, 记录日志时, 尽管是程序发送的, 但不能保障先收回的就先达到队列. 兼顾性能, 又不能采纳 rocketmq 的同步发送音讯模式. 采纳 sendOneWay 的形式效率快, 但不保障达到队列里是有序的.
- 不能用 logback 的 AsyncAppender 包装本人实现的 appender, 因为全局 traceId 保留在 threadLocal 中,AsyncAppender 打印日志会在新启一个线程打印日志, 之前的 ThreadLocal 中的 TraceId 就获取不到了.
- 示例代码
次要类放 github 上了
https://github.com/normalHeFei/normal_try/tree/master/java/src/main/java/wk/clulog
logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="mqAppender1" class="wk.clulog.RocketMqAppender">
<param name="Tag" value="logTag" />
<param name="Topic" value="logTopic" />
<param name="ProducerGroup" value="logGroup" />
<param name="NameServerAddress" value="192.168.103.3:9876"/>
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%date %p %t - %m%n</pattern>
</layout>
</appender>
<appender name="mqAsyncAppender1" class="ch.qos.logback.classic.AsyncAppender">
<queueSize>1024</queueSize>
<discardingThreshold>80</discardingThreshold>
<maxFlushTime>2000</maxFlushTime>
<neverBlock>true</neverBlock>
<appender-ref ref="mqAppender1"/>
</appender>
<root level="INFO">
<appender-ref ref="mqAppender1"/>
</root>
</configuration>
rocketmq 相干实现代码走读
- producer 几种发送形式实现
- sendOneWay / sendAsync:
依据负载平衡策略选取 broker, 获取 channel 间接发送, 尽管是 sendOneWay 但对并发发送的数量,rocketMq 其实用信号量爱护了一下最大的并发数, 相干代码如下
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {request.markOnewayRPC();
boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
// 将信号量 的 release 用 cas 包装了一下, 防止多线程环境下多个 release 反复操作
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
try {channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {once.release();
if (!f.isSuccess()) {log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
}
}
});
} catch (Exception e) {once.release();
log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {if (timeoutMillis <= 0) {throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
} else {
String info = String.format(
"invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
timeoutMillis,
this.semaphoreOneway.getQueueLength(),
this.semaphoreOneway.availablePermits());
log.warn(info);
throw new RemotingTimeoutException(info);
}
}
}
- sendMessageSync
通过 countDownLatch 实现同步返回. 代码如下:
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
// 将后果包装成 Future
if (f.isSuccess()) {responseFuture.setSendRequestOK(true);
return;
} else {responseFuture.setSendRequestOK(false);
}
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
// 栅栏期待.
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return this.responseCommand;
}
// 回调返回后果时, 解除栅栏
public void putResponse(final RemotingCommand responseCommand) {
this.responseCommand = responseCommand;
this.countDownLatch.countDown();}
- consumer 是如何有序生产的
间接看代码
try {
//processQueue 为队列音讯的解决快照, 记录了解决音讯的偏移量等信息, 通过对解决队列加锁来实现 单个队列外面音讯的程序生产.
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
} finally {this.processQueue.getLockConsume().unlock();}
正文完