1. 对比synchronized 和 Object#wait/notify/notifyAll
加锁
- synchronized 内置锁,可以修饰方法或者代码块。
- Lock需要手动加锁和手动释放锁,只能作用于代码块。加锁在try语句上面,释放锁是放在finally语句块里面。
Object #wait vs Condition #await
- 前者必须是在同步块里面。 后者必须持有Lock。
- 当前线程进入一个等待队列(前者waitSet,后者是Condition的队列),然后释放持有的锁
Object #notify/notifyAll vs Condition #signal/singalAll
- 前者必须是在同步块里面。 后者必须持有Lock。
- 被唤醒的线程会从一个队列移动到另外一个队列(前者是从waitSet -> EntryList, 后者: Condition队列 到 Lock队列)
- 被唤醒的线程必须重新获取锁才能从wait/await方法返回
2. 源码
对比synchronized和 ObjectMonitor的底层实现。逻辑上有很多相似的地方。
2.1. Condition的await方法
具体实现类:ConditionObject (AQS类的内部类)
- 等待队列用的是单链表,记录了头和尾节点
- 调用await后,当前线程会进入等待队列,释放持有的锁,进入阻塞状态
- 会检查当前线程是否持有Lock, 没有则会抛异常 IllegalMonitorStateException
static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null;
volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter;
final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } }
|
ConditionObject #await方法
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
private Node addConditionWaiter() { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
|
Condition的await方法跟Object的wait方法是类似的,它要求在调用await的线程已经获取了对应的lock,然后调用await后,它会释放lock,等待别的线程调用相同Condition对象的signal方法唤醒,唤醒之后,它需要重新获取lock对象,然后才能从await方法返回。跟Object#wait一模一样。
2.2. Condition的signal方法
需要关注的细节:
- 当前线程调此方法时,它必须持有lock,否则会抛出IllegalMonitorState的异常 (note 1)
- 会从Condition的等待队列中,唤醒一个线程,被唤醒的线程会被加入到Lock的队列中
- Lock的队列(AQS),是一个普通的双向链表(有head和tail两个指针,追加到链表尾部时间复杂度为O(1))
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
|
2.3. 简单看下AQS
AQS:AbstractQueuedSynchronizer,它就是一个队列,具体实现是用双向链表来实现,记录了头和尾节点。
比较重要的参数:state,表示状态的整数。
package java.util.concurrent.locks; import java.util.concurrent.TimeUnit; import java.util.ArrayList; import java.util.Collection; import java.util.Date; import sun.misc.Unsafe;
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { protected AbstractQueuedSynchronizer() { } private transient volatile Node head; private transient volatile Node tail; private volatile int state;
protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } private void unparkSuccessor(Node node) { private void doReleaseShared() { private void setHeadAndPropagate(Node node, int propagate) { private void cancelAcquire(Node node) { private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { static void selfInterrupt() { Thread.currentThread().interrupt(); } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } final boolean acquireQueued(final Node node, int arg) { private void doAcquireInterruptibly(int arg) private boolean doAcquireNanos(int arg, long nanosTimeout) private void doAcquireShared(int arg) { private void doAcquireSharedInterruptibly(int arg) private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
protected boolean tryAcquire(int arg) { protected boolean tryRelease(int arg) { protected int tryAcquireShared(int arg) { protected boolean tryReleaseShared(int arg) { protected boolean isHeldExclusively() { public final void acquire(int arg) { public final void acquireInterruptibly(int arg) public final boolean tryAcquireNanos(int arg, long nanosTimeout) ... }
|