关于java:Spring-Boot-集成-WebSocket轻松实现信息推送

1次阅读

共计 7004 个字符,预计需要花费 18 分钟才能阅读完成。

在一次我的项目开发中,应用到了 Netty 网络应用框架,以及 MQTT 进行音讯数据的收发,这其中须要后盾来将获取到的音讯被动推送给前端,于是就应用到了 MQTT,特此记录一下。

一、什么是 websocket?

WebSocket 协定是基于 TCP 的一种新的网络协议。它实现了客户端与服务器全双工通信,学过计算机网络都晓得,既然是全双工,就阐明了 服务器能够被动发送信息给客户端

这与咱们的推送技术或者是多人在线聊天的性能不约而同。

为什么不应用 HTTP 协定呢?

这是因为 HTTP 是单工通信,通信只能由客户端发动,客户端申请一下,服务器解决一下,这就太麻烦了。于是 websocket 应运而生。

上面咱们就间接开始应用 Springboot 开始整合。以下案例都在我本人的电脑上测试胜利,你能够依据本人的性能进行批改即可。

我的我的项目构造如下:

二、应用步骤

1. 增加依赖

Maven 依赖:

<dependency>  
    <groupId>org.springframework.boot</groupId>  
    <artifactId>spring-boot-starter-websocket</artifactId>  
</dependency> 

2. 启用 Springboot 对 WebSocket 的反对

启用 WebSocket 的反对也是很简略,几句代码搞定:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * @ Auther: 马超伟
 * @ Date: 2020/06/16/14:35
 * @ Description: 开启 WebSocket 反对
 */
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();
    }
}

Spring Boot 最新教程举荐看这个:https://github.com/javastacks…

3. 外围配置:WebSocketServer

因为 WebSocket 是相似客户端服务端的模式(采纳 ws 协定),那么这里的 WebSocketServer 其实就相当于一个 ws 协定的 Controller

  • @ ServerEndpoint 注解是一个类档次的注解,它的性能次要是将目前的类定义成一个 websocket 服务器端, 注解的值将被用于监听用户连贯的终端拜访 URL 地址, 客户端能够通过这个 URL 来连贯到 WebSocket 服务器端
  • 新建一个 ConcurrentHashMap webSocketMap 用于接管以后 userId 的 WebSocket,不便传递之间对 userId 进行推送音讯。

上面是具体业务代码:

package cc.mrbird.febs.external.webScoket;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * Created with IntelliJ IDEA.
 * @ Auther: 马超伟
 * @ Date: 2020/06/16/14:35
 * @ Description:
 * @ ServerEndpoint 注解是一个类档次的注解,它的性能次要是将目前的类定义成一个 websocket 服务器端,
 * 注解的值将被用于监听用户连贯的终端拜访 URL 地址, 客户端能够通过这个 URL 来连贯到 WebSocket 服务器端
 */
@Component
@Slf4j
@Service
@ServerEndpoint("/api/websocket/{sid}")
public class WebSocketServer {
    // 动态变量,用来记录以后在线连接数。应该把它设计成线程平安的。private static int onlineCount = 0;
    //concurrent 包的线程平安 Set,用来寄存每个客户端对应的 MyWebSocket 对象。private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();

    // 与某个客户端的连贯会话,须要通过它来给客户端发送数据
    private Session session;

    // 接管 sid
    private String sid = "";

    /**
     * 连贯建设胜利调用的办法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("sid") String sid) {
        this.session = session;
        webSocketSet.add(this);     // 退出 set 中
        this.sid = sid;
        addOnlineCount();           // 在线数加 1
        try {sendMessage("conn_success");
            log.info("有新窗口开始监听:" + sid + ", 以后在线人数为:" + getOnlineCount());
        } catch (IOException e) {log.error("websocket IO Exception");
        }
    }

    /**
     * 连贯敞开调用的办法
     */
    @OnClose
    public void onClose() {webSocketSet.remove(this);  // 从 set 中删除
        subOnlineCount();           // 在线数减 1
        // 断开连接状况下,更新主板占用状况为开释
        log.info("开释的 sid 为:"+sid);
        // 这里写你 开释的时候,要解决的业务
        log.info("有一连贯敞开!以后在线人数为" + getOnlineCount());

    }

    /**
     * 收到客户端音讯后调用的办法
     * @ Param message 客户端发送过去的音讯
     */
    @OnMessage
    public void onMessage(String message, Session session) {log.info("收到来自窗口" + sid + "的信息:" + message);
        // 群发音讯
        for (WebSocketServer item : webSocketSet) {
            try {item.sendMessage(message);
            } catch (IOException e) {e.printStackTrace();
            }
        }
    }

    /**
     * @ Param session
     * @ Param error
     */
    @OnError
    public void onError(Session session, Throwable error) {log.error("产生谬误");
        error.printStackTrace();}

    /**
     * 实现服务器被动推送
     */
    public void sendMessage(String message) throws IOException {this.session.getBasicRemote().sendText(message);
    }

    /**
     * 群发自定义音讯
     */
    public static void sendInfo(String message, @PathParam("sid") String sid) throws IOException {log.info("推送音讯到窗口" + sid + ",推送内容:" + message);

        for (WebSocketServer item : webSocketSet) {
            try {
                // 这里能够设定只推送给这个 sid 的,为 null 则全副推送
                if (sid == null) {//                    item.sendMessage(message);
                } else if (item.sid.equals(sid)) {item.sendMessage(message);
                }
            } catch (IOException e) {continue;}
        }
    }

    public static synchronized int getOnlineCount() {return onlineCount;}

    public static synchronized void addOnlineCount() {WebSocketServer.onlineCount++;}

    public static synchronized void subOnlineCount() {WebSocketServer.onlineCount--;}

    public static CopyOnWriteArraySet<WebSocketServer> getWebSocketSet() {return webSocketSet;}
}

4. 测试 Controller

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.servlet.ModelAndView;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Created with IntelliJ IDEA.
 *
 * @ Auther: 马超伟
 * @ Date: 2020/06/16/14:38
 * @ Description:
 */
@Controller("web_Scoket_system")
@RequestMapping("/api/socket")
public class SystemController {
    // 页面申请
    @GetMapping("/index/{userId}")
    public ModelAndView socket(@PathVariable String userId) {ModelAndView mav = new ModelAndView("/socket1");
        mav.addObject("userId", userId);
        return mav;
    }

    // 推送数据接口
    @ResponseBody
    @RequestMapping("/socket/push/{cid}")
    public Map pushToWeb(@PathVariable String cid, String message) {Map<String,Object> result = new HashMap<>();
        try {WebSocketServer.sendInfo(message, cid);
            result.put("code", cid);
            result.put("msg", message);
        } catch (IOException e) {e.printStackTrace();
        }
        return result;
    }
}

5. 测试页面 index.html

<!DOCTYPE html>
<html>

    <head>
        <meta charset="utf-8">
        <title>Java 后端 WebSocket 的 Tomcat 实现 </title>
        <script type="text/javascript" src="js/jquery.min.js"></script>
    </head>

    <body>
        <div id="main" style="width: 1200px;height:800px;"></div>
        Welcome<br/><input id="text" type="text" />
        <button onclick="send()"> 发送音讯 </button>
        <hr/>
        <button onclick="closeWebSocket()"> 敞开 WebSocket 连贯 </button>
        <hr/>
        <div id="message"></div>
    </body>
    <script type="text/javascript">
        var websocket = null;
        // 判断以后浏览器是否反对 WebSocket
        if('WebSocket' in window) {
            // 改成你的地址
            websocket = new WebSocket("ws://192.168.100.196:8082/api/websocket/100");
        } else {alert('以后浏览器 Not support websocket')
        }

        // 连贯产生谬误的回调办法
        websocket.onerror = function() {setMessageInnerHTML("WebSocket 连贯产生谬误");
        };

        // 连贯胜利建设的回调办法
        websocket.onopen = function() {setMessageInnerHTML("WebSocket 连贯胜利");
        }
        var U01data, Uidata, Usdata
        // 接管到音讯的回调办法
        websocket.onmessage = function(event) {console.log(event);
            setMessageInnerHTML(event);
            setechart()}

        // 连贯敞开的回调办法
        websocket.onclose = function() {setMessageInnerHTML("WebSocket 连贯敞开");
        }

        // 监听窗口敞开事件,当窗口敞开时,被动去敞开 websocket 连贯,避免连贯还没断开就敞开窗口,server 端会抛异样。window.onbeforeunload = function() {closeWebSocket();
        }

        // 将音讯显示在网页上
        function setMessageInnerHTML(innerHTML) {document.getElementById('message').innerHTML += innerHTML + '<br/>';
        }

        // 敞开 WebSocket 连贯
        function closeWebSocket() {websocket.close();
        }

        // 发送音讯
        function send() {var message = document.getElementById('text').value;
            websocket.send('{"msg":"' + message + '"}');
            setMessageInnerHTML(message + "&#13;");
        }
    </script>

</html>

6. 后果展现

后盾:

如果有连贯申请

前台显示:

总结

这两头我遇到一个问题,就是说 WebSocket 启动的时候优先于 spring 容器,从而导致在 WebSocketServer 中调用业务 Service 会报空指针异样

所以须要在 WebSocketServer 中将所须要用到的 service 给动态初始化一下:

如图所示:

还须要做如下配置:

原文链接:https://blog.csdn.net/MacWx/a…

版权申明:本文为 CSDN 博主「大树学生.」的原创文章,遵循 CC 4.0 BY-SA 版权协定,转载请附上原文出处链接及本申明。

近期热文举荐:

1.600+ 道 Java 面试题及答案整顿(2021 最新版)

2. 终于靠开源我的项目弄到 IntelliJ IDEA 激活码了,真香!

3. 阿里 Mock 工具正式开源,干掉市面上所有 Mock 工具!

4.Spring Cloud 2020.0.0 正式公布,全新颠覆性版本!

5.《Java 开发手册(嵩山版)》最新公布,速速下载!

感觉不错,别忘了顺手点赞 + 转发哦!

正文完
 0