8252374: Add a new factory method to concatenate a sequence of BodyPublisher instances into a single publisher.

Reviewed-by: chegar
This commit is contained in:
Daniel Fuchs 2020-10-12 12:52:55 +00:00
parent 05459df0c7
commit 4184959d85
6 changed files with 1183 additions and 2 deletions
src/java.net.http/share/classes
java/net/http
jdk/internal/net/http
test/jdk/java/net/httpclient

@ -40,8 +40,10 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.function.Supplier;
import jdk.internal.net.http.HttpRequestBuilderImpl;
import jdk.internal.net.http.RequestPublishers;
import static java.nio.charset.StandardCharsets.UTF_8;
/**
@ -654,5 +656,56 @@ public abstract class HttpRequest {
public static BodyPublisher noBody() {
return new RequestPublishers.EmptyPublisher();
}
/**
* Returns a {@code BodyPublisher} that publishes a request
* body consisting of the concatenation of the request bodies
* published by a sequence of publishers.
*
* <p> If the sequence is empty an {@linkplain #noBody() empty} publisher
* is returned. Otherwise, if the sequence contains a single element,
* that publisher is returned. Otherwise a <em>concatenation publisher</em>
* is returned.
*
* <p> The request body published by a <em>concatenation publisher</em>
* is logically equivalent to the request body that would have
* been published by concatenating all the bytes of each publisher
* in sequence.
*
* <p> Each publisher is lazily subscribed to in turn,
* until all the body bytes are published, an error occurs, or the
* concatenation publisher's subscription is cancelled.
* The concatenation publisher may be subscribed to more than once,
* which in turn may result in the publishers in the sequence being
* subscribed to more than once.
*
* <p> The concatenation publisher has a known content
* length only if all publishers in the sequence have a known content
* length. The {@link BodyPublisher#contentLength() contentLength}
* reported by the concatenation publisher is computed as follows:
* <ul>
* <li> If any of the publishers reports an <em>{@linkplain
* BodyPublisher#contentLength() unknown}</em> content length,
* or if the sum of the known content lengths would exceed
* {@link Long#MAX_VALUE}, the resulting
* content length is <em>unknown</em>.</li>
* <li> Otherwise, the resulting content length is the sum of the
* known content lengths, a number between
* {@code 0} and {@link Long#MAX_VALUE}, inclusive.</li>
* </ul>
*
* @implNote If the concatenation publisher's subscription is
* {@linkplain Flow.Subscription#cancel() cancelled}, or an error occurs
* while publishing the bytes, not all publishers in the sequence may
* be subscribed to.
*
* @param publishers a sequence of publishers.
* @return An aggregate publisher that publishes a request body
* logically equivalent to the concatenation of all bytes published
* by each publisher in the sequence.
*/
public static BodyPublisher concat(BodyPublisher... publishers) {
return RequestPublishers.concat(Objects.requireNonNull(publishers));
}
}
}

@ -32,6 +32,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.http.HttpRequest.BodyPublisher;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.file.Files;
@ -47,12 +48,16 @@ import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.net.http.HttpRequest.BodyPublisher;
import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.SequentialScheduler;
import jdk.internal.net.http.common.Utils;
public final class RequestPublishers {
@ -491,4 +496,194 @@ public final class RequestPublishers {
publisher.subscribe(subscriber);
}
}
public static BodyPublisher concat(BodyPublisher... publishers) {
if (publishers.length == 0) {
return new EmptyPublisher();
} else if (publishers.length == 1) {
return Objects.requireNonNull(publishers[0]);
} else {
return new AggregatePublisher(List.of(publishers));
}
}
/**
* An aggregate publisher acts as a proxy between a subscriber
* and a list of publishers. It lazily subscribes to each publisher
* in sequence in order to publish a request body that is
* composed from all the bytes obtained from each publisher.
* For instance, the following two publishers are equivalent, even
* though they may result in a different count of {@code onNext}
* invocations.
* <pre>{@code
* var bp1 = BodyPublishers.ofString("ab");
* var bp2 = BodyPublishers.concat(BodyPublishers.ofString("a"),
* BodyPublisher.ofByteArray(new byte[] {(byte)'b'}));
* }</pre>
*
*/
private static final class AggregatePublisher implements BodyPublisher {
final List<BodyPublisher> bodies;
AggregatePublisher(List<BodyPublisher> bodies) {
this.bodies = bodies;
}
// -1 must be returned if any publisher returns -1
// Otherwise, we can just sum the contents.
@Override
public long contentLength() {
long length = bodies.stream()
.mapToLong(BodyPublisher::contentLength)
.reduce((a,b) -> a < 0 || b < 0 ? -1 : a + b)
.orElse(0);
// In case of overflow in any operation but the last, length
// will be -1.
// In case of overflow in the last reduce operation, length
// will be negative, but not necessarily -1: in that case,
// return -1
if (length < 0) return -1;
return length;
}
@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
subscriber.onSubscribe(new AggregateSubscription(bodies, subscriber));
}
}
private static final class AggregateSubscription
implements Flow.Subscription, Flow.Subscriber<ByteBuffer> {
final Flow.Subscriber<? super ByteBuffer> subscriber; // upstream
final Queue<BodyPublisher> bodies;
final SequentialScheduler scheduler;
final Demand demand = new Demand(); // from upstream
final Demand demanded = new Demand(); // requested downstream
final AtomicReference<Throwable> error = new AtomicReference<>();
volatile Throwable illegalRequest;
volatile BodyPublisher publisher; // downstream
volatile Flow.Subscription subscription; // downstream
volatile boolean cancelled;
AggregateSubscription(List<BodyPublisher> bodies, Flow.Subscriber<? super ByteBuffer> subscriber) {
this.bodies = new ConcurrentLinkedQueue<>(bodies);
this.subscriber = subscriber;
this.scheduler = SequentialScheduler.synchronizedScheduler(this::run);
}
@Override
public void request(long n) {
if (cancelled || publisher == null && bodies.isEmpty()) {
return;
}
try {
demand.increase(n);
} catch (IllegalArgumentException x) {
illegalRequest = x;
}
scheduler.runOrSchedule();
}
@Override
public void cancel() {
cancelled = true;
scheduler.runOrSchedule();
}
private boolean cancelSubscription() {
Flow.Subscription subscription = this.subscription;
if (subscription != null) {
this.subscription = null;
this.publisher = null;
subscription.cancel();
}
scheduler.stop();
return subscription != null;
}
public void run() {
try {
while (error.get() == null
&& (!demand.isFulfilled()
|| (publisher == null && !bodies.isEmpty()))) {
boolean cancelled = this.cancelled;
BodyPublisher publisher = this.publisher;
Flow.Subscription subscription = this.subscription;
Throwable illegalRequest = this.illegalRequest;
if (cancelled) {
bodies.clear();
cancelSubscription();
return;
}
if (publisher == null && !bodies.isEmpty()) {
this.publisher = publisher = bodies.poll();
publisher.subscribe(this);
subscription = this.subscription;
} else if (publisher == null) {
return;
}
if (illegalRequest != null) {
onError(illegalRequest);
return;
}
if (subscription == null) return;
if (!demand.isFulfilled()) {
long n = demand.decreaseAndGet(demand.get());
demanded.increase(n);
subscription.request(n);
}
}
} catch (Throwable t) {
onError(t);
}
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
scheduler.runOrSchedule();
}
@Override
public void onNext(ByteBuffer item) {
// make sure to cancel the subscription if we receive
// an item after the subscription was cancelled or
// an error was reported.
if (cancelled || error.get() != null) {
cancelSubscription();
return;
}
demanded.tryDecrement();
subscriber.onNext(item);
}
@Override
public void onError(Throwable throwable) {
if (error.compareAndSet(null, throwable)) {
publisher = null;
subscription = null;
subscriber.onError(throwable);
scheduler.stop();
}
}
@Override
public void onComplete() {
if (publisher != null && !bodies.isEmpty()) {
while (!demanded.isFulfilled()) {
demand.increase(demanded.decreaseAndGet(demanded.get()));
}
publisher = null;
subscription = null;
scheduler.runOrSchedule();
} else {
publisher = null;
subscription = null;
if (!cancelled) {
subscriber.onComplete();
}
scheduler.stop();
}
}
}
}

@ -47,7 +47,7 @@ public final class Demand {
*/
public boolean increase(long n) {
if (n <= 0) {
throw new IllegalArgumentException(String.valueOf(n));
throw new IllegalArgumentException("non-positive subscription request: " + String.valueOf(n));
}
long prev = val.getAndAccumulate(n, (p, i) -> p + i < 0 ? Long.MAX_VALUE : p + i);
return prev == 0;

@ -0,0 +1,839 @@
/*
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* @test
* @bug 8252374
* @library /test/lib http2/server
* @build jdk.test.lib.net.SimpleSSLContext HttpServerAdapters
* ReferenceTracker AggregateRequestBodyTest
* @modules java.base/sun.net.www.http
* java.net.http/jdk.internal.net.http.common
* java.net.http/jdk.internal.net.http.frame
* java.net.http/jdk.internal.net.http.hpack
* @run testng/othervm -Djdk.internal.httpclient.debug=true
* -Djdk.httpclient.HttpClient.log=requests,responses,errors
* AggregateRequestBodyTest
* @summary Tests HttpRequest.BodyPublishers::concat
*/
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublisher;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import javax.net.ssl.SSLContext;
import com.sun.net.httpserver.HttpServer;
import com.sun.net.httpserver.HttpsConfigurator;
import com.sun.net.httpserver.HttpsServer;
import jdk.test.lib.net.SimpleSSLContext;
import org.testng.Assert;
import org.testng.ITestContext;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static java.lang.System.out;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;
public class AggregateRequestBodyTest implements HttpServerAdapters {
SSLContext sslContext;
HttpTestServer http1TestServer; // HTTP/1.1 ( http )
HttpTestServer https1TestServer; // HTTPS/1.1 ( https )
HttpTestServer http2TestServer; // HTTP/2 ( h2c )
HttpTestServer https2TestServer; // HTTP/2 ( h2 )
String http1URI;
String https1URI;
String http2URI;
String https2URI;
static final int RESPONSE_CODE = 200;
static final int ITERATION_COUNT = 4;
static final Class<IllegalArgumentException> IAE = IllegalArgumentException.class;
static final Class<CompletionException> CE = CompletionException.class;
// a shared executor helps reduce the amount of threads created by the test
static final Executor executor = new TestExecutor(Executors.newCachedThreadPool());
static final ConcurrentMap<String, Throwable> FAILURES = new ConcurrentHashMap<>();
static volatile boolean tasksFailed;
static final AtomicLong serverCount = new AtomicLong();
static final AtomicLong clientCount = new AtomicLong();
static final long start = System.nanoTime();
public static String now() {
long now = System.nanoTime() - start;
long secs = now / 1000_000_000;
long mill = (now % 1000_000_000) / 1000_000;
long nan = now % 1000_000;
return String.format("[%d s, %d ms, %d ns] ", secs, mill, nan);
}
final ReferenceTracker TRACKER = ReferenceTracker.INSTANCE;
private volatile HttpClient sharedClient;
static class TestExecutor implements Executor {
final AtomicLong tasks = new AtomicLong();
Executor executor;
TestExecutor(Executor executor) {
this.executor = executor;
}
@Override
public void execute(Runnable command) {
long id = tasks.incrementAndGet();
executor.execute(() -> {
try {
command.run();
} catch (Throwable t) {
tasksFailed = true;
System.out.printf(now() + "Task %s failed: %s%n", id, t);
System.err.printf(now() + "Task %s failed: %s%n", id, t);
FAILURES.putIfAbsent("Task " + id, t);
throw t;
}
});
}
}
protected boolean stopAfterFirstFailure() {
return Boolean.getBoolean("jdk.internal.httpclient.debug");
}
@BeforeMethod
void beforeMethod(ITestContext context) {
if (stopAfterFirstFailure() && context.getFailedTests().size() > 0) {
throw new RuntimeException("some tests failed");
}
}
@AfterClass
static final void printFailedTests() {
out.println("\n=========================");
try {
out.printf("%n%sCreated %d servers and %d clients%n",
now(), serverCount.get(), clientCount.get());
if (FAILURES.isEmpty()) return;
out.println("Failed tests: ");
FAILURES.entrySet().forEach((e) -> {
out.printf("\t%s: %s%n", e.getKey(), e.getValue());
e.getValue().printStackTrace(out);
e.getValue().printStackTrace();
});
if (tasksFailed) {
System.out.println("WARNING: Some tasks failed");
}
} finally {
out.println("\n=========================\n");
}
}
private String[] uris() {
return new String[] {
http1URI,
https1URI,
http2URI,
https2URI,
};
}
static AtomicLong URICOUNT = new AtomicLong();
@DataProvider(name = "variants")
public Object[][] variants(ITestContext context) {
if (stopAfterFirstFailure() && context.getFailedTests().size() > 0) {
return new Object[0][];
}
String[] uris = uris();
Object[][] result = new Object[uris.length * 2][];
int i = 0;
for (boolean sameClient : List.of(false, true)) {
for (String uri : uris()) {
result[i++] = new Object[]{uri, sameClient};
}
}
assert i == uris.length * 2;
return result;
}
private HttpClient makeNewClient() {
clientCount.incrementAndGet();
HttpClient client = HttpClient.newBuilder()
.proxy(HttpClient.Builder.NO_PROXY)
.executor(executor)
.sslContext(sslContext)
.build();
return TRACKER.track(client);
}
HttpClient newHttpClient(boolean share) {
if (!share) return makeNewClient();
HttpClient shared = sharedClient;
if (shared != null) return shared;
synchronized (this) {
shared = sharedClient;
if (shared == null) {
shared = sharedClient = makeNewClient();
}
return shared;
}
}
static final List<String> BODIES = List.of(
"Lorem ipsum",
"dolor sit amet",
"consectetur adipiscing elit, sed do eiusmod tempor",
"quis nostrud exercitation ullamco",
"laboris nisi",
"ut",
"aliquip ex ea commodo consequat." +
"Duis aute irure dolor in reprehenderit in voluptate velit esse" +
"cillum dolore eu fugiat nulla pariatur.",
"Excepteur sint occaecat cupidatat non proident."
);
static BodyPublisher[] publishers(String... content) {
if (content == null) return null;
BodyPublisher[] result = new BodyPublisher[content.length];
for (int i=0; i < content.length ; i++) {
result[i] = content[i] == null ? null : BodyPublishers.ofString(content[i]);
}
return result;
}
static String[] strings(String... s) {
return s;
}
@DataProvider(name = "sparseContent")
Object[][] nulls() {
return new Object[][] {
{"null array", null},
{"null element", strings((String)null)},
{"null first element", strings(null, "one")},
{"null second element", strings( "one", null)},
{"null third element", strings( "one", "two", null)},
{"null fourth element", strings( "one", "two", "three", null)},
{"null random element", strings( "one", "two", "three", null, "five")},
};
}
static List<Long> lengths(long... lengths) {
return LongStream.of(lengths)
.mapToObj(Long::valueOf)
.collect(Collectors.toList());
}
@DataProvider(name = "contentLengths")
Object[][] contentLengths() {
return new Object[][] {
{-1, lengths(-1)},
{-42, lengths(-42)},
{42, lengths(42)},
{42, lengths(10, 0, 20, 0, 12)},
{-1, lengths(10, 0, 20, -1, 12)},
{-1, lengths(-1, 0, 20, 10, 12)},
{-1, lengths(10, 0, 20, 12, -1)},
{-1, lengths(10, 0, 20, -10, 12)},
{-1, lengths(-10, 0, 20, 10, 12)},
{-1, lengths(10, 0, 20, 12, -10)},
{-1, lengths(10, 0, Long.MIN_VALUE, -1, 12)},
{-1, lengths(-1, 0, Long.MIN_VALUE, 10, 12)},
{-1, lengths(10, Long.MIN_VALUE, 20, 12, -1)},
{Long.MAX_VALUE, lengths(10, Long.MAX_VALUE - 42L, 20, 0, 12)},
{-1, lengths(10, Long.MAX_VALUE - 40L, 20, 0, 12)},
{-1, lengths(10, Long.MAX_VALUE - 12L, 20, 0, 12)},
{-1, lengths(10, Long.MAX_VALUE/2L, Long.MAX_VALUE/2L + 1L, 0, 12)},
{-1, lengths(10, Long.MAX_VALUE/2L, -1, Long.MAX_VALUE/2L + 1L, 12)},
{-1, lengths(10, Long.MAX_VALUE, 12, Long.MAX_VALUE, 20)},
{-1, lengths(10, Long.MAX_VALUE, Long.MAX_VALUE, 12, 20)},
{-1, lengths(0, Long.MAX_VALUE, Long.MAX_VALUE, 12, 20)},
{-1, lengths(Long.MAX_VALUE, Long.MAX_VALUE, 12, 0, 20)}
};
}
@DataProvider(name="negativeRequests")
Object[][] negativeRequests() {
return new Object[][] {
{0L}, {-1L}, {-2L}, {Long.MIN_VALUE + 1L}, {Long.MIN_VALUE}
};
}
static class ContentLengthPublisher implements BodyPublisher {
final long length;
ContentLengthPublisher(long length) {
this.length = length;
}
@Override
public long contentLength() {
return length;
}
@Override
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
}
static ContentLengthPublisher[] of(List<Long> lengths) {
return lengths.stream()
.map(ContentLengthPublisher::new)
.toArray(ContentLengthPublisher[]::new);
}
}
/**
* A dummy publisher that allows to call onError on its subscriber (or not...).
*/
static class PublishWithError implements BodyPublisher {
final ConcurrentHashMap<Subscriber<?>, ErrorSubscription> subscribers = new ConcurrentHashMap<>();
final long length;
final List<String> content;
final int errorAt;
final Supplier<? extends Throwable> errorSupplier;
PublishWithError(List<String> content, int errorAt, Supplier<? extends Throwable> supplier) {
this.content = content;
this.errorAt = errorAt;
this.errorSupplier = supplier;
length = content.stream().mapToInt(String::length).sum();
}
boolean hasErrors() {
return errorAt < content.size();
}
@Override
public long contentLength() {
return length;
}
@Override
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
ErrorSubscription subscription = new ErrorSubscription(subscriber);
subscribers.put(subscriber, subscription);
subscriber.onSubscribe(subscription);
}
class ErrorSubscription implements Flow.Subscription {
volatile boolean cancelled;
volatile int at;
final Subscriber<? super ByteBuffer> subscriber;
ErrorSubscription(Subscriber<? super ByteBuffer> subscriber) {
this.subscriber = subscriber;
}
@Override
public void request(long n) {
while (!cancelled && --n >= 0 && at < Math.min(errorAt+1, content.size())) {
if (at++ == errorAt) {
subscriber.onError(errorSupplier.get());
return;
} else if (at <= content.size()){
subscriber.onNext(ByteBuffer.wrap(
content.get(at-1).getBytes()));
if (at == content.size()) {
subscriber.onComplete();
return;
}
}
}
}
@Override
public void cancel() {
cancelled = true;
}
}
}
static class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
CompletableFuture<Subscription> subscriptionCF = new CompletableFuture<>();
ConcurrentLinkedDeque<ByteBuffer> items = new ConcurrentLinkedDeque<>();
CompletableFuture<List<ByteBuffer>> resultCF = new CompletableFuture<>();
@Override
public void onSubscribe(Subscription subscription) {
this.subscriptionCF.complete(subscription);
}
@Override
public void onNext(ByteBuffer item) {
items.addLast(item);
}
@Override
public void onError(Throwable throwable) {
resultCF.completeExceptionally(throwable);
}
@Override
public void onComplete() {
resultCF.complete(items.stream().collect(Collectors.toUnmodifiableList()));
}
CompletableFuture<List<ByteBuffer>> resultCF() { return resultCF; }
}
static String stringFromBuffer(ByteBuffer buffer) {
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
return new String(bytes);
}
String stringFromBytes(Stream<ByteBuffer> buffers) {
return buffers.map(AggregateRequestBodyTest::stringFromBuffer)
.collect(Collectors.joining());
}
static PublishWithError withNoError(String content) {
return new PublishWithError(List.of(content), 1,
() -> new AssertionError("Should not happen!"));
}
static PublishWithError withNoError(List<String> content) {
return new PublishWithError(content, content.size(),
() -> new AssertionError("Should not happen!"));
}
@Test(dataProvider = "sparseContent") // checks that NPE is thrown
public void testNullPointerException(String description, String[] content) {
BodyPublisher[] publishers = publishers(content);
Assert.assertThrows(NullPointerException.class, () -> BodyPublishers.concat(publishers));
}
// Verifies that an empty array creates a "noBody" publisher
@Test
public void testEmpty() {
BodyPublisher publisher = BodyPublishers.concat();
RequestSubscriber subscriber = new RequestSubscriber();
assertEquals(publisher.contentLength(), 0);
publisher.subscribe(subscriber);
subscriber.subscriptionCF.thenAccept(s -> s.request(1));
List<ByteBuffer> result = subscriber.resultCF.join();
assertEquals(result, List.of());
assertTrue(subscriber.items.isEmpty());;
}
// verifies that error emitted by upstream publishers are propagated downstream.
@Test(dataProvider = "sparseContent") // nulls are replaced with error publisher
public void testOnError(String description, String[] content) {
final RequestSubscriber subscriber = new RequestSubscriber();
final PublishWithError errorPublisher;
final BodyPublisher[] publishers;
String result = BODIES.stream().collect(Collectors.joining());
if (content == null) {
content = List.of(result).toArray(String[]::new);
errorPublisher = new PublishWithError(BODIES, BODIES.size(),
() -> new AssertionError("Unexpected!!"));
publishers = List.of(errorPublisher).toArray(new BodyPublisher[0]);
description = "No error";
} else {
publishers = publishers(content);
description = description.replace("null", "error at");
errorPublisher = new PublishWithError(BODIES, 2, () -> new Exception("expected"));
}
result = "";
boolean hasErrors = false;
for (int i=0; i < content.length; i++) {
if (content[i] == null) {
publishers[i] = errorPublisher;
if (hasErrors) continue;
if (!errorPublisher.hasErrors()) {
result = result + errorPublisher
.content.stream().collect(Collectors.joining());
} else {
result = result + errorPublisher.content
.stream().limit(errorPublisher.errorAt)
.collect(Collectors.joining());
result = result + "<error>";
hasErrors = true;
}
} else if (!hasErrors) {
result = result + content[i];
}
}
BodyPublisher publisher = BodyPublishers.concat(publishers);
publisher.subscribe(subscriber);
subscriber.subscriptionCF.thenAccept(s -> s.request(Long.MAX_VALUE));
if (errorPublisher.hasErrors()) {
CompletionException ce = expectThrows(CompletionException.class,
() -> subscriber.resultCF.join());
out.println(description + ": got expected " + ce);
assertEquals(ce.getCause().getClass(), Exception.class);
assertEquals(stringFromBytes(subscriber.items.stream()) + "<error>", result);
} else {
assertEquals(stringFromBytes(subscriber.resultCF.join().stream()), result);
out.println(description + ": got expected result: " + result);
}
}
// Verifies that if an upstream publisher has an unknown length, the
// aggregate publisher will have an unknown length as well. Otherwise
// the length should be known.
@Test(dataProvider = "sparseContent") // nulls are replaced with unknown length
public void testUnknownContentLength(String description, String[] content) {
if (content == null) {
content = BODIES.toArray(String[]::new);
description = "BODIES (known length)";
} else {
description = description.replace("null", "length(-1)");
}
BodyPublisher[] publishers = publishers(content);
BodyPublisher nolength = new BodyPublisher() {
final BodyPublisher missing = BodyPublishers.ofString("missing");
@Override
public long contentLength() { return -1; }
@Override
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
missing.subscribe(subscriber);
}
};
long length = 0;
for (int i=0; i < content.length; i++) {
if (content[i] == null) {
publishers[i] = nolength;
length = -1;
} else if (length >= 0) {
length += content[i].length();
}
}
out.printf("testUnknownContentLength(%s): %d%n", description, length);
BodyPublisher publisher = BodyPublishers.concat(publishers);
assertEquals(publisher.contentLength(), length,
description.replace("null", "length(-1)"));
}
private static final Throwable completionCause(CompletionException x) {
while (x.getCause() instanceof CompletionException) {
x = (CompletionException)x.getCause();
}
return x.getCause();
}
@Test(dataProvider = "negativeRequests")
public void testNegativeRequest(long n) {
assert n <= 0 : "test for negative request called with n > 0 : " + n;
BodyPublisher[] publishers = ContentLengthPublisher.of(List.of(1L, 2L, 3L));
BodyPublisher publisher = BodyPublishers.concat(publishers);
RequestSubscriber subscriber = new RequestSubscriber();
publisher.subscribe(subscriber);
Subscription subscription = subscriber.subscriptionCF.join();
subscription.request(n);
CompletionException expected = expectThrows(CE, () -> subscriber.resultCF.join());
Throwable cause = completionCause(expected);
if (cause instanceof IllegalArgumentException) {
System.out.printf("Got expected IAE for %d: %s%n", n, cause);
} else {
throw new AssertionError("Unexpected exception: " + cause,
(cause == null) ? expected : cause);
}
}
static BodyPublisher[] ofStrings(String... strings) {
return Stream.of(strings).map(BodyPublishers::ofString).toArray(BodyPublisher[]::new);
}
@Test
public void testPositiveRequests() {
// A composite array of publishers
BodyPublisher[] publishers = Stream.of(
Stream.of(ofStrings("Lorem", " ", "ipsum", " ")),
Stream.of(BodyPublishers.concat(ofStrings("dolor", " ", "sit", " ", "amet", ", "))),
Stream.<BodyPublisher>of(withNoError(List.of("consectetur", " ", "adipiscing"))),
Stream.of(ofStrings(" ")),
Stream.of(BodyPublishers.concat(ofStrings("elit", ".")))
).flatMap((s) -> s).toArray(BodyPublisher[]::new);
BodyPublisher publisher = BodyPublishers.concat(publishers);
// Test that we can request all 13 items in a single request call.
RequestSubscriber requestSubscriber1 = new RequestSubscriber();
publisher.subscribe(requestSubscriber1);
Subscription subscription1 = requestSubscriber1.subscriptionCF.join();
subscription1.request(16);
assertTrue(requestSubscriber1.resultCF().isDone());
List<ByteBuffer> list1 = requestSubscriber1.resultCF().join();
String result1 = stringFromBytes(list1.stream());
assertEquals(result1, "Lorem ipsum dolor sit amet, consectetur adipiscing elit.");
System.out.println("Got expected sentence with one request: \"%s\"".formatted(result1));
// Test that we can split our requests call any which way we want
// (whether in the 'middle of a publisher' or at the boundaries.
RequestSubscriber requestSubscriber2 = new RequestSubscriber();
publisher.subscribe(requestSubscriber2);
Subscription subscription2 = requestSubscriber2.subscriptionCF.join();
subscription2.request(1);
assertFalse(requestSubscriber2.resultCF().isDone());
subscription2.request(10);
assertFalse(requestSubscriber2.resultCF().isDone());
subscription2.request(4);
assertFalse(requestSubscriber2.resultCF().isDone());
subscription2.request(1);
assertTrue(requestSubscriber2.resultCF().isDone());
List<ByteBuffer> list2 = requestSubscriber2.resultCF().join();
String result2 = stringFromBytes(list2.stream());
assertEquals(result2, "Lorem ipsum dolor sit amet, consectetur adipiscing elit.");
System.out.println("Got expected sentence with 4 requests: \"%s\"".formatted(result1));
}
@Test(dataProvider = "contentLengths")
public void testContentLength(long expected, List<Long> lengths) {
BodyPublisher[] publishers = ContentLengthPublisher.of(lengths);
BodyPublisher aggregate = BodyPublishers.concat(publishers);
assertEquals(aggregate.contentLength(), expected,
"Unexpected result for %s".formatted(lengths));
}
// Verifies that cancelling the subscription ensure that downstream
// publishers are no longer subscribed etc...
@Test
public void testCancel() {
BodyPublisher[] publishers = BODIES.stream()
.map(BodyPublishers::ofString)
.toArray(BodyPublisher[]::new);
BodyPublisher publisher = BodyPublishers.concat(publishers);
assertEquals(publisher.contentLength(),
BODIES.stream().mapToInt(String::length).sum());
Map<RequestSubscriber, String> subscribers = new LinkedHashMap<>();
for (int n=0; n < BODIES.size(); n++) {
String description = String.format(
"cancel after %d/%d onNext() invocations",
n, BODIES.size());
RequestSubscriber subscriber = new RequestSubscriber();
publisher.subscribe(subscriber);
Subscription subscription = subscriber.subscriptionCF.join();
subscribers.put(subscriber, description);
// receive half the data
for (int i = 0; i < n; i++) {
subscription.request(1);
ByteBuffer buffer = subscriber.items.pop();
}
// cancel subscription
subscription.cancel();
// request the rest...
subscription.request(Long.MAX_VALUE);
}
CompletableFuture[] results = subscribers.keySet()
.stream().map(RequestSubscriber::resultCF)
.toArray(CompletableFuture[]::new);
CompletableFuture<?> any = CompletableFuture.anyOf(results);
// subscription was cancelled, so nothing should be received...
try {
TimeoutException x = Assert.expectThrows(TimeoutException.class,
() -> any.get(5, TimeUnit.SECONDS));
out.println("Got expected " + x);
} finally {
subscribers.keySet().stream()
.filter(rs -> rs.resultCF.isDone())
.forEach(rs -> System.err.printf(
"Failed: %s completed with %s",
subscribers.get(rs), rs.resultCF));
}
Consumer<RequestSubscriber> check = (rs) -> {
Assert.assertTrue(rs.items.isEmpty(), subscribers.get(rs) + " has items");
Assert.assertFalse(rs.resultCF.isDone(), subscribers.get(rs) + " was not cancelled");
out.println(subscribers.get(rs) + ": PASSED");
};
subscribers.keySet().stream().forEach(check);
}
// Verifies that cancelling the subscription is propagated downstream
@Test
public void testCancelSubscription() {
PublishWithError upstream = new PublishWithError(BODIES, BODIES.size(),
() -> new AssertionError("should not come here"));
BodyPublisher publisher = BodyPublishers.concat(upstream);
assertEquals(publisher.contentLength(),
BODIES.stream().mapToInt(String::length).sum());
Map<RequestSubscriber, String> subscribers = new LinkedHashMap<>();
for (int n=0; n < BODIES.size(); n++) {
String description = String.format(
"cancel after %d/%d onNext() invocations",
n, BODIES.size());
RequestSubscriber subscriber = new RequestSubscriber();
publisher.subscribe(subscriber);
Subscription subscription = subscriber.subscriptionCF.join();
subscribers.put(subscriber, description);
// receive half the data
for (int i = 0; i < n; i++) {
subscription.request(1);
ByteBuffer buffer = subscriber.items.pop();
}
// cancel subscription
subscription.cancel();
// request the rest...
subscription.request(Long.MAX_VALUE);
assertTrue(upstream.subscribers.get(subscriber).cancelled,
description + " upstream subscription not cancelled");
out.println(description + " upstream subscription was properly cancelled");
}
CompletableFuture[] results = subscribers.keySet()
.stream().map(RequestSubscriber::resultCF)
.toArray(CompletableFuture[]::new);
CompletableFuture<?> any = CompletableFuture.anyOf(results);
// subscription was cancelled, so nothing should be received...
try {
TimeoutException x = Assert.expectThrows(TimeoutException.class,
() -> any.get(5, TimeUnit.SECONDS));
out.println("Got expected " + x);
} finally {
subscribers.keySet().stream()
.filter(rs -> rs.resultCF.isDone())
.forEach(rs -> System.err.printf(
"Failed: %s completed with %s",
subscribers.get(rs), rs.resultCF));
}
Consumer<RequestSubscriber> check = (rs) -> {
Assert.assertTrue(rs.items.isEmpty(), subscribers.get(rs) + " has items");
Assert.assertFalse(rs.resultCF.isDone(), subscribers.get(rs) + " was not cancelled");
out.println(subscribers.get(rs) + ": PASSED");
};
subscribers.keySet().stream().forEach(check);
}
@Test(dataProvider = "variants")
public void test(String uri, boolean sameClient) throws Exception {
System.out.println("Request to " + uri);
HttpClient client = newHttpClient(sameClient);
BodyPublisher publisher = BodyPublishers.concat(
BODIES.stream()
.map(BodyPublishers::ofString)
.toArray(HttpRequest.BodyPublisher[]::new)
);
HttpRequest request = HttpRequest.newBuilder(URI.create(uri))
.POST(publisher)
.build();
for (int i = 0; i < ITERATION_COUNT; i++) {
System.out.println("Iteration: " + i);
HttpResponse<String> response = client.send(request, BodyHandlers.ofString());
int expectedResponse = RESPONSE_CODE;
if (response.statusCode() != expectedResponse)
throw new RuntimeException("wrong response code " + Integer.toString(response.statusCode()));
assertEquals(response.body(), BODIES.stream().collect(Collectors.joining()));
}
System.out.println("test: DONE");
}
@BeforeTest
public void setup() throws Exception {
sslContext = new SimpleSSLContext().get();
if (sslContext == null)
throw new AssertionError("Unexpected null sslContext");
HttpTestHandler handler = new HttpTestEchoHandler();
InetSocketAddress loopback = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
HttpServer http1 = HttpServer.create(loopback, 0);
http1TestServer = HttpTestServer.of(http1);
http1TestServer.addHandler(handler, "/http1/echo/");
http1URI = "http://" + http1TestServer.serverAuthority() + "/http1/echo/x";
HttpsServer https1 = HttpsServer.create(loopback, 0);
https1.setHttpsConfigurator(new HttpsConfigurator(sslContext));
https1TestServer = HttpTestServer.of(https1);
https1TestServer.addHandler(handler, "/https1/echo/");
https1URI = "https://" + https1TestServer.serverAuthority() + "/https1/echo/x";
// HTTP/2
http2TestServer = HttpTestServer.of(new Http2TestServer("localhost", false, 0));
http2TestServer.addHandler(handler, "/http2/echo/");
http2URI = "http://" + http2TestServer.serverAuthority() + "/http2/echo/x";
https2TestServer = HttpTestServer.of(new Http2TestServer("localhost", true, sslContext));
https2TestServer.addHandler(handler, "/https2/echo/");
https2URI = "https://" + https2TestServer.serverAuthority() + "/https2/echo/x";
serverCount.addAndGet(4);
http1TestServer.start();
https1TestServer.start();
http2TestServer.start();
https2TestServer.start();
}
@AfterTest
public void teardown() throws Exception {
String sharedClientName =
sharedClient == null ? null : sharedClient.toString();
sharedClient = null;
Thread.sleep(100);
AssertionError fail = TRACKER.check(500);
try {
http1TestServer.stop();
https1TestServer.stop();
http2TestServer.stop();
https2TestServer.stop();
} finally {
if (fail != null) {
if (sharedClientName != null) {
System.err.println("Shared client name is: " + sharedClientName);
}
throw fail;
}
}
}
}

@ -0,0 +1,92 @@
/*
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
import org.reactivestreams.tck.TestEnvironment;
import org.reactivestreams.tck.flow.FlowPublisherVerification;
import java.net.http.HttpRequest.BodyPublisher;
import java.net.http.HttpRequest.BodyPublishers;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow.Publisher;
/* See TckDriver.java for more information */
public class BodyPublishersConcat
extends FlowPublisherVerification<ByteBuffer> {
private static final int ELEMENT_SIZE = 16 * 1024;
public BodyPublishersConcat() {
super(new TestEnvironment(450L));
}
private static BodyPublisher ofByteArrays(int n, byte[] bytes) {
return BodyPublishers.ofByteArrays(Collections.nCopies((int) n, bytes));
}
@Override
public Publisher<ByteBuffer> createFlowPublisher(long nElements) {
System.out.println("BodyPublishersConcat: %d elements requested"
.formatted(nElements));
byte[] bytes = S.arrayOfNRandomBytes(ELEMENT_SIZE);
if (nElements == 0) {
System.out.println("BodyPublishersConcat: empty publisher");
return BodyPublishers.concat();
} else if (nElements == 1) {
System.out.println("BodyPublishersConcat: singleton publisher");
return BodyPublishers.concat(ofByteArrays(1, bytes));
} else if (nElements < 4) {
int left = (int)nElements/2;
int right = (int)nElements - left;
System.out.println("BodyPublishersConcat: dual publisher (%d, %d)".formatted(left, right));
return BodyPublishers.concat(ofByteArrays(left, bytes),
ofByteArrays(right, bytes));
} else {
List<BodyPublisher> publishers = new ArrayList<>();
List<Integer> sizes = new ArrayList<>();
long remaining = nElements;
int max = (int) Math.min((long)Integer.MAX_VALUE, nElements/2L);
while (remaining > 0) {
int length = S.randomIntUpTo(max);
if (length == 0) length = 1;
sizes.add(length);
if (remaining > length) {
publishers.add(ofByteArrays(length, bytes));
remaining = remaining - length;
} else {
publishers.add(ofByteArrays((int)remaining, bytes));
remaining = 0;
}
}
System.out.println("BodyPublishersConcat: multi publisher " + sizes);
return BodyPublishers.concat(publishers.toArray(BodyPublisher[]::new));
}
}
@Override
public Publisher<ByteBuffer> createFailedFlowPublisher() {
return null;
}
}

@ -38,6 +38,7 @@
* @compile -encoding UTF-8 BodyPublishersOfFile.java
* @compile -encoding UTF-8 BodyPublishersOfInputStream.java
* @compile -encoding UTF-8 BodyPublishersOfSubByteArray.java
* @compile -encoding UTF-8 BodyPublishersConcat.java
*
* @compile -encoding UTF-8 BodySubscribersBuffering.java
* @compile -encoding UTF-8 BodySubscribersDiscarding.java
@ -65,6 +66,7 @@
* @run testng/othervm BodyPublishersOfFile
* @run testng/othervm BodyPublishersOfInputStream
* @run testng/othervm BodyPublishersOfSubByteArray
* @run testng/othervm BodyPublishersConcat
*
* @run testng/othervm BodySubscribersBuffering
* @run testng/othervm BodySubscribersDiscarding