8257906: JFR: RecordingStream leaks memory

Reviewed-by: mgronlun
This commit is contained in:
Erik Gahlin 2020-12-16 11:03:41 +00:00
parent 0c8cc2cde4
commit 3c66485015
4 changed files with 54 additions and 3 deletions

View File

@ -45,7 +45,6 @@ import jdk.jfr.internal.PrivateAccess;
import jdk.jfr.internal.SecuritySupport;
import jdk.jfr.internal.Utils;
import jdk.jfr.internal.consumer.EventDirectoryStream;
import jdk.jfr.internal.consumer.JdkJfrConsumer;
/**
* A recording stream produces events from the current JVM (Java Virtual
@ -68,9 +67,27 @@ import jdk.jfr.internal.consumer.JdkJfrConsumer;
*/
public final class RecordingStream implements AutoCloseable, EventStream {
final static class ChunkConsumer implements Consumer<Long> {
private final Recording recording;
ChunkConsumer(Recording recording) {
this.recording = recording;
}
@Override
public void accept(Long endNanos) {
Instant t = Utils.epochNanosToInstant(endNanos);
PlatformRecording p = PrivateAccess.getInstance().getPlatformRecording(recording);
p.removeBefore(t);
}
}
private final Recording recording;
private final Instant creationTime;
private final EventDirectoryStream directoryStream;
private long maxSize;
private Duration maxAge;
/**
* Creates an event stream for the current JVM (Java Virtual Machine).
@ -247,7 +264,11 @@ public final class RecordingStream implements AutoCloseable, EventStream {
* state
*/
public void setMaxAge(Duration maxAge) {
synchronized (directoryStream) {
recording.setMaxAge(maxAge);
this.maxAge = maxAge;
updateOnCompleteHandler();
}
}
/**
@ -270,7 +291,11 @@ public final class RecordingStream implements AutoCloseable, EventStream {
* @throws IllegalStateException if the recording is in {@code CLOSED} state
*/
public void setMaxSize(long maxSize) {
synchronized (directoryStream) {
recording.setMaxSize(maxSize);
this.maxSize = maxSize;
updateOnCompleteHandler();
}
}
@Override
@ -320,6 +345,7 @@ public final class RecordingStream implements AutoCloseable, EventStream {
@Override
public void close() {
directoryStream.setChunkCompleteHandler(null);
recording.close();
directoryStream.close();
}
@ -333,6 +359,7 @@ public final class RecordingStream implements AutoCloseable, EventStream {
public void start() {
PlatformRecording pr = PrivateAccess.getInstance().getPlatformRecording(recording);
long startNanos = pr.start();
updateOnCompleteHandler();
directoryStream.start(startNanos);
}
@ -363,6 +390,7 @@ public final class RecordingStream implements AutoCloseable, EventStream {
public void startAsync() {
PlatformRecording pr = PrivateAccess.getInstance().getPlatformRecording(recording);
long startNanos = pr.start();
updateOnCompleteHandler();
directoryStream.startAsync(startNanos);
}
@ -380,4 +408,13 @@ public final class RecordingStream implements AutoCloseable, EventStream {
public void onMetadata(Consumer<MetadataEvent> action) {
directoryStream.onMetadata(action);
}
private void updateOnCompleteHandler() {
if (maxAge != null || maxSize != 0) {
// User has set a chunk removal policy
directoryStream.setChunkCompleteHandler(null);
} else {
directoryStream.setChunkCompleteHandler(new ChunkConsumer(recording));
}
}
}

View File

@ -488,4 +488,16 @@ public final class ChunkParser {
public boolean hasStaleMetadata() {
return staleMetadata;
}
public void resetCache() {
LongMap<Parser> ps = this.parsers;
if (ps != null) {
ps.forEach(p -> {
if (p instanceof EventParser) {
EventParser ep = (EventParser) p;
ep.resetCache();
}
});
}
}
}

View File

@ -159,6 +159,7 @@ public class EventDirectoryStream extends AbstractEventStream {
} else {
processUnordered(disp);
}
currentParser.resetCache();
if (currentParser.getStartNanos() + currentParser.getChunkDuration() > filterEnd) {
close();
return;

View File

@ -102,6 +102,7 @@ public final class EventFileStream extends AbstractEventStream {
} else {
processUnordered(disp);
}
currentParser.resetCache();
if (isClosed() || currentParser.isLastChunk()) {
return;
}