乐趣区

关于后端:Web端如何做到日志文件实时查看该踩的坑我都踩了

前言

最近在做一个小工具,有个需要是在 Web 端能实时查看日志文件,也就是相当于在终端执行 tail -f 命令,对此没有找到好的解决形式,一开始想的间接通过 FileInputStream 来读取,因为他也能间接跳过 n 个字节来读取,就像上面这样。

public static void main(String[] args) throws Exception {File file = new File("/home/1.txt");
    FileInputStream fin = new FileInputStream(file);

    int ch;
    fin.skip(10);
    while ((ch = fin.read()) != -1){System.out.print((char) ch);
    }
  }
复制代码

如果不跳过的话,那么每次读取全部内容并展现显然不事实,咱们要做的是像 tail 一样,每次从后 n 行开始读取,并且会继续输入最新的行。

还有一个问题就是对文件的变动要能感知到,所以最初抉择间接调用 tail 命令,并且通过 WebSocket 输入到网页上。

tail 用法

在 java 中调用 tail 命令后,拿到它的输出流并且包装成 BufferedReader,如果通过 readLine()读取不到数据,那么他会始终阻塞,并不会返回 null,这也就代表日志文件中临时还没有新数据写入,一旦 readLine()办法返回,那么就代表有新数据达到了。另外一个问题就是如何终止,咱们不可能让他始终读取,要在一个适合的工夫终止,答案就是在 WebSocket 断开连接时,并且 Process 类提供了 destroy()办法用来终止这个过程,相当于按下了Ctrl+C

 public static void main(String[] args) throws Exception {Process exec = Runtime.getRuntime().exec(new String[]{"bash", "-c", "tail -F /home/HouXinLin/test.txt"});
     InputStream inputStream = exec.getInputStream();
     BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
     for (;;){System.out.println(bufferedReader.readLine()+"r");
     }
 }
复制代码

实现过程

在 Spring Boot 中退出 WebSocket 性能有很多形式,目前感觉广泛的文章都是介绍以 ServerEndpointExporter、@OnOpen、@OnClose、@OnMessage 这种形式来实现的,这种形式须要申明一个 Bean,也就是 ServerEndpointExporter,然而我记得如果要打包成 war 放入 Tomcat 中运行时,还须要把这个 Bean 勾销掉,否则还会报错,十分的麻烦,当然也有方法解决。

还有其余集成的方法,比方实现 WebSocketConfigurer 或者 WebSocketMessageBrokerConfigurer 接口,而我目前采纳的是实现 WebSocketMessageBrokerConfigurer 接口,并且前端还须要两个库,SockJS 和 Stomp(更具抉择,也能够不应用)。

SockJS 提供相似于 WebSocket 的对象,还有一套跨浏览器的 API,能够在浏览器和 Web 服务器之间创立了低提早,全双工,跨域的通信通道,如果浏览器不反对 WebSocket,它还能够模仿对 WebSocket 的反对。

Stomp 即 Simple Text Orientated Messaging Protocol,简略 (流) 文本定向音讯协定,它提供了一个可互操作的连贯格局,容许 STOMP 客户端与任意 STOMP 音讯代理(Broker)进行交互。

首先看一下连贯解决层的逻辑,其中一部分非必要的代码就不展现了。

 @Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {private static final Logger log = LoggerFactory.getLogger(WebSocketConfig.class.getName());
    @Autowired
    SimpMessagingTemplate mSimpMessagingTemplate;

    @Autowired
    WebSocketManager mWebSocketManager;

    @Autowired
    TailLog mTailLog;

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {registry.enableSimpleBroker("/topic/path");
    }

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {registration.addDecoratorFactory(new WebSocketHandlerDecoratorFactory() {
            @Override
            public WebSocketHandler decorate(WebSocketHandler webSocketHandler) {return new WebSocketHandlerDecorator(webSocketHandler) {
                    @Override
                    public void afterConnectionEstablished(WebSocketSession session) throws Exception {log.info("日志监控 WebSocket 连贯,sessionId={}", session.getId());
                        mWebSocketManager.add(session);
                        super.afterConnectionEstablished(session);
                    }

                    @Override
                    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {mWebSocketManager.remove(session.getId());
                        super.afterConnectionClosed(session, closeStatus);
                    }
                };
            }
        });

    }


    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {registry.addEndpoint("/socket-log")
                .addInterceptors(new HttpHandshakeInterceptor())
                .setHandshakeHandler(new DefaultHandshakeHandler() {
                    @Override
                    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {return new StompPrincipal(UUID.randomUUID().toString());
                    }
                })
                .withSockJS();}

    @EventListener
    public void handlerSessionCloseEvent(SessionDisconnectEvent sessionDisconnectEvent) {StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(sessionDisconnectEvent.getMessage());
        mTailLog.stopMonitor(headerAccessor.getSessionId());
    }

    /**
     * 门路订阅
     *
     * @param sessionSubscribeEvent
     */
    @EventListener
    public void handlerSessionSubscribeEvent(SessionSubscribeEvent sessionSubscribeEvent) {StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(sessionSubscribeEvent.getMessage());

        if (mTailLog.isArriveMaxLog()) {mWebSocketManager.sendMessage(headerAccessor.getSessionId(), "监控数量曾经达到限度,无奈查看"");
            log.info("日志监控 WebSocket 连贯曾经达到最大数量,将断开 sessionId={}", headerAccessor.getSessionId());
            mWebSocketManager.close(headerAccessor.getSessionId());
            return;
        }
        String destination = headerAccessor.getDestination();
        String userId = headerAccessor.getUser().getName();

        if (destination.startsWith("/user/topic/path")) {String path = destination.substring("/user/topic/path".length());
            File file = new File(StringUtils.urlDecoder(path));
            if (!file.exists()) {mWebSocketManager.sendMessage(headerAccessor.getSessionId(), "what are you 弄啥嘞,文件找不到啊");
                mWebSocketManager.close(headerAccessor.getSessionId());
                return;
            }
            TailLogListenerImpl tailLogListener = new TailLogListenerImpl(mSimpMessagingTemplate, userId);
            mTailLog.addMonitor(new LogMonitorObject(file.getName(), file.getParent(),
                    tailLogListener, "" + headerAccessor.getSessionId(), userId));
        }
    }

}
复制代码

对于下面的几个接口可能没应用过他的人有点蒙,至多我在学习他的时候是这样的,看下面的代码,咱们先要理清逻辑,能力明确为什么要这样写。

实现 registerStompEndpoints 办法

首先是 WebSocketMessageBrokerConfigurer 接口,Spring Boot 提供的一个 WebSocket 配置接口,只须要简简单单的配置两下,就能够实现一个 WebSocket 程序,这个接口中有 8 个办法,而咱们只须要用到三个个。

而后就是给出前端连贯 WebSocket 所须要的地址,如果连连贯地址都不给,前面步骤怎么持续?这个就是通过实现 registerStompEndpoints 办法来实现,只须要向 StompEndpointRegistry 中通过 addEndpoint 增加一个新的 ” 连接点 ” 就能够,还能够设置拦截器,也就是在前端试图连贯的时候,如果后端发现这个连贯不对劲,有猫腻,能够回绝和他连贯,这步能够通过 addInterceptors 来实现。

切记如果应用了 SocketJs 库,那么肯定要退出 withSockJS。

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {registry.addEndpoint("/log")
            .addInterceptors(new HttpHandshakeInterceptor())
            .setHandshakeHandler(new DefaultHandshakeHandler() {
                @Override
                protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {return new StompPrincipal(UUID.randomUUID().toString());
                }
            })
            .withSockJS();}
复制代码

保留 SessionId 和 WebSocketSession 对应关系

这一步是为了方便管理,比方被动断开连接,须要实现 configureWebSocketTransport 接口,然而这里的 SessionId 并不是服务端生成的会话 ID,而是这个 WebSocket 的会话 ID,每个 WebSocket 连贯都是不同的。

这里次要思考到如果前端传过来的文件不存在,那么服务端要能被动断开连接。

@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {registration.addDecoratorFactory(new WebSocketHandlerDecoratorFactory() {
        @Override
        public WebSocketHandler decorate(WebSocketHandler webSocketHandler) {return new WebSocketHandlerDecorator(webSocketHandler) {
                @Override
                public void afterConnectionEstablished(WebSocketSession session) throws Exception {log.info("日志监控 WebSocket 连贯,sessionId={}", session.getId());
                    mWebSocketManager.add(session);
                    super.afterConnectionEstablished(session);
                }
                @Override
                public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {mWebSocketManager.remove(session.getId());
                    super.afterConnectionClosed(session, closeStatus);
                }
            };
        }
    });
}
复制代码

监听订阅

接着前端通过 Stomp 的 API 来订阅一个音讯,那么咱们怎么接管订阅的事件呢?就是通过 @EventListener 注解来接管 SessionSubscribeEvent 事件。

而前端订阅时就须要传入要监控的日志门路。这时候咱们就能拿到这个 WebSocket 要监听的日志门路了。

@EventListener
public void handlerSessionSubscribeEvent(SessionSubscribeEvent sessionSubscribeEvent) {....}
复制代码

开启 tail 过程

接着咱们要为每个 WebSocket 都开启一个线程,用来执行 tail 命令。

@Component
public class TailLog {
    public static final int MAX_LOG = 3;

    private List<LogMonitorExecute> mLogMonitorExecutes = new CopyOnWriteArrayList<>();

    /**
     * Log 线程池
     */
    private ExecutorService mExecutors = Executors.newFixedThreadPool(MAX_LOG);
    public void addMonitor(LogMonitorObject object) {LogMonitorExecute logMonitorExecute = new LogMonitorExecute(object);
        mExecutors.execute(logMonitorExecute);
        mLogMonitorExecutes.add(logMonitorExecute);
    }

    public void stopMonitor(String sessionId) {if (sessionId == null) {return;}
        for (LogMonitorExecute logMonitorExecute : mLogMonitorExecutes) {if (sessionId.equals(logMonitorExecute.getLogMonitorObject().getSessionId())) {logMonitorExecute.stop();
                mLogMonitorExecutes.remove(logMonitorExecute);
            }
        }
    }

    public boolean isArriveMaxLog() {return mLogMonitorExecutes.size() == MAX_LOG;
    }
}

复制代码

最终执行者,其中的 stop()办法是在 WebSocket 断开连接时执行的。那么须要当时保留好 sessionId 和 LogMonitorExecute 的对应关系。当文件有新变动时,发送给对应的 WebSocket。

 public class LogMonitorExecute implements Runnable {private static final Logger log = LoggerFactory.getLogger(LogMonitorExecute.class.getName());

    /**
     * 监控的对象
     */
    private LogMonitorObject mLogMonitorObject;
    private volatile boolean isStop = false;

    /**
     * tail 过程对象
     */
    private Process mProcess;


    public LogMonitorExecute(LogMonitorObject logMonitorObject) {mLogMonitorObject = logMonitorObject;}

    public LogMonitorObject getLogMonitorObject() {return mLogMonitorObject;}

    @Override
    public void run() {
        try {String path = Paths.get(mLogMonitorObject.getPath(), mLogMonitorObject.getName()).toString();
            log.info("{}对 {} 开始进行日志监控", mLogMonitorObject.getSessionId(), path);
            mProcess = Runtime.getRuntime().exec(new String[]{"bash", "-c", "tail -f" + path});
            InputStream inputStream = mProcess.getInputStream();
            BufferedReader mBufferedReader = new BufferedReader(new InputStreamReader(inputStream, "utf-8"));
            String buffer = null;
            while (!Thread.currentThread().isInterrupted() && !isStop) {buffer = mBufferedReader.readLine();
                if (mLogMonitorObject.getTailLogListener() != null) {mLogMonitorObject.getTailLogListener().onNewLine(mLogMonitorObject.getName(), mLogMonitorObject.getPath(), buffer);
                    continue;
                }
                break;
            }
            mBufferedReader.close();} catch (Exception e) {e.printStackTrace();
        }
        log.info("{}退出对 {} 的监控", mLogMonitorObject.getSessionId(), mLogMonitorObject.getPath() + "/" + mLogMonitorObject.getName());
    }

    public void stop() {mProcess.destroy();
        isStop = true;
    }
}

复制代码

留神这里,要发送给指定的 WebSocket,而不是订阅了这个门路的 WebSocket,因为应用 SimpMessagingTemplate 在发送数据时,他能够给所有订阅了此门路的 WebSocket,那么就导致如果一个浏览器开了 2 个监控,而且监控的都是同一个日志文件,那么每个监控都会收到两条同样的音讯。

所以要应用 convertAndSendToUser 办法而不是 convertAndSend,这也就是为什么后面会通过 setHandshakeHandler 设置握手处理器为每个 WebSocket 连贯取一个 name 的起因。

前端

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title> 日志监控 </title>
    <style> body {
            background: #000000;
            color: #ffffff;
        }

        .log-list {
            color: #ffffff;
            font-size: 13px;
            padding: 25px;
        } </style>
</head>
<body>
<div class="container">
    <div class="log-list">
    </div>
</div>
<script src="https://cdn.jsdelivr.net/npm/sockjs-client@1/dist/sockjs.min.js"></script>
<script src="/lib/stomp/stomp.min.js"></script>
<script src="https://lib.sinaapp.com/js/jquery/2.0.2/jquery-2.0.2.min.js"></script>
<script> var socket = new SockJS('/socket-log?a=a');
    stompClient = Stomp.over(socket);
    stompClient.connect({}, function (frame) {stompClient.subscribe('/user/topic/path'+getQueryVariable("path"), function (greeting) {console.log("a" + greeting)
            let item = $("<div class='log-line'></div>");
            item.text(greeting.body)
            $(".log-list").append(item);
            $("html, body").animate({scrollTop: $(document).height()}, 0);
        });
    });

    function getQueryVariable(variable) {var query = window.location.search.substring(1);
        var vars = query.split("&");
        for (var i = 0; i < vars.length; i++) {var pair = vars[i].split("=");
            if (pair[0] == variable) {return encodeURIComponent(pair[1]);
            }
        }
        return (false);
    } </script>
</body>
</html>
复制代码

成果

上面是启动、敞开 Tomcat 的日志。

不通过 SimpMessagingTemplate 如何发送数据

如果不应用 SimpMessagingTemplate,那么首先咱们要拿到对应的 WebSocketSession,它有个 sendMessage 办法用来发送数据,然而类型是 WebSocketMessage,Spring Boot 有几个默认的实现,比方 TextMessage 用来发送文本信息。

然而如果应用了 Stomp,那么单纯的应用他发送是不行的,数据尽管能过来,然而格局不对,Stomp 解析不了,所以咱们要依照 Stomp 的格局发送。

然而通过查找,未能找到相干的材料,所以本人看了一下他的源码,其中设计到了 StompEncoder 这个类,看名字就晓得他是 Stomp 编码的工具。Stomp 协定分为三个局部,命令、头、音讯体,命令有如下几个:

CONNECT
SEND
SUBSCRIBE
UNSUBSCRIBE
BEGIN
COMMIT
ABORT
ACK
NACK
DISCONNECT
复制代码

紧跟着命令下一行是头,是键值对模式存在的,最初是音讯体,开端以空字符结尾。

上面是发送的必要格局,否则 StompEncoder 也无奈编码,将抛出异样,至于这个为什么这么写,具体就得看 StompEncoderde.writeHeaders 办法了,外面有几个验证,这种写齐全是被他逼的。

 StompEncoder stompEncoder = new StompEncoder();
 byte[] encode = stompEncoder.encode(createStompMessageHeader(),msg.getBytes());
 webSocketSession.sendMessage(new TextMessage(encode));
 
 private HashMap<String, Object> createStompMessageHeader() {HashMap<String, Object> hashMap = new HashMap<>();
     hashMap.put("subscription", createList("sub-0"));
     hashMap.put("content-type", createList("text/plain"));
     HashMap<String, Object> stringObjectHashMap = new HashMap<>();
     stringObjectHashMap.put("simpMessageType", SimpMessageType.MESSAGE);
     stringObjectHashMap.put("stompCommand", StompCommand.MESSAGE);
     stringObjectHashMap.put("subscription", "sub-0");
     stringObjectHashMap.put("nativeHeaders", hashMap);
     return stringObjectHashMap;
}
 private List<String> createList(String value) {List<String> list = new ArrayList<>();
    list.add(value);
    return list;
}
复制代码

tail -f 为什么会生效

这是偶然间的一个发现,当执行 tail -f 命令后,咱们通过 vim、gedit 等工具编辑并保留这个文件,会发现 tail -f 并不会输入新的行,反而通过 echo test>>xx.txt 是失常的。

那这里的蹊跷又在哪?

其实,tail -f不论在文件挪动、改名都会进行追踪,因为他跟踪的是文件描述符,引入维基百科的一句话:

文件描述符在模式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个过程所保护的该过程关上文件的记录表。当程序关上一个现有文件或者创立一个新文件时,内核向过程返回一个文件描述符。在程序设计中,一些波及底层的程序编写往往会围绕着文件描述符开展。然而文件描述符这一概念往往只实用于 UNIX、Linux 这样的操作系统。

tail -f执行后会产生一个过程,能够在 /proc/pid/fd 门路下查看他所关上的文件描述符,上面来看一个 GIF。

在这个操作中,首先在终端 1 中创立一个 1.txt,而后进行 tail -f 跟踪,接着在终端 2 中追加一行数据,能够看到终端 1 中是能够打印进去的。

而后在看神奇的一幕,在终端 2 进行 mv 改名,接着向被改名后的文件追加新的一行,你会发现,终端 1 竟然还是会打印的。

如果查看一下这个过程的文件描述符,就不为奇了,在上面的命令中,显示了 3 号描述符追踪的是/home/HouXinLin/test/tail/2.txt

hxl@hxl-PC:/home/HouXinLin/test/tail$ ps -ef |grep 1.txt
hxl       1368 29021  0 09:02 pts/0    00:00:00 grep 1.txt
hxl      20298 29672  0 09:00 pts/6    00:00:00 tail -f 1.txt
hxl@hxl-PC:/home/HouXinLin/test/tail$ ls -l /proc/20298/fd
总用量 0
lrwx------ 1 hxl hxl 64 3 月  16 09:02 0 -> /dev/pts/6
lrwx------ 1 hxl hxl 64 3 月  16 09:02 1 -> /dev/pts/6
lrwx------ 1 hxl hxl 64 3 月  16 09:02 2 -> /dev/pts/6
lr-x------ 1 hxl hxl 64 3 月  16 09:02 3 -> /home/HouXinLin/test/tail/2.txt
lr-x------ 1 hxl hxl 64 3 月  16 09:02 4 -> anon_inode:inotify
hxl@hxl-PC:/home/HouXinLin/test/tail$ 
复制代码

然而如果咱们通过vim、等工具编辑这个文件后,那么这个文件描述符中会被记录为被删除,即便这个文件的确是存在的,此时在向 2.txt 文件中追加就会生效。

hxl@hxl-PC:/home/HouXinLin/test/tail$ vim 2.txt 
hxl@hxl-PC:/home/HouXinLin/test/tail$ ls -l /proc/20298/fd
总用量 0
lrwx------ 1 hxl hxl 64 3 月  16 09:02 0 -> /dev/pts/6
lrwx------ 1 hxl hxl 64 3 月  16 09:02 1 -> /dev/pts/6
lrwx------ 1 hxl hxl 64 3 月  16 09:02 2 -> /dev/pts/6
lr-x------ 1 hxl hxl 64 3 月  16 09:02 3 -> /home/HouXinLin/test/tail/2.txt~ (deleted)
lr-x------ 1 hxl hxl 64 3 月  16 09:02 4 -> anon_inode:inotify
hxl@hxl-PC:/home/HouXinLin/test/tail$ 

复制代码

最初,无妨在尝试一下tail -F

参考:《2020 最新 Java 根底精讲视频教程和学习路线!》
链接:https://juejin.cn/post/694006…

退出移动版