Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。
public interface Condition {
/**
* 当前线程进入等待状态直到被通知(signal)或者中断;
* 当前线程进入运行状态并从await()方法返回的场景包括:
*(1)其他线程调用相同Condition对象的signal/signalAll方法,并且当前线程被唤醒;
*(2)其他线程调用interrupt方法中断当前线程;
*/
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
/**
* 唤醒一个等待在Condition上的线程,被唤醒的线程在方法返回前必须获得与Condition对象关联的锁
*/
void signal();
/**
* 唤醒所有等待在Condition上的线程,能够从await()等方法返回的线程必须先获得与Condition对象关联的锁
*/
void signalAll();
}
与notify和wait是配合synchronized内置锁实现线程同步的基础设施一样,条件变量的signal和await方法也是用来配合锁(使用AQS实现的锁)实现线程间同步的基础设施。
不过与synchronized同时只能与一个共享变量的notify或wait方法实现同步不同,AQS的一个锁可以对应多个条件变量。
Condition是依赖于Lock对象的
public interface Lock {
Condition newCondition();
}
lock.newCondition是作用其实是new了一个在AQS内部声明的ConditionObject对象。
ConditionObject是同步器AbstractQueuedSynchronizer的内部类,因为Condition的操作需要获取相关联的锁。每个Condition对象都包含一个等待队列,该队列是Condition对象实现等待/通知功能的关键。不过AQS只提供了ConditionObject的实现,并没有提供newCondition
函数,该函数用来new一个ConditionObject对象,需要由AQS的子类来提供newCondition函数。
是一个FIFO的队列,但需要注意的是单向链表,队列中的每个节点复用了AQS中的Node的定义。 一个Condition包含一个等待队列,Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用await方法,将会以当前线程构造成节点,并将节点从尾部加入等待队列。这个操作不需要使用CAS保证,因为调用await()方法的线程必定是获取了锁的线程,也就是说这个过程是由锁来保证线程安全的。
调用Condition的await(或以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await方法返回时,当前线程一定获取了Condition相关联的锁。下面是await方法实现:
public final void await() throws InterruptedException {
//当前线程中断
if (Thread.interrupted())
throw new InterruptedException();
//将当前线程构造成CONDITION的节点加入条件队列末尾
Node node = addConditionWaiter();
//释放当前线程获取到的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//检测此节点的线程是否在同步队列上,如果不在,则说明该线程还不具备
//竞争锁的资格,则继续等待,直到检测到此节点在同步队列上
while (!isOnSyncQueue(node)) {
//调用park方法阻塞挂起当前线程
LockSupport.park(this);
//如果已经中断了,则退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
...
}
来看一下当一个线程调用条件变量的await()
方法而被阻塞后,如何将其放入条件队列
private Node addConditionWaiter() {
//尾节点
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
//Node的节点状态不为CONDITION,则表示该节点不处于等待状态,需要清除节点
if (t != null && t.waitStatus != Node.CONDITION) {
//清除条件队列中所有状态不为Condition的节点
unlinkCancelledWaiters();
t = lastWaiter;
}
//根据当前线程创建一个CONDITION的节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//下面代码就是将当前线程插入条件队列的末尾
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
然后调用fullyRelease
方法释放线程持有的锁:
final long fullyRelease(Node node) {
boolean failed = true;
try {
// 节点状态--其实就是持有锁的数量
long savedState = getState();
// 释放锁
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
Lock#lock()
方法,再调用await
方法Lock#lock
方法,直接调用await
方法,就会抛出IllegalMonitorStateException
异常然后需要注意isOnSyncQueue
这个方法,当第一次调用时,当前这个节点即是头节点也是尾节点,并且waitStatus == Node.CONDITION,所以第一个if条件成立,返回false,进入while循环,当前线程被挂起,知道它出现在同步队列,那什么情况它会出现在同步队列呢?就是调用了signal
后,该节点就会从条件队列头部移到同步队列尾部。
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//后续节点不为null,肯定在同步队列中
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
这个时候前两个if条件都不满足了(此时waitStatus已经被设置为0了),所以会进入下面这个方法:
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
可以看到,tail就是同步队列中的尾节点,而此时node就是同步队列的尾节点,所以第一个if成立,返回true,跳出while循环,继续执行await
后面的方法:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//此时开始往下执行,这时该节点已经在同步队列了
//可以竞争同步状态
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//清理下条件队列中的不是在等待条件的节点
//因为条件队列中的节点都是CONDITION状态
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
看下unlinkCancelledWaiters
这个方法:
//等待队列是一个单向链表,遍历链表将已取消等待的节点清除出去
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
//用于中间操作不需要跳过时,记录上一个Node节点
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
//如果节点的状态不是Node.CONDITION的话,这个节点就是被取消的
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
总结下这个流程:
当多个线程同时调用lock.lock()方法(不管是非公平锁还是公平锁都会调用到acquire方法,然后走AQS获取资源这套流程)获取锁时,只有一个线程能成功获取到锁,其他线程就会被转换为Node节点插入到lock锁对应的AQS阻塞队列里面,并做自旋CAS尝试获取锁。
如果获取到锁的线程又调用了对应的条件变量的await()
方法,则该线程就会转换为Node节点插入到条件变量对应的条件队列里面,并且释放获取到的锁。
这时候因为调用lock.lock()
方法被阻塞到AQS队列里面的一个线程会获取到被释放的锁,如果这个线程也调用了await方法,那么该线程也会被放入条件变量的条件队列中。
如果一个线程调用了条件变量的signal()
或者signalAll()
方法时,会把条件队列里面的一个或全部Node节点(头节点)移动到AQS的阻塞队列(尾部)里面,等待时间获取到锁。
public final void signal() {
//检测当前线程是否为拥有锁的线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//头节点,唤醒条件队列中的第一个节点
Node first = firstWaiter;
if (first != null)
//将条件队列的对头元素移动到AQS队列
doSignal(first);
}
doSignal代码如下:
private void doSignal(Node first) {
do {
//修改头节点,将旧的头节点移出
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
transferForSignal
方法将节点移动到同步队列中:
final boolean transferForSignal(Node node) {
//将该节点从状态CONDITION改变为初始状态0,
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//将节点加入到syn队列中去,返回的是syn队列中node节点前面的一个节点
Node p = enq(node);
int ws = p.waitStatus;
//如果结点p的状态为cancel 或者修改waitStatus失败,则直接唤醒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
总结下这个流程:
Node.CANCEL
,或者修改状态为Node.SIGNAL
失败时,则直接调用LockSupport唤醒该节点的线程一个线程获取锁后,通过调用await
方法,会将当前线程先加入到条件队列中,然后释放锁,最后通过isOnSyncQueue
方法,不断自检看节点是否已经在同步队列了,如果是则尝试获取锁,否则一直挂起。
当线程调用signal
方法后,首先检查当前线程是否已经获取了锁,然后通过doSignal
方法唤醒等待队列的首节点。被唤醒的线程,将从await
方法中的while循环中退出来,然后调用acquireQueued
方法竞争同步状态。
一个锁对应一个AQS阻塞队列,对应多个条件变量,每个条件变量有自己的一个条件队列:
ReentrantLock是可重入的独占锁。同时只能有一个线程获取到锁,其他没有获取到锁的线程会被放入到AQS阻塞队列中。
可重入:任意线程获取到锁后能够再次获取该锁而不会被锁所阻塞。这个特性的实现需要解决两个问题:
1、线程再次获取锁:锁需要去识别获取锁的线程是否为当前占有锁的线程,如果是,则获取成功。
2、锁的最终释放:锁被重复获取的次数等于0时表示锁已经成功释放。
类图
从类图中可以看到,ReentrantLock还是使用AQS实现的。根据参数可以决定其内部是一个公平还是非公平锁:
//默认是非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
获取锁的入口如下:
public void lock() {
sync.lock();
}
lock方法会委托给AbstractQueueSynchronized
的子类Sync
abstract void lock();
Sync有两个实现类,分别是公平锁和非公平锁,首先看下非公平锁的实现逻辑
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
//如果CAS设置state的值为1成功,则设置锁的拥有者为当前线程
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//否则调用AQS的acquire方法
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
假设现在有2个线程,线程1CAS
设置重入次数为1成功后返回,此时线程2尝试CAS
失败,就会调用acquire
方法,在讲AQS的时候说过,该方法会首先调用子类重写的tryAcquire
方法,也就是上面的第二个方法,然后进入下面这个方法:
final boolean nonfairTryAcquire(int acquires) {
//获取当前线程
final Thread current = Thread.currentThread();
//获取同步状态
int c = getState();
//如果为0,说明锁空闲
if (c == 0) {
//CAS设置状态值
if (compareAndSetState(0, acquires)) {
//设置为当前线程所有
setExclusiveOwnerThread(current);
return true;
}
}
//如果当前线程还是持有锁的线程,说明是锁重入
else if (current == getExclusiveOwnerThread()) {
//状态值继续加1
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//否则说明当前线程不是持有锁的线程,返回false,加入AQS阻塞队列
return false;
}
这里有三种情况:
1、就是上面说的,线程2此时不会走if和else if,直接返回false,被加入到AQS阻塞队列中
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
2、线程1又调用了lock
方法,那么此时线程1会走else if逻辑,代表发生了锁重入
3、如果线程1持有的状态值减到0了,那么就会释放资源,并且线程1是头节点(持有锁的节点),而线程2是它的后继节点,那么根据AQS中分析的,线程2调用tryAcquire
方法后,就会走第一个if逻辑,将当前持有锁的线程设置为线程2。
非公平的体现:还是假设2个线程,在2个线程执行之前,已经有一个线程获取到锁了,所以线程A执行上面的代码后返回false,被加入到阻塞队列,这时候线程B执行到第一个if语句的时候,占有锁的线程刚好释放了锁,那么线程B就能成功获取到锁,但是明明是线程A先请求获取锁的,这就是非公平的体现。之所以会这样,是因为线程B在获取锁之前并没有查看当前AQS队列中是否有比自己更早请求锁的线程。
公平锁的实现在FairSync
重写的tryAcquire
方法:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//公平性策略
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
与非公平锁不一样的地方在于多了一个判断
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
通过方法名可知这个方法的作用是判断是否有排队的前驱节点,如果返回true就代表有,否则就是没有
一、返回true的情况
(s=h.next)==null
表示的是什么意思:在AQS中说过节点入队的时候,第一个元素入队是2步操作:首先创建一个哨兵节点,然后将第一个元素插入哨兵节点的后面,也就是说,在执行到这里的时候,如果刚好有第一个元素正在入队(具体可以看enq函数,当时看这段代码完全懵逼,Doug Lea是真牛逼),那么返回true二、返回false的情况
公平与否是针对获取锁而言的,如果一个锁是公平的,那么锁的获取顺序就应符合请求锁的绝对时间顺序,也就是FIFO。
下面通过一段代码测试公平锁和非公平锁在获取锁时的区别:
public class FairAndUnfairTest {
//公平锁
private static Lock fairLock = new ReentrantLock2(true);
//非公平锁
private static Lock unfairLock = new ReentrantLock2(false);
//公平锁测试
public void fair() throws InterruptedException {
testLock("公平锁",fairLock);
}
//非公平锁测试
public void unfair() throws InterruptedException {
testLock("非公平锁",unfairLock);
}
//启动线程
private void testLock(String type, Lock lock) throws InterruptedException {
System.out.println(type);
for (int i = 0; i < 5; i++) {
Thread thread=new Thread(new Job(lock)){
@Override
public String toString() {
return getName();
}
};
thread.setName(""+i);
thread.start();
}
Thread.sleep(11000);
}
//重写实现ReentrantLock类是为了重写getQueuedThreads方法,便于观察
private static class ReentrantLock2 extends ReentrantLock {
public ReentrantLock2(boolean fair) {
super(fair);
}
/**
* 返回正在获取等待锁的列表(获取同步队列中的线程)
*
* @return
*/
@Override
protected Collection<Thread> getQueuedThreads() {
List<Thread> arrayList = new ArrayList<>(super.getQueuedThreads());
//因为getQueuedThreads()方法是从后往前遍历添加到集合的,所以需要反转输出
Collections.reverse(arrayList);
return arrayList;
}
}
private static class Job implements Runnable {
private Lock lock;
public Job(Lock lock) {
this.lock = lock;
}
@Override
public void run() {
//连续打印两次才能看出效果,一次的话两者没区别
for (int i = 0; i < 2; i++) {
lock.lock();
try{
Thread.sleep(1000);
//((ReentrantLock2)lock).getQueuedThreads()),这里一定要注意必须这样写
//如果这里改成new ReentrantLock2(fair)的话,那么和传进来的lock是不相关的,不会输出结果
//父类转换成子类,就可以调用子类的方法
System.out.println("Lock by["+ Thread.currentThread().getName()+"]"+
",Waiting by"+((ReentrantLock2)lock).getQueuedThreads());
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
}
输出结果如下:
公平锁
Lock by[0],Waiting by[1, 2, 4, 3]
Lock by[1],Waiting by[2, 4, 3, 0]
Lock by[2],Waiting by[4, 3, 0, 1]
Lock by[4],Waiting by[3, 0, 1, 2]
Lock by[3],Waiting by[0, 1, 2, 4]
Lock by[0],Waiting by[1, 2, 4, 3]
Lock by[1],Waiting by[2, 4, 3]
Lock by[2],Waiting by[4, 3]
Lock by[4],Waiting by[3]
Lock by[3],Waiting by[]
非公平锁
Lock by[0],Waiting by[1, 2, 3, 4]
Lock by[0],Waiting by[1, 2, 3, 4]
Lock by[1],Waiting by[2, 3, 4]
Lock by[1],Waiting by[2, 3, 4]
Lock by[2],Waiting by[3, 4]
Lock by[2],Waiting by[3, 4]
Lock by[3],Waiting by[4]
Lock by[3],Waiting by[4]
Lock by[4],Waiting by[]
Lock by[4],Waiting by[]
其中每个数字代表一个线程,可以发现,公平锁每次都是从队列中的第一个节点获取到锁,而非公平锁会出现一个线程连续获取到锁的情况,这会导致其它线程出现饥饿情况。既然如此,为什么非公平锁还会被设定为默认的实现呢?
如果把每次不同线程获取到锁定义为1次切换,那么会发现公平锁发生了10次切换,而非公平锁发生了5次切换,说明非公平锁的开销更小。
总结:公平锁保证了锁的获取按照FIFO原则,但代价就是进行大量的线程切换。非公平锁虽然可能造成线程“饥饿”,但极少的线程切换,保证了其更大的吞吐量。
释放锁入口:
public void unlock() {
sync.release(1);
}
会调用到AQS的release
方法,然后进入子类重写的tryRelease
方法:
protected final boolean tryRelease(int releases) {
//锁的可重入次数减1
int c = getState() - releases;
//释放的不是持有锁的线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//如果锁重入次数为0了,说明当前线程放弃了对该锁的使用权
//其他锁可以获取同步状态了
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
//否则仅仅是将可重入次数减1
setState(c);
//只有可重入次数减为0才会返回true
return free;
}
假设线程1、线程2、线程3同时获取独占锁ReentrantLock,如果线程1获取成功了,那么线程2和线程3就会加入到AQS阻塞队列中。假设线程1获取锁后调用了对应的锁创建的条件变量1,那么线程1会释放锁,然后转换为Node节点加入到条件变量对应的条件队列中。此时线程2和线程3就有机会获取到锁,如果使用的是公平锁模式,那么线程2就会获取到锁,从而从AQS队列移除线程2对应的Node节点。
当持有锁的线程长时间不释放时,可以调用interrupt
方法中断当前线程,前提是获取锁的方法调用的是lockInterruptibly
Synchronized中的锁是非公平的,ReentrantLock还支持公平锁,不过性能会下降
见上图
虽然ReentrantLock是Synchronized的超集,但是如果两者都可以满足需求的情况下,优先推荐使用Synchronized:
类图
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。