// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hosted.controller.application.pkg; import java.io.ByteArrayOutputStream; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.io.UncheckedIOException; import java.nio.file.attribute.FileTime; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.function.UnaryOperator; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; import java.util.zip.ZipOutputStream; import static java.io.OutputStream.nullOutputStream; import static java.lang.Math.min; /** * Wraps a zipped application package stream. * This allows replacing content as the input stream is read. * This also retains a truncated {@link ApplicationPackage}, containing only the specified set of files, * which can be accessed when this stream is fully exhausted. * * @author jonmv */ public class ApplicationPackageStream { private final Supplier replacer; private final Supplier> filter; private final Supplier in; private final AtomicReference truncatedPackage = new AtomicReference<>(); private final FileTime createdAt = FileTime.fromMillis(System.currentTimeMillis()); /** Stream that copies application meta and other XML files from the input stream to its {@link #truncatedPackage()} when exhausted. */ public ApplicationPackageStream(Supplier in) { this(in, () -> name -> ApplicationPackage.prePopulated.contains(name) || name.endsWith(".xml"), Map.of()); } /** Stream that copies the indicated entries from the input stream to its {@link #truncatedPackage()} when exhausted. */ public ApplicationPackageStream(Supplier in, Supplier> truncation) { this(in, truncation, Map.of()); } /** Stream that replaces the indicated entries, and copies the filtered entries to its {@link #truncatedPackage()} when exhausted. */ public ApplicationPackageStream(Supplier in, Supplier> truncation, Map> replacements) { this(in, truncation, Replacer.of(replacements)); } /** Stream that uses the given replacer to modify content, and copies the filtered entries to its {@link #truncatedPackage()} when exhausted. */ public ApplicationPackageStream(Supplier in, Supplier> truncation, Supplier replacer) { this.in = in; this.filter = truncation; this.replacer = replacer; } /** * Returns a new stream containing the zipped application package this wraps. Separate streams may exist concurrently, * and the first to be exhausted will populate the truncated application package. */ public InputStream zipStream() { return new Stream(in.get(), replacer.get(), filter.get(), createdAt, truncatedPackage); } /** * Returns the application package backed by only the files indicated by the truncation filter. * Throws if no instances of {@link #zipStream()} have been exhausted yet. */ public ApplicationPackage truncatedPackage() { ApplicationPackage truncated = truncatedPackage.get(); if (truncated == null) throw new IllegalStateException("must completely exhaust input before reading package"); return truncated; } private static class Stream extends InputStream { private final byte[] inBuffer = new byte[1 << 16]; private final ByteArrayOutputStream teeOut = new ByteArrayOutputStream(1 << 16); private final ZipOutputStream teeZip = new ZipOutputStream(teeOut); private final ByteArrayOutputStream out = new ByteArrayOutputStream(1 << 16); private final ZipOutputStream outZip = new ZipOutputStream(out); private final AtomicReference truncatedPackage; private final InputStream in; private final ZipInputStream inZip; private final Replacer replacer; private final Predicate filter; private final FileTime createdAt; private byte[] currentOut = new byte[0]; private InputStream currentIn = InputStream.nullInputStream(); private boolean includeCurrent = false; private int pos = 0; private boolean closed = false; private boolean done = false; private Stream(InputStream in, Replacer replacer, Predicate filter, FileTime createdAt, AtomicReference truncatedPackage) { this.in = in; this.inZip = new ZipInputStream(in); this.replacer = replacer; this.filter = filter; this.createdAt = createdAt; this.truncatedPackage = truncatedPackage; } private void fill() throws IOException { if (done) return; while (out.size() == 0) { // Exhaust current entry first. int i, n = out.size(); while (out.size() == 0 && (i = currentIn.read(inBuffer)) != -1) { if (includeCurrent) teeZip.write(inBuffer, 0, i); outZip.write(inBuffer, 0, i); n += i; } // Current entry exhausted, look for next. if (n == 0) { next(); if (done) break; } } currentOut = out.toByteArray(); out.reset(); pos = 0; } private void next() throws IOException { if (includeCurrent) teeZip.closeEntry(); outZip.closeEntry(); ZipEntry next = inZip.getNextEntry(); String name; FileTime modifiedAt; InputStream content = null; if (next == null) { // We may still have replacements to fill in, but if we don't, we're done filling, forever! name = replacer.next(); modifiedAt = createdAt; if (name == null) { outZip.close(); // This typically makes new output available, so must check for that after this. teeZip.close(); currentIn = nullInputStream(); truncatedPackage.compareAndSet(null, new ApplicationPackage(teeOut.toByteArray())); done = true; return; } } else { name = next.getName(); modifiedAt = next.getLastModifiedTime(); content = new FilterInputStream(inZip) { @Override public void close() { } }; // Protect inZip from replacements closing it. } includeCurrent = truncatedPackage.get() == null && filter.test(name); currentIn = replacer.modify(name, content); if (currentIn == null) { currentIn = InputStream.nullInputStream(); } else { if (includeCurrent) teeZip.putNextEntry(new ZipEntry(name) {{ setLastModifiedTime(modifiedAt); }}); outZip.putNextEntry(new ZipEntry(name) {{ setLastModifiedTime(modifiedAt); }}); } } @Override public int read() throws IOException { if (closed) throw new IOException("stream closed"); if (pos == currentOut.length) { fill(); if (pos == currentOut.length) return -1; } return 0xff & currentOut[pos++]; } @Override public int read(byte[] out, int off, int len) throws IOException { if (closed) throw new IOException("stream closed"); if ((off | len | (off + len) | (out.length - (off + len))) < 0) throw new IndexOutOfBoundsException(); if (pos == currentOut.length) { fill(); if (pos == currentOut.length) return -1; } int n = min(currentOut.length - pos, len); System.arraycopy(currentOut, pos, out, off, n); pos += n; return n; } @Override public int available() throws IOException { return pos == currentOut.length && done ? 0 : 1; } @Override public void close() { if ( ! closed) try { transferTo(nullOutputStream()); // Finish reading the zip, to populate the truncated package in case of errors. in.transferTo(nullOutputStream()); // For some inane reason, ZipInputStream doesn't exhaust its wrapped input. inZip.close(); closed = true; } catch (IOException e) { throw new UncheckedIOException(e); } } } /** Replaces entries in a zip stream as they are encountered, then appends remaining entries at the end. */ public interface Replacer { /** Called when the entries of the original zip stream are exhausted. Return remaining names, or {@code null} when none left. */ String next(); /** Modify content for a given name; return {@code null} for removal; in is {@code null} for entries not present in the input. */ InputStream modify(String name, InputStream in); /** * Wraps a map of fixed replacements, and: *
    *
  • Removes entries whose value is {@code null}.
  • *
  • Modifies entries present in both input and the map.
  • *
  • Appends entries present exclusively in the map.
  • *
  • Writes all other entries as they are.
  • *
*/ static Supplier of(Map> replacements) { return () -> new Replacer() { final Map> remaining = new HashMap<>(replacements); @Override public String next() { return remaining.isEmpty() ? null : remaining.keySet().iterator().next(); } @Override public InputStream modify(String name, InputStream in) { UnaryOperator mapper = remaining.remove(name); return mapper == null ? in : mapper.apply(in); } }; } } }