序
本文主要研究一下 rocketmq 的 DefaultRocketMQListenerContainer
DefaultRocketMQListenerContainer
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
public class DefaultRocketMQListenerContainer implements InitializingBean,
RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {private final static Logger log = LoggerFactory.getLogger(DefaultRocketMQListenerContainer.class);
private ApplicationContext applicationContext;
/**
* The name of the DefaultRocketMQListenerContainer instance
*/
private String name;
private long suspendCurrentQueueTimeMillis = 1000;
/**
* Message consume retry strategy<br> -1,no retry,put into DLQ directly<br> 0,broker control retry frequency<br>
* >0,client control retry frequency.
*/
private int delayLevelWhenNextConsume = 0;
private String nameServer;
private AccessChannel accessChannel = AccessChannel.LOCAL;
private String consumerGroup;
private String topic;
private int consumeThreadMax = 64;
private String charset = "UTF-8";
private ObjectMapper objectMapper;
private RocketMQListener rocketMQListener;
private RocketMQMessageListener rocketMQMessageListener;
private DefaultMQPushConsumer consumer;
private Class messageType;
private boolean running;
// The following properties came from @RocketMQMessageListener.
private ConsumeMode consumeMode;
private SelectorType selectorType;
private String selectorExpression;
private MessageModel messageModel;
private long consumeTimeout;
//......
public void setRocketMQMessageListener(RocketMQMessageListener anno) {
this.rocketMQMessageListener = anno;
this.consumeMode = anno.consumeMode();
this.consumeThreadMax = anno.consumeThreadMax();
this.messageModel = anno.messageModel();
this.selectorExpression = anno.selectorExpression();
this.selectorType = anno.selectorType();
this.consumeTimeout = anno.consumeTimeout();}
@Override
public void setupMessageListener(RocketMQListener rocketMQListener) {this.rocketMQListener = rocketMQListener;}
@Override
public void destroy() {this.setRunning(false);
if (Objects.nonNull(consumer)) {consumer.shutdown();
}
log.info("container destroyed, {}", this.toString());
}
@Override
public boolean isAutoStartup() {return true;}
@Override
public void stop(Runnable callback) {stop();
callback.run();}
@Override
public void start() {if (this.isRunning()) {throw new IllegalStateException("container already running." + this.toString());
}
try {consumer.start();
} catch (MQClientException e) {throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
}
this.setRunning(true);
log.info("running container: {}", this.toString());
}
@Override
public void stop() {if (this.isRunning()) {if (Objects.nonNull(consumer)) {consumer.shutdown();
}
setRunning(false);
}
}
@Override
public boolean isRunning() {return running;}
private void setRunning(boolean running) {this.running = running;}
@Override
public int getPhase() {
// Returning Integer.MAX_VALUE only suggests that
// we will be the first bean to shutdown and last bean to start
return Integer.MAX_VALUE;
}
@Override
public void afterPropertiesSet() throws Exception {initRocketMQPushConsumer();
this.messageType = getMessageType();
log.debug("RocketMQ messageType: {}", messageType.getName());
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}
@Override
public String toString() {
return "DefaultRocketMQListenerContainer{" +
"consumerGroup='" + consumerGroup + '\'' +
", nameServer='" + nameServer + '\'' +
", topic='" + topic + '\'' +
", consumeMode=" + consumeMode +
", selectorType=" + selectorType +
", selectorExpression='" + selectorExpression + '\'' +
", messageModel=" + messageModel +
'}';
}
private void initRocketMQPushConsumer() throws MQClientException {Assert.notNull(rocketMQListener, "Property'rocketMQListener'is required");
Assert.notNull(consumerGroup, "Property'consumerGroup'is required");
Assert.notNull(nameServer, "Property'nameServer'is required");
Assert.notNull(topic, "Property'topic'is required");
RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());
boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();
if (Objects.nonNull(rpcHook)) {consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
enableMsgTrace, this.applicationContext.getEnvironment().
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
consumer.setVipChannelEnabled(false);
consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup));
} else {log.debug("Access-key or secret-key not configure in" + this + ".");
consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
this.applicationContext.getEnvironment().
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
}
String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
if (customizedNameServer != null) {consumer.setNamesrvAddr(customizedNameServer);
} else {consumer.setNamesrvAddr(nameServer);
}
if (accessChannel != null) {consumer.setAccessChannel(accessChannel);
}
consumer.setConsumeThreadMax(consumeThreadMax);
if (consumeThreadMax < consumer.getConsumeThreadMin()) {consumer.setConsumeThreadMin(consumeThreadMax);
}
consumer.setConsumeTimeout(consumeTimeout);
consumer.setInstanceName(this.name);
switch (messageModel) {
case BROADCASTING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
break;
case CLUSTERING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
break;
default:
throw new IllegalArgumentException("Property'messageModel'was wrong.");
}
switch (selectorType) {
case TAG:
consumer.subscribe(topic, selectorExpression);
break;
case SQL92:
consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
break;
default:
throw new IllegalArgumentException("Property'selectorType'was wrong.");
}
switch (consumeMode) {
case ORDERLY:
consumer.setMessageListener(new DefaultMessageListenerOrderly());
break;
case CONCURRENTLY:
consumer.setMessageListener(new DefaultMessageListenerConcurrently());
break;
default:
throw new IllegalArgumentException("Property'consumeMode'was wrong.");
}
if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
}
}
private Class getMessageType() {Class<?> targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
Type[] interfaces = targetClass.getGenericInterfaces();
Class<?> superclass = targetClass.getSuperclass();
while ((Objects.isNull(interfaces) || 0 == interfaces.length) && Objects.nonNull(superclass)) {interfaces = superclass.getGenericInterfaces();
superclass = targetClass.getSuperclass();}
if (Objects.nonNull(interfaces)) {for (Type type : interfaces) {if (type instanceof ParameterizedType) {ParameterizedType parameterizedType = (ParameterizedType) type;
if (Objects.equals(parameterizedType.getRawType(), RocketMQListener.class)) {Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {return (Class) actualTypeArguments[0];
} else {return Object.class;}
}
}
}
return Object.class;
} else {return Object.class;}
}
//......
}
- DefaultRocketMQListenerContainer 实现了 InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware 接口;setRocketMQMessageListener 方法会根据 RocketMQMessageListener 注解的信息来设置 consumeMode、consumeThreadMax、messageModel、selectorExpression、selectorType、consumeTimeout
- afterPropertiesSet 方法执行了 initRocketMQPushConsumer 及 getMessageType 方法;initRocketMQPushConsumer 方法会根据 rpcHook 是否为 null 来创建不同的 DefaultMQPushConsumer,之后根据 messageModel、selectorType、consumeMode 等来配置 consumer;如果 rocketMQListener 类型是 RocketMQPushConsumerLifecycleListener 的,则执行 RocketMQPushConsumerLifecycleListener 的 prepareStart 方法
- setupMessageListener 方法主要是保存了 rocketMQListener;isAutoStartup 方法返回 true;start 方法主要是执行 consumer.start() 方法;stop 及 destroy 方法主要是执行 consumer.shutdown()
DefaultMessageListenerConcurrently
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {@SuppressWarnings("unchecked")
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgs) {log.debug("received msg: {}", messageExt);
try {long now = System.currentTimeMillis();
rocketMQListener.onMessage(doConvertMessage(messageExt));
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (Exception e) {log.warn("consume message failed. messageExt:{}", messageExt, e);
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
- DefaultMessageListenerConcurrently 方法实现了 MessageListenerConcurrently 接口;它的 consumeMessage 方法使用 for 循环 try catch 执行 rocketMQListener.onMessage(doConvertMessage(messageExt)) 回调,都成功返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS,一旦异常则返回 ConsumeConcurrentlyStatus.RECONSUME_LATER
DefaultMessageListenerOrderly
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
public class DefaultMessageListenerOrderly implements MessageListenerOrderly {@SuppressWarnings("unchecked")
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt messageExt : msgs) {log.debug("received msg: {}", messageExt);
try {long now = System.currentTimeMillis();
rocketMQListener.onMessage(doConvertMessage(messageExt));
long costTime = System.currentTimeMillis() - now;
log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (Exception e) {log.warn("consume message failed. messageExt:{}", messageExt, e);
context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
}
- DefaultMessageListenerOrderly 实现了 MessageListenerOrderly 接口,其 consumeMessage 方法使用 for 循环 try catch 执行 rocketMQListener.onMessage(doConvertMessage(messageExt)) 回调,都成功返回 ConsumeOrderlyStatus.SUCCESS,一旦异常则返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
小结
- DefaultRocketMQListenerContainer 实现了 InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware 接口;setRocketMQMessageListener 方法会根据 RocketMQMessageListener 注解的信息来设置 consumeMode、consumeThreadMax、messageModel、selectorExpression、selectorType、consumeTimeout
- afterPropertiesSet 方法执行了 initRocketMQPushConsumer 及 getMessageType 方法;initRocketMQPushConsumer 方法会根据 rpcHook 是否为 null 来创建不同的 DefaultMQPushConsumer,之后根据 messageModel、selectorType、consumeMode 等来配置 consumer;如果 rocketMQListener 类型是 RocketMQPushConsumerLifecycleListener 的,则执行 RocketMQPushConsumerLifecycleListener 的 prepareStart 方法
- setupMessageListener 方法主要是保存了 rocketMQListener;isAutoStartup 方法返回 true;start 方法主要是执行 consumer.start() 方法;stop 及 destroy 方法主要是执行 consumer.shutdown()
doc
- DefaultRocketMQListenerContainer