# 手撕线程池 **Repository Path**: bossDuy/hand-tearing-thread-pool ## Basic Information - **Project Name**: 手撕线程池 - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-05-23 - **Last Updated**: 2025-05-23 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 手写一个简单的线程池 ## 理解线程池的原理 线程池就是为了减少频繁的创建和销毁线程带来的性能损耗,工作原理: ![image-20250523180633812](D:\Typora\imgs\image-20250523180633812.png) 简单的说:线程池就是有一个存放线程的集合和一个存放任务的阻塞队列。当提交一个任务的时候,判断核心线程是否满了,没满就会创建一个核心线程加入线程池并且执行任务,核心线程是不会被销毁的即使没有任务执行;满了就会放入任务队列等待;如果队列满了的话就会创建非核心线程进行执行任务,这些非核心线程在不执行任务的时候就会等一段时间销毁(配置的过期时间),如果创建的线程达到了最大线程数,那么就会执行拒绝策略。 可以简要整理如下: ``` 提交任务 -> 核心任务是否已满 为满,创建核心线程并执行任务 已满,则加入任务队列 队列未满 -> 等待执行 队列已满 -> 创建非核心线程 达到线程最大数量 -> 拒绝策略 未达到最大数量 -> 执行任务 ``` ## 自己实现简单的线程池 ### 第一步:实现了一个线程复用的线程池 ```java package com.yb0os1; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class MyThreadPool { //1、线程什么时候创建? /** 核心线程中我们要保证线程是可以复用的,那么就不可以直接new Thread(task).start(); 这样执行完task线程就会被销毁了 我们将接收到的任务对象放到队列中,然后线程从队列中取出任务,通过任务的run方法进行调用,这样就是在该线程上调用任务,并且调用完后不会销毁线程 */ //2、我们一开始使用 while (true) if(!tasks.isEmpty()) Runnable task = tasks.remove(0); /** 这样如果任务队列一直为空就会一循环,消耗cpu资源。此时就是阻塞队列出现了,当为空阻塞等待 非空执行 */ BlockingQueue taskQueue = new ArrayBlockingQueue<>(1024); Thread thread = new Thread(()->{ while (true){ if(!taskQueue.isEmpty()){ try { Runnable task = taskQueue.take(); task.run(); } catch (InterruptedException e) { throw new RuntimeException(e); } } } },"唯一线程"); { thread.start();//启动线程 } public void execute(Runnable task){ taskQueue.offer(task);//向队列添加元素 尽量是否offer 满则返回false add满则排除异常 } } ``` ```java package com.yb0os1; public class Main { public static void main(String[] args) { MyThreadPool myThreadPool = new MyThreadPool(); for (int i = 0; i < 5; i++) { myThreadPool.execute(()->{ try { Thread.sleep(1000); } catch (InterruptedException e) { //InterruptedException这个是线程中断异常, // 这个异常一般都是线程在等待或者阻塞中被中断了就会抛出的, // sleep wait等等都是有,除了LockSupport.park 这个会记录中断位 不会抛出这个异常 e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"执行完毕"); }); } System.out.println("主线程没有被阻塞"); } } ``` ### 第二步:实现多个线程复用的线程池 ```java package com.yb0os1; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public class MyThreadPool { //任务队列 private final BlockingQueue taskQueue; //核心线程的数量 private final int corePoolSize; //最大线程的数量 private final int maxPoolSize; private final int keepAliveTime; private final TimeUnit unit; public MyThreadPool(int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, BlockingQueue taskQueue) { this.corePoolSize = corePoolSize; this.maxPoolSize = maxPoolSize; this.keepAliveTime = keepAliveTime; this.unit = unit; this.taskQueue = taskQueue; } //核心线程 List coreList = new ArrayList<>(); //非核心线程 List supportList = new ArrayList<>(); //添加元素和判断长度不是原子的,所以存在线程安全问题 可以加锁 CAS等解决 public void execute(Runnable command) { //目前线程列表中线程数量小于核心线程的数量,则创建线程 if (coreList.size() < corePoolSize) { Thread thread = new CoreThread(); coreList.add(thread); thread.start(); // return; } //成功添加到阻塞队列 if (taskQueue.offer(command)) { return; } //任务队列也满了 需要创建非核心线程 //核心线程满 任务队列满 但是非核心线程没有满才可以添加 if (coreList.size() + supportList.size() < maxPoolSize) { Thread thread = new SupportThread(); supportList.add(thread); thread.start(); return; } //我们创建完线程之后 并没有处理刚才的command 不能确定是否队列真的满了 if (!taskQueue.offer(command)) { //真的满了 抛出异常 throw new RuntimeException("线程池已满"); } } class CoreThread extends Thread { @Override public void run() { while (true) { try { Runnable task = taskQueue.take(); task.run(); } catch (InterruptedException e) { throw new RuntimeException(e); } } } } class SupportThread extends Thread { @Override public void run() { while (true) { try { Runnable command = taskQueue.poll(keepAliveTime, unit);//等待一秒没有获取就会返回null if (command == null) {//线程结束 break; } command.run(); } catch (InterruptedException e) { throw new RuntimeException(e); } } System.out.println(Thread.currentThread().getName()+"非核心线程结束"); supportList.remove(Thread.currentThread()); System.out.println("当前非核心线程数量为:" + supportList.size()); } } } ``` ```java package com.yb0os1; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) { MyThreadPool myThreadPool = new MyThreadPool(2,4,1, TimeUnit.SECONDS,new ArrayBlockingQueue<>(2)); for (int i = 0; i < 4; i++) { myThreadPool.execute(()->{ try { Thread.sleep(1000); } catch (InterruptedException e) { //InterruptedException这个是线程中断异常, // 这个异常一般都是线程在等待或者阻塞中被中断了就会抛出的, // sleep wait等等都是有,除了LockSupport.park 这个会记录中断位 不会抛出这个异常 e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"执行完毕"); }); } System.out.println("主线程没有被阻塞"); } } ``` 存在问题,任务没有被正确的执行: b站评论区指出的:`if (blockingQueue.offer(command)) { return; } `这里如果任务成功放入队列,方法就直接 return 了。 但在 创建 SupportThread 的逻辑中,没有保证这个任务会被执行,因为 offer() 失败后你才创建新线程。 但 command 并没有交给这个新线程,而是再次尝试 offer(),如果失败就直接走拒绝策略了。 这样的话,可能 SupportThread 已经启动,但任务却没被执行。 理解:如果队列满了,我们创建非核心线程,但是并没有将这任务直接交给我们创建的新线程,而是再次尝试加入队列中,这就导致了一个不确定的状态: 1. 如果此时队列还是满的(`offer` 返回 `false`),就会直接抛出异常,任务未被执行 2. 如果队列此时恰好有空间(可能因为其他线程刚刚完成了任务,从而腾出了队列空间),那么任务会被放入队列,后续由某个线程(可能是核心线程,也可能是其他非核心线程)从队列中取出并执行。但新创建的非核心线程可能并没有真正处理这个任务。 解决方案:如果队列满了,我们要创建非核心线程并且由这个线程执行任务 也可以说 让线程执行当前 command 之后,再从 queue 中拿任务 ### 第三步:修复bug 设计拒绝策略 ```java package com.yb0os1; import com.yb0os1.reject.DiscardRejectHandle; import com.yb0os1.reject.ThrowRejectHandle; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) { MyThreadPool myThreadPool = new MyThreadPool(2,4,1, TimeUnit.SECONDS,new ArrayBlockingQueue<>(2),new DiscardRejectHandle()); for (int i = 0; i < 8; i++) { int finalI = i; myThreadPool.execute(()->{ try { Thread.sleep(100); } catch (InterruptedException e) { //InterruptedException这个是线程中断异常, // 这个异常一般都是线程在等待或者阻塞中被中断了就会抛出的, // sleep wait等等都是有,除了LockSupport.park 这个会记录中断位 不会抛出这个异常 e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"执行完毕---"+ finalI); }); } System.out.println("主线程没有被阻塞"); } } ``` ```java package com.yb0os1; import com.yb0os1.reject.RejectHandle; import java.sql.SQLOutput; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public class MyThreadPool { public BlockingQueue getTaskQueue() { return taskQueue; } //任务队列 private final BlockingQueue taskQueue; //核心线程的数量 private final int corePoolSize; //最大线程的数量 private final int maxPoolSize; private final int keepAliveTime; private final TimeUnit unit; //拒绝策略 private final RejectHandle rejectHandle; public MyThreadPool(int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, BlockingQueue taskQueue, RejectHandle rejectHandle) { this.corePoolSize = corePoolSize; this.maxPoolSize = maxPoolSize; this.keepAliveTime = keepAliveTime; this.unit = unit; this.taskQueue = taskQueue; this.rejectHandle = rejectHandle; } //核心线程 List coreList = new ArrayList<>(); //非核心线程 List supportList = new ArrayList<>(); //添加元素和判断长度不是原子的,所以存在线程安全问题 可以加锁 CAS等解决 public void execute(Runnable command) { //目前线程列表中线程数量小于核心线程的数量,则创建线程 if (coreList.size() < corePoolSize) { Thread thread = new CoreThread(command); coreList.add(thread); thread.start(); return; } //成功添加到阻塞队列 if (taskQueue.offer(command)) { return; } //任务队列也满了 需要创建非核心线程 //核心线程满 任务队列满 但是非核心线程没有满才可以添加 if (coreList.size() + supportList.size() < maxPoolSize) { Thread thread = new SupportThread(command); supportList.add(thread); thread.start(); return; } //我们创建完线程之后 并没有处理刚才的command 不能确定是否队列真的满了 if (!taskQueue.offer(command)) { //真的满 使用拒绝策略 rejectHandle.reject(command,this); } } //优先处理传过来的 然后再去阻塞队列中获取 class CoreThread extends Thread { private final Runnable command; CoreThread(Runnable command) { this.command = command; } @Override public void run() { command.run(); while (true) { try { Runnable task = taskQueue.take(); System.out.println("核心线程"+Thread.currentThread().getName()+"正在执行任务"); task.run(); } catch (InterruptedException e) { throw new RuntimeException(e); } } } } class SupportThread extends Thread { private final Runnable command; SupportThread(Runnable command) { this.command = command; } @Override public void run() { command.run(); while (true) { try { Runnable command = taskQueue.poll(keepAliveTime, unit);//等待一秒没有获取就会返回null if (command == null) {//线程结束 break; } command.run(); } catch (InterruptedException e) { throw new RuntimeException(e); } } System.out.println(Thread.currentThread().getName()+"非核心线程结束"); supportList.remove(Thread.currentThread()); // System.out.println("当前非核心线程数量为:" + supportList.size()); } } } ``` ```java package com.yb0os1.reject; import com.yb0os1.MyThreadPool; public interface RejectHandle { void reject(Runnable command, MyThreadPool myThreadPool); } ``` ```java package com.yb0os1.reject; import com.yb0os1.MyThreadPool; public class DiscardRejectHandle implements RejectHandle{ @Override public void reject(Runnable command, MyThreadPool myThreadPool) { myThreadPool.getTaskQueue().poll(); System.out.println("任务被丢弃"); } } ``` ```java package com.yb0os1.reject; import com.yb0os1.MyThreadPool; public class ThrowRejectHandle implements RejectHandle{ @Override public void reject(Runnable command, MyThreadPool myThreadPool) { throw new RuntimeException("线程池已满"); } } ```