之前我的项目中遇到的问题:须要对单日告警量超过四百万条的数据进行逐条剖析和逻辑解决,导致靠繁多过程跑数据基本跑不完,无奈满足零碎和业务要求,因而决定采纳开多线程形式实现大数据处理。
数据处理流程:
1、申明一个内存队列
2、从库中轮巡取数据放入内存队列
3、开多个线程逐条取内存队列中的数据,剖析后在库中对该条数据打上标识,下次不取了。
main办法程序入口
/** * */package cn.com.starit.main;import org.apache.log4j.Logger;import cn.com.starit.ge.persistence.cache.AllCacheRefresh;/** * ================================================= <br> * 工程:GessAlarmAnalysis <br> * 类名:Main <br> * 作者:xt<br> * 工夫:2019-10-08上午02:31:17<br> * 版本:Version 1.0 <br><br> * 形容:告警解析入口<br> * ================================================= <br> */public class Main { private static final Logger logger = Logger.getLogger(Main.class); private static Main instance = null; private Main(){} /** * 零碎入口 * @param args */ public static void main(String[] args) { logger.info("过程以后内存占用量(兆):" + (int)(Runtime.getRuntime().totalMemory()/(1048576))); Main.getInstance().sysInit(); logger.info("零碎初始化结束:过程以后内存占用量(兆):" + (int)(Runtime.getRuntime().totalMemory()/(1048576))); logger.info("过程最大内存:" + (int)(Runtime.getRuntime().maxMemory()/(1048576))); //从库中取出流动告警存入内存队列 new LoadGesBsmThread().start(); //告警解析过程 new alarmAnalysis(10).start(); } /** * 零碎初始化 */ public void sysInit() { // 加载静态数据 AllCacheRefresh.refreshAll(); } /** * @return the instance */ public static Main getInstance() { if(null == instance) instance = new Main(); return instance; }}
创立内存队列实例及办法
package cn.com.starit.ge.persistence.utils;import java.text.SimpleDateFormat;import java.util.Date;import java.util.LinkedList;import java.util.List;import java.util.concurrent.ConcurrentLinkedQueue;import org.apache.log4j.Logger;import cn.com.starit.ge.etec.model.GesAlarm;import cn.com.starit.ge.gzpt.dto.GesAlarmDto;/** * ================================================= <br> * 工程:GessServiceEOMSServer <br> * 类名:SafeWorkContainer <br> * 工夫:2014-9-15下午11:23:46 <br> * 版本:Version 1.0 <br><br> * 形容:TODO 形容该文件的作用 <br> * ================================================= <br> */public class SafeWorkContainer { private static final Logger logger = Logger.getLogger(SafeWorkContainer.class); private static SafeWorkContainer instance; SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); /** * @return the instance */ public synchronized static SafeWorkContainer getInstance() { if(null == instance) instance = new SafeWorkContainer(); return instance; } /** 告警队列 */ private ConcurrentLinkedQueue<GesAlarmDto> safeWorkQueue; private SafeWorkContainer(){ safeWorkQueue = new ConcurrentLinkedQueue<GesAlarmDto>(); } /** * 将GsmAlarm告警数据保留到内存队列 * @param safeWork */ public synchronized void putSafeWork(GesAlarmDto safeWork) { // 对象为null间接返回,ConcurrentLinkedQueue队列不容许增加null元素 if(null == safeWork) return; safeWorkQueue.offer(safeWork); logger.info("增加一条GesAlarm告警到内存队列胜利"); notifyAll(); } /** * 将GsmAlarm告警数据保留到内存队列 * @param safeWork */ public synchronized void putSafeWork(List<GesAlarmDto> safeWorkList) { // 对象为null间接返回,ConcurrentLinkedQueue队列不容许增加null元素 if(null == safeWorkList) return; safeWorkQueue.addAll(safeWorkList);// for(GesAlarm alarm:safeWorkList){// safeWorkQueue.offer(alarm);// } logger.info(String.format("增加%d条GesAlarm告警到内存队列胜利",safeWorkList.size())); notifyAll(); } /** * 获取GsmAlarm告警数据 * @return GesAlarm * @throws InterruptedException */ public synchronized GesAlarmDto getSafeWork() throws InterruptedException { GesAlarmDto safeWork; if(safeWorkQueue.isEmpty()) { logger.info("SafeWorkContainer 队列中没有GsmAlarm告警数据,本线程挂起"); wait(); } safeWork = safeWorkQueue.poll(); //logger.info("取到一条重保工单信息,mainSn=" + safeWork); return safeWork; } /** * 获取GsmAlarm告警数据 * @return GesAlarm * @throws InterruptedException */ public synchronized List<GesAlarmDto> getSafeWork(int cnt){ try{ if(safeWorkQueue.isEmpty()) { logger.info("SafeWorkContainer 队列中没有GsmAlarm告警数据,本线程挂起 "); wait(); } List<GesAlarmDto> safeWorkList = new LinkedList<GesAlarmDto>(); for(int i=0;i<cnt;i++){ GesAlarmDto safeWork = safeWorkQueue.poll(); if(safeWork == null) break; safeWorkList.add(safeWork); } logger.info(String.format("取到%dGesAlarm告警信息", safeWorkList.size()));// logger.info(String.format("从库中取告警放入待剖析队列工夫:", formatter.format(new Date()))); return safeWorkList; }catch(Exception e){ logger.info(e.getMessage()); e.printStackTrace(); } return new LinkedList<GesAlarmDto>(); } /** * @return 返回容器中信息数 */ public synchronized int size() { return safeWorkQueue.size(); }}
LoadGesBsmThread线程继承线程池
package cn.com.starit.main;import java.util.ArrayList;import java.util.Date;import java.util.HashMap;import java.util.List;import java.util.Map;import org.apache.log4j.Logger;import cn.com.starit.ge.etec.model.GesAlarm;import cn.com.starit.ge.gzpt.dto.GesAlarmDto;import cn.com.starit.ge.persistence.dao.ServiceProvider;import cn.com.starit.ge.persistence.utils.SafeWorkContainer;import cn.com.starit.ge.persistence.utils.Static;public class LoadGesBsmThread extends Thread { private static final Logger logger = Logger.getLogger(LoadGesBsmThread.class); private int flag = 1; private long maxId = 0L; public void run(){ Long maxId = 0L; while(true){ try{ int queueSize = SafeWorkContainer.getInstance().size(); logger.info("GesAlarm缓存队列告警数:"+queueSize); if(queueSize>1000){ Thread.sleep(1000); } if(maxId == 0L){ if(queueSize==0){ maxId = loadGesAlarm(maxId); logger.info("reset maxId=0"); }else{ Thread.sleep(1000); } }else { Long maxIdTemp = loadGesAlarm(maxId); if(maxIdTemp.longValue()!=0){ if(maxIdTemp-maxId<100){ loadGesAlarm(maxIdTemp); maxId=0L; }else maxId = maxIdTemp; } } try{ Thread.sleep(500*flag); }catch(Exception e){ logger.error("Thread.sleep异样:"+e.getMessage()); } }catch(Exception e){ logger.error("LoadGesActiveThread异样:"+e.getMessage()); } } } private Long loadGesAlarm(Long maxAlarmId){ List<GesAlarmDto> list = ServiceProvider.getGesAlarmDao().getGesAlarm(maxAlarmId); Long maxId = 0L; for(GesAlarmDto alarm:list){ if(alarm.getAlarmid().longValue()>maxId.longValue()){ maxId = alarm.getAlarmid(); } } SafeWorkContainer.getInstance().putSafeWork(list); logger.info(String.format("load %d GesAlarm到队列",list.size())); if(list.size()<10){ flag = 4; }else if(list.size()<50){ flag = 3; }else if(list.size()<100){ flag=2; }else{ flag = 1; } return maxId; }}
将内存队列中的数据取出,多线程形式逐条解析成告警对象
package cn.com.starit.main;import java.lang.reflect.Field;import java.lang.reflect.Method;import java.text.SimpleDateFormat;import java.util.ArrayList;import java.util.Date;import java.util.HashMap;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.regex.Matcher;import java.util.regex.Pattern;import org.apache.commons.lang.StringUtils;import org.apache.log4j.Logger;import org.dom4j.Document;import org.dom4j.DocumentHelper;import org.dom4j.Element;import org.springframework.beans.BeanUtils;import com.sun.jersey.api.client.Client;import com.sun.jersey.api.client.ClientResponse;import com.sun.jersey.api.client.WebResource;import cn.com.starit.ge.etec.model.AzGesBsmAlarm;import cn.com.starit.ge.etec.model.GesAlarm;import cn.com.starit.ge.etec.model.GesAlarmClearCache;import cn.com.starit.ge.etec.model.GesAlarmHis;import cn.com.starit.ge.etec.model.GesBsmAlarm;import cn.com.starit.ge.etec.model.GesBsmAlarmCirRel;import cn.com.starit.ge.etec.model.GesBsmAlarmCirRelId;import cn.com.starit.ge.etec.model.GesBsmAlarmClear;import cn.com.starit.ge.etec.model.GesBsmAlarmHis;import cn.com.starit.ge.etec.model.GesMonitorConfig2;import cn.com.starit.ge.etec.model.ShortmessageInfo;import cn.com.starit.ge.gzpt.dto.GesAlarmDto;import cn.com.starit.ge.persistence.busi.AffectCustAnalysis;import cn.com.starit.ge.persistence.cache.GesAlarmStrategyTypeDefCache;import cn.com.starit.ge.persistence.dao.ServiceProvider;import cn.com.starit.ge.persistence.dto.AffectCustDto;import cn.com.starit.ge.persistence.dto.AlarmLevelDto;import cn.com.starit.ge.persistence.dto.CustCircuitDto;import cn.com.starit.ge.persistence.dto.DZWYDto;import cn.com.starit.ge.persistence.utils.AlarmObjConvert;import cn.com.starit.ge.persistence.utils.PropertiesLoadUtil;import cn.com.starit.ge.persistence.utils.SafeWorkContainer;import cn.com.starit.ge.persistence.utils.Static;import cn.com.starit.ge.gzpt.service.WarRoomService;/** * ================================================= <br> * 工程:GessAlarmAnalysis <br> * 类名:alarmAnalysis <br> * 工夫:2019-8-12下午01:13:10<br> * 版本:Version 1.0 <br> <br> * 形容:告警解析过程,将原始告警解析成咱们零碎外部的告警对象<br> * ================================================= <br> */public class alarmAnalysis{ private static accessAndKeepOrder accessAndKeepOrder; private static WarRoomService warRoomService; private static final Logger logger = Logger.getLogger(alarmAnalysis.class); private List<AnalysisThread> threadList = new ArrayList<AnalysisThread>(); public alarmAnalysis(int n){ for(int i=0;i<n;i++){ threadList.add(new AnalysisThread("AnalysisThread_"+i)); } } public void start(){ logger.info("告警解析开启多线程,线程数为"+threadList.size()+"...."); if(threadList.size()>0){ for(AnalysisThread at:threadList){ at.start(); } } } class AnalysisThread extends Thread{ public AnalysisThread(String name) { super(name); } public void run(){ logger.info("告警解析线程"+this.getName()+"启动....."); while(true){ List<GesAlarmDto> list = SafeWorkContainer.getInstance().getSafeWork(50); logger.info(String.format("取%dGesAlarmDto告警解决", list.size())); if(null != list && list.size() > 0) { for(int i=0;i<list.size();i++){ GesAlarmDto gesAlarmDto = list.get(i); try{ accessAndKeepOrder=new accessAndKeepOrder(); accessAndKeepOrder.run(gesAlarmDto); accessAndKeepOrder=null; //作战室严障调用 warRoomService=new WarRoomService(); warRoomService.run(gesAlarmDto); warRoomService=null; }catch(Exception e){ logger.error("剖析告警出错!告警流水号为:"+gesAlarmDto.getSrcalarmId(),e); } } // end for } // end first if else { logger.warn("未取到原始告警信息,休眠1秒 (—_—)~zZ"); try { Thread.sleep(1000); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } } } // end while } } /* * */ private Map<String,String> analyticalPostition(String alarmLocate,long alarmProftype){ String rack=null, shelf=null, slot=null, port=null, ontid=null; Map<String,String> map = new HashMap<String,String>(); if(2 == alarmProftype) {//传输业余告警 if(null != alarmLocate && !"".equals(alarmLocate)&&!"null".equals(alarmLocate)) { String[] locArry = alarmLocate.split("/"); Map<String, String> locMap = new HashMap<String, String>(); for(String loc : locArry) { if(null != loc && !"".equals(loc)) { if(loc.split("=").length>=2){ String k = loc.split("=")[0].toLowerCase(); String v = loc.split("=")[1]; locMap.put(k, v); } } } rack = locMap.get("rack"); shelf = locMap.get("shelf"); slot = locMap.get("slot"); // 端口, 有port 和 PhysicalPort ,不排除第三种 for(Iterator<String> it=locMap.keySet().iterator(); it.hasNext();) { String k = it.next(); if(null != k && k.contains("port")) { port = locMap.get(k); } } }// end second if }// end first if ///数据网管告警,格局 0-0-2-2:4 else if(1 == alarmProftype) {//数据业余 if(null != alarmLocate && !"".equals(alarmLocate)) { String loc = alarmLocate.split(":")[0]; String[] locArry = loc.split("-"); if(locArry.length>0){ rack = locArry[0]; } if(locArry.length>1){ shelf = locArry[1]; } if(locArry.length>2){ slot = locArry[2]; } if(locArry.length>3){ port = locArry[3]; } if(alarmLocate.split(":").length>1) ontid = alarmLocate.split(":")[1]; } } map.put("rack", rack); map.put("shelf", shelf); map.put("slot", slot); map.put("port", port); map.put("ontid", ontid); return map; } /** * 将告警与电路关系保留到Ges_bsm_alarm_cir_rel表中 * @param gesBsmAlarmId,告警保留到GES_BSM_ALARM表后,主动生成的ID * @param cirId,电路ID */ public static void saveGesBsmAlarmCirRel(Long gesBsmAlarmId,String circuitIds){ if(null!=circuitIds&&gesBsmAlarmId!=null){ try{ String[] ids = circuitIds.split(","); for(String id:ids){ GesBsmAlarmCirRelId relId = new GesBsmAlarmCirRelId(gesBsmAlarmId,Long.parseLong(id)); GesBsmAlarmCirRel rel = new GesBsmAlarmCirRel(relId); ServiceProvider.getGesBsmAlarmCirRelDao().save(rel); } logger.debug(String.format("REL告警与电路关系入库胜利,共%d条!", ids.length)); }catch(Exception e){ logger.debug(String.format("REL告警与电路关系入库异样!%s",e.getMessage())); e.printStackTrace(); } } } public static void removeGesBsmAlarmCirRel(Long gesBsmAlarmId,String circuitIds){ if(gesBsmAlarmId!=null&&circuitIds!=null){ logger.debug(String.format("REL筹备删除告警与电路关系:[gesBsmAlarmId:%s,gesBsmAlarmCircuitIds:%s]", gesBsmAlarmId,circuitIds)); try{ String[] ids = circuitIds.split(","); for(String id:ids){ GesBsmAlarmCirRelId relId = new GesBsmAlarmCirRelId(gesBsmAlarmId,Long.parseLong(id));//存储告警与电路关联的类,属性有告警ID和关联的电路编码ID GesBsmAlarmCirRel rel = new GesBsmAlarmCirRel(relId); ServiceProvider.getGesBsmAlarmCirRelDao().delete(rel); } logger.debug(String.format("REL告警与电路关系删除胜利,共%d条!", ids.length)); }catch(Exception e){ logger.debug(String.format("REL告警与电路关系删除异常!%s",e.getMessage())); e.printStackTrace(); } } } public void clearAlaarmSendShortMsg(GesBsmAlarm gesBsmAlarm){ List<CustCircuitDto> ccList = ServiceProvider.getGesAlarmDao().getCustCircuitByDevName(gesBsmAlarm.getDeviceName().trim()); AffectCustDto affectCustDto = new AffectCustDto(); affectCustDto.setMmeeids(gesBsmAlarm.getCustomCode()); affectCustDto.setCircuitIds(gesBsmAlarm.getCircuitIds()); affectCustDto.setCirCodes(gesBsmAlarm.getCirCodes()); } public void convertAreaId(String srcAreaId){ } /** * 向告警影响的客户发送短信 * @param gesAlarm 告警信息 * @param ccList 影响的客户及电路 * @param strategyType 影响的客户及电路 * @param affectCustDto 封装的与告警相关联的电路 路由信息和客户信息 */ public void newAlarmSendShortMsg(GesAlarm gesAlarm, AffectCustDto affectCustDto,List<CustCircuitDto> ccList,Integer strategyType){ try{ List<GesMonitorConfig2> configList = ServiceProvider.getGesMonitorConfig2Dao().queryConfig(affectCustDto.getMmeeids());//查询数据库中须要进行短信告诉的客户 //<联系人电话,<客户名称,电路编码>MAP Map<String,Map<String,StringBuffer>> customerMsgMap = new HashMap<String,Map<String,StringBuffer>>();//电话号码,<客户名称,电路编码> Map<String,Map<String,StringBuffer>> managerMsgMap = new HashMap<String,Map<String,StringBuffer>>();//电话号码,电路编码 Map<String,Map<String,StringBuffer>> engineerMsgMap = new HashMap<String,Map<String,StringBuffer>>();//电话号码,电路编码 if(null!=configList&&configList.size()>0){ Map<String,CustCircuitDto> cirMap = new HashMap<String,CustCircuitDto>(); Map<String,CustCircuitDto> telMap = new HashMap<String,CustCircuitDto>(); for(CustCircuitDto dto:ccList){ if(null!=dto.getCircuitId()){ cirMap.put(dto.getCircuitId().toString(), dto); } if(null!=dto.getCustomerTel()){ telMap.put(dto.getCustomerTel(), dto); } if(null!=dto.getManagerTel()){ telMap.put(dto.getManagerTel(), dto); } if(null!=dto.getEngineerTel()){ telMap.put(dto.getEngineerTel(), dto); } } String custIds = affectCustDto.getMmeeids();//客户ID String cirIds = affectCustDto.getCircuitIds();//电路编码 for(GesMonitorConfig2 config:configList){ String noticeUsers = config.getNoticeUsers();//告诉用户类型 //告诉对象标记 boolean customerNotice = false; boolean managerNotice = false; boolean engineerNotice = false; if(null==noticeUsers||"".equals(noticeUsers)){ continue; } noticeUsers += ","; if(noticeUsers.contains("1,")){//告诉客户联系人 customerNotice=true; } if(noticeUsers.contains("2,")){//告诉客户经理 managerNotice=true; } if(noticeUsers.contains("3,")){//告诉客户工程师 engineerNotice=true; } //判断告警影响的客户是否是被动监控客户 if(null!=affectCustDto.getMmeeids()&&affectCustDto.getMmeeids().contains(config.getCustId().toString())){ //分隔被动监控客户的电路ID,逐个解决 String[] cis = config.getCircuitIds().split(","); if(affectCustDto.getCircuitIds()!=null&&cis!=null){ //为防止长ID蕴含短ID,如32123与212,在ID后加逗号后进行比对 String cirIdsTemp = affectCustDto.getCircuitIds()+",";//告警影响的电路ID for(String str:cis){ if(null!=str&&!"".equals(str)&&cirIdsTemp.contains(str+",")){ //告警影响了被动监控电路 //依据电路ID,取客户及电路信息 CustCircuitDto d = cirMap.get(str); String cirCode = d.getCircode();//取电路编码 if(customerNotice&&null!=d.getCustomerTel()){ Map<String,StringBuffer> tempMap = customerMsgMap.get(d.getCustomerTel()); if(null==tempMap){ tempMap = new HashMap<String,StringBuffer>(); } if(null!=d.getCustName()&&!"".equals(d.getCustName())){ StringBuffer sb = tempMap.get(d.getCustName()); if(null==sb){ sb = new StringBuffer(512); } sb.append(cirCode).append(","); tempMap.put(d.getCustName(), sb); customerMsgMap.put(d.getCustomerTel(), tempMap); } } if(managerNotice&&null!=d.getManagerTel()){ Map<String,StringBuffer> tempMap = managerMsgMap.get(d.getManagerTel()); if(null==tempMap){ tempMap = new HashMap<String,StringBuffer>(); } if(null!=d.getCustName()&&!"".equals(d.getCustName())){ StringBuffer sb = tempMap.get(d.getCustName()); if(null==sb){ sb = new StringBuffer(512); } sb.append(cirCode).append(","); tempMap.put(d.getCustName(), sb); managerMsgMap.put(d.getManagerTel(), tempMap); } } if(engineerNotice&&null!=d.getEngineerTel()){ Map<String,StringBuffer> tempMap = engineerMsgMap.get(d.getEngineerTel()); if(null==tempMap){ tempMap = new HashMap<String,StringBuffer>(); } if(null!=d.getCustName()&&!"".equals(d.getCustName())){ StringBuffer sb = tempMap.get(d.getCustName()); if(null==sb){ sb = new StringBuffer(512); } sb.append(cirCode).append(","); tempMap.put(d.getCustName(), sb); engineerMsgMap.put(d.getEngineerTel(), tempMap); } } } } } } } //发送短信 int ppFlag = 0; SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); if(null!=strategyType){ ppFlag = strategyType.intValue(); } //客户联系人 for(String tel:customerMsgMap.keySet()){ Map<String,StringBuffer> m = customerMsgMap.get(tel); for(String custName:m.keySet()){ StringBuffer sb = m.get(custName); if(null==custName){ custName = ""; } if(null!=sb&&sb.length()>0){ StringBuffer msgSb = new StringBuffer(512); msgSb.append("尊敬的") .append(custName) .append("客户您好!于") .append(null==gesAlarm.getNealarmtime()?"":sdf.format(gesAlarm.getNealarmtime())) .append("工夫【").append(gesAlarm.getReserve3())//告警设施 .append("】设施产生告警,可能会影响您的") .append(sb.substring(0, sb.length()-1)) .append("等电路的失常应用,咱们正在修复故障,若给你带来不便,敬请体谅!【中国电信-大客户网管零碎】"); if(2==ppFlag){ msgSb.append(gesAlarm.getAlarmname()).append("告警被屏蔽!"); } logger.info(String.format("向客户联系人%s(%s)发送短信:%s",custName,tel, msgSb.toString())); sendShortMsg(tel,msgSb.toString(),0); } } } //客户经理 for(String tel:managerMsgMap.keySet()){ Map<String,StringBuffer> m = managerMsgMap.get(tel); CustCircuitDto d = telMap.get(tel); String name = d.getManager(); if(null==name){ name = ""; } StringBuffer subSb = new StringBuffer(512); for(String custName:m.keySet()){ StringBuffer sb = m.get(custName); if(null==custName){ custName = ""; } if(null!=sb&&sb.length()>1){ if(subSb.length()>1){ subSb.append(","); } subSb.append("【").append(custName).append(":") .append(sb.substring(0, sb.length()-1)).append("】"); } } if(subSb.length()>1){ StringBuffer msgSb = new StringBuffer(512); msgSb.append("尊敬的") .append(name) .append("您好!于") .append(null==gesAlarm.getNealarmtime()?"":sdf.format(gesAlarm.getNealarmtime())) .append("工夫【").append(gesAlarm.getReserve3())//告警设施 .append("】设施产生告警,可能会影响您的以下客户业务") .append(subSb.toString()) .append(",请留神查看故障修复停顿。【中国电信-大客户网管零碎】"); if(2==ppFlag){ msgSb.append(gesAlarm.getAlarmname()).append("告警被屏蔽!"); } sendShortMsg(tel,msgSb.toString(),0); logger.info(String.format("向客户经理%s(%s)发送短信:%s",name,tel, msgSb.toString())); } } //客户工程师 for(String tel:engineerMsgMap.keySet()){ Map<String,StringBuffer> m = engineerMsgMap.get(tel); CustCircuitDto d = telMap.get(tel); String name = d.getEngineer(); if(null==name){ name = ""; } StringBuffer subSb = new StringBuffer(512); for(String custName:m.keySet()){ StringBuffer sb = m.get(custName); if(null==custName){ custName = ""; } if(null!=sb&&sb.length()>1){ if(subSb.length()>1){ subSb.append(","); } subSb.append("【").append(custName).append(":") .append(sb.substring(0, sb.length()-1)).append("】"); } } if(subSb.length()>1){ StringBuffer msgSb = new StringBuffer(512); msgSb.append("尊敬的") .append(name) .append("客户您好!于") .append(null==gesAlarm.getNealarmtime()?"":sdf.format(gesAlarm.getNealarmtime())) .append("工夫【").append(gesAlarm.getReserve3())//告警设施 .append("】设施产生通信故障,可能会影响您的以下客户业务") .append(subSb.toString()) .append(",请留神查看故障修复停顿。【中国电信-大客户网管零碎】"); if(2==ppFlag){ msgSb.append(gesAlarm.getAlarmname()).append("告警被屏蔽!"); } sendShortMsg(tel,msgSb.toString(),0); logger.info(String.format("向客户工程师%s(%s)发送短信:%s",name,tel, msgSb.toString())); } } } }catch(Exception e){ logger.info("剖析告警推送对象异样:"+e.getMessage()); e.printStackTrace(); } } /** * 比照AZ端的端口信息是否正确 * @param gesAlarm * @return */ public boolean compareAZinfo(GesAlarm gesAlarm,Map<String,String> mapString){ String circuitIds = ""; String cirCodes = ""; String alarmNet = gesAlarm.getReserve3();//告警设施 String alarmLocate = gesAlarm.getReserve5(); // 故障地位 long alarmProftype = gesAlarm.getAlarmproftype();//示意传输业余还是数据业余的告警 if(alarmNet!= null){ List<String> azInfos = Static.cirCodeAzInfoMap.get(alarmNet); if(azInfos!=null){ boolean ifcompare = false; Map<String,String> map = analyticalPostition(alarmLocate,alarmProftype);//告警的AZ端口信息 String rack = map.get("rack"); String shelf = map.get("shelf"); String slot = map.get("slot"); String port = map.get("port"); for(String azInfo:azInfos){ System.out.print(">>>>>>"+azInfo+"\n"); boolean portCompare = true;//如果定制了端口,则用来比照端口信息 Map<String,String> aZmap = analyticalPostition(azInfo.split("//")[0],2);//对应的AZ端口的信息 String azRack = aZmap.get("rack"); String azShelf = aZmap.get("shelf"); String azSlot = aZmap.get("slot"); String azPort = aZmap.get("port"); if(!isEmptyIgnoreCase(azRack)){ if(rack!=null){ if(!azRack.trim().equals(rack.trim())){ portCompare=false; } }else{ portCompare=false; } } if(!isEmptyIgnoreCase(azShelf)){ if(shelf!=null){ if(!azShelf.trim().equals(shelf.trim())){ portCompare=false; } }else{ portCompare=false; } } if(!isEmptyIgnoreCase(azSlot)){ if(slot!=null){ if(!azSlot.trim().equals(slot.trim())){ portCompare=false; } }else{ portCompare=false; } } if(!isEmptyIgnoreCase(azPort)){ if(port!=null){ if(!azPort.trim().equals(port.trim())){ portCompare=false; } }else{ portCompare=false; } } if(portCompare){//端口雷同 if(azInfo.split("//").length>2){ circuitIds += azInfo.split("//")[1]+",";//电路ID cirCodes += azInfo.split("//")[2]+",";//电路编码 System.out.print(">>>0"+circuitIds+">>>0"+cirCodes+"\n"); } ifcompare= true; } } mapString.put("prepareCircuitIds", circuitIds); mapString.put("prepareCirCodes", cirCodes); if(ifcompare){//如果有一个匹配上的话,就返回真 return true; } } } return false; } /** * 定制网元产生告警后,发送短信给指定人员 * @param tel * @param msg */ public void customizedNeSendMsg(GesAlarm gesAlarm,Integer strategyType){ SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); String alarmNet = gesAlarm.getReserve3();//告警设施 String alarmLocate = gesAlarm.getReserve5(); // 故障地位 List<DZWYDto> dzwyList = new ArrayList<DZWYDto>(); DZWYDto dzwy = new DZWYDto(); long alarmProftype = gesAlarm.getAlarmproftype();//示意传输业余还是数据业余的告警 int ppFlag = 0;//用来判断该告警是否能够被屏蔽 if(null!=strategyType){ ppFlag = strategyType.intValue(); } if(alarmNet!= null){ AlarmLevelDto alarmlev=null; String levName=null; List<AlarmLevelDto> alarmLLevels=ServiceProvider.getGesAlarmDao().getCirAlarmLev(gesAlarm.getAlarmname());//获取告警内容级别 if(alarmLLevels!=null&&alarmLLevels.size()>0){ alarmlev=alarmLLevels.get(0); } if(alarmlev!=null&&alarmlev.getIsA()!=null&&alarmlev.getIsA().equals("1")){ levName="业务中断"; } else if(alarmlev!=null&&alarmlev.getIsB()!=null&&alarmlev.getIsB().equals("1")){ levName="性能劣化"; } else if(alarmlev!=null&&alarmlev.getIsC()!=null&&alarmlev.getIsC().equals("1")){ levName="危险减少"; }else levName=""; dzwyList = ServiceProvider.getGesAlarmDao().queryGESWY(alarmNet,alarmLocate); if(dzwyList.size()>0){ dzwy = dzwyList.get(0); } List<String> costomizes = Static.customizedMap.get(gesAlarm.getReserve3()); if(costomizes!=null){ Map<String,String> map = analyticalPostition(alarmLocate,alarmProftype); String rack = map.get("rack"); String shelf = map.get("shelf"); String slot = map.get("slot"); String port = map.get("port"); for(String costomized:costomizes){ boolean portCompare = true;//如果定制了端口,则用来比照端口信息 String[] strs = costomized.split("_"); String positions = "第"; String cRack = strs[0]; String cShelf = strs[1]; String cSlot = strs[2]; String cPort = strs[3]; String cNoticePeople = strs[4]; String cNoticeTelNum = strs[5]; if(!isEmptyIgnoreCase(cRack)){ positions +=cRack+"架"; if(rack!=null){ if(!cRack.trim().equals(rack.trim())){ portCompare=false; } }else{ portCompare=false; } } if(!isEmptyIgnoreCase(cShelf)){ positions +=cShelf+"框"; if(shelf!=null){ if(!cShelf.trim().equals(shelf.trim())){ portCompare=false; } }else{ portCompare=false; } } if(!isEmptyIgnoreCase(cSlot)){ positions +=cSlot+"槽"; if(slot!=null){ if(!cSlot.trim().equals(slot.trim())){ portCompare=false; } }else{ portCompare=false; } } if(!isEmptyIgnoreCase(cPort)){ positions +=cPort+"口"; if(port!=null){ if(!cPort.trim().equals(port.trim())){ portCompare=false; } }else{ portCompare=false; } } logger.info("1>>>>》》》》"+costomized+"》》"+alarmLocate+">>>"+portCompare+"\n"); if(portCompare&&ppFlag!=2){ logger.info("2>>>>》》》》"+costomized+"》》"+alarmLocate+">>>"+portCompare+"\n"); if(positions.trim().equals("第")){ positions=""; } StringBuffer msgSb = new StringBuffer(512); /*msgSb.append("尊敬的") .append(cNoticePeople) .append("您好!于") .append(null==gesAlarm.getNealarmtime()?"":sdf.format(gesAlarm.getNealarmtime())) .append(",定制网元【").append(gesAlarm.getReserve3())//告警设施 .append(positions+"】产生告警"); if(!isEmptyIgnoreCase(gesAlarm.getAlarmname())){ msgSb.append("("+gesAlarm.getAlarmname()+")"); } msgSb.append(",请留神查看故障修复停顿。【中国电信-大客户网管零碎】");*/ msgSb.append("您好,") .append(dzwy.getCustName()) .append(",于") .append(null==gesAlarm.getNealarmtime()?"":sdf.format(gesAlarm.getNealarmtime())) .append("工夫,设施") .append(gesAlarm.getReserve3()) .append("产生告警,电路编码:") .append(dzwy.getCirCode()) .append(",告警形容:") .append(gesAlarm.getAlarmsummary()); if(!levName.equals("")){ msgSb.append(",产生影响:") .append(levName); } sendShortMsg(cNoticeTelNum,msgSb.toString(),1); logger.info(String.format("向客户经理%s(%s)发送短信:%s",cNoticePeople,cNoticeTelNum, msgSb.toString())); } } } } } /* * 漠视大小写的字符串非空比拟 */ public static boolean isEmptyIgnoreCase(String str){ if(str!=null&&!str.equalsIgnoreCase("null")&&!(str.trim()).equals("") &&!str.equalsIgnoreCase("undefined")){ return false; }else{ return true; } } /** * * @param tel 电话号码 * @param msg 短信内容 * @param type 0示意一般告警1示意定制网元的告警 */ public void sendShortMsg(String tel,String msg,int type){ System.out.print(">>>>>发送测试短信!");// msg+="【测试短信】"; Long ifSend = -1L; try{ if(type==0){ ifSend = Long.parseLong(PropertiesLoadUtil.getPro("ALARM_IF_SENDMSG")); System.out.print(">>>>>一般告警短信发送性能!发送类型》》"+ifSend); logger.info(">>>>>一般告警短信发送性能!发送类型》》"+ifSend); }else if(type==1){ logger.info(">>>>>>发现定制网元,发送短信!"); ifSend = Long.parseLong(PropertiesLoadUtil.getPro("CUSTOMIZED_IF_SENDMSG")); System.out.print(">>>>>定制网元告警短信发送性能!发送类型》》"+ifSend); logger.info(">>>>>定制网元告警短信发送性能!发送类型》》"+ifSend); }else if(type==2){//2019-07-20纳管电路告警 ifSend = Long.parseLong(PropertiesLoadUtil.getPro("CIRCUIT_ALARMS_SENDMSG")); System.out.print(">>>>>纳管电路告警短信发送性能!发送类型》》"+ifSend); } }catch(Exception e){ logger.info("从配置文件中读取短信发送配置信息失败!",e); } if(isEmptyIgnoreCase(tel)||isEmptyIgnoreCase(msg)){ return; } ShortmessageInfo msgInfo = new ShortmessageInfo(); msgInfo.setReceiveNo(tel);//接管短信的电话号码 //msgInfo.setMessage(msg+tel); msgInfo.setMessage(msg); //msgInfo.setSendFlag(0L); msgInfo.setSendFlag(ifSend);//测试用,临时设置为-1,短信模块不会发送状态为-1的短信。 msgInfo.setSendTime(new Date(System.currentTimeMillis())); ServiceProvider.getShortMessageInfoDao().save(msgInfo); } // public static void main(String[] args) {// System.out.println(">>>>>>>>>>"+isEmptyIgnoreCase(null)+"\n");// System.out.println(">>>>>>>>>>"+isEmptyIgnoreCase("nUll")+"\n");// System.out.println(">>>>>>>>>>"+isEmptyIgnoreCase("UNDERFINed")+"\n");// } /** * 应用反射机制将GesBsmAlarm对象映射至GesBsmAlarmClear<br> * 注:该办法的前提是GesBsmAlarmClear和GesBsmAlarm对象定义的get/set办法对统一<br> * @param gesBsmAlarm * @return */ public GesBsmAlarmClear transformBy(GesBsmAlarm gesBsmAlarm) { Field[] fields = gesBsmAlarm.getClass().getDeclaredFields(); GesBsmAlarmClear gesBsmAlarmClear = new GesBsmAlarmClear(); for(Field field : fields) { String fieldName = field.getName(); Class<?> type = field.getType(); logger.debug("type=" + type); fieldName = fieldName.substring(0, 1).toUpperCase() + fieldName.substring(1); Method m1, m2; String v1; try { m1 = gesBsmAlarm.getClass().getDeclaredMethod("get" + fieldName); v1 = (String)m1.invoke(gesBsmAlarm); m2 = gesBsmAlarmClear.getClass().getDeclaredMethod("set" + fieldName, type); m2.invoke(gesBsmAlarmClear, v1); } catch (Exception e) { logger.error(e.getMessage(), e); } } return gesBsmAlarmClear; } // /**// * 测试函数// * @param args// */// public static void main(String[] args) {// // 测试transformBy办法 --通过// GesBsmAlarm gesBsmAlarm = new GesBsmAlarm();// gesBsmAlarm.setAlarmName("这是一个测试");// GesBsmAlarmClear gesBsmAlarmClear = new alarmAnalysis().transformBy(gesBsmAlarm);// System.out.println(gesBsmAlarmClear.getAlarmName());// } /** * 预处理 * @param name * @param rack * @param shelf * @param slot * @param port * @param sx * @return */ public String pretreatment(String name,String rack,String shelf,String slot,String port,String sx){ return "信息不全";// List<RmResCircuitRoute2> routeAList = ServiceProvider.getGesAlarmDao().getRouteA(name,rack,shelf,slot,port,sx);// List<RmResCircuitRoute2> routeZList = ServiceProvider.getGesAlarmDao().getRouteZ(name,rack,shelf,slot,port,sx);// String ipA = null, ipB = null;// if(routeAList!=null&&routeAList.size()>0){// for(int i = 0;i<routeAList.size();i++){// if(routeAList.get(i).getAip()!=null){// ipA = routeAList.get(i).getAip();// break;// }// }// }// if(routeZList!=null&&routeZList.size()>0){// for(int i = 0;i<routeZList.size();i++){// if(routeZList.get(i).getZip()!=null){// ipB = routeZList.get(i).getZip();// break;// }// }// }// if(null != ipA && null != ipB) {// if (LocalPingUtil.PING_RESULT_SUCC == LocalPingUtil.timeoutPing(ipA) && LocalPingUtil.PING_RESULT_SUCC == LocalPingUtil.timeoutPing(ipB)) {// return "设施能失常Ping通";// } else if (LocalPingUtil.PING_RESULT_FAIL == LocalPingUtil.timeoutPing(ipA) && LocalPingUtil.PING_RESULT_SUCC == LocalPingUtil.timeoutPing(ipB)) {// return "设施" + ipA + "无奈Ping通";// } else if (LocalPingUtil.PING_RESULT_SUCC == LocalPingUtil.timeoutPing(ipA) && LocalPingUtil.PING_RESULT_FAIL == LocalPingUtil.timeoutPing(ipB)) {// return "设施" + ipB + "无奈Ping通";// } else {// return "设施" + ipA + "和设施" + ipB + "无奈Ping通";// }// }// else if (null != ipA) {// if (LocalPingUtil.PING_RESULT_SUCC == LocalPingUtil.timeoutPing(ipA)) {// return "设施能失常Ping通";// } else {// return "设施" + ipA + "无奈Ping通";// }// }// else if (null != ipB) {// if (LocalPingUtil.PING_RESULT_SUCC == LocalPingUtil.timeoutPing(ipB)) {// return "设施能失常Ping通";// } else {// return "设施" + ipB + "无奈Ping通";// }// } else {// return "设施IP未找到";// } } private static final String IP_REGEX = //IP地址的正则表达式 "(1\\d{2}|2[0-4]\\d|25[0-5]|[1-9]\\d|[1-9])\\." + "(1\\d{2}|2[0-4]\\d|25[0-5]|[1-9]\\d|\\d)\\." + "(1\\d{2}|2[0-4]\\d|25[0-5]|[1-9]\\d|\\d)\\." + "(1\\d{2}|2[0-4]\\d|25[0-5]|[1-9]\\d|\\d)"; //查看是否为视频专线所需告警(数据业余告警 & 华为ONU设施掉电告警 & 视频专线电路所对应IP/端口) private boolean isVideoAlarm(GesAlarm gesAlarm){ //1.判断业余 Long alarmProftype = gesAlarm.getAlarmproftype(); boolean isDataProftype = alarmProftype!=null && alarmProftype==1; //是否数据业余 if(!isDataProftype) return false; //2.判断告警名称 String alarmName = StringUtils.trim(gesAlarm.getAlarmname());// boolean isImportAlarm = "华为ONU设施掉电告警".equals(alarmName) || "华为ONU设施掉电告警复原".equals(alarmName); //是否为所关注的告警(ONU掉电告警,流动/革除) //boolean isImportAlarm = "华为ONU设施掉电告警".equals(alarmName) || "华为ONU设施掉电告警复原".equals(alarmName) || "华为分支断纤告警".equals(alarmName) || "华为分支断纤告警复原".equals(alarmName); //是否为所关注的告警(ONU掉电告警/断纤告警,流动/革除) boolean isImportAlarm = "华为ONU设施掉电告警".equals(alarmName) || "华为ONU设施掉电告警复原".equals(alarmName) || "华为分支断纤告警".equals(alarmName) || "华为分支断纤告警复原".equals(alarmName) || "中兴骨干断纤告警".equals(alarmName)|| "中兴骨干断纤告警复原".equals(alarmName)|| "中兴分支断纤告警".equals(alarmName) || "中兴分支断纤告警复原".equals(alarmName) ||"中兴ONU设施掉电告警".equals(alarmName)||"中兴ONU设施掉电告警复原".equals(alarmName); if(!isImportAlarm) return false; //3.判断是否属于以后现有的设施的告警 String oltIp = null, alarmDetail = gesAlarm.getAlarmdetail(); Matcher matcher = Pattern.compile("上联OLTIP:"+IP_REGEX).matcher(alarmDetail); if(matcher.find()) {//获取告警设施 ip oltIp = matcher.group().substring("上联OLTIP:".length()); } String rack=null, shelf=null, slot=null, port=null,ontid=null; // 架,框,槽,口 String alarmLocate = gesAlarm.getReserve5(); // 故障地位 Map<String,String> map = analyticalPostition(alarmLocate,alarmProftype); rack = map.get("rack"); shelf = map.get("shelf"); slot = map.get("slot"); port = map.get("port"); ontid = map.get("ontid"); String ponInfo = nvl(shelf)+"/"+nvl(slot)+"/"+nvl(port)+":"+nvl(ontid); boolean isFacility = ServiceProvider.getGesAlarmDao().impactVideoCir(oltIp, ponInfo); if(!isFacility){ return false; } //4.打印返回 logger.info(String.format("视频专线相干-华为ONU设施掉电告警[alarmId:%d,业余:%d,网管:%d,原始告警ID:%s]", gesAlarm.getAlarmid(),gesAlarm.getAlarmproftype(),gesAlarm.getSourceid(),gesAlarm.getSrcalarmId())); //5.更新对应电路的在线状态 this.reloadOnlineStatus(oltIp, ponInfo); return true; } private static void impactVideoAlarm(GesAlarm gesAlarm, String shelf, String slot, String port, String ontid){ //1.提取oltIp,ponInfo String oltIp = null, alarmDetail = gesAlarm.getAlarmdetail(); Matcher matcher = Pattern.compile("上联OLTIP:"+IP_REGEX).matcher(alarmDetail); if(matcher.find()) oltIp = matcher.group().substring("上联OLTIP:".length()); String ponInfo = nvl(shelf)+"/"+nvl(slot)+"/"+nvl(port)+":"+nvl(ontid); //2.保留关联关系 Long alarmproftype = gesAlarm.getAlarmproftype(); String emsId = gesAlarm.getSourceid()+""; String serial = gesAlarm.getSrcalarmId(); ServiceProvider.getGesAlarmDao().saveVideoAlarm(alarmproftype, emsId, serial, oltIp, ponInfo); } private static String nvl(String str){ return str==null?"":str; } //更新视频专线电路在线状态 public void reloadOnlineStatus(String oltIp, String ponInfo){ try { long id = ServiceProvider.getGesAlarmDao().queryCirId(oltIp, ponInfo); ReturnInfo returnInfo = this.queryVideoStatus(oltIp, ponInfo); String result = returnInfo.getMsg(); if(!returnInfo.isFlag()){ logger.info(String.format("[状态更新-失败]:{id:%d, errorInfo:%s}\n", id, result)); } Document dom = DocumentHelper.parseText(result.substring(result.indexOf("<root>"), result.indexOf("</root>")+7)); String resultCode = ((Element)dom.selectSingleNode("/root/msgHead/result")).attributeValue("code"); if("SUCCESS".equals(resultCode)){ String onlineStatus = ((Element)dom.selectSingleNode("/root/msgBody/block/fields/field[last()]/cusStatus")).attributeValue("status"); ServiceProvider.getGesAlarmDao().updateStatus(id, onlineStatus); logger.info(String.format("[状态更新-胜利]:{id:%d, onlineStatus:%s}\n", id, onlineStatus)); }else{ String errorInfo = ((Element)dom.selectSingleNode("/root/msgHead/result")).getText(); logger.info(String.format("[状态更新-失败]:{id:%d, errorInfo:响应谬误-%s}\n", id, errorInfo)); } } catch (Exception e) { logger.info("视频专线申请产生未知异样", e); } } private ReturnInfo queryVideoStatus(String ip, String ponInfo){ //1.结构报文 String result = null; int requestId = ServiceProvider.getGesAlarmDao().querySequence(); if(requestId <= 0){ return new ReturnInfo(false, "程序异样-序列化申请ID失败"); } String[] portAndOnt = ponInfo==null?null:ponInfo.split(":"); if(portAndOnt==null || portAndOnt.length!=2 || StringUtils.isEmpty(portAndOnt[0]) || StringUtils.isEmpty(portAndOnt[1])){ return new ReturnInfo(false, "数据异样-请查看PON口信息数据"); } String requestMsg = createReqMsg(requestId, ip, portAndOnt[0], portAndOnt[1]); //2.发送申请 try { Client client = Client.create(); WebResource resource = client.resource("http://135.224.253.28:8081/services/NEService"); logger.info("申请报文为"+requestMsg); ClientResponse resp = resource.entity(requestMsg.trim()).post(ClientResponse.class); result = resp.getEntity(String.class).replace("<", "<").replace(">", ">"); logger.info("响应报文为"+result+""); }catch(Exception e){ logger.info("失败信息为:\n"+result, e); return new ReturnInfo(false, "程序异样-申请IPOSS产生未知谬误"); } return new ReturnInfo(true, result); } //结构申请报文 private String createReqMsg(int requestId, String ip, String port, String ontid){ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String headMsg = "<msgHead>"+ "<requestId>"+requestId+"</requestId>"+ "<requestTime>"+sdf.format(new Date())+"</requestTime>"+ "<source>KANMS</source>"+ "<target>IPOSS</target>"+ "</msgHead>"; String bodyMsg = "<msgBody>"+ "<msgType>selectPonCusStatus</msgType>" + "<block code=\"IPOSS@guizhou.cn:NEService\">" + "<fields>" + "<field>" + "<phyEqp ip=\""+ip+"\" type=\"OLT\"/>" + "<phyEqp ontid=\""+ontid+"\" type=\"ONU\"/>" + "</field>" + "<field code=\"portInfo\">" + "<port eqpIp=\""+ip+"\" id=\""+port+"\"/>" + "</field>" + "</fields>" + "</block>" + "</msgBody>"; StringBuffer sb = new StringBuffer("<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\" xmlns:sys=\"http://sysinf.ngis.ai.com\">" + "<soapenv:Header/><soapenv:Body><sys:selectPonCusStatus><sys:xml><![CDATA[<root>" + headMsg + bodyMsg + "</root>]]></sys:xml></sys:selectPonCusStatus></soapenv:Body></soapenv:Envelope>"); return sb.toString(); }}class ReturnInfo{ private boolean flag; private String msg; public ReturnInfo(){} public ReturnInfo(boolean flag, String msg) { this.flag = flag; this.msg = msg; } public boolean isFlag() { return flag; } public void setFlag(boolean flag) { this.flag = flag; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; }}
备注:在开发过程中碰到一个问题:这样多线程形式解决数据会导致雷同数据反复生产即反复剖析问题,后改了取数据的逻辑。
即:每次取满内存队列后,等队列全副生产完后,再从数据库中取数据,而不是边剖析数据边从库中取数据。