diff options
author | jonmv <venstad@gmail.com> | 2022-11-07 12:44:46 +0100 |
---|---|---|
committer | jonmv <venstad@gmail.com> | 2022-11-07 12:44:46 +0100 |
commit | a50efd62aa586b836feb1f394b38c384ab260851 (patch) | |
tree | 9b4395e1a1235acb2c0b4bcf616dfcbf583f107b | |
parent | a48b3b9e2cd688459ddf5143b303f9f929d667c2 (diff) |
Copy more state for separate streams ...
3 files changed, 42 insertions, 22 deletions
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java index 3288759b174..6d12b7ee7ea 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java @@ -36,19 +36,13 @@ import static java.nio.charset.StandardCharsets.UTF_8; */ public class ApplicationPackageStream { - private final byte[] inBuffer = new byte[1 << 16]; - private final ByteArrayOutputStream out = new ByteArrayOutputStream(1 << 16); - private final ZipOutputStream outZip = new ZipOutputStream(out); - private final ByteArrayOutputStream teeOut = new ByteArrayOutputStream(1 << 16); - private final ZipOutputStream teeZip = new ZipOutputStream(teeOut); - private final Replacer replacer; - private final Predicate<String> filter; + private final Supplier<Replacer> replacer; + private final Supplier<Predicate<String>> filter; private final Supplier<InputStream> in; private ApplicationPackage ap = null; - private boolean done = false; - public static Replacer addingCertificate(Optional<X509Certificate> certificate) { + public static Supplier<Replacer> addingCertificate(Optional<X509Certificate> certificate) { return certificate.map(cert -> Replacer.of(Map.of(ApplicationPackage.trustedCertificatesFile, trustBytes -> append(trustBytes, cert)))) .orElse(Replacer.of(Map.of())); @@ -68,21 +62,21 @@ public class ApplicationPackageStream { /** Stream that effectively copies the input stream to its {@link #truncatedPackage()} when exhausted. */ public ApplicationPackageStream(Supplier<InputStream> in) { - this(in, __ -> true, Map.of()); + this(in, () -> __ -> true, Map.of()); } /** Stream that replaces the indicated entries, and copies all metadata files to its {@link #truncatedPackage()} when exhausted. */ - public ApplicationPackageStream(Supplier<InputStream> in, Replacer replacer) { - this(in, name -> ApplicationPackage.prePopulated.contains(name) || name.endsWith(".xml"), replacer); + public ApplicationPackageStream(Supplier<InputStream> in, Supplier<Replacer> replacer) { + this(in, () -> name -> ApplicationPackage.prePopulated.contains(name) || name.endsWith(".xml"), replacer); } /** Stream that replaces the indicated entries, and copies the filtered entries to its {@link #truncatedPackage()} when exhausted. */ - public ApplicationPackageStream(Supplier<InputStream> in, Predicate<String> truncation, Map<String, UnaryOperator<InputStream>> replacements) { + public ApplicationPackageStream(Supplier<InputStream> in, Supplier<Predicate<String>> truncation, Map<String, UnaryOperator<InputStream>> replacements) { this(in, truncation, Replacer.of(replacements)); } /** Stream that uses the given replacer to modify content, and copies the filtered entries to its {@link #truncatedPackage()} when exhausted. */ - public ApplicationPackageStream(Supplier<InputStream> in, Predicate<String> truncation, Replacer replacer) { + public ApplicationPackageStream(Supplier<InputStream> in, Supplier<Predicate<String>> truncation, Supplier<Replacer> replacer) { this.in = in; this.filter = truncation; this.replacer = replacer; @@ -100,12 +94,20 @@ public class ApplicationPackageStream { private class Stream extends InputStream { + private final byte[] inBuffer = new byte[1 << 16]; + private final ByteArrayOutputStream teeOut = new ByteArrayOutputStream(1 << 16); + private final ZipOutputStream teeZip = new ZipOutputStream(teeOut); private final ZipInputStream inZip = new ZipInputStream(in.get()); + private final ByteArrayOutputStream out = new ByteArrayOutputStream(1 << 16); + private final ZipOutputStream outZip = new ZipOutputStream(out); + private final Replacer replacer = ApplicationPackageStream.this.replacer.get(); + private final Predicate<String> filter = ApplicationPackageStream.this.filter.get(); private byte[] currentOut = new byte[0]; private InputStream currentIn = InputStream.nullInputStream(); private boolean includeCurrent = false; private int pos = 0; private boolean closed = false; + private boolean done = false; private void fill() throws IOException { if (done) return; @@ -144,7 +146,7 @@ public class ApplicationPackageStream { outZip.close(); // This typically makes new output available, so must check for that after this. teeZip.close(); currentIn = nullInputStream(); - ap = new ApplicationPackage(teeOut.toByteArray()); + if (ap == null) ap = new ApplicationPackage(teeOut.toByteArray()); done = true; return; } @@ -154,7 +156,7 @@ public class ApplicationPackageStream { content = new FilterInputStream(inZip) { @Override public void close() { } }; // Protect inZip from replacements closing it. } - includeCurrent = filter.test(name); + includeCurrent = ap == null && filter.test(name); currentIn = replacer.modify(name, content); if (currentIn == null) { currentIn = InputStream.nullInputStream(); @@ -172,7 +174,7 @@ public class ApplicationPackageStream { fill(); if (pos == currentOut.length) return -1; } - return 0xff & inBuffer[pos++]; + return 0xff & currentOut[pos++]; } @Override @@ -226,8 +228,8 @@ public class ApplicationPackageStream { * <li>Writes all other entries as they are.</li> * </ul> */ - static Replacer of(Map<String, UnaryOperator<InputStream>> replacements) { - return new Replacer() { + static Supplier<Replacer> of(Map<String, UnaryOperator<InputStream>> replacements) { + return () -> new Replacer() { final Map<String, UnaryOperator<InputStream>> remaining = new HashMap<>(replacements); @Override public String next() { return remaining.isEmpty() ? null : remaining.keySet().iterator().next(); diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackage.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackage.java index f838e44bd02..17644d5e207 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackage.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackage.java @@ -94,7 +94,7 @@ public class TestPackage { keyPair = null; this.certificate = null; } - this.applicationPackageStream = new ApplicationPackageStream(inZip, __ -> false, new Replacer() { + this.applicationPackageStream = new ApplicationPackageStream(inZip, () -> __ -> false, () -> new Replacer() { // Initially skips all declared entries, ensuring they're generated and appended after all input entries. final Map<String, UnaryOperator<InputStream>> entries = new HashMap<>(); diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageTest.java index 71ce291df36..ab8f696492a 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageTest.java @@ -22,6 +22,7 @@ import java.nio.file.Path; import java.security.KeyPair; import java.security.cert.X509Certificate; import java.time.Instant; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; @@ -190,7 +191,7 @@ public class ApplicationPackageTest { assertEquals(List.of(), new ApplicationPackage(zip).trustedCertificates()); for (int i = 0; i < certificates.size(); i++) { InputStream in = new ByteArrayInputStream(zip); - zip = new ApplicationPackageStream(() -> in, __ -> false, addingCertificate(Optional.of(certificates.get(i)))).zipStream().readAllBytes(); + zip = new ApplicationPackageStream(() -> in, () -> __ -> false, addingCertificate(Optional.of(certificates.get(i)))).zipStream().readAllBytes(); assertEquals(certificates.subList(0, i + 1), new ApplicationPackage(zip).trustedCertificates()); } } @@ -232,8 +233,12 @@ public class ApplicationPackageTest { "unused1.xml", in -> null, "unused2.xml", __ -> new ByteArrayInputStream(jdiscXml.getBytes(UTF_8))); Predicate<String> truncation = name -> name.endsWith(".xml"); - ApplicationPackageStream modifier = new ApplicationPackageStream(() -> new ByteArrayInputStream(zip), truncation, replacements); + ApplicationPackageStream modifier = new ApplicationPackageStream(() -> new ByteArrayInputStream(Arrays.copyOf(zip, zip.length)), () -> truncation, replacements); out.reset(); + + InputStream partiallyRead = modifier.zipStream(); + assertEquals(15, partiallyRead.readNBytes(15).length); + modifier.zipStream().transferTo(out); assertEquals(Map.of("deployment.xml", deploymentXml + "\n\n", @@ -262,6 +267,19 @@ public class ApplicationPackageTest { "content/nodes.xml", nodesXml, "gurba", "gurba"))).metaDataZip()), unzip(modifier.truncatedPackage().metaDataZip())); + + assertArrayEquals(modifier.zipStream().readAllBytes(), + modifier.zipStream().readAllBytes()); + + ByteArrayOutputStream byteAtATime = new ByteArrayOutputStream(); + try (InputStream stream = modifier.zipStream()) { + for (int b; (b = stream.read()) != -1; ) byteAtATime.write(b); + assertArrayEquals(modifier.zipStream().readAllBytes(), + byteAtATime.toByteArray()); + } + + assertEquals(modifier.zipStream().readAllBytes().length, + 15 + partiallyRead.readAllBytes().length); } } |