关于java:WebSocket-分布式集群怎么搞

8次阅读

共计 9801 个字符,预计需要花费 25 分钟才能阅读完成。

作者:邱城铨 \
起源:https://segmentfault.com/a/11…

问题起因

最近做我的项目时遇到了须要多用户之间通信的问题,波及到了 WebSocket 握手申请,以及集群中 WebSocket Session 共享的问题。

期间我通过了几天的钻研,总结出了几个实现分布式 WebSocket 集群的方法,从 zuul 到 spring cloud gateway 的不同尝试,总结出了这篇文章,心愿能帮忙到某些人,并且能一起分享这方面的想法与钻研。

以下是我的场景形容

  • 资源 :4 台服务器。其中只有一台服务器具备 ssl 认证域名,一台 redis+mysql 服务器,两台应用服务器(集群)
  • 利用公布限度条件 :因为场景须要,利用场合须要 ssl 认证的域名能力公布。因而 ssl 认证的域名服务器用来当 api 网关,负责 https 申请与 wss(平安认证的 ws)连贯。俗称 https 卸载,用户申请 https 域名服务器,但实在拜访到的是 http+ip 地址的模式。只有网关配置高,能 handle 多个利用
  • 需要 :用户登录利用,须要与服务器建设 wss 连贯,不同角色之间能够单发音讯,也能够群发音讯
  • 集群中的应用服务类型 :每个集群实例都负责 http 无状态申请服务与 ws 长连贯服务

零碎架构图

在我的实现里,每个应用服务器都负责 http and ws 申请,其实也能够将 ws 申请建设的聊天模型独自成立为一个模块。从分布式的角度来看,这两种实现类型差不多,但从实现方便性来说,一个应用服务 http+ws 申请的形式更为不便。下文会有解释

本文波及的技术栈

  • Eureka 服务发现与注册
  • Redis Session 共享
  • Redis 音讯订阅
  • Spring Boot
  • Zuul 网关
  • Spring Cloud Gateway 网关
  • Spring WebSocket 解决长连贯
  • Ribbon 负载平衡
  • Netty 多协定 NIO 网络通信框架
  • Consistent Hash 一致性哈希算法

置信能走到这一步的人都理解过我下面列举的技术栈了,如果还没有,能够先去网上找找入门教程理解一下。上面的内容都与上述技术相干,题主默认大家都理解过了 …

技术可行性剖析

上面我将形容 session 个性,以及依据这些个性列举出 n 个解决分布式架构中解决 ws 申请的集群计划

WebSocketSession 与 HttpSession

在 Spring 所集成的 WebSocket 外面,每个 ws 连贯都有一个对应的 session:WebSocketSession,在 Spring WebSocket 中,咱们建设 ws 连贯之后能够通过相似这样的形式进行与客户端的通信:

protected void handleTextMessage(WebSocketSession session, TextMessage message) {System.out.println("服务器接管到的音讯:"+ message);
   //send message to client
   session.sendMessage(new TextMessage("message"));
}

那么问题来了:ws 的 session 无奈序列化到 redis,因而在集群中,咱们无奈将所有 WebSocketSession 都缓存到 redis 进行 session 共享。每台服务器都有各自的 session。于此相同的是 HttpSession,redis 能够反对 httpsession 共享,然而目前没有 websocket session 共享的计划,因而走 redis websocket session 共享这条路是行不通的。

有的人可能会想:我可不可以将 sessin 要害信息缓存到 redis,集群中的服务器从 redis 拿取 session 要害信息而后从新构建 websocket session… 我只想说这种办法如果有人能试出来,请通知我一声 …

以上便是 websocket session 与 http session 共享的区别,总的来说就是 http session 共享曾经有解决方案了,而且很简略,只有引入相干依赖:spring-session-data-redisspring-boot-starter-redis,大家能够从网上找个 demo 玩一下就晓得怎么做了。而 websocket session 共享的计划因为 websocket 底层实现的形式,咱们无奈做到真正的 websocket session 共享。

解决方案的演变

Netty 与 Spring WebSocket

刚开始的时候,我尝试着用 netty 实现了 websocket 服务端的搭建。在 netty 外面,并没有 websocket session 这样的概念,与其相似的是 channel,每一个客户端连贯都代表一个 channel。前端的 ws 申请通过 netty 监听的端口,走 websocket 协定进行 ws 握手连贯之后,通过一些列的 handler(责链模式)进行音讯解决。与 websocket session 相似地,服务端在连贯建设后有一个 channel,咱们能够通过 channel 进行与客户端的通信

/**
* TODO 依据服务器传进来的 id,调配到不同的 group
*/
private static final ChannelGroup GROUP = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);

@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
   //retain 减少援用计数,避免接下来的调用援用生效
   System.out.println("服务器接管到来自" + ctx.channel().id() + "的音讯:" + msg.text());
   // 将音讯发送给 group 外面的所有 channel,也就是发送音讯给客户端
   GROUP.writeAndFlush(msg.retain());
}

那么,服务端用 netty 还是用 spring websocket?以下我将从几个方面列举这两种实现形式的优缺点

应用 netty 实现 websocket

玩过 netty 的人都晓得 netty 是的线程模型是 nio 模型,并发量十分高,spring5 之前的网络线程模型是 servlet 实现的,而 servlet 不是 nio 模型,所以在 spring5 之后,spring 的底层网络实现采纳了 netty。如果咱们独自应用 netty 来开发 websocket 服务端,速度快是相对的,然而可能会遇到下列问题:

  1. 与零碎的其余利用集成不不便,在 rpc 调用的时候,无奈享受 springcloud 里 feign 服务调用的便利性
  2. 业务逻辑可能要反复实现
  3. 应用 netty 可能须要反复造轮子
  4. 怎么连贯上服务注册核心,也是一件麻烦的事件
  5. restful 服务与 ws 服务须要离开实现,如果在 netty 上实现 restful 服务,有多麻烦可想而知,用 spring 一站式 restful 开发置信很多人都习惯了。

应用 spring websocket 实现 ws 服务

spring websocket 曾经被 springboot 很好地集成了,所以在 springboot 上开发 ws 服务十分不便,做法非常简单。

Spring Boot 根底就不介绍了,举荐下这个实战教程:
https://github.com/javastacks…

第一步:增加依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

第二步:增加配置类

@Configuration
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(myHandler(), "/")
        .setAllowedOrigins("*");
}

@Bean
 public WebSocketHandler myHandler() {return new MessageHandler();
 }
}

第三步:实现音讯监听类

@Component
@SuppressWarnings("unchecked")
public class MessageHandler extends TextWebSocketHandler {private List<WebSocketSession> clients = new ArrayList<>();

   @Override
   public void afterConnectionEstablished(WebSocketSession session) {clients.add(session);
       System.out.println("uri :" + session.getUri());
       System.out.println("连贯建设:" + session.getId());
       System.out.println("current seesion:" + clients.size());
   }

   @Override
   public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {clients.remove(session);
       System.out.println("断开连接:" + session.getId());
   }

   @Override
   protected void handleTextMessage(WebSocketSession session, TextMessage message) {String payload = message.getPayload();
       Map<String, String> map = JSONObject.parseObject(payload, HashMap.class);
       System.out.println("承受到的数据" + map);
       clients.forEach(s -> {
           try {System.out.println("发送音讯给:" + session.getId());
               s.sendMessage(new TextMessage("服务器返回收到的信息," + payload));
           } catch (Exception e) {e.printStackTrace();
           }
       });
   }
}

从这个 demo 中,应用 spring websocket 实现 ws 服务的便利性大家可想而知了。为了能更好地向 spring cloud 大家族看齐,我最终采纳了 spring websocket 实现 ws 服务。

因而我的应用服务架构是这样子的:一个利用既负责 restful 服务,也负责 ws 服务。没有将 ws 服务模块拆分是因为拆分进来要应用 feign 来进行服务调用。第一自己比拟懈怠,第二拆分与不拆分相差在多了一层服务间的 io 调用,所以就没有这么做了。

从 zuul 技术转型到 spring cloud gateway

要实现 websocket 集群,咱们必不可免地得从 zuul 转型到 spring cloud gateway。起因如下:

zuul1.0 版本不反对 websocket 转发,zuul 2.0 开始反对 websocket,zuul2.0 几个月前开源了,然而 2.0 版本没有被 spring boot 集成,而且文档不健全。因而转型是必须的,同时转型也很容易实现。

在 gateway 中,为了实现 ssl 认证和动静路由负载平衡,yml 文件中以下的某些配置是必须的,在这里提前防止大家采坑。

Spring Boot 根底就不介绍了,举荐下这个实战教程:
https://github.com/javastacks…

server:
  port: 443
  ssl:
    enabled: true
    key-store: classpath:xxx.jks
    key-store-password: xxxx
    key-store-type: JKS
    key-alias: alias
spring:
  application:
    name: api-gateway
  cloud:
    gateway:
      httpclient:
        ssl:
          handshake-timeout-millis: 10000
          close-notify-flush-timeout-millis: 3000
          close-notify-read-timeout-millis: 0
          useInsecureTrustManager: true
      discovery:
        locator:
          enabled: true
          lower-case-service-id: true
      routes:
      - id: dc
        uri: lb://dc
        predicates:
        - Path=/dc/**
      - id: wecheck
        uri: lb://wecheck
        predicates:
        - Path=/wecheck/**

如果要欢快地玩 https 卸载,咱们还须要配置一个 filter,否则申请网关时会呈现谬误 not an SSL/TLS record

@Component
public class HttpsToHttpFilter implements GlobalFilter, Ordered {
  private static final int HTTPS_TO_HTTP_FILTER_ORDER = 10099;
  @Override
  public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {URI originalUri = exchange.getRequest().getURI();
      ServerHttpRequest request = exchange.getRequest();
      ServerHttpRequest.Builder mutate = request.mutate();
      String forwardedUri = request.getURI().toString();
      if (forwardedUri != null && forwardedUri.startsWith("https")) {
          try {
              URI mutatedUri = new URI("http",
                      originalUri.getUserInfo(),
                      originalUri.getHost(),
                      originalUri.getPort(),
                      originalUri.getPath(),
                      originalUri.getQuery(),
                      originalUri.getFragment());
              mutate.uri(mutatedUri);
          } catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);
          }
      }
      ServerHttpRequest build = mutate.build();
      ServerWebExchange webExchange = exchange.mutate().request(build).build();
      return chain.filter(webExchange);
  }

  @Override
  public int getOrder() {return HTTPS_TO_HTTP_FILTER_ORDER;}
}

这样子咱们就能够应用 gateway 来卸载 https 申请了,到目前为止,咱们的根本框架曾经搭建结束,网关既能够转发 https 申请,也能够转发 wss 申请。接下来就是用户多对多之间 session 互通的通信解决方案了。接下来,我将依据计划的优雅性,从最不优雅的计划开始讲起。

session 播送

这是最简略的 websocket 集群通信解决方案。场景如下:

老师 A 想要群发音讯给他的学生们

  • 老师的音讯申请发给网关,内容蕴含 {我是老师 A,我想把 xxx 音讯发送我的学生们}
  • 网关接管到音讯,获取集群所有 ip 地址,一一调用老师的申请
  • 集群中的每台服务器获取申请,依据老师 A 的信息查找本地有没有与学生关联的 session,有则调用 sendMessage 办法,没有则疏忽申请

session 播送实现很简略,然而有一个致命缺点:计算力节约景象,当服务器没有音讯接收者 session 的时候,相当于节约了一次循环遍历的计算力,该计划在并发需要不高的状况下能够优先思考,实现很容易。

spring cloud 中获取服务集群中每台服务器信息的办法如下

@Resource
private EurekaClient eurekaClient;

Application app = eurekaClient.getApplication("service-name");
//instanceInfo 包含了一台服务器 ip,port 等音讯
InstanceInfo instanceInfo = app.getInstances().get(0);
System.out.println("ip address:" + instanceInfo.getIPAddr());

服务器须要保护关系映射表,将用户的 id 与 session 做映射,session 建设时在映射表中增加映射关系,session 断开后要删除映射表内关联关系

一致性哈希算法实现(本文的要点)

这种办法是自己认为最优雅的实现计划,了解这种计划须要肯定的工夫,如果你急躁看上来,置信你肯定会有所播种。再强调一次,不理解一致性哈希算法的同学请先看这里,现先假如哈希环是顺时针查找的。

首先,想要将一致性哈希算法的思维利用到咱们的 websocket 集群,咱们须要解决以下新问题:

  • 集群节点 DOWN,会影响到哈希环映射到状态是 DOWN 的节点。
  • 集群节点 UP,会影响到旧 key 映射不到对应的节点。
  • 哈希环读写共享。

在集群中,总会呈现服务 UP/DOWN 的问题。

针对节点 DOWN 的问题剖析如下:

一个服务器 DOWN 的时候,其领有的 websocket session 会主动敞开连贯,并且前端会收到告诉。此时会影响到哈希环的映射谬误。咱们只须要当监听到服务器 DOWN 的时候,删除哈希环下面对应的理论结点和虚结点,防止让网关转发到状态是 DOWN 的服务器上。

实现办法:在 eureka 治理核心监听集群服务 DOWN 事件,并及时更新哈希环。

针对节点 UP 的问题剖析如下:

现假如集群中有服务 CacheB 上线了,该服务器的 ip 地址刚好被映射到 key1 和 cacheA 之间。那么 key1 对应的用户每次要发消息时都跑去 CacheB 发送音讯,后果显著是发送不了音讯,因为 CacheB 没有 key1 对应的 session。

此时咱们有两种解决方案。

计划 A 简略,动作大:

eureka 监听到节点 UP 事件之后,依据现有集群信息,更新哈希环。并且断开所有 session 连贯,让客户端从新连贯,此时客户端会连贯到更新后的哈希环节点,以此防止音讯无奈送达的状况。

计划 B 简单,动作小:

咱们先看看没有虚构节点的状况,假如 CacheC 和 CacheA 之间上线了服务器 CacheB。所有映射在 CacheC 到 CacheB 的用户发消息时都会去 CacheB 外面找 session 发消息。也就是说 CacheB 一但上线,便会影响到 CacheC 到 CacheB 之间的用户发送音讯。所以咱们只须要将 CacheA 断开 CacheC 到 CacheB 的用户所对应的 session,让客户端重连。

接下来是有虚构节点的状况,假如浅色的节点是虚构节点。咱们用长括号来代表某段区域映射的后果属于某个 Cache。首先是 C 节点未上线的状况。图大家应该都懂吧,所有 B 的虚构节点都会指向实在的 B 节点,所以所有 B 节点逆时针那一部分都会映射到 B(因为咱们规定哈希环顺时针查找)。

接下来是 C 节点上线的状况,能够看到某些区域被 C 霸占了。

由以上状况咱们能够晓得:节点上线,会有许多对应虚构节点也同时上线,因而咱们须要将多段范畴 key 对应的 session 断开连接(上图红色的局部)。具体算法有点简单,实现的形式因人而异,大家能够尝试一下本人实现算法。

哈希环应该放在哪里?

  • gateway 本地创立并保护哈希环。当 ws 申请进来的时候,本地获取哈希环并获取映射服务器信息,转发 ws 申请。这种办法看上去不错,但实际上是不太可取的,回忆一下下面服务器 DOWN 的时候只能通过 eureka 监听,那么 eureka 监听到 DOWN 事件之后,须要通过 io 来告诉 gateway 删除对应节点吗?显然太麻烦了,将 eureka 的职责扩散到 gateway,不倡议这么做。
  • eureka 创立,并放到 redis 共享读写。这个计划可行,当 eureka 监听到服务 DOWN 的时候,批改哈希环并推送到 redis 上。为了申请响应工夫尽量地短,咱们不能够让 gateway 每次转发 ws 申请的时候都去 redis 取一次哈希环。哈希环批改的概率确实很低,gateway 只须要利用 redis 的音讯订阅模式,订阅哈希环批改事件便能够解决此问题。

至此咱们的 spring websocket 集群曾经搭建的差不多了,最重要的中央还是一致性哈希算法。当初有最初一个技术瓶颈,网关如何依据 ws 申请转发到指定的集群服务器上?

答案在负载平衡。spring cloud gateway 或 zuul 都默认集成了 ribbon 作为负载平衡,咱们只须要依据建设 ws 申请时客户端发来的 user id,重写 ribbon 负载平衡算法,依据 user id 进行 hash,并在哈希环上寻找 ip,并将 ws 申请转发到该 ip 便完事了。流程如下图所示:

接下来用户沟通的时候,只须要依据 id 进行 hash,在哈希环上获取对应 ip,便能够晓得与该用户建设 ws 连贯时的 session 存在哪台服务器上了!

spring cloud Finchley.RELEASE 版本中 ribbon 未欠缺的中央

题主在实际操作的时候发现了 ribbon 两个不欠缺的中央 ……

  • 依据网上找的办法,继承 AbstractLoadBalancerRule 重写负载平衡策略之后,多个不同利用的申请变得凌乱。如果 eureka 上有两个 service A 和 B,重写负载平衡策略之后,申请 A 或 B 的服务,最终只会映射到其中一个服务上。十分奇怪!可能 spring cloud gateway 官网须要给出一个正确的重写负载平衡策略的 demo。
  • 一致性哈希算法须要一个 key,相似 user id,依据 key 进行 hash 之后在哈希环上搜寻并返回 ip。然而 ribbon 没有欠缺 choose 函数的 key 参数,间接写死了 default!

难道这样子咱们就没有方法了吗?其实还有一个可行并且临时可代替的方法!

如下图所示,客户端发送一个一般的 http 申请(蕴含 id 参数)给网关,网关依据 id 进行 hash,在哈希环中寻找 ip 地址,将 ip 地址返回给客户端,客户端再依据该 ip 地址进行 ws 申请。

因为 ribbon 未欠缺 key 的解决,咱们临时无奈在 ribbon 上实现一致性哈希算法。只能间接地通过客户端发动两次申请(一次 http,一次 ws)的形式来实现一致性哈希。心愿不久之后 ribbon 能更新这个缺点!让咱们的 websocket 集群实现得更优雅一点。

后记

以上便是我这几天摸索的后果。期间遇到了许多问题,并逐个解决难题,列出两个 websocket 集群解决方案。第一个是 session 播送,第二个是一致性哈希。

这两种计划针对不同场景各有优缺点,本文并未用到 ActiveMQ,Karfa 等音讯队列实现音讯推送,只是想通过本人的想法,不依附音讯队列来简略地实现多用户之间的长连贯通信。心愿能为大家提供一条不同于寻常的思路。

近期热文举荐:

1.1,000+ 道 Java 面试题及答案整顿 (2021 最新版)

2. 别在再满屏的 if/ else 了,试试策略模式,真香!!

3. 卧槽!Java 中的 xx ≠ null 是什么新语法?

4.Spring Boot 2.6 正式公布,一大波新个性。。

5.《Java 开发手册(嵩山版)》最新公布,速速下载!

感觉不错,别忘了顺手点赞 + 转发哦!

正文完
 0