8297338: JFR: RemoteRecordingStream doesn't respect setMaxAge and setMaxSize

Reviewed-by: mgronlun
This commit is contained in:
Erik Gahlin 2022-11-23 17:48:29 +00:00
parent 8df3bc4ec5
commit 2afb4c3327
5 changed files with 246 additions and 17 deletions
src/jdk.management.jfr/share/classes/jdk/management/jfr
test/jdk/jdk/jfr/jmx/streaming

@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2020, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -429,7 +429,7 @@ final class DiskRepository implements Closeable {
}
int count = 0;
while (chunks.size() > 1) {
DiskChunk oldestChunk = chunks.getLast();
DiskChunk oldestChunk = chunks.peekLast();
if (oldestChunk.endTime.isAfter(oldest)) {
return;
}
@ -440,14 +440,14 @@ final class DiskRepository implements Closeable {
}
private void removeOldestChunk() {
DiskChunk chunk = chunks.poll();
DiskChunk chunk = chunks.pollLast();
chunk.release();
size -= chunk.size;
}
public synchronized void onChunkComplete(long endTimeNanos) {
while (!chunks.isEmpty()) {
DiskChunk oldestChunk = chunks.peek();
DiskChunk oldestChunk = chunks.peekLast();
if (oldestChunk.startTimeNanos < endTimeNanos) {
removeOldestChunk();
} else {
@ -460,7 +460,7 @@ final class DiskRepository implements Closeable {
if (maxAge != null) {
trimToAge(chunk.endTime.minus(maxAge));
}
chunks.push(chunk);
chunks.addFirst(chunk);
size += chunk.size;
trimToSize();
@ -500,9 +500,13 @@ final class DiskRepository implements Closeable {
public synchronized FileDump newDump(long endTime) {
FileDump fd = new FileDump(endTime);
for (DiskChunk dc : chunks) {
// replay history by iterating from oldest to most recent
Iterator<DiskChunk> it = chunks.descendingIterator();
while (it.hasNext()) {
DiskChunk dc = it.next();
fd.add(dc);
}
if (!fd.isComplete()) {
fileDumps.add(fd);
}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2021, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -29,12 +29,12 @@ import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Deque;
import jdk.management.jfr.DiskRepository.DiskChunk;
final class FileDump {
private final Queue<DiskChunk> chunks = new ArrayDeque<>();
private final Deque<DiskChunk> chunks = new ArrayDeque<>();
private final long stopTimeMillis;
private boolean complete;
@ -47,7 +47,7 @@ final class FileDump {
return;
}
dc.acquire();
chunks.add(dc);
chunks.addFirst(dc);
long endMillis = dc.endTimeNanos / 1_000_000;
if (endMillis >= stopTimeMillis) {
setComplete();
@ -75,7 +75,7 @@ final class FileDump {
while (true) {
synchronized (this) {
if (!chunks.isEmpty()) {
return chunks.poll();
return chunks.pollLast();
}
if (complete) {
return null;
@ -86,14 +86,19 @@ final class FileDump {
}
public void write(Path path) throws IOException, InterruptedException {
DiskChunk chunk = null;
try (FileChannel out = FileChannel.open(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
DiskChunk chunk = null;
while ((chunk = oldestChunk()) != null) {
try (FileChannel in = FileChannel.open(chunk.path(), StandardOpenOption.READ)) {
in.transferTo(0, in.size(), out);
}
chunk.release();
chunk = null;
}
} finally {
if (chunk != null) {
chunk.release();
}
close();
}
}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2020, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -156,7 +156,10 @@ public final class RemoteRecordingStream implements EventStream {
volatile Instant startTime;
volatile Instant endTime;
volatile boolean closed;
private boolean started; // always guarded by lock
// always guarded by lock
private boolean started;
private Duration maxAge;
private long maxSize;
/**
* Creates an event stream that operates against a {@link MBeanServerConnection}
@ -415,7 +418,11 @@ public final class RemoteRecordingStream implements EventStream {
*/
public void setMaxAge(Duration maxAge) {
Objects.requireNonNull(maxAge);
repository.setMaxAge(maxAge);
synchronized (lock) {
repository.setMaxAge(maxAge);
this.maxAge = maxAge;
updateOnCompleteHandler();
}
}
/**
@ -441,7 +448,11 @@ public final class RemoteRecordingStream implements EventStream {
if (maxSize < 0) {
throw new IllegalArgumentException("Max size of recording can't be negative");
}
repository.setMaxSize(maxSize);
synchronized (lock) {
repository.setMaxSize(maxSize);
this.maxSize = maxSize;
updateOnCompleteHandler();
}
}
@Override
@ -645,6 +656,15 @@ public final class RemoteRecordingStream implements EventStream {
return Files.createTempDirectory("jfr-streaming");
}
private void updateOnCompleteHandler() {
if (maxAge != null || maxSize != 0) {
// User has set a chunk removal policy
ManagementSupport.setOnChunkCompleteHandler(stream, null);
} else {
ManagementSupport.setOnChunkCompleteHandler(stream, new ChunkConsumer(repository));
}
}
private void startDownload() {
String name = "JFR: Download Thread " + creationTime;
Thread downLoadThread = new DownLoadThread(this, name);

@ -0,0 +1,130 @@
/*
* Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.jfr.jmx.streaming;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServerConnection;
import jdk.jfr.Event;
import jdk.jfr.StackTrace;
import jdk.jfr.Recording;
import jdk.jfr.consumer.RecordedEvent;
import jdk.jfr.consumer.RecordingFile;
import jdk.management.jfr.RemoteRecordingStream;
/**
* @test
* @key jfr
* @summary Tests that chunks arrive in the same order they were committed
* @requires vm.hasJFR
* @library /test/lib /test/jdk
* @run main/othervm jdk.jfr.jmx.streaming.TestDumpOrder
*/
public class TestDumpOrder {
private static final MBeanServerConnection CONNECTION = ManagementFactory.getPlatformMBeanServer();
@StackTrace(false)
static class Ant extends Event {
long id;
}
public static void main(String... args) throws Exception {
// Set up the test so half of the events have been consumed
// when the dump occurs.
AtomicLong eventCount = new AtomicLong();
CountDownLatch halfLatch = new CountDownLatch(1);
CountDownLatch dumpLatch = new CountDownLatch(1);
Path directory = Path.of("chunks");
Files.createDirectory(directory);
try (var rs = new RemoteRecordingStream(CONNECTION, directory)) {
rs.setMaxSize(100_000_000); // keep all data
rs.onEvent(event -> {
try {
eventCount.incrementAndGet();
if (eventCount.get() == 10) {
halfLatch.countDown();
dumpLatch.await();
}
} catch (InterruptedException ie) {
ie.printStackTrace();
}
});
rs.startAsync();
long counter = 0;
for (int i = 0; i < 10; i++) {
try (Recording r = new Recording()) {
r.start();
Ant a = new Ant();
a.id = counter++;
a.commit();
Ant b = new Ant();
b.id = counter++;
b.commit();
}
if (counter == 10) {
halfLatch.await();
}
}
Path file = Path.of("events.jfr");
// Wait for most (but not all) chunk files to be downloaded
// before invoking dump()
awaitChunkFiles(directory);
// To stress the implementation, release consumer thread
// during the dump
dumpLatch.countDown();
rs.dump(file);
List<RecordedEvent> events = RecordingFile.readAllEvents(file);
if (events.isEmpty()) {
throw new AssertionError("No events found");
}
// Print events for debugging purposes
events.forEach(System.out::println);
long expected = 0;
for (var event : events) {
long value = event.getLong("id");
if (value != expected) {
throw new Exception("Expected " + expected + ", got " + value);
}
expected++;
}
if (expected != counter) {
throw new Exception("Not all events found");
}
}
}
private static void awaitChunkFiles(Path directory) throws Exception {
while (Files.list(directory).count() < 7) {
Thread.sleep(10);
}
}
}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2021, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -26,16 +26,19 @@ import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.management.MBeanServerConnection;
import jdk.jfr.Event;
import jdk.jfr.Name;
import jdk.jfr.Recording;
import jdk.jfr.consumer.RecordedEvent;
import jdk.jfr.consumer.RecordingFile;
import jdk.jfr.consumer.RecordingStream;
@ -63,6 +66,9 @@ public class TestRemoteDump {
testOneDump();
testMultipleDumps();
testEventAfterDump();
testSetNoPolicy();
testSetMaxAge();
testSetMaxSize();
}
private static void testUnstarted() throws Exception {
@ -91,6 +97,70 @@ public class TestRemoteDump {
}
}
private static List<RecordedEvent> recordWithPolicy(String filename, Consumer<RemoteRecordingStream> policy) throws Exception {
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(2);
CountDownLatch latch3 = new CountDownLatch(3);
try (var rs = new RemoteRecordingStream(CONNECTION)) {
policy.accept(rs);
rs.onEvent(e -> {
latch1.countDown();
latch2.countDown();
latch3.countDown();
});
rs.startAsync();
DumpEvent e1 = new DumpEvent();
e1.commit();
latch1.await();
// Force chunk rotation
try (Recording r = new Recording()) {
r.start();
DumpEvent e2 = new DumpEvent();
e2.commit();
}
latch2.await();
DumpEvent e3 = new DumpEvent();
e3.commit();
latch3.await();
Path p = Path.of(filename);
rs.dump(p);
return RecordingFile.readAllEvents(p);
}
}
private static void testSetMaxSize() throws Exception {
var events = recordWithPolicy("max-size.jfr", rs -> {
// keeps all events for the dump
rs.setMaxSize(100_000_000);
});
if (events.size() != 3) {
throw new Exception("Expected all 3 events to be in dump after setMaxSize");
}
}
private static void testSetMaxAge() throws Exception {
var events = recordWithPolicy("max-age.jfr", rs -> {
// keeps all events for the dump
rs.setMaxAge(Duration.ofDays(1));
});
if (events.size() != 3) {
throw new Exception("Expected all 3 events to be in dump after setMaxAge");
}
}
private static void testSetNoPolicy() throws Exception {
var events = recordWithPolicy("no-policy.jfr", rs -> {
// use default policy, remove after consumption
});
// Since latch3 have been triggered at least two events/chunks
// before must have been consumed, possibly 3, but it's a race.
if (events.size() > 1) {
throw new Exception("Expected at most one event to not be consumed");
}
}
private static void testMultipleDumps() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
try (var rs = new RemoteRecordingStream(CONNECTION)) {