8325028: (ch) Pipe channels should lazily set socket to non-blocking mode on first use by virtual thread

Reviewed-by: bpb
This commit is contained in:
Alan Bateman 2024-02-08 07:55:08 +00:00
parent 1fb9e3d674
commit d1099033ac
2 changed files with 80 additions and 14 deletions

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2000, 2022, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2000, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -66,6 +66,13 @@ class SinkChannelImpl
// ID of native thread doing write, for signalling // ID of native thread doing write, for signalling
private long thread; private long thread;
// True if the channel's socket has been forced into non-blocking mode
// by a virtual thread. It cannot be reset. When the channel is in
// blocking mode and the channel's socket is in non-blocking mode then
// operations that don't complete immediately will poll the socket and
// preserve the semantics of blocking operations.
private volatile boolean forcedNonBlocking;
// -- End of fields protected by stateLock // -- End of fields protected by stateLock
@ -79,11 +86,34 @@ class SinkChannelImpl
SinkChannelImpl(SelectorProvider sp, FileDescriptor fd) throws IOException { SinkChannelImpl(SelectorProvider sp, FileDescriptor fd) throws IOException {
super(sp); super(sp);
IOUtil.configureBlocking(fd, false);
this.fd = fd; this.fd = fd;
this.fdVal = IOUtil.fdVal(fd); this.fdVal = IOUtil.fdVal(fd);
} }
/**
* Checks that the channel is open.
*
* @throws ClosedChannelException if channel is closed (or closing)
*/
private void ensureOpen() throws ClosedChannelException {
if (!isOpen())
throw new ClosedChannelException();
}
/**
* Ensures that the socket is configured non-blocking when on a virtual thread.
*/
private void configureSocketNonBlockingIfVirtualThread() throws IOException {
assert writeLock.isHeldByCurrentThread();
if (!forcedNonBlocking && Thread.currentThread().isVirtual()) {
synchronized (stateLock) {
ensureOpen();
IOUtil.configureBlocking(fd, false);
forcedNonBlocking = true;
}
}
}
/** /**
* Closes the write end of the pipe if there are no write operation in * Closes the write end of the pipe if there are no write operation in
* progress and the channel is not registered with a Selector. * progress and the channel is not registered with a Selector.
@ -183,9 +213,11 @@ class SinkChannelImpl
writeLock.lock(); writeLock.lock();
try { try {
synchronized (stateLock) { synchronized (stateLock) {
if (!isOpen()) ensureOpen();
throw new ClosedChannelException(); // do nothing if virtual thread has forced the socket to be non-blocking
IOUtil.configureBlocking(fd, block); if (!forcedNonBlocking) {
IOUtil.configureBlocking(fd, block);
}
} }
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
@ -241,8 +273,7 @@ class SinkChannelImpl
begin(); begin();
} }
synchronized (stateLock) { synchronized (stateLock) {
if (!isOpen()) ensureOpen();
throw new ClosedChannelException();
if (blocking) if (blocking)
thread = NativeThread.current(); thread = NativeThread.current();
} }
@ -279,6 +310,7 @@ class SinkChannelImpl
int n = 0; int n = 0;
try { try {
beginWrite(blocking); beginWrite(blocking);
configureSocketNonBlockingIfVirtualThread();
n = IOUtil.write(fd, src, -1, nd); n = IOUtil.write(fd, src, -1, nd);
if (blocking) { if (blocking) {
while (IOStatus.okayToRetry(n) && isOpen()) { while (IOStatus.okayToRetry(n) && isOpen()) {
@ -306,6 +338,7 @@ class SinkChannelImpl
long n = 0; long n = 0;
try { try {
beginWrite(blocking); beginWrite(blocking);
configureSocketNonBlockingIfVirtualThread();
n = IOUtil.write(fd, srcs, offset, length, nd); n = IOUtil.write(fd, srcs, offset, length, nd);
if (blocking) { if (blocking) {
while (IOStatus.okayToRetry(n) && isOpen()) { while (IOStatus.okayToRetry(n) && isOpen()) {

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2000, 2022, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2000, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -66,6 +66,13 @@ class SourceChannelImpl
// ID of native thread doing read, for signalling // ID of native thread doing read, for signalling
private long thread; private long thread;
// True if the channel's socket has been forced into non-blocking mode
// by a virtual thread. It cannot be reset. When the channel is in
// blocking mode and the channel's socket is in non-blocking mode then
// operations that don't complete immediately will poll the socket and
// preserve the semantics of blocking operations.
private volatile boolean forcedNonBlocking;
// -- End of fields protected by stateLock // -- End of fields protected by stateLock
@ -79,11 +86,34 @@ class SourceChannelImpl
SourceChannelImpl(SelectorProvider sp, FileDescriptor fd) throws IOException { SourceChannelImpl(SelectorProvider sp, FileDescriptor fd) throws IOException {
super(sp); super(sp);
IOUtil.configureBlocking(fd, false);
this.fd = fd; this.fd = fd;
this.fdVal = IOUtil.fdVal(fd); this.fdVal = IOUtil.fdVal(fd);
} }
/**
* Checks that the channel is open.
*
* @throws ClosedChannelException if channel is closed (or closing)
*/
private void ensureOpen() throws ClosedChannelException {
if (!isOpen())
throw new ClosedChannelException();
}
/**
* Ensures that the socket is configured non-blocking when on a virtual thread.
*/
private void configureSocketNonBlockingIfVirtualThread() throws IOException {
assert readLock.isHeldByCurrentThread();
if (!forcedNonBlocking && Thread.currentThread().isVirtual()) {
synchronized (stateLock) {
ensureOpen();
IOUtil.configureBlocking(fd, false);
forcedNonBlocking = true;
}
}
}
/** /**
* Closes the read end of the pipe if there are no read operation in * Closes the read end of the pipe if there are no read operation in
* progress and the channel is not registered with a Selector. * progress and the channel is not registered with a Selector.
@ -183,9 +213,11 @@ class SourceChannelImpl
readLock.lock(); readLock.lock();
try { try {
synchronized (stateLock) { synchronized (stateLock) {
if (!isOpen()) ensureOpen();
throw new ClosedChannelException(); // do nothing if virtual thread has forced the socket to be non-blocking
IOUtil.configureBlocking(fd, block); if (!forcedNonBlocking) {
IOUtil.configureBlocking(fd, block);
}
} }
} finally { } finally {
readLock.unlock(); readLock.unlock();
@ -241,8 +273,7 @@ class SourceChannelImpl
begin(); begin();
} }
synchronized (stateLock) { synchronized (stateLock) {
if (!isOpen()) ensureOpen();
throw new ClosedChannelException();
if (blocking) if (blocking)
thread = NativeThread.current(); thread = NativeThread.current();
} }
@ -279,6 +310,7 @@ class SourceChannelImpl
int n = 0; int n = 0;
try { try {
beginRead(blocking); beginRead(blocking);
configureSocketNonBlockingIfVirtualThread();
n = IOUtil.read(fd, dst, -1, nd); n = IOUtil.read(fd, dst, -1, nd);
if (blocking) { if (blocking) {
while (IOStatus.okayToRetry(n) && isOpen()) { while (IOStatus.okayToRetry(n) && isOpen()) {
@ -306,6 +338,7 @@ class SourceChannelImpl
long n = 0; long n = 0;
try { try {
beginRead(blocking); beginRead(blocking);
configureSocketNonBlockingIfVirtualThread();
n = IOUtil.read(fd, dsts, offset, length, nd); n = IOUtil.read(fd, dsts, offset, length, nd);
if (blocking) { if (blocking) {
while (IOStatus.okayToRetry(n) && isOpen()) { while (IOStatus.okayToRetry(n) && isOpen()) {