8254758: Change G1ServiceThread to be task based

Reviewed-by: ayang, iwalulya, tschatzl
This commit is contained in:
Stefan Johansson 2020-11-03 11:07:35 +00:00
parent 9a0cf58721
commit 1d0bd50624
3 changed files with 565 additions and 98 deletions
src/hotspot/share/gc/g1
test/hotspot/gtest/gc/g1

@ -36,92 +36,69 @@
#include "runtime/mutexLocker.hpp"
#include "runtime/os.hpp"
G1ServiceThread::G1ServiceThread() :
ConcurrentGCThread(),
_monitor(Mutex::nonleaf,
"G1ServiceThread monitor",
true,
Monitor::_safepoint_check_never),
_last_periodic_gc_attempt_s(os::elapsedTime()),
_vtime_accum(0) {
set_name("G1 Service");
create_and_start();
G1SentinelTask::G1SentinelTask() : G1ServiceTask("Sentinel Task") {
set_time(max_jlong);
set_next(this);
}
void G1ServiceThread::sleep_before_next_cycle() {
MonitorLocker ml(&_monitor, Mutex::_no_safepoint_check_flag);
if (!should_terminate()) {
uintx waitms = G1ConcRefinementServiceIntervalMillis;
ml.wait(waitms);
}
void G1SentinelTask::execute() {
guarantee(false, "Sentinel service task should never be executed.");
}
bool G1ServiceThread::should_start_periodic_gc() {
G1CollectedHeap* g1h = G1CollectedHeap::heap();
// If we are currently in a concurrent mark we are going to uncommit memory soon.
if (g1h->concurrent_mark()->cm_thread()->in_progress()) {
log_debug(gc, periodic)("Concurrent cycle in progress. Skipping.");
return false;
// Task handling periodic GCs
class G1PeriodicGCTask : public G1ServiceTask {
bool should_start_periodic_gc() {
G1CollectedHeap* g1h = G1CollectedHeap::heap();
// If we are currently in a concurrent mark we are going to uncommit memory soon.
if (g1h->concurrent_mark()->cm_thread()->in_progress()) {
log_debug(gc, periodic)("Concurrent cycle in progress. Skipping.");
return false;
}
// Check if enough time has passed since the last GC.
uintx time_since_last_gc = (uintx)g1h->time_since_last_collection().milliseconds();
if ((time_since_last_gc < G1PeriodicGCInterval)) {
log_debug(gc, periodic)("Last GC occurred " UINTX_FORMAT "ms before which is below threshold " UINTX_FORMAT "ms. Skipping.",
time_since_last_gc, G1PeriodicGCInterval);
return false;
}
// Check if load is lower than max.
double recent_load;
if ((G1PeriodicGCSystemLoadThreshold > 0.0f) &&
(os::loadavg(&recent_load, 1) == -1 || recent_load > G1PeriodicGCSystemLoadThreshold)) {
log_debug(gc, periodic)("Load %1.2f is higher than threshold %1.2f. Skipping.",
recent_load, G1PeriodicGCSystemLoadThreshold);
return false;
}
return true;
}
// Check if enough time has passed since the last GC.
uintx time_since_last_gc = (uintx)g1h->time_since_last_collection().milliseconds();
if ((time_since_last_gc < G1PeriodicGCInterval)) {
log_debug(gc, periodic)("Last GC occurred " UINTX_FORMAT "ms before which is below threshold " UINTX_FORMAT "ms. Skipping.",
time_since_last_gc, G1PeriodicGCInterval);
return false;
}
void check_for_periodic_gc(){
// If disabled, just return.
if (G1PeriodicGCInterval == 0) {
return;
}
// Check if load is lower than max.
double recent_load;
if ((G1PeriodicGCSystemLoadThreshold > 0.0f) &&
(os::loadavg(&recent_load, 1) == -1 || recent_load > G1PeriodicGCSystemLoadThreshold)) {
log_debug(gc, periodic)("Load %1.2f is higher than threshold %1.2f. Skipping.",
recent_load, G1PeriodicGCSystemLoadThreshold);
return false;
}
return true;
}
void G1ServiceThread::check_for_periodic_gc(){
// If disabled, just return.
if (G1PeriodicGCInterval == 0) {
return;
}
if ((os::elapsedTime() - _last_periodic_gc_attempt_s) > (G1PeriodicGCInterval / 1000.0)) {
log_debug(gc, periodic)("Checking for periodic GC.");
if (should_start_periodic_gc()) {
if (!G1CollectedHeap::heap()->try_collect(GCCause::_g1_periodic_collection)) {
log_debug(gc, periodic)("GC request denied. Skipping.");
}
}
_last_periodic_gc_attempt_s = os::elapsedTime();
}
}
void G1ServiceThread::run_service() {
double vtime_start = os::elapsedVTime();
while (!should_terminate()) {
sample_young_list_rs_length();
if (os::supports_vtime()) {
_vtime_accum = (os::elapsedVTime() - vtime_start);
} else {
_vtime_accum = 0.0;
}
public:
G1PeriodicGCTask(const char* name) : G1ServiceTask(name) { }
virtual void execute() {
check_for_periodic_gc();
sleep_before_next_cycle();
// G1PeriodicGCInterval is a manageable flag and can be updated
// during runtime. If no value is set, wait a second and run it
// again to see if the value has been updated. Otherwise use the
// real value provided.
schedule(G1PeriodicGCInterval == 0 ? 1000 : G1PeriodicGCInterval);
}
}
void G1ServiceThread::stop_service() {
MutexLocker x(&_monitor, Mutex::_no_safepoint_check_flag);
_monitor.notify();
}
};
class G1YoungRemSetSamplingClosure : public HeapRegionClosure {
SuspendibleThreadSetJoiner* _sts;
@ -155,19 +132,265 @@ public:
size_t sampled_rs_length() const { return _sampled_rs_length; }
};
void G1ServiceThread::sample_young_list_rs_length() {
SuspendibleThreadSetJoiner sts;
G1CollectedHeap* g1h = G1CollectedHeap::heap();
G1Policy* policy = g1h->policy();
// Task handling young gen remembered set sampling.
class G1RemSetSamplingTask : public G1ServiceTask {
// Sample the current length of remembered sets for young.
//
// At the end of the GC G1 determines the length of the young gen based on
// how much time the next GC can take, and when the next GC may occur
// according to the MMU.
//
// The assumption is that a significant part of the GC is spent on scanning
// the remembered sets (and many other components), so this thread constantly
// reevaluates the prediction for the remembered set scanning costs, and potentially
// G1Policy resizes the young gen. This may do a premature GC or even
// increase the young gen size to keep pause time length goal.
void sample_young_list_rs_length(){
SuspendibleThreadSetJoiner sts;
G1CollectedHeap* g1h = G1CollectedHeap::heap();
G1Policy* policy = g1h->policy();
if (policy->use_adaptive_young_list_length()) {
G1YoungRemSetSamplingClosure cl(&sts);
if (policy->use_adaptive_young_list_length()) {
G1YoungRemSetSamplingClosure cl(&sts);
G1CollectionSet* g1cs = g1h->collection_set();
g1cs->iterate(&cl);
G1CollectionSet* g1cs = g1h->collection_set();
g1cs->iterate(&cl);
if (cl.is_complete()) {
policy->revise_young_list_target_length_if_necessary(cl.sampled_rs_length());
if (cl.is_complete()) {
policy->revise_young_list_target_length_if_necessary(cl.sampled_rs_length());
}
}
}
public:
G1RemSetSamplingTask(const char* name) : G1ServiceTask(name) { }
virtual void execute() {
sample_young_list_rs_length();
schedule(G1ConcRefinementServiceIntervalMillis);
}
};
G1ServiceThread::G1ServiceThread() :
ConcurrentGCThread(),
_monitor(Mutex::nonleaf,
"G1ServiceThread monitor",
true,
Monitor::_safepoint_check_never),
_task_queue(),
_vtime_accum(0) {
set_name("G1 Service");
create_and_start();
}
void G1ServiceThread::register_task(G1ServiceTask* task, jlong delay) {
guarantee(!task->is_registered(), "Task already registered");
guarantee(task->next() == NULL, "Task already in queue");
log_debug(gc, task)("G1 Service Thread (%s) (register)", task->name());
// Associate the task with the service thread.
task->set_service_thread(this);
// Schedule the task to run after the given delay.
schedule_task(task, delay);
// Notify the service thread that there is a new task, thread might
// be waiting and the newly added task might be first in the list.
MonitorLocker ml(&_monitor, Mutex::_no_safepoint_check_flag);
ml.notify();
}
void G1ServiceThread::schedule_task(G1ServiceTask* task, jlong delay_ms) {
guarantee(task->is_registered(), "Must be registered before scheduled");
guarantee(task->next() == NULL, "Task already in queue");
// Schedule task by setting the task time and adding it to queue.
jlong delay = TimeHelper::millis_to_counter(delay_ms);
task->set_time(os::elapsed_counter() + delay);
MutexLocker ml(&_monitor, Mutex::_no_safepoint_check_flag);
_task_queue.add_ordered(task);
log_trace(gc, task)("G1 Service Thread (%s) (schedule) @%1.3fs",
task->name(), TimeHelper::counter_to_seconds(task->time()));
}
int64_t G1ServiceThread::time_to_next_task_ms() {
assert(_monitor.owned_by_self(), "Must be owner of lock");
assert(!_task_queue.is_empty(), "Should not be called for empty list");
jlong time_diff = _task_queue.peek()->time() - os::elapsed_counter();
if (time_diff < 0) {
// Run without sleeping.
return 0;
}
// Return sleep time in milliseconds.
return (int64_t) TimeHelper::counter_to_millis(time_diff);
}
void G1ServiceThread::sleep_before_next_cycle() {
if (should_terminate()) {
return;
}
MonitorLocker ml(&_monitor, Mutex::_no_safepoint_check_flag);
if (_task_queue.is_empty()) {
// Sleep until new task is registered if no tasks available.
log_trace(gc, task)("G1 Service Thread (wait for new tasks)");
ml.wait(0);
} else {
int64_t sleep_ms = time_to_next_task_ms();
if (sleep_ms > 0) {
log_trace(gc, task)("G1 Service Thread (wait) %1.3fs", sleep_ms / 1000.0);
ml.wait(sleep_ms);
}
}
}
G1ServiceTask* G1ServiceThread::pop_due_task() {
MutexLocker ml(&_monitor, Mutex::_no_safepoint_check_flag);
if (_task_queue.is_empty() || time_to_next_task_ms() != 0) {
return NULL;
}
return _task_queue.pop();
}
void G1ServiceThread::run_task(G1ServiceTask* task) {
double start = os::elapsedTime();
double vstart = os::elapsedVTime();
log_debug(gc, task, start)("G1 Service Thread (%s) (run)", task->name());
task->execute();
double duration = os::elapsedTime() - start;
double vduration = os::elapsedVTime() - vstart;
log_debug(gc, task)("G1 Service Thread (%s) (run) %1.3fms (cpu: %1.3fms)",
task->name(), duration * MILLIUNITS, vduration * MILLIUNITS);
}
void G1ServiceThread::run_service() {
double vtime_start = os::elapsedVTime();
// Setup the tasks handeled by the service thread and
// add them to the task list.
G1PeriodicGCTask gc_task("Periodic GC Task");
register_task(&gc_task);
G1RemSetSamplingTask remset_task("Remembered Set Sampling Task");
register_task(&remset_task);
while (!should_terminate()) {
G1ServiceTask* task = pop_due_task();
if (task != NULL) {
run_task(task);
}
if (os::supports_vtime()) {
_vtime_accum = (os::elapsedVTime() - vtime_start);
} else {
_vtime_accum = 0.0;
}
sleep_before_next_cycle();
}
}
void G1ServiceThread::stop_service() {
MonitorLocker ml(&_monitor, Mutex::_no_safepoint_check_flag);
ml.notify();
}
G1ServiceTask::G1ServiceTask(const char* name) :
_time(),
_name(name),
_next(NULL),
_service_thread(NULL) { }
void G1ServiceTask::set_service_thread(G1ServiceThread* thread) {
_service_thread = thread;
}
bool G1ServiceTask::is_registered() {
return _service_thread != NULL;
}
void G1ServiceTask::schedule(jlong delay_ms) {
_service_thread->schedule_task(this, delay_ms);
}
const char* G1ServiceTask::name() {
return _name;
}
void G1ServiceTask::set_time(jlong time) {
assert(_next == NULL, "Not allowed to update time while in queue");
_time = time;
}
jlong G1ServiceTask::time() {
return _time;
}
void G1ServiceTask::set_next(G1ServiceTask* next) {
_next = next;
}
G1ServiceTask* G1ServiceTask::next() {
return _next;
}
G1ServiceTaskQueue::G1ServiceTaskQueue() : _sentinel() { }
G1ServiceTask* G1ServiceTaskQueue::pop() {
verify_task_queue();
G1ServiceTask* task = _sentinel.next();
_sentinel.set_next(task->next());
task->set_next(NULL);
return task;
}
G1ServiceTask* G1ServiceTaskQueue::peek() {
verify_task_queue();
return _sentinel.next();
}
bool G1ServiceTaskQueue::is_empty() {
return &_sentinel == _sentinel.next();
}
void G1ServiceTaskQueue::add_ordered(G1ServiceTask* task) {
assert(task != NULL, "not a valid task");
assert(task->next() == NULL, "invariant");
assert(task->time() != max_jlong, "invalid time for task");
G1ServiceTask* current = &_sentinel;
while (task->time() >= current->next()->time()) {
assert(task != current, "Task should only be added once.");
current = current->next();
}
// Update the links.
task->set_next(current->next());
current->set_next(task);
verify_task_queue();
}
#ifdef ASSERT
void G1ServiceTaskQueue::verify_task_queue() {
G1ServiceTask* cur = _sentinel.next();
assert(cur != &_sentinel, "Should never try to verify empty queue");
while (cur != &_sentinel) {
G1ServiceTask* next = cur->next();
assert(cur->time() <= next->time(),
"Tasks out of order, prev: %s (%1.3fs), next: %s (%1.3fs)",
cur->name(), TimeHelper::counter_to_seconds(cur->time()), next->name(), TimeHelper::counter_to_seconds(next->time()));
assert(cur != next, "Invariant");
cur = next;
}
}
#endif

@ -28,43 +28,104 @@
#include "gc/shared/concurrentGCThread.hpp"
#include "runtime/mutex.hpp"
class G1ServiceTaskQueue;
class G1ServiceThread;
class G1ServiceTask : public CHeapObj<mtGC> {
friend class G1ServiceTaskQueue;
friend class G1ServiceThread;
// The next absolute time this task should be executed.
jlong _time;
// Name of the task.
const char* _name;
// Next task in the task queue.
G1ServiceTask* _next;
// The service thread this task is registered with.
G1ServiceThread* _service_thread;
void set_service_thread(G1ServiceThread* thread);
bool is_registered();
public:
G1ServiceTask(const char* name);
jlong time();
const char* name();
G1ServiceTask* next();
// Do the actual work for the task. To get added back to the
// execution queue a task can call schedule(delay_ms).
virtual void execute() = 0;
protected:
// Schedule the task on the associated service thread
// using the provided delay in milliseconds.
void schedule(jlong delay_ms);
// These setters are protected for use by testing and the
// sentinel task only.
void set_time(jlong time);
void set_next(G1ServiceTask* next);
};
class G1SentinelTask : public G1ServiceTask {
public:
G1SentinelTask();
virtual void execute();
};
class G1ServiceTaskQueue {
// The sentinel task is the entry point of this priority queue holding the
// service tasks. The queue is ordered by the time the tasks are scheduled
// to run. To simplify list management the sentinel task has its time set
// to max_jlong, guaranteeing it to be the last task in the queue.
G1SentinelTask _sentinel;
// Verify that the queue is ordered.
void verify_task_queue() NOT_DEBUG_RETURN;
public:
G1ServiceTaskQueue();
G1ServiceTask* pop();
G1ServiceTask* peek();
void add_ordered(G1ServiceTask* task);
bool is_empty();
};
// The G1ServiceThread is used to periodically do a number of different tasks:
// - re-assess the validity of the prediction for the
// remembered set lengths of the young generation.
// - check if a periodic GC should be scheduled.
class G1ServiceThread: public ConcurrentGCThread {
private:
friend class G1ServiceTask;
// The monitor is used to ensure thread safety for the task queue
// and allow other threads to signal the service thread to wake up.
Monitor _monitor;
double _last_periodic_gc_attempt_s;
G1ServiceTaskQueue _task_queue;
double _vtime_accum; // Accumulated virtual time.
// Sample the current length of remembered sets for young.
//
// At the end of the GC G1 determines the length of the young gen based on
// how much time the next GC can take, and when the next GC may occur
// according to the MMU.
//
// The assumption is that a significant part of the GC is spent on scanning
// the remembered sets (and many other components), so this thread constantly
// reevaluates the prediction for the remembered set scanning costs, and potentially
// G1Policy resizes the young gen. This may do a premature GC or even
// increase the young gen size to keep pause time length goal.
void sample_young_list_rs_length();
void run_service();
void check_for_periodic_gc();
void stop_service();
// Returns the time in milliseconds until the next task is due.
// Used both to determine if there are tasks ready to run and
// how long to sleep when nothing is ready.
int64_t time_to_next_task_ms();
void sleep_before_next_cycle();
bool should_start_periodic_gc();
G1ServiceTask* pop_due_task();
void run_task(G1ServiceTask* task);
// Schedule a registered task to run after the given delay.
void schedule_task(G1ServiceTask* task, jlong delay);
public:
G1ServiceThread();
double vtime_accum() { return _vtime_accum; }
// Register a task with the service thread and schedule it. If
// no delay is specified the task is scheduled to run directly.
void register_task(G1ServiceTask* task, jlong delay = 0);
};
#endif // SHARE_GC_G1_G1SERVICETHREAD_HPP

@ -0,0 +1,183 @@
/*
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*
*/
#include "precompiled.hpp"
#include "gc/g1/g1ServiceThread.hpp"
#include "runtime/interfaceSupport.inline.hpp"
#include "runtime/os.hpp"
#include "utilities/autoRestore.hpp"
#include "unittest.hpp"
class CheckTask : public G1ServiceTask {
int _execution_count;
bool _reschedule;
public:
CheckTask(const char* name) :
G1ServiceTask(name),
_execution_count(0),
_reschedule(true) { }
virtual void execute() {
_execution_count++;
if (_reschedule) {
schedule(100);
}
}
int execution_count() { return _execution_count;}
void set_reschedule(bool reschedule) { _reschedule = reschedule; }
};
static void stop_service_thread(G1ServiceThread* thread) {
ThreadInVMfromNative tvn(JavaThread::current());
thread->stop();
}
// Test that a task that is added during runtime gets run.
TEST_VM(G1ServiceThread, test_add) {
// Create thread and let it start.
G1ServiceThread* st = new G1ServiceThread();
os::naked_short_sleep(500);
CheckTask ct("AddAndRun");
st->register_task(&ct);
// Give CheckTask time to run.
os::naked_short_sleep(500);
stop_service_thread(st);
ASSERT_GT(ct.execution_count(), 0);
}
// Test that a task that is added while the service thread is
// waiting gets run in a timely manner.
TEST_VM(G1ServiceThread, test_add_while_waiting) {
// Make sure default tasks use long intervals so that the service thread
// is doing a long wait for the next execution.
AutoModifyRestore<uintx> f1(G1PeriodicGCInterval, 100000);
AutoModifyRestore<uintx> f2(G1ConcRefinementServiceIntervalMillis, 100000);
// Create thread and let it start.
G1ServiceThread* st = new G1ServiceThread();
os::naked_short_sleep(500);
// Register a new task that should run right away.
CheckTask ct("AddWhileWaiting");
st->register_task(&ct);
// Give CheckTask time to run.
os::naked_short_sleep(500);
stop_service_thread(st);
ASSERT_GT(ct.execution_count(), 0);
}
// Test that a task with negative timeout is not rescheduled.
TEST_VM(G1ServiceThread, test_add_run_once) {
// Create thread and let it start.
G1ServiceThread* st = new G1ServiceThread();
os::naked_short_sleep(500);
// Set reschedule to false to only run once.
CheckTask ct("AddRunOnce");
ct.set_reschedule(false);
st->register_task(&ct);
// Give CheckTask time to run.
os::naked_short_sleep(500);
stop_service_thread(st);
// Should be exactly 1 since negative timeout should
// prevent rescheduling.
ASSERT_EQ(ct.execution_count(), 1);
}
class TestTask : public G1ServiceTask {
jlong _delay_ms;
public:
TestTask(jlong delay) :
G1ServiceTask("TestTask"),
_delay_ms(delay) {
set_time(delay);
}
virtual void execute() {}
void update_time(jlong now, int multiplier) {
set_time(now + (_delay_ms * multiplier));
}
};
TEST_VM(G1ServiceTaskQueue, add_ordered) {
G1ServiceTaskQueue queue;
int num_test_tasks = 5;
for (int i = 1; i <= num_test_tasks; i++) {
// Create tasks with different timeout.
TestTask* task = new TestTask(100 * i);
queue.add_ordered(task);
}
// Now fake a run-loop, that reschedules the tasks using a
// random multiplier.
for (jlong now = 0; now < 1000000; now++) {
// Random multiplier is at least 1 to ensure progress.
int multiplier = 1 + os::random() % 10;
while (queue.peek()->time() < now) {
TestTask* task = (TestTask*) queue.pop();
// Update delay multiplier.
task->execute();
task->update_time(now, multiplier);
// All additions will verify that the queue is sorted.
queue.add_ordered(task);
}
}
while (!queue.is_empty()) {
G1ServiceTask* task = queue.pop();
delete task;
}
}
#ifdef ASSERT
TEST_VM_ASSERT_MSG(G1ServiceTaskQueue, pop_empty,
"Should never try to verify empty queue") {
G1ServiceTaskQueue queue;
queue.pop();
}
TEST_VM_ASSERT_MSG(G1ServiceTaskQueue, peek_empty,
"Should never try to verify empty queue") {
G1ServiceTaskQueue queue;
queue.peek();
}
TEST_VM_ASSERT_MSG(G1ServiceTaskQueue, set_time_in_queue,
"Not allowed to update time while in queue") {
G1ServiceTaskQueue queue;
TestTask a(100);
queue.add_ordered(&a);
// Not allowed to update time while in queue.
a.update_time(500, 1);
}
#endif