8087323: Unify and split the work gang classes

Reviewed-by: jmasa, sjohanss
This commit is contained in:
Stefan Karlsson 2015-06-29 11:09:39 +02:00
parent aee130df16
commit e25bcfd3d3
13 changed files with 329 additions and 327 deletions

View File

@ -3005,7 +3005,7 @@ void CMSCollector::checkpointRootsInitialWork() {
COMPILER2_PRESENT(DerivedPointerTableDeactivate dpt_deact;)
if (CMSParallelInitialMarkEnabled) {
// The parallel version.
FlexibleWorkGang* workers = gch->workers();
WorkGang* workers = gch->workers();
assert(workers != NULL, "Need parallel worker threads.");
uint n_workers = workers->active_workers();
@ -4488,7 +4488,7 @@ class CMSParRemarkTask: public CMSParMarkTask {
// workers to be taken from the active workers in the work gang.
CMSParRemarkTask(CMSCollector* collector,
CompactibleFreeListSpace* cms_space,
uint n_workers, FlexibleWorkGang* workers,
uint n_workers, WorkGang* workers,
OopTaskQueueSet* task_queues,
StrongRootsScope* strong_roots_scope):
CMSParMarkTask("Rescan roots and grey objects in parallel",
@ -5061,7 +5061,7 @@ initialize_sequential_subtasks_for_young_gen_rescan(int n_threads) {
// Parallel version of remark
void CMSCollector::do_remark_parallel() {
GenCollectedHeap* gch = GenCollectedHeap::heap();
FlexibleWorkGang* workers = gch->workers();
WorkGang* workers = gch->workers();
assert(workers != NULL, "Need parallel worker threads.");
// Choose to use the number of GC workers most recently set
// into "active_workers".
@ -5236,6 +5236,16 @@ void CMSCollector::do_remark_non_parallel() {
////////////////////////////////////////////////////////
// Parallel Reference Processing Task Proxy Class
////////////////////////////////////////////////////////
class AbstractGangTaskWOopQueues : public AbstractGangTask {
OopTaskQueueSet* _queues;
ParallelTaskTerminator _terminator;
public:
AbstractGangTaskWOopQueues(const char* name, OopTaskQueueSet* queues, uint n_threads) :
AbstractGangTask(name), _queues(queues), _terminator(n_threads, _queues) {}
ParallelTaskTerminator* terminator() { return &_terminator; }
OopTaskQueueSet* queues() { return _queues; }
};
class CMSRefProcTaskProxy: public AbstractGangTaskWOopQueues {
typedef AbstractRefProcTaskExecutor::ProcessTask ProcessTask;
CMSCollector* _collector;
@ -5372,7 +5382,7 @@ void CMSRefProcTaskProxy::do_work_steal(int i,
void CMSRefProcTaskExecutor::execute(ProcessTask& task)
{
GenCollectedHeap* gch = GenCollectedHeap::heap();
FlexibleWorkGang* workers = gch->workers();
WorkGang* workers = gch->workers();
assert(workers != NULL, "Need parallel worker threads.");
CMSRefProcTaskProxy rp_task(task, &_collector,
_collector.ref_processor()->span(),
@ -5385,7 +5395,7 @@ void CMSRefProcTaskExecutor::execute(EnqueueTask& task)
{
GenCollectedHeap* gch = GenCollectedHeap::heap();
FlexibleWorkGang* workers = gch->workers();
WorkGang* workers = gch->workers();
assert(workers != NULL, "Need parallel worker threads.");
CMSRefEnqueueTaskProxy enq_task(task);
workers->run_task(&enq_task);
@ -5419,7 +5429,7 @@ void CMSCollector::refProcessingWork() {
// balance_all_queues() and balance_queues()).
GenCollectedHeap* gch = GenCollectedHeap::heap();
uint active_workers = ParallelGCThreads;
FlexibleWorkGang* workers = gch->workers();
WorkGang* workers = gch->workers();
if (workers != NULL) {
active_workers = workers->active_workers();
// The expectation is that active_workers will have already

View File

@ -803,7 +803,7 @@ public:
void ParNewRefProcTaskExecutor::execute(ProcessTask& task)
{
GenCollectedHeap* gch = GenCollectedHeap::heap();
FlexibleWorkGang* workers = gch->workers();
WorkGang* workers = gch->workers();
assert(workers != NULL, "Need parallel worker threads.");
_state_set.reset(workers->active_workers(), _young_gen.promotion_failed());
ParNewRefProcTaskProxy rp_task(task, _young_gen, _old_gen,
@ -816,7 +816,7 @@ void ParNewRefProcTaskExecutor::execute(ProcessTask& task)
void ParNewRefProcTaskExecutor::execute(EnqueueTask& task)
{
GenCollectedHeap* gch = GenCollectedHeap::heap();
FlexibleWorkGang* workers = gch->workers();
WorkGang* workers = gch->workers();
assert(workers != NULL, "Need parallel worker threads.");
ParNewRefEnqueueTaskProxy enq_task(task);
workers->run_task(&enq_task);
@ -890,7 +890,7 @@ void ParNewGeneration::collect(bool full,
_gc_timer->register_gc_start();
AdaptiveSizePolicy* size_policy = gch->gen_policy()->size_policy();
FlexibleWorkGang* workers = gch->workers();
WorkGang* workers = gch->workers();
assert(workers != NULL, "Need workgang for parallel work");
uint active_workers =
AdaptiveSizePolicy::calc_active_workers(workers->total_workers(),

View File

@ -26,20 +26,45 @@
#include "gc/cms/yieldingWorkgroup.hpp"
#include "utilities/macros.hpp"
// Forward declaration of classes declared here.
class GangWorker;
class WorkData;
YieldingFlexibleGangWorker::YieldingFlexibleGangWorker(YieldingFlexibleWorkGang* gang, int id)
: AbstractGangWorker(gang, id) {}
YieldingFlexibleWorkGang::YieldingFlexibleWorkGang(
const char* name, uint workers, bool are_GC_task_threads) :
FlexibleWorkGang(name, workers, are_GC_task_threads, false),
_yielded_workers(0) {}
AbstractWorkGang(name, workers, are_GC_task_threads, false),
_yielded_workers(0),
_started_workers(0),
_finished_workers(0),
_sequence_number(0),
_task(NULL) {
GangWorker* YieldingFlexibleWorkGang::allocate_worker(uint which) {
YieldingFlexibleGangWorker* new_member =
new YieldingFlexibleGangWorker(this, which);
return (YieldingFlexibleGangWorker*) new_member;
// Other initialization.
_monitor = new Monitor(/* priority */ Mutex::leaf,
/* name */ "WorkGroup monitor",
/* allow_vm_block */ are_GC_task_threads,
Monitor::_safepoint_check_sometimes);
assert(monitor() != NULL, "Failed to allocate monitor");
}
AbstractGangWorker* YieldingFlexibleWorkGang::allocate_worker(uint which) {
return new YieldingFlexibleGangWorker(this, which);
}
void YieldingFlexibleWorkGang::internal_worker_poll(YieldingWorkData* data) const {
assert(data != NULL, "worker data is null");
data->set_task(task());
data->set_sequence_number(sequence_number());
}
void YieldingFlexibleWorkGang::internal_note_start() {
assert(monitor()->owned_by_self(), "note_finish is an internal method");
_started_workers += 1;
}
void YieldingFlexibleWorkGang::internal_note_finish() {
assert(monitor()->owned_by_self(), "note_finish is an internal method");
_finished_workers += 1;
}
// Run a task; returns when the task is done, or the workers yield,
@ -292,37 +317,37 @@ void YieldingFlexibleGangTask::abort() {
///////////////////////////////
void YieldingFlexibleGangWorker::loop() {
int previous_sequence_number = 0;
Monitor* gang_monitor = gang()->monitor();
Monitor* gang_monitor = yf_gang()->monitor();
MutexLockerEx ml(gang_monitor, Mutex::_no_safepoint_check_flag);
WorkData data;
YieldingWorkData data;
int id;
while (true) {
// Check if there is work to do.
gang()->internal_worker_poll(&data);
yf_gang()->internal_worker_poll(&data);
if (data.task() != NULL && data.sequence_number() != previous_sequence_number) {
// There is work to be done.
// First check if we need to become active or if there
// are already the requisite number of workers
if (gang()->started_workers() == yf_gang()->active_workers()) {
if (yf_gang()->started_workers() == yf_gang()->active_workers()) {
// There are already enough workers, we do not need to
// to run; fall through and wait on monitor.
} else {
// We need to pitch in and do the work.
assert(gang()->started_workers() < yf_gang()->active_workers(),
assert(yf_gang()->started_workers() < yf_gang()->active_workers(),
"Unexpected state");
id = gang()->started_workers();
gang()->internal_note_start();
id = yf_gang()->started_workers();
yf_gang()->internal_note_start();
// Now, release the gang mutex and do the work.
{
MutexUnlockerEx mul(gang_monitor, Mutex::_no_safepoint_check_flag);
data.task()->work(id); // This might include yielding
}
// Reacquire monitor and note completion of this worker
gang()->internal_note_finish();
yf_gang()->internal_note_finish();
// Update status of task based on whether all workers have
// finished or some have yielded
assert(data.task() == gang()->task(), "Confused task binding");
if (gang()->finished_workers() == yf_gang()->active_workers()) {
assert(data.task() == yf_gang()->task(), "Confused task binding");
if (yf_gang()->finished_workers() == yf_gang()->active_workers()) {
switch (data.yf_task()->status()) {
case ABORTING: {
data.yf_task()->set_status(ABORTED);
@ -338,7 +363,7 @@ void YieldingFlexibleGangWorker::loop() {
}
gang_monitor->notify_all(); // Notify overseer
} else { // at least one worker is still working or yielded
assert(gang()->finished_workers() < yf_gang()->active_workers(),
assert(yf_gang()->finished_workers() < yf_gang()->active_workers(),
"Counts inconsistent");
switch (data.yf_task()->status()) {
case ACTIVE: {
@ -347,7 +372,7 @@ void YieldingFlexibleGangWorker::loop() {
break;
}
case YIELDING: {
if (gang()->finished_workers() + yf_gang()->yielded_workers()
if (yf_gang()->finished_workers() + yf_gang()->yielded_workers()
== yf_gang()->active_workers()) {
data.yf_task()->set_status(YIELDED);
gang_monitor->notify_all(); // notify overseer

View File

@ -29,6 +29,7 @@
#include "utilities/macros.hpp"
// Forward declarations
class YieldingFlexibleGangTask;
class YieldingFlexibleWorkGang;
// Status of tasks
@ -43,13 +44,32 @@ enum Status {
COMPLETED
};
class YieldingWorkData: public StackObj {
// This would be a struct, but I want accessor methods.
private:
AbstractGangTask* _task;
int _sequence_number;
public:
// Constructor and destructor
YieldingWorkData() : _task(NULL), _sequence_number(0) {}
~YieldingWorkData() {}
// Accessors and modifiers
AbstractGangTask* task() const { return _task; }
void set_task(AbstractGangTask* value) { _task = value; }
int sequence_number() const { return _sequence_number; }
void set_sequence_number(int value) { _sequence_number = value; }
YieldingFlexibleGangTask* yf_task() const {
return (YieldingFlexibleGangTask*)_task;
}
};
// Class YieldingFlexibleGangWorker:
// Several instances of this class run in parallel as workers for a gang.
class YieldingFlexibleGangWorker: public GangWorker {
class YieldingFlexibleGangWorker: public AbstractGangWorker {
public:
// Ctor
YieldingFlexibleGangWorker(AbstractWorkGang* gang, int id) :
GangWorker(gang, id) { }
YieldingFlexibleGangWorker(YieldingFlexibleWorkGang* gang, int id);
public:
YieldingFlexibleWorkGang* yf_gang() const
@ -108,9 +128,6 @@ protected:
friend class YieldingFlexibleWorkGang;
friend class YieldingFlexibleGangWorker;
NOT_PRODUCT(virtual bool is_YieldingFlexibleGang_task() const {
return true;
})
void set_status(Status s) {
_status = s;
@ -160,7 +177,7 @@ public:
// YieldingGangWorkers, and provides infrastructure
// supporting yielding to the "GangOverseer",
// being the thread that orchestrates the WorkGang via run_task().
class YieldingFlexibleWorkGang: public FlexibleWorkGang {
class YieldingFlexibleWorkGang: public AbstractWorkGang {
// Here's the public interface to this class.
public:
// Constructor and destructor.
@ -168,12 +185,10 @@ public:
bool are_GC_task_threads);
YieldingFlexibleGangTask* yielding_task() const {
assert(task() == NULL || task()->is_YieldingFlexibleGang_task(),
"Incorrect cast");
return (YieldingFlexibleGangTask*)task();
return task();
}
// Allocate a worker and return a pointer to it.
GangWorker* allocate_worker(uint which);
AbstractGangWorker* allocate_worker(uint which);
// Run a task; returns when the task is done, or the workers yield,
// or the task is aborted.
@ -216,6 +231,42 @@ public:
private:
friend class YieldingFlexibleGangWorker;
void reset(); // NYI
// The monitor which protects these data,
// and notifies of changes in it.
Monitor* _monitor;
// Accessors for fields
Monitor* monitor() const {
return _monitor;
}
// The number of started workers.
uint _started_workers;
// The number of finished workers.
uint _finished_workers;
uint started_workers() const {
return _started_workers;
}
uint finished_workers() const {
return _finished_workers;
}
// A sequence number for the current task.
int _sequence_number;
int sequence_number() const {
return _sequence_number;
}
YieldingFlexibleGangTask* _task;
YieldingFlexibleGangTask* task() const {
return _task;
}
void internal_worker_poll(YieldingWorkData* data) const;
void internal_note_start();
void internal_note_finish();
};
#endif // SHARE_VM_GC_CMS_YIELDINGWORKGROUP_HPP

View File

@ -629,7 +629,7 @@ ConcurrentMark::ConcurrentMark(G1CollectedHeap* g1h, G1RegionToSpaceMapper* prev
gclog_or_tty->print_cr("CL Sleep Factor %1.4lf", cleanup_sleep_factor());
#endif
_parallel_workers = new FlexibleWorkGang("G1 Marker",
_parallel_workers = new WorkGang("G1 Marker",
_max_parallel_marking_threads, false, true);
if (_parallel_workers == NULL) {
vm_exit_during_initialization("Failed necessary allocation.");

View File

@ -451,7 +451,7 @@ protected:
double* _accum_task_vtime; // Accumulated task vtime
FlexibleWorkGang* _parallel_workers;
WorkGang* _parallel_workers;
ForceOverflowSettings _force_overflow_conc;
ForceOverflowSettings _force_overflow_stw;

View File

@ -1960,7 +1960,7 @@ G1CollectedHeap::G1CollectedHeap(G1CollectorPolicy* policy_) :
_gc_tracer_stw(new (ResourceObj::C_HEAP, mtGC) G1NewTracer()),
_gc_tracer_cm(new (ResourceObj::C_HEAP, mtGC) G1OldTracer()) {
_workers = new FlexibleWorkGang("GC Thread", ParallelGCThreads,
_workers = new WorkGang("GC Thread", ParallelGCThreads,
/* are_GC_task_threads */true,
/* are_ConcurrentGC_threads */false);
_workers->initialize_workers();
@ -5127,12 +5127,12 @@ class G1STWRefProcTaskExecutor: public AbstractRefProcTaskExecutor {
private:
G1CollectedHeap* _g1h;
RefToScanQueueSet* _queues;
FlexibleWorkGang* _workers;
WorkGang* _workers;
uint _active_workers;
public:
G1STWRefProcTaskExecutor(G1CollectedHeap* g1h,
FlexibleWorkGang* workers,
WorkGang* workers,
RefToScanQueueSet *task_queues,
uint n_workers) :
_g1h(g1h),

View File

@ -75,7 +75,7 @@ class G1OldTracer;
class EvacuationFailedInfo;
class nmethod;
class Ticks;
class FlexibleWorkGang;
class WorkGang;
typedef OverflowTaskQueue<StarTask, mtGC> RefToScanQueue;
typedef GenericTaskQueueSet<RefToScanQueue, mtGC> RefToScanQueueSet;
@ -200,7 +200,7 @@ class G1CollectedHeap : public CollectedHeap {
friend class G1CheckCSetFastTableClosure;
private:
FlexibleWorkGang* _workers;
WorkGang* _workers;
static size_t _humongous_object_threshold_in_words;
@ -588,7 +588,7 @@ protected:
void enqueue_discovered_references();
public:
FlexibleWorkGang* workers() const { return _workers; }
WorkGang* workers() const { return _workers; }
G1Allocator* allocator() {
return _allocator;

View File

@ -1582,7 +1582,7 @@ void
G1CollectorPolicy::record_concurrent_mark_cleanup_end() {
_collectionSetChooser->clear();
FlexibleWorkGang* workers = _g1->workers();
WorkGang* workers = _g1->workers();
uint n_workers = workers->active_workers();
uint n_regions = _g1->num_regions();

View File

@ -86,7 +86,7 @@ GenCollectedHeap::GenCollectedHeap(GenCollectorPolicy *policy) :
{
assert(policy != NULL, "Sanity check");
if (UseConcMarkSweepGC) {
_workers = new FlexibleWorkGang("GC Thread", ParallelGCThreads,
_workers = new WorkGang("GC Thread", ParallelGCThreads,
/* are_GC_task_threads */true,
/* are_ConcurrentGC_threads */false);
_workers->initialize_workers();

View File

@ -30,7 +30,7 @@
#include "gc/shared/collectorPolicy.hpp"
#include "gc/shared/generation.hpp"
class FlexibleWorkGang;
class WorkGang;
class StrongRootsScope;
class SubTasksDone;
@ -90,7 +90,7 @@ private:
// In block contents verification, the number of header words to skip
NOT_PRODUCT(static size_t _skip_header_HeapWords;)
FlexibleWorkGang* _workers;
WorkGang* _workers;
protected:
// Helper functions for allocation
@ -124,7 +124,7 @@ protected:
public:
GenCollectedHeap(GenCollectorPolicy *policy);
FlexibleWorkGang* workers() const { return _workers; }
WorkGang* workers() const { return _workers; }
GCStats* gc_stats(Generation* generation) const;

View File

@ -31,55 +31,20 @@
// Definitions of WorkGang methods.
AbstractWorkGang::AbstractWorkGang(const char* name,
bool are_GC_task_threads,
bool are_ConcurrentGC_threads) :
_name(name),
_are_GC_task_threads(are_GC_task_threads),
_are_ConcurrentGC_threads(are_ConcurrentGC_threads) {
assert(!(are_GC_task_threads && are_ConcurrentGC_threads),
"They cannot both be STW GC and Concurrent threads" );
// Other initialization.
_monitor = new Monitor(/* priority */ Mutex::leaf,
/* name */ "WorkGroup monitor",
/* allow_vm_block */ are_GC_task_threads,
Monitor::_safepoint_check_sometimes);
assert(monitor() != NULL, "Failed to allocate monitor");
_task = NULL;
_sequence_number = 0;
_started_workers = 0;
_finished_workers = 0;
}
WorkGang::WorkGang(const char* name,
uint workers,
bool are_GC_task_threads,
bool are_ConcurrentGC_threads) :
AbstractWorkGang(name, are_GC_task_threads, are_ConcurrentGC_threads) {
_total_workers = workers;
}
GangWorker* WorkGang::allocate_worker(uint which) {
GangWorker* new_worker = new GangWorker(this, which);
return new_worker;
}
// The current implementation will exit if the allocation
// of any worker fails. Still, return a boolean so that
// a future implementation can possibly do a partial
// initialization of the workers and report such to the
// caller.
bool WorkGang::initialize_workers() {
bool AbstractWorkGang::initialize_workers() {
if (TraceWorkGang) {
tty->print_cr("Constructing work gang %s with %d threads",
name(),
total_workers());
}
_gang_workers = NEW_C_HEAP_ARRAY(GangWorker*, total_workers(), mtInternal);
if (gang_workers() == NULL) {
_workers = NEW_C_HEAP_ARRAY(AbstractGangWorker*, total_workers(), mtInternal);
if (_workers == NULL) {
vm_exit_out_of_memory(0, OOM_MALLOC_ERROR, "Cannot create GangWorker array.");
return false;
}
@ -90,9 +55,9 @@ bool WorkGang::initialize_workers() {
worker_type = os::pgc_thread;
}
for (uint worker = 0; worker < total_workers(); worker += 1) {
GangWorker* new_worker = allocate_worker(worker);
AbstractGangWorker* new_worker = allocate_worker(worker);
assert(new_worker != NULL, "Failed to allocate GangWorker");
_gang_workers[worker] = new_worker;
_workers[worker] = new_worker;
if (new_worker == NULL || !os::create_thread(new_worker, worker_type)) {
vm_exit_out_of_memory(0, OOM_MALLOC_ERROR,
"Cannot create worker GC thread. Out of system resources.");
@ -105,18 +70,57 @@ bool WorkGang::initialize_workers() {
return true;
}
GangWorker* AbstractWorkGang::gang_worker(uint i) const {
AbstractGangWorker* AbstractWorkGang::worker(uint i) const {
// Array index bounds checking.
GangWorker* result = NULL;
assert(gang_workers() != NULL, "No workers for indexing");
AbstractGangWorker* result = NULL;
assert(_workers != NULL, "No workers for indexing");
assert(i < total_workers(), "Worker index out of bounds");
result = _gang_workers[i];
result = _workers[i];
assert(result != NULL, "Indexing to null worker");
return result;
}
void AbstractWorkGang::print_worker_threads_on(outputStream* st) const {
uint workers = total_workers();
for (uint i = 0; i < workers; i++) {
worker(i)->print_on(st);
st->cr();
}
}
void AbstractWorkGang::threads_do(ThreadClosure* tc) const {
assert(tc != NULL, "Null ThreadClosure");
uint workers = total_workers();
for (uint i = 0; i < workers; i++) {
tc->do_thread(worker(i));
}
}
WorkGang::WorkGang(const char* name,
uint workers,
bool are_GC_task_threads,
bool are_ConcurrentGC_threads) :
AbstractWorkGang(name, workers, are_GC_task_threads, are_ConcurrentGC_threads),
_started_workers(0),
_finished_workers(0),
_sequence_number(0),
_task(NULL) {
// Other initialization.
_monitor = new Monitor(/* priority */ Mutex::leaf,
/* name */ "WorkGroup monitor",
/* allow_vm_block */ are_GC_task_threads,
Monitor::_safepoint_check_sometimes);
assert(monitor() != NULL, "Failed to allocate monitor");
}
AbstractGangWorker* WorkGang::allocate_worker(uint worker_id) {
return new GangWorker(this, worker_id);
}
void WorkGang::run_task(AbstractGangTask* task) {
run_task(task, total_workers());
run_task(task, (uint)active_workers());
}
void WorkGang::run_task(AbstractGangTask* task, uint no_of_parallel_workers) {
@ -154,61 +158,37 @@ void WorkGang::run_task(AbstractGangTask* task, uint no_of_parallel_workers) {
}
}
void FlexibleWorkGang::run_task(AbstractGangTask* task) {
// If active_workers() is passed, _finished_workers
// must only be incremented for workers that find non_null
// work (as opposed to all those that just check that the
// task is not null).
WorkGang::run_task(task, (uint) active_workers());
}
void AbstractWorkGang::internal_worker_poll(WorkData* data) const {
void WorkGang::internal_worker_poll(WorkData* data) const {
assert(monitor()->owned_by_self(), "worker_poll is an internal method");
assert(data != NULL, "worker data is null");
data->set_task(task());
data->set_sequence_number(sequence_number());
}
void AbstractWorkGang::internal_note_start() {
void WorkGang::internal_note_start() {
assert(monitor()->owned_by_self(), "note_finish is an internal method");
_started_workers += 1;
}
void AbstractWorkGang::internal_note_finish() {
void WorkGang::internal_note_finish() {
assert(monitor()->owned_by_self(), "note_finish is an internal method");
_finished_workers += 1;
}
void AbstractWorkGang::print_worker_threads_on(outputStream* st) const {
uint num_thr = total_workers();
for (uint i = 0; i < num_thr; i++) {
gang_worker(i)->print_on(st);
st->cr();
}
}
void AbstractWorkGang::threads_do(ThreadClosure* tc) const {
assert(tc != NULL, "Null ThreadClosure");
uint num_thr = total_workers();
for (uint i = 0; i < num_thr; i++) {
tc->do_thread(gang_worker(i));
}
}
// GangWorker methods.
GangWorker::GangWorker(AbstractWorkGang* gang, uint id) {
AbstractGangWorker::AbstractGangWorker(AbstractWorkGang* gang, uint id) {
_gang = gang;
set_id(id);
set_name("%s#%d", gang->name(), id);
}
void GangWorker::run() {
void AbstractGangWorker::run() {
initialize();
loop();
}
void GangWorker::initialize() {
void AbstractGangWorker::initialize() {
this->initialize_thread_local_storage();
this->record_stack_base_and_size();
this->initialize_named_thread();
@ -224,6 +204,20 @@ void GangWorker::initialize() {
" of a work gang");
}
bool AbstractGangWorker::is_GC_task_thread() const {
return gang()->are_GC_task_threads();
}
bool AbstractGangWorker::is_ConcurrentGC_thread() const {
return gang()->are_ConcurrentGC_threads();
}
void AbstractGangWorker::print_on(outputStream* st) const {
st->print("\"%s\" ", name());
Thread::print_on(st);
st->cr();
}
void GangWorker::loop() {
int previous_sequence_number = 0;
Monitor* gang_monitor = gang()->monitor();
@ -300,37 +294,6 @@ void GangWorker::loop() {
}
}
bool GangWorker::is_GC_task_thread() const {
return gang()->are_GC_task_threads();
}
bool GangWorker::is_ConcurrentGC_thread() const {
return gang()->are_ConcurrentGC_threads();
}
void GangWorker::print_on(outputStream* st) const {
st->print("\"%s\" ", name());
Thread::print_on(st);
st->cr();
}
// Printing methods
const char* AbstractWorkGang::name() const {
return _name;
}
#ifndef PRODUCT
const char* AbstractGangTask::name() const {
return _name;
}
#endif /* PRODUCT */
// FlexibleWorkGang
// *** WorkGangBarrierSync
WorkGangBarrierSync::WorkGangBarrierSync()

View File

@ -25,112 +25,138 @@
#ifndef SHARE_VM_GC_SHARED_WORKGROUP_HPP
#define SHARE_VM_GC_SHARED_WORKGROUP_HPP
#include "gc/shared/taskqueue.hpp"
#include "runtime/thread.inline.hpp"
// Task class hierarchy:
// AbstractGangTask
// AbstractGangTaskWOopQueues
//
// Gang/Group class hierarchy:
// AbstractWorkGang
// WorkGang
// FlexibleWorkGang
// YieldingFlexibleWorkGang (defined in another file)
//
// Worker class hierarchy:
// GangWorker (subclass of WorkerThread)
// AbstractGangWorker (subclass of WorkerThread)
// GangWorker
// YieldingFlexibleGangWorker (defined in another file)
// Forward declarations of classes defined here
class WorkGang;
class AbstractGangWorker;
class GangWorker;
class YieldingFlexibleGangWorker;
class YieldingFlexibleGangTask;
class WorkData;
class AbstractWorkGang;
// An abstract task to be worked on by a gang.
// You subclass this to supply your own work() method
class AbstractGangTask VALUE_OBJ_CLASS_SPEC {
public:
const char* _name;
public:
AbstractGangTask(const char* name) : _name(name) {}
// The abstract work method.
// The argument tells you which member of the gang you are.
virtual void work(uint worker_id) = 0;
// Debugging accessor for the name.
const char* name() const PRODUCT_RETURN_(return NULL;);
int counter() { return _counter; }
void set_counter(int value) { _counter = value; }
int *address_of_counter() { return &_counter; }
// RTTI
NOT_PRODUCT(virtual bool is_YieldingFlexibleGang_task() const {
return false;
})
private:
NOT_PRODUCT(const char* _name;)
// ??? Should a task have a priority associated with it?
// ??? Or can the run method adjust priority as needed?
int _counter;
protected:
// Constructor and desctructor: only construct subclasses.
AbstractGangTask(const char* name)
{
NOT_PRODUCT(_name = name);
_counter = 0;
}
~AbstractGangTask() { }
public:
};
class AbstractGangTaskWOopQueues : public AbstractGangTask {
OopTaskQueueSet* _queues;
ParallelTaskTerminator _terminator;
public:
AbstractGangTaskWOopQueues(const char* name, OopTaskQueueSet* queues, uint n_threads) :
AbstractGangTask(name), _queues(queues), _terminator(n_threads, _queues) {}
ParallelTaskTerminator* terminator() { return &_terminator; }
OopTaskQueueSet* queues() { return _queues; }
const char* name() const { return _name; }
};
// Class AbstractWorkGang:
// An abstract class representing a gang of workers.
// You subclass this to supply an implementation of run_task().
class AbstractWorkGang: public CHeapObj<mtInternal> {
protected:
// Work gangs are never deleted, so no need to cleanup.
~AbstractWorkGang() { ShouldNotReachHere(); }
public:
// Constructor.
AbstractWorkGang(const char* name, bool are_GC_task_threads,
bool are_ConcurrentGC_threads);
// Run a task, returns when the task is done (or terminated).
virtual void run_task(AbstractGangTask* task) = 0;
// Return true if more workers should be applied to the task.
virtual bool needs_more_workers() const { return true; }
public:
// Debugging.
const char* name() const;
protected:
// The work gang is the collection of workers to execute tasks.
// The number of workers run for a task is "_active_workers"
// while "_total_workers" is the number of available of workers.
class AbstractWorkGang : public CHeapObj<mtInternal> {
protected:
// The array of worker threads for this gang.
AbstractGangWorker** _workers;
// The count of the number of workers in the gang.
uint _total_workers;
// The currently active workers in this gang.
uint _active_workers;
// Printing support.
const char* _name;
private:
// Initialize only instance data.
const bool _are_GC_task_threads;
const bool _are_ConcurrentGC_threads;
// Printing support.
const char* _name;
public:
AbstractWorkGang(const char* name, uint workers, bool are_GC_task_threads, bool are_ConcurrentGC_threads) :
_name(name),
_total_workers(workers),
_active_workers(UseDynamicNumberOfGCThreads ? 1U : workers),
_are_GC_task_threads(are_GC_task_threads),
_are_ConcurrentGC_threads(are_ConcurrentGC_threads)
{ }
virtual AbstractGangWorker* allocate_worker(uint which) = 0;
// Initialize workers in the gang. Return true if initialization succeeded.
bool initialize_workers();
bool are_GC_task_threads() const { return _are_GC_task_threads; }
bool are_ConcurrentGC_threads() const { return _are_ConcurrentGC_threads; }
uint total_workers() const { return _total_workers; }
virtual uint active_workers() const {
assert(_active_workers <= _total_workers,
err_msg("_active_workers: %u > _total_workers: %u", _active_workers, _total_workers));
assert(UseDynamicNumberOfGCThreads || _active_workers == _total_workers,
"Unless dynamic should use total workers");
return _active_workers;
}
void set_active_workers(uint v) {
assert(v <= _total_workers,
"Trying to set more workers active than there are");
_active_workers = MIN2(v, _total_workers);
assert(v != 0, "Trying to set active workers to 0");
_active_workers = MAX2(1U, _active_workers);
assert(UseDynamicNumberOfGCThreads || _active_workers == _total_workers,
"Unless dynamic should use total workers");
}
// Return the Ith worker.
AbstractGangWorker* worker(uint i) const;
void threads_do(ThreadClosure* tc) const;
// Debugging.
const char* name() const { return _name; }
// Printing
void print_worker_threads_on(outputStream *st) const;
void print_worker_threads() const {
print_worker_threads_on(tty);
}
};
// An class representing a gang of workers.
class WorkGang: public AbstractWorkGang {
private:
// Never deleted.
~WorkGang();
public:
WorkGang(const char* name,
uint workers,
bool are_GC_task_threads,
bool are_ConcurrentGC_threads);
// Run a task, returns when the task is done.
virtual void run_task(AbstractGangTask* task);
void run_task(AbstractGangTask* task, uint no_of_parallel_workers);
// Return true if more workers should be applied to the task.
virtual bool needs_more_workers() const {
return _started_workers < _active_workers;
}
protected:
// The monitor which protects these data,
// and notifies of changes in it.
Monitor* _monitor;
// The count of the number of workers in the gang.
uint _total_workers;
// The array of worker threads for this gang.
// This is only needed for cleaning up.
GangWorker** _gang_workers;
// The task for this gang.
AbstractGangTask* _task;
// A sequence number for the current task.
@ -139,20 +165,14 @@ protected:
uint _started_workers;
// The number of finished workers.
uint _finished_workers;
public:
virtual AbstractGangWorker* allocate_worker(uint which);
// Accessors for fields
Monitor* monitor() const {
return _monitor;
}
uint total_workers() const {
return _total_workers;
}
virtual uint active_workers() const {
return _total_workers;
}
GangWorker** gang_workers() const {
return _gang_workers;
}
AbstractGangTask* task() const {
return _task;
}
@ -165,12 +185,6 @@ public:
uint finished_workers() const {
return _finished_workers;
}
bool are_GC_task_threads() const {
return _are_GC_task_threads;
}
bool are_ConcurrentGC_threads() const {
return _are_ConcurrentGC_threads;
}
// Predicates.
bool is_idle() const {
return (task() == NULL);
@ -178,17 +192,8 @@ public:
// Return the Ith gang worker.
GangWorker* gang_worker(uint i) const;
void threads_do(ThreadClosure* tc) const;
// Printing
void print_worker_threads_on(outputStream *st) const;
void print_worker_threads() const {
print_worker_threads_on(tty);
}
protected:
friend class GangWorker;
friend class YieldingFlexibleGangWorker;
// Note activation and deactivation of workers.
// These methods should only be called with the mutex held.
void internal_worker_poll(WorkData* data) const;
@ -213,35 +218,13 @@ public:
void set_task(AbstractGangTask* value) { _task = value; }
int sequence_number() const { return _sequence_number; }
void set_sequence_number(int value) { _sequence_number = value; }
YieldingFlexibleGangTask* yf_task() const {
return (YieldingFlexibleGangTask*)_task;
}
};
// Class WorkGang:
class WorkGang: public AbstractWorkGang {
public:
// Constructor
WorkGang(const char* name, uint workers,
bool are_GC_task_threads, bool are_ConcurrentGC_threads);
// Run a task, returns when the task is done (or terminated).
virtual void run_task(AbstractGangTask* task);
void run_task(AbstractGangTask* task, uint no_of_parallel_workers);
// Allocate a worker and return a pointer to it.
virtual GangWorker* allocate_worker(uint which);
// Initialize workers in the gang. Return true if initialization
// succeeded. The type of the worker can be overridden in a derived
// class with the appropriate implementation of allocate_worker().
bool initialize_workers();
};
// Class GangWorker:
// Several instances of this class run in parallel as workers for a gang.
class GangWorker: public WorkerThread {
class AbstractGangWorker: public WorkerThread {
public:
// Constructors and destructor.
GangWorker(AbstractWorkGang* gang, uint id);
AbstractGangWorker(AbstractWorkGang* gang, uint id);
// The only real method: run a task for the gang.
virtual void run();
@ -251,14 +234,25 @@ public:
// Printing
void print_on(outputStream* st) const;
virtual void print() const { print_on(tty); }
protected:
AbstractWorkGang* _gang;
virtual void initialize();
virtual void loop() = 0;
AbstractWorkGang* gang() const { return _gang; }
};
class GangWorker: public AbstractGangWorker {
public:
GangWorker(WorkGang* gang, uint id) : AbstractGangWorker(gang, id) {}
protected:
virtual void loop();
public:
AbstractWorkGang* gang() const { return _gang; }
private:
WorkGang* gang() const { return (WorkGang*)_gang; }
};
// Dynamic number of worker threads
@ -282,47 +276,6 @@ public:
// are not needed for this tasks (i.e., "_active_workers" have
// been started before it, continue to wait for work.
class FlexibleWorkGang: public WorkGang {
// The currently active workers in this gang.
// This is a number that is dynamically adjusted
// and checked in the run_task() method at each invocation.
// As described above _active_workers determines the number
// of threads started on a task. It must also be used to
// determine completion.
protected:
uint _active_workers;
public:
// Constructor and destructor.
FlexibleWorkGang(const char* name, uint workers,
bool are_GC_task_threads,
bool are_ConcurrentGC_threads) :
WorkGang(name, workers, are_GC_task_threads, are_ConcurrentGC_threads),
_active_workers(UseDynamicNumberOfGCThreads ? 1U : workers) {}
// Accessors for fields.
virtual uint active_workers() const {
assert(_active_workers <= _total_workers,
err_msg("_active_workers: %u > _total_workers: %u", _active_workers, _total_workers));
assert(UseDynamicNumberOfGCThreads || _active_workers == _total_workers,
"Unless dynamic should use total workers");
return _active_workers;
}
void set_active_workers(uint v) {
assert(v <= _total_workers,
"Trying to set more workers active than there are");
_active_workers = MIN2(v, _total_workers);
assert(v != 0, "Trying to set active workers to 0");
_active_workers = MAX2(1U, _active_workers);
assert(UseDynamicNumberOfGCThreads || _active_workers == _total_workers,
"Unless dynamic should use total workers");
}
virtual void run_task(AbstractGangTask* task);
virtual bool needs_more_workers() const {
return _started_workers < _active_workers;
}
};
// A class that acts as a synchronisation barrier. Workers enter
// the barrier and must wait until all other workers have entered
// before any of them may leave.