不懂就问
灵魂拷问之☞导盲犬禁止入内是给犬看的还是盲人看的?
各位巨佬们把答案留在评论区吧
分布式事务
随着互联网疾速倒退,微服务,SOA 等服务架构模式正在被大规模的应用,当初分布式系统个别由多个独立的子系统组成,多个子系统通过网络通信相互合作配合实现各个性能。有很多用例会跨多个子系统能力实现,比拟典型的是电子商务网站的下单领取流程,至多会波及交易系统和领取零碎。而且这个过程中会波及到事务的概念,即保障交易系统和领取零碎的数据一致性,此处咱们称这种跨零碎的事务为分布式事务。具体一点而言,分布式事务是指事务的参与者、反对事务的服务器、资源服务器以及事务管理器别离位于不同的分布式系统的不同节点之上。
解决方案
1.两阶段提交(2PC)
2.弥补事务(TCC)
3.本地音讯表(异步确保)
4.MQ 事务音讯
实现步骤
1.上游服务像音讯服务发送一条预提交音讯
2.音讯服务返回对应的曲剧惟一的音讯ID
3.上游服务实现本身业务,执行本地逻辑,依据本地事务决定提交或者回滚
4.音讯服务依据上游服务响应的后果提交或者回滚(删除音讯)
5.如果上游音讯响应提交则吧音讯发送到MQ
6.发送音讯到MQ后,须要把MQ的Confirm机制关上,针对音讯发送的状态进行回调
7.音讯服务监听MQ回调,依据业务逻辑判断是否须要回滚或者提交,走第4步
8.当上游音讯执行某段业务逻辑可能会抛异样或者其余的谬误,会导致音讯始终都是待提交的状态,须要启动一个后盾定时工作轮询音讯表,把所有未提交的音讯进行确定,依据后果提交或者回滚
实战代码
源码
github源码
csdn源码
1.我的项目构造
源码会上传到github和csdn的资源,能够自行下载,就不提供像maven等相干依赖、配置文件相干的代码了,我的项目整体的架构是Springboot、注册和配置核心Nacos、Redis加上RabbitMQ。须要好哥哥们相熟相干的技术点,后续有工夫一个个来整吧
构造
2.sql语句
CREATE TABLE `message_record` ( `id_` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID', `business_id` varchar(64) DEFAULT NULL COMMENT '业务数据ID', `business_type` tinyint(2) DEFAULT NULL COMMENT '业务类型:具体业务', `message_id` varchar(64) NOT NULL COMMENT '音讯ID', `retries_number` tinyint(2) DEFAULT '0' COMMENT '重试次数', `status_` tinyint(2) DEFAULT '0' COMMENT '后果 1 胜利 0 失败', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创立工夫', PRIMARY KEY (`id_`), UNIQUE KEY `inx_message_id` (`message_id`) USING BTREE) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 COMMENT='rabbit音讯记录';
3.MQ配置
import com.xjw.config.constant.RabbitmqConstant;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * @author xiejianwei * @ClassName BusinessOrderRabbitMqConfig */@Configurationpublic class OrderRabbitMqConfig { /** * 初始化队列 * * @return */ @Bean public Queue orderQueue() { return new Queue(RabbitmqConstant.ORDER_QUEUE, true); } /** * 初始化交换机 * * @return */ @Bean public DirectExchange orderExchange() { return new DirectExchange(RabbitmqConstant.ORDER_EXCHANGE, true, false); } /** * 队列通过路由键绑定到交换机 * * @return */ @Bean public Binding bind() { return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(RabbitmqConstant.ORDER_ROUTING_KEY); }}
3.实体类
import lombok.Getter;import lombok.Setter;import java.util.Date;import java.util.UUID;/** * @author xiejianwei */@Getter@Setterpublic class MessageRecord { /** * 主键ID */ private Long id; /** * 业务数据ID */ private String businessId; /** * 业务类型 */ private int businessType; /** * 音讯ID */ private String messageId; /** * 重试次数 */ private int retriesNumber; /** * 音讯状态 (0.失败,1胜利) */ private int status; /** * 创立工夫 */ private Date createTime; public MessageRecord() { } public MessageRecord(String businessId, int businessType) { this.businessId = businessId; this.businessType = businessType; this.messageId = UUID.randomUUID().toString().replace("-", "").toLowerCase(); this.retriesNumber = 0; this.createTime = new Date(); this.status = 0; }}
import java.math.BigDecimal;/** * @author xiejianwei */@Getter@Setterpublic class Order extends SerializableDto { /** * 订单编号 */ private String orderId; /** * 订单金额 */ private BigDecimal amount; /** * 做简略的例子就不关联业务ID了 */ private String productName;}
4.业务实现
import com.xjw.entity.pojo.MessageRecord;import com.xjw.entity.pojo.Order;import com.xjw.service.MessageRecordService;import com.xjw.service.OrderService;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import org.springframework.transaction.annotation.Transactional;/** * @author xiejianwei */@Service@Slf4jpublic class OrderServiceImpl implements OrderService { @Autowired public MessageRecordService messageRecordService; /** * 模仿发动一个简略的订单 * * @param order * @return */ @Override @Transactional(rollbackFor = Exception.class) public boolean start(Order order) { //触发保留本地音讯表 MessageRecord messageRecord = new MessageRecord(order.getOrderId(), 1); messageRecordService.preCommit(messageRecord); log.info("这里能够做本地业务操作"); log.info("下单中,请稍等-----"); log.info("祝贺您,下单胜利,订单号:{}", order.getOrderId()); // 操作本地事务胜利则commit 音讯,如果解决本地事务异样,则会有定时工作回调 messageRecordService.commit(messageRecord.getMessageId(), true); return true; }}
import com.alibaba.fastjson.JSON;import com.xjw.config.constant.RabbitmqConstant;import com.xjw.entity.pojo.MessageRecord;import com.xjw.mapper.MessageRecordMapper;import com.xjw.service.MessageRecordService;import com.xjw.service.RabbitmqService;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import java.util.List;/** * @author xiejianwei */@Servicepublic class MessageRecordServiceImpl implements MessageRecordService { @Autowired public MessageRecordMapper messageRecordMapper; @Autowired public RabbitmqService rabbitmqService; @Override public boolean preCommit(MessageRecord messageRecord) { return messageRecordMapper.insert(messageRecord); } @Override public boolean commit(String messageId, boolean commitFlag) { /** * 不提交则代表回滚 */ if (!commitFlag) { messageRecordMapper.delete(messageId); return true; } // 提交音讯到MQ MessageRecord messageRecord = messageRecordMapper.find(messageId); /** * 发送MQ音讯 * 将惟一音讯ID设置给CorrelationData * 回调时能够用这个ID查找到数据对应的音讯记录 */ rabbitmqService.sendMessage(RabbitmqConstant.ORDER_EXCHANGE, RabbitmqConstant.ORDER_ROUTING_KEY, JSON.toJSONString(messageRecord), new CorrelationData(messageRecord.getMessageId())); return true; } @Override public void update(String messageId) { messageRecordMapper.update(messageId); } @Override public MessageRecord find(String messageId) { return messageRecordMapper.find(messageId); } @Override public List<MessageRecord> findAll(int status) { return messageRecordMapper.findAll(status); }}
import com.xjw.callback.RabbitMqConfirmCallback;import com.xjw.service.RabbitmqService;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;/** * @author xiejianwei * @ClassName RabbitmqServiceImpl * @Description 发送mq音讯 */@Servicepublic class RabbitmqServiceImpl implements RabbitmqService { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private RabbitMqConfirmCallback rabbitMqConfirmCallback; /** * 发送音讯到mq(单个) * * @param exchange 交换机的名称 * @param routingKey 路由key值 * @param messages 音讯的附件音讯 */ @Override public void sendMessage(String exchange, String routingKey, String messages, CorrelationData correlationData) { /** * 设置回调 */ rabbitTemplate.setConfirmCallback(rabbitMqConfirmCallback); rabbitTemplate.convertAndSend(exchange, routingKey, messages, correlationData); }}
5.接口治理
import com.xjw.entity.pojo.Order;import com.xjw.entity.vo.R;import com.xjw.service.OrderService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.validation.annotation.Validated;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestBody;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.math.BigDecimal;import java.util.UUID;/** * 订单接口治理 * * @author xiejianwei */@RestController@RequestMapping("/order")@Validatedpublic class OrderController { @Autowired public OrderService orderService; @PostMapping("/start") public R page(@RequestBody String productName) { Order order = new Order(); order.setAmount(BigDecimal.valueOf(5000)); order.setProductName(productName); order.setOrderId(UUID.randomUUID().toString().replace("-", "").toLowerCase()); orderService.start(order); return R.success(); }}
6.mq/本地音讯回调
import com.alibaba.fastjson.JSON;import com.xjw.config.constant.RabbitmqConstant;import com.xjw.entity.pojo.MessageRecord;import com.xjw.service.MessageRecordService;import com.xjw.service.RabbitmqService;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;/** * @author xiejianwei */@Componentpublic class RabbitMqConfirmCallback implements RabbitTemplate.ConfirmCallback { @Autowired private MessageRecordService messageRecordService; @Autowired public RabbitmqService rabbitmqService; /** * @param correlationData 相干配置信息 * @param ack 交换机是否胜利收到音讯 * @param cause 错误信息 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { /** * 这个就是咱们发送音讯设置的messageId */ String messageId = correlationData.getId(); // 未发送胜利 if (!ack) { MessageRecord messageRecord = messageRecordService.find(messageId); if (null != messageRecord) { // 重发 rabbitmqService.sendMessage(RabbitmqConstant.ORDER_EXCHANGE, RabbitmqConstant.ORDER_ROUTING_KEY, JSON.toJSONString(messageRecord), new CorrelationData(messageRecord.getMessageId())); } } else { // 批改音讯状态为胜利 messageRecordService.update(messageId); } }}
/** * 依据具体的业务,判断是否须要提交或者回滚音讯 * * @author xiejianwei */@Componentpublic class OrderMessageRecordConfirm implements MessageRecordCallback { @Override public boolean confirm(MessageRecord messageRecord) { String messageId = messageRecord.getMessageId(); /** * 依据具体的业务,判断是否须要提交或者回滚音讯 */ if ("1212321".equals(messageId)) { return true; } return false; }}
7.定时工作
import com.xjw.callback.MessageRecordCallback;import com.xjw.entity.pojo.MessageRecord;import com.xjw.service.MessageRecordService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.annotation.EnableScheduling;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import java.util.List;/** * @author xiejianwei */@Component@EnableSchedulingpublic class MessageRecordConfirmTask { @Autowired public MessageRecordService messageRecordService; @Autowired public MessageRecordCallback messageRecordCallback; /** * 每隔5分钟轮询音讯表 */ @Scheduled(cron = "0 0/5 * * * ?") public void confirm() { // 查问所有状态等于0(未提交的状态) List<MessageRecord> all = messageRecordService.findAll(0); if (null != all && all.size() > 0) { all.forEach(messageRecord -> { boolean confirm = messageRecordCallback.confirm(messageRecord); // 依据回调后果执行提交或者回滚 messageRecordService.commit(messageRecord.getMessageId(), confirm); }); } }}
本期到这里啦,写的不对的中央巨佬们多多指导,喜爱的话来一个一键三连吧