关于java:项目实战有一说一这才是RabbitMQ实现分布式事务的正确姿势附源码

40次阅读

共计 9817 个字符,预计需要花费 25 分钟才能阅读完成。

不懂就问

灵魂拷问之☞导盲犬禁止入内是给犬看的还是盲人看的?
各位巨佬们把答案留在评论区吧

分布式事务

随着互联网疾速倒退,微服务,SOA 等服务架构模式正在被大规模的应用,当初分布式系统个别由多个独立的子系统组成,多个子系统通过网络通信相互合作配合实现各个性能。有很多用例会跨多个子系统能力实现,比拟典型的是电子商务网站的下单领取流程,至多会波及交易系统和领取零碎。而且这个过程中会波及到事务的概念,即保障交易系统和领取零碎的数据一致性,此处咱们称这种跨零碎的事务为分布式事务。具体一点而言,分布式事务是指事务的参与者、反对事务的服务器、资源服务器以及事务管理器别离位于不同的分布式系统的不同节点之上。

解决方案

1. 两阶段提交(2PC)
2. 弥补事务(TCC)
3. 本地音讯表(异步确保)
4.MQ 事务音讯

实现步骤

1. 上游服务像音讯服务发送一条预提交音讯
2. 音讯服务返回对应的曲剧惟一的音讯 ID
3. 上游服务实现本身业务,执行本地逻辑,依据本地事务决定提交或者回滚
4. 音讯服务依据上游服务响应的后果提交或者回滚(删除音讯)
5. 如果上游音讯响应提交则吧音讯发送到 MQ
6. 发送音讯到 MQ 后,须要把 MQ 的 Confirm 机制关上,针对音讯发送的状态进行回调
7. 音讯服务监听 MQ 回调,依据业务逻辑判断是否须要回滚或者提交,走第 4 步
8. 当上游音讯执行某段业务逻辑可能会抛异样或者其余的谬误,会导致音讯始终都是待提交的状态,须要启动一个后盾定时工作轮询音讯表,把所有未提交的音讯进行确定,依据后果提交或者回滚

实战代码

源码

github 源码
csdn 源码

1. 我的项目构造

源码会上传到 github 和 csdn 的资源,能够自行下载,就不提供像 maven 等相干依赖、配置文件相干的代码了,我的项目整体的架构是 Springboot、注册和配置核心 Nacos、Redis 加上 RabbitMQ。须要好哥哥们相熟相干的技术点,后续有工夫一个个来整吧
构造

2.sql 语句
CREATE TABLE `message_record` (`id_` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键 ID',
  `business_id` varchar(64) DEFAULT NULL COMMENT '业务数据 ID',
  `business_type` tinyint(2) DEFAULT NULL COMMENT '业务类型:具体业务',
  `message_id` varchar(64) NOT NULL COMMENT '音讯 ID',
  `retries_number` tinyint(2) DEFAULT '0' COMMENT '重试次数',
  `status_` tinyint(2) DEFAULT '0' COMMENT '后果 1 胜利  0 失败',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创立工夫',
  PRIMARY KEY (`id_`),
  UNIQUE KEY `inx_message_id` (`message_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 COMMENT='rabbit 音讯记录';
3.MQ 配置
import com.xjw.config.constant.RabbitmqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author xiejianwei
 * @ClassName BusinessOrderRabbitMqConfig
 */
@Configuration
public class OrderRabbitMqConfig {

    /**
     * 初始化队列
     *
     * @return
     */
    @Bean
    public Queue orderQueue() {return new Queue(RabbitmqConstant.ORDER_QUEUE, true);
    }

    /**
     * 初始化交换机
     *
     * @return
     */
    @Bean
    public DirectExchange orderExchange() {return new DirectExchange(RabbitmqConstant.ORDER_EXCHANGE, true, false);
    }

    /**
     * 队列通过路由键绑定到交换机
     *
     * @return
     */
    @Bean
    public Binding bind() {return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(RabbitmqConstant.ORDER_ROUTING_KEY);
    }
}
3. 实体类
import lombok.Getter;
import lombok.Setter;

import java.util.Date;
import java.util.UUID;

/**
 * @author xiejianwei
 */
@Getter
@Setter
public class MessageRecord {

    /**
     * 主键 ID
     */
    private Long id;

    /**
     * 业务数据 ID
     */
    private String businessId;
    /**
     * 业务类型
     */
    private int businessType;
    /**
     * 音讯 ID
     */
    private String messageId;
    /**
     * 重试次数
     */
    private int retriesNumber;
    /**
     * 音讯状态 (0. 失败,1 胜利)
     */
    private int status;
    /**
     * 创立工夫
     */
    private Date createTime;

    public MessageRecord() {}

    public MessageRecord(String businessId, int businessType) {
        this.businessId = businessId;
        this.businessType = businessType;
        this.messageId = UUID.randomUUID().toString().replace("-", "").toLowerCase();
        this.retriesNumber = 0;
        this.createTime = new Date();
        this.status = 0;
    }
}
import java.math.BigDecimal;

/**
 * @author xiejianwei
 */
@Getter
@Setter
public class Order extends SerializableDto {

    /**
     * 订单编号
     */
    private String orderId;

    /**
     * 订单金额
     */
    private BigDecimal amount;

    /**
     * 做简略的例子就不关联业务 ID 了
     */
    private String productName;
}
4. 业务实现
import com.xjw.entity.pojo.MessageRecord;
import com.xjw.entity.pojo.Order;
import com.xjw.service.MessageRecordService;
import com.xjw.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;


/**
 * @author xiejianwei
 */
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {


    @Autowired
    public MessageRecordService messageRecordService;

    /**
     * 模仿发动一个简略的订单
     *
     * @param order
     * @return
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean start(Order order) {
        // 触发保留本地音讯表
        MessageRecord messageRecord = new MessageRecord(order.getOrderId(), 1);
        messageRecordService.preCommit(messageRecord);
        log.info("这里能够做本地业务操作");
        log.info("下单中,请稍等 -----");
        log.info("祝贺您,下单胜利,订单号:{}", order.getOrderId());
        // 操作本地事务胜利则 commit 音讯,如果解决本地事务异样,则会有定时工作回调
        messageRecordService.commit(messageRecord.getMessageId(), true);
        return true;
    }
}
import com.alibaba.fastjson.JSON;
import com.xjw.config.constant.RabbitmqConstant;
import com.xjw.entity.pojo.MessageRecord;
import com.xjw.mapper.MessageRecordMapper;
import com.xjw.service.MessageRecordService;
import com.xjw.service.RabbitmqService;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * @author xiejianwei
 */
@Service
public class MessageRecordServiceImpl implements MessageRecordService {

    @Autowired
    public MessageRecordMapper messageRecordMapper;

    @Autowired
    public RabbitmqService rabbitmqService;

    @Override
    public boolean preCommit(MessageRecord messageRecord) {return messageRecordMapper.insert(messageRecord);
    }

    @Override
    public boolean commit(String messageId, boolean commitFlag) {
        /**
         * 不提交则代表回滚
         */
        if (!commitFlag) {messageRecordMapper.delete(messageId);
            return true;
        }
        // 提交音讯到 MQ
        MessageRecord messageRecord = messageRecordMapper.find(messageId);
        /**
         * 发送 MQ 音讯
         * 将惟一音讯 ID 设置给 CorrelationData
         * 回调时能够用这个 ID 查找到数据对应的音讯记录
         */
        rabbitmqService.sendMessage(RabbitmqConstant.ORDER_EXCHANGE, RabbitmqConstant.ORDER_ROUTING_KEY, JSON.toJSONString(messageRecord), new CorrelationData(messageRecord.getMessageId()));
        return true;
    }

    @Override
    public void update(String messageId) {messageRecordMapper.update(messageId);
    }

    @Override
    public MessageRecord find(String messageId) {return messageRecordMapper.find(messageId);
    }

    @Override
    public List<MessageRecord> findAll(int status) {return messageRecordMapper.findAll(status);
    }
}
import com.xjw.callback.RabbitMqConfirmCallback;
import com.xjw.service.RabbitmqService;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @author xiejianwei
 * @ClassName RabbitmqServiceImpl
 * @Description 发送 mq 音讯
 */
@Service
public class RabbitmqServiceImpl implements RabbitmqService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RabbitMqConfirmCallback rabbitMqConfirmCallback;

    /**
     * 发送音讯到 mq(单个)
     *
     * @param exchange   交换机的名称
     * @param routingKey 路由 key 值
     * @param messages   音讯的附件音讯
     */
    @Override
    public void sendMessage(String exchange, String routingKey, String messages, CorrelationData correlationData) {
        /**
         * 设置回调
         */
        rabbitTemplate.setConfirmCallback(rabbitMqConfirmCallback);
        rabbitTemplate.convertAndSend(exchange, routingKey, messages, correlationData);
    }
}
5. 接口治理
import com.xjw.entity.pojo.Order;
import com.xjw.entity.vo.R;
import com.xjw.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.math.BigDecimal;
import java.util.UUID;

/**
 * 订单接口治理
 *
 * @author xiejianwei
 */
@RestController
@RequestMapping("/order")
@Validated
public class OrderController {

    @Autowired
    public OrderService orderService;

    @PostMapping("/start")
    public R page(@RequestBody String productName) {Order order = new Order();
        order.setAmount(BigDecimal.valueOf(5000));
        order.setProductName(productName);
        order.setOrderId(UUID.randomUUID().toString().replace("-", "").toLowerCase());
        orderService.start(order);
        return R.success();}
}
6.mq/ 本地音讯回调
import com.alibaba.fastjson.JSON;
import com.xjw.config.constant.RabbitmqConstant;
import com.xjw.entity.pojo.MessageRecord;
import com.xjw.service.MessageRecordService;
import com.xjw.service.RabbitmqService;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author xiejianwei
 */
@Component
public class RabbitMqConfirmCallback implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private MessageRecordService messageRecordService;

    @Autowired
    public RabbitmqService rabbitmqService;

    /**
     * @param correlationData 相干配置信息
     * @param ack             交换机是否胜利收到音讯
     * @param cause           错误信息
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        /**
         * 这个就是咱们发送音讯设置的 messageId
         */
        String messageId = correlationData.getId();
        // 未发送胜利
        if (!ack) {MessageRecord messageRecord = messageRecordService.find(messageId);
            if (null != messageRecord) {
                // 重发
                rabbitmqService.sendMessage(RabbitmqConstant.ORDER_EXCHANGE, RabbitmqConstant.ORDER_ROUTING_KEY, JSON.toJSONString(messageRecord), new CorrelationData(messageRecord.getMessageId()));
            }
        } else {
            // 批改音讯状态为胜利
            messageRecordService.update(messageId);
        }
    }
}
/**
 * 依据具体的业务,判断是否须要提交或者回滚音讯
 *
 * @author xiejianwei
 */
@Component
public class OrderMessageRecordConfirm implements MessageRecordCallback {

    @Override
    public boolean confirm(MessageRecord messageRecord) {String messageId = messageRecord.getMessageId();
        /**
         * 依据具体的业务,判断是否须要提交或者回滚音讯
         */
        if ("1212321".equals(messageId)) {return true;}
        return false;
    }
}
7. 定时工作
import com.xjw.callback.MessageRecordCallback;
import com.xjw.entity.pojo.MessageRecord;
import com.xjw.service.MessageRecordService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @author xiejianwei
 */
@Component
@EnableScheduling
public class MessageRecordConfirmTask {

    @Autowired
    public MessageRecordService messageRecordService;

    @Autowired
    public MessageRecordCallback messageRecordCallback;

    /**
     * 每隔 5 分钟轮询音讯表
     */
    @Scheduled(cron = "0 0/5 * * * ?")
    public void confirm() {// 查问所有状态等于 0(未提交的状态)
        List<MessageRecord> all = messageRecordService.findAll(0);
        if (null != all && all.size() > 0) {
            all.forEach(messageRecord -> {boolean confirm = messageRecordCallback.confirm(messageRecord);
                // 依据回调后果执行提交或者回滚
                messageRecordService.commit(messageRecord.getMessageId(), confirm);
            });
        }
    }
}

本期到这里啦,写的不对的中央巨佬们多多指导,喜爱的话来一个一键三连吧

正文完
 0