8234671: JFR api/consumer/recordingstream/TestStart.java failed due to timeout at testStartTwice()

Reviewed-by: mgronlun
This commit is contained in:
Erik Gahlin 2019-11-29 17:31:01 +01:00
parent 7b02e24b65
commit b3ea416824
6 changed files with 134 additions and 128 deletions

View File

@ -76,9 +76,10 @@ abstract class AbstractEventStream implements EventStream {
abstract public void close();
protected final Dispatcher dispatcher() {
if (configuration.hasChanged()) {
if (configuration.hasChanged()) { // quick check
synchronized (configuration) {
dispatcher = new Dispatcher(configuration);
configuration.setChanged(false);
}
}
return dispatcher;

View File

@ -190,44 +190,40 @@ public final class ChunkParser {
*
* @param awaitNewEvents wait for new data.
*/
RecordedEvent readStreamingEvent(boolean awaitNewEvents) throws IOException {
RecordedEvent readStreamingEvent() throws IOException {
long absoluteChunkEnd = chunkHeader.getEnd();
while (true) {
RecordedEvent event = readEvent();
if (event != null) {
return event;
}
if (!awaitNewEvents) {
return null;
}
long lastValid = absoluteChunkEnd;
long metadataPoistion = chunkHeader.getMetataPosition();
long contantPosition = chunkHeader.getConstantPoolPosition();
chunkFinished = awaitUpdatedHeader(absoluteChunkEnd, configuration.filterEnd);
if (chunkFinished) {
Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "At chunk end");
return null;
}
absoluteChunkEnd = chunkHeader.getEnd();
// Read metadata and constant pools for the next segment
if (chunkHeader.getMetataPosition() != metadataPoistion) {
Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new metadata in chunk. Rebuilding types and parsers");
MetadataDescriptor metadata = chunkHeader.readMetadata(previousMetadata);
ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter);
parsers = factory.getParsers();
typeMap = factory.getTypeMap();
updateConfiguration();;
}
if (contantPosition != chunkHeader.getConstantPoolPosition()) {
Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new constant pool data. Filling up pools with new values");
constantLookups.forEach(c -> c.getLatestPool().setAllResolved(false));
fillConstantPools(contantPosition + chunkHeader.getAbsoluteChunkStart());
constantLookups.forEach(c -> c.getLatestPool().setResolving());
constantLookups.forEach(c -> c.getLatestPool().resolve());
constantLookups.forEach(c -> c.getLatestPool().setResolved());
}
input.position(lastValid);
RecordedEvent event = readEvent();
if (event != null) {
return event;
}
long lastValid = absoluteChunkEnd;
long metadataPosition = chunkHeader.getMetataPosition();
long contantPosition = chunkHeader.getConstantPoolPosition();
chunkFinished = awaitUpdatedHeader(absoluteChunkEnd, configuration.filterEnd);
if (chunkFinished) {
Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "At chunk end");
return null;
}
absoluteChunkEnd = chunkHeader.getEnd();
// Read metadata and constant pools for the next segment
if (chunkHeader.getMetataPosition() != metadataPosition) {
Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new metadata in chunk. Rebuilding types and parsers");
MetadataDescriptor metadata = chunkHeader.readMetadata(previousMetadata);
ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter);
parsers = factory.getParsers();
typeMap = factory.getTypeMap();
updateConfiguration();
}
if (contantPosition != chunkHeader.getConstantPoolPosition()) {
Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new constant pool data. Filling up pools with new values");
constantLookups.forEach(c -> c.getLatestPool().setAllResolved(false));
fillConstantPools(contantPosition + chunkHeader.getAbsoluteChunkStart());
constantLookups.forEach(c -> c.getLatestPool().setResolving());
constantLookups.forEach(c -> c.getLatestPool().resolve());
constantLookups.forEach(c -> c.getLatestPool().setResolved());
}
input.position(lastValid);
return null;
}
/**

View File

@ -105,8 +105,8 @@ public class EventDirectoryStream extends AbstractEventStream {
}
protected void processRecursionSafe() throws IOException {
Dispatcher lastDisp = null;
Dispatcher disp = dispatcher();
Path path;
boolean validStartTime = recording != null || disp.startTime != null;
if (validStartTime) {
@ -125,18 +125,20 @@ public class EventDirectoryStream extends AbstractEventStream {
long filterEnd = disp.endTime != null ? disp.endNanos: Long.MAX_VALUE;
while (!isClosed()) {
boolean awaitnewEvent = false;
while (!isClosed() && !currentParser.isChunkFinished()) {
disp = dispatcher();
ParserConfiguration pc = disp.parserConfiguration;
pc.filterStart = filterStart;
pc.filterEnd = filterEnd;
currentParser.updateConfiguration(pc, true);
currentParser.setFlushOperation(getFlushOperation());
if (pc.isOrdered()) {
awaitnewEvent = processOrdered(disp, awaitnewEvent);
if (disp != lastDisp) {
ParserConfiguration pc = disp.parserConfiguration;
pc.filterStart = filterStart;
pc.filterEnd = filterEnd;
currentParser.updateConfiguration(pc, true);
currentParser.setFlushOperation(getFlushOperation());
lastDisp = disp;
}
if (disp.parserConfiguration.isOrdered()) {
processOrdered(disp);
} else {
awaitnewEvent = processUnordered(disp, awaitnewEvent);
processUnordered(disp);
}
if (currentParser.getStartNanos() + currentParser.getChunkDuration() > filterEnd) {
close();
@ -182,29 +184,24 @@ public class EventDirectoryStream extends AbstractEventStream {
return recording.getFinalChunkStartNanos() >= currentParser.getStartNanos();
}
private boolean processOrdered(Dispatcher c, boolean awaitNewEvents) throws IOException {
private void processOrdered(Dispatcher c) throws IOException {
if (sortedCache == null) {
sortedCache = new RecordedEvent[100_000];
}
int index = 0;
while (true) {
RecordedEvent e = currentParser.readStreamingEvent(awaitNewEvents);
RecordedEvent e = currentParser.readStreamingEvent();
if (e == null) {
// wait for new event with next call to
// readStreamingEvent()
awaitNewEvents = true;
break;
}
awaitNewEvents = false;
if (index == sortedCache.length) {
sortedCache = Arrays.copyOf(sortedCache, sortedCache.length * 2);
}
sortedCache[index++] = e;
}
// no events found
if (index == 0 && currentParser.isChunkFinished()) {
return awaitNewEvents;
return;
}
// at least 2 events, sort them
if (index > 1) {
@ -213,12 +210,12 @@ public class EventDirectoryStream extends AbstractEventStream {
for (int i = 0; i < index; i++) {
c.dispatch(sortedCache[i]);
}
return awaitNewEvents;
return;
}
private boolean processUnordered(Dispatcher c, boolean awaitNewEvents) throws IOException {
private boolean processUnordered(Dispatcher c) throws IOException {
while (true) {
RecordedEvent e = currentParser.readStreamingEvent(awaitNewEvents);
RecordedEvent e = currentParser.readStreamingEvent();
if (e == null) {
return true;
} else {

View File

@ -121,4 +121,8 @@ final class StreamConfiguration {
public boolean hasChanged() {
return changed;
}
public void setChanged(boolean changed) {
this.changed = changed;
}
}

View File

@ -26,9 +26,7 @@
package jdk.jfr.api.consumer.recordingstream;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import jdk.jfr.Event;
@ -41,7 +39,7 @@ import jdk.jfr.consumer.RecordingStream;
* @summary Tests RecordingStream::close()
* @key jfr
* @requires vm.hasJFR
* @library /test/lib
* @library /test/lib /test/jdk
* @run main/othervm jdk.jfr.api.consumer.recordingstream.TestClose
*/
public class TestClose {
@ -58,96 +56,82 @@ public class TestClose {
testCloseNoEvents();
}
private static void testCloseMySelf() throws Exception {
log("Entering testCloseMySelf()");
CountDownLatch l1 = new CountDownLatch(1);
CountDownLatch l2 = new CountDownLatch(1);
RecordingStream r = new RecordingStream();
r.onEvent(e -> {
try {
l1.await();
r.close();
l2.countDown();
} catch (InterruptedException ie) {
throw new Error(ie);
}
});
r.startAsync();
CloseEvent c = new CloseEvent();
c.commit();
l1.countDown();
l2.await();
log("Leaving testCloseMySelf()");
}
private static void testCloseUnstarted() {
System.out.println("testCloseUnstarted()");
private static void testCloseStreaming() throws Exception {
log("Entering testCloseStreaming()");
CountDownLatch streaming = new CountDownLatch(1);
RecordingStream r = new RecordingStream();
AtomicLong count = new AtomicLong();
r.onEvent(e -> {
if (count.incrementAndGet() > 100) {
streaming.countDown();
}
});
r.startAsync();
var streamingLoop = CompletableFuture.runAsync(() -> {
while (true) {
CloseEvent c = new CloseEvent();
c.commit();
}
});
streaming.await();
r.close();
streamingLoop.cancel(true);
log("Leaving testCloseStreaming()");
try (RecordingStream r = new RecordingStream()) {
r.close();
}
}
private static void testCloseStarted() {
log("Entering testCloseStarted()");
RecordingStream r = new RecordingStream();
r.startAsync();
r.close();
log("Leaving testCloseStarted()");
}
System.out.println("testCloseStarted()");
private static void testCloseUnstarted() {
log("Entering testCloseUnstarted()");
RecordingStream r = new RecordingStream();
r.close();
log("Leaving testCloseUnstarted()");
try (RecordingStream r = new RecordingStream()) {
r.startAsync();
} // <- Close
}
private static void testCloseTwice() {
log("Entering testCloseTwice()");
RecordingStream r = new RecordingStream();
r.startAsync();
r.close();
r.close();
log("Leaving testCloseTwice()");
System.out.println("Entering testCloseTwice()");
try (RecordingStream r = new RecordingStream()) {
r.startAsync();
r.close();
} // <- Second close
}
private static void testCloseStreaming() throws Exception {
System.out.println("Entering testCloseStreaming()");
EventProducer p = new EventProducer();
p.start();
CountDownLatch streaming = new CountDownLatch(1);
try (RecordingStream r = new RecordingStream()) {
r.onEvent(e -> {
streaming.countDown();
});
r.startAsync();
streaming.await();
} // <- Close
p.kill();
}
private static void testCloseMySelf() throws Exception {
System.out.println("testCloseMySelf()");
CountDownLatch closed = new CountDownLatch(1);
try (RecordingStream r = new RecordingStream()) {
r.onEvent(e -> {
r.close(); // <- Close
closed.countDown();
});
r.startAsync();
CloseEvent c = new CloseEvent();
c.commit();
closed.await();
}
}
private static void testCloseNoEvents() throws Exception {
System.out.println("testCloseNoEvents()");
try (Recording r = new Recording()) {
r.start();
CountDownLatch finished = new CountDownLatch(2);
AtomicReference<Thread> streamingThread = new AtomicReference<>();
try (EventStream es = EventStream.openRepository()) {
es.setStartTime(Instant.EPOCH);
es.onFlush( () -> {
es.onFlush(() -> {
streamingThread.set(Thread.currentThread());
finished.countDown();;
finished.countDown();
});
es.startAsync();
finished.await();
} // <- EventStream::close should terminate thread
} // <- Close should terminate thread
while (streamingThread.get().isAlive()) {
Thread.sleep(10);
}
}
}
private static void log(String msg) {
System.out.println(msg);
}
}

View File

@ -36,7 +36,7 @@ import jdk.jfr.consumer.RecordingStream;
* @summary Tests RecordingStream::onEvent(...)
* @key jfr
* @requires vm.hasJFR
* @library /test/lib
* @library /test/lib /test/jdk
* @run main/othervm jdk.jfr.api.consumer.recordingstream.TestOnEvent
*/
public class TestOnEvent {
@ -58,6 +58,7 @@ public class TestOnEvent {
testOnEvent();
testNamedEvent();
testTwoEventWithSameName();
testOnEventAfterStart();
}
private static void testOnEventNull() {
@ -149,6 +150,29 @@ public class TestOnEvent {
log("Leaving testOnEvent()");
}
private static void testOnEventAfterStart() {
try (RecordingStream r = new RecordingStream()) {
EventProducer p = new EventProducer();
p.start();
Thread addHandler = new Thread(() -> {
r.onEvent(e -> {
// Got event, close stream
r.close();
});
});
r.onFlush(() -> {
// Only add handler once
if (!"started".equals(addHandler.getName())) {
addHandler.setName("started");
addHandler.start();
}
});
r.start();
p.kill();
}
}
private static void log(String msg) {
System.out.println(msg);
}