共计 6779 个字符,预计需要花费 17 分钟才能阅读完成。
在应用 spring cloud 云架构的时候,咱们不得不应用 Spring cloud Stream,因为消息中间件的应用在我的项目中无处不在,咱们公司前面做了娱乐方面的 APP,在应用 spring cloud 做架构的时候,其中音讯的异步告诉,业务的异步解决都须要应用消息中间件机制。spring cloud 的官网给出的集成倡议(应用 rabbit mq 和 kafka),我看了一下源码和配置,只有把 rabbit mq 集成,kafka 只是换了一个 pom 配置 jar 包而已,闲话少说,咱们就间接进入配置施行:
- 简介:
Spring cloud Stream 数据流操作开发包,封装了与 Redis,Rabbit、Kafka 等发送接管音讯。
- 应用工具:
rabbit,具体的下载和装置细节我这里不做太多解说,网上的实例太多了
- 创立 commonservice-mq-producer 音讯的发送者我的项目,在 pom 外面配置 stream-rabbit 的依赖
1. <span style="font-size: 16px;"><!-- 引入 MQ 音讯驱动的微服务包,引入 stream 只须要进行配置化即可,是对 rabbit、kafka 很好的封装 -->
2. <dependency>
3. <groupId>org.springframework.cloud</groupId>
4. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
5. </dependency></span>
- 在 yml 文件外面配置 rabbit mq
1. <span style="font-size: 16px;">server:
2. port: 5666
3. spring:
4. application:
5. name: commonservice-mq-producer
6. profiles:
7. active: dev
8. cloud:
9. config:
10. discovery:
11. enabled: true
12. service-id: commonservice-config-server
13. <span style="color: #ff0000;"># rabbitmq 和 kafka 都有相干配置的默认值,如果批改,能够再次进行配置
14. stream:
15. bindings:
16. mqScoreOutput:
17. destination: honghu_exchange
18. contentType: application/json
20. rabbitmq:
21. host: localhost
22. port: 5672
23. username: honghu
24. password: honghu</span>
25. eureka:
26. client:
27. service-url:
28. defaultZone: http://honghu:123456@localhost:8761/eureka
29. instance:
30. prefer-ip-address: true</span>
- 定义接口 ProducerService
1. <span style="font-size: 16px;">package com.honghu.cloud.producer;
3. import org.springframework.cloud.stream.annotation.Output;
4. import org.springframework.messaging.SubscribableChannel;
6. public interface ProducerService {
8. String SCORE_OUPUT = "mqScoreOutput";
10. @Output(ProducerService.SCORE_OUPUT)
11. SubscribableChannel sendMessage();
12. }</span>
- 定义绑定
1. <span style="font-size: 16px;">package com.honghu.cloud.producer;
3. import org.springframework.cloud.stream.annotation.EnableBinding;
5. @EnableBinding(ProducerService.class)
6. public class SendServerConfig {8.}</span>
- 定义发送音讯业务 ProducerController
1. <span style="font-size: 16px;">package com.honghu.cloud.controller;
4. import org.springframework.beans.factory.annotation.Autowired;
5. import org.springframework.integration.support.MessageBuilder;
6. import org.springframework.messaging.Message;
7. import org.springframework.web.bind.annotation.PathVariable;
8. import org.springframework.web.bind.annotation.RequestBody;
9. import org.springframework.web.bind.annotation.RequestMapping;
10. import org.springframework.web.bind.annotation.RequestMethod;
11. import org.springframework.web.bind.annotation.RestController;
13. import com.honghu.cloud.common.code.ResponseCode;
14. import com.honghu.cloud.common.code.ResponseVO;
15. import com.honghu.cloud.entity.User;
16. import com.honghu.cloud.producer.ProducerService;
18. import net.sf.json.JSONObject;
20. @RestController
21. @RequestMapping(value = "producer")
22. public class ProducerController {
24. @Autowired
25. private ProducerService producerService;
28. /**
29. * 通过 get 形式发送 </span> 对象 <span >
30. * @param name 门路参数
31. * @return 胜利 | 失败
32. */
33. @RequestMapping(value = "/sendObj", method = RequestMethod.GET)
34. public ResponseVO sendObj() {35. User user = new User(1, "hello User");
36. <span style="color: #ff0000;">Message<User> msg = MessageBuilder.withPayload(user).build();</span>
37. boolean result = producerService.sendMessage().send(msg);
38. if(result){39. return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false);
40. }
41. return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false);
42. }
45. /**
46. * 通过 get 形式发送字符串音讯
47. * @param name 门路参数
48. * @return 胜利 | 失败
49. */
50. @RequestMapping(value = "/send/{name}", method = RequestMethod.GET)
51. public ResponseVO send(@PathVariable(value = "name", required = true) String name) {52. Message msg = MessageBuilder.withPayload(name.getBytes()).build();
53. boolean result = producerService.sendMessage().send(msg);
54. if(result){55. return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false);
56. }
57. return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false);
58. }
60. /**
61. * 通过 post 形式发送 </span>json 对象 <span >
62. * @param name 门路参数
63. * @return 胜利 | 失败
64. */
65. @RequestMapping(value = "/sendJsonObj", method = RequestMethod.POST)
66. public ResponseVO sendJsonObj(@RequestBody JSONObject jsonObj) {67. Message<JSONObject> msg = MessageBuilder.withPayload(jsonObj).build();
68. boolean result = producerService.sendMessage().send(msg);
69. if(result){70. return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false);
71. }
72. return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false);
73. }
74. }
75. </span>
- 创立 commonservice-mq-consumer1 音讯的消费者我的项目,在 pom 外面配置 stream-rabbit 的依赖
1. <!-- 引入 MQ 音讯驱动的微服务包,引入 stream 只须要进行配置化即可,是对 rabbit、kafka 很好的封装 -->
2. <dependency>
3. <groupId>org.springframework.cloud</groupId>
4. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
5. </dependency>
- 在 yml 文件中配置:
1. server:
2. port: 5111
3. spring:
4. application:
5. name: commonservice-mq-consumer1
6. profiles:
7. active: dev
8. cloud:
9. config:
10. discovery:
11. enabled: true
12. service-id: commonservice-config-server
14. <span style="color: #ff0000;">stream:
15. bindings:
16. mqScoreInput:
17. group: honghu_queue
18. destination: honghu_exchange
19. contentType: application/json
21. rabbitmq:
22. host: localhost
23. port: 5672
24. username: honghu
25. password: honghu</span>
26. eureka:
27. client:
28. service-url:
29. defaultZone: http://honghu:123456@localhost:8761/eureka
30. instance:
31. prefer-ip-address: true
- 定义接口 ConsumerService
1. package com.honghu.cloud.consumer;
3. import org.springframework.cloud.stream.annotation.Input;
4. import org.springframework.messaging.SubscribableChannel;
6. public interface ConsumerService {
8. <span style="color: #ff0000;">String SCORE_INPUT = "mqScoreInput";
10. @Input(ConsumerService.SCORE_INPUT)
11. SubscribableChannel sendMessage();</span>
13. }
- 定义启动类和音讯生产
1. package com.honghu.cloud;
3. import org.springframework.boot.SpringApplication;
4. import org.springframework.boot.autoconfigure.SpringBootApplication;
5. import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
6. import org.springframework.cloud.stream.annotation.EnableBinding;
7. import org.springframework.cloud.stream.annotation.StreamListener;
9. import com.honghu.cloud.consumer.ConsumerService;
10. import com.honghu.cloud.entity.User;
12. @EnableEurekaClient
13. @SpringBootApplication
14. @EnableBinding(ConsumerService.class) // 能够绑定多个接口
15. public class ConsumerApplication {17. public static void main(String[] args) {18. SpringApplication.run(ConsumerApplication.class, args);
19. }
21. <span style="color: #ff0000;">@StreamListener(ConsumerService.SCORE_INPUT)
22. public void onMessage(Object obj) {23. System.out.println("消费者 1,接管到的音讯:" + obj);
24. }</span>
26. }
- 别离启动 commonservice-mq-producer、commonservice-mq-consumer1
- 通过 postman 来验证音讯的发送和接管
能够看到接管到了音讯,下一章咱们介绍 mq 的集群计划。
到此,整个音讯核心计划集成结束(企业架构源码能够加求球:叁五三陆二肆柒二伍玖)
欢送大家和我一起学习 spring cloud 构建微服务云架构,我这边会将近期研发的 spring cloud 微服务云架构的搭建过程和精华记录下来,帮忙更多有趣味研发 spring cloud 框架的敌人,大家来一起探讨 spring cloud 架构的搭建过程及如何使用于企业我的项目。
正文完