为便于查看散布在多个机器上的利用日志,常须要聚合日志. 以下是集体的实现总结
市面上的做法
- elk (es, loglash, konlia): loglash 做日志聚合(以appender)的模式, es 做存储, konlia 做可视化.
简略实现
思路: 音讯放mq,生产端有序生产,记录日志
- 详细描述:
- web入口能够通过servlet filter 生成整个调用链的惟一 TraceId (能够由 申请url,业务端标识独特形成)
- 通过dubbo 的filter, 实现 调用开始的时候将traceId传入到threadLocal. 生产端调用服务端时,将traceId. 作为额定属性传参,服务端filter 接管到参数时放入到ThreadLocal 中从而达到标记同一个申请的目标.
- 自定义 logback的 Appender, 拿到ThreadLocal中的 TraceId,如果是雷同的则放入RocketMq 的同一个队列,达到程序生产的目标.继而保障记录日志是有序的.
- 生产日志时,依照队列循环打印,因为每个队列外面的对应的是整个的一个调用链的日志, 所以依照队列打印是时序失常的. 更有利于查看日志
- 留神点:
- 对于音讯生产者而言, 记录日志时,尽管是程序发送的, 但不能保障先收回的就先达到队列. 兼顾性能, 又不能采纳rocketmq 的同步发送音讯模式.采纳sendOneWay的形式效率快,但不保障达到队列里是有序的.
- 不能用 logback 的 AsyncAppender 包装本人实现的 appender, 因为全局 traceId保留在threadLocal 中,AsyncAppender 打印日志会在新启一个线程打印日志, 之前的ThreadLocal 中的TraceId 就获取不到了.
- 示例代码
次要类放github上了https://github.com/normalHeFei/normal_try/tree/master/java/src/main/java/wk/cluloglogback.xml <?xml version="1.0" encoding="UTF-8"?><configuration> <appender name="mqAppender1" class="wk.clulog.RocketMqAppender"> <param name="Tag" value="logTag" /> <param name="Topic" value="logTopic" /> <param name="ProducerGroup" value="logGroup" /> <param name="NameServerAddress" value="192.168.103.3:9876"/> <layout class="ch.qos.logback.classic.PatternLayout"> <pattern>%date %p %t - %m%n</pattern> </layout> </appender> <appender name="mqAsyncAppender1" class="ch.qos.logback.classic.AsyncAppender"> <queueSize>1024</queueSize> <discardingThreshold>80</discardingThreshold> <maxFlushTime>2000</maxFlushTime> <neverBlock>true</neverBlock> <appender-ref ref="mqAppender1"/> </appender> <root level="INFO"> <appender-ref ref="mqAppender1"/> </root></configuration>
rocketmq 相干实现代码走读
- producer几种发送形式实现
- sendOneWay / sendAsync:
依据负载平衡策略选取broker,获取channel 间接发送,尽管是sendOneWay但对并发发送的数量,rocketMq其实用信号量爱护了一下最大的并发数,相干代码如下
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { request.markOnewayRPC(); boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { //将信号量 的 release 用 cas 包装了一下,防止多线程环境下多个release反复操作 final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway); try { channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { once.release(); if (!f.isSuccess()) { log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed."); } } }); } catch (Exception e) { once.release(); log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed."); throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); } } else { if (timeoutMillis <= 0) { throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast"); } else { String info = String.format( "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", timeoutMillis, this.semaphoreOneway.getQueueLength(), this.semaphoreOneway.availablePermits() ); log.warn(info); throw new RemotingTimeoutException(info); } } }
- sendMessageSync
通过countDownLatch实现同步返回. 代码如下:
channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { //将后果包装成Future if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } responseTable.remove(opaque); responseFuture.setCause(f.cause()); responseFuture.putResponse(null); log.warn("send a request command to channel <" + addr + "> failed."); }});RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);//栅栏期待.public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException { this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS); return this.responseCommand; }//回调返回后果时,解除栅栏public void putResponse(final RemotingCommand responseCommand) { this.responseCommand = responseCommand; this.countDownLatch.countDown();}
- consumer 是如何有序生产的
间接看代码
try { //processQueue 为队列音讯的解决快照,记录了解决音讯的偏移量等信息, 通过对解决队列加锁来实现 单个队列外面音讯的程序生产. this.processQueue.getLockConsume().lock(); if (this.processQueue.isDropped()) { log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); break; } status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageOrderlyService.this.consumerGroup, msgs, messageQueue); hasException = true; } finally { this.processQueue.getLockConsume().unlock(); }