summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjonmv <venstad@gmail.com>2022-11-07 12:44:46 +0100
committerjonmv <venstad@gmail.com>2022-11-07 12:44:46 +0100
commita50efd62aa586b836feb1f394b38c384ab260851 (patch)
tree9b4395e1a1235acb2c0b4bcf616dfcbf583f107b
parenta48b3b9e2cd688459ddf5143b303f9f929d667c2 (diff)
Copy more state for separate streams ...
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java40
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackage.java2
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageTest.java22
3 files changed, 42 insertions, 22 deletions
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java
index 3288759b174..6d12b7ee7ea 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java
@@ -36,19 +36,13 @@ import static java.nio.charset.StandardCharsets.UTF_8;
*/
public class ApplicationPackageStream {
- private final byte[] inBuffer = new byte[1 << 16];
- private final ByteArrayOutputStream out = new ByteArrayOutputStream(1 << 16);
- private final ZipOutputStream outZip = new ZipOutputStream(out);
- private final ByteArrayOutputStream teeOut = new ByteArrayOutputStream(1 << 16);
- private final ZipOutputStream teeZip = new ZipOutputStream(teeOut);
- private final Replacer replacer;
- private final Predicate<String> filter;
+ private final Supplier<Replacer> replacer;
+ private final Supplier<Predicate<String>> filter;
private final Supplier<InputStream> in;
private ApplicationPackage ap = null;
- private boolean done = false;
- public static Replacer addingCertificate(Optional<X509Certificate> certificate) {
+ public static Supplier<Replacer> addingCertificate(Optional<X509Certificate> certificate) {
return certificate.map(cert -> Replacer.of(Map.of(ApplicationPackage.trustedCertificatesFile,
trustBytes -> append(trustBytes, cert))))
.orElse(Replacer.of(Map.of()));
@@ -68,21 +62,21 @@ public class ApplicationPackageStream {
/** Stream that effectively copies the input stream to its {@link #truncatedPackage()} when exhausted. */
public ApplicationPackageStream(Supplier<InputStream> in) {
- this(in, __ -> true, Map.of());
+ this(in, () -> __ -> true, Map.of());
}
/** Stream that replaces the indicated entries, and copies all metadata files to its {@link #truncatedPackage()} when exhausted. */
- public ApplicationPackageStream(Supplier<InputStream> in, Replacer replacer) {
- this(in, name -> ApplicationPackage.prePopulated.contains(name) || name.endsWith(".xml"), replacer);
+ public ApplicationPackageStream(Supplier<InputStream> in, Supplier<Replacer> replacer) {
+ this(in, () -> name -> ApplicationPackage.prePopulated.contains(name) || name.endsWith(".xml"), replacer);
}
/** Stream that replaces the indicated entries, and copies the filtered entries to its {@link #truncatedPackage()} when exhausted. */
- public ApplicationPackageStream(Supplier<InputStream> in, Predicate<String> truncation, Map<String, UnaryOperator<InputStream>> replacements) {
+ public ApplicationPackageStream(Supplier<InputStream> in, Supplier<Predicate<String>> truncation, Map<String, UnaryOperator<InputStream>> replacements) {
this(in, truncation, Replacer.of(replacements));
}
/** Stream that uses the given replacer to modify content, and copies the filtered entries to its {@link #truncatedPackage()} when exhausted. */
- public ApplicationPackageStream(Supplier<InputStream> in, Predicate<String> truncation, Replacer replacer) {
+ public ApplicationPackageStream(Supplier<InputStream> in, Supplier<Predicate<String>> truncation, Supplier<Replacer> replacer) {
this.in = in;
this.filter = truncation;
this.replacer = replacer;
@@ -100,12 +94,20 @@ public class ApplicationPackageStream {
private class Stream extends InputStream {
+ private final byte[] inBuffer = new byte[1 << 16];
+ private final ByteArrayOutputStream teeOut = new ByteArrayOutputStream(1 << 16);
+ private final ZipOutputStream teeZip = new ZipOutputStream(teeOut);
private final ZipInputStream inZip = new ZipInputStream(in.get());
+ private final ByteArrayOutputStream out = new ByteArrayOutputStream(1 << 16);
+ private final ZipOutputStream outZip = new ZipOutputStream(out);
+ private final Replacer replacer = ApplicationPackageStream.this.replacer.get();
+ private final Predicate<String> filter = ApplicationPackageStream.this.filter.get();
private byte[] currentOut = new byte[0];
private InputStream currentIn = InputStream.nullInputStream();
private boolean includeCurrent = false;
private int pos = 0;
private boolean closed = false;
+ private boolean done = false;
private void fill() throws IOException {
if (done) return;
@@ -144,7 +146,7 @@ public class ApplicationPackageStream {
outZip.close(); // This typically makes new output available, so must check for that after this.
teeZip.close();
currentIn = nullInputStream();
- ap = new ApplicationPackage(teeOut.toByteArray());
+ if (ap == null) ap = new ApplicationPackage(teeOut.toByteArray());
done = true;
return;
}
@@ -154,7 +156,7 @@ public class ApplicationPackageStream {
content = new FilterInputStream(inZip) { @Override public void close() { } }; // Protect inZip from replacements closing it.
}
- includeCurrent = filter.test(name);
+ includeCurrent = ap == null && filter.test(name);
currentIn = replacer.modify(name, content);
if (currentIn == null) {
currentIn = InputStream.nullInputStream();
@@ -172,7 +174,7 @@ public class ApplicationPackageStream {
fill();
if (pos == currentOut.length) return -1;
}
- return 0xff & inBuffer[pos++];
+ return 0xff & currentOut[pos++];
}
@Override
@@ -226,8 +228,8 @@ public class ApplicationPackageStream {
* <li>Writes all other entries as they are.</li>
* </ul>
*/
- static Replacer of(Map<String, UnaryOperator<InputStream>> replacements) {
- return new Replacer() {
+ static Supplier<Replacer> of(Map<String, UnaryOperator<InputStream>> replacements) {
+ return () -> new Replacer() {
final Map<String, UnaryOperator<InputStream>> remaining = new HashMap<>(replacements);
@Override public String next() {
return remaining.isEmpty() ? null : remaining.keySet().iterator().next();
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackage.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackage.java
index f838e44bd02..17644d5e207 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackage.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackage.java
@@ -94,7 +94,7 @@ public class TestPackage {
keyPair = null;
this.certificate = null;
}
- this.applicationPackageStream = new ApplicationPackageStream(inZip, __ -> false, new Replacer() {
+ this.applicationPackageStream = new ApplicationPackageStream(inZip, () -> __ -> false, () -> new Replacer() {
// Initially skips all declared entries, ensuring they're generated and appended after all input entries.
final Map<String, UnaryOperator<InputStream>> entries = new HashMap<>();
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageTest.java
index 71ce291df36..ab8f696492a 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageTest.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageTest.java
@@ -22,6 +22,7 @@ import java.nio.file.Path;
import java.security.KeyPair;
import java.security.cert.X509Certificate;
import java.time.Instant;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -190,7 +191,7 @@ public class ApplicationPackageTest {
assertEquals(List.of(), new ApplicationPackage(zip).trustedCertificates());
for (int i = 0; i < certificates.size(); i++) {
InputStream in = new ByteArrayInputStream(zip);
- zip = new ApplicationPackageStream(() -> in, __ -> false, addingCertificate(Optional.of(certificates.get(i)))).zipStream().readAllBytes();
+ zip = new ApplicationPackageStream(() -> in, () -> __ -> false, addingCertificate(Optional.of(certificates.get(i)))).zipStream().readAllBytes();
assertEquals(certificates.subList(0, i + 1), new ApplicationPackage(zip).trustedCertificates());
}
}
@@ -232,8 +233,12 @@ public class ApplicationPackageTest {
"unused1.xml", in -> null,
"unused2.xml", __ -> new ByteArrayInputStream(jdiscXml.getBytes(UTF_8)));
Predicate<String> truncation = name -> name.endsWith(".xml");
- ApplicationPackageStream modifier = new ApplicationPackageStream(() -> new ByteArrayInputStream(zip), truncation, replacements);
+ ApplicationPackageStream modifier = new ApplicationPackageStream(() -> new ByteArrayInputStream(Arrays.copyOf(zip, zip.length)), () -> truncation, replacements);
out.reset();
+
+ InputStream partiallyRead = modifier.zipStream();
+ assertEquals(15, partiallyRead.readNBytes(15).length);
+
modifier.zipStream().transferTo(out);
assertEquals(Map.of("deployment.xml", deploymentXml + "\n\n",
@@ -262,6 +267,19 @@ public class ApplicationPackageTest {
"content/nodes.xml", nodesXml,
"gurba", "gurba"))).metaDataZip()),
unzip(modifier.truncatedPackage().metaDataZip()));
+
+ assertArrayEquals(modifier.zipStream().readAllBytes(),
+ modifier.zipStream().readAllBytes());
+
+ ByteArrayOutputStream byteAtATime = new ByteArrayOutputStream();
+ try (InputStream stream = modifier.zipStream()) {
+ for (int b; (b = stream.read()) != -1; ) byteAtATime.write(b);
+ assertArrayEquals(modifier.zipStream().readAllBytes(),
+ byteAtATime.toByteArray());
+ }
+
+ assertEquals(modifier.zipStream().readAllBytes().length,
+ 15 + partiallyRead.readAllBytes().length);
}
}