8292989: Avoid dynamic memory in AsyncLogWriter

Reviewed-by: jsjolen, stuefe
This commit is contained in:
Xin Liu 2022-09-15 18:32:21 +00:00
parent 2028ec7412
commit bf79f99c0c
5 changed files with 304 additions and 236 deletions

View File

@ -27,6 +27,7 @@
#include "logging/logFileOutput.hpp"
#include "logging/logFileStreamOutput.hpp"
#include "logging/logHandle.hpp"
#include "memory/resourceArea.hpp"
#include "runtime/atomic.hpp"
#include "runtime/os.inline.hpp"
@ -42,28 +43,49 @@ class AsyncLogWriter::AsyncLogLocker : public StackObj {
}
};
void AsyncLogWriter::enqueue_locked(const AsyncLogMessage& msg) {
if (_buffer.size() >= _buffer_max_size) {
// LogDecorator::None applies to 'constant initialization' because of its constexpr constructor.
const LogDecorations& AsyncLogWriter::None = LogDecorations(LogLevel::Warning, LogTagSetMapping<LogTag::__NO_TAG>::tagset(),
LogDecorators::None);
bool AsyncLogWriter::Buffer::push_back(LogFileStreamOutput* output, const LogDecorations& decorations, const char* msg) {
const size_t sz = Message::calc_size(strlen(msg));
const bool is_token = output == nullptr;
// Always leave headroom for the flush token. Pushing a token must succeed.
const size_t headroom = (!is_token) ? Message::calc_size(0) : 0;
if (_pos + sz <= (_capacity - headroom)) {
new(_buf + _pos) Message(output, decorations, msg);
_pos += sz;
return true;
}
return false;
}
void AsyncLogWriter::Buffer::push_flush_token() {
bool result = push_back(nullptr, AsyncLogWriter::None, "");
assert(result, "fail to enqueue the flush token.");
}
void AsyncLogWriter::enqueue_locked(LogFileStreamOutput* output, const LogDecorations& decorations, const char* msg) {
// To save space and streamline execution, we just ignore null message.
// client should use "" instead.
assert(msg != nullptr, "enqueuing a null message!");
if (!_buffer->push_back(output, decorations, msg)) {
bool p_created;
uint32_t* counter = _stats.put_if_absent(msg.output(), 0, &p_created);
uint32_t* counter = _stats.put_if_absent(output, 0, &p_created);
*counter = *counter + 1;
// drop the enqueueing message.
os::free(msg.message());
return;
}
_buffer.push_back(msg);
_data_available = true;
_lock.notify();
}
void AsyncLogWriter::enqueue(LogFileStreamOutput& output, const LogDecorations& decorations, const char* msg) {
AsyncLogMessage m(&output, decorations, os::strdup(msg));
{ // critical area
AsyncLogLocker locker;
enqueue_locked(m);
}
AsyncLogLocker locker;
enqueue_locked(&output, decorations, msg);
}
// LogMessageBuffer consists of a multiple-part/multiple-line message.
@ -72,8 +94,7 @@ void AsyncLogWriter::enqueue(LogFileStreamOutput& output, LogMessageBuffer::Iter
AsyncLogLocker locker;
for (; !msg_iterator.is_at_end(); msg_iterator++) {
AsyncLogMessage m(&output, msg_iterator.decorations(), os::strdup(msg_iterator.message()));
enqueue_locked(m);
enqueue_locked(&output, msg_iterator.decorations(), msg_iterator.message());
}
}
@ -81,75 +102,71 @@ AsyncLogWriter::AsyncLogWriter()
: _flush_sem(0), _lock(), _data_available(false),
_initialized(false),
_stats() {
size_t size = AsyncLogBufferSize / 2;
_buffer = new Buffer(size);
_buffer_staging = new Buffer(size);
log_info(logging)("AsyncLogBuffer estimates memory use: " SIZE_FORMAT " bytes", size * 2);
if (os::create_thread(this, os::asynclog_thread)) {
_initialized = true;
} else {
log_warning(logging, thread)("AsyncLogging failed to create thread. Falling back to synchronous logging.");
}
log_info(logging)("The maximum entries of AsyncLogBuffer: " SIZE_FORMAT ", estimated memory use: " SIZE_FORMAT " bytes",
_buffer_max_size, AsyncLogBufferSize);
}
class AsyncLogMapIterator {
AsyncLogBuffer& _logs;
public:
AsyncLogMapIterator(AsyncLogBuffer& logs) :_logs(logs) {}
bool do_entry(LogFileStreamOutput* output, uint32_t& counter) {
using none = LogTagSetMapping<LogTag::__NO_TAG>;
if (counter > 0) {
LogDecorations decorations(LogLevel::Warning, none::tagset(), LogDecorators::All);
stringStream ss;
ss.print(UINT32_FORMAT_W(6) " messages dropped due to async logging", counter);
AsyncLogMessage msg(output, decorations, ss.as_string(true /*c_heap*/));
_logs.push_back(msg);
counter = 0;
}
return true;
}
};
void AsyncLogWriter::write() {
// Use kind of copy-and-swap idiom here.
// Empty 'logs' swaps the content with _buffer.
// Along with logs destruction, all processed messages are deleted.
//
// The operation 'pop_all()' is done in O(1). All I/O jobs are then performed without
// lock protection. This guarantees I/O jobs don't block logsites.
AsyncLogBuffer logs;
ResourceMark rm;
// Similar to AsyncLogMap but on resource_area
ResourceHashtable<LogFileStreamOutput*, uint32_t,
17/*table_size*/, ResourceObj::RESOURCE_AREA,
mtLogging> snapshot;
{ // critical region
// lock protection. This guarantees I/O jobs don't block logsites.
{
AsyncLogLocker locker;
_buffer.pop_all(&logs);
// append meta-messages of dropped counters
AsyncLogMapIterator dropped_counters_iter(logs);
_stats.iterate(&dropped_counters_iter);
_buffer_staging->reset();
swap(_buffer, _buffer_staging);
// move counters to snapshot and reset them.
_stats.iterate([&] (LogFileStreamOutput* output, uint32_t& counter) {
if (counter > 0) {
bool created = snapshot.put(output, counter);
assert(created == true, "sanity check");
counter = 0;
}
return true;
});
_data_available = false;
}
LinkedListIterator<AsyncLogMessage> it(logs.head());
int req = 0;
while (!it.is_empty()) {
AsyncLogMessage* e = it.next();
char* msg = e->message();
auto it = _buffer_staging->iterator();
while (it.hasNext()) {
const Message* e = it.next();
if (msg != nullptr) {
e->output()->write_blocking(e->decorations(), msg);
os::free(msg);
} else if (e->output() == nullptr) {
if (!e->is_token()){
e->output()->write_blocking(e->decorations(), e->message());
} else {
// This is a flush token. Record that we found it and then
// signal the flushing thread after the loop.
req++;
}
}
LogDecorations decorations(LogLevel::Warning, LogTagSetMapping<LogTag::__NO_TAG>::tagset(),
LogDecorators::All);
snapshot.iterate([&](LogFileStreamOutput* output, uint32_t& counter) {
if (counter > 0) {
stringStream ss;
ss.print(UINT32_FORMAT_W(6) " messages dropped due to async logging", counter);
output->write_blocking(decorations, ss.as_string(false));
}
return true;
});
if (req > 0) {
assert(req == 1, "AsyncLogWriter::flush() is NOT MT-safe!");
assert(req == 1, "Only one token is allowed in queue. AsyncLogWriter::flush() is NOT MT-safe!");
_flush_sem.signal(req);
}
}
@ -186,6 +203,8 @@ void AsyncLogWriter::initialize() {
}
os::start_thread(self);
log_debug(logging, thread)("Async logging thread started.");
} else {
delete self;
}
}
@ -200,13 +219,9 @@ AsyncLogWriter* AsyncLogWriter::instance() {
void AsyncLogWriter::flush() {
if (_instance != nullptr) {
{
using none = LogTagSetMapping<LogTag::__NO_TAG>;
AsyncLogLocker locker;
LogDecorations d(LogLevel::Off, none::tagset(), LogDecorators::None);
AsyncLogMessage token(nullptr, d, nullptr);
// Push directly in-case we are at logical max capacity, as this must not get dropped.
_instance->_buffer.push_back(token);
_instance->_buffer->push_flush_token();
_instance->_data_available = true;
_instance->_lock.notify();
}
@ -214,3 +229,23 @@ void AsyncLogWriter::flush() {
_instance->_flush_sem.wait();
}
}
AsyncLogWriter::BufferUpdater::BufferUpdater(size_t newsize) {
AsyncLogLocker locker;
auto p = AsyncLogWriter::_instance;
_buf1 = p->_buffer;
_buf2 = p->_buffer_staging;
p->_buffer = new Buffer(newsize);
p->_buffer_staging = new Buffer(newsize);
}
AsyncLogWriter::BufferUpdater::~BufferUpdater() {
AsyncLogLocker locker;
auto p = AsyncLogWriter::_instance;
delete p->_buffer;
delete p->_buffer_staging;
p->_buffer = _buf1;
p->_buffer_staging = _buf2;
}

View File

@ -26,98 +26,13 @@
#include "logging/log.hpp"
#include "logging/logDecorations.hpp"
#include "logging/logMessageBuffer.hpp"
#include "memory/resourceArea.hpp"
#include "memory/allocation.hpp"
#include "runtime/mutex.hpp"
#include "runtime/nonJavaThread.hpp"
#include "runtime/semaphore.hpp"
#include "utilities/resourceHash.hpp"
#include "utilities/linkedlist.hpp"
template <typename E, MEMFLAGS F>
class LinkedListDeque : private LinkedListImpl<E, ResourceObj::C_HEAP, F> {
private:
LinkedListNode<E>* _tail;
size_t _size;
public:
LinkedListDeque() : _tail(NULL), _size(0) {}
void push_back(const E& e) {
if (!_tail) {
_tail = this->add(e);
} else {
_tail = this->insert_after(e, _tail);
}
++_size;
}
// pop all elements to logs.
void pop_all(LinkedList<E>* logs) {
logs->move(static_cast<LinkedList<E>* >(this));
_tail = NULL;
_size = 0;
}
void pop_all(LinkedListDeque<E, F>* logs) {
logs->_size = _size;
logs->_tail = _tail;
pop_all(static_cast<LinkedList<E>* >(logs));
}
void pop_front() {
LinkedListNode<E>* h = this->unlink_head();
if (h == _tail) {
_tail = NULL;
}
if (h != NULL) {
--_size;
this->delete_node(h);
}
}
size_t size() const { return _size; }
const E* front() const {
return this->_head == NULL ? NULL : this->_head->peek();
}
const E* back() const {
return _tail == NULL ? NULL : _tail->peek();
}
LinkedListNode<E>* head() const {
return this->_head;
}
};
// Forward declaration
class LogFileStreamOutput;
class AsyncLogMessage {
LogFileStreamOutput* _output;
const LogDecorations _decorations;
char* _message;
public:
AsyncLogMessage(LogFileStreamOutput* output, const LogDecorations& decorations, char* msg)
: _output(output), _decorations(decorations), _message(msg) {}
// placeholder for LinkedListImpl.
bool equals(const AsyncLogMessage& o) const { return false; }
LogFileStreamOutput* output() const { return _output; }
const LogDecorations& decorations() const { return _decorations; }
char* message() const { return _message; }
};
typedef LinkedListDeque<AsyncLogMessage, mtLogging> AsyncLogBuffer;
typedef ResourceHashtable<LogFileStreamOutput*,
uint32_t,
17, /*table_size*/
ResourceObj::C_HEAP,
mtLogging> AsyncLogMap;
//
// ASYNC LOGGING SUPPORT
//
@ -140,7 +55,98 @@ typedef ResourceHashtable<LogFileStreamOutput*,
// change the logging configuration via jcmd, LogConfiguration::configure_output() calls flush() under the protection of the
// ConfigurationLock. In addition flush() is called during JVM termination, via LogConfiguration::finalize.
class AsyncLogWriter : public NonJavaThread {
friend class AsyncLogTest;
friend class AsyncLogTest_logBuffer_vm_Test;
class AsyncLogLocker;
using AsyncLogMap = ResourceHashtable<LogFileStreamOutput*,
uint32_t,
17, /*table_size*/
ResourceObj::C_HEAP,
mtLogging>;
// Messsage is the envelop of a log line and its associative data.
// Its length is variable because of the zero-terminated c-str. It is only valid when we create it using placement new
// within a buffer.
//
// Example layout:
// ---------------------------------------------
// |_output|_decorations|"a log line", |pad| <- pointer aligned.
// |_output|_decorations|"yet another",|pad|
// ...
// |nullptr|_decorations|"",|pad| <- flush token
// |<- _pos
// ---------------------------------------------
class Message {
NONCOPYABLE(Message);
~Message() = delete;
LogFileStreamOutput* const _output;
const LogDecorations _decorations;
public:
Message(LogFileStreamOutput* output, const LogDecorations& decorations, const char* msg)
: _output(output), _decorations(decorations) {
assert(msg != nullptr, "c-str message can not be null!");
PRAGMA_STRINGOP_OVERFLOW_IGNORED
strcpy(reinterpret_cast<char* >(this+1), msg);
}
// Calculate the size for a prospective Message object depending on its message length including the trailing zero
static constexpr size_t calc_size(size_t message_len) {
return align_up(sizeof(Message) + message_len + 1, sizeof(void*));
}
size_t size() const {
return calc_size(strlen(message()));
}
inline bool is_token() const { return _output == nullptr; }
LogFileStreamOutput* output() const { return _output; }
const LogDecorations& decorations() const { return _decorations; }
const char* message() const { return reinterpret_cast<const char *>(this+1); }
};
class Buffer : public CHeapObj<mtLogging> {
char* _buf;
size_t _pos;
const size_t _capacity;
public:
Buffer(size_t capacity) : _pos(0), _capacity(capacity) {
_buf = NEW_C_HEAP_ARRAY(char, capacity, mtLogging);
assert(capacity >= Message::calc_size(0), "capcity must be great a token size");
}
~Buffer() {
FREE_C_HEAP_ARRAY(char, _buf);
}
void push_flush_token();
bool push_back(LogFileStreamOutput* output, const LogDecorations& decorations, const char* msg);
void reset() { _pos = 0; }
class Iterator {
const Buffer& _buf;
size_t _curr;
public:
Iterator(const Buffer& buffer): _buf(buffer), _curr(0) {}
bool hasNext() const {
return _curr < _buf._pos;
}
const Message* next() {
assert(hasNext(), "sanity check");
auto msg = reinterpret_cast<Message*>(_buf._buf + _curr);
_curr = MIN2(_curr + msg->size(), _buf._pos);
return msg;
}
};
Iterator iterator() const {
return Iterator(*this);
}
};
static AsyncLogWriter* _instance;
Semaphore _flush_sem;
@ -149,14 +155,15 @@ class AsyncLogWriter : public NonJavaThread {
bool _data_available;
volatile bool _initialized;
AsyncLogMap _stats; // statistics for dropped messages
AsyncLogBuffer _buffer;
// The memory use of each AsyncLogMessage (payload) consists of itself and a variable-length c-str message.
// A regular logging message is smaller than vwrite_buffer_size, which is defined in logtagset.cpp
const size_t _buffer_max_size = {AsyncLogBufferSize / (sizeof(AsyncLogMessage) + vwrite_buffer_size)};
// ping-pong buffers
Buffer* _buffer;
Buffer* _buffer_staging;
static const LogDecorations& None;
AsyncLogWriter();
void enqueue_locked(const AsyncLogMessage& msg);
void enqueue_locked(LogFileStreamOutput* output, const LogDecorations& decorations, const char* msg);
void write();
void run() override;
void pre_run() override {
@ -171,6 +178,16 @@ class AsyncLogWriter : public NonJavaThread {
st->cr();
}
// for testing-only
class BufferUpdater {
Buffer* _buf1;
Buffer* _buf2;
public:
BufferUpdater(size_t newsize);
~BufferUpdater();
};
public:
void enqueue(LogFileStreamOutput& output, const LogDecorations& decorations, const char* msg);
void enqueue(LogFileStreamOutput& output, LogMessageBuffer::Iterator msg_iterator);

View File

@ -36,8 +36,8 @@ struct AllBitmask<LogDecorators::Count> {
static const uint _value = 0;
};
const LogDecorators LogDecorators::None = LogDecorators(0);
const LogDecorators LogDecorators::All = LogDecorators(AllBitmask<time_decorator>::_value);
const LogDecorators LogDecorators::None = {0};
const LogDecorators LogDecorators::All = {AllBitmask<time_decorator>::_value};
const char* LogDecorators::_name[][2] = {
#define DECORATOR(n, a) {#n, #a},

View File

@ -77,7 +77,7 @@ class LogDecorators {
return 1 << decorator;
}
LogDecorators(uint mask) : _decorators(mask) {
constexpr LogDecorators(uint mask) : _decorators(mask) {
}
public:

View File

@ -26,6 +26,7 @@
#include "jvm.h"
#include "logging/log.hpp"
#include "logging/logAsyncWriter.hpp"
#include "logging/logFileOutput.hpp"
#include "logging/logMessage.hpp"
#include "logTestFixture.hpp"
#include "logTestUtils.inline.hpp"
@ -69,15 +70,16 @@ LOG_LEVEL_LIST
}
void test_asynclog_drop_messages() {
if (AsyncLogWriter::instance() != nullptr) {
const size_t sz = 100;
auto writer = AsyncLogWriter::instance();
if (writer != nullptr) {
const size_t sz = 2000;
// shrink async buffer.
AutoModifyRestore<size_t> saver(AsyncLogBufferSize, sz * 1024 /*in byte*/);
AsyncLogWriter::BufferUpdater saver(1024);
LogMessage(logging) lm;
// write 100x more messages than its capacity in burst
for (size_t i = 0; i < sz * 100; ++i) {
// write more messages than its capacity in burst
for (size_t i = 0; i < sz; ++i) {
lm.debug("a lot of log...");
}
lm.flush();
@ -101,78 +103,6 @@ LOG_LEVEL_LIST
}
};
TEST_VM(AsyncLogBufferTest, fifo) {
LinkedListDeque<int, mtLogging> fifo;
LinkedListImpl<int, ResourceObj::C_HEAP, mtLogging> result;
fifo.push_back(1);
EXPECT_EQ((size_t)1, fifo.size());
EXPECT_EQ(1, *(fifo.back()));
fifo.pop_all(&result);
EXPECT_EQ((size_t)0, fifo.size());
EXPECT_EQ(NULL, fifo.back());
EXPECT_EQ((size_t)1, result.size());
EXPECT_EQ(1, *(result.head()->data()));
result.clear();
fifo.push_back(2);
fifo.push_back(1);
fifo.pop_all(&result);
EXPECT_EQ((size_t)2, result.size());
EXPECT_EQ(2, *(result.head()->data()));
EXPECT_EQ(1, *(result.head()->next()->data()));
result.clear();
const int N = 1000;
for (int i=0; i<N; ++i) {
fifo.push_back(i);
}
fifo.pop_all(&result);
EXPECT_EQ((size_t)N, result.size());
LinkedListIterator<int> it(result.head());
for (int i=0; i<N; ++i) {
int* e = it.next();
EXPECT_EQ(i, *e);
}
}
TEST_VM(AsyncLogBufferTest, deque) {
LinkedListDeque<int, mtLogging> deque;
const int N = 10;
EXPECT_EQ(NULL, deque.front());
EXPECT_EQ(NULL, deque.back());
for (int i = 0; i < N; ++i) {
deque.push_back(i);
}
EXPECT_EQ(0, *(deque.front()));
EXPECT_EQ(N-1, *(deque.back()));
EXPECT_EQ((size_t)N, deque.size());
deque.pop_front();
EXPECT_EQ((size_t)(N - 1), deque.size());
EXPECT_EQ(1, *(deque.front()));
EXPECT_EQ(N - 1, *(deque.back()));
deque.pop_front();
EXPECT_EQ((size_t)(N - 2), deque.size());
EXPECT_EQ(2, *(deque.front()));
EXPECT_EQ(N - 1, *(deque.back()));
for (int i=2; i < N-1; ++i) {
deque.pop_front();
}
EXPECT_EQ((size_t)1, deque.size());
EXPECT_EQ(N - 1, *(deque.back()));
EXPECT_EQ(deque.back(), deque.front());
deque.pop_front();
EXPECT_EQ((size_t)0, deque.size());
}
TEST_VM_F(AsyncLogTest, asynclog) {
set_log_config(TestLogFileName, "logging=debug");
@ -229,6 +159,92 @@ TEST_VM_F(AsyncLogTest, logMessage) {
EXPECT_TRUE(file_contains_substring(TestLogFileName, "a noisy message from other logger"));
}
TEST_VM_F(AsyncLogTest, logBuffer) {
const auto Default = LogDecorations(LogLevel::Warning, LogTagSetMapping<LogTag::__NO_TAG>::tagset(),
LogDecorators());
size_t len = strlen(TestLogFileName) + strlen(LogFileOutput::Prefix) + 1;
char* name = NEW_C_HEAP_ARRAY(char, len, mtLogging);
snprintf(name, len, "%s%s", LogFileOutput::Prefix, TestLogFileName);
LogFileStreamOutput* output = new LogFileOutput(name);
output->initialize(nullptr, nullptr);
auto buffer = new AsyncLogWriter::Buffer(1024);
int line = 0;
int written;
uintptr_t addr;
const uintptr_t mask = (uintptr_t)(sizeof(void*) - 1);
bool res;
res = buffer->push_back(output, Default, "a log line");
EXPECT_TRUE(res) << "first message should succeed.";
line++;
res = buffer->push_back(output, Default, "yet another");
EXPECT_TRUE(res) << "second message should succeed.";
line++;
auto it = buffer->iterator();
EXPECT_TRUE(it.hasNext());
const AsyncLogWriter::Message* e = it.next();
addr = reinterpret_cast<uintptr_t>(e);
EXPECT_EQ(0, (int)(addr & (sizeof(void*)-1))); // returned vaue aligns on sizeof(pointer)
EXPECT_EQ(output, e->output());
EXPECT_EQ(0, memcmp(&Default, &e->decorations(), sizeof(LogDecorations)));
EXPECT_STREQ("a log line", e->message());
written = e->output()->write_blocking(e->decorations(), e->message());
EXPECT_GT(written, 0);
EXPECT_TRUE(it.hasNext());
e = it.next();
addr = reinterpret_cast<uintptr_t>(e);
EXPECT_EQ(0, (int)(addr & (sizeof(void*)-1)));
EXPECT_EQ(output, e->output());
EXPECT_EQ(0, memcmp(&Default, &e->decorations(), sizeof(LogDecorations)));
EXPECT_STREQ("yet another", e->message());
written = e->output()->write_blocking(e->decorations(), e->message());
EXPECT_GT(written, 0);
while (buffer->push_back(output, Default, "0123456789abcdef")) {
line++;
}
EXPECT_GT(line, 2);
while (it.hasNext()) {
e = it.next();
addr = reinterpret_cast<uintptr_t>(e);
EXPECT_EQ(0, (int)(addr & (sizeof(void*)-1)));
EXPECT_EQ(output, e->output());
EXPECT_STREQ("0123456789abcdef", e->message());
written = e->output()->write_blocking(e->decorations(), e->message());
EXPECT_GT(written, 0);
line--;
}
EXPECT_EQ(line, 2);
// last one, flush token. expect to succeed even buffer has been full.
buffer->push_flush_token();
EXPECT_TRUE(it.hasNext());
e = it.next();
EXPECT_EQ(e->output(), nullptr);
EXPECT_TRUE(e->is_token());
EXPECT_STREQ("", e->message());
EXPECT_FALSE(it.hasNext());
// reset buffer
buffer->reset();
EXPECT_FALSE(buffer->iterator().hasNext());
delete output; // close file
FREE_C_HEAP_ARRAY(char, name);
const char* strs[4];
strs[0] = "a log line";
strs[1] = "yet another";
strs[2] = "0123456789abcdef";
strs[3] = nullptr; // sentinel!
EXPECT_TRUE(file_contains_substrings_in_order(TestLogFileName, strs));
}
TEST_VM_F(AsyncLogTest, droppingMessage) {
set_log_config(TestLogFileName, "logging=debug");
test_asynclog_drop_messages();