6944584: Improvements to subprocess handling on Unix
Use thread pool for reaper thread; move most I/O operations out of reaper thread Reviewed-by: michaelm, hiroshi
This commit is contained in:
parent
5b0c4babe6
commit
45b78120fc
@ -418,6 +418,8 @@ public final class ProcessBuilder
|
||||
* Implements a <a href="#redirect-output">null input stream</a>.
|
||||
*/
|
||||
static class NullInputStream extends InputStream {
|
||||
static final NullInputStream INSTANCE = new NullInputStream();
|
||||
private NullInputStream() {}
|
||||
public int read() { return -1; }
|
||||
public int available() { return 0; }
|
||||
}
|
||||
@ -426,6 +428,8 @@ public final class ProcessBuilder
|
||||
* Implements a <a href="#redirect-input">null output stream</a>.
|
||||
*/
|
||||
static class NullOutputStream extends OutputStream {
|
||||
static final NullOutputStream INSTANCE = new NullOutputStream();
|
||||
private NullOutputStream() {}
|
||||
public void write(int b) throws IOException {
|
||||
throw new IOException("Stream closed");
|
||||
}
|
||||
|
@ -25,25 +25,42 @@
|
||||
|
||||
package java.lang;
|
||||
|
||||
import java.io.*;
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.FileDescriptor;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.security.PrivilegedActionException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
/* java.lang.Process subclass in the UNIX environment.
|
||||
/**
|
||||
* java.lang.Process subclass in the UNIX environment.
|
||||
*
|
||||
* @author Mario Wolczko and Ross Knippel.
|
||||
* @author Konstantin Kladko (ported to Linux)
|
||||
* @author Martin Buchholz
|
||||
*/
|
||||
|
||||
final class UNIXProcess extends Process {
|
||||
private static final sun.misc.JavaIOFileDescriptorAccess fdAccess
|
||||
= sun.misc.SharedSecrets.getJavaIOFileDescriptorAccess();
|
||||
|
||||
private int pid;
|
||||
private final int pid;
|
||||
private int exitcode;
|
||||
private boolean hasExited;
|
||||
|
||||
private OutputStream stdin_stream;
|
||||
private InputStream stdout_stream;
|
||||
private InputStream stderr_stream;
|
||||
private /* final */ OutputStream stdin;
|
||||
private /* final */ InputStream stdout;
|
||||
private /* final */ InputStream stderr;
|
||||
|
||||
/* this is for the reaping thread */
|
||||
private native int waitForProcessExit(int pid);
|
||||
@ -51,155 +68,136 @@ final class UNIXProcess extends Process {
|
||||
/**
|
||||
* Create a process using fork(2) and exec(2).
|
||||
*
|
||||
* @param std_fds array of file descriptors. Indexes 0, 1, and
|
||||
* 2 correspond to standard input, standard output and
|
||||
* standard error, respectively. On input, a value of -1
|
||||
* means to create a pipe to connect child and parent
|
||||
* processes. On output, a value which is not -1 is the
|
||||
* parent pipe fd corresponding to the pipe which has
|
||||
* been created. An element of this array is -1 on input
|
||||
* if and only if it is <em>not</em> -1 on output.
|
||||
* @param fds an array of three file descriptors.
|
||||
* Indexes 0, 1, and 2 correspond to standard input,
|
||||
* standard output and standard error, respectively. On
|
||||
* input, a value of -1 means to create a pipe to connect
|
||||
* child and parent processes. On output, a value which
|
||||
* is not -1 is the parent pipe fd corresponding to the
|
||||
* pipe which has been created. An element of this array
|
||||
* is -1 on input if and only if it is <em>not</em> -1 on
|
||||
* output.
|
||||
* @return the pid of the subprocess
|
||||
*/
|
||||
private native int forkAndExec(byte[] prog,
|
||||
byte[] argBlock, int argc,
|
||||
byte[] envBlock, int envc,
|
||||
byte[] dir,
|
||||
int[] std_fds,
|
||||
int[] fds,
|
||||
boolean redirectErrorStream)
|
||||
throws IOException;
|
||||
|
||||
/* In the process constructor we wait on this gate until the process */
|
||||
/* has been created. Then we return from the constructor. */
|
||||
/* fork() is called by the same thread which later waits for the process */
|
||||
/* to terminate */
|
||||
/**
|
||||
* The thread factory used to create "process reaper" daemon threads.
|
||||
*/
|
||||
private static class ProcessReaperThreadFactory implements ThreadFactory {
|
||||
private final static ThreadGroup group = getRootThreadGroup();
|
||||
|
||||
private static class Gate {
|
||||
|
||||
private boolean exited = false;
|
||||
private IOException savedException;
|
||||
|
||||
synchronized void exit() { /* Opens the gate */
|
||||
exited = true;
|
||||
this.notify();
|
||||
private static ThreadGroup getRootThreadGroup() {
|
||||
return AccessController.doPrivileged
|
||||
(new PrivilegedAction<ThreadGroup> () {
|
||||
public ThreadGroup run() {
|
||||
ThreadGroup root = Thread.currentThread().getThreadGroup();
|
||||
while (root.getParent() != null)
|
||||
root = root.getParent();
|
||||
return root;
|
||||
}});
|
||||
}
|
||||
|
||||
synchronized void waitForExit() { /* wait until the gate is open */
|
||||
boolean interrupted = false;
|
||||
while (!exited) {
|
||||
try {
|
||||
this.wait();
|
||||
} catch (InterruptedException e) {
|
||||
interrupted = true;
|
||||
}
|
||||
}
|
||||
if (interrupted) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
void setException (IOException e) {
|
||||
savedException = e;
|
||||
}
|
||||
|
||||
IOException getException() {
|
||||
return savedException;
|
||||
public Thread newThread(Runnable grimReaper) {
|
||||
// Our thread stack requirement is quite modest.
|
||||
Thread t = new Thread(group, grimReaper, "process reaper", 32768);
|
||||
t.setDaemon(true);
|
||||
// A small attempt (probably futile) to avoid priority inversion
|
||||
t.setPriority(Thread.MAX_PRIORITY);
|
||||
return t;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The thread pool of "process reaper" daemon threads.
|
||||
*/
|
||||
private static final Executor processReaperExecutor
|
||||
= Executors.newCachedThreadPool(new ProcessReaperThreadFactory());
|
||||
|
||||
UNIXProcess(final byte[] prog,
|
||||
final byte[] argBlock, final int argc,
|
||||
final byte[] envBlock, final int envc,
|
||||
final byte[] dir,
|
||||
final int[] std_fds,
|
||||
final int[] fds,
|
||||
final boolean redirectErrorStream)
|
||||
throws IOException {
|
||||
throws IOException {
|
||||
|
||||
final Gate gate = new Gate();
|
||||
/*
|
||||
* For each subprocess forked a corresponding reaper thread
|
||||
* is started. That thread is the only thread which waits
|
||||
* for the subprocess to terminate and it doesn't hold any
|
||||
* locks while doing so. This design allows waitFor() and
|
||||
* exitStatus() to be safely executed in parallel (and they
|
||||
* need no native code).
|
||||
*/
|
||||
pid = forkAndExec(prog,
|
||||
argBlock, argc,
|
||||
envBlock, envc,
|
||||
dir,
|
||||
fds,
|
||||
redirectErrorStream);
|
||||
|
||||
java.security.AccessController.doPrivileged(
|
||||
new java.security.PrivilegedAction<Void>() {
|
||||
public Void run() {
|
||||
Thread t = new Thread("process reaper") {
|
||||
public void run() {
|
||||
try {
|
||||
pid = forkAndExec(prog,
|
||||
argBlock, argc,
|
||||
envBlock, envc,
|
||||
dir,
|
||||
std_fds,
|
||||
redirectErrorStream);
|
||||
} catch (IOException e) {
|
||||
gate.setException(e); /*remember to rethrow later*/
|
||||
gate.exit();
|
||||
return;
|
||||
}
|
||||
java.security.AccessController.doPrivileged(
|
||||
new java.security.PrivilegedAction<Void>() {
|
||||
public Void run() {
|
||||
if (std_fds[0] == -1)
|
||||
stdin_stream = new ProcessBuilder.NullOutputStream();
|
||||
else {
|
||||
FileDescriptor stdin_fd = new FileDescriptor();
|
||||
fdAccess.set(stdin_fd, std_fds[0]);
|
||||
stdin_stream = new BufferedOutputStream(
|
||||
new FileOutputStream(stdin_fd));
|
||||
}
|
||||
try {
|
||||
AccessController.doPrivileged
|
||||
(new PrivilegedExceptionAction<Void>() {
|
||||
public Void run() throws IOException {
|
||||
initStreams(fds);
|
||||
return null;
|
||||
}});
|
||||
} catch (PrivilegedActionException ex) {
|
||||
throw (IOException) ex.getException();
|
||||
}
|
||||
}
|
||||
|
||||
if (std_fds[1] == -1)
|
||||
stdout_stream = new ProcessBuilder.NullInputStream();
|
||||
else {
|
||||
FileDescriptor stdout_fd = new FileDescriptor();
|
||||
fdAccess.set(stdout_fd, std_fds[1]);
|
||||
stdout_stream = new BufferedInputStream(
|
||||
new FileInputStream(stdout_fd));
|
||||
}
|
||||
static FileDescriptor newFileDescriptor(int fd) {
|
||||
FileDescriptor fileDescriptor = new FileDescriptor();
|
||||
fdAccess.set(fileDescriptor, fd);
|
||||
return fileDescriptor;
|
||||
}
|
||||
|
||||
if (std_fds[2] == -1)
|
||||
stderr_stream = new ProcessBuilder.NullInputStream();
|
||||
else {
|
||||
FileDescriptor stderr_fd = new FileDescriptor();
|
||||
fdAccess.set(stderr_fd, std_fds[2]);
|
||||
stderr_stream = new FileInputStream(stderr_fd);
|
||||
}
|
||||
void initStreams(int[] fds) throws IOException {
|
||||
stdin = (fds[0] == -1) ?
|
||||
ProcessBuilder.NullOutputStream.INSTANCE :
|
||||
new ProcessPipeOutputStream(fds[0]);
|
||||
|
||||
return null; }});
|
||||
gate.exit(); /* exit from constructor */
|
||||
int res = waitForProcessExit(pid);
|
||||
synchronized (UNIXProcess.this) {
|
||||
hasExited = true;
|
||||
exitcode = res;
|
||||
UNIXProcess.this.notifyAll();
|
||||
}
|
||||
}
|
||||
};
|
||||
t.setDaemon(true);
|
||||
t.start();
|
||||
return null; }});
|
||||
gate.waitForExit();
|
||||
IOException e = gate.getException();
|
||||
if (e != null)
|
||||
throw new IOException(e.toString());
|
||||
stdout = (fds[1] == -1) ?
|
||||
ProcessBuilder.NullInputStream.INSTANCE :
|
||||
new ProcessPipeInputStream(fds[1]);
|
||||
|
||||
stderr = (fds[2] == -1) ?
|
||||
ProcessBuilder.NullInputStream.INSTANCE :
|
||||
new ProcessPipeInputStream(fds[2]);
|
||||
|
||||
processReaperExecutor.execute(new Runnable() {
|
||||
public void run() {
|
||||
int exitcode = waitForProcessExit(pid);
|
||||
UNIXProcess.this.processExited(exitcode);
|
||||
}});
|
||||
}
|
||||
|
||||
synchronized void processExited(int exitcode) {
|
||||
if (stdout instanceof ProcessPipeInputStream)
|
||||
((ProcessPipeInputStream) stdout).processExited();
|
||||
|
||||
if (stderr instanceof ProcessPipeInputStream)
|
||||
((ProcessPipeInputStream) stderr).processExited();
|
||||
|
||||
if (stdin instanceof ProcessPipeOutputStream)
|
||||
((ProcessPipeOutputStream) stdin).processExited();
|
||||
|
||||
this.exitcode = exitcode;
|
||||
hasExited = true;
|
||||
notifyAll();
|
||||
}
|
||||
|
||||
public OutputStream getOutputStream() {
|
||||
return stdin_stream;
|
||||
return stdin;
|
||||
}
|
||||
|
||||
public InputStream getInputStream() {
|
||||
return stdout_stream;
|
||||
return stdout;
|
||||
}
|
||||
|
||||
public InputStream getErrorStream() {
|
||||
return stderr_stream;
|
||||
return stderr;
|
||||
}
|
||||
|
||||
public synchronized int waitFor() throws InterruptedException {
|
||||
@ -228,13 +226,9 @@ final class UNIXProcess extends Process {
|
||||
if (!hasExited)
|
||||
destroyProcess(pid);
|
||||
}
|
||||
try {
|
||||
stdin_stream.close();
|
||||
stdout_stream.close();
|
||||
stderr_stream.close();
|
||||
} catch (IOException e) {
|
||||
// ignore
|
||||
}
|
||||
try { stdin.close(); } catch (IOException ignored) {}
|
||||
try { stdout.close(); } catch (IOException ignored) {}
|
||||
try { stderr.close(); } catch (IOException ignored) {}
|
||||
}
|
||||
|
||||
/* This routine initializes JNI field offsets for the class */
|
||||
@ -243,4 +237,77 @@ final class UNIXProcess extends Process {
|
||||
static {
|
||||
initIDs();
|
||||
}
|
||||
|
||||
/**
|
||||
* A buffered input stream for a subprocess pipe file descriptor
|
||||
* that allows the underlying file descriptor to be reclaimed when
|
||||
* the process exits, via the processExited hook.
|
||||
*
|
||||
* This is tricky because we do not want the user-level InputStream to be
|
||||
* closed until the user invokes close(), and we need to continue to be
|
||||
* able to read any buffered data lingering in the OS pipe buffer.
|
||||
*/
|
||||
static class ProcessPipeInputStream extends BufferedInputStream {
|
||||
ProcessPipeInputStream(int fd) {
|
||||
super(new FileInputStream(newFileDescriptor(fd)));
|
||||
}
|
||||
|
||||
private static byte[] drainInputStream(InputStream in)
|
||||
throws IOException {
|
||||
if (in == null) return null;
|
||||
int n = 0;
|
||||
int j;
|
||||
byte[] a = null;
|
||||
while ((j = in.available()) > 0) {
|
||||
a = (a == null) ? new byte[j] : Arrays.copyOf(a, n + j);
|
||||
n += in.read(a, n, j);
|
||||
}
|
||||
return (a == null || n == a.length) ? a : Arrays.copyOf(a, n);
|
||||
}
|
||||
|
||||
/** Called by the process reaper thread when the process exits. */
|
||||
synchronized void processExited() {
|
||||
// Most BufferedInputStream methods are synchronized, but close()
|
||||
// is not, and so we have to handle concurrent racing close().
|
||||
try {
|
||||
InputStream in = this.in;
|
||||
if (in != null) {
|
||||
byte[] stragglers = drainInputStream(in);
|
||||
in.close();
|
||||
this.in = (stragglers == null) ?
|
||||
ProcessBuilder.NullInputStream.INSTANCE :
|
||||
new ByteArrayInputStream(stragglers);
|
||||
if (buf == null) // asynchronous close()?
|
||||
this.in = null;
|
||||
}
|
||||
} catch (IOException ignored) {
|
||||
// probably an asynchronous close().
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A buffered output stream for a subprocess pipe file descriptor
|
||||
* that allows the underlying file descriptor to be reclaimed when
|
||||
* the process exits, via the processExited hook.
|
||||
*/
|
||||
static class ProcessPipeOutputStream extends BufferedOutputStream {
|
||||
ProcessPipeOutputStream(int fd) {
|
||||
super(new FileOutputStream(newFileDescriptor(fd)));
|
||||
}
|
||||
|
||||
/** Called by the process reaper thread when the process exits. */
|
||||
synchronized void processExited() {
|
||||
OutputStream out = this.out;
|
||||
if (out != null) {
|
||||
try {
|
||||
out.close();
|
||||
} catch (IOException ignored) {
|
||||
// We know of no reason to get an IOException, but if
|
||||
// we do, there's nothing else to do but carry on.
|
||||
}
|
||||
this.out = ProcessBuilder.NullOutputStream.INSTANCE;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -37,6 +37,7 @@ import static java.lang.ProcessBuilder.Redirect.*;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.security.*;
|
||||
import java.util.regex.Pattern;
|
||||
import static java.lang.System.getenv;
|
||||
@ -252,9 +253,9 @@ public class Basic {
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
static void print4095(OutputStream s) throws Throwable {
|
||||
static void print4095(OutputStream s, byte b) throws Throwable {
|
||||
byte[] bytes = new byte[4095];
|
||||
Arrays.fill(bytes, (byte) '!');
|
||||
Arrays.fill(bytes, b);
|
||||
s.write(bytes); // Might hang!
|
||||
}
|
||||
|
||||
@ -273,7 +274,9 @@ public class Basic {
|
||||
public static class JavaChild {
|
||||
public static void main(String args[]) throws Throwable {
|
||||
String action = args[0];
|
||||
if (action.equals("testIO")) {
|
||||
if (action.equals("sleep")) {
|
||||
Thread.sleep(10 * 60 * 1000L);
|
||||
} else if (action.equals("testIO")) {
|
||||
String expected = "standard input";
|
||||
char[] buf = new char[expected.length()+1];
|
||||
int n = new InputStreamReader(System.in).read(buf,0,buf.length);
|
||||
@ -315,7 +318,8 @@ public class Basic {
|
||||
printUTF8(new File(System.getProperty("user.dir"))
|
||||
.getCanonicalPath());
|
||||
} else if (action.equals("print4095")) {
|
||||
print4095(System.out);
|
||||
print4095(System.out, (byte) '!');
|
||||
print4095(System.err, (byte) 'E');
|
||||
System.exit(5);
|
||||
} else if (action.equals("OutErr")) {
|
||||
// You might think the system streams would be
|
||||
@ -1717,16 +1721,107 @@ public class Basic {
|
||||
} catch (Throwable t) { unexpected(t); }
|
||||
|
||||
//----------------------------------------------------------------
|
||||
// This would deadlock, if not for the fact that
|
||||
// Attempt to write 4095 bytes to the pipe buffer without a
|
||||
// reader to drain it would deadlock, if not for the fact that
|
||||
// interprocess pipe buffers are at least 4096 bytes.
|
||||
//
|
||||
// Also, check that available reports all the bytes expected
|
||||
// in the pipe buffer, and that I/O operations do the expected
|
||||
// things.
|
||||
//----------------------------------------------------------------
|
||||
try {
|
||||
List<String> childArgs = new ArrayList<String>(javaChildArgs);
|
||||
childArgs.add("print4095");
|
||||
Process p = new ProcessBuilder(childArgs).start();
|
||||
print4095(p.getOutputStream()); // Might hang!
|
||||
p.waitFor(); // Might hang!
|
||||
final int SIZE = 4095;
|
||||
final Process p = new ProcessBuilder(childArgs).start();
|
||||
print4095(p.getOutputStream(), (byte) '!'); // Might hang!
|
||||
p.waitFor(); // Might hang!
|
||||
equal(SIZE, p.getInputStream().available());
|
||||
equal(SIZE, p.getErrorStream().available());
|
||||
THROWS(IOException.class,
|
||||
new Fun(){void f() throws IOException {
|
||||
p.getOutputStream().write((byte) '!');
|
||||
p.getOutputStream().flush();
|
||||
}});
|
||||
|
||||
final byte[] bytes = new byte[SIZE + 1];
|
||||
equal(SIZE, p.getInputStream().read(bytes));
|
||||
for (int i = 0; i < SIZE; i++)
|
||||
equal((byte) '!', bytes[i]);
|
||||
equal((byte) 0, bytes[SIZE]);
|
||||
|
||||
equal(SIZE, p.getErrorStream().read(bytes));
|
||||
for (int i = 0; i < SIZE; i++)
|
||||
equal((byte) 'E', bytes[i]);
|
||||
equal((byte) 0, bytes[SIZE]);
|
||||
|
||||
equal(0, p.getInputStream().available());
|
||||
equal(0, p.getErrorStream().available());
|
||||
equal(-1, p.getErrorStream().read());
|
||||
equal(-1, p.getInputStream().read());
|
||||
|
||||
equal(p.exitValue(), 5);
|
||||
|
||||
p.getInputStream().close();
|
||||
p.getErrorStream().close();
|
||||
p.getOutputStream().close();
|
||||
|
||||
InputStream[] streams = { p.getInputStream(), p.getErrorStream() };
|
||||
for (final InputStream in : streams) {
|
||||
Fun[] ops = {
|
||||
new Fun(){void f() throws IOException {
|
||||
in.read(); }},
|
||||
new Fun(){void f() throws IOException {
|
||||
in.read(bytes); }},
|
||||
new Fun(){void f() throws IOException {
|
||||
in.available(); }}
|
||||
};
|
||||
for (Fun op : ops) {
|
||||
try {
|
||||
op.f();
|
||||
fail();
|
||||
} catch (IOException expected) {
|
||||
check(expected.getMessage()
|
||||
.matches("[Ss]tream [Cc]losed"));
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) { unexpected(t); }
|
||||
|
||||
//----------------------------------------------------------------
|
||||
// Check that reads which are pending when Process.destroy is
|
||||
// called, get EOF, not IOException("Stream closed").
|
||||
//----------------------------------------------------------------
|
||||
try {
|
||||
final int cases = 4;
|
||||
for (int i = 0; i < cases; i++) {
|
||||
final int action = i;
|
||||
List<String> childArgs = new ArrayList<String>(javaChildArgs);
|
||||
childArgs.add("sleep");
|
||||
final byte[] bytes = new byte[10];
|
||||
final Process p = new ProcessBuilder(childArgs).start();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final Thread thread = new Thread() {
|
||||
public void run() {
|
||||
try {
|
||||
latch.countDown();
|
||||
int r;
|
||||
switch (action) {
|
||||
case 0: r = p.getInputStream().read(); break;
|
||||
case 1: r = p.getErrorStream().read(); break;
|
||||
case 2: r = p.getInputStream().read(bytes); break;
|
||||
case 3: r = p.getErrorStream().read(bytes); break;
|
||||
default: throw new Error();
|
||||
}
|
||||
equal(-1, r);
|
||||
} catch (Throwable t) { unexpected(t); }}};
|
||||
|
||||
thread.start();
|
||||
latch.await();
|
||||
Thread.sleep(10);
|
||||
p.destroy();
|
||||
thread.join();
|
||||
}
|
||||
} catch (Throwable t) { unexpected(t); }
|
||||
|
||||
//----------------------------------------------------------------
|
||||
@ -1741,7 +1836,6 @@ public class Basic {
|
||||
} catch (IOException e) {
|
||||
new File("./emptyCommand").delete();
|
||||
String m = e.getMessage();
|
||||
//e.printStackTrace();
|
||||
if (EnglishUnix.is() &&
|
||||
! matches(m, "Permission denied"))
|
||||
unexpected(e);
|
||||
|
Loading…
x
Reference in New Issue
Block a user