8274298: JFR Thread Sampler thread must not acquire malloc lock after suspending a thread because of possible deadlock
Reviewed-by: egahlin
This commit is contained in:
parent
84baea753a
commit
965ea8d9cd
@ -27,8 +27,10 @@
|
|||||||
#include "jfr/recorder/jfrRecorder.hpp"
|
#include "jfr/recorder/jfrRecorder.hpp"
|
||||||
#include "jfr/periodic/sampling/jfrCallTrace.hpp"
|
#include "jfr/periodic/sampling/jfrCallTrace.hpp"
|
||||||
#include "jfr/periodic/sampling/jfrThreadSampler.hpp"
|
#include "jfr/periodic/sampling/jfrThreadSampler.hpp"
|
||||||
|
#include "jfr/recorder/checkpoint/types/traceid/jfrTraceIdLoadBarrier.inline.hpp"
|
||||||
#include "jfr/recorder/service/jfrOptionSet.hpp"
|
#include "jfr/recorder/service/jfrOptionSet.hpp"
|
||||||
#include "jfr/recorder/stacktrace/jfrStackTraceRepository.hpp"
|
#include "jfr/recorder/stacktrace/jfrStackTraceRepository.hpp"
|
||||||
|
#include "jfr/recorder/storage/jfrBuffer.hpp"
|
||||||
#include "jfr/support/jfrThreadId.hpp"
|
#include "jfr/support/jfrThreadId.hpp"
|
||||||
#include "jfr/support/jfrThreadLocal.hpp"
|
#include "jfr/support/jfrThreadLocal.hpp"
|
||||||
#include "jfr/utilities/jfrTime.hpp"
|
#include "jfr/utilities/jfrTime.hpp"
|
||||||
@ -323,10 +325,15 @@ class JfrThreadSampler : public NonJavaThread {
|
|||||||
JavaThread* _last_thread_native;
|
JavaThread* _last_thread_native;
|
||||||
size_t _interval_java;
|
size_t _interval_java;
|
||||||
size_t _interval_native;
|
size_t _interval_native;
|
||||||
|
const size_t _min_size; // for enqueue buffer monitoring
|
||||||
|
const size_t _renew_size;
|
||||||
int _cur_index;
|
int _cur_index;
|
||||||
const u4 _max_frames;
|
const u4 _max_frames;
|
||||||
volatile bool _disenrolled;
|
volatile bool _disenrolled;
|
||||||
|
|
||||||
|
const JfrBuffer* get_enqueue_buffer();
|
||||||
|
const JfrBuffer* renew_if_full(const JfrBuffer* enqueue_buffer);
|
||||||
|
|
||||||
JavaThread* next_thread(ThreadsList* t_list, JavaThread* first_sampled, JavaThread* current);
|
JavaThread* next_thread(ThreadsList* t_list, JavaThread* first_sampled, JavaThread* current);
|
||||||
void task_stacktrace(JfrSampleType type, JavaThread** last_thread);
|
void task_stacktrace(JfrSampleType type, JavaThread** last_thread);
|
||||||
JfrThreadSampler(size_t interval_java, size_t interval_native, u4 max_frames);
|
JfrThreadSampler(size_t interval_java, size_t interval_native, u4 max_frames);
|
||||||
@ -396,6 +403,8 @@ JfrThreadSampler::JfrThreadSampler(size_t interval_java, size_t interval_native,
|
|||||||
_last_thread_native(NULL),
|
_last_thread_native(NULL),
|
||||||
_interval_java(interval_java),
|
_interval_java(interval_java),
|
||||||
_interval_native(interval_native),
|
_interval_native(interval_native),
|
||||||
|
_min_size(JfrOptionSet::stackdepth() * sizeof(intptr_t)),
|
||||||
|
_renew_size(_min_size * 2),
|
||||||
_cur_index(-1),
|
_cur_index(-1),
|
||||||
_max_frames(max_frames),
|
_max_frames(max_frames),
|
||||||
_disenrolled(true) {
|
_disenrolled(true) {
|
||||||
@ -520,6 +529,15 @@ void JfrThreadSampler::post_run() {
|
|||||||
delete this;
|
delete this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const JfrBuffer* JfrThreadSampler::get_enqueue_buffer() {
|
||||||
|
const JfrBuffer* buffer = JfrTraceIdLoadBarrier::get_enqueue_buffer(this);
|
||||||
|
return buffer != nullptr ? renew_if_full(buffer) : JfrTraceIdLoadBarrier::renew_enqueue_buffer(_renew_size, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
const JfrBuffer* JfrThreadSampler::renew_if_full(const JfrBuffer* enqueue_buffer) {
|
||||||
|
assert(enqueue_buffer != nullptr, "invariant");
|
||||||
|
return enqueue_buffer->free_size() < _min_size ? JfrTraceIdLoadBarrier::renew_enqueue_buffer(_renew_size, this) : enqueue_buffer;
|
||||||
|
}
|
||||||
|
|
||||||
void JfrThreadSampler::task_stacktrace(JfrSampleType type, JavaThread** last_thread) {
|
void JfrThreadSampler::task_stacktrace(JfrSampleType type, JavaThread** last_thread) {
|
||||||
ResourceMark rm;
|
ResourceMark rm;
|
||||||
@ -530,7 +548,6 @@ void JfrThreadSampler::task_stacktrace(JfrSampleType type, JavaThread** last_thr
|
|||||||
const uint sample_limit = JAVA_SAMPLE == type ? MAX_NR_OF_JAVA_SAMPLES : MAX_NR_OF_NATIVE_SAMPLES;
|
const uint sample_limit = JAVA_SAMPLE == type ? MAX_NR_OF_JAVA_SAMPLES : MAX_NR_OF_NATIVE_SAMPLES;
|
||||||
uint num_samples = 0;
|
uint num_samples = 0;
|
||||||
JavaThread* start = NULL;
|
JavaThread* start = NULL;
|
||||||
|
|
||||||
{
|
{
|
||||||
elapsedTimer sample_time;
|
elapsedTimer sample_time;
|
||||||
sample_time.start();
|
sample_time.start();
|
||||||
@ -542,6 +559,15 @@ void JfrThreadSampler::task_stacktrace(JfrSampleType type, JavaThread** last_thr
|
|||||||
_cur_index = tlh.list()->find_index_of_JavaThread(*last_thread);
|
_cur_index = tlh.list()->find_index_of_JavaThread(*last_thread);
|
||||||
JavaThread* current = _cur_index != -1 ? *last_thread : NULL;
|
JavaThread* current = _cur_index != -1 ? *last_thread : NULL;
|
||||||
|
|
||||||
|
// Explicitly monitor the available space of the thread-local buffer used by the load barrier
|
||||||
|
// for enqueuing klasses as part of tagging methods. We do this because if space becomes sparse,
|
||||||
|
// we cannot rely on the implicit allocation of a new buffer as part of the regular tag mechanism.
|
||||||
|
// If the free list is empty, a malloc could result, and the problem with that is that the thread
|
||||||
|
// we have suspended could be the holder of the malloc lock. Instead, the buffer is pre-emptively
|
||||||
|
// renewed before thread suspension.
|
||||||
|
const JfrBuffer* enqueue_buffer = get_enqueue_buffer();
|
||||||
|
assert(enqueue_buffer != nullptr, "invariant");
|
||||||
|
|
||||||
while (num_samples < sample_limit) {
|
while (num_samples < sample_limit) {
|
||||||
current = next_thread(tlh.list(), start, current);
|
current = next_thread(tlh.list(), start, current);
|
||||||
if (current == NULL) {
|
if (current == NULL) {
|
||||||
@ -553,9 +579,11 @@ void JfrThreadSampler::task_stacktrace(JfrSampleType type, JavaThread** last_thr
|
|||||||
if (current->is_Compiler_thread()) {
|
if (current->is_Compiler_thread()) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
assert(enqueue_buffer->free_size() >= _min_size, "invariant");
|
||||||
if (sample_task.do_sample_thread(current, _frames, _max_frames, type)) {
|
if (sample_task.do_sample_thread(current, _frames, _max_frames, type)) {
|
||||||
num_samples++;
|
num_samples++;
|
||||||
}
|
}
|
||||||
|
enqueue_buffer = renew_if_full(enqueue_buffer);
|
||||||
}
|
}
|
||||||
*last_thread = current; // remember the thread we last attempted to sample
|
*last_thread = current; // remember the thread we last attempted to sample
|
||||||
}
|
}
|
||||||
|
@ -28,6 +28,7 @@
|
|||||||
#include "jfr/utilities/jfrAllocation.hpp"
|
#include "jfr/utilities/jfrAllocation.hpp"
|
||||||
|
|
||||||
class JavaThread;
|
class JavaThread;
|
||||||
|
class JfrBuffer;
|
||||||
class JfrStackFrame;
|
class JfrStackFrame;
|
||||||
class JfrThreadSampler;
|
class JfrThreadSampler;
|
||||||
class Thread;
|
class Thread;
|
||||||
|
@ -247,6 +247,14 @@ void JfrTraceIdKlassQueue::enqueue(const Klass* klass) {
|
|||||||
_queue->enqueue(klass);
|
_queue->enqueue(klass);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
JfrBuffer* JfrTraceIdKlassQueue::get_enqueue_buffer(Thread* thread) {
|
||||||
|
return _queue->thread_local_storage(thread);
|
||||||
|
}
|
||||||
|
|
||||||
|
JfrBuffer* JfrTraceIdKlassQueue::renew_enqueue_buffer(size_t size, Thread* thread) {
|
||||||
|
return _queue->renew(size, thread);
|
||||||
|
}
|
||||||
|
|
||||||
void JfrTraceIdKlassQueue::iterate(klass_callback callback, bool previous_epoch) {
|
void JfrTraceIdKlassQueue::iterate(klass_callback callback, bool previous_epoch) {
|
||||||
assert_locked_or_safepoint(ClassLoaderDataGraph_lock);
|
assert_locked_or_safepoint(ClassLoaderDataGraph_lock);
|
||||||
KlassFunctor functor(callback);
|
KlassFunctor functor(callback);
|
||||||
|
@ -28,6 +28,7 @@
|
|||||||
#include "jfr/utilities/jfrAllocation.hpp"
|
#include "jfr/utilities/jfrAllocation.hpp"
|
||||||
#include "jfr/utilities/jfrEpochQueue.hpp"
|
#include "jfr/utilities/jfrEpochQueue.hpp"
|
||||||
|
|
||||||
|
class JfrBuffer;
|
||||||
class Klass;
|
class Klass;
|
||||||
class Thread;
|
class Thread;
|
||||||
|
|
||||||
@ -47,7 +48,7 @@ class KlassFunctor {
|
|||||||
// It details how to store and process an enqueued Klass representation. See utilities/jfrEpochQueue.hpp.
|
// It details how to store and process an enqueued Klass representation. See utilities/jfrEpochQueue.hpp.
|
||||||
//
|
//
|
||||||
template <typename Buffer>
|
template <typename Buffer>
|
||||||
class JfrEpochQueueKlassPolicy {
|
class JfrEpochQueueKlassPolicy : public JfrCHeapObj {
|
||||||
public:
|
public:
|
||||||
typedef Buffer* BufferPtr;
|
typedef Buffer* BufferPtr;
|
||||||
typedef Klass Type;
|
typedef Klass Type;
|
||||||
@ -64,8 +65,11 @@ class JfrEpochQueueKlassPolicy {
|
|||||||
};
|
};
|
||||||
|
|
||||||
class JfrTraceIdKlassQueue : public JfrCHeapObj {
|
class JfrTraceIdKlassQueue : public JfrCHeapObj {
|
||||||
|
friend class JfrTraceIdLoadBarrier;
|
||||||
private:
|
private:
|
||||||
JfrEpochQueue<JfrEpochQueueKlassPolicy>* _queue;
|
JfrEpochQueue<JfrEpochQueueKlassPolicy>* _queue;
|
||||||
|
JfrBuffer* get_enqueue_buffer(Thread* thread);
|
||||||
|
JfrBuffer* renew_enqueue_buffer(size_t size, Thread* thread);
|
||||||
public:
|
public:
|
||||||
JfrTraceIdKlassQueue();
|
JfrTraceIdKlassQueue();
|
||||||
~JfrTraceIdKlassQueue();
|
~JfrTraceIdKlassQueue();
|
||||||
|
@ -68,3 +68,11 @@ void JfrTraceIdLoadBarrier::do_klasses(klass_callback callback, bool previous_ep
|
|||||||
assert_locked_or_safepoint(ClassLoaderDataGraph_lock);
|
assert_locked_or_safepoint(ClassLoaderDataGraph_lock);
|
||||||
klass_queue().iterate(callback, previous_epoch);
|
klass_queue().iterate(callback, previous_epoch);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
JfrBuffer* JfrTraceIdLoadBarrier::get_enqueue_buffer(Thread* thread) {
|
||||||
|
return klass_queue().get_enqueue_buffer(thread);
|
||||||
|
}
|
||||||
|
|
||||||
|
JfrBuffer* JfrTraceIdLoadBarrier::renew_enqueue_buffer(size_t size, Thread* thread) {
|
||||||
|
return klass_queue().renew_enqueue_buffer(size, thread);
|
||||||
|
}
|
||||||
|
@ -29,6 +29,7 @@
|
|||||||
#include "memory/allocation.hpp"
|
#include "memory/allocation.hpp"
|
||||||
|
|
||||||
class ClassLoaderData;
|
class ClassLoaderData;
|
||||||
|
class JfrBuffer;
|
||||||
class Klass;
|
class Klass;
|
||||||
class Method;
|
class Method;
|
||||||
class ModuleEntry;
|
class ModuleEntry;
|
||||||
@ -69,12 +70,16 @@ class PackageEntry;
|
|||||||
class JfrTraceIdLoadBarrier : AllStatic {
|
class JfrTraceIdLoadBarrier : AllStatic {
|
||||||
friend class Jfr;
|
friend class Jfr;
|
||||||
friend class JfrCheckpointManager;
|
friend class JfrCheckpointManager;
|
||||||
|
friend class JfrStackTrace;
|
||||||
|
friend class JfrThreadSampler;
|
||||||
private:
|
private:
|
||||||
static bool initialize();
|
static bool initialize();
|
||||||
static void clear();
|
static void clear();
|
||||||
static void destroy();
|
static void destroy();
|
||||||
static void enqueue(const Klass* klass);
|
static void enqueue(const Klass* klass);
|
||||||
static void load_barrier(const Klass* klass);
|
static void load_barrier(const Klass* klass);
|
||||||
|
static JfrBuffer* get_enqueue_buffer(Thread* thread);
|
||||||
|
static JfrBuffer* renew_enqueue_buffer(size_t size, Thread* thread);
|
||||||
public:
|
public:
|
||||||
static traceid load(const ClassLoaderData* cld);
|
static traceid load(const ClassLoaderData* cld);
|
||||||
static traceid load(const Klass* klass);
|
static traceid load(const Klass* klass);
|
||||||
|
@ -27,6 +27,7 @@
|
|||||||
#include "jfr/recorder/checkpoint/types/traceid/jfrTraceId.inline.hpp"
|
#include "jfr/recorder/checkpoint/types/traceid/jfrTraceId.inline.hpp"
|
||||||
#include "jfr/recorder/repository/jfrChunkWriter.hpp"
|
#include "jfr/recorder/repository/jfrChunkWriter.hpp"
|
||||||
#include "jfr/recorder/stacktrace/jfrStackTrace.hpp"
|
#include "jfr/recorder/stacktrace/jfrStackTrace.hpp"
|
||||||
|
#include "jfr/recorder/storage/jfrBuffer.hpp"
|
||||||
#include "jfr/support/jfrMethodLookup.hpp"
|
#include "jfr/support/jfrMethodLookup.hpp"
|
||||||
#include "memory/allocation.inline.hpp"
|
#include "memory/allocation.inline.hpp"
|
||||||
#include "oops/instanceKlass.inline.hpp"
|
#include "oops/instanceKlass.inline.hpp"
|
||||||
@ -175,11 +176,22 @@ void vframeStreamSamples::samples_next() {
|
|||||||
} while (!fill_from_frame());
|
} while (!fill_from_frame());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static const size_t min_valid_free_size_bytes = 16;
|
||||||
|
|
||||||
|
static inline bool is_full(const JfrBuffer* enqueue_buffer) {
|
||||||
|
return enqueue_buffer->free_size() < min_valid_free_size_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
bool JfrStackTrace::record_thread(JavaThread& thread, frame& frame) {
|
bool JfrStackTrace::record_thread(JavaThread& thread, frame& frame) {
|
||||||
|
// Explicitly monitor the available space of the thread-local buffer used for enqueuing klasses as part of tagging methods.
|
||||||
|
// We do this because if space becomes sparse, we cannot rely on the implicit allocation of a new buffer as part of the
|
||||||
|
// regular tag mechanism. If the free list is empty, a malloc could result, and the problem with that is that the thread
|
||||||
|
// we have suspended could be the holder of the malloc lock. If there is no more available space, the attempt is aborted.
|
||||||
|
const JfrBuffer* const enqueue_buffer = JfrTraceIdLoadBarrier::get_enqueue_buffer(Thread::current());
|
||||||
|
assert(enqueue_buffer != nullptr, "invariant");
|
||||||
vframeStreamSamples st(&thread, frame, false);
|
vframeStreamSamples st(&thread, frame, false);
|
||||||
u4 count = 0;
|
u4 count = 0;
|
||||||
_reached_root = true;
|
_reached_root = true;
|
||||||
|
|
||||||
_hash = 1;
|
_hash = 1;
|
||||||
while (!st.at_end()) {
|
while (!st.at_end()) {
|
||||||
if (count >= _max_frames) {
|
if (count >= _max_frames) {
|
||||||
@ -187,7 +199,7 @@ bool JfrStackTrace::record_thread(JavaThread& thread, frame& frame) {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
const Method* method = st.method();
|
const Method* method = st.method();
|
||||||
if (!Method::is_valid_method(method)) {
|
if (!Method::is_valid_method(method) || is_full(enqueue_buffer)) {
|
||||||
// we throw away everything we've gathered in this sample since
|
// we throw away everything we've gathered in this sample since
|
||||||
// none of it is safe
|
// none of it is safe
|
||||||
return false;
|
return false;
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
|
* Copyright (c) 2021, Oracle and/or its affiliates. All rights reserved.
|
||||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||||
*
|
*
|
||||||
* This code is free software; you can redistribute it and/or modify it
|
* This code is free software; you can redistribute it and/or modify it
|
||||||
@ -27,6 +27,8 @@
|
|||||||
|
|
||||||
#include "jfr/recorder/storage/jfrEpochStorage.hpp"
|
#include "jfr/recorder/storage/jfrEpochStorage.hpp"
|
||||||
|
|
||||||
|
class Thread;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* An ElmentPolicy template template argument provides the implementation for how elements
|
* An ElmentPolicy template template argument provides the implementation for how elements
|
||||||
* associated with the queue is encoded and managed by exposing the following members:
|
* associated with the queue is encoded and managed by exposing the following members:
|
||||||
@ -43,7 +45,7 @@
|
|||||||
* size_t operator()(const u1* next_element, Callback& callback, bool previous_epoch = false);
|
* size_t operator()(const u1* next_element, Callback& callback, bool previous_epoch = false);
|
||||||
*/
|
*/
|
||||||
template <template <typename> class ElementPolicy>
|
template <template <typename> class ElementPolicy>
|
||||||
class JfrEpochQueue : public JfrCHeapObj {
|
class JfrEpochQueue : public ElementPolicy<typename JfrEpochStorage::Buffer> {
|
||||||
public:
|
public:
|
||||||
typedef JfrEpochStorage::Buffer Buffer;
|
typedef JfrEpochStorage::Buffer Buffer;
|
||||||
typedef JfrEpochStorage::BufferPtr BufferPtr;
|
typedef JfrEpochStorage::BufferPtr BufferPtr;
|
||||||
@ -53,22 +55,20 @@ class JfrEpochQueue : public JfrCHeapObj {
|
|||||||
~JfrEpochQueue();
|
~JfrEpochQueue();
|
||||||
bool initialize(size_t min_buffer_size, size_t free_list_cache_count_limit, size_t cache_prealloc_count);
|
bool initialize(size_t min_buffer_size, size_t free_list_cache_count_limit, size_t cache_prealloc_count);
|
||||||
void enqueue(TypePtr t);
|
void enqueue(TypePtr t);
|
||||||
|
BufferPtr renew(size_t size, Thread* thread);
|
||||||
template <typename Callback>
|
template <typename Callback>
|
||||||
void iterate(Callback& callback, bool previous_epoch = false);
|
void iterate(Callback& callback, bool previous_epoch = false);
|
||||||
private:
|
private:
|
||||||
typedef ElementPolicy<Buffer> Policy;
|
|
||||||
Policy _policy;
|
|
||||||
JfrEpochStorage* _storage;
|
JfrEpochStorage* _storage;
|
||||||
BufferPtr storage_for_element(TypePtr t, size_t element_size);
|
BufferPtr storage_for_element(TypePtr t, size_t element_size);
|
||||||
|
|
||||||
template <typename Callback>
|
template <typename Callback>
|
||||||
class ElementDispatch {
|
class ElementDispatch {
|
||||||
private:
|
private:
|
||||||
Callback& _callback;
|
Callback& _callback;
|
||||||
Policy& _policy;
|
JfrEpochQueue& _queue;
|
||||||
public:
|
public:
|
||||||
typedef Buffer Type;
|
typedef Buffer Type;
|
||||||
ElementDispatch(Callback& callback, Policy& policy);
|
ElementDispatch(Callback& callback, JfrEpochQueue& queue);
|
||||||
size_t operator()(const u1* element, bool previous_epoch);
|
size_t operator()(const u1* element, bool previous_epoch);
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
@ -26,13 +26,12 @@
|
|||||||
#define SHARE_JFR_UTILITIES_JFREPOCHQUEUE_INLINE_HPP
|
#define SHARE_JFR_UTILITIES_JFREPOCHQUEUE_INLINE_HPP
|
||||||
|
|
||||||
#include "jfr/utilities/jfrEpochQueue.hpp"
|
#include "jfr/utilities/jfrEpochQueue.hpp"
|
||||||
|
|
||||||
#include "jfr/recorder/storage/jfrEpochStorage.inline.hpp"
|
#include "jfr/recorder/storage/jfrEpochStorage.inline.hpp"
|
||||||
#include "jfr/recorder/storage/jfrStorageUtils.inline.hpp"
|
#include "jfr/recorder/storage/jfrStorageUtils.inline.hpp"
|
||||||
#include "runtime/thread.inline.hpp"
|
#include "runtime/thread.inline.hpp"
|
||||||
|
|
||||||
template <template <typename> class ElementPolicy>
|
template <template <typename> class ElementPolicy>
|
||||||
JfrEpochQueue<ElementPolicy>::JfrEpochQueue() : _policy(), _storage(NULL) {}
|
JfrEpochQueue<ElementPolicy>::JfrEpochQueue() : _storage(NULL) {}
|
||||||
|
|
||||||
template <template <typename> class ElementPolicy>
|
template <template <typename> class ElementPolicy>
|
||||||
JfrEpochQueue<ElementPolicy>::~JfrEpochQueue() {
|
JfrEpochQueue<ElementPolicy>::~JfrEpochQueue() {
|
||||||
@ -46,45 +45,61 @@ bool JfrEpochQueue<ElementPolicy>::initialize(size_t min_buffer_size, size_t fre
|
|||||||
return _storage != NULL && _storage->initialize(min_buffer_size, free_list_cache_count_limit, cache_prealloc_count);
|
return _storage != NULL && _storage->initialize(min_buffer_size, free_list_cache_count_limit, cache_prealloc_count);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <template <typename> class ElementPolicy>
|
||||||
|
inline typename JfrEpochQueue<ElementPolicy>::BufferPtr
|
||||||
|
JfrEpochQueue<ElementPolicy>::renew(size_t size, Thread* thread) {
|
||||||
|
assert(thread != nullptr, "invariant");
|
||||||
|
BufferPtr buffer = this->thread_local_storage(thread);
|
||||||
|
if (buffer != nullptr) {
|
||||||
|
_storage->release(buffer);
|
||||||
|
}
|
||||||
|
buffer = _storage->acquire(size, thread);
|
||||||
|
assert(buffer != nullptr, "invariant");
|
||||||
|
assert(buffer->free_size() >= size, "invariant");
|
||||||
|
this->set_thread_local_storage(buffer, thread);
|
||||||
|
assert(this->thread_local_storage(thread) == buffer, "invariant");
|
||||||
|
return buffer;
|
||||||
|
}
|
||||||
|
|
||||||
template <template <typename> class ElementPolicy>
|
template <template <typename> class ElementPolicy>
|
||||||
inline typename JfrEpochQueue<ElementPolicy>::BufferPtr
|
inline typename JfrEpochQueue<ElementPolicy>::BufferPtr
|
||||||
JfrEpochQueue<ElementPolicy>::storage_for_element(JfrEpochQueue<ElementPolicy>::TypePtr t, size_t element_size) {
|
JfrEpochQueue<ElementPolicy>::storage_for_element(JfrEpochQueue<ElementPolicy>::TypePtr t, size_t element_size) {
|
||||||
assert(_policy.element_size(t) == element_size, "invariant");
|
assert(this->element_size(t) == element_size, "invariant");
|
||||||
Thread* const thread = Thread::current();
|
Thread* const thread = Thread::current();
|
||||||
BufferPtr buffer = _policy.thread_local_storage(thread);
|
BufferPtr buffer = this->thread_local_storage(thread);
|
||||||
if (buffer == NULL) {
|
if (buffer == nullptr) {
|
||||||
buffer = _storage->acquire(element_size, thread);
|
buffer = _storage->acquire(element_size, thread);
|
||||||
_policy.set_thread_local_storage(buffer, thread);
|
this->set_thread_local_storage(buffer, thread);
|
||||||
} else if (buffer->free_size() < element_size) {
|
} else if (buffer->free_size() < element_size) {
|
||||||
_storage->release(buffer);
|
_storage->release(buffer);
|
||||||
buffer = _storage->acquire(element_size, thread);
|
buffer = _storage->acquire(element_size, thread);
|
||||||
_policy.set_thread_local_storage(buffer, thread);
|
this->set_thread_local_storage(buffer, thread);
|
||||||
}
|
}
|
||||||
assert(buffer->free_size() >= element_size, "invariant");
|
assert(buffer->free_size() >= element_size, "invariant");
|
||||||
assert(_policy.thread_local_storage(thread) == buffer, "invariant");
|
assert(this->thread_local_storage(thread) == buffer, "invariant");
|
||||||
return buffer;
|
return buffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <template <typename> class ElementPolicy>
|
template <template <typename> class ElementPolicy>
|
||||||
void JfrEpochQueue<ElementPolicy>::enqueue(JfrEpochQueue<ElementPolicy>::TypePtr t) {
|
void JfrEpochQueue<ElementPolicy>::enqueue(JfrEpochQueue<ElementPolicy>::TypePtr t) {
|
||||||
assert(t != NULL, "invariant");
|
assert(t != nullptr, "invariant");
|
||||||
size_t element_size = _policy.element_size(t);
|
size_t element_size = this->element_size(t);
|
||||||
BufferPtr buffer = storage_for_element(t, element_size);
|
BufferPtr buffer = storage_for_element(t, element_size);
|
||||||
assert(buffer != NULL, "invariant");
|
assert(buffer != nullptr, "invariant");
|
||||||
_policy.store_element(t, buffer);
|
this->store_element(t, buffer);
|
||||||
buffer->set_pos(element_size);
|
buffer->set_pos(element_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <template <typename> class ElementPolicy>
|
template <template <typename> class ElementPolicy>
|
||||||
template <typename Callback>
|
template <typename Callback>
|
||||||
JfrEpochQueue<ElementPolicy>::ElementDispatch<Callback>::ElementDispatch(Callback& callback, JfrEpochQueue<ElementPolicy>::Policy& policy) :
|
JfrEpochQueue<ElementPolicy>::ElementDispatch<Callback>::ElementDispatch(Callback& callback, JfrEpochQueue<ElementPolicy>& queue) :
|
||||||
_callback(callback),_policy(policy) {}
|
_callback(callback), _queue(queue) {}
|
||||||
|
|
||||||
template <template <typename> class ElementPolicy>
|
template <template <typename> class ElementPolicy>
|
||||||
template <typename Callback>
|
template <typename Callback>
|
||||||
size_t JfrEpochQueue<ElementPolicy>::ElementDispatch<Callback>::operator()(const u1* element, bool previous_epoch) {
|
size_t JfrEpochQueue<ElementPolicy>::ElementDispatch<Callback>::operator()(const u1* element, bool previous_epoch) {
|
||||||
assert(element != NULL, "invariant");
|
assert(element != nullptr, "invariant");
|
||||||
return _policy(element, _callback, previous_epoch);
|
return _queue(element, _callback, previous_epoch);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <template <typename> class ElementPolicy>
|
template <template <typename> class ElementPolicy>
|
||||||
@ -92,7 +107,7 @@ template <typename Callback>
|
|||||||
void JfrEpochQueue<ElementPolicy>::iterate(Callback& callback, bool previous_epoch) {
|
void JfrEpochQueue<ElementPolicy>::iterate(Callback& callback, bool previous_epoch) {
|
||||||
typedef ElementDispatch<Callback> ElementDispatcher;
|
typedef ElementDispatch<Callback> ElementDispatcher;
|
||||||
typedef EpochDispatchOp<ElementDispatcher> QueueDispatcher;
|
typedef EpochDispatchOp<ElementDispatcher> QueueDispatcher;
|
||||||
ElementDispatcher element_dispatcher(callback, _policy);
|
ElementDispatcher element_dispatcher(callback, *this);
|
||||||
QueueDispatcher dispatch(element_dispatcher, previous_epoch);
|
QueueDispatcher dispatch(element_dispatcher, previous_epoch);
|
||||||
_storage->iterate(dispatch, previous_epoch);
|
_storage->iterate(dispatch, previous_epoch);
|
||||||
DEBUG_ONLY(_storage->verify_previous_empty();)
|
DEBUG_ONLY(_storage->verify_previous_empty();)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user