上篇说了KAFKA利用实例,本篇承接上篇,着重形容activeMQ音讯机制的利用。
KAFKA和MQ同为数据异步解决中间件,实质都是对音讯的异步解决,异步通信、削谷填峰,高并发状况下的数据处理机制。他们的不同之处在于解决数据量的大小。
MQ和KAFKA相比拟,KAFKA解决的数据量更大.
下图为activeMQ利用目录:

ActiveMQ客户端,对连贯和会话的治理
/** *  */package com.ustcinfo.kanms.alarmcollector.activemq;import javax.jms.Connection;import javax.jms.ExceptionListener;import javax.jms.JMSException;import javax.jms.Session;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.log4j.Logger;import com.ustcinfo.kanms.alarmcollector.main.GlobleConfiguration;/** * ================================================= <br> * 工程:GessAlarmCollector <br> * 类名:ActiveMQClient <br> * 作者:dlzhang <br> * 工夫:2014-8-28下午05:37:21<br> * 版本:Version 1.0 <br><br> * 形容:ActiveMQ客户端,对连贯和会话的治理,保障全局只有一个会话被创立,缩小服务端压力、节俭资源<br> * ================================================= <br> */public class ActiveMQClient{    private static final Logger logger = Logger.getLogger(ActiveMQClient.class);            private String url;    private String user;    private String passwd;        private ActiveMQConnectionFactory connFactory;    private Connection conn;    private Session session;        private boolean isConn;        public ActiveMQClient(){        // 初始化参数        this.url = GlobleConfiguration.getInstance().getActiveMqUrl();        this.user = GlobleConfiguration.getInstance().getActiveMqUser();        this.passwd = GlobleConfiguration.getInstance().getActiveMqPasswd();    }            /**     * 建设连贯     */    protected synchronized void buildConnect() {        if(isConn)            return;        try {            logger.debug("建设连贯,user=" + user + ", passwd=" + passwd + ", url=" + url);            connFactory = new ActiveMQConnectionFactory(user, passwd, url);            conn = connFactory.createConnection();            conn.start();            session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);            isConn = true;            logger.info("建设连贯胜利");        } catch (JMSException e) {            logger.error("建设连贯失败:" + e.getMessage(), e);            isConn = false;        }    }        /**     * 敞开连贯     */    public synchronized void close() {        try {            if(null != session)                session.close();            if(null != conn)                conn.close();        } catch (JMSException e) {            logger.error("敞开连贯失败:" + e.getMessage(), e);        } finally {            session = null;            conn = null;            connFactory = null;            isConn = false;        }    }    /**     * @return the url     */    public String getUrl() {        return url;    }    /**     * @param url the url to set     */    public void setUrl(String url) {        this.url = url;    }    /**     * @return the user     */    public String getUser() {        return user;    }    /**     * @param user the user to set     */    public void setUser(String user) {        this.user = user;    }    /**     * @return the passwd     */    public String getPasswd() {        return passwd;    }    /**     * @param passwd the passwd to set     */    public void setPasswd(String passwd) {        this.passwd = passwd;    }    /**     * @return the connFactory     */    public ActiveMQConnectionFactory getConnFactory() {        if(!isConn)            buildConnect();        if(null == connFactory && isConn) {            this.close();            this.buildConnect();        }        return connFactory;    }    /**     * @param connFactory the connFactory to set     */    public void setConnFactory(ActiveMQConnectionFactory connFactory) {        this.connFactory = connFactory;    }    /**     * @return the conn     */    public Connection getConn() {        if(!isConn)            buildConnect();        if(null == conn && isConn) {            this.close();            this.buildConnect();        }        return conn;    }    /**     * @param conn the conn to set     */    public void setConn(Connection conn) {        this.conn = conn;    }    /**     * @return the session     */    public Session getSession() {        if(!isConn)            buildConnect();        if(null == session && isConn) {            this.close();            this.buildConnect();        }        return session;    }    /**     * @param session the session to set     */    public void setSession(Session session) {        this.session = session;    }    /**     * @return the isOpen     */    public boolean isConn() {        return isConn;    }    /**     * @param isOpen the isOpen to set     */    public void setConn(boolean isConn) {        this.isConn = isConn;    }}
activemq音讯主体的定义和申明
/** *  */package com.ustcinfo.kanms.alarmcollector.activemq;import java.io.Serializable;import javax.jms.BytesMessage;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageProducer;import javax.jms.ObjectMessage;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.log4j.Logger;import com.ustcinfo.kanms.alarmcollector.main.GlobleConfiguration;/** * ================================================= <br> * 工程:GessAlarmCollector <br> * 类名:ActiveMQSender <br> * 作者:dlzhang <br> * 工夫:2014-8-29下午01:02:40<br> * 版本:Version 1.0 <br> <br> * 形容:形容该文件的作用<br> * ================================================= <br> */public class ActiveMQSender {    private static final Logger logger = Logger.getLogger(ActiveMQSender.class);    private ActiveMQClient mqClient;    private Session session;    private String sendQueueName;    private Destination dest;    private MessageProducer producer;        public ActiveMQSender() {        // 初始化参数        this.mqClient = new ActiveMQClient();        this.session = this.mqClient.getSession();        this.sendQueueName = GlobleConfiguration.getInstance().getActiveMqSendQueueName();        try {            dest = session.createQueue(sendQueueName);            producer = session.createProducer(dest);        } catch (JMSException e) {            logger.error(e.getMessage(), e);        }    }//    /**//     * 测试函数//     * @param args//     *///    public static void main(String[] args) {//        // 初始化零碎配置文件//        GlobleConfiguration.getInstance().initSysConfig();//        //        ActiveMQSender sender = new ActiveMQSender();////        for(int i=0; i<100; i++) {////            sender.sendTextMessage("这是一个测试");//            sender.sendTextMessage("quit");//            logger.debug("第" + ++num + "条音讯发送胜利");////        }//    }//    private static long num;        /**     * @param msg     */    public void sendMessage(Message msg) {        try {            producer.send(msg);        } catch (JMSException e) {            logger.error(e.getMessage(), e);        }    }        /**     * @param text     */    public void sendTextMessage(String text) {        TextMessage tMsg = null;        try {            tMsg = session.createTextMessage(text);        } catch (JMSException e) {            logger.error(e.getMessage(), e);        }        this.sendMessage(tMsg);    }        /**     * @param bytes     */    public void sendbytesMessage(byte[] bytes) {        BytesMessage bMsg = null;        try {            bMsg = session.createBytesMessage();            bMsg.writeBytes(bytes);        } catch (JMSException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }        this.sendMessage(bMsg);    }        /**     * @param s     */    public void sendObjectMessage(Serializable s) {        ObjectMessage oMsg = null;        try {            oMsg = session.createObjectMessage(s);        } catch (JMSException e) {            logger.error(e.getMessage(), e);        }        this.sendMessage(oMsg);    }    /**     * @return the mqClient     */    public ActiveMQClient getMqClient() {        return mqClient;    }    /**     * @param mqClient the mqClient to set     */    public void setMqClient(ActiveMQClient mqClient) {        this.mqClient = mqClient;    }    /**     * @return the session     */    public Session getSession() {        return session;    }    /**     * @param session the session to set     */    public void setSession(Session session) {        this.session = session;    }    /**     * @return the sendQueueName     */    public String getSendQueueName() {        return sendQueueName;    }    /**     * @param sendQueueName the sendQueueName to set     */    public void setSendQueueName(String sendQueueName) {        this.sendQueueName = sendQueueName;    }    /**     * @return the dest     */    public Destination getDest() {        return dest;    }    /**     * @param dest the dest to set     */    public void setDest(Destination dest) {        this.dest = dest;    }    /**     * @return the producer     */    public MessageProducer getProducer() {        return producer;    }    /**     * @param producer the producer to set     */    public void setProducer(MessageProducer producer) {        this.producer = producer;    }    }
音讯接受器,应用监听的形式承受音讯
/** *  */package com.ustcinfo.kanms.alarmcollector.activemq;import java.text.SimpleDateFormat;import java.util.Date;import java.util.List;import javax.jms.Destination;import javax.jms.ExceptionListener;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.log4j.Logger;import com.ustcinfo.kanms.alarmcollector.framework.BaseThread;import com.ustcinfo.kanms.alarmcollector.framework.GesAlarmContainer;import com.ustcinfo.kanms.alarmcollector.framework.ReadXML;import com.ustcinfo.kanms.alarmcollector.kafka.ProducerHandler;import com.ustcinfo.kanms.alarmcollector.main.GlobleConfiguration;import com.ustcinfo.kanms.alarmcollector.model.GesAlarm;import com.ustcinfo.kanms.alarmcollector.util.FileWriteThread;/** * ================================================= <br> * 工程:AlarmAutoReceiver <br> * 类名:ActiveMQReceiver <br> * 作者:lt <br> * 工夫:2019-9-23下午08:49:38<br> * 版本:Version 1.0 <br><br> * 形容:告警音讯接受器,应用监听的形式承受音讯<br> * 实现ExceptionListener接口,对服务器进行监听,实现主动重连性能<br> * ================================================= <br> */public class AlarmAutoReceiverThread{     private static final Logger logger = Logger.getLogger(AlarmAutoReceiverThread.class);     private final FileWriteThread filewrite = new FileWriteThread();//建设一个新的线程写文件     public AlarmAutoReceiverThread(int n) {         for(int i=0;i<n;i++){             new AlarmAutoReceiver().start();             logger.info(">>>>>胜利启动监听线程:线程"+i);         }     }     class AlarmAutoReceiver extends BaseThread implements MessageListener, ExceptionListener {        private  final Logger log = Logger.getLogger(AlarmAutoReceiver.class);        private ActiveMQClient mqClient;        private GesAlarmContainer gesAlarmContainer;        private Session session;        private String recvQueueName;        private Destination dest;        private MessageConsumer consumer;                public AlarmAutoReceiver() {            // 初始化参数            this.mqClient = new ActiveMQClient();            this.gesAlarmContainer = GesAlarmContainer.getInstance();            try {                mqClient.getConn().setExceptionListener(this); // 设置监听            } catch (JMSException e) {                log.error(e.getMessage(), e);            }            this.session = this.mqClient.getSession();            this.recvQueueName = GlobleConfiguration.getInstance().getAlarmRecvQueueName();        }                public AlarmAutoReceiver(ActiveMQClient mqClient) {            // 初始化参数            this.mqClient = mqClient;            this.gesAlarmContainer = GesAlarmContainer.getInstance();            try {                mqClient.getConn().setExceptionListener(this); // 设置监听            } catch (JMSException e) {                log.error(e.getMessage(), e);            }            this.session = this.mqClient.getSession();            this.recvQueueName = GlobleConfiguration.getInstance().getAlarmRecvQueueName();        }                /* (non-Javadoc)         * @see java.lang.Thread#run()         */        @Override        public void run() {            log.debug("启动线程, Thread.currentThread().getName()=" + Thread.currentThread().getName());            try {                dest = session.createQueue(recvQueueName);                consumer = session.createConsumer(dest);                consumer.setMessageListener(this);                // 阻塞线程,直到收到音讯为“quit”时被唤醒                synchronized (this) {                    wait();                }                log.debug("完结线程, Thread.currentThread().getName()=" + Thread.currentThread().getName());                mqClient.close();            } catch (JMSException e) {                log.error("创立消费者失败:" + e.getMessage(), e);            } catch (InterruptedException e) {                log.error("线程被中断", e);            }        }                /* (non-Javadoc)         * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)         */        @Override        public void onException(JMSException e) {            // 捕捉异样,连贯重连            log.error("服务端异样", e);                        // 完结线程            if(this.isAlive()) {                synchronized (this) {                    this.notifyAll();                }            }                        // 每隔一分钟循环获取,直到获取连贯胜利            long sleepTimeSec = 10;            long sleepTimeMillis = sleepTimeSec * 1000;            int reConnCnt = 0;            while (true) {                try {                    reConnCnt++;                    log.info("休眠" + sleepTimeSec + "秒 -_-~zZ");                    Thread.sleep(sleepTimeMillis);                    log.debug("开始从新获取连贯");                                        // 先敞开连贯,再从新连贯                    mqClient.close();                    mqClient.buildConnect();                                        if(mqClient.isConn()) {                        log.info("从新获取连贯胜利,耗时:[" + reConnCnt * sleepTimeSec + "]秒 ^_^");                        // 从新创立监听                        new AlarmAutoReceiver(mqClient).start();                        break;                    }                    log.error("第" + reConnCnt + "次从新获取连贯失败 T_T");                } catch (InterruptedException e1) {                    log.error(e1.getMessage(), e1);                }            }        }            //    /**    //     * 测试函数    //     * @param args    //     */    //    public static void main(String[] args) {    //        // 初始化零碎配置文件    //        GlobleConfiguration.getInstance().initSysConfig();    //            //        ActiveMQReceiver recv = new ActiveMQReceiver();    //        recv.start();    //        logger.debug("消费者启动胜利");    //    }                /* (non-Javadoc)         * @see javax.jms.MessageListener#onMessage(javax.jms.Message)         */        @Override        public void onMessage(Message message) {            log.info("-------------- 收到一条告警信息 -----------");            if (message instanceof TextMessage){                TextMessage  tm = (TextMessage)message;                try {                    String xml = tm.getText();                    if(xml!=null){                        try{                            filewrite.handle(xml.substring(xml.indexOf("<ALARMS>"),xml.lastIndexOf("</ALARMS>")+9)+"\n--"+dateFormat(new Date()));                        }catch(Exception e){                            log.error("告警日志写入出错!",e);                            filewrite.handle(xml+"\n--"+dateFormat(new Date()));                        }                    }                    // 当音讯为“quit”时,唤醒线程                    if(xml != null && xml.equalsIgnoreCase("quit")) {                        logger.info(Thread.currentThread().getName() + "接管到的音讯为:" + xml + ",开始退出线程");                        synchronized (this) {                            notifyAll();                        }                        return;                    }                    /*String gesAlarmStr = ReadXML.obtainBodyInfo(xml.trim());                    List<GesAlarm> gesAlarmList = ReadXML.getAlarmValue(gesAlarmStr);                    if(gesAlarmList!=null&&gesAlarmList.size()>0){                        logger.debug("gesAlarmList.size()=" + gesAlarmList.size());                        for(int i=0;i<gesAlarmList.size();i++){                            GesAlarm gesAlarm = new GesAlarm();                            gesAlarm = gesAlarmList.get(i);                            gesAlarmContainer.putGesAlarm(gesAlarm);                        }                                }*/                        //向kafka写入数据                    new ProducerHandler("znwgAlarm",xml);                 } catch (JMSException e) {                                    } catch(Exception e){                    logger.error("承受告警信息出错!", e);                }finally {                                    }            }        }                private  String dateFormat(Date date){            SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");            return sdf.format(date);        }    //    private long num;                /**         * @return the mqClient         */        public ActiveMQClient getMqClient() {            return mqClient;        }            /**         * @param mqClient the mqClient to set         */        public void setMqClient(ActiveMQClient mqClient) {            this.mqClient = mqClient;        }            /**         * @return the session         */        public Session getSession() {            return session;        }            /**         * @param session the session to set         */        public void setSession(Session session) {            this.session = session;        }            /**         * @return the recvQueueName         */        public String getRecvQueueName() {            return recvQueueName;        }            /**         * @param recvQueueName the recvQueueName to set         */        public void setRecvQueueName(String recvQueueName) {            this.recvQueueName = recvQueueName;        }            /**         * @return the dest         */        public Destination getDest() {            return dest;        }            /**         * @param dest the dest to set         */        public void setDest(Destination dest) {            this.dest = dest;        }            /**         * @return the consumer         */        public MessageConsumer getConsumer() {            return consumer;        }            /**         * @param consumer the consumer to set         */        public void setConsumer(MessageConsumer consumer) {            this.consumer = consumer;        }                    }}