8317295: ResponseSubscribers.SubscriberAdapter should call the finisher function asynchronously

Reviewed-by: djelinski
This commit is contained in:
Daniel Fuchs 2023-10-02 13:06:43 +00:00
parent 516cfb135f
commit 8093563bce
2 changed files with 73 additions and 12 deletions
src/java.net.http/share/classes/jdk/internal/net/http
test/jdk/java/net/httpclient

@ -780,7 +780,7 @@ public class ResponseSubscribers {
subscriber.onComplete();
} finally {
try {
cf.complete(finisher.apply(subscriber));
cf.completeAsync(() -> finisher.apply(subscriber));
} catch (Throwable throwable) {
cf.completeExceptionally(throwable);
}

@ -26,15 +26,21 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.lang.StackWalker.StackFrame;
import java.net.URI;
import java.net.http.HttpClient.Builder;
import java.net.http.HttpClient.Version;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
@ -57,6 +63,7 @@ import static org.testng.Assert.assertTrue;
/*
* @test
* @bug 8193365 8317295
* @summary Basic tests for Flow adapter Subscribers
* @library /test/lib /test/jdk/java/net/httpclient/lib
* @build jdk.httpclient.test.lib.common.HttpServerAdapters
@ -76,6 +83,9 @@ public class FlowAdapterSubscriberTest implements HttpServerAdapters {
String http2URI;
String https2URI;
static final StackWalker WALKER =
StackWalker.getInstance(StackWalker.Option.RETAIN_CLASS_REFERENCE);
static final long start = System.nanoTime();
public static String now() {
long now = System.nanoTime() - start;
@ -264,8 +274,8 @@ public class FlowAdapterSubscriberTest implements HttpServerAdapters {
}
@Test(dataProvider = "uris")
void testCollectionWithoutFinisheBlocking(String uri) throws Exception {
System.out.printf(now() + "testCollectionWithoutFinisheBlocking(%s) starting%n", uri);
void testCollectionWithoutFinisherBlocking(String uri) throws Exception {
System.out.printf(now() + "testCollectionWithoutFinisherBlocking(%s) starting%n", uri);
try (HttpClient client = newHttpClient(uri)) {
HttpRequest request = newRequestBuilder(uri)
.POST(BodyPublishers.ofString("What's the craic?")).build();
@ -455,18 +465,69 @@ public class FlowAdapterSubscriberTest implements HttpServerAdapters {
HttpRequest request = newRequestBuilder(uri)
.POST(BodyPublishers.ofString("May the wind always be at your back.")).build();
client.sendAsync(request, BodyHandlers.fromSubscriber(BodySubscribers.ofInputStream(),
ins -> {
InputStream is = ins.getBody().toCompletableFuture().join();
return new String(uncheckedReadAllBytes(is), UTF_8);
}))
.thenApply(FlowAdapterSubscriberTest::assert200ResponseCode)
.thenApply(HttpResponse::body)
.thenAccept(body -> assertEquals(body, "May the wind always be at your back."))
.join();
var adaptee = BodySubscribers.ofInputStream();
var exec = Executors.newSingleThreadExecutor();
// Use an executor to pull on the InputStream in order to reach the
// point where the Subscriber gets completed and the finisher function
// is called. If we didn't use an executor here, the finisher function
// may never get called.
var futureResult = exec.submit(() -> uncheckedReadAllBytes(
adaptee.getBody().toCompletableFuture().join()));
Supplier<byte[]> bytes = () -> {
try {
return futureResult.get();
} catch (InterruptedException e) {
throw new CompletionException(e);
} catch (ExecutionException e) {
throw new CompletionException(e.getCause());
}
};
AtomicReference<AssertionError> failed = new AtomicReference<>();
Function<? super Flow.Subscriber<List<ByteBuffer>>, String> finisher = (s) -> {
failed.set(checkThreadAndStack());
return new String(bytes.get(), UTF_8);
};
try {
var cf = client.sendAsync(request, BodyHandlers.fromSubscriber(adaptee,
finisher))
.thenApply(FlowAdapterSubscriberTest::assert200ResponseCode)
.thenApply(HttpResponse::body)
.thenAccept(body -> assertEquals(body, "May the wind always be at your back."))
.join();
var error = failed.get();
if (error != null) throw error;
} finally {
exec.close();
}
}
}
static final Predicate<StackFrame> DAT = sfe ->
sfe.getClassName().startsWith("FlowAdapterSubscriberTest");
static final Predicate<StackFrame> JUC = sfe ->
sfe.getClassName().startsWith("java.util.concurrent");
static final Predicate<StackFrame> JLT = sfe ->
sfe.getClassName().startsWith("java.lang.Thread");
static final Predicate<StackFrame> RSP = sfe ->
sfe.getClassName().startsWith("jdk.internal.net.http.ResponseSubscribers");
static final Predicate<StackFrame> NotDATorJUCorJLT = Predicate.not(DAT.or(JUC).or(JLT).or(RSP));
AssertionError checkThreadAndStack() {
System.out.println("Check stack trace");
List<StackFrame> otherFrames = WALKER.walk(s -> s.filter(NotDATorJUCorJLT).toList());
if (!otherFrames.isEmpty()) {
System.out.println("Found unexpected trace: ");
otherFrames.forEach(f -> System.out.printf("\t%s%n", f));
return new AssertionError("Dependant action has unexpected frame in " +
Thread.currentThread() + ": " + otherFrames.get(0));
}
return null;
}
/** An abstract Subscriber that converts all received data into a String. */
static abstract class AbstractSubscriber implements Supplier<String> {
protected volatile Flow.Subscription subscription;