diff --git a/src/hotspot/share/logging/logTag.hpp b/src/hotspot/share/logging/logTag.hpp index c86cbcc2323..8c903b57c11 100644 --- a/src/hotspot/share/logging/logTag.hpp +++ b/src/hotspot/share/logging/logTag.hpp @@ -90,6 +90,7 @@ class outputStream; LOG_TAG(handshake) \ LOG_TAG(hashtables) \ LOG_TAG(heap) \ + LOG_TAG(heapdump) \ NOT_PRODUCT(LOG_TAG(heapsampling)) \ LOG_TAG(humongous) \ LOG_TAG(ihop) \ diff --git a/src/hotspot/share/runtime/thread.hpp b/src/hotspot/share/runtime/thread.hpp index 80114ddf7ed..42885e935f7 100644 --- a/src/hotspot/share/runtime/thread.hpp +++ b/src/hotspot/share/runtime/thread.hpp @@ -321,6 +321,7 @@ class Thread: public ThreadShadow { virtual bool is_Named_thread() const { return false; } virtual bool is_Worker_thread() const { return false; } virtual bool is_JfrSampler_thread() const { return false; } + virtual bool is_AttachListener_thread() const { return false; } virtual bool is_monitor_deflation_thread() const { return false; } // Can this thread make Java upcalls diff --git a/src/hotspot/share/runtime/vmOperation.hpp b/src/hotspot/share/runtime/vmOperation.hpp index d6c0ccc131a..147cf9dc977 100644 --- a/src/hotspot/share/runtime/vmOperation.hpp +++ b/src/hotspot/share/runtime/vmOperation.hpp @@ -48,6 +48,7 @@ template(ZombieAll) \ template(Verify) \ template(HeapDumper) \ + template(HeapDumpMerge) \ template(CollectForMetadataAllocation) \ template(CollectForCodeCacheAllocation) \ template(GC_HeapInspection) \ diff --git a/src/hotspot/share/runtime/vmStructs.cpp b/src/hotspot/share/runtime/vmStructs.cpp index 15b6b1e018f..2a2fc9acc74 100644 --- a/src/hotspot/share/runtime/vmStructs.cpp +++ b/src/hotspot/share/runtime/vmStructs.cpp @@ -110,6 +110,7 @@ #include "runtime/vframeArray.hpp" #include "runtime/vmStructs.hpp" #include "runtime/vm_version.hpp" +#include "services/attachListener.hpp" #include "utilities/globalDefinitions.hpp" #include "utilities/macros.hpp" #include "utilities/vmError.hpp" @@ -1262,6 +1263,7 @@ declare_type(NotificationThread, JavaThread) \ declare_type(CompilerThread, JavaThread) \ declare_type(StringDedupThread, JavaThread) \ + declare_type(AttachListenerThread, JavaThread) \ declare_toplevel_type(OSThread) \ declare_toplevel_type(JavaFrameAnchor) \ \ diff --git a/src/hotspot/share/services/attachListener.cpp b/src/hotspot/share/services/attachListener.cpp index 8c03ececee0..50830c4bf2a 100644 --- a/src/hotspot/share/services/attachListener.cpp +++ b/src/hotspot/share/services/attachListener.cpp @@ -244,15 +244,12 @@ jint dump_heap(AttachOperation* op, outputStream* out) { return JNI_ERR; } } - // Parallel thread number for heap dump, initialize based on active processor count. - // Note the real number of threads used is also determined by active workers and compression - // backend thread number. See heapDumper.cpp. - uint parallel_thread_num = MAX2(1, (uint)os::initial_active_processor_count() * 3 / 8); + // Request a full GC before heap dump if live_objects_only = true // This helps reduces the amount of unreachable objects in the dump // and makes it easier to browse. HeapDumper dumper(live_objects_only /* request GC */); - dumper.dump(path, out, (int)level, false, (uint)parallel_thread_num); + dumper.dump(path, out, (int)level, false, HeapDumper::default_num_of_dump_threads()); } return JNI_OK; } @@ -375,7 +372,7 @@ static AttachOperationFunctionInfo funcs[] = { // from the queue, examines the operation name (command), and dispatches // to the corresponding function to perform the operation. 
-static void attach_listener_thread_entry(JavaThread* thread, TRAPS) { +void AttachListenerThread::thread_entry(JavaThread* thread, TRAPS) { os::set_priority(thread, NearMaxPriority); assert(thread == Thread::current(), "Must be"); @@ -460,7 +457,7 @@ void AttachListener::init() { return; } - JavaThread* thread = new JavaThread(&attach_listener_thread_entry); + JavaThread* thread = new AttachListenerThread(); JavaThread::vm_exit_on_osthread_failure(thread); JavaThread::start_internal_daemon(THREAD, thread, thread_oop, NoPriority); diff --git a/src/hotspot/share/services/attachListener.hpp b/src/hotspot/share/services/attachListener.hpp index c95adae8cad..5cf32cc51ff 100644 --- a/src/hotspot/share/services/attachListener.hpp +++ b/src/hotspot/share/services/attachListener.hpp @@ -28,6 +28,7 @@ #include "memory/allStatic.hpp" #include "runtime/atomic.hpp" #include "runtime/globals.hpp" +#include "runtime/javaThread.inline.hpp" #include "utilities/debug.hpp" #include "utilities/exceptions.hpp" #include "utilities/globalDefinitions.hpp" @@ -58,6 +59,15 @@ enum AttachListenerState { AL_INITIALIZED }; +class AttachListenerThread : public JavaThread { +private: + static void thread_entry(JavaThread* thread, TRAPS); + +public: + AttachListenerThread() : JavaThread(&AttachListenerThread::thread_entry) {} + bool is_AttachListener_thread() const { return true; } +}; + class AttachListener: AllStatic { public: static void vm_start() NOT_SERVICES_RETURN; diff --git a/src/hotspot/share/services/diagnosticCommand.cpp b/src/hotspot/share/services/diagnosticCommand.cpp index ba6e12c9e25..2ef389d6a95 100644 --- a/src/hotspot/share/services/diagnosticCommand.cpp +++ b/src/hotspot/share/services/diagnosticCommand.cpp @@ -469,15 +469,20 @@ HeapDumpDCmd::HeapDumpDCmd(outputStream* output, bool heap) : "using the given compression level. 1 (recommended) is the fastest, " "9 the strongest compression.", "INT", false, "1"), _overwrite("-overwrite", "If specified, the dump file will be overwritten if it exists", - "BOOLEAN", false, "false") { + "BOOLEAN", false, "false"), + _parallel("-parallel", "Number of parallel threads to use for heap dump. The VM " + "will try to use the specified number of threads, but might use fewer.", + "INT", false, "1") { _dcmdparser.add_dcmd_option(&_all); _dcmdparser.add_dcmd_argument(&_filename); _dcmdparser.add_dcmd_option(&_gzip); _dcmdparser.add_dcmd_option(&_overwrite); + _dcmdparser.add_dcmd_option(&_parallel); } void HeapDumpDCmd::execute(DCmdSource source, TRAPS) { jlong level = -1; // -1 means no compression. + jlong parallel = HeapDumper::default_num_of_dump_threads(); if (_gzip.is_set()) { level = _gzip.value(); @@ -488,11 +493,23 @@ void HeapDumpDCmd::execute(DCmdSource source, TRAPS) { } } + if (_parallel.is_set()) { + parallel = _parallel.value(); + + if (parallel < 0) { + output()->print_cr("Invalid number of parallel dump threads."); + return; + } else if (parallel == 0) { + // 0 implies to disable parallel heap dump, in such case, we use serial dump instead + parallel = 1; + } + } + // Request a full GC before heap dump if _all is false // This helps reduces the amount of unreachable objects in the dump // and makes it easier to browse. 
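+  // Example (hypothetical pid and path): with the -parallel option added above, a
+  // parallel heap dump can be requested through the diagnostic command framework:
+  //   jcmd <pid> GC.heap_dump -parallel=4 /tmp/heap.hprof
+  // The requested count is only a hint; VM_HeapDumper::can_parallel_dump() may
+  // clamp it to the number of active safepoint workers, so fewer threads may be used.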
HeapDumper dumper(!_all.value() /* request GC if _all is false*/); - dumper.dump(_filename.value(), output(), (int) level, _overwrite.value()); + dumper.dump(_filename.value(), output(), (int) level, _overwrite.value(), (uint)parallel); } ClassHistogramDCmd::ClassHistogramDCmd(outputStream* output, bool heap) : diff --git a/src/hotspot/share/services/diagnosticCommand.hpp b/src/hotspot/share/services/diagnosticCommand.hpp index 28bd6a9f14e..685223ac038 100644 --- a/src/hotspot/share/services/diagnosticCommand.hpp +++ b/src/hotspot/share/services/diagnosticCommand.hpp @@ -320,8 +320,9 @@ protected: DCmdArgument _all; DCmdArgument _gzip; DCmdArgument _overwrite; + DCmdArgument _parallel; public: - static int num_arguments() { return 4; } + static int num_arguments() { return 5; } HeapDumpDCmd(outputStream* output, bool heap); static const char* name() { return "GC.heap_dump"; diff --git a/src/hotspot/share/services/heapDumper.cpp b/src/hotspot/share/services/heapDumper.cpp index 1d947cff1d0..2a3650f850a 100644 --- a/src/hotspot/share/services/heapDumper.cpp +++ b/src/hotspot/share/services/heapDumper.cpp @@ -1,5 +1,6 @@ /* * Copyright (c) 2005, 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2023, Alibaba Group Holding Limited. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -54,6 +55,7 @@ #include "runtime/vframe.hpp" #include "runtime/vmOperations.hpp" #include "runtime/vmThread.hpp" +#include "runtime/timerTrace.hpp" #include "services/heapDumper.hpp" #include "services/heapDumperCompression.hpp" #include "services/threadService.hpp" @@ -382,11 +384,10 @@ enum { // Supports I/O operations for a dump // Base class for dump and parallel dump -class AbstractDumpWriter : public StackObj { +class AbstractDumpWriter : public ResourceObj { protected: enum { io_buffer_max_size = 1*M, - io_buffer_max_waste = 10*K, dump_segment_header_size = 9 }; @@ -399,8 +400,6 @@ class AbstractDumpWriter : public StackObj { DEBUG_ONLY(size_t _sub_record_left;) // The bytes not written for the current sub-record. DEBUG_ONLY(bool _sub_record_ended;) // True if we have called the end_sub_record(). - virtual void flush(bool force = false) = 0; - char* buffer() const { return _buffer; } size_t buffer_size() const { return _size; } void set_position(size_t pos) { _pos = pos; } @@ -420,8 +419,9 @@ class AbstractDumpWriter : public StackObj { _pos(0), _in_dump_segment(false) { } - // total number of bytes written to the disk + // Total number of bytes written to the disk virtual julong bytes_written() const = 0; + // Return non-null if error occurred virtual char const* error() const = 0; size_t position() const { return _pos; } @@ -442,18 +442,9 @@ class AbstractDumpWriter : public StackObj { // Ends the current sub-record. void end_sub_record(); // Finishes the current dump segment if not already finished. - void finish_dump_segment(bool force_flush = false); - // Refresh to get new buffer - void refresh() { - assert (_in_dump_segment ==false, "Sanity check"); - _buffer = nullptr; - _size = io_buffer_max_size; - _pos = 0; - // Force flush to guarantee data from parallel dumper are written. - flush(true); - } - // Called when finished to release the threads. 
- virtual void deactivate() = 0; + void finish_dump_segment(); + // Flush internal buffer to persistent storage + virtual void flush() = 0; }; void AbstractDumpWriter::write_fast(const void* s, size_t len) { @@ -548,7 +539,7 @@ void AbstractDumpWriter::write_classID(Klass* k) { write_objectID(k->java_mirror()); } -void AbstractDumpWriter::finish_dump_segment(bool force_flush) { +void AbstractDumpWriter::finish_dump_segment() { if (_in_dump_segment) { assert(_sub_record_left == 0, "Last sub-record not written completely"); assert(_sub_record_ended, "sub-record must have ended"); @@ -566,7 +557,7 @@ void AbstractDumpWriter::finish_dump_segment(bool force_flush) { } _in_dump_segment = false; - flush(force_flush); + flush(); } } @@ -611,277 +602,119 @@ void AbstractDumpWriter::end_sub_record() { // Supports I/O operations for a dump class DumpWriter : public AbstractDumpWriter { - private: - CompressionBackend _backend; // Does the actual writing. - protected: - void flush(bool force = false) override; +private: + FileWriter* _writer; + AbstractCompressor* _compressor; + size_t _bytes_written; + char* _error; + // Compression support + char* _out_buffer; + size_t _out_size; + size_t _out_pos; + char* _tmp_buffer; + size_t _tmp_size; - public: - // Takes ownership of the writer and compressor. - DumpWriter(AbstractWriter* writer, AbstractCompressor* compressor); +private: + void do_compress(); - // total number of bytes written to the disk - julong bytes_written() const override { return (julong) _backend.get_written(); } - - char const* error() const override { return _backend.error(); } - - // Called by threads used for parallel writing. - void writer_loop() { _backend.thread_loop(); } - // Called when finish to release the threads. - void deactivate() override { flush(); _backend.deactivate(); } - // Get the backend pointer, used by parallel dump writer. - CompressionBackend* backend_ptr() { return &_backend; } +public: + DumpWriter(const char* path, bool overwrite, AbstractCompressor* compressor); + ~DumpWriter(); + julong bytes_written() const override { return (julong) _bytes_written; } + void set_bytes_written(julong bytes_written) { _bytes_written = bytes_written; } + char const* error() const override { return _error; } + void set_error(const char* error) { _error = (char*)error; } + bool has_error() const { return _error != nullptr; } + const char* get_file_path() const { return _writer->get_file_path(); } + AbstractCompressor* compressor() { return _compressor; } + void set_compressor(AbstractCompressor* p) { _compressor = p; } + bool is_overwrite() const { return _writer->is_overwrite(); } + void flush() override; }; -// Check for error after constructing the object and destroy it in case of an error. 
-DumpWriter::DumpWriter(AbstractWriter* writer, AbstractCompressor* compressor) : +DumpWriter::DumpWriter(const char* path, bool overwrite, AbstractCompressor* compressor) : AbstractDumpWriter(), - _backend(writer, compressor, io_buffer_max_size, io_buffer_max_waste) { - flush(); + _writer(new (std::nothrow) FileWriter(path, overwrite)), + _compressor(compressor), + _bytes_written(0), + _error(nullptr), + _out_buffer(nullptr), + _out_size(0), + _out_pos(0), + _tmp_buffer(nullptr), + _tmp_size(0) { + _error = (char*)_writer->open_writer(); + if (_error == nullptr) { + _buffer = (char*)os::malloc(io_buffer_max_size, mtInternal); + if (compressor != nullptr) { + _error = (char*)_compressor->init(io_buffer_max_size, &_out_size, &_tmp_size); + if (_error == nullptr) { + if (_out_size > 0) { + _out_buffer = (char*)os::malloc(_out_size, mtInternal); + } + if (_tmp_size > 0) { + _tmp_buffer = (char*)os::malloc(_tmp_size, mtInternal); + } + } + } + } + // initialize internal buffer + _pos = 0; + _size = io_buffer_max_size; +} + +DumpWriter::~DumpWriter(){ + if (_buffer != nullptr) { + os::free(_buffer); + } + if (_out_buffer != nullptr) { + os::free(_out_buffer); + } + if (_tmp_buffer != nullptr) { + os::free(_tmp_buffer); + } + if (_writer != NULL) { + delete _writer; + } + _bytes_written = -1; } // flush any buffered bytes to the file -void DumpWriter::flush(bool force) { - _backend.get_new_buffer(&_buffer, &_pos, &_size, force); +void DumpWriter::flush() { + if (_pos <= 0) { + return; + } + if (has_error()) { + _pos = 0; + return; + } + char* result = nullptr; + if (_compressor == nullptr) { + result = (char*)_writer->write_buf(_buffer, _pos); + _bytes_written += _pos; + } else { + do_compress(); + if (!has_error()) { + result = (char*)_writer->write_buf(_out_buffer, _out_pos); + _bytes_written += _out_pos; + } + } + _pos = 0; // reset pos to make internal buffer available + + if (result != nullptr) { + set_error(result); + } } -// Buffer queue used for parallel dump. -struct ParWriterBufferQueueElem { - char* _buffer; - size_t _used; - ParWriterBufferQueueElem* _next; -}; +void DumpWriter::do_compress() { + const char* msg = _compressor->compress(_buffer, _pos, _out_buffer, _out_size, + _tmp_buffer, _tmp_size, &_out_pos); -class ParWriterBufferQueue : public CHeapObj { - private: - ParWriterBufferQueueElem* _head; - ParWriterBufferQueueElem* _tail; - uint _length; - public: - ParWriterBufferQueue() : _head(nullptr), _tail(nullptr), _length(0) { } - - void enqueue(ParWriterBufferQueueElem* entry) { - if (_head == nullptr) { - assert(is_empty() && _tail == nullptr, "Sanity check"); - _head = _tail = entry; - } else { - assert ((_tail->_next == nullptr && _tail->_buffer != nullptr), "Buffer queue is polluted"); - _tail->_next = entry; - _tail = entry; - } - _length++; - assert(_tail->_next == nullptr, "Buffer queue is polluted"); + if (msg != nullptr) { + set_error(msg); } - - ParWriterBufferQueueElem* dequeue() { - if (_head == nullptr) return nullptr; - ParWriterBufferQueueElem* entry = _head; - assert (entry->_buffer != nullptr, "polluted buffer in writer list"); - _head = entry->_next; - if (_head == nullptr) { - _tail = nullptr; - } - entry->_next = nullptr; - _length--; - return entry; - } - - bool is_empty() { - return _length == 0; - } - - uint length() { return _length; } -}; - -// Support parallel heap dump. -class ParDumpWriter : public AbstractDumpWriter { - private: - // Lock used to guarantee the integrity of multiple buffers writing. 
- static Monitor* _lock; - // Pointer of backend from global DumpWriter. - CompressionBackend* _backend_ptr; - char const * _err; - ParWriterBufferQueue* _buffer_queue; - size_t _internal_buffer_used; - char* _buffer_base; - bool _split_data; - static const uint BackendFlushThreshold = 2; - protected: - void flush(bool force = false) override { - assert(_pos != 0, "must not be zero"); - if (_pos != 0) { - refresh_buffer(); - } - - if (_split_data || _is_huge_sub_record) { - return; - } - - if (should_flush_buf_list(force)) { - assert(!_in_dump_segment && !_split_data && !_is_huge_sub_record, "incomplete data send to backend!\n"); - flush_to_backend(force); - } - } - - public: - // Check for error after constructing the object and destroy it in case of an error. - ParDumpWriter(DumpWriter* dw) : - AbstractDumpWriter(), - _backend_ptr(dw->backend_ptr()), - _buffer_queue((new (std::nothrow) ParWriterBufferQueue())), - _buffer_base(nullptr), - _split_data(false) { - // prepare internal buffer - allocate_internal_buffer(); - } - - ~ParDumpWriter() { - assert(_buffer_queue != nullptr, "Sanity check"); - assert((_internal_buffer_used == 0) && (_buffer_queue->is_empty()), - "All data must be send to backend"); - if (_buffer_base != nullptr) { - os::free(_buffer_base); - _buffer_base = nullptr; - } - delete _buffer_queue; - _buffer_queue = nullptr; - } - - // total number of bytes written to the disk - julong bytes_written() const override { return (julong) _backend_ptr->get_written(); } - char const* error() const override { return _err == nullptr ? _backend_ptr->error() : _err; } - - static void before_work() { - assert(_lock == nullptr, "ParDumpWriter lock must be initialized only once"); - _lock = new (std::nothrow) PaddedMonitor(Mutex::safepoint, "ParallelHProfWriter_lock"); - } - - static void after_work() { - assert(_lock != nullptr, "ParDumpWriter lock is not initialized"); - delete _lock; - _lock = nullptr; - } - - // write raw bytes - void write_raw(const void* s, size_t len) override { - assert(!_in_dump_segment || (_sub_record_left >= len), "sub-record too large"); - debug_only(_sub_record_left -= len); - assert(!_split_data, "Invalid split data"); - _split_data = true; - // flush buffer to make room. 
- while (len > buffer_size() - position()) { - assert(!_in_dump_segment || _is_huge_sub_record, - "Cannot overflow in non-huge sub-record."); - size_t to_write = buffer_size() - position(); - memcpy(buffer() + position(), s, to_write); - s = (void*) ((char*) s + to_write); - len -= to_write; - set_position(position() + to_write); - flush(); - } - _split_data = false; - memcpy(buffer() + position(), s, len); - set_position(position() + len); - } - - void deactivate() override { flush(true); _backend_ptr->deactivate(); } - - private: - void allocate_internal_buffer() { - assert(_buffer_queue != nullptr, "Internal buffer queue is not ready when allocate internal buffer"); - assert(_buffer == nullptr && _buffer_base == nullptr, "current buffer must be null before allocate"); - _buffer_base = _buffer = (char*)os::malloc(io_buffer_max_size, mtInternal); - if (_buffer == nullptr) { - set_error("Could not allocate buffer for writer"); - return; - } - _pos = 0; - _internal_buffer_used = 0; - _size = io_buffer_max_size; - } - - void set_error(char const* new_error) { - if ((new_error != nullptr) && (_err == nullptr)) { - _err = new_error; - } - } - - // Add buffer to internal list - void refresh_buffer() { - size_t expected_total = _internal_buffer_used + _pos; - if (expected_total < io_buffer_max_size - io_buffer_max_waste) { - // reuse current buffer. - _internal_buffer_used = expected_total; - assert(_size - _pos == io_buffer_max_size - expected_total, "illegal resize of buffer"); - _size -= _pos; - _buffer += _pos; - _pos = 0; - - return; - } - // It is not possible here that expected_total is larger than io_buffer_max_size because - // of limitation in write_xxx(). - assert(expected_total <= io_buffer_max_size, "buffer overflow"); - assert(_buffer - _buffer_base <= io_buffer_max_size, "internal buffer overflow"); - ParWriterBufferQueueElem* entry = - (ParWriterBufferQueueElem*)os::malloc(sizeof(ParWriterBufferQueueElem), mtInternal); - if (entry == nullptr) { - set_error("Heap dumper can allocate memory"); - return; - } - entry->_buffer = _buffer_base; - entry->_used = expected_total; - entry->_next = nullptr; - // add to internal buffer queue - _buffer_queue->enqueue(entry); - _buffer_base =_buffer = nullptr; - allocate_internal_buffer(); - } - - void reclaim_entry(ParWriterBufferQueueElem* entry) { - assert(entry != nullptr && entry->_buffer != nullptr, "Invalid entry to reclaim"); - os::free(entry->_buffer); - entry->_buffer = nullptr; - os::free(entry); - } - - void flush_buffer(char* buffer, size_t used) { - assert(_lock->owner() == Thread::current(), "flush buffer must hold lock"); - size_t max = io_buffer_max_size; - // get_new_buffer - _backend_ptr->flush_external_buffer(buffer, used, max); - } - - bool should_flush_buf_list(bool force) { - return force || _buffer_queue->length() > BackendFlushThreshold; - } - - void flush_to_backend(bool force) { - // Guarantee there is only one writer updating the backend buffers. - MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag); - while (!_buffer_queue->is_empty()) { - ParWriterBufferQueueElem* entry = _buffer_queue->dequeue(); - flush_buffer(entry->_buffer, entry->_used); - // Delete buffer and entry. - reclaim_entry(entry); - entry = nullptr; - } - assert(_pos == 0, "available buffer must be empty before flush"); - // Flush internal buffer. 
- if (_internal_buffer_used > 0) { - flush_buffer(_buffer_base, _internal_buffer_used); - os::free(_buffer_base); - _pos = 0; - _internal_buffer_used = 0; - _buffer_base = _buffer = nullptr; - // Allocate internal buffer for future use. - allocate_internal_buffer(); - } - } -}; - -Monitor* ParDumpWriter::_lock = nullptr; +} // Support class with a collection of functions used when dumping the heap @@ -1613,87 +1446,17 @@ class StickyClassDumper : public KlassClosure { } }; -// Large object heap dump support. -// To avoid memory consumption, when dumping large objects such as huge array and -// large objects whose size are larger than LARGE_OBJECT_DUMP_THRESHOLD, the scanned -// partial object/array data will be sent to the backend directly instead of caching -// the whole object/array in the internal buffer. -// The HeapDumpLargeObjectList is used to save the large object when dumper scans -// the heap. The large objects could be added (push) parallelly by multiple dumpers, -// But they will be removed (popped) serially only by the VM thread. -class HeapDumpLargeObjectList : public CHeapObj { - private: - class HeapDumpLargeObjectListElem : public CHeapObj { - public: - HeapDumpLargeObjectListElem(oop obj) : _obj(obj), _next(nullptr) { } - oop _obj; - HeapDumpLargeObjectListElem* _next; - }; - - volatile HeapDumpLargeObjectListElem* _head; - - public: - HeapDumpLargeObjectList() : _head(nullptr) { } - - void atomic_push(oop obj) { - assert (obj != nullptr, "sanity check"); - HeapDumpLargeObjectListElem* entry = new HeapDumpLargeObjectListElem(obj); - if (entry == nullptr) { - warning("failed to allocate element for large object list"); - return; - } - assert (entry->_obj != nullptr, "sanity check"); - while (true) { - volatile HeapDumpLargeObjectListElem* old_head = Atomic::load_acquire(&_head); - HeapDumpLargeObjectListElem* new_head = entry; - if (Atomic::cmpxchg(&_head, old_head, new_head) == old_head) { - // successfully push - new_head->_next = (HeapDumpLargeObjectListElem*)old_head; - return; - } - } - } - - oop pop() { - if (_head == nullptr) { - return nullptr; - } - HeapDumpLargeObjectListElem* entry = (HeapDumpLargeObjectListElem*)_head; - _head = _head->_next; - assert (entry != nullptr, "illegal larger object list entry"); - oop ret = entry->_obj; - delete entry; - assert (ret != nullptr, "illegal oop pointer"); - return ret; - } - - void drain(ObjectClosure* cl) { - while (_head != nullptr) { - cl->do_object(pop()); - } - } - - bool is_empty() { - return _head == nullptr; - } - - static const size_t LargeObjectSizeThreshold = 1 << 20; // 1 MB -}; - class VM_HeapDumper; // Support class using when iterating over the heap. class HeapObjectDumper : public ObjectClosure { private: AbstractDumpWriter* _writer; - HeapDumpLargeObjectList* _list; - AbstractDumpWriter* writer() { return _writer; } - bool is_large(oop o); + public: - HeapObjectDumper(AbstractDumpWriter* writer, HeapDumpLargeObjectList* list = nullptr) { + HeapObjectDumper(AbstractDumpWriter* writer) { _writer = writer; - _list = list; } // called for each object in the heap @@ -1713,13 +1476,6 @@ void HeapObjectDumper::do_object(oop o) { return; } - // If large object list exists and it is large object/array, - // add oop into the list and skip scan. VM thread will process it later. 
- if (_list != nullptr && is_large(o)) { - _list->atomic_push(o); - return; - } - if (o->is_instance()) { // create a HPROF_GC_INSTANCE record for each object DumperSupport::dump_instance(writer(), o); @@ -1732,78 +1488,138 @@ void HeapObjectDumper::do_object(oop o) { } } -bool HeapObjectDumper::is_large(oop o) { - size_t size = 0; - if (o->is_instance()) { - // Use o->size() * 8 as the upper limit of instance size to avoid iterating static fields - size = o->size() * 8; - } else if (o->is_objArray()) { - objArrayOop array = objArrayOop(o); - BasicType type = ArrayKlass::cast(array->klass())->element_type(); - assert(type >= T_BOOLEAN && type <= T_OBJECT, "invalid array element type"); - int length = array->length(); - int type_size = sizeof(address); - size = (size_t)length * type_size; - } else if (o->is_typeArray()) { - typeArrayOop array = typeArrayOop(o); - BasicType type = ArrayKlass::cast(array->klass())->element_type(); - assert(type >= T_BOOLEAN && type <= T_OBJECT, "invalid array element type"); - int length = array->length(); - int type_size = type2aelembytes(type); - size = (size_t)length * type_size; - } - return size > HeapDumpLargeObjectList::LargeObjectSizeThreshold; -} - // The dumper controller for parallel heap dump class DumperController : public CHeapObj { private: - bool _started; Monitor* _lock; - uint _dumper_number; + const uint _dumper_number; uint _complete_number; public: DumperController(uint number) : - _started(false), _lock(new (std::nothrow) PaddedMonitor(Mutex::safepoint, "DumperController_lock")), _dumper_number(number), _complete_number(0) { } ~DumperController() { delete _lock; } - void wait_for_start_signal() { - MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag); - while (_started == false) { - ml.wait(); - } - assert(_started == true, "dumper woke up with wrong state"); - } - - void start_dump() { - assert (_started == false, "start dump with wrong state"); - MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag); - _started = true; - ml.notify_all(); - } - - void dumper_complete() { - assert (_started == true, "dumper complete with wrong state"); + void dumper_complete(DumpWriter* local_writer, DumpWriter* global_writer) { MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag); _complete_number++; + // propagate local error to global if any + if (local_writer->has_error()) { + global_writer->set_error(local_writer->error()); + } ml.notify(); } void wait_all_dumpers_complete() { - assert (_started == true, "wrong state when wait for dumper complete"); MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag); while (_complete_number != _dumper_number) { ml.wait(); } - _started = false; } }; +// DumpMerger merges separate dump files into a complete one +class DumpMerger : public StackObj { +private: + DumpWriter* _writer; + const char* _path; + bool _has_error; + int _dump_seq; + +private: + void merge_file(char* path); + void merge_done(); + +public: + DumpMerger(const char* path, DumpWriter* writer, int dump_seq) : + _writer(writer), + _path(path), + _has_error(_writer->has_error()), + _dump_seq(dump_seq) {} + + void do_merge(); +}; + +void DumpMerger::merge_done() { + // Writes the HPROF_HEAP_DUMP_END record. 
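+  // (The end record is skipped when a merge error was recorded, so a truncated
+  // file is not mistaken for a complete dump.)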
+ if (!_has_error) { + DumperSupport::end_of_dump(_writer); + _writer->flush(); + } + _dump_seq = 0; // reset +} + +void DumpMerger::merge_file(char* path) { + assert(!SafepointSynchronize::is_at_safepoint(), "merging happens outside safepoint"); + TraceTime timer("Merge segmented heap file", TRACETIME_LOG(Info, heapdump)); + + fileStream segment_fs(path, "rb"); + if (!segment_fs.is_open()) { + log_error(heapdump)("Cannot open segmented heap file %s during merging", path); + _writer->set_error("Cannot open segmented heap file during merging"); + _has_error = true; + return; + } + + jlong total = 0; + size_t cnt = 0; + char read_buf[4096]; + while ((cnt = segment_fs.read(read_buf, 1, 4096)) != 0) { + _writer->write_raw(read_buf, cnt); + total += cnt; + } + + _writer->flush(); + if (segment_fs.fileSize() != total) { + log_error(heapdump)("Merged heap dump %s is incomplete, expected " JLONG_FORMAT " but read " JLONG_FORMAT " bytes", + path, segment_fs.fileSize(), total); + _writer->set_error("Merged heap dump is incomplete"); + _has_error = true; + } +} + +void DumpMerger::do_merge() { + assert(!SafepointSynchronize::is_at_safepoint(), "merging happens outside safepoint"); + TraceTime timer("Merge heap files complete", TRACETIME_LOG(Info, heapdump)); + + // Since the contents of the segmented heap files were already compressed, we don't + // need to compress them again during merging. + AbstractCompressor* saved_compressor = _writer->compressor(); + _writer->set_compressor(nullptr); + + // merge each segmented heap file and remove it afterwards, even on error + char path[JVM_MAXPATHLEN]; + for (int i = 0; i < _dump_seq; i++) { + memset(path, 0, JVM_MAXPATHLEN); + os::snprintf(path, JVM_MAXPATHLEN, "%s.p%d", _path, i); + if (!_has_error) { + merge_file(path); + } + remove(path); + } + + // restore compressor for further use + _writer->set_compressor(saved_compressor); + merge_done(); +} + +// The VM operation wraps DumpMerger so that it can be performed by the VM thread +class VM_HeapDumpMerge : public VM_Operation { +private: + DumpMerger* _merger; +public: + VM_HeapDumpMerge(DumpMerger* merger) : _merger(merger) {} + VMOp_Type type() const { return VMOp_HeapDumpMerge; } + // heap dump merge can happen outside a safepoint + virtual bool evaluate_at_safepoint() const { return false; } + void doit() { + _merger->do_merge(); + } +}; + // The VM operation that performs the heap dump class VM_HeapDumper : public VM_GC_Operation, public WorkerTask { private: @@ -1816,69 +1632,20 @@ class VM_HeapDumper : public VM_GC_Operation, public WorkerTask { GrowableArray* _klass_map; ThreadStackTrace** _stack_traces; int _num_threads; + volatile int _dump_seq; // parallel heap dump support uint _num_dumper_threads; - uint _num_writer_threads; DumperController* _dumper_controller; ParallelObjectIterator* _poi; - HeapDumpLargeObjectList* _large_object_list; - - // VMDumperType is for thread that dumps both heap and non-heap data. - static const size_t VMDumperType = 0; - static const size_t WriterType = 1; - static const size_t DumperType = 2; // worker id of VMDumper thread.
static const size_t VMDumperWorkerId = 0; - - size_t get_worker_type(uint worker_id) { - assert(_num_writer_threads >= 1, "Must be at least one writer"); - // worker id of VMDumper that dump heap and non-heap data - if (worker_id == VMDumperWorkerId) { - return VMDumperType; - } - - // worker id of dumper starts from 1, which only dump heap datar - if (worker_id < _num_dumper_threads) { - return DumperType; - } - - // worker id of writer starts from _num_dumper_threads - return WriterType; - } - - void prepare_parallel_dump(uint num_total) { - assert (_dumper_controller == nullptr, "dumper controller must be null"); - assert (num_total > 0, "active workers number must >= 1"); - // Dumper threads number must not be larger than active workers number. - if (num_total < _num_dumper_threads) { - _num_dumper_threads = num_total - 1; - } - // Calculate dumper and writer threads number. - _num_writer_threads = num_total - _num_dumper_threads; - // If dumper threads number is 1, only the VMThread works as a dumper. - // If dumper threads number is equal to active workers, need at lest one worker thread as writer. - if (_num_dumper_threads > 0 && _num_writer_threads == 0) { - _num_writer_threads = 1; - _num_dumper_threads = num_total - _num_writer_threads; - } - // Prepare parallel writer. - if (_num_dumper_threads > 1) { - ParDumpWriter::before_work(); - // Number of dumper threads that only iterate heap. - uint _heap_only_dumper_threads = _num_dumper_threads - 1 /* VMDumper thread */; - _dumper_controller = new (std::nothrow) DumperController(_heap_only_dumper_threads); - } - } - - void finish_parallel_dump() { - if (_num_dumper_threads > 1) { - ParDumpWriter::after_work(); - } - } + // VM dumper dumps both heap and non-heap data, other dumpers dump heap-only data. 
+ static bool is_vm_dumper(uint worker_id) { return worker_id == VMDumperWorkerId; } // accessors and setters static VM_HeapDumper* dumper() { assert(_global_dumper != nullptr, "Error"); return _global_dumper; } static DumpWriter* writer() { assert(_global_writer != nullptr, "Error"); return _global_writer; } + void set_global_dumper() { assert(_global_dumper == nullptr, "Error"); _global_dumper = this; @@ -1892,6 +1659,9 @@ class VM_HeapDumper : public VM_GC_Operation, public WorkerTask { bool skip_operation() const; + // create dump writer for every parallel dump thread + DumpWriter* create_local_writer(); + // writes a HPROF_LOAD_CLASS record static void do_load_class(Klass* k); @@ -1909,9 +1679,6 @@ class VM_HeapDumper : public VM_GC_Operation, public WorkerTask { // HPROF_TRACE and HPROF_FRAME records void dump_stack_traces(); - // large objects - void dump_large_objects(ObjectClosure* writer); - public: VM_HeapDumper(DumpWriter* writer, bool gc_before_heap_dump, bool oome, uint num_dump_threads) : VM_GC_Operation(0 /* total collections, dummy, ignored */, @@ -1924,10 +1691,10 @@ class VM_HeapDumper : public VM_GC_Operation, public WorkerTask { _klass_map = new (mtServiceability) GrowableArray(INITIAL_CLASS_COUNT, mtServiceability); _stack_traces = nullptr; _num_threads = 0; + _dump_seq = 0; _num_dumper_threads = num_dump_threads; _dumper_controller = nullptr; _poi = nullptr; - _large_object_list = new (std::nothrow) HeapDumpLargeObjectList(); if (oome) { assert(!Thread::current()->is_VM_thread(), "Dump from OutOfMemoryError cannot be called by the VMThread"); // get OutOfMemoryError zero-parameter constructor @@ -1954,8 +1721,10 @@ class VM_HeapDumper : public VM_GC_Operation, public WorkerTask { _dumper_controller = nullptr; } delete _klass_map; - delete _large_object_list; } + int dump_seq() { return _dump_seq; } + bool is_parallel_dump() { return _num_dumper_threads > 1; } + bool can_parallel_dump(WorkerThreads* workers); VMOp_Type type() const { return VMOp_HeapDumper; } virtual bool doit_prologue(); @@ -2147,6 +1916,32 @@ bool VM_HeapDumper::doit_prologue() { return VM_GC_Operation::doit_prologue(); } +bool VM_HeapDumper::can_parallel_dump(WorkerThreads* workers) { + bool can_parallel = true; + uint num_active_workers = workers != nullptr ? workers->active_workers() : 0; + uint num_requested_dump_threads = _num_dumper_threads; + // check if we can dump in parallel based on requested and active threads + if (num_active_workers <= 1 || num_requested_dump_threads <= 1) { + _num_dumper_threads = 1; + can_parallel = false; + } else { + // check if we have extra path room to accommodate segmented heap files + const char* base_path = writer()->get_file_path(); + assert(base_path != nullptr, "sanity check"); + if ((strlen(base_path) + 7/*.p\d\d\d\d\0*/) >= JVM_MAXPATHLEN) { + _num_dumper_threads = 1; + can_parallel = false; + } else { + _num_dumper_threads = clamp(num_requested_dump_threads, 2U, num_active_workers); + } + } + + log_info(heapdump)("Requested dump threads %u, active dump threads %u, " + "actual dump threads %u, parallelism %s", + num_requested_dump_threads, num_active_workers, + _num_dumper_threads, can_parallel ? "true" : "false"); + return can_parallel; +} // The VM operation that dumps the heap. The dump consists of the following // records: @@ -2193,23 +1988,15 @@ void VM_HeapDumper::doit() { set_global_writer(); WorkerThreads* workers = ch->safepoint_workers(); - - if (workers == nullptr) { - // Use serial dump, set dumper threads and writer threads number to 1. 
- _num_dumper_threads=1; - _num_writer_threads=1; - work(0); + if (!can_parallel_dump(workers)) { + work(VMDumperWorkerId); } else { - prepare_parallel_dump(workers->active_workers()); - if (_num_dumper_threads > 1) { - ParallelObjectIterator poi(_num_dumper_threads); - _poi = &poi; - workers->run_task(this); - _poi = nullptr; - } else { - workers->run_task(this); - } - finish_parallel_dump(); + uint heap_only_dumper_threads = _num_dumper_threads - 1 /* VMDumper thread */; + _dumper_controller = new (std::nothrow) DumperController(heap_only_dumper_threads); + ParallelObjectIterator poi(_num_dumper_threads); + _poi = &poi; + workers->run_task(this, _num_dumper_threads); + _poi = nullptr; } // Now we clear the global variables, so that a future dumper can run. @@ -2217,17 +2004,26 @@ void VM_HeapDumper::doit() { clear_global_writer(); } +// prepare DumpWriter for every parallel dump thread +DumpWriter* VM_HeapDumper::create_local_writer() { + char* path = NEW_RESOURCE_ARRAY(char, JVM_MAXPATHLEN); + memset(path, 0, JVM_MAXPATHLEN); + + // generate segmented heap file path + const char* base_path = writer()->get_file_path(); + AbstractCompressor* compressor = writer()->compressor(); + int seq = Atomic::fetch_then_add(&_dump_seq, 1); + os::snprintf(path, JVM_MAXPATHLEN, "%s.p%d", base_path, seq); + + // create corresponding writer for that + DumpWriter* local_writer = new DumpWriter(path, writer()->is_overwrite(), compressor); + return local_writer; +} + void VM_HeapDumper::work(uint worker_id) { - if (worker_id != 0) { - if (get_worker_type(worker_id) == WriterType) { - writer()->writer_loop(); - return; - } - if (_num_dumper_threads > 1 && get_worker_type(worker_id) == DumperType) { - _dumper_controller->wait_for_start_signal(); - } - } else { - // The worker 0 on all non-heap data dumping and part of heap iteration. + // VM Dumper works on all non-heap data dumping and part of heap iteration. + if (is_vm_dumper(worker_id)) { + TraceTime timer("Dump non-objects", TRACETIME_LOG(Info, heapdump)); // Write the file header - we always use 1.0.2 const char* header = "JAVA PROFILE 1.0.2"; @@ -2271,55 +2067,44 @@ void VM_HeapDumper::work(uint worker_id) { StickyClassDumper class_dumper(writer()); ClassLoaderData::the_null_class_loader_data()->classes_do(&class_dumper); } + + // Heap iteration. // writes HPROF_GC_INSTANCE_DUMP records. // After each sub-record is written check_segment_length will be invoked // to check if the current segment exceeds a threshold. If so, a new // segment is started. // The HPROF_GC_CLASS_DUMP and HPROF_GC_INSTANCE_DUMP are the vast bulk // of the heap dump. - if (_num_dumper_threads <= 1) { + if (!is_parallel_dump()) { + assert(is_vm_dumper(worker_id), "must be"); + // == Serial dump + TraceTime timer("Dump heap objects", TRACETIME_LOG(Info, heapdump)); HeapObjectDumper obj_dumper(writer()); Universe::heap()->object_iterate(&obj_dumper); + writer()->finish_dump_segment(); + // Writes the HPROF_HEAP_DUMP_END record because merge does not happen in serial dump + DumperSupport::end_of_dump(writer()); + writer()->flush(); } else { - assert(get_worker_type(worker_id) == DumperType - || get_worker_type(worker_id) == VMDumperType, - "must be dumper thread to do heap iteration"); - if (get_worker_type(worker_id) == VMDumperType) { - // Clear global writer's buffer. - writer()->finish_dump_segment(true); - // Notify dumpers to start heap iteration. 
- _dumper_controller->start_dump(); + // == Parallel dump + ResourceMark rm; + TraceTime timer("Dump heap objects in parallel", TRACETIME_LOG(Info, heapdump)); + DumpWriter* local_writer = is_vm_dumper(worker_id) ? writer() : create_local_writer(); + if (!local_writer->has_error()) { + HeapObjectDumper obj_dumper(local_writer); + _poi->object_iterate(&obj_dumper, worker_id); + local_writer->finish_dump_segment(); + local_writer->flush(); } - // Heap iteration. - { - ParDumpWriter pw(writer()); - { - HeapObjectDumper obj_dumper(&pw, _large_object_list); - _poi->object_iterate(&obj_dumper, worker_id); - } - - if (get_worker_type(worker_id) == VMDumperType) { - _dumper_controller->wait_all_dumpers_complete(); - // clear internal buffer; - pw.finish_dump_segment(true); - // refresh the global_writer's buffer and position; - writer()->refresh(); - } else { - pw.finish_dump_segment(true); - _dumper_controller->dumper_complete(); - return; - } + if (is_vm_dumper(worker_id)) { + _dumper_controller->wait_all_dumpers_complete(); + } else { + _dumper_controller->dumper_complete(local_writer, writer()); + return; } } - - assert(get_worker_type(worker_id) == VMDumperType, "Heap dumper must be VMDumper"); - // Use writer() rather than ParDumpWriter to avoid memory consumption. - HeapObjectDumper obj_dumper(writer()); - dump_large_objects(&obj_dumper); - // Writes the HPROF_HEAP_DUMP_END record. - DumperSupport::end_of_dump(writer()); - // We are done with writing. Release the worker threads. - writer()->deactivate(); + // At this point, all fragments of the heapdump have been written to separate files. + // We need to merge them into a complete heapdump and write HPROF_HEAP_DUMP_END at that time. } void VM_HeapDumper::dump_stack_traces() { @@ -2380,11 +2165,6 @@ void VM_HeapDumper::dump_stack_traces() { } } -// dump the large objects. -void VM_HeapDumper::dump_large_objects(ObjectClosure* cl) { - _large_object_list->drain(cl); -} - // dump the heap to given path. int HeapDumper::dump(const char* path, outputStream* out, int compression, bool overwrite, uint num_dump_threads) { assert(path != nullptr && strlen(path) > 0, "path missing"); @@ -2408,7 +2188,7 @@ int HeapDumper::dump(const char* path, outputStream* out, int compression, bool } } - DumpWriter writer(new (std::nothrow) FileWriter(path, overwrite), compressor); + DumpWriter writer(path, overwrite, compressor); if (writer.error() != nullptr) { set_error(writer.error()); @@ -2419,18 +2199,38 @@ int HeapDumper::dump(const char* path, outputStream* out, int compression, bool return -1; } - // generate the dump + // generate the segmented heap dump into separate files VM_HeapDumper dumper(&writer, _gc_before_heap_dump, _oome, num_dump_threads); - if (Thread::current()->is_VM_thread()) { - assert(SafepointSynchronize::is_at_safepoint(), "Expected to be called at a safepoint"); - dumper.doit(); - } else { - VMThread::execute(&dumper); - } + VMThread::execute(&dumper); // record any error that the writer may have encountered set_error(writer.error()); + // For serial dump, once VM_HeapDumper completes, the whole heap dump process + // is done, no further phases needed. For parallel dump, the whole heap dump + // process is done in two phases + // + // Phase 1: Concurrent threads directly write heap data to multiple heap files. + // This is done by VM_HeapDumper, which is performed within safepoint. + // + // Phase 2: Merge multiple heap files into one complete heap dump file. 
+ // This is done by DumpMerger, which is performed outside safepoint + if (dumper.is_parallel_dump()) { + DumpMerger merger(path, &writer, dumper.dump_seq()); + Thread* current_thread = Thread::current(); + if (current_thread->is_AttachListener_thread()) { + // perform heapdump file merge operation in the current thread prevents us + // from occupying the VM Thread, which in turn affects the occurrence of + // GC and other VM operations. + merger.do_merge(); + } else { + // otherwise, performs it by VM thread + VM_HeapDumpMerge op(&merger); + VMThread::execute(&op); + } + set_error(writer.error()); + } + // emit JFR event if (error() == nullptr) { event.set_destination(path); diff --git a/src/hotspot/share/services/heapDumper.hpp b/src/hotspot/share/services/heapDumper.hpp index 79d9d46addd..bfc31b2f231 100644 --- a/src/hotspot/share/services/heapDumper.hpp +++ b/src/hotspot/share/services/heapDumper.hpp @@ -29,20 +29,9 @@ #include "oops/oop.hpp" #include "runtime/os.hpp" -// HeapDumper is used to dump the java heap to file in HPROF binary format: -// -// { HeapDumper dumper(true /* full GC before heap dump */); -// if (dumper.dump("/export/java.hprof")) { -// ResourceMark rm; -// tty->print_cr("Dump failed: %s", dumper.error_as_C_string()); -// } else { -// // dump succeeded -// } -// } -// - class outputStream; +// HeapDumper is used to dump the java heap to file in HPROF binary format class HeapDumper : public StackObj { private: char* _error; @@ -80,6 +69,11 @@ class HeapDumper : public StackObj { static void dump_heap() NOT_SERVICES_RETURN; static void dump_heap_from_oome() NOT_SERVICES_RETURN; + + // Parallel thread number for heap dump, initialize based on active processor count. + static uint default_num_of_dump_threads() { + return MAX2(1, (uint)os::initial_active_processor_count() * 3 / 8); + } }; #endif // SHARE_SERVICES_HEAPDUMPER_HPP diff --git a/src/hotspot/share/services/heapDumperCompression.cpp b/src/hotspot/share/services/heapDumperCompression.cpp index 7afa769e152..d9a845d0ec7 100644 --- a/src/hotspot/share/services/heapDumperCompression.cpp +++ b/src/hotspot/share/services/heapDumperCompression.cpp @@ -134,363 +134,3 @@ char const* GZipCompressor::compress(char* in, size_t in_size, char* out, size_t return msg; } - -WorkList::WorkList() { - _head._next = &_head; - _head._prev = &_head; -} - -void WorkList::insert(WriteWork* before, WriteWork* work) { - work->_prev = before; - work->_next = before->_next; - before->_next = work; - work->_next->_prev = work; -} - -WriteWork* WorkList::remove(WriteWork* work) { - if (work != nullptr) { - assert(work->_next != work, "Invalid next"); - assert(work->_prev != work, "Invalid prev"); - work->_prev->_next = work->_next;; - work->_next->_prev = work->_prev; - work->_next = nullptr; - work->_prev = nullptr; - } - - return work; -} - -void WorkList::add_by_id(WriteWork* work) { - if (is_empty()) { - add_first(work); - } else { - WriteWork* last_curr = &_head; - WriteWork* curr = _head._next; - - while (curr->_id < work->_id) { - last_curr = curr; - curr = curr->_next; - - if (curr == &_head) { - add_last(work); - return; - } - } - - insert(last_curr, work); - } -} - - - -CompressionBackend::CompressionBackend(AbstractWriter* writer, - AbstractCompressor* compressor, size_t block_size, size_t max_waste) : - _active(false), - _err(nullptr), - _nr_of_threads(0), - _works_created(0), - _work_creation_failed(false), - _id_to_write(0), - _next_id(0), - _in_size(block_size), - _max_waste(max_waste), - _out_size(0), - _tmp_size(0), - 
_written(0), - _writer(writer), - _compressor(compressor), - _lock(new (std::nothrow) PaddedMonitor(Mutex::nosafepoint, "HProfCompressionBackend_lock")) { - if (_writer == nullptr) { - set_error("Could not allocate writer"); - } else if (_lock == nullptr) { - set_error("Could not allocate lock"); - } else { - set_error(_writer->open_writer()); - } - - if (_compressor != nullptr) { - set_error(_compressor->init(_in_size, &_out_size, &_tmp_size)); - } - - _current = allocate_work(_in_size, _out_size, _tmp_size); - - if (_current == nullptr) { - set_error("Could not allocate memory for buffer"); - } - - _active = (_err == nullptr); -} - -CompressionBackend::~CompressionBackend() { - assert(!_active, "Must not be active by now"); - assert(_nr_of_threads == 0, "Must have no active threads"); - assert(_to_compress.is_empty() && _finished.is_empty(), "Still work to do"); - - free_work_list(&_unused); - free_work(_current); - assert(_works_created == 0, "All work must have been freed"); - - delete _compressor; - delete _writer; - delete _lock; -} - -void CompressionBackend::flush_buffer(MonitorLocker* ml) { - - // Make sure we write the last partially filled buffer. - if ((_current != nullptr) && (_current->_in_used > 0)) { - _current->_id = _next_id++; - _to_compress.add_last(_current); - _current = nullptr; - ml->notify_all(); - } - - // Wait for the threads to drain the compression work list and do some work yourself. - while (!_to_compress.is_empty()) { - do_foreground_work(); - } -} - -void CompressionBackend::flush_buffer() { - assert(_active, "Must be active"); - - MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag); - flush_buffer(&ml); -} - -void CompressionBackend::deactivate() { - assert(_active, "Must be active"); - - MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag); - flush_buffer(&ml); - - _active = false; - ml.notify_all(); -} - -void CompressionBackend::thread_loop() { - { - MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag); - _nr_of_threads++; - } - - WriteWork* work; - while ((work = get_work()) != nullptr) { - do_compress(work); - finish_work(work); - } - - MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag); - _nr_of_threads--; - assert(_nr_of_threads >= 0, "Too many threads finished"); -} - -void CompressionBackend::set_error(char const* new_error) { - if ((new_error != nullptr) && (_err == nullptr)) { - _err = new_error; - } -} - -WriteWork* CompressionBackend::allocate_work(size_t in_size, size_t out_size, - size_t tmp_size) { - WriteWork* result = (WriteWork*) os::malloc(sizeof(WriteWork), mtInternal); - - if (result == nullptr) { - _work_creation_failed = true; - return nullptr; - } - - _works_created++; - result->_in = (char*) os::malloc(in_size, mtInternal); - result->_in_max = in_size; - result->_in_used = 0; - result->_out = nullptr; - result->_tmp = nullptr; - - if (result->_in == nullptr) { - goto fail; - } - - if (out_size > 0) { - result->_out = (char*) os::malloc(out_size, mtInternal); - result->_out_used = 0; - result->_out_max = out_size; - - if (result->_out == nullptr) { - goto fail; - } - } - - if (tmp_size > 0) { - result->_tmp = (char*) os::malloc(tmp_size, mtInternal); - result->_tmp_max = tmp_size; - - if (result->_tmp == nullptr) { - goto fail; - } - } - - return result; - -fail: - free_work(result); - _work_creation_failed = true; - return nullptr; -} - -void CompressionBackend::free_work(WriteWork* work) { - if (work != nullptr) { - os::free(work->_in); - os::free(work->_out); - os::free(work->_tmp); - os::free(work); - 
--_works_created; - } -} - -void CompressionBackend::free_work_list(WorkList* list) { - while (!list->is_empty()) { - free_work(list->remove_first()); - } -} - -void CompressionBackend::do_foreground_work() { - assert(!_to_compress.is_empty(), "Must have work to do"); - assert(_lock->owned_by_self(), "Must have the lock"); - - WriteWork* work = _to_compress.remove_first(); - MutexUnlocker mu(_lock, Mutex::_no_safepoint_check_flag); - do_compress(work); - finish_work(work); -} - -WriteWork* CompressionBackend::get_work() { - MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag); - - while (_active && _to_compress.is_empty()) { - ml.wait(); - } - - return _to_compress.remove_first(); -} - -void CompressionBackend::flush_external_buffer(char* buffer, size_t used, size_t max) { - assert(buffer != nullptr && used != 0 && max != 0, "Invalid data send to compression backend"); - assert(_active == true, "Backend must be active when flushing external buffer"); - char* buf; - size_t tmp_used = 0; - size_t tmp_max = 0; - - MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag); - // First try current buffer. Use it if empty. - if (_current->_in_used == 0) { - buf = _current->_in; - } else { - // If current buffer is not clean, flush it. - MutexUnlocker ml(_lock, Mutex::_no_safepoint_check_flag); - get_new_buffer(&buf, &tmp_used, &tmp_max, true); - } - assert (_current->_in != nullptr && _current->_in_max >= max && - _current->_in_used == 0, "Invalid buffer from compression backend"); - // Copy data to backend buffer. - memcpy(buf, buffer, used); - - assert(_current->_in == buf, "Must be current"); - _current->_in_used += used; -} - -void CompressionBackend::get_new_buffer(char** buffer, size_t* used, size_t* max, bool force_reset) { - if (_active) { - MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag); - if (*used > 0 || force_reset) { - _current->_in_used += *used; - // Check if we do not waste more than _max_waste. If yes, write the buffer. - // Otherwise return the rest of the buffer as the new buffer. - if (_current->_in_max - _current->_in_used <= _max_waste || force_reset) { - _current->_id = _next_id++; - _to_compress.add_last(_current); - _current = nullptr; - ml.notify_all(); - } else { - *buffer = _current->_in + _current->_in_used; - *used = 0; - *max = _current->_in_max - _current->_in_used; - return; - } - } - - while ((_current == nullptr) && _unused.is_empty() && _active) { - // Add more work objects if needed. 
- if (!_work_creation_failed && (_works_created <= _nr_of_threads)) { - WriteWork* work = allocate_work(_in_size, _out_size, _tmp_size); - - if (work != nullptr) { - _unused.add_first(work); - } - } else if (!_to_compress.is_empty() && (_nr_of_threads == 0)) { - do_foreground_work(); - } else { - ml.wait(); - } - } - - if (_current == nullptr) { - _current = _unused.remove_first(); - } - - if (_current != nullptr) { - _current->_in_used = 0; - _current->_out_used = 0; - *buffer = _current->_in; - *used = 0; - *max = _current->_in_max; - - return; - } - } - - *buffer = nullptr; - *used = 0; - *max = 0; - - return; -} - -void CompressionBackend::do_compress(WriteWork* work) { - if (_compressor != nullptr) { - char const* msg = _compressor->compress(work->_in, work->_in_used, work->_out, - work->_out_max, - work->_tmp, _tmp_size, &work->_out_used); - - if (msg != nullptr) { - MutexLocker ml(_lock, Mutex::_no_safepoint_check_flag); - set_error(msg); - } - } -} - -void CompressionBackend::finish_work(WriteWork* work) { - MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag); - - _finished.add_by_id(work); - - // Write all finished works as far as we can. - while (!_finished.is_empty() && (_finished.first()->_id == _id_to_write)) { - WriteWork* to_write = _finished.remove_first(); - size_t size = _compressor == nullptr ? to_write->_in_used : to_write->_out_used; - char* p = _compressor == nullptr ? to_write->_in : to_write->_out; - char const* msg = nullptr; - - if (_err == nullptr) { - _written += size; - MutexUnlocker mu(_lock, Mutex::_no_safepoint_check_flag); - msg = _writer->write_buf(p, (ssize_t) size); - } - - set_error(msg); - _unused.add_first(to_write); - _id_to_write++; - } - - ml.notify_all(); -} diff --git a/src/hotspot/share/services/heapDumperCompression.hpp b/src/hotspot/share/services/heapDumperCompression.hpp index d77436a2aa6..f8199fc5df2 100644 --- a/src/hotspot/share/services/heapDumperCompression.hpp +++ b/src/hotspot/share/services/heapDumperCompression.hpp @@ -74,6 +74,10 @@ public: // Does the write. Returns null on success and a static error message otherwise. virtual char const* write_buf(char* buf, ssize_t size); + + const char* get_file_path() { return _path; } + + bool is_overwrite() const { return _overwrite; } }; @@ -97,145 +101,4 @@ public: char* tmp, size_t tmp_size, size_t* compressed_size); }; - -// The data needed to write a single buffer (and compress it optionally). -struct WriteWork { - // The id of the work. - int64_t _id; - - // The input buffer where the raw data is - char* _in; - size_t _in_used; - size_t _in_max; - - // The output buffer where the compressed data is. Is null when compression is disabled. - char* _out; - size_t _out_used; - size_t _out_max; - - // The temporary space needed for compression. Is null when compression is disabled. - char* _tmp; - size_t _tmp_max; - - // Used to link WriteWorks into lists. - WriteWork* _next; - WriteWork* _prev; -}; - -// A list for works. -class WorkList { -private: - WriteWork _head; - - void insert(WriteWork* before, WriteWork* work); - WriteWork* remove(WriteWork* work); - -public: - WorkList(); - - // Return true if the list is empty. - bool is_empty() { return _head._next == &_head; } - - // Adds to the beginning of the list. - void add_first(WriteWork* work) { insert(&_head, work); } - - // Adds to the end of the list. - void add_last(WriteWork* work) { insert(_head._prev, work); } - - // Adds so the ids are ordered. - void add_by_id(WriteWork* work); - - // Returns the first element. 
- WriteWork* first() { return is_empty() ? nullptr : _head._next; } - - // Returns the last element. - WriteWork* last() { return is_empty() ? nullptr : _head._prev; } - - // Removes the first element. Returns null if empty. - WriteWork* remove_first() { return remove(first()); } - - // Removes the last element. Returns null if empty. - WriteWork* remove_last() { return remove(first()); } -}; - - -class Monitor; - -// This class is used by the DumpWriter class. It supplies the DumpWriter with -// chunks of memory to write the heap dump data into. When the DumpWriter needs a -// new memory chunk, it calls get_new_buffer(), which commits the old chunk used -// and returns a new chunk. The old chunk is then added to a queue to be compressed -// and then written in the background. -class CompressionBackend : StackObj { - bool _active; - char const * _err; - - int _nr_of_threads; - int _works_created; - bool _work_creation_failed; - - int64_t _id_to_write; - int64_t _next_id; - - size_t _in_size; - size_t _max_waste; - size_t _out_size; - size_t _tmp_size; - - size_t _written; - - AbstractWriter* const _writer; - AbstractCompressor* const _compressor; - - Monitor* const _lock; - - WriteWork* _current; - WorkList _to_compress; - WorkList _unused; - WorkList _finished; - - void set_error(char const* new_error); - - WriteWork* allocate_work(size_t in_size, size_t out_size, size_t tmp_size); - void free_work(WriteWork* work); - void free_work_list(WorkList* list); - - void do_foreground_work(); - WriteWork* get_work(); - void do_compress(WriteWork* work); - void finish_work(WriteWork* work); - void flush_buffer(MonitorLocker* ml); - -public: - // compressor can be null if no compression is used. - // Takes ownership of the writer and compressor. - // block_size is the buffer size of a WriteWork. - // max_waste is the maximum number of bytes to leave - // empty in the buffer when it is written. - CompressionBackend(AbstractWriter* writer, AbstractCompressor* compressor, - size_t block_size, size_t max_waste); - - ~CompressionBackend(); - - size_t get_written() const { return _written; } - - char const* error() const { return _err; } - - // Sets up an internal buffer, fills with external buffer, and sends to compressor. - void flush_external_buffer(char* buffer, size_t used, size_t max); - - // Commits the old buffer (using the value in *used) and sets up a new one. - void get_new_buffer(char** buffer, size_t* used, size_t* max, bool force_reset = false); - - // The entry point for a worker thread. - void thread_loop(); - - // Shuts down the backend, releasing all threads. - void deactivate(); - - // Flush all compressed data in buffer to file - void flush_buffer(); -}; - - #endif // SHARE_SERVICES_HEAPDUMPERCOMPRESSION_HPP diff --git a/src/jdk.hotspot.agent/share/classes/sun/jvm/hotspot/runtime/AttachListenerThread.java b/src/jdk.hotspot.agent/share/classes/sun/jvm/hotspot/runtime/AttachListenerThread.java new file mode 100644 index 00000000000..55d440f79fd --- /dev/null +++ b/src/jdk.hotspot.agent/share/classes/sun/jvm/hotspot/runtime/AttachListenerThread.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2023, Alibaba Group Holding Limited. All Rights Reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. 
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ *
+ */
+
+package sun.jvm.hotspot.runtime;
+
+import java.io.*;
+
+import sun.jvm.hotspot.debugger.Address;
+
+public class AttachListenerThread extends JavaThread {
+
+  public AttachListenerThread(Address addr) {
+    super(addr);
+  }
+
+  public boolean isJavaThread() { return false; }
+
+  public boolean isAttachListenerThread() { return true; }
+
+}
diff --git a/src/jdk.hotspot.agent/share/classes/sun/jvm/hotspot/runtime/Thread.java b/src/jdk.hotspot.agent/share/classes/sun/jvm/hotspot/runtime/Thread.java
index 355f78e4406..8267f12a9e9 100644
--- a/src/jdk.hotspot.agent/share/classes/sun/jvm/hotspot/runtime/Thread.java
+++ b/src/jdk.hotspot.agent/share/classes/sun/jvm/hotspot/runtime/Thread.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2000, 2021, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2000, 2023, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -85,6 +85,7 @@ public class Thread extends VMObject {
   public boolean isWatcherThread() { return false; }
   public boolean isServiceThread() { return false; }
   public boolean isMonitorDeflationThread() { return false; }
+  public boolean isAttachListenerThread() { return false; }

   /** Memory operations */
   public void oopsDo(AddressVisitor oopVisitor) {
diff --git a/src/jdk.hotspot.agent/share/classes/sun/jvm/hotspot/runtime/Threads.java b/src/jdk.hotspot.agent/share/classes/sun/jvm/hotspot/runtime/Threads.java
index d0fa7c8d3f1..daf00972676 100644
--- a/src/jdk.hotspot.agent/share/classes/sun/jvm/hotspot/runtime/Threads.java
+++ b/src/jdk.hotspot.agent/share/classes/sun/jvm/hotspot/runtime/Threads.java
@@ -157,6 +157,7 @@ public class Threads {
         virtualConstructor.addMapping("MonitorDeflationThread", MonitorDeflationThread.class);
         virtualConstructor.addMapping("NotificationThread", NotificationThread.class);
         virtualConstructor.addMapping("StringDedupThread", StringDedupThread.class);
+        virtualConstructor.addMapping("AttachListenerThread", AttachListenerThread.class);
     }

     public Threads() {
@@ -164,14 +165,15 @@ public class Threads {
     }

     /** NOTE: this returns objects of type JavaThread, CompilerThread,
-        JvmtiAgentThread, NotificationThread, MonitorDeflationThread and ServiceThread.
-        The latter four are subclasses of the former. Most operations
+        JvmtiAgentThread, NotificationThread, MonitorDeflationThread,
+        StringDedupThread, AttachListenerThread and ServiceThread.
+        The latter seven are subclasses of the former. Most operations
         (fetching the top frame, etc.) are only allowed to be performed on
         a "pure" JavaThread.
        For this reason, {@link
        sun.jvm.hotspot.runtime.JavaThread#isJavaThread} has been
        changed from the definition in the VM (which returns true for
        all of these thread types) to return true for JavaThreads and
-        false for the four subclasses. FIXME: should reconsider the
+        false for the seven subclasses. FIXME: should reconsider the
         inheritance hierarchy; see {@link
         sun.jvm.hotspot.runtime.JavaThread#isJavaThread}. */
     public JavaThread getJavaThreadAt(int i) {
@@ -195,7 +197,8 @@ public class Threads {
             return thread;
         } catch (Exception e) {
             throw new RuntimeException("Unable to deduce type of thread from address " + threadAddr +
-                                       " (expected type JavaThread, CompilerThread, MonitorDeflationThread, ServiceThread or JvmtiAgentThread)", e);
+                                       " (expected type JavaThread, CompilerThread, MonitorDeflationThread, AttachListenerThread," +
+                                       " StringDedupThread, NotificationThread, ServiceThread or JvmtiAgentThread)", e);
         }
     }
diff --git a/test/hotspot/jtreg/serviceability/dcmd/gc/HeapDumpParallelTest.java b/test/hotspot/jtreg/serviceability/dcmd/gc/HeapDumpParallelTest.java
new file mode 100644
index 00000000000..0dfdf54152d
--- /dev/null
+++ b/test/hotspot/jtreg/serviceability/dcmd/gc/HeapDumpParallelTest.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2023, Alibaba Group Holding Limited. All Rights Reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+
+import jdk.test.lib.Asserts;
+import jdk.test.lib.JDKToolLauncher;
+import jdk.test.lib.Utils;
+import jdk.test.lib.apps.LingeredApp;
+import jdk.test.lib.dcmd.PidJcmdExecutor;
+import jdk.test.lib.process.OutputAnalyzer;
+import jdk.test.lib.process.ProcessTools;
+
+import jdk.test.lib.hprof.HprofParser;
+
+/**
+ * @test
+ * @bug 8306441
+ * @summary Verify the integrity of the generated heap dump and the capability of parallel dump
+ * @library /test/lib
+ * @run main HeapDumpParallelTest
+ */
+
+public class HeapDumpParallelTest {
+
+    private static void checkAndVerify(OutputAnalyzer dcmdOut, LingeredApp app, File heapDumpFile, boolean expectSerial) throws IOException {
+        dcmdOut.shouldHaveExitValue(0);
+        dcmdOut.shouldContain("Heap dump file created");
+        OutputAnalyzer appOut = new OutputAnalyzer(app.getProcessStdout());
+        appOut.shouldContain("[heapdump]");
+        String opts = Arrays.asList(Utils.getTestJavaOpts()).toString();
+        if (opts.contains("-XX:+UseSerialGC") || opts.contains("-XX:+UseEpsilonGC")) {
+            System.out.println("Serial dump expected: UseSerialGC or UseEpsilonGC detected.");
+            expectSerial = true;
+        }
+        if (!expectSerial && Runtime.getRuntime().availableProcessors() > 1) {
+            appOut.shouldContain("Dump heap objects in parallel");
+            appOut.shouldContain("Merge heap files complete");
+        } else {
+            appOut.shouldNotContain("Dump heap objects in parallel");
+            appOut.shouldNotContain("Merge heap files complete");
+        }
+        verifyHeapDump(heapDumpFile);
+        if (heapDumpFile.exists()) {
+            heapDumpFile.delete();
+        }
+    }
+
+    private static LingeredApp launchApp() throws IOException {
+        LingeredApp theApp = new LingeredApp();
+        LingeredApp.startApp(theApp, "-Xlog:heapdump", "-Xmx512m",
+                             "-XX:-UseDynamicNumberOfGCThreads",
+                             "-XX:ParallelGCThreads=2");
+        return theApp;
+    }
+
+    public static void main(String[] args) throws Exception {
+        String heapDumpFileName = "parallelHeapDump.bin";
+
+        File heapDumpFile = new File(heapDumpFileName);
+        if (heapDumpFile.exists()) {
+            heapDumpFile.delete();
+        }
+
+        LingeredApp theApp = launchApp();
+        try {
+            // Negative values are rejected with an error message.
+            OutputAnalyzer out = attachJcmdHeapDump(heapDumpFile, theApp.getPid(), "-parallel=-1");
+            out.shouldContain("Invalid number of parallel dump threads.");
+
+            // Expect a serial dump because 0 disables parallel dump.
+            test(heapDumpFile, "-parallel=0", true);
+
+            // Expect a serial dump.
+            test(heapDumpFile, "-parallel=1", true);
+
+            // Expect a parallel dump.
+            test(heapDumpFile, "-parallel=" + Integer.MAX_VALUE, false);
+
+            // Expect a parallel dump.
+            test(heapDumpFile, "-gz=9 -overwrite -parallel=" + Runtime.getRuntime().availableProcessors(), false);
+        } finally {
+            theApp.stopApp();
+        }
+    }
+
+    private static void test(File heapDumpFile, String arg, boolean expectSerial) throws Exception {
+        LingeredApp theApp = launchApp();
+        try {
+            OutputAnalyzer dcmdOut = attachJcmdHeapDump(heapDumpFile, theApp.getPid(), arg);
+            // Stop the app before checking its output; the stopApp() in the
+            // finally block is a safeguard in case checkAndVerify() throws.
+            theApp.stopApp();
+            checkAndVerify(dcmdOut, theApp, heapDumpFile, expectSerial);
+        } finally {
+            theApp.stopApp();
+        }
+    }
+
+    private static OutputAnalyzer attachJcmdHeapDump(File heapDumpFile, long lingeredAppPid, String arg) throws Exception {
+        // e.g. jcmd <pid> GC.heap_dump -parallel=<cpucount> <file>
+        System.out.println("Testing pid " + lingeredAppPid);
+        PidJcmdExecutor executor = new PidJcmdExecutor("" + lingeredAppPid);
+        return executor.execute("GC.heap_dump " + arg + " " + heapDumpFile.getAbsolutePath());
+    }
+
+    private static void verifyHeapDump(File dump) {
+        Asserts.assertTrue(dump.exists() && dump.isFile(), "Could not create dump file " + dump.getAbsolutePath());
+        try {
+            File out = HprofParser.parse(dump);
+
+            Asserts.assertTrue(out != null && out.exists() && out.isFile(), "Could not find hprof parser output file");
+            List<String> lines = Files.readAllLines(out.toPath());
+            Asserts.assertTrue(lines.size() > 0, "hprof parser output file is empty");
+            for (String line : lines) {
+                Asserts.assertFalse(line.matches(".*WARNING(?!.*Failed to resolve object.*constantPoolOop.*).*"));
+            }
+
+            out.delete();
+        } catch (Exception e) {
+            e.printStackTrace();
+            Asserts.fail("Could not parse dump file " + dump.getAbsolutePath());
+        }
+    }
+}
\ No newline at end of file
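
---

For reviewers who want to exercise the new -parallel option outside of jtreg, here is a minimal sketch (not part of the patch). It drives GC.heap_dump through the platform DiagnosticCommand MBean, whose gcHeapDump operation takes the same argument strings as jcmd. The class name ParallelHeapDumpExample and the dump path are illustrative only, and the sketch assumes a JDK build that already contains this change; on older builds the -parallel argument is rejected as unknown.

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class ParallelHeapDumpExample {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // The DiagnosticCommand MBean exposes jcmd commands as operations;
        // "GC.heap_dump" maps to the operation name "gcHeapDump".
        ObjectName dcmd = new ObjectName("com.sun.management:type=DiagnosticCommand");
        // Same argument syntax as "jcmd <pid> GC.heap_dump -parallel=4 parallel.hprof".
        // As the option's help text says, the VM may use fewer threads than requested.
        String[] dumpArgs = { "-parallel=4", "parallel.hprof" };
        String result = (String) server.invoke(dcmd, "gcHeapDump",
                new Object[] { dumpArgs },
                new String[] { String[].class.getName() });
        System.out.println(result); // expected to report "Heap dump file created"
    }
}

The equivalent command-line invocation against another process is simply: jcmd <pid> GC.heap_dump -parallel=4 parallel.hprof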