8217825: Verify @AfterTest is used correctly in WebSocket tests

Remove @AfterTest tags; added in explicit closing of resources e.g. httpServer, webSocket, etc

Reviewed-by: dfuchs, prappo
This commit is contained in:
Patrick Concannon 2019-09-23 16:53:16 +01:00
parent d15a57b842
commit 11d43732bc
4 changed files with 848 additions and 792 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -29,7 +29,6 @@
* Abort
*/
import org.testng.annotations.AfterTest;
import org.testng.annotations.Test;
import java.io.IOException;
@ -56,38 +55,35 @@ public class Abort {
private static final Class<IllegalArgumentException> IAE = IllegalArgumentException.class;
private static final Class<IOException> IOE = IOException.class;
private DummyWebSocketServer server;
private WebSocket webSocket;
@AfterTest
public void cleanup() {
server.close();
webSocket.abort();
}
@Test
public void onOpenThenAbort() throws Exception {
int[] bytes = new int[]{
0x88, 0x00, // opcode=close
};
server = Support.serverWithCannedData(bytes);
server.open();
// messages are available
MockListener listener = new MockListener() {
@Override
protected void onOpen0(WebSocket webSocket) {
// unbounded request
webSocket.request(Long.MAX_VALUE);
try (var server = Support.serverWithCannedData(bytes)) {
server.open();
// messages are available
MockListener listener = new MockListener() {
@Override
protected void onOpen0(WebSocket webSocket) {
// unbounded request
webSocket.request(Long.MAX_VALUE);
webSocket.abort();
}
};
var webSocket = newHttpClient().newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
try {
TimeUnit.SECONDS.sleep(5);
List<MockListener.Invocation> inv = listener.invocationsSoFar();
// no more invocations after onOpen as WebSocket was aborted
assertEquals(inv, List.of(MockListener.Invocation.onOpen(webSocket)));
} finally {
webSocket.abort();
}
};
webSocket = newHttpClient().newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
TimeUnit.SECONDS.sleep(5);
List<MockListener.Invocation> inv = listener.invocationsSoFar();
// no more invocations after onOpen as WebSocket was aborted
assertEquals(inv, List.of(MockListener.Invocation.onOpen(webSocket)));
}
}
@Test
@ -96,33 +92,38 @@ public class Abort {
0x81, 0x00, // opcode=text, fin=true
0x88, 0x00, // opcode=close
};
server = Support.serverWithCannedData(bytes);
server.open();
MockListener listener = new MockListener() {
@Override
protected void onOpen0(WebSocket webSocket) {
// unbounded request
webSocket.request(Long.MAX_VALUE);
}
try (var server = Support.serverWithCannedData(bytes)) {
server.open();
MockListener listener = new MockListener() {
@Override
protected void onOpen0(WebSocket webSocket) {
// unbounded request
webSocket.request(Long.MAX_VALUE);
}
@Override
protected CompletionStage<?> onText0(WebSocket webSocket,
CharSequence message,
boolean last) {
@Override
protected CompletionStage<?> onText0(WebSocket webSocket,
CharSequence message,
boolean last) {
webSocket.abort();
return super.onText0(webSocket, message, last);
}
};
var webSocket = newHttpClient().newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
try {
TimeUnit.SECONDS.sleep(5);
List<MockListener.Invocation> inv = listener.invocationsSoFar();
// no more invocations after onOpen, onBinary as WebSocket was aborted
List<MockListener.Invocation> expected = List.of(
MockListener.Invocation.onOpen(webSocket),
MockListener.Invocation.onText(webSocket, "", true));
assertEquals(inv, expected);
} finally {
webSocket.abort();
return super.onText0(webSocket, message, last);
}
};
webSocket = newHttpClient().newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
TimeUnit.SECONDS.sleep(5);
List<MockListener.Invocation> inv = listener.invocationsSoFar();
// no more invocations after onOpen, onBinary as WebSocket was aborted
List<MockListener.Invocation> expected = List.of(
MockListener.Invocation.onOpen(webSocket),
MockListener.Invocation.onText(webSocket, "", true));
assertEquals(inv, expected);
}
}
@Test
@ -131,33 +132,38 @@ public class Abort {
0x82, 0x00, // opcode=binary, fin=true
0x88, 0x00, // opcode=close
};
server = Support.serverWithCannedData(bytes);
server.open();
MockListener listener = new MockListener() {
@Override
protected void onOpen0(WebSocket webSocket) {
// unbounded request
webSocket.request(Long.MAX_VALUE);
}
try (var server = Support.serverWithCannedData(bytes)) {
server.open();
MockListener listener = new MockListener() {
@Override
protected void onOpen0(WebSocket webSocket) {
// unbounded request
webSocket.request(Long.MAX_VALUE);
}
@Override
protected CompletionStage<?> onBinary0(WebSocket webSocket,
ByteBuffer message,
boolean last) {
@Override
protected CompletionStage<?> onBinary0(WebSocket webSocket,
ByteBuffer message,
boolean last) {
webSocket.abort();
return super.onBinary0(webSocket, message, last);
}
};
var webSocket = newHttpClient().newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
try {
TimeUnit.SECONDS.sleep(5);
List<MockListener.Invocation> inv = listener.invocationsSoFar();
// no more invocations after onOpen, onBinary as WebSocket was aborted
List<MockListener.Invocation> expected = List.of(
MockListener.Invocation.onOpen(webSocket),
MockListener.Invocation.onBinary(webSocket, ByteBuffer.allocate(0), true));
assertEquals(inv, expected);
} finally {
webSocket.abort();
return super.onBinary0(webSocket, message, last);
}
};
webSocket = newHttpClient().newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
TimeUnit.SECONDS.sleep(5);
List<MockListener.Invocation> inv = listener.invocationsSoFar();
// no more invocations after onOpen, onBinary as WebSocket was aborted
List<MockListener.Invocation> expected = List.of(
MockListener.Invocation.onOpen(webSocket),
MockListener.Invocation.onBinary(webSocket, ByteBuffer.allocate(0), true));
assertEquals(inv, expected);
}
}
@Test
@ -166,32 +172,37 @@ public class Abort {
0x89, 0x00, // opcode=ping
0x88, 0x00, // opcode=close
};
server = Support.serverWithCannedData(bytes);
server.open();
MockListener listener = new MockListener() {
@Override
protected void onOpen0(WebSocket webSocket) {
// unbounded request
webSocket.request(Long.MAX_VALUE);
}
try (var server = Support.serverWithCannedData(bytes)) {
server.open();
MockListener listener = new MockListener() {
@Override
protected void onOpen0(WebSocket webSocket) {
// unbounded request
webSocket.request(Long.MAX_VALUE);
}
@Override
protected CompletionStage<?> onPing0(WebSocket webSocket,
ByteBuffer message) {
@Override
protected CompletionStage<?> onPing0(WebSocket webSocket,
ByteBuffer message) {
webSocket.abort();
return super.onPing0(webSocket, message);
}
};
var webSocket = newHttpClient().newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
try {
TimeUnit.SECONDS.sleep(5);
List<MockListener.Invocation> inv = listener.invocationsSoFar();
// no more invocations after onOpen, onPing as WebSocket was aborted
List<MockListener.Invocation> expected = List.of(
MockListener.Invocation.onOpen(webSocket),
MockListener.Invocation.onPing(webSocket, ByteBuffer.allocate(0)));
assertEquals(inv, expected);
} finally {
webSocket.abort();
return super.onPing0(webSocket, message);
}
};
webSocket = newHttpClient().newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
TimeUnit.SECONDS.sleep(5);
List<MockListener.Invocation> inv = listener.invocationsSoFar();
// no more invocations after onOpen, onPing as WebSocket was aborted
List<MockListener.Invocation> expected = List.of(
MockListener.Invocation.onOpen(webSocket),
MockListener.Invocation.onPing(webSocket, ByteBuffer.allocate(0)));
assertEquals(inv, expected);
}
}
@Test
@ -200,32 +211,37 @@ public class Abort {
0x8a, 0x00, // opcode=pong
0x88, 0x00, // opcode=close
};
server = Support.serverWithCannedData(bytes);
server.open();
MockListener listener = new MockListener() {
@Override
protected void onOpen0(WebSocket webSocket) {
// unbounded request
webSocket.request(Long.MAX_VALUE);
}
try (var server = Support.serverWithCannedData(bytes)) {
server.open();
MockListener listener = new MockListener() {
@Override
protected void onOpen0(WebSocket webSocket) {
// unbounded request
webSocket.request(Long.MAX_VALUE);
}
@Override
protected CompletionStage<?> onPong0(WebSocket webSocket,
ByteBuffer message) {
@Override
protected CompletionStage<?> onPong0(WebSocket webSocket,
ByteBuffer message) {
webSocket.abort();
return super.onPong0(webSocket, message);
}
};
var webSocket = newHttpClient().newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
try {
TimeUnit.SECONDS.sleep(5);
List<MockListener.Invocation> inv = listener.invocationsSoFar();
// no more invocations after onOpen, onPong as WebSocket was aborted
List<MockListener.Invocation> expected = List.of(
MockListener.Invocation.onOpen(webSocket),
MockListener.Invocation.onPong(webSocket, ByteBuffer.allocate(0)));
assertEquals(inv, expected);
} finally {
webSocket.abort();
return super.onPong0(webSocket, message);
}
};
webSocket = newHttpClient().newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
TimeUnit.SECONDS.sleep(5);
List<MockListener.Invocation> inv = listener.invocationsSoFar();
// no more invocations after onOpen, onPong as WebSocket was aborted
List<MockListener.Invocation> expected = List.of(
MockListener.Invocation.onOpen(webSocket),
MockListener.Invocation.onPong(webSocket, ByteBuffer.allocate(0)));
assertEquals(inv, expected);
}
}
@Test
@ -234,33 +250,38 @@ public class Abort {
0x88, 0x00, // opcode=close
0x8a, 0x00, // opcode=pong
};
server = Support.serverWithCannedData(bytes);
server.open();
MockListener listener = new MockListener() {
@Override
protected void onOpen0(WebSocket webSocket) {
// unbounded request
webSocket.request(Long.MAX_VALUE);
}
try (var server = Support.serverWithCannedData(bytes)) {
server.open();
MockListener listener = new MockListener() {
@Override
protected void onOpen0(WebSocket webSocket) {
// unbounded request
webSocket.request(Long.MAX_VALUE);
}
@Override
protected CompletionStage<?> onClose0(WebSocket webSocket,
int statusCode,
String reason) {
@Override
protected CompletionStage<?> onClose0(WebSocket webSocket,
int statusCode,
String reason) {
webSocket.abort();
return super.onClose0(webSocket, statusCode, reason);
}
};
var webSocket = newHttpClient().newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
try {
TimeUnit.SECONDS.sleep(5);
List<MockListener.Invocation> inv = listener.invocationsSoFar();
// no more invocations after onOpen, onClose
List<MockListener.Invocation> expected = List.of(
MockListener.Invocation.onOpen(webSocket),
MockListener.Invocation.onClose(webSocket, 1005, ""));
assertEquals(inv, expected);
} finally {
webSocket.abort();
return super.onClose0(webSocket, statusCode, reason);
}
};
webSocket = newHttpClient().newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
TimeUnit.SECONDS.sleep(5);
List<MockListener.Invocation> inv = listener.invocationsSoFar();
// no more invocations after onOpen, onClose
List<MockListener.Invocation> expected = List.of(
MockListener.Invocation.onOpen(webSocket),
MockListener.Invocation.onClose(webSocket, 1005, ""));
assertEquals(inv, expected);
}
}
@Test
@ -271,32 +292,37 @@ public class Abort {
int[] bytes = new int[badPingHeader.length + 128 + closeMessage.length];
System.arraycopy(badPingHeader, 0, bytes, 0, badPingHeader.length);
System.arraycopy(closeMessage, 0, bytes, badPingHeader.length + 128, closeMessage.length);
server = Support.serverWithCannedData(bytes);
server.open();
MockListener listener = new MockListener() {
@Override
protected void onOpen0(WebSocket webSocket) {
// unbounded request
webSocket.request(Long.MAX_VALUE);
}
try (var server = Support.serverWithCannedData(bytes)) {
server.open();
MockListener listener = new MockListener() {
@Override
protected void onOpen0(WebSocket webSocket) {
// unbounded request
webSocket.request(Long.MAX_VALUE);
}
@Override
protected void onError0(WebSocket webSocket, Throwable error) {
@Override
protected void onError0(WebSocket webSocket, Throwable error) {
webSocket.abort();
super.onError0(webSocket, error);
}
};
var webSocket = newHttpClient().newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
try {
TimeUnit.SECONDS.sleep(5);
List<MockListener.Invocation> inv = listener.invocationsSoFar();
// no more invocations after onOpen, onError
List<MockListener.Invocation> expected = List.of(
MockListener.Invocation.onOpen(webSocket),
MockListener.Invocation.onError(webSocket, ProtocolException.class));
System.out.println("actual invocations:" + Arrays.toString(inv.toArray()));
assertEquals(inv, expected);
} finally {
webSocket.abort();
super.onError0(webSocket, error);
}
};
webSocket = newHttpClient().newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
TimeUnit.SECONDS.sleep(5);
List<MockListener.Invocation> inv = listener.invocationsSoFar();
// no more invocations after onOpen, onError
List<MockListener.Invocation> expected = List.of(
MockListener.Invocation.onOpen(webSocket),
MockListener.Invocation.onError(webSocket, ProtocolException.class));
System.out.println("actual invocations:" + Arrays.toString(inv.toArray()));
assertEquals(inv, expected);
}
}
@Test
@ -352,65 +378,70 @@ public class Abort {
0x82, 0x00, // opcode=binary, fin=true
0x88, 0x00, // opcode=close
};
server = Support.serverWithCannedData(bytes);
server.open();
try (var server = Support.serverWithCannedData(bytes)) {
server.open();
WebSocket ws = newHttpClient()
.newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
for (int i = 0; i < 3; i++) {
System.out.printf("iteration #%s%n", i);
// after the first abort() each consecutive one must be a no-op,
// moreover, query methods should continue to return consistent
// values
for (int j = 0; j < 3; j++) {
System.out.printf("abort #%s%n", j);
WebSocket ws = newHttpClient()
.newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
try {
for (int i = 0; i < 3; i++) {
System.out.printf("iteration #%s%n", i);
// after the first abort() each consecutive one must be a no-op,
// moreover, query methods should continue to return consistent
// values
for (int j = 0; j < 3; j++) {
System.out.printf("abort #%s%n", j);
ws.abort();
assertTrue(ws.isInputClosed());
assertTrue(ws.isOutputClosed());
assertEquals(ws.getSubprotocol(), "");
}
// at this point valid requests MUST be a no-op:
for (int j = 0; j < 3; j++) {
System.out.printf("request #%s%n", j);
ws.request(1);
ws.request(2);
ws.request(8);
ws.request(Integer.MAX_VALUE);
ws.request(Long.MAX_VALUE);
// invalid requests MUST throw IAE:
assertThrows(IAE, () -> ws.request(Integer.MIN_VALUE));
assertThrows(IAE, () -> ws.request(Long.MIN_VALUE));
assertThrows(IAE, () -> ws.request(-1));
assertThrows(IAE, () -> ws.request(0));
}
}
// even though there is a bunch of messages readily available on the
// wire we shouldn't have received any of them as we aborted before
// the first request
try {
messageReceived.get(5, TimeUnit.SECONDS);
fail();
} catch (TimeoutException expected) {
System.out.println("Finished waiting");
}
for (int i = 0; i < 3; i++) {
System.out.printf("send #%s%n", i);
Support.assertFails(IOE, ws.sendText("text!", false));
Support.assertFails(IOE, ws.sendText("text!", true));
Support.assertFails(IOE, ws.sendBinary(ByteBuffer.allocate(16), false));
Support.assertFails(IOE, ws.sendBinary(ByteBuffer.allocate(16), true));
Support.assertFails(IOE, ws.sendPing(ByteBuffer.allocate(16)));
Support.assertFails(IOE, ws.sendPong(ByteBuffer.allocate(16)));
Support.assertFails(IOE, ws.sendClose(NORMAL_CLOSURE, "a reason"));
assertThrows(NPE, () -> ws.sendText(null, false));
assertThrows(NPE, () -> ws.sendText(null, true));
assertThrows(NPE, () -> ws.sendBinary(null, false));
assertThrows(NPE, () -> ws.sendBinary(null, true));
assertThrows(NPE, () -> ws.sendPing(null));
assertThrows(NPE, () -> ws.sendPong(null));
assertThrows(NPE, () -> ws.sendClose(NORMAL_CLOSURE, null));
}
} finally {
ws.abort();
assertTrue(ws.isInputClosed());
assertTrue(ws.isOutputClosed());
assertEquals(ws.getSubprotocol(), "");
}
// at this point valid requests MUST be a no-op:
for (int j = 0; j < 3; j++) {
System.out.printf("request #%s%n", j);
ws.request(1);
ws.request(2);
ws.request(8);
ws.request(Integer.MAX_VALUE);
ws.request(Long.MAX_VALUE);
// invalid requests MUST throw IAE:
assertThrows(IAE, () -> ws.request(Integer.MIN_VALUE));
assertThrows(IAE, () -> ws.request(Long.MIN_VALUE));
assertThrows(IAE, () -> ws.request(-1));
assertThrows(IAE, () -> ws.request(0));
}
}
// even though there is a bunch of messages readily available on the
// wire we shouldn't have received any of them as we aborted before
// the first request
try {
messageReceived.get(5, TimeUnit.SECONDS);
fail();
} catch (TimeoutException expected) {
System.out.println("Finished waiting");
}
for (int i = 0; i < 3; i++) {
System.out.printf("send #%s%n", i);
Support.assertFails(IOE, ws.sendText("text!", false));
Support.assertFails(IOE, ws.sendText("text!", true));
Support.assertFails(IOE, ws.sendBinary(ByteBuffer.allocate(16), false));
Support.assertFails(IOE, ws.sendBinary(ByteBuffer.allocate(16), true));
Support.assertFails(IOE, ws.sendPing(ByteBuffer.allocate(16)));
Support.assertFails(IOE, ws.sendPong(ByteBuffer.allocate(16)));
Support.assertFails(IOE, ws.sendClose(NORMAL_CLOSURE, "a reason"));
assertThrows(NPE, () -> ws.sendText(null, false));
assertThrows(NPE, () -> ws.sendText(null, true));
assertThrows(NPE, () -> ws.sendBinary(null, false));
assertThrows(NPE, () -> ws.sendBinary(null, true));
assertThrows(NPE, () -> ws.sendPing(null));
assertThrows(NPE, () -> ws.sendPong(null));
assertThrows(NPE, () -> ws.sendClose(NORMAL_CLOSURE, null));
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -28,8 +28,6 @@
* -Djdk.internal.httpclient.websocket.debug=true
* AutomaticPong
*/
import org.testng.annotations.AfterTest;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@ -46,16 +44,6 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
public class AutomaticPong {
private DummyWebSocketServer server;
private WebSocket webSocket;
@AfterTest
public void cleanup() {
server.close();
webSocket.abort();
}
/*
* The sendClose method has been invoked and a Ping comes from the server.
* Naturally, the client cannot reply with a Pong (the output has been
@ -72,32 +60,36 @@ public class AutomaticPong {
0x89, 0x06, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x3f, // ping hello?
0x88, 0x00, // close
};
server = Support.serverWithCannedData(bytes);
server.open();
MockListener listener = new MockListener() {
@Override
protected void onOpen0(WebSocket webSocket) {
/* request nothing */
try (var server = Support.serverWithCannedData(bytes)) {
server.open();
MockListener listener = new MockListener() {
@Override
protected void onOpen0(WebSocket webSocket) {
/* request nothing */
}
};
var webSocket = newHttpClient()
.newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
try {
webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "ok").join();
// now request all messages available
webSocket.request(Long.MAX_VALUE);
List<MockListener.Invocation> actual = listener.invocations();
ByteBuffer hello = ByteBuffer.wrap("hello?".getBytes(StandardCharsets.UTF_8));
ByteBuffer empty = ByteBuffer.allocate(0);
List<MockListener.Invocation> expected = List.of(
MockListener.Invocation.onOpen(webSocket),
MockListener.Invocation.onPing(webSocket, empty),
MockListener.Invocation.onPing(webSocket, hello),
MockListener.Invocation.onClose(webSocket, 1005, "")
);
assertEquals(actual, expected);
} finally {
webSocket.abort();
}
};
webSocket = newHttpClient()
.newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "ok").join();
// now request all messages available
webSocket.request(Long.MAX_VALUE);
List<MockListener.Invocation> actual = listener.invocations();
ByteBuffer hello = ByteBuffer.wrap("hello?".getBytes(StandardCharsets.UTF_8));
ByteBuffer empty = ByteBuffer.allocate(0);
List<MockListener.Invocation> expected = List.of(
MockListener.Invocation.onOpen(webSocket),
MockListener.Invocation.onPing(webSocket, empty),
MockListener.Invocation.onPing(webSocket, hello),
MockListener.Invocation.onClose(webSocket, 1005, "")
);
assertEquals(actual, expected);
}
}
/*
@ -131,84 +123,100 @@ public class AutomaticPong {
.write(buffer);
buffer.putChar((char) 1000);
buffer.flip();
server = Support.serverWithCannedData(buffer.array());
server.open();
MockListener listener = new MockListener();
webSocket = newHttpClient()
.newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
List<MockListener.Invocation> inv = listener.invocations();
assertEquals(inv.size(), nPings + 2); // n * onPing + onOpen + onClose
try (var server = Support.serverWithCannedData(buffer.array())) {
server.open();
MockListener listener = new MockListener();
var webSocket = newHttpClient()
.newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
try {
List<MockListener.Invocation> inv = listener.invocations();
assertEquals(inv.size(), nPings + 2); // n * onPing + onOpen + onClose
ByteBuffer data = server.read();
Frame.Reader reader = new Frame.Reader();
ByteBuffer data = server.read();
Frame.Reader reader = new Frame.Reader();
Frame.Consumer consumer = new Frame.Consumer() {
Frame.Consumer consumer = new Frame.Consumer() {
ByteBuffer number = ByteBuffer.allocate(4);
Frame.Masker masker = new Frame.Masker();
int i = -1;
boolean closed;
ByteBuffer number = ByteBuffer.allocate(4);
Frame.Masker masker = new Frame.Masker();
int i = -1;
boolean closed;
@Override
public void fin(boolean value) { assertTrue(value); }
@Override
public void fin(boolean value) {
assertTrue(value);
}
@Override
public void rsv1(boolean value) { assertFalse(value); }
@Override
public void rsv1(boolean value) {
assertFalse(value);
}
@Override
public void rsv2(boolean value) { assertFalse(value); }
@Override
public void rsv2(boolean value) {
assertFalse(value);
}
@Override
public void rsv3(boolean value) { assertFalse(value); }
@Override
public void rsv3(boolean value) {
assertFalse(value);
}
@Override
public void opcode(Frame.Opcode value) {
if (value == Frame.Opcode.CLOSE) {
closed = true;
return;
@Override
public void opcode(Frame.Opcode value) {
if (value == Frame.Opcode.CLOSE) {
closed = true;
return;
}
assertEquals(value, Frame.Opcode.PONG);
}
@Override
public void mask(boolean value) {
assertTrue(value);
}
@Override
public void payloadLen(long value) {
if (!closed)
assertEquals(value, 4);
}
@Override
public void maskingKey(int value) {
masker.mask(value);
}
@Override
public void payloadData(ByteBuffer src) {
masker.transferMasking(src, number);
if (closed) {
return;
}
number.flip();
int n = number.getInt();
System.out.printf("pong number=%s%n", n);
number.clear();
// a Pong with the number less than the maximum of Pongs already
// received MUST never be received
if (i >= n) {
fail(String.format("i=%s, n=%s", i, n));
}
i = n;
}
@Override
public void endFrame() {
}
};
while (data.hasRemaining()) {
reader.readFrame(data, consumer);
}
assertEquals(value, Frame.Opcode.PONG);
} finally {
webSocket.abort();
}
@Override
public void mask(boolean value) { assertTrue(value); }
@Override
public void payloadLen(long value) {
if (!closed)
assertEquals(value, 4);
}
@Override
public void maskingKey(int value) {
masker.mask(value);
}
@Override
public void payloadData(ByteBuffer src) {
masker.transferMasking(src, number);
if (closed) {
return;
}
number.flip();
int n = number.getInt();
System.out.printf("pong number=%s%n", n);
number.clear();
// a Pong with the number less than the maximum of Pongs already
// received MUST never be received
if (i >= n) {
fail(String.format("i=%s, n=%s", i, n));
}
i = n;
}
@Override
public void endFrame() { }
};
while (data.hasRemaining()) {
reader.readFrame(data, consumer);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -29,7 +29,6 @@
* SendTest
*/
import org.testng.annotations.AfterTest;
import org.testng.annotations.Test;
import java.io.IOException;
@ -46,40 +45,35 @@ public class SendTest {
private static final Class<NullPointerException> NPE = NullPointerException.class;
private DummyWebSocketServer server;
private WebSocket webSocket;
@AfterTest
public void cleanup() {
server.close();
webSocket.abort();
}
@Test
public void sendMethodsThrowNPE() throws IOException {
server = new DummyWebSocketServer();
server.open();
webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder()
.buildAsync(server.getURI(), new WebSocket.Listener() { })
.join();
try (var server = new DummyWebSocketServer()) {
server.open();
var webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder()
.buildAsync(server.getURI(), new WebSocket.Listener() { })
.join();
try {
assertThrows(NPE, () -> webSocket.sendText(null, false));
assertThrows(NPE, () -> webSocket.sendText(null, true));
assertThrows(NPE, () -> webSocket.sendBinary(null, false));
assertThrows(NPE, () -> webSocket.sendBinary(null, true));
assertThrows(NPE, () -> webSocket.sendPing(null));
assertThrows(NPE, () -> webSocket.sendPong(null));
assertThrows(NPE, () -> webSocket.sendClose(NORMAL_CLOSURE, null));
assertThrows(NPE, () -> webSocket.sendText(null, false));
assertThrows(NPE, () -> webSocket.sendText(null, true));
assertThrows(NPE, () -> webSocket.sendBinary(null, false));
assertThrows(NPE, () -> webSocket.sendBinary(null, true));
assertThrows(NPE, () -> webSocket.sendPing(null));
assertThrows(NPE, () -> webSocket.sendPong(null));
assertThrows(NPE, () -> webSocket.sendClose(NORMAL_CLOSURE, null));
webSocket.abort();
webSocket.abort();
assertThrows(NPE, () -> webSocket.sendText(null, false));
assertThrows(NPE, () -> webSocket.sendText(null, true));
assertThrows(NPE, () -> webSocket.sendBinary(null, false));
assertThrows(NPE, () -> webSocket.sendBinary(null, true));
assertThrows(NPE, () -> webSocket.sendPing(null));
assertThrows(NPE, () -> webSocket.sendPong(null));
assertThrows(NPE, () -> webSocket.sendClose(NORMAL_CLOSURE, null));
assertThrows(NPE, () -> webSocket.sendText(null, false));
assertThrows(NPE, () -> webSocket.sendText(null, true));
assertThrows(NPE, () -> webSocket.sendBinary(null, false));
assertThrows(NPE, () -> webSocket.sendBinary(null, true));
assertThrows(NPE, () -> webSocket.sendPing(null));
assertThrows(NPE, () -> webSocket.sendPong(null));
assertThrows(NPE, () -> webSocket.sendClose(NORMAL_CLOSURE, null));
} finally {
webSocket.abort();
}
}
}
// TODO: request in onClose/onError
@ -88,14 +82,20 @@ public class SendTest {
@Test
public void sendCloseCompleted() throws IOException {
server = new DummyWebSocketServer();
server.open();
webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder()
.buildAsync(server.getURI(), new WebSocket.Listener() { })
.join();
webSocket.sendClose(NORMAL_CLOSURE, "").join();
assertTrue(webSocket.isOutputClosed());
assertEquals(webSocket.getSubprotocol(), "");
webSocket.request(1); // No exceptions must be thrown
try (var server = new DummyWebSocketServer()) {
server.open();
var webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder()
.buildAsync(server.getURI(), new WebSocket.Listener() { })
.join();
try {
webSocket.sendClose(NORMAL_CLOSURE, "").join();
assertTrue(webSocket.isOutputClosed());
assertEquals(webSocket.getSubprotocol(), "");
webSocket.request(1); // No exceptions must be thrown
} finally {
webSocket.abort();
}
}
}
}

View File

@ -29,7 +29,6 @@
* WebSocketTest
*/
import org.testng.annotations.AfterTest;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@ -73,150 +72,150 @@ public class WebSocketTest {
Support.assertCompletesExceptionally(clazz, stage);
}
private DummyWebSocketServer server;
private WebSocket webSocket;
@AfterTest
public void cleanup() {
System.out.println("AFTER TEST");
if (server != null)
server.close();
if (webSocket != null)
webSocket.abort();
}
@Test
public void illegalArgument() throws IOException {
server = new DummyWebSocketServer();
server.open();
webSocket = newBuilder().proxy(NO_PROXY).build()
.newWebSocketBuilder()
.buildAsync(server.getURI(), new WebSocket.Listener() { })
.join();
try (var server = new DummyWebSocketServer()) {
server.open();
var webSocket = newBuilder().proxy(NO_PROXY).build()
.newWebSocketBuilder()
.buildAsync(server.getURI(), new WebSocket.Listener() { })
.join();
try {
assertFails(IAE, webSocket.sendPing(ByteBuffer.allocate(126)));
assertFails(IAE, webSocket.sendPing(ByteBuffer.allocate(127)));
assertFails(IAE, webSocket.sendPing(ByteBuffer.allocate(128)));
assertFails(IAE, webSocket.sendPing(ByteBuffer.allocate(129)));
assertFails(IAE, webSocket.sendPing(ByteBuffer.allocate(256)));
assertFails(IAE, webSocket.sendPing(ByteBuffer.allocate(126)));
assertFails(IAE, webSocket.sendPing(ByteBuffer.allocate(127)));
assertFails(IAE, webSocket.sendPing(ByteBuffer.allocate(128)));
assertFails(IAE, webSocket.sendPing(ByteBuffer.allocate(129)));
assertFails(IAE, webSocket.sendPing(ByteBuffer.allocate(256)));
assertFails(IAE, webSocket.sendPong(ByteBuffer.allocate(126)));
assertFails(IAE, webSocket.sendPong(ByteBuffer.allocate(127)));
assertFails(IAE, webSocket.sendPong(ByteBuffer.allocate(128)));
assertFails(IAE, webSocket.sendPong(ByteBuffer.allocate(129)));
assertFails(IAE, webSocket.sendPong(ByteBuffer.allocate(256)));
assertFails(IAE, webSocket.sendPong(ByteBuffer.allocate(126)));
assertFails(IAE, webSocket.sendPong(ByteBuffer.allocate(127)));
assertFails(IAE, webSocket.sendPong(ByteBuffer.allocate(128)));
assertFails(IAE, webSocket.sendPong(ByteBuffer.allocate(129)));
assertFails(IAE, webSocket.sendPong(ByteBuffer.allocate(256)));
assertFails(IOE, webSocket.sendText(Support.incompleteString(), true));
assertFails(IOE, webSocket.sendText(Support.incompleteString(), false));
assertFails(IOE, webSocket.sendText(Support.malformedString(), true));
assertFails(IOE, webSocket.sendText(Support.malformedString(), false));
assertFails(IOE, webSocket.sendText(Support.incompleteString(), true));
assertFails(IOE, webSocket.sendText(Support.incompleteString(), false));
assertFails(IOE, webSocket.sendText(Support.malformedString(), true));
assertFails(IOE, webSocket.sendText(Support.malformedString(), false));
assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(124)));
assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(125)));
assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(128)));
assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(256)));
assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(257)));
assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWith2NBytes((123 / 2) + 1)));
assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.malformedString()));
assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.incompleteString()));
assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(124)));
assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(125)));
assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(128)));
assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(256)));
assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(257)));
assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWith2NBytes((123 / 2) + 1)));
assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.malformedString()));
assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.incompleteString()));
assertFails(IAE, webSocket.sendClose(-2, "a reason"));
assertFails(IAE, webSocket.sendClose(-1, "a reason"));
assertFails(IAE, webSocket.sendClose(0, "a reason"));
assertFails(IAE, webSocket.sendClose(1, "a reason"));
assertFails(IAE, webSocket.sendClose(500, "a reason"));
assertFails(IAE, webSocket.sendClose(998, "a reason"));
assertFails(IAE, webSocket.sendClose(999, "a reason"));
assertFails(IAE, webSocket.sendClose(1002, "a reason"));
assertFails(IAE, webSocket.sendClose(1003, "a reason"));
assertFails(IAE, webSocket.sendClose(1006, "a reason"));
assertFails(IAE, webSocket.sendClose(1007, "a reason"));
assertFails(IAE, webSocket.sendClose(1009, "a reason"));
assertFails(IAE, webSocket.sendClose(1010, "a reason"));
assertFails(IAE, webSocket.sendClose(1012, "a reason"));
assertFails(IAE, webSocket.sendClose(1013, "a reason"));
assertFails(IAE, webSocket.sendClose(1015, "a reason"));
assertFails(IAE, webSocket.sendClose(5000, "a reason"));
assertFails(IAE, webSocket.sendClose(32768, "a reason"));
assertFails(IAE, webSocket.sendClose(65535, "a reason"));
assertFails(IAE, webSocket.sendClose(65536, "a reason"));
assertFails(IAE, webSocket.sendClose(Integer.MAX_VALUE, "a reason"));
assertFails(IAE, webSocket.sendClose(Integer.MIN_VALUE, "a reason"));
assertFails(IAE, webSocket.sendClose(-2, "a reason"));
assertFails(IAE, webSocket.sendClose(-1, "a reason"));
assertFails(IAE, webSocket.sendClose(0, "a reason"));
assertFails(IAE, webSocket.sendClose(1, "a reason"));
assertFails(IAE, webSocket.sendClose(500, "a reason"));
assertFails(IAE, webSocket.sendClose(998, "a reason"));
assertFails(IAE, webSocket.sendClose(999, "a reason"));
assertFails(IAE, webSocket.sendClose(1002, "a reason"));
assertFails(IAE, webSocket.sendClose(1003, "a reason"));
assertFails(IAE, webSocket.sendClose(1006, "a reason"));
assertFails(IAE, webSocket.sendClose(1007, "a reason"));
assertFails(IAE, webSocket.sendClose(1009, "a reason"));
assertFails(IAE, webSocket.sendClose(1010, "a reason"));
assertFails(IAE, webSocket.sendClose(1012, "a reason"));
assertFails(IAE, webSocket.sendClose(1013, "a reason"));
assertFails(IAE, webSocket.sendClose(1015, "a reason"));
assertFails(IAE, webSocket.sendClose(5000, "a reason"));
assertFails(IAE, webSocket.sendClose(32768, "a reason"));
assertFails(IAE, webSocket.sendClose(65535, "a reason"));
assertFails(IAE, webSocket.sendClose(65536, "a reason"));
assertFails(IAE, webSocket.sendClose(Integer.MAX_VALUE, "a reason"));
assertFails(IAE, webSocket.sendClose(Integer.MIN_VALUE, "a reason"));
assertThrows(IAE, () -> webSocket.request(Integer.MIN_VALUE));
assertThrows(IAE, () -> webSocket.request(Long.MIN_VALUE));
assertThrows(IAE, () -> webSocket.request(-1));
assertThrows(IAE, () -> webSocket.request(0));
assertThrows(IAE, () -> webSocket.request(Integer.MIN_VALUE));
assertThrows(IAE, () -> webSocket.request(Long.MIN_VALUE));
assertThrows(IAE, () -> webSocket.request(-1));
assertThrows(IAE, () -> webSocket.request(0));
server.close();
} finally {
webSocket.abort();
}
}
}
@Test
public void partialBinaryThenText() throws IOException {
server = new DummyWebSocketServer();
server.open();
webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder()
.buildAsync(server.getURI(), new WebSocket.Listener() { })
.join();
webSocket.sendBinary(ByteBuffer.allocate(16), false).join();
assertFails(ISE, webSocket.sendText("text", false));
assertFails(ISE, webSocket.sendText("text", true));
// Pings & Pongs are fine
webSocket.sendPing(ByteBuffer.allocate(125)).join();
webSocket.sendPong(ByteBuffer.allocate(125)).join();
server.close();
try (var server = new DummyWebSocketServer()) {
server.open();
var webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder()
.buildAsync(server.getURI(), new WebSocket.Listener() { })
.join();
try {
webSocket.sendBinary(ByteBuffer.allocate(16), false).join();
assertFails(ISE, webSocket.sendText("text", false));
assertFails(ISE, webSocket.sendText("text", true));
// Pings & Pongs are fine
webSocket.sendPing(ByteBuffer.allocate(125)).join();
webSocket.sendPong(ByteBuffer.allocate(125)).join();
} finally {
webSocket.abort();
}
}
}
@Test
public void partialTextThenBinary() throws IOException {
server = new DummyWebSocketServer();
server.open();
webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder()
.buildAsync(server.getURI(), new WebSocket.Listener() { })
.join();
webSocket.sendText("text", false).join();
assertFails(ISE, webSocket.sendBinary(ByteBuffer.allocate(16), false));
assertFails(ISE, webSocket.sendBinary(ByteBuffer.allocate(16), true));
// Pings & Pongs are fine
webSocket.sendPing(ByteBuffer.allocate(125)).join();
webSocket.sendPong(ByteBuffer.allocate(125)).join();
server.close();
try (var server = new DummyWebSocketServer()) {
server.open();
var webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder()
.buildAsync(server.getURI(), new WebSocket.Listener() { })
.join();
try {
webSocket.sendText("text", false).join();
assertFails(ISE, webSocket.sendBinary(ByteBuffer.allocate(16), false));
assertFails(ISE, webSocket.sendBinary(ByteBuffer.allocate(16), true));
// Pings & Pongs are fine
webSocket.sendPing(ByteBuffer.allocate(125)).join();
webSocket.sendPong(ByteBuffer.allocate(125)).join();
} finally {
webSocket.abort();
}
}
}
@Test
public void sendMethodsThrowIOE1() throws IOException {
server = new DummyWebSocketServer();
server.open();
webSocket = newBuilder().proxy(NO_PROXY).build()
.newWebSocketBuilder()
.buildAsync(server.getURI(), new WebSocket.Listener() { })
.join();
try (var server = new DummyWebSocketServer()) {
server.open();
var webSocket = newBuilder().proxy(NO_PROXY).build()
.newWebSocketBuilder()
.buildAsync(server.getURI(), new WebSocket.Listener() { })
.join();
try {
webSocket.sendClose(NORMAL_CLOSURE, "ok").join();
webSocket.sendClose(NORMAL_CLOSURE, "ok").join();
assertFails(IOE, webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "ok"));
assertFails(IOE, webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "ok"));
assertFails(IOE, webSocket.sendText("", true));
assertFails(IOE, webSocket.sendText("", false));
assertFails(IOE, webSocket.sendText("abc", true));
assertFails(IOE, webSocket.sendText("abc", false));
assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(0), true));
assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(0), false));
assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(1), true));
assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(1), false));
assertFails(IOE, webSocket.sendText("", true));
assertFails(IOE, webSocket.sendText("", false));
assertFails(IOE, webSocket.sendText("abc", true));
assertFails(IOE, webSocket.sendText("abc", false));
assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(0), true));
assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(0), false));
assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(1), true));
assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(1), false));
assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(125)));
assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(124)));
assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(1)));
assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(0)));
assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(125)));
assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(124)));
assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(1)));
assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(0)));
assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(125)));
assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(124)));
assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(1)));
assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(0)));
server.close();
assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(125)));
assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(124)));
assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(1)));
assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(0)));
} finally {
webSocket.abort();
}
}
}
@DataProvider(name = "sequence")
@ -251,150 +250,153 @@ public class WebSocketTest {
public void listenerSequentialOrder(int[] binary, long requestSize)
throws IOException
{
try (var server = Support.serverWithCannedData(binary)) {
server.open();
server = Support.serverWithCannedData(binary);
server.open();
CompletableFuture<Void> violation = new CompletableFuture<>();
CompletableFuture<Void> violation = new CompletableFuture<>();
MockListener listener = new MockListener(requestSize) {
MockListener listener = new MockListener(requestSize) {
final AtomicBoolean guard = new AtomicBoolean();
final AtomicBoolean guard = new AtomicBoolean();
private <T> T checkRunExclusively(Supplier<T> action) {
if (guard.getAndSet(true)) {
violation.completeExceptionally(new RuntimeException());
}
try {
return action.get();
} finally {
if (!guard.getAndSet(false)) {
private <T> T checkRunExclusively(Supplier<T> action) {
if (guard.getAndSet(true)) {
violation.completeExceptionally(new RuntimeException());
}
try {
return action.get();
} finally {
if (!guard.getAndSet(false)) {
violation.completeExceptionally(new RuntimeException());
}
}
}
@Override
public void onOpen(WebSocket webSocket) {
checkRunExclusively(() -> {
super.onOpen(webSocket);
return null;
});
}
@Override
public CompletionStage<?> onText(WebSocket webSocket,
CharSequence data,
boolean last) {
return checkRunExclusively(
() -> super.onText(webSocket, data, last));
}
@Override
public CompletionStage<?> onBinary(WebSocket webSocket,
ByteBuffer data,
boolean last) {
return checkRunExclusively(
() -> super.onBinary(webSocket, data, last));
}
@Override
public CompletionStage<?> onPing(WebSocket webSocket,
ByteBuffer message) {
return checkRunExclusively(
() -> super.onPing(webSocket, message));
}
@Override
public CompletionStage<?> onPong(WebSocket webSocket,
ByteBuffer message) {
return checkRunExclusively(
() -> super.onPong(webSocket, message));
}
@Override
public CompletionStage<?> onClose(WebSocket webSocket,
int statusCode,
String reason) {
return checkRunExclusively(
() -> super.onClose(webSocket, statusCode, reason));
}
@Override
public void onError(WebSocket webSocket, Throwable error) {
checkRunExclusively(() -> {
super.onError(webSocket, error);
return null;
});
}
};
var webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
try {
listener.invocations();
violation.complete(null); // won't affect if completed exceptionally
violation.join();
} finally {
webSocket.abort();
}
@Override
public void onOpen(WebSocket webSocket) {
checkRunExclusively(() -> {
super.onOpen(webSocket);
return null;
});
}
@Override
public CompletionStage<?> onText(WebSocket webSocket,
CharSequence data,
boolean last) {
return checkRunExclusively(
() -> super.onText(webSocket, data, last));
}
@Override
public CompletionStage<?> onBinary(WebSocket webSocket,
ByteBuffer data,
boolean last) {
return checkRunExclusively(
() -> super.onBinary(webSocket, data, last));
}
@Override
public CompletionStage<?> onPing(WebSocket webSocket,
ByteBuffer message) {
return checkRunExclusively(
() -> super.onPing(webSocket, message));
}
@Override
public CompletionStage<?> onPong(WebSocket webSocket,
ByteBuffer message) {
return checkRunExclusively(
() -> super.onPong(webSocket, message));
}
@Override
public CompletionStage<?> onClose(WebSocket webSocket,
int statusCode,
String reason) {
return checkRunExclusively(
() -> super.onClose(webSocket, statusCode, reason));
}
@Override
public void onError(WebSocket webSocket, Throwable error) {
checkRunExclusively(() -> {
super.onError(webSocket, error);
return null;
});
}
};
webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
listener.invocations();
violation.complete(null); // won't affect if completed exceptionally
violation.join();
server.close();
}
}
@Test
public void sendMethodsThrowIOE2() throws Exception {
server = Support.serverWithCannedData(0x88, 0x00);
server.open();
CompletableFuture<Void> onCloseCalled = new CompletableFuture<>();
CompletableFuture<Void> canClose = new CompletableFuture<>();
try (var server = Support.serverWithCannedData(0x88, 0x00)) {
server.open();
WebSocket.Listener listener = new WebSocket.Listener() {
@Override
public CompletionStage<?> onClose(WebSocket webSocket,
int statusCode,
String reason) {
System.out.printf("onClose(%s, '%s')%n", statusCode, reason);
onCloseCalled.complete(null);
return canClose;
CompletableFuture<Void> onCloseCalled = new CompletableFuture<>();
CompletableFuture<Void> canClose = new CompletableFuture<>();
WebSocket.Listener listener = new WebSocket.Listener() {
@Override
public CompletionStage<?> onClose(WebSocket webSocket,
int statusCode,
String reason) {
System.out.printf("onClose(%s, '%s')%n", statusCode, reason);
onCloseCalled.complete(null);
return canClose;
}
@Override
public void onError(WebSocket webSocket, Throwable error) {
System.out.println("onError(" + error + ")");
onCloseCalled.completeExceptionally(error);
}
};
var webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
try {
onCloseCalled.join(); // Wait for onClose to be called
canClose.complete(null); // Signal to the WebSocket it can close the output
TimeUnit.SECONDS.sleep(5); // Give canClose some time to reach the WebSocket
assertFails(IOE, webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "ok"));
assertFails(IOE, webSocket.sendText("", true));
assertFails(IOE, webSocket.sendText("", false));
assertFails(IOE, webSocket.sendText("abc", true));
assertFails(IOE, webSocket.sendText("abc", false));
assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(0), true));
assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(0), false));
assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(1), true));
assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(1), false));
assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(125)));
assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(124)));
assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(1)));
assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(0)));
assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(125)));
assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(124)));
assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(1)));
assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(0)));
} finally {
webSocket.abort();
}
@Override
public void onError(WebSocket webSocket, Throwable error) {
System.out.println("onError(" + error + ")");
onCloseCalled.completeExceptionally(error);
}
};
webSocket = newBuilder().proxy(NO_PROXY).build().newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
onCloseCalled.join(); // Wait for onClose to be called
canClose.complete(null); // Signal to the WebSocket it can close the output
TimeUnit.SECONDS.sleep(5); // Give canClose some time to reach the WebSocket
assertFails(IOE, webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "ok"));
assertFails(IOE, webSocket.sendText("", true));
assertFails(IOE, webSocket.sendText("", false));
assertFails(IOE, webSocket.sendText("abc", true));
assertFails(IOE, webSocket.sendText("abc", false));
assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(0), true));
assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(0), false));
assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(1), true));
assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(1), false));
assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(125)));
assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(124)));
assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(1)));
assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(0)));
assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(125)));
assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(124)));
assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(1)));
assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(0)));
server.close();
}
}
// Used to verify a server requiring Authentication
@ -458,74 +460,76 @@ public class WebSocketTest {
};
CompletableFuture<List<byte[]>> actual = new CompletableFuture<>();
server = serverSupplier.apply(binary);
server.open();
try (var server = serverSupplier.apply(binary)) {
server.open();
WebSocket.Listener listener = new WebSocket.Listener() {
WebSocket.Listener listener = new WebSocket.Listener() {
List<byte[]> collectedBytes = new ArrayList<>();
ByteBuffer buffer = ByteBuffer.allocate(1024);
List<byte[]> collectedBytes = new ArrayList<>();
ByteBuffer buffer = ByteBuffer.allocate(1024);
@Override
public CompletionStage<?> onBinary(WebSocket webSocket,
ByteBuffer message,
boolean last) {
System.out.printf("onBinary(%s, %s)%n", message, last);
webSocket.request(1);
@Override
public CompletionStage<?> onBinary(WebSocket webSocket,
ByteBuffer message,
boolean last) {
System.out.printf("onBinary(%s, %s)%n", message, last);
webSocket.request(1);
append(message);
if (last) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
buffer.clear();
processWholeBinary(bytes);
append(message);
if (last) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
buffer.clear();
processWholeBinary(bytes);
}
return null;
}
return null;
}
private void append(ByteBuffer message) {
if (buffer.remaining() < message.remaining()) {
assert message.remaining() > 0;
int cap = (buffer.capacity() + message.remaining()) * 2;
ByteBuffer b = ByteBuffer.allocate(cap);
b.put(buffer.flip());
buffer = b;
private void append(ByteBuffer message) {
if (buffer.remaining() < message.remaining()) {
assert message.remaining() > 0;
int cap = (buffer.capacity() + message.remaining()) * 2;
ByteBuffer b = ByteBuffer.allocate(cap);
b.put(buffer.flip());
buffer = b;
}
buffer.put(message);
}
buffer.put(message);
private void processWholeBinary(byte[] bytes) {
String stringBytes = new String(bytes, UTF_8);
System.out.println("processWholeBinary: " + stringBytes);
collectedBytes.add(bytes);
}
@Override
public CompletionStage<?> onClose(WebSocket webSocket,
int statusCode,
String reason) {
actual.complete(collectedBytes);
return null;
}
@Override
public void onError(WebSocket webSocket, Throwable error) {
actual.completeExceptionally(error);
}
};
var webSocket = newBuilder()
.proxy(NO_PROXY)
.authenticator(new WSAuthenticator())
.build().newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
try {
List<byte[]> a = actual.join();
assertEquals(a, expected);
} finally {
webSocket.abort();
}
private void processWholeBinary(byte[] bytes) {
String stringBytes = new String(bytes, UTF_8);
System.out.println("processWholeBinary: " + stringBytes);
collectedBytes.add(bytes);
}
@Override
public CompletionStage<?> onClose(WebSocket webSocket,
int statusCode,
String reason) {
actual.complete(collectedBytes);
return null;
}
@Override
public void onError(WebSocket webSocket, Throwable error) {
actual.completeExceptionally(error);
}
};
webSocket = newBuilder()
.proxy(NO_PROXY)
.authenticator(new WSAuthenticator())
.build().newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
List<byte[]> a = actual.join();
assertEquals(a, expected);
server.close();
}
}
@Test(dataProvider = "servers")
@ -554,59 +558,61 @@ public class WebSocketTest {
};
CompletableFuture<List<String>> actual = new CompletableFuture<>();
server = serverSupplier.apply(binary);
server.open();
try (var server = serverSupplier.apply(binary)) {
server.open();
WebSocket.Listener listener = new WebSocket.Listener() {
WebSocket.Listener listener = new WebSocket.Listener() {
List<String> collectedStrings = new ArrayList<>();
StringBuilder text = new StringBuilder();
List<String> collectedStrings = new ArrayList<>();
StringBuilder text = new StringBuilder();
@Override
public CompletionStage<?> onText(WebSocket webSocket,
CharSequence message,
boolean last) {
System.out.printf("onText(%s, %s)%n", message, last);
webSocket.request(1);
text.append(message);
if (last) {
String str = text.toString();
text.setLength(0);
processWholeText(str);
@Override
public CompletionStage<?> onText(WebSocket webSocket,
CharSequence message,
boolean last) {
System.out.printf("onText(%s, %s)%n", message, last);
webSocket.request(1);
text.append(message);
if (last) {
String str = text.toString();
text.setLength(0);
processWholeText(str);
}
return null;
}
return null;
private void processWholeText(String string) {
System.out.println(string);
collectedStrings.add(string);
}
@Override
public CompletionStage<?> onClose(WebSocket webSocket,
int statusCode,
String reason) {
actual.complete(collectedStrings);
return null;
}
@Override
public void onError(WebSocket webSocket, Throwable error) {
actual.completeExceptionally(error);
}
};
var webSocket = newBuilder()
.proxy(NO_PROXY)
.authenticator(new WSAuthenticator())
.build().newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
try {
List<String> a = actual.join();
assertEquals(a, expected);
} finally {
webSocket.abort();
}
private void processWholeText(String string) {
System.out.println(string);
collectedStrings.add(string);
}
@Override
public CompletionStage<?> onClose(WebSocket webSocket,
int statusCode,
String reason) {
actual.complete(collectedStrings);
return null;
}
@Override
public void onError(WebSocket webSocket, Throwable error) {
actual.completeExceptionally(error);
}
};
webSocket = newBuilder()
.proxy(NO_PROXY)
.authenticator(new WSAuthenticator())
.build().newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
List<String> a = actual.join();
assertEquals(a, expected);
server.close();
}
}
/*
@ -639,73 +645,75 @@ public class WebSocketTest {
};
CompletableFuture<List<String>> actual = new CompletableFuture<>();
server = serverSupplier.apply(binary);
server.open();
try (var server = serverSupplier.apply(binary)) {
server.open();
WebSocket.Listener listener = new WebSocket.Listener() {
WebSocket.Listener listener = new WebSocket.Listener() {
List<CharSequence> parts = new ArrayList<>();
/*
* A CompletableFuture which will complete once the current
* message has been fully assembled. Until then the listener
* returns this instance for every call.
*/
CompletableFuture<?> currentCf = new CompletableFuture<>();
List<String> collected = new ArrayList<>();
List<CharSequence> parts = new ArrayList<>();
/*
* A CompletableFuture which will complete once the current
* message has been fully assembled. Until then the listener
* returns this instance for every call.
*/
CompletableFuture<?> currentCf = new CompletableFuture<>();
List<String> collected = new ArrayList<>();
@Override
public CompletionStage<?> onText(WebSocket webSocket,
CharSequence message,
boolean last) {
parts.add(message);
if (!last) {
webSocket.request(1);
} else {
this.currentCf.thenRun(() -> webSocket.request(1));
CompletableFuture<?> refCf = this.currentCf;
processWholeMessage(new ArrayList<>(parts), refCf);
currentCf = new CompletableFuture<>();
parts.clear();
return refCf;
@Override
public CompletionStage<?> onText(WebSocket webSocket,
CharSequence message,
boolean last) {
parts.add(message);
if (!last) {
webSocket.request(1);
} else {
this.currentCf.thenRun(() -> webSocket.request(1));
CompletableFuture<?> refCf = this.currentCf;
processWholeMessage(new ArrayList<>(parts), refCf);
currentCf = new CompletableFuture<>();
parts.clear();
return refCf;
}
return currentCf;
}
return currentCf;
@Override
public CompletionStage<?> onClose(WebSocket webSocket,
int statusCode,
String reason) {
actual.complete(collected);
return null;
}
@Override
public void onError(WebSocket webSocket, Throwable error) {
actual.completeExceptionally(error);
}
public void processWholeMessage(List<CharSequence> data,
CompletableFuture<?> cf) {
StringBuilder b = new StringBuilder();
data.forEach(b::append);
String s = b.toString();
System.out.println(s);
cf.complete(null);
collected.add(s);
}
};
var webSocket = newBuilder()
.proxy(NO_PROXY)
.authenticator(new WSAuthenticator())
.build().newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
try {
List<String> a = actual.join();
assertEquals(a, expected);
} finally {
webSocket.abort();
}
@Override
public CompletionStage<?> onClose(WebSocket webSocket,
int statusCode,
String reason) {
actual.complete(collected);
return null;
}
@Override
public void onError(WebSocket webSocket, Throwable error) {
actual.completeExceptionally(error);
}
public void processWholeMessage(List<CharSequence> data,
CompletableFuture<?> cf) {
StringBuilder b = new StringBuilder();
data.forEach(b::append);
String s = b.toString();
System.out.println(s);
cf.complete(null);
collected.add(s);
}
};
webSocket = newBuilder()
.proxy(NO_PROXY)
.authenticator(new WSAuthenticator())
.build().newWebSocketBuilder()
.buildAsync(server.getURI(), listener)
.join();
List<String> a = actual.join();
assertEquals(a, expected);
server.close();
}
}
// -- authentication specific tests
@ -725,6 +733,7 @@ public class WebSocketTest {
.newWebSocketBuilder()
.buildAsync(server.getURI(), new WebSocket.Listener() { })
.join();
webSocket.abort();
}
}
@ -745,6 +754,7 @@ public class WebSocketTest {
.header("Authorization", hv)
.buildAsync(server.getURI(), new WebSocket.Listener() { })
.join();
webSocket.abort();
}
}
@ -763,6 +773,7 @@ public class WebSocketTest {
try {
var webSocket = cf.join();
silentAbort(webSocket);
fail("Expected exception not thrown");
} catch (CompletionException expected) {
WebSocketHandshakeException e = (WebSocketHandshakeException)expected.getCause();
@ -783,7 +794,7 @@ public class WebSocketTest {
Authenticator authenticator = new Authenticator() {
@Override protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication("BAD"+USERNAME, "".toCharArray());
return new PasswordAuthentication("BAD" + USERNAME, "".toCharArray());
}
};
@ -796,10 +807,16 @@ public class WebSocketTest {
try {
var webSocket = cf.join();
silentAbort(webSocket);
fail("Expected exception not thrown");
} catch (CompletionException expected) {
System.out.println("caught expected exception:" + expected);
}
}
}
private static void silentAbort(WebSocket ws) {
try {
ws.abort();
} catch (Throwable t) { }
}
}