# JmsGroupListener **Repository Path**: lingfy/JmsGroupListener ## Basic Information - **Project Name**: JmsGroupListener - **Description**: 自定义注解实现监听多个队列或者队列集合 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 3 - **Created**: 2020-07-22 - **Last Updated**: 2022-12-23 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## 一、背景 最近双12银联进行满减活动,由于外部接入商户响应速度较慢,导致了队列数据挤压,影响了原本没有参与满减活动的商户,为了解决此问题决定按照商户将队列进行拆分,降低彼此的影响。 在Spring Boot框架下大家基本上会想到如下这样修改方式,那随着被监听的队列越来越多,可以想象代码的可读性会比较差,所以基本这个目的实现了@JmsGroupListener注解来解决该问题(如果监监听的队列数量较少还是建议使用原生注解)。 ~~~ @JmsListener(destination = "test_0001") @JmsListener(destination = "test_0002") public void receiveMessage(String msg) { System.out.println("Received <" + msg + ">"); } @JmsListeners(value = {@JmsListener(destination = "test_0001"), @JmsListener(destination = "test_0002")}) public void receiveMessage1(String msg) { System.out.println("Received <" + msg + ">"); } ~~~ ## 二、效果 在配置文件中配置需要监听的队列集合 ~~~ activemq.message.group=0001|0002 ~~~ 在业务代码中使用@JmsGroupListener注解 ~~~ @JmsGroupListener(group = "${activemq.message.group}", groupSplit = "|", destinationPrefix = "test_") public void receiveMessage2(String msg) { System.out.println("Received <" + msg + ">"); } ~~~ ## 三、定义注解 定义一个注解,如下可以看出该注解与@JmsListener注解的区别,删除的注解属性的原因后面会进行介绍,按照第二部分的使用,最后监听的队列名为test_0001和test_0002。 ~~~ public @interface JmsGroupListener { //定义要监听到队列区分关键词集合 String group(); //关键词集合分隔符 String groupSplit(); //队列名称前缀 String destinationPrefix(); //String id() default ""; String containerFactory() default ""; //String destination(); String subscription() default ""; String selector() default ""; String concurrency() default ""; } ~~~ ## 四、实现注解 ##### ①实现思路 ~~~ Processing of @JmsListener annotations is performed by registering a JmsListenerAnnotationBeanPostProcessor. This can be done manually or, more conveniently, through the element or @EnableJms annotation. ~~~ 通过查看@JmsListener注解的注释可以了解到注解的实现主要在JmsListenerAnnotationBeanPostProcessor中,该类继承了MergedBeanDefinitionPostProcessor,所以我们继承该类基于@JmsListener的实现方式实现@JmsGroupListener注解就可以了。 如果不知道为什么继承JmsListenerAnnotationBeanPostProcessor就可以实现的话可以看一下我同事写的主题为[BeanFactoryPostProcessor,BeanPostProcessor,SmartInitializingSingleton等几个可拓展接口的执行时机](https://www.jianshu.com/p/3fcf0dd7018e )的一篇博客,应该会很大的帮助。 ##### ②重写postProcessAfterInitialization方法 该方法大家注意两个Process代码块即可,第一个Process代码块主要构造一个监听方法与@MyJmsListener注解信息的Map。第二个Process代码块是处理每一个@MyJmsListener注解,也是实现了监听注册的关键代码。 ~~~ @Override @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if (bean instanceof AopInfrastructureBean || bean instanceof JmsListenerContainerFactory || bean instanceof JmsListenerEndpointRegistry) { // Ignore AOP infrastructure such as scoped proxies. return bean; } Class targetClass = AopProxyUtils.ultimateTargetClass(bean); if (!this.nonAnnotatedClasses.contains(targetClass) && AnnotationUtils.isCandidateClass(targetClass, JmsGroupListener.class)) { //Process @MyJmsListener annotation ,Getting the relationship between method and annotation Map annotatedMethods = MethodIntrospector.selectMethods(targetClass, (MethodIntrospector.MetadataLookup) method -> { JmsGroupListener listenerMethod = AnnotatedElementUtils.findMergedAnnotation(method, JmsGroupListener.class); return listenerMethod; }); if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(targetClass); if (logger.isTraceEnabled()) { logger.trace("No @JmsGroupListener annotations found on bean type: " + targetClass); } } else { //Process each @MyJmsListener annotation annotatedMethods.forEach((method, listener) -> processJmsListener(listener, method, bean)); if (logger.isDebugEnabled()) { logger.debug(annotatedMethods.size() + " @JmsGroupListener methods processed on bean '" + beanName + "': " + annotatedMethods); } } } return bean; } } ~~~ ##### ③重写processJmsListener方法 在本部分大家只要关注一个Process代码块即可,该部分实现了将group属性进行拆分,然后改造需要监听的MethodJmsListenerEndpoint并注册到JmsListenerEndpointRegistrar中。 在定义注解的部分我们注意到我们注释了@JmsListener注解的id属性,这是因为@ JmsGroupListener监听的是一个队列的集合,为了处理方便,我们自动为其生成id。 ~~~ public void processJmsListener(JmsGroupListener jmsGroupListener, Method mostSpecificMethod, Object bean) { Method invocableMethod = AopUtils.selectInvocableMethod(mostSpecificMethod, bean.getClass()); JmsListenerContainerFactory factory = null; String containerFactoryBeanName = resolve(jmsGroupListener.containerFactory()); if (StringUtils.hasText(containerFactoryBeanName)) { Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name"); try { factory = this.beanFactory.getBean(containerFactoryBeanName, JmsListenerContainerFactory.class); } catch (NoSuchBeanDefinitionException ex) { throw new BeanInitializationException("Could not register JMS listener endpoint on [" + mostSpecificMethod + "], no " + JmsListenerContainerFactory.class.getSimpleName() + " with id '" + containerFactoryBeanName + "' was found in the application context", ex); } } //Process all destination names String[] DestinationPostfixes = resolve(jmsGroupListener.group()).split("[" + jmsGroupListener.groupSplit() + "]"); for (String postfix : DestinationPostfixes) { String destination = jmsGroupListener.destinationPrefix() + postfix; MethodJmsListenerEndpoint endpoint = createMethodJmsListenerEndpoint(); endpoint.setBean(bean); endpoint.setMethod(invocableMethod); endpoint.setMostSpecificMethod(mostSpecificMethod); endpoint.setMessageHandlerMethodFactory(messageHandlerMethodFactory); endpoint.setEmbeddedValueResolver(this.embeddedValueResolver); endpoint.setBeanFactory(this.beanFactory); //Avoid conflict endpoint.setId(getEndpointId()); endpoint.setDestination(resolve(destination)); if (StringUtils.hasText(jmsGroupListener.selector())) { endpoint.setSelector(resolve(jmsGroupListener.selector())); } if (StringUtils.hasText(jmsGroupListener.subscription())) { endpoint.setSubscription(resolve(jmsGroupListener.subscription())); } if (StringUtils.hasText(jmsGroupListener.concurrency())) { endpoint.setConcurrency(resolve(jmsGroupListener.concurrency())); } this.registrar.registerEndpoint(endpoint, factory); } } ~~~ ##### ④重写afterSingletonsInstantiated方法 通过查看JmsListenerAnnotationBeanPostProcessor的源码我们发现,在该类中afterSingletonsInstantiated方法的最关键的一句registrar.afterPropertiesSet()即可完成所有监听的注册。 我们原本的思路是依靠JmsListenerAnnotationBeanPostProcessor类的afterSingletonsInstantiated,但是后面通过调试发现我们自己构造的JmsListenerEndpointRegistrar对象中的JmsListenerEndpointRegistry对象需要传递给JmsListenerEndpointRegistrar类的registerAllEndpoints方法,所以迫于无奈我们只能重写afterSingletonsInstantiated方法。 所以在本部分的重点进行了setContainerFactoryBeanName和setEndpointRegistry(全局对象),本来进行该类重写时候本来想阉割对于JmsListenerConfigurer和MessageHandlerMethodFactory扩展,但是最后还是为了有一定的通用性保留了该部分。 ~~~ @Override public void afterSingletonsInstantiated() { // Remove resolved singleton classes from cache this.nonAnnotatedClasses.clear(); if (this.beanFactory instanceof ListableBeanFactory) { // Apply JmsListenerConfigurer beans from the BeanFactory, if any Map beans = ((ListableBeanFactory) this.beanFactory).getBeansOfType(JmsListenerConfigurer.class); List configurers = new ArrayList<>(beans.values()); AnnotationAwareOrderComparator.sort(configurers); for (JmsListenerConfigurer configurer : configurers) { configurer.configureJmsListeners(this.registrar); } } // Must be set to obtain container factory by bean name if (this.containerFactoryBeanName != null) { registrar.setContainerFactoryBeanName(containerFactoryBeanName); } // Register endpointRegistry with spring context if (this.registrar.getEndpointRegistry() == null) { registrar.setEndpointRegistry(endpointRegistry); } // Set the custom handler method factory once resolved by the configurer MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory(); if (handlerMethodFactory != null) { this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory); } // Actually register all listeners this.registrar.afterPropertiesSet(); } ~~~ ## 五、总结 到此该注解的关键实现过程已经介绍完成,其中还有一部代码这里就不进行详细的介绍了,有需要的同学自己可以看一下[实现源码](https://gitee.com/changeword/JmsGroupListener)(由于水平有限,欢迎大家来找茬),最后与大家分享一下对源码进行扩展的新的体会,调试源码->了解大体流程->缺什么补什么。