多线程并发是Java语言中非常重要的一块内容,同时,也是Java基础的一个难点。说它重要是因为多线程是日常开发中频繁用到的知识,说它难是因为多线程并发涉及到的知识点非常之多,想要完全掌握Java的并发相关知识并非易事。也正因此,Java并发成了Java面试中最高频的知识点之一。本系列文章将从Java内存模型、volatile关键字、synchronized关键字、ReetrantLock、Atomic并发类以及线程池等方面来系统的认识Java的并发知识。通过本系列文章的学习你将深入理解volatile关键字的作用,了解到synchronized实现原理、AQS和CLH队列锁,清晰的认识自旋锁、偏向锁、乐观锁、悲观锁…等等一系列让人眼花缭乱的并发知识。
多线程并发系列文章:
这一次,彻底搞懂Java内存模型与volatile关键字
这一次,彻底搞懂Java中的synchronized关键字
这一次,彻底搞懂Java中的ReentrantLock实现原理
这一次,彻底搞懂Java并发包中的Atomic原子类
深入理解Java线程的等待与唤醒机制(一)
深入理解Java线程的等待与唤醒机制(二)
Java并发系列终结篇:彻底搞懂Java线程池的工作原理
Java并发系列番外篇:ThreadLocal原理其实很简单
本文是Java并发系列的第五篇文章,将深入分析Java的唤醒与等待机制。
上篇文章我们从“生产者-消费者”模型出发,深入的分析了wait和notify/notifyAll的底层实现。并且了解到生产者线程与消费者线程在调用wait时都会被加入到synchronized锁对象monitor的WaitSet队列中。那么在唤醒线程的时候就无法准确的唤醒某一类线程。而在这一次,彻底搞懂Java中的ReentrantLock实现原理 这一篇文章中我们认识了更为灵活地显式锁ReentrantLock。ReentrantLock与synchronized类似,也有一套类似wait与notify/notifyAll的等待唤醒机制–Condition。本篇文章我们就来深入的认识ReentrantLock的Condition与线程的等待与唤醒机制。
开始之前先给大家推荐一下AndroidNote 这个GitHub仓库,这里是我的学习笔记,同时也是我文章初稿的出处。这个仓库中汇总了大量的java进阶和Android进阶知识。是一个比较系统且全面的Android知识库。对于准备面试的同学也是一份不可多得的面试宝典,欢迎大家到GitHub的仓库主页关注。
一、认识Lock的Condition
注:下文中将会多次出现等待队列 这一关键词,这里指得是调用了await方法后处于等待状态的队列,赢注意与上篇文章中AQS中的同步队列 做区分。同时,这里的等待队列 等同于synchronized中的 _WaitSet集合。
在这一次,彻底搞懂Java中的ReentrantLock实现原理 中关于Condition其实也有所提及,在使用Lock来保证线程同步时,我们可以使用Condition来协调线程间的协作。相比synchronize的监视器锁,Condition提供了更加灵活和精确的线程控制。它的最大特点是可以为不同的线程建立多个Condition,从而达到精确控制某一些线程的休眠与唤醒。
Condition是一个接口,内部主要提供了一些线程休眠与唤醒相关的方法,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public interface Condition { void await () throws InterruptedException ; void awaitUninterruptibly () ; long awaitNanos (long nanosTimeout) throws InterruptedException ; boolean await (long time, TimeUnit unit) throws InterruptedException ; boolean awaitUntil (Date deadline) throws InterruptedException ; void signal () ; void signalAll () ; }
Condition的实现类是在AQS中的ConditionObject,关于ConditionObject我们后边再看,接下来看下如何使用Condition来实现线程的等待与唤醒。
Condition实现“生产者-消费者”模式 仍然以“生产者-消费者”模式来看Condition的使用,沿用上篇文章生产面包的例子,稍加改动后的面包容器类如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public class BreadContainer { LinkedList<Bread> list = new LinkedList<>(); private final static int CAPACITY = 10 ; Lock lock = new ReentrantLock(); private final Condition providerCondition = lock.newCondition(); private final Condition consumerCondition = lock.newCondition(); public void put (Bread bread) { try { lock.lock(); while (list.size() == CAPACITY) { try { providerCondition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } list.add(bread); consumerCondition.signalAll(); System.out.println(Thread.currentThread().getName() + " product a bread" + bread.toString() + " size = " + list.size()); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void take () { try { lock.lock(); while (list.isEmpty()) { try { consumerCondition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } Bread bread = list.removeFirst(); providerCondition.signalAll(); System.out.println("Consumer " + Thread.currentThread().getName() + " consume a bread" + bread.toString() + " size = " + list.size()); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
可以看到,在上述代码中我们声明了两个Condition,一个生产者Condition,一个消费者Condition。在put方法中使用ReentrantLock来实现同步,同时,当容器满时调用生产者Condition的await方法使生产者线程进入等待状态。如果生产成功,则调用消费者Condition的signalAll方法来唤醒消费者线程。take方法与put类似,不再赘述。这里要注意的是在使用Condition前必须先获得锁。
生产者消费者类与synchronize的实现一致,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class Producer implements Runnable { private final BreadContainer container; public Producer (BreadContainer container) { this .container = container; } @Override public void run () { container.put(new Bread()); } }public class Consumer implements Runnable { private final BreadContainer container; public Consumer (BreadContainer container) { this .container = container; } @Override public void run () { container.take(); } }
那接下来测试类我们仍然实例化多个生产者线程与多个消费者线程,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class Test { public static void main (String[] args) { BreadContainer container = new BreadContainer(); new Thread(() -> { for (int i = 0 ; i < 100 ; i++) { new Thread(new Producer(container)).start(); } }).start(); new Thread(() -> { for (int i = 0 ; i < 100 ; i++) { new Thread(new Consumer(container)).start(); } }).start(); } }
运行后生产者线程与消费者线程可以很好的实现线程协作。与使用synchronized不同的是这里有两个Condition,分别来控制生产者和消费者。
接下来,我们分析一下Condition的实现原理
二、Condition实现原理 上一章中我们已经知道Condition仅仅是一个接口,它的具体实现是在AQS的内部类ConditionObject中。调用ReentrantLock的newCondition实际上就是实例化了一个ConditionObject,代码如下:
1 2 3 4 final ConditionObject newCondition () { return new ConditionObject(); }
可见,在第一章BreadContainer中的providerCondition与consumerCondition是两个不同的ConditionObject实例。
ConditionObject的类结构如下:
1 2 3 4 5 6 7 8 public class ConditionObject implements Condition , java .io .Serializable { private transient Node firstWaiter; private transient Node lastWaiter; public ConditionObject () { } }
ConditionObject的结构比较简单,它内部维护了一个Node类型等待队列 。其中firstWaiter指向队列的头结点,而lastWaiter指向队列的尾结点。关于Node节点,在ReentrantLock那篇文章中已经详细介绍过了,它封装的是一个线程的节点,这里也不再赘述。在线程中调用了Condition的await方法后,线程就会被封装成一个Node节点,并将Node的waitStatus设置成CONDITION状态,然后插入到这个Condition的等待队列中。等到收到singal或者被中断、超时就会被从等待队列中移除。其结构示意图如下:
接下来我们从源码的角度来分析Condition的实现。
1.Condition的await方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public final void await () throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); }
在wait方法中首先会调用addConditionWaiter方法将线程封装成一个Node节点,并加入到等待队列中。addConditionWaiter的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private Node addConditionWaiter () { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Node.CONDITION); if (t == null ) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
addConditionWaiter方法的逻辑比较简单,就是将线程封装成Node并加入等待队列的操作。加入队列后,await方法又调用了fullyRelease去释放锁,在fullyRelease方法中会将state置为0,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 final int fullyRelease (Node node) { try { int savedState = getState(); if (release(savedState)) return savedState; throw new IllegalMonitorStateException(); } catch (Throwable t) { node.waitStatus = Node.CANCELLED; throw t; } }
这个方法主要是调用了release方法来释放锁,如果释放失败,则将节点置为CANCELLED状态。关于release这个方法在ReentrantLock中已经分析过,这里不再赘述。
释放锁之后,开启while来调用isOnSyncQueue方法,这个方法是用来判断当前节点是否在同步队列中。如果不在同步队列,则会进入自旋,并阻塞线程,等待节点进入同步队列。isOnSyncQueue的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 final boolean isOnSyncQueue (Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null ) return false ; if (node.next != null ) return true ; return findNodeFromTail(node); }private boolean findNodeFromTail (Node node) { for (Node p = tail;;) { if (p == node) return true ; if (p == null ) return false ; p = p.prev; } }
如果isOnSyncQueue返回了true,那么说明该node节点已经进入同步队列中了,则会结束自旋并调用acquireQueued,关于acquireQueued在ReentrantLock文章中已经详细分析过了,它是一个判断当前node的前驱节点是不是head,如果不是head则挂起线程,如果是head则唤醒并尝试获取锁的操作。
总的来说,调用await方法会让线程进入等待队列,并释放锁。当等待队列中的节点被唤醒时,会将节点移入到同步队列,然后await结束自旋,并调用acquireQueued来获取锁。
2.Condition的signal方法 可以通过调用Condition的signal或者signalAll方法来唤醒线程。不同的是signalAll方法会唤醒所有等待状态的线程,而singal只会唤醒等待队列头部的线程节点。这里我们选用signal方法来分析,signal方法类似Object中的notify方法,调用signal方法会将等待队列的首节点移入同步队列并唤醒。它的实现相比await来说会比较容易理解,看下代码:
1 2 3 4 5 6 7 8 public final void signal () { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null ) doSignal(first); }
在signal中会拿到等待队列的首节点并调用doSignal方法将其唤醒,doSignal代码如下:
1 2 3 4 5 6 7 8 9 private void doSignal (Node first) { do { if ( (firstWaiter = first.nextWaiter) == null ) lastWaiter = null ; first.nextWaiter = null ; } while (!transferForSignal(first) && (first = firstWaiter) != null ); }
doSignal方法中是一个循环唤醒等待队列首节点的操作,核心方法是transferForSignal,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 final boolean transferForSignal (Node node) { if (!node.compareAndSetWaitStatus(Node.CONDITION, 0 )) return false ; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true ; }private Node enq (Node node) { for (;;) { Node oldTail = tail; if (oldTail != null ) { node.setPrevRelaxed(oldTail); if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return oldTail; } } else { initializeSyncQueue(); } } }
transferForSignal实际上就是做了一个队列的转移,将node从等待队列移动到了同步队列。进入同步队列后,在wait方法中的自旋操作便能检测到node节点的状态,从而执行acquireQueued方法拿锁。
总的来说signal方法会从等待队列的队首开始,尝试唤醒队首线程,如果该节点是CANCELLED状态,则继续唤醒下一个节点。当节点被唤醒后会将其加入到同步队列,接着wait方法停止自旋执行acquireQueued方法。
总结 通过对Condition的await与signal方法的分析,可以看得出来这两个方法并非独立存在,而是一个相互配合的关系。await方法会将执行的线程封装成Node加入到等待队列,然后开启一个循环检测这个node看是否被加入到了同步队列,如果被加入到同步队列,那么调用acquireQueued开始排队竞争锁,如果没有被加入同步队列,则会一直挂起线程等待被唤醒。而signal方法则是将等待队列中的队首元素移动到同步队列,这样就触发了await方法的循环终结,继而能够执行acquireQueued方法。其流程如下图所示:
关于Java线程的等待与唤醒机制,到这里就全部结束了,通过本篇文章的学习,更加深入的了解了线程等待与唤醒的原理,其实可以看得出来无论synchronized监视器锁的等待与唤醒还是Lock锁的等待与唤醒都有着类似的原理,只不过synchronized是虚拟机底层实现,而ReentrantLock是基于Java层的实现。