8009751: (se) Selector spin when select, close and interestOps(0) invoked at same time (lnx)
Reviewed-by: zhouyx, chegar, robm
This commit is contained in:
parent
b8b487dd5f
commit
0bcb468549
@ -26,9 +26,9 @@
|
||||
package sun.nio.ch;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.BitSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Manipulates a native array of epoll_event structs on Linux:
|
||||
@ -52,37 +52,78 @@ import java.util.Iterator;
|
||||
* this implementation we set data.fd to be the file descriptor that we
|
||||
* register. That way, we have the file descriptor available when we
|
||||
* process the events.
|
||||
*
|
||||
* All file descriptors registered with epoll have the POLLHUP and POLLERR
|
||||
* events enabled even when registered with an event set of 0. To ensure
|
||||
* that epoll_wait doesn't poll an idle file descriptor when the underlying
|
||||
* connection is closed or reset then its registration is deleted from
|
||||
* epoll (it will be re-added again if the event set is changed)
|
||||
*/
|
||||
|
||||
class EPollArrayWrapper {
|
||||
// EPOLL_EVENTS
|
||||
static final int EPOLLIN = 0x001;
|
||||
private static final int EPOLLIN = 0x001;
|
||||
|
||||
// opcodes
|
||||
static final int EPOLL_CTL_ADD = 1;
|
||||
static final int EPOLL_CTL_DEL = 2;
|
||||
static final int EPOLL_CTL_MOD = 3;
|
||||
private static final int EPOLL_CTL_ADD = 1;
|
||||
private static final int EPOLL_CTL_DEL = 2;
|
||||
private static final int EPOLL_CTL_MOD = 3;
|
||||
|
||||
// Miscellaneous constants
|
||||
static final int SIZE_EPOLLEVENT = sizeofEPollEvent();
|
||||
static final int EVENT_OFFSET = 0;
|
||||
static final int DATA_OFFSET = offsetofData();
|
||||
static final int FD_OFFSET = DATA_OFFSET;
|
||||
static final int NUM_EPOLLEVENTS = Math.min(IOUtil.fdLimit(), 8192);
|
||||
private static final int SIZE_EPOLLEVENT = sizeofEPollEvent();
|
||||
private static final int EVENT_OFFSET = 0;
|
||||
private static final int DATA_OFFSET = offsetofData();
|
||||
private static final int FD_OFFSET = DATA_OFFSET;
|
||||
private static final int OPEN_MAX = IOUtil.fdLimit();
|
||||
private static final int NUM_EPOLLEVENTS = Math.min(OPEN_MAX, 8192);
|
||||
|
||||
// Base address of the native pollArray
|
||||
// Special value to indicate that an update should be ignored
|
||||
private static final byte KILLED = (byte)-1;
|
||||
|
||||
// Initial size of arrays for fd registration changes
|
||||
private static final int INITIAL_PENDING_UPDATE_SIZE = 64;
|
||||
|
||||
// maximum size of updatesLow
|
||||
private static final int MAX_UPDATE_ARRAY_SIZE = Math.min(OPEN_MAX, 64*1024);
|
||||
|
||||
|
||||
// The fd of the epoll driver
|
||||
private final int epfd;
|
||||
|
||||
// The epoll_event array for results from epoll_wait
|
||||
private final AllocatedNativeObject pollArray;
|
||||
|
||||
// Base address of the epoll_event array
|
||||
private final long pollArrayAddress;
|
||||
|
||||
// Set of "idle" channels
|
||||
private final HashSet<SelChImpl> idleSet;
|
||||
// The fd of the interrupt line going out
|
||||
private int outgoingInterruptFD;
|
||||
|
||||
EPollArrayWrapper() {
|
||||
// The fd of the interrupt line coming in
|
||||
private int incomingInterruptFD;
|
||||
|
||||
// The index of the interrupt FD
|
||||
private int interruptedIndex;
|
||||
|
||||
// Number of updated pollfd entries
|
||||
int updated;
|
||||
|
||||
// object to synchronize fd registration changes
|
||||
private final Object updateLock = new Object();
|
||||
|
||||
// number of file descriptors with registration changes pending
|
||||
private int updateCount;
|
||||
|
||||
// file descriptors with registration changes pending
|
||||
private int[] updateDescriptors = new int[INITIAL_PENDING_UPDATE_SIZE];
|
||||
|
||||
// events for file descriptors with registration changes pending, indexed
|
||||
// by file descriptor and stored as bytes for efficiency reasons. For
|
||||
// file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at
|
||||
// least) then the update is stored in a map.
|
||||
private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
|
||||
private Map<Integer,Byte> eventsHigh;
|
||||
|
||||
// Used by release and updateRegistrations to track whether a file
|
||||
// descriptor is registered with epoll.
|
||||
private final BitSet registered = new BitSet();
|
||||
|
||||
|
||||
EPollArrayWrapper() throws IOException {
|
||||
// creates the epoll file descriptor
|
||||
epfd = epollCreate();
|
||||
|
||||
@ -91,50 +132,11 @@ class EPollArrayWrapper {
|
||||
pollArray = new AllocatedNativeObject(allocationSize, true);
|
||||
pollArrayAddress = pollArray.address();
|
||||
|
||||
for (int i=0; i<NUM_EPOLLEVENTS; i++) {
|
||||
putEventOps(i, 0);
|
||||
putData(i, 0L);
|
||||
}
|
||||
|
||||
// create idle set
|
||||
idleSet = new HashSet<SelChImpl>();
|
||||
// eventHigh needed when using file descriptors > 64k
|
||||
if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
|
||||
eventsHigh = new HashMap<>();
|
||||
}
|
||||
|
||||
// Used to update file description registrations
|
||||
private static class Updator {
|
||||
SelChImpl channel;
|
||||
int opcode;
|
||||
int events;
|
||||
Updator(SelChImpl channel, int opcode, int events) {
|
||||
this.channel = channel;
|
||||
this.opcode = opcode;
|
||||
this.events = events;
|
||||
}
|
||||
Updator(SelChImpl channel, int opcode) {
|
||||
this(channel, opcode, 0);
|
||||
}
|
||||
}
|
||||
|
||||
private LinkedList<Updator> updateList = new LinkedList<Updator>();
|
||||
|
||||
// The epoll_event array for results from epoll_wait
|
||||
private AllocatedNativeObject pollArray;
|
||||
|
||||
// The fd of the epoll driver
|
||||
final int epfd;
|
||||
|
||||
// The fd of the interrupt line going out
|
||||
int outgoingInterruptFD;
|
||||
|
||||
// The fd of the interrupt line coming in
|
||||
int incomingInterruptFD;
|
||||
|
||||
// The index of the interrupt FD
|
||||
int interruptedIndex;
|
||||
|
||||
// Number of updated pollfd entries
|
||||
int updated;
|
||||
|
||||
void initInterrupt(int fd0, int fd1) {
|
||||
outgoingInterruptFD = fd1;
|
||||
incomingInterruptFD = fd0;
|
||||
@ -146,11 +148,6 @@ class EPollArrayWrapper {
|
||||
pollArray.putInt(offset, event);
|
||||
}
|
||||
|
||||
void putData(int i, long value) {
|
||||
int offset = SIZE_EPOLLEVENT * i + DATA_OFFSET;
|
||||
pollArray.putLong(offset, value);
|
||||
}
|
||||
|
||||
void putDescriptor(int i, int fd) {
|
||||
int offset = SIZE_EPOLLEVENT * i + FD_OFFSET;
|
||||
pollArray.putInt(offset, fd);
|
||||
@ -167,51 +164,83 @@ class EPollArrayWrapper {
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the events for a given channel.
|
||||
* Sets the pending update events for the given file descriptor. This
|
||||
* method has no effect if the update events is already set to KILLED,
|
||||
* unless {@code force} is {@code true}.
|
||||
*/
|
||||
void setInterest(SelChImpl channel, int mask) {
|
||||
synchronized (updateList) {
|
||||
// if the previous pending operation is to add this file descriptor
|
||||
// to epoll then update its event set
|
||||
if (updateList.size() > 0) {
|
||||
Updator last = updateList.getLast();
|
||||
if (last.channel == channel && last.opcode == EPOLL_CTL_ADD) {
|
||||
last.events = mask;
|
||||
return;
|
||||
}
|
||||
private void setUpdateEvents(int fd, byte events, boolean force) {
|
||||
if (fd < MAX_UPDATE_ARRAY_SIZE) {
|
||||
if ((eventsLow[fd] != KILLED) || force) {
|
||||
eventsLow[fd] = events;
|
||||
}
|
||||
} else {
|
||||
Integer key = Integer.valueOf(fd);
|
||||
if ((eventsHigh.get(key) != KILLED) || force) {
|
||||
eventsHigh.put(key, Byte.valueOf(events));
|
||||
}
|
||||
|
||||
// update existing registration
|
||||
updateList.add(new Updator(channel, EPOLL_CTL_MOD, mask));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a channel's file descriptor to epoll
|
||||
* Returns the pending update events for the given file descriptor.
|
||||
*/
|
||||
void add(SelChImpl channel) {
|
||||
synchronized (updateList) {
|
||||
updateList.add(new Updator(channel, EPOLL_CTL_ADD));
|
||||
private byte getUpdateEvents(int fd) {
|
||||
if (fd < MAX_UPDATE_ARRAY_SIZE) {
|
||||
return eventsLow[fd];
|
||||
} else {
|
||||
Byte result = eventsHigh.get(Integer.valueOf(fd));
|
||||
// result should never be null
|
||||
return result.byteValue();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a channel's file descriptor from epoll
|
||||
* Update the events for a given file descriptor
|
||||
*/
|
||||
void release(SelChImpl channel) {
|
||||
synchronized (updateList) {
|
||||
// flush any pending updates
|
||||
for (Iterator<Updator> it = updateList.iterator(); it.hasNext();) {
|
||||
if (it.next().channel == channel) {
|
||||
it.remove();
|
||||
}
|
||||
void setInterest(int fd, int mask) {
|
||||
synchronized (updateLock) {
|
||||
// record the file descriptor and events
|
||||
int oldCapacity = updateDescriptors.length;
|
||||
if (updateCount == oldCapacity) {
|
||||
int newCapacity = oldCapacity + INITIAL_PENDING_UPDATE_SIZE;
|
||||
int[] newDescriptors = new int[newCapacity];
|
||||
System.arraycopy(updateDescriptors, 0, newDescriptors, 0, oldCapacity);
|
||||
updateDescriptors = newDescriptors;
|
||||
}
|
||||
updateDescriptors[updateCount++] = fd;
|
||||
|
||||
// remove from the idle set (if present)
|
||||
idleSet.remove(channel);
|
||||
// events are stored as bytes for efficiency reasons
|
||||
byte b = (byte)mask;
|
||||
assert (b == mask) && (b != KILLED);
|
||||
setUpdateEvents(fd, b, false);
|
||||
}
|
||||
}
|
||||
|
||||
// remove from epoll (if registered)
|
||||
epollCtl(epfd, EPOLL_CTL_DEL, channel.getFDVal(), 0);
|
||||
/**
|
||||
* Add a file descriptor
|
||||
*/
|
||||
void add(int fd) {
|
||||
// force the initial update events to 0 as it may be KILLED by a
|
||||
// previous registration.
|
||||
synchronized (updateLock) {
|
||||
assert !registered.get(fd);
|
||||
setUpdateEvents(fd, (byte)0, true);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a file descriptor
|
||||
*/
|
||||
void remove(int fd) {
|
||||
synchronized (updateLock) {
|
||||
// kill pending and future update for this file descriptor
|
||||
setUpdateEvents(fd, KILLED, false);
|
||||
|
||||
// remove from epoll
|
||||
if (registered.get(fd)) {
|
||||
epollCtl(epfd, EPOLL_CTL_DEL, fd, 0);
|
||||
registered.clear(fd);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -239,36 +268,38 @@ class EPollArrayWrapper {
|
||||
/**
|
||||
* Update the pending registrations.
|
||||
*/
|
||||
void updateRegistrations() {
|
||||
synchronized (updateList) {
|
||||
Updator u = null;
|
||||
while ((u = updateList.poll()) != null) {
|
||||
SelChImpl ch = u.channel;
|
||||
if (!ch.isOpen())
|
||||
continue;
|
||||
private void updateRegistrations() {
|
||||
synchronized (updateLock) {
|
||||
int j = 0;
|
||||
while (j < updateCount) {
|
||||
int fd = updateDescriptors[j];
|
||||
short events = getUpdateEvents(fd);
|
||||
boolean isRegistered = registered.get(fd);
|
||||
int opcode = 0;
|
||||
|
||||
// if the events are 0 then file descriptor is put into "idle
|
||||
// set" to prevent it being polled
|
||||
if (u.events == 0) {
|
||||
boolean added = idleSet.add(u.channel);
|
||||
// if added to idle set then remove from epoll if registered
|
||||
if (added && (u.opcode == EPOLL_CTL_MOD))
|
||||
epollCtl(epfd, EPOLL_CTL_DEL, ch.getFDVal(), 0);
|
||||
} else {
|
||||
// events are specified. If file descriptor was in idle set
|
||||
// it must be re-registered (by converting opcode to ADD)
|
||||
boolean idle = false;
|
||||
if (!idleSet.isEmpty())
|
||||
idle = idleSet.remove(u.channel);
|
||||
int opcode = (idle) ? EPOLL_CTL_ADD : u.opcode;
|
||||
epollCtl(epfd, opcode, ch.getFDVal(), u.events);
|
||||
if (events != KILLED) {
|
||||
if (isRegistered) {
|
||||
opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
|
||||
} else {
|
||||
opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
|
||||
}
|
||||
if (opcode != 0) {
|
||||
epollCtl(epfd, opcode, fd, events);
|
||||
if (opcode == EPOLL_CTL_ADD) {
|
||||
registered.set(fd);
|
||||
} else if (opcode == EPOLL_CTL_DEL) {
|
||||
registered.clear(fd);
|
||||
}
|
||||
}
|
||||
}
|
||||
j++;
|
||||
}
|
||||
updateCount = 0;
|
||||
}
|
||||
}
|
||||
|
||||
// interrupt support
|
||||
boolean interrupted = false;
|
||||
private boolean interrupted = false;
|
||||
|
||||
public void interrupt() {
|
||||
interrupt(outgoingInterruptFD);
|
||||
|
@ -53,26 +53,24 @@ class EPollSelectorImpl
|
||||
private volatile boolean closed = false;
|
||||
|
||||
// Lock for interrupt triggering and clearing
|
||||
private Object interruptLock = new Object();
|
||||
private final Object interruptLock = new Object();
|
||||
private boolean interruptTriggered = false;
|
||||
|
||||
/**
|
||||
* Package private constructor called by factory method in
|
||||
* the abstract superclass Selector.
|
||||
*/
|
||||
EPollSelectorImpl(SelectorProvider sp) {
|
||||
EPollSelectorImpl(SelectorProvider sp) throws IOException {
|
||||
super(sp);
|
||||
long pipeFds = IOUtil.makePipe(false);
|
||||
fd0 = (int) (pipeFds >>> 32);
|
||||
fd1 = (int) pipeFds;
|
||||
pollWrapper = new EPollArrayWrapper();
|
||||
pollWrapper.initInterrupt(fd0, fd1);
|
||||
fdToKey = new HashMap<Integer,SelectionKeyImpl>();
|
||||
fdToKey = new HashMap<>();
|
||||
}
|
||||
|
||||
protected int doSelect(long timeout)
|
||||
throws IOException
|
||||
{
|
||||
protected int doSelect(long timeout) throws IOException {
|
||||
if (closed)
|
||||
throw new ClosedSelectorException();
|
||||
processDeregisterQueue();
|
||||
@ -161,8 +159,9 @@ class EPollSelectorImpl
|
||||
if (closed)
|
||||
throw new ClosedSelectorException();
|
||||
SelChImpl ch = ski.channel;
|
||||
fdToKey.put(Integer.valueOf(ch.getFDVal()), ski);
|
||||
pollWrapper.add(ch);
|
||||
int fd = Integer.valueOf(ch.getFDVal());
|
||||
fdToKey.put(fd, ski);
|
||||
pollWrapper.add(fd);
|
||||
keys.add(ski);
|
||||
}
|
||||
|
||||
@ -171,7 +170,7 @@ class EPollSelectorImpl
|
||||
SelChImpl ch = ski.channel;
|
||||
int fd = ch.getFDVal();
|
||||
fdToKey.remove(Integer.valueOf(fd));
|
||||
pollWrapper.release(ch);
|
||||
pollWrapper.remove(fd);
|
||||
ski.setIndex(-1);
|
||||
keys.remove(ski);
|
||||
selectedKeys.remove(ski);
|
||||
@ -181,10 +180,11 @@ class EPollSelectorImpl
|
||||
((SelChImpl)selch).kill();
|
||||
}
|
||||
|
||||
public void putEventOps(SelectionKeyImpl sk, int ops) {
|
||||
public void putEventOps(SelectionKeyImpl ski, int ops) {
|
||||
if (closed)
|
||||
throw new ClosedSelectorException();
|
||||
pollWrapper.setInterest(sk.channel, ops);
|
||||
SelChImpl ch = ski.channel;
|
||||
pollWrapper.setInterest(ch.getFDVal(), ops);
|
||||
}
|
||||
|
||||
public Selector wakeup() {
|
||||
@ -200,5 +200,4 @@ class EPollSelectorImpl
|
||||
static {
|
||||
Util.load();
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user