乐趣区

SpringBootadmin之HttpTrace显示入参和出参及监控Redisson

spring-boot-admin(以下简称 SBA) 与 Spring Boot、Spring cloud 项目以 starter 得方式自动集成, 包括 Server 端和 Client 端

SBA监控包括应用的基本信息、logfile(在线实时浏览或者 download)、JVM 信息(线程信息、堆信息、非堆信息)、Web(API 接口信息、最近 100 次 API 调用的信息)、应用中用户登录信息; 监控指标很全面,但针对具体项目就要增加符合自己项目的内容了,比如如下两点:

自定义 HttpTrace 增加入参和出参

结果:

spring-boot-admin 中 HttpTrace 显示的信息包括 sessionprincipalrequestresponsetimeTakentimestamp, 但 sessionprincipal 对该项目完全无用,requestHttpTrace 的内部类显示信息包括:

private final String method;
private final URI uri;
// 唯一可以扩展的地方
private final Map<String, List<String>> headers;
private final String remoteAddress;

response也是 HttpTrace 的内部类:

private final int status;
// 唯一可以扩展的地方
private final Map<String, List<String>> headers;

唯一缺少的就是请求的 入参 出参 ,而 Headers 的信息是无用的。所以扩展 HttpTrace 显示请求中的 入参 出参 势在必行, 大致的思路是:自定义 Filter–> 装饰模式转换成自定义的 request 和 response 对象, 内部获取请求和相应内容 –>HttpExchangeTracer 创建 HttpTrace 对象 –>InmemoryHttpTraceRepository 保存 100 次请求的 HttpTrace 对象,供 server 端使用。由于 Filter 中使用的部分对象是先创建的所以我们先从需要的零部件开始

  • 第一步: 包装 HttpServletRequest 获取请求内容:
public class RequestWrapper extends HttpServletRequestWrapper {// 存放请求的消息体(先缓存一份)
    private byte[] body;
// 自定义输入流的包装类,将缓存数据再写入到流中
    private ServletInputStreamWrapper wrapper;
    private final Logger logger = LoggerFactory.getLogger(RequestWrapper.class);

    public RequestWrapper(HttpServletRequest request) {super(request);
        try {
// 使用 Apache 的 commons-io 工具从 request 中先读取数据
            body = IOUtils.toByteArray(request.getInputStream());
        } catch (IOException e) {logger.error("从请求中获取请求参数出现异常:", e);
        }
// 将读取出来的内存再写入流中
        wrapper = new ServletInputStreamWrapper(new ByteArrayInputStream(body));
    }
// 转换成 String 供外部调用, 并替换转义字符
    public String body() {return new String(body).replaceAll("[\n\t\r]","");
    }
// 将我们的自定义的流包装类返回,供系统调用 读取数据
    @Override
    public ServletInputStream getInputStream() throws IOException {return this.wrapper;}
// 将我们的自定义的流包装类返回,供系统调用 读取数据
    @Override
    public BufferedReader getReader() throws IOException {return new BufferedReader(new InputStreamReader(this.wrapper));
    }
   // 从给定的输入流中读取数据
    static final class ServletInputStreamWrapper extends ServletInputStream {

        private InputStream inputStream;

        public ServletInputStreamWrapper(InputStream inputStream) {this.inputStream = inputStream;}

        @Override
        public boolean isFinished() {return true;}

        @Override
        public boolean isReady() {return false;}

        @Override
        public void setReadListener(ReadListener listener) { }
// 读取缓存数据
        @Override
        public int read() throws IOException {return this.inputStream.read();
        }

        public InputStream getInputStream() {return inputStream;}

        public void setInputStream(InputStream inputStream) {this.inputStream = inputStream;}
    }
}
  • 第二步: 包装 HttpServletResponse 类获取响应内容:
public class ResponseWrapper extends HttpServletResponseWrapper {

    private HttpServletResponse response;
// 缓存响应内容的输出流
    private ByteArrayOutputStream result = new ByteArrayOutputStream();

    public ResponseWrapper(HttpServletResponse response) {super(response);
        this.response = response;
    }

    /**
     * 响应的内容 供外部调用
     * 针对 体积较大的响应内容 很容易发生 OOM(比如:/actuator/logfile 接口),可在调用该方法的地方就行 api 过滤
     * 解决方法在第四步
     */
    public String body(){return result.toString();
    }

    @Override
    public ServletOutputStream getOutputStream() throws IOException {return new ServletOutputStreamWrapper(this.response,this.result);
    }

    @Override
    public PrintWriter getWriter() throws IOException {return new PrintWriter(new OutputStreamWriter(this.result,this.response.getCharacterEncoding()));
    }

// 自定义输出流的包装类 内部类
    static final class ServletOutputStreamWrapper extends ServletOutputStream{

        private HttpServletResponse response;
        private ByteArrayOutputStream byteArrayOutputStream;

        public ServletOutputStreamWrapper(HttpServletResponse response, ByteArrayOutputStream byteArrayOutputStream) {
            this.response = response;
            this.byteArrayOutputStream = byteArrayOutputStream;
        }

        @Override
        public boolean isReady() {return true;}

        @Override
        public void setWriteListener(WriteListener listener) { }

        @Override
        public void write(int b) throws IOException {this.byteArrayOutputStream.write(b);
        }

        /**
         * 将内容重新刷新到返回的对象中  并且避免多次刷新
         */
        @Override
        public void flush() throws IOException {if(!response.isCommitted()){byte[] bytes = this.byteArrayOutputStream.toByteArray();
                ServletOutputStream outputStream = response.getOutputStream();
                outputStream.write(bytes);
                outputStream.flush();}
        }
    }
}
  • 第三步: 扩展 TraceableRequest, 该接口中的方法会在创建HttpTrace#Request 内部类时调用,自定义实现里面的方法,再在过滤器中引用该类就可以达到自定义显示内容的目的,该类中的 Request 是我们第一步创建的装饰类,不能使用 HttpServletRequest
public class CustomerTraceableRequest implements TraceableRequest {
// 自定义的 Request 装饰类,不能使用 HttpServletRequest
    private RequestWrapper request;

    public CustomerTraceableRequest(RequestWrapper request) {this.request = request;}
//HttpTrace 类中 getMethod 会调用
    @Override
    public String getMethod() {return request.getMethod();
    }

    /**
     * @return POST 或者 GET 方式 都返回 {ip}:{port}/uir 的形式返回
     */
    @Override
    public URI getUri() {return URI.create(request.getRequestURL().toString());
    }

// 因为在 HttpTrace 中可扩展的只有 headers 的 Map, 所以我们自定义属性 RequestParam 存入 headers 中,作为入参信息展示
    @Override
    public Map<String, List<String>> getHeaders() {Map<String, List<String>> headerParam = new HashMap<>(1);
        headerParam.put("RequestParam",getParams());
        return headerParam;
    }

// 该方法也要重写,默认的太简单无法获取真是的 IP
    @Override
    public String getRemoteAddress() {return IpUtils.getIpAddress(request);
    }
// 根据 GET 或者 POST 的请求方式不同, 获取不同情况下的请求参数
    public List<String> getParams() {
        String params = null;
        String method = this.getMethod();
        if(HttpMethod.GET.matches(method)){params = request.getQueryString();
        }else if(HttpMethod.POST.matches(method)){params = this.request.body();
        }
        List<String> result = new ArrayList<>(1);
        result.add(params);
        return result;
    }
}
  • 第四步: 扩展 TraceableResponse, 该接口中方法在创建HttpTrace#Response 内部类时引用,自定义实现里面的方法:
public class CustomerTraceableResponse implements TraceableResponse {
    // 自定义的 HttpServletResponse 包装类
    private ResponseWrapper response;
    private HttpServletRequest request;

    public CustomerTraceableResponse(ResponseWrapper response, HttpServletRequest request) {
        this.response = response;
        this.request = request;
    }
// 返回响应状态
    @Override
    public int getStatus() {return response.getStatus();
    }
// 扩展 Response headers 添加 Response Body 属性,展示响应内容,但是需要排除 `/actuator/` 开头的请求,这里面部分响应内容太大,容易 OOM
    @Override
    public Map<String, List<String>> getHeaders() {if(isActuatorUri()){return extractHeaders();
        }else{Map<String, List<String>> result = new LinkedHashMap<>(1);
            List<String> responseBody = new ArrayList<>(1);
            responseBody.add(this.response.body());
            result.put("ResponseBody", responseBody);
            result.put("Content-Type", getContentType());
            return result;
        }
    }
// 是否是需要过滤的请求 uri
    private boolean isActuatorUri() {String requestUri = request.getRequestURI();
        AntPathMatcher matcher = new AntPathMatcher();
        return matcher.match("/actuator/**", requestUri);
    }
//server 端页面展示的 Content-Type 以及 Length 是从 Response 中获取的
    private List<String> getContentType() {List<String> list = new ArrayList<>(1);
        list.add(this.response.getContentType());
        return list;
    }
// 针对 /actuator/** 的请求返回默认的 headers 内容获
    private Map<String, List<String>> extractHeaders() {Map<String, List<String>> headers = new LinkedHashMap<>();
        for (String name : this.response.getHeaderNames()) {headers.put(name, new ArrayList<>(this.response.getHeaders(name)));
        }
        return headers;
    }
}
  • 第五步: 自定义 Filter 对 Resquest 和 Response 过滤, 并创建 HttpTrace 对象:
public class CustomerHttpTraceFilter extends OncePerRequestFilter implements Ordered {
// 存储 HttpTrace 的 repository, 默认是居于内存的,可扩展该类跟换存储数据的方式
    private HttpTraceRepository httpTraceRepository;
// 该类创建 HttpTrace 对象,Set<Include> 包含的内容是我们需要展示那些内容的容器(request-headers,response-headers,remote-address,time-taken)
    private HttpExchangeTracer httpExchangeTracer;

    public CustomerHttpTraceFilter(HttpTraceRepository httpTraceRepository, HttpExchangeTracer httpExchangeTracer) {
        this.httpTraceRepository = httpTraceRepository;
        this.httpExchangeTracer = httpExchangeTracer;
    }

    @Override
    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
// 校验 URI 是否有效
        if (!isRequestValid(request)) {filterChain.doFilter(request, response);
            return;
        }
// 将 HttpServletRequest 包装成我们自己的
        RequestWrapper wrapper = new RequestWrapper(request);
// 将 HttpServletResponse 包装成我们的自己的
        ResponseWrapper responseWrapper = new ResponseWrapper(response);

// 创建我们的自己的 TraceRequest 对象
        CustomerTraceableRequest traceableRequest = new CustomerTraceableRequest(wrapper);
// 创建 HttpTrace 对象(FilteredTraceableRequest 是内部类,通过 Set<Include> 筛选那些信息需要展示就保存那些信息), 重点设置 HttpTrace#Request 对象的各种参数
        HttpTrace httpTrace = httpExchangeTracer.receivedRequest(traceableRequest);
        try {filterChain.doFilter(wrapper, responseWrapper);
        } finally {
// 自定义的 TraceableResponse 保存需要的 response 信息
            CustomerTraceableResponse traceableResponse = new CustomerTraceableResponse(responseWrapper,request);
// 根据 Set<Include> 设置 HttpTrace 中 session、principal、timeTaken 信息以及 Response 内部类信息       
  this.httpExchangeTracer.sendingResponse(httpTrace, traceableResponse, null, null);
// 将 HttpTrace 对象保存在 Respository 中存储起来
            this.httpTraceRepository.add(httpTrace);
        }
    }

    private boolean isRequestValid(HttpServletRequest request) {
        try {new URI(request.getRequestURL().toString());
            return true;
        } catch (URISyntaxException ex) {return false;}
    }

    @Override
    public int getOrder() {return Ordered.LOWEST_PRECEDENCE - 10;}
}
  • 第六步: 通过 @SpringBootApplication(exclude) 禁用 HttpTraceAutoConfiguration 自动配置,自定义自动配置更换 Filter 过滤器:
@Configuration
@ConditionalOnWebApplication
@ConditionalOnProperty(prefix = "management.trace.http", name = "enabled", matchIfMissing = true)
@EnableConfigurationProperties(HttpTraceProperties.class)
public class TraceFilterConfig {

// 存储 HttpTrace 信息的对象
    @Bean
    @ConditionalOnMissingBean(HttpTraceRepository.class)
    public InMemoryHttpTraceRepository traceRepository() {return new InMemoryHttpTraceRepository();
    }
// 创建 HttpTrace 对象 Exchange
    @Bean
    @ConditionalOnMissingBean
    public HttpExchangeTracer httpExchangeTracer(HttpTraceProperties traceProperties) {return new HttpExchangeTracer(traceProperties.getInclude());
    }

    @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.SERVLET)
    static class ServletTraceFilterConfiguration {
// 将我们自定义的 Filter 已 Bean 的方式注册,才能生效
        @Bean
        @ConditionalOnMissingBean
        public CustomerHttpTraceFilter httpTraceFilter(HttpTraceRepository repository,
                                               HttpExchangeTracer tracer) {return new CustomerHttpTraceFilter(repository,tracer);
        }
    }

    @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
    static class ReactiveTraceFilterConfiguration {

        @Bean
        @ConditionalOnMissingBean
        public HttpTraceWebFilter httpTraceWebFilter(HttpTraceRepository repository,
                                                     HttpExchangeTracer tracer, HttpTraceProperties traceProperties) {
            return new HttpTraceWebFilter(repository, tracer,
                    traceProperties.getInclude());
        }
    }
}

集成 Redisson 健康状态监控

如果有引入 spring-boot-starter-redis,SBA 默认同过RedisConnectionFactory 监控 Redis 的健康状态,无奈 Redisson 还没有,自己东收丰衣足食。通过 HealthIndicatorReactiveHealthIndicator使用策略模式实现不同组件的健康监控,后者是使用 Rective 模式下的。我是通过 JavaBean 的方式配置 Redisson, 所以顺便实现 ReactiveHealthIndicator 再添加该指标即可:

@Configuration
@EnableConfigurationProperties(value = RedissonProperties.class)
public class RedissonConfig implements ReactiveHealthIndicator {
// 自己的 RedissonProperties 文件
    @Autowired
    private RedissonProperties redissonProperties;
// 暴露 redissonClient 句柄
    @Bean
    @ConditionalOnMissingBean
    public RedissonClient redisClient() {return Redisson.create(config());
    }
// 通过 Bean 的方式配置 RedissonConfig 相关信息
    @Bean
    public Config config() {Config config = new Config();
        config.useSingleServer() // 单实列模式
                .setAddress(redissonProperties.getAddress() + ":" + redissonProperties.getPort())
                .setPassword(redissonProperties.getPassword())
                .setDatabase(redissonProperties.getDatabase())
                .setConnectionPoolSize(redissonProperties.getConnectionPoolSize())
                .setConnectionMinimumIdleSize(redissonProperties.getConnectionMinimumIdleSize())
                .setIdleConnectionTimeout(redissonProperties.getIdleConnectionTimeout())
                .setSubscriptionConnectionPoolSize(redissonProperties.getSubscriptionConnectionPoolSize())
                .setSubscriptionConnectionMinimumIdleSize(redissonProperties.getSubscriptionConnectionMinimumIdleSize())
                .setTimeout(redissonProperties.getTimeout())
                .setRetryAttempts(redissonProperties.getRetryAttempts())
                .setRetryInterval(redissonProperties.getRetryInterval())
                .setConnectTimeout(redissonProperties.getConnectTimeout())
                .setReconnectionTimeout(redissonProperties.getReconnectionTimeout());
        config
                .setCodecProvider(new DefaultCodecProvider())
                .setEventLoopGroup(new NioEventLoopGroup())
                .setThreads(Runtime.getRuntime().availableProcessors() * 2)
                .setNettyThreads(Runtime.getRuntime().availableProcessors() * 2);
        return config;
    }
// 实现 ReactiveHealthIndicator 重写 health 方法
    @Override
    public Mono<Health> health() {return checkRedissonHealth().onErrorResume(ex -> Mono.just(new Health.Builder().down(ex).build()));
    }
// 我是通过 ping 的方式判断 redis 服务器是否 up 的状态,并增加加 Netty 和 Threads 的监控
    private Mono<Health> checkRedissonHealth() {Health.Builder builder = new Health.Builder();
        builder.withDetail("address", redissonProperties.getAddress());
        // 检测健康状态
        if (this.redisClient().getNodesGroup().pingAll()) {builder.status(Status.UP);
            builder.withDetail("dataBase", redissonProperties.getDatabase());
            builder.withDetail("redisNodeThreads", this.redisClient().getConfig().getThreads());
            builder.withDetail("nettyThreads", this.redisClient().getConfig().getNettyThreads());

        }else{builder.status(Status.DOWN);
        }
        return Mono.just(builder.build());
    }
}

在页面上看就是:

Ok!圆满完成!
如有错误,不吝赐教!


欢迎关注公众号:

退出移动版