本项目采用 SpringMVC + 原生 JDBC 开发,是一个 Java 数据处理程序,数据库使用 Oracle。
我的项目整体目录如下:
项目中用到的消息中间件主要有 ActiveMQ 和 Kafka 两种,本篇主要介绍 Kafka。
Kafka 的数据推送与读取分别由生产者和消费者完成。生产者定义一个 topic(相当于一个数据队列),并将数据推送到 Kafka 的该 topic 下;消费者根据该 topic 从 Kafka 中按顺序读取数据。示例代码实现如下:
首先是oracle数据库配置文件
<?xml version="1.0" encoding="GB18030"?>
<!-- the proxool configuration can be embedded within your own application's.Anything outside the "proxool" tag is ignored. -->
<something-else-entirely>
  <proxool>
    <!-- Alias under which this pool is registered ("gess"). -->
    <alias>gess</alias>
    <!-- Active JDBC URL: Oracle thin driver with a two-address list
         (135.224.21.131 and 135.224.21.133, port 1521), LOAD_BALANCE=yes, FAILOVER=on,
         service name "iam", session-type failover mode. -->
    <driver-url>jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=135.224.21.131)(PORT=1521))(ADDRESS = (PROTOCOL = TCP)(HOST = 135.224.21.133)(PORT=1521))(LOAD_BALANCE=yes)(FAILOVER=on))(CONNECT_DATA=(SERVICE_NAME=iam)(FAILOVER_MODE=(TYPE=SESSION)(METHOD=BASIC))))</driver-url>
    <!-- Disabled alternative URLs (service name "iam2"), kept for reference. -->
    <!-- <driver-url>jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=135.224.21.133)(PORT=1521))(ADDRESS=(PROTOCOL=TCP)(HOST=135.224.21.131)(PORT=1521))(LOAD_BALANCE=yes)(CONNECT_DATA=(SERVER=DEDICATED)(SERVICE_NAME=iam2)))</driver-url>-->
    <!-- <driver-url>jdbc:oracle:thin:@//135.224.21.133:1521:iam2</driver-url> -->
    <driver-class>oracle.jdbc.driver.OracleDriver</driver-class>
    <!-- NOTE(review): database credentials are stored here in plain text; consider
         externalising them and rotating any value that has been published. -->
    <driver-properties>
      <!-- <property name="user" value="c##scott"/> <property name="password" value="StCgzpt@12!qqz;"/> -->
      <property name="user" value="xjgzk"/>
      <property name="password" value="Gzk%^1202"/>
    </driver-properties>
    <!-- Pool sizing. NOTE(review): presumably the upper/lower bound on pooled
         connections; 1000 looks high for a single collector process. Confirm against
         the database's session limit. -->
    <maximum-connection-count>1000</maximum-connection-count>
    <minimum-connection-count>1</minimum-connection-count>
    <!-- NOTE(review): the two values below are presumably milliseconds (90 s and 1 h).
         Confirm the unit and exact semantics in the Proxool property reference. -->
    <maximum-active-time>90000</maximum-active-time>
    <maximum-connection-lifetime>3600000</maximum-connection-lifetime>
    <!-- Statement named as the house-keeping test SQL. -->
    <house-keeping-test-sql>select CURRENT_DATE from DUAL</house-keeping-test-sql>
  </proxool>
</something-else-entirely>
程序入口,main 方法主体
/**
 *
 */
package com.ustcinfo.kanms.alarmcollector.main;

import java.text.ParseException;

import com.ustcinfo.kanms.alarmcollector.framework.GesAlarmAnalysisInDBThreads;
import com.ustcinfo.kanms.alarmcollector.framework.GesAlarmInDBThreads;
import com.ustcinfo.kanms.alarmcollector.framework.UnusualAlarmAnalysisThreads;
import com.ustcinfo.kanms.alarmcollector.kafka.kafkaMessageReceiver;

/**
 * ================================================= <br>
 * Project: GessAlarmCollector <br>
 * Class: MainProcess <br>
 * Author: lt <br>
 * Date: 2019-9-18 14:57:59 <br>
 * Version: 1.0 <br><br>
 * Description: application entry point. <br>
 * ================================================= <br>
 */
public class MainProcess {

    /** Number of Kafka consumer threads handed to {@code kafkaMessageReceiver}. */
    private static final int KAFKA_CONSUMER_THREADS = 2;

    /** Thread count handed to {@code GesAlarmInDBThreads}. */
    private static final int IN_DB_THREADS = 2;

    /** Thread count handed to {@code GesAlarmAnalysisInDBThreads}. */
    private static final int ANALYSIS_IN_DB_THREADS = 3;

    /** Thread count handed to {@code UnusualAlarmAnalysisThreads}. */
    private static final int UNUSUAL_ANALYSIS_THREADS = 2;

    /**
     * Application entry point: loads the system configuration, then brings up the
     * Kafka receiver and the three worker groups in a fixed order.
     *
     * @param args command-line arguments (not read)
     * @throws ParseException declared by the original signature; nothing visible in
     *         this method throws it directly
     */
    public static void main(String[] args) throws ParseException {
        // Load the system configuration before any component reads it.
        GlobleConfiguration configuration = GlobleConfiguration.getInstance();
        configuration.initSysConfig();

        // Start the Kafka consumer listeners that fetch alarm data.
        kafkaMessageReceiver receiver = new kafkaMessageReceiver(KAFKA_CONSUMER_THREADS);
        receiver.start();

        // NOTE(review): for the three groups below only the constructor is invoked;
        // presumably each constructor starts its own threads - confirm in the
        // framework package.

        // Database-persistence workers (transient-break alarms).
        new GesAlarmInDBThreads(IN_DB_THREADS);

        // Analysis + persistence workers for non-transient alarms.
        new GesAlarmAnalysisInDBThreads(ANALYSIS_IN_DB_THREADS);

        // Workers that look up the matching active alarm for abnormal alarms.
        new UnusualAlarmAnalysisThreads(UNUSUAL_ANALYSIS_THREADS);
    }
}
全局化参数定义
/**
 *
 */
package com.ustcinfo.kanms.alarmcollector.main;

import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;

import org.apache.activemq.ActiveMQConnection;
import org.apache.log4j.Appender;
import org.apache.log4j.FileAppender;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

import com.ustcinfo.kanms.alarmcollector.util.ResourceLoaderUtil;

/**
 * ================================================= <br>
 * Project: GessAlarmCollector <br>
 * Class: GlobleConfiguration <br>
 * Author: dlzhang <br>
 * Date: 2014-8-28 18:45:11 <br>
 * Version: 1.0 <br> <br>
 * Description: process-wide configuration holder. A single instance is exposed through
 * {@link #getInstance()}; {@link #initSysConfig()} populates it from the log4j and
 * system property files.<br>
 * ================================================= <br>
 */
public class GlobleConfiguration {

    private static final Logger logger = Logger.getLogger(GlobleConfiguration.class);

    /**
     * The single instance. Created eagerly (the constructor is empty) so that
     * {@link #getInstance()} has no check-then-create race.
     */
    private static final GlobleConfiguration instance = new GlobleConfiguration();

    private GlobleConfiguration() {}

    public static final String LOG_CFG_FILE_NAME_DEFAULT = "log4j_alarm.properties";
    public static final String LOG_FILE_NAME_DEFAULT = "GessAlarmCollector.log";
    public static final String SYS_CFG_FILE_NAME_DEFAULT = "sys_config_alarm.properties";

    // Property keys read from the system configuration file.
    public static final String ACTIVE_MQ_RUL = "ACTIVE_MQ_RUL";
    public static final String ALARM_RECV_QUEUE_NAME = "ALARM_RECV_QUEUE_NAME";
    public static final String ALARM_BILL_REL_RECV_QUEUE_NAME = "ALARM_BILL_REL_RECV_QUEUE_NAME";
    public static final String ACTIVE_MQ_SEND_QUEUE_NAME = "ACTIVE_MQ_SEND_QUEUE_NAME";
    public static final String ACTIVE_MQ_USER = "ACTIVE_MQ_USER";
    public static final String ACTIVE_MQ_PASSWD = "ACTIVE_MQ_PASSWD";
    public static final String ACTIVE_MQ_USER_DEFAULT = ActiveMQConnection.DEFAULT_USER;
    public static final String ACTIVE_MQ_PASSWD_DEFAULT = ActiveMQConnection.DEFAULT_PASSWORD;
    public static final String KAFKA_RUL = "KAFKA_RUL";
    public static final String KAFKA_TOPIC = "KAFKA_TOPIC";
    public static final String KAFKA_QFYJ_TOPIC = "QFYJ_TOPIC";

    private String activeMqUrl;
    private String alarmRecvQueueName;
    private String alarmBillRelRecvQueueName;
    private String activeMqSendQueueName;
    private String activeMqUser;
    private String activeMqPasswd;
    private String kafkaUrl;
    private String kafkaTopic;
    private String kafkaQFYJTopic;
    // Not populated by initSysConfig(); only reachable through setGroupid().
    private String groupid;

    /**
     * Initialises logging and loads the system configuration into this instance.
     * The ActiveMQ user and password fall back to the ActiveMQ defaults when the
     * corresponding keys are absent.
     *
     * @throws IllegalStateException if the system configuration file cannot be found
     *         or read
     */
    public void initSysConfig() {
        // Initialise logging first (comment this call out for local testing).
        initLogConfig(LOG_CFG_FILE_NAME_DEFAULT);
        logger.info("零碎初始化:过程以后内存占用量(兆):" + totalMemoryMb());

        Map<String, String> configMap = loadSysConfigTrim(SYS_CFG_FILE_NAME_DEFAULT);

        this.activeMqUrl = configMap.get(ACTIVE_MQ_RUL);
        this.alarmRecvQueueName = configMap.get(ALARM_RECV_QUEUE_NAME);
        this.alarmBillRelRecvQueueName = configMap.get(ALARM_BILL_REL_RECV_QUEUE_NAME);
        this.activeMqSendQueueName = configMap.get(ACTIVE_MQ_SEND_QUEUE_NAME);

        this.activeMqUser = configMap.get(ACTIVE_MQ_USER);
        if (null == this.activeMqUser) {
            this.activeMqUser = ACTIVE_MQ_USER_DEFAULT;
        }
        this.activeMqPasswd = configMap.get(ACTIVE_MQ_PASSWD);
        if (null == this.activeMqPasswd) {
            this.activeMqPasswd = ACTIVE_MQ_PASSWD_DEFAULT;
        }

        this.kafkaUrl = configMap.get(KAFKA_RUL);
        this.kafkaTopic = configMap.get(KAFKA_TOPIC);
        this.kafkaQFYJTopic = configMap.get(KAFKA_QFYJ_TOPIC);

        // toString() masks the password, so this is safe to log.
        logger.debug(this.toString());
        logger.info("零碎初始化结束:过程以后内存占用量(兆):" + totalMemoryMb());
    }

    /** Returns the JVM's current total memory in whole mebibytes. */
    private static int totalMemoryMb() {
        return (int) (Runtime.getRuntime().totalMemory() / (1048576));
    }

    /**
     * Loads the system configuration file into a map with trimmed keys and values.
     * When the {@code GESS_CONFIG} environment variable is set and non-empty, the file
     * name is resolved below that directory.
     *
     * @param sysFile configuration file name; {@code null} selects
     *        {@link #SYS_CFG_FILE_NAME_DEFAULT}
     * @return the loaded entries, never {@code null}
     * @throws IllegalStateException if the file cannot be found or read (previously
     *         these cases surfaced as a bare {@code NullPointerException})
     */
    private Map<String, String> loadSysConfigTrim(String sysFile) {
        if (null == sysFile) {
            sysFile = SYS_CFG_FILE_NAME_DEFAULT;
        }
        String configPath = System.getenv("GESS_CONFIG");
        if (null != configPath && !"".equals(configPath)) {
            sysFile = configPath + "/" + sysFile;
        }
        logger.info("零碎配置文件:" + sysFile);

        InputStream is = ResourceLoaderUtil.getResourceAsStream(sysFile);
        if (is == null) {
            String message = "System configuration file not found: " + sysFile;
            logger.error(message);
            throw new IllegalStateException(message);
        }

        Properties props = new Properties();
        try {
            props.load(is);
        } catch (IOException e) {
            logger.error("加载配置文件失败", e);
            throw new IllegalStateException(
                    "Failed to load system configuration file: " + sysFile, e);
        } finally {
            try {
                is.close();
            } catch (IOException e) {
                logger.error("敞开IO流失败", e);
            }
        }

        Map<String, String> cfgMap = new HashMap<String, String>();
        // Look the value up with the untrimmed name, then store both sides trimmed;
        // looking up by the trimmed key could miss the entry and return null.
        for (String name : props.stringPropertyNames()) {
            cfgMap.put(name.trim(), props.getProperty(name).trim());
        }
        return cfgMap;
    }

    /**
     * Configures log4j from the given properties file and points the file appender
     * at the default log file.
     *
     * @param cfgFile log4j properties file name; {@code null} selects
     *        {@link #LOG_CFG_FILE_NAME_DEFAULT}
     */
    private void initLogConfig(String cfgFile) {
        if (null == cfgFile) {
            cfgFile = LOG_CFG_FILE_NAME_DEFAULT;
        }
        String cfgPath = System.getenv("GESS_CONFIG");
        if (cfgPath == null || "".equals(cfgPath)) {
            // No environment variable configured: look in the current directory.
            cfgFile = "./" + cfgFile;
        } else {
            cfgFile = cfgPath + "/" + cfgFile;
        }
        PropertyConfigurator.configure(cfgFile);
        generateLogFileName(null);
        logger.info("日志配置文件:" + cfgFile);
    }

    /**
     * Redirects the root logger's appender named {@code "R"} (if it is a
     * {@link FileAppender}) to the given log file, placed under the directory named
     * by the {@code GESS_LOG} environment variable when that is set.
     *
     * @param logFile log file name; {@code null} selects {@link #LOG_FILE_NAME_DEFAULT}
     */
    private void generateLogFileName(String logFile) {
        if (null == logFile) {
            logFile = LOG_FILE_NAME_DEFAULT;
        }
        Appender app = Logger.getRootLogger().getAppender("R");
        if (app instanceof FileAppender) {
            FileAppender fileApp = (FileAppender) app;
            String logPath = System.getenv("GESS_LOG");
            // Without the environment variable the file is created in the current directory.
            if (logPath != null && !"".equals(logPath)) {
                logFile = logPath + "/" + logFile;
            }
            fileApp.setFile(logFile);
            fileApp.activateOptions();
        }
    }

    /**
     * Describes the loaded configuration. The ActiveMQ password is masked because
     * this string is written to the debug log.
     *
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        String maskedPasswd = (activeMqPasswd == null) ? "null" : "******";
        return "GlobleConfiguration [activeMqPasswd=" + maskedPasswd
                + ", activeMqSendQueueName=" + activeMqSendQueueName
                + ", activeMqUrl=" + activeMqUrl
                + ", activeMqUser=" + activeMqUser
                + ", alarmBillRelRecvQueueName=" + alarmBillRelRecvQueueName
                + ", alarmRecvQueueName=" + alarmRecvQueueName
                + ", kafkaUrl=" + kafkaUrl
                + ", kafkaTopic=" + kafkaTopic
                + ", kafkaQFYJTopic=" + kafkaQFYJTopic
                + ", groupid=" + groupid
                + "]";
    }

    /**
     * @return the instance
     */
    public static GlobleConfiguration getInstance() {
        return instance;
    }

    /**
     * @return the activeMqUrl
     */
    public String getActiveMqUrl() {
        return activeMqUrl;
    }

    /**
     * @param activeMqUrl the activeMqUrl to set
     */
    public void setActiveMqUrl(String activeMqUrl) {
        this.activeMqUrl = activeMqUrl;
    }

    /**
     * @return the alarmRecvQueueName
     */
    public String getAlarmRecvQueueName() {
        return alarmRecvQueueName;
    }

    /**
     * @param alarmRecvQueueName the alarmRecvQueueName to set
     */
    public void setAlarmRecvQueueName(String alarmRecvQueueName) {
        this.alarmRecvQueueName = alarmRecvQueueName;
    }

    /**
     * @return the alarmBillRelRecvQueueName
     */
    public String getAlarmBillRelRecvQueueName() {
        return alarmBillRelRecvQueueName;
    }

    /**
     * @param alarmBillRelRecvQueueName the alarmBillRelRecvQueueName to set
     */
    public void setAlarmBillRelRecvQueueName(String alarmBillRelRecvQueueName) {
        this.alarmBillRelRecvQueueName = alarmBillRelRecvQueueName;
    }

    /**
     * @return the activeMqUser
     */
    public String getActiveMqUser() {
        return activeMqUser;
    }

    /**
     * @param activeMqUser the activeMqUser to set
     */
    public void setActiveMqUser(String activeMqUser) {
        this.activeMqUser = activeMqUser;
    }

    /**
     * @return the activeMqPasswd
     */
    public String getActiveMqPasswd() {
        return activeMqPasswd;
    }

    /**
     * @param activeMqPasswd the activeMqPasswd to set
     */
    public void setActiveMqPasswd(String activeMqPasswd) {
        this.activeMqPasswd = activeMqPasswd;
    }

    /**
     * @return the activeMqSendQueueName
     */
    public String getActiveMqSendQueueName() {
        return activeMqSendQueueName;
    }

    /**
     * @param activeMqSendQueueName the activeMqSendQueueName to set
     */
    public void setActiveMqSendQueueName(String activeMqSendQueueName) {
        this.activeMqSendQueueName = activeMqSendQueueName;
    }

    /**
     * @return the kafkaUrl
     */
    public String getKafkaUrl() {
        return kafkaUrl;
    }

    /**
     * @param kafkaUrl the kafkaUrl to set
     */
    public void setKafkaUrl(String kafkaUrl) {
        this.kafkaUrl = kafkaUrl;
    }

    /**
     * @return the kafkaTopic
     */
    public String getKafkaTopic() {
        return kafkaTopic;
    }

    /**
     * @param kafkaTopic the kafkaTopic to set
     */
    public void setKafkaTopic(String kafkaTopic) {
        this.kafkaTopic = kafkaTopic;
    }

    /**
     * @return the kafkaQFYJTopic
     */
    public String getKafkaQFYJTopic() {
        return kafkaQFYJTopic;
    }

    /**
     * @param kafkaQFYJTopic the kafkaQFYJTopic to set
     */
    public void setKafkaQFYJTopic(String kafkaQFYJTopic) {
        this.kafkaQFYJTopic = kafkaQFYJTopic;
    }

    /**
     * @return the groupid
     */
    public String getGroupid() {
        return groupid;
    }

    /**
     * @param groupid the groupid to set
     */
    public void setGroupid(String groupid) {
        this.groupid = groupid;
    }
}
定义 Kafka 的 topic,Kafka 接收消息
/** * */package com.ustcinfo.kanms.alarmcollector.kafka;import java.util.ArrayList;import java.util.Arrays;import java.util.Date;import java.util.HashSet;import java.util.List;import java.util.Map;import java.util.Properties;import java.util.Set;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.TopicPartition;import org.apache.log4j.Logger;import redis.clients.jedis.Jedis;import com.ustcinfo.kanms.alarmcollector.framework.ReadXML;import com.ustcinfo.kanms.alarmcollector.main.GlobleConfiguration;import com.ustcinfo.kanms.alarmcollector.model.GesAlarm;import com.ustcinfo.kanms.alarmcollector.model.GesAlarmDto;import com.ustcinfo.kanms.alarmcollector.util.JedisConfig;import com.ustcinfo.kanms.alarmcollector.util.JsonUtil;import com.ustcinfo.kanms.alarmcollector.util.escapeUtil;/** * ================================================= <br> * 工程:AlarmAutoReceiver <br> * 类名:ActiveMQReceiver <br> * 作者:lt <br> * 工夫:2019-9-23下午08:49:38<br> * 版本:Version 1.0 <br><br> * 形容:kafka承受音讯<br> *<br> * ================================================= <br> */public class kafkaMessageReceiver{ private static final Logger logger = Logger.getLogger(kafkaMessageReceiver.class); private String kafkaTopic; private String kafkaUrl; private String groupId = "group01"; private Lock lock; private List<ConsumerThread> threadList = new ArrayList<ConsumerThread>(); private static Properties buildKafkaProperty(String brokers, String groupId){ Properties properties = new Properties(); properties.put("bootstrap.servers", brokers); properties.put("group.id", groupId); properties.put("enable.auto.commit", "true"); properties.put("auto.commit.interval.ms", "1000"); properties.put("session.timeout.ms", "30000"); properties.put("auto.offset.reset", "earliest"); 
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return properties; } public kafkaMessageReceiver(int n){ //参数获取 this.kafkaUrl = GlobleConfiguration.getInstance().getKafkaUrl(); this.kafkaTopic = GlobleConfiguration.getInstance().getKafkaTopic(); this.lock=new ReentrantLock(); for(int i=0;i<n;i++){ threadList.add(new ConsumerThread("consumerThread_"+i)); } } public void start(){ logger.info("告警接管多线程,线程数为"+threadList.size()+"...."); if(threadList.size()>0){ for(ConsumerThread at:threadList){ at.start(); } } } class ConsumerThread extends Thread{ private final KafkaConsumer<String, String> consumer; Set<TopicPartition> assignment = new HashSet<>(); public ConsumerThread(String name) { super(name); Properties properties = buildKafkaProperty(kafkaUrl,groupId); this.consumer = new KafkaConsumer<>(properties); this.consumer.subscribe(Arrays.asList(kafkaTopic)); } public void run(){ logger.info("告警接管线程"+this.getName()+"启动....."); try{ // 指定分区从开端生产 ConsumerRecords<String,String> consumerRecords_1 = consumer.poll(100); assignment = consumer.assignment(); Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment); for (TopicPartition tp : assignment) { Long offset = endOffsets.get(tp); //System.out.println("分区最新ossset= "+ offset + " "); //取得上次接收数据的最大offset //Long curoffset=gesAlarmDataBase.getOffsetMax(); System.out.println("分区 " + tp + " 从 " + offset + " 开始生产"); consumer.seek(tp, offset); } while(true){ ConsumerRecords<String,String> consumerRecords = consumer.poll(100); for (ConsumerRecord<String,String> item : consumerRecords){ ConsumerThreadHandler(item,item.offset()); } } }catch (Exception e) { logger.error("Unexpected error", e); } finally { consumer.close(); } } //告警处理过程 private void ConsumerThreadHandler(ConsumerRecord<String,String> item,Long offset){ logger.info("收到原始告警:"+item.value()); GesAlarm gesAlarms=new 
GesAlarm(); String xmlString =""; String xml=(String)item.value(); if(xml.indexOf("<50-description>")>0){ xmlString =xml.replace("<50-description>", "|50-description|"); }else xmlString =xml; String xml_escape= escapeUtil.escapeHTMLTag(xmlString); //String s="<?xml version='1.0' encoding='UTF-8'?><Root><Header><Esb><Route><Sender>65.0001</Sender><Time>2019-12-28 19:39:46.910</Time><ServCode>65.0001.Itim_AlarmInfoPubServ.broadcast</ServCode><MsgId>65.0001_20191228193946_7533186910</MsgId><TransId>65.0001_20191228193946_7533186910</TransId><AuthType/><AuthCode/><Version>V0.1</Version><EsbId/><CarryType>0</CarryType><ServTestFlag>0</ServTestFlag><MsgType/><Topic>1</Topic></Route></Esb></Header><Body><![CDATA[<SERVICE><ALARMS><ALARM><AREAID>XJ-AKS</AREAID><ALARMTYPEID>4</ALARMTYPEID><ALARMPROFTYPE>1</ALARMPROFTYPE><ALARMCLASS>0</ALARMCLASS><ALARMLEVEL>3</ALARMLEVEL><SOURCEID>1014501</SOURCEID><ALARMNAME>输出光功率(dBm)越限复原(612603)</ALARMNAME><ALARMSUMMARY>输出光功率(dBm)越限复原(612603)</ALARMSUMMARY><ALARMDETAIL>设施222.83.41.23(编码AKS-AWT-MSE-1.MAN.M6000;类型BRAS;型号M6000)告警: 输出光功率(dBm)越限复原((rack = NA, shelf = 0, slot = 1, subslot = 1, port = 6, lane = 0, port_type = 131) <50-description> 15min low alarm disappear. 
Low thres is: -23.0000, Current value is: -15.8670.)</ALARMDETAIL><PROBABLECAUSE></PROBABLECAUSE><SRCALARMID>1195831659</SRCALARMID><USERLABEL></USERLABEL><ADITIONALINFO></ADITIONALINFO><PROPOSEDADVISE></PROPOSEDADVISE><COLLECTTIME>2019-12-28 19:39:41</COLLECTTIME><NEALARMTIME>2019-12-28 19:40:49</NEALARMTIME><RESUMEFLAG>0</RESUMEFLAG><RESUMETIME></RESUMETIME><GROUPFLAG>0</GROUPFLAG><BUREAUID></BUREAUID><RESERVE1>222.83.41.23</RESERVE1><RESERVE2>M6000</RESERVE2><RESERVE3>阿瓦提M6000</RESERVE3><RESERVE4>3</RESERVE4><RESERVE5>输出光功率</RESERVE5><RESERVE6></RESERVE6><ALARMLEVEL>3</ALARMLEVEL><RESERVE7></RESERVE7></ALARM></ALARMS></SERVICE>]]></Body></Root>"; // String xmlString= escapeUtil.escapeHTMLTag(s); if(xml_escape!=null){ List<GesAlarm> alarmList=ReadXML.getAlarmValue(ReadXML.obtainBodyInfo(xml_escape)); if(alarmList!=null&&alarmList.size()>0){ for (GesAlarm gesAlarm : alarmList) { gesAlarms=gesAlarm; GesAlarmDto gesAlarmDto=new GesAlarmDto(); gesAlarmDto.setAreaid(gesAlarms.getAreaid()); gesAlarmDto.setAlarmtypeid(gesAlarms.getAlarmtypeid()); gesAlarmDto.setAlarmproftype(gesAlarms.getAlarmproftype()); gesAlarmDto.setAlarmclass(gesAlarms.getAlarmclass()); gesAlarmDto.setAlarmlevel(gesAlarms.getAlarmlevel()); gesAlarmDto.setSourceid(gesAlarms.getSourceid()); gesAlarmDto.setAlarmname(gesAlarms.getAlarmname()); gesAlarmDto.setAlarmsummary(gesAlarms.getAlarmsummary()); gesAlarmDto.setAlarmdetail(gesAlarms.getAlarmdetail()); gesAlarmDto.setProbablecause(gesAlarms.getProbablecause()); gesAlarmDto.setSrcalarmId(gesAlarms.getSrcalarmId()); gesAlarmDto.setUserlabel(gesAlarms.getUserlabel()); gesAlarmDto.setAditionalinfo(gesAlarms.getAditionalinfo()); gesAlarmDto.setProposedadvise(gesAlarms.getProposedadvise()); gesAlarmDto.setCollecttime(gesAlarms.getCollecttime()); gesAlarmDto.setNealarmtime(gesAlarms.getNealarmtime()); gesAlarmDto.setResumeflag(gesAlarms.getResumeflag()); gesAlarmDto.setResumetime(gesAlarms.getResumetime()); 
gesAlarmDto.setGroupflag(gesAlarms.getGroupflag()); gesAlarmDto.setBureauid(gesAlarms.getBureauid()); gesAlarmDto.setReserve1(gesAlarms.getReserve1()); gesAlarmDto.setReserve2(gesAlarms.getReserve2()); gesAlarmDto.setReserve3(gesAlarms.getReserve3()); gesAlarmDto.setReserve4(gesAlarms.getReserve4()); gesAlarmDto.setReserve5(gesAlarms.getReserve5()); gesAlarmDto.setReserve6(gesAlarms.getReserve6()); gesAlarmDto.setReserve7(gesAlarms.getReserve7()); gesAlarmDto.setEnterqueuetime(new Date()); //gesAlarmDto.setEnterlibrarytime(null); //gesAlarmDto.setOffset(offset); //告警过滤(哪些告警须要/不须要,现阶段都是通过策略中的告警名称来过滤) boolean isuseful=alarmFilter(gesAlarmDto); if(isuseful){ alarmPreHandle(gesAlarmDto); } } } } } } /**告警预处理 * * */ private void alarmPreHandle(GesAlarmDto gesAlarmDto){ if(gesAlarmDto.getResumeflag()==null){//为空,为批改告警 return; } Jedis jedis=JedisConfig.getJedis(); //在缓存汇合中查问是否有对应的流动告警/复原告警 if(jedis!=null&&gesAlarmDto!=null){ //if(gesAlarmDto!=null){ //查问雷同srcalarmid lock.lock(); Set<String> set=jedis.keys(gesAlarmDto.getSrcalarmId()+":*");//查问到雷同的srcalarm_id JedisConfig.returnResource(jedis); if(set.size()==1){ for (String s : set) { Jedis jedis_pop=JedisConfig.getJedis(); String str=jedis_pop.rpop(s);//从队列中取出查问到的告警 JedisConfig.returnResource(jedis_pop); GesAlarmDto searchalarm=JsonUtil.stringToBean(str, GesAlarmDto.class); if(searchalarm==null){// Jedis jedis_push=JedisConfig.getJedis(); jedis_push.lpush(gesAlarmDto.getSrcalarmId()+":"+gesAlarmDto.getEnterqueuetime().getTime(),JsonUtil.beanToString(gesAlarmDto)); JedisConfig.returnResource(jedis_push); } else if(searchalarm!=null&&searchalarm.getSourceid().equals(gesAlarmDto.getSourceid()) &&searchalarm.getSrcalarmId().equals(gesAlarmDto.getSrcalarmId()) &&!searchalarm.getResumeflag().equals(gesAlarmDto.getResumeflag())){ searchalarm.setIdentify(2); gesAlarmDto.setIdentify(2); Jedis jedis_push=JedisConfig.getJedis(); 
jedis_push.lpush(searchalarm.getSrcalarmId()+":"+searchalarm.getEnterqueuetime().getTime(),JsonUtil.beanToString(searchalarm)); JedisConfig.returnResource(jedis_push); Jedis jedis_push2=JedisConfig.getJedis(); jedis_push2.lpush(gesAlarmDto.getSrcalarmId()+":"+gesAlarmDto.getEnterqueuetime().getTime(),JsonUtil.beanToString(gesAlarmDto)); JedisConfig.returnResource(jedis_push2); } else if(searchalarm!=null&&searchalarm.getSourceid().equals(gesAlarmDto.getSourceid()) &&searchalarm.getSrcalarmId().equals(gesAlarmDto.getSrcalarmId()) &&searchalarm.getResumeflag().equals(gesAlarmDto.getResumeflag())){ Jedis jedis_push3=JedisConfig.getJedis(); jedis_push3.lpush(searchalarm.getSrcalarmId()+":"+searchalarm.getEnterqueuetime().getTime(),JsonUtil.beanToString(searchalarm)); JedisConfig.returnResource(jedis_push3); logger.info("取到反复告警,不入队列,srcalarm_id="+gesAlarmDto.getSrcalarmId()); }else { logger.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"); } } }else if(set.size()>1){//队列中曾经有一对瞬断告警,间接将此告警忽略不计 logger.info("取到反复瞬断告警,不入队列,srcalarm_id="+gesAlarmDto.getSrcalarmId()); } else{ Jedis jedis_push4=JedisConfig.getJedis(); jedis_push4.lpush(gesAlarmDto.getSrcalarmId()+":"+gesAlarmDto.getEnterqueuetime().getTime(), JsonUtil.beanToString(gesAlarmDto)); JedisConfig.returnResource(jedis_push4); } lock.unlock(); } } /** * 告警过滤(过滤无效告警)*/ private boolean alarmFilter(GesAlarmDto alarm){ boolean isUseful=false; switch(alarm.getAlarmproftype().toString()){ case "1"://数据 isUseful=getShuJufiler(alarm); break; case "2": //isUseful=getChuanShufiler(alarm); isUseful=true; break; case "3": //isUseful=getDongHuanfiler(alarm); isUseful= true; break; case "4": //isUseful=getWuXianfiler(alarm); isUseful= true; break; case "5": //isUseful=getJiaoHuanfiler(alarm); isUseful= true; break; case "6": //isUseful=getYeWuPingTaifiler(alarm); isUseful= true; break; case "7": isUseful=get4Gfiler(alarm); break; case "8": isUseful=getIPRANfiler(alarm); //isUseful=true; break; case "9": 
//isUseful=getZiJianKongfiler(alarm); isUseful=true; break; case "10": //isUseful=getQunZhangLanJiefiler(alarm); isUseful=true; break; case "11": isUseful=true; break; case "12": isUseful=true; break; default: isUseful=false; } return isUseful; } /**数据业余无效告警过滤*/ private boolean getShuJufiler(GesAlarmDto alarm){ if(alarm.getAlarmname().indexOf("华为骨干断纤告警")>=0||alarm.getAlarmname().indexOf("中兴骨干断纤告警")>=0 ||alarm.getAlarmname().indexOf("战火ONU设施掉电")>=0||alarm.getAlarmname().indexOf("中兴ONU设施掉电")>=0||alarm.getAlarmname().indexOf("华为ONU设施掉电")>=0 ||alarm.getAlarmname().indexOf("中兴分支断纤告警")>=0||alarm.getAlarmname().indexOf("华为登录设施")>=0 ||alarm.getAlarmname().indexOf("华为登陆设施")>=0){ return false; }else if(alarm.getAlarmname().indexOf("SNMP_Link_Up")>=0&&alarm.getResumeflag()==0){ return false; }else{ return true; } } /**传输业余无效告警过滤*/ private boolean getChuanShufiler(GesAlarmDto alarm){ if(alarm.getAlarmname().indexOf("FAN_ALM")>=0||alarm.getAlarmname().indexOf("LINK_LOS")>=0 ||alarm.getAlarmname().indexOf("PPI_LOS")>=0||alarm.getAlarmname().indexOf("TEMP_OVER")>=0 ||alarm.getAlarmname().indexOf("0LS")>=0||alarm.getAlarmname().indexOf("ABSENCE_WARNING")>=0 ||alarm.getAlarmname().indexOf("ALM_GFP_dCSF")>=0||alarm.getAlarmname().indexOf("BD_STATUS")>=0 ||alarm.getAlarmname().indexOf("BIOS_STATUS")>=0||alarm.getAlarmname().indexOf("BUS_ERR")>=0 ||alarm.getAlarmname().indexOf("CARD_TCT")>=0||alarm.getAlarmname().indexOf("CLIENT_PORT_PS")>=0 ||alarm.getAlarmname().indexOf("COMFAIL")>=0||alarm.getAlarmname().indexOf("COMMUN_FAIL")>=0 ||alarm.getAlarmname().indexOf("coolingFanFailure")>=0||alarm.getAlarmname().indexOf("DBMS_ERROR")>=0 ||alarm.getAlarmname().indexOf("DCVOLDOWN")>=0||alarm.getAlarmname().indexOf("DCVOLOVER")>=0 ||alarm.getAlarmname().indexOf("detectTemp")>=0||alarm.getAlarmname().indexOf("ELCMODALM")>=0 ||alarm.getAlarmname().indexOf("ETH_LOS")>=0||alarm.getAlarmname().indexOf("etpieUnlink")>=0 
||alarm.getAlarmname().indexOf("External1")>=0||alarm.getAlarmname().indexOf("External13")>=0 ||alarm.getAlarmname().indexOf("FAIL")>=0||alarm.getAlarmname().indexOf("FAN_ALM")>=0 ||alarm.getAlarmname().indexOf("FAN_FAIL")>=0||alarm.getAlarmname().indexOf("FAN_TEMPTURE")>=0 ||alarm.getAlarmname().indexOf("fanMiss")>=0||alarm.getAlarmname().indexOf("HARD_BAD")>=0 ||alarm.getAlarmname().indexOf("ILS")>=0||alarm.getAlarmname().indexOf("IN_PWR_ABN")>=0 ||alarm.getAlarmname().indexOf("IN_PWR_FAIL")>=0||alarm.getAlarmname().indexOf("IN_PWR_HIGH")>=0 ||alarm.getAlarmname().indexOf("IN_PWR_LOW")>=0||alarm.getAlarmname().indexOf("INPWR_FAIL")>=0 ||alarm.getAlarmname().indexOf("INTRA_OTU_PS")>=0||alarm.getAlarmname().indexOf("IOP_HIGH")>=0 ||alarm.getAlarmname().indexOf("IOP_LOW")>=0||alarm.getAlarmname().indexOf("IOP-HIGH")>=0 ||alarm.getAlarmname().indexOf("IOP-LOW")>=0||alarm.getAlarmname().indexOf("laserOld")>=0 ||alarm.getAlarmname().indexOf("LOF")>=0||alarm.getAlarmname().indexOf("LOS")>=0 ||alarm.getAlarmname().indexOf("lossOfFrame")>=0||alarm.getAlarmname().indexOf("lossOfSignal")>=0 ||alarm.getAlarmname().indexOf("LSR_WILL_DIE")>=0||alarm.getAlarmname().indexOf("M_S_DATA_MSIMATCH")>=0 ||alarm.getAlarmname().indexOf("M_S_VER_MISMATCH")>=0||alarm.getAlarmname().indexOf("ModuleTemp-TCA")>=0 ||alarm.getAlarmname().indexOf("MS_SNCP_PS")>=0||alarm.getAlarmname().indexOf("MS-PSE")>=0 ||alarm.getAlarmname().indexOf("MUT_LOS")>=0||alarm.getAlarmname().indexOf("MUT_TLOS")>=0 ||alarm.getAlarmname().indexOf("NE_COMMU_BREAK")>=0||alarm.getAlarmname().indexOf("NE_NOT_LOGIN")>=0 ||alarm.getAlarmname().indexOf("OA_LOW_GAIN")>=0||alarm.getAlarmname().indexOf("ODU_SNCP_PS")>=0 ||alarm.getAlarmname().indexOf("ODUK_PS")>=0||alarm.getAlarmname().indexOf("ODUK_SNCP_PS")>=0 ||alarm.getAlarmname().indexOf("OOF")>=0||alarm.getAlarmname().indexOf("OPT_LESS_INPUT")>=0 ||alarm.getAlarmname().indexOf("OPT_NO_INPUT")>=0||alarm.getAlarmname().indexOf("OPT_OFFSET_UP")>=0 
||alarm.getAlarmname().indexOf("OPT_OVER_INPUT")>=0||alarm.getAlarmname().indexOf("OSC_LOS")>=0 ||alarm.getAlarmname().indexOf("OTN_LOF")>=0||alarm.getAlarmname().indexOf("OTRX_ABSENT")>=0 ||alarm.getAlarmname().indexOf("OTS_LOS")>=0||alarm.getAlarmname().indexOf("OTS_LOS-P")>=0 ||alarm.getAlarmname().indexOf("OTU_LOF")>=0||alarm.getAlarmname().indexOf("OVER_BRD_TMP")>=0 ||alarm.getAlarmname().indexOf("OVER_ENV_TMP")>=0||alarm.getAlarmname().indexOf("OWSP_PS")>=0 ||alarm.getAlarmname().indexOf("PG_P_FAIL")>=0||alarm.getAlarmname().indexOf("PG_SWITCH")>=0 ||alarm.getAlarmname().indexOf("PLM")>=0||alarm.getAlarmname().indexOf("POWER_ABNORMAL")>=0 ||alarm.getAlarmname().indexOf("POWER_FAIL")>=0||alarm.getAlarmname().indexOf("POWER_HIGH_48V")>=0 ||alarm.getAlarmname().indexOf("POWER_LOW_48V")>=0||alarm.getAlarmname().indexOf("POWERALM")>=0 ||alarm.getAlarmname().indexOf("POWERALM")>=0||alarm.getAlarmname().indexOf("powerBox1Port1NoInput")>=0 ||alarm.getAlarmname().indexOf("powerBox2Port1NoInput")>=0||alarm.getAlarmname().indexOf("powerProblem")>=0 ||alarm.getAlarmname().indexOf("powerSupplyFailure")>=0||alarm.getAlarmname().indexOf("protectionSwitchEvent")>=0 ||alarm.getAlarmname().indexOf("PS")>=0||alarm.getAlarmname().indexOf("PSE")>=0 ||alarm.getAlarmname().indexOf("PUMP_SD")>=0||alarm.getAlarmname().indexOf("PUMP_TF")>=0 ||alarm.getAlarmname().indexOf("PW_INPUT_FAIL")>=0||alarm.getAlarmname().indexOf("PW_SHELL_NO_INPUT")>=0 ||alarm.getAlarmname().indexOf("PW_SHELL_NO_OUTPUT")>=0||alarm.getAlarmname().indexOf("PWR_MAJ_ALM")>=0 ||alarm.getAlarmname().indexOf("PWR_TEMP_HIGH")>=0||alarm.getAlarmname().indexOf("PWR_TEMP_OVERTH")>=0 ||alarm.getAlarmname().indexOf("R_LOF")>=0||alarm.getAlarmname().indexOf("R_OOF")>=0 ||alarm.getAlarmname().indexOf("replaceableUnitMissing")>=0||alarm.getAlarmname().indexOf("replaceableUnitProblem")>=0 ||alarm.getAlarmname().indexOf("RLOS")>=0||alarm.getAlarmname().indexOf("RS_EXC")>=0||alarm.getAlarmname().indexOf("RS_LOF")>=0 
||alarm.getAlarmname().indexOf("RS_SD")>=0||alarm.getAlarmname().indexOf("SD")>=0||alarm.getAlarmname().indexOf("SHELF_ABSENCE")>=0 ||alarm.getAlarmname().indexOf("signalQualityEvaluationFailure")>=0||alarm.getAlarmname().indexOf("SNCP_FAIL")>=0 ||alarm.getAlarmname().indexOf("SNCP_INDI")>=0||alarm.getAlarmname().indexOf("SUBRACK_COMM_PS")>=0 ||alarm.getAlarmname().indexOf("SW_FAIL")>=0||alarm.getAlarmname().indexOf("SW_SNCP_PS")>=0||alarm.getAlarmname().indexOf("SWR")>=0 ||alarm.getAlarmname().indexOf("SWTR")>=0||alarm.getAlarmname().indexOf("TD")>=0||alarm.getAlarmname().indexOf("TEMP_ALAM")>=0 ||alarm.getAlarmname().indexOf("TEMP_OVER")>=0||alarm.getAlarmname().indexOf("TF")>=0||alarm.getAlarmname().indexOf("TLIMIT")>=0){ return true; }else return false; } /**IPRAN业余无效告警过滤*/ private boolean getIPRANfiler(GesAlarmDto alarm){ if(alarm.getResumeflag().equals("0")&&alarm.getReserve5().equals("IPRAN")){ return false; }else return true; /*if(alarm.getAlarmname().indexOf("NODE DOWN")>=0||alarm.getAlarmname().indexOf("IPRAN PING告警")>=0 ||alarm.getAlarmname().indexOf("端口链路状态DOWN")>=0||alarm.getAlarmname().indexOf("IPRAN 板卡CPU占用率阈值")>=0 ||alarm.getAlarmname().indexOf("温度门限超出告警")>=0||alarm.getAlarmname().indexOf("IPRAN 温度告警")>=0 ||alarm.getAlarmname().indexOf("ISIS状态DOWN")>=0||alarm.getAlarmname().indexOf("板卡异样告警")>=00){ return true; }else return false;*/ } /**动环业余无效告警过滤*/ private boolean getDongHuanfiler(GesAlarmDto alarm){ if(alarm.getAlarmname().indexOf("供电系统故障")>=0||alarm.getAlarmname().indexOf("UPS故障")>=0 ||alarm.getAlarmname().indexOf("防盗")>=0||alarm.getAlarmname().indexOf("空调故障")>=0 ||alarm.getAlarmname().indexOf("门禁")>=0||alarm.getAlarmname().indexOf("市电停电")>=0 ||alarm.getAlarmname().indexOf("水浸")>=0||alarm.getAlarmname().indexOf("通信故障")>=0 ||alarm.getAlarmname().indexOf("温度")>=0||alarm.getAlarmname().indexOf("温度故障")>=0 ||alarm.getAlarmname().indexOf("烟感")>=0||alarm.getAlarmname().indexOf("油机故障")>=0 
||alarm.getAlarmname().indexOf("油机启动")>=0||alarm.getAlarmname().indexOf("油机重大故障")>=0 ||alarm.getAlarmname().indexOf("与服务器通信异样故障(服务器与智能网管断连)")>=0 ||alarm.getAlarmname().indexOf("整流模块故障")>=0||alarm.getAlarmname().indexOf("直流电压故障")>=0 ||alarm.getAlarmname().indexOf("直流熔丝断故障故障")>=0){ return true; }else return false; } /**无线业余无效告警过滤*/ private boolean getWuXianfiler(GesAlarmDto alarm){ if(alarm.getAlarmname().indexOf("24小时闪断统计告警(只针对乌鲁木齐派单,且由PPP链路故障告警引起的不派单)")>=0 ||alarm.getAlarmname().indexOf("30分钟闪断统计告警")>=0||alarm.getAlarmname().indexOf("A10/A11接口链路故障")>=0 ||alarm.getAlarmname().indexOf("A12接口鉴权通道故障")>=0||alarm.getAlarmname().indexOf("A12链路不通或者AN AAA故障")>=0 ||alarm.getAlarmname().indexOf("A13/A16接口通道故障")>=0||alarm.getAlarmname().indexOf("A17信令链路连贯中断")>=0 ||alarm.getAlarmname().indexOf("AAA接口模块绑定socket失败")>=0||alarm.getAlarmname().indexOf("Abis口以太网连贯断")>=0 ||alarm.getAlarmname().indexOf("Abis口以太网下层链路建链不胜利")>=0||alarm.getAlarmname().indexOf("Abis链路断")>=0 ||alarm.getAlarmname().indexOf("AN-AAA鉴权不可达")>=0||alarm.getAlarmname().indexOf("AN和AN AAA间的A12链路不通或者AN AAA故障")>=0 ||alarm.getAlarmname().indexOf("BAM与SAU连贯中断")>=0||alarm.getAlarmname().indexOf("BAM与核心框SCU中断")>=0 ||alarm.getAlarmname().indexOf("BBU单板保护链路异样告警")>=0||alarm.getAlarmname().indexOf("BBU光模块接管异样告警")>=0 ||alarm.getAlarmname().indexOf("BBU光模块性能好转告警")>=0||alarm.getAlarmname().indexOf("BFD会话Down")>=0 ||alarm.getAlarmname().indexOf("BSC与BTS通信中断")>=0||alarm.getAlarmname().indexOf("BTS掉站")>=0 ||alarm.getAlarmname().indexOf("BTS掉站未探测到RTR")>=0 ||alarm.getAlarmname().indexOf("CDT(呼叫具体跟踪)的FTP Server与网元OMP之间的链路异样")>=0 ||alarm.getAlarmname().indexOf("CTML链路断开")>=0||alarm.getAlarmname().indexOf("DSP资源不可用 ")>=0 ||alarm.getAlarmname().indexOf("E1/T1滑帧超限告警")>=0||alarm.getAlarmname().indexOf("GE选路异样")>=0 ||alarm.getAlarmname().indexOf("GPS/Glonass/北斗接收机天馈短路")>=0||alarm.getAlarmname().indexOf("MLPPP组故障告警")>=0 ||alarm.getAlarmname().indexOf("MTP3局向不可达 ")>=0||alarm.getAlarmname().indexOf("MTP3链路不可用")>=0 
||alarm.getAlarmname().indexOf("PA去使能")>=0||alarm.getAlarmname().indexOf("PDSN/HSGW处于性能异样状态")>=0 ||alarm.getAlarmname().indexOf("PDSN处于性能异样状态")>=0||alarm.getAlarmname().indexOf("PWRD485通信链路断")>=0 ||alarm.getAlarmname().indexOf("SCCP子系统不可用")>=0||alarm.getAlarmname().indexOf("SDH/SONET:信号失落")>=0 ||alarm.getAlarmname().indexOf("STC局向不可达")>=0||alarm.getAlarmname().indexOf("TRUNK端口down")>=0 ||alarm.getAlarmname().indexOf("TRUNK通信能力有余")>=0||alarm.getAlarmname().indexOf("蕴含“驻波”的告警")>=0 ||alarm.getAlarmname().indexOf("备用BAM故障告警")>=0||alarm.getAlarmname().indexOf("备用BAM数据同步异样告警")>=0 ||alarm.getAlarmname().indexOf("单板不在位")>=0||alarm.getAlarmname().indexOf("单板对外端口down")>=0 ||alarm.getAlarmname().indexOf("单板离线或CPU处于长期复位状态")>=0||alarm.getAlarmname().indexOf("单板时钟异样故障")>=0 ||alarm.getAlarmname().indexOf("单板硬件故障告警")>=0||alarm.getAlarmname().indexOf("单板子系统故障")>=0 ||alarm.getAlarmname().indexOf("电池欠压告警")>=0||alarm.getAlarmname().indexOf("掉电报警")>=0 ||alarm.getAlarmname().indexOf("反向链路RSSI(接管信号强度批示)低")>=0||alarm.getAlarmname().indexOf("反向链路RSSI(接管信号强度批示)高")>=0 ||alarm.getAlarmname().indexOf("风扇堵转")>=0||alarm.getAlarmname().indexOf("风扇堵转告警")>=0||alarm.getAlarmname().indexOf("风扇故障")>=0 ||alarm.getAlarmname().indexOf("干接点告警(机房交换停电告警、电池电压过低告警、交换停电、直流欠压)")>=0||alarm.getAlarmname().indexOf("光口0 信号失落")>=0 ||alarm.getAlarmname().indexOf("光口反向帧失锁")>=0||alarm.getAlarmname().indexOf("光口光模块不在位")>=0||alarm.getAlarmname().indexOf("光口光模块无光")>=0 ||alarm.getAlarmname().indexOf("光口环回检测异样")>=0||alarm.getAlarmname().indexOf("光口上行帧失锁")>=0||alarm.getAlarmname().indexOf("光口信号劣化告警(RSD)")>=0 ||alarm.getAlarmname().indexOf("光口信号生效告警(RSF)")>=0||alarm.getAlarmname().indexOf("光口支路单元批示告警")>=0||alarm.getAlarmname().indexOf("光口支路告警批示信号告警(AIS)")>=0 ||alarm.getAlarmname().indexOf("机柜输出电压异样告警")>=0||alarm.getAlarmname().indexOf("基站操作保护链路中断")>=0||alarm.getAlarmname().indexOf("基站时钟不可用告警")>=0 ||alarm.getAlarmname().indexOf("口支路单元告警批示信号告警(TUAIS)")>=0||alarm.getAlarmname().indexOf("框间链路中断")>=0 
||alarm.getAlarmname().indexOf("拉远射频单元保护链路异样告警")>=0||alarm.getAlarmname().indexOf("没有可用的UPCF资源")>=0 ||alarm.getAlarmname().indexOf("模块心跳失落")>=0||alarm.getAlarmname().indexOf("目标信令点不可拜访")>=0 ||alarm.getAlarmname().indexOf("偶联断链")>=0||alarm.getAlarmname().indexOf("偶联通路断")>=0 ||alarm.getAlarmname().indexOf("七号信令链路不可用")>=0||alarm.getAlarmname().indexOf("射频单元光模块/SFP接口电缆不在位告警")>=0 ||alarm.getAlarmname().indexOf("射频单元光模块接管异样告警")>=0||alarm.getAlarmname().indexOf("射频单元光模块性能好转告警")>=0 ||alarm.getAlarmname().indexOf("射频单元交换掉电告警")>=0||alarm.getAlarmname().indexOf("时钟模块故障")>=0 ||alarm.getAlarmname().indexOf("搜星不胜利")>=0||alarm.getAlarmname().indexOf("天馈驻波比重大异样")>=0 ||alarm.getAlarmname().indexOf("天馈驻波比重大异样故障")>=0||alarm.getAlarmname().indexOf("天馈驻波比个别异样")>=0 ||alarm.getAlarmname().indexOf("网口心跳链路故障告警")>=0||alarm.getAlarmname().indexOf("网元代理链路断")>=0 ||alarm.getAlarmname().indexOf("网元和OMC断链")>=0||alarm.getAlarmname().indexOf("未探测到CC")>=0 ||alarm.getAlarmname().indexOf("未探测到CH/CVI")>=0||alarm.getAlarmname().indexOf("未探测到FS")>=0 ||alarm.getAlarmname().indexOf("未探测到GCM")>=0||alarm.getAlarmname().indexOf("未探测到PM")>=0 ||alarm.getAlarmname().indexOf("未探测到RTR")>=0||alarm.getAlarmname().indexOf("未探测到SA")>=0 ||alarm.getAlarmname().indexOf("温度告警")>=0||alarm.getAlarmname().indexOf("无可用SPCF资源")>=0 ||alarm.getAlarmname().indexOf("零碎时钟参考源不可用告警")>=0||alarm.getAlarmname().indexOf("下联光口信号失落")>=0 ||alarm.getAlarmname().indexOf("下联光口帧信号失落")>=0||alarm.getAlarmname().indexOf("小区退服")>=0 ||alarm.getAlarmname().indexOf("小区退服BSC与BTS通信中断")>=0||alarm.getAlarmname().indexOf("星卡工作异样告警")>=0 ||alarm.getAlarmname().indexOf("星卡锁星有余告警")>=0||alarm.getAlarmname().indexOf("星卡天线故障告警")>=0 ||alarm.getAlarmname().indexOf("星卡天线搜星异样告警")>=0||alarm.getAlarmname().indexOf("业务单板心跳失落")>=0 ||alarm.getAlarmname().indexOf("语音呼叫成功率低于门限值")>=0||alarm.getAlarmname().indexOf("直流输出电压欠压")>=0 ||alarm.getAlarmname().indexOf("中继线不可用")>=0||alarm.getAlarmname().indexOf("中继线谬误")>=0 
||alarm.getAlarmname().indexOf("主备SCU板间通信故障告警")>=0||alarm.getAlarmname().indexOf("主备通信链路断")>=0 ){ return true; }else return false; } /**替换业余无效告警过滤*/ private boolean getJiaoHuanfiler(GesAlarmDto alarm){ return true; } /**业务平台业余无效告警过滤*/ private boolean getYeWuPingTaifiler(GesAlarmDto alarm){ return true; } /**4G业余无效告警过滤*/ private boolean get4Gfiler(GesAlarmDto alarm){ /*if(alarm.getAlarmname().indexOf("小区不可用")>=0||alarm.getAlarmname().indexOf("3/4G大面积RRU退服")>=0 ||alarm.getAlarmname().indexOf("3/4G大面积断站")>=0||alarm.getAlarmname().indexOf("CELL FAULTY")>=0 ||alarm.getAlarmname().indexOf("GNSS接收机搜星故障")>=0||alarm.getAlarmname().indexOf("GNSS天馈链路故障")>=0 ||alarm.getAlarmname().indexOf("LTE小区退出服务")>=0||alarm.getAlarmname().indexOf("NE O&M CONNECTION FAILURE")>=0 ||alarm.getAlarmname().indexOf("NE OandM CONNECTION FAILURE")>=0 ||alarm.getAlarmname().indexOf("RRU掉电告警")>=0||alarm.getAlarmname().indexOf("RRU链路断")>=0 ||alarm.getAlarmname().indexOf("RX通道异样(198098469)")>=0||alarm.getAlarmname().indexOf("X2断链告警")>=0 ||alarm.getAlarmname().indexOf("单板不在位")>=0||alarm.getAlarmname().indexOf("单板通信链路断")>=0 ||alarm.getAlarmname().indexOf("单板温度异样")>=0||alarm.getAlarmname().indexOf("单板硬件故障")>=0 ||alarm.getAlarmname().indexOf("风扇故障")>=0||alarm.getAlarmname().indexOf("光口接管帧失锁")>=0 ||alarm.getAlarmname().indexOf("光口未接管到光信号")>=0||alarm.getAlarmname().indexOf("光模块不可用")>=0 ||alarm.getAlarmname().indexOf("光模块接管光功率异样")>=0||alarm.getAlarmname().indexOf("基站退出服务")>=0 ||alarm.getAlarmname().indexOf("基站退出服务andRRU链路断")>=0||alarm.getAlarmname().indexOf("进风口温度异样")>=0 ||alarm.getAlarmname().indexOf("没有可用的空口时钟源")>=0||alarm.getAlarmname().indexOf("设施掉电")>=0 ||alarm.getAlarmname().indexOf("瞬断告警")>=0||alarm.getAlarmname().indexOf("天馈驻波比异样")>=0 ||alarm.getAlarmname().indexOf("内部扩大设施异样")>=0||alarm.getAlarmname().indexOf("网元断链告警")>=0 ||alarm.getAlarmname().indexOf("网元断链告警")>=0||alarm.getAlarmname().indexOf("网元连贯中断")>=0 ||alarm.getAlarmname().indexOf("重大大面积断站")>=0||alarm.getAlarmname().indexOf("重大大面积断站")>=0){ 
return true; }else return false;*/ if(alarm.getAlarmname().indexOf("射频单元启动事件")>=0||alarm.getAlarmname().indexOf("性能后果失落")>=0){ return false; }else{ return true; } } /**自监控业余无效告警过滤*/ private boolean getZiJianKongfiler(GesAlarmDto alarm){ return true; } /**群障拦挡业余无效告警过滤*/ private boolean getQunZhangLanJiefiler(GesAlarmDto alarm){ return true; }}
生产者Handler ProducerHandler
package com.ustcinfo.kanms.alarmcollector.kafka;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.log4j.Logger; import com.ustcinfo.kanms.alarmcollector.main.GlobleConfiguration;import java.util.Properties;public class ProducerHandler{ private final KafkaProducer<String, String> producer; private static Logger logger = Logger.getLogger(ProducerHandler.class.getName()); private String kafkaUrl; public ProducerHandler(String topic,String message) { this.kafkaUrl = GlobleConfiguration.getInstance().getKafkaUrl(); Properties props = new Properties(); //此处配置的是kafka的端口 props.put("bootstrap.servers",kafkaUrl); props.put("acks", "all"); props.put("retries", "0"); props.put("batch.size", "16384"); props.put("linger.ms", "1"); props.put("buffer.memory", "33554432"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<String, String>(props); //生成音讯 ProducerRecord record = new ProducerRecord(topic,message); //发送音讯 producer.send(record); logger.info("【kafka】向Kafka的TOPIC【" + topic + "】中发送音讯"); logger.info("【kafka】音讯内容:" + message); logger.info("【kafka】推送胜利"); }}
生产者Worker ProducerWorker
package com.ustcinfo.kanms.alarmcollector.kafka;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.Logger;

/**
 * Runnable that prints the value of the producer record it was created with
 * to standard output.
 */
public class ProducerWorker implements Runnable {

    private static Logger logger = Logger.getLogger(ProducerWorker.class.getName());

    /** Record whose value is printed by {@link #run()}. */
    private ProducerRecord<String, String> producerRecord;

    /**
     * @param record the record to print when this worker runs
     */
    public ProducerWorker(ProducerRecord record) {
        producerRecord = record;
    }

    /** Writes the record value to {@code System.out}. */
    public void run() {
        final String payload = producerRecord.value();
        System.out.println(payload);
    }
}
消费者Handler ConsumerHandler
package com.ustcinfo.kanms.alarmcollector.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.log4j.Logger;import com.ustcinfo.kanms.alarmcollector.main.GlobleConfiguration;import java.util.List;import java.util.Map;import java.util.Properties;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import javax.xml.datatype.Duration;public class ConsumerHandler { //static Logger logger = Logger.getLogger(ConsumerHandler.class.getName()); private final KafkaConsumer<String, String> consumer; private ExecutorService executors; private String kafkaUrl; public ConsumerHandler(List<String> topics) { this.kafkaUrl = GlobleConfiguration.getInstance().getKafkaUrl(); Properties props = new Properties(); props.put("bootstrap.servers", kafkaUrl); props.put("group.id", "gzpt"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(topics); execute(1); } public void execute(int workerNum) { executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(1000), new ThreadPoolExecutor.CallerRunsPolicy()); Thread t = new Thread(new Runnable(){//启动一个子线程来监听kafka音讯 public void run(){ while (true) { Duration timeout=null; ConsumerRecords<String, String> records = consumer.poll(100); for (final ConsumerRecord record : records) { //logger.info("【Kafka】监听到kafka的TOPIC【" + record.topic() + "】的音讯"); //logger.info("【Kafka】音讯内容:" + record.value()); executors.submit(new ConsumerWorker(record)); } } }}); t.start(); } public void shutdown() { if (consumer != null) { consumer.close(); } if (executors 
!= null) { executors.shutdown(); } try { if (!executors.awaitTermination(10, TimeUnit.SECONDS)) { //logger.info("【Kafka】Timeout.... Ignore for this case "); } } catch (InterruptedException ignored) { //logger.info("【Kafka】Other thread interrupted this shutdown, ignore for this case."); Thread.currentThread().interrupt(); } }}
消费者Worker ConsumerWorker
package com.ustcinfo.kanms.alarmcollector.kafka;import java.text.SimpleDateFormat;import java.util.Date;import java.util.List;import javax.jms.JMSException;import javax.jms.TextMessage;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.log4j.Logger;import com.ustcinfo.kanms.alarmcollector.model.GesAlarm;import com.ustcinfo.kanms.alarmcollector.util.FileWriteThread;//import com.ustcinfo.kanms.alarmcollector.activemq.AlarmAutoReceiverThread.AlarmAutoReceiver;import com.ustcinfo.kanms.alarmcollector.framework.GesAlarmContainer;import com.ustcinfo.kanms.alarmcollector.framework.ReadXML;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;public class ConsumerWorker implements Runnable { private ConsumerRecord<String, String> consumerRecord; private final FileWriteThread filewrite = new FileWriteThread();//建设一个新的线程写文件 public ConsumerWorker() { this.gesAlarmContainer = GesAlarmContainer.getInstance(); } public ConsumerWorker(ConsumerRecord record) { this.consumerRecord = record; } private GesAlarmContainer gesAlarmContainer; private static Logger logger = Logger.getLogger(ConsumerWorker.class); public void run() { // consumer接管音讯后,这里能够写针对收到的音讯的业务解决 System.out.println(consumerRecord.value()); String writeData=consumerRecord.value(); try{ filewrite.handle(writeData+"\n--"+dateFormat(new Date())); }catch(Exception e){ logger.error("告警日志写入出错!",e); filewrite.handle(writeData+"\n--"+dateFormat(new Date())); } /*Message message=consumerRecord.value(); if (message instanceof TextMessage){ TextMessage tm = (TextMessage)message; try { String xml = tm.getText(); if(xml!=null){ try{ filewrite.handle(xml.substring(xml.indexOf("<ALARMS>"),xml.lastIndexOf("</ALARMS>")+9)+"\n--"+dateFormat(new Date())); }catch(Exception e){ logger.error("告警日志写入出错!",e); filewrite.handle(xml+"\n--"+dateFormat(new Date())); } } // 当音讯为“quit”时,唤醒线程 if(xml != null && xml.equalsIgnoreCase("quit")) { logger.info(Thread.currentThread().getName() + 
"接管到的音讯为:" + xml + ",开始退出线程"); synchronized (this) { notifyAll(); } return; } */ /*String gesAlarmStr = ReadXML.obtainBodyInfo(xml.trim()); List<GesAlarm> gesAlarmList = ReadXML.getAlarmValue(gesAlarmStr); if(gesAlarmList!=null&&gesAlarmList.size()>0){ logger.debug("gesAlarmList.size()=" + gesAlarmList.size()); for(int i=0;i<gesAlarmList.size();i++){ GesAlarm gesAlarm = new GesAlarm(); gesAlarm = gesAlarmList.get(i); gesAlarmContainer.putGesAlarm(gesAlarm); } }*/ //向kafka写入数据 /*new ProducerHandler("znwgAlarm",xml); }catch(Exception e){ logger.error("承受告警信息出错!", e); }finally { }*/ /* String xml=consumerRecord.value().trim(); String gesAlarmStr = ReadXML.obtainBodyInfo(xml.trim()); List<GesAlarm> gesAlarmList = ReadXML.getAlarmValue(gesAlarmStr); if(gesAlarmList!=null&&gesAlarmList.size()>0){ logger.debug("gesAlarmList.size()=" + gesAlarmList.size()); for(int i=0;i<gesAlarmList.size();i++){ GesAlarm gesAlarm = new GesAlarm(); gesAlarm = gesAlarmList.get(i); gesAlarmContainer.putGesAlarm(gesAlarm); } } */ } private String dateFormat(Date date){ SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); return sdf.format(date); }}