# juc-study **Repository Path**: ifyyf/juc-study ## Basic Information - **Project Name**: juc-study - **Description**: juc-study - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2021-11-09 - **Last Updated**: 2021-11-09 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Juc笔记 ## 前言 本课程学习与B站狂神说Java的[JUC并发编程](https://www.bilibili.com/video/BV1B7411L7tE) 本课程的代码都放在了我的[个人gitee仓库](https://gitee.com/ifyyf/juc-study)上了 ### 什么是JUC? - java.util.concurrent juc - java.util.concurrent.atomic 原子性 - java.util.concurrent.locks 锁 平时业务中可能用Thread 或者像Runnable接口实现,没有返回值,而且效率相对于callable较低 > java.util.concurrent > Interface Callable ### 进程与线程 我们都知道计算机的核心是CPU,它承担了所有的计算任务,而操作系统是计算机的管理者,它负责任务的调度,资源的分配和管理,统领整个计算机硬件 > - 进程是一个具有一定独立功能的程序在一个数据集上的一次动态执行的过程,是操作系统进行资源分配和调度的一个独立单位,是应用程序运行的载体 > - 线程是程序执行中一个单一的顺序控制流程,是程序执行流的最小单元,是处理器调度和分派的基本单位。一个进程可以有一个或多个线程,各个线程之间共享程序的内存空间(也就是所在进程的内存空间) 进程:一个程序,QQ.exe Music.exe 程序的集合; 一个进程往往可以包含多个线程,至少包含一个! Java默认有几个线程? 2 个 main、GC 线程:开了一个进程 Typora,写字,自动保存(线程负责的) 对于Java而言:Thread、Runnable、Callable **Java 真的可以开启线程吗? 开不了** ```java // 本地方法,调用了底层的C++,因为java运行在jvm虚拟机上,Java无法直接操作硬件 private native void start0(); ``` 线程有几个状态? `Thread.State可以看到,是一个枚举` ```java public enum State { //尚未启动的线程的线程状态 //新生 NEW, //运行 RUNNABLE, //阻塞 BLOCKED, //等待,死等 WAITING, //超时等待,过期不候 TIMED_WAITING, //终止 TERMINATED; } ``` ### 并发和并行 > - 并发是指两个或多个事件在同一时间间隔发生->交替进行 > - 一核cpu,模拟出来多个线程,快速交替运行 > - 并行是指两个或者多个事件在同一时刻发生 ->同时进行 > - 多核cpu,多个线程同时执行,线程池 并发编程的目标是充分的利用cpu的每一个核,以达到最高的处理性能 ```java //获取cpu的核数 System.out.println(Runtime.getRuntime().availableProcessors()); ``` #### wait/sleep 区别 **1、来自不同的类** wait => Object sleep => Thread **2、关于锁的释放** wait 会释放锁 sleep 睡觉了,抱着锁睡觉,不会释放! **3、使用的范围是不同的** wait必须在synchronized同步代码块中使用 sleep可以在任何地方睡 **4、是否需要捕获异常**(存疑) `throws InterruptedException` wait 也需要捕获异常(实测提示需要捕获异常,且不捕获会报错!) sleep 必须要捕获异常 ## Lock锁 只要是并发编程,就一定需要有锁! ### 传统的synchronized锁 > 此处不谈线程池,讲普通的方法 解耦线程类,不必要再去写一个单独的线程类继承Runnable接口 而是使用lambda表达式`()->{}`实现Runnable接口来创建线程 ```java new Thread(()->{ //do something },"Name").start(); ``` 然后synchronized锁方法上,锁住这个对象 ```java public synchronized void sale(){ if(num<=0)return; System.out.println(Thread.currentThread().getName()+" 买到第"+(num--)+"张票,剩下"+num+"张票"); } ``` 还是老生常谈的卖票 ```java public static void main(String[] args) throws InterruptedException { Ticket ticket = new Ticket(); new Thread(()->{ for(int i=0;i<100;i++){ ticket.sale(); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } },"A").start(); new Thread(()->{ for(int i=0;i<100;i++){ ticket.sale(); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } },"B").start(); new Thread(()->{ for(int i=0;i<100;i++){ ticket.sale(); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } },"C").start(); } static class Ticket{ private int num=100; public synchronized void sale(){ if(num<=0)return; System.out.println(Thread.currentThread().getName()+" 买到第"+(num--)+"张票,剩下"+num+"张票"); } } ``` ### Lock锁 `java.util.concurrent.locks.Lock`的Lock是一个接口 > 建议的做法是*始终*立即跟随`lock`与`try`块的通话 > > 最常见的是在之前/之后的上锁`lock.lock(); `和解锁`lock.unlock() ` 它有几个实现类: - ReentrantLock可重入锁 - ReentrantReadWriteLock.ReadLock读锁 - ReentrantReadWriteLock.WriteLock写锁 我们来看看可重入锁`ReentrantLock`的构造器 ```java //默认创建非公平锁Nonfair public ReentrantLock() { sync = new NonfairSync(); } //boolean参数为true创建公平锁Fair,反之创建非公平锁Nonfair public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } ``` > 公平锁:公平:需要先来后到 > 非公平锁:不公平:可以插队 (默认) 使用三部曲 1. `Lock lock=new ReentrantLock();`实例化锁对象 2. `lock.lock();`上锁 3. 在**finally中**`lock.unlock();`解锁 ```java static class Ticket{ private int num=100; //可重入锁 Lock lock=new ReentrantLock(); public void sale(){ //上锁 lock.lock(); try{ //do something 业务代码 if(num<=0)return; System.out.println(Thread.currentThread().getName()+" 买到第"+(num--)+"张票,剩下"+num+"张票"); }catch(Exception e){ e.printStackTrace(); }finally{ lock.unlock(); } } } ``` ### synchronized和lock的区别 1. Synchronized 内置的Java**关键字**, Lock 是一个Java**类** 2. Synchronized **无法判断获取锁的状态**,Lock **可以判断是否获取到了锁** 3. Synchronized 会**自动释放**锁,lock 必须要**手动释放**锁!如果不释放锁,**死锁** - 如果在Synchronized中出现异常,会自动释放锁 4. Synchronized 线程 1(获得锁,阻塞)、线程2(等待,傻傻的等);Lock锁就不一定会等待下去; - `lock.tryLock()`尝试获取锁 ,获取不到就自己掉头走了不会等下去 5. Synchronized 可重入锁,不可以中断的,非公平;Lock ,可重入锁,可以判断锁,非公平(可以自己设置) 6. Synchronized 适合锁少量的代码同步问题,Lock 适合锁大量的同步代码! ### 生产者、消费者问题 面试:单例模式、排序算法、生产消费者、死锁 #### 老版的synchronized实现 当num为0时,消费者等待,生产者生成消息 当num>=0时,生产者等待,消费者进行消费 我们先来看一下**这段问题代码** ```java public static void main(String[] args) throws InterruptedException { Data data = new Data(); new Thread(()->{ for(int i=0;i<100;i++){ try { data.pro(); } catch (InterruptedException e) { e.printStackTrace(); } } },"ProducerA").start(); new Thread(()->{ for(int i=0;i<100;i++){ try { data.con(); } catch (InterruptedException e) { e.printStackTrace(); } } },"ConsumerA").start(); } static class Data{ private int num=0; public synchronized void pro() throws InterruptedException { if(num!=0){ System.out.println(Thread.currentThread().getName()+"正在等待"); this.wait(); } num++; System.out.println(Thread.currentThread().getName()+" 生产者生产了一条消息,此时num="+num); this.notifyAll(); } public synchronized void con() throws InterruptedException { if(num==0){ System.out.println(Thread.currentThread().getName()+"正在等待"); this.wait(); } num--; System.out.println(Thread.currentThread().getName()+" 消费者消费了一条消息,此时num="+num); this.notifyAll(); } } ``` 这个时候代码会正确运行嘛,结论是会的 那如果我们**放置多个**producer和consumer呢? ```java new Thread(()->{ for(int i=0;i<100;i++){ try { data.pro(); } catch (InterruptedException e) { e.printStackTrace(); } } },"ProducerA").start(); new Thread(()->{ for(int i=0;i<100;i++){ try { data.pro(); } catch (InterruptedException e) { e.printStackTrace(); } } },"ProducerB").start(); new Thread(()->{ for(int i=0;i<100;i++){ try { data.con(); } catch (InterruptedException e) { e.printStackTrace(); } } },"ConsumerA").start(); new Thread(()->{ for(int i=0;i<100;i++){ try { data.con(); } catch (InterruptedException e) { e.printStackTrace(); } } },"ConsumerB").start(); ``` 可以看见有很大几率会**出问题** > ConsumerA 消费者消费了一条消息,此时num=-92 > ConsumerA 消费者消费了一条消息,此时num=-93 > ConsumerA 消费者消费了一条消息,此时num=-94 > ConsumerA 消费者消费了一条消息,此时num=-95 > ConsumerA 消费者消费了一条消息,此时num=-96 > ConsumerB 消费者消费了一条消息,此时num=-97 > ProducerB 生产者生产了一条消息,此时num=-96 > > 正在等待 这里出现的就是**虚假唤醒** 查看Object的wait方法的api文档可以看见 > 线程也可以唤醒,而不会被通知,中断或超时,即所谓的*虚假唤醒* > > 比如说买货,如果商品本来没有货物,突然进了一件商品,这是所有的线程都被唤醒了 ,但是只能一个人买,所以其他人都是假唤醒,获取不到对象的锁 > > 虽然这在实践中很少会发生,但应用程序必须通过测试应该使线程被唤醒的条件来防范,并且如果条件不满足则继续等待。 > > 换句话说,**等待应该总是出现在循环中** #### 为什么if块会存在虚假唤醒的情况? > 在if块中使用wait方法,是非常危险的,因为一旦线程被唤醒,并得到锁,就不会再判断if条件,而执行if语句块外的代码 > > 所以建议,凡是先要做条件判断,再wait的地方,都使用while循环来做 ```java synchronized (obj) { while () obj.wait(timeout); ... // Perform action appropriate to condition } ``` 所以我们将原有的代码将if改为while ```java public synchronized void pro() throws InterruptedException { while (num!=0){ System.out.println(Thread.currentThread().getName()+"正在等待"); this.wait(); } num++; System.out.println(Thread.currentThread().getName()+" 生产者生产了一条消息,此时num="+num); this.notifyAll(); } public synchronized void con() throws InterruptedException { while (num==0){ System.out.println(Thread.currentThread().getName()+"正在等待"); this.wait(); } num--; System.out.println(Thread.currentThread().getName()+" 消费者消费了一条消息,此时num="+num); this.notifyAll(); } ``` #### juc版的生产者和消费者实现 使用Lock和Condition两个接口 其中lock对象我们这使用`ReentrantLock`实例化 condition对象使用`lock.newCondition()`获取 > Condition实现可以提供Object监视器方法的行为和语义,例如有保证的通知顺序,或者在执行通知时不需要 > > 锁定 > > 一个Condition实例本质上绑定到一个锁。 要获得特定Condition实例的Condition实例,请使用其 > > newCondition()方法 ![](https://gitee.com/ifyyf/resource/raw/master/img/202111042344018.png) > Condition因素出Object监视器方法( wait,notify 和 notifyAll )成不同的对象,以得到具有多个等待集的每个对象,通过将它们与使用任意的组合的效果Lock个实现。 > > Lock替换synchronized方法和语句的使用, Condition取代了对象监视器方法的使用。 ```java static class Data{ private int num=0; //获取锁,取代了wait和notify Lock lock=new ReentrantLock(); //获取condition,取代synchronized Condition condition = lock.newCondition(); public void pro(){ lock.lock(); try{ while (num!=0){ System.out.println(Thread.currentThread().getName()+"正在等待"); //等待 condition.await(); } num++; System.out.println(Thread.currentThread().getName()+" 生产者生产了一条消息,此时num="+num); //唤醒 condition.signalAll(); }catch(Exception e){ e.printStackTrace(); }finally{ lock.unlock(); } } public void con(){ lock.lock(); try{ while (num==0){ System.out.println(Thread.currentThread().getName()+"正在等待"); condition.await(); } num--; System.out.println(Thread.currentThread().getName()+" 消费者消费了一条消息,此时num="+num); condition.signalAll(); }catch(Exception e){ e.printStackTrace(); }finally{ lock.unlock(); } } } ``` 也许会有人问:既然synchronized更简洁,这里反而还多加了一层condition,岂不是更麻烦了? **当然不是** #### Lock+Condition与synchronized的区别 设置多个Condition监视器可以实现**精准的通知**和**唤醒线程** > 个人理解:不用就等待,需要则唤醒 #### Condition监视器的精准唤醒 ```java import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @Author if * @Description: 使用Condition实现精准唤醒 * @Date 2021-11-05 上午 12:04 */ public class Test03 { public static void main(String[] args) { Data data=new Data(); new Thread(()->{ for(int i=0;i<10;i++){ data.soutA(); } },"A").start(); new Thread(()->{ for(int i=0;i<10;i++){ data.soutB(); } },"B").start(); new Thread(()->{ for(int i=0;i<10;i++){ data.soutC(); } },"C").start(); } static class Data{ private Lock lock=new ReentrantLock(); private Condition condition1=lock.newCondition(); private Condition condition2=lock.newCondition(); private Condition condition3=lock.newCondition(); private int num=1; public void soutA(){ lock.lock(); try{ //num不为1时,condition1等待 while(num!=1){ condition1.await(); } System.out.println("AAAAAA"); num=2; //A输出完后,唤醒condition2来输出B condition2.signal(); }catch(Exception e){ e.printStackTrace(); }finally{ lock.unlock(); } } public void soutB(){ lock.lock(); try{ //num不为2时,condition2等待 while(num!=2){ condition2.await(); } System.out.println("BBBBBB"); num=3; //B输出完后,唤醒condition3来输出C condition3.signal(); }catch(Exception e){ e.printStackTrace(); }finally{ lock.unlock(); } } public void soutC(){ lock.lock(); try{ //num不为3时,condition3等待 while(num!=3){ condition3.await(); } System.out.println("CCCCCC"); num=1; //B输出完后,唤醒condition1来输出A condition1.signal(); }catch(Exception e){ e.printStackTrace(); }finally{ lock.unlock(); } } } } ``` 结果 > AAAAAA > BBBBBB > CCCCCC > AAAAAA > BBBBBB > CCCCCC > 其实condition还有**awaitNanos超时等待**和awaitUntil超时时间等待,下文ArrayBlockingQueue会讲到 ### 八锁问题 #### 1.锁对象的同步锁synchronized ```java import java.util.concurrent.TimeUnit; /** * @Author if * @Description: * * 问题:main主线程中 没有 添加TimeUnit.SECONDS.sleep(1);之前,输出顺序? * 答题:看cpu的时间片分配,随机的(多开几个线程测试就行) * * 问题:mail方法中 没有 添加TimeUnit.SECONDS.sleep(4);之前,输出顺序? * 答题:先发邮件再打电话 * * 问题:mail方法中添加了TimeUnit.SECONDS.sleep(4);后,输出顺序? * 答题:还是先发邮件再打电话,因为synchronized不会释放锁,直到代码结束才释放 * * @Date 2021-11-05 上午 12:20 */ public class Test1 { public static void main(String[] args) { Phone phone = new Phone(); new Thread(()->{ phone.mail(); },"A").start(); try{ //main线程休眠 TimeUnit.SECONDS.sleep(1); }catch(Exception e){ e.printStackTrace(); } new Thread(()->{ phone.call(); },"B").start(); } static class Phone{ //synchronized锁住调用者(实例化的对象) public synchronized void mail(){ try{ //休眠了锁也没被释放 TimeUnit.SECONDS.sleep(4); }catch(Exception e){ e.printStackTrace(); } System.out.println("发邮件"); } public synchronized void call(){ System.out.println("打电话"); } } } ``` #### 2.synchronized和普通方法不同步 ```java import java.util.concurrent.TimeUnit; /** * @Author if * @Description: * * 问题:如果我将mail方法上synchronized,call方法不上,此时调用顺序 * 答题:先打电话再发邮件,因为call不需要获得锁就能执行,只是需要等待main线程的1秒睡眠 * * @Date 2021-11-05 上午 12:29 */ public class Test2 { public static void main(String[] args) { Phone phone = new Phone(); /** * 问题:假如现在有两个phone,且mail和call方法都是synchronized时,调用俩对象的方法时的执行顺序 * 答题:当然是按正常顺序来了,因为synchronized锁的调用者,即实例化对象,而这有两个不同的对象 */ // Phone phone1 = new Phone(); // Phone phone2 = new Phone(); // new Thread(()->{ // phone1.mail(); // },"A").start(); // try{ // //main线程休眠 // TimeUnit.SECONDS.sleep(1); // }catch(Exception e){ // e.printStackTrace(); // } // new Thread(()->{ // phone2.call(); // },"B").start(); new Thread(()->{ phone.mail(); },"A").start(); try{ //main线程休眠 TimeUnit.SECONDS.sleep(1); }catch(Exception e){ e.printStackTrace(); } new Thread(()->{ phone.call(); },"B").start(); } static class Phone{ //synchronized锁住调用者(实例化的对象) public synchronized void mail(){ try{ //休眠了锁也没被释放 TimeUnit.SECONDS.sleep(4); }catch(Exception e){ e.printStackTrace(); } System.out.println("发邮件"); } //这里没有上同步锁,当然不受影响了 public void call(){ System.out.println("打电话"); } } } ``` #### 3.锁class的同步锁synchronized ```java import java.util.concurrent.TimeUnit; /** * @Author if * @Description: * * 问题:mail和call方法改为static静态方法后,调用顺序 * 答题:先短信再电话,因为此时锁住的Phone.class,而不是实例化对象 * * 问题:实例化两个对象,phone1和phone2,调用顺序 * 答题:还是先短信再电话,锁住的是Phone.class,而不是实例化对象 * * * @Date 2021-11-05 上午 12:37 */ public class Test3 { public static void main(String[] args) { Phone phone = new Phone(); new Thread(()->{ phone.mail(); },"A").start(); try{ //main线程休眠 TimeUnit.SECONDS.sleep(1); }catch(Exception e){ e.printStackTrace(); } new Thread(()->{ phone.call(); },"B").start(); Phone phone1 = new Phone(); Phone phone2 = new Phone(); new Thread(()->{ phone1.mail(); },"A").start(); try{ //main线程休眠 TimeUnit.SECONDS.sleep(1); }catch(Exception e){ e.printStackTrace(); } new Thread(()->{ phone2.call(); },"B").start(); } static class Phone{ //synchronized锁住class,和对象相不相同没有关系,因为多个对象的class都是这同一个 public static synchronized void mail(){ try{ TimeUnit.SECONDS.sleep(4); }catch(Exception e){ e.printStackTrace(); } System.out.println("发邮件"); } public static synchronized void call(){ System.out.println("打电话"); } } } ``` #### 4.静态synchronized和普通synchronized锁 ```java import java.util.concurrent.TimeUnit; /** * @Author if * @Description: * * 问题:此时mail是静态synchronized方法,call是普通synchronized方法,问执行顺序? * 答题: * 先打电话再发邮件,因为两把锁不同,mail虽然先start,但是他锁住class且睡眠4秒 * 此时main线程的1秒已经结束,开始call的start,而call锁的对象锁,和mail的class锁不同 * 所以不需要同步,则执行call * * @Date 2021-11-05 上午 12:46 */ public class Test4 { public static void main(String[] args) { Phone phone = new Phone(); new Thread(()->{ phone.mail(); },"A").start(); try{ //main线程休眠 TimeUnit.SECONDS.sleep(1); }catch(Exception e){ e.printStackTrace(); } new Thread(()->{ phone.call(); },"B").start(); } static class Phone{ public static synchronized void mail(){ try{ TimeUnit.SECONDS.sleep(4); }catch(Exception e){ e.printStackTrace(); } System.out.println("发邮件"); } public synchronized void call(){ System.out.println("打电话"); } } } ``` #### 总结 > synchronized在**代码块**或者**普通方法**中,锁住的是**方法的调用者**(实例化对象) > > 在**静态方法**中,锁住的类的class > > 对象锁和class锁不同,所以不需要同步 ## 不安全的List类 我们之前使用的集合都是在单线程情况下,所以没有出现问题,但是其实很多都是不安全的 例如我们平时经常使用的`ArrayList` ```java //多线程下的ArrayList插入报错 // 并发修改异常:java.util.ConcurrentModificationException List stringList=new ArrayList<>(); for(int i=1;i<=100;i++){ new Thread(()->{ stringList.add(UUID.randomUUID().toString().substring(0,5)); System.out.println(stringList); },i+"").start(); } ``` 单线程玩多了,乍一看没什么问题,可是这是在多线程的情况下,就会出现并发修改异常 ![](https://gitee.com/ifyyf/resource/raw/master/img/202111051458013.png) ### 如何优化让他变成线程安全的呢? #### 1.使用Vector代替 Vector的增删改查都加上了同步锁synchronized,使得线程安全 但是效率怎么样呢?我们下文再说 ```java //多线程下的Vector没有报错,因为底层加了synchronized List stringVector=new Vector<>(); for(int i=1;i<=100;i++){ new Thread(()->{ stringVector.add(UUID.randomUUID().toString().substring(0,5)); System.out.println(stringVector); },i+"").start(); } ``` #### 2.使用Collections转synchronizedList 使用Collections.synchronizedList()方法将普通list转为线程安全的list ```java //如果我想用安全的list呢 //使用Collections.synchronizedList将普通list转为线程安全的list List stringList=new ArrayList<>(); List synchronizedList = Collections.synchronizedList(stringList); for(int i=1;i<=100;i++){ new Thread(()->{ synchronizedList.add(UUID.randomUUID().toString().substring(0,5)); System.out.println(synchronizedList); },i+"").start(); } ``` ### 如何既保证线程安全,效率也高呢 #### 使用JUC的CopyOnWriteArrayList JUC:使用CopyOnWriteArrayList,解决并发 > COW :写入时复制,一种优化策略 list是唯一固定的,多个线程读取时是固定的,但是写入时有可能会覆盖 COW写入时避免了覆盖,防止了数据问题 ```java /** * JUC:使用CopyOnWriteArrayList,解决并发 */ List list = new CopyOnWriteArrayList<>(); for(int i=1;i<=100;i++){ new Thread(()->{ list.add(UUID.randomUUID().toString().substring(0,5)); System.out.println(list); },i+"").start(); } ``` #### 怎么解决的? 写入时先复制一份长度+1的数组,然后末尾插入数据,再把数组赋给原数组完成插入 插入源码为例 ```java public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; setArray(newElements); return true; } finally { lock.unlock(); } } ``` #### CopyOnWriteArrayList比vector好在哪? Vector 的**增删改查**方法都加上了**synchronized锁**,保证同步的情况下,每个方法都要去获得锁,所以性能会下降 CopyOnWriteArrayList 方法只是在**增删改**方法上增加了**ReentrantLock锁** 但是他的**读方法不加锁**,==读写分离==,所以在读的方面就要比Vector性能要好 CopyOnWriteArrayList适合**读多写少**的并发情况 ## 不安全的Set类 和上面list差不多,我就不多做讲解了,直接贴代码 多线程下报错 ```java //并发修改异常:java.util.ConcurrentModificationException HashSet set = new HashSet<>(); for(int i=1;i<=100;i++){ new Thread(()->{ set.add(UUID.randomUUID().toString().substring(0,5)); System.out.println(set); },i+"").start(); } ``` 解决方案 ### 1.转synchronizedSet ```java HashSet set = new HashSet<>(); Set synchronizedSet = Collections.synchronizedSet(set); for(int i=1;i<=100;i++){ new Thread(()->{ synchronizedSet.add(UUID.randomUUID().toString().substring(0,5)); System.out.println(synchronizedSet); },i+"").start(); } ``` ### 2.使用CopyOnWriteArraySet ```java Set set = new CopyOnWriteArraySet<>(); for(int i=1;i<=100;i++){ new Thread(()->{ set.add(UUID.randomUUID().toString().substring(0,5)); System.out.println(set); },i+"").start(); } ``` ### 简单说明一下HashSet的实现 这里提一嘴HashSet的实现,说了不一定加分,但是说不出来一定扣分 本质就是就是**new的HashMap**,然后new的Object当做HashMap的value,add的参数当做key 因为是hash算法,所以HashSet是**无序的** 因为key不能重复,所以HashSet的的元素是**不能重复的** ```java private transient HashMap map; private static final Object PRESENT = new Object(); public HashSet() { map = new HashMap<>(); } public boolean add(E e) { return map.put(e, PRESENT)==null; } ``` ## 不安全的Map类 单线程中我们经常使用的HashMap在多线程下也是不安全的 ```java //并发修改异常:ConcurrentModificationException HashMap map = new HashMap<>(); for(int i=1;i<=100;i++){ new Thread(()->{ map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5)); System.out.println(map); }).start(); } ``` ### 1.用Hashtable代替 ```java Map hashtable = new Hashtable<>(); for (int i = 1; i <= 100; i++) { new Thread(() -> { hashtable.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5)); System.out.println(hashtable); }).start(); } ``` 和之前的Vector代替ArrayList一样,用synchronized简单粗暴的加上同步保证线程安全,只是效率可能会低一些 ### 2.转synchronizedMap ```java HashMap map = new HashMap<>(); Map synchronizedMap = Collections.synchronizedMap(map); for (int i = 1; i <= 100; i++) { new Thread(() -> { synchronizedMap.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5)); System.out.println(synchronizedMap); }).start(); } ``` ### 3.使用ConcurrentHashMap 使用java.util.concurrent.ConcurrentHashMap 并发的HashMap,在保证了线程安全的情况下也保证了效率的高效,推荐使用 对ConcurrentHashMap不熟悉的小伙伴可以看看我的[《简单谈谈ConcurrentHashMap》](https://blog.csdn.net/Jay_Chou345/article/details/121170366) ```java Map concurrentHashMap = new ConcurrentHashMap<>(); for (int i = 1; i <= 100; i++) { new Thread(() -> { concurrentHashMap.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5)); System.out.println(concurrentHashMap); }).start(); } ``` ## 走进Callable > 返回结果并可能引发异常的任务。 实现者定义一个没有参数的单一方法,称为call > > Callable接口类似于Runnable ,因为它们都是为其实例可能由另一个线程执行的类设计的 > > 然而,A Runnable不返回结果,也不能抛出被检查的异常 - 可以有返回值 - 可以抛出异常 - 方法不同 - Runnable是run() - Callable是call() ### 老版本创建线程的两种方式 #### 1.extends Thread ```java public static void main(String[] args) { //老版方式:线程类继承Thread重写run方法,然后启动 new MyThread().start(); } static class MyThread extends Thread{ @Override public void run() { System.out.println("class MyThread extends Thread"); } } ``` #### 2.实现Runnable接口 ```java public static void main(String[] args) { //老版方式:线程类实现Runnable的run,将实例化对象放入Thread参数启动线程 new Thread(new MyRun()).start(); } static class MyRun implements Runnable{ @Override public void run() { System.out.println("class MyRun implements Runnable"); } } ``` ### 使用Callable创建线程 我们这里需要一个适配类`FutureTask` 这个类实现了RunnableFuture类,`FutureTask implements RunnableFuture` RunnableFuture类继承了Runnable,`RunnableFuture extends Runnable, Future` 所以`Thread(Runnable target)`可以将其传入 注意:`futureTask.get()`可以**获取返回结果**,但是可能会抛异常,需要捕获或抛出 因为要等待执行完毕才返回,所以有可能会阻塞,最好把它放在最后,或者异步通信来处理 ```java public static void main(String[] args) { /** * 使用Callable * 线程类实现Callable实现call()方法,这个方法可以有返回值,就是定义的泛型类型 * 创建FutureTask,将线程类对象传入 * 将FutureTask对象传入Thread再启动 * * Thread(Runnable target) * FutureTask implements RunnableFuture * RunnableFuture extends Runnable, Future */ MyCall callable = new MyCall(); //适配类 FutureTask futureTask = new FutureTask<>(callable); new Thread(futureTask).start(); //获取返回结果,可能会抛异常 //可能会阻塞,因为要等待执行完毕,所以最好把他放在最后,或者异步通信来处理 try { Integer integer = futureTask.get(); System.out.println(integer); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } static class MyCall implements Callable{ @Override public Integer call() throws Exception { System.out.println("class MyCall implements Callable"); return 1024; } } ``` ### FutureTask的状态 如果我们此时用同一个FutureTask传入两条线程,会输出两次结果吗? ```java MyCall callable = new MyCall(); //适配类 FutureTask futureTask = new FutureTask<>(callable); //如果此时用同一个futureTask对象开启两条线程会有什么结果呢? //答案是:只会有一个线程进行运行,且只输出一次结果 //因为FutureTask的state的状态从初始化NEW变了完成状态COMPLETING //然后在run方法中判断不为NEW则直接返回不执行了 new Thread(futureTask,"A").start(); new Thread(futureTask,"B").start(); ``` 答案是:**不会**,只会执行一次! 为什么? 我们看看源码 可以看到FutureTask有一个state表示状态变量,还有很多int类型的常量表示具体状态 这里我们暂时只关注NEW和COMPLETING ```java public class FutureTask implements RunnableFuture { private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; ``` 在构造器中,默认给state为NEW ```java public FutureTask(Callable callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } ``` 第一条线程进入 在run方法中,执行了callable的call方法后,会将判断变量ran设置为true`if (ran) set(result);` 而在set方法中`UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)` 将NEW状态变为了COMPLETING 也就是说**此条FutureTask已经完成了他的使命,变为COMPLETING完成状态** 当下一条线程进来判断`state != NEW`时,直接return 所以执行了一次之后,其他的线程都无法继续执行run,也就是Callable的call方法了 所以可以得出结论:正常情况下,**一个FutureTask只能执行一次call** ```java public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } ``` ## 常用的辅助类 ### CountDownLatch减法计数器 java.util.concurrent.CountDownLatch **减法计数器** 允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助类 **可用于某些线程的强制执行** > CountDownLatch用给定的计数初始化。 await方法阻塞,直到由于countDown()方法的调用而导致当前计数达到零,之后所有等待线程被释放,并且任何后续的await 调用立即返回。 这是一个一次性的现象 - 计数无法重置。 如果您需要重置计数的版本,请考虑使用CyclicBarrier 。 > > A CountDownLatch是一种通用的同步工具,可用于多种用途。 一个CountDownLatch为一个计数的CountDownLatch用作一个简单的开/关锁存器,或者门:所有线程调用await在门口等待,直到被调用countDown()的线程打开。 一个CountDownLatch初始化N可以用来做一个线程等待,直到N个线程完成某项操作,或某些动作已经完成N次。 > > CountDownLatch一个有用的属性是,它不要求调用countDown线程等待计数到达零之前继续,它只是阻止任何线程通过await ,直到所有线程可以通过 ```java import java.util.concurrent.CountDownLatch; /** * @Author if * @Description: 倒计时锁存器(减法计数器) * 每次调用countDown方法进行-1 * 当总数不为0时,会一直阻塞下去 * 可以用于线程的强制执行(因为不执行会阻塞) * @Date 2021-11-06 上午 12:00 */ public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { //count总数,用于倒计时 //配合await方法,在倒计时结束前不会再向下执行代码 CountDownLatch countDownLatch = new CountDownLatch(12); for(int i=1;i<=6;i++){ try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第"+i+"次执行countDown"); countDownLatch.countDown(); // new Thread(()->{ // System.out.println(Thread.currentThread().getName()+"线程走了"); // //数量-1 // countDownLatch.countDown(); // },i+"").start(); } //等待计数器归零,然后再向下执行 countDownLatch.await(); System.out.println("关门"); } } ``` ### CyclicBarrier加法计数器 java.util.concurrent.CyclicBarrier **加法计数器** 允许一组线程全部等待彼此达到共同屏障点的同步辅助类 **可以用于某些线程的强制等待** > 循环阻塞在涉及固定大小的线程方的程序中很有用,这些线程必须偶尔等待彼此。 屏障被称为循环 ,因为它可以在等待的线程被释放之后重新使用。 > > > A CyclicBarrier支持一个可选的Runnable命令,每个屏障点运行一次,在派对中的最后一个线程到达之后,但在任何线程释放之前。 在任何一方继续进行之前,此屏障操作对更新共享状态很有用。 ```java import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * @Author if * @Description: 循环屏障(加法计数器) * 使用await方法阻塞当前线程 * 当执行的await方法次数达不到构造器传入的参数parties时,会一直阻塞下去 * @Date 2021-11-06 上午 12:08 */ public class CyclicBarrierDemo { public static void main(String[] args) throws InterruptedException, BrokenBarrierException { //集齐7颗龙珠召唤神龙 CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{ System.out.println("集齐7颗龙珠召唤神龙成功"); }); for(int i=1;i<=7;i++){ Thread.sleep(500); int finalI = i; new Thread(()->{ System.out.println(Thread.currentThread().getName()+"收集了第"+finalI+"颗龙珠"); try { //执行等待,直到等待了7次 cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } },i+"").start(); } } } ``` ### Semaphore信号量 一个计数信号量,在概念上,信号量维持一组许可证。 如果有必要,每个acquire()都会阻塞,直到许可证可用,然后才能使用它。 每个release()添加许可证,潜在地释放阻塞获取方 但是,没有使用实际的许可证对象; Semaphore只保留可用数量的计数,并相应地执行 **信号量通常用于限制线程数**,而不是访问某些(物理或逻辑)资源 我们这假设有3个车位和6辆车需要停车,所以只能有3台车能停进去,其他的车需要等待车位空出才能停 ```java import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; /** * @Author if * @Description: 信号量 * 参数permits许可证,可以用于限制线程数 * @Date 2021-11-06 上午 12:17 */ public class SemaphoreDemo { public static void main(String[] args) { //permits许可证 //我们这假设有3个车位和6辆车需要停车 Semaphore semaphore = new Semaphore(3); for(int i=1;i<=6;i++){ new Thread(()->{ try{ //acquire得到许可证 semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"线程抢到车位"); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName()+"线程离开车位"); }catch(Exception e){ e.printStackTrace(); }finally{ //release释放许可证 semaphore.release(); } },i+"").start(); } } } ``` ## ReadWriteLock读写锁 ReadWriteLock维护一对关联的locks ,一个用于只读操作,一个用于写入 读锁和写锁之间是**互斥的**,同一时间只能有一个在运行 **读的时候可以被多个线程同时读** **写的时候只能由一个线程来写** ```java import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * @Author if * @Description: 读写锁:主要防止多个线程同时写与同时读写导致幻读 * 读锁:共享锁 * 写锁:独占锁 * @Date 2021-11-06 上午 12:31 */ public class ReadWriteLockDemo { public static void main(String[] args) throws InterruptedException { MyCache myCache=new MyCache(); //10个线程只做写入 for(int i=1;i<=10;i++){ final int temp=i; new Thread(()->{ myCache.put(temp+"",temp); },"write"+i).start(); } //10个线程只做读取 for(int i=1;i<=10;i++){ int temp=i; new Thread(()->{ myCache.get(temp + ""); },"read"+i).start(); } } //自定义缓存 static class MyCache{ private volatile Map map=new HashMap<>(); //读写锁,可以通过writeLock和readLock获取写锁和读锁后,再进行上锁 private ReadWriteLock readWriteLock=new ReentrantReadWriteLock(); //写的时候,希望只能有一个线程操作 public void put(String key,Object value){ //从读写锁readWriteLock中获取写锁writeLock Lock writeLock = readWriteLock.writeLock(); //写锁进行上锁操作 writeLock.lock(); try{ System.out.println(Thread.currentThread().getName()+"写入key = "+key); map.put(key,value); System.out.println(Thread.currentThread().getName()+"写入完毕"); }catch(Exception e){ e.printStackTrace(); }finally{ //写锁进行解锁操作 writeLock.unlock(); } } //读的时候,希望每个线程都可以读 public void get(String key){ Lock readLock = readWriteLock.readLock(); readLock.lock(); try{ System.out.println(Thread.currentThread().getName()+"读取key = "+key+",value = "+map.get(key)); }catch(Exception e){ e.printStackTrace(); }finally{ readLock.unlock(); } } } } ``` ## 阻塞队列BlockingQueue ![](https://gitee.com/ifyyf/resource/raw/master/img/202111061328622.png) 什么时候会用到阻塞队列:多线程并发处理、线程池 和生产者消费者问题有点相似 - 写入:如果队列满了,就必须阻塞等待 - 取出:如果是队列是空的,必须阻塞等待生产 ### BlockingQueue的4组Api | 方式 | 抛出异常 | 不抛异常且有返回值 | 阻塞等待 | 超时等待 | | :--------: | :-------: | :----------------: | :------: | :-------: | | 添加 | add() | offer() | put() | offer(,,) | | 移除 | remove() | poll() | take() | poll(,) | | 判断队列首 | element() | peek | - | - | 添加操作都会进行判空,所以不能放null ```java checkNotNull(e); private static void checkNotNull(Object v) { if (v == null) throw new NullPointerException(); } ``` #### 1.抛出异常 设定好队列大小后,这些操作都会抛出异常 - 队列满再添加:`IllegalStateException: Queue full` - 队列空再取出:`NoSuchElementException` ```java //1、抛出异常 public static void throwException(){ //参数capacity表示队列大小 BlockingQueue blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.add("a")); System.out.println(blockingQueue.add("b")); System.out.println(blockingQueue.add("c")); //此时队首为a System.out.println("blockingQueue.element() = " + blockingQueue.element()); //队列大小设定为3,此时添加了第4个元素时会抛出异常 //java.lang.IllegalStateException: Queue full // System.out.println(blockingQueue.add("d")); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); //清空队列后,再remove抛出异常java.util.NoSuchElementException // System.out.println(blockingQueue.remove()); //清空队列后抛出NoSuchElementException System.out.println("blockingQueue.element() = " + blockingQueue.element()); } ``` 这里的情况和普通LinkedList的队列的异常一模一样 我们查看一下`ArrayBlockingQueue`的源码 可以看到`ArrayBlockingQueue`调用了他的父类`AbstractQueue`的add()方法 而这个**add方法调用的是offer方法**,并在添加失败时主动抛出异常 (implement BlockingQueue的offer方法) ```java public boolean add(E e) { return super.add(e); } public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } ``` **element方法也调用的是peek方法** 为空手动抛异常 ```java public E element() { E x = peek(); if (x != null) return x; else throw new NoSuchElementException(); } public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); // null when queue is empty } finally { lock.unlock(); } } ``` #### 2.不会抛出异常且有返回值 ```java //2.不抛出异常且有返回值 public static void noExceptionAndReturn(){ BlockingQueue blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.offer("a")); System.out.println(blockingQueue.offer("b")); System.out.println(blockingQueue.offer("c")); //此时队首为a System.out.println("blockingQueue.peek() = " + blockingQueue.peek()); //队列满返回false System.out.println(blockingQueue.offer("d")); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); //队列空,返回null System.out.println(blockingQueue.poll()); //队列空,无队首,返回null System.out.println("blockingQueue.peek() = " + blockingQueue.peek()); } ``` #### 3.阻塞等待 ```java //3、阻塞等待 public static void blockWait() throws InterruptedException { BlockingQueue blockingQueue = new ArrayBlockingQueue<>(3); System.out.println("开始put"); new Thread(()->{ try { blockingQueue.put("a"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(()->{ try { blockingQueue.put("b"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(()->{ try { blockingQueue.put("c"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); //给1秒时间让线程插满队列 TimeUnit.SECONDS.sleep(1); //此时队列已满,这个阻塞线程将被阻塞无法输出 new Thread(()->{ try { blockingQueue.put("d"); System.out.println(Thread.currentThread().getName()+"put完毕"); } catch (InterruptedException e) { e.printStackTrace(); } },"阻塞线程").start(); //救急线程将bq中take出一个元素,队列空一个位置,上一个阻塞线程可以完成put new Thread(()->{ try { blockingQueue.take(); System.out.println("再开一个"+Thread.currentThread().getName()+"take出来,阻塞线程能不能输出?可以"); } catch (InterruptedException e) { e.printStackTrace(); } },"救急线程").start(); System.out.println("主线程能不能输出?能输出,不影响,因为阻塞的是上上面的那个阻塞线程"); System.out.println("如果在主线程put的话,这里一样会被阻塞\n===================="); } ``` 因为`put()`和`take()`用了**Condition**监视器,调用`await`和`single`实现了**精准睡眠和唤醒** 下面是解析 **成员变量condition** 上文有讲到,这里不再赘述 `private final Condition notFull;` **put方法** `while (count == items.length)`队列满时,`notFull.await();`线程等待 enqueue方法中`notEmpty.signal();`唤醒阻塞的take线程 ```java public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); } ``` **take方法** `while (count == 0)`队列空时,`notEmpty.await();`线程等待 dequeue方法中,`notFull.signal();`唤醒阻塞的put线程 ```java public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; } ``` #### 4.超时等待 和阻塞等待类比就很好理解了 阻塞等待就是会一直**死等**,直到有其他线程操作队列才有可能被唤醒 超时等待在设定的时间内会等待,**超时则放弃** ```java //4、超时等待 public static void outTimeWait() throws InterruptedException { BlockingQueue blockingQueue = new ArrayBlockingQueue<>(3); //用的还是offer和poll方法,不过这次是带参数的 blockingQueue.offer("a"); blockingQueue.offer("b"); blockingQueue.offer("c"); System.out.println("普通offer方法,立即放弃->"+blockingQueue.offer("d")); //等待3秒后,若还是阻塞则放弃 //如果是不带参的方法会立即放弃 System.out.println("带参offer方法,等待后放弃->"+blockingQueue.offer("d",3,TimeUnit.SECONDS)); blockingQueue.poll(); blockingQueue.poll(); blockingQueue.poll(); System.out.println("普通poll方法,立即放弃->" + blockingQueue.poll()); //等待3秒后,还是阻塞则放弃 System.out.println("带参poll方法,等待后放弃->" + blockingQueue.poll(3,TimeUnit.SECONDS)); } ``` 我们来看看带参数offer源码 `long nanos = unit.toNanos(timeout);`获取了超时时间 `if (nanos <= 0)`判断计时是否结束 - nacos<0,倒计时结束,`return false;`放弃等待直接返回(心灰意冷) - nacos>0,还在计时`nanos = notFull.awaitNanos(nanos);`,调用condition的awaitNanos继续计时等待(抱有希望) ```java public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } } ``` ### 同步队列SynchronizedQueue 和其他的BlockingQueue不同,他不是用于储存元素 put了一个元素后就必须去take出来,不然就会等待(相当于只有1个空间的BlockingQueue?) ```java import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; /** * @Author if * @Description: 同步队列 * 和其他的BlockingQueue不同,他不是用于储存元素 * put了一个元素后就必须去take出来,不然就会等待(相当于只有1个空间的BlockingQueue?) * @Date 2021-11-06 下午 04:42 */ public class SynchronizedQueueTest { public static void main(String[] args) { BlockingQueue synchronousQueue = new SynchronousQueue<>(); new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+"put a"); synchronousQueue.put("a"); System.out.println(Thread.currentThread().getName()+"put b"); synchronousQueue.put("b"); System.out.println(Thread.currentThread().getName()+"put c"); synchronousQueue.put("c"); } catch (InterruptedException e) { e.printStackTrace(); } },"Thead-put").start(); new Thread(()->{ try { //给一点让他put的时间 TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName()+"take = " + synchronousQueue.take()); TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName()+"take = " + synchronousQueue.take()); TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName()+"take = " + synchronousQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } },"Thread-take").start(); } } ``` ## 线程池 平时使用线程时需要创建、销毁。十分浪费资源和时间 池化技术:事先准备好一些资源,有人要用,就来我这里拿,用完之后还给我 **线程池的好处**: 1. 降低资源的消耗 2. 提高响应的速度 3. 方便管理 **线程复用**、**可以控制最大并发数**、**管理线程** 线程池:**3大方法**、**7大参数**、**4种拒绝策略 ** 阿里java规范中关于线程池写到 > 【==强制==】线程池**不允许使用`Executors`去创建**,而是**通过`ThreadPoolExecutor`的方式** > > 这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 > > 说明: **Executors 返回的线程池对象的弊端**如下: > > - `FixedThreadPool`和`SingleThreadPool`: > - 允许的请求队列长度为`Integer.MAX_VALUE`, 可能会堆积大量的请求,从而导致OOM > - `CachedThreadPool `和`ScheduledThreadPool`: > - 允许的创建线程数量为`Integer.MAX_VALUE`,可能会创建大量的线程,从而导致OOM > > OOM:out of memory内存溢出 Executors也是new的ThreadPoolExecutor,只是默认规定了一些参数 ### 3大方法 #### 1、单线程执行器SingleThreadExecutor ```java //Single单个,创建单个线程处理 ExecutorService service1 = Executors.newSingleThreadExecutor(); try{ for(int i=1;i<=5;i++){ service1.execute(()->{ System.out.println(Thread.currentThread().getName()+"在执行"); }); } }catch(Exception e){ e.printStackTrace(); }finally{ service1.shutdown(); TimeUnit.SECONDS.sleep(1); System.out.println("========== service1已关闭 =========="); } ``` 执行结果 > pool-1-thread-1在执行 > pool-1-thread-1在执行 > pool-1-thread-1在执行 > pool-1-thread-1在执行 > pool-1-thread-1在执行 #### 2、固定线程池FixedThreadPool ```java //Fix固定,根据参数nThreads,创建固定的线程池大小 ExecutorService service2 = Executors.newFixedThreadPool(5); try{ for(int i=1;i<=10;i++){ service2.execute(()->{ System.out.println(Thread.currentThread().getName()+"在执行"); }); } }catch(Exception e){ e.printStackTrace(); }finally{ service2.shutdown(); TimeUnit.SECONDS.sleep(1); System.out.println("========== service2已关闭 =========="); } ``` 执行结果 > pool-2-thread-2在执行 > pool-2-thread-2在执行 > pool-2-thread-1在执行 > pool-2-thread-1在执行 > pool-2-thread-1在执行 > pool-2-thread-1在执行 > pool-2-thread-1在执行 > pool-2-thread-3在执行 > pool-2-thread-4在执行 > pool-2-thread-5在执行 #### 3、缓存线程池CachedThreadPool ```java //可伸缩的,遇强则强,遇弱则弱 ExecutorService service3 = Executors.newCachedThreadPool(); try{ for(int i=1;i<=10;i++){ //加入睡眠后发现全都是pool-3-thread-1在执行 //推断可能是同时存在大量并发时,才会有多个线程入池 // Thread.sleep(1); service3.execute(()->{ System.out.println(Thread.currentThread().getName()+"在执行"); }); } }catch(Exception e){ e.printStackTrace(); }finally{ service3.shutdown(); TimeUnit.SECONDS.sleep(1); System.out.println("========== service3已关闭 =========="); } ``` 执行结果 > pool-3-thread-1在执行 > pool-3-thread-2在执行 > pool-3-thread-3在执行 > pool-3-thread-4在执行 > pool-3-thread-5在执行 > pool-3-thread-7在执行 > pool-3-thread-8在执行 > pool-3-thread-9在执行 > pool-3-thread-2在执行 > pool-3-thread-6在执行 ### 7大参数 查看源码可以发现其实调用Executor的这3个方法中,也是new的ThreadPoolExecutor ```java public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); } public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); } ``` 只是他们默认给了一些固定的参数,例如`maximumPoolSize=Integer.MAX_VALUE` 俗话说得好,适合的才是最好的,有时候他默认给的参数并不一定是适合的,所以阿里java规范中让我们调用原生的线程池帮助类去创建线程池 ```java public ThreadPoolExecutor(int corePoolSize, //初始核心线程池大小 int maximumPoolSize, //最大线程池大小(核心线程不够,增加非核心线程) long keepAliveTime, //保持活跃时间(超时无调用的非核心线程则释放) TimeUnit unit, //超时时间的单位 BlockingQueue workQueue,//阻塞队列(候客区) ThreadFactory threadFactory,//线程工厂,用于创建线程的,一般用默认的 RejectedExecutionHandler handler)//拒绝策略,请求超出最大线程的承受且阻塞队列也占满时,将采用拒绝策略,默认为AbortPolicy(超出就不接收了并抛出异常) ``` #### 线程池运行原理简述 可以看到,请求打到线程池时,线程池首先根据初始化时创建的核心线程去处理请求,当核心线程都在使用时,接下来的请求将会放入阻塞队列中去 当核心线程都在处理,且阻塞队列占满时,会根据`maximumPoolSize`最大线程池大小继续创建非核心线程 > `keepAliveTime`和`unit`决定了非核心线程能够在没有业务调用时的存活时间 > > 非核心线程在`keepAliveTime`结束后会进行收回 如果所有线程(核心与非核心)都被取走使用,且阻塞队列也占满的情况下就会采取拒绝策略 这里默认的拒绝策略是`AbortPolicy`,超出承受范围就不接收,并抛出异常 ```java private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); //RejectedExecutionHandler类是接口类,有4种实现类,我们称之为4种拒绝策略 public static class CallerRunsPolicy implements RejectedExecutionHandler public static class AbortPolicy implements RejectedExecutionHandler public static class DiscardPolicy implements RejectedExecutionHandler public static class DiscardOldestPolicy implements RejectedExecutionHandler ``` #### 代码简单实现 我们这初始化2个核心线程,最大线程数为5,超时时间为5秒,阻塞队列大小为3,默认的线程工厂和中止策略 ```java import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * @Author if * @Description: 调用原生ThreadPoolExecutor创建线程池 * @Date 2021-11-07 下午 03:30 */ public class MyPool { public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2, 5, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); try{ for(int i=1;i<=9;i++){ threadPoolExecutor.execute(()->{ System.out.println(Thread.currentThread().getName()+"正在运行"); }); } }catch(Exception e){ e.printStackTrace(); }finally{ threadPoolExecutor.shutdown(); } } } ``` 结果显示,此时都是2个核心线程在运转 因为5个请求有3个放入了阻塞队列中 > pool-1-thread-1正在运行 > pool-1-thread-2正在运行 > pool-1-thread-1正在运行 > pool-1-thread-2正在运行 > pool-1-thread-1正在运行 此时我们将循环次数调到8次,可以看到已经创建多出了3,4,5线程,也就是根据最大线程数来决定创建的非核心线程(5-2=3),也就是说能够多创建出3个非核心线程来排忧解难 > pool-1-thread-2正在运行 > pool-1-thread-5正在运行 > pool-1-thread-3正在运行 > pool-1-thread-5正在运行 > pool-1-thread-1正在运行 > pool-1-thread-2正在运行 > pool-1-thread-3正在运行 > pool-1-thread-4正在运行 大家知道,我们最大有5个线程和队列中的3个位置,也就是一共可以并发8个请求,那我们将循环调到9次时会发生什么呢? >pool-1-thread-2正在运行 >pool-1-thread-4正在运行 >pool-1-thread-3正在运行 >pool-1-thread-1正在运行 >pool-1-thread-3正在运行 >pool-1-thread-4正在运行 >pool-1-thread-5正在运行 >pool-1-thread-2正在运行 >java.util.concurrent.RejectedExecutionException(一大段文字省略) 没错,就是当并发请求处理不过来的时候,我们选取的拒绝策略是`AbortPolicy中止策略`会直接拒绝请求并抛出异常`RejectedExecutionException` ### 4种拒绝策略 #### 1、中止策略AbortPolicy 当并发请求处理不过来的时候,`AbortPolicy中止策略`会直接丢弃任务并抛出异常`java.util.concurrent.RejectedExecutionException` #### 2、调用者运行策略CallerRunsPolicy 哪来的回哪去 线程池表示:我没资源继续处理你的请求了,谁将你放进来的就让谁去执行你把 > pool-1-thread-1正在运行 > main正在运行 > pool-1-thread-1正在运行 > pool-1-thread-3正在运行 > pool-1-thread-2正在运行 > pool-1-thread-3正在运行 > pool-1-thread-1正在运行 > pool-1-thread-5正在运行 > pool-1-thread-4正在运行 #### 3、丢弃策略DiscardPolicy 资源不足,直接丢弃任务且不抛出异常,(直接摆烂,哪有任务,我怎么不知道?) #### 4、饱和策略DiscardOldestPolicy 丢弃线程中旧的任务,将新的任务添加 **将最早进入队列的任务删除,之后再尝试加入队列** > 当任务被拒绝添加时,会抛弃任务队列中最旧的任务也就是最先加入队列的,再把这个新任务添加进去 > > 在rejectedExecution中先从任务队列中弹出最先加入的任务,空出一个位置,然后再次执行execute方法把任务加入队列 ```java public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } ``` ### 最大线程数如何定义? #### CPU密集型(CPU bound) CPU密集型也叫计算密集型,指的是系统的硬盘、内存性能相对CPU要好很多 此时,系统运作大部分的状况是CPU Loading100%,CPU要读/写I/O(硬盘/内存),I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU Loading很高。在多重程序系统中,大部份时间用来做计算、逻辑判断等CPU动作的程序称之CPU bound CPU bound的程序一般而言CPU占用率相当高。这可能是因为任务本身不太需要访问I/O设备,也可能是因为程序是多线程实现因此屏蔽掉了等待I/O的时间 线程数一般设置为: **线程数 = CPU核数+1 **(现代CPU支持超线程,利用等待空闲) #### IO密集型(I/O bound) IO密集型指的是系统的CPU性能相对硬盘、内存要好很多,此时,系统运作,大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,此时CPU Loading并不高。 I/O bound的程序一般在达到性能极限时,CPU占用率仍然较低。这可能是因为任务本身需要大量I/O操作,而pipeline做得不是很好,没有充分利用处理器能力。 线程数一般设置为: **线程数 = CPU总核心数 * 2 +1 ** java虚拟机的最大可用的处理器数量,决不会小于一个 ```java Runtime.getRuntime().availableProcessors() ``` 也可以任务管理器->性能->cpu->逻辑处理器,查看 ## 四大函数式接口 新时代的程序员需要掌握:lambda表达式、链式编程、函数式接口、Stream流式计算 ### 什么是函数式接口 **只有一个方法的interface接口** 典型的就是Runnable和Callable接口了 可以看到他们都是被注解`@FunctionalInterface`标注的,这个注解直译也是叫函数式接口 ```java @FunctionalInterface public interface Runnable { public abstract void run(); } @FunctionalInterface public interface Callable { V call() throws Exception; } ``` ### 1、函数型接口Function 有输入T,和输出R ```java @FunctionalInterface public interface Function { R apply(T t); } ``` ```java //普通调用,使用匿名内部类 Function function=new Function() { @Override public String apply(String s) { return s; } }; System.out.println(function.apply("test")); //lambda Function functionL= (str)->{return str;}; System.out.println(functionL.apply("lambda")); ``` 甚至还能更简易 ```java //更简易 Function functionLS= str-> str; System.out.println(functionLS.apply("simple")); ``` ### 2、断定型接口Predicate 有输入,返回boolean ```java @FunctionalInterface public interface Predicate { boolean test(T t); } ``` ```java Predicate predicate = new Predicate(){ @Override public boolean test(Integer num) { return num.equals(1); } }; System.out.println(predicate.test(2)); System.out.println(predicate.test(1)); Predicate predicateL= num-> num.equals(1); System.out.println("predicateL.test(1) = "+predicateL.test(1)); ``` ### 3、消费型接口Consumer 只有输入,没有返回值 ```java @FunctionalInterface public interface Consumer { void accept(T t); } ``` sout甚至可以更加简化`System.out::println` ```java Consumer consumer= str-> System.out.println(str); consumer.accept("out"); //极简版本 Consumer consumerS= System.out::println; consumerS.accept("out"); ``` ### 4、供给型接口Supplier 只有返回值,没有参数 ```java @FunctionalInterface public interface Supplier { T get(); } ``` ```java Supplier supplier= ()-> "asd"; System.out.println(supplier.get()); ``` ## Stream流式计算 ### stream流是io流? ![](https://gitee.com/ifyyf/resource/raw/master/img/202111071814485.png) **并不是!!!**这个是io流,是java.io包下的 而本处要讲的是Stream流,是java.util包下的 ```java import java.io.InputStream; import java.util.stream.Stream; ``` ### 什么是Stream流式计算? 大数据=计算+存储 存储:集合、mysql数据库。。。 计算:stream流 而这些流里面有很多很多的参数都是使用的函数式接口 话不多说,直接上代码 ```java import java.util.Arrays; import java.util.List; /** * @Author if * @Description: stream流学习 * 现在有5个用户进行筛选: * 1、ID必须是偶数 * 2、年龄必须大于23岁 * 3、用户名转为大写字母 * 4、用户名字母倒着排序 * 5、只输出一个用户 * @Date 2021-11-07 下午 06:12 */ public class Test01 { public static void main(String[] args) { User user1 = new User(1,"a",21); User user2 = new User(2,"b",22); User user3 = new User(3,"c",23); User user4 = new User(4,"d",24); User user5 = new User(6,"e",25); //将5个user转到list中 List list= Arrays.asList(user1,user2,user3,user4,user5); //计算交给stream流 list.stream() //ID必须是偶数 .filter(user-> user.getId()%2==0) //年龄必须大于23岁 .filter(user-> user.getAge()>23) //用户名转为大写字母 .map(user -> user.getName().toUpperCase()) //用户名字母倒着排序(可简化为Comparator.reverseOrder()) .sorted((u1,u2)->u2.compareTo(u1)) //只输出一个用户 .limit(1) .forEach(System.out::println); } } ``` ## ForkJoin分支合并计算 ### 什么是ForkJoin? ForkJoin在jdk1.7中,并行执行任务,提高效率,大数据量! 大数据:Map Reduce (把大任务拆分为小任务) ForkJoin本质:**分而治之** 一个大任务分为了许多的小任务,最后将结果汇总得到解答 ![](https://gitee.com/ifyyf/resource/raw/master/img/202111071842190.png) ForkJoin特点:**工作窃取** 当B线程更先完成时,A还未完成,那B就会去从A的任务里拿一些工作来做,帮助分担压力,提高效率 这里面维护的都是**双端队列Dequeue** ![](https://gitee.com/ifyyf/resource/raw/master/img/202111071843750.png) ### 如何使用ForkJoin 最好是**大数据量的情况下使用**才可以提升效率,小数据量的情况下还不如直接for循环 直接上代码,感觉方式像递归一样,加入了forkjoin的工作队列的机制 有3种调用 - public void execute(ForkJoinTask task),**直接调用,没有返回值** - public ForkJoinTask submit(ForkJoinTask task),提交**任务执行**,返回**得到ForkJoinTask类** - public final V get(),返回的ForkJoinTask类的**get方法可以得到执行结果** - public T invoke(ForkJoinTask task),**直接invoke得到执行结果** ```java import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; import java.util.stream.LongStream; /** * @Author if * @Description: What is it * @Date 2021-11-07 下午 06:46 */ public class ForkJoinDemo extends RecursiveTask { public static void main(String[] args) throws ExecutionException, InterruptedException { long start=1L; long end=10_0000_0000L; ForkJoinDemo forkJoinDemo = new ForkJoinDemo(start,end); ForkJoinPool forkJoinPool=new ForkJoinPool(); //执行任务,没有返回值 // forkJoinPool.execute(forkJoinDemo); //提交任务,获得返回值ForkJoinTask类,然后根据这个类的get获取结果 // ForkJoinTask submit = forkJoinPool.submit(forkJoinDemo); // System.out.println("submit.get() = " + submit.get()); //采用invoke唤醒可直接得到返回值,比上一种方法更简便 Long sum = forkJoinPool.invoke(forkJoinDemo); System.out.println("sum = " + sum); } private Long start; private Long end; private Long temp=10000L; public ForkJoinDemo(Long start, Long end) { this.start = start; this.end = end; } @Override protected Long compute() { //小于临界值时,采取普通循环 if((end-start) supplier` ```java @FunctionalInterface public interface Supplier { T get(); } ``` 就是说在其中执行业务逻辑 然后可以通过`成功回调whenComplete`和`失败回调exceptionally`来决定业务的处理 代码如下,注释写的很清楚了,这里排列出来更方便查看 - 成功回调whenComplete,无论是否异常都执行,所以判断t和u来决定代码逻辑 - 参数t,正常的返回结果,t不为null则正常执行,t为null时则执行失败(出现异常) - 参数u,错误信息,u为null则正常执行,否则就是异常信息 - 没有返回值(因为业务成功执行的返回值200在supplyAsync中已经写了) - 失败回调exceptionally,只有在出现异常时才会执行 - 参数e,一般是Exception e - 有返回值(一般根据异常的不同决定不同的返回值) ```java //有返回值的runAsync异步调用 //返回一个CompletableFuture类对象,用get获取最终结果 CompletableFuture result = CompletableFuture.supplyAsync(() -> { //执行的业务逻辑 System.out.println("CompletableFuture.supplyAsync中执行业务逻辑"); // int i=1/0; return 200; }) //成功回调,无论是否异常都执行,所以判断t和u来决定代码逻辑 .whenComplete((t, u) -> { //t不为null则正常执行 if(!Objects.isNull(t)&&Objects.isNull(u)){ System.out.println("t = " + t);//正常的返回结果 }else{ //u为null则正常执行,否则就是异常信息 System.out.println("异常信息u = " + u);//错误信息 } }) //失败回调,只有在出现异常时才会执行 .exceptionally((e) -> { //这里的参数一般是Exception e // e.printStackTrace(); System.out.println("异常回调->e.getMessage() = " + e.getMessage()); return 400; }); //获取 成功/异常 的结果 System.out.println("result.get() = "+result.get()); ``` ### 执行结果 #### 正常执行时 > CompletableFuture.supplyAsync中执行业务逻辑 > t = 200 > result.get() = 200 #### 执行失败时 > CompletableFuture.supplyAsync中执行业务逻辑 > 异常信息u = java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero > 异常回调->e.getMessage() = java.lang.ArithmeticException: / by zero > > result.get() = 400 ## 理解JMM 之前我们经常接触到JVM,Java Virtual Machine,Java虚拟机 那JMM又是什么呢? Java Memory Model,java内存模型 它是一个不存在的模型,相当于一种概念、一种约定 ### 关于JMM的一些同步的约定 1. 线程解锁前,必须把共享变量==立刻==刷新回主存 2. 线程加锁前,必须读取主存中的最新值到工作内存中! 3. 加锁和解锁是同一把锁 > JVM在设计时候考虑到,如果JAVA线程每次读取和写入变量都直接操作主内存,对性能影响比较大,所以每条线程拥有各自的工作内存,工作内存中的变量是主内存中的一份拷贝,线程对变量的读取和写入,直接在工作内存中操作, 而不能直接去操作主内存中的变量。但是这样就会出现一个问题,当一个线程修改了自己工作内存中变量,对其他线程是不可见的,会导致线程不安全的问题。因为JMM制定了一套标准来保证开发者在编写多线程程序的时候,能够控制什么时候内存会被同步给其他线程 我们来看下面两个线程对于主内存的操作 ![](https://gitee.com/ifyyf/resource/raw/master/img/202111081655984.png) ### 内存交互操作 内存交互操作有**8种**,虚拟机实现必须保证每一个操作都是**原子的**,**不可在分的** (对于double和long类型的变量来说,load、store、read和write操作在某些平台上允许例外) - **lock** (锁定):作用于主内存的变量,把一个变量标识为线程独占状态 - **unlock** (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定 - **read** (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用 - **load** (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中 - **use** (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令 - **assign** (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中 - **store** (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用 - **write**  (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中 JMM对这八种指令的使用,制定了如下规则: - 不允许read和load、store和write操作之一单独出现 - 即**使用了read必须load**,**使用了store必须write** - 不允许线程丢弃他最近的assign操作 - 即**工作变量的数据改变了之后,必须告知主存** - 不允许一个线程将没有assign的数据从工作内存同步回主内存 - 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量 - 就是对**变量实施use、store操作之前,必须经过assign和load操作** - 一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁 - 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,**在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值** - 如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量 - **对一个变量进行unlock操作之前,必须把此变量同步回主内存** ## Volatile Volatile 是 Java 虚拟机提供**轻量级的同步机制**,是一个java的关键字,但是volatile 并**不能**保证线程安全性 - 保证可见性 - 不保证原子性 - 禁止指令重排 ### 1、保证可见性 我们来看看这一个问题代码 ```java import java.util.concurrent.TimeUnit; /** * @Author if * @Description: What is it * @Date 2021-11-08 下午 06:11 */ public class JmmTest { private static int num=0; public static void main(String[] args) throws InterruptedException { new Thread(()->{ while(num==0){ } System.out.println("线程结束"); }).start(); TimeUnit.SECONDS.sleep(1); num=1; System.out.println("num = "+num); } } ``` 此时会输出“线程”结束的语句嘛?并不会 因为线程并不知道num已经被main线程改变了,工作内存中的num还是0,所以一直在循环 如果我们将num加上关键字volatile呢? `private static volatile int num=0;` 可以看到,线程结束了 > 线程结束 > num = 1 > > Process finished with exit code 0 ### 2、不保证原子性 > **ACID**,是指数据库管理系统(DBMS)在写入或更新资料的过程中,为保证事务(transaction)是正确可靠的,所必须具备的四个特性: > > - **原子性**(atomicity,或称不可分割性) > - 一个事务中的所有操作,**要么全部完成,要么全部不完成**,不会结束在中间某个环节 > - **一致性**(consistency) > - 在事务开始之前和事务结束以后,数据库的**完整性没有被破坏** > - **隔离性**(isolation,又称独立性) > - 数据库允许多个并发事务同时对其数据进行读写和修改的能力,隔离性可以**防止多个事务并发执行时由于交叉执行而导致数据的不一致 ** > - 事务隔离分为不同级别,包括读未提交(Read uncommitted)、读提交(read committed)、可重复读(repeatable read)和串行化(Serializable) > - **持久性**(durability) > - 事务处理结束后,**对数据的修改就是永久的**,即便系统故障也不会丢失 我们看一下下面的代码 ```java import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @Author if * @Description: What is it * @Date 2021-11-08 下午 06:27 */ public class VolatileTest { private static int num=0; private static volatile int vnum=0; public static void add(){ num++; vnum++; } private static int snum=0; public synchronized static void sAdd(){ snum++; } private static int lnum=0; private static Lock lock=new ReentrantLock(); public static void lAdd(){ lock.lock(); try{ lnum++; }catch(Exception e){ e.printStackTrace(); }finally{ lock.unlock(); } } public static void main(String[] args) { for (int i = 0; i < 20; i++) { new Thread(()->{ for (int j = 0; j < 1000; j++) { add(); sAdd(); lAdd(); } }).start(); } while (Thread.activeCount()>2){ //线程让出当前时间片给其他线程执行 Thread.yield(); } System.out.println("结束,普通的num = "+num+",加了volatile的vnum = "+vnum); System.out.println("结束,synchronized方法的snum = "+snum); System.out.println("结束,lock方法的lnum = "+lnum); } } ``` 可以看见,不管是普通的num还是加了volatile的vnum都是不是正确的20000结果 加了lock和synchronized的方法中的lnum和snum的结果是正确的20000结果 > 结束,普通的num = 19986,加了volatile的vnum = 19986 > 结束,synchronized方法的snum = 20000 > 结束,lock方法的lnum = 20000 因为**volatile不保证原子性**,`num++`自增也不是一个原子性操作 #### 反编译查看 我们使用`javap -c VolatileTest.class`命令反编译一下这个class文件查看一下 ```java public static void add(); Code: //获取static的num放入操作栈顶 0: getstatic #2 // Field num:I //把常量1放入操作栈顶 3: iconst_1 //当前操作栈顶中两个值相加并且把结果放入操作栈顶(num=num+1) 4: iadd //操作栈顶的结果赋值给static的num 5: putstatic #2 // Field num:I //下边是vnum的操作,和上面一样,这里不再赘述 8: getstatic #3 // Field vnum:I 11: iconst_1 12: iadd 13: putstatic #3 // Field vnum:I 16: return ``` #### 原子类Atomic 那如果要求不使用synchronized和lock怎么保证原子性实现这个方法? 使用java.util.concurrent.atomic包下的原子类 这些类的底层都直接和操作系统挂钩!在内存中修改值!Unsafe类是一个很特殊的存在! ![](https://gitee.com/ifyyf/resource/raw/master/img/202111082311348.png) 代码举例 ```java import java.util.concurrent.atomic.AtomicInteger; /** * @Author if * @Description: What is it * @Date 2021-11-08 下午 11:12 */ public class AtomicTest { private static AtomicInteger num=new AtomicInteger(0); public static void add(){ //自增并返回,CAS乐观锁 num.getAndIncrement(); } public static void main(String[] args) { long startTime=System.currentTimeMillis(); for (int i = 0; i < 20; i++) { new Thread(()->{ for (int j = 0; j < 1000; j++) { add(); } }).start(); } while(Thread.activeCount()>2){ Thread.yield(); } long endTime=System.currentTimeMillis(); System.out.println("程序运行时间: "+(endTime-startTime)+"ms"); //获得num的值并输出 System.out.println("num.get() = "+num.get()); } } ``` 结果正常达到20000且效率也不低 > 程序运行时间: 44ms > num.get() = 20000 我们来看看之前的synchronized和lock的 > 程序运行时间: 46ms > 结束,synchronized方法的snum = 20000 > > 程序运行时间: 47ms > 结束,lock方法的lnum = 20000 现在看起来三者效率都不错 当我们把循环次数提起来后 ```java for (int i = 0; i < 20000; i++) { new Thread(()->{ for (int j = 0; j < 10000; j++) { //do something } }).start(); } ``` > 程序运行时间: 2136ms > 结束,synchronized方法的snum = 200000000 > > 程序运行时间: 6287ms > 结束,lock方法的lnum = 200000000 > > 程序运行时间: 3881ms > num.get() = 200000000 可以看到是synchronized占优势,atomic其次,lock反而是时间最长的 ### 3、禁止指令重排 #### 什么是指令重排? 我们写的程序,计算机并不是按照你写的那样去执行的 为了性能考虑, 编译器和CPU可能会对指令**重新排序** 什么是指令重排:**不影响结果的前提下,对某些指令优先执行,提高效率** ![](https://img-blog.csdnimg.cn/20190227103318800.png) 源代码-->编译器优化的重排--> 指令并行也可能会重排--> 内存系统也会重排---> 执行 #### as-if-serial语义 > **不管怎么重排序,单线程程序的执行结果不能被改变** > > 编译器、runtime和处理器都必须遵守as-if-serial语义 处理器在进行指令重排的时候考虑:**数据之间的依赖性**! ```c int x = 1; // 1 int y = 2; // 2 x = x + 5; // 3 y = x * x; // 4 我们所期望的:1234 但是可能执行的时候回变成 2134 1324 不可能是 4123!因为y赋值依赖于x! ``` 只要加了volatile就可以避免指令重排 对于内存区的读写都加**内存屏障**:静止上下指令的顺序交换 作用: - 保证特定的操作的执行顺序! - 可以保证某些变量的内存可见性 (利用这些特性volatile实现了可见性) ![](https://gitee.com/ifyyf/resource/raw/master/img/202111082340450.png) **Volatile 是可以保持可见性。不能保证原子性,由于内存屏障,可以保证避免指令重排的现象产生!** ## 单例模式 关于单例模式的话,可以去看看我写的[单例模式的笔记](https://blog.csdn.net/Jay_Chou345/article/details/120395800) ## 理解CAS ### 什么是CAS? CAS,compare and swap的缩写,中文翻译成比较并交换 CAS是CPU的并发原语,是操作系统层面的原子性操作 我们查看AtomicInteger类中有一个`Unsafe`类 ```java public class AtomicInteger extends Number implements java.io.Serializable { private static final Unsafe unsafe = Unsafe.getUnsafe(); ``` 大家知道,java不能直接操作系统,而是通过关键字`native`操作C++来操作系统底层 而这个Unsafe类就是java留的“后门”,可以直接操作系统的内存 在讲cas之前,我想先看看`compareAndSet`方法,交换并赋值 ```java private volatile static AtomicInteger num=new AtomicInteger(0); num.compareAndSet(1,2); ``` 就是当num的值为1时,将其替换为2,其实CAS的结果和这个比较相似 我们来看一下上一小节的AtomicInteger是怎么原子性自增的 ```java private volatile static AtomicInteger num=new AtomicInteger(0); public static void add(){ //自增并返回 num.getAndIncrement(); } ``` 然后查看这个`getAndIncrement`方法 可以看到就是调用的Unsafe类的`getAndAddInt`方法 ```java private static final Unsafe unsafe = Unsafe.getUnsafe(); public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); } ``` 我们继续查看Unsafe类的`getAndAddInt`方法的源码 ```java public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; } ``` 也就是说我们调用了是将this(AtomicInteger对象),valueOffset(内存地址偏移值)和需要加的值i进行操作 简单来说其实也就是上一小节讲的 > 获取static的num放入操作栈顶 > > 把常量1放入操作栈顶 > > 当前操作栈顶中两个值相加并且把结果放入操作栈顶(num=num+1) > > 操作栈顶的结果赋值给static的num ![](https://gitee.com/ifyyf/resource/raw/master/img/202111091645424.png) ### 样例的CAS源码的简单解释 根据`var1实例对象`与其的`内存地址var2`可以取出现在元素的值`var5` 然后**循环**调用CAS操作`compareAndSwapInt`,去比较`var5`的值是否尚未发生改变,如果还是原值,则交换成新值 这个我们也称其为**自旋**操作,或者叫自旋锁 > 比较当前工作内存中的值和主内存中的值 > > 如果这个值是期望的,那么则执行操作! > > 如果不是就一直循环! ```java public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5); ``` **如果根据`var1`和`var2`取出来的值,还是与`var5`相同,那我就将`var5`替换为`var5 + var4`并返回** 方法调用的native,也就是CAS原语了,(这里var4就是1) ### CAS的缺点 - 循环会耗时 - 一次性只能保证一个共享变量的原子性 - ABA问题 #### 什么是ABA问题? 在其他线程不知情的情况,来了一手**狸猫换太子又换狸猫**,但是别的线程并不知道这个被替换过,也不知道这个狸猫是否还是原来的那个狸猫 好比现在A=1 然后B线程调用cas(1,3)和cas(3,1) 先把1换为了3,再把3换为了1 对于线程A来说,A的值还是1,但是他可能并不是原来的那个1了 对于基本类型来说没有太大影响,因为指向常量池的位置 如果是引用类型来说,可能就有问题了,传递的值也许没变,可是对象变了! ![](https://gitee.com/ifyyf/resource/raw/master/img/202111091703199.png) #### ABA问题的解决方法 带**版本号**的原子操作,详见下一节“原子引用” ## 原子引用 乐观锁的实现不仅只有CAS操作,还有一个**版本号机制**也可以实现 我们这里采用`AtomicStampedReference`类来实现版本号机制 需要注意的是:`compareAndSet`方法底层用的==判断相等,所以使用Integer的话只能使用缓存区间-128~127! ```java static final int low = -128; static final int high; assert IntegerCache.high >= 127; public static Integer valueOf(int i) { if (i >= IntegerCache.low && i <= IntegerCache.high) return IntegerCache.cache[i + (-IntegerCache.low)]; return new Integer(i); } ``` ### 代码实现 ```java import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicStampedReference; /** * @Author if * @Description: 版本号机制 * 注意如果这里写Integer的话,只能采用-127~128的区间 * 因为底层是采用的==判断!!!!! * expectedReference == current.reference * * @Date 2021-11-09 下午 04:33 */ public class CASDemo { public static void main(String[] args) { AtomicStampedReference atomicInteger = new AtomicStampedReference<>(1, 1); new Thread(()->{ //获取版本号 int stamp = atomicInteger.getStamp(); System.out.println("一开始的A - stamp = " + stamp); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("A -> "+atomicInteger.compareAndSet(1, 2, stamp, (stamp + 1))); System.out.println("结束的的A - stamp = " + atomicInteger.getStamp()); System.out.println("======================="); },"A").start(); new Thread(()->{ int stamp = atomicInteger.getStamp(); System.out.println("B - stamp = " + stamp); System.out.println("B -> "+atomicInteger.compareAndSet(1, 2, stamp, stamp + 1)); System.out.println("结束的的B - stamp = " + atomicInteger.getStamp()); System.out.println("======================="); },"B").start(); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("最后getStamp = "+atomicInteger.getStamp()); System.out.println("atomicInteger值 = "+atomicInteger.get(new int[]{atomicInteger.getStamp()})); } } ``` ## 各种锁的理解 ### 1、公平锁和非公平锁 这个咱们在学习Lock类时应该就接触到了 `Lock lock=new ReentrantLock();` 我们来看看可重入锁`ReentrantLock`的构造器 ```java //默认创建非公平锁Nonfair public ReentrantLock() { sync = new NonfairSync(); } //boolean参数为true创建公平锁Fair,反之创建非公平锁Nonfair public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } ``` > 公平锁:公平:需要先来后到 > 非公平锁:不公平:可以插队 (默认) ### 2、可重入锁 可重入:**某个线程已经获得某个锁,可以再次获取锁而不会出现死锁** **synchronized和ReentrantLock都是可重入的** - **隐式锁**(即synchronized关键字使用的锁)默认是可重入锁 - **显式锁**(即Lock)也有ReentrantLock这样的可重入锁 可重入锁的意义之一在于**防止死锁** 当然,有一次lock()也得要一次unlock(),即**加锁次数和释放次数要一样** > 实现原理实现是通过为每个锁关联一个请求计数器和一个占有它的线程 > > 当计数为0时,认为锁是未被占有的,线程请求一个未被占有的锁时,JVM将记录锁的占有者,并且将请求计数器置为1 > > 如果同一个线程再次请求这个锁,计数器将递增 > > 每次占用线程退出同步块,计数器值将递减。直到计数器为0,锁被释放 **现有阶段的锁默认都是可重入锁**(也称递归锁) 如果要实现不可重入的效果,可以自己设置一个继承Lock的类 > 成员变量绑定一个线程,第一次调用将当前线程赋值给绑定线程,然后后续的调用lock时,去判断绑定线程是不是当前线程,如果当前线程就是绑定线程则给他wait,在unlock方法中就清除绑定线程即可 > > 具体实现可以参考一下这篇博客https://blog.csdn.net/wb_zjp283121/article/details/88973970 ### 3、自旋锁 其实之前讲CAS的时候,就讲到了自旋操作 ```java public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; } ``` “自旋”可以理解为“自我旋转”,这里的“旋转”指“循环”,比如 while 循环或者 for 循环 **“自旋”就是自己在这里不停地循环,直到目标达成** 而不像普通的锁那样,如果获取不到锁就进入阻塞 > 非自旋锁和自旋锁最大的区别,就是如果它**遇到拿不到锁的情况,它会把线程阻塞,直到被唤醒。而自旋锁会不停地尝试** > > 自旋锁的好处在于,自旋锁用循环去不停地尝试获取锁,让线程始终处于 Runnable 状态,节省了线程状态切换带来的开销 > > 可是如果临界区很大,线程一旦拿到锁,很久才会释放的话,那就不合适用自旋锁,因为自旋会一直占用 CPU 却无法拿到锁,白白消耗资源 ### 4、死锁 #### 什么是死锁 死锁:**多线程下,由于竞争资源或者由于彼此通信而造成的一种阻塞现象,若无外力作用,它们都讲无法推进下去** 此时称系统处于死锁状态或系统产生了死锁,这些永远在相互等待的进程成为死锁进程 **出现死锁的条件** 1. 必须是两个或者两个以上进程(线程) 2. 必须有竞争资源 #### 一张图带你看懂死锁! ![](https://gitee.com/ifyyf/resource/raw/master/img/202111091853936.png) #### 代码示例 ```java import java.util.concurrent.TimeUnit; /** * @Author if * @Description: 死锁样例 * @Date 2021-11-09 下午 06:54 */ public class DeadLockDemo { public static void main(String[] args) { Object lockA=new Object(); Object lockB=new Object(); new Thread(()->{ synchronized (lockA){ System.out.println(Thread.currentThread().getName()+"获取到A锁"); try{ TimeUnit.SECONDS.sleep(1); }catch(Exception e){ e.printStackTrace(); } synchronized (lockB){ System.out.println(Thread.currentThread().getName()+"获取到B锁"); } } },"A").start(); new Thread(()->{ synchronized (lockB){ System.out.println(Thread.currentThread().getName()+"获取到B锁"); try{ TimeUnit.SECONDS.sleep(1); }catch(Exception e){ e.printStackTrace(); } synchronized (lockA){ System.out.println(Thread.currentThread().getName()+"获取到A锁"); } } },"B").start(); } } ``` 代码应该很清晰明了了,中间sleep是因为怕一个线程同时抢了两把锁导致不成功 A抢到A锁进入睡眠,B抢到B锁进入睡眠,然后A唤醒后尝试获取B锁,当然获取不到啊,B也尝试获取A锁,当然也获取不到,需要的锁都在对方的手中,自然陷入死锁 > A获取到A锁 > B获取到B锁 #### 如何排查死锁? - 定位进程号: - 在windows命令窗口,使用 `jps -l`  查看当前的java进程的pid,通过包路径很容易区分出自己开发的程序进程 - 找到线程状态和问题代码: - 查看到pid,输入`jstack -l 15528`,15528是进程pid ```powershell # 查看进程 >jps -l 14720 1464 org.jetbrains.jps.cmdline.Launcher 15528 com.ifyyf.test.deadlock.DeadLockDemo 4040 org.jetbrains.idea.maven.server.RemoteMavenServer36 9176 sun.tools.jps.Jps # 查看具体的错误信息(篇幅太长,随便挑一点) >jstack -l 15528 Found one Java-level deadlock: ============================= "B": waiting to lock monitor 0x0000000002e39fe8 (object 0x000000076b614298, a java.lang.Object), which is held by "A" "A": waiting to lock monitor 0x0000000002e3c928 (object 0x000000076b6142a8, a java.lang.Object), which is held by "B" Java stack information for the threads listed above: =================================================== "B": at com.ifyyf.test.deadlock.DeadLockDemo.lambda$main$1(DeadLockDemo.java:42) - waiting to lock <0x000000076b614298> (a java.lang.Object) - locked <0x000000076b6142a8> (a java.lang.Object) at com.ifyyf.test.deadlock.DeadLockDemo$$Lambda$2/1078694789.run(Unknown Source) at java.lang.Thread.run(Thread.java:748) "A": at com.ifyyf.test.deadlock.DeadLockDemo.lambda$main$0(DeadLockDemo.java:26) - waiting to lock <0x000000076b6142a8> (a java.lang.Object) - locked <0x000000076b614298> (a java.lang.Object) at com.ifyyf.test.deadlock.DeadLockDemo$$Lambda$1/990368553.run(Unknown Source) at java.lang.Thread.run(Thread.java:748) Found 1 deadlock. ``` ## 完结撒花 本篇juc的学习并没有特别深入,可以说是简单入个门吧 本篇代码都放在[我的gitee仓库](https://gitee.com/ifyyf/juc-study)了,需要的可以自取