序
本文主要研究一下 dubbo 的 AccessLogFilter
AccessLogFilter
dubbo-2.7.3/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/AccessLogFilter.java
@Activate(group = PROVIDER, value = ACCESS_LOG_KEY)
public class AccessLogFilter implements Filter {private static final Logger logger = LoggerFactory.getLogger(AccessLogFilter.class);
private static final String ACCESS_LOG_KEY = "dubbo.accesslog";
private static final int LOG_MAX_BUFFER = 5000;
private static final long LOG_OUTPUT_INTERVAL = 5000;
private static final String FILE_DATE_FORMAT = "yyyyMMdd";
// It's safe to declare it as singleton since it runs on single thread only
private static final DateFormat FILE_NAME_FORMATTER = new SimpleDateFormat(FILE_DATE_FORMAT);
private static final Map<String, Set<AccessLogData>> LOG_ENTRIES = new ConcurrentHashMap<String, Set<AccessLogData>>();
private static final ScheduledExecutorService LOG_SCHEDULED = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-Access-Log", true));
/**
* Default constructor initialize demon thread for writing into access log file with names with access log key
* defined in url <b>accesslog</b>
*/
public AccessLogFilter() {LOG_SCHEDULED.scheduleWithFixedDelay(this::writeLogToFile, LOG_OUTPUT_INTERVAL, LOG_OUTPUT_INTERVAL, TimeUnit.MILLISECONDS);
}
/**
* This method logs the access log for service method invocation call.
*
* @param invoker service
* @param inv Invocation service method.
* @return Result from service method.
* @throws RpcException
*/
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
try {String accessLogKey = invoker.getUrl().getParameter(ACCESS_LOG_KEY);
if (ConfigUtils.isNotEmpty(accessLogKey)) {AccessLogData logData = buildAccessLogData(invoker, inv);
log(accessLogKey, logData);
}
} catch (Throwable t) {logger.warn("Exception in AccessLogFilter of service(" + invoker + "->" + inv + ")", t);
}
return invoker.invoke(inv);
}
private void log(String accessLog, AccessLogData accessLogData) {Set<AccessLogData> logSet = LOG_ENTRIES.computeIfAbsent(accessLog, k -> new ConcurrentHashSet<>());
if (logSet.size() < LOG_MAX_BUFFER) {logSet.add(accessLogData);
} else {
//TODO we needs use force writing to file so that buffer gets clear and new log can be written.
logger.warn("AccessLog buffer is full skipping buffer");
}
}
private void writeLogToFile() {if (!LOG_ENTRIES.isEmpty()) {for (Map.Entry<String, Set<AccessLogData>> entry : LOG_ENTRIES.entrySet()) {
try {String accessLog = entry.getKey();
Set<AccessLogData> logSet = entry.getValue();
if (ConfigUtils.isDefault(accessLog)) {processWithServiceLogger(logSet);
} else {File file = new File(accessLog);
createIfLogDirAbsent(file);
if (logger.isDebugEnabled()) {logger.debug("Append log to" + accessLog);
}
renameFile(file);
processWithAccessKeyLogger(logSet, file);
}
} catch (Exception e) {logger.error(e.getMessage(), e);
}
}
}
}
private void processWithAccessKeyLogger(Set<AccessLogData> logSet, File file) throws IOException {try (FileWriter writer = new FileWriter(file, true)) {for (Iterator<AccessLogData> iterator = logSet.iterator();
iterator.hasNext();
iterator.remove()) {writer.write(iterator.next().getLogMessage());
writer.write("\r\n");
}
writer.flush();}
}
private AccessLogData buildAccessLogData(Invoker<?> invoker, Invocation inv) {RpcContext context = RpcContext.getContext();
AccessLogData logData = AccessLogData.newLogData();
logData.setServiceName(invoker.getInterface().getName());
logData.setMethodName(inv.getMethodName());
logData.setVersion(invoker.getUrl().getParameter(VERSION_KEY));
logData.setGroup(invoker.getUrl().getParameter(GROUP_KEY));
logData.setInvocationTime(new Date());
logData.setTypes(inv.getParameterTypes());
logData.setArguments(inv.getArguments());
return logData;
}
private void processWithServiceLogger(Set<AccessLogData> logSet) {for (Iterator<AccessLogData> iterator = logSet.iterator();
iterator.hasNext();
iterator.remove()) {AccessLogData logData = iterator.next();
LoggerFactory.getLogger(ACCESS_LOG_KEY + "." + logData.getServiceName()).info(logData.getLogMessage());
}
}
private void createIfLogDirAbsent(File file) {File dir = file.getParentFile();
if (null != dir && !dir.exists()) {dir.mkdirs();
}
}
private void renameFile(File file) {if (file.exists()) {String now = FILE_NAME_FORMATTER.format(new Date());
String last = FILE_NAME_FORMATTER.format(new Date(file.lastModified()));
if (!now.equals(last)) {File archive = new File(file.getAbsolutePath() + "." + last);
file.renameTo(archive);
}
}
}
}
- AccessLogFilter 实现了 org.apache.dubbo.rpc.Filter 接口,其构造器注册了一个定时任务,每隔 LOG_OUTPUT_INTERVAL 执行一次 writeLogToFile
- invoke 方法从 invoker.getUrl() 获取 accessLogKey,如果不为空则使用 buildAccessLogData 构建 AccessLogData,然后放入到 LOG_ENTRIES 中,如果超出 LOG_MAX_BUFFER 则对其并打印 warn 日志
- writeLogToFile 方法遍历 LOG_ENTRIES,将 AccessLogData 写入文件,如果是 default 的则使用 processWithServiceLogger,否则使用 processWithAccessKeyLogger 方法
AccessLogFilterTest
dubbo-2.7.3/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/AccessLogFilterTest.java
public class AccessLogFilterTest {Filter accessLogFilter = new AccessLogFilter();
// Test filter won't throw an exception
@Test
public void testInvokeException() {Invoker<AccessLogFilterTest> invoker = new MyInvoker<AccessLogFilterTest>(null);
Invocation invocation = new MockInvocation();
LogUtil.start();
accessLogFilter.invoke(invoker, invocation);
assertEquals(1, LogUtil.findMessage("Exception in AccessLogFilter of service"));
LogUtil.stop();}
// TODO how to assert thread action
@Test
public void testDefault() {URL url = URL.valueOf("test://test:11/test?accesslog=true&group=dubbo&version=1.1");
Invoker<AccessLogFilterTest> invoker = new MyInvoker<AccessLogFilterTest>(url);
Invocation invocation = new MockInvocation();
accessLogFilter.invoke(invoker, invocation);
}
@Test
public void testCustom() {URL url = URL.valueOf("test://test:11/test?accesslog=custom-access.log");
Invoker<AccessLogFilterTest> invoker = new MyInvoker<AccessLogFilterTest>(url);
Invocation invocation = new MockInvocation();
accessLogFilter.invoke(invoker, invocation);
}
}
- 这里验证了 invokeException、default、custom 场景
AccessLogData
dubbo-2.7.3/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/support/AccessLogData.java
public final class AccessLogData {
private static final String MESSAGE_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
private static final DateFormat MESSAGE_DATE_FORMATTER = new SimpleDateFormat(MESSAGE_DATE_FORMAT);
private static final String VERSION = "version";
private static final String GROUP = "group";
private static final String SERVICE = "service";
private static final String METHOD_NAME = "method-name";
private static final String INVOCATION_TIME = "invocation-time";
private static final String TYPES = "types";
private static final String ARGUMENTS = "arguments";
private static final String REMOTE_HOST = "remote-host";
private static final String REMOTE_PORT = "remote-port";
private static final String LOCAL_HOST = "localhost";
private static final String LOCAL_PORT = "local-port";
/**
* This is used to store log data in key val format.
*/
private Map<String, Object> data;
/**
* Default constructor.
*/
private AccessLogData() {RpcContext context = RpcContext.getContext();
data = new HashMap<>();
setLocalHost(context.getLocalHost());
setLocalPort(context.getLocalPort());
setRemoteHost(context.getRemoteHost());
setRemotePort(context.getRemotePort());
}
//......
public String getLogMessage() {StringBuilder sn = new StringBuilder();
sn.append("[")
.append(MESSAGE_DATE_FORMATTER.format(getInvocationTime()))
.append("]")
.append(get(REMOTE_HOST))
.append(":")
.append(get(REMOTE_PORT))
.append("->")
.append(get(LOCAL_HOST))
.append(":")
.append(get(LOCAL_PORT))
.append("-");
String group = get(GROUP) != null ? get(GROUP).toString() : "";
if (StringUtils.isNotEmpty(group.toString())) {sn.append(group).append("/");
}
sn.append(get(SERVICE));
String version = get(VERSION) != null ? get(VERSION).toString() : "";
if (StringUtils.isNotEmpty(version.toString())) {sn.append(":").append(version);
}
sn.append(" ");
sn.append(get(METHOD_NAME));
sn.append("(");
Class<?>[] types = get(TYPES) != null ? (Class<?>[]) get(TYPES) : new Class[0];
boolean first = true;
for (Class<?> type : types) {if (first) {first = false;} else {sn.append(",");
}
sn.append(type.getName());
}
sn.append(")");
Object[] args = get(ARGUMENTS) != null ? (Object[]) get(ARGUMENTS) : null;
if (args != null && args.length > 0) {sn.append(JSON.toJSONString(args));
}
return sn.toString();}
//......
}
- AccessLogData 定义了 version、group、service、method-name、invocation-time、types、arguments、remote-host、remote-port、localhost、local-port 常量;getLogMessage 则构建 log 的输出
小结
- AccessLogFilter 实现了 org.apache.dubbo.rpc.Filter 接口,其构造器注册了一个定时任务,每隔 LOG_OUTPUT_INTERVAL 执行一次 writeLogToFile
- AccessLogFilter 的 invoke 方法从 invoker.getUrl() 获取 accessLogKey,如果不为空则使用 buildAccessLogData 构建 AccessLogData,然后放入到 LOG_ENTRIES 中,如果超出 LOG_MAX_BUFFER 则对其并打印 warn 日志
- AccessLogFilter 的 writeLogToFile 方法遍历 LOG_ENTRIES,将 AccessLogData 写入文件,如果是 default 的则使用 processWithServiceLogger,否则使用 processWithAccessKeyLogger 方法
doc
- AccessLogFilter