diff options
89 files changed, 1062 insertions, 4349 deletions
diff --git a/.gitignore b/.gitignore index c77fe07eee7..adc898a7266 100644 --- a/.gitignore +++ b/.gitignore @@ -40,12 +40,10 @@ Testing /build.ninja /rules.ninja *_test_app -/hadoop/dependency-reduced-pom.xml /mvnw /mvnw.cmd /mvnwDebug /mvnwDebug.cmd -/vespa-hadoop/dependency-reduced-pom.xml .preprocessed/ .DS_Store install_manifest.txt diff --git a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java index c9ec9780fad..f69bd5fa475 100644 --- a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java +++ b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java @@ -194,7 +194,7 @@ public class DeploymentSpec { public DeploymentInstanceSpec requireInstance(InstanceName name) { Optional<DeploymentInstanceSpec> instance = instance(name); if (instance.isEmpty()) - throw new IllegalArgumentException("No instance '" + name + "' in deployment.xml'. Instances: " + + throw new IllegalArgumentException("No instance '" + name + "' in deployment.xml. Instances: " + instances().stream().map(spec -> spec.name().toString()).collect(Collectors.joining(","))); return instance.get(); } diff --git a/config-proxy/src/main/sh/vespa-config-ctl.sh b/config-proxy/src/main/sh/vespa-config-ctl.sh index 63aaf11280f..be141561b07 100755 --- a/config-proxy/src/main/sh/vespa-config-ctl.sh +++ b/config-proxy/src/main/sh/vespa-config-ctl.sh @@ -122,7 +122,7 @@ case $1 in java ${jvmopts} \ -XX:+ExitOnOutOfMemoryError $(getJavaOptionsIPV46) \ -Dproxyconfigsources="${configsources}" \ - -Djava.io.tmpdir=${VESPA_HOME}/tmp \ + -Djava.io.tmpdir=${VESPA_HOME}/var/tmp \ ${userargs} \ -XX:ActiveProcessorCount=2 \ -cp $cp com.yahoo.vespa.config.proxy.ProxyServer 19090 diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java index b36e8d2d37c..d05640ef818 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java @@ -216,19 +216,6 @@ public class SessionRepository { return List.copyOf(localSessionCache.values()); } - public Set<LocalSession> getLocalSessionsFromFileSystem() { - File[] sessions = tenantFileSystemDirs.sessionsPath().listFiles(sessionApplicationsFilter); - if (sessions == null) return Set.of(); - - Set<LocalSession> sessionIds = new HashSet<>(); - for (File session : sessions) { - long sessionId = Long.parseLong(session.getName()); - LocalSession localSession = getSessionFromFile(sessionId); - sessionIds.add(localSession); - } - return sessionIds; - } - private LocalSession getSessionFromFile(long sessionId) { SessionZooKeeperClient sessionZKClient = createSessionZooKeeperClient(sessionId); File sessionDir = getAndValidateExistingSessionAppDir(sessionId); @@ -377,10 +364,7 @@ public class SessionRepository { } public int deleteExpiredRemoteSessions(Clock clock) { - Duration expiryTime = configserverConfig.hostedVespa() - ? sessionLifetime.multipliedBy(2) - : sessionLifetime.multipliedBy(12); // TODO: Remove when tested more (Oct. 2022 at the latest) - + Duration expiryTime = sessionLifetime.multipliedBy(2); List<Long> remoteSessionsFromZooKeeper = getRemoteSessionsFromZooKeeper(); log.log(Level.FINE, () -> "Remote sessions for tenant " + tenantName + ": " + remoteSessionsFromZooKeeper); diff --git a/configserver/src/main/sh/start-configserver b/configserver/src/main/sh/start-configserver index 8f515a4c309..97ccae9125f 100755 --- a/configserver/src/main/sh/start-configserver +++ b/configserver/src/main/sh/start-configserver @@ -182,7 +182,7 @@ vespa-run-as-vespa-user vespa-runserver -s ${VESPA_SERVICE_NAME} -r 30 -p $pidfi --add-opens=java.base/java.nio=ALL-UNNAMED \ --add-opens=java.base/jdk.internal.loader=ALL-UNNAMED \ --add-opens=java.base/sun.security.ssl=ALL-UNNAMED \ - -Djava.io.tmpdir=${VESPA_HOME}/tmp \ + -Djava.io.tmpdir=${VESPA_HOME}/var/tmp \ -Djava.library.path=${VESPA_HOME}/lib64 \ -Djava.security.properties=${VESPA_HOME}/conf/vespa/java.security.override \ -Djava.awt.headless=true \ diff --git a/container-disc/src/main/sh/vespa-start-container-daemon.sh b/container-disc/src/main/sh/vespa-start-container-daemon.sh index 09a873f06f9..bd218ba176d 100755 --- a/container-disc/src/main/sh/vespa-start-container-daemon.sh +++ b/container-disc/src/main/sh/vespa-start-container-daemon.sh @@ -274,7 +274,7 @@ exec $numactlcmd $envcmd java \ --add-opens=java.base/java.nio=ALL-UNNAMED \ --add-opens=java.base/jdk.internal.loader=ALL-UNNAMED \ --add-opens=java.base/sun.security.ssl=ALL-UNNAMED \ - -Djava.io.tmpdir="${VESPA_HOME}/tmp" \ + -Djava.io.tmpdir="${VESPA_HOME}/var/tmp" \ -Djava.library.path="${VESPA_HOME}/lib64" \ -Djava.security.properties=${VESPA_HOME}/conf/vespa/java.security.override \ -Djava.awt.headless=true \ diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/application/v4/model/DeploymentData.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/application/v4/model/DeploymentData.java index d3331c3cfd4..63c744c385d 100644 --- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/application/v4/model/DeploymentData.java +++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/application/v4/model/DeploymentData.java @@ -12,12 +12,14 @@ import com.yahoo.vespa.hosted.controller.api.integration.billing.Quota; import com.yahoo.vespa.hosted.controller.api.integration.certificates.EndpointCertificateMetadata; import com.yahoo.vespa.hosted.controller.api.integration.configserver.ContainerEndpoint; import com.yahoo.vespa.hosted.controller.api.integration.secrets.TenantSecretStore; +import com.yahoo.yolean.concurrent.Memoized; +import java.io.InputStream; import java.security.cert.X509Certificate; import java.util.List; -import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.function.Supplier; import static java.util.Objects.requireNonNull; @@ -32,40 +34,41 @@ public class DeploymentData { private final ApplicationId instance; private final Tags tags; private final ZoneId zone; - private final byte[] applicationPackage; + private final Supplier<InputStream> applicationPackage; private final Version platform; private final Set<ContainerEndpoint> containerEndpoints; - private final Optional<EndpointCertificateMetadata> endpointCertificateMetadata; + private final Supplier<Optional<EndpointCertificateMetadata>> endpointCertificateMetadata; private final Optional<DockerImage> dockerImageRepo; private final Optional<AthenzDomain> athenzDomain; - private final Quota quota; + private final Supplier<Quota> quota; private final List<TenantSecretStore> tenantSecretStores; private final List<X509Certificate> operatorCertificates; - private final Optional<CloudAccount> cloudAccount; + private final Supplier<Optional<CloudAccount>> cloudAccount; private final boolean dryRun; - public DeploymentData(ApplicationId instance, Tags tags, ZoneId zone, byte[] applicationPackage, Version platform, + public DeploymentData(ApplicationId instance, Tags tags, ZoneId zone, Supplier<InputStream> applicationPackage, Version platform, Set<ContainerEndpoint> containerEndpoints, - Optional<EndpointCertificateMetadata> endpointCertificateMetadata, + Supplier<Optional<EndpointCertificateMetadata>> endpointCertificateMetadata, Optional<DockerImage> dockerImageRepo, Optional<AthenzDomain> athenzDomain, - Quota quota, + Supplier<Quota> quota, List<TenantSecretStore> tenantSecretStores, List<X509Certificate> operatorCertificates, - Optional<CloudAccount> cloudAccount, boolean dryRun) { + Supplier<Optional<CloudAccount>> cloudAccount, + boolean dryRun) { this.instance = requireNonNull(instance); this.tags = requireNonNull(tags); this.zone = requireNonNull(zone); this.applicationPackage = requireNonNull(applicationPackage); this.platform = requireNonNull(platform); this.containerEndpoints = Set.copyOf(requireNonNull(containerEndpoints)); - this.endpointCertificateMetadata = requireNonNull(endpointCertificateMetadata); + this.endpointCertificateMetadata = new Memoized<>(requireNonNull(endpointCertificateMetadata)); this.dockerImageRepo = requireNonNull(dockerImageRepo); this.athenzDomain = athenzDomain; - this.quota = quota; + this.quota = new Memoized<>(requireNonNull(quota)); this.tenantSecretStores = List.copyOf(requireNonNull(tenantSecretStores)); this.operatorCertificates = List.copyOf(requireNonNull(operatorCertificates)); - this.cloudAccount = Objects.requireNonNull(cloudAccount); + this.cloudAccount = new Memoized<>(requireNonNull(cloudAccount)); this.dryRun = dryRun; } @@ -79,8 +82,8 @@ public class DeploymentData { return zone; } - public byte[] applicationPackage() { - return applicationPackage; + public InputStream applicationPackage() { + return applicationPackage.get(); } public Version platform() { @@ -92,7 +95,7 @@ public class DeploymentData { } public Optional<EndpointCertificateMetadata> endpointCertificateMetadata() { - return endpointCertificateMetadata; + return endpointCertificateMetadata.get(); } public Optional<DockerImage> dockerImageRepo() { @@ -104,7 +107,7 @@ public class DeploymentData { } public Quota quota() { - return quota; + return quota.get(); } public List<TenantSecretStore> tenantSecretStores() { @@ -116,9 +119,11 @@ public class DeploymentData { } public Optional<CloudAccount> cloudAccount() { - return cloudAccount; + return cloudAccount.get(); } - public boolean isDryRun() { return dryRun; } + public boolean isDryRun() { + return dryRun; + } } diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/deployment/ApplicationStore.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/deployment/ApplicationStore.java index c4db0de539e..71ec07bc2e6 100644 --- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/deployment/ApplicationStore.java +++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/deployment/ApplicationStore.java @@ -5,6 +5,9 @@ import com.yahoo.config.provision.ApplicationName; import com.yahoo.config.provision.TenantName; import com.yahoo.vespa.hosted.controller.api.identifiers.DeploymentId; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; import java.time.Instant; import java.util.Optional; @@ -17,7 +20,16 @@ import java.util.Optional; public interface ApplicationStore { /** Returns the application package of the given revision. */ - byte[] get(DeploymentId deploymentId, RevisionId revisionId); + default byte[] get(DeploymentId deploymentId, RevisionId revisionId) { + try (InputStream stream = stream(deploymentId, revisionId)) { + return stream.readAllBytes(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + InputStream stream(DeploymentId deploymentId, RevisionId revisionId); /** Returns the application package diff, compared to the previous build, for the given tenant, application and build number */ Optional<byte[]> getDiff(TenantName tenantName, ApplicationName applicationName, long buildNumber); @@ -43,7 +55,16 @@ public interface ApplicationStore { void removeAll(TenantName tenant, ApplicationName application); /** Returns the tester application package of the given revision. Does NOT contain the services.xml. */ - byte[] getTester(TenantName tenant, ApplicationName application, RevisionId revision); + default byte[] getTester(TenantName tenant, ApplicationName application, RevisionId revision) { + try (InputStream stream = streamTester(tenant, application, revision)) { + return stream.readAllBytes(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + InputStream streamTester(TenantName tenantName, ApplicationName applicationName, RevisionId revision); /** Returns the application package diff, compared to the previous build, for the given deployment and build number */ Optional<byte[]> getDevDiff(DeploymentId deploymentId, long buildNumber); diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java index 34a7ae89dd2..e09e1f04b8e 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java @@ -15,7 +15,6 @@ import com.yahoo.config.provision.InstanceName; import com.yahoo.config.provision.Tags; import com.yahoo.config.provision.TenantName; import com.yahoo.config.provision.zone.ZoneId; -import com.yahoo.log.LogLevel; import com.yahoo.text.Text; import com.yahoo.transaction.Mutex; import com.yahoo.vespa.athenz.api.AthenzDomain; @@ -39,7 +38,6 @@ import com.yahoo.vespa.hosted.controller.api.integration.configserver.ConfigServ import com.yahoo.vespa.hosted.controller.api.integration.configserver.ContainerEndpoint; import com.yahoo.vespa.hosted.controller.api.integration.configserver.DeploymentResult; import com.yahoo.vespa.hosted.controller.api.integration.configserver.DeploymentResult.LogEntry; -import com.yahoo.vespa.hosted.controller.api.integration.configserver.Log; import com.yahoo.vespa.hosted.controller.api.integration.configserver.Node; import com.yahoo.vespa.hosted.controller.api.integration.configserver.NodeFilter; import com.yahoo.vespa.hosted.controller.api.integration.deployment.ApplicationStore; @@ -58,6 +56,7 @@ import com.yahoo.vespa.hosted.controller.application.QuotaUsage; import com.yahoo.vespa.hosted.controller.application.SystemApplication; import com.yahoo.vespa.hosted.controller.application.TenantAndApplicationId; import com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackage; +import com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackageStream; import com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackageValidator; import com.yahoo.vespa.hosted.controller.athenz.impl.AthenzFacade; import com.yahoo.vespa.hosted.controller.certificate.EndpointCertificates; @@ -79,6 +78,7 @@ import com.yahoo.vespa.hosted.controller.versions.VespaVersion; import com.yahoo.vespa.hosted.controller.versions.VespaVersion.Confidence; import com.yahoo.yolean.Exceptions; +import java.io.ByteArrayInputStream; import java.security.Principal; import java.security.cert.X509Certificate; import java.time.Clock; @@ -87,7 +87,6 @@ import java.time.Instant; import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -98,6 +97,7 @@ import java.util.TreeMap; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -489,9 +489,6 @@ public class ApplicationController { DeploymentId deployment = new DeploymentId(job.application(), zone); try (Mutex deploymentLock = lockForDeployment(job.application(), zone)) { - Set<ContainerEndpoint> containerEndpoints; - Optional<EndpointCertificateMetadata> endpointCertificateMetadata; - Run run = controller.jobController().last(job) .orElseThrow(() -> new IllegalStateException("No known run of '" + job + "'")); @@ -500,30 +497,32 @@ public class ApplicationController { Version platform = run.versions().sourcePlatform().filter(__ -> deploySourceVersions).orElse(run.versions().targetPlatform()); RevisionId revision = run.versions().sourceRevision().filter(__ -> deploySourceVersions).orElse(run.versions().targetRevision()); - ApplicationPackage applicationPackage = new ApplicationPackage(applicationStore.get(deployment, revision)); - + ApplicationPackageStream applicationPackage = new ApplicationPackageStream(() -> applicationStore.stream(deployment, revision), + ApplicationPackageStream.addingCertificate(run.testerCertificate())); AtomicReference<RevisionId> lastRevision = new AtomicReference<>(); Instance instance; + Set<ContainerEndpoint> containerEndpoints; try (Mutex lock = lock(applicationId)) { LockedApplication application = new LockedApplication(requireApplication(applicationId), lock); application.get().revisions().last().map(ApplicationVersion::id).ifPresent(lastRevision::set); instance = application.get().require(job.application().instance()); - if ( ! applicationPackage.trustedCertificates().isEmpty() - && run.testerCertificate().isPresent()) - applicationPackage = applicationPackage.withTrustedCertificate(run.testerCertificate().get()); - - endpointCertificateMetadata = endpointCertificates.getMetadata(instance, zone, applicationPackage.deploymentSpec()); - containerEndpoints = controller.routing().of(deployment).prepare(application); } // Release application lock while doing the deployment, which is a lengthy task. + Supplier<Optional<EndpointCertificateMetadata>> endpointCertificateMetadata = () -> { + try (Mutex lock = lock(applicationId)) { + Optional<EndpointCertificateMetadata> data = endpointCertificates.getMetadata(instance, zone, applicationPackage.truncatedPackage().deploymentSpec()); + data.ifPresent(e -> deployLogger.accept("Using CA signed certificate version %s".formatted(e.version()))); + return data; + } + }; + // Carry out deployment without holding the application lock. DeploymentResult result = deploy(job.application(), instance.tags(), applicationPackage, zone, platform, containerEndpoints, endpointCertificateMetadata, run.isDryRun()); - endpointCertificateMetadata.ifPresent(e -> deployLogger.accept("Using CA signed certificate version %s".formatted(e.version()))); // Record the quota usage for this application var quotaUsage = deploymentQuotaUsage(zone, job.application()); @@ -544,8 +543,10 @@ public class ApplicationController { .distinct() .collect(Collectors.toList())) .orElseGet(List::of); - if (warnings.isEmpty()) controller.notificationsDb().removeNotification(source, Notification.Type.applicationPackage); - else controller.notificationsDb().setNotification(source, Notification.Type.applicationPackage, Notification.Level.warning, warnings); + if (warnings.isEmpty()) + controller.notificationsDb().removeNotification(source, Notification.Type.applicationPackage); + else + controller.notificationsDb().setNotification(source, Notification.Type.applicationPackage, Notification.Level.warning, warnings); } lockApplicationOrThrow(applicationId, application -> @@ -606,23 +607,23 @@ public class ApplicationController { /** Deploy a system application to given zone */ public DeploymentResult deploySystemApplicationPackage(SystemApplication application, ZoneId zone, Version version) { if (application.hasApplicationPackage()) { - ApplicationPackage applicationPackage = new ApplicationPackage( - artifactRepository.getSystemApplicationPackage(application.id(), zone, version) + ApplicationPackageStream applicationPackage = new ApplicationPackageStream( + () -> new ByteArrayInputStream(artifactRepository.getSystemApplicationPackage(application.id(), zone, version)) ); - return deploy(application.id(), Tags.empty(), applicationPackage, zone, version, Set.of(), /* No application cert */ Optional.empty(), false); + return deploy(application.id(), Tags.empty(), applicationPackage, zone, version, Set.of(), Optional::empty, false); } else { throw new RuntimeException("This system application does not have an application package: " + application.id().toShortString()); } } /** Deploys the given tester application to the given zone. */ - public DeploymentResult deployTester(TesterId tester, ApplicationPackage applicationPackage, ZoneId zone, Version platform) { - return deploy(tester.id(), Tags.empty(), applicationPackage, zone, platform, Set.of(), /* No application cert for tester*/ Optional.empty(), false); + public DeploymentResult deployTester(TesterId tester, ApplicationPackageStream applicationPackage, ZoneId zone, Version platform) { + return deploy(tester.id(), Tags.empty(), applicationPackage, zone, platform, Set.of(), Optional::empty, false); } - private DeploymentResult deploy(ApplicationId application, Tags tags, ApplicationPackage applicationPackage, + private DeploymentResult deploy(ApplicationId application, Tags tags, ApplicationPackageStream applicationPackage, ZoneId zone, Version platform, Set<ContainerEndpoint> endpoints, - Optional<EndpointCertificateMetadata> endpointCertificateMetadata, + Supplier<Optional<EndpointCertificateMetadata>> endpointCertificateMetadata, boolean dryRun) { DeploymentId deployment = new DeploymentId(application, zone); try { @@ -638,13 +639,8 @@ public class ApplicationController { .filter(tenant-> tenant instanceof AthenzTenant) .map(tenant -> ((AthenzTenant)tenant).domain()); - if (zone.environment().isManuallyDeployed()) - controller.applications().applicationStore().putMeta(deployment, - clock.instant(), - applicationPackage.metaDataZip()); - - Quota deploymentQuota = DeploymentQuotaCalculator.calculate(billingController.getQuota(application.tenant()), - asList(application.tenant()), application, zone, applicationPackage.deploymentSpec()); + Supplier<Quota> deploymentQuota = () -> DeploymentQuotaCalculator.calculate(billingController.getQuota(application.tenant()), + asList(application.tenant()), application, zone, applicationPackage.truncatedPackage().deploymentSpec()); List<TenantSecretStore> tenantSecretStores = controller.tenants() .get(application.tenant()) @@ -654,9 +650,9 @@ public class ApplicationController { List<X509Certificate> operatorCertificates = controller.supportAccess().activeGrantsFor(deployment).stream() .map(SupportAccessGrant::certificate) .collect(toList()); - Optional<CloudAccount> cloudAccount = decideCloudAccountOf(deployment, applicationPackage.deploymentSpec()); + Supplier<Optional<CloudAccount>> cloudAccount = () -> decideCloudAccountOf(deployment, applicationPackage.truncatedPackage().deploymentSpec()); ConfigServer.PreparedApplication preparedApplication = - configServer.deploy(new DeploymentData(application, tags, zone, applicationPackage.zippedContent(), platform, + configServer.deploy(new DeploymentData(application, tags, zone, applicationPackage::zipStream, platform, endpoints, endpointCertificateMetadata, dockerImageRepo, domain, deploymentQuota, tenantSecretStores, operatorCertificates, cloudAccount, dryRun)); @@ -665,7 +661,12 @@ public class ApplicationController { } finally { // Even if prepare fails, routing configuration may need to be updated if ( ! application.instance().isTester()) { - controller.routing().of(deployment).configure(applicationPackage.deploymentSpec()); + controller.routing().of(deployment).configure(applicationPackage.truncatedPackage().deploymentSpec()); + if (zone.environment().isManuallyDeployed()) + controller.applications().applicationStore().putMeta(deployment, + clock.instant(), + applicationPackage.truncatedPackage().metaDataZip()); + } } } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackage.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackage.java index b99d825a779..53c78d7c8ec 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackage.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackage.java @@ -59,19 +59,19 @@ import static java.util.stream.Collectors.toMap; * A representation of the content of an application package. * Only meta-data content can be accessed as anything other than compressed data. * A package is identified by a hash of the content. - * - * This is immutable. - * + * * @author bratseth * @author jonmv */ public class ApplicationPackage { - private static final String trustedCertificatesFile = "security/clients.pem"; - private static final String buildMetaFile = "build-meta.json"; + static final String trustedCertificatesFile = "security/clients.pem"; + static final String buildMetaFile = "build-meta.json"; static final String deploymentFile = "deployment.xml"; - private static final String validationOverridesFile = "validation-overrides.xml"; + static final String validationOverridesFile = "validation-overrides.xml"; static final String servicesFile = "services.xml"; + static final Set<String> prePopulated = Set.of(deploymentFile, validationOverridesFile, servicesFile, buildMetaFile, trustedCertificatesFile); + private static Hasher hasher() { return Hashing.murmur3_128().newHasher(); } private final String bundleHash; @@ -101,7 +101,7 @@ public class ApplicationPackage { */ public ApplicationPackage(byte[] zippedContent, boolean requireFiles) { this.zippedContent = Objects.requireNonNull(zippedContent, "The application package content cannot be null"); - this.files = new ZipArchiveCache(zippedContent, Set.of(deploymentFile, validationOverridesFile, servicesFile, buildMetaFile, trustedCertificatesFile)); + this.files = new ZipArchiveCache(zippedContent, prePopulated); Optional<DeploymentSpec> deploymentSpec = files.get(deploymentFile).map(bytes -> new String(bytes, UTF_8)).map(DeploymentSpec::fromXml); if (requireFiles && deploymentSpec.isEmpty()) @@ -122,17 +122,6 @@ public class ApplicationPackage { preProcessAndPopulateCache(); } - /** Returns a copy of this with the given certificate appended. */ - public ApplicationPackage withTrustedCertificate(X509Certificate certificate) { - List<X509Certificate> trustedCertificates = new ArrayList<>(this.trustedCertificates); - trustedCertificates.add(certificate); - byte[] certificatesBytes = X509CertificateUtils.toPem(trustedCertificates).getBytes(UTF_8); - - ByteArrayOutputStream modified = new ByteArrayOutputStream(zippedContent.length + certificatesBytes.length); - ZipEntries.transferAndWrite(modified, new ByteArrayInputStream(zippedContent), trustedCertificatesFile, certificatesBytes); - return new ApplicationPackage(modified.toByteArray()); - } - /** Hash of all files and settings that influence what is deployed to config servers. */ public String bundleHash() { return bundleHash; @@ -295,7 +284,7 @@ public class ApplicationPackage { private Map<Path, Optional<byte[]>> read(Collection<String> names) { var entries = ZipEntries.from(zip, - name -> names.contains(name), + names::contains, maxSize, true) .asList().stream() diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java new file mode 100644 index 00000000000..021064417ac --- /dev/null +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java @@ -0,0 +1,265 @@ +package com.yahoo.vespa.hosted.controller.application.pkg; + +import com.yahoo.security.X509CertificateUtils; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.security.cert.X509Certificate; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.function.UnaryOperator; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; +import java.util.zip.ZipOutputStream; + +import static com.yahoo.security.X509CertificateUtils.certificateListFromPem; +import static java.io.OutputStream.nullOutputStream; +import static java.lang.Math.min; +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * Wraps a zipped application package stream. + * This allows replacing content as the input stream is read. + * This also retains a truncated {@link ApplicationPackage}, containing only the specified set of files, + * which can be accessed when this stream is fully exhausted. + * + * @author jonmv + */ +public class ApplicationPackageStream { + + private final Supplier<Replacer> replacer; + private final Supplier<Predicate<String>> filter; + private final Supplier<InputStream> in; + private final AtomicReference<ApplicationPackage> truncatedPackage = new AtomicReference<>(); + + public static Supplier<Replacer> addingCertificate(Optional<X509Certificate> certificate) { + return certificate.map(cert -> Replacer.of(Map.of(ApplicationPackage.trustedCertificatesFile, + trustBytes -> append(trustBytes, cert)))) + .orElse(Replacer.of(Map.of())); + } + + static InputStream append(InputStream trustIn, X509Certificate cert) { + try { + List<X509Certificate> trusted = trustIn == null ? new ArrayList<>() + : new ArrayList<>(certificateListFromPem(new String(trustIn.readAllBytes(), UTF_8))); + trusted.add(cert); + return new ByteArrayInputStream(X509CertificateUtils.toPem(trusted).getBytes(UTF_8)); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + /** Stream that effectively copies the input stream to its {@link #truncatedPackage()} when exhausted. */ + public ApplicationPackageStream(Supplier<InputStream> in) { + this(in, () -> __ -> true, Map.of()); + } + + /** Stream that replaces the indicated entries, and copies all metadata files to its {@link #truncatedPackage()} when exhausted. */ + public ApplicationPackageStream(Supplier<InputStream> in, Supplier<Replacer> replacer) { + this(in, () -> name -> ApplicationPackage.prePopulated.contains(name) || name.endsWith(".xml"), replacer); + } + + /** Stream that replaces the indicated entries, and copies the filtered entries to its {@link #truncatedPackage()} when exhausted. */ + public ApplicationPackageStream(Supplier<InputStream> in, Supplier<Predicate<String>> truncation, Map<String, UnaryOperator<InputStream>> replacements) { + this(in, truncation, Replacer.of(replacements)); + } + + /** Stream that uses the given replacer to modify content, and copies the filtered entries to its {@link #truncatedPackage()} when exhausted. */ + public ApplicationPackageStream(Supplier<InputStream> in, Supplier<Predicate<String>> truncation, Supplier<Replacer> replacer) { + this.in = in; + this.filter = truncation; + this.replacer = replacer; + } + + /** + * Returns a new stream continaing the zipped application package this wraps. Separate streams may exist concurrently, + * and the first to be exhausted will populate the truncated application package. + */ + public InputStream zipStream() { + return new Stream(in.get(), replacer.get(), filter.get(), truncatedPackage); + } + + /** + * Returns the application package backed by only the files indicated by the truncation filter. + * Throws if no instances of {@link #zipStream()} have been exhausted yet. + */ + public ApplicationPackage truncatedPackage() { + ApplicationPackage truncated = truncatedPackage.get(); + if (truncated == null) throw new IllegalStateException("must completely exhaust input before reading package"); + return truncated; + } + + private static class Stream extends InputStream { + + private final byte[] inBuffer = new byte[1 << 16]; + private final ByteArrayOutputStream teeOut = new ByteArrayOutputStream(1 << 16); + private final ZipOutputStream teeZip = new ZipOutputStream(teeOut); + private final ByteArrayOutputStream out = new ByteArrayOutputStream(1 << 16); + private final ZipOutputStream outZip = new ZipOutputStream(out); + private final AtomicReference<ApplicationPackage> truncatedPackage; + private final InputStream in; + private final ZipInputStream inZip; + private final Replacer replacer; + private final Predicate<String> filter; + private byte[] currentOut = new byte[0]; + private InputStream currentIn = InputStream.nullInputStream(); + private boolean includeCurrent = false; + private int pos = 0; + private boolean closed = false; + private boolean done = false; + + private Stream(InputStream in, Replacer replacer, Predicate<String> filter, AtomicReference<ApplicationPackage> truncatedPackage) { + this.in = in; + this.inZip = new ZipInputStream(in); + this.replacer = replacer; + this.filter = filter; + this.truncatedPackage = truncatedPackage; + } + + private void fill() throws IOException { + if (done) return; + while (out.size() == 0) { + // Exhaust current entry first. + int i, n = out.size(); + while (out.size() == 0 && (i = currentIn.read(inBuffer)) != -1) { + if (includeCurrent) teeZip.write(inBuffer, 0, i); + outZip.write(inBuffer, 0, i); + n += i; + } + + // Current entry exhausted, look for next. + if (n == 0) { + next(); + if (done) break; + } + } + + currentOut = out.toByteArray(); + out.reset(); + pos = 0; + } + + private void next() throws IOException { + if (includeCurrent) teeZip.closeEntry(); + outZip.closeEntry(); + + ZipEntry next = inZip.getNextEntry(); + String name; + InputStream content = null; + if (next == null) { + // We may still have replacements to fill in, but if we don't, we're done filling, forever! + name = replacer.next(); + if (name == null) { + outZip.close(); // This typically makes new output available, so must check for that after this. + teeZip.close(); + currentIn = nullInputStream(); + truncatedPackage.compareAndSet(null, new ApplicationPackage(teeOut.toByteArray())); + done = true; + return; + } + } + else { + name = next.getName(); + content = new FilterInputStream(inZip) { @Override public void close() { } }; // Protect inZip from replacements closing it. + } + + includeCurrent = truncatedPackage.get() == null && filter.test(name); + currentIn = replacer.modify(name, content); + if (currentIn == null) { + currentIn = InputStream.nullInputStream(); + } + else { + if (includeCurrent) teeZip.putNextEntry(new ZipEntry(name)); + outZip.putNextEntry(new ZipEntry(name)); + } + } + + @Override + public int read() throws IOException { + if (closed) throw new IOException("stream closed"); + if (pos == currentOut.length) { + fill(); + if (pos == currentOut.length) return -1; + } + return 0xff & currentOut[pos++]; + } + + @Override + public int read(byte[] out, int off, int len) throws IOException { + if (closed) throw new IOException("stream closed"); + if ((off | len | (off + len) | (out.length - (off + len))) < 0) throw new IndexOutOfBoundsException(); + if (pos == currentOut.length) { + fill(); + if (pos == currentOut.length) return -1; + } + int n = min(currentOut.length - pos, len); + System.arraycopy(currentOut, pos, out, off, n); + pos += n; + return n; + } + + @Override + public int available() throws IOException { + return pos == currentOut.length && done ? 0 : 1; + } + + @Override + public void close() { + if ( ! closed) try { + transferTo(nullOutputStream()); // Finish reading the zip, to populate the truncated package in case of errors. + in.transferTo(nullOutputStream()); // For some inane reason, ZipInputStream doesn't exhaust its wrapped input. + inZip.close(); + closed = true; + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + } + + /** Replaces entries in a zip stream as they are encountered, then appends remaining entries at the end. */ + public interface Replacer { + + /** Called when the entries of the original zip stream are exhausted. Return remaining names, or {@code null} when none left. */ + String next(); + + /** Modify content for a given name; return {@code null} for removal; in is {@code null} for entries not present in the input. */ + InputStream modify(String name, InputStream in); + + /** + * Wraps a map of fixed replacements, and: + * <ul> + * <li>Removes entries whose value is {@code null}.</li> + * <li>Modifies entries present in both input and the map.</li> + * <li>Appends entries present exclusively in the map.</li> + * <li>Writes all other entries as they are.</li> + * </ul> + */ + static Supplier<Replacer> of(Map<String, UnaryOperator<InputStream>> replacements) { + return () -> new Replacer() { + final Map<String, UnaryOperator<InputStream>> remaining = new HashMap<>(replacements); + @Override public String next() { + return remaining.isEmpty() ? null : remaining.keySet().iterator().next(); + } + @Override public InputStream modify(String name, InputStream in) { + UnaryOperator<InputStream> mapper = remaining.remove(name); + return mapper == null ? in : mapper.apply(in); + } + }; + } + + } + +} diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackage.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackage.java index 5b20c57fcca..17644d5e207 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackage.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackage.java @@ -18,15 +18,14 @@ import com.yahoo.text.Text; import com.yahoo.vespa.hosted.controller.api.integration.deployment.RunId; import com.yahoo.vespa.hosted.controller.api.integration.deployment.TesterCloud.Suite; import com.yahoo.vespa.hosted.controller.api.integration.deployment.TesterId; +import com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackageStream.Replacer; import com.yahoo.vespa.hosted.controller.config.ControllerConfig; import com.yahoo.vespa.hosted.controller.config.ControllerConfig.Steprunner.Testerapp; import com.yahoo.yolean.Exceptions; import javax.security.auth.x500.X500Principal; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.UncheckedIOException; +import java.io.InputStream; import java.math.BigInteger; import java.security.KeyPair; import java.security.cert.X509Certificate; @@ -43,6 +42,8 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.function.Supplier; +import java.util.function.UnaryOperator; import java.util.jar.JarInputStream; import java.util.jar.Manifest; import java.util.regex.Pattern; @@ -53,8 +54,9 @@ import static com.yahoo.vespa.hosted.controller.api.integration.deployment.Teste import static com.yahoo.vespa.hosted.controller.api.integration.deployment.TesterCloud.Suite.system; import static com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackage.deploymentFile; import static com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackage.servicesFile; -import static com.yahoo.vespa.hosted.controller.application.pkg.ZipEntries.transferAndWrite; +import static java.io.InputStream.nullInputStream; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.function.UnaryOperator.identity; import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.mapping; import static java.util.stream.Collectors.toList; @@ -71,32 +73,14 @@ public class TestPackage { static final NodeResources DEFAULT_TESTER_RESOURCES_AWS = new NodeResources(2, 8, 50, 0.3, NodeResources.DiskSpeed.any); static final NodeResources DEFAULT_TESTER_RESOURCES = new NodeResources(1, 4, 50, 0.3, NodeResources.DiskSpeed.any); - private final ApplicationPackage applicationPackage; + private final ApplicationPackageStream applicationPackageStream; private final X509Certificate certificate; - public TestPackage(byte[] testPackage, boolean isPublicSystem, RunId id, Testerapp testerApp, + public TestPackage(Supplier<InputStream> inZip, boolean isPublicSystem, RunId id, Testerapp testerApp, DeploymentSpec spec, Instant certificateValidFrom, Duration certificateValidDuration) { - - // Copy contents of submitted application-test.zip, and ensure required directories exist within the zip. - Map<String, byte[]> entries = new HashMap<>(); - entries.put("artifacts/.ignore-" + UUID.randomUUID(), new byte[0]); - entries.put("tests/.ignore-" + UUID.randomUUID(), new byte[0]); - - entries.put(servicesFile, - servicesXml(! isPublicSystem, - certificateValidFrom != null, - hasLegacyTests(testPackage), - testerResourcesFor(id.type().zone(), spec.requireInstance(id.application().instance())), - testerApp)); - - entries.put(deploymentFile, - deploymentXml(id.tester(), - spec.athenzDomain(), - spec.requireInstance(id.application().instance()) - .athenzService(id.type().zone().environment(), id.type().zone().region()))); - + KeyPair keyPair; if (certificateValidFrom != null) { - KeyPair keyPair = KeyUtils.generateKeypair(KeyAlgorithm.RSA, 2048); + keyPair = KeyUtils.generateKeypair(KeyAlgorithm.RSA, 2048); X500Principal subject = new X500Principal("CN=" + id.tester().id().toFullString() + "." + id.type() + "." + id.number()); this.certificate = X509CertificateBuilder.fromKeypair(keyPair, subject, @@ -105,26 +89,60 @@ public class TestPackage { SignatureAlgorithm.SHA512_WITH_RSA, BigInteger.valueOf(1)) .build(); - entries.put("artifacts/key", KeyUtils.toPem(keyPair.getPrivate()).getBytes(UTF_8)); - entries.put("artifacts/cert", X509CertificateUtils.toPem(certificate).getBytes(UTF_8)); } else { + keyPair = null; this.certificate = null; } + this.applicationPackageStream = new ApplicationPackageStream(inZip, () -> __ -> false, () -> new Replacer() { + + // Initially skips all declared entries, ensuring they're generated and appended after all input entries. + final Map<String, UnaryOperator<InputStream>> entries = new HashMap<>(); + final Map<String, UnaryOperator<InputStream>> replacements = new HashMap<>(); + boolean hasLegacyTests = false; + + @Override + public String next() { + if (entries.isEmpty()) return null; + String next = entries.keySet().iterator().next(); + replacements.put(next, entries.remove(next)); + return next; + } - ByteArrayOutputStream buffer = new ByteArrayOutputStream(testPackage.length + 10_000); - transferAndWrite(buffer, new ByteArrayInputStream(testPackage), entries); - this.applicationPackage = new ApplicationPackage(buffer.toByteArray()); - } - - static boolean hasLegacyTests(byte[] testPackage) { - return ZipEntries.from(testPackage, __ -> true, 0, false).asList().stream() - .anyMatch(file -> file.name().startsWith("artifacts/") && file.name().endsWith("-tests.jar")); + @Override + public InputStream modify(String name, InputStream in) { + hasLegacyTests |= name.startsWith("artifacts/") && name.endsWith("-tests.jar"); + return entries.containsKey(name) ? null : replacements.getOrDefault(name, identity()).apply(in); + } + { + // Copy contents of submitted application-test.zip, and ensure required directories exist within the zip. + entries.put("artifacts/.ignore-" + UUID.randomUUID(), __ -> nullInputStream()); + entries.put("tests/.ignore-" + UUID.randomUUID(), __ -> nullInputStream()); + + entries.put(servicesFile, + __ -> new ByteArrayInputStream(servicesXml( ! isPublicSystem, + certificateValidFrom != null, + hasLegacyTests, + testerResourcesFor(id.type().zone(), spec.requireInstance(id.application().instance())), + testerApp))); + + entries.put(deploymentFile, + __ -> new ByteArrayInputStream(deploymentXml(id.tester(), + spec.athenzDomain(), + spec.requireInstance(id.application().instance()) + .athenzService(id.type().zone().environment(), id.type().zone().region())))); + + if (certificate != null) { + entries.put("artifacts/key", __ -> new ByteArrayInputStream(KeyUtils.toPem(keyPair.getPrivate()).getBytes(UTF_8))); + entries.put("artifacts/cert", __ -> new ByteArrayInputStream(X509CertificateUtils.toPem(certificate).getBytes(UTF_8))); + } + } + }); } - public ApplicationPackage asApplicationPackage() { - return applicationPackage; + public ApplicationPackageStream asApplicationPackage() { + return applicationPackageStream; } public X509Certificate certificate() { @@ -207,7 +225,7 @@ public class TestPackage { return new TestSummary(problems, suites); } - public static NodeResources testerResourcesFor(ZoneId zone, DeploymentInstanceSpec spec) { + static NodeResources testerResourcesFor(ZoneId zone, DeploymentInstanceSpec spec) { NodeResources nodeResources = spec.steps().stream() .filter(step -> step.concerns(zone.environment())) .findFirst() @@ -219,7 +237,7 @@ public class TestPackage { } /** Returns the generated services.xml content for the tester application. */ - public static byte[] servicesXml(boolean systemUsesAthenz, boolean useTesterCertificate, boolean hasLegacyTests, + static byte[] servicesXml(boolean systemUsesAthenz, boolean useTesterCertificate, boolean hasLegacyTests, NodeResources resources, ControllerConfig.Steprunner.Testerapp config) { int jdiscMemoryGb = 2; // 2Gb memory for tester application which uses Maven. int jdiscMemoryPct = (int) Math.ceil(100 * jdiscMemoryGb / resources.memoryGb()); @@ -279,7 +297,7 @@ public class TestPackage { } /** Returns a dummy deployment xml which sets up the service identity for the tester, if present. */ - public static byte[] deploymentXml(TesterId id, Optional<AthenzDomain> athenzDomain, Optional<AthenzService> athenzService) { + static byte[] deploymentXml(TesterId id, Optional<AthenzDomain> athenzDomain, Optional<AthenzService> athenzService) { String deploymentSpec = "<?xml version='1.0' encoding='UTF-8'?>\n" + "<deployment version=\"1.0\" " + diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ZipEntries.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ZipEntries.java index 6bbcd551924..185c97f866e 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ZipEntries.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ZipEntries.java @@ -15,6 +15,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.NoSuchElementException; import java.util.Objects; import java.util.Optional; import java.util.function.Predicate; @@ -35,36 +36,6 @@ public class ZipEntries { this.entries = List.copyOf(Objects.requireNonNull(entries)); } - /** Copies the zipped content from in to out, adding/overwriting an entry with the given name and content. */ - public static void transferAndWrite(OutputStream out, InputStream in, String name, byte[] content) { - transferAndWrite(out, in, Map.of(name, content)); - } - - /** Copies the zipped content from in to out, adding/overwriting/removing (on {@code null}) entries as specified. */ - public static void transferAndWrite(OutputStream out, InputStream in, Map<String, byte[]> entries) { - try (ZipOutputStream zipOut = new ZipOutputStream(out); - ZipInputStream zipIn = new ZipInputStream(in)) { - for (ZipEntry entry = zipIn.getNextEntry(); entry != null; entry = zipIn.getNextEntry()) { - if (entries.containsKey(entry.getName())) - continue; - - zipOut.putNextEntry(new ZipEntry(entry.getName())); - zipIn.transferTo(zipOut); - zipOut.closeEntry(); - } - for (Entry<String, byte[]> entry : entries.entrySet()) { - if (entry.getValue() != null) { - zipOut.putNextEntry(new ZipEntry(entry.getKey())); - zipOut.write(entry.getValue()); - zipOut.closeEntry(); - } - } - } - catch (IOException e) { - throw new UncheckedIOException(e); - } - } - /** Read ZIP entries from inputStream */ public static ZipEntries from(byte[] zip, Predicate<String> entryNameMatcher, int maxEntrySizeInBytes, boolean throwIfEntryExceedsMaxSize) { @@ -107,7 +78,7 @@ public class ZipEntries { } public String name() { return name; } - public byte[] contentOrThrow() { return content.orElseThrow(); } + public byte[] contentOrThrow() { return content.orElseThrow(() -> new NoSuchElementException("'" + name + "' has no content")); } public Optional<byte[]> content() { return content; } public long size() { return size; } } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunner.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunner.java index e8c92d3e3f6..efe072c2a6d 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunner.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunner.java @@ -34,6 +34,7 @@ import com.yahoo.vespa.hosted.controller.application.Deployment; import com.yahoo.vespa.hosted.controller.application.Endpoint; import com.yahoo.vespa.hosted.controller.application.TenantAndApplicationId; import com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackage; +import com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackageStream; import com.yahoo.vespa.hosted.controller.application.pkg.TestPackage; import com.yahoo.vespa.hosted.controller.maintenance.JobRunner; import com.yahoo.vespa.hosted.controller.notification.Notification; @@ -43,6 +44,7 @@ import com.yahoo.vespa.hosted.controller.routing.context.DeploymentRoutingContex import com.yahoo.yolean.Exceptions; import java.io.ByteArrayOutputStream; +import java.io.InputStream; import java.io.PrintStream; import java.io.UncheckedIOException; import java.security.cert.CertificateExpiredException; @@ -248,7 +250,9 @@ public class InternalStepRunner implements StepRunner { } case LOAD_BALANCER_NOT_READY, PARENT_HOST_NOT_READY -> { logger.log(e.message()); // Consider splitting these messages in summary and details, on config server. - controller.jobController().locked(id, run -> run.sleepingUntil(startTime.plusSeconds(300))); + Instant someTimeAfterStart = startTime.plusSeconds(450); + Instant inALittleWhile = controller.clock().instant().plusSeconds(90); + controller.jobController().locked(id, run -> run.sleepingUntil(someTimeAfterStart.isAfter(inALittleWhile) ? someTimeAfterStart : inALittleWhile)); return result; } case NODE_ALLOCATION_FAILURE -> { @@ -926,14 +930,13 @@ public class InternalStepRunner implements StepRunner { } /** Returns the application package for the tester application, assembled from a generated config, fat-jar and services.xml. */ - private ApplicationPackage testerPackage(RunId id) { + private ApplicationPackageStream testerPackage(RunId id) { RevisionId revision = controller.jobController().run(id).versions().targetRevision(); DeploymentSpec spec = controller.applications().requireApplication(TenantAndApplicationId.from(id.application())).deploymentSpec(); - byte[] testZip = controller.applications().applicationStore().getTester(id.application().tenant(), - id.application().application(), revision); boolean useTesterCertificate = useTesterCertificate(id); - TestPackage testPackage = new TestPackage(testZip, + TestPackage testPackage = new TestPackage(() -> controller.applications().applicationStore().streamTester(id.application().tenant(), + id.application().application(), revision), controller.system().isPublic(), id, controller.controllerConfig().steprunner().testerapp(), diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java index 08cf8d2e1c4..f94bd51fe4c 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java @@ -207,32 +207,36 @@ public class JobController { return run; List<LogEntry> log; - Instant deployedAt; + Optional<Instant> deployedAt; Instant from; if ( ! run.id().type().isProduction()) { - deployedAt = run.stepInfo(installInitialReal).or(() -> run.stepInfo(installReal)).flatMap(StepInfo::startTime).orElseThrow(); - from = run.lastVespaLogTimestamp().isAfter(run.start()) ? run.lastVespaLogTimestamp() : deployedAt.minusSeconds(10); - log = LogEntry.parseVespaLog(controller.serviceRegistry().configServer() - .getLogs(new DeploymentId(id.application(), zone), - Map.of("from", Long.toString(from.toEpochMilli()))), - from); + deployedAt = run.stepInfo(installInitialReal).or(() -> run.stepInfo(installReal)).flatMap(StepInfo::startTime); + if (deployedAt.isPresent()) { + from = run.lastVespaLogTimestamp().isAfter(run.start()) ? run.lastVespaLogTimestamp() : deployedAt.get().minusSeconds(10); + log = LogEntry.parseVespaLog(controller.serviceRegistry().configServer() + .getLogs(new DeploymentId(id.application(), zone), + Map.of("from", Long.toString(from.toEpochMilli()))), + from); + } + else log = List.of(); } - else - log = List.of(); + else log = List.of(); if (id.type().isTest()) { - deployedAt = run.stepInfo(installTester).flatMap(StepInfo::startTime).orElseThrow(); - from = run.lastVespaLogTimestamp().isAfter(run.start()) ? run.lastVespaLogTimestamp() : deployedAt.minusSeconds(10); - List<LogEntry> testerLog = LogEntry.parseVespaLog(controller.serviceRegistry().configServer() - .getLogs(new DeploymentId(id.tester().id(), zone), - Map.of("from", Long.toString(from.toEpochMilli()))), - from); - - Instant justNow = controller.clock().instant().minusSeconds(2); - log = Stream.concat(log.stream(), testerLog.stream()) - .filter(entry -> entry.at().isBefore(justNow)) - .sorted(comparing(LogEntry::at)) - .collect(toUnmodifiableList()); + deployedAt = run.stepInfo(installTester).flatMap(StepInfo::startTime); + if (deployedAt.isPresent()) { + from = run.lastVespaLogTimestamp().isAfter(run.start()) ? run.lastVespaLogTimestamp() : deployedAt.get().minusSeconds(10); + List<LogEntry> testerLog = LogEntry.parseVespaLog(controller.serviceRegistry().configServer() + .getLogs(new DeploymentId(id.tester().id(), zone), + Map.of("from", Long.toString(from.toEpochMilli()))), + from); + + Instant justNow = controller.clock().instant().minusSeconds(2); + log = Stream.concat(log.stream(), testerLog.stream()) + .filter(entry -> entry.at().isBefore(justNow)) + .sorted(comparing(LogEntry::at)) + .toList(); + } } if (log.isEmpty()) return run; diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/proxy/ConfigServerRestExecutorImpl.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/proxy/ConfigServerRestExecutorImpl.java index 9bea7fb829d..4f4e21d9f25 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/proxy/ConfigServerRestExecutorImpl.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/proxy/ConfigServerRestExecutorImpl.java @@ -58,7 +58,7 @@ import static com.yahoo.yolean.Exceptions.uncheck; public class ConfigServerRestExecutorImpl extends AbstractComponent implements ConfigServerRestExecutor { private static final Logger LOG = Logger.getLogger(ConfigServerRestExecutorImpl.class.getName()); - private static final Duration PROXY_REQUEST_TIMEOUT = Duration.ofSeconds(10); + private static final Duration PROXY_REQUEST_TIMEOUT = Duration.ofSeconds(20); private static final Duration PING_REQUEST_TIMEOUT = Duration.ofMillis(500); private static final Duration SINGLE_TARGET_WAIT = Duration.ofSeconds(2); private static final int SINGLE_TARGET_RETRIES = 3; diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java index 0abf1470d29..d8acd2aa8b2 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java @@ -305,7 +305,6 @@ public class ApplicationApiHandler extends AuditLoggingRequestHandler { if (path.matches("/application/v4/tenant/{tenant}/info/billing")) return withCloudTenant(path.get("tenant"), request, this::putTenantInfoBilling); if (path.matches("/application/v4/tenant/{tenant}/info/contacts")) return withCloudTenant(path.get("tenant"), request, this::putTenantInfoContacts); if (path.matches("/application/v4/tenant/{tenant}/info/resend-mail-verification")) return withCloudTenant(path.get("tenant"), request, this::resendEmailVerification); - if (path.matches("/application/v4/tenant/{tenant}/archive-access")) return allowAwsArchiveAccess(path.get("tenant"), request); // TODO(enygaard, 2022-05-25) Remove when no longer used by console if (path.matches("/application/v4/tenant/{tenant}/archive-access/aws")) return allowAwsArchiveAccess(path.get("tenant"), request); if (path.matches("/application/v4/tenant/{tenant}/archive-access/gcp")) return allowGcpArchiveAccess(path.get("tenant"), request); if (path.matches("/application/v4/tenant/{tenant}/secret-store/{name}")) return addSecretStore(path.get("tenant"), path.get("name"), request); @@ -355,7 +354,6 @@ public class ApplicationApiHandler extends AuditLoggingRequestHandler { if (path.matches("/application/v4/tenant/{tenant}")) return deleteTenant(path.get("tenant"), request); if (path.matches("/application/v4/tenant/{tenant}/access/managed/operator")) return removeManagedAccess(path.get("tenant")); if (path.matches("/application/v4/tenant/{tenant}/key")) return removeDeveloperKey(path.get("tenant"), request); - if (path.matches("/application/v4/tenant/{tenant}/archive-access")) return removeAwsArchiveAccess(path.get("tenant")); // TODO(enygaard, 2022-05-25) Remove when no longer used by console if (path.matches("/application/v4/tenant/{tenant}/archive-access/aws")) return removeAwsArchiveAccess(path.get("tenant")); if (path.matches("/application/v4/tenant/{tenant}/archive-access/gcp")) return removeGcpArchiveAccess(path.get("tenant")); if (path.matches("/application/v4/tenant/{tenant}/secret-store/{name}")) return deleteSecretStore(path.get("tenant"), path.get("name"), request); @@ -2626,8 +2624,6 @@ public class ApplicationApiHandler extends AuditLoggingRequestHandler { log.warning(String.format("Failed to get quota for tenant %s: %s", tenant.name(), Exceptions.toMessageString(e))); } - // TODO(enygaard, 2022-05-25) Remove when console is using new archive access structure - cloudTenant.archiveAccess().awsRole().ifPresent(role -> object.setString("archiveAccessRole", role)); toSlime(cloudTenant.archiveAccess(), object.setObject("archiveAccess")); break; diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageTest.java index 8cf861ff963..8ac8b87ac45 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageTest.java @@ -3,18 +3,47 @@ package com.yahoo.vespa.hosted.controller.application.pkg; import com.yahoo.config.application.api.DeploymentSpec; import com.yahoo.config.application.api.ValidationId; +import com.yahoo.io.LazyInputStream; +import com.yahoo.security.KeyAlgorithm; +import com.yahoo.security.KeyUtils; +import com.yahoo.security.SignatureAlgorithm; +import com.yahoo.security.X509CertificateBuilder; import org.junit.jupiter.api.Test; +import javax.security.auth.x500.X500Principal; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; +import java.io.SequenceInputStream; +import java.math.BigInteger; import java.nio.file.Files; import java.nio.file.Path; +import java.security.KeyPair; +import java.security.cert.X509Certificate; import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.function.Predicate; +import java.util.function.UnaryOperator; import java.util.stream.Collectors; +import java.util.stream.IntStream; +import static com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackage.filesZip; +import static com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackageStream.addingCertificate; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -24,35 +53,41 @@ import static org.junit.jupiter.api.Assertions.fail; */ public class ApplicationPackageTest { - static final String deploymentXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" + - "<deployment version=\"1.0\">\n" + - " <test />\n" + - " <prod>\n" + - " <parallel>\n" + - " <region active=\"true\">us-central-1</region>\n" + - " </parallel>\n" + - " </prod>\n" + - "</deployment>\n"; - - static final String servicesXml = "<services version='1.0' xmlns:deploy=\"vespa\" xmlns:preprocess=\"properties\">\n" + - " <preprocess:include file='jdisc.xml' />\n" + - " <content version='1.0' if='foo' />\n" + - " <content version='1.0' id='foo' deploy:environment='staging prod' deploy:region='us-east-3 us-central-1'>\n" + - " <preprocess:include file='content/content.xml' />\n" + - " </content>\n" + - " <preprocess:include file='not_found.xml' required='false' />\n" + - "</services>\n"; + static final String deploymentXml = """ + <?xml version="1.0" encoding="UTF-8"?> + <deployment version="1.0"> + <test /> + <prod> + <parallel> + <region active="true">us-central-1</region> + </parallel> + </prod> + </deployment> + """; + + static final String servicesXml = """ + <services version='1.0' xmlns:deploy="vespa" xmlns:preprocess="properties"> + <preprocess:include file='jdisc.xml' /> + <content version='1.0' if='foo' /> + <content version='1.0' id='foo' deploy:environment='staging prod' deploy:region='us-east-3 us-central-1'> + <preprocess:include file='content/content.xml' /> + </content> + <preprocess:include file='not_found.xml' required='false' /> + </services> + """; private static final String jdiscXml = "<container id='stateless' version='1.0' />\n"; - private static final String contentXml = "<documents>\n" + - " <document type=\"music.sd\" mode=\"index\" />\n" + - "</documents>\n" + - "<preprocess:include file=\"nodes.xml\" />"; + private static final String contentXml = """ + <documents> + <document type="music.sd" mode="index" /> + </documents> + <preprocess:include file="nodes.xml" />"""; - private static final String nodesXml = "<nodes>\n" + - " <node hostalias=\"node0\" distribution-key=\"0\" />\n" + - "</nodes>"; + private static final String nodesXml = """ + <nodes> + <node hostalias="node0" distribution-key="0" /> + </nodes>"""; @Test void test_createEmptyForDeploymentRemoval() { @@ -67,22 +102,22 @@ public class ApplicationPackageTest { @Test void testMetaData() { - byte[] zip = ApplicationPackage.filesZip(Map.of("services.xml", servicesXml.getBytes(UTF_8), - "jdisc.xml", jdiscXml.getBytes(UTF_8), - "content/content.xml", contentXml.getBytes(UTF_8), - "content/nodes.xml", nodesXml.getBytes(UTF_8), - "gurba", "gurba".getBytes(UTF_8))); + byte[] zip = filesZip(Map.of("services.xml", servicesXml.getBytes(UTF_8), + "jdisc.xml", jdiscXml.getBytes(UTF_8), + "content/content.xml", contentXml.getBytes(UTF_8), + "content/nodes.xml", nodesXml.getBytes(UTF_8), + "gurba", "gurba".getBytes(UTF_8))); assertEquals(Map.of("services.xml", servicesXml, - "jdisc.xml", jdiscXml, - "content/content.xml", contentXml, - "content/nodes.xml", nodesXml), - unzip(new ApplicationPackage(zip, false).metaDataZip())); + "jdisc.xml", jdiscXml, + "content/content.xml", contentXml, + "content/nodes.xml", nodesXml), + unzip(new ApplicationPackage(zip, false).metaDataZip())); } @Test void testMetaDataWithMissingFiles() { - byte[] zip = ApplicationPackage.filesZip(Map.of("services.xml", servicesXml.getBytes(UTF_8))); + byte[] zip = filesZip(Map.of("services.xml", servicesXml.getBytes(UTF_8))); try { new ApplicationPackage(zip, false).metaDataZip(); @@ -132,15 +167,165 @@ public class ApplicationPackageTest { assertEquals(originalPackage.bundleHash(), similarDeploymentXml.bundleHash()); } - private static Map<String, String> unzip(byte[] zip) { - return ZipEntries.from(zip, __ -> true, 1 << 10, true) + static Map<String, String> unzip(byte[] zip) { + return ZipEntries.from(zip, __ -> true, 1 << 24, true) .asList().stream() .collect(Collectors.toMap(ZipEntries.ZipEntryWithContent::name, - entry -> new String(entry.contentOrThrow(), UTF_8))); + entry -> new String(entry.content().orElse(new byte[0]), UTF_8))); } - private ApplicationPackage getApplicationZip(String path) throws Exception { + private ApplicationPackage getApplicationZip(String path) throws IOException { return new ApplicationPackage(Files.readAllBytes(Path.of("src/test/resources/application-packages/" + path)), true); } + @Test + void test_replacement() throws IOException { + byte[] zip = zip(Map.of()); + List<X509Certificate> certificates = IntStream.range(0, 3) + .mapToObj(i -> { + KeyPair keyPair = KeyUtils.generateKeypair(KeyAlgorithm.EC, 256); + X500Principal subject = new X500Principal("CN=subject" + i); + return X509CertificateBuilder.fromKeypair(keyPair, + subject, + Instant.now(), + Instant.now().plusSeconds(1), + SignatureAlgorithm.SHA512_WITH_ECDSA, + BigInteger.valueOf(1)) + .build(); + }).toList(); + + assertEquals(List.of(), new ApplicationPackage(zip).trustedCertificates()); + for (int i = 0; i < certificates.size(); i++) { + InputStream in = new ByteArrayInputStream(zip); + zip = new ApplicationPackageStream(() -> in, () -> __ -> false, addingCertificate(Optional.of(certificates.get(i)))).zipStream().readAllBytes(); + assertEquals(certificates.subList(0, i + 1), new ApplicationPackage(zip).trustedCertificates()); + } + } + + static byte[] zip(Map<String, String> content) { + return filesZip(content.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(), + entry -> entry.getValue().getBytes(UTF_8)))); + } + + private static class AngryStreams { + + private final byte[] content; + private final Map<ByteArrayInputStream, Throwable> streams = new LinkedHashMap<>(); + + AngryStreams(byte[] content) { + this.content = content; + } + + InputStream stream() { + ByteArrayInputStream stream = new ByteArrayInputStream(Arrays.copyOf(content, content.length)) { + boolean closed = false; + @Override public void close() { closed = true; } + @Override public int read() { assertFalse(closed); return super.read(); } + @Override public int read(byte[] b, int off, int len) { assertFalse(closed); return super.read(b, off, len); } + @Override public long transferTo(OutputStream out) throws IOException { assertFalse(closed); return super.transferTo(out); } + @Override public byte[] readAllBytes() { assertFalse(closed); return super.readAllBytes(); } + }; + streams.put(stream, new Throwable()); + return stream; + } + + void verifyAllRead() { + streams.forEach((stream, stack) -> assertEquals(0, stream.available(), + "unconsumed content in stream created at " + + new ByteArrayOutputStream() {{ stack.printStackTrace(new PrintStream(this)); }})); + } + + } + + @Test + void testApplicationPackageStream() throws Exception { + Map<String, String> content = Map.of("deployment.xml", deploymentXml, + "services.xml", servicesXml, + "jdisc.xml", jdiscXml, + "unused1.xml", jdiscXml, + "content/content.xml", contentXml, + "content/nodes.xml", nodesXml, + "gurba", "gurba"); + byte[] zip = zip(content); + assertEquals(content, unzip(zip)); + AngryStreams angry = new AngryStreams(zip); + + ApplicationPackageStream identity = new ApplicationPackageStream(angry::stream); + InputStream lazy = new LazyInputStream(() -> new ByteArrayInputStream(identity.truncatedPackage().zippedContent())); + assertEquals("must completely exhaust input before reading package", + assertThrows(IllegalStateException.class, identity::truncatedPackage).getMessage()); + + // Verify no content has changed when passing through the stream. + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (InputStream stream = identity.zipStream()) { stream.transferTo(out); } + assertEquals(content, unzip(out.toByteArray())); + assertEquals(content, unzip(identity.truncatedPackage().zippedContent())); + assertEquals(content, unzip(lazy.readAllBytes())); + ApplicationPackage original = new ApplicationPackage(zip); + assertEquals(unzip(original.metaDataZip()), unzip(identity.truncatedPackage().metaDataZip())); + assertEquals(original.bundleHash(), identity.truncatedPackage().bundleHash()); + + // Change deployment.xml, remove unused1.xml and add unused2.xml + Map<String, UnaryOperator<InputStream>> replacements = Map.of("deployment.xml", in -> new SequenceInputStream(in, new ByteArrayInputStream("\n\n".getBytes(UTF_8))), + "unused1.xml", in -> null, + "unused2.xml", __ -> new ByteArrayInputStream(jdiscXml.getBytes(UTF_8))); + Predicate<String> truncation = name -> name.endsWith(".xml"); + ApplicationPackageStream modifier = new ApplicationPackageStream(angry::stream, () -> truncation, replacements); + out.reset(); + + InputStream partiallyRead = modifier.zipStream(); + assertEquals(15, partiallyRead.readNBytes(15).length); + + try (InputStream stream = modifier.zipStream()) { stream.transferTo(out); } + + assertEquals(Map.of("deployment.xml", deploymentXml + "\n\n", + "services.xml", servicesXml, + "jdisc.xml", jdiscXml, + "unused2.xml", jdiscXml, + "content/content.xml", contentXml, + "content/nodes.xml", nodesXml, + "gurba", "gurba"), + unzip(out.toByteArray())); + + assertEquals(Map.of("deployment.xml", deploymentXml + "\n\n", + "services.xml", servicesXml, + "jdisc.xml", jdiscXml, + "unused2.xml", jdiscXml, + "content/content.xml", contentXml, + "content/nodes.xml", nodesXml), + unzip(modifier.truncatedPackage().zippedContent())); + + // Compare retained metadata for an updated original package, and the truncated package of the modifier. + assertEquals(unzip(new ApplicationPackage(zip(Map.of("deployment.xml", deploymentXml + "\n\n", // Expected to change. + "services.xml", servicesXml, + "jdisc.xml", jdiscXml, + "unused1.xml", jdiscXml, // Irrelevant. + "content/content.xml", contentXml, + "content/nodes.xml", nodesXml, + "gurba", "gurba"))).metaDataZip()), + unzip(modifier.truncatedPackage().metaDataZip())); + + try (InputStream stream1 = modifier.zipStream(); + InputStream stream2 = modifier.zipStream()) { + assertArrayEquals(stream1.readAllBytes(), + stream2.readAllBytes()); + } + + ByteArrayOutputStream byteAtATime = new ByteArrayOutputStream(); + try (InputStream stream1 = modifier.zipStream(); + InputStream stream2 = modifier.zipStream()) { + for (int b; (b = stream1.read()) != -1; ) byteAtATime.write(b); + assertArrayEquals(stream2.readAllBytes(), + byteAtATime.toByteArray()); + } + + assertEquals(byteAtATime.size(), + 15 + partiallyRead.readAllBytes().length); + partiallyRead.close(); + + try (InputStream stream = modifier.zipStream()) { stream.readNBytes(12); } + + angry.verifyAllRead(); + } + } diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackageTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackageTest.java index bff0ccc8ae1..6da8db1c259 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackageTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackageTest.java @@ -1,18 +1,24 @@ package com.yahoo.vespa.hosted.controller.application.pkg; import com.yahoo.config.application.api.DeploymentSpec; +import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.NodeResources; import com.yahoo.config.provision.zone.ZoneId; +import com.yahoo.vespa.hosted.controller.api.integration.deployment.JobType; +import com.yahoo.vespa.hosted.controller.api.integration.deployment.RunId; import com.yahoo.vespa.hosted.controller.application.pkg.TestPackage.TestSummary; import com.yahoo.vespa.hosted.controller.config.ControllerConfig; +import com.yahoo.vespa.hosted.controller.config.ControllerConfig.Steprunner.Testerapp; import org.junit.jupiter.api.Test; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.jar.JarOutputStream; import java.util.zip.ZipEntry; @@ -20,9 +26,11 @@ import static com.yahoo.vespa.hosted.controller.api.integration.deployment.Teste import static com.yahoo.vespa.hosted.controller.api.integration.deployment.TesterCloud.Suite.staging; import static com.yahoo.vespa.hosted.controller.api.integration.deployment.TesterCloud.Suite.staging_setup; import static com.yahoo.vespa.hosted.controller.api.integration.deployment.TesterCloud.Suite.system; +import static com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackageTest.unzip; import static com.yahoo.vespa.hosted.controller.application.pkg.TestPackage.validateTests; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * @author jonmv @@ -77,15 +85,15 @@ public class TestPackageTest { @Test void testBundleValidation() throws IOException { byte[] testZip = ApplicationPackage.filesZip(Map.of("components/foo-tests.jar", testsJar("SystemTest", "StagingSetup", "ProductionTest"), - "artifacts/key", new byte[0])); + "artifacts/key", new byte[0])); TestSummary summary = validateTests(List.of(system), testZip); assertEquals(List.of(system, staging_setup, production), summary.suites()); assertEquals(List.of("test package contains 'artifacts/key'; this conflicts with credentials used to run tests in Vespa Cloud", - "test package has staging setup, so it should also include staging tests", - "test package has production tests, but no production tests are declared in deployment.xml", - "see https://docs.vespa.ai/en/testing.html for details on how to write system tests for Vespa"), - summary.problems()); + "test package has staging setup, so it should also include staging tests", + "test package has production tests, but no production tests are declared in deployment.xml", + "see https://docs.vespa.ai/en/testing.html for details on how to write system tests for Vespa"), + summary.problems()); } @Test @@ -95,20 +103,47 @@ public class TestPackageTest { assertEquals(List.of(staging, production), summary.suites()); assertEquals(List.of("test package has staging tests, so it should also include staging setup", - "see https://docs.vespa.ai/en/testing.html for details on how to write system tests for Vespa"), - summary.problems()); + "see https://docs.vespa.ai/en/testing.html for details on how to write system tests for Vespa"), + summary.problems()); } @Test void testBasicTestsValidation() { byte[] testZip = ApplicationPackage.filesZip(Map.of("tests/staging-test/foo.json", new byte[0], - "tests/staging-setup/foo.json", new byte[0])); + "tests/staging-setup/foo.json", new byte[0])); TestSummary summary = validateTests(List.of(system, production), testZip); assertEquals(List.of(staging_setup, staging), summary.suites()); assertEquals(List.of("test package has no system tests, but <test /> is declared in deployment.xml", - "test package has no production tests, but production tests are declared in deployment.xml", - "see https://docs.vespa.ai/en/testing.html for details on how to write system tests for Vespa"), - summary.problems()); + "test package has no production tests, but production tests are declared in deployment.xml", + "see https://docs.vespa.ai/en/testing.html for details on how to write system tests for Vespa"), + summary.problems()); + } + + @Test + void testTestPacakgeAssembly() throws IOException { + byte[] bundleZip = ApplicationPackage.filesZip(Map.of("components/foo-tests.jar", testsJar("SystemTest", "ProductionTest"), + "artifacts/key", new byte[0])); + TestPackage bundleTests = new TestPackage(() -> new ByteArrayInputStream(bundleZip), + false, + new RunId(ApplicationId.defaultId(), JobType.dev("abc"), 123), + new Testerapp.Builder().tenantCdBundle("foo").runtimeProviderClass("bar").build(), + DeploymentSpec.fromXml(""" + <deployment> + <test /> + </deployment> + """), + null, + null); + + Map<String, String> bundlePackage = unzip(bundleTests.asApplicationPackage().zipStream().readAllBytes()); + bundlePackage.keySet().removeIf(name -> name.startsWith("tests/.ignore") || name.startsWith("artifacts/.ignore")); + assertEquals(Set.of("deployment.xml", + "services.xml", + "components/foo-tests.jar", + "artifacts/key"), + bundlePackage.keySet()); + assertEquals(Map.of(), + unzip(bundleTests.asApplicationPackage().truncatedPackage().zippedContent())); } @Test diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ZipEntriesTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ZipEntriesTest.java deleted file mode 100644 index 37062e1002b..00000000000 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ZipEntriesTest.java +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hosted.controller.application.pkg; - -import com.yahoo.security.KeyAlgorithm; -import com.yahoo.security.KeyUtils; -import com.yahoo.security.SignatureAlgorithm; -import com.yahoo.security.X509CertificateBuilder; -import org.junit.jupiter.api.Test; - -import javax.security.auth.x500.X500Principal; -import java.math.BigInteger; -import java.security.KeyPair; -import java.security.cert.X509Certificate; -import java.time.Instant; -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -/** - * @author mpolden - */ -public class ZipEntriesTest { - - @Test - void test_replacement() { - ApplicationPackage applicationPackage = new ApplicationPackage(new byte[0]); - List<X509Certificate> certificates = IntStream.range(0, 3) - .mapToObj(i -> { - KeyPair keyPair = KeyUtils.generateKeypair(KeyAlgorithm.EC, 256); - X500Principal subject = new X500Principal("CN=subject" + i); - return X509CertificateBuilder.fromKeypair(keyPair, - subject, - Instant.now(), - Instant.now().plusSeconds(1), - SignatureAlgorithm.SHA512_WITH_ECDSA, - BigInteger.valueOf(1)) - .build(); - }) - .collect(Collectors.toUnmodifiableList()); - - assertEquals(List.of(), applicationPackage.trustedCertificates()); - for (int i = 0; i < certificates.size(); i++) { - applicationPackage = applicationPackage.withTrustedCertificate(certificates.get(i)); - assertEquals(certificates.subList(0, i + 1), applicationPackage.trustedCertificates()); - } - } - -} diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunnerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunnerTest.java index 9bf762d2f99..2f245ab9736 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunnerTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunnerTest.java @@ -516,7 +516,7 @@ public class InternalStepRunnerTest { assertEquals(oldTrusted, tester.configServer().application(app.instanceId(), id.type().zone()).get().applicationPackage().trustedCertificates()); tester.configServer().throwOnNextPrepare(null); - tester.clock().advance(Duration.ofSeconds(300)); + tester.clock().advance(Duration.ofSeconds(450)); tester.runner().run(); assertEquals(succeeded, tester.jobs().run(id).stepStatuses().get(Step.deployTester)); assertEquals(succeeded, tester.jobs().run(id).stepStatuses().get(Step.deployReal)); diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ApplicationStoreMock.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ApplicationStoreMock.java index 8ed38761c95..e025a3bea4f 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ApplicationStoreMock.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ApplicationStoreMock.java @@ -12,6 +12,8 @@ import com.yahoo.vespa.hosted.controller.api.integration.deployment.RevisionId; import com.yahoo.vespa.hosted.controller.api.integration.deployment.TesterId; import com.yahoo.vespa.hosted.controller.application.TenantAndApplicationId; +import java.io.ByteArrayInputStream; +import java.io.InputStream; import java.time.Instant; import java.util.Map; import java.util.NavigableMap; @@ -46,15 +48,14 @@ public class ApplicationStoreMock implements ApplicationStore { } @Override - public byte[] get(DeploymentId deploymentId, RevisionId revisionId) { + public InputStream stream(DeploymentId deploymentId, RevisionId revisionId) { if ( ! revisionId.isProduction()) - return requireNonNull(devStore.get(deploymentId)); + return new ByteArrayInputStream(devStore.get(deploymentId)); TenantAndApplicationId tenantAndApplicationId = TenantAndApplicationId.from(deploymentId.applicationId()); byte[] bytes = store.get(appId(tenantAndApplicationId.tenant(), tenantAndApplicationId.application())).get(revisionId); - if (bytes == null) - throw new NotExistsException("No " + revisionId + " found for " + tenantAndApplicationId); - return bytes; + if (bytes == null) throw new NotExistsException("No " + revisionId + " found for " + tenantAndApplicationId); + return new ByteArrayInputStream(bytes); } @Override @@ -96,8 +97,8 @@ public class ApplicationStoreMock implements ApplicationStore { } @Override - public byte[] getTester(TenantName tenant, ApplicationName application, RevisionId revision) { - return requireNonNull(store.get(testerId(tenant, application)).get(revision)); + public InputStream streamTester(TenantName tenant, ApplicationName application, RevisionId revision) { + return new ByteArrayInputStream(store.get(testerId(tenant, application)).get(revision)); } diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ConfigServerMock.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ConfigServerMock.java index 07d9efdf8fc..eaa178c9727 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ConfigServerMock.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ConfigServerMock.java @@ -42,9 +42,12 @@ import com.yahoo.vespa.hosted.controller.api.integration.noderepository.RestartF import com.yahoo.vespa.hosted.controller.api.integration.secrets.TenantSecretStore; import com.yahoo.vespa.hosted.controller.application.SystemApplication; import com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackage; +import wiremock.org.checkerframework.checker.units.qual.A; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.InputStream; +import java.io.UncheckedIOException; import java.net.URI; import java.time.Duration; import java.time.Instant; @@ -376,6 +379,13 @@ public class ConfigServerMock extends AbstractComponent implements ConfigServer @Override public PreparedApplication deploy(DeploymentData deployment) { + ApplicationPackage appPackage; + try (InputStream in = deployment.applicationPackage()) { + appPackage = new ApplicationPackage(in.readAllBytes()); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } lastPrepareVersion = deployment.platform(); if (prepareException != null) prepareException.accept(ApplicationId.from(deployment.instance().tenant(), @@ -383,8 +393,9 @@ public class ConfigServerMock extends AbstractComponent implements ConfigServer deployment.instance().instance())); DeploymentId id = new DeploymentId(deployment.instance(), deployment.zone()); - applications.put(id, new Application(id.applicationId(), lastPrepareVersion, new ApplicationPackage(deployment.applicationPackage()))); + applications.put(id, new Application(id.applicationId(), lastPrepareVersion, appPackage)); ClusterSpec.Id cluster = ClusterSpec.Id.from("default"); + deployment.endpointCertificateMetadata(); // Supplier with side effects >_< if (nodeRepository().list(id.zoneId(), NodeFilter.all().applications(id.applicationId())).isEmpty()) provision(id.zoneId(), id.applicationId(), cluster); diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiCloudTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiCloudTest.java index 3b5a09e4a74..a1e70b77948 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiCloudTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiCloudTest.java @@ -384,20 +384,17 @@ public class ApplicationApiCloudTest extends ControllerContainerCloudTest { tester.assertResponse(request("/application/v4/tenant/scoober", GET).roles(Role.reader(tenantName)), (response) -> assertFalse(response.getBodyAsString().contains("archiveAccessRole")), 200); - tester.assertResponse(request("/application/v4/tenant/scoober/archive-access", PUT) - .data("{\"role\":\"dummy\"}").roles(Role.administrator(tenantName)), - "{\"error-code\":\"BAD_REQUEST\",\"message\":\"Invalid archive access role 'dummy': Must match expected pattern: 'arn:aws:iam::\\\\d{12}:.+'\"}", 400); tester.assertResponse(request("/application/v4/tenant/scoober/archive-access/aws", PUT) .data("{\"role\":\"arn:aws:iam::123456789012:role/my-role\"}").roles(Role.administrator(tenantName)), "{\"message\":\"AWS archive access role set to 'arn:aws:iam::123456789012:role/my-role' for tenant scoober.\"}", 200); tester.assertResponse(request("/application/v4/tenant/scoober", GET).roles(Role.reader(tenantName)), - (response) -> assertTrue(response.getBodyAsString().contains("\"archiveAccessRole\":\"arn:aws:iam::123456789012:role/my-role\"")), + (response) -> assertTrue(response.getBodyAsString().contains("\"awsRole\":\"arn:aws:iam::123456789012:role/my-role\"")), 200); tester.assertResponse(request("/application/v4/tenant/scoober/archive-access/aws", DELETE).roles(Role.administrator(tenantName)), "{\"message\":\"AWS archive access role removed for tenant scoober.\"}", 200); tester.assertResponse(request("/application/v4/tenant/scoober", GET).roles(Role.reader(tenantName)), - (response) -> assertFalse(response.getBodyAsString().contains("\"archiveAccessRole\":\"arn:aws:iam::123456789012:role/my-role\"")), + (response) -> assertFalse(response.getBodyAsString().contains("\"awsRole\":\"arn:aws:iam::123456789012:role/my-role\"")), 200); tester.assertResponse(request("/application/v4/tenant/scoober/archive-access/gcp", PUT) @@ -412,25 +409,25 @@ public class ApplicationApiCloudTest extends ControllerContainerCloudTest { (response) -> assertFalse(response.getBodyAsString().contains("\"gcpMember\":\"user:test@example.com\"")), 200); - tester.assertResponse(request("/application/v4/tenant/scoober/archive-access", PUT) + tester.assertResponse(request("/application/v4/tenant/scoober/archive-access/aws", PUT) .data("{\"role\":\"arn:aws:iam::123456789012:role/my-role\"}").roles(Role.administrator(tenantName)), "{\"message\":\"AWS archive access role set to 'arn:aws:iam::123456789012:role/my-role' for tenant scoober.\"}", 200); tester.assertResponse(request("/application/v4/tenant/scoober", GET).roles(Role.reader(tenantName)), - (response) -> assertTrue(response.getBodyAsString().contains("\"archiveAccessRole\":\"arn:aws:iam::123456789012:role/my-role\"")), + (response) -> assertTrue(response.getBodyAsString().contains("\"awsRole\":\"arn:aws:iam::123456789012:role/my-role\"")), 200); - tester.assertResponse(request("/application/v4/tenant/scoober/archive-access", PUT) + tester.assertResponse(request("/application/v4/tenant/scoober/archive-access/aws", PUT) .data("{\"role\":\"arn:aws:iam::123456789012:role/my-role\"}").roles(Role.administrator(tenantName)), "{\"message\":\"AWS archive access role set to 'arn:aws:iam::123456789012:role/my-role' for tenant scoober.\"}", 200); tester.assertResponse(request("/application/v4/tenant/scoober", GET).roles(Role.reader(tenantName)), - (response) -> assertTrue(response.getBodyAsString().contains("\"archiveAccessRole\":\"arn:aws:iam::123456789012:role/my-role\"")), + (response) -> assertTrue(response.getBodyAsString().contains("\"awsRole\":\"arn:aws:iam::123456789012:role/my-role\"")), 200); tester.assertResponse(request("/application/v4/tenant/scoober/application/albums/environment/prod/region/aws-us-east-1c/instance/default", GET) .roles(Role.reader(tenantName)), new File("deployment-cloud.json")); - tester.assertResponse(request("/application/v4/tenant/scoober/archive-access", DELETE).roles(Role.administrator(tenantName)), + tester.assertResponse(request("/application/v4/tenant/scoober/archive-access/aws", DELETE).roles(Role.administrator(tenantName)), "{\"message\":\"AWS archive access role removed for tenant scoober.\"}", 200); tester.assertResponse(request("/application/v4/tenant/scoober", GET).roles(Role.reader(tenantName)), (response) -> assertFalse(response.getBodyAsString().contains("archiveAccessRole")), diff --git a/dist/vespa.spec b/dist/vespa.spec index 437daf3f4e2..2592e769359 100644 --- a/dist/vespa.spec +++ b/dist/vespa.spec @@ -586,6 +586,10 @@ exit 0 %systemd_postun_with_restart vespa-configserver.service %endif +%post base + +ln -sf %{_prefix}/var/tmp %{_prefix}/tmp + %postun base if [ $1 -eq 0 ]; then # this is an uninstallation rm -f /etc/profile.d/vespa.sh @@ -603,6 +607,10 @@ then mv %{_prefix}/conf/vespa/default-env.txt.rpmsave %{_prefix}/conf/vespa/default-env.txt fi fi +if test -L %{_prefix}/tmp +then + rm -f %{_prefix}/tmp +fi %files %if %{_defattr_is_vespa_vespa} @@ -690,8 +698,6 @@ fi %{_prefix}/man %{_prefix}/sbin %{_prefix}/share -%dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/tmp -%dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/tmp/vespa %dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/var %dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/var/crash %dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/var/db @@ -707,6 +713,8 @@ fi %dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/var/db/vespa/tmp %dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/var/jdisc_container %dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/var/run +%dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/var/tmp +%dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/var/tmp/vespa %dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/var/vespa %dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/var/vespa/application %dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/var/vespa/bundlecache diff --git a/hosted-api/src/main/java/ai/vespa/hosted/api/MultiPartStreamer.java b/hosted-api/src/main/java/ai/vespa/hosted/api/MultiPartStreamer.java index f1cbc027e17..b3862b76296 100644 --- a/hosted-api/src/main/java/ai/vespa/hosted/api/MultiPartStreamer.java +++ b/hosted-api/src/main/java/ai/vespa/hosted/api/MultiPartStreamer.java @@ -15,6 +15,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; +import java.util.Enumeration; import java.util.List; import java.util.UUID; import java.util.function.Supplier; @@ -89,10 +90,12 @@ public class MultiPartStreamer { /** Returns an input stream which is an aggregate of all current parts in this, plus an end marker. */ public InputStream data() { - InputStream aggregate = new SequenceInputStream(Collections.enumeration(Stream.concat(streams.stream().map(Supplier::get), - Stream.of(end())) - .collect(Collectors.toList()))); - + InputStream aggregate = new SequenceInputStream(new Enumeration<>() { + final int j = streams.size(); + int i = -1; + @Override public boolean hasMoreElements() { return i < j; } + @Override public InputStream nextElement() { return ++i < j ? streams.get(i).get() : i == j ? end() : null; } + }); try { if (aggregate.skip(2) != 2)// This should never happen, as the first stream is a ByteArrayInputStream. throw new IllegalStateException("Failed skipping extraneous bytes."); @@ -113,17 +116,6 @@ public class MultiPartStreamer { return asStream(disposition(name) + (filename == null ? "" : "; filename=\"" + filename + "\"") + type(contentType)); } - /** Returns the separator to put between one part and the next, when this is a file. */ - private InputStream separator(String name, Path path) { - try { - String contentType = Files.probeContentType(path); - return separator(name, path.getFileName().toString(), contentType != null ? contentType : "application/octet-stream"); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } - } - /** Returns the end delimiter of the request, with line breaks prepended. */ private InputStream end() { return asStream("\r\n--" + boundary + "--"); @@ -140,7 +132,7 @@ public class MultiPartStreamer { return "\r\nContent-Type: " + contentType + "\r\n\r\n"; } - /** Returns the a ByteArrayInputStream over the given string, UTF-8 encoded. */ + /** Returns a ByteArrayInputStream over the given string, UTF-8 encoded. */ private static InputStream asStream(String string) { return new ByteArrayInputStream(string.getBytes(StandardCharsets.UTF_8)); } diff --git a/http-client/pom.xml b/http-client/pom.xml index c396533d7b9..133da65631c 100644 --- a/http-client/pom.xml +++ b/http-client/pom.xml @@ -58,6 +58,12 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <scope>test</scope> + </dependency> + </dependencies> <build> diff --git a/http-client/src/main/java/ai/vespa/hosted/client/AbstractHttpClient.java b/http-client/src/main/java/ai/vespa/hosted/client/AbstractHttpClient.java index ed3fee101ed..48bbffc7e37 100644 --- a/http-client/src/main/java/ai/vespa/hosted/client/AbstractHttpClient.java +++ b/http-client/src/main/java/ai/vespa/hosted/client/AbstractHttpClient.java @@ -173,7 +173,7 @@ public abstract class AbstractHttpClient implements HttpClient { @Override public HttpClient.RequestBuilder body(byte[] json) { - return body(HttpEntities.create(json, ContentType.APPLICATION_JSON)); + return body(() -> HttpEntities.create(json, ContentType.APPLICATION_JSON)); } @Override diff --git a/http-client/src/main/java/ai/vespa/hosted/client/HttpClient.java b/http-client/src/main/java/ai/vespa/hosted/client/HttpClient.java index ea8328ed793..4da887f0cbb 100644 --- a/http-client/src/main/java/ai/vespa/hosted/client/HttpClient.java +++ b/http-client/src/main/java/ai/vespa/hosted/client/HttpClient.java @@ -78,12 +78,6 @@ public interface HttpClient extends Closeable { RequestBuilder body(byte[] json); /** Sets the request body. */ - default RequestBuilder body(HttpEntity entity) { - if (entity.isRepeatable()) return body(() -> entity); - throw new IllegalArgumentException("entitiy must be repeatable, or a supplier must be used"); - } - - /** Sets the request body. */ RequestBuilder body(Supplier<HttpEntity> entity); /** Sets query parameters without a value, like {@code ?debug&recursive}. */ diff --git a/logserver/bin/logserver-start.sh b/logserver/bin/logserver-start.sh index d37a2f31720..942120ceb21 100755 --- a/logserver/bin/logserver-start.sh +++ b/logserver/bin/logserver-start.sh @@ -81,7 +81,7 @@ cd ${VESPA_HOME} || { echo "Cannot cd to ${VESPA_HOME}" 1>&2; exit 1; } heap_min=32 heap_max=256 -addopts="-server -Xms${heap_min}m -Xmx${heap_max}m -XX:+PreserveFramePointer $(get_jvm_hugepage_settings $heap_max) -XX:CompressedClassSpaceSize=32m -XX:MaxDirectMemorySize=32m -XX:ThreadStackSize=448 -XX:MaxJavaStackTraceDepth=1000 -XX:ActiveProcessorCount=2 -XX:-OmitStackTraceInFastThrow -Djava.io.tmpdir=${VESPA_HOME}/tmp" +addopts="-server -Xms${heap_min}m -Xmx${heap_max}m -XX:+PreserveFramePointer $(get_jvm_hugepage_settings $heap_max) -XX:CompressedClassSpaceSize=32m -XX:MaxDirectMemorySize=32m -XX:ThreadStackSize=448 -XX:MaxJavaStackTraceDepth=1000 -XX:ActiveProcessorCount=2 -XX:-OmitStackTraceInFastThrow -Djava.io.tmpdir=${VESPA_HOME}/var/tmp" oomopt="-XX:+ExitOnOutOfMemoryError" diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java index 3a0cd412a2e..43b4df7415e 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java @@ -98,7 +98,7 @@ public class VespaServiceDumperImpl implements VespaServiceDumper { handleFailure(context, request, startedAt, "No artifacts requested"); return; } - ContainerPath directory = context.paths().underVespaHome("tmp/vespa-service-dump-" + request.getCreatedMillisOrNull()); + ContainerPath directory = context.paths().underVespaHome("var/tmp/vespa-service-dump-" + request.getCreatedMillisOrNull()); UnixPath unixPathDirectory = new UnixPath(directory); try { context.log(log, Level.INFO, diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImplTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImplTest.java index 081f0038e06..5366156cfbe 100644 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImplTest.java +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImplTest.java @@ -44,7 +44,7 @@ class VespaServiceDumperImplTest { private static final String HOSTNAME = "host-1.domain.tld"; private final FileSystem fileSystem = TestFileSystem.create(); - private final Path tmpDirectory = fileSystem.getPath("/data/vespa/storage/host-1/opt/vespa/tmp"); + private final Path tmpDirectory = fileSystem.getPath("/data/vespa/storage/host-1/opt/vespa/var/tmp"); @BeforeEach void create_tmp_directory() throws IOException { @@ -84,11 +84,11 @@ class VespaServiceDumperImplTest { verify(operations).executeCommandInContainer( context, context.users().vespa(), "/opt/vespa/libexec/vespa/find-pid", "default/container.1"); verify(operations).executeCommandInContainer( - context, context.users().vespa(), "perf", "record", "-g", "--output=/opt/vespa/tmp/vespa-service-dump-1600000000000/perf-record.bin", + context, context.users().vespa(), "perf", "record", "-g", "--output=/opt/vespa/var/tmp/vespa-service-dump-1600000000000/perf-record.bin", "--pid=12345", "sleep", "45"); verify(operations).executeCommandInContainer( - context, context.users().vespa(), "bash", "-c", "perf report --input=/opt/vespa/tmp/vespa-service-dump-1600000000000/perf-record.bin" + - " > /opt/vespa/tmp/vespa-service-dump-1600000000000/perf-report.txt"); + context, context.users().vespa(), "bash", "-c", "perf report --input=/opt/vespa/var/tmp/vespa-service-dump-1600000000000/perf-record.bin" + + " > /opt/vespa/var/tmp/vespa-service-dump-1600000000000/perf-report.txt"); String expectedJson = "{\"createdMillis\":1600000000000,\"startedAt\":1600001000000,\"completedAt\":1600001000000," + "\"location\":\"s3://uri-1/tenant1/service-dump/default-container-1-1600000000000/\"," + @@ -127,7 +127,7 @@ class VespaServiceDumperImplTest { context, context.users().vespa(), "/opt/vespa/libexec/vespa/find-pid", "default/container.1"); verify(operations).executeCommandInContainer( context, context.users().vespa(), "jcmd", "12345", "JFR.start", "name=host-admin", "path-to-gc-roots=true", "settings=profile", - "filename=/opt/vespa/tmp/vespa-service-dump-1600000000000/recording.jfr", "duration=30s"); + "filename=/opt/vespa/var/tmp/vespa-service-dump-1600000000000/recording.jfr", "duration=30s"); verify(operations).executeCommandInContainer(context, context.users().vespa(), "jcmd", "12345", "JFR.check", "name=host-admin"); String expectedJson = "{\"createdMillis\":1600000000000,\"startedAt\":1600001000000," + diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/Node.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/Node.java index 768036fd284..de1f9e65415 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/Node.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/Node.java @@ -118,7 +118,6 @@ public final class Node implements Nodelike { if (!ipConfig.pool().ipSet().isEmpty()) throw new IllegalArgumentException("A child node cannot have an IP address pool"); if (modelName.isPresent()) throw new IllegalArgumentException("A child node cannot have model name set"); if (switchHostname.isPresent()) throw new IllegalArgumentException("A child node cannot have switch hostname set"); - if (!cloudAccount.isEmpty()) throw new IllegalArgumentException("A child node cannot have cloud account set"); } if (type != NodeType.host && reservedTo.isPresent()) diff --git a/parent/pom.xml b/parent/pom.xml index 549d66e37a0..66fffa82b6e 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -495,61 +495,6 @@ <!-- No version property, as we don't want maven-dependency-plugin to alert about newer versions. --> <version>3.1.9</version> </dependency> - <dependency> <!-- Control netty-all version --> - <groupId>io.netty</groupId> - <artifactId>netty-all</artifactId> - <version>${netty.version}</version> - </dependency> - <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-buffer</artifactId> - <version>${netty.version}</version> - </dependency> - <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-common</artifactId> - <version>${netty.version}</version> - </dependency> - <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-codec</artifactId> - <version>${netty.version}</version> - </dependency> - <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-codec-http2</artifactId> - <version>${netty.version}</version> - </dependency> - <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-codec-http</artifactId> - <version>${netty.version}</version> - </dependency> - <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-transport</artifactId> - <version>${netty.version}</version> - </dependency> - <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-transport-classes-epoll</artifactId> - <version>${netty.version}</version> - </dependency> - <dependency> <!-- Control netty-handler version --> - <groupId>io.netty</groupId> - <artifactId>netty-handler</artifactId> - <version>${netty.version}</version> - </dependency> - <dependency> <!-- Control netty-transport-native-epoll version --> - <groupId>io.netty</groupId> - <artifactId>netty-transport-native-epoll</artifactId> - <version>${netty.version}</version> - </dependency> - <dependency> <!-- Control netty-handler version --> - <groupId>io.netty</groupId> - <artifactId>netty-tcnative</artifactId> - <version>${netty-tcnative.version}</version> - </dependency> <dependency> <groupId>com.github.tomakehurst</groupId> <artifactId>wiremock-jre8-standalone</artifactId> @@ -1075,104 +1020,6 @@ <artifactId>json</artifactId> <version>${org.json.version}</version> </dependency> - <dependency> <!-- Due to hadoop-common pulling in 1.7.7 --> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - <version>${avro.version}</version> - </dependency> - <dependency> <!-- Due to hadoop-common pulling in 9.8.1 --> - <groupId>com.nimbusds</groupId> - <artifactId>nimbus-jose-jwt</artifactId> - <version>${nimbus.version}</version> - </dependency> - <dependency> <!-- Due to hadoop-common pulling in older version --> - <groupId>net.minidev</groupId> - <artifactId>json-smart</artifactId> - <version>${json-smart.version}</version> - </dependency> - <dependency> - <!-- Force fresh woodstox-core without security issue hadoop-3.3.4 --> - <groupId>com.fasterxml.woodstox</groupId> - <artifactId>woodstox-core</artifactId> - <version>${woodstox.version}</version> - </dependency> - <dependency> - <!-- Force fresh jersey-json without security issue hadoop-3.3.4 --> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-json</artifactId> - <version>${sun-jersey-json.version}</version> - </dependency> - <dependency> - <!-- Force fresh jettison without security issue hadoop-3.3.4 --> - <groupId>org.codehaus.jettison</groupId> - <artifactId>jettison</artifactId> - <version>${jettison.version}</version> - </dependency> - <dependency> - <!-- Transitive dependencies from pig-0.16 up-to-date --> - <groupId>tomcat</groupId> - <artifactId>jasper-runtime</artifactId> - <version>${tomcat-jasper.version}</version> - </dependency> - <dependency> - <!-- Transitive dependencies from pig-0.16 up-to-date --> - <groupId>tomcat</groupId> - <artifactId>jasper-compiler</artifactId> - <version>${tomcat-jasper.version}</version> - </dependency> - <!-- Hadoop dependencies --> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - <exclusions> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <version>${hadoop.version}</version> - <exclusions> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.pig</groupId> - <artifactId>pig</artifactId> - <version>${pig.version}</version> - <classifier>h2</classifier> - </dependency> - <dependency> - <!-- Hadoop test dependency --> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-minicluster</artifactId> - <version>${hadoop.version}</version> - <exclusions> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - </exclusions> - </dependency> </dependencies> </dependencyManagement> @@ -1188,7 +1035,6 @@ <!-- Athenz dependencies. Make sure these dependencies match those in Vespa's internal repositories --> <athenz.version>1.10.54</athenz.version> <!-- WARNING: sync cloud-tenant-base-dependencies-enforcer/pom.xml --> - <avro.version>1.11.1</avro.version> <aws-sdk.version>1.12.331</aws-sdk.version> <!-- Athenz END --> @@ -1207,15 +1053,11 @@ <felix.version>7.0.1</felix.version> <felix.log.version>1.0.1</felix.log.version> <findbugs.version>3.0.2</findbugs.version> <!-- Should be kept in sync with guava --> - <groovy.version>3.0.13</groovy.version> - <hadoop.version>3.3.4</hadoop.version> <hdrhistogram.version>2.1.12</hdrhistogram.version> - <jettison.version>1.5.1</jettison.version> <jetty.version>9.4.49.v20220914</jetty.version> <jetty-alpn.version>1.1.3.v20160715</jetty-alpn.version> <jjwt.version>0.11.2</jjwt.version> <jna.version>5.11.0</jna.version> - <json-smart.version>2.4.8</json-smart.version> <junit.version>5.8.1</junit.version> <maven-archiver.version>3.5.2</maven-archiver.version> <maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version> @@ -1236,21 +1078,14 @@ <maven-site-plugin.version>3.9.1</maven-site-plugin.version> <maven-source-plugin.version>3.2.1</maven-source-plugin.version> <mockito.version>4.0.0</mockito.version> - <netty.version>4.1.84.Final</netty.version> - <netty-tcnative.version>2.0.54.Final</netty-tcnative.version> - <nimbus.version>9.25.6</nimbus.version> <onnxruntime.version>1.12.1</onnxruntime.version> <!-- WARNING: sync cloud-tenant-base-dependencies-enforcer/pom.xml --> <org.json.version>20220320</org.json.version> <org.lz4.version>1.8.0</org.lz4.version> - <pig.version>0.16.0</pig.version> <prometheus.client.version>0.6.0</prometheus.client.version> <protobuf.version>3.21.7</protobuf.version> <spifly.version>1.3.5</spifly.version> - <sun-jersey-json.version>1.19.4</sun-jersey-json.version> <surefire.version>2.22.2</surefire.version> - <tomcat-jasper.version>5.5.23</tomcat-jasper.version> - <wiremock.version>2.34.0</wiremock.version> - <woodstox.version>6.4.0</woodstox.version> + <wiremock.version>2.35.0</wiremock.version> <zookeeper.client.version>3.8.0</zookeeper.client.version> <doclint>all</doclint> @@ -131,7 +131,6 @@ <module>vespa-feed-client</module> <module>vespa-feed-client-api</module> <module>vespa-feed-client-cli</module> - <module>vespa-hadoop</module> <module>vespa-maven-plugin</module> <module>vespa-osgi-testrunner</module> <module>vespa-testrunner-components</module> diff --git a/standalone-container/src/main/java/com/yahoo/container/standalone/StandaloneContainerApplication.java b/standalone-container/src/main/java/com/yahoo/container/standalone/StandaloneContainerApplication.java index a677c69cb79..f80cdbed900 100644 --- a/standalone-container/src/main/java/com/yahoo/container/standalone/StandaloneContainerApplication.java +++ b/standalone-container/src/main/java/com/yahoo/container/standalone/StandaloneContainerApplication.java @@ -67,7 +67,7 @@ public class StandaloneContainerApplication implements Application { public static final Named APPLICATION_PATH_NAME = Names.named(APPLICATION_LOCATION_INSTALL_VARIABLE); public static final Named CONFIG_MODEL_REPO_NAME = Names.named("ConfigModelRepo"); - private static final String DEFAULT_TMP_BASE_DIR = Defaults.getDefaults().underVespaHome("tmp"); + private static final String DEFAULT_TMP_BASE_DIR = Defaults.getDefaults().underVespaHome("var/tmp"); private static final String TMP_DIR_NAME = "standalone_container"; private static final StaticConfigDefinitionRepo configDefinitionRepo = new StaticConfigDefinitionRepo(); diff --git a/vespa-hadoop/OWNERS b/vespa-hadoop/OWNERS deleted file mode 100644 index 6b09ce48bd4..00000000000 --- a/vespa-hadoop/OWNERS +++ /dev/null @@ -1 +0,0 @@ -lesters diff --git a/vespa-hadoop/README b/vespa-hadoop/README deleted file mode 100644 index 1b567b88c1d..00000000000 --- a/vespa-hadoop/README +++ /dev/null @@ -1,4 +0,0 @@ -The Vespa Hadoop client. - -Contains APIs for feeding and querying Vespa from the grid. - diff --git a/vespa-hadoop/abi-spec.json b/vespa-hadoop/abi-spec.json deleted file mode 100644 index 9e26dfeeb6e..00000000000 --- a/vespa-hadoop/abi-spec.json +++ /dev/null @@ -1 +0,0 @@ -{}
\ No newline at end of file diff --git a/vespa-hadoop/pom.xml b/vespa-hadoop/pom.xml deleted file mode 100644 index 43f7c17967d..00000000000 --- a/vespa-hadoop/pom.xml +++ /dev/null @@ -1,166 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>com.yahoo.vespa</groupId> - <artifactId>parent</artifactId> - <version>8-SNAPSHOT</version> - <relativePath>../parent/pom.xml</relativePath> - </parent> - <artifactId>vespa-hadoop</artifactId> - <version>8-SNAPSHOT</version> - <name>${project.artifactId}</name> - <description>Integration tools between Vespa and Hadoop</description> - - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - </properties> - - <dependencies> - <!-- Hadoop dependencies --> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.pig</groupId> - <artifactId>pig</artifactId> - <classifier>h2</classifier> - <scope>provided</scope> - </dependency> - - <!-- These are inherited from parent. Needed for correct versions on Hadoop. --> - <dependency> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpclient</artifactId> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpcore</artifactId> - <scope>compile</scope> - </dependency> - - <!-- Test dependencies --> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-minicluster</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <!-- This is a HACK due to hadoop relying on mockito in NameNodeAdapter, but not providing it. Brum, brum !! --> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.junit.jupiter</groupId> - <artifactId>junit-jupiter</artifactId> - <scope>test</scope> - </dependency> - <!-- Vespa feeding dependencies --> - <dependency> - <groupId>com.yahoo.vespa</groupId> - <artifactId>vespa-feed-client</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>com.yahoo.vespa</groupId> - <artifactId>vespa-feed-client-api</artifactId> - <version>${project.version}</version> - </dependency> - - <!-- Jackson dependencies used in this module --> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </dependency> - - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <minimizeJar>false</minimizeJar> - - <relocations> - <relocation> - <pattern>com.google</pattern> - <shadedPattern>shaded.vespa</shadedPattern> - </relocation> - <relocation> - <pattern>org.bouncycastle</pattern> - <shadedPattern>shaded.vespa.bouncycastle</shadedPattern> - </relocation> - <relocation> - <pattern>commons-codec</pattern> - <shadedPattern>shaded.vespa</shadedPattern> - </relocation> - <relocation> - <pattern>commons-logging</pattern> - <shadedPattern>shaded.vespa</shadedPattern> - </relocation> - <relocation> - <pattern>org.apache</pattern> - <shadedPattern>shaded.vespa</shadedPattern> - <excludes> - <exclude>org.apache.hadoop.**</exclude> - <exclude>org.apache.pig.**</exclude> - </excludes> - </relocation> - <relocation> - <pattern>com.fasterxml</pattern> - <shadedPattern>shaded.vespa</shadedPattern> - </relocation> - <relocation> - <pattern>org.codehaus</pattern> - <shadedPattern>shaded.vespa</shadedPattern> - </relocation> - <relocation> - <pattern>io.airlift</pattern> - <shadedPattern>shaded.vespa</shadedPattern> - </relocation> - <relocation> - <pattern>com.ctc.wstx</pattern> - <shadedPattern>shaded.vespa</shadedPattern> - </relocation> - </relocations> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <configuration> - <release>${vespaClients.jdk.releaseVersion}</release> - </configuration> - </plugin> - </plugins> - - </build> -</project> diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputCommitter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputCommitter.java deleted file mode 100644 index 6cb4ef45a96..00000000000 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputCommitter.java +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hadoop.mapreduce; - -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import java.io.IOException; - -/** - * The output committer describes the commit task output for a Map-Reduce - * job. Not currently used, but is part of the Hadoop protocol since 2.7. - * - * @author lesters - */ -public class VespaOutputCommitter extends OutputCommitter { - @Override - public void setupJob(JobContext jobContext) throws IOException { - } - - @Override - public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException { - } - - @Override - public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException { - return false; - } - - @Override - public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException { - } - - @Override - public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException { - } -} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java deleted file mode 100644 index e49a5e17970..00000000000 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hadoop.mapreduce; - -import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import java.io.IOException; -import java.util.Properties; -import java.util.logging.Logger; - -/** - * An output specification for writing to Vespa instances in a Map-Reduce job. - * Mainly returns an instance of a {@link VespaRecordWriter} that does the - * actual feeding to Vespa. - * - * @author lesters - */ -@SuppressWarnings("rawtypes") -public class VespaOutputFormat extends OutputFormat { - - private static final Logger log = Logger.getLogger(VespaOutputFormat.class.getName()); - - final Properties configOverride; - - public VespaOutputFormat() { - super(); - this.configOverride = null; - } - - public VespaOutputFormat(Properties configOverride) { - super(); - this.configOverride = configOverride; - } - - - @Override - @SuppressWarnings("deprecation") - public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException { - VespaCounters counters = VespaCounters.get(context); - VespaConfiguration configuration = VespaConfiguration.get(context.getConfiguration(), configOverride); - return new VespaRecordWriter(configuration, counters); - } - - - @Override - public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - return new VespaOutputCommitter(); - } - - - @Override - public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException { - } - -} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java deleted file mode 100644 index c450d7cdeef..00000000000 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java +++ /dev/null @@ -1,153 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hadoop.mapreduce; - -import ai.vespa.feed.client.FeedClient; -import ai.vespa.feed.client.FeedClientBuilder; -import ai.vespa.feed.client.JsonFeeder; -import ai.vespa.feed.client.OperationParseException; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import java.io.IOException; -import java.net.URI; -import java.time.Duration; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.ThreadLocalRandom; -import java.util.logging.Level; -import java.util.logging.Logger; - -import static java.util.stream.Collectors.toList; - -/** - * {@link VespaRecordWriter} sends the output <key, value> to one or more Vespa endpoints using vespa-feed-client. - * - * @author bjorncs - */ -public class VespaRecordWriter extends RecordWriter<Object, Object> { - - private final static Logger log = Logger.getLogger(VespaRecordWriter.class.getCanonicalName()); - - private final VespaCounters counters; - private final VespaConfiguration config; - - private boolean initialized = false; - private JsonFeeder feeder; - - protected VespaRecordWriter(VespaConfiguration config, VespaCounters counters) { - this.counters = counters; - this.config = config; - } - - @Override - public void write(Object key, Object data) throws IOException { - initializeOnFirstWrite(); - String json = data.toString().trim(); - feeder.feedSingle(json) - .whenComplete((result, error) -> { - if (error != null) { - if (error instanceof OperationParseException) { - counters.incrementDocumentsSkipped(1); - } else { - String msg = "Failed to feed single document: " + error; - log.log(Level.WARNING, msg, error); - counters.incrementDocumentsFailed(1); - } - } else { - counters.incrementDocumentsOk(1); - } - }); - counters.incrementDocumentsSent(1); - if (counters.getDocumentsSent() % config.progressInterval() == 0) { - String progress = String.format("Feed progress: %d / %d / %d / %d (sent, ok, failed, skipped)", - counters.getDocumentsSent(), - counters.getDocumentsOk(), - counters.getDocumentsFailed(), - counters.getDocumentsSkipped()); - log.info(progress); - } - } - - @Override - public void close(TaskAttemptContext context) throws IOException { - if (feeder != null) { - feeder.close(); - feeder = null; - initialized = false; - } - } - - /** Override method to alter {@link FeedClient} configuration */ - protected void onFeedClientInitialization(FeedClientBuilder builder) {} - - private void initializeOnFirstWrite() { - if (initialized) return; - useRandomizedStartupDelayIfEnabled(); - feeder = createJsonStreamFeeder(); - initialized = true; - } - - private void useRandomizedStartupDelayIfEnabled() { - if (!config.dryrun() && config.randomStartupSleepMs() > 0) { - int delay = ThreadLocalRandom.current().nextInt(config.randomStartupSleepMs()); - log.info("Delaying startup by " + delay + " ms"); - try { - Thread.sleep(delay); - } catch (Exception e) {} - } - } - - - private JsonFeeder createJsonStreamFeeder() { - FeedClient feedClient = createFeedClient(); - JsonFeeder.Builder builder = JsonFeeder.builder(feedClient) - .withTimeout(Duration.ofMinutes(10)); - if (config.route() != null) { - builder.withRoute(config.route()); - } - return builder.build(); - - } - - private FeedClient createFeedClient() { - List<URI> endpoints = endpointUris(config); - log.info("Using endpoints " + endpoints); - int streamsPerConnection = streamsPerConnection(config); - log.log(Level.INFO, "Using {0} max streams per connection", new Object[] {streamsPerConnection}); - log.log(Level.INFO, "Using {0} connections", new Object[] {config.numConnections()}); - FeedClientBuilder feedClientBuilder = FeedClientBuilder.create(endpoints) - .setConnectionsPerEndpoint(config.numConnections()) - .setMaxStreamPerConnection(streamsPerConnection) - .setDryrun(config.dryrun()) - .setRetryStrategy(retryStrategy(config)); - if (config.proxyHost() != null) { - URI proxyUri = URI.create(String.format( - "%s://%s:%d", config.proxyScheme(), config.proxyHost(), config.proxyPort())); - log.info("Using proxy " + proxyUri); - feedClientBuilder.setProxy(proxyUri); - } - - onFeedClientInitialization(feedClientBuilder); - return feedClientBuilder.build(); - } - - private static FeedClient.RetryStrategy retryStrategy(VespaConfiguration config) { - int maxRetries = config.numRetries(); - return new FeedClient.RetryStrategy() { - @Override public int retries() { return maxRetries; } - }; - } - - private static int streamsPerConnection(VespaConfiguration config) { - return Math.min(256, config.maxInFlightRequests() / config.numConnections()); - } - - private static List<URI> endpointUris(VespaConfiguration config) { - String scheme = config.useSSL().orElse(true) ? "https" : "http"; - return Arrays.stream(config.endpoint().split(",")) - .map(hostname -> URI.create(String.format("%s://%s:%d/", scheme, hostname, config.defaultPort()))) - .collect(toList()); - } -} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaSimpleJsonInputFormat.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaSimpleJsonInputFormat.java deleted file mode 100644 index f9bcba96a69..00000000000 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaSimpleJsonInputFormat.java +++ /dev/null @@ -1,99 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hadoop.mapreduce; - -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonFactoryBuilder; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; - -import java.io.BufferedInputStream; -import java.io.IOException; - -/** - * Simple JSON reader which splits the input file along JSON object boundaries. - * - * There are two cases handled here: - * 1. Each line contains a JSON object, i.e. { ... } - * 2. The file contains an array of objects with arbitrary line breaks, i.e. [ {...}, {...} ] - * - * Not suitable for cases where you want to extract objects from some other arbitrary structure. - * - * TODO: Support config which points to a array in the JSON as start point for object extraction, - * ala how it is done in VespaHttpClient.parseResultJson, i.e. support rootNode config. - * - * @author lesters - */ -public class VespaSimpleJsonInputFormat extends FileInputFormat<Text, NullWritable> { - - @Override - public RecordReader<Text, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { - return new VespaJsonRecordReader(); - } - - public static class VespaJsonRecordReader extends RecordReader<Text, NullWritable> { - private long remaining; - private JsonParser parser; - private Text currentKey; - private NullWritable currentValue = NullWritable.get(); - - @Override - public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { - FileSplit fileSplit = (FileSplit) split; - FSDataInputStream stream = FileSystem.get(context.getConfiguration()).open(fileSplit.getPath()); - if (fileSplit.getStart() != 0) { - stream.seek(fileSplit.getStart()); - } - - remaining = fileSplit.getLength(); - JsonFactory factory = new JsonFactoryBuilder().disable(JsonFactory.Feature.CANONICALIZE_FIELD_NAMES).build(); - parser = factory.createParser(new BufferedInputStream(stream)); - parser.setCodec(new ObjectMapper()); - parser.nextToken(); - if (parser.currentToken() == JsonToken.START_ARRAY) { - parser.nextToken(); - } - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - if (parser.currentToken() != JsonToken.START_OBJECT) { - return true; - } - currentKey = new Text(parser.readValueAsTree().toString()); - parser.nextToken(); - return false; - } - - @Override - public Text getCurrentKey() throws IOException, InterruptedException { - return currentKey; - } - - @Override - public NullWritable getCurrentValue() throws IOException, InterruptedException { - return currentValue; - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return parser.getCurrentLocation().getByteOffset() / remaining; - } - - @Override - public void close() throws IOException { - parser.close(); - } - } - -} - diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/package-info.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/package-info.java deleted file mode 100644 index 22a742566cd..00000000000 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/package-info.java +++ /dev/null @@ -1,10 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * com.yahoo.vespa.hadoop.mapreduce contains classes and utilities - * to enable feeding directly to Vespa endpoints from mapreduce. - * It is a minimal layer over the Vespa HTTP client. - * - * NOTE: This is a PUBLIC API, but not annotated as such because this is not a bundle and - * we don't want to introduce Vespa dependencies. - */ -package com.yahoo.vespa.hadoop.mapreduce; diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java deleted file mode 100644 index 5147dc3496c..00000000000 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hadoop.mapreduce.util; - -import org.apache.pig.ResourceSchema; -import org.apache.pig.data.Tuple; -import org.apache.pig.impl.logicalLayer.schema.Schema; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -public class TupleTools { - - private static final Pattern pattern = Pattern.compile("<([\\w]+)>"); - - public static Map<String, Object> tupleMap(Schema schema, Tuple tuple) throws IOException { - Map<String, Object> tupleMap = new HashMap<>((int)Math.ceil(tuple.size() / 0.75) + 1); - List<Schema.FieldSchema> schemas = schema.getFields(); - for (int i = 0; i < schemas.size(); i++) { - Schema.FieldSchema field = schemas.get(i); - String alias = field.alias; - Object value = tuple.get(i); - if (value != null) { - tupleMap.put(alias, value); - } - } - return tupleMap; - } - - public static Map<String, Object> tupleMap(ResourceSchema schema, Tuple tuple) throws IOException { - Map<String, Object> tupleMap = new HashMap<>((int)Math.ceil(tuple.size() / 0.75) + 1); - ResourceSchema.ResourceFieldSchema[] schemas = schema.getFields(); - for (int i = 0; i < schemas.length; i++) { - ResourceSchema.ResourceFieldSchema field = schemas[i]; - String alias = field.getName(); - Object value = tuple.get(i); - if (value != null) { - tupleMap.put(alias, value); - } - } - return tupleMap; - } - - public static String toString(Schema schema, Tuple tuple, String template) throws IOException { - return toString(tupleMap(schema, tuple), template); - } - - public static String toString(Map<String,Object> fields, String template) { - if (template == null || template.length() == 0) { - return template; - } - if (fields == null || fields.size() == 0) { - return template; - } - - Matcher m = pattern.matcher(template); - StringBuffer sb = new StringBuffer(); - while (m.find()) { - Object value = fields.get(m.group(1)); - String replacement = value != null ? value.toString() : m.group(0); - m.appendReplacement(sb, Matcher.quoteReplacement(replacement)); - } - m.appendTail(sb); - return sb.toString(); - } - -} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java deleted file mode 100644 index ae0b6a58155..00000000000 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java +++ /dev/null @@ -1,194 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hadoop.mapreduce.util; - -import org.apache.hadoop.conf.Configuration; - -import java.io.IOException; -import java.io.StringReader; -import java.util.Optional; -import java.util.Properties; - -public class VespaConfiguration { - - public static final String ENDPOINT = "vespa.feed.endpoint"; - public static final String DEFAULT_PORT = "vespa.feed.defaultport"; - public static final String USE_SSL = "vespa.feed.ssl"; - public static final String PROXY_HOST = "vespa.feed.proxy.host"; - public static final String PROXY_PORT = "vespa.feed.proxy.port"; - public static final String PROXY_SCHEME = "vespa.feed.proxy.scheme"; - public static final String DRYRUN = "vespa.feed.dryrun"; - public static final String USE_COMPRESSION = "vespa.feed.usecompression"; - public static final String PROGRESS_REPORT = "vespa.feed.progress.interval"; - public static final String CONNECTIONS = "vespa.feed.connections"; - public static final String THROTTLER_MIN_SIZE = "vespa.feed.throttler.min.size"; - public static final String QUERY_CONNECTION_TIMEOUT = "vespa.query.connection.timeout"; - public static final String ROUTE = "vespa.feed.route"; - public static final String MAX_SLEEP_TIME_MS = "vespa.feed.max.sleep.time.ms"; - public static final String MAX_IN_FLIGHT_REQUESTS = "vespa.feed.max.in.flight.requests"; - public static final String RANDOM_STARTUP_SLEEP = "vespa.feed.random.startup.sleep.ms"; - public static final String NUM_RETRIES = "vespa.feed.num.retries"; - - private final Configuration conf; - private final Properties override; - - private VespaConfiguration(Configuration conf, Properties override) { - this.conf = conf; - this.override = override; - } - - - public static VespaConfiguration get(Configuration conf, Properties override) { - return new VespaConfiguration(conf, override); - } - - - public String endpoint() { - return getString(ENDPOINT); - } - - - public int defaultPort() { - return getInt(DEFAULT_PORT, 4080); - } - - - public Optional<Boolean> useSSL() { - String raw = getString(USE_SSL); - if (raw == null || raw.trim().isEmpty()) return Optional.empty(); - return Optional.of(Boolean.parseBoolean(raw)); - } - - - public String proxyHost() { - return getString(PROXY_HOST); - } - - - public int proxyPort() { - return getInt(PROXY_PORT, 4080); - } - - - public String proxyScheme() { - String raw = getString(PROXY_SCHEME); - if (raw == null) return "http"; - return raw; - } - - - public boolean dryrun() { - return getBoolean(DRYRUN, false); - } - - - public boolean useCompression() { - return getBoolean(USE_COMPRESSION, true); - } - - - public int numConnections() { - return getInt(CONNECTIONS, 1); - } - - - public int throttlerMinSize() { - return getInt(THROTTLER_MIN_SIZE, 0); - } - - - public int queryConnectionTimeout() { - return getInt(QUERY_CONNECTION_TIMEOUT, 10000); - } - - - public String route() { - return getString(ROUTE); - } - - - public int maxSleepTimeMs() { - return getInt(MAX_SLEEP_TIME_MS, 10000); - } - - - public int maxInFlightRequests() { - return getInt(MAX_IN_FLIGHT_REQUESTS, 500); - } - - - public int randomStartupSleepMs() { - return getInt(RANDOM_STARTUP_SLEEP, 30000); - } - - - public int numRetries() { - return getInt(NUM_RETRIES, 100); - } - - - public int progressInterval() { - return getInt(PROGRESS_REPORT, 1000); - } - - public String getString(String name) { - if (override != null && override.containsKey(name)) { - return override.getProperty(name); - } - return conf != null ? conf.get(name) : null; - } - - - public int getInt(String name, int defaultValue) { - if (override != null && override.containsKey(name)) { - return Integer.parseInt(override.getProperty(name)); - } - return conf != null ? conf.getInt(name, defaultValue) : defaultValue; - } - - - public boolean getBoolean(String name, boolean defaultValue) { - if (override != null && override.containsKey(name)) { - return Boolean.parseBoolean(override.getProperty(name)); - } - return conf != null ? conf.getBoolean(name, defaultValue) : defaultValue; - - } - - public static Properties loadProperties(String... params) { - Properties properties = new Properties(); - if (params != null) { - for (String s : params) { - try { - properties.load(new StringReader(s)); - } catch (IOException e) { - throw new IllegalArgumentException(e); - } - } - } - return properties; - } - - - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(ENDPOINT + ": " + endpoint() + "\n"); - sb.append(DEFAULT_PORT + ": " + defaultPort() + "\n"); - sb.append(USE_SSL + ": " + useSSL().map(Object::toString).orElse("<empty>") + "\n"); - sb.append(PROXY_HOST + ": " + proxyHost() + "\n"); - sb.append(PROXY_PORT + ": " + proxyPort() + "\n"); - sb.append(PROXY_SCHEME + ": " + proxyScheme() + "\n"); - sb.append(DRYRUN + ": " + dryrun() +"\n"); - sb.append(USE_COMPRESSION + ": " + useCompression() +"\n"); - sb.append(PROGRESS_REPORT + ": " + progressInterval() +"\n"); - sb.append(CONNECTIONS + ": " + numConnections() +"\n"); - sb.append(THROTTLER_MIN_SIZE + ": " + throttlerMinSize() +"\n"); - sb.append(QUERY_CONNECTION_TIMEOUT + ": " + queryConnectionTimeout() +"\n"); - sb.append(ROUTE + ": " + route() +"\n"); - sb.append(MAX_SLEEP_TIME_MS + ": " + maxSleepTimeMs() +"\n"); - sb.append(MAX_IN_FLIGHT_REQUESTS + ": " + maxInFlightRequests() +"\n"); - sb.append(RANDOM_STARTUP_SLEEP + ": " + randomStartupSleepMs() +"\n"); - sb.append(NUM_RETRIES + ": " + numRetries() +"\n"); - return sb.toString(); - } - -} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaCounters.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaCounters.java deleted file mode 100644 index 63b4b6600fd..00000000000 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaCounters.java +++ /dev/null @@ -1,105 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hadoop.mapreduce.util; - -import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.Counters; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import java.io.IOException; - -public class VespaCounters { - - public static final String GROUP = "Vespa Feed Counters"; - public static final String DOCS_OK = "Documents ok"; - public static final String DOCS_SENT = "Documents sent"; - public static final String DOCS_FAILED = "Documents failed"; - public static final String DOCS_SKIPPED = "Documents skipped"; - - private final Counter documentsSent; - private final Counter documentsOk; - private final Counter documentsFailed; - private final Counter documentsSkipped; - - - private VespaCounters(Job job) throws IOException { - Counters counters = job.getCounters(); - documentsSent = counters.findCounter(GROUP, DOCS_SENT); - documentsOk = counters.findCounter(GROUP, DOCS_OK); - documentsFailed = counters.findCounter(GROUP, DOCS_FAILED); - documentsSkipped = counters.findCounter(GROUP, DOCS_SKIPPED); - } - - - private VespaCounters(TaskAttemptContext context) { - documentsSent = context.getCounter(GROUP, DOCS_SENT); - documentsOk = context.getCounter(GROUP, DOCS_OK); - documentsFailed = context.getCounter(GROUP, DOCS_FAILED); - documentsSkipped = context.getCounter(GROUP, DOCS_SKIPPED); - } - - - private VespaCounters(org.apache.hadoop.mapred.Counters counters) { - documentsSent = counters.findCounter(GROUP, DOCS_SENT); - documentsOk = counters.findCounter(GROUP, DOCS_OK); - documentsFailed = counters.findCounter(GROUP, DOCS_FAILED); - documentsSkipped = counters.findCounter(GROUP, DOCS_SKIPPED); - } - - - public static VespaCounters get(Job job) throws IOException { - return new VespaCounters(job); - } - - - public static VespaCounters get(TaskAttemptContext context) { - return new VespaCounters(context); - } - - - public static VespaCounters get(org.apache.hadoop.mapred.Counters counters) { - return new VespaCounters(counters); - - } - - - public long getDocumentsSent() { - return documentsSent.getValue(); - } - - - public void incrementDocumentsSent(long incr) { - documentsSent.increment(incr); - } - - - public long getDocumentsOk() { - return documentsOk.getValue(); - } - - - public void incrementDocumentsOk(long incr) { - documentsOk.increment(incr); - } - - - public long getDocumentsFailed() { - return documentsFailed.getValue(); - } - - - public void incrementDocumentsFailed(long incr) { - documentsFailed.increment(incr); - } - - - public long getDocumentsSkipped() { - return documentsSkipped.getValue(); - } - - - public void incrementDocumentsSkipped(long incr) { - documentsSkipped.increment(incr); - } - -} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java deleted file mode 100644 index c7ed52a01c0..00000000000 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java +++ /dev/null @@ -1,102 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hadoop.mapreduce.util; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.http.HttpEntity; -import org.apache.http.HttpHost; -import org.apache.http.HttpResponse; -import org.apache.http.client.HttpClient; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.HttpClientBuilder; -import org.apache.http.util.EntityUtils; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Scanner; - -public class VespaHttpClient { - - private final HttpClient httpClient; - - public VespaHttpClient() { - this(null); - } - - public VespaHttpClient(VespaConfiguration configuration) { - httpClient = createClient(configuration); - } - - public String get(String url) throws IOException { - HttpGet httpGet = new HttpGet(url); - HttpResponse httpResponse = httpClient.execute(httpGet); - - HttpEntity entity = httpResponse.getEntity(); - InputStream is = entity.getContent(); - - String result = ""; - Scanner scanner = new Scanner(is, "UTF-8").useDelimiter("\\A"); - if (scanner.hasNext()) { - result = scanner.next(); - } - EntityUtils.consume(entity); - - if (httpResponse.getStatusLine().getStatusCode() != 200) { - return null; - } - - return result; - } - - public JsonNode parseResultJson(String json, String rootNode) throws IOException { - if (json == null || json.isEmpty()) { - return null; - } - if (rootNode == null || rootNode.isEmpty()) { - return null; - } - - ObjectMapper m = new ObjectMapper(); - JsonNode node = m.readTree(json); - if (node != null) { - String[] path = rootNode.split("/"); - for (String p : path) { - node = node.get(p); - - if (node == null) { - return null; - } - - // if node is an array, return the first node that has the correct path - if (node.isArray()) { - for (int i = 0; i < node.size(); ++i) { - JsonNode n = node.get(i); - if (n.has(p)) { - node = n; - break; - } - } - } - - } - } - return node; - } - - private HttpClient createClient(VespaConfiguration configuration) { - HttpClientBuilder clientBuilder = HttpClientBuilder.create(); - - RequestConfig.Builder requestConfigBuilder = RequestConfig.custom(); - if (configuration != null) { - requestConfigBuilder.setSocketTimeout(configuration.queryConnectionTimeout()); - requestConfigBuilder.setConnectTimeout(configuration.queryConnectionTimeout()); - if (configuration.proxyHost() != null) { - requestConfigBuilder.setProxy(new HttpHost(configuration.proxyHost(), configuration.proxyPort())); - } - } - clientBuilder.setDefaultRequestConfig(requestConfigBuilder.build()); - return clientBuilder.build(); - } - -} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaQuerySchema.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaQuerySchema.java deleted file mode 100644 index cfaff44addb..00000000000 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaQuerySchema.java +++ /dev/null @@ -1,114 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hadoop.mapreduce.util; - -import com.fasterxml.jackson.databind.JsonNode; -import org.apache.pig.data.DataType; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; -import org.apache.pig.impl.logicalLayer.schema.Schema; -import org.apache.pig.impl.util.Utils; -import org.apache.pig.parser.ParserException; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -public class VespaQuerySchema implements Iterable<VespaQuerySchema.AliasTypePair> { - - private final List<AliasTypePair> tupleSchema = new ArrayList<>(); - - public VespaQuerySchema(String schema) { - for (String e : schema.split(",")) { - String[] pair = e.split(":"); - String alias = pair[0].trim(); - String type = pair[1].trim(); - tupleSchema.add(new AliasTypePair(alias, type)); - } - } - - public Tuple buildTuple(int rank, JsonNode hit) { - Tuple tuple = TupleFactory.getInstance().newTuple(); - - for (VespaQuerySchema.AliasTypePair tupleElement : tupleSchema) { - String alias = tupleElement.getAlias(); - Byte type = DataType.findTypeByName(tupleElement.getType()); - - // reserved word - if ("rank".equals(alias)) { - tuple.append(rank); - } else { - JsonNode field = hit; - String[] path = alias.split("/"); // move outside - for (String p : path) { - field = field.get(p); - if (field == null) { - type = DataType.NULL; // effectively skip field as it is not found - break; - } - } - switch (type) { - case DataType.BOOLEAN: - tuple.append(field.asBoolean()); - break; - case DataType.INTEGER: - tuple.append(field.asInt()); - break; - case DataType.LONG: - tuple.append(field.asLong()); - break; - case DataType.FLOAT: - case DataType.DOUBLE: - tuple.append(field.asDouble()); - break; - case DataType.DATETIME: - tuple.append(field.asText()); - break; - case DataType.CHARARRAY: - tuple.append(field.asText()); - break; - default: - // the rest of the data types are currently not supported - } - } - } - return tuple; - } - - public static Schema getPigSchema(String schemaString) { - Schema schema = null; - schemaString = schemaString.replace("/", "_"); - schemaString = "{(" + schemaString + ")}"; - try { - schema = Utils.getSchemaFromString(schemaString); - } catch (ParserException e) { - e.printStackTrace(); - } - return schema; - } - - @Override - public Iterator<AliasTypePair> iterator() { - return tupleSchema.iterator(); - } - - - public static class AliasTypePair { - private final String alias; - private final String type; - - AliasTypePair(String alias, String type) { - this.alias = alias; - this.type = type; - } - - public String getAlias() { - return alias; - } - - public String getType() { - return type; - } - - } - -} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/package-info.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/package-info.java deleted file mode 100644 index 41c621d2877..00000000000 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/package-info.java +++ /dev/null @@ -1,10 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * com.yahoo.vespa.hadoop contains classes and utilities - * to enable feeding directly to Vespa endpoints from pig and mapreduce. - * It is a minimal layer over the Vespa HTTP client. - * - * NOTE: This is a PUBLIC API, but not annotated as such because this is not a bundle and - * we don't want to introduce Vespa dependencies. - */ -package com.yahoo.vespa.hadoop; diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java deleted file mode 100644 index c95aa02215f..00000000000 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java +++ /dev/null @@ -1,669 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hadoop.pig; - -import com.fasterxml.jackson.core.JsonEncoding; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonGenerator; -import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; -import org.apache.pig.EvalFunc; -import org.apache.pig.PigWarning; -import org.apache.pig.data.DataBag; -import org.apache.pig.data.DataByteArray; -import org.apache.pig.data.DataType; -import org.apache.pig.data.Tuple; -import org.apache.pig.tools.pigstats.PigStatusReporter; -import org.apache.pig.impl.logicalLayer.schema.Schema; -import org.joda.time.DateTime; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.io.UncheckedIOException; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.util.*; - -/** - * A Pig UDF to convert simple Pig types into a valid Vespa JSON document format. - * - * @author lesters - */ -public class VespaDocumentOperation extends EvalFunc<String> { - - public enum Operation { - DOCUMENT, - PUT, - ID, - REMOVE, - UPDATE; - - @Override - public String toString() { - return super.toString().toLowerCase(); - } - - public static Operation fromString(String text) { - for (Operation op : Operation.values()) { - if (op.toString().equalsIgnoreCase(text)) { - return op; - } - } - throw new IllegalArgumentException("Unknown operation: " + text); - } - - public static boolean valid(String text) { - for (Operation op : Operation.values()) { - if (op.toString().equalsIgnoreCase(text)) { - return true; - } - } - return false; - } - - } - - private static final String PROPERTY_CREATE_IF_NON_EXISTENT = "create-if-non-existent"; - private static final String PROPERTY_ID_TEMPLATE = "docid"; - private static final String PROPERTY_OPERATION = "operation"; - private static final String PROPERTY_VERBOSE = "verbose"; - private static final String BAG_AS_MAP_FIELDS = "bag-as-map-fields"; - private static final String SIMPLE_ARRAY_FIELDS = "simple-array-fields"; - private static final String SIMPLE_OBJECT_FIELDS = "simple-object-fields"; - private static final String CREATE_TENSOR_FIELDS = "create-tensor-fields"; - private static final String REMOVE_TENSOR_FIELDS = "remove-tensor-fields"; - private static final String UPDATE_TENSOR_FIELDS = "update-tensor-fields"; - private static final String REMOVE_MAP_FIELDS = "remove-map-fields"; - private static final String UPDATE_MAP_FIELDS = "update-map-fields"; - private static final String EXCLUDE_FIELDS = "exclude-fields"; - private static final String TESTSET_CONDITION = "condition"; - private static final String PARTIAL_UPDATE_ASSIGN = "assign"; - private static final String PARTIAL_UPDATE_ADD = "add"; - private static final String PARTIAL_UPDATE_REMOVE = "remove"; - - private static Map<String, String> mapPartialOperationMap; - - static { - mapPartialOperationMap = new HashMap<>(); - mapPartialOperationMap.put(REMOVE_MAP_FIELDS, PARTIAL_UPDATE_REMOVE); - mapPartialOperationMap.put(UPDATE_MAP_FIELDS, PARTIAL_UPDATE_ASSIGN); - } - - private static Map<String, String> partialOperationMap; - - static { - partialOperationMap = new HashMap<>(); - partialOperationMap.put(REMOVE_TENSOR_FIELDS, PARTIAL_UPDATE_REMOVE); - partialOperationMap.put(UPDATE_TENSOR_FIELDS, PARTIAL_UPDATE_ADD); - partialOperationMap.put(REMOVE_MAP_FIELDS, PARTIAL_UPDATE_REMOVE); - partialOperationMap.put(UPDATE_MAP_FIELDS, PARTIAL_UPDATE_ASSIGN); - } - - private final boolean verbose; - private final String template; - private final Operation operation; - private final Properties properties; - private PigStatusReporter statusReporter; - - public VespaDocumentOperation(String... params) { - statusReporter = PigStatusReporter.getInstance(); - if (statusReporter != null) { - statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation ok", 0); - statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 0); - } - properties = VespaConfiguration.loadProperties(params); - template = properties.getProperty(PROPERTY_ID_TEMPLATE); - operation = Operation.fromString(properties.getProperty(PROPERTY_OPERATION, "put")); - verbose = Boolean.parseBoolean(properties.getProperty(PROPERTY_VERBOSE, "false")); - } - - @Override - public String exec(Tuple tuple) throws IOException { - if (tuple == null || tuple.size() == 0) { - if (statusReporter != null) { - statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 1); - } - return null; - } - if (template == null || template.length() == 0) { - if (statusReporter != null) { - statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 1); - } - warnLog("No valid document id template found. Skipping.", PigWarning.UDF_WARNING_1); - return null; - } - if (operation == null) { - warnLog("No valid operation found. Skipping.", PigWarning.UDF_WARNING_2); - return null; - } - - String json = null; - - try { - if (reporter != null) { - reporter.progress(); - } - - Schema inputSchema = getInputSchema(); - Map<String, Object> fields = TupleTools.tupleMap(inputSchema, tuple); - String docId = TupleTools.toString(fields, template); - if (verbose) { - System.out.println("Processing docId: "+ docId); - } - // create json - json = create(operation, docId, fields, properties, inputSchema); - if (json == null || json.length() == 0) { - warnLog("No valid document operation could be created.", PigWarning.UDF_WARNING_3); - return null; - } - - - } catch (Exception e) { - if (statusReporter != null) { - statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 1); - } - StringBuilder sb = new StringBuilder(); - sb.append("Caught exception processing input row: \n"); - sb.append(tuple.toString()); - sb.append("\nException: "); - sb.append(getStackTraceAsString(e)); - warnLog(sb.toString(), PigWarning.UDF_WARNING_4); - return null; - } - if (statusReporter != null) { - statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation ok", 1); - } - return json; - } - - - /** - * Create a JSON Vespa document operation given the supplied fields, - * operation and document id template. - * - * @param op Operation (put, remove, update) - * @param docId Document id - * @param fields Fields to put in document operation - * @return A valid JSON Vespa document operation - * @throws IOException ... - */ - public static String create(Operation op, String docId, Map<String, Object> fields, Properties properties, - Schema schema) throws IOException { - if (op == null) { - return null; - } - if (docId == null || docId.length() == 0) { - return null; - } - if (fields.isEmpty()) { - return null; - } - - // create json format - ByteArrayOutputStream out = new ByteArrayOutputStream(); - JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8); - g.writeStartObject(); - - g.writeStringField(op.toString(), docId); - - boolean createIfNonExistent = Boolean.parseBoolean(properties.getProperty(PROPERTY_CREATE_IF_NON_EXISTENT, "false")); - if (op == Operation.UPDATE && createIfNonExistent) { - writeField("create", true, DataType.BOOLEAN, g, properties, schema, op, 0); - } - String testSetConditionTemplate = properties.getProperty(TESTSET_CONDITION); - if (testSetConditionTemplate != null) { - String testSetCondition = TupleTools.toString(fields, testSetConditionTemplate); - writeField(TESTSET_CONDITION, testSetCondition, DataType.CHARARRAY, g, properties, schema, op, 0); - } - if (op != Operation.REMOVE) { - writeField("fields", fields, DataType.MAP, g, properties, schema, op, 0); - } - - g.writeEndObject(); - g.close(); - - return out.toString(); - } - - private static String getPartialOperation(Map<String, String> operationMap, String name, Properties properties) { - // This function checks if the property of the name falls into the map provided - // if yes, return the desired operation. if no, return null - // for example, input: - // operationMap map{"update-map-fields":"assign","remove-map-fields":"remove"} - // name date - // properties "update-map-fields":"date,month" - // output: assign - for (String label : operationMap.keySet()) { - if (properties.getProperty(label) != null) { - String[] p = properties.getProperty(label).split(","); - if (Arrays.asList(p).contains(name)) { - return operationMap.get(label); - } - } - } - return null; - } - - @SuppressWarnings("unchecked") - private static void writeField(String name, Object value, Byte type, JsonGenerator g, Properties properties, Schema schema, Operation op, int depth) throws IOException { - if (shouldWriteField(name, properties, depth)) { - String operation = getPartialOperation(mapPartialOperationMap, name, properties); - // check if the name has the property update-map-fields/remove-map-fields - // if yes, we need special treatments here as we need to loop through the tuple - // be aware the the operation here is not vespa operation such as "put" and "update" - // operation here are the field name we wish use to such as "assign" and "remove" - if (operation != null) { - writePartialUpdateAndRemoveMap(name, value, g, properties, schema, op, depth, operation); - } else { - g.writeFieldName(name); - if (shouldWritePartialUpdate(op, depth)) { - writePartialUpdate(value, type, g, name, properties, schema, op, depth); - } else { - writeValue(value, type, g, name, properties, schema, op, depth); - } - } - - } - } - - private static void writePartialUpdateAndRemoveMap(String name, Object value, JsonGenerator g, Properties properties, Schema schema, Operation op, int depth, String operation) throws IOException { - schema = (schema != null) ? schema.getField(0).schema : null; - // extract the key of map and keys in map for writing json when partial updating maps - Schema valueSchema = (schema != null) ? schema.getField(1).schema : null; - // data format { ( key; id, value: (abc,123,(123234,bbaa))) } - // the first element of each tuple in the bag will be the map to update - // the second element of each tuple in the bag will be the new value of the map - DataBag bag = (DataBag) value; - for (Tuple element : bag) { - if (element.size() != 2) { - continue; - } - String k = (String) element.get(0); - Object v = element.get(1); - Byte t = DataType.findType(v); - if (t == DataType.TUPLE) { - g.writeFieldName(name + "{" + k + "}"); - if (operation.equals(PARTIAL_UPDATE_REMOVE)) { - g.writeStartObject(); - g.writeFieldName(PARTIAL_UPDATE_REMOVE); - g.writeNumber(0); - g.writeEndObject(); - } else { - writePartialUpdate(v, t, g, name, properties, valueSchema, op, depth); - } - } - } - } - - @SuppressWarnings("unchecked") - private static void writeValue(Object value, Byte type, JsonGenerator g, String name, Properties properties, Schema schema, Operation op, int depth) throws IOException { - switch (type) { - case DataType.UNKNOWN: - break; - case DataType.NULL: - g.writeNull(); - break; - case DataType.BOOLEAN: - g.writeBoolean((boolean) value); - break; - case DataType.INTEGER: - g.writeNumber((int) value); - break; - case DataType.LONG: - g.writeNumber((long) value); - break; - case DataType.FLOAT: - g.writeNumber((float) value); - break; - case DataType.DOUBLE: - g.writeNumber((double) value); - break; - case DataType.DATETIME: - g.writeNumber(((DateTime) value).getMillis()); - break; - case DataType.BYTEARRAY: - DataByteArray bytes = (DataByteArray) value; - String raw = Base64.getEncoder().encodeToString(bytes.get()); - g.writeString(raw); - break; - case DataType.CHARARRAY: - g.writeString((String) value); - break; - case DataType.BIGINTEGER: - g.writeNumber((BigInteger) value); - break; - case DataType.BIGDECIMAL: - g.writeNumber((BigDecimal) value); - break; - case DataType.MAP: - g.writeStartObject(); - Map<Object, Object> map = (Map<Object, Object>) value; - if (shouldCreateTensor(map, name, properties)) { - if (isRemoveTensor(name, properties)) { - writeRemoveTensor(map, g); - } else { - writeTensor(map, g); - } - } else { - for (Map.Entry<Object, Object> entry : map.entrySet()) { - String k = entry.getKey().toString(); - Object v = entry.getValue(); - Byte t = DataType.findType(v); - Schema fieldSchema = (schema != null) ? schema.getField(k).schema : null; - writeField(k, v, t, g, properties, fieldSchema, op, depth + 1); - } - } - g.writeEndObject(); - break; - case DataType.TUPLE: - Tuple tuple = (Tuple) value; - if (shouldWriteTupleAsMap(name, properties)) { - Map<String, Object> fields = TupleTools.tupleMap(schema, tuple); - writeValue(fields, DataType.MAP, g, name, properties, schema, op, depth); - } else { - boolean writeStartArray = shouldWriteTupleStart(tuple, name, properties); - if (writeStartArray) { - g.writeStartArray(); - } - for (Object v : tuple) { - writeValue(v, DataType.findType(v), g, name, properties, schema, op, depth); - } - if (writeStartArray) { - g.writeEndArray(); - } - } - break; - case DataType.BAG: - DataBag bag = (DataBag) value; - // get the schema of the tuple in bag - schema = (schema != null) ? schema.getField(0).schema : null; - if (shouldWriteBagAsMap(name, properties)) { - // when treating bag as map, the schema of bag should be {(key, val)....} - // the size of tuple in bag should be 2. 1st one is key. 2nd one is val. - Schema valueSchema = (schema != null) ? schema.getField(1).schema : null; - - g.writeStartObject(); - for (Tuple element : bag) { - if (element.size() != 2) { - continue; - } - String k = (String) element.get(0); - Object v = element.get(1); - Byte t = DataType.findType(v); - if (t == DataType.TUPLE) { - Map<String, Object> fields = TupleTools.tupleMap(valueSchema, (Tuple) v); - writeField(k, fields, DataType.MAP, g, properties, valueSchema, op, depth + 1); - } else { - writeField(k, v, t, g, properties, valueSchema, op, depth + 1); - } - } - g.writeEndObject(); - } else { - g.writeStartArray(); - for (Tuple t : bag) { - writeValue(t, DataType.TUPLE, g, name, properties, schema, op, depth); - } - g.writeEndArray(); - } - break; - } - - } - - private static boolean shouldWritePartialUpdate(Operation op, int depth) { - return op == Operation.UPDATE && depth == 1; - } - - private static void writePartialUpdate(Object value, Byte type, JsonGenerator g, String name, Properties properties, Schema schema, Operation op, int depth) throws IOException { - g.writeStartObject(); - // here we check if the operation falls into the four partial operations we do on map/tensor structure - // if no, we assume it's a update on the whole document and we write assign here - // if yes, we write the desired operation here - String operation = getPartialOperation(partialOperationMap, name, properties); - if (operation != null) { - g.writeFieldName(operation); - } else { - g.writeFieldName(PARTIAL_UPDATE_ASSIGN); - } - writeValue(value, type, g, name, properties, schema, op, depth); - g.writeEndObject(); - } - - private static boolean shouldWriteTupleStart(Tuple tuple, String name, Properties properties) { - if (tuple.size() > 1 || properties == null) { - return true; - } - String simpleArrayFields = properties.getProperty(SIMPLE_ARRAY_FIELDS); - if (simpleArrayFields == null) { - return true; - } - if (simpleArrayFields.equals("*")) { - return false; - } - String[] fields = simpleArrayFields.split(","); - for (String field : fields) { - if (field.trim().equalsIgnoreCase(name)) { - return false; - } - } - return true; - } - - private static boolean shouldWriteTupleAsMap(String name, Properties properties) { - // include UPDATE_MAP_FIELDS here because when updating the map - // the second element in each tuple should be written as a map - if (properties == null) { - return false; - } - String addBagAsMapFields = properties.getProperty(UPDATE_MAP_FIELDS); - String simpleObjectFields = properties.getProperty(SIMPLE_OBJECT_FIELDS); - if (simpleObjectFields == null && addBagAsMapFields == null) { - return false; - } - if (addBagAsMapFields != null) { - if (addBagAsMapFields.equals("*")) { - return true; - } - String[] fields = addBagAsMapFields.split(","); - for (String field : fields) { - if (field.trim().equalsIgnoreCase(name)) { - return true; - } - } - - } - if (simpleObjectFields != null) { - if (simpleObjectFields.equals("*")) { - return true; - } - String[] fields = simpleObjectFields.split(","); - for (String field : fields) { - if (field.trim().equalsIgnoreCase(name)) { - return true; - } - } - } - return false; - } - - private static boolean shouldWriteBagAsMap(String name, Properties properties) { - if (properties == null) { - return false; - } - String bagAsMapFields = properties.getProperty(BAG_AS_MAP_FIELDS); - if (bagAsMapFields == null) { - return false; - } - if (bagAsMapFields.equals("*")) { - return true; - } - String[] fields = bagAsMapFields.split(","); - for (String field : fields) { - if (field.trim().equalsIgnoreCase(name)) { - return true; - } - } - return false; - } - - private static boolean shouldCreateTensor(Map<Object, Object> map, String name, Properties properties) { - if (properties == null) { - return false; - } - String createTensorFields = properties.getProperty(CREATE_TENSOR_FIELDS); - String addTensorFields = properties.getProperty(UPDATE_TENSOR_FIELDS); - String removeTensorFields = properties.getProperty(REMOVE_TENSOR_FIELDS); - - if (createTensorFields == null && addTensorFields == null && removeTensorFields == null) { - return false; - } - String[] fields; - if (createTensorFields != null) { - fields = createTensorFields.split(","); - for (String field : fields) { - if (field.trim().equalsIgnoreCase(name)) { - return true; - } - } - } - if (addTensorFields != null) { - fields = addTensorFields.split(","); - for (String field : fields) { - if (field.trim().equalsIgnoreCase(name)) { - return true; - } - } - } - if (removeTensorFields != null) { - fields = removeTensorFields.split(","); - for (String field : fields) { - if (field.trim().equalsIgnoreCase(name)) { - return true; - } - } - } - return false; - } - - private static boolean isRemoveTensor(String name, Properties properties) { - if (properties == null) { - return false; - } - String removeTensorFields = properties.getProperty(REMOVE_TENSOR_FIELDS); - if (removeTensorFields == null) { - return false; - } - String[] fields = removeTensorFields.split(","); - for (String field : fields) { - if (field.trim().equalsIgnoreCase(name)) { - return true; - } - } - return false; - } - - private static boolean shouldWriteField(String name, Properties properties, int depth) { - if (properties == null || depth != 1) { - return true; - } - String excludeFields = properties.getProperty(EXCLUDE_FIELDS); - if (excludeFields == null) { - return true; - } - String[] fields = excludeFields.split(","); - for (String field : fields) { - if (field.trim().equalsIgnoreCase(name)) { - return false; - } - } - return true; - } - - private static void writeTensor(Map<Object, Object> map, JsonGenerator g) throws IOException { - g.writeFieldName("cells"); - g.writeStartArray(); - for (Map.Entry<Object, Object> entry : map.entrySet()) { - String k = entry.getKey().toString(); - Double v = Double.parseDouble(entry.getValue().toString()); - - g.writeStartObject(); - - // Write address - g.writeFieldName("address"); - g.writeStartObject(); - - String[] dimensions = k.split(","); - for (String dimension : dimensions) { - if (dimension == null || dimension.isEmpty()) { - continue; - } - String[] address = dimension.split(":"); - if (address.length != 2) { - throw new IllegalArgumentException("Malformed cell address: " + dimension); - } - String dim = address[0]; - String label = address[1]; - if (dim == null || label == null || dim.isEmpty() || label.isEmpty()) { - throw new IllegalArgumentException("Malformed cell address: " + dimension); - } - g.writeFieldName(dim.trim()); - g.writeString(label.trim()); - } - g.writeEndObject(); - - // Write value - g.writeFieldName("value"); - g.writeNumber(v); - - g.writeEndObject(); - } - g.writeEndArray(); - } - - private static void writeRemoveTensor(Map<Object, Object> map, JsonGenerator g) throws IOException { - g.writeFieldName("addresses"); - g.writeStartArray(); - for (Map.Entry<Object, Object> entry : map.entrySet()) { - String k = entry.getKey().toString(); - String[] dimensions = k.split(","); - for (String dimension : dimensions) { - g.writeStartObject(); - if (dimension == null || dimension.isEmpty()) { - continue; - } - String[] address = dimension.split(":"); - if (address.length != 2) { - throw new IllegalArgumentException("Malformed cell address: " + dimension); - } - String dim = address[0]; - String label = address[1]; - if (dim == null || label == null || dim.isEmpty() || label.isEmpty()) { - throw new IllegalArgumentException("Malformed cell address: " + dimension); - } - g.writeFieldName(dim.trim()); - g.writeString(label.trim()); - g.writeEndObject(); - // Write address - } - } - g.writeEndArray(); - } - - // copied from vespajlib for reducing dependency and building with JDK 8 - private static String getStackTraceAsString(Throwable throwable) { - try (StringWriter stringWriter = new StringWriter(); - PrintWriter printWriter = new PrintWriter(stringWriter, true)) { - throwable.printStackTrace(printWriter); - return stringWriter.getBuffer().toString(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - // wrapper to emit logs - private void warnLog(String msg, PigWarning warning) { - warn(msg, warning); - System.err.println(msg); - } -} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java deleted file mode 100644 index 1d50f2909a2..00000000000 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java +++ /dev/null @@ -1,114 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hadoop.pig; - -import com.fasterxml.jackson.databind.JsonNode; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaHttpClient; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaQuerySchema; -import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools; -import org.apache.pig.EvalFunc; -import org.apache.pig.data.*; -import org.apache.pig.impl.logicalLayer.schema.Schema; -import org.apache.pig.impl.util.UDFContext; - -import java.io.IOException; -import java.util.*; - -/** - * A Pig UDF to run a query against a Vespa cluster and return the - * results. - * - * @author lesters - */ -public class VespaQuery extends EvalFunc<DataBag> { - - private final String PROPERTY_QUERY_TEMPLATE = "query"; - private final String PROPERTY_QUERY_SCHEMA = "schema"; - private final String PROPERTY_ROOT_NODE = "rootnode"; - - private final VespaConfiguration configuration; - private final Properties properties; - private final String queryTemplate; - private final String querySchema; - private final String queryRootNode; - - private VespaHttpClient httpClient; - - public VespaQuery() { - this(new String[0]); - } - - public VespaQuery(String... params) { - configuration = VespaConfiguration.get(UDFContext.getUDFContext().getJobConf(), null); - properties = VespaConfiguration.loadProperties(params); - - queryTemplate = properties.getProperty(PROPERTY_QUERY_TEMPLATE); - if (queryTemplate == null || queryTemplate.isEmpty()) { - throw new IllegalArgumentException("Query template cannot be empty"); - } - - querySchema = properties.getProperty(PROPERTY_QUERY_SCHEMA, "rank:int,id:chararray"); - queryRootNode = properties.getProperty(PROPERTY_ROOT_NODE, "root/children"); - } - - @Override - public DataBag exec(Tuple input) throws IOException { - if (input == null || input.size() == 0) { - return null; - } - JsonNode jsonResult = queryVespa(input); - if (jsonResult == null) { - return null; - } - return createPigRepresentation(jsonResult); - } - - @Override - public Schema outputSchema(Schema input) { - return VespaQuerySchema.getPigSchema(querySchema); - } - - - private JsonNode queryVespa(Tuple input) throws IOException { - String url = createVespaQueryUrl(input); - if (url == null) { - return null; - } - String result = executeVespaQuery(url); - return parseVespaResultJson(result); - } - - - private String createVespaQueryUrl(Tuple input) throws IOException { - return TupleTools.toString(getInputSchema(), input, queryTemplate); - } - - - private String executeVespaQuery(String url) throws IOException { - if (httpClient == null) { - httpClient = new VespaHttpClient(configuration); - } - return httpClient.get(url); - } - - private JsonNode parseVespaResultJson(String result) throws IOException { - return httpClient == null ? null : httpClient.parseResultJson(result, queryRootNode); - } - - private DataBag createPigRepresentation(JsonNode hits) { - DataBag bag = new SortedDataBag(null); - VespaQuerySchema querySchema = new VespaQuerySchema(this.querySchema); - - for (int rank = 0; rank < hits.size(); ++rank) { - JsonNode hit = hits.get(rank); - Tuple tuple = querySchema.buildTuple(rank, hit); - bag.add(tuple); - } - - return bag; - } - - - - -} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaSimpleJsonLoader.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaSimpleJsonLoader.java deleted file mode 100644 index 9dc294ce243..00000000000 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaSimpleJsonLoader.java +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hadoop.pig; - -import com.yahoo.vespa.hadoop.mapreduce.VespaSimpleJsonInputFormat; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.pig.LoadFunc; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; - -import java.io.IOException; - -/** - * Simple JSON loader which loads either one JSON object per line or a - * multiline JSON consisting of objects in an array. - * - * Returns only the textual representation of the JSON object. - * - * @author lesters - */ -@SuppressWarnings("rawtypes") -public class VespaSimpleJsonLoader extends LoadFunc { - - private TupleFactory tupleFactory = TupleFactory.getInstance(); - private VespaSimpleJsonInputFormat.VespaJsonRecordReader recordReader; - - @Override - public void setLocation(String location, Job job) throws IOException { - FileInputFormat.setInputPaths(job, location); - } - - @Override - public InputFormat getInputFormat() throws IOException { - return new VespaSimpleJsonInputFormat(); - } - - @Override - public void prepareToRead(RecordReader reader, PigSplit split) throws IOException { - recordReader = (VespaSimpleJsonInputFormat.VespaJsonRecordReader) reader; - } - - @Override - public Tuple getNext() throws IOException { - try { - boolean done = recordReader.nextKeyValue(); - if (done) { - return null; - } - Text json = recordReader.getCurrentKey(); - if (json == null) { - return null; - } - return tupleFactory.newTuple(json.toString()); - - } catch (InterruptedException ex) { - return null; - } - } -} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java deleted file mode 100644 index a564dfac25d..00000000000 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java +++ /dev/null @@ -1,178 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hadoop.pig; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat; -import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.pig.ResourceSchema; -import org.apache.pig.StoreFunc; -import org.apache.pig.data.Tuple; -import org.apache.pig.impl.logicalLayer.schema.Schema; -import org.apache.pig.impl.util.UDFContext; - -import java.io.*; -import java.util.Base64; -import java.util.Map; -import java.util.Properties; - -/** - * A small Pig UDF wrapper around the Vespa http client for - * feeding data into a Vespa endpoint. - * - * @author lesters - */ -@SuppressWarnings("rawtypes") -public class VespaStorage extends StoreFunc { - - private final boolean createDocOp; - private final String template; - private final VespaDocumentOperation.Operation operation; - - private String signature = null; - private RecordWriter recordWriter = null; - private ResourceSchema resourceSchema = null; - - private static final String PROPERTY_CREATE_DOC_OP = "create-document-operation"; - private static final String PROPERTY_ID_TEMPLATE = "docid"; - private static final String PROPERTY_OPERATION = "operation"; - private static final String PROPERTY_RESOURCE_SCHEMA = "resource_schema"; - - Properties properties = new Properties(); - - public VespaStorage() { - createDocOp = false; - template = null; - operation = null; - } - - public VespaStorage(String... params) { - properties = VespaConfiguration.loadProperties(params); - createDocOp = Boolean.parseBoolean(properties.getProperty(PROPERTY_CREATE_DOC_OP, "false")); - operation = VespaDocumentOperation.Operation.fromString(properties.getProperty(PROPERTY_OPERATION, "put")); - template = properties.getProperty(PROPERTY_ID_TEMPLATE); - } - - - @Override - public OutputFormat getOutputFormat() throws IOException { - return new VespaOutputFormat(properties); - } - - - @Override - public void setStoreLocation(String endpoint, Job job) throws IOException { - properties.setProperty(VespaConfiguration.ENDPOINT, endpoint); - } - - - @Override - public void prepareToWrite(RecordWriter recordWriter) throws IOException { - this.recordWriter = recordWriter; - this.resourceSchema = getResourceSchema(); - } - - - @SuppressWarnings("unchecked") - @Override - public void putNext(Tuple tuple) throws IOException { - if (tuple == null || tuple.size() == 0) { - return; - } - - String data = null; - if (createDocOp) { - data = createDocumentOperation(tuple); - } else if (!tuple.isNull(0)) { - data = tuple.get(0).toString(); // assume single field with correctly formatted doc op. - } - - if (data == null || data.length() == 0) { - return; - } - - try { - recordWriter.write(0, data); - } catch (InterruptedException e) { - throw new IOException(e); - } - } - - - @Override - public void checkSchema(ResourceSchema resourceSchema) throws IOException { - setResourceSchema(resourceSchema); - } - - - @Override - public String relToAbsPathForStoreLocation(String endpoint, Path path) throws IOException { - return endpoint; - } - - - @Override - public void setStoreFuncUDFContextSignature(String s) { - this.signature = s; - } - - - @Override - public void cleanupOnFailure(String s, Job job) throws IOException { - } - - - @Override - public void cleanupOnSuccess(String s, Job job) throws IOException { - } - - - private ResourceSchema getResourceSchema() throws IOException { - Properties properties = getUDFProperties(); - return base64Deserialize(properties.getProperty(PROPERTY_RESOURCE_SCHEMA)); - } - - - private void setResourceSchema(ResourceSchema schema) throws IOException { - Properties properties = getUDFProperties(); - if (properties.getProperty(PROPERTY_RESOURCE_SCHEMA) == null) { - properties.setProperty(PROPERTY_RESOURCE_SCHEMA, base64Serialize(schema)); - } - } - - - private Properties getUDFProperties() { - String[] context = { signature }; - return UDFContext.getUDFContext().getUDFProperties(getClass(), context); - } - - - private String createDocumentOperation(Tuple tuple) throws IOException { - if (tuple == null || tuple.size() == 0) { - return null; - } - if (resourceSchema == null) { - return null; - } - - Map<String, Object> fields = TupleTools.tupleMap(resourceSchema, tuple); - String docId = TupleTools.toString(fields, template); - - Schema schema = Schema.getPigSchema(resourceSchema); - return VespaDocumentOperation.create(operation, docId, fields, properties, schema); - } - - public static String base64Serialize(ResourceSchema resourceSchema) throws IOException { - byte[] bytes = new ObjectMapper().writeValueAsBytes(resourceSchema); - return Base64.getEncoder().encodeToString(bytes); - } - - public static ResourceSchema base64Deserialize(String s) throws IOException { - byte[] data = Base64.getDecoder().decode(s); - return new ObjectMapper().readValue(data, ResourceSchema.class); - } -} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/package-info.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/package-info.java deleted file mode 100644 index 686765ac047..00000000000 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/package-info.java +++ /dev/null @@ -1,10 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * com.yahoo.vespa.hadoop.pig contains classes and utilities - * to enable feeding directly to Vespa endpoints from pig. - * It is a minimal layer over the Vespa HTTP client. - * - * NOTE: This is a PUBLIC API, but not annotated as such because this is not a bundle and - * we don't want to introduce Vespa dependencies. - */ -package com.yahoo.vespa.hadoop.pig; diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java deleted file mode 100644 index d56cd818de2..00000000000 --- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java +++ /dev/null @@ -1,200 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hadoop.pig; - -import com.fasterxml.jackson.core.JsonEncoding; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonGenerator; -import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.test.PathUtils; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; - -import java.io.BufferedInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.StringTokenizer; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class MapReduceTest { - - protected static File hdfsBaseDir; - protected static FileSystem hdfs; - protected static Configuration conf; - protected static MiniDFSCluster cluster; - - protected static Path metricsJsonPath; - protected static Path metricsCsvPath; - - @BeforeAll - public static void setUp() throws IOException { - hdfsBaseDir = new File(PathUtils.getTestDir(MapReduceTest.class).getCanonicalPath()); - - conf = new HdfsConfiguration(); - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsBaseDir.getAbsolutePath()); - conf.set(VespaConfiguration.DRYRUN, "true"); - conf.set(VespaConfiguration.ENDPOINT, "endpoint-does-not-matter-in-dryrun"); - - cluster = new MiniDFSCluster.Builder(conf).build(); - hdfs = FileSystem.get(conf); - - metricsJsonPath = new Path("metrics_json"); - metricsCsvPath = new Path("metrics_csv"); - copyToHdfs("src/test/resources/operations_data.json", metricsJsonPath, "data"); - copyToHdfs("src/test/resources/tabular_data.csv", metricsCsvPath, "data"); - } - - @AfterAll - public static void tearDown() throws IOException { - Path testDir = new Path(hdfsBaseDir.getParent()); - hdfs.delete(testDir, true); - cluster.shutdown(); - LocalFileSystem localFileSystem = FileSystem.getLocal(conf); - localFileSystem.delete(testDir, true); - } - - @Test - public void requireThatMapOnlyJobSucceeds() throws Exception { - Job job = Job.getInstance(conf); - job.setJarByClass(MapReduceTest.class); - job.setMapperClass(FeedMapper.class); - job.setOutputFormatClass(VespaOutputFormat.class); - job.setMapOutputValueClass(Text.class); - - FileInputFormat.setInputPaths(job, metricsJsonPath); - - boolean success = job.waitForCompletion(true); - assertTrue(success, "Job Failed"); - - VespaCounters counters = VespaCounters.get(job); - assertEquals(10, counters.getDocumentsSent()); - assertEquals(0, counters.getDocumentsFailed()); - assertEquals(10, counters.getDocumentsOk()); - } - - @Test - public void requireThatMapReduceJobSucceeds() throws Exception { - Job job = Job.getInstance(conf); - job.setJarByClass(MapReduceTest.class); - job.setMapperClass(FeedMapper.class); - job.setOutputFormatClass(VespaOutputFormat.class); - job.setMapOutputValueClass(Text.class); - job.setReducerClass(FeedReducer.class); - job.setNumReduceTasks(1); - - FileInputFormat.setInputPaths(job, metricsJsonPath); - - boolean success = job.waitForCompletion(true); - assertTrue(success, "Job Failed"); - - VespaCounters counters = VespaCounters.get(job); - assertEquals(10, counters.getDocumentsSent()); - assertEquals(0, counters.getDocumentsFailed()); - assertEquals(10, counters.getDocumentsOk()); - } - - - @Test - public void requireThatTransformMapJobSucceeds() throws Exception { - Job job = Job.getInstance(conf); - job.setJarByClass(MapReduceTest.class); - job.setMapperClass(ParsingMapper.class); - job.setOutputFormatClass(VespaOutputFormat.class); - job.setMapOutputValueClass(Text.class); - job.setReducerClass(FeedReducer.class); - job.setNumReduceTasks(1); - - FileInputFormat.setInputPaths(job, metricsCsvPath); - - boolean success = job.waitForCompletion(true); - assertTrue(success, "Job Failed"); - - VespaCounters counters = VespaCounters.get(job); - assertEquals(10, counters.getDocumentsSent()); - assertEquals(0, counters.getDocumentsFailed()); - assertEquals(10, counters.getDocumentsOk()); - assertEquals(0, counters.getDocumentsSkipped()); - } - - - private static void copyToHdfs(String localFile, Path hdfsDir, String hdfsName) throws IOException { - Path hdfsPath = new Path(hdfsDir, hdfsName); - FSDataOutputStream out = hdfs.create(hdfsPath); - - try (InputStream in = new BufferedInputStream(new FileInputStream(localFile))) { - int len; - byte[] buffer = new byte[1024]; - while ((len = in.read(buffer)) > 0) { - out.write(buffer, 0, len); - } - } finally { - out.close(); - } - } - - public static class FeedMapper extends Mapper<LongWritable, Text, LongWritable, Text> { - public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { - context.write(key, value); - } - } - - public static class FeedReducer extends Reducer<Object, Text, LongWritable, Text> { - public void reduce(LongWritable key, Text value, Context context) throws IOException, InterruptedException { - context.write(key, value); - } - } - - public static class ParsingMapper extends Mapper<LongWritable, Text, LongWritable, Text> { - public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { - String line = value.toString(); - if (line == null || line.length() == 0) - return; - - StringTokenizer tokenizer = new StringTokenizer(line); - long date = Long.parseLong(tokenizer.nextToken()); - String metricName = tokenizer.nextToken(); - long metricValue = Long.parseLong(tokenizer.nextToken()); - String application = tokenizer.nextToken(); - - String docid = "id:"+application+":metric::"+metricName+"-"+date; - - ByteArrayOutputStream out = new ByteArrayOutputStream(); - JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8); - - g.writeStartObject(); - g.writeObjectFieldStart("fields"); - g.writeNumberField("date", date); - g.writeStringField("name", metricName); - g.writeNumberField("value", metricValue); - g.writeStringField("application", application); - g.writeEndObject(); - g.writeStringField("put", docid); - g.writeEndObject(); - g.close(); - - context.write(key, new Text(out.toString())); - } - } - - -} diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java deleted file mode 100644 index ec20e82763c..00000000000 --- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java +++ /dev/null @@ -1,633 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hadoop.pig; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.pig.data.BagFactory; -import org.apache.pig.data.DataBag; -import org.apache.pig.data.DataByteArray; -import org.apache.pig.data.DataType; -import org.apache.pig.data.SortedDataBag; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; -import org.apache.pig.impl.logicalLayer.FrontendException; -import org.apache.pig.impl.logicalLayer.schema.Schema; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.PrintStream; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class VespaDocumentOperationTest { - private final ByteArrayOutputStream outContent = new ByteArrayOutputStream(); - private final PrintStream originalOut = System.out; - - @BeforeEach - public void setUpStreams() { - System.setOut(new PrintStream(outContent)); - } - - @AfterEach - public void restoreStreams() { - System.setOut(originalOut); - } - @Test - public void requireThatUDFReturnsCorrectJson() throws Exception { - String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>"); - ObjectMapper m = new ObjectMapper(); - JsonNode root = m.readTree(json); - JsonNode fields = root.path("fields"); - - // operation put is default - assertEquals("id:testapp:metrics::clicks-20160112", root.get("put").asText()); - assertEquals("testapp", fields.get("application").asText()); - assertEquals("clicks", fields.get("name").asText()); - assertEquals(3, fields.get("value").asInt()); - } - - - @Test - public void requireThatUDFSupportsUpdateAssign() throws IOException { - String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>", "operation=update"); - ObjectMapper m = new ObjectMapper(); - JsonNode root = m.readTree(json); - JsonNode fields = root.path("fields"); - - assertEquals("id:testapp:metrics::clicks-20160112", root.get("update").asText()); - assertEquals("testapp", fields.get("application").get("assign").asText()); - assertEquals("clicks", fields.get("name").get("assign").asText()); - assertEquals(3, fields.get("value").get("assign").asInt()); - } - - @Test - public void requireThatUDFSupportsConditionalUpdateAssign() throws IOException { - String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>", "operation=update", "condition=clicks < <value>"); - ObjectMapper m = new ObjectMapper(); - JsonNode root = m.readTree(json); - JsonNode fields = root.path("fields"); - - assertEquals("id:testapp:metrics::clicks-20160112", root.get("update").asText()); - assertEquals("clicks < 3", root.get("condition").asText()); - assertEquals("testapp", fields.get("application").get("assign").asText()); - assertEquals("clicks", fields.get("name").get("assign").asText()); - assertEquals(3, fields.get("value").get("assign").asInt()); - } - - @Test - public void requireThatUDFSupportsCreateIfNonExistent() throws IOException { - String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>", "operation=update", - "create-if-non-existent=true"); - ObjectMapper m = new ObjectMapper(); - JsonNode root = m.readTree(json); - JsonNode fields = root.path("fields"); - - assertEquals("id:testapp:metrics::clicks-20160112", root.get("update").asText()); - assertTrue(root.get("create").asBoolean()); - assertEquals("testapp", fields.get("application").get("assign").asText()); - assertEquals("clicks", fields.get("name").get("assign").asText()); - assertEquals(3, fields.get("value").get("assign").asInt()); - } - - - @Test - public void requireThatUDFReturnsNullForMissingConfig() throws Exception { - String json = getDocumentOperationJson(); - assertNull(json); - } - - - @Test - public void requireThatUDFCorrectlyGeneratesRemoveBagAsMapOperation() throws Exception { - DataBag bag = BagFactory.getInstance().newDefaultBag(); - - Schema innerObjectSchema = new Schema(); - Tuple innerObjectTuple = TupleFactory.getInstance().newTuple(); - addToTuple("year", DataType.CHARARRAY, "2020", innerObjectSchema, innerObjectTuple); - addToTuple("month", DataType.INTEGER, 3, innerObjectSchema, innerObjectTuple); - - Schema objectSchema = new Schema(); - Tuple objectTuple = TupleFactory.getInstance().newTuple(); - addToTuple("key", DataType.CHARARRAY, "234566", objectSchema, objectTuple); - addToTuple("value", DataType.TUPLE, innerObjectTuple,innerObjectSchema,objectSchema, objectTuple); - - Schema bagSchema = new Schema(); - addToBagWithSchema("firstLayerTuple",DataType.TUPLE,objectTuple,objectSchema,bagSchema,bag); - - innerObjectSchema = new Schema(); - innerObjectTuple = TupleFactory.getInstance().newTuple(); - addToTuple("year", DataType.CHARARRAY, "2020", innerObjectSchema, innerObjectTuple); - addToTuple("month", DataType.INTEGER, 3, innerObjectSchema, innerObjectTuple); - - objectSchema = new Schema(); - objectTuple = TupleFactory.getInstance().newTuple(); - addToTuple("key", DataType.CHARARRAY, "123456", objectSchema, objectTuple); - addToTuple("value", DataType.TUPLE, innerObjectTuple,innerObjectSchema,objectSchema, objectTuple); - - addToBagWithSchema("firstLayerTuple",DataType.TUPLE,objectTuple,objectSchema,bagSchema,bag); - - Schema schema = new Schema(); - Tuple tuple = TupleFactory.getInstance().newTuple(); - addToTuple("bag", DataType.BAG, bag, bagSchema, schema, tuple); - addToTuple("id", DataType.CHARARRAY, "123", schema, tuple); - - VespaDocumentOperation docOp = new VespaDocumentOperation("docid=id", "remove-map-fields=bag","operation=update"); - docOp.setInputSchema(schema); - String json = docOp.exec(tuple); - - ObjectMapper m = new ObjectMapper(); - JsonNode root = m.readTree(json); - JsonNode fields = root.get("fields"); - assertEquals("{\"remove\":0}", fields.get("bag{123456}").toString()); - assertEquals("{\"remove\":0}", fields.get("bag{234566}").toString()); - - } - - @Test - public void requireThatUDFCorrectlyGeneratesAddBagAsMapOperation() throws Exception { - - DataBag bag = BagFactory.getInstance().newDefaultBag(); - - Schema innerObjectSchema = new Schema(); - Tuple innerObjectTuple = TupleFactory.getInstance().newTuple(); - addToTuple("year", DataType.CHARARRAY, "2020", innerObjectSchema, innerObjectTuple); - addToTuple("month", DataType.INTEGER, 3, innerObjectSchema, innerObjectTuple); - - Schema objectSchema = new Schema(); - Tuple objectTuple = TupleFactory.getInstance().newTuple(); - addToTuple("key", DataType.CHARARRAY, "123456", objectSchema, objectTuple); - addToTuple("value", DataType.TUPLE, innerObjectTuple,innerObjectSchema,objectSchema, objectTuple); - - Schema bagSchema = new Schema(); - addToBagWithSchema("firstLayerTuple",DataType.TUPLE,objectTuple,objectSchema,bagSchema,bag); - - Schema schema = new Schema(); - Tuple tuple = TupleFactory.getInstance().newTuple(); - addToTuple("bag", DataType.BAG, bag, bagSchema, schema, tuple); - addToTuple("id", DataType.CHARARRAY, "123", schema, tuple); - VespaDocumentOperation docOp = new VespaDocumentOperation("docid=id", "update-map-fields=bag","operation=update"); - docOp.setInputSchema(schema); - String json = docOp.exec(tuple); - - ObjectMapper m = new ObjectMapper(); - JsonNode root = m.readTree(json); - - JsonNode fields = root.get("fields"); - JsonNode value = fields.get("bag{123456}"); - JsonNode assign = value.get("assign"); - assertEquals("2020", assign.get("year").asText()); - assertEquals(3, assign.get("month").asInt()); - } - - @Test - public void requireThatUDFCorrectlyGeneratesAddTensorOperation() throws Exception { - - Schema schema = new Schema(); - Tuple tuple = TupleFactory.getInstance().newTuple(); - - // Please refer to the tensor format documentation - - Map<String, Double> tensor = new HashMap<String, Double>() {{ - put("x:label1,y:label2,z:label4", 2.0); - put("x:label3", 3.0); - }}; - - addToTuple("id", DataType.CHARARRAY, "123", schema, tuple); - addToTuple("tensor", DataType.MAP, tensor, schema, tuple); - - VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "update-tensor-fields=tensor","operation=update"); - docOp.setInputSchema(schema); - String json = docOp.exec(tuple); - - ObjectMapper m = new ObjectMapper(); - JsonNode root = m.readTree(json); - JsonNode fields = root.get("fields"); - JsonNode tensorValue = fields.get("tensor"); - JsonNode add = tensorValue.get("add"); - JsonNode cells = add.get("cells"); - Iterator<JsonNode> cellsIterator = cells.iterator(); - - JsonNode element = cellsIterator.next(); - assertEquals("label1", element.get("address").get("x").asText()); - assertEquals("label2", element.get("address").get("y").asText()); - assertEquals("label4", element.get("address").get("z").asText()); - assertEquals("2.0", element.get("value").toString()); - - element = cellsIterator.next(); - assertEquals("label3", element.get("address").get("x").asText()); - assertEquals("3.0", element.get("value").toString()); - } - - @Test - public void requireThatUDFCorrectlyGeneratesRemoveTensorOperation() throws Exception { - - Schema schema = new Schema(); - Tuple tuple = TupleFactory.getInstance().newTuple(); - - // Please refer to the tensor format documentation - - Map<String, Double> tensor = new HashMap<String, Double>() {{ - put("x:label1,y:label2,z:label4", 2.0); - put("x:label3", 3.0); - }}; - - addToTuple("id", DataType.CHARARRAY, "123", schema, tuple); - addToTuple("tensor", DataType.MAP, tensor, schema, tuple); - - VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "remove-tensor-fields=tensor","operation=update"); - docOp.setInputSchema(schema); - String json = docOp.exec(tuple); - - ObjectMapper m = new ObjectMapper(); - JsonNode root = m.readTree(json); - JsonNode fields = root.get("fields"); - JsonNode tensorValue = fields.get("tensor"); - JsonNode remove = tensorValue.get("remove"); - JsonNode address = remove.get("addresses"); - - Iterator<JsonNode> addressIterator = address.iterator(); - - JsonNode element = addressIterator.next(); - assertEquals("label1", element.get("x").asText()); - - element = addressIterator.next(); - assertEquals("label2", element.get("y").asText()); - - element = addressIterator.next(); - assertEquals("label4", element.get("z").asText()); - - element = addressIterator.next(); - assertEquals("label3", element.get("x").asText()); - } - - @Test - public void requireThatUDFReturnsNullWhenExceptionHappens() throws IOException { - Schema schema = new Schema(); - Tuple tuple = TupleFactory.getInstance().newTuple(); - - // broken DELTA format that would throw internally - Map<String, Double> tensor = new HashMap<String, Double>() {{ - put("xlabel1", 2.0); // missing : between 'x' and 'label1' - }}; - - addToTuple("id", DataType.CHARARRAY, "123", schema, tuple); - addToTuple("tensor", DataType.MAP, tensor, schema, tuple); - - VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "create-tensor-fields=tensor"); - docOp.setInputSchema(schema); - String json = docOp.exec(tuple); - - assertNull(json); - } - - @Test - public void requireThatUDFCorrectlyGeneratesRemoveOperation() throws Exception { - String json = getDocumentOperationJson("operation=remove", "docid=id:<application>:metrics::<name>-<date>"); - ObjectMapper m = new ObjectMapper(); - JsonNode root = m.readTree(json); - JsonNode fields = root.get("fields"); - - assertEquals("id:testapp:metrics::clicks-20160112", root.get("remove").asText()); - assertNull(fields); - } - - - @Test - public void requireThatUDFGeneratesComplexDataTypes() throws Exception { - Schema schema = new Schema(); - Tuple tuple = TupleFactory.getInstance().newTuple(); - - Tuple intTuple = TupleFactory.getInstance().newTuple(); - int[] intArray = {1, 2, 3}; - for (int i : intArray) { intTuple.append(i); } - - Tuple stringTuple = TupleFactory.getInstance().newTuple(); - String[] stringArray = {"a", "b", "c"}; - for (String s : stringArray) { stringTuple.append(s); } - - DataBag bag = new SortedDataBag(null); - bag.add(intTuple); - bag.add(stringTuple); - - Map<String, Object> innerMap = new HashMap<String, Object>() {{ - put("a", "string"); - put("tuple", intTuple); - }}; - - DataByteArray bytes = new DataByteArray("testdata".getBytes()); - - Map<String, Object> outerMap = new HashMap<String, Object>() {{ - put("string", "value"); - put("int", 3); - put("float", 3.145); - put("bool", true); - put("byte", bytes); - put("map", innerMap); - put("bag", bag); - }}; - - addToTuple("map", DataType.MAP, outerMap, schema, tuple); - - VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty"); - docOp.setInputSchema(schema); - String json = docOp.exec(tuple); - - ObjectMapper m = new ObjectMapper(); - JsonNode root = m.readTree(json); - JsonNode fields = root.get("fields"); - JsonNode map = fields.get("map"); - - assertEquals("value", map.get("string").asText()); - assertEquals(3, map.get("int").asInt()); - assertEquals(3.145, map.get("float").asDouble(), 1e-6); - assertTrue(map.get("bool").asBoolean()); - assertEquals("dGVzdGRhdGE=", map.get("byte").asText()); - - assertEquals("string", map.get("map").get("a").asText()); - for (int i = 0; i < intArray.length; ++i) { - assertEquals(intArray[i], map.get("map").get("tuple").get(i).asInt()); - } - - JsonNode bagField = map.get("bag"); - for (int i = 0; i < intArray.length; ++i) { - assertEquals(intArray[i], bagField.get(0).get(i).asInt()); - } - for (int i = 0; i < stringArray.length; ++i) { - assertEquals(stringArray[i], bagField.get(1).get(i).asText()); - } - } - - - @Test - public void requireThatSimpleArraysMustBeConfigured() throws Exception { - String[] stringArray = {"a", "b", "c"}; - JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty"); // simple arrays not configured - // json: [["a"], ["b"], ["c"]] - assertEquals("a", array.get(0).get(0).asText()); - assertEquals("b", array.get(1).get(0).asText()); - assertEquals("c", array.get(2).get(0).asText()); - } - - - @Test - public void requireThatSimpleArraysAreSupported() throws Exception { - String[] stringArray = {"a", "b", "c"}; - JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty", "simple-array-fields=array"); - // json: ["a", "b", "c"] - assertEquals("a", array.get(0).asText()); - assertEquals("b", array.get(1).asText()); - assertEquals("c", array.get(2).asText()); - } - - - @Test - public void requireThatSimpleArraysCanBeConfiguredWithWildcard() throws Exception { - String[] stringArray = {"a", "b", "c"}; - JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty", "simple-array-fields=*"); - // json: ["a", "b", "c"] - assertEquals("a", array.get(0).asText()); - assertEquals("b", array.get(1).asText()); - assertEquals("c", array.get(2).asText()); - } - - - @Test - public void requireThatMultipleSimpleArraysAreSupported() throws Exception { - String[] stringArray = {"a", "b", "c"}; - JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty", "simple-array-fields=empty,array"); - // json: ["a", "b", "c"] - assertEquals("a", array.get(0).asText()); - assertEquals("b", array.get(1).asText()); - assertEquals("c", array.get(2).asText()); - } - - - private JsonNode setupSimpleArrayOperation(String name, String[] array, String... params) throws IOException { - Schema schema = new Schema(); - Tuple tuple = TupleFactory.getInstance().newTuple(); - - DataBag bag = new SortedDataBag(null); - for (String s : array) { - Tuple stringTuple = TupleFactory.getInstance().newTuple(); - stringTuple.append(s); - bag.add(stringTuple); - } - addToTuple(name, DataType.BAG, bag, schema, tuple); - - VespaDocumentOperation docOp = new VespaDocumentOperation(params); - docOp.setInputSchema(schema); - String json = docOp.exec(tuple); - - ObjectMapper m = new ObjectMapper(); - JsonNode root = m.readTree(json); - JsonNode fields = root.get("fields"); - return fields.get(name); - } - - - @Test - public void requireThatUDFSupportsTensors() throws IOException { - Schema schema = new Schema(); - Tuple tuple = TupleFactory.getInstance().newTuple(); - - // Please refer to the tensor format documentation - - Map<String, Double> tensor = new HashMap<String, Double>() {{ - put("x:label1,y:label2,z:label4", 2.0); - put("x:label3", 3.0); - }}; - - addToTuple("id", DataType.CHARARRAY, "123", schema, tuple); - addToTuple("tensor", DataType.MAP, tensor, schema, tuple); - - VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "create-tensor-fields=tensor"); - docOp.setInputSchema(schema); - String json = docOp.exec(tuple); - - ObjectMapper m = new ObjectMapper(); - JsonNode root = m.readTree(json); - JsonNode fields = root.get("fields"); - JsonNode tensorNode = fields.get("tensor"); - JsonNode cells = tensorNode.get("cells"); - - assertEquals("label1", cells.get(0).get("address").get("x").asText()); - assertEquals("label2", cells.get(0).get("address").get("y").asText()); - assertEquals("label4", cells.get(0).get("address").get("z").asText()); - assertEquals("label3", cells.get(1).get("address").get("x").asText()); - - assertEquals(2.0, cells.get(0).get("value").asDouble(), 1e-6); - assertEquals(3.0, cells.get(1).get("value").asDouble(), 1e-6); - } - - - @Test - public void requireThatUDFCanExcludeFields() throws IOException { - String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>", "exclude-fields=application,date"); - ObjectMapper m = new ObjectMapper(); - JsonNode root = m.readTree(json); - JsonNode fields = root.path("fields"); - - // 'application' and 'date' fields should not appear in JSON - assertNull(fields.get("application")); - assertNull(fields.get("date")); - assertNotNull(fields.get("name")); - assertNotNull(fields.get("value")); - } - - - private String getDocumentOperationJson(String... params) throws IOException { - Schema schema = new Schema(); - Tuple tuple = TupleFactory.getInstance().newTuple(); - - addToTuple("application", DataType.CHARARRAY, "testapp", schema, tuple); - addToTuple("name", DataType.CHARARRAY, "clicks", schema, tuple); - addToTuple("date", DataType.CHARARRAY, "20160112", schema, tuple); - addToTuple("value", DataType.CHARARRAY, 3, schema, tuple); - - VespaDocumentOperation docOp = new VespaDocumentOperation(params); - docOp.setInputSchema(schema); - return docOp.exec(tuple); - } - - - @Test - public void requireThatUDFSupportsSimpleObjectFields() throws IOException { - Schema objectSchema = new Schema(); - Tuple objectTuple = TupleFactory.getInstance().newTuple(); - addToTuple("id", DataType.LONG, 123456789L, objectSchema, objectTuple); - addToTuple("url", DataType.CHARARRAY, "example.com", objectSchema, objectTuple); - addToTuple("value", DataType.INTEGER, 123, objectSchema, objectTuple); - - Schema schema = new Schema(); - Tuple tuple = TupleFactory.getInstance().newTuple(); - addToTuple("object", DataType.TUPLE, objectTuple, objectSchema, schema, tuple); - - VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "simple-object-fields=object"); - docOp.setInputSchema(schema); - String json = docOp.exec(tuple); - - ObjectMapper m = new ObjectMapper(); - JsonNode root = m.readTree(json); - JsonNode fields = root.get("fields"); - JsonNode objectNode = fields.get("object"); - - assertEquals(123456789L, objectNode.get("id").asLong()); - assertEquals("example.com", objectNode.get("url").asText()); - assertEquals(123, objectNode.get("value").asInt()); - } - - - @Test - public void requireThatUDFSupportsBagAsMapFields() throws IOException { - DataBag bag = BagFactory.getInstance().newDefaultBag(); - - Schema objectSchema = new Schema(); - Tuple objectTuple = TupleFactory.getInstance().newTuple(); - addToTuple("key", DataType.CHARARRAY, "123456", objectSchema, objectTuple); - addToTuple("value", DataType.INTEGER, 123456, objectSchema, objectTuple); - bag.add(objectTuple); - - objectSchema = new Schema(); - objectTuple = TupleFactory.getInstance().newTuple(); - addToTuple("key", DataType.CHARARRAY, "234567", objectSchema, objectTuple); - addToTuple("value", DataType.INTEGER, 234567, objectSchema, objectTuple); - bag.add(objectTuple); - - Schema schema = new Schema(); - Tuple tuple = TupleFactory.getInstance().newTuple(); - addToTuple("bag", DataType.BAG, bag, objectSchema, schema, tuple); - - VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "bag-as-map-fields=bag"); - docOp.setInputSchema(schema); - String json = docOp.exec(tuple); - - ObjectMapper m = new ObjectMapper(); - JsonNode root = m.readTree(json); - JsonNode fields = root.get("fields"); - JsonNode bagNode = fields.get("bag"); - - assertEquals(123456, bagNode.get("123456").asInt()); - assertEquals(234567, bagNode.get("234567").asInt()); - } - - @Test - public void requireThatUDFPrintIdWhenVerbose() throws IOException { - DataBag bag = BagFactory.getInstance().newDefaultBag(); - - Schema objectSchema = new Schema(); - Tuple objectTuple = TupleFactory.getInstance().newTuple(); - addToTuple("key", DataType.CHARARRAY, "123456", objectSchema, objectTuple); - addToTuple("value", DataType.INTEGER, 123456, objectSchema, objectTuple); - bag.add(objectTuple); - - objectSchema = new Schema(); - objectTuple = TupleFactory.getInstance().newTuple(); - addToTuple("key", DataType.CHARARRAY, "234567", objectSchema, objectTuple); - addToTuple("value", DataType.INTEGER, 234567, objectSchema, objectTuple); - bag.add(objectTuple); - - Schema schema = new Schema(); - Tuple tuple = TupleFactory.getInstance().newTuple(); - addToTuple("bag", DataType.BAG, bag, objectSchema, schema, tuple); - - VespaDocumentOperation docOp = new VespaDocumentOperation("docid=7654321", "bag-as-map-fields=bag","verbose=true"); - docOp.setInputSchema(schema); - String json = docOp.exec(tuple); - - assertTrue(outContent.toString().contains("Processing docId: 7654321")); - } - - @Test - public void requireThatUDFVerboseSetToFalseByDefault() throws IOException { - DataBag bag = BagFactory.getInstance().newDefaultBag(); - - Schema objectSchema = new Schema(); - Tuple objectTuple = TupleFactory.getInstance().newTuple(); - addToTuple("key", DataType.CHARARRAY, "123456", objectSchema, objectTuple); - addToTuple("value", DataType.INTEGER, 123456, objectSchema, objectTuple); - bag.add(objectTuple); - - objectSchema = new Schema(); - objectTuple = TupleFactory.getInstance().newTuple(); - addToTuple("key", DataType.CHARARRAY, "234567", objectSchema, objectTuple); - addToTuple("value", DataType.INTEGER, 234567, objectSchema, objectTuple); - bag.add(objectTuple); - - Schema schema = new Schema(); - Tuple tuple = TupleFactory.getInstance().newTuple(); - addToTuple("bag", DataType.BAG, bag, objectSchema, schema, tuple); - - VespaDocumentOperation docOp = new VespaDocumentOperation("docid=7654321", "bag-as-map-fields=bag"); - docOp.setInputSchema(schema); - String json = docOp.exec(tuple); - - assertEquals("", outContent.toString()); - } - - private void addToTuple(String alias, byte type, Object value, Schema schema, Tuple tuple) { - schema.add(new Schema.FieldSchema(alias, type)); - tuple.append(value); - } - - - private void addToTuple(String alias, byte type, Object value, Schema schemaInField, Schema schema, Tuple tuple) - throws FrontendException { - schema.add(new Schema.FieldSchema(alias, schemaInField, type)); - tuple.append(value); - } - - private void addToBagWithSchema(String alias, byte type, Tuple value, Schema schemaInField, Schema schema,DataBag bag) - throws FrontendException { - schema.add(new Schema.FieldSchema(alias, schemaInField, type)); - bag.add(value); - } -} diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java deleted file mode 100644 index a0b549a737f..00000000000 --- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hadoop.pig; - -import com.sun.net.httpserver.HttpServer; -import com.yahoo.vespa.hadoop.util.MockQueryHandler; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.pig.ExecType; -import org.apache.pig.PigServer; -import org.apache.pig.data.Tuple; -import org.junit.jupiter.api.Test; - -import java.net.InetSocketAddress; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -public class VespaQueryTest { - - @Test - public void requireThatQueriesAreReturnedCorrectly() throws Exception { - runQueryTest("src/test/pig/query.pig", createQueryHandler(""), 18901); - } - - @Test - public void requireThatQueriesAreReturnedCorrectlyWithAlternativeJsonRoot() throws Exception { - runQueryTest("src/test/pig/query_alt_root.pig", createQueryHandler("children"), 18902); - } - - private void runQueryTest(String script, MockQueryHandler queryHandler, int port) throws Exception { - final String endpoint = "http://localhost:" + port; - - HttpServer server = HttpServer.create(new InetSocketAddress(port), 0); - server.createContext("/", queryHandler); - server.start(); - - PigServer ps = setup(script, endpoint); - - Iterator<Tuple> recommendations = ps.openIterator("recommendations"); - while (recommendations.hasNext()) { - Tuple tuple = recommendations.next(); - - String userid = (String) tuple.get(0); - Integer rank = (Integer) tuple.get(1); - String docid = (String) tuple.get(2); - Double relevance = (Double) tuple.get(3); - String fieldId = (String) tuple.get(4); - String fieldContent = (String) tuple.get(5); - - MockQueryHandler.MockQueryHit hit = queryHandler.getHit(userid, rank); - assertEquals(docid, hit.id); - assertEquals(relevance, hit.relevance, 1e-3); - assertEquals(fieldId, hit.fieldId); - assertEquals(fieldContent, hit.fieldContent); - } - - if (server != null) { - server.stop(0); - } - - } - - private PigServer setup(String script, String endpoint) throws Exception { - Configuration conf = new HdfsConfiguration(); - Map<String, String> parameters = new HashMap<>(); - parameters.put("ENDPOINT", endpoint); - - PigServer ps = new PigServer(ExecType.LOCAL, conf); - ps.setBatchOn(); - ps.registerScript(script, parameters); - - return ps; - } - - private MockQueryHandler createQueryHandler(String childNode) { - MockQueryHandler queryHandler = new MockQueryHandler(childNode); - - List<String> userIds = Arrays.asList("5", "104", "313"); - - int hitsPerUser = 3; - for (int i = 0; i < hitsPerUser * userIds.size(); ++i) { - String id = "" + (i+1); - String userId = userIds.get(i / hitsPerUser); - queryHandler.newHit(). - setId("id::::" + id). - setRelevance(1.0 - (i % hitsPerUser) * 0.1). - setFieldSddocname("doctype"). - setFieldId("" + id). - setFieldDate("2016060" + id). - setFieldContent("Content for user " + userId + " hit " + i % hitsPerUser + "..."). - add(userId); - } - - return queryHandler; - } - -} diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java deleted file mode 100644 index 3183c770bc7..00000000000 --- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java +++ /dev/null @@ -1,106 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hadoop.pig; - -import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.mapred.Counters; -import org.apache.pig.ExecType; -import org.apache.pig.PigServer; -import org.apache.pig.backend.executionengine.ExecJob; -import org.apache.pig.tools.pigstats.JobStats; -import org.apache.pig.tools.pigstats.PigStats; -import org.apache.pig.tools.pigstats.mapreduce.MRJobStats; -import org.junit.jupiter.api.Test; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -public class VespaStorageTest { - - @Test - public void requireThatPremadeOperationsFeedSucceeds() throws Exception { - assertAllDocumentsOk("src/test/pig/feed_operations.pig"); - } - - - @Test - public void requireThatPremadeMultilineOperationsFeedSucceeds() throws Exception { - assertAllDocumentsOk("src/test/pig/feed_multiline_operations.pig"); - } - - - @Test - public void requireThatPremadeOperationsWithJsonLoaderFeedSucceeds() throws Exception { - assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig"); - } - - @Test - public void requireThatPremadeOperationsWithJsonLoaderFeedAndNonLegacyClientSucceeds() throws Exception { - Configuration conf = new HdfsConfiguration(); - conf.set(VespaConfiguration.USE_SSL, Boolean.TRUE.toString()); - assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig", conf); - } - - @Test - public void requireThatCreateOperationsFeedSucceeds() throws Exception { - assertAllDocumentsOk("src/test/pig/feed_create_operations.pig"); - } - - - @Test - public void requireThatCreateOperationsShortFormFeedSucceeds() throws Exception { - assertAllDocumentsOk("src/test/pig/feed_create_operations_short_form.pig"); - } - - - @Test - public void requireThatFeedVisitDataSucceeds() throws Exception { - assertAllDocumentsOk("src/test/pig/feed_visit_data.pig"); - } - - - private PigServer setup(String script, Configuration conf) throws Exception { - if (conf == null) { - conf = new HdfsConfiguration(); - } - conf.setIfUnset(VespaConfiguration.DRYRUN, "true"); - conf.setIfUnset(VespaConfiguration.ENDPOINT, "dummy-endpoint"); - - // Parameter substitutions - can also be set by configuration - Map<String, String> parameters = new HashMap<>(); - parameters.put("ENDPOINT", "endpoint-does-not-matter-in-dryrun,another-endpoint-that-does-not-matter"); - - PigServer ps = new PigServer(ExecType.LOCAL, conf); - ps.setBatchOn(); - ps.registerScript(script, parameters); - - return ps; - } - - - private void assertAllDocumentsOk(String script) throws Exception { - assertAllDocumentsOk(script, null); - } - - - private void assertAllDocumentsOk(String script, Configuration conf) throws Exception { - PigServer ps = setup(script, conf); - List<ExecJob> jobs = ps.executeBatch(); - PigStats stats = jobs.get(0).getStatistics(); - for (JobStats js : stats.getJobGraph()) { - Counters hadoopCounters = ((MRJobStats)js).getHadoopCounters(); - assertNotNull(hadoopCounters); - VespaCounters counters = VespaCounters.get(hadoopCounters); - assertEquals(10, counters.getDocumentsSent()); - assertEquals(0, counters.getDocumentsFailed()); - assertEquals(10, counters.getDocumentsOk()); - } - } - -} diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java deleted file mode 100644 index 64c160ea14c..00000000000 --- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java +++ /dev/null @@ -1,219 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hadoop.util; - -import com.fasterxml.jackson.core.JsonEncoding; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonGenerator; -import com.sun.net.httpserver.HttpExchange; -import com.sun.net.httpserver.HttpHandler; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.net.URI; -import java.net.URLDecoder; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class MockQueryHandler implements HttpHandler { - - private final Map<String, List<MockQueryHit>> hitMap; - private final String childNode; - - public MockQueryHandler(String childNode) { - this.hitMap = new HashMap<>(); - this.childNode = childNode; - } - - public void handle(HttpExchange t) throws IOException { - URI uri = t.getRequestURI(); - String query = uri.getQuery(); - String response = null; - - // Parse query - extract "query" element - if (query != null) { - String params[] = query.split("[&]"); - for (String param : params) { - int i = param.indexOf('='); - String name = param.substring(0, i); - String value = URLDecoder.decode(param.substring(i + 1), "UTF-8"); - - if ("query".equalsIgnoreCase(name)) { - response = getResponse(URLDecoder.decode(param.substring(i + 1), "UTF-8")); - } - } - } - - t.sendResponseHeaders(200, response == null ? 0 : response.length()); - OutputStream os = t.getResponseBody(); - os.write(response == null ? "".getBytes() : response.getBytes()); - os.close(); - - } - - public MockQueryHit getHit(String query, Integer rank) { - if (!hitMap.containsKey(query)) { - return null; - } - if (rank >= hitMap.get(query).size()) { - return null; - } - return hitMap.get(query).get(rank); - } - - public MockQueryHit newHit() { - return new MockQueryHit(this); - } - - public void addHit(String query, MockQueryHit hit) { - if (!hitMap.containsKey(query)) { - hitMap.put(query, new ArrayList<>()); - } - hitMap.get(query).add(hit); - } - - private String getResponse(String query) throws IOException { - List<MockQueryHit> hits = hitMap.get(query); - if (hits == null) { - return null; - } - - ByteArrayOutputStream out = new ByteArrayOutputStream(); - JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8); - - writeResultStart(g, hits.size()); - for (MockQueryHit hit : hits) { - writeHit(g, hit); - } - writeResultsEnd(g); - g.close(); - - return out.toString(); - } - - private void writeHit(JsonGenerator g, MockQueryHit hit) throws IOException { - g.writeStartObject(); - - g.writeFieldName("id"); - g.writeString(hit.id); - - g.writeFieldName("relevance"); - g.writeNumber(hit.relevance); - - g.writeFieldName("fields"); - g.writeStartObject(); - - g.writeFieldName("sddocname"); - g.writeString(hit.fieldSddocname); - - g.writeFieldName("date"); - g.writeString(hit.fieldDate); - - g.writeFieldName("content"); - g.writeString(hit.fieldContent); - - g.writeFieldName("id"); - g.writeString(hit.fieldId); - - g.writeEndObject(); - g.writeEndObject(); - } - - private void writeResultStart(JsonGenerator g, int count) throws IOException { - g.writeStartObject(); - g.writeFieldName("root"); - - g.writeStartObject(); - - g.writeFieldName("id"); - g.writeString("toplevel"); - - g.writeFieldName("relevance"); - g.writeNumber(1); - - g.writeFieldName("fields"); - g.writeStartObject(); - g.writeFieldName("totalCount"); - g.writeNumber(count); - g.writeEndObject(); - - g.writeFieldName("coverage"); - g.writeStartObject(); - g.writeFieldName("coverage"); - g.writeNumber(100); - // ... more stuff here usually - g.writeEndObject(); - - g.writeFieldName("children"); - g.writeStartArray(); - - if (!childNode.isEmpty()) { - g.writeStartObject(); - g.writeFieldName(childNode); - g.writeStartArray(); - } - } - - private void writeResultsEnd(JsonGenerator g) throws IOException { - if (!childNode.isEmpty()) { - g.writeEndArray(); - g.writeEndObject(); - } - g.writeEndArray(); - g.writeEndObject(); - g.writeEndObject(); - } - - public static class MockQueryHit { - - private final MockQueryHandler handler; - - public String id; - public Double relevance; - public String fieldSddocname; - public String fieldDate; - public String fieldContent; - public String fieldId; - - private MockQueryHit(MockQueryHandler handler) { - this.handler = handler; - } - - public void add(String query) { - handler.addHit(query, this); - } - - public MockQueryHit setId(String id) { - this.id = id; - return this; - } - - public MockQueryHit setRelevance(Double relevance) { - this.relevance = relevance; - return this; - } - - public MockQueryHit setFieldSddocname(String fieldSddocname) { - this.fieldSddocname = fieldSddocname; - return this; - } - - public MockQueryHit setFieldDate(String fieldDate) { - this.fieldDate = fieldDate; - return this; - } - - public MockQueryHit setFieldContent(String fieldContent) { - this.fieldContent = fieldContent; - return this; - } - - public MockQueryHit setFieldId(String fieldId) { - this.fieldId = fieldId; - return this; - } - } - -} diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java deleted file mode 100644 index b4ccbdf2183..00000000000 --- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hadoop.util; - -import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools; -import org.apache.pig.data.DataType; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; -import org.apache.pig.impl.logicalLayer.schema.Schema; -import org.junit.jupiter.api.Test; - -import java.io.IOException; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -public class TupleToolsTest { - - @Test - public void requireThatTupleToStringHandlesSimpleTypes() throws IOException { - Schema schema = new Schema(); - Tuple tuple = TupleFactory.getInstance().newTuple(); - - addToTuple("id", DataType.CHARARRAY, "123", schema, tuple); - addToTuple("rank", DataType.INTEGER, 1, schema, tuple); - - String template = "Id is <id> and rank is <rank>"; - String result = TupleTools.toString(schema, tuple, template); - - assertEquals("Id is 123 and rank is 1", result); - } - - - private void addToTuple(String alias, byte type, Object value, Schema schema, Tuple tuple) { - schema.add(new Schema.FieldSchema(alias, type)); - tuple.append(value); - } - - @Test - public void requireThatTupleToStringHandlesStringCharacters() throws IOException { - Schema schema = new Schema(); - Tuple tuple = TupleFactory.getInstance().newTuple(); - - addToTuple("id", DataType.CHARARRAY, "_!@#$%^&*()", schema, tuple); - addToTuple("rank", DataType.INTEGER, 1, schema, tuple); - - String template = "Id is <id> and rank is <rank>"; - String result = TupleTools.toString(schema, tuple, template); - - assertEquals("Id is _!@#$%^&*() and rank is 1", result); - } - -} diff --git a/vespa-hadoop/src/test/pig/feed_create_operations.pig b/vespa-hadoop/src/test/pig/feed_create_operations.pig deleted file mode 100644 index 4583c095133..00000000000 --- a/vespa-hadoop/src/test/pig/feed_create_operations.pig +++ /dev/null @@ -1,24 +0,0 @@ --- Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. --- REGISTER vespa-hadoop.jar -- Not needed in tests - --- Create valid Vespa put operations -DEFINE VespaPutOperation - com.yahoo.vespa.hadoop.pig.VespaDocumentOperation( - 'operation=put', - 'docid=id:<application>:metrics::<name>-<date>' - ); - --- By default, VespaStorage assumes it's feeding valid Vespa operations -DEFINE VespaStorage - com.yahoo.vespa.hadoop.pig.VespaStorage(); - --- Load tabular data -metrics = LOAD 'src/test/resources/tabular_data.csv' AS (date:chararray, name:chararray, value:int, application:chararray); - --- Transform tabular data to a Vespa document operation JSON format -metrics = FOREACH metrics GENERATE VespaPutOperation(*); - --- Store into Vespa -STORE metrics INTO '$ENDPOINT' USING VespaStorage(); - - diff --git a/vespa-hadoop/src/test/pig/feed_create_operations_short_form.pig b/vespa-hadoop/src/test/pig/feed_create_operations_short_form.pig deleted file mode 100644 index 0f0e63d843a..00000000000 --- a/vespa-hadoop/src/test/pig/feed_create_operations_short_form.pig +++ /dev/null @@ -1,19 +0,0 @@ --- Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. --- REGISTER vespa-hadoop.jar -- Not needed in tests - --- Transform tabular data to a Vespa document operation JSON format --- as part of storing the data. -DEFINE VespaStorage - com.yahoo.vespa.hadoop.pig.VespaStorage( - 'create-document-operation=true', - 'operation=put', - 'docid=id:<application>:metrics::<name>-<date>' - ); - --- Load tabular data -metrics = LOAD 'src/test/resources/tabular_data.csv' AS (date:chararray, name:chararray, value:int, application:chararray); - --- Store into Vespa -STORE metrics INTO '$ENDPOINT' USING VespaStorage(); - - diff --git a/vespa-hadoop/src/test/pig/feed_multiline_operations.pig b/vespa-hadoop/src/test/pig/feed_multiline_operations.pig deleted file mode 100644 index 1971270cbdc..00000000000 --- a/vespa-hadoop/src/test/pig/feed_multiline_operations.pig +++ /dev/null @@ -1,15 +0,0 @@ --- Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. --- REGISTER vespa-hadoop.jar -- Not needed in tests - --- Define short name for VespaJsonLoader -DEFINE VespaJsonLoader com.yahoo.vespa.hadoop.pig.VespaSimpleJsonLoader(); - --- Define short name for VespaStorage -DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage(); - --- Load data - one column for json data -metrics = LOAD 'src/test/resources/operations_multiline_data.json' USING VespaJsonLoader() AS (data:chararray); - --- Store into Vespa -STORE metrics INTO '$ENDPOINT' USING VespaStorage(); - diff --git a/vespa-hadoop/src/test/pig/feed_operations.pig b/vespa-hadoop/src/test/pig/feed_operations.pig deleted file mode 100644 index 48873fde87a..00000000000 --- a/vespa-hadoop/src/test/pig/feed_operations.pig +++ /dev/null @@ -1,11 +0,0 @@ --- Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. --- REGISTER vespa-hadoop.jar -- Not needed in tests - --- Define short name for VespaStorage -DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage(); - --- Load data - one column for json data -metrics = LOAD 'src/test/resources/operations_data.json' AS (data:chararray); - --- Store into Vespa -STORE metrics INTO '$ENDPOINT' USING VespaStorage(); diff --git a/vespa-hadoop/src/test/pig/feed_operations_with_json_loader.pig b/vespa-hadoop/src/test/pig/feed_operations_with_json_loader.pig deleted file mode 100644 index da58fe3c678..00000000000 --- a/vespa-hadoop/src/test/pig/feed_operations_with_json_loader.pig +++ /dev/null @@ -1,14 +0,0 @@ --- Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. --- REGISTER vespa-hadoop.jar -- Not needed in tests - --- Define short name for VespaJsonLoader -DEFINE VespaJsonLoader com.yahoo.vespa.hadoop.pig.VespaSimpleJsonLoader(); - --- Define short name for VespaStorage -DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage(); - --- Load data - one column for json data -metrics = LOAD 'src/test/resources/operations_data.json' USING VespaJsonLoader() AS (data:chararray); - --- Store into Vespa -STORE metrics INTO '$ENDPOINT' USING VespaStorage(); diff --git a/vespa-hadoop/src/test/pig/feed_operations_xml.pig b/vespa-hadoop/src/test/pig/feed_operations_xml.pig deleted file mode 100644 index 4e5057f4909..00000000000 --- a/vespa-hadoop/src/test/pig/feed_operations_xml.pig +++ /dev/null @@ -1,11 +0,0 @@ --- Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. --- REGISTER vespa-hadoop.jar -- Not needed in tests - --- Define short name for VespaStorage -DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage(); - --- Load data - one column for xml data -data = LOAD 'src/test/resources/operations_data.xml' AS (data:chararray); - --- Store into Vespa -STORE data INTO '$ENDPOINT' USING VespaStorage(); diff --git a/vespa-hadoop/src/test/pig/feed_visit_data.pig b/vespa-hadoop/src/test/pig/feed_visit_data.pig deleted file mode 100644 index 59d144b53dc..00000000000 --- a/vespa-hadoop/src/test/pig/feed_visit_data.pig +++ /dev/null @@ -1,12 +0,0 @@ --- Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. --- REGISTER vespa-hadoop.jar -- Not needed in tests - --- Define short name for VespaStorage -DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage(); - --- Load data - one column for json data -metrics = LOAD 'src/test/resources/visit_data.json' AS (data:chararray); - --- Store into Vespa -STORE metrics INTO '$ENDPOINT' USING VespaStorage(); - diff --git a/vespa-hadoop/src/test/pig/query.pig b/vespa-hadoop/src/test/pig/query.pig deleted file mode 100644 index 96caa5cd0c4..00000000000 --- a/vespa-hadoop/src/test/pig/query.pig +++ /dev/null @@ -1,19 +0,0 @@ --- Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. --- REGISTER vespa-hadoop.jar -- Not needed in tests - --- Define Vespa query for retrieving blog posts -DEFINE BlogPostRecommendations - com.yahoo.vespa.hadoop.pig.VespaQuery( - 'query=$ENDPOINT/search?query=<userid>&hits=100', - 'schema=rank:int,id:chararray,relevance:double,fields/id:chararray,fields/content:chararray' - ); - --- Load data from a local file -users = LOAD 'src/test/resources/user_ids.csv' AS (userid:chararray); -users = FILTER users BY userid IS NOT null; - --- Run a set of queries against Vespa -recommendations = FOREACH users GENERATE userid, FLATTEN(BlogPostRecommendations(*)); - --- Output recommendations -DUMP recommendations; diff --git a/vespa-hadoop/src/test/pig/query_alt_root.pig b/vespa-hadoop/src/test/pig/query_alt_root.pig deleted file mode 100644 index 2884b4a600f..00000000000 --- a/vespa-hadoop/src/test/pig/query_alt_root.pig +++ /dev/null @@ -1,20 +0,0 @@ --- Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. --- REGISTER vespa-hadoop.jar -- Not needed in tests - --- Define Vespa query for retrieving blog posts -DEFINE BlogPostRecommendations - com.yahoo.vespa.hadoop.pig.VespaQuery( - 'query=$ENDPOINT/search?query=<userid>&hits=100', - 'rootnode=root/children/children', - 'schema=rank:int,id:chararray,relevance:double,fields/id:chararray,fields/content:chararray' - ); - --- Load data from a local file -users = LOAD 'src/test/resources/user_ids.csv' AS (userid:chararray); -users = FILTER users BY userid IS NOT null; - --- Run a set of queries against Vespa -recommendations = FOREACH users GENERATE userid, FLATTEN(BlogPostRecommendations(*)); - --- Output recommendations -DUMP recommendations; diff --git a/vespa-hadoop/src/test/resources/operations_data.json b/vespa-hadoop/src/test/resources/operations_data.json deleted file mode 100644 index 5af436dbfe7..00000000000 --- a/vespa-hadoop/src/test/resources/operations_data.json +++ /dev/null @@ -1,10 +0,0 @@ -{"put":"id:testapp:metric::clicks-2015110414","fields":{"date":"2015110414","name":"clicks","value":1,"application":"testapp"}} -{"fields":{"date":"2015110416","name":"clicks","value":5,"application":"testapp"},"put":"id:testapp:metric::clicks-2015110416"} -{"put":"id:testapp:metric::clicks-2015110415","fields":{"date":"2015110415","name":"clicks","value":2,"application":"testapp"}} -{"put":"id:testapp:metric::clicks-2015110417","fields":{"date":"2015110417","name":"clicks","value":3,"application":"testapp"}} -{"put":"id:testapp:metric::clicks-2015110418","fields":{"date":"2015110418","name":"clicks","value":6,"application":"testapp"}} -{"put":"id:testapp:metric::clicks-2015110419","fields":{"date":"2015110419","name":"clicks","value":3,"application":"testapp"}} -{"put":"id:testapp:metric::clicks-2015110420","fields":{"date":"2015110420","name":"clicks","value":4,"application":"testapp"}} -{"put":"id:testapp:metric::clicks-2015110421","fields":{"date":"2015110421","name":"clicks","value":2,"application":"testapp"}} -{"fields":{"date":"2015110422","name":"clicks","value":5,"application":"testapp"},"condition":"metrics==0","put":"id:testapp:metric::clicks-2015110422"} -{"put":"id:testapp:metric::clicks-2015110423","fields":{"date":"2015110423","name":"clicks","value":1,"application":"testapp"}} diff --git a/vespa-hadoop/src/test/resources/operations_data.xml b/vespa-hadoop/src/test/resources/operations_data.xml deleted file mode 100644 index db02b6bee73..00000000000 --- a/vespa-hadoop/src/test/resources/operations_data.xml +++ /dev/null @@ -1,14 +0,0 @@ -<?xml version="1.0" encoding="utf-8"?> -<!-- Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. --> -<vespafeed> - <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/a-ha/Scoundrel+Days"> <url>http://music.yahoo.com/a-ha/Scoundrel+Days</url> <title><![CDATA[Scoundrel Days]]></title> <artist><![CDATA[a-ha]]></artist> <year>0</year> <popularity>290</popularity> </document> - <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Accept/Restless+And+Wild"> <url>http://music.yahoo.com/Accept/Restless+And+Wild</url> <title><![CDATA[Restless And Wild]]></title> <artist><![CDATA[Accept]]></artist> <year>0</year> <popularity>75</popularity> </document> - <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Accept/Staying+A+Life"> <url>http://music.yahoo.com/Accept/Staying+A+Life</url> <title><![CDATA[Staying A Life]]></title> <artist><![CDATA[Accept]]></artist> <year>1985</year> <popularity>77</popularity> </document> - <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Alice+In+Chains/Dirt"> <url>http://music.yahoo.com/Alice+In+Chains/Dirt</url> <title><![CDATA[Dirt]]></title> <artist><![CDATA[Alice In Chains]]></artist> <year>1992</year> <popularity>114</popularity> </document> - <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Alice+In+Chains/Live"> <url>http://music.yahoo.com/Alice+In+Chains/Live</url> <title><![CDATA[Live]]></title> <artist><![CDATA[Alice In Chains]]></artist> <year>1990</year> <popularity>363</popularity> </document> - <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Amy+MacDonald/This+Is+The+Life"> <url>http://music.yahoo.com/Amy+MacDonald/This+Is+The+Life</url> <title><![CDATA[This Is The Life]]></title> <artist><![CDATA[Amy MacDonald]]></artist> <year>2007</year> <popularity>355</popularity> </document> - <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Ane+Brun/Duets"> <url>http://music.yahoo.com/Ane+Brun/Duets</url> <title><![CDATA[Duets]]></title> <artist><![CDATA[Ane Brun]]></artist> <year>0</year> <popularity>255</popularity> </document> - <update documenttype="music" documentid="id:music:music::http://music.yahoo.com/bobdylan/BestOf"><assign field="title">The Best of Bob Dylan</assign><add field="tracks"><item>Man Of Constant Sorrow</item></add></update> - <remove documentid="id:music:music::http://music.yahoo.com/Aqpop/Beautifully+Smart" /> - <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Annuals/Be+He+Me"> <url>http://music.yahoo.com/Annuals/Be+He+Me</url> <title><![CDATA[Be He Me]]></title> <artist><![CDATA[Annuals]]></artist> <year>0</year> <popularity>207</popularity> </document> -</vespafeed> diff --git a/vespa-hadoop/src/test/resources/operations_multiline_data.json b/vespa-hadoop/src/test/resources/operations_multiline_data.json deleted file mode 100644 index 2b51698d9b7..00000000000 --- a/vespa-hadoop/src/test/resources/operations_multiline_data.json +++ /dev/null @@ -1,93 +0,0 @@ -[ - { - "put": "id:testapp:metric::clicks-2015110414", - "fields": { - "date": "2015110414", - "name": "clicks", - "value": 1, - "application": "testapp" - } - }, - { - "fields": { - "date": "2015110416", - "name": "clicks", - "value": 5, - "application": "testapp" - }, - "put": "id:testapp:metric::clicks-2015110416" - }, - { - "put": "id:testapp:metric::clicks-2015110415", - "fields": { - "date": "2015110415", - "name": "clicks", - "value": 2, - "application": "testapp" - } - }, - { - "put": "id:testapp:metric::clicks-2015110417", - "fields": { - "date": "2015110417", - "name": "clicks", - "value": 3, - "application": "testapp" - } - }, - { - "put": "id:testapp:metric::clicks-2015110418", - "fields": { - "date": "2015110418", - "name": "clicks", - "value": 6, - "application": "testapp" - } - }, - { - "put": "id:testapp:metric::clicks-2015110419", - "fields": { - "date": "2015110419", - "name": "clicks", - "value": 3, - "application": "testapp" - } - }, - { - "put": "id:testapp:metric::clicks-2015110420", - "fields": { - "date": "2015110420", - "name": "clicks", - "value": 4, - "application": "testapp" - } - }, - { - "put": "id:testapp:metric::clicks-2015110421", - "fields": { - "date": "2015110421", - "name": "clicks", - "value": 2, - "application": "testapp" - } - }, - { - "fields": { - "date": "2015110422", - "name": "clicks", - "value": 5, - "application": "testapp" - }, - "condition": "metrics==0", - "put": "id:testapp:metric::clicks-2015110422" - }, - { - "put": "id:testapp:metric::clicks-2015110423", - "fields": { - "date": "2015110423", - "name": "clicks", - "value": 1, - "application": "testapp" - } - } -] diff --git a/vespa-hadoop/src/test/resources/tabular_data.csv b/vespa-hadoop/src/test/resources/tabular_data.csv deleted file mode 100644 index 541597998e9..00000000000 --- a/vespa-hadoop/src/test/resources/tabular_data.csv +++ /dev/null @@ -1,11 +0,0 @@ -2015110414 clicks 1 testapp -2015110415 clicks 2 testapp -2015110416 clicks 5 testapp -2015110417 clicks 3 testapp -2015110418 clicks 6 testapp -2015110419 clicks 3 testapp -2015110420 clicks 4 testapp -2015110421 clicks 2 testapp -2015110422 clicks 5 testapp -2015110423 clicks 1 testapp - diff --git a/vespa-hadoop/src/test/resources/user_ids.csv b/vespa-hadoop/src/test/resources/user_ids.csv deleted file mode 100644 index 5875a3b9a7c..00000000000 --- a/vespa-hadoop/src/test/resources/user_ids.csv +++ /dev/null @@ -1,4 +0,0 @@ -5 -104 -313 - diff --git a/vespa-hadoop/src/test/resources/visit_data.json b/vespa-hadoop/src/test/resources/visit_data.json deleted file mode 100644 index 947b9326cc8..00000000000 --- a/vespa-hadoop/src/test/resources/visit_data.json +++ /dev/null @@ -1,10 +0,0 @@ -{"id":"id:testapp:metric::clicks-2015110414","fields":{"date":"2015110414","name":"clicks","value":1,"application":"testapp"}} -{"id":"id:testapp:metric::clicks-2015110415","fields":{"date":"2015110415","name":"clicks","value":2,"application":"testapp"}} -{"id":"id:testapp:metric::clicks-2015110416","fields":{"date":"2015110416","name":"clicks","value":4,"application":"testapp"}} -{"id":"id:testapp:metric::clicks-2015110417","fields":{"date":"2015110417","name":"clicks","value":3,"application":"testapp"}} -{"id":"id:testapp:metric::clicks-2015110418","fields":{"date":"2015110418","name":"clicks","value":6,"application":"testapp"}} -{"id":"id:testapp:metric::clicks-2015110419","fields":{"date":"2015110419","name":"clicks","value":3,"application":"testapp"}} -{"id":"id:testapp:metric::clicks-2015110420","fields":{"date":"2015110420","name":"clicks","value":4,"application":"testapp"}} -{"id":"id:testapp:metric::clicks-2015110421","fields":{"date":"2015110421","name":"clicks","value":2,"application":"testapp"}} -{"id":"id:testapp:metric::clicks-2015110422","fields":{"date":"2015110422","name":"clicks","value":7,"application":"testapp"}} -{"id":"id:testapp:metric::clicks-2015110423","fields":{"date":"2015110423","name":"clicks","value":1,"application":"testapp"}}
\ No newline at end of file diff --git a/vespabase/CMakeLists.txt b/vespabase/CMakeLists.txt index ce19dbb56b3..e72f02d5eeb 100644 --- a/vespabase/CMakeLists.txt +++ b/vespabase/CMakeLists.txt @@ -23,7 +23,6 @@ install(FILES ${CMAKE_CURRENT_BINARY_DIR}/conf/default-env.txt DESTINATION conf/ install(DIRECTORY DESTINATION logs/vespa) install(DIRECTORY DESTINATION logs/vespa/access) -install(DIRECTORY DESTINATION tmp/vespa) install(DIRECTORY DESTINATION var/crash) install(DIRECTORY DESTINATION var/db/vespa) install(DIRECTORY DESTINATION var/db/vespa/config_server) @@ -36,6 +35,7 @@ install(DIRECTORY DESTINATION var/db/vespa/search) install(DIRECTORY DESTINATION var/db/vespa/tmp) install(DIRECTORY DESTINATION var/jdisc_container) install(DIRECTORY DESTINATION var/run) +install(DIRECTORY DESTINATION var/tmp/vespa) install(DIRECTORY DESTINATION var/vespa) install(DIRECTORY DESTINATION var/vespa/application) install(DIRECTORY DESTINATION var/vespa/bundlecache) diff --git a/vespabase/src/rhel-prestart.sh b/vespabase/src/rhel-prestart.sh index d6f53046b47..358e9ceccdb 100755 --- a/vespabase/src/rhel-prestart.sh +++ b/vespabase/src/rhel-prestart.sh @@ -124,8 +124,6 @@ fixdir ${VESPA_USER} ${VESPA_GROUP} 755 logs/vespa fixdir ${VESPA_USER} ${VESPA_GROUP} 755 logs/vespa/access fixdir ${VESPA_USER} ${VESPA_GROUP} 755 logs/vespa/configserver fixdir ${VESPA_USER} ${VESPA_GROUP} 755 logs/vespa/search -fixdir ${VESPA_USER} ${VESPA_GROUP} 755 tmp -fixdir ${VESPA_USER} ${VESPA_GROUP} 755 tmp/vespa fixdir ${VESPA_USER} ${VESPA_GROUP} 755 var fixdir ${VESPA_USER} ${VESPA_GROUP} 755 var/crash fixdir ${VESPA_USER} ${VESPA_GROUP} 755 var/db @@ -141,6 +139,8 @@ fixdir ${VESPA_USER} ${VESPA_GROUP} 755 var/db/vespa/search fixdir ${VESPA_USER} ${VESPA_GROUP} 755 var/db/vespa/tmp fixdir ${VESPA_USER} ${VESPA_GROUP} 755 var/jdisc_container fixdir ${VESPA_USER} ${VESPA_GROUP} 755 var/run +fixdir ${VESPA_USER} ${VESPA_GROUP} 755 var/tmp +fixdir ${VESPA_USER} ${VESPA_GROUP} 755 var/tmp/vespa fixdir ${VESPA_USER} ${VESPA_GROUP} 755 var/vespa fixdir ${VESPA_USER} ${VESPA_GROUP} 755 var/vespa/application fixdir ${VESPA_USER} ${VESPA_GROUP} 755 var/vespa/bundlecache diff --git a/vespajlib/abi-spec.json b/vespajlib/abi-spec.json index a22e24aafc2..8a2e68a8d8c 100644 --- a/vespajlib/abi-spec.json +++ b/vespajlib/abi-spec.json @@ -3833,6 +3833,7 @@ "public" ], "methods" : [ + "public void <init>(java.util.function.Supplier)", "public void <init>(java.util.function.Supplier, com.yahoo.yolean.concurrent.Memoized$Closer)", "public static com.yahoo.yolean.concurrent.Memoized of(java.util.function.Supplier)", "public static com.yahoo.yolean.concurrent.Memoized combine(com.yahoo.yolean.concurrent.Memoized, java.util.function.Function, com.yahoo.yolean.concurrent.Memoized$Closer)", diff --git a/vespajlib/src/main/java/com/yahoo/compress/ArchiveStreamReader.java b/vespajlib/src/main/java/com/yahoo/compress/ArchiveStreamReader.java index e65a645f5be..f8faf655415 100644 --- a/vespajlib/src/main/java/com/yahoo/compress/ArchiveStreamReader.java +++ b/vespajlib/src/main/java/com/yahoo/compress/ArchiveStreamReader.java @@ -136,8 +136,8 @@ public class ArchiveStreamReader implements AutoCloseable { // Commons Compress only has limited support for symlinks as they are only detected when the ZIP file is read // through org.apache.commons.compress.archivers.zip.ZipFile. This is not the case in this class, because it must // support reading ZIP files from generic input streams. The check below thus always returns false. - if (entry instanceof ZipArchiveEntry) return ((ZipArchiveEntry) entry).isUnixSymlink(); - if (entry instanceof TarArchiveEntry) return ((TarArchiveEntry) entry).isSymbolicLink(); + if (entry instanceof ZipArchiveEntry zipEntry) return zipEntry.isUnixSymlink(); + if (entry instanceof TarArchiveEntry tarEntry) return tarEntry.isSymbolicLink(); throw new IllegalArgumentException("Unsupported archive entry " + entry.getClass().getSimpleName() + ", cannot check for symbolic link"); } diff --git a/vespajlib/src/main/java/com/yahoo/io/LazyInputStream.java b/vespajlib/src/main/java/com/yahoo/io/LazyInputStream.java new file mode 100644 index 00000000000..3ff7ada6b59 --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/io/LazyInputStream.java @@ -0,0 +1,53 @@ +package com.yahoo.io; + +import java.io.IOException; +import java.io.InputStream; +import java.util.function.Supplier; + +/** + * Input stream wrapping an input stream supplier, which doesn't have content yet at declaration time. + * + * @author jonmv + */ +public class LazyInputStream extends InputStream { + + private Supplier<InputStream> source; + private InputStream delegate; + + public LazyInputStream(Supplier<InputStream> source) { + this.source = source; + } + + private InputStream in() { + if (delegate == null) { + delegate = source.get(); + source = null; + } + return delegate; + } + + @Override + public int read() throws IOException { return in().read(); } + + @Override + public int read(byte[] b, int off, int len) throws IOException { return in().read(b, off, len); } + + @Override + public long skip(long n) throws IOException { return in().skip(n); } + + @Override + public int available() throws IOException { return in().available(); } + + @Override + public void close() throws IOException { in().close(); } + + @Override + public synchronized void mark(int readlimit) { in().mark(readlimit); } + + @Override + public synchronized void reset() throws IOException { in().reset(); } + + @Override + public boolean markSupported() { return in().markSupported(); } + +} diff --git a/vespajlib/src/main/java/com/yahoo/yolean/concurrent/Memoized.java b/vespajlib/src/main/java/com/yahoo/yolean/concurrent/Memoized.java index 8e2b7b7a7eb..ba5ef7bab2d 100644 --- a/vespajlib/src/main/java/com/yahoo/yolean/concurrent/Memoized.java +++ b/vespajlib/src/main/java/com/yahoo/yolean/concurrent/Memoized.java @@ -34,15 +34,23 @@ public class Memoized<T, E extends Exception> implements Supplier<T>, AutoClosea private volatile T wrapped; private Supplier<T> factory; + /** Returns a new Memoized which has no close method. */ + public Memoized(Supplier<T> factory) { + this(factory, __ -> { }); + } + + /** Returns a new Memoized with the given factory and closer. */ public Memoized(Supplier<T> factory, Closer<T, E> closer) { this.factory = requireNonNull(factory); this.closer = requireNonNull(closer); } + /** Returns a generic AutoCloseable Memoized with the given AutoCloseable-supplier. */ public static <T extends AutoCloseable> Memoized<T, ?> of(Supplier<T> factory) { return new Memoized<>(factory, AutoCloseable::close); } + /** Composes the given memoized with a function taking its output as an argument to produce a new Memoized, with the given closer. */ public static <T, U, E extends Exception> Memoized<U, E> combine(Memoized<T, ? extends E> inner, Function<T, U> outer, Closer<U, ? extends E> closer) { return new Memoized<>(() -> outer.apply(inner.get()), compose(closer, inner::close)); } diff --git a/vespalib/src/tests/coro/lazy/lazy_test.cpp b/vespalib/src/tests/coro/lazy/lazy_test.cpp index ead5e8e6427..ec27bf195ec 100644 --- a/vespalib/src/tests/coro/lazy/lazy_test.cpp +++ b/vespalib/src/tests/coro/lazy/lazy_test.cpp @@ -1,10 +1,11 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/coro/lazy.h> -#include <vespa/vespalib/coro/sync_wait.h> +#include <vespa/vespalib/coro/completion.h> #include <vespa/vespalib/coro/schedule.h> #include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/require.h> +#include <vespa/vespalib/util/gate.h> #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/gtest/gtest.h> #include <mutex> @@ -12,7 +13,9 @@ #include <thread> using vespalib::Executor; +using vespalib::Gate; using vespalib::coro::Lazy; +using vespalib::coro::Received; using vespalib::coro::ScheduleFailedException; using vespalib::coro::schedule; using vespalib::coro::sync_wait; @@ -56,7 +59,7 @@ Lazy<std::pair<bool,T>> try_schedule_on(Executor &executor, Lazy<T> value) { std::cerr << "switching from thread " << std::this_thread::get_id() << std::endl; bool accepted = co_await try_schedule(executor); std::cerr << "........... to thread " << std::this_thread::get_id() << std::endl; - co_return std::make_pair(accepted, co_await value); + co_return std::make_pair(accepted, co_await std::move(value)); } template <typename T> @@ -64,18 +67,18 @@ Lazy<T> schedule_on(Executor &executor, Lazy<T> value) { std::cerr << "switching from thread " << std::this_thread::get_id() << std::endl; co_await schedule(executor); std::cerr << "........... to thread " << std::this_thread::get_id() << std::endl; - co_return co_await value; + co_return co_await std::move(value); } TEST(LazyTest, simple_lazy_value) { auto lazy = make_lazy(42); - auto result = sync_wait(lazy); + auto result = sync_wait(std::move(lazy)); EXPECT_EQ(result, 42); } TEST(LazyTest, async_sum_of_async_values) { auto lazy = async_add_values(10, 20); - auto result = sync_wait(lazy); + auto result = sync_wait(std::move(lazy)); EXPECT_EQ(result, 30); } @@ -83,13 +86,13 @@ TEST(LazyTest, async_sum_of_external_async_values) { auto a = make_lazy(100); auto b = make_lazy(200); auto lazy = async_sum(std::move(a), std::move(b)); - auto result = sync_wait(lazy); + auto result = sync_wait(std::move(lazy)); EXPECT_EQ(result, 300); } TEST(LazyTest, extract_rvalue_from_lazy_in_coroutine) { auto lazy = extract_rvalue(); - auto result = sync_wait(lazy); + auto result = sync_wait(std::move(lazy)); EXPECT_EQ(result, 123); } @@ -110,7 +113,7 @@ TEST(LazyTest, calculate_result_in_another_thread) { TEST(LazyTest, exceptions_are_propagated) { vespalib::ThreadStackExecutor executor(1, 128_Ki); auto lazy = try_schedule_on(executor, forward_value(will_throw())); - EXPECT_THROW(sync_wait(lazy), vespalib::RequireFailedException); + EXPECT_THROW(sync_wait(std::move(lazy)), vespalib::RequireFailedException); } TEST(LazyTest, not_able_to_switch_thread_if_executor_is_shut_down) { @@ -120,7 +123,49 @@ TEST(LazyTest, not_able_to_switch_thread_if_executor_is_shut_down) { EXPECT_EQ(result.first, false); EXPECT_EQ(result.second, 7); auto lazy = schedule_on(executor, make_lazy(8)); - EXPECT_THROW(sync_wait(lazy), ScheduleFailedException); + EXPECT_THROW(sync_wait(std::move(lazy)), ScheduleFailedException); +} + +TEST(LazyTest, async_wait_with_lambda) { + Gate gate; + Received<int> result; + vespalib::ThreadStackExecutor executor(1, 128_Ki); + auto lazy = schedule_on(executor, make_lazy(7)); + async_wait(std::move(lazy), [&](auto res) + { + result = res; + gate.countDown(); + }); + gate.await(); + EXPECT_EQ(result.get_value(), 7); +} + +TEST(LazyTest, async_wait_with_error) { + Gate gate; + Received<int> result; + vespalib::ThreadStackExecutor executor(1, 128_Ki); + auto lazy = schedule_on(executor, will_throw()); + async_wait(std::move(lazy), [&](auto res) + { + result = res; + gate.countDown(); + }); + gate.await(); + EXPECT_THROW(result.get_value(), vespalib::RequireFailedException); +} + +TEST(LazyTest, async_wait_with_move_only_result) { + Gate gate; + Received<std::unique_ptr<int>> result; + vespalib::ThreadStackExecutor executor(1, 128_Ki); + auto lazy = schedule_on(executor, move_only_int()); + async_wait(std::move(lazy), [&](auto res) + { + result = std::move(res); + gate.countDown(); + }); + gate.await(); + EXPECT_EQ(*(result.get_value()), 123); } GTEST_MAIN_RUN_ALL_TESTS() diff --git a/vespalib/src/vespa/vespalib/coro/completion.h b/vespalib/src/vespa/vespalib/coro/completion.h new file mode 100644 index 00000000000..f323d8c68bf --- /dev/null +++ b/vespalib/src/vespa/vespalib/coro/completion.h @@ -0,0 +1,104 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "lazy.h" +#include "detached.h" +#include "received.h" + +#include <coroutine> +#include <exception> +#include <future> +#include <type_traits> + +namespace vespalib::coro { + +// Resume/start the coroutine responsible for calculating the result +// and signal the receiver when it completes or fails. Note that the +// detached coroutine will own both the coroutine calculating the +// result and the receiver that is later notified of the result. The +// detached coroutine will automatically self-destroy when it returns, +// thereby also destroying the value and receiver. This is the +// fundamental building block used to adapt the asynchronous result of +// a coroutine with external code. This also closely models abstract +// execution where the coroutine represented by Lazy<T> is the +// sender. Execution parameters can be encapsulated inside Lazy<T> +// using composition (for example which executor should run the +// coroutine). + +template <typename T, typename R> +Detached connect_resume(Lazy<T> value, R receiver) { + try { + receiver.set_value(co_await std::move(value)); + } catch (...) { + receiver.set_error(std::current_exception()); + } +} + +// replace Lazy<T> with std::future<T> to be able to synchronously +// wait for its completion. Implemented in terms of connect_resume. + +template <typename T> +std::future<T> make_future(Lazy<T> value) { + struct receiver { + std::promise<T> promise; + receiver() : promise() {} + void set_value(T value) { + promise.set_value(std::move(value)); + } + void set_error(std::exception_ptr error) { + promise.set_exception(error); + } + }; + receiver my_receiver; + auto future = my_receiver.promise.get_future(); + connect_resume(std::move(value), std::move(my_receiver)); + return future; +} + +// Create a receiver from a function object (typically a lambda +// closure) that takes a received value (stored receiver result) as +// its only parameter. + +template <typename T, typename F> +auto make_receiver(F &&f) { + struct receiver { + Received<T> result; + std::decay_t<F> fun; + receiver(F &&f) + : result(), fun(std::forward<F>(f)) {} + void set_value(T value) { + result.set_value(std::move(value)); + fun(std::move(result)); + } + void set_error(std::exception_ptr why) { + result.set_error(why); + fun(std::move(result)); + } + }; + return receiver(std::forward<F>(f)); +} + +/** + * Wait for a lazy value to be calculated synchronously. Make sure the + * thread waiting is not needed in the calculation of the value, or + * you will end up with a deadlock. + **/ +template <typename T> +T sync_wait(Lazy<T> value) { + return make_future(std::move(value)).get(); +} + +/** + * Wait for a lazy value to be calculated asynchronously; the provided + * callback will be called with a Received<T> when the Lazy<T> is + * done. Both the callback itself and the Lazy<T> will be destructed + * afterwards; cleaning up the coroutine tree representing the + * calculation. + **/ +template <typename T, typename F> +void async_wait(Lazy<T> value, F &&f) { + connect_resume(std::move(value), make_receiver<T>(std::forward<F>(f))); +} + +} diff --git a/vespalib/src/vespa/vespalib/coro/lazy.h b/vespalib/src/vespa/vespalib/coro/lazy.h index 5a10c05bc24..144b5c945f0 100644 --- a/vespalib/src/vespa/vespalib/coro/lazy.h +++ b/vespalib/src/vespa/vespalib/coro/lazy.h @@ -64,6 +64,7 @@ public: } return std::move(*value); } + ~promise_type(); }; using Handle = std::coroutine_handle<promise_type>; @@ -108,4 +109,7 @@ public: } }; +template<typename T> +Lazy<T>::promise_type::~promise_type() = default; + } diff --git a/vespalib/src/vespa/vespalib/coro/received.h b/vespalib/src/vespa/vespalib/coro/received.h new file mode 100644 index 00000000000..4f2efddcfa1 --- /dev/null +++ b/vespalib/src/vespa/vespalib/coro/received.h @@ -0,0 +1,46 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <variant> +#include <exception> +#include <stdexcept> + +namespace vespalib::coro { + +struct UnavailableResultException : std::runtime_error { + using std::runtime_error::runtime_error; +}; + +/** + * Simple value wrapper that stores the result observed by a receiver + * (value/error/done). A receiver is the continuation of an + * asynchronous operation in the world of executors. + **/ +template <std::movable T> +class Received { +private: + std::variant<std::exception_ptr,T> _value; +public: + Received() : _value() {} + void set_value(T value) { _value.template emplace<1>(std::move(value)); } + void set_error(std::exception_ptr exception) { _value.template emplace<0>(exception); } + void set_done() { _value.template emplace<0>(nullptr); } + bool has_value() const { return (_value.index() == 1); } + bool has_error() const { return (_value.index() == 0) && bool(std::get<0>(_value)); } + bool was_canceled() const { return !has_value() && !has_error(); } + std::exception_ptr get_error() const { return has_error() ? std::get<0>(_value) : std::exception_ptr(); } + T get_value() { + if (_value.index() == 1) { + return std::move(std::get<1>(_value)); + } else { + if (auto ex = std::get<0>(_value)) { + std::rethrow_exception(ex); + } else { + throw UnavailableResultException("tried to access the result of a canceled operation"); + } + } + } +}; + +} diff --git a/vespalib/src/vespa/vespalib/coro/schedule.h b/vespalib/src/vespa/vespalib/coro/schedule.h index 6dfa5b9536c..71a384f356f 100644 --- a/vespalib/src/vespa/vespalib/coro/schedule.h +++ b/vespalib/src/vespa/vespalib/coro/schedule.h @@ -5,6 +5,7 @@ #include <vespa/vespalib/util/executor.h> #include <coroutine> #include <exception> +#include <stdexcept> namespace vespalib::coro { diff --git a/vespalib/src/vespa/vespalib/coro/sync_wait.h b/vespalib/src/vespa/vespalib/coro/sync_wait.h deleted file mode 100644 index bdea2dfc7f0..00000000000 --- a/vespalib/src/vespa/vespalib/coro/sync_wait.h +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include "detached.h" -#include "lazy.h" -#include <vespa/vespalib/util/gate.h> - -#include <coroutine> -#include <exception> - -namespace vespalib::coro { - -template <typename T, typename S> -Detached signal_when_done(Lazy<T> &value, S &sink) { - try { - sink(co_await value); - } catch (...) { - sink(std::current_exception()); - } -} - -/** - * Wait for a lazy value to be calculated (note that waiting for a - * value will also start calculating it). Make sure the thread waiting - * is not needed in the calculation of the value, or you will end up - * with a deadlock. - **/ -template <typename T> -T &sync_wait(Lazy<T> &value) { - struct MySink { - Gate gate; - T *result; - std::exception_ptr exception; - void operator()(T &result_in) { - result = &result_in; - gate.countDown(); - } - void operator()(std::exception_ptr exception_in) { - exception = exception_in; - gate.countDown(); - } - MySink() : gate(), result(nullptr), exception() {} - }; - MySink sink; - signal_when_done(value, sink); - sink.gate.await(); - if (sink.exception) { - std::rethrow_exception(sink.exception); - } - return *sink.result; -} - -template <typename T> -T &&sync_wait(Lazy<T> &&value) { - return std::move(sync_wait(value)); -} - -} |