关于springcloud:Spring-Cloud项目优化如何确保Redis延迟队列中数据能被正确消费

81次阅读

共计 9078 个字符,预计需要花费 23 分钟才能阅读完成。

前言:无论在哪个我的项目,应用提早队列都须要很明确你应用它的意义以及音讯执行的程序,并且你还须要思考如何确保数据可能正确被解决而不会失落,在进行梳理过程中我就发现了一个破绽会造成数据失落,所以在这里我独自写一篇文章来阐明一下这个破绽及优化策略,如果你有更好的优化策略欢送私信博主。如果你想要一个能够零碎学习的网站,那么我举荐的是牛客网,个人感觉用着还是不错的,页面很整洁,而且内容也很全面,语法练习,算法题练习,面试常识汇总等等都有,论坛也很沉闷,传送门链接:牛客刷题神器目录一:回顾梳理 1. 流程图实现后面的操作之后,应用提早队列定时提交文章性能就算实现了。在测试过程中我发现一个问题,尽管数据做了长久化解决,然而当每次生产工作之后数据库中该条数据也会随之被清理掉,这时候还会存在数据失落的危险。为什么这么说呢,咱们的定时公布是依照上面的流程图进行的:

2. 代码对应的代码如下:@Autowired
private WmNewsAutoScanServiceImpl wmNewsAutoScanService;

/**

 * 生产提早队列数据
 */

@Scheduled(fixedRate = 1000)
@Override
@SneakyThrows
public void scanNewsByTask() {

ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
if(responseResult.getCode().equals(200) && responseResult.getData() != null){log.info("文章审核 --- 生产工作执行 ---begin---");

    String json_str = JSON.toJSONString(responseResult.getData());
    Task task = JSON.parseObject(json_str, Task.class);
    byte[] parameters = task.getParameters();
    WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class);
    System.out.println(wmNews.getId()+"-----------");
    wmNewsAutoScanService.autoScanWmNews(wmNews.getId());

    log.info("文章审核 --- 生产工作执行 ---end---");

}

}/**

 * 删除工作,更新日志
 * @param taskId
 * @param status
 * @return
 */
private Task UpdateDb(long taskId, int status) {
    Task task = null;
    try {
        //1. 删除工作
        log.info("删除数据库中的工作...");
        taskInfoMapper.deleteById(taskId);

        //2. 更新日志
        log.info("更新工作日志...");
        TaskinfoLogs taskinfoLogs = taskInfoLogsMapper.selectById(taskId);
        taskinfoLogs.setStatus(status);
        taskInfoLogsMapper.updateById(taskinfoLogs);

        //3. 设置返回值
        task = new Task();
        BeanUtils.copyProperties(taskinfoLogs,task);
        task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
    } catch (BeansException e) {throw new RuntimeException(e);
    }

    return task;
}

/**
 * 生产工作
 * @param type  工作类型
 * @param priority 工作优先级
 * @return  Task
 */
@Override
public Task poll(int type, int priority) {
    Task task = null;
    try {
        String key = type + "_" + priority;

        String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
        if(StringUtils.isNotBlank(task_json)) {task = JSON.parseObject(task_json,Task.class);
            // 更新数据库
            UpdateDb(task.getTaskId(),ScheduleConstants.EXECUTED);
        }
    } catch (Exception e) {e.printStackTrace();
        log.error("poll task exception");
    }
    return task;
} 能够看到在 redis 中获取数据之后便将数据从数据库中删除,这时候如果前面的审核流程呈现问题或者保留文章时候挪动端微服务呈现故障导致文章不能保留,而这时候数据库中及 redis 中的数据都删除了,这就造成了数据的失落。二:第一次优化 1. 优化策略首先我想到的优化策略是当检测到文章审核或者文章保留值挪动端有异样时候就将曾经出队列的数据从新放回队列并且在 5 分钟之后再进行生产直到生产胜利,流程图见下图:

2. 代码实现 WmAutoScanServiceImplpackage com.my.wemedia.service.impl;

@Slf4j
@Service
@Transactional
public class WmAutoScanServiceImpl implements WmAutoScanService {

@Autowired
private WmNewsService wmNewsService;
@Autowired
private TextDetection textDetection;
@Autowired
private ImageDetection imageDetection;

/**
 * 主动审核文章文本及图片
 * @param id
 */
@Override
@Async
public void AutoScanTextAndImage(Integer id) throws Exception {log.info("开始进行文章审核...");
    // Thread.sleep(300);    // 休眠 300 毫秒,以保障可能获取到数据库中的数据
    WmNews wmNews = wmNewsService.getById(id);
    if(wmNews == null) {throw new RuntimeException("WmAutoScanServiceImpl- 文章信息不存在");
    }

    if(wmNews.getStatus().equals(WmNews.Status.SUBMIT.getCode())) {
        // 两头步骤省略

        //4,审核胜利
        //4.1 保留文章
        log.info("检测到文章无违规内容");
        ResponseResult responseResult = saveAppArticle(wmNews);
        if(!responseResult.getCode().equals(200)) {throw new RuntimeException("AutoScanTextAndImage-- 检测失败");
        }

    }
}


/**
 * 保留 App 端文章数据
 * @param wmNews
 * @return
 */
@Override
public ResponseResult saveAppArticle(WmNews wmNews) {
    // 两头步骤省略

    //7. 保留 App 端文章
    log.info("异步调用保留文章至 App 端");
    ResponseResult responseResult = null;
    try {responseResult = iArticleClient.saveArticle(articleDto);
    } catch (Exception e) {responseResult = new ResponseResult(AppHttpCodeEnum.SERVER_ERROR.getCode(),"保留文章至 App 失败");
    }

    return responseResult;
}

}WmNewsTaskServiceImpl@Autowired

private WmAutoScanService wmAutoScanService;
@Autowired
private CacheService cacheService;
/**
 * 生产工作
 */
@Override
@Scheduled(fixedRate = 2000)  // 每两秒执行一次
public void scanNewsByTask() {ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
    if(responseResult.getCode().equals(200) && responseResult.getData() != null) {log.info("文章审核 --- 生产工作执行 ---begin");

        String json_str = JSON.toJSONString(responseResult.getData());
        Task task = JSON.parseObject(json_str,Task.class);
        byte[] parameters = task.getParameters();
        // 反序列化
        WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class);

        try {wmAutoScanService.AutoScanTextAndImage(wmNews.getId());
        } catch (Exception e) {log.warn("审核失败,将于 5 分钟之后再次尝试");
            // 文章未能胜利审核,将数据退出 ZSet,5 分钟之后再从新尝试
            //1. 结构 key
            String key = task.getTaskType() + "_" + task.getPriority();
            //2. 获取 5 分钟之后的工夫
            Calendar calendar = Calendar.getInstance();
            calendar.add(Calendar.MINUTE,5);
            long nextScheduleTime = calendar.getTimeInMillis();

            //3. 将数据重新加入
            cacheService.zAdd(ScheduleConstants.FUTURE + key,JSON.toJSONString(task),nextScheduleTime);
            e.printStackTrace();}

        log.info("文章审核 --- 生产工作执行 ---end");
    }3. 遇到的问题这时候尽管做了优化,然而在理论运行时候 WmNewsTaskServiceImpl 中的 try-catch 并没有发挥作用,我没有开启挪动端微服务,在审核时候也抛出了异样,然而这时候 WmNewsTaskServiceImpl 中并没有 catch 到这个异样。这是为什么呢,通过断点发现在执行到 AutoScanTextAndImage 办法时候,程序便间接跳出了 catch 块,这时候便想到后面咱们还没有引入提早队列时候用的是异步办法来调用文本图片审核办法来进行审核,而且过后还呈现了在数据库中获取不到数据的问题,当初想起这就是因为采纳了异步调用才引起的。在这里也一样,因为在 AutoScanTextAndImage 办法后面增加了 @Async 注解,表明这是一个异步办法,这时候咱们在 WmNewsTaskServiceImpl 中调用该办法时候是异步执行的,这当然就捕捉不到抛出的异样,因须要将该办法的 @Async 注解去掉,改成同步调用。三:第二次优化 1. 问题引入问题一:解决完下面的问题,看着仿佛问题失去了解决,音讯没有正确被生产时候会被从新投递回去直到被正确生产,然而这时候还应该留神到另外一个问题。尽管音讯生产失败之后被从新投递了,然而这时候数据库中的数据曾经被删除掉了,如果 redis 服务器呈现了问题,这时候就算你采纳了重回队列策略数据还是永恒失落了,因为你的长久化解决在这时候曾经生效了。这时候能够思考失败之后将数据再存回数据库中,这样再次做了长久化解决,然而这样显然会造成不必要的 IO 操作。问题二:后面的做法是将审核的办法由异步改成了同步,这时候因为调用的是第三方的审核接口,有时候难免会因为网络等起因造成审核工夫很长,这时候如果采纳同步策略,就会造成长时间阻塞,影响用户体验,同时也会节约大量资源。2. 解决策略针对问题一,我认为在一开始对队列音讯进行生产时候就不应该立马删除数据库中的数据,而是等到最初确保音讯被正确处理之后再删除数据库中相应的数据。针对问题二,我认为不应该将审核办法由异步改成同步,然而这时候就会呈现后面提到的问题 ----catch 不到异样,这时候仿佛又回到了终点。试想,咱们应用这个异样捕捉的目标是什么?咱们的目标就是为了出现异常时候将音讯从新入队避免数据失落,这时候咱们无妨换一种策略,后面提到了咱们在确保音讯被正确生产之后再删除数据库中的数据,这不就曾经解决了问题了吗?咱们会定时同步数据库中的数据到 redis,这时候就算音讯在 redis 中失落了也没关系,只有数据库中的数据还在就行。流程图见下图:

3. 代码实现①在 tbug-headlines-feign-api 模块的 IScheduleClient 接口中增加如下内容:/**

 * 删除数据库中的工作,更新日志
 * @param taskId
 */
@DeleteMapping("/api/v1/task/delete/{taskId}/{status}")
void updateDb(@PathVariable("taskId") Long taskId, @PathVariable("status") Integer status);②在 package com.my.schedule.feign 中更新该接口的实现类 /**
 * 删除数据库中工作,更新日志
 * @param taskId
 */
@Override
@DeleteMapping("/api/v1/task/delete/{taskId}/{status}")
public void updateDb(@PathVariable("taskId")Long taskId, @PathVariable("status") Integer status) {taskService.UpdateDb(taskId,status);
}③在 TaskService 中减少 UpdateDb(long taskId, int status) 办法,将实现类中该办法权限设置为 publicTaskService/**
 * 删除数据库工作并更新日志
 * @param taskId
 * @param status
 * @return
 */
Task UpdateDb(long taskId, int status);Impl/**
 * 删除工作,更新日志
 * @param taskId
 * @param status
 * @return
 */
public Task UpdateDb(long taskId, int status) {
    Task task = null;
    try {
        //1. 删除工作
        log.info("删除数据库中的工作...");
        taskInfoMapper.deleteById(taskId);

        //2. 更新日志
        log.info("更新工作日志...");
        TaskinfoLogs taskinfoLogs = taskInfoLogsMapper.selectById(taskId);
        taskinfoLogs.setStatus(status);
        taskInfoLogsMapper.updateById(taskinfoLogs);

        //3. 设置返回值
        task = new Task();
        BeanUtils.copyProperties(taskinfoLogs,task);
        task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
    } catch (BeansException e) {throw new RuntimeException(e);
    }

    return task;
}④批改 taskServiceImpl 中的 poll 办法 /**
 * 生产工作
 * @param type  工作类型
 * @param priority 工作优先级
 * @return  Task
 */
@Override
public Task poll(int type, int priority) {
    Task task = null;
    try {
        String key = type + "_" + priority;

        String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
        if(StringUtils.isNotBlank(task_json)) {task = JSON.parseObject(task_json,Task.class);
            // 更新数据库   (摈弃该策略)
            // UpdateDb(task.getTaskId(),ScheduleConstants.EXECUTED);

            // 接口幂等性
            TaskinfoLogs taskinfoLogs = taskInfoLogsMapper.selectById(task.getTaskId());
            // 获取工作状态
            if(taskinfoLogs != null) {Integer status = taskinfoLogs.getStatus();
                if(ScheduleConstants.EXECUTED == status) {return null;}
            }
        }
    } catch (Exception e) {e.printStackTrace();
        log.error("poll task exception");
    }
    return task;
} 这里次要批改了两点:摈弃原来间接调用办法删除数据库中的工作的策略减少接口幂等性,如果该工作曾经被胜利执行然而并没有在数据库中删除该工作,那么第二次执行该工作时候如果判断到该工作曾经执行过则间接返回 null 不做解决。⑤批改主动审核办法 @Autowired
private IScheduleClient scheduleClient;

/**
 * 主动审核文章文本及图片
 * @param id
 */
@Override
@Async
public void AutoScanTextAndImage(Integer id,Long taskId) throws Exception {log.info("开始进行文章审核...");
    // Thread.sleep(300);    // 休眠 300 毫秒,以保障可能获取到数据库中的数据
    WmNews wmNews = wmNewsService.getById(id);
    if(wmNews == null) {throw new RuntimeException("WmAutoScanServiceImpl- 文章信息不存在");
    }

    if(wmNews.getStatus().equals(WmNews.Status.SUBMIT.getCode())) {
        //1. 提取文章文本及图片
        Map<String,Object> map = getTextAndImages(wmNews);

        //2. 检测文本
        //2.1 提取文本
        String content = ((StringBuilder) map.get("content")).toString();
        //2.2 自治理敏感词过滤
        boolean SHandleResult = handleSensitiveScan(content,wmNews);
        if(!SHandleResult) return;
        //2.3 调用腾讯云进行文本检测
        Boolean THandleResult = handleTextScan(content, wmNews);
        if(!THandleResult) return;

        //3. 检测图片
        //3.1 提取图片
        List<String> imageUrl = (List<String>) map.get("images");
        //3.2 调用腾讯云对图片进行检测
        Boolean IHandleResult = handleImageScan(imageUrl, wmNews);
        if(!IHandleResult) return;

        //4,审核胜利
        //4.1 保留文章
        log.info("检测到文章无违规内容");
        ResponseResult responseResult = saveAppArticle(wmNews);
        if(!responseResult.getCode().equals(200)) {throw new RuntimeException("AutoScanTextAndImage-- 检测失败");
        }

        //4.2 回填 article_id
        wmNews.setArticleId((Long) responseResult.getData());
        wmNews.setStatus(WmNews.Status.PUBLISHED.getCode());
        wmNews.setReason("审核胜利");
        wmNewsService.updateById(wmNews);

        // 删除数据库中的工作并更新日志
        scheduleClient.updateDb(taskId, ScheduleConstants.EXECUTED);
    }
} 至此对于提早工作的数据优化局部就实现了,后续有什么问题会再进行优化。

正文完
 0