diff --git a/src/java.base/aix/classes/sun/nio/ch/DefaultPollerProvider.java b/src/java.base/aix/classes/sun/nio/ch/DefaultPollerProvider.java index df449cd6fd2..b645b735533 100644 --- a/src/java.base/aix/classes/sun/nio/ch/DefaultPollerProvider.java +++ b/src/java.base/aix/classes/sun/nio/ch/DefaultPollerProvider.java @@ -33,12 +33,12 @@ class DefaultPollerProvider extends PollerProvider { DefaultPollerProvider() { } @Override - Poller readPoller() throws IOException { + Poller readPoller(boolean subPoller) throws IOException { return new PollsetPoller(true); } @Override - Poller writePoller() throws IOException { + Poller writePoller(boolean subPoller) throws IOException { return new PollsetPoller(false); } } diff --git a/src/java.base/aix/classes/sun/nio/ch/PollsetPoller.java b/src/java.base/aix/classes/sun/nio/ch/PollsetPoller.java index 1aecbf7d57a..413568861e5 100644 --- a/src/java.base/aix/classes/sun/nio/ch/PollsetPoller.java +++ b/src/java.base/aix/classes/sun/nio/ch/PollsetPoller.java @@ -42,11 +42,12 @@ class PollsetPoller extends Poller { MAX_EVENTS_TO_POLL = 512; } + private final int event; private final int setid; private final long pollBuffer; PollsetPoller(boolean read) throws IOException { - super(read); + this.event = (read) ? Net.POLLIN : Net.POLLOUT; this.setid = Pollset.pollsetCreate(); this.pollBuffer = Pollset.allocatePollArray(MAX_EVENTS_TO_POLL); } @@ -58,8 +59,7 @@ class PollsetPoller extends Poller { @Override void implRegister(int fd) throws IOException { - int ret = Pollset.pollsetCtl(setid, Pollset.PS_MOD, fd, - Pollset.PS_POLLPRI | (this.reading() ? Net.POLLIN : Net.POLLOUT)); + int ret = Pollset.pollsetCtl(setid, Pollset.PS_MOD, fd, Pollset.PS_POLLPRI | event); if (ret != 0) { throw new IOException("Unable to register fd " + fd); } diff --git a/src/java.base/linux/classes/sun/nio/ch/DefaultPollerProvider.java b/src/java.base/linux/classes/sun/nio/ch/DefaultPollerProvider.java index 3c1050808f3..a9b169a4657 100644 --- a/src/java.base/linux/classes/sun/nio/ch/DefaultPollerProvider.java +++ b/src/java.base/linux/classes/sun/nio/ch/DefaultPollerProvider.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2022, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -25,6 +25,7 @@ package sun.nio.ch; import java.io.IOException; +import jdk.internal.vm.ContinuationSupport; /** * Default PollerProvider for Linux. @@ -33,12 +34,31 @@ class DefaultPollerProvider extends PollerProvider { DefaultPollerProvider() { } @Override - Poller readPoller() throws IOException { - return new EPollPoller(true); + Poller.Mode defaultPollerMode() { + if (ContinuationSupport.isSupported()) { + return Poller.Mode.VTHREAD_POLLERS; + } else { + return Poller.Mode.SYSTEM_THREADS; + } } @Override - Poller writePoller() throws IOException { - return new EPollPoller(false); + int defaultReadPollers(Poller.Mode mode) { + int ncpus = Runtime.getRuntime().availableProcessors(); + if (mode == Poller.Mode.VTHREAD_POLLERS) { + return Math.min(Integer.highestOneBit(ncpus), 32); + } else { + return Math.max(Integer.highestOneBit(ncpus / 4), 1); + } + } + + @Override + Poller readPoller(boolean subPoller) throws IOException { + return new EPollPoller(subPoller, true); + } + + @Override + Poller writePoller(boolean subPoller) throws IOException { + return new EPollPoller(subPoller, false); } } diff --git a/src/java.base/linux/classes/sun/nio/ch/EPollPoller.java b/src/java.base/linux/classes/sun/nio/ch/EPollPoller.java index d3f7f97e3ab..4a8b2cdda08 100644 --- a/src/java.base/linux/classes/sun/nio/ch/EPollPoller.java +++ b/src/java.base/linux/classes/sun/nio/ch/EPollPoller.java @@ -32,18 +32,18 @@ import static sun.nio.ch.EPoll.*; */ class EPollPoller extends Poller { - private static final int MAX_EVENTS_TO_POLL = 512; private static final int ENOENT = 2; private final int epfd; private final int event; + private final int maxEvents; private final long address; - EPollPoller(boolean read) throws IOException { - super(read); + EPollPoller(boolean subPoller, boolean read) throws IOException { this.epfd = EPoll.create(); this.event = (read) ? EPOLLIN : EPOLLOUT; - this.address = EPoll.allocatePollArray(MAX_EVENTS_TO_POLL); + this.maxEvents = (subPoller) ? 64 : 512; + this.address = EPoll.allocatePollArray(maxEvents); } @Override @@ -68,7 +68,7 @@ class EPollPoller extends Poller { @Override int poll(int timeout) throws IOException { - int n = EPoll.wait(epfd, address, MAX_EVENTS_TO_POLL, timeout); + int n = EPoll.wait(epfd, address, maxEvents, timeout); int i = 0; while (i < n) { long eventAddress = EPoll.getEvent(address, i); diff --git a/src/java.base/macosx/classes/sun/nio/ch/DefaultPollerProvider.java b/src/java.base/macosx/classes/sun/nio/ch/DefaultPollerProvider.java index 17e1d3425ba..dc32c2cd90c 100644 --- a/src/java.base/macosx/classes/sun/nio/ch/DefaultPollerProvider.java +++ b/src/java.base/macosx/classes/sun/nio/ch/DefaultPollerProvider.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2022, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -33,12 +33,12 @@ class DefaultPollerProvider extends PollerProvider { DefaultPollerProvider() { } @Override - Poller readPoller() throws IOException { - return new KQueuePoller(true); + Poller readPoller(boolean subPoller) throws IOException { + return new KQueuePoller(subPoller, true); } @Override - Poller writePoller() throws IOException { - return new KQueuePoller(false); + Poller writePoller(boolean subPoller) throws IOException { + return new KQueuePoller(subPoller, false); } } diff --git a/src/java.base/macosx/classes/sun/nio/ch/KQueuePoller.java b/src/java.base/macosx/classes/sun/nio/ch/KQueuePoller.java index dd3738b7d72..645b17e458e 100644 --- a/src/java.base/macosx/classes/sun/nio/ch/KQueuePoller.java +++ b/src/java.base/macosx/classes/sun/nio/ch/KQueuePoller.java @@ -31,17 +31,16 @@ import static sun.nio.ch.KQueue.*; * Poller implementation based on the kqueue facility. */ class KQueuePoller extends Poller { - private static final int MAX_EVENTS_TO_POLL = 512; - private final int kqfd; private final int filter; + private final int maxEvents; private final long address; - KQueuePoller(boolean read) throws IOException { - super(read); + KQueuePoller(boolean subPoller, boolean read) throws IOException { this.kqfd = KQueue.create(); this.filter = (read) ? EVFILT_READ : EVFILT_WRITE; - this.address = KQueue.allocatePollArray(MAX_EVENTS_TO_POLL); + this.maxEvents = (subPoller) ? 64 : 512; + this.address = KQueue.allocatePollArray(maxEvents); } @Override @@ -63,7 +62,7 @@ class KQueuePoller extends Poller { @Override int poll(int timeout) throws IOException { - int n = KQueue.poll(kqfd, address, MAX_EVENTS_TO_POLL, timeout); + int n = KQueue.poll(kqfd, address, maxEvents, timeout); int i = 0; while (i < n) { long keventAddress = KQueue.getEvent(address, i); diff --git a/src/java.base/share/classes/sun/nio/ch/Poller.java b/src/java.base/share/classes/sun/nio/ch/Poller.java index d39b5945b07..f62371d8344 100644 --- a/src/java.base/share/classes/sun/nio/ch/Poller.java +++ b/src/java.base/share/classes/sun/nio/ch/Poller.java @@ -24,52 +24,100 @@ */ package sun.nio.ch; -import java.io.IOError; import java.io.IOException; +import java.util.Arrays; import java.util.Map; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedTransferQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.locks.LockSupport; import java.util.function.BooleanSupplier; -import java.util.stream.Stream; import jdk.internal.misc.InnocuousThread; -import jdk.internal.access.JavaLangAccess; -import jdk.internal.access.SharedSecrets; import sun.security.action.GetPropertyAction; /** * Polls file descriptors. Virtual threads invoke the poll method to park * until a given file descriptor is ready for I/O. */ -public abstract class Poller { - private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess(); - private static final Poller[] READ_POLLERS; - private static final Poller[] WRITE_POLLERS; - private static final int READ_MASK, WRITE_MASK; - private static final boolean USE_DIRECT_REGISTER; - - // true if this is a poller for reading, false for writing - private final boolean read; +abstract class Poller { + private static final Pollers POLLERS; + static { + try { + var pollers = new Pollers(); + pollers.start(); + POLLERS = pollers; + } catch (IOException ioe) { + throw new ExceptionInInitializerError(ioe); + } + } // maps file descriptors to parked Thread private final Map<Integer, Thread> map = new ConcurrentHashMap<>(); - // the queue of updates to the updater Thread - private final BlockingQueue<Request> queue = new LinkedTransferQueue<>(); - /** - * Initialize a Poller for reading or writing. + * Poller mode. */ - protected Poller(boolean read) { - this.read = read; + enum Mode { + /** + * ReadPoller and WritePoller are dedicated platform threads that block waiting + * for events and unpark virtual threads when file descriptors are ready for I/O. + */ + SYSTEM_THREADS, + + /** + * ReadPoller and WritePoller threads are virtual threads that poll for events, + * yielding between polls and unparking virtual threads when file descriptors are + * ready for I/O. If there are no events then the poller threads park until there + * are I/O events to poll. This mode helps to integrate polling with virtual + * thread scheduling. The approach is similar to the default scheme in "User-level + * Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020 + * (https://dl.acm.org/doi/10.1145/3379483). + */ + VTHREAD_POLLERS } /** - * Returns true if this poller is for read (POLLIN) events. + * Initialize a Poller. */ - final boolean reading() { - return read; + protected Poller() { + } + + /** + * Returns the poller's file descriptor, used when the read and write poller threads + * are virtual threads. + * + * @throws UnsupportedOperationException if not supported + */ + int fdVal() { + throw new UnsupportedOperationException(); + } + + /** + * Register the file descriptor. + */ + abstract void implRegister(int fdVal) throws IOException; + + /** + * Deregister the file descriptor. + */ + abstract void implDeregister(int fdVal); + + /** + * Poll for events. The {@link #polled(int)} method is invoked for each + * polled file descriptor. + * + * @param timeout if positive then block for up to {@code timeout} milliseconds, + * if zero then don't block, if -1 then block indefinitely + * @return the number of file descriptors polled + */ + abstract int poll(int timeout) throws IOException; + + /** + * Callback by the poll method when a file descriptor is polled. + */ + final void polled(int fdVal) { + wakeup(fdVal); } /** @@ -79,35 +127,45 @@ public abstract class Poller { * @param nanos the waiting time or 0 to wait indefinitely * @param supplier supplies a boolean to indicate if the enclosing object is open */ - public static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier) + static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier) throws IOException { assert nanos >= 0L; if (event == Net.POLLIN) { - readPoller(fdVal).poll(fdVal, nanos, supplier); + POLLERS.readPoller(fdVal).poll(fdVal, nanos, supplier); } else if (event == Net.POLLOUT) { - writePoller(fdVal).poll(fdVal, nanos, supplier); + POLLERS.writePoller(fdVal).poll(fdVal, nanos, supplier); } else { assert false; } } /** - * Parks the current thread until a file descriptor is ready. + * If there is a thread polling the given file descriptor for the given event then + * the thread is unparked. */ - private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException { - if (USE_DIRECT_REGISTER) { - pollDirect(fdVal, nanos, supplier); + static void stopPoll(int fdVal, int event) { + if (event == Net.POLLIN) { + POLLERS.readPoller(fdVal).wakeup(fdVal); + } else if (event == Net.POLLOUT) { + POLLERS.writePoller(fdVal).wakeup(fdVal); } else { - pollIndirect(fdVal, nanos, supplier); + throw new IllegalArgumentException(); } } /** - * Parks the current thread until a file descriptor is ready. This implementation - * registers the file descriptor, then parks until the file descriptor is polled. + * If there are any threads polling the given file descriptor then they are unparked. */ - private void pollDirect(int fdVal, long nanos, BooleanSupplier supplier) throws IOException { + static void stopPoll(int fdVal) { + stopPoll(fdVal, Net.POLLIN); + stopPoll(fdVal, Net.POLLOUT); + } + + /** + * Parks the current thread until a file descriptor is ready. + */ + private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException { register(fdVal); try { boolean isOpen = supplier.getAsBoolean(); @@ -123,28 +181,6 @@ public abstract class Poller { } } - /** - * Parks the current thread until a file descriptor is ready. This implementation - * queues the file descriptor to the update thread, then parks until the file - * descriptor is polled. - */ - private void pollIndirect(int fdVal, long nanos, BooleanSupplier supplier) { - Request request = registerAsync(fdVal); - try { - boolean isOpen = supplier.getAsBoolean(); - if (isOpen) { - if (nanos > 0) { - LockSupport.parkNanos(nanos); - } else { - LockSupport.park(); - } - } - } finally { - request.awaitFinish(); - deregister(fdVal); - } - } - /** * Registers the file descriptor. */ @@ -154,18 +190,6 @@ public abstract class Poller { implRegister(fdVal); } - /** - * Queues the file descriptor to be registered by the updater thread, returning - * a Request object to track the request. - */ - private Request registerAsync(int fdVal) { - Thread previous = map.putIfAbsent(fdVal, Thread.currentThread()); - assert previous == null; - Request request = new Request(fdVal); - queue.add(request); - return request; - } - /** * Deregister the file descriptor, a no-op if already polled. */ @@ -177,159 +201,6 @@ public abstract class Poller { } } - /** - * A registration request queued to the updater thread. - */ - private static class Request { - private final int fdVal; - private volatile boolean done; - private volatile Thread waiter; - - Request(int fdVal) { - this.fdVal = fdVal; - } - - private int fdVal() { - return fdVal; - } - - /** - * Invoked by the updater when the request has been processed. - */ - void finish() { - done = true; - Thread waiter = this.waiter; - if (waiter != null) { - LockSupport.unpark(waiter); - } - } - - /** - * Waits for a request to be processed. - */ - void awaitFinish() { - if (!done) { - waiter = Thread.currentThread(); - boolean interrupted = false; - while (!done) { - LockSupport.park(); - if (Thread.interrupted()) { - interrupted = true; - } - } - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - } - } - - /** - * Register the file descriptor. - */ - abstract void implRegister(int fdVal) throws IOException; - - /** - * Deregister the file descriptor. - */ - abstract void implDeregister(int fdVal); - - /** - * Starts the poller threads. - */ - private Poller start() { - String prefix = (read) ? "Read" : "Write"; - startThread(prefix + "-Poller", this::pollLoop); - if (!USE_DIRECT_REGISTER) { - startThread(prefix + "-Updater", this::updateLoop); - } - return this; - } - - /** - * Starts a platform thread to run the given task. - */ - private void startThread(String name, Runnable task) { - try { - Thread thread = JLA.executeOnCarrierThread(() -> - InnocuousThread.newSystemThread(name, task) - ); - thread.setDaemon(true); - thread.start(); - } catch (Exception e) { - throw new InternalError(e); - } - } - - /** - * Polling loop. - */ - private void pollLoop() { - try { - for (;;) { - poll(); - } - } catch (Exception e) { - e.printStackTrace(); - } - } - - /** - * The update loop to handle updates to the interest set. - */ - private void updateLoop() { - try { - for (;;) { - Request req = null; - while (req == null) { - try { - req = queue.take(); - } catch (InterruptedException ignore) { } - } - implRegister(req.fdVal()); - req.finish(); - } - } catch (Exception e) { - e.printStackTrace(); - } - } - - /** - * Maps the file descriptor value to a read poller. - */ - private static Poller readPoller(int fdVal) { - return READ_POLLERS[fdVal & READ_MASK]; - } - - /** - * Maps the file descriptor value to a write poller. - */ - private static Poller writePoller(int fdVal) { - return WRITE_POLLERS[fdVal & WRITE_MASK]; - } - - /** - * Unparks any thread that is polling the given file descriptor for the - * given event. - */ - static void stopPoll(int fdVal, int event) { - if (event == Net.POLLIN) { - readPoller(fdVal).wakeup(fdVal); - } else if (event == Net.POLLOUT) { - writePoller(fdVal).wakeup(fdVal); - } else { - throw new IllegalArgumentException(); - } - } - - /** - * Unparks any threads that are polling the given file descriptor. - */ - static void stopPoll(int fdVal) { - stopPoll(fdVal, Net.POLLIN); - stopPoll(fdVal, Net.POLLOUT); - } - /** * Unparks any thread that is polling the given file descriptor. */ @@ -341,129 +212,178 @@ public abstract class Poller { } /** - * Called by the polling facility when the file descriptor is polled + * Master polling loop. The {@link #polled(int)} method is invoked for each file + * descriptor that is polled. */ - final void polled(int fdVal) { - wakeup(fdVal); - } - - /** - * Poll for events. The {@link #polled(int)} method is invoked for each - * polled file descriptor. - * - * @param timeout if positive then block for up to {@code timeout} milliseconds, - * if zero then don't block, if -1 then block indefinitely - */ - abstract int poll(int timeout) throws IOException; - - /** - * Poll for events, blocks indefinitely. - */ - final int poll() throws IOException { - return poll(-1); - } - - /** - * Poll for events, non-blocking. - */ - final int pollNow() throws IOException { - return poll(0); - } - - /** - * Returns the poller's file descriptor, or -1 if none. - */ - int fdVal() { - return -1; - } - - /** - * Creates the read and writer pollers. - */ - static { - PollerProvider provider = PollerProvider.provider(); - String s = GetPropertyAction.privilegedGetProperty("jdk.useDirectRegister"); - if (s == null) { - USE_DIRECT_REGISTER = provider.useDirectRegister(); - } else { - USE_DIRECT_REGISTER = "".equals(s) || Boolean.parseBoolean(s); - } + private void pollerLoop() { try { - Poller[] readPollers = createReadPollers(provider); - READ_POLLERS = readPollers; - READ_MASK = readPollers.length - 1; - Poller[] writePollers = createWritePollers(provider); - WRITE_POLLERS = writePollers; - WRITE_MASK = writePollers.length - 1; - } catch (IOException ioe) { - throw new IOError(ioe); + for (;;) { + poll(-1); + } + } catch (Exception e) { + e.printStackTrace(); } } /** - * Create the read poller(s). + * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file + * descriptor that is polled. + * + * The sub-poller registers its file descriptor with the master poller to park until + * there are events to poll. When unparked, it does non-blocking polls and parks + * again when there are no more events. The sub-poller yields after each poll to help + * with fairness and to avoid re-registering with the master poller where possible. */ - private static Poller[] createReadPollers(PollerProvider provider) throws IOException { - int readPollerCount = pollerCount("jdk.readPollers"); - Poller[] readPollers = new Poller[readPollerCount]; - for (int i = 0; i< readPollerCount; i++) { - var poller = provider.readPoller(); - readPollers[i] = poller.start(); + private void subPollerLoop(Poller masterPoller) { + assert Thread.currentThread().isVirtual(); + try { + int polled = 0; + for (;;) { + if (polled == 0) { + masterPoller.poll(fdVal(), 0, () -> true); // park + } else { + Thread.yield(); + } + polled = poll(0); + } + } catch (Exception e) { + e.printStackTrace(); } - return readPollers; } /** - * Create the write poller(s). + * The Pollers used for read and write events. */ - private static Poller[] createWritePollers(PollerProvider provider) throws IOException { - int writePollerCount = pollerCount("jdk.writePollers"); - Poller[] writePollers = new Poller[writePollerCount]; - for (int i = 0; i< writePollerCount; i++) { - var poller = provider.writePoller(); - writePollers[i] = poller.start(); + private static class Pollers { + private final PollerProvider provider; + private final Poller.Mode pollerMode; + private final Poller masterPoller; + private final Poller[] readPollers; + private final Poller[] writePollers; + + // used by start method to executor is kept alive + private Executor executor; + + /** + * Creates the Poller instances based on configuration. + */ + Pollers() throws IOException { + PollerProvider provider = PollerProvider.provider(); + Poller.Mode mode; + String s = GetPropertyAction.privilegedGetProperty("jdk.pollerMode"); + if (s != null) { + if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) { + mode = Mode.SYSTEM_THREADS; + } else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) { + mode = Mode.VTHREAD_POLLERS; + } else { + throw new RuntimeException("Can't parse '" + s + "' as polling mode"); + } + } else { + mode = provider.defaultPollerMode(); + } + + // vthread poller mode needs a master poller + Poller masterPoller = (mode == Mode.VTHREAD_POLLERS) + ? provider.readPoller(false) + : null; + + // read pollers (or sub-pollers) + int readPollerCount = pollerCount("jdk.readPollers", provider.defaultReadPollers(mode)); + Poller[] readPollers = new Poller[readPollerCount]; + for (int i = 0; i < readPollerCount; i++) { + readPollers[i] = provider.readPoller(mode == Mode.VTHREAD_POLLERS); + } + + // write pollers (or sub-pollers) + int writePollerCount = pollerCount("jdk.writePollers", provider.defaultWritePollers(mode)); + Poller[] writePollers = new Poller[writePollerCount]; + for (int i = 0; i < writePollerCount; i++) { + writePollers[i] = provider.writePoller(mode == Mode.VTHREAD_POLLERS); + } + + this.provider = provider; + this.pollerMode = mode; + this.masterPoller = masterPoller; + this.readPollers = readPollers; + this.writePollers = writePollers; } - return writePollers; - } - /** - * Reads the given property name to get the poller count. If the property is - * set then the value must be a power of 2. Returns 1 if the property is not - * set. - * @throws IllegalArgumentException if the property is set to a value that - * is not a power of 2. - */ - private static int pollerCount(String propName) { - String s = GetPropertyAction.privilegedGetProperty(propName, "1"); - int count = Integer.parseInt(s); - - // check power of 2 - if (count != (1 << log2(count))) { - String msg = propName + " is set to a vale that is not a power of 2"; - throw new IllegalArgumentException(msg); + /** + * Starts the Poller threads. + */ + void start() { + if (pollerMode == Mode.VTHREAD_POLLERS) { + startPlatformThread("MasterPoller", masterPoller::pollerLoop); + ThreadFactory factory = Thread.ofVirtual() + .inheritInheritableThreadLocals(false) + .name("SubPoller-", 0) + .uncaughtExceptionHandler((t, e) -> e.printStackTrace()) + .factory(); + executor = Executors.newThreadPerTaskExecutor(factory); + Arrays.stream(readPollers).forEach(p -> { + executor.execute(() -> p.subPollerLoop(masterPoller)); + }); + Arrays.stream(writePollers).forEach(p -> { + executor.execute(() -> p.subPollerLoop(masterPoller)); + }); + } else { + Arrays.stream(readPollers).forEach(p -> { + startPlatformThread("Read-Poller", p::pollerLoop); + }); + Arrays.stream(writePollers).forEach(p -> { + startPlatformThread("Write-Poller", p::pollerLoop); + }); + } } - return count; - } - private static int log2(int n) { - return 31 - Integer.numberOfLeadingZeros(n); - } - - /** - * Return a stream of all threads blocked waiting for I/O operations. - */ - public static Stream<Thread> blockedThreads() { - Stream<Thread> s = Stream.empty(); - for (int i = 0; i < READ_POLLERS.length; i++) { - s = Stream.concat(s, READ_POLLERS[i].registeredThreads()); + /** + * Returns the read poller for the given file descriptor. + */ + Poller readPoller(int fdVal) { + int index = provider.fdValToIndex(fdVal, readPollers.length); + return readPollers[index]; } - for (int i = 0; i < WRITE_POLLERS.length; i++) { - s = Stream.concat(s, WRITE_POLLERS[i].registeredThreads()); - } - return s; - } - private Stream<Thread> registeredThreads() { - return map.values().stream(); + /** + * Returns the write poller for the given file descriptor. + */ + Poller writePoller(int fdVal) { + int index = provider.fdValToIndex(fdVal, writePollers.length); + return writePollers[index]; + } + + /** + * Reads the given property name to get the poller count. If the property is + * set then the value must be a power of 2. Returns 1 if the property is not + * set. + * @throws IllegalArgumentException if the property is set to a value that + * is not a power of 2. + */ + private static int pollerCount(String propName, int defaultCount) { + String s = GetPropertyAction.privilegedGetProperty(propName); + int count = (s != null) ? Integer.parseInt(s) : defaultCount; + + // check power of 2 + if (count != Integer.highestOneBit(count)) { + String msg = propName + " is set to a vale that is not a power of 2"; + throw new IllegalArgumentException(msg); + } + return count; + } + + /** + * Starts a platform thread to run the given task. + */ + private void startPlatformThread(String name, Runnable task) { + try { + Thread thread = InnocuousThread.newSystemThread(name, task); + thread.setDaemon(true); + thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace()); + thread.start(); + } catch (Exception e) { + throw new InternalError(e); + } + } } } diff --git a/src/java.base/share/classes/sun/nio/ch/PollerProvider.java b/src/java.base/share/classes/sun/nio/ch/PollerProvider.java index 90bdcbd08ae..b10ec309265 100644 --- a/src/java.base/share/classes/sun/nio/ch/PollerProvider.java +++ b/src/java.base/share/classes/sun/nio/ch/PollerProvider.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2022, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -25,46 +25,63 @@ package sun.nio.ch; import java.io.IOException; -import java.util.ServiceConfigurationError; -import sun.security.action.GetPropertyAction; +/** + * Provider class for Poller implementations. + */ abstract class PollerProvider { + private static final PollerProvider INSTANCE = new DefaultPollerProvider(); + PollerProvider() { } /** - * Returns true if threads should register file descriptors directly, - * false to queue registrations to an updater thread. - * - * The default implementation returns false. + * Returns the system-wide PollerProvider. */ - boolean useDirectRegister() { - return false; + static PollerProvider provider() { + return INSTANCE; + } + + /** + * Returns the default poller mode. + * @implSpec The default implementation uses system threads. + */ + Poller.Mode defaultPollerMode() { + return Poller.Mode.SYSTEM_THREADS; + } + + /** + * Default number of read pollers for the given mode. The count must be a power of 2. + * @implSpec The default implementation returns 1. + */ + int defaultReadPollers(Poller.Mode mode) { + return 1; + } + + /** + * Default number of write pollers for the given mode. The count must be a power of 2. + * @implSpec The default implementation returns 1. + */ + int defaultWritePollers(Poller.Mode mode) { + return 1; + } + + /** + * Maps a file descriptor to an index from 0 to {@code toIndex}. + * @implSpec The default implementation is good for Unix file descriptors. + */ + int fdValToIndex(int fdVal, int toIndex) { + return fdVal & (toIndex - 1); } /** * Creates a Poller for read ops. + * @param subPoller true to create a sub-poller */ - abstract Poller readPoller() throws IOException; + abstract Poller readPoller(boolean subPoller) throws IOException; /** * Creates a Poller for write ops. + * @param subPoller true to create a sub-poller */ - abstract Poller writePoller() throws IOException; - - /** - * Creates the PollerProvider. - */ - static PollerProvider provider() { - String cn = GetPropertyAction.privilegedGetProperty("jdk.PollerProvider"); - if (cn != null) { - try { - Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader()); - return (PollerProvider) clazz.getConstructor().newInstance(); - } catch (Exception e) { - throw new ServiceConfigurationError(null, e); - } - } else { - return new DefaultPollerProvider(); - } - } + abstract Poller writePoller(boolean subPoller) throws IOException; } diff --git a/src/java.base/windows/classes/sun/nio/ch/DefaultPollerProvider.java b/src/java.base/windows/classes/sun/nio/ch/DefaultPollerProvider.java index 67625352baf..abd2f34a229 100644 --- a/src/java.base/windows/classes/sun/nio/ch/DefaultPollerProvider.java +++ b/src/java.base/windows/classes/sun/nio/ch/DefaultPollerProvider.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019, 2022, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2019, 2023, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -33,12 +33,26 @@ class DefaultPollerProvider extends PollerProvider { DefaultPollerProvider() { } @Override - Poller readPoller() throws IOException { + int defaultReadPollers(Poller.Mode mode) { + assert mode == Poller.Mode.SYSTEM_THREADS; + int ncpus = Runtime.getRuntime().availableProcessors(); + return Math.max(Integer.highestOneBit(ncpus / 8), 1); + } + + @Override + int fdValToIndex(int fdVal, int toIndex) { + return (fdVal >> 2) & (toIndex - 1); + } + + @Override + Poller readPoller(boolean subPoller) throws IOException { + assert !subPoller; return new WEPollPoller(true); } @Override - Poller writePoller() throws IOException { + Poller writePoller(boolean subPoller) throws IOException { + assert !subPoller; return new WEPollPoller(false); } } diff --git a/src/java.base/windows/classes/sun/nio/ch/WEPollPoller.java b/src/java.base/windows/classes/sun/nio/ch/WEPollPoller.java index e24bc0f83b3..21aee78d03b 100644 --- a/src/java.base/windows/classes/sun/nio/ch/WEPollPoller.java +++ b/src/java.base/windows/classes/sun/nio/ch/WEPollPoller.java @@ -39,7 +39,6 @@ class WEPollPoller extends Poller { private final long address; WEPollPoller(boolean read) throws IOException { - super(read); this.handle = WEPoll.create(); this.event = (read) ? EPOLLIN : EPOLLOUT; this.address = WEPoll.allocatePollArray(MAX_EVENTS_TO_POLL); diff --git a/test/hotspot/jtreg/ProblemList-Virtual.txt b/test/hotspot/jtreg/ProblemList-Virtual.txt index 8f044dfaeca..7615e103d1a 100644 --- a/test/hotspot/jtreg/ProblemList-Virtual.txt +++ b/test/hotspot/jtreg/ProblemList-Virtual.txt @@ -84,11 +84,11 @@ vmTestbase/nsk/jdb/repeat/repeat001/repeat001.java 8300707 generic-all vmTestbase/nsk/jdi/ExceptionEvent/catchLocation/location002/TestDescription.java 8278470 generic-all ### -# This test always times out on windows. This is due to the test forcing OOME in the -# debuggee, which has the side affect of making the Read-Poller thread exit. Because +# This test times out on Windows and Linux. This is due to the test forcing OOME in +# the debuggee, which can lead to I/O poller threads exiting. Because # of this no vthreads can complete their reads, and the test times out as a result. -vmTestbase/nsk/jdi/VMOutOfMemoryException/VMOutOfMemoryException001/VMOutOfMemoryException001.java 8285417 windows-all +vmTestbase/nsk/jdi/VMOutOfMemoryException/VMOutOfMemoryException001/VMOutOfMemoryException001.java 8285417 generic-all ########## ## Tests incompatible with with virtual test thread factory. diff --git a/test/jdk/java/net/vthread/BlockingSocketOps.java b/test/jdk/java/net/vthread/BlockingSocketOps.java index d9ae36fbb77..f72cf768d22 100644 --- a/test/jdk/java/net/vthread/BlockingSocketOps.java +++ b/test/jdk/java/net/vthread/BlockingSocketOps.java @@ -24,17 +24,17 @@ /** * @test id=default * @bug 8284161 - * @summary Test virtual threads doing blocking I/O on java.net sockets + * @summary Test virtual threads doing blocking I/O on java.net Sockets * @library /test/lib * @run junit BlockingSocketOps */ /** - * @test id=direct-register - * @summary Test virtual threads doing blocking I/O on java.net sockets and with - * the I/O poller configured to use direct registration + * @test id=poller-modes + * @requires (os.family == "linux") | (os.family == "mac") * @library /test/lib - * @run junit/othervm -Djdk.useDirectRegister BlockingSocketOps + * @run junit/othervm -Djdk.pollerMode=1 BlockingSocketOps + * @run junit/othervm -Djdk.pollerMode=2 BlockingSocketOps */ /** diff --git a/test/jdk/java/nio/channels/vthread/BlockingChannelOps.java b/test/jdk/java/nio/channels/vthread/BlockingChannelOps.java index bf35fe22aee..7629b2e21b1 100644 --- a/test/jdk/java/nio/channels/vthread/BlockingChannelOps.java +++ b/test/jdk/java/nio/channels/vthread/BlockingChannelOps.java @@ -30,11 +30,11 @@ */ /** - * @test id=direct-register - * @summary Test virtual threads doing blocking I/O on NIO channels and with - * the I/O poller configured to use direct registration + * @test id=poller-modes + * @requires (os.family == "linux") | (os.family == "mac") * @library /test/lib - * @run junit/othervm -Djdk.useDirectRegister BlockingChannelOps + * @run junit/othervm -Djdk.pollerMode=1 BlockingChannelOps + * @run junit/othervm -Djdk.pollerMode=2 BlockingChannelOps */ /** @@ -507,7 +507,7 @@ class BlockingChannelOps { */ @Test void testDatagramSocketAdaptorReceive2() throws Exception { - testDatagramSocketAdaptorReceive(60_1000); + testDatagramSocketAdaptorReceive(60_000); } private void testDatagramSocketAdaptorReceive(int timeout) throws Exception {