本文使用 Spring Boot 集成 IBM MQ 的客户端, 简单地实现消息接收和发送逻辑.
pom 依赖
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
</dependency>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.allclient</artifactId>
<version>9.0.4.0</version>
</dependency>
配置文件
这里的配置与上文中创建的队列信息保持一致
server.port=9111
## mq 的连接 ip
project.mq.host=192.168.0.106
## mq 的连接端口
project.mq.port=1414
## mq 的队列管理器名字
project.mq.queue-manager=QM144
## mq 的 java 客户端连接通道
project.mq.channel=SYSTEM_DEF_SVRCONN
## mq 的用户名
project.mq.username=mqm
## mq 的密码
project.mq.password=Paic2023q2
## mq 的接收超时时间(单位: 毫秒)
project.mq.receive-timeout=20000
## 收发队列名称(可选)
project.mq.queuename=QUEUE1
配置类
@Configuration
public class JmsConfig {@Value("${project.mq.host}")
private String host;
@Value("${project.mq.port}")
private Integer port;
@Value("${project.mq.queue-manager}")
private String queueManager;
@Value("${project.mq.channel}")
private String channel;
@Value("${project.mq.username}")
private String username;
@Value("${project.mq.password}")
private String password;
@Value("${project.mq.receive-timeout}")
private long receiveTimeout;
@Bean
public MQQueueConnectionFactory mqQueueConnectionFactory() {MQQueueConnectionFactory mqQueueConnectionFactory = new MQQueueConnectionFactory();
mqQueueConnectionFactory.setHostName(host);
try {mqQueueConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
mqQueueConnectionFactory.setCCSID(1208);
mqQueueConnectionFactory.setChannel(channel);
mqQueueConnectionFactory.setPort(port);
mqQueueConnectionFactory.setQueueManager(queueManager);
} catch (Exception e) {e.printStackTrace();
}
return mqQueueConnectionFactory;
}
@Bean
UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter(MQQueueConnectionFactory mqQueueConnectionFactory) {UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter = new UserCredentialsConnectionFactoryAdapter();
userCredentialsConnectionFactoryAdapter.setUsername(username);
userCredentialsConnectionFactoryAdapter.setPassword(password);
userCredentialsConnectionFactoryAdapter.setTargetConnectionFactory(mqQueueConnectionFactory);
return userCredentialsConnectionFactoryAdapter;
}
@Bean
@Primary
public CachingConnectionFactory cachingConnectionFactory(UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter) {CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setTargetConnectionFactory(userCredentialsConnectionFactoryAdapter);
cachingConnectionFactory.setSessionCacheSize(500);
cachingConnectionFactory.setReconnectOnException(true);
return cachingConnectionFactory;
}
@Bean
public PlatformTransactionManager jmsTransactionManager(CachingConnectionFactory cachingConnectionFactory) {JmsTransactionManager jmsTransactionManager = new JmsTransactionManager();
jmsTransactionManager.setConnectionFactory(cachingConnectionFactory);
return jmsTransactionManager;
}
@Bean
public JmsOperations jmsOperations(CachingConnectionFactory cachingConnectionFactory) {JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
jmsTemplate.setReceiveTimeout(receiveTimeout);
return jmsTemplate;
}
}
发送消息
/**
 * Publishes the given text to {@code QUEUE1} as a non-persistent JMS text message tagged
 * with correlation id {@code "1111"}.
 *
 * @param msg text payload, bound from the request parameter
 */
public void sendMq(@RequestParam String msg) {
    log.info("取得报文并开始向 mq 发送:"+msg);

    // Both settings are applied to the shared template before every send.
    jmsTemplate.setExplicitQosEnabled(true);
    jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

    jmsTemplate.send("QUEUE1", session -> {
        TextMessage outbound = session.createTextMessage(msg);
        outbound.setJMSCorrelationID("1111");
        return outbound;
    });

    // A synchronous read of the reply was left disabled here in the original:
    //   jmsTemplate.receiveSelected("QUEUE1", "JMSCorrelationID='2222'")
    log.info("====== 推送 mq 报文实现 ======");
}
接收消息
/**
 * JMS listener that consumes messages with correlation id {@code '1111'} from the queue named
 * by the {@code project.mq.queuename} property and answers each one with a fixed text reply.
 */
@Slf4j
@Component
public class MQListener extends MessageListenerAdapter {

    @Autowired
    JmsOperations jmsOperations;

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Logs the incoming message, then sends the text {@code "return data"} to {@code QUEUE1}
     * with correlation id {@code "2222"}.
     *
     * @param message the message delivered by the listener container
     */
    @Override
    @JmsListener(destination = "${project.mq.queuename}", selector = "JMSCorrelationID='1111'")
    public void onMessage(Message message) {
        log.info("从 MQ 接管的 message 报文:"+message);

        // NOTE(review): the reply's correlation id differs from this listener's selector,
        // presumably so the listener does not pick up its own reply when both use QUEUE1.
        jmsTemplate.send("QUEUE1", session -> {
            TextMessage reply = session.createTextMessage("return data");
            reply.setJMSCorrelationID("2222");
            return reply;
        });
    }
}
代码地址: https://gitee.com/sharloon/ibm-mq80-demo.git
本文由 mdnice 多平台发布