乐趣区

聊聊dubbo的AccessLogFilter

本文主要研究一下 dubbo 的 AccessLogFilter

AccessLogFilter

dubbo-2.7.3/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/AccessLogFilter.java

@Activate(group = PROVIDER, value = ACCESS_LOG_KEY)
public class AccessLogFilter implements Filter {private static final Logger logger = LoggerFactory.getLogger(AccessLogFilter.class);

    private static final String ACCESS_LOG_KEY = "dubbo.accesslog";

    private static final int LOG_MAX_BUFFER = 5000;

    private static final long LOG_OUTPUT_INTERVAL = 5000;

    private static final String FILE_DATE_FORMAT = "yyyyMMdd";

    // It's safe to declare it as singleton since it runs on single thread only
    private static final DateFormat FILE_NAME_FORMATTER = new SimpleDateFormat(FILE_DATE_FORMAT);

    private static final Map<String, Set<AccessLogData>> LOG_ENTRIES = new ConcurrentHashMap<String, Set<AccessLogData>>();

    private static final ScheduledExecutorService LOG_SCHEDULED = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-Access-Log", true));

    /**
     * Default constructor initialize demon thread for writing into access log file with names with access log key
     * defined in url <b>accesslog</b>
     */
    public AccessLogFilter() {LOG_SCHEDULED.scheduleWithFixedDelay(this::writeLogToFile, LOG_OUTPUT_INTERVAL, LOG_OUTPUT_INTERVAL, TimeUnit.MILLISECONDS);
    }

    /**
     * This method logs the access log for service method invocation call.
     *
     * @param invoker service
     * @param inv     Invocation service method.
     * @return Result from service method.
     * @throws RpcException
     */
    @Override
    public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
        try {String accessLogKey = invoker.getUrl().getParameter(ACCESS_LOG_KEY);
            if (ConfigUtils.isNotEmpty(accessLogKey)) {AccessLogData logData = buildAccessLogData(invoker, inv);
                log(accessLogKey, logData);
            }
        } catch (Throwable t) {logger.warn("Exception in AccessLogFilter of service(" + invoker + "->" + inv + ")", t);
        }
        return invoker.invoke(inv);
    }

    private void log(String accessLog, AccessLogData accessLogData) {Set<AccessLogData> logSet = LOG_ENTRIES.computeIfAbsent(accessLog, k -> new ConcurrentHashSet<>());

        if (logSet.size() < LOG_MAX_BUFFER) {logSet.add(accessLogData);
        } else {
            //TODO we needs use force writing to file so that buffer gets clear and new log can be written.
            logger.warn("AccessLog buffer is full skipping buffer");
        }
    }

    private void writeLogToFile() {if (!LOG_ENTRIES.isEmpty()) {for (Map.Entry<String, Set<AccessLogData>> entry : LOG_ENTRIES.entrySet()) {
                try {String accessLog = entry.getKey();
                    Set<AccessLogData> logSet = entry.getValue();
                    if (ConfigUtils.isDefault(accessLog)) {processWithServiceLogger(logSet);
                    } else {File file = new File(accessLog);
                        createIfLogDirAbsent(file);
                        if (logger.isDebugEnabled()) {logger.debug("Append log to" + accessLog);
                        }
                        renameFile(file);
                        processWithAccessKeyLogger(logSet, file);
                    }

                } catch (Exception e) {logger.error(e.getMessage(), e);
                }
            }
        }
    }

    private void processWithAccessKeyLogger(Set<AccessLogData> logSet, File file) throws IOException {try (FileWriter writer = new FileWriter(file, true)) {for (Iterator<AccessLogData> iterator = logSet.iterator();
                 iterator.hasNext();
                 iterator.remove()) {writer.write(iterator.next().getLogMessage());
                writer.write("\r\n");
            }
            writer.flush();}
    }

    private AccessLogData buildAccessLogData(Invoker<?> invoker, Invocation inv) {RpcContext context = RpcContext.getContext();
        AccessLogData logData = AccessLogData.newLogData();
        logData.setServiceName(invoker.getInterface().getName());
        logData.setMethodName(inv.getMethodName());
        logData.setVersion(invoker.getUrl().getParameter(VERSION_KEY));
        logData.setGroup(invoker.getUrl().getParameter(GROUP_KEY));
        logData.setInvocationTime(new Date());
        logData.setTypes(inv.getParameterTypes());
        logData.setArguments(inv.getArguments());
        return logData;
    }

    private void processWithServiceLogger(Set<AccessLogData> logSet) {for (Iterator<AccessLogData> iterator = logSet.iterator();
             iterator.hasNext();
             iterator.remove()) {AccessLogData logData = iterator.next();
            LoggerFactory.getLogger(ACCESS_LOG_KEY + "." + logData.getServiceName()).info(logData.getLogMessage());
        }
    }

    private void createIfLogDirAbsent(File file) {File dir = file.getParentFile();
        if (null != dir && !dir.exists()) {dir.mkdirs();
        }
    }

    private void renameFile(File file) {if (file.exists()) {String now = FILE_NAME_FORMATTER.format(new Date());
            String last = FILE_NAME_FORMATTER.format(new Date(file.lastModified()));
            if (!now.equals(last)) {File archive = new File(file.getAbsolutePath() + "." + last);
                file.renameTo(archive);
            }
        }
    }
}
  • AccessLogFilter 实现了 org.apache.dubbo.rpc.Filter 接口,其构造器注册了一个定时任务,每隔 LOG_OUTPUT_INTERVAL 执行一次 writeLogToFile
  • invoke 方法从 invoker.getUrl() 获取 accessLogKey,如果不为空则使用 buildAccessLogData 构建 AccessLogData,然后放入到 LOG_ENTRIES 中,如果超出 LOG_MAX_BUFFER 则对其并打印 warn 日志
  • writeLogToFile 方法遍历 LOG_ENTRIES,将 AccessLogData 写入文件,如果是 default 的则使用 processWithServiceLogger,否则使用 processWithAccessKeyLogger 方法

AccessLogFilterTest

dubbo-2.7.3/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/AccessLogFilterTest.java

public class AccessLogFilterTest {Filter accessLogFilter = new AccessLogFilter();

    // Test filter won't throw an exception
    @Test
    public void testInvokeException() {Invoker<AccessLogFilterTest> invoker = new MyInvoker<AccessLogFilterTest>(null);
        Invocation invocation = new MockInvocation();
        LogUtil.start();
        accessLogFilter.invoke(invoker, invocation);
        assertEquals(1, LogUtil.findMessage("Exception in AccessLogFilter of service"));
        LogUtil.stop();}

    // TODO how to assert thread action
    @Test
    public void testDefault() {URL url = URL.valueOf("test://test:11/test?accesslog=true&group=dubbo&version=1.1");
        Invoker<AccessLogFilterTest> invoker = new MyInvoker<AccessLogFilterTest>(url);
        Invocation invocation = new MockInvocation();
        accessLogFilter.invoke(invoker, invocation);
    }

    @Test
    public void testCustom() {URL url = URL.valueOf("test://test:11/test?accesslog=custom-access.log");
        Invoker<AccessLogFilterTest> invoker = new MyInvoker<AccessLogFilterTest>(url);
        Invocation invocation = new MockInvocation();
        accessLogFilter.invoke(invoker, invocation);
    }

}
  • 这里验证了 invokeException、default、custom 场景

AccessLogData

dubbo-2.7.3/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/support/AccessLogData.java

public final class AccessLogData {

    private static final String MESSAGE_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private static final DateFormat MESSAGE_DATE_FORMATTER = new SimpleDateFormat(MESSAGE_DATE_FORMAT);

    private static final String VERSION = "version";
    private static final String GROUP = "group";
    private static final String SERVICE = "service";
    private static final String METHOD_NAME = "method-name";
    private static final String INVOCATION_TIME = "invocation-time";
    private static final String TYPES = "types";
    private static final String ARGUMENTS = "arguments";
    private static final String REMOTE_HOST = "remote-host";
    private static final String REMOTE_PORT = "remote-port";
    private static final String LOCAL_HOST = "localhost";
    private static final String LOCAL_PORT = "local-port";

    /**
     * This is used to store log data in key val format.
     */
    private Map<String, Object> data;

    /**
     * Default constructor.
     */
    private AccessLogData() {RpcContext context = RpcContext.getContext();
        data = new HashMap<>();
        setLocalHost(context.getLocalHost());
        setLocalPort(context.getLocalPort());
        setRemoteHost(context.getRemoteHost());
        setRemotePort(context.getRemotePort());
    }

    //......

    public String getLogMessage() {StringBuilder sn = new StringBuilder();

        sn.append("[")
                .append(MESSAGE_DATE_FORMATTER.format(getInvocationTime()))
                .append("]")
                .append(get(REMOTE_HOST))
                .append(":")
                .append(get(REMOTE_PORT))
                .append("->")
                .append(get(LOCAL_HOST))
                .append(":")
                .append(get(LOCAL_PORT))
                .append("-");

        String group = get(GROUP) != null ? get(GROUP).toString() : "";
        if (StringUtils.isNotEmpty(group.toString())) {sn.append(group).append("/");
        }

        sn.append(get(SERVICE));

        String version = get(VERSION) != null ? get(VERSION).toString() : "";
        if (StringUtils.isNotEmpty(version.toString())) {sn.append(":").append(version);
        }

        sn.append(" ");
        sn.append(get(METHOD_NAME));

        sn.append("(");
        Class<?>[] types = get(TYPES) != null ? (Class<?>[]) get(TYPES) : new Class[0];
        boolean first = true;
        for (Class<?> type : types) {if (first) {first = false;} else {sn.append(",");
            }
            sn.append(type.getName());
        }
        sn.append(")");


        Object[] args = get(ARGUMENTS) != null ? (Object[]) get(ARGUMENTS) : null;
        if (args != null && args.length > 0) {sn.append(JSON.toJSONString(args));
        }

        return sn.toString();}

    //......
}
  • AccessLogData 定义了 version、group、service、method-name、invocation-time、types、arguments、remote-host、remote-port、localhost、local-port 常量;getLogMessage 则构建 log 的输出

小结

  • AccessLogFilter 实现了 org.apache.dubbo.rpc.Filter 接口,其构造器注册了一个定时任务,每隔 LOG_OUTPUT_INTERVAL 执行一次 writeLogToFile
  • AccessLogFilter 的 invoke 方法从 invoker.getUrl() 获取 accessLogKey,如果不为空则使用 buildAccessLogData 构建 AccessLogData,然后放入到 LOG_ENTRIES 中,如果超出 LOG_MAX_BUFFER 则对其并打印 warn 日志
  • AccessLogFilter 的 writeLogToFile 方法遍历 LOG_ENTRIES,将 AccessLogData 写入文件,如果是 default 的则使用 processWithServiceLogger,否则使用 processWithAccessKeyLogger 方法

doc

  • AccessLogFilter
退出移动版