关于java:后端向前端推送消息

12次阅读

共计 12405 个字符,预计需要花费 32 分钟才能阅读完成。

SpringBoot+WebSocket 集成

    什么是 WebSocket?
    为什么须要 WebSocket?前言
    maven 依赖
    WebSocketConfig
    WebSocketServer
    音讯推送
    页面发动
    运行成果
    后续
    Websocker 注入 Bean 问题
    netty-websocket-spring-boot-starter
    Springboot2+Netty+Websocket
    ServerEndpointExporter 谬误
    正式我的项目的前端 WebSocket 框架 GoEasy
    `@Component` 和 `@ServerEndpoint` 对于是否单例模式,是否应用 static Map 等一些问题的解答
    Vue 版本的 websocket 连贯

什么是 WebSocket?

这里写图片形容
WebSocket 协定是基于 TCP 的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex) 通信——容许服务器被动发送信息给客户端。
为什么须要 WebSocket?

首次接触 WebSocket 的人,都会问同样的问题:咱们曾经有了 HTTP 协定,为什么还须要另一个协定?它能带来什么益处?

答案很简略,因为 HTTP 协定有一个缺点:通信只能由客户端发动,HTTP 协定做不到服务器被动向客户端推送信息。这里写图片形容
举例来说,咱们想要查问以后的排队状况,只能是页面轮询向服务器发出请求,服务器返回查问后果。轮询的效率低,十分浪费资源(因为必须不停连贯,或者 HTTP 连贯始终关上)。因而 WebSocket 就是这样创造的。

前言

2020-10-20 教程补充:

补充对于 @Component 和 @ServerEndpoint 对于是否单例模式等的解答,感激大家热心发问和钻研。Vue 版本的 websocket 连贯办法

2020-01-05 教程补充:

整合了 IM 相干的优化
优化开启 / 敞开连贯的解决
上传到开源我的项目 spring-cloud-study-websocket,不便大家下载代码。

感激大家的反对和留言,14W 访问量是满满的能源!接下来还会有 websocket+redis 集群优化篇针对多 ws 服务器做简略优化解决,敬请期待!

话不多说,马上进入干货时刻。
maven 依赖

SpringBoot2.0 对 WebSocket 的反对几乎太棒了,间接就有包能够引入

    <dependency>  
       <groupId>org.springframework.boot</groupId>  
       <artifactId>spring-boot-starter-websocket</artifactId>  
   </dependency> 

1
2
3
4

WebSocketConfig

启用 WebSocket 的反对也是很简略,几句代码搞定

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**

  • 开启 WebSocket 反对
  • @author zhengkai.blog.csdn.net
    */

@Configuration
public class WebSocketConfig {


@Bean  
public ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();  
}  

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

WebSocketServer

这就是重点了,外围都在这里。

因为 WebSocket 是相似客户端服务端的模式(采纳 ws 协定),那么这里的 WebSocketServer 其实就相当于一个 ws 协定的 Controller

间接 @ServerEndpoint("/imserver/{userId}")、@Component 启用即可,而后在外面实现 @OnOpen 开启连贯,@onClose 敞开连贯,@onMessage 接管音讯等办法。新建一个 ConcurrentHashMap webSocketMap 用于接管以后 userId 的 WebSocket,不便 IM 之间对 userId 进行推送音讯。单机版实现到这里就能够。集群版(多个 ws 节点)还须要借助 mysql 或者 redis 等进行解决,革新对应的 sendMessage 办法即可。

package com.softdev.system.demo.config;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;

/**

  • @author zhengkai.blog.csdn.net
    */

@ServerEndpoint(“/imserver/{userId}”)
@Component
public class WebSocketServer {

static Log log=LogFactory.get(WebSocketServer.class);
/** 动态变量,用来记录以后在线连接数。应该把它设计成线程平安的。*/
private static int onlineCount = 0;
/**concurrent 包的线程平安 Set,用来寄存每个客户端对应的 MyWebSocket 对象。*/
private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
/** 与某个客户端的连贯会话,须要通过它来给客户端发送数据 */
private Session session;
/** 接管 userId*/
private String userId="";

/**
 * 连贯建设胜利调用的办法 */
@OnOpen
public void onOpen(Session session,@PathParam("userId") String userId) {
    this.session = session;
    this.userId=userId;
    if(webSocketMap.containsKey(userId)){webSocketMap.remove(userId);
        webSocketMap.put(userId,this);
        // 退出 set 中
    }else{webSocketMap.put(userId,this);
        // 退出 set 中
        addOnlineCount();
        // 在线数加 1
    }

    log.info("用户连贯:"+userId+", 以后在线人数为:" + getOnlineCount());

    try {sendMessage("连贯胜利");
    } catch (IOException e) {log.error("用户:"+userId+", 网络异样!!!!!!");
    }
}

/**
 * 连贯敞开调用的办法
 */
@OnClose
public void onClose() {if(webSocketMap.containsKey(userId)){webSocketMap.remove(userId);
        // 从 set 中删除
        subOnlineCount();}
    log.info("用户退出:"+userId+", 以后在线人数为:" + getOnlineCount());
}

/**
 * 收到客户端音讯后调用的办法
 *
 * @param message 客户端发送过去的音讯 */
@OnMessage
public void onMessage(String message, Session session) {log.info("用户音讯:"+userId+", 报文:"+message);
    // 能够群发音讯
    // 音讯保留到数据库、redis
    if(StringUtils.isNotBlank(message)){
        try {
            // 解析发送的报文
            JSONObject jsonObject = JSON.parseObject(message);
            // 追加发送人(避免串改)
            jsonObject.put("fromUserId",this.userId);
            String toUserId=jsonObject.getString("toUserId");
            // 传送给对应 toUserId 用户的 websocket
            if(StringUtils.isNotBlank(toUserId)&&webSocketMap.containsKey(toUserId)){webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
            }else{log.error("申请的 userId:"+toUserId+"不在该服务器上");
                // 否则不在这个服务器上,发送到 mysql 或者 redis
            }
        }catch (Exception e){e.printStackTrace();
        }
    }
}

/**
 *
 * @param session
 * @param error
 */
@OnError
public void onError(Session session, Throwable error) {log.error("用户谬误:"+this.userId+", 起因:"+error.getMessage());
    error.printStackTrace();}
/**
 * 实现服务器被动推送
 */
public void sendMessage(String message) throws IOException {this.session.getBasicRemote().sendText(message);
}


/**
 * 发送自定义音讯
 * */
public static void sendInfo(String message,@PathParam("userId") String userId) throws IOException {log.info("发送音讯到:"+userId+",报文:"+message);
    if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){webSocketMap.get(userId).sendMessage(message);
    }else{log.error("用户"+userId+", 不在线!");
    }
}

public static synchronized int getOnlineCount() {return onlineCount;}

public static synchronized void addOnlineCount() {WebSocketServer.onlineCount++;}

public static synchronized void subOnlineCount() {WebSocketServer.onlineCount--;}

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146

音讯推送

至于推送新信息,能够再本人的 Controller 写个办法调用 WebSocketServer.sendInfo(); 即可

import com.softdev.system.demo.config.WebSocketServer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.ModelAndView;
import java.io.IOException;

/**

  • WebSocketController
  • @author zhengkai.blog.csdn.net
    */

@RestController
public class DemoController {

@GetMapping("index")
public ResponseEntity<String> index(){return ResponseEntity.ok("申请胜利");
}

@GetMapping("page")
public ModelAndView page(){return new ModelAndView("websocket");
}

@RequestMapping("/push/{toUserId}")
public ResponseEntity<String> pushToWeb(String message, @PathVariable String toUserId) throws IOException {WebSocketServer.sendInfo(message,toUserId);
    return ResponseEntity.ok("MSG SEND SUCCESS");
}

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

页面发动

页面用 js 代码调用 websocket,当然,太古老的浏览器是不行的,个别新的浏览器或者谷歌浏览器是没问题的。还有一点,记得协定是 ws 的,如果应用了一些门路类,能够 replace(“http”,“ws”)来替换协定。

<!DOCTYPE html>
<html>
<head>

<meta charset="utf-8">
<title>websocket 通信 </title>

</head>
<script src=”https://cdn.bootcss.com/jquery/3.3.1/jquery.js”></script>
<script>

var socket;
function openSocket() {if(typeof(WebSocket) == "undefined") {console.log("您的浏览器不反对 WebSocket");
    }else{console.log("您的浏览器反对 WebSocket");
        // 实现化 WebSocket 对象,指定要连贯的服务器地址与端口  建设连贯
        // 等同于 socket = new WebSocket("ws://localhost:8888/xxxx/im/25");
        //var socketUrl="${request.contextPath}/im/"+$("#userId").val();
        var socketUrl="http://localhost:9999/demo/imserver/"+$("#userId").val();
        socketUrl=socketUrl.replace("https","ws").replace("http","ws");
        console.log(socketUrl);
        if(socket!=null){socket.close();
            socket=null;
        }
        socket = new WebSocket(socketUrl);
        // 关上事件
        socket.onopen = function() {console.log("websocket 已关上");
            //socket.send("这是来自客户端的音讯" + location.href + new Date());
        };
        // 取得音讯事件
        socket.onmessage = function(msg) {console.log(msg.data);
            // 发现音讯进入    开始解决前端触发逻辑
        };
        // 敞开事件
        socket.onclose = function() {console.log("websocket 已敞开");
        };
        // 产生了谬误事件
        socket.onerror = function() {console.log("websocket 产生了谬误");
        }
    }
}
function sendMessage() {if(typeof(WebSocket) == "undefined") {console.log("您的浏览器不反对 WebSocket");
    }else {console.log("您的浏览器反对 WebSocket");
        console.log('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
        socket.send('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
    }
}

</script>
<body>
<p>【userId】:<div><input id=”userId” name=”userId” type=”text” value=”10″></div>
<p>【toUserId】:<div><input id=”toUserId” name=”toUserId” type=”text” value=”20″></div>
<p>【toUserId】:<div><input id=”contentText” name=”contentText” type=”text” value=”hello websocket”></div>
<p>【操作】:<div> 开启 socket</div>
<p>【操作】:<div> 发送音讯 </div>
</body>

</html>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64

运行成果

v20200105,退出开源我的项目 spring-cloud-study-websocket,更新运行成果,更不便了解。v1.1 的成果,刚刚修复了日志,并且反对指定监听某个端口,代码曾经全副更新,当初是这样的成果

关上两个页面,按 F12 调出控控制台查看测试成果:

页面 参数
http://localhost:9999/demo/page fromUserId=10,toUserId=20
http://localhost:9999/demo/page fromUserId=20,toUserId=10

别离开启 socket,再发送音讯
在这里插入图片形容
在这里插入图片形容

  1. 向前端推送数据:

    http://localhost:9999/demo/push/10?message=123123

在这里插入图片形容
通过调用 push api,能够向指定的 userId 推送信息,当然报文这里乱写,倡议规定好格局。
后续

针对简略 IM 的业务场景,进行了一些优化,能够看后续的文章 SpringBoot2+WebSocket 之聊天利用实战(优化版本)(v20201005 已整合)

次要变动是 CopyOnWriteArraySet 改为 ConcurrentHashMap,保障多线程平安同时不便利用 map.get(userId)进行推送到指定端口。

相比之前的 Set,Set 遍历是麻烦且麻烦的事件,而 Map 的 get 是简略便捷的,当 WebSocket 数量大的时候,这个小小的耗费就会聚少成多,影响体验,所以须要优化。在 IM 的场景下,指定 userId 进行推送音讯更加不便。
Websocker 注入 Bean 问题

对于这个问题,能够看最新发表的这篇文章,在参考和钻研了网上一些攻略后,我的项目曾经通过该办法注入胜利,大家能够参考。
对于 controller 调用 controller/service 调用 service/util 调用 service/websocket 中 autowired 的解决办法
netty-websocket-spring-boot-starter

Springboot2 构建基于 Netty 的高性能 Websocket 服务器 (netty-websocket-spring-boot-starter)
只须要换个 starter 即可实现高性能 websocket,连忙应用吧
Springboot2+Netty+Websocket

Springboot2+Netty 实现 Websocket,应用官网的 netty-all 的包,比原生的 websocket 更加稳固更加高性能,等同配置状况下能够 handle 更多的连贯。

代码款式全副曾经更正,也反对 websocket 连贯 url 带参数性能,另外也感激大家的浏览和评论,一起提高,谢谢!~~
ServerEndpointExporter 谬误

org.springframework.beans.factory.BeanCreationException: Error creating bean with name‘serverEndpointExporter’defined in class path resource [com/xxx/WebSocketConfig.class]: Invocation of init method failed; nested exception is java.lang.IllegalStateException: javax.websocket.server.ServerContainer not available

感激 @来了老弟儿 的反馈:

如果 tomcat 部署始终报这个错,请移除 WebSocketConfig 中 @Bean ServerEndpointExporter 的注入。

ServerEndpointExporter 是由 Spring 官网提供的规范实现,用于扫描 ServerEndpointConfig 配置类和 @ServerEndpoint 注解实例。应用规定也很简略:

如果应用默认的嵌入式容器 比方 Tomcat 则必须手工在上下文提供 ServerEndpointExporter。如果应用内部容器部署 war 包,则不须要提供提供 ServerEndpointExporter,因为此时 SpringBoot 默认将扫描服务端的行为交给内部容器解决,所以线上部署的时候要把 WebSocketConfig 中这段注入 bean 的代码注掉。

正式我的项目的前端 WebSocket 框架 GoEasy

感激 kkatrina 的补充,正式的我的项目中,个别是用第三方 websocket 框架来做,稳定性、实时性有保障的多,也会包含一些心跳、重连机制。

GoEasy 专一于服务器与浏览器, 浏览器与浏览器之间音讯推送, 完满兼容世界上的绝大多数浏览器, 包含 IE6, IE7 之类的十分古老的浏览器。反对 Uniapp, 各种小程序,react,vue 等所有支流 Web 前端技术。GoEasy 采纳 公布 / 订阅 的音讯模式, 帮忙您十分轻松的实现一对一, 一对多的通信。https://www.goeasy.io/cn/doc/

@Component 和 @ServerEndpoint 对于是否单例模式,是否应用 static Map 等一些问题的解答

看到大家都在热心的探讨对于是否单例模式这个问题,请大家置信本人的间接,如果 websocket 是单例模式,还怎么服务这么多 session 呢。

websocket 是原型模式,@ServerEndpoint 每次建设双向通信的时候都会创立一个实例,区别于 spring 的单例模式。Spring 的 @Component 默认是单例模式,请留神,默认 而已,是能够被扭转的。这里的 @Component 仅仅为了反对 @Autowired 依赖注入应用,如果不加则不能注入任何货色,为了不便。什么是 prototype 原型模式?根本就是你须要从 A 的实例失去一份与 A 内容雷同,然而又互不烦扰的实例 B 的话,就须要应用原型模式。对于在原型模式下应用 static 的 webSocketMap,请留神这是 ConcurrentHashMap,也就是线程平安 / 线程同步的,而且曾经是动态变量作为全局调用,这种状况下是 ok 的,或者大家如果有顾虑或者更好的想法的化,能够进行改良。例如应用一个两头类来接管和寄存 session。为什么每次都 @OnOpen 都要查看 webSocketMap.containsKey(userId),首先了为了代码强健性思考,假如代码以及机制没有问题,那么必定这个逻辑是废的对吧。然而理论应用的时候发现偶然会呈现重连失败或者其余起因导致之前的 session 还存在,这里就做了一个革除旧 session,迎接新 session 的性能。

Vue 版本的 websocket 连贯

感激 @GzrStudy 的奉献,供大家参考。

<script>
export default {

data() {
    return {
        socket:null,
        userId:localStorage.getItem("ms_uuid"),
        toUserId:'2',
        content:'3'
    }
},

methods: {

openSocket() {if (typeof WebSocket == "undefined") {console.log("您的浏览器不反对 WebSocket");
  } else {console.log("您的浏览器反对 WebSocket");
    // 实现化 WebSocket 对象,指定要连贯的服务器地址与端口  建设连贯
    // 等同于 socket = new WebSocket("ws://localhost:8888/xxxx/im/25");
    //var socketUrl="${request.contextPath}/im/"+$("#userId").val();
    var socketUrl =
      "http://localhost:8081/imserver/" + this.userId;
    socketUrl = socketUrl.replace("https", "ws").replace("http", "ws");
    console.log(socketUrl);
    if (this.socket != null) {this.socket.close();
      this.socket = null;
    }
    this.socket = new WebSocket(socketUrl);
    // 关上事件
    this.socket = new WebSocket(socketUrl);
    // 关上事件
    this.socket.onopen = function() {console.log("websocket 已关上");
      //socket.send("这是来自客户端的音讯" + location.href + new Date());
    };
    // 取得音讯事件
    this.socket.onmessage = function(msg) {console.log(msg.data);
      // 发现音讯进入    开始解决前端触发逻辑
    };
    // 敞开事件
    this.socket.onclose = function() {console.log("websocket 已敞开");
    };
    // 产生了谬误事件
    this.socket.onerror = function() {console.log("websocket 产生了谬误");
    };
  }
},
sendMessage() {if (typeof WebSocket == "undefined") {console.log("您的浏览器不反对 WebSocket");
  } else {console.log("您的浏览器反对 WebSocket");
    console.log(
      '{"toUserId":"' +
         this.toUserId +
        '","contentText":"' +
         this.content +
        '"}'
    );
    this.socket.send(
      '{"toUserId":"' +
         this.toUserId +
        '","contentText":"' +
         this.content +
        '"}'
     );

}

}
————————————————
版权申明:本文为 CSDN 博主「Moshow 郑锴」的原创文章,遵循 CC 4.0 BY-SA 版权协定,转载请附上原文出处链接及本申明。
原文链接:https://blog.csdn.net/moshowg…

正文完
 0