8341334: CDS: Parallel relocation

Reviewed-by: iklam, adinn, stuefe
Aleksey Shipilev 2024-11-19 08:45:02 +00:00
parent 499186be0f
commit 76a55c3cb6
6 changed files with 327 additions and 3 deletions


@@ -399,3 +399,188 @@ size_t HeapRootSegments::segment_offset(size_t seg_idx) {
return _base_offset + seg_idx * _max_size_in_bytes;
}
ArchiveWorkers ArchiveWorkers::_workers;
ArchiveWorkers::ArchiveWorkers() :
_start_semaphore(0),
_end_semaphore(0),
_num_workers(0),
_started_workers(0),
_waiting_workers(0),
_running_workers(0),
_state(NOT_READY),
_task(nullptr) {
}
void ArchiveWorkers::initialize() {
assert(Atomic::load(&_state) == NOT_READY, "Should be");
Atomic::store(&_num_workers, max_workers());
Atomic::store(&_state, READY);
// Kick off pool startup by creating a single worker.
start_worker_if_needed();
}
int ArchiveWorkers::max_workers() {
// The pool is used for short-lived bursty tasks. We do not want to spend
// too much time creating and waking up threads unnecessarily. Plus, we do
// not want to overwhelm large machines. This is why we want to be very
// conservative about the number of workers actually needed.
return MAX2(0, log2i_graceful(os::active_processor_count()));
}
bool ArchiveWorkers::is_parallel() {
return _num_workers > 0;
}
void ArchiveWorkers::shutdown() {
while (true) {
State state = Atomic::load(&_state);
if (state == SHUTDOWN) {
// Already shut down.
return;
}
if (Atomic::cmpxchg(&_state, state, SHUTDOWN, memory_order_relaxed) == state) {
if (is_parallel()) {
// Execute a shutdown task and block until all workers respond.
run_task(&_shutdown_task);
}
}
}
}
void ArchiveWorkers::start_worker_if_needed() {
while (true) {
int cur = Atomic::load(&_started_workers);
if (cur >= _num_workers) {
return;
}
if (Atomic::cmpxchg(&_started_workers, cur, cur + 1, memory_order_relaxed) == cur) {
new ArchiveWorkerThread(this);
return;
}
}
}
void ArchiveWorkers::signal_worker_if_needed() {
while (true) {
int cur = Atomic::load(&_waiting_workers);
if (cur == 0) {
return;
}
if (Atomic::cmpxchg(&_waiting_workers, cur, cur - 1, memory_order_relaxed) == cur) {
_start_semaphore.signal(1);
return;
}
}
}
void ArchiveWorkers::run_task(ArchiveWorkerTask* task) {
assert((Atomic::load(&_state) == READY) ||
((Atomic::load(&_state) == SHUTDOWN) && (task == &_shutdown_task)),
"Should be in correct state");
assert(Atomic::load(&_task) == nullptr, "Should not have running tasks");
if (is_parallel()) {
run_task_multi(task);
} else {
run_task_single(task);
}
}
void ArchiveWorkers::run_task_single(ArchiveWorkerTask* task) {
// Single thread needs no chunking.
task->configure_max_chunks(1);
// Execute the task ourselves, as there are no workers.
task->work(0, 1);
}
void ArchiveWorkers::run_task_multi(ArchiveWorkerTask* task) {
// Multiple threads can work with multiple chunks.
task->configure_max_chunks(_num_workers * CHUNKS_PER_WORKER);
// Set up the run and publish the task.
Atomic::store(&_waiting_workers, _num_workers);
Atomic::store(&_running_workers, _num_workers);
Atomic::release_store(&_task, task);
// Kick off pool wakeup by signaling a single worker, and proceed
// immediately to executing the task locally.
signal_worker_if_needed();
// Execute the task ourselves, while workers are catching up.
// This allows us to hide parts of task handoff latency.
task->run();
// Done executing task locally, wait for any remaining workers to complete,
// and then do the final housekeeping.
_end_semaphore.wait();
Atomic::store(&_task, (ArchiveWorkerTask *) nullptr);
OrderAccess::fence();
assert(Atomic::load(&_waiting_workers) == 0, "All workers were signaled");
assert(Atomic::load(&_running_workers) == 0, "No workers are running");
}
void ArchiveWorkerTask::run() {
while (true) {
int chunk = Atomic::load(&_chunk);
if (chunk >= _max_chunks) {
return;
}
if (Atomic::cmpxchg(&_chunk, chunk, chunk + 1, memory_order_relaxed) == chunk) {
assert(0 <= chunk && chunk < _max_chunks, "Sanity");
work(chunk, _max_chunks);
}
}
}
void ArchiveWorkerTask::configure_max_chunks(int max_chunks) {
if (_max_chunks == 0) {
_max_chunks = max_chunks;
}
}
bool ArchiveWorkers::run_as_worker() {
assert(is_parallel(), "Should be in parallel mode");
_start_semaphore.wait();
// Avalanche wakeups: each worker signals two others.
signal_worker_if_needed();
signal_worker_if_needed();
ArchiveWorkerTask* task = Atomic::load_acquire(&_task);
task->run();
// All work done in threads should be visible to caller.
OrderAccess::fence();
// Signal the pool that the task is complete, if this is the last worker.
if (Atomic::sub(&_running_workers, 1, memory_order_relaxed) == 0) {
_end_semaphore.signal();
}
// Continue if task was not a termination task.
return (task != &_shutdown_task);
}
ArchiveWorkerThread::ArchiveWorkerThread(ArchiveWorkers* pool) : NamedThread(), _pool(pool) {
set_name("ArchiveWorkerThread");
os::create_thread(this, os::os_thread);
os::start_thread(this);
}
void ArchiveWorkerThread::run() {
// Avalanche thread startup: each starting worker starts two others.
_pool->start_worker_if_needed();
_pool->start_worker_if_needed();
// Set ourselves up.
os::set_priority(this, NearMaxPriority);
while (_pool->run_as_worker()) {
// Work until terminated.
}
}
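
The sizing policy above is deliberately conservative: max_workers() returns the base-2 logarithm of the active processor count, so an 8-CPU machine gets 3 workers (and 3 * 4 = 12 chunks), a 64-CPU machine gets 6, and a single-CPU machine gets 0, in which case is_parallel() is false and run_task() simply executes the task inline. The standalone sketch below (an illustration only, using standard C++ atomics and threads in place of HotSpot's Atomic and Semaphore primitives; all names are hypothetical) shows the same self-scheduling chunk loop that ArchiveWorkerTask::run() uses: every participant advances a shared counter with a compare-and-swap and works on the chunk it claimed, so faster threads naturally pick up more chunks.

#include <atomic>
#include <cstdio>
#include <thread>
#include <vector>

// Claim the next chunk with a CAS, work on it, repeat until all chunks are taken.
static void run_chunks(std::atomic<int>* next_chunk, int max_chunks, int worker_id) {
  while (true) {
    int chunk = next_chunk->load();
    if (chunk >= max_chunks) {
      return; // Every chunk has been claimed.
    }
    if (next_chunk->compare_exchange_strong(chunk, chunk + 1)) {
      std::printf("worker %d processes chunk %d of %d\n", worker_id, chunk, max_chunks);
    }
  }
}

int main() {
  const int num_workers = 3;               // e.g. 8 active processors -> log2 = 3 workers
  const int max_chunks  = num_workers * 4; // mirrors CHUNKS_PER_WORKER = 4
  std::atomic<int> next_chunk{0};
  std::vector<std::thread> pool;
  for (int i = 1; i <= num_workers; i++) {
    pool.emplace_back(run_chunks, &next_chunk, max_chunks, i);
  }
  run_chunks(&next_chunk, max_chunks, 0);  // the caller works too, like run_task_multi()
  for (std::thread& t : pool) {
    t.join();
  }
  return 0;
}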


@@ -33,6 +33,8 @@
#include "utilities/bitMap.hpp"
#include "utilities/exceptions.hpp"
#include "utilities/macros.hpp"
#include "runtime/nonJavaThread.hpp"
#include "runtime/semaphore.hpp"
class BootstrapInfo;
class ReservedSpace;
@@ -319,4 +321,95 @@ public:
HeapRootSegments& operator=(const HeapRootSegments&) = default;
};
class ArchiveWorkers;
// A task to be worked on by worker threads
class ArchiveWorkerTask : public CHeapObj<mtInternal> {
friend class ArchiveWorkers;
friend class ArchiveWorkerShutdownTask;
private:
const char* _name;
int _max_chunks;
volatile int _chunk;
void run();
void configure_max_chunks(int max_chunks);
public:
ArchiveWorkerTask(const char* name) :
_name(name), _max_chunks(0), _chunk(0) {}
const char* name() const { return _name; }
virtual void work(int chunk, int max_chunks) = 0;
};
class ArchiveWorkerThread : public NamedThread {
friend class ArchiveWorkers;
private:
ArchiveWorkers* const _pool;
public:
ArchiveWorkerThread(ArchiveWorkers* pool);
const char* type_name() const override { return "Archive Worker Thread"; }
void run() override;
};
class ArchiveWorkerShutdownTask : public ArchiveWorkerTask {
public:
ArchiveWorkerShutdownTask() : ArchiveWorkerTask("Archive Worker Shutdown") {
// This task always has only one chunk.
configure_max_chunks(1);
}
void work(int chunk, int max_chunks) override {
// Do nothing.
}
};
// Special worker pool for archive workers. The goal for this pool is to
// start up fast, distribute spiky workloads efficiently, and be able to
// shut down after use. This makes the implementation quite different from
// the normal GC worker pool.
class ArchiveWorkers {
friend class ArchiveWorkerThread;
private:
// Target number of chunks per worker. This should be large enough to even
// out work imbalance, and small enough to keep bookkeeping overheads low.
static constexpr int CHUNKS_PER_WORKER = 4;
static int max_workers();
// Global shared instance. Can be uninitialized, can be shut down.
static ArchiveWorkers _workers;
ArchiveWorkerShutdownTask _shutdown_task;
Semaphore _start_semaphore;
Semaphore _end_semaphore;
int _num_workers;
int _started_workers;
int _waiting_workers;
int _running_workers;
typedef enum { NOT_READY, READY, SHUTDOWN } State;
volatile State _state;
ArchiveWorkerTask* _task;
bool run_as_worker();
void start_worker_if_needed();
void signal_worker_if_needed();
void run_task_single(ArchiveWorkerTask* task);
void run_task_multi(ArchiveWorkerTask* task);
bool is_parallel();
ArchiveWorkers();
public:
static ArchiveWorkers* workers() { return &_workers; }
void initialize();
void shutdown();
void run_task(ArchiveWorkerTask* task);
};
#endif // SHARE_CDS_ARCHIVEUTILS_HPP
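
A hedged usage sketch of the API declared above (ZeroFillTask and the buffer it touches are hypothetical names invented for illustration, not part of this change): a client subclasses ArchiveWorkerTask, slices its data by the (chunk, max_chunks) pair passed into work(), and hands the task to the shared pool, which either runs it inline in serial mode or spreads it across the worker threads.

#include "cds/archiveUtils.hpp"

class ZeroFillTask : public ArchiveWorkerTask {
private:
  char* const _base;
  const size_t _size;
public:
  ZeroFillTask(char* base, size_t size) :
      ArchiveWorkerTask("Zero Fill"), _base(base), _size(size) {}

  void work(int chunk, int max_chunks) override {
    // Each chunk covers one contiguous slice of the buffer.
    size_t start = _size * chunk / max_chunks;
    size_t end   = _size * (chunk + 1) / max_chunks;
    for (size_t i = start; i < end; i++) {
      _base[i] = 0;
    }
  }
};

// Somewhere between ArchiveWorkers::workers()->initialize() and shutdown():
//   ZeroFillTask task(buffer, buffer_size);
//   ArchiveWorkers::workers()->run_task(&task);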


@@ -117,7 +117,10 @@
product(bool, AOTClassLinking, false, \
"Load/link all archived classes for the boot/platform/app " \
"loaders before application main") \
\
product(bool, AOTCacheParallelRelocation, true, DIAGNOSTIC, \
"Use parallel relocation code to speed up startup.") \
\
// end of CDS_FLAGS
DECLARE_FLAGS(CDS_FLAGS)
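
Since the new flag is declared DIAGNOSTIC, toggling it requires unlocking diagnostic options first; for example, the serial relocation path can be forced with -XX:+UnlockDiagnosticVMOptions -XX:-AOTCacheParallelRelocation when starting a JVM that uses a CDS/AOT cache.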


@@ -1972,6 +1972,32 @@ char* FileMapInfo::map_bitmap_region() {
return bitmap_base;
}
class SharedDataRelocationTask : public ArchiveWorkerTask {
private:
BitMapView* const _rw_bm;
BitMapView* const _ro_bm;
SharedDataRelocator* const _rw_reloc;
SharedDataRelocator* const _ro_reloc;
public:
SharedDataRelocationTask(BitMapView* rw_bm, BitMapView* ro_bm, SharedDataRelocator* rw_reloc, SharedDataRelocator* ro_reloc) :
ArchiveWorkerTask("Shared Data Relocation"),
_rw_bm(rw_bm), _ro_bm(ro_bm), _rw_reloc(rw_reloc), _ro_reloc(ro_reloc) {}
void work(int chunk, int max_chunks) override {
work_on(chunk, max_chunks, _rw_bm, _rw_reloc);
work_on(chunk, max_chunks, _ro_bm, _ro_reloc);
}
void work_on(int chunk, int max_chunks, BitMapView* bm, SharedDataRelocator* reloc) {
BitMap::idx_t size = bm->size();
BitMap::idx_t start = MIN2(size, size * chunk / max_chunks);
BitMap::idx_t end = MIN2(size, size * (chunk + 1) / max_chunks);
assert(end > start, "Sanity: no empty slices");
bm->iterate(reloc, start, end);
}
};
// This is called when we cannot map the archive at the requested base address (usually 0x800000000).
// We relocate all pointers in the 2 core regions (ro, rw).
bool FileMapInfo::relocate_pointers_in_core_regions(intx addr_delta) {
@@ -2010,8 +2036,14 @@ bool FileMapInfo::relocate_pointers_in_core_regions(intx addr_delta) {
valid_new_base, valid_new_end, addr_delta);
SharedDataRelocator ro_patcher((address*)ro_patch_base + header()->ro_ptrmap_start_pos(), (address*)ro_patch_end, valid_old_base, valid_old_end,
valid_new_base, valid_new_end, addr_delta);
if (AOTCacheParallelRelocation) {
SharedDataRelocationTask task(&rw_ptrmap, &ro_ptrmap, &rw_patcher, &ro_patcher);
ArchiveWorkers::workers()->run_task(&task);
} else {
rw_ptrmap.iterate(&rw_patcher);
ro_ptrmap.iterate(&ro_patcher);
}
// The MetaspaceShared::bm region will be unmapped in MetaspaceShared::initialize_shared_spaces().
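
To make the slicing in SharedDataRelocationTask::work_on() concrete with illustrative numbers: with 3 workers the task is configured for 12 chunks, so a ptrmap of 1,200,000 bits is cut into slices of 100,000 bits; chunk 0 iterates bits [0, 100000) and chunk 11 iterates [1100000, 1200000). Each claimed chunk patches the matching slice of both the rw and ro bitmaps, so relocation of the two core regions proceeds together.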


@@ -1088,6 +1088,9 @@ void MetaspaceShared::initialize_runtime_shared_and_meta_spaces() {
assert(CDSConfig::is_using_archive(), "Must be called when UseSharedSpaces is enabled");
MapArchiveResult result = MAP_ARCHIVE_OTHER_FAILURE;
// We are about to open the archives. Initialize workers now.
ArchiveWorkers::workers()->initialize();
FileMapInfo* static_mapinfo = open_static_archive();
FileMapInfo* dynamic_mapinfo = nullptr;
@@ -1679,6 +1682,9 @@ void MetaspaceShared::initialize_shared_spaces() {
dynamic_mapinfo->unmap_region(MetaspaceShared::bm);
}
// Archive was fully read. Workers are no longer needed.
ArchiveWorkers::workers()->shutdown();
LogStreamHandle(Info, cds) lsh;
if (lsh.is_enabled()) {
lsh.print("Using AOT-linked classes: %s (static archive: %s aot-linked classes",


@@ -441,6 +441,11 @@ void before_exit(JavaThread* thread, bool halt) {
#if INCLUDE_CDS
ClassListWriter::write_resolved_constants();
// Initiate Archive Workers shutdown. These workers are likely already
// shut down, but we need to make sure they really are. Otherwise, workers
// would fail hard on broken semaphores.
ArchiveWorkers::workers()->shutdown();
#endif
// Hang forever on exit if we're reporting an error.