乐趣区

关于分布式:微服务架构中二次浅封装实践

一、背景简介

分布式系统中存在很多拆分的服务,在一直迭代降级的过程中,会呈现如下常见的辣手状况:

某个技术组件版本升级,依赖包降级导致局部语法或者 API 过期,或者组件修复紧急的破绽,从而会导致分布式系统下各个服务被动的降级迭代,很容易引发意外的问题;不同的服务中对组件的依赖和版本各不相同,从而导致不兼容问题的呈现,很难对版本做对立的治理和保护,一旦呈现问题很容易慌手慌脚,引发蝴蝶效应;

所以在简单的零碎中,对于依赖的框架和组件进行对立治理和二次浅封装,能够较大水平升高上述问题的解决老本与危险,同时能够更好的治理和控制技术栈。

二、框架浅封装

1、浅封装作用

为什么浅封装,外围目标在于对立治理和协调组件的依赖与降级,并对罕用办法做一层包装,实际上很多组件应用到的性能点并不多,只是在业务中的应用点很多,这样给组件自身的迭代降级带来了肯定的难度:

例如某个组件罕用的 API 中存在微小危险破绽,或者替换掉过期的用法,须要对整个零碎中波及的中央做降级,这种操作的老本是十分高的;

如果是对这种罕用的组件办法进行二次包装,作为解决业务的工具办法,那么解决下面的问题就绝对轻松许多,只有对封装的工具办法降级,服务的依赖降级即可,升高工夫老本和危险。

通过浅封装的伎俩,能够实现两个方面的解耦:

业务与技术

技术栈中罕用的办法进行二次浅封装,这样能够较大水平的升高业务与技术的耦合,如此能够独立的降级技术栈,扩大性能而不影响业务服务的迭代。

框架与组件

不同的框架与组件都须要肯定水平的自定义配置,同时分模块治理,在不同的服务中引入特定的依赖,也能够在根底包中做对立依赖,以此实现技术栈的疾速组合搭配。

这里说的浅封装,是指包装惯例罕用的语法,组件自身就是技术层面的深度封装,所以也不可能齐全隔开技术栈原生用法。

2、对立版本控制

例如微服务架构下,不同的研发组负责不同的业务模块,然而受到开发人员的教训和能力影响,很容易呈现不同的服务组件选型不统一,或者雷同的组件依赖版本不同,这样很难对系统架构做规范的对立治理。

对于二次封装的形式,能够严格的控制技术栈的迭代扩大,以及版本抵触的问题,通过对二次封装层的对立降级,能够疾速实现业务服务的降级,解决不同服务的依赖差别问题。

三、实际案例

1、案例简介

Java 分布式系统中,微服务根底组件(Nacos、Feign、Gateway、Seata)等,零碎中间件(Quartz、Redis、Kafka、ElasticSearch,Logstash)等,对罕用性能、配置、API 等,进行二次浅封装并对立集成治理,以满足日常开发中根底环境搭建与长期工具的疾速实现。

  • butte-flyer 组件封装的利用案例;
  • butte-frame 罕用技术组件二次封装;

2、分层架构

整体划分五层:网关层、应用层、业务层、中间件层、根底层,组合成一套分布式系统。

服务总览

服务名 分层 端口 缓存库 数据库 形容
flyer-gateway 网关层 8010 db1 nacos 路由管制
flyer-facade 应用层 8082 db2 facade 门面服务
flyer-admin 应用层 8083 db3 admin 后端治理
flyer-account 业务层 8084 db4 account 账户治理
flyer-quartz 业务层 8085 db5 quartz 定时工作
kafka 中间件 9092 —— 音讯队列
elasticsearch 中间件 9200 —— 搜索引擎
redis 中间件 6379 —— 缓存核心
logstash 中间件 5044 es6.8.6 日志采集
nacos 根底层 8848 nacos 注册配置
seata 根底层 8091 seata 散布事务
mysql 根底层 3306 —— 数据存储

3、目录构造

butte-frame 中对各个技术栈进行二次封装治理,在 butte-flyer 中进行依赖援用。

butte-frame
├── frame-base          根底代码块
├── frame-jdbc          数据库组件
├── frame-core          服务根底依赖
├── frame-gateway       路由网关
├── frame-nacos         注册与配置核心
├── frame-seata         分布式事务
├── frame-feign         服务间调用
├── frame-security      平安治理
├── frame-search        搜索引擎
├── frame-redis         缓存治理
├── frame-kafka         消息中间件
├── frame-quartz        定时工作
├── frame-swagger       接口文档
└── frame-sleuth        链路日志

butte-flyer
├── flyer-gateway       网关服务:路由管制
├── flyer-facade        门面服务:性能合作接口
├── flyer-account       账户服务:用户账户
├── flyer-quartz        工作服务:定时工作
└── flyer-admin         治理服务:后端治理

4、技术栈组件

零碎罕用的技术栈:根底框架、微服务组件、缓存、平安治理、数据库、定时工作、工具依赖等。

名称 版本 阐明
spring-cloud 2.2.5.RELEASE 微服务框架根底
spring-boot 2.2.5.RELEASE 服务根底依赖
gateway 2.2.5.RELEASE 路由网关
nacos 2.2.5.RELEASE 注册核心与配置管理
seata 2.2.5.RELEASE 分布式事务管理
feign 2.2.5.RELEASE 微服务间申请调用
security 2.2.5.RELEASE 平安治理
sleuth 2.2.5.RELEASE 申请轨迹链路
security-jwt 1.0.10.RELEASE JWT 加密组件
hikari 3.4.2 数据库连接池,默认
mybatis-plus 3.4.2 ORM 长久层框架
kafka 2.0.1 MQ 音讯队列
elasticsearch 6.8.6 搜索引擎
logstash 5.2 日志采集
redis 2.2.5.RELEASE 缓存治理与加锁管制
quartz 2.3.2 定时工作治理
swagger 2.6.1 接口文档
apache-common 2.7.0 根底依赖包
hutool 5.3.1 根底工具包

四、微服务组件

1、Nacos

Nacos 在整个组件体系中,提供两个外围能力,注册发现:适配微服务注册与发现规范,疾速实现动静服务注册发现、元数据管理等,提供微服务组件中最根底的能力;配置核心:对立治理各个服务配置,集中在 Nacos 中存储管理,隔离多环境的不同配置,并且能够躲避线上配置放开的危险;

连贯治理

spring:
  cloud:
    nacos:
      # 配置读取
      config:
        prefix: application
        server-addr: 127.0.0.1:8848
        file-extension: yml
        refresh-enabled: true
      # 注册核心
      discovery:
        server-addr: 127.0.0.1:8848

配置管理

  • bootstrap.yml:服务中文件,连贯和读取 Nacos 中配置信息;
  • application.yml:公共根底配置,这里配置 mybatis 组件;
  • application-dev.yml:中间件连贯配置,用作环境标识隔离;
  • application-def.yml:各个服务的自定义配置,参数加载;

2、Gateway

Gateway 网关外围能力,提供对立的 API 路由治理,作为微服务架构体系下申请惟一入口,还能够在网关层解决所有的非业务性能,例如:安全控制,流量监控限流,等等。

路由管制:各个服务的发现和路由;

@Component
public class RouteFactory implements RouteDefinitionRepository {

    @Resource
    private RouteService routeService ;

    /**
     * 加载全副路由
     * @since 2021-11-14 18:08
     */
    @Override
    public Flux<RouteDefinition> getRouteDefinitions() {return Flux.fromIterable(routeService.getRouteDefinitions());
    }

    /**
     * 增加路由
     * @since 2021-11-14 18:08
     */
    @Override
    public Mono<Void> save(Mono<RouteDefinition> routeMono) {
        return routeMono.flatMap(routeDefinition -> {routeService.saveRouter(routeDefinition);
            return Mono.empty();});
    }
}

全局过滤:作为网关的根底能力;

@Component
public class GatewayFilter implements GlobalFilter {private static final Logger logger = LoggerFactory.getLogger(GatewayFilter.class);

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {ServerHttpRequest request = exchange.getRequest();
        String uri = request.getURI().getPath() ;
        String host = String.valueOf(request.getHeaders().getHost()) ;
        logger.info("request host : {} , uri : {}",host,uri);
        return chain.filter(exchange);
    }
}

3、Feign

Feign 组件是申明式的 WebService 客户端,使微服务之间的调用变得更简略,Feign 通过注解伎俩,将申请进行模板化和接口化治理,能够更加规范的治理各个服务间的通信交互。

响应解码:定义 Feign 接口响应时解码逻辑,校验和管制对立的接口格调;

public class FeignDecode extends ResponseEntityDecoder {public FeignDecode(Decoder decoder) {super(decoder);
    }

    @Override
    public Object decode(Response response, Type type) {if (!type.getTypeName().startsWith(Rep.class.getName())) {throw new RuntimeException("响应格局异样");
        }
        try {return super.decode(response, type);
        } catch (IOException e) {e.printStackTrace();
            throw new RuntimeException(e.getMessage());
        }
    }
}

4、Seata

Seata 组件是开源的分布式事务解决方案,致力于提供高性能和简略易用的分布式事务服务,实现 AT、TCC、SAGA、XA 事务模式,反对一站式的分布式解决方案。

事务配置:基于 nacos 治理 Seata 组件的参数定义;

服务注册:在须要治理分布式事务的服务中连贯和应用 Seata 服务;

seata:
  enabled: true
  application-id: ${spring.application.name}
  tx-service-group: butte-seata-group
  config:
    type: nacos
    nacos:
      server-addr: ${spring.cloud.nacos.config.server-addr}
      group: DEFAULT_GROUP
  registry:
    type: nacos
    nacos:
      server-addr: ${spring.cloud.nacos.config.server-addr}
      application: seata-server
      group: DEFAULT_GROUP

五、中间件集成

1、Kafka

Kafka 是由 Apache 开源,具备分布式、分区的、多正本的、多订阅者,基于 Zookeeper 协调的分布式音讯解决平台,由 Scala 和 Java 语言编写。还罕用于收集用户在应用服务中产生的日志数据。

音讯发送:封装音讯发送的根底能力;

@Component
public class KafkaSendOperate {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate ;

    public void send (SendMsgVO entry) {kafkaTemplate.send(entry.getTopic(),entry.getKey(),entry.getMsgBody()) ;
    }
}

音讯生产:生产监听时有两种策略;

  • 音讯生产方本人生产,通过 Feign 接口去执行具体生产服务的逻辑,这样有利于流程跟踪排查;
  • 音讯生产方间接监听,缩小音讯解决的流程节点,当然也能够打造对立的 MQ 总线服务(文尾);
public class KafkaListen {private static final Logger logger = LoggerFactory.getLogger(KafkaListen.class);
    /**
     * Kafka 音讯监听
     * @since 2021-11-06 16:47
     */
    @KafkaListener(topics = KafkaTopic.USER_TOPIC)
    public void listenUser (ConsumerRecord<?,String> record, Acknowledgment acknowledgment) {
        try {String key =  String.valueOf(record.key());
            String body = record.value();
            switch (key){}} catch (Exception e){e.printStackTrace();
        } finally {acknowledgment.acknowledge();
        }
    }
}

2、Redis

Redis 是一款开源组件,基于内存的高性能的 key-value 数据结构存储系统,它能够用作数据库、缓存和消息中间件,反对多种类型的数据结构,如字符串、汇合等。在理论利用中,通常用来做变动频率低的热点数据缓存和加锁机制。

KV 数据缓存:作为 Redis 最罕用的性能,即缓存一个指定有效期的键和值,在应用时间接获取;

@Component
public class RedisKvOperate {

    @Resource
    private StringRedisTemplate stringRedisTemplate ;

    /**
     * 创立缓存,必须带缓存时长
     * @param key 缓存 Key
     * @param value 缓存 Value
     * @param expire 单位秒
     * @return boolean
     * @since 2021-08-07 21:12
     */
    public boolean set (String key, String value, long expire) {
        try {stringRedisTemplate.opsForValue().set(key,value,expire, TimeUnit.SECONDS);
        } catch (Exception e){e.printStackTrace();
            return Boolean.FALSE ;
        }
        return Boolean.TRUE ;
    }
}

Lock 加锁机制 :基于spring-integration-redisRedisLockRegistry,实现分布式锁;

@Component
public class RedisLockOperate {

    @Resource
    protected RedisLockRegistry redisLockRegistry;

    /**
     * 尝试一次加锁,采纳默认工夫
     * @param lockKey 加锁 Key
     * @return java.lang.Boolean
     * @since 2021-09-12 13:14
     */
    @SneakyThrows
    public <T> Boolean tryLock(T lockKey) {return redisLockRegistry.obtain(lockKey).tryLock(time, TimeUnit.MILLISECONDS);
    }

    /**
     * 开释锁
     * @param lockKey 解锁 Key
     * @since 2021-09-12 13:32
     */
    public <T> void unlock(T lockKey) {redisLockRegistry.obtain(lockKey).unlock();}

}

3、ElasticSearch

ElasticSearch 是一个基于 Lucene 的搜寻服务器,它提供了一个分布式多用户能力的全文搜索引擎,基于 RESTful web 接口,Elasticsearch 是用 Java 开发的,是以后风行的企业级搜索引擎。

索引治理:索引的创立和删除,构造增加和查问;

基于 ElasticsearchRestTemplate 的模板办法操作;

@Component
public class TemplateOperate {

    @Resource
    private ElasticsearchRestTemplate template ;

    /**
     * 创立索引和构造
     * @param clazz 基于注解类实体
     * @return java.lang.Boolean
     * @since 2021-08-15 19:25
     */
    public <T> Boolean createPut (Class<T> clazz){boolean createIf = template.createIndex(clazz) ;
        if (createIf){return template.putMapping(clazz) ;
        }
        return Boolean.FALSE ;
    }
}

基于 RestHighLevelClient 原生 API 操作;

@Component
public class IndexOperate {

    @Resource
    private RestHighLevelClient client ;

    /**
     * 判断索引是否存在
     * @return boolean
     * @since 2021-08-07 18:57
     */
    public boolean exists (IndexVO entry) {GetIndexRequest getReq = new GetIndexRequest (entry.getIndexName()) ;
        try {return client.indices().exists(getReq, entry.getOptions());
        } catch (Exception e) {e.printStackTrace();
        }
        return Boolean.FALSE ;
    }
}

数据管理:数据新增、主键查问、批改、批量操作,业务性质的搜寻封装复杂度很高;

数据的增删改办法;

@Component
public class DataOperate {

    @Resource
    private RestHighLevelClient client ;

    /**
     * 批量更新数据
     * @param entry 对象主体
     * @since 2021-08-07 18:16
     */
    public void bulkUpdate (DataVO entry){if (CollUtil.isEmpty(entry.getDataList())){return ;}
        // 申请条件
        BulkRequest bulkUpdate = new BulkRequest(entry.getIndexName(),entry.getType()) ;
        bulkUpdate.setRefreshPolicy(entry.getRefresh()) ;
        entry.getDataList().forEach(dataMap -> {UpdateRequest updateReq = new UpdateRequest() ;
            updateReq.id(String.valueOf(dataMap.get("id"))) ;
            updateReq.doc(dataMap) ;
            bulkUpdate.add(updateReq) ;
        });
        try {
            // 执行申请
            client.bulk(bulkUpdate, entry.getOptions());
        } catch (IOException e) {e.printStackTrace();
        }
    }
}

索引主键查问,分组查询方法;

@Component
public class QueryOperate {

    @Resource
    private RestHighLevelClient client ;

    /**
     * 指定字段分组查问
     * @since 2021-10-07 19:00
     */
    public Map<String,Object> groupByField (QueryVO entry){Map<String,Object> groupMap = new HashMap<>() ;
        // 分组 API
        String groupName = entry.getGroupField()+"_group" ;
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.size(0) ;
        TermsAggregationBuilder termAgg = AggregationBuilders.terms(groupName)
                                                             .field(entry.getGroupField()) ;
        sourceBuilder.aggregation(termAgg);
        // 查问 API
        SearchRequest searchRequest = new SearchRequest(entry.getIndexName());
        searchRequest.source(sourceBuilder) ;
        try {
            // 执行 API
            SearchResponse response = client.search(searchRequest, entry.getOptions());
            // 响应后果
            Terms groupTerm = response.getAggregations().get(groupName) ;
            if (CollUtil.isNotEmpty(groupTerm.getBuckets())){for (Terms.Bucket bucket:groupTerm.getBuckets()){groupMap.put(bucket.getKeyAsString(),bucket.getDocCount()) ;
                }
            }
        } catch (IOException e) {e.printStackTrace();
        }
        return groupMap ;
    }
}

4、Logstash

Logstash 是一款开源的数据采集组件,具备实时管道性能。Logstash 可能动静的从多个起源采集数据,进行标准化转换数据,并将数据传输到所抉择的存储容器。

  • Sleuth:治理服务链路,提供外围 TraceId 和 SpanId 生成;
  • ElasticSearch:基于 ES 引擎做日志聚合存储和查问;
  • Logstash:提供日志采集服务,和数据发送 ES 的能力;

logback.xml:服务连贯 Logstash 地址,并加载外围配置;

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml" />

    <springProperty scope="context" name="APP_NAME" source="spring.application.name" defaultValue="butte_app" />
    <springProperty scope="context" name="DES_URI" source="logstash.destination.uri" />
    <springProperty scope="context" name="DES_PORT" source="logstash.destination.port" />

    <!-- 输入到 LogStash 配置,须要启动 LogStash 服务 -->
    <appender name="LogStash"
              class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>${DES_URI:-}:${DES_PORT:-}</destination>
        <encoder
                class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <providers>
                <timestamp>
                    <timeZone>UTC</timeZone>
                </timestamp>
                <pattern>
                    <pattern>
                        {
                        "severity": "%level",
                        "service": "${APP_NAME:-}",
                        "trace": "%X{X-B3-TraceId:-}",
                        "span": "%X{X-B3-SpanId:-}",
                        "exportable": "%X{X-Span-Export:-}",
                        "pid": "${PID:-}",
                        "thread": "%thread",
                        "class": "%logger{40}",
                        "rest": "%message"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
    </appender>
</configuration>

5、Quartz

Quartz 是一个齐全由 java 编写的开源作业调度框架,用来执行各个服务中的定时调度工作,在微服务体系架构下,通常开发一个独立的 Quartz 服务,通过 Feign 接口去触发各个服务的工作执行。

配置参数:定时工作根底信息,数据库表,线程池;

spring:
  quartz:
    job-store-type: jdbc
    properties:
      org:
        quartz:
          scheduler:
            instanceName: ButteScheduler
            instanceId: AUTO
          jobStore:
            class: org.quartz.impl.jdbcjobstore.JobStoreTX
            driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
            tablePrefix: qrtz_
            isClustered: true
            clusterCheckinInterval: 15000
            useProperties: false
          threadPool:
            class: org.quartz.simpl.SimpleThreadPool
            threadPriority: 5
            threadCount: 10
            threadsInheritContextClassLoaderOfInitializingThread: true

6、Swagger

Swagger 是罕用的接口文档治理组件,通过对 API 接口和对象的简略正文,疾速生成接口形容信息,并且提供可视化界面能够疾速对接口发送申请和调试,该组件在前后端联调中,极大的提高效率。

配置根本的包扫描能力即可;

@Configuration
public class SwaggerConfig {

    @Bean
    public Docket createRestApi() {return new Docket(DocumentationType.SWAGGER_2)
                .apiInfo(apiInfo())
                .select()
                .apis(RequestHandlerSelectors.basePackage("com.butte"))
                .paths(PathSelectors.any())
                .build();}
}

拜访:服务: 端口 /swagger-ui.html即可关上接口文档;

六、数据库配置

1、MySQL

微服务架构下,不同的服务对应不同的 MySQL 库,基于业务模块做库的划分是以后罕用的形式,能够对各自业务下的服务做迭代降级,同时能够防止单点故障导致雪崩效应。

2、HikariCP

HikariCP 作为 SpringBoot2 版本举荐和默认采纳的数据库连接池,具备速度极快、轻量简略的特点。

spring:
  datasource:
    type: com.zaxxer.hikari.HikariDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/${data.name.mysql}?${spring.datasource.db-param}
    username: root
    password: 123456
    db-param: useUnicode=true&characterEncoding=UTF8&zeroDateTimeBehavior=convertToNull&useSSL=false
    hikari:
      minimumIdle: 5
      maximumPoolSize: 10
      idleTimeout: 300000
      maxLifetime: 500000
      connectionTimeout: 30000

连接池的配置依据业务的并发需求量,做适当的调优即可。

3、Mybatis

Mybatis 长久层的框架组件,反对定制化 SQL、存储过程以及高级映射,MyBatis-Plus 是一个 MyBatis 的加强工具,在 MyBatis 的根底上只做加强不做扭转,能够简化开发、提高效率。

mybatis-plus:
  mapper-locations: classpath*:/mapper/**/*.xml
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
  • 同系列 - 架构:┃ 分布式 ┃ 消息中间件 ┃ 事务管理 ┃ 高并发 ┃ 缓存治理 ┃
  • 同系列 - 组件:┃ Kafka 音讯 ┃ ElasticSearch 搜寻 ┃ Redis 缓存 ┃ Quartz 工作 ┃ Swagger2 接口 ┃

七、源代码地址

利用仓库:https://gitee.com/cicadasmile/butte-flyer-parent

组件封装:https://gitee.com/cicadasmile/butte-frame-parent

退出移动版