8257906: JFR: RecordingStream leaks memory
Reviewed-by: mgronlun
Backport-of: 3c66485015
This commit is contained in:
parent
6775113ce8
commit
bbc2e9510b
@ -45,7 +45,6 @@ import jdk.jfr.internal.PrivateAccess;
|
||||
import jdk.jfr.internal.SecuritySupport;
|
||||
import jdk.jfr.internal.Utils;
|
||||
import jdk.jfr.internal.consumer.EventDirectoryStream;
|
||||
import jdk.jfr.internal.consumer.JdkJfrConsumer;
|
||||
|
||||
/**
|
||||
* A recording stream produces events from the current JVM (Java Virtual
|
||||
@ -68,9 +67,27 @@ import jdk.jfr.internal.consumer.JdkJfrConsumer;
|
||||
*/
|
||||
public final class RecordingStream implements AutoCloseable, EventStream {
|
||||
|
||||
final static class ChunkConsumer implements Consumer<Long> {
|
||||
|
||||
private final Recording recording;
|
||||
|
||||
ChunkConsumer(Recording recording) {
|
||||
this.recording = recording;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(Long endNanos) {
|
||||
Instant t = Utils.epochNanosToInstant(endNanos);
|
||||
PlatformRecording p = PrivateAccess.getInstance().getPlatformRecording(recording);
|
||||
p.removeBefore(t);
|
||||
}
|
||||
}
|
||||
|
||||
private final Recording recording;
|
||||
private final Instant creationTime;
|
||||
private final EventDirectoryStream directoryStream;
|
||||
private long maxSize;
|
||||
private Duration maxAge;
|
||||
|
||||
/**
|
||||
* Creates an event stream for the current JVM (Java Virtual Machine).
|
||||
@ -247,7 +264,11 @@ public final class RecordingStream implements AutoCloseable, EventStream {
|
||||
* state
|
||||
*/
|
||||
public void setMaxAge(Duration maxAge) {
|
||||
recording.setMaxAge(maxAge);
|
||||
synchronized (directoryStream) {
|
||||
recording.setMaxAge(maxAge);
|
||||
this.maxAge = maxAge;
|
||||
updateOnCompleteHandler();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -270,7 +291,11 @@ public final class RecordingStream implements AutoCloseable, EventStream {
|
||||
* @throws IllegalStateException if the recording is in {@code CLOSED} state
|
||||
*/
|
||||
public void setMaxSize(long maxSize) {
|
||||
recording.setMaxSize(maxSize);
|
||||
synchronized (directoryStream) {
|
||||
recording.setMaxSize(maxSize);
|
||||
this.maxSize = maxSize;
|
||||
updateOnCompleteHandler();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -320,6 +345,7 @@ public final class RecordingStream implements AutoCloseable, EventStream {
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
directoryStream.setChunkCompleteHandler(null);
|
||||
recording.close();
|
||||
directoryStream.close();
|
||||
}
|
||||
@ -333,6 +359,7 @@ public final class RecordingStream implements AutoCloseable, EventStream {
|
||||
public void start() {
|
||||
PlatformRecording pr = PrivateAccess.getInstance().getPlatformRecording(recording);
|
||||
long startNanos = pr.start();
|
||||
updateOnCompleteHandler();
|
||||
directoryStream.start(startNanos);
|
||||
}
|
||||
|
||||
@ -363,6 +390,7 @@ public final class RecordingStream implements AutoCloseable, EventStream {
|
||||
public void startAsync() {
|
||||
PlatformRecording pr = PrivateAccess.getInstance().getPlatformRecording(recording);
|
||||
long startNanos = pr.start();
|
||||
updateOnCompleteHandler();
|
||||
directoryStream.startAsync(startNanos);
|
||||
}
|
||||
|
||||
@ -380,4 +408,13 @@ public final class RecordingStream implements AutoCloseable, EventStream {
|
||||
public void onMetadata(Consumer<MetadataEvent> action) {
|
||||
directoryStream.onMetadata(action);
|
||||
}
|
||||
|
||||
private void updateOnCompleteHandler() {
|
||||
if (maxAge != null || maxSize != 0) {
|
||||
// User has set a chunk removal policy
|
||||
directoryStream.setChunkCompleteHandler(null);
|
||||
} else {
|
||||
directoryStream.setChunkCompleteHandler(new ChunkConsumer(recording));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -488,4 +488,16 @@ public final class ChunkParser {
|
||||
public boolean hasStaleMetadata() {
|
||||
return staleMetadata;
|
||||
}
|
||||
|
||||
public void resetCache() {
|
||||
LongMap<Parser> ps = this.parsers;
|
||||
if (ps != null) {
|
||||
ps.forEach(p -> {
|
||||
if (p instanceof EventParser) {
|
||||
EventParser ep = (EventParser) p;
|
||||
ep.resetCache();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -159,6 +159,7 @@ public class EventDirectoryStream extends AbstractEventStream {
|
||||
} else {
|
||||
processUnordered(disp);
|
||||
}
|
||||
currentParser.resetCache();
|
||||
if (currentParser.getStartNanos() + currentParser.getChunkDuration() > filterEnd) {
|
||||
close();
|
||||
return;
|
||||
|
@ -102,6 +102,7 @@ public final class EventFileStream extends AbstractEventStream {
|
||||
} else {
|
||||
processUnordered(disp);
|
||||
}
|
||||
currentParser.resetCache();
|
||||
if (isClosed() || currentParser.isLastChunk()) {
|
||||
return;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user