1 Star 0 Fork 0

cxylk / Java-Notes

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
锁接口和类.md 26.91 KB
一键复制 编辑 原始数据 按行查看 历史
cxylk 提交于 2023-02-23 14:24 . 并发

Condition

Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁

public interface Condition {
	/**
	* 当前线程进入等待状态直到被通知(signal)或者中断;
	* 当前线程进入运行状态并从await()方法返回的场景包括:
	*(1)其他线程调用相同Condition对象的signal/signalAll方法,并且当前线程被唤醒;		
	*(2)其他线程调用interrupt方法中断当前线程;
	*/
    void await() throws InterruptedException;
    
    void awaitUninterruptibly();

    long awaitNanos(long nanosTimeout) throws InterruptedException;

    boolean awaitUntil(Date deadline) throws InterruptedException;

    /**
    * 唤醒一个等待在Condition上的线程,被唤醒的线程在方法返回前必须获得与Condition对象关联的锁
    */
    void signal();
	
    /**
    * 唤醒所有等待在Condition上的线程,能够从await()等方法返回的线程必须先获得与Condition对象关联的锁
    */
    void signalAll();
}

与notify和wait是配合synchronized内置锁实现线程同步的基础设施一样,条件变量的signal和await方法也是用来配合锁(使用AQS实现的锁)实现线程间同步的基础设施。

不过与synchronized同时只能与一个共享变量的notify或wait方法实现同步不同,AQS的一个锁可以对应多个条件变量

Condition是依赖于Lock对象的

public interface Lock {
     Condition newCondition();
}

lock.newCondition是作用其实是new了一个在AQS内部声明的ConditionObject对象。

Condition的实现分析

​ ConditionObject是同步器AbstractQueuedSynchronizer的内部类,因为Condition的操作需要获取相关联的锁。每个Condition对象都包含一个等待队列,该队列是Condition对象实现等待/通知功能的关键。不过AQS只提供了ConditionObject的实现,并没有提供newCondition函数,该函数用来new一个ConditionObject对象,需要由AQS的子类来提供newCondition函数

等待队列

​ 是一个FIFO的队列,但需要注意的是单向链表,队列中的每个节点复用了AQS中的Node的定义。 一个Condition包含一个等待队列,Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用await方法,将会以当前线程构造成节点,并将节点从尾部加入等待队列。这个操作不需要使用CAS保证,因为调用await()方法的线程必定是获取了锁的线程,也就是说这个过程是由锁来保证线程安全的

等待

​ 调用Condition的await(或以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await方法返回时,当前线程一定获取了Condition相关联的锁。下面是await方法实现:

public final void await() throws InterruptedException {
  					//当前线程中断
            if (Thread.interrupted())
                throw new InterruptedException();
            //将当前线程构造成CONDITION的节点加入条件队列末尾
            Node node = addConditionWaiter();
            //释放当前线程获取到的锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
  					//检测此节点的线程是否在同步队列上,如果不在,则说明该线程还不具备
  					//竞争锁的资格,则继续等待,直到检测到此节点在同步队列上
            while (!isOnSyncQueue(node)) {
                //调用park方法阻塞挂起当前线程
                LockSupport.park(this);
              	//如果已经中断了,则退出
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
    		...
}

来看一下当一个线程调用条件变量的await()方法而被阻塞后,如何将其放入条件队列

        private Node addConditionWaiter() {
          	//尾节点
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            //Node的节点状态不为CONDITION,则表示该节点不处于等待状态,需要清除节点
            if (t != null && t.waitStatus != Node.CONDITION) {
              	//清除条件队列中所有状态不为Condition的节点
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //根据当前线程创建一个CONDITION的节点
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            //下面代码就是将当前线程插入条件队列的末尾
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

然后调用fullyRelease方法释放线程持有的锁:

final long fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 节点状态--其实就是持有锁的数量
        long savedState = getState();
        // 释放锁
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
  • 正常情况下,释放锁都能成功,因为是先调用Lock#lock()方法,再调用await方法
  • 当线程未持有锁,即未调用Lock#lock方法,直接调用await方法,就会抛出IllegalMonitorStateException异常
  • 另外,释放锁失败的情况下,会设置node.waitStatus = Node.CANCELLED,这也是后面为什么需要清除该标志的节点。

然后需要注意isOnSyncQueue这个方法,当第一次调用时,当前这个节点即是头节点也是尾节点,并且waitStatus == Node.CONDITION,所以第一个if条件成立,返回false,进入while循环,当前线程被挂起,知道它出现在同步队列,那什么情况它会出现在同步队列呢?就是调用了signal后,该节点就会从条件队列头部移到同步队列尾部。

final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
   			//后续节点不为null,肯定在同步队列中
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }

这个时候前两个if条件都不满足了(此时waitStatus已经被设置为0了),所以会进入下面这个方法:

private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

可以看到,tail就是同步队列中的尾节点,而此时node就是同步队列的尾节点,所以第一个if成立,返回true,跳出while循环,继续执行await后面的方法:

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
  					//此时开始往下执行,这时该节点已经在同步队列了
  					//可以竞争同步状态
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            //清理下条件队列中的不是在等待条件的节点
  					//因为条件队列中的节点都是CONDITION状态
  					if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

看下unlinkCancelledWaiters这个方法:

//等待队列是一个单向链表,遍历链表将已取消等待的节点清除出去
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
  	//用于中间操作不需要跳过时,记录上一个Node节点
    Node trail = null;
    while (t != null) {
      Node next = t.nextWaiter;
      //如果节点的状态不是Node.CONDITION的话,这个节点就是被取消的
      if (t.waitStatus != Node.CONDITION) {
        t.nextWaiter = null;
        if (trail == null)
          firstWaiter = next;
        else
          trail.nextWaiter = next;
        if (next == null)
          lastWaiter = trail;
      }
      else
        trail = t;
      t = next;
    }
}

总结下这个流程:

  • 首先,将当前线程新建成一个节点加入到条件队列中
  • 然后,释放当前线程持有的同步状态
  • 之后,则是不断检测该节点代表的线程,是否出现在同步队列中(当收到signal信号之后,就会在AQS队列中检测到),如果不存在,则一直挂起
  • 最后,重新参与资源竞争,获取到同步状态

​ 当多个线程同时调用lock.lock()方法(不管是非公平锁还是公平锁都会调用到acquire方法,然后走AQS获取资源这套流程)获取锁时,只有一个线程能成功获取到锁,其他线程就会被转换为Node节点插入到lock锁对应的AQS阻塞队列里面,并做自旋CAS尝试获取锁。

​ 如果获取到锁的线程又调用了对应的条件变量的await()方法,则该线程就会转换为Node节点插入到条件变量对应的条件队列里面,并且释放获取到的锁

​ 这时候因为调用lock.lock()方法被阻塞到AQS队列里面的一个线程会获取到被释放的锁,如果这个线程也调用了await方法,那么该线程也会被放入条件变量的条件队列中。

通知

​ 如果一个线程调用了条件变量的signal()或者signalAll()方法时,会把条件队列里面的一个或全部Node节点(头节点)移动到AQS的阻塞队列(尾部)里面,等待时间获取到锁

        public final void signal() {
          	//检测当前线程是否为拥有锁的线程
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
          	//头节点,唤醒条件队列中的第一个节点
            Node first = firstWaiter;
            if (first != null)
                //将条件队列的对头元素移动到AQS队列
                doSignal(first);
        }

doSignal代码如下:

				private void doSignal(Node first) {
            do {
              	//修改头节点,将旧的头节点移出
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

transferForSignal方法将节点移动到同步队列中:

final boolean transferForSignal(Node node) {
    //将该节点从状态CONDITION改变为初始状态0,
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    //将节点加入到syn队列中去,返回的是syn队列中node节点前面的一个节点
    Node p = enq(node);
    int ws = p.waitStatus;
    //如果结点p的状态为cancel 或者修改waitStatus失败,则直接唤醒
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

总结下这个流程:

  • 判断当前线程是否已经获取了锁,如果没有则直接抛出异常
  • 如果线程已经获取了锁,则唤醒了条件队列的首节点
  • 唤醒首节点是先将条件队列中的头节点移出,然后调用AQS的enq方法将其安全地移到同步队列
  • 最后判断如果该节点的同步状态是否为Node.CANCEL,或者修改状态为Node.SIGNAL失败时,则直接调用LockSupport唤醒该节点的线程

总结

一个线程获取锁后,通过调用await方法,会将当前线程先加入到条件队列中,然后释放锁,最后通过isOnSyncQueue方法,不断自检看节点是否已经在同步队列了,如果是则尝试获取锁,否则一直挂起。

当线程调用signal方法后,首先检查当前线程是否已经获取了锁,然后通过doSignal方法唤醒等待队列的首节点。被唤醒的线程,将从await方法中的while循环中退出来,然后调用acquireQueued方法竞争同步状态。

一个锁对应一个AQS阻塞队列,对应多个条件变量,每个条件变量有自己的一个条件队列

ReentrantLock

ReentrantLock是可重入独占锁同时只能有一个线程获取到锁,其他没有获取到锁的线程会被放入到AQS阻塞队列中

可重入:任意线程获取到锁后能够再次获取该锁而不会被锁所阻塞。这个特性的实现需要解决两个问题:

1、线程再次获取锁:锁需要去识别获取锁的线程是否为当前占有锁的线程,如果是,则获取成功。

2、锁的最终释放:锁被重复获取的次数等于0时表示锁已经成功释放。

类图

从类图中可以看到,ReentrantLock还是使用AQS实现的。根据参数可以决定其内部是一个公平还是非公平锁:

    //默认是非公平锁
	public ReentrantLock() {
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

获取锁

获取锁的入口如下:

    public void lock() {
        sync.lock();
    }

lock方法会委托给AbstractQueueSynchronized的子类Sync

abstract void lock();

Sync有两个实现类,分别是公平锁和非公平锁,首先看下非公平锁的实现逻辑

非公平锁

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            //如果CAS设置state的值为1成功,则设置锁的拥有者为当前线程
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                //否则调用AQS的acquire方法
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

假设现在有2个线程,线程1CAS设置重入次数为1成功后返回,此时线程2尝试CAS失败,就会调用acquire方法,在讲AQS的时候说过,该方法会首先调用子类重写的tryAcquire方法,也就是上面的第二个方法,然后进入下面这个方法:

		final boolean nonfairTryAcquire(int acquires) {
            //获取当前线程
            final Thread current = Thread.currentThread();
      			//获取同步状态
            int c = getState();
            //如果为0,说明锁空闲
            if (c == 0) {
                //CAS设置状态值
                if (compareAndSetState(0, acquires)) {
                  	//设置为当前线程所有
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果当前线程还是持有锁的线程,说明是锁重入
            else if (current == getExclusiveOwnerThread()) {
                //状态值继续加1
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            //否则说明当前线程不是持有锁的线程,返回false,加入AQS阻塞队列
            return false;
        }

这里有三种情况:

1、就是上面说的,线程2此时不会走if和else if,直接返回false,被加入到AQS阻塞队列中

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

2、线程1又调用了lock方法,那么此时线程1会走else if逻辑,代表发生了锁重入

3、如果线程1持有的状态值减到0了,那么就会释放资源,并且线程1是头节点(持有锁的节点),而线程2是它的后继节点,那么根据AQS中分析的,线程2调用tryAcquire方法后,就会走第一个if逻辑,将当前持有锁的线程设置为线程2。

非公平的体现:还是假设2个线程,在2个线程执行之前,已经有一个线程获取到锁了,所以线程A执行上面的代码后返回false,被加入到阻塞队列,这时候线程B执行到第一个if语句的时候,占有锁的线程刚好释放了锁,那么线程B就能成功获取到锁,但是明明是线程A先请求获取锁的,这就是非公平的体现。之所以会这样,是因为线程B在获取锁之前并没有查看当前AQS队列中是否有比自己更早请求锁的线程

公平锁

公平锁的实现在FairSync重写的tryAcquire方法:

		protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                //公平性策略
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

与非公平锁不一样的地方在于多了一个判断

    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

通过方法名可知这个方法的作用是判断是否有排队的前驱节点,如果返回true就代表有,否则就是没有

一、返回true的情况

  • 这里要注意(s=h.next)==null表示的是什么意思:在AQS中说过节点入队的时候,第一个元素入队是2步操作:首先创建一个哨兵节点,然后将第一个元素插入哨兵节点的后面,也就是说,在执行到这里的时候,如果刚好有第一个元素正在入队(具体可以看enq函数,当时看这段代码完全懵逼,Doug Lea是真牛逼),那么返回true
  • 如果头节点的后继节点中的线程不等于当前线程,那么说明队列中的第一个线程不是当前线程,返回true

二、返回false的情况

  • 当前队列为空,即h==t
  • h!=t && ((s==h.next)!=null || s.thread == Thread.currentThread());即当前线程是AQS的第一个节点

区别

公平与否是针对获取锁而言的,如果一个锁是公平的,那么锁的获取顺序就应符合请求锁的绝对时间顺序,也就是FIFO。

下面通过一段代码测试公平锁和非公平锁在获取锁时的区别:

public class FairAndUnfairTest {
    //公平锁
    private static Lock fairLock = new ReentrantLock2(true);
    //非公平锁
    private static Lock unfairLock = new ReentrantLock2(false);

    //公平锁测试
    public void fair() throws InterruptedException {
        testLock("公平锁",fairLock);
    }

    //非公平锁测试
    public void unfair() throws InterruptedException {
        testLock("非公平锁",unfairLock);
    }

    //启动线程
    private void testLock(String type, Lock lock) throws InterruptedException {
        System.out.println(type);
        for (int i = 0; i < 5; i++) {
            Thread thread=new Thread(new Job(lock)){
                @Override
                public String toString() {
                    return getName();
                }
            };
            thread.setName(""+i);
            thread.start();
        }
        Thread.sleep(11000);
    }

    //重写实现ReentrantLock类是为了重写getQueuedThreads方法,便于观察
    private static class ReentrantLock2 extends ReentrantLock {
        public ReentrantLock2(boolean fair) {
            super(fair);
        }

        /**
         * 返回正在获取等待锁的列表(获取同步队列中的线程)
         *
         * @return
         */
        @Override
        protected Collection<Thread> getQueuedThreads() {
            List<Thread> arrayList = new ArrayList<>(super.getQueuedThreads());
            //因为getQueuedThreads()方法是从后往前遍历添加到集合的,所以需要反转输出
            Collections.reverse(arrayList);
            return arrayList;
        }
    }

    private static class Job implements Runnable {
        private Lock lock;

        public Job(Lock lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            //连续打印两次才能看出效果,一次的话两者没区别
            for (int i = 0; i < 2; i++) {
                lock.lock();
                try{
                    Thread.sleep(1000);
                    //((ReentrantLock2)lock).getQueuedThreads()),这里一定要注意必须这样写
                    //如果这里改成new ReentrantLock2(fair)的话,那么和传进来的lock是不相关的,不会输出结果
                    //父类转换成子类,就可以调用子类的方法
                    System.out.println("Lock by["+ Thread.currentThread().getName()+"]"+
                            ",Waiting by"+((ReentrantLock2)lock).getQueuedThreads());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    lock.unlock();
                }
            }
        }

    }

输出结果如下:

公平锁
Lock by[0],Waiting by[1, 2, 4, 3]
Lock by[1],Waiting by[2, 4, 3, 0]
Lock by[2],Waiting by[4, 3, 0, 1]
Lock by[4],Waiting by[3, 0, 1, 2]
Lock by[3],Waiting by[0, 1, 2, 4]
Lock by[0],Waiting by[1, 2, 4, 3]
Lock by[1],Waiting by[2, 4, 3]
Lock by[2],Waiting by[4, 3]
Lock by[4],Waiting by[3]
Lock by[3],Waiting by[]
非公平锁
Lock by[0],Waiting by[1, 2, 3, 4]
Lock by[0],Waiting by[1, 2, 3, 4]
Lock by[1],Waiting by[2, 3, 4]
Lock by[1],Waiting by[2, 3, 4]
Lock by[2],Waiting by[3, 4]
Lock by[2],Waiting by[3, 4]
Lock by[3],Waiting by[4]
Lock by[3],Waiting by[4]
Lock by[4],Waiting by[]
Lock by[4],Waiting by[]

其中每个数字代表一个线程,可以发现,公平锁每次都是从队列中的第一个节点获取到锁,而非公平锁会出现一个线程连续获取到锁的情况,这会导致其它线程出现饥饿情况。既然如此,为什么非公平锁还会被设定为默认的实现呢?

如果把每次不同线程获取到锁定义为1次切换,那么会发现公平锁发生了10次切换,而非公平锁发生了5次切换,说明非公平锁的开销更小

总结:公平锁保证了锁的获取按照FIFO原则,但代价就是进行大量的线程切换。非公平锁虽然可能造成线程“饥饿”,但极少的线程切换,保证了其更大的吞吐量。

释放锁

释放锁入口:

    public void unlock() {
        sync.release(1);
    }

会调用到AQS的release方法,然后进入子类重写的tryRelease方法:

        protected final boolean tryRelease(int releases) {
            //锁的可重入次数减1
            int c = getState() - releases;
          	//释放的不是持有锁的线程
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            //如果锁重入次数为0了,说明当前线程放弃了对该锁的使用权
          	//其他锁可以获取同步状态了
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            //否则仅仅是将可重入次数减1
            setState(c);
            //只有可重入次数减为0才会返回true
            return free;
        }

总结

假设线程1、线程2、线程3同时获取独占锁ReentrantLock,如果线程1获取成功了,那么线程2和线程3就会加入到AQS阻塞队列中。假设线程1获取锁后调用了对应的锁创建的条件变量1,那么线程1会释放锁,然后转换为Node节点加入到条件变量对应的条件队列中。此时线程2和线程3就有机会获取到锁,如果使用的是公平锁模式,那么线程2就会获取到锁,从而从AQS队列移除线程2对应的Node节点。

Synchronized和ReentrantLock区别

ReentrantLock优势

等待可中断

当持有锁的线程长时间不释放时,可以调用interrupt方法中断当前线程,前提是获取锁的方法调用的是lockInterruptibly

公平锁

Synchronized中的锁是非公平的,ReentrantLock还支持公平锁,不过性能会下降

锁绑定多个条件

见上图

如何选择

虽然ReentrantLock是Synchronized的超集,但是如果两者都可以满足需求的情况下,优先推荐使用Synchronized:

  • synchronized是Java语法层面的同步,清晰简单
  • synchronized不需要显示释放锁(monitorexit指令),但是使用Lock接口需要在finally块显示释放锁,如果忘记释放锁,将会导致死锁
  • synchronized是jvm实现的,从长远来看,虚拟机更容易对synchronized进行优化。因为JVM可以在线程和对象的元数据中记录synchronized中锁的相关信息,而使用Lock的话,JVM很难得知具体哪些锁对象是由特定线程持有的。

ReentrantWriteReadLock

类图

1
https://gitee.com/cxylk/Java-Notes.git
git@gitee.com:cxylk/Java-Notes.git
cxylk
Java-Notes
Java-Notes
main

搜索帮助