diff options
Diffstat (limited to 'controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java')
-rw-r--r-- | controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java | 244 |
1 files changed, 244 insertions, 0 deletions
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java new file mode 100644 index 00000000000..3288759b174 --- /dev/null +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java @@ -0,0 +1,244 @@ +package com.yahoo.vespa.hosted.controller.application.pkg; + +import com.yahoo.security.X509CertificateUtils; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.security.cert.X509Certificate; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.function.UnaryOperator; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; +import java.util.zip.ZipOutputStream; + +import static com.yahoo.security.X509CertificateUtils.certificateListFromPem; +import static java.io.OutputStream.nullOutputStream; +import static java.lang.Math.min; +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * Wraps a zipped application package stream. + * This allows replacing content as the input stream is read. + * This also retains a truncated {@link ApplicationPackage}, containing only the specified set of files, + * which can be accessed when this stream is fully exhausted. + * + * @author jonmv + */ +public class ApplicationPackageStream { + + private final byte[] inBuffer = new byte[1 << 16]; + private final ByteArrayOutputStream out = new ByteArrayOutputStream(1 << 16); + private final ZipOutputStream outZip = new ZipOutputStream(out); + private final ByteArrayOutputStream teeOut = new ByteArrayOutputStream(1 << 16); + private final ZipOutputStream teeZip = new ZipOutputStream(teeOut); + private final Replacer replacer; + private final Predicate<String> filter; + private final Supplier<InputStream> in; + + private ApplicationPackage ap = null; + private boolean done = false; + + public static Replacer addingCertificate(Optional<X509Certificate> certificate) { + return certificate.map(cert -> Replacer.of(Map.of(ApplicationPackage.trustedCertificatesFile, + trustBytes -> append(trustBytes, cert)))) + .orElse(Replacer.of(Map.of())); + } + + static InputStream append(InputStream trustIn, X509Certificate cert) { + try { + List<X509Certificate> trusted = trustIn == null ? new ArrayList<>() + : new ArrayList<>(certificateListFromPem(new String(trustIn.readAllBytes(), UTF_8))); + trusted.add(cert); + return new ByteArrayInputStream(X509CertificateUtils.toPem(trusted).getBytes(UTF_8)); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + /** Stream that effectively copies the input stream to its {@link #truncatedPackage()} when exhausted. */ + public ApplicationPackageStream(Supplier<InputStream> in) { + this(in, __ -> true, Map.of()); + } + + /** Stream that replaces the indicated entries, and copies all metadata files to its {@link #truncatedPackage()} when exhausted. */ + public ApplicationPackageStream(Supplier<InputStream> in, Replacer replacer) { + this(in, name -> ApplicationPackage.prePopulated.contains(name) || name.endsWith(".xml"), replacer); + } + + /** Stream that replaces the indicated entries, and copies the filtered entries to its {@link #truncatedPackage()} when exhausted. */ + public ApplicationPackageStream(Supplier<InputStream> in, Predicate<String> truncation, Map<String, UnaryOperator<InputStream>> replacements) { + this(in, truncation, Replacer.of(replacements)); + } + + /** Stream that uses the given replacer to modify content, and copies the filtered entries to its {@link #truncatedPackage()} when exhausted. */ + public ApplicationPackageStream(Supplier<InputStream> in, Predicate<String> truncation, Replacer replacer) { + this.in = in; + this.filter = truncation; + this.replacer = replacer; + } + + public InputStream zipStream() { + return new Stream(); + } + + /** Returns the application backed by only the files indicated by the truncation filter. Throws if not yet exhausted. */ + public ApplicationPackage truncatedPackage() { + if (ap == null) throw new IllegalStateException("must completely exhaust input before reading package"); + return ap; + } + + private class Stream extends InputStream { + + private final ZipInputStream inZip = new ZipInputStream(in.get()); + private byte[] currentOut = new byte[0]; + private InputStream currentIn = InputStream.nullInputStream(); + private boolean includeCurrent = false; + private int pos = 0; + private boolean closed = false; + + private void fill() throws IOException { + if (done) return; + while (out.size() == 0) { + // Exhaust current entry first. + int i, n = out.size(); + while (out.size() == 0 && (i = currentIn.read(inBuffer)) != -1) { + if (includeCurrent) teeZip.write(inBuffer, 0, i); + outZip.write(inBuffer, 0, i); + n += i; + } + + // Current entry exhausted, look for next. + if (n == 0) { + next(); + if (done) break; + } + } + + currentOut = out.toByteArray(); + out.reset(); + pos = 0; + } + + private void next() throws IOException { + if (includeCurrent) teeZip.closeEntry(); + outZip.closeEntry(); + + ZipEntry next = inZip.getNextEntry(); + String name; + InputStream content = null; + if (next == null) { + // We may still have replacements to fill in, but if we don't, we're done filling, forever! + name = replacer.next(); + if (name == null) { + outZip.close(); // This typically makes new output available, so must check for that after this. + teeZip.close(); + currentIn = nullInputStream(); + ap = new ApplicationPackage(teeOut.toByteArray()); + done = true; + return; + } + } + else { + name = next.getName(); + content = new FilterInputStream(inZip) { @Override public void close() { } }; // Protect inZip from replacements closing it. + } + + includeCurrent = filter.test(name); + currentIn = replacer.modify(name, content); + if (currentIn == null) { + currentIn = InputStream.nullInputStream(); + } + else { + if (includeCurrent) teeZip.putNextEntry(new ZipEntry(name)); + outZip.putNextEntry(new ZipEntry(name)); + } + } + + @Override + public int read() throws IOException { + if (closed) throw new IOException("stream closed"); + if (pos == currentOut.length) { + fill(); + if (pos == currentOut.length) return -1; + } + return 0xff & inBuffer[pos++]; + } + + @Override + public int read(byte[] out, int off, int len) throws IOException { + if (closed) throw new IOException("stream closed"); + if ((off | len | (off + len) | (out.length - (off + len))) < 0) throw new IndexOutOfBoundsException(); + if (pos == currentOut.length) { + fill(); + if (pos == currentOut.length) return -1; + } + int n = min(currentOut.length - pos, len); + System.arraycopy(currentOut, pos, out, off, n); + pos += n; + return n; + } + + @Override + public int available() throws IOException { + return pos == currentOut.length && done ? 0 : 1; + } + + @Override + public void close() { + if ( ! closed) try { + transferTo(nullOutputStream()); + inZip.close(); + closed = true; + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + } + + /** Replaces entries in a zip stream as they are encountered, then appends remaining entries at the end. */ + public interface Replacer { + + /** Called when the entries of the original zip stream are exhausted. Return remaining names, or {@code null} when none left. */ + String next(); + + /** Modify content for a given name; return {@code null} for removal; in is {@code null} for entries not present in the input. */ + InputStream modify(String name, InputStream in); + + /** + * Wraps a map of fixed replacements, and: + * <ul> + * <li>Removes entries whose value is {@code null}.</li> + * <li>Modifies entries present in both input and the map.</li> + * <li>Appends entries present exclusively in the map.</li> + * <li>Writes all other entries as they are.</li> + * </ul> + */ + static Replacer of(Map<String, UnaryOperator<InputStream>> replacements) { + return new Replacer() { + final Map<String, UnaryOperator<InputStream>> remaining = new HashMap<>(replacements); + @Override public String next() { + return remaining.isEmpty() ? null : remaining.keySet().iterator().next(); + } + @Override public InputStream modify(String name, InputStream in) { + UnaryOperator<InputStream> mapper = remaining.remove(name); + return mapper == null ? in : mapper.apply(in); + } + }; + } + + } + +} |