From b63d6d68d93ebc34f8b4091a752eba86ff575fc2 Mon Sep 17 00:00:00 2001
From: Doug Lea
Date: Thu, 26 Mar 2009 11:59:07 -0700
Subject: [PATCH] 6801020: Concurrent Semaphore release may cause some require
thread not signaled
Introduce PROPAGATE waitStatus
Reviewed-by: martin
---
.../locks/AbstractQueuedLongSynchronizer.java | 141 +++++++++++++-----
.../locks/AbstractQueuedSynchronizer.java | 141 +++++++++++++-----
.../concurrent/Semaphore/RacingReleases.java | 116 ++++++++++++++
3 files changed, 320 insertions(+), 78 deletions(-)
create mode 100644 jdk/test/java/util/concurrent/Semaphore/RacingReleases.java
diff --git a/jdk/src/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java b/jdk/src/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java
index a7f048e8bdb..68135015c29 100644
--- a/jdk/src/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java
+++ b/jdk/src/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java
@@ -166,6 +166,11 @@ public abstract class AbstractQueuedLongSynchronizer
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
+ /**
+ * waitStatus value to indicate the next acquireShared should
+ * unconditionally propagate
+ */
+ static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
@@ -180,10 +185,16 @@ public abstract class AbstractQueuedLongSynchronizer
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
- * It will not be used as a sync queue node until
- * transferred. (Use of this value here
- * has nothing to do with the other uses
- * of the field, but simplifies mechanics.)
+ * It will not be used as a sync queue node
+ * until transferred, at which time the status
+ * will be set to 0. (Use of this value here has
+ * nothing to do with the other uses of the
+ * field, but simplifies mechanics.)
+ * PROPAGATE: A releaseShared should be propagated to other
+ * nodes. This is set (for head node only) in
+ * doReleaseShared to ensure propagation
+ * continues, even if other operations have
+ * since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
@@ -403,10 +414,13 @@ public abstract class AbstractQueuedLongSynchronizer
*/
private void unparkSuccessor(Node node) {
/*
- * Try to clear status in anticipation of signalling. It is
- * OK if this fails or if status is changed by waiting thread.
+ * If status is negative (i.e., possibly needing signal) try
+ * to clear in anticipation of signalling. It is OK if this
+ * fails or if status is changed by waiting thread.
*/
- compareAndSetWaitStatus(node, Node.SIGNAL, 0);
+ int ws = node.waitStatus;
+ if (ws < 0)
+ compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
@@ -425,24 +439,71 @@ public abstract class AbstractQueuedLongSynchronizer
LockSupport.unpark(s.thread);
}
+ /**
+ * Release action for shared mode -- signal successor and ensure
+ * propagation. (Note: For exclusive mode, release just amounts
+ * to calling unparkSuccessor of head if it needs signal.)
+ */
+ private void doReleaseShared() {
+ /*
+ * Ensure that a release propagates, even if there are other
+ * in-progress acquires/releases. This proceeds in the usual
+ * way of trying to unparkSuccessor of head if it needs
+ * signal. But if it does not, status is set to PROPAGATE to
+ * ensure that upon release, propagation continues.
+ * Additionally, we must loop in case a new node is added
+ * while we are doing this. Also, unlike other uses of
+ * unparkSuccessor, we need to know if CAS to reset status
+ * fails, if so rechecking.
+ */
+ for (;;) {
+ Node h = head;
+ if (h != null && h != tail) {
+ int ws = h.waitStatus;
+ if (ws == Node.SIGNAL) {
+ if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
+ continue; // loop to recheck cases
+ unparkSuccessor(h);
+ }
+ else if (ws == 0 &&
+ !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
+ continue; // loop on failed CAS
+ }
+ if (h == head) // loop if head changed
+ break;
+ }
+ }
+
/**
* Sets head of queue, and checks if successor may be waiting
- * in shared mode, if so propagating if propagate > 0.
+ * in shared mode, if so propagating if either propagate > 0 or
+ * PROPAGATE status was set.
*
- * @param pred the node holding waitStatus for node
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, long propagate) {
+ Node h = head; // Record old head for check below
setHead(node);
- if (propagate > 0 && node.waitStatus != 0) {
- /*
- * Don't bother fully figuring out successor. If it
- * looks null, call unparkSuccessor anyway to be safe.
- */
+ /*
+ * Try to signal next queued node if:
+ * Propagation was indicated by caller,
+ * or was recorded (as h.waitStatus) by a previous operation
+ * (note: this uses sign-check of waitStatus because
+ * PROPAGATE status may transition to SIGNAL.)
+ * and
+ * The next node is waiting in shared mode,
+ * or we don't know, because it appears null
+ *
+ * The conservatism in both of these checks may cause
+ * unnecessary wake-ups, but only when there are multiple
+ * racing acquires/releases, so most need signals now or soon
+ * anyway.
+ */
+ if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
- unparkSuccessor(node);
+ doReleaseShared();
}
}
@@ -465,23 +526,27 @@ public abstract class AbstractQueuedLongSynchronizer
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
- // Getting this before setting waitStatus ensures staleness
+ // predNext is the apparent node to unsplice. CASes below will
+ // fail if not, in which case, we lost race vs another cancel
+ // or signal, so no further action is necessary.
Node predNext = pred.next;
- // Can use unconditional write instead of CAS here
+ // Can use unconditional write instead of CAS here.
+ // After this atomic step, other Nodes can skip past us.
+ // Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
- // If we are the tail, remove ourselves
+ // If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
- // If "active" predecessor found...
- if (pred != head
- && (pred.waitStatus == Node.SIGNAL
- || compareAndSetWaitStatus(pred, 0, Node.SIGNAL))
- && pred.thread != null) {
-
- // If successor is active, set predecessor's next link
+ // If successor needs signal, try to set pred's next-link
+ // so it will get one. Otherwise wake it up to propagate.
+ int ws;
+ if (pred != head &&
+ ((ws = pred.waitStatus) == Node.SIGNAL ||
+ (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
+ pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
@@ -503,14 +568,14 @@ public abstract class AbstractQueuedLongSynchronizer
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int s = pred.waitStatus;
- if (s < 0)
+ int ws = pred.waitStatus;
+ if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
- if (s > 0) {
+ if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
@@ -519,14 +584,14 @@ public abstract class AbstractQueuedLongSynchronizer
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
- }
- else
+ } else {
/*
- * Indicate that we need a signal, but don't park yet. Caller
- * will need to retry to make sure it cannot acquire before
- * parking.
+ * waitStatus must be 0 or PROPAGATE. Indicate that we
+ * need a signal, but don't park yet. Caller will need to
+ * retry to make sure it cannot acquire before parking.
*/
- compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
+ compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
+ }
return false;
}
@@ -1046,9 +1111,7 @@ public abstract class AbstractQueuedLongSynchronizer
*/
public final boolean releaseShared(long arg) {
if (tryReleaseShared(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
+ doReleaseShared();
return true;
}
return false;
@@ -1390,8 +1453,8 @@ public abstract class AbstractQueuedLongSynchronizer
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
- int c = p.waitStatus;
- if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
+ int ws = p.waitStatus;
+ if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
diff --git a/jdk/src/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java b/jdk/src/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java
index 39219bb5fd9..8de1cad1d50 100644
--- a/jdk/src/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java
+++ b/jdk/src/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java
@@ -389,6 +389,11 @@ public abstract class AbstractQueuedSynchronizer
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
+ /**
+ * waitStatus value to indicate the next acquireShared should
+ * unconditionally propagate
+ */
+ static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
@@ -403,10 +408,16 @@ public abstract class AbstractQueuedSynchronizer
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
- * It will not be used as a sync queue node until
- * transferred. (Use of this value here
- * has nothing to do with the other uses
- * of the field, but simplifies mechanics.)
+ * It will not be used as a sync queue node
+ * until transferred, at which time the status
+ * will be set to 0. (Use of this value here has
+ * nothing to do with the other uses of the
+ * field, but simplifies mechanics.)
+ * PROPAGATE: A releaseShared should be propagated to other
+ * nodes. This is set (for head node only) in
+ * doReleaseShared to ensure propagation
+ * continues, even if other operations have
+ * since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
@@ -626,10 +637,13 @@ public abstract class AbstractQueuedSynchronizer
*/
private void unparkSuccessor(Node node) {
/*
- * Try to clear status in anticipation of signalling. It is
- * OK if this fails or if status is changed by waiting thread.
+ * If status is negative (i.e., possibly needing signal) try
+ * to clear in anticipation of signalling. It is OK if this
+ * fails or if status is changed by waiting thread.
*/
- compareAndSetWaitStatus(node, Node.SIGNAL, 0);
+ int ws = node.waitStatus;
+ if (ws < 0)
+ compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
@@ -648,24 +662,71 @@ public abstract class AbstractQueuedSynchronizer
LockSupport.unpark(s.thread);
}
+ /**
+ * Release action for shared mode -- signal successor and ensure
+ * propagation. (Note: For exclusive mode, release just amounts
+ * to calling unparkSuccessor of head if it needs signal.)
+ */
+ private void doReleaseShared() {
+ /*
+ * Ensure that a release propagates, even if there are other
+ * in-progress acquires/releases. This proceeds in the usual
+ * way of trying to unparkSuccessor of head if it needs
+ * signal. But if it does not, status is set to PROPAGATE to
+ * ensure that upon release, propagation continues.
+ * Additionally, we must loop in case a new node is added
+ * while we are doing this. Also, unlike other uses of
+ * unparkSuccessor, we need to know if CAS to reset status
+ * fails, if so rechecking.
+ */
+ for (;;) {
+ Node h = head;
+ if (h != null && h != tail) {
+ int ws = h.waitStatus;
+ if (ws == Node.SIGNAL) {
+ if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
+ continue; // loop to recheck cases
+ unparkSuccessor(h);
+ }
+ else if (ws == 0 &&
+ !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
+ continue; // loop on failed CAS
+ }
+ if (h == head) // loop if head changed
+ break;
+ }
+ }
+
/**
* Sets head of queue, and checks if successor may be waiting
- * in shared mode, if so propagating if propagate > 0.
+ * in shared mode, if so propagating if either propagate > 0 or
+ * PROPAGATE status was set.
*
- * @param pred the node holding waitStatus for node
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
+ Node h = head; // Record old head for check below
setHead(node);
- if (propagate > 0 && node.waitStatus != 0) {
- /*
- * Don't bother fully figuring out successor. If it
- * looks null, call unparkSuccessor anyway to be safe.
- */
+ /*
+ * Try to signal next queued node if:
+ * Propagation was indicated by caller,
+ * or was recorded (as h.waitStatus) by a previous operation
+ * (note: this uses sign-check of waitStatus because
+ * PROPAGATE status may transition to SIGNAL.)
+ * and
+ * The next node is waiting in shared mode,
+ * or we don't know, because it appears null
+ *
+ * The conservatism in both of these checks may cause
+ * unnecessary wake-ups, but only when there are multiple
+ * racing acquires/releases, so most need signals now or soon
+ * anyway.
+ */
+ if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
- unparkSuccessor(node);
+ doReleaseShared();
}
}
@@ -688,23 +749,27 @@ public abstract class AbstractQueuedSynchronizer
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
- // Getting this before setting waitStatus ensures staleness
+ // predNext is the apparent node to unsplice. CASes below will
+ // fail if not, in which case, we lost race vs another cancel
+ // or signal, so no further action is necessary.
Node predNext = pred.next;
- // Can use unconditional write instead of CAS here
+ // Can use unconditional write instead of CAS here.
+ // After this atomic step, other Nodes can skip past us.
+ // Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
- // If we are the tail, remove ourselves
+ // If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
- // If "active" predecessor found...
- if (pred != head
- && (pred.waitStatus == Node.SIGNAL
- || compareAndSetWaitStatus(pred, 0, Node.SIGNAL))
- && pred.thread != null) {
-
- // If successor is active, set predecessor's next link
+ // If successor needs signal, try to set pred's next-link
+ // so it will get one. Otherwise wake it up to propagate.
+ int ws;
+ if (pred != head &&
+ ((ws = pred.waitStatus) == Node.SIGNAL ||
+ (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
+ pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
@@ -726,14 +791,14 @@ public abstract class AbstractQueuedSynchronizer
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int s = pred.waitStatus;
- if (s < 0)
+ int ws = pred.waitStatus;
+ if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
- if (s > 0) {
+ if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
@@ -742,14 +807,14 @@ public abstract class AbstractQueuedSynchronizer
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
- }
- else
+ } else {
/*
- * Indicate that we need a signal, but don't park yet. Caller
- * will need to retry to make sure it cannot acquire before
- * parking.
+ * waitStatus must be 0 or PROPAGATE. Indicate that we
+ * need a signal, but don't park yet. Caller will need to
+ * retry to make sure it cannot acquire before parking.
*/
- compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
+ compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
+ }
return false;
}
@@ -1269,9 +1334,7 @@ public abstract class AbstractQueuedSynchronizer
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
+ doReleaseShared();
return true;
}
return false;
@@ -1613,8 +1676,8 @@ public abstract class AbstractQueuedSynchronizer
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
- int c = p.waitStatus;
- if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
+ int ws = p.waitStatus;
+ if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
diff --git a/jdk/test/java/util/concurrent/Semaphore/RacingReleases.java b/jdk/test/java/util/concurrent/Semaphore/RacingReleases.java
new file mode 100644
index 00000000000..c1c573f443f
--- /dev/null
+++ b/jdk/test/java/util/concurrent/Semaphore/RacingReleases.java
@@ -0,0 +1,116 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+/*
+ * This file is available under and governed by the GNU General Public
+ * License version 2 only, as published by the Free Software Foundation.
+ * However, the following notice accompanied the original version of this
+ * file:
+ *
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+/*
+ * @test
+ * @bug 6801020 6803402
+ * @summary Try to tickle race conditions in
+ * AbstractQueuedSynchronizer "shared" code
+ */
+
+import java.util.concurrent.Semaphore;
+
+public class RacingReleases {
+
+ /** Increase this for better chance of tickling races */
+ static final int iterations = 1000;
+
+ public static void test(final boolean fair,
+ final boolean interruptibly)
+ throws Throwable {
+ for (int i = 0; i < iterations; i++) {
+ final Semaphore sem = new Semaphore(0, fair);
+ final Throwable[] badness = new Throwable[1];
+ Runnable blocker = interruptibly ?
+ new Runnable() {
+ public void run() {
+ try {
+ sem.acquire();
+ } catch (Throwable t) {
+ badness[0] = t;
+ throw new Error(t);
+ }}}
+ :
+ new Runnable() {
+ public void run() {
+ try {
+ sem.acquireUninterruptibly();
+ } catch (Throwable t) {
+ badness[0] = t;
+ throw new Error(t);
+ }}};
+
+ Thread b1 = new Thread(blocker);
+ Thread b2 = new Thread(blocker);
+ Runnable signaller = new Runnable() {
+ public void run() {
+ try {
+ sem.release();
+ } catch (Throwable t) {
+ badness[0] = t;
+ throw new Error(t);
+ }}};
+ Thread s1 = new Thread(signaller);
+ Thread s2 = new Thread(signaller);
+ Thread[] threads = { b1, b2, s1, s2 };
+ java.util.Collections.shuffle(java.util.Arrays.asList(threads));
+ for (Thread thread : threads)
+ thread.start();
+ for (Thread thread : threads) {
+ thread.join(60 * 1000);
+ if (thread.isAlive())
+ throw new Error
+ (String.format
+ ("Semaphore stuck: permits %d, thread waiting %s%n",
+ sem.availablePermits(),
+ sem.hasQueuedThreads() ? "true" : "false"));
+ }
+ if (badness[0] != null)
+ throw new Error(badness[0]);
+ if (sem.availablePermits() != 0)
+ throw new Error(String.valueOf(sem.availablePermits()));
+ if (sem.hasQueuedThreads())
+ throw new Error(String.valueOf(sem.hasQueuedThreads()));
+ if (sem.getQueueLength() != 0)
+ throw new Error(String.valueOf(sem.getQueueLength()));
+ if (sem.isFair() != fair)
+ throw new Error(String.valueOf(sem.isFair()));
+ }
+ }
+
+ public static void main(String[] args) throws Throwable {
+ for (boolean fair : new boolean[] { true, false })
+ for (boolean interruptibly : new boolean[] { true, false })
+ test(fair, interruptibly);
+ }
+}