关于kafka:异步消息处理机制之KAFKA应用实例

60次阅读

共计 42949 个字符,预计需要花费 108 分钟才能阅读完成。

本项目是采用 SpringMVC + 原生 JDBC 开发的 Java 数据处理进程,使用 Oracle 数据库。
项目整体目录如下:

项目中用到的消息中间件主要有 ActiveMQ 和 Kafka 两种,本篇主要介绍 Kafka。
Kafka 数据的推送和拉取分为生产者和消费者两端。生产者定义一个 topic(相当于一个数据队列),并将数据推送到 Kafka 的该 topic 下;消费者凭借该 topic 从 Kafka 中按顺序读取数据。示例代码实现如下:

首先是 oracle 数据库配置文件

<?xml version="1.0" encoding="GB18030"?>
<!-- the proxool configuration can be embedded within your own application's.
Anything outside the "proxool" tag is ignored. -->
<something-else-entirely>
  <!-- Proxool connection-pool definition for the Oracle database used by the collector. -->
  <proxool>
    <!-- Pool alias under which this pool is registered. -->
    <alias>gess</alias>
    
    <!-- Active URL: two listener addresses (135.224.21.131 and 135.224.21.133, port 1521) with
         LOAD_BALANCE=yes and FAILOVER=on, service name "iam", session-level BASIC failover. -->
    <driver-url>jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=135.224.21.131)(PORT=1521))(ADDRESS = (PROTOCOL = TCP)(HOST = 135.224.21.133)(PORT=1521))(LOAD_BALANCE=yes)(FAILOVER=on))(CONNECT_DATA=(SERVICE_NAME=iam)(FAILOVER_MODE=(TYPE=SESSION)(METHOD=BASIC))))</driver-url>
    <!-- Disabled alternatives kept for reference (service "iam2"). -->
    <!--  <driver-url>jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=135.224.21.133)(PORT=1521))(ADDRESS=(PROTOCOL=TCP)(HOST=135.224.21.131)(PORT=1521))(LOAD_BALANCE=yes)(CONNECT_DATA=(SERVER=DEDICATED)(SERVICE_NAME=iam2)))</driver-url>-->
    <!-- <driver-url>jdbc:oracle:thin:@//135.224.21.133:1521:iam2</driver-url> -->
    <driver-class>oracle.jdbc.driver.OracleDriver</driver-class>
    <!-- NOTE(review): database credentials are stored in plain text in this file (including the
         commented-out pair below); consider externalising them and rotating anything published. -->
    <driver-properties>
      <!-- <property name="user" value="c##scott"/>
      <property name="password" value="StCgzpt@12!qqz;"/> -->
      <property name="user" value="xjgzk"/>
      <property name="password" value="Gzk%^1202"/>
    </driver-properties>
  
    <!-- Pool size bounds: at most 1000 connections, at least 1 kept open. -->
    <maximum-connection-count>1000</maximum-connection-count>
    <minimum-connection-count>1</minimum-connection-count>
    <!-- Time limits below are presumably milliseconds (90 s active time, 1 h lifetime);
         confirm against the Proxool property reference. -->
       <maximum-active-time>90000</maximum-active-time>
    <maximum-connection-lifetime>3600000</maximum-connection-lifetime> 
    <!-- Statement the pool's house-keeper runs to test a connection. -->
    <house-keeping-test-sql>select CURRENT_DATE from DUAL</house-keeping-test-sql>
  </proxool>
</something-else-entirely>

程序入口,main 方法主体

/**
 * 
 */
package com.ustcinfo.kanms.alarmcollector.main;

import java.text.ParseException;
import com.ustcinfo.kanms.alarmcollector.framework.GesAlarmAnalysisInDBThreads;
import com.ustcinfo.kanms.alarmcollector.framework.GesAlarmInDBThreads;
import com.ustcinfo.kanms.alarmcollector.framework.UnusualAlarmAnalysisThreads;
import com.ustcinfo.kanms.alarmcollector.kafka.kafkaMessageReceiver;

/**
 * ================================================= <br>
 * Project: GessAlarmCollector <br>
 * Class: MainProcess <br>
 * Author: lt <br>
 * Created: 2019-9-18 14:57:59 <br>
 * Version: 1.0 <br>
 * <br>
 * Description: system entry point <br>
 * ================================================= <br>
 */
public class MainProcess {

    /**
     * System entry point: loads the configuration, then starts the Kafka consumers and the
     * three worker groups.
     *
     * @param args command-line arguments (not read by this method)
     * @throws ParseException declared by the original signature; presumably raised by one of the
     *         components constructed here
     */
    public static void main(String[] args) throws ParseException {

        // Initialise the system configuration (log4j + properties) before anything reads it.
        GlobleConfiguration.getInstance().initSysConfig();

        // Start the Kafka consumer threads that receive alarm data.
        new kafkaMessageReceiver(2).start();

        // Start the DB-insert threads for flash-break (transient) alarms.
        // This statement had been merged into the preceding line comment and therefore never ran.
        new GesAlarmInDBThreads(2);

        // Analysis and DB insert of non-transient alarms.
        new GesAlarmAnalysisInDBThreads(3);

        // For abnormal alarms, look up the corresponding active alarm.
        new UnusualAlarmAnalysisThreads(2);
    }
}

全局化参数定义

/**
 * 
 */
package com.ustcinfo.kanms.alarmcollector.main;

import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;

import org.apache.activemq.ActiveMQConnection;
import org.apache.log4j.Appender;
import org.apache.log4j.FileAppender;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

import com.ustcinfo.kanms.alarmcollector.util.ResourceLoaderUtil;

/**
 * ================================================= <br>
 * Project: GessAlarmCollector <br>
 * Class: GlobleConfiguration <br>
 * Author: dlzhang <br>
 * Created: 2014-8-28 18:45:11 <br>
 * Version: 1.0 <br>
 * <br>
 * Description: process-wide configuration holder. A single instance exists per JVM; call
 * {@link #initSysConfig()} once at start-up to populate it from the properties file. <br>
 * ================================================= <br>
 */
public class GlobleConfiguration {

    private static final Logger logger = Logger.getLogger(GlobleConfiguration.class);

    /**
     * Eagerly created singleton. The previous lazy, unsynchronised creation could hand two
     * different instances to threads racing through {@link #getInstance()}.
     */
    private static final GlobleConfiguration INSTANCE = new GlobleConfiguration();

    private GlobleConfiguration() {
    }

    public static final String LOG_CFG_FILE_NAME_DEFAULT = "log4j_alarm.properties";
    public static final String LOG_FILE_NAME_DEFAULT = "GessAlarmCollector.log";
    public static final String SYS_CFG_FILE_NAME_DEFAULT = "sys_config_alarm.properties";

    // Property keys looked up in the system configuration file (spelling kept as deployed).
    public static final String ACTIVE_MQ_RUL = "ACTIVE_MQ_RUL";
    public static final String ALARM_RECV_QUEUE_NAME = "ALARM_RECV_QUEUE_NAME";
    public static final String ALARM_BILL_REL_RECV_QUEUE_NAME = "ALARM_BILL_REL_RECV_QUEUE_NAME";
    public static final String ACTIVE_MQ_SEND_QUEUE_NAME = "ACTIVE_MQ_SEND_QUEUE_NAME";
    public static final String ACTIVE_MQ_USER = "ACTIVE_MQ_USER";
    public static final String ACTIVE_MQ_PASSWD = "ACTIVE_MQ_PASSWD";
    public static final String ACTIVE_MQ_USER_DEFAULT = ActiveMQConnection.DEFAULT_USER;
    public static final String ACTIVE_MQ_PASSWD_DEFAULT = ActiveMQConnection.DEFAULT_PASSWORD;
    public static final String KAFKA_RUL = "KAFKA_RUL";
    public static final String KAFKA_TOPIC = "KAFKA_TOPIC";
    public static final String KAFKA_QFYJ_TOPIC = "QFYJ_TOPIC";

    private String activeMqUrl;
    private String alarmRecvQueueName;
    private String alarmBillRelRecvQueueName;
    private String activeMqSendQueueName;
    private String activeMqUser;
    private String activeMqPasswd;
    private String kafkaUrl;
    private String kafkaTopic;
    private String kafkaQFYJTopic;
    private String groupid;

    /**
     * Initialises log4j and then loads every setting from {@link #SYS_CFG_FILE_NAME_DEFAULT}.
     * ActiveMQ user and password fall back to the ActiveMQ defaults when not configured.
     * If the file cannot be read, the error is logged and all other settings stay {@code null}.
     */
    public void initSysConfig() {
        // Initialise logging first (comment this call out when testing locally).
        initLogConfig(LOG_CFG_FILE_NAME_DEFAULT);

        logger.info("零碎初始化:过程以后内存占用量(兆):" + (int)(Runtime.getRuntime().totalMemory()/(1048576)));
        Map<String, String> configMap = loadSysConfigTrim(SYS_CFG_FILE_NAME_DEFAULT);
        this.activeMqUrl = configMap.get(ACTIVE_MQ_RUL);
        this.alarmRecvQueueName = configMap.get(ALARM_RECV_QUEUE_NAME);
        this.alarmBillRelRecvQueueName = configMap.get(ALARM_BILL_REL_RECV_QUEUE_NAME);
        this.activeMqSendQueueName = configMap.get(ACTIVE_MQ_SEND_QUEUE_NAME);
        this.activeMqUser = configMap.get(ACTIVE_MQ_USER);
        if (null == this.activeMqUser) {
            this.activeMqUser = ACTIVE_MQ_USER_DEFAULT;
        }
        this.activeMqPasswd = configMap.get(ACTIVE_MQ_PASSWD);
        if (null == this.activeMqPasswd) {
            this.activeMqPasswd = ACTIVE_MQ_PASSWD_DEFAULT;
        }
        this.kafkaUrl = configMap.get(KAFKA_RUL);
        this.kafkaTopic = configMap.get(KAFKA_TOPIC);
        this.kafkaQFYJTopic = configMap.get(KAFKA_QFYJ_TOPIC);
        logger.debug(this.toString());
        logger.info("零碎初始化结束:过程以后内存占用量(兆):" + (int)(Runtime.getRuntime().totalMemory()/(1048576)));
    }

    /**
     * Loads a properties file into a map, trimming keys and values.
     *
     * <p>The file is resolved below the directory named by the {@code GESS_CONFIG} environment
     * variable when that variable is set, otherwise by its bare name.
     *
     * @param sysFile file name; {@code null} selects {@link #SYS_CFG_FILE_NAME_DEFAULT}
     * @return the loaded settings; never {@code null}, empty when the file is missing or unreadable
     */
    private Map<String, String> loadSysConfigTrim(String sysFile) {
        if (null == sysFile) {
            sysFile = SYS_CFG_FILE_NAME_DEFAULT;
        }
        String configPath = System.getenv("GESS_CONFIG");
        if (null != configPath && !"".equals(configPath)) {
            sysFile = configPath + "/" + sysFile;
        }
        logger.info("零碎配置文件:" + sysFile);

        Map<String, String> cfgMap = new HashMap<String, String>();
        InputStream is = ResourceLoaderUtil.getResourceAsStream(sysFile);
        if (is == null) {
            // Properties.load(null) would throw a NullPointerException; report the real cause instead.
            logger.error("加载配置文件失败:" + sysFile);
            return cfgMap;
        }
        try {
            Properties props = new Properties();
            props.load(is);
            for (String rawKey : props.stringPropertyNames()) {
                // Look the value up with the untrimmed key so the lookup cannot miss.
                String value = props.getProperty(rawKey);
                cfgMap.put(rawKey.trim(), value == null ? "" : value.trim());
            }
        } catch (IOException e) {
            logger.error("加载配置文件失败", e);
        } finally {
            try {
                is.close();
            } catch (IOException e) {
                logger.error("敞开 IO 流失败", e);
            }
        }
        return cfgMap;
    }

    /**
     * Configures log4j from the given properties file and points the file appender at the
     * default log file.
     *
     * @param cfgFile log4j properties file name; {@code null} selects
     *        {@link #LOG_CFG_FILE_NAME_DEFAULT}
     */
    private void initLogConfig(String cfgFile) {
        if (null == cfgFile) {
            cfgFile = LOG_CFG_FILE_NAME_DEFAULT;
        }
        String cfgPath = System.getenv("GESS_CONFIG");
        // Without the environment variable the file is looked up in the current directory.
        if (cfgPath == null || "".equals(cfgPath)) {
            cfgFile = "./" + cfgFile;
        } else {
            cfgFile = cfgPath + "/" + cfgFile;
        }
        PropertyConfigurator.configure(cfgFile);
        generateLogFileName(null);
        logger.info("日志配置文件:" + cfgFile);
    }

    /**
     * Redirects the root logger's appender named {@code "R"} (when it is a {@link FileAppender})
     * to the given log file, placed below {@code GESS_LOG} if that environment variable is set.
     *
     * @param logFile log file name; {@code null} selects {@link #LOG_FILE_NAME_DEFAULT}
     */
    private void generateLogFileName(String logFile) {
        if (null == logFile) {
            logFile = LOG_FILE_NAME_DEFAULT;
        }
        Appender app = Logger.getRootLogger().getAppender("R");
        if (app instanceof FileAppender) {
            FileAppender fileApp = (FileAppender) app;
            String logPath = System.getenv("GESS_LOG");
            // Without the environment variable the log is written to the current directory.
            if (logPath != null && !"".equals(logPath)) {
                logFile = logPath + "/" + logFile;
            }
            fileApp.setFile(logFile);
            fileApp.activateOptions();
        }
    }

    /**
     * Returns a loggable summary of the configuration. The ActiveMQ password is masked because
     * {@link #initSysConfig()} writes this string to the debug log.
     *
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        String maskedPasswd = (activeMqPasswd == null) ? "null" : "******";
        return "GlobleConfiguration [activeMqPasswd=" + maskedPasswd + ", activeMqSendQueueName="
                + activeMqSendQueueName + ", activeMqUrl=" + activeMqUrl + ", activeMqUser=" + activeMqUser
                + ", alarmBillRelRecvQueueName=" + alarmBillRelRecvQueueName + ", alarmRecvQueueName="
                + alarmRecvQueueName + ", kafkaUrl=" + kafkaUrl + ", kafkaTopic=" + kafkaTopic
                + ", kafkaQFYJTopic=" + kafkaQFYJTopic + ", groupid=" + groupid + "]";
    }

    /**
     * @return the single shared instance
     */
    public static GlobleConfiguration getInstance() {
        return INSTANCE;
    }

    /**
     * @return the activeMqUrl
     */
    public String getActiveMqUrl() {
        return activeMqUrl;
    }

    /**
     * @param activeMqUrl the activeMqUrl to set
     */
    public void setActiveMqUrl(String activeMqUrl) {
        this.activeMqUrl = activeMqUrl;
    }

    /**
     * @return the alarmRecvQueueName
     */
    public String getAlarmRecvQueueName() {
        return alarmRecvQueueName;
    }

    /**
     * @param alarmRecvQueueName the alarmRecvQueueName to set
     */
    public void setAlarmRecvQueueName(String alarmRecvQueueName) {
        this.alarmRecvQueueName = alarmRecvQueueName;
    }

    /**
     * @return the alarmBillRelRecvQueueName
     */
    public String getAlarmBillRelRecvQueueName() {
        return alarmBillRelRecvQueueName;
    }

    /**
     * @param alarmBillRelRecvQueueName the alarmBillRelRecvQueueName to set
     */
    public void setAlarmBillRelRecvQueueName(String alarmBillRelRecvQueueName) {
        this.alarmBillRelRecvQueueName = alarmBillRelRecvQueueName;
    }

    /**
     * @return the activeMqUser
     */
    public String getActiveMqUser() {
        return activeMqUser;
    }

    /**
     * @param activeMqUser the activeMqUser to set
     */
    public void setActiveMqUser(String activeMqUser) {
        this.activeMqUser = activeMqUser;
    }

    /**
     * @return the activeMqPasswd
     */
    public String getActiveMqPasswd() {
        return activeMqPasswd;
    }

    /**
     * @param activeMqPasswd the activeMqPasswd to set
     */
    public void setActiveMqPasswd(String activeMqPasswd) {
        this.activeMqPasswd = activeMqPasswd;
    }

    /**
     * @return the activeMqSendQueueName
     */
    public String getActiveMqSendQueueName() {
        return activeMqSendQueueName;
    }

    /**
     * @param activeMqSendQueueName the activeMqSendQueueName to set
     */
    public void setActiveMqSendQueueName(String activeMqSendQueueName) {
        this.activeMqSendQueueName = activeMqSendQueueName;
    }

    /**
     * @return the kafkaUrl
     */
    public String getKafkaUrl() {
        return kafkaUrl;
    }

    /**
     * @param kafkaUrl the kafkaUrl to set
     */
    public void setKafkaUrl(String kafkaUrl) {
        this.kafkaUrl = kafkaUrl;
    }

    /**
     * @return the kafkaTopic
     */
    public String getKafkaTopic() {
        return kafkaTopic;
    }

    /**
     * @param kafkaTopic the kafkaTopic to set
     */
    public void setKafkaTopic(String kafkaTopic) {
        this.kafkaTopic = kafkaTopic;
    }

    /**
     * @return the kafkaQFYJTopic
     */
    public String getKafkaQFYJTopic() {
        return kafkaQFYJTopic;
    }

    /**
     * @param kafkaQFYJTopic the kafkaQFYJTopic to set
     */
    public void setKafkaQFYJTopic(String kafkaQFYJTopic) {
        this.kafkaQFYJTopic = kafkaQFYJTopic;
    }

    /**
     * @return the groupid (not populated by {@link #initSysConfig()}; only set through the setter)
     */
    public String getGroupid() {
        return groupid;
    }

    /**
     * @param groupid the groupid to set
     */
    public void setGroupid(String groupid) {
        this.groupid = groupid;
    }
}

定义 Kafka 的 topic,Kafka 接收消息

/**
 * 
 */
package com.ustcinfo.kanms.alarmcollector.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;
import com.ustcinfo.kanms.alarmcollector.framework.ReadXML;
import com.ustcinfo.kanms.alarmcollector.main.GlobleConfiguration;
import com.ustcinfo.kanms.alarmcollector.model.GesAlarm;
import com.ustcinfo.kanms.alarmcollector.model.GesAlarmDto;
import com.ustcinfo.kanms.alarmcollector.util.JedisConfig;
import com.ustcinfo.kanms.alarmcollector.util.JsonUtil;
import com.ustcinfo.kanms.alarmcollector.util.escapeUtil;


/**
 * ================================================= <br>
 * Project: AlarmAutoReceiver <br>
 * Class: kafkaMessageReceiver (the original header named it ActiveMQReceiver) <br>
 * Author: lt <br>
 * Created: 2019-9-23 20:49:38 <br>
 * Version: 1.0 <br>
 * <br>
 * Description: receives alarm messages from Kafka <br>
 * <br>
 * ================================================= <br>
 */
public class kafkaMessageReceiver{private static final Logger logger = Logger.getLogger(kafkaMessageReceiver.class);
     // Topic the consumer threads subscribe to; assigned in the constructor from GlobleConfiguration.
     private String kafkaTopic;
     // Broker list passed as "bootstrap.servers"; assigned in the constructor from GlobleConfiguration.
     private String kafkaUrl;
     // Consumer group id shared by every consumer thread of this receiver (hard-coded).
     private String groupId = "group01";
     // Guards the Redis lookup/update sequence in alarmPreHandle across consumer threads.
     private Lock lock;
     // Consumer threads created by the constructor and launched by start().
     private List<ConsumerThread> threadList = new ArrayList<ConsumerThread>();
     
     /**
      * Assembles the Kafka consumer settings used by every consumer thread.
      *
      * <p>Auto-commit is enabled with a 1000 ms interval, the session timeout is 30000 ms,
      * {@code auto.offset.reset} is {@code earliest}, and both key and value are read as strings.
      *
      * @param brokers value for {@code bootstrap.servers}
      * @param groupId value for {@code group.id}
      * @return a new {@link Properties} holding the eight consumer settings
      */
     private static Properties buildKafkaProperty(String brokers, String groupId) {
         final String stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
         // Table of setting name / value pairs, applied in declaration order.
         final String[][] settings = {
             {"bootstrap.servers", brokers},
             {"group.id", groupId},
             {"enable.auto.commit", "true"},
             {"auto.commit.interval.ms", "1000"},
             {"session.timeout.ms", "30000"},
             {"auto.offset.reset", "earliest"},
             {"key.deserializer", stringDeserializer},
             {"value.deserializer", stringDeserializer},
         };
         Properties consumerConfig = new Properties();
         for (String[] setting : settings) {
             consumerConfig.put(setting[0], setting[1]);
         }
         return consumerConfig;
     }
     
     public kafkaMessageReceiver(int n){
        // 参数获取
         this.kafkaUrl = GlobleConfiguration.getInstance().getKafkaUrl();
         this.kafkaTopic = GlobleConfiguration.getInstance().getKafkaTopic();
         this.lock=new ReentrantLock();
         for(int i=0;i<n;i++){threadList.add(new ConsumerThread("consumerThread_"+i));
            }
        }
        
        /**
         * Logs the number of prepared consumer threads and starts each of them in creation order.
         */
        public void start() {
            final int threadCount = threadList.size();
            logger.info("告警接管多线程, 线程数为"+threadCount+"....");
            for (int i = 0; i < threadCount; i++) {
                threadList.get(i).start();
            }
        }
        
        class ConsumerThread extends Thread{
            private final KafkaConsumer<String, String> consumer;
            Set<TopicPartition> assignment = new HashSet<>();
            public ConsumerThread(String name) {super(name);
                Properties properties = buildKafkaProperty(kafkaUrl,groupId);
                this.consumer = new KafkaConsumer<>(properties);
                this.consumer.subscribe(Arrays.asList(kafkaTopic));
            }

            public void run(){logger.info("告警接管线程"+this.getName()+"启动.....");
                try{
                    // 指定分区从开端生产
                    ConsumerRecords<String,String> consumerRecords_1 = consumer.poll(100);
                    assignment = consumer.assignment();
                    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
                    for (TopicPartition tp : assignment) {Long offset = endOffsets.get(tp);
                        //System.out.println("分区最新 ossset="+ offset + " ");
                        // 取得上次接收数据的最大 offset
                        //Long curoffset=gesAlarmDataBase.getOffsetMax();
                        System.out.println("分区" + tp + "从" + offset + "开始生产");
                        consumer.seek(tp, offset);
                    }
                    while(true){ConsumerRecords<String,String> consumerRecords = consumer.poll(100);
                        for (ConsumerRecord<String,String> item : consumerRecords){ConsumerThreadHandler(item,item.offset());
                        } 
                   }
                }catch (Exception e) {logger.error("Unexpected error", e);
               } finally {consumer.close();
               }
           }
            
        // 告警处理过程
        private void ConsumerThreadHandler(ConsumerRecord<String,String> item,Long offset){logger.info("收到原始告警:"+item.value());
            GesAlarm gesAlarms=new GesAlarm();
            String xmlString ="";
            String xml=(String)item.value();
            if(xml.indexOf("<50-description>")>0){xmlString =xml.replace("<50-description>", "|50-description|");
            }else
                xmlString =xml;
           String xml_escape= escapeUtil.escapeHTMLTag(xmlString);
            //String s="<?xml version='1.0'encoding='UTF-8'?><Root><Header><Esb><Route><Sender>65.0001</Sender><Time>2019-12-28 19:39:46.910</Time><ServCode>65.0001.Itim_AlarmInfoPubServ.broadcast</ServCode><MsgId>65.0001_20191228193946_7533186910</MsgId><TransId>65.0001_20191228193946_7533186910</TransId><AuthType/><AuthCode/><Version>V0.1</Version><EsbId/><CarryType>0</CarryType><ServTestFlag>0</ServTestFlag><MsgType/><Topic>1</Topic></Route></Esb></Header><Body><![CDATA[<SERVICE><ALARMS><ALARM><AREAID>XJ-AKS</AREAID><ALARMTYPEID>4</ALARMTYPEID><ALARMPROFTYPE>1</ALARMPROFTYPE><ALARMCLASS>0</ALARMCLASS><ALARMLEVEL>3</ALARMLEVEL><SOURCEID>1014501</SOURCEID><ALARMNAME> 输出光功率 (dBm) 越限复原 (612603)</ALARMNAME><ALARMSUMMARY> 输出光功率(dBm) 越限复原 (612603)</ALARMSUMMARY><ALARMDETAIL> 设施 222.83.41.23(编码 AKS-AWT-MSE-1.MAN.M6000; 类型 BRAS; 型号 M6000) 告警: 输出光功率 (dBm) 越限复原((rack = NA, shelf = 0, slot = 1, subslot = 1, port = 6, lane = 0, port_type = 131) <50-description> 15min low alarm disappear. Low thres is: -23.0000, Current value is: -15.8670.)</ALARMDETAIL><PROBABLECAUSE></PROBABLECAUSE><SRCALARMID>1195831659</SRCALARMID><USERLABEL></USERLABEL><ADITIONALINFO></ADITIONALINFO><PROPOSEDADVISE></PROPOSEDADVISE><COLLECTTIME>2019-12-28 19:39:41</COLLECTTIME><NEALARMTIME>2019-12-28 19:40:49</NEALARMTIME><RESUMEFLAG>0</RESUMEFLAG><RESUMETIME></RESUMETIME><GROUPFLAG>0</GROUPFLAG><BUREAUID></BUREAUID><RESERVE1>222.83.41.23</RESERVE1><RESERVE2>M6000</RESERVE2><RESERVE3> 阿瓦提 M6000</RESERVE3><RESERVE4>3</RESERVE4><RESERVE5> 输出光功率 </RESERVE5><RESERVE6></RESERVE6><ALARMLEVEL>3</ALARMLEVEL><RESERVE7></RESERVE7></ALARM></ALARMS></SERVICE>]]></Body></Root>";
           // String xmlString= escapeUtil.escapeHTMLTag(s);
           if(xml_escape!=null){List<GesAlarm> alarmList=ReadXML.getAlarmValue(ReadXML.obtainBodyInfo(xml_escape));
                  if(alarmList!=null&&alarmList.size()>0){for (GesAlarm gesAlarm : alarmList) {
                              gesAlarms=gesAlarm;
                              GesAlarmDto gesAlarmDto=new GesAlarmDto();
                                 gesAlarmDto.setAreaid(gesAlarms.getAreaid());
                                 gesAlarmDto.setAlarmtypeid(gesAlarms.getAlarmtypeid());
                                 gesAlarmDto.setAlarmproftype(gesAlarms.getAlarmproftype());
                                 gesAlarmDto.setAlarmclass(gesAlarms.getAlarmclass());
                                 gesAlarmDto.setAlarmlevel(gesAlarms.getAlarmlevel());
                                 gesAlarmDto.setSourceid(gesAlarms.getSourceid());
                              gesAlarmDto.setAlarmname(gesAlarms.getAlarmname());
                                 gesAlarmDto.setAlarmsummary(gesAlarms.getAlarmsummary());
                                 gesAlarmDto.setAlarmdetail(gesAlarms.getAlarmdetail());
                                 gesAlarmDto.setProbablecause(gesAlarms.getProbablecause());
                                 gesAlarmDto.setSrcalarmId(gesAlarms.getSrcalarmId());
                                 gesAlarmDto.setUserlabel(gesAlarms.getUserlabel());
                                 gesAlarmDto.setAditionalinfo(gesAlarms.getAditionalinfo());
                                 gesAlarmDto.setProposedadvise(gesAlarms.getProposedadvise());
                                 gesAlarmDto.setCollecttime(gesAlarms.getCollecttime());
                                 gesAlarmDto.setNealarmtime(gesAlarms.getNealarmtime());
                              gesAlarmDto.setResumeflag(gesAlarms.getResumeflag());
                                 gesAlarmDto.setResumetime(gesAlarms.getResumetime());
                                 gesAlarmDto.setGroupflag(gesAlarms.getGroupflag());
                                 gesAlarmDto.setBureauid(gesAlarms.getBureauid());
                                 gesAlarmDto.setReserve1(gesAlarms.getReserve1());
                                 gesAlarmDto.setReserve2(gesAlarms.getReserve2());
                                 gesAlarmDto.setReserve3(gesAlarms.getReserve3());
                                 gesAlarmDto.setReserve4(gesAlarms.getReserve4());
                                 gesAlarmDto.setReserve5(gesAlarms.getReserve5());
                                 gesAlarmDto.setReserve6(gesAlarms.getReserve6());
                              gesAlarmDto.setReserve7(gesAlarms.getReserve7());
                                 gesAlarmDto.setEnterqueuetime(new Date());
                                 //gesAlarmDto.setEnterlibrarytime(null);
                              //gesAlarmDto.setOffset(offset);
                          // 告警过滤(哪些告警须要 / 不须要,现阶段都是通过策略中的告警名称来过滤)boolean isuseful=alarmFilter(gesAlarmDto);
                           if(isuseful){alarmPreHandle(gesAlarmDto);
                           }
                      }
               }  
           } 
        }
    }
    
    /**
     * Alarm pre-processing: pairs an incoming alarm with a previously cached alarm that has the
     * same {@code srcalarmId} and stores the result in the Redis lists keyed
     * {@code <srcalarmId>:<enqueue time in ms>}.
     *
     * <ul>
     *   <li>No cached key: the alarm is pushed as a new entry.</li>
     *   <li>Exactly one cached key: the cached alarm is popped. If nothing could be popped the new
     *       alarm is pushed. If source id and srcalarmId match and the resume flags differ, both
     *       alarms get identify 2 and are pushed. If the resume flags are equal, only the cached
     *       alarm is pushed back and the new one is dropped as a duplicate.</li>
     *   <li>More than one cached key: a pair is already queued, so the alarm is ignored.</li>
     * </ul>
     *
     * <p>The whole sequence runs under {@code lock}; the lock and the Redis connection are
     * released in {@code finally} so a failure cannot block the other consumer threads.
     *
     * @param gesAlarmDto the alarm to process; ignored when {@code null} or when its resume flag
     *        is {@code null} (a null flag marks a modification alarm)
     */
    private void alarmPreHandle(GesAlarmDto gesAlarmDto){
        if (gesAlarmDto == null || gesAlarmDto.getResumeflag() == null) {
            return;
        }
        Jedis jedis = null;
        lock.lock();
        try {
            jedis = JedisConfig.getJedis();
            if (jedis == null) {
                // Same outcome as before: without a Redis connection the alarm is not queued.
                return;
            }
            // Look up cached alarms that share this srcalarmId.
            Set<String> set = jedis.keys(gesAlarmDto.getSrcalarmId()+":*");
            if (set.size() == 1) {
                String cachedKey = set.iterator().next();
                // Take the cached alarm out of its list.
                GesAlarmDto searchalarm = JsonUtil.stringToBean(jedis.rpop(cachedKey), GesAlarmDto.class);
                if (searchalarm == null) {
                    lpushAlarm(jedis, gesAlarmDto);
                } else if (searchalarm.getSourceid().equals(gesAlarmDto.getSourceid())
                        && searchalarm.getSrcalarmId().equals(gesAlarmDto.getSrcalarmId())) {
                    if (!searchalarm.getResumeflag().equals(gesAlarmDto.getResumeflag())) {
                        // Opposite resume flags: mark both alarms as a pair and queue both.
                        searchalarm.setIdentify(2);
                        gesAlarmDto.setIdentify(2);
                        lpushAlarm(jedis, searchalarm);
                        lpushAlarm(jedis, gesAlarmDto);
                    } else {
                        // Same resume flag: put the cached alarm back and drop the duplicate.
                        lpushAlarm(jedis, searchalarm);
                        logger.info("取到反复告警, 不入队列,srcalarm_id="+gesAlarmDto.getSrcalarmId());
                    }
                } else {
                    // NOTE(review): in this branch the popped alarm is not pushed back - confirm intent.
                    logger.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
                }
            } else if (set.size() > 1) {
                // A flash-break pair is already queued; ignore this alarm.
                logger.info("取到反复瞬断告警, 不入队列,srcalarm_id="+gesAlarmDto.getSrcalarmId());
            } else {
                lpushAlarm(jedis, gesAlarmDto);
            }
        } finally {
            try {
                if (jedis != null) {
                    JedisConfig.returnResource(jedis);
                }
            } finally {
                lock.unlock();
            }
        }
    }

    /** Pushes the alarm as JSON onto the Redis list keyed by its srcalarmId and enqueue time. */
    private void lpushAlarm(Jedis jedis, GesAlarmDto alarm) {
        String key = alarm.getSrcalarmId()+":"+alarm.getEnterqueuetime().getTime();
        jedis.lpush(key, JsonUtil.beanToString(alarm));
    }
        
    /**
     * Alarm filtering: decides from the alarm's professional type whether it should be processed.
     *
     * <p>Type 1 (data), 7 and 8 are delegated to their dedicated name filters. Types 2-6 and 9-12
     * are always accepted; the per-type filters that once existed for some of them
     * (getChuanShufiler, getDongHuanfiler, getWuXianfiler, getJiaoHuanfiler,
     * getYeWuPingTaifiler, getZiJianKongfiler, getQunZhangLanJiefiler) are disabled. Every other
     * type, and a missing type, is rejected.
     *
     * @param alarm the alarm to classify
     * @return {@code true} if the alarm should be processed further
     */
    private boolean alarmFilter(GesAlarmDto alarm){
        // A missing type previously caused a NullPointerException at toString().
        if (alarm == null || alarm.getAlarmproftype() == null) {
            return false;
        }
        switch(alarm.getAlarmproftype().toString()){
            case "1": // data
                return getShuJufiler(alarm);
            case "7":
                return get4Gfiler(alarm);
            case "8":
                return getIPRANfiler(alarm);
            case "2":
            case "3":
            case "4":
            case "5":
            case "6":
            case "9":
            case "10":
            case "11":
            case "12":
                return true;
            default:
                return false;
        }
    }
    
    /**
     * Invalid-alarm filter for the data speciality.
     *
     * <p>Rejects alarms whose name contains one of the ignored phrases, and alarms whose name
     * contains {@code SNMP_Link_Up} while the resume flag equals 0. Everything else is accepted.
     * An alarm without a name cannot match any phrase and is accepted; a missing resume flag is
     * treated as "not 0" instead of failing on unboxing.
     *
     * @param alarm the alarm to check
     * @return {@code false} if the alarm is to be discarded, {@code true} otherwise
     */
    private boolean getShuJufiler(GesAlarmDto alarm){
        String alarmName = alarm.getAlarmname();
        if (alarmName == null) {
            return true;
        }
        // NOTE(review): these phrases must match the alarm names exactly as delivered - verify the
        // wording against real alarm data.
        final String[] ignoredPhrases = {
            "华为骨干断纤告警",
            "中兴骨干断纤告警",
            "战火 ONU 设施掉电",
            "中兴 ONU 设施掉电",
            "华为 ONU 设施掉电",
            "中兴分支断纤告警",
            "华为登录设施",
            "华为登陆设施",
        };
        for (String phrase : ignoredPhrases) {
            if (alarmName.contains(phrase)) {
                return false;
            }
        }
        if (alarmName.contains("SNMP_Link_Up")
                && alarm.getResumeflag() != null && alarm.getResumeflag() == 0) {
            return false;
        }
        return true;
    }
    
    /** 传输业余无效告警过滤 */
    private boolean getChuanShufiler(GesAlarmDto alarm){if(alarm.getAlarmname().indexOf("FAN_ALM")>=0||alarm.getAlarmname().indexOf("LINK_LOS")>=0
                ||alarm.getAlarmname().indexOf("PPI_LOS")>=0||alarm.getAlarmname().indexOf("TEMP_OVER")>=0
                ||alarm.getAlarmname().indexOf("0LS")>=0||alarm.getAlarmname().indexOf("ABSENCE_WARNING")>=0
                ||alarm.getAlarmname().indexOf("ALM_GFP_dCSF")>=0||alarm.getAlarmname().indexOf("BD_STATUS")>=0
                ||alarm.getAlarmname().indexOf("BIOS_STATUS")>=0||alarm.getAlarmname().indexOf("BUS_ERR")>=0
                ||alarm.getAlarmname().indexOf("CARD_TCT")>=0||alarm.getAlarmname().indexOf("CLIENT_PORT_PS")>=0
                ||alarm.getAlarmname().indexOf("COMFAIL")>=0||alarm.getAlarmname().indexOf("COMMUN_FAIL")>=0
                ||alarm.getAlarmname().indexOf("coolingFanFailure")>=0||alarm.getAlarmname().indexOf("DBMS_ERROR")>=0
                ||alarm.getAlarmname().indexOf("DCVOLDOWN")>=0||alarm.getAlarmname().indexOf("DCVOLOVER")>=0
                ||alarm.getAlarmname().indexOf("detectTemp")>=0||alarm.getAlarmname().indexOf("ELCMODALM")>=0
                ||alarm.getAlarmname().indexOf("ETH_LOS")>=0||alarm.getAlarmname().indexOf("etpieUnlink")>=0
                ||alarm.getAlarmname().indexOf("External1")>=0||alarm.getAlarmname().indexOf("External13")>=0
                ||alarm.getAlarmname().indexOf("FAIL")>=0||alarm.getAlarmname().indexOf("FAN_ALM")>=0
                ||alarm.getAlarmname().indexOf("FAN_FAIL")>=0||alarm.getAlarmname().indexOf("FAN_TEMPTURE")>=0
                ||alarm.getAlarmname().indexOf("fanMiss")>=0||alarm.getAlarmname().indexOf("HARD_BAD")>=0
                ||alarm.getAlarmname().indexOf("ILS")>=0||alarm.getAlarmname().indexOf("IN_PWR_ABN")>=0
                ||alarm.getAlarmname().indexOf("IN_PWR_FAIL")>=0||alarm.getAlarmname().indexOf("IN_PWR_HIGH")>=0
                ||alarm.getAlarmname().indexOf("IN_PWR_LOW")>=0||alarm.getAlarmname().indexOf("INPWR_FAIL")>=0
                ||alarm.getAlarmname().indexOf("INTRA_OTU_PS")>=0||alarm.getAlarmname().indexOf("IOP_HIGH")>=0
                ||alarm.getAlarmname().indexOf("IOP_LOW")>=0||alarm.getAlarmname().indexOf("IOP-HIGH")>=0
                ||alarm.getAlarmname().indexOf("IOP-LOW")>=0||alarm.getAlarmname().indexOf("laserOld")>=0
                ||alarm.getAlarmname().indexOf("LOF")>=0||alarm.getAlarmname().indexOf("LOS")>=0
                ||alarm.getAlarmname().indexOf("lossOfFrame")>=0||alarm.getAlarmname().indexOf("lossOfSignal")>=0
                ||alarm.getAlarmname().indexOf("LSR_WILL_DIE")>=0||alarm.getAlarmname().indexOf("M_S_DATA_MSIMATCH")>=0
                ||alarm.getAlarmname().indexOf("M_S_VER_MISMATCH")>=0||alarm.getAlarmname().indexOf("ModuleTemp-TCA")>=0
                ||alarm.getAlarmname().indexOf("MS_SNCP_PS")>=0||alarm.getAlarmname().indexOf("MS-PSE")>=0
                ||alarm.getAlarmname().indexOf("MUT_LOS")>=0||alarm.getAlarmname().indexOf("MUT_TLOS")>=0
                ||alarm.getAlarmname().indexOf("NE_COMMU_BREAK")>=0||alarm.getAlarmname().indexOf("NE_NOT_LOGIN")>=0
                ||alarm.getAlarmname().indexOf("OA_LOW_GAIN")>=0||alarm.getAlarmname().indexOf("ODU_SNCP_PS")>=0
                ||alarm.getAlarmname().indexOf("ODUK_PS")>=0||alarm.getAlarmname().indexOf("ODUK_SNCP_PS")>=0
                ||alarm.getAlarmname().indexOf("OOF")>=0||alarm.getAlarmname().indexOf("OPT_LESS_INPUT")>=0
                ||alarm.getAlarmname().indexOf("OPT_NO_INPUT")>=0||alarm.getAlarmname().indexOf("OPT_OFFSET_UP")>=0
                ||alarm.getAlarmname().indexOf("OPT_OVER_INPUT")>=0||alarm.getAlarmname().indexOf("OSC_LOS")>=0
                ||alarm.getAlarmname().indexOf("OTN_LOF")>=0||alarm.getAlarmname().indexOf("OTRX_ABSENT")>=0
                ||alarm.getAlarmname().indexOf("OTS_LOS")>=0||alarm.getAlarmname().indexOf("OTS_LOS-P")>=0
                ||alarm.getAlarmname().indexOf("OTU_LOF")>=0||alarm.getAlarmname().indexOf("OVER_BRD_TMP")>=0
                ||alarm.getAlarmname().indexOf("OVER_ENV_TMP")>=0||alarm.getAlarmname().indexOf("OWSP_PS")>=0
                ||alarm.getAlarmname().indexOf("PG_P_FAIL")>=0||alarm.getAlarmname().indexOf("PG_SWITCH")>=0
                ||alarm.getAlarmname().indexOf("PLM")>=0||alarm.getAlarmname().indexOf("POWER_ABNORMAL")>=0
                ||alarm.getAlarmname().indexOf("POWER_FAIL")>=0||alarm.getAlarmname().indexOf("POWER_HIGH_48V")>=0
                ||alarm.getAlarmname().indexOf("POWER_LOW_48V")>=0||alarm.getAlarmname().indexOf("POWERALM")>=0
                ||alarm.getAlarmname().indexOf("POWERALM")>=0||alarm.getAlarmname().indexOf("powerBox1Port1NoInput")>=0
                ||alarm.getAlarmname().indexOf("powerBox2Port1NoInput")>=0||alarm.getAlarmname().indexOf("powerProblem")>=0
                ||alarm.getAlarmname().indexOf("powerSupplyFailure")>=0||alarm.getAlarmname().indexOf("protectionSwitchEvent")>=0
                ||alarm.getAlarmname().indexOf("PS")>=0||alarm.getAlarmname().indexOf("PSE")>=0
                ||alarm.getAlarmname().indexOf("PUMP_SD")>=0||alarm.getAlarmname().indexOf("PUMP_TF")>=0
                ||alarm.getAlarmname().indexOf("PW_INPUT_FAIL")>=0||alarm.getAlarmname().indexOf("PW_SHELL_NO_INPUT")>=0
                ||alarm.getAlarmname().indexOf("PW_SHELL_NO_OUTPUT")>=0||alarm.getAlarmname().indexOf("PWR_MAJ_ALM")>=0
                ||alarm.getAlarmname().indexOf("PWR_TEMP_HIGH")>=0||alarm.getAlarmname().indexOf("PWR_TEMP_OVERTH")>=0
                ||alarm.getAlarmname().indexOf("R_LOF")>=0||alarm.getAlarmname().indexOf("R_OOF")>=0
                ||alarm.getAlarmname().indexOf("replaceableUnitMissing")>=0||alarm.getAlarmname().indexOf("replaceableUnitProblem")>=0
                ||alarm.getAlarmname().indexOf("RLOS")>=0||alarm.getAlarmname().indexOf("RS_EXC")>=0||alarm.getAlarmname().indexOf("RS_LOF")>=0
                ||alarm.getAlarmname().indexOf("RS_SD")>=0||alarm.getAlarmname().indexOf("SD")>=0||alarm.getAlarmname().indexOf("SHELF_ABSENCE")>=0
                ||alarm.getAlarmname().indexOf("signalQualityEvaluationFailure")>=0||alarm.getAlarmname().indexOf("SNCP_FAIL")>=0
                ||alarm.getAlarmname().indexOf("SNCP_INDI")>=0||alarm.getAlarmname().indexOf("SUBRACK_COMM_PS")>=0
                ||alarm.getAlarmname().indexOf("SW_FAIL")>=0||alarm.getAlarmname().indexOf("SW_SNCP_PS")>=0||alarm.getAlarmname().indexOf("SWR")>=0
                ||alarm.getAlarmname().indexOf("SWTR")>=0||alarm.getAlarmname().indexOf("TD")>=0||alarm.getAlarmname().indexOf("TEMP_ALAM")>=0
                ||alarm.getAlarmname().indexOf("TEMP_OVER")>=0||alarm.getAlarmname().indexOf("TF")>=0||alarm.getAlarmname().indexOf("TLIMIT")>=0){return true;}else
            return false;
    }
    
    /**
     * IPRAN alarm filter (the original comment appears to describe a
     * valid-alarm filter for the IPRAN domain).
     *
     * @param alarm the alarm to inspect
     * @return {@code false} when the alarm's resume flag equals {@code "0"} and its
     *         {@code reserve5} field equals {@code "IPRAN"}; {@code true} otherwise
     */
    private boolean getIPRANfiler(GesAlarmDto alarm) {
        // An older check based on alarm names existed here but had been disabled
        // by the original author; only the flag/domain test below is active.
        boolean rejected = alarm.getResumeflag().equals("0")
                && alarm.getReserve5().equals("IPRAN");
        return !rejected;
    }
    
    /**
     * "Dong Huan" alarm filter (presumably the power-and-environment monitoring
     * domain; confirm against the caller).
     *
     * @param alarm the alarm to inspect
     * @return {@code true} when the alarm name contains at least one of the
     *         keywords listed below; {@code false} otherwise
     */
    private boolean getDongHuanfiler(GesAlarmDto alarm) {
        // Keywords are checked in the same order as the original condition chain.
        final String[] keywords = {
            "供电系统故障", "UPS 故障",
            "防盗", "空调故障",
            "门禁", "市电停电",
            "水浸", "通信故障",
            "温度", "温度故障",
            "烟感", "油机故障",
            "油机启动", "油机重大故障",
            "与服务器通信异样故障(服务器与智能网管断连)",
            "整流模块故障", "直流电压故障",
            "直流熔丝断故障故障"
        };
        for (String keyword : keywords) {
            if (alarm.getAlarmname().indexOf(keyword) >= 0) {
                return true;
            }
        }
        return false;
    }
    
    /**
     * "Wu Xian" alarm filter (presumably the wireless domain; confirm against
     * the caller).
     *
     * @param alarm the alarm to inspect
     * @return {@code true} when the alarm name contains at least one of the
     *         keywords listed below; {@code false} otherwise
     */
    private boolean getWuXianfiler(GesAlarmDto alarm) {
        // Keywords are checked in the same order as the original condition chain.
        final String[] keywords = {
            "24 小时闪断统计告警(只针对乌鲁木齐派单,且由 PPP 链路故障告警引起的不派单)",
            "30 分钟闪断统计告警", "A10/A11 接口链路故障",
            "A12 接口鉴权通道故障", "A12 链路不通或者 AN AAA 故障",
            "A13/A16 接口通道故障", "A17 信令链路连贯中断",
            "AAA 接口模块绑定 socket 失败", "Abis 口以太网连贯断",
            "Abis 口以太网下层链路建链不胜利", "Abis 链路断",
            "AN-AAA 鉴权不可达", "AN 和 AN AAA 间的 A12 链路不通或者 AN AAA 故障",
            "BAM 与 SAU 连贯中断", "BAM 与核心框 SCU 中断",
            "BBU 单板保护链路异样告警", "BBU 光模块接管异样告警",
            "BBU 光模块性能好转告警", "BFD 会话 Down",
            "BSC 与 BTS 通信中断", "BTS 掉站",
            "BTS 掉站未探测到 RTR",
            "CDT(呼叫具体跟踪)的 FTP Server 与网元 OMP 之间的链路异样",
            "CTML 链路断开", "DSP 资源不可用",
            "E1/T1 滑帧超限告警", "GE 选路异样",
            "GPS/Glonass/ 北斗接收机天馈短路", "MLPPP 组故障告警",
            "MTP3 局向不可达", "MTP3 链路不可用",
            "PA 去使能", "PDSN/HSGW 处于性能异样状态",
            "PDSN 处于性能异样状态", "PWRD485 通信链路断",
            "SCCP 子系统不可用", "SDH/SONET:信号失落",
            "STC 局向不可达", "TRUNK 端口 down",
            "TRUNK 通信能力有余", "蕴含“驻波”的告警",
            "备用 BAM 故障告警", "备用 BAM 数据同步异样告警",
            "单板不在位", "单板对外端口 down",
            "单板离线或 CPU 处于长期复位状态", "单板时钟异样故障",
            "单板硬件故障告警", "单板子系统故障",
            "电池欠压告警", "掉电报警",
            "反向链路 RSSI(接管信号强度批示)低", "反向链路 RSSI(接管信号强度批示)高",
            "风扇堵转", "风扇堵转告警", "风扇故障",
            "干接点告警(机房交换停电告警、电池电压过低告警、交换停电、直流欠压)", "光口 0 信号失落",
            "光口反向帧失锁", "光口光模块不在位", "光口光模块无光",
            "光口环回检测异样", "光口上行帧失锁", "光口信号劣化告警(RSD)",
            "光口信号生效告警(RSF)", "光口支路单元批示告警", "光口支路告警批示信号告警(AIS)",
            "机柜输出电压异样告警", "基站操作保护链路中断", "基站时钟不可用告警",
            "口支路单元告警批示信号告警(TUAIS)", "框间链路中断",
            "拉远射频单元保护链路异样告警", "没有可用的 UPCF 资源",
            "模块心跳失落", "目标信令点不可拜访",
            "偶联断链", "偶联通路断",
            "七号信令链路不可用", "射频单元光模块 /SFP 接口电缆不在位告警",
            "射频单元光模块接管异样告警", "射频单元光模块性能好转告警",
            "射频单元交换掉电告警", "时钟模块故障",
            "搜星不胜利", "天馈驻波比重大异样",
            "天馈驻波比重大异样故障", "天馈驻波比个别异样",
            "网口心跳链路故障告警", "网元代理链路断",
            "网元和 OMC 断链", "未探测到 CC",
            "未探测到 CH/CVI", "未探测到 FS",
            "未探测到 GCM", "未探测到 PM",
            "未探测到 RTR", "未探测到 SA",
            "温度告警", "无可用 SPCF 资源",
            "零碎时钟参考源不可用告警", "下联光口信号失落",
            "下联光口帧信号失落", "小区退服",
            "小区退服 BSC 与 BTS 通信中断", "星卡工作异样告警",
            "星卡锁星有余告警", "星卡天线故障告警",
            "星卡天线搜星异样告警", "业务单板心跳失落",
            "语音呼叫成功率低于门限值", "直流输出电压欠压",
            "中继线不可用", "中继线谬误",
            "主备 SCU 板间通信故障告警", "主备通信链路断"
        };
        for (String keyword : keywords) {
            if (alarm.getAlarmname().indexOf(keyword) >= 0) {
                return true;
            }
        }
        return false;
    }
    
    /**
     * "Jiao Huan" alarm filter (presumably the switching domain; confirm against
     * the caller). No filtering rule is implemented yet.
     *
     * @param alarm the alarm to inspect (currently not examined)
     * @return always {@code true}
     */
    private boolean getJiaoHuanfiler(GesAlarmDto alarm) {
        return true;
    }
    
    /**
     * "Ye Wu Ping Tai" alarm filter (presumably the service-platform domain;
     * confirm against the caller). No filtering rule is implemented yet.
     *
     * @param alarm the alarm to inspect (currently not examined)
     * @return always {@code true}
     */
    private boolean getYeWuPingTaifiler(GesAlarmDto alarm) {
        return true;
    }
        
    /**
     * 4G alarm filter.
     *
     * <p>An earlier implementation that returned {@code true} only for a fixed
     * list of alarm names had been disabled by the original author; the active
     * rule is an exclusion list of two names.
     *
     * @param alarm the alarm to inspect
     * @return {@code false} when the alarm name contains one of the two excluded
     *         keywords below; {@code true} otherwise
     */
    private boolean get4Gfiler(GesAlarmDto alarm) {
        boolean excluded = alarm.getAlarmname().indexOf("射频单元启动事件") >= 0
                || alarm.getAlarmname().indexOf("性能后果失落") >= 0;
        return !excluded;
    }
    
    /**
     * "Zi Jian Kong" alarm filter (presumably the self-monitoring domain;
     * confirm against the caller). No filtering rule is implemented yet.
     *
     * @param alarm the alarm to inspect (currently not examined)
     * @return always {@code true}
     */
    private boolean getZiJianKongfiler(GesAlarmDto alarm) {
        return true;
    }
    
    /**
     * "Qun Zhang Lan Jie" alarm filter (presumably mass-fault interception;
     * confirm against the caller). No filtering rule is implemented yet.
     *
     * @param alarm the alarm to inspect (currently not examined)
     * @return always {@code true}
     */
    private boolean getQunZhangLanJiefiler(GesAlarmDto alarm) {
        return true;
    }

}

生产者 Handler ProducerHandler

package com.ustcinfo.kanms.alarmcollector.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.Logger;
 

import com.ustcinfo.kanms.alarmcollector.main.GlobleConfiguration;

import java.util.Properties;

/**
 * One-shot Kafka producer: constructing an instance sends a single message to
 * the given topic.
 *
 * <p>The broker address is read from {@link GlobleConfiguration#getKafkaUrl()}.
 * Every instance creates its own {@link KafkaProducer}; the constructor waits
 * for the send to finish and then closes the producer so that its network
 * resources and buffers are released. The instance offers no further
 * operations after construction.
 */
public class ProducerHandler{
    private final KafkaProducer<String, String> producer;
    private static Logger logger = Logger.getLogger(ProducerHandler.class.getName());
    private String kafkaUrl;

    /**
     * Sends {@code message} to {@code topic} and releases the producer.
     *
     * <p>A failed or interrupted delivery is logged rather than thrown, so the
     * constructor still completes normally for asynchronous send failures.
     *
     * @param topic   name of the Kafka topic to publish to
     * @param message payload to publish (sent without a record key)
     */
    public ProducerHandler(String topic,String message) {
        this.kafkaUrl = GlobleConfiguration.getInstance().getKafkaUrl();
        Properties props = new Properties();
        // Kafka broker address list (host:port).
        props.put("bootstrap.servers",kafkaUrl);
        props.put("acks", "all");
        props.put("retries", "0");
        props.put("batch.size", "16384");
        props.put("linger.ms", "1");
        props.put("buffer.memory", "33554432");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(props);
        // Build the record to publish.
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, message);
        logger.info("【kafka】向 Kafka 的 TOPIC【" + topic + "】中发送音讯");
        logger.info("【kafka】音讯内容:" + message);
        try {
            // send() is asynchronous; wait on the returned Future so that success
            // is only reported once the broker has actually acknowledged the record.
            producer.send(record).get();
            logger.info("【kafka】推送胜利");
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
            logger.error("【kafka】向 Kafka 的 TOPIC【" + topic + "】推送被中断", e);
        } catch (java.util.concurrent.ExecutionException e) {
            logger.error("【kafka】向 Kafka 的 TOPIC【" + topic + "】推送失败", e);
        } finally {
            // The producer is never used again; closing it avoids leaking its
            // background thread, sockets and buffer memory per message.
            producer.close();
        }
    }
}

生产者 Worker ProducerWorker

package com.ustcinfo.kanms.alarmcollector.kafka;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.Logger;

/**
 * Runnable that prints the value of a producer record to standard output.
 */
public class ProducerWorker implements Runnable{
    private static Logger logger = Logger.getLogger(ProducerWorker.class.getName());

    private ProducerRecord<String, String> producerRecord;

    /**
     * @param record the record whose value is printed when this worker runs
     */
    public ProducerWorker(ProducerRecord record) {
        this.producerRecord = record;
    }

    /** Writes the record value to {@code System.out}. */
    @Override
    public void run() {
        final String payload = producerRecord.value();
        System.out.println(payload);
    }
}

消费者 Handler ConsumerHandler

package com.ustcinfo.kanms.alarmcollector.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.log4j.Logger;
import com.ustcinfo.kanms.alarmcollector.main.GlobleConfiguration;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.xml.datatype.Duration;

public class ConsumerHandler {//static Logger logger = Logger.getLogger(ConsumerHandler.class.getName());
    private final KafkaConsumer<String, String> consumer;
    private ExecutorService executors;
    private String kafkaUrl;
 
    public ConsumerHandler(List<String> topics) {this.kafkaUrl = GlobleConfiguration.getInstance().getKafkaUrl();
        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaUrl);
        props.put("group.id", "gzpt");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(topics);
        execute(1);
    }
 
    public void execute(int workerNum) {
        executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        Thread t = new Thread(new Runnable(){// 启动一个子线程来监听 kafka 音讯
            public void run(){while (true) {
                   Duration timeout=null;
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (final ConsumerRecord record : records) {//logger.info("【Kafka】监听到 kafka 的 TOPIC【" + record.topic() + "】的音讯");
                        //logger.info("【Kafka】音讯内容:" + record.value());
                        executors.submit(new ConsumerWorker(record));
                    }
                    
                } 
            }});
        t.start();}
 
    public void shutdown() {if (consumer != null) {consumer.close();
        }
        if (executors != null) {executors.shutdown();
        }
        try {if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {//logger.info("【Kafka】Timeout.... Ignore for this case");
            }
        } catch (InterruptedException ignored) {//logger.info("【Kafka】Other thread interrupted this shutdown, ignore for this case.");
            Thread.currentThread().interrupt();
        }
    }
}

消费者 Worker ConsumerWorker

package com.ustcinfo.kanms.alarmcollector.kafka;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import javax.jms.JMSException;
import javax.jms.TextMessage;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.log4j.Logger;

import com.ustcinfo.kanms.alarmcollector.model.GesAlarm;
import com.ustcinfo.kanms.alarmcollector.util.FileWriteThread;
//import com.ustcinfo.kanms.alarmcollector.activemq.AlarmAutoReceiverThread.AlarmAutoReceiver;
import com.ustcinfo.kanms.alarmcollector.framework.GesAlarmContainer;
import com.ustcinfo.kanms.alarmcollector.framework.ReadXML;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;

/**
 * Processes one consumed Kafka record: prints its value and appends it,
 * followed by a timestamp line, to the alarm log via {@link FileWriteThread}.
 *
 * <p>The original source also carried disabled code that parsed the payload as
 * alarm XML with {@code ReadXML} and stored the resulting {@code GesAlarm}
 * objects in {@link GesAlarmContainer}; that path is not active.
 */
public class ConsumerWorker implements Runnable {
    private static Logger logger = Logger.getLogger(ConsumerWorker.class);

    private ConsumerRecord<String, String> consumerRecord;
    // Per the original comment, this starts a separate thread for file writing.
    private final FileWriteThread filewrite = new FileWriteThread();
    private GesAlarmContainer gesAlarmContainer;

    /**
     * Creates a worker bound to the shared alarm container but to no record;
     * {@link #run()} is a no-op for such an instance.
     */
    public ConsumerWorker() {
        this.gesAlarmContainer = GesAlarmContainer.getInstance();
    }

    /**
     * @param record the consumed record to process; its value is expected to be
     *               a {@code String}
     */
    @SuppressWarnings("unchecked") // raw parameter kept for existing callers
    public ConsumerWorker(ConsumerRecord record) {
        this.consumerRecord = record;
    }

    /**
     * Prints the record value and writes it to the alarm log. A failed write is
     * logged and retried once; a failing retry is logged as well instead of
     * escaping from this task.
     */
    @Override
    public void run() {
        if (consumerRecord == null) {
            // Instance was built with the no-arg constructor: nothing to process.
            logger.warn("【Kafka】没有可处理的消息记录,已跳过");
            return;
        }
        System.out.println(consumerRecord.value());
        String writeData = consumerRecord.value();
        try {
            filewrite.handle(writeData + "\n--" + dateFormat(new Date()));
        } catch (Exception e) {
            logger.error("告警日志写入出错!", e);
            try {
                filewrite.handle(writeData + "\n--" + dateFormat(new Date()));
            } catch (Exception retryFailure) {
                // Without this guard the second failure would leave run() and be
                // hidden inside the Future returned by the executor.
                logger.error("告警日志写入出错!", retryFailure);
            }
        }
    }

    /**
     * Formats a date as {@code yyyy-MM-dd HH:mm:ss}.
     *
     * @param date the instant to format
     * @return the formatted timestamp
     */
    private String dateFormat(Date date) {
        // A new formatter per call: SimpleDateFormat must not be shared across threads.
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return sdf.format(date);
    }
}

正文完
 0