8160402: Garbage retention with CompletableFuture.anyOf

Reviewed-by: martin, psandoz, plevart
This commit is contained in:
Doug Lea 2016-07-26 09:53:38 -07:00
parent b7284ca20e
commit 6db3f79f39
2 changed files with 93 additions and 54 deletions

View File

@ -221,7 +221,10 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
* across both while pushing actions. The second completion is
* a CoCompletion pointing to the first, shared so that at most
* one performs the action. The multiple-arity methods allOf
* and anyOf do this pairwise to form trees of completions.
* does this pairwise to form trees of completions. Method
* anyOf is handled differently from allOf because completion of
* any source should trigger a cleanStack of other sources.
* Each AnyOf completion can reach others via a shared array.
*
* Note that the generic type parameters of methods vary according
* to whether "this" is a source, dependent, or completion.
@ -588,9 +591,9 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
}
/**
* Post-processing by dependent after successful UniCompletion
* tryFire. Tries to clean stack of source a, and then either runs
* postComplete or returns this to caller, depending on mode.
* Post-processing by dependent after successful UniCompletion tryFire.
* Tries to clean stack of source a, and then either runs postComplete
* or returns this to caller, depending on mode.
*/
final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
if (a != null && a.stack != null) {
@ -1003,12 +1006,12 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
}
@SuppressWarnings("serial")
static final class UniRelay<T> extends UniCompletion<T,T> {
UniRelay(CompletableFuture<T> dep, CompletableFuture<T> src) {
static final class UniRelay<U, T extends U> extends UniCompletion<T,U> {
UniRelay(CompletableFuture<U> dep, CompletableFuture<T> src) {
super(null, dep, src);
}
final CompletableFuture<T> tryFire(int mode) {
CompletableFuture<T> d; CompletableFuture<T> a; Object r;
final CompletableFuture<U> tryFire(int mode) {
CompletableFuture<U> d; CompletableFuture<T> a; Object r;
if ((d = dep) == null
|| (a = src) == null || (r = a.result) == null)
return null;
@ -1019,13 +1022,14 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
}
}
private CompletableFuture<T> uniCopyStage() {
private static <U, T extends U> CompletableFuture<U> uniCopyStage(
CompletableFuture<T> src) {
Object r;
CompletableFuture<T> d = newIncompleteFuture();
if ((r = result) != null)
CompletableFuture<U> d = src.newIncompleteFuture();
if ((r = src.result) != null)
d.result = encodeRelay(r);
else
unipush(new UniRelay<T>(d, this));
src.unipush(new UniRelay<U,T>(d, src));
return d;
}
@ -1034,7 +1038,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
if ((r = result) != null)
return new MinimalStage<T>(encodeRelay(r));
MinimalStage<T> d = new MinimalStage<T>();
unipush(new UniRelay<T>(d, this));
unipush(new UniRelay<T,T>(d, this));
return d;
}
@ -1069,7 +1073,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
if ((r = g.result) != null)
d.completeRelay(r);
else {
g.unipush(new UniRelay<V>(d, g));
g.unipush(new UniRelay<V,V>(d, g));
if (d.result == null)
return null;
}
@ -1103,7 +1107,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
if ((s = g.result) != null)
d.result = encodeRelay(s);
else {
g.unipush(new UniRelay<V>(d, g));
g.unipush(new UniRelay<V,V>(d, g));
}
} catch (Throwable ex) {
d.result = encodeThrowable(ex);
@ -1637,45 +1641,40 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
return d;
}
/** Completion for an anyOf input future. */
@SuppressWarnings("serial")
static final class OrRelay<T,U> extends BiCompletion<T,U,Object> { // for Or
OrRelay(CompletableFuture<Object> dep,
CompletableFuture<T> src, CompletableFuture<U> snd) {
super(null, dep, src, snd);
static class AnyOf extends Completion {
CompletableFuture<Object> dep; CompletableFuture<?> src;
CompletableFuture<?>[] srcs;
AnyOf(CompletableFuture<Object> dep, CompletableFuture<?> src,
CompletableFuture<?>[] srcs) {
this.dep = dep; this.src = src; this.srcs = srcs;
}
final CompletableFuture<Object> tryFire(int mode) {
CompletableFuture<Object> d;
CompletableFuture<T> a;
CompletableFuture<U> b;
// assert mode != ASYNC;
CompletableFuture<Object> d; CompletableFuture<?> a;
CompletableFuture<?>[] as;
Object r;
if ((d = dep) == null
|| (a = src) == null || (b = snd) == null
|| ((r = a.result) == null && (r = b.result) == null))
|| (a = src) == null || (r = a.result) == null
|| (as = srcs) == null)
return null;
d.completeRelay(r);
src = null; snd = null; dep = null;
return d.postFire(a, b, mode);
dep = null; src = null; srcs = null;
if (d.completeRelay(r)) {
for (CompletableFuture<?> b : as)
if (b != a)
b.cleanStack();
if (mode < 0)
return d;
else
d.postComplete();
}
return null;
}
}
/** Recursively constructs a tree of completions. */
static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs,
int lo, int hi) {
CompletableFuture<Object> d = new CompletableFuture<Object>();
if (lo <= hi) {
CompletableFuture<?> a, b; Object r;
int mid = (lo + hi) >>> 1;
if ((a = (lo == mid ? cfs[lo] :
orTree(cfs, lo, mid))) == null ||
(b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
orTree(cfs, mid+1, hi))) == null)
throw new NullPointerException();
if ((r = a.result) != null && (r = b.result) != null)
d.result = encodeRelay(r);
else
a.orpush(b, new OrRelay<>(d, a, b));
final boolean isLive() {
CompletableFuture<Object> d;
return (d = dep) != null && d.result == null;
}
return d;
}
/* ------------- Zero-input Async forms -------------- */
@ -2354,7 +2353,28 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
* {@code null}
*/
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
return orTree(cfs, 0, cfs.length - 1);
int n; Object r;
if ((n = cfs.length) <= 1)
return (n == 0)
? new CompletableFuture<Object>()
: uniCopyStage(cfs[0]);
for (CompletableFuture<?> cf : cfs)
if ((r = cf.result) != null)
return new CompletableFuture<Object>(encodeRelay(r));
cfs = cfs.clone();
CompletableFuture<Object> d = new CompletableFuture<>();
for (CompletableFuture<?> cf : cfs)
cf.unipush(new AnyOf(d, cf, cfs));
// If d was completed while we were adding completions, we should
// clean the stack of any sources that may have had completions
// pushed on their stack after d was completed.
if (d.result != null)
for (int i = 0, len = cfs.length; i < len; i++)
if (cfs[i].result != null)
for (i++; i < len; i++)
if (cfs[i].result == null)
cfs[i].cleanStack();
return d;
}
/* ------------- Control and status methods -------------- */
@ -2526,7 +2546,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
* @since 9
*/
public CompletableFuture<T> copy() {
return uniCopyStage();
return uniCopyStage(this);
}
/**

View File

@ -4269,12 +4269,11 @@ public class CompletableFutureTest extends JSR166TestCase {
}
}
/*
* Tests below currently fail in stress mode due to memory retention.
* ant -Dvmoptions=-Xmx8m -Djsr166.expensiveTests=true -Djsr166.tckTestClass=CompletableFutureTest tck
/**
* Reproduction recipe for:
* 8160402: Garbage retention with CompletableFuture.anyOf
* cvs update -D '2016-05-01' ./src/main/java/util/concurrent/CompletableFuture.java && ant -Dvmoptions=-Xmx8m -Djsr166.expensiveTests=true -Djsr166.tckTestClass=CompletableFutureTest -Djsr166.methodFilter=testAnyOfGarbageRetention tck; cvs update -A
*/
/** Checks for garbage retention with anyOf. */
public void testAnyOfGarbageRetention() throws Throwable {
for (Integer v : new Integer[] { 1, null })
{
@ -4288,7 +4287,12 @@ public class CompletableFutureTest extends JSR166TestCase {
checkCompletedNormally(CompletableFuture.anyOf(fs), v);
}}
/** Checks for garbage retention with allOf. */
/**
* Checks for garbage retention with allOf.
*
* As of 2016-07, fails with OOME:
* ant -Dvmoptions=-Xmx8m -Djsr166.expensiveTests=true -Djsr166.tckTestClass=CompletableFutureTest -Djsr166.methodFilter=testCancelledAllOfGarbageRetention tck
*/
public void testCancelledAllOfGarbageRetention() throws Throwable {
final int n = expensiveTests ? 100_000 : 10;
CompletableFuture<Integer>[] fs
@ -4299,6 +4303,21 @@ public class CompletableFutureTest extends JSR166TestCase {
assertTrue(CompletableFuture.allOf(fs).cancel(false));
}
/**
* Checks for garbage retention when a dependent future is
* cancelled and garbage-collected.
* 8161600: Garbage retention when source CompletableFutures are never completed
*
* As of 2016-07, fails with OOME:
* ant -Dvmoptions=-Xmx8m -Djsr166.expensiveTests=true -Djsr166.tckTestClass=CompletableFutureTest -Djsr166.methodFilter=testCancelledGarbageRetention tck
*/
public void testCancelledGarbageRetention() throws Throwable {
final int n = expensiveTests ? 100_000 : 10;
CompletableFuture<Integer> neverCompleted = new CompletableFuture<>();
for (int i = 0; i < n; i++)
assertTrue(neverCompleted.thenRun(() -> {}).cancel(true));
}
// static <U> U join(CompletionStage<U> stage) {
// CompletableFuture<U> f = new CompletableFuture<>();
// stage.whenComplete((v, ex) -> {