本文共 8931 字,大约阅读时间需要 29 分钟。
AQS的源码分析 <二>
该篇主要分析AQS的ConditionObject,是AQS的内部类,实现等待通知机制。
条件队列与AQS中的同步队列有所不同,结构图如下:
两者区别:
两者联系:
当等待的某个条件完成,其他线程调用signal方法,通知挂起在条件队列中的线程,会将条件队列中该node移出,加入到同步队列中,node的ws状态由Node.CONDITION改为0 ,开始等待锁。
ConditionObject 和 Node一样,都是AQS的内部类, ConditionObject实现Condition接口,主要实现线程调用 await和signal ,实现线程条件阻塞和通知机制,Condition对象通过 Lock子类调用newConditon方法获取,以
ReentrantLock为例,代码如下:
ReentrantLock lock = new ReentrantLock();Condition condition = lock.newCondition();
可见排他锁的newCondition方法返回的是ConditionObject对象
final ConditionObject newCondition() { return new ConditionObject();}
简单生产者消费示例代码:
package AQS;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;/** * @author zdd * 2019/12/30 下午 * Description: 利用ReentrantLock和Condition实现生产者消费者 */public class ConditionTest { static ReentrantLock lock = new ReentrantLock(); static Condition condition = lock.newCondition(); public static void main(String[] args) { //资源类 Apple apple = new Apple(); //1.开启生产者线程 new Thread(()-> { for (;;) { lock.lock(); try { //苹果没有被消费,吃完通知我,我再生产哦 if (apple.getNumber() > 0) { condition.await(); } TimeUnit.SECONDS.sleep(1); System.out.println("生产一个苹果"); apple.addNumber(); //通知消费线程消费 condition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } },"producer").start(); //2.开启消费者线程 new Thread(()-> { for (;;) { lock.lock(); try { //苹果数量为0,挂起等待生产苹果,有苹果了会通知 if(apple.getNumber() == 0) { condition.await(); } TimeUnit.SECONDS.sleep(1); System.out.println("消费一个苹果"); apple.decreNumber(); //通知生产线程生产 condition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } },"consumer").start(); } //定义苹果内部类 static class Apple { //记录苹果数量 private Integer number =0; public void addNumber() { number++; System.out.println(Thread.currentThread().getName() +"当前苹果数量:"+number ); } public void decreNumber() { number--; System.out.println(Thread.currentThread().getName() +"当前苹果数量:"+number); } public Integer getNumber() { return number; } }}
执行结果如下图:
当前线程是在已经获取锁的情况下,调用await方法主动释放锁,挂起当前线程,等待某个条件(IO,mq消息等)唤醒,再去竞争获取锁的过程。该方法会将当前线程封装到node节点中,添加到Condition条件队列中,释放锁资源,并挂起当前线程。
具体执行步骤如下:
1、线程封装到node中,并添加到Condition条件队列中,ws =-2 即为Node.CONDITION。
2、释放锁。
3、将自己阻塞挂起,如果线程被唤醒,首先检查自己是被中断唤醒的不。如果是被中断唤醒,跳出while循环;如果是被其他线程signal唤醒,则判断当前线程所在node是否被加入到同步等待队列,已在同步队列中也跳出while循环,否则继续挂起,signal唤醒逻辑会将condition条件队列node 移出,加入到同步队列中,去等待获取锁。
4,线程被唤醒,执行acquireQueued方法,线程会尝试获取锁,若失败则在同步队列中找到安全位置阻塞,成功则从调用await()方法处继续向下执行,返回值表示当前线程是否被中断过。
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { //挂起当前线程 LockSupport.park(this); // 被唤醒: 1,被其他线程唤醒,2,中断唤醒, if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //1,如果被signal正常唤醒执行acquireQueued,返回false,如果获取到锁就继续执行调用await后面的代码了,未获取到锁就在同步队列中继续挂起等待锁执行了 //2,如果被中断唤醒的,acquireQueued 返回true if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //线程在被signal后,再被中断的 interruptMode = REINTERRUPT; // 后面代码处理的是被中断唤醒的情况 if (node.nextWaiter != null) // clean up if cancelled //如果nextWaiter!=null,则表示还在条件队列中,清理一下所有被取消node //什么情况下会进入该if判断中,如果是正常被signal的,会将该node从条件队列移出加入到同步队列中的, nextWaiter 一定为null,那就是被异常中断情况, unlinkCancelledWaiters(); if (interruptMode != 0) //响应中断模式 reportInterruptAfterWait(interruptMode); }
第1步,执行addConditionWaiter方法,主要逻辑是将线程封装为Node,并添加到条件队列中
private Node addConditionWaiter() { //1.获取队列中最后一个节点 Node t = lastWaiter; //2.如果最后一个节点被取消,清除出队 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } //3. t 指向最新有效的节点,也可能条件队列为空,t==null Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
第2步,完全释放锁 fullyRelease,将同步状态state 设置为初始值0,这里考虑到有多次重入获取锁情况,state >1,这时需完全释放锁。
final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); //1,释放锁 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) //2,释放锁失败,将条件队列中的节点标记为取消 node.waitStatus = Node.CANCELLED; } }
isOnSyncQueue 判断node是否在同步队列中
final boolean isOnSyncQueue(Node node) { //1,这2种情况肯定没有在同步队列中 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // If has successor, it must be on queue return true; //3.从同步队列尾节点开始对比,看是否在同步队列中 return findNodeFromTail(node); }
findNodeFromTail 从后向前寻找
private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
在线程被唤醒后,检查挂起期间是否被中断
private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;}
如果线程被中断了,那就需要将在条件队列中等待的该节点执行 transferAfterCancelledWait
final boolean transferAfterCancelledWait(Node node) { // 判断是否是被signal通知唤醒的,会更新为0,更新成功,执行入队操作(加入同步队列) if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } while (!isOnSyncQueue(node)) //未在同步队列中,让出处理器,线程回到就绪态,等待下一次分配cpu调度 Thread.yield(); return false; }
最后根据不同的中断值做出相应处理
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) //1,直接抛出中断异常 throw new InterruptedException(); else if (interruptMode == REINTERRUPT) //2,中断标志 selfInterrupt();}
就是将条件队列中的node移出,加入到同步队列等待获取锁的过程。
流程图如下:
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 1、将first节点执行出队操作 first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); //2,如果条件队列中有ws =-2的节点,肯定会移出一个到同步队列中 }
final boolean transferForSignal(Node node) { //1,将node ws更新为0 ,如果node 状态不等于CONDITION,一定是被取消了 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //2,加入到同步队列中,返回的p是node的pre Node p = enq(node); int ws = p.waitStatus; //3,如果前置节点被取消,或者更新p的 ws =-1 失败,直接唤醒线程,否则等待前置节点唤醒自己 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //唤醒线程 LockSupport.unpark(node.thread); return true; }
1、Condition提供的阻塞通知机制与Object类两者对比:
2、注意区别Condition条件队列与同步队列两者的区别,2个队列中线程等待条件不同
转载地址:http://goxii.baihongyu.com/