一、背景简介
分布式系统中存在很多拆分的服务,在一直迭代降级的过程中,会呈现如下常见的辣手状况:
某个技术组件版本升级,依赖包降级导致局部语法或者 API 过期,或者组件修复紧急的破绽,从而会导致分布式系统下各个服务被动的降级迭代,很容易引发意外的问题;不同的服务中对组件的依赖和版本各不相同,从而导致不兼容问题的呈现,很难对版本做对立的治理和保护,一旦呈现问题很容易慌手慌脚,引发蝴蝶效应;
所以在简单的零碎中,对于依赖的框架和组件进行对立治理和二次浅封装,能够较大水平升高上述问题的解决老本与危险,同时能够更好的治理和控制技术栈。
二、框架浅封装
1、浅封装作用
为什么浅封装,外围目标在于对立治理和协调组件的依赖与降级,并对罕用办法做一层包装,实际上很多组件应用到的性能点并不多,只是在业务中的应用点很多,这样给组件自身的迭代降级带来了肯定的难度:
例如某个组件罕用的 API 中存在微小危险破绽,或者替换掉过期的用法,须要对整个零碎中波及的中央做降级,这种操作的老本是十分高的;
如果是对这种罕用的组件办法进行二次包装,作为解决业务的工具办法,那么解决下面的问题就绝对轻松许多,只有对封装的工具办法降级,服务的依赖降级即可,升高工夫老本和危险。
通过浅封装的伎俩,能够实现两个方面的解耦:
业务与技术
技术栈中罕用的办法进行二次浅封装,这样能够较大水平的升高业务与技术的耦合,如此能够独立的降级技术栈,扩大性能而不影响业务服务的迭代。
框架与组件
不同的框架与组件都须要肯定水平的自定义配置,同时分模块治理,在不同的服务中引入特定的依赖,也能够在根底包中做对立依赖,以此实现技术栈的疾速组合搭配。
这里说的浅封装,是指包装惯例罕用的语法,组件自身就是技术层面的深度封装,所以也不可能齐全隔开技术栈原生用法。
2、对立版本控制
例如微服务架构下,不同的研发组负责不同的业务模块,然而受到开发人员的教训和能力影响,很容易呈现不同的服务组件选型不统一,或者雷同的组件依赖版本不同,这样很难对系统架构做规范的对立治理。
对于二次封装的形式,能够严格的控制技术栈的迭代扩大,以及版本抵触的问题,通过对二次封装层的对立降级,能够疾速实现业务服务的降级,解决不同服务的依赖差别问题。
三、实际案例
1、案例简介
Java 分布式系统中,微服务根底组件(Nacos、Feign、Gateway、Seata)等,零碎中间件(Quartz、Redis、Kafka、ElasticSearch,Logstash)等,对罕用性能、配置、API 等,进行二次浅封装并对立集成治理,以满足日常开发中根底环境搭建与长期工具的疾速实现。
- butte-flyer 组件封装的利用案例;
- butte-frame 罕用技术组件二次封装;
2、分层架构
整体划分五层:网关层、应用层、业务层、中间件层、根底层,组合成一套分布式系统。
服务总览
服务名 | 分层 | 端口 | 缓存库 | 数据库 | 形容 |
---|---|---|---|---|---|
flyer-gateway | 网关层 | 8010 | db1 | nacos | 路由管制 |
flyer-facade | 应用层 | 8082 | db2 | facade | 门面服务 |
flyer-admin | 应用层 | 8083 | db3 | admin | 后端治理 |
flyer-account | 业务层 | 8084 | db4 | account | 账户治理 |
flyer-quartz | 业务层 | 8085 | db5 | quartz | 定时工作 |
kafka | 中间件 | 9092 | — | —— | 音讯队列 |
elasticsearch | 中间件 | 9200 | — | —— | 搜索引擎 |
redis | 中间件 | 6379 | — | —— | 缓存核心 |
logstash | 中间件 | 5044 | — | es6.8.6 | 日志采集 |
nacos | 根底层 | 8848 | — | nacos | 注册配置 |
seata | 根底层 | 8091 | — | seata | 散布事务 |
mysql | 根底层 | 3306 | — | —— | 数据存储 |
3、目录构造
在 butte-frame
中对各个技术栈进行二次封装治理,在 butte-flyer
中进行依赖援用。
butte-frame
├── frame-base 根底代码块
├── frame-jdbc 数据库组件
├── frame-core 服务根底依赖
├── frame-gateway 路由网关
├── frame-nacos 注册与配置核心
├── frame-seata 分布式事务
├── frame-feign 服务间调用
├── frame-security 平安治理
├── frame-search 搜索引擎
├── frame-redis 缓存治理
├── frame-kafka 消息中间件
├── frame-quartz 定时工作
├── frame-swagger 接口文档
└── frame-sleuth 链路日志
butte-flyer
├── flyer-gateway 网关服务:路由管制
├── flyer-facade 门面服务:性能合作接口
├── flyer-account 账户服务:用户账户
├── flyer-quartz 工作服务:定时工作
└── flyer-admin 治理服务:后端治理
4、技术栈组件
零碎罕用的技术栈:根底框架、微服务组件、缓存、平安治理、数据库、定时工作、工具依赖等。
名称 | 版本 | 阐明 |
---|---|---|
spring-cloud | 2.2.5.RELEASE | 微服务框架根底 |
spring-boot | 2.2.5.RELEASE | 服务根底依赖 |
gateway | 2.2.5.RELEASE | 路由网关 |
nacos | 2.2.5.RELEASE | 注册核心与配置管理 |
seata | 2.2.5.RELEASE | 分布式事务管理 |
feign | 2.2.5.RELEASE | 微服务间申请调用 |
security | 2.2.5.RELEASE | 平安治理 |
sleuth | 2.2.5.RELEASE | 申请轨迹链路 |
security-jwt | 1.0.10.RELEASE | JWT 加密组件 |
hikari | 3.4.2 | 数据库连接池,默认 |
mybatis-plus | 3.4.2 | ORM 长久层框架 |
kafka | 2.0.1 | MQ 音讯队列 |
elasticsearch | 6.8.6 | 搜索引擎 |
logstash | 5.2 | 日志采集 |
redis | 2.2.5.RELEASE | 缓存治理与加锁管制 |
quartz | 2.3.2 | 定时工作治理 |
swagger | 2.6.1 | 接口文档 |
apache-common | 2.7.0 | 根底依赖包 |
hutool | 5.3.1 | 根底工具包 |
四、微服务组件
1、Nacos
Nacos 在整个组件体系中,提供两个外围能力,注册发现:适配微服务注册与发现规范,疾速实现动静服务注册发现、元数据管理等,提供微服务组件中最根底的能力;配置核心:对立治理各个服务配置,集中在 Nacos 中存储管理,隔离多环境的不同配置,并且能够躲避线上配置放开的危险;
连贯治理
spring:
cloud:
nacos:
# 配置读取
config:
prefix: application
server-addr: 127.0.0.1:8848
file-extension: yml
refresh-enabled: true
# 注册核心
discovery:
server-addr: 127.0.0.1:8848
配置管理
- bootstrap.yml:服务中文件,连贯和读取 Nacos 中配置信息;
- application.yml:公共根底配置,这里配置 mybatis 组件;
- application-dev.yml:中间件连贯配置,用作环境标识隔离;
- application-def.yml:各个服务的自定义配置,参数加载;
2、Gateway
Gateway 网关外围能力,提供对立的 API 路由治理,作为微服务架构体系下申请惟一入口,还能够在网关层解决所有的非业务性能,例如:安全控制,流量监控限流,等等。
路由管制:各个服务的发现和路由;
@Component
public class RouteFactory implements RouteDefinitionRepository {
@Resource
private RouteService routeService ;
/**
* 加载全副路由
* @since 2021-11-14 18:08
*/
@Override
public Flux<RouteDefinition> getRouteDefinitions() {return Flux.fromIterable(routeService.getRouteDefinitions());
}
/**
* 增加路由
* @since 2021-11-14 18:08
*/
@Override
public Mono<Void> save(Mono<RouteDefinition> routeMono) {
return routeMono.flatMap(routeDefinition -> {routeService.saveRouter(routeDefinition);
return Mono.empty();});
}
}
全局过滤:作为网关的根底能力;
@Component
public class GatewayFilter implements GlobalFilter {private static final Logger logger = LoggerFactory.getLogger(GatewayFilter.class);
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {ServerHttpRequest request = exchange.getRequest();
String uri = request.getURI().getPath() ;
String host = String.valueOf(request.getHeaders().getHost()) ;
logger.info("request host : {} , uri : {}",host,uri);
return chain.filter(exchange);
}
}
3、Feign
Feign 组件是申明式的 WebService 客户端,使微服务之间的调用变得更简略,Feign 通过注解伎俩,将申请进行模板化和接口化治理,能够更加规范的治理各个服务间的通信交互。
响应解码:定义 Feign 接口响应时解码逻辑,校验和管制对立的接口格调;
public class FeignDecode extends ResponseEntityDecoder {public FeignDecode(Decoder decoder) {super(decoder);
}
@Override
public Object decode(Response response, Type type) {if (!type.getTypeName().startsWith(Rep.class.getName())) {throw new RuntimeException("响应格局异样");
}
try {return super.decode(response, type);
} catch (IOException e) {e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
}
}
4、Seata
Seata 组件是开源的分布式事务解决方案,致力于提供高性能和简略易用的分布式事务服务,实现 AT、TCC、SAGA、XA 事务模式,反对一站式的分布式解决方案。
事务配置:基于 nacos 治理 Seata 组件的参数定义;
服务注册:在须要治理分布式事务的服务中连贯和应用 Seata 服务;
seata:
enabled: true
application-id: ${spring.application.name}
tx-service-group: butte-seata-group
config:
type: nacos
nacos:
server-addr: ${spring.cloud.nacos.config.server-addr}
group: DEFAULT_GROUP
registry:
type: nacos
nacos:
server-addr: ${spring.cloud.nacos.config.server-addr}
application: seata-server
group: DEFAULT_GROUP
五、中间件集成
1、Kafka
Kafka 是由 Apache 开源,具备分布式、分区的、多正本的、多订阅者,基于 Zookeeper 协调的分布式音讯解决平台,由 Scala 和 Java 语言编写。还罕用于收集用户在应用服务中产生的日志数据。
音讯发送:封装音讯发送的根底能力;
@Component
public class KafkaSendOperate {
@Resource
private KafkaTemplate<String, String> kafkaTemplate ;
public void send (SendMsgVO entry) {kafkaTemplate.send(entry.getTopic(),entry.getKey(),entry.getMsgBody()) ;
}
}
音讯生产:生产监听时有两种策略;
- 音讯生产方本人生产,通过 Feign 接口去执行具体生产服务的逻辑,这样有利于流程跟踪排查;
- 音讯生产方间接监听,缩小音讯解决的流程节点,当然也能够打造对立的 MQ 总线服务(文尾);
public class KafkaListen {private static final Logger logger = LoggerFactory.getLogger(KafkaListen.class);
/**
* Kafka 音讯监听
* @since 2021-11-06 16:47
*/
@KafkaListener(topics = KafkaTopic.USER_TOPIC)
public void listenUser (ConsumerRecord<?,String> record, Acknowledgment acknowledgment) {
try {String key = String.valueOf(record.key());
String body = record.value();
switch (key){}} catch (Exception e){e.printStackTrace();
} finally {acknowledgment.acknowledge();
}
}
}
2、Redis
Redis 是一款开源组件,基于内存的高性能的 key-value 数据结构存储系统,它能够用作数据库、缓存和消息中间件,反对多种类型的数据结构,如字符串、汇合等。在理论利用中,通常用来做变动频率低的热点数据缓存和加锁机制。
KV 数据缓存:作为 Redis 最罕用的性能,即缓存一个指定有效期的键和值,在应用时间接获取;
@Component
public class RedisKvOperate {
@Resource
private StringRedisTemplate stringRedisTemplate ;
/**
* 创立缓存,必须带缓存时长
* @param key 缓存 Key
* @param value 缓存 Value
* @param expire 单位秒
* @return boolean
* @since 2021-08-07 21:12
*/
public boolean set (String key, String value, long expire) {
try {stringRedisTemplate.opsForValue().set(key,value,expire, TimeUnit.SECONDS);
} catch (Exception e){e.printStackTrace();
return Boolean.FALSE ;
}
return Boolean.TRUE ;
}
}
Lock 加锁机制 :基于spring-integration-redis
中RedisLockRegistry
,实现分布式锁;
@Component
public class RedisLockOperate {
@Resource
protected RedisLockRegistry redisLockRegistry;
/**
* 尝试一次加锁,采纳默认工夫
* @param lockKey 加锁 Key
* @return java.lang.Boolean
* @since 2021-09-12 13:14
*/
@SneakyThrows
public <T> Boolean tryLock(T lockKey) {return redisLockRegistry.obtain(lockKey).tryLock(time, TimeUnit.MILLISECONDS);
}
/**
* 开释锁
* @param lockKey 解锁 Key
* @since 2021-09-12 13:32
*/
public <T> void unlock(T lockKey) {redisLockRegistry.obtain(lockKey).unlock();}
}
3、ElasticSearch
ElasticSearch 是一个基于 Lucene 的搜寻服务器,它提供了一个分布式多用户能力的全文搜索引擎,基于 RESTful web 接口,Elasticsearch 是用 Java 开发的,是以后风行的企业级搜索引擎。
索引治理:索引的创立和删除,构造增加和查问;
基于 ElasticsearchRestTemplate
的模板办法操作;
@Component
public class TemplateOperate {
@Resource
private ElasticsearchRestTemplate template ;
/**
* 创立索引和构造
* @param clazz 基于注解类实体
* @return java.lang.Boolean
* @since 2021-08-15 19:25
*/
public <T> Boolean createPut (Class<T> clazz){boolean createIf = template.createIndex(clazz) ;
if (createIf){return template.putMapping(clazz) ;
}
return Boolean.FALSE ;
}
}
基于 RestHighLevelClient
原生 API 操作;
@Component
public class IndexOperate {
@Resource
private RestHighLevelClient client ;
/**
* 判断索引是否存在
* @return boolean
* @since 2021-08-07 18:57
*/
public boolean exists (IndexVO entry) {GetIndexRequest getReq = new GetIndexRequest (entry.getIndexName()) ;
try {return client.indices().exists(getReq, entry.getOptions());
} catch (Exception e) {e.printStackTrace();
}
return Boolean.FALSE ;
}
}
数据管理:数据新增、主键查问、批改、批量操作,业务性质的搜寻封装复杂度很高;
数据的增删改办法;
@Component
public class DataOperate {
@Resource
private RestHighLevelClient client ;
/**
* 批量更新数据
* @param entry 对象主体
* @since 2021-08-07 18:16
*/
public void bulkUpdate (DataVO entry){if (CollUtil.isEmpty(entry.getDataList())){return ;}
// 申请条件
BulkRequest bulkUpdate = new BulkRequest(entry.getIndexName(),entry.getType()) ;
bulkUpdate.setRefreshPolicy(entry.getRefresh()) ;
entry.getDataList().forEach(dataMap -> {UpdateRequest updateReq = new UpdateRequest() ;
updateReq.id(String.valueOf(dataMap.get("id"))) ;
updateReq.doc(dataMap) ;
bulkUpdate.add(updateReq) ;
});
try {
// 执行申请
client.bulk(bulkUpdate, entry.getOptions());
} catch (IOException e) {e.printStackTrace();
}
}
}
索引主键查问,分组查询方法;
@Component
public class QueryOperate {
@Resource
private RestHighLevelClient client ;
/**
* 指定字段分组查问
* @since 2021-10-07 19:00
*/
public Map<String,Object> groupByField (QueryVO entry){Map<String,Object> groupMap = new HashMap<>() ;
// 分组 API
String groupName = entry.getGroupField()+"_group" ;
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.size(0) ;
TermsAggregationBuilder termAgg = AggregationBuilders.terms(groupName)
.field(entry.getGroupField()) ;
sourceBuilder.aggregation(termAgg);
// 查问 API
SearchRequest searchRequest = new SearchRequest(entry.getIndexName());
searchRequest.source(sourceBuilder) ;
try {
// 执行 API
SearchResponse response = client.search(searchRequest, entry.getOptions());
// 响应后果
Terms groupTerm = response.getAggregations().get(groupName) ;
if (CollUtil.isNotEmpty(groupTerm.getBuckets())){for (Terms.Bucket bucket:groupTerm.getBuckets()){groupMap.put(bucket.getKeyAsString(),bucket.getDocCount()) ;
}
}
} catch (IOException e) {e.printStackTrace();
}
return groupMap ;
}
}
4、Logstash
Logstash 是一款开源的数据采集组件,具备实时管道性能。Logstash 可能动静的从多个起源采集数据,进行标准化转换数据,并将数据传输到所抉择的存储容器。
- Sleuth:治理服务链路,提供外围 TraceId 和 SpanId 生成;
- ElasticSearch:基于 ES 引擎做日志聚合存储和查问;
- Logstash:提供日志采集服务,和数据发送 ES 的能力;
logback.xml:服务连贯 Logstash 地址,并加载外围配置;
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include resource="org/springframework/boot/logging/logback/defaults.xml" />
<springProperty scope="context" name="APP_NAME" source="spring.application.name" defaultValue="butte_app" />
<springProperty scope="context" name="DES_URI" source="logstash.destination.uri" />
<springProperty scope="context" name="DES_PORT" source="logstash.destination.port" />
<!-- 输入到 LogStash 配置,须要启动 LogStash 服务 -->
<appender name="LogStash"
class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>${DES_URI:-}:${DES_PORT:-}</destination>
<encoder
class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<timestamp>
<timeZone>UTC</timeZone>
</timestamp>
<pattern>
<pattern>
{
"severity": "%level",
"service": "${APP_NAME:-}",
"trace": "%X{X-B3-TraceId:-}",
"span": "%X{X-B3-SpanId:-}",
"exportable": "%X{X-Span-Export:-}",
"pid": "${PID:-}",
"thread": "%thread",
"class": "%logger{40}",
"rest": "%message"
}
</pattern>
</pattern>
</providers>
</encoder>
</appender>
</configuration>
5、Quartz
Quartz 是一个齐全由 java 编写的开源作业调度框架,用来执行各个服务中的定时调度工作,在微服务体系架构下,通常开发一个独立的 Quartz 服务,通过 Feign 接口去触发各个服务的工作执行。
配置参数:定时工作根底信息,数据库表,线程池;
spring:
quartz:
job-store-type: jdbc
properties:
org:
quartz:
scheduler:
instanceName: ButteScheduler
instanceId: AUTO
jobStore:
class: org.quartz.impl.jdbcjobstore.JobStoreTX
driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
tablePrefix: qrtz_
isClustered: true
clusterCheckinInterval: 15000
useProperties: false
threadPool:
class: org.quartz.simpl.SimpleThreadPool
threadPriority: 5
threadCount: 10
threadsInheritContextClassLoaderOfInitializingThread: true
6、Swagger
Swagger 是罕用的接口文档治理组件,通过对 API 接口和对象的简略正文,疾速生成接口形容信息,并且提供可视化界面能够疾速对接口发送申请和调试,该组件在前后端联调中,极大的提高效率。
配置根本的包扫描能力即可;
@Configuration
public class SwaggerConfig {
@Bean
public Docket createRestApi() {return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
.select()
.apis(RequestHandlerSelectors.basePackage("com.butte"))
.paths(PathSelectors.any())
.build();}
}
拜访:服务: 端口 /swagger-ui.html
即可关上接口文档;
六、数据库配置
1、MySQL
微服务架构下,不同的服务对应不同的 MySQL 库,基于业务模块做库的划分是以后罕用的形式,能够对各自业务下的服务做迭代降级,同时能够防止单点故障导致雪崩效应。
2、HikariCP
HikariCP 作为 SpringBoot2 版本举荐和默认采纳的数据库连接池,具备速度极快、轻量简略的特点。
spring:
datasource:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/${data.name.mysql}?${spring.datasource.db-param}
username: root
password: 123456
db-param: useUnicode=true&characterEncoding=UTF8&zeroDateTimeBehavior=convertToNull&useSSL=false
hikari:
minimumIdle: 5
maximumPoolSize: 10
idleTimeout: 300000
maxLifetime: 500000
connectionTimeout: 30000
连接池的配置依据业务的并发需求量,做适当的调优即可。
3、Mybatis
Mybatis 长久层的框架组件,反对定制化 SQL、存储过程以及高级映射,MyBatis-Plus 是一个 MyBatis 的加强工具,在 MyBatis 的根底上只做加强不做扭转,能够简化开发、提高效率。
mybatis-plus:
mapper-locations: classpath*:/mapper/**/*.xml
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
- 同系列 - 架构:┃ 分布式 ┃ 消息中间件 ┃ 事务管理 ┃ 高并发 ┃ 缓存治理 ┃
- 同系列 - 组件:┃ Kafka 音讯 ┃ ElasticSearch 搜寻 ┃ Redis 缓存 ┃ Quartz 工作 ┃ Swagger2 接口 ┃
七、源代码地址
利用仓库:https://gitee.com/cicadasmile/butte-flyer-parent
组件封装:https://gitee.com/cicadasmile/butte-frame-parent