博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
多线程之美8一 AbstractQueuedSynchronizer源码分析<二>
阅读量:4096 次
发布时间:2019-05-25

本文共 8931 字,大约阅读时间需要 29 分钟。

目录

宠文 m.shupu.org

1458219-20191230230247625-1939714617.png

AQS的源码分析 <二>

该篇主要分析AQS的ConditionObject,是AQS的内部类,实现等待通知机制。

1、条件队列

条件队列与AQS中的同步队列有所不同,结构图如下:

1458219-20191230230309146-1155115184.png

两者区别:

  • 1、链表结构不同,条件队列是单向链表,同步队列是双向链表。
  • 2、两个队列中等待条件不同,条件队列中线程是已经获取到锁,主动调用await方法释放锁,挂起当前线程,等待某个条件(如IO,mq消息等),同步队列中的线程是等待获取锁,在获取锁失败后挂起等待锁可用。

两者联系:

当等待的某个条件完成,其他线程调用signal方法,通知挂起在条件队列中的线程,会将条件队列中该node移出,加入到同步队列中,node的ws状态由Node.CONDITION改为0 ,开始等待锁。

2、ConditionObject

ConditionObject 和 Node一样,都是AQS的内部类, ConditionObject实现Condition接口,主要实现线程调用 await和signal ,实现线程条件阻塞和通知机制,Condition对象通过 Lock子类调用newConditon方法获取,以

ReentrantLock为例,代码如下:

ReentrantLock lock  = new ReentrantLock();Condition condition =  lock.newCondition();

可见排他锁的newCondition方法返回的是ConditionObject对象

final ConditionObject newCondition() {    return new ConditionObject();}

简单生产者消费示例代码:

package AQS;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;/** * @author zdd * 2019/12/30 下午 * Description: 利用ReentrantLock和Condition实现生产者消费者 */public class ConditionTest {   static ReentrantLock lock  = new ReentrantLock();   static Condition condition =  lock.newCondition();    public static void main(String[] args) {       //资源类        Apple apple = new Apple();     //1.开启生产者线程        new Thread(()-> {            for (;;) {                lock.lock();                try {                    //苹果没有被消费,吃完通知我,我再生产哦                    if (apple.getNumber() > 0) {                        condition.await();                    }                    TimeUnit.SECONDS.sleep(1);                    System.out.println("生产一个苹果");                    apple.addNumber();                    //通知消费线程消费                    condition.signal();                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    lock.unlock();                }            }        },"producer").start();      //2.开启消费者线程        new Thread(()-> {            for (;;) {                lock.lock();                try {                    //苹果数量为0,挂起等待生产苹果,有苹果了会通知                    if(apple.getNumber() == 0) {                        condition.await();                    }                    TimeUnit.SECONDS.sleep(1);                    System.out.println("消费一个苹果");                    apple.decreNumber();                    //通知生产线程生产                    condition.signal();                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    lock.unlock();                }            }        },"consumer").start();    }  //定义苹果内部类    static class Apple {        //记录苹果数量        private Integer number =0;        public void addNumber() {            number++;            System.out.println(Thread.currentThread().getName() +"当前苹果数量:"+number );        }        public void decreNumber() {            number--;            System.out.println(Thread.currentThread().getName() +"当前苹果数量:"+number);        }        public Integer getNumber() {            return number;        }    }}

执行结果如下图:

1458219-20191230230334322-1069565226.png

2.1、 await() 方法

当前线程是在已经获取锁的情况下,调用await方法主动释放锁,挂起当前线程,等待某个条件(IO,mq消息等)唤醒,再去竞争获取锁的过程。该方法会将当前线程封装到node节点中,添加到Condition条件队列中,释放锁资源,并挂起当前线程。

具体执行步骤如下:

1、线程封装到node中,并添加到Condition条件队列中,ws =-2 即为Node.CONDITION。

2、释放锁。

3、将自己阻塞挂起,如果线程被唤醒,首先检查自己是被中断唤醒的不。如果是被中断唤醒,跳出while循环;如果是被其他线程signal唤醒,则判断当前线程所在node是否被加入到同步等待队列,已在同步队列中也跳出while循环,否则继续挂起,signal唤醒逻辑会将condition条件队列node 移出,加入到同步队列中,去等待获取锁。

4,线程被唤醒,执行acquireQueued方法,线程会尝试获取锁,若失败则在同步队列中找到安全位置阻塞,成功则从调用await()方法处继续向下执行,返回值表示当前线程是否被中断过。

public final void await() throws InterruptedException {            if (Thread.interrupted())                throw new InterruptedException();            Node node = addConditionWaiter();            int savedState = fullyRelease(node);            int interruptMode = 0;            while (!isOnSyncQueue(node)) {               //挂起当前线程                LockSupport.park(this);              // 被唤醒: 1,被其他线程唤醒,2,中断唤醒,                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)                    break;            }           //1,如果被signal正常唤醒执行acquireQueued,返回false,如果获取到锁就继续执行调用await后面的代码了,未获取到锁就在同步队列中继续挂起等待锁执行了           //2,如果被中断唤醒的,acquireQueued 返回true            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)               //线程在被signal后,再被中断的                interruptMode = REINTERRUPT;         //  后面代码处理的是被中断唤醒的情况            if (node.nextWaiter != null) // clean up if cancelled              //如果nextWaiter!=null,则表示还在条件队列中,清理一下所有被取消node              //什么情况下会进入该if判断中,如果是正常被signal的,会将该node从条件队列移出加入到同步队列中的, nextWaiter 一定为null,那就是被异常中断情况,                unlinkCancelledWaiters();            if (interruptMode != 0)               //响应中断模式                reportInterruptAfterWait(interruptMode);        }

第1步,执行addConditionWaiter方法,主要逻辑是将线程封装为Node,并添加到条件队列中

private Node addConditionWaiter() {          //1.获取队列中最后一个节点            Node t = lastWaiter;            //2.如果最后一个节点被取消,清除出队            if (t != null && t.waitStatus != Node.CONDITION) {                unlinkCancelledWaiters();                t = lastWaiter;            }            //3. t 指向最新有效的节点,也可能条件队列为空,t==null            Node node = new Node(Thread.currentThread(), Node.CONDITION);            if (t == null)                firstWaiter = node;            else                t.nextWaiter = node;            lastWaiter = node;            return node;        }

第2步,完全释放锁 fullyRelease,将同步状态state 设置为初始值0,这里考虑到有多次重入获取锁情况,state >1,这时需完全释放锁。

final int fullyRelease(Node node) {        boolean failed = true;        try {            int savedState = getState();            //1,释放锁            if (release(savedState)) {                failed = false;                return savedState;            } else {                throw new IllegalMonitorStateException();            }        } finally {            if (failed)            //2,释放锁失败,将条件队列中的节点标记为取消                node.waitStatus = Node.CANCELLED;        }    }

isOnSyncQueue 判断node是否在同步队列中

final boolean isOnSyncQueue(Node node) {       //1,这2种情况肯定没有在同步队列中        if (node.waitStatus == Node.CONDITION || node.prev == null)            return false;        if (node.next != null) // If has successor, it must be on queue            return true;       //3.从同步队列尾节点开始对比,看是否在同步队列中        return findNodeFromTail(node);    }

findNodeFromTail 从后向前寻找

private boolean findNodeFromTail(Node node) {        Node t = tail;        for (;;) {            if (t == node)                return true;            if (t == null)                return false;            t = t.prev;        }    }

在线程被唤醒后,检查挂起期间是否被中断

private int checkInterruptWhileWaiting(Node node) {    return Thread.interrupted() ?        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :        0;}

如果线程被中断了,那就需要将在条件队列中等待的该节点执行 transferAfterCancelledWait

final boolean transferAfterCancelledWait(Node node) {       // 判断是否是被signal通知唤醒的,会更新为0,更新成功,执行入队操作(加入同步队列)        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {            enq(node);            return true;        }        while (!isOnSyncQueue(node))          //未在同步队列中,让出处理器,线程回到就绪态,等待下一次分配cpu调度            Thread.yield();        return false;    }

最后根据不同的中断值做出相应处理

private void reportInterruptAfterWait(int interruptMode)    throws InterruptedException {    if (interruptMode == THROW_IE)        //1,直接抛出中断异常        throw new InterruptedException();    else if (interruptMode == REINTERRUPT)        //2,中断标志        selfInterrupt();}

2.2、signal方法

就是将条件队列中的node移出,加入到同步队列等待获取锁的过程。

流程图如下:

1458219-20191230230400596-38393636.png

public final void signal() {            if (!isHeldExclusively())                throw new IllegalMonitorStateException();            Node first = firstWaiter;            if (first != null)                doSignal(first);        }
private void doSignal(Node first) {            do {                if ( (firstWaiter = first.nextWaiter) == null)                    lastWaiter = null;              // 1、将first节点执行出队操作                first.nextWaiter = null;            } while (!transferForSignal(first) &&                     (first = firstWaiter) != null);             //2,如果条件队列中有ws =-2的节点,肯定会移出一个到同步队列中        }
final boolean transferForSignal(Node node) {       //1,将node ws更新为0 ,如果node 状态不等于CONDITION,一定是被取消了        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))            return false;       //2,加入到同步队列中,返回的p是node的pre         Node p = enq(node);        int ws = p.waitStatus;       //3,如果前置节点被取消,或者更新p的 ws =-1 失败,直接唤醒线程,否则等待前置节点唤醒自己        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))           //唤醒线程            LockSupport.unpark(node.thread);        return true;    }

3、总结

1、Condition提供的阻塞通知机制与Object类两者对比:

  • 方法不同,Condition提供方法有 await(), signal(),signalAll(), Object类提供的是wait(),notify() , notifyAll()
  • 配合使用对象不同,Condition条件需要和Lock配合使用,Object类需和Synchronized关键字配合。
  • 多条件, Condition可实现多个条件,即创建多个Condition对象,可以每个Condition对象对应一种条件,从而有选择的实现唤醒通知,Object类的要唤醒一个阻塞线程,只能在一个条件队列中,唤醒是随机的,没有Condition使用灵活。

2、注意区别Condition条件队列与同步队列两者的区别,2个队列中线程等待条件不同

转载地址:http://goxii.baihongyu.com/

你可能感兴趣的文章
前端设计之CSS布局:上中下三栏自适应高度CSS布局
查看>>
Java的时间操作玩法实例若干
查看>>
JavaScript:时间日期格式验证大全
查看>>
pinyin4j:拼音与汉字的转换实例
查看>>
XML工具代码:SAX从String字符串XML内获取指定节点或属性的值
查看>>
时间日期:获取两个日期相差几天
查看>>
责任链模式 Chain of Responsibility
查看>>
高并发与大数据解决方案概述
查看>>
解决SimpleDateFormat线程安全问题NumberFormatException: multiple points
查看>>
MySQL数据库存储引擎简介
查看>>
处理Maven本地仓库.lastUpdated文件
查看>>
Kafka | 请求是怎么被处理的?
查看>>
Java并发编程1-线程池
查看>>
CentOS7,玩转samba服务,基于身份验证的共享
查看>>
计算机网络-网络协议模型
查看>>
计算机网络-OSI各层概述
查看>>
Java--String/StringBuffer/StringBuilder区别
查看>>
mySQL--深入理解事务隔离级别
查看>>
分布式之redis复习精讲
查看>>
数据结构与算法7-栈
查看>>