8175274: Fix httpclient asynchronous usage

Reviewed-by: dfuchs, michaelm
This commit is contained in:
Sergey Kuksenko 2017-02-21 11:08:34 +00:00
parent e3af634a14
commit 4a259fb5dd
12 changed files with 668 additions and 251 deletions

View File

@ -27,7 +27,6 @@ package jdk.incubator.http;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.ProxySelector;
import java.net.SocketPermission;
import java.net.URI;
@ -71,7 +70,6 @@ final class Exchange<T> {
final Executor parentExecutor;
final HttpRequest.BodyProcessor requestProcessor;
boolean upgrading; // to HTTP/2
volatile Executor responseExecutor;
final PushGroup<?,T> pushGroup;
// buffer for receiving response headers
@ -139,7 +137,7 @@ final class Exchange<T> {
}
public CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler) {
return exchImpl.readBodyAsync(handler, true, responseExecutor);
return exchImpl.readBodyAsync(handler, true, parentExecutor);
}
public void cancel() {
@ -224,7 +222,8 @@ final class Exchange<T> {
return checkForUpgrade(resp, exchImpl);
} else {
exchImpl.sendRequest();
exchImpl.sendHeadersOnly();
exchImpl.sendBody();
Response resp = exchImpl.getResponse();
HttpResponseImpl.logResponse(resp);
return checkForUpgrade(resp, exchImpl);
@ -235,8 +234,6 @@ final class Exchange<T> {
// will be a non null responseAsync if expect continue returns an error
public CompletableFuture<Response> responseAsync() {
// take one thread from supplied executor to handle response headers and body
responseExecutor = Utils.singleThreadExecutor(parentExecutor);
return responseAsyncImpl(null);
}
@ -267,20 +264,18 @@ final class Exchange<T> {
Log.logTrace("Sending Expect: 100-Continue");
return exchImpl
.sendHeadersAsync()
.thenCompose((v) -> exchImpl.getResponseAsync(responseExecutor))
.thenCompose(v -> exchImpl.getResponseAsync(parentExecutor))
.thenCompose((Response r1) -> {
HttpResponseImpl.logResponse(r1);
int rcode = r1.statusCode();
if (rcode == 100) {
Log.logTrace("Received 100-Continue: sending body");
return exchImpl.sendBodyAsync(parentExecutor)
.thenCompose((v) -> exchImpl.getResponseAsync(responseExecutor))
.thenCompose((Response r2) -> {
return checkForUpgradeAsync(r2, exchImpl);
}).thenApply((Response r) -> {
HttpResponseImpl.logResponse(r);
return r;
});
CompletableFuture<Response> cf =
exchImpl.sendBodyAsync()
.thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
cf = wrapForUpgrade(cf);
cf = wrapForLog(cf);
return cf;
} else {
Log.logTrace("Expectation failed: Received {0}",
rcode);
@ -289,26 +284,38 @@ final class Exchange<T> {
"Unable to handle 101 while waiting for 100");
return MinimalFuture.failedFuture(failed);
}
return exchImpl.readBodyAsync(this::ignoreBody, false, responseExecutor)
.thenApply((v) -> {
return r1;
});
return exchImpl.readBodyAsync(this::ignoreBody, false, parentExecutor)
.thenApply(v -> r1);
}
});
} else {
return exchImpl
.sendRequestAsync(parentExecutor)
.thenCompose((v) -> exchImpl.getResponseAsync(responseExecutor))
.thenCompose((Response r1) -> {
return checkForUpgradeAsync(r1, exchImpl);
})
.thenApply((Response response) -> {
HttpResponseImpl.logResponse(response);
return response;
});
CompletableFuture<Response> cf = exchImpl
.sendHeadersAsync()
.thenCompose(ExchangeImpl::sendBodyAsync)
.thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
cf = wrapForUpgrade(cf);
cf = wrapForLog(cf);
return cf;
}
}
private CompletableFuture<Response> wrapForUpgrade(CompletableFuture<Response> cf) {
if (upgrading) {
return cf.thenCompose(r -> checkForUpgradeAsync(r, exchImpl));
}
return cf;
}
private CompletableFuture<Response> wrapForLog(CompletableFuture<Response> cf) {
if (Log.requests()) {
return cf.thenApply(response -> {
HttpResponseImpl.logResponse(response);
return response;
});
}
return cf;
}
HttpResponse.BodyProcessor<T> ignoreBody(int status, HttpHeaders hdrs) {
return HttpResponse.BodyProcessor.discard((T)null);
}

View File

@ -102,16 +102,12 @@ abstract class ExchangeImpl<T> {
// Blocking impl but in async style
CompletableFuture<Void> sendHeadersAsync() {
CompletableFuture<Void> cf = new MinimalFuture<>();
try {
sendHeadersOnly();
cf.complete(null);
} catch (Throwable t) {
cf.completeExceptionally(t);
}
CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
// this is blocking. cf will already be completed.
return cf;
return MinimalFuture.supply(() -> {
sendHeadersOnly();
return this;
});
}
/**
@ -156,39 +152,13 @@ abstract class ExchangeImpl<T> {
// Async version of sendBody(). This only used when body sent separately
// to headers (100 continue)
CompletableFuture<Void> sendBodyAsync(Executor executor) {
CompletableFuture<Void> cf = new MinimalFuture<>();
executor.execute(() -> {
try {
sendBody();
cf.complete(null);
} catch (Throwable t) {
cf.completeExceptionally(t);
}
CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
return MinimalFuture.supply(() -> {
sendBody();
return this;
});
return cf;
}
/**
* Sends the entire request (headers and body) blocking.
*/
void sendRequest() throws IOException, InterruptedException {
sendHeadersOnly();
sendBody();
}
CompletableFuture<Void> sendRequestAsync(Executor executor) {
CompletableFuture<Void> cf = new MinimalFuture<>();
executor.execute(() -> {
try {
sendRequest();
cf.complete(null);
} catch (Throwable t) {
cf.completeExceptionally(t);
}
});
return cf;
}
/**
* Cancels a request. Not currently exposed through API.
*/

View File

@ -194,17 +194,11 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
}
CompletableFuture<Response> getResponseAsyncImpl(Executor executor) {
CompletableFuture<Response> cf = new MinimalFuture<>();
executor.execute(() -> {
try {
response = new Http1Response<>(connection, Http1Exchange.this);
response.readHeaders();
cf.complete(response.response());
} catch (Throwable e) {
cf.completeExceptionally(e);
}
});
return cf;
return MinimalFuture.supply( () -> {
response = new Http1Response<>(connection, Http1Exchange.this);
response.readHeaders();
return response.response();
}, executor);
}
@Override

View File

@ -241,14 +241,7 @@ class Http2Connection {
Http2ClientImpl client2,
Exchange<?> exchange,
ByteBuffer initial) {
CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
try {
Http2Connection c = new Http2Connection(connection, client2, exchange, initial);
cf.complete(c);
} catch (IOException | InterruptedException e) {
cf.completeExceptionally(e);
}
return cf;
return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial));
}
/**

View File

@ -235,7 +235,7 @@ class HttpClientImpl extends HttpClient {
sendAsync(HttpRequest req, HttpResponse.BodyHandler<T> responseHandler)
{
MultiExchange<Void,T> mex = new MultiExchange<>(req, this, responseHandler);
return mex.responseAsync(null)
return mex.responseAsync()
.thenApply((HttpResponseImpl<T> b) -> (HttpResponse<T>) b);
}

View File

@ -35,6 +35,8 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.concurrent.Executor;
import java.util.function.UnaryOperator;
import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.Pair;
@ -150,8 +152,7 @@ class MultiExchange<U,T> {
Exchange<T> currExchange = getExchange();
requestFilters(r);
Response response = currExchange.response();
Pair<Response, HttpRequestImpl> filterResult = responseFilters(response);
HttpRequestImpl newreq = filterResult.second;
HttpRequestImpl newreq = responseFilters(response);
if (newreq == null) {
if (attempts > 1) {
Log.logError("Succeeded on attempt: " + attempts);
@ -213,23 +214,7 @@ class MultiExchange<U,T> {
Log.logTrace("All filters applied");
}
// Filters are assumed to be non-blocking so the async
// versions of these methods just call the blocking ones
private CompletableFuture<Void> requestFiltersAsync(HttpRequestImpl r) {
CompletableFuture<Void> cf = new MinimalFuture<>();
try {
requestFilters(r);
cf.complete(null);
} catch(Throwable e) {
cf.completeExceptionally(e);
}
return cf;
}
private Pair<Response,HttpRequestImpl>
responseFilters(Response response) throws IOException
private HttpRequestImpl responseFilters(Response response) throws IOException
{
Log.logTrace("Applying response filters");
for (HeaderFilter filter : filters) {
@ -237,24 +222,11 @@ class MultiExchange<U,T> {
HttpRequestImpl newreq = filter.response(response);
if (newreq != null) {
Log.logTrace("New request: stopping filters");
return pair(null, newreq);
return newreq;
}
}
Log.logTrace("All filters applied");
return pair(response, null);
}
private CompletableFuture<Pair<Response,HttpRequestImpl>>
responseFiltersAsync(Response response)
{
CompletableFuture<Pair<Response,HttpRequestImpl>> cf = new MinimalFuture<>();
try {
Pair<Response,HttpRequestImpl> n = responseFilters(response); // assumed to be fast
cf.complete(n);
} catch (Throwable e) {
cf.completeExceptionally(e);
}
return cf;
return null;
}
public void cancel() {
@ -267,24 +239,27 @@ class MultiExchange<U,T> {
getExchange().cancel(cause);
}
public CompletableFuture<HttpResponseImpl<T>> responseAsync(Void v) {
return responseAsync1(null)
public CompletableFuture<HttpResponseImpl<T>> responseAsync() {
CompletableFuture<Void> start = new MinimalFuture<>();
CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start);
start.completeAsync( () -> null, executor); // trigger execution
return cf;
}
private CompletableFuture<HttpResponseImpl<T>> responseAsync0(CompletableFuture<Void> start) {
return start.thenCompose( v -> responseAsyncImpl())
.thenCompose((Response r) -> {
Exchange<T> exch = getExchange();
return exch.readBodyAsync(responseHandler)
.thenApply((T body) -> {
Pair<Response,T> result = new Pair<>(r, body);
return result;
});
})
.thenApply((Pair<Response,T> result) -> {
return new HttpResponseImpl<>(userRequest, result.first, result.second, getExchange());
.thenApply((T body) -> new HttpResponseImpl<>(userRequest, r, body, exch));
});
}
CompletableFuture<U> multiResponseAsync() {
CompletableFuture<HttpResponse<T>> mainResponse = responseAsync(null)
.thenApply((HttpResponseImpl<T> b) -> {
CompletableFuture<Void> start = new MinimalFuture<>();
CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start);
CompletableFuture<HttpResponse<T>> mainResponse =
cf.thenApply((HttpResponseImpl<T> b) -> {
multiResponseHandler.onResponse(b);
return (HttpResponse<T>)b;
});
@ -295,10 +270,12 @@ class MultiExchange<U,T> {
// All push promises received by now.
pushGroup.noMorePushes(true);
});
return multiResponseHandler.completion(pushGroup.groupResult(), pushGroup.pushesCF());
CompletableFuture<U> res = multiResponseHandler.completion(pushGroup.groupResult(), pushGroup.pushesCF());
start.completeAsync( () -> null, executor); // trigger execution
return res;
}
private CompletableFuture<Response> responseAsync1(Void v) {
private CompletableFuture<Response> responseAsyncImpl() {
CompletableFuture<Response> cf;
if (++attempts > max_attempts) {
cf = MinimalFuture.failedFuture(new IOException("Too many retries"));
@ -307,48 +284,51 @@ class MultiExchange<U,T> {
timedEvent = new TimedEvent(currentreq.duration());
client.registerTimer(timedEvent);
}
try {
// 1. Apply request filters
requestFilters(currentreq);
} catch (IOException e) {
return MinimalFuture.failedFuture(e);
}
Exchange<T> exch = getExchange();
// 1. Apply request filters
cf = requestFiltersAsync(currentreq)
// 2. get response
.thenCompose((v1) -> {
return exch.responseAsync();
})
// 3. Apply response filters
.thenCompose(this::responseFiltersAsync)
// 4. Check filter result and repeat or continue
.thenCompose((Pair<Response,HttpRequestImpl> pair) -> {
Response resp = pair.first;
if (resp != null) {
// 2. get response
cf = exch.responseAsync()
.thenCompose((Response response) -> {
HttpRequestImpl newrequest = null;
try {
// 3. Apply response filters
newrequest = responseFilters(response);
} catch (IOException e) {
return MinimalFuture.failedFuture(e);
}
// 4. Check filter result and repeat or continue
if (newrequest == null) {
if (attempts > 1) {
Log.logError("Succeeded on attempt: " + attempts);
}
return MinimalFuture.completedFuture(resp);
return MinimalFuture.completedFuture(response);
} else {
currentreq = pair.second;
Exchange<T> previous = exch;
currentreq = newrequest;
setExchange(new Exchange<>(currentreq, this, acc));
//reads body off previous, and then waits for next response
return responseAsync1(null);
return responseAsyncImpl();
}
})
// 5. Convert result to Pair
.handle((BiFunction<Response, Throwable, Pair<Response, Throwable>>) Pair::new)
// 6. Handle errors and cancel any timer set
.thenCompose((Pair<Response,Throwable> obj) -> {
Response response = obj.first;
// 5. Handle errors and cancel any timer set
.handle((response, ex) -> {
if (response != null) {
return MinimalFuture.completedFuture(response);
}
// all exceptions thrown are handled here
CompletableFuture<Response> error = getExceptionalCF(obj.second);
CompletableFuture<Response> error = getExceptionalCF(ex);
if (error == null) {
cancelTimer();
return responseAsync1(null);
return responseAsyncImpl();
} else {
return error;
}
});
})
.thenCompose(UnaryOperator.identity());
}
return cf;
}

View File

@ -50,7 +50,7 @@ class PlainTunnelingConnection extends HttpConnection {
.thenCompose((Void v) -> {
HttpRequestImpl req = new HttpRequestImpl("CONNECT", client, address);
MultiExchange<Void,Void> mconnectExchange = new MultiExchange<>(req, client, this::ignore);
return mconnectExchange.responseAsync(null)
return mconnectExchange.responseAsync()
.thenCompose((HttpResponseImpl<Void> resp) -> {
CompletableFuture<Void> cf = new MinimalFuture<>();
if (resp.statusCode() != 200) {

View File

@ -50,18 +50,11 @@ class SSLConnection extends HttpConnection {
@Override
public CompletableFuture<Void> connectAsync() {
return delegate.connectAsync()
.thenCompose((Void v) -> {
CompletableFuture<Void> cf = new MinimalFuture<>();
try {
this.sslDelegate = new SSLDelegate(delegate.channel(),
client,
alpn);
cf.complete(null);
} catch (IOException e) {
cf.completeExceptionally(e);
}
return cf;
});
.thenCompose((Void v) ->
MinimalFuture.supply( () -> {
this.sslDelegate = new SSLDelegate(delegate.channel(), client, alpn);
return null;
}));
}
@Override

View File

@ -39,6 +39,8 @@ import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import jdk.incubator.http.internal.common.*;
import jdk.incubator.http.internal.frame.*;
import jdk.incubator.http.internal.hpack.DecodingCallback;
@ -96,7 +98,7 @@ import jdk.incubator.http.internal.hpack.DecodingCallback;
*/
class Stream<T> extends ExchangeImpl<T> {
final Queue<Http2Frame> inputQ;
final AsyncDataReadQueue inputQ = new AsyncDataReadQueue();
/**
* This stream's identifier. Assigned lazily by the HTTP2Connection before
@ -169,7 +171,7 @@ class Stream<T> extends ExchangeImpl<T> {
{
CompletableFuture<T> cf = readBodyAsync(handler,
returnToCache,
this::executeInline);
null);
try {
return cf.join();
} catch (CompletionException e) {
@ -177,10 +179,6 @@ class Stream<T> extends ExchangeImpl<T> {
}
}
void executeInline(Runnable r) {
r.run();
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@ -189,60 +187,69 @@ class Stream<T> extends ExchangeImpl<T> {
return sb.toString();
}
private boolean receiveDataFrame(Http2Frame frame) throws IOException, InterruptedException {
if (frame instanceof ResetFrame) {
handleReset((ResetFrame) frame);
return true;
} else if (!(frame instanceof DataFrame)) {
assert false;
return true;
}
DataFrame df = (DataFrame) frame;
// RFC 7540 6.1:
// The entire DATA frame payload is included in flow control,
// including the Pad Length and Padding fields if present
int len = df.payloadLength();
ByteBufferReference[] buffers = df.getData();
for (ByteBufferReference b : buffers) {
ByteBuffer buf = b.get();
if (buf.hasRemaining()) {
publisher.acceptData(Optional.of(buf));
}
}
connection.windowUpdater.update(len);
if (df.getFlag(DataFrame.END_STREAM)) {
setEndStreamReceived();
publisher.acceptData(Optional.empty());
return false;
}
// Don't send window update on a stream which is
// closed or half closed.
windowUpdater.update(len);
return true;
}
// pushes entire response body into response processor
// blocking when required by local or remote flow control
CompletableFuture<T> receiveData(Executor executor) {
CompletableFuture<T> cf = responseProcessor
.getBody()
.toCompletableFuture();
executor.execute(() -> {
Http2Frame frame;
DataFrame df = null;
try {
if (!endStreamReceived()) {
do {
frame = inputQ.take();
if (frame instanceof ResetFrame) {
handleReset((ResetFrame)frame);
continue;
} else if (!(frame instanceof DataFrame)) {
assert false;
continue;
}
df = (DataFrame) frame;
// RFC 7540 6.1:
// The entire DATA frame payload is included in flow control,
// including the Pad Length and Padding fields if present
int len = df.payloadLength();
ByteBufferReference[] buffers = df.getData();
for (ByteBufferReference b : buffers) {
publisher.acceptData(Optional.of(b.get()));
}
connection.windowUpdater.update(len);
if (df.getFlag(DataFrame.END_STREAM)) {
break;
}
// Don't send window update on a stream which is
// closed or half closed.
windowUpdater.update(len);
} while (true);
setEndStreamReceived();
}
publisher.acceptData(Optional.empty());
} catch (Throwable e) {
Log.logTrace("receiveData: {0}", e.toString());
e.printStackTrace();
cf.completeExceptionally(e);
publisher.acceptError(e);
}
});
Consumer<Throwable> onError = e -> {
Log.logTrace("receiveData: {0}", e.toString());
e.printStackTrace();
cf.completeExceptionally(e);
publisher.acceptError(e);
};
if (executor == null) {
inputQ.blockingReceive(this::receiveDataFrame, onError);
} else {
inputQ.asyncReceive(executor, this::receiveDataFrame, onError);
}
return cf;
}
@Override
void sendBody() throws IOException, InterruptedException {
sendBodyImpl();
void sendBody() throws IOException {
try {
sendBodyImpl().join();
} catch (CompletionException e) {
throw Utils.getIOException(e);
}
}
CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
return sendBodyImpl().thenApply( v -> this);
}
@SuppressWarnings("unchecked")
@ -268,7 +275,6 @@ class Stream<T> extends ExchangeImpl<T> {
};
this.requestPseudoHeaders = new HttpHeadersImpl();
// NEW
this.inputQ = new Queue<>();
this.publisher = new BlockingPushPublisher<>();
this.windowUpdater = new StreamWindowUpdateSender(connection);
}
@ -673,6 +679,10 @@ class Stream<T> extends ExchangeImpl<T> {
response_cfs.add(cf);
}
}
if (executor != null && !cf.isDone()) {
// protect from executing later chain of CompletableFuture operations from SelectorManager thread
cf = cf.thenApplyAsync(r -> r, executor);
}
Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
PushGroup<?,?> pg = exchange.getPushGroup();
if (pg != null) {
@ -743,20 +753,12 @@ class Stream<T> extends ExchangeImpl<T> {
}
}
private void waitForCompletion() throws IOException {
try {
requestBodyCF.join();
} catch (CompletionException e) {
throw Utils.getIOException(e);
}
}
void sendBodyImpl() throws IOException, InterruptedException {
CompletableFuture<Void> sendBodyImpl() {
RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
subscriber.setClient(client);
requestProcessor.subscribe(subscriber);
waitForCompletion();
requestSent();
requestBodyCF.whenComplete((v,t) -> requestSent());
return requestBodyCF;
}
@Override
@ -846,30 +848,24 @@ class Stream<T> extends ExchangeImpl<T> {
// error record it in the PushGroup. The error method is called
// with a null value when no error occurred (is a no-op)
@Override
CompletableFuture<Void> sendBodyAsync(Executor executor) {
return super.sendBodyAsync(executor)
.whenComplete((Void v, Throwable t) -> {
pushGroup.pushError(t);
});
CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
return super.sendBodyAsync()
.whenComplete((ExchangeImpl<T> v, Throwable t) -> pushGroup.pushError(t));
}
@Override
CompletableFuture<Void> sendHeadersAsync() {
CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
return super.sendHeadersAsync()
.whenComplete((Void v, Throwable t) -> {
pushGroup.pushError(t);
});
}
@Override
CompletableFuture<Void> sendRequestAsync(Executor executor) {
return super.sendRequestAsync(executor)
.whenComplete((v, t) -> pushGroup.pushError(t));
.whenComplete((ExchangeImpl<T> ex, Throwable t) -> pushGroup.pushError(t));
}
@Override
CompletableFuture<Response> getResponseAsync(Executor executor) {
return pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
CompletableFuture<Response> cf = pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
if(executor!=null && !cf.isDone()) {
cf = cf.thenApplyAsync( r -> r, executor);
}
return cf;
}
@Override
@ -887,7 +883,8 @@ class Stream<T> extends ExchangeImpl<T> {
HttpResponseImpl.logResponse(r);
pushCF.complete(r); // not strictly required for push API
// start reading the body using the obtained BodyProcessor
readBodyAsync(getPushHandler(), false, getExchange().executor())
CompletableFuture<Void> start = new MinimalFuture<>();
start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
.whenComplete((T body, Throwable t) -> {
if (t != null) {
responseCF.completeExceptionally(t);
@ -896,6 +893,7 @@ class Stream<T> extends ExchangeImpl<T> {
responseCF.complete(response);
}
});
start.completeAsync(() -> null, getExchange().executor());
}
@Override

View File

@ -0,0 +1,212 @@
/*
* Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.incubator.http.internal.common;
import jdk.incubator.http.internal.frame.DataFrame;
import jdk.incubator.http.internal.frame.Http2Frame;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
/**
* Http2Frame Producer-Consumer queue which either allows to consume all frames in blocking way
* or allows to consume it asynchronously. In the latter case put operation from the producer thread
* executes consume operation in the given executor.
*/
public class AsyncDataReadQueue implements Closeable {
@FunctionalInterface
public interface DataConsumer {
/**
*
* @param t - frame
* @return true if consuming should be continued. false when END_STREAM was received.
* @throws Throwable
*/
boolean accept(Http2Frame t) throws Throwable;
}
private static final int BLOCKING = 0;
private static final int FLUSHING = 1;
private static final int REFLUSHING = 2;
private static final int ASYNC = 3;
private static final int CLOSED = 4;
private final AtomicInteger state = new AtomicInteger(BLOCKING);
private final BlockingQueue<Http2Frame> queue = new LinkedBlockingQueue<>();
private Executor executor;
private DataConsumer onData;
private Consumer<Throwable> onError;
public AsyncDataReadQueue() {
}
public boolean tryPut(Http2Frame f) {
if(state.get() == CLOSED) {
return false;
} else {
queue.offer(f);
flushAsync(false);
return true;
}
}
public void put(Http2Frame f) throws IOException {
if(!tryPut(f))
throw new IOException("stream closed");
}
public void blockingReceive(DataConsumer onData, Consumer<Throwable> onError) {
if (state.get() == CLOSED) {
onError.accept(new IOException("stream closed"));
return;
}
assert state.get() == BLOCKING;
try {
while (onData.accept(queue.take()));
assert state.get() == CLOSED;
} catch (Throwable e) {
onError.accept(e);
}
}
public void asyncReceive(Executor executor, DataConsumer onData,
Consumer<Throwable> onError) {
if (state.get() == CLOSED) {
onError.accept(new IOException("stream closed"));
return;
}
assert state.get() == BLOCKING;
// Validates that fields not already set.
if (!checkCanSet("executor", this.executor, onError)
|| !checkCanSet("onData", this.onData, onError)
|| !checkCanSet("onError", this.onError, onError)) {
return;
}
this.executor = executor;
this.onData = onData;
this.onError = onError;
// This will report an error if asyncReceive is called twice,
// because we won't be in BLOCKING state if that happens
if (!this.state.compareAndSet(BLOCKING, ASYNC)) {
onError.accept(new IOException(
new IllegalStateException("State: "+this.state.get())));
return;
}
flushAsync(true);
}
private static <T> boolean checkCanSet(String name, T oldval, Consumer<Throwable> onError) {
if (oldval != null) {
onError.accept(new IOException(
new IllegalArgumentException(name)));
return false;
}
return true;
}
@Override
public void close() {
int prevState = state.getAndSet(CLOSED);
if(prevState == BLOCKING) {
// wake up blocked take()
queue.offer(new DataFrame(0, DataFrame.END_STREAM, new ByteBufferReference[0]));
}
}
private void flushAsync(boolean alreadyInExecutor) {
while(true) {
switch (state.get()) {
case BLOCKING:
case CLOSED:
case REFLUSHING:
return;
case ASYNC:
if(state.compareAndSet(ASYNC, FLUSHING)) {
if(alreadyInExecutor) {
flushLoop();
} else {
executor.execute(this::flushLoop);
}
return;
}
break;
case FLUSHING:
if(state.compareAndSet(FLUSHING, REFLUSHING)) {
return;
}
break;
}
}
}
private void flushLoop() {
try {
while(true) {
Http2Frame frame = queue.poll();
while (frame != null) {
if(!onData.accept(frame)) {
assert state.get() == CLOSED;
return; // closed
}
frame = queue.poll();
}
switch (state.get()) {
case BLOCKING:
assert false;
break;
case ASYNC:
throw new RuntimeException("Shouldn't happen");
case FLUSHING:
if(state.compareAndSet(FLUSHING, ASYNC)) {
return;
}
break;
case REFLUSHING:
// We need to check if new elements were put after last
// poll() and do graceful exit
state.compareAndSet(REFLUSHING, FLUSHING);
break;
case CLOSED:
return;
}
}
} catch (Throwable e) {
onError.accept(e);
close();
}
}
}

View File

@ -26,6 +26,7 @@
package jdk.incubator.http.internal.common;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
@ -40,6 +41,11 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public final class MinimalFuture<T> extends CompletableFuture<T> {
@FunctionalInterface
public interface ExceptionalSupplier<U> {
U get() throws Throwable;
}
final static AtomicLong TOKENS = new AtomicLong();
final long id;
@ -56,6 +62,29 @@ public final class MinimalFuture<T> extends CompletableFuture<T> {
return f;
}
public static <U> CompletableFuture<U> supply(ExceptionalSupplier<U> supplier) {
CompletableFuture<U> cf = new MinimalFuture<>();
try {
U value = supplier.get();
cf.complete(value);
} catch (Throwable t) {
cf.completeExceptionally(t);
}
return cf;
}
public static <U> CompletableFuture<U> supply(ExceptionalSupplier<U> supplier, Executor executor) {
CompletableFuture<U> cf = new MinimalFuture<>();
cf.completeAsync( () -> {
try {
return supplier.get();
} catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
return cf;
}
public MinimalFuture() {
super();
this.id = TOKENS.incrementAndGet();

View File

@ -0,0 +1,241 @@
/*
* Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* @test
* @bug 8087112
* @library /lib/testlibrary server
* @build jdk.testlibrary.SimpleSSLContext
* @modules jdk.incubator.httpclient/jdk.incubator.http.internal.common
* jdk.incubator.httpclient/jdk.incubator.http.internal.frame
* jdk.incubator.httpclient/jdk.incubator.http.internal.hpack
* @run testng/othervm -Djdk.httpclient.HttpClient.log=ssl,requests,responses,errors FixedThreadPoolTest
*/
import java.net.*;
import jdk.incubator.http.*;
import static jdk.incubator.http.HttpClient.Version.HTTP_2;
import javax.net.ssl.*;
import java.nio.file.*;
import java.util.concurrent.*;
import jdk.testlibrary.SimpleSSLContext;
import static jdk.incubator.http.HttpRequest.BodyProcessor.fromFile;
import static jdk.incubator.http.HttpRequest.BodyProcessor.fromString;
import static jdk.incubator.http.HttpResponse.BodyHandler.asFile;
import static jdk.incubator.http.HttpResponse.BodyHandler.asString;
import org.testng.annotations.Test;
@Test
public class FixedThreadPoolTest {
static int httpPort, httpsPort;
static Http2TestServer httpServer, httpsServer;
static HttpClient client = null;
static ExecutorService exec;
static SSLContext sslContext;
static String httpURIString, httpsURIString;
static void initialize() throws Exception {
try {
SimpleSSLContext sslct = new SimpleSSLContext();
sslContext = sslct.get();
client = getClient();
httpServer = new Http2TestServer(false, 0, exec, sslContext);
httpServer.addHandler(new EchoHandler(), "/");
httpPort = httpServer.getAddress().getPort();
httpsServer = new Http2TestServer(true, 0, exec, sslContext);
httpsServer.addHandler(new EchoHandler(), "/");
httpsPort = httpsServer.getAddress().getPort();
httpURIString = "http://127.0.0.1:" + httpPort + "/foo/";
httpsURIString = "https://127.0.0.1:" + httpsPort + "/bar/";
httpServer.start();
httpsServer.start();
} catch (Throwable e) {
System.err.println("Throwing now");
e.printStackTrace();
throw e;
}
}
@Test(timeOut=3000000)
public static void test() throws Exception {
try {
initialize();
simpleTest(false);
simpleTest(true);
streamTest(false);
streamTest(true);
paramsTest();
Thread.sleep(1000 * 4);
} catch (Exception | Error tt) {
tt.printStackTrace();
throw tt;
} finally {
httpServer.stop();
httpsServer.stop();
exec.shutdownNow();
}
}
static HttpClient getClient() {
if (client == null) {
exec = Executors.newCachedThreadPool();
client = HttpClient.newBuilder()
.executor(Executors.newFixedThreadPool(2))
.sslContext(sslContext)
.version(HTTP_2)
.build();
}
return client;
}
static URI getURI(boolean secure) {
if (secure)
return URI.create(httpsURIString);
else
return URI.create(httpURIString);
}
static void checkStatus(int expected, int found) throws Exception {
if (expected != found) {
System.err.printf ("Test failed: wrong status code %d/%d\n",
expected, found);
throw new RuntimeException("Test failed");
}
}
static void checkStrings(String expected, String found) throws Exception {
if (!expected.equals(found)) {
System.err.printf ("Test failed: wrong string %s/%s\n",
expected, found);
throw new RuntimeException("Test failed");
}
}
static Void compareFiles(Path path1, Path path2) {
return TestUtil.compareFiles(path1, path2);
}
static Path tempFile() {
return TestUtil.tempFile();
}
static final String SIMPLE_STRING = "Hello world Goodbye world";
static final int LOOPS = 32;
static final int FILESIZE = 64 * 1024 + 200;
static void streamTest(boolean secure) throws Exception {
URI uri = getURI(secure);
System.err.printf("streamTest %b to %s\n" , secure, uri);
HttpClient client = getClient();
Path src = TestUtil.getAFile(FILESIZE * 4);
HttpRequest req = HttpRequest.newBuilder(uri)
.POST(fromFile(src))
.build();
Path dest = Paths.get("streamtest.txt");
dest.toFile().delete();
CompletableFuture<Path> response = client.sendAsync(req, asFile(dest))
.thenApply(resp -> {
if (resp.statusCode() != 200)
throw new RuntimeException();
return resp.body();
});
response.join();
compareFiles(src, dest);
System.err.println("DONE");
}
static void paramsTest() throws Exception {
System.err.println("paramsTest");
Http2TestServer server = new Http2TestServer(true, 0, exec, sslContext);
server.addHandler((t -> {
SSLSession s = t.getSSLSession();
String prot = s.getProtocol();
if (prot.equals("TLSv1.2")) {
t.sendResponseHeaders(200, -1);
} else {
System.err.printf("Protocols =%s\n", prot);
t.sendResponseHeaders(500, -1);
}
}), "/");
server.start();
int port = server.getAddress().getPort();
URI u = new URI("https://127.0.0.1:"+port+"/foo");
HttpClient client = getClient();
HttpRequest req = HttpRequest.newBuilder(u).build();
HttpResponse<String> resp = client.sendAsync(req, asString()).get();
int stat = resp.statusCode();
if (stat != 200) {
throw new RuntimeException("paramsTest failed "
+ Integer.toString(stat));
}
}
static void simpleTest(boolean secure) throws Exception {
URI uri = getURI(secure);
System.err.println("Request to " + uri);
// Do a simple warmup request
HttpClient client = getClient();
HttpRequest req = HttpRequest.newBuilder(uri)
.POST(fromString(SIMPLE_STRING))
.build();
HttpResponse<String> response = client.sendAsync(req, asString()).get();
HttpHeaders h = response.headers();
checkStatus(200, response.statusCode());
String responseBody = response.body();
checkStrings(SIMPLE_STRING, responseBody);
checkStrings(h.firstValue("x-hello").get(), "world");
checkStrings(h.firstValue("x-bye").get(), "universe");
// Do loops asynchronously
CompletableFuture[] responses = new CompletableFuture[LOOPS];
final Path source = TestUtil.getAFile(FILESIZE);
HttpRequest request = HttpRequest.newBuilder(uri)
.POST(fromFile(source))
.build();
for (int i = 0; i < LOOPS; i++) {
responses[i] = client.sendAsync(request, asFile(tempFile()))
//.thenApply(resp -> compareFiles(resp.body(), source));
.thenApply(resp -> {
System.out.printf("Resp status %d body size %d\n",
resp.statusCode(), resp.body().toFile().length());
return compareFiles(resp.body(), source);
});
}
CompletableFuture.allOf(responses).join();
System.err.println("DONE");
}
}