乐趣区

关于java:通过Docker启动Solace并在Spring-Boot通过JMS整合Solace

1 简介

Solace是一个弱小的实时性的事件驱动音讯队列。本文将介绍如何在 Spring 中应用,尽管代码应用的是 Spring Boot,但并没有应用相干 starter,跟 Spring 的整合一样,可通用。JMS 是通过的音讯解决框架,能够深刻学习一下,不同的 MQ 在 JMS 的整合上都是相似的。

2 通过 Docker 启动 Solace

有两种形式试用 Solace,一种是通过 Docker 来启动,另一种是应用 Cloud 版本,但 Cloud 版本有试用期限,咱们应用 Docker 来启动吧。

先下载镜像:

$ docker pull solace/solace-pubsub-standard:9.13.0.16

而后通过以下命令启动:

$ docker run -d -p 8080:8080 -p 55554:55555 -p 8008:8008 -p 1883:1883 -p 8000:8000 -p 5672:5672 -p 9000:9000 -p 2222:2222 --shm-size=2g --env username_admin_globalaccesslevel=admin --env username_admin_password=admin --name=solace solace/solace-pubsub-standard:9.13.0.16

这里端口改为55554,是因为 Mac 的起因。

而后便能够拜访来登陆治理界面:http://localhost:8080/

用户名明码为:admin/admin

登陆后能够看到如下界面,Solace 按 VPN 来治理队列,VPN 有点像分组,比方某个业务线应用某个 VPN。

咱们在 default 的 VPN 上创立一个Queue,名为pkslow-queue

其它设置如下:

接着在该 Queue 上创立 Topic:

创立实现后,咱们能够间接测试一下:

能够 Publish 到 Topic 或 Queue,也能够从其中一个 Subscribe。

实现以上设置后,咱们就能够在 Spring Boot 中整合了。

3 Spring Boot JMS 整合 Solace

3.1 发送音讯

咱们是通过 JmsTemplate 来发送音讯的,而 JmsTemplate 须要连贯到 MQ,就须要一个ConnectionFactory,这个 Factory 是带着 MQ 的一些连贯信息。配置代码如下:

@Configuration
public class SolacePubConfig {

    private final SolaceProperties solaceProperties;

    public SolacePubConfig(SolaceProperties solaceProperties) {this.solaceProperties = solaceProperties;}

    @Bean("connectionFactory")
    public ConnectionFactory connectionFactory() throws Exception {Properties env = new Properties();
        env.put(InitialContext.INITIAL_CONTEXT_FACTORY, "com.solacesystems.jndi.SolJNDIInitialContextFactory");
        env.put(InitialContext.PROVIDER_URL, solaceProperties.getBrokerUrl());
        env.put(SupportedProperty.SOLACE_JMS_VPN, solaceProperties.getVpn());
        env.put(InitialContext.SECURITY_PRINCIPAL, solaceProperties.getUsername());
        env.put(InitialContext.SECURITY_CREDENTIALS, solaceProperties.getPassword());
        return SolJmsUtility.createConnectionFactory(env);
    }

    @Bean
    public CachingConnectionFactory cachingConnectionFactory(ConnectionFactory connectionFactory) {CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory);
        cachingConnectionFactory.setSessionCacheSize(10);
        return cachingConnectionFactory;
    }

    @Bean
    public JmsTemplate pubJmsTemplate(CachingConnectionFactory cachingConnectionFactory) {JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
        jmsTemplate.setPubSubDomain(true);
        jmsTemplate.setExplicitQosEnabled(true);
        jmsTemplate.setDeliveryPersistent(true);
        jmsTemplate.setDefaultDestinationName(solaceProperties.getDefaultPubDestinationName());
        return jmsTemplate;
    }

}

生成 JmsTemplate 后,就能够援用并发送音讯了:

@RestController
@RequestMapping("/solace")
public class SolaceTestController {
    private final JmsTemplate pubJmsTemplate;
    private final SolaceProperties solaceProperties;

    public SolaceTestController(JmsTemplate pubJmsTemplate, SolaceProperties solaceProperties) {
        this.pubJmsTemplate = pubJmsTemplate;
        this.solaceProperties = solaceProperties;
    }

    @GetMapping
    public String send() {pubJmsTemplate.send(solaceProperties.getDefaultPubDestinationName(), session -> session.createTextMessage("www.pkslow.com"));
        pubJmsTemplate.send(session -> session.createTextMessage("Larry Deng"));
        return "OK";
    }
}

用到的属性配置如下:

server.port=8083

pkslow.solace.brokerUrl=smf://127.0.0.1:55554
pkslow.solace.vpn=default
pkslow.solace.username=default
pkslow.solace.password=default
pkslow.solace.defaultPubDestinationName=pkslow-topic
pkslow.solace.defaultSubDestinationName=pkslow-queue
@Configuration
@ConfigurationProperties(prefix = "pkslow.solace")
@Setter
@Getter
public class SolaceProperties {
    private String brokerUrl;
    private String vpn;
    private String username;
    private String password;
    private String defaultPubDestinationName;
    private String defaultSubDestinationName;
}

3.2 接管音讯

咱们通过 MessageListenerContainer 来接管音讯,MessageListenerContainer也须要一个ConnectionFactory,也有 MQ 的连贯信息。还须要一个MessageListener,用来定义如何解决音讯。咱们的配置如下:

@Configuration
@Slf4j
public class SolaceSubConfig {
    private final SolaceProperties solaceProperties;

    public SolaceSubConfig(SolaceProperties solaceProperties) {this.solaceProperties = solaceProperties;}

    @Bean
    public SingleConnectionFactory singleConnectionFactory(@Qualifier("connectionFactory") ConnectionFactory targetConnectionFactory) {return new SingleConnectionFactory(targetConnectionFactory);
    }

    @Bean
    public MessageListener messageListener() {
        return message -> {
            try {log.info("Received message" + ((TextMessage) message).getText() + "on destination:" +
                        message.getJMSDestination().toString());
            } catch (JMSException ex) {throw new RuntimeException(ex);
            }
        };
    }


    @Bean
    public MessageListenerContainer messageListenerContainer(SingleConnectionFactory singleConnectionFactory, MessageListener messageListener) {DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(singleConnectionFactory);
        container.setDestinationName(solaceProperties.getDefaultSubDestinationName());
        container.setMessageListener(messageListener);

        return container;
    }
}

这里 @Qualifier("connectionFactory") ConnectionFactory targetConnectionFactory 复用了在 SolacePubConfig 创立的对象。

3.3 测试

发送 GET 申请就能够触发发送了:

GET http://localhost:8083/solace

我发了三次,后果日志如下:

4 代码

代码请看 GitHub: https://github.com/LarryDpk/p…


References:

Docker available image tags

Docker Solace Guide

Spring Solace

退出移动版