8199433: (se) select(Consumer<SelectionKey> action) as alternative to selected-key set
Reviewed-by: bpb
This commit is contained in:
parent
4a24d95917
commit
db61a602f6
src/java.base
linux/classes/sun/nio/ch
macosx/classes/sun/nio/ch
share/classes
solaris/classes/sun/nio/ch
unix/classes/sun/nio/ch
windows/classes/sun/nio/ch
test/jdk/java/nio/channels/Selector
@ -27,6 +27,7 @@ package sun.nio.ch;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.ClosedSelectorException;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.nio.channels.spi.SelectorProvider;
|
||||
import java.util.ArrayDeque;
|
||||
@ -34,6 +35,7 @@ import java.util.Deque;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static sun.nio.ch.EPoll.EPOLLIN;
|
||||
import static sun.nio.ch.EPoll.EPOLL_CTL_ADD;
|
||||
@ -97,7 +99,9 @@ class EPollSelectorImpl extends SelectorImpl {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doSelect(long timeout) throws IOException {
|
||||
protected int doSelect(Consumer<SelectionKey> action, long timeout)
|
||||
throws IOException
|
||||
{
|
||||
assert Thread.holdsLock(this);
|
||||
|
||||
// epoll_wait timeout is int
|
||||
@ -130,7 +134,7 @@ class EPollSelectorImpl extends SelectorImpl {
|
||||
end(blocking);
|
||||
}
|
||||
processDeregisterQueue();
|
||||
return updateSelectedKeys(numEntries);
|
||||
return processEvents(numEntries, action);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -171,13 +175,13 @@ class EPollSelectorImpl extends SelectorImpl {
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the keys of file descriptors that were polled and add them to
|
||||
* the selected-key set.
|
||||
* Process the polled events.
|
||||
* If the interrupt fd has been selected, drain it and clear the interrupt.
|
||||
*/
|
||||
private int updateSelectedKeys(int numEntries) throws IOException {
|
||||
private int processEvents(int numEntries, Consumer<SelectionKey> action)
|
||||
throws IOException
|
||||
{
|
||||
assert Thread.holdsLock(this);
|
||||
assert Thread.holdsLock(nioSelectedKeys());
|
||||
|
||||
boolean interrupted = false;
|
||||
int numKeysUpdated = 0;
|
||||
@ -190,17 +194,7 @@ class EPollSelectorImpl extends SelectorImpl {
|
||||
SelectionKeyImpl ski = fdToKey.get(fd);
|
||||
if (ski != null) {
|
||||
int rOps = EPoll.getEvents(event);
|
||||
if (selectedKeys.contains(ski)) {
|
||||
if (ski.translateAndUpdateReadyOps(rOps)) {
|
||||
numKeysUpdated++;
|
||||
}
|
||||
} else {
|
||||
ski.translateAndSetReadyOps(rOps);
|
||||
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
|
||||
selectedKeys.add(ski);
|
||||
numKeysUpdated++;
|
||||
}
|
||||
}
|
||||
numKeysUpdated += processReadyEvents(rOps, ski, action);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -27,6 +27,7 @@ package sun.nio.ch;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.ClosedSelectorException;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.nio.channels.spi.SelectorProvider;
|
||||
import java.util.ArrayDeque;
|
||||
@ -34,6 +35,7 @@ import java.util.Deque;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static sun.nio.ch.KQueue.EVFILT_READ;
|
||||
import static sun.nio.ch.KQueue.EVFILT_WRITE;
|
||||
@ -100,7 +102,9 @@ class KQueueSelectorImpl extends SelectorImpl {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doSelect(long timeout) throws IOException {
|
||||
protected int doSelect(Consumer<SelectionKey> action, long timeout)
|
||||
throws IOException
|
||||
{
|
||||
assert Thread.holdsLock(this);
|
||||
|
||||
long to = Math.min(timeout, Integer.MAX_VALUE); // max kqueue timeout
|
||||
@ -132,7 +136,7 @@ class KQueueSelectorImpl extends SelectorImpl {
|
||||
end(blocking);
|
||||
}
|
||||
processDeregisterQueue();
|
||||
return updateSelectedKeys(numEntries);
|
||||
return processEvents(numEntries, action);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -180,13 +184,13 @@ class KQueueSelectorImpl extends SelectorImpl {
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the keys of file descriptors that were polled and add them to
|
||||
* the selected-key set.
|
||||
* Process the polled events.
|
||||
* If the interrupt fd has been selected, drain it and clear the interrupt.
|
||||
*/
|
||||
private int updateSelectedKeys(int numEntries) throws IOException {
|
||||
private int processEvents(int numEntries, Consumer<SelectionKey> action)
|
||||
throws IOException
|
||||
{
|
||||
assert Thread.holdsLock(this);
|
||||
assert Thread.holdsLock(nioSelectedKeys());
|
||||
|
||||
int numKeysUpdated = 0;
|
||||
boolean interrupted = false;
|
||||
@ -214,22 +218,10 @@ class KQueueSelectorImpl extends SelectorImpl {
|
||||
} else if (filter == EVFILT_WRITE) {
|
||||
rOps |= Net.POLLOUT;
|
||||
}
|
||||
|
||||
if (selectedKeys.contains(ski)) {
|
||||
if (ski.translateAndUpdateReadyOps(rOps)) {
|
||||
// file descriptor may be polled more than once per poll
|
||||
if (ski.lastPolled != pollCount) {
|
||||
numKeysUpdated++;
|
||||
ski.lastPolled = pollCount;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ski.translateAndSetReadyOps(rOps);
|
||||
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
|
||||
selectedKeys.add(ski);
|
||||
numKeysUpdated++;
|
||||
ski.lastPolled = pollCount;
|
||||
}
|
||||
int updated = processReadyEvents(rOps, ski, action);
|
||||
if (updated > 0 && ski.lastPolled != pollCount) {
|
||||
numKeysUpdated++;
|
||||
ski.lastPolled = pollCount;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -291,7 +291,7 @@ public abstract class SelectionKey {
|
||||
* detects that the corresponding channel is ready for reading, has reached
|
||||
* end-of-stream, has been remotely shut down for further reading, or has
|
||||
* an error pending, then it will add {@code OP_READ} to the key's
|
||||
* ready-operation set and add the key to its selected-key set. </p>
|
||||
* ready-operation set. </p>
|
||||
*/
|
||||
public static final int OP_READ = 1 << 0;
|
||||
|
||||
@ -303,8 +303,7 @@ public abstract class SelectionKey {
|
||||
* href="Selector.html#selop">selection operation</a>. If the selector
|
||||
* detects that the corresponding channel is ready for writing, has been
|
||||
* remotely shut down for further writing, or has an error pending, then it
|
||||
* will add {@code OP_WRITE} to the key's ready set and add the key to its
|
||||
* selected-key set. </p>
|
||||
* will add {@code OP_WRITE} to the key's ready set. </p>
|
||||
*/
|
||||
public static final int OP_WRITE = 1 << 2;
|
||||
|
||||
@ -316,8 +315,7 @@ public abstract class SelectionKey {
|
||||
* href="Selector.html#selop">selection operation</a>. If the selector
|
||||
* detects that the corresponding socket channel is ready to complete its
|
||||
* connection sequence, or has an error pending, then it will add
|
||||
* {@code OP_CONNECT} to the key's ready set and add the key to its
|
||||
* selected-key set. </p>
|
||||
* {@code OP_CONNECT} to the key's ready set. </p>
|
||||
*/
|
||||
public static final int OP_CONNECT = 1 << 3;
|
||||
|
||||
@ -329,8 +327,7 @@ public abstract class SelectionKey {
|
||||
* href="Selector.html#selop">selection operation</a>. If the selector
|
||||
* detects that the corresponding server-socket channel is ready to accept
|
||||
* another connection, or has an error pending, then it will add
|
||||
* {@code OP_ACCEPT} to the key's ready set and add the key to its
|
||||
* selected-key set. </p>
|
||||
* {@code OP_ACCEPT} to the key's ready set. </p>
|
||||
*/
|
||||
public static final int OP_ACCEPT = 1 << 4;
|
||||
|
||||
|
@ -28,7 +28,9 @@ package java.nio.channels;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.spi.SelectorProvider;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
|
||||
/**
|
||||
@ -56,7 +58,8 @@ import java.util.Set;
|
||||
*
|
||||
* <li><p> The <i>selected-key set</i> is the set of keys such that each
|
||||
* key's channel was detected to be ready for at least one of the operations
|
||||
* identified in the key's interest set during a prior selection operation.
|
||||
* identified in the key's interest set during a prior selection operation
|
||||
* that adds keys or updates keys in the set.
|
||||
* This set is returned by the {@link #selectedKeys() selectedKeys} method.
|
||||
* The selected-key set is always a subset of the key set. </p></li>
|
||||
*
|
||||
@ -92,6 +95,27 @@ import java.util.Set;
|
||||
* <a id="selop"></a>
|
||||
* <h2>Selection</h2>
|
||||
*
|
||||
* <p> A selection operation queries the underlying operating system for an
|
||||
* update as to the readiness of each registered channel to perform any of the
|
||||
* operations identified by its key's interest set. There are two forms of
|
||||
* selection operation:
|
||||
*
|
||||
* <ol>
|
||||
*
|
||||
* <li><p> The {@link #select()}, {@link #select(long)}, and {@link #selectNow()}
|
||||
* methods add the keys of channels ready to perform an operation to the
|
||||
* selected-key set, or update the ready-operation set of keys already in the
|
||||
* selected-key set. </p></li>
|
||||
*
|
||||
* <li><p> The {@link #select(Consumer)}, {@link #select(Consumer, long)}, and
|
||||
* {@link #selectNow(Consumer)} methods perform an <i>action</i> on the key
|
||||
* of each channel that is ready to perform an operation. These methods do
|
||||
* not add to the selected-key set. </p></li>
|
||||
*
|
||||
* </ol>
|
||||
*
|
||||
* <h3>Selection operations that add to the selected-key set</h3>
|
||||
*
|
||||
* <p> During each selection operation, keys may be added to and removed from a
|
||||
* selector's selected-key set and may be removed from its key and
|
||||
* cancelled-key sets. Selection is performed by the {@link #select()}, {@link
|
||||
@ -141,6 +165,45 @@ import java.util.Set;
|
||||
* difference between the three selection methods. </p>
|
||||
*
|
||||
*
|
||||
* <h3>Selection operations that perform an action on selected keys</h3>
|
||||
*
|
||||
* <p> During each selection operation, keys may be removed from the selector's
|
||||
* key, selected-key, and cancelled-key sets. Selection is performed by the
|
||||
* {@link #select(Consumer)}, {@link #select(Consumer,long)}, and {@link
|
||||
* #selectNow(Consumer)} methods, and involves three steps: </p>
|
||||
*
|
||||
* <ol>
|
||||
*
|
||||
* <li><p> Each key in the cancelled-key set is removed from each key set of
|
||||
* which it is a member, and its channel is deregistered. This step leaves
|
||||
* the cancelled-key set empty. </p></li>
|
||||
*
|
||||
* <li><p> The underlying operating system is queried for an update as to the
|
||||
* readiness of each remaining channel to perform any of the operations
|
||||
* identified by its key's interest set as of the moment that the selection
|
||||
* operation began.
|
||||
*
|
||||
* <p> For a channel that is ready for at least one such operation, the
|
||||
* ready-operation set of the channel's key is set to identify exactly the
|
||||
* operations for which the channel is ready and the <i>action</i> specified
|
||||
* to the {@code select} method is invoked to consume the channel's key. Any
|
||||
* readiness information previously recorded in the ready set is discarded
|
||||
* prior to invoking the <i>action</i>.
|
||||
*
|
||||
* <p> Alternatively, where a channel is ready for more than one operation,
|
||||
* the <i>action</i> may be invoked more than once with the channel's key and
|
||||
* ready-operation set modified to a subset of the operations for which the
|
||||
* channel is ready. Where the <i>action</i> is invoked more than once for
|
||||
* the same key then its ready-operation set never contains operation bits
|
||||
* that were contained in the set at previous calls to the <i>action</i>
|
||||
* in the same selection operation. </p></li>
|
||||
*
|
||||
* <li><p> If any keys were added to the cancelled-key set while step (2) was
|
||||
* in progress then they are processed as in step (1). </p></li>
|
||||
*
|
||||
* </ol>
|
||||
*
|
||||
*
|
||||
* <h2>Concurrency</h2>
|
||||
*
|
||||
* <p> A Selector and its key set are safe for use by multiple concurrent
|
||||
@ -156,13 +219,12 @@ import java.util.Set;
|
||||
*
|
||||
* <p> Keys may be cancelled and channels may be closed at any time. Hence the
|
||||
* presence of a key in one or more of a selector's key sets does not imply
|
||||
* that the key is valid or that its channel is open. Application code should
|
||||
* that the key is valid or that its channel is open. Application code should
|
||||
* be careful to synchronize and check these conditions as necessary if there
|
||||
* is any possibility that another thread will cancel a key or close a channel.
|
||||
*
|
||||
* <p> A thread blocked in one of the {@link #select()} or {@link
|
||||
* #select(long)} methods may be interrupted by some other thread in one of
|
||||
* three ways:
|
||||
* <p> A thread blocked in a selection operation may be interrupted by some
|
||||
* other thread in one of three ways:
|
||||
*
|
||||
* <ul>
|
||||
*
|
||||
@ -355,19 +417,189 @@ public abstract class Selector implements Closeable {
|
||||
*/
|
||||
public abstract int select() throws IOException;
|
||||
|
||||
/**
|
||||
* Selects and performs an action on the keys whose corresponding channels
|
||||
* are ready for I/O operations.
|
||||
*
|
||||
* <p> This method performs a blocking <a href="#selop">selection
|
||||
* operation</a>. It wakes up from querying the operating system only when
|
||||
* at least one channel is selected, this selector's {@link #wakeup wakeup}
|
||||
* method is invoked, the current thread is interrupted, or the given
|
||||
* timeout period expires, whichever comes first.
|
||||
*
|
||||
* <p> The specified <i>action</i>'s {@link Consumer#accept(Object) accept}
|
||||
* method is invoked with the key for each channel that is ready to perform
|
||||
* an operation identified by its key's interest set. The {@code accept}
|
||||
* method may be invoked more than once for the same key but with the
|
||||
* ready-operation set containing a subset of the operations for which the
|
||||
* channel is ready (as described above). The {@code accept} method is
|
||||
* invoked while synchronized on the selector and its selected-key set.
|
||||
* Great care must be taken to avoid deadlocking with other threads that
|
||||
* also synchronize on these objects. Selection operations are not reentrant
|
||||
* in general and consequently the <i>action</i> should take great care not
|
||||
* to attempt a selection operation on the same selector. The behavior when
|
||||
* attempting a reentrant selection operation is implementation specific and
|
||||
* therefore not specified. If the <i>action</i> closes the selector then
|
||||
* {@code ClosedSelectorException} is thrown when the action completes.
|
||||
* The <i>action</i> is not prohibited from closing channels registered with
|
||||
* the selector, nor prohibited from cancelling keys or changing a key's
|
||||
* interest set. If a channel is selected but its key is cancelled or its
|
||||
* interest set changed before the <i>action</i> is performed on the key
|
||||
* then it is implementation specific as to whether the <i>action</i> is
|
||||
* invoked (it may be invoked with an {@link SelectionKey#isValid() invalid}
|
||||
* key). Exceptions thrown by the action are relayed to the caller.
|
||||
*
|
||||
* <p> This method does not offer real-time guarantees: It schedules the
|
||||
* timeout as if by invoking the {@link Object#wait(long)} method.
|
||||
*
|
||||
* @implSpec The default implementation removes all keys from the
|
||||
* selected-key set, invokes {@link #select(long) select(long)} with the
|
||||
* given timeout and then performs the action for each key added to the
|
||||
* selected-key set. The default implementation does not detect the action
|
||||
* performing a reentrant selection operation. The selected-key set may
|
||||
* or may not be empty on completion of the default implementation.
|
||||
*
|
||||
* @param action The action to perform
|
||||
*
|
||||
* @param timeout If positive, block for up to {@code timeout}
|
||||
* milliseconds, more or less, while waiting for a
|
||||
* channel to become ready; if zero, block indefinitely;
|
||||
* must not be negative
|
||||
*
|
||||
* @return The number of unique keys consumed, possibly zero
|
||||
*
|
||||
* @throws IOException
|
||||
* If an I/O error occurs
|
||||
*
|
||||
* @throws ClosedSelectorException
|
||||
* If this selector is closed or is closed by the action
|
||||
*
|
||||
* @throws IllegalArgumentException
|
||||
* If the value of the timeout argument is negative
|
||||
*
|
||||
* @since 11
|
||||
*/
|
||||
public int select(Consumer<SelectionKey> action, long timeout)
|
||||
throws IOException
|
||||
{
|
||||
if (timeout < 0)
|
||||
throw new IllegalArgumentException("Negative timeout");
|
||||
return doSelect(Objects.requireNonNull(action), timeout);
|
||||
}
|
||||
|
||||
/**
|
||||
* Selects and performs an action on the keys whose corresponding channels
|
||||
* are ready for I/O operations.
|
||||
*
|
||||
* <p> This method performs a blocking <a href="#selop">selection
|
||||
* operation</a>. It wakes up from querying the operating system only when
|
||||
* at least one channel is selected, this selector's {@link #wakeup wakeup}
|
||||
* method is invoked, or the current thread is interrupted, whichever comes
|
||||
* first.
|
||||
*
|
||||
* <p> This method is equivalent to invoking the 2-arg
|
||||
* {@link #select(Consumer, long) select} method with a timeout of {@code 0}
|
||||
* to block indefinitely. </p>
|
||||
*
|
||||
* @implSpec The default implementation invokes the 2-arg {@code select}
|
||||
* method with a timeout of {@code 0}.
|
||||
*
|
||||
* @param action The action to perform
|
||||
*
|
||||
* @return The number of unique keys consumed, possibly zero
|
||||
*
|
||||
* @throws IOException
|
||||
* If an I/O error occurs
|
||||
*
|
||||
* @throws ClosedSelectorException
|
||||
* If this selector is closed or is closed by the action
|
||||
*
|
||||
* @since 11
|
||||
*/
|
||||
public int select(Consumer<SelectionKey> action) throws IOException {
|
||||
return select(action, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Selects and performs an action on the keys whose corresponding channels
|
||||
* are ready for I/O operations.
|
||||
*
|
||||
* <p> This method performs a non-blocking <a href="#selop">selection
|
||||
* operation</a>.
|
||||
*
|
||||
* <p> Invoking this method clears the effect of any previous invocations
|
||||
* of the {@link #wakeup wakeup} method. </p>
|
||||
*
|
||||
* @implSpec The default implementation removes all keys from the
|
||||
* selected-key set, invokes {@link #selectNow() selectNow()} and then
|
||||
* performs the action for each key added to the selected-key set. The
|
||||
* default implementation does not detect the action performing a reentrant
|
||||
* selection operation. The selected-key set may or may not be empty on
|
||||
* completion of the default implementation.
|
||||
*
|
||||
* @param action The action to perform
|
||||
*
|
||||
* @return The number of unique keys consumed, possibly zero
|
||||
*
|
||||
* @throws IOException
|
||||
* If an I/O error occurs
|
||||
*
|
||||
* @throws ClosedSelectorException
|
||||
* If this selector is closed or is closed by the action
|
||||
*
|
||||
* @since 11
|
||||
*/
|
||||
public int selectNow(Consumer<SelectionKey> action) throws IOException {
|
||||
return doSelect(Objects.requireNonNull(action), -1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Default implementation of select(Consumer) and selectNow(Consumer).
|
||||
*/
|
||||
private int doSelect(Consumer<SelectionKey> action, long timeout)
|
||||
throws IOException
|
||||
{
|
||||
synchronized (this) {
|
||||
Set<SelectionKey> selectedKeys = selectedKeys();
|
||||
synchronized (selectedKeys) {
|
||||
selectedKeys.clear();
|
||||
int numKeySelected;
|
||||
if (timeout < 0) {
|
||||
numKeySelected = selectNow();
|
||||
} else {
|
||||
numKeySelected = select(timeout);
|
||||
}
|
||||
|
||||
// copy selected-key set as action may remove keys
|
||||
Set<SelectionKey> keysToConsume = Set.copyOf(selectedKeys);
|
||||
assert keysToConsume.size() == numKeySelected;
|
||||
selectedKeys.clear();
|
||||
|
||||
// invoke action for each selected key
|
||||
keysToConsume.forEach(k -> {
|
||||
action.accept(k);
|
||||
if (!isOpen())
|
||||
throw new ClosedSelectorException();
|
||||
});
|
||||
|
||||
return numKeySelected;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Causes the first selection operation that has not yet returned to return
|
||||
* immediately.
|
||||
*
|
||||
* <p> If another thread is currently blocked in an invocation of the
|
||||
* {@link #select()} or {@link #select(long)} methods then that invocation
|
||||
* will return immediately. If no selection operation is currently in
|
||||
* progress then the next invocation of one of these methods will return
|
||||
* immediately unless the {@link #selectNow()} method is invoked in the
|
||||
* meantime. In any case the value returned by that invocation may be
|
||||
* non-zero. Subsequent invocations of the {@link #select()} or {@link
|
||||
* #select(long)} methods will block as usual unless this method is invoked
|
||||
* again in the meantime.
|
||||
* <p> If another thread is currently blocked in a selection operation then
|
||||
* that invocation will return immediately. If no selection operation is
|
||||
* currently in progress then the next invocation of a selection operation
|
||||
* will return immediately unless {@link #selectNow()} or {@link
|
||||
* #selectNow(Consumer)} is invoked in the meantime. In any case the value
|
||||
* returned by that invocation may be non-zero. Subsequent selection
|
||||
* operations will block as usual unless this method is invoked again in the
|
||||
* meantime.
|
||||
*
|
||||
* <p> Invoking this method more than once between two successive selection
|
||||
* operations has the same effect as invoking it just once. </p>
|
||||
@ -398,5 +630,4 @@ public abstract class Selector implements Closeable {
|
||||
* If an I/O error occurs
|
||||
*/
|
||||
public abstract void close() throws IOException;
|
||||
|
||||
}
|
||||
|
@ -36,8 +36,10 @@ import java.nio.channels.spi.SelectorProvider;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
|
||||
/**
|
||||
@ -51,12 +53,15 @@ abstract class SelectorImpl
|
||||
private final Set<SelectionKey> keys;
|
||||
|
||||
// The set of keys with data ready for an operation
|
||||
protected final Set<SelectionKey> selectedKeys;
|
||||
private final Set<SelectionKey> selectedKeys;
|
||||
|
||||
// Public views of the key sets
|
||||
private final Set<SelectionKey> publicKeys; // Immutable
|
||||
private final Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition
|
||||
|
||||
// used to check for reentrancy
|
||||
private boolean inSelect;
|
||||
|
||||
protected SelectorImpl(SelectorProvider sp) {
|
||||
super(sp);
|
||||
keys = ConcurrentHashMap.newKeySet();
|
||||
@ -82,13 +87,6 @@ abstract class SelectorImpl
|
||||
return publicSelectedKeys;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the public view of the selected-key set
|
||||
*/
|
||||
protected final Set<SelectionKey> nioSelectedKeys() {
|
||||
return publicSelectedKeys;
|
||||
}
|
||||
|
||||
/**
|
||||
* Marks the beginning of a select operation that might block
|
||||
*/
|
||||
@ -106,16 +104,27 @@ abstract class SelectorImpl
|
||||
/**
|
||||
* Selects the keys for channels that are ready for I/O operations.
|
||||
*
|
||||
* @param action the action to perform, can be null
|
||||
* @param timeout timeout in milliseconds to wait, 0 to not wait, -1 to
|
||||
* wait indefinitely
|
||||
*/
|
||||
protected abstract int doSelect(long timeout) throws IOException;
|
||||
protected abstract int doSelect(Consumer<SelectionKey> action, long timeout)
|
||||
throws IOException;
|
||||
|
||||
private int lockAndDoSelect(long timeout) throws IOException {
|
||||
private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
|
||||
throws IOException
|
||||
{
|
||||
synchronized (this) {
|
||||
ensureOpen();
|
||||
synchronized (publicSelectedKeys) {
|
||||
return doSelect(timeout);
|
||||
if (inSelect)
|
||||
throw new IllegalStateException("select in progress");
|
||||
inSelect = true;
|
||||
try {
|
||||
synchronized (publicSelectedKeys) {
|
||||
return doSelect(action, timeout);
|
||||
}
|
||||
} finally {
|
||||
inSelect = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -124,17 +133,39 @@ abstract class SelectorImpl
|
||||
public final int select(long timeout) throws IOException {
|
||||
if (timeout < 0)
|
||||
throw new IllegalArgumentException("Negative timeout");
|
||||
return lockAndDoSelect((timeout == 0) ? -1 : timeout);
|
||||
return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final int select() throws IOException {
|
||||
return lockAndDoSelect(-1);
|
||||
return lockAndDoSelect(null, -1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final int selectNow() throws IOException {
|
||||
return lockAndDoSelect(0);
|
||||
return lockAndDoSelect(null, 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final int select(Consumer<SelectionKey> action, long timeout)
|
||||
throws IOException
|
||||
{
|
||||
Objects.requireNonNull(action);
|
||||
if (timeout < 0)
|
||||
throw new IllegalArgumentException("Negative timeout");
|
||||
return lockAndDoSelect(action, (timeout == 0) ? -1 : timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final int select(Consumer<SelectionKey> action) throws IOException {
|
||||
Objects.requireNonNull(action);
|
||||
return lockAndDoSelect(action, -1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final int selectNow(Consumer<SelectionKey> action) throws IOException {
|
||||
Objects.requireNonNull(action);
|
||||
return lockAndDoSelect(action, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -239,6 +270,39 @@ abstract class SelectorImpl
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoked by selection operations to handle ready events. If an action
|
||||
* is specified then it is invoked to handle the key, otherwise the key
|
||||
* is added to the selected-key set (or updated when it is already in the
|
||||
* set).
|
||||
*/
|
||||
protected final int processReadyEvents(int rOps,
|
||||
SelectionKeyImpl ski,
|
||||
Consumer<SelectionKey> action) {
|
||||
if (action != null) {
|
||||
ski.translateAndSetReadyOps(rOps);
|
||||
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
|
||||
action.accept(ski);
|
||||
ensureOpen();
|
||||
return 1;
|
||||
}
|
||||
} else {
|
||||
assert Thread.holdsLock(publicSelectedKeys);
|
||||
if (selectedKeys.contains(ski)) {
|
||||
if (ski.translateAndUpdateReadyOps(rOps)) {
|
||||
return 1;
|
||||
}
|
||||
} else {
|
||||
ski.translateAndSetReadyOps(rOps);
|
||||
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
|
||||
selectedKeys.add(ski);
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoked by interestOps to ensure the interest ops are updated at the
|
||||
* next selection operation.
|
||||
|
@ -27,6 +27,7 @@ package sun.nio.ch;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.ClosedSelectorException;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.nio.channels.spi.SelectorProvider;
|
||||
import java.util.ArrayDeque;
|
||||
@ -34,6 +35,7 @@ import java.util.Deque;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static sun.nio.ch.DevPollArrayWrapper.NUM_POLLFDS;
|
||||
import static sun.nio.ch.DevPollArrayWrapper.POLLREMOVE;
|
||||
@ -85,7 +87,9 @@ class DevPollSelectorImpl
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doSelect(long timeout) throws IOException {
|
||||
protected int doSelect(Consumer<SelectionKey> action, long timeout)
|
||||
throws IOException
|
||||
{
|
||||
assert Thread.holdsLock(this);
|
||||
|
||||
long to = timeout;
|
||||
@ -117,7 +121,7 @@ class DevPollSelectorImpl
|
||||
end(blocking);
|
||||
}
|
||||
processDeregisterQueue();
|
||||
return updateSelectedKeys(numEntries);
|
||||
return processEvents(numEntries, action);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -165,13 +169,13 @@ class DevPollSelectorImpl
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the keys of file descriptors that were polled and add them to
|
||||
* the selected-key set.
|
||||
* Process the polled events.
|
||||
* If the interrupt fd has been selected, drain it and clear the interrupt.
|
||||
*/
|
||||
private int updateSelectedKeys(int numEntries) throws IOException {
|
||||
private int processEvents(int numEntries, Consumer<SelectionKey> action)
|
||||
throws IOException
|
||||
{
|
||||
assert Thread.holdsLock(this);
|
||||
assert Thread.holdsLock(nioSelectedKeys());
|
||||
|
||||
boolean interrupted = false;
|
||||
int numKeysUpdated = 0;
|
||||
@ -183,17 +187,7 @@ class DevPollSelectorImpl
|
||||
SelectionKeyImpl ski = fdToKey.get(fd);
|
||||
if (ski != null) {
|
||||
int rOps = pollWrapper.getReventOps(i);
|
||||
if (selectedKeys.contains(ski)) {
|
||||
if (ski.translateAndUpdateReadyOps(rOps)) {
|
||||
numKeysUpdated++;
|
||||
}
|
||||
} else {
|
||||
ski.translateAndSetReadyOps(rOps);
|
||||
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
|
||||
selectedKeys.add(ski);
|
||||
numKeysUpdated++;
|
||||
}
|
||||
}
|
||||
numKeysUpdated += processReadyEvents(rOps, ski, action);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -35,6 +35,7 @@ import java.util.Deque;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static sun.nio.ch.SolarisEventPort.PORT_SOURCE_FD;
|
||||
import static sun.nio.ch.SolarisEventPort.PORT_SOURCE_USER;
|
||||
@ -97,7 +98,9 @@ class EventPortSelectorImpl
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doSelect(long timeout) throws IOException {
|
||||
protected int doSelect(Consumer<SelectionKey> action, long timeout)
|
||||
throws IOException
|
||||
{
|
||||
assert Thread.holdsLock(this);
|
||||
|
||||
long to = timeout;
|
||||
@ -129,7 +132,7 @@ class EventPortSelectorImpl
|
||||
end(blocking);
|
||||
}
|
||||
processDeregisterQueue();
|
||||
return processPortEvents(numEvents);
|
||||
return processPortEvents(numEvents, action);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -166,19 +169,21 @@ class EventPortSelectorImpl
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the port events. This method updates the keys of file descriptors
|
||||
* that were polled. It also re-queues the key so that the file descriptor
|
||||
* is re-associated at the next select operation.
|
||||
*
|
||||
* @return the number of selection keys updated.
|
||||
* Process the polled events and re-queue the selected keys so the file
|
||||
* descriptors are re-associated at the next select operation.
|
||||
*/
|
||||
private int processPortEvents(int numEvents) throws IOException {
|
||||
private int processPortEvents(int numEvents, Consumer<SelectionKey> action)
|
||||
throws IOException
|
||||
{
|
||||
assert Thread.holdsLock(this);
|
||||
assert Thread.holdsLock(nioSelectedKeys());
|
||||
|
||||
int numKeysUpdated = 0;
|
||||
boolean interrupted = false;
|
||||
|
||||
// Process the polled events while holding the update lock. This allows
|
||||
// keys to be queued for ready file descriptors so they can be
|
||||
// re-associated at the next select. The selected-key can be updated
|
||||
// in this pass.
|
||||
synchronized (updateLock) {
|
||||
for (int i = 0; i < numEvents; i++) {
|
||||
short source = getSource(i);
|
||||
@ -186,22 +191,15 @@ class EventPortSelectorImpl
|
||||
int fd = getDescriptor(i);
|
||||
SelectionKeyImpl ski = fdToKey.get(fd);
|
||||
if (ski != null) {
|
||||
int rOps = getEventOps(i);
|
||||
if (selectedKeys.contains(ski)) {
|
||||
if (ski.translateAndUpdateReadyOps(rOps)) {
|
||||
numKeysUpdated++;
|
||||
}
|
||||
} else {
|
||||
ski.translateAndSetReadyOps(rOps);
|
||||
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
|
||||
selectedKeys.add(ski);
|
||||
numKeysUpdated++;
|
||||
}
|
||||
}
|
||||
|
||||
// re-queue key so it re-associated at next select
|
||||
ski.registeredEvents(0);
|
||||
updateKeys.addLast(ski);
|
||||
|
||||
// update selected-key set if no action specified
|
||||
if (action == null) {
|
||||
int rOps = getEventOps(i);
|
||||
numKeysUpdated += processReadyEvents(rOps, ski, null);
|
||||
}
|
||||
|
||||
}
|
||||
} else if (source == PORT_SOURCE_USER) {
|
||||
interrupted = true;
|
||||
@ -211,6 +209,22 @@ class EventPortSelectorImpl
|
||||
}
|
||||
}
|
||||
|
||||
// if an action specified then iterate over the polled events again so
|
||||
// that the action is performed without holding the update lock.
|
||||
if (action != null) {
|
||||
for (int i = 0; i < numEvents; i++) {
|
||||
short source = getSource(i);
|
||||
if (source == PORT_SOURCE_FD) {
|
||||
int fd = getDescriptor(i);
|
||||
SelectionKeyImpl ski = fdToKey.get(fd);
|
||||
if (ski != null) {
|
||||
int rOps = getEventOps(i);
|
||||
numKeysUpdated += processReadyEvents(rOps, ski, action);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (interrupted) {
|
||||
clearInterrupt();
|
||||
}
|
||||
|
@ -26,6 +26,7 @@ package sun.nio.ch;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.ClosedSelectorException;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.nio.channels.spi.SelectorProvider;
|
||||
import java.util.ArrayDeque;
|
||||
@ -33,6 +34,7 @@ import java.util.ArrayList;
|
||||
import java.util.Deque;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import jdk.internal.misc.Unsafe;
|
||||
|
||||
@ -92,7 +94,9 @@ class PollSelectorImpl extends SelectorImpl {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doSelect(long timeout) throws IOException {
|
||||
protected int doSelect(Consumer<SelectionKey> action, long timeout)
|
||||
throws IOException
|
||||
{
|
||||
assert Thread.holdsLock(this);
|
||||
|
||||
int to = (int) Math.min(timeout, Integer.MAX_VALUE); // max poll timeout
|
||||
@ -125,7 +129,7 @@ class PollSelectorImpl extends SelectorImpl {
|
||||
}
|
||||
|
||||
processDeregisterQueue();
|
||||
return updateSelectedKeys();
|
||||
return processEvents(action);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -157,13 +161,13 @@ class PollSelectorImpl extends SelectorImpl {
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the keys of file descriptors that were polled and add them to
|
||||
* the selected-key set.
|
||||
* Process the polled events.
|
||||
* If the interrupt fd has been selected, drain it and clear the interrupt.
|
||||
*/
|
||||
private int updateSelectedKeys() throws IOException {
|
||||
private int processEvents(Consumer<SelectionKey> action)
|
||||
throws IOException
|
||||
{
|
||||
assert Thread.holdsLock(this);
|
||||
assert Thread.holdsLock(nioSelectedKeys());
|
||||
assert pollArraySize > 0 && pollArraySize == pollKeys.size();
|
||||
|
||||
int numKeysUpdated = 0;
|
||||
@ -173,17 +177,7 @@ class PollSelectorImpl extends SelectorImpl {
|
||||
SelectionKeyImpl ski = pollKeys.get(i);
|
||||
assert ski.getFDVal() == getDescriptor(i);
|
||||
if (ski.isValid()) {
|
||||
if (selectedKeys.contains(ski)) {
|
||||
if (ski.translateAndUpdateReadyOps(rOps)) {
|
||||
numKeysUpdated++;
|
||||
}
|
||||
} else {
|
||||
ski.translateAndSetReadyOps(rOps);
|
||||
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
|
||||
selectedKeys.add(ski);
|
||||
numKeysUpdated++;
|
||||
}
|
||||
}
|
||||
numKeysUpdated += processReadyEvents(rOps, ski, action);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -28,6 +28,7 @@ package sun.nio.ch;
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.ClosedSelectorException;
|
||||
import java.nio.channels.Pipe;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.nio.channels.spi.SelectorProvider;
|
||||
import java.util.ArrayDeque;
|
||||
@ -36,6 +37,7 @@ import java.util.Deque;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* A multi-threaded implementation of Selector for Windows.
|
||||
@ -139,7 +141,9 @@ class WindowsSelectorImpl extends SelectorImpl {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doSelect(long timeout) throws IOException {
|
||||
protected int doSelect(Consumer<SelectionKey> action, long timeout)
|
||||
throws IOException
|
||||
{
|
||||
assert Thread.holdsLock(this);
|
||||
this.timeout = timeout; // set selector timeout
|
||||
processUpdateQueue();
|
||||
@ -173,7 +177,7 @@ class WindowsSelectorImpl extends SelectorImpl {
|
||||
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
|
||||
finishLock.checkForException();
|
||||
processDeregisterQueue();
|
||||
int updated = updateSelectedKeys();
|
||||
int updated = updateSelectedKeys(action);
|
||||
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
|
||||
resetWakeupSocket();
|
||||
return updated;
|
||||
@ -349,16 +353,16 @@ class WindowsSelectorImpl extends SelectorImpl {
|
||||
private native int poll0(long pollAddress, int numfds,
|
||||
int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
|
||||
|
||||
private int processSelectedKeys(long updateCount) {
|
||||
private int processSelectedKeys(long updateCount, Consumer<SelectionKey> action) {
|
||||
int numKeysUpdated = 0;
|
||||
numKeysUpdated += processFDSet(updateCount, readFds,
|
||||
numKeysUpdated += processFDSet(updateCount, action, readFds,
|
||||
Net.POLLIN,
|
||||
false);
|
||||
numKeysUpdated += processFDSet(updateCount, writeFds,
|
||||
numKeysUpdated += processFDSet(updateCount, action, writeFds,
|
||||
Net.POLLCONN |
|
||||
Net.POLLOUT,
|
||||
false);
|
||||
numKeysUpdated += processFDSet(updateCount, exceptFds,
|
||||
numKeysUpdated += processFDSet(updateCount, action, exceptFds,
|
||||
Net.POLLIN |
|
||||
Net.POLLCONN |
|
||||
Net.POLLOUT,
|
||||
@ -372,7 +376,9 @@ class WindowsSelectorImpl extends SelectorImpl {
|
||||
*
|
||||
* me.updateCount <= updateCount
|
||||
*/
|
||||
private int processFDSet(long updateCount, int[] fds, int rOps,
|
||||
private int processFDSet(long updateCount,
|
||||
Consumer<SelectionKey> action,
|
||||
int[] fds, int rOps,
|
||||
boolean isExceptFds)
|
||||
{
|
||||
int numKeysUpdated = 0;
|
||||
@ -401,20 +407,10 @@ class WindowsSelectorImpl extends SelectorImpl {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (selectedKeys.contains(sk)) { // Key in selected set
|
||||
if (sk.translateAndUpdateReadyOps(rOps)) {
|
||||
if (me.updateCount != updateCount) {
|
||||
me.updateCount = updateCount;
|
||||
numKeysUpdated++;
|
||||
}
|
||||
}
|
||||
} else { // Key is not in selected set yet
|
||||
sk.translateAndSetReadyOps(rOps);
|
||||
if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
|
||||
selectedKeys.add(sk);
|
||||
me.updateCount = updateCount;
|
||||
numKeysUpdated++;
|
||||
}
|
||||
int updated = processReadyEvents(rOps, sk, action);
|
||||
if (updated > 0 && me.updateCount != updateCount) {
|
||||
me.updateCount = updateCount;
|
||||
numKeysUpdated++;
|
||||
}
|
||||
}
|
||||
return numKeysUpdated;
|
||||
@ -509,12 +505,12 @@ class WindowsSelectorImpl extends SelectorImpl {
|
||||
|
||||
// Update ops of the corresponding Channels. Add the ready keys to the
|
||||
// ready queue.
|
||||
private int updateSelectedKeys() {
|
||||
private int updateSelectedKeys(Consumer<SelectionKey> action) {
|
||||
updateCount++;
|
||||
int numKeysUpdated = 0;
|
||||
numKeysUpdated += subSelector.processSelectedKeys(updateCount);
|
||||
numKeysUpdated += subSelector.processSelectedKeys(updateCount, action);
|
||||
for (SelectThread t: threads) {
|
||||
numKeysUpdated += t.subSelector.processSelectedKeys(updateCount);
|
||||
numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, action);
|
||||
}
|
||||
return numKeysUpdated;
|
||||
}
|
||||
|
755
test/jdk/java/nio/channels/Selector/SelectWithConsumer.java
Normal file
755
test/jdk/java/nio/channels/Selector/SelectWithConsumer.java
Normal file
@ -0,0 +1,755 @@
|
||||
/*
|
||||
* Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||
* or visit www.oracle.com if you need additional information or have any
|
||||
* questions.
|
||||
*/
|
||||
|
||||
/* @test
|
||||
* @bug 8199433
|
||||
* @run testng SelectWithConsumer
|
||||
* @summary Unit test for Selector select(Consumer), select(Consumer,long) and
|
||||
* selectNow(Consumer)
|
||||
*/
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ClosedSelectorException;
|
||||
import java.nio.channels.Pipe;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.nio.channels.ServerSocketChannel;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.nio.channels.WritableByteChannel;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import static java.util.concurrent.TimeUnit.*;
|
||||
|
||||
import org.testng.annotations.AfterTest;
|
||||
import org.testng.annotations.Test;
|
||||
import static org.testng.Assert.*;
|
||||
|
||||
@Test
|
||||
public class SelectWithConsumer {
|
||||
|
||||
/**
|
||||
* Invoke the select methods that take an action and check that the
|
||||
* accumulated ready ops notified to the action matches the expected ops.
|
||||
*/
|
||||
void testActionInvoked(SelectionKey key, int expectedOps) throws Exception {
|
||||
var callerThread = Thread.currentThread();
|
||||
var sel = key.selector();
|
||||
var interestOps = key.interestOps();
|
||||
var notifiedOps = new AtomicInteger();
|
||||
|
||||
// select(Consumer)
|
||||
if (expectedOps == 0)
|
||||
sel.wakeup(); // ensure select does not block
|
||||
notifiedOps.set(0);
|
||||
int n = sel.select(k -> {
|
||||
assertTrue(Thread.currentThread() == callerThread);
|
||||
assertTrue(k == key);
|
||||
int readyOps = key.readyOps();
|
||||
assertTrue((readyOps & interestOps) != 0);
|
||||
assertTrue((readyOps & notifiedOps.get()) == 0);
|
||||
notifiedOps.set(notifiedOps.get() | readyOps);
|
||||
});
|
||||
assertTrue((n == 1) ^ (expectedOps == 0));
|
||||
assertTrue(notifiedOps.get() == expectedOps);
|
||||
|
||||
// select(Consumer, timeout)
|
||||
notifiedOps.set(0);
|
||||
n = sel.select(k -> {
|
||||
assertTrue(Thread.currentThread() == callerThread);
|
||||
assertTrue(k == key);
|
||||
int readyOps = key.readyOps();
|
||||
assertTrue((readyOps & interestOps) != 0);
|
||||
assertTrue((readyOps & notifiedOps.get()) == 0);
|
||||
notifiedOps.set(notifiedOps.get() | readyOps);
|
||||
}, 1000);
|
||||
assertTrue((n == 1) ^ (expectedOps == 0));
|
||||
assertTrue(notifiedOps.get() == expectedOps);
|
||||
|
||||
// selectNow(Consumer)
|
||||
notifiedOps.set(0);
|
||||
n = sel.selectNow(k -> {
|
||||
assertTrue(Thread.currentThread() == callerThread);
|
||||
assertTrue(k == key);
|
||||
int readyOps = key.readyOps();
|
||||
assertTrue((readyOps & interestOps) != 0);
|
||||
assertTrue((readyOps & notifiedOps.get()) == 0);
|
||||
notifiedOps.set(notifiedOps.get() | readyOps);
|
||||
});
|
||||
assertTrue((n == 1) ^ (expectedOps == 0));
|
||||
assertTrue(notifiedOps.get() == expectedOps);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that an action is performed when a channel is ready for reading.
|
||||
*/
|
||||
public void testReadable() throws Exception {
|
||||
Pipe p = Pipe.open();
|
||||
try (Selector sel = Selector.open()) {
|
||||
Pipe.SinkChannel sink = p.sink();
|
||||
Pipe.SourceChannel source = p.source();
|
||||
source.configureBlocking(false);
|
||||
SelectionKey key = source.register(sel, SelectionKey.OP_READ);
|
||||
|
||||
// write to sink to ensure source is readable
|
||||
scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
|
||||
|
||||
// test that action is invoked
|
||||
testActionInvoked(key, SelectionKey.OP_READ);
|
||||
} finally {
|
||||
closePipe(p);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that an action is performed when a channel is ready for writing.
|
||||
*/
|
||||
public void testWritable() throws Exception {
|
||||
Pipe p = Pipe.open();
|
||||
try (Selector sel = Selector.open()) {
|
||||
Pipe.SourceChannel source = p.source();
|
||||
Pipe.SinkChannel sink = p.sink();
|
||||
sink.configureBlocking(false);
|
||||
SelectionKey key = sink.register(sel, SelectionKey.OP_WRITE);
|
||||
|
||||
// test that action is invoked
|
||||
testActionInvoked(key, SelectionKey.OP_WRITE);
|
||||
} finally {
|
||||
closePipe(p);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that an action is performed when a channel is ready for both
|
||||
* reading and writing.
|
||||
*/
|
||||
public void testReadableAndWriteable() throws Exception {
|
||||
ServerSocketChannel ssc = null;
|
||||
SocketChannel sc = null;
|
||||
SocketChannel peer = null;
|
||||
try (Selector sel = Selector.open()) {
|
||||
ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0));
|
||||
sc = SocketChannel.open(ssc.getLocalAddress());
|
||||
sc.configureBlocking(false);
|
||||
SelectionKey key = sc.register(sel, (SelectionKey.OP_READ |
|
||||
SelectionKey.OP_WRITE));
|
||||
|
||||
// accept connection and write data so the source is readable
|
||||
peer = ssc.accept();
|
||||
peer.write(messageBuffer());
|
||||
|
||||
// test that action is invoked
|
||||
testActionInvoked(key, (SelectionKey.OP_READ | SelectionKey.OP_WRITE));
|
||||
} finally {
|
||||
if (ssc != null) ssc.close();
|
||||
if (sc != null) sc.close();
|
||||
if (peer != null) peer.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that the action is called for two selected channels
|
||||
*/
|
||||
public void testTwoChannels() throws Exception {
|
||||
Pipe p = Pipe.open();
|
||||
try (Selector sel = Selector.open()) {
|
||||
Pipe.SourceChannel source = p.source();
|
||||
Pipe.SinkChannel sink = p.sink();
|
||||
source.configureBlocking(false);
|
||||
sink.configureBlocking(false);
|
||||
SelectionKey key1 = source.register(sel, SelectionKey.OP_READ);
|
||||
SelectionKey key2 = sink.register(sel, SelectionKey.OP_WRITE);
|
||||
|
||||
// write to sink to ensure that the source is readable
|
||||
sink.write(messageBuffer());
|
||||
|
||||
var counter = new AtomicInteger();
|
||||
|
||||
// select(Consumer)
|
||||
counter.set(0);
|
||||
int n = sel.select(k -> {
|
||||
counter.incrementAndGet();
|
||||
if (k == key1) {
|
||||
assertTrue(k.isReadable());
|
||||
} else if (k == key2) {
|
||||
assertTrue(k.isWritable());
|
||||
} else {
|
||||
assertTrue(false);
|
||||
}
|
||||
});
|
||||
assertTrue(n == 2);
|
||||
assertTrue(counter.get() == 2);
|
||||
|
||||
// select(Consumer, timeout)
|
||||
counter.set(0);
|
||||
n = sel.select(k -> {
|
||||
counter.incrementAndGet();
|
||||
if (k == key1) {
|
||||
assertTrue(k.isReadable());
|
||||
} else if (k == key2) {
|
||||
assertTrue(k.isWritable());
|
||||
} else {
|
||||
assertTrue(false);
|
||||
}
|
||||
}, 1000);
|
||||
assertTrue(n == 2);
|
||||
assertTrue(counter.get() == 2);
|
||||
|
||||
// selectNow(Consumer)
|
||||
counter.set(0);
|
||||
n = sel.selectNow(k -> {
|
||||
counter.incrementAndGet();
|
||||
if (k == key1) {
|
||||
assertTrue(k.isReadable());
|
||||
} else if (k == key2) {
|
||||
assertTrue(k.isWritable());
|
||||
} else {
|
||||
assertTrue(false);
|
||||
}
|
||||
});
|
||||
assertTrue(n == 2);
|
||||
assertTrue(counter.get() == 2);
|
||||
} finally {
|
||||
closePipe(p);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test calling select twice, the action should be invoked each time
|
||||
*/
|
||||
public void testRepeatedSelect1() throws Exception {
|
||||
Pipe p = Pipe.open();
|
||||
try (Selector sel = Selector.open()) {
|
||||
Pipe.SourceChannel source = p.source();
|
||||
Pipe.SinkChannel sink = p.sink();
|
||||
source.configureBlocking(false);
|
||||
SelectionKey key = source.register(sel, SelectionKey.OP_READ);
|
||||
|
||||
// write to sink to ensure that the source is readable
|
||||
sink.write(messageBuffer());
|
||||
|
||||
// test that action is invoked
|
||||
testActionInvoked(key, SelectionKey.OP_READ);
|
||||
testActionInvoked(key, SelectionKey.OP_READ);
|
||||
|
||||
} finally {
|
||||
closePipe(p);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test calling select twice. An I/O operation is performed after the
|
||||
* first select so the channel will not be selected by the second select.
|
||||
*/
|
||||
public void testRepeatedSelect2() throws Exception {
|
||||
Pipe p = Pipe.open();
|
||||
try (Selector sel = Selector.open()) {
|
||||
Pipe.SourceChannel source = p.source();
|
||||
Pipe.SinkChannel sink = p.sink();
|
||||
source.configureBlocking(false);
|
||||
SelectionKey key = source.register(sel, SelectionKey.OP_READ);
|
||||
|
||||
// write to sink to ensure that the source is readable
|
||||
sink.write(messageBuffer());
|
||||
|
||||
// test that action is invoked
|
||||
testActionInvoked(key, SelectionKey.OP_READ);
|
||||
|
||||
// read all bytes
|
||||
int n;
|
||||
ByteBuffer bb = ByteBuffer.allocate(100);
|
||||
do {
|
||||
n = source.read(bb);
|
||||
bb.clear();
|
||||
} while (n > 0);
|
||||
|
||||
// test that action is not invoked
|
||||
testActionInvoked(key, 0);
|
||||
} finally {
|
||||
closePipe(p);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test timeout
|
||||
*/
|
||||
public void testTimeout() throws Exception {
|
||||
Pipe p = Pipe.open();
|
||||
try (Selector sel = Selector.open()) {
|
||||
Pipe.SourceChannel source = p.source();
|
||||
Pipe.SinkChannel sink = p.sink();
|
||||
source.configureBlocking(false);
|
||||
source.register(sel, SelectionKey.OP_READ);
|
||||
long start = System.currentTimeMillis();
|
||||
int n = sel.select(k -> assertTrue(false), 1000L);
|
||||
long duration = System.currentTimeMillis() - start;
|
||||
assertTrue(n == 0);
|
||||
assertTrue(duration > 500, "select took " + duration + " ms");
|
||||
} finally {
|
||||
closePipe(p);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test wakeup prior to select
|
||||
*/
|
||||
public void testWakeupBeforeSelect() throws Exception {
|
||||
// select(Consumer)
|
||||
try (Selector sel = Selector.open()) {
|
||||
sel.wakeup();
|
||||
int n = sel.select(k -> assertTrue(false));
|
||||
assertTrue(n == 0);
|
||||
}
|
||||
|
||||
// select(Consumer, timeout)
|
||||
try (Selector sel = Selector.open()) {
|
||||
sel.wakeup();
|
||||
long start = System.currentTimeMillis();
|
||||
int n = sel.select(k -> assertTrue(false), 60*1000);
|
||||
long duration = System.currentTimeMillis() - start;
|
||||
assertTrue(n == 0);
|
||||
assertTrue(duration < 5000, "select took " + duration + " ms");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test wakeup during select
|
||||
*/
|
||||
public void testWakeupDuringSelect() throws Exception {
|
||||
// select(Consumer)
|
||||
try (Selector sel = Selector.open()) {
|
||||
scheduleWakeup(sel, 1, SECONDS);
|
||||
int n = sel.select(k -> assertTrue(false));
|
||||
assertTrue(n == 0);
|
||||
}
|
||||
|
||||
// select(Consumer, timeout)
|
||||
try (Selector sel = Selector.open()) {
|
||||
scheduleWakeup(sel, 1, SECONDS);
|
||||
long start = System.currentTimeMillis();
|
||||
int n = sel.select(k -> assertTrue(false), 60*1000);
|
||||
long duration = System.currentTimeMillis() - start;
|
||||
assertTrue(n == 0);
|
||||
assertTrue(duration > 500 && duration < 10*1000,
|
||||
"select took " + duration + " ms");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test invoking select with interrupt status set
|
||||
*/
|
||||
public void testInterruptBeforeSelect() throws Exception {
|
||||
// select(Consumer)
|
||||
try (Selector sel = Selector.open()) {
|
||||
Thread.currentThread().interrupt();
|
||||
int n = sel.select(k -> assertTrue(false));
|
||||
assertTrue(n == 0);
|
||||
assertTrue(Thread.currentThread().isInterrupted());
|
||||
assertTrue(sel.isOpen());
|
||||
} finally {
|
||||
Thread.currentThread().interrupted(); // clear interrupt status
|
||||
}
|
||||
|
||||
// select(Consumer, timeout)
|
||||
try (Selector sel = Selector.open()) {
|
||||
Thread.currentThread().interrupt();
|
||||
long start = System.currentTimeMillis();
|
||||
int n = sel.select(k -> assertTrue(false), 60*1000);
|
||||
long duration = System.currentTimeMillis() - start;
|
||||
assertTrue(n == 0);
|
||||
assertTrue(duration < 5000, "select took " + duration + " ms");
|
||||
assertTrue(Thread.currentThread().isInterrupted());
|
||||
assertTrue(sel.isOpen());
|
||||
} finally {
|
||||
Thread.currentThread().interrupted(); // clear interrupt status
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test interrupt thread during select
|
||||
*/
|
||||
public void testInterruptDuringSelect() throws Exception {
|
||||
// select(Consumer)
|
||||
try (Selector sel = Selector.open()) {
|
||||
scheduleInterrupt(Thread.currentThread(), 1, SECONDS);
|
||||
int n = sel.select(k -> assertTrue(false));
|
||||
assertTrue(n == 0);
|
||||
assertTrue(Thread.currentThread().isInterrupted());
|
||||
assertTrue(sel.isOpen());
|
||||
} finally {
|
||||
Thread.currentThread().interrupted(); // clear interrupt status
|
||||
}
|
||||
|
||||
// select(Consumer, timeout)
|
||||
try (Selector sel = Selector.open()) {
|
||||
scheduleInterrupt(Thread.currentThread(), 1, SECONDS);
|
||||
long start = System.currentTimeMillis();
|
||||
int n = sel.select(k -> assertTrue(false), 60*1000);
|
||||
long duration = System.currentTimeMillis() - start;
|
||||
assertTrue(n == 0);
|
||||
assertTrue(duration > 500 && duration < 5000,
|
||||
"select took " + duration + " ms");
|
||||
assertTrue(Thread.currentThread().isInterrupted());
|
||||
assertTrue(sel.isOpen());
|
||||
} finally {
|
||||
Thread.currentThread().interrupted(); // clear interrupt status
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test invoking select on a closed selector
|
||||
*/
|
||||
@Test(expectedExceptions = ClosedSelectorException.class)
|
||||
public void testClosedSelector1() throws Exception {
|
||||
Selector sel = Selector.open();
|
||||
sel.close();
|
||||
sel.select(k -> assertTrue(false));
|
||||
}
|
||||
@Test(expectedExceptions = ClosedSelectorException.class)
|
||||
public void testClosedSelector2() throws Exception {
|
||||
Selector sel = Selector.open();
|
||||
sel.close();
|
||||
sel.select(k -> assertTrue(false), 1000);
|
||||
}
|
||||
@Test(expectedExceptions = ClosedSelectorException.class)
|
||||
public void testClosedSelector3() throws Exception {
|
||||
Selector sel = Selector.open();
|
||||
sel.close();
|
||||
sel.selectNow(k -> assertTrue(false));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test closing selector while in a selection operation
|
||||
*/
|
||||
public void testCloseDuringSelect() throws Exception {
|
||||
// select(Consumer)
|
||||
try (Selector sel = Selector.open()) {
|
||||
scheduleClose(sel, 3, SECONDS);
|
||||
int n = sel.select(k -> assertTrue(false));
|
||||
assertTrue(n == 0);
|
||||
assertFalse(sel.isOpen());
|
||||
}
|
||||
|
||||
// select(Consumer, timeout)
|
||||
try (Selector sel = Selector.open()) {
|
||||
scheduleClose(sel, 3, SECONDS);
|
||||
long start = System.currentTimeMillis();
|
||||
int n = sel.select(k -> assertTrue(false), 60*1000);
|
||||
long duration = System.currentTimeMillis() - start;
|
||||
assertTrue(n == 0);
|
||||
assertTrue(duration > 2000 && duration < 10*1000,
|
||||
"select took " + duration + " ms");
|
||||
assertFalse(sel.isOpen());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test action closing selector
|
||||
*/
|
||||
@Test(expectedExceptions = ClosedSelectorException.class)
|
||||
public void testActionClosingSelector() throws Exception {
|
||||
Pipe p = Pipe.open();
|
||||
try (Selector sel = Selector.open()) {
|
||||
Pipe.SourceChannel source = p.source();
|
||||
Pipe.SinkChannel sink = p.sink();
|
||||
source.configureBlocking(false);
|
||||
SelectionKey key = source.register(sel, SelectionKey.OP_READ);
|
||||
|
||||
// write to sink to ensure that the source is readable
|
||||
sink.write(messageBuffer());
|
||||
|
||||
// should relay ClosedSelectorException
|
||||
sel.select(k -> {
|
||||
assertTrue(k == key);
|
||||
try {
|
||||
sel.close();
|
||||
} catch (IOException ioe) { }
|
||||
});
|
||||
} finally {
|
||||
closePipe(p);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that the action is invoked while synchronized on the selector and
|
||||
* its selected-key set.
|
||||
*/
|
||||
public void testLocks() throws Exception {
|
||||
Pipe p = Pipe.open();
|
||||
try (Selector sel = Selector.open()) {
|
||||
Pipe.SourceChannel source = p.source();
|
||||
Pipe.SinkChannel sink = p.sink();
|
||||
source.configureBlocking(false);
|
||||
SelectionKey key = source.register(sel, SelectionKey.OP_READ);
|
||||
|
||||
// write to sink to ensure that the source is readable
|
||||
sink.write(messageBuffer());
|
||||
|
||||
// select(Consumer)
|
||||
sel.select(k -> {
|
||||
assertTrue(k == key);
|
||||
assertTrue(Thread.holdsLock(sel));
|
||||
assertFalse(Thread.holdsLock(sel.keys()));
|
||||
assertTrue(Thread.holdsLock(sel.selectedKeys()));
|
||||
});
|
||||
|
||||
// select(Consumer, timeout)
|
||||
sel.select(k -> {
|
||||
assertTrue(k == key);
|
||||
assertTrue(Thread.holdsLock(sel));
|
||||
assertFalse(Thread.holdsLock(sel.keys()));
|
||||
assertTrue(Thread.holdsLock(sel.selectedKeys()));
|
||||
}, 1000L);
|
||||
|
||||
// selectNow(Consumer)
|
||||
sel.selectNow(k -> {
|
||||
assertTrue(k == key);
|
||||
assertTrue(Thread.holdsLock(sel));
|
||||
assertFalse(Thread.holdsLock(sel.keys()));
|
||||
assertTrue(Thread.holdsLock(sel.selectedKeys()));
|
||||
});
|
||||
} finally {
|
||||
closePipe(p);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that selection operations remove cancelled keys from the selector's
|
||||
* key and selected-key sets.
|
||||
*/
|
||||
public void testCancel() throws Exception {
|
||||
Pipe p = Pipe.open();
|
||||
try (Selector sel = Selector.open()) {
|
||||
Pipe.SinkChannel sink = p.sink();
|
||||
Pipe.SourceChannel source = p.source();
|
||||
|
||||
// write to sink to ensure that the source is readable
|
||||
sink.write(messageBuffer());
|
||||
|
||||
sink.configureBlocking(false);
|
||||
source.configureBlocking(false);
|
||||
SelectionKey key1 = sink.register(sel, SelectionKey.OP_WRITE);
|
||||
SelectionKey key2 = source.register(sel, SelectionKey.OP_READ);
|
||||
|
||||
sel.selectNow();
|
||||
assertTrue(sel.keys().contains(key1));
|
||||
assertTrue(sel.keys().contains(key2));
|
||||
assertTrue(sel.selectedKeys().contains(key1));
|
||||
assertTrue(sel.selectedKeys().contains(key2));
|
||||
|
||||
// cancel key1
|
||||
key1.cancel();
|
||||
int n = sel.selectNow(k -> assertTrue(k == key2));
|
||||
assertTrue(n == 1);
|
||||
assertFalse(sel.keys().contains(key1));
|
||||
assertTrue(sel.keys().contains(key2));
|
||||
assertFalse(sel.selectedKeys().contains(key1));
|
||||
assertTrue(sel.selectedKeys().contains(key2));
|
||||
|
||||
// cancel key2
|
||||
key2.cancel();
|
||||
n = sel.selectNow(k -> assertTrue(false));
|
||||
assertTrue(n == 0);
|
||||
assertFalse(sel.keys().contains(key1));
|
||||
assertFalse(sel.keys().contains(key2));
|
||||
assertFalse(sel.selectedKeys().contains(key1));
|
||||
assertFalse(sel.selectedKeys().contains(key2));
|
||||
} finally {
|
||||
closePipe(p);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test an action invoking select()
|
||||
*/
|
||||
public void testReentrantSelect1() throws Exception {
|
||||
Pipe p = Pipe.open();
|
||||
try (Selector sel = Selector.open()) {
|
||||
Pipe.SinkChannel sink = p.sink();
|
||||
Pipe.SourceChannel source = p.source();
|
||||
source.configureBlocking(false);
|
||||
source.register(sel, SelectionKey.OP_READ);
|
||||
|
||||
// write to sink to ensure that the source is readable
|
||||
scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
|
||||
|
||||
int n = sel.select(k -> {
|
||||
try {
|
||||
sel.select();
|
||||
assertTrue(false);
|
||||
} catch (IOException ioe) {
|
||||
throw new RuntimeException(ioe);
|
||||
} catch (IllegalStateException expected) {
|
||||
}
|
||||
});
|
||||
assertTrue(n == 1);
|
||||
} finally {
|
||||
closePipe(p);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test an action invoking selectNow()
|
||||
*/
|
||||
public void testReentrantSelect2() throws Exception {
|
||||
Pipe p = Pipe.open();
|
||||
try (Selector sel = Selector.open()) {
|
||||
Pipe.SinkChannel sink = p.sink();
|
||||
Pipe.SourceChannel source = p.source();
|
||||
|
||||
// write to sink to ensure that the source is readable
|
||||
scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
|
||||
|
||||
source.configureBlocking(false);
|
||||
source.register(sel, SelectionKey.OP_READ);
|
||||
int n = sel.select(k -> {
|
||||
try {
|
||||
sel.selectNow();
|
||||
assertTrue(false);
|
||||
} catch (IOException ioe) {
|
||||
throw new RuntimeException(ioe);
|
||||
} catch (IllegalStateException expected) {
|
||||
}
|
||||
});
|
||||
assertTrue(n == 1);
|
||||
} finally {
|
||||
closePipe(p);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test an action invoking select(Consumer)
|
||||
*/
|
||||
public void testReentrantSelect3() throws Exception {
|
||||
Pipe p = Pipe.open();
|
||||
try (Selector sel = Selector.open()) {
|
||||
Pipe.SinkChannel sink = p.sink();
|
||||
Pipe.SourceChannel source = p.source();
|
||||
|
||||
// write to sink to ensure that the source is readable
|
||||
scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
|
||||
|
||||
source.configureBlocking(false);
|
||||
source.register(sel, SelectionKey.OP_READ);
|
||||
int n = sel.select(k -> {
|
||||
try {
|
||||
sel.select(x -> assertTrue(false));
|
||||
assertTrue(false);
|
||||
} catch (IOException ioe) {
|
||||
throw new RuntimeException(ioe);
|
||||
} catch (IllegalStateException expected) {
|
||||
}
|
||||
});
|
||||
assertTrue(n == 1);
|
||||
} finally {
|
||||
closePipe(p);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Negative timeout
|
||||
*/
|
||||
@Test(expectedExceptions = IllegalArgumentException.class)
|
||||
public void testNegativeTimeout() throws Exception {
|
||||
try (Selector sel = Selector.open()) {
|
||||
sel.select(k -> { }, -1L);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Null action
|
||||
*/
|
||||
@Test(expectedExceptions = NullPointerException.class)
|
||||
public void testNull1() throws Exception {
|
||||
try (Selector sel = Selector.open()) {
|
||||
sel.select(null);
|
||||
}
|
||||
}
|
||||
@Test(expectedExceptions = NullPointerException.class)
|
||||
public void testNull2() throws Exception {
|
||||
try (Selector sel = Selector.open()) {
|
||||
sel.select(null, 1000);
|
||||
}
|
||||
}
|
||||
@Test(expectedExceptions = NullPointerException.class)
|
||||
public void testNull3() throws Exception {
|
||||
try (Selector sel = Selector.open()) {
|
||||
sel.selectNow(null);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// -- support methods ---
|
||||
|
||||
private final ScheduledExecutorService POOL = Executors.newScheduledThreadPool(1);
|
||||
|
||||
@AfterTest
|
||||
void shutdownThreadPool() {
|
||||
POOL.shutdown();
|
||||
}
|
||||
|
||||
void scheduleWakeup(Selector sel, long delay, TimeUnit unit) {
|
||||
POOL.schedule(() -> sel.wakeup(), delay, unit);
|
||||
}
|
||||
|
||||
void scheduleInterrupt(Thread t, long delay, TimeUnit unit) {
|
||||
POOL.schedule(() -> t.interrupt(), delay, unit);
|
||||
}
|
||||
|
||||
void scheduleClose(Closeable c, long delay, TimeUnit unit) {
|
||||
POOL.schedule(() -> {
|
||||
try {
|
||||
c.close();
|
||||
} catch (IOException ioe) {
|
||||
ioe.printStackTrace();
|
||||
}
|
||||
}, delay, unit);
|
||||
}
|
||||
|
||||
void scheduleWrite(WritableByteChannel sink, ByteBuffer buf, long delay, TimeUnit unit) {
|
||||
POOL.schedule(() -> {
|
||||
try {
|
||||
sink.write(buf);
|
||||
} catch (IOException ioe) {
|
||||
ioe.printStackTrace();
|
||||
}
|
||||
}, delay, unit);
|
||||
}
|
||||
|
||||
static void closePipe(Pipe p) {
|
||||
try { p.sink().close(); } catch (IOException ignore) { }
|
||||
try { p.source().close(); } catch (IOException ignore) { }
|
||||
}
|
||||
|
||||
static ByteBuffer messageBuffer() {
|
||||
try {
|
||||
return ByteBuffer.wrap("message".getBytes("UTF-8"));
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user