我的项目中需要实现即时通信(IM)功能。由于项目采用微服务架构,并且每个子系统都以集群模式部署,因此需要解决这样一个问题:前端的 WebSocket 请求通过 Spring Cloud Gateway 网关负载均衡地分发到集群中的 Netty 服务,从而实现系统的高可用。

netty服务注册到eureka中

import com.netflix.appinfo.*;import com.netflix.discovery.DiscoveryClient;import com.xgjk.xgware.im.websocket.NettyProperties;import org.springframework.beans.factory.SmartInitializingSingleton;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.cloud.commons.util.InetUtils;import org.springframework.cloud.netflix.eureka.EurekaClientConfigBean;import org.springframework.stereotype.Component;import java.util.Map;@Componentpublic class NettyDiscoveryClient implements SmartInitializingSingleton {    @Autowired    private EurekaInstanceConfig config;    @Autowired    private NettyProperties nettyProperties;    @Autowired    private InetUtils inetUtils;    @Autowired    private EurekaClientConfigBean eurekaClientConfigBean;    /**     * 在spring容器治理的所有单例对象(非懒加载对象)初始化实现之后调用的回调接口     */    @Override    public void afterSingletonsInstantiated() {        /*EurekaClientConfigBean eurekaClientConfigBean = new EurekaClientConfigBean();        eurekaClientConfigBean.setServiceUrl(new HashMap<String, String>() {{            put("defaultZone", defaultZone);        }});*/        String host = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();        InstanceInfo instanceInfo = createInstance(config);        ApplicationInfoManager applicationInfoManager = new ApplicationInfoManager(new MyDataCenterInstanceConfig() {            @Override            public String getHostName(boolean refresh) {                return host;            }        }, instanceInfo);        //创立一个客户端实例        DiscoveryClient discoveryClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfigBean);    }    private InstanceInfo createInstance(EurekaInstanceConfig config) {        LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder()                .setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())                .setDurationInSecs(config.getLeaseExpirationDurationInSeconds());        // Builder the instance information to be 
registered with eureka        InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder();        String namespace = config.getNamespace();        if (!namespace.endsWith(".")) {            namespace = namespace + ".";        }        String host = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();        builder.setNamespace(namespace).setAppName(nettyProperties.getName())                .setInstanceId(String.join(":", host, String.valueOf(nettyProperties.getPort())))                .setAppGroupName(config.getAppGroupName())                .setDataCenterInfo(config.getDataCenterInfo())                .setIPAddr(host).setHostName(host)                .setPort(nettyProperties.getPort())                .enablePort(InstanceInfo.PortType.UNSECURE,                        config.isNonSecurePortEnabled())                .setSecurePort(config.getSecurePort())                .enablePort(InstanceInfo.PortType.SECURE, config.getSecurePortEnabled())                .setVIPAddress(nettyProperties.getName())                .setSecureVIPAddress(nettyProperties.getName())                .setHomePageUrl("/", null)                .setStatusPageUrl(config.getStatusPageUrlPath(),                        config.getStatusPageUrl())                .setHealthCheckUrls(config.getHealthCheckUrlPath(),                        config.getHealthCheckUrl(), config.getSecureHealthCheckUrl())                .setASGName(config.getASGName());        builder.setStatus(InstanceInfo.InstanceStatus.UP);        // Add any user-specific metadata information        for (Map.Entry<String, String> mapEntry : config.getMetadataMap().entrySet()) {            String key = mapEntry.getKey();            String value = mapEntry.getValue();            // only add the metadata if the value is present            if (value != null && !value.isEmpty()) {                builder.add(key, value);            }        }        InstanceInfo instanceInfo = builder.build();        
instanceInfo.setLeaseInfo(leaseInfoBuilder.build());        return instanceInfo;    }}
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * Configuration properties bound from the {@code netty} prefix.
 */
@ConfigurationProperties(prefix = "netty")
@Data
public class NettyProperties {
    /**
     * Socket port.
     */
    private int port;
    // Service name of the Netty server.
    private String name;
    // WebSocket endpoint path; defaults to "/webSocket" when not configured.
    private String path = "/webSocket";
}

配置文件

netty:
  name: scp-im2-websocket-netty
  port: 14052
  path: /webSocket

编写 WebSocket 长连接服务端代码。因为代码较多,这里只给出入口代码。

@Slf4j@Componentpublic class NettyServer {    @Autowired    private ChannelDeadCheckHandler channelDeadCheckHandler;    @Autowired    private WebSocketHandler webSocketHandler;    @Autowired    private MessageOutHandler messageOutHandler;    @Autowired    private HeartbeatHandler heartbeatHandler;    @Autowired    private NettyProperties nettyProperties;    public void start() throws Exception {        EventLoopGroup bossGroup = new NioEventLoopGroup();//负责接管连贯        EventLoopGroup group = new NioEventLoopGroup();////负责解决申请,工作线程数默认是CPU数*2        try {            ServerBootstrap bootstrap = new ServerBootstrap();            bootstrap.group(group, bossGroup) // 绑定线程池                    .channel(NioServerSocketChannel.class) // 指定应用Nio channel                    .localAddress(nettyProperties.getPort())// 绑定监听端口                    .childHandler(new ChannelInitializer<SocketChannel>() {                        public void initChannel(SocketChannel channel) {                            ChannelPipeline pipeline = channel.pipeline();                            //websocket协定自身是基于http协定的,所以这边也要应用http解编码器                            pipeline.addLast(new HttpServerCodec());                            //将HTTP音讯的多个部分合成一条残缺的HTTP音讯                            pipeline.addLast(new HttpObjectAggregator(65536));                            //用于反对大数据流的反对                            pipeline.addLast(new ChunkedWriteHandler());                            //用来判断是否读闲暇工夫过长,或写闲暇工夫过长                            //如果60秒中没有收到客户端音讯,则触发事件IdleState.READER_IDLE                            pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));                            pipeline.addLast(channelDeadCheckHandler);                            pipeline.addLast(webSocketHandler);//自定义音讯解决类                            pipeline.addLast(heartbeatHandler);                            pipeline.addLast(messageOutHandler);                            pipeline.addLast(new 
WebSocketServerProtocolHandler("/webSocket", null, true, 65536 * 10));                        }                    })                    .option(ChannelOption.SO_BACKLOG, 1024)//服务端承受连贯的队列长度,如果队列已满,客户端连贯将被回绝, linux设置/proc/sys/net/core/somaxconn                    .childOption(ChannelOption.SO_KEEPALIVE, true);//设置放弃流动的连贯            ChannelFuture cf = bootstrap.bind().sync(); // 服务器异步创立绑定            log.info("NettyServer已启动... ");            cf.channel().closeFuture().sync(); // 敞开服务器通道        } finally {            group.shutdownGracefully().sync(); // 开释线程池资源            bossGroup.shutdownGracefully().sync();        }    }}

在 Spring Cloud Gateway 项目中配置路由

spring.cloud.gateway:
  httpclient:
    #连接超时时间
    connect-timeout: 1000
    #响应超时时间
    responseTimeout: 60s
  #默认过滤器,对所有的路由请求生效,order:按声明顺序从1递增
  #当过滤器的order值一样时,会按照defaultFilter>路由过滤器>GlobalFilter的顺序执行
  default-filters:
    #- StripPrefix=1 #去除请求路径的第一个前缀后访问微服务
  #全局跨域处理
  globalcors:
    add-to-simple-url-handler-mapping: true
    cors-configurations:
      '[/**]':
        allowCredentials: true #是否允许携带cookie
        allowedOriginPatterns: "*"
        allowedMethods: "*"
        allowedHeaders: "*"
        maxAge: 360000 #跨域检测有效期
  # 网关指定路径路由
  routes:
    - id: scp-im2-websocket-netty
      uri: lb:ws://scp-im2-websocket-netty
      predicates:
        - Path=/webSocket

然后,前端通过 ws://localhost:14052/webSocket 即可连接 Netty 集群中的服务。