8156693: Improve usability of CompletableFuture use in WebSocket API

Reviewed-by: rriggs
This commit is contained in:
Pavel Rappo 2016-06-08 15:19:58 +01:00
parent ebf4782aa1
commit 7c857909cf
4 changed files with 158 additions and 171 deletions

View File

@ -86,7 +86,7 @@ final class WS implements WebSocket {
} }
} }
}; };
transmitter = new WSTransmitter(executor, channel, errorHandler); transmitter = new WSTransmitter(this, executor, channel, errorHandler);
receiver = new WSReceiver(this.listener, this, executor, channel); receiver = new WSReceiver(this.listener, this, executor, channel);
} }
@ -95,7 +95,7 @@ final class WS implements WebSocket {
} }
@Override @Override
public CompletableFuture<Void> sendText(CharSequence message, boolean isLast) { public CompletableFuture<WebSocket> sendText(CharSequence message, boolean isLast) {
requireNonNull(message, "message"); requireNonNull(message, "message");
synchronized (stateLock) { synchronized (stateLock) {
checkState(); checkState();
@ -104,7 +104,7 @@ final class WS implements WebSocket {
} }
@Override @Override
public CompletableFuture<Void> sendText(Stream<? extends CharSequence> message) { public CompletableFuture<WebSocket> sendText(Stream<? extends CharSequence> message) {
requireNonNull(message, "message"); requireNonNull(message, "message");
synchronized (stateLock) { synchronized (stateLock) {
checkState(); checkState();
@ -113,7 +113,7 @@ final class WS implements WebSocket {
} }
@Override @Override
public CompletableFuture<Void> sendBinary(ByteBuffer message, boolean isLast) { public CompletableFuture<WebSocket> sendBinary(ByteBuffer message, boolean isLast) {
requireNonNull(message, "message"); requireNonNull(message, "message");
synchronized (stateLock) { synchronized (stateLock) {
checkState(); checkState();
@ -122,7 +122,7 @@ final class WS implements WebSocket {
} }
@Override @Override
public CompletableFuture<Void> sendPing(ByteBuffer message) { public CompletableFuture<WebSocket> sendPing(ByteBuffer message) {
requireNonNull(message, "message"); requireNonNull(message, "message");
synchronized (stateLock) { synchronized (stateLock) {
checkState(); checkState();
@ -131,7 +131,7 @@ final class WS implements WebSocket {
} }
@Override @Override
public CompletableFuture<Void> sendPong(ByteBuffer message) { public CompletableFuture<WebSocket> sendPong(ByteBuffer message) {
requireNonNull(message, "message"); requireNonNull(message, "message");
synchronized (stateLock) { synchronized (stateLock) {
checkState(); checkState();
@ -140,7 +140,7 @@ final class WS implements WebSocket {
} }
@Override @Override
public CompletableFuture<Void> sendClose(CloseCode code, CharSequence reason) { public CompletableFuture<WebSocket> sendClose(CloseCode code, CharSequence reason) {
requireNonNull(code, "code"); requireNonNull(code, "code");
requireNonNull(reason, "reason"); requireNonNull(reason, "reason");
synchronized (stateLock) { synchronized (stateLock) {
@ -149,13 +149,13 @@ final class WS implements WebSocket {
} }
@Override @Override
public CompletableFuture<Void> sendClose() { public CompletableFuture<WebSocket> sendClose() {
synchronized (stateLock) { synchronized (stateLock) {
return doSendClose(() -> transmitter.sendClose()); return doSendClose(() -> transmitter.sendClose());
} }
} }
private CompletableFuture<Void> doSendClose(Supplier<CompletableFuture<Void>> s) { private CompletableFuture<WebSocket> doSendClose(Supplier<CompletableFuture<WebSocket>> s) {
checkState(); checkState();
boolean closeChannel = false; boolean closeChannel = false;
synchronized (stateLock) { synchronized (stateLock) {
@ -165,7 +165,7 @@ final class WS implements WebSocket {
tryChangeState(State.CLOSED_LOCALLY); tryChangeState(State.CLOSED_LOCALLY);
} }
} }
CompletableFuture<Void> sent = s.get(); CompletableFuture<WebSocket> sent = s.get();
if (closeChannel) { if (closeChannel) {
sent.whenComplete((v, t) -> { sent.whenComplete((v, t) -> {
try { try {

View File

@ -51,15 +51,17 @@ import static java.net.http.Pair.pair;
*/ */
final class WSTransmitter { final class WSTransmitter {
private final BlockingQueue<Pair<WSOutgoingMessage, CompletableFuture<Void>>> private final BlockingQueue<Pair<WSOutgoingMessage, CompletableFuture<WebSocket>>>
backlog = new LinkedBlockingQueue<>(); backlog = new LinkedBlockingQueue<>();
private final WSMessageSender sender; private final WSMessageSender sender;
private final WSSignalHandler handler; private final WSSignalHandler handler;
private final WebSocket webSocket;
private boolean previousMessageSent = true; private boolean previousMessageSent = true;
private boolean canSendBinary = true; private boolean canSendBinary = true;
private boolean canSendText = true; private boolean canSendText = true;
WSTransmitter(Executor executor, RawChannel channel, Consumer<Throwable> errorHandler) { WSTransmitter(WebSocket ws, Executor executor, RawChannel channel, Consumer<Throwable> errorHandler) {
this.webSocket = ws;
this.handler = new WSSignalHandler(executor, this::handleSignal); this.handler = new WSSignalHandler(executor, this::handleSignal);
Consumer<Throwable> sendCompletion = (error) -> { Consumer<Throwable> sendCompletion = (error) -> {
synchronized (this) { synchronized (this) {
@ -76,41 +78,41 @@ final class WSTransmitter {
this.sender = new WSMessageSender(channel, sendCompletion); this.sender = new WSMessageSender(channel, sendCompletion);
} }
CompletableFuture<Void> sendText(CharSequence message, boolean isLast) { CompletableFuture<WebSocket> sendText(CharSequence message, boolean isLast) {
checkAndUpdateText(isLast); checkAndUpdateText(isLast);
return acceptMessage(new Text(isLast, message)); return acceptMessage(new Text(isLast, message));
} }
CompletableFuture<Void> sendText(Stream<? extends CharSequence> message) { CompletableFuture<WebSocket> sendText(Stream<? extends CharSequence> message) {
checkAndUpdateText(true); checkAndUpdateText(true);
return acceptMessage(new StreamedText(message)); return acceptMessage(new StreamedText(message));
} }
CompletableFuture<Void> sendBinary(ByteBuffer message, boolean isLast) { CompletableFuture<WebSocket> sendBinary(ByteBuffer message, boolean isLast) {
checkAndUpdateBinary(isLast); checkAndUpdateBinary(isLast);
return acceptMessage(new Binary(isLast, message)); return acceptMessage(new Binary(isLast, message));
} }
CompletableFuture<Void> sendPing(ByteBuffer message) { CompletableFuture<WebSocket> sendPing(ByteBuffer message) {
checkSize(message.remaining(), 125); checkSize(message.remaining(), 125);
return acceptMessage(new Ping(message)); return acceptMessage(new Ping(message));
} }
CompletableFuture<Void> sendPong(ByteBuffer message) { CompletableFuture<WebSocket> sendPong(ByteBuffer message) {
checkSize(message.remaining(), 125); checkSize(message.remaining(), 125);
return acceptMessage(new Pong(message)); return acceptMessage(new Pong(message));
} }
CompletableFuture<Void> sendClose(WebSocket.CloseCode code, CharSequence reason) { CompletableFuture<WebSocket> sendClose(WebSocket.CloseCode code, CharSequence reason) {
return acceptMessage(createCloseMessage(code, reason)); return acceptMessage(createCloseMessage(code, reason));
} }
CompletableFuture<Void> sendClose() { CompletableFuture<WebSocket> sendClose() {
return acceptMessage(new Close(ByteBuffer.allocate(0))); return acceptMessage(new Close(ByteBuffer.allocate(0)));
} }
private CompletableFuture<Void> acceptMessage(WSOutgoingMessage m) { private CompletableFuture<WebSocket> acceptMessage(WSOutgoingMessage m) {
CompletableFuture<Void> cf = new CompletableFuture<>(); CompletableFuture<WebSocket> cf = new CompletableFuture<>();
synchronized (this) { synchronized (this) {
backlog.offer(pair(m, cf)); backlog.offer(pair(m, cf));
} }
@ -123,11 +125,11 @@ final class WSTransmitter {
synchronized (this) { synchronized (this) {
while (!backlog.isEmpty() && previousMessageSent) { while (!backlog.isEmpty() && previousMessageSent) {
previousMessageSent = false; previousMessageSent = false;
Pair<WSOutgoingMessage, CompletableFuture<Void>> p = backlog.peek(); Pair<WSOutgoingMessage, CompletableFuture<WebSocket>> p = backlog.peek();
boolean sent = sender.trySendFully(p.first); boolean sent = sender.trySendFully(p.first);
if (sent) { if (sent) {
backlog.remove(); backlog.remove();
p.second.complete(null); p.second.complete(webSocket);
previousMessageSent = true; previousMessageSent = true;
} }
} }

View File

@ -52,8 +52,8 @@ import java.util.stream.Stream;
* *
* <p> Messages of type {@code X} are sent through the {@code WebSocket.sendX} * <p> Messages of type {@code X} are sent through the {@code WebSocket.sendX}
* methods and received through {@link WebSocket.Listener}{@code .onX} methods * methods and received through {@link WebSocket.Listener}{@code .onX} methods
* asynchronously. Each of the methods begins the operation and returns a {@link * asynchronously. Each of the methods returns a {@link CompletionStage} which
* CompletionStage} which completes when the operation has completed. * completes when the operation has completed.
* *
* <p> Messages are received only if {@linkplain #request(long) requested}. * <p> Messages are received only if {@linkplain #request(long) requested}.
* *
@ -79,6 +79,9 @@ import java.util.stream.Stream;
* or method of this type will cause a {@link NullPointerException * or method of this type will cause a {@link NullPointerException
* NullPointerException} to be thrown. * NullPointerException} to be thrown.
* *
* @implNote The default implementation's methods do not block before returning
* a {@code CompletableFuture}.
*
* @since 9 * @since 9
*/ */
public interface WebSocket { public interface WebSocket {
@ -234,9 +237,9 @@ public interface WebSocket {
/** /**
* Builds a {@code WebSocket}. * Builds a {@code WebSocket}.
* *
* <p> Returns immediately with a {@code CompletableFuture<WebSocket>} * <p> Returns a {@code CompletableFuture<WebSocket>} which completes
* which completes with the {@code WebSocket} when it is connected, or * normally with the {@code WebSocket} when it is connected or completes
* completes exceptionally if an error occurs. * exceptionally if an error occurs.
* *
* <p> {@code CompletableFuture} may complete exceptionally with the * <p> {@code CompletableFuture} may complete exceptionally with the
* following errors: * following errors:
@ -252,7 +255,7 @@ public interface WebSocket {
* if the opening handshake fails * if the opening handshake fails
* </ul> * </ul>
* *
* @return a {@code CompletableFuture} of {@code WebSocket} * @return a {@code CompletableFuture} with the {@code WebSocket}
*/ */
CompletableFuture<WebSocket> buildAsync(); CompletableFuture<WebSocket> buildAsync();
} }
@ -601,9 +604,9 @@ public interface WebSocket {
/** /**
* Sends a Text message with characters from the given {@code CharSequence}. * Sends a Text message with characters from the given {@code CharSequence}.
* *
* <p> Returns immediately with a {@code CompletableFuture<Void>} which * <p> Returns a {@code CompletableFuture<WebSocket>} which completes
* completes normally when the message has been sent, or completes * normally when the message has been sent or completes exceptionally if an
* exceptionally if an error occurs. * error occurs.
* *
* <p> The {@code CharSequence} should not be modified until the returned * <p> The {@code CharSequence} should not be modified until the returned
* {@code CompletableFuture} completes (either normally or exceptionally). * {@code CompletableFuture} completes (either normally or exceptionally).
@ -612,9 +615,12 @@ public interface WebSocket {
* with: * with:
* <ul> * <ul>
* <li> {@link IOException} * <li> {@link IOException}
* if an I/O error occurs during this operation; or the * if an I/O error occurs during this operation
* {@code WebSocket} closes while this operation is in progress; * <li> {@link IllegalStateException}
* or the {@code message} is a malformed UTF-16 sequence * if the {@code WebSocket} closes while this operation is in progress;
* or if a Close message has been sent already;
* or if there is an outstanding send operation;
* or if a previous Binary message was not sent with {@code isLast == true}
* </ul> * </ul>
* *
* @implNote This implementation does not accept partial UTF-16 * @implNote This implementation does not accept partial UTF-16
@ -624,22 +630,15 @@ public interface WebSocket {
* @param message * @param message
* the message * the message
* @param isLast * @param isLast
* {@code true} if this is the final part of the message * {@code true} if this is the final part of the message,
* {@code false} otherwise * {@code false} otherwise
* *
* @return a CompletableFuture of Void * @return a CompletableFuture with this WebSocket
* *
* @throws IllegalStateException * @throws IllegalArgumentException
* if the WebSocket is closed * if {@code message} is a malformed (or an incomplete) UTF-16 sequence
* @throws IllegalStateException
* if a Close message has been already sent
* @throws IllegalStateException
* if there is an outstanding send operation
* @throws IllegalStateException
* if a previous Binary message was not sent
* with {@code isLast == true}
*/ */
CompletableFuture<Void> sendText(CharSequence message, boolean isLast); CompletableFuture<WebSocket> sendText(CharSequence message, boolean isLast);
/** /**
* Sends a whole Text message with characters from the given {@code * Sends a whole Text message with characters from the given {@code
@ -648,9 +647,9 @@ public interface WebSocket {
* <p> This is a convenience method. For the general case, use {@link * <p> This is a convenience method. For the general case, use {@link
* #sendText(CharSequence, boolean)}. * #sendText(CharSequence, boolean)}.
* *
* <p> Returns immediately with a {@code CompletableFuture<Void>} which * <p> Returns a {@code CompletableFuture<WebSocket>} which completes
* completes normally when the message has been sent, or completes * normally when the message has been sent or completes exceptionally if an
* exceptionally if an error occurs. * error occurs.
* *
* <p> The {@code CharSequence} should not be modified until the returned * <p> The {@code CharSequence} should not be modified until the returned
* {@code CompletableFuture} completes (either normally or exceptionally). * {@code CompletableFuture} completes (either normally or exceptionally).
@ -659,27 +658,23 @@ public interface WebSocket {
* with: * with:
* <ul> * <ul>
* <li> {@link IOException} * <li> {@link IOException}
* if an I/O error occurs during this operation; * if an I/O error occurs during this operation
* or the {@code WebSocket} closes while this operation is in progress; * <li> {@link IllegalStateException}
* or the message is a malformed (or an incomplete) UTF-16 sequence * if the {@code WebSocket} closes while this operation is in progress;
* or if a Close message has been sent already;
* or if there is an outstanding send operation;
* or if a previous Binary message was not sent with {@code isLast == true}
* </ul> * </ul>
* *
* @param message * @param message
* the message * the message
* *
* @return a CompletableFuture of Void * @return a CompletableFuture with this WebSocket
* *
* @throws IllegalStateException * @throws IllegalArgumentException
* if the WebSocket is closed * if {@code message} is a malformed (or an incomplete) UTF-16 sequence
* @throws IllegalStateException
* if a Close message has been already sent
* @throws IllegalStateException
* if there is an outstanding send operation
* @throws IllegalStateException
* if a previous Binary message was not sent
* with {@code isLast == true}
*/ */
default CompletableFuture<Void> sendText(CharSequence message) { default CompletableFuture<WebSocket> sendText(CharSequence message) {
return sendText(message, true); return sendText(message, true);
} }
@ -690,9 +685,9 @@ public interface WebSocket {
* <p> This is a convenience method. For the general case use {@link * <p> This is a convenience method. For the general case use {@link
* #sendText(CharSequence, boolean)}. * #sendText(CharSequence, boolean)}.
* *
* <p> Returns immediately with a {@code CompletableFuture<Void>} which * <p> Returns a {@code CompletableFuture<WebSocket>} which completes
* completes normally when the message has been sent, or completes * normally when the message has been sent or completes exceptionally if an
* exceptionally if an error occurs. * error occurs.
* *
* <p> Streamed character sequences should not be modified until the * <p> Streamed character sequences should not be modified until the
* returned {@code CompletableFuture} completes (either normally or * returned {@code CompletableFuture} completes (either normally or
@ -702,41 +697,41 @@ public interface WebSocket {
* with: * with:
* <ul> * <ul>
* <li> {@link IOException} * <li> {@link IOException}
* if an I/O error occurs during this operation; * if an I/O error occurs during this operation
* or the {@code WebSocket} closes while this operation is in progress; * <li> {@link IllegalStateException}
* or the message is a malformed (or an incomplete) UTF-16 sequence * if the {@code WebSocket} closes while this operation is in progress;
* or if a Close message has been sent already;
* or if there is an outstanding send operation;
* or if a previous Binary message was not sent with {@code isLast == true}
* </ul> * </ul>
* *
* @param message * @param message
* the message * the message
* *
* @return a CompletableFuture of Void * @return a CompletableFuture with this WebSocket
* *
* @throws IllegalStateException * @throws IllegalArgumentException
* if the WebSocket is closed * if {@code message} is a malformed (or an incomplete) UTF-16 sequence
* @throws IllegalStateException
* if a Close message has been already sent
* @throws IllegalStateException
* if there is an outstanding send operation
* @throws IllegalStateException
* if a previous Binary message was not sent
* with {@code isLast == true}
*/ */
CompletableFuture<Void> sendText(Stream<? extends CharSequence> message); CompletableFuture<WebSocket> sendText(Stream<? extends CharSequence> message);
/** /**
* Sends a Binary message with bytes from the given {@code ByteBuffer}. * Sends a Binary message with bytes from the given {@code ByteBuffer}.
* *
* <p> Returns immediately with a {@code CompletableFuture<Void>} which * <p> Returns a {@code CompletableFuture<WebSocket>} which completes
* completes normally when the message has been sent, or completes * normally when the message has been sent or completes exceptionally if an
* exceptionally if an error occurs. * error occurs.
* *
* <p> The returned {@code CompletableFuture} can complete exceptionally * <p> The returned {@code CompletableFuture} can complete exceptionally
* with: * with:
* <ul> * <ul>
* <li> {@link IOException} * <li> {@link IOException}
* if an I/O error occurs during this operation or the * if an I/O error occurs during this operation
* {@code WebSocket} closes while this operation is in progress * <li> {@link IllegalStateException}
* if the {@code WebSocket} closes while this operation is in progress;
* or if a Close message has been sent already;
* or if there is an outstanding send operation;
* or if a previous Text message was not sent with {@code isLast == true}
* </ul> * </ul>
* *
* @param message * @param message
@ -745,33 +740,27 @@ public interface WebSocket {
* {@code true} if this is the final part of the message, * {@code true} if this is the final part of the message,
* {@code false} otherwise * {@code false} otherwise
* *
* @return a CompletableFuture of Void * @return a CompletableFuture with this WebSocket
*
* @throws IllegalStateException
* if the WebSocket is closed
* @throws IllegalStateException
* if a Close message has been already sent
* @throws IllegalStateException
* if there is an outstanding send operation
* @throws IllegalStateException
* if a previous Text message was not sent
* with {@code isLast == true}
*/ */
CompletableFuture<Void> sendBinary(ByteBuffer message, boolean isLast); CompletableFuture<WebSocket> sendBinary(ByteBuffer message, boolean isLast);
/** /**
* Sends a Binary message with bytes from the given {@code byte[]}. * Sends a Binary message with bytes from the given {@code byte[]}.
* *
* <p> Returns immediately with a {@code CompletableFuture<Void>} which * <p> Returns a {@code CompletableFuture<WebSocket>} which completes
* completes normally when the message has been sent, or completes * normally when the message has been sent or completes exceptionally if an
* exceptionally if an error occurs. * error occurs.
* *
* <p> The returned {@code CompletableFuture} can complete exceptionally * <p> The returned {@code CompletableFuture} can complete exceptionally
* with: * with:
* <ul> * <ul>
* <li> {@link IOException} * <li> {@link IOException}
* if an I/O error occurs during this operation or the * if an I/O error occurs during this operation
* {@code WebSocket} closes while this operation is in progress * <li> {@link IllegalStateException}
* if the {@code WebSocket} closes while this operation is in progress;
* or if a Close message has been sent already;
* or if there is an outstanding send operation;
* or if a previous Text message was not sent with {@code isLast == true}
* </ul> * </ul>
* *
* @implSpec This is equivalent to: * @implSpec This is equivalent to:
@ -785,19 +774,9 @@ public interface WebSocket {
* {@code true} if this is the final part of the message, * {@code true} if this is the final part of the message,
* {@code false} otherwise * {@code false} otherwise
* *
* @return a CompletableFuture of Void * @return a CompletableFuture with this WebSocket
*
* @throws IllegalStateException
* if the WebSocket is closed
* @throws IllegalStateException
* if a Close message has been already sent
* @throws IllegalStateException
* if there is an outstanding send operation
* @throws IllegalStateException
* if a previous Text message was not sent
* with {@code isLast == true}
*/ */
default CompletableFuture<Void> sendBinary(byte[] message, boolean isLast) { default CompletableFuture<WebSocket> sendBinary(byte[] message, boolean isLast) {
Objects.requireNonNull(message, "message"); Objects.requireNonNull(message, "message");
return sendBinary(ByteBuffer.wrap(message), isLast); return sendBinary(ByteBuffer.wrap(message), isLast);
} }
@ -805,9 +784,9 @@ public interface WebSocket {
/** /**
* Sends a Ping message. * Sends a Ping message.
* *
* <p> Returns immediately with a {@code CompletableFuture<Void>} which * <p> Returns a {@code CompletableFuture<WebSocket>} which completes
* completes normally when the message has been sent, or completes * normally when the message has been sent or completes exceptionally if an
* exceptionally if an error occurs. * error occurs.
* *
* <p> A Ping message may be sent or received by either client or server. * <p> A Ping message may be sent or received by either client or server.
* It may serve either as a keepalive or as a means to verify that the * It may serve either as a keepalive or as a means to verify that the
@ -820,32 +799,29 @@ public interface WebSocket {
* with: * with:
* <ul> * <ul>
* <li> {@link IOException} * <li> {@link IOException}
* if an I/O error occurs during this operation or the * if an I/O error occurs during this operation
* {@code WebSocket} closes while this operation is in progress * <li> {@link IllegalStateException}
* if the {@code WebSocket} closes while this operation is in progress;
* or if a Close message has been sent already;
* or if there is an outstanding send operation
* </ul> * </ul>
* *
* @param message * @param message
* the message * the message
* *
* @return a CompletableFuture of Void * @return a CompletableFuture with this WebSocket
* *
* @throws IllegalStateException
* if the WebSocket is closed
* @throws IllegalStateException
* if a Close message has been already sent
* @throws IllegalStateException
* if there is an outstanding send operation
* @throws IllegalArgumentException * @throws IllegalArgumentException
* if {@code message.remaining() > 125} * if {@code message.remaining() > 125}
*/ */
CompletableFuture<Void> sendPing(ByteBuffer message); CompletableFuture<WebSocket> sendPing(ByteBuffer message);
/** /**
* Sends a Pong message. * Sends a Pong message.
* *
* <p> Returns immediately with a {@code CompletableFuture<Void>} which * <p> Returns a {@code CompletableFuture<WebSocket>} which completes
* completes normally when the message has been sent, or completes * normally when the message has been sent or completes exceptionally if an
* exceptionally if an error occurs. * error occurs.
* *
* <p> A Pong message may be unsolicited or may be sent in response to a * <p> A Pong message may be unsolicited or may be sent in response to a
* previously received Ping. In latter case the contents of the Pong is * previously received Ping. In latter case the contents of the Pong is
@ -858,32 +834,29 @@ public interface WebSocket {
* with: * with:
* <ul> * <ul>
* <li> {@link IOException} * <li> {@link IOException}
* if an I/O error occurs during this operation or the * if an I/O error occurs during this operation
* {@code WebSocket} closes while this operation is in progress * <li> {@link IllegalStateException}
* if the {@code WebSocket} closes while this operation is in progress;
* or if a Close message has been sent already;
* or if there is an outstanding send operation
* </ul> * </ul>
* *
* @param message * @param message
* the message * the message
* *
* @return a CompletableFuture of Void * @return a CompletableFuture with this WebSocket
* *
* @throws IllegalStateException
* if the WebSocket is closed
* @throws IllegalStateException
* if a Close message has been already sent
* @throws IllegalStateException
* if there is an outstanding send operation
* @throws IllegalArgumentException * @throws IllegalArgumentException
* if {@code message.remaining() > 125} * if {@code message.remaining() > 125}
*/ */
CompletableFuture<Void> sendPong(ByteBuffer message); CompletableFuture<WebSocket> sendPong(ByteBuffer message);
/** /**
* Sends a Close message with the given close code and the reason. * Sends a Close message with the given close code and the reason.
* *
* <p> Returns immediately with a {@code CompletableFuture<Void>} which * <p> Returns a {@code CompletableFuture<WebSocket>} which completes
* completes normally when the message has been sent, or completes * normally when the message has been sent or completes exceptionally if an
* exceptionally if an error occurs. * error occurs.
* *
* <p> A Close message may consist of a close code and a reason for closing. * <p> A Close message may consist of a close code and a reason for closing.
* The reason must have a valid UTF-8 representation not longer than {@code * The reason must have a valid UTF-8 representation not longer than {@code
@ -894,8 +867,11 @@ public interface WebSocket {
* with: * with:
* <ul> * <ul>
* <li> {@link IOException} * <li> {@link IOException}
* if an I/O error occurs during this operation or the * if an I/O error occurs during this operation
* {@code WebSocket} closes while this operation is in progress * <li> {@link IllegalStateException}
* if the {@code WebSocket} closes while this operation is in progress;
* or if a Close message has been sent already;
* or if there is an outstanding send operation
* </ul> * </ul>
* *
* @param code * @param code
@ -903,45 +879,35 @@ public interface WebSocket {
* @param reason * @param reason
* the reason; can be empty * the reason; can be empty
* *
* @return a CompletableFuture of Void * @return a CompletableFuture with this WebSocket
* *
* @throws IllegalStateException
* if the WebSocket is closed
* @throws IllegalStateException
* if a Close message has been already sent
* @throws IllegalStateException
* if there is an outstanding send operation
* @throws IllegalArgumentException * @throws IllegalArgumentException
* if the {@code reason} doesn't have a valid UTF-8 * if {@code reason} doesn't have an UTF-8 representation not longer
* representation not longer than {@code 123} bytes * than {@code 123} bytes
*/ */
CompletableFuture<Void> sendClose(CloseCode code, CharSequence reason); CompletableFuture<WebSocket> sendClose(CloseCode code, CharSequence reason);
/** /**
* Sends an empty Close message. * Sends an empty Close message.
* *
* <p> Returns immediately with a {@code CompletableFuture<Void>} which * <p> Returns a {@code CompletableFuture<WebSocket>} which completes
* completes normally when the message has been sent, or completes * normally when the message has been sent or completes exceptionally if an
* exceptionally if an error occurs. * error occurs.
* *
* <p> The returned {@code CompletableFuture} can complete exceptionally * <p> The returned {@code CompletableFuture} can complete exceptionally
* with: * with:
* <ul> * <ul>
* <li> {@link IOException} * <li> {@link IOException}
* if an I/O error occurs during this operation or the * if an I/O error occurs during this operation
* {@code WebSocket} closes while this operation is in progress * <li> {@link IllegalStateException}
* if the {@code WebSocket} closes while this operation is in progress;
* or if a Close message has been sent already;
* or if there is an outstanding send operation
* </ul> * </ul>
* *
* @return a CompletableFuture of Void * @return a CompletableFuture with this WebSocket
*
* @throws IllegalStateException
* if the WebSocket is closed
* @throws IllegalStateException
* if a Close message has been already sent
* @throws IllegalStateException
* if there is an outstanding send operation
*/ */
CompletableFuture<Void> sendClose(); CompletableFuture<WebSocket> sendClose();
/** /**
* Requests {@code n} more messages to be received by the {@link Listener * Requests {@code n} more messages to be received by the {@link Listener

View File

@ -30,6 +30,7 @@ import java.net.http.HttpClient;
import java.net.http.WebSocket; import java.net.http.WebSocket;
import java.net.http.WebSocket.CloseCode; import java.net.http.WebSocket.CloseCode;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -90,12 +91,24 @@ public class BasicWebSocketAPITest {
"message", "message",
() -> ws.sendBinary((ByteBuffer) null, true)) () -> ws.sendBinary((ByteBuffer) null, true))
); );
checkAndClose(
(ws) ->
TestKit.assertThrows(IllegalArgumentException.class,
".*message.*",
() -> ws.sendPing(ByteBuffer.allocate(126)))
);
checkAndClose( checkAndClose(
(ws) -> (ws) ->
TestKit.assertThrows(NullPointerException.class, TestKit.assertThrows(NullPointerException.class,
"message", "message",
() -> ws.sendPing(null)) () -> ws.sendPing(null))
); );
checkAndClose(
(ws) ->
TestKit.assertThrows(IllegalArgumentException.class,
".*message.*",
() -> ws.sendPong(ByteBuffer.allocate(126)))
);
checkAndClose( checkAndClose(
(ws) -> (ws) ->
TestKit.assertThrows(NullPointerException.class, TestKit.assertThrows(NullPointerException.class,
@ -106,7 +119,7 @@ public class BasicWebSocketAPITest {
(ws) -> (ws) ->
TestKit.assertThrows(NullPointerException.class, TestKit.assertThrows(NullPointerException.class,
"message", "message",
() -> ws.sendText((CharSequence) null, true)) () -> ws.sendText(null, true))
); );
checkAndClose( checkAndClose(
(ws) -> (ws) ->
@ -120,6 +133,12 @@ public class BasicWebSocketAPITest {
"message", "message",
() -> ws.sendText((Stream<? extends CharSequence>) null)) () -> ws.sendText((Stream<? extends CharSequence>) null))
); );
checkAndClose(
(ws) ->
TestKit.assertThrows(IllegalArgumentException.class,
"(?i).*reason.*",
() -> ws.sendClose(CloseCode.NORMAL_CLOSURE, CharBuffer.allocate(124)))
);
checkAndClose( checkAndClose(
(ws) -> (ws) ->
TestKit.assertThrows(NullPointerException.class, TestKit.assertThrows(NullPointerException.class,