From d1099033ac63b9dd0dd6e3a7341db929e9e0e56e Mon Sep 17 00:00:00 2001 From: Alan Bateman Date: Thu, 8 Feb 2024 07:55:08 +0000 Subject: [PATCH] 8325028: (ch) Pipe channels should lazily set socket to non-blocking mode on first use by virtual thread Reviewed-by: bpb --- .../classes/sun/nio/ch/SinkChannelImpl.java | 47 ++++++++++++++++--- .../classes/sun/nio/ch/SourceChannelImpl.java | 47 ++++++++++++++++--- 2 files changed, 80 insertions(+), 14 deletions(-) diff --git a/src/java.base/unix/classes/sun/nio/ch/SinkChannelImpl.java b/src/java.base/unix/classes/sun/nio/ch/SinkChannelImpl.java index b49b699fdce..d72a4892ef0 100644 --- a/src/java.base/unix/classes/sun/nio/ch/SinkChannelImpl.java +++ b/src/java.base/unix/classes/sun/nio/ch/SinkChannelImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2000, 2022, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2000, 2024, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -66,6 +66,13 @@ class SinkChannelImpl // ID of native thread doing write, for signalling private long thread; + // True if the channel's socket has been forced into non-blocking mode + // by a virtual thread. It cannot be reset. When the channel is in + // blocking mode and the channel's socket is in non-blocking mode then + // operations that don't complete immediately will poll the socket and + // preserve the semantics of blocking operations. + private volatile boolean forcedNonBlocking; + // -- End of fields protected by stateLock @@ -79,11 +86,34 @@ class SinkChannelImpl SinkChannelImpl(SelectorProvider sp, FileDescriptor fd) throws IOException { super(sp); - IOUtil.configureBlocking(fd, false); this.fd = fd; this.fdVal = IOUtil.fdVal(fd); } + /** + * Checks that the channel is open. + * + * @throws ClosedChannelException if channel is closed (or closing) + */ + private void ensureOpen() throws ClosedChannelException { + if (!isOpen()) + throw new ClosedChannelException(); + } + + /** + * Ensures that the socket is configured non-blocking when on a virtual thread. + */ + private void configureSocketNonBlockingIfVirtualThread() throws IOException { + assert writeLock.isHeldByCurrentThread(); + if (!forcedNonBlocking && Thread.currentThread().isVirtual()) { + synchronized (stateLock) { + ensureOpen(); + IOUtil.configureBlocking(fd, false); + forcedNonBlocking = true; + } + } + } + /** * Closes the write end of the pipe if there are no write operation in * progress and the channel is not registered with a Selector. @@ -183,9 +213,11 @@ class SinkChannelImpl writeLock.lock(); try { synchronized (stateLock) { - if (!isOpen()) - throw new ClosedChannelException(); - IOUtil.configureBlocking(fd, block); + ensureOpen(); + // do nothing if virtual thread has forced the socket to be non-blocking + if (!forcedNonBlocking) { + IOUtil.configureBlocking(fd, block); + } } } finally { writeLock.unlock(); @@ -241,8 +273,7 @@ class SinkChannelImpl begin(); } synchronized (stateLock) { - if (!isOpen()) - throw new ClosedChannelException(); + ensureOpen(); if (blocking) thread = NativeThread.current(); } @@ -279,6 +310,7 @@ class SinkChannelImpl int n = 0; try { beginWrite(blocking); + configureSocketNonBlockingIfVirtualThread(); n = IOUtil.write(fd, src, -1, nd); if (blocking) { while (IOStatus.okayToRetry(n) && isOpen()) { @@ -306,6 +338,7 @@ class SinkChannelImpl long n = 0; try { beginWrite(blocking); + configureSocketNonBlockingIfVirtualThread(); n = IOUtil.write(fd, srcs, offset, length, nd); if (blocking) { while (IOStatus.okayToRetry(n) && isOpen()) { diff --git a/src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java b/src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java index 7c2791609a7..7be17040b4c 100644 --- a/src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java +++ b/src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2000, 2022, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2000, 2024, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -66,6 +66,13 @@ class SourceChannelImpl // ID of native thread doing read, for signalling private long thread; + // True if the channel's socket has been forced into non-blocking mode + // by a virtual thread. It cannot be reset. When the channel is in + // blocking mode and the channel's socket is in non-blocking mode then + // operations that don't complete immediately will poll the socket and + // preserve the semantics of blocking operations. + private volatile boolean forcedNonBlocking; + // -- End of fields protected by stateLock @@ -79,11 +86,34 @@ class SourceChannelImpl SourceChannelImpl(SelectorProvider sp, FileDescriptor fd) throws IOException { super(sp); - IOUtil.configureBlocking(fd, false); this.fd = fd; this.fdVal = IOUtil.fdVal(fd); } + /** + * Checks that the channel is open. + * + * @throws ClosedChannelException if channel is closed (or closing) + */ + private void ensureOpen() throws ClosedChannelException { + if (!isOpen()) + throw new ClosedChannelException(); + } + + /** + * Ensures that the socket is configured non-blocking when on a virtual thread. + */ + private void configureSocketNonBlockingIfVirtualThread() throws IOException { + assert readLock.isHeldByCurrentThread(); + if (!forcedNonBlocking && Thread.currentThread().isVirtual()) { + synchronized (stateLock) { + ensureOpen(); + IOUtil.configureBlocking(fd, false); + forcedNonBlocking = true; + } + } + } + /** * Closes the read end of the pipe if there are no read operation in * progress and the channel is not registered with a Selector. @@ -183,9 +213,11 @@ class SourceChannelImpl readLock.lock(); try { synchronized (stateLock) { - if (!isOpen()) - throw new ClosedChannelException(); - IOUtil.configureBlocking(fd, block); + ensureOpen(); + // do nothing if virtual thread has forced the socket to be non-blocking + if (!forcedNonBlocking) { + IOUtil.configureBlocking(fd, block); + } } } finally { readLock.unlock(); @@ -241,8 +273,7 @@ class SourceChannelImpl begin(); } synchronized (stateLock) { - if (!isOpen()) - throw new ClosedChannelException(); + ensureOpen(); if (blocking) thread = NativeThread.current(); } @@ -279,6 +310,7 @@ class SourceChannelImpl int n = 0; try { beginRead(blocking); + configureSocketNonBlockingIfVirtualThread(); n = IOUtil.read(fd, dst, -1, nd); if (blocking) { while (IOStatus.okayToRetry(n) && isOpen()) { @@ -306,6 +338,7 @@ class SourceChannelImpl long n = 0; try { beginRead(blocking); + configureSocketNonBlockingIfVirtualThread(); n = IOUtil.read(fd, dsts, offset, length, nd); if (blocking) { while (IOStatus.okayToRetry(n) && isOpen()) {