6801020: Concurrent Semaphore release may cause some require thread not signaled
Introduce PROPAGATE waitStatus Reviewed-by: martin
This commit is contained in:
parent
5d6fffa036
commit
b63d6d68d9
@ -166,6 +166,11 @@ public abstract class AbstractQueuedLongSynchronizer
|
|||||||
static final int SIGNAL = -1;
|
static final int SIGNAL = -1;
|
||||||
/** waitStatus value to indicate thread is waiting on condition */
|
/** waitStatus value to indicate thread is waiting on condition */
|
||||||
static final int CONDITION = -2;
|
static final int CONDITION = -2;
|
||||||
|
/**
|
||||||
|
* waitStatus value to indicate the next acquireShared should
|
||||||
|
* unconditionally propagate
|
||||||
|
*/
|
||||||
|
static final int PROPAGATE = -3;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Status field, taking on only the values:
|
* Status field, taking on only the values:
|
||||||
@ -180,10 +185,16 @@ public abstract class AbstractQueuedLongSynchronizer
|
|||||||
* Nodes never leave this state. In particular,
|
* Nodes never leave this state. In particular,
|
||||||
* a thread with cancelled node never again blocks.
|
* a thread with cancelled node never again blocks.
|
||||||
* CONDITION: This node is currently on a condition queue.
|
* CONDITION: This node is currently on a condition queue.
|
||||||
* It will not be used as a sync queue node until
|
* It will not be used as a sync queue node
|
||||||
* transferred. (Use of this value here
|
* until transferred, at which time the status
|
||||||
* has nothing to do with the other uses
|
* will be set to 0. (Use of this value here has
|
||||||
* of the field, but simplifies mechanics.)
|
* nothing to do with the other uses of the
|
||||||
|
* field, but simplifies mechanics.)
|
||||||
|
* PROPAGATE: A releaseShared should be propagated to other
|
||||||
|
* nodes. This is set (for head node only) in
|
||||||
|
* doReleaseShared to ensure propagation
|
||||||
|
* continues, even if other operations have
|
||||||
|
* since intervened.
|
||||||
* 0: None of the above
|
* 0: None of the above
|
||||||
*
|
*
|
||||||
* The values are arranged numerically to simplify use.
|
* The values are arranged numerically to simplify use.
|
||||||
@ -403,10 +414,13 @@ public abstract class AbstractQueuedLongSynchronizer
|
|||||||
*/
|
*/
|
||||||
private void unparkSuccessor(Node node) {
|
private void unparkSuccessor(Node node) {
|
||||||
/*
|
/*
|
||||||
* Try to clear status in anticipation of signalling. It is
|
* If status is negative (i.e., possibly needing signal) try
|
||||||
* OK if this fails or if status is changed by waiting thread.
|
* to clear in anticipation of signalling. It is OK if this
|
||||||
|
* fails or if status is changed by waiting thread.
|
||||||
*/
|
*/
|
||||||
compareAndSetWaitStatus(node, Node.SIGNAL, 0);
|
int ws = node.waitStatus;
|
||||||
|
if (ws < 0)
|
||||||
|
compareAndSetWaitStatus(node, ws, 0);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Thread to unpark is held in successor, which is normally
|
* Thread to unpark is held in successor, which is normally
|
||||||
@ -425,24 +439,71 @@ public abstract class AbstractQueuedLongSynchronizer
|
|||||||
LockSupport.unpark(s.thread);
|
LockSupport.unpark(s.thread);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Release action for shared mode -- signal successor and ensure
|
||||||
|
* propagation. (Note: For exclusive mode, release just amounts
|
||||||
|
* to calling unparkSuccessor of head if it needs signal.)
|
||||||
|
*/
|
||||||
|
private void doReleaseShared() {
|
||||||
|
/*
|
||||||
|
* Ensure that a release propagates, even if there are other
|
||||||
|
* in-progress acquires/releases. This proceeds in the usual
|
||||||
|
* way of trying to unparkSuccessor of head if it needs
|
||||||
|
* signal. But if it does not, status is set to PROPAGATE to
|
||||||
|
* ensure that upon release, propagation continues.
|
||||||
|
* Additionally, we must loop in case a new node is added
|
||||||
|
* while we are doing this. Also, unlike other uses of
|
||||||
|
* unparkSuccessor, we need to know if CAS to reset status
|
||||||
|
* fails, if so rechecking.
|
||||||
|
*/
|
||||||
|
for (;;) {
|
||||||
|
Node h = head;
|
||||||
|
if (h != null && h != tail) {
|
||||||
|
int ws = h.waitStatus;
|
||||||
|
if (ws == Node.SIGNAL) {
|
||||||
|
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
|
||||||
|
continue; // loop to recheck cases
|
||||||
|
unparkSuccessor(h);
|
||||||
|
}
|
||||||
|
else if (ws == 0 &&
|
||||||
|
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
|
||||||
|
continue; // loop on failed CAS
|
||||||
|
}
|
||||||
|
if (h == head) // loop if head changed
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets head of queue, and checks if successor may be waiting
|
* Sets head of queue, and checks if successor may be waiting
|
||||||
* in shared mode, if so propagating if propagate > 0.
|
* in shared mode, if so propagating if either propagate > 0 or
|
||||||
|
* PROPAGATE status was set.
|
||||||
*
|
*
|
||||||
* @param pred the node holding waitStatus for node
|
|
||||||
* @param node the node
|
* @param node the node
|
||||||
* @param propagate the return value from a tryAcquireShared
|
* @param propagate the return value from a tryAcquireShared
|
||||||
*/
|
*/
|
||||||
private void setHeadAndPropagate(Node node, long propagate) {
|
private void setHeadAndPropagate(Node node, long propagate) {
|
||||||
|
Node h = head; // Record old head for check below
|
||||||
setHead(node);
|
setHead(node);
|
||||||
if (propagate > 0 && node.waitStatus != 0) {
|
/*
|
||||||
/*
|
* Try to signal next queued node if:
|
||||||
* Don't bother fully figuring out successor. If it
|
* Propagation was indicated by caller,
|
||||||
* looks null, call unparkSuccessor anyway to be safe.
|
* or was recorded (as h.waitStatus) by a previous operation
|
||||||
*/
|
* (note: this uses sign-check of waitStatus because
|
||||||
|
* PROPAGATE status may transition to SIGNAL.)
|
||||||
|
* and
|
||||||
|
* The next node is waiting in shared mode,
|
||||||
|
* or we don't know, because it appears null
|
||||||
|
*
|
||||||
|
* The conservatism in both of these checks may cause
|
||||||
|
* unnecessary wake-ups, but only when there are multiple
|
||||||
|
* racing acquires/releases, so most need signals now or soon
|
||||||
|
* anyway.
|
||||||
|
*/
|
||||||
|
if (propagate > 0 || h == null || h.waitStatus < 0) {
|
||||||
Node s = node.next;
|
Node s = node.next;
|
||||||
if (s == null || s.isShared())
|
if (s == null || s.isShared())
|
||||||
unparkSuccessor(node);
|
doReleaseShared();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -465,23 +526,27 @@ public abstract class AbstractQueuedLongSynchronizer
|
|||||||
while (pred.waitStatus > 0)
|
while (pred.waitStatus > 0)
|
||||||
node.prev = pred = pred.prev;
|
node.prev = pred = pred.prev;
|
||||||
|
|
||||||
// Getting this before setting waitStatus ensures staleness
|
// predNext is the apparent node to unsplice. CASes below will
|
||||||
|
// fail if not, in which case, we lost race vs another cancel
|
||||||
|
// or signal, so no further action is necessary.
|
||||||
Node predNext = pred.next;
|
Node predNext = pred.next;
|
||||||
|
|
||||||
// Can use unconditional write instead of CAS here
|
// Can use unconditional write instead of CAS here.
|
||||||
|
// After this atomic step, other Nodes can skip past us.
|
||||||
|
// Before, we are free of interference from other threads.
|
||||||
node.waitStatus = Node.CANCELLED;
|
node.waitStatus = Node.CANCELLED;
|
||||||
|
|
||||||
// If we are the tail, remove ourselves
|
// If we are the tail, remove ourselves.
|
||||||
if (node == tail && compareAndSetTail(node, pred)) {
|
if (node == tail && compareAndSetTail(node, pred)) {
|
||||||
compareAndSetNext(pred, predNext, null);
|
compareAndSetNext(pred, predNext, null);
|
||||||
} else {
|
} else {
|
||||||
// If "active" predecessor found...
|
// If successor needs signal, try to set pred's next-link
|
||||||
if (pred != head
|
// so it will get one. Otherwise wake it up to propagate.
|
||||||
&& (pred.waitStatus == Node.SIGNAL
|
int ws;
|
||||||
|| compareAndSetWaitStatus(pred, 0, Node.SIGNAL))
|
if (pred != head &&
|
||||||
&& pred.thread != null) {
|
((ws = pred.waitStatus) == Node.SIGNAL ||
|
||||||
|
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
|
||||||
// If successor is active, set predecessor's next link
|
pred.thread != null) {
|
||||||
Node next = node.next;
|
Node next = node.next;
|
||||||
if (next != null && next.waitStatus <= 0)
|
if (next != null && next.waitStatus <= 0)
|
||||||
compareAndSetNext(pred, predNext, next);
|
compareAndSetNext(pred, predNext, next);
|
||||||
@ -503,14 +568,14 @@ public abstract class AbstractQueuedLongSynchronizer
|
|||||||
* @return {@code true} if thread should block
|
* @return {@code true} if thread should block
|
||||||
*/
|
*/
|
||||||
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
|
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
|
||||||
int s = pred.waitStatus;
|
int ws = pred.waitStatus;
|
||||||
if (s < 0)
|
if (ws == Node.SIGNAL)
|
||||||
/*
|
/*
|
||||||
* This node has already set status asking a release
|
* This node has already set status asking a release
|
||||||
* to signal it, so it can safely park.
|
* to signal it, so it can safely park.
|
||||||
*/
|
*/
|
||||||
return true;
|
return true;
|
||||||
if (s > 0) {
|
if (ws > 0) {
|
||||||
/*
|
/*
|
||||||
* Predecessor was cancelled. Skip over predecessors and
|
* Predecessor was cancelled. Skip over predecessors and
|
||||||
* indicate retry.
|
* indicate retry.
|
||||||
@ -519,14 +584,14 @@ public abstract class AbstractQueuedLongSynchronizer
|
|||||||
node.prev = pred = pred.prev;
|
node.prev = pred = pred.prev;
|
||||||
} while (pred.waitStatus > 0);
|
} while (pred.waitStatus > 0);
|
||||||
pred.next = node;
|
pred.next = node;
|
||||||
}
|
} else {
|
||||||
else
|
|
||||||
/*
|
/*
|
||||||
* Indicate that we need a signal, but don't park yet. Caller
|
* waitStatus must be 0 or PROPAGATE. Indicate that we
|
||||||
* will need to retry to make sure it cannot acquire before
|
* need a signal, but don't park yet. Caller will need to
|
||||||
* parking.
|
* retry to make sure it cannot acquire before parking.
|
||||||
*/
|
*/
|
||||||
compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
|
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
|
||||||
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1046,9 +1111,7 @@ public abstract class AbstractQueuedLongSynchronizer
|
|||||||
*/
|
*/
|
||||||
public final boolean releaseShared(long arg) {
|
public final boolean releaseShared(long arg) {
|
||||||
if (tryReleaseShared(arg)) {
|
if (tryReleaseShared(arg)) {
|
||||||
Node h = head;
|
doReleaseShared();
|
||||||
if (h != null && h.waitStatus != 0)
|
|
||||||
unparkSuccessor(h);
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
@ -1390,8 +1453,8 @@ public abstract class AbstractQueuedLongSynchronizer
|
|||||||
* case the waitStatus can be transiently and harmlessly wrong).
|
* case the waitStatus can be transiently and harmlessly wrong).
|
||||||
*/
|
*/
|
||||||
Node p = enq(node);
|
Node p = enq(node);
|
||||||
int c = p.waitStatus;
|
int ws = p.waitStatus;
|
||||||
if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
|
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
|
||||||
LockSupport.unpark(node.thread);
|
LockSupport.unpark(node.thread);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -389,6 +389,11 @@ public abstract class AbstractQueuedSynchronizer
|
|||||||
static final int SIGNAL = -1;
|
static final int SIGNAL = -1;
|
||||||
/** waitStatus value to indicate thread is waiting on condition */
|
/** waitStatus value to indicate thread is waiting on condition */
|
||||||
static final int CONDITION = -2;
|
static final int CONDITION = -2;
|
||||||
|
/**
|
||||||
|
* waitStatus value to indicate the next acquireShared should
|
||||||
|
* unconditionally propagate
|
||||||
|
*/
|
||||||
|
static final int PROPAGATE = -3;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Status field, taking on only the values:
|
* Status field, taking on only the values:
|
||||||
@ -403,10 +408,16 @@ public abstract class AbstractQueuedSynchronizer
|
|||||||
* Nodes never leave this state. In particular,
|
* Nodes never leave this state. In particular,
|
||||||
* a thread with cancelled node never again blocks.
|
* a thread with cancelled node never again blocks.
|
||||||
* CONDITION: This node is currently on a condition queue.
|
* CONDITION: This node is currently on a condition queue.
|
||||||
* It will not be used as a sync queue node until
|
* It will not be used as a sync queue node
|
||||||
* transferred. (Use of this value here
|
* until transferred, at which time the status
|
||||||
* has nothing to do with the other uses
|
* will be set to 0. (Use of this value here has
|
||||||
* of the field, but simplifies mechanics.)
|
* nothing to do with the other uses of the
|
||||||
|
* field, but simplifies mechanics.)
|
||||||
|
* PROPAGATE: A releaseShared should be propagated to other
|
||||||
|
* nodes. This is set (for head node only) in
|
||||||
|
* doReleaseShared to ensure propagation
|
||||||
|
* continues, even if other operations have
|
||||||
|
* since intervened.
|
||||||
* 0: None of the above
|
* 0: None of the above
|
||||||
*
|
*
|
||||||
* The values are arranged numerically to simplify use.
|
* The values are arranged numerically to simplify use.
|
||||||
@ -626,10 +637,13 @@ public abstract class AbstractQueuedSynchronizer
|
|||||||
*/
|
*/
|
||||||
private void unparkSuccessor(Node node) {
|
private void unparkSuccessor(Node node) {
|
||||||
/*
|
/*
|
||||||
* Try to clear status in anticipation of signalling. It is
|
* If status is negative (i.e., possibly needing signal) try
|
||||||
* OK if this fails or if status is changed by waiting thread.
|
* to clear in anticipation of signalling. It is OK if this
|
||||||
|
* fails or if status is changed by waiting thread.
|
||||||
*/
|
*/
|
||||||
compareAndSetWaitStatus(node, Node.SIGNAL, 0);
|
int ws = node.waitStatus;
|
||||||
|
if (ws < 0)
|
||||||
|
compareAndSetWaitStatus(node, ws, 0);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Thread to unpark is held in successor, which is normally
|
* Thread to unpark is held in successor, which is normally
|
||||||
@ -648,24 +662,71 @@ public abstract class AbstractQueuedSynchronizer
|
|||||||
LockSupport.unpark(s.thread);
|
LockSupport.unpark(s.thread);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Release action for shared mode -- signal successor and ensure
|
||||||
|
* propagation. (Note: For exclusive mode, release just amounts
|
||||||
|
* to calling unparkSuccessor of head if it needs signal.)
|
||||||
|
*/
|
||||||
|
private void doReleaseShared() {
|
||||||
|
/*
|
||||||
|
* Ensure that a release propagates, even if there are other
|
||||||
|
* in-progress acquires/releases. This proceeds in the usual
|
||||||
|
* way of trying to unparkSuccessor of head if it needs
|
||||||
|
* signal. But if it does not, status is set to PROPAGATE to
|
||||||
|
* ensure that upon release, propagation continues.
|
||||||
|
* Additionally, we must loop in case a new node is added
|
||||||
|
* while we are doing this. Also, unlike other uses of
|
||||||
|
* unparkSuccessor, we need to know if CAS to reset status
|
||||||
|
* fails, if so rechecking.
|
||||||
|
*/
|
||||||
|
for (;;) {
|
||||||
|
Node h = head;
|
||||||
|
if (h != null && h != tail) {
|
||||||
|
int ws = h.waitStatus;
|
||||||
|
if (ws == Node.SIGNAL) {
|
||||||
|
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
|
||||||
|
continue; // loop to recheck cases
|
||||||
|
unparkSuccessor(h);
|
||||||
|
}
|
||||||
|
else if (ws == 0 &&
|
||||||
|
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
|
||||||
|
continue; // loop on failed CAS
|
||||||
|
}
|
||||||
|
if (h == head) // loop if head changed
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets head of queue, and checks if successor may be waiting
|
* Sets head of queue, and checks if successor may be waiting
|
||||||
* in shared mode, if so propagating if propagate > 0.
|
* in shared mode, if so propagating if either propagate > 0 or
|
||||||
|
* PROPAGATE status was set.
|
||||||
*
|
*
|
||||||
* @param pred the node holding waitStatus for node
|
|
||||||
* @param node the node
|
* @param node the node
|
||||||
* @param propagate the return value from a tryAcquireShared
|
* @param propagate the return value from a tryAcquireShared
|
||||||
*/
|
*/
|
||||||
private void setHeadAndPropagate(Node node, int propagate) {
|
private void setHeadAndPropagate(Node node, int propagate) {
|
||||||
|
Node h = head; // Record old head for check below
|
||||||
setHead(node);
|
setHead(node);
|
||||||
if (propagate > 0 && node.waitStatus != 0) {
|
/*
|
||||||
/*
|
* Try to signal next queued node if:
|
||||||
* Don't bother fully figuring out successor. If it
|
* Propagation was indicated by caller,
|
||||||
* looks null, call unparkSuccessor anyway to be safe.
|
* or was recorded (as h.waitStatus) by a previous operation
|
||||||
*/
|
* (note: this uses sign-check of waitStatus because
|
||||||
|
* PROPAGATE status may transition to SIGNAL.)
|
||||||
|
* and
|
||||||
|
* The next node is waiting in shared mode,
|
||||||
|
* or we don't know, because it appears null
|
||||||
|
*
|
||||||
|
* The conservatism in both of these checks may cause
|
||||||
|
* unnecessary wake-ups, but only when there are multiple
|
||||||
|
* racing acquires/releases, so most need signals now or soon
|
||||||
|
* anyway.
|
||||||
|
*/
|
||||||
|
if (propagate > 0 || h == null || h.waitStatus < 0) {
|
||||||
Node s = node.next;
|
Node s = node.next;
|
||||||
if (s == null || s.isShared())
|
if (s == null || s.isShared())
|
||||||
unparkSuccessor(node);
|
doReleaseShared();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -688,23 +749,27 @@ public abstract class AbstractQueuedSynchronizer
|
|||||||
while (pred.waitStatus > 0)
|
while (pred.waitStatus > 0)
|
||||||
node.prev = pred = pred.prev;
|
node.prev = pred = pred.prev;
|
||||||
|
|
||||||
// Getting this before setting waitStatus ensures staleness
|
// predNext is the apparent node to unsplice. CASes below will
|
||||||
|
// fail if not, in which case, we lost race vs another cancel
|
||||||
|
// or signal, so no further action is necessary.
|
||||||
Node predNext = pred.next;
|
Node predNext = pred.next;
|
||||||
|
|
||||||
// Can use unconditional write instead of CAS here
|
// Can use unconditional write instead of CAS here.
|
||||||
|
// After this atomic step, other Nodes can skip past us.
|
||||||
|
// Before, we are free of interference from other threads.
|
||||||
node.waitStatus = Node.CANCELLED;
|
node.waitStatus = Node.CANCELLED;
|
||||||
|
|
||||||
// If we are the tail, remove ourselves
|
// If we are the tail, remove ourselves.
|
||||||
if (node == tail && compareAndSetTail(node, pred)) {
|
if (node == tail && compareAndSetTail(node, pred)) {
|
||||||
compareAndSetNext(pred, predNext, null);
|
compareAndSetNext(pred, predNext, null);
|
||||||
} else {
|
} else {
|
||||||
// If "active" predecessor found...
|
// If successor needs signal, try to set pred's next-link
|
||||||
if (pred != head
|
// so it will get one. Otherwise wake it up to propagate.
|
||||||
&& (pred.waitStatus == Node.SIGNAL
|
int ws;
|
||||||
|| compareAndSetWaitStatus(pred, 0, Node.SIGNAL))
|
if (pred != head &&
|
||||||
&& pred.thread != null) {
|
((ws = pred.waitStatus) == Node.SIGNAL ||
|
||||||
|
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
|
||||||
// If successor is active, set predecessor's next link
|
pred.thread != null) {
|
||||||
Node next = node.next;
|
Node next = node.next;
|
||||||
if (next != null && next.waitStatus <= 0)
|
if (next != null && next.waitStatus <= 0)
|
||||||
compareAndSetNext(pred, predNext, next);
|
compareAndSetNext(pred, predNext, next);
|
||||||
@ -726,14 +791,14 @@ public abstract class AbstractQueuedSynchronizer
|
|||||||
* @return {@code true} if thread should block
|
* @return {@code true} if thread should block
|
||||||
*/
|
*/
|
||||||
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
|
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
|
||||||
int s = pred.waitStatus;
|
int ws = pred.waitStatus;
|
||||||
if (s < 0)
|
if (ws == Node.SIGNAL)
|
||||||
/*
|
/*
|
||||||
* This node has already set status asking a release
|
* This node has already set status asking a release
|
||||||
* to signal it, so it can safely park.
|
* to signal it, so it can safely park.
|
||||||
*/
|
*/
|
||||||
return true;
|
return true;
|
||||||
if (s > 0) {
|
if (ws > 0) {
|
||||||
/*
|
/*
|
||||||
* Predecessor was cancelled. Skip over predecessors and
|
* Predecessor was cancelled. Skip over predecessors and
|
||||||
* indicate retry.
|
* indicate retry.
|
||||||
@ -742,14 +807,14 @@ public abstract class AbstractQueuedSynchronizer
|
|||||||
node.prev = pred = pred.prev;
|
node.prev = pred = pred.prev;
|
||||||
} while (pred.waitStatus > 0);
|
} while (pred.waitStatus > 0);
|
||||||
pred.next = node;
|
pred.next = node;
|
||||||
}
|
} else {
|
||||||
else
|
|
||||||
/*
|
/*
|
||||||
* Indicate that we need a signal, but don't park yet. Caller
|
* waitStatus must be 0 or PROPAGATE. Indicate that we
|
||||||
* will need to retry to make sure it cannot acquire before
|
* need a signal, but don't park yet. Caller will need to
|
||||||
* parking.
|
* retry to make sure it cannot acquire before parking.
|
||||||
*/
|
*/
|
||||||
compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
|
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
|
||||||
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1269,9 +1334,7 @@ public abstract class AbstractQueuedSynchronizer
|
|||||||
*/
|
*/
|
||||||
public final boolean releaseShared(int arg) {
|
public final boolean releaseShared(int arg) {
|
||||||
if (tryReleaseShared(arg)) {
|
if (tryReleaseShared(arg)) {
|
||||||
Node h = head;
|
doReleaseShared();
|
||||||
if (h != null && h.waitStatus != 0)
|
|
||||||
unparkSuccessor(h);
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
@ -1613,8 +1676,8 @@ public abstract class AbstractQueuedSynchronizer
|
|||||||
* case the waitStatus can be transiently and harmlessly wrong).
|
* case the waitStatus can be transiently and harmlessly wrong).
|
||||||
*/
|
*/
|
||||||
Node p = enq(node);
|
Node p = enq(node);
|
||||||
int c = p.waitStatus;
|
int ws = p.waitStatus;
|
||||||
if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
|
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
|
||||||
LockSupport.unpark(node.thread);
|
LockSupport.unpark(node.thread);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
116
jdk/test/java/util/concurrent/Semaphore/RacingReleases.java
Normal file
116
jdk/test/java/util/concurrent/Semaphore/RacingReleases.java
Normal file
@ -0,0 +1,116 @@
|
|||||||
|
/*
|
||||||
|
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||||
|
*
|
||||||
|
* This code is free software; you can redistribute it and/or modify it
|
||||||
|
* under the terms of the GNU General Public License version 2 only, as
|
||||||
|
* published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||||
|
* version 2 for more details (a copy is included in the LICENSE file that
|
||||||
|
* accompanied this code).
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License version
|
||||||
|
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||||
|
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||||
|
*
|
||||||
|
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
|
||||||
|
* CA 95054 USA or visit www.sun.com if you need additional information or
|
||||||
|
* have any questions.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is available under and governed by the GNU General Public
|
||||||
|
* License version 2 only, as published by the Free Software Foundation.
|
||||||
|
* However, the following notice accompanied the original version of this
|
||||||
|
* file:
|
||||||
|
*
|
||||||
|
* Written by Doug Lea with assistance from members of JCP JSR-166
|
||||||
|
* Expert Group and released to the public domain, as explained at
|
||||||
|
* http://creativecommons.org/licenses/publicdomain
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
|
* @test
|
||||||
|
* @bug 6801020 6803402
|
||||||
|
* @summary Try to tickle race conditions in
|
||||||
|
* AbstractQueuedSynchronizer "shared" code
|
||||||
|
*/
|
||||||
|
|
||||||
|
import java.util.concurrent.Semaphore;
|
||||||
|
|
||||||
|
public class RacingReleases {
|
||||||
|
|
||||||
|
/** Increase this for better chance of tickling races */
|
||||||
|
static final int iterations = 1000;
|
||||||
|
|
||||||
|
public static void test(final boolean fair,
|
||||||
|
final boolean interruptibly)
|
||||||
|
throws Throwable {
|
||||||
|
for (int i = 0; i < iterations; i++) {
|
||||||
|
final Semaphore sem = new Semaphore(0, fair);
|
||||||
|
final Throwable[] badness = new Throwable[1];
|
||||||
|
Runnable blocker = interruptibly ?
|
||||||
|
new Runnable() {
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
sem.acquire();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
badness[0] = t;
|
||||||
|
throw new Error(t);
|
||||||
|
}}}
|
||||||
|
:
|
||||||
|
new Runnable() {
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
sem.acquireUninterruptibly();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
badness[0] = t;
|
||||||
|
throw new Error(t);
|
||||||
|
}}};
|
||||||
|
|
||||||
|
Thread b1 = new Thread(blocker);
|
||||||
|
Thread b2 = new Thread(blocker);
|
||||||
|
Runnable signaller = new Runnable() {
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
sem.release();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
badness[0] = t;
|
||||||
|
throw new Error(t);
|
||||||
|
}}};
|
||||||
|
Thread s1 = new Thread(signaller);
|
||||||
|
Thread s2 = new Thread(signaller);
|
||||||
|
Thread[] threads = { b1, b2, s1, s2 };
|
||||||
|
java.util.Collections.shuffle(java.util.Arrays.asList(threads));
|
||||||
|
for (Thread thread : threads)
|
||||||
|
thread.start();
|
||||||
|
for (Thread thread : threads) {
|
||||||
|
thread.join(60 * 1000);
|
||||||
|
if (thread.isAlive())
|
||||||
|
throw new Error
|
||||||
|
(String.format
|
||||||
|
("Semaphore stuck: permits %d, thread waiting %s%n",
|
||||||
|
sem.availablePermits(),
|
||||||
|
sem.hasQueuedThreads() ? "true" : "false"));
|
||||||
|
}
|
||||||
|
if (badness[0] != null)
|
||||||
|
throw new Error(badness[0]);
|
||||||
|
if (sem.availablePermits() != 0)
|
||||||
|
throw new Error(String.valueOf(sem.availablePermits()));
|
||||||
|
if (sem.hasQueuedThreads())
|
||||||
|
throw new Error(String.valueOf(sem.hasQueuedThreads()));
|
||||||
|
if (sem.getQueueLength() != 0)
|
||||||
|
throw new Error(String.valueOf(sem.getQueueLength()));
|
||||||
|
if (sem.isFair() != fair)
|
||||||
|
throw new Error(String.valueOf(sem.isFair()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Throwable {
|
||||||
|
for (boolean fair : new boolean[] { true, false })
|
||||||
|
for (boolean interruptibly : new boolean[] { true, false })
|
||||||
|
test(fair, interruptibly);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user