关于java:SpringBoot-nettysocketio实现服务器端消息推送

36次阅读

共计 6600 个字符,预计需要花费 17 分钟才能阅读完成。

首先:因为工作须要,须要对接 socket.io 框架对接,所以目前只能应用 netty-socketio。websocket 是不反对对接 socket.io 框架的。
netty-socketio 顾名思义他是一个底层基于 netty’实现的 socket。
在 springboot 我的项目中的集成,请看上面的代码

maven 依赖

<dependency>
    <groupId>com.corundumstudio.socketio</groupId>
    <artifactId>netty-socketio</artifactId>
    <version>1.7.11</version>
</dependency>

上面就是代码了
首先是配置参数

#socketio 配置
socketio:
  host: localhost
  port: 9099
  # 设置最大每帧解决数据的长度,避免别人利用大数据来攻打服务器
  maxFramePayloadLength: 1048576
  # 设置 http 交互最大内容长度
  maxHttpContentLength: 1048576
  # socket 连接数大小(如只监听一个端口 boss 线程组为 1 即可)bossCount: 1
  workCount: 100
  allowCustomRequests: true
  # 协定降级超时工夫(毫秒),默认 10 秒。HTTP 握手降级为 ws 协定超时工夫
  upgradeTimeout: 1000000
  # Ping 音讯超时工夫(毫秒),默认 60 秒,这个工夫距离内没有接管到心跳音讯就会发送超时事件
  pingTimeout: 6000000
  # Ping 音讯距离(毫秒),默认 25 秒。客户端向服务器发送一条心跳音讯距离
  pingInterval: 25000

下面的正文写的很分明。上面是 config 代码

import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.SocketConfig;
import com.corundumstudio.socketio.SocketIOServer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * kcm
 */
@Component
public class PushServer implements InitializingBean {

    @Autowired
    private EventListenner eventListenner;

    @Value("${socketio.port}")
    private int serverPort;

    @Value("${socketio.host}")
    private String serverHost;

    @Value("${socketio.bossCount}")
    private int bossCount;

    @Value("${socketio.workCount}")
    private int workCount;

    @Value("${socketio.allowCustomRequests}")
    private boolean allowCustomRequests;

    @Value("${socketio.upgradeTimeout}")
    private int upgradeTimeout;

    @Value("${socketio.pingTimeout}")
    private int pingTimeout;

    @Value("${socketio.pingInterval}")
    private int pingInterval;

    @Override
    public void afterPropertiesSet() throws Exception {Configuration config = new Configuration();
        config.setPort(serverPort);
        config.setHostname(serverHost);
        config.setBossThreads(bossCount);
        config.setWorkerThreads(workCount);
        config.setAllowCustomRequests(allowCustomRequests);
        config.setUpgradeTimeout(upgradeTimeout);
        config.setPingTimeout(pingTimeout);
        config.setPingInterval(pingInterval);

        SocketConfig socketConfig = new SocketConfig();
        socketConfig.setReuseAddress(true);
        socketConfig.setTcpNoDelay(true);
        socketConfig.setSoLinger(0);
        config.setSocketConfig(socketConfig);

        SocketIOServer server = new SocketIOServer(config);
        server.addListeners(eventListenner);
        server.start();
        System.out.println("启动失常");
    }
}

在就是监听代码

import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.annotation.OnConnect;
import com.corundumstudio.socketio.annotation.OnDisconnect;
import com.corundumstudio.socketio.annotation.OnEvent;
import org.apache.commons.lang3.StringUtils;
import org.bangying.auth.JwtSupport;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.UUID;

@Component
public class EventListenner {
    @Resource
    private ClientCache clientCache;

    @Resource
    private JwtSupport jwtSupport;

    /**
     * 客户端连贯
     *
     * @param client
     */
    @OnConnect
    public void onConnect(SocketIOClient client) {String userId = client.getHandshakeData().getSingleUrlParam("userId");
//        userId = jwtSupport.getApplicationUser().getId().toString();
//        userId = "8";
        UUID sessionId = client.getSessionId();
        clientCache.saveClient(userId, sessionId, client);
        System.out.println("建设连贯");
    }

    /**
     * 客户端断开
     *
     * @param client
     */
    @OnDisconnect
    public void onDisconnect(SocketIOClient client) {String userId = client.getHandshakeData().getSingleUrlParam("userId");
        if (StringUtils.isNotBlank(userId)) {clientCache.deleteSessionClient(userId, client.getSessionId());
            System.out.println("敞开连贯");
        }
    }

    // 音讯接管入口,当接管到音讯后,查找发送指标客户端,并且向该客户端发送音讯,且给本人发送音讯
    // 暂未应用
    @OnEvent("messageevent")
    public void onEvent(SocketIOClient client, AckRequest request) {}}

本地缓存信息

import com.corundumstudio.socketio.SocketIOClient;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * kcm
 */
@Component
public class ClientCache {

    // 本地缓存
    private static Map<String, HashMap<UUID, SocketIOClient>> concurrentHashMap=new ConcurrentHashMap<>();
    /**
     * 存入本地缓存
     * @param userId 用户 ID
     * @param sessionId 页面 sessionID
     * @param socketIOClient 页面对应的通道连贯信息
     */
    public void saveClient(String userId, UUID sessionId,SocketIOClient socketIOClient){if(StringUtils.isNotBlank(userId)){HashMap<UUID, SocketIOClient> sessionIdClientCache=concurrentHashMap.get(userId);
            if(sessionIdClientCache==null){sessionIdClientCache = new HashMap<>();
            }
            sessionIdClientCache.put(sessionId,socketIOClient);
            concurrentHashMap.put(userId,sessionIdClientCache);
        }
    }
    /**
     * 依据用户 ID 获取所有通道信息
     * @param userId
     * @return
     */
    public HashMap<UUID, SocketIOClient> getUserClient(String userId){return concurrentHashMap.get(userId);
    }
    /**
     * 依据用户 ID 及页面 sessionID 删除页面链接信息
     * @param userId
     * @param sessionId
     */
    public void deleteSessionClient(String userId,UUID sessionId){concurrentHashMap.get(userId).remove(sessionId);
    }
}

上面是存储客户端连贯信息

import com.corundumstudio.socketio.SocketIOClient;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * kcm
 */
@Component
public class ClientCache {

    // 本地缓存
    private static Map<String, HashMap<UUID, SocketIOClient>> concurrentHashMap=new ConcurrentHashMap<>();
    /**
     * 存入本地缓存
     * @param userId 用户 ID
     * @param sessionId 页面 sessionID
     * @param socketIOClient 页面对应的通道连贯信息
     */
    public void saveClient(String userId, UUID sessionId,SocketIOClient socketIOClient){if(StringUtils.isNotBlank(userId)){HashMap<UUID, SocketIOClient> sessionIdClientCache=concurrentHashMap.get(userId);
            if(sessionIdClientCache==null){sessionIdClientCache = new HashMap<>();
            }
            sessionIdClientCache.put(sessionId,socketIOClient);
            concurrentHashMap.put(userId,sessionIdClientCache);
        }
    }
    /**
     * 依据用户 ID 获取所有通道信息
     * @param userId
     * @return
     */
    public HashMap<UUID, SocketIOClient> getUserClient(String userId){return concurrentHashMap.get(userId);
    }
    /**
     * 依据用户 ID 及页面 sessionID 删除页面链接信息
     * @param userId
     * @param sessionId
     */
    public void deleteSessionClient(String userId,UUID sessionId){concurrentHashMap.get(userId).remove(sessionId);
    }
}

管制层推送办法

@RestController
@RequestMapping("/push")
public class PushController {
    @Resource
    private ClientCache clientCache;

    @Autowired
    private JwtSupport jwtSupport;

    @GetMapping("/message")
    public String pushTuUser(@Param("id") String id){Integer userId = jwtSupport.getApplicationUser().getId();
        HashMap<UUID, SocketIOClient> userClient = clientCache.getUserClient(String.valueOf(userId));
        userClient.forEach((uuid, socketIOClient) -> {
            // 向客户端推送音讯
            socketIOClient.sendEvent("chatevent","服务端推送音讯");
        });
        return "success";
    }
}

————————————————
版权申明:本文为 CSDN 博主「ATwill…」的原创文章,遵循 CC 4.0 BY-SA 版权协定,转载请附上原文出处链接及本申明。
原文链接:https://blog.csdn.net/kang649…

正文完
 0