1 简介
Solace
是一个弱小的实时性的事件驱动音讯队列。本文将介绍如何在Spring中应用,尽管代码应用的是Spring Boot,但并没有应用相干starter,跟Spring的整合一样,可通用。JMS是通过的音讯解决框架,能够深刻学习一下,不同的MQ在JMS的整合上都是相似的。
2 通过Docker启动Solace
有两种形式试用Solace,一种是通过Docker来启动,另一种是应用Cloud版本,但Cloud版本有试用期限,咱们应用Docker来启动吧。
先下载镜像:
$ docker pull solace/solace-pubsub-standard:9.13.0.16
而后通过以下命令启动:
$ docker run -d -p 8080:8080 -p 55554:55555 -p 8008:8008 -p 1883:1883 -p 8000:8000 -p 5672:5672 -p 9000:9000 -p 2222:2222 --shm-size=2g --env username_admin_globalaccesslevel=admin --env username_admin_password=admin --name=solace solace/solace-pubsub-standard:9.13.0.16
这里端口改为55554
,是因为Mac的起因。
而后便能够拜访来登陆治理界面:http://localhost:8080/
用户名明码为:admin/admin
登陆后能够看到如下界面,Solace按VPN来治理队列,VPN有点像分组,比方某个业务线应用某个VPN。
咱们在default
的VPN上创立一个Queue
,名为pkslow-queue
:
其它设置如下:
接着在该Queue上创立Topic:
创立实现后,咱们能够间接测试一下:
能够Publish到Topic或Queue,也能够从其中一个Subscribe。
实现以上设置后,咱们就能够在Spring Boot中整合了。
3 Spring Boot JMS整合Solace
3.1 发送音讯
咱们是通过JmsTemplate
来发送音讯的,而JmsTemplate
须要连贯到MQ,就须要一个ConnectionFactory
,这个Factory是带着MQ的一些连贯信息。配置代码如下:
@Configurationpublic class SolacePubConfig { private final SolaceProperties solaceProperties; public SolacePubConfig(SolaceProperties solaceProperties) { this.solaceProperties = solaceProperties; } @Bean("connectionFactory") public ConnectionFactory connectionFactory() throws Exception { Properties env = new Properties(); env.put(InitialContext.INITIAL_CONTEXT_FACTORY, "com.solacesystems.jndi.SolJNDIInitialContextFactory"); env.put(InitialContext.PROVIDER_URL, solaceProperties.getBrokerUrl()); env.put(SupportedProperty.SOLACE_JMS_VPN, solaceProperties.getVpn()); env.put(InitialContext.SECURITY_PRINCIPAL, solaceProperties.getUsername()); env.put(InitialContext.SECURITY_CREDENTIALS, solaceProperties.getPassword()); return SolJmsUtility.createConnectionFactory(env); } @Bean public CachingConnectionFactory cachingConnectionFactory(ConnectionFactory connectionFactory) { CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory); cachingConnectionFactory.setSessionCacheSize(10); return cachingConnectionFactory; } @Bean public JmsTemplate pubJmsTemplate(CachingConnectionFactory cachingConnectionFactory) { JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory); jmsTemplate.setPubSubDomain(true); jmsTemplate.setExplicitQosEnabled(true); jmsTemplate.setDeliveryPersistent(true); jmsTemplate.setDefaultDestinationName(solaceProperties.getDefaultPubDestinationName()); return jmsTemplate; }}
生成JmsTemplate
后,就能够援用并发送音讯了:
@RestController@RequestMapping("/solace")public class SolaceTestController { private final JmsTemplate pubJmsTemplate; private final SolaceProperties solaceProperties; public SolaceTestController(JmsTemplate pubJmsTemplate, SolaceProperties solaceProperties) { this.pubJmsTemplate = pubJmsTemplate; this.solaceProperties = solaceProperties; } @GetMapping public String send() { pubJmsTemplate.send(solaceProperties.getDefaultPubDestinationName(), session -> session.createTextMessage("www.pkslow.com")); pubJmsTemplate.send(session -> session.createTextMessage("Larry Deng")); return "OK"; }}
用到的属性配置如下:
server.port=8083pkslow.solace.brokerUrl=smf://127.0.0.1:55554pkslow.solace.vpn=defaultpkslow.solace.username=defaultpkslow.solace.password=defaultpkslow.solace.defaultPubDestinationName=pkslow-topicpkslow.solace.defaultSubDestinationName=pkslow-queue
@Configuration@ConfigurationProperties(prefix = "pkslow.solace")@Setter@Getterpublic class SolaceProperties { private String brokerUrl; private String vpn; private String username; private String password; private String defaultPubDestinationName; private String defaultSubDestinationName;}
3.2 接管音讯
咱们通过MessageListenerContainer
来接管音讯,MessageListenerContainer
也须要一个ConnectionFactory
,也有MQ的连贯信息。还须要一个MessageListener
,用来定义如何解决音讯。咱们的配置如下:
@Configuration@Slf4jpublic class SolaceSubConfig { private final SolaceProperties solaceProperties; public SolaceSubConfig(SolaceProperties solaceProperties) { this.solaceProperties = solaceProperties; } @Bean public SingleConnectionFactory singleConnectionFactory(@Qualifier("connectionFactory") ConnectionFactory targetConnectionFactory) { return new SingleConnectionFactory(targetConnectionFactory); } @Bean public MessageListener messageListener() { return message -> { try { log.info("Received message " + ((TextMessage) message).getText() + " on destination: " + message.getJMSDestination().toString()); } catch (JMSException ex) { throw new RuntimeException(ex); } }; } @Bean public MessageListenerContainer messageListenerContainer(SingleConnectionFactory singleConnectionFactory, MessageListener messageListener) { DefaultMessageListenerContainer container = new DefaultMessageListenerContainer(); container.setConnectionFactory(singleConnectionFactory); container.setDestinationName(solaceProperties.getDefaultSubDestinationName()); container.setMessageListener(messageListener); return container; }}
这里@Qualifier("connectionFactory") ConnectionFactory targetConnectionFactory
复用了在SolacePubConfig
创立的对象。
3.3 测试
发送GET申请就能够触发发送了:
GET http://localhost:8083/solace
我发了三次,后果日志如下:
4 代码
代码请看GitHub: https://github.com/LarryDpk/p...
References:
Docker available image tags
Docker Solace Guide
Spring Solace