关于java:RabbitMQ实现即时通讯居然如此简单

6次阅读

共计 6023 个字符,预计需要花费 16 分钟才能阅读完成。

有时候咱们的我的项目中会用到即时通讯性能,比方电商零碎中的客服聊天性能,还有在领取过程中,当用户领取胜利后,第三方领取服务会回调咱们的回调接口,此时咱们须要告诉前端领取胜利。最近发现 RabbitMQ 能够很不便的实现即时通讯性能,如果你没有非凡的业务需要,甚至能够不写后端代码,明天给大家讲讲如何应用 RabbitMQ 来实现即时通讯!

MQTT 协定

MQTT(Message Queuing Telemetry Transport,音讯队列遥测传输协定),是一种基于公布 / 订阅 (publish/subscribe) 模式的轻量级通信协定,该协定构建于 TCP/IP 协定上。MQTT 最大长处在于,能够以极少的代码和无限的带宽,为连贯近程设施提供实时牢靠的音讯服务。

MQTT 相干概念

  • Publisher(发布者):音讯的收回者,负责发送音讯。
  • Subscriber(订阅者):音讯的订阅者,负责接管并解决音讯。
  • Broker(代理):音讯代理,位于音讯发布者和订阅者之间,各类反对 MQTT 协定的消息中间件都能够充当。
  • Topic(主题):能够了解为音讯队列中的路由,订阅者订阅了主题之后,就能够收到发送到该主题的音讯。
  • Payload(负载);能够了解为发送音讯的内容。
  • QoS(音讯品质):全称 Quality of Service,即音讯的发送品质,次要有 QoS 0、QoS 1、QoS 2 三个等级,上面别离介绍下:QoS 0(Almost Once):至少一次,只发送一次,会产生音讯失落或反复;QoS 1(Atleast Once):至多一次,确保音讯达到,但音讯反复可能会产生;QoS 2(Exactly Once):只有一次,确保音讯只达到一次。

RabbitMQ 启用 MQTT 性能

RabbitMQ 启用 MQTT 性能,须要先装置然 RabbitMQ 而后再启用 MQTT 插件。

  • 首先咱们须要装置并启动 RabbitMQ,对 RabbitMQ 不理解的敌人能够参考《花了 3 天总结的 RabbitMQ 实用技巧,有点货色!》;
  • 接下来就是启用 RabbitMQ 的 MQTT 插件了,默认是不启用的,应用如下命令开启即可;
rabbitmq-plugins enable rabbitmq_mqtt
复制代码
  • 开启胜利后,查看治理控制台,咱们能够发现 MQTT 服务运行在 1883 端口上了。

MQTT 客户端

咱们能够应用 MQTT 客户端来测试 MQTT 的即时通讯性能,这里应用的是 MQTTBox 这个客户端工具。

  • 首先下载并装置好 MQTTBox,下载地址:http://workswithweb.com/mqttb…

  • 点击 Create MQTT Client 按钮来创立一个 MQTT 客户端;

  • 接下来对 MQTT 客户端进行配置,次要是配置好协定端口、连贯用户名明码和 QoS 即可;

  • 再配置一个订阅者,订阅者订阅 testTopicA 这个主题,咱们会向这个主题发送音讯;

  • 发布者向主题中公布音讯,订阅者能够实时接管到。

前端间接实现即时通讯

既然 MQTTBox 客户端能够间接通过 RabbitMQ 实现即时通讯,那咱们是不是间接应用前端技术也能够实现即时通讯?答案是必定的!上面咱们将通过 html+javascript 实现一个简略的聊天性能,真正不写一行后端代码实现即时通讯!

  • 因为 RabbitMQ 与 Web 端交互底层应用的是 WebSocket,所以咱们须要开启 RabbitMQ 的 MQTT WEB 反对,应用如下命令开启即可;
rabbitmq-plugins enable rabbitmq_web_mqtt
复制代码
  • 开启胜利后,查看治理控制台,咱们能够发现 MQTT 的 WEB 服务运行在 15675 端口上了;

  • WEB 端与 MQTT 服务进行通信须要应用一个叫 MQTT.js 的库,我的项目地址:https://github.com/mqttjs/MQT…

  • 实现的性能非常简单,一个单聊性能,须要留神的是配置好 MQTT 服务的拜访地址为:ws://localhost:15675/ws
Title
复制代码

指标 Topic:发送音讯:发送 清空

  • 接下来咱们订阅不同的主题开启两个页面测试下性能(页面放在了 SpringBoot 利用的 resource 目录下了,须要先启动利用再拜访):第一个订阅主题 testTopicA,拜访地址:http://localhost:8088/page/index?topic=testTopicA 第二个订阅主题 testTopicB,拜访地址:http://localhost:8088/page/index?topic=testTopicB
  • 之后相互发送音讯,让咱们来看看成果吧!

在 SpringBoot 中应用

没有非凡业务需要的时候,前端能够间接和 RabbitMQ 对接实现即时通讯。然而有时候咱们须要通过服务端去告诉前端,此时就须要在利用中集成 MQTT 了,接下来咱们来讲讲如何在 SpringBoot 利用中应用 MQTT。

  • 首先咱们须要在 pom.xml 中增加 MQTT 相干依赖;
org.springframework.integration    spring-integration-mqtt
复制代码
  • 在 application.yml 中增加 MQTT 相干配置,次要是拜访地址、用户名明码、默认主题信息;
rabbitmq:  mqtt:    url: tcp://localhost:1883    username: guest    password: guest    defaultTopic: testTopic
复制代码
  • 编写一个 Java 配置类从配置文件中读取配置便于应用;
/** * MQTT 相干配置 * Created by macro on 2020/9/15. */@Data@EqualsAndHashCode(callSuper = false)@Component@ConfigurationProperties(prefix = "rabbitmq.mqtt")public class MqttConfig {/**     * RabbitMQ 连贯用户名     */    private String username;    /**     * RabbitMQ 连贯明码     */    private String password;    /**     * RabbitMQ 的 MQTT 默认 topic     */    private String defaultTopic;    /**     * RabbitMQ 的 MQTT 连贯地址     */    private String url;}
复制代码
  • 增加 MQTT 音讯订阅者相干配置,应用 @ServiceActivator 注解申明一个服务激活器,通过 MessageHandler 来解决订阅音讯;
/** * MQTT 音讯订阅者相干配置 * Created by macro on 2020/9/15. */@Slf4j@Configurationpublic class MqttInboundConfig {@Autowired    private MqttConfig mqttConfig;    @Bean    public MessageChannel mqttInputChannel() {return new DirectChannel();    }    @Bean    public MessageProducer inbound() {        MqttPahoMessageDrivenChannelAdapter adapter =                new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient",                        mqttConfig.getDefaultTopic());        adapter.setCompletionTimeout(5000);        adapter.setConverter(new DefaultPahoMessageConverter());        // 设置音讯品质:0-> 至少一次;1-> 至多一次;2-> 只有一次        adapter.setQos(1);        adapter.setOutputChannel(mqttInputChannel());        return adapter;    }    @Bean    @ServiceActivator(inputChannel = "mqttInputChannel")    public MessageHandler handler() {        return new MessageHandler() {@Override            public void handleMessage(Message> message) throws MessagingException {// 解决订阅音讯                log.info("handleMessage : {}",message.getPayload());            }        };    }}
复制代码
  • 增加 MQTT 音讯发布者相干配置;
/** * MQTT 音讯发布者相干配置 * Created by macro on 2020/9/15. */@Configurationpublic class MqttOutboundConfig {@Autowired    private MqttConfig mqttConfig;    @Bean    public MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();        MqttConnectOptions options = new MqttConnectOptions();        options.setServerURIs(new String[] {mqttConfig.getUrl()});        options.setUserName(mqttConfig.getUsername());        options.setPassword(mqttConfig.getPassword().toCharArray());        factory.setConnectionOptions(options);        return factory;    }    @Bean    @ServiceActivator(inputChannel = "mqttOutboundChannel")    public MessageHandler mqttOutbound() {        MqttPahoMessageHandler messageHandler =                new MqttPahoMessageHandler("publisherClient", mqttClientFactory());        messageHandler.setAsync(true);        messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());        return messageHandler;    }    @Bean    public MessageChannel mqttOutboundChannel() {        return new DirectChannel();    }}
复制代码
  • 增加 MQTT 网关,用于向主题中发送音讯;
/** * MQTT 网关,通过接口将数据传递到集成流 * Created by macro on 2020/9/15. */@Component@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")public interface MqttGateway {/**     * 发送音讯到默认 topic     */    void sendToMqtt(String payload);    /**     * 发送音讯到指定 topic     */    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);    /**     * 发送音讯到指定 topic 并设置 QOS     */    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}
复制代码
  • 增加 MQTT 测试接口,应用 MQTT 网关向特定主题中发送音讯;
/** * MQTT 测试接口 * Created by macro on 2020/9/15. */@Api(tags = "MqttController", description = "MQTT 测试接口")@RestController@RequestMapping("/mqtt")public class MqttController {@Autowired    private MqttGateway mqttGateway;    @PostMapping("/sendToDefaultTopic")    @ApiOperation("向默认主题发送音讯")    public CommonResult sendToDefaultTopic(String payload) {mqttGateway.sendToMqtt(payload);        return CommonResult.success(null);    }    @PostMapping("/sendToTopic")    @ApiOperation("向指定主题发送音讯")    public CommonResult sendToTopic(String payload, String topic) {mqttGateway.sendToMqtt(payload, topic);        return CommonResult.success(null);    }}
复制代码
  • 调用接口向主题中发送音讯进行测试;

  • 后盾胜利接管到音讯并进行打印。
2020-09-17 14:29:01.689  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 来自网页上的音讯 2020-09-17 14:29:06.101  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 来自网页上的音讯 2020-09-17 14:29:07.384  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 来自网页上的音讯
复制代码

总结

消息中间件利用越来越宽泛,不仅能够实现牢靠的异步通信,还能够实现即时通讯,把握一个消息中间件还是很有必要的。如果没有非凡业务需要,客户端或者前端间接应用 MQTT 对接消息中间件即可实现即时通讯,有非凡需要的时候也能够应用 SpringBoot 集成 MQTT 的形式来实现,总之消息中间件是实现即时通讯的一个好抉择!

参考:《2020 最新 Java 根底精讲视频教程和学习路线!》
链接:https://juejin.cn/post/693524…

正文完
 0