8318986: Improve GenericWaitBarrier performance
Reviewed-by: rehn, iwalulya, pchilanomate
This commit is contained in:
parent
407cdd4cac
commit
30462f9da4
@ -1,5 +1,6 @@
|
||||
/*
|
||||
* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2019, 2023, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -29,66 +30,228 @@
|
||||
#include "utilities/waitBarrier_generic.hpp"
|
||||
#include "utilities/spinYield.hpp"
|
||||
|
||||
void GenericWaitBarrier::arm(int barrier_tag) {
|
||||
assert(_barrier_tag == 0, "Already armed");
|
||||
assert(_waiters == 0, "We left a thread hanging");
|
||||
_barrier_tag = barrier_tag;
|
||||
_waiters = 0;
|
||||
OrderAccess::fence();
|
||||
}
|
||||
// Implements the striped semaphore wait barrier.
|
||||
//
|
||||
// To guarantee progress and safety, we need to make sure that new barrier tag
|
||||
// starts with the completely empty set of waiters and free semaphore. This
|
||||
// requires either waiting for all threads to leave wait() for current barrier
|
||||
// tag on disarm(), or waiting for all threads to leave the previous tag before
|
||||
// reusing the semaphore in arm().
|
||||
//
|
||||
// When there are multiple threads, it is normal for some threads to take
|
||||
// significant time to leave the barrier. Waiting for these threads introduces
|
||||
// stalls on barrier reuse.
|
||||
//
|
||||
// If we wait on disarm(), this stall is nearly guaranteed to happen if some threads
|
||||
// are de-scheduled by prior wait(). It would be especially bad if there are more
|
||||
// waiting threads than CPUs: every thread would need to wake up and register itself
|
||||
// as leaving, before we can unblock from disarm().
|
||||
//
|
||||
// If we wait on arm(), we can get lucky that most threads would be able to catch up,
|
||||
// exit wait(), and so we arrive to arm() with semaphore ready for reuse. However,
|
||||
// that is still insufficient in practice.
|
||||
//
|
||||
// Therefore, this implementation goes a step further and implements the _striped_
|
||||
// semaphores. We maintain several semaphores in cells. The barrier tags are assigned
|
||||
// to cells in some simple manner. Most of the current uses have sequential barrier
|
||||
// tags, so simple modulo works well. We then operate on a cell like we would operate
|
||||
// on a single semaphore: we wait at arm() for all threads to catch up before reusing
|
||||
// the cell. For the cost of maintaining just a few cells, we have enough window for
|
||||
// threads to catch up.
|
||||
//
|
||||
// The correctness is guaranteed by using a single atomic state variable per cell,
|
||||
// with updates always done with CASes:
|
||||
//
|
||||
// [.......... barrier tag ..........][.......... waiters ..........]
|
||||
// 63 31 0
|
||||
//
|
||||
// Cell starts with zero tag and zero waiters. Arming the cell swings barrier tag from
|
||||
// zero to some tag, while checking that no waiters have appeared. Disarming swings
|
||||
// the barrier tag back from tag to zero. Every waiter registers itself by incrementing
|
||||
// the "waiters", while checking that barrier tag is still the same. Every completing waiter
|
||||
// decrements the "waiters". When all waiters complete, a cell ends up in initial state,
|
||||
// ready to be armed again. This allows accurate tracking of how many signals
|
||||
// to issue and does not race with disarm.
|
||||
//
|
||||
// The implementation uses the strongest (default) barriers for extra safety, even
|
||||
// when not strictly required to do so for correctness. Extra barrier overhead is
|
||||
// dominated by the actual wait/notify latency anyway.
|
||||
//
|
||||
|
||||
int GenericWaitBarrier::wake_if_needed() {
|
||||
assert(_barrier_tag == 0, "Not disarmed");
|
||||
int w = _waiters;
|
||||
if (w == 0) {
|
||||
// Load of _barrier_threads in caller must not pass the load of _waiters.
|
||||
OrderAccess::loadload();
|
||||
return 0;
|
||||
}
|
||||
assert(w > 0, "Bad counting");
|
||||
// We need an exact count which never goes below zero,
|
||||
// otherwise the semaphore may be signalled too many times.
|
||||
if (Atomic::cmpxchg(&_waiters, w, w - 1) == w) {
|
||||
_sem_barrier.signal();
|
||||
return w - 1;
|
||||
}
|
||||
return w;
|
||||
void GenericWaitBarrier::arm(int barrier_tag) {
|
||||
assert(barrier_tag != 0, "Pre arm: Should be arming with armed value");
|
||||
assert(Atomic::load(&_barrier_tag) == 0,
|
||||
"Pre arm: Should not be already armed. Tag: %d",
|
||||
Atomic::load(&_barrier_tag));
|
||||
Atomic::release_store(&_barrier_tag, barrier_tag);
|
||||
|
||||
Cell &cell = tag_to_cell(barrier_tag);
|
||||
cell.arm(barrier_tag);
|
||||
|
||||
// API specifies arm() must provide a trailing fence.
|
||||
OrderAccess::fence();
|
||||
}
|
||||
|
||||
void GenericWaitBarrier::disarm() {
|
||||
assert(_barrier_tag != 0, "Not armed");
|
||||
_barrier_tag = 0;
|
||||
// Loads of _barrier_threads/_waiters must not float above disarm store and
|
||||
// disarm store must not sink below.
|
||||
OrderAccess::fence();
|
||||
int left;
|
||||
SpinYield sp;
|
||||
do {
|
||||
left = GenericWaitBarrier::wake_if_needed();
|
||||
if (left == 0 && _barrier_threads > 0) {
|
||||
// There is no thread to wake but we still have barrier threads.
|
||||
sp.wait();
|
||||
}
|
||||
// We must loop here until there are no waiters or potential waiters.
|
||||
} while (left > 0 || _barrier_threads > 0);
|
||||
int barrier_tag = Atomic::load_acquire(&_barrier_tag);
|
||||
assert(barrier_tag != 0, "Pre disarm: Should be armed. Tag: %d", barrier_tag);
|
||||
Atomic::release_store(&_barrier_tag, 0);
|
||||
|
||||
Cell &cell = tag_to_cell(barrier_tag);
|
||||
cell.disarm(barrier_tag);
|
||||
|
||||
// API specifies disarm() must provide a trailing fence.
|
||||
OrderAccess::fence();
|
||||
}
|
||||
|
||||
void GenericWaitBarrier::wait(int barrier_tag) {
|
||||
assert(barrier_tag != 0, "Trying to wait on disarmed value");
|
||||
if (barrier_tag != _barrier_tag) {
|
||||
// API specifies wait() must provide a trailing fence.
|
||||
OrderAccess::fence();
|
||||
return;
|
||||
}
|
||||
Atomic::add(&_barrier_threads, 1);
|
||||
if (barrier_tag != 0 && barrier_tag == _barrier_tag) {
|
||||
Atomic::add(&_waiters, 1);
|
||||
_sem_barrier.wait();
|
||||
// We help out with posting, but we need to do so before we decrement the
|
||||
// _barrier_threads otherwise we might wake threads up in next wait.
|
||||
GenericWaitBarrier::wake_if_needed();
|
||||
}
|
||||
Atomic::add(&_barrier_threads, -1);
|
||||
assert(barrier_tag != 0, "Pre wait: Should be waiting on armed value");
|
||||
|
||||
Cell &cell = tag_to_cell(barrier_tag);
|
||||
cell.wait(barrier_tag);
|
||||
|
||||
// API specifies wait() must provide a trailing fence.
|
||||
OrderAccess::fence();
|
||||
}
|
||||
|
||||
void GenericWaitBarrier::Cell::arm(int32_t requested_tag) {
|
||||
// Before we continue to arm, we need to make sure that all threads
|
||||
// have left the previous cell.
|
||||
|
||||
int64_t state;
|
||||
|
||||
SpinYield sp;
|
||||
while (true) {
|
||||
state = Atomic::load_acquire(&_state);
|
||||
assert(decode_tag(state) == 0,
|
||||
"Pre arm: Should not be armed. "
|
||||
"Tag: " INT32_FORMAT "; Waiters: " INT32_FORMAT,
|
||||
decode_tag(state), decode_waiters(state));
|
||||
if (decode_waiters(state) == 0) {
|
||||
break;
|
||||
}
|
||||
sp.wait();
|
||||
}
|
||||
|
||||
// Try to swing cell to armed. This should always succeed after the check above.
|
||||
int64_t new_state = encode(requested_tag, 0);
|
||||
int64_t prev_state = Atomic::cmpxchg(&_state, state, new_state);
|
||||
if (prev_state != state) {
|
||||
fatal("Cannot arm the wait barrier. "
|
||||
"Tag: " INT32_FORMAT "; Waiters: " INT32_FORMAT,
|
||||
decode_tag(prev_state), decode_waiters(prev_state));
|
||||
}
|
||||
}
|
||||
|
||||
int GenericWaitBarrier::Cell::signal_if_needed(int max) {
|
||||
int signals = 0;
|
||||
while (true) {
|
||||
int cur = Atomic::load_acquire(&_outstanding_wakeups);
|
||||
if (cur == 0) {
|
||||
// All done, no more waiters.
|
||||
return 0;
|
||||
}
|
||||
assert(cur > 0, "Sanity");
|
||||
|
||||
int prev = Atomic::cmpxchg(&_outstanding_wakeups, cur, cur - 1);
|
||||
if (prev != cur) {
|
||||
// Contention, return to caller for early return or backoff.
|
||||
return prev;
|
||||
}
|
||||
|
||||
// Signal!
|
||||
_sem.signal();
|
||||
|
||||
if (++signals >= max) {
|
||||
// Signalled requested number of times, break out.
|
||||
return prev;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void GenericWaitBarrier::Cell::disarm(int32_t expected_tag) {
|
||||
int32_t waiters;
|
||||
|
||||
while (true) {
|
||||
int64_t state = Atomic::load_acquire(&_state);
|
||||
int32_t tag = decode_tag(state);
|
||||
waiters = decode_waiters(state);
|
||||
|
||||
assert((tag == expected_tag) && (waiters >= 0),
|
||||
"Mid disarm: Should be armed with expected tag and have sane waiters. "
|
||||
"Tag: " INT32_FORMAT "; Waiters: " INT32_FORMAT,
|
||||
tag, waiters);
|
||||
|
||||
int64_t new_state = encode(0, waiters);
|
||||
if (Atomic::cmpxchg(&_state, state, new_state) == state) {
|
||||
// Successfully disarmed.
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Wake up waiters, if we have at least one.
|
||||
// Allow other threads to assist with wakeups, if possible.
|
||||
if (waiters > 0) {
|
||||
Atomic::release_store(&_outstanding_wakeups, waiters);
|
||||
SpinYield sp;
|
||||
while (signal_if_needed(INT_MAX) > 0) {
|
||||
sp.wait();
|
||||
}
|
||||
}
|
||||
assert(Atomic::load(&_outstanding_wakeups) == 0, "Post disarm: Should not have outstanding wakeups");
|
||||
}
|
||||
|
||||
void GenericWaitBarrier::Cell::wait(int32_t expected_tag) {
|
||||
// Try to register ourselves as pending waiter.
|
||||
while (true) {
|
||||
int64_t state = Atomic::load_acquire(&_state);
|
||||
int32_t tag = decode_tag(state);
|
||||
if (tag != expected_tag) {
|
||||
// Cell tag had changed while waiting here. This means either the cell had
|
||||
// been disarmed, or we are late and the cell was armed with a new tag.
|
||||
// Exit without touching anything else.
|
||||
return;
|
||||
}
|
||||
int32_t waiters = decode_waiters(state);
|
||||
|
||||
assert((tag == expected_tag) && (waiters >= 0 && waiters < INT32_MAX),
|
||||
"Before wait: Should be armed with expected tag and waiters are in range. "
|
||||
"Tag: " INT32_FORMAT "; Waiters: " INT32_FORMAT,
|
||||
tag, waiters);
|
||||
|
||||
int64_t new_state = encode(tag, waiters + 1);
|
||||
if (Atomic::cmpxchg(&_state, state, new_state) == state) {
|
||||
// Success! Proceed to wait.
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for notification.
|
||||
_sem.wait();
|
||||
|
||||
// Unblocked! We help out with waking up two siblings. This allows to avalanche
|
||||
// the wakeups for many threads, even if some threads are lagging behind.
|
||||
// Note that we can only do this *before* reporting back as completed waiter,
|
||||
// otherwise we might prematurely wake up threads for another barrier tag.
|
||||
// Current arm() sequence protects us from this trouble by waiting until all waiters
|
||||
// leave.
|
||||
signal_if_needed(2);
|
||||
|
||||
// Register ourselves as completed waiter before leaving.
|
||||
while (true) {
|
||||
int64_t state = Atomic::load_acquire(&_state);
|
||||
int32_t tag = decode_tag(state);
|
||||
int32_t waiters = decode_waiters(state);
|
||||
|
||||
assert((tag == 0) && (waiters > 0),
|
||||
"After wait: Should be not armed and have non-complete waiters. "
|
||||
"Tag: " INT32_FORMAT "; Waiters: " INT32_FORMAT,
|
||||
tag, waiters);
|
||||
|
||||
int64_t new_state = encode(tag, waiters - 1);
|
||||
if (Atomic::cmpxchg(&_state, state, new_state) == state) {
|
||||
// Success!
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -26,29 +26,79 @@
|
||||
#define SHARE_UTILITIES_WAITBARRIER_GENERIC_HPP
|
||||
|
||||
#include "memory/allocation.hpp"
|
||||
#include "memory/padded.hpp"
|
||||
#include "runtime/semaphore.hpp"
|
||||
#include "utilities/globalDefinitions.hpp"
|
||||
|
||||
// In addition to the barrier tag, it uses two counters to keep the semaphore
|
||||
// count correct and not leave any late thread waiting.
|
||||
class GenericWaitBarrier : public CHeapObj<mtInternal> {
|
||||
private:
|
||||
class Cell : public CHeapObj<mtInternal> {
|
||||
private:
|
||||
// Pad out the cells to avoid interference between the cells.
|
||||
// This would insulate from stalls when adjacent cells have returning
|
||||
// workers and contend over the cache line for current latency-critical
|
||||
// cell.
|
||||
DEFINE_PAD_MINUS_SIZE(0, DEFAULT_CACHE_LINE_SIZE, 0);
|
||||
|
||||
Semaphore _sem;
|
||||
|
||||
// Cell state, tracks the arming + waiters status
|
||||
volatile int64_t _state;
|
||||
|
||||
// Wakeups to deliver for current waiters
|
||||
volatile int _outstanding_wakeups;
|
||||
|
||||
int signal_if_needed(int max);
|
||||
|
||||
static int64_t encode(int32_t barrier_tag, int32_t waiters) {
|
||||
int64_t val = (((int64_t) barrier_tag) << 32) |
|
||||
(((int64_t) waiters) & 0xFFFFFFFF);
|
||||
assert(decode_tag(val) == barrier_tag, "Encoding is reversible");
|
||||
assert(decode_waiters(val) == waiters, "Encoding is reversible");
|
||||
return val;
|
||||
}
|
||||
|
||||
static int32_t decode_tag(int64_t value) {
|
||||
return (int32_t)(value >> 32);
|
||||
}
|
||||
|
||||
static int32_t decode_waiters(int64_t value) {
|
||||
return (int32_t)(value & 0xFFFFFFFF);
|
||||
}
|
||||
|
||||
public:
|
||||
Cell() : _sem(0), _state(encode(0, 0)), _outstanding_wakeups(0) {}
|
||||
NONCOPYABLE(Cell);
|
||||
|
||||
void arm(int32_t requested_tag);
|
||||
void disarm(int32_t expected_tag);
|
||||
void wait(int32_t expected_tag);
|
||||
};
|
||||
|
||||
// Should be enough for most uses without exploding the footprint.
|
||||
static constexpr int CELLS_COUNT = 16;
|
||||
|
||||
Cell _cells[CELLS_COUNT];
|
||||
|
||||
// Trailing padding to protect the last cell.
|
||||
DEFINE_PAD_MINUS_SIZE(0, DEFAULT_CACHE_LINE_SIZE, 0);
|
||||
|
||||
volatile int _barrier_tag;
|
||||
// The number of threads waiting on or about to wait on the semaphore.
|
||||
volatile int _waiters;
|
||||
// The number of threads in the wait path, before or after the tag check.
|
||||
// These threads can become waiters.
|
||||
volatile int _barrier_threads;
|
||||
Semaphore _sem_barrier;
|
||||
|
||||
// Trailing padding to insulate the rest of the barrier from adjacent
|
||||
// data structures. The leading padding is not needed, as cell padding
|
||||
// handles this for us.
|
||||
DEFINE_PAD_MINUS_SIZE(1, DEFAULT_CACHE_LINE_SIZE, 0);
|
||||
|
||||
NONCOPYABLE(GenericWaitBarrier);
|
||||
|
||||
int wake_if_needed();
|
||||
Cell& tag_to_cell(int tag) { return _cells[tag & (CELLS_COUNT - 1)]; }
|
||||
|
||||
public:
|
||||
GenericWaitBarrier() : _barrier_tag(0), _waiters(0), _barrier_threads(0), _sem_barrier(0) {}
|
||||
public:
|
||||
GenericWaitBarrier() : _cells(), _barrier_tag(0) {}
|
||||
~GenericWaitBarrier() {}
|
||||
|
||||
const char* description() { return "semaphore"; }
|
||||
const char* description() { return "striped semaphore"; }
|
||||
|
||||
void arm(int barrier_tag);
|
||||
void disarm();
|
||||
|
Loading…
x
Reference in New Issue
Block a user