8252842: Extend jmap to support parallel heap dump

Reviewed-by: rschmelter, cjplummer
This commit is contained in:
Lin Zang 2021-09-23 07:00:06 +00:00
parent 2166ed1369
commit a74c099d67
6 changed files with 792 additions and 197 deletions

View File

@ -243,11 +243,15 @@ jint dump_heap(AttachOperation* op, outputStream* out) {
return JNI_ERR; return JNI_ERR;
} }
} }
// Parallel thread number for heap dump, initialize based on active processor count.
// Note the real number of threads used is also determined by active workers and compression
// backend thread number. See heapDumper.cpp.
uint parallel_thread_num = MAX2<uint>(1, (uint)os::initial_active_processor_count() * 3 / 8);
// Request a full GC before heap dump if live_objects_only = true // Request a full GC before heap dump if live_objects_only = true
// This helps reduces the amount of unreachable objects in the dump // This helps reduces the amount of unreachable objects in the dump
// and makes it easier to browse. // and makes it easier to browse.
HeapDumper dumper(live_objects_only /* request GC */); HeapDumper dumper(live_objects_only /* request GC */);
dumper.dump(op->arg(0), out, (int)level); dumper.dump(path, out, (int)level, false, (uint)parallel_thread_num);
} }
return JNI_OK; return JNI_OK;
} }

File diff suppressed because it is too large Load Diff

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2005, 2020, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2005, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -71,7 +71,8 @@ class HeapDumper : public StackObj {
// dumps the heap to the specified file, returns 0 if success. // dumps the heap to the specified file, returns 0 if success.
// additional info is written to out if not NULL. // additional info is written to out if not NULL.
// compression >= 0 creates a gzipped file with the given compression level. // compression >= 0 creates a gzipped file with the given compression level.
int dump(const char* path, outputStream* out = NULL, int compression = -1, bool overwrite = false); // parallel_thread_num >= 0 indicates thread numbers of parallel object dump
int dump(const char* path, outputStream* out = NULL, int compression = -1, bool overwrite = false, uint parallel_thread_num = 1);
// returns error message (resource allocated), or NULL if no error // returns error message (resource allocated), or NULL if no error
char* error_as_C_string() const; char* error_as_C_string() const;

View File

@ -237,23 +237,34 @@ CompressionBackend::~CompressionBackend() {
delete _lock; delete _lock;
} }
void CompressionBackend::deactivate() { void CompressionBackend::flush_buffer(MonitorLocker* ml) {
assert(_active, "Must be active");
MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag);
// Make sure we write the last partially filled buffer. // Make sure we write the last partially filled buffer.
if ((_current != NULL) && (_current->_in_used > 0)) { if ((_current != NULL) && (_current->_in_used > 0)) {
_current->_id = _next_id++; _current->_id = _next_id++;
_to_compress.add_last(_current); _to_compress.add_last(_current);
_current = NULL; _current = NULL;
ml.notify_all(); ml->notify_all();
} }
// Wait for the threads to drain the compression work list and do some work yourself. // Wait for the threads to drain the compression work list and do some work yourself.
while (!_to_compress.is_empty()) { while (!_to_compress.is_empty()) {
do_foreground_work(); do_foreground_work();
} }
}
void CompressionBackend::flush_buffer() {
assert(_active, "Must be active");
MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag);
flush_buffer(&ml);
}
void CompressionBackend::deactivate() {
assert(_active, "Must be active");
MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag);
flush_buffer(&ml);
_active = false; _active = false;
ml.notify_all(); ml.notify_all();
@ -365,16 +376,39 @@ WriteWork* CompressionBackend::get_work() {
return _to_compress.remove_first(); return _to_compress.remove_first();
} }
void CompressionBackend::get_new_buffer(char** buffer, size_t* used, size_t* max) { void CompressionBackend::flush_external_buffer(char* buffer, size_t used, size_t max) {
assert(buffer != NULL && used != 0 && max != 0, "Invalid data send to compression backend");
assert(_active == true, "Backend must be active when flushing external buffer");
char* buf;
size_t tmp_used = 0;
size_t tmp_max = 0;
MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag);
// First try current buffer. Use it if empty.
if (_current->_in_used == 0) {
buf = _current->_in;
} else {
// If current buffer is not clean, flush it.
MutexUnlocker ml(_lock, Mutex::_no_safepoint_check_flag);
get_new_buffer(&buf, &tmp_used, &tmp_max, true);
}
assert (_current->_in != NULL && _current->_in_max >= max &&
_current->_in_used == 0, "Invalid buffer from compression backend");
// Copy data to backend buffer.
memcpy(buf, buffer, used);
assert(_current->_in == buf, "Must be current");
_current->_in_used += used;
}
void CompressionBackend::get_new_buffer(char** buffer, size_t* used, size_t* max, bool force_reset) {
if (_active) { if (_active) {
MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag); MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag);
if (*used > 0 || force_reset) {
if (*used > 0) {
_current->_in_used += *used; _current->_in_used += *used;
// Check if we do not waste more than _max_waste. If yes, write the buffer. // Check if we do not waste more than _max_waste. If yes, write the buffer.
// Otherwise return the rest of the buffer as the new buffer. // Otherwise return the rest of the buffer as the new buffer.
if (_current->_in_max - _current->_in_used <= _max_waste) { if (_current->_in_max - _current->_in_used <= _max_waste || force_reset) {
_current->_id = _next_id++; _current->_id = _next_id++;
_to_compress.add_last(_current); _to_compress.add_last(_current);
_current = NULL; _current = NULL;
@ -383,7 +417,6 @@ void CompressionBackend::get_new_buffer(char** buffer, size_t* used, size_t* max
*buffer = _current->_in + _current->_in_used; *buffer = _current->_in + _current->_in_used;
*used = 0; *used = 0;
*max = _current->_in_max - _current->_in_used; *max = _current->_in_max - _current->_in_used;
return; return;
} }
} }

View File

@ -204,6 +204,7 @@ class CompressionBackend : StackObj {
WriteWork* get_work(); WriteWork* get_work();
void do_compress(WriteWork* work); void do_compress(WriteWork* work);
void finish_work(WriteWork* work); void finish_work(WriteWork* work);
void flush_buffer(MonitorLocker* ml);
public: public:
// compressor can be NULL if no compression is used. // compressor can be NULL if no compression is used.
@ -220,14 +221,20 @@ public:
char const* error() const { return _err; } char const* error() const { return _err; }
// Sets up an internal buffer, fills with external buffer, and sends to compressor.
void flush_external_buffer(char* buffer, size_t used, size_t max);
// Commits the old buffer (using the value in *used) and sets up a new one. // Commits the old buffer (using the value in *used) and sets up a new one.
void get_new_buffer(char** buffer, size_t* used, size_t* max); void get_new_buffer(char** buffer, size_t* used, size_t* max, bool force_reset = false);
// The entry point for a worker thread. // The entry point for a worker thread.
void thread_loop(); void thread_loop();
// Shuts down the backend, releasing all threads. // Shuts down the backend, releasing all threads.
void deactivate(); void deactivate();
// Flush all compressed data in buffer to file
void flush_buffer();
}; };

View File

@ -94,19 +94,39 @@ public abstract class Reader {
= new HprofReader(heapFile, in, dumpNumber, = new HprofReader(heapFile, in, dumpNumber,
callStack, debugLevel); callStack, debugLevel);
return r.read(); return r.read();
} else if ((access = GzipRandomAccess.getAccess(heapFile, 16)) != null) { } else if ((i >>> 8) == GZIP_HEADER_MAGIC) {
// Possible gziped file, try decompress it and get the stack trace.
in.close(); in.close();
try (BufferedInputStream gzBis = new BufferedInputStream(access.asStream(0)); String deCompressedFile = "heapdump" + System.currentTimeMillis() + ".hprof";
PositionDataInputStream pdin = new PositionDataInputStream(gzBis)) { File out = new File(deCompressedFile);
// Decompress to get dump file.
try (FileInputStream heapFis = new FileInputStream(heapFile);
GZIPInputStream gis = new GZIPInputStream(heapFis);
FileOutputStream fos = new FileOutputStream(out)) {
byte[] buffer = new byte[1024 * 1024];
int len = 0;
while ((len = gis.read(buffer)) > 0) {
fos.write(buffer, 0, len);
}
} catch (Exception e) {
out.delete();
throw new IOException("Cannot decompress the compressed hprof file", e);
}
// Check dump data header and print stack trace.
try (FileInputStream outFis = new FileInputStream(out);
BufferedInputStream outBis = new BufferedInputStream(outFis);
PositionDataInputStream pdin = new PositionDataInputStream(outBis)) {
i = pdin.readInt(); i = pdin.readInt();
if (i == HprofReader.MAGIC_NUMBER) { if (i == HprofReader.MAGIC_NUMBER) {
Reader r HprofReader r
= new HprofReader(access.asFileBuffer(), pdin, dumpNumber, = new HprofReader(deCompressedFile, pdin, dumpNumber,
callStack, debugLevel); true, debugLevel);
return r.read(); return r.read();
} else { } else {
throw new IOException("Wrong magic number in gzipped file: " + i); throw new IOException("Unrecognized magic number found in decompressed data: " + i);
} }
} finally {
out.delete();
} }
} else { } else {
throw new IOException("Unrecognized magic number: " + i); throw new IOException("Unrecognized magic number: " + i);