作者:邱城铨\
起源:https://segmentfault.com/a/11...

问题起因

最近做我的项目时遇到了须要多用户之间通信的问题,波及到了WebSocket握手申请,以及集群中WebSocket Session共享的问题。

期间我通过了几天的钻研,总结出了几个实现分布式WebSocket集群的方法,从zuul到spring cloud gateway的不同尝试,总结出了这篇文章,心愿能帮忙到某些人,并且能一起分享这方面的想法与钻研。

以下是我的场景形容

  • 资源:4台服务器。其中只有一台服务器具备ssl认证域名,一台redis+mysql服务器,两台应用服务器(集群)
  • 利用公布限度条件:因为场景须要,利用场合须要ssl认证的域名能力公布。因而ssl认证的域名服务器用来当api网关,负责https申请与wss(平安认证的ws)连贯。俗称https卸载,用户申请https域名服务器,但实在拜访到的是http+ip地址的模式。只有网关配置高,能handle多个利用
  • 需要:用户登录利用,须要与服务器建设wss连贯,不同角色之间能够单发音讯,也能够群发音讯
  • 集群中的应用服务类型:每个集群实例都负责http无状态申请服务与ws长连贯服务

零碎架构图

在我的实现里,每个应用服务器都负责http and ws申请,其实也能够将ws申请建设的聊天模型独自成立为一个模块。从分布式的角度来看,这两种实现类型差不多,但从实现方便性来说,一个应用服务http+ws申请的形式更为不便。下文会有解释

本文波及的技术栈

  • Eureka 服务发现与注册
  • Redis Session共享
  • Redis 音讯订阅
  • Spring Boot
  • Zuul 网关
  • Spring Cloud Gateway 网关
  • Spring WebSocket 解决长连贯
  • Ribbon 负载平衡
  • Netty 多协定NIO网络通信框架
  • Consistent Hash 一致性哈希算法

置信能走到这一步的人都理解过我下面列举的技术栈了,如果还没有,能够先去网上找找入门教程理解一下。上面的内容都与上述技术相干,题主默认大家都理解过了...

技术可行性剖析

上面我将形容session个性,以及依据这些个性列举出n个解决分布式架构中解决ws申请的集群计划

WebSocketSession与HttpSession

在Spring所集成的WebSocket外面,每个ws连贯都有一个对应的session:WebSocketSession,在Spring WebSocket中,咱们建设ws连贯之后能够通过相似这样的形式进行与客户端的通信:

protected void handleTextMessage(WebSocketSession session, TextMessage message) {   System.out.println("服务器接管到的音讯: "+ message );   //send message to client   session.sendMessage(new TextMessage("message"));}

那么问题来了:ws的session无奈序列化到redis,因而在集群中,咱们无奈将所有WebSocketSession都缓存到redis进行session共享。每台服务器都有各自的session。于此相同的是HttpSession,redis能够反对httpsession共享,然而目前没有websocket session共享的计划,因而走redis websocket session共享这条路是行不通的。

有的人可能会想:我可不可以将sessin要害信息缓存到redis,集群中的服务器从redis拿取session要害信息而后从新构建websocket session...我只想说这种办法如果有人能试出来,请通知我一声...

以上便是websocket session与http session共享的区别,总的来说就是http session共享曾经有解决方案了,而且很简略,只有引入相干依赖:spring-session-data-redisspring-boot-starter-redis,大家能够从网上找个demo玩一下就晓得怎么做了。而websocket session共享的计划因为websocket底层实现的形式,咱们无奈做到真正的websocket session共享。

解决方案的演变

Netty与Spring WebSocket

刚开始的时候,我尝试着用netty实现了websocket服务端的搭建。在netty外面,并没有websocket session这样的概念,与其相似的是channel,每一个客户端连贯都代表一个channel。前端的ws申请通过netty监听的端口,走websocket协定进行ws握手连贯之后,通过一些列的handler(责链模式)进行音讯解决。与websocket session相似地,服务端在连贯建设后有一个channel,咱们能够通过channel进行与客户端的通信

/*** TODO 依据服务器传进来的id,调配到不同的group*/private static final ChannelGroup GROUP = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {   //retain减少援用计数,避免接下来的调用援用生效   System.out.println("服务器接管到来自 " + ctx.channel().id() + " 的音讯: " + msg.text());   //将音讯发送给group外面的所有channel,也就是发送音讯给客户端   GROUP.writeAndFlush(msg.retain());}

那么,服务端用netty还是用spring websocket?以下我将从几个方面列举这两种实现形式的优缺点

应用netty实现websocket

玩过netty的人都晓得netty是的线程模型是nio模型,并发量十分高,spring5之前的网络线程模型是servlet实现的,而servlet不是nio模型,所以在spring5之后,spring的底层网络实现采纳了netty。如果咱们独自应用netty来开发websocket服务端,速度快是相对的,然而可能会遇到下列问题:

  1. 与零碎的其余利用集成不不便,在rpc调用的时候,无奈享受springcloud里feign服务调用的便利性
  2. 业务逻辑可能要反复实现
  3. 应用netty可能须要反复造轮子
  4. 怎么连贯上服务注册核心,也是一件麻烦的事件
  5. restful服务与ws服务须要离开实现,如果在netty上实现restful服务,有多麻烦可想而知,用spring一站式restful开发置信很多人都习惯了。

应用spring websocket实现ws服务

spring websocket曾经被springboot很好地集成了,所以在springboot上开发ws服务十分不便,做法非常简单。

Spring Boot 根底就不介绍了,举荐下这个实战教程:
https://github.com/javastacks...

第一步:增加依赖

<dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-websocket</artifactId></dependency>

第二步:增加配置类

@Configurationpublic class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {    registry.addHandler(myHandler(), "/")        .setAllowedOrigins("*");}@Bean public WebSocketHandler myHandler() {     return new MessageHandler(); }}

第三步:实现音讯监听类

@Component@SuppressWarnings("unchecked")public class MessageHandler extends TextWebSocketHandler {   private List<WebSocketSession> clients = new ArrayList<>();   @Override   public void afterConnectionEstablished(WebSocketSession session) {       clients.add(session);       System.out.println("uri :" + session.getUri());       System.out.println("连贯建设: " + session.getId());       System.out.println("current seesion: " + clients.size());   }   @Override   public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {       clients.remove(session);       System.out.println("断开连接: " + session.getId());   }   @Override   protected void handleTextMessage(WebSocketSession session, TextMessage message) {       String payload = message.getPayload();       Map<String, String> map = JSONObject.parseObject(payload, HashMap.class);       System.out.println("承受到的数据" + map);       clients.forEach(s -> {           try {               System.out.println("发送音讯给: " + session.getId());               s.sendMessage(new TextMessage("服务器返回收到的信息," + payload));           } catch (Exception e) {               e.printStackTrace();           }       });   }}

从这个demo中,应用spring websocket实现ws服务的便利性大家可想而知了。为了能更好地向spring cloud大家族看齐,我最终采纳了spring websocket实现ws服务。

因而我的应用服务架构是这样子的:一个利用既负责restful服务,也负责ws服务。没有将ws服务模块拆分是因为拆分进来要应用feign来进行服务调用。第一自己比拟懈怠,第二拆分与不拆分相差在多了一层服务间的io调用,所以就没有这么做了。

从zuul技术转型到spring cloud gateway

要实现websocket集群,咱们必不可免地得从zuul转型到spring cloud gateway。起因如下:

zuul1.0版本不反对websocket转发,zuul 2.0开始反对websocket,zuul2.0几个月前开源了,然而2.0版本没有被spring boot集成,而且文档不健全。因而转型是必须的,同时转型也很容易实现。

在gateway中,为了实现ssl认证和动静路由负载平衡,yml文件中以下的某些配置是必须的,在这里提前防止大家采坑。

Spring Boot 根底就不介绍了,举荐下这个实战教程:
https://github.com/javastacks...

server:  port: 443  ssl:    enabled: true    key-store: classpath:xxx.jks    key-store-password: xxxx    key-store-type: JKS    key-alias: aliasspring:  application:    name: api-gateway  cloud:    gateway:      httpclient:        ssl:          handshake-timeout-millis: 10000          close-notify-flush-timeout-millis: 3000          close-notify-read-timeout-millis: 0          useInsecureTrustManager: true      discovery:        locator:          enabled: true          lower-case-service-id: true      routes:      - id: dc        uri: lb://dc        predicates:        - Path=/dc/**      - id: wecheck        uri: lb://wecheck        predicates:        - Path=/wecheck/**

如果要欢快地玩https卸载,咱们还须要配置一个filter,否则申请网关时会呈现谬误not an SSL/TLS record

@Componentpublic class HttpsToHttpFilter implements GlobalFilter, Ordered {  private static final int HTTPS_TO_HTTP_FILTER_ORDER = 10099;  @Override  public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {      URI originalUri = exchange.getRequest().getURI();      ServerHttpRequest request = exchange.getRequest();      ServerHttpRequest.Builder mutate = request.mutate();      String forwardedUri = request.getURI().toString();      if (forwardedUri != null && forwardedUri.startsWith("https")) {          try {              URI mutatedUri = new URI("http",                      originalUri.getUserInfo(),                      originalUri.getHost(),                      originalUri.getPort(),                      originalUri.getPath(),                      originalUri.getQuery(),                      originalUri.getFragment());              mutate.uri(mutatedUri);          } catch (Exception e) {              throw new IllegalStateException(e.getMessage(), e);          }      }      ServerHttpRequest build = mutate.build();      ServerWebExchange webExchange = exchange.mutate().request(build).build();      return chain.filter(webExchange);  }  @Override  public int getOrder() {      return HTTPS_TO_HTTP_FILTER_ORDER;  }}

这样子咱们就能够应用gateway来卸载https申请了,到目前为止,咱们的根本框架曾经搭建结束,网关既能够转发https申请,也能够转发wss申请。接下来就是用户多对多之间session互通的通信解决方案了。接下来,我将依据计划的优雅性,从最不优雅的计划开始讲起。

session播送

这是最简略的websocket集群通信解决方案。场景如下:

老师A想要群发音讯给他的学生们

  • 老师的音讯申请发给网关,内容蕴含{我是老师A,我想把xxx音讯发送我的学生们}
  • 网关接管到音讯,获取集群所有ip地址,一一调用老师的申请
  • 集群中的每台服务器获取申请,依据老师A的信息查找本地有没有与学生关联的session,有则调用sendMessage办法,没有则疏忽申请

session播送实现很简略,然而有一个致命缺点:计算力节约景象,当服务器没有音讯接收者session的时候,相当于节约了一次循环遍历的计算力,该计划在并发需要不高的状况下能够优先思考,实现很容易。

spring cloud中获取服务集群中每台服务器信息的办法如下

@Resourceprivate EurekaClient eurekaClient;Application app = eurekaClient.getApplication("service-name");//instanceInfo包含了一台服务器ip,port等音讯InstanceInfo instanceInfo = app.getInstances().get(0);System.out.println("ip address: " + instanceInfo.getIPAddr());

服务器须要保护关系映射表,将用户的id与session做映射,session建设时在映射表中增加映射关系,session断开后要删除映射表内关联关系

一致性哈希算法实现(本文的要点)

这种办法是自己认为最优雅的实现计划,了解这种计划须要肯定的工夫,如果你急躁看上来,置信你肯定会有所播种。再强调一次,不理解一致性哈希算法的同学请先看这里,现先假如哈希环是顺时针查找的。

首先,想要将一致性哈希算法的思维利用到咱们的websocket集群,咱们须要解决以下新问题:

  • 集群节点DOWN,会影响到哈希环映射到状态是DOWN的节点。
  • 集群节点UP,会影响到旧key映射不到对应的节点。
  • 哈希环读写共享。

在集群中,总会呈现服务UP/DOWN的问题。

针对节点DOWN的问题剖析如下:

一个服务器DOWN的时候,其领有的websocket session会主动敞开连贯,并且前端会收到告诉。此时会影响到哈希环的映射谬误。咱们只须要当监听到服务器DOWN的时候,删除哈希环下面对应的理论结点和虚结点,防止让网关转发到状态是DOWN的服务器上。

实现办法:在eureka治理核心监听集群服务DOWN事件,并及时更新哈希环。

针对节点UP的问题剖析如下:

现假如集群中有服务 CacheB上线了,该服务器的ip地址刚好被映射到key1和 cacheA之间。那么key1对应的用户每次要发消息时都跑去 CacheB发送音讯,后果显著是发送不了音讯,因为 CacheB没有key1对应的session。

此时咱们有两种解决方案。

计划A简略,动作大:

eureka监听到节点UP事件之后,依据现有集群信息,更新哈希环。并且断开所有session连贯,让客户端从新连贯,此时客户端会连贯到更新后的哈希环节点,以此防止音讯无奈送达的状况。

计划B简单,动作小:

咱们先看看没有虚构节点的状况,假如 CacheC和 CacheA之间上线了服务器 CacheB。所有映射在 CacheC到 CacheB的用户发消息时都会去 CacheB外面找session发消息。也就是说 CacheB一但上线,便会影响到 CacheC到 CacheB之间的用户发送音讯。所以咱们只须要将 CacheA断开 CacheC到 CacheB的用户所对应的session,让客户端重连。

接下来是有虚构节点的状况,假如浅色的节点是虚构节点。咱们用长括号来代表某段区域映射的后果属于某个 Cache。首先是C节点未上线的状况。图大家应该都懂吧,所有B的虚构节点都会指向实在的B节点,所以所有B节点逆时针那一部分都会映射到B(因为咱们规定哈希环顺时针查找)。

接下来是C节点上线的状况,能够看到某些区域被C霸占了。

由以上状况咱们能够晓得:节点上线,会有许多对应虚构节点也同时上线,因而咱们须要将多段范畴key对应的session断开连接(上图红色的局部)。具体算法有点简单,实现的形式因人而异,大家能够尝试一下本人实现算法。

哈希环应该放在哪里?

  • gateway本地创立并保护哈希环。当ws申请进来的时候,本地获取哈希环并获取映射服务器信息,转发ws申请。这种办法看上去不错,但实际上是不太可取的,回忆一下下面服务器DOWN的时候只能通过eureka监听,那么eureka监听到DOWN事件之后,须要通过io来告诉gateway删除对应节点吗?显然太麻烦了,将eureka的职责扩散到gateway,不倡议这么做。
  • eureka创立,并放到redis共享读写。这个计划可行,当eureka监听到服务DOWN的时候,批改哈希环并推送到redis上。为了申请响应工夫尽量地短,咱们不能够让gateway每次转发ws申请的时候都去redis取一次哈希环。哈希环批改的概率确实很低,gateway只须要利用redis的音讯订阅模式,订阅哈希环批改事件便能够解决此问题。

至此咱们的spring websocket集群曾经搭建的差不多了,最重要的中央还是一致性哈希算法。当初有最初一个技术瓶颈,网关如何依据ws申请转发到指定的集群服务器上?

答案在负载平衡。spring cloud gateway或zuul都默认集成了ribbon作为负载平衡,咱们只须要依据建设ws申请时客户端发来的user id,重写ribbon负载平衡算法,依据user id进行hash,并在哈希环上寻找ip,并将ws申请转发到该ip便完事了。流程如下图所示:

接下来用户沟通的时候,只须要依据id进行hash,在哈希环上获取对应ip,便能够晓得与该用户建设ws连贯时的session存在哪台服务器上了!

spring cloud Finchley.RELEASE 版本中ribbon未欠缺的中央

题主在实际操作的时候发现了ribbon两个不欠缺的中央......

  • 依据网上找的办法,继承AbstractLoadBalancerRule重写负载平衡策略之后,多个不同利用的申请变得凌乱。如果eureka上有两个service A和B,重写负载平衡策略之后,申请A或B的服务,最终只会映射到其中一个服务上。十分奇怪!可能spring cloud gateway官网须要给出一个正确的重写负载平衡策略的demo。
  • 一致性哈希算法须要一个key,相似user id,依据key进行hash之后在哈希环上搜寻并返回ip。然而ribbon没有欠缺choose函数的key参数,间接写死了default!

难道这样子咱们就没有方法了吗?其实还有一个可行并且临时可代替的方法!

如下图所示,客户端发送一个一般的http申请(蕴含id参数)给网关,网关依据id进行hash,在哈希环中寻找ip地址,将ip地址返回给客户端,客户端再依据该ip地址进行ws申请。

因为ribbon未欠缺key的解决,咱们临时无奈在ribbon上实现一致性哈希算法。只能间接地通过客户端发动两次申请(一次http,一次ws)的形式来实现一致性哈希。心愿不久之后ribbon能更新这个缺点!让咱们的websocket集群实现得更优雅一点。

后记

以上便是我这几天摸索的后果。期间遇到了许多问题,并逐个解决难题,列出两个websocket集群解决方案。第一个是session播送,第二个是一致性哈希。

这两种计划针对不同场景各有优缺点,本文并未用到ActiveMQ,Karfa等音讯队列实现音讯推送,只是想通过本人的想法,不依附音讯队列来简略地实现多用户之间的长连贯通信。心愿能为大家提供一条不同于寻常的思路。

近期热文举荐:

1.1,000+ 道 Java面试题及答案整顿(2021最新版)

2.别在再满屏的 if/ else 了,试试策略模式,真香!!

3.卧槽!Java 中的 xx ≠ null 是什么新语法?

4.Spring Boot 2.6 正式公布,一大波新个性。。

5.《Java开发手册(嵩山版)》最新公布,速速下载!

感觉不错,别忘了顺手点赞+转发哦!