diff --git a/jdk/src/java.httpclient/share/classes/java/net/http/WS.java b/jdk/src/java.httpclient/share/classes/java/net/http/WS.java index da8d400836d..7d51f646638 100644 --- a/jdk/src/java.httpclient/share/classes/java/net/http/WS.java +++ b/jdk/src/java.httpclient/share/classes/java/net/http/WS.java @@ -86,7 +86,7 @@ final class WS implements WebSocket { } } }; - transmitter = new WSTransmitter(executor, channel, errorHandler); + transmitter = new WSTransmitter(this, executor, channel, errorHandler); receiver = new WSReceiver(this.listener, this, executor, channel); } @@ -95,7 +95,7 @@ final class WS implements WebSocket { } @Override - public CompletableFuture sendText(CharSequence message, boolean isLast) { + public CompletableFuture sendText(CharSequence message, boolean isLast) { requireNonNull(message, "message"); synchronized (stateLock) { checkState(); @@ -104,7 +104,7 @@ final class WS implements WebSocket { } @Override - public CompletableFuture sendText(Stream message) { + public CompletableFuture sendText(Stream message) { requireNonNull(message, "message"); synchronized (stateLock) { checkState(); @@ -113,7 +113,7 @@ final class WS implements WebSocket { } @Override - public CompletableFuture sendBinary(ByteBuffer message, boolean isLast) { + public CompletableFuture sendBinary(ByteBuffer message, boolean isLast) { requireNonNull(message, "message"); synchronized (stateLock) { checkState(); @@ -122,7 +122,7 @@ final class WS implements WebSocket { } @Override - public CompletableFuture sendPing(ByteBuffer message) { + public CompletableFuture sendPing(ByteBuffer message) { requireNonNull(message, "message"); synchronized (stateLock) { checkState(); @@ -131,7 +131,7 @@ final class WS implements WebSocket { } @Override - public CompletableFuture sendPong(ByteBuffer message) { + public CompletableFuture sendPong(ByteBuffer message) { requireNonNull(message, "message"); synchronized (stateLock) { checkState(); @@ -140,7 +140,7 @@ final class WS implements WebSocket { } @Override - public CompletableFuture sendClose(CloseCode code, CharSequence reason) { + public CompletableFuture sendClose(CloseCode code, CharSequence reason) { requireNonNull(code, "code"); requireNonNull(reason, "reason"); synchronized (stateLock) { @@ -149,13 +149,13 @@ final class WS implements WebSocket { } @Override - public CompletableFuture sendClose() { + public CompletableFuture sendClose() { synchronized (stateLock) { return doSendClose(() -> transmitter.sendClose()); } } - private CompletableFuture doSendClose(Supplier> s) { + private CompletableFuture doSendClose(Supplier> s) { checkState(); boolean closeChannel = false; synchronized (stateLock) { @@ -165,7 +165,7 @@ final class WS implements WebSocket { tryChangeState(State.CLOSED_LOCALLY); } } - CompletableFuture sent = s.get(); + CompletableFuture sent = s.get(); if (closeChannel) { sent.whenComplete((v, t) -> { try { diff --git a/jdk/src/java.httpclient/share/classes/java/net/http/WSTransmitter.java b/jdk/src/java.httpclient/share/classes/java/net/http/WSTransmitter.java index f9827306646..a9a8287478e 100644 --- a/jdk/src/java.httpclient/share/classes/java/net/http/WSTransmitter.java +++ b/jdk/src/java.httpclient/share/classes/java/net/http/WSTransmitter.java @@ -51,15 +51,17 @@ import static java.net.http.Pair.pair; */ final class WSTransmitter { - private final BlockingQueue>> + private final BlockingQueue>> backlog = new LinkedBlockingQueue<>(); private final WSMessageSender sender; private final WSSignalHandler handler; + private final WebSocket webSocket; private boolean previousMessageSent = true; private boolean canSendBinary = true; private boolean canSendText = true; - WSTransmitter(Executor executor, RawChannel channel, Consumer errorHandler) { + WSTransmitter(WebSocket ws, Executor executor, RawChannel channel, Consumer errorHandler) { + this.webSocket = ws; this.handler = new WSSignalHandler(executor, this::handleSignal); Consumer sendCompletion = (error) -> { synchronized (this) { @@ -76,41 +78,41 @@ final class WSTransmitter { this.sender = new WSMessageSender(channel, sendCompletion); } - CompletableFuture sendText(CharSequence message, boolean isLast) { + CompletableFuture sendText(CharSequence message, boolean isLast) { checkAndUpdateText(isLast); return acceptMessage(new Text(isLast, message)); } - CompletableFuture sendText(Stream message) { + CompletableFuture sendText(Stream message) { checkAndUpdateText(true); return acceptMessage(new StreamedText(message)); } - CompletableFuture sendBinary(ByteBuffer message, boolean isLast) { + CompletableFuture sendBinary(ByteBuffer message, boolean isLast) { checkAndUpdateBinary(isLast); return acceptMessage(new Binary(isLast, message)); } - CompletableFuture sendPing(ByteBuffer message) { + CompletableFuture sendPing(ByteBuffer message) { checkSize(message.remaining(), 125); return acceptMessage(new Ping(message)); } - CompletableFuture sendPong(ByteBuffer message) { + CompletableFuture sendPong(ByteBuffer message) { checkSize(message.remaining(), 125); return acceptMessage(new Pong(message)); } - CompletableFuture sendClose(WebSocket.CloseCode code, CharSequence reason) { + CompletableFuture sendClose(WebSocket.CloseCode code, CharSequence reason) { return acceptMessage(createCloseMessage(code, reason)); } - CompletableFuture sendClose() { + CompletableFuture sendClose() { return acceptMessage(new Close(ByteBuffer.allocate(0))); } - private CompletableFuture acceptMessage(WSOutgoingMessage m) { - CompletableFuture cf = new CompletableFuture<>(); + private CompletableFuture acceptMessage(WSOutgoingMessage m) { + CompletableFuture cf = new CompletableFuture<>(); synchronized (this) { backlog.offer(pair(m, cf)); } @@ -123,11 +125,11 @@ final class WSTransmitter { synchronized (this) { while (!backlog.isEmpty() && previousMessageSent) { previousMessageSent = false; - Pair> p = backlog.peek(); + Pair> p = backlog.peek(); boolean sent = sender.trySendFully(p.first); if (sent) { backlog.remove(); - p.second.complete(null); + p.second.complete(webSocket); previousMessageSent = true; } } diff --git a/jdk/src/java.httpclient/share/classes/java/net/http/WebSocket.java b/jdk/src/java.httpclient/share/classes/java/net/http/WebSocket.java index 0b7123772f7..dfc7f71dc3b 100644 --- a/jdk/src/java.httpclient/share/classes/java/net/http/WebSocket.java +++ b/jdk/src/java.httpclient/share/classes/java/net/http/WebSocket.java @@ -52,8 +52,8 @@ import java.util.stream.Stream; * *

Messages of type {@code X} are sent through the {@code WebSocket.sendX} * methods and received through {@link WebSocket.Listener}{@code .onX} methods - * asynchronously. Each of the methods begins the operation and returns a {@link - * CompletionStage} which completes when the operation has completed. + * asynchronously. Each of the methods returns a {@link CompletionStage} which + * completes when the operation has completed. * *

Messages are received only if {@linkplain #request(long) requested}. * @@ -79,6 +79,9 @@ import java.util.stream.Stream; * or method of this type will cause a {@link NullPointerException * NullPointerException} to be thrown. * + * @implNote The default implementation's methods do not block before returning + * a {@code CompletableFuture}. + * * @since 9 */ public interface WebSocket { @@ -234,9 +237,9 @@ public interface WebSocket { /** * Builds a {@code WebSocket}. * - *

Returns immediately with a {@code CompletableFuture} - * which completes with the {@code WebSocket} when it is connected, or - * completes exceptionally if an error occurs. + *

Returns a {@code CompletableFuture} which completes + * normally with the {@code WebSocket} when it is connected or completes + * exceptionally if an error occurs. * *

{@code CompletableFuture} may complete exceptionally with the * following errors: @@ -252,7 +255,7 @@ public interface WebSocket { * if the opening handshake fails * * - * @return a {@code CompletableFuture} of {@code WebSocket} + * @return a {@code CompletableFuture} with the {@code WebSocket} */ CompletableFuture buildAsync(); } @@ -601,9 +604,9 @@ public interface WebSocket { /** * Sends a Text message with characters from the given {@code CharSequence}. * - *

Returns immediately with a {@code CompletableFuture} which - * completes normally when the message has been sent, or completes - * exceptionally if an error occurs. + *

Returns a {@code CompletableFuture} which completes + * normally when the message has been sent or completes exceptionally if an + * error occurs. * *

The {@code CharSequence} should not be modified until the returned * {@code CompletableFuture} completes (either normally or exceptionally). @@ -612,9 +615,12 @@ public interface WebSocket { * with: *

    *
  • {@link IOException} - * if an I/O error occurs during this operation; or the - * {@code WebSocket} closes while this operation is in progress; - * or the {@code message} is a malformed UTF-16 sequence + * if an I/O error occurs during this operation + *
  • {@link IllegalStateException} + * if the {@code WebSocket} closes while this operation is in progress; + * or if a Close message has been sent already; + * or if there is an outstanding send operation; + * or if a previous Binary message was not sent with {@code isLast == true} *
* * @implNote This implementation does not accept partial UTF-16 @@ -624,22 +630,15 @@ public interface WebSocket { * @param message * the message * @param isLast - * {@code true} if this is the final part of the message + * {@code true} if this is the final part of the message, * {@code false} otherwise * - * @return a CompletableFuture of Void + * @return a CompletableFuture with this WebSocket * - * @throws IllegalStateException - * if the WebSocket is closed - * @throws IllegalStateException - * if a Close message has been already sent - * @throws IllegalStateException - * if there is an outstanding send operation - * @throws IllegalStateException - * if a previous Binary message was not sent - * with {@code isLast == true} + * @throws IllegalArgumentException + * if {@code message} is a malformed (or an incomplete) UTF-16 sequence */ - CompletableFuture sendText(CharSequence message, boolean isLast); + CompletableFuture sendText(CharSequence message, boolean isLast); /** * Sends a whole Text message with characters from the given {@code @@ -648,9 +647,9 @@ public interface WebSocket { *

This is a convenience method. For the general case, use {@link * #sendText(CharSequence, boolean)}. * - *

Returns immediately with a {@code CompletableFuture} which - * completes normally when the message has been sent, or completes - * exceptionally if an error occurs. + *

Returns a {@code CompletableFuture} which completes + * normally when the message has been sent or completes exceptionally if an + * error occurs. * *

The {@code CharSequence} should not be modified until the returned * {@code CompletableFuture} completes (either normally or exceptionally). @@ -659,27 +658,23 @@ public interface WebSocket { * with: *

    *
  • {@link IOException} - * if an I/O error occurs during this operation; - * or the {@code WebSocket} closes while this operation is in progress; - * or the message is a malformed (or an incomplete) UTF-16 sequence + * if an I/O error occurs during this operation + *
  • {@link IllegalStateException} + * if the {@code WebSocket} closes while this operation is in progress; + * or if a Close message has been sent already; + * or if there is an outstanding send operation; + * or if a previous Binary message was not sent with {@code isLast == true} *
* * @param message * the message * - * @return a CompletableFuture of Void + * @return a CompletableFuture with this WebSocket * - * @throws IllegalStateException - * if the WebSocket is closed - * @throws IllegalStateException - * if a Close message has been already sent - * @throws IllegalStateException - * if there is an outstanding send operation - * @throws IllegalStateException - * if a previous Binary message was not sent - * with {@code isLast == true} + * @throws IllegalArgumentException + * if {@code message} is a malformed (or an incomplete) UTF-16 sequence */ - default CompletableFuture sendText(CharSequence message) { + default CompletableFuture sendText(CharSequence message) { return sendText(message, true); } @@ -690,9 +685,9 @@ public interface WebSocket { *

This is a convenience method. For the general case use {@link * #sendText(CharSequence, boolean)}. * - *

Returns immediately with a {@code CompletableFuture} which - * completes normally when the message has been sent, or completes - * exceptionally if an error occurs. + *

Returns a {@code CompletableFuture} which completes + * normally when the message has been sent or completes exceptionally if an + * error occurs. * *

Streamed character sequences should not be modified until the * returned {@code CompletableFuture} completes (either normally or @@ -702,41 +697,41 @@ public interface WebSocket { * with: *

    *
  • {@link IOException} - * if an I/O error occurs during this operation; - * or the {@code WebSocket} closes while this operation is in progress; - * or the message is a malformed (or an incomplete) UTF-16 sequence + * if an I/O error occurs during this operation + *
  • {@link IllegalStateException} + * if the {@code WebSocket} closes while this operation is in progress; + * or if a Close message has been sent already; + * or if there is an outstanding send operation; + * or if a previous Binary message was not sent with {@code isLast == true} *
* * @param message * the message * - * @return a CompletableFuture of Void + * @return a CompletableFuture with this WebSocket * - * @throws IllegalStateException - * if the WebSocket is closed - * @throws IllegalStateException - * if a Close message has been already sent - * @throws IllegalStateException - * if there is an outstanding send operation - * @throws IllegalStateException - * if a previous Binary message was not sent - * with {@code isLast == true} + * @throws IllegalArgumentException + * if {@code message} is a malformed (or an incomplete) UTF-16 sequence */ - CompletableFuture sendText(Stream message); + CompletableFuture sendText(Stream message); /** * Sends a Binary message with bytes from the given {@code ByteBuffer}. * - *

Returns immediately with a {@code CompletableFuture} which - * completes normally when the message has been sent, or completes - * exceptionally if an error occurs. + *

Returns a {@code CompletableFuture} which completes + * normally when the message has been sent or completes exceptionally if an + * error occurs. * *

The returned {@code CompletableFuture} can complete exceptionally * with: *

    *
  • {@link IOException} - * if an I/O error occurs during this operation or the - * {@code WebSocket} closes while this operation is in progress + * if an I/O error occurs during this operation + *
  • {@link IllegalStateException} + * if the {@code WebSocket} closes while this operation is in progress; + * or if a Close message has been sent already; + * or if there is an outstanding send operation; + * or if a previous Text message was not sent with {@code isLast == true} *
* * @param message @@ -745,33 +740,27 @@ public interface WebSocket { * {@code true} if this is the final part of the message, * {@code false} otherwise * - * @return a CompletableFuture of Void - * - * @throws IllegalStateException - * if the WebSocket is closed - * @throws IllegalStateException - * if a Close message has been already sent - * @throws IllegalStateException - * if there is an outstanding send operation - * @throws IllegalStateException - * if a previous Text message was not sent - * with {@code isLast == true} + * @return a CompletableFuture with this WebSocket */ - CompletableFuture sendBinary(ByteBuffer message, boolean isLast); + CompletableFuture sendBinary(ByteBuffer message, boolean isLast); /** * Sends a Binary message with bytes from the given {@code byte[]}. * - *

Returns immediately with a {@code CompletableFuture} which - * completes normally when the message has been sent, or completes - * exceptionally if an error occurs. + *

Returns a {@code CompletableFuture} which completes + * normally when the message has been sent or completes exceptionally if an + * error occurs. * *

The returned {@code CompletableFuture} can complete exceptionally * with: *

    *
  • {@link IOException} - * if an I/O error occurs during this operation or the - * {@code WebSocket} closes while this operation is in progress + * if an I/O error occurs during this operation + *
  • {@link IllegalStateException} + * if the {@code WebSocket} closes while this operation is in progress; + * or if a Close message has been sent already; + * or if there is an outstanding send operation; + * or if a previous Text message was not sent with {@code isLast == true} *
* * @implSpec This is equivalent to: @@ -785,19 +774,9 @@ public interface WebSocket { * {@code true} if this is the final part of the message, * {@code false} otherwise * - * @return a CompletableFuture of Void - * - * @throws IllegalStateException - * if the WebSocket is closed - * @throws IllegalStateException - * if a Close message has been already sent - * @throws IllegalStateException - * if there is an outstanding send operation - * @throws IllegalStateException - * if a previous Text message was not sent - * with {@code isLast == true} + * @return a CompletableFuture with this WebSocket */ - default CompletableFuture sendBinary(byte[] message, boolean isLast) { + default CompletableFuture sendBinary(byte[] message, boolean isLast) { Objects.requireNonNull(message, "message"); return sendBinary(ByteBuffer.wrap(message), isLast); } @@ -805,9 +784,9 @@ public interface WebSocket { /** * Sends a Ping message. * - *

Returns immediately with a {@code CompletableFuture} which - * completes normally when the message has been sent, or completes - * exceptionally if an error occurs. + *

Returns a {@code CompletableFuture} which completes + * normally when the message has been sent or completes exceptionally if an + * error occurs. * *

A Ping message may be sent or received by either client or server. * It may serve either as a keepalive or as a means to verify that the @@ -820,32 +799,29 @@ public interface WebSocket { * with: *

    *
  • {@link IOException} - * if an I/O error occurs during this operation or the - * {@code WebSocket} closes while this operation is in progress + * if an I/O error occurs during this operation + *
  • {@link IllegalStateException} + * if the {@code WebSocket} closes while this operation is in progress; + * or if a Close message has been sent already; + * or if there is an outstanding send operation *
* * @param message * the message * - * @return a CompletableFuture of Void + * @return a CompletableFuture with this WebSocket * - * @throws IllegalStateException - * if the WebSocket is closed - * @throws IllegalStateException - * if a Close message has been already sent - * @throws IllegalStateException - * if there is an outstanding send operation * @throws IllegalArgumentException * if {@code message.remaining() > 125} */ - CompletableFuture sendPing(ByteBuffer message); + CompletableFuture sendPing(ByteBuffer message); /** * Sends a Pong message. * - *

Returns immediately with a {@code CompletableFuture} which - * completes normally when the message has been sent, or completes - * exceptionally if an error occurs. + *

Returns a {@code CompletableFuture} which completes + * normally when the message has been sent or completes exceptionally if an + * error occurs. * *

A Pong message may be unsolicited or may be sent in response to a * previously received Ping. In latter case the contents of the Pong is @@ -858,32 +834,29 @@ public interface WebSocket { * with: *

    *
  • {@link IOException} - * if an I/O error occurs during this operation or the - * {@code WebSocket} closes while this operation is in progress + * if an I/O error occurs during this operation + *
  • {@link IllegalStateException} + * if the {@code WebSocket} closes while this operation is in progress; + * or if a Close message has been sent already; + * or if there is an outstanding send operation *
* * @param message * the message * - * @return a CompletableFuture of Void + * @return a CompletableFuture with this WebSocket * - * @throws IllegalStateException - * if the WebSocket is closed - * @throws IllegalStateException - * if a Close message has been already sent - * @throws IllegalStateException - * if there is an outstanding send operation * @throws IllegalArgumentException * if {@code message.remaining() > 125} */ - CompletableFuture sendPong(ByteBuffer message); + CompletableFuture sendPong(ByteBuffer message); /** * Sends a Close message with the given close code and the reason. * - *

Returns immediately with a {@code CompletableFuture} which - * completes normally when the message has been sent, or completes - * exceptionally if an error occurs. + *

Returns a {@code CompletableFuture} which completes + * normally when the message has been sent or completes exceptionally if an + * error occurs. * *

A Close message may consist of a close code and a reason for closing. * The reason must have a valid UTF-8 representation not longer than {@code @@ -894,8 +867,11 @@ public interface WebSocket { * with: *

    *
  • {@link IOException} - * if an I/O error occurs during this operation or the - * {@code WebSocket} closes while this operation is in progress + * if an I/O error occurs during this operation + *
  • {@link IllegalStateException} + * if the {@code WebSocket} closes while this operation is in progress; + * or if a Close message has been sent already; + * or if there is an outstanding send operation *
* * @param code @@ -903,45 +879,35 @@ public interface WebSocket { * @param reason * the reason; can be empty * - * @return a CompletableFuture of Void + * @return a CompletableFuture with this WebSocket * - * @throws IllegalStateException - * if the WebSocket is closed - * @throws IllegalStateException - * if a Close message has been already sent - * @throws IllegalStateException - * if there is an outstanding send operation * @throws IllegalArgumentException - * if the {@code reason} doesn't have a valid UTF-8 - * representation not longer than {@code 123} bytes + * if {@code reason} doesn't have an UTF-8 representation not longer + * than {@code 123} bytes */ - CompletableFuture sendClose(CloseCode code, CharSequence reason); + CompletableFuture sendClose(CloseCode code, CharSequence reason); /** * Sends an empty Close message. * - *

Returns immediately with a {@code CompletableFuture} which - * completes normally when the message has been sent, or completes - * exceptionally if an error occurs. + *

Returns a {@code CompletableFuture} which completes + * normally when the message has been sent or completes exceptionally if an + * error occurs. * *

The returned {@code CompletableFuture} can complete exceptionally * with: *

    *
  • {@link IOException} - * if an I/O error occurs during this operation or the - * {@code WebSocket} closes while this operation is in progress + * if an I/O error occurs during this operation + *
  • {@link IllegalStateException} + * if the {@code WebSocket} closes while this operation is in progress; + * or if a Close message has been sent already; + * or if there is an outstanding send operation *
* - * @return a CompletableFuture of Void - * - * @throws IllegalStateException - * if the WebSocket is closed - * @throws IllegalStateException - * if a Close message has been already sent - * @throws IllegalStateException - * if there is an outstanding send operation + * @return a CompletableFuture with this WebSocket */ - CompletableFuture sendClose(); + CompletableFuture sendClose(); /** * Requests {@code n} more messages to be received by the {@link Listener diff --git a/jdk/test/java/net/httpclient/BasicWebSocketAPITest.java b/jdk/test/java/net/httpclient/BasicWebSocketAPITest.java index 4e454c0a533..0ee412450ed 100644 --- a/jdk/test/java/net/httpclient/BasicWebSocketAPITest.java +++ b/jdk/test/java/net/httpclient/BasicWebSocketAPITest.java @@ -30,6 +30,7 @@ import java.net.http.HttpClient; import java.net.http.WebSocket; import java.net.http.WebSocket.CloseCode; import java.nio.ByteBuffer; +import java.nio.CharBuffer; import java.nio.channels.SocketChannel; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -90,12 +91,24 @@ public class BasicWebSocketAPITest { "message", () -> ws.sendBinary((ByteBuffer) null, true)) ); + checkAndClose( + (ws) -> + TestKit.assertThrows(IllegalArgumentException.class, + ".*message.*", + () -> ws.sendPing(ByteBuffer.allocate(126))) + ); checkAndClose( (ws) -> TestKit.assertThrows(NullPointerException.class, "message", () -> ws.sendPing(null)) ); + checkAndClose( + (ws) -> + TestKit.assertThrows(IllegalArgumentException.class, + ".*message.*", + () -> ws.sendPong(ByteBuffer.allocate(126))) + ); checkAndClose( (ws) -> TestKit.assertThrows(NullPointerException.class, @@ -106,7 +119,7 @@ public class BasicWebSocketAPITest { (ws) -> TestKit.assertThrows(NullPointerException.class, "message", - () -> ws.sendText((CharSequence) null, true)) + () -> ws.sendText(null, true)) ); checkAndClose( (ws) -> @@ -120,6 +133,12 @@ public class BasicWebSocketAPITest { "message", () -> ws.sendText((Stream) null)) ); + checkAndClose( + (ws) -> + TestKit.assertThrows(IllegalArgumentException.class, + "(?i).*reason.*", + () -> ws.sendClose(CloseCode.NORMAL_CLOSURE, CharBuffer.allocate(124))) + ); checkAndClose( (ws) -> TestKit.assertThrows(NullPointerException.class,