From 76a55c3cb6e5177442f355ae1036db4fbf8e54af Mon Sep 17 00:00:00 2001 From: Aleksey Shipilev Date: Tue, 19 Nov 2024 08:45:02 +0000 Subject: [PATCH] 8341334: CDS: Parallel relocation Reviewed-by: iklam, adinn, stuefe --- src/hotspot/share/cds/archiveUtils.cpp | 185 ++++++++++++++++++++++ src/hotspot/share/cds/archiveUtils.hpp | 93 +++++++++++ src/hotspot/share/cds/cds_globals.hpp | 5 +- src/hotspot/share/cds/filemap.cpp | 36 ++++- src/hotspot/share/cds/metaspaceShared.cpp | 6 + src/hotspot/share/runtime/java.cpp | 5 + 6 files changed, 327 insertions(+), 3 deletions(-) diff --git a/src/hotspot/share/cds/archiveUtils.cpp b/src/hotspot/share/cds/archiveUtils.cpp index 4d717879e0f..3c778cbb523 100644 --- a/src/hotspot/share/cds/archiveUtils.cpp +++ b/src/hotspot/share/cds/archiveUtils.cpp @@ -399,3 +399,188 @@ size_t HeapRootSegments::segment_offset(size_t seg_idx) { return _base_offset + seg_idx * _max_size_in_bytes; } +ArchiveWorkers ArchiveWorkers::_workers; + +ArchiveWorkers::ArchiveWorkers() : + _start_semaphore(0), + _end_semaphore(0), + _num_workers(0), + _started_workers(0), + _waiting_workers(0), + _running_workers(0), + _state(NOT_READY), + _task(nullptr) { +} + +void ArchiveWorkers::initialize() { + assert(Atomic::load(&_state) == NOT_READY, "Should be"); + + Atomic::store(&_num_workers, max_workers()); + Atomic::store(&_state, READY); + + // Kick off pool startup by creating a single worker. + start_worker_if_needed(); +} + +int ArchiveWorkers::max_workers() { + // The pool is used for short-lived bursty tasks. We do not want to spend + // too much time creating and waking up threads unnecessarily. Plus, we do + // not want to overwhelm large machines. This is why we want to be very + // conservative about the number of workers actually needed. + return MAX2(0, log2i_graceful(os::active_processor_count())); +} + +bool ArchiveWorkers::is_parallel() { + return _num_workers > 0; +} + +void ArchiveWorkers::shutdown() { + while (true) { + State state = Atomic::load(&_state); + if (state == SHUTDOWN) { + // Already shut down. + return; + } + if (Atomic::cmpxchg(&_state, state, SHUTDOWN, memory_order_relaxed) == state) { + if (is_parallel()) { + // Execute a shutdown task and block until all workers respond. + run_task(&_shutdown_task); + } + } + } +} + +void ArchiveWorkers::start_worker_if_needed() { + while (true) { + int cur = Atomic::load(&_started_workers); + if (cur >= _num_workers) { + return; + } + if (Atomic::cmpxchg(&_started_workers, cur, cur + 1, memory_order_relaxed) == cur) { + new ArchiveWorkerThread(this); + return; + } + } +} + +void ArchiveWorkers::signal_worker_if_needed() { + while (true) { + int cur = Atomic::load(&_waiting_workers); + if (cur == 0) { + return; + } + if (Atomic::cmpxchg(&_waiting_workers, cur, cur - 1, memory_order_relaxed) == cur) { + _start_semaphore.signal(1); + return; + } + } +} + +void ArchiveWorkers::run_task(ArchiveWorkerTask* task) { + assert((Atomic::load(&_state) == READY) || + ((Atomic::load(&_state) == SHUTDOWN) && (task == &_shutdown_task)), + "Should be in correct state"); + assert(Atomic::load(&_task) == nullptr, "Should not have running tasks"); + + if (is_parallel()) { + run_task_multi(task); + } else { + run_task_single(task); + } +} + +void ArchiveWorkers::run_task_single(ArchiveWorkerTask* task) { + // Single thread needs no chunking. + task->configure_max_chunks(1); + + // Execute the task ourselves, as there are no workers. 
+ task->work(0, 1); +} + +void ArchiveWorkers::run_task_multi(ArchiveWorkerTask* task) { + // Multiple threads can work with multiple chunks. + task->configure_max_chunks(_num_workers * CHUNKS_PER_WORKER); + + // Set up the run and publish the task. + Atomic::store(&_waiting_workers, _num_workers); + Atomic::store(&_running_workers, _num_workers); + Atomic::release_store(&_task, task); + + // Kick off pool wakeup by signaling a single worker, and proceed + // immediately to executing the task locally. + signal_worker_if_needed(); + + // Execute the task ourselves, while workers are catching up. + // This allows us to hide parts of task handoff latency. + task->run(); + + // Done executing task locally, wait for any remaining workers to complete, + // and then do the final housekeeping. + _end_semaphore.wait(); + Atomic::store(&_task, (ArchiveWorkerTask *) nullptr); + OrderAccess::fence(); + + assert(Atomic::load(&_waiting_workers) == 0, "All workers were signaled"); + assert(Atomic::load(&_running_workers) == 0, "No workers are running"); +} + +void ArchiveWorkerTask::run() { + while (true) { + int chunk = Atomic::load(&_chunk); + if (chunk >= _max_chunks) { + return; + } + if (Atomic::cmpxchg(&_chunk, chunk, chunk + 1, memory_order_relaxed) == chunk) { + assert(0 <= chunk && chunk < _max_chunks, "Sanity"); + work(chunk, _max_chunks); + } + } +} + +void ArchiveWorkerTask::configure_max_chunks(int max_chunks) { + if (_max_chunks == 0) { + _max_chunks = max_chunks; + } +} + +bool ArchiveWorkers::run_as_worker() { + assert(is_parallel(), "Should be in parallel mode"); + _start_semaphore.wait(); + + // Avalanche wakeups: each worker signals two others. + signal_worker_if_needed(); + signal_worker_if_needed(); + + ArchiveWorkerTask* task = Atomic::load_acquire(&_task); + task->run(); + + // All work done in threads should be visible to caller. + OrderAccess::fence(); + + // Signal the pool the tasks are complete, if this is the last worker. + if (Atomic::sub(&_running_workers, 1, memory_order_relaxed) == 0) { + _end_semaphore.signal(); + } + + // Continue if task was not a termination task. + return (task != &_shutdown_task); +} + +ArchiveWorkerThread::ArchiveWorkerThread(ArchiveWorkers* pool) : NamedThread(), _pool(pool) { + set_name("ArchiveWorkerThread"); + os::create_thread(this, os::os_thread); + os::start_thread(this); +} + +void ArchiveWorkerThread::run() { + // Avalanche thread startup: each starting worker starts two others. + _pool->start_worker_if_needed(); + _pool->start_worker_if_needed(); + + // Set ourselves up. + os::set_priority(this, NearMaxPriority); + + while (_pool->run_as_worker()) { + // Work until terminated. 
+  }
+}
diff --git a/src/hotspot/share/cds/archiveUtils.hpp b/src/hotspot/share/cds/archiveUtils.hpp
index 606f5137e9d..128fdc4d33a 100644
--- a/src/hotspot/share/cds/archiveUtils.hpp
+++ b/src/hotspot/share/cds/archiveUtils.hpp
@@ -33,6 +33,8 @@
 #include "utilities/bitMap.hpp"
 #include "utilities/exceptions.hpp"
 #include "utilities/macros.hpp"
+#include "runtime/nonJavaThread.hpp"
+#include "runtime/semaphore.hpp"
 
 class BootstrapInfo;
 class ReservedSpace;
@@ -319,4 +321,95 @@ public:
   HeapRootSegments& operator=(const HeapRootSegments&) = default;
 };
 
+class ArchiveWorkers;
+
+// A task to be worked on by worker threads
+class ArchiveWorkerTask : public CHeapObj<mtInternal> {
+  friend class ArchiveWorkers;
+  friend class ArchiveWorkerShutdownTask;
+private:
+  const char* _name;
+  int _max_chunks;
+  volatile int _chunk;
+
+  void run();
+
+  void configure_max_chunks(int max_chunks);
+
+public:
+  ArchiveWorkerTask(const char* name) :
+      _name(name), _max_chunks(0), _chunk(0) {}
+  const char* name() const { return _name; }
+  virtual void work(int chunk, int max_chunks) = 0;
+};
+
+class ArchiveWorkerThread : public NamedThread {
+  friend class ArchiveWorkers;
+private:
+  ArchiveWorkers* const _pool;
+
+public:
+  ArchiveWorkerThread(ArchiveWorkers* pool);
+  const char* type_name() const override { return "Archive Worker Thread"; }
+  void run() override;
+};
+
+class ArchiveWorkerShutdownTask : public ArchiveWorkerTask {
+public:
+  ArchiveWorkerShutdownTask() : ArchiveWorkerTask("Archive Worker Shutdown") {
+    // This task always has only one chunk.
+    configure_max_chunks(1);
+  }
+  void work(int chunk, int max_chunks) override {
+    // Do nothing.
+  }
+};
+
+// Special worker pool for archive workers. The goal for this pool is to
+// start up fast, distribute spiky workloads efficiently, and be able to
+// shut down after use. This makes the implementation quite different from
+// the normal GC worker pool.
+class ArchiveWorkers {
+  friend class ArchiveWorkerThread;
+private:
+  // Target number of chunks per worker. This should be large enough to even
+  // out work imbalance, and small enough to keep bookkeeping overheads low.
+  static constexpr int CHUNKS_PER_WORKER = 4;
+  static int max_workers();
+
+  // Global shared instance. Can be uninitialized, can be shut down.
+  static ArchiveWorkers _workers;
+
+  ArchiveWorkerShutdownTask _shutdown_task;
+  Semaphore _start_semaphore;
+  Semaphore _end_semaphore;
+
+  int _num_workers;
+  int _started_workers;
+  int _waiting_workers;
+  int _running_workers;
+
+  typedef enum { NOT_READY, READY, SHUTDOWN } State;
+  volatile State _state;
+
+  ArchiveWorkerTask* _task;
+
+  bool run_as_worker();
+  void start_worker_if_needed();
+  void signal_worker_if_needed();
+
+  void run_task_single(ArchiveWorkerTask* task);
+  void run_task_multi(ArchiveWorkerTask* task);
+
+  bool is_parallel();
+
+  ArchiveWorkers();
+
+public:
+  static ArchiveWorkers* workers() { return &_workers; }
+  void initialize();
+  void shutdown();
+  void run_task(ArchiveWorkerTask* task);
+};
+
 #endif // SHARE_CDS_ARCHIVEUTILS_HPP
diff --git a/src/hotspot/share/cds/cds_globals.hpp b/src/hotspot/share/cds/cds_globals.hpp
index 38f5d8f46a6..811740cfbcb 100644
--- a/src/hotspot/share/cds/cds_globals.hpp
+++ b/src/hotspot/share/cds/cds_globals.hpp
@@ -117,7 +117,10 @@
   product(bool, AOTClassLinking, false,                                    \
           "Load/link all archived classes for the boot/platform/app "     \
           "loaders before application main")                              \
-
+                                                                          \
+  product(bool, AOTCacheParallelRelocation, true, DIAGNOSTIC,             \
+          "Use parallel relocation code to speed up startup.")            \
+                                                                          \
   // end of CDS_FLAGS
 
 DECLARE_FLAGS(CDS_FLAGS)
diff --git a/src/hotspot/share/cds/filemap.cpp b/src/hotspot/share/cds/filemap.cpp
index 00d8fba4411..d33710a65a3 100644
--- a/src/hotspot/share/cds/filemap.cpp
+++ b/src/hotspot/share/cds/filemap.cpp
@@ -1972,6 +1972,32 @@ char* FileMapInfo::map_bitmap_region() {
   return bitmap_base;
 }
 
+class SharedDataRelocationTask : public ArchiveWorkerTask {
+private:
+  BitMapView* const _rw_bm;
+  BitMapView* const _ro_bm;
+  SharedDataRelocator* const _rw_reloc;
+  SharedDataRelocator* const _ro_reloc;
+
+public:
+  SharedDataRelocationTask(BitMapView* rw_bm, BitMapView* ro_bm, SharedDataRelocator* rw_reloc, SharedDataRelocator* ro_reloc) :
+      ArchiveWorkerTask("Shared Data Relocation"),
+      _rw_bm(rw_bm), _ro_bm(ro_bm), _rw_reloc(rw_reloc), _ro_reloc(ro_reloc) {}
+
+  void work(int chunk, int max_chunks) override {
+    work_on(chunk, max_chunks, _rw_bm, _rw_reloc);
+    work_on(chunk, max_chunks, _ro_bm, _ro_reloc);
+  }
+
+  void work_on(int chunk, int max_chunks, BitMapView* bm, SharedDataRelocator* reloc) {
+    BitMap::idx_t size = bm->size();
+    BitMap::idx_t start = MIN2(size, size * chunk / max_chunks);
+    BitMap::idx_t end = MIN2(size, size * (chunk + 1) / max_chunks);
+    assert(end > start, "Sanity: no empty slices");
+    bm->iterate(reloc, start, end);
+  }
+};
+
 // This is called when we cannot map the archive at the requested base address (usually 0x800000000).
 // We relocate all pointers in the 2 core regions (ro, rw).
bool FileMapInfo::relocate_pointers_in_core_regions(intx addr_delta) { @@ -2010,8 +2036,14 @@ bool FileMapInfo::relocate_pointers_in_core_regions(intx addr_delta) { valid_new_base, valid_new_end, addr_delta); SharedDataRelocator ro_patcher((address*)ro_patch_base + header()->ro_ptrmap_start_pos(), (address*)ro_patch_end, valid_old_base, valid_old_end, valid_new_base, valid_new_end, addr_delta); - rw_ptrmap.iterate(&rw_patcher); - ro_ptrmap.iterate(&ro_patcher); + + if (AOTCacheParallelRelocation) { + SharedDataRelocationTask task(&rw_ptrmap, &ro_ptrmap, &rw_patcher, &ro_patcher); + ArchiveWorkers::workers()->run_task(&task); + } else { + rw_ptrmap.iterate(&rw_patcher); + ro_ptrmap.iterate(&ro_patcher); + } // The MetaspaceShared::bm region will be unmapped in MetaspaceShared::initialize_shared_spaces(). diff --git a/src/hotspot/share/cds/metaspaceShared.cpp b/src/hotspot/share/cds/metaspaceShared.cpp index b2f5f420365..a8c10e38577 100644 --- a/src/hotspot/share/cds/metaspaceShared.cpp +++ b/src/hotspot/share/cds/metaspaceShared.cpp @@ -1088,6 +1088,9 @@ void MetaspaceShared::initialize_runtime_shared_and_meta_spaces() { assert(CDSConfig::is_using_archive(), "Must be called when UseSharedSpaces is enabled"); MapArchiveResult result = MAP_ARCHIVE_OTHER_FAILURE; + // We are about to open the archives. Initialize workers now. + ArchiveWorkers::workers()->initialize(); + FileMapInfo* static_mapinfo = open_static_archive(); FileMapInfo* dynamic_mapinfo = nullptr; @@ -1679,6 +1682,9 @@ void MetaspaceShared::initialize_shared_spaces() { dynamic_mapinfo->unmap_region(MetaspaceShared::bm); } + // Archive was fully read. Workers are no longer needed. + ArchiveWorkers::workers()->shutdown(); + LogStreamHandle(Info, cds) lsh; if (lsh.is_enabled()) { lsh.print("Using AOT-linked classes: %s (static archive: %s aot-linked classes", diff --git a/src/hotspot/share/runtime/java.cpp b/src/hotspot/share/runtime/java.cpp index c9c7ae8e4c3..d2338cd74d0 100644 --- a/src/hotspot/share/runtime/java.cpp +++ b/src/hotspot/share/runtime/java.cpp @@ -441,6 +441,11 @@ void before_exit(JavaThread* thread, bool halt) { #if INCLUDE_CDS ClassListWriter::write_resolved_constants(); + + // Initiate Archive Workers shutdown. These workers are likely already + // shut down, but we need to make sure they really are. Otherwise, workers + // would fail hard on broken semaphores. + ArchiveWorkers::workers()->shutdown(); #endif // Hang forever on exit if we're reporting an error.
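
The patch wires the pool into a single consumer, SharedDataRelocationTask in filemap.cpp, but the ArchiveWorkerTask/ArchiveWorkers API is generic. Below is a minimal sketch of how another bursty CDS startup step could use it, assuming it runs while the pool is available (between ArchiveWorkers::workers()->initialize() and shutdown()). The task class, the Entry type and process() are hypothetical; only ArchiveWorkerTask, MIN2 and ArchiveWorkers::workers()->run_task() come from the existing sources.

// Hypothetical example; not part of this patch. It follows the same pattern
// as SharedDataRelocationTask: split the work into max_chunks slices and
// process the slice identified by 'chunk'.
#include "cds/archiveUtils.hpp"            // ArchiveWorkers, ArchiveWorkerTask
#include "utilities/globalDefinitions.hpp" // MIN2

struct Entry { int value; };                  // hypothetical work item
static void process(Entry* e) { e->value++; } // hypothetical per-item work

class ExampleStartupTask : public ArchiveWorkerTask {
private:
  Entry* const _entries;
  const size_t _count;

public:
  ExampleStartupTask(Entry* entries, size_t count) :
      ArchiveWorkerTask("Example Startup Task"),
      _entries(entries), _count(count) {}

  void work(int chunk, int max_chunks) override {
    // Same slicing arithmetic as SharedDataRelocationTask::work_on().
    size_t start = MIN2(_count, _count * chunk / max_chunks);
    size_t end   = MIN2(_count, _count * (chunk + 1) / max_chunks);
    for (size_t i = start; i < end; i++) {
      process(&_entries[i]);
    }
  }
};

static void run_example_startup_work(Entry* entries, size_t count) {
  ExampleStartupTask task(entries, count);
  // Degrades to a single-threaded call in the caller when the pool is not
  // parallel; otherwise the caller and the workers claim chunks concurrently.
  ArchiveWorkers::workers()->run_task(&task);
}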
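
The chunk self-claiming scheme itself does not depend on HotSpot internals. The following standalone sketch in standard C++ mirrors ArchiveWorkerTask::run(), run_task_multi() and the slicing in SharedDataRelocationTask::work_on(): std::thread stands in for ArchiveWorkerThread and fetch_add stands in for the load/cmpxchg claim loop. All names in the sketch are illustrative and do not appear in the patch.

// Standalone illustration (standard C++, not HotSpot code): every thread,
// including the caller, claims the next chunk index atomically and processes
// the matching slice of a shared array.
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <thread>
#include <vector>

struct ChunkedTask {
  std::vector<uint64_t>* data;      // stand-in for the relocation bitmap
  int max_chunks;                   // e.g. workers * CHUNKS_PER_WORKER
  std::atomic<int> next_chunk{0};

  void work(int chunk) {
    size_t size  = data->size();
    size_t start = std::min(size, size * chunk / max_chunks);
    size_t end   = std::min(size, size * (chunk + 1) / max_chunks);
    for (size_t i = start; i < end; i++) {
      (*data)[i] ^= 1;              // placeholder for pointer patching
    }
  }

  void run() {
    // The patch uses a load/cmpxchg loop; fetch_add gives the same claiming
    // behavior for this sketch.
    for (int c = next_chunk.fetch_add(1); c < max_chunks; c = next_chunk.fetch_add(1)) {
      work(c);
    }
  }
};

int main() {
  std::vector<uint64_t> bitmap(1u << 20, 0);
  const int workers = 3;                  // the patch derives this from the CPU count
  ChunkedTask task{&bitmap, workers * 4}; // 4 plays the role of CHUNKS_PER_WORKER

  std::vector<std::thread> pool;
  for (int i = 0; i < workers; i++) {
    pool.emplace_back([&task] { task.run(); });
  }
  task.run();                             // the caller works too, as in run_task_multi()
  for (std::thread& t : pool) {
    t.join();
  }
  return 0;
}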