8231506: Fix some instabilities in a few networking tests

Reviewed-by: alanb, chegar, msheppar
This commit is contained in:
Daniel Fuchs 2019-10-01 12:10:33 +01:00
parent 990ec34714
commit 8e98ce54bf
3 changed files with 107 additions and 25 deletions

View File

@ -50,6 +50,7 @@ import java.nio.file.Paths;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import jdk.test.lib.net.IPSupport; import jdk.test.lib.net.IPSupport;
@ -72,11 +73,14 @@ public class UnreferencedMulticastSockets {
static class Server implements Runnable { static class Server implements Runnable {
MulticastSocket ss; MulticastSocket ss;
final int port;
final Phaser phaser = new Phaser(2);
Server() throws IOException { Server() throws IOException {
InetAddress loopback = InetAddress.getLoopbackAddress();
InetSocketAddress serverAddress = InetSocketAddress serverAddress =
new InetSocketAddress(InetAddress.getLoopbackAddress(), 0); new InetSocketAddress(loopback, 0);
ss = new MulticastSocket(serverAddress); ss = new MulticastSocket(serverAddress);
port = ss.getLocalPort();
System.out.printf(" DatagramServer addr: %s: %d%n", System.out.printf(" DatagramServer addr: %s: %d%n",
this.getHost(), this.getPort()); this.getHost(), this.getPort());
pendingSockets.add(new NamedWeak(ss, pendingQueue, "serverMulticastSocket")); pendingSockets.add(new NamedWeak(ss, pendingQueue, "serverMulticastSocket"));
@ -89,7 +93,7 @@ public class UnreferencedMulticastSockets {
} }
int getPort() { int getPort() {
return ss.getLocalPort(); return port;
} }
// Receive a byte and send back a byte // Receive a byte and send back a byte
@ -98,12 +102,18 @@ public class UnreferencedMulticastSockets {
byte[] buffer = new byte[50]; byte[] buffer = new byte[50];
DatagramPacket p = new DatagramPacket(buffer, buffer.length); DatagramPacket p = new DatagramPacket(buffer, buffer.length);
ss.receive(p); ss.receive(p);
System.out.printf("Server: ping received from: %s%n", p.getSocketAddress());
phaser.arriveAndAwaitAdvance(); // await the client...
buffer[0] += 1; buffer[0] += 1;
System.out.printf("Server: sending echo to: %s%n", p.getSocketAddress());
ss.send(p); // send back +1 ss.send(p); // send back +1
System.out.printf("Server: awaiting client%n");
phaser.arriveAndAwaitAdvance(); // await the client...
// do NOT close but 'forget' the socket reference // do NOT close but 'forget' the socket reference
System.out.printf("Server: forgetting socket...%n");
ss = null; ss = null;
} catch (Exception ioe) { } catch (Throwable ioe) {
ioe.printStackTrace(); ioe.printStackTrace();
} }
} }
@ -112,8 +122,11 @@ public class UnreferencedMulticastSockets {
public static void main(String args[]) throws Exception { public static void main(String args[]) throws Exception {
IPSupport.throwSkippedExceptionIfNonOperational(); IPSupport.throwSkippedExceptionIfNonOperational();
InetSocketAddress clientAddress =
new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
// Create and close a MulticastSocket to warm up the FD count for side effects. // Create and close a MulticastSocket to warm up the FD count for side effects.
try (MulticastSocket s = new MulticastSocket(0)) { try (MulticastSocket s = new MulticastSocket(clientAddress)) {
// no-op; close immediately // no-op; close immediately
s.getLocalPort(); // no-op s.getLocalPort(); // no-op
} }
@ -126,8 +139,33 @@ public class UnreferencedMulticastSockets {
Thread thr = new Thread(svr); Thread thr = new Thread(svr);
thr.start(); thr.start();
MulticastSocket client = new MulticastSocket(0); // It is possible under some circumstances that the client
System.out.printf(" client bound port: %d%n", client.getLocalPort()); // might get bound to the same port than the server: this
// would make the test fail - so if this happen we try to
// bind to a specific port by incrementing the server port.
MulticastSocket client = null;
int serverPort = svr.getPort();
int maxtries = 20;
for (int i = 0; i < maxtries; i++) {
try {
System.out.printf("Trying to bind client to: %s%n", clientAddress);
client = new MulticastSocket(clientAddress);
if (client.getLocalPort() != svr.getPort()) break;
client.close();
} catch (IOException x) {
System.out.printf("Couldn't create client after %d attempts: %s%n", i, x);
if (i == maxtries) throw x;
}
if (i == maxtries) {
String msg = String.format("Couldn't create client after %d attempts", i);
System.out.println(msg);
throw new AssertionError(msg);
}
clientAddress = new InetSocketAddress(clientAddress.getAddress(), serverPort + i);
}
System.out.printf(" client bound port: %s:%d%n",
client.getLocalAddress(), client.getLocalPort());
client.connect(svr.getHost(), svr.getPort()); client.connect(svr.getHost(), svr.getPort());
pendingSockets.add(new NamedWeak(client, pendingQueue, "clientMulticastSocket")); pendingSockets.add(new NamedWeak(client, pendingQueue, "clientMulticastSocket"));
extractRefs(client, "clientMulticastSocket"); extractRefs(client, "clientMulticastSocket");
@ -136,14 +174,17 @@ public class UnreferencedMulticastSockets {
msg[0] = 1; msg[0] = 1;
DatagramPacket p = new DatagramPacket(msg, msg.length, svr.getHost(), svr.getPort()); DatagramPacket p = new DatagramPacket(msg, msg.length, svr.getHost(), svr.getPort());
client.send(p); client.send(p);
System.out.printf(" ping sent to: %s:%d%n", svr.getHost(), svr.getPort());
svr.phaser.arriveAndAwaitAdvance(); // wait until the server has received its packet
p = new DatagramPacket(msg, msg.length); p = new DatagramPacket(msg, msg.length);
client.receive(p); client.receive(p);
System.out.printf("echo received from: %s%n", p.getSocketAddress()); System.out.printf(" echo received from: %s%n", p.getSocketAddress());
if (msg[0] != 2) { if (msg[0] != 2) {
throw new AssertionError("incorrect data received: expected: 2, actual: " + msg[0]); throw new AssertionError("incorrect data received: expected: 2, actual: " + msg[0]);
} }
svr.phaser.arriveAndAwaitAdvance(); // let the server null out its socket
// Do NOT close the MulticastSocket; forget it // Do NOT close the MulticastSocket; forget it

View File

@ -68,7 +68,7 @@ public class SocketImplCombinations {
* Test creating a connected Socket, it should be created with a platform SocketImpl. * Test creating a connected Socket, it should be created with a platform SocketImpl.
*/ */
public void testNewSocket2() throws IOException { public void testNewSocket2() throws IOException {
try (ServerSocket ss = new ServerSocket(0)) { try (ServerSocket ss = boundServerSocket()) {
try (Socket s = new Socket(ss.getInetAddress(), ss.getLocalPort())) { try (Socket s = new Socket(ss.getInetAddress(), ss.getLocalPort())) {
SocketImpl si = getSocketImpl(s); SocketImpl si = getSocketImpl(s);
assertTrue(isSocksSocketImpl(si)); assertTrue(isSocksSocketImpl(si));
@ -127,7 +127,7 @@ public class SocketImplCombinations {
Socket s = new Socket((SocketImpl) null) { }; Socket s = new Socket((SocketImpl) null) { };
try (s) { try (s) {
assertTrue(getSocketImpl(s) == null); assertTrue(getSocketImpl(s) == null);
s.bind(new InetSocketAddress(0)); // force SocketImpl to be created s.bind(loopbackSocketAddress()); // force SocketImpl to be created
SocketImpl si = getSocketImpl(s); SocketImpl si = getSocketImpl(s);
assertTrue(isSocksSocketImpl(si)); assertTrue(isSocksSocketImpl(si));
SocketImpl delegate = getDelegate(si); SocketImpl delegate = getDelegate(si);
@ -218,7 +218,7 @@ public class SocketImplCombinations {
Socket s = new Socket((SocketImpl) null) { }; Socket s = new Socket((SocketImpl) null) { };
try (s) { try (s) {
assertTrue(getSocketImpl(s) == null); assertTrue(getSocketImpl(s) == null);
s.bind(new InetSocketAddress(0)); // force SocketImpl to be created s.bind(loopbackSocketAddress()); // force SocketImpl to be created
assertTrue(getSocketImpl(s) instanceof CustomSocketImpl); assertTrue(getSocketImpl(s) instanceof CustomSocketImpl);
} }
} finally { } finally {
@ -378,7 +378,7 @@ public class SocketImplCombinations {
public void testServerSocketAccept5a() throws IOException { public void testServerSocketAccept5a() throws IOException {
SocketImpl serverImpl = new CustomSocketImpl(true); SocketImpl serverImpl = new CustomSocketImpl(true);
try (ServerSocket ss = new ServerSocket(serverImpl) { }) { try (ServerSocket ss = new ServerSocket(serverImpl) { }) {
ss.bind(new InetSocketAddress(0)); ss.bind(loopbackSocketAddress());
expectThrows(IOException.class, ss::accept); expectThrows(IOException.class, ss::accept);
} }
} }
@ -565,17 +565,37 @@ public class SocketImplCombinations {
} }
} }
/**
* Returns a new InetSocketAddress with the loopback interface
* and port 0.
*/
static InetSocketAddress loopbackSocketAddress() {
InetAddress loopback = InetAddress.getLoopbackAddress();
return new InetSocketAddress(loopback, 0);
}
/**
* Returns a ServerSocket bound to a port on the loopback address
*/
static ServerSocket boundServerSocket() throws IOException {
ServerSocket ss = new ServerSocket();
ss.bind(loopbackSocketAddress());
return ss;
}
/** /**
* Creates a ServerSocket that returns the given Socket from accept. * Creates a ServerSocket that returns the given Socket from accept.
*/ */
static ServerSocket serverSocketToAccept(Socket s) throws IOException { static ServerSocket serverSocketToAccept(Socket s) throws IOException {
return new ServerSocket(0) { ServerSocket ss = new ServerSocket() {
@Override @Override
public Socket accept() throws IOException { public Socket accept() throws IOException {
implAccept(s); implAccept(s);
return s; return s;
} }
}; };
ss.bind(loopbackSocketAddress());
return ss;
} }
/** /**
@ -590,7 +610,7 @@ public class SocketImplCombinations {
return s; return s;
} }
}; };
ss.bind(new InetSocketAddress(0)); ss.bind(loopbackSocketAddress());
return ss; return ss;
} }

View File

@ -26,6 +26,8 @@ import com.sun.net.httpserver.HttpServer;
import com.sun.net.httpserver.HttpsConfigurator; import com.sun.net.httpserver.HttpsConfigurator;
import com.sun.net.httpserver.HttpsParameters; import com.sun.net.httpserver.HttpsParameters;
import com.sun.net.httpserver.HttpsServer; import com.sun.net.httpserver.HttpsServer;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
@ -1568,8 +1570,8 @@ public abstract class DigestEchoServer implements HttpServerAdapters {
@Override @Override
public void run() { public void run() {
try { try {
int c = 0;
try { try {
int c;
while ((c = is.read()) != -1) { while ((c = is.read()) != -1) {
os.write(c); os.write(c);
os.flush(); os.flush();
@ -1578,11 +1580,13 @@ public abstract class DigestEchoServer implements HttpServerAdapters {
if (DEBUG) System.out.print(tag); if (DEBUG) System.out.print(tag);
} }
is.close(); is.close();
} finally {
os.close();
}
} catch (IOException ex) { } catch (IOException ex) {
if (DEBUG) ex.printStackTrace(System.out); if (DEBUG || !stopped && c > -1)
ex.printStackTrace(System.out);
end.completeExceptionally(ex);
} finally {
try {os.close();} catch (Throwable t) {}
}
} finally { } finally {
end.complete(null); end.complete(null);
} }
@ -1632,10 +1636,12 @@ public abstract class DigestEchoServer implements HttpServerAdapters {
@Override @Override
public void run() { public void run() {
Socket clientConnection = null; Socket clientConnection = null;
Socket targetConnection = null;
try { try {
while (!stopped) { while (!stopped) {
System.out.println(now() + "Tunnel: Waiting for client"); System.out.println(now() + "Tunnel: Waiting for client");
Socket toClose; Socket toClose;
targetConnection = clientConnection = null;
try { try {
toClose = clientConnection = ss.accept(); toClose = clientConnection = ss.accept();
if (NO_LINGER) { if (NO_LINGER) {
@ -1649,7 +1655,6 @@ public abstract class DigestEchoServer implements HttpServerAdapters {
} }
System.out.println(now() + "Tunnel: Client accepted"); System.out.println(now() + "Tunnel: Client accepted");
StringBuilder headers = new StringBuilder(); StringBuilder headers = new StringBuilder();
Socket targetConnection = null;
InputStream ccis = clientConnection.getInputStream(); InputStream ccis = clientConnection.getInputStream();
OutputStream ccos = clientConnection.getOutputStream(); OutputStream ccos = clientConnection.getOutputStream();
Writer w = new OutputStreamWriter( Writer w = new OutputStreamWriter(
@ -1769,28 +1774,44 @@ public abstract class DigestEchoServer implements HttpServerAdapters {
end1 = new CompletableFuture<>()); end1 = new CompletableFuture<>());
Thread t2 = pipe(targetConnection.getInputStream(), ccos, '-', Thread t2 = pipe(targetConnection.getInputStream(), ccos, '-',
end2 = new CompletableFuture<>()); end2 = new CompletableFuture<>());
end = CompletableFuture.allOf(end1, end2); var end11 = end1.whenComplete((r, t) -> exceptionally(end2, t));
var end22 = end2.whenComplete((r, t) -> exceptionally(end1, t));
end = CompletableFuture.allOf(end11, end22);
Socket tc = targetConnection;
end.whenComplete( end.whenComplete(
(r,t) -> { (r,t) -> {
try { toClose.close(); } catch (IOException x) { } try { toClose.close(); } catch (IOException x) { }
try { tc.close(); } catch (IOException x) { }
finally {connectionCFs.remove(end);} finally {connectionCFs.remove(end);}
}); });
connectionCFs.add(end); connectionCFs.add(end);
targetConnection = clientConnection = null;
t1.start(); t1.start();
t2.start(); t2.start();
} }
} catch (Throwable ex) { } catch (Throwable ex) {
try { close(clientConnection, ex);
ss.close(); close(targetConnection, ex);
} catch (IOException ex1) { close(ss, ex);
ex.addSuppressed(ex1);
}
ex.printStackTrace(System.err); ex.printStackTrace(System.err);
} finally { } finally {
System.out.println(now() + "Tunnel: exiting (stopped=" + stopped + ")"); System.out.println(now() + "Tunnel: exiting (stopped=" + stopped + ")");
connectionCFs.forEach(cf -> cf.complete(null)); connectionCFs.forEach(cf -> cf.complete(null));
} }
} }
void exceptionally(CompletableFuture<?> cf, Throwable t) {
if (t != null) cf.completeExceptionally(t);
}
void close(Closeable c, Throwable e) {
if (c == null) return;
try {
c.close();
} catch (IOException x) {
e.addSuppressed(x);
}
}
} }
/** /**