乐趣区

关于分布式:聊聊分布式下的WebSocket解决方案

前言


最近本人搭建了个我的项目,我的项目自身很简略,然而外面有应用 WebSocket 进行音讯揭示的性能,大体状况是这样的。

公布音讯者在零碎中发送音讯,实时的把音讯推送给对应的一个部门下的所有人。

这外面如果是单机利用的状况时,咱们能够通过部门的 id 和用户的 id 组成一个惟一的 key,与应用服务器建设 WebSocket 长连贯,而后就能够接管到公布音讯者发送的音讯了。

然而真正把我的项目利用于生产环境中时,咱们是不可能就部署一个单机利用的,而是要部署一个集群。

所以我通过 Nginx+ 两台 Tomcat 搭建了一个简略的负载平衡集群,作为测试应用

然而问题呈现了,咱们的客户端浏览器只会与一台服务器建设 WebSocket 长连贯,所以公布音讯者在发送音讯时,就没法保障所有指标部门的人都能接管到音讯(因为这些人连贯的可能不是一个服务器)。

本篇文章就是针对于这么一个问题展开讨论,提出一种解决方案,当然解决方案不止一种,那咱们开始吧。

WebSocket 单体利用介绍


在介绍分布式集群之前,咱们先来看一下王子的 WebSocket 代码实现,先来看 java 后端代码如下:

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@ServerEndpoint(“/webSocket/{key}”)
public class WebSocket {

private static int onlineCount = 0;
/**
 * 存储连贯的客户端
 */
private static Map<String, WebSocket> clients = new ConcurrentHashMap<String, WebSocket>();
private Session session;
/**
 * 发送的指标科室 code
 */
private String key;

@OnOpen
public void onOpen(@PathParam("key") String key, Session session) throws IOException {
    this.key = key;
    this.session = session;
    if (!clients.containsKey(key)) {addOnlineCount();
    }
    clients.put(key, this);
    Log.info(key+"已连贯音讯服务!");
}

@OnClose
public void onClose() throws IOException {clients.remove(key);
    subOnlineCount();}

@OnMessage
public void onMessage(String message) throws IOException {if(message.equals("ping")){return ;}
    JSONObject jsonTo = JSON.parseObject(message);
    String mes = (String) jsonTo.get("message");
    if (!jsonTo.get("to").equals("All")){sendMessageTo(mes, jsonTo.get("to").toString());
    }else{sendMessageAll(mes);
    }
}

@OnError
public void onError(Session session, Throwable error) {error.printStackTrace();
}

private void sendMessageTo(String message, String To) throws IOException {for (WebSocket item : clients.values()) {if (item.key.contains(To) )
            item.session.getAsyncRemote().sendText(message);
    }
}

private void sendMessageAll(String message) throws IOException {for (WebSocket item : clients.values()) {item.session.getAsyncRemote().sendText(message);
    }
}

public static synchronized int getOnlineCount() {return onlineCount;}

public static synchronized void addOnlineCount() {WebSocket.onlineCount++;}

public static synchronized void subOnlineCount() {WebSocket.onlineCount--;}

public static synchronized Map<String, WebSocket> getClients() {return clients;}

}

示例代码中并没有应用 Spring,用的是原生的 java web 编写的,简略和大家介绍一下外面的办法。

onOpen:在客户端与 WebSocket 服务连贯时触发办法执行

onClose:在客户端与 WebSocket 连贯断开的时候触发执行

onMessage:在接管到客户端发送的音讯时触发执行

onError:在产生谬误时触发执行

能够看到,在 onMessage 办法中,咱们间接依据客户端发送的音讯,进行音讯的转发性能,这样在单体音讯服务中是没有问题的。

再来看一下 js 代码

var host = document.location.host;

// 取得以后登录科室
var deptCodes='${sessionScope.$UserContext.departmentID}';
deptCodes=deptCodes.replace(/[[|]|s]+/g, "");
var key = '${sessionScope.$UserContext.userID}'+deptCodes;
var lockReconnect = false;  // 防止 ws 反复连贯
var ws = null;          // 判断以后浏览器是否反对 WebSocket
var wsUrl = 'ws://' + host + '/webSocket/'+ key;
createWebSocket(wsUrl);   // 连贯 ws

function createWebSocket(url) {
    try{if('WebSocket' in window){ws = new WebSocket(url);
        }else if('MozWebSocket' in window){ws = new MozWebSocket(url);
        }else{layer.alert("您的浏览器不反对 websocket 协定, 倡议应用新版谷歌、火狐等浏览器,请勿应用 IE10 以下浏览器,360 浏览器请应用极速模式,不要应用兼容模式!"); 
        }
        initEventHandle();}catch(e){reconnect(url);
        console.log(e);
    }     
}

function initEventHandle() {ws.onclose = function () {reconnect(wsUrl);
        console.log("llws 连贯敞开!"+new Date().toUTCString());
    };
    ws.onerror = function () {reconnect(wsUrl);
        console.log("llws 连贯谬误!");
    };
    ws.onopen = function () {heartCheck.reset().start();      // 心跳检测重置
        console.log("llws 连贯胜利!"+new Date().toUTCString());
    };
    ws.onmessage = function (event) {    // 如果获取到音讯,心跳检测重置
        heartCheck.reset().start();      // 拿到任何音讯都阐明以后连贯是失常的 // 接管到音讯理论业务解决 

    };
}
// 监听窗口敞开事件,当窗口敞开时,被动去敞开 websocket 连贯,避免连贯还没断开就敞开窗口,server 端会抛异样。window.onbeforeunload = function() {ws.close();
}  

function reconnect(url) {if(lockReconnect) return;
    lockReconnect = true;
    setTimeout(function () {     // 没连贯上会始终重连,设置提早防止申请过多
        createWebSocket(url);
        lockReconnect = false;
    }, 2000);
}

// 心跳检测
var heartCheck = {
    timeout: 300000,        // 5 分钟发一次心跳
    timeoutObj: null,
    serverTimeoutObj: null,
    reset: function(){clearTimeout(this.timeoutObj);
        clearTimeout(this.serverTimeoutObj);
        return this;
    },
    start: function(){
        var self = this;
        this.timeoutObj = setTimeout(function(){
            // 这里发送一个心跳,后端收到后,返回一个心跳音讯,//onmessage 拿到返回的心跳就阐明连贯失常
            ws.send("ping");
            console.log("ping!")
            self.serverTimeoutObj = setTimeout(function(){// 如果超过肯定工夫还没重置,阐明后端被动断开了
                ws.close();     // 如果 onclose 会执行 reconnect,咱们执行 ws.close() 就行了. 如果间接执行 reconnect 会触发 onclose 导致重连两次
            }, self.timeout)
        }, this.timeout)
    }

}

js 局部应用的是原生 H5 编写的,如果为了更好的兼容浏览器,也能够应用 SockJS,有趣味小伙伴们能够自行百度。

接下来咱们就手动的优化代码,实现 WebSocket 对分布式架构的反对。

解决方案的思考


当初咱们曾经理解单体利用下的代码构造,也分明了 WebSocket 在分布式环境下面临的问题,那么是时候思考一下如何可能解决这个问题了。

咱们先来看一看产生这个问题的根本原因是什么。

简略思考一下就能明确,单体利用下只有一台服务器,所有的客户端连贯的都是这一台音讯服务器,所以当公布音讯者发送音讯时,所有的客户端其实曾经全副与这台服务器建设了连贯,间接群发音讯就能够了。

换成分布式系统后,如果咱们有两台音讯服务器,那么客户端通过 Nginx 负载平衡后,就会有一部分连贯到其中一台服务器,另一部分连贯到另一台服务器,所以公布音讯者发送音讯时,只会发送到其中的一台服务器上,而这台音讯服务器就能够执行群发操作,但问题是,另一台服务器并不知道这件事,也就无奈发送音讯了。

当初咱们晓得了根本原因是生产音讯时,只有一台音讯服务器可能感知到,所以咱们只有让另一台音讯服务器也能感知到就能够了,这样感知到之后,它就能够群发音讯给连贯到它上边的客户端了。

那么什么办法能够实现这种性能呢,王子很快想到了引入消息中间件,并应用它的公布订阅模式来告诉所有音讯服务器就能够了。

引入 RabbitMQ 解决分布式下的 WebSocket 问题


在消息中间件的抉择上,王子抉择了 RabbitMQ,起因是它的搭建比较简单,性能也很弱小,而且咱们只是用到它群发音讯的性能。

RabbitMQ 有一个播送模式(fanout),咱们应用的就是这种模式。

首先咱们写一个 RabbitMQ 的连贯类:

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQUtil {

private static Connection connection;

/**
 * 与 rabbitmq 建设连贯
 * @return
 */
public static Connection getConnection() {if (connection != null&&connection.isOpen()) {return connection;}

    ConnectionFactory factory = new ConnectionFactory();
    factory.setVirtualHost("/");
    factory.setHost("192.168.220.110"); // 用的是虚构 IP 地址
    factory.setPort(5672);
    factory.setUsername("guest");
    factory.setPassword("guest");

    try {connection = factory.newConnection();
    } catch (IOException e) {e.printStackTrace();
    } catch (TimeoutException e) {e.printStackTrace();
    }

    return connection;
}

}

这个类没什么说的,就是获取 MQ 连贯的一个工厂类。

而后依照咱们的思路,就是每次服务器启动的时候,都会创立一个 MQ 的消费者监听 MQ 的音讯,王子这里测试应用的是 Servlet 的监听器,如下:

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

public class InitListener implements ServletContextListener {

@Override
public void contextInitialized(ServletContextEvent servletContextEvent) {WebSocket.init();
}

@Override
public void contextDestroyed(ServletContextEvent servletContextEvent) {}

}

记得要在 Web.xml 中配置监听器信息

<?xml version=”1.0″ encoding=”UTF-8″?>
<web-app xmlns=”http://xmlns.jcp.org/xml/ns/javaee”

     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd"
     version="4.0">
<listener>
    <listener-class>InitListener</listener-class>
</listener>

</web-app>

WebSocket 中减少 init 办法,作为 MQ 消费者局部

public static void init() {

    try {Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        // 交换机申明(参数为:交换机名称;交换机类型)channel.exchangeDeclare("fanoutLogs",BuiltinExchangeType.FANOUT);
        // 获取一个长期队列
        String queueName = channel.queueDeclare().getQueue();
        // 队列与交换机绑定(参数为:队列名称;交换机名称;routingKey 疏忽)channel.queueBind(queueName,"fanoutLogs","");


        // 这里重写了 DefaultConsumer 的 handleDelivery 办法,因为发送的时候对音讯进行了 getByte(),在这里要从新组装成 String
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);
                String message = new String(body,"UTF-8");
                System.out.println(message);

// 这里能够应用 WebSocket 通过音讯内容发送音讯给对应的客户端

            }
        };

        // 申明队列中被生产掉的音讯(参数为:队列名称;音讯是否主动确认;consumer 主体)channel.basicConsume(queueName,true,consumer);
        // 这里不能敞开连贯,调用了生产办法后,消费者会始终连贯着 rabbitMQ 期待生产
    } catch (IOException e) {e.printStackTrace();
    }
}

同时在接管到音讯时,不是间接通过 WebSocket 发送音讯给对应客户端,而是发送音讯给 MQ,这样如果音讯服务器有多个,就都会从 MQ 中取得音讯,之后通过获取的音讯内容再应用 WebSocket 推送给对应的客户端就能够了。

WebSocket 的 onMessage 办法减少内容如下:

try {

        // 尝试获取一个连贯
        Connection connection = RabbitMQUtil.getConnection();
        // 尝试创立一个 channel
        Channel channel = connection.createChannel();
        // 申明交换机(参数为:交换机名称; 交换机类型,播送模式)channel.exchangeDeclare("fanoutLogs", BuiltinExchangeType.FANOUT);
        // 音讯公布(参数为:交换机名称; routingKey,疏忽。在播送模式中,生产者申明交换机的名称和类型即可)channel.basicPublish("fanoutLogs","", null,msg.getBytes("UTF-8"));
        System.out.println("公布音讯");
        channel.close();} catch (IOException |TimeoutException e) {e.printStackTrace();
    }

减少后删除掉原来的 Websocket 推送局部代码。

这样一整套的解决方案就实现了。

总结


到这里,咱们就解决了分布式下 WebSocket 的推送音讯问题。

咱们次要是引入了 RabbitMQ,通过 RabbitMQ 的公布订阅模式,让每个音讯服务器启动的时候都去订阅音讯,而无论哪台音讯服务器在发送音讯的时候都会发送给 MQ,这样每台音讯服务器就都会感知到发送音讯的事件,从而再通过 Websocket 发送给客户端。

大体流程就是这样,那么小伙伴们有没有想过,如果 RabbitMQ 挂掉了几分钟,之后重启了,消费者是否能够从新连贯到 RabbitMQ?是否还能失常接管音讯呢?

生产环境下,这个问题是必须思考的。

这里曾经测试过,消费者是反对主动重连的,所以咱们能够释怀的应用这套架构来解决此问题。

本文到这里就完结了,欢送各位小伙伴点赞文章留言探讨,一起学习,一起提高。

退出移动版