synchronized与wait和notify以及底层实现


1. synchronized关键字和wait、notify的联系

调用Object #wait、#notify、#notifyAll 之前线程必须持有当前对象的monitor,也就是说,这三个方法的调用必须是在synchronized同步块或者同步方法里面。

2. 监视器对象 (monitor)

JVM的实现中,与synchronized关键字紧密相关的一个C++文件:objectMonitor.hpp

相关的类有:

  • ObjectWaiter (它是等待线程的一个封装,同时是一个双链表的节点)
  • ObjectMonitor对象

在线阅读参考:objectMonitor.hpp

ObjectWaiter是等待Monitor对象的线程的一个封装,它里面有_prev, _next, _thread,它是一个双向链表的节点。
ObjectMonitor 类的实例就是与Java对象一同创建和销毁的。比较重要的成员:

  1. ObjectWaiter * volatile _WaitSet;
    双向链表的表头。记录等待在monitor对象的线程列表
    LL of threads wait()ing on the monitor

  2. ObjectWaiter * volatile _EntryList ;
    准备获取monitor对象的线程,但是没有获取到
    Threads blocked on entry or reentry.

  3. void * volatile _owner;
    pointer to owning thread OR BasicLock

  4. volatile intptr_t _recursions;
    recursion count, 0 for first entry 可重入锁

  5. ObjectWaiter * volatile _cxq
    LL of recently-arrived threads blocked on entry. The list is actually composed of WaitNodes, acting as proxies for Threads.

  6. volatile int _WaitSetLock;
    自旋锁, protects Wait Queue - simple spinlock

示意图:

2.1. wait方法的底层实现

需要关注的细节:

  • 当前线程调用同步对象的wait方法时,它必须持有monitor监视器,也就是说必须是在同步代码块或同步方法中 (note 1)
  • 当前线程调用wait后,会将它自己放到monitor对象的waitSet中 (note 3)
  • 当前线程会将它持有的monitor对象释放掉 (note 5)
  • 当前线程实际上是调用了操作系统的API去改变线程状态的 (note 6)
  • waitSet实际上是一个双向循环链表的结构,当前线程是添加到waitSet的尾部 (O(1)复杂度)
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
Thread * const Self = THREAD ;
// ...
// 1. 这里会考察当前线程是否持有monitor,否则会抛出java_lang_IllegalMonitorStateException
CHECK_OWNER();

// 2. 将当前线程包装为一个链表节点,然后设置线程的状态为wait
ObjectWaiter node(Self);
node.TState = ObjectWaiter::TS_WAIT ;

// Enter the waiting queue, which is a circular doubly linked list in this case
// but it could be a priority queue or any data structure.
// _WaitSetLock protects the wait queue. Normally the wait queue is accessed only
// by the the owner of the monitor *except* in the case where park()
// returns because of a timeout of interrupt. Contention is exceptionally rare
// so we use a simple spin-lock instead of a heavier-weight blocking lock.

Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
AddWaiter (&node) ; // 3. 将当前线程放到waitSet
Thread::SpinRelease (&_WaitSetLock) ;

intptr_t save = _recursions; // record the old recursion count
_waiters++; // 4. 等待的线程个数加1
exit (true, Self) ; // 5. 当前线程退出监视器,释放monitor
guarantee (_owner != Self, "invariant") ; // 确保当前线程已经释放掉了monitor

// The thread is on the WaitSet list - now park() it.
// 6. 调用OS的api将当前线程 转换成wait状态 (park)
int ret = OS_OK ;
int WasNotified = 0 ;
{ // State transition wrappers
OSThread* osthread = Self->osthread();
OSThreadWaitState osts(osthread, true);
{
ThreadBlockInVM tbivm(jt);
// Thread is in thread_blocked state and oop access is unsafe.
jt->set_suspend_equivalent();

if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
// Intentionally empty
} else
if (node._notified == 0) {
if (millis <= 0) {
Self->_ParkEvent->park () ;
} else {
ret = Self->_ParkEvent->park (millis) ;
}
}
if (ExitSuspendEquivalent (jt)) {
jt->java_suspend_self();
}

} // Exit thread safepoint: transition _thread_blocked -> _thread_in_vm
// ...
}

inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) { // 简单
// put node at end of queue (circular doubly linked list)
if (_WaitSet == NULL) {
_WaitSet = node;
node->_prev = node;
node->_next = node;
} else {
ObjectWaiter* head = _WaitSet ;
ObjectWaiter* tail = head->_prev;
assert(tail->_next == head, "invariant check");
tail->_next = node;
head->_prev = node;
node->_next = head;
node->_prev = tail;
}
}

2.2. notify方法的底层实现

需要关注的细节:

  • 当前线程调用同步对象的notify方法时,它必须持有monitor监视器,也就是说必须是在同步代码块或同步方法中,否则会抛出IllegalMonitorState的异常 (note 1)
  • 会从WaitSet中等待的线程,随机唤醒一个线程 (实际不是随机唤醒,而是直接拿头节点),被唤醒的线程会被加入到EntryList中
  • 调用notify后,当前线程会将它持有的monitor对象锁释放掉 (暂时没有看到源码)
  • EntryList的结构,是一个普通的双向链表(当追加线程到链表末尾时,时间复杂度为O(n))

// Consider:
// If the lock is cool (cxq == null && succ == null) and we're on an MP system
// then instead of transferring a thread from the WaitSet to the EntryList
// we might just dequeue a thread from the WaitSet and directly unpark() it.
void ObjectMonitor::notify(TRAPS) {
CHECK_OWNER(); // note 1: 当前线程必须拥有monitor对象锁,否则会抛IMSE异常
//...
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
ObjectWaiter * iterator = DequeueWaiter() ; // note 2: 从WaitSet中取出一个等待的线程
if (iterator != NULL) {
TEVENT (Notify1 - Transfer) ;
guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
guarantee (iterator->_notified == 0, "invariant") ;
if (Policy != 4) {
iterator->TState = ObjectWaiter::TS_ENTER ;
}
iterator->_notified = 1 ;
Thread * Self = THREAD;
iterator->_notifier_tid = Self->osthread()->thread_id(); // note 3. 记录被哪个线程唤醒的

ObjectWaiter * List = _EntryList ;
if (List != NULL) {
assert (List->_prev == NULL, "invariant") ;
assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
assert (List != iterator, "invariant") ;
}

// note 4: 根据policy按不同方式插入到EntryList,或者cxq链表
if (Policy == 0) { // prepend to EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
List->_prev = iterator ;
iterator->_next = List ;
iterator->_prev = NULL ;
_EntryList = iterator ;
}
} else
if (Policy == 1) { // append to EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
// note: 当前的EntryList是普通的双向链表,找tail需要O(n)时间,可以转换成循环双链表
ObjectWaiter * Tail ;
for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
}
} else
if (Policy == 2) { // prepend to cxq
// prepend to cxq
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Front = _cxq ;
iterator->_next = Front ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
break ;
}
}
}
} else
if (Policy == 3) { // append to cxq
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Tail ;
Tail = _cxq ;
if (Tail == NULL) {
iterator->_next = NULL ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
break ;
}
} else {
while (Tail->_next != NULL) Tail = Tail->_next ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
break ;
}
}
} else {
ParkEvent * ev = iterator->_event ;
iterator->TState = ObjectWaiter::TS_RUN ;
OrderAccess::fence() ;
ev->unpark() ;
}

if (Policy < 4) {
iterator->wait_reenter_begin(this);
}

// _WaitSetLock protects the wait queue, not the EntryList. We could
// move the add-to-EntryList operation, above, outside the critical section
// protected by _WaitSetLock. In practice that's not useful. With the
// exception of wait() timeouts and interrupts the monitor owner
// is the only thread that grabs _WaitSetLock. There's almost no contention
// on _WaitSetLock so it's not profitable to reduce the length of the
// critical section.
}

Thread::SpinRelease (&_WaitSetLock) ;
}

inline ObjectWaiter* ObjectMonitor::DequeueWaiter() { // 简单
// dequeue the very first waiter
ObjectWaiter* waiter = _WaitSet;
if (waiter) {
DequeueSpecificWaiter(waiter);
}
return waiter;
}

inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) { // 简单
// when the waiter has woken up because of interrupt,
// timeout or other spurious wake-up, dequeue the
// waiter from waiting list
ObjectWaiter* next = node->_next;
if (next == node) {
_WaitSet = NULL;
} else {
ObjectWaiter* prev = node->_prev;
next->_prev = prev;
prev->_next = next;
if (_WaitSet == node) {
_WaitSet = next;
}
}
node->_next = NULL;
node->_prev = NULL;
}

2.3. 其他有兴趣的细节

  1. Java中多个线程执行同步方法/同步块时,这些线程是直接进入EntryList (查看enter方法)
  2. 当monitor已经被线程占用时,当前线程会尝试自旋,对吧,怎么体现的 (查看enter方法)
  3. 多个线程同时争夺monitor,怎么体现的
  4. monitorexit指令的底层内容 (exit方法)

cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL)
返回值的含义:
返回的是变量_owner的值,如果CAS操作成功,那么返回的是Self,如果失败,那么返回的应该是其他线程。

2.4. monitorenter底层原理

对应enter方法,比较难理解
之前读C++代码时对CAS的返回值有犹豫,它到底返回的是什么?
它的含义是:如果目标地址的值是原值,则将其更新为新值,并且返回原值。否则,不做更新,并且返回新值
【这里我感觉是返回内存中当前的值,不一定是参数里面设置的新值,但也有可能是新值】。因此,当返回结果是原值时,
则说明更新成功。

void ATTR ObjectMonitor::enter(TRAPS) {
// The following code is ordered to check the most common cases first
Thread * const Self = THREAD ;
void * cur ;

cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ; // 1. 竞争锁
if (cur == NULL) { // 返回旧值:CAS成功,Self线程竞争到锁,owner设置成功
// Either ASSERT _recursions == 0 or explicitly set _recursions = 0.
assert (_recursions == 0 , "invariant") ;
assert (_owner == Self, "invariant") ;
// CONSIDER: set or assert OwnerIsThread == 1
return ;
}

if (cur == Self) { // 2. 竞争失败,CAS返回内存最新值,已经是Self线程了,说明是当前线程重入sync块
_recursions ++ ;
return ;
}

if (Self->is_lock_owned ((address)cur)) { // 3. owner除了是线程,还可能是一个基本锁对象
assert (_recursions == 0, "internal state error");
_recursions = 1 ;
// Commute owner from a thread-specific on-stack BasicLockObject address to
// a full-fledged "Thread *".
_owner = Self ;
OwnerIsThread = 1 ;
return ;
} // else: 4. 没有竞争到锁,其他线程持有了锁。当前线程,先自旋,不行再阻塞

// Try one round of spinning *before* enqueueing Self
// and before going through the awkward and expensive state
// transitions. The following spin is strictly optional ...
// Note that if we acquire the monitor from an initial spin
// we forgo posting JVMTI events and firing DTRACE probes.
if (Knob_SpinEarly && TrySpin (Self) > 0) { // 5. 尝试自旋,如果能获取到锁就更好
assert (_owner == Self , "invariant") ;
assert (_recursions == 0 , "invariant") ;
assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
Self->_Stalled = 0 ;
return ;
}

assert (_owner != Self , "invariant") ; // 6. 自旋后没有获取到锁
assert (_succ != Self , "invariant") ;

OSThreadContendState osts(Self->osthread());
ThreadBlockInVM tbivm(jt);

Self->set_current_pending_monitor(this);

// TODO-FIXME: change the following for(;;) loop to straight-line code.
for (;;) {
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition()
// or java_suspend_self()
EnterI (THREAD) ;
if (!ExitSuspendEquivalent(jt)) break ;
//
// We have acquired the contended monitor, but while we were
// waiting another thread suspended us. We don't want to enter
// the monitor while suspended because that would surprise the
// thread that suspended us.
//
_recursions = 0 ;
_succ = NULL ;
exit (false, Self) ;

jt->java_suspend_self();
}
Self->set_current_pending_monitor(NULL);
}
}


void ATTR ObjectMonitor::EnterI (TRAPS) {
Thread * Self = THREAD ;
assert (((JavaThread *) Self)->thread_state() == _thread_blocked , "invariant") ;

// Try the lock - TATAS
if (TryLock (Self) > 0) {
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}

DeferredInitialize () ;

// We try one round of spinning *before* enqueueing Self.
if (TrySpin (Self) > 0) {
assert (_owner == Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}

// The Spin failed -- Enqueue and park the thread ...
assert (_succ != Self , "invariant") ;
assert (_owner != Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;

// Enqueue "Self" on ObjectMonitor's _cxq.
ObjectWaiter node(Self) ;
Self->_ParkEvent->reset() ;
node._prev = (ObjectWaiter *) 0xBAD ;
node.TState = ObjectWaiter::TS_CXQ ;

// Push "Self" onto the front of the _cxq.
// Once on cxq/EntryList, Self stays on-queue until it acquires the lock.
// Note that spinning tends to reduce the rate at which threads
// enqueue and dequeue on EntryList|cxq.
ObjectWaiter * nxt ;
for (;;) {
node._next = nxt = _cxq ;
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;

// Interference - the CAS failed because _cxq changed. Just retry.
// As an optional optimization we retry the lock.
if (TryLock (Self) > 0) {
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
}

// Check for cxq|EntryList edge transition to non-null. This indicates
// the onset of contention.
// The lock have been released while this thread was occupied queueing
// itself onto _cxq. To close the race and avoid "stranding" and
// progress-liveness failure we must resample-retry _owner before parking.
// Note the Dekker/Lamport duality: ST cxq; MEMBAR; LD Owner.
// In this case the ST-MEMBAR is accomplished with CAS().
//
// TODO: Defer all thread state transitions until park-time.
// Since state transitions are heavy and inefficient we'd like
// to defer the state transitions until absolutely necessary,
// and in doing so avoid some transitions ...

for (;;) {
if (TryLock (Self) > 0) break ;
assert (_owner != Self, "invariant") ;
// park self
if (_Responsible == Self || (SyncFlags & 1)) {
TEVENT (Inflated enter - park TIMED) ;
Self->_ParkEvent->park ((jlong) RecheckInterval) ;
// Increase the RecheckInterval, but clamp the value.
RecheckInterval *= 8 ;
if (RecheckInterval > 1000) RecheckInterval = 1000 ;
} else {
TEVENT (Inflated enter - park UNTIMED) ;
Self->_ParkEvent->park() ; // 将当前线程阻塞
}

if (TryLock(Self) > 0) break ;

return ;
}

2.5. monitorexit底层原理

对应exit方法,比较难理解。 exit要做的事情,让出持有的monitor锁。 比较长,后面有机会再回顾补充。

3. 参考

  1. Synchronized的原理(汇编层)_aileitianshi的博客-CSDN博客_synchronized 汇编

文章作者: 量子数字
版权声明: 本博客所有文章除特別声明外,均采用 CC BY-NC-ND 4.0 许可协议。转载请注明来源 量子数字 !
  目录