8179021: Latest bugfixes to WebSocket/HPACK from the sandbox repo

Reviewed-by: dfuchs
This commit is contained in:
Pavel Rappo 2017-05-10 12:36:14 +01:00
parent e52af5f5b3
commit c74eb4302e
18 changed files with 656 additions and 368 deletions

View File

@ -190,7 +190,8 @@ module java.base {
jdk.unsupported;
exports jdk.internal.vm.annotation to
jdk.unsupported,
jdk.internal.vm.ci;
jdk.internal.vm.ci,
jdk.incubator.httpclient;
exports jdk.internal.util.jar to
jdk.jartool,
jdk.jdeps,

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -123,7 +123,7 @@ class HttpRequestImpl extends HttpRequest implements WebSocketRequest {
this.method = method;
this.systemHeaders = new HttpHeadersImpl();
this.userHeaders = ImmutableHeaders.empty();
this.uri = null;
this.uri = URI.create("socket://" + authority.getHostString() + ":" + Integer.toString(authority.getPort()) + "/");
this.requestProcessor = HttpRequest.noBody();
this.authority = authority;
this.secure = false;

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -27,6 +27,7 @@ package jdk.incubator.http;
import jdk.incubator.http.internal.common.ByteBufferReference;
import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.HttpResponse.BodyHandler;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -72,7 +73,8 @@ class PlainTunnelingConnection extends HttpConnection {
public void connect() throws IOException, InterruptedException {
delegate.connect();
HttpRequestImpl req = new HttpRequestImpl("CONNECT", client, address);
Exchange<?> connectExchange = new Exchange<>(req, null);
MultiExchange<Void,Void> mul = new MultiExchange<>(req, client, BodyHandler.<Void>discard(null));
Exchange<Void> connectExchange = new Exchange<>(req, mul);
Response r = connectExchange.responseImpl(delegate);
if (r.statusCode() != 200) {
throw new IOException("Tunnel failed");

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -43,7 +43,7 @@ import java.util.concurrent.CompletionStage;
* <p> To create a {@code WebSocket} use a {@linkplain HttpClient#newWebSocketBuilder(
* URI, Listener) builder}. Once a {@code WebSocket} is built, it's ready
* to send and receive messages. When the {@code WebSocket} is no longer needed
* it must be closed: a Close message must both be {@linkplain #sendClose()
* it must be closed: a Close message must both be {@linkplain #sendClose
* sent} and {@linkplain Listener#onClose(WebSocket, int, String) received}.
* The {@code WebSocket} may be also closed {@linkplain #abort() abruptly}.
*
@ -93,17 +93,6 @@ public interface WebSocket {
*/
int NORMAL_CLOSURE = 1000;
/**
* The WebSocket Close message status code (<code>{@value}</code>), is
* designated for use in applications expecting a status code to indicate
* that the connection was closed abnormally, e.g., without sending or
* receiving a Close message.
*
* @see Listener#onClose(WebSocket, int, String)
* @see #abort()
*/
int CLOSED_ABNORMALLY = 1006;
/**
* A builder for creating {@code WebSocket} instances.
* {@Incubating}
@ -509,7 +498,7 @@ public interface WebSocket {
*
* <p> The {@code WebSocket} will close at the earliest of completion of
* the returned {@code CompletionStage} or sending a Close message. In
* particular, if a Close message has been {@link WebSocket#sendClose()
* particular, if a Close message has been {@linkplain WebSocket#sendClose
* sent} before, then this invocation completes the closing handshake
* and by the time this method is invoked, the {@code WebSocket} will
* have been closed.
@ -642,44 +631,6 @@ public interface WebSocket {
*/
CompletableFuture<WebSocket> sendText(CharSequence message, boolean isLast);
/**
* Sends a whole Text message with characters from the given {@code
* CharSequence}.
*
* <p> This is a convenience method. For the general case, use {@link
* #sendText(CharSequence, boolean)}.
*
* <p> Returns a {@code CompletableFuture<WebSocket>} which completes
* normally when the message has been sent or completes exceptionally if an
* error occurs.
*
* <p> The {@code CharSequence} must not be modified until the returned
* {@code CompletableFuture} completes (either normally or exceptionally).
*
* <p> The returned {@code CompletableFuture} can complete exceptionally
* with:
* <ul>
* <li> {@link IllegalArgumentException} -
* if {@code message} is a malformed UTF-16 sequence
* <li> {@link IllegalStateException} -
* if the {@code WebSocket} is closed;
* or if a Close message has been sent;
* or if there is an outstanding send operation;
* or if a previous Binary message was sent with {@code isLast == false}
* <li> {@link IOException} -
* if an I/O error occurs during this operation;
* or if the {@code WebSocket} has been closed due to an error;
* </ul>
*
* @param message
* the message
*
* @return a {@code CompletableFuture} with this {@code WebSocket}
*/
default CompletableFuture<WebSocket> sendText(CharSequence message) {
return sendText(message, true);
}
/**
* Sends a Binary message with bytes from the given {@code ByteBuffer}.
*
@ -831,46 +782,9 @@ public interface WebSocket {
* the reason
*
* @return a {@code CompletableFuture} with this {@code WebSocket}
*
* @see #sendClose()
*/
CompletableFuture<WebSocket> sendClose(int statusCode, String reason);
/**
* Sends an empty Close message.
*
* <p> When this method has been invoked, no further messages can be sent.
*
* <p> For more details on Close message see RFC 6455 section
* <a href="https://tools.ietf.org/html/rfc6455#section-5.5.1">5.5.1. Close</a>
*
* <p> The method returns a {@code CompletableFuture<WebSocket>} which
* completes normally when the message has been sent or completes
* exceptionally if an error occurs.
*
* <p> The returned {@code CompletableFuture} can complete exceptionally
* with:
* <ul>
* <li> {@link IOException} -
* if an I/O error occurs during this operation;
* or the {@code WebSocket} has been closed due to an error
* </ul>
*
* <p> If this method has already been invoked or the {@code WebSocket} is
* closed, then subsequent invocations of this method have no effect and the
* returned {@code CompletableFuture} completes normally.
*
* <p> If a Close message has been {@linkplain Listener#onClose(WebSocket,
* int, String) received} before, then this invocation completes the closing
* handshake and by the time the returned {@code CompletableFuture}
* completes, the {@code WebSocket} will have been closed.
*
* @return a {@code CompletableFuture} with this {@code WebSocket}
*
* @see #sendClose(int, String)
*/
CompletableFuture<WebSocket> sendClose();
/**
* Allows {@code n} more messages to be received by the {@link Listener
* Listener}.
@ -928,8 +842,7 @@ public interface WebSocket {
* state.
*
* <p> As the result {@link Listener#onClose(WebSocket, int, String)
* Listener.onClose} will be invoked with the status code {@link
* #CLOSED_ABNORMALLY} unless either {@code onClose} or {@link
* Listener.onClose} will be invoked unless either {@code onClose} or {@link
* Listener#onError(WebSocket, Throwable) onError} has been invoked before.
* In which case no additional invocation will happen.
*

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2014, 2016, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2014, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -24,6 +24,8 @@
*/
package jdk.incubator.http.internal.hpack;
import jdk.internal.vm.annotation.Stable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ProtocolException;
@ -60,6 +62,7 @@ import static java.util.Objects.requireNonNull;
*/
public final class Decoder {
@Stable
private static final State[] states = new State[256];
static {

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2014, 2016, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2014, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -24,6 +24,8 @@
*/
package jdk.incubator.http.internal.hpack;
import jdk.internal.vm.annotation.Stable;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
@ -40,6 +42,7 @@ import static java.lang.String.format;
//
final class HeaderTable {
@Stable
private static final HeaderField[] staticTable = {
null, // To make index 1-based, instead of 0-based
new HeaderField(":authority"),
@ -110,7 +113,7 @@ final class HeaderTable {
private static final Map<String, LinkedHashMap<String, Integer>> staticIndexes;
static {
staticIndexes = new HashMap<>(STATIC_TABLE_LENGTH);
staticIndexes = new HashMap<>(STATIC_TABLE_LENGTH); // TODO: Map.of
for (int i = 1; i <= STATIC_TABLE_LENGTH; i++) {
HeaderField f = staticTable[i];
Map<String, Integer> values = staticIndexes

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -46,7 +46,7 @@ public final class BuilderImpl implements Builder {
private final HttpClient client;
private final URI uri;
private final Listener listener;
private final List<Pair<String, String>> headers = new LinkedList<>();
private final Collection<Pair<String, String>> headers = new LinkedList<>();
private final Collection<String> subprotocols = new LinkedList<>();
private Duration timeout;
@ -65,17 +65,18 @@ public final class BuilderImpl implements Builder {
}
@Override
public Builder subprotocols(String mostPreferred, String... lesserPreferred)
public Builder subprotocols(String mostPreferred,
String... lesserPreferred)
{
requireNonNull(mostPreferred, "mostPreferred");
requireNonNull(lesserPreferred, "lesserPreferred");
List<String> subprotocols = new LinkedList<>();
subprotocols.add(mostPreferred);
for (int i = 0; i < lesserPreferred.length; i++) {
String p = lesserPreferred[i];
requireNonNull(p, "lesserPreferred[" + i + "]");
subprotocols.add(p);
}
subprotocols.add(0, mostPreferred);
this.subprotocols.clear();
this.subprotocols.addAll(subprotocols);
return this;
@ -98,20 +99,9 @@ public final class BuilderImpl implements Builder {
Listener getListener() { return listener; }
List<Pair<String, String>> getHeaders() { return headers; }
Collection<Pair<String, String>> getHeaders() { return headers; }
Collection<String> getSubprotocols() { return subprotocols; }
Duration getConnectTimeout() { return timeout; }
@Override
public String toString() {
return "WebSocket.Builder{"
+ ", uri=" + uri
+ ", listener=" + listener
+ (!headers.isEmpty() ? ", headers=" + headers : "")
+ (!subprotocols.isEmpty() ? ", subprotocols=" + subprotocols : "")
+ ( timeout != null ? ", connectTimeout=" + timeout : "")
+ '}';
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -25,70 +25,184 @@
package jdk.incubator.http.internal.websocket;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import static java.util.Objects.requireNonNull;
final class CooperativeHandler {
/*
* A synchronization aid that assists a number of parties in running a task
* in a mutually exclusive fashion.
*
* To run the task, a party invokes `handle`. To permanently prevent the task
* from subsequent runs, the party invokes `stop`.
*
* The parties do not have to operate in different threads.
*
* The task can be either synchronous or asynchronous.
*
* If the task is synchronous, it is represented with `Runnable`.
* The handler invokes `Runnable.run` to run the task.
*
* If the task is asynchronous, it is represented with `Consumer<Runnable>`.
* The handler invokes `Consumer.accept(end)` to begin the task. The task
* invokes `end.run()` when it has ended.
*
* The next run of the task will not begin until the previous run has finished.
*
* The task may invoke `handle()` by itself, it's a normal situation.
*/
public final class CooperativeHandler {
private static final long CONTINUE = 0;
private static final long OFF = 1;
private static final long ON = 2;
private static final long STOP = 4;
/*
Since the task is fixed and known beforehand, no blocking synchronization
(locks, queues, etc.) is required. The job can be done solely using
nonblocking primitives.
private final AtomicLong state = new AtomicLong(OFF);
The machinery below addresses two problems:
private final Runnable task;
1. Running the task in a sequential order (no concurrent runs):
CooperativeHandler(Runnable task) {
this.task = requireNonNull(task);
begin, end, begin, end...
2. Avoiding indefinite recursion:
begin
end
begin
end
...
Problem #1 is solved with a finite state machine with 4 states:
BEGIN, AGAIN, END, and STOP.
Problem #2 is solved with a "state modifier" OFFLOAD.
Parties invoke `handle()` to signal the task must run. A party that has
invoked `handle()` either begins the task or exploits the party that is
either beginning the task or ending it.
The party that is trying to end the task either ends it or begins it
again.
To avoid indefinite recursion, before re-running the task tryEnd() sets
OFFLOAD bit, signalling to its "child" tryEnd() that this ("parent")
tryEnd() is available and the "child" must offload the task on to the
"parent". Then a race begins. Whichever invocation of tryEnd() manages
to unset OFFLOAD bit first does not do the work.
There is at most 1 thread that is beginning the task and at most 2
threads that are trying to end it: "parent" and "child". In case of a
synchronous task "parent" and "child" are the same thread.
*/
private static final int OFFLOAD = 1;
private static final int AGAIN = 2;
private static final int BEGIN = 4;
private static final int STOP = 8;
private static final int END = 16;
private final AtomicInteger state = new AtomicInteger(END);
private final Consumer<Runnable> begin;
public CooperativeHandler(Runnable task) {
this(asyncOf(task));
}
public CooperativeHandler(Consumer<Runnable> begin) {
this.begin = requireNonNull(begin);
}
/*
* Causes the task supplied to the constructor to run. The task may be run
* by this thread as well as by any other that has invoked this method.
* Runs the task (though maybe by a different party).
*
* The recursion which is possible here will have the maximum depth of 1:
*
* task.run()
* this.startOrContinue()
* task.run()
* this.handle()
* begin.accept()
* this.handle()
*/
void startOrContinue() {
long s;
public void handle() {
while (true) {
s = state.get();
if (s == OFF && state.compareAndSet(OFF, ON)) {
// No one is running the task, we are going to run it
break;
}
if (s == ON && state.compareAndSet(ON, CONTINUE)) {
// Some other thread is running the task. We have managed to
// update the state, it will be surely noticed by that thread.
return;
}
if (s == CONTINUE || s == STOP) {
int s = state.get();
if (s == END) {
if (state.compareAndSet(END, BEGIN)) {
break;
}
} else if ((s & BEGIN) != 0) {
// Tries to change the state to AGAIN, preserving OFFLOAD bit
if (state.compareAndSet(s, AGAIN | (s & OFFLOAD))) {
return;
}
} else if ((s & AGAIN) != 0 || s == STOP) {
return;
} else {
throw new InternalError(String.valueOf(s));
}
}
begin.accept(this::tryEnd);
}
private void tryEnd() {
while (true) {
task.run();
// State checks are ordered by the probability of expected values
// (it might be different in different usage patterns, say, when
// invocations to `startOrContinue()` are concurrent)
if (state.compareAndSet(ON, OFF)) {
break; // The state hasn't changed, all done
int s;
while (((s = state.get()) & OFFLOAD) != 0) {
// Tries to offload ending of the task to the parent
if (state.compareAndSet(s, s & ~OFFLOAD)) {
return;
}
}
if (state.compareAndSet(CONTINUE, ON)) {
continue;
while (true) {
if (s == BEGIN) {
if (state.compareAndSet(BEGIN, END)) {
return;
}
} else if (s == AGAIN) {
if (state.compareAndSet(AGAIN, BEGIN | OFFLOAD)) {
break;
}
} else if (s == STOP) {
return;
} else {
throw new InternalError(String.valueOf(s));
}
s = state.get();
}
// Other threads can change the state from CONTINUE to STOP only
// So if it's not ON and not CONTINUE, it can only be STOP
break;
begin.accept(this::tryEnd);
}
}
void stop() {
/*
* Checks whether or not this handler has been permanently stopped.
*
* Should be used from inside the task to poll the status of the handler,
* pretty much the same way as it is done for threads:
*
* if (!Thread.currentThread().isInterrupted()) {
* ...
* }
*/
public boolean isStopped() {
return state.get() == STOP;
}
/*
* Signals this handler to ignore subsequent invocations to `handle()`.
*
* If the task has already begun, this invocation will not affect it,
* unless the task itself uses `isStopped()` method to check the state
* of the handler.
*/
public void stop() {
state.set(STOP);
}
private static Consumer<Runnable> asyncOf(Runnable task) {
requireNonNull(task);
return ender -> {
task.run();
ender.run();
};
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -25,6 +25,8 @@
package jdk.incubator.http.internal.websocket;
import jdk.internal.vm.annotation.Stable;
import java.nio.ByteBuffer;
import static jdk.incubator.http.internal.common.Utils.dump;
@ -58,6 +60,7 @@ final class Frame {
CONTROL_0xE (0xE),
CONTROL_0xF (0xF);
@Stable
private static final Opcode[] opcodes;
static {

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -38,7 +38,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
import static jdk.incubator.http.internal.common.Utils.dump;
import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE;
import static jdk.incubator.http.internal.websocket.StatusCodes.checkIncomingCode;
import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToReceiveFromServer;
/*
* Consumes frame parts and notifies a message consumer, when there is
@ -212,20 +212,20 @@ class FrameConsumer implements Frame.Consumer {
}
switch (opcode) {
case CLOSE:
int statusCode = NO_STATUS_CODE;
char statusCode = NO_STATUS_CODE;
String reason = "";
if (payloadLen != 0) {
int len = binaryData.remaining();
assert 2 <= len && len <= 125 : dump(len, payloadLen);
try {
statusCode = checkIncomingCode(binaryData.getChar());
reason = UTF_8.newDecoder().decode(binaryData).toString();
} catch (CheckFailedException e) {
throw new FailWebSocketException("Incorrect status code")
.initCause(e);
} catch (CharacterCodingException e) {
statusCode = binaryData.getChar();
if (!isLegalToReceiveFromServer(statusCode)) {
throw new FailWebSocketException(
"Close reason is a malformed UTF-8 sequence")
"Illegal status code: " + statusCode);
}
try {
reason = UTF_8.newDecoder().decode(binaryData).toString();
} catch (CharacterCodingException e) {
throw new FailWebSocketException("Illegal close reason")
.initCause(e);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -37,6 +37,8 @@ import jdk.incubator.http.HttpRequest;
import jdk.incubator.http.HttpResponse;
import jdk.incubator.http.HttpResponse.BodyHandler;
import jdk.incubator.http.WebSocketHandshakeException;
import jdk.incubator.http.internal.common.Pair;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
@ -66,7 +68,6 @@ final class OpeningHandshake {
private static final String HEADER_KEY = "Sec-WebSocket-Key";
private static final String HEADER_PROTOCOL = "Sec-WebSocket-Protocol";
private static final String HEADER_VERSION = "Sec-WebSocket-Version";
private static final String VALUE_VERSION = "13"; // WebSocket's lucky number
private static final Set<String> FORBIDDEN_HEADERS;
@ -106,12 +107,18 @@ final class OpeningHandshake {
if (connectTimeout != null) {
requestBuilder.timeout(connectTimeout);
}
for (Pair<String, String> p : b.getHeaders()) {
if (FORBIDDEN_HEADERS.contains(p.first)) {
throw illegal("Illegal header: " + p.first);
}
requestBuilder.header(p.first, p.second);
}
this.subprotocols = createRequestSubprotocols(b.getSubprotocols());
if (!this.subprotocols.isEmpty()) {
String p = this.subprotocols.stream().collect(Collectors.joining(", "));
requestBuilder.header(HEADER_PROTOCOL, p);
}
requestBuilder.header(HEADER_VERSION, VALUE_VERSION);
requestBuilder.header(HEADER_VERSION, "13"); // WebSocket's lucky number
this.nonce = createNonce();
requestBuilder.header(HEADER_KEY, this.nonce);
// Setting request version to HTTP/1.1 forcibly, since it's not possible
@ -133,11 +140,7 @@ final class OpeningHandshake {
if (s.trim().isEmpty() || !isValidName(s)) {
throw illegal("Bad subprotocol syntax: " + s);
}
if (FORBIDDEN_HEADERS.contains(s)) {
throw illegal("Forbidden header: " + s);
}
boolean unique = sp.add(s);
if (!unique) {
if (!sp.add(s)) {
throw illegal("Duplicating subprotocol: " + s);
}
}
@ -176,7 +179,7 @@ final class OpeningHandshake {
CompletableFuture<Result> send() {
return client.sendAsync(this.request, BodyHandler.<Void>discard(null))
.thenCompose(this::resultFrom);
.thenCompose(this::resultFrom);
}
/*
@ -283,7 +286,6 @@ final class OpeningHandshake {
private static String requireSingle(HttpHeaders responseHeaders,
String headerName)
throws CheckFailedException
{
List<String> values = responseHeaders.allValues(headerName);
if (values.isEmpty()) {

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -60,6 +60,7 @@ import static jdk.incubator.http.internal.websocket.Frame.Opcode.TEXT;
*/
abstract class OutgoingMessage {
// Share per WebSocket?
private static final SecureRandom maskingKeys = new SecureRandom();
protected ByteBuffer[] frame;
@ -71,6 +72,8 @@ abstract class OutgoingMessage {
* convenient moment (up to the point where sentTo is invoked).
*/
protected void contextualize(Context context) {
// masking and charset decoding should be performed here rather than in
// the constructor (as of today)
if (context.isCloseSent()) {
throw new IllegalStateException("Close sent");
}
@ -101,7 +104,7 @@ abstract class OutgoingMessage {
private final boolean isLast;
Text(CharSequence characters, boolean isLast) {
CharsetEncoder encoder = UTF_8.newEncoder();
CharsetEncoder encoder = UTF_8.newEncoder(); // Share per WebSocket?
try {
payload = encoder.encode(CharBuffer.wrap(characters));
} catch (CharacterCodingException e) {
@ -172,11 +175,11 @@ abstract class OutgoingMessage {
Close(int statusCode, CharSequence reason) {
ByteBuffer payload = ByteBuffer.allocate(125)
.putChar((char) statusCode);
.putChar((char) statusCode);
CoderResult result = UTF_8.newEncoder()
.encode(CharBuffer.wrap(reason),
payload,
true);
.encode(CharBuffer.wrap(reason),
payload,
true);
if (result.isOverflow()) {
throw new IllegalArgumentException("Long reason");
} else if (result.isError()) {

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -58,8 +58,8 @@ final class Receiver {
private final Frame.Reader reader = new Frame.Reader();
private final RawChannel.RawEvent event = createHandler();
private final AtomicLong demand = new AtomicLong();
private final CooperativeHandler receiveHandler =
new CooperativeHandler(this::tryDeliver);
private final CooperativeHandler handler =
new CooperativeHandler(this::pushContinuously);
/*
* Used to ensure registering the channel event at most once (i.e. to avoid
* multiple registrations).
@ -72,8 +72,8 @@ final class Receiver {
this.channel = channel;
this.data = channel.initialByteBuffer();
this.frameConsumer = new FrameConsumer(this.messageConsumer);
// To ensure the initial `data` will be read correctly (happens-before)
// after readable.get()
// To ensure the initial non-final `data` will be read correctly
// (happens-before) by reader after executing readable.get()
readable.set(true);
}
@ -88,7 +88,7 @@ final class Receiver {
@Override
public void handle() {
readable.set(true);
receiveHandler.startOrContinue();
handler.handle();
}
};
}
@ -98,7 +98,7 @@ final class Receiver {
throw new IllegalArgumentException("Negative: " + n);
}
demand.accumulateAndGet(n, (p, i) -> p + i < 0 ? Long.MAX_VALUE : p + i);
receiveHandler.startOrContinue();
handler.handle();
}
void acknowledge() {
@ -113,41 +113,21 @@ final class Receiver {
* regardless of the current demand.
*/
void close() {
receiveHandler.stop();
handler.stop();
}
private void tryDeliver() {
if (readable.get() && demand.get() > 0) {
deliverAtMostOne();
private void pushContinuously() {
while (readable.get() && demand.get() > 0 && !handler.isStopped()) {
pushOnce();
}
}
private void deliverAtMostOne() {
if (data == null) {
try {
data = channel.read();
} catch (IOException e) {
readable.set(false);
messageConsumer.onError(e);
return;
}
if (data == null || !data.hasRemaining()) {
readable.set(false);
if (!data.hasRemaining()) {
try {
channel.registerEvent(event);
} catch (IOException e) {
messageConsumer.onError(e);
return;
}
} else if (data == null) {
messageConsumer.onComplete();
}
return;
}
private void pushOnce() {
if (data == null && !readData()) {
return;
}
try {
reader.readFrame(data, frameConsumer);
reader.readFrame(data, frameConsumer); // Pushing frame parts to the consumer
} catch (FailWebSocketException e) {
messageConsumer.onError(e);
return;
@ -156,4 +136,28 @@ final class Receiver {
data = null;
}
}
private boolean readData() {
try {
data = channel.read();
} catch (IOException e) {
messageConsumer.onError(e);
return false;
}
if (data == null) { // EOF
messageConsumer.onComplete();
return false;
} else if (!data.hasRemaining()) { // No data in the socket at the moment
data = null;
readable.set(false);
try {
channel.registerEvent(event);
} catch (IOException e) {
messageConsumer.onError(e);
}
return false;
}
assert data.hasRemaining();
return true;
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -25,120 +25,72 @@
package jdk.incubator.http.internal.websocket;
import static jdk.incubator.http.WebSocket.CLOSED_ABNORMALLY;
/*
* Utilities and common constants for WebSocket status codes. For more details
* on status codes and their meaning see:
* Utilities for WebSocket status codes.
*
* 1. https://tools.ietf.org/html/rfc6455#section-7.4
* 2. http://www.iana.org/assignments/websocket/websocket.xhtml#close-code-number
*/
final class StatusCodes {
static final int PROTOCOL_ERROR = 1002;
static final int CANNOT_ACCEPT = 1003;
static final int NO_STATUS_CODE = 1005;
static final int NOT_CONSISTENT = 1007;
static final int TOO_BIG = 1009;
static final int NO_EXTENSION = 1010;
static final int SERVICE_RESTART = 1012;
static final int TRY_AGAIN_LATER = 1013;
static final int TLS_HANDSHAKE_FAILURE = 1015;
static final int PROTOCOL_ERROR = 1002;
static final int NO_STATUS_CODE = 1005;
static final int CLOSED_ABNORMALLY = 1006;
static final int NOT_CONSISTENT = 1007;
private StatusCodes() { }
/*
* Returns the given code if it doesn't violate any rules for outgoing
* codes, otherwise throws a CFE with a detailed description.
*/
static int checkOutgoingCode(int code) {
checkCommon(code);
static boolean isLegalToSendFromClient(int code) {
if (!isLegal(code)) {
return false;
}
// Codes from unreserved range
if (code > 4999) {
throw new CheckFailedException("Unspecified: " + code);
return false;
}
if (isNotUserSettable(code)) {
throw new CheckFailedException("Cannot set: " + code);
}
return code;
}
/*
* Returns the given code if it doesn't violate any rules for incoming
* codes, otherwise throws a CFE with a detailed description.
*/
static int checkIncomingCode(int code) {
checkCommon(code);
if (code == NO_EXTENSION) {
throw new CheckFailedException("Bad server code: " + code);
}
return code;
}
private static int checkCommon(int code) {
if (isOutOfRange(code)) {
throw new CheckFailedException("Out of range: " + code);
}
if (isForbidden(code)) {
throw new CheckFailedException("Forbidden: " + code);
}
if (isUnassigned(code)) {
throw new CheckFailedException("Unassigned: " + code);
}
return code;
}
/*
* Returns true if the given code cannot be set by a user of the WebSocket
* API. e.g. this code means something which only a WebSocket implementation
* is responsible for or it doesn't make sense to be send by a WebSocket
* client.
*/
private static boolean isNotUserSettable(int code) {
// Codes below are not allowed to be sent using a WebSocket client API
switch (code) {
case PROTOCOL_ERROR:
case CANNOT_ACCEPT:
case NOT_CONSISTENT:
case TOO_BIG:
case NO_EXTENSION:
case TRY_AGAIN_LATER:
case SERVICE_RESTART:
return true;
default:
case 1003:
case 1009:
case 1010:
case 1012: // code sent by servers
case 1013: // code sent by servers
case 1014: // code sent by servers
return false;
default:
return true;
}
}
/*
* Returns true if the given code cannot appear on the wire. It's always an
* error to send a frame with such a code or to receive one.
*/
private static boolean isForbidden(int code) {
static boolean isLegalToReceiveFromServer(int code) {
if (!isLegal(code)) {
return false;
}
return code != 1010; // code sent by clients
}
private static boolean isLegal(int code) {
// 2-byte unsigned integer excluding first 1000 numbers from the range
// [0, 999] which are never used
if (code < 1000 || code > 65535) {
return false;
}
// Codes from the range below has no known meaning under the WebSocket
// specification (i.e. unassigned/undefined)
if ((code >= 1016 && code <= 2999) || code == 1004) {
return false;
}
// Codes below cannot appear on the wire. It's always an error either
// to send a frame with such a code or to receive one.
switch (code) {
case NO_STATUS_CODE:
case CLOSED_ABNORMALLY:
case TLS_HANDSHAKE_FAILURE:
return true;
default:
case 1015:
return false;
default:
return true;
}
}
/*
* Returns true if the given code has no known meaning under the WebSocket
* specification (i.e. unassigned/undefined).
*/
private static boolean isUnassigned(int code) {
return (code >= 1016 && code <= 2999) || code == 1004 || code == 1014;
}
/*
* Returns true if the given code is not in domain of status codes:
*
* 2-byte unsigned integer minus first 1000 numbers from the range [0, 999]
* that are never used.
*/
private static boolean isOutOfRange(int code) {
return code < 1000 || code > 65535;
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -37,11 +37,12 @@ import static java.util.Objects.requireNonNull;
*
* No matter whether the message has been fully sent or an error has occurred,
* the transmitter reports the outcome to the supplied handler and becomes ready
* to accept a new message. Until then, it is considered "busy" and an
* IllegalStateException will be thrown on each attempt to invoke send.
* to accept a new message. Until then, the transmitter is considered "busy" and
* an IllegalStateException will be thrown on each attempt to invoke send.
*/
final class Transmitter {
/* This flag is used solely for assertions */
private final AtomicBoolean busy = new AtomicBoolean();
private OutgoingMessage message;
private Consumer<Exception> completionHandler;
@ -53,9 +54,10 @@ final class Transmitter {
this.event = createHandler();
}
/*
* The supplied handler may be invoked in the calling thread, so watch out
* for stack overflow.
/**
* The supplied handler may be invoked in the calling thread.
* A {@code StackOverflowError} may thus occur if there's a possibility
* that this method is called again by the supplied handler.
*/
void send(OutgoingMessage message, Consumer<Exception> completionHandler) {
requireNonNull(message);
@ -86,8 +88,9 @@ final class Transmitter {
private void send0(OutgoingMessage message, Consumer<Exception> handler) {
boolean b = busy.get();
assert b; // Please don't inline this, as busy.get() has memory
// visibility effects and we don't want the correctness
// of the algorithm to depend on assertions flag
// visibility effects and we don't want the program behaviour
// to depend on whether the assertions are turned on
// or turned off
try {
boolean sent = message.sendTo(channel);
if (sent) {

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -51,9 +51,9 @@ import java.util.function.Function;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static jdk.incubator.http.internal.common.Pair.pair;
import static jdk.incubator.http.internal.websocket.StatusCodes.CLOSED_ABNORMALLY;
import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE;
import static jdk.incubator.http.internal.websocket.StatusCodes.TLS_HANDSHAKE_FAILURE;
import static jdk.incubator.http.internal.websocket.StatusCodes.checkOutgoingCode;
import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToSendFromClient;
/*
* A WebSocket client.
@ -74,8 +74,8 @@ final class WebSocketImpl implements WebSocket {
private final AtomicBoolean outstandingSend = new AtomicBoolean();
private final CooperativeHandler sendHandler =
new CooperativeHandler(this::sendFirst);
private final Queue<Pair<OutgoingMessage, Consumer<Exception>>> queue =
new ConcurrentLinkedQueue<>();
private final Queue<Pair<OutgoingMessage, CompletableFuture<WebSocket>>>
queue = new ConcurrentLinkedQueue<>();
private final Context context = new OutgoingMessage.Context();
private final Transmitter transmitter;
private final Receiver receiver;
@ -110,6 +110,9 @@ final class WebSocketImpl implements WebSocket {
r.subprotocol,
r.channel,
b.getListener());
// The order of calls might cause a subtle effects, like CF will be
// returned from the buildAsync _after_ onOpen has been signalled.
// This means if onOpen is lengthy, it might cause some problems.
ws.signalOpen();
return ws;
};
@ -125,7 +128,8 @@ final class WebSocketImpl implements WebSocket {
WebSocketImpl(URI uri,
String subprotocol,
RawChannel channel,
Listener listener) {
Listener listener)
{
this.uri = requireNonNull(uri);
this.subprotocol = requireNonNull(subprotocol);
this.channel = requireNonNull(channel);
@ -182,15 +186,17 @@ final class WebSocketImpl implements WebSocket {
* Processes a Close event that came from the channel. Invoked at most once.
*/
private void processClose(int statusCode, String reason) {
assert statusCode != TLS_HANDSHAKE_FAILURE; // TLS problems happen long before WebSocket is alive
receiver.close();
try {
channel.shutdownInput();
} catch (IOException e) {
Log.logError(e);
}
boolean wasComplete = !closeReceived.complete(null);
if (wasComplete) {
boolean alreadyCompleted = !closeReceived.complete(null);
if (alreadyCompleted) {
// This CF is supposed to be completed only once, the first time a
// Close message is received. No further messages are pulled from
// the socket.
throw new InternalError();
}
int code;
@ -261,19 +267,17 @@ final class WebSocketImpl implements WebSocket {
@Override
public CompletableFuture<WebSocket> sendClose(int statusCode,
String reason) {
try {
checkOutgoingCode(statusCode);
} catch (CheckFailedException e) {
IllegalArgumentException ex = new IllegalArgumentException(
"Bad status code: " + statusCode, e);
failedFuture(ex);
if (!isLegalToSendFromClient(statusCode)) {
return failedFuture(
new IllegalArgumentException("statusCode: " + statusCode));
}
return enqueueClose(new Close(statusCode, reason));
}
@Override
public CompletableFuture<WebSocket> sendClose() {
return enqueueClose(new Close());
Close msg;
try {
msg = new Close(statusCode, reason);
} catch (IllegalArgumentException e) {
return failedFuture(e);
}
return enqueueClose(msg);
}
/*
@ -288,8 +292,8 @@ final class WebSocketImpl implements WebSocket {
} catch (IOException e) {
Log.logError(e);
}
boolean wasComplete = !closeSent.complete(null);
if (wasComplete) {
boolean alreadyCompleted = !closeSent.complete(null);
if (alreadyCompleted) {
// Shouldn't happen as this callback must run at most once
throw new InternalError();
}
@ -316,40 +320,41 @@ final class WebSocketImpl implements WebSocket {
private CompletableFuture<WebSocket> enqueue(OutgoingMessage m) {
CompletableFuture<WebSocket> cf = new CompletableFuture<>();
Consumer<Exception> h = e -> {
if (e == null) {
cf.complete(WebSocketImpl.this);
sendHandler.startOrContinue();
} else {
// what if this is not a users message? (must be different entry points for different messages)
// TODO: think about correct behaviour in the face of error in
// the queue, for now it seems like the best solution is to
// deliver the error and stop
cf.completeExceptionally(e);
}
};
queue.add(pair(m, h)); // Always returns true
sendHandler.startOrContinue();
boolean added = queue.add(pair(m, cf));
if (!added) {
// The queue is supposed to be unbounded
throw new InternalError();
}
sendHandler.handle();
return cf;
}
private void sendFirst() {
Pair<OutgoingMessage, Consumer<Exception>> p = queue.poll();
/*
* This is the main sending method. It may be run in different threads,
* but never concurrently.
*/
private void sendFirst(Runnable whenSent) {
Pair<OutgoingMessage, CompletableFuture<WebSocket>> p = queue.poll();
if (p == null) {
whenSent.run();
return;
}
OutgoingMessage message = p.first;
Consumer<Exception> h = p.second;
CompletableFuture<WebSocket> cf = p.second;
try {
// At this point messages are finally ordered and will be written
// one by one in a mutually exclusive fashion; thus it's a pretty
// convenient place to contextualize them
message.contextualize(context);
Consumer<Exception> h = e -> {
if (e == null) {
cf.complete(WebSocketImpl.this);
} else {
cf.completeExceptionally(e);
}
sendHandler.handle();
whenSent.run();
};
transmitter.send(message, h);
} catch (Exception t) {
h.accept(t);
cf.completeExceptionally(t);
}
}
@ -381,7 +386,7 @@ final class WebSocketImpl implements WebSocket {
@Override
public String toString() {
return super.toString()
+ "[" + (closed.get() ? "OPEN" : "CLOSED") + "]: " + uri
+ "[" + (closed.get() ? "CLOSED" : "OPEN") + "]: " + uri
+ (!subprotocol.isEmpty() ? ", subprotocol=" + subprotocol : "");
}
@ -476,7 +481,9 @@ final class WebSocketImpl implements WebSocket {
int code = ((FailWebSocketException) error).getStatusCode();
enqueueClose(new Close(code, ""))
.whenComplete((r, e) -> {
ex.addSuppressed(e);
if (e != null) {
ex.addSuppressed(e);
}
try {
channel.close();
} catch (IOException e1) {

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -26,5 +26,6 @@
* @bug 8151299 8164704
* @modules jdk.incubator.httpclient
* @run testng jdk.incubator.httpclient/jdk.incubator.http.SelectorTest
* @run testng jdk.incubator.httpclient/jdk.incubator.http.RawChannelTest
* @run testng jdk.incubator.httpclient/jdk.incubator.http.ResponseHeadersTest
*/

View File

@ -0,0 +1,287 @@
/*
* Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.incubator.http;
import jdk.incubator.http.internal.websocket.RawChannel;
import org.testng.annotations.Test;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static jdk.incubator.http.HttpResponse.BodyHandler.discard;
import static org.testng.Assert.assertEquals;
/*
* This test exercises mechanics of _independent_ reads and writes on the
* RawChannel. It verifies that the underlying implementation can manage more
* than a single type of notifications at the same time.
*/
public class RawChannelTest {
private final AtomicLong clientWritten = new AtomicLong();
private final AtomicLong serverWritten = new AtomicLong();
private final AtomicLong clientRead = new AtomicLong();
private final AtomicLong serverRead = new AtomicLong();
/*
* Since at this level we don't have any control over the low level socket
* parameters, this latch ensures a write to the channel will stall at least
* once (socket's send buffer filled up).
*/
private final CountDownLatch writeStall = new CountDownLatch(1);
/*
* This one works similarly by providing means to ensure a read from the
* channel will stall at least once (no more data available on the socket).
*/
private final CountDownLatch readStall = new CountDownLatch(1);
private final AtomicInteger writeHandles = new AtomicInteger();
private final AtomicInteger readHandles = new AtomicInteger();
private final CountDownLatch exit = new CountDownLatch(1);
@Test
public void test() throws Exception {
try (ServerSocket server = new ServerSocket(0)) {
int port = server.getLocalPort();
new TestServer(server).start();
final RawChannel chan = channelOf(port);
// It's very important not to forget the initial bytes, possibly
// left from the HTTP thingy
int initialBytes = chan.initialByteBuffer().remaining();
print("RawChannel has %s initial bytes", initialBytes);
clientRead.addAndGet(initialBytes);
chan.registerEvent(new RawChannel.RawEvent() {
private final ByteBuffer reusableBuffer = ByteBuffer.allocate(32768);
@Override
public int interestOps() {
return SelectionKey.OP_WRITE;
}
@Override
public void handle() {
int i = writeHandles.incrementAndGet();
print("OP_WRITE #%s", i);
if (i > 3) { // Fill up the send buffer not more than 3 times
try {
chan.shutdownOutput();
} catch (IOException e) {
e.printStackTrace();
}
return;
}
long total = 0;
try {
long n;
do {
ByteBuffer[] array = {reusableBuffer.slice()};
n = chan.write(array, 0, 1);
total += n;
} while (n > 0);
print("OP_WRITE clogged SNDBUF with %s bytes", total);
clientWritten.addAndGet(total);
chan.registerEvent(this);
writeStall.countDown(); // signal send buffer is full
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
});
chan.registerEvent(new RawChannel.RawEvent() {
@Override
public int interestOps() {
return SelectionKey.OP_READ;
}
@Override
public void handle() {
int i = readHandles.incrementAndGet();
print("OP_READ #%s", i);
ByteBuffer read = null;
long total = 0;
while (true) {
try {
read = chan.read();
} catch (IOException e) {
e.printStackTrace();
}
if (read == null) {
print("OP_READ EOF");
break;
} else if (!read.hasRemaining()) {
print("OP_READ stall");
try {
chan.registerEvent(this);
} catch (IOException e) {
e.printStackTrace();
}
readStall.countDown();
break;
}
int r = read.remaining();
total += r;
clientRead.addAndGet(r);
}
print("OP_READ read %s bytes", total);
}
});
exit.await(); // All done, we need to compare results:
assertEquals(clientRead.get(), serverWritten.get());
assertEquals(serverRead.get(), clientWritten.get());
}
}
private static RawChannel channelOf(int port) throws Exception {
URI uri = URI.create("http://127.0.0.1:" + port + "/");
print("raw channel to %s", uri.toString());
HttpRequest req = HttpRequest.newBuilder(uri).build();
HttpResponse<?> r = HttpClient.newHttpClient().send(req, discard(null));
r.body();
return ((HttpResponseImpl) r).rawChannel();
}
private class TestServer extends Thread { // Powered by Slowpokes
private final ServerSocket server;
TestServer(ServerSocket server) throws IOException {
this.server = server;
}
@Override
public void run() {
try (Socket s = server.accept()) {
InputStream is = s.getInputStream();
OutputStream os = s.getOutputStream();
processHttp(is, os);
Thread reader = new Thread(() -> {
try {
long n = readSlowly(is);
print("Server read %s bytes", n);
serverRead.addAndGet(n);
s.shutdownInput();
} catch (Exception e) {
e.printStackTrace();
}
});
Thread writer = new Thread(() -> {
try {
long n = writeSlowly(os);
print("Server written %s bytes", n);
serverWritten.addAndGet(n);
s.shutdownOutput();
} catch (Exception e) {
e.printStackTrace();
}
});
reader.start();
writer.start();
reader.join();
writer.join();
} catch (Exception e) {
e.printStackTrace();
} finally {
exit.countDown();
}
}
private void processHttp(InputStream is, OutputStream os)
throws IOException
{
os.write("HTTP/1.1 200 OK\r\nContent-length: 0\r\n\r\n".getBytes());
byte[] buf = new byte[1024];
String s = "";
while (true) {
int n = is.read(buf);
if (n <= 0) {
throw new RuntimeException("Unexpected end of request");
}
s = s + new String(buf, 0, n);
if (s.contains("\r\n\r\n")) {
break;
}
}
}
private long writeSlowly(OutputStream os) throws Exception {
byte[] first = byteArrayOfSize(1024);
long total = first.length;
os.write(first);
// Let's wait for the signal from the raw channel that its read has
// stalled, and then continue sending a bit more stuff
readStall.await();
for (int i = 0; i < 32; i++) {
byte[] b = byteArrayOfSize(1024);
os.write(b);
total += b.length;
TimeUnit.MILLISECONDS.sleep(1);
}
return total;
}
private long readSlowly(InputStream is) throws Exception {
// Wait for the raw channel to fill up the its send buffer
writeStall.await();
long overall = 0;
byte[] array = new byte[1024];
for (int n = 0; n != -1; n = is.read(array)) {
TimeUnit.MILLISECONDS.sleep(1);
overall += n;
}
return overall;
}
}
private static void print(String format, Object... args) {
System.out.println(Thread.currentThread() + ": " + String.format(format, args));
}
private static byte[] byteArrayOfSize(int bound) {
return new byte[new Random().nextInt(1 + bound)];
}
}