共计 6020 个字符,预计需要花费 16 分钟才能阅读完成。
我的项目中需要实现即时通信(IM),因为项目是微服务架构,并且每个子系统都是集群部署模式,所以需要解决的问题是:让前端的 websocket 请求通过 springcloud-gateway 网关负载均衡地分发到集群中的 netty 服务上,实现系统的高可用。
netty 服务注册到 eureka 中
import com.netflix.appinfo.*;
import com.netflix.discovery.DiscoveryClient;
import com.xgjk.xgware.im.websocket.NettyProperties;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.commons.util.InetUtils;
import org.springframework.cloud.netflix.eureka.EurekaClientConfigBean;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * Registers the Netty WebSocket server with Eureka as a separate service instance.
 *
 * <p>The instance is published under the name and port taken from {@link NettyProperties},
 * while lease settings, data-center info, status/health URLs and metadata are copied from
 * the application's own {@link EurekaInstanceConfig}.
 *
 * <p>The created {@link DiscoveryClient} is kept in a field and shut down in
 * {@link #destroy()}; without that the client would never be stopped when the Spring
 * context closes.
 */
@Component
public class NettyDiscoveryClient implements SmartInitializingSingleton, DisposableBean {

    @Autowired
    private EurekaInstanceConfig config;

    @Autowired
    private NettyProperties nettyProperties;

    @Autowired
    private InetUtils inetUtils;

    @Autowired
    private EurekaClientConfigBean eurekaClientConfigBean;

    /** Eureka client created for the Netty instance; {@code null} until registration ran. */
    private DiscoveryClient discoveryClient;

    /**
     * Callback invoked after all non-lazy singleton beans have been instantiated.
     * Builds the Netty instance description and creates a dedicated Eureka client for it.
     */
    @Override
    public void afterSingletonsInstantiated() {
        // Resolve the host once and reuse it for the instance info and the instance config.
        final String host = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
        InstanceInfo instanceInfo = createInstance(config, host);
        ApplicationInfoManager applicationInfoManager = new ApplicationInfoManager(
                new MyDataCenterInstanceConfig() {
                    @Override
                    public String getHostName(boolean refresh) {
                        return host;
                    }
                },
                instanceInfo);
        // Create the client instance and keep it so it can be shut down later.
        discoveryClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfigBean);
    }

    /**
     * Shuts down the Eureka client created by {@link #afterSingletonsInstantiated()}, if any.
     * NOTE(review): per the Eureka client documentation, shutdown is expected to stop the
     * heartbeat tasks and unregister the instance when unregister-on-shutdown is enabled.
     */
    @Override
    public void destroy() {
        if (discoveryClient != null) {
            discoveryClient.shutdown();
            discoveryClient = null;
        }
    }

    /**
     * Builds the {@link InstanceInfo} that describes the Netty server to Eureka.
     *
     * @param config the application's Eureka instance configuration used as the template
     * @param host   IP address used as instance IP, host name and instance-id prefix
     * @return instance info with status {@code UP} and the lease settings from {@code config}
     */
    private InstanceInfo createInstance(EurekaInstanceConfig config, String host) {
        LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder()
                .setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())
                .setDurationInSecs(config.getLeaseExpirationDurationInSeconds());

        // Builder for the instance information to be registered with Eureka.
        InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder();

        String namespace = config.getNamespace();
        if (!namespace.endsWith(".")) {
            namespace = namespace + ".";
        }

        builder.setNamespace(namespace)
                .setAppName(nettyProperties.getName())
                .setInstanceId(String.join(":", host, String.valueOf(nettyProperties.getPort())))
                .setAppGroupName(config.getAppGroupName())
                .setDataCenterInfo(config.getDataCenterInfo())
                .setIPAddr(host)
                .setHostName(host)
                .setPort(nettyProperties.getPort())
                .enablePort(InstanceInfo.PortType.UNSECURE, config.isNonSecurePortEnabled())
                .setSecurePort(config.getSecurePort())
                .enablePort(InstanceInfo.PortType.SECURE, config.getSecurePortEnabled())
                .setVIPAddress(nettyProperties.getName())
                .setSecureVIPAddress(nettyProperties.getName())
                .setHomePageUrl("/", null)
                .setStatusPageUrl(config.getStatusPageUrlPath(), config.getStatusPageUrl())
                .setHealthCheckUrls(config.getHealthCheckUrlPath(),
                        config.getHealthCheckUrl(), config.getSecureHealthCheckUrl())
                .setASGName(config.getASGName());
        builder.setStatus(InstanceInfo.InstanceStatus.UP);

        // Add any user-specific metadata; entries without a value are skipped.
        for (Map.Entry<String, String> mapEntry : config.getMetadataMap().entrySet()) {
            String value = mapEntry.getValue();
            if (value != null && !value.isEmpty()) {
                builder.add(mapEntry.getKey(), value);
            }
        }

        InstanceInfo instanceInfo = builder.build();
        instanceInfo.setLeaseInfo(leaseInfoBuilder.build());
        return instanceInfo;
    }
}
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
 * Settings of the Netty WebSocket server, bound from configuration keys that start
 * with {@code netty}. Accessors are generated by Lombok's {@code @Data}.
 */
@Data
@ConfigurationProperties(prefix = "netty")
public class NettyProperties {

    /** Socket port. */
    private int port;

    /** Value of {@code netty.name}. */
    private String name;

    /** Value of {@code netty.path}; {@code "/webSocket"} unless overridden. */
    private String path = "/webSocket";
}
配置文件
# Settings bound to NettyProperties (prefix "netty").
netty:
  name: scp-im2-websocket-netty
  port: 14052
  path: /webSocket
编写 websocket 长连接服务端代码,因为代码较多,这里只给出入口代码
@Slf4j
@Component
public class NettyServer {
@Autowired
private ChannelDeadCheckHandler channelDeadCheckHandler;
@Autowired
private WebSocketHandler webSocketHandler;
@Autowired
private MessageOutHandler messageOutHandler;
@Autowired
private HeartbeatHandler heartbeatHandler;
@Autowired
private NettyProperties nettyProperties;
public void start() throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup();// 负责接管连贯
EventLoopGroup group = new NioEventLoopGroup();//// 负责解决申请,工作线程数默认是 CPU 数 *2
try {ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group, bossGroup) // 绑定线程池
.channel(NioServerSocketChannel.class) // 指定应用 Nio channel
.localAddress(nettyProperties.getPort())// 绑定监听端口
.childHandler(new ChannelInitializer<SocketChannel>() {public void initChannel(SocketChannel channel) {ChannelPipeline pipeline = channel.pipeline();
//websocket 协定自身是基于 http 协定的,所以这边也要应用 http 解编码器
pipeline.addLast(new HttpServerCodec());
// 将 HTTP 音讯的多个部分合成一条残缺的 HTTP 音讯
pipeline.addLast(new HttpObjectAggregator(65536));
// 用于反对大数据流的反对
pipeline.addLast(new ChunkedWriteHandler());
// 用来判断是否读闲暇工夫过长,或写闲暇工夫过长
// 如果 60 秒中没有收到客户端音讯,则触发事件 IdleState.READER_IDLE
pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
pipeline.addLast(channelDeadCheckHandler);
pipeline.addLast(webSocketHandler);// 自定义音讯解决类
pipeline.addLast(heartbeatHandler);
pipeline.addLast(messageOutHandler);
pipeline.addLast(new WebSocketServerProtocolHandler("/webSocket", null, true, 65536 * 10));
}
})
.option(ChannelOption.SO_BACKLOG, 1024)// 服务端承受连贯的队列长度,如果队列已满,客户端连贯将被回绝, linux 设置 /proc/sys/net/core/somaxconn
.childOption(ChannelOption.SO_KEEPALIVE, true);// 设置放弃流动的连贯
ChannelFuture cf = bootstrap.bind().sync(); // 服务器异步创立绑定
log.info("NettyServer 已启动...");
cf.channel().closeFuture().sync(); // 敞开服务器通道} finally {group.shutdownGracefully().sync(); // 开释线程池资源
bossGroup.shutdownGracefully().sync();
}
}
}
在 springcloud-gateway 项目中配置路由
spring.cloud.gateway:
  httpclient:
    # Connect timeout
    connect-timeout: 1000
    # Response timeout
    responseTimeout: 60s
  # Default filters, applied to every routed request; order increases from 1 in declaration order.
  # When filters share the same order they run as: default filter > route filter > GlobalFilter.
  default-filters:
  #- StripPrefix=1 # strip the first segment of the request path before calling the service
  # Global CORS handling
  globalcors:
    add-to-simple-url-handler-mapping: true
    cors-configurations:
      '[/**]':
        allowCredentials: true # whether cookies may be sent
        allowedOriginPatterns: "*"
        allowedMethods: "*"
        allowedHeaders: "*"
        maxAge: 360000 # how long the CORS preflight result stays valid
  # Path based routes of the gateway
  routes:
    - id: scp-im2-websocket-netty
      uri: lb:ws://scp-im2-websocket-netty
      predicates:
        - Path=/webSocket
然后,前端通过网关地址 ws://<网关主机>:<网关端口>/webSocket 即可连接 netty 集群中的服务(注意:ws://localhost:14052/webSocket 使用的是 netty 自身的端口 14052,是直连单个 netty 实例的地址,不经过网关的负载均衡)。
正文完