a223189b06
Co-authored-by: Paul Sandoz <psandoz@openjdk.org> Co-authored-by: Jorn Vernee <jvernee@openjdk.org> Co-authored-by: Vladimir Ivanov <vlivanov@openjdk.org> Co-authored-by: Athijegannathan Sundararajan <sundar@openjdk.org> Co-authored-by: Chris Hegarty <chegar@openjdk.org> Reviewed-by: psandoz, chegar, mchung, vlivanov
239 lines
10 KiB
Java
239 lines
10 KiB
Java
/*
|
|
* Copyright (c) 2021, Oracle and/or its affiliates. All rights reserved.
|
|
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
|
*
|
|
* This code is free software; you can redistribute it and/or modify it
|
|
* under the terms of the GNU General Public License version 2 only, as
|
|
* published by the Free Software Foundation.
|
|
*
|
|
* This code is distributed in the hope that it will be useful, but WITHOUT
|
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
|
* version 2 for more details (a copy is included in the LICENSE file that
|
|
* accompanied this code).
|
|
*
|
|
* You should have received a copy of the GNU General Public License version
|
|
* 2 along with this work; if not, write to the Free Software Foundation,
|
|
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
|
*
|
|
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
|
* or visit www.oracle.com if you need additional information or have any
|
|
* questions.
|
|
*/
|
|
|
|
/*
|
|
* @test
|
|
* @library /test/lib
|
|
* @modules java.base/sun.nio.ch
|
|
* jdk.incubator.foreign/jdk.internal.foreign
|
|
* @key randomness
|
|
* @run testng/othervm TestSocketChannels
|
|
*/
|
|
|
|
import java.net.InetAddress;
|
|
import java.net.InetSocketAddress;
|
|
import java.nio.ByteBuffer;
|
|
import java.nio.channels.ServerSocketChannel;
|
|
import java.nio.channels.SocketChannel;
|
|
import java.util.Arrays;
|
|
import java.util.List;
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
import java.util.function.Supplier;
|
|
import java.util.stream.Stream;
|
|
import jdk.incubator.foreign.MemoryAccess;
|
|
import jdk.incubator.foreign.MemorySegment;
|
|
import jdk.incubator.foreign.ResourceScope;
|
|
import org.testng.annotations.*;
|
|
import static org.testng.Assert.*;
|
|
|
|
/**
|
|
* Tests consisting of buffer views with synchronous NIO network channels.
|
|
*/
|
|
public class TestSocketChannels extends AbstractChannelsTest {
|
|
|
|
static final Class<IllegalStateException> ISE = IllegalStateException.class;
|
|
|
|
@Test(dataProvider = "closeableScopes")
|
|
public void testBasicIOWithClosedSegment(Supplier<ResourceScope> scopeSupplier)
|
|
throws Exception
|
|
{
|
|
try (var channel = SocketChannel.open();
|
|
var server = ServerSocketChannel.open();
|
|
var connectedChannel = connectChannels(server, channel)) {
|
|
ResourceScope scope = scopeSupplier.get();
|
|
ByteBuffer bb = segmentBufferOfSize(scope, 16);
|
|
scope.close();
|
|
assertMessage(expectThrows(ISE, () -> channel.read(bb)), "Already closed");
|
|
assertMessage(expectThrows(ISE, () -> channel.read(new ByteBuffer[] {bb})), "Already closed");
|
|
assertMessage(expectThrows(ISE, () -> channel.read(new ByteBuffer[] {bb}, 0, 1)), "Already closed");
|
|
assertMessage(expectThrows(ISE, () -> channel.write(bb)), "Already closed");
|
|
assertMessage(expectThrows(ISE, () -> channel.write(new ByteBuffer[] {bb})), "Already closed");
|
|
assertMessage(expectThrows(ISE, () -> channel.write(new ByteBuffer[] {bb}, 0 ,1)), "Already closed");
|
|
}
|
|
}
|
|
|
|
@Test(dataProvider = "closeableScopes")
|
|
public void testScatterGatherWithClosedSegment(Supplier<ResourceScope> scopeSupplier)
|
|
throws Exception
|
|
{
|
|
try (var channel = SocketChannel.open();
|
|
var server = ServerSocketChannel.open();
|
|
var connectedChannel = connectChannels(server, channel)) {
|
|
ResourceScope scope = scopeSupplier.get();
|
|
ByteBuffer[] buffers = segmentBuffersOfSize(8, scope, 16);
|
|
scope.close();
|
|
assertMessage(expectThrows(ISE, () -> channel.write(buffers)), "Already closed");
|
|
assertMessage(expectThrows(ISE, () -> channel.read(buffers)), "Already closed");
|
|
assertMessage(expectThrows(ISE, () -> channel.write(buffers, 0 ,8)), "Already closed");
|
|
assertMessage(expectThrows(ISE, () -> channel.read(buffers, 0, 8)), "Already closed");
|
|
}
|
|
}
|
|
|
|
@Test(dataProvider = "allScopes")
|
|
public void testBasicIO(Supplier<ResourceScope> scopeSupplier)
|
|
throws Exception
|
|
{
|
|
ResourceScope scope;
|
|
try (var sc1 = SocketChannel.open();
|
|
var ssc = ServerSocketChannel.open();
|
|
var sc2 = connectChannels(ssc, sc1);
|
|
var scp = closeableScopeOrNull(scope = scopeSupplier.get())) {
|
|
MemorySegment segment1 = MemorySegment.allocateNative(10, 1, scope);
|
|
MemorySegment segment2 = MemorySegment.allocateNative(10, 1, scope);
|
|
for (int i = 0; i < 10; i++) {
|
|
MemoryAccess.setByteAtOffset(segment1, i, (byte) i);
|
|
}
|
|
ByteBuffer bb1 = segment1.asByteBuffer();
|
|
ByteBuffer bb2 = segment2.asByteBuffer();
|
|
assertEquals(sc1.write(bb1), 10);
|
|
assertEquals(sc2.read(bb2), 10);
|
|
assertEquals(bb2.flip(), ByteBuffer.wrap(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
|
|
}
|
|
}
|
|
|
|
@Test
|
|
public void testBasicHeapIOWithGlobalScope() throws Exception {
|
|
try (var sc1 = SocketChannel.open();
|
|
var ssc = ServerSocketChannel.open();
|
|
var sc2 = connectChannels(ssc, sc1)) {
|
|
var segment1 = MemorySegment.ofArray(new byte[10]);
|
|
var segment2 = MemorySegment.ofArray(new byte[10]);
|
|
for (int i = 0; i < 10; i++) {
|
|
MemoryAccess.setByteAtOffset(segment1, i, (byte) i);
|
|
}
|
|
ByteBuffer bb1 = segment1.asByteBuffer();
|
|
ByteBuffer bb2 = segment2.asByteBuffer();
|
|
assertEquals(sc1.write(bb1), 10);
|
|
assertEquals(sc2.read(bb2), 10);
|
|
assertEquals(bb2.flip(), ByteBuffer.wrap(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
|
|
}
|
|
}
|
|
|
|
@Test(dataProvider = "confinedScopes")
|
|
public void testIOOnConfinedFromAnotherThread(Supplier<ResourceScope> scopeSupplier)
|
|
throws Exception
|
|
{
|
|
try (var channel = SocketChannel.open();
|
|
var server = ServerSocketChannel.open();
|
|
var connected = connectChannels(server, channel);
|
|
var scope = scopeSupplier.get()) {
|
|
var segment = MemorySegment.allocateNative(10, 1, scope);
|
|
ByteBuffer bb = segment.asByteBuffer();
|
|
List<ThrowingRunnable> ioOps = List.of(
|
|
() -> channel.write(bb),
|
|
() -> channel.read(bb),
|
|
() -> channel.write(new ByteBuffer[] {bb}),
|
|
() -> channel.read(new ByteBuffer[] {bb}),
|
|
() -> channel.write(new ByteBuffer[] {bb}, 0, 1),
|
|
() -> channel.read(new ByteBuffer[] {bb}, 0, 1)
|
|
);
|
|
for (var ioOp : ioOps) {
|
|
AtomicReference<Exception> exception = new AtomicReference<>();
|
|
Runnable task = () -> exception.set(expectThrows(ISE, ioOp));
|
|
var t = new Thread(task);
|
|
t.start();
|
|
t.join();
|
|
assertMessage(exception.get(), "Attempted access outside owning thread");
|
|
}
|
|
}
|
|
}
|
|
|
|
@Test(dataProvider = "allScopes")
|
|
public void testScatterGatherIO(Supplier<ResourceScope> scopeSupplier)
|
|
throws Exception
|
|
{
|
|
ResourceScope scope;
|
|
try (var sc1 = SocketChannel.open();
|
|
var ssc = ServerSocketChannel.open();
|
|
var sc2 = connectChannels(ssc, sc1);
|
|
var scp = closeableScopeOrNull(scope = scopeSupplier.get())) {
|
|
var writeBuffers = mixedBuffersOfSize(32, scope, 64);
|
|
var readBuffers = mixedBuffersOfSize(32, scope, 64);
|
|
long expectedCount = remaining(writeBuffers);
|
|
assertEquals(writeNBytes(sc1, writeBuffers, 0, 32, expectedCount), expectedCount);
|
|
assertEquals(readNBytes(sc2, readBuffers, 0, 32, expectedCount), expectedCount);
|
|
assertEquals(flip(readBuffers), clear(writeBuffers));
|
|
}
|
|
}
|
|
|
|
@Test(dataProvider = "closeableScopes")
|
|
public void testBasicIOWithDifferentScopes(Supplier<ResourceScope> scopeSupplier)
|
|
throws Exception
|
|
{
|
|
try (var sc1 = SocketChannel.open();
|
|
var ssc = ServerSocketChannel.open();
|
|
var sc2 = connectChannels(ssc, sc1);
|
|
var scope1 = scopeSupplier.get();
|
|
var scope2 = scopeSupplier.get()) {
|
|
var writeBuffers = Stream.of(mixedBuffersOfSize(16, scope1, 64), mixedBuffersOfSize(16, scope2, 64))
|
|
.flatMap(Arrays::stream)
|
|
.toArray(ByteBuffer[]::new);
|
|
var readBuffers = Stream.of(mixedBuffersOfSize(16, scope1, 64), mixedBuffersOfSize(16, scope2, 64))
|
|
.flatMap(Arrays::stream)
|
|
.toArray(ByteBuffer[]::new);
|
|
|
|
long expectedCount = remaining(writeBuffers);
|
|
assertEquals(writeNBytes(sc1, writeBuffers, 0, 32, expectedCount), expectedCount);
|
|
assertEquals(readNBytes(sc2, readBuffers, 0, 32, expectedCount), expectedCount);
|
|
assertEquals(flip(readBuffers), clear(writeBuffers));
|
|
}
|
|
}
|
|
|
|
static SocketChannel connectChannels(ServerSocketChannel ssc, SocketChannel sc)
|
|
throws Exception
|
|
{
|
|
ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
|
|
sc.connect(ssc.getLocalAddress());
|
|
return ssc.accept();
|
|
}
|
|
|
|
static long writeNBytes(SocketChannel channel,
|
|
ByteBuffer[] buffers, int offset, int len,
|
|
long bytes)
|
|
throws Exception
|
|
{
|
|
long total = 0L;
|
|
do {
|
|
long n = channel.write(buffers, offset, len);
|
|
assertTrue(n > 0, "got:" + n);
|
|
total += n;
|
|
} while (total < bytes);
|
|
return total;
|
|
}
|
|
|
|
static long readNBytes(SocketChannel channel,
|
|
ByteBuffer[] buffers, int offset, int len,
|
|
long bytes)
|
|
throws Exception
|
|
{
|
|
long total = 0L;
|
|
do {
|
|
long n = channel.read(buffers, offset, len);
|
|
assertTrue(n > 0, "got:" + n);
|
|
total += n;
|
|
} while (total < bytes);
|
|
return total;
|
|
}
|
|
}
|
|
|