8240754: Instrument FlowTest.java to provide more debug traces
Reviewed-by: chegar
This commit is contained in:
parent
dc17821807
commit
5c8f935641
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -90,26 +90,27 @@ public class FlowTest extends AbstractRandomTest {
|
||||
SSLFlowDelegate sslClient = new SSLFlowDelegate(engineClient, executor, end, looper);
|
||||
// going to measure how long handshake takes
|
||||
final long start = System.currentTimeMillis();
|
||||
sslClient.alpn().whenComplete((String s, Throwable t) -> {
|
||||
var alpnCF = sslClient.alpn()
|
||||
.whenComplete((String s, Throwable t) -> {
|
||||
if (t != null)
|
||||
t.printStackTrace();
|
||||
long endTime = System.currentTimeMillis();
|
||||
alpn = s;
|
||||
System.out.println("ALPN: " + alpn);
|
||||
println("ALPN: " + alpn);
|
||||
long period = (endTime - start);
|
||||
System.out.printf("Handshake took %d ms\n", period);
|
||||
printf("Handshake took %d ms", period);
|
||||
});
|
||||
Subscriber<List<ByteBuffer>> reader = sslClient.upstreamReader();
|
||||
Subscriber<List<ByteBuffer>> writer = sslClient.upstreamWriter();
|
||||
looper.setReturnSubscriber(reader);
|
||||
// now connect all the pieces
|
||||
srcPublisher.subscribe(writer);
|
||||
String aa = sslClient.alpn().join();
|
||||
System.out.println("AAALPN = " + aa);
|
||||
String aa = alpnCF.join();
|
||||
println("AAALPN = " + aa);
|
||||
}
|
||||
|
||||
private void handlePublisherException(Object o, Throwable t) {
|
||||
System.out.println("Src Publisher exception");
|
||||
println("Src Publisher exception");
|
||||
t.printStackTrace(System.out);
|
||||
}
|
||||
|
||||
@ -125,29 +126,43 @@ public class FlowTest extends AbstractRandomTest {
|
||||
@Test
|
||||
public void run() {
|
||||
long count = 0;
|
||||
System.out.printf("Submitting %d buffer arrays\n", COUNTER);
|
||||
System.out.printf("LoopCount should be %d\n", TOTAL_LONGS);
|
||||
printf("Submitting %d buffer arrays", COUNTER);
|
||||
printf("LoopCount should be %d", TOTAL_LONGS);
|
||||
for (long i = 0; i < COUNTER; i++) {
|
||||
ByteBuffer b = getBuffer(count);
|
||||
count += LONGS_PER_BUF;
|
||||
srcPublisher.submit(List.of(b));
|
||||
}
|
||||
System.out.println("Finished submission. Waiting for loopback");
|
||||
println("Finished submission. Waiting for loopback");
|
||||
// make sure we don't wait for allBytesReceived in case of error.
|
||||
completion.whenComplete((r,t) -> allBytesReceived.countDown());
|
||||
var done = completion.whenComplete((r,t) -> {
|
||||
try {
|
||||
if (t != null) {
|
||||
println("Completion with error: " + t);
|
||||
t.printStackTrace(System.out);
|
||||
} else {
|
||||
println("Successful completion");
|
||||
}
|
||||
} finally {
|
||||
println("allBytesReceived.countDown()");
|
||||
allBytesReceived.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
allBytesReceived.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
System.out.println("All bytes received: ");
|
||||
println("All bytes received; latch count:"
|
||||
+ allBytesReceived.getCount());
|
||||
srcPublisher.close();
|
||||
try {
|
||||
completion.join();
|
||||
done.join();
|
||||
if (!alpn.equals("proto2")) {
|
||||
throw new RuntimeException("wrong alpn received");
|
||||
}
|
||||
System.out.println("OK");
|
||||
println("OK");
|
||||
} finally {
|
||||
executor.shutdownNow();
|
||||
}
|
||||
@ -177,6 +192,7 @@ public class FlowTest extends AbstractRandomTest {
|
||||
*/
|
||||
static class SSLLoopbackSubscriber implements Subscriber<List<ByteBuffer>> {
|
||||
private final BlockingQueue<ByteBuffer> buffer;
|
||||
private final SSLServerSocket serv;
|
||||
private final Socket clientSock;
|
||||
private final SSLSocket serverSock;
|
||||
private final Thread thread1, thread2, thread3;
|
||||
@ -188,7 +204,7 @@ public class FlowTest extends AbstractRandomTest {
|
||||
ExecutorService exec,
|
||||
CountDownLatch allBytesReceived) throws IOException {
|
||||
SSLServerSocketFactory fac = ctx.getServerSocketFactory();
|
||||
SSLServerSocket serv = (SSLServerSocket) fac.createServerSocket();
|
||||
serv = (SSLServerSocket) fac.createServerSocket();
|
||||
serv.setReuseAddress(false);
|
||||
serv.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
|
||||
SSLParameters params = serv.getSSLParameters();
|
||||
@ -216,7 +232,7 @@ public class FlowTest extends AbstractRandomTest {
|
||||
}
|
||||
|
||||
private void handlePublisherException(Object o, Throwable t) {
|
||||
System.out.println("Loopback Publisher exception");
|
||||
println("Loopback Publisher exception");
|
||||
t.printStackTrace(System.out);
|
||||
}
|
||||
|
||||
@ -227,20 +243,21 @@ public class FlowTest extends AbstractRandomTest {
|
||||
try {
|
||||
InputStream is = clientSock.getInputStream();
|
||||
final int bufsize = FlowTest.randomRange(512, 16 * 1024);
|
||||
System.out.println("clientReader: bufsize = " + bufsize);
|
||||
println("clientReader: bufsize = " + bufsize);
|
||||
while (true) {
|
||||
byte[] buf = new byte[bufsize];
|
||||
int n = is.read(buf);
|
||||
if (n == -1) {
|
||||
System.out.println("clientReader close: read "
|
||||
println("clientReader close: read "
|
||||
+ readCount.get() + " bytes");
|
||||
System.out.println("clientReader: got EOF. "
|
||||
+ "Waiting signal to close publisher.");
|
||||
println("clientReader: got EOF. "
|
||||
+ "Waiting signal to close publisher.");
|
||||
allBytesReceived.await();
|
||||
System.out.println("clientReader: closing publisher");
|
||||
println("clientReader: closing publisher; latch count: "
|
||||
+ allBytesReceived.getCount());
|
||||
publisher.close();
|
||||
sleep(2000);
|
||||
Utils.close(is, clientSock);
|
||||
Utils.close(is, clientSock, serv);
|
||||
return;
|
||||
}
|
||||
ByteBuffer bb = ByteBuffer.wrap(buf, 0, n);
|
||||
@ -248,6 +265,7 @@ public class FlowTest extends AbstractRandomTest {
|
||||
publisher.submit(List.of(bb));
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
println("clientReader failed: " + e);
|
||||
e.printStackTrace();
|
||||
Utils.close(clientSock);
|
||||
}
|
||||
@ -266,9 +284,9 @@ public class FlowTest extends AbstractRandomTest {
|
||||
if (buf == FlowTest.SENTINEL) {
|
||||
// finished
|
||||
//Utils.sleep(2000);
|
||||
System.out.println("clientWriter close: " + nbytes + " written");
|
||||
println("clientWriter close: " + nbytes + " written");
|
||||
clientSock.shutdownOutput();
|
||||
System.out.println("clientWriter close return");
|
||||
println("clientWriter close return");
|
||||
return;
|
||||
}
|
||||
int len = buf.remaining();
|
||||
@ -280,6 +298,7 @@ public class FlowTest extends AbstractRandomTest {
|
||||
clientSubscription.request(1);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
println("clientWriter failed: " + e);
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
@ -307,14 +326,15 @@ public class FlowTest extends AbstractRandomTest {
|
||||
InputStream is = serverSock.getInputStream();
|
||||
OutputStream os = serverSock.getOutputStream();
|
||||
final int bufsize = FlowTest.randomRange(512, 16 * 1024);
|
||||
System.out.println("serverLoopback: bufsize = " + bufsize);
|
||||
println("serverLoopback: bufsize = " + bufsize);
|
||||
byte[] bb = new byte[bufsize];
|
||||
while (true) {
|
||||
int n = is.read(bb);
|
||||
if (n == -1) {
|
||||
sleep(2000);
|
||||
is.close();
|
||||
serverSock.close();
|
||||
println("Received EOF: closing server socket");
|
||||
Utils.close(is, serverSock, serv);
|
||||
println("Server socket closed");
|
||||
return;
|
||||
}
|
||||
os.write(bb, 0, n);
|
||||
@ -322,6 +342,7 @@ public class FlowTest extends AbstractRandomTest {
|
||||
loopCount.addAndGet(n);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
println("serverLoopback failed: " + e);
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
@ -376,18 +397,18 @@ public class FlowTest extends AbstractRandomTest {
|
||||
*/
|
||||
static class EndSubscriber implements Subscriber<List<ByteBuffer>> {
|
||||
|
||||
private final long nbytes;
|
||||
private final long nlongs;
|
||||
|
||||
private final AtomicLong counter;
|
||||
private volatile Flow.Subscription subscription;
|
||||
private final CompletableFuture<Void> completion;
|
||||
private final CountDownLatch allBytesReceived;
|
||||
|
||||
EndSubscriber(long nbytes,
|
||||
EndSubscriber(long nlongs,
|
||||
CompletableFuture<Void> completion,
|
||||
CountDownLatch allBytesReceived) {
|
||||
counter = new AtomicLong(0);
|
||||
this.nbytes = nbytes;
|
||||
this.nlongs = nlongs;
|
||||
this.completion = completion;
|
||||
this.allBytesReceived = allBytesReceived;
|
||||
}
|
||||
@ -417,12 +438,16 @@ public class FlowTest extends AbstractRandomTest {
|
||||
|
||||
for (ByteBuffer buf : buffers) {
|
||||
while (buf.hasRemaining()) {
|
||||
if (buf.remaining() % 8 != 0) {
|
||||
completion.completeExceptionally(
|
||||
new AssertionError("Unaligned buffer: " + buf.remaining()));
|
||||
}
|
||||
long n = buf.getLong();
|
||||
//if (currval > (FlowTest.TOTAL_LONGS - 50)) {
|
||||
//System.out.println("End: " + currval);
|
||||
//}
|
||||
if (n != currval++) {
|
||||
System.out.println("ERROR at " + n + " != " + (currval - 1));
|
||||
println("ERROR at " + n + " != " + (currval - 1));
|
||||
completion.completeExceptionally(new RuntimeException("ERROR"));
|
||||
subscription.cancel();
|
||||
return;
|
||||
@ -433,25 +458,24 @@ public class FlowTest extends AbstractRandomTest {
|
||||
counter.set(currval);
|
||||
subscription.request(1);
|
||||
if (currval >= TOTAL_LONGS) {
|
||||
println("allBytesReceived.countDown(): currval=" +currval);
|
||||
allBytesReceived.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
allBytesReceived.countDown();
|
||||
completion.completeExceptionally(throwable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
long n = counter.get();
|
||||
if (n != nbytes) {
|
||||
System.out.printf("nbytes=%d n=%d\n", nbytes, n);
|
||||
if (n != nlongs) {
|
||||
printf("Error at end: nlongs=%d n=%d", nlongs, n);
|
||||
completion.completeExceptionally(new RuntimeException("ERROR AT END"));
|
||||
} else {
|
||||
System.out.println("DONE OK: counter = " + n);
|
||||
allBytesReceived.countDown();
|
||||
println("DONE OK: counter = " + n);
|
||||
completion.complete(null);
|
||||
}
|
||||
}
|
||||
@ -464,4 +488,40 @@ public class FlowTest extends AbstractRandomTest {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
static final long START = System.nanoTime();
|
||||
|
||||
static String now() {
|
||||
long now = System.nanoTime() - START;
|
||||
long min = now / 1000_000_000L / 60L;
|
||||
long sec = (now / 1000_000_000L) % 60L;
|
||||
long mil = (now / 1000_000L) % 1000L;
|
||||
long nan = (now % 1000_000L);
|
||||
StringBuilder str = new StringBuilder();
|
||||
if (min != 0) {
|
||||
str = str.append(min).append("m ");
|
||||
}
|
||||
if (sec != 0) {
|
||||
str = str.append(sec).append("s ");
|
||||
}
|
||||
if (mil != 0) {
|
||||
str.append(mil).append("ms ");
|
||||
}
|
||||
if (nan != 0) {
|
||||
str.append(nan).append("ns ");
|
||||
}
|
||||
assert now == min * 60L * 1000_000_000L
|
||||
+ sec * 1000_000_000L
|
||||
+ mil * 1000_000L + nan;
|
||||
return str.toString().trim();
|
||||
}
|
||||
|
||||
static void printf(String fmt, Object... args) {
|
||||
println(String.format(fmt, args));
|
||||
}
|
||||
|
||||
static void println(String msg) {
|
||||
System.out.println("[" + Thread.currentThread() + "] [" + now() + "] " + msg);
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user