服务幂等性架构设计
- 作者: 博学谷狂野架构师
-
GitHub:GitHub地址 (有我精心筹备的130本电子书PDF)
只分享干货、不吹水,让咱们一起加油!😄
防重表实现幂等
对于避免数据反复提交,还有一种解决方案就是通过防重表实现。
防重表的实现思路也非常简单,首先创立一张表作为防重表,同时在该表中建设一个或多个字段的惟一索引作为防重字段,用于保障并发状况下,数据只有一条。在向业务表中插入数据之前先向防重表插入,如果插入失败则示意是反复数据。
为什么不必乐观锁
对于防重表的解决方案,可能有人会说为什么不应用乐观锁,乐观锁在应用的过程中也是会产生死锁的。
乐观锁是通过锁表的形式实现的,假如当初一个用户A拜访表A(锁住了表A),而后试图拜访表B;
另一个用户B拜访表B(锁住了表B),而后试图拜访表A。 这时对于用户A来说,因为表B曾经被用户B锁住了,所以用户A必须等到用户B开释表B能力拜访。
同时对于用户B来说,因为表A曾经被用户A锁住了,所以用户B必须等到用户A开释表A能力拜访。此时死锁就曾经产生了。
浏览业余类书籍是Java程序员必备的学习形式之一。通过一直学习和积攒,能够一直进步本人的技术能力和职业程度,实现职业倒退的指标。
十分倡议大家重视浏览,并抉择一些有深度、有价值的书籍一直晋升本人的技术水平和能力。这些书籍包含Java编程语言、数据结构和算法、面向对象设计、设计模式、框架原理与利用等等。
对于一位2-3年的Java程序员来说,浏览业余类书籍是更加的重要,因为它们能够帮忙你扩大技术广度和深度,进步你的技术能力和职业程度。以下是我给这些程序员的一些倡议:
学会寻找优良的书籍:在抉择书籍时,要抉择那些被宽泛认可和举荐的经典书籍。能够通过搜寻网上的书籍举荐列表,向其余经验丰富的程序员求教,或者参考公司外部的学习打算,来找到好的书籍。
浏览无关设计模式和架构的书籍:对于Java程序员来说,把握设计模式和架构准则是十分重要的。能够抉择浏览《设计模式:可复用面向对象软件的根底》、《大话设计模式》、《Java程序员修炼之道》等书籍来深刻学习。
学习新的技术和框架:Java技术一直倒退,新的技术和框架也不断涌现。因而,Java程序员应该定期浏览无关新技术和框架的书籍,比方Spring、Spring Boot、MyBatis、Netty等。
学习算法和数据结构:算法和数据结构是编程根底,把握这些常识能够进步代码的品质和效率。能够抉择浏览《算法》、《算法导论》等书籍来学习算法和数据结构。
参考开源我的项目和源代码:浏览开源我的项目和源代码是十分无益的,能够学习到其余程序员的编码技巧和设计思路。能够抉择一些出名的开源我的项目,如Spring、MyBatis等来进行学习。
当然,我也晓得,光是倡议是不足以激发大家学习的能源的,所以,书也我也帮大家整顿好了,把饭喂到嘴里了,我只能帮你到这里了,剩下的就靠你本人了。
以下这份蕴含46本Java程序员必备经典的书单(豆瓣评分8分+),是我破费一个月工夫整顿的:GitHub:GitHub地址
惟一主键实现幂等
数据库惟一主键的实现次要是利用数据库中主键惟一束缚的个性,一般来说惟一主键比拟实用于“插入”时的幂等性,其能保障一张表中只能存在一条带该惟一主键的记录。
应用数据库惟一主键实现幂等性时须要留神的是,该主键一般来说并不是应用数据库中自增主键,而是应用分布式 ID 充当主键,这样能力能保障在分布式环境下 ID 的全局唯一性。
对于一些后盾零碎,并发量并不高的状况下,对于幂等的实现非常简单,通过这种思维即可实现幂等管制。
实用场景
- 插入操作
- 删除操作
应用限度
- 须要生成全局惟一主键 ID;
次要流程
次要流程如下:
- 客户端执行创立申请,调用服务端接口。
- 服务端执行业务逻辑,生成一个分布式
ID
,将该 ID 充当待插入数据的主键,然 后执数据插入操作,运行对应的SQL
语句。 -
服务端将该条数据插入数据库中,如果插入胜利则示意没有反复调用接口。如果抛出主键反复异样,则示意数据库中曾经存在该条记录,返回错误信息到客户端。
在业务执行前,先判断是否曾经操作过,如果没有则执行,否则判断为反复操作。
成果演示
在并发下拜访时,因为是基于id进行判断,那id值就必须要保障在屡次提交时,须要惟一。拜访流程如下:
@Override
@Transactional(rollbackFor = Exception.class)
public String addOrder(Order order) {
order.setCreateTime(new Date());
order.setUpdateTime(new Date());
//查问
Order orderResult = orderMapper.selectByPrimaryKey(order.getId());
Optional<Order> orderOptional = Optional.ofNullable(orderResult);
if (orderOptional.isPresent()){
return "repeat request";
}
int result = orderMapper.insert(order);
if (result != 1){
return "fail";
}
return "success";
}
对于上述性能实现,在并发下,并不能实现幂等性管制。通过jemeter测试,模仿50个并发,能够发现,插入了反复数据。产生了脏数据。
要解决这个问题,非常简单,在数据库层面增加惟一索引即可,将id设置为惟一索引,也是最容易想到的形式,一旦id呈现反复,就会出现异常,防止了脏数据的产生也能够解决永久性幂等。但该计划无奈用于分库分表状况,其只实用于单表状况。
乐观锁实现幂等性
数据库乐观锁计划个别只能实用于执行更新操作的过程,咱们能够提前在对应的数据表中多增加一个字段,充当以后数据的版本标识。
这样每次对该数据库该表的这条数据执行更新时,都会将该版本标识作为一个条件,值为上次待更新数据中的版本标识的值。
实用操作
- 更新操作
应用限度
- 须要数据库对应业务表中增加额定字段
问题抛出
扣减库存数据谬误
通过jemeter进行测试,能够发现。当模仿一万并发时,最终的库存数量是谬误的。这次要是因为当多线程拜访时,一个线程读取到了另外线程未提交的数据造成。
synchronized生效问题
对于当初的问题,暂不思考秒杀设计、队列申请串行化等,只思考如何通过锁进行解决,要通过锁解决的话,那最先想到的可能是synchronized。
依据synchronized定义,当多线程并发拜访时,会对以后加锁的办法产生阻塞,从而保障线程平安,防止脏数据。然而,真的能如预期的一样吗?
@Service
public class StockServiceImpl implements StockService {
@Autowired
private StockMapper stockMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public synchronized int lessInventory(String goodsId, int num) {
return stockMapper.lessInventory(goodsId, num);
}
}
以后曾经在在办法上增加了synchronized,对以后办法对象进行了锁定。 通过Jemeter,模仿一万并发对其进行拜访。能够发现,依然呈现了脏数据。
事务导致锁生效
该问题的产生起因,就在于在办法上synchronized搭配应用了@Transactional。
首先synchronized锁定的是以后办法对象,而@Transactional会对以后办法进行AOP加强,动静代理出一个代理对象,在办法执行前开启事务,执行后提交事务。
所以synchronized和@Transactional其实操作的是两个不同的对象,换句话说就是@Transactional的事务操作并不在synchronized锁定范畴之内。
假如A线程执行完扣减库存办法,会开释锁并提交事务。但A线程开释锁但还没提交事务前,B线程执行扣减库存办法,B线程执行后,和A线程一起提交事务,就呈现了线程平安问题,造成脏数据的呈现。
乐观锁保障幂等
基于版本号实现
MySQL乐观锁是基于数据库实现分布式锁的一种实现,实现的形式有两种:
- 基于版本号
- 基于条件
然而实现思维都是基于MySQL的行锁思维来实现的。
- 批改数据表,增加version字段,默认值为0
- 批改StockMapper增加基于版本批改数据办法
@Update("update tb_stock set amount=amount-#{num},version=version+1 where goods_id=#{goodsId} and version=#{version}")
int lessInventoryByVersion(@Param("goodsId") String goodsId,@Param("num") int num,@Param("version") int version);
- 测试模仿一万并发进行数据批改,此时能够发现以后版本号从0变为1,且库存量正确。
基于条件实现
通过版本号管制是一种十分常见的形式,适宜于大多数场景。
但当初库存扣减的场景来说,通过版本号管制就是多人并发拜访购买时,查问时显示能够购买,但最终只有一个人能胜利,这也是不能够的。其实最终只有商品库存不产生超卖就能够。那此时就能够通过条件来进行管制。
- 批改StockMapper:
@Update("update tb_stock set amount=amount-#{num} where goods_id=#{goodsId} and amount-#{num}>=0")
int lessInventoryByVersionOut(@Param("goodsId") String goodsId,@Param("num") int num);
- 批改StockController:
@PutMapping("/lessInventoryByVersionOut/{goodsId}/{num}")
public String lessInventoryByVersionOut(@PathVariable("goodsId") String goodsId,@PathVariable("num") int num){
int result = stockService.lessInventoryByVersionOut(goodsId, num);
if (result == 1){
System.out.println("购买胜利");
return "success";
}
System.out.println("购买失败");
return "fail";
}
- 通过jemeter进行测试,能够发现当多人并发扣减库存时,管制住了商品超卖的问题。
乐观锁实现幂等性
在零碎中,不光要保障客户端拜访的幂等性,同时还要保障服务间幂等。
比拟常见的状况,当服务间进行调用时,因为网络抖动等起因呈现超时,则很有可能呈现数据谬误。此时在分布式环境下,就须要通过分布式事务或分布式锁来保证数据的一致性。分布式锁的解决方案中MySQL乐观锁就是其中一种实现。
feign超时重试成果演示
以上图为例,当客户端要生成订单时,能够基于token机制保障生成订单的幂等性,接着订单生成胜利后,还会基于feign调用库存服务进行库存扣减,此时则很有可能呈现,库存服务执行扣减库存胜利,然而当后果返回时,呈现网络抖动超时了,那么上游的订单服务则很有可能会发动重试,此时如果不进行扣减库存的幂等性保障的话,则呈现扣减库存执行屡次。
那能够先来演示当上游服务呈现提早,上游服务基于feign进行重试的成果。
- 以后是order调用feign,所以在order中会存在feignConfigure配置类,用于配置超时工夫与重试次数。
/**
* 自定义feign超时工夫、重试次数
* 默认超时为10秒,不会进行重试。
*/
@Configuration
public class FeignConfigure {
//超时工夫,工夫单位毫秒
public static int connectTimeOutMillis = 5000;
public static int readTimeOutMillis = 5000;
@Bean
public Request.Options options() {
return new Request.Options(connectTimeOutMillis, readTimeOutMillis);
}
//自定义重试次数
@Bean
public Retryer feignRetryer(){
Retryer retryer = new Retryer.Default(100, 1000, 4);
return retryer;
}
}
-
stock服务的StockController中demo办法会提早六秒。
通过这种形式模仿超时成果。此时在order中调用stock服务,能够发现,order服务会对stock服务调用四次。
这里就演示了服务间调用超时的成果,当上游服务超时,上游服务会进行重试。
服务调用超时库存屡次扣减
根据上述演示,当上游服务超时,上游服务就会进行重试。
那么联合以后的业务场景,当用户下单胜利去调用库存服务扣减库存时, 如果库存服务执行扣减库存胜利但返回后果超时,则上游订单服务就会重试,再次进行扣减库存,此时就会呈现同一订单商品库存被屡次扣减。
- 在订单服务中生成订单,并调用库存服务扣减库存
@Idemptent
@PostMapping("/genOrder")
public String genOrder(@RequestBody Order order){
String orderId = String.valueOf(idWorker.nextId());
order.setId(orderId);
order.setCreateTime(new Date());
order.setUpdateTime(new Date());
int result = orderService.addOrder(order);
if (result != 1){
System.out.println("fail");
return "fail";
}
//生成订单详情信息
List<String> goodsIdArray = JSON.parseArray(order.getGoodsIds(), String.class);
goodsIdArray.stream().forEach(goodsId->{
//插入订单详情
OrderDetail orderDetail = new OrderDetail();
orderDetail.setId(String.valueOf(idWorker.nextId()));
orderDetail.setGoodsId(goodsId);
orderDetail.setOrderId(orderId);
orderDetail.setGoodsName("heima");
orderDetail.setGoodsNum(1);
orderDetail.setGoodsPrice(1);
orderDetailService.addOrderDetail(orderDetail);
//扣减库存(不思考锁)
stockFeign.reduceStockNoLock(goodsId, orderDetail.getGoodsNum());
});
return "success";
}
- 库存服务间接基于商品信息进行库存扣减
@Update("update tb_stock set amount=amount-#{num} where goods_id=#{goodsId}")
int reduceStockNoLock(@Param("goodsId") String goodsId,@Param("num") Integer num);
@PutMapping("/reduceStockNoLock/{goodsId}/{num}")
public String reduceStockNoLock(@PathVariable("goodsId") String goodsId,
@PathVariable("num") Integer num) throws InterruptedException {
System.out.println("reduce stock");
int result = stockService.reduceStockNoLock(goodsId, num);
if (result != 1){
return "reduce stock fail";
}
//提早
TimeUnit.SECONDS.sleep(6000);
return "reduce stock success";
}
- 执行生成订单扣减库存,此时能够发现扣减库存办法被执行屡次,并且库存数量也被扣减了屡次
{"totalNum":1,"payMoney":1,"goodsIds":"['1271700536000909313']"}
乐观锁解决服务间重试保障幂等
- 批改StockMapper,增加乐观锁管制管制库存
@Update("update tb_stock set version=version+1,amount=amount-#{num} where goods_id=#{goodsId} and version=#{version} and amount-#{num}>=0")
int reduceStock(@Param("goodsId") String goodsId,@Param("num") Integer num,@Param("version") Integer version);
- 批改StockController,增加乐观锁扣减库存办法
/**
* 乐观锁扣减库存
* @param goodsId
* @param num
* @param version
* @return
*/
@PutMapping("/reduceStock/{goodsId}/{num}/{version}")
public int reduceStock(@PathVariable("goodsId") String goodsId,
@PathVariable("num") Integer num,
@PathVariable("version") Integer version) throws InterruptedException {
System.out.println("exec reduce stock");
int result = stockService.reduceStock(goodsId, num, version);
if (result != 1){
//扣减失败
return result;
}
//提早
TimeUnit.SECONDS.sleep(6000);
return result;
}
- 测试,能够发现尽管产生多次重试,然而库存只会被扣减胜利一次。保障了服务间的幂等性。
ps:order服务出现异常,是因为order服务会超时重试四次,但stock服务的提早每一次都是超过超时工夫的,所以最终在order服务才会呈现read timeout异样提醒。
音讯幂等
在零碎中当应用音讯队列时,无论做哪种技术选型,有很多问题是无论如何也不能漠视的,如:音讯必达、音讯幂等等。本章节以典型的RabbitMQ为例,解说如何保障音讯幂等的可施行解决方案,其余MQ选型均可参考。
音讯重试演示
音讯队列的音讯幂等性,次要是由MQ重试机制引起的。
因为音讯生产者将音讯发送到MQ-Server后,MQ-Server会将音讯推送到具体的音讯消费者。假如因为网络抖动或出现异常时,MQ-Server依据重试机制就会将音讯从新向音讯消费者推送,造成音讯消费者屡次收到雷同音讯,造成数据不统一。
在RabbitMQ中,音讯重试机制是默认开启的,但只会在consumer出现异常时,才会反复推送。在应用中,异样的呈现有可能是因为生产方又去调用第三方接口,因为网络抖动而造成异样,然而这个异样有可能是临时的。所以当消费者出现异常,能够让其重试几次,如果重试几次后,依然有异样,则须要进行数据弥补。
数据弥补计划:当重试屡次后依然出现异常,则让此条音讯进入死信队列,最终进入到数据库中,接着设置定时job查问这些数据,进行手动弥补。
本节中以consumer生产异样为演示主体,因而须要批改RabbitMQ配置文件。
批改配置文件
批改consumer一方的配置文件
# 消费者监听相干配置
listener:
simple:
retry:
# 开启消费者(程序出现异常)重试机制,默认开启并始终重试
enabled: true
# 最大重试次数
max-attempts: 5
# 重试间隔时间(毫秒)
initial-interval: 3000
设置生产异样
当consumer音讯监听类中增加异样,最终承受音讯时,能够发现,音讯在接管五次后,最终出现异常。
音讯幂等解决
要保障音讯幂等性的话,其实最终要解决的就是保障屡次操作,造成的影响是雷同的。那么其解决方案的思路与服务间幂等的思路其实根本都是统一的。
- 音讯防重表,解决思路与服务间幂等的防重表统一。
- redis:利用redis防重。
这两种计划是最常见的解决方案。其实现思路其实都是统一的。
代码实现
批改OrderController
/**
* 此处为了不便演示,不做根底增加数据库操作
* @return
*/
@PostMapping("/addOrder")
public String addOrder(){
String uniqueKey = String.valueOf(idWorker.nextId());
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(uniqueKey);
messageProperties.setContentType("text/plain");
messageProperties.setContentEncoding("utf-8");
Message message = new Message("1271700536000909313".getBytes(),messageProperties);
rabbitTemplate.convertAndSend(RabbitMQConfig.REDUCE_STOCK_QUEUE,message);
return "success";
}
批改stockApplication
@Bean
public JedisPool jedisPool(){
return new JedisPool("192.168.200.150",6379);
}
新增音讯监听类
@Component
public class ReduceStockListener {
@Autowired
private StockService stockService;
@Autowired
private JedisPool jedisPool;
@Autowired
private StockFlowService stockFlowService;
@RabbitListener(queues = RabbitMQConfig.REDUCE_STOCK_QUEUE)
@Transactional
public void receiveMessage(Message message){
//获取音讯id
String messageId = message.getMessageProperties().getMessageId();
Jedis jedis = jedisPool.getResource();
System.out.println(messageId);
try {
//redis锁去重校验
if (!"OK".equals(jedis.set(messageId, messageId, "NX", "PX", 300000))){
System.out.println("反复申请");
return;
}
//mysql状态校验
if (!(stockFlowService.findByFlag(messageId).size() == 0)){
System.out.println("数据已解决");
return;
}
String goodsId = null;
try {
//获取音讯体中goodsId
goodsId = new String(message.getBody(),"utf-8");
stockService.reduceStock(goodsId,messageId);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
int nextInt = new Random().nextInt(100);
System.out.println("随机数:"+nextInt);
if (nextInt%2 ==0){
int i= 1/0;
}
} catch (RuntimeException e) {
//解锁
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
jedis.eval(script, Collections.singletonList(messageId), Collections.singletonList(messageId));
System.out.println("出现异常了");
System.out.println(messageId+":开释锁");
throw e;
}
}
}
音讯缓冲区
对于RabbitMQ的应用,默认状况下,每条音讯都会进行别离的ack告诉,生产完一条后,再来生产下一条。然而这样就会造成大量音讯的阻塞状况。所以为了晋升消费者对于音讯的生产速度,能够减少consumer数据或者对音讯进行批量生产。MQ接管到producer发送的音讯后,不会间接推送给consumer。而是积攒到肯定数量后,再进行音讯的发送。 这种形式的实现,能够了解为是一种缓冲区的实现,晋升了音讯的生产速度,然而会在肯定水平上舍弃后果返回的实时性。
对于批量生产来说,也是须要思考幂等的。对于幂等性的解决方案,沿用方才的思路即可解决。
本文由
传智教育博学谷狂野架构师
教研团队公布。如果本文对您有帮忙,欢送
关注
和点赞
;如果您有任何倡议也可留言评论
或私信
,您的反对是我保持创作的能源。转载请注明出处!
发表回复