乐趣区

关于netty:netty项目注册到eureka

我的项目中须要实现即时通信 IM 沟通,因为我的项目是微服务架构,并且每个子系统都是集群部署模式。须要解决前端的 websocket 申请通过 springcloud-gateway 网关平衡散发到集群中的 netty 服务中,实现零碎的高可用。

netty 服务注册到 eureka 中

import com.netflix.appinfo.*;
import com.netflix.discovery.DiscoveryClient;
import com.xgjk.xgware.im.websocket.NettyProperties;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.commons.util.InetUtils;
import org.springframework.cloud.netflix.eureka.EurekaClientConfigBean;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class NettyDiscoveryClient implements SmartInitializingSingleton {

    @Autowired
    private EurekaInstanceConfig config;

    @Autowired
    private NettyProperties nettyProperties;

    @Autowired
    private InetUtils inetUtils;

    @Autowired
    private EurekaClientConfigBean eurekaClientConfigBean;

    /**
     * 在 spring 容器治理的所有单例对象(非懒加载对象)初始化实现之后调用的回调接口
     */
    @Override
    public void afterSingletonsInstantiated() {/*EurekaClientConfigBean eurekaClientConfigBean = new EurekaClientConfigBean();
        eurekaClientConfigBean.setServiceUrl(new HashMap<String, String>() {{put("defaultZone", defaultZone);
        }});*/
        String host = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
        InstanceInfo instanceInfo = createInstance(config);
        ApplicationInfoManager applicationInfoManager = new ApplicationInfoManager(new MyDataCenterInstanceConfig() {
            @Override
            public String getHostName(boolean refresh) {return host;}
        }, instanceInfo);
        // 创立一个客户端实例
        DiscoveryClient discoveryClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfigBean);
    }

    private InstanceInfo createInstance(EurekaInstanceConfig config) {LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder()
                .setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())
                .setDurationInSecs(config.getLeaseExpirationDurationInSeconds());

        // Builder the instance information to be registered with eureka
        InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder();

        String namespace = config.getNamespace();
        if (!namespace.endsWith(".")) {namespace = namespace + ".";}

        String host = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
        builder.setNamespace(namespace).setAppName(nettyProperties.getName())
                .setInstanceId(String.join(":", host, String.valueOf(nettyProperties.getPort())))
                .setAppGroupName(config.getAppGroupName())
                .setDataCenterInfo(config.getDataCenterInfo())
                .setIPAddr(host).setHostName(host)
                .setPort(nettyProperties.getPort())
                .enablePort(InstanceInfo.PortType.UNSECURE,
                        config.isNonSecurePortEnabled())
                .setSecurePort(config.getSecurePort())
                .enablePort(InstanceInfo.PortType.SECURE, config.getSecurePortEnabled())
                .setVIPAddress(nettyProperties.getName())
                .setSecureVIPAddress(nettyProperties.getName())
                .setHomePageUrl("/", null)
                .setStatusPageUrl(config.getStatusPageUrlPath(),
                        config.getStatusPageUrl())
                .setHealthCheckUrls(config.getHealthCheckUrlPath(),
                        config.getHealthCheckUrl(), config.getSecureHealthCheckUrl())
                .setASGName(config.getASGName());
        builder.setStatus(InstanceInfo.InstanceStatus.UP);

        // Add any user-specific metadata information
        for (Map.Entry<String, String> mapEntry : config.getMetadataMap().entrySet()) {String key = mapEntry.getKey();
            String value = mapEntry.getValue();
            // only add the metadata if the value is present
            if (value != null && !value.isEmpty()) {builder.add(key, value);
            }
        }

        InstanceInfo instanceInfo = builder.build();
        instanceInfo.setLeaseInfo(leaseInfoBuilder.build());
        return instanceInfo;
    }
}
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "netty")
@Data
public class NettyProperties {

    /**
     * socket 端口
     */
    private int port;

    private String name;

    private String path = "/webSocket";
}

配置文件

netty:
  name: scp-im2-websocket-netty
  port: 14052
  path: /webSocket

编写 websocket 长连贯服务端代码,因为代码较多,这里只给出入口代码

@Slf4j
@Component
public class NettyServer {

    @Autowired
    private ChannelDeadCheckHandler channelDeadCheckHandler;

    @Autowired
    private WebSocketHandler webSocketHandler;

    @Autowired
    private MessageOutHandler messageOutHandler;

    @Autowired
    private HeartbeatHandler heartbeatHandler;

    @Autowired
    private NettyProperties nettyProperties;

    public void start() throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup();// 负责接管连贯
        EventLoopGroup group = new NioEventLoopGroup();//// 负责解决申请,工作线程数默认是 CPU 数 *2
        try {ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(group, bossGroup) // 绑定线程池
                    .channel(NioServerSocketChannel.class) // 指定应用 Nio channel
                    .localAddress(nettyProperties.getPort())// 绑定监听端口
                    .childHandler(new ChannelInitializer<SocketChannel>() {public void initChannel(SocketChannel channel) {ChannelPipeline pipeline = channel.pipeline();
                            //websocket 协定自身是基于 http 协定的,所以这边也要应用 http 解编码器
                            pipeline.addLast(new HttpServerCodec());
                            // 将 HTTP 音讯的多个部分合成一条残缺的 HTTP 音讯
                            pipeline.addLast(new HttpObjectAggregator(65536));
                            // 用于反对大数据流的反对
                            pipeline.addLast(new ChunkedWriteHandler());
                            // 用来判断是否读闲暇工夫过长,或写闲暇工夫过长
                            // 如果 60 秒中没有收到客户端音讯,则触发事件 IdleState.READER_IDLE
                            pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
                            pipeline.addLast(channelDeadCheckHandler);
                            pipeline.addLast(webSocketHandler);// 自定义音讯解决类
                            pipeline.addLast(heartbeatHandler);

                            pipeline.addLast(messageOutHandler);
                            pipeline.addLast(new WebSocketServerProtocolHandler("/webSocket", null, true, 65536 * 10));
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 1024)// 服务端承受连贯的队列长度,如果队列已满,客户端连贯将被回绝, linux 设置 /proc/sys/net/core/somaxconn
                    .childOption(ChannelOption.SO_KEEPALIVE, true);// 设置放弃流动的连贯
            ChannelFuture cf = bootstrap.bind().sync(); // 服务器异步创立绑定
            log.info("NettyServer 已启动...");
            cf.channel().closeFuture().sync(); // 敞开服务器通道} finally {group.shutdownGracefully().sync(); // 开释线程池资源
            bossGroup.shutdownGracefully().sync();
        }
    }
}

springcloud-gateway 我的项目中配置路由

spring.cloud.gateway:
  httpclient:
    #连贯超时工夫
    connect-timeout: 1000
    #响应超时工夫
    responseTimeout: 60s
  #默认过滤器,对所有的路由申请失效,order:按申明程序从 1 递增
  #当过滤器的 order 值一样时,会依照 defaultFilter> 路由过滤器 >GlobalFilter 的程序执行
  default-filters:
    #- StripPrefix=1 #去除申请门路的第一个前缀后拜访微服务
  #全局跨域解决
  globalcors:
    add-to-simple-url-handler-mapping: true
    cors-configurations:
      '[/**]':
        allowCredentials: true #是否容许携带 cookie
        allowedOriginPatterns: "*"
        allowedMethods: "*"
        allowedHeaders: "*"
        maxAge: 360000 #跨域检测有效期
  # 网关指定门路路由
  routes:
    - id: scp-im2-websocket-netty
      uri: lb:ws://scp-im2-websocket-netty
      predicates:
        - Path=/webSocket

而后,前端通过 ws://localhost:14052/webSocket 即可连贯 netty 集群中的服务

退出移动版