# Java高级之并发工具类 **Repository Path**: moyu3390/java-high-concurrent-class ## Basic Information - **Project Name**: Java高级之并发工具类 - **Description**: java并发工具类的使用 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2023-05-30 - **Last Updated**: 2023-05-30 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ### Java并发工具类 #### Semaphore + Semaphore也是一个线程同步的辅助类,可以维护当前访问自身的线程个数,并提供了同步机制。使用Semaphore可以控制同时访问资源的线程个数,例如,实现一个文件允许的并发访问数。 + 主要方法: 1. void acquire():从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。 2. void release():释放一个许可,将其返回给信号量。 3. int availablePermits():返回此信号量中当前可用的许可数。 4. boolean hasQueuedThreads():查询是否有线程正在等待获取。 ##### 代码示例 ``` /** * @Author felix fei * @Date 2021/4/27 0027 14:21 * @Version 1.0.1 * @describe: 设定信号量,只有获得信号量的线程才能够往后执行业务逻辑,没有获得信号量的线程只能阻塞等待唤醒重新尝试获得信号量,可用于服务限流 */ public class SemaphoreTest { private int count = 0; private Logger log = LoggerFactory.getLogger(getClass()); @Test public void test1() { ExecutorService service = Executors.newCachedThreadPool(); final Semaphore sp = new Semaphore(3);//创建Semaphore信号量,初始化许可大小为3,同时可有三个线程进行同时访问 for (int i = 0; i < 10; i++) { try { Thread.sleep(100); } catch (InterruptedException e2) { e2.printStackTrace(); } Runnable runnable = new Runnable() { @Override public void run() { try { sp.acquire();//请求获得许可,如果有可获得的许可则继续往下执行,许可数减1。否则进入阻塞状态 } catch (InterruptedException e1) { e1.printStackTrace(); } log.info("线程" + Thread.currentThread().getName() + "进入,当前已有" + (3 - sp.availablePermits()) + "个并发"); try { // 执行具体的业务 count++; Thread.sleep((long) (Math.random() * 10000)); } catch (InterruptedException e) { e.printStackTrace(); } log.info("线程" + Thread.currentThread().getName() + "即将离开"); sp.release();//释放许可,许可数加1 //下面代码有时候执行不准确,因为其没有和上面的代码合成原子单元 log.info("线程" + Thread.currentThread().getName() + "已离开,当前已有" + (3 - sp.availablePermits()) + "个并发"); } }; service.execute(runnable); } log.info("计算之后的count={}", count); } //停车场同时容纳的车辆10 private static Semaphore semaphore = new Semaphore(10); @Test public void test2() { //模拟100辆车进入停车场 for (int i = 0; i < 100; i++) { Thread thread = new Thread(new Runnable() { @Override public void run() { try { System.out.println("====" + Thread.currentThread().getName() + "来到停车场"); if (semaphore.availablePermits() == 0) { System.out.println("车位不足,请耐心等待"); } semaphore.acquire();//获取令牌尝试进入停车场 System.out.println(Thread.currentThread().getName() + "成功进入停车场"); Thread.sleep(new Random().nextInt(10000));//模拟车辆在停车场停留的时间 System.out.println(Thread.currentThread().getName() + "驶出停车场"); semaphore.release();//释放令牌,腾出停车场车位 } catch (InterruptedException e) { e.printStackTrace(); } } }, i + "号车"); thread.start(); } } } ``` #### Executors +  主要用来创建线程池,代理了线程池的创建,使得你的创建入口参数变得简单 + newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 + newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。 + newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。 + newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO,LIFO, 优先级)执行。 ##### 代码示例 ``` public class ExecutorsTest { public static void main(String[] args) { // 1、newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { cachedThreadPool.execute(new Runnable() { @Override public void run() { // 这里会发现结果有重复的线程ID System.out.println("Thread:" + Thread.currentThread().getId() + ";newCachedThreadPool"); } }); } // 2、newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。 ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2); for (int i = 0; i < 10; i++) { newFixedThreadPool.execute(new Runnable() { @Override public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } // 这里会发现两个两个执行完后才继续执行,并且线程ID不变 System.out.println("Thread:" + Thread.currentThread().getId() + ";newFixedThreadPool"); } }); } // 3、newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。 ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(2); // 延迟3秒钟后执行任务 newScheduledThreadPool.schedule(new Runnable() { @Override public void run() { System.out.println("运行时间: " + new Date()); } }, 3, TimeUnit.SECONDS); // 延迟1秒钟后每隔3秒执行一次任务 newScheduledThreadPool.scheduleAtFixedRate(new Runnable() { @Override public void run() { System.out.println("运行时间: " + new Date()); } }, 1, 3, TimeUnit.SECONDS); // 4、newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。 ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { newSingleThreadExecutor.execute(new Runnable() { @Override public void run() { // 这里会发现结果有重复的线程ID System.out.println("Thread:" + Thread.currentThread().getId() + ";newSingleThreadExecutor"); } }); } } } ``` #### CyclicBarrierTest +  栅栏屏障,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行 ##### 代码示例 ``` public class CyclicBarrierTest { public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(10); for (int i = 0; i < 10; i++) { new Thread(new Runnable() { public void run() { try { Thread.sleep(new Random().nextInt(1000) + 1000); System.out.println("Thread:" + Thread.currentThread().getId() + "已经准备好"); cyclicBarrier.await(); System.out.println("Thread:" + Thread.currentThread().getId() + "开始出发"); } catch (Exception e) { e.printStackTrace(); } } }).start(); } } } ``` #### CountDownLatchTest + 能够使一个线程等待其他线程完成各自的工作后再执行 ##### 代码示例 ``` public class CountDownLatchTest { @Test public void test1() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(2); new Task1(countDownLatch).start(); new Task2(countDownLatch).start(); countDownLatch.await(); System.out.println("起锅烧油"); } public static class Task1 extends Thread{ CountDownLatch countDownLatch; public Task1(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void run() { System.out.println("正在洗菜中...."); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("菜已洗好!"); countDownLatch.countDown(); } } public static class Task2 extends Thread{ CountDownLatch countDownLatch; public Task2(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void run() { System.out.println("正在煮饭中...."); try { Thread.sleep(6000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("饭已煮好!"); countDownLatch.countDown(); } } } ```