From c74eb4302e985e1e88d7918e2ecea6477a25971c Mon Sep 17 00:00:00 2001 From: Pavel Rappo Date: Wed, 10 May 2017 12:36:14 +0100 Subject: [PATCH] 8179021: Latest bugfixes to WebSocket/HPACK from the sandbox repo Reviewed-by: dfuchs --- .../java.base/share/classes/module-info.java | 3 +- .../jdk/incubator/http/HttpRequestImpl.java | 4 +- .../http/PlainTunnelingConnection.java | 6 +- .../classes/jdk/incubator/http/WebSocket.java | 95 +----- .../http/internal/hpack/Decoder.java | 5 +- .../http/internal/hpack/HeaderTable.java | 7 +- .../http/internal/websocket/BuilderImpl.java | 22 +- .../websocket/CooperativeHandler.java | 196 +++++++++--- .../http/internal/websocket/Frame.java | 5 +- .../internal/websocket/FrameConsumer.java | 22 +- .../internal/websocket/OpeningHandshake.java | 22 +- .../internal/websocket/OutgoingMessage.java | 15 +- .../http/internal/websocket/Receiver.java | 74 ++--- .../http/internal/websocket/StatusCodes.java | 136 +++------ .../http/internal/websocket/Transmitter.java | 19 +- .../internal/websocket/WebSocketImpl.java | 103 ++++--- .../java/net/httpclient/whitebox/Driver.java | 3 +- .../jdk/incubator/http/RawChannelTest.java | 287 ++++++++++++++++++ 18 files changed, 656 insertions(+), 368 deletions(-) create mode 100644 jdk/test/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/RawChannelTest.java diff --git a/jdk/src/java.base/share/classes/module-info.java b/jdk/src/java.base/share/classes/module-info.java index 838f215d888..e637bc34d7d 100644 --- a/jdk/src/java.base/share/classes/module-info.java +++ b/jdk/src/java.base/share/classes/module-info.java @@ -190,7 +190,8 @@ module java.base { jdk.unsupported; exports jdk.internal.vm.annotation to jdk.unsupported, - jdk.internal.vm.ci; + jdk.internal.vm.ci, + jdk.incubator.httpclient; exports jdk.internal.util.jar to jdk.jartool, jdk.jdeps, diff --git a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpRequestImpl.java b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpRequestImpl.java index dcf1de7125a..179cb2d7ad2 100644 --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpRequestImpl.java +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpRequestImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -123,7 +123,7 @@ class HttpRequestImpl extends HttpRequest implements WebSocketRequest { this.method = method; this.systemHeaders = new HttpHeadersImpl(); this.userHeaders = ImmutableHeaders.empty(); - this.uri = null; + this.uri = URI.create("socket://" + authority.getHostString() + ":" + Integer.toString(authority.getPort()) + "/"); this.requestProcessor = HttpRequest.noBody(); this.authority = authority; this.secure = false; diff --git a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java index 6432804bd9f..10549315a3d 100644 --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -27,6 +27,7 @@ package jdk.incubator.http; import jdk.incubator.http.internal.common.ByteBufferReference; import jdk.incubator.http.internal.common.MinimalFuture; +import jdk.incubator.http.HttpResponse.BodyHandler; import java.io.IOException; import java.net.InetSocketAddress; @@ -72,7 +73,8 @@ class PlainTunnelingConnection extends HttpConnection { public void connect() throws IOException, InterruptedException { delegate.connect(); HttpRequestImpl req = new HttpRequestImpl("CONNECT", client, address); - Exchange connectExchange = new Exchange<>(req, null); + MultiExchange mul = new MultiExchange<>(req, client, BodyHandler.discard(null)); + Exchange connectExchange = new Exchange<>(req, mul); Response r = connectExchange.responseImpl(delegate); if (r.statusCode() != 200) { throw new IOException("Tunnel failed"); diff --git a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/WebSocket.java b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/WebSocket.java index f657655f408..2433e2499e7 100644 --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/WebSocket.java +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/WebSocket.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -43,7 +43,7 @@ import java.util.concurrent.CompletionStage; *

To create a {@code WebSocket} use a {@linkplain HttpClient#newWebSocketBuilder( * URI, Listener) builder}. Once a {@code WebSocket} is built, it's ready * to send and receive messages. When the {@code WebSocket} is no longer needed - * it must be closed: a Close message must both be {@linkplain #sendClose() + * it must be closed: a Close message must both be {@linkplain #sendClose * sent} and {@linkplain Listener#onClose(WebSocket, int, String) received}. * The {@code WebSocket} may be also closed {@linkplain #abort() abruptly}. * @@ -93,17 +93,6 @@ public interface WebSocket { */ int NORMAL_CLOSURE = 1000; - /** - * The WebSocket Close message status code ({@value}), is - * designated for use in applications expecting a status code to indicate - * that the connection was closed abnormally, e.g., without sending or - * receiving a Close message. - * - * @see Listener#onClose(WebSocket, int, String) - * @see #abort() - */ - int CLOSED_ABNORMALLY = 1006; - /** * A builder for creating {@code WebSocket} instances. * {@Incubating} @@ -509,7 +498,7 @@ public interface WebSocket { * *

The {@code WebSocket} will close at the earliest of completion of * the returned {@code CompletionStage} or sending a Close message. In - * particular, if a Close message has been {@link WebSocket#sendClose() + * particular, if a Close message has been {@linkplain WebSocket#sendClose * sent} before, then this invocation completes the closing handshake * and by the time this method is invoked, the {@code WebSocket} will * have been closed. @@ -642,44 +631,6 @@ public interface WebSocket { */ CompletableFuture sendText(CharSequence message, boolean isLast); - /** - * Sends a whole Text message with characters from the given {@code - * CharSequence}. - * - *

This is a convenience method. For the general case, use {@link - * #sendText(CharSequence, boolean)}. - * - *

Returns a {@code CompletableFuture} which completes - * normally when the message has been sent or completes exceptionally if an - * error occurs. - * - *

The {@code CharSequence} must not be modified until the returned - * {@code CompletableFuture} completes (either normally or exceptionally). - * - *

The returned {@code CompletableFuture} can complete exceptionally - * with: - *

    - *
  • {@link IllegalArgumentException} - - * if {@code message} is a malformed UTF-16 sequence - *
  • {@link IllegalStateException} - - * if the {@code WebSocket} is closed; - * or if a Close message has been sent; - * or if there is an outstanding send operation; - * or if a previous Binary message was sent with {@code isLast == false} - *
  • {@link IOException} - - * if an I/O error occurs during this operation; - * or if the {@code WebSocket} has been closed due to an error; - *
- * - * @param message - * the message - * - * @return a {@code CompletableFuture} with this {@code WebSocket} - */ - default CompletableFuture sendText(CharSequence message) { - return sendText(message, true); - } - /** * Sends a Binary message with bytes from the given {@code ByteBuffer}. * @@ -831,46 +782,9 @@ public interface WebSocket { * the reason * * @return a {@code CompletableFuture} with this {@code WebSocket} - * - * @see #sendClose() */ CompletableFuture sendClose(int statusCode, String reason); - /** - * Sends an empty Close message. - * - *

When this method has been invoked, no further messages can be sent. - * - *

For more details on Close message see RFC 6455 section - * 5.5.1. Close - * - *

The method returns a {@code CompletableFuture} which - * completes normally when the message has been sent or completes - * exceptionally if an error occurs. - * - *

The returned {@code CompletableFuture} can complete exceptionally - * with: - *

    - *
  • {@link IOException} - - * if an I/O error occurs during this operation; - * or the {@code WebSocket} has been closed due to an error - *
- * - *

If this method has already been invoked or the {@code WebSocket} is - * closed, then subsequent invocations of this method have no effect and the - * returned {@code CompletableFuture} completes normally. - * - *

If a Close message has been {@linkplain Listener#onClose(WebSocket, - * int, String) received} before, then this invocation completes the closing - * handshake and by the time the returned {@code CompletableFuture} - * completes, the {@code WebSocket} will have been closed. - * - * @return a {@code CompletableFuture} with this {@code WebSocket} - * - * @see #sendClose(int, String) - */ - CompletableFuture sendClose(); - /** * Allows {@code n} more messages to be received by the {@link Listener * Listener}. @@ -928,8 +842,7 @@ public interface WebSocket { * state. * *

As the result {@link Listener#onClose(WebSocket, int, String) - * Listener.onClose} will be invoked with the status code {@link - * #CLOSED_ABNORMALLY} unless either {@code onClose} or {@link + * Listener.onClose} will be invoked unless either {@code onClose} or {@link * Listener#onError(WebSocket, Throwable) onError} has been invoked before. * In which case no additional invocation will happen. * diff --git a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/hpack/Decoder.java b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/hpack/Decoder.java index 179bfc616d7..9841204e50c 100644 --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/hpack/Decoder.java +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/hpack/Decoder.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014, 2016, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2014, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -24,6 +24,8 @@ */ package jdk.incubator.http.internal.hpack; +import jdk.internal.vm.annotation.Stable; + import java.io.IOException; import java.io.UncheckedIOException; import java.net.ProtocolException; @@ -60,6 +62,7 @@ import static java.util.Objects.requireNonNull; */ public final class Decoder { + @Stable private static final State[] states = new State[256]; static { diff --git a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/hpack/HeaderTable.java b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/hpack/HeaderTable.java index 508c214b958..970ad42b54d 100644 --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/hpack/HeaderTable.java +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/hpack/HeaderTable.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014, 2016, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2014, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -24,6 +24,8 @@ */ package jdk.incubator.http.internal.hpack; +import jdk.internal.vm.annotation.Stable; + import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; @@ -40,6 +42,7 @@ import static java.lang.String.format; // final class HeaderTable { + @Stable private static final HeaderField[] staticTable = { null, // To make index 1-based, instead of 0-based new HeaderField(":authority"), @@ -110,7 +113,7 @@ final class HeaderTable { private static final Map> staticIndexes; static { - staticIndexes = new HashMap<>(STATIC_TABLE_LENGTH); + staticIndexes = new HashMap<>(STATIC_TABLE_LENGTH); // TODO: Map.of for (int i = 1; i <= STATIC_TABLE_LENGTH; i++) { HeaderField f = staticTable[i]; Map values = staticIndexes diff --git a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/BuilderImpl.java b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/BuilderImpl.java index 15abf5e76bd..7f662922bc0 100644 --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/BuilderImpl.java +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/BuilderImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -46,7 +46,7 @@ public final class BuilderImpl implements Builder { private final HttpClient client; private final URI uri; private final Listener listener; - private final List> headers = new LinkedList<>(); + private final Collection> headers = new LinkedList<>(); private final Collection subprotocols = new LinkedList<>(); private Duration timeout; @@ -65,17 +65,18 @@ public final class BuilderImpl implements Builder { } @Override - public Builder subprotocols(String mostPreferred, String... lesserPreferred) + public Builder subprotocols(String mostPreferred, + String... lesserPreferred) { requireNonNull(mostPreferred, "mostPreferred"); requireNonNull(lesserPreferred, "lesserPreferred"); List subprotocols = new LinkedList<>(); + subprotocols.add(mostPreferred); for (int i = 0; i < lesserPreferred.length; i++) { String p = lesserPreferred[i]; requireNonNull(p, "lesserPreferred[" + i + "]"); subprotocols.add(p); } - subprotocols.add(0, mostPreferred); this.subprotocols.clear(); this.subprotocols.addAll(subprotocols); return this; @@ -98,20 +99,9 @@ public final class BuilderImpl implements Builder { Listener getListener() { return listener; } - List> getHeaders() { return headers; } + Collection> getHeaders() { return headers; } Collection getSubprotocols() { return subprotocols; } Duration getConnectTimeout() { return timeout; } - - @Override - public String toString() { - return "WebSocket.Builder{" - + ", uri=" + uri - + ", listener=" + listener - + (!headers.isEmpty() ? ", headers=" + headers : "") - + (!subprotocols.isEmpty() ? ", subprotocols=" + subprotocols : "") - + ( timeout != null ? ", connectTimeout=" + timeout : "") - + '}'; - } } diff --git a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/CooperativeHandler.java b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/CooperativeHandler.java index 5e36a487a12..1ccec2d4e8a 100644 --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/CooperativeHandler.java +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/CooperativeHandler.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -25,70 +25,184 @@ package jdk.incubator.http.internal.websocket; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import static java.util.Objects.requireNonNull; -final class CooperativeHandler { +/* + * A synchronization aid that assists a number of parties in running a task + * in a mutually exclusive fashion. + * + * To run the task, a party invokes `handle`. To permanently prevent the task + * from subsequent runs, the party invokes `stop`. + * + * The parties do not have to operate in different threads. + * + * The task can be either synchronous or asynchronous. + * + * If the task is synchronous, it is represented with `Runnable`. + * The handler invokes `Runnable.run` to run the task. + * + * If the task is asynchronous, it is represented with `Consumer`. + * The handler invokes `Consumer.accept(end)` to begin the task. The task + * invokes `end.run()` when it has ended. + * + * The next run of the task will not begin until the previous run has finished. + * + * The task may invoke `handle()` by itself, it's a normal situation. + */ +public final class CooperativeHandler { - private static final long CONTINUE = 0; - private static final long OFF = 1; - private static final long ON = 2; - private static final long STOP = 4; + /* + Since the task is fixed and known beforehand, no blocking synchronization + (locks, queues, etc.) is required. The job can be done solely using + nonblocking primitives. - private final AtomicLong state = new AtomicLong(OFF); + The machinery below addresses two problems: - private final Runnable task; + 1. Running the task in a sequential order (no concurrent runs): - CooperativeHandler(Runnable task) { - this.task = requireNonNull(task); + begin, end, begin, end... + + 2. Avoiding indefinite recursion: + + begin + end + begin + end + ... + + Problem #1 is solved with a finite state machine with 4 states: + + BEGIN, AGAIN, END, and STOP. + + Problem #2 is solved with a "state modifier" OFFLOAD. + + Parties invoke `handle()` to signal the task must run. A party that has + invoked `handle()` either begins the task or exploits the party that is + either beginning the task or ending it. + + The party that is trying to end the task either ends it or begins it + again. + + To avoid indefinite recursion, before re-running the task tryEnd() sets + OFFLOAD bit, signalling to its "child" tryEnd() that this ("parent") + tryEnd() is available and the "child" must offload the task on to the + "parent". Then a race begins. Whichever invocation of tryEnd() manages + to unset OFFLOAD bit first does not do the work. + + There is at most 1 thread that is beginning the task and at most 2 + threads that are trying to end it: "parent" and "child". In case of a + synchronous task "parent" and "child" are the same thread. + */ + + private static final int OFFLOAD = 1; + private static final int AGAIN = 2; + private static final int BEGIN = 4; + private static final int STOP = 8; + private static final int END = 16; + + private final AtomicInteger state = new AtomicInteger(END); + private final Consumer begin; + + public CooperativeHandler(Runnable task) { + this(asyncOf(task)); + } + + public CooperativeHandler(Consumer begin) { + this.begin = requireNonNull(begin); } /* - * Causes the task supplied to the constructor to run. The task may be run - * by this thread as well as by any other that has invoked this method. + * Runs the task (though maybe by a different party). * * The recursion which is possible here will have the maximum depth of 1: * - * task.run() - * this.startOrContinue() - * task.run() + * this.handle() + * begin.accept() + * this.handle() */ - void startOrContinue() { - long s; + public void handle() { while (true) { - s = state.get(); - if (s == OFF && state.compareAndSet(OFF, ON)) { - // No one is running the task, we are going to run it - break; - } - if (s == ON && state.compareAndSet(ON, CONTINUE)) { - // Some other thread is running the task. We have managed to - // update the state, it will be surely noticed by that thread. - return; - } - if (s == CONTINUE || s == STOP) { + int s = state.get(); + if (s == END) { + if (state.compareAndSet(END, BEGIN)) { + break; + } + } else if ((s & BEGIN) != 0) { + // Tries to change the state to AGAIN, preserving OFFLOAD bit + if (state.compareAndSet(s, AGAIN | (s & OFFLOAD))) { + return; + } + } else if ((s & AGAIN) != 0 || s == STOP) { return; + } else { + throw new InternalError(String.valueOf(s)); } } + begin.accept(this::tryEnd); + } + + private void tryEnd() { while (true) { - task.run(); - // State checks are ordered by the probability of expected values - // (it might be different in different usage patterns, say, when - // invocations to `startOrContinue()` are concurrent) - if (state.compareAndSet(ON, OFF)) { - break; // The state hasn't changed, all done + int s; + while (((s = state.get()) & OFFLOAD) != 0) { + // Tries to offload ending of the task to the parent + if (state.compareAndSet(s, s & ~OFFLOAD)) { + return; + } } - if (state.compareAndSet(CONTINUE, ON)) { - continue; + while (true) { + if (s == BEGIN) { + if (state.compareAndSet(BEGIN, END)) { + return; + } + } else if (s == AGAIN) { + if (state.compareAndSet(AGAIN, BEGIN | OFFLOAD)) { + break; + } + } else if (s == STOP) { + return; + } else { + throw new InternalError(String.valueOf(s)); + } + s = state.get(); } - // Other threads can change the state from CONTINUE to STOP only - // So if it's not ON and not CONTINUE, it can only be STOP - break; + begin.accept(this::tryEnd); } } - void stop() { + /* + * Checks whether or not this handler has been permanently stopped. + * + * Should be used from inside the task to poll the status of the handler, + * pretty much the same way as it is done for threads: + * + * if (!Thread.currentThread().isInterrupted()) { + * ... + * } + */ + public boolean isStopped() { + return state.get() == STOP; + } + + /* + * Signals this handler to ignore subsequent invocations to `handle()`. + * + * If the task has already begun, this invocation will not affect it, + * unless the task itself uses `isStopped()` method to check the state + * of the handler. + */ + public void stop() { state.set(STOP); } + + private static Consumer asyncOf(Runnable task) { + requireNonNull(task); + return ender -> { + task.run(); + ender.run(); + }; + } } diff --git a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Frame.java b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Frame.java index 813693aff8f..47a0ff21bde 100644 --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Frame.java +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Frame.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -25,6 +25,8 @@ package jdk.incubator.http.internal.websocket; +import jdk.internal.vm.annotation.Stable; + import java.nio.ByteBuffer; import static jdk.incubator.http.internal.common.Utils.dump; @@ -58,6 +60,7 @@ final class Frame { CONTROL_0xE (0xE), CONTROL_0xF (0xF); + @Stable private static final Opcode[] opcodes; static { diff --git a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/FrameConsumer.java b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/FrameConsumer.java index d6b32d704ee..1fe63664499 100644 --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/FrameConsumer.java +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/FrameConsumer.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -38,7 +38,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; import static jdk.incubator.http.internal.common.Utils.dump; import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE; -import static jdk.incubator.http.internal.websocket.StatusCodes.checkIncomingCode; +import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToReceiveFromServer; /* * Consumes frame parts and notifies a message consumer, when there is @@ -212,20 +212,20 @@ class FrameConsumer implements Frame.Consumer { } switch (opcode) { case CLOSE: - int statusCode = NO_STATUS_CODE; + char statusCode = NO_STATUS_CODE; String reason = ""; if (payloadLen != 0) { int len = binaryData.remaining(); assert 2 <= len && len <= 125 : dump(len, payloadLen); - try { - statusCode = checkIncomingCode(binaryData.getChar()); - reason = UTF_8.newDecoder().decode(binaryData).toString(); - } catch (CheckFailedException e) { - throw new FailWebSocketException("Incorrect status code") - .initCause(e); - } catch (CharacterCodingException e) { + statusCode = binaryData.getChar(); + if (!isLegalToReceiveFromServer(statusCode)) { throw new FailWebSocketException( - "Close reason is a malformed UTF-8 sequence") + "Illegal status code: " + statusCode); + } + try { + reason = UTF_8.newDecoder().decode(binaryData).toString(); + } catch (CharacterCodingException e) { + throw new FailWebSocketException("Illegal close reason") .initCause(e); } } diff --git a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java index 460b3b2b011..4af7771d8c2 100644 --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -37,6 +37,8 @@ import jdk.incubator.http.HttpRequest; import jdk.incubator.http.HttpResponse; import jdk.incubator.http.HttpResponse.BodyHandler; import jdk.incubator.http.WebSocketHandshakeException; +import jdk.incubator.http.internal.common.Pair; + import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; @@ -66,7 +68,6 @@ final class OpeningHandshake { private static final String HEADER_KEY = "Sec-WebSocket-Key"; private static final String HEADER_PROTOCOL = "Sec-WebSocket-Protocol"; private static final String HEADER_VERSION = "Sec-WebSocket-Version"; - private static final String VALUE_VERSION = "13"; // WebSocket's lucky number private static final Set FORBIDDEN_HEADERS; @@ -106,12 +107,18 @@ final class OpeningHandshake { if (connectTimeout != null) { requestBuilder.timeout(connectTimeout); } + for (Pair p : b.getHeaders()) { + if (FORBIDDEN_HEADERS.contains(p.first)) { + throw illegal("Illegal header: " + p.first); + } + requestBuilder.header(p.first, p.second); + } this.subprotocols = createRequestSubprotocols(b.getSubprotocols()); if (!this.subprotocols.isEmpty()) { String p = this.subprotocols.stream().collect(Collectors.joining(", ")); requestBuilder.header(HEADER_PROTOCOL, p); } - requestBuilder.header(HEADER_VERSION, VALUE_VERSION); + requestBuilder.header(HEADER_VERSION, "13"); // WebSocket's lucky number this.nonce = createNonce(); requestBuilder.header(HEADER_KEY, this.nonce); // Setting request version to HTTP/1.1 forcibly, since it's not possible @@ -133,11 +140,7 @@ final class OpeningHandshake { if (s.trim().isEmpty() || !isValidName(s)) { throw illegal("Bad subprotocol syntax: " + s); } - if (FORBIDDEN_HEADERS.contains(s)) { - throw illegal("Forbidden header: " + s); - } - boolean unique = sp.add(s); - if (!unique) { + if (!sp.add(s)) { throw illegal("Duplicating subprotocol: " + s); } } @@ -176,7 +179,7 @@ final class OpeningHandshake { CompletableFuture send() { return client.sendAsync(this.request, BodyHandler.discard(null)) - .thenCompose(this::resultFrom); + .thenCompose(this::resultFrom); } /* @@ -283,7 +286,6 @@ final class OpeningHandshake { private static String requireSingle(HttpHeaders responseHeaders, String headerName) - throws CheckFailedException { List values = responseHeaders.allValues(headerName); if (values.isEmpty()) { diff --git a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OutgoingMessage.java b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OutgoingMessage.java index 76d7c8db55f..c1dc19f756e 100644 --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OutgoingMessage.java +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OutgoingMessage.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -60,6 +60,7 @@ import static jdk.incubator.http.internal.websocket.Frame.Opcode.TEXT; */ abstract class OutgoingMessage { + // Share per WebSocket? private static final SecureRandom maskingKeys = new SecureRandom(); protected ByteBuffer[] frame; @@ -71,6 +72,8 @@ abstract class OutgoingMessage { * convenient moment (up to the point where sentTo is invoked). */ protected void contextualize(Context context) { + // masking and charset decoding should be performed here rather than in + // the constructor (as of today) if (context.isCloseSent()) { throw new IllegalStateException("Close sent"); } @@ -101,7 +104,7 @@ abstract class OutgoingMessage { private final boolean isLast; Text(CharSequence characters, boolean isLast) { - CharsetEncoder encoder = UTF_8.newEncoder(); + CharsetEncoder encoder = UTF_8.newEncoder(); // Share per WebSocket? try { payload = encoder.encode(CharBuffer.wrap(characters)); } catch (CharacterCodingException e) { @@ -172,11 +175,11 @@ abstract class OutgoingMessage { Close(int statusCode, CharSequence reason) { ByteBuffer payload = ByteBuffer.allocate(125) - .putChar((char) statusCode); + .putChar((char) statusCode); CoderResult result = UTF_8.newEncoder() - .encode(CharBuffer.wrap(reason), - payload, - true); + .encode(CharBuffer.wrap(reason), + payload, + true); if (result.isOverflow()) { throw new IllegalArgumentException("Long reason"); } else if (result.isError()) { diff --git a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java index a2a0ea59aab..4168da9b52b 100644 --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -58,8 +58,8 @@ final class Receiver { private final Frame.Reader reader = new Frame.Reader(); private final RawChannel.RawEvent event = createHandler(); private final AtomicLong demand = new AtomicLong(); - private final CooperativeHandler receiveHandler = - new CooperativeHandler(this::tryDeliver); + private final CooperativeHandler handler = + new CooperativeHandler(this::pushContinuously); /* * Used to ensure registering the channel event at most once (i.e. to avoid * multiple registrations). @@ -72,8 +72,8 @@ final class Receiver { this.channel = channel; this.data = channel.initialByteBuffer(); this.frameConsumer = new FrameConsumer(this.messageConsumer); - // To ensure the initial `data` will be read correctly (happens-before) - // after readable.get() + // To ensure the initial non-final `data` will be read correctly + // (happens-before) by reader after executing readable.get() readable.set(true); } @@ -88,7 +88,7 @@ final class Receiver { @Override public void handle() { readable.set(true); - receiveHandler.startOrContinue(); + handler.handle(); } }; } @@ -98,7 +98,7 @@ final class Receiver { throw new IllegalArgumentException("Negative: " + n); } demand.accumulateAndGet(n, (p, i) -> p + i < 0 ? Long.MAX_VALUE : p + i); - receiveHandler.startOrContinue(); + handler.handle(); } void acknowledge() { @@ -113,41 +113,21 @@ final class Receiver { * regardless of the current demand. */ void close() { - receiveHandler.stop(); + handler.stop(); } - private void tryDeliver() { - if (readable.get() && demand.get() > 0) { - deliverAtMostOne(); + private void pushContinuously() { + while (readable.get() && demand.get() > 0 && !handler.isStopped()) { + pushOnce(); } } - private void deliverAtMostOne() { - if (data == null) { - try { - data = channel.read(); - } catch (IOException e) { - readable.set(false); - messageConsumer.onError(e); - return; - } - if (data == null || !data.hasRemaining()) { - readable.set(false); - if (!data.hasRemaining()) { - try { - channel.registerEvent(event); - } catch (IOException e) { - messageConsumer.onError(e); - return; - } - } else if (data == null) { - messageConsumer.onComplete(); - } - return; - } + private void pushOnce() { + if (data == null && !readData()) { + return; } try { - reader.readFrame(data, frameConsumer); + reader.readFrame(data, frameConsumer); // Pushing frame parts to the consumer } catch (FailWebSocketException e) { messageConsumer.onError(e); return; @@ -156,4 +136,28 @@ final class Receiver { data = null; } } + + private boolean readData() { + try { + data = channel.read(); + } catch (IOException e) { + messageConsumer.onError(e); + return false; + } + if (data == null) { // EOF + messageConsumer.onComplete(); + return false; + } else if (!data.hasRemaining()) { // No data in the socket at the moment + data = null; + readable.set(false); + try { + channel.registerEvent(event); + } catch (IOException e) { + messageConsumer.onError(e); + } + return false; + } + assert data.hasRemaining(); + return true; + } } diff --git a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/StatusCodes.java b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/StatusCodes.java index 17ac991a66d..f262005ec69 100644 --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/StatusCodes.java +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/StatusCodes.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -25,120 +25,72 @@ package jdk.incubator.http.internal.websocket; -import static jdk.incubator.http.WebSocket.CLOSED_ABNORMALLY; - /* - * Utilities and common constants for WebSocket status codes. For more details - * on status codes and their meaning see: + * Utilities for WebSocket status codes. * * 1. https://tools.ietf.org/html/rfc6455#section-7.4 * 2. http://www.iana.org/assignments/websocket/websocket.xhtml#close-code-number */ final class StatusCodes { - static final int PROTOCOL_ERROR = 1002; - static final int CANNOT_ACCEPT = 1003; - static final int NO_STATUS_CODE = 1005; - static final int NOT_CONSISTENT = 1007; - static final int TOO_BIG = 1009; - static final int NO_EXTENSION = 1010; - static final int SERVICE_RESTART = 1012; - static final int TRY_AGAIN_LATER = 1013; - static final int TLS_HANDSHAKE_FAILURE = 1015; + static final int PROTOCOL_ERROR = 1002; + static final int NO_STATUS_CODE = 1005; + static final int CLOSED_ABNORMALLY = 1006; + static final int NOT_CONSISTENT = 1007; private StatusCodes() { } - /* - * Returns the given code if it doesn't violate any rules for outgoing - * codes, otherwise throws a CFE with a detailed description. - */ - static int checkOutgoingCode(int code) { - checkCommon(code); + static boolean isLegalToSendFromClient(int code) { + if (!isLegal(code)) { + return false; + } + // Codes from unreserved range if (code > 4999) { - throw new CheckFailedException("Unspecified: " + code); + return false; } - if (isNotUserSettable(code)) { - throw new CheckFailedException("Cannot set: " + code); - } - return code; - } - - /* - * Returns the given code if it doesn't violate any rules for incoming - * codes, otherwise throws a CFE with a detailed description. - */ - static int checkIncomingCode(int code) { - checkCommon(code); - if (code == NO_EXTENSION) { - throw new CheckFailedException("Bad server code: " + code); - } - return code; - } - - private static int checkCommon(int code) { - if (isOutOfRange(code)) { - throw new CheckFailedException("Out of range: " + code); - } - if (isForbidden(code)) { - throw new CheckFailedException("Forbidden: " + code); - } - if (isUnassigned(code)) { - throw new CheckFailedException("Unassigned: " + code); - } - return code; - } - - /* - * Returns true if the given code cannot be set by a user of the WebSocket - * API. e.g. this code means something which only a WebSocket implementation - * is responsible for or it doesn't make sense to be send by a WebSocket - * client. - */ - private static boolean isNotUserSettable(int code) { + // Codes below are not allowed to be sent using a WebSocket client API switch (code) { case PROTOCOL_ERROR: - case CANNOT_ACCEPT: case NOT_CONSISTENT: - case TOO_BIG: - case NO_EXTENSION: - case TRY_AGAIN_LATER: - case SERVICE_RESTART: - return true; - default: + case 1003: + case 1009: + case 1010: + case 1012: // code sent by servers + case 1013: // code sent by servers + case 1014: // code sent by servers return false; + default: + return true; } } - /* - * Returns true if the given code cannot appear on the wire. It's always an - * error to send a frame with such a code or to receive one. - */ - private static boolean isForbidden(int code) { + static boolean isLegalToReceiveFromServer(int code) { + if (!isLegal(code)) { + return false; + } + return code != 1010; // code sent by clients + } + + private static boolean isLegal(int code) { + // 2-byte unsigned integer excluding first 1000 numbers from the range + // [0, 999] which are never used + if (code < 1000 || code > 65535) { + return false; + } + // Codes from the range below has no known meaning under the WebSocket + // specification (i.e. unassigned/undefined) + if ((code >= 1016 && code <= 2999) || code == 1004) { + return false; + } + // Codes below cannot appear on the wire. It's always an error either + // to send a frame with such a code or to receive one. switch (code) { case NO_STATUS_CODE: case CLOSED_ABNORMALLY: - case TLS_HANDSHAKE_FAILURE: - return true; - default: + case 1015: return false; + default: + return true; } } - - /* - * Returns true if the given code has no known meaning under the WebSocket - * specification (i.e. unassigned/undefined). - */ - private static boolean isUnassigned(int code) { - return (code >= 1016 && code <= 2999) || code == 1004 || code == 1014; - } - - /* - * Returns true if the given code is not in domain of status codes: - * - * 2-byte unsigned integer minus first 1000 numbers from the range [0, 999] - * that are never used. - */ - private static boolean isOutOfRange(int code) { - return code < 1000 || code > 65535; - } } diff --git a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transmitter.java b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transmitter.java index 083283bc5f3..4991515cbb6 100644 --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transmitter.java +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transmitter.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -37,11 +37,12 @@ import static java.util.Objects.requireNonNull; * * No matter whether the message has been fully sent or an error has occurred, * the transmitter reports the outcome to the supplied handler and becomes ready - * to accept a new message. Until then, it is considered "busy" and an - * IllegalStateException will be thrown on each attempt to invoke send. + * to accept a new message. Until then, the transmitter is considered "busy" and + * an IllegalStateException will be thrown on each attempt to invoke send. */ final class Transmitter { + /* This flag is used solely for assertions */ private final AtomicBoolean busy = new AtomicBoolean(); private OutgoingMessage message; private Consumer completionHandler; @@ -53,9 +54,10 @@ final class Transmitter { this.event = createHandler(); } - /* - * The supplied handler may be invoked in the calling thread, so watch out - * for stack overflow. + /** + * The supplied handler may be invoked in the calling thread. + * A {@code StackOverflowError} may thus occur if there's a possibility + * that this method is called again by the supplied handler. */ void send(OutgoingMessage message, Consumer completionHandler) { requireNonNull(message); @@ -86,8 +88,9 @@ final class Transmitter { private void send0(OutgoingMessage message, Consumer handler) { boolean b = busy.get(); assert b; // Please don't inline this, as busy.get() has memory - // visibility effects and we don't want the correctness - // of the algorithm to depend on assertions flag + // visibility effects and we don't want the program behaviour + // to depend on whether the assertions are turned on + // or turned off try { boolean sent = message.sendTo(channel); if (sent) { diff --git a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java index 863211d5948..74d1445fc29 100644 --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -51,9 +51,9 @@ import java.util.function.Function; import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.failedFuture; import static jdk.incubator.http.internal.common.Pair.pair; +import static jdk.incubator.http.internal.websocket.StatusCodes.CLOSED_ABNORMALLY; import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE; -import static jdk.incubator.http.internal.websocket.StatusCodes.TLS_HANDSHAKE_FAILURE; -import static jdk.incubator.http.internal.websocket.StatusCodes.checkOutgoingCode; +import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToSendFromClient; /* * A WebSocket client. @@ -74,8 +74,8 @@ final class WebSocketImpl implements WebSocket { private final AtomicBoolean outstandingSend = new AtomicBoolean(); private final CooperativeHandler sendHandler = new CooperativeHandler(this::sendFirst); - private final Queue>> queue = - new ConcurrentLinkedQueue<>(); + private final Queue>> + queue = new ConcurrentLinkedQueue<>(); private final Context context = new OutgoingMessage.Context(); private final Transmitter transmitter; private final Receiver receiver; @@ -110,6 +110,9 @@ final class WebSocketImpl implements WebSocket { r.subprotocol, r.channel, b.getListener()); + // The order of calls might cause a subtle effects, like CF will be + // returned from the buildAsync _after_ onOpen has been signalled. + // This means if onOpen is lengthy, it might cause some problems. ws.signalOpen(); return ws; }; @@ -125,7 +128,8 @@ final class WebSocketImpl implements WebSocket { WebSocketImpl(URI uri, String subprotocol, RawChannel channel, - Listener listener) { + Listener listener) + { this.uri = requireNonNull(uri); this.subprotocol = requireNonNull(subprotocol); this.channel = requireNonNull(channel); @@ -182,15 +186,17 @@ final class WebSocketImpl implements WebSocket { * Processes a Close event that came from the channel. Invoked at most once. */ private void processClose(int statusCode, String reason) { - assert statusCode != TLS_HANDSHAKE_FAILURE; // TLS problems happen long before WebSocket is alive receiver.close(); try { channel.shutdownInput(); } catch (IOException e) { Log.logError(e); } - boolean wasComplete = !closeReceived.complete(null); - if (wasComplete) { + boolean alreadyCompleted = !closeReceived.complete(null); + if (alreadyCompleted) { + // This CF is supposed to be completed only once, the first time a + // Close message is received. No further messages are pulled from + // the socket. throw new InternalError(); } int code; @@ -261,19 +267,17 @@ final class WebSocketImpl implements WebSocket { @Override public CompletableFuture sendClose(int statusCode, String reason) { - try { - checkOutgoingCode(statusCode); - } catch (CheckFailedException e) { - IllegalArgumentException ex = new IllegalArgumentException( - "Bad status code: " + statusCode, e); - failedFuture(ex); + if (!isLegalToSendFromClient(statusCode)) { + return failedFuture( + new IllegalArgumentException("statusCode: " + statusCode)); } - return enqueueClose(new Close(statusCode, reason)); - } - - @Override - public CompletableFuture sendClose() { - return enqueueClose(new Close()); + Close msg; + try { + msg = new Close(statusCode, reason); + } catch (IllegalArgumentException e) { + return failedFuture(e); + } + return enqueueClose(msg); } /* @@ -288,8 +292,8 @@ final class WebSocketImpl implements WebSocket { } catch (IOException e) { Log.logError(e); } - boolean wasComplete = !closeSent.complete(null); - if (wasComplete) { + boolean alreadyCompleted = !closeSent.complete(null); + if (alreadyCompleted) { // Shouldn't happen as this callback must run at most once throw new InternalError(); } @@ -316,40 +320,41 @@ final class WebSocketImpl implements WebSocket { private CompletableFuture enqueue(OutgoingMessage m) { CompletableFuture cf = new CompletableFuture<>(); - Consumer h = e -> { - if (e == null) { - cf.complete(WebSocketImpl.this); - sendHandler.startOrContinue(); - } else { - -// what if this is not a users message? (must be different entry points for different messages) - - // TODO: think about correct behaviour in the face of error in - // the queue, for now it seems like the best solution is to - // deliver the error and stop - cf.completeExceptionally(e); - } - }; - queue.add(pair(m, h)); // Always returns true - sendHandler.startOrContinue(); + boolean added = queue.add(pair(m, cf)); + if (!added) { + // The queue is supposed to be unbounded + throw new InternalError(); + } + sendHandler.handle(); return cf; } - private void sendFirst() { - Pair> p = queue.poll(); + /* + * This is the main sending method. It may be run in different threads, + * but never concurrently. + */ + private void sendFirst(Runnable whenSent) { + Pair> p = queue.poll(); if (p == null) { + whenSent.run(); return; } OutgoingMessage message = p.first; - Consumer h = p.second; + CompletableFuture cf = p.second; try { - // At this point messages are finally ordered and will be written - // one by one in a mutually exclusive fashion; thus it's a pretty - // convenient place to contextualize them message.contextualize(context); + Consumer h = e -> { + if (e == null) { + cf.complete(WebSocketImpl.this); + } else { + cf.completeExceptionally(e); + } + sendHandler.handle(); + whenSent.run(); + }; transmitter.send(message, h); } catch (Exception t) { - h.accept(t); + cf.completeExceptionally(t); } } @@ -381,7 +386,7 @@ final class WebSocketImpl implements WebSocket { @Override public String toString() { return super.toString() - + "[" + (closed.get() ? "OPEN" : "CLOSED") + "]: " + uri + + "[" + (closed.get() ? "CLOSED" : "OPEN") + "]: " + uri + (!subprotocol.isEmpty() ? ", subprotocol=" + subprotocol : ""); } @@ -476,7 +481,9 @@ final class WebSocketImpl implements WebSocket { int code = ((FailWebSocketException) error).getStatusCode(); enqueueClose(new Close(code, "")) .whenComplete((r, e) -> { - ex.addSuppressed(e); + if (e != null) { + ex.addSuppressed(e); + } try { channel.close(); } catch (IOException e1) { diff --git a/jdk/test/java/net/httpclient/whitebox/Driver.java b/jdk/test/java/net/httpclient/whitebox/Driver.java index c85b8f076c6..5b9e3438989 100644 --- a/jdk/test/java/net/httpclient/whitebox/Driver.java +++ b/jdk/test/java/net/httpclient/whitebox/Driver.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -26,5 +26,6 @@ * @bug 8151299 8164704 * @modules jdk.incubator.httpclient * @run testng jdk.incubator.httpclient/jdk.incubator.http.SelectorTest + * @run testng jdk.incubator.httpclient/jdk.incubator.http.RawChannelTest * @run testng jdk.incubator.httpclient/jdk.incubator.http.ResponseHeadersTest */ diff --git a/jdk/test/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/RawChannelTest.java b/jdk/test/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/RawChannelTest.java new file mode 100644 index 00000000000..f74c9889611 --- /dev/null +++ b/jdk/test/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/RawChannelTest.java @@ -0,0 +1,287 @@ +/* + * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package jdk.incubator.http; + +import jdk.incubator.http.internal.websocket.RawChannel; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import static jdk.incubator.http.HttpResponse.BodyHandler.discard; +import static org.testng.Assert.assertEquals; + +/* + * This test exercises mechanics of _independent_ reads and writes on the + * RawChannel. It verifies that the underlying implementation can manage more + * than a single type of notifications at the same time. + */ +public class RawChannelTest { + + private final AtomicLong clientWritten = new AtomicLong(); + private final AtomicLong serverWritten = new AtomicLong(); + private final AtomicLong clientRead = new AtomicLong(); + private final AtomicLong serverRead = new AtomicLong(); + + /* + * Since at this level we don't have any control over the low level socket + * parameters, this latch ensures a write to the channel will stall at least + * once (socket's send buffer filled up). + */ + private final CountDownLatch writeStall = new CountDownLatch(1); + + /* + * This one works similarly by providing means to ensure a read from the + * channel will stall at least once (no more data available on the socket). + */ + private final CountDownLatch readStall = new CountDownLatch(1); + + private final AtomicInteger writeHandles = new AtomicInteger(); + private final AtomicInteger readHandles = new AtomicInteger(); + + private final CountDownLatch exit = new CountDownLatch(1); + + @Test + public void test() throws Exception { + try (ServerSocket server = new ServerSocket(0)) { + int port = server.getLocalPort(); + new TestServer(server).start(); + final RawChannel chan = channelOf(port); + + // It's very important not to forget the initial bytes, possibly + // left from the HTTP thingy + int initialBytes = chan.initialByteBuffer().remaining(); + print("RawChannel has %s initial bytes", initialBytes); + clientRead.addAndGet(initialBytes); + + chan.registerEvent(new RawChannel.RawEvent() { + + private final ByteBuffer reusableBuffer = ByteBuffer.allocate(32768); + + @Override + public int interestOps() { + return SelectionKey.OP_WRITE; + } + + @Override + public void handle() { + int i = writeHandles.incrementAndGet(); + print("OP_WRITE #%s", i); + if (i > 3) { // Fill up the send buffer not more than 3 times + try { + chan.shutdownOutput(); + } catch (IOException e) { + e.printStackTrace(); + } + return; + } + long total = 0; + try { + long n; + do { + ByteBuffer[] array = {reusableBuffer.slice()}; + n = chan.write(array, 0, 1); + total += n; + } while (n > 0); + print("OP_WRITE clogged SNDBUF with %s bytes", total); + clientWritten.addAndGet(total); + chan.registerEvent(this); + writeStall.countDown(); // signal send buffer is full + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + }); + + chan.registerEvent(new RawChannel.RawEvent() { + + @Override + public int interestOps() { + return SelectionKey.OP_READ; + } + + @Override + public void handle() { + int i = readHandles.incrementAndGet(); + print("OP_READ #%s", i); + ByteBuffer read = null; + long total = 0; + while (true) { + try { + read = chan.read(); + } catch (IOException e) { + e.printStackTrace(); + } + if (read == null) { + print("OP_READ EOF"); + break; + } else if (!read.hasRemaining()) { + print("OP_READ stall"); + try { + chan.registerEvent(this); + } catch (IOException e) { + e.printStackTrace(); + } + readStall.countDown(); + break; + } + int r = read.remaining(); + total += r; + clientRead.addAndGet(r); + } + print("OP_READ read %s bytes", total); + } + }); + exit.await(); // All done, we need to compare results: + assertEquals(clientRead.get(), serverWritten.get()); + assertEquals(serverRead.get(), clientWritten.get()); + } + } + + private static RawChannel channelOf(int port) throws Exception { + URI uri = URI.create("http://127.0.0.1:" + port + "/"); + print("raw channel to %s", uri.toString()); + HttpRequest req = HttpRequest.newBuilder(uri).build(); + HttpResponse r = HttpClient.newHttpClient().send(req, discard(null)); + r.body(); + return ((HttpResponseImpl) r).rawChannel(); + } + + private class TestServer extends Thread { // Powered by Slowpokes + + private final ServerSocket server; + + TestServer(ServerSocket server) throws IOException { + this.server = server; + } + + @Override + public void run() { + try (Socket s = server.accept()) { + InputStream is = s.getInputStream(); + OutputStream os = s.getOutputStream(); + + processHttp(is, os); + + Thread reader = new Thread(() -> { + try { + long n = readSlowly(is); + print("Server read %s bytes", n); + serverRead.addAndGet(n); + s.shutdownInput(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + + Thread writer = new Thread(() -> { + try { + long n = writeSlowly(os); + print("Server written %s bytes", n); + serverWritten.addAndGet(n); + s.shutdownOutput(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + + reader.start(); + writer.start(); + + reader.join(); + writer.join(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + exit.countDown(); + } + } + + private void processHttp(InputStream is, OutputStream os) + throws IOException + { + os.write("HTTP/1.1 200 OK\r\nContent-length: 0\r\n\r\n".getBytes()); + byte[] buf = new byte[1024]; + String s = ""; + while (true) { + int n = is.read(buf); + if (n <= 0) { + throw new RuntimeException("Unexpected end of request"); + } + s = s + new String(buf, 0, n); + if (s.contains("\r\n\r\n")) { + break; + } + } + } + + private long writeSlowly(OutputStream os) throws Exception { + byte[] first = byteArrayOfSize(1024); + long total = first.length; + os.write(first); + // Let's wait for the signal from the raw channel that its read has + // stalled, and then continue sending a bit more stuff + readStall.await(); + for (int i = 0; i < 32; i++) { + byte[] b = byteArrayOfSize(1024); + os.write(b); + total += b.length; + TimeUnit.MILLISECONDS.sleep(1); + } + return total; + } + + private long readSlowly(InputStream is) throws Exception { + // Wait for the raw channel to fill up the its send buffer + writeStall.await(); + long overall = 0; + byte[] array = new byte[1024]; + for (int n = 0; n != -1; n = is.read(array)) { + TimeUnit.MILLISECONDS.sleep(1); + overall += n; + } + return overall; + } + } + + private static void print(String format, Object... args) { + System.out.println(Thread.currentThread() + ": " + String.format(format, args)); + } + + private static byte[] byteArrayOfSize(int bound) { + return new byte[new Random().nextInt(1 + bound)]; + } +}