8191937: Lost interrupt in AbstractQueuedSynchronizer when tryAcquire methods throw
Reviewed-by: martin, psandoz
This commit is contained in:
parent
fad5094503
commit
eac77274e8
@ -422,8 +422,8 @@ public abstract class AbstractQueuedLongSynchronizer
|
||||
* @return {@code true} if interrupted while waiting
|
||||
*/
|
||||
final boolean acquireQueued(final Node node, long arg) {
|
||||
boolean interrupted = false;
|
||||
try {
|
||||
boolean interrupted = false;
|
||||
for (;;) {
|
||||
final Node p = node.predecessor();
|
||||
if (p == head && tryAcquire(arg)) {
|
||||
@ -431,12 +431,13 @@ public abstract class AbstractQueuedLongSynchronizer
|
||||
p.next = null; // help GC
|
||||
return interrupted;
|
||||
}
|
||||
if (shouldParkAfterFailedAcquire(p, node) &&
|
||||
parkAndCheckInterrupt())
|
||||
interrupted = true;
|
||||
if (shouldParkAfterFailedAcquire(p, node))
|
||||
interrupted |= parkAndCheckInterrupt();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
cancelAcquire(node);
|
||||
if (interrupted)
|
||||
selfInterrupt();
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
@ -510,8 +511,8 @@ public abstract class AbstractQueuedLongSynchronizer
|
||||
*/
|
||||
private void doAcquireShared(long arg) {
|
||||
final Node node = addWaiter(Node.SHARED);
|
||||
boolean interrupted = false;
|
||||
try {
|
||||
boolean interrupted = false;
|
||||
for (;;) {
|
||||
final Node p = node.predecessor();
|
||||
if (p == head) {
|
||||
@ -519,18 +520,18 @@ public abstract class AbstractQueuedLongSynchronizer
|
||||
if (r >= 0) {
|
||||
setHeadAndPropagate(node, r);
|
||||
p.next = null; // help GC
|
||||
if (interrupted)
|
||||
selfInterrupt();
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (shouldParkAfterFailedAcquire(p, node) &&
|
||||
parkAndCheckInterrupt())
|
||||
interrupted = true;
|
||||
if (shouldParkAfterFailedAcquire(p, node))
|
||||
interrupted |= parkAndCheckInterrupt();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
cancelAcquire(node);
|
||||
throw t;
|
||||
} finally {
|
||||
if (interrupted)
|
||||
selfInterrupt();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -505,7 +505,7 @@ public abstract class AbstractQueuedSynchronizer
|
||||
*
|
||||
* @return the predecessor of this node
|
||||
*/
|
||||
final Node predecessor() throws NullPointerException {
|
||||
final Node predecessor() {
|
||||
Node p = prev;
|
||||
if (p == null)
|
||||
throw new NullPointerException();
|
||||
@ -902,8 +902,8 @@ public abstract class AbstractQueuedSynchronizer
|
||||
* @return {@code true} if interrupted while waiting
|
||||
*/
|
||||
final boolean acquireQueued(final Node node, int arg) {
|
||||
boolean interrupted = false;
|
||||
try {
|
||||
boolean interrupted = false;
|
||||
for (;;) {
|
||||
final Node p = node.predecessor();
|
||||
if (p == head && tryAcquire(arg)) {
|
||||
@ -911,12 +911,13 @@ public abstract class AbstractQueuedSynchronizer
|
||||
p.next = null; // help GC
|
||||
return interrupted;
|
||||
}
|
||||
if (shouldParkAfterFailedAcquire(p, node) &&
|
||||
parkAndCheckInterrupt())
|
||||
interrupted = true;
|
||||
if (shouldParkAfterFailedAcquire(p, node))
|
||||
interrupted |= parkAndCheckInterrupt();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
cancelAcquire(node);
|
||||
if (interrupted)
|
||||
selfInterrupt();
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
@ -990,8 +991,8 @@ public abstract class AbstractQueuedSynchronizer
|
||||
*/
|
||||
private void doAcquireShared(int arg) {
|
||||
final Node node = addWaiter(Node.SHARED);
|
||||
boolean interrupted = false;
|
||||
try {
|
||||
boolean interrupted = false;
|
||||
for (;;) {
|
||||
final Node p = node.predecessor();
|
||||
if (p == head) {
|
||||
@ -999,18 +1000,18 @@ public abstract class AbstractQueuedSynchronizer
|
||||
if (r >= 0) {
|
||||
setHeadAndPropagate(node, r);
|
||||
p.next = null; // help GC
|
||||
if (interrupted)
|
||||
selfInterrupt();
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (shouldParkAfterFailedAcquire(p, node) &&
|
||||
parkAndCheckInterrupt())
|
||||
interrupted = true;
|
||||
if (shouldParkAfterFailedAcquire(p, node))
|
||||
interrupted |= parkAndCheckInterrupt();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
cancelAcquire(node);
|
||||
throw t;
|
||||
} finally {
|
||||
if (interrupted)
|
||||
selfInterrupt();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -39,6 +39,7 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.locks.AbstractQueuedLongSynchronizer;
|
||||
import java.util.concurrent.locks.AbstractQueuedLongSynchronizer.ConditionObject;
|
||||
|
||||
@ -1283,4 +1284,57 @@ public class AbstractQueuedLongSynchronizerTest extends JSR166TestCase {
|
||||
sync.release();
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests scenario for
|
||||
* JDK-8191937: Lost interrupt in AbstractQueuedSynchronizer when tryAcquire methods throw
|
||||
*/
|
||||
public void testInterruptedFailingAcquire() throws InterruptedException {
|
||||
final RuntimeException ex = new RuntimeException();
|
||||
|
||||
// A synchronizer only offering a choice of failure modes
|
||||
class Sync extends AbstractQueuedLongSynchronizer {
|
||||
boolean pleaseThrow;
|
||||
@Override protected boolean tryAcquire(long ignored) {
|
||||
if (pleaseThrow) throw ex;
|
||||
return false;
|
||||
}
|
||||
@Override protected long tryAcquireShared(long ignored) {
|
||||
if (pleaseThrow) throw ex;
|
||||
return -1;
|
||||
}
|
||||
@Override protected boolean tryRelease(long ignored) {
|
||||
return true;
|
||||
}
|
||||
@Override protected boolean tryReleaseShared(long ignored) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
final Sync s = new Sync();
|
||||
|
||||
final Thread thread = newStartedThread(new CheckedRunnable() {
|
||||
public void realRun() {
|
||||
try {
|
||||
if (ThreadLocalRandom.current().nextBoolean())
|
||||
s.acquire(1);
|
||||
else
|
||||
s.acquireShared(1);
|
||||
shouldThrow();
|
||||
} catch (Throwable t) {
|
||||
assertSame(ex, t);
|
||||
assertTrue(Thread.interrupted());
|
||||
}
|
||||
}});
|
||||
waitForThreadToEnterWaitState(thread);
|
||||
assertSame(thread, s.getFirstQueuedThread());
|
||||
assertTrue(s.hasQueuedPredecessors());
|
||||
assertTrue(s.hasQueuedThreads());
|
||||
assertEquals(1, s.getQueueLength());
|
||||
|
||||
s.pleaseThrow = true;
|
||||
thread.interrupt();
|
||||
s.release(1);
|
||||
awaitTermination(thread);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -36,9 +36,11 @@
|
||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
import static java.util.concurrent.TimeUnit.NANOSECONDS;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
|
||||
import java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject;
|
||||
|
||||
@ -1286,4 +1288,105 @@ public class AbstractQueuedSynchronizerTest extends JSR166TestCase {
|
||||
sync.release();
|
||||
}
|
||||
|
||||
/**
|
||||
* Disabled demo test for (unfixed as of 2017-11)
|
||||
* JDK-8191483: AbstractQueuedSynchronizer cancel/cancel race
|
||||
* ant -Djsr166.tckTestClass=AbstractQueuedSynchronizerTest -Djsr166.methodFilter=testCancelCancelRace -Djsr166.runsPerTest=100 tck
|
||||
*/
|
||||
public void DISABLED_testCancelCancelRace() throws InterruptedException {
|
||||
class Sync extends AbstractQueuedSynchronizer {
|
||||
protected boolean tryAcquire(int acquires) {
|
||||
return !hasQueuedPredecessors() && compareAndSetState(0, 1);
|
||||
}
|
||||
protected boolean tryRelease(int releases) {
|
||||
return compareAndSetState(1, 0);
|
||||
}
|
||||
}
|
||||
|
||||
Sync s = new Sync();
|
||||
s.acquire(1); // acquire to force other threads to enqueue
|
||||
|
||||
// try to trigger double cancel race with two background threads
|
||||
ArrayList<Thread> threads = new ArrayList<>();
|
||||
Runnable failedAcquire = () -> {
|
||||
try {
|
||||
s.acquireInterruptibly(1);
|
||||
shouldThrow();
|
||||
} catch (InterruptedException expected) {}
|
||||
};
|
||||
for (int i = 0; i < 2; i++) {
|
||||
Thread thread = new Thread(failedAcquire);
|
||||
thread.start();
|
||||
threads.add(thread);
|
||||
}
|
||||
Thread.sleep(100);
|
||||
for (Thread thread : threads) thread.interrupt();
|
||||
for (Thread thread : threads) awaitTermination(thread);
|
||||
|
||||
s.release(1);
|
||||
|
||||
// no one holds lock now, we should be able to acquire
|
||||
if (!s.tryAcquire(1))
|
||||
throw new RuntimeException(
|
||||
String.format(
|
||||
"Broken: hasQueuedPredecessors=%s hasQueuedThreads=%s queueLength=%d firstQueuedThread=%s",
|
||||
s.hasQueuedPredecessors(),
|
||||
s.hasQueuedThreads(),
|
||||
s.getQueueLength(),
|
||||
s.getFirstQueuedThread()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests scenario for
|
||||
* JDK-8191937: Lost interrupt in AbstractQueuedSynchronizer when tryAcquire methods throw
|
||||
*/
|
||||
public void testInterruptedFailingAcquire() throws InterruptedException {
|
||||
final RuntimeException ex = new RuntimeException();
|
||||
|
||||
// A synchronizer only offering a choice of failure modes
|
||||
class Sync extends AbstractQueuedSynchronizer {
|
||||
boolean pleaseThrow;
|
||||
@Override protected boolean tryAcquire(int ignored) {
|
||||
if (pleaseThrow) throw ex;
|
||||
return false;
|
||||
}
|
||||
@Override protected int tryAcquireShared(int ignored) {
|
||||
if (pleaseThrow) throw ex;
|
||||
return -1;
|
||||
}
|
||||
@Override protected boolean tryRelease(int ignored) {
|
||||
return true;
|
||||
}
|
||||
@Override protected boolean tryReleaseShared(int ignored) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
final Sync s = new Sync();
|
||||
|
||||
final Thread thread = newStartedThread(new CheckedRunnable() {
|
||||
public void realRun() {
|
||||
try {
|
||||
if (ThreadLocalRandom.current().nextBoolean())
|
||||
s.acquire(1);
|
||||
else
|
||||
s.acquireShared(1);
|
||||
shouldThrow();
|
||||
} catch (Throwable t) {
|
||||
assertSame(ex, t);
|
||||
assertTrue(Thread.interrupted());
|
||||
}
|
||||
}});
|
||||
waitForThreadToEnterWaitState(thread);
|
||||
assertSame(thread, s.getFirstQueuedThread());
|
||||
assertTrue(s.hasQueuedPredecessors());
|
||||
assertTrue(s.hasQueuedThreads());
|
||||
assertEquals(1, s.getQueueLength());
|
||||
|
||||
s.pleaseThrow = true;
|
||||
thread.interrupt();
|
||||
s.release(1);
|
||||
awaitTermination(thread);
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user