8274315: JFR: One closed state per file or stream

Reviewed-by: mgronlun
This commit is contained in:
Erik Gahlin 2021-12-22 14:29:46 +00:00
parent e49d4a9870
commit dfb15c3e34
6 changed files with 62 additions and 22 deletions

View File

@ -41,6 +41,7 @@ import jdk.jfr.internal.Type;
import jdk.jfr.internal.consumer.ChunkHeader; import jdk.jfr.internal.consumer.ChunkHeader;
import jdk.jfr.internal.consumer.ChunkParser; import jdk.jfr.internal.consumer.ChunkParser;
import jdk.jfr.internal.consumer.FileAccess; import jdk.jfr.internal.consumer.FileAccess;
import jdk.jfr.internal.consumer.ParserState;
import jdk.jfr.internal.consumer.RecordingInput; import jdk.jfr.internal.consumer.RecordingInput;
/** /**
@ -61,6 +62,7 @@ import jdk.jfr.internal.consumer.RecordingInput;
*/ */
public final class RecordingFile implements Closeable { public final class RecordingFile implements Closeable {
private final ParserState parserState = new ParserState();
private boolean isLastEventInChunk; private boolean isLastEventInChunk;
private final File file; private final File file;
private RecordingInput input; private RecordingInput input;
@ -247,7 +249,7 @@ public final class RecordingFile implements Closeable {
private void findNext() throws IOException { private void findNext() throws IOException {
while (nextEvent == null) { while (nextEvent == null) {
if (chunkParser == null) { if (chunkParser == null) {
chunkParser = new ChunkParser(input); chunkParser = new ChunkParser(input, parserState);
} else if (!chunkParser.isLastChunk()) { } else if (!chunkParser.isLastChunk()) {
chunkParser = chunkParser.nextChunkParser(); chunkParser = chunkParser.nextChunkParser();
} else { } else {

View File

@ -65,7 +65,7 @@ public abstract class AbstractEventStream implements EventStream {
private volatile Thread thread; private volatile Thread thread;
private Dispatcher dispatcher; private Dispatcher dispatcher;
private volatile boolean closed; protected final ParserState parserState = new ParserState();
private boolean daemon = false; private boolean daemon = false;
@ -215,12 +215,12 @@ public abstract class AbstractEventStream implements EventStream {
protected abstract void process() throws IOException; protected abstract void process() throws IOException;
protected final void setClosed(boolean closed) { protected final void closeParser() {
this.closed = closed; parserState.close();
} }
protected final boolean isClosed() { protected final boolean isClosed() {
return closed; return parserState.isClosed();
} }
public final void startAsync(long startNanos) { public final void startAsync(long startNanos) {

View File

@ -97,7 +97,7 @@ public final class ChunkParser {
private final RecordingInput input; private final RecordingInput input;
private final ChunkHeader chunkHeader; private final ChunkHeader chunkHeader;
private final TimeConverter timeConverter; private final TimeConverter timeConverter;
private final ParserState parserState;
private final LongMap<ConstantLookup> constantLookups; private final LongMap<ConstantLookup> constantLookups;
private LongMap<Type> typeMap; private LongMap<Type> typeMap;
@ -105,24 +105,24 @@ public final class ChunkParser {
private boolean chunkFinished; private boolean chunkFinished;
private ParserConfiguration configuration; private ParserConfiguration configuration;
private volatile boolean closed;
private MetadataDescriptor previousMetadata; private MetadataDescriptor previousMetadata;
private MetadataDescriptor metadata; private MetadataDescriptor metadata;
private boolean staleMetadata = true; private boolean staleMetadata = true;
public ChunkParser(RecordingInput input) throws IOException { public ChunkParser(RecordingInput input, ParserState ps) throws IOException {
this(input, new ParserConfiguration()); this(input, new ParserConfiguration(), ps);
} }
ChunkParser(RecordingInput input, ParserConfiguration pc) throws IOException { ChunkParser(RecordingInput input, ParserConfiguration pc, ParserState ps) throws IOException {
this(new ChunkHeader(input), null, pc); this(new ChunkHeader(input), null, pc, ps);
} }
private ChunkParser(ChunkParser previous) throws IOException { private ChunkParser(ChunkParser previous, ParserState ps) throws IOException {
this(new ChunkHeader(previous.input), previous, new ParserConfiguration()); this(new ChunkHeader(previous.input), previous, new ParserConfiguration(), ps);
} }
private ChunkParser(ChunkHeader header, ChunkParser previous, ParserConfiguration pc) throws IOException { private ChunkParser(ChunkHeader header, ChunkParser previous, ParserConfiguration pc, ParserState ps) throws IOException {
this.parserState = ps;
this.configuration = pc; this.configuration = pc;
this.input = header.getInput(); this.input = header.getInput();
this.chunkHeader = header; this.chunkHeader = header;
@ -155,7 +155,7 @@ public final class ChunkParser {
} }
public ChunkParser nextChunkParser() throws IOException { public ChunkParser nextChunkParser() throws IOException {
return new ChunkParser(chunkHeader.nextHeader(), this, configuration); return new ChunkParser(chunkHeader.nextHeader(), this, configuration, parserState);
} }
private void updateConfiguration() { private void updateConfiguration() {
@ -280,7 +280,7 @@ public final class ChunkParser {
Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Waiting for more data (streaming). Read so far: " + chunkHeader.getChunkSize() + " bytes"); Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Waiting for more data (streaming). Read so far: " + chunkHeader.getChunkSize() + " bytes");
} }
while (true) { while (true) {
if (closed) { if (parserState.isClosed()) {
return true; return true;
} }
if (chunkHeader.getLastNanos() > filterEnd) { if (chunkHeader.getLastNanos() > filterEnd) {
@ -437,7 +437,7 @@ public final class ChunkParser {
} }
ChunkParser newChunkParser() throws IOException { ChunkParser newChunkParser() throws IOException {
return new ChunkParser(this); return new ChunkParser(this, parserState);
} }
public boolean isChunkFinished() { public boolean isChunkFinished() {
@ -457,7 +457,7 @@ public final class ChunkParser {
} }
public void close() { public void close() {
this.closed = true; parserState.close();
try { try {
input.close(); input.close();
} catch(IOException e) { } catch(IOException e) {

View File

@ -83,7 +83,7 @@ public class EventDirectoryStream extends AbstractEventStream {
@Override @Override
public void close() { public void close() {
setClosed(true); closeParser();
dispatcher().runCloseActions(); dispatcher().runCloseActions();
repositoryFiles.close(); repositoryFiles.close();
if (currentParser != null) { if (currentParser != null) {
@ -148,7 +148,7 @@ public class EventDirectoryStream extends AbstractEventStream {
} }
currentChunkStartNanos = repositoryFiles.getTimestamp(path); currentChunkStartNanos = repositoryFiles.getTimestamp(path);
try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) { try (RecordingInput input = new RecordingInput(path.toFile(), fileAccess)) {
currentParser = new ChunkParser(input, disp.parserConfiguration); currentParser = new ChunkParser(input, disp.parserConfiguration, parserState);
long segmentStart = currentParser.getStartNanos() + currentParser.getChunkDuration(); long segmentStart = currentParser.getStartNanos() + currentParser.getChunkDuration();
long filterStart = validStartTime ? disp.startNanos : segmentStart; long filterStart = validStartTime ? disp.startNanos : segmentStart;
long filterEnd = disp.endTime != null ? disp.endNanos : Long.MAX_VALUE; long filterEnd = disp.endTime != null ? disp.endNanos : Long.MAX_VALUE;

View File

@ -64,7 +64,7 @@ public final class EventFileStream extends AbstractEventStream {
@Override @Override
public void close() { public void close() {
setClosed(true); closeParser();
dispatcher().runCloseActions(); dispatcher().runCloseActions();
try { try {
input.close(); input.close();
@ -85,7 +85,7 @@ public final class EventFileStream extends AbstractEventStream {
end = disp.endNanos; end = disp.endNanos;
} }
currentParser = new ChunkParser(input, disp.parserConfiguration); currentParser = new ChunkParser(input, disp.parserConfiguration, parserState);
while (!isClosed()) { while (!isClosed()) {
onMetadata(currentParser); onMetadata(currentParser);
if (currentParser.getStartNanos() > end) { if (currentParser.getStartNanos() > end) {

View File

@ -0,0 +1,38 @@
/*
* Copyright (c) 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.jfr.internal.consumer;
public final class ParserState {
private volatile boolean closed;
public boolean isClosed() {
return closed;
}
public void close() {
closed = true;
}
}