diff --git a/README.md b/README.md index 0627107649228c4f2f8c742ba932087ebfbe5e08..dd33ca52085e618f059503d1086bd44ae8f78b22 100644 --- a/README.md +++ b/README.md @@ -84,7 +84,7 @@ protected void afterExecute(Runnable r, Throwable t); > > 4. 集成常用三方中间件内部线程池管理 -**经过多个版本的迭代,目前最新版本 v1.1.6.1 具有以下特性** ✅ +**经过多个版本的迭代,目前最新版本 v1.1.7 具有以下特性** ✅ - **代码零侵入**:我们改变了线程池以往的使用姿势,所有配置均放在配置中心,服务启动时会从配置中心拉取配置生成线程池对象放到 Spring 容器中,使用时直接从 Spring 容器中获取,对业务代码零侵入 @@ -194,12 +194,6 @@ protected void afterExecute(Runnable r, Throwable t); --- -## 友情链接 - -- [HertzBeat](https://github.com/dromara/hertzbeat) : 易用友好的实时监控告警系统,无需Agent,强大自定义监控能力. - ---- - ## 联系我 看到这儿,**请给项目一个 star**,你的支持是我们前进的动力! @@ -214,6 +208,19 @@ protected void afterExecute(Runnable r, Throwable t); --- +## 友情链接 + +- [HertzBeat](https://github.com/dromara/hertzbeat) : 易用友好的实时监控告警系统,无需Agent,强大自定义监控能力. + +--- + +## 特别赞助 + +**JNPF低代码开发平台** + + + +--- ## 鸣谢 感谢 JetBrains 对开源项目的支持 diff --git a/adapter/adapter-rocketmq/src/main/java/org/dromara/dynamictp/adapter/rocketmq/RocketMqDtpAdapter.java b/adapter/adapter-rocketmq/src/main/java/org/dromara/dynamictp/adapter/rocketmq/RocketMqDtpAdapter.java index fee47128e70bbf6cfac190d7ac380e5b27291859..b8d6c34cae4dc7cfb282dad67f9a33fc024e80ba 100644 --- a/adapter/adapter-rocketmq/src/main/java/org/dromara/dynamictp/adapter/rocketmq/RocketMqDtpAdapter.java +++ b/adapter/adapter-rocketmq/src/main/java/org/dromara/dynamictp/adapter/rocketmq/RocketMqDtpAdapter.java @@ -31,6 +31,7 @@ import org.dromara.dynamictp.common.properties.DtpProperties; import org.dromara.dynamictp.common.util.ReflectionUtil; import org.dromara.dynamictp.core.support.ThreadPoolExecutorProxy; import org.dromara.dynamictp.jvmti.JVMTI; +import org.springframework.util.ReflectionUtils; import java.util.Objects; import java.util.concurrent.ThreadPoolExecutor; @@ -102,6 +103,9 @@ public class RocketMqDtpAdapter extends AbstractDtpAdapter { return; } for (DefaultMQProducer defaultMQProducer : beans) { + if (Objects.isNull(ReflectionUtils.findMethod(DefaultMQProducerImpl.class, "getAsyncSenderExecutor"))) { + continue; + } val producer = (DefaultMQProducerImpl) ReflectionUtil.getFieldValue(DefaultMQProducer.class, "defaultMQProducerImpl", defaultMQProducer); if (Objects.isNull(producer)) { diff --git a/core/src/main/java/org/dromara/dynamictp/core/aware/TaskEnhanceAware.java b/core/src/main/java/org/dromara/dynamictp/core/aware/TaskEnhanceAware.java index fadf9508c4b9ac47166536a748922108470470c5..667e1518915c29faf2c7894495377ff31687d1e9 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/aware/TaskEnhanceAware.java +++ b/core/src/main/java/org/dromara/dynamictp/core/aware/TaskEnhanceAware.java @@ -41,12 +41,12 @@ public interface TaskEnhanceAware extends DtpAware { */ default Runnable getEnhancedTask(Runnable command, List taskWrappers) { Runnable wrapRunnable = command; + String taskName = (wrapRunnable instanceof NamedRunnable) ? ((NamedRunnable) wrapRunnable).getName() : null; if (CollectionUtils.isNotEmpty(taskWrappers)) { for (TaskWrapper t : taskWrappers) { wrapRunnable = t.wrap(wrapRunnable); } } - String taskName = (wrapRunnable instanceof NamedRunnable) ? ((NamedRunnable) wrapRunnable).getName() : null; return new DtpRunnable(command, wrapRunnable, taskName); } diff --git a/core/src/main/java/org/dromara/dynamictp/core/monitor/collector/jmx/JMXCollector.java b/core/src/main/java/org/dromara/dynamictp/core/monitor/collector/jmx/JMXCollector.java index a7643b2b7309faac9629bf603fa97e560c1f0c3b..1a9578130588a90b73593bdf83adaf481aa549d9 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/monitor/collector/jmx/JMXCollector.java +++ b/core/src/main/java/org/dromara/dynamictp/core/monitor/collector/jmx/JMXCollector.java @@ -40,26 +40,15 @@ public class JMXCollector extends AbstractCollector { public static final String DTP_METRIC_NAME_PREFIX = "dtp.thread.pool"; - /** - * thread pool stats map - */ - private static final Map GAUGE_CACHE = new ConcurrentHashMap<>(); - @Override public void collect(ThreadPoolStats threadPoolStats) { - if (GAUGE_CACHE.containsKey(threadPoolStats.getPoolName())) { - ThreadPoolStats poolStats = GAUGE_CACHE.get(threadPoolStats.getPoolName()); - BeanUtils.copyProperties(threadPoolStats, poolStats); - } else { - try { - MBeanServer server = ManagementFactory.getPlatformMBeanServer(); - ObjectName name = new ObjectName(DTP_METRIC_NAME_PREFIX + ":name=" + threadPoolStats.getPoolName()); - ThreadPoolStatsJMX stats = new ThreadPoolStatsJMX(threadPoolStats); - server.registerMBean(stats, name); - } catch (JMException e) { - log.error("collect thread pool stats error", e); - } - GAUGE_CACHE.put(threadPoolStats.getPoolName(), threadPoolStats); + try { + MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + ObjectName name = new ObjectName(DTP_METRIC_NAME_PREFIX + ":name=" + threadPoolStats.getPoolName()); + ThreadPoolStatsJMX stats = new ThreadPoolStatsJMX(threadPoolStats); + server.registerMBean(stats, name); + } catch (JMException e) { + log.error("collect thread pool stats error", e); } } diff --git a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpPostProcessor.java b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpPostProcessor.java index 6d0470a41a42277170413e0d65a32ab4a93febb2..8496f1462769a38b3c4df88596776a731349240a 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpPostProcessor.java +++ b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpPostProcessor.java @@ -17,20 +17,22 @@ package org.dromara.dynamictp.core.spring; +import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import lombok.val; import org.apache.commons.lang3.StringUtils; +import org.dromara.dynamictp.common.plugin.DtpInterceptorRegistry; import org.dromara.dynamictp.common.util.ConstructorUtil; import org.dromara.dynamictp.common.util.ReflectionUtil; import org.dromara.dynamictp.core.DtpRegistry; import org.dromara.dynamictp.core.executor.DtpExecutor; import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor; import org.dromara.dynamictp.core.executor.eager.TaskQueue; -import org.dromara.dynamictp.common.plugin.DtpInterceptorRegistry; import org.dromara.dynamictp.core.support.DynamicTp; import org.dromara.dynamictp.core.support.ExecutorWrapper; import org.dromara.dynamictp.core.support.ScheduledThreadPoolExecutorProxy; import org.dromara.dynamictp.core.support.ThreadPoolExecutorProxy; +import org.dromara.dynamictp.core.support.task.wrapper.TaskWrapper; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; @@ -41,6 +43,7 @@ import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.beans.factory.support.DefaultListableBeanFactory; import org.springframework.core.Ordered; import org.springframework.core.PriorityOrdered; +import org.springframework.core.task.TaskDecorator; import org.springframework.core.type.MethodMetadata; import org.springframework.lang.NonNull; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @@ -139,9 +142,11 @@ public class DtpPostProcessor implements BeanPostProcessor, BeanFactoryAware, Pr private Object doRegisterAndReturnCommon(Object bean, String poolName) { if (bean instanceof ThreadPoolTaskExecutor) { - val proxy = newProxy(poolName, ((ThreadPoolTaskExecutor) bean).getThreadPoolExecutor()); + ThreadPoolTaskExecutor poolTaskExecutor = (ThreadPoolTaskExecutor) bean; + val proxy = newProxy(poolName, poolTaskExecutor.getThreadPoolExecutor()); try { ReflectionUtil.setFieldValue("threadPoolExecutor", bean, proxy); + tryWrapTaskDecorator(poolTaskExecutor, proxy); } catch (IllegalAccessException ignored) { } DtpRegistry.registerExecutor(new ExecutorWrapper(poolName, proxy), REGISTER_SOURCE); return bean; @@ -177,4 +182,23 @@ public class DtpPostProcessor implements BeanPostProcessor, BeanFactoryAware, Pr shutdownGracefulAsync(originExecutor, name, 0); return proxy; } + + private void tryWrapTaskDecorator(ThreadPoolTaskExecutor poolTaskExecutor, ThreadPoolExecutorProxy proxy) throws IllegalAccessException { + Object taskDecorator = ReflectionUtil.getFieldValue("taskDecorator", poolTaskExecutor); + if (Objects.isNull(taskDecorator)) { + return; + } + TaskWrapper taskWrapper = (taskDecorator instanceof TaskWrapper) ? (TaskWrapper) taskDecorator : new TaskWrapper() { + @Override + public String name() { + return taskDecorator.getClass().getName(); + } + + @Override + public Runnable wrap(Runnable runnable) { + return ((TaskDecorator) taskDecorator).decorate(runnable); + } + }; + ReflectionUtil.setFieldValue("taskWrappers", proxy, Lists.newArrayList(taskWrapper)); + } } diff --git a/dependencies/pom.xml b/dependencies/pom.xml index fe1a1a109a7a996406bed3c75cdccd7ae18227f1..3cc5d53bf9a10ee0272cf1106e465bfc400484a3 100644 --- a/dependencies/pom.xml +++ b/dependencies/pom.xml @@ -12,7 +12,7 @@ https://github.com/yanhom1314/dynamic-tp - 1.1.6.1 + 1.1.7 UTF-8 1.18.24 diff --git a/pom.xml b/pom.xml index 772c3a8718988816b64deb4afd89de4869f9550d..0e67e111efcdc01afb5c542d94fd9933eb54386e 100644 --- a/pom.xml +++ b/pom.xml @@ -13,7 +13,7 @@ https://github.com/yanhom1314/dynamic-tp - 1.1.6.1 + 1.1.7 8 8 diff --git a/resources/img/jnpfsoft.png b/resources/img/jnpfsoft.png new file mode 100644 index 0000000000000000000000000000000000000000..f2d3bdaca4e14a43e3c344787aa6b34875b1abd3 Binary files /dev/null and b/resources/img/jnpfsoft.png differ