我的项目中需要实现即时通信(IM)功能。由于项目采用微服务架构,并且每个子系统都以集群模式部署,因此需要解决这样一个问题:前端的 WebSocket 请求如何通过 Spring Cloud Gateway 网关负载均衡地分发到集群中的 Netty 服务,从而实现系统的高可用。
将 Netty 服务注册到 Eureka 中
import com.netflix.appinfo.*;import com.netflix.discovery.DiscoveryClient;import com.xgjk.xgware.im.websocket.NettyProperties;import org.springframework.beans.factory.SmartInitializingSingleton;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.cloud.commons.util.InetUtils;import org.springframework.cloud.netflix.eureka.EurekaClientConfigBean;import org.springframework.stereotype.Component;import java.util.Map;@Componentpublic class NettyDiscoveryClient implements SmartInitializingSingleton { @Autowired private EurekaInstanceConfig config; @Autowired private NettyProperties nettyProperties; @Autowired private InetUtils inetUtils; @Autowired private EurekaClientConfigBean eurekaClientConfigBean; /** * 在spring容器治理的所有单例对象(非懒加载对象)初始化实现之后调用的回调接口 */ @Override public void afterSingletonsInstantiated() { /*EurekaClientConfigBean eurekaClientConfigBean = new EurekaClientConfigBean(); eurekaClientConfigBean.setServiceUrl(new HashMap<String, String>() {{ put("defaultZone", defaultZone); }});*/ String host = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress(); InstanceInfo instanceInfo = createInstance(config); ApplicationInfoManager applicationInfoManager = new ApplicationInfoManager(new MyDataCenterInstanceConfig() { @Override public String getHostName(boolean refresh) { return host; } }, instanceInfo); //创立一个客户端实例 DiscoveryClient discoveryClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfigBean); } private InstanceInfo createInstance(EurekaInstanceConfig config) { LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder() .setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds()) .setDurationInSecs(config.getLeaseExpirationDurationInSeconds()); // Builder the instance information to be registered with eureka InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder(); String namespace = config.getNamespace(); if (!namespace.endsWith(".")) { namespace = namespace + "."; } String host = 
inetUtils.findFirstNonLoopbackHostInfo().getIpAddress(); builder.setNamespace(namespace).setAppName(nettyProperties.getName()) .setInstanceId(String.join(":", host, String.valueOf(nettyProperties.getPort()))) .setAppGroupName(config.getAppGroupName()) .setDataCenterInfo(config.getDataCenterInfo()) .setIPAddr(host).setHostName(host) .setPort(nettyProperties.getPort()) .enablePort(InstanceInfo.PortType.UNSECURE, config.isNonSecurePortEnabled()) .setSecurePort(config.getSecurePort()) .enablePort(InstanceInfo.PortType.SECURE, config.getSecurePortEnabled()) .setVIPAddress(nettyProperties.getName()) .setSecureVIPAddress(nettyProperties.getName()) .setHomePageUrl("/", null) .setStatusPageUrl(config.getStatusPageUrlPath(), config.getStatusPageUrl()) .setHealthCheckUrls(config.getHealthCheckUrlPath(), config.getHealthCheckUrl(), config.getSecureHealthCheckUrl()) .setASGName(config.getASGName()); builder.setStatus(InstanceInfo.InstanceStatus.UP); // Add any user-specific metadata information for (Map.Entry<String, String> mapEntry : config.getMetadataMap().entrySet()) { String key = mapEntry.getKey(); String value = mapEntry.getValue(); // only add the metadata if the value is present if (value != null && !value.isEmpty()) { builder.add(key, value); } } InstanceInfo instanceInfo = builder.build(); instanceInfo.setLeaseInfo(leaseInfoBuilder.build()); return instanceInfo; }}
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * Settings for the Netty WebSocket server, bound from configuration keys under the
 * {@code netty} prefix. Accessors are generated by Lombok's {@code @Data}.
 */
@ConfigurationProperties(prefix = "netty")
@Data
public class NettyProperties {

    /** Socket port. */
    private int port;

    /** Value of the {@code netty.name} key. */
    private String name;

    /** Value of the {@code netty.path} key; {@code /webSocket} when not configured. */
    private String path = "/webSocket";
}
配置文件
# Settings bound to NettyProperties (prefix "netty").
netty:
  # Name under which the Netty server is registered in Eureka.
  name: scp-im2-websocket-netty
  # Socket port of the Netty server.
  port: 14052
  # WebSocket endpoint path.
  path: /webSocket
编写 WebSocket 长连接服务端代码。由于代码较多,这里只给出入口代码。
@Slf4j@Componentpublic class NettyServer { @Autowired private ChannelDeadCheckHandler channelDeadCheckHandler; @Autowired private WebSocketHandler webSocketHandler; @Autowired private MessageOutHandler messageOutHandler; @Autowired private HeartbeatHandler heartbeatHandler; @Autowired private NettyProperties nettyProperties; public void start() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup();//负责接管连贯 EventLoopGroup group = new NioEventLoopGroup();////负责解决申请,工作线程数默认是CPU数*2 try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(group, bossGroup) // 绑定线程池 .channel(NioServerSocketChannel.class) // 指定应用Nio channel .localAddress(nettyProperties.getPort())// 绑定监听端口 .childHandler(new ChannelInitializer<SocketChannel>() { public void initChannel(SocketChannel channel) { ChannelPipeline pipeline = channel.pipeline(); //websocket协定自身是基于http协定的,所以这边也要应用http解编码器 pipeline.addLast(new HttpServerCodec()); //将HTTP音讯的多个部分合成一条残缺的HTTP音讯 pipeline.addLast(new HttpObjectAggregator(65536)); //用于反对大数据流的反对 pipeline.addLast(new ChunkedWriteHandler()); //用来判断是否读闲暇工夫过长,或写闲暇工夫过长 //如果60秒中没有收到客户端音讯,则触发事件IdleState.READER_IDLE pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS)); pipeline.addLast(channelDeadCheckHandler); pipeline.addLast(webSocketHandler);//自定义音讯解决类 pipeline.addLast(heartbeatHandler); pipeline.addLast(messageOutHandler); pipeline.addLast(new WebSocketServerProtocolHandler("/webSocket", null, true, 65536 * 10)); } }) .option(ChannelOption.SO_BACKLOG, 1024)//服务端承受连贯的队列长度,如果队列已满,客户端连贯将被回绝, linux设置/proc/sys/net/core/somaxconn .childOption(ChannelOption.SO_KEEPALIVE, true);//设置放弃流动的连贯 ChannelFuture cf = bootstrap.bind().sync(); // 服务器异步创立绑定 log.info("NettyServer已启动... "); cf.channel().closeFuture().sync(); // 敞开服务器通道 } finally { group.shutdownGracefully().sync(); // 开释线程池资源 bossGroup.shutdownGracefully().sync(); } }}
在 Spring Cloud Gateway 项目中配置路由
spring.cloud.gateway:
  httpclient:
    # Connection timeout
    connect-timeout: 1000
    # Response timeout
    responseTimeout: 60s
  # Default filters, applied to requests of every route; order: counted up from 1 in
  # declaration order.
  # When filters share the same order value they run in the sequence
  # defaultFilter > route filter > GlobalFilter.
  default-filters:
    #- StripPrefix=1 # strip the first prefix segment of the request path before calling the service
  # Global CORS handling
  globalcors:
    add-to-simple-url-handler-mapping: true
    cors-configurations:
      '[/**]':
        allowCredentials: true # whether cookies may be sent
        allowedOriginPatterns: "*"
        allowedMethods: "*"
        allowedHeaders: "*"
        maxAge: 360000 # validity period of the CORS preflight check
  # Path-based gateway routes
  routes:
    - id: scp-im2-websocket-netty
      uri: lb:ws://scp-im2-websocket-netty
      predicates:
        - Path=/webSocket
然后,前端通过 ws://localhost:14052/webSocket 即可连接 Netty 集群中的服务(注意:14052 是上文配置中 Netty 服务自身的端口;若要经由网关进行负载均衡,应将主机和端口替换为网关的地址)。