首先:因为工作需要,需要对接socket.io框架,所以目前只能使用netty-socketio。websocket是不支持对接socket.io框架的。
netty-socketio顾名思义,它是一个底层基于netty实现的socket.io服务端。
在springboot项目中的集成,请看下面的代码
maven依赖
<dependency> <groupId>com.corundumstudio.socketio</groupId> <artifactId>netty-socketio</artifactId> <version>1.7.11</version></dependency>
下面就是代码了
首先是配置参数
# socket.io server settings
socketio:
  host: localhost
  port: 9099
  # Maximum payload length handled per frame; prevents others from attacking the server with oversized data
  maxFramePayloadLength: 1048576
  # Maximum content length of an HTTP exchange
  maxHttpContentLength: 1048576
  # Thread counts for socket connections (a boss thread group of 1 is enough when only one port is listened on)
  bossCount: 1
  workCount: 100
  allowCustomRequests: true
  # Protocol upgrade timeout in milliseconds (default 10 s): time allowed for the HTTP handshake to upgrade to ws
  upgradeTimeout: 1000000
  # Ping timeout in milliseconds (default 60 s): a timeout event is fired when no heartbeat arrives within this interval
  pingTimeout: 6000000
  # Ping interval in milliseconds (default 25 s): how often the client sends a heartbeat to the server
  pingInterval: 25000
上面的注释写得很清楚。下面是config代码
import com.corundumstudio.socketio.Configuration;import com.corundumstudio.socketio.SocketConfig;import com.corundumstudio.socketio.SocketIOServer;import org.springframework.beans.factory.InitializingBean;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;import javax.annotation.Resource;/** * kcm */@Componentpublic class PushServer implements InitializingBean { @Autowired private EventListenner eventListenner; @Value("${socketio.port}") private int serverPort; @Value("${socketio.host}") private String serverHost; @Value("${socketio.bossCount}") private int bossCount; @Value("${socketio.workCount}") private int workCount; @Value("${socketio.allowCustomRequests}") private boolean allowCustomRequests; @Value("${socketio.upgradeTimeout}") private int upgradeTimeout; @Value("${socketio.pingTimeout}") private int pingTimeout; @Value("${socketio.pingInterval}") private int pingInterval; @Override public void afterPropertiesSet() throws Exception { Configuration config = new Configuration(); config.setPort(serverPort); config.setHostname(serverHost); config.setBossThreads(bossCount); config.setWorkerThreads(workCount); config.setAllowCustomRequests(allowCustomRequests); config.setUpgradeTimeout(upgradeTimeout); config.setPingTimeout(pingTimeout); config.setPingInterval(pingInterval); SocketConfig socketConfig = new SocketConfig(); socketConfig.setReuseAddress(true); socketConfig.setTcpNoDelay(true); socketConfig.setSoLinger(0); config.setSocketConfig(socketConfig); SocketIOServer server = new SocketIOServer(config); server.addListeners(eventListenner); server.start(); System.out.println("启动失常"); }}
再就是监听代码
import com.corundumstudio.socketio.AckRequest;import com.corundumstudio.socketio.SocketIOClient;import com.corundumstudio.socketio.annotation.OnConnect;import com.corundumstudio.socketio.annotation.OnDisconnect;import com.corundumstudio.socketio.annotation.OnEvent;import org.apache.commons.lang3.StringUtils;import org.bangying.auth.JwtSupport;import org.springframework.stereotype.Component;import javax.annotation.Resource;import java.util.UUID;@Componentpublic class EventListenner { @Resource private ClientCache clientCache; @Resource private JwtSupport jwtSupport; /** * 客户端连贯 * * @param client */ @OnConnect public void onConnect(SocketIOClient client) { String userId = client.getHandshakeData().getSingleUrlParam("userId");// userId = jwtSupport.getApplicationUser().getId().toString();// userId = "8"; UUID sessionId = client.getSessionId(); clientCache.saveClient(userId, sessionId, client); System.out.println("建设连贯"); } /** * 客户端断开 * * @param client */ @OnDisconnect public void onDisconnect(SocketIOClient client) { String userId = client.getHandshakeData().getSingleUrlParam("userId"); if (StringUtils.isNotBlank(userId)) { clientCache.deleteSessionClient(userId, client.getSessionId()); System.out.println("敞开连贯"); } } //音讯接管入口,当接管到音讯后,查找发送指标客户端,并且向该客户端发送音讯,且给本人发送音讯 // 暂未应用 @OnEvent("messageevent") public void onEvent(SocketIOClient client, AckRequest request) { }}
本地缓存信息
import com.corundumstudio.socketio.SocketIOClient;import org.apache.commons.lang3.StringUtils;import org.springframework.stereotype.Component;import java.util.HashMap;import java.util.Map;import java.util.UUID;import java.util.concurrent.ConcurrentHashMap;/** * kcm */@Componentpublic class ClientCache { //本地缓存 private static Map<String, HashMap<UUID, SocketIOClient>> concurrentHashMap=new ConcurrentHashMap<>(); /** * 存入本地缓存 * @param userId 用户ID * @param sessionId 页面sessionID * @param socketIOClient 页面对应的通道连贯信息 */ public void saveClient(String userId, UUID sessionId,SocketIOClient socketIOClient){ if(StringUtils.isNotBlank(userId)){ HashMap<UUID, SocketIOClient> sessionIdClientCache=concurrentHashMap.get(userId); if(sessionIdClientCache==null){ sessionIdClientCache = new HashMap<>(); } sessionIdClientCache.put(sessionId,socketIOClient); concurrentHashMap.put(userId,sessionIdClientCache); } } /** * 依据用户ID获取所有通道信息 * @param userId * @return */ public HashMap<UUID, SocketIOClient> getUserClient(String userId){ return concurrentHashMap.get(userId); } /** * 依据用户ID及页面sessionID删除页面链接信息 * @param userId * @param sessionId */ public void deleteSessionClient(String userId,UUID sessionId){ concurrentHashMap.get(userId).remove(sessionId); }}
下面是存储客户端连接信息
import com.corundumstudio.socketio.SocketIOClient;import org.apache.commons.lang3.StringUtils;import org.springframework.stereotype.Component;import java.util.HashMap;import java.util.Map;import java.util.UUID;import java.util.concurrent.ConcurrentHashMap;/** * kcm */@Componentpublic class ClientCache { //本地缓存 private static Map<String, HashMap<UUID, SocketIOClient>> concurrentHashMap=new ConcurrentHashMap<>(); /** * 存入本地缓存 * @param userId 用户ID * @param sessionId 页面sessionID * @param socketIOClient 页面对应的通道连贯信息 */ public void saveClient(String userId, UUID sessionId,SocketIOClient socketIOClient){ if(StringUtils.isNotBlank(userId)){ HashMap<UUID, SocketIOClient> sessionIdClientCache=concurrentHashMap.get(userId); if(sessionIdClientCache==null){ sessionIdClientCache = new HashMap<>(); } sessionIdClientCache.put(sessionId,socketIOClient); concurrentHashMap.put(userId,sessionIdClientCache); } } /** * 依据用户ID获取所有通道信息 * @param userId * @return */ public HashMap<UUID, SocketIOClient> getUserClient(String userId){ return concurrentHashMap.get(userId); } /** * 依据用户ID及页面sessionID删除页面链接信息 * @param userId * @param sessionId */ public void deleteSessionClient(String userId,UUID sessionId){ concurrentHashMap.get(userId).remove(sessionId); }}
控制层推送方法
@RestController@RequestMapping("/push")public class PushController { @Resource private ClientCache clientCache; @Autowired private JwtSupport jwtSupport; @GetMapping("/message") public String pushTuUser(@Param("id") String id){ Integer userId = jwtSupport.getApplicationUser().getId(); HashMap<UUID, SocketIOClient> userClient = clientCache.getUserClient(String.valueOf(userId)); userClient.forEach((uuid, socketIOClient) -> { //向客户端推送音讯 socketIOClient.sendEvent("chatevent","服务端推送音讯"); }); return "success"; }}
————————————————
版权声明:本文为CSDN博主「ATwill...」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/kang649...