Lock 和 Condition并发编程模式


1. 对比synchronized 和 Object#wait/notify/notifyAll

  1. 加锁

    • synchronized 内置锁,可以修饰方法或者代码块。
    • Lock需要手动加锁和手动释放锁,只能作用于代码块。加锁在try语句上面,释放锁是放在finally语句块里面。
  2. Object #wait vs Condition #await

    • 前者必须是在同步块里面。 后者必须持有Lock。
    • 当前线程进入一个等待队列(前者waitSet,后者是Condition的队列),然后释放持有的锁
  3. Object #notify/notifyAll vs Condition #signal/singalAll

    • 前者必须是在同步块里面。 后者必须持有Lock。
    • 被唤醒的线程会从一个队列移动到另外一个队列(前者是从waitSet -> EntryList, 后者: Condition队列 到 Lock队列)
    • 被唤醒的线程必须重新获取锁才能从wait/await方法返回

2. 源码

对比synchronized和 ObjectMonitor的底层实现。逻辑上有很多相似的地方。

2.1. Condition的await方法

具体实现类:ConditionObject (AQS类的内部类)

  1. 等待队列用的是单链表,记录了头和尾节点
  2. 调用await后,当前线程会进入等待队列,释放持有的锁,进入阻塞状态
  3. 会检查当前线程是否持有Lock, 没有则会抛异常 IllegalMonitorStateException
// AQS的内部类,封装了线程, 双链表的节点,跟ObjectWaiter类很类似
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;

volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;

final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

ConditionObject #await方法

public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); // 1. 将当前线程添加到等待队列
int savedState = fullyRelease(node); // 2. 释放持有的锁?
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this); // 3. 阻塞当前线程
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node; // 似乎用的是单链表
lastWaiter = node;
return node;
}

Condition的await方法跟Object的wait方法是类似的,它要求在调用await的线程已经获取了对应的lock,然后调用await后,它会释放lock,等待别的线程调用相同Condition对象的signal方法唤醒,唤醒之后,它需要重新获取lock对象,然后才能从await方法返回。跟Object#wait一模一样。

2.2. Condition的signal方法

需要关注的细节:

  • 当前线程调此方法时,它必须持有lock,否则会抛出IllegalMonitorState的异常 (note 1)
  • 会从Condition的等待队列中,唤醒一个线程,被唤醒的线程会被加入到Lock的队列中
  • Lock的队列(AQS),是一个普通的双向链表(有head和tail两个指针,追加到链表尾部时间复杂度为O(1))
public final void signal() {
if (!isHeldExclusively()) // 1. 检查当前线程是否持有锁
throw new IllegalMonitorStateException();
Node first = firstWaiter; // 释放第一个等待的线程节点
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
// If cannot change waitStatus, the node has been cancelled.
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node); // 2. 将唤醒的线程节点放到Lock的队列
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); // 3. 唤醒当前线程?
return true;
}

2.3. 简单看下AQS

AQS:AbstractQueuedSynchronizer,它就是一个队列,具体实现是用双向链表来实现,记录了头和尾节点。
比较重要的参数:state,表示状态的整数。

package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import sun.misc.Unsafe;

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
protected AbstractQueuedSynchronizer() { }
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;

protected final int getState() { return state; }
protected final void setState(int newState) { state = newState; }
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
// Queuing utilities
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
private void unparkSuccessor(Node node) {
private void doReleaseShared() {
private void setHeadAndPropagate(Node node, int propagate) {
// Utilities for various versions of acquire
private void cancelAcquire(Node node) {
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
final boolean acquireQueued(final Node node, int arg) {
private void doAcquireInterruptibly(int arg)
private boolean doAcquireNanos(int arg, long nanosTimeout)
private void doAcquireShared(int arg) {
private void doAcquireSharedInterruptibly(int arg)
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)

// Main exported methods
protected boolean tryAcquire(int arg) {
protected boolean tryRelease(int arg) {
protected int tryAcquireShared(int arg) {
protected boolean tryReleaseShared(int arg) {
protected boolean isHeldExclusively() {
public final void acquire(int arg) {
public final void acquireInterruptibly(int arg)
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
...
}

文章作者: 量子数字
版权声明: 本博客所有文章除特別声明外,均采用 CC BY-NC-ND 4.0 许可协议。转载请注明来源 量子数字 !
  目录