8201315: (se) Allow SelectableChannel.register to be invoked while selection operation is in progress

Reviewed-by: bpb
This commit is contained in:
Alan Bateman 2018-04-18 10:20:09 +01:00
parent 76590dc009
commit 33b921f25d
6 changed files with 244 additions and 142 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2000, 2013, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -117,12 +117,6 @@ public abstract class SelectableChannel
*/
public abstract int validOps();
// Internal state:
// keySet, may be empty but is never null, typ. a tiny array
// boolean isRegistered, protected by key set
// regLock, lock object to prevent duplicate registrations
// blocking mode, protected by regLock
/**
* Tells whether or not this channel is currently registered with any
* selectors. A newly-created channel is not registered.
@ -135,8 +129,6 @@ public abstract class SelectableChannel
* @return {@code true} if, and only if, this channel is registered
*/
public abstract boolean isRegistered();
//
// sync(keySet) { return isRegistered; }
/**
* Retrieves the key representing the channel's registration with the given
@ -150,8 +142,6 @@ public abstract class SelectableChannel
* currently registered with that selector
*/
public abstract SelectionKey keyFor(Selector sel);
//
// sync(keySet) { return findKey(sel); }
/**
* Registers this channel with the given selector, returning a selection
@ -171,12 +161,12 @@ public abstract class SelectableChannel
* will be {@code att}.
*
* <p> This method may be invoked at any time. If this method is invoked
* while another invocation of this method or of the {@link
* #configureBlocking(boolean) configureBlocking} method is in progress
* then it will first block until the other operation is complete. This
* method will then synchronize on the selector's key set and therefore may
* block if invoked concurrently with another registration or selection
* operation involving the same selector. </p>
* while a selection operation is in progress then it has no effect upon
* that operation; the new registration or change to the key's interest set
* will be seen by the next selection operation. If this method is invoked
* while an invocation of {@link #configureBlocking(boolean) configureBlocking}
* is in progress then it will block until the channel's blocking mode has
* been adjusted.
*
* <p> If this channel is closed while this operation is in progress then
* the key returned by this method will have been cancelled and will
@ -218,16 +208,6 @@ public abstract class SelectableChannel
*/
public abstract SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException;
//
// sync(regLock) {
// sync(keySet) { look for selector }
// if (channel found) { set interest ops -- may block in selector;
// return key; }
// create new key -- may block somewhere in selector;
// sync(keySet) { add key; }
// attach(attachment);
// return key;
// }
/**
* Registers this channel with the given selector, returning a selection
@ -314,11 +294,6 @@ public abstract class SelectableChannel
*/
public abstract SelectableChannel configureBlocking(boolean block)
throws IOException;
//
// sync(regLock) {
// sync(keySet) { throw IBME if block && isRegistered; }
// change mode;
// }
/**
* Tells whether or not every I/O operation on this channel will block

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2000, 2013, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -86,15 +86,9 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
* attached via the {@link #attach attach} method and then later retrieved via
* the {@link #attachment() attachment} method.
*
* <p> Selection keys are safe for use by multiple concurrent threads. The
* operations of reading and writing the interest set will, in general, be
* synchronized with certain operations of the selector. Exactly how this
* synchronization is performed is implementation-dependent: In a naive
* implementation, reading or writing the interest set may block indefinitely
* if a selection operation is already in progress; in a high-performance
* implementation, reading or writing the interest set may block briefly, if at
* all. In any case, a selection operation will always use the interest-set
* value that was current at the moment that the operation began. </p>
* <p> Selection keys are safe for use by multiple concurrent threads. A
* selection operation will always use the interest-set value that was current
* at the moment that the operation began. </p>
*
*
* @author Mark Reinhold
@ -164,10 +158,7 @@ public abstract class SelectionKey {
* Retrieves this key's interest set.
*
* <p> It is guaranteed that the returned set will only contain operation
* bits that are valid for this key's channel.
*
* <p> This method may be invoked at any time. Whether or not it blocks,
* and for how long, is implementation-dependent. </p>
* bits that are valid for this key's channel. </p>
*
* @return This key's interest set
*
@ -179,8 +170,10 @@ public abstract class SelectionKey {
/**
* Sets this key's interest set to the given value.
*
* <p> This method may be invoked at any time. Whether or not it blocks,
* and for how long, is implementation-dependent. </p>
* <p> This method may be invoked at any time. If this method is invoked
* while a selection operation is in progress then it has no effect upon
* that operation; the change to the key's interest set will be seen by the
* next selection operation.
*
* @param ops The new interest set
*

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2000, 2013, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -84,11 +84,10 @@ import java.util.Set;
* operations. A key may be removed directly from the selected-key set by
* invoking the set's {@link java.util.Set#remove(java.lang.Object) remove}
* method or by invoking the {@link java.util.Iterator#remove() remove} method
* of an {@link java.util.Iterator iterator} obtained from the
* set. Keys are never removed from the selected-key set in any other way;
* they are not, in particular, removed as a side effect of selection
* operations. Keys may not be added directly to the selected-key set. </p>
*
* of an {@link java.util.Iterator iterator} obtained from the set.
* All keys may be removed from the selected-key set by invoking the set's
* {@link java.util.Set#clear() clear} method. Keys may not be added directly
* to the selected-key set. </p>
*
* <a id="selop"></a>
* <h2>Selection</h2>
@ -144,12 +143,12 @@ import java.util.Set;
*
* <h2>Concurrency</h2>
*
* <p> Selectors are themselves safe for use by multiple concurrent threads;
* their key sets, however, are not.
* <p> A Selector and its key set are safe for use by multiple concurrent
* threads. Its selected-key set and cancelled-key set, however, are not.
*
* <p> The selection operations synchronize on the selector itself, on the key
* set, and on the selected-key set, in that order. They also synchronize on
* the cancelled-key set during steps (1) and (3) above.
* <p> The selection operations synchronize on the selector itself, on the
* selected-key set, in that order. They also synchronize on the cancelled-key
* set during steps (1) and (3) above.
*
* <p> Changes made to the interest sets of a selector's keys while a
* selection operation is in progress have no effect upon that operation; they
@ -180,20 +179,27 @@ import java.util.Set;
*
* </ul>
*
* <p> The {@link #close close} method synchronizes on the selector and all
* three key sets in the same order as in a selection operation.
* <p> The {@link #close close} method synchronizes on the selector and its
* selected-key set in the same order as in a selection operation.
*
* <a id="ksc"></a>
* <p> A Selector's key set is safe for use by multiple concurrent threads.
* Retrieval operations from the key set do not generally block and so may
* overlap with new registrations that add to the set, or with the cancellation
* steps of selection operations that remove keys from the set. Iterators and
* spliterators return elements reflecting the state of the set at some point at
* or since the creation of the iterator/spliterator. They do not throw
* {@link java.util.ConcurrentModificationException ConcurrentModificationException}.
*
* <p> A selector's key and selected-key sets are not, in general, safe for use
* by multiple concurrent threads. If such a thread might modify one of these
* sets directly then access should be controlled by synchronizing on the set
* itself. The iterators returned by these sets' {@link
* java.util.Set#iterator() iterator} methods are <i>fail-fast:</i> If the set
* is modified after the iterator is created, in any way except by invoking the
* iterator's own {@link java.util.Iterator#remove() remove} method, then a
* {@link java.util.ConcurrentModificationException} will be thrown. </p>
*
* <a id="sksc"></a>
* <p> A selector's selected-key set is not, in general, safe for use by
* multiple concurrent threads. If such a thread might modify the set directly
* then access should be controlled by synchronizing on the set itself. The
* iterators returned by the set's {@link java.util.Set#iterator() iterator}
* methods are <i>fail-fast:</i> If the set is modified after the iterator is
* created, in any way except by invoking the iterator's own {@link
* java.util.Iterator#remove() remove} method, then a {@link
* java.util.ConcurrentModificationException} will be thrown. </p>
*
* @author Mark Reinhold
* @author JSR-51 Expert Group
@ -249,7 +255,8 @@ public abstract class Selector implements Closeable {
* attempt to modify the key set will cause an {@link
* UnsupportedOperationException} to be thrown.
*
* <p> The key set is <a href="#ksc">not thread-safe</a>. </p>
* <p> The set is <a href="#ksc">safe</a> for use by multiple concurrent
* threads. </p>
*
* @return This selector's key set
*
@ -265,7 +272,7 @@ public abstract class Selector implements Closeable {
* selected-key set. Any attempt to add an object to the key set will
* cause an {@link UnsupportedOperationException} to be thrown.
*
* <p> The selected-key set is <a href="#ksc">not thread-safe</a>. </p>
* <p> The selected-key set is <a href="#sksc">not thread-safe</a>. </p>
*
* @return This selector's selected-key set
*
@ -326,8 +333,7 @@ public abstract class Selector implements Closeable {
* @throws IllegalArgumentException
* If the value of the timeout argument is negative
*/
public abstract int select(long timeout)
throws IOException;
public abstract int select(long timeout) throws IOException;
/**
* Selects a set of keys whose corresponding channels are ready for I/O

View File

@ -108,7 +108,7 @@ public abstract class AbstractSelectableChannel
if (keys[i] == null)
break;
} else if (keys == null) {
keys = new SelectionKey[3];
keys = new SelectionKey[2];
} else {
// Grow key array
int n = keys.length * 2;
@ -123,14 +123,14 @@ public abstract class AbstractSelectableChannel
}
private SelectionKey findKey(Selector sel) {
synchronized (keyLock) {
if (keys == null)
return null;
for (int i = 0; i < keys.length; i++)
if ((keys[i] != null) && (keys[i].selector() == sel))
return keys[i];
assert Thread.holdsLock(keyLock);
if (keys == null)
return null;
}
for (int i = 0; i < keys.length; i++)
if ((keys[i] != null) && (keys[i].selector() == sel))
return keys[i];
return null;
}
void removeKey(SelectionKey k) { // package-private
@ -166,7 +166,9 @@ public abstract class AbstractSelectableChannel
}
public final SelectionKey keyFor(Selector sel) {
return findKey(sel);
synchronized (keyLock) {
return findKey(sel);
}
}
/**
@ -195,32 +197,31 @@ public abstract class AbstractSelectableChannel
*
* @throws IllegalArgumentException {@inheritDoc}
*/
public final SelectionKey register(Selector sel, int ops,
Object att)
public final SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException
{
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (!isOpen())
throw new ClosedChannelException();
synchronized (regLock) {
if (!isOpen())
throw new ClosedChannelException();
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (isBlocking())
throw new IllegalBlockingModeException();
SelectionKey k = findKey(sel);
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
synchronized (keyLock) {
// re-check if channel has been closed
if (!isOpen())
throw new ClosedChannelException();
SelectionKey k = findKey(sel);
if (k != null) {
k.attach(att);
k.interestOps(ops);
} else {
// New registration
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
return k;
}
return k;
}
}
@ -239,12 +240,20 @@ public abstract class AbstractSelectableChannel
*/
protected final void implCloseChannel() throws IOException {
implCloseSelectableChannel();
// clone keys to avoid calling cancel when holding keyLock
SelectionKey[] copyOfKeys = null;
synchronized (keyLock) {
int count = (keys == null) ? 0 : keys.length;
for (int i = 0; i < count; i++) {
SelectionKey k = keys[i];
if (k != null)
k.cancel();
if (keys != null) {
copyOfKeys = keys.clone();
}
}
if (copyOfKeys != null) {
for (SelectionKey k : copyOfKeys) {
if (k != null) {
k.cancel(); // invalidate and adds key to cancelledKey set
}
}
}
}

View File

@ -37,17 +37,18 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* Base Selector implementation class.
*/
public abstract class SelectorImpl
abstract class SelectorImpl
extends AbstractSelector
{
// The set of keys registered with this Selector
protected final Set<SelectionKey> keys;
private final Set<SelectionKey> keys;
// The set of keys with data ready for an operation
protected final Set<SelectionKey> selectedKeys;
@ -58,7 +59,7 @@ public abstract class SelectorImpl
protected SelectorImpl(SelectorProvider sp) {
super(sp);
keys = new HashSet<>();
keys = ConcurrentHashMap.newKeySet();
selectedKeys = new HashSet<>();
publicKeys = Collections.unmodifiableSet(keys);
publicSelectedKeys = Util.ungrowableSet(selectedKeys);
@ -82,11 +83,8 @@ public abstract class SelectorImpl
}
/**
* Returns the public view of the key sets
* Returns the public view of the selected-key set
*/
protected final Set<SelectionKey> nioKeys() {
return publicKeys;
}
protected final Set<SelectionKey> nioSelectedKeys() {
return publicSelectedKeys;
}
@ -116,18 +114,14 @@ public abstract class SelectorImpl
private int lockAndDoSelect(long timeout) throws IOException {
synchronized (this) {
ensureOpen();
synchronized (publicKeys) {
synchronized (publicSelectedKeys) {
return doSelect(timeout);
}
synchronized (publicSelectedKeys) {
return doSelect(timeout);
}
}
}
@Override
public final int select(long timeout)
throws IOException
{
public final int select(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect((timeout == 0) ? -1 : timeout);
@ -135,7 +129,7 @@ public abstract class SelectorImpl
@Override
public final int select() throws IOException {
return select(0);
return lockAndDoSelect(-1);
}
@Override
@ -143,32 +137,33 @@ public abstract class SelectorImpl
return lockAndDoSelect(0);
}
/**
* Invoked by implCloseSelector to close the selector.
*/
protected abstract void implClose() throws IOException;
@Override
public final void implCloseSelector() throws IOException {
wakeup();
synchronized (this) {
implClose();
synchronized (publicKeys) {
synchronized (publicSelectedKeys) {
// Deregister channels
Iterator<SelectionKey> i = keys.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
deregister(ski);
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
selectedKeys.remove(ski);
i.remove();
}
assert selectedKeys.isEmpty() && keys.isEmpty();
synchronized (publicSelectedKeys) {
// Deregister channels
Iterator<SelectionKey> i = keys.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
deregister(ski);
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
selectedKeys.remove(ski);
i.remove();
}
assert selectedKeys.isEmpty() && keys.isEmpty();
}
}
}
protected abstract void implClose() throws IOException;
@Override
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
@ -179,12 +174,21 @@ public abstract class SelectorImpl
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
// register with selector (if needed) before adding to key set
// register (if needed) before adding to key set
implRegister(k);
synchronized (publicKeys) {
keys.add(k);
// add to the selector's key set, removing it immediately if the selector
// is closed. The key is not in the channel's key set at this point but
// it may be observed by a thread iterating over the selector's key set.
keys.add(k);
try {
k.interestOps(ops);
} catch (ClosedSelectorException e) {
assert ch.keyFor(this) == null;
keys.remove(k);
k.cancel();
throw e;
}
k.interestOps(ops);
return k;
}
@ -198,11 +202,16 @@ public abstract class SelectorImpl
ensureOpen();
}
/**
* Removes the key from the selector
*/
protected abstract void implDereg(SelectionKeyImpl ski) throws IOException;
/**
* Invoked by selection operations to process the cancelled-key set
*/
protected final void processDeregisterQueue() throws IOException {
assert Thread.holdsLock(this);
assert Thread.holdsLock(publicKeys);
assert Thread.holdsLock(publicSelectedKeys);
Set<SelectionKey> cks = cancelledKeys();
@ -231,7 +240,8 @@ public abstract class SelectorImpl
}
/**
* Change the event set in the selector
* Invoked by interestOps to ensure the interest ops are updated at the
* next selection operation.
*/
protected abstract void setEventOps(SelectionKeyImpl ski);
}

View File

@ -0,0 +1,109 @@
/*
* Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/* @test
* @bug 8201315
* @summary Test that channels can be registered, interest ops can changed,
* and keys cancelled while a selection operation is in progress.
*/
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
public class RegisterDuringSelect {
static Callable<Void> selectLoop(Selector sel, Phaser barrier) {
return new Callable<Void>() {
@Override
public Void call() throws IOException {
for (;;) {
sel.select();
if (sel.isOpen()) {
barrier.arriveAndAwaitAdvance();
} else {
// closed
return null;
}
}
}
};
}
/**
* Invoke register, interestOps, and cancel concurrently with a thread
* doing a selection operation
*/
public static void main(String args[]) throws Exception {
Future<Void> result;
ExecutorService pool = Executors.newFixedThreadPool(1);
try (Selector sel = Selector.open()) {
Phaser barrier = new Phaser(2);
// submit task to do the select loop
result = pool.submit(selectLoop(sel, barrier));
Pipe p = Pipe.open();
try {
Pipe.SourceChannel sc = p.source();
sc.configureBlocking(false);
System.out.println("register ...");
SelectionKey key = sc.register(sel, SelectionKey.OP_READ);
if (!sel.keys().contains(key))
throw new RuntimeException("key not in key set");
sel.wakeup();
barrier.arriveAndAwaitAdvance();
System.out.println("interestOps ...");
key.interestOps(0);
sel.wakeup();
barrier.arriveAndAwaitAdvance();
System.out.println("cancel ...");
key.cancel();
sel.wakeup();
barrier.arriveAndAwaitAdvance();
if (sel.keys().contains(key))
throw new RuntimeException("key not removed from key set");
} finally {
p.source().close();
p.sink().close();
}
} finally {
pool.shutdown();
}
// ensure selectLoop completes without exception
result.get();
}
}