之前我的项目中遇到的问题:须要对单日告警量超过四百万条的数据进行逐条剖析和逻辑解决,导致靠繁多过程跑数据基本跑不完,无奈满足零碎和业务要求,因而决定采纳开多线程形式实现大数据处理。
数据处理流程:
1、申明一个内存队列
2、从库中轮巡取数据放入内存队列
3、开多个线程逐条取内存队列中的数据,剖析后在库中对该条数据打上标识,下次不取了。

main办法程序入口

/** *  */package cn.com.starit.main;import org.apache.log4j.Logger;import cn.com.starit.ge.persistence.cache.AllCacheRefresh;/** * ================================================= <br> * 工程:GessAlarmAnalysis <br> * 类名:Main <br> * 作者:xt<br> * 工夫:2019-10-08上午02:31:17<br> * 版本:Version 1.0 <br><br> * 形容:告警解析入口<br> * ================================================= <br> */public class Main {    private static final Logger logger = Logger.getLogger(Main.class);    private static Main instance = null;    private Main(){}        /**     * 零碎入口     * @param args     */    public static void main(String[] args) {        logger.info("过程以后内存占用量(兆):" + (int)(Runtime.getRuntime().totalMemory()/(1048576)));        Main.getInstance().sysInit();        logger.info("零碎初始化结束:过程以后内存占用量(兆):" + (int)(Runtime.getRuntime().totalMemory()/(1048576)));        logger.info("过程最大内存:" + (int)(Runtime.getRuntime().maxMemory()/(1048576)));        //从库中取出流动告警存入内存队列        new LoadGesBsmThread().start();        //告警解析过程        new alarmAnalysis(10).start();    }        /**     * 零碎初始化     */    public void sysInit() {        // 加载静态数据        AllCacheRefresh.refreshAll();    }     /**     * @return the instance     */    public static Main getInstance() {        if(null == instance)            instance = new Main();        return instance;    }}

创立内存队列实例及办法

package cn.com.starit.ge.persistence.utils;import java.text.SimpleDateFormat;import java.util.Date;import java.util.LinkedList;import java.util.List;import java.util.concurrent.ConcurrentLinkedQueue;import org.apache.log4j.Logger;import cn.com.starit.ge.etec.model.GesAlarm;import cn.com.starit.ge.gzpt.dto.GesAlarmDto;/** * ================================================= <br> * 工程:GessServiceEOMSServer <br> * 类名:SafeWorkContainer <br> * 工夫:2014-9-15下午11:23:46 <br> * 版本:Version 1.0 <br><br> * 形容:TODO 形容该文件的作用 <br> * ================================================= <br> */public class SafeWorkContainer {    private static final Logger logger = Logger.getLogger(SafeWorkContainer.class);    private static SafeWorkContainer instance;     SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");        /**     * @return the instance     */    public synchronized static SafeWorkContainer getInstance() {        if(null == instance)            instance = new SafeWorkContainer();        return instance;    }    /** 告警队列 */    private ConcurrentLinkedQueue<GesAlarmDto> safeWorkQueue;        private SafeWorkContainer(){        safeWorkQueue = new ConcurrentLinkedQueue<GesAlarmDto>();    }        /**     * 将GsmAlarm告警数据保留到内存队列     * @param safeWork     */    public synchronized void putSafeWork(GesAlarmDto safeWork) {        // 对象为null间接返回,ConcurrentLinkedQueue队列不容许增加null元素        if(null == safeWork)            return;        safeWorkQueue.offer(safeWork);        logger.info("增加一条GesAlarm告警到内存队列胜利");        notifyAll();    }        /**     * 将GsmAlarm告警数据保留到内存队列     * @param safeWork     */    public synchronized void putSafeWork(List<GesAlarmDto> safeWorkList) {        // 对象为null间接返回,ConcurrentLinkedQueue队列不容许增加null元素        if(null == safeWorkList)            return;        safeWorkQueue.addAll(safeWorkList);//        for(GesAlarm alarm:safeWorkList){//            safeWorkQueue.offer(alarm);//        }        logger.info(String.format("增加%d条GesAlarm告警到内存队列胜利",safeWorkList.size()));        notifyAll();    }        /**     * 获取GsmAlarm告警数据     * @return GesAlarm     * @throws InterruptedException      */    public synchronized GesAlarmDto getSafeWork() throws InterruptedException {        GesAlarmDto safeWork;        if(safeWorkQueue.isEmpty()) {            logger.info("SafeWorkContainer 队列中没有GsmAlarm告警数据,本线程挂起");            wait();        }        safeWork = safeWorkQueue.poll();        //logger.info("取到一条重保工单信息,mainSn=" + safeWork);        return safeWork;    }        /**     * 获取GsmAlarm告警数据     * @return GesAlarm     * @throws InterruptedException      */    public synchronized List<GesAlarmDto> getSafeWork(int cnt){        try{            if(safeWorkQueue.isEmpty()) {                logger.info("SafeWorkContainer 队列中没有GsmAlarm告警数据,本线程挂起 ");                wait();            }            List<GesAlarmDto> safeWorkList = new LinkedList<GesAlarmDto>();            for(int i=0;i<cnt;i++){                GesAlarmDto safeWork = safeWorkQueue.poll();                if(safeWork == null) break;                safeWorkList.add(safeWork);            }            logger.info(String.format("取到%dGesAlarm告警信息", safeWorkList.size()));//            logger.info(String.format("从库中取告警放入待剖析队列工夫:", formatter.format(new Date())));            return safeWorkList;        }catch(Exception e){            logger.info(e.getMessage());            e.printStackTrace();        }        return new LinkedList<GesAlarmDto>();    }        /**     * @return 返回容器中信息数     */    public synchronized int size() {        return safeWorkQueue.size();    }}

LoadGesBsmThread线程继承线程池

package cn.com.starit.main;import java.util.ArrayList;import java.util.Date;import java.util.HashMap;import java.util.List;import java.util.Map;import org.apache.log4j.Logger;import cn.com.starit.ge.etec.model.GesAlarm;import cn.com.starit.ge.gzpt.dto.GesAlarmDto;import cn.com.starit.ge.persistence.dao.ServiceProvider;import cn.com.starit.ge.persistence.utils.SafeWorkContainer;import cn.com.starit.ge.persistence.utils.Static;public class LoadGesBsmThread extends Thread {    private static final Logger logger = Logger.getLogger(LoadGesBsmThread.class);    private int flag = 1;    private long maxId = 0L;    public void run(){        Long maxId = 0L;        while(true){            try{                int queueSize = SafeWorkContainer.getInstance().size();                logger.info("GesAlarm缓存队列告警数:"+queueSize);                if(queueSize>1000){                    Thread.sleep(1000);                }                if(maxId == 0L){                    if(queueSize==0){                        maxId = loadGesAlarm(maxId);                        logger.info("reset maxId=0");                    }else{                        Thread.sleep(1000);                    }                }else {                    Long maxIdTemp = loadGesAlarm(maxId);                    if(maxIdTemp.longValue()!=0){                        if(maxIdTemp-maxId<100){                            loadGesAlarm(maxIdTemp);                            maxId=0L;                        }else                            maxId = maxIdTemp;                    }                }                try{                    Thread.sleep(500*flag);                }catch(Exception e){                    logger.error("Thread.sleep异样:"+e.getMessage());                }            }catch(Exception e){                logger.error("LoadGesActiveThread异样:"+e.getMessage());            }        }    }    private Long loadGesAlarm(Long maxAlarmId){        List<GesAlarmDto> list = ServiceProvider.getGesAlarmDao().getGesAlarm(maxAlarmId);        Long maxId = 0L;        for(GesAlarmDto alarm:list){            if(alarm.getAlarmid().longValue()>maxId.longValue()){                maxId = alarm.getAlarmid();            }        }        SafeWorkContainer.getInstance().putSafeWork(list);        logger.info(String.format("load %d GesAlarm到队列",list.size()));        if(list.size()<10){            flag = 4;        }else if(list.size()<50){            flag = 3;        }else if(list.size()<100){            flag=2;        }else{            flag = 1;        }        return maxId;    }}

将内存队列中的数据取出,多线程形式逐条解析成告警对象

package cn.com.starit.main;import java.lang.reflect.Field;import java.lang.reflect.Method;import java.text.SimpleDateFormat;import java.util.ArrayList;import java.util.Date;import java.util.HashMap;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.regex.Matcher;import java.util.regex.Pattern;import org.apache.commons.lang.StringUtils;import org.apache.log4j.Logger;import org.dom4j.Document;import org.dom4j.DocumentHelper;import org.dom4j.Element;import org.springframework.beans.BeanUtils;import com.sun.jersey.api.client.Client;import com.sun.jersey.api.client.ClientResponse;import com.sun.jersey.api.client.WebResource;import cn.com.starit.ge.etec.model.AzGesBsmAlarm;import cn.com.starit.ge.etec.model.GesAlarm;import cn.com.starit.ge.etec.model.GesAlarmClearCache;import cn.com.starit.ge.etec.model.GesAlarmHis;import cn.com.starit.ge.etec.model.GesBsmAlarm;import cn.com.starit.ge.etec.model.GesBsmAlarmCirRel;import cn.com.starit.ge.etec.model.GesBsmAlarmCirRelId;import cn.com.starit.ge.etec.model.GesBsmAlarmClear;import cn.com.starit.ge.etec.model.GesBsmAlarmHis;import cn.com.starit.ge.etec.model.GesMonitorConfig2;import cn.com.starit.ge.etec.model.ShortmessageInfo;import cn.com.starit.ge.gzpt.dto.GesAlarmDto;import cn.com.starit.ge.persistence.busi.AffectCustAnalysis;import cn.com.starit.ge.persistence.cache.GesAlarmStrategyTypeDefCache;import cn.com.starit.ge.persistence.dao.ServiceProvider;import cn.com.starit.ge.persistence.dto.AffectCustDto;import cn.com.starit.ge.persistence.dto.AlarmLevelDto;import cn.com.starit.ge.persistence.dto.CustCircuitDto;import cn.com.starit.ge.persistence.dto.DZWYDto;import cn.com.starit.ge.persistence.utils.AlarmObjConvert;import cn.com.starit.ge.persistence.utils.PropertiesLoadUtil;import cn.com.starit.ge.persistence.utils.SafeWorkContainer;import cn.com.starit.ge.persistence.utils.Static;import cn.com.starit.ge.gzpt.service.WarRoomService;/** * ================================================= <br> * 工程:GessAlarmAnalysis <br> * 类名:alarmAnalysis <br> * 工夫:2019-8-12下午01:13:10<br> * 版本:Version 1.0 <br> <br> * 形容:告警解析过程,将原始告警解析成咱们零碎外部的告警对象<br> * ================================================= <br> */public class alarmAnalysis{    private static accessAndKeepOrder accessAndKeepOrder;    private static WarRoomService warRoomService;    private static final Logger logger = Logger.getLogger(alarmAnalysis.class);    private List<AnalysisThread> threadList = new ArrayList<AnalysisThread>();    public alarmAnalysis(int n){        for(int i=0;i<n;i++){            threadList.add(new AnalysisThread("AnalysisThread_"+i));        }    }        public void start(){        logger.info("告警解析开启多线程,线程数为"+threadList.size()+"....");        if(threadList.size()>0){            for(AnalysisThread at:threadList){                at.start();            }        }    }    class AnalysisThread extends Thread{        public AnalysisThread(String name) {            super(name);        }        public void run(){            logger.info("告警解析线程"+this.getName()+"启动.....");            while(true){                List<GesAlarmDto> list = SafeWorkContainer.getInstance().getSafeWork(50);                logger.info(String.format("取%dGesAlarmDto告警解决", list.size()));                if(null != list && list.size() > 0) {                    for(int i=0;i<list.size();i++){                        GesAlarmDto gesAlarmDto = list.get(i);                        try{                            accessAndKeepOrder=new accessAndKeepOrder();                            accessAndKeepOrder.run(gesAlarmDto);                            accessAndKeepOrder=null;                            //作战室严障调用                            warRoomService=new WarRoomService();                            warRoomService.run(gesAlarmDto);                            warRoomService=null;                        }catch(Exception e){                            logger.error("剖析告警出错!告警流水号为:"+gesAlarmDto.getSrcalarmId(),e);                        }                    } // end for                } // end first if                else {                    logger.warn("未取到原始告警信息,休眠1秒 (—_—)~zZ");                    try {                        Thread.sleep(1000);                    } catch (InterruptedException e) {                        logger.error(e.getMessage(), e);                    }                }                        } // end while        }    }    /*     *      */    private Map<String,String> analyticalPostition(String alarmLocate,long alarmProftype){        String rack=null, shelf=null, slot=null, port=null, ontid=null;         Map<String,String> map = new HashMap<String,String>();        if(2 == alarmProftype) {//传输业余告警            if(null != alarmLocate && !"".equals(alarmLocate)&&!"null".equals(alarmLocate)) {                String[] locArry = alarmLocate.split("/");                Map<String, String> locMap = new HashMap<String, String>();                for(String loc : locArry) {                    if(null != loc && !"".equals(loc)) {                        if(loc.split("=").length>=2){                            String k = loc.split("=")[0].toLowerCase();                            String v = loc.split("=")[1];                            locMap.put(k, v);                        }                    }                }                rack = locMap.get("rack");                shelf = locMap.get("shelf");                slot = locMap.get("slot");                // 端口, 有port 和 PhysicalPort ,不排除第三种                for(Iterator<String> it=locMap.keySet().iterator(); it.hasNext();) {                    String k = it.next();                    if(null != k && k.contains("port")) {                        port = locMap.get(k);                    }                }            }// end second if        }// end first if        ///数据网管告警,格局 0-0-2-2:4        else if(1 == alarmProftype) {//数据业余            if(null != alarmLocate && !"".equals(alarmLocate)) {                String loc = alarmLocate.split(":")[0];                String[] locArry = loc.split("-");                if(locArry.length>0){                    rack = locArry[0];                }                if(locArry.length>1){                    shelf = locArry[1];                }                if(locArry.length>2){                    slot = locArry[2];                }                if(locArry.length>3){                    port = locArry[3];                }                if(alarmLocate.split(":").length>1) ontid = alarmLocate.split(":")[1];            }        }        map.put("rack", rack);        map.put("shelf", shelf);        map.put("slot", slot);        map.put("port", port);        map.put("ontid", ontid);        return map;    }    /**     * 将告警与电路关系保留到Ges_bsm_alarm_cir_rel表中     * @param gesBsmAlarmId,告警保留到GES_BSM_ALARM表后,主动生成的ID     * @param cirId,电路ID     */    public static void saveGesBsmAlarmCirRel(Long gesBsmAlarmId,String circuitIds){        if(null!=circuitIds&&gesBsmAlarmId!=null){            try{                String[] ids = circuitIds.split(",");                                for(String id:ids){                    GesBsmAlarmCirRelId relId = new GesBsmAlarmCirRelId(gesBsmAlarmId,Long.parseLong(id));                    GesBsmAlarmCirRel rel = new GesBsmAlarmCirRel(relId);                    ServiceProvider.getGesBsmAlarmCirRelDao().save(rel);                }                logger.debug(String.format("REL告警与电路关系入库胜利,共%d条!", ids.length));            }catch(Exception e){                logger.debug(String.format("REL告警与电路关系入库异样!%s",e.getMessage()));                e.printStackTrace();            }        }    }    public static void removeGesBsmAlarmCirRel(Long gesBsmAlarmId,String circuitIds){        if(gesBsmAlarmId!=null&&circuitIds!=null){            logger.debug(String.format("REL筹备删除告警与电路关系:[gesBsmAlarmId:%s,gesBsmAlarmCircuitIds:%s]", gesBsmAlarmId,circuitIds));            try{                String[] ids = circuitIds.split(",");                                for(String id:ids){                    GesBsmAlarmCirRelId relId = new GesBsmAlarmCirRelId(gesBsmAlarmId,Long.parseLong(id));//存储告警与电路关联的类,属性有告警ID和关联的电路编码ID                    GesBsmAlarmCirRel rel = new GesBsmAlarmCirRel(relId);                    ServiceProvider.getGesBsmAlarmCirRelDao().delete(rel);                }                logger.debug(String.format("REL告警与电路关系删除胜利,共%d条!", ids.length));            }catch(Exception e){                logger.debug(String.format("REL告警与电路关系删除异常!%s",e.getMessage()));                e.printStackTrace();            }        }    }    public void clearAlaarmSendShortMsg(GesBsmAlarm gesBsmAlarm){        List<CustCircuitDto> ccList = ServiceProvider.getGesAlarmDao().getCustCircuitByDevName(gesBsmAlarm.getDeviceName().trim());        AffectCustDto affectCustDto = new AffectCustDto();        affectCustDto.setMmeeids(gesBsmAlarm.getCustomCode());        affectCustDto.setCircuitIds(gesBsmAlarm.getCircuitIds());        affectCustDto.setCirCodes(gesBsmAlarm.getCirCodes());    }    public void convertAreaId(String srcAreaId){            }    /**     * 向告警影响的客户发送短信     * @param gesAlarm 告警信息     * @param ccList  影响的客户及电路     * @param strategyType  影响的客户及电路     * @param affectCustDto  封装的与告警相关联的电路 路由信息和客户信息     */    public void newAlarmSendShortMsg(GesAlarm gesAlarm, AffectCustDto affectCustDto,List<CustCircuitDto> ccList,Integer strategyType){        try{            List<GesMonitorConfig2> configList = ServiceProvider.getGesMonitorConfig2Dao().queryConfig(affectCustDto.getMmeeids());//查询数据库中须要进行短信告诉的客户            //<联系人电话,<客户名称,电路编码>MAP            Map<String,Map<String,StringBuffer>> customerMsgMap = new HashMap<String,Map<String,StringBuffer>>();//电话号码,<客户名称,电路编码>            Map<String,Map<String,StringBuffer>> managerMsgMap = new HashMap<String,Map<String,StringBuffer>>();//电话号码,电路编码            Map<String,Map<String,StringBuffer>> engineerMsgMap = new HashMap<String,Map<String,StringBuffer>>();//电话号码,电路编码                        if(null!=configList&&configList.size()>0){                Map<String,CustCircuitDto> cirMap = new HashMap<String,CustCircuitDto>();                Map<String,CustCircuitDto> telMap = new HashMap<String,CustCircuitDto>();                for(CustCircuitDto dto:ccList){                    if(null!=dto.getCircuitId()){                        cirMap.put(dto.getCircuitId().toString(), dto);                    }                    if(null!=dto.getCustomerTel()){                        telMap.put(dto.getCustomerTel(), dto);                    }                    if(null!=dto.getManagerTel()){                        telMap.put(dto.getManagerTel(), dto);                    }                    if(null!=dto.getEngineerTel()){                        telMap.put(dto.getEngineerTel(), dto);                    }                }                String custIds = affectCustDto.getMmeeids();//客户ID                String cirIds = affectCustDto.getCircuitIds();//电路编码                for(GesMonitorConfig2 config:configList){                    String noticeUsers = config.getNoticeUsers();//告诉用户类型                    //告诉对象标记                    boolean customerNotice = false;                    boolean managerNotice = false;                    boolean engineerNotice = false;                    if(null==noticeUsers||"".equals(noticeUsers)){                        continue;                    }                    noticeUsers += ",";                    if(noticeUsers.contains("1,")){//告诉客户联系人                        customerNotice=true;                    }                    if(noticeUsers.contains("2,")){//告诉客户经理                        managerNotice=true;                    }                    if(noticeUsers.contains("3,")){//告诉客户工程师                        engineerNotice=true;                    }                    //判断告警影响的客户是否是被动监控客户                    if(null!=affectCustDto.getMmeeids()&&affectCustDto.getMmeeids().contains(config.getCustId().toString())){                        //分隔被动监控客户的电路ID,逐个解决                        String[] cis = config.getCircuitIds().split(",");                        if(affectCustDto.getCircuitIds()!=null&&cis!=null){                            //为防止长ID蕴含短ID,如32123与212,在ID后加逗号后进行比对                            String cirIdsTemp = affectCustDto.getCircuitIds()+",";//告警影响的电路ID                            for(String str:cis){                                if(null!=str&&!"".equals(str)&&cirIdsTemp.contains(str+",")){                                    //告警影响了被动监控电路                                    //依据电路ID,取客户及电路信息                                    CustCircuitDto d = cirMap.get(str);                                    String cirCode = d.getCircode();//取电路编码                                    if(customerNotice&&null!=d.getCustomerTel()){                                        Map<String,StringBuffer> tempMap = customerMsgMap.get(d.getCustomerTel());                                        if(null==tempMap){                                            tempMap = new HashMap<String,StringBuffer>();                                        }                                        if(null!=d.getCustName()&&!"".equals(d.getCustName())){                                            StringBuffer sb = tempMap.get(d.getCustName());                                            if(null==sb){                                                sb = new StringBuffer(512);                                            }                                            sb.append(cirCode).append(",");                                            tempMap.put(d.getCustName(), sb);                                            customerMsgMap.put(d.getCustomerTel(), tempMap);                                        }                                    }                                    if(managerNotice&&null!=d.getManagerTel()){                                        Map<String,StringBuffer> tempMap = managerMsgMap.get(d.getManagerTel());                                        if(null==tempMap){                                            tempMap = new HashMap<String,StringBuffer>();                                        }                                        if(null!=d.getCustName()&&!"".equals(d.getCustName())){                                            StringBuffer sb = tempMap.get(d.getCustName());                                            if(null==sb){                                                sb = new StringBuffer(512);                                            }                                            sb.append(cirCode).append(",");                                            tempMap.put(d.getCustName(), sb);                                            managerMsgMap.put(d.getManagerTel(), tempMap);                                        }                                    }                                    if(engineerNotice&&null!=d.getEngineerTel()){                                        Map<String,StringBuffer> tempMap = engineerMsgMap.get(d.getEngineerTel());                                        if(null==tempMap){                                            tempMap = new HashMap<String,StringBuffer>();                                        }                                        if(null!=d.getCustName()&&!"".equals(d.getCustName())){                                            StringBuffer sb = tempMap.get(d.getCustName());                                            if(null==sb){                                                sb = new StringBuffer(512);                                            }                                            sb.append(cirCode).append(",");                                            tempMap.put(d.getCustName(), sb);                                            engineerMsgMap.put(d.getEngineerTel(), tempMap);                                        }                                    }                                }                            }                        }                    }                }                //发送短信                int ppFlag = 0;                SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");                if(null!=strategyType){                    ppFlag = strategyType.intValue();                }                //客户联系人                for(String tel:customerMsgMap.keySet()){                    Map<String,StringBuffer> m = customerMsgMap.get(tel);                    for(String custName:m.keySet()){                        StringBuffer sb = m.get(custName);                        if(null==custName){                            custName = "";                        }                        if(null!=sb&&sb.length()>0){                            StringBuffer msgSb = new StringBuffer(512);                            msgSb.append("尊敬的")                                .append(custName)                                .append("客户您好!于")                                .append(null==gesAlarm.getNealarmtime()?"":sdf.format(gesAlarm.getNealarmtime()))                                .append("工夫【").append(gesAlarm.getReserve3())//告警设施                                .append("】设施产生告警,可能会影响您的")                                .append(sb.substring(0, sb.length()-1))                                .append("等电路的失常应用,咱们正在修复故障,若给你带来不便,敬请体谅!【中国电信-大客户网管零碎】");                            if(2==ppFlag){                                msgSb.append(gesAlarm.getAlarmname()).append("告警被屏蔽!");                            }                            logger.info(String.format("向客户联系人%s(%s)发送短信:%s",custName,tel, msgSb.toString()));                            sendShortMsg(tel,msgSb.toString(),0);                        }                    }                }                //客户经理                for(String tel:managerMsgMap.keySet()){                    Map<String,StringBuffer> m = managerMsgMap.get(tel);                    CustCircuitDto d = telMap.get(tel);                    String name = d.getManager();                    if(null==name){                        name = "";                    }                    StringBuffer subSb = new StringBuffer(512);                    for(String custName:m.keySet()){                        StringBuffer sb = m.get(custName);                        if(null==custName){                            custName = "";                        }                        if(null!=sb&&sb.length()>1){                            if(subSb.length()>1){                                subSb.append(",");                            }                            subSb.append("【").append(custName).append(":")                                .append(sb.substring(0, sb.length()-1)).append("】");                        }                    }                    if(subSb.length()>1){                        StringBuffer msgSb = new StringBuffer(512);                        msgSb.append("尊敬的")                            .append(name)                            .append("您好!于")                            .append(null==gesAlarm.getNealarmtime()?"":sdf.format(gesAlarm.getNealarmtime()))                            .append("工夫【").append(gesAlarm.getReserve3())//告警设施                            .append("】设施产生告警,可能会影响您的以下客户业务")                            .append(subSb.toString())                            .append(",请留神查看故障修复停顿。【中国电信-大客户网管零碎】");                        if(2==ppFlag){                            msgSb.append(gesAlarm.getAlarmname()).append("告警被屏蔽!");                        }                        sendShortMsg(tel,msgSb.toString(),0);                        logger.info(String.format("向客户经理%s(%s)发送短信:%s",name,tel, msgSb.toString()));                    }                }                //客户工程师                for(String tel:engineerMsgMap.keySet()){                    Map<String,StringBuffer> m = engineerMsgMap.get(tel);                    CustCircuitDto d = telMap.get(tel);                    String name = d.getEngineer();                    if(null==name){                        name = "";                    }                    StringBuffer subSb = new StringBuffer(512);                    for(String custName:m.keySet()){                        StringBuffer sb = m.get(custName);                        if(null==custName){                            custName = "";                        }                        if(null!=sb&&sb.length()>1){                            if(subSb.length()>1){                                subSb.append(",");                            }                            subSb.append("【").append(custName).append(":")                                .append(sb.substring(0, sb.length()-1)).append("】");                        }                    }                    if(subSb.length()>1){                        StringBuffer msgSb = new StringBuffer(512);                        msgSb.append("尊敬的")                            .append(name)                            .append("客户您好!于")                            .append(null==gesAlarm.getNealarmtime()?"":sdf.format(gesAlarm.getNealarmtime()))                            .append("工夫【").append(gesAlarm.getReserve3())//告警设施                            .append("】设施产生通信故障,可能会影响您的以下客户业务")                            .append(subSb.toString())                            .append(",请留神查看故障修复停顿。【中国电信-大客户网管零碎】");                        if(2==ppFlag){                            msgSb.append(gesAlarm.getAlarmname()).append("告警被屏蔽!");                        }                        sendShortMsg(tel,msgSb.toString(),0);                        logger.info(String.format("向客户工程师%s(%s)发送短信:%s",name,tel, msgSb.toString()));                    }                }            }        }catch(Exception e){            logger.info("剖析告警推送对象异样:"+e.getMessage());            e.printStackTrace();        }    }        /**     * 比照AZ端的端口信息是否正确     * @param gesAlarm     * @return     */    public boolean compareAZinfo(GesAlarm gesAlarm,Map<String,String> mapString){        String circuitIds = "";        String cirCodes = "";        String alarmNet = gesAlarm.getReserve3();//告警设施        String alarmLocate = gesAlarm.getReserve5(); // 故障地位        long alarmProftype = gesAlarm.getAlarmproftype();//示意传输业余还是数据业余的告警        if(alarmNet!= null){            List<String> azInfos = Static.cirCodeAzInfoMap.get(alarmNet);            if(azInfos!=null){                boolean ifcompare = false;                Map<String,String> map = analyticalPostition(alarmLocate,alarmProftype);//告警的AZ端口信息                String rack = map.get("rack");                String shelf = map.get("shelf");                String slot = map.get("slot");                String port = map.get("port");                for(String azInfo:azInfos){                    System.out.print(">>>>>>"+azInfo+"\n");                    boolean portCompare = true;//如果定制了端口,则用来比照端口信息                    Map<String,String> aZmap = analyticalPostition(azInfo.split("//")[0],2);//对应的AZ端口的信息                    String azRack = aZmap.get("rack");                    String azShelf = aZmap.get("shelf");                    String azSlot = aZmap.get("slot");                    String azPort = aZmap.get("port");                    if(!isEmptyIgnoreCase(azRack)){                        if(rack!=null){                            if(!azRack.trim().equals(rack.trim())){                                portCompare=false;                            }                        }else{                            portCompare=false;                        }                    }                    if(!isEmptyIgnoreCase(azShelf)){                        if(shelf!=null){                            if(!azShelf.trim().equals(shelf.trim())){                                portCompare=false;                            }                        }else{                            portCompare=false;                        }                    }                    if(!isEmptyIgnoreCase(azSlot)){                        if(slot!=null){                            if(!azSlot.trim().equals(slot.trim())){                                portCompare=false;                            }                        }else{                            portCompare=false;                        }                    }                    if(!isEmptyIgnoreCase(azPort)){                        if(port!=null){                            if(!azPort.trim().equals(port.trim())){                                portCompare=false;                            }                        }else{                            portCompare=false;                        }                    }                    if(portCompare){//端口雷同                        if(azInfo.split("//").length>2){                            circuitIds += azInfo.split("//")[1]+",";//电路ID                            cirCodes += azInfo.split("//")[2]+",";//电路编码                            System.out.print(">>>0"+circuitIds+">>>0"+cirCodes+"\n");                        }                        ifcompare= true;                    }                }                mapString.put("prepareCircuitIds", circuitIds);                mapString.put("prepareCirCodes", cirCodes);                if(ifcompare){//如果有一个匹配上的话,就返回真                    return true;                }            }        }        return false;    }        /**     * 定制网元产生告警后,发送短信给指定人员     * @param tel     * @param msg     */    public void customizedNeSendMsg(GesAlarm gesAlarm,Integer strategyType){        SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");        String alarmNet = gesAlarm.getReserve3();//告警设施        String alarmLocate = gesAlarm.getReserve5(); // 故障地位        List<DZWYDto> dzwyList = new ArrayList<DZWYDto>();        DZWYDto dzwy = new DZWYDto();        long alarmProftype = gesAlarm.getAlarmproftype();//示意传输业余还是数据业余的告警        int ppFlag = 0;//用来判断该告警是否能够被屏蔽        if(null!=strategyType){            ppFlag = strategyType.intValue();        }        if(alarmNet!= null){            AlarmLevelDto alarmlev=null;            String levName=null;            List<AlarmLevelDto> alarmLLevels=ServiceProvider.getGesAlarmDao().getCirAlarmLev(gesAlarm.getAlarmname());//获取告警内容级别            if(alarmLLevels!=null&&alarmLLevels.size()>0){                alarmlev=alarmLLevels.get(0);            }            if(alarmlev!=null&&alarmlev.getIsA()!=null&&alarmlev.getIsA().equals("1")){                levName="业务中断";            }            else if(alarmlev!=null&&alarmlev.getIsB()!=null&&alarmlev.getIsB().equals("1")){                levName="性能劣化";            }            else if(alarmlev!=null&&alarmlev.getIsC()!=null&&alarmlev.getIsC().equals("1")){                levName="危险减少";            }else                levName="";            dzwyList = ServiceProvider.getGesAlarmDao().queryGESWY(alarmNet,alarmLocate);            if(dzwyList.size()>0){                 dzwy = dzwyList.get(0);            }            List<String> costomizes = Static.customizedMap.get(gesAlarm.getReserve3());            if(costomizes!=null){                Map<String,String> map = analyticalPostition(alarmLocate,alarmProftype);                String rack = map.get("rack");                String shelf = map.get("shelf");                String slot = map.get("slot");                String port = map.get("port");                for(String costomized:costomizes){                    boolean portCompare = true;//如果定制了端口,则用来比照端口信息                    String[] strs = costomized.split("_");                    String positions = "第";                    String cRack = strs[0];                    String cShelf = strs[1];                    String cSlot = strs[2];                    String cPort = strs[3];                    String cNoticePeople = strs[4];                    String cNoticeTelNum = strs[5];                                        if(!isEmptyIgnoreCase(cRack)){                        positions +=cRack+"架";                        if(rack!=null){                            if(!cRack.trim().equals(rack.trim())){                                portCompare=false;                            }                        }else{                            portCompare=false;                        }                    }                    if(!isEmptyIgnoreCase(cShelf)){                        positions +=cShelf+"框";                        if(shelf!=null){                            if(!cShelf.trim().equals(shelf.trim())){                                portCompare=false;                            }                        }else{                            portCompare=false;                        }                    }                    if(!isEmptyIgnoreCase(cSlot)){                        positions +=cSlot+"槽";                        if(slot!=null){                            if(!cSlot.trim().equals(slot.trim())){                                portCompare=false;                            }                        }else{                            portCompare=false;                        }                    }                    if(!isEmptyIgnoreCase(cPort)){                        positions +=cPort+"口";                        if(port!=null){                            if(!cPort.trim().equals(port.trim())){                                portCompare=false;                            }                        }else{                            portCompare=false;                        }                    }                    logger.info("1>>>>》》》》"+costomized+"》》"+alarmLocate+">>>"+portCompare+"\n");                    if(portCompare&&ppFlag!=2){                        logger.info("2>>>>》》》》"+costomized+"》》"+alarmLocate+">>>"+portCompare+"\n");                        if(positions.trim().equals("第")){                            positions="";                        }                        StringBuffer msgSb = new StringBuffer(512);                        /*msgSb.append("尊敬的")                            .append(cNoticePeople)                            .append("您好!于")                            .append(null==gesAlarm.getNealarmtime()?"":sdf.format(gesAlarm.getNealarmtime()))                            .append(",定制网元【").append(gesAlarm.getReserve3())//告警设施                            .append(positions+"】产生告警");                        if(!isEmptyIgnoreCase(gesAlarm.getAlarmname())){                            msgSb.append("("+gesAlarm.getAlarmname()+")");                        }                        msgSb.append(",请留神查看故障修复停顿。【中国电信-大客户网管零碎】");*/                        msgSb.append("您好,")                        .append(dzwy.getCustName())                        .append(",于")                        .append(null==gesAlarm.getNealarmtime()?"":sdf.format(gesAlarm.getNealarmtime()))                        .append("工夫,设施")                        .append(gesAlarm.getReserve3())                        .append("产生告警,电路编码:")                        .append(dzwy.getCirCode())                        .append(",告警形容:")                        .append(gesAlarm.getAlarmsummary());                                                if(!levName.equals("")){                            msgSb.append(",产生影响:")                            .append(levName);                        }                        sendShortMsg(cNoticeTelNum,msgSb.toString(),1);                        logger.info(String.format("向客户经理%s(%s)发送短信:%s",cNoticePeople,cNoticeTelNum, msgSb.toString()));                    }                }            }        }            }    /*     * 漠视大小写的字符串非空比拟     */    public static boolean isEmptyIgnoreCase(String str){        if(str!=null&&!str.equalsIgnoreCase("null")&&!(str.trim()).equals("")                &&!str.equalsIgnoreCase("undefined")){            return false;        }else{            return true;        }    }    /**     *      * @param tel 电话号码     * @param msg 短信内容     * @param type 0示意一般告警1示意定制网元的告警     */    public void sendShortMsg(String tel,String msg,int type){        System.out.print(">>>>>发送测试短信!");//        msg+="【测试短信】";        Long ifSend = -1L;        try{            if(type==0){                ifSend = Long.parseLong(PropertiesLoadUtil.getPro("ALARM_IF_SENDMSG"));                System.out.print(">>>>>一般告警短信发送性能!发送类型》》"+ifSend);                logger.info(">>>>>一般告警短信发送性能!发送类型》》"+ifSend);            }else if(type==1){                logger.info(">>>>>>发现定制网元,发送短信!");                ifSend = Long.parseLong(PropertiesLoadUtil.getPro("CUSTOMIZED_IF_SENDMSG"));                System.out.print(">>>>>定制网元告警短信发送性能!发送类型》》"+ifSend);                logger.info(">>>>>定制网元告警短信发送性能!发送类型》》"+ifSend);            }else if(type==2){//2019-07-20纳管电路告警                ifSend = Long.parseLong(PropertiesLoadUtil.getPro("CIRCUIT_ALARMS_SENDMSG"));                System.out.print(">>>>>纳管电路告警短信发送性能!发送类型》》"+ifSend);            }        }catch(Exception e){            logger.info("从配置文件中读取短信发送配置信息失败!",e);        }        if(isEmptyIgnoreCase(tel)||isEmptyIgnoreCase(msg)){            return;        }        ShortmessageInfo msgInfo = new ShortmessageInfo();        msgInfo.setReceiveNo(tel);//接管短信的电话号码        //msgInfo.setMessage(msg+tel);        msgInfo.setMessage(msg);        //msgInfo.setSendFlag(0L);        msgInfo.setSendFlag(ifSend);//测试用,临时设置为-1,短信模块不会发送状态为-1的短信。        msgInfo.setSendTime(new Date(System.currentTimeMillis()));        ServiceProvider.getShortMessageInfoDao().save(msgInfo);    }    //    public static void main(String[] args) {//        System.out.println(">>>>>>>>>>"+isEmptyIgnoreCase(null)+"\n");//        System.out.println(">>>>>>>>>>"+isEmptyIgnoreCase("nUll")+"\n");//        System.out.println(">>>>>>>>>>"+isEmptyIgnoreCase("UNDERFINed")+"\n");//    }        /**     * 应用反射机制将GesBsmAlarm对象映射至GesBsmAlarmClear<br>     * 注:该办法的前提是GesBsmAlarmClear和GesBsmAlarm对象定义的get/set办法对统一<br>     * @param gesBsmAlarm     * @return     */    public GesBsmAlarmClear transformBy(GesBsmAlarm gesBsmAlarm) {        Field[] fields = gesBsmAlarm.getClass().getDeclaredFields();        GesBsmAlarmClear gesBsmAlarmClear = new GesBsmAlarmClear();        for(Field field : fields) {            String fieldName = field.getName();            Class<?> type = field.getType();            logger.debug("type=" + type);            fieldName = fieldName.substring(0, 1).toUpperCase() + fieldName.substring(1);            Method m1, m2;            String v1;            try {                m1 = gesBsmAlarm.getClass().getDeclaredMethod("get" + fieldName);                v1 = (String)m1.invoke(gesBsmAlarm);                m2 = gesBsmAlarmClear.getClass().getDeclaredMethod("set" + fieldName, type);                m2.invoke(gesBsmAlarmClear, v1);            } catch (Exception e) {                logger.error(e.getMessage(), e);            }        }        return gesBsmAlarmClear;    }    //    /**//     * 测试函数//     * @param args//     *///    public static void main(String[] args) {//        // 测试transformBy办法 --通过//        GesBsmAlarm gesBsmAlarm = new GesBsmAlarm();//        gesBsmAlarm.setAlarmName("这是一个测试");//        GesBsmAlarmClear gesBsmAlarmClear = new alarmAnalysis().transformBy(gesBsmAlarm);//        System.out.println(gesBsmAlarmClear.getAlarmName());//    }        /**     * 预处理     * @param name     * @param rack     * @param shelf     * @param slot     * @param port     * @param sx     * @return     */    public String pretreatment(String name,String rack,String shelf,String slot,String port,String sx){        return "信息不全";//        List<RmResCircuitRoute2> routeAList = ServiceProvider.getGesAlarmDao().getRouteA(name,rack,shelf,slot,port,sx);//        List<RmResCircuitRoute2> routeZList = ServiceProvider.getGesAlarmDao().getRouteZ(name,rack,shelf,slot,port,sx);//        String ipA = null, ipB = null;//        if(routeAList!=null&&routeAList.size()>0){//            for(int i = 0;i<routeAList.size();i++){//                if(routeAList.get(i).getAip()!=null){//                    ipA = routeAList.get(i).getAip();//                    break;//                }//            }//        }//        if(routeZList!=null&&routeZList.size()>0){//            for(int i = 0;i<routeZList.size();i++){//                if(routeZList.get(i).getZip()!=null){//                    ipB = routeZList.get(i).getZip();//                    break;//                }//            }//        }//        if(null != ipA && null != ipB) {//            if (LocalPingUtil.PING_RESULT_SUCC == LocalPingUtil.timeoutPing(ipA) && LocalPingUtil.PING_RESULT_SUCC == LocalPingUtil.timeoutPing(ipB)) {//                return "设施能失常Ping通";//            } else if (LocalPingUtil.PING_RESULT_FAIL == LocalPingUtil.timeoutPing(ipA) && LocalPingUtil.PING_RESULT_SUCC == LocalPingUtil.timeoutPing(ipB)) {//                return "设施" + ipA + "无奈Ping通";//            } else if (LocalPingUtil.PING_RESULT_SUCC == LocalPingUtil.timeoutPing(ipA) && LocalPingUtil.PING_RESULT_FAIL == LocalPingUtil.timeoutPing(ipB)) {//                return "设施" + ipB + "无奈Ping通";//            } else {//                return "设施" + ipA + "和设施" + ipB + "无奈Ping通";//            }//        }//        else if (null != ipA) {//            if (LocalPingUtil.PING_RESULT_SUCC == LocalPingUtil.timeoutPing(ipA)) {//                return "设施能失常Ping通";//            } else {//                return "设施" + ipA + "无奈Ping通";//            }//        }//        else if (null != ipB) {//            if (LocalPingUtil.PING_RESULT_SUCC == LocalPingUtil.timeoutPing(ipB)) {//                return "设施能失常Ping通";//            } else {//                return "设施" + ipB + "无奈Ping通";//            }//        } else {//            return "设施IP未找到";//        }    }        private static final String IP_REGEX = //IP地址的正则表达式        "(1\\d{2}|2[0-4]\\d|25[0-5]|[1-9]\\d|[1-9])\\." +         "(1\\d{2}|2[0-4]\\d|25[0-5]|[1-9]\\d|\\d)\\." +        "(1\\d{2}|2[0-4]\\d|25[0-5]|[1-9]\\d|\\d)\\." +        "(1\\d{2}|2[0-4]\\d|25[0-5]|[1-9]\\d|\\d)";    //查看是否为视频专线所需告警(数据业余告警 & 华为ONU设施掉电告警 & 视频专线电路所对应IP/端口)        private boolean isVideoAlarm(GesAlarm gesAlarm){        //1.判断业余        Long alarmProftype = gesAlarm.getAlarmproftype();        boolean isDataProftype = alarmProftype!=null && alarmProftype==1; //是否数据业余        if(!isDataProftype) return false;        //2.判断告警名称        String alarmName = StringUtils.trim(gesAlarm.getAlarmname());//        boolean isImportAlarm = "华为ONU设施掉电告警".equals(alarmName) || "华为ONU设施掉电告警复原".equals(alarmName); //是否为所关注的告警(ONU掉电告警,流动/革除)        //boolean isImportAlarm = "华为ONU设施掉电告警".equals(alarmName) || "华为ONU设施掉电告警复原".equals(alarmName) || "华为分支断纤告警".equals(alarmName) || "华为分支断纤告警复原".equals(alarmName); //是否为所关注的告警(ONU掉电告警/断纤告警,流动/革除)        boolean isImportAlarm = "华为ONU设施掉电告警".equals(alarmName) || "华为ONU设施掉电告警复原".equals(alarmName) || "华为分支断纤告警".equals(alarmName) || "华为分支断纤告警复原".equals(alarmName)                || "中兴骨干断纤告警".equals(alarmName)|| "中兴骨干断纤告警复原".equals(alarmName)|| "中兴分支断纤告警".equals(alarmName) || "中兴分支断纤告警复原".equals(alarmName)                ||"中兴ONU设施掉电告警".equals(alarmName)||"中兴ONU设施掉电告警复原".equals(alarmName);         if(!isImportAlarm) return false;        //3.判断是否属于以后现有的设施的告警        String oltIp = null, alarmDetail = gesAlarm.getAlarmdetail();        Matcher matcher = Pattern.compile("上联OLTIP:"+IP_REGEX).matcher(alarmDetail);        if(matcher.find()) {//获取告警设施 ip            oltIp = matcher.group().substring("上联OLTIP:".length());        }        String rack=null, shelf=null, slot=null, port=null,ontid=null; // 架,框,槽,口        String alarmLocate = gesAlarm.getReserve5(); // 故障地位        Map<String,String> map = analyticalPostition(alarmLocate,alarmProftype);        rack = map.get("rack");        shelf = map.get("shelf");        slot = map.get("slot");        port = map.get("port");        ontid = map.get("ontid");        String ponInfo = nvl(shelf)+"/"+nvl(slot)+"/"+nvl(port)+":"+nvl(ontid);        boolean isFacility = ServiceProvider.getGesAlarmDao().impactVideoCir(oltIp, ponInfo);        if(!isFacility){            return false;        }        //4.打印返回        logger.info(String.format("视频专线相干-华为ONU设施掉电告警[alarmId:%d,业余:%d,网管:%d,原始告警ID:%s]",            gesAlarm.getAlarmid(),gesAlarm.getAlarmproftype(),gesAlarm.getSourceid(),gesAlarm.getSrcalarmId()));        //5.更新对应电路的在线状态        this.reloadOnlineStatus(oltIp, ponInfo);        return true;    }    private static void impactVideoAlarm(GesAlarm gesAlarm, String shelf, String slot, String port, String ontid){        //1.提取oltIp,ponInfo        String oltIp = null, alarmDetail = gesAlarm.getAlarmdetail();        Matcher matcher = Pattern.compile("上联OLTIP:"+IP_REGEX).matcher(alarmDetail);        if(matcher.find()) oltIp = matcher.group().substring("上联OLTIP:".length());        String ponInfo = nvl(shelf)+"/"+nvl(slot)+"/"+nvl(port)+":"+nvl(ontid);        //2.保留关联关系        Long alarmproftype = gesAlarm.getAlarmproftype();        String emsId = gesAlarm.getSourceid()+"";        String serial = gesAlarm.getSrcalarmId();        ServiceProvider.getGesAlarmDao().saveVideoAlarm(alarmproftype, emsId, serial, oltIp, ponInfo);    }    private static String nvl(String str){        return str==null?"":str;    }        //更新视频专线电路在线状态    public void reloadOnlineStatus(String oltIp, String ponInfo){        try {            long id = ServiceProvider.getGesAlarmDao().queryCirId(oltIp, ponInfo);            ReturnInfo returnInfo = this.queryVideoStatus(oltIp, ponInfo);            String result = returnInfo.getMsg();            if(!returnInfo.isFlag()){                logger.info(String.format("[状态更新-失败]:{id:%d, errorInfo:%s}\n", id, result));            }            Document dom = DocumentHelper.parseText(result.substring(result.indexOf("<root>"), result.indexOf("</root>")+7));            String resultCode = ((Element)dom.selectSingleNode("/root/msgHead/result")).attributeValue("code");            if("SUCCESS".equals(resultCode)){                String onlineStatus = ((Element)dom.selectSingleNode("/root/msgBody/block/fields/field[last()]/cusStatus")).attributeValue("status");                ServiceProvider.getGesAlarmDao().updateStatus(id, onlineStatus);                logger.info(String.format("[状态更新-胜利]:{id:%d, onlineStatus:%s}\n", id, onlineStatus));            }else{                String errorInfo = ((Element)dom.selectSingleNode("/root/msgHead/result")).getText();                logger.info(String.format("[状态更新-失败]:{id:%d, errorInfo:响应谬误-%s}\n", id, errorInfo));            }        } catch (Exception e) {            logger.info("视频专线申请产生未知异样", e);        }    }        private ReturnInfo queryVideoStatus(String ip, String ponInfo){        //1.结构报文        String result = null;        int requestId = ServiceProvider.getGesAlarmDao().querySequence();        if(requestId <= 0){            return new ReturnInfo(false, "程序异样-序列化申请ID失败");        }        String[] portAndOnt = ponInfo==null?null:ponInfo.split(":");        if(portAndOnt==null || portAndOnt.length!=2 || StringUtils.isEmpty(portAndOnt[0]) || StringUtils.isEmpty(portAndOnt[1])){            return new ReturnInfo(false, "数据异样-请查看PON口信息数据");        }        String requestMsg = createReqMsg(requestId, ip, portAndOnt[0], portAndOnt[1]);        //2.发送申请        try {            Client client = Client.create();            WebResource resource = client.resource("http://135.224.253.28:8081/services/NEService");            logger.info("申请报文为"+requestMsg);            ClientResponse resp = resource.entity(requestMsg.trim()).post(ClientResponse.class);            result = resp.getEntity(String.class).replace("&lt;", "<").replace("&gt;", ">");            logger.info("响应报文为"+result+"");        }catch(Exception e){            logger.info("失败信息为:\n"+result, e);            return new ReturnInfo(false, "程序异样-申请IPOSS产生未知谬误");        }        return new ReturnInfo(true, result);    }        //结构申请报文    private String createReqMsg(int requestId, String ip, String port, String ontid){        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");        String headMsg =             "<msgHead>"+                "<requestId>"+requestId+"</requestId>"+                "<requestTime>"+sdf.format(new Date())+"</requestTime>"+                "<source>KANMS</source>"+                "<target>IPOSS</target>"+             "</msgHead>";        String bodyMsg =             "<msgBody>"+                "<msgType>selectPonCusStatus</msgType>" +                "<block code=\"IPOSS@guizhou.cn:NEService\">" +                    "<fields>" +                        "<field>" +                            "<phyEqp ip=\""+ip+"\" type=\"OLT\"/>" +                            "<phyEqp ontid=\""+ontid+"\" type=\"ONU\"/>" +                        "</field>" +                        "<field code=\"portInfo\">" +                            "<port eqpIp=\""+ip+"\" id=\""+port+"\"/>" +                        "</field>" +                    "</fields>" +                "</block>" +            "</msgBody>";        StringBuffer sb = new StringBuffer("<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\" xmlns:sys=\"http://sysinf.ngis.ai.com\">" +                "<soapenv:Header/><soapenv:Body><sys:selectPonCusStatus><sys:xml><![CDATA[<root>" + headMsg + bodyMsg + "</root>]]></sys:xml></sys:selectPonCusStatus></soapenv:Body></soapenv:Envelope>");        return sb.toString();    }}class ReturnInfo{    private boolean flag;    private String msg;    public ReturnInfo(){}    public ReturnInfo(boolean flag, String msg) {        this.flag = flag;        this.msg = msg;    }    public boolean isFlag() {        return flag;    }    public void setFlag(boolean flag) {        this.flag = flag;    }    public String getMsg() {        return msg;    }    public void setMsg(String msg) {        this.msg = msg;    }}

备注:在开发过程中碰到一个问题:这样多线程形式解决数据会导致雷同数据反复生产即反复剖析问题,后改了取数据的逻辑。
即:每次取满内存队列后,等队列全副生产完后,再从数据库中取数据,而不是边剖析数据边从库中取数据。