消息重发中需要注意的问题
由于最近工作中接触了比较多关闭消息推送以及异常重发机制的问题,终于得空总结一下经验
目前接触的消息推送分为两种
主动推送:一般为 websocket 建立长连接实现,此处网上多有各种实现方式。下面贴出本人结合实际应用场景使用的长连接方式。
websocket 服务端代码
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
@ServerEndpoint(value = “/websocket/{id}”)
@Component
@Slf4j
public class WebSocket {
// 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
private static int onlineCount = 0;
// concurrent 包的线程安全 Set,用来存放每个客户端对应的 MyWebSocket 对象。
private static ConcurrentHashMap<String, WebSocket> webSocketSet = new ConcurrentHashMap<>();
// 保存允许建立连接的 id
private static List<String> idList = Lists.newArrayList();
private String id = “”;
/**
* 这里使用 AutoWired 注入的 bean 会出现无法持续保存而出现 null 的情况。
* 具体原因暂时没有深究,如果有需要时,可以再 init 初始化方法中手动将临时的 beanTmp 类存入 static 常量中即可正常使用该 bean 类。
* @Autowired
* private RedisCacheUtil redisTmp;
* private static RedisCacheUtil redis;
*
*/
// 与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
public void closeConn(String appId) {
// 关闭连接
try {
WebSocket socket = webSocketSet.get(appId);
if (null != socket) {
if (socket.session.isOpen()) {
socket.session.close();
}
}
} catch (IOException e) {
System.out.println(“IO 异常 ”);
e.printStackTrace();
}
idList.remove(appId);
}
/**
* 连接 / 注册时去重
*/
public void conn(String appId) {
// 去重
if (!idList.contains(appId)) {
idList.add(appId);
}
}
/**
* 获取注册在 websocket 进行连接的 id
*/
public static List<String> getIdList() {
return idList;
}
/**
* 初始化方法
* @author caoting
* @date 2019 年 2 月 13 日
*/
@PostConstruct
public void init() {
try {
/**
* TODO 这里的设计是在项目启动时从 DB 或者缓存中获取注册了允许建立连接的 id
* 然后将获取到的 id 存入内存 –idList
* // 从数据库获取 idList
* List<WsIds> ids = wsIdsServiceTmp.selectList(null);
*/
// TODO 初始化时将刚注入的对象进行静态保存
// redis = redisTmp;
} catch (Exception e) {
// TODO 项目启动错误信息
}
}
/**
* 连接启动时查询是否有滞留的新邮件提醒
* @param id
*
* @author caoting
* @throws IOException
* @date 2019 年 2 月 28 日
*/
private void selectOfflineMail(String id) throws IOException {
// 查询缓存中是否存在离线邮件消息
Jedis jedis = redis.getConnection();
try {
List<String> mails = jedis.lrange(Constant.MAIL_OFFLINE+id, 0, -1);
if (CommomUtil.isNotEmpty(mails)) {
for (String mailuuid : mails) {
String mail = jedis.get(mailuuid);
if (StringUtils.isNotEmpty(mail))
sendToUser(Constant.MESSAGE_MAIL + mail, id);
Thread.sleep(1000);
}
// 发送完成从缓存中移除
jedis.del(Constant.MAIL_OFFLINE+id);
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
jedis.close();
}
}
/**
* 连接建立成功调用的方法
* @param id
*/
@OnOpen
public void onOpen(@PathParam(value = “id”) String id, Session session) {
try {
// 注:ws-admin 是管理员内部使用通道 不受监控 谨慎使用
if (!id.contains(Constant.WS_ADMIN)) {
this.session = session;
this.id = id;// 接收到发送消息的人员编号
// 验证 id 是否在允许
if (idList.contains(id)) {
// 判断是否已存在相同 id
WebSocket socket = webSocketSet.get(id);
if (socket == null) {
webSocketSet.put(id, this); // 加入 set 中
addOnlineCount(); // 在线数加 1
this.sendMessage(“Hello:::” + id);
System.out.println(“ 用户 ”+id+” 加入!当前在线人数为 ” + getOnlineCount());
// 检查是否存在离线推送消息
selectOfflineMail(id);
} else {
this.sendMessage(Constant.MESSAGE_ERROR+” 连接 id 重复 – 连接即将关闭 ”);
this.session.close();
}
} else {
// 查询数据库中是否存在数据
WsIds wsIds = wsIdsService.selectByAppId(id);
if (null != wsIds) {
idList.add(id);
webSocketSet.put(id, this); // 加入 set 中
addOnlineCount(); // 在线数加 1
this.sendMessage(“Hello:::” + id);
log.debug(“ 用户 ”+id+” 加入!当前在线人数为 ” + getOnlineCount());
// 检查是否存在离线推送消息
selectOfflineMail(id);
} else {
// 关闭
this.sendMessage(Constant.MESSAGE_ERROR+” 暂无连接权限, 连接即将关闭, 请确认连接申请是否过期!”);
this.session.close();
log.warn(“ 有异常应用尝试与服务器进行长连接 使用 id 为:”+id);
}
}
} else {
this.session = session;
this.id = id;// 接收到发送消息的人员编号
webSocketSet.put(id, this); // 加入 set 中
addOnlineCount(); // 在线数加 1
this.sendMessage(“Hello:::” + id);
log.debug(“ 用户 ”+id+” 加入!当前在线人数为 ” + getOnlineCount());
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketSet.remove(this.id); // 从 set 中删除
subOnlineCount(); // 在线数减 1
log.debug(“ 有一连接关闭!当前在线人数为 ” + getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
*
* @param message
* 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
log.debug(“ 来自客户端的消息:” + message);
// TODO 收到客户端消息后的操作
}
/**
* 发生错误时调用
*/
@OnError
public void onError(Session session, Throwable error) {
log.debug(“ 发生错误 ”);
error.printStackTrace();
}
public void sendMessage(String message) throws IOException {
this.session.getAsyncRemote().sendText(message);
}
/**
* 发送信息给指定 ID 用户,如果用户不在线则返回不在线信息给自己
* @param message
* @param sendUserId
* @throws IOException
*/
public Boolean sendToUser(String message, String sendUserId) throws IOException {
Boolean flag = true;
WebSocket socket = webSocketSet.get(sendUserId);
if (socket != null) {
try {
if (socket.session.isOpen()) {
socket.sendMessage(message);
} else {
flag = false;
}
} catch (Exception e) {
flag = false;
e.printStackTrace();
}
} else {
flag = false;
log.warn(“【” + sendUserId + “】该用户不在线 ”);
}
return flag;
}
/**
* 群发自定义消息
*/
public void sendToAll(String message) throws IOException {
for (String key : webSocketSet.keySet()) {
try {
WebSocket socket = webSocketSet.get(key);
if (socket.session.isOpen()) {
socket.sendMessage(message);
}
} catch (IOException e) {
continue;
}
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocket.onlineCount++;
}
public static synchronized void subOnlineCount() {
if (WebSocket.onlineCount > 0)
WebSocket.onlineCount–;
}
}
这里使用的是较为原始的 websocket 连接方式,事实上 springboot 已经融合了 websocket,工作关系没有空暂未研究。记录一下有空了再去写写 demo。这个 socket 服务端主要实现了:1. 连接控制,建立连接时验证 id 的合法性。无证连接进行异常记录并关闭连接。2. 离线消息检测到上线立即推送 这是消息推送需要实现的基本功能之一了,详见代码。3. 统计在线人数 依旧是基本功能
下面是 websocket 服务端配置类 WebSocketServerConfig
import lombok.extern.slf4j.Slf4j;
import org.apache.catalina.session.StandardSessionFacade;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import javax.servlet.http.HttpSession;
import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpointConfig;
import javax.websocket.server.ServerEndpointConfig.Configurator;
@Configuration
@Slf4j
public class WebSocketServerConfig extends Configurator {
@Override
public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
/* 如果没有监听器, 那么这里获取到的 HttpSession 是 null */
StandardSessionFacade ssf = (StandardSessionFacade) request.getHttpSession();
if (ssf != null) {
HttpSession httpSession = (HttpSession) request.getHttpSession();
// 关键操作
sec.getUserProperties().put(“sessionId”, httpSession.getId());
log.debug(“ 获取到的 SessionID:” + httpSession.getId());
}
}
/**
* 如果使用独立的 servlet 容器,而不是直接使用 springboot 的内置容器
* 就不要注入 ServerEndpointExporter,因为它将由容器自己提供和管理。
* 即:生产环境中在独立的 tomcat 运行时请注释掉这个 bean
*
* @return
*
* @author caoting
* @date 2019 年 2 月 20 日
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
这里其实有个坑,就是上述代码中的 bean 类 serverEndpointExporter,开发环境如果不是配置独立的 tomcat 运行的话是需要注入的,但是生产环境下在独立的 tomcat 容器运行时是需要注释掉的,否则会报错。
很重要的 session 监听器 RequestListener
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.servlet.ServletRequestEvent;
import javax.servlet.ServletRequestListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
/**
* 监听器类: 主要任务是用 ServletRequest 将我们的 HttpSession 携带过去
* 此注解千万千万不要忘记,它的主要作用就是将这个监听器纳入到 Spring 容器中进行管理, 相当于注册监听
*/
@Component
@Slf4j
public class RequestListener implements ServletRequestListener {
@Override
public void requestInitialized(ServletRequestEvent sre) {
// 将所有 request 请求都携带上 httpSession
HttpSession httpSession = ((HttpServletRequest) sre.getServletRequest()).getSession();
log.debug(“ 将所有 request 请求都携带上 httpSession ” + httpSession.getId());
}
public RequestListener() {
}
@Override
public void requestDestroyed(ServletRequestEvent arg0) {
}
}
以上就是一个 websocket 服务端需要的所有配置和类
websocket 客户端代码
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.curator.shaded.com.google.common.collect.Maps;
import redis.clients.jedis.Jedis;
import javax.websocket.*;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
/**
* @author caoting
* @date 2018 年 9 月 27 日
*/
@Slf4j
@ClientEndpoint
public class MailWebSocketClient {
private static RedisCacheUtil redis;
protected void setRedis(RedisCacheUtil redisTmp) {
redis = redisTmp;
}
/**
* @author caoting
* @date 2019 年 3 月 11 日
*/
public static void doSomething() {
// TODO 由于这个类没有写初始化方法,但是有些初始化操作必须完成,
// 因此在 socket 配置类中调用此方法可以完成一些需要初始化注入的操作
}
private Session session;
@OnOpen
public void open(Session session) {
log.info(“ 连接开启 …”);
this.session = session;
}
@OnMessage
public void onMessage(String message) {
log.info(“ 来自服务端的消息: ” + message);
// TODO 对消息进行过滤判断处理
// 不做过多操作影响性能 直接交给异步任务处理 – 这个办法还是比较 low 的现在 springboot 有更好的解决办法 @Async 有空再记录下多线程异步处理任务调度的相关代码。
ExecutorService executor = Executors.newSingleThreadExecutor();
FutureTask<Boolean> future = new FutureTask<Boolean>(new Callable<Boolean>() {// 使用 Callable 接口作为构造参数
public Boolean call() {
return pushMsg(message);
}
});
executor.execute(future);
Boolean res = CommomUtil.timeOutTask(future, executor, 600);
if (res != null && res)
log.info(“ 操作成功 ”);
else
log.info(“ 操作失败 ”);
}
/**
* @author caoting
* @date 2019 年 3 月 11 日
*/
private Boolean pushMailMsg(String message) {
Boolean flag = true;
// 推送消息
ReceiverRes resObj = new ReceiverRes();
try {
resObj = restTemplate.httpPostMediaTypeJson(url, ReceiverRes.class, message);
} catch (Exception e) {
// 这里异常一般是 http 接口服务宕机了,所以放进缓存在对方上线时进行重新推送
resObj.setCode(500);
log.error(e.getMessage(), e);
}
// ==== 推送完成后的后续异常检查与数据重发工作 这里是一个 redis 任务调度 处理失败任务的典型案例 看不懂就删掉
Integer code = resObj.getCode();
if (code == 500) {
// 发送失败存进 redis 缓存 按照约定好的状态码进行判断
jedis.lpush(Constant.PUSH_ERROR, mailMapJson);
} else {
// 发送成功以后查询以前出错的数据进行重新推送。– 这种办法只适合消息很频繁的,毕竟不频繁的等下次发消息又不知道是何时了,因此需要采用别的方法
while (true) {
// 查询以往的异常发送数据 重新发送
String jsonMap = jedis.rpoplpush(Constant.PUSH_ERROR, Constant.PUSH_ERROR_TMP);
if (StringUtils.isEmpty(jsonMap)) {
break;
}
try {
errObj = restTemplate.httpPostMediaTypeJson(receiverUrl, ReceiverRes.class, message);
} catch (Exception e) {
errObj.setCode(500);
log.error(e.getMessage(), e);
}
if (errObj.getCode() == 500) {
// 再次失败 弹回原队列
jedis.rpoplpush(Constant.PUSH_ERROR_TMP, Constant.PUSH_ERROR);
} else {
jedis.rpop(Constant.PUSH_ERROR_TMP);
}
}
}
return flag;
}
@OnClose
public void onClose() {
log.info(“ 长连接关闭 …”);
}
@OnError
public void onError(Session session, Throwable t) {
t.printStackTrace();
}
public void send(String message) {
this.session.getAsyncRemote().sendText(message);
}
public void close() throws IOException {
if (this.session.isOpen()) {
this.session.close();
}
}
}
上面是 websocket 客户端的代码。其中主要有:1、http 推送失败重发机制 2、redis 任务调度经典案例
websocket 客户端配置类 WebSocketConfig
import com.hnpolice.business.service.ApplicationService;
import com.hnpolice.sso.common.ex.BaseException;
import com.hnpolice.sso.common.utils.RedisCacheUtil;
import com.hnpolice.sync.RestTemplateFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import javax.websocket.ContainerProvider;
import javax.websocket.WebSocketContainer;
import java.net.URI;
@Slf4j
@Component
public class WebSocketConfig implements ApplicationRunner {
@Autowired
private RedisCacheUtil redisTmp;
private static Boolean isOk;
private MailWebSocketClient client;
private static WebSocketContainer conmtainer = ContainerProvider.getWebSocketContainer();
@Override
public void run(ApplicationArguments args) throws Exception {
// 跟随项目启动的方法可以在这里做一些初始化工作
// websocket 客户端初始化
wsClientInit();
}
public void wsClientInit() {
try {
client = new MailWebSocketClient();
client.setRedis(redisTmp);
MailWebSocketClient.dosomething();
conmtainer.connectToServer(client, new URI(##socket 服务连接地址 ##));
isOk = true;
} catch (Exception e) {
isOk = false;
log.error(e);
}
// 断线重连
while (true) {
if (isOk != null && isOk) {
try {
client.send(“ping:”+appId);
} catch (Exception e) {
isOk = false;
}
}
else {
// 系统连接失败进行重试
log.warn(“ 系统连接失败,正在重连 …”);
try {
client.send(“ping:”+appId);
log.warn(“ 系统重连成功!”);
isOk = true;
} catch (Exception e) {
try {
client = new MailWebSocketClient();
conmtainer.connectToServer(client, new URI(mailUrl));
isOk = true;
} catch (Exception e1) {
isOk = false;
}
if (isOk != null && isOk) {
log.warn(“ 系统重连成功!”);
}
}
}
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
log.error(BaseException.collectExceptionStackMsg(e));
e.printStackTrace();
}
}
}
}
这是 websocket 客户端的配置类,实现 ApplicationRunner 接口是为了在项目启动时完成一些初始化工作,并非必须。主要功能:1、协助 websocketCient 进行初始化,2、心跳包检测,断线自动重连
消息推送的第二种方式在下篇中再编写