8198928: (so) SocketChannel connect may deadlock if closed at around same time that connect fails

Reviewed-by: bpb, mli
This commit is contained in:
Alan Bateman 2018-03-07 07:13:55 +00:00
parent 1deb2a7bd5
commit 889a041f22
6 changed files with 258 additions and 152 deletions
src/java.base
share/classes/sun/nio/ch
unix/native/libnio/ch
windows/native/libnio/ch
test/jdk/java/nio/channels

@ -588,12 +588,16 @@ class SocketChannelImpl
/**
* Marks the beginning of a connect operation that might block.
*
* @param blocking true if configured blocking
* @param isa the remote address
* @throws ClosedChannelException if the channel is closed
* @throws AlreadyConnectedException if already connected
* @throws ConnectionPendingException is a connection is pending
* @throws IOException if the pre-connect hook fails
*/
private void beginConnect(boolean blocking) throws ClosedChannelException {
private void beginConnect(boolean blocking, InetSocketAddress isa)
throws IOException
{
if (blocking) {
// set hook for Thread.interrupt
begin();
@ -604,6 +608,13 @@ class SocketChannelImpl
throw new AlreadyConnectedException();
if (state == ST_CONNECTIONPENDING)
throw new ConnectionPendingException();
assert state == ST_UNCONNECTED;
state = ST_CONNECTIONPENDING;
if (localAddress == null)
NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
remoteAddress = isa;
if (blocking)
readerThread = NativeThread.current();
}
@ -614,11 +625,21 @@ class SocketChannelImpl
*
* @throws AsynchronousCloseException if the channel was closed due to this
* thread being interrupted on a blocking connect operation.
* @throws IOException if completed and unable to obtain the local address
*/
private void endConnect(boolean blocking, boolean completed)
throws AsynchronousCloseException
throws IOException
{
endRead(blocking, completed);
if (completed) {
synchronized (stateLock) {
if (state == ST_CONNECTIONPENDING) {
localAddress = Net.localAddress(fd);
state = ST_CONNECTED;
}
}
}
}
@Override
@ -628,64 +649,37 @@ class SocketChannelImpl
if (sm != null)
sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
readLock.lock();
InetAddress ia = isa.getAddress();
if (ia.isAnyLocalAddress())
ia = InetAddress.getLocalHost();
try {
writeLock.lock();
readLock.lock();
try {
// notify before-connect hook
synchronized (stateLock) {
if (state == ST_UNCONNECTED && localAddress == null) {
NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
}
}
InetAddress ia = isa.getAddress();
if (ia.isAnyLocalAddress())
ia = InetAddress.getLocalHost();
int n = 0;
boolean blocking = isBlocking();
writeLock.lock();
try {
int n = 0;
boolean blocking = isBlocking();
try {
beginConnect(blocking);
if (blocking) {
do {
n = Net.connect(fd, ia, isa.getPort());
} while (n == IOStatus.INTERRUPTED && isOpen());
} else {
beginConnect(blocking, isa);
do {
n = Net.connect(fd, ia, isa.getPort());
}
} while (n == IOStatus.INTERRUPTED && isOpen());
} finally {
endConnect(blocking, n > 0);
}
} catch (IOException x) {
// connect failed, close socket
close();
throw x;
}
// connection may be established
synchronized (stateLock) {
if (!isOpen())
throw new AsynchronousCloseException();
remoteAddress = isa;
if (n > 0) {
// connected established
localAddress = Net.localAddress(fd);
state = ST_CONNECTED;
return true;
} else {
// connection pending
assert !blocking;
state = ST_CONNECTIONPENDING;
return false;
endConnect(blocking, (n > 0));
}
assert IOStatus.check(n);
return n > 0;
} finally {
writeLock.unlock();
}
} finally {
writeLock.unlock();
readLock.unlock();
}
} finally {
readLock.unlock();
} catch (IOException ioe) {
// connect failed, close the channel
close();
throw ioe;
}
}
@ -714,65 +708,62 @@ class SocketChannelImpl
*
* @throws AsynchronousCloseException if the channel was closed due to this
* thread being interrupted on a blocking connect operation.
* @throws IOException if completed and unable to obtain the local address
*/
private void endFinishConnect(boolean blocking, boolean completed)
throws AsynchronousCloseException
throws IOException
{
endRead(blocking, completed);
if (completed) {
synchronized (stateLock) {
if (state == ST_CONNECTIONPENDING) {
localAddress = Net.localAddress(fd);
state = ST_CONNECTED;
}
}
}
}
@Override
public boolean finishConnect() throws IOException {
readLock.lock();
try {
writeLock.lock();
readLock.lock();
try {
// already connected?
synchronized (stateLock) {
if (state == ST_CONNECTED)
return true;
}
int n = 0;
boolean blocking = isBlocking();
writeLock.lock();
try {
// no-op if already connected
if (isConnected())
return true;
boolean blocking = isBlocking();
boolean connected = false;
try {
beginFinishConnect(blocking);
int n = 0;
if (blocking) {
do {
n = checkConnect(fd, true);
} while (n == 0 || (n == IOStatus.INTERRUPTED) && isOpen());
} while ((n == 0 || n == IOStatus.INTERRUPTED) && isOpen());
} else {
n = checkConnect(fd, false);
}
connected = (n > 0);
} finally {
endFinishConnect(blocking, n > 0);
}
} catch (IOException x) {
close();
throw x;
}
// post finishConnect, connection may be established
synchronized (stateLock) {
if (!isOpen())
throw new AsynchronousCloseException();
if (n > 0) {
// connection established
localAddress = Net.localAddress(fd);
state = ST_CONNECTED;
return true;
} else {
// connection still pending
assert !blocking;
return false;
endFinishConnect(blocking, connected);
}
assert (blocking && connected) ^ !blocking;
return connected;
} finally {
writeLock.unlock();
}
} finally {
writeLock.unlock();
readLock.unlock();
}
} finally {
readLock.unlock();
} catch (IOException ioe) {
// connect failed, close the channel
close();
throw ioe;
}
}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2001, 2017, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2001, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -293,8 +293,7 @@ Java_sun_nio_ch_Net_connect0(JNIEnv *env, jclass clazz, jboolean preferIPv6,
int sa_len = 0;
int rv;
if (NET_InetAddressToSockaddr(env, iao, port, &sa, &sa_len,
preferIPv6) != 0) {
if (NET_InetAddressToSockaddr(env, iao, port, &sa, &sa_len, preferIPv6) != 0) {
return IOS_THROWN;
}
@ -761,11 +760,11 @@ handleSocketError(JNIEnv *env, jint errorValue)
break;
#endif
case ECONNREFUSED:
xn = JNU_JAVANETPKG "ConnectException";
break;
case ETIMEDOUT:
case ENOTCONN:
xn = JNU_JAVANETPKG "ConnectException";
break;
case EHOSTUNREACH:
xn = JNU_JAVANETPKG "NoRouteToHostException";
break;

@ -59,23 +59,29 @@ Java_sun_nio_ch_SocketChannelImpl_checkConnect(JNIEnv *env, jobject this,
poller.events = POLLOUT;
poller.revents = 0;
result = poll(&poller, 1, block ? -1 : 0);
if (result < 0) {
JNU_ThrowIOExceptionWithLastError(env, "Poll failed");
return IOS_THROWN;
if (errno == EINTR) {
return IOS_INTERRUPTED;
} else {
JNU_ThrowIOExceptionWithLastError(env, "poll failed");
return IOS_THROWN;
}
}
if (!block && (result == 0))
return IOS_UNAVAILABLE;
return IOS_UNAVAILABLE;
if (poller.revents) {
if (result > 0) {
errno = 0;
result = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &n);
if (result < 0) {
handleSocketError(env, errno);
return JNI_FALSE;
return handleSocketError(env, errno);
} else if (error) {
handleSocketError(env, error);
return JNI_FALSE;
return handleSocketError(env, error);
} else if ((poller.revents & POLLHUP) != 0) {
return handleSocketError(env, ENOTCONN);
}
// connected
return 1;
}
return 0;

@ -58,9 +58,7 @@ Java_sun_nio_ch_SocketChannelImpl_checkConnect(JNIEnv *env, jobject this,
jobject fdo, jboolean block)
{
int optError = 0;
int lastError = 0;
int result = 0;
int retry = 0;
int result;
int n = sizeof(int);
jint fd = fdval(env, fdo);
fd_set wr, ex;
@ -73,64 +71,33 @@ Java_sun_nio_ch_SocketChannelImpl_checkConnect(JNIEnv *env, jobject this,
result = select(fd+1, 0, &wr, &ex, block ? NULL : &t);
/* save last winsock error */
if (result == SOCKET_ERROR) {
lastError = WSAGetLastError();
}
if (block) { /* must configure socket back to blocking state */
u_long argp = 0;
int r = ioctlsocket(fd, FIONBIO, &argp);
if (r == SOCKET_ERROR) {
handleSocketError(env, WSAGetLastError());
}
}
if (result == 0) { /* timeout */
return block ? 0 : IOS_UNAVAILABLE;
} else {
if (result == SOCKET_ERROR) { /* select failed */
handleSocketError(env, lastError);
if (result == SOCKET_ERROR) { /* select failed */
handleSocketError(env, WSAGetLastError());
return IOS_THROWN;
}
}
/*
* Socket is writable or error occurred. On some Windows editions
* the socket will appear writable when the connect fails so we
* check for error rather than writable.
*/
if (!FD_ISSET(fd, &ex)) {
return 1; /* connection established */
// connection established if writable and no error to check
if (FD_ISSET(fd, &wr) && !FD_ISSET(fd, &ex)) {
return 1;
}
/*
* A getsockopt( SO_ERROR ) may indicate success on NT4 even
* though the connection has failed. The workaround is to allow
* winsock to be scheduled and this is done via by yielding.
* As the yield approach is problematic in heavy load situations
* we attempt up to 3 times to get the failure reason.
*/
for (retry=0; retry<3; retry++) {
result = getsockopt((SOCKET)fd,
SOL_SOCKET,
SO_ERROR,
(char *)&optError,
&n);
if (result == SOCKET_ERROR) {
int lastError = WSAGetLastError();
if (lastError == WSAEINPROGRESS) {
return IOS_UNAVAILABLE;
}
NET_ThrowNew(env, lastError, "getsockopt");
return IOS_THROWN;
result = getsockopt((SOCKET)fd,
SOL_SOCKET,
SO_ERROR,
(char *)&optError,
&n);
if (result == SOCKET_ERROR) {
int lastError = WSAGetLastError();
if (lastError == WSAEINPROGRESS) {
return IOS_UNAVAILABLE;
}
if (optError) {
break;
}
Sleep(0);
NET_ThrowNew(env, lastError, "getsockopt");
return IOS_THROWN;
}
if (optError != NO_ERROR) {
handleSocketError(env, optError);
return IOS_THROWN;

@ -1,5 +1,5 @@
/*
* Copyright (c) 2002, 2016, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2002, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -708,9 +708,9 @@ public class AsyncCloseAndInterrupt {
test(connectedSocketChannelFactory);
if (TestUtil.onWindows()) {
if (TestUtil.onWindows() || TestUtil.onSolaris()) {
log.println("WARNING Cannot reliably test connect/finishConnect"
+ " operations on Windows");
+ " operations on this platform");
} else {
// Only the following tests need refuser's connection backlog
// to be saturated

@ -0,0 +1,143 @@
/*
* Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/* @test
* @bug 8198928
* @run main CloseDuringConnect
* @summary Attempt to cause a deadlock by closing a SocketChannel in one thread
* where another thread is closing the channel after a connect fail
*/
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.IntStream;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
public class CloseDuringConnect {
// number of test iterations, needs to be 5-10 at least
static final int ITERATIONS = 50;
// maximum delay before closing SocketChannel, in milliseconds
static final int MAX_DELAY_BEFORE_CLOSE = 20;
/**
* Returns the socket address of an endpoint that refuses connections. The
* endpoint is an InetSocketAddress where the address is the loopback address
* and the port is a system port (1-1023 range).
*/
static SocketAddress refusingEndpoint() {
InetAddress lb = InetAddress.getLoopbackAddress();
int port = 1;
while (port < 1024) {
SocketAddress sa = new InetSocketAddress(lb, port);
try {
SocketChannel.open(sa).close();
} catch (IOException ioe) {
return sa;
}
port++;
}
throw new RuntimeException("Unable to find system port that is refusing connections");
}
/**
* Invoked by a task in the thread pool to connect to a remote address.
* The connection should never be established.
*/
static Void connect(SocketChannel sc, SocketAddress remote) {
try {
if (!sc.connect(remote)) {
while (!sc.finishConnect()) {
Thread.yield();
}
}
throw new RuntimeException("Connected, should not happen");
} catch (IOException expected) { }
if (sc.isConnected())
throw new RuntimeException("isConnected return true, should not happen");
return null;
}
/**
* Invoked by a task in the thread pool to close a socket channel.
*/
static Void close(SocketChannel sc) {
try {
sc.close();
} catch (IOException e) {
throw new UncheckedIOException("close failed", e);
}
return null;
}
/**
* Test for deadlock by submitting a task to connect to the given address
* while another task closes the socket channel.
* @param pool the thread pool to submit or schedule tasks
* @param remote the remote address, does not accept connections
* @param blocking socket channel blocking mode
* @param delay the delay, in millis, before closing the channel
*/
static void test(ScheduledExecutorService pool,
SocketAddress remote,
boolean blocking,
long delay) {
try {
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(blocking);
Future<Void> r1 = pool.submit(() -> connect(sc, remote));
Future<Void> r2 = pool.schedule(() -> close(sc), delay, MILLISECONDS);
r1.get();
r2.get();
} catch (Throwable t) {
throw new RuntimeException("Test failed", t);
}
}
public static void main(String[] args) throws Exception {
SocketAddress refusing = refusingEndpoint();
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
try {
IntStream.range(0, ITERATIONS).forEach(i -> {
System.out.format("Iteration %d ...%n", (i + 1));
// Execute the test for varying delays up to MAX_DELAY_BEFORE_CLOSE,
// for socket channels configured both blocking and non-blocking
IntStream.range(0, MAX_DELAY_BEFORE_CLOSE).forEach(delay -> {
test(pool, refusing, /*blocking mode*/true, delay);
test(pool, refusing, /*blocking mode*/false, delay);
});
});
} finally {
pool.shutdown();
}
}
}