同步操作将从 X-Pacific/ConcurrentLatch 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
[toc]
ConcurrentLatch是一个基于JDK的多线程归类并发处理闩工具(基于JDK1.6)
当你有5个无相关性操作,顺序执行那么消耗时间合将达到5个操作的所有操作时间和,如果开启多线程,那么又不能保证这5个操作都进行完毕才能进行后续操作,那么基于刚才说的场景ConcurrentLatch就是用来解决这个问题的,考虑到系统不能无限制的增加线程,所以ConcurrentLatch又增加了线程池管理器的概念,防止系统因为线程开启过多而宕机
ConcurrentLatch不依赖任何三方jar包,如果您使用的是Maven,那么把ConcurrentLatch安装到中央仓库后,在pom文件中添加以下依赖
<dependency>
<groupId>org.zxp</groupId>
<artifactId>concurrentLatch</artifactId>
<version>1.2-SNAPSHOT</version>
</dependency>
您需要做的只是实现LatchThread接口,并将业务代码写入handle方法,如果有需要传入的对象或信息,可以通过构造方法传参的方式传入
public class RuleLatch implements LatchThread {
RuleDto dto = null;
/**通过构造方法传递入参*/
public RuleLatch(RuleDto args){
dto = args;
}
/**你的业务处理*/
public RuleDto handle() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
dto.setMmmm(dto.getMmmm()+ 9999);
return dto;
}
}
还可以通过下面注解(或成员变量)的方式来标识业务的名称,但我们并不建议您这样做,因为这样会导致这个业务类不能以多个不同名称的任务来运行
/**添加这个类运行的任务名,注意这个名字在一次线程池运行中不可重复*/
/**也可以不配置注解,写一个名为TASKNAME的字符串成员变量也可行*/
@LatchTaskName("rule")
public class RuleLatch implements LatchThread {
RuleDto dto = null;
/**通过构造方法传递入参*/
public RuleLatch(RuleDto args){
dto = args;
}
/**你的业务处理*/
public RuleDto handle() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
dto.setMmmm(dto.getMmmm()+ 9999);
return dto;
}
}
/**添加这个类运行的任务名,注意这个名字在一次线程池运行中不可重复*/
/**也可以不配置注解,写一个名为TASKNAME的字符串成员变量也可行*/
/**返回对象配置的注解值必须和对应的LatchThread实现类一致,否则无法获取返回值对应关系*/
@LatchTaskName("rule")
public class RuleDto {
……
}
/**创建一个ConcurrentLatch执行器*/
ConcurrentLatch excutor = ConcurrentLatchExcutorFactory.getConcurrentLatch();
/**也可以通过其他静态方法获取其他的ConcurrentLatch*/
//ConcurrentLatch excutorproxy = ConcurrentLatchExcutorFactory.getConcurrentLatch(ConcurrentLatchExcutorFactory.ExcutorType.PROXY);
/**组织需要并发的业(任)务(务)类对象*/
LatchThread platformLatchThread = new PlatformLatch();//业务类A
RuleDto ruleDto = new RuleDto();
ruleDto.setRuleID("zxp123");
ruleDto.setMmmm(0.00001);
LatchThread ruleLatchThread = new RuleLatch(ruleDto);//业务类B
/**将任务推入ConcurrentLatch执行器*/
excutor.put(platformLatchThread,"AAA");
excutor.put(ruleLatchThread,"BBB");
/**可以把一个之前已经有的业务组件再放入池中,只需要名称不重复即可,当然你可以可以重新new一个*/
excutor.put(platformLatchThread,"CCC");
/**通知ConcurrentLatch执行器开始执行所有推入的任务*/
/**这里主线程或调用线程会挂起,直到所有任务都执行完毕*/
Map<String, Object> map = excutor.excute();
/**获取返回值*/
/**这里的key就是上面在注解(LatchTaskName)或者TASKNAME配置的值,能轻松的获取所有的返回对象*/
for (String key : map.keySet()) {
Object out = map.get(key);
}
注解方式标注任务名称的调用稍有不同
/**创建一个ConcurrentLatch执行器,不同点*/
ConcurrentLatch excutor =
ConcurrentLatchExcutorFactory.getConcurrentLatch(ConcurrentLatchExcutorFactory.ExcutorType.NORMAL);
/**组织需要并发的业(任)务(务)类对象*/
LatchThread platformLatchThread = new PlatformLatch();//业务类A
RuleDto ruleDto = new RuleDto();
ruleDto.setRuleID("zxp123");
ruleDto.setMmmm(0.00001);
LatchThread ruleLatchThread = new RuleLatch(ruleDto);//业务类B
/**将任务推入ConcurrentLatch执行器,不同点是这里只有一个入参*/
excutor.put(platformLatchThread);
excutor.put(ruleLatchThread);
/**不可以把一个之前已经有的业务组件再放入池中,即使你重新new了一个实例也不允许,这也是为什么我们不推荐使用这种方式的原因*/
/**通知ConcurrentLatch执行器开始执行所有推入的任务*/
/**这里主线程或调用线程会挂起,直到所有任务都执行完毕*/
Map<String, Object> map = excutor.excute();
/**获取返回值*/
/**这里的key就是上面在注解(LatchTaskName)或者TASKNAME配置的值,能轻松的获取所有的返回对象*/
/**再一次强调,必须把handle方法返回的类配置与对应的LatchThread实现类一致的注解任务名称,否则无法正常获取对应关系*/
for (String key : map.keySet()) {
Object out = map.get(key);
}
执行结果
我是BBB
我是AAA
我是AAA
taskname==========aaa
PlatformDto{name='0516', premium=6500.98, policyNo='000000000001'}
taskname==========ccc
PlatformDto{name='0516', premium=6500.98, policyNo='000000000001'}
taskname==========bbb
RuleDto{ruleID='zxp123', mmmm=9999.00001}
LatchExcutorBlockingQueueManager.print();
配置详见org.zxp.ConcurrentLatch.Constants
调用详见org.zxp.ConcurrentLatch.LatchExcutorBlockingQueueManager#getExcutor
/**
* 单个线程池最大核心线程数
*/
public static final int MAX_CORE_POOL_SIZE = 10;
线程池的核心线程数,如果不明线程池如何使用请参考我总结的一片线程池的使用文档:线程池使用说明
当然您可以结合您的业务情况以及硬件情况单独定制线程池的类型(我把线程池从上个版本的newFixedThreadPool调整为现在的普通线程池)
<!--详见org.zxp.ConcurrentLatch.LatchExcutorBlockingQueueManager#getExcutor-->
excutor = new ThreadPoolExecutor(corepoolsize, threadCount*Constants.MAX_POOL_SIZE_RATIO , 60, TimeUnit.SECONDS, queue);
/**
* 最大线程池数量倍数
*/
public static final int MAX_POOL_SIZE_RATIO = 2;
线程池的最大线程数倍数(=核心线程数*MAX_POOL_SIZE_RATIO),如果不明线程池如何使用请参考我总结的一片线程池的使用文档:线程池使用说明
/**
* 是否有界,默认无界,即不会有任务超出容量导致丢弃任务的情况
*/
public static final boolean HAS_LIMITS = false;
线程池任务等待队列是否无界,如果有界(配置为false)则下面LIMITS_SIZE配置有效,如果不明线程池如何使用请参考我总结的一片线程池的使用文档:线程池使用说明
/**
* 任务等待队列长度,有界时有效
*/
public static final int LIMITS_SIZE = 50;
线程池任务等待队列长度,如果不明线程池如何使用请参考我总结的一片线程池的使用文档:线程池使用说明
/**
* 线程池最大数量
*/
public static final int MAX_EXCUTOR_SIZE= 20;
在线程池管理器中维护的可用线程池数量,默认配置为20个,未到达20个线程池时会继续创建,直到到达20个线程
当新任务发现没有可用线程池并且线程池已经创建满20个后会有一定获取策略:
1、尝试10次获取线程池
2、再持续等待50ms获取线程池
3、如果仍无法获得线程池资源执行失败策略(详见下面AFTER_TRY_BLOCK配置)
/**
* 通过尝试后无法获得线程池资源,是否挂起等待(false抛出异常)
*/
public static final boolean AFTER_TRY_BLOCK = false;
当无法获取线程池资源时执行的失败策略
true为继续等待(挂起直到有线程池资源的释放)
false为直接抛出异常,丢弃当前任务
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。