Java AbstractQueuedSynchronizer笔记

AbstractQueuedSynchronizer使用CLH节点构建双向链表。在head获得锁、释放锁。在tail加入等待队列。AQS支持共享锁、独占锁、条件队列。为了解决并发释放共享锁,增加了PROPAGATE状态。

AbstractQueuedSynchronizer是JUC包的基础,解决了同步器的细节问题(同步状态、FIFO队列),ReentrantLock、Semaphore、CountDownLatch等类都使用了AQS。

CLH lock

      +------+  prev +------+  prev +------+
      |      | <---- |      | <---- |      |  
 head | Node |  next | Node |  next | Node |  tail
      |      | ----> |      | ----> |      |  
      +------+       +------+       +------+

CLH锁通常用于自旋锁。AQS使用CLH的变种作为同步器。CLH节点数据结构如下。

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;
    
    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

    volatile int waitStatus;

    volatile Node prev;
    volatile Node next;

    /**
     * The thread that enqueued this node.  Initialized on
     * construction and nulled out after use.
     */
    volatile Thread thread;
    // 存储condition队列中的后继节点
    Node nextWaiter;
}

waitStatus是节点的状态。

  • SIGNAL:-1。这个节点释放、或者取消的时候,要通过unpark操作通知后继节点。
  • CANCELLED:1。节点由于超时或者中断,因此取消了。
  • CONDITION:-2。这个节点处于条件队列。
  • PROPAGATE:-3。releaseShared操作的时候要传递到其他节点。
  • 0:其他。

对于普通同步节点,waitStatus初始化为0,如果是条件队列节点,则初始化为CONDITION。

AQS是由CLH node构建的双向链表,head、tail分别指向链表的头部、尾部。

clh-lock.png

private transient volatile Node head;
private transient volatile Node tail;

/**
 * The synchronization state.
 */
private volatile int state;

留意state字段,子类使用state字段存储同步状态。

AQS入队

/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

入队操作发生在tail:node.prev = pred。这里做了优化,假设入队竞争不大,先尝试检查和更新tail,失败才走完整的enq流程。

enq逻辑如下

private Node enq(final Node node) {
    for (;;) {
        // 入队发生在tail
        Node t = tail;
        if (t == null) { // Must initialize
            // 由于AQS的head、tail节点使用了lazy init方式,
            // 所以要先检查节点是否为null,并做初始化。
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 设置入队节点的前驱为之前的tail
            // 并且尝试原子化检查和更新tail
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

/**
 * CAS head field. Used only by enq.
 */
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

/**
 * CAS tail field. Used only by enq.
 */
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

在一个无限循环中不停检查和重试更新tail,直至入队成功。 底层使用了Unsafe类,直接使用底层硬件提供的原子化操作,这里先不展开。

获取互斥锁

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire是互斥方式获取锁,不支持中断。 tryAcquire是模板方法,由子类覆盖(ReentrantLock、Semaphore等)。 当尝试获取锁失败,则使用addWaiter把当前线程添加到CLH队列,并在循环中尝试。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 当前节点的前驱是head,才尝试获取同步器
            // tryAcquire是模板方法,由子类覆盖
            if (p == head && tryAcquire(arg)) {
                // 更新node为head
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 判断是否要阻塞线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

有意思的是获取锁失败后,判断是否需要阻塞线程。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 检查前驱节点的waitStatus
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 需要被阻塞
        return true;
    if (ws > 0) {
        // ws > 0 即CANCELLED,往回找到非CANCELLED节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // 其余情况设置为 SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    // 使用LockSupport阻塞当前线程
    LockSupport.park(this);
    return Thread.interrupted();
}

释放互斥锁

public final boolean release(int arg) {
    // tryRelease是模板方法,由子类覆盖
    if (tryRelease(arg)) {
        // 在head释放锁
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    // 没有后继节点,或者已经CANCELLED
    // 则需要从tail往回遍历,寻找当前节点的后继
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

获取共享锁

public final void acquireShared(int arg) {
    // tryAcquireShared是模板方法
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 前驱是head,才尝试获取锁
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

因为是shared模式,获取锁之后,可能要通知后面的节点

/**
 * Sets head of queue, and checks if successor may be waiting
 * in shared mode, if so propagating if either propagate > 0 or
 * PROPAGATE status was set.
 *
 * @param node the node
 * @param propagate the return value from a tryAcquireShared
 */
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    // propagate > 0:还有剩余,需要传递
    // h.waitStatus < 0 :SIGNAL or PROPAGATE
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

释放共享锁

public final boolean releaseShared(int arg) {
    // tryReleaseShared是模板方法
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        // 在head释放锁
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // SIGNAL需要唤醒后续节点
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            // why 转为 PROPAGATE 状态?
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

Node.PROPAGATE

AQS源码比较难理解的是Node.PROPAGATE。javadoc对Node.PROPAGATE的解释

PROPAGATE:  A releaseShared should be propagated to other
            nodes. This is set (for head node only) in
            doReleaseShared to ensure propagation
            continues, even if other operations have
            since intervened.

在共享模式下,可以认为资源有多个,因此当前线程被唤醒之后,可能还有剩余的资源可以唤醒其他线程。该状态用来表明后续节点会传播唤醒的操作。需要注意的是只有头节点才可以设置为该状态

找到一篇文章,对此思考比较深入,推荐阅读:AbstractQueuedSynchronizer源码解读

在AQS的共享锁中,一个被park的线程,不考虑线程中断和前驱节点取消的情况,有两种情况可以被unpark:一种是其他线程释放信号量,调用unparkSuccessor;另一种是其他线程获取共享锁时通过传播机制来唤醒后继节点。 可能会有队列中处于等待状态的节点因为第一个线程完成释放唤醒,第二个线程获取到锁,但还没设置好head,又有新线程释放锁,但是读到老的head状态为0导致释放但不唤醒,最终后一个等待线程既没有被释放线程唤醒,也没有被持锁线程唤醒。 PROPAGATE的引入是为了解决共享锁并发释放导致的线程hang住问题。

setHeadAndPropagate()

if (propagate > 0 || h == null || h.waitStatus < 0 ||
    (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    if (s == null || s.isShared())
        doReleaseShared();
}

doReleaseShared()

else if (ws == 0 &&
         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    continue;                // loop on failed CAS

AQS和Condition

AQS提供了ConditionObject,实现了条件队列。具体参见

小结

  • AQS使用CLH节点,构建双向链表
  • AQS提供了模板方法(tryXXX,例如tryRelease、tryAcquire),由子类覆盖
  • AQS支持互斥锁、共享锁、条件队列
  • 在tail增加等待线程
  • 在head获取锁、释放锁
  • 使用unsafe类CAS更新head、tail
  • 释放锁且Node.SIGNAL,则要通过unparkSuccessor唤醒后续节点
  • 对于共享锁,为了解决并发释放导致线程hang的情况,增加了Node.PROPAGATE。
  • 只有head节点才可以设置为PROPAGATE状态

参考

Built with Hugo
Theme Stack designed by Jimmy