乐趣区

关于即时通讯:跟着源码一起学手把手教你用WebSocket打造Web端IM聊天

本文作者芋艿,原题“芋道 Spring Boot WebSocket 入门”,本次有订正和改变。

一、引言

WebSocket 现在在 Web 端即时通讯技术利用里应用宽泛,不仅用于传统 PC 端的网页里,也被很多挪动端开发者用于基于 HTML5 的混合 APP 里。对于想要在基于 Web 的利用里增加 IM、推送等实时通信性能,WebSocket 简直是必须要把握的技术。

本文将基于 Tomcat 和 Spring 框架实现一个逻辑简略的入门级 IM 利用,对于即时通讯初学者来说,能找到一个简略间接且能顺利跑通的实例代码,显然意义更大,本文正是如此。心愿能给你的 IM 开发和学习带来启发。

注: 源码在本文第四、五节结尾的附件处可下载。

学习交换:

  • 即时通讯 / 推送技术开发交换 5 群:215477170 [举荐]
  • 挪动端 IM 开发入门文章:《新手入门一篇就够:从零开发挪动端 IM》
  • 开源 IM 框架源码:https://github.com/JackJiang2011/MobileIMSDK

(本文同步公布于:http://www.52im.net/thread-3483-1-1.html)

二、常识筹备

如果你对 Web 端即时通讯常识一头雾水,务必先读:《新手入门贴:史上最全 Web 端即时通讯技术原理详解》、《Web 端即时通讯技术盘点:短轮询、Comet、Websocket、SSE》。

限于篇幅,本文不会深究 WebSocket 技术实践,如有趣味请从根底学习:

  • 《老手疾速入门:WebSocket 扼要教程》
  • 《WebSocket 从入门到精通,半小时就够!》
  • 《八问 WebSocket 协定:为你疾速解答 WebSocket 热门疑难》
  • 《WebSocket 详解(一):初步意识 WebSocket 技术》
  • 《WebSocket 详解(二):技术原理、代码演示和利用案例》
  • 《WebSocket 详解(三):深刻 WebSocket 通信协议细节》
  • 《WebSocket 详解(四):刨根问底 HTTP 与 WebSocket 的关系 (上篇)》
  • 《WebSocket 详解(五):刨根问底 HTTP 与 WebSocket 的关系 (下篇)》
  • 《WebSocket 详解(六):刨根问底 WebSocket 与 Socket 的关系》

如果想要更硬核一点的,能够读读上面这几篇:

  • 《WebSocket 硬核入门:200 行代码,教你徒手撸一个 WebSocket 服务器》
  • 《Web 端即时通讯实际干货:如何让你的 WebSocket 断网重连更疾速?》
  • 《实践联系实际:从零了解 WebSocket 的通信原理、协定格局、安全性》

三、内容概述

相比 HTTP 协定来说,WebSocket 协定对大多数后端开发者是比拟生疏的。

相对而言:WebSocket 协定重点是提供了服务端被动向客户端发送数据的能力,这样咱们就能够实现实时性较高的需要。例如:聊天 IM 即便通信性能、音讯订阅服务、网页游戏等等。

同时: 因为 WebSocket 应用 TCP 通信,能够防止反复创立连贯,晋升通信品质和效率。例如:美团的长连贯服务,具体能够看看《美团点评的挪动端网络优化实际:大幅晋升连贯成功率、速度等》。

情谊提醒:

这里有个误区,WebSocket 相比一般的 Socket 来说,仅仅是借助 HTTP 协定实现握手,创立连贯。后续的所有通信,都和 HTTP 协定无关。

看到这里,大家肯定认为又要开始哔哔 WebSocket 的概念。哈哈,我偏不~ 如果对这块不了的敌人,能够浏览本文“2、常识筹备”这一章。

要想应用 WebSocket,个别有如下几种解决方案可选:

  • 1)计划一:Spring WebSocket;
  • 2)计划二:Tomcat WebSocket;
  • 3)计划三:Netty WebSocket。

目前笔者手头有个波及到 IM 即便通信的我的项目,采纳的是计划三。

次要起因是: 咱们对 Netty 框架的实战、原理与源码,都绝对相熟一些,所以就思考了它。并且,除了须要反对 WebSocket 协定,咱们还想提供原生的 Socket 协定。

如果仅仅是仅仅提供 WebSocket 协定的反对,能够思考采纳计划一或者计划二,在应用上,两个计划是比拟靠近的。相比来说,计划一 Spring WebSocket 内置了对 STOMP 协定的反对。

不过: 本文还是采纳计划二“Tomcat WebSocket”来作为入门示例。咳咳咳,没有非凡的起因,次要是开始写本文之前,曾经花了 2 小时应用它写了一个示例。切实是有点懒,不想改。如果能重来,我要选李白,哈哈哈哈~

当然,不要慌,计划一和计划二的实现代码,真心没啥差异。

在开始搭建 Tomcat WebSocket 入门示例之前,咱们先来理解下 JSR-356 标准,定义了 Java 针对 WebSocket 的 API:即 Javax WebSocket。标准是大哥,打死不会提供实现,所以 JSR-356 也是如此。目前,支流的 Web 容器都曾经提供了 JSR-356 的实现,例如说 Tomcat、Jetty、Undertow 等等。

四、Tomcat WebSocket 实战入门

4.1、根本介绍

示例代码下载:

(因附件无奈上传到此处,请从同步链接处下载:http://www.52im.net/thread-3483-1-1.html)

代码目录内容是这样: 

在本大节中,咱们会应用 Tomcat WebSocket 搭建一个 WebSocket 的示例。

提供如下音讯的性能反对:

  • 1)身份认证申请;
  • 2)私聊音讯;
  • 3)群聊音讯。

思考到让示例更加易懂,咱们先做成全局有且仅有一个大的聊天室,即建设上 WebSocket 的连贯,都主动动进入该聊天室。

上面,开始漫游 WebSocket 这个鱼塘 …

4.2、引入依赖

在 pom.xml 文件中,引入相干依赖。

<?xml version=”1.0″encoding=”UTF-8″?>
<project xmlns=”http://maven.apache.org/POM/4.0.0″
         xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance”
         xsi:schemaLocation=”http://maven.apache.org/POM/4.0.0 [url=http://maven.apache.org/xsd/m…”>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!– lookup parent from repository –>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>lab-25-01</artifactId>
    <dependencies>
        <!– 实现对 WebSocket 相干依赖的引入,不便~ –>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <!– 引入 Fastjson,实现对 JSON 的序列化,因为后续咱们会应用它解析音讯 –>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
    </dependencies>
</project>
具体每个依赖的作用,本人认真看下正文。

4.3、WebsocketServerEndpoint

在 cn.iocoder.springboot.lab25.springwebsocket.websocket 包门路下,创立 WebsocketServerEndpoint 类,定义 Websocket 服务的端点(EndPoint)。

代码如下:

// WebsocketServerEndpoint.java
@Controller
@ServerEndpoint(“/”)
public class WebsocketServerEndpoint {
    private Logger logger = LoggerFactory.getLogger(getClass());
    @OnOpen
    public void onOpen(Session session, EndpointConfig config) {
        logger.info(“onOpen”, session);
    }
    @OnMessage
    public void onMessage(Session session, String message) {
        logger.info(“onOpen”, session, message); // 生产环境下,请设置成 debug 级别
    }
    @OnClose
    public void onClose(Session session, CloseReason closeReason) {
        logger.info(“onClose”, session, closeReason);
    }
    @OnError
    public void onError(Session session, Throwable throwable) {
        logger.info(“onClose”, session, throwable);
    }
}

如代码所示:

  • 1)在类上,增加 @Controller 注解,保障创立一个 WebsocketServerEndpoint Bean;
  • 2)在类上,增加 JSR-356 定义的 @ServerEndpoint 注解,标记这是一个 WebSocket EndPoint,门路为 /;
  • 3)WebSocket 一共有四个事件,别离对应应用 JSR-356 定义的 @OnOpen、@OnMessage、@OnClose、@OnError 注解。

这是最简版的 WebsocketServerEndpoint 的代码。在下文,咱们会缓缓把代码补全。

4.4、WebSocketConfiguration

在 cn.iocoder.springboot.lab24.springwebsocket.config 包门路下,创立 WebsocketServerEndpoint 配置类。

代码如下:

// WebSocketConfiguration.java
@Configuration
// @EnableWebSocket // 无需增加该注解,因为咱们并不是应用 Spring WebSocket
public class WebSocketConfiguration {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

PS: 在 #serverEndpointExporter() 办法中,创立 ServerEndpointExporter Bean。该 Bean 的作用,是扫描增加有 @ServerEndpoint 注解的 Bean。

4.5、Application

创立 Application.java 类,配置 @SpringBootApplication 注解即可。

代码如下:

// Application.java
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

执行 Application 启动该示例我的项目。

思考到大家可能不会或者不违心写前端代码,所以咱们间接应用 WebSocket 在线测试工具,测试 WebSocket 连贯。

如下图:

至此,最简略的一个 WebSocket 我的项目的骨架,咱们曾经搭建实现。上面,咱们开始革新,把相应的逻辑补全。

4.6、音讯

在 HTTP 协定中,是基于 Request/Response 申请响应的同步模型,进行交互。在 Websocket 协定中,是基于 Message 音讯的异步模型,进行交互。这一点,是很大的不同的,等会看到具体的音讯类,感触会更显著。

因为 WebSocket 协定,不像 HTTP 协定有 URI 能够辨别不同的 API 申请操作,所以咱们须要在 WebSocket 的 Message 里,减少可能标识音讯类型,这里咱们采纳 type 字段。

所以在这个示例中,咱们采纳的 Message 采纳 JSON 格局编码。

格局如下:

{
    type: “”, // 音讯类型
    body: {} // 音讯体
}

解释一下:

  • 1)type 字段,音讯类型。通过该字段,咱们晓得应用哪个 MessageHandler 音讯处理器(对于 MessageHandler,咱们在下一节中,具体解析);
  • 2)body 字段,音讯体。不同的音讯类型,会有不同的音讯体;
  • 3)Message 采纳 JSON 格局编码,次要思考便捷性,理论我的项目下,也能够思考 Protobuf 等更加高效且节俭流量的编码格局。

实际上: 咱们在该示例中,body 字段对应的 Message 相干的接口和类,切实想不到名字了。所有的 Message 们,咱们都放在 cn.iocoder.springboot.lab25.springwebsocket.message 包门路下。

4.6.1 Message

创立 Message 接口,根底音讯体,所有音讯体都要实现该接口。

代码如下:

// Message.java
publicinterfaceMessage {
}

目前作为一个标记接口,未定义任何操作。

4.6.2 认证相干 Message

创立 AuthRequest 类,用户认证申请。

代码如下:

// AuthRequest.java
public class AuthRequest implements Message {
    public static final String TYPE = “AUTH_REQUEST”;
    /**
     * 认证 Token
     */
    private String accessToken;
    // … 省略 set/get 办法
}

解释一下:

  • 1)TYPE 动态属性,音讯类型为 AUTH_REQUEST。
  • 2)accessToken 属性,认证 Token。

对于第 2)点,在 WebSocket 协定中,咱们也须要认证以后连贯,用户身份是什么。个别状况下,咱们采纳用户调用 HTTP 登录接口,登录胜利后返回的拜访令牌 accessToken。这里,咱们先不拓展开讲,预先能够看看《基于 Token 认证的 WebSocket 连贯》文章。

尽管说,WebSocket 协定是基于 Message 模型,进行交互。然而,这并不意味着它的操作,不须要响应后果。例如说,用户认证申请,是须要用户认证响应的。所以,咱们创立 AuthResponse 类,作为用户认证响应。

代码如下:

// AuthResponse.java
public class AuthResponse implements Message {
    public static final String TYPE = “AUTH_RESPONSE”;
    /**
     * 响应状态码
     */
    private Integer code;
    /**
     * 响应提醒
     */
    private String message;
    // … 省略 set/get 办法
}

解释一下:

  • 1)TYPE 动态属性,音讯类型为 AUTH_REQUEST;
  • 2)code 属性,响应状态码;
  • 3)message 属性,响应提醒。

对于第 1)点,实际上,咱们在每个 Message 实现类上,都减少了 TYPE 动态属性,作为音讯类型。上面,咱们就不反复赘述了。

在本示例中,用户胜利认证之后,会播送用户退出群聊的告诉 Message,应用 UserJoinNoticeRequest。

代码如下:

// UserJoinNoticeRequest.java
public class UserJoinNoticeRequest implements Message {
    public static final String TYPE = “USER_JOIN_NOTICE_REQUEST”;
    /**
     * 昵称
     */
    private String nickname;
    // … 省略 set/get 办法
}

实际上,咱们能够在须要应用到 Request/Response 模型的中央,将 Message 进行拓展:

  • 1)Request 抽象类,减少 requestId 字段,示意申请编号;
  • 2)Response 抽象类,减少 requestId 字段,和每一个 Request 申请映射上(同时,外面对立定义 code 和 message 属性,示意响应状态码和响应提醒)。

这样,在应用到同步模型的业务场景下,Message 实现类应用 Request/Reponse 作为后缀。例如说,用户认证申请、删除一个好友申请等等。

而在应用到异步模型能的业务场景下,Message 实现类还是持续 Message 作为后缀。例如说,发送一条音讯,用户操作完后,无需阻塞期待后果

4.6.3 发送音讯相干 Message

创立 SendToOneRequest 类,发送给指定人的私聊音讯的 Message。

代码如下:

// SendToOneRequest.java
public class SendToOneRequest implements Message {
    public static final String TYPE = “SEND_TO_ONE_REQUEST”;
    /**
     * 发送给的用户
     */
    private String toUser;
    /**
     * 音讯编号
     */
    private String msgId;
    /**
     * 内容
     */
    private String content;
    // … 省略 set/get 办法
}

每个字段,本人看正文噢。

创立 SendToAllRequest 类,发送给所有人的群聊音讯的 Message。

代码如下:

// SendToAllRequest.java
public class SendToAllRequest implements Message {
    public static final String TYPE = “SEND_TO_ALL_REQUEST”;
    /**
     * 音讯编号
     */
    private String msgId;
    /**
     * 内容
     */
    private String content;
    // … 省略 set/get 办法
}

每个字段,本人看正文噢。

在服务端接管到发送音讯的申请,须要异步响应发送是否胜利。所以,创立 SendResponse 类,发送音讯响应后果的 Message。

代码如下:

// SendResponse.java
public class SendResponse implements Message {
    public static final String TYPE = “SEND_RESPONSE”;
    /**
     * 音讯编号
     */
    private String msgId;
    /**
     * 响应状态码
     */
    private Integer code;
    /**
     * 响应提醒
     */
    private String message;
    // … 省略 set/get 办法
}

重点看 msgId 字段:即音讯编号。客户端在发送音讯,通过应用 UUID 算法,生成全局惟一音讯编号(惟一 ID 的生成技术见:《从老手到专家:如何设计一套亿级音讯量的分布式 IM 零碎》的“_5、惟一 ID 的技术计划_”章节)。这样,服务端通过 SendResponse 音讯响应,通过 msgId 做映射。

在服务端接管到发送音讯的申请,须要转发音讯给对应的人。所以,创立 SendToUserRequest 类,发送音讯给一个用户的 Message。

代码如下:

// SendResponse.java
public class SendToUserRequest implements Message {
    public static final String TYPE = “SEND_TO_USER_REQUEST”;
    /**
     * 音讯编号
     */
    private String msgId;
    /**
     * 内容
     */
    private String content;
    // … 省略 set/get 办法
}

相比 SendToOneRequest 来说,少一个 toUser 字段。因为,咱们能够通过 WebSocket 连贯,曾经晓得发送给谁了。

4.7、音讯处理器

每个客户端发动的 Message 音讯类型,咱们会申明对应的 MessageHandler 音讯处理器。这个就相似在 SpringMVC 中,每个 API 接口对应一个 Controller 的 Method 办法。

所有的 MessageHandler 们,咱们都放在 cn.iocoder.springboot.lab25.springwebsocket.handler 包门路下。

4.7.1 MessageHandler

创立 MessageHandler 接口,音讯处理器接口。

代码如下:

// MessageHandler.java
public interface MessageHandler<T extends Message> {
    /**
     * 执行解决音讯
     *
     * @param session 会话
     * @param message 音讯
     */
    void execute(Session session, T message);
    /**
     * @return 音讯类型,即每个 Message 实现类上的 TYPE 动态字段
     */
    String getType();
}

解释一下:

  • 1)定义了泛型 <T>,须要是 Message 的实现类;
  • 2)定义的两个接口办法,本人看下正文哈。

4.7.2 AuthMessageHandler

创立 AuthMessageHandler 类,解决 AuthRequest 音讯。

代码如下:

// AuthMessageHandler.java
@Component
public class AuthMessageHandler implements MessageHandler<AuthRequest> {
    @Override
    public void execute(Session session, AuthRequest message) {
        // 如果未传递 accessToken
        if(StringUtils.isEmpty(message.getAccessToken())) {
            WebSocketUtil.send(session, AuthResponse.TYPE,
                    new AuthResponse().setCode(1).setMessage(“ 认证 accessToken 未传入 ”));
            return;
        }
        // 增加到 WebSocketUtil 中
        WebSocketUtil.addSession(session, message.getAccessToken()); // 思考到代码简化,咱们先间接应用 accessToken 作为 User
        // 判断是否认证胜利。这里,伪装间接胜利
        WebSocketUtil.send(session, AuthResponse.TYPE,newAuthResponse().setCode(0));
        // 告诉所有人,某个人退出了。这个是可选逻辑,仅仅是为了演示
        WebSocketUtil.broadcast(UserJoinNoticeRequest.TYPE,
                newUserJoinNoticeRequest().setNickname(message.getAccessToken())); // 思考到代码简化,咱们先间接应用 accessToken 作为 User
    }
    @Override
    public String getType() {
        return AuthRequest.TYPE;
    }
}

代码比较简单,跟着代码读读即可。

对于 WebSocketUtil 类,咱们在「5.8、WebSocketUtil」一节中再来具体看看。

4.7.3 SendToOneRequest

创立 SendToOneHandler 类,解决 SendToOneRequest 音讯。

代码如下:

// SendToOneRequest.java
@Component
public class SendToOneHandler implements MessageHandler<SendToOneRequest> {
    @Override
    public void execute(Session session, SendToOneRequest message) {
        // 这里,伪装间接胜利
        SendResponse sendResponse = newSendResponse().setMsgId(message.getMsgId()).setCode(0);
        WebSocketUtil.send(session, SendResponse.TYPE, sendResponse);
        // 创立转发的音讯
        SendToUserRequest sendToUserRequest = newSendToUserRequest().setMsgId(message.getMsgId())
                .setContent(message.getContent());
        // 播送发送
        WebSocketUtil.send(message.getToUser(), SendToUserRequest.TYPE, sendToUserRequest);
    }
    @Override
    public String getType() {
        return SendToOneRequest.TYPE;
    }
}

代码比较简单,跟着代码读读即可。

4.7.4 SendToAllHandler

创立 SendToAllHandler 类,解决 SendToAllRequest 音讯。

代码如下:

// SendToAllRequest.java
@Component
public class SendToAllHandler implements MessageHandler<SendToAllRequest> {
    @Override
    public void execute(Session session, SendToAllRequest message) {
        // 这里,伪装间接胜利
        SendResponse sendResponse = newSendResponse().setMsgId(message.getMsgId()).setCode(0);
        WebSocketUtil.send(session, SendResponse.TYPE, sendResponse);
        // 创立转发的音讯
        SendToUserRequest sendToUserRequest = newSendToUserRequest().setMsgId(message.getMsgId())
                .setContent(message.getContent());
        // 播送发送
        WebSocketUtil.broadcast(SendToUserRequest.TYPE, sendToUserRequest);
    }
    @Override
    public String getType() {
        return SendToAllRequest.TYPE;
    }
}

代码比较简单,跟着代码读读即可。

4.8、WebSocketUtil

代码在 cn.iocoder.springboot.lab25.springwebsocket.util 包门路下。

创立 WebSocketUtil 工具类,次要提供两方面的性能:

  • 1)Session 会话的治理;
  • 2)多种发送音讯的形式。

整体代码比较简单,本人瞅瞅哟。

代码在目录中的如下地位:

4.9、欠缺 WebsocketServerEndpoint

在本大节,咱们会批改 WebsocketServerEndpoint 的代码,欠缺其性能。

4.9.1 初始化 MessageHandler 汇合

实现 InitializingBean 接口,在 #afterPropertiesSet() 办法中,扫描所有 MessageHandler Bean,增加到 MessageHandler 汇合中。

代码如下:

// WebsocketServerEndpoint.java
/**
 * 音讯类型与 MessageHandler 的映射
 *
 * 留神,这里设置成动态变量。尽管说 WebsocketServerEndpoint 是单例,然而 Spring Boot 还是会为每个 WebSocket 创立一个 WebsocketServerEndpoint Bean。
 */
private static final Map<String, MessageHandler> HANDLERS = newHashMap<>();
@Autowired
private ApplicationContext applicationContext;
@Override
public void afterPropertiesSet() throws Exception {
    // 通过 ApplicationContext 取得所有 MessageHandler Bean
    applicationContext.getBeansOfType(MessageHandler.class).values() // 取得所有 MessageHandler Bean.forEach(messageHandler -> HANDLERS.put(messageHandler.getType(), messageHandler)); // 增加到 handlers 中
    logger.info(“afterPropertiesSet”, HANDLERS.size());
}

通过这样的形式,能够防止手动配置 MessageHandler 与音讯类型的映射。

4.9.2 onOpen

从新实现 #onOpen(Session session, EndpointConfig config) 办法,实现连贯时,应用 accessToken 参数进行用户认证。

代码如下:

// WebsocketServerEndpoint.java
@OnOpen
public void onOpen(Session session, EndpointConfig config) {
    logger.info(“onOpen”, session);
    // <1> 解析 accessToken
    List<String> accessTokenValues = session.getRequestParameterMap().get(“accessToken”);
    String accessToken = !CollectionUtils.isEmpty(accessTokenValues) ? accessTokenValues.get(0) : null;
    // <2> 创立 AuthRequest 音讯类型
    AuthRequest authRequest = newAuthRequest().setAccessToken(accessToken);
    // <3> 取得音讯处理器
    MessageHandler<AuthRequest> messageHandler = HANDLERS.get(AuthRequest.TYPE);
    if(messageHandler == null) {
        logger.error(“onOpen”);
        return;
    }
    messageHandler.execute(session, authRequest);
}

如代码所示:

  • <1> 处:解析 ws:// 地址上的 accessToken 的申请参。例如说:ws://127.0.0.1:8080?accessToken=999999;
  • <2> 处:创立 AuthRequest 音讯类型,并设置 accessToken 属性;
  • <3> 处:取得 AuthRequest 音讯类型对应的 MessageHandler 音讯处理器,而后调用 MessageHandler#execute(session, message) 办法,执行解决用户认证申请。

关上三个浏览器创立,别离设置服务地址如下:

  • 1)ws://127.0.0.1:8080/?accessToken= 芋艿;
  • 2)ws://127.0.0.1:8080/?accessToken= 番茄;
  • 3)ws://127.0.0.1:8080/?accessToken= 土豆。

而后,一一点击「开启连贯」按钮,进行 WebSocket 连贯。

最终成果如下图:

如上图所示:

  • 1)在红圈中,能够看到 AuthResponse 的音讯;
  • 2)在黄圈中,能够看到 UserJoinNoticeRequest 的音讯。

4.9.3 onMessage

从新实现 _#onMessage(Session session, String message)_ 办法,实现不同的音讯,转发给不同的 MessageHandler 音讯处理器。

代码如下:

// WebsocketServerEndpoint.java
@OnMessage
public void onMessage(Session session, String message) {
    logger.info(“onOpen”, session, message); // 生产环境下,请设置成 debug 级别
    try{
        // <1> 取得音讯类型
        JSONObject jsonMessage = JSON.parseObject(message);
        String messageType = jsonMessage.getString(“type”);
        // <2> 取得音讯处理器
        MessageHandler messageHandler = HANDLERS.get(messageType);
        if(messageHandler == null) {
            logger.error(“onMessage”, messageType);
            return;
        }
        // <3> 解析音讯
        Class<? extendsMessage> messageClass = this.getMessageClass(messageHandler);
        // <4> 解决音讯
        Message messageObj = JSON.parseObject(jsonMessage.getString(“body”), messageClass);
        messageHandler.execute(session, messageObj);
    } catch(Throwable throwable) {
        logger.info(“onMessage”, session, throwable);
    }
}

代码中:

  • <1> 处,取得音讯类型,从 “type” 字段中;
  • <2> 处,取得音讯类型对应的 MessageHandler 音讯处理器;
  • <3> 处,调用 #getMessageClass(MessageHandler handler) 办法,通过 MessageHandler 中,通过解析其类上的泛型,取得音讯类型对应的 Class 类。

代码如下:

// WebsocketServerEndpoint.java
private Class<? extends Message> getMessageClass(MessageHandler handler) {
    // 取得 Bean 对应的 Class 类名。因为有可能被 AOP 代理过。
    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(handler);
    // 取得接口的 Type 数组
    Type[] interfaces = targetClass.getGenericInterfaces();
    Class<?> superclass = targetClass.getSuperclass();
    while((Objects.isNull(interfaces) || 0== interfaces.length) && Objects.nonNull(superclass)) {// 此处,是以父类的接口为准
        interfaces = superclass.getGenericInterfaces();
        superclass = targetClass.getSuperclass();
    }
    if(Objects.nonNull(interfaces)) {
        // 遍历 interfaces 数组
        for(Type type : interfaces) {
            // 要求 type 是泛型参数
            if(type instanceof ParameterizedType) {
                ParameterizedType parameterizedType = (ParameterizedType) type;
                // 要求是 MessageHandler 接口
                if(Objects.equals(parameterizedType.getRawType(), MessageHandler.class)) {
                    Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
                    // 取首个元素
                    if(Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
                        return(Class<Message>) actualTypeArguments[0];
                    } else{
                        thrownewIllegalStateException(String.format(“ 类型 (%s) 取得不到音讯类型 ”, handler));
                    }
                }
            }
        }
    }
    throw new IllegalStateException(String.format(“ 类型 (%s) 取得不到音讯类型 ”, handler));
}

这是参考 rocketmq-spring 我的项目的 DefaultRocketMQListenerContainer#getMessageType() 办法,进行稍微批改。

如果大家对 Java 的泛型机制没有做过一点理解,可能稍微有点硬核。能够先临时跳过,晓得用意即可。

<4> 处,调用 MessageHandler#execute(session, message) 办法,执行解决申请。

另外: 这里减少了 try-catch 代码,防止整个执行的过程中,产生异样。如果在 onMessage 事件的解决中,产生异样,该音讯对应的 Session 会话会被主动敞开。显然,这个不合乎咱们的要求。例如说,在 MessageHandler 解决音讯的过程中,产生一些异样是无奈防止的。

持续基于上述创立的三个浏览器,咱们先点击「清空音讯」按钮,清空下音讯,清扫下上次测试展现进去的接管失去的 Message。当然,WebSocket 的连贯,不须要去断开。

在第一个浏览器中,别离发送两种聊天音讯。

一条 SendToOneRequest 私聊音讯:

{
    type: “SEND_TO_ONE_REQUEST”,
    body: {
        toUser: “ 番茄 ”,
        msgId: “eaef4a3c-35dd-46ee-b548-f9c4eb6396fe”,
        content: “ 我是一条单聊音讯 ”
    }
}

一条 SendToAllHandler 群聊音讯:

{
    type: “SEND_TO_ALL_REQUEST”,
    body: {
        msgId: “838e97e1-6ae9-40f9-99c3-f7127ed64747”,
        content: “ 我是一条群聊音讯 ”
    }
}

最终后果如下图:

如上图所示:

  • 1)在红圈中,能够看到一条 SendToUserRequest 的音讯,仅有第二个浏览器(番茄)收到;
  • 2)在黄圈中,能够看到三条 SendToUserRequest 的音讯,所有浏览器都收到。

4.9.4 onClose

从新实现 _#onClose(Session session, CloseReason closeReason)_ 办法,实现移除敞开的 Session。

代码如下:

// WebsocketServerEndpoint.java
@OnClose
public void onClose(Session session, CloseReason closeReason) {
    logger.info(“onClose”, session, closeReason);
    WebSocketUtil.removeSession(session);
}

4.9.5 onError

#onError(Session session, Throwable throwable) 办法,放弃不变。

代码如下:

// WebsocketServerEndpoint.java
@OnError
public void onError(Session session, Throwable throwable) {
    logger.info(“onClose”, session, throwable);
}

五、Spring WebSocket 实战入门

5.0、根底介绍

示例代码下载:

(因附件无奈上传到此处,请从同步链接处下载:http://www.52im.net/thread-3483-1-1.html)

认真一个捉摸,虎躯一震,还是提供一个 Spring WebSocket 疾速入门的示例。

在 上章「Tomcat WebSocket 实战入门」的 _lab-websocket-25-01_ 示例的根底上,咱们复制出 lab-websocket-25-02 我的项目,进行革新。

革新的代码目录内容是这样:

5.1、WebSocketUtil

因为 Tomcat WebSocket 应用的是 Session 作为会话,而 Spring WebSocket 应用的是 WebSocketSession 作为会话,导致咱们须要稍微批改下 WebSocketUtil 工具类。改变十分稍微,点击 WebSocketUtil.java 查看下,秒懂的噢。

次要有两点:

  • 1)将所有应用 Session 类的中央,调整成 WebSocketSession 类;
  • 2)将发送音讯,从 Session 批改成 WebSocketSession。

5.2、音讯处理器

将 _cn.iocoder.springboot.lab25.springwebsocket.handler_ 包门路下的音讯处理器们,应用到 Session 类的中央,调整成 WebSocketSession 类。

5.3、DemoWebSocketShakeInterceptor

在 _cn.iocoder.springboot.lab25.springwebsocket.websocket_ 包门路下,创立 DemoWebSocketShakeInterceptor 拦截器。因为 WebSocketSession 无奈取得 ws 地址上的申请参数,所以只好通过该拦截器,取得 accessToken 申请参数,设置到 attributes 中。

代码如下:

// DemoWebSocketShakeInterceptor.java
public class DemoWebSocketShakeInterceptor extends HttpSessionHandshakeInterceptor {
    @Override// 拦挡 Handshake 事件
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,WebSocketHandler wsHandler, Map<String, Object> attributes) throwsException {
        // 取得 accessToken
        if(request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest serverRequest = (ServletServerHttpRequest) request;
            attributes.put(“accessToken”, serverRequest.getServletRequest().getParameter(“accessToken”));
        }
        // 调用父办法,继续执行逻辑
        return super.beforeHandshake(request, response, wsHandler, attributes);
    }
}

5.4、DemoWebSocketHandler

在 _cn.iocoder.springboot.lab25.springwebsocket.websocket_ 包门路下,创立 DemoWebSocketHandler 处理器。该处理器参考「5.9、欠缺 WebsocketServerEndpoint」大节,编写它的代码。

DemoWebSocketHandler.java 代码位于如下目录处,具体内容就不贴出来了,自已去读一读:

代码极其类似,简略撸下即可。

5.5、WebSocketConfiguration

批改 WebSocketConfiguration 配置类,代码如下:

// WebSocketConfiguration.java
@Configuration
@EnableWebSocket// 开启 Spring WebSocket
public class WebSocketConfiguration implements WebSocketConfigurer {
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(this.webSocketHandler(), “/”) // 配置处理器
                .addInterceptors(newDemoWebSocketShakeInterceptor()) // 配置拦截器
                .setAllowedOrigins(“*”); // 解决跨域问题
    }
    @Bean
    public DemoWebSocketHandler webSocketHandler() {
        return new DemoWebSocketHandler();
    }
    @Bean
    public DemoWebSocketShakeInterceptor webSocketShakeInterceptor() {
        return new DemoWebSocketShakeInterceptor();
    }
}

解释一下:

  • 1)在类上,增加 @EnableWebSocket 注解,开启 Spring WebSocket 性能;
  • 2)实现 WebSocketConfigurer 接口,自定义 WebSocket 的配置(具体能够看看 #registerWebSocketHandlers(registry) 办法,配置 WebSocket 处理器、拦截器,以及容许跨域)。

至此,咱们曾经实现 Spring WebSocket 的示例。

前面,咱们执行 Application 来启动我的项目。具体的测试,这里就不反复了,能够本人应用 WebSocket 在线测试工具 来测试下。

七、写在最初

尽管说,WebSocket 协定曾经在支流的浏览器上,失去十分好的反对,然而总有一些“异类”,是不兼容的。所以就诞生了 SockJS、Socket.io 这类库。对于它们的介绍与应用,能够看看《SockJS 简略介绍》、《Web 端即时通讯技术的倒退与 WebSocket、Socket.io 的技术实际》文章。

理论场景下,咱们在应用 WebSocket 还是原生 Socket 也好,都须要思考“如何保障音讯肯定送达给用户?”

大家必定可能想到的是:如果用户不处于在线的时候,音讯长久化到 MySQL、MongoDB 等等数据库中。这个是正确,且是必须要做的。

咱们在一起思考下边界场景:客户端网络环境较差,特地是在挪动端场景下,呈现网络闪断,可能会呈现连贯理论曾经断开,而服务端认为客户端处于在线的状况。此时,服务端会将音讯发给客户端,那么音讯理论就发送到“空气”中,产生失落的状况。

要解决这种状况下的问题,须要引入客户端的 ACK 音讯机制。

目前,支流的有两种做法。

第一种: 基于每一条音讯编号 ACK

整体流程如下:

  • 1)无论客户端是否在线,服务端都先把接管到的音讯长久化到数据库中。如果客户端此时在线,服务端将残缺音讯推送给客户端;
  • 2)客户端在接管到音讯之后,发送 ACK 音讯编号给服务端,告知曾经收到该音讯。服务端在收到 ACK 音讯编号的时候,标记该音讯曾经发送胜利;
  • 3)服务端定时轮询,在线的客户端,是否有超过 N 秒未 ACK 的音讯。如果有,则从新发送音讯给对应的客户端。

这种计划,因为客户端逐条 ACK 音讯编号,所以会导致客户端和服务端交互次数过多。当然,客户端能够异步批量 ACK 多条音讯,从而缩小次数。

不过因为服务端依然须要定时轮询,也会导致服务端压力较大。所以,这种计划根本曾经不采纳了。

第二种: 基于滑动窗口 ACK

整体流程如下:

  • 1)无论客户端是否在线,服务端都先把接管到的音讯长久化到数据库中。如果客户端此时在线,服务端将音讯编号推送给客户端;
  • 2)客户端在接管到音讯编号之后,和本地的音讯编号进行比对。如果比本地的小,阐明该音讯曾经收到,疏忽不解决;如果比本地的大,应用本地的音讯编号,向服务端拉取大于本地的音讯编号的音讯列表,即增量音讯列表。拉取实现后,更新音讯列表中最大的音讯编号为新的本地的音讯编号;
  • 3)服务端在收到客户端拉取增量的音讯列表时,将申请的编号记录到数据库中,用于晓得客户端此时本地的最新消息编号;
  • 4)思考到服务端将音讯编号推送给客户端,也会存在失落的状况,所以客户端会每 N 秒定时向服务端拉取大于本地的音讯编号的音讯列表。

这种形式,在业务被称为推拉联合的计划,在分布式音讯队列、配置核心、注册核心实现实时的数据同步,常常被采纳。

并且,采纳这种计划的状况下,客户端和服务端不肯定须要应用长连贯,也能够应用长轮询所代替。

做法比方,客户端发送带有音讯版本号的 HTTP 申请到服务端:

  • 1)如果服务端已有比客户端新的音讯编号,则间接返回增量的音讯列表;
  • 2)如果服务端没有比客户端新的音讯编号,则 HOLD 住申请,直到有新的音讯列表能够返回,或者 HTTP 申请超时;
  • 3)客户端在收到 HTTP 申请超时时,立刻又从新发动带有音讯版本号的 HTTP 申请到服务端。如此重复循环,通过音讯编号作为增量标识,达到实时获取音讯的目标。

如果大家对音讯牢靠投递这块感兴趣,能够看看上面这几篇:

  • 《零根底 IM 开发入门 (三):什么是 IM 零碎的可靠性?》
  • 《从客户端的角度来谈谈挪动端 IM 的音讯可靠性和送达机制》
  • 《IM 音讯送达保障机制实现 (一):保障在线实时音讯的牢靠投递》
  • 《IM 音讯送达保障机制实现 (二):保障离线音讯的牢靠投递》
  • 《IM 群聊音讯如此简单,如何保障不丢不重?》
  • 《IM 开发干货分享:如何优雅的实现大量离线音讯的牢靠投递》
  • 《一套亿级用户的 IM 架构技术干货 (下篇):可靠性、有序性、弱网优化等》

毕竟,本篇这里写的有点简略哈 ~

最初: 如果你想零碎的学习 IM 开发方面方面的常识,举荐详读:《新手入门一篇就够:从零开发挪动端 IM》。如果你自认为曾经有点小牛 x 了,能够看看生产环境下的大用户量 IM 零碎架构设计方面的常识:《从老手到专家:如何设计一套亿级音讯量的分布式 IM 零碎》。

限于篇幅,这里就不再持续开展了。

附录:更多 IM 开发入手实际文章

《自已开发 IM 有那么难吗?手把手教你自撸一个 Andriod 版繁难 IM (有源码)》
《一种 Android 端 IM 智能心跳算法的设计与实现探讨(含样例代码)》
《手把手教你用 Netty 实现网络通信程序的心跳机制、断线重连机制》
《[轻量级即时通讯框架 MobileIMSDK 的 iOS 源码(开源版)[ 附件下载]](http://www.52im.net/thread-35…》
《[开源 IM 工程“蘑菇街 TeamTalk”2015 年 5 月前未删减版残缺代码 [ 附件下载]](http://www.52im.net/thread-77…》
《[NIO 框架入门 ( 一):服务端基于 Netty4 的 UDP 双向通信 Demo 演示 [附件下载]](http://www.52im.net/thread-36…》
《[NIO 框架入门 ( 二):服务端基于 MINA2 的 UDP 双向通信 Demo 演示 [附件下载]](http://www.52im.net/thread-37…》
《[NIO 框架入门 ( 三):iOS 与 MINA2、Netty4 的跨平台 UDP 双向通信实战 [附件下载]](http://www.52im.net/thread-37…》
《[NIO 框架入门 ( 四):Android 与 MINA2、Netty4 的跨平台 UDP 双向通信实战 [附件下载]](http://www.52im.net/thread-38…》
《[一个 WebSocket 实时聊天室 Demo:基于 node.js+socket.io [ 附件下载]](http://www.52im.net/thread-51…》
《适宜老手:从零开发一个 IM 服务端(基于 Netty,有残缺源码)》
《拿起键盘就是干:跟我一起徒手开发一套分布式 IM 零碎》
《正确理解 IM 长连贯的心跳及重连机制,并入手实现(有残缺 IM 源码)》
《适宜老手:手把手教你用 Go 疾速搭建高性能、可扩大的 IM 零碎 (有源码)》
《跟着源码一起学:手把手教你用 WebSocket 打造 Web 端 IM 聊天》

本文已同步公布于“即时通讯技术圈”公众号。

▲ 本文在公众号上的链接是:点此进入。同步公布链接是:http://www.52im.net/thread-3483-1-1.html

退出移动版