8222532: (zipfs) Performance regression when writing ZipFileSystem entries in parallel
Reviewed-by: lancea, clanger, alanb
This commit is contained in:
parent
ef40115621
commit
ad207e278d
@ -599,11 +599,11 @@ class ZipFileSystem extends FileSystem {
|
||||
throw new IllegalArgumentException("APPEND + TRUNCATE_EXISTING not allowed");
|
||||
}
|
||||
|
||||
|
||||
// Returns an output SeekableByteChannel for either
|
||||
// (1) writing the contents of a new entry, if the entry doesn't exit, or
|
||||
// (2) updating/replacing the contents of an existing entry.
|
||||
// Note: The content is not compressed.
|
||||
// Note: The content of the channel is not compressed until the
|
||||
// channel is closed
|
||||
private class EntryOutputChannel extends ByteArrayChannel {
|
||||
Entry e;
|
||||
|
||||
@ -622,19 +622,19 @@ class ZipFileSystem extends FileSystem {
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
e.bytes = toByteArray();
|
||||
e.size = e.bytes.length;
|
||||
e.crc = -1;
|
||||
// will update the entry
|
||||
try (OutputStream os = getOutputStream(e)) {
|
||||
os.write(toByteArray());
|
||||
}
|
||||
super.close();
|
||||
update(e);
|
||||
}
|
||||
}
|
||||
|
||||
private int getCompressMethod(FileAttribute<?>... attrs) {
|
||||
private int getCompressMethod() {
|
||||
return defaultMethod;
|
||||
}
|
||||
|
||||
// Returns a Writable/ReadByteChannel for now. Might consdier to use
|
||||
// Returns a Writable/ReadByteChannel for now. Might consider to use
|
||||
// newFileChannel() instead, which dump the entry data into a regular
|
||||
// file on the default file system and create a FileChannel on top of
|
||||
// it.
|
||||
@ -647,10 +647,9 @@ class ZipFileSystem extends FileSystem {
|
||||
if (options.contains(StandardOpenOption.WRITE) ||
|
||||
options.contains(StandardOpenOption.APPEND)) {
|
||||
checkWritable();
|
||||
beginRead(); // only need a readlock, the "update()" will obtain
|
||||
// thewritelock when the channel is closed
|
||||
beginRead(); // only need a read lock, the "update()" will obtain
|
||||
// the write lock when the channel is closed
|
||||
try {
|
||||
ensureOpen();
|
||||
Entry e = getEntry(path);
|
||||
if (e != null) {
|
||||
if (e.isDir() || options.contains(CREATE_NEW))
|
||||
@ -675,8 +674,7 @@ class ZipFileSystem extends FileSystem {
|
||||
throw new NoSuchFileException(getString(path));
|
||||
checkParents(path);
|
||||
return new EntryOutputChannel(
|
||||
new Entry(path, Entry.NEW, false, getCompressMethod(attrs)));
|
||||
|
||||
new Entry(path, Entry.NEW, false, getCompressMethod()));
|
||||
} finally {
|
||||
endRead();
|
||||
}
|
||||
@ -743,7 +741,7 @@ class ZipFileSystem extends FileSystem {
|
||||
final Entry u = isFCH ? e : new Entry(path, tmpfile, Entry.FILECH);
|
||||
if (forWrite) {
|
||||
u.flag = FLAG_DATADESCR;
|
||||
u.method = getCompressMethod(attrs);
|
||||
u.method = getCompressMethod();
|
||||
}
|
||||
// is there a better way to hook into the FileChannel's close method?
|
||||
return new FileChannel() {
|
||||
@ -844,7 +842,11 @@ class ZipFileSystem extends FileSystem {
|
||||
|
||||
// the outstanding input streams that need to be closed
|
||||
private Set<InputStream> streams =
|
||||
Collections.synchronizedSet(new HashSet<InputStream>());
|
||||
Collections.synchronizedSet(new HashSet<>());
|
||||
|
||||
// the ex-channel and ex-path that need to close when their outstanding
|
||||
// input streams are all closed by the obtainers.
|
||||
private Set<ExistingChannelCloser> exChClosers = new HashSet<>();
|
||||
|
||||
private Set<Path> tmppaths = Collections.synchronizedSet(new HashSet<Path>());
|
||||
private Path getTempPathForEntry(byte[] path) throws IOException {
|
||||
@ -1202,25 +1204,20 @@ class ZipFileSystem extends FileSystem {
|
||||
return written;
|
||||
}
|
||||
|
||||
private long writeEntry(Entry e, OutputStream os, byte[] buf)
|
||||
private long writeEntry(Entry e, OutputStream os)
|
||||
throws IOException {
|
||||
|
||||
if (e.bytes == null && e.file == null) // dir, 0-length data
|
||||
return 0;
|
||||
|
||||
long written = 0;
|
||||
try (OutputStream os2 = e.method == METHOD_STORED ?
|
||||
new EntryOutputStreamCRC32(e, os) : new EntryOutputStreamDef(e, os)) {
|
||||
if (e.bytes != null) { // in-memory
|
||||
os2.write(e.bytes, 0, e.bytes.length);
|
||||
} else if (e.file != null) { // tmp file
|
||||
if (e.type == Entry.NEW || e.type == Entry.FILECH) {
|
||||
try (InputStream is = Files.newInputStream(e.file)) {
|
||||
is.transferTo(os2);
|
||||
}
|
||||
}
|
||||
Files.delete(e.file);
|
||||
tmppaths.remove(e.file);
|
||||
if (e.crc != 0 && e.csize > 0) {
|
||||
// pre-compressed entry, write directly to output stream
|
||||
writeTo(e, os);
|
||||
} else {
|
||||
try (OutputStream os2 = (e.method == METHOD_STORED) ?
|
||||
new EntryOutputStreamCRC32(e, os) : new EntryOutputStreamDef(e, os)) {
|
||||
writeTo(e, os2);
|
||||
}
|
||||
}
|
||||
written += e.csize;
|
||||
@ -1230,18 +1227,38 @@ class ZipFileSystem extends FileSystem {
|
||||
return written;
|
||||
}
|
||||
|
||||
private void writeTo(Entry e, OutputStream os) throws IOException {
|
||||
if (e.bytes != null) {
|
||||
os.write(e.bytes, 0, e.bytes.length);
|
||||
} else if (e.file != null) {
|
||||
if (e.type == Entry.NEW || e.type == Entry.FILECH) {
|
||||
try (InputStream is = Files.newInputStream(e.file)) {
|
||||
is.transferTo(os);
|
||||
}
|
||||
}
|
||||
Files.delete(e.file);
|
||||
tmppaths.remove(e.file);
|
||||
}
|
||||
}
|
||||
|
||||
// sync the zip file system, if there is any udpate
|
||||
private void sync() throws IOException {
|
||||
|
||||
// check ex-closer
|
||||
if (!exChClosers.isEmpty()) {
|
||||
for (ExistingChannelCloser ecc : exChClosers) {
|
||||
if (ecc.closeAndDeleteIfDone()) {
|
||||
exChClosers.remove(ecc);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!hasUpdate)
|
||||
return;
|
||||
Path tmpFile = createTempFileInSameDirectoryAs(zfpath);
|
||||
try (OutputStream os = new BufferedOutputStream(Files.newOutputStream(tmpFile, WRITE)))
|
||||
{
|
||||
try (OutputStream os = new BufferedOutputStream(Files.newOutputStream(tmpFile, WRITE))) {
|
||||
ArrayList<Entry> elist = new ArrayList<>(inodes.size());
|
||||
long written = 0;
|
||||
byte[] buf = new byte[8192];
|
||||
Entry e = null;
|
||||
byte[] buf = null;
|
||||
Entry e;
|
||||
|
||||
// write loc
|
||||
for (IndexNode inode : inodes.values()) {
|
||||
@ -1254,11 +1271,13 @@ class ZipFileSystem extends FileSystem {
|
||||
// LOC in new file and simply copy the rest (data and
|
||||
// ext) without enflating/deflating from the old zip
|
||||
// file LOC entry.
|
||||
if (buf == null)
|
||||
buf = new byte[8192];
|
||||
written += copyLOCEntry(e, true, os, written, buf);
|
||||
} else { // NEW, FILECH or CEN
|
||||
e.locoff = written;
|
||||
written += e.writeLOC(os); // write loc header
|
||||
written += writeEntry(e, os, buf);
|
||||
written += writeEntry(e, os);
|
||||
}
|
||||
elist.add(e);
|
||||
} catch (IOException x) {
|
||||
@ -1274,6 +1293,8 @@ class ZipFileSystem extends FileSystem {
|
||||
}
|
||||
e = Entry.readCEN(this, inode);
|
||||
try {
|
||||
if (buf == null)
|
||||
buf = new byte[8192];
|
||||
written += copyLOCEntry(e, false, os, written, buf);
|
||||
elist.add(e);
|
||||
} catch (IOException x) {
|
||||
@ -1291,9 +1312,23 @@ class ZipFileSystem extends FileSystem {
|
||||
end.cenlen = written - end.cenoff;
|
||||
end.write(os, written, forceEnd64);
|
||||
}
|
||||
if (!streams.isEmpty()) {
|
||||
//
|
||||
// There are outstanding input streams open on existing "ch",
|
||||
// so, don't close the "cha" and delete the "file for now, let
|
||||
// the "ex-channel-closer" to handle them
|
||||
Path path = createTempFileInSameDirectoryAs(zfpath);
|
||||
ExistingChannelCloser ecc = new ExistingChannelCloser(path,
|
||||
ch,
|
||||
streams);
|
||||
Files.move(zfpath, path, REPLACE_EXISTING);
|
||||
exChClosers.add(ecc);
|
||||
streams = Collections.synchronizedSet(new HashSet<>());
|
||||
} else {
|
||||
ch.close();
|
||||
Files.delete(zfpath);
|
||||
}
|
||||
|
||||
ch.close();
|
||||
Files.delete(zfpath);
|
||||
Files.move(tmpFile, zfpath, REPLACE_EXISTING);
|
||||
hasUpdate = false; // clear
|
||||
}
|
||||
@ -1351,11 +1386,15 @@ class ZipFileSystem extends FileSystem {
|
||||
} else {
|
||||
os = new ByteArrayOutputStream((e.size > 0)? (int)e.size : 8192);
|
||||
}
|
||||
return new EntryOutputStream(e, os);
|
||||
if (e.method == METHOD_DEFLATED) {
|
||||
return new DeflatingEntryOutputStream(e, os);
|
||||
} else {
|
||||
return new EntryOutputStream(e, os);
|
||||
}
|
||||
}
|
||||
|
||||
private class EntryOutputStream extends FilterOutputStream {
|
||||
private Entry e;
|
||||
private final Entry e;
|
||||
private long written;
|
||||
private boolean isClosed;
|
||||
|
||||
@ -1392,13 +1431,56 @@ class ZipFileSystem extends FileSystem {
|
||||
}
|
||||
}
|
||||
|
||||
// Output stream returned when writing "deflated" entries into memory,
|
||||
// to enable eager (possibly parallel) deflation and reduce memory required.
|
||||
private class DeflatingEntryOutputStream extends DeflaterOutputStream {
|
||||
private final CRC32 crc;
|
||||
private final Entry e;
|
||||
private boolean isClosed;
|
||||
|
||||
DeflatingEntryOutputStream(Entry e, OutputStream os) throws IOException {
|
||||
super(os, getDeflater());
|
||||
this.e = Objects.requireNonNull(e, "Zip entry is null");
|
||||
this.crc = new CRC32();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void write(int b) throws IOException {
|
||||
super.write(b);
|
||||
crc.update(b);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void write(byte b[], int off, int len)
|
||||
throws IOException {
|
||||
super.write(b, off, len);
|
||||
crc.update(b, off, len);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void close() throws IOException {
|
||||
if (isClosed)
|
||||
return;
|
||||
isClosed = true;
|
||||
finish();
|
||||
e.size = def.getBytesRead();
|
||||
e.csize = def.getBytesWritten();
|
||||
e.crc = crc.getValue();
|
||||
if (out instanceof ByteArrayOutputStream)
|
||||
e.bytes = ((ByteArrayOutputStream)out).toByteArray();
|
||||
super.close();
|
||||
update(e);
|
||||
releaseDeflater(def);
|
||||
}
|
||||
}
|
||||
|
||||
// Wrapper output stream class to write out a "stored" entry.
|
||||
// (1) this class does not close the underlying out stream when
|
||||
// being closed.
|
||||
// (2) no need to be "synchronized", only used by sync()
|
||||
private class EntryOutputStreamCRC32 extends FilterOutputStream {
|
||||
private Entry e;
|
||||
private CRC32 crc;
|
||||
private final CRC32 crc;
|
||||
private final Entry e;
|
||||
private long written;
|
||||
private boolean isClosed;
|
||||
|
||||
@ -1438,8 +1520,8 @@ class ZipFileSystem extends FileSystem {
|
||||
// being closed.
|
||||
// (2) no need to be "synchronized", only used by sync()
|
||||
private class EntryOutputStreamDef extends DeflaterOutputStream {
|
||||
private CRC32 crc;
|
||||
private Entry e;
|
||||
private final CRC32 crc;
|
||||
private final Entry e;
|
||||
private boolean isClosed;
|
||||
|
||||
EntryOutputStreamDef(Entry e, OutputStream os) throws IOException {
|
||||
@ -1471,14 +1553,12 @@ class ZipFileSystem extends FileSystem {
|
||||
private InputStream getInputStream(Entry e)
|
||||
throws IOException
|
||||
{
|
||||
InputStream eis = null;
|
||||
|
||||
InputStream eis;
|
||||
if (e.type == Entry.NEW) {
|
||||
// now bytes & file is uncompressed.
|
||||
if (e.bytes != null)
|
||||
return new ByteArrayInputStream(e.bytes);
|
||||
eis = new ByteArrayInputStream(e.bytes);
|
||||
else if (e.file != null)
|
||||
return Files.newInputStream(e.file);
|
||||
eis = Files.newInputStream(e.file);
|
||||
else
|
||||
throw new ZipException("update entry data is missing");
|
||||
} else if (e.type == Entry.FILECH) {
|
||||
@ -1579,7 +1659,7 @@ class ZipFileSystem extends FileSystem {
|
||||
len = (int) rem;
|
||||
}
|
||||
// readFullyAt()
|
||||
long n = 0;
|
||||
long n;
|
||||
ByteBuffer bb = ByteBuffer.wrap(b);
|
||||
bb.position(off);
|
||||
bb.limit(off + len);
|
||||
@ -1905,7 +1985,7 @@ class ZipFileSystem extends FileSystem {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
Entry (Entry e, int type) {
|
||||
Entry(Entry e, int type) {
|
||||
name(e.name);
|
||||
this.isdir = e.isdir;
|
||||
this.version = e.version;
|
||||
@ -1928,7 +2008,7 @@ class ZipFileSystem extends FileSystem {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
Entry (byte[] name, Path file, int type) {
|
||||
Entry(byte[] name, Path file, int type) {
|
||||
this(name, type, false, METHOD_STORED);
|
||||
this.file = file;
|
||||
}
|
||||
@ -2424,6 +2504,36 @@ class ZipFileSystem extends FileSystem {
|
||||
}
|
||||
}
|
||||
|
||||
private static class ExistingChannelCloser {
|
||||
private final Path path;
|
||||
private final SeekableByteChannel ch;
|
||||
private final Set<InputStream> streams;
|
||||
ExistingChannelCloser(Path path,
|
||||
SeekableByteChannel ch,
|
||||
Set<InputStream> streams) {
|
||||
this.path = path;
|
||||
this.ch = ch;
|
||||
this.streams = streams;
|
||||
}
|
||||
|
||||
/**
|
||||
* If there are no more outstanding streams, close the channel and
|
||||
* delete the backing file
|
||||
*
|
||||
* @return true if we're done and closed the backing file,
|
||||
* otherwise false
|
||||
* @throws IOException
|
||||
*/
|
||||
public boolean closeAndDeleteIfDone() throws IOException {
|
||||
if (streams.isEmpty()) {
|
||||
ch.close();
|
||||
Files.delete(path);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// ZIP directory has two issues:
|
||||
// (1) ZIP spec does not require the ZIP file to include
|
||||
// directory entry
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2009, 2018, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2009, 2019, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -104,7 +104,7 @@ public class ZipFileSystemProvider extends FileSystemProvider {
|
||||
if (filesystems.containsKey(realPath))
|
||||
throw new FileSystemAlreadyExistsException();
|
||||
}
|
||||
ZipFileSystem zipfs = null;
|
||||
ZipFileSystem zipfs;
|
||||
try {
|
||||
if (env.containsKey("multi-release")) {
|
||||
zipfs = new JarFileSystem(this, path, env);
|
||||
@ -131,13 +131,13 @@ public class ZipFileSystemProvider extends FileSystemProvider {
|
||||
throws IOException
|
||||
{
|
||||
ensureFile(path);
|
||||
try {
|
||||
ZipFileSystem zipfs;
|
||||
if (env.containsKey("multi-release")) {
|
||||
zipfs = new JarFileSystem(this, path, env);
|
||||
} else {
|
||||
zipfs = new ZipFileSystem(this, path, env);
|
||||
}
|
||||
try {
|
||||
ZipFileSystem zipfs;
|
||||
if (env.containsKey("multi-release")) {
|
||||
zipfs = new JarFileSystem(this, path, env);
|
||||
} else {
|
||||
zipfs = new ZipFileSystem(this, path, env);
|
||||
}
|
||||
return zipfs;
|
||||
} catch (ZipException ze) {
|
||||
String pname = path.toString();
|
||||
|
@ -676,7 +676,7 @@ final class ZipPath implements Path {
|
||||
|
||||
@Override
|
||||
public Iterator<Path> iterator() {
|
||||
return new Iterator<Path>() {
|
||||
return new Iterator<>() {
|
||||
private int i = 0;
|
||||
|
||||
@Override
|
||||
@ -746,8 +746,8 @@ final class ZipPath implements Path {
|
||||
void setAttribute(String attribute, Object value, LinkOption... options)
|
||||
throws IOException
|
||||
{
|
||||
String type = null;
|
||||
String attr = null;
|
||||
String type;
|
||||
String attr;
|
||||
int colonPos = attribute.indexOf(':');
|
||||
if (colonPos == -1) {
|
||||
type = "basic";
|
||||
@ -772,8 +772,8 @@ final class ZipPath implements Path {
|
||||
throws IOException
|
||||
|
||||
{
|
||||
String view = null;
|
||||
String attrs = null;
|
||||
String view;
|
||||
String attrs;
|
||||
int colonPos = attributes.indexOf(':');
|
||||
if (colonPos == -1) {
|
||||
view = "basic";
|
||||
|
Loading…
x
Reference in New Issue
Block a user