解决SpringBoot整合RocketMQ遇到的坑

网友投稿 291 2022-12-31

解决SpringBoot整合RocketMQ遇到的坑

应用场景

在实现RocketMQ消费时,一般会用到@RocketMQMessageListener注解定义Group、Topic以及selectorExpression(数据过滤、选择的规则)为了能支持动态筛选数据,一般都会使用表达式,然后通过apollo或者cloud config进行动态切换。

引入依赖

org.apache.rocketmq

rocketmq-spring-boot-starter

2.0.4

消费者代码

@RocketMQMessageListener(consumerGroup = "${rocketmq.group}",topic ="${rocketmq.topic}",selectorExpression = "${rocketmq.selectorExpression}")

public class Consumer implements RocketMQListener {

@Override

public void onMessage(String s) {

System.out.println("消费到的数据为:"+s);

}

}

问题排查

RocketMQMessageListener整个注解默认selectorExpression为*,表示接收当前Topic下的所有数据,如果我们想对tags进行动态配置,在使用${rocketmq.selectorExpression}表达式时会发现所有数据全被过滤了,跟踪源码(ListenerContainerConfiguration.java)发现在创建listener时selectorExpression的数据在通environment环境变量中获取对应的数据后又被覆盖了,导致整个过滤条件被变更为表达式。

@Override

public void afterSingletonsInstantiated() {

// 获取所有所有使用了RocketMQMessageListener注解的bean

Map beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);

if (Objects.nonNull(beans)) {

// 循环注册容器

beans.forEach(this::registerContainer);

}

}

private void registerContainer(String beanName, Object bean) {

Class> clazz = AopProxyUtils.ultimateTargetClass(bean);

// 校验当前bean是否实现了RocketMQListener接口

if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {

throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());

}

// 获取bean上的annotation

RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);

// 解析group及topic,可支持表达式

String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());

String topic = this.environment.resolvePlaceholders(annotation.topic());

boolean listenerEnabled =

(boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)

.getOrDefault(topic, true);

if (!listenerEnabled) {

log.debug(

"Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",

consumerGroup, topic);

return;

}

validate(annotation);

String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),

counter.incrementAndGet());

GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;

// 注册bean的,调用createRocketMQListenerContainer

genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,

() -> createRocketMQListenerContainer(containerBeanName, bean, annotation));

DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,

DefaultRocketMQListenerContainer.class);

if (!container.isRunning()) {

try {

container.start();

} catch (Exception e) {

log.error("Started container failed. {}", container, e);

throw new RuntimeException(e);

}

}

log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);

}

private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,

RocketMQMessageListener annotation) {

DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();

container.setRocketMQMessageListener(annotation);

String nameServer = environment.resolvePlaceholders(annotation.nameServer());

nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;

String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());

container.setNameServer(nameServer);

if (!StringUtils.isEmpty(accessChannel)) {

container.setAccessChannel(AccessChannel.valueOf(accessChannel));

}

container.setTopic(environment.resolvePlaceholders(annotation.topic()));

// 此处已经根据表达式将数据取出

String tags = environment.resolvePlaceholders(annotation.selectorExpression());

if (!StringUtils.isEmpty(tags)) {

container.setSelectorExpression(tags);

}

container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));

// 此处将SelectorExpression的数据覆盖成了表达式

container.setRocketMQMessageListener(annotation);

container.setRocketMQListener((RocketMQListener)bean);

container.setObjectMapper(objectMapper);

container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());

container.setName(name); // REVIEW ME, use the same clientId or multiple?

return container;

}

问题解决

因为ListenerContainerConfiguration类是实现了SmartInitializingSingleton接口的afterSingletonsInstantiated方法,我们可以通过反射对selectorExpression的数据在ListenerContainerConfiguration进行初始化前进行解析并赋值回去。

/**

* 在springboot初始化后,RocketMQ容器初始化前利用反射动态改变数据

**/

@Configuration

public class ChangeSelectorExpressionBeforeMQInit implements InitializingBean {

@Autowired

private ApplicationContext applicationContext;

@Autowired

private StandardEnvironment environment;

@Override

public void afterPropertiesSet() throws Exception {

Map beans =applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);

for (Object bean : beans.values()){

Class> clazz = AopProxyUtils.ultimateTargetClass(bean);

if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {

continue;

}

RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);

InvocationHandler invocationHandler = Proxy.getInvocationHandler(annotation);

Field field = invocationHandler.getClass().getDeclaredField("memberValues");

field.setAccessible(true);

Map memberValues = (Map) field.get(invocationHandler);

for (Map.Entry entry: memberValues.entrySet()) {

if(Objects.nonNull(entry)){

memberValues.put(entry.getKey(),environmhttp://ent.resolvePlaceholders(String.valueOf(entry.getValue())));

}

}

}

}

}

初次之外,在2.1.0版本的依赖包中已经修复了此Bug,在不造成依赖冲突的前提下,建议使用2.1.0以上的版本包。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:网站api接口手机(提供api接口的平台)
下一篇:使用RocketMQTemplate发送带tags的消息
相关文章

 发表评论

暂时没有评论,来抢沙发吧~