前言
最近在做一个小工具,有个需要是在Web端能实时查看日志文件,也就是相当于在终端执行tail -f
命令,对此没有找到好的解决形式,一开始想的间接通过FileInputStream
来读取,因为他也能间接跳过n个字节来读取,就像上面这样。
public static void main(String[] args) throws Exception { File file = new File("/home/1.txt"); FileInputStream fin = new FileInputStream(file); int ch; fin.skip(10); while ((ch = fin.read()) != -1){ System.out.print((char) ch); } }复制代码
如果不跳过的话,那么每次读取全部内容并展现显然不事实,咱们要做的是像tail
一样,每次从后n行开始读取,并且会继续输入最新的行。
还有一个问题就是对文件的变动要能感知到,所以最初抉择间接调用tail
命令,并且通过WebSocket输入到网页上。
tail用法
在java中调用tail
命令后,拿到它的输出流并且包装成BufferedReader,如果通过readLine()读取不到数据,那么他会始终阻塞,并不会返回null,这也就代表日志文件中临时还没有新数据写入,一旦readLine()办法返回,那么就代表有新数据达到了。另外一个问题就是如何终止,咱们不可能让他始终读取,要在一个适合的工夫终止,答案就是在WebSocket断开连接时,并且Process类提供了destroy()办法用来终止这个过程,相当于按下了Ctrl+C
public static void main(String[] args) throws Exception { Process exec = Runtime.getRuntime().exec(new String[]{"bash", "-c", "tail -F /home/HouXinLin/test.txt"}); InputStream inputStream = exec.getInputStream(); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); for (;;){ System.out.println(bufferedReader.readLine()+"r"); } }复制代码
实现过程
在Spring Boot中退出WebSocket性能有很多形式,目前感觉广泛的文章都是介绍以ServerEndpointExporter、@OnOpen、 @OnClose、@OnMessage这种形式来实现的,这种形式须要申明一个Bean,也就是ServerEndpointExporter,然而我记得如果要打包成war放入Tomcat中运行时,还须要把这个Bean勾销掉,否则还会报错,十分的麻烦,当然也有方法解决。
还有其余集成的方法,比方实现WebSocketConfigurer
或者WebSocketMessageBrokerConfigurer
接口,而我目前采纳的是实现WebSocketMessageBrokerConfigurer
接口,并且前端还须要两个库,SockJS和Stomp(更具抉择,也能够不应用)。
SockJS提供相似于WebSocket的对象,还有一套跨浏览器的API,能够在浏览器和Web服务器之间创立了低提早,全双工,跨域的通信通道,如果浏览器不反对 WebSocket,它还能够模仿对WebSocket的反对。
Stomp即Simple Text Orientated Messaging Protocol,简略(流)文本定向音讯协定,它提供了一个可互操作的连贯格局,容许STOMP客户端与任意STOMP音讯代理(Broker)进行交互。
首先看一下连贯解决层的逻辑,其中一部分非必要的代码就不展现了。
@Configuration@EnableWebSocketMessageBrokerpublic class WebSocketConfig implements WebSocketMessageBrokerConfigurer { private static final Logger log = LoggerFactory.getLogger(WebSocketConfig.class.getName()); @Autowired SimpMessagingTemplate mSimpMessagingTemplate; @Autowired WebSocketManager mWebSocketManager; @Autowired TailLog mTailLog; @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableSimpleBroker("/topic/path"); } @Override public void configureWebSocketTransport(WebSocketTransportRegistration registration) { registration.addDecoratorFactory(new WebSocketHandlerDecoratorFactory() { @Override public WebSocketHandler decorate(WebSocketHandler webSocketHandler) { return new WebSocketHandlerDecorator(webSocketHandler) { @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { log.info("日志监控WebSocket连贯,sessionId={}", session.getId()); mWebSocketManager.add(session); super.afterConnectionEstablished(session); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { mWebSocketManager.remove(session.getId()); super.afterConnectionClosed(session, closeStatus); } }; } }); } @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/socket-log") .addInterceptors(new HttpHandshakeInterceptor()) .setHandshakeHandler(new DefaultHandshakeHandler() { @Override protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) { return new StompPrincipal(UUID.randomUUID().toString()); } }) .withSockJS(); } @EventListener public void handlerSessionCloseEvent(SessionDisconnectEvent sessionDisconnectEvent) { StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(sessionDisconnectEvent.getMessage()); mTailLog.stopMonitor(headerAccessor.getSessionId()); } /** * 门路订阅 * * @param sessionSubscribeEvent */ @EventListener public void handlerSessionSubscribeEvent(SessionSubscribeEvent sessionSubscribeEvent) { StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(sessionSubscribeEvent.getMessage()); if (mTailLog.isArriveMaxLog()) { mWebSocketManager.sendMessage(headerAccessor.getSessionId(), "监控数量曾经达到限度,无奈查看""); log.info("日志监控WebSocket连贯曾经达到最大数量,将断开sessionId={}", headerAccessor.getSessionId()); mWebSocketManager.close(headerAccessor.getSessionId()); return; } String destination = headerAccessor.getDestination(); String userId = headerAccessor.getUser().getName(); if (destination.startsWith("/user/topic/path")) { String path = destination.substring("/user/topic/path".length()); File file = new File(StringUtils.urlDecoder(path)); if (!file.exists()) { mWebSocketManager.sendMessage(headerAccessor.getSessionId(), "what are you 弄啥嘞,文件找不到啊"); mWebSocketManager.close(headerAccessor.getSessionId()); return; } TailLogListenerImpl tailLogListener = new TailLogListenerImpl(mSimpMessagingTemplate, userId); mTailLog.addMonitor(new LogMonitorObject(file.getName(), file.getParent(), tailLogListener, "" + headerAccessor.getSessionId(), userId)); } }}复制代码
对于下面的几个接口可能没应用过他的人有点蒙,至多我在学习他的时候是这样的,看下面的代码,咱们先要理清逻辑,能力明确为什么要这样写。
实现registerStompEndpoints办法
首先是WebSocketMessageBrokerConfigurer接口,Spring Boot提供的一个WebSocket配置接口,只须要简简单单的配置两下,就能够实现一个WebSocket程序,这个接口中有8个办法,而咱们只须要用到三个个。
而后就是给出前端连贯WebSocket所须要的地址,如果连连贯地址都不给,前面步骤怎么持续?这个就是通过实现registerStompEndpoints办法来实现,只须要向StompEndpointRegistry中通过addEndpoint增加一个新的"连接点"就能够,还能够设置拦截器,也就是在前端试图连贯的时候,如果后端发现这个连贯不对劲,有猫腻,能够回绝和他连贯,这步能够通过addInterceptors来实现。
切记如果应用了SocketJs库,那么肯定要退出withSockJS。
@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/log") .addInterceptors(new HttpHandshakeInterceptor()) .setHandshakeHandler(new DefaultHandshakeHandler() { @Override protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) { return new StompPrincipal(UUID.randomUUID().toString()); } }) .withSockJS();}复制代码
保留SessionId和WebSocketSession对应关系
这一步是为了方便管理,比方被动断开连接,须要实现configureWebSocketTransport接口,然而这里的SessionId并不是服务端生成的会话ID,而是这个WebSocket的会话ID,每个WebSocket连贯都是不同的。
这里次要思考到如果前端传过来的文件不存在,那么服务端要能被动断开连接。
@Overridepublic void configureWebSocketTransport(WebSocketTransportRegistration registration) { registration.addDecoratorFactory(new WebSocketHandlerDecoratorFactory() { @Override public WebSocketHandler decorate(WebSocketHandler webSocketHandler) { return new WebSocketHandlerDecorator(webSocketHandler) { @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { log.info("日志监控WebSocket连贯,sessionId={}", session.getId()); mWebSocketManager.add(session); super.afterConnectionEstablished(session); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { mWebSocketManager.remove(session.getId()); super.afterConnectionClosed(session, closeStatus); } }; } });}复制代码
监听订阅
接着前端通过Stomp的API来订阅一个音讯,那么咱们怎么接管订阅的事件呢?就是通过 @EventListener注解来接管SessionSubscribeEvent事件。
而前端订阅时就须要传入要监控的日志门路。这时候咱们就能拿到这个WebSocket要监听的日志门路了。
@EventListenerpublic void handlerSessionSubscribeEvent(SessionSubscribeEvent sessionSubscribeEvent) { ....}复制代码
开启tail过程
接着咱们要为每个WebSocket都开启一个线程,用来执行tail
命令。
@Componentpublic class TailLog { public static final int MAX_LOG = 3; private List<LogMonitorExecute> mLogMonitorExecutes = new CopyOnWriteArrayList<>(); /** * Log线程池 */ private ExecutorService mExecutors = Executors.newFixedThreadPool(MAX_LOG); public void addMonitor(LogMonitorObject object) { LogMonitorExecute logMonitorExecute = new LogMonitorExecute(object); mExecutors.execute(logMonitorExecute); mLogMonitorExecutes.add(logMonitorExecute); } public void stopMonitor(String sessionId) { if (sessionId == null) { return; } for (LogMonitorExecute logMonitorExecute : mLogMonitorExecutes) { if (sessionId.equals(logMonitorExecute.getLogMonitorObject().getSessionId())) { logMonitorExecute.stop(); mLogMonitorExecutes.remove(logMonitorExecute); } } } public boolean isArriveMaxLog() { return mLogMonitorExecutes.size() == MAX_LOG; }}复制代码
最终执行者,其中的stop()办法是在WebSocket断开连接时执行的。那么须要当时保留好sessionId和LogMonitorExecute的对应关系。当文件有新变动时,发送给对应的WebSocket。
public class LogMonitorExecute implements Runnable { private static final Logger log = LoggerFactory.getLogger(LogMonitorExecute.class.getName()); /** * 监控的对象 */ private LogMonitorObject mLogMonitorObject; private volatile boolean isStop = false; /** * tail 过程对象 */ private Process mProcess; public LogMonitorExecute(LogMonitorObject logMonitorObject) { mLogMonitorObject = logMonitorObject; } public LogMonitorObject getLogMonitorObject() { return mLogMonitorObject; } @Override public void run() { try { String path = Paths.get(mLogMonitorObject.getPath(), mLogMonitorObject.getName()).toString(); log.info("{}对{}开始进行日志监控", mLogMonitorObject.getSessionId(), path); mProcess = Runtime.getRuntime().exec(new String[]{"bash", "-c", "tail -f " + path}); InputStream inputStream = mProcess.getInputStream(); BufferedReader mBufferedReader = new BufferedReader(new InputStreamReader(inputStream, "utf-8")); String buffer = null; while (!Thread.currentThread().isInterrupted() && !isStop) { buffer = mBufferedReader.readLine(); if (mLogMonitorObject.getTailLogListener() != null) { mLogMonitorObject.getTailLogListener().onNewLine(mLogMonitorObject.getName(), mLogMonitorObject.getPath(), buffer); continue; } break; } mBufferedReader.close(); } catch (Exception e) { e.printStackTrace(); } log.info("{}退出对{}的监控", mLogMonitorObject.getSessionId(), mLogMonitorObject.getPath() + "/" + mLogMonitorObject.getName()); } public void stop() { mProcess.destroy(); isStop = true; }}复制代码
留神这里,要发送给指定的WebSocket,而不是订阅了这个门路的WebSocket,因为应用SimpMessagingTemplate
在发送数据时,他能够给所有订阅了此门路的WebSocket,那么就导致如果一个浏览器开了2个监控,而且监控的都是同一个日志文件,那么每个监控都会收到两条同样的音讯。
所以要应用convertAndSendToUser办法而不是convertAndSend,这也就是为什么后面会通过setHandshakeHandler设置握手处理器为每个WebSocket连贯取一个name的起因。
前端
<!DOCTYPE html><html lang="en"><head> <meta charset="UTF-8"> <title>日志监控</title> <style> body { background: #000000; color: #ffffff; } .log-list { color: #ffffff; font-size: 13px; padding: 25px; } </style></head><body><div class="container"> <div class="log-list"> </div></div><script src="https://cdn.jsdelivr.net/npm/sockjs-client@1/dist/sockjs.min.js"></script><script src="/lib/stomp/stomp.min.js"></script><script src="https://lib.sinaapp.com/js/jquery/2.0.2/jquery-2.0.2.min.js"></script><script> var socket = new SockJS('/socket-log?a=a'); stompClient = Stomp.over(socket); stompClient.connect({}, function (frame) { stompClient.subscribe('/user/topic/path'+getQueryVariable("path"), function (greeting) { console.log("a" + greeting) let item = $("<div class='log-line'></div>"); item.text(greeting.body) $(".log-list").append(item); $("html, body").animate({scrollTop: $(document).height()}, 0); }); }); function getQueryVariable(variable) { var query = window.location.search.substring(1); var vars = query.split("&"); for (var i = 0; i < vars.length; i++) { var pair = vars[i].split("="); if (pair[0] == variable) { return encodeURIComponent(pair[1]); } } return (false); } </script></body></html>复制代码
成果
上面是启动、敞开Tomcat的日志。
不通过SimpMessagingTemplate如何发送数据
如果不应用SimpMessagingTemplate,那么首先咱们要拿到对应的WebSocketSession,它有个sendMessage办法用来发送数据,然而类型是WebSocketMessage,Spring Boot有几个默认的实现,比方TextMessage用来发送文本信息。
然而如果应用了Stomp,那么单纯的应用他发送是不行的,数据尽管能过来,然而格局不对,Stomp解析不了,所以咱们要依照Stomp的格局发送。
然而通过查找,未能找到相干的材料,所以本人看了一下他的源码,其中设计到了StompEncoder这个类,看名字就晓得他是Stomp编码的工具。Stomp协定分为三个局部,命令、头、音讯体,命令有如下几个:
CONNECTSENDSUBSCRIBEUNSUBSCRIBEBEGINCOMMITABORTACKNACKDISCONNECT复制代码
紧跟着命令下一行是头,是键值对模式存在的,最初是音讯体,开端以空字符结尾。
上面是发送的必要格局,否则StompEncoder也无奈编码,将抛出异样,至于这个为什么这么写,具体就得看StompEncoderde.writeHeaders
办法了,外面有几个验证,这种写齐全是被他逼的。
StompEncoder stompEncoder = new StompEncoder(); byte[] encode = stompEncoder.encode(createStompMessageHeader(),msg.getBytes()); webSocketSession.sendMessage(new TextMessage(encode)); private HashMap<String, Object> createStompMessageHeader() { HashMap<String, Object> hashMap = new HashMap<>(); hashMap.put("subscription", createList("sub-0")); hashMap.put("content-type", createList("text/plain")); HashMap<String, Object> stringObjectHashMap = new HashMap<>(); stringObjectHashMap.put("simpMessageType", SimpMessageType.MESSAGE); stringObjectHashMap.put("stompCommand", StompCommand.MESSAGE); stringObjectHashMap.put("subscription", "sub-0"); stringObjectHashMap.put("nativeHeaders", hashMap); return stringObjectHashMap;} private List<String> createList(String value) { List<String> list = new ArrayList<>(); list.add(value); return list;}复制代码
tail -f 为什么会生效
这是偶然间的一个发现,当执行tail -f
命令后,咱们通过vim、gedit等工具编辑并保留这个文件,会发现tail -f
并不会输入新的行,反而通过echo test>>xx.txt
是失常的。
那这里的蹊跷又在哪?
其实,tail -f
不论在文件挪动、改名都会进行追踪,因为他跟踪的是文件描述符,引入维基百科的一句话:
文件描述符在模式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个过程所保护的该过程关上文件的记录表。当程序关上一个现有文件或者创立一个新文件时,内核向过程返回一个文件描述符。在程序设计中,一些波及底层的程序编写往往会围绕着文件描述符开展。然而文件描述符这一概念往往只实用于UNIX、Linux这样的操作系统。
tail -f
执行后会产生一个过程,能够在/proc/pid/fd
门路下查看他所关上的文件描述符,上面来看一个GIF。
在这个操作中,首先在终端1中创立一个1.txt,而后进行tail -f
跟踪,接着在终端2中追加一行数据,能够看到终端1中是能够打印进去的。
而后在看神奇的一幕,在终端2进行mv改名,接着向被改名后的文件追加新的一行,你会发现,终端1竟然还是会打印的。
如果查看一下这个过程的文件描述符,就不为奇了,在上面的命令中,显示了3号描述符追踪的是/home/HouXinLin/test/tail/2.txt
。
hxl@hxl-PC:/home/HouXinLin/test/tail$ ps -ef |grep 1.txthxl 1368 29021 0 09:02 pts/0 00:00:00 grep 1.txthxl 20298 29672 0 09:00 pts/6 00:00:00 tail -f 1.txthxl@hxl-PC:/home/HouXinLin/test/tail$ ls -l /proc/20298/fd总用量 0lrwx------ 1 hxl hxl 64 3月 16 09:02 0 -> /dev/pts/6lrwx------ 1 hxl hxl 64 3月 16 09:02 1 -> /dev/pts/6lrwx------ 1 hxl hxl 64 3月 16 09:02 2 -> /dev/pts/6lr-x------ 1 hxl hxl 64 3月 16 09:02 3 -> /home/HouXinLin/test/tail/2.txtlr-x------ 1 hxl hxl 64 3月 16 09:02 4 -> anon_inode:inotifyhxl@hxl-PC:/home/HouXinLin/test/tail$ 复制代码
然而如果咱们通过vim
、等工具编辑这个文件后,那么这个文件描述符中会被记录为被删除,即便这个文件的确是存在的,此时在向2.txt文件中追加就会生效。
hxl@hxl-PC:/home/HouXinLin/test/tail$ vim 2.txt hxl@hxl-PC:/home/HouXinLin/test/tail$ ls -l /proc/20298/fd总用量 0lrwx------ 1 hxl hxl 64 3月 16 09:02 0 -> /dev/pts/6lrwx------ 1 hxl hxl 64 3月 16 09:02 1 -> /dev/pts/6lrwx------ 1 hxl hxl 64 3月 16 09:02 2 -> /dev/pts/6lr-x------ 1 hxl hxl 64 3月 16 09:02 3 -> /home/HouXinLin/test/tail/2.txt~ (deleted)lr-x------ 1 hxl hxl 64 3月 16 09:02 4 -> anon_inode:inotifyhxl@hxl-PC:/home/HouXinLin/test/tail$ 复制代码
最初,无妨在尝试一下tail -F
?
参考:《2020最新Java根底精讲视频教程和学习路线!》
链接:https://juejin.cn/post/694006...