8214271: Fast primitive to wake many threads

Reviewed-by: dholmes, dcubed
This commit is contained in:
Robbin Ehn 2019-01-11 10:58:46 +01:00
parent 6622393221
commit 83c8720879
6 changed files with 568 additions and 0 deletions

@ -0,0 +1,80 @@
/*
* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*
*/
#include "precompiled/precompiled.hpp"
#include "runtime/orderAccess.hpp"
#include "runtime/os.hpp"
#include "waitBarrier_linux.hpp"
#include <sys/syscall.h>
#include <linux/futex.h>
// Evaluates 'cond' through the given check macro (e.g. guarantee) and, for the
// failure message, appends the errno value rendered with os::strerror() and
// os::errno_name() to 'msg'.
// errno is copied into a local before anything else so the reported value is
// the one that was current when the macro was entered.
#define check_with_errno(check_type, cond, msg) \
do { \
int err = errno; \
check_type(cond, "%s: error='%s' (errno=%s)", msg, os::strerror(err), \
os::errno_name(err)); \
} while (false)
// guarantee() flavour of check_with_errno.
#define guarantee_with_errno(cond, msg) check_with_errno(guarantee, cond, msg)
// Minimal wrapper over the raw futex system call.
//   addr     - the futex word to operate on
//   futex_op - the futex operation (e.g. FUTEX_WAIT_PRIVATE, FUTEX_WAKE_PRIVATE)
//   op_arg   - the operation's value argument
// The three remaining system call arguments are always passed as NULL, NULL
// and 0. Returns the system call result narrowed to int; on failure that is
// -1 with errno set.
static int futex(volatile int *addr, int futex_op, int op_arg) {
  const long result = syscall(SYS_futex, addr, futex_op, op_arg, NULL, NULL, 0);
  return static_cast<int>(result);
}
// Arms the barrier by storing barrier_tag into the futex word, so that a
// subsequent wait(barrier_tag) finds a matching tag and blocks.
// In debug builds asserts that the barrier is currently disarmed (word == 0).
void LinuxWaitBarrier::arm(int barrier_tag) {
assert(_futex_barrier == 0, "Should not be already armed: "
"_futex_barrier=%d", _futex_barrier);
_futex_barrier = barrier_tag;
// Full fence after the tag has been stored.
OrderAccess::fence();
}
// Disarms the barrier: resets the futex word to 0 and then issues a
// FUTEX_WAKE on it for up to INT_MAX waiters.
// In debug builds asserts that the barrier is currently armed.
// A negative result from the futex call is reported via guarantee_with_errno.
void LinuxWaitBarrier::disarm() {
assert(_futex_barrier != 0, "Should be armed/non-zero.");
// Clear the tag before waking, so woken threads re-checking the word in
// wait() see a value different from their tag.
_futex_barrier = 0;
int s = futex(&_futex_barrier,
FUTEX_WAKE_PRIVATE,
INT_MAX /* wake a max of this many threads */);
guarantee_with_errno(s > -1, "futex FUTEX_WAKE failed");
}
// Blocks the calling thread for as long as the futex word holds barrier_tag.
// If the word does not hold barrier_tag on entry, issues a fence and returns
// immediately. Otherwise repeats FUTEX_WAIT until the word no longer equals
// barrier_tag. Any futex result other than 0, EAGAIN or EINTR is reported via
// guarantee_with_errno.
void LinuxWaitBarrier::wait(int barrier_tag) {
assert(barrier_tag != 0, "Trying to wait on disarmed value");
// The zero check repeats the assert above so that builds without asserts
// also return early for a zero tag.
if (barrier_tag == 0 ||
barrier_tag != _futex_barrier) {
OrderAccess::fence();
return;
}
do {
// barrier_tag is passed as the value the futex word is expected to hold.
int s = futex(&_futex_barrier,
FUTEX_WAIT_PRIVATE,
barrier_tag /* should be this tag */);
guarantee_with_errno((s == 0) ||
(s == -1 && errno == EAGAIN) ||
(s == -1 && errno == EINTR),
"futex FUTEX_WAIT failed");
// Return value 0: woken up, but re-check in case of spurious wakeup.
// Error EINTR: woken by signal, so re-check and re-wait if necessary.
// Error EAGAIN: we are already disarmed and so will pass the check.
} while (barrier_tag == _futex_barrier);
}

@ -0,0 +1,48 @@
/*
* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*
*/
#ifndef OS_LINUX_WAITBARRIER_LINUX_HPP
#define OS_LINUX_WAITBARRIER_LINUX_HPP
#include "memory/allocation.hpp"
// Linux WaitBarrier implementation; description() identifies it as "futex".
// The only state is a single int holding the tag the barrier is armed with,
// which is 0 while the barrier is disarmed.
class LinuxWaitBarrier : public CHeapObj<mtInternal> {
  // Current barrier tag; starts out as 0.
  volatile int _futex_barrier;

  // Copy construction and assignment are declared private and left
  // undefined so that instances cannot be copied.
  LinuxWaitBarrier(const LinuxWaitBarrier&);
  LinuxWaitBarrier& operator=(const LinuxWaitBarrier&);

 public:
  LinuxWaitBarrier() : _futex_barrier(0) {}
  ~LinuxWaitBarrier() {}

  // Short name of this implementation.
  const char* description() { return "futex"; }

  // Arm with a non-zero tag, disarm, and wait on a tag.
  void arm(int barrier_tag);
  void disarm();
  void wait(int barrier_tag);
};
#endif // OS_LINUX_WAITBARRIER_LINUX_HPP

@ -0,0 +1,135 @@
/*
* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*
*/
#ifndef SHARE_UTILITIES_WAITBARRIER_HPP
#define SHARE_UTILITIES_WAITBARRIER_HPP
#include "memory/allocation.hpp"
#include "runtime/thread.hpp"
#include "utilities/debug.hpp"
#include "utilities/waitBarrier_generic.hpp"
// Select the default WaitBarrier implementation: LinuxWaitBarrier when LINUX
// is defined, GenericWaitBarrier otherwise.
#if defined(LINUX)
#include "waitBarrier_linux.hpp"
typedef LinuxWaitBarrier WaitBarrierDefault;
#else
typedef GenericWaitBarrier WaitBarrierDefault;
#endif
// Platform independent WaitBarrier API.
// An armed WaitBarrier prevents threads from advancing until the threads are
// woken by calling disarm(). The barrier is armed by setting a non-zero value
// - the tag. When the WaitBarrier is created, a thread is designated the owner
// and is the thread that should arm and disarm the WaitBarrier. In debug builds
// this is enforced.
//
// Expected Usage:
// - Arming thread:
// tag = ...; // non-zero value
// barrier.arm(tag);
// <publish tag>
// <work>
// barrier.disarm();
//
// - After arm(tag) returns any thread calling wait(tag) will block.
// - Calling disarm() guarantees that any thread that is calling, or has
// already called, wait(tag) will return. Either they will see the
// WaitBarrier as disarmed or they will be unblocked and eligible to execute
// again when disarm() returns.
// - After calling disarm() the barrier is ready to be re-armed with a new tag.
// (may not be re-armed with last used tag)
//
// - Waiting threads
// wait(tag); // don't execute following code unless 'safe'
// <work>
//
// - A call to wait(tag) will block if the barrier is armed with the value
// 'tag'; else it will return immediately.
// - A blocked thread is eligible to execute again once the barrier has been
// disarmed by a call to disarm().
//
// It is a usage error to:
// - call arm on a barrier that is already armed
// - call disarm on a barrier that is not armed
// - arm with the same tag as last used
// Usage errors are checked in debug builds but may be ignored otherwise.
//
// A primary goal of the WaitBarrier implementation is to wake all waiting
// threads as fast, and as concurrently, as possible.
//
// Front end that forwards arm()/disarm()/wait() to the implementation type
// WaitBarrierImpl and, in debug builds (ASSERT), additionally checks:
//  - arm() and disarm() are called by the owner thread given at construction,
//  - wait() is not called by the owner thread,
//  - the barrier is not re-armed with the tag used by the previous arm().
template <typename WaitBarrierImpl>
class WaitBarrierType : public CHeapObj<mtInternal> {
  WaitBarrierImpl _impl;

  // Prevent copying and assignment of WaitBarrier instances.
  // The parameter type has to be this class itself: with any other type these
  // would merely declare a converting constructor/assignment and would not
  // replace the implicitly declared copy operations.
  WaitBarrierType(const WaitBarrierType&);
  WaitBarrierType& operator=(const WaitBarrierType&);

#ifdef ASSERT
  // Tag passed to the most recent arm(); 0 before the first arm().
  int _last_arm_tag;
  // Thread that is allowed to arm and disarm this barrier.
  Thread* _owner;
#endif

 public:
  // 'owner' is the thread that will arm and disarm the barrier. It is only
  // recorded in debug builds.
  WaitBarrierType(Thread* owner) : _impl() {
#ifdef ASSERT
    _last_arm_tag = 0;
    _owner = owner;
#endif
  }
  ~WaitBarrierType() {}

  // Returns implementation description.
  const char* description() { return _impl.description(); }

  // Guarantees any thread calling wait() with same tag will be blocked.
  // Provides a trailing fence.
  void arm(int barrier_tag) {
#ifdef ASSERT
    assert(_last_arm_tag != barrier_tag, "Re-arming with same tag");
    _last_arm_tag = barrier_tag;
    assert(_owner == Thread::current(), "Not owner thread");
#endif
    _impl.arm(barrier_tag);
  }

  // Guarantees any thread that called wait() will be awake when it returns.
  // Provides a trailing fence.
  void disarm() {
    assert(_owner == Thread::current(), "Not owner thread");
    _impl.disarm();
  }

  // Guarantees not to return until disarm() is called,
  // if called with currently armed tag (otherwise returns immediately).
  // Implementations must guarantee no spurious wakeups.
  // Provides a trailing fence.
  void wait(int barrier_tag) {
    assert(_owner != Thread::current(), "Trying to wait with owner thread");
    _impl.wait(barrier_tag);
  }
};
// WaitBarrier is the front end instantiated with the platform default
// implementation.
typedef WaitBarrierType<WaitBarrierDefault> WaitBarrier;
#endif // SHARE_UTILITIES_WAITBARRIER_HPP

@ -0,0 +1,94 @@
/*
* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*
*/
#include "precompiled.hpp"
#include "runtime/atomic.hpp"
#include "runtime/orderAccess.hpp"
#include "runtime/os.hpp"
#include "utilities/waitBarrier_generic.hpp"
#include "utilities/spinYield.hpp"
// Arms the barrier with barrier_tag and resets the waiter count.
// In debug builds asserts that the barrier is disarmed and that no waiter is
// left over from an earlier round.
void GenericWaitBarrier::arm(int barrier_tag) {
assert(_barrier_tag == 0, "Already armed");
assert(_waiters == 0, "We left a thread hanging");
_barrier_tag = barrier_tag;
// Already asserted to be zero above; the store also resets it in builds
// without asserts.
_waiters = 0;
// Full fence after the stores.
OrderAccess::fence();
}
// Tries to release one thread counted in _waiters.
// Returns 0 when _waiters is read as 0. Otherwise attempts to decrement
// _waiters from the value read (w) to w - 1 with a compare-and-exchange:
//  - on success, signals the semaphore once and returns w - 1;
//  - on failure, signals nothing and returns w, so a caller looping on a
//    non-zero result will try again.
// In debug builds asserts that the barrier is disarmed.
int GenericWaitBarrier::wake_if_needed() {
assert(_barrier_tag == 0, "Not disarmed");
int w = _waiters;
if (w == 0) {
// Load of _barrier_threads in caller must not pass the load of _waiters.
OrderAccess::loadload();
return 0;
}
assert(w > 0, "Bad counting");
// We need an exact count which never goes below zero,
// otherwise the semaphore may be signalled too many times.
if (Atomic::cmpxchg(w - 1, &_waiters, w) == w) {
_sem_barrier.signal();
return w - 1;
}
return w;
}
// Disarms the barrier: clears the tag, then keeps calling wake_if_needed()
// until it reports no remaining waiters and _barrier_threads has dropped to
// zero. Ends with a full fence.
// In debug builds asserts that the barrier is armed.
void GenericWaitBarrier::disarm() {
assert(_barrier_tag != 0, "Not armed");
_barrier_tag = 0;
// Loads of _barrier_threads/_waiters must not float above disarm store and
// disarm store must not sink below.
OrderAccess::fence();
int left;
SpinYield sp;
do {
left = GenericWaitBarrier::wake_if_needed();
if (left == 0 && _barrier_threads > 0) {
// There is no thread to wake but we still have barrier threads.
sp.wait();
}
// We must loop here until there are no waiters or potential waiters.
} while (left > 0 || _barrier_threads > 0);
// API specifies disarm() must provide a trailing fence.
OrderAccess::fence();
}
// Blocks the caller on the semaphore if the barrier is armed with
// barrier_tag; otherwise issues a fence and returns immediately.
// The caller is counted in _barrier_threads for the whole slow path and in
// _waiters once it has committed to waiting on the semaphore.
void GenericWaitBarrier::wait(int barrier_tag) {
assert(barrier_tag != 0, "Trying to wait on disarmed value");
if (barrier_tag != _barrier_tag) {
// API specifies wait() must provide a trailing fence.
OrderAccess::fence();
return;
}
// Announce this thread as a potential waiter before re-checking the tag;
// disarm() keeps looping while _barrier_threads is non-zero.
Atomic::add(1, &_barrier_threads);
// Re-check the tag now that we are counted. The zero check repeats the
// assert above for builds without asserts.
if (barrier_tag != 0 && barrier_tag == _barrier_tag) {
Atomic::add(1, &_waiters);
_sem_barrier.wait();
// We help out with posting, but we need to do so before we decrement the
// _barrier_threads otherwise we might wake threads up in next wait.
GenericWaitBarrier::wake_if_needed();
}
Atomic::add(-1, &_barrier_threads);
}

@ -0,0 +1,59 @@
/*
* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*
*/
#ifndef SHARE_UTILITIES_WAITBARRIER_GENERIC_HPP
#define SHARE_UTILITIES_WAITBARRIER_GENERIC_HPP
#include "memory/allocation.hpp"
#include "runtime/semaphore.hpp"
// In addition to the barrier tag, it uses two counters to keep the semaphore
// count correct and not leave any late thread waiting.
// WaitBarrier implementation built on a Semaphore; description() identifies
// it as "semaphore".
class GenericWaitBarrier : public CHeapObj<mtInternal> {
// Tag the barrier is armed with; 0 while disarmed.
volatile int _barrier_tag;
// The number of threads waiting on or about to wait on the semaphore.
volatile int _waiters;
// The number of threads in the wait path, before or after the tag check.
// These threads can become waiters.
volatile int _barrier_threads;
// Semaphore the waiters block on; constructed with an initial count of 0.
Semaphore _sem_barrier;
// Prevent copying and assignment of GenericWaitBarrier instances.
GenericWaitBarrier(const GenericWaitBarrier&);
GenericWaitBarrier& operator=(const GenericWaitBarrier&);
// Releases at most one waiter; returns the number of waiters believed to
// remain (see the definition for the exact contract).
int wake_if_needed();
public:
// Starts disarmed with all counters at zero.
GenericWaitBarrier() : _barrier_tag(0), _waiters(0), _barrier_threads(0), _sem_barrier(0) {}
~GenericWaitBarrier() {}
// Short name of this implementation.
const char* description() { return "semaphore"; }
void arm(int barrier_tag);
void disarm();
void wait(int barrier_tag);
};
#endif // SHARE_UTILITIES_WAITBARRIER_GENERIC_HPP

@ -0,0 +1,152 @@
/*
* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
#include "precompiled.hpp"
#include "runtime/atomic.hpp"
#include "runtime/orderAccess.hpp"
#include "runtime/os.hpp"
#include "utilities/spinYield.hpp"
#include "utilities/waitBarrier.hpp"
#include "threadHelper.inline.hpp"
// Tag the armer thread currently publishes to the reader threads; 0 means no
// tag is published.
static volatile int wait_tag = 0;
// Counter the armer thread increments to an odd value and then to an even
// value while the barrier is armed; readers assert that they only ever
// observe an even value after returning from wait().
static volatile int valid_value = 0;
// Reader thread for the WaitBarrier test. It repeatedly picks up the
// published tag, announces it through _on_barrier, waits on the barrier with
// that tag and afterwards verifies that valid_value is even.
template <typename WaitBarrierImpl>
class WBThread : public JavaTestThread {
public:
  // Set to true by the armer thread to make all readers leave their loop.
  static volatile bool _exit;
  WaitBarrierType<WaitBarrierImpl>* _wait_barrier;
  // Signalled once when main_run() starts, so the armer knows we are running.
  Semaphore* _wrt_start;
  // Tag this thread is currently waiting for, or 0 when it is not waiting.
  // Polled by the armer thread.
  volatile int _on_barrier;

  // _on_barrier is explicitly initialised to 0: the armer thread reads it and
  // must not observe an indeterminate value before this thread's first store.
  WBThread(Semaphore* post, WaitBarrierType<WaitBarrierImpl>* wb, Semaphore* wrt_start)
    : JavaTestThread(post), _wait_barrier(wb), _wrt_start(wrt_start), _on_barrier(0) {}
  virtual ~WBThread() {}

  void main_run() {
    _wrt_start->signal();
    int vv, tag;
    // Similar to how a JavaThread would stop in a safepoint.
    while (!_exit) {
      // Load the published tag.
      tag = OrderAccess::load_acquire(&wait_tag);
      // Publish the tag this thread is going to wait for.
      OrderAccess::release_store(&_on_barrier, tag);
      if (_on_barrier == 0) {
        // No tag published yet; try again.
        SpinPause();
        continue;
      }
      OrderAccess::storeload(); // Loads in WB must not float up.
      // Wait until we are woken.
      _wait_barrier->wait(tag);
      // Verify that we do not see an invalid value.
      vv = OrderAccess::load_acquire(&valid_value);
      ASSERT_EQ((vv & 0x1), 0);
      // Tell the armer that verification for this round is done.
      OrderAccess::release_store(&_on_barrier, 0);
    }
  }
};
// Out-of-class definition of the exit flag (one per WaitBarrierImpl
// instantiation); starts out false.
template <typename WaitBarrierImpl>
volatile bool WBThread<WaitBarrierImpl>::_exit = false;
template <typename WaitBarrierImpl>
class WBArmerThread : public JavaTestThread {
public:
WBArmerThread(Semaphore* post) : JavaTestThread(post) {
};
virtual ~WBArmerThread(){}
void main_run() {
static const int NUMBER_OF_READERS = 4;
Semaphore post;
Semaphore wrt_start;
WaitBarrierType<WaitBarrierImpl> wb(this);
WBThread<WaitBarrierImpl>* reader1 = new WBThread<WaitBarrierImpl>(&post, &wb, &wrt_start);
WBThread<WaitBarrierImpl>* reader2 = new WBThread<WaitBarrierImpl>(&post, &wb, &wrt_start);
WBThread<WaitBarrierImpl>* reader3 = new WBThread<WaitBarrierImpl>(&post, &wb, &wrt_start);
WBThread<WaitBarrierImpl>* reader4 = new WBThread<WaitBarrierImpl>(&post, &wb, &wrt_start);
reader1->doit();
reader2->doit();
reader3->doit();
reader4->doit();
int nw = NUMBER_OF_READERS;
while (nw > 0) {
wrt_start.wait();
--nw;
}
jlong stop_ms = os::javaTimeMillis() + 1000; // 1 seconds max test time
int next_tag = 1;
// Similar to how the VM thread would use a WaitBarrier in a safepoint.
while (stop_ms > os::javaTimeMillis()) {
// Arm next tag.
wb.arm(next_tag);
// Publish tag.
OrderAccess::release_store_fence(&wait_tag, next_tag);
// Wait until threads picked up new tag.
while (reader1->_on_barrier != wait_tag ||
reader2->_on_barrier != wait_tag ||
reader3->_on_barrier != wait_tag ||
reader4->_on_barrier != wait_tag) {
SpinPause();
}
// Set an invalid value.
OrderAccess::release_store(&valid_value, valid_value + 1); // odd
os::naked_yield();
// Set a valid value.
OrderAccess::release_store(&valid_value, valid_value + 1); // even
// Publish inactive tag.
OrderAccess::release_store_fence(&wait_tag, 0); // Stores in WB must not float up.
wb.disarm();
// Wait until threads done valid_value verification.
while (reader1->_on_barrier != 0 ||
reader2->_on_barrier != 0 ||
reader3->_on_barrier != 0 ||
reader4->_on_barrier != 0) {
SpinPause();
}
++next_tag;
}
WBThread<WaitBarrierImpl>::_exit = true;
for (int i = 0; i < NUMBER_OF_READERS; i++) {
post.wait();
}
}
};
// Runs the armer/reader test against the platform default implementation.
// The exit flag is reset first because the armer thread leaves it set to
// true when it finishes.
TEST_VM(WaitBarrier, default_wb) {
WBThread<WaitBarrierDefault>::_exit = false;
mt_test_doer<WBArmerThread<WaitBarrierDefault> >();
}
// When LINUX is defined the default implementation is LinuxWaitBarrier, so
// the same test is also run explicitly against GenericWaitBarrier.
#if defined(LINUX)
TEST_VM(WaitBarrier, generic_wb) {
// Reset the exit flag for this instantiation before starting the threads.
WBThread<GenericWaitBarrier>::_exit = false;
mt_test_doer<WBArmerThread<GenericWaitBarrier> >();
}
#endif