paascloud开源项目学习(1) — 用户邮箱注册可靠消息服务流程

25次阅读

共计 19960 个字符,预计需要花费 50 分钟才能阅读完成。

用户注册

github 开源项目 –paascloud-master:https://github.com/paascloud/…

分布式解决方案 – 基于可靠消息的最终一致性:https://github.com/paascloud/…

本篇文章目的是理解该项目可靠消息服务中心 (TCP) 发送消息、消费消息的流程,用户注册发送激活邮箱和激活后发送注册成功邮箱都是利用可靠消息服务来解决分布式事务,理解了该流程也就弄懂了该项目中其他业务流程。
发送激活邮箱过程

消息生产端:UAC

可靠消息服务:TPC

消息服务端:OPC

用户注册后,向注册邮箱发送一封激活邮箱。
消息生产端(UAC)
大致流程为:

本地服务 UAC 先持久化 预发送消息(等待确认消息),表 pc_mq_message_data;
调用远端可靠消息服务 TPC 持久化预发送消息,可靠消息表 pc_tpc_mq_message;
执行本地事务即 保存用户信息;
调用远端可靠消息服务 TPC 更新第 2 步中的等待确认状态为发送中 sending;
同时创建消费待确认列表,即持久化该 Topic 类型的消息被哪些消费者订阅监听的所有消费待确认列表,状态为未确认,表 pc_tpc_mq_confirm;
完成上面操作后,发送消息到 RocketMQ。

controller 层
AuthRestController.java
@PostMapping(value = “/register”)
@ApiOperation(httpMethod = “POST”, value = “ 注册用户 ”)
public Wrapper registerUser(UserRegisterDto user) {
uacUserService.register(user);
return WrapMapper.ok();
}
service 层

用户 ID 生成:雪花算法生成分布式唯一 ID

用户密码加密:SpringSecurity BCryptPasswordEncoder 强哈希方法加密,每次加密的结果都不一样。

bcrypt 可以有效抵御彩虹表暴力破解,其原理就是在加盐的基础上多次 hash,关于密码参考:https://mp.weixin.qq.com/s/Dk…

Redis 存储激活邮箱 token:key(active_token):email: 过期时间 1 天,即激活接口参数:activeUserToken;
生成邮件发送模板(freeMarker):activeUserTemplate.ftl

根据上面模板和发送邮件参数生成实体:MqMessageData(pc_mq_message_data)

各个子系统消息落地的消息表,比如用户服务系统主要就是邮件消息、短信消息等。
@Override
public void register(UserRegisterDto registerDto) {
// 校验注册信息
validateRegisterInfo(registerDto);
String mobileNo = registerDto.getMobileNo();
String email = registerDto.getEmail();
Date row = new Date();
String salt = String.valueOf(generateId());
// 封装注册信息
long id = generateId(); // id 雪花算法生成
UacUser uacUser = new UacUser();
uacUser.setLoginName(registerDto.getLoginName());
uacUser.setSalt(salt);
uacUser.setLoginPwd(Md5Util.encrypt(registerDto.getLoginPwd()));
uacUser.setMobileNo(mobileNo);
uacUser.setStatus(UacUserStatusEnum.DISABLE.getKey());
uacUser.setUserSource(UacUserSourceEnum.REGISTER.getKey());
uacUser.setCreatedTime(row);
uacUser.setUpdateTime(row);
uacUser.setEmail(email);
uacUser.setId(id);
uacUser.setCreatorId(id);
uacUser.setCreator(registerDto.getLoginName());
uacUser.setLastOperatorId(id);
uacUser.setUserName(registerDto.getLoginName());
uacUser.setLastOperator(registerDto.getLoginName());

// 发送激活邮件
String activeToken = PubUtils.uuid() + super.generateId();
redisService.setKey(RedisKeyUtil.getActiveUserKey(activeToken), email, 1, TimeUnit.DAYS);

Map<String, Object> param = Maps.newHashMap();
param.put(“loginName”, registerDto.getLoginName());
param.put(“email”, registerDto.getEmail());
param.put(“activeUserUrl”, activeUserUrl + activeToken);
param.put(“dateTime”, DateUtil.formatDateTime(new Date()));

Set<String> to = Sets.newHashSet();
to.add(registerDto.getEmail());

MqMessageData mqMessageData = emailProducer.sendEmailMq(to, UacEmailTemplateEnum.ACTIVE_USER, AliyunMqTopicConstants.MqTagEnum.ACTIVE_USER, param);
// 即下面的第 6 步
userManager.register(mqMessageData, uacUser);
}

userManager.register() 通过注解 @MqProducerStore 发送消息服务。
执行该方法前,先进入切面编程
@MqProducerStore
public void register(final MqMessageData mqMessageData, final UacUser uacUser) {
log.info(“ 注册用户. mqMessageData={}, user={}”, mqMessageData, uacUser);
uacUserMapper.insertSelective(uacUser);
}
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface MqProducerStore {
// WAIT_CONFIRM:等待确认;SAVE_AND_SEND:直接发送;
MqSendTypeEnum sendType() default MqSendTypeEnum.WAIT_CONFIRM;
// ORDER(1):有序;DIS_ORDER(0):无序
MqOrderTypeEnum orderType() default MqOrderTypeEnum.ORDER;
// Rocketmq 默认延时级别
// ZERO(0, 不延时);ONE(1, 1 秒)….EIGHTEEN(18, 2 小时)
DelayLevelEnum delayLevel() default DelayLevelEnum.ZERO;
}

切面中,因为邮件激活发送消息类型为默认的:等待确认。

此处本地服务 UAC 消息落地:保存待确认消息 MqMessageData 到 mysql,表 pc_mq_message_data;
发送待确认消息到可靠消息系统(TPC):发送预发送状态的消息给消息中心

// 切面
MqMessageData domain = null;
for (Object object : args) {
if (object instanceof MqMessageData) {
domain = (MqMessageData) object;
break;
}
}
domain.setOrderType(orderType);
domain.setProducerGroup(producerGroup);
// 1. 等待确认
if (type == MqSendTypeEnum.WAIT_CONFIRM) {
if (delayLevelEnum != DelayLevelEnum.ZERO) {
domain.setDelayLevel(delayLevelEnum.delayLevel());
}
// 1.1 发送待确认消息到可靠消息系统
// 本地服务消息落地,可靠消息服务中心也持久化预发送消息,但是不发送
mqMessageService.saveWaitConfirmMessage(domain);
}
result = joinPoint.proceed(); // 返回注解方法,执行业务
@Override
@Transactional(rollbackFor = Exception.class)
public void saveWaitConfirmMessage(final MqMessageData mqMessageData) {
// 1. 持久化到本地 mysql
this.saveMqProducerMessage(mqMessageData);
// 2. 发送预发送状态的消息给消息中心
TpcMqMessageDto tpcMqMessageDto = mqMessageData.getTpcMqMessageDto();
// 3. 调用远端可靠消息服务(tpc),持久化等待确认消息
tpcMqMessageFeignApi.saveMessageWaitingConfirm(tpcMqMessageDto);
// 4. mqMessageData 此时为调用远端服务返回来的数据
log.info(“<== saveWaitConfirmMessage – 存储预发送消息成功. messageKey={}”, mqMessageData.getMessageKey());
}
紧接着第 7 步,调用远端可靠消息服务(TCP),此时只是持久化预发送消息,但是没有发送(等执行完本地事务即保存用户后在发送,即第 9 步)

持久化 TpcMqMessage,即 pc_tpc_mq_message(可靠消息表)
@Override
public void saveMessageWaitingConfirm(TpcMqMessageDto messageDto) {

if (StringUtils.isEmpty(messageDto.getMessageTopic())) {
throw new TpcBizException(ErrorCodeEnum.TPC10050001);
}

Date now = new Date();
TpcMqMessage message = new ModelMapper().map(messageDto, TpcMqMessage.class);
// 消息状态:WAIT_SEND(10, “ 未发送 ”);SENDING(20, “ 已发送 ”);FINISH(30, “ 已完成 ”);
message.setMessageStatus(MqSendStatusEnum.WAIT_SEND.sendStatus());
message.setUpdateTime(now);
message.setCreatedTime(now);
tpcMqMessageMapper.insertSelective(message);
}
上面执行完后,返回注解 @MqProducerStore 所在方法,执行本地事务:保存用户到 mysql。
result = joinPoint.proceed(); // 返回注解方法,执行业务
第 9 步执行完后,再次进入切面,发送确认消息给可靠消息服务中心
result = joinPoint.proceed(); // 返回注解方法,执行业务
// 2. 直接发送
if (type == MqSendTypeEnum.SAVE_AND_SEND) {
mqMessageService.saveAndSendMessage(domain);
// 3. XXX
} else if (type == MqSendTypeEnum.DIRECT_SEND) {
mqMessageService.directSendMessage(domain);
} else {// type = WAIT_CONFIRM
final MqMessageData finalDomain = domain;
taskExecutor.execute(() -> mqMessageService.confirmAndSendMessage(finalDomain.getMessageKey()));
}
return result;
紧接着上面,可靠消息服务中心(TCP):根据传过来的 messageKey 确认并发送之前已经持久化的预发送消息。
// TpcMqMessageFeignClient.java
@Override
@ApiOperation(httpMethod = “POST”, value = “ 确认并发送消息 ”)
public Wrapper confirmAndSendMessage(@RequestParam(“messageKey”) String messageKey) {
logger.info(“ 确认并发送消息. messageKey={}”, messageKey);
tpcMqMessageService.confirmAndSendMessage(messageKey);
return WrapMapper.ok();
}

// TpcMqMessageServiceImpl.java
@Override
public void confirmAndSendMessage(String messageKey) {
final TpcMqMessage message = tpcMqMessageMapper.getByMessageKey(messageKey);
if (message == null) {
throw new TpcBizException(ErrorCodeEnum.TPC10050002);
}

TpcMqMessage update = new TpcMqMessage();
update.setMessageStatus(MqSendStatusEnum.SENDING.sendStatus());
update.setId(message.getId());
update.setUpdateTime(new Date());
// 1. 更新消息状态为:SENDING
tpcMqMessageMapper.updateByPrimaryKeySelective(update);
// 2. 创建消费待确认列表(此处 topic:SEND_EMAIL_TOPIC)
this.createMqConfirmListByTopic(message.getMessageTopic(), message.getId(), message.getMessageKey());
// 3. 直接发送消息
this.directSendMessage(message.getMessageBody(), message.getMessageTopic(),
message.getMessageTag(), message.getMessageKey(), message.getProducerGroup(), message.getDelayLevel());
}
第 11 步中的第 2 点:TCP 服务中,创建消费待确认列表,根据表 pc_tpc_mq_subscribe,查询出不同 topic 下相对应的所有 consumer_code(消费监听者),即设置该消息被哪些服务(CID)监听消费;

– “SEND_EMAIL_TOPIC“ –> “CID_OPC“:该消息会被 `consumerGroup` 为 `CID_OPC` 的服务监听并消费。
– 同时,保存 ** 确认消息 **:“TpcMqConfirm“ –> 表 “pc_tpc_mq_confirm“
@Override
public void createMqConfirmListByTopic(final String topic, final Long messageId, final String messageKey) {
List<TpcMqConfirm> list = Lists.newArrayList();
TpcMqConfirm tpcMqConfirm;
List<String> consumerGroupList = tpcMqConsumerService.listConsumerGroupByTopic(topic);
if (PublicUtil.isEmpty(consumerGroupList)) {
throw new TpcBizException(ErrorCodeEnum.TPC100500010, topic);
}
for (final String cid : consumerGroupList) {
tpcMqConfirm = new TpcMqConfirm(UniqueIdGenerator.generateId(), messageId, messageKey, cid);
list.add(tpcMqConfirm);
}

tpcMqConfirmMapper.batchCreateMqConfirm(list);
}
第 11 步中的第 3 点:完成上面操作后,直接发送消息到中间件 RocketMQ 队列中。
@Override
public void directSendMessage(String body, String topic, String tag, String key,
String pid, Integer delayLevel) {
RocketMqProducer.sendSimpleMessage(body, topic, tag, key, pid, delayLevel);
}

// 核心方法:重试发送消息(重试次数 3 次)
// pid:producerGroup –> 发送邮件服务是 PID_UAC
// cid: consumerGroup –> 监听邮件消息服务是 CID_OPC
private static SendResult retrySendMessage(String pid, Message msg) {
int iniCount = 1;
SendResult result;
while (true) {
try {
// Message 中属性
result = MqProducerBeanFactory.getBean(pid).send(msg);
break;
} catch (Exception e) {
log.error(“ 发送消息失败:”, e);
if (iniCount++ >= PRODUCER_RETRY_TIMES) {
throw new TpcBizException(ErrorCodeEnum.TPC100500014, msg.getTopic(), msg.getKeys());
}
}
}
log.info(“<== 发送 MQ SendResult={}”, result);
return result;
}
消息消费端(OPC)
大致流程:

OPC 服务通过配置类 AliyunMqConfiguration.java 启动 DefaultMQPushConsumer RocketMQ 消费端,并设置消息逻辑处理监听器 OptPushMessageListener;

本地服务 OPC 持久化消费者确认消息,表 pc_mq_message_data;
调用远端可靠消息服务 TPC,更新之前生产端持久化的消费确认列表状态,未确认 –> 已确认,表 pc_tpc_mq_confirm;
接着就可以发送激活邮箱;
如果发送成功,调用远端可靠消息服务 TPC,继续更新第 3 步表中消费确认消息的状态为已消费;

消费端 RocketMQ 启动配置类

DefaultMQPushConsumer 根据配置信息启动;
并 subscribe 订阅 该服务 OPC 所有的 Topic 和 tags 消息。

包括短信、邮箱激活、附件更新删除等所有消息。
@Slf4j
@Configuration
public class AliyunMqConfiguration {

@Resource
private PaascloudProperties paascloudProperties;

@Resource
private OptPushMessageListener optPushConsumer;

@Resource
private TaskExecutor taskExecutor;

/**
* Default mq push consumer default mq push consumer.
*
* @return the default mq push consumer
*
* @throws MQClientException the mq client exception
*/
@Bean
public DefaultMQPushConsumer defaultMQPushConsumer() throws MQClientException {
// 1. 新建消费者组
// RocketMQ 实际上都是拉模式,这里的 DefaultMQPushConsumer 实现了推模式,
// 也只是对拉消息服务做了一层封装,即拉到消息的时候触发业务消费者注册到这里的 callback
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(paascloudProperties.getAliyun().getRocketMq().getConsumerGroup());
// 2. 指定 NameServer 地址,多个地址以 ; 隔开
consumer.setNamesrvAddr(paascloudProperties.getAliyun().getRocketMq().getNamesrvAddr());
// 3. 设置 Consumer 第一次启动是从队列头部开始消费还是队列尾部开始消费
// 如果非第一次启动,那么按照上次消费的位置继续消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

String[] strArray = AliyunMqTopicConstants.ConsumerTopics.OPT.split(GlobalConstant.Symbol.COMMA);
for (String aStrArray : strArray) {
String[] topicArray = aStrArray.split(GlobalConstant.Symbol.AT);
String topic = topicArray[0];
String tags = topicArray[1];
if (PublicUtil.isEmpty(tags)) {
tags = “*”;
}
// 4. 进行 Topic 订阅,订阅 PushTopic 下 Tag 为 push 的消息
consumer.subscribe(topic, tags);
log.info(“RocketMq OpcPushConsumer topic = {}, tags={}”, topic, tags);
}

// 5. 设置消息处理器
consumer.registerMessageListener(optPushConsumer);
consumer.setConsumeThreadMax(2);
consumer.setConsumeThreadMin(2);

taskExecutor.execute(() -> {
try {
Thread.sleep(5000);
consumer.start();
log.info(“RocketMq OpcPushConsumer OK.”);
} catch (InterruptedException | MQClientException e) {
log.error(“RocketMq OpcPushConsumer, 出现异常 ={}”, e.getMessage(), e);
}
});
return consumer;
}
}
消息逻辑处理监听器

consumeMessage() 上有注解 @MqConsumerStore,执行前先进入切面编程;
@Slf4j
@Component
public class OptPushMessageListener implements MessageListenerConcurrently {

@Resource
private OptSendSmsTopicConsumer optSendSmsTopicService;
@Resource
private OptSendEmailTopicConsumer optSendEmailTopicService;
@Resource
private MdcTopicConsumer mdcTopicConsumer;

@Resource
private MqMessageService mqMessageService;
@Resource
private StringRedisTemplate srt;

/**
* Consume message consume concurrently status.
*
* @param messageExtList the message ext list
* @param consumeConcurrentlyContext the consume concurrently context
*
* @return the consume concurrently status
*/
@Override
@MqConsumerStore
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
MessageExt msg = messageExtList.get(0);
String body = new String(msg.getBody());
String topicName = msg.getTopic();
String tags = msg.getTags();
String keys = msg.getKeys();
log.info(“MQ 消费 Topic={},tag={},key={}”, topicName, tags, keys);
ValueOperations<String, String> ops = srt.opsForValue();
// 控制幂等性使用的 key
try {
MqMessage.checkMessage(body, topicName, tags, keys);
String mqKV = null;
if (srt.hasKey(keys)) {
mqKV = ops.get(keys);
}
if (PublicUtil.isNotEmpty(mqKV)) {
log.error(“MQ 消费 Topic={},tag={},key={}, 重复消费 ”, topicName, tags, keys);

// 消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
if (AliyunMqTopicConstants.MqTopicEnum.SEND_SMS_TOPIC.getTopic().equals(topicName)) {
optSendSmsTopicService.handlerSendSmsTopic(body, topicName, tags, keys);
}
if (AliyunMqTopicConstants.MqTopicEnum.SEND_EMAIL_TOPIC.getTopic().equals(topicName)) {
optSendEmailTopicService.handlerSendEmailTopic(body, topicName, tags, keys);
}
if (AliyunMqTopicConstants.MqTopicEnum.TPC_TOPIC.getTopic().equals(topicName)) {
mqMessageService.deleteMessageTopic(body, tags);
}
if (AliyunMqTopicConstants.MqTopicEnum.MDC_TOPIC.getTopic().equals(topicName)) {
mdcTopicConsumer.handlerSendSmsTopic(body, topicName, tags, keys);
} else {
log.info(“OPC 订单信息消 topicName={} 不存在 ”, topicName);
}
} catch (IllegalArgumentException ex) {
log.error(“ 校验 MQ message 失败 ex={}”, ex.getMessage(), ex);
} catch (Exception e) {
log.error(“ 处理 MQ message 失败 topicName={}, keys={}, ex={}”, topicName, keys, e.getMessage(), e);
// 如果消息消费失败,例如数据库异常等,扣款失败,发送失败需要重试的场景,
// 返回下面代码,RocketMQ 就认为消费失败。
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
ops.set(keys, keys, 10, TimeUnit.DAYS);
// 业务实现消费回调的时候,当且仅当返回下面代码时,RocketMQ 才会认为这批消息是消费完成的
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
执行方法之前先进入切面编程执行,获取注解方法的参数和消息;
@Around(value = “mqConsumerStoreAnnotationPointcut()”)
public Object processMqConsumerStoreJoinPoint(ProceedingJoinPoint joinPoint) throws Throwable {
// …
MqMessageData dto = this.getTpcMqMessageDto(messageExtList.get(0));
final String messageKey = dto.getMessageKey();
if (isStorePreStatus) {
// 执行下面 3、4 步
mqMessageService.confirmReceiveMessage(consumerGroup, dto);
}
String methodName = joinPoint.getSignature().getName();
try {
// 返回注解方法;
result = joinPoint.proceed();
log.info(“result={}”, result);
if (CONSUME_SUCCESS.equals(result.toString())) {
mqMessageService.saveAndConfirmFinishMessage(consumerGroup, messageKey);
}
} catch (Exception e) {
log.error(“ 发送可靠消息, 目标方法[{}], 出现异常 ={}”, methodName, e.getMessage(), e);
throw e;
} finally {
log.info(“ 发送可靠消息 目标方法[{}], 总耗时 ={}”, methodName, System.currentTimeMillis() – startTime);
}
return result;
}

confirmReceiveMessage:消费者确认收到消息;在上面目录【发送激活邮箱的消息 /service 层】的第 12 步
持久化消费者确认消息 MqMessageData 到本地服务 OPC mysql 中,表 pc_mq_message_data;
@Override
@Transactional(rollbackFor = Exception.class)
public void confirmReceiveMessage(String cid, MqMessageData messageData) {
final String messageKey = messageData.getMessageKey();
log.info(“confirmReceiveMessage – 消费者 ={}, 确认收到 key={}的消息 ”, cid, messageKey);
// 持久化消费者确认消息 MqMessageData 到本地服务 mysql 中,表 pc_mq_message_data
messageData.setMessageType(MqMessageTypeEnum.CONSUMER_MESSAGE.messageType());
messageData.setId(UniqueIdGenerator.generateId());
mqMessageDataMapper.insertSelective(messageData);
// 调用远端服务 TPC,更新确认收到消息表状态为已确认,TpcMqConfirm,表 pc_tpc_mq_confirm;
Wrapper wrapper = tpcMqMessageFeignApi.confirmReceiveMessage(cid, messageKey);
log.info(“tpcMqMessageFeignApi.confirmReceiveMessage result={}”, wrapper);
if (wrapper == null) {
throw new TpcBizException(ErrorCodeEnum.GL99990002);
}
if (wrapper.error()) {
throw new TpcBizException(ErrorCodeEnum.TPC10050004, wrapper.getMessage(), messageKey);
}
}

紧接着第 3 步,调用远端服务 TPC,更新确认收到消息状态为已确认,TpcMqConfirm,表 pc_tpc_mq_confirm;

status:状态, 10 – 未确认 ; 20 – 已确认; 30 已消费;
consumeCount:消费的次数,加 1;

@Override
public void confirmReceiveMessage(final String cid, final String messageKey) {
// 1. 校验 cid
// 2. 校验 messageKey
// 3. 校验 cid 和 messageKey
Long confirmId = tpcMqConfirmMapper.getIdMqConfirm(cid, messageKey);
// 3. 更新消费信息的状态
tpcMqConfirmMapper.confirmReceiveMessage(confirmId);
}
第 3、4 步执行后,返回切面,执行下面代码,再返回注解修饰的方法;
result = joinPoint.proceed();

注解修饰方法,通过参数 MessageExt 获取该消息的 topic(主题)、tag(标签)、keys(唯一键)、body(消息体);

幂等性(避免重复消费):redis 中存储消费过的该消息的 keys;
根据消息的 topic 执行相应的操作处理该消息

比如此流程的发送激活邮箱,使用 spring 框架的 TaskExecutor 执行邮箱发送任务。
@Override
public int sendSimpleMail(String subject, String text, Set<String> to) {
log.info(“sendSimpleMail – 发送简单邮件. subject={}, text={}, to={}”, subject, text, to);
int result = 1;
try {
SimpleMailMessage message = MailEntity.createSimpleMailMessage(subject, text, to);
message.setFrom(from);
taskExecutor.execute(() -> mailSender.send(message));
} catch (Exception e) {
log.info(“sendSimpleMail [FAIL] ex={}”, e.getMessage(), e);
result = 0;
}
return result;
}

第 8 步 如果消息消费成功,邮件发送成功,redis 中存储该消息(幂等,过期时间 10 天),同时返回消费成功代码;
ops.set(keys, keys, 10, TimeUnit.DAYS);
// 业务实现消费回调的时候,当且仅当返回下面代码时,RocketMQ 才会认为这批消息是消费完成的
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

第 8 步 如果消息消费失败,比如数据库异常,扣款失败,邮件发送失败等需要重试的场景,返回重试消费代码。
} catch (IllegalArgumentException ex) {
log.error(“ 校验 MQ message 失败 ex={}”, ex.getMessage(), ex);
} catch (Exception e) {
log.error(“ 处理 MQ message 失败 topicName={}, keys={}, ex={}”, topicName, keys, e.getMessage(), e);
// 如果消息消费失败,例如数据库异常等,扣款失败,发送失败需要重试的场景,
// 返回下面代码,RocketMQ 就认为消费失败。
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
执行完注解修饰方法,再次返回切面中,继续执行,判断返回结果;
String methodName = joinPoint.getSignature().getName();
try {
result = joinPoint.proceed();
log.info(“result={}”, result);
if (CONSUME_SUCCESS.equals(result.toString())) {
mqMessageService.saveAndConfirmFinishMessage(consumerGroup, messageKey);
}
} catch (Exception e) {
log.error(“ 发送可靠消息, 目标方法[{}], 出现异常 ={}”, methodName, e.getMessage(), e);
throw e;
} finally {
log.info(“ 发送可靠消息 目标方法[{}], 总耗时 ={}”, methodName, System.currentTimeMillis() – startTime);
}
return result;

第 11 步中,如果返回 CONSUME_SUCCESS,保存并确认消息完成;
调用远端服务 TCP,更新消费确认消息列表 pc_tpc_mq_confirm,状态为已消费;

@Override
public void saveAndConfirmFinishMessage(String cid, String messageKey) {
// 1. 调用远端服务 tcp,确认完成消费消息
Wrapper wrapper = tpcMqMessageFeignApi.confirmConsumedMessage(cid, messageKey);
log.info(“tpcMqMessageFeignApi.confirmReceiveMessage result={}”, wrapper);
if (wrapper == null) {
throw new TpcBizException(ErrorCodeEnum.GL99990002);
}
if (wrapper.error()) {
throw new TpcBizException(ErrorCodeEnum.TPC10050004, wrapper.getMessage(), messageKey);
}
}
发送激活成功邮箱过程

发送激活成功邮箱同上面发送激活邮箱一样利用可靠消息服务完成分布式事务操作。
controller 层
@GetMapping(value = “/activeUser/{activeUserToken}”)
@ApiOperation(httpMethod = “POST”, value = “ 激活用户 ”)
public Wrapper activeUser(@PathVariable String activeUserToken) {
uacUserService.activeUser(activeUserToken);
return WrapMapper.ok(“ 激活成功 ”);
}
service 层

activeuser():
@Override
public void activeUser(String activeUserToken) {
Preconditions.checkArgument(!StringUtils.isEmpty(activeUserToken), “ 激活用户失败 ”);

String activeUserKey = RedisKeyUtil.getActiveUserKey(activeUserToken);

String email = redisService.getKey(activeUserKey);

if (StringUtils.isEmpty(email)) {
throw new UacBizException(ErrorCodeEnum.UAC10011030);
}
// 修改用户状态, 绑定访客角色
UacUser uacUser = new UacUser();
uacUser.setEmail(email);

uacUser = uacUserMapper.selectOne(uacUser);
if (uacUser == null) {
logger.error(“ 找不到用户信息. email={}”, email);
throw new UacBizException(ErrorCodeEnum.UAC10011004, email);
}

UacUser update = new UacUser();
update.setId(uacUser.getId());
update.setStatus(UacUserStatusEnum.ENABLE.getKey());
LoginAuthDto loginAuthDto = new LoginAuthDto();
loginAuthDto.setUserId(uacUser.getId());
loginAuthDto.setUserName(uacUser.getLoginName());
loginAuthDto.setLoginName(uacUser.getLoginName());
update.setUpdateInfo(loginAuthDto);

UacUser user = this.queryByUserId(uacUser.getId());

Map<String, Object> param = Maps.newHashMap();
param.put(“loginName”, user.getLoginName());
param.put(“dateTime”, DateUtil.formatDateTime(new Date()));

Set<String> to = Sets.newHashSet();
to.add(user.getEmail());

// 构建激活成功消息体
MqMessageData mqMessageData = emailProducer.sendEmailMq(to, UacEmailTemplateEnum.ACTIVE_USER_SUCCESS, AliyunMqTopicConstants.MqTagEnum.ACTIVE_USER_SUCCESS, param);
// 1. 可靠消息服务发送邮件
userManager.activeUser(mqMessageData, update, activeUserKey);
}
调用 userManager.activeUser():可以看到该方法也是注解 @MqProducerStore 修饰;
@MqProducerStore
public void activeUser(final MqMessageData mqMessageData, final UacUser uacUser, final String activeUserKey) {
log.info(“ 激活用户. mqMessageData={}, user={}”, mqMessageData, uacUser);
// 更新用户信息
int result = uacUserMapper.updateByPrimaryKeySelective(uacUser);
if (result < 1) {
throw new UacBizException(ErrorCodeEnum.UAC10011038, uacUser.getId());
}

// 绑定一个访客角色默认值 roleId=10000
final Long userId = uacUser.getId();
Preconditions.checkArgument(userId != null, “ 用戶 Id 不能爲空 ”);

final Long roleId = 10000L;

UacRoleUser roleUser = new UacRoleUser();
roleUser.setUserId(userId);
roleUser.setRoleId(roleId);
uacRoleUserMapper.insertSelective(roleUser);
// 绑定一个组织
UacGroupUser groupUser = new UacGroupUser();
groupUser.setUserId(userId);
groupUser.setGroupId(GlobalConstant.Sys.SUPER_MANAGER_GROUP_ID);
uacGroupUserMapper.insertSelective(groupUser);
// 删除 activeUserToken
redisService.deleteKey(activeUserKey);
}

本地事务执行用户信息更新和 redis 邮箱激活 token 删除,切面编程发送激活成功邮箱分析过程和上面发送激活邮箱流程是一样的,这里不再赘述。
这两个过程根据发送消息的 tag 不同,从而处理逻辑不同。Topic 都是 SEND_EMAIL_TOPIC;

此处具体为邮箱内容模板不同,其余消息生产端和消费端流程一样。
public enum MqTagEnum {

/**
* 激活用户.
*/
ACTIVE_USER(“ACTIVE_USER”, MqTopicEnum.SEND_EMAIL_TOPIC.getTopic(), “ 激活用户 ”),
/**
* 激活用户成功.
*/
ACTIVE_USER_SUCCESS(“ACTIVE_USER_SUCCESS”, MqTopicEnum.SEND_EMAIL_TOPIC.getTopic(), “ 激活用户成功 ”),

// … 省略其他 tag

String tag;
String topic;
String tagName;

MqTagEnum(String tag, String topic, String tagName) {
this.tag = tag;
this.topic = topic;
this.tagName = tagName;
}

public String getTag() {
return tag;
}
public String getTopic() {
return topic;
}
}

正文完
 0