From f8a4549269d9df145a4f27c5368a2578f18128a7 Mon Sep 17 00:00:00 2001 From: jonmv Date: Sat, 5 Nov 2022 09:24:21 +0100 Subject: Revert "Merge pull request #24763 from vespa-engine/jonmv/revert-streams" This reverts commit 6d8bca79a1f600501290593ecd920eca0b237c78, reversing changes made to 36374eb2d3cc94c3792dd0a70963244abb6284b4. --- .../config/application/api/DeploymentSpec.java | 2 +- .../api/application/v4/model/DeploymentData.java | 41 ++-- .../integration/deployment/ApplicationStore.java | 25 ++- .../hosted/controller/ApplicationController.java | 69 +++--- .../application/pkg/ApplicationPackage.java | 27 +-- .../application/pkg/ApplicationPackageStream.java | 244 +++++++++++++++++++++ .../controller/application/pkg/TestPackage.java | 100 +++++---- .../controller/application/pkg/ZipEntries.java | 3 +- .../controller/deployment/InternalStepRunner.java | 9 +- .../controller/deployment/JobController.java | 46 ++-- .../application/pkg/ApplicationPackageTest.java | 199 +++++++++++++---- .../application/pkg/TestPackageTest.java | 57 ++++- .../controller/application/pkg/ZipEntriesTest.java | 50 ----- .../integration/ApplicationStoreMock.java | 15 +- .../controller/integration/ConfigServerMock.java | 13 +- .../ai/vespa/hosted/api/MultiPartStreamer.java | 11 +- .../ai/vespa/hosted/client/AbstractHttpClient.java | 2 +- .../java/ai/vespa/hosted/client/HttpClient.java | 6 - vespajlib/abi-spec.json | 1 + .../com/yahoo/compress/ArchiveStreamReader.java | 4 +- .../main/java/com/yahoo/io/LazyInputStream.java | 53 +++++ .../java/com/yahoo/yolean/concurrent/Memoized.java | 8 + 22 files changed, 723 insertions(+), 262 deletions(-) create mode 100644 controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java delete mode 100644 controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ZipEntriesTest.java create mode 100644 vespajlib/src/main/java/com/yahoo/io/LazyInputStream.java diff --git a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java index c9ec9780fad..f69bd5fa475 100644 --- a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java +++ b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java @@ -194,7 +194,7 @@ public class DeploymentSpec { public DeploymentInstanceSpec requireInstance(InstanceName name) { Optional instance = instance(name); if (instance.isEmpty()) - throw new IllegalArgumentException("No instance '" + name + "' in deployment.xml'. Instances: " + + throw new IllegalArgumentException("No instance '" + name + "' in deployment.xml. Instances: " + instances().stream().map(spec -> spec.name().toString()).collect(Collectors.joining(","))); return instance.get(); } diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/application/v4/model/DeploymentData.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/application/v4/model/DeploymentData.java index d3331c3cfd4..63c744c385d 100644 --- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/application/v4/model/DeploymentData.java +++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/application/v4/model/DeploymentData.java @@ -12,12 +12,14 @@ import com.yahoo.vespa.hosted.controller.api.integration.billing.Quota; import com.yahoo.vespa.hosted.controller.api.integration.certificates.EndpointCertificateMetadata; import com.yahoo.vespa.hosted.controller.api.integration.configserver.ContainerEndpoint; import com.yahoo.vespa.hosted.controller.api.integration.secrets.TenantSecretStore; +import com.yahoo.yolean.concurrent.Memoized; +import java.io.InputStream; import java.security.cert.X509Certificate; import java.util.List; -import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.function.Supplier; import static java.util.Objects.requireNonNull; @@ -32,40 +34,41 @@ public class DeploymentData { private final ApplicationId instance; private final Tags tags; private final ZoneId zone; - private final byte[] applicationPackage; + private final Supplier applicationPackage; private final Version platform; private final Set containerEndpoints; - private final Optional endpointCertificateMetadata; + private final Supplier> endpointCertificateMetadata; private final Optional dockerImageRepo; private final Optional athenzDomain; - private final Quota quota; + private final Supplier quota; private final List tenantSecretStores; private final List operatorCertificates; - private final Optional cloudAccount; + private final Supplier> cloudAccount; private final boolean dryRun; - public DeploymentData(ApplicationId instance, Tags tags, ZoneId zone, byte[] applicationPackage, Version platform, + public DeploymentData(ApplicationId instance, Tags tags, ZoneId zone, Supplier applicationPackage, Version platform, Set containerEndpoints, - Optional endpointCertificateMetadata, + Supplier> endpointCertificateMetadata, Optional dockerImageRepo, Optional athenzDomain, - Quota quota, + Supplier quota, List tenantSecretStores, List operatorCertificates, - Optional cloudAccount, boolean dryRun) { + Supplier> cloudAccount, + boolean dryRun) { this.instance = requireNonNull(instance); this.tags = requireNonNull(tags); this.zone = requireNonNull(zone); this.applicationPackage = requireNonNull(applicationPackage); this.platform = requireNonNull(platform); this.containerEndpoints = Set.copyOf(requireNonNull(containerEndpoints)); - this.endpointCertificateMetadata = requireNonNull(endpointCertificateMetadata); + this.endpointCertificateMetadata = new Memoized<>(requireNonNull(endpointCertificateMetadata)); this.dockerImageRepo = requireNonNull(dockerImageRepo); this.athenzDomain = athenzDomain; - this.quota = quota; + this.quota = new Memoized<>(requireNonNull(quota)); this.tenantSecretStores = List.copyOf(requireNonNull(tenantSecretStores)); this.operatorCertificates = List.copyOf(requireNonNull(operatorCertificates)); - this.cloudAccount = Objects.requireNonNull(cloudAccount); + this.cloudAccount = new Memoized<>(requireNonNull(cloudAccount)); this.dryRun = dryRun; } @@ -79,8 +82,8 @@ public class DeploymentData { return zone; } - public byte[] applicationPackage() { - return applicationPackage; + public InputStream applicationPackage() { + return applicationPackage.get(); } public Version platform() { @@ -92,7 +95,7 @@ public class DeploymentData { } public Optional endpointCertificateMetadata() { - return endpointCertificateMetadata; + return endpointCertificateMetadata.get(); } public Optional dockerImageRepo() { @@ -104,7 +107,7 @@ public class DeploymentData { } public Quota quota() { - return quota; + return quota.get(); } public List tenantSecretStores() { @@ -116,9 +119,11 @@ public class DeploymentData { } public Optional cloudAccount() { - return cloudAccount; + return cloudAccount.get(); } - public boolean isDryRun() { return dryRun; } + public boolean isDryRun() { + return dryRun; + } } diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/deployment/ApplicationStore.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/deployment/ApplicationStore.java index c4db0de539e..71ec07bc2e6 100644 --- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/deployment/ApplicationStore.java +++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/deployment/ApplicationStore.java @@ -5,6 +5,9 @@ import com.yahoo.config.provision.ApplicationName; import com.yahoo.config.provision.TenantName; import com.yahoo.vespa.hosted.controller.api.identifiers.DeploymentId; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; import java.time.Instant; import java.util.Optional; @@ -17,7 +20,16 @@ import java.util.Optional; public interface ApplicationStore { /** Returns the application package of the given revision. */ - byte[] get(DeploymentId deploymentId, RevisionId revisionId); + default byte[] get(DeploymentId deploymentId, RevisionId revisionId) { + try (InputStream stream = stream(deploymentId, revisionId)) { + return stream.readAllBytes(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + InputStream stream(DeploymentId deploymentId, RevisionId revisionId); /** Returns the application package diff, compared to the previous build, for the given tenant, application and build number */ Optional getDiff(TenantName tenantName, ApplicationName applicationName, long buildNumber); @@ -43,7 +55,16 @@ public interface ApplicationStore { void removeAll(TenantName tenant, ApplicationName application); /** Returns the tester application package of the given revision. Does NOT contain the services.xml. */ - byte[] getTester(TenantName tenant, ApplicationName application, RevisionId revision); + default byte[] getTester(TenantName tenant, ApplicationName application, RevisionId revision) { + try (InputStream stream = streamTester(tenant, application, revision)) { + return stream.readAllBytes(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + InputStream streamTester(TenantName tenantName, ApplicationName applicationName, RevisionId revision); /** Returns the application package diff, compared to the previous build, for the given deployment and build number */ Optional getDevDiff(DeploymentId deploymentId, long buildNumber); diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java index 34a7ae89dd2..e09e1f04b8e 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java @@ -15,7 +15,6 @@ import com.yahoo.config.provision.InstanceName; import com.yahoo.config.provision.Tags; import com.yahoo.config.provision.TenantName; import com.yahoo.config.provision.zone.ZoneId; -import com.yahoo.log.LogLevel; import com.yahoo.text.Text; import com.yahoo.transaction.Mutex; import com.yahoo.vespa.athenz.api.AthenzDomain; @@ -39,7 +38,6 @@ import com.yahoo.vespa.hosted.controller.api.integration.configserver.ConfigServ import com.yahoo.vespa.hosted.controller.api.integration.configserver.ContainerEndpoint; import com.yahoo.vespa.hosted.controller.api.integration.configserver.DeploymentResult; import com.yahoo.vespa.hosted.controller.api.integration.configserver.DeploymentResult.LogEntry; -import com.yahoo.vespa.hosted.controller.api.integration.configserver.Log; import com.yahoo.vespa.hosted.controller.api.integration.configserver.Node; import com.yahoo.vespa.hosted.controller.api.integration.configserver.NodeFilter; import com.yahoo.vespa.hosted.controller.api.integration.deployment.ApplicationStore; @@ -58,6 +56,7 @@ import com.yahoo.vespa.hosted.controller.application.QuotaUsage; import com.yahoo.vespa.hosted.controller.application.SystemApplication; import com.yahoo.vespa.hosted.controller.application.TenantAndApplicationId; import com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackage; +import com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackageStream; import com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackageValidator; import com.yahoo.vespa.hosted.controller.athenz.impl.AthenzFacade; import com.yahoo.vespa.hosted.controller.certificate.EndpointCertificates; @@ -79,6 +78,7 @@ import com.yahoo.vespa.hosted.controller.versions.VespaVersion; import com.yahoo.vespa.hosted.controller.versions.VespaVersion.Confidence; import com.yahoo.yolean.Exceptions; +import java.io.ByteArrayInputStream; import java.security.Principal; import java.security.cert.X509Certificate; import java.time.Clock; @@ -87,7 +87,6 @@ import java.time.Instant; import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -98,6 +97,7 @@ import java.util.TreeMap; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -489,9 +489,6 @@ public class ApplicationController { DeploymentId deployment = new DeploymentId(job.application(), zone); try (Mutex deploymentLock = lockForDeployment(job.application(), zone)) { - Set containerEndpoints; - Optional endpointCertificateMetadata; - Run run = controller.jobController().last(job) .orElseThrow(() -> new IllegalStateException("No known run of '" + job + "'")); @@ -500,30 +497,32 @@ public class ApplicationController { Version platform = run.versions().sourcePlatform().filter(__ -> deploySourceVersions).orElse(run.versions().targetPlatform()); RevisionId revision = run.versions().sourceRevision().filter(__ -> deploySourceVersions).orElse(run.versions().targetRevision()); - ApplicationPackage applicationPackage = new ApplicationPackage(applicationStore.get(deployment, revision)); - + ApplicationPackageStream applicationPackage = new ApplicationPackageStream(() -> applicationStore.stream(deployment, revision), + ApplicationPackageStream.addingCertificate(run.testerCertificate())); AtomicReference lastRevision = new AtomicReference<>(); Instance instance; + Set containerEndpoints; try (Mutex lock = lock(applicationId)) { LockedApplication application = new LockedApplication(requireApplication(applicationId), lock); application.get().revisions().last().map(ApplicationVersion::id).ifPresent(lastRevision::set); instance = application.get().require(job.application().instance()); - if ( ! applicationPackage.trustedCertificates().isEmpty() - && run.testerCertificate().isPresent()) - applicationPackage = applicationPackage.withTrustedCertificate(run.testerCertificate().get()); - - endpointCertificateMetadata = endpointCertificates.getMetadata(instance, zone, applicationPackage.deploymentSpec()); - containerEndpoints = controller.routing().of(deployment).prepare(application); } // Release application lock while doing the deployment, which is a lengthy task. + Supplier> endpointCertificateMetadata = () -> { + try (Mutex lock = lock(applicationId)) { + Optional data = endpointCertificates.getMetadata(instance, zone, applicationPackage.truncatedPackage().deploymentSpec()); + data.ifPresent(e -> deployLogger.accept("Using CA signed certificate version %s".formatted(e.version()))); + return data; + } + }; + // Carry out deployment without holding the application lock. DeploymentResult result = deploy(job.application(), instance.tags(), applicationPackage, zone, platform, containerEndpoints, endpointCertificateMetadata, run.isDryRun()); - endpointCertificateMetadata.ifPresent(e -> deployLogger.accept("Using CA signed certificate version %s".formatted(e.version()))); // Record the quota usage for this application var quotaUsage = deploymentQuotaUsage(zone, job.application()); @@ -544,8 +543,10 @@ public class ApplicationController { .distinct() .collect(Collectors.toList())) .orElseGet(List::of); - if (warnings.isEmpty()) controller.notificationsDb().removeNotification(source, Notification.Type.applicationPackage); - else controller.notificationsDb().setNotification(source, Notification.Type.applicationPackage, Notification.Level.warning, warnings); + if (warnings.isEmpty()) + controller.notificationsDb().removeNotification(source, Notification.Type.applicationPackage); + else + controller.notificationsDb().setNotification(source, Notification.Type.applicationPackage, Notification.Level.warning, warnings); } lockApplicationOrThrow(applicationId, application -> @@ -606,23 +607,23 @@ public class ApplicationController { /** Deploy a system application to given zone */ public DeploymentResult deploySystemApplicationPackage(SystemApplication application, ZoneId zone, Version version) { if (application.hasApplicationPackage()) { - ApplicationPackage applicationPackage = new ApplicationPackage( - artifactRepository.getSystemApplicationPackage(application.id(), zone, version) + ApplicationPackageStream applicationPackage = new ApplicationPackageStream( + () -> new ByteArrayInputStream(artifactRepository.getSystemApplicationPackage(application.id(), zone, version)) ); - return deploy(application.id(), Tags.empty(), applicationPackage, zone, version, Set.of(), /* No application cert */ Optional.empty(), false); + return deploy(application.id(), Tags.empty(), applicationPackage, zone, version, Set.of(), Optional::empty, false); } else { throw new RuntimeException("This system application does not have an application package: " + application.id().toShortString()); } } /** Deploys the given tester application to the given zone. */ - public DeploymentResult deployTester(TesterId tester, ApplicationPackage applicationPackage, ZoneId zone, Version platform) { - return deploy(tester.id(), Tags.empty(), applicationPackage, zone, platform, Set.of(), /* No application cert for tester*/ Optional.empty(), false); + public DeploymentResult deployTester(TesterId tester, ApplicationPackageStream applicationPackage, ZoneId zone, Version platform) { + return deploy(tester.id(), Tags.empty(), applicationPackage, zone, platform, Set.of(), Optional::empty, false); } - private DeploymentResult deploy(ApplicationId application, Tags tags, ApplicationPackage applicationPackage, + private DeploymentResult deploy(ApplicationId application, Tags tags, ApplicationPackageStream applicationPackage, ZoneId zone, Version platform, Set endpoints, - Optional endpointCertificateMetadata, + Supplier> endpointCertificateMetadata, boolean dryRun) { DeploymentId deployment = new DeploymentId(application, zone); try { @@ -638,13 +639,8 @@ public class ApplicationController { .filter(tenant-> tenant instanceof AthenzTenant) .map(tenant -> ((AthenzTenant)tenant).domain()); - if (zone.environment().isManuallyDeployed()) - controller.applications().applicationStore().putMeta(deployment, - clock.instant(), - applicationPackage.metaDataZip()); - - Quota deploymentQuota = DeploymentQuotaCalculator.calculate(billingController.getQuota(application.tenant()), - asList(application.tenant()), application, zone, applicationPackage.deploymentSpec()); + Supplier deploymentQuota = () -> DeploymentQuotaCalculator.calculate(billingController.getQuota(application.tenant()), + asList(application.tenant()), application, zone, applicationPackage.truncatedPackage().deploymentSpec()); List tenantSecretStores = controller.tenants() .get(application.tenant()) @@ -654,9 +650,9 @@ public class ApplicationController { List operatorCertificates = controller.supportAccess().activeGrantsFor(deployment).stream() .map(SupportAccessGrant::certificate) .collect(toList()); - Optional cloudAccount = decideCloudAccountOf(deployment, applicationPackage.deploymentSpec()); + Supplier> cloudAccount = () -> decideCloudAccountOf(deployment, applicationPackage.truncatedPackage().deploymentSpec()); ConfigServer.PreparedApplication preparedApplication = - configServer.deploy(new DeploymentData(application, tags, zone, applicationPackage.zippedContent(), platform, + configServer.deploy(new DeploymentData(application, tags, zone, applicationPackage::zipStream, platform, endpoints, endpointCertificateMetadata, dockerImageRepo, domain, deploymentQuota, tenantSecretStores, operatorCertificates, cloudAccount, dryRun)); @@ -665,7 +661,12 @@ public class ApplicationController { } finally { // Even if prepare fails, routing configuration may need to be updated if ( ! application.instance().isTester()) { - controller.routing().of(deployment).configure(applicationPackage.deploymentSpec()); + controller.routing().of(deployment).configure(applicationPackage.truncatedPackage().deploymentSpec()); + if (zone.environment().isManuallyDeployed()) + controller.applications().applicationStore().putMeta(deployment, + clock.instant(), + applicationPackage.truncatedPackage().metaDataZip()); + } } } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackage.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackage.java index b99d825a779..53c78d7c8ec 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackage.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackage.java @@ -59,19 +59,19 @@ import static java.util.stream.Collectors.toMap; * A representation of the content of an application package. * Only meta-data content can be accessed as anything other than compressed data. * A package is identified by a hash of the content. - * - * This is immutable. - * + * * @author bratseth * @author jonmv */ public class ApplicationPackage { - private static final String trustedCertificatesFile = "security/clients.pem"; - private static final String buildMetaFile = "build-meta.json"; + static final String trustedCertificatesFile = "security/clients.pem"; + static final String buildMetaFile = "build-meta.json"; static final String deploymentFile = "deployment.xml"; - private static final String validationOverridesFile = "validation-overrides.xml"; + static final String validationOverridesFile = "validation-overrides.xml"; static final String servicesFile = "services.xml"; + static final Set prePopulated = Set.of(deploymentFile, validationOverridesFile, servicesFile, buildMetaFile, trustedCertificatesFile); + private static Hasher hasher() { return Hashing.murmur3_128().newHasher(); } private final String bundleHash; @@ -101,7 +101,7 @@ public class ApplicationPackage { */ public ApplicationPackage(byte[] zippedContent, boolean requireFiles) { this.zippedContent = Objects.requireNonNull(zippedContent, "The application package content cannot be null"); - this.files = new ZipArchiveCache(zippedContent, Set.of(deploymentFile, validationOverridesFile, servicesFile, buildMetaFile, trustedCertificatesFile)); + this.files = new ZipArchiveCache(zippedContent, prePopulated); Optional deploymentSpec = files.get(deploymentFile).map(bytes -> new String(bytes, UTF_8)).map(DeploymentSpec::fromXml); if (requireFiles && deploymentSpec.isEmpty()) @@ -122,17 +122,6 @@ public class ApplicationPackage { preProcessAndPopulateCache(); } - /** Returns a copy of this with the given certificate appended. */ - public ApplicationPackage withTrustedCertificate(X509Certificate certificate) { - List trustedCertificates = new ArrayList<>(this.trustedCertificates); - trustedCertificates.add(certificate); - byte[] certificatesBytes = X509CertificateUtils.toPem(trustedCertificates).getBytes(UTF_8); - - ByteArrayOutputStream modified = new ByteArrayOutputStream(zippedContent.length + certificatesBytes.length); - ZipEntries.transferAndWrite(modified, new ByteArrayInputStream(zippedContent), trustedCertificatesFile, certificatesBytes); - return new ApplicationPackage(modified.toByteArray()); - } - /** Hash of all files and settings that influence what is deployed to config servers. */ public String bundleHash() { return bundleHash; @@ -295,7 +284,7 @@ public class ApplicationPackage { private Map> read(Collection names) { var entries = ZipEntries.from(zip, - name -> names.contains(name), + names::contains, maxSize, true) .asList().stream() diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java new file mode 100644 index 00000000000..3288759b174 --- /dev/null +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java @@ -0,0 +1,244 @@ +package com.yahoo.vespa.hosted.controller.application.pkg; + +import com.yahoo.security.X509CertificateUtils; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.security.cert.X509Certificate; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.function.UnaryOperator; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; +import java.util.zip.ZipOutputStream; + +import static com.yahoo.security.X509CertificateUtils.certificateListFromPem; +import static java.io.OutputStream.nullOutputStream; +import static java.lang.Math.min; +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * Wraps a zipped application package stream. + * This allows replacing content as the input stream is read. + * This also retains a truncated {@link ApplicationPackage}, containing only the specified set of files, + * which can be accessed when this stream is fully exhausted. + * + * @author jonmv + */ +public class ApplicationPackageStream { + + private final byte[] inBuffer = new byte[1 << 16]; + private final ByteArrayOutputStream out = new ByteArrayOutputStream(1 << 16); + private final ZipOutputStream outZip = new ZipOutputStream(out); + private final ByteArrayOutputStream teeOut = new ByteArrayOutputStream(1 << 16); + private final ZipOutputStream teeZip = new ZipOutputStream(teeOut); + private final Replacer replacer; + private final Predicate filter; + private final Supplier in; + + private ApplicationPackage ap = null; + private boolean done = false; + + public static Replacer addingCertificate(Optional certificate) { + return certificate.map(cert -> Replacer.of(Map.of(ApplicationPackage.trustedCertificatesFile, + trustBytes -> append(trustBytes, cert)))) + .orElse(Replacer.of(Map.of())); + } + + static InputStream append(InputStream trustIn, X509Certificate cert) { + try { + List trusted = trustIn == null ? new ArrayList<>() + : new ArrayList<>(certificateListFromPem(new String(trustIn.readAllBytes(), UTF_8))); + trusted.add(cert); + return new ByteArrayInputStream(X509CertificateUtils.toPem(trusted).getBytes(UTF_8)); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + /** Stream that effectively copies the input stream to its {@link #truncatedPackage()} when exhausted. */ + public ApplicationPackageStream(Supplier in) { + this(in, __ -> true, Map.of()); + } + + /** Stream that replaces the indicated entries, and copies all metadata files to its {@link #truncatedPackage()} when exhausted. */ + public ApplicationPackageStream(Supplier in, Replacer replacer) { + this(in, name -> ApplicationPackage.prePopulated.contains(name) || name.endsWith(".xml"), replacer); + } + + /** Stream that replaces the indicated entries, and copies the filtered entries to its {@link #truncatedPackage()} when exhausted. */ + public ApplicationPackageStream(Supplier in, Predicate truncation, Map> replacements) { + this(in, truncation, Replacer.of(replacements)); + } + + /** Stream that uses the given replacer to modify content, and copies the filtered entries to its {@link #truncatedPackage()} when exhausted. */ + public ApplicationPackageStream(Supplier in, Predicate truncation, Replacer replacer) { + this.in = in; + this.filter = truncation; + this.replacer = replacer; + } + + public InputStream zipStream() { + return new Stream(); + } + + /** Returns the application backed by only the files indicated by the truncation filter. Throws if not yet exhausted. */ + public ApplicationPackage truncatedPackage() { + if (ap == null) throw new IllegalStateException("must completely exhaust input before reading package"); + return ap; + } + + private class Stream extends InputStream { + + private final ZipInputStream inZip = new ZipInputStream(in.get()); + private byte[] currentOut = new byte[0]; + private InputStream currentIn = InputStream.nullInputStream(); + private boolean includeCurrent = false; + private int pos = 0; + private boolean closed = false; + + private void fill() throws IOException { + if (done) return; + while (out.size() == 0) { + // Exhaust current entry first. + int i, n = out.size(); + while (out.size() == 0 && (i = currentIn.read(inBuffer)) != -1) { + if (includeCurrent) teeZip.write(inBuffer, 0, i); + outZip.write(inBuffer, 0, i); + n += i; + } + + // Current entry exhausted, look for next. + if (n == 0) { + next(); + if (done) break; + } + } + + currentOut = out.toByteArray(); + out.reset(); + pos = 0; + } + + private void next() throws IOException { + if (includeCurrent) teeZip.closeEntry(); + outZip.closeEntry(); + + ZipEntry next = inZip.getNextEntry(); + String name; + InputStream content = null; + if (next == null) { + // We may still have replacements to fill in, but if we don't, we're done filling, forever! + name = replacer.next(); + if (name == null) { + outZip.close(); // This typically makes new output available, so must check for that after this. + teeZip.close(); + currentIn = nullInputStream(); + ap = new ApplicationPackage(teeOut.toByteArray()); + done = true; + return; + } + } + else { + name = next.getName(); + content = new FilterInputStream(inZip) { @Override public void close() { } }; // Protect inZip from replacements closing it. + } + + includeCurrent = filter.test(name); + currentIn = replacer.modify(name, content); + if (currentIn == null) { + currentIn = InputStream.nullInputStream(); + } + else { + if (includeCurrent) teeZip.putNextEntry(new ZipEntry(name)); + outZip.putNextEntry(new ZipEntry(name)); + } + } + + @Override + public int read() throws IOException { + if (closed) throw new IOException("stream closed"); + if (pos == currentOut.length) { + fill(); + if (pos == currentOut.length) return -1; + } + return 0xff & inBuffer[pos++]; + } + + @Override + public int read(byte[] out, int off, int len) throws IOException { + if (closed) throw new IOException("stream closed"); + if ((off | len | (off + len) | (out.length - (off + len))) < 0) throw new IndexOutOfBoundsException(); + if (pos == currentOut.length) { + fill(); + if (pos == currentOut.length) return -1; + } + int n = min(currentOut.length - pos, len); + System.arraycopy(currentOut, pos, out, off, n); + pos += n; + return n; + } + + @Override + public int available() throws IOException { + return pos == currentOut.length && done ? 0 : 1; + } + + @Override + public void close() { + if ( ! closed) try { + transferTo(nullOutputStream()); + inZip.close(); + closed = true; + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + } + + /** Replaces entries in a zip stream as they are encountered, then appends remaining entries at the end. */ + public interface Replacer { + + /** Called when the entries of the original zip stream are exhausted. Return remaining names, or {@code null} when none left. */ + String next(); + + /** Modify content for a given name; return {@code null} for removal; in is {@code null} for entries not present in the input. */ + InputStream modify(String name, InputStream in); + + /** + * Wraps a map of fixed replacements, and: + *
    + *
  • Removes entries whose value is {@code null}.
  • + *
  • Modifies entries present in both input and the map.
  • + *
  • Appends entries present exclusively in the map.
  • + *
  • Writes all other entries as they are.
  • + *
+ */ + static Replacer of(Map> replacements) { + return new Replacer() { + final Map> remaining = new HashMap<>(replacements); + @Override public String next() { + return remaining.isEmpty() ? null : remaining.keySet().iterator().next(); + } + @Override public InputStream modify(String name, InputStream in) { + UnaryOperator mapper = remaining.remove(name); + return mapper == null ? in : mapper.apply(in); + } + }; + } + + } + +} diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackage.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackage.java index 5b20c57fcca..f838e44bd02 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackage.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackage.java @@ -18,15 +18,14 @@ import com.yahoo.text.Text; import com.yahoo.vespa.hosted.controller.api.integration.deployment.RunId; import com.yahoo.vespa.hosted.controller.api.integration.deployment.TesterCloud.Suite; import com.yahoo.vespa.hosted.controller.api.integration.deployment.TesterId; +import com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackageStream.Replacer; import com.yahoo.vespa.hosted.controller.config.ControllerConfig; import com.yahoo.vespa.hosted.controller.config.ControllerConfig.Steprunner.Testerapp; import com.yahoo.yolean.Exceptions; import javax.security.auth.x500.X500Principal; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.UncheckedIOException; +import java.io.InputStream; import java.math.BigInteger; import java.security.KeyPair; import java.security.cert.X509Certificate; @@ -43,6 +42,8 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.function.Supplier; +import java.util.function.UnaryOperator; import java.util.jar.JarInputStream; import java.util.jar.Manifest; import java.util.regex.Pattern; @@ -53,8 +54,9 @@ import static com.yahoo.vespa.hosted.controller.api.integration.deployment.Teste import static com.yahoo.vespa.hosted.controller.api.integration.deployment.TesterCloud.Suite.system; import static com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackage.deploymentFile; import static com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackage.servicesFile; -import static com.yahoo.vespa.hosted.controller.application.pkg.ZipEntries.transferAndWrite; +import static java.io.InputStream.nullInputStream; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.function.UnaryOperator.identity; import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.mapping; import static java.util.stream.Collectors.toList; @@ -71,32 +73,14 @@ public class TestPackage { static final NodeResources DEFAULT_TESTER_RESOURCES_AWS = new NodeResources(2, 8, 50, 0.3, NodeResources.DiskSpeed.any); static final NodeResources DEFAULT_TESTER_RESOURCES = new NodeResources(1, 4, 50, 0.3, NodeResources.DiskSpeed.any); - private final ApplicationPackage applicationPackage; + private final ApplicationPackageStream applicationPackageStream; private final X509Certificate certificate; - public TestPackage(byte[] testPackage, boolean isPublicSystem, RunId id, Testerapp testerApp, + public TestPackage(Supplier inZip, boolean isPublicSystem, RunId id, Testerapp testerApp, DeploymentSpec spec, Instant certificateValidFrom, Duration certificateValidDuration) { - - // Copy contents of submitted application-test.zip, and ensure required directories exist within the zip. - Map entries = new HashMap<>(); - entries.put("artifacts/.ignore-" + UUID.randomUUID(), new byte[0]); - entries.put("tests/.ignore-" + UUID.randomUUID(), new byte[0]); - - entries.put(servicesFile, - servicesXml(! isPublicSystem, - certificateValidFrom != null, - hasLegacyTests(testPackage), - testerResourcesFor(id.type().zone(), spec.requireInstance(id.application().instance())), - testerApp)); - - entries.put(deploymentFile, - deploymentXml(id.tester(), - spec.athenzDomain(), - spec.requireInstance(id.application().instance()) - .athenzService(id.type().zone().environment(), id.type().zone().region()))); - + KeyPair keyPair; if (certificateValidFrom != null) { - KeyPair keyPair = KeyUtils.generateKeypair(KeyAlgorithm.RSA, 2048); + keyPair = KeyUtils.generateKeypair(KeyAlgorithm.RSA, 2048); X500Principal subject = new X500Principal("CN=" + id.tester().id().toFullString() + "." + id.type() + "." + id.number()); this.certificate = X509CertificateBuilder.fromKeypair(keyPair, subject, @@ -105,26 +89,60 @@ public class TestPackage { SignatureAlgorithm.SHA512_WITH_RSA, BigInteger.valueOf(1)) .build(); - entries.put("artifacts/key", KeyUtils.toPem(keyPair.getPrivate()).getBytes(UTF_8)); - entries.put("artifacts/cert", X509CertificateUtils.toPem(certificate).getBytes(UTF_8)); } else { + keyPair = null; this.certificate = null; } + this.applicationPackageStream = new ApplicationPackageStream(inZip, __ -> false, new Replacer() { + + // Initially skips all declared entries, ensuring they're generated and appended after all input entries. + final Map> entries = new HashMap<>(); + final Map> replacements = new HashMap<>(); + boolean hasLegacyTests = false; + + @Override + public String next() { + if (entries.isEmpty()) return null; + String next = entries.keySet().iterator().next(); + replacements.put(next, entries.remove(next)); + return next; + } - ByteArrayOutputStream buffer = new ByteArrayOutputStream(testPackage.length + 10_000); - transferAndWrite(buffer, new ByteArrayInputStream(testPackage), entries); - this.applicationPackage = new ApplicationPackage(buffer.toByteArray()); - } - - static boolean hasLegacyTests(byte[] testPackage) { - return ZipEntries.from(testPackage, __ -> true, 0, false).asList().stream() - .anyMatch(file -> file.name().startsWith("artifacts/") && file.name().endsWith("-tests.jar")); + @Override + public InputStream modify(String name, InputStream in) { + hasLegacyTests |= name.startsWith("artifacts/") && name.endsWith("-tests.jar"); + return entries.containsKey(name) ? null : replacements.getOrDefault(name, identity()).apply(in); + } + { + // Copy contents of submitted application-test.zip, and ensure required directories exist within the zip. + entries.put("artifacts/.ignore-" + UUID.randomUUID(), __ -> nullInputStream()); + entries.put("tests/.ignore-" + UUID.randomUUID(), __ -> nullInputStream()); + + entries.put(servicesFile, + __ -> new ByteArrayInputStream(servicesXml( ! isPublicSystem, + certificateValidFrom != null, + hasLegacyTests, + testerResourcesFor(id.type().zone(), spec.requireInstance(id.application().instance())), + testerApp))); + + entries.put(deploymentFile, + __ -> new ByteArrayInputStream(deploymentXml(id.tester(), + spec.athenzDomain(), + spec.requireInstance(id.application().instance()) + .athenzService(id.type().zone().environment(), id.type().zone().region())))); + + if (certificate != null) { + entries.put("artifacts/key", __ -> new ByteArrayInputStream(KeyUtils.toPem(keyPair.getPrivate()).getBytes(UTF_8))); + entries.put("artifacts/cert", __ -> new ByteArrayInputStream(X509CertificateUtils.toPem(certificate).getBytes(UTF_8))); + } + } + }); } - public ApplicationPackage asApplicationPackage() { - return applicationPackage; + public ApplicationPackageStream asApplicationPackage() { + return applicationPackageStream; } public X509Certificate certificate() { @@ -207,7 +225,7 @@ public class TestPackage { return new TestSummary(problems, suites); } - public static NodeResources testerResourcesFor(ZoneId zone, DeploymentInstanceSpec spec) { + static NodeResources testerResourcesFor(ZoneId zone, DeploymentInstanceSpec spec) { NodeResources nodeResources = spec.steps().stream() .filter(step -> step.concerns(zone.environment())) .findFirst() @@ -219,7 +237,7 @@ public class TestPackage { } /** Returns the generated services.xml content for the tester application. */ - public static byte[] servicesXml(boolean systemUsesAthenz, boolean useTesterCertificate, boolean hasLegacyTests, + static byte[] servicesXml(boolean systemUsesAthenz, boolean useTesterCertificate, boolean hasLegacyTests, NodeResources resources, ControllerConfig.Steprunner.Testerapp config) { int jdiscMemoryGb = 2; // 2Gb memory for tester application which uses Maven. int jdiscMemoryPct = (int) Math.ceil(100 * jdiscMemoryGb / resources.memoryGb()); @@ -279,7 +297,7 @@ public class TestPackage { } /** Returns a dummy deployment xml which sets up the service identity for the tester, if present. */ - public static byte[] deploymentXml(TesterId id, Optional athenzDomain, Optional athenzService) { + static byte[] deploymentXml(TesterId id, Optional athenzDomain, Optional athenzService) { String deploymentSpec = "\n" + " new NoSuchElementException("'" + name + "' has no content")); } public Optional content() { return content; } public long size() { return size; } } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunner.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunner.java index e8c92d3e3f6..309cb1343f9 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunner.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunner.java @@ -34,6 +34,7 @@ import com.yahoo.vespa.hosted.controller.application.Deployment; import com.yahoo.vespa.hosted.controller.application.Endpoint; import com.yahoo.vespa.hosted.controller.application.TenantAndApplicationId; import com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackage; +import com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackageStream; import com.yahoo.vespa.hosted.controller.application.pkg.TestPackage; import com.yahoo.vespa.hosted.controller.maintenance.JobRunner; import com.yahoo.vespa.hosted.controller.notification.Notification; @@ -43,6 +44,7 @@ import com.yahoo.vespa.hosted.controller.routing.context.DeploymentRoutingContex import com.yahoo.yolean.Exceptions; import java.io.ByteArrayOutputStream; +import java.io.InputStream; import java.io.PrintStream; import java.io.UncheckedIOException; import java.security.cert.CertificateExpiredException; @@ -926,14 +928,13 @@ public class InternalStepRunner implements StepRunner { } /** Returns the application package for the tester application, assembled from a generated config, fat-jar and services.xml. */ - private ApplicationPackage testerPackage(RunId id) { + private ApplicationPackageStream testerPackage(RunId id) { RevisionId revision = controller.jobController().run(id).versions().targetRevision(); DeploymentSpec spec = controller.applications().requireApplication(TenantAndApplicationId.from(id.application())).deploymentSpec(); - byte[] testZip = controller.applications().applicationStore().getTester(id.application().tenant(), - id.application().application(), revision); boolean useTesterCertificate = useTesterCertificate(id); - TestPackage testPackage = new TestPackage(testZip, + TestPackage testPackage = new TestPackage(() -> controller.applications().applicationStore().streamTester(id.application().tenant(), + id.application().application(), revision), controller.system().isPublic(), id, controller.controllerConfig().steprunner().testerapp(), diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java index 08cf8d2e1c4..f94bd51fe4c 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java @@ -207,32 +207,36 @@ public class JobController { return run; List log; - Instant deployedAt; + Optional deployedAt; Instant from; if ( ! run.id().type().isProduction()) { - deployedAt = run.stepInfo(installInitialReal).or(() -> run.stepInfo(installReal)).flatMap(StepInfo::startTime).orElseThrow(); - from = run.lastVespaLogTimestamp().isAfter(run.start()) ? run.lastVespaLogTimestamp() : deployedAt.minusSeconds(10); - log = LogEntry.parseVespaLog(controller.serviceRegistry().configServer() - .getLogs(new DeploymentId(id.application(), zone), - Map.of("from", Long.toString(from.toEpochMilli()))), - from); + deployedAt = run.stepInfo(installInitialReal).or(() -> run.stepInfo(installReal)).flatMap(StepInfo::startTime); + if (deployedAt.isPresent()) { + from = run.lastVespaLogTimestamp().isAfter(run.start()) ? run.lastVespaLogTimestamp() : deployedAt.get().minusSeconds(10); + log = LogEntry.parseVespaLog(controller.serviceRegistry().configServer() + .getLogs(new DeploymentId(id.application(), zone), + Map.of("from", Long.toString(from.toEpochMilli()))), + from); + } + else log = List.of(); } - else - log = List.of(); + else log = List.of(); if (id.type().isTest()) { - deployedAt = run.stepInfo(installTester).flatMap(StepInfo::startTime).orElseThrow(); - from = run.lastVespaLogTimestamp().isAfter(run.start()) ? run.lastVespaLogTimestamp() : deployedAt.minusSeconds(10); - List testerLog = LogEntry.parseVespaLog(controller.serviceRegistry().configServer() - .getLogs(new DeploymentId(id.tester().id(), zone), - Map.of("from", Long.toString(from.toEpochMilli()))), - from); - - Instant justNow = controller.clock().instant().minusSeconds(2); - log = Stream.concat(log.stream(), testerLog.stream()) - .filter(entry -> entry.at().isBefore(justNow)) - .sorted(comparing(LogEntry::at)) - .collect(toUnmodifiableList()); + deployedAt = run.stepInfo(installTester).flatMap(StepInfo::startTime); + if (deployedAt.isPresent()) { + from = run.lastVespaLogTimestamp().isAfter(run.start()) ? run.lastVespaLogTimestamp() : deployedAt.get().minusSeconds(10); + List testerLog = LogEntry.parseVespaLog(controller.serviceRegistry().configServer() + .getLogs(new DeploymentId(id.tester().id(), zone), + Map.of("from", Long.toString(from.toEpochMilli()))), + from); + + Instant justNow = controller.clock().instant().minusSeconds(2); + log = Stream.concat(log.stream(), testerLog.stream()) + .filter(entry -> entry.at().isBefore(justNow)) + .sorted(comparing(LogEntry::at)) + .toList(); + } } if (log.isEmpty()) return run; diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageTest.java index 8cf861ff963..71ce291df36 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageTest.java @@ -3,18 +3,40 @@ package com.yahoo.vespa.hosted.controller.application.pkg; import com.yahoo.config.application.api.DeploymentSpec; import com.yahoo.config.application.api.ValidationId; +import com.yahoo.io.LazyInputStream; +import com.yahoo.security.KeyAlgorithm; +import com.yahoo.security.KeyUtils; +import com.yahoo.security.SignatureAlgorithm; +import com.yahoo.security.X509CertificateBuilder; import org.junit.jupiter.api.Test; +import javax.security.auth.x500.X500Principal; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.SequenceInputStream; +import java.math.BigInteger; import java.nio.file.Files; import java.nio.file.Path; +import java.security.KeyPair; +import java.security.cert.X509Certificate; import java.time.Instant; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.function.Predicate; +import java.util.function.UnaryOperator; import java.util.stream.Collectors; +import java.util.stream.IntStream; +import static com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackage.filesZip; +import static com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackageStream.addingCertificate; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -24,35 +46,41 @@ import static org.junit.jupiter.api.Assertions.fail; */ public class ApplicationPackageTest { - static final String deploymentXml = "\n" + - "\n" + - " \n" + - " \n" + - " \n" + - " us-central-1\n" + - " \n" + - " \n" + - "\n"; - - static final String servicesXml = "\n" + - " \n" + - " \n" + - " \n" + - " \n" + - " \n" + - " \n" + - "\n"; + static final String deploymentXml = """ + + + + + + us-central-1 + + + + """; + + static final String servicesXml = """ + + + + + + + + + """; private static final String jdiscXml = "\n"; - private static final String contentXml = "\n" + - " \n" + - "\n" + - ""; + private static final String contentXml = """ + + + + """; - private static final String nodesXml = "\n" + - " \n" + - ""; + private static final String nodesXml = """ + + + """; @Test void test_createEmptyForDeploymentRemoval() { @@ -67,22 +95,22 @@ public class ApplicationPackageTest { @Test void testMetaData() { - byte[] zip = ApplicationPackage.filesZip(Map.of("services.xml", servicesXml.getBytes(UTF_8), - "jdisc.xml", jdiscXml.getBytes(UTF_8), - "content/content.xml", contentXml.getBytes(UTF_8), - "content/nodes.xml", nodesXml.getBytes(UTF_8), - "gurba", "gurba".getBytes(UTF_8))); + byte[] zip = filesZip(Map.of("services.xml", servicesXml.getBytes(UTF_8), + "jdisc.xml", jdiscXml.getBytes(UTF_8), + "content/content.xml", contentXml.getBytes(UTF_8), + "content/nodes.xml", nodesXml.getBytes(UTF_8), + "gurba", "gurba".getBytes(UTF_8))); assertEquals(Map.of("services.xml", servicesXml, - "jdisc.xml", jdiscXml, - "content/content.xml", contentXml, - "content/nodes.xml", nodesXml), - unzip(new ApplicationPackage(zip, false).metaDataZip())); + "jdisc.xml", jdiscXml, + "content/content.xml", contentXml, + "content/nodes.xml", nodesXml), + unzip(new ApplicationPackage(zip, false).metaDataZip())); } @Test void testMetaDataWithMissingFiles() { - byte[] zip = ApplicationPackage.filesZip(Map.of("services.xml", servicesXml.getBytes(UTF_8))); + byte[] zip = filesZip(Map.of("services.xml", servicesXml.getBytes(UTF_8))); try { new ApplicationPackage(zip, false).metaDataZip(); @@ -132,15 +160,108 @@ public class ApplicationPackageTest { assertEquals(originalPackage.bundleHash(), similarDeploymentXml.bundleHash()); } - private static Map unzip(byte[] zip) { - return ZipEntries.from(zip, __ -> true, 1 << 10, true) + static Map unzip(byte[] zip) { + return ZipEntries.from(zip, __ -> true, 1 << 24, true) .asList().stream() .collect(Collectors.toMap(ZipEntries.ZipEntryWithContent::name, - entry -> new String(entry.contentOrThrow(), UTF_8))); + entry -> new String(entry.content().orElse(new byte[0]), UTF_8))); } - private ApplicationPackage getApplicationZip(String path) throws Exception { + private ApplicationPackage getApplicationZip(String path) throws IOException { return new ApplicationPackage(Files.readAllBytes(Path.of("src/test/resources/application-packages/" + path)), true); } + @Test + void test_replacement() throws IOException { + byte[] zip = zip(Map.of()); + List certificates = IntStream.range(0, 3) + .mapToObj(i -> { + KeyPair keyPair = KeyUtils.generateKeypair(KeyAlgorithm.EC, 256); + X500Principal subject = new X500Principal("CN=subject" + i); + return X509CertificateBuilder.fromKeypair(keyPair, + subject, + Instant.now(), + Instant.now().plusSeconds(1), + SignatureAlgorithm.SHA512_WITH_ECDSA, + BigInteger.valueOf(1)) + .build(); + }).toList(); + + assertEquals(List.of(), new ApplicationPackage(zip).trustedCertificates()); + for (int i = 0; i < certificates.size(); i++) { + InputStream in = new ByteArrayInputStream(zip); + zip = new ApplicationPackageStream(() -> in, __ -> false, addingCertificate(Optional.of(certificates.get(i)))).zipStream().readAllBytes(); + assertEquals(certificates.subList(0, i + 1), new ApplicationPackage(zip).trustedCertificates()); + } + } + + static byte[] zip(Map content) { + return filesZip(content.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(), + entry -> entry.getValue().getBytes(UTF_8)))); + } + + @Test + void testApplicationPackageStream() throws Exception { + Map content = Map.of("deployment.xml", deploymentXml, + "services.xml", servicesXml, + "jdisc.xml", jdiscXml, + "unused1.xml", jdiscXml, + "content/content.xml", contentXml, + "content/nodes.xml", nodesXml, + "gurba", "gurba"); + byte[] zip = zip(content); + assertEquals(content, unzip(zip)); + + ApplicationPackageStream identity = new ApplicationPackageStream(() -> new ByteArrayInputStream(zip)); + InputStream lazy = new LazyInputStream(() -> new ByteArrayInputStream(identity.truncatedPackage().zippedContent())); + assertEquals("must completely exhaust input before reading package", + assertThrows(IllegalStateException.class, identity::truncatedPackage).getMessage()); + + // Verify no content has changed when passing through the stream. + ByteArrayOutputStream out = new ByteArrayOutputStream(); + identity.zipStream().transferTo(out); + assertEquals(content, unzip(out.toByteArray())); + assertEquals(content, unzip(identity.truncatedPackage().zippedContent())); + assertEquals(content, unzip(lazy.readAllBytes())); + ApplicationPackage original = new ApplicationPackage(zip); + assertEquals(unzip(original.metaDataZip()), unzip(identity.truncatedPackage().metaDataZip())); + assertEquals(original.bundleHash(), identity.truncatedPackage().bundleHash()); + + // Change deployment.xml, remove unused1.xml and add unused2.xml + Map> replacements = Map.of("deployment.xml", in -> new SequenceInputStream(in, new ByteArrayInputStream("\n\n".getBytes(UTF_8))), + "unused1.xml", in -> null, + "unused2.xml", __ -> new ByteArrayInputStream(jdiscXml.getBytes(UTF_8))); + Predicate truncation = name -> name.endsWith(".xml"); + ApplicationPackageStream modifier = new ApplicationPackageStream(() -> new ByteArrayInputStream(zip), truncation, replacements); + out.reset(); + modifier.zipStream().transferTo(out); + + assertEquals(Map.of("deployment.xml", deploymentXml + "\n\n", + "services.xml", servicesXml, + "jdisc.xml", jdiscXml, + "unused2.xml", jdiscXml, + "content/content.xml", contentXml, + "content/nodes.xml", nodesXml, + "gurba", "gurba"), + unzip(out.toByteArray())); + + assertEquals(Map.of("deployment.xml", deploymentXml + "\n\n", + "services.xml", servicesXml, + "jdisc.xml", jdiscXml, + "unused2.xml", jdiscXml, + "content/content.xml", contentXml, + "content/nodes.xml", nodesXml), + unzip(modifier.truncatedPackage().zippedContent())); + + // Compare retained metadata for an updated original package, and the truncated package of the modifier. + assertEquals(unzip(new ApplicationPackage(zip(Map.of("deployment.xml", deploymentXml + "\n\n", // Expected to change. + "services.xml", servicesXml, + "jdisc.xml", jdiscXml, + "unused1.xml", jdiscXml, // Irrelevant. + "content/content.xml", contentXml, + "content/nodes.xml", nodesXml, + "gurba", "gurba"))).metaDataZip()), + unzip(modifier.truncatedPackage().metaDataZip())); + } + } diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackageTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackageTest.java index bff0ccc8ae1..6da8db1c259 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackageTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackageTest.java @@ -1,18 +1,24 @@ package com.yahoo.vespa.hosted.controller.application.pkg; import com.yahoo.config.application.api.DeploymentSpec; +import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.NodeResources; import com.yahoo.config.provision.zone.ZoneId; +import com.yahoo.vespa.hosted.controller.api.integration.deployment.JobType; +import com.yahoo.vespa.hosted.controller.api.integration.deployment.RunId; import com.yahoo.vespa.hosted.controller.application.pkg.TestPackage.TestSummary; import com.yahoo.vespa.hosted.controller.config.ControllerConfig; +import com.yahoo.vespa.hosted.controller.config.ControllerConfig.Steprunner.Testerapp; import org.junit.jupiter.api.Test; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.jar.JarOutputStream; import java.util.zip.ZipEntry; @@ -20,9 +26,11 @@ import static com.yahoo.vespa.hosted.controller.api.integration.deployment.Teste import static com.yahoo.vespa.hosted.controller.api.integration.deployment.TesterCloud.Suite.staging; import static com.yahoo.vespa.hosted.controller.api.integration.deployment.TesterCloud.Suite.staging_setup; import static com.yahoo.vespa.hosted.controller.api.integration.deployment.TesterCloud.Suite.system; +import static com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackageTest.unzip; import static com.yahoo.vespa.hosted.controller.application.pkg.TestPackage.validateTests; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * @author jonmv @@ -77,15 +85,15 @@ public class TestPackageTest { @Test void testBundleValidation() throws IOException { byte[] testZip = ApplicationPackage.filesZip(Map.of("components/foo-tests.jar", testsJar("SystemTest", "StagingSetup", "ProductionTest"), - "artifacts/key", new byte[0])); + "artifacts/key", new byte[0])); TestSummary summary = validateTests(List.of(system), testZip); assertEquals(List.of(system, staging_setup, production), summary.suites()); assertEquals(List.of("test package contains 'artifacts/key'; this conflicts with credentials used to run tests in Vespa Cloud", - "test package has staging setup, so it should also include staging tests", - "test package has production tests, but no production tests are declared in deployment.xml", - "see https://docs.vespa.ai/en/testing.html for details on how to write system tests for Vespa"), - summary.problems()); + "test package has staging setup, so it should also include staging tests", + "test package has production tests, but no production tests are declared in deployment.xml", + "see https://docs.vespa.ai/en/testing.html for details on how to write system tests for Vespa"), + summary.problems()); } @Test @@ -95,20 +103,47 @@ public class TestPackageTest { assertEquals(List.of(staging, production), summary.suites()); assertEquals(List.of("test package has staging tests, so it should also include staging setup", - "see https://docs.vespa.ai/en/testing.html for details on how to write system tests for Vespa"), - summary.problems()); + "see https://docs.vespa.ai/en/testing.html for details on how to write system tests for Vespa"), + summary.problems()); } @Test void testBasicTestsValidation() { byte[] testZip = ApplicationPackage.filesZip(Map.of("tests/staging-test/foo.json", new byte[0], - "tests/staging-setup/foo.json", new byte[0])); + "tests/staging-setup/foo.json", new byte[0])); TestSummary summary = validateTests(List.of(system, production), testZip); assertEquals(List.of(staging_setup, staging), summary.suites()); assertEquals(List.of("test package has no system tests, but is declared in deployment.xml", - "test package has no production tests, but production tests are declared in deployment.xml", - "see https://docs.vespa.ai/en/testing.html for details on how to write system tests for Vespa"), - summary.problems()); + "test package has no production tests, but production tests are declared in deployment.xml", + "see https://docs.vespa.ai/en/testing.html for details on how to write system tests for Vespa"), + summary.problems()); + } + + @Test + void testTestPacakgeAssembly() throws IOException { + byte[] bundleZip = ApplicationPackage.filesZip(Map.of("components/foo-tests.jar", testsJar("SystemTest", "ProductionTest"), + "artifacts/key", new byte[0])); + TestPackage bundleTests = new TestPackage(() -> new ByteArrayInputStream(bundleZip), + false, + new RunId(ApplicationId.defaultId(), JobType.dev("abc"), 123), + new Testerapp.Builder().tenantCdBundle("foo").runtimeProviderClass("bar").build(), + DeploymentSpec.fromXml(""" + + + + """), + null, + null); + + Map bundlePackage = unzip(bundleTests.asApplicationPackage().zipStream().readAllBytes()); + bundlePackage.keySet().removeIf(name -> name.startsWith("tests/.ignore") || name.startsWith("artifacts/.ignore")); + assertEquals(Set.of("deployment.xml", + "services.xml", + "components/foo-tests.jar", + "artifacts/key"), + bundlePackage.keySet()); + assertEquals(Map.of(), + unzip(bundleTests.asApplicationPackage().truncatedPackage().zippedContent())); } @Test diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ZipEntriesTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ZipEntriesTest.java deleted file mode 100644 index 37062e1002b..00000000000 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ZipEntriesTest.java +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hosted.controller.application.pkg; - -import com.yahoo.security.KeyAlgorithm; -import com.yahoo.security.KeyUtils; -import com.yahoo.security.SignatureAlgorithm; -import com.yahoo.security.X509CertificateBuilder; -import org.junit.jupiter.api.Test; - -import javax.security.auth.x500.X500Principal; -import java.math.BigInteger; -import java.security.KeyPair; -import java.security.cert.X509Certificate; -import java.time.Instant; -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -/** - * @author mpolden - */ -public class ZipEntriesTest { - - @Test - void test_replacement() { - ApplicationPackage applicationPackage = new ApplicationPackage(new byte[0]); - List certificates = IntStream.range(0, 3) - .mapToObj(i -> { - KeyPair keyPair = KeyUtils.generateKeypair(KeyAlgorithm.EC, 256); - X500Principal subject = new X500Principal("CN=subject" + i); - return X509CertificateBuilder.fromKeypair(keyPair, - subject, - Instant.now(), - Instant.now().plusSeconds(1), - SignatureAlgorithm.SHA512_WITH_ECDSA, - BigInteger.valueOf(1)) - .build(); - }) - .collect(Collectors.toUnmodifiableList()); - - assertEquals(List.of(), applicationPackage.trustedCertificates()); - for (int i = 0; i < certificates.size(); i++) { - applicationPackage = applicationPackage.withTrustedCertificate(certificates.get(i)); - assertEquals(certificates.subList(0, i + 1), applicationPackage.trustedCertificates()); - } - } - -} diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ApplicationStoreMock.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ApplicationStoreMock.java index 8ed38761c95..e025a3bea4f 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ApplicationStoreMock.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ApplicationStoreMock.java @@ -12,6 +12,8 @@ import com.yahoo.vespa.hosted.controller.api.integration.deployment.RevisionId; import com.yahoo.vespa.hosted.controller.api.integration.deployment.TesterId; import com.yahoo.vespa.hosted.controller.application.TenantAndApplicationId; +import java.io.ByteArrayInputStream; +import java.io.InputStream; import java.time.Instant; import java.util.Map; import java.util.NavigableMap; @@ -46,15 +48,14 @@ public class ApplicationStoreMock implements ApplicationStore { } @Override - public byte[] get(DeploymentId deploymentId, RevisionId revisionId) { + public InputStream stream(DeploymentId deploymentId, RevisionId revisionId) { if ( ! revisionId.isProduction()) - return requireNonNull(devStore.get(deploymentId)); + return new ByteArrayInputStream(devStore.get(deploymentId)); TenantAndApplicationId tenantAndApplicationId = TenantAndApplicationId.from(deploymentId.applicationId()); byte[] bytes = store.get(appId(tenantAndApplicationId.tenant(), tenantAndApplicationId.application())).get(revisionId); - if (bytes == null) - throw new NotExistsException("No " + revisionId + " found for " + tenantAndApplicationId); - return bytes; + if (bytes == null) throw new NotExistsException("No " + revisionId + " found for " + tenantAndApplicationId); + return new ByteArrayInputStream(bytes); } @Override @@ -96,8 +97,8 @@ public class ApplicationStoreMock implements ApplicationStore { } @Override - public byte[] getTester(TenantName tenant, ApplicationName application, RevisionId revision) { - return requireNonNull(store.get(testerId(tenant, application)).get(revision)); + public InputStream streamTester(TenantName tenant, ApplicationName application, RevisionId revision) { + return new ByteArrayInputStream(store.get(testerId(tenant, application)).get(revision)); } diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ConfigServerMock.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ConfigServerMock.java index 07d9efdf8fc..eaa178c9727 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ConfigServerMock.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ConfigServerMock.java @@ -42,9 +42,12 @@ import com.yahoo.vespa.hosted.controller.api.integration.noderepository.RestartF import com.yahoo.vespa.hosted.controller.api.integration.secrets.TenantSecretStore; import com.yahoo.vespa.hosted.controller.application.SystemApplication; import com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackage; +import wiremock.org.checkerframework.checker.units.qual.A; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.InputStream; +import java.io.UncheckedIOException; import java.net.URI; import java.time.Duration; import java.time.Instant; @@ -376,6 +379,13 @@ public class ConfigServerMock extends AbstractComponent implements ConfigServer @Override public PreparedApplication deploy(DeploymentData deployment) { + ApplicationPackage appPackage; + try (InputStream in = deployment.applicationPackage()) { + appPackage = new ApplicationPackage(in.readAllBytes()); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } lastPrepareVersion = deployment.platform(); if (prepareException != null) prepareException.accept(ApplicationId.from(deployment.instance().tenant(), @@ -383,8 +393,9 @@ public class ConfigServerMock extends AbstractComponent implements ConfigServer deployment.instance().instance())); DeploymentId id = new DeploymentId(deployment.instance(), deployment.zone()); - applications.put(id, new Application(id.applicationId(), lastPrepareVersion, new ApplicationPackage(deployment.applicationPackage()))); + applications.put(id, new Application(id.applicationId(), lastPrepareVersion, appPackage)); ClusterSpec.Id cluster = ClusterSpec.Id.from("default"); + deployment.endpointCertificateMetadata(); // Supplier with side effects >_< if (nodeRepository().list(id.zoneId(), NodeFilter.all().applications(id.applicationId())).isEmpty()) provision(id.zoneId(), id.applicationId(), cluster); diff --git a/hosted-api/src/main/java/ai/vespa/hosted/api/MultiPartStreamer.java b/hosted-api/src/main/java/ai/vespa/hosted/api/MultiPartStreamer.java index f1cbc027e17..c47fc60e58b 100644 --- a/hosted-api/src/main/java/ai/vespa/hosted/api/MultiPartStreamer.java +++ b/hosted-api/src/main/java/ai/vespa/hosted/api/MultiPartStreamer.java @@ -15,6 +15,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; +import java.util.Enumeration; import java.util.List; import java.util.UUID; import java.util.function.Supplier; @@ -89,10 +90,12 @@ public class MultiPartStreamer { /** Returns an input stream which is an aggregate of all current parts in this, plus an end marker. */ public InputStream data() { - InputStream aggregate = new SequenceInputStream(Collections.enumeration(Stream.concat(streams.stream().map(Supplier::get), - Stream.of(end())) - .collect(Collectors.toList()))); - + InputStream aggregate = new SequenceInputStream(new Enumeration<>() { + final int j = streams.size(); + int i = -1; + @Override public boolean hasMoreElements() { return i < j; } + @Override public InputStream nextElement() { return ++i < j ? streams.get(i).get() : end(); } + }); try { if (aggregate.skip(2) != 2)// This should never happen, as the first stream is a ByteArrayInputStream. throw new IllegalStateException("Failed skipping extraneous bytes."); diff --git a/http-client/src/main/java/ai/vespa/hosted/client/AbstractHttpClient.java b/http-client/src/main/java/ai/vespa/hosted/client/AbstractHttpClient.java index ed3fee101ed..48bbffc7e37 100644 --- a/http-client/src/main/java/ai/vespa/hosted/client/AbstractHttpClient.java +++ b/http-client/src/main/java/ai/vespa/hosted/client/AbstractHttpClient.java @@ -173,7 +173,7 @@ public abstract class AbstractHttpClient implements HttpClient { @Override public HttpClient.RequestBuilder body(byte[] json) { - return body(HttpEntities.create(json, ContentType.APPLICATION_JSON)); + return body(() -> HttpEntities.create(json, ContentType.APPLICATION_JSON)); } @Override diff --git a/http-client/src/main/java/ai/vespa/hosted/client/HttpClient.java b/http-client/src/main/java/ai/vespa/hosted/client/HttpClient.java index ea8328ed793..4da887f0cbb 100644 --- a/http-client/src/main/java/ai/vespa/hosted/client/HttpClient.java +++ b/http-client/src/main/java/ai/vespa/hosted/client/HttpClient.java @@ -77,12 +77,6 @@ public interface HttpClient extends Closeable { /** Sets the request body as UTF-8 application/json. */ RequestBuilder body(byte[] json); - /** Sets the request body. */ - default RequestBuilder body(HttpEntity entity) { - if (entity.isRepeatable()) return body(() -> entity); - throw new IllegalArgumentException("entitiy must be repeatable, or a supplier must be used"); - } - /** Sets the request body. */ RequestBuilder body(Supplier entity); diff --git a/vespajlib/abi-spec.json b/vespajlib/abi-spec.json index a22e24aafc2..8a2e68a8d8c 100644 --- a/vespajlib/abi-spec.json +++ b/vespajlib/abi-spec.json @@ -3833,6 +3833,7 @@ "public" ], "methods" : [ + "public void (java.util.function.Supplier)", "public void (java.util.function.Supplier, com.yahoo.yolean.concurrent.Memoized$Closer)", "public static com.yahoo.yolean.concurrent.Memoized of(java.util.function.Supplier)", "public static com.yahoo.yolean.concurrent.Memoized combine(com.yahoo.yolean.concurrent.Memoized, java.util.function.Function, com.yahoo.yolean.concurrent.Memoized$Closer)", diff --git a/vespajlib/src/main/java/com/yahoo/compress/ArchiveStreamReader.java b/vespajlib/src/main/java/com/yahoo/compress/ArchiveStreamReader.java index e65a645f5be..f8faf655415 100644 --- a/vespajlib/src/main/java/com/yahoo/compress/ArchiveStreamReader.java +++ b/vespajlib/src/main/java/com/yahoo/compress/ArchiveStreamReader.java @@ -136,8 +136,8 @@ public class ArchiveStreamReader implements AutoCloseable { // Commons Compress only has limited support for symlinks as they are only detected when the ZIP file is read // through org.apache.commons.compress.archivers.zip.ZipFile. This is not the case in this class, because it must // support reading ZIP files from generic input streams. The check below thus always returns false. - if (entry instanceof ZipArchiveEntry) return ((ZipArchiveEntry) entry).isUnixSymlink(); - if (entry instanceof TarArchiveEntry) return ((TarArchiveEntry) entry).isSymbolicLink(); + if (entry instanceof ZipArchiveEntry zipEntry) return zipEntry.isUnixSymlink(); + if (entry instanceof TarArchiveEntry tarEntry) return tarEntry.isSymbolicLink(); throw new IllegalArgumentException("Unsupported archive entry " + entry.getClass().getSimpleName() + ", cannot check for symbolic link"); } diff --git a/vespajlib/src/main/java/com/yahoo/io/LazyInputStream.java b/vespajlib/src/main/java/com/yahoo/io/LazyInputStream.java new file mode 100644 index 00000000000..3ff7ada6b59 --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/io/LazyInputStream.java @@ -0,0 +1,53 @@ +package com.yahoo.io; + +import java.io.IOException; +import java.io.InputStream; +import java.util.function.Supplier; + +/** + * Input stream wrapping an input stream supplier, which doesn't have content yet at declaration time. + * + * @author jonmv + */ +public class LazyInputStream extends InputStream { + + private Supplier source; + private InputStream delegate; + + public LazyInputStream(Supplier source) { + this.source = source; + } + + private InputStream in() { + if (delegate == null) { + delegate = source.get(); + source = null; + } + return delegate; + } + + @Override + public int read() throws IOException { return in().read(); } + + @Override + public int read(byte[] b, int off, int len) throws IOException { return in().read(b, off, len); } + + @Override + public long skip(long n) throws IOException { return in().skip(n); } + + @Override + public int available() throws IOException { return in().available(); } + + @Override + public void close() throws IOException { in().close(); } + + @Override + public synchronized void mark(int readlimit) { in().mark(readlimit); } + + @Override + public synchronized void reset() throws IOException { in().reset(); } + + @Override + public boolean markSupported() { return in().markSupported(); } + +} diff --git a/vespajlib/src/main/java/com/yahoo/yolean/concurrent/Memoized.java b/vespajlib/src/main/java/com/yahoo/yolean/concurrent/Memoized.java index 8e2b7b7a7eb..ba5ef7bab2d 100644 --- a/vespajlib/src/main/java/com/yahoo/yolean/concurrent/Memoized.java +++ b/vespajlib/src/main/java/com/yahoo/yolean/concurrent/Memoized.java @@ -34,15 +34,23 @@ public class Memoized implements Supplier, AutoClosea private volatile T wrapped; private Supplier factory; + /** Returns a new Memoized which has no close method. */ + public Memoized(Supplier factory) { + this(factory, __ -> { }); + } + + /** Returns a new Memoized with the given factory and closer. */ public Memoized(Supplier factory, Closer closer) { this.factory = requireNonNull(factory); this.closer = requireNonNull(closer); } + /** Returns a generic AutoCloseable Memoized with the given AutoCloseable-supplier. */ public static Memoized of(Supplier factory) { return new Memoized<>(factory, AutoCloseable::close); } + /** Composes the given memoized with a function taking its output as an argument to produce a new Memoized, with the given closer. */ public static Memoized combine(Memoized inner, Function outer, Closer closer) { return new Memoized<>(() -> outer.apply(inner.get()), compose(closer, inner::close)); } -- cgit v1.2.3