pursue wind pursue wind
首页
Java
Python
数据库
框架
Linux
中间件
前端
计算机基础
DevOps
项目
面试
书
关于
归档
MacOS🤣 (opens new window)
GitHub (opens new window)
首页
Java
Python
数据库
框架
Linux
中间件
前端
计算机基础
DevOps
项目
面试
书
关于
归档
MacOS🤣 (opens new window)
GitHub (opens new window)
  • 工具类

  • Java-集合框架

  • Java8

  • Java-多线程

    • JUC学习笔记(上)
    • JUC学习笔记(下)
    • Java 重入锁 ReentrantLock 原理分析
    • AbstractQueuedSynchronizer 原理分析 - Condition 实现原理
    • AbstractQueuedSynchronizer 原理分析 - 独占 共享模式
    • AbstractQueuedSynchronizer 源码解读--续篇之Condition
      • 1. 背景
      • 2. Condition是什么
      • 3. 代码解读
        • 3.1 套路
        • 3.2 await方法
        • 3.3 signal/signalAll方法
      • 4. 思考与总结
    • Java CAS 原理分析
    • Java 线程同步组件 CountDownLatch 与 CyclicBarrier 原理分析
  • Java计时新姿势√
  • Java中的BlockingQueue
  • Lambda表达式被首次调用时很慢?从JIT到类加载再到实现原理
  • 正则表达式
  • Java定时任务
  • JavaWeb

  • Java
  • Java-多线程
pursuewind
2020-11-23
目录

AbstractQueuedSynchronizer 源码解读--续篇之Condition

# 1. 背景

在之前的AbstractQueuedSynchronizer源码解读 (opens new window)中,介绍了AQS的基本概念、互斥锁、共享锁、AQS对同步队列状态流转管理、线程阻塞与唤醒等内容。其中并不涉及Condition相关的内容。本文主要介绍AQS中Condition的实现即ConditionObject类的源码。 Condition在JUC中使用很多,最常见的就是各种BlockingQueue了。

# 2. Condition是什么

java.util.concurrent.locks.Condition是JUC提供的与Java的Object中wait/notify/notifyAll类似功能的一个接口,通过此接口,线程可以在某个特定的条件下等待/唤醒。 与wait/notify/notifyAll操作需要获得对象监视器类似,一个Condition实例与某个互斥锁绑定,在此Condition实例进行等待/唤醒操作的调用也需要获得互斥锁,线程被唤醒后需要再次获取到锁,否则将继续等待。 而与原生的wait/notify/notifyAll等API不同的地方在于,JUC提供的Condition具有更丰富的功能,例如等待可以响应/不响应中断,可以设定超时时间或是等待到某个具体时间点。 此外一把互斥锁可以绑定多个Condition,这意味着在同一把互斥锁上竞争的线程可以在不同的条件下等待,唤醒时可以根据条件来唤醒线程,这是Object中的wait/notify/notifyAll不具备的机制

# 3. 代码解读

# 3.1 套路

JUC中Condition接口的主要实现类是AQS的内部类ConditionObject,它内部维护了一个队列,我们可以称之为条件队列,在某个Condition上等待的线程被signal/signalAll后,ConditionObject会将对应的节点转移到外部类AQS的等待队列中,线程需要获取到AQS等待队列的锁,才可以继续恢复执行后续的用户代码。

这里给出一个流程:

await流程:
1. 创建节点加入到条件队列
2. 释放互斥锁
3. 只要没有转移到同步队列就阻塞(等待其他线程调用signal/signalAll或是被中断)
4. 重新获取互斥锁
signal流程:
1. 将队列中第一个节点转移到同步队列
2. 根据情况决定是否要唤醒对应线程
1
2
3
4
5
6
7
8

img 这里以我之前在[AbstractQueuedSynchronizer源码解读]画的AQS状态流转图来说明下: 如果一个节点通过ConditionObject#await等方法调用初始化后,在被唤醒之后,会将状态切换至0,也即无状态,随后进入AQS的同步队列,此后就与一般的争锁无异了。

# 3.2 await方法

public final void await() throws InterruptedException {
    // 对中断敏感。
    if (Thread.interrupted())
        throw new InterruptedException();
    // 加到条件队列中。
    Node node = addConditionWaiter();
    // 完全释放互斥锁(无论锁是否可以重入),如果没有持锁,会抛出异常。
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    /*
     * 只要仍未转移到同步队列就阻塞。
     * 转移的情况如下:
     * 1. 其它线程调用signal将当前线程节点转移到同步队列并唤醒当前线程。
     * 2. 其它线程调用signalAll。
     * 3. 其它线程中断了当前线程,当前线程会自行尝试进入同步队列。
     */
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        /*
         * 获取中断模式。
         * 在线程从park中被唤醒的时候,需要判断是否此时被中断,若中断则尝试转移到同步队列。
         * 1. 中断且自行进入同步队列,返回THROW_IE(值-1),后续需要抛出InterruptedException。
         * 2. 中断且未能自行进入同步队列,则说明有线程调用signal/signalAll唤醒线程并尝试转移到同步队列,
         *     返回REINTERRUPT,后续重新中断线程。
         * 3. 线程未被中断,返回0,此时需要重试循环判断是否上了同步队列。
         */
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 重新获取互斥锁过程中如果中断并且interruptMode不为"抛出异常",设置为REINTERRUPT。
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 如果线程发生过中断则根据THROW_IE或是REINTERRUPT分别抛出异常或者重新中断。
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

private Node addConditionWaiter() {
    Node t = lastWaiter;
    /*
     * 如果条件队列中最后一个waiter节点状态为取消,
     * 则调用unlinkCancelledWaiters清理队列。
     */
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        // 重读lastWaiter。
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // t如果为null, 初始化firstWaiter为当前节点。
    if (t == null)
        firstWaiter = node;
    else
        // 将队尾的next连接到node。
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

/**
 * 移除队列中所有取消节点。
 */
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    // 记录上一个非取消节点。
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            // 断开
            t.nextWaiter = null;
            if (trail == null)
                // 如果trail为null,取当前节点的后继作为头节点的值(next可以为null)。
                firstWaiter = next;
            else
                // 否则把trail连接到当前节点的后继。
                trail.nextWaiter = next;
            // 如果当前节点没有后继了, 更新lastWaiter为trail, 即上一个非取消节点。
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

final boolean isOnSyncQueue(Node node) {
    /*
     * 节点状态为CONDITION一定是在条件队列,
     * 或者如果prev为null也一定是在条件队列。
     *
     * 同步队列里的节点prev为null只可能是获取到锁后调用setHead清为null,
     * 新入队的节点prev值是不会为null的。
     * 另外,条件队列里节点是用nextWaiter来维护的,不用next和prev。
     */
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    /*
     * 如果next不为null,一定是在同步队列的。
     * 这里值得一提的是在AQS的cancelAcquire方法中,
     * 一个节点将自己移除出队列的时候会把自己的next域指向自己。
     * 即node.next = node;     
     *
     * 从GC效果上来看node.next = node和node.next = null无异,
     * 但是这对此处next不为null一定在同步队列上来说,
     * 这样可以将节点在同步队列上被取消的情况与普通情况归一化判断。
     */
    if (node.next != null)
        return true;
    /*
     * 有可能node.prev的值不为null,但还没在队列中,因为入队时CAS队列的tail可能失败。
     * 这是从tail向前遍历一次,确定是否已经在同步队列上。
     */
    return findNodeFromTail(node);
}

/**
 * 从队列尾部向前遍历判断节点是否在队列中。
 */
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

private int checkInterruptWhileWaiting(Node node) {
    /*
     * 1. 线程未中断返回0
     * 2. 线程中断且入同步队列成功,返回THROW_IE表示后续要抛出InterruptedException。
     * 3. 线程中断且未能入同步队列(由于被signal方法唤醒),则返回REINTERRUPT表示后续重新中断。
     */
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    /*
     * 上面CAS失败的原因是signal()方法被调用,状态已经被抢先更新了。
     * 这时需要自旋等待节点成功进入同步队列,否则会影响后续的重新获取锁acquireQueued()方法。
     * 因为acquireQueued必须在节点成功入队后才可以调用。
     */
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

/**
 * THROW_IE则抛出InterruptedException,
 * REINTERRUPT则重新中断当前线程。
 */
private void reportInterruptAfterWait(int interruptMode)
        throws InterruptedException {
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            selfInterrupt();
}

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174

# 3.3 signal/signalAll方法

public final void signal() {
    // 检查互斥锁持有情况。
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        // 将firstWaiter设置为后继节点,如果为null,则置lastWaiter为null。
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 断开连接。
        first.nextWaiter = null;
        /*
         * 如果转移失败并且下一个节点不为null,则重试。
         * 在这里转移失败只可能因为节点被取消。
         */
    } while (!transferForSignal(first) &&
            (first = firstWaiter) != null);
}

public final void signalAll() {
    // 检查互斥锁持有情况。
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

private void doSignalAll(Node first) {
    // 将firstWaiter和lastWaiter先清为null。
    lastWaiter = firstWaiter = null;
    // 从first开始一直遍历到第一个null节点。
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

final boolean transferForSignal(Node node) {
    // 必须将状态从CONDITION流转到0,如果失败则说明节点被取消,因为这里不会存在signal的竞争。
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 插入同步队列。
    Node p = enq(node);
    int ws = p.waitStatus;
    /*
     * 如果前驱节点状态为取消或者无法将状态CAS到SIGNAL(比如可能前驱在此期间被取消了),
     * 则需要唤醒参数node节点对应的线程,使其能开始尝试争锁。
     *
     * 如果将前驱状态切到SIGNAL了,则由相应线程在释放锁之后唤醒node节点对应线程。
     */
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

# 4. 思考与总结

至此,已经对ConditionObject的await/signal/signalAll方法源码进行了分析。对于中断不敏感的awaitUninterruptibly, 带有时限的awaitNanos由于大致套路与await无异,不作冗述。 ConditionObject的firstWaiter/lastWaiter以及AQS.Node的nextWaiter都是没有volatile修饰的。这是因为ConditionObject假设在await/signal/signalAll等方法的调用是已经持有互斥锁的。

个人认为ConditionObject这样的设计是有个问题的。即按照Condition接口的定义,在不持锁情况下调用await由子类决定如何处理,通常是抛出InterruptedException。但如果同时有持锁和不持锁的线程调用await方法,可能会对ConditionObject的内部队列造成破坏,后果就是有些成功调用await方法的线程可能永远没有办法被唤醒,因为无法通过队列追溯到它们。也就是非法调用会抛出异常,但仍然会对内部数据结构造成破坏,这其实是有一些不合理的,至少是可以改进的地方。 最简单的处理方式是,对于不持锁的请求抛出异常,不应该依靠await -> fullyRelease这一步来抛出异常,此时按照流程已经调用过addConditionWaiter了。可以在await这类方法前面与signal/signalAll一样预检查一次持锁情况:

if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
1
2

整体而言,ConditionObject中代码对GC友好,逻辑缜密,读过之后受益匪浅。

Last Updated: 2023/02/14, 18:02:00
AbstractQueuedSynchronizer 原理分析 - 独占 共享模式
Java CAS 原理分析

← AbstractQueuedSynchronizer 原理分析 - 独占 共享模式 Java CAS 原理分析→

Theme by Vdoing | Copyright © 2019-2023 pursue-wind | 粤ICP备2022093130号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式
  • 飙升榜
  • 新歌榜
  • 云音乐民谣榜
  • 美国Billboard榜
  • UK排行榜周榜
  • 网络DJ