在应用spring cloud云架构的时候,咱们不得不应用Spring cloud Stream,因为消息中间件的应用在我的项目中无处不在,咱们公司前面做了娱乐方面的APP,在应用spring cloud做架构的时候,其中音讯的异步告诉,业务的异步解决都须要应用消息中间件机制。spring cloud的官网给出的集成倡议(应用rabbit mq和kafka),我看了一下源码和配置,只有把rabbit mq集成,kafka只是换了一个pom配置jar包而已,闲话少说,咱们就间接进入配置施行:
- 简介:
Spring cloud Stream 数据流操作开发包,封装了与Redis,Rabbit、Kafka等发送接管音讯。
- 应用工具:
rabbit,具体的下载和装置细节我这里不做太多解说,网上的实例太多了
- 创立commonservice-mq-producer音讯的发送者我的项目,在pom外面配置stream-rabbit的依赖
1. <span style="font-size: 16px;"><!-- 引入MQ音讯驱动的微服务包,引入stream只须要进行配置化即可,是对rabbit、kafka很好的封装 --> 2. <dependency> 3. <groupId>org.springframework.cloud</groupId> 4. <artifactId>spring-cloud-starter-stream-rabbit</artifactId> 5. </dependency></span>
- 在yml文件外面配置rabbit mq
1. <span style="font-size: 16px;">server: 2. port: 5666 3. spring: 4. application: 5. name: commonservice-mq-producer 6. profiles: 7. active: dev 8. cloud: 9. config: 10. discovery: 11. enabled: true 12. service-id: commonservice-config-server 13. <span style="color: #ff0000;"># rabbitmq和kafka都有相干配置的默认值,如果批改,能够再次进行配置 14. stream: 15. bindings: 16. mqScoreOutput: 17. destination: honghu_exchange 18. contentType: application/json 20. rabbitmq: 21. host: localhost 22. port: 5672 23. username: honghu 24. password: honghu</span> 25. eureka: 26. client: 27. service-url: 28. defaultZone: http://honghu:123456@localhost:8761/eureka 29. instance: 30. prefer-ip-address: true</span>
- 定义接口ProducerService
1. <span style="font-size: 16px;">package com.honghu.cloud.producer; 3. import org.springframework.cloud.stream.annotation.Output; 4. import org.springframework.messaging.SubscribableChannel; 6. public interface ProducerService { 8. String SCORE_OUPUT = "mqScoreOutput"; 10. @Output(ProducerService.SCORE_OUPUT) 11. SubscribableChannel sendMessage(); 12. }</span>
- 定义绑定
1. <span style="font-size: 16px;">package com.honghu.cloud.producer; 3. import org.springframework.cloud.stream.annotation.EnableBinding; 5. @EnableBinding(ProducerService.class) 6. public class SendServerConfig { 8. }</span>
- 定义发送音讯业务ProducerController
1. <span style="font-size: 16px;">package com.honghu.cloud.controller; 4. import org.springframework.beans.factory.annotation.Autowired; 5. import org.springframework.integration.support.MessageBuilder; 6. import org.springframework.messaging.Message; 7. import org.springframework.web.bind.annotation.PathVariable; 8. import org.springframework.web.bind.annotation.RequestBody; 9. import org.springframework.web.bind.annotation.RequestMapping; 10. import org.springframework.web.bind.annotation.RequestMethod; 11. import org.springframework.web.bind.annotation.RestController; 13. import com.honghu.cloud.common.code.ResponseCode; 14. import com.honghu.cloud.common.code.ResponseVO; 15. import com.honghu.cloud.entity.User; 16. import com.honghu.cloud.producer.ProducerService; 18. import net.sf.json.JSONObject; 20. @RestController 21. @RequestMapping(value = "producer") 22. public class ProducerController { 24. @Autowired 25. private ProducerService producerService; 28. /** 29. * 通过get形式发送</span>对象<span > 30. * @param name 门路参数 31. * @return 胜利|失败 32. */ 33. @RequestMapping(value = "/sendObj", method = RequestMethod.GET) 34. public ResponseVO sendObj() { 35. User user = new User(1, "hello User"); 36. <span style="color: #ff0000;">Message<User> msg = MessageBuilder.withPayload(user).build();</span> 37. boolean result = producerService.sendMessage().send(msg); 38. if(result){ 39. return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false); 40. } 41. return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false); 42. } 45. /** 46. * 通过get形式发送字符串音讯 47. * @param name 门路参数 48. * @return 胜利|失败 49. */ 50. @RequestMapping(value = "/send/{name}", method = RequestMethod.GET) 51. public ResponseVO send(@PathVariable(value = "name", required = true) String name) { 52. Message msg = MessageBuilder.withPayload(name.getBytes()).build(); 53. boolean result = producerService.sendMessage().send(msg); 54. if(result){ 55. return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false); 56. } 57. return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false); 58. } 60. /** 61. * 通过post形式发送</span>json对象<span > 62. * @param name 门路参数 63. * @return 胜利|失败 64. */ 65. @RequestMapping(value = "/sendJsonObj", method = RequestMethod.POST) 66. public ResponseVO sendJsonObj(@RequestBody JSONObject jsonObj) { 67. Message<JSONObject> msg = MessageBuilder.withPayload(jsonObj).build(); 68. boolean result = producerService.sendMessage().send(msg); 69. if(result){ 70. return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false); 71. } 72. return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false); 73. } 74. } 75. </span>
- 创立commonservice-mq-consumer1音讯的消费者我的项目,在pom外面配置stream-rabbit的依赖
1. <!-- 引入MQ音讯驱动的微服务包,引入stream只须要进行配置化即可,是对rabbit、kafka很好的封装 --> 2. <dependency> 3. <groupId>org.springframework.cloud</groupId> 4. <artifactId>spring-cloud-starter-stream-rabbit</artifactId> 5. </dependency>
- 在yml文件中配置:
1. server: 2. port: 5111 3. spring: 4. application: 5. name: commonservice-mq-consumer1 6. profiles: 7. active: dev 8. cloud: 9. config: 10. discovery: 11. enabled: true 12. service-id: commonservice-config-server 14. <span style="color: #ff0000;">stream: 15. bindings: 16. mqScoreInput: 17. group: honghu_queue 18. destination: honghu_exchange 19. contentType: application/json 21. rabbitmq: 22. host: localhost 23. port: 5672 24. username: honghu 25. password: honghu</span> 26. eureka: 27. client: 28. service-url: 29. defaultZone: http://honghu:123456@localhost:8761/eureka 30. instance: 31. prefer-ip-address: true
- 定义接口ConsumerService
1. package com.honghu.cloud.consumer; 3. import org.springframework.cloud.stream.annotation.Input; 4. import org.springframework.messaging.SubscribableChannel; 6. public interface ConsumerService { 8. <span style="color: #ff0000;">String SCORE_INPUT = "mqScoreInput"; 10. @Input(ConsumerService.SCORE_INPUT) 11. SubscribableChannel sendMessage();</span> 13. }
- 定义启动类和音讯生产
1. package com.honghu.cloud; 3. import org.springframework.boot.SpringApplication; 4. import org.springframework.boot.autoconfigure.SpringBootApplication; 5. import org.springframework.cloud.netflix.eureka.EnableEurekaClient; 6. import org.springframework.cloud.stream.annotation.EnableBinding; 7. import org.springframework.cloud.stream.annotation.StreamListener; 9. import com.honghu.cloud.consumer.ConsumerService; 10. import com.honghu.cloud.entity.User; 12. @EnableEurekaClient 13. @SpringBootApplication 14. @EnableBinding(ConsumerService.class) //能够绑定多个接口 15. public class ConsumerApplication { 17. public static void main(String[] args) { 18. SpringApplication.run(ConsumerApplication.class, args); 19. } 21. <span style="color: #ff0000;">@StreamListener(ConsumerService.SCORE_INPUT) 22. public void onMessage(Object obj) { 23. System.out.println("消费者1,接管到的音讯:" + obj); 24. }</span> 26. }
- 别离启动commonservice-mq-producer、commonservice-mq-consumer1
- 通过postman来验证音讯的发送和接管
能够看到接管到了音讯,下一章咱们介绍mq的集群计划。
到此,整个音讯核心计划集成结束(企业架构源码能够加求球:叁五三陆二肆柒二伍玖)
欢送大家和我一起学习spring cloud构建微服务云架构,我这边会将近期研发的spring cloud微服务云架构的搭建过程和精华记录下来,帮忙更多有趣味研发spring cloud框架的敌人,大家来一起探讨spring cloud架构的搭建过程及如何使用于企业我的项目。