/*
 * Copyright (c) 2018, 2022, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/**
 * @test id=default
 * @bug 8284161
 * @summary Basic tests of virtual threads doing blocking I/O with java.net sockets
 * @enablePreview
 * @library /test/lib
 * @run testng/othervm/timeout=300 BlockingSocketOps
 */

/**
 * @test id=indirect-register
 * @summary Basic tests of virtual threads doing blocking I/O with java.net sockets
 * @enablePreview
 * @library /test/lib
 * @run testng/othervm/timeout=300 -Djdk.useDirectRegister BlockingSocketOps
 */

/**
 * @test id=no-vmcontinuations
 * @requires vm.continuations
 * @enablePreview
 * @library /test/lib
 * @run testng/othervm/timeout=300 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingSocketOps
 */

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;

import jdk.test.lib.thread.VThreadRunner;
import org.testng.annotations.Test;
import static org.testng.Assert.*;

public class BlockingSocketOps {

    private static final long DELAY = 2000;

    /**
     * Socket read/write, no blocking.
     */
    @Test
    public void testSocketReadWrite1() throws Exception {
        VThreadRunner.run(() -> {
            try (var connection = new Connection()) {
                Socket s1 = connection.socket1();
                Socket s2 = connection.socket2();

                // write should not block
                byte[] ba = "XXX".getBytes("UTF-8");
                s1.getOutputStream().write(ba);

                // read should not block
                ba = new byte[10];
                int n = s2.getInputStream().read(ba);
                assertTrue(n > 0);
                assertTrue(ba[0] == 'X');
            }
        });
    }

    /**
     * Virtual thread blocks in read.
     */
    @Test
    public void testSocketRead1() throws Exception {
        testSocketRead(0);
    }

    /**
     * Virtual thread blocks in timed read.
     */
    @Test
    public void testSocketRead2() throws Exception {
        testSocketRead(60_000);
    }

    void testSocketRead(int timeout) throws Exception {
        VThreadRunner.run(() -> {
            try (var connection = new Connection()) {
                Socket s1 = connection.socket1();
                Socket s2 = connection.socket2();

                // schedule write
                byte[] ba = "XXX".getBytes("UTF-8");
                ScheduledWriter.schedule(s1, ba, DELAY);

                // read should block
                if (timeout > 0) {
                    assert timeout > DELAY;
                    s2.setSoTimeout(timeout);
                }
                ba = new byte[10];
                int n = s2.getInputStream().read(ba);
                assertTrue(n > 0);
                assertTrue(ba[0] == 'X');
            }
        });
    }

    /**
     * Virtual thread blocks in write.
     */
    @Test
    public void testSocketWrite1() throws Exception {
        VThreadRunner.run(() -> {
            try (var connection = new Connection()) {
                Socket s1 = connection.socket1();
                Socket s2 = connection.socket2();

                // schedule thread to read to EOF
                ScheduledReader.schedule(s2, true, DELAY);

                // write should block
                byte[] ba = new byte[100*1024];
                OutputStream out = s1.getOutputStream();
                for (int i=0; i<1000; i++) {
                    out.write(ba);
                }
            }
        });
    }

    /**
     * Virtual thread blocks in read, peer closes connection.
     */
    @Test
    public void testSocketReadPeerClose1() throws Exception {
        VThreadRunner.run(() -> {
            try (var connection = new Connection()) {
                Socket s1 = connection.socket1();
                Socket s2 = connection.socket2();

                ScheduledCloser.schedule(s2, DELAY);

                int n = s1.getInputStream().read();
                assertTrue(n == -1);
            }
        });
    }

    /**
     * Virtual thread blocks in read, peer closes connection abruptly.
     */
    @Test
    public void testSocketReadPeerClose2() throws Exception {
        VThreadRunner.run(() -> {
            try (var connection = new Connection()) {
                Socket s1 = connection.socket1();
                Socket s2 = connection.socket2();

                s2.setSoLinger(true, 0);
                ScheduledCloser.schedule(s2, DELAY);

                try {
                    s1.getInputStream().read();
                    fail();
                } catch (IOException ioe) {
                    // expected
                }
            }
        });
    }

    /**
     * Socket close while virtual thread blocked in read.
     */
    @Test
    public void testSocketReadAsyncClose1() throws Exception {
        testSocketReadAsyncClose(0);
    }

    /**
     * Socket close while virtual thread blocked in timed read.
     */
    @Test
    public void testSocketReadAsyncClose2() throws Exception {
        testSocketReadAsyncClose(0);
    }

    void testSocketReadAsyncClose(int timeout) throws Exception {
        VThreadRunner.run(() -> {
            try (var connection = new Connection()) {
                Socket s = connection.socket1();
                ScheduledCloser.schedule(s, DELAY);
                try {
                    if (timeout > 0) {
                        assert timeout > DELAY;
                        s.setSoTimeout(timeout);
                    }
                    int n = s.getInputStream().read();
                    throw new RuntimeException("read returned " + n);
                } catch (SocketException expected) { }
            }
        });
    }

    /**
     * Virtual thread interrupted while blocked in Socket read.
     */
    @Test
    public void testSocketReadInterrupt1() throws Exception {
        testSocketReadInterrupt(0);
    }

    /**
     * Virtual thread interrupted while blocked in Socket read with timeout
     */
    @Test
    public void testSocketReadInterrupt2() throws Exception {
        testSocketReadInterrupt(60_000);
    }

    void testSocketReadInterrupt(int timeout) throws Exception {
        VThreadRunner.run(() -> {
            try (var connection = new Connection()) {
                Socket s = connection.socket1();
                ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
                try {
                    if (timeout > 0) {
                        assert timeout > DELAY;
                        s.setSoTimeout(timeout);
                    }
                    int n = s.getInputStream().read();
                    throw new RuntimeException("read returned " + n);
                } catch (SocketException expected) {
                    assertTrue(Thread.interrupted());
                    assertTrue(s.isClosed());
                }
            }
        });
    }

    /**
     * Socket close while virtual thread blocked in write.
     */
    @Test
    public void testSocketWriteAsyncClose() throws Exception {
        VThreadRunner.run(() -> {
            try (var connection = new Connection()) {
                Socket s = connection.socket1();
                ScheduledCloser.schedule(s, DELAY);
                try {
                    byte[] ba = new byte[100*1024];
                    OutputStream out = s.getOutputStream();
                    for (;;) {
                        out.write(ba);
                    }
                } catch (SocketException expected) { }
            }
        });
    }

    /**
     * Virtual thread interrupted while blocked in Socket write
     */
    @Test
    public void testSocketWriteInterrupt() throws Exception {
        VThreadRunner.run(() -> {
            try (var connection = new Connection()) {
                Socket s = connection.socket1();
                ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
                try {
                    byte[] ba = new byte[100*1024];
                    OutputStream out = s.getOutputStream();
                    for (;;) {
                        out.write(ba);
                    }
                } catch (SocketException expected) {
                    assertTrue(Thread.interrupted());
                    assertTrue(s.isClosed());
                }
            }
        });
    }

    /**
     * Virtual thread reading urgent data when SO_OOBINLINE is enabled.
     */
    @Test
    public void testSocketReadUrgentData() throws Exception {
        VThreadRunner.run(() -> {
            try (var connection = new Connection()) {
                Socket s1 = connection.socket1();
                Socket s2 = connection.socket2();

                // urgent data should be received
                ScheduledUrgentData.scheduleUrgentData(s2, 'X', DELAY);
                s1.setOOBInline(true);
                byte[] ba = new byte[10];
                int n = s1.getInputStream().read(ba);
                assertTrue(n == 1);
                assertTrue(ba[0] == 'X');

                // urgent data should not be received
                s1.setOOBInline(false);
                s1.setSoTimeout(500);
                s2.sendUrgentData('X');
                try {
                    s1.getInputStream().read(ba);
                    fail();
                } catch (SocketTimeoutException expected) { }
            }
        });
    }

    /**
     * ServerSocket accept, no blocking.
     */
    @Test
    public void testServerSocketAccept1() throws Exception {
        VThreadRunner.run(() -> {
            try (var listener = new ServerSocket(0)) {
                var socket1 = new Socket(listener.getInetAddress(), listener.getLocalPort());
                // accept should not block
                var socket2 = listener.accept();
                socket1.close();
                socket2.close();
            }
        });
    }

    /**
     * Virtual thread blocks in accept.
     */
    @Test
    public void testServerSocketAccept2() throws Exception {
        testServerSocketAccept(0);
    }

    /**
     * Virtual thread blocks in timed accept.
     */
    @Test
    public void testServerSocketAccept3() throws Exception {
        testServerSocketAccept(60_000);
    }

    void testServerSocketAccept(int timeout) throws Exception {
        VThreadRunner.run(() -> {
            try (var listener = new ServerSocket(0)) {
                var socket1 = new Socket();
                ScheduledConnector.schedule(socket1, listener.getLocalSocketAddress(), DELAY);
                // accept will block
                if (timeout > 0) {
                    assert timeout > DELAY;
                    listener.setSoTimeout(timeout);
                }
                var socket2 = listener.accept();
                socket1.close();
                socket2.close();
            }
        });
    }

    /**
     * ServerSocket close while virtual thread blocked in accept.
     */
    @Test
    public void testServerSocketAcceptAsyncClose1() throws Exception {
        testServerSocketAcceptAsyncClose(0);
    }

    /**
     * ServerSocket close while virtual thread blocked in timed accept.
     */
    @Test
    public void testServerSocketAcceptAsyncClose2() throws Exception {
        testServerSocketAcceptAsyncClose(60_000);
    }

    void testServerSocketAcceptAsyncClose(int timeout) throws Exception {
        VThreadRunner.run(() -> {
            try (var listener = new ServerSocket(0)) {
                ScheduledCloser.schedule(listener, DELAY);
                if (timeout > 0) {
                    assert timeout > DELAY;
                    listener.setSoTimeout(timeout);
                }
                try {
                    listener.accept().close();
                    throw new RuntimeException("connection accepted???");
                } catch (SocketException expected) { }
            }
        });
    }

    /**
     * Virtual thread interrupted while blocked in ServerSocket accept
     */
    @Test
    public void testServerSocketAcceptInterrupt1() throws Exception {
        testServerSocketAcceptInterrupt(0);
    }

    /**
     * Virtual thread interrupted while blocked in ServerSocket accept with timeout
     */
    @Test
    public void testServerSocketAcceptInterrupt2() throws Exception {
        testServerSocketAcceptInterrupt(60_000);
    }

    void testServerSocketAcceptInterrupt(int timeout) throws Exception {
        VThreadRunner.run(() -> {
            try (var listener = new ServerSocket(0)) {
                ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);
                if (timeout > 0) {
                    assert timeout > DELAY;
                    listener.setSoTimeout(timeout);
                }
                try {
                    listener.accept().close();
                    throw new RuntimeException("connection accepted???");
                } catch (SocketException expected) {
                    assertTrue(Thread.interrupted());
                    assertTrue(listener.isClosed());
                }
            }
        });
    }

    /**
     * DatagramSocket receive/send, no blocking.
     */
    @Test
    public void testDatagramSocketSendReceive1() throws Exception {
        VThreadRunner.run(() -> {
            try (DatagramSocket s1 = new DatagramSocket(null);
                 DatagramSocket s2 = new DatagramSocket(null)) {

                InetAddress lh = InetAddress.getLoopbackAddress();
                s1.bind(new InetSocketAddress(lh, 0));
                s2.bind(new InetSocketAddress(lh, 0));

                // send should not block
                byte[] bytes = "XXX".getBytes("UTF-8");
                DatagramPacket p1 = new DatagramPacket(bytes, bytes.length);
                p1.setSocketAddress(s2.getLocalSocketAddress());
                s1.send(p1);

                // receive should not block
                byte[] ba = new byte[100];
                DatagramPacket p2 = new DatagramPacket(ba, ba.length);
                s2.receive(p2);
                assertEquals(p2.getSocketAddress(), s1.getLocalSocketAddress());
                assertTrue(ba[0] == 'X');
            }
        });
    }

    /**
     * Virtual thread blocks in DatagramSocket receive
     */
    @Test
    public void testDatagramSocketSendReceive2() throws Exception {
        testDatagramSocketSendReceive(0);
    }

    /**
     * Virtual thread blocks in DatagramSocket receive with timeout
     */
    @Test
    public void testDatagramSocketSendReceive3() throws Exception {
        testDatagramSocketSendReceive(60_000);
    }

    private void testDatagramSocketSendReceive(int timeout) throws Exception {
        VThreadRunner.run(() -> {
            try (DatagramSocket s1 = new DatagramSocket(null);
                 DatagramSocket s2 = new DatagramSocket(null)) {

                InetAddress lh = InetAddress.getLoopbackAddress();
                s1.bind(new InetSocketAddress(lh, 0));
                s2.bind(new InetSocketAddress(lh, 0));

                // schedule send
                byte[] bytes = "XXX".getBytes("UTF-8");
                DatagramPacket p1 = new DatagramPacket(bytes, bytes.length);
                p1.setSocketAddress(s2.getLocalSocketAddress());
                ScheduledSender.schedule(s1, p1, DELAY);

                // receive should block
                if (timeout > 0) {
                    assert timeout > DELAY;
                    s2.setSoTimeout(timeout);
                }
                byte[] ba = new byte[100];
                DatagramPacket p2 = new DatagramPacket(ba, ba.length);
                s2.receive(p2);
                assertEquals(p2.getSocketAddress(), s1.getLocalSocketAddress());
                assertTrue(ba[0] == 'X');
            }
        });
    }

    /**
     * Virtual thread blocks in DatagramSocket receive that times out
     */
    @Test
    public void testDatagramSocketReceiveTimeout() throws Exception {
        VThreadRunner.run(() -> {
            try (DatagramSocket s = new DatagramSocket(null)) {
                InetAddress lh = InetAddress.getLoopbackAddress();
                s.bind(new InetSocketAddress(lh, 0));
                s.setSoTimeout(2000);
                byte[] ba = new byte[100];
                DatagramPacket p = new DatagramPacket(ba, ba.length);
                try {
                    s.receive(p);
                    fail();
                } catch (SocketTimeoutException expected) { }
            }
        });
    }

    /**
     * DatagramSocket close while virtual thread blocked in receive.
     */
    @Test
    public void testDatagramSocketReceiveAsyncClose1() throws Exception {
        testDatagramSocketReceiveAsyncClose(0);
    }

    /**
     * DatagramSocket close while virtual thread blocked with timeout.
     */
    @Test
    public void testDatagramSocketReceiveAsyncClose2() throws Exception {
        testDatagramSocketReceiveAsyncClose(60_000);
    }

    private void testDatagramSocketReceiveAsyncClose(int timeout) throws Exception {
        VThreadRunner.run(() -> {
            try (DatagramSocket s = new DatagramSocket(null)) {
                InetAddress lh = InetAddress.getLoopbackAddress();
                s.bind(new InetSocketAddress(lh, 0));

                // schedule close
                ScheduledCloser.schedule(s, DELAY);

                // receive
                if (timeout > 0) {
                    assert timeout > DELAY;
                    s.setSoTimeout(timeout);
                }
                try {
                    byte[] ba = new byte[100];
                    DatagramPacket p = new DatagramPacket(ba, ba.length);
                    s.receive(p);
                    fail();
                } catch (SocketException expected) { }
            }
        });
    }

    /**
     * Virtual thread interrupted while blocked in DatagramSocket receive.
     */
    @Test
    public void testDatagramSocketReceiveInterrupt1() throws Exception {
        testDatagramSocketReceiveInterrupt(0);
    }

    /**
     * Virtual thread interrupted while blocked in DatagramSocket receive with timeout
     */
    @Test
    public void testDatagramSocketReceiveInterrupt2() throws Exception {
        testDatagramSocketReceiveInterrupt(60_000);
    }

    private void testDatagramSocketReceiveInterrupt(int timeout) throws Exception {
        VThreadRunner.run(() -> {
            try (DatagramSocket s = new DatagramSocket(null)) {
                InetAddress lh = InetAddress.getLoopbackAddress();
                s.bind(new InetSocketAddress(lh, 0));
                if (timeout > 0) {
                    assert timeout > DELAY;
                    s.setSoTimeout(timeout);
                }

                // schedule interrupt
                ScheduledInterrupter.schedule(Thread.currentThread(), DELAY);

                // receive
                try {
                    byte[] ba = new byte[100];
                    DatagramPacket p = new DatagramPacket(ba, ba.length);
                    s.receive(p);
                    fail();
                } catch (SocketException expected) {
                    assertTrue(Thread.interrupted());
                    assertTrue(s.isClosed());
                }
            }
        });
    }

    // -- supporting classes --

    /**
     * Creates a loopback connection
     */
    static class Connection implements Closeable {
        private final Socket s1;
        private final Socket s2;
        Connection() throws IOException {
            var lh = InetAddress.getLoopbackAddress();
            try (var listener = new ServerSocket()) {
                listener.bind(new InetSocketAddress(lh, 0));
                Socket s1 = new Socket();
                Socket s2;
                try {
                    s1.connect(listener.getLocalSocketAddress(), 10_000);
                    s2 = listener.accept();
                } catch (IOException ioe) {
                    s1.close();
                    throw ioe;
                }
                this.s1 = s1;
                this.s2 = s2;
            }

        }
        Socket socket1() {
            return s1;
        }
        Socket socket2() {
            return s2;
        }
        @Override
        public void close() throws IOException {
            s1.close();
            s2.close();
        }
    }

    /**
     * Closes a socket after a delay
     */
    static class ScheduledCloser implements Runnable {
        private final Closeable c;
        private final long delay;
        ScheduledCloser(Closeable c, long delay) {
            this.c = c;
            this.delay = delay;
        }
        @Override
        public void run() {
            try {
                Thread.sleep(delay);
                c.close();
            } catch (Exception e) { }
        }
        static void schedule(Closeable c, long delay) {
            new Thread(new ScheduledCloser(c, delay)).start();
        }
    }

    /**
     * Interrupts a thread after a delay
     */
    static class ScheduledInterrupter implements Runnable {
        private final Thread thread;
        private final long delay;

        ScheduledInterrupter(Thread thread, long delay) {
            this.thread = thread;
            this.delay = delay;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(delay);
                thread.interrupt();
            } catch (Exception e) { }
        }

        static void schedule(Thread thread, long delay) {
            new Thread(new ScheduledInterrupter(thread, delay)).start();
        }
    }

    /**
     * Reads from a socket, and to EOF, after a delay
     */
    static class ScheduledReader implements Runnable {
        private final Socket s;
        private final boolean readAll;
        private final long delay;

        ScheduledReader(Socket s, boolean readAll, long delay) {
            this.s = s;
            this.readAll = readAll;
            this.delay = delay;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(delay);
                byte[] ba = new byte[8192];
                InputStream in = s.getInputStream();
                for (;;) {
                    int n = in.read(ba);
                    if (n == -1 || !readAll)
                        break;
                }
            } catch (Exception e) { }
        }

        static void schedule(Socket s, boolean readAll, long delay) {
            new Thread(new ScheduledReader(s, readAll, delay)).start();
        }
    }

    /**
     * Writes to a socket after a delay
     */
    static class ScheduledWriter implements Runnable {
        private final Socket s;
        private final byte[] ba;
        private final long delay;

        ScheduledWriter(Socket s, byte[] ba, long delay) {
            this.s = s;
            this.ba = ba.clone();
            this.delay = delay;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(delay);
                s.getOutputStream().write(ba);
            } catch (Exception e) { }
        }

        static void schedule(Socket s, byte[] ba, long delay) {
            new Thread(new ScheduledWriter(s, ba, delay)).start();
        }
    }

    /**
     * Establish a connection to a socket address after a delay
     */
    static class ScheduledConnector implements Runnable {
        private final Socket socket;
        private final SocketAddress address;
        private final long delay;

        ScheduledConnector(Socket socket, SocketAddress address, long delay) {
            this.socket = socket;
            this.address = address;
            this.delay = delay;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(delay);
                socket.connect(address);
            } catch (Exception e) { }
        }

        static void schedule(Socket socket, SocketAddress address, long delay) {
            new Thread(new ScheduledConnector(socket, address, delay)).start();
        }
    }

    /**
     * Sends a datagram to a target address after a delay
     */
    static class ScheduledSender implements Runnable {
        private final DatagramSocket socket;
        private final DatagramPacket packet;
        private final long delay;

        ScheduledSender(DatagramSocket socket, DatagramPacket packet, long delay) {
            this.socket = socket;
            this.packet = packet;
            this.delay = delay;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(delay);
                socket.send(packet);
            } catch (Exception e) { }
        }

        static void schedule(DatagramSocket socket, DatagramPacket packet, long delay) {
            new Thread(new ScheduledSender(socket, packet, delay)).start();
        }
    }

    /**
     * Sends urgent data after a delay
     */
    static class ScheduledUrgentData implements Runnable {
        private final Socket s;
        private final int data;
        private final long delay;

        ScheduledUrgentData(Socket s, int data, long delay) {
            this.s = s;
            this.data = data;
            this.delay = delay;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(delay);
                s.sendUrgentData(data);
            } catch (Exception e) { }
        }

        static void scheduleUrgentData(Socket s, int data, long delay) {
            new Thread(new ScheduledUrgentData(s, data, delay)).start();
        }
    }
}