diff --git a/test/jdk/java/net/httpclient/whitebox/java.net.http/jdk/internal/net/http/FlowTest.java b/test/jdk/java/net/httpclient/whitebox/java.net.http/jdk/internal/net/http/FlowTest.java index e6188bf4bb8..215f591dd55 100644 --- a/test/jdk/java/net/httpclient/whitebox/java.net.http/jdk/internal/net/http/FlowTest.java +++ b/test/jdk/java/net/httpclient/whitebox/java.net.http/jdk/internal/net/http/FlowTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -90,26 +90,27 @@ public class FlowTest extends AbstractRandomTest { SSLFlowDelegate sslClient = new SSLFlowDelegate(engineClient, executor, end, looper); // going to measure how long handshake takes final long start = System.currentTimeMillis(); - sslClient.alpn().whenComplete((String s, Throwable t) -> { + var alpnCF = sslClient.alpn() + .whenComplete((String s, Throwable t) -> { if (t != null) t.printStackTrace(); long endTime = System.currentTimeMillis(); alpn = s; - System.out.println("ALPN: " + alpn); + println("ALPN: " + alpn); long period = (endTime - start); - System.out.printf("Handshake took %d ms\n", period); + printf("Handshake took %d ms", period); }); Subscriber> reader = sslClient.upstreamReader(); Subscriber> writer = sslClient.upstreamWriter(); looper.setReturnSubscriber(reader); // now connect all the pieces srcPublisher.subscribe(writer); - String aa = sslClient.alpn().join(); - System.out.println("AAALPN = " + aa); + String aa = alpnCF.join(); + println("AAALPN = " + aa); } private void handlePublisherException(Object o, Throwable t) { - System.out.println("Src Publisher exception"); + println("Src Publisher exception"); t.printStackTrace(System.out); } @@ -125,29 +126,43 @@ public class FlowTest extends AbstractRandomTest { @Test public void run() { long count = 0; - System.out.printf("Submitting %d buffer arrays\n", COUNTER); - System.out.printf("LoopCount should be %d\n", TOTAL_LONGS); + printf("Submitting %d buffer arrays", COUNTER); + printf("LoopCount should be %d", TOTAL_LONGS); for (long i = 0; i < COUNTER; i++) { ByteBuffer b = getBuffer(count); count += LONGS_PER_BUF; srcPublisher.submit(List.of(b)); } - System.out.println("Finished submission. Waiting for loopback"); + println("Finished submission. Waiting for loopback"); // make sure we don't wait for allBytesReceived in case of error. - completion.whenComplete((r,t) -> allBytesReceived.countDown()); + var done = completion.whenComplete((r,t) -> { + try { + if (t != null) { + println("Completion with error: " + t); + t.printStackTrace(System.out); + } else { + println("Successful completion"); + } + } finally { + println("allBytesReceived.countDown()"); + allBytesReceived.countDown(); + } + }); + try { allBytesReceived.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } - System.out.println("All bytes received: "); + println("All bytes received; latch count:" + + allBytesReceived.getCount()); srcPublisher.close(); try { - completion.join(); + done.join(); if (!alpn.equals("proto2")) { throw new RuntimeException("wrong alpn received"); } - System.out.println("OK"); + println("OK"); } finally { executor.shutdownNow(); } @@ -177,6 +192,7 @@ public class FlowTest extends AbstractRandomTest { */ static class SSLLoopbackSubscriber implements Subscriber> { private final BlockingQueue buffer; + private final SSLServerSocket serv; private final Socket clientSock; private final SSLSocket serverSock; private final Thread thread1, thread2, thread3; @@ -188,7 +204,7 @@ public class FlowTest extends AbstractRandomTest { ExecutorService exec, CountDownLatch allBytesReceived) throws IOException { SSLServerSocketFactory fac = ctx.getServerSocketFactory(); - SSLServerSocket serv = (SSLServerSocket) fac.createServerSocket(); + serv = (SSLServerSocket) fac.createServerSocket(); serv.setReuseAddress(false); serv.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); SSLParameters params = serv.getSSLParameters(); @@ -216,7 +232,7 @@ public class FlowTest extends AbstractRandomTest { } private void handlePublisherException(Object o, Throwable t) { - System.out.println("Loopback Publisher exception"); + println("Loopback Publisher exception"); t.printStackTrace(System.out); } @@ -227,20 +243,21 @@ public class FlowTest extends AbstractRandomTest { try { InputStream is = clientSock.getInputStream(); final int bufsize = FlowTest.randomRange(512, 16 * 1024); - System.out.println("clientReader: bufsize = " + bufsize); + println("clientReader: bufsize = " + bufsize); while (true) { byte[] buf = new byte[bufsize]; int n = is.read(buf); if (n == -1) { - System.out.println("clientReader close: read " + println("clientReader close: read " + readCount.get() + " bytes"); - System.out.println("clientReader: got EOF. " - + "Waiting signal to close publisher."); + println("clientReader: got EOF. " + + "Waiting signal to close publisher."); allBytesReceived.await(); - System.out.println("clientReader: closing publisher"); + println("clientReader: closing publisher; latch count: " + + allBytesReceived.getCount()); publisher.close(); sleep(2000); - Utils.close(is, clientSock); + Utils.close(is, clientSock, serv); return; } ByteBuffer bb = ByteBuffer.wrap(buf, 0, n); @@ -248,6 +265,7 @@ public class FlowTest extends AbstractRandomTest { publisher.submit(List.of(bb)); } } catch (Throwable e) { + println("clientReader failed: " + e); e.printStackTrace(); Utils.close(clientSock); } @@ -266,9 +284,9 @@ public class FlowTest extends AbstractRandomTest { if (buf == FlowTest.SENTINEL) { // finished //Utils.sleep(2000); - System.out.println("clientWriter close: " + nbytes + " written"); + println("clientWriter close: " + nbytes + " written"); clientSock.shutdownOutput(); - System.out.println("clientWriter close return"); + println("clientWriter close return"); return; } int len = buf.remaining(); @@ -280,6 +298,7 @@ public class FlowTest extends AbstractRandomTest { clientSubscription.request(1); } } catch (Throwable e) { + println("clientWriter failed: " + e); e.printStackTrace(); } } @@ -307,14 +326,15 @@ public class FlowTest extends AbstractRandomTest { InputStream is = serverSock.getInputStream(); OutputStream os = serverSock.getOutputStream(); final int bufsize = FlowTest.randomRange(512, 16 * 1024); - System.out.println("serverLoopback: bufsize = " + bufsize); + println("serverLoopback: bufsize = " + bufsize); byte[] bb = new byte[bufsize]; while (true) { int n = is.read(bb); if (n == -1) { sleep(2000); - is.close(); - serverSock.close(); + println("Received EOF: closing server socket"); + Utils.close(is, serverSock, serv); + println("Server socket closed"); return; } os.write(bb, 0, n); @@ -322,6 +342,7 @@ public class FlowTest extends AbstractRandomTest { loopCount.addAndGet(n); } } catch (Throwable e) { + println("serverLoopback failed: " + e); e.printStackTrace(); } } @@ -376,18 +397,18 @@ public class FlowTest extends AbstractRandomTest { */ static class EndSubscriber implements Subscriber> { - private final long nbytes; + private final long nlongs; private final AtomicLong counter; private volatile Flow.Subscription subscription; private final CompletableFuture completion; private final CountDownLatch allBytesReceived; - EndSubscriber(long nbytes, + EndSubscriber(long nlongs, CompletableFuture completion, CountDownLatch allBytesReceived) { counter = new AtomicLong(0); - this.nbytes = nbytes; + this.nlongs = nlongs; this.completion = completion; this.allBytesReceived = allBytesReceived; } @@ -417,12 +438,16 @@ public class FlowTest extends AbstractRandomTest { for (ByteBuffer buf : buffers) { while (buf.hasRemaining()) { + if (buf.remaining() % 8 != 0) { + completion.completeExceptionally( + new AssertionError("Unaligned buffer: " + buf.remaining())); + } long n = buf.getLong(); //if (currval > (FlowTest.TOTAL_LONGS - 50)) { //System.out.println("End: " + currval); //} if (n != currval++) { - System.out.println("ERROR at " + n + " != " + (currval - 1)); + println("ERROR at " + n + " != " + (currval - 1)); completion.completeExceptionally(new RuntimeException("ERROR")); subscription.cancel(); return; @@ -433,25 +458,24 @@ public class FlowTest extends AbstractRandomTest { counter.set(currval); subscription.request(1); if (currval >= TOTAL_LONGS) { + println("allBytesReceived.countDown(): currval=" +currval); allBytesReceived.countDown(); } } @Override public void onError(Throwable throwable) { - allBytesReceived.countDown(); completion.completeExceptionally(throwable); } @Override public void onComplete() { long n = counter.get(); - if (n != nbytes) { - System.out.printf("nbytes=%d n=%d\n", nbytes, n); + if (n != nlongs) { + printf("Error at end: nlongs=%d n=%d", nlongs, n); completion.completeExceptionally(new RuntimeException("ERROR AT END")); } else { - System.out.println("DONE OK: counter = " + n); - allBytesReceived.countDown(); + println("DONE OK: counter = " + n); completion.complete(null); } } @@ -464,4 +488,40 @@ public class FlowTest extends AbstractRandomTest { e.printStackTrace(); } } + + static final long START = System.nanoTime(); + + static String now() { + long now = System.nanoTime() - START; + long min = now / 1000_000_000L / 60L; + long sec = (now / 1000_000_000L) % 60L; + long mil = (now / 1000_000L) % 1000L; + long nan = (now % 1000_000L); + StringBuilder str = new StringBuilder(); + if (min != 0) { + str = str.append(min).append("m "); + } + if (sec != 0) { + str = str.append(sec).append("s "); + } + if (mil != 0) { + str.append(mil).append("ms "); + } + if (nan != 0) { + str.append(nan).append("ns "); + } + assert now == min * 60L * 1000_000_000L + + sec * 1000_000_000L + + mil * 1000_000L + nan; + return str.toString().trim(); + } + + static void printf(String fmt, Object... args) { + println(String.format(fmt, args)); + } + + static void println(String msg) { + System.out.println("[" + Thread.currentThread() + "] [" + now() + "] " + msg); + } + }