上篇介绍了 Kafka 的应用实例,本篇承接上篇,着重描述 ActiveMQ 消息机制的应用。
Kafka 和 ActiveMQ 同为数据异步处理中间件,本质上都是对消息的异步处理:异步通信、削峰填谷,是高并发情况下的数据处理机制。它们的不同之处在于处理数据量的大小。
与 ActiveMQ 相比,Kafka 处理的数据量更大。
下图为 ActiveMQ 应用的目录结构:
ActiveMQ 客户端:对连接和会话的管理
/** * */package com.ustcinfo.kanms.alarmcollector.activemq;import javax.jms.Connection;import javax.jms.ExceptionListener;import javax.jms.JMSException;import javax.jms.Session;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.log4j.Logger;import com.ustcinfo.kanms.alarmcollector.main.GlobleConfiguration;/** * ================================================= <br> * 工程:GessAlarmCollector <br> * 类名:ActiveMQClient <br> * 作者:dlzhang <br> * 工夫:2014-8-28下午05:37:21<br> * 版本:Version 1.0 <br><br> * 形容:ActiveMQ客户端,对连贯和会话的治理,保障全局只有一个会话被创立,缩小服务端压力、节俭资源<br> * ================================================= <br> */public class ActiveMQClient{ private static final Logger logger = Logger.getLogger(ActiveMQClient.class); private String url; private String user; private String passwd; private ActiveMQConnectionFactory connFactory; private Connection conn; private Session session; private boolean isConn; public ActiveMQClient(){ // 初始化参数 this.url = GlobleConfiguration.getInstance().getActiveMqUrl(); this.user = GlobleConfiguration.getInstance().getActiveMqUser(); this.passwd = GlobleConfiguration.getInstance().getActiveMqPasswd(); } /** * 建设连贯 */ protected synchronized void buildConnect() { if(isConn) return; try { logger.debug("建设连贯,user=" + user + ", passwd=" + passwd + ", url=" + url); connFactory = new ActiveMQConnectionFactory(user, passwd, url); conn = connFactory.createConnection(); conn.start(); session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); isConn = true; logger.info("建设连贯胜利"); } catch (JMSException e) { logger.error("建设连贯失败:" + e.getMessage(), e); isConn = false; } } /** * 敞开连贯 */ public synchronized void close() { try { if(null != session) session.close(); if(null != conn) conn.close(); } catch (JMSException e) { logger.error("敞开连贯失败:" + e.getMessage(), e); } finally { session = null; conn = null; connFactory = null; isConn = false; } } /** * @return the url */ public String getUrl() { return url; } /** * @param url the url 
to set */ public void setUrl(String url) { this.url = url; } /** * @return the user */ public String getUser() { return user; } /** * @param user the user to set */ public void setUser(String user) { this.user = user; } /** * @return the passwd */ public String getPasswd() { return passwd; } /** * @param passwd the passwd to set */ public void setPasswd(String passwd) { this.passwd = passwd; } /** * @return the connFactory */ public ActiveMQConnectionFactory getConnFactory() { if(!isConn) buildConnect(); if(null == connFactory && isConn) { this.close(); this.buildConnect(); } return connFactory; } /** * @param connFactory the connFactory to set */ public void setConnFactory(ActiveMQConnectionFactory connFactory) { this.connFactory = connFactory; } /** * @return the conn */ public Connection getConn() { if(!isConn) buildConnect(); if(null == conn && isConn) { this.close(); this.buildConnect(); } return conn; } /** * @param conn the conn to set */ public void setConn(Connection conn) { this.conn = conn; } /** * @return the session */ public Session getSession() { if(!isConn) buildConnect(); if(null == session && isConn) { this.close(); this.buildConnect(); } return session; } /** * @param session the session to set */ public void setSession(Session session) { this.session = session; } /** * @return the isOpen */ public boolean isConn() { return isConn; } /** * @param isOpen the isOpen to set */ public void setConn(boolean isConn) { this.isConn = isConn; }}
ActiveMQ 消息主体的定义和声明
/** * */package com.ustcinfo.kanms.alarmcollector.activemq;import java.io.Serializable;import javax.jms.BytesMessage;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageProducer;import javax.jms.ObjectMessage;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.log4j.Logger;import com.ustcinfo.kanms.alarmcollector.main.GlobleConfiguration;/** * ================================================= <br> * 工程:GessAlarmCollector <br> * 类名:ActiveMQSender <br> * 作者:dlzhang <br> * 工夫:2014-8-29下午01:02:40<br> * 版本:Version 1.0 <br> <br> * 形容:形容该文件的作用<br> * ================================================= <br> */public class ActiveMQSender { private static final Logger logger = Logger.getLogger(ActiveMQSender.class); private ActiveMQClient mqClient; private Session session; private String sendQueueName; private Destination dest; private MessageProducer producer; public ActiveMQSender() { // 初始化参数 this.mqClient = new ActiveMQClient(); this.session = this.mqClient.getSession(); this.sendQueueName = GlobleConfiguration.getInstance().getActiveMqSendQueueName(); try { dest = session.createQueue(sendQueueName); producer = session.createProducer(dest); } catch (JMSException e) { logger.error(e.getMessage(), e); } }// /**// * 测试函数// * @param args// */// public static void main(String[] args) {// // 初始化零碎配置文件// GlobleConfiguration.getInstance().initSysConfig();// // ActiveMQSender sender = new ActiveMQSender();//// for(int i=0; i<100; i++) {//// sender.sendTextMessage("这是一个测试");// sender.sendTextMessage("quit");// logger.debug("第" + ++num + "条音讯发送胜利");//// }// }// private static long num; /** * @param msg */ public void sendMessage(Message msg) { try { producer.send(msg); } catch (JMSException e) { logger.error(e.getMessage(), e); } } /** * @param text */ public void sendTextMessage(String text) { TextMessage tMsg = null; try { tMsg = session.createTextMessage(text); } catch (JMSException e) { 
logger.error(e.getMessage(), e); } this.sendMessage(tMsg); } /** * @param bytes */ public void sendbytesMessage(byte[] bytes) { BytesMessage bMsg = null; try { bMsg = session.createBytesMessage(); bMsg.writeBytes(bytes); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } this.sendMessage(bMsg); } /** * @param s */ public void sendObjectMessage(Serializable s) { ObjectMessage oMsg = null; try { oMsg = session.createObjectMessage(s); } catch (JMSException e) { logger.error(e.getMessage(), e); } this.sendMessage(oMsg); } /** * @return the mqClient */ public ActiveMQClient getMqClient() { return mqClient; } /** * @param mqClient the mqClient to set */ public void setMqClient(ActiveMQClient mqClient) { this.mqClient = mqClient; } /** * @return the session */ public Session getSession() { return session; } /** * @param session the session to set */ public void setSession(Session session) { this.session = session; } /** * @return the sendQueueName */ public String getSendQueueName() { return sendQueueName; } /** * @param sendQueueName the sendQueueName to set */ public void setSendQueueName(String sendQueueName) { this.sendQueueName = sendQueueName; } /** * @return the dest */ public Destination getDest() { return dest; } /** * @param dest the dest to set */ public void setDest(Destination dest) { this.dest = dest; } /** * @return the producer */ public MessageProducer getProducer() { return producer; } /** * @param producer the producer to set */ public void setProducer(MessageProducer producer) { this.producer = producer; } }
消息接收器,使用监听的方式接收消息
/** * */package com.ustcinfo.kanms.alarmcollector.activemq;import java.text.SimpleDateFormat;import java.util.Date;import java.util.List;import javax.jms.Destination;import javax.jms.ExceptionListener;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.log4j.Logger;import com.ustcinfo.kanms.alarmcollector.framework.BaseThread;import com.ustcinfo.kanms.alarmcollector.framework.GesAlarmContainer;import com.ustcinfo.kanms.alarmcollector.framework.ReadXML;import com.ustcinfo.kanms.alarmcollector.kafka.ProducerHandler;import com.ustcinfo.kanms.alarmcollector.main.GlobleConfiguration;import com.ustcinfo.kanms.alarmcollector.model.GesAlarm;import com.ustcinfo.kanms.alarmcollector.util.FileWriteThread;/** * ================================================= <br> * 工程:AlarmAutoReceiver <br> * 类名:ActiveMQReceiver <br> * 作者:lt <br> * 工夫:2019-9-23下午08:49:38<br> * 版本:Version 1.0 <br><br> * 形容:告警音讯接受器,应用监听的形式承受音讯<br> * 实现ExceptionListener接口,对服务器进行监听,实现主动重连性能<br> * ================================================= <br> */public class AlarmAutoReceiverThread{ private static final Logger logger = Logger.getLogger(AlarmAutoReceiverThread.class); private final FileWriteThread filewrite = new FileWriteThread();//建设一个新的线程写文件 public AlarmAutoReceiverThread(int n) { for(int i=0;i<n;i++){ new AlarmAutoReceiver().start(); logger.info(">>>>>胜利启动监听线程:线程"+i); } } class AlarmAutoReceiver extends BaseThread implements MessageListener, ExceptionListener { private final Logger log = Logger.getLogger(AlarmAutoReceiver.class); private ActiveMQClient mqClient; private GesAlarmContainer gesAlarmContainer; private Session session; private String recvQueueName; private Destination dest; private MessageConsumer consumer; public AlarmAutoReceiver() { // 初始化参数 this.mqClient = new ActiveMQClient(); this.gesAlarmContainer = GesAlarmContainer.getInstance(); try { 
mqClient.getConn().setExceptionListener(this); // 设置监听 } catch (JMSException e) { log.error(e.getMessage(), e); } this.session = this.mqClient.getSession(); this.recvQueueName = GlobleConfiguration.getInstance().getAlarmRecvQueueName(); } public AlarmAutoReceiver(ActiveMQClient mqClient) { // 初始化参数 this.mqClient = mqClient; this.gesAlarmContainer = GesAlarmContainer.getInstance(); try { mqClient.getConn().setExceptionListener(this); // 设置监听 } catch (JMSException e) { log.error(e.getMessage(), e); } this.session = this.mqClient.getSession(); this.recvQueueName = GlobleConfiguration.getInstance().getAlarmRecvQueueName(); } /* (non-Javadoc) * @see java.lang.Thread#run() */ @Override public void run() { log.debug("启动线程, Thread.currentThread().getName()=" + Thread.currentThread().getName()); try { dest = session.createQueue(recvQueueName); consumer = session.createConsumer(dest); consumer.setMessageListener(this); // 阻塞线程,直到收到音讯为“quit”时被唤醒 synchronized (this) { wait(); } log.debug("完结线程, Thread.currentThread().getName()=" + Thread.currentThread().getName()); mqClient.close(); } catch (JMSException e) { log.error("创立消费者失败:" + e.getMessage(), e); } catch (InterruptedException e) { log.error("线程被中断", e); } } /* (non-Javadoc) * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) */ @Override public void onException(JMSException e) { // 捕捉异样,连贯重连 log.error("服务端异样", e); // 完结线程 if(this.isAlive()) { synchronized (this) { this.notifyAll(); } } // 每隔一分钟循环获取,直到获取连贯胜利 long sleepTimeSec = 10; long sleepTimeMillis = sleepTimeSec * 1000; int reConnCnt = 0; while (true) { try { reConnCnt++; log.info("休眠" + sleepTimeSec + "秒 -_-~zZ"); Thread.sleep(sleepTimeMillis); log.debug("开始从新获取连贯"); // 先敞开连贯,再从新连贯 mqClient.close(); mqClient.buildConnect(); if(mqClient.isConn()) { log.info("从新获取连贯胜利,耗时:[" + reConnCnt * sleepTimeSec + "]秒 ^_^"); // 从新创立监听 new AlarmAutoReceiver(mqClient).start(); break; } log.error("第" + reConnCnt + "次从新获取连贯失败 T_T"); } catch (InterruptedException e1) 
{ log.error(e1.getMessage(), e1); } } } // /** // * 测试函数 // * @param args // */ // public static void main(String[] args) { // // 初始化零碎配置文件 // GlobleConfiguration.getInstance().initSysConfig(); // // ActiveMQReceiver recv = new ActiveMQReceiver(); // recv.start(); // logger.debug("消费者启动胜利"); // } /* (non-Javadoc) * @see javax.jms.MessageListener#onMessage(javax.jms.Message) */ @Override public void onMessage(Message message) { log.info("-------------- 收到一条告警信息 -----------"); if (message instanceof TextMessage){ TextMessage tm = (TextMessage)message; try { String xml = tm.getText(); if(xml!=null){ try{ filewrite.handle(xml.substring(xml.indexOf("<ALARMS>"),xml.lastIndexOf("</ALARMS>")+9)+"\n--"+dateFormat(new Date())); }catch(Exception e){ log.error("告警日志写入出错!",e); filewrite.handle(xml+"\n--"+dateFormat(new Date())); } } // 当音讯为“quit”时,唤醒线程 if(xml != null && xml.equalsIgnoreCase("quit")) { logger.info(Thread.currentThread().getName() + "接管到的音讯为:" + xml + ",开始退出线程"); synchronized (this) { notifyAll(); } return; } /*String gesAlarmStr = ReadXML.obtainBodyInfo(xml.trim()); List<GesAlarm> gesAlarmList = ReadXML.getAlarmValue(gesAlarmStr); if(gesAlarmList!=null&&gesAlarmList.size()>0){ logger.debug("gesAlarmList.size()=" + gesAlarmList.size()); for(int i=0;i<gesAlarmList.size();i++){ GesAlarm gesAlarm = new GesAlarm(); gesAlarm = gesAlarmList.get(i); gesAlarmContainer.putGesAlarm(gesAlarm); } }*/ //向kafka写入数据 new ProducerHandler("znwgAlarm",xml); } catch (JMSException e) { } catch(Exception e){ logger.error("承受告警信息出错!", e); }finally { } } } private String dateFormat(Date date){ SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); return sdf.format(date); } // private long num; /** * @return the mqClient */ public ActiveMQClient getMqClient() { return mqClient; } /** * @param mqClient the mqClient to set */ public void setMqClient(ActiveMQClient mqClient) { this.mqClient = mqClient; } /** * @return the session */ public Session getSession() { return session; 
} /** * @param session the session to set */ public void setSession(Session session) { this.session = session; } /** * @return the recvQueueName */ public String getRecvQueueName() { return recvQueueName; } /** * @param recvQueueName the recvQueueName to set */ public void setRecvQueueName(String recvQueueName) { this.recvQueueName = recvQueueName; } /** * @return the dest */ public Destination getDest() { return dest; } /** * @param dest the dest to set */ public void setDest(Destination dest) { this.dest = dest; } /** * @return the consumer */ public MessageConsumer getConsumer() { return consumer; } /** * @param consumer the consumer to set */ public void setConsumer(MessageConsumer consumer) { this.consumer = consumer; } }}