8027348: (process) Enhancement of handling async close of ProcessInputStream
Reviewed-by: martin
This commit is contained in:
parent
49605b7bf9
commit
592907e7f2
@ -342,47 +342,39 @@ final class UNIXProcess extends Process {
|
||||
ProcessPipeInputStream(int fd) {
|
||||
super(new FileInputStream(newFileDescriptor(fd)));
|
||||
}
|
||||
|
||||
private InputStream drainInputStream(InputStream in)
|
||||
private static byte[] drainInputStream(InputStream in)
|
||||
throws IOException {
|
||||
int n = 0;
|
||||
int j;
|
||||
byte[] a = null;
|
||||
synchronized (closeLock) {
|
||||
if (buf == null) // asynchronous close()?
|
||||
return null; // discard
|
||||
j = in.available();
|
||||
}
|
||||
while (j > 0) {
|
||||
while ((j = in.available()) > 0) {
|
||||
a = (a == null) ? new byte[j] : Arrays.copyOf(a, n + j);
|
||||
synchronized (closeLock) {
|
||||
if (buf == null) // asynchronous close()?
|
||||
return null; // discard
|
||||
n += in.read(a, n, j);
|
||||
j = in.available();
|
||||
}
|
||||
n += in.read(a, n, j);
|
||||
}
|
||||
return (a == null) ?
|
||||
ProcessBuilder.NullInputStream.INSTANCE :
|
||||
new ByteArrayInputStream(n == a.length ? a : Arrays.copyOf(a, n));
|
||||
return (a == null || n == a.length) ? a : Arrays.copyOf(a, n);
|
||||
}
|
||||
|
||||
/** Called by the process reaper thread when the process exits. */
|
||||
synchronized void processExited() {
|
||||
try {
|
||||
InputStream in = this.in;
|
||||
if (in != null) {
|
||||
InputStream stragglers = drainInputStream(in);
|
||||
in.close();
|
||||
this.in = stragglers;
|
||||
}
|
||||
} catch (IOException ignored) { }
|
||||
synchronized (closeLock) {
|
||||
try {
|
||||
InputStream in = this.in;
|
||||
// this stream is closed if and only if: in == null
|
||||
if (in != null) {
|
||||
byte[] stragglers = drainInputStream(in);
|
||||
in.close();
|
||||
this.in = (stragglers == null) ?
|
||||
ProcessBuilder.NullInputStream.INSTANCE :
|
||||
new ByteArrayInputStream(stragglers);
|
||||
}
|
||||
} catch (IOException ignored) {}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
// BufferedInputStream#close() is not synchronized unlike most other methods.
|
||||
// Synchronizing helps avoid racing with drainInputStream().
|
||||
// Synchronizing helps avoid race with processExited().
|
||||
synchronized (closeLock) {
|
||||
super.close();
|
||||
}
|
||||
|
@ -344,47 +344,39 @@ final class UNIXProcess extends Process {
|
||||
ProcessPipeInputStream(int fd) {
|
||||
super(new FileInputStream(newFileDescriptor(fd)));
|
||||
}
|
||||
|
||||
private InputStream drainInputStream(InputStream in)
|
||||
private static byte[] drainInputStream(InputStream in)
|
||||
throws IOException {
|
||||
int n = 0;
|
||||
int j;
|
||||
byte[] a = null;
|
||||
synchronized (closeLock) {
|
||||
if (buf == null) // asynchronous close()?
|
||||
return null; // discard
|
||||
j = in.available();
|
||||
}
|
||||
while (j > 0) {
|
||||
while ((j = in.available()) > 0) {
|
||||
a = (a == null) ? new byte[j] : Arrays.copyOf(a, n + j);
|
||||
synchronized (closeLock) {
|
||||
if (buf == null) // asynchronous close()?
|
||||
return null; // discard
|
||||
n += in.read(a, n, j);
|
||||
j = in.available();
|
||||
}
|
||||
n += in.read(a, n, j);
|
||||
}
|
||||
return (a == null) ?
|
||||
ProcessBuilder.NullInputStream.INSTANCE :
|
||||
new ByteArrayInputStream(n == a.length ? a : Arrays.copyOf(a, n));
|
||||
return (a == null || n == a.length) ? a : Arrays.copyOf(a, n);
|
||||
}
|
||||
|
||||
/** Called by the process reaper thread when the process exits. */
|
||||
synchronized void processExited() {
|
||||
try {
|
||||
InputStream in = this.in;
|
||||
if (in != null) {
|
||||
InputStream stragglers = drainInputStream(in);
|
||||
in.close();
|
||||
this.in = stragglers;
|
||||
}
|
||||
} catch (IOException ignored) { }
|
||||
synchronized (closeLock) {
|
||||
try {
|
||||
InputStream in = this.in;
|
||||
// this stream is closed if and only if: in == null
|
||||
if (in != null) {
|
||||
byte[] stragglers = drainInputStream(in);
|
||||
in.close();
|
||||
this.in = (stragglers == null) ?
|
||||
ProcessBuilder.NullInputStream.INSTANCE :
|
||||
new ByteArrayInputStream(stragglers);
|
||||
}
|
||||
} catch (IOException ignored) {}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
// BufferedInputStream#close() is not synchronized unlike most other methods.
|
||||
// Synchronizing helps avoid racing with drainInputStream().
|
||||
// Synchronizing helps avoid race with processExited().
|
||||
synchronized (closeLock) {
|
||||
super.close();
|
||||
}
|
||||
|
140
jdk/test/java/lang/ProcessBuilder/CloseRace.java
Normal file
140
jdk/test/java/lang/ProcessBuilder/CloseRace.java
Normal file
@ -0,0 +1,140 @@
|
||||
/*
|
||||
* Copyright (c) 2013, 2014 Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||
* or visit www.oracle.com if you need additional information or have any
|
||||
* questions.
|
||||
*/
|
||||
|
||||
/**
|
||||
* @test
|
||||
* @bug 8024521
|
||||
* @summary Closing ProcessPipeInputStream at the time the process exits is racy
|
||||
* and leads to data corruption. Run this test manually (as
|
||||
* an ordinary java program) with -Xmx8M to repro bug 8024521.
|
||||
* @run main/othervm -Xmx8M -Dtest.duration=2 CloseRace
|
||||
*/
|
||||
|
||||
import java.io.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class CloseRace {
|
||||
private static final String BIG_FILE = "bigfile";
|
||||
|
||||
private static final int[] procFDs = new int[6];
|
||||
|
||||
/** default value sufficient to repro bug 8024521. */
|
||||
private static final int testDurationSeconds
|
||||
= Integer.getInteger("test.duration", 600);
|
||||
|
||||
static boolean fdInUse(int i) {
|
||||
return new File("/proc/self/fd/" + i).exists();
|
||||
}
|
||||
|
||||
static boolean[] procFDsInUse() {
|
||||
boolean[] inUse = new boolean[procFDs.length];
|
||||
for (int i = 0; i < procFDs.length; i++)
|
||||
inUse[i] = fdInUse(procFDs[i]);
|
||||
return inUse;
|
||||
}
|
||||
|
||||
static int count(boolean[] bits) {
|
||||
int count = 0;
|
||||
for (int i = 0; i < bits.length; i++)
|
||||
count += bits[i] ? 1 : 0;
|
||||
return count;
|
||||
}
|
||||
|
||||
public static void main(String args[]) throws Exception {
|
||||
if (!(new File("/proc/self/fd").isDirectory()))
|
||||
return;
|
||||
|
||||
// Catch Errors from process reaper
|
||||
Thread.setDefaultUncaughtExceptionHandler
|
||||
((t, e) -> { e.printStackTrace(); System.exit(1); });
|
||||
|
||||
try (RandomAccessFile f = new RandomAccessFile(BIG_FILE, "rw")) {
|
||||
f.setLength(Runtime.getRuntime().maxMemory()); // provoke OOME
|
||||
}
|
||||
|
||||
for (int i = 0, j = 0; j < procFDs.length; i++)
|
||||
if (!fdInUse(i))
|
||||
procFDs[j++] = i;
|
||||
|
||||
Thread[] threads = {
|
||||
new Thread(new OpenLoop()),
|
||||
new Thread(new ExecLoop()),
|
||||
};
|
||||
for (Thread thread : threads)
|
||||
thread.start();
|
||||
|
||||
Thread.sleep(testDurationSeconds * 1000);
|
||||
|
||||
for (Thread thread : threads)
|
||||
thread.interrupt();
|
||||
for (Thread thread : threads)
|
||||
thread.join();
|
||||
}
|
||||
|
||||
static class OpenLoop implements Runnable {
|
||||
public void run() {
|
||||
while (!Thread.interrupted()) {
|
||||
try {
|
||||
// wait for ExecLoop to finish creating process
|
||||
do {} while (count(procFDsInUse()) != 3);
|
||||
List<InputStream> iss = new ArrayList<>(4);
|
||||
|
||||
// eat up three "holes" (closed ends of pipe fd pairs)
|
||||
for (int i = 0; i < 3; i++)
|
||||
iss.add(new FileInputStream(BIG_FILE));
|
||||
do {} while (count(procFDsInUse()) == procFDs.length);
|
||||
// hopefully this will racily occupy empty fd slot
|
||||
iss.add(new FileInputStream(BIG_FILE));
|
||||
Thread.sleep(1); // Widen race window
|
||||
for (InputStream is : iss)
|
||||
is.close();
|
||||
} catch (InterruptedException e) {
|
||||
break;
|
||||
} catch (Exception e) {
|
||||
throw new Error(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static class ExecLoop implements Runnable {
|
||||
public void run() {
|
||||
ProcessBuilder builder = new ProcessBuilder("/bin/true");
|
||||
while (!Thread.interrupted()) {
|
||||
try {
|
||||
// wait for OpenLoop to finish
|
||||
do {} while (count(procFDsInUse()) > 0);
|
||||
Process process = builder.start();
|
||||
InputStream is = process.getInputStream();
|
||||
process.waitFor();
|
||||
is.close();
|
||||
} catch (InterruptedException e) {
|
||||
break;
|
||||
} catch (Exception e) {
|
||||
throw new Error(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,146 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||
* or visit www.oracle.com if you need additional information or have any
|
||||
* questions.
|
||||
*/
|
||||
|
||||
/**
|
||||
* @test
|
||||
* @bug 8024521
|
||||
* @summary Closing ProcessPipeInputStream at the time the process exits is racy
|
||||
* and leads to the data corruption.
|
||||
* @library /lib/testlibrary
|
||||
* @run main/othervm/timeout=80 CloseRace
|
||||
*/
|
||||
|
||||
/**
|
||||
* This test has a little chance to catch the race during the given default
|
||||
* time gap of 20 seconds. To increase the time gap, set the system property
|
||||
* CloseRaceTimeGap=N to the number of seconds.
|
||||
* Jtreg's timeoutFactor should also be set appropriately.
|
||||
*
|
||||
* For example, to run the test for 10 minutes:
|
||||
* > jtreg \
|
||||
* -testjdk:$(PATH_TO_TESTED_JDK) \
|
||||
* -timeoutFactor:10 \
|
||||
* -DCloseRaceTimeGap=600 \
|
||||
* $(PATH_TO_TESTED_JDK_SOURCE)/test/java/lang/Runtime/exec/CloseRace.java
|
||||
*/
|
||||
|
||||
import java.io.*;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import jdk.testlibrary.OutputAnalyzer;
|
||||
import static jdk.testlibrary.ProcessTools.*;
|
||||
|
||||
public class CloseRace {
|
||||
|
||||
public static void main(String args[]) throws Exception {
|
||||
ProcessBuilder pb = createJavaProcessBuilder("-Xmx64M", "CloseRace$Child",
|
||||
System.getProperty("CloseRaceTimeGap", "20"));
|
||||
OutputAnalyzer oa = new OutputAnalyzer(pb.start());
|
||||
oa.stderrShouldNotContain("java.lang.OutOfMemoryError");
|
||||
}
|
||||
|
||||
public static class Child {
|
||||
private static final String BIG_FILE = "bigfile";
|
||||
private static final String SMALL_FILE = "smallfile";
|
||||
private static int timeGap = 20; // seconds
|
||||
|
||||
public static void main(String args[]) throws Exception {
|
||||
if (args.length > 0) {
|
||||
try {
|
||||
timeGap = Integer.parseUnsignedInt(args[0]);
|
||||
timeGap = Integer.max(timeGap, 10);
|
||||
timeGap = Integer.min(timeGap, 10 * 60 * 60); // no more than 10 hours
|
||||
} catch (NumberFormatException ignore) {}
|
||||
}
|
||||
try (RandomAccessFile f = new RandomAccessFile(BIG_FILE, "rw")) {
|
||||
f.setLength(1024 * 1024 * 1024); // 1 Gb, greater than max heap size
|
||||
}
|
||||
try (FileOutputStream fs = new FileOutputStream(SMALL_FILE);
|
||||
PrintStream ps = new PrintStream(fs)) {
|
||||
for (int i = 0; i < 128; ++i)
|
||||
ps.println("line of text");
|
||||
}
|
||||
|
||||
List<Thread> threads = new LinkedList<>();
|
||||
for (int i = 0; i < 99; ++i) {
|
||||
Thread t = new Thread (new OpenLoop());
|
||||
t.start();
|
||||
threads.add(t);
|
||||
}
|
||||
Thread t2 = new Thread (new ExecLoop());
|
||||
t2.start();
|
||||
threads.add(t2);
|
||||
|
||||
Thread.sleep(timeGap);
|
||||
|
||||
for (Thread t : threads) {
|
||||
t.interrupt();
|
||||
t.join();
|
||||
}
|
||||
}
|
||||
|
||||
private static class OpenLoop implements Runnable {
|
||||
public void run() {
|
||||
final Path bigFilePath = Paths.get(BIG_FILE);
|
||||
while (!Thread.interrupted()) {
|
||||
try (InputStream in = Files.newInputStream(bigFilePath)) {
|
||||
// Widen the race window by sleeping 1ms
|
||||
Thread.sleep(1);
|
||||
} catch (InterruptedException e) {
|
||||
break;
|
||||
} catch (Exception e) {
|
||||
System.err.println(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class ExecLoop implements Runnable {
|
||||
public void run() {
|
||||
List<String> command = new ArrayList<>(
|
||||
Arrays.asList("/bin/cat", SMALL_FILE));
|
||||
while (!Thread.interrupted()) {
|
||||
try {
|
||||
ProcessBuilder builder = new ProcessBuilder(command);
|
||||
final Process process = builder.start();
|
||||
InputStream is = process.getInputStream();
|
||||
InputStreamReader isr = new InputStreamReader(is);
|
||||
BufferedReader br = new BufferedReader(isr);
|
||||
while (br.readLine() != null) {}
|
||||
process.waitFor();
|
||||
isr.close();
|
||||
} catch (InterruptedException e) {
|
||||
break;
|
||||
} catch (Exception e) {
|
||||
System.err.println(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user