# javaExecutorService线程池 **Repository Path**: dych/javaExecutorServiceXianChengChi ## Basic Information - **Project Name**: javaExecutorService线程池 - **Description**: 多线程分批处理数据 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 1 - **Created**: 2020-02-15 - **Last Updated**: 2020-12-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README package com.amarsoft.gci.proj.unit; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.commons.lang.StringUtils; import com.amarsoft.are.ARE; import com.amarsoft.are.util.DataConvert; import com.amarsoft.awe.util.ASResultSet; import com.amarsoft.awe.util.SqlObject; import com.amarsoft.cabp.AbstractCreditBatch; import com.amarsoft.cabp.utils.Constants; import com.xiaoniu.unipay.util.CYCTools; import com.xiaoniu.unipay.util.SendMessage; public class SendRemindMessage extends AbstractCreditBatch{ @Override protected String process() throws Exception { this.logger.info("发送催收提醒短信开始........."); // 查询总数 iCount = DataConvert.toInt((sqlca.getString(new SqlObject("select count(1) from sms_send_service where inputdate='"+this.deductDate+"' and status='0' order by sendtime ")))); //每批多少条 int batchCount = Integer.parseInt(this.getConfigParameter("sms_batch_count", "2000")); //固定大小的线程池 int threadCount = Integer.parseInt(this.getConfigParameter("sms_thread_count", "50")); int start = 1; int end = 1; int batch = (iCount / batchCount) + 1; this.logger.info("共有:《" + iCount + "》条数据, 每批:《" + batchCount + "》条数据, 分《" + batch + "》批发送...."); // 线程池 ExecutorService exec = Executors.newFixedThreadPool(threadCount); // 模拟batch个客户端访问 for (int i = 1; i <= batch; i++) { start = (i - 1) * batchCount; end = i * batchCount; if (end > iCount) { end = iCount; } if (end <= start) { break; } this.logger.info("分批处理数据, 批次号【:" + i + "】, start: " + start + " ---> end: " + end); Thread thread = new SmsThreadClass(start, end); exec.execute(thread); } exec.shutdown(); while (true) { if (exec.isTerminated()) { this.logger.info("多线程分批处理发送短信【结束】"); break; } Thread.sleep(500); } this.logger.info("发送催收提醒短信结束........."); return Constants.TASK_SUCCESS; } /** * 多线程操作类 * * @author xn034767 * */ public class SmsThreadClass extends Thread { int start = 0; int end = 0; public SmsThreadClass(int start, int end) { this.start = start; this.end = end; } @Override public void run() { try { ARE.getLog().info(">>>>>> BEGIN Thread start:" + start + ", end:" + end + ""); excultTimeTaskThread(start, end); } catch (Exception e) { e.printStackTrace(); } } } /** * 执行发送信 * * @param start 从哪开始 * @param end 到哪结束 * */ private void excultTimeTaskThread(int start, int end) { PreparedStatement ps = null; ResultSet rs = null; String serialno = ""; String phoneno = ""; String context = ""; String customername = ""; String contractno = ""; String bankno = ""; String itemdescribe = ""; String item = ""; String sendtime = ""; String sSMSReturn = ""; String smsType =""; //短信类型,如果是营销短信走另外渠道 add by fangding XNR-734 SimpleDateFormat formatter = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); String sDate = formatter.format(new Date()); //查出当天要发送短信的数据 String sql = "select * from (" + " select rownum as rn, sms.* from sms_send_service sms where inputdate=? and status='0' order by sendtime " + " ) tab where tab.rn > ? and tab.rn <= ?"; //更新发送成功状态和时间 String updateSql = " update sms_send_service set status = '1',up_date =? where serialno=?"; //更新发送失败状态和时间 String failSql = " update sms_send_service set status = '2',up_date =? where serialno=?"; try { ps = connection.prepareStatement(sql); ps.setString(1, this.deductDate); ps.setInt(2, start); ps.setInt(3, end); rs = ps.executeQuery(); while(rs.next()){ serialno = rs.getString("serialno"); phoneno = rs.getString("phoneno");//电话 context = rs.getString("context");//短信内容 customername = rs.getString("customername");//客户名 contractno = rs.getString("contractno");//合同号 bankno = rs.getString("messagename"); itemdescribe = rs.getString("describe"); item = rs.getString("item"); sendtime = rs.getString("sendtime"); smsType = rs.getString("attribute3"); //add by fangding XNR-734 String[] timeArray = sendtime.split(":"); Calendar calendar = Calendar.getInstance(); Calendar calendar1 = Calendar.getInstance(); calendar.set(calendar.get(calendar.YEAR), calendar.get(calendar.MONTH), calendar.get(calendar.DATE), Integer.parseInt(timeArray[0]), Integer.parseInt(timeArray[1])); if(calendar1.getTimeInMillis()>=calendar.getTimeInMillis()){ try { SendMessage sendMessage = new SendMessage(); if("MK".equals(smsType)){ String key = CYCTools.getPropertiesFileValue().getProperty("MK_Key");// 只查找短信营销key sSMSReturn = sendMessage.SendNote(phoneno, context,customername,contractno,bankno,itemdescribe,item,connection,key); }else{ sSMSReturn = sendMessage.SendNote(phoneno, context,customername,contractno,bankno,itemdescribe,item,connection); } ARE.getLog().info("调接口返回的值:------------"+sSMSReturn); if("0".equals(sSMSReturn)){ PreparedStatement psUpdateSql = connection.prepareStatement(updateSql); psUpdateSql.setString(1, sDate); psUpdateSql.setString(2, serialno); psUpdateSql.execute(); psUpdateSql.close(); connection.commit(); ARE.getLog().info("短信发送成功!!!"); }else{ PreparedStatement psUpdateSql = connection.prepareStatement(failSql); psUpdateSql.setString(1, sDate); psUpdateSql.setString(2, serialno); psUpdateSql.execute(); psUpdateSql.close(); connection.commit(); ARE.getLog().info("短信发送失败!!!"+sSMSReturn); } } catch (Exception e) { logger.info("["+serialno+"]短信发送失败:",e); } } } ps.close(); rs.close(); } catch (SQLException e) { e.printStackTrace(); } } /** * 获取系统配置参数 * * @param codeno * 对应数据CODE值 * @param defaultValue * 默认值 * @return 返回结果值 * @throws Exception * 异常 */ public String getConfigParameter(String itemno, String defaultValue) { // 1.先取默认值 String returnVal; try { String sSql = " select codeno,itemno,itemname,isinuse,inputuser,attribute1, inputorg,inputtime,remark from code_library where CODENO = 'SystemConfigParameter' and itemno = :itemno"; ASResultSet rs = sqlca.getASResultSet(new SqlObject(sSql).setParameter("itemno", itemno)); returnVal = defaultValue; if (rs.next()) { returnVal = rs.getString("attribute1"); // 2.取数据库的值 } if (StringUtils.isBlank(returnVal)) { returnVal = defaultValue; } rs.getStatement().close(); logger.info(String.format("配置项%s对应的值为:%s", itemno, returnVal)); return returnVal; } catch (Exception e) { logger.info(String.format("配置项%s对应的值为:%s", itemno, defaultValue)); logger.error("获取系统参数异常", e); return defaultValue; } } }