diff --git a/hotspot/src/share/vm/gc_implementation/concurrentMarkSweep/compactibleFreeListSpace.cpp b/hotspot/src/share/vm/gc_implementation/concurrentMarkSweep/compactibleFreeListSpace.cpp index eeabe9739c4..2f960435043 100644 --- a/hotspot/src/share/vm/gc_implementation/concurrentMarkSweep/compactibleFreeListSpace.cpp +++ b/hotspot/src/share/vm/gc_implementation/concurrentMarkSweep/compactibleFreeListSpace.cpp @@ -668,12 +668,16 @@ public: // We de-virtualize the block-related calls below, since we know that our // space is a CompactibleFreeListSpace. + #define FreeListSpace_DCTOC__walk_mem_region_with_cl_DEFN(ClosureType) \ void FreeListSpace_DCTOC::walk_mem_region_with_cl(MemRegion mr, \ HeapWord* bottom, \ HeapWord* top, \ ClosureType* cl) { \ - if (SharedHeap::heap()->n_par_threads() > 0) { \ + bool is_par = SharedHeap::heap()->n_par_threads() > 0; \ + if (is_par) { \ + assert(SharedHeap::heap()->n_par_threads() == \ + SharedHeap::heap()->workers()->active_workers(), "Mismatch"); \ walk_mem_region_with_cl_par(mr, bottom, top, cl); \ } else { \ walk_mem_region_with_cl_nopar(mr, bottom, top, cl); \ @@ -1925,6 +1929,9 @@ CompactibleFreeListSpace::splitChunkAndReturnRemainder(FreeChunk* chunk, if (rem_size < SmallForDictionary) { bool is_par = (SharedHeap::heap()->n_par_threads() > 0); if (is_par) _indexedFreeListParLocks[rem_size]->lock(); + assert(!is_par || + (SharedHeap::heap()->n_par_threads() == + SharedHeap::heap()->workers()->active_workers()), "Mismatch"); returnChunkToFreeList(ffc); split(size, rem_size); if (is_par) _indexedFreeListParLocks[rem_size]->unlock(); diff --git a/hotspot/src/share/vm/gc_implementation/concurrentMarkSweep/concurrentMarkSweepGeneration.cpp b/hotspot/src/share/vm/gc_implementation/concurrentMarkSweep/concurrentMarkSweepGeneration.cpp index 54bbb24b0a4..162b991ecbb 100644 --- a/hotspot/src/share/vm/gc_implementation/concurrentMarkSweep/concurrentMarkSweepGeneration.cpp +++ b/hotspot/src/share/vm/gc_implementation/concurrentMarkSweep/concurrentMarkSweepGeneration.cpp @@ -4244,9 +4244,11 @@ void CMSConcMarkingTask::coordinator_yield() { bool CMSCollector::do_marking_mt(bool asynch) { assert(ConcGCThreads > 0 && conc_workers() != NULL, "precondition"); - // In the future this would be determined ergonomically, based - // on #cpu's, # active mutator threads (and load), and mutation rate. - int num_workers = ConcGCThreads; + int num_workers = AdaptiveSizePolicy::calc_active_conc_workers( + conc_workers()->total_workers(), + conc_workers()->active_workers(), + Threads::number_of_non_daemon_threads()); + conc_workers()->set_active_workers(num_workers); CompactibleFreeListSpace* cms_space = _cmsGen->cmsSpace(); CompactibleFreeListSpace* perm_space = _permGen->cmsSpace(); @@ -5062,6 +5064,8 @@ class CMSParRemarkTask: public AbstractGangTask { ParallelTaskTerminator _term; public: + // A value of 0 passed to n_workers will cause the number of + // workers to be taken from the active workers in the work gang. CMSParRemarkTask(CMSCollector* collector, CompactibleFreeListSpace* cms_space, CompactibleFreeListSpace* perm_space, @@ -5544,7 +5548,15 @@ void CMSCollector::do_remark_parallel() { GenCollectedHeap* gch = GenCollectedHeap::heap(); FlexibleWorkGang* workers = gch->workers(); assert(workers != NULL, "Need parallel worker threads."); - int n_workers = workers->total_workers(); + // Choose to use the number of GC workers most recently set + // into "active_workers". If active_workers is not set, set it + // to ParallelGCThreads. + int n_workers = workers->active_workers(); + if (n_workers == 0) { + assert(n_workers > 0, "Should have been set during scavenge"); + n_workers = ParallelGCThreads; + workers->set_active_workers(n_workers); + } CompactibleFreeListSpace* cms_space = _cmsGen->cmsSpace(); CompactibleFreeListSpace* perm_space = _permGen->cmsSpace(); @@ -5884,8 +5896,17 @@ void CMSCollector::refProcessingWork(bool asynch, bool clear_all_soft_refs) { // and a different number of discovered lists may have Ref objects. // That is OK as long as the Reference lists are balanced (see // balance_all_queues() and balance_queues()). - - rp->set_active_mt_degree(ParallelGCThreads); + GenCollectedHeap* gch = GenCollectedHeap::heap(); + int active_workers = ParallelGCThreads; + FlexibleWorkGang* workers = gch->workers(); + if (workers != NULL) { + active_workers = workers->active_workers(); + // The expectation is that active_workers will have already + // been set to a reasonable value. If it has not been set, + // investigate. + assert(active_workers > 0, "Should have been set during scavenge"); + } + rp->set_active_mt_degree(active_workers); CMSRefProcTaskExecutor task_executor(*this); rp->process_discovered_references(&_is_alive_closure, &cmsKeepAliveClosure, diff --git a/hotspot/src/share/vm/gc_implementation/g1/collectionSetChooser.cpp b/hotspot/src/share/vm/gc_implementation/g1/collectionSetChooser.cpp index 0b785656f90..544b5a8676c 100644 --- a/hotspot/src/share/vm/gc_implementation/g1/collectionSetChooser.cpp +++ b/hotspot/src/share/vm/gc_implementation/g1/collectionSetChooser.cpp @@ -255,7 +255,18 @@ void CollectionSetChooser:: prepareForAddMarkedHeapRegionsPar(size_t n_regions, size_t chunkSize) { _first_par_unreserved_idx = 0; - size_t max_waste = ParallelGCThreads * chunkSize; + int n_threads = ParallelGCThreads; + if (UseDynamicNumberOfGCThreads) { + assert(G1CollectedHeap::heap()->workers()->active_workers() > 0, + "Should have been set earlier"); + // This is defensive code. As the assertion above says, the number + // of active threads should be > 0, but in case there is some path + // or some improperly initialized variable with leads to no + // active threads, protect against that in a product build. + n_threads = MAX2(G1CollectedHeap::heap()->workers()->active_workers(), + 1); + } + size_t max_waste = n_threads * chunkSize; // it should be aligned with respect to chunkSize size_t aligned_n_regions = (n_regions + (chunkSize - 1)) / chunkSize * chunkSize; @@ -265,6 +276,11 @@ prepareForAddMarkedHeapRegionsPar(size_t n_regions, size_t chunkSize) { jint CollectionSetChooser::getParMarkedHeapRegionChunk(jint n_regions) { + // Don't do this assert because this can be called at a point + // where the loop up stream will not execute again but might + // try to claim more chunks (loop test has not been done yet). + // assert(_markedRegions.length() > _first_par_unreserved_idx, + // "Striding beyond the marked regions"); jint res = Atomic::add(n_regions, &_first_par_unreserved_idx); assert(_markedRegions.length() > res + n_regions - 1, "Should already have been expanded"); diff --git a/hotspot/src/share/vm/gc_implementation/g1/concurrentMark.cpp b/hotspot/src/share/vm/gc_implementation/g1/concurrentMark.cpp index 0cf789171d8..97512a0cddc 100644 --- a/hotspot/src/share/vm/gc_implementation/g1/concurrentMark.cpp +++ b/hotspot/src/share/vm/gc_implementation/g1/concurrentMark.cpp @@ -458,12 +458,17 @@ bool ConcurrentMark::not_yet_marked(oop obj) const { #pragma warning( disable:4355 ) // 'this' : used in base member initializer list #endif // _MSC_VER +size_t ConcurrentMark::scale_parallel_threads(size_t n_par_threads) { + return MAX2((n_par_threads + 2) / 4, (size_t)1); +} + ConcurrentMark::ConcurrentMark(ReservedSpace rs, int max_regions) : _markBitMap1(rs, MinObjAlignment - 1), _markBitMap2(rs, MinObjAlignment - 1), _parallel_marking_threads(0), + _max_parallel_marking_threads(0), _sleep_factor(0.0), _marking_task_overhead(1.0), _cleanup_sleep_factor(0.0), @@ -554,15 +559,17 @@ ConcurrentMark::ConcurrentMark(ReservedSpace rs, if (ParallelGCThreads == 0) { // if we are not running with any parallel GC threads we will not // spawn any marking threads either - _parallel_marking_threads = 0; - _sleep_factor = 0.0; - _marking_task_overhead = 1.0; + _parallel_marking_threads = 0; + _max_parallel_marking_threads = 0; + _sleep_factor = 0.0; + _marking_task_overhead = 1.0; } else { if (ConcGCThreads > 0) { // notice that ConcGCThreads overwrites G1MarkingOverheadPercent // if both are set _parallel_marking_threads = ConcGCThreads; + _max_parallel_marking_threads = _parallel_marking_threads; _sleep_factor = 0.0; _marking_task_overhead = 1.0; } else if (G1MarkingOverheadPercent > 0) { @@ -583,10 +590,12 @@ ConcurrentMark::ConcurrentMark(ReservedSpace rs, (1.0 - marking_task_overhead) / marking_task_overhead; _parallel_marking_threads = (size_t) marking_thread_num; + _max_parallel_marking_threads = _parallel_marking_threads; _sleep_factor = sleep_factor; _marking_task_overhead = marking_task_overhead; } else { - _parallel_marking_threads = MAX2((ParallelGCThreads + 2) / 4, (size_t)1); + _parallel_marking_threads = scale_parallel_threads(ParallelGCThreads); + _max_parallel_marking_threads = _parallel_marking_threads; _sleep_factor = 0.0; _marking_task_overhead = 1.0; } @@ -609,7 +618,7 @@ ConcurrentMark::ConcurrentMark(ReservedSpace rs, guarantee(parallel_marking_threads() > 0, "peace of mind"); _parallel_workers = new FlexibleWorkGang("G1 Parallel Marking Threads", - (int) _parallel_marking_threads, false, true); + (int) _max_parallel_marking_threads, false, true); if (_parallel_workers == NULL) { vm_exit_during_initialization("Failed necessary allocation."); } else { @@ -1106,6 +1115,33 @@ public: ~CMConcurrentMarkingTask() { } }; +// Calculates the number of active workers for a concurrent +// phase. +int ConcurrentMark::calc_parallel_marking_threads() { + + size_t n_conc_workers; + if (!G1CollectedHeap::use_parallel_gc_threads()) { + n_conc_workers = 1; + } else { + if (!UseDynamicNumberOfGCThreads || + (!FLAG_IS_DEFAULT(ConcGCThreads) && + !ForceDynamicNumberOfGCThreads)) { + n_conc_workers = max_parallel_marking_threads(); + } else { + n_conc_workers = + AdaptiveSizePolicy::calc_default_active_workers( + max_parallel_marking_threads(), + 1, /* Minimum workers */ + parallel_marking_threads(), + Threads::number_of_non_daemon_threads()); + // Don't scale down "n_conc_workers" by scale_parallel_threads() because + // that scaling has already gone into "_max_parallel_marking_threads". + } + } + assert(n_conc_workers > 0, "Always need at least 1"); + return (int) MAX2(n_conc_workers, (size_t) 1); +} + void ConcurrentMark::markFromRoots() { // we might be tempted to assert that: // assert(asynch == !SafepointSynchronize::is_at_safepoint(), @@ -1116,9 +1152,20 @@ void ConcurrentMark::markFromRoots() { _restart_for_overflow = false; - size_t active_workers = MAX2((size_t) 1, parallel_marking_threads()); + // Parallel task terminator is set in "set_phase()". force_overflow_conc()->init(); - set_phase(active_workers, true /* concurrent */); + + // _g1h has _n_par_threads + + _parallel_marking_threads = calc_parallel_marking_threads(); + assert(parallel_marking_threads() <= max_parallel_marking_threads(), + "Maximum number of marking threads exceeded"); + _parallel_workers->set_active_workers((int)_parallel_marking_threads); + // Don't set _n_par_threads because it affects MT in proceess_strong_roots() + // and the decisions on that MT processing is made elsewhere. + + assert( _parallel_workers->active_workers() > 0, "Should have been set"); + set_phase(_parallel_workers->active_workers(), true /* concurrent */); CMConcurrentMarkingTask markingTask(this, cmThread()); if (parallel_marking_threads() > 0) { @@ -1181,6 +1228,7 @@ void ConcurrentMark::checkpointRootsFinal(bool clear_all_soft_refs) { true /* expected_active */); if (VerifyDuringGC) { + HandleMark hm; // handle scope gclog_or_tty->print(" VerifyDuringGC:(after)"); Universe::heap()->prepare_for_verify(); @@ -1463,12 +1511,20 @@ public: G1ParFinalCountTask(G1CollectedHeap* g1h, CMBitMap* bm, BitMap* region_bm, BitMap* card_bm) : AbstractGangTask("G1 final counting"), _g1h(g1h), - _bm(bm), _region_bm(region_bm), _card_bm(card_bm) { - if (ParallelGCThreads > 0) { - _n_workers = _g1h->workers()->total_workers(); + _bm(bm), _region_bm(region_bm), _card_bm(card_bm), + _n_workers(0) + { + // Use the value already set as the number of active threads + // in the call to run_task(). Needed for the allocation of + // _live_bytes and _used_bytes. + if (G1CollectedHeap::use_parallel_gc_threads()) { + assert( _g1h->workers()->active_workers() > 0, + "Should have been previously set"); + _n_workers = _g1h->workers()->active_workers(); } else { _n_workers = 1; } + _live_bytes = NEW_C_HEAP_ARRAY(size_t, _n_workers); _used_bytes = NEW_C_HEAP_ARRAY(size_t, _n_workers); } @@ -1485,6 +1541,7 @@ public: calccl.no_yield(); if (G1CollectedHeap::use_parallel_gc_threads()) { _g1h->heap_region_par_iterate_chunked(&calccl, i, + (int) _n_workers, HeapRegion::FinalCountClaimValue); } else { _g1h->heap_region_iterate(&calccl); @@ -1600,6 +1657,7 @@ public: &hrrs_cleanup_task); if (G1CollectedHeap::use_parallel_gc_threads()) { _g1h->heap_region_par_iterate_chunked(&g1_note_end, i, + _g1h->workers()->active_workers(), HeapRegion::NoteEndClaimValue); } else { _g1h->heap_region_iterate(&g1_note_end); @@ -1707,6 +1765,9 @@ void ConcurrentMark::cleanup() { HeapRegionRemSet::reset_for_cleanup_tasks(); + g1h->set_par_threads(); + size_t n_workers = g1h->n_par_threads(); + // Do counting once more with the world stopped for good measure. G1ParFinalCountTask g1_par_count_task(g1h, nextMarkBitMap(), &_region_bm, &_card_bm); @@ -1715,9 +1776,10 @@ void ConcurrentMark::cleanup() { HeapRegion::InitialClaimValue), "sanity check"); - int n_workers = g1h->workers()->total_workers(); - g1h->set_par_threads(n_workers); + assert(g1h->n_par_threads() == (int) n_workers, + "Should not have been reset"); g1h->workers()->run_task(&g1_par_count_task); + // Done with the parallel phase so reset to 0. g1h->set_par_threads(0); assert(g1h->check_heap_region_claim_values( @@ -1767,8 +1829,7 @@ void ConcurrentMark::cleanup() { double note_end_start = os::elapsedTime(); G1ParNoteEndTask g1_par_note_end_task(g1h, &_cleanup_list); if (G1CollectedHeap::use_parallel_gc_threads()) { - int n_workers = g1h->workers()->total_workers(); - g1h->set_par_threads(n_workers); + g1h->set_par_threads((int)n_workers); g1h->workers()->run_task(&g1_par_note_end_task); g1h->set_par_threads(0); @@ -1797,8 +1858,7 @@ void ConcurrentMark::cleanup() { double rs_scrub_start = os::elapsedTime(); G1ParScrubRemSetTask g1_par_scrub_rs_task(g1h, &_region_bm, &_card_bm); if (G1CollectedHeap::use_parallel_gc_threads()) { - int n_workers = g1h->workers()->total_workers(); - g1h->set_par_threads(n_workers); + g1h->set_par_threads((int)n_workers); g1h->workers()->run_task(&g1_par_scrub_rs_task); g1h->set_par_threads(0); @@ -1816,7 +1876,7 @@ void ConcurrentMark::cleanup() { // this will also free any regions totally full of garbage objects, // and sort the regions. - g1h->g1_policy()->record_concurrent_mark_cleanup_end(); + g1h->g1_policy()->record_concurrent_mark_cleanup_end((int)n_workers); // Statistics. double end = os::elapsedTime(); @@ -2187,7 +2247,7 @@ void ConcurrentMark::weakRefsWork(bool clear_all_soft_refs) { // We use the work gang from the G1CollectedHeap and we utilize all // the worker threads. - int active_workers = g1h->workers() ? g1h->workers()->total_workers() : 1; + int active_workers = g1h->workers() ? g1h->workers()->active_workers() : 1; active_workers = MAX2(MIN2(active_workers, (int)_max_task_num), 1); G1CMRefProcTaskExecutor par_task_executor(g1h, this, @@ -2270,7 +2330,9 @@ public: } CMRemarkTask(ConcurrentMark* cm) : - AbstractGangTask("Par Remark"), _cm(cm) { } + AbstractGangTask("Par Remark"), _cm(cm) { + _cm->terminator()->reset_for_reuse(cm->_g1h->workers()->active_workers()); + } }; void ConcurrentMark::checkpointRootsFinalWork() { @@ -2282,16 +2344,21 @@ void ConcurrentMark::checkpointRootsFinalWork() { if (G1CollectedHeap::use_parallel_gc_threads()) { G1CollectedHeap::StrongRootsScope srs(g1h); - // this is remark, so we'll use up all available threads - int active_workers = ParallelGCThreads; + // this is remark, so we'll use up all active threads + int active_workers = g1h->workers()->active_workers(); + if (active_workers == 0) { + assert(active_workers > 0, "Should have been set earlier"); + active_workers = ParallelGCThreads; + g1h->workers()->set_active_workers(active_workers); + } set_phase(active_workers, false /* concurrent */); + // Leave _parallel_marking_threads at it's + // value originally calculated in the ConcurrentMark + // constructor and pass values of the active workers + // through the gang in the task. CMRemarkTask remarkTask(this); - // We will start all available threads, even if we decide that the - // active_workers will be fewer. The extra ones will just bail out - // immediately. - int n_workers = g1h->workers()->total_workers(); - g1h->set_par_threads(n_workers); + g1h->set_par_threads(active_workers); g1h->workers()->run_task(&remarkTask); g1h->set_par_threads(0); } else { diff --git a/hotspot/src/share/vm/gc_implementation/g1/concurrentMark.hpp b/hotspot/src/share/vm/gc_implementation/g1/concurrentMark.hpp index c724594f4af..ff8b39e8b09 100644 --- a/hotspot/src/share/vm/gc_implementation/g1/concurrentMark.hpp +++ b/hotspot/src/share/vm/gc_implementation/g1/concurrentMark.hpp @@ -375,7 +375,9 @@ protected: ConcurrentMarkThread* _cmThread; // the thread doing the work G1CollectedHeap* _g1h; // the heap. size_t _parallel_marking_threads; // the number of marking - // threads we'll use + // threads we're use + size_t _max_parallel_marking_threads; // max number of marking + // threads we'll ever use double _sleep_factor; // how much we have to sleep, with // respect to the work we just did, to // meet the marking overhead goal @@ -473,7 +475,7 @@ protected: double* _accum_task_vtime; // accumulated task vtime - WorkGang* _parallel_workers; + FlexibleWorkGang* _parallel_workers; ForceOverflowSettings _force_overflow_conc; ForceOverflowSettings _force_overflow_stw; @@ -504,6 +506,7 @@ protected: // accessor methods size_t parallel_marking_threads() { return _parallel_marking_threads; } + size_t max_parallel_marking_threads() { return _max_parallel_marking_threads;} double sleep_factor() { return _sleep_factor; } double marking_task_overhead() { return _marking_task_overhead;} double cleanup_sleep_factor() { return _cleanup_sleep_factor; } @@ -709,6 +712,14 @@ public: CMBitMapRO* prevMarkBitMap() const { return _prevMarkBitMap; } CMBitMap* nextMarkBitMap() const { return _nextMarkBitMap; } + // Returns the number of GC threads to be used in a concurrent + // phase based on the number of GC threads being used in a STW + // phase. + size_t scale_parallel_threads(size_t n_par_threads); + + // Calculates the number of GC threads to be used in a concurrent phase. + int calc_parallel_marking_threads(); + // The following three are interaction between CM and // G1CollectedHeap diff --git a/hotspot/src/share/vm/gc_implementation/g1/g1CollectedHeap.cpp b/hotspot/src/share/vm/gc_implementation/g1/g1CollectedHeap.cpp index a1a8e04059d..b0861f8e97c 100644 --- a/hotspot/src/share/vm/gc_implementation/g1/g1CollectedHeap.cpp +++ b/hotspot/src/share/vm/gc_implementation/g1/g1CollectedHeap.cpp @@ -66,6 +66,18 @@ size_t G1CollectedHeap::_humongous_object_threshold_in_words = 0; // apply to TLAB allocation, which is not part of this interface: it // is done by clients of this interface.) +// Notes on implementation of parallelism in different tasks. +// +// G1ParVerifyTask uses heap_region_par_iterate_chunked() for parallelism. +// The number of GC workers is passed to heap_region_par_iterate_chunked(). +// It does use run_task() which sets _n_workers in the task. +// G1ParTask executes g1_process_strong_roots() -> +// SharedHeap::process_strong_roots() which calls eventuall to +// CardTableModRefBS::par_non_clean_card_iterate_work() which uses +// SequentialSubTasksDone. SharedHeap::process_strong_roots() also +// directly uses SubTasksDone (_process_strong_tasks field in SharedHeap). +// + // Local to this file. class RefineCardTableEntryClosure: public CardTableEntryClosure { @@ -1156,6 +1168,7 @@ public: void work(int i) { RebuildRSOutOfRegionClosure rebuild_rs(_g1, i); _g1->heap_region_par_iterate_chunked(&rebuild_rs, i, + _g1->workers()->active_workers(), HeapRegion::RebuildRSClaimValue); } }; @@ -1360,12 +1373,32 @@ bool G1CollectedHeap::do_collection(bool explicit_gc, } // Rebuild remembered sets of all regions. - if (G1CollectedHeap::use_parallel_gc_threads()) { + int n_workers = + AdaptiveSizePolicy::calc_active_workers(workers()->total_workers(), + workers()->active_workers(), + Threads::number_of_non_daemon_threads()); + assert(UseDynamicNumberOfGCThreads || + n_workers == workers()->total_workers(), + "If not dynamic should be using all the workers"); + workers()->set_active_workers(n_workers); + // Set parallel threads in the heap (_n_par_threads) only + // before a parallel phase and always reset it to 0 after + // the phase so that the number of parallel threads does + // no get carried forward to a serial phase where there + // may be code that is "possibly_parallel". + set_par_threads(n_workers); + ParRebuildRSTask rebuild_rs_task(this); assert(check_heap_region_claim_values( HeapRegion::InitialClaimValue), "sanity check"); - set_par_threads(workers()->total_workers()); + assert(UseDynamicNumberOfGCThreads || + workers()->active_workers() == workers()->total_workers(), + "Unless dynamic should use total workers"); + // Use the most recent number of active workers + assert(workers()->active_workers() > 0, + "Active workers not properly set"); + set_par_threads(workers()->active_workers()); workers()->run_task(&rebuild_rs_task); set_par_threads(0); assert(check_heap_region_claim_values( @@ -2477,11 +2510,17 @@ void G1CollectedHeap::heap_region_iterate_from(HeapRegion* r, void G1CollectedHeap::heap_region_par_iterate_chunked(HeapRegionClosure* cl, int worker, + int no_of_par_workers, jint claim_value) { const size_t regions = n_regions(); - const size_t worker_num = (G1CollectedHeap::use_parallel_gc_threads() ? ParallelGCThreads : 1); + const size_t max_workers = (G1CollectedHeap::use_parallel_gc_threads() ? + no_of_par_workers : + 1); + assert(UseDynamicNumberOfGCThreads || + no_of_par_workers == workers()->total_workers(), + "Non dynamic should use fixed number of workers"); // try to spread out the starting points of the workers - const size_t start_index = regions / worker_num * (size_t) worker; + const size_t start_index = regions / max_workers * (size_t) worker; // each worker will actually look at all regions for (size_t count = 0; count < regions; ++count) { @@ -2920,6 +2959,7 @@ public: HandleMark hm; VerifyRegionClosure blk(_allow_dirty, true, _vo); _g1h->heap_region_par_iterate_chunked(&blk, worker_i, + _g1h->workers()->active_workers(), HeapRegion::ParVerifyClaimValue); if (blk.failures()) { _failures = true; @@ -2937,6 +2977,10 @@ void G1CollectedHeap::verify(bool allow_dirty, if (SafepointSynchronize::is_at_safepoint() || ! UseTLAB) { if (!silent) { gclog_or_tty->print("Roots (excluding permgen) "); } VerifyRootsClosure rootsCl(vo); + + assert(Thread::current()->is_VM_thread(), + "Expected to be executed serially by the VM thread at this point"); + CodeBlobToOopClosure blobsCl(&rootsCl, /*do_marking=*/ false); // We apply the relevant closures to all the oops in the @@ -2981,7 +3025,10 @@ void G1CollectedHeap::verify(bool allow_dirty, "sanity check"); G1ParVerifyTask task(this, allow_dirty, vo); - int n_workers = workers()->total_workers(); + assert(UseDynamicNumberOfGCThreads || + workers()->active_workers() == workers()->total_workers(), + "If not dynamic should be using all the workers"); + int n_workers = workers()->active_workers(); set_par_threads(n_workers); workers()->run_task(&task); set_par_threads(0); @@ -2989,6 +3036,8 @@ void G1CollectedHeap::verify(bool allow_dirty, failures = true; } + // Checks that the expected amount of parallel work was done. + // The implication is that n_workers is > 0. assert(check_heap_region_claim_values(HeapRegion::ParVerifyClaimValue), "sanity check"); @@ -3402,6 +3451,10 @@ G1CollectedHeap::do_collection_pause_at_safepoint(double target_pause_time_ms) { assert(check_young_list_well_formed(), "young list should be well formed"); + // Don't dynamically change the number of GC threads this early. A value of + // 0 is used to indicate serial work. When parallel work is done, + // it will be set. + { // Call to jvmpi::post_class_unload_events must occur outside of active GC IsGCActiveMark x; @@ -3615,7 +3668,8 @@ G1CollectedHeap::do_collection_pause_at_safepoint(double target_pause_time_ms) { double end_time_sec = os::elapsedTime(); double pause_time_ms = (end_time_sec - start_time_sec) * MILLIUNITS; g1_policy()->record_pause_time_ms(pause_time_ms); - g1_policy()->record_collection_pause_end(); + int active_gc_threads = workers()->active_workers(); + g1_policy()->record_collection_pause_end(active_gc_threads); MemoryService::track_memory_usage(); @@ -4562,13 +4616,13 @@ protected: } public: - G1ParTask(G1CollectedHeap* g1h, int workers, RefToScanQueueSet *task_queues) + G1ParTask(G1CollectedHeap* g1h, + RefToScanQueueSet *task_queues) : AbstractGangTask("G1 collection"), _g1h(g1h), _queues(task_queues), - _terminator(workers, _queues), - _stats_lock(Mutex::leaf, "parallel G1 stats lock", true), - _n_workers(workers) + _terminator(0, _queues), + _stats_lock(Mutex::leaf, "parallel G1 stats lock", true) {} RefToScanQueueSet* queues() { return _queues; } @@ -4577,6 +4631,20 @@ public: return queues()->queue(i); } + ParallelTaskTerminator* terminator() { return &_terminator; } + + virtual void set_for_termination(int active_workers) { + // This task calls set_n_termination() in par_non_clean_card_iterate_work() + // in the young space (_par_seq_tasks) in the G1 heap + // for SequentialSubTasksDone. + // This task also uses SubTasksDone in SharedHeap and G1CollectedHeap + // both of which need setting by set_n_termination(). + _g1h->SharedHeap::set_n_termination(active_workers); + _g1h->set_n_termination(active_workers); + terminator()->reset_for_reuse(active_workers); + _n_workers = active_workers; + } + void work(int i) { if (i >= _n_workers) return; // no work needed this round @@ -4861,12 +4929,12 @@ class G1STWRefProcTaskExecutor: public AbstractRefProcTaskExecutor { private: G1CollectedHeap* _g1h; RefToScanQueueSet* _queues; - WorkGang* _workers; + FlexibleWorkGang* _workers; int _active_workers; public: G1STWRefProcTaskExecutor(G1CollectedHeap* g1h, - WorkGang* workers, + FlexibleWorkGang* workers, RefToScanQueueSet *task_queues, int n_workers) : _g1h(g1h), @@ -5122,11 +5190,13 @@ void G1CollectedHeap::process_discovered_references() { // referents points to another object which is also referenced by an // object discovered by the STW ref processor. - int n_workers = (G1CollectedHeap::use_parallel_gc_threads() ? - workers()->total_workers() : 1); + int active_workers = (G1CollectedHeap::use_parallel_gc_threads() ? + workers()->active_workers() : 1); - set_par_threads(n_workers); - G1ParPreserveCMReferentsTask keep_cm_referents(this, n_workers, _task_queues); + assert(active_workers == workers()->active_workers(), + "Need to reset active_workers"); + set_par_threads(active_workers); + G1ParPreserveCMReferentsTask keep_cm_referents(this, active_workers, _task_queues); if (G1CollectedHeap::use_parallel_gc_threads()) { workers()->run_task(&keep_cm_referents); @@ -5192,7 +5262,6 @@ void G1CollectedHeap::process_discovered_references() { NULL); } else { // Parallel reference processing - int active_workers = (ParallelGCThreads > 0 ? workers()->total_workers() : 1); assert(rp->num_q() == active_workers, "sanity"); assert(active_workers <= rp->max_num_q(), "sanity"); @@ -5225,7 +5294,9 @@ void G1CollectedHeap::enqueue_discovered_references() { } else { // Parallel reference enqueuing - int active_workers = (ParallelGCThreads > 0 ? workers()->total_workers() : 1); + int active_workers = (ParallelGCThreads > 0 ? workers()->active_workers() : 1); + assert(active_workers == workers()->active_workers(), + "Need to reset active_workers"); assert(rp->num_q() == active_workers, "sanity"); assert(active_workers <= rp->max_num_q(), "sanity"); @@ -5252,9 +5323,24 @@ void G1CollectedHeap::evacuate_collection_set() { concurrent_g1_refine()->set_use_cache(false); concurrent_g1_refine()->clear_hot_cache_claimed_index(); - int n_workers = (ParallelGCThreads > 0 ? workers()->total_workers() : 1); - set_par_threads(n_workers); - G1ParTask g1_par_task(this, n_workers, _task_queues); + int n_workers; + if (G1CollectedHeap::use_parallel_gc_threads()) { + n_workers = + AdaptiveSizePolicy::calc_active_workers(workers()->total_workers(), + workers()->active_workers(), + Threads::number_of_non_daemon_threads()); + assert(UseDynamicNumberOfGCThreads || + n_workers == workers()->total_workers(), + "If not dynamic should be using all the workers"); + set_par_threads(n_workers); + } else { + assert(n_par_threads() == 0, + "Should be the original non-parallel value"); + n_workers = 1; + } + workers()->set_active_workers(n_workers); + + G1ParTask g1_par_task(this, _task_queues); init_for_evac_failure(NULL); @@ -5267,6 +5353,10 @@ void G1CollectedHeap::evacuate_collection_set() { // The individual threads will set their evac-failure closures. StrongRootsScope srs(this); if (ParallelGCVerbose) G1ParScanThreadState::print_termination_stats_hdr(); + // These tasks use ShareHeap::_process_strong_tasks + assert(UseDynamicNumberOfGCThreads || + workers()->active_workers() == workers()->total_workers(), + "If not dynamic should be using all the workers"); workers()->run_task(&g1_par_task); } else { StrongRootsScope srs(this); @@ -5275,6 +5365,7 @@ void G1CollectedHeap::evacuate_collection_set() { double par_time = (os::elapsedTime() - start_par) * 1000.0; g1_policy()->record_par_time(par_time); + set_par_threads(0); // Process any discovered reference objects - we have @@ -5905,6 +5996,21 @@ HeapRegion* MutatorAllocRegion::allocate_new_region(size_t word_size, return _g1h->new_mutator_alloc_region(word_size, force); } +void G1CollectedHeap::set_par_threads() { + // Don't change the number of workers. Use the value previously set + // in the workgroup. + int n_workers = workers()->active_workers(); + assert(UseDynamicNumberOfGCThreads || + n_workers == workers()->total_workers(), + "Otherwise should be using the total number of workers"); + if (n_workers == 0) { + assert(false, "Should have been set in prior evacuation pause."); + n_workers = ParallelGCThreads; + workers()->set_active_workers(n_workers); + } + set_par_threads(n_workers); +} + void MutatorAllocRegion::retire_region(HeapRegion* alloc_region, size_t allocated_bytes) { _g1h->retire_mutator_alloc_region(alloc_region, allocated_bytes); diff --git a/hotspot/src/share/vm/gc_implementation/g1/g1CollectedHeap.hpp b/hotspot/src/share/vm/gc_implementation/g1/g1CollectedHeap.hpp index 1dc3ff166b9..c6707511822 100644 --- a/hotspot/src/share/vm/gc_implementation/g1/g1CollectedHeap.hpp +++ b/hotspot/src/share/vm/gc_implementation/g1/g1CollectedHeap.hpp @@ -987,6 +987,16 @@ public: void set_par_threads(int t) { SharedHeap::set_par_threads(t); + // Done in SharedHeap but oddly there are + // two _process_strong_tasks's in a G1CollectedHeap + // so do it here too. + _process_strong_tasks->set_n_threads(t); + } + + // Set _n_par_threads according to a policy TBD. + void set_par_threads(); + + void set_n_termination(int t) { _process_strong_tasks->set_n_threads(t); } @@ -1276,6 +1286,7 @@ public: // i.e., that a closure never attempt to abort a traversal. void heap_region_par_iterate_chunked(HeapRegionClosure* blk, int worker, + int no_of_par_workers, jint claim_value); // It resets all the region claim values to the default. diff --git a/hotspot/src/share/vm/gc_implementation/g1/g1CollectorPolicy.cpp b/hotspot/src/share/vm/gc_implementation/g1/g1CollectorPolicy.cpp index 2182983a870..1491dd4d256 100644 --- a/hotspot/src/share/vm/gc_implementation/g1/g1CollectorPolicy.cpp +++ b/hotspot/src/share/vm/gc_implementation/g1/g1CollectorPolicy.cpp @@ -1024,7 +1024,7 @@ void G1CollectorPolicy::print_par_stats(int level, double total = 0.0; LineBuffer buf(level); buf.append("[%s (ms):", str); - for (uint i = 0; i < ParallelGCThreads; ++i) { + for (uint i = 0; i < no_of_gc_threads(); ++i) { double val = data[i]; if (val < min) min = val; @@ -1034,7 +1034,7 @@ void G1CollectorPolicy::print_par_stats(int level, buf.append(" %3.1lf", val); } buf.append_and_print_cr(""); - double avg = total / (double) ParallelGCThreads; + double avg = total / (double) no_of_gc_threads(); buf.append_and_print_cr(" Avg: %5.1lf, Min: %5.1lf, Max: %5.1lf, Diff: %5.1lf]", avg, min, max, max - min); } @@ -1046,7 +1046,7 @@ void G1CollectorPolicy::print_par_sizes(int level, double total = 0.0; LineBuffer buf(level); buf.append("[%s :", str); - for (uint i = 0; i < ParallelGCThreads; ++i) { + for (uint i = 0; i < no_of_gc_threads(); ++i) { double val = data[i]; if (val < min) min = val; @@ -1056,7 +1056,7 @@ void G1CollectorPolicy::print_par_sizes(int level, buf.append(" %d", (int) val); } buf.append_and_print_cr(""); - double avg = total / (double) ParallelGCThreads; + double avg = total / (double) no_of_gc_threads(); buf.append_and_print_cr(" Sum: %d, Avg: %d, Min: %d, Max: %d, Diff: %d]", (int)total, (int)avg, (int)min, (int)max, (int)max - (int)min); } @@ -1076,10 +1076,10 @@ void G1CollectorPolicy::print_stats(int level, double G1CollectorPolicy::avg_value(double* data) { if (G1CollectedHeap::use_parallel_gc_threads()) { double ret = 0.0; - for (uint i = 0; i < ParallelGCThreads; ++i) { + for (uint i = 0; i < no_of_gc_threads(); ++i) { ret += data[i]; } - return ret / (double) ParallelGCThreads; + return ret / (double) no_of_gc_threads(); } else { return data[0]; } @@ -1088,7 +1088,7 @@ double G1CollectorPolicy::avg_value(double* data) { double G1CollectorPolicy::max_value(double* data) { if (G1CollectedHeap::use_parallel_gc_threads()) { double ret = data[0]; - for (uint i = 1; i < ParallelGCThreads; ++i) { + for (uint i = 1; i < no_of_gc_threads(); ++i) { if (data[i] > ret) { ret = data[i]; } @@ -1102,7 +1102,7 @@ double G1CollectorPolicy::max_value(double* data) { double G1CollectorPolicy::sum_of_values(double* data) { if (G1CollectedHeap::use_parallel_gc_threads()) { double sum = 0.0; - for (uint i = 0; i < ParallelGCThreads; i++) { + for (uint i = 0; i < no_of_gc_threads(); i++) { sum += data[i]; } return sum; @@ -1115,7 +1115,7 @@ double G1CollectorPolicy::max_sum(double* data1, double* data2) { double ret = data1[0] + data2[0]; if (G1CollectedHeap::use_parallel_gc_threads()) { - for (uint i = 1; i < ParallelGCThreads; ++i) { + for (uint i = 1; i < no_of_gc_threads(); ++i) { double data = data1[i] + data2[i]; if (data > ret) { ret = data; @@ -1128,7 +1128,7 @@ double G1CollectorPolicy::max_sum(double* data1, double* data2) { // Anything below that is considered to be zero #define MIN_TIMER_GRANULARITY 0.0000001 -void G1CollectorPolicy::record_collection_pause_end() { +void G1CollectorPolicy::record_collection_pause_end(int no_of_gc_threads) { double end_time_sec = os::elapsedTime(); double elapsed_ms = _last_pause_time_ms; bool parallel = G1CollectedHeap::use_parallel_gc_threads(); @@ -1140,6 +1140,7 @@ void G1CollectorPolicy::record_collection_pause_end() { assert(cur_used_bytes == _g1->recalculate_used(), "It should!"); bool last_pause_included_initial_mark = false; bool update_stats = !_g1->evacuation_failed(); + set_no_of_gc_threads(no_of_gc_threads); #ifndef PRODUCT if (G1YoungSurvRateVerbose) { @@ -2304,6 +2305,7 @@ public: ParKnownGarbageHRClosure parKnownGarbageCl(_hrSorted, _chunk_size, i); // Back to zero for the claim value. _g1->heap_region_par_iterate_chunked(&parKnownGarbageCl, i, + _g1->workers()->active_workers(), HeapRegion::InitialClaimValue); jint regions_added = parKnownGarbageCl.marked_regions_added(); _hrSorted->incNumMarkedHeapRegions(regions_added); @@ -2315,7 +2317,7 @@ public: }; void -G1CollectorPolicy::record_concurrent_mark_cleanup_end() { +G1CollectorPolicy::record_concurrent_mark_cleanup_end(int no_of_gc_threads) { double start_sec; if (G1PrintParCleanupStats) { start_sec = os::elapsedTime(); @@ -2331,10 +2333,27 @@ G1CollectorPolicy::record_concurrent_mark_cleanup_end() { if (G1CollectedHeap::use_parallel_gc_threads()) { const size_t OverpartitionFactor = 4; - const size_t MinWorkUnit = 8; - const size_t WorkUnit = - MAX2(_g1->n_regions() / (ParallelGCThreads * OverpartitionFactor), - MinWorkUnit); + size_t WorkUnit; + // The use of MinChunkSize = 8 in the original code + // causes some assertion failures when the total number of + // region is less than 8. The code here tries to fix that. + // Should the original code also be fixed? + if (no_of_gc_threads > 0) { + const size_t MinWorkUnit = + MAX2(_g1->n_regions() / no_of_gc_threads, (size_t) 1U); + WorkUnit = + MAX2(_g1->n_regions() / (no_of_gc_threads * OverpartitionFactor), + MinWorkUnit); + } else { + assert(no_of_gc_threads > 0, + "The active gc workers should be greater than 0"); + // In a product build do something reasonable to avoid a crash. + const size_t MinWorkUnit = + MAX2(_g1->n_regions() / ParallelGCThreads, (size_t) 1U); + WorkUnit = + MAX2(_g1->n_regions() / (ParallelGCThreads * OverpartitionFactor), + MinWorkUnit); + } _collectionSetChooser->prepareForAddMarkedHeapRegionsPar(_g1->n_regions(), WorkUnit); ParKnownGarbageTask parKnownGarbageTask(_collectionSetChooser, diff --git a/hotspot/src/share/vm/gc_implementation/g1/g1CollectorPolicy.hpp b/hotspot/src/share/vm/gc_implementation/g1/g1CollectorPolicy.hpp index b339dfb863a..579384ba649 100644 --- a/hotspot/src/share/vm/gc_implementation/g1/g1CollectorPolicy.hpp +++ b/hotspot/src/share/vm/gc_implementation/g1/g1CollectorPolicy.hpp @@ -89,6 +89,9 @@ private: // has been set, or 1 otherwise int _parallel_gc_threads; + // The number of GC threads currently active. + uintx _no_of_gc_threads; + enum SomePrivateConstants { NumPrevPausesForHeuristics = 10 }; @@ -280,6 +283,9 @@ private: double update_rs_processed_buffers, double goal_ms); + uintx no_of_gc_threads() { return _no_of_gc_threads; } + void set_no_of_gc_threads(uintx v) { _no_of_gc_threads = v; } + double _pause_time_target_ms; double _recorded_young_cset_choice_time_ms; double _recorded_non_young_cset_choice_time_ms; @@ -287,6 +293,7 @@ private: size_t _max_pending_cards; public: + // Accessors void set_region_eden(HeapRegion* hr, int young_index_in_cset) { hr->set_young(); @@ -737,13 +744,13 @@ public: void record_concurrent_mark_remark_end(); void record_concurrent_mark_cleanup_start(); - void record_concurrent_mark_cleanup_end(); + void record_concurrent_mark_cleanup_end(int no_of_gc_threads); void record_concurrent_mark_cleanup_completed(); void record_concurrent_pause(); void record_concurrent_pause_end(); - void record_collection_pause_end(); + void record_collection_pause_end(int no_of_gc_threads); void print_heap_transition(); // Record the fact that a full collection occurred. diff --git a/hotspot/src/share/vm/gc_implementation/g1/g1RemSet.cpp b/hotspot/src/share/vm/gc_implementation/g1/g1RemSet.cpp index e8e5eebd739..dd644efbe08 100644 --- a/hotspot/src/share/vm/gc_implementation/g1/g1RemSet.cpp +++ b/hotspot/src/share/vm/gc_implementation/g1/g1RemSet.cpp @@ -218,7 +218,7 @@ public: HeapRegion* G1RemSet::calculateStartRegion(int worker_i) { HeapRegion* result = _g1p->collection_set(); - if (ParallelGCThreads > 0) { + if (G1CollectedHeap::use_parallel_gc_threads()) { size_t cs_size = _g1p->cset_region_length(); int n_workers = _g1->workers()->total_workers(); size_t cs_spans = cs_size / n_workers; @@ -430,8 +430,10 @@ void G1RemSet::prepare_for_oops_into_collection_set_do() { DirtyCardQueueSet& dcqs = JavaThread::dirty_card_queue_set(); dcqs.concatenate_logs(); - if (ParallelGCThreads > 0) { - _seq_task->set_n_threads((int)n_workers()); + if (G1CollectedHeap::use_parallel_gc_threads()) { + // Don't set the number of workers here. It will be set + // when the task is run + // _seq_task->set_n_termination((int)n_workers()); } guarantee( _cards_scanned == NULL, "invariant" ); _cards_scanned = NEW_C_HEAP_ARRAY(size_t, n_workers()); @@ -578,7 +580,10 @@ void G1RemSet::scrub(BitMap* region_bm, BitMap* card_bm) { void G1RemSet::scrub_par(BitMap* region_bm, BitMap* card_bm, int worker_num, int claim_val) { ScrubRSClosure scrub_cl(region_bm, card_bm); - _g1->heap_region_par_iterate_chunked(&scrub_cl, worker_num, claim_val); + _g1->heap_region_par_iterate_chunked(&scrub_cl, + worker_num, + (int) n_workers(), + claim_val); } diff --git a/hotspot/src/share/vm/gc_implementation/parNew/parCardTableModRefBS.cpp b/hotspot/src/share/vm/gc_implementation/parNew/parCardTableModRefBS.cpp index ea75a594419..5155ca934a0 100644 --- a/hotspot/src/share/vm/gc_implementation/parNew/parCardTableModRefBS.cpp +++ b/hotspot/src/share/vm/gc_implementation/parNew/parCardTableModRefBS.cpp @@ -33,6 +33,7 @@ #include "runtime/java.hpp" #include "runtime/mutexLocker.hpp" #include "runtime/virtualspace.hpp" +#include "runtime/vmThread.hpp" void CardTableModRefBS::non_clean_card_iterate_parallel_work(Space* sp, MemRegion mr, OopsInGenClosure* cl, @@ -42,6 +43,11 @@ void CardTableModRefBS::non_clean_card_iterate_parallel_work(Space* sp, MemRegio assert((n_threads == 1 && ParallelGCThreads == 0) || n_threads <= (int)ParallelGCThreads, "# worker threads != # requested!"); + assert(!Thread::current()->is_VM_thread() || (n_threads == 1), "There is only 1 VM thread"); + assert(UseDynamicNumberOfGCThreads || + !FLAG_IS_DEFAULT(ParallelGCThreads) || + n_threads == (int)ParallelGCThreads, + "# worker threads != # requested!"); // Make sure the LNC array is valid for the space. jbyte** lowest_non_clean; uintptr_t lowest_non_clean_base_chunk_index; @@ -52,6 +58,8 @@ void CardTableModRefBS::non_clean_card_iterate_parallel_work(Space* sp, MemRegio int n_strides = n_threads * ParGCStridesPerThread; SequentialSubTasksDone* pst = sp->par_seq_tasks(); + // Sets the condition for completion of the subtask (how many threads + // need to finish in order to be done). pst->set_n_threads(n_threads); pst->set_n_tasks(n_strides); diff --git a/hotspot/src/share/vm/gc_implementation/parNew/parNewGeneration.cpp b/hotspot/src/share/vm/gc_implementation/parNew/parNewGeneration.cpp index 578a4e0a8dc..1c20f87c126 100644 --- a/hotspot/src/share/vm/gc_implementation/parNew/parNewGeneration.cpp +++ b/hotspot/src/share/vm/gc_implementation/parNew/parNewGeneration.cpp @@ -305,7 +305,7 @@ public: inline ParScanThreadState& thread_state(int i); - void reset(bool promotion_failed); + void reset(int active_workers, bool promotion_failed); void flush(); #if TASKQUEUE_STATS @@ -322,6 +322,9 @@ private: ParallelTaskTerminator& _term; ParNewGeneration& _gen; Generation& _next_gen; + public: + bool is_valid(int id) const { return id < length(); } + ParallelTaskTerminator* terminator() { return &_term; } }; @@ -351,9 +354,9 @@ inline ParScanThreadState& ParScanThreadStateSet::thread_state(int i) } -void ParScanThreadStateSet::reset(bool promotion_failed) +void ParScanThreadStateSet::reset(int active_threads, bool promotion_failed) { - _term.reset_for_reuse(); + _term.reset_for_reuse(active_threads); if (promotion_failed) { for (int i = 0; i < length(); ++i) { thread_state(i).print_and_clear_promotion_failure_size(); @@ -569,6 +572,24 @@ ParNewGenTask::ParNewGenTask(ParNewGeneration* gen, Generation* next_gen, _state_set(state_set) {} +// Reset the terminator for the given number of +// active threads. +void ParNewGenTask::set_for_termination(int active_workers) { + _state_set->reset(active_workers, _gen->promotion_failed()); + // Should the heap be passed in? There's only 1 for now so + // grab it instead. + GenCollectedHeap* gch = GenCollectedHeap::heap(); + gch->set_n_termination(active_workers); +} + +// The "i" passed to this method is the part of the work for +// this thread. It is not the worker ID. The "i" is derived +// from _started_workers which is incremented in internal_note_start() +// called in GangWorker loop() and which is called under the +// which is called under the protection of the gang monitor and is +// called after a task is started. So "i" is based on +// first-come-first-served. + void ParNewGenTask::work(int i) { GenCollectedHeap* gch = GenCollectedHeap::heap(); // Since this is being done in a separate thread, need new resource @@ -581,6 +602,8 @@ void ParNewGenTask::work(int i) { Generation* old_gen = gch->next_gen(_gen); ParScanThreadState& par_scan_state = _state_set->thread_state(i); + assert(_state_set->is_valid(i), "Should not have been called"); + par_scan_state.set_young_old_boundary(_young_old_boundary); par_scan_state.start_strong_roots(); @@ -733,7 +756,9 @@ public: private: virtual void work(int i); - + virtual void set_for_termination(int active_workers) { + _state_set.terminator()->reset_for_reuse(active_workers); + } private: ParNewGeneration& _gen; ProcessTask& _task; @@ -789,18 +814,20 @@ void ParNewRefProcTaskExecutor::execute(ProcessTask& task) GenCollectedHeap* gch = GenCollectedHeap::heap(); assert(gch->kind() == CollectedHeap::GenCollectedHeap, "not a generational heap"); - WorkGang* workers = gch->workers(); + FlexibleWorkGang* workers = gch->workers(); assert(workers != NULL, "Need parallel worker threads."); + _state_set.reset(workers->active_workers(), _generation.promotion_failed()); ParNewRefProcTaskProxy rp_task(task, _generation, *_generation.next_gen(), _generation.reserved().end(), _state_set); workers->run_task(&rp_task); - _state_set.reset(_generation.promotion_failed()); + _state_set.reset(0 /* bad value in debug if not reset */, + _generation.promotion_failed()); } void ParNewRefProcTaskExecutor::execute(EnqueueTask& task) { GenCollectedHeap* gch = GenCollectedHeap::heap(); - WorkGang* workers = gch->workers(); + FlexibleWorkGang* workers = gch->workers(); assert(workers != NULL, "Need parallel worker threads."); ParNewRefEnqueueTaskProxy enq_task(task); workers->run_task(&enq_task); @@ -856,7 +883,13 @@ void ParNewGeneration::collect(bool full, assert(gch->kind() == CollectedHeap::GenCollectedHeap, "not a CMS generational heap"); AdaptiveSizePolicy* size_policy = gch->gen_policy()->size_policy(); - WorkGang* workers = gch->workers(); + FlexibleWorkGang* workers = gch->workers(); + assert(workers != NULL, "Need workgang for parallel work"); + int active_workers = + AdaptiveSizePolicy::calc_active_workers(workers->total_workers(), + workers->active_workers(), + Threads::number_of_non_daemon_threads()); + workers->set_active_workers(active_workers); _next_gen = gch->next_gen(this); assert(_next_gen != NULL, "This must be the youngest gen, and not the only gen"); @@ -894,13 +927,19 @@ void ParNewGeneration::collect(bool full, gch->save_marks(); assert(workers != NULL, "Need parallel worker threads."); - ParallelTaskTerminator _term(workers->total_workers(), task_queues()); - ParScanThreadStateSet thread_state_set(workers->total_workers(), + int n_workers = active_workers; + + // Set the correct parallelism (number of queues) in the reference processor + ref_processor()->set_active_mt_degree(n_workers); + + // Always set the terminator for the active number of workers + // because only those workers go through the termination protocol. + ParallelTaskTerminator _term(n_workers, task_queues()); + ParScanThreadStateSet thread_state_set(workers->active_workers(), *to(), *this, *_next_gen, *task_queues(), _overflow_stacks, desired_plab_sz(), _term); ParNewGenTask tsk(this, _next_gen, reserved().end(), &thread_state_set); - int n_workers = workers->total_workers(); gch->set_par_threads(n_workers); gch->rem_set()->prepare_for_younger_refs_iterate(true); // It turns out that even when we're using 1 thread, doing the work in a @@ -914,7 +953,8 @@ void ParNewGeneration::collect(bool full, GenCollectedHeap::StrongRootsScope srs(gch); tsk.work(0); } - thread_state_set.reset(promotion_failed()); + thread_state_set.reset(0 /* Bad value in debug if not reset */, + promotion_failed()); // Process (weak) reference objects found during scavenge. ReferenceProcessor* rp = ref_processor(); @@ -927,6 +967,8 @@ void ParNewGeneration::collect(bool full, EvacuateFollowersClosureGeneral evacuate_followers(gch, _level, &scan_without_gc_barrier, &scan_with_gc_barrier); rp->setup_policy(clear_all_soft_refs); + // Can the mt_degree be set later (at run_task() time would be best)? + rp->set_active_mt_degree(active_workers); if (rp->processing_is_mt()) { ParNewRefProcTaskExecutor task_executor(*this, thread_state_set); rp->process_discovered_references(&is_alive, &keep_alive, diff --git a/hotspot/src/share/vm/gc_implementation/parNew/parNewGeneration.hpp b/hotspot/src/share/vm/gc_implementation/parNew/parNewGeneration.hpp index 296eb8254b9..ccd6268a66b 100644 --- a/hotspot/src/share/vm/gc_implementation/parNew/parNewGeneration.hpp +++ b/hotspot/src/share/vm/gc_implementation/parNew/parNewGeneration.hpp @@ -240,6 +240,10 @@ public: HeapWord* young_old_boundary() { return _young_old_boundary; } void work(int i); + + // Reset the terminator in ParScanThreadStateSet for + // "active_workers" threads. + virtual void set_for_termination(int active_workers); }; class KeepAliveClosure: public DefNewGeneration::KeepAliveClosure { diff --git a/hotspot/src/share/vm/gc_implementation/parallelScavenge/cardTableExtension.cpp b/hotspot/src/share/vm/gc_implementation/parallelScavenge/cardTableExtension.cpp index 5b06d3126f6..6e42facf29e 100644 --- a/hotspot/src/share/vm/gc_implementation/parallelScavenge/cardTableExtension.cpp +++ b/hotspot/src/share/vm/gc_implementation/parallelScavenge/cardTableExtension.cpp @@ -223,7 +223,8 @@ void CardTableExtension::scavenge_contents_parallel(ObjectStartArray* start_arra MutableSpace* sp, HeapWord* space_top, PSPromotionManager* pm, - uint stripe_number) { + uint stripe_number, + uint stripe_total) { int ssize = 128; // Naked constant! Work unit = 64k. int dirty_card_count = 0; @@ -231,7 +232,11 @@ void CardTableExtension::scavenge_contents_parallel(ObjectStartArray* start_arra jbyte* start_card = byte_for(sp->bottom()); jbyte* end_card = byte_for(sp_top - 1) + 1; oop* last_scanned = NULL; // Prevent scanning objects more than once - for (jbyte* slice = start_card; slice < end_card; slice += ssize*ParallelGCThreads) { + // The width of the stripe ssize*stripe_total must be + // consistent with the number of stripes so that the complete slice + // is covered. + size_t slice_width = ssize * stripe_total; + for (jbyte* slice = start_card; slice < end_card; slice += slice_width) { jbyte* worker_start_card = slice + stripe_number * ssize; if (worker_start_card >= end_card) return; // We're done. diff --git a/hotspot/src/share/vm/gc_implementation/parallelScavenge/cardTableExtension.hpp b/hotspot/src/share/vm/gc_implementation/parallelScavenge/cardTableExtension.hpp index 00d6673e1b0..27f917d23fd 100644 --- a/hotspot/src/share/vm/gc_implementation/parallelScavenge/cardTableExtension.hpp +++ b/hotspot/src/share/vm/gc_implementation/parallelScavenge/cardTableExtension.hpp @@ -69,7 +69,8 @@ class CardTableExtension : public CardTableModRefBS { MutableSpace* sp, HeapWord* space_top, PSPromotionManager* pm, - uint stripe_number); + uint stripe_number, + uint stripe_total); // Verification static void verify_all_young_refs_imprecise(); diff --git a/hotspot/src/share/vm/gc_implementation/parallelScavenge/gcTaskManager.cpp b/hotspot/src/share/vm/gc_implementation/parallelScavenge/gcTaskManager.cpp index c4c5120410f..33641b6b1ed 100644 --- a/hotspot/src/share/vm/gc_implementation/parallelScavenge/gcTaskManager.cpp +++ b/hotspot/src/share/vm/gc_implementation/parallelScavenge/gcTaskManager.cpp @@ -25,6 +25,7 @@ #include "precompiled.hpp" #include "gc_implementation/parallelScavenge/gcTaskManager.hpp" #include "gc_implementation/parallelScavenge/gcTaskThread.hpp" +#include "gc_implementation/shared/adaptiveSizePolicy.hpp" #include "memory/allocation.hpp" #include "memory/allocation.inline.hpp" #include "runtime/mutex.hpp" @@ -181,6 +182,7 @@ void GCTaskQueue::enqueue(GCTask* task) { } set_insert_end(task); increment_length(); + verify_length(); if (TraceGCTaskQueue) { print("after:"); } @@ -192,7 +194,7 @@ void GCTaskQueue::enqueue(GCTaskQueue* list) { tty->print_cr("[" INTPTR_FORMAT "]" " GCTaskQueue::enqueue(list: " INTPTR_FORMAT ")", - this); + this, list); print("before:"); list->print("list:"); } @@ -211,14 +213,15 @@ void GCTaskQueue::enqueue(GCTaskQueue* list) { list->remove_end()->set_older(insert_end()); insert_end()->set_newer(list->remove_end()); set_insert_end(list->insert_end()); + set_length(length() + list_length); // empty the argument list. } - set_length(length() + list_length); list->initialize(); if (TraceGCTaskQueue) { print("after:"); list->print("list:"); } + verify_length(); } // Dequeue one task. @@ -288,6 +291,7 @@ GCTask* GCTaskQueue::remove() { decrement_length(); assert(result->newer() == NULL, "shouldn't be on queue"); assert(result->older() == NULL, "shouldn't be on queue"); + verify_length(); return result; } @@ -311,22 +315,40 @@ GCTask* GCTaskQueue::remove(GCTask* task) { result->set_newer(NULL); result->set_older(NULL); decrement_length(); + verify_length(); return result; } NOT_PRODUCT( +// Count the elements in the queue and verify the length against +// that count. +void GCTaskQueue::verify_length() const { + uint count = 0; + for (GCTask* element = insert_end(); + element != NULL; + element = element->older()) { + + count++; + } + assert(count == length(), "Length does not match queue"); +} + void GCTaskQueue::print(const char* message) const { tty->print_cr("[" INTPTR_FORMAT "] GCTaskQueue:" " insert_end: " INTPTR_FORMAT " remove_end: " INTPTR_FORMAT + " length: %d" " %s", - this, insert_end(), remove_end(), message); + this, insert_end(), remove_end(), length(), message); + uint count = 0; for (GCTask* element = insert_end(); element != NULL; element = element->older()) { element->print(" "); + count++; tty->cr(); } + tty->print("Total tasks: %d", count); } ) @@ -351,12 +373,16 @@ SynchronizedGCTaskQueue::~SynchronizedGCTaskQueue() { // GCTaskManager::GCTaskManager(uint workers) : _workers(workers), + _active_workers(0), + _idle_workers(0), _ndc(NULL) { initialize(); } GCTaskManager::GCTaskManager(uint workers, NotifyDoneClosure* ndc) : _workers(workers), + _active_workers(0), + _idle_workers(0), _ndc(ndc) { initialize(); } @@ -373,6 +399,7 @@ void GCTaskManager::initialize() { GCTaskQueue* unsynchronized_queue = GCTaskQueue::create_on_c_heap(); _queue = SynchronizedGCTaskQueue::create(unsynchronized_queue, lock()); _noop_task = NoopGCTask::create_on_c_heap(); + _idle_inactive_task = WaitForBarrierGCTask::create_on_c_heap(); _resource_flag = NEW_C_HEAP_ARRAY(bool, workers()); { // Set up worker threads. @@ -418,6 +445,8 @@ GCTaskManager::~GCTaskManager() { assert(queue()->is_empty(), "still have queued work"); NoopGCTask::destroy(_noop_task); _noop_task = NULL; + WaitForBarrierGCTask::destroy(_idle_inactive_task); + _idle_inactive_task = NULL; if (_thread != NULL) { for (uint i = 0; i < workers(); i += 1) { GCTaskThread::destroy(thread(i)); @@ -442,6 +471,86 @@ GCTaskManager::~GCTaskManager() { } } +void GCTaskManager::set_active_gang() { + _active_workers = + AdaptiveSizePolicy::calc_active_workers(workers(), + active_workers(), + Threads::number_of_non_daemon_threads()); + + assert(!all_workers_active() || active_workers() == ParallelGCThreads, + err_msg("all_workers_active() is incorrect: " + "active %d ParallelGCThreads %d", active_workers(), + ParallelGCThreads)); + if (TraceDynamicGCThreads) { + gclog_or_tty->print_cr("GCTaskManager::set_active_gang(): " + "all_workers_active() %d workers %d " + "active %d ParallelGCThreads %d ", + all_workers_active(), workers(), active_workers(), + ParallelGCThreads); + } +} + +// Create IdleGCTasks for inactive workers. +// Creates tasks in a ResourceArea and assumes +// an appropriate ResourceMark. +void GCTaskManager::task_idle_workers() { + { + int more_inactive_workers = 0; + { + // Stop any idle tasks from exiting their IdleGCTask's + // and get the count for additional IdleGCTask's under + // the GCTaskManager's monitor so that the "more_inactive_workers" + // count is correct. + MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag); + _idle_inactive_task->set_should_wait(true); + // active_workers are a number being requested. idle_workers + // are the number currently idle. If all the workers are being + // requested to be active but some are already idle, reduce + // the number of active_workers to be consistent with the + // number of idle_workers. The idle_workers are stuck in + // idle tasks and will no longer be release (since a new GC + // is starting). Try later to release enough idle_workers + // to allow the desired number of active_workers. + more_inactive_workers = + workers() - active_workers() - idle_workers(); + if (more_inactive_workers < 0) { + int reduced_active_workers = active_workers() + more_inactive_workers; + set_active_workers(reduced_active_workers); + more_inactive_workers = 0; + } + if (TraceDynamicGCThreads) { + gclog_or_tty->print_cr("JT: %d workers %d active %d " + "idle %d more %d", + Threads::number_of_non_daemon_threads(), + workers(), + active_workers(), + idle_workers(), + more_inactive_workers); + } + } + GCTaskQueue* q = GCTaskQueue::create(); + for(uint i = 0; i < (uint) more_inactive_workers; i++) { + q->enqueue(IdleGCTask::create_on_c_heap()); + increment_idle_workers(); + } + assert(workers() == active_workers() + idle_workers(), + "total workers should equal active + inactive"); + add_list(q); + // GCTaskQueue* q was created in a ResourceArea so a + // destroy() call is not needed. + } +} + +void GCTaskManager::release_idle_workers() { + { + MutexLockerEx ml(monitor(), + Mutex::_no_safepoint_check_flag); + _idle_inactive_task->set_should_wait(false); + monitor()->notify_all(); + // Release monitor + } +} + void GCTaskManager::print_task_time_stamps() { for(uint i=0; ikind())); tty->print_cr(" %s", result->name()); } - increment_busy_workers(); - increment_delivered_tasks(); + if (!result->is_idle_task()) { + increment_busy_workers(); + increment_delivered_tasks(); + } return result; // Release monitor(). } @@ -622,6 +740,7 @@ uint GCTaskManager::increment_busy_workers() { uint GCTaskManager::decrement_busy_workers() { assert(queue()->own_lock(), "don't own the lock"); + assert(_busy_workers > 0, "About to make a mistake"); _busy_workers -= 1; return _busy_workers; } @@ -643,11 +762,28 @@ void GCTaskManager::note_release(uint which) { set_resource_flag(which, false); } +// "list" contains tasks that are ready to execute. Those +// tasks are added to the GCTaskManager's queue of tasks and +// then the GC workers are notified that there is new work to +// do. +// +// Typically different types of tasks can be added to the "list". +// For example in PSScavenge OldToYoungRootsTask, SerialOldToYoungRootsTask, +// ScavengeRootsTask, and StealTask tasks are all added to the list +// and then the GC workers are notified of new work. The tasks are +// handed out in the order in which they are added to the list +// (although execution is not necessarily in that order). As long +// as any tasks are running the GCTaskManager will wait for execution +// to complete. GC workers that execute a stealing task remain in +// the stealing task until all stealing tasks have completed. The load +// balancing afforded by the stealing tasks work best if the stealing +// tasks are added last to the list. + void GCTaskManager::execute_and_wait(GCTaskQueue* list) { WaitForBarrierGCTask* fin = WaitForBarrierGCTask::create(); list->enqueue(fin); add_list(list); - fin->wait_for(); + fin->wait_for(true /* reset */); // We have to release the barrier tasks! WaitForBarrierGCTask::destroy(fin); } @@ -691,6 +827,72 @@ void NoopGCTask::destruct() { // Nothing else to do. } +// +// IdleGCTask +// + +IdleGCTask* IdleGCTask::create() { + IdleGCTask* result = new IdleGCTask(false); + return result; +} + +IdleGCTask* IdleGCTask::create_on_c_heap() { + IdleGCTask* result = new(ResourceObj::C_HEAP) IdleGCTask(true); + return result; +} + +void IdleGCTask::do_it(GCTaskManager* manager, uint which) { + WaitForBarrierGCTask* wait_for_task = manager->idle_inactive_task(); + if (TraceGCTaskManager) { + tty->print_cr("[" INTPTR_FORMAT "]" + " IdleGCTask:::do_it()" + " should_wait: %s", + this, wait_for_task->should_wait() ? "true" : "false"); + } + MutexLockerEx ml(manager->monitor(), Mutex::_no_safepoint_check_flag); + if (TraceDynamicGCThreads) { + gclog_or_tty->print_cr("--- idle %d", which); + } + // Increment has to be done when the idle tasks are created. + // manager->increment_idle_workers(); + manager->monitor()->notify_all(); + while (wait_for_task->should_wait()) { + if (TraceGCTaskManager) { + tty->print_cr("[" INTPTR_FORMAT "]" + " IdleGCTask::do_it()" + " [" INTPTR_FORMAT "] (%s)->wait()", + this, manager->monitor(), manager->monitor()->name()); + } + manager->monitor()->wait(Mutex::_no_safepoint_check_flag, 0); + } + manager->decrement_idle_workers(); + if (TraceDynamicGCThreads) { + gclog_or_tty->print_cr("--- release %d", which); + } + if (TraceGCTaskManager) { + tty->print_cr("[" INTPTR_FORMAT "]" + " IdleGCTask::do_it() returns" + " should_wait: %s", + this, wait_for_task->should_wait() ? "true" : "false"); + } + // Release monitor(). +} + +void IdleGCTask::destroy(IdleGCTask* that) { + if (that != NULL) { + that->destruct(); + if (that->is_c_heap_obj()) { + FreeHeap(that); + } + } +} + +void IdleGCTask::destruct() { + // This has to know it's superclass structure, just like the constructor. + this->GCTask::destruct(); + // Nothing else to do. +} + // // BarrierGCTask // @@ -768,7 +970,8 @@ WaitForBarrierGCTask* WaitForBarrierGCTask::create() { } WaitForBarrierGCTask* WaitForBarrierGCTask::create_on_c_heap() { - WaitForBarrierGCTask* result = new WaitForBarrierGCTask(true); + WaitForBarrierGCTask* result = + new (ResourceObj::C_HEAP) WaitForBarrierGCTask(true); return result; } @@ -849,7 +1052,7 @@ void WaitForBarrierGCTask::do_it(GCTaskManager* manager, uint which) { } } -void WaitForBarrierGCTask::wait_for() { +void WaitForBarrierGCTask::wait_for(bool reset) { if (TraceGCTaskManager) { tty->print_cr("[" INTPTR_FORMAT "]" " WaitForBarrierGCTask::wait_for()" @@ -869,7 +1072,9 @@ void WaitForBarrierGCTask::wait_for() { monitor()->wait(Mutex::_no_safepoint_check_flag, 0); } // Reset the flag in case someone reuses this task. - set_should_wait(true); + if (reset) { + set_should_wait(true); + } if (TraceGCTaskManager) { tty->print_cr("[" INTPTR_FORMAT "]" " WaitForBarrierGCTask::wait_for() returns" diff --git a/hotspot/src/share/vm/gc_implementation/parallelScavenge/gcTaskManager.hpp b/hotspot/src/share/vm/gc_implementation/parallelScavenge/gcTaskManager.hpp index 3bd3af1e306..65a8458d3b9 100644 --- a/hotspot/src/share/vm/gc_implementation/parallelScavenge/gcTaskManager.hpp +++ b/hotspot/src/share/vm/gc_implementation/parallelScavenge/gcTaskManager.hpp @@ -45,6 +45,7 @@ class BarrierGCTask; class ReleasingBarrierGCTask; class NotifyingBarrierGCTask; class WaitForBarrierGCTask; +class IdleGCTask; // A free list of Monitor*'s. class MonitorSupply; @@ -64,7 +65,8 @@ public: unknown_task, ordinary_task, barrier_task, - noop_task + noop_task, + idle_task }; static const char* to_string(kind value); }; @@ -108,6 +110,9 @@ public: bool is_noop_task() const { return kind()==Kind::noop_task; } + bool is_idle_task() const { + return kind()==Kind::idle_task; + } void print(const char* message) const PRODUCT_RETURN; protected: // Constructors: Only create subclasses. @@ -153,6 +158,7 @@ public: assert(((insert_end() == NULL && remove_end() == NULL) || (insert_end() != NULL && remove_end() != NULL)), "insert_end and remove_end don't match"); + assert((insert_end() != NULL) || (_length == 0), "Not empty"); return insert_end() == NULL; } uint length() const { @@ -204,6 +210,8 @@ protected: GCTask* remove(); // Remove from remove end. GCTask* remove(GCTask* task); // Remove from the middle. void print(const char* message) const PRODUCT_RETURN; + // Debug support + void verify_length() const PRODUCT_RETURN; }; // A GCTaskQueue that can be synchronized. @@ -285,12 +293,76 @@ protected: } }; +// Dynamic number of GC threads +// +// GC threads wait in get_task() for work (i.e., a task) to perform. +// When the number of GC threads was static, the number of tasks +// created to do a job was equal to or greater than the maximum +// number of GC threads (ParallelGCThreads). The job might be divided +// into a number of tasks greater than the number of GC threads for +// load balancing (i.e., over partitioning). The last task to be +// executed by a GC thread in a job is a work stealing task. A +// GC thread that gets a work stealing task continues to execute +// that task until the job is done. In the static number of GC theads +// case, tasks are added to a queue (FIFO). The work stealing tasks are +// the last to be added. Once the tasks are added, the GC threads grab +// a task and go. A single thread can do all the non-work stealing tasks +// and then execute a work stealing and wait for all the other GC threads +// to execute their work stealing task. +// In the dynamic number of GC threads implementation, idle-tasks are +// created to occupy the non-participating or "inactive" threads. An +// idle-task makes the GC thread wait on a barrier that is part of the +// GCTaskManager. The GC threads that have been "idled" in a IdleGCTask +// are released once all the active GC threads have finished their work +// stealing tasks. The GCTaskManager does not wait for all the "idled" +// GC threads to resume execution. When those GC threads do resume +// execution in the course of the thread scheduling, they call get_tasks() +// as all the other GC threads do. Because all the "idled" threads are +// not required to execute in order to finish a job, it is possible for +// a GC thread to still be "idled" when the next job is started. Such +// a thread stays "idled" for the next job. This can result in a new +// job not having all the expected active workers. For example if on +// job requests 4 active workers out of a total of 10 workers so the +// remaining 6 are "idled", if the next job requests 6 active workers +// but all 6 of the "idled" workers are still idle, then the next job +// will only get 4 active workers. +// The implementation for the parallel old compaction phase has an +// added complication. In the static case parold partitions the chunks +// ready to be filled into stacks, one for each GC thread. A GC thread +// executing a draining task (drains the stack of ready chunks) +// claims a stack according to it's id (the unique ordinal value assigned +// to each GC thread). In the dynamic case not all GC threads will +// actively participate so stacks with ready to fill chunks can only be +// given to the active threads. An initial implementation chose stacks +// number 1-n to get the ready chunks and required that GC threads +// 1-n be the active workers. This was undesirable because it required +// certain threads to participate. In the final implementation a +// list of stacks equal in number to the active workers are filled +// with ready chunks. GC threads that participate get a stack from +// the task (DrainStacksCompactionTask), empty the stack, and then add it to a +// recycling list at the end of the task. If the same GC thread gets +// a second task, it gets a second stack to drain and returns it. The +// stacks are added to a recycling list so that later stealing tasks +// for this tasks can get a stack from the recycling list. Stealing tasks +// use the stacks in its work in a way similar to the draining tasks. +// A thread is not guaranteed to get anything but a stealing task and +// a thread that only gets a stealing task has to get a stack. A failed +// implementation tried to have the GC threads keep the stack they used +// during a draining task for later use in the stealing task but that didn't +// work because as noted a thread is not guaranteed to get a draining task. +// +// For PSScavenge and ParCompactionManager the GC threads are +// held in the GCTaskThread** _thread array in GCTaskManager. + + class GCTaskManager : public CHeapObj { friend class ParCompactionManager; friend class PSParallelCompact; friend class PSScavenge; friend class PSRefProcTaskExecutor; friend class RefProcTaskExecutor; + friend class GCTaskThread; + friend class IdleGCTask; private: // Instance state. NotifyDoneClosure* _ndc; // Notify on completion. @@ -298,6 +370,7 @@ private: Monitor* _monitor; // Notification of changes. SynchronizedGCTaskQueue* _queue; // Queue of tasks. GCTaskThread** _thread; // Array of worker threads. + uint _active_workers; // Number of active workers. uint _busy_workers; // Number of busy workers. uint _blocking_worker; // The worker that's blocking. bool* _resource_flag; // Array of flag per threads. @@ -307,6 +380,8 @@ private: uint _emptied_queue; // Times we emptied the queue. NoopGCTask* _noop_task; // The NoopGCTask instance. uint _noop_tasks; // Count of noop tasks. + WaitForBarrierGCTask* _idle_inactive_task;// Task for inactive workers + volatile uint _idle_workers; // Number of idled workers public: // Factory create and destroy methods. static GCTaskManager* create(uint workers) { @@ -324,6 +399,9 @@ public: uint busy_workers() const { return _busy_workers; } + volatile uint idle_workers() const { + return _idle_workers; + } // Pun between Monitor* and Mutex* Monitor* monitor() const { return _monitor; @@ -331,6 +409,9 @@ public: Monitor * lock() const { return _monitor; } + WaitForBarrierGCTask* idle_inactive_task() { + return _idle_inactive_task; + } // Methods. // Add the argument task to be run. void add_task(GCTask* task); @@ -350,6 +431,10 @@ public: bool should_release_resources(uint which); // Predicate. // Note the release of resources by the argument worker. void note_release(uint which); + // Create IdleGCTasks for inactive workers and start workers + void task_idle_workers(); + // Release the workers in IdleGCTasks + void release_idle_workers(); // Constants. // A sentinel worker identifier. static uint sentinel_worker() { @@ -375,6 +460,15 @@ protected: uint workers() const { return _workers; } + void set_active_workers(uint v) { + assert(v <= _workers, "Trying to set more workers active than there are"); + _active_workers = MIN2(v, _workers); + assert(v != 0, "Trying to set active workers to 0"); + _active_workers = MAX2(1U, _active_workers); + } + // Sets the number of threads that will be used in a collection + void set_active_gang(); + NotifyDoneClosure* notify_done_closure() const { return _ndc; } @@ -457,8 +551,21 @@ protected: void reset_noop_tasks() { _noop_tasks = 0; } + void increment_idle_workers() { + _idle_workers++; + } + void decrement_idle_workers() { + _idle_workers--; + } // Other methods. void initialize(); + + public: + // Return true if all workers are currently active. + bool all_workers_active() { return workers() == active_workers(); } + uint active_workers() const { + return _active_workers; + } }; // @@ -475,6 +582,8 @@ public: static NoopGCTask* create(); static NoopGCTask* create_on_c_heap(); static void destroy(NoopGCTask* that); + + virtual char* name() { return (char *)"noop task"; } // Methods from GCTask. void do_it(GCTaskManager* manager, uint which) { // Nothing to do. @@ -518,6 +627,8 @@ protected: } // Destructor-like method. void destruct(); + + virtual char* name() { return (char *)"barrier task"; } // Methods. // Wait for this to be the only task running. void do_it_internal(GCTaskManager* manager, uint which); @@ -586,11 +697,13 @@ protected: // the BarrierGCTask is done. // This may cover many of the uses of NotifyingBarrierGCTasks. class WaitForBarrierGCTask : public BarrierGCTask { + friend class GCTaskManager; + friend class IdleGCTask; private: // Instance state. - Monitor* _monitor; // Guard and notify changes. - bool _should_wait; // true=>wait, false=>proceed. - const bool _is_c_heap_obj; // Was allocated on the heap. + Monitor* _monitor; // Guard and notify changes. + volatile bool _should_wait; // true=>wait, false=>proceed. + const bool _is_c_heap_obj; // Was allocated on the heap. public: virtual char* name() { return (char *) "waitfor-barrier-task"; } @@ -600,7 +713,10 @@ public: static void destroy(WaitForBarrierGCTask* that); // Methods. void do_it(GCTaskManager* manager, uint which); - void wait_for(); + void wait_for(bool reset); + void set_should_wait(bool value) { + _should_wait = value; + } protected: // Constructor. Clients use factory, but there might be subclasses. WaitForBarrierGCTask(bool on_c_heap); @@ -613,14 +729,38 @@ protected: bool should_wait() const { return _should_wait; } - void set_should_wait(bool value) { - _should_wait = value; - } bool is_c_heap_obj() { return _is_c_heap_obj; } }; +// Task that is used to idle a GC task when fewer than +// the maximum workers are wanted. +class IdleGCTask : public GCTask { + const bool _is_c_heap_obj; // Was allocated on the heap. + public: + bool is_c_heap_obj() { + return _is_c_heap_obj; + } + // Factory create and destroy methods. + static IdleGCTask* create(); + static IdleGCTask* create_on_c_heap(); + static void destroy(IdleGCTask* that); + + virtual char* name() { return (char *)"idle task"; } + // Methods from GCTask. + virtual void do_it(GCTaskManager* manager, uint which); +protected: + // Constructor. + IdleGCTask(bool on_c_heap) : + GCTask(GCTask::Kind::idle_task), + _is_c_heap_obj(on_c_heap) { + // Nothing to do. + } + // Destructor-like method. + void destruct(); +}; + class MonitorSupply : public AllStatic { private: // State. diff --git a/hotspot/src/share/vm/gc_implementation/parallelScavenge/gcTaskThread.cpp b/hotspot/src/share/vm/gc_implementation/parallelScavenge/gcTaskThread.cpp index 41c90f9ece7..235039dd215 100644 --- a/hotspot/src/share/vm/gc_implementation/parallelScavenge/gcTaskThread.cpp +++ b/hotspot/src/share/vm/gc_implementation/parallelScavenge/gcTaskThread.cpp @@ -93,6 +93,11 @@ void GCTaskThread::print_on(outputStream* st) const { st->cr(); } +// GC workers get tasks from the GCTaskManager and execute +// them in this method. If there are no tasks to execute, +// the GC workers wait in the GCTaskManager's get_task() +// for tasks to be enqueued for execution. + void GCTaskThread::run() { // Set up the thread for stack overflow support this->record_stack_base_and_size(); @@ -124,7 +129,6 @@ void GCTaskThread::run() { for (; /* break */; ) { // This will block until there is a task to be gotten. GCTask* task = manager()->get_task(which()); - // In case the update is costly if (PrintGCTaskTimeStamps) { timer.update(); @@ -134,18 +138,28 @@ void GCTaskThread::run() { char* name = task->name(); task->do_it(manager(), which()); - manager()->note_completion(which()); - if (PrintGCTaskTimeStamps) { - assert(_time_stamps != NULL, "Sanity (PrintGCTaskTimeStamps set late?)"); + if (!task->is_idle_task()) { + manager()->note_completion(which()); - timer.update(); + if (PrintGCTaskTimeStamps) { + assert(_time_stamps != NULL, + "Sanity (PrintGCTaskTimeStamps set late?)"); - GCTaskTimeStamp* time_stamp = time_stamp_at(_time_stamp_index++); + timer.update(); - time_stamp->set_name(name); - time_stamp->set_entry_time(entry_time); - time_stamp->set_exit_time(timer.ticks()); + GCTaskTimeStamp* time_stamp = time_stamp_at(_time_stamp_index++); + + time_stamp->set_name(name); + time_stamp->set_entry_time(entry_time); + time_stamp->set_exit_time(timer.ticks()); + } + } else { + // idle tasks complete outside the normal accounting + // so that a task can complete without waiting for idle tasks. + // They have to be terminated separately. + IdleGCTask::destroy((IdleGCTask*)task); + set_is_working(true); } // Check if we should release our inner resources. diff --git a/hotspot/src/share/vm/gc_implementation/parallelScavenge/gcTaskThread.hpp b/hotspot/src/share/vm/gc_implementation/parallelScavenge/gcTaskThread.hpp index 7fc907bf0c7..c8406545e9b 100644 --- a/hotspot/src/share/vm/gc_implementation/parallelScavenge/gcTaskThread.hpp +++ b/hotspot/src/share/vm/gc_implementation/parallelScavenge/gcTaskThread.hpp @@ -35,6 +35,7 @@ class GCTaskTimeStamp; class GCTaskManager; class GCTaskThread : public WorkerThread { + friend class GCTaskManager; private: // Instance state. GCTaskManager* _manager; // Manager for worker. @@ -45,6 +46,8 @@ private: GCTaskTimeStamp* time_stamp_at(uint index); + bool _is_working; // True if participating in GC tasks + public: // Factory create and destroy methods. static GCTaskThread* create(GCTaskManager* manager, @@ -84,6 +87,7 @@ protected: uint processor_id() const { return _processor_id; } + void set_is_working(bool v) { _is_working = v; } }; class GCTaskTimeStamp : public CHeapObj diff --git a/hotspot/src/share/vm/gc_implementation/parallelScavenge/pcTasks.cpp b/hotspot/src/share/vm/gc_implementation/parallelScavenge/pcTasks.cpp index 75ee0b3a97a..c577daf9ca9 100644 --- a/hotspot/src/share/vm/gc_implementation/parallelScavenge/pcTasks.cpp +++ b/hotspot/src/share/vm/gc_implementation/parallelScavenge/pcTasks.cpp @@ -152,15 +152,16 @@ void RefProcTaskExecutor::execute(ProcessTask& task) { ParallelScavengeHeap* heap = PSParallelCompact::gc_heap(); uint parallel_gc_threads = heap->gc_task_manager()->workers(); + uint active_gc_threads = heap->gc_task_manager()->active_workers(); RegionTaskQueueSet* qset = ParCompactionManager::region_array(); - ParallelTaskTerminator terminator(parallel_gc_threads, qset); + ParallelTaskTerminator terminator(active_gc_threads, qset); GCTaskQueue* q = GCTaskQueue::create(); for(uint i=0; ienqueue(new RefProcTaskProxy(task, i)); } if (task.marks_oops_alive()) { if (parallel_gc_threads>1) { - for (uint j=0; jenqueue(new StealMarkingTask(&terminator)); } } @@ -216,7 +217,6 @@ void StealMarkingTask::do_it(GCTaskManager* manager, uint which) { // StealRegionCompactionTask // - StealRegionCompactionTask::StealRegionCompactionTask(ParallelTaskTerminator* t): _terminator(t) {} @@ -229,6 +229,32 @@ void StealRegionCompactionTask::do_it(GCTaskManager* manager, uint which) { ParCompactionManager* cm = ParCompactionManager::gc_thread_compaction_manager(which); + + // If not all threads are active, get a draining stack + // from the list. Else, just use this threads draining stack. + uint which_stack_index; + bool use_all_workers = manager->all_workers_active(); + if (use_all_workers) { + which_stack_index = which; + assert(manager->active_workers() == ParallelGCThreads, + err_msg("all_workers_active has been incorrectly set: " + " active %d ParallelGCThreads %d", manager->active_workers(), + ParallelGCThreads)); + } else { + which_stack_index = ParCompactionManager::pop_recycled_stack_index(); + } + + cm->set_region_stack_index(which_stack_index); + cm->set_region_stack(ParCompactionManager::region_list(which_stack_index)); + if (TraceDynamicGCThreads) { + gclog_or_tty->print_cr("StealRegionCompactionTask::do_it " + "region_stack_index %d region_stack = 0x%x " + " empty (%d) use all workers %d", + which_stack_index, ParCompactionManager::region_list(which_stack_index), + cm->region_stack()->is_empty(), + use_all_workers); + } + // Has to drain stacks first because there may be regions on // preloaded onto the stack and this thread may never have // done a draining task. Are the draining tasks needed? @@ -285,6 +311,50 @@ void DrainStacksCompactionTask::do_it(GCTaskManager* manager, uint which) { ParCompactionManager* cm = ParCompactionManager::gc_thread_compaction_manager(which); + uint which_stack_index; + bool use_all_workers = manager->all_workers_active(); + if (use_all_workers) { + which_stack_index = which; + assert(manager->active_workers() == ParallelGCThreads, + err_msg("all_workers_active has been incorrectly set: " + " active %d ParallelGCThreads %d", manager->active_workers(), + ParallelGCThreads)); + } else { + which_stack_index = stack_index(); + } + + cm->set_region_stack(ParCompactionManager::region_list(which_stack_index)); + if (TraceDynamicGCThreads) { + gclog_or_tty->print_cr("DrainStacksCompactionTask::do_it which = %d " + "which_stack_index = %d/empty(%d) " + "use all workers %d", + which, which_stack_index, + cm->region_stack()->is_empty(), + use_all_workers); + } + + cm->set_region_stack_index(which_stack_index); + // Process any regions already in the compaction managers stacks. cm->drain_region_stacks(); + + assert(cm->region_stack()->is_empty(), "Not empty"); + + if (!use_all_workers) { + // Always give up the region stack. + assert(cm->region_stack() == + ParCompactionManager::region_list(cm->region_stack_index()), + "region_stack and region_stack_index are inconsistent"); + ParCompactionManager::push_recycled_stack_index(cm->region_stack_index()); + + if (TraceDynamicGCThreads) { + void* old_region_stack = (void*) cm->region_stack(); + int old_region_stack_index = cm->region_stack_index(); + gclog_or_tty->print_cr("Pushing region stack 0x%x/%d", + old_region_stack, old_region_stack_index); + } + + cm->set_region_stack(NULL); + cm->set_region_stack_index((uint)max_uintx); + } } diff --git a/hotspot/src/share/vm/gc_implementation/parallelScavenge/psCompactionManager.cpp b/hotspot/src/share/vm/gc_implementation/parallelScavenge/psCompactionManager.cpp index 24b9ddea001..c94de3a59ba 100644 --- a/hotspot/src/share/vm/gc_implementation/parallelScavenge/psCompactionManager.cpp +++ b/hotspot/src/share/vm/gc_implementation/parallelScavenge/psCompactionManager.cpp @@ -39,6 +39,9 @@ PSOldGen* ParCompactionManager::_old_gen = NULL; ParCompactionManager** ParCompactionManager::_manager_array = NULL; + +RegionTaskQueue** ParCompactionManager::_region_list = NULL; + OopTaskQueueSet* ParCompactionManager::_stack_array = NULL; ParCompactionManager::ObjArrayTaskQueueSet* ParCompactionManager::_objarray_queues = NULL; @@ -46,8 +49,14 @@ ObjectStartArray* ParCompactionManager::_start_array = NULL; ParMarkBitMap* ParCompactionManager::_mark_bitmap = NULL; RegionTaskQueueSet* ParCompactionManager::_region_array = NULL; +uint* ParCompactionManager::_recycled_stack_index = NULL; +int ParCompactionManager::_recycled_top = -1; +int ParCompactionManager::_recycled_bottom = -1; + ParCompactionManager::ParCompactionManager() : - _action(CopyAndUpdate) { + _action(CopyAndUpdate), + _region_stack(NULL), + _region_stack_index((uint)max_uintx) { ParallelScavengeHeap* heap = (ParallelScavengeHeap*)Universe::heap(); assert(heap->kind() == CollectedHeap::ParallelScavengeHeap, "Sanity"); @@ -57,7 +66,10 @@ ParCompactionManager::ParCompactionManager() : marking_stack()->initialize(); _objarray_stack.initialize(); - region_stack()->initialize(); +} + +ParCompactionManager::~ParCompactionManager() { + delete _recycled_stack_index; } void ParCompactionManager::initialize(ParMarkBitMap* mbm) { @@ -72,6 +84,19 @@ void ParCompactionManager::initialize(ParMarkBitMap* mbm) { _manager_array = NEW_C_HEAP_ARRAY(ParCompactionManager*, parallel_gc_threads+1 ); guarantee(_manager_array != NULL, "Could not allocate manager_array"); + _region_list = NEW_C_HEAP_ARRAY(RegionTaskQueue*, + parallel_gc_threads+1); + guarantee(_region_list != NULL, "Could not initialize promotion manager"); + + _recycled_stack_index = NEW_C_HEAP_ARRAY(uint, parallel_gc_threads); + + // parallel_gc-threads + 1 to be consistent with the number of + // compaction managers. + for(uint i=0; iinitialize(); + } + _stack_array = new OopTaskQueueSet(parallel_gc_threads); guarantee(_stack_array != NULL, "Could not allocate stack_array"); _objarray_queues = new ObjArrayTaskQueueSet(parallel_gc_threads); @@ -85,7 +110,7 @@ void ParCompactionManager::initialize(ParMarkBitMap* mbm) { guarantee(_manager_array[i] != NULL, "Could not create ParCompactionManager"); stack_array()->register_queue(i, _manager_array[i]->marking_stack()); _objarray_queues->register_queue(i, &_manager_array[i]->_objarray_stack); - region_array()->register_queue(i, _manager_array[i]->region_stack()); + region_array()->register_queue(i, region_list(i)); } // The VMThread gets its own ParCompactionManager, which is not available @@ -97,6 +122,29 @@ void ParCompactionManager::initialize(ParMarkBitMap* mbm) { "Not initialized?"); } +int ParCompactionManager::pop_recycled_stack_index() { + assert(_recycled_bottom <= _recycled_top, "list is empty"); + // Get the next available index + if (_recycled_bottom < _recycled_top) { + uint cur, next, last; + do { + cur = _recycled_bottom; + next = cur + 1; + last = Atomic::cmpxchg(next, &_recycled_bottom, cur); + } while (cur != last); + return _recycled_stack_index[next]; + } else { + return -1; + } +} + +void ParCompactionManager::push_recycled_stack_index(uint v) { + // Get the next available index + int cur = Atomic::add(1, &_recycled_top); + _recycled_stack_index[cur] = v; + assert(_recycled_bottom <= _recycled_top, "list top and bottom are wrong"); +} + bool ParCompactionManager::should_update() { assert(action() != NotValid, "Action is not set"); return (action() == ParCompactionManager::Update) || @@ -121,6 +169,15 @@ bool ParCompactionManager::should_reset_only() { return action() == ParCompactionManager::ResetObjects; } +void ParCompactionManager::region_list_push(uint list_index, + size_t region_index) { + region_list(list_index)->push(region_index); +} + +void ParCompactionManager::verify_region_list_empty(uint list_index) { + assert(region_list(list_index)->is_empty(), "Not empty"); +} + ParCompactionManager* ParCompactionManager::gc_thread_compaction_manager(int index) { assert(index >= 0 && index < (int)ParallelGCThreads, "index out of range"); diff --git a/hotspot/src/share/vm/gc_implementation/parallelScavenge/psCompactionManager.hpp b/hotspot/src/share/vm/gc_implementation/parallelScavenge/psCompactionManager.hpp index cbf1b7d0301..2438c1e4e56 100644 --- a/hotspot/src/share/vm/gc_implementation/parallelScavenge/psCompactionManager.hpp +++ b/hotspot/src/share/vm/gc_implementation/parallelScavenge/psCompactionManager.hpp @@ -48,6 +48,7 @@ class ParCompactionManager : public CHeapObj { friend class StealRegionCompactionTask; friend class UpdateAndFillClosure; friend class RefProcTaskExecutor; + friend class IdleGCTask; public: @@ -85,7 +86,31 @@ private: // Is there a way to reuse the _marking_stack for the // saving empty regions? For now just create a different // type of TaskQueue. - RegionTaskQueue _region_stack; + RegionTaskQueue* _region_stack; + + static RegionTaskQueue** _region_list; + // Index in _region_list for current _region_stack. + uint _region_stack_index; + + // Indexes of recycled region stacks/overflow stacks + // Stacks of regions to be compacted are embedded in the tasks doing + // the compaction. A thread that executes the task extracts the + // region stack and drains it. These threads keep these region + // stacks for use during compaction task stealing. If a thread + // gets a second draining task, it pushed its current region stack + // index into the array _recycled_stack_index and gets a new + // region stack from the task. A thread that is executing a + // compaction stealing task without ever having executing a + // draining task, will get a region stack from _recycled_stack_index. + // + // Array of indexes into the array of region stacks. + static uint* _recycled_stack_index; + // The index into _recycled_stack_index of the last region stack index + // pushed. If -1, there are no entries into _recycled_stack_index. + static int _recycled_top; + // The index into _recycled_stack_index of the last region stack index + // popped. If -1, there has not been any entry popped. + static int _recycled_bottom; Stack _revisit_klass_stack; Stack _revisit_mdo_stack; @@ -104,7 +129,6 @@ private: // Array of tasks. Needed by the ParallelTaskTerminator. static RegionTaskQueueSet* region_array() { return _region_array; } OverflowTaskQueue* marking_stack() { return &_marking_stack; } - RegionTaskQueue* region_stack() { return &_region_stack; } // Pushes onto the marking stack. If the marking stack is full, // pushes onto the overflow stack. @@ -116,10 +140,33 @@ private: Action action() { return _action; } void set_action(Action v) { _action = v; } + RegionTaskQueue* region_stack() { return _region_stack; } + void set_region_stack(RegionTaskQueue* v) { _region_stack = v; } + inline static ParCompactionManager* manager_array(int index); - ParCompactionManager(); + inline static RegionTaskQueue* region_list(int index) { + return _region_list[index]; + } + uint region_stack_index() { return _region_stack_index; } + void set_region_stack_index(uint v) { _region_stack_index = v; } + + // Pop and push unique reusable stack index + static int pop_recycled_stack_index(); + static void push_recycled_stack_index(uint v); + static void reset_recycled_stack_index() { + _recycled_bottom = _recycled_top = -1; + } + + ParCompactionManager(); + ~ParCompactionManager(); + + // Pushes onto the region stack at the given index. If the + // region stack is full, + // pushes onto the region overflow stack. + static void region_list_push(uint stack_index, size_t region_index); + static void verify_region_list_empty(uint stack_index); ParMarkBitMap* mark_bitmap() { return _mark_bitmap; } // Take actions in preparation for a compaction. diff --git a/hotspot/src/share/vm/gc_implementation/parallelScavenge/psParallelCompact.cpp b/hotspot/src/share/vm/gc_implementation/parallelScavenge/psParallelCompact.cpp index a62059c68e8..3646131c6f2 100644 --- a/hotspot/src/share/vm/gc_implementation/parallelScavenge/psParallelCompact.cpp +++ b/hotspot/src/share/vm/gc_implementation/parallelScavenge/psParallelCompact.cpp @@ -2045,6 +2045,11 @@ void PSParallelCompact::invoke_no_policy(bool maximum_heap_compaction) { ResourceMark rm; HandleMark hm; + // Set the number of GC threads to be used in this collection + gc_task_manager()->set_active_gang(); + gc_task_manager()->task_idle_workers(); + heap->set_par_threads(gc_task_manager()->active_workers()); + const bool is_system_gc = gc_cause == GCCause::_java_lang_system_gc; // This is useful for debugging but don't change the output the @@ -2197,6 +2202,7 @@ void PSParallelCompact::invoke_no_policy(bool maximum_heap_compaction) { // Track memory usage and detect low memory MemoryService::track_memory_usage(); heap->update_counters(); + gc_task_manager()->release_idle_workers(); } #ifdef ASSERT @@ -2204,7 +2210,7 @@ void PSParallelCompact::invoke_no_policy(bool maximum_heap_compaction) { ParCompactionManager* const cm = ParCompactionManager::manager_array(int(i)); assert(cm->marking_stack()->is_empty(), "should be empty"); - assert(cm->region_stack()->is_empty(), "should be empty"); + assert(ParCompactionManager::region_list(int(i))->is_empty(), "should be empty"); assert(cm->revisit_klass_stack()->is_empty(), "should be empty"); } #endif // ASSERT @@ -2351,8 +2357,9 @@ void PSParallelCompact::marking_phase(ParCompactionManager* cm, ParallelScavengeHeap* heap = gc_heap(); uint parallel_gc_threads = heap->gc_task_manager()->workers(); + uint active_gc_threads = heap->gc_task_manager()->active_workers(); TaskQueueSetSuper* qset = ParCompactionManager::region_array(); - ParallelTaskTerminator terminator(parallel_gc_threads, qset); + ParallelTaskTerminator terminator(active_gc_threads, qset); PSParallelCompact::MarkAndPushClosure mark_and_push_closure(cm); PSParallelCompact::FollowStackClosure follow_stack_closure(cm); @@ -2374,21 +2381,13 @@ void PSParallelCompact::marking_phase(ParCompactionManager* cm, q->enqueue(new MarkFromRootsTask(MarkFromRootsTask::jvmti)); q->enqueue(new MarkFromRootsTask(MarkFromRootsTask::code_cache)); - if (parallel_gc_threads > 1) { - for (uint j = 0; j < parallel_gc_threads; j++) { + if (active_gc_threads > 1) { + for (uint j = 0; j < active_gc_threads; j++) { q->enqueue(new StealMarkingTask(&terminator)); } } - WaitForBarrierGCTask* fin = WaitForBarrierGCTask::create(); - q->enqueue(fin); - - gc_task_manager()->add_list(q); - - fin->wait_for(); - - // We have to release the barrier tasks! - WaitForBarrierGCTask::destroy(fin); + gc_task_manager()->execute_and_wait(q); } // Process reference objects found during marking @@ -2483,10 +2482,22 @@ void PSParallelCompact::enqueue_region_draining_tasks(GCTaskQueue* q, { TraceTime tm("drain task setup", print_phases(), true, gclog_or_tty); - const unsigned int task_count = MAX2(parallel_gc_threads, 1U); - for (unsigned int j = 0; j < task_count; j++) { + // Find the threads that are active + unsigned int which = 0; + + const uint task_count = MAX2(parallel_gc_threads, 1U); + for (uint j = 0; j < task_count; j++) { q->enqueue(new DrainStacksCompactionTask(j)); + ParCompactionManager::verify_region_list_empty(j); + // Set the region stacks variables to "no" region stack values + // so that they will be recognized and needing a region stack + // in the stealing tasks if they do not get one by executing + // a draining stack. + ParCompactionManager* cm = ParCompactionManager::manager_array(j); + cm->set_region_stack(NULL); + cm->set_region_stack_index((uint)max_uintx); } + ParCompactionManager::reset_recycled_stack_index(); // Find all regions that are available (can be filled immediately) and // distribute them to the thread stacks. The iteration is done in reverse @@ -2495,8 +2506,10 @@ void PSParallelCompact::enqueue_region_draining_tasks(GCTaskQueue* q, const ParallelCompactData& sd = PSParallelCompact::summary_data(); size_t fillable_regions = 0; // A count for diagnostic purposes. - unsigned int which = 0; // The worker thread number. + // A region index which corresponds to the tasks created above. + // "which" must be 0 <= which < task_count + which = 0; for (unsigned int id = to_space_id; id > perm_space_id; --id) { SpaceInfo* const space_info = _space_info + id; MutableSpace* const space = space_info->space(); @@ -2509,8 +2522,7 @@ void PSParallelCompact::enqueue_region_draining_tasks(GCTaskQueue* q, for (size_t cur = end_region - 1; cur >= beg_region; --cur) { if (sd.region(cur)->claim_unsafe()) { - ParCompactionManager* cm = ParCompactionManager::manager_array(which); - cm->push_region(cur); + ParCompactionManager::region_list_push(which, cur); if (TraceParallelOldGCCompactionPhase && Verbose) { const size_t count_mod_8 = fillable_regions & 7; @@ -2521,8 +2533,10 @@ void PSParallelCompact::enqueue_region_draining_tasks(GCTaskQueue* q, NOT_PRODUCT(++fillable_regions;) - // Assign regions to threads in round-robin fashion. + // Assign regions to tasks in round-robin fashion. if (++which == task_count) { + assert(which <= parallel_gc_threads, + "Inconsistent number of workers"); which = 0; } } @@ -2642,26 +2656,19 @@ void PSParallelCompact::compact() { PSOldGen* old_gen = heap->old_gen(); old_gen->start_array()->reset(); uint parallel_gc_threads = heap->gc_task_manager()->workers(); + uint active_gc_threads = heap->gc_task_manager()->active_workers(); TaskQueueSetSuper* qset = ParCompactionManager::region_array(); - ParallelTaskTerminator terminator(parallel_gc_threads, qset); + ParallelTaskTerminator terminator(active_gc_threads, qset); GCTaskQueue* q = GCTaskQueue::create(); - enqueue_region_draining_tasks(q, parallel_gc_threads); - enqueue_dense_prefix_tasks(q, parallel_gc_threads); - enqueue_region_stealing_tasks(q, &terminator, parallel_gc_threads); + enqueue_region_draining_tasks(q, active_gc_threads); + enqueue_dense_prefix_tasks(q, active_gc_threads); + enqueue_region_stealing_tasks(q, &terminator, active_gc_threads); { TraceTime tm_pc("par compact", print_phases(), true, gclog_or_tty); - WaitForBarrierGCTask* fin = WaitForBarrierGCTask::create(); - q->enqueue(fin); - - gc_task_manager()->add_list(q); - - fin->wait_for(); - - // We have to release the barrier tasks! - WaitForBarrierGCTask::destroy(fin); + gc_task_manager()->execute_and_wait(q); #ifdef ASSERT // Verify that all regions have been processed before the deferred updates. @@ -2729,6 +2736,9 @@ void PSParallelCompact::follow_weak_klass_links() { // All klasses on the revisit stack are marked at this point. // Update and follow all subklass, sibling and implementor links. + // Check all the stacks here even if not all the workers are active. + // There is no accounting which indicates which stacks might have + // contents to be followed. if (PrintRevisitStats) { gclog_or_tty->print_cr("#classes in system dictionary = %d", SystemDictionary::number_of_classes()); diff --git a/hotspot/src/share/vm/gc_implementation/parallelScavenge/psScavenge.cpp b/hotspot/src/share/vm/gc_implementation/parallelScavenge/psScavenge.cpp index 1094d1708e8..0cf826eca40 100644 --- a/hotspot/src/share/vm/gc_implementation/parallelScavenge/psScavenge.cpp +++ b/hotspot/src/share/vm/gc_implementation/parallelScavenge/psScavenge.cpp @@ -181,28 +181,29 @@ class PSRefProcTaskExecutor: public AbstractRefProcTaskExecutor { void PSRefProcTaskExecutor::execute(ProcessTask& task) { GCTaskQueue* q = GCTaskQueue::create(); - for(uint i=0; iactive_workers(); i++) { q->enqueue(new PSRefProcTaskProxy(task, i)); } - ParallelTaskTerminator terminator( - ParallelScavengeHeap::gc_task_manager()->workers(), + ParallelTaskTerminator terminator(manager->active_workers(), (TaskQueueSetSuper*) PSPromotionManager::stack_array_depth()); - if (task.marks_oops_alive() && ParallelGCThreads > 1) { - for (uint j=0; jactive_workers() > 1) { + for (uint j = 0; j < manager->active_workers(); j++) { q->enqueue(new StealTask(&terminator)); } } - ParallelScavengeHeap::gc_task_manager()->execute_and_wait(q); + manager->execute_and_wait(q); } void PSRefProcTaskExecutor::execute(EnqueueTask& task) { GCTaskQueue* q = GCTaskQueue::create(); - for(uint i=0; iactive_workers(); i++) { q->enqueue(new PSRefEnqueueTaskProxy(task, i)); } - ParallelScavengeHeap::gc_task_manager()->execute_and_wait(q); + manager->execute_and_wait(q); } // This method contains all heap specific policy for invoking scavenge. @@ -375,6 +376,14 @@ bool PSScavenge::invoke_no_policy() { // Release all previously held resources gc_task_manager()->release_all_resources(); + // Set the number of GC threads to be used in this collection + gc_task_manager()->set_active_gang(); + gc_task_manager()->task_idle_workers(); + // Get the active number of workers here and use that value + // throughout the methods. + uint active_workers = gc_task_manager()->active_workers(); + heap->set_par_threads(active_workers); + PSPromotionManager::pre_scavenge(); // We'll use the promotion manager again later. @@ -385,8 +394,9 @@ bool PSScavenge::invoke_no_policy() { GCTaskQueue* q = GCTaskQueue::create(); - for(uint i=0; ienqueue(new OldToYoungRootsTask(old_gen, old_top, i)); + uint stripe_total = active_workers; + for(uint i=0; i < stripe_total; i++) { + q->enqueue(new OldToYoungRootsTask(old_gen, old_top, i, stripe_total)); } q->enqueue(new SerialOldToYoungRootsTask(perm_gen, perm_top)); @@ -403,10 +413,10 @@ bool PSScavenge::invoke_no_policy() { q->enqueue(new ScavengeRootsTask(ScavengeRootsTask::code_cache)); ParallelTaskTerminator terminator( - gc_task_manager()->workers(), + active_workers, (TaskQueueSetSuper*) promotion_manager->stack_array_depth()); - if (ParallelGCThreads>1) { - for (uint j=0; j 1) { + for (uint j = 0; j < active_workers; j++) { q->enqueue(new StealTask(&terminator)); } } @@ -419,6 +429,7 @@ bool PSScavenge::invoke_no_policy() { // Process reference objects discovered during scavenge { reference_processor()->setup_policy(false); // not always_clear + reference_processor()->set_active_mt_degree(active_workers); PSKeepAliveClosure keep_alive(promotion_manager); PSEvacuateFollowersClosure evac_followers(promotion_manager); if (reference_processor()->processing_is_mt()) { @@ -622,6 +633,8 @@ bool PSScavenge::invoke_no_policy() { // Track memory usage and detect low memory MemoryService::track_memory_usage(); heap->update_counters(); + + gc_task_manager()->release_idle_workers(); } if (VerifyAfterGC && heap->total_collections() >= VerifyGCStartAt) { @@ -804,6 +817,7 @@ void PSScavenge::initialize() { // Initialize ref handling object for scavenging. MemRegion mr = young_gen->reserved(); + _ref_processor = new ReferenceProcessor(mr, // span ParallelRefProcEnabled && (ParallelGCThreads > 1), // mt processing diff --git a/hotspot/src/share/vm/gc_implementation/parallelScavenge/psTasks.cpp b/hotspot/src/share/vm/gc_implementation/parallelScavenge/psTasks.cpp index f3011a82416..0800681f94a 100644 --- a/hotspot/src/share/vm/gc_implementation/parallelScavenge/psTasks.cpp +++ b/hotspot/src/share/vm/gc_implementation/parallelScavenge/psTasks.cpp @@ -202,7 +202,8 @@ void OldToYoungRootsTask::do_it(GCTaskManager* manager, uint which) { _gen->object_space(), _gen_top, pm, - _stripe_number); + _stripe_number, + _stripe_total); // Do the real work pm->drain_stacks(false); diff --git a/hotspot/src/share/vm/gc_implementation/parallelScavenge/psTasks.hpp b/hotspot/src/share/vm/gc_implementation/parallelScavenge/psTasks.hpp index 7ae5b21cb38..d31f653ac16 100644 --- a/hotspot/src/share/vm/gc_implementation/parallelScavenge/psTasks.hpp +++ b/hotspot/src/share/vm/gc_implementation/parallelScavenge/psTasks.hpp @@ -135,16 +135,63 @@ class SerialOldToYoungRootsTask : public GCTask { // OldToYoungRootsTask // // This task is used to scan old to young roots in parallel +// +// A GC thread executing this tasks divides the generation (old gen) +// into slices and takes a stripe in the slice as its part of the +// work. +// +// +===============+ slice 0 +// | stripe 0 | +// +---------------+ +// | stripe 1 | +// +---------------+ +// | stripe 2 | +// +---------------+ +// | stripe 3 | +// +===============+ slice 1 +// | stripe 0 | +// +---------------+ +// | stripe 1 | +// +---------------+ +// | stripe 2 | +// +---------------+ +// | stripe 3 | +// +===============+ slice 2 +// ... +// +// A task is created for each stripe. In this case there are 4 tasks +// created. A GC thread first works on its stripe within slice 0 +// and then moves to its stripe in the next slice until all stripes +// exceed the top of the generation. Note that having fewer GC threads +// than stripes works because all the tasks are executed so all stripes +// will be covered. In this example if 4 tasks have been created to cover +// all the stripes and there are only 3 threads, one of the threads will +// get the tasks with the 4th stripe. However, there is a dependence in +// CardTableExtension::scavenge_contents_parallel() on the number +// of tasks created. In scavenge_contents_parallel the distance +// to the next stripe is calculated based on the number of tasks. +// If the stripe width is ssize, a task's next stripe is at +// ssize * number_of_tasks (= slice_stride). In this case after +// finishing stripe 0 in slice 0, the thread finds the stripe 0 in slice1 +// by adding slice_stride to the start of stripe 0 in slice 0 to get +// to the start of stride 0 in slice 1. class OldToYoungRootsTask : public GCTask { private: PSOldGen* _gen; HeapWord* _gen_top; uint _stripe_number; + uint _stripe_total; public: - OldToYoungRootsTask(PSOldGen *gen, HeapWord* gen_top, uint stripe_number) : - _gen(gen), _gen_top(gen_top), _stripe_number(stripe_number) { } + OldToYoungRootsTask(PSOldGen *gen, + HeapWord* gen_top, + uint stripe_number, + uint stripe_total) : + _gen(gen), + _gen_top(gen_top), + _stripe_number(stripe_number), + _stripe_total(stripe_total) { } char* name() { return (char *)"old-to-young-roots-task"; } diff --git a/hotspot/src/share/vm/gc_implementation/shared/adaptiveSizePolicy.cpp b/hotspot/src/share/vm/gc_implementation/shared/adaptiveSizePolicy.cpp index 4c933f85b09..2b5858b725a 100644 --- a/hotspot/src/share/vm/gc_implementation/shared/adaptiveSizePolicy.cpp +++ b/hotspot/src/share/vm/gc_implementation/shared/adaptiveSizePolicy.cpp @@ -28,8 +28,10 @@ #include "memory/collectorPolicy.hpp" #include "runtime/timer.hpp" #include "utilities/ostream.hpp" +#include "utilities/workgroup.hpp" elapsedTimer AdaptiveSizePolicy::_minor_timer; elapsedTimer AdaptiveSizePolicy::_major_timer; +bool AdaptiveSizePolicy::_debug_perturbation = false; // The throughput goal is implemented as // _throughput_goal = 1 - ( 1 / (1 + gc_cost_ratio)) @@ -88,6 +90,134 @@ AdaptiveSizePolicy::AdaptiveSizePolicy(size_t init_eden_size, _young_gen_policy_is_ready = false; } +// If the number of GC threads was set on the command line, +// use it. +// Else +// Calculate the number of GC threads based on the number of Java threads. +// Calculate the number of GC threads based on the size of the heap. +// Use the larger. + +int AdaptiveSizePolicy::calc_default_active_workers(uintx total_workers, + const uintx min_workers, + uintx active_workers, + uintx application_workers) { + // If the user has specifically set the number of + // GC threads, use them. + + // If the user has turned off using a dynamic number of GC threads + // or the users has requested a specific number, set the active + // number of workers to all the workers. + + uintx new_active_workers = total_workers; + uintx prev_active_workers = active_workers; + uintx active_workers_by_JT = 0; + uintx active_workers_by_heap_size = 0; + + // Always use at least min_workers but use up to + // GCThreadsPerJavaThreads * application threads. + active_workers_by_JT = + MAX2((uintx) GCWorkersPerJavaThread * application_workers, + min_workers); + + // Choose a number of GC threads based on the current size + // of the heap. This may be complicated because the size of + // the heap depends on factors such as the thoughput goal. + // Still a large heap should be collected by more GC threads. + active_workers_by_heap_size = + MAX2((size_t) 2U, Universe::heap()->capacity() / HeapSizePerGCThread); + + uintx max_active_workers = + MAX2(active_workers_by_JT, active_workers_by_heap_size); + + // Limit the number of workers to the the number created, + // (workers()). + new_active_workers = MIN2(max_active_workers, + (uintx) total_workers); + + // Increase GC workers instantly but decrease them more + // slowly. + if (new_active_workers < prev_active_workers) { + new_active_workers = + MAX2(min_workers, (prev_active_workers + new_active_workers) / 2); + } + + // Check once more that the number of workers is within the limits. + assert(min_workers <= total_workers, "Minimum workers not consistent with total workers"); + assert(new_active_workers >= min_workers, "Minimum workers not observed"); + assert(new_active_workers <= total_workers, "Total workers not observed"); + + if (ForceDynamicNumberOfGCThreads) { + // Assume this is debugging and jiggle the number of GC threads. + if (new_active_workers == prev_active_workers) { + if (new_active_workers < total_workers) { + new_active_workers++; + } else if (new_active_workers > min_workers) { + new_active_workers--; + } + } + if (new_active_workers == total_workers) { + if (_debug_perturbation) { + new_active_workers = min_workers; + } + _debug_perturbation = !_debug_perturbation; + } + assert((new_active_workers <= (uintx) ParallelGCThreads) && + (new_active_workers >= min_workers), + "Jiggled active workers too much"); + } + + if (TraceDynamicGCThreads) { + gclog_or_tty->print_cr("GCTaskManager::calc_default_active_workers() : " + "active_workers(): %d new_acitve_workers: %d " + "prev_active_workers: %d\n" + " active_workers_by_JT: %d active_workers_by_heap_size: %d", + active_workers, new_active_workers, prev_active_workers, + active_workers_by_JT, active_workers_by_heap_size); + } + assert(new_active_workers > 0, "Always need at least 1"); + return new_active_workers; +} + +int AdaptiveSizePolicy::calc_active_workers(uintx total_workers, + uintx active_workers, + uintx application_workers) { + // If the user has specifically set the number of + // GC threads, use them. + + // If the user has turned off using a dynamic number of GC threads + // or the users has requested a specific number, set the active + // number of workers to all the workers. + + int new_active_workers; + if (!UseDynamicNumberOfGCThreads || + (!FLAG_IS_DEFAULT(ParallelGCThreads) && !ForceDynamicNumberOfGCThreads)) { + new_active_workers = total_workers; + } else { + new_active_workers = calc_default_active_workers(total_workers, + 2, /* Minimum number of workers */ + active_workers, + application_workers); + } + assert(new_active_workers > 0, "Always need at least 1"); + return new_active_workers; +} + +int AdaptiveSizePolicy::calc_active_conc_workers(uintx total_workers, + uintx active_workers, + uintx application_workers) { + if (!UseDynamicNumberOfGCThreads || + (!FLAG_IS_DEFAULT(ConcGCThreads) && !ForceDynamicNumberOfGCThreads)) { + return ConcGCThreads; + } else { + int no_of_gc_threads = calc_default_active_workers( + total_workers, + 1, /* Minimum number of workers */ + active_workers, + application_workers); + return no_of_gc_threads; + } +} + bool AdaptiveSizePolicy::tenuring_threshold_change() const { return decrement_tenuring_threshold_for_gc_cost() || increment_tenuring_threshold_for_gc_cost() || diff --git a/hotspot/src/share/vm/gc_implementation/shared/adaptiveSizePolicy.hpp b/hotspot/src/share/vm/gc_implementation/shared/adaptiveSizePolicy.hpp index 4822eb4bf89..dd1895e9e06 100644 --- a/hotspot/src/share/vm/gc_implementation/shared/adaptiveSizePolicy.hpp +++ b/hotspot/src/share/vm/gc_implementation/shared/adaptiveSizePolicy.hpp @@ -187,6 +187,8 @@ class AdaptiveSizePolicy : public CHeapObj { julong _young_gen_change_for_minor_throughput; julong _old_gen_change_for_major_throughput; + static const uint GCWorkersPerJavaThread = 2; + // Accessors double gc_pause_goal_sec() const { return _gc_pause_goal_sec; } @@ -331,6 +333,8 @@ class AdaptiveSizePolicy : public CHeapObj { // Return true if the policy suggested a change. bool tenuring_threshold_change() const; + static bool _debug_perturbation; + public: AdaptiveSizePolicy(size_t init_eden_size, size_t init_promo_size, @@ -338,6 +342,31 @@ class AdaptiveSizePolicy : public CHeapObj { double gc_pause_goal_sec, uint gc_cost_ratio); + // Return number default GC threads to use in the next GC. + static int calc_default_active_workers(uintx total_workers, + const uintx min_workers, + uintx active_workers, + uintx application_workers); + + // Return number of GC threads to use in the next GC. + // This is called sparingly so as not to change the + // number of GC workers gratuitously. + // For ParNew collections + // For PS scavenge and ParOld collections + // For G1 evacuation pauses (subject to update) + // Other collection phases inherit the number of + // GC workers from the calls above. For example, + // a CMS parallel remark uses the same number of GC + // workers as the most recent ParNew collection. + static int calc_active_workers(uintx total_workers, + uintx active_workers, + uintx application_workers); + + // Return number of GC threads to use in the next concurrent GC phase. + static int calc_active_conc_workers(uintx total_workers, + uintx active_workers, + uintx application_workers); + bool is_gc_cms_adaptive_size_policy() { return kind() == _gc_cms_adaptive_size_policy; } diff --git a/hotspot/src/share/vm/memory/cardTableModRefBS.cpp b/hotspot/src/share/vm/memory/cardTableModRefBS.cpp index c6645015dea..11b0e384da0 100644 --- a/hotspot/src/share/vm/memory/cardTableModRefBS.cpp +++ b/hotspot/src/share/vm/memory/cardTableModRefBS.cpp @@ -460,9 +460,43 @@ void CardTableModRefBS::non_clean_card_iterate_possibly_parallel(Space* sp, OopsInGenClosure* cl, CardTableRS* ct) { if (!mr.is_empty()) { - int n_threads = SharedHeap::heap()->n_par_threads(); - if (n_threads > 0) { + // Caller (process_strong_roots()) claims that all GC threads + // execute this call. With UseDynamicNumberOfGCThreads now all + // active GC threads execute this call. The number of active GC + // threads needs to be passed to par_non_clean_card_iterate_work() + // to get proper partitioning and termination. + // + // This is an example of where n_par_threads() is used instead + // of workers()->active_workers(). n_par_threads can be set to 0 to + // turn off parallelism. For example when this code is called as + // part of verification and SharedHeap::process_strong_roots() is being + // used, then n_par_threads() may have been set to 0. active_workers + // is not overloaded with the meaning that it is a switch to disable + // parallelism and so keeps the meaning of the number of + // active gc workers. If parallelism has not been shut off by + // setting n_par_threads to 0, then n_par_threads should be + // equal to active_workers. When a different mechanism for shutting + // off parallelism is used, then active_workers can be used in + // place of n_par_threads. + // This is an example of a path where n_par_threads is + // set to 0 to turn off parallism. + // [7] CardTableModRefBS::non_clean_card_iterate() + // [8] CardTableRS::younger_refs_in_space_iterate() + // [9] Generation::younger_refs_in_space_iterate() + // [10] OneContigSpaceCardGeneration::younger_refs_iterate() + // [11] CompactingPermGenGen::younger_refs_iterate() + // [12] CardTableRS::younger_refs_iterate() + // [13] SharedHeap::process_strong_roots() + // [14] G1CollectedHeap::verify() + // [15] Universe::verify() + // [16] G1CollectedHeap::do_collection_pause_at_safepoint() + // + int n_threads = SharedHeap::heap()->n_par_threads(); + bool is_par = n_threads > 0; + if (is_par) { #ifndef SERIALGC + assert(SharedHeap::heap()->n_par_threads() == + SharedHeap::heap()->workers()->active_workers(), "Mismatch"); non_clean_card_iterate_parallel_work(sp, mr, cl, ct, n_threads); #else // SERIALGC fatal("Parallel gc not supported here."); @@ -489,6 +523,10 @@ void CardTableModRefBS::non_clean_card_iterate_possibly_parallel(Space* sp, // change their values in any manner. void CardTableModRefBS::non_clean_card_iterate_serial(MemRegion mr, MemRegionClosure* cl) { + bool is_par = (SharedHeap::heap()->n_par_threads() > 0); + assert(!is_par || + (SharedHeap::heap()->n_par_threads() == + SharedHeap::heap()->workers()->active_workers()), "Mismatch"); for (int i = 0; i < _cur_covered_regions; i++) { MemRegion mri = mr.intersection(_covered[i]); if (mri.word_size() > 0) { diff --git a/hotspot/src/share/vm/memory/cardTableRS.cpp b/hotspot/src/share/vm/memory/cardTableRS.cpp index 4b47da783a1..c152d6cb03c 100644 --- a/hotspot/src/share/vm/memory/cardTableRS.cpp +++ b/hotspot/src/share/vm/memory/cardTableRS.cpp @@ -164,7 +164,13 @@ inline bool ClearNoncleanCardWrapper::clear_card_serial(jbyte* entry) { ClearNoncleanCardWrapper::ClearNoncleanCardWrapper( DirtyCardToOopClosure* dirty_card_closure, CardTableRS* ct) : _dirty_card_closure(dirty_card_closure), _ct(ct) { + // Cannot yet substitute active_workers for n_par_threads + // in the case where parallelism is being turned off by + // setting n_par_threads to 0. _is_par = (SharedHeap::heap()->n_par_threads() > 0); + assert(!_is_par || + (SharedHeap::heap()->n_par_threads() == + SharedHeap::heap()->workers()->active_workers()), "Mismatch"); } void ClearNoncleanCardWrapper::do_MemRegion(MemRegion mr) { diff --git a/hotspot/src/share/vm/memory/sharedHeap.cpp b/hotspot/src/share/vm/memory/sharedHeap.cpp index 15d8eaa5406..8094b6e7a46 100644 --- a/hotspot/src/share/vm/memory/sharedHeap.cpp +++ b/hotspot/src/share/vm/memory/sharedHeap.cpp @@ -58,7 +58,6 @@ SharedHeap::SharedHeap(CollectorPolicy* policy_) : _perm_gen(NULL), _rem_set(NULL), _strong_roots_parity(0), _process_strong_tasks(new SubTasksDone(SH_PS_NumElements)), - _n_par_threads(0), _workers(NULL) { if (_process_strong_tasks == NULL || !_process_strong_tasks->valid()) { @@ -80,6 +79,14 @@ SharedHeap::SharedHeap(CollectorPolicy* policy_) : } } +int SharedHeap::n_termination() { + return _process_strong_tasks->n_threads(); +} + +void SharedHeap::set_n_termination(int t) { + _process_strong_tasks->set_n_threads(t); +} + bool SharedHeap::heap_lock_held_for_gc() { Thread* t = Thread::current(); return Heap_lock->owned_by_self() @@ -144,6 +151,10 @@ void SharedHeap::process_strong_roots(bool activate_scope, StrongRootsScope srs(this, activate_scope); // General strong roots. assert(_strong_roots_parity != 0, "must have called prologue code"); + // _n_termination for _process_strong_tasks should be set up stream + // in a method not running in a GC worker. Otherwise the GC worker + // could be trying to change the termination condition while the task + // is executing in another GC worker. if (!_process_strong_tasks->is_task_claimed(SH_PS_Universe_oops_do)) { Universe::oops_do(roots); // Consider perm-gen discovered lists to be strong. diff --git a/hotspot/src/share/vm/memory/sharedHeap.hpp b/hotspot/src/share/vm/memory/sharedHeap.hpp index 55a9b569d9c..8e819c48717 100644 --- a/hotspot/src/share/vm/memory/sharedHeap.hpp +++ b/hotspot/src/share/vm/memory/sharedHeap.hpp @@ -49,6 +49,62 @@ class FlexibleWorkGang; class CollectorPolicy; class KlassHandle; +// Note on use of FlexibleWorkGang's for GC. +// There are three places where task completion is determined. +// In +// 1) ParallelTaskTerminator::offer_termination() where _n_threads +// must be set to the correct value so that count of workers that +// have offered termination will exactly match the number +// working on the task. Tasks such as those derived from GCTask +// use ParallelTaskTerminator's. Tasks that want load balancing +// by work stealing use this method to gauge completion. +// 2) SubTasksDone has a variable _n_threads that is used in +// all_tasks_completed() to determine completion. all_tasks_complete() +// counts the number of tasks that have been done and then reset +// the SubTasksDone so that it can be used again. When the number of +// tasks is set to the number of GC workers, then _n_threads must +// be set to the number of active GC workers. G1CollectedHeap, +// HRInto_G1RemSet, GenCollectedHeap and SharedHeap have SubTasksDone. +// This seems too many. +// 3) SequentialSubTasksDone has an _n_threads that is used in +// a way similar to SubTasksDone and has the same dependency on the +// number of active GC workers. CompactibleFreeListSpace and Space +// have SequentialSubTasksDone's. +// Example of using SubTasksDone and SequentialSubTasksDone +// G1CollectedHeap::g1_process_strong_roots() calls +// process_strong_roots(false, // no scoping; this is parallel code +// collecting_perm_gen, so, +// &buf_scan_non_heap_roots, +// &eager_scan_code_roots, +// &buf_scan_perm); +// which delegates to SharedHeap::process_strong_roots() and uses +// SubTasksDone* _process_strong_tasks to claim tasks. +// process_strong_roots() calls +// rem_set()->younger_refs_iterate(perm_gen(), perm_blk); +// to scan the card table and which eventually calls down into +// CardTableModRefBS::par_non_clean_card_iterate_work(). This method +// uses SequentialSubTasksDone* _pst to claim tasks. +// Both SubTasksDone and SequentialSubTasksDone call their method +// all_tasks_completed() to count the number of GC workers that have +// finished their work. That logic is "when all the workers are +// finished the tasks are finished". +// +// The pattern that appears in the code is to set _n_threads +// to a value > 1 before a task that you would like executed in parallel +// and then to set it to 0 after that task has completed. A value of +// 0 is a "special" value in set_n_threads() which translates to +// setting _n_threads to 1. +// +// Some code uses _n_terminiation to decide if work should be done in +// parallel. The notorious possibly_parallel_oops_do() in threads.cpp +// is an example of such code. Look for variable "is_par" for other +// examples. +// +// The active_workers is not reset to 0 after a parallel phase. It's +// value may be used in later phases and in one instance at least +// (the parallel remark) it has to be used (the parallel remark depends +// on the partitioning done in the previous parallel scavenge). + class SharedHeap : public CollectedHeap { friend class VMStructs; @@ -84,11 +140,6 @@ protected: // If we're doing parallel GC, use this gang of threads. FlexibleWorkGang* _workers; - // Number of parallel threads currently working on GC tasks. - // O indicates use sequential code; 1 means use parallel code even with - // only one thread, for performance testing purposes. - int _n_par_threads; - // Full initialization is done in a concrete subtype's "initialize" // function. SharedHeap(CollectorPolicy* policy_); @@ -107,6 +158,7 @@ public: CollectorPolicy *collector_policy() const { return _collector_policy; } void set_barrier_set(BarrierSet* bs); + SubTasksDone* process_strong_tasks() { return _process_strong_tasks; } // Does operations required after initialization has been done. virtual void post_initialize(); @@ -198,13 +250,6 @@ public: FlexibleWorkGang* workers() const { return _workers; } - // Sets the number of parallel threads that will be doing tasks - // (such as process strong roots) subsequently. - virtual void set_par_threads(int t); - - // Number of threads currently working on GC tasks. - int n_par_threads() { return _n_par_threads; } - // Invoke the "do_oop" method the closure "roots" on all root locations. // If "collecting_perm_gen" is false, then roots that may only contain // references to permGen objects are not scanned; instead, in that case, @@ -240,6 +285,13 @@ public: virtual void gc_prologue(bool full) = 0; virtual void gc_epilogue(bool full) = 0; + // Sets the number of parallel threads that will be doing tasks + // (such as process strong roots) subsequently. + virtual void set_par_threads(int t); + + int n_termination(); + void set_n_termination(int t); + // // New methods from CollectedHeap // diff --git a/hotspot/src/share/vm/runtime/arguments.cpp b/hotspot/src/share/vm/runtime/arguments.cpp index b8e514d5c17..a3a68e7b87c 100644 --- a/hotspot/src/share/vm/runtime/arguments.cpp +++ b/hotspot/src/share/vm/runtime/arguments.cpp @@ -1394,8 +1394,8 @@ void Arguments::set_parallel_gc_flags() { // If no heap maximum was requested explicitly, use some reasonable fraction // of the physical memory, up to a maximum of 1GB. if (UseParallelGC) { - FLAG_SET_ERGO(uintx, ParallelGCThreads, - Abstract_VM_Version::parallel_worker_threads()); + FLAG_SET_DEFAULT(ParallelGCThreads, + Abstract_VM_Version::parallel_worker_threads()); // If InitialSurvivorRatio or MinSurvivorRatio were not specified, but the // SurvivorRatio has been set, reset their default values to SurvivorRatio + diff --git a/hotspot/src/share/vm/runtime/globals.hpp b/hotspot/src/share/vm/runtime/globals.hpp index f330d44ea08..3ef356ebe13 100644 --- a/hotspot/src/share/vm/runtime/globals.hpp +++ b/hotspot/src/share/vm/runtime/globals.hpp @@ -1416,6 +1416,21 @@ class CommandLineFlags { product(uintx, ParallelGCThreads, 0, \ "Number of parallel threads parallel gc will use") \ \ + product(bool, UseDynamicNumberOfGCThreads, false, \ + "Dynamically choose the number of parallel threads " \ + "parallel gc will use") \ + \ + diagnostic(bool, ForceDynamicNumberOfGCThreads, false, \ + "Force dynamic selection of the number of" \ + "parallel threads parallel gc will use to aid debugging") \ + \ + product(uintx, HeapSizePerGCThread, ScaleForWordSize(64*M), \ + "Size of heap (bytes) per GC thread used in calculating the " \ + "number of GC threads") \ + \ + product(bool, TraceDynamicGCThreads, false, \ + "Trace the dynamic GC thread usage") \ + \ develop(bool, ParallelOldGCSplitALot, false, \ "Provoke splitting (copying data from a young gen space to" \ "multiple destination spaces)") \ @@ -2357,7 +2372,7 @@ class CommandLineFlags { develop(bool, TraceGCTaskQueue, false, \ "Trace actions of the GC task queues") \ \ - develop(bool, TraceGCTaskThread, false, \ + diagnostic(bool, TraceGCTaskThread, false, \ "Trace actions of the GC task threads") \ \ product(bool, PrintParallelOldGCPhaseTimes, false, \ diff --git a/hotspot/src/share/vm/runtime/thread.cpp b/hotspot/src/share/vm/runtime/thread.cpp index a8cbf854af0..da291036e8a 100644 --- a/hotspot/src/share/vm/runtime/thread.cpp +++ b/hotspot/src/share/vm/runtime/thread.cpp @@ -778,12 +778,12 @@ bool Thread::claim_oops_do_par_case(int strong_roots_parity) { return true; } else { guarantee(res == strong_roots_parity, "Or else what?"); - assert(SharedHeap::heap()->n_par_threads() > 0, - "Should only fail when parallel."); + assert(SharedHeap::heap()->workers()->active_workers() > 0, + "Should only fail when parallel."); return false; } } - assert(SharedHeap::heap()->n_par_threads() > 0, + assert(SharedHeap::heap()->workers()->active_workers() > 0, "Should only fail when parallel."); return false; } @@ -3939,7 +3939,15 @@ void Threads::possibly_parallel_oops_do(OopClosure* f, CodeBlobClosure* cf) { // root groups. Overhead should be small enough to use all the time, // even in sequential code. SharedHeap* sh = SharedHeap::heap(); - bool is_par = (sh->n_par_threads() > 0); + // Cannot yet substitute active_workers for n_par_threads + // because of G1CollectedHeap::verify() use of + // SharedHeap::process_strong_roots(). n_par_threads == 0 will + // turn off parallelism in process_strong_roots while active_workers + // is being used for parallelism elsewhere. + bool is_par = sh->n_par_threads() > 0; + assert(!is_par || + (SharedHeap::heap()->n_par_threads() == + SharedHeap::heap()->workers()->active_workers()), "Mismatch"); int cp = SharedHeap::heap()->strong_roots_parity(); ALL_JAVA_THREADS(p) { if (p->claim_oops_do(is_par, cp)) { diff --git a/hotspot/src/share/vm/utilities/workgroup.cpp b/hotspot/src/share/vm/utilities/workgroup.cpp index e53d78d49b3..8b695528ec4 100644 --- a/hotspot/src/share/vm/utilities/workgroup.cpp +++ b/hotspot/src/share/vm/utilities/workgroup.cpp @@ -57,7 +57,6 @@ WorkGang::WorkGang(const char* name, bool are_GC_task_threads, bool are_ConcurrentGC_threads) : AbstractWorkGang(name, are_GC_task_threads, are_ConcurrentGC_threads) { - // Save arguments. _total_workers = workers; } @@ -127,6 +126,12 @@ GangWorker* AbstractWorkGang::gang_worker(int i) const { } void WorkGang::run_task(AbstractGangTask* task) { + run_task(task, total_workers()); +} + +void WorkGang::run_task(AbstractGangTask* task, uint no_of_parallel_workers) { + task->set_for_termination(no_of_parallel_workers); + // This thread is executed by the VM thread which does not block // on ordinary MutexLocker's. MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag); @@ -143,22 +148,32 @@ void WorkGang::run_task(AbstractGangTask* task) { // Tell the workers to get to work. monitor()->notify_all(); // Wait for them to be finished - while (finished_workers() < total_workers()) { + while (finished_workers() < (int) no_of_parallel_workers) { if (TraceWorkGang) { tty->print_cr("Waiting in work gang %s: %d/%d finished sequence %d", - name(), finished_workers(), total_workers(), + name(), finished_workers(), no_of_parallel_workers, _sequence_number); } monitor()->wait(/* no_safepoint_check */ true); } _task = NULL; if (TraceWorkGang) { - tty->print_cr("/nFinished work gang %s: %d/%d sequence %d", - name(), finished_workers(), total_workers(), + tty->print_cr("\nFinished work gang %s: %d/%d sequence %d", + name(), finished_workers(), no_of_parallel_workers, _sequence_number); + Thread* me = Thread::current(); + tty->print_cr(" T: 0x%x VM_thread: %d", me, me->is_VM_thread()); } } +void FlexibleWorkGang::run_task(AbstractGangTask* task) { + // If active_workers() is passed, _finished_workers + // must only be incremented for workers that find non_null + // work (as opposed to all those that just check that the + // task is not null). + WorkGang::run_task(task, (uint) active_workers()); +} + void AbstractWorkGang::stop() { // Tell all workers to terminate, then wait for them to become inactive. MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag); @@ -168,10 +183,10 @@ void AbstractWorkGang::stop() { _task = NULL; _terminate = true; monitor()->notify_all(); - while (finished_workers() < total_workers()) { + while (finished_workers() < active_workers()) { if (TraceWorkGang) { tty->print_cr("Waiting in work gang %s: %d/%d finished", - name(), finished_workers(), total_workers()); + name(), finished_workers(), active_workers()); } monitor()->wait(/* no_safepoint_check */ true); } @@ -275,10 +290,12 @@ void GangWorker::loop() { // Check for new work. if ((data.task() != NULL) && (data.sequence_number() != previous_sequence_number)) { - gang()->internal_note_start(); - gang_monitor->notify_all(); - part = gang()->started_workers() - 1; - break; + if (gang()->needs_more_workers()) { + gang()->internal_note_start(); + gang_monitor->notify_all(); + part = gang()->started_workers() - 1; + break; + } } // Nothing to do. gang_monitor->wait(/* no_safepoint_check */ true); @@ -350,6 +367,9 @@ const char* AbstractGangTask::name() const { #endif /* PRODUCT */ +// FlexibleWorkGang + + // *** WorkGangBarrierSync WorkGangBarrierSync::WorkGangBarrierSync() @@ -411,10 +431,8 @@ bool SubTasksDone::valid() { } void SubTasksDone::set_n_threads(int t) { -#ifdef ASSERT assert(_claimed == 0 || _threads_completed == _n_threads, "should not be called while tasks are being processed!"); -#endif _n_threads = (t == 0 ? 1 : t); } diff --git a/hotspot/src/share/vm/utilities/workgroup.hpp b/hotspot/src/share/vm/utilities/workgroup.hpp index 8e9effebd7a..91161634772 100644 --- a/hotspot/src/share/vm/utilities/workgroup.hpp +++ b/hotspot/src/share/vm/utilities/workgroup.hpp @@ -96,11 +96,14 @@ private: protected: // Constructor and desctructor: only construct subclasses. - AbstractGangTask(const char* name) { + AbstractGangTask(const char* name) + { NOT_PRODUCT(_name = name); _counter = 0; } virtual ~AbstractGangTask() { } + +public: }; class AbstractGangTaskWOopQueues : public AbstractGangTask { @@ -116,6 +119,7 @@ class AbstractGangTaskWOopQueues : public AbstractGangTask { OopTaskQueueSet* queues() { return _queues; } }; + // Class AbstractWorkGang: // An abstract class representing a gang of workers. // You subclass this to supply an implementation of run_task(). @@ -130,6 +134,8 @@ public: virtual void run_task(AbstractGangTask* task) = 0; // Stop and terminate all workers. virtual void stop(); + // Return true if more workers should be applied to the task. + virtual bool needs_more_workers() const { return true; } public: // Debugging. const char* name() const; @@ -287,20 +293,62 @@ public: AbstractWorkGang* gang() const { return _gang; } }; +// Dynamic number of worker threads +// +// This type of work gang is used to run different numbers of +// worker threads at different times. The +// number of workers run for a task is "_active_workers" +// instead of "_total_workers" in a WorkGang. The method +// "needs_more_workers()" returns true until "_active_workers" +// have been started and returns false afterwards. The +// implementation of "needs_more_workers()" in WorkGang always +// returns true so that all workers are started. The method +// "loop()" in GangWorker was modified to ask "needs_more_workers()" +// in its loop to decide if it should start working on a task. +// A worker in "loop()" waits for notification on the WorkGang +// monitor and execution of each worker as it checks for work +// is serialized via the same monitor. The "needs_more_workers()" +// call is serialized and additionally the calculation for the +// "part" (effectively the worker id for executing the task) is +// serialized to give each worker a unique "part". Workers that +// are not needed for this tasks (i.e., "_active_workers" have +// been started before it, continue to wait for work. + class FlexibleWorkGang: public WorkGang { + // The currently active workers in this gang. + // This is a number that is dynamically adjusted + // and checked in the run_task() method at each invocation. + // As described above _active_workers determines the number + // of threads started on a task. It must also be used to + // determine completion. + protected: int _active_workers; public: // Constructor and destructor. + // Initialize active_workers to a minimum value. Setting it to + // the parameter "workers" will initialize it to a maximum + // value which is not desirable. FlexibleWorkGang(const char* name, int workers, bool are_GC_task_threads, bool are_ConcurrentGC_threads) : - WorkGang(name, workers, are_GC_task_threads, are_ConcurrentGC_threads) { - _active_workers = ParallelGCThreads; - }; + WorkGang(name, workers, are_GC_task_threads, are_ConcurrentGC_threads), + _active_workers(UseDynamicNumberOfGCThreads ? 1 : ParallelGCThreads) {}; // Accessors for fields virtual int active_workers() const { return _active_workers; } - void set_active_workers(int v) { _active_workers = v; } + void set_active_workers(int v) { + assert(v <= _total_workers, + "Trying to set more workers active than there are"); + _active_workers = MIN2(v, _total_workers); + assert(v != 0, "Trying to set active workers to 0"); + _active_workers = MAX2(1, _active_workers); + assert(UseDynamicNumberOfGCThreads || _active_workers == _total_workers, + "Unless dynamic should use total workers"); + } + virtual void run_task(AbstractGangTask* task); + virtual bool needs_more_workers() const { + return _started_workers < _active_workers; + } }; // Work gangs in garbage collectors: 2009-06-10 @@ -357,6 +405,11 @@ public: class SubTasksDone: public CHeapObj { jint* _tasks; int _n_tasks; + // _n_threads is used to determine when a sub task is done. + // It does not control how many threads will execute the subtask + // but must be initialized to the number that do execute the task + // in order to correctly decide when the subtask is done (all the + // threads working on the task have finished). int _n_threads; jint _threads_completed; #ifdef ASSERT diff --git a/hotspot/src/share/vm/utilities/yieldingWorkgroup.cpp b/hotspot/src/share/vm/utilities/yieldingWorkgroup.cpp index 075b4cba740..d8daf7a5d84 100644 --- a/hotspot/src/share/vm/utilities/yieldingWorkgroup.cpp +++ b/hotspot/src/share/vm/utilities/yieldingWorkgroup.cpp @@ -125,7 +125,7 @@ void YieldingFlexibleWorkGang::start_task(YieldingFlexibleGangTask* new_task) { if (requested_size != 0) { _active_workers = MIN2(requested_size, total_workers()); } else { - _active_workers = total_workers(); + _active_workers = active_workers(); } new_task->set_actual_size(_active_workers); new_task->set_for_termination(_active_workers); @@ -148,22 +148,22 @@ void YieldingFlexibleWorkGang::wait_for_gang() { for (Status status = yielding_task()->status(); status != COMPLETED && status != YIELDED && status != ABORTED; status = yielding_task()->status()) { - assert(started_workers() <= total_workers(), "invariant"); - assert(finished_workers() <= total_workers(), "invariant"); - assert(yielded_workers() <= total_workers(), "invariant"); + assert(started_workers() <= active_workers(), "invariant"); + assert(finished_workers() <= active_workers(), "invariant"); + assert(yielded_workers() <= active_workers(), "invariant"); monitor()->wait(Mutex::_no_safepoint_check_flag); } switch (yielding_task()->status()) { case COMPLETED: case ABORTED: { - assert(finished_workers() == total_workers(), "Inconsistent status"); + assert(finished_workers() == active_workers(), "Inconsistent status"); assert(yielded_workers() == 0, "Invariant"); reset(); // for next task; gang<->task binding released break; } case YIELDED: { assert(yielded_workers() > 0, "Invariant"); - assert(yielded_workers() + finished_workers() == total_workers(), + assert(yielded_workers() + finished_workers() == active_workers(), "Inconsistent counts"); break; } @@ -182,7 +182,6 @@ void YieldingFlexibleWorkGang::continue_task( MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag); assert(task() != NULL && task() == gang_task, "Incorrect usage"); - // assert(_active_workers == total_workers(), "For now"); assert(_started_workers == _active_workers, "Precondition"); assert(_yielded_workers > 0 && yielding_task()->status() == YIELDED, "Else why are we calling continue_task()"); @@ -202,7 +201,7 @@ void YieldingFlexibleWorkGang::reset() { void YieldingFlexibleWorkGang::yield() { assert(task() != NULL, "Inconsistency; should have task binding"); MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag); - assert(yielded_workers() < total_workers(), "Consistency check"); + assert(yielded_workers() < active_workers(), "Consistency check"); if (yielding_task()->status() == ABORTING) { // Do not yield; we need to abort as soon as possible // XXX NOTE: This can cause a performance pathology in the @@ -213,7 +212,7 @@ void YieldingFlexibleWorkGang::yield() { // us to return at each potential yield point. return; } - if (++_yielded_workers + finished_workers() == total_workers()) { + if (++_yielded_workers + finished_workers() == active_workers()) { yielding_task()->set_status(YIELDED); monitor()->notify_all(); } else { diff --git a/hotspot/src/share/vm/utilities/yieldingWorkgroup.hpp b/hotspot/src/share/vm/utilities/yieldingWorkgroup.hpp index a3171c8ca4a..6d8c6bf66f4 100644 --- a/hotspot/src/share/vm/utilities/yieldingWorkgroup.hpp +++ b/hotspot/src/share/vm/utilities/yieldingWorkgroup.hpp @@ -199,16 +199,10 @@ public: void abort(); private: - int _active_workers; int _yielded_workers; void wait_for_gang(); public: - // Accessors for fields - int active_workers() const { - return _active_workers; - } - // Accessors for fields int yielded_workers() const { return _yielded_workers;