8295350: JFR: Add stop methods for recording streams

Reviewed-by: mgronlun
This commit is contained in:
Erik Gahlin 2022-12-01 15:01:42 +00:00
parent 9430f3e65c
commit eec24aa203
12 changed files with 643 additions and 5 deletions
src
jdk.jfr/share/classes/jdk/jfr
jdk.management.jfr/share/classes/jdk/management/jfr
test/jdk/jdk/jfr
api/consumer/recordingstream
jmx/streaming

@ -48,6 +48,7 @@ import jdk.jfr.internal.PrivateAccess;
import jdk.jfr.internal.SecuritySupport;
import jdk.jfr.internal.Utils;
import jdk.jfr.internal.consumer.EventDirectoryStream;
import jdk.jfr.internal.management.StreamBarrier;
/**
* A recording stream produces events from the current JVM (Java Virtual
@ -380,6 +381,43 @@ public final class RecordingStream implements AutoCloseable, EventStream {
directoryStream.startAsync(startNanos);
}
/**
* Stops the recording stream.
* <p>
* Stops a started stream and waits until all events in the recording have
* been consumed.
* <p>
* Invoking this method in an action, for example in the
* {@link #onEvent(Consumer)} method, could block the stream indefinitely.
* To stop the stream abruptly, use the {@link #close} method.
* <p>
* The following code snippet illustrates how this method can be used in
* conjunction with the {@link #startAsync()} method to monitor what happens
* during a test method:
* <p>
* {@snippet class="Snippets" region="RecordingStreamStop"}
*
* @return {@code true} if recording is stopped, {@code false} otherwise
*
* @throws IllegalStateException if the recording is not started or is already stopped
*
* @since 20
*/
public boolean stop() {
boolean stopped = false;
try {
try (StreamBarrier sb = directoryStream.activateStreamBarrier()) {
stopped = recording.stop();
directoryStream.setCloseOnComplete(false);
sb.setStreamEnd(recording.getStopTime().toEpochMilli());
}
directoryStream.awaitTermination();
} catch (InterruptedException | IOException e) {
// OK, return
}
return stopped;
}
/**
* Writes recording data to a file.
* <p>

@ -28,6 +28,7 @@ import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import jdk.jfr.consumer.EventStream;
import jdk.jfr.consumer.RecordingFile;
@ -166,4 +167,26 @@ public class Snippets {
}
// @end
}
void RecordingStreamStop() throws Exception {
// @start region="RecordingStreamStop"
AtomicBoolean socketUse = new AtomicBoolean();
try (var r = new RecordingStream()) {
r.setMaxSize(Long.MAX_VALUE);
r.enable("jdk.SocketWrite").withoutThreshold();
r.enable("jdk.SocketRead").withoutThreshold();
r.onEvent(event -> socketUse.set(true));
r.startAsync();
testFoo();
r.stop();
if (socketUse.get()) {
r.dump(Path.of("socket-events.jfr"));
throw new AssertionError("testFoo() should not use network");
}
}
// @end
}
void testFoo() {
}
}

@ -62,9 +62,11 @@ public abstract class AbstractEventStream implements EventStream {
private final StreamConfiguration streamConfiguration = new StreamConfiguration();
private final List<Configuration> configurations;
private final ParserState parserState = new ParserState();
private volatile boolean closeOnComplete = true;
private Dispatcher dispatcher;
private boolean daemon = false;
AbstractEventStream(@SuppressWarnings("removal") AccessControlContext acc, List<Configuration> configurations) throws IOException {
this.accessControllerContext = Objects.requireNonNull(acc);
this.configurations = configurations;
@ -107,6 +109,13 @@ public abstract class AbstractEventStream implements EventStream {
this.daemon = daemon;
}
// When set to false, it becomes the callers responsibility
// to invoke close() and clean up resources. By default,
// the resource is cleaned up when the process() call has finished.
public final void setCloseOnComplete(boolean closeOnComplete) {
this.closeOnComplete = closeOnComplete;
}
@Override
public final void setStartTime(Instant startTime) {
Objects.requireNonNull(startTime, "startTime");
@ -258,7 +267,9 @@ public abstract class AbstractEventStream implements EventStream {
} finally {
Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
try {
close();
if (closeOnComplete) {
close();
}
} finally {
terminated.countDown();
}

@ -41,6 +41,7 @@ import jdk.jfr.internal.JVM;
import jdk.jfr.internal.PlatformRecording;
import jdk.jfr.internal.SecuritySupport;
import jdk.jfr.internal.Utils;
import jdk.jfr.internal.management.StreamBarrier;
/**
* Implementation of an {@code EventStream}} that operates against a directory
@ -54,11 +55,11 @@ public final class EventDirectoryStream extends AbstractEventStream {
private final RepositoryFiles repositoryFiles;
private final FileAccess fileAccess;
private final PlatformRecording recording;
private final StreamBarrier barrier = new StreamBarrier();
private ChunkParser currentParser;
private long currentChunkStartNanos;
private RecordedEvent[] sortedCache;
private int threadExclusionLevel = 0;
private volatile Consumer<Long> onCompleteHandler;
public EventDirectoryStream(
@ -150,7 +151,6 @@ public final class EventDirectoryStream extends AbstractEventStream {
long segmentStart = currentParser.getStartNanos() + currentParser.getChunkDuration();
long filterStart = validStartTime ? disp.startNanos : segmentStart;
long filterEnd = disp.endTime != null ? disp.endNanos : Long.MAX_VALUE;
while (!isClosed()) {
onMetadata(currentParser);
while (!isClosed() && !currentParser.isChunkFinished()) {
@ -166,8 +166,14 @@ public final class EventDirectoryStream extends AbstractEventStream {
processUnordered(disp);
}
currentParser.resetCache();
if (currentParser.getStartNanos() + currentParser.getChunkDuration() > filterEnd) {
close();
barrier.check(); // block if recording is being stopped
long endNanos = currentParser.getStartNanos() + currentParser.getChunkDuration();
// same conversion as in RecordingInfo
long endMillis = Instant.ofEpochSecond(0, endNanos).toEpochMilli();
if (barrier.getStreamEnd() <= endMillis) {
return;
}
if (endNanos > filterEnd) {
return;
}
}
@ -205,6 +211,7 @@ public final class EventDirectoryStream extends AbstractEventStream {
}
}
private boolean isLastChunk() {
if (!isRecording()) {
return false;
@ -259,4 +266,9 @@ public final class EventDirectoryStream extends AbstractEventStream {
c.dispatch(e);
}
}
public StreamBarrier activateStreamBarrier() {
barrier.activate();
return barrier;
}
}

@ -51,6 +51,7 @@ import jdk.jfr.internal.PrivateAccess;
import jdk.jfr.internal.SecuritySupport.SafePath;
import jdk.jfr.internal.Utils;
import jdk.jfr.internal.WriteableUserPath;
import jdk.jfr.internal.consumer.AbstractEventStream;
import jdk.jfr.internal.consumer.EventDirectoryStream;
import jdk.jfr.internal.consumer.FileAccess;
import jdk.jfr.internal.instrument.JDKEvents;
@ -178,4 +179,18 @@ public final class ManagementSupport {
false
);
}
// An EventStream is passive, so a stop() method doesn't fit well in the API.
// RemoteRecordingStream::stop() implementation need to prevent stream
// from being closed, so this method is needed
public static void setCloseOnComplete(EventStream stream, boolean closeOnComplete) {
AbstractEventStream aes = (AbstractEventStream) stream;
aes.setCloseOnComplete(closeOnComplete);
}
// Internal method needed to block parser
public static StreamBarrier activateStreamBarrier(EventStream stream) {
EventDirectoryStream aes = (EventDirectoryStream) stream;
return aes.activateStreamBarrier();
}
}

@ -0,0 +1,74 @@
/*
* Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.jfr.internal.management;
import java.io.Closeable;
import java.io.IOException;
/**
* Purpose of this class is to provide a synchronization point when stopping a
* recording. Without it, a race can happen where a stream advances beyond the
* last chunk of the recording.
*
* Code that is processing the stream calls check() and Unless the recording is
* in the process of being stopped, it will just return. On the other hand, if
* the recording is stopping, the thread waits and when it wakes up an end
* position should have been set (last chunk position) beyond which the stream
* processing should not continue.
*/
public final class StreamBarrier implements Closeable {
private boolean activated = false;
private long end = Long.MAX_VALUE;
// Blocks thread until barrier is deactivated
public synchronized void check() {
while (activated) {
try {
this.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public synchronized void setStreamEnd(long timestamp) {
end = timestamp;
}
public synchronized long getStreamEnd() {
return end;
}
public synchronized void activate() {
activated = true;
}
@Override
public synchronized void close() throws IOException {
activated = false;
this.notifyAll();
}
}

@ -49,6 +49,7 @@ import java.util.Queue;
import jdk.jfr.internal.management.ChunkFilename;
import jdk.jfr.internal.management.ManagementSupport;
import jdk.jfr.internal.management.StreamBarrier;
final class DiskRepository implements Closeable {
@ -126,6 +127,7 @@ final class DiskRepository implements Closeable {
private final ByteBuffer buffer = ByteBuffer.allocate(256);
private final Path directory;
private final ChunkFilename chunkFilename;
private final StreamBarrier barrier = new StreamBarrier();
private RandomAccessFile raf;
private RandomAccessFile previousRAF;
@ -153,6 +155,7 @@ final class DiskRepository implements Closeable {
}
public synchronized void write(byte[] bytes) throws IOException {
barrier.check();
index = 0;
lastFlush = 0;
currentByteArray = bytes;
@ -345,6 +348,10 @@ final class DiskRepository implements Closeable {
long endTimeNanos = currentChunk.startTimeNanos + durationNanos;
currentChunk.endTimeNanos = endTimeNanos;
currentChunk.endTime = ManagementSupport.epochNanosToInstant(endTimeNanos);
if (currentChunk.endTime.toEpochMilli() == barrier.getStreamEnd()) {
// Recording has been stopped, need to complete last chunk
completePrevious(currentChunk);
}
}
raf.seek(position);
}
@ -512,4 +519,9 @@ final class DiskRepository implements Closeable {
}
return fd;
}
public StreamBarrier activateStreamBarrier() {
barrier.activate();
return barrier;
}
}

@ -60,6 +60,7 @@ import jdk.jfr.consumer.RecordedEvent;
import jdk.jfr.consumer.RecordingStream;
import jdk.jfr.internal.management.EventSettingsModifier;
import jdk.jfr.internal.management.ManagementSupport;
import jdk.jfr.internal.management.StreamBarrier;
import jdk.management.jfr.DiskRepository.DiskChunk;
import jdk.jfr.internal.management.EventByteStream;
@ -560,6 +561,75 @@ public final class RemoteRecordingStream implements EventStream {
}
}
/**
* Stops the recording stream.
* <p>
* Stops a started stream and waits until all events in the recording have
* been consumed.
* <p>
* Invoking this method in an action, for example in the
* {@link #onEvent(Consumer)} method, could block the stream indefinitely.
* To stop the stream abruptly, use the {@link #close} method.
* <p>
* The following code snippet illustrates how this method can be used in
* conjunction with the {@link #startAsync()} method to monitor what happens
* during a test method:
* <p>
* {@snippet :
* AtomicLong bytesWritten = new AtomicLong();
* try (var r = new RemoteRecordingStream(connection)) {
* r.setMaxSize(Long.MAX_VALUE);
* r.enable("jdk.FileWrite").withoutThreshold();
* r.onEvent(event ->
* bytesWritten.addAndGet(event.getLong("bytesWritten"))
* );
* r.startAsync();
* testFoo();
* r.stop();
* if (bytesWritten.get() > 1_000_000L) {
* r.dump(Path.of("file-write-events.jfr"));
* throw new AssertionError("testFoo() writes too much data to disk");
* }
* }
* }
* @return {@code true} if recording is stopped, {@code false} otherwise
*
* @throws IllegalStateException if the recording is not started or is already stopped
*
* @since 20
*/
public boolean stop() {
synchronized (lock) {
if (closed) {
throw new IllegalStateException("Event stream is closed");
}
if (!started) {
throw new IllegalStateException("Event stream must be started before it can stopped");
}
try {
boolean stopped = false;
try (StreamBarrier pb = ManagementSupport.activateStreamBarrier(stream)) {
try (StreamBarrier rb = repository.activateStreamBarrier()) {
stopped = mbean.stopRecording(recordingId);
ManagementSupport.setCloseOnComplete(stream, false);
long stopTime = getRecordingInfo(mbean.getRecordings(), recordingId).getStopTime();
pb.setStreamEnd(stopTime);
rb.setStreamEnd(stopTime);
}
}
try {
stream.awaitTermination();
} catch (InterruptedException e) {
// OK
}
return stopped;
} catch (Exception e) {
ManagementSupport.logDebug(e.getMessage());
return false;
}
}
}
private void ensureStartable() {
synchronized (lock) {
if (closed) {

@ -27,6 +27,11 @@ import jdk.jfr.api.consumer.recordingstream.TestStart.StartEvent;
class EventProducer extends Thread {
private final Object lock = new Object();
private boolean killed = false;
public EventProducer() {
super("Event Producer");
}
public void run() {
while (true) {
StartEvent s = new StartEvent();

@ -54,6 +54,7 @@ public class TestDump {
public static void main(String... args) throws Exception {
testUnstarted();
testStopped();
testClosed();
testOneDump();
testMultipleDumps();
@ -72,6 +73,22 @@ public class TestDump {
}
}
private static void testStopped() throws Exception {
Path path = Path.of("recording.jfr");
try (var rs = new RecordingStream()) {
rs.setMaxAge(Duration.ofHours(1));
rs.startAsync();
DumpEvent event = new DumpEvent();
event.commit();
rs.stop();
rs.dump(path);
var events = RecordingFile.readAllEvents(path);
if (events.size() != 1) {
throw new Exception("Expected one event");
}
}
}
private static void testClosed() throws Exception {
Path path = Path.of("recording.jfr");
var rs = new RecordingStream();

@ -0,0 +1,176 @@
/*
* Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.jfr.api.consumer.recordingstream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import jdk.jfr.Event;
import jdk.jfr.consumer.RecordedEvent;
import jdk.jfr.consumer.RecordingFile;
import jdk.jfr.consumer.RecordingStream;
/**
* @test
* @summary Tests RecordingStream::stop()
* @key jfr
* @requires vm.hasJFR
* @library /test/lib /test/jdk
* @build jdk.jfr.api.consumer.recordingstream.EventProducer
* @run main/othervm jdk.jfr.api.consumer.recordingstream.TestStop
*/
public class TestStop {
static class StopEvent extends Event {
}
static class MarkEvent extends Event {
String id;
}
public static void main(String... args) throws Exception {
testStopUnstarted();
testStop();
testStopFromOtherThread();
testNestedStop();
testStopClosed();
}
private static void testStopUnstarted() {
try (RecordingStream rs = new RecordingStream()) {
try {
rs.stop();
throw new AssertionError("Expected IllegalStateException");
} catch (IllegalStateException ise) {
// OK, as expected.
}
}
}
static void testStop() throws Exception {
// Check that all events emitted prior to
// stop() can be consumed
// Check that events are not consumer after stop()
List<RecordedEvent> events = new ArrayList<>();
try (RecordingStream rs = new RecordingStream()) {
rs.onEvent(events::add);
rs.startAsync();
for (int i = 0; i < 100; i++) {
StopEvent s = new StopEvent();
s.commit();
}
rs.stop();
if (events.size() != 100) {
throw new AssertionError("Expected 100 events");
}
for (int i = 0; i < 100; i++) {
StopEvent s = new StopEvent();
s.commit();
}
if (events.size() != 100) {
throw new AssertionError("Expected 100 events");
}
}
}
private static void testStopFromOtherThread() throws Exception {
try (RecordingStream rs = new RecordingStream()) {
Thread t = new Thread(() -> rs.stop());
rs.startAsync();
t.start();
rs.awaitTermination();
t.join();
}
}
private static void testNestedStop() throws Exception {
AtomicLong outerCount = new AtomicLong();
AtomicLong innerCount = new AtomicLong();
try (RecordingStream outer = new RecordingStream()) {
outer.onEvent(e -> outerCount.incrementAndGet());
outer.setMaxSize(100_000_000);
outer.startAsync();
MarkEvent a = new MarkEvent();
a.id = "a";
a.commit();
try (RecordingStream inner = new RecordingStream()) {
inner.setMaxSize(100_000_000);
inner.onEvent(e -> innerCount.incrementAndGet());
inner.startAsync();
MarkEvent b = new MarkEvent();
b.id = "b";
b.commit();
inner.stop();
MarkEvent c = new MarkEvent();
c.id = "c";
c.commit();
outer.stop();
Path fileOuter = Path.of("outer.jfr");
Path fileInner = Path.of("inner.jfr");
inner.dump(fileInner);
outer.dump(fileOuter);
System.out.println("RecordingStream outer:");
var dumpOuter = RecordingFile.readAllEvents(fileOuter);
System.out.println(dumpOuter);
System.out.println("RecordingStream inner:");
var dumpInner = RecordingFile.readAllEvents(fileInner);
System.out.println(dumpInner);
System.out.println("Outer count: " + outerCount);
System.out.println("Inner count: " + innerCount);
if (dumpOuter.size() != 3) {
throw new AssertionError("Expected outer dump to have 3 events");
}
if (outerCount.get() == 3) {
throw new AssertionError("Expected outer stream to have 3 events");
}
if (dumpInner.size() != 1) {
throw new AssertionError("Expected inner dump to have 1 event");
}
if (innerCount.get() != 1) {
throw new AssertionError("Expected inner stream to have 1 event");
}
}
}
}
static void testStopClosed() {
try (RecordingStream rs = new RecordingStream()) {
rs.close();
try {
rs.stop();
throw new AssertionError("Expected IllegalStateException");
} catch (IllegalStateException ise) {
// OK, as expected.
}
}
}
}

@ -0,0 +1,185 @@
/*
* Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.jfr.jmx.streaming;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import jdk.jfr.Event;
import jdk.jfr.consumer.RecordedEvent;
import jdk.jfr.consumer.RecordingFile;
import jdk.jfr.consumer.RecordingStream;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import javax.management.MBeanServerConnection;
import jdk.management.jfr.RemoteRecordingStream;
/**
* @test
* @summary Tests RemoteRecordingStream::stop()
* @key jfr
* @requires vm.hasJFR
* @library /test/lib /test/jdk
* @build jdk.jfr.api.consumer.recordingstream.EventProducer
* @run main/othervm jdk.jfr.jmx.streaming.TestStop
*/
public class TestStop {
private static final MBeanServerConnection CONNECTION = ManagementFactory.getPlatformMBeanServer();
static class StopEvent extends Event {
}
static class MarkEvent extends Event {
String id;
}
public static void main(String... args) throws Exception {
testStopUnstarted();
testStop();
testStopFromOtherThread();
testNestedStop();
testStopClosed();
}
private static void testStopUnstarted() throws Exception {
try (var rs = new RemoteRecordingStream(CONNECTION)) {
try {
rs.stop();
throw new AssertionError("Expected IllegalStateException");
} catch (IllegalStateException ise) {
// OK, as expected.
}
}
}
static void testStop() throws Exception {
// Check that all events emitted prior to
// stop() can be consumed
// Check that events are not consumed after stop()
List<RecordedEvent> events = new ArrayList<>();
try (var rs = new RemoteRecordingStream(CONNECTION)) {
rs.onEvent(e -> {
events.add(e);
});
rs.startAsync();
for (int i = 0; i < 100; i++) {
StopEvent s = new StopEvent();
s.commit();
}
rs.stop();
if (events.size() != 100) {
throw new AssertionError("Expected 100 events");
}
for (int i = 0; i < 100; i++) {
StopEvent s = new StopEvent();
s.commit();
}
if (events.size() != 100) {
throw new AssertionError("Expected 100 events");
}
}
}
private static void testStopFromOtherThread() throws Exception {
try (var rs = new RemoteRecordingStream(CONNECTION)) {
Thread t = new Thread(() -> rs.stop());
rs.startAsync();
t.start();
rs.awaitTermination();
t.join();
}
}
private static void testNestedStop() throws Exception {
AtomicLong outerCount = new AtomicLong();
AtomicLong innerCount = new AtomicLong();
try (var outer = new RemoteRecordingStream(CONNECTION)) {
outer.onEvent(e -> outerCount.incrementAndGet());
outer.setMaxSize(100_000_000);
outer.startAsync();
MarkEvent a = new MarkEvent();
a.id = "a";
a.commit();
try (var inner = new RemoteRecordingStream(CONNECTION)) {
inner.setMaxSize(100_000_000);
inner.onEvent(e -> innerCount.incrementAndGet());
inner.startAsync();
MarkEvent b = new MarkEvent();
b.id = "b";
b.commit();
inner.stop();
MarkEvent c = new MarkEvent();
c.id = "c";
c.commit();
outer.stop();
Path fileOuter = Path.of("outer.jfr");
Path fileInner = Path.of("inner.jfr");
inner.dump(fileInner);
outer.dump(fileOuter);
System.out.println("RecordingStream outer:");
var dumpOuter = RecordingFile.readAllEvents(fileOuter);
System.out.println(dumpOuter);
System.out.println("RecordingStream inner:");
var dumpInner = RecordingFile.readAllEvents(fileInner);
System.out.println(dumpInner);
System.out.println("Outer count: " + outerCount);
System.out.println("Inner count: " + innerCount);
if (dumpOuter.size() != 3) {
throw new AssertionError("Expected outer dump to have 3 events");
}
if (outerCount.get() == 3) {
throw new AssertionError("Expected outer stream to have 3 events");
}
if (dumpInner.size() != 1) {
throw new AssertionError("Expected inner dump to have 1 event");
}
if (innerCount.get() != 1) {
throw new AssertionError("Expected inner stream to have 1 event");
}
}
}
}
static void testStopClosed() throws Exception {
try (var rs = new RemoteRecordingStream(CONNECTION)) {
rs.close();
try {
rs.stop();
throw new AssertionError("Expected IllegalStateException");
} catch (IllegalStateException ise) {
// OK, as expected.
}
}
}
}