关于java:java-编程技术异步通信

4次阅读

共计 9908 个字符,预计需要花费 25 分钟才能阅读完成。

一、分布式的业务场景

1、如何高效实现各个分布式系统的合作

通过音讯队列来达到异步解耦的成果,缩小了程序之间的阻塞等待时间,升高了因为服务之间调用的依赖危险。

2、音讯的弊病?如何解决?

音讯队列的问题在于不确定性,java 培训不能相对保障音讯的精确达到,所以要引入提早、周期性的被动轮询,来发现未达到的音讯,从而进行弥补。

二、音讯队列简介
音讯队列,也叫消息中间件。音讯的传输过程中保留音讯的容器。

音讯队列都解决了什么问题?

异步

2、并行

3、解耦

4、排队(削峰填谷)

5 弊病:不确定性和提早

解决方案:最终统一

音讯模式
点对点

订阅

三 音讯队列工具 ActiveMQ
1、简介

同类产品:RabbitMQ、Kafka、Redis(List)

1.1 比照 RabbitMQ

最靠近的同类型产品,常常拿来比拟,性能伯仲之间,基本上能够相互代替。最次要区别是二者的协定不同 RabbitMQ 的协定是 AMQP(Advanced Message Queueing Protoco),而 ActiveMQ 应用的是 JMS(Java Messaging Service)协定。顾名思义 JMS 是针对 Java 体系的传输协定,队列两端必须有 JVM,所以如果开发环境都是 java 的话举荐应用 ActiveMQ,能够用 Java 的一些对象进行传递比方 Map、BLob、Stream 等。而 AMQP 通用行较强,非 java 环境常常应用,传输内容就是规范字符串。

另外一点就是 RabbitMQ 用 Erlang 开发,装置前要装 Erlang 环境,比拟麻烦。ActiveMQ 解压即可用不必任何装置。

1.2 比照 KafKa

Kafka 性能超过 ActiveMQ 等传统 MQ 工具,集群扩展性好。

弊病是:

在传输过程中可能会呈现音讯反复的状况,

不保障发送程序

一些传统 MQ 的性能没有,比方音讯的事务性能。

所以通常用 Kafka 解决大数据日志。

1.3 比照 Redis

其实 Redis 自身利用 List 能够实现音讯队列的性能,然而性能很少,而且队列体积较大时性能会急剧下降。对于数据量不大、业务简略的场景能够应用。

2 装置 ActiveMQ

拷贝 apache-activemq-5.14.4-bin.tar.gz 到 Linux 服务器的 /opt 下

解压缩 tar -zxvfapache-activemq-5.14.4-bin.tar.gz

重命名 mv apache-activemq-5.14.4 activemq

vim /opt/activemq/bin/activemq

减少两行

JAVA_HOME=”/opt/jdk1.8.0_152″JAVA_CMD=”/opt/jdk1.8.0_152/bin

注册服务

ln -s /opt/activemq/bin/activemq /etc/init.d/activemqchkconfig –add activemq
复制代码

启动服务

service activemq start

敞开服务

service activemq stop

通过 netstat 查看端口

activemq 两个重要的端口,一个是提供音讯队列的默认端口:61616

另一个是控制台端口 8161

通过控制台测试

启动生产端

进入网页控制台

账号 / 明码默认:admin/admin

点击 Queues

察看客户端

在 Java 中应用音讯队列
3.1 在 gmall-service-util 中导入依赖坐标

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.15.2</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions></dependency>

3.2 producer 端

public static void main(String[] args) {ConnectionFactory connect = new ActiveMQConnectionFactory(“tcp://192.168.67.163:61616”);try {Connection connection = connect.createConnection();connection.start();// 第一个值示意是否应用事务,如果抉择 true,第二个值相当于抉择 0Session session = connection.createSession(true, Session.SESSION_TRANSACTED);Queue testqueue = session.createQueue(“TEST1”);MessageProducer producer = session.createProducer(testqueue);TextMessage textMessage=new ActiveMQTextMessage();textMessage.setText(“ 今天天气真好!”);producer.setDeliveryMode(DeliveryMode.PERSISTENT);producer.send(textMessage);session.commit();connection.close();} catch (JMSException e) {e.printStackTrace();}}

3.3 consumer

public static void main(String[] args) {ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,”tcp://192.168.67.163:61616″);try {Connection connection = connect.createConnection();connection.start();// 第一个值示意是否应用事务,如果抉择 true,第二个值相当于抉择 0Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Destination testqueue = session.createQueue(“TEST1”);MessageConsumer consumer = session.createConsumer(testqueue);consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {if(message instanceof TextMessage){try {String text = ((TextMessage) message).getText();System.out.println(text);//session.rollback();} catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}});}catch (Exception e){e.printStackTrace();}}

3.4 对于事务管制

3.5 长久化与非长久化

通过 producer.setDeliveryMode(DeliveryMode.PERSISTENT) 进行设置

长久化的益处就是当 activemq 宕机的话,音讯队列中的音讯不会失落。非长久化会失落。然而会耗费肯定的性能。

四 与 springboot 整合
1 配置类 ActiveMQConfig

@Configurationpublic class ActiveMQConfig {@Value(“${spring.activemq.broker-url:disabled}”)String brokerURL ;@Value(“${activemq.listener.enable:disabled}”)String listenerEnable;@Beanpublic ActiveMQUtil getActiveMQUtil() throws JMSException {if(brokerURL.equals(“disabled”)){return null;}ActiveMQUtil activeMQUtil=new ActiveMQUtil();activeMQUtil.init(brokerURL);return activeMQUtil;}// 定义一个音讯监听器连贯工厂,这里定义的是点对点模式的监听器连贯工厂 @Bean(name = “jmsQueueListener”)public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();if(!listenerEnable.equals(“true”)){return null;}factory.setConnectionFactory(activeMQConnectionFactory);// 设置并发数 factory.setConcurrency(“5”);// 重连间隔时间 factory.setRecoveryInterval(5000L);factory.setSessionTransacted(false);factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);return factory;}@Beanpublic ActiveMQConnectionFactory activeMQConnectionFactory (){ActiveMQConnectionFactory activeMQConnectionFactory =new ActiveMQConnectionFactory( brokerURL);return activeMQConnectionFactory;}}

2 工具类 ActiveMQUtil
public class ActiveMQUtil {PooledConnectionFactory pooledConnectionFactory=null;public ConnectionFactory init(String brokerUrl) {ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);// 退出连接池 pooledConnectionFactory=new PooledConnectionFactory(factory);// 出现异常时从新连贯 pooledConnectionFactory.setReconnectOnException(true);//pooledConnectionFactory.setMaxConnections(5);pooledConnectionFactory.setExpiryTimeout(10000);return pooledConnectionFactory;}public ConnectionFactory getConnectionFactory(){return pooledConnectionFactory;}}

五 在领取业务模块中利用
1 领取胜利告诉

领取模块利用音讯队列告诉订单零碎,领取胜利

在领取模块中配置 application.properties

spring.activemq.broker-url=tcp://mq.server.com:61616
复制代码

在 PaymentServiceImpl 中减少发送办法:

public void sendPaymentResult(String orderId,String result){ConnectionFactory connectionFactory = activeMQUtil.getConnectionFactory();Connection connection=null;try {connection = connectionFactory.createConnection();connection.start();Session session = connection.createSession(true, Session.SESSION_TRANSACTED);Queue paymentResultQueue = session.createQueue(“PAYMENT_RESULT_QUEUE”);MapMessage mapMessage=new ActiveMQMapMessage();mapMessage.setString(“orderId”,orderId);mapMessage.setString(“result”,result);MessageProducer producer = session.createProducer(paymentResultQueue);producer.send(mapMessage);session.commit();
复制代码

producer.close();session.close();connection.close();} catch (JMSException e) {e.printStackTrace();}}

在 PaymentController 中减少一个办法用来测试

@RequestMapping(“sendResult”)@ResponseBodypublic String sendPaymentResult(@RequestParam(“orderId”) String orderId){paymentService.sendPaymentResult(orderId,”success”);return “has been sent”;}

在浏览器中拜访:

查看队列内容:有一个在队列中没有被生产的音讯。

2 订单模块生产音讯

application.properties

spring.activemq.broker-url=tcp://mq.server.com:61616activemq.listener.enable=true
复制代码

订单音讯音讯后要更新订单状态,先筹备好订单状态更新的办法

public void updateProcessStatus(String orderId , ProcessStatus processStatus, Map<String,String>… paramMaps) {OrderInfo orderInfo = new OrderInfo();orderInfo.setId(orderId);orderInfo.setOrderStatus(processStatus.getOrderStatus());orderInfo.setProcessStatus(processStatus);// 动静减少须要补充更新的属性 if (paramMaps != null && paramMaps.length > 0) {Map<String, String> paramMap = paramMaps[0];for (Map.Entry<String, String> entry : paramMap.entrySet()) {String properties = entry.getKey();String value = entry.getValue();try {BeanUtils.setProperty(orderInfo, properties, value);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}}}orderInfoMapper.updateByPrimaryKeySelective(orderInfo);}
复制代码

音讯队列的生产端

@JmsListener(destination = “PAYMENT_RESULT_QUEUE”,containerFactory = “jmsQueueListener”)public void consumePaymentResult(MapMessage mapMessage) throws JMSException {String orderId = mapMessage.getString(“orderId”);String result = mapMessage.getString(“result”);if(!”success”.equals(result)){orderService.updateProcessStatus( orderId , ProcessStatus.PAY_FAIL);}else{orderService.updateProcessStatus( orderId , ProcessStatus.PAID);} orderService.sendOrderResult(orderId);}
复制代码

3 订单模块发送减库存告诉

订单模块除了接管到申请扭转单据状态,还要发送库存零碎

查看看《库存管理系统接口手册》中【减库存的音讯队列生产端接口】中的形容,组织相应的音讯数据进行传递。

@Transactionalpublic void sendOrderResult(String orderId){OrderInfo orderInfo = getOrderInfo(orderId);Map<String, Object> messageMap = initWareOrderMessage(orderInfo);String wareOrderJson= JSON.toJSONString(messageMap);Session session = null;try {Connection conn = activeMQUtil.getConnection();session = conn.createSession(true, Session.SESSION_TRANSACTED);Queue queue = session.createQueue(“ORDER_RESULT_QUEUE”);MessageProducer producer = session.createProducer(queue);TextMessage message =new ActiveMQTextMessage();message.setText(wareOrderJson);producer.send(message);updateProcessStatus(orderInfo.getId(), ProcessStatus.NOTIFIED_WARE);session.commit();producer.close();conn.close();} catch (JMSException e) {e.printStackTrace();}}
复制代码

针对接口手册中须要的音讯进行组织

public Map<String,Object> initWareOrderMessage(OrderInfo orderInfo) {// 筹备发送到仓库零碎的订单 String wareId = orderInfo.getWareId();HashMap<String, Object> hashMap = new HashMap<>();hashMap.put(“orderId”, orderInfo.getId());hashMap.put(“consignee”, orderInfo.getConsignee());hashMap.put(“consigneeTel”, orderInfo.getConsigneeTel());hashMap.put(“orderComment”, orderInfo.getOrderComment());hashMap.put(“orderBody”, orderInfo.getOrderSubject());hashMap.put(“deliveryAddress”, orderInfo.getDeliveryAddress());hashMap.put(“paymentWay”, “2”);//1 货到付款 2 在线领取 hashMap.put(“wareId”,wareId);List<HashMap<String, String>> details = new ArrayList<>();List<OrderDetail> orderDetailList = orderInfo.getOrderDetailList();for (OrderDetail orderDetail : orderDetailList) {HashMap<String, String> detailMap = new HashMap<>();detailMap.put(“skuId”, orderDetail.getSkuId());detailMap.put(“skuNum”, “” + orderDetail.getSkuNum());detailMap.put(“skuName”, orderDetail.getSkuName());details.add(detailMap);}hashMap.put(“details”, details);return hashMap;}
复制代码

4 生产减库存后果

给仓库零碎发送减库存音讯后,还要承受减库存胜利或者失败的音讯。

同样依据《库存管理系统接口手册》中【商品减库后果音讯】的阐明实现。生产该音讯的音讯队列监听程序。

承受到音讯后次要做的工作就是更新订单状态。

@JmsListener(destination = “SKU_DEDUCT_QUEUE”,containerFactory = “jmsQueueListener”)public void consumeSkuDeduct(MapMessage mapMessage) throws JMSException {String orderId = mapMessage.getString(“orderId”);String status = mapMessage.getString(“status”);if(“DEDUCTED”.equals(status)){orderService.updateProcessStatus( orderId , ProcessStatus.WAITING_DELEVER);return ;}else{orderService.updateProcessStatus( orderId , ProcessStatus.STOCK_EXCEPTION);return ;}}
复制代码

最初一次领取实现后,所有业务全副走通应该能够在订单列表中,查看到对应的订单是待发货状态。

关键词:java 培训

正文完
 0