aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.gitignore2
-rw-r--r--config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java2
-rwxr-xr-xconfig-proxy/src/main/sh/vespa-config-ctl.sh2
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java18
-rwxr-xr-xconfigserver/src/main/sh/start-configserver2
-rwxr-xr-xcontainer-disc/src/main/sh/vespa-start-container-daemon.sh2
-rw-r--r--controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/application/v4/model/DeploymentData.java41
-rw-r--r--controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/deployment/ApplicationStore.java25
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java69
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackage.java27
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java265
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackage.java100
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ZipEntries.java33
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunner.java13
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java46
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/proxy/ConfigServerRestExecutorImpl.java2
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java4
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageTest.java263
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackageTest.java57
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ZipEntriesTest.java50
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunnerTest.java2
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ApplicationStoreMock.java15
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ConfigServerMock.java13
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiCloudTest.java17
-rw-r--r--dist/vespa.spec12
-rw-r--r--hosted-api/src/main/java/ai/vespa/hosted/api/MultiPartStreamer.java24
-rw-r--r--http-client/pom.xml6
-rw-r--r--http-client/src/main/java/ai/vespa/hosted/client/AbstractHttpClient.java2
-rw-r--r--http-client/src/main/java/ai/vespa/hosted/client/HttpClient.java6
-rwxr-xr-xlogserver/bin/logserver-start.sh2
-rw-r--r--node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java2
-rw-r--r--node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImplTest.java10
-rw-r--r--node-repository/src/main/java/com/yahoo/vespa/hosted/provision/Node.java1
-rw-r--r--parent/pom.xml167
-rw-r--r--pom.xml1
-rw-r--r--standalone-container/src/main/java/com/yahoo/container/standalone/StandaloneContainerApplication.java2
-rw-r--r--vespa-hadoop/OWNERS1
-rw-r--r--vespa-hadoop/README4
-rw-r--r--vespa-hadoop/abi-spec.json1
-rw-r--r--vespa-hadoop/pom.xml166
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputCommitter.java37
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java60
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java153
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaSimpleJsonInputFormat.java99
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/package-info.java10
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java70
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java194
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaCounters.java105
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java102
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaQuerySchema.java114
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/package-info.java10
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java669
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java114
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaSimpleJsonLoader.java63
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java178
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/package-info.java10
-rw-r--r--vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java200
-rw-r--r--vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java633
-rw-r--r--vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java101
-rw-r--r--vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java106
-rw-r--r--vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java219
-rw-r--r--vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java51
-rw-r--r--vespa-hadoop/src/test/pig/feed_create_operations.pig24
-rw-r--r--vespa-hadoop/src/test/pig/feed_create_operations_short_form.pig19
-rw-r--r--vespa-hadoop/src/test/pig/feed_multiline_operations.pig15
-rw-r--r--vespa-hadoop/src/test/pig/feed_operations.pig11
-rw-r--r--vespa-hadoop/src/test/pig/feed_operations_with_json_loader.pig14
-rw-r--r--vespa-hadoop/src/test/pig/feed_operations_xml.pig11
-rw-r--r--vespa-hadoop/src/test/pig/feed_visit_data.pig12
-rw-r--r--vespa-hadoop/src/test/pig/query.pig19
-rw-r--r--vespa-hadoop/src/test/pig/query_alt_root.pig20
-rw-r--r--vespa-hadoop/src/test/resources/operations_data.json10
-rw-r--r--vespa-hadoop/src/test/resources/operations_data.xml14
-rw-r--r--vespa-hadoop/src/test/resources/operations_multiline_data.json93
-rw-r--r--vespa-hadoop/src/test/resources/tabular_data.csv11
-rw-r--r--vespa-hadoop/src/test/resources/user_ids.csv4
-rw-r--r--vespa-hadoop/src/test/resources/visit_data.json10
-rw-r--r--vespabase/CMakeLists.txt2
-rwxr-xr-xvespabase/src/rhel-prestart.sh4
-rw-r--r--vespajlib/abi-spec.json1
-rw-r--r--vespajlib/src/main/java/com/yahoo/compress/ArchiveStreamReader.java4
-rw-r--r--vespajlib/src/main/java/com/yahoo/io/LazyInputStream.java53
-rw-r--r--vespajlib/src/main/java/com/yahoo/yolean/concurrent/Memoized.java8
-rw-r--r--vespalib/src/tests/coro/lazy/lazy_test.cpp63
-rw-r--r--vespalib/src/vespa/vespalib/coro/completion.h104
-rw-r--r--vespalib/src/vespa/vespalib/coro/lazy.h4
-rw-r--r--vespalib/src/vespa/vespalib/coro/received.h46
-rw-r--r--vespalib/src/vespa/vespalib/coro/schedule.h1
-rw-r--r--vespalib/src/vespa/vespalib/coro/sync_wait.h59
89 files changed, 1062 insertions, 4349 deletions
diff --git a/.gitignore b/.gitignore
index c77fe07eee7..adc898a7266 100644
--- a/.gitignore
+++ b/.gitignore
@@ -40,12 +40,10 @@ Testing
/build.ninja
/rules.ninja
*_test_app
-/hadoop/dependency-reduced-pom.xml
/mvnw
/mvnw.cmd
/mvnwDebug
/mvnwDebug.cmd
-/vespa-hadoop/dependency-reduced-pom.xml
.preprocessed/
.DS_Store
install_manifest.txt
diff --git a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java
index c9ec9780fad..f69bd5fa475 100644
--- a/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java
+++ b/config-model-api/src/main/java/com/yahoo/config/application/api/DeploymentSpec.java
@@ -194,7 +194,7 @@ public class DeploymentSpec {
public DeploymentInstanceSpec requireInstance(InstanceName name) {
Optional<DeploymentInstanceSpec> instance = instance(name);
if (instance.isEmpty())
- throw new IllegalArgumentException("No instance '" + name + "' in deployment.xml'. Instances: " +
+ throw new IllegalArgumentException("No instance '" + name + "' in deployment.xml. Instances: " +
instances().stream().map(spec -> spec.name().toString()).collect(Collectors.joining(",")));
return instance.get();
}
diff --git a/config-proxy/src/main/sh/vespa-config-ctl.sh b/config-proxy/src/main/sh/vespa-config-ctl.sh
index 63aaf11280f..be141561b07 100755
--- a/config-proxy/src/main/sh/vespa-config-ctl.sh
+++ b/config-proxy/src/main/sh/vespa-config-ctl.sh
@@ -122,7 +122,7 @@ case $1 in
java ${jvmopts} \
-XX:+ExitOnOutOfMemoryError $(getJavaOptionsIPV46) \
-Dproxyconfigsources="${configsources}" \
- -Djava.io.tmpdir=${VESPA_HOME}/tmp \
+ -Djava.io.tmpdir=${VESPA_HOME}/var/tmp \
${userargs} \
-XX:ActiveProcessorCount=2 \
-cp $cp com.yahoo.vespa.config.proxy.ProxyServer 19090
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
index b36e8d2d37c..d05640ef818 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
@@ -216,19 +216,6 @@ public class SessionRepository {
return List.copyOf(localSessionCache.values());
}
- public Set<LocalSession> getLocalSessionsFromFileSystem() {
- File[] sessions = tenantFileSystemDirs.sessionsPath().listFiles(sessionApplicationsFilter);
- if (sessions == null) return Set.of();
-
- Set<LocalSession> sessionIds = new HashSet<>();
- for (File session : sessions) {
- long sessionId = Long.parseLong(session.getName());
- LocalSession localSession = getSessionFromFile(sessionId);
- sessionIds.add(localSession);
- }
- return sessionIds;
- }
-
private LocalSession getSessionFromFile(long sessionId) {
SessionZooKeeperClient sessionZKClient = createSessionZooKeeperClient(sessionId);
File sessionDir = getAndValidateExistingSessionAppDir(sessionId);
@@ -377,10 +364,7 @@ public class SessionRepository {
}
public int deleteExpiredRemoteSessions(Clock clock) {
- Duration expiryTime = configserverConfig.hostedVespa()
- ? sessionLifetime.multipliedBy(2)
- : sessionLifetime.multipliedBy(12); // TODO: Remove when tested more (Oct. 2022 at the latest)
-
+ Duration expiryTime = sessionLifetime.multipliedBy(2);
List<Long> remoteSessionsFromZooKeeper = getRemoteSessionsFromZooKeeper();
log.log(Level.FINE, () -> "Remote sessions for tenant " + tenantName + ": " + remoteSessionsFromZooKeeper);
diff --git a/configserver/src/main/sh/start-configserver b/configserver/src/main/sh/start-configserver
index 8f515a4c309..97ccae9125f 100755
--- a/configserver/src/main/sh/start-configserver
+++ b/configserver/src/main/sh/start-configserver
@@ -182,7 +182,7 @@ vespa-run-as-vespa-user vespa-runserver -s ${VESPA_SERVICE_NAME} -r 30 -p $pidfi
--add-opens=java.base/java.nio=ALL-UNNAMED \
--add-opens=java.base/jdk.internal.loader=ALL-UNNAMED \
--add-opens=java.base/sun.security.ssl=ALL-UNNAMED \
- -Djava.io.tmpdir=${VESPA_HOME}/tmp \
+ -Djava.io.tmpdir=${VESPA_HOME}/var/tmp \
-Djava.library.path=${VESPA_HOME}/lib64 \
-Djava.security.properties=${VESPA_HOME}/conf/vespa/java.security.override \
-Djava.awt.headless=true \
diff --git a/container-disc/src/main/sh/vespa-start-container-daemon.sh b/container-disc/src/main/sh/vespa-start-container-daemon.sh
index 09a873f06f9..bd218ba176d 100755
--- a/container-disc/src/main/sh/vespa-start-container-daemon.sh
+++ b/container-disc/src/main/sh/vespa-start-container-daemon.sh
@@ -274,7 +274,7 @@ exec $numactlcmd $envcmd java \
--add-opens=java.base/java.nio=ALL-UNNAMED \
--add-opens=java.base/jdk.internal.loader=ALL-UNNAMED \
--add-opens=java.base/sun.security.ssl=ALL-UNNAMED \
- -Djava.io.tmpdir="${VESPA_HOME}/tmp" \
+ -Djava.io.tmpdir="${VESPA_HOME}/var/tmp" \
-Djava.library.path="${VESPA_HOME}/lib64" \
-Djava.security.properties=${VESPA_HOME}/conf/vespa/java.security.override \
-Djava.awt.headless=true \
diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/application/v4/model/DeploymentData.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/application/v4/model/DeploymentData.java
index d3331c3cfd4..63c744c385d 100644
--- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/application/v4/model/DeploymentData.java
+++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/application/v4/model/DeploymentData.java
@@ -12,12 +12,14 @@ import com.yahoo.vespa.hosted.controller.api.integration.billing.Quota;
import com.yahoo.vespa.hosted.controller.api.integration.certificates.EndpointCertificateMetadata;
import com.yahoo.vespa.hosted.controller.api.integration.configserver.ContainerEndpoint;
import com.yahoo.vespa.hosted.controller.api.integration.secrets.TenantSecretStore;
+import com.yahoo.yolean.concurrent.Memoized;
+import java.io.InputStream;
import java.security.cert.X509Certificate;
import java.util.List;
-import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Supplier;
import static java.util.Objects.requireNonNull;
@@ -32,40 +34,41 @@ public class DeploymentData {
private final ApplicationId instance;
private final Tags tags;
private final ZoneId zone;
- private final byte[] applicationPackage;
+ private final Supplier<InputStream> applicationPackage;
private final Version platform;
private final Set<ContainerEndpoint> containerEndpoints;
- private final Optional<EndpointCertificateMetadata> endpointCertificateMetadata;
+ private final Supplier<Optional<EndpointCertificateMetadata>> endpointCertificateMetadata;
private final Optional<DockerImage> dockerImageRepo;
private final Optional<AthenzDomain> athenzDomain;
- private final Quota quota;
+ private final Supplier<Quota> quota;
private final List<TenantSecretStore> tenantSecretStores;
private final List<X509Certificate> operatorCertificates;
- private final Optional<CloudAccount> cloudAccount;
+ private final Supplier<Optional<CloudAccount>> cloudAccount;
private final boolean dryRun;
- public DeploymentData(ApplicationId instance, Tags tags, ZoneId zone, byte[] applicationPackage, Version platform,
+ public DeploymentData(ApplicationId instance, Tags tags, ZoneId zone, Supplier<InputStream> applicationPackage, Version platform,
Set<ContainerEndpoint> containerEndpoints,
- Optional<EndpointCertificateMetadata> endpointCertificateMetadata,
+ Supplier<Optional<EndpointCertificateMetadata>> endpointCertificateMetadata,
Optional<DockerImage> dockerImageRepo,
Optional<AthenzDomain> athenzDomain,
- Quota quota,
+ Supplier<Quota> quota,
List<TenantSecretStore> tenantSecretStores,
List<X509Certificate> operatorCertificates,
- Optional<CloudAccount> cloudAccount, boolean dryRun) {
+ Supplier<Optional<CloudAccount>> cloudAccount,
+ boolean dryRun) {
this.instance = requireNonNull(instance);
this.tags = requireNonNull(tags);
this.zone = requireNonNull(zone);
this.applicationPackage = requireNonNull(applicationPackage);
this.platform = requireNonNull(platform);
this.containerEndpoints = Set.copyOf(requireNonNull(containerEndpoints));
- this.endpointCertificateMetadata = requireNonNull(endpointCertificateMetadata);
+ this.endpointCertificateMetadata = new Memoized<>(requireNonNull(endpointCertificateMetadata));
this.dockerImageRepo = requireNonNull(dockerImageRepo);
this.athenzDomain = athenzDomain;
- this.quota = quota;
+ this.quota = new Memoized<>(requireNonNull(quota));
this.tenantSecretStores = List.copyOf(requireNonNull(tenantSecretStores));
this.operatorCertificates = List.copyOf(requireNonNull(operatorCertificates));
- this.cloudAccount = Objects.requireNonNull(cloudAccount);
+ this.cloudAccount = new Memoized<>(requireNonNull(cloudAccount));
this.dryRun = dryRun;
}
@@ -79,8 +82,8 @@ public class DeploymentData {
return zone;
}
- public byte[] applicationPackage() {
- return applicationPackage;
+ public InputStream applicationPackage() {
+ return applicationPackage.get();
}
public Version platform() {
@@ -92,7 +95,7 @@ public class DeploymentData {
}
public Optional<EndpointCertificateMetadata> endpointCertificateMetadata() {
- return endpointCertificateMetadata;
+ return endpointCertificateMetadata.get();
}
public Optional<DockerImage> dockerImageRepo() {
@@ -104,7 +107,7 @@ public class DeploymentData {
}
public Quota quota() {
- return quota;
+ return quota.get();
}
public List<TenantSecretStore> tenantSecretStores() {
@@ -116,9 +119,11 @@ public class DeploymentData {
}
public Optional<CloudAccount> cloudAccount() {
- return cloudAccount;
+ return cloudAccount.get();
}
- public boolean isDryRun() { return dryRun; }
+ public boolean isDryRun() {
+ return dryRun;
+ }
}
diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/deployment/ApplicationStore.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/deployment/ApplicationStore.java
index c4db0de539e..71ec07bc2e6 100644
--- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/deployment/ApplicationStore.java
+++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/deployment/ApplicationStore.java
@@ -5,6 +5,9 @@ import com.yahoo.config.provision.ApplicationName;
import com.yahoo.config.provision.TenantName;
import com.yahoo.vespa.hosted.controller.api.identifiers.DeploymentId;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
import java.time.Instant;
import java.util.Optional;
@@ -17,7 +20,16 @@ import java.util.Optional;
public interface ApplicationStore {
/** Returns the application package of the given revision. */
- byte[] get(DeploymentId deploymentId, RevisionId revisionId);
+ default byte[] get(DeploymentId deploymentId, RevisionId revisionId) {
+ try (InputStream stream = stream(deploymentId, revisionId)) {
+ return stream.readAllBytes();
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ InputStream stream(DeploymentId deploymentId, RevisionId revisionId);
/** Returns the application package diff, compared to the previous build, for the given tenant, application and build number */
Optional<byte[]> getDiff(TenantName tenantName, ApplicationName applicationName, long buildNumber);
@@ -43,7 +55,16 @@ public interface ApplicationStore {
void removeAll(TenantName tenant, ApplicationName application);
/** Returns the tester application package of the given revision. Does NOT contain the services.xml. */
- byte[] getTester(TenantName tenant, ApplicationName application, RevisionId revision);
+ default byte[] getTester(TenantName tenant, ApplicationName application, RevisionId revision) {
+ try (InputStream stream = streamTester(tenant, application, revision)) {
+ return stream.readAllBytes();
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ InputStream streamTester(TenantName tenantName, ApplicationName applicationName, RevisionId revision);
/** Returns the application package diff, compared to the previous build, for the given deployment and build number */
Optional<byte[]> getDevDiff(DeploymentId deploymentId, long buildNumber);
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java
index 34a7ae89dd2..e09e1f04b8e 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java
@@ -15,7 +15,6 @@ import com.yahoo.config.provision.InstanceName;
import com.yahoo.config.provision.Tags;
import com.yahoo.config.provision.TenantName;
import com.yahoo.config.provision.zone.ZoneId;
-import com.yahoo.log.LogLevel;
import com.yahoo.text.Text;
import com.yahoo.transaction.Mutex;
import com.yahoo.vespa.athenz.api.AthenzDomain;
@@ -39,7 +38,6 @@ import com.yahoo.vespa.hosted.controller.api.integration.configserver.ConfigServ
import com.yahoo.vespa.hosted.controller.api.integration.configserver.ContainerEndpoint;
import com.yahoo.vespa.hosted.controller.api.integration.configserver.DeploymentResult;
import com.yahoo.vespa.hosted.controller.api.integration.configserver.DeploymentResult.LogEntry;
-import com.yahoo.vespa.hosted.controller.api.integration.configserver.Log;
import com.yahoo.vespa.hosted.controller.api.integration.configserver.Node;
import com.yahoo.vespa.hosted.controller.api.integration.configserver.NodeFilter;
import com.yahoo.vespa.hosted.controller.api.integration.deployment.ApplicationStore;
@@ -58,6 +56,7 @@ import com.yahoo.vespa.hosted.controller.application.QuotaUsage;
import com.yahoo.vespa.hosted.controller.application.SystemApplication;
import com.yahoo.vespa.hosted.controller.application.TenantAndApplicationId;
import com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackage;
+import com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackageStream;
import com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackageValidator;
import com.yahoo.vespa.hosted.controller.athenz.impl.AthenzFacade;
import com.yahoo.vespa.hosted.controller.certificate.EndpointCertificates;
@@ -79,6 +78,7 @@ import com.yahoo.vespa.hosted.controller.versions.VespaVersion;
import com.yahoo.vespa.hosted.controller.versions.VespaVersion.Confidence;
import com.yahoo.yolean.Exceptions;
+import java.io.ByteArrayInputStream;
import java.security.Principal;
import java.security.cert.X509Certificate;
import java.time.Clock;
@@ -87,7 +87,6 @@ import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -98,6 +97,7 @@ import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@@ -489,9 +489,6 @@ public class ApplicationController {
DeploymentId deployment = new DeploymentId(job.application(), zone);
try (Mutex deploymentLock = lockForDeployment(job.application(), zone)) {
- Set<ContainerEndpoint> containerEndpoints;
- Optional<EndpointCertificateMetadata> endpointCertificateMetadata;
-
Run run = controller.jobController().last(job)
.orElseThrow(() -> new IllegalStateException("No known run of '" + job + "'"));
@@ -500,30 +497,32 @@ public class ApplicationController {
Version platform = run.versions().sourcePlatform().filter(__ -> deploySourceVersions).orElse(run.versions().targetPlatform());
RevisionId revision = run.versions().sourceRevision().filter(__ -> deploySourceVersions).orElse(run.versions().targetRevision());
- ApplicationPackage applicationPackage = new ApplicationPackage(applicationStore.get(deployment, revision));
-
+ ApplicationPackageStream applicationPackage = new ApplicationPackageStream(() -> applicationStore.stream(deployment, revision),
+ ApplicationPackageStream.addingCertificate(run.testerCertificate()));
AtomicReference<RevisionId> lastRevision = new AtomicReference<>();
Instance instance;
+ Set<ContainerEndpoint> containerEndpoints;
try (Mutex lock = lock(applicationId)) {
LockedApplication application = new LockedApplication(requireApplication(applicationId), lock);
application.get().revisions().last().map(ApplicationVersion::id).ifPresent(lastRevision::set);
instance = application.get().require(job.application().instance());
- if ( ! applicationPackage.trustedCertificates().isEmpty()
- && run.testerCertificate().isPresent())
- applicationPackage = applicationPackage.withTrustedCertificate(run.testerCertificate().get());
-
- endpointCertificateMetadata = endpointCertificates.getMetadata(instance, zone, applicationPackage.deploymentSpec());
-
containerEndpoints = controller.routing().of(deployment).prepare(application);
} // Release application lock while doing the deployment, which is a lengthy task.
+ Supplier<Optional<EndpointCertificateMetadata>> endpointCertificateMetadata = () -> {
+ try (Mutex lock = lock(applicationId)) {
+ Optional<EndpointCertificateMetadata> data = endpointCertificates.getMetadata(instance, zone, applicationPackage.truncatedPackage().deploymentSpec());
+ data.ifPresent(e -> deployLogger.accept("Using CA signed certificate version %s".formatted(e.version())));
+ return data;
+ }
+ };
+
// Carry out deployment without holding the application lock.
DeploymentResult result = deploy(job.application(), instance.tags(), applicationPackage, zone, platform, containerEndpoints,
endpointCertificateMetadata, run.isDryRun());
- endpointCertificateMetadata.ifPresent(e -> deployLogger.accept("Using CA signed certificate version %s".formatted(e.version())));
// Record the quota usage for this application
var quotaUsage = deploymentQuotaUsage(zone, job.application());
@@ -544,8 +543,10 @@ public class ApplicationController {
.distinct()
.collect(Collectors.toList()))
.orElseGet(List::of);
- if (warnings.isEmpty()) controller.notificationsDb().removeNotification(source, Notification.Type.applicationPackage);
- else controller.notificationsDb().setNotification(source, Notification.Type.applicationPackage, Notification.Level.warning, warnings);
+ if (warnings.isEmpty())
+ controller.notificationsDb().removeNotification(source, Notification.Type.applicationPackage);
+ else
+ controller.notificationsDb().setNotification(source, Notification.Type.applicationPackage, Notification.Level.warning, warnings);
}
lockApplicationOrThrow(applicationId, application ->
@@ -606,23 +607,23 @@ public class ApplicationController {
/** Deploy a system application to given zone */
public DeploymentResult deploySystemApplicationPackage(SystemApplication application, ZoneId zone, Version version) {
if (application.hasApplicationPackage()) {
- ApplicationPackage applicationPackage = new ApplicationPackage(
- artifactRepository.getSystemApplicationPackage(application.id(), zone, version)
+ ApplicationPackageStream applicationPackage = new ApplicationPackageStream(
+ () -> new ByteArrayInputStream(artifactRepository.getSystemApplicationPackage(application.id(), zone, version))
);
- return deploy(application.id(), Tags.empty(), applicationPackage, zone, version, Set.of(), /* No application cert */ Optional.empty(), false);
+ return deploy(application.id(), Tags.empty(), applicationPackage, zone, version, Set.of(), Optional::empty, false);
} else {
throw new RuntimeException("This system application does not have an application package: " + application.id().toShortString());
}
}
/** Deploys the given tester application to the given zone. */
- public DeploymentResult deployTester(TesterId tester, ApplicationPackage applicationPackage, ZoneId zone, Version platform) {
- return deploy(tester.id(), Tags.empty(), applicationPackage, zone, platform, Set.of(), /* No application cert for tester*/ Optional.empty(), false);
+ public DeploymentResult deployTester(TesterId tester, ApplicationPackageStream applicationPackage, ZoneId zone, Version platform) {
+ return deploy(tester.id(), Tags.empty(), applicationPackage, zone, platform, Set.of(), Optional::empty, false);
}
- private DeploymentResult deploy(ApplicationId application, Tags tags, ApplicationPackage applicationPackage,
+ private DeploymentResult deploy(ApplicationId application, Tags tags, ApplicationPackageStream applicationPackage,
ZoneId zone, Version platform, Set<ContainerEndpoint> endpoints,
- Optional<EndpointCertificateMetadata> endpointCertificateMetadata,
+ Supplier<Optional<EndpointCertificateMetadata>> endpointCertificateMetadata,
boolean dryRun) {
DeploymentId deployment = new DeploymentId(application, zone);
try {
@@ -638,13 +639,8 @@ public class ApplicationController {
.filter(tenant-> tenant instanceof AthenzTenant)
.map(tenant -> ((AthenzTenant)tenant).domain());
- if (zone.environment().isManuallyDeployed())
- controller.applications().applicationStore().putMeta(deployment,
- clock.instant(),
- applicationPackage.metaDataZip());
-
- Quota deploymentQuota = DeploymentQuotaCalculator.calculate(billingController.getQuota(application.tenant()),
- asList(application.tenant()), application, zone, applicationPackage.deploymentSpec());
+ Supplier<Quota> deploymentQuota = () -> DeploymentQuotaCalculator.calculate(billingController.getQuota(application.tenant()),
+ asList(application.tenant()), application, zone, applicationPackage.truncatedPackage().deploymentSpec());
List<TenantSecretStore> tenantSecretStores = controller.tenants()
.get(application.tenant())
@@ -654,9 +650,9 @@ public class ApplicationController {
List<X509Certificate> operatorCertificates = controller.supportAccess().activeGrantsFor(deployment).stream()
.map(SupportAccessGrant::certificate)
.collect(toList());
- Optional<CloudAccount> cloudAccount = decideCloudAccountOf(deployment, applicationPackage.deploymentSpec());
+ Supplier<Optional<CloudAccount>> cloudAccount = () -> decideCloudAccountOf(deployment, applicationPackage.truncatedPackage().deploymentSpec());
ConfigServer.PreparedApplication preparedApplication =
- configServer.deploy(new DeploymentData(application, tags, zone, applicationPackage.zippedContent(), platform,
+ configServer.deploy(new DeploymentData(application, tags, zone, applicationPackage::zipStream, platform,
endpoints, endpointCertificateMetadata, dockerImageRepo, domain,
deploymentQuota, tenantSecretStores, operatorCertificates,
cloudAccount, dryRun));
@@ -665,7 +661,12 @@ public class ApplicationController {
} finally {
// Even if prepare fails, routing configuration may need to be updated
if ( ! application.instance().isTester()) {
- controller.routing().of(deployment).configure(applicationPackage.deploymentSpec());
+ controller.routing().of(deployment).configure(applicationPackage.truncatedPackage().deploymentSpec());
+ if (zone.environment().isManuallyDeployed())
+ controller.applications().applicationStore().putMeta(deployment,
+ clock.instant(),
+ applicationPackage.truncatedPackage().metaDataZip());
+
}
}
}
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackage.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackage.java
index b99d825a779..53c78d7c8ec 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackage.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackage.java
@@ -59,19 +59,19 @@ import static java.util.stream.Collectors.toMap;
* A representation of the content of an application package.
* Only meta-data content can be accessed as anything other than compressed data.
* A package is identified by a hash of the content.
- *
- * This is immutable.
- *
+ *
* @author bratseth
* @author jonmv
*/
public class ApplicationPackage {
- private static final String trustedCertificatesFile = "security/clients.pem";
- private static final String buildMetaFile = "build-meta.json";
+ static final String trustedCertificatesFile = "security/clients.pem";
+ static final String buildMetaFile = "build-meta.json";
static final String deploymentFile = "deployment.xml";
- private static final String validationOverridesFile = "validation-overrides.xml";
+ static final String validationOverridesFile = "validation-overrides.xml";
static final String servicesFile = "services.xml";
+ static final Set<String> prePopulated = Set.of(deploymentFile, validationOverridesFile, servicesFile, buildMetaFile, trustedCertificatesFile);
+
private static Hasher hasher() { return Hashing.murmur3_128().newHasher(); }
private final String bundleHash;
@@ -101,7 +101,7 @@ public class ApplicationPackage {
*/
public ApplicationPackage(byte[] zippedContent, boolean requireFiles) {
this.zippedContent = Objects.requireNonNull(zippedContent, "The application package content cannot be null");
- this.files = new ZipArchiveCache(zippedContent, Set.of(deploymentFile, validationOverridesFile, servicesFile, buildMetaFile, trustedCertificatesFile));
+ this.files = new ZipArchiveCache(zippedContent, prePopulated);
Optional<DeploymentSpec> deploymentSpec = files.get(deploymentFile).map(bytes -> new String(bytes, UTF_8)).map(DeploymentSpec::fromXml);
if (requireFiles && deploymentSpec.isEmpty())
@@ -122,17 +122,6 @@ public class ApplicationPackage {
preProcessAndPopulateCache();
}
- /** Returns a copy of this with the given certificate appended. */
- public ApplicationPackage withTrustedCertificate(X509Certificate certificate) {
- List<X509Certificate> trustedCertificates = new ArrayList<>(this.trustedCertificates);
- trustedCertificates.add(certificate);
- byte[] certificatesBytes = X509CertificateUtils.toPem(trustedCertificates).getBytes(UTF_8);
-
- ByteArrayOutputStream modified = new ByteArrayOutputStream(zippedContent.length + certificatesBytes.length);
- ZipEntries.transferAndWrite(modified, new ByteArrayInputStream(zippedContent), trustedCertificatesFile, certificatesBytes);
- return new ApplicationPackage(modified.toByteArray());
- }
-
/** Hash of all files and settings that influence what is deployed to config servers. */
public String bundleHash() {
return bundleHash;
@@ -295,7 +284,7 @@ public class ApplicationPackage {
private Map<Path, Optional<byte[]>> read(Collection<String> names) {
var entries = ZipEntries.from(zip,
- name -> names.contains(name),
+ names::contains,
maxSize,
true)
.asList().stream()
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java
new file mode 100644
index 00000000000..021064417ac
--- /dev/null
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java
@@ -0,0 +1,265 @@
+package com.yahoo.vespa.hosted.controller.application.pkg;
+
+import com.yahoo.security.X509CertificateUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+import java.util.zip.ZipOutputStream;
+
+import static com.yahoo.security.X509CertificateUtils.certificateListFromPem;
+import static java.io.OutputStream.nullOutputStream;
+import static java.lang.Math.min;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Wraps a zipped application package stream.
+ * This allows replacing content as the input stream is read.
+ * This also retains a truncated {@link ApplicationPackage}, containing only the specified set of files,
+ * which can be accessed when this stream is fully exhausted.
+ *
+ * @author jonmv
+ */
+public class ApplicationPackageStream {
+
+ private final Supplier<Replacer> replacer;
+ private final Supplier<Predicate<String>> filter;
+ private final Supplier<InputStream> in;
+ private final AtomicReference<ApplicationPackage> truncatedPackage = new AtomicReference<>();
+
+ public static Supplier<Replacer> addingCertificate(Optional<X509Certificate> certificate) {
+ return certificate.map(cert -> Replacer.of(Map.of(ApplicationPackage.trustedCertificatesFile,
+ trustBytes -> append(trustBytes, cert))))
+ .orElse(Replacer.of(Map.of()));
+ }
+
+ static InputStream append(InputStream trustIn, X509Certificate cert) {
+ try {
+ List<X509Certificate> trusted = trustIn == null ? new ArrayList<>()
+ : new ArrayList<>(certificateListFromPem(new String(trustIn.readAllBytes(), UTF_8)));
+ trusted.add(cert);
+ return new ByteArrayInputStream(X509CertificateUtils.toPem(trusted).getBytes(UTF_8));
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** Stream that effectively copies the input stream to its {@link #truncatedPackage()} when exhausted. */
+ public ApplicationPackageStream(Supplier<InputStream> in) {
+ this(in, () -> __ -> true, Map.of());
+ }
+
+ /** Stream that replaces the indicated entries, and copies all metadata files to its {@link #truncatedPackage()} when exhausted. */
+ public ApplicationPackageStream(Supplier<InputStream> in, Supplier<Replacer> replacer) {
+ this(in, () -> name -> ApplicationPackage.prePopulated.contains(name) || name.endsWith(".xml"), replacer);
+ }
+
+ /** Stream that replaces the indicated entries, and copies the filtered entries to its {@link #truncatedPackage()} when exhausted. */
+ public ApplicationPackageStream(Supplier<InputStream> in, Supplier<Predicate<String>> truncation, Map<String, UnaryOperator<InputStream>> replacements) {
+ this(in, truncation, Replacer.of(replacements));
+ }
+
+ /** Stream that uses the given replacer to modify content, and copies the filtered entries to its {@link #truncatedPackage()} when exhausted. */
+ public ApplicationPackageStream(Supplier<InputStream> in, Supplier<Predicate<String>> truncation, Supplier<Replacer> replacer) {
+ this.in = in;
+ this.filter = truncation;
+ this.replacer = replacer;
+ }
+
+ /**
+     * Returns a new stream containing the zipped application package this wraps. Separate streams may exist concurrently,
+ * and the first to be exhausted will populate the truncated application package.
+ */
+ public InputStream zipStream() {
+ return new Stream(in.get(), replacer.get(), filter.get(), truncatedPackage);
+ }
+
+ /**
+ * Returns the application package backed by only the files indicated by the truncation filter.
+ * Throws if no instances of {@link #zipStream()} have been exhausted yet.
+ */
+ public ApplicationPackage truncatedPackage() {
+ ApplicationPackage truncated = truncatedPackage.get();
+ if (truncated == null) throw new IllegalStateException("must completely exhaust input before reading package");
+ return truncated;
+ }
+
+ private static class Stream extends InputStream {
+
+ private final byte[] inBuffer = new byte[1 << 16];
+ private final ByteArrayOutputStream teeOut = new ByteArrayOutputStream(1 << 16);
+ private final ZipOutputStream teeZip = new ZipOutputStream(teeOut);
+ private final ByteArrayOutputStream out = new ByteArrayOutputStream(1 << 16);
+ private final ZipOutputStream outZip = new ZipOutputStream(out);
+ private final AtomicReference<ApplicationPackage> truncatedPackage;
+ private final InputStream in;
+ private final ZipInputStream inZip;
+ private final Replacer replacer;
+ private final Predicate<String> filter;
+ private byte[] currentOut = new byte[0];
+ private InputStream currentIn = InputStream.nullInputStream();
+ private boolean includeCurrent = false;
+ private int pos = 0;
+ private boolean closed = false;
+ private boolean done = false;
+
+ private Stream(InputStream in, Replacer replacer, Predicate<String> filter, AtomicReference<ApplicationPackage> truncatedPackage) {
+ this.in = in;
+ this.inZip = new ZipInputStream(in);
+ this.replacer = replacer;
+ this.filter = filter;
+ this.truncatedPackage = truncatedPackage;
+ }
+
+ private void fill() throws IOException {
+ if (done) return;
+ while (out.size() == 0) {
+ // Exhaust current entry first.
+ int i, n = out.size();
+ while (out.size() == 0 && (i = currentIn.read(inBuffer)) != -1) {
+ if (includeCurrent) teeZip.write(inBuffer, 0, i);
+ outZip.write(inBuffer, 0, i);
+ n += i;
+ }
+
+ // Current entry exhausted, look for next.
+ if (n == 0) {
+ next();
+ if (done) break;
+ }
+ }
+
+ currentOut = out.toByteArray();
+ out.reset();
+ pos = 0;
+ }
+
+ private void next() throws IOException {
+ if (includeCurrent) teeZip.closeEntry();
+ outZip.closeEntry();
+
+ ZipEntry next = inZip.getNextEntry();
+ String name;
+ InputStream content = null;
+ if (next == null) {
+ // We may still have replacements to fill in, but if we don't, we're done filling, forever!
+ name = replacer.next();
+ if (name == null) {
+ outZip.close(); // This typically makes new output available, so must check for that after this.
+ teeZip.close();
+ currentIn = nullInputStream();
+ truncatedPackage.compareAndSet(null, new ApplicationPackage(teeOut.toByteArray()));
+ done = true;
+ return;
+ }
+ }
+ else {
+ name = next.getName();
+ content = new FilterInputStream(inZip) { @Override public void close() { } }; // Protect inZip from replacements closing it.
+ }
+
+ includeCurrent = truncatedPackage.get() == null && filter.test(name);
+ currentIn = replacer.modify(name, content);
+ if (currentIn == null) {
+ currentIn = InputStream.nullInputStream();
+ }
+ else {
+ if (includeCurrent) teeZip.putNextEntry(new ZipEntry(name));
+ outZip.putNextEntry(new ZipEntry(name));
+ }
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (closed) throw new IOException("stream closed");
+ if (pos == currentOut.length) {
+ fill();
+ if (pos == currentOut.length) return -1;
+ }
+ return 0xff & currentOut[pos++];
+ }
+
+ @Override
+ public int read(byte[] out, int off, int len) throws IOException {
+ if (closed) throw new IOException("stream closed");
+ if ((off | len | (off + len) | (out.length - (off + len))) < 0) throw new IndexOutOfBoundsException();
+ if (pos == currentOut.length) {
+ fill();
+ if (pos == currentOut.length) return -1;
+ }
+ int n = min(currentOut.length - pos, len);
+ System.arraycopy(currentOut, pos, out, off, n);
+ pos += n;
+ return n;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return pos == currentOut.length && done ? 0 : 1;
+ }
+
+ @Override
+ public void close() {
+ if ( ! closed) try {
+ transferTo(nullOutputStream()); // Finish reading the zip, to populate the truncated package in case of errors.
+ in.transferTo(nullOutputStream()); // For some inane reason, ZipInputStream doesn't exhaust its wrapped input.
+ inZip.close();
+ closed = true;
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ }
+
+ /** Replaces entries in a zip stream as they are encountered, then appends remaining entries at the end. */
+ public interface Replacer {
+
+        /** Called when the entries of the original zip stream are exhausted. Return the next remaining name, or {@code null} when none are left. */
+ String next();
+
+        /** Modify content for a given name; return {@code null} for removal; {@code in} is {@code null} for entries not present in the input. */
+ InputStream modify(String name, InputStream in);
+
+ /**
+ * Wraps a map of fixed replacements, and:
+ * <ul>
+ * <li>Removes entries whose value is {@code null}.</li>
+ * <li>Modifies entries present in both input and the map.</li>
+ * <li>Appends entries present exclusively in the map.</li>
+ * <li>Writes all other entries as they are.</li>
+ * </ul>
+ */
+ static Supplier<Replacer> of(Map<String, UnaryOperator<InputStream>> replacements) {
+ return () -> new Replacer() {
+ final Map<String, UnaryOperator<InputStream>> remaining = new HashMap<>(replacements);
+ @Override public String next() {
+ return remaining.isEmpty() ? null : remaining.keySet().iterator().next();
+ }
+ @Override public InputStream modify(String name, InputStream in) {
+ UnaryOperator<InputStream> mapper = remaining.remove(name);
+ return mapper == null ? in : mapper.apply(in);
+ }
+ };
+ }
+
+ }
+
+}
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackage.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackage.java
index 5b20c57fcca..17644d5e207 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackage.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackage.java
@@ -18,15 +18,14 @@ import com.yahoo.text.Text;
import com.yahoo.vespa.hosted.controller.api.integration.deployment.RunId;
import com.yahoo.vespa.hosted.controller.api.integration.deployment.TesterCloud.Suite;
import com.yahoo.vespa.hosted.controller.api.integration.deployment.TesterId;
+import com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackageStream.Replacer;
import com.yahoo.vespa.hosted.controller.config.ControllerConfig;
import com.yahoo.vespa.hosted.controller.config.ControllerConfig.Steprunner.Testerapp;
import com.yahoo.yolean.Exceptions;
import javax.security.auth.x500.X500Principal;
import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.UncheckedIOException;
+import java.io.InputStream;
import java.math.BigInteger;
import java.security.KeyPair;
import java.security.cert.X509Certificate;
@@ -43,6 +42,8 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
import java.util.jar.JarInputStream;
import java.util.jar.Manifest;
import java.util.regex.Pattern;
@@ -53,8 +54,9 @@ import static com.yahoo.vespa.hosted.controller.api.integration.deployment.Teste
import static com.yahoo.vespa.hosted.controller.api.integration.deployment.TesterCloud.Suite.system;
import static com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackage.deploymentFile;
import static com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackage.servicesFile;
-import static com.yahoo.vespa.hosted.controller.application.pkg.ZipEntries.transferAndWrite;
+import static java.io.InputStream.nullInputStream;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.function.UnaryOperator.identity;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
@@ -71,32 +73,14 @@ public class TestPackage {
static final NodeResources DEFAULT_TESTER_RESOURCES_AWS = new NodeResources(2, 8, 50, 0.3, NodeResources.DiskSpeed.any);
static final NodeResources DEFAULT_TESTER_RESOURCES = new NodeResources(1, 4, 50, 0.3, NodeResources.DiskSpeed.any);
- private final ApplicationPackage applicationPackage;
+ private final ApplicationPackageStream applicationPackageStream;
private final X509Certificate certificate;
- public TestPackage(byte[] testPackage, boolean isPublicSystem, RunId id, Testerapp testerApp,
+ public TestPackage(Supplier<InputStream> inZip, boolean isPublicSystem, RunId id, Testerapp testerApp,
DeploymentSpec spec, Instant certificateValidFrom, Duration certificateValidDuration) {
-
- // Copy contents of submitted application-test.zip, and ensure required directories exist within the zip.
- Map<String, byte[]> entries = new HashMap<>();
- entries.put("artifacts/.ignore-" + UUID.randomUUID(), new byte[0]);
- entries.put("tests/.ignore-" + UUID.randomUUID(), new byte[0]);
-
- entries.put(servicesFile,
- servicesXml(! isPublicSystem,
- certificateValidFrom != null,
- hasLegacyTests(testPackage),
- testerResourcesFor(id.type().zone(), spec.requireInstance(id.application().instance())),
- testerApp));
-
- entries.put(deploymentFile,
- deploymentXml(id.tester(),
- spec.athenzDomain(),
- spec.requireInstance(id.application().instance())
- .athenzService(id.type().zone().environment(), id.type().zone().region())));
-
+ KeyPair keyPair;
if (certificateValidFrom != null) {
- KeyPair keyPair = KeyUtils.generateKeypair(KeyAlgorithm.RSA, 2048);
+ keyPair = KeyUtils.generateKeypair(KeyAlgorithm.RSA, 2048);
X500Principal subject = new X500Principal("CN=" + id.tester().id().toFullString() + "." + id.type() + "." + id.number());
this.certificate = X509CertificateBuilder.fromKeypair(keyPair,
subject,
@@ -105,26 +89,60 @@ public class TestPackage {
SignatureAlgorithm.SHA512_WITH_RSA,
BigInteger.valueOf(1))
.build();
- entries.put("artifacts/key", KeyUtils.toPem(keyPair.getPrivate()).getBytes(UTF_8));
- entries.put("artifacts/cert", X509CertificateUtils.toPem(certificate).getBytes(UTF_8));
}
else {
+ keyPair = null;
this.certificate = null;
}
+ this.applicationPackageStream = new ApplicationPackageStream(inZip, () -> __ -> false, () -> new Replacer() {
+
+ // Initially skips all declared entries, ensuring they're generated and appended after all input entries.
+ final Map<String, UnaryOperator<InputStream>> entries = new HashMap<>();
+ final Map<String, UnaryOperator<InputStream>> replacements = new HashMap<>();
+ boolean hasLegacyTests = false;
+
+ @Override
+ public String next() {
+ if (entries.isEmpty()) return null;
+ String next = entries.keySet().iterator().next();
+ replacements.put(next, entries.remove(next));
+ return next;
+ }
- ByteArrayOutputStream buffer = new ByteArrayOutputStream(testPackage.length + 10_000);
- transferAndWrite(buffer, new ByteArrayInputStream(testPackage), entries);
- this.applicationPackage = new ApplicationPackage(buffer.toByteArray());
- }
-
- static boolean hasLegacyTests(byte[] testPackage) {
- return ZipEntries.from(testPackage, __ -> true, 0, false).asList().stream()
- .anyMatch(file -> file.name().startsWith("artifacts/") && file.name().endsWith("-tests.jar"));
+ @Override
+ public InputStream modify(String name, InputStream in) {
+ hasLegacyTests |= name.startsWith("artifacts/") && name.endsWith("-tests.jar");
+ return entries.containsKey(name) ? null : replacements.getOrDefault(name, identity()).apply(in);
+ }
+ {
+ // Copy contents of submitted application-test.zip, and ensure required directories exist within the zip.
+ entries.put("artifacts/.ignore-" + UUID.randomUUID(), __ -> nullInputStream());
+ entries.put("tests/.ignore-" + UUID.randomUUID(), __ -> nullInputStream());
+
+ entries.put(servicesFile,
+ __ -> new ByteArrayInputStream(servicesXml( ! isPublicSystem,
+ certificateValidFrom != null,
+ hasLegacyTests,
+ testerResourcesFor(id.type().zone(), spec.requireInstance(id.application().instance())),
+ testerApp)));
+
+ entries.put(deploymentFile,
+ __ -> new ByteArrayInputStream(deploymentXml(id.tester(),
+ spec.athenzDomain(),
+ spec.requireInstance(id.application().instance())
+ .athenzService(id.type().zone().environment(), id.type().zone().region()))));
+
+ if (certificate != null) {
+ entries.put("artifacts/key", __ -> new ByteArrayInputStream(KeyUtils.toPem(keyPair.getPrivate()).getBytes(UTF_8)));
+ entries.put("artifacts/cert", __ -> new ByteArrayInputStream(X509CertificateUtils.toPem(certificate).getBytes(UTF_8)));
+ }
+ }
+ });
}
- public ApplicationPackage asApplicationPackage() {
- return applicationPackage;
+ public ApplicationPackageStream asApplicationPackage() {
+ return applicationPackageStream;
}
public X509Certificate certificate() {
@@ -207,7 +225,7 @@ public class TestPackage {
return new TestSummary(problems, suites);
}
- public static NodeResources testerResourcesFor(ZoneId zone, DeploymentInstanceSpec spec) {
+ static NodeResources testerResourcesFor(ZoneId zone, DeploymentInstanceSpec spec) {
NodeResources nodeResources = spec.steps().stream()
.filter(step -> step.concerns(zone.environment()))
.findFirst()
@@ -219,7 +237,7 @@ public class TestPackage {
}
/** Returns the generated services.xml content for the tester application. */
- public static byte[] servicesXml(boolean systemUsesAthenz, boolean useTesterCertificate, boolean hasLegacyTests,
+ static byte[] servicesXml(boolean systemUsesAthenz, boolean useTesterCertificate, boolean hasLegacyTests,
NodeResources resources, ControllerConfig.Steprunner.Testerapp config) {
int jdiscMemoryGb = 2; // 2Gb memory for tester application which uses Maven.
int jdiscMemoryPct = (int) Math.ceil(100 * jdiscMemoryGb / resources.memoryGb());
@@ -279,7 +297,7 @@ public class TestPackage {
}
/** Returns a dummy deployment xml which sets up the service identity for the tester, if present. */
- public static byte[] deploymentXml(TesterId id, Optional<AthenzDomain> athenzDomain, Optional<AthenzService> athenzService) {
+ static byte[] deploymentXml(TesterId id, Optional<AthenzDomain> athenzDomain, Optional<AthenzService> athenzService) {
String deploymentSpec =
"<?xml version='1.0' encoding='UTF-8'?>\n" +
"<deployment version=\"1.0\" " +
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ZipEntries.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ZipEntries.java
index 6bbcd551924..185c97f866e 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ZipEntries.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ZipEntries.java
@@ -15,6 +15,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;
@@ -35,36 +36,6 @@ public class ZipEntries {
this.entries = List.copyOf(Objects.requireNonNull(entries));
}
- /** Copies the zipped content from in to out, adding/overwriting an entry with the given name and content. */
- public static void transferAndWrite(OutputStream out, InputStream in, String name, byte[] content) {
- transferAndWrite(out, in, Map.of(name, content));
- }
-
- /** Copies the zipped content from in to out, adding/overwriting/removing (on {@code null}) entries as specified. */
- public static void transferAndWrite(OutputStream out, InputStream in, Map<String, byte[]> entries) {
- try (ZipOutputStream zipOut = new ZipOutputStream(out);
- ZipInputStream zipIn = new ZipInputStream(in)) {
- for (ZipEntry entry = zipIn.getNextEntry(); entry != null; entry = zipIn.getNextEntry()) {
- if (entries.containsKey(entry.getName()))
- continue;
-
- zipOut.putNextEntry(new ZipEntry(entry.getName()));
- zipIn.transferTo(zipOut);
- zipOut.closeEntry();
- }
- for (Entry<String, byte[]> entry : entries.entrySet()) {
- if (entry.getValue() != null) {
- zipOut.putNextEntry(new ZipEntry(entry.getKey()));
- zipOut.write(entry.getValue());
- zipOut.closeEntry();
- }
- }
- }
- catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
-
/** Read ZIP entries from inputStream */
public static ZipEntries from(byte[] zip, Predicate<String> entryNameMatcher, int maxEntrySizeInBytes, boolean throwIfEntryExceedsMaxSize) {
@@ -107,7 +78,7 @@ public class ZipEntries {
}
public String name() { return name; }
- public byte[] contentOrThrow() { return content.orElseThrow(); }
+ public byte[] contentOrThrow() { return content.orElseThrow(() -> new NoSuchElementException("'" + name + "' has no content")); }
public Optional<byte[]> content() { return content; }
public long size() { return size; }
}
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunner.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunner.java
index e8c92d3e3f6..efe072c2a6d 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunner.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunner.java
@@ -34,6 +34,7 @@ import com.yahoo.vespa.hosted.controller.application.Deployment;
import com.yahoo.vespa.hosted.controller.application.Endpoint;
import com.yahoo.vespa.hosted.controller.application.TenantAndApplicationId;
import com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackage;
+import com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackageStream;
import com.yahoo.vespa.hosted.controller.application.pkg.TestPackage;
import com.yahoo.vespa.hosted.controller.maintenance.JobRunner;
import com.yahoo.vespa.hosted.controller.notification.Notification;
@@ -43,6 +44,7 @@ import com.yahoo.vespa.hosted.controller.routing.context.DeploymentRoutingContex
import com.yahoo.yolean.Exceptions;
import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
import java.io.PrintStream;
import java.io.UncheckedIOException;
import java.security.cert.CertificateExpiredException;
@@ -248,7 +250,9 @@ public class InternalStepRunner implements StepRunner {
}
case LOAD_BALANCER_NOT_READY, PARENT_HOST_NOT_READY -> {
logger.log(e.message()); // Consider splitting these messages in summary and details, on config server.
- controller.jobController().locked(id, run -> run.sleepingUntil(startTime.plusSeconds(300)));
+ Instant someTimeAfterStart = startTime.plusSeconds(450);
+ Instant inALittleWhile = controller.clock().instant().plusSeconds(90);
+ controller.jobController().locked(id, run -> run.sleepingUntil(someTimeAfterStart.isAfter(inALittleWhile) ? someTimeAfterStart : inALittleWhile));
return result;
}
case NODE_ALLOCATION_FAILURE -> {
@@ -926,14 +930,13 @@ public class InternalStepRunner implements StepRunner {
}
/** Returns the application package for the tester application, assembled from a generated config, fat-jar and services.xml. */
- private ApplicationPackage testerPackage(RunId id) {
+ private ApplicationPackageStream testerPackage(RunId id) {
RevisionId revision = controller.jobController().run(id).versions().targetRevision();
DeploymentSpec spec = controller.applications().requireApplication(TenantAndApplicationId.from(id.application())).deploymentSpec();
- byte[] testZip = controller.applications().applicationStore().getTester(id.application().tenant(),
- id.application().application(), revision);
boolean useTesterCertificate = useTesterCertificate(id);
- TestPackage testPackage = new TestPackage(testZip,
+ TestPackage testPackage = new TestPackage(() -> controller.applications().applicationStore().streamTester(id.application().tenant(),
+ id.application().application(), revision),
controller.system().isPublic(),
id,
controller.controllerConfig().steprunner().testerapp(),
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java
index 08cf8d2e1c4..f94bd51fe4c 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java
@@ -207,32 +207,36 @@ public class JobController {
return run;
List<LogEntry> log;
- Instant deployedAt;
+ Optional<Instant> deployedAt;
Instant from;
if ( ! run.id().type().isProduction()) {
- deployedAt = run.stepInfo(installInitialReal).or(() -> run.stepInfo(installReal)).flatMap(StepInfo::startTime).orElseThrow();
- from = run.lastVespaLogTimestamp().isAfter(run.start()) ? run.lastVespaLogTimestamp() : deployedAt.minusSeconds(10);
- log = LogEntry.parseVespaLog(controller.serviceRegistry().configServer()
- .getLogs(new DeploymentId(id.application(), zone),
- Map.of("from", Long.toString(from.toEpochMilli()))),
- from);
+ deployedAt = run.stepInfo(installInitialReal).or(() -> run.stepInfo(installReal)).flatMap(StepInfo::startTime);
+ if (deployedAt.isPresent()) {
+ from = run.lastVespaLogTimestamp().isAfter(run.start()) ? run.lastVespaLogTimestamp() : deployedAt.get().minusSeconds(10);
+ log = LogEntry.parseVespaLog(controller.serviceRegistry().configServer()
+ .getLogs(new DeploymentId(id.application(), zone),
+ Map.of("from", Long.toString(from.toEpochMilli()))),
+ from);
+ }
+ else log = List.of();
}
- else
- log = List.of();
+ else log = List.of();
if (id.type().isTest()) {
- deployedAt = run.stepInfo(installTester).flatMap(StepInfo::startTime).orElseThrow();
- from = run.lastVespaLogTimestamp().isAfter(run.start()) ? run.lastVespaLogTimestamp() : deployedAt.minusSeconds(10);
- List<LogEntry> testerLog = LogEntry.parseVespaLog(controller.serviceRegistry().configServer()
- .getLogs(new DeploymentId(id.tester().id(), zone),
- Map.of("from", Long.toString(from.toEpochMilli()))),
- from);
-
- Instant justNow = controller.clock().instant().minusSeconds(2);
- log = Stream.concat(log.stream(), testerLog.stream())
- .filter(entry -> entry.at().isBefore(justNow))
- .sorted(comparing(LogEntry::at))
- .collect(toUnmodifiableList());
+ deployedAt = run.stepInfo(installTester).flatMap(StepInfo::startTime);
+ if (deployedAt.isPresent()) {
+ from = run.lastVespaLogTimestamp().isAfter(run.start()) ? run.lastVespaLogTimestamp() : deployedAt.get().minusSeconds(10);
+ List<LogEntry> testerLog = LogEntry.parseVespaLog(controller.serviceRegistry().configServer()
+ .getLogs(new DeploymentId(id.tester().id(), zone),
+ Map.of("from", Long.toString(from.toEpochMilli()))),
+ from);
+
+ Instant justNow = controller.clock().instant().minusSeconds(2);
+ log = Stream.concat(log.stream(), testerLog.stream())
+ .filter(entry -> entry.at().isBefore(justNow))
+ .sorted(comparing(LogEntry::at))
+ .toList();
+ }
}
if (log.isEmpty())
return run;
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/proxy/ConfigServerRestExecutorImpl.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/proxy/ConfigServerRestExecutorImpl.java
index 9bea7fb829d..4f4e21d9f25 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/proxy/ConfigServerRestExecutorImpl.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/proxy/ConfigServerRestExecutorImpl.java
@@ -58,7 +58,7 @@ import static com.yahoo.yolean.Exceptions.uncheck;
public class ConfigServerRestExecutorImpl extends AbstractComponent implements ConfigServerRestExecutor {
private static final Logger LOG = Logger.getLogger(ConfigServerRestExecutorImpl.class.getName());
- private static final Duration PROXY_REQUEST_TIMEOUT = Duration.ofSeconds(10);
+ private static final Duration PROXY_REQUEST_TIMEOUT = Duration.ofSeconds(20);
private static final Duration PING_REQUEST_TIMEOUT = Duration.ofMillis(500);
private static final Duration SINGLE_TARGET_WAIT = Duration.ofSeconds(2);
private static final int SINGLE_TARGET_RETRIES = 3;
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java
index 0abf1470d29..d8acd2aa8b2 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java
@@ -305,7 +305,6 @@ public class ApplicationApiHandler extends AuditLoggingRequestHandler {
if (path.matches("/application/v4/tenant/{tenant}/info/billing")) return withCloudTenant(path.get("tenant"), request, this::putTenantInfoBilling);
if (path.matches("/application/v4/tenant/{tenant}/info/contacts")) return withCloudTenant(path.get("tenant"), request, this::putTenantInfoContacts);
if (path.matches("/application/v4/tenant/{tenant}/info/resend-mail-verification")) return withCloudTenant(path.get("tenant"), request, this::resendEmailVerification);
- if (path.matches("/application/v4/tenant/{tenant}/archive-access")) return allowAwsArchiveAccess(path.get("tenant"), request); // TODO(enygaard, 2022-05-25) Remove when no longer used by console
if (path.matches("/application/v4/tenant/{tenant}/archive-access/aws")) return allowAwsArchiveAccess(path.get("tenant"), request);
if (path.matches("/application/v4/tenant/{tenant}/archive-access/gcp")) return allowGcpArchiveAccess(path.get("tenant"), request);
if (path.matches("/application/v4/tenant/{tenant}/secret-store/{name}")) return addSecretStore(path.get("tenant"), path.get("name"), request);
@@ -355,7 +354,6 @@ public class ApplicationApiHandler extends AuditLoggingRequestHandler {
if (path.matches("/application/v4/tenant/{tenant}")) return deleteTenant(path.get("tenant"), request);
if (path.matches("/application/v4/tenant/{tenant}/access/managed/operator")) return removeManagedAccess(path.get("tenant"));
if (path.matches("/application/v4/tenant/{tenant}/key")) return removeDeveloperKey(path.get("tenant"), request);
- if (path.matches("/application/v4/tenant/{tenant}/archive-access")) return removeAwsArchiveAccess(path.get("tenant")); // TODO(enygaard, 2022-05-25) Remove when no longer used by console
if (path.matches("/application/v4/tenant/{tenant}/archive-access/aws")) return removeAwsArchiveAccess(path.get("tenant"));
if (path.matches("/application/v4/tenant/{tenant}/archive-access/gcp")) return removeGcpArchiveAccess(path.get("tenant"));
if (path.matches("/application/v4/tenant/{tenant}/secret-store/{name}")) return deleteSecretStore(path.get("tenant"), path.get("name"), request);
@@ -2626,8 +2624,6 @@ public class ApplicationApiHandler extends AuditLoggingRequestHandler {
log.warning(String.format("Failed to get quota for tenant %s: %s", tenant.name(), Exceptions.toMessageString(e)));
}
- // TODO(enygaard, 2022-05-25) Remove when console is using new archive access structure
- cloudTenant.archiveAccess().awsRole().ifPresent(role -> object.setString("archiveAccessRole", role));
toSlime(cloudTenant.archiveAccess(), object.setObject("archiveAccess"));
break;
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageTest.java
index 8cf861ff963..8ac8b87ac45 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageTest.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageTest.java
@@ -3,18 +3,47 @@ package com.yahoo.vespa.hosted.controller.application.pkg;
import com.yahoo.config.application.api.DeploymentSpec;
import com.yahoo.config.application.api.ValidationId;
+import com.yahoo.io.LazyInputStream;
+import com.yahoo.security.KeyAlgorithm;
+import com.yahoo.security.KeyUtils;
+import com.yahoo.security.SignatureAlgorithm;
+import com.yahoo.security.X509CertificateBuilder;
import org.junit.jupiter.api.Test;
+import javax.security.auth.x500.X500Principal;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.SequenceInputStream;
+import java.math.BigInteger;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.security.KeyPair;
+import java.security.cert.X509Certificate;
import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.function.Predicate;
+import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import static com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackage.filesZip;
+import static com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackageStream.addingCertificate;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -24,35 +53,41 @@ import static org.junit.jupiter.api.Assertions.fail;
*/
public class ApplicationPackageTest {
- static final String deploymentXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
- "<deployment version=\"1.0\">\n" +
- " <test />\n" +
- " <prod>\n" +
- " <parallel>\n" +
- " <region active=\"true\">us-central-1</region>\n" +
- " </parallel>\n" +
- " </prod>\n" +
- "</deployment>\n";
-
- static final String servicesXml = "<services version='1.0' xmlns:deploy=\"vespa\" xmlns:preprocess=\"properties\">\n" +
- " <preprocess:include file='jdisc.xml' />\n" +
- " <content version='1.0' if='foo' />\n" +
- " <content version='1.0' id='foo' deploy:environment='staging prod' deploy:region='us-east-3 us-central-1'>\n" +
- " <preprocess:include file='content/content.xml' />\n" +
- " </content>\n" +
- " <preprocess:include file='not_found.xml' required='false' />\n" +
- "</services>\n";
+ static final String deploymentXml = """
+ <?xml version="1.0" encoding="UTF-8"?>
+ <deployment version="1.0">
+ <test />
+ <prod>
+ <parallel>
+ <region active="true">us-central-1</region>
+ </parallel>
+ </prod>
+ </deployment>
+ """;
+
+ static final String servicesXml = """
+ <services version='1.0' xmlns:deploy="vespa" xmlns:preprocess="properties">
+ <preprocess:include file='jdisc.xml' />
+ <content version='1.0' if='foo' />
+ <content version='1.0' id='foo' deploy:environment='staging prod' deploy:region='us-east-3 us-central-1'>
+ <preprocess:include file='content/content.xml' />
+ </content>
+ <preprocess:include file='not_found.xml' required='false' />
+ </services>
+ """;
private static final String jdiscXml = "<container id='stateless' version='1.0' />\n";
- private static final String contentXml = "<documents>\n" +
- " <document type=\"music.sd\" mode=\"index\" />\n" +
- "</documents>\n" +
- "<preprocess:include file=\"nodes.xml\" />";
+ private static final String contentXml = """
+ <documents>
+ <document type="music.sd" mode="index" />
+ </documents>
+ <preprocess:include file="nodes.xml" />""";
- private static final String nodesXml = "<nodes>\n" +
- " <node hostalias=\"node0\" distribution-key=\"0\" />\n" +
- "</nodes>";
+ private static final String nodesXml = """
+ <nodes>
+ <node hostalias="node0" distribution-key="0" />
+ </nodes>""";
@Test
void test_createEmptyForDeploymentRemoval() {
@@ -67,22 +102,22 @@ public class ApplicationPackageTest {
@Test
void testMetaData() {
- byte[] zip = ApplicationPackage.filesZip(Map.of("services.xml", servicesXml.getBytes(UTF_8),
- "jdisc.xml", jdiscXml.getBytes(UTF_8),
- "content/content.xml", contentXml.getBytes(UTF_8),
- "content/nodes.xml", nodesXml.getBytes(UTF_8),
- "gurba", "gurba".getBytes(UTF_8)));
+ byte[] zip = filesZip(Map.of("services.xml", servicesXml.getBytes(UTF_8),
+ "jdisc.xml", jdiscXml.getBytes(UTF_8),
+ "content/content.xml", contentXml.getBytes(UTF_8),
+ "content/nodes.xml", nodesXml.getBytes(UTF_8),
+ "gurba", "gurba".getBytes(UTF_8)));
assertEquals(Map.of("services.xml", servicesXml,
- "jdisc.xml", jdiscXml,
- "content/content.xml", contentXml,
- "content/nodes.xml", nodesXml),
- unzip(new ApplicationPackage(zip, false).metaDataZip()));
+ "jdisc.xml", jdiscXml,
+ "content/content.xml", contentXml,
+ "content/nodes.xml", nodesXml),
+ unzip(new ApplicationPackage(zip, false).metaDataZip()));
}
@Test
void testMetaDataWithMissingFiles() {
- byte[] zip = ApplicationPackage.filesZip(Map.of("services.xml", servicesXml.getBytes(UTF_8)));
+ byte[] zip = filesZip(Map.of("services.xml", servicesXml.getBytes(UTF_8)));
try {
new ApplicationPackage(zip, false).metaDataZip();
@@ -132,15 +167,165 @@ public class ApplicationPackageTest {
assertEquals(originalPackage.bundleHash(), similarDeploymentXml.bundleHash());
}
- private static Map<String, String> unzip(byte[] zip) {
- return ZipEntries.from(zip, __ -> true, 1 << 10, true)
+ static Map<String, String> unzip(byte[] zip) {
+ return ZipEntries.from(zip, __ -> true, 1 << 24, true)
.asList().stream()
.collect(Collectors.toMap(ZipEntries.ZipEntryWithContent::name,
- entry -> new String(entry.contentOrThrow(), UTF_8)));
+ entry -> new String(entry.content().orElse(new byte[0]), UTF_8)));
}
- private ApplicationPackage getApplicationZip(String path) throws Exception {
+ private ApplicationPackage getApplicationZip(String path) throws IOException {
return new ApplicationPackage(Files.readAllBytes(Path.of("src/test/resources/application-packages/" + path)), true);
}
+ @Test
+ void test_replacement() throws IOException {
+ byte[] zip = zip(Map.of());
+ List<X509Certificate> certificates = IntStream.range(0, 3)
+ .mapToObj(i -> {
+ KeyPair keyPair = KeyUtils.generateKeypair(KeyAlgorithm.EC, 256);
+ X500Principal subject = new X500Principal("CN=subject" + i);
+ return X509CertificateBuilder.fromKeypair(keyPair,
+ subject,
+ Instant.now(),
+ Instant.now().plusSeconds(1),
+ SignatureAlgorithm.SHA512_WITH_ECDSA,
+ BigInteger.valueOf(1))
+ .build();
+ }).toList();
+
+ assertEquals(List.of(), new ApplicationPackage(zip).trustedCertificates());
+ for (int i = 0; i < certificates.size(); i++) {
+ InputStream in = new ByteArrayInputStream(zip);
+ zip = new ApplicationPackageStream(() -> in, () -> __ -> false, addingCertificate(Optional.of(certificates.get(i)))).zipStream().readAllBytes();
+ assertEquals(certificates.subList(0, i + 1), new ApplicationPackage(zip).trustedCertificates());
+ }
+ }
+
+ static byte[] zip(Map<String, String> content) {
+ return filesZip(content.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
+ entry -> entry.getValue().getBytes(UTF_8))));
+ }
+
+ private static class AngryStreams {
+
+ private final byte[] content;
+ private final Map<ByteArrayInputStream, Throwable> streams = new LinkedHashMap<>();
+
+ AngryStreams(byte[] content) {
+ this.content = content;
+ }
+
+ InputStream stream() {
+ ByteArrayInputStream stream = new ByteArrayInputStream(Arrays.copyOf(content, content.length)) {
+ boolean closed = false;
+ @Override public void close() { closed = true; }
+ @Override public int read() { assertFalse(closed); return super.read(); }
+ @Override public int read(byte[] b, int off, int len) { assertFalse(closed); return super.read(b, off, len); }
+ @Override public long transferTo(OutputStream out) throws IOException { assertFalse(closed); return super.transferTo(out); }
+ @Override public byte[] readAllBytes() { assertFalse(closed); return super.readAllBytes(); }
+ };
+ streams.put(stream, new Throwable());
+ return stream;
+ }
+
+ void verifyAllRead() {
+ streams.forEach((stream, stack) -> assertEquals(0, stream.available(),
+ "unconsumed content in stream created at " +
+ new ByteArrayOutputStream() {{ stack.printStackTrace(new PrintStream(this)); }}));
+ }
+
+ }
+
+ @Test
+ void testApplicationPackageStream() throws Exception {
+ Map<String, String> content = Map.of("deployment.xml", deploymentXml,
+ "services.xml", servicesXml,
+ "jdisc.xml", jdiscXml,
+ "unused1.xml", jdiscXml,
+ "content/content.xml", contentXml,
+ "content/nodes.xml", nodesXml,
+ "gurba", "gurba");
+ byte[] zip = zip(content);
+ assertEquals(content, unzip(zip));
+ AngryStreams angry = new AngryStreams(zip);
+
+ ApplicationPackageStream identity = new ApplicationPackageStream(angry::stream);
+ InputStream lazy = new LazyInputStream(() -> new ByteArrayInputStream(identity.truncatedPackage().zippedContent()));
+ assertEquals("must completely exhaust input before reading package",
+ assertThrows(IllegalStateException.class, identity::truncatedPackage).getMessage());
+
+ // Verify no content has changed when passing through the stream.
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ try (InputStream stream = identity.zipStream()) { stream.transferTo(out); }
+ assertEquals(content, unzip(out.toByteArray()));
+ assertEquals(content, unzip(identity.truncatedPackage().zippedContent()));
+ assertEquals(content, unzip(lazy.readAllBytes()));
+ ApplicationPackage original = new ApplicationPackage(zip);
+ assertEquals(unzip(original.metaDataZip()), unzip(identity.truncatedPackage().metaDataZip()));
+ assertEquals(original.bundleHash(), identity.truncatedPackage().bundleHash());
+
+ // Change deployment.xml, remove unused1.xml and add unused2.xml
+ Map<String, UnaryOperator<InputStream>> replacements = Map.of("deployment.xml", in -> new SequenceInputStream(in, new ByteArrayInputStream("\n\n".getBytes(UTF_8))),
+ "unused1.xml", in -> null,
+ "unused2.xml", __ -> new ByteArrayInputStream(jdiscXml.getBytes(UTF_8)));
+ Predicate<String> truncation = name -> name.endsWith(".xml");
+ ApplicationPackageStream modifier = new ApplicationPackageStream(angry::stream, () -> truncation, replacements);
+ out.reset();
+
+ InputStream partiallyRead = modifier.zipStream();
+ assertEquals(15, partiallyRead.readNBytes(15).length);
+
+ try (InputStream stream = modifier.zipStream()) { stream.transferTo(out); }
+
+ assertEquals(Map.of("deployment.xml", deploymentXml + "\n\n",
+ "services.xml", servicesXml,
+ "jdisc.xml", jdiscXml,
+ "unused2.xml", jdiscXml,
+ "content/content.xml", contentXml,
+ "content/nodes.xml", nodesXml,
+ "gurba", "gurba"),
+ unzip(out.toByteArray()));
+
+ assertEquals(Map.of("deployment.xml", deploymentXml + "\n\n",
+ "services.xml", servicesXml,
+ "jdisc.xml", jdiscXml,
+ "unused2.xml", jdiscXml,
+ "content/content.xml", contentXml,
+ "content/nodes.xml", nodesXml),
+ unzip(modifier.truncatedPackage().zippedContent()));
+
+ // Compare retained metadata for an updated original package, and the truncated package of the modifier.
+ assertEquals(unzip(new ApplicationPackage(zip(Map.of("deployment.xml", deploymentXml + "\n\n", // Expected to change.
+ "services.xml", servicesXml,
+ "jdisc.xml", jdiscXml,
+ "unused1.xml", jdiscXml, // Irrelevant.
+ "content/content.xml", contentXml,
+ "content/nodes.xml", nodesXml,
+ "gurba", "gurba"))).metaDataZip()),
+ unzip(modifier.truncatedPackage().metaDataZip()));
+
+ try (InputStream stream1 = modifier.zipStream();
+ InputStream stream2 = modifier.zipStream()) {
+ assertArrayEquals(stream1.readAllBytes(),
+ stream2.readAllBytes());
+ }
+
+ ByteArrayOutputStream byteAtATime = new ByteArrayOutputStream();
+ try (InputStream stream1 = modifier.zipStream();
+ InputStream stream2 = modifier.zipStream()) {
+ for (int b; (b = stream1.read()) != -1; ) byteAtATime.write(b);
+ assertArrayEquals(stream2.readAllBytes(),
+ byteAtATime.toByteArray());
+ }
+
+ assertEquals(byteAtATime.size(),
+ 15 + partiallyRead.readAllBytes().length);
+ partiallyRead.close();
+
+ try (InputStream stream = modifier.zipStream()) { stream.readNBytes(12); }
+
+ angry.verifyAllRead();
+ }
+
}
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackageTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackageTest.java
index bff0ccc8ae1..6da8db1c259 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackageTest.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/TestPackageTest.java
@@ -1,18 +1,24 @@
package com.yahoo.vespa.hosted.controller.application.pkg;
import com.yahoo.config.application.api.DeploymentSpec;
+import com.yahoo.config.provision.ApplicationId;
import com.yahoo.config.provision.NodeResources;
import com.yahoo.config.provision.zone.ZoneId;
+import com.yahoo.vespa.hosted.controller.api.integration.deployment.JobType;
+import com.yahoo.vespa.hosted.controller.api.integration.deployment.RunId;
import com.yahoo.vespa.hosted.controller.application.pkg.TestPackage.TestSummary;
import com.yahoo.vespa.hosted.controller.config.ControllerConfig;
+import com.yahoo.vespa.hosted.controller.config.ControllerConfig.Steprunner.Testerapp;
import org.junit.jupiter.api.Test;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
@@ -20,9 +26,11 @@ import static com.yahoo.vespa.hosted.controller.api.integration.deployment.Teste
import static com.yahoo.vespa.hosted.controller.api.integration.deployment.TesterCloud.Suite.staging;
import static com.yahoo.vespa.hosted.controller.api.integration.deployment.TesterCloud.Suite.staging_setup;
import static com.yahoo.vespa.hosted.controller.api.integration.deployment.TesterCloud.Suite.system;
+import static com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackageTest.unzip;
import static com.yahoo.vespa.hosted.controller.application.pkg.TestPackage.validateTests;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* @author jonmv
@@ -77,15 +85,15 @@ public class TestPackageTest {
@Test
void testBundleValidation() throws IOException {
byte[] testZip = ApplicationPackage.filesZip(Map.of("components/foo-tests.jar", testsJar("SystemTest", "StagingSetup", "ProductionTest"),
- "artifacts/key", new byte[0]));
+ "artifacts/key", new byte[0]));
TestSummary summary = validateTests(List.of(system), testZip);
assertEquals(List.of(system, staging_setup, production), summary.suites());
assertEquals(List.of("test package contains 'artifacts/key'; this conflicts with credentials used to run tests in Vespa Cloud",
- "test package has staging setup, so it should also include staging tests",
- "test package has production tests, but no production tests are declared in deployment.xml",
- "see https://docs.vespa.ai/en/testing.html for details on how to write system tests for Vespa"),
- summary.problems());
+ "test package has staging setup, so it should also include staging tests",
+ "test package has production tests, but no production tests are declared in deployment.xml",
+ "see https://docs.vespa.ai/en/testing.html for details on how to write system tests for Vespa"),
+ summary.problems());
}
@Test
@@ -95,20 +103,47 @@ public class TestPackageTest {
assertEquals(List.of(staging, production), summary.suites());
assertEquals(List.of("test package has staging tests, so it should also include staging setup",
- "see https://docs.vespa.ai/en/testing.html for details on how to write system tests for Vespa"),
- summary.problems());
+ "see https://docs.vespa.ai/en/testing.html for details on how to write system tests for Vespa"),
+ summary.problems());
}
@Test
void testBasicTestsValidation() {
byte[] testZip = ApplicationPackage.filesZip(Map.of("tests/staging-test/foo.json", new byte[0],
- "tests/staging-setup/foo.json", new byte[0]));
+ "tests/staging-setup/foo.json", new byte[0]));
TestSummary summary = validateTests(List.of(system, production), testZip);
assertEquals(List.of(staging_setup, staging), summary.suites());
assertEquals(List.of("test package has no system tests, but <test /> is declared in deployment.xml",
- "test package has no production tests, but production tests are declared in deployment.xml",
- "see https://docs.vespa.ai/en/testing.html for details on how to write system tests for Vespa"),
- summary.problems());
+ "test package has no production tests, but production tests are declared in deployment.xml",
+ "see https://docs.vespa.ai/en/testing.html for details on how to write system tests for Vespa"),
+ summary.problems());
+ }
+
+ @Test
+ void testTestPackageAssembly() throws IOException {
+ byte[] bundleZip = ApplicationPackage.filesZip(Map.of("components/foo-tests.jar", testsJar("SystemTest", "ProductionTest"),
+ "artifacts/key", new byte[0]));
+ TestPackage bundleTests = new TestPackage(() -> new ByteArrayInputStream(bundleZip),
+ false,
+ new RunId(ApplicationId.defaultId(), JobType.dev("abc"), 123),
+ new Testerapp.Builder().tenantCdBundle("foo").runtimeProviderClass("bar").build(),
+ DeploymentSpec.fromXml("""
+ <deployment>
+ <test />
+ </deployment>
+ """),
+ null,
+ null);
+
+ Map<String, String> bundlePackage = unzip(bundleTests.asApplicationPackage().zipStream().readAllBytes());
+ bundlePackage.keySet().removeIf(name -> name.startsWith("tests/.ignore") || name.startsWith("artifacts/.ignore"));
+ assertEquals(Set.of("deployment.xml",
+ "services.xml",
+ "components/foo-tests.jar",
+ "artifacts/key"),
+ bundlePackage.keySet());
+ assertEquals(Map.of(),
+ unzip(bundleTests.asApplicationPackage().truncatedPackage().zippedContent()));
}
@Test
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ZipEntriesTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ZipEntriesTest.java
deleted file mode 100644
index 37062e1002b..00000000000
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/pkg/ZipEntriesTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hosted.controller.application.pkg;
-
-import com.yahoo.security.KeyAlgorithm;
-import com.yahoo.security.KeyUtils;
-import com.yahoo.security.SignatureAlgorithm;
-import com.yahoo.security.X509CertificateBuilder;
-import org.junit.jupiter.api.Test;
-
-import javax.security.auth.x500.X500Principal;
-import java.math.BigInteger;
-import java.security.KeyPair;
-import java.security.cert.X509Certificate;
-import java.time.Instant;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-/**
- * @author mpolden
- */
-public class ZipEntriesTest {
-
- @Test
- void test_replacement() {
- ApplicationPackage applicationPackage = new ApplicationPackage(new byte[0]);
- List<X509Certificate> certificates = IntStream.range(0, 3)
- .mapToObj(i -> {
- KeyPair keyPair = KeyUtils.generateKeypair(KeyAlgorithm.EC, 256);
- X500Principal subject = new X500Principal("CN=subject" + i);
- return X509CertificateBuilder.fromKeypair(keyPair,
- subject,
- Instant.now(),
- Instant.now().plusSeconds(1),
- SignatureAlgorithm.SHA512_WITH_ECDSA,
- BigInteger.valueOf(1))
- .build();
- })
- .collect(Collectors.toUnmodifiableList());
-
- assertEquals(List.of(), applicationPackage.trustedCertificates());
- for (int i = 0; i < certificates.size(); i++) {
- applicationPackage = applicationPackage.withTrustedCertificate(certificates.get(i));
- assertEquals(certificates.subList(0, i + 1), applicationPackage.trustedCertificates());
- }
- }
-
-}
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunnerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunnerTest.java
index 9bf762d2f99..2f245ab9736 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunnerTest.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunnerTest.java
@@ -516,7 +516,7 @@ public class InternalStepRunnerTest {
assertEquals(oldTrusted, tester.configServer().application(app.instanceId(), id.type().zone()).get().applicationPackage().trustedCertificates());
tester.configServer().throwOnNextPrepare(null);
- tester.clock().advance(Duration.ofSeconds(300));
+ tester.clock().advance(Duration.ofSeconds(450));
tester.runner().run();
assertEquals(succeeded, tester.jobs().run(id).stepStatuses().get(Step.deployTester));
assertEquals(succeeded, tester.jobs().run(id).stepStatuses().get(Step.deployReal));
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ApplicationStoreMock.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ApplicationStoreMock.java
index 8ed38761c95..e025a3bea4f 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ApplicationStoreMock.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ApplicationStoreMock.java
@@ -12,6 +12,8 @@ import com.yahoo.vespa.hosted.controller.api.integration.deployment.RevisionId;
import com.yahoo.vespa.hosted.controller.api.integration.deployment.TesterId;
import com.yahoo.vespa.hosted.controller.application.TenantAndApplicationId;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
import java.time.Instant;
import java.util.Map;
import java.util.NavigableMap;
@@ -46,15 +48,14 @@ public class ApplicationStoreMock implements ApplicationStore {
}
@Override
- public byte[] get(DeploymentId deploymentId, RevisionId revisionId) {
+ public InputStream stream(DeploymentId deploymentId, RevisionId revisionId) {
if ( ! revisionId.isProduction())
- return requireNonNull(devStore.get(deploymentId));
+ return new ByteArrayInputStream(devStore.get(deploymentId));
TenantAndApplicationId tenantAndApplicationId = TenantAndApplicationId.from(deploymentId.applicationId());
byte[] bytes = store.get(appId(tenantAndApplicationId.tenant(), tenantAndApplicationId.application())).get(revisionId);
- if (bytes == null)
- throw new NotExistsException("No " + revisionId + " found for " + tenantAndApplicationId);
- return bytes;
+ if (bytes == null) throw new NotExistsException("No " + revisionId + " found for " + tenantAndApplicationId);
+ return new ByteArrayInputStream(bytes);
}
@Override
@@ -96,8 +97,8 @@ public class ApplicationStoreMock implements ApplicationStore {
}
@Override
- public byte[] getTester(TenantName tenant, ApplicationName application, RevisionId revision) {
- return requireNonNull(store.get(testerId(tenant, application)).get(revision));
+ public InputStream streamTester(TenantName tenant, ApplicationName application, RevisionId revision) {
+ return new ByteArrayInputStream(store.get(testerId(tenant, application)).get(revision));
}
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ConfigServerMock.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ConfigServerMock.java
index 07d9efdf8fc..eaa178c9727 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ConfigServerMock.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ConfigServerMock.java
@@ -42,9 +42,12 @@ import com.yahoo.vespa.hosted.controller.api.integration.noderepository.RestartF
import com.yahoo.vespa.hosted.controller.api.integration.secrets.TenantSecretStore;
import com.yahoo.vespa.hosted.controller.application.SystemApplication;
import com.yahoo.vespa.hosted.controller.application.pkg.ApplicationPackage;
+import wiremock.org.checkerframework.checker.units.qual.A;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.io.InputStream;
+import java.io.UncheckedIOException;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
@@ -376,6 +379,13 @@ public class ConfigServerMock extends AbstractComponent implements ConfigServer
@Override
public PreparedApplication deploy(DeploymentData deployment) {
+ ApplicationPackage appPackage;
+ try (InputStream in = deployment.applicationPackage()) {
+ appPackage = new ApplicationPackage(in.readAllBytes());
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
lastPrepareVersion = deployment.platform();
if (prepareException != null)
prepareException.accept(ApplicationId.from(deployment.instance().tenant(),
@@ -383,8 +393,9 @@ public class ConfigServerMock extends AbstractComponent implements ConfigServer
deployment.instance().instance()));
DeploymentId id = new DeploymentId(deployment.instance(), deployment.zone());
- applications.put(id, new Application(id.applicationId(), lastPrepareVersion, new ApplicationPackage(deployment.applicationPackage())));
+ applications.put(id, new Application(id.applicationId(), lastPrepareVersion, appPackage));
ClusterSpec.Id cluster = ClusterSpec.Id.from("default");
+ deployment.endpointCertificateMetadata(); // Supplier with side effects >_<
if (nodeRepository().list(id.zoneId(), NodeFilter.all().applications(id.applicationId())).isEmpty())
provision(id.zoneId(), id.applicationId(), cluster);
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiCloudTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiCloudTest.java
index 3b5a09e4a74..a1e70b77948 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiCloudTest.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiCloudTest.java
@@ -384,20 +384,17 @@ public class ApplicationApiCloudTest extends ControllerContainerCloudTest {
tester.assertResponse(request("/application/v4/tenant/scoober", GET).roles(Role.reader(tenantName)),
(response) -> assertFalse(response.getBodyAsString().contains("archiveAccessRole")),
200);
- tester.assertResponse(request("/application/v4/tenant/scoober/archive-access", PUT)
- .data("{\"role\":\"dummy\"}").roles(Role.administrator(tenantName)),
- "{\"error-code\":\"BAD_REQUEST\",\"message\":\"Invalid archive access role 'dummy': Must match expected pattern: 'arn:aws:iam::\\\\d{12}:.+'\"}", 400);
tester.assertResponse(request("/application/v4/tenant/scoober/archive-access/aws", PUT)
.data("{\"role\":\"arn:aws:iam::123456789012:role/my-role\"}").roles(Role.administrator(tenantName)),
"{\"message\":\"AWS archive access role set to 'arn:aws:iam::123456789012:role/my-role' for tenant scoober.\"}", 200);
tester.assertResponse(request("/application/v4/tenant/scoober", GET).roles(Role.reader(tenantName)),
- (response) -> assertTrue(response.getBodyAsString().contains("\"archiveAccessRole\":\"arn:aws:iam::123456789012:role/my-role\"")),
+ (response) -> assertTrue(response.getBodyAsString().contains("\"awsRole\":\"arn:aws:iam::123456789012:role/my-role\"")),
200);
tester.assertResponse(request("/application/v4/tenant/scoober/archive-access/aws", DELETE).roles(Role.administrator(tenantName)),
"{\"message\":\"AWS archive access role removed for tenant scoober.\"}", 200);
tester.assertResponse(request("/application/v4/tenant/scoober", GET).roles(Role.reader(tenantName)),
- (response) -> assertFalse(response.getBodyAsString().contains("\"archiveAccessRole\":\"arn:aws:iam::123456789012:role/my-role\"")),
+ (response) -> assertFalse(response.getBodyAsString().contains("\"awsRole\":\"arn:aws:iam::123456789012:role/my-role\"")),
200);
tester.assertResponse(request("/application/v4/tenant/scoober/archive-access/gcp", PUT)
@@ -412,25 +409,25 @@ public class ApplicationApiCloudTest extends ControllerContainerCloudTest {
(response) -> assertFalse(response.getBodyAsString().contains("\"gcpMember\":\"user:test@example.com\"")),
200);
- tester.assertResponse(request("/application/v4/tenant/scoober/archive-access", PUT)
+ tester.assertResponse(request("/application/v4/tenant/scoober/archive-access/aws", PUT)
.data("{\"role\":\"arn:aws:iam::123456789012:role/my-role\"}").roles(Role.administrator(tenantName)),
"{\"message\":\"AWS archive access role set to 'arn:aws:iam::123456789012:role/my-role' for tenant scoober.\"}", 200);
tester.assertResponse(request("/application/v4/tenant/scoober", GET).roles(Role.reader(tenantName)),
- (response) -> assertTrue(response.getBodyAsString().contains("\"archiveAccessRole\":\"arn:aws:iam::123456789012:role/my-role\"")),
+ (response) -> assertTrue(response.getBodyAsString().contains("\"awsRole\":\"arn:aws:iam::123456789012:role/my-role\"")),
200);
- tester.assertResponse(request("/application/v4/tenant/scoober/archive-access", PUT)
+ tester.assertResponse(request("/application/v4/tenant/scoober/archive-access/aws", PUT)
.data("{\"role\":\"arn:aws:iam::123456789012:role/my-role\"}").roles(Role.administrator(tenantName)),
"{\"message\":\"AWS archive access role set to 'arn:aws:iam::123456789012:role/my-role' for tenant scoober.\"}", 200);
tester.assertResponse(request("/application/v4/tenant/scoober", GET).roles(Role.reader(tenantName)),
- (response) -> assertTrue(response.getBodyAsString().contains("\"archiveAccessRole\":\"arn:aws:iam::123456789012:role/my-role\"")),
+ (response) -> assertTrue(response.getBodyAsString().contains("\"awsRole\":\"arn:aws:iam::123456789012:role/my-role\"")),
200);
tester.assertResponse(request("/application/v4/tenant/scoober/application/albums/environment/prod/region/aws-us-east-1c/instance/default", GET)
.roles(Role.reader(tenantName)),
new File("deployment-cloud.json"));
- tester.assertResponse(request("/application/v4/tenant/scoober/archive-access", DELETE).roles(Role.administrator(tenantName)),
+ tester.assertResponse(request("/application/v4/tenant/scoober/archive-access/aws", DELETE).roles(Role.administrator(tenantName)),
"{\"message\":\"AWS archive access role removed for tenant scoober.\"}", 200);
tester.assertResponse(request("/application/v4/tenant/scoober", GET).roles(Role.reader(tenantName)),
(response) -> assertFalse(response.getBodyAsString().contains("archiveAccessRole")),
diff --git a/dist/vespa.spec b/dist/vespa.spec
index 437daf3f4e2..2592e769359 100644
--- a/dist/vespa.spec
+++ b/dist/vespa.spec
@@ -586,6 +586,10 @@ exit 0
%systemd_postun_with_restart vespa-configserver.service
%endif
+%post base
+
+ln -sf %{_prefix}/var/tmp %{_prefix}/tmp
+
%postun base
if [ $1 -eq 0 ]; then # this is an uninstallation
rm -f /etc/profile.d/vespa.sh
@@ -603,6 +607,10 @@ then
mv %{_prefix}/conf/vespa/default-env.txt.rpmsave %{_prefix}/conf/vespa/default-env.txt
fi
fi
+if test -L %{_prefix}/tmp
+then
+ rm -f %{_prefix}/tmp
+fi
%files
%if %{_defattr_is_vespa_vespa}
@@ -690,8 +698,6 @@ fi
%{_prefix}/man
%{_prefix}/sbin
%{_prefix}/share
-%dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/tmp
-%dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/tmp/vespa
%dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/var
%dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/var/crash
%dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/var/db
@@ -707,6 +713,8 @@ fi
%dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/var/db/vespa/tmp
%dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/var/jdisc_container
%dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/var/run
+%dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/var/tmp
+%dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/var/tmp/vespa
%dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/var/vespa
%dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/var/vespa/application
%dir %attr(-,%{_vespa_user},%{_vespa_group}) %{_prefix}/var/vespa/bundlecache
diff --git a/hosted-api/src/main/java/ai/vespa/hosted/api/MultiPartStreamer.java b/hosted-api/src/main/java/ai/vespa/hosted/api/MultiPartStreamer.java
index f1cbc027e17..b3862b76296 100644
--- a/hosted-api/src/main/java/ai/vespa/hosted/api/MultiPartStreamer.java
+++ b/hosted-api/src/main/java/ai/vespa/hosted/api/MultiPartStreamer.java
@@ -15,6 +15,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Enumeration;
import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;
@@ -89,10 +90,12 @@ public class MultiPartStreamer {
/** Returns an input stream which is an aggregate of all current parts in this, plus an end marker. */
public InputStream data() {
- InputStream aggregate = new SequenceInputStream(Collections.enumeration(Stream.concat(streams.stream().map(Supplier::get),
- Stream.of(end()))
- .collect(Collectors.toList())));
-
+ InputStream aggregate = new SequenceInputStream(new Enumeration<>() {
+ final int j = streams.size();
+ int i = -1;
+ @Override public boolean hasMoreElements() { return i < j; }
+ @Override public InputStream nextElement() { return ++i < j ? streams.get(i).get() : i == j ? end() : null; }
+ });
try {
if (aggregate.skip(2) != 2)// This should never happen, as the first stream is a ByteArrayInputStream.
throw new IllegalStateException("Failed skipping extraneous bytes.");
@@ -113,17 +116,6 @@ public class MultiPartStreamer {
return asStream(disposition(name) + (filename == null ? "" : "; filename=\"" + filename + "\"") + type(contentType));
}
- /** Returns the separator to put between one part and the next, when this is a file. */
- private InputStream separator(String name, Path path) {
- try {
- String contentType = Files.probeContentType(path);
- return separator(name, path.getFileName().toString(), contentType != null ? contentType : "application/octet-stream");
- }
- catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
-
/** Returns the end delimiter of the request, with line breaks prepended. */
private InputStream end() {
return asStream("\r\n--" + boundary + "--");
@@ -140,7 +132,7 @@ public class MultiPartStreamer {
return "\r\nContent-Type: " + contentType + "\r\n\r\n";
}
- /** Returns the a ByteArrayInputStream over the given string, UTF-8 encoded. */
+ /** Returns a ByteArrayInputStream over the given string, UTF-8 encoded. */
private static InputStream asStream(String string) {
return new ByteArrayInputStream(string.getBytes(StandardCharsets.UTF_8));
}
diff --git a/http-client/pom.xml b/http-client/pom.xml
index c396533d7b9..133da65631c 100644
--- a/http-client/pom.xml
+++ b/http-client/pom.xml
@@ -58,6 +58,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git a/http-client/src/main/java/ai/vespa/hosted/client/AbstractHttpClient.java b/http-client/src/main/java/ai/vespa/hosted/client/AbstractHttpClient.java
index ed3fee101ed..48bbffc7e37 100644
--- a/http-client/src/main/java/ai/vespa/hosted/client/AbstractHttpClient.java
+++ b/http-client/src/main/java/ai/vespa/hosted/client/AbstractHttpClient.java
@@ -173,7 +173,7 @@ public abstract class AbstractHttpClient implements HttpClient {
@Override
public HttpClient.RequestBuilder body(byte[] json) {
- return body(HttpEntities.create(json, ContentType.APPLICATION_JSON));
+ return body(() -> HttpEntities.create(json, ContentType.APPLICATION_JSON));
}
@Override
diff --git a/http-client/src/main/java/ai/vespa/hosted/client/HttpClient.java b/http-client/src/main/java/ai/vespa/hosted/client/HttpClient.java
index ea8328ed793..4da887f0cbb 100644
--- a/http-client/src/main/java/ai/vespa/hosted/client/HttpClient.java
+++ b/http-client/src/main/java/ai/vespa/hosted/client/HttpClient.java
@@ -78,12 +78,6 @@ public interface HttpClient extends Closeable {
RequestBuilder body(byte[] json);
/** Sets the request body. */
- default RequestBuilder body(HttpEntity entity) {
- if (entity.isRepeatable()) return body(() -> entity);
- throw new IllegalArgumentException("entitiy must be repeatable, or a supplier must be used");
- }
-
- /** Sets the request body. */
RequestBuilder body(Supplier<HttpEntity> entity);
/** Sets query parameters without a value, like {@code ?debug&recursive}. */
diff --git a/logserver/bin/logserver-start.sh b/logserver/bin/logserver-start.sh
index d37a2f31720..942120ceb21 100755
--- a/logserver/bin/logserver-start.sh
+++ b/logserver/bin/logserver-start.sh
@@ -81,7 +81,7 @@ cd ${VESPA_HOME} || { echo "Cannot cd to ${VESPA_HOME}" 1>&2; exit 1; }
heap_min=32
heap_max=256
-addopts="-server -Xms${heap_min}m -Xmx${heap_max}m -XX:+PreserveFramePointer $(get_jvm_hugepage_settings $heap_max) -XX:CompressedClassSpaceSize=32m -XX:MaxDirectMemorySize=32m -XX:ThreadStackSize=448 -XX:MaxJavaStackTraceDepth=1000 -XX:ActiveProcessorCount=2 -XX:-OmitStackTraceInFastThrow -Djava.io.tmpdir=${VESPA_HOME}/tmp"
+addopts="-server -Xms${heap_min}m -Xmx${heap_max}m -XX:+PreserveFramePointer $(get_jvm_hugepage_settings $heap_max) -XX:CompressedClassSpaceSize=32m -XX:MaxDirectMemorySize=32m -XX:ThreadStackSize=448 -XX:MaxJavaStackTraceDepth=1000 -XX:ActiveProcessorCount=2 -XX:-OmitStackTraceInFastThrow -Djava.io.tmpdir=${VESPA_HOME}/var/tmp"
oomopt="-XX:+ExitOnOutOfMemoryError"
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java
index 3a0cd412a2e..43b4df7415e 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java
@@ -98,7 +98,7 @@ public class VespaServiceDumperImpl implements VespaServiceDumper {
handleFailure(context, request, startedAt, "No artifacts requested");
return;
}
- ContainerPath directory = context.paths().underVespaHome("tmp/vespa-service-dump-" + request.getCreatedMillisOrNull());
+ ContainerPath directory = context.paths().underVespaHome("var/tmp/vespa-service-dump-" + request.getCreatedMillisOrNull());
UnixPath unixPathDirectory = new UnixPath(directory);
try {
context.log(log, Level.INFO,
diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImplTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImplTest.java
index 081f0038e06..5366156cfbe 100644
--- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImplTest.java
+++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImplTest.java
@@ -44,7 +44,7 @@ class VespaServiceDumperImplTest {
private static final String HOSTNAME = "host-1.domain.tld";
private final FileSystem fileSystem = TestFileSystem.create();
- private final Path tmpDirectory = fileSystem.getPath("/data/vespa/storage/host-1/opt/vespa/tmp");
+ private final Path tmpDirectory = fileSystem.getPath("/data/vespa/storage/host-1/opt/vespa/var/tmp");
@BeforeEach
void create_tmp_directory() throws IOException {
@@ -84,11 +84,11 @@ class VespaServiceDumperImplTest {
verify(operations).executeCommandInContainer(
context, context.users().vespa(), "/opt/vespa/libexec/vespa/find-pid", "default/container.1");
verify(operations).executeCommandInContainer(
- context, context.users().vespa(), "perf", "record", "-g", "--output=/opt/vespa/tmp/vespa-service-dump-1600000000000/perf-record.bin",
+ context, context.users().vespa(), "perf", "record", "-g", "--output=/opt/vespa/var/tmp/vespa-service-dump-1600000000000/perf-record.bin",
"--pid=12345", "sleep", "45");
verify(operations).executeCommandInContainer(
- context, context.users().vespa(), "bash", "-c", "perf report --input=/opt/vespa/tmp/vespa-service-dump-1600000000000/perf-record.bin" +
- " > /opt/vespa/tmp/vespa-service-dump-1600000000000/perf-report.txt");
+ context, context.users().vespa(), "bash", "-c", "perf report --input=/opt/vespa/var/tmp/vespa-service-dump-1600000000000/perf-record.bin" +
+ " > /opt/vespa/var/tmp/vespa-service-dump-1600000000000/perf-report.txt");
String expectedJson = "{\"createdMillis\":1600000000000,\"startedAt\":1600001000000,\"completedAt\":1600001000000," +
"\"location\":\"s3://uri-1/tenant1/service-dump/default-container-1-1600000000000/\"," +
@@ -127,7 +127,7 @@ class VespaServiceDumperImplTest {
context, context.users().vespa(), "/opt/vespa/libexec/vespa/find-pid", "default/container.1");
verify(operations).executeCommandInContainer(
context, context.users().vespa(), "jcmd", "12345", "JFR.start", "name=host-admin", "path-to-gc-roots=true", "settings=profile",
- "filename=/opt/vespa/tmp/vespa-service-dump-1600000000000/recording.jfr", "duration=30s");
+ "filename=/opt/vespa/var/tmp/vespa-service-dump-1600000000000/recording.jfr", "duration=30s");
verify(operations).executeCommandInContainer(context, context.users().vespa(), "jcmd", "12345", "JFR.check", "name=host-admin");
String expectedJson = "{\"createdMillis\":1600000000000,\"startedAt\":1600001000000," +
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/Node.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/Node.java
index 768036fd284..de1f9e65415 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/Node.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/Node.java
@@ -118,7 +118,6 @@ public final class Node implements Nodelike {
if (!ipConfig.pool().ipSet().isEmpty()) throw new IllegalArgumentException("A child node cannot have an IP address pool");
if (modelName.isPresent()) throw new IllegalArgumentException("A child node cannot have model name set");
if (switchHostname.isPresent()) throw new IllegalArgumentException("A child node cannot have switch hostname set");
- if (!cloudAccount.isEmpty()) throw new IllegalArgumentException("A child node cannot have cloud account set");
}
if (type != NodeType.host && reservedTo.isPresent())
diff --git a/parent/pom.xml b/parent/pom.xml
index 549d66e37a0..66fffa82b6e 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -495,61 +495,6 @@
<!-- No version property, as we don't want maven-dependency-plugin to alert about newer versions. -->
<version>3.1.9</version>
</dependency>
- <dependency> <!-- Control netty-all version -->
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- <version>${netty.version}</version>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-buffer</artifactId>
- <version>${netty.version}</version>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-common</artifactId>
- <version>${netty.version}</version>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-codec</artifactId>
- <version>${netty.version}</version>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-codec-http2</artifactId>
- <version>${netty.version}</version>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-codec-http</artifactId>
- <version>${netty.version}</version>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-transport</artifactId>
- <version>${netty.version}</version>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-transport-classes-epoll</artifactId>
- <version>${netty.version}</version>
- </dependency>
- <dependency> <!-- Control netty-handler version -->
- <groupId>io.netty</groupId>
- <artifactId>netty-handler</artifactId>
- <version>${netty.version}</version>
- </dependency>
- <dependency> <!-- Control netty-transport-native-epoll version -->
- <groupId>io.netty</groupId>
- <artifactId>netty-transport-native-epoll</artifactId>
- <version>${netty.version}</version>
- </dependency>
- <dependency> <!-- Control netty-handler version -->
- <groupId>io.netty</groupId>
- <artifactId>netty-tcnative</artifactId>
- <version>${netty-tcnative.version}</version>
- </dependency>
<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock-jre8-standalone</artifactId>
@@ -1075,104 +1020,6 @@
<artifactId>json</artifactId>
<version>${org.json.version}</version>
</dependency>
- <dependency> <!-- Due to hadoop-common pulling in 1.7.7 -->
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- <version>${avro.version}</version>
- </dependency>
- <dependency> <!-- Due to hadoop-common pulling in 9.8.1 -->
- <groupId>com.nimbusds</groupId>
- <artifactId>nimbus-jose-jwt</artifactId>
- <version>${nimbus.version}</version>
- </dependency>
- <dependency> <!-- Due to hadoop-common pulling in older version -->
- <groupId>net.minidev</groupId>
- <artifactId>json-smart</artifactId>
- <version>${json-smart.version}</version>
- </dependency>
- <dependency>
- <!-- Force fresh woodstox-core without security issue hadoop-3.3.4 -->
- <groupId>com.fasterxml.woodstox</groupId>
- <artifactId>woodstox-core</artifactId>
- <version>${woodstox.version}</version>
- </dependency>
- <dependency>
- <!-- Force fresh jersey-json without security issue hadoop-3.3.4 -->
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-json</artifactId>
- <version>${sun-jersey-json.version}</version>
- </dependency>
- <dependency>
- <!-- Force fresh jettison without security issue hadoop-3.3.4 -->
- <groupId>org.codehaus.jettison</groupId>
- <artifactId>jettison</artifactId>
- <version>${jettison.version}</version>
- </dependency>
- <dependency>
- <!-- Transitive dependencies from pig-0.16 up-to-date -->
- <groupId>tomcat</groupId>
- <artifactId>jasper-runtime</artifactId>
- <version>${tomcat-jasper.version}</version>
- </dependency>
- <dependency>
- <!-- Transitive dependencies from pig-0.16 up-to-date -->
- <groupId>tomcat</groupId>
- <artifactId>jasper-compiler</artifactId>
- <version>${tomcat-jasper.version}</version>
- </dependency>
- <!-- Hadoop dependencies -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>${hadoop.version}</version>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.pig</groupId>
- <artifactId>pig</artifactId>
- <version>${pig.version}</version>
- <classifier>h2</classifier>
- </dependency>
- <dependency>
- <!-- Hadoop test dependency -->
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-minicluster</artifactId>
- <version>${hadoop.version}</version>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
</dependencies>
</dependencyManagement>
@@ -1188,7 +1035,6 @@
<!-- Athenz dependencies. Make sure these dependencies match those in Vespa's internal repositories -->
<athenz.version>1.10.54</athenz.version> <!-- WARNING: sync cloud-tenant-base-dependencies-enforcer/pom.xml -->
- <avro.version>1.11.1</avro.version>
<aws-sdk.version>1.12.331</aws-sdk.version>
<!-- Athenz END -->
@@ -1207,15 +1053,11 @@
<felix.version>7.0.1</felix.version>
<felix.log.version>1.0.1</felix.log.version>
<findbugs.version>3.0.2</findbugs.version> <!-- Should be kept in sync with guava -->
- <groovy.version>3.0.13</groovy.version>
- <hadoop.version>3.3.4</hadoop.version>
<hdrhistogram.version>2.1.12</hdrhistogram.version>
- <jettison.version>1.5.1</jettison.version>
<jetty.version>9.4.49.v20220914</jetty.version>
<jetty-alpn.version>1.1.3.v20160715</jetty-alpn.version>
<jjwt.version>0.11.2</jjwt.version>
<jna.version>5.11.0</jna.version>
- <json-smart.version>2.4.8</json-smart.version>
<junit.version>5.8.1</junit.version>
<maven-archiver.version>3.5.2</maven-archiver.version>
<maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
@@ -1236,21 +1078,14 @@
<maven-site-plugin.version>3.9.1</maven-site-plugin.version>
<maven-source-plugin.version>3.2.1</maven-source-plugin.version>
<mockito.version>4.0.0</mockito.version>
- <netty.version>4.1.84.Final</netty.version>
- <netty-tcnative.version>2.0.54.Final</netty-tcnative.version>
- <nimbus.version>9.25.6</nimbus.version>
<onnxruntime.version>1.12.1</onnxruntime.version> <!-- WARNING: sync cloud-tenant-base-dependencies-enforcer/pom.xml -->
<org.json.version>20220320</org.json.version>
<org.lz4.version>1.8.0</org.lz4.version>
- <pig.version>0.16.0</pig.version>
<prometheus.client.version>0.6.0</prometheus.client.version>
<protobuf.version>3.21.7</protobuf.version>
<spifly.version>1.3.5</spifly.version>
- <sun-jersey-json.version>1.19.4</sun-jersey-json.version>
<surefire.version>2.22.2</surefire.version>
- <tomcat-jasper.version>5.5.23</tomcat-jasper.version>
- <wiremock.version>2.34.0</wiremock.version>
- <woodstox.version>6.4.0</woodstox.version>
+ <wiremock.version>2.35.0</wiremock.version>
<zookeeper.client.version>3.8.0</zookeeper.client.version>
<doclint>all</doclint>
diff --git a/pom.xml b/pom.xml
index de19e9585c0..da105771d8d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -131,7 +131,6 @@
<module>vespa-feed-client</module>
<module>vespa-feed-client-api</module>
<module>vespa-feed-client-cli</module>
- <module>vespa-hadoop</module>
<module>vespa-maven-plugin</module>
<module>vespa-osgi-testrunner</module>
<module>vespa-testrunner-components</module>
diff --git a/standalone-container/src/main/java/com/yahoo/container/standalone/StandaloneContainerApplication.java b/standalone-container/src/main/java/com/yahoo/container/standalone/StandaloneContainerApplication.java
index a677c69cb79..f80cdbed900 100644
--- a/standalone-container/src/main/java/com/yahoo/container/standalone/StandaloneContainerApplication.java
+++ b/standalone-container/src/main/java/com/yahoo/container/standalone/StandaloneContainerApplication.java
@@ -67,7 +67,7 @@ public class StandaloneContainerApplication implements Application {
public static final Named APPLICATION_PATH_NAME = Names.named(APPLICATION_LOCATION_INSTALL_VARIABLE);
public static final Named CONFIG_MODEL_REPO_NAME = Names.named("ConfigModelRepo");
- private static final String DEFAULT_TMP_BASE_DIR = Defaults.getDefaults().underVespaHome("tmp");
+ private static final String DEFAULT_TMP_BASE_DIR = Defaults.getDefaults().underVespaHome("var/tmp");
private static final String TMP_DIR_NAME = "standalone_container";
private static final StaticConfigDefinitionRepo configDefinitionRepo = new StaticConfigDefinitionRepo();
diff --git a/vespa-hadoop/OWNERS b/vespa-hadoop/OWNERS
deleted file mode 100644
index 6b09ce48bd4..00000000000
--- a/vespa-hadoop/OWNERS
+++ /dev/null
@@ -1 +0,0 @@
-lesters
diff --git a/vespa-hadoop/README b/vespa-hadoop/README
deleted file mode 100644
index 1b567b88c1d..00000000000
--- a/vespa-hadoop/README
+++ /dev/null
@@ -1,4 +0,0 @@
-The Vespa Hadoop client.
-
-Contains APIs for feeding and querying Vespa from the grid.
-
diff --git a/vespa-hadoop/abi-spec.json b/vespa-hadoop/abi-spec.json
deleted file mode 100644
index 9e26dfeeb6e..00000000000
--- a/vespa-hadoop/abi-spec.json
+++ /dev/null
@@ -1 +0,0 @@
-{} \ No newline at end of file
diff --git a/vespa-hadoop/pom.xml b/vespa-hadoop/pom.xml
deleted file mode 100644
index 43f7c17967d..00000000000
--- a/vespa-hadoop/pom.xml
+++ /dev/null
@@ -1,166 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>com.yahoo.vespa</groupId>
- <artifactId>parent</artifactId>
- <version>8-SNAPSHOT</version>
- <relativePath>../parent/pom.xml</relativePath>
- </parent>
- <artifactId>vespa-hadoop</artifactId>
- <version>8-SNAPSHOT</version>
- <name>${project.artifactId}</name>
- <description>Integration tools between Vespa and Hadoop</description>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- </properties>
-
- <dependencies>
- <!-- Hadoop dependencies -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.pig</groupId>
- <artifactId>pig</artifactId>
- <classifier>h2</classifier>
- <scope>provided</scope>
- </dependency>
-
- <!-- These are inherited from parent. Needed for correct versions on Hadoop. -->
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpcore</artifactId>
- <scope>compile</scope>
- </dependency>
-
- <!-- Test dependencies -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-minicluster</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <!-- This is a HACK due to hadoop relying on mockito in NameNodeAdapter, but not providing it. Brum, brum !! -->
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter</artifactId>
- <scope>test</scope>
- </dependency>
- <!-- Vespa feeding dependencies -->
- <dependency>
- <groupId>com.yahoo.vespa</groupId>
- <artifactId>vespa-feed-client</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>com.yahoo.vespa</groupId>
- <artifactId>vespa-feed-client-api</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <!-- Jackson dependencies used in this module -->
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <minimizeJar>false</minimizeJar>
-
- <relocations>
- <relocation>
- <pattern>com.google</pattern>
- <shadedPattern>shaded.vespa</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.bouncycastle</pattern>
- <shadedPattern>shaded.vespa.bouncycastle</shadedPattern>
- </relocation>
- <relocation>
- <pattern>commons-codec</pattern>
- <shadedPattern>shaded.vespa</shadedPattern>
- </relocation>
- <relocation>
- <pattern>commons-logging</pattern>
- <shadedPattern>shaded.vespa</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.apache</pattern>
- <shadedPattern>shaded.vespa</shadedPattern>
- <excludes>
- <exclude>org.apache.hadoop.**</exclude>
- <exclude>org.apache.pig.**</exclude>
- </excludes>
- </relocation>
- <relocation>
- <pattern>com.fasterxml</pattern>
- <shadedPattern>shaded.vespa</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.codehaus</pattern>
- <shadedPattern>shaded.vespa</shadedPattern>
- </relocation>
- <relocation>
- <pattern>io.airlift</pattern>
- <shadedPattern>shaded.vespa</shadedPattern>
- </relocation>
- <relocation>
- <pattern>com.ctc.wstx</pattern>
- <shadedPattern>shaded.vespa</shadedPattern>
- </relocation>
- </relocations>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <release>${vespaClients.jdk.releaseVersion}</release>
- </configuration>
- </plugin>
- </plugins>
-
- </build>
-</project>
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputCommitter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputCommitter.java
deleted file mode 100644
index 6cb4ef45a96..00000000000
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputCommitter.java
+++ /dev/null
@@ -1,37 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hadoop.mapreduce;
-
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-
-/**
- * The output committer describes the commit task output for a Map-Reduce
- * job. Not currently used, but is part of the Hadoop protocol since 2.7.
- *
- * @author lesters
- */
-public class VespaOutputCommitter extends OutputCommitter {
- @Override
- public void setupJob(JobContext jobContext) throws IOException {
- }
-
- @Override
- public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
- }
-
- @Override
- public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
- return false;
- }
-
- @Override
- public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
- }
-
- @Override
- public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
- }
-}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java
deleted file mode 100644
index e49a5e17970..00000000000
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java
+++ /dev/null
@@ -1,60 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hadoop.mapreduce;
-
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-import java.util.Properties;
-import java.util.logging.Logger;
-
-/**
- * An output specification for writing to Vespa instances in a Map-Reduce job.
- * Mainly returns an instance of a {@link VespaRecordWriter} that does the
- * actual feeding to Vespa.
- *
- * @author lesters
- */
-@SuppressWarnings("rawtypes")
-public class VespaOutputFormat extends OutputFormat {
-
- private static final Logger log = Logger.getLogger(VespaOutputFormat.class.getName());
-
- final Properties configOverride;
-
- public VespaOutputFormat() {
- super();
- this.configOverride = null;
- }
-
- public VespaOutputFormat(Properties configOverride) {
- super();
- this.configOverride = configOverride;
- }
-
-
- @Override
- @SuppressWarnings("deprecation")
- public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException {
- VespaCounters counters = VespaCounters.get(context);
- VespaConfiguration configuration = VespaConfiguration.get(context.getConfiguration(), configOverride);
- return new VespaRecordWriter(configuration, counters);
- }
-
-
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
- return new VespaOutputCommitter();
- }
-
-
- @Override
- public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
- }
-
-}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
deleted file mode 100644
index c450d7cdeef..00000000000
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
+++ /dev/null
@@ -1,153 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hadoop.mapreduce;
-
-import ai.vespa.feed.client.FeedClient;
-import ai.vespa.feed.client.FeedClientBuilder;
-import ai.vespa.feed.client.JsonFeeder;
-import ai.vespa.feed.client.OperationParseException;
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-import java.net.URI;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import static java.util.stream.Collectors.toList;
-
-/**
- * {@link VespaRecordWriter} sends the output &lt;key, value&gt; to one or more Vespa endpoints using vespa-feed-client.
- *
- * @author bjorncs
- */
-public class VespaRecordWriter extends RecordWriter<Object, Object> {
-
- private final static Logger log = Logger.getLogger(VespaRecordWriter.class.getCanonicalName());
-
- private final VespaCounters counters;
- private final VespaConfiguration config;
-
- private boolean initialized = false;
- private JsonFeeder feeder;
-
- protected VespaRecordWriter(VespaConfiguration config, VespaCounters counters) {
- this.counters = counters;
- this.config = config;
- }
-
- @Override
- public void write(Object key, Object data) throws IOException {
- initializeOnFirstWrite();
- String json = data.toString().trim();
- feeder.feedSingle(json)
- .whenComplete((result, error) -> {
- if (error != null) {
- if (error instanceof OperationParseException) {
- counters.incrementDocumentsSkipped(1);
- } else {
- String msg = "Failed to feed single document: " + error;
- log.log(Level.WARNING, msg, error);
- counters.incrementDocumentsFailed(1);
- }
- } else {
- counters.incrementDocumentsOk(1);
- }
- });
- counters.incrementDocumentsSent(1);
- if (counters.getDocumentsSent() % config.progressInterval() == 0) {
- String progress = String.format("Feed progress: %d / %d / %d / %d (sent, ok, failed, skipped)",
- counters.getDocumentsSent(),
- counters.getDocumentsOk(),
- counters.getDocumentsFailed(),
- counters.getDocumentsSkipped());
- log.info(progress);
- }
- }
-
- @Override
- public void close(TaskAttemptContext context) throws IOException {
- if (feeder != null) {
- feeder.close();
- feeder = null;
- initialized = false;
- }
- }
-
- /** Override method to alter {@link FeedClient} configuration */
- protected void onFeedClientInitialization(FeedClientBuilder builder) {}
-
- private void initializeOnFirstWrite() {
- if (initialized) return;
- useRandomizedStartupDelayIfEnabled();
- feeder = createJsonStreamFeeder();
- initialized = true;
- }
-
- private void useRandomizedStartupDelayIfEnabled() {
- if (!config.dryrun() && config.randomStartupSleepMs() > 0) {
- int delay = ThreadLocalRandom.current().nextInt(config.randomStartupSleepMs());
- log.info("Delaying startup by " + delay + " ms");
- try {
- Thread.sleep(delay);
- } catch (Exception e) {}
- }
- }
-
-
- private JsonFeeder createJsonStreamFeeder() {
- FeedClient feedClient = createFeedClient();
- JsonFeeder.Builder builder = JsonFeeder.builder(feedClient)
- .withTimeout(Duration.ofMinutes(10));
- if (config.route() != null) {
- builder.withRoute(config.route());
- }
- return builder.build();
-
- }
-
- private FeedClient createFeedClient() {
- List<URI> endpoints = endpointUris(config);
- log.info("Using endpoints " + endpoints);
- int streamsPerConnection = streamsPerConnection(config);
- log.log(Level.INFO, "Using {0} max streams per connection", new Object[] {streamsPerConnection});
- log.log(Level.INFO, "Using {0} connections", new Object[] {config.numConnections()});
- FeedClientBuilder feedClientBuilder = FeedClientBuilder.create(endpoints)
- .setConnectionsPerEndpoint(config.numConnections())
- .setMaxStreamPerConnection(streamsPerConnection)
- .setDryrun(config.dryrun())
- .setRetryStrategy(retryStrategy(config));
- if (config.proxyHost() != null) {
- URI proxyUri = URI.create(String.format(
- "%s://%s:%d", config.proxyScheme(), config.proxyHost(), config.proxyPort()));
- log.info("Using proxy " + proxyUri);
- feedClientBuilder.setProxy(proxyUri);
- }
-
- onFeedClientInitialization(feedClientBuilder);
- return feedClientBuilder.build();
- }
-
- private static FeedClient.RetryStrategy retryStrategy(VespaConfiguration config) {
- int maxRetries = config.numRetries();
- return new FeedClient.RetryStrategy() {
- @Override public int retries() { return maxRetries; }
- };
- }
-
- private static int streamsPerConnection(VespaConfiguration config) {
- return Math.min(256, config.maxInFlightRequests() / config.numConnections());
- }
-
- private static List<URI> endpointUris(VespaConfiguration config) {
- String scheme = config.useSSL().orElse(true) ? "https" : "http";
- return Arrays.stream(config.endpoint().split(","))
- .map(hostname -> URI.create(String.format("%s://%s:%d/", scheme, hostname, config.defaultPort())))
- .collect(toList());
- }
-}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaSimpleJsonInputFormat.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaSimpleJsonInputFormat.java
deleted file mode 100644
index f9bcba96a69..00000000000
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaSimpleJsonInputFormat.java
+++ /dev/null
@@ -1,99 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hadoop.mapreduce;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonFactoryBuilder;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-import java.io.BufferedInputStream;
-import java.io.IOException;
-
-/**
- * Simple JSON reader which splits the input file along JSON object boundaries.
- *
- * There are two cases handled here:
- * 1. Each line contains a JSON object, i.e. { ... }
- * 2. The file contains an array of objects with arbitrary line breaks, i.e. [ {...}, {...} ]
- *
- * Not suitable for cases where you want to extract objects from some other arbitrary structure.
- *
- * TODO: Support config which points to a array in the JSON as start point for object extraction,
- * ala how it is done in VespaHttpClient.parseResultJson, i.e. support rootNode config.
- *
- * @author lesters
- */
-public class VespaSimpleJsonInputFormat extends FileInputFormat<Text, NullWritable> {
-
- @Override
- public RecordReader<Text, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
- return new VespaJsonRecordReader();
- }
-
- public static class VespaJsonRecordReader extends RecordReader<Text, NullWritable> {
- private long remaining;
- private JsonParser parser;
- private Text currentKey;
- private NullWritable currentValue = NullWritable.get();
-
- @Override
- public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
- FileSplit fileSplit = (FileSplit) split;
- FSDataInputStream stream = FileSystem.get(context.getConfiguration()).open(fileSplit.getPath());
- if (fileSplit.getStart() != 0) {
- stream.seek(fileSplit.getStart());
- }
-
- remaining = fileSplit.getLength();
- JsonFactory factory = new JsonFactoryBuilder().disable(JsonFactory.Feature.CANONICALIZE_FIELD_NAMES).build();
- parser = factory.createParser(new BufferedInputStream(stream));
- parser.setCodec(new ObjectMapper());
- parser.nextToken();
- if (parser.currentToken() == JsonToken.START_ARRAY) {
- parser.nextToken();
- }
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- if (parser.currentToken() != JsonToken.START_OBJECT) {
- return true;
- }
- currentKey = new Text(parser.readValueAsTree().toString());
- parser.nextToken();
- return false;
- }
-
- @Override
- public Text getCurrentKey() throws IOException, InterruptedException {
- return currentKey;
- }
-
- @Override
- public NullWritable getCurrentValue() throws IOException, InterruptedException {
- return currentValue;
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return parser.getCurrentLocation().getByteOffset() / remaining;
- }
-
- @Override
- public void close() throws IOException {
- parser.close();
- }
- }
-
-}
-
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/package-info.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/package-info.java
deleted file mode 100644
index 22a742566cd..00000000000
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/package-info.java
+++ /dev/null
@@ -1,10 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/**
- * com.yahoo.vespa.hadoop.mapreduce contains classes and utilities
- * to enable feeding directly to Vespa endpoints from mapreduce.
- * It is a minimal layer over the Vespa HTTP client.
- *
- * NOTE: This is a PUBLIC API, but not annotated as such because this is not a bundle and
- * we don't want to introduce Vespa dependencies.
- */
-package com.yahoo.vespa.hadoop.mapreduce;
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java
deleted file mode 100644
index 5147dc3496c..00000000000
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java
+++ /dev/null
@@ -1,70 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hadoop.mapreduce.util;
-
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class TupleTools {
-
- private static final Pattern pattern = Pattern.compile("<([\\w]+)>");
-
- public static Map<String, Object> tupleMap(Schema schema, Tuple tuple) throws IOException {
- Map<String, Object> tupleMap = new HashMap<>((int)Math.ceil(tuple.size() / 0.75) + 1);
- List<Schema.FieldSchema> schemas = schema.getFields();
- for (int i = 0; i < schemas.size(); i++) {
- Schema.FieldSchema field = schemas.get(i);
- String alias = field.alias;
- Object value = tuple.get(i);
- if (value != null) {
- tupleMap.put(alias, value);
- }
- }
- return tupleMap;
- }
-
- public static Map<String, Object> tupleMap(ResourceSchema schema, Tuple tuple) throws IOException {
- Map<String, Object> tupleMap = new HashMap<>((int)Math.ceil(tuple.size() / 0.75) + 1);
- ResourceSchema.ResourceFieldSchema[] schemas = schema.getFields();
- for (int i = 0; i < schemas.length; i++) {
- ResourceSchema.ResourceFieldSchema field = schemas[i];
- String alias = field.getName();
- Object value = tuple.get(i);
- if (value != null) {
- tupleMap.put(alias, value);
- }
- }
- return tupleMap;
- }
-
- public static String toString(Schema schema, Tuple tuple, String template) throws IOException {
- return toString(tupleMap(schema, tuple), template);
- }
-
- public static String toString(Map<String,Object> fields, String template) {
- if (template == null || template.length() == 0) {
- return template;
- }
- if (fields == null || fields.size() == 0) {
- return template;
- }
-
- Matcher m = pattern.matcher(template);
- StringBuffer sb = new StringBuffer();
- while (m.find()) {
- Object value = fields.get(m.group(1));
- String replacement = value != null ? value.toString() : m.group(0);
- m.appendReplacement(sb, Matcher.quoteReplacement(replacement));
- }
- m.appendTail(sb);
- return sb.toString();
- }
-
-}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java
deleted file mode 100644
index ae0b6a58155..00000000000
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java
+++ /dev/null
@@ -1,194 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hadoop.mapreduce.util;
-
-import org.apache.hadoop.conf.Configuration;
-
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.Optional;
-import java.util.Properties;
-
-public class VespaConfiguration {
-
- public static final String ENDPOINT = "vespa.feed.endpoint";
- public static final String DEFAULT_PORT = "vespa.feed.defaultport";
- public static final String USE_SSL = "vespa.feed.ssl";
- public static final String PROXY_HOST = "vespa.feed.proxy.host";
- public static final String PROXY_PORT = "vespa.feed.proxy.port";
- public static final String PROXY_SCHEME = "vespa.feed.proxy.scheme";
- public static final String DRYRUN = "vespa.feed.dryrun";
- public static final String USE_COMPRESSION = "vespa.feed.usecompression";
- public static final String PROGRESS_REPORT = "vespa.feed.progress.interval";
- public static final String CONNECTIONS = "vespa.feed.connections";
- public static final String THROTTLER_MIN_SIZE = "vespa.feed.throttler.min.size";
- public static final String QUERY_CONNECTION_TIMEOUT = "vespa.query.connection.timeout";
- public static final String ROUTE = "vespa.feed.route";
- public static final String MAX_SLEEP_TIME_MS = "vespa.feed.max.sleep.time.ms";
- public static final String MAX_IN_FLIGHT_REQUESTS = "vespa.feed.max.in.flight.requests";
- public static final String RANDOM_STARTUP_SLEEP = "vespa.feed.random.startup.sleep.ms";
- public static final String NUM_RETRIES = "vespa.feed.num.retries";
-
- private final Configuration conf;
- private final Properties override;
-
- private VespaConfiguration(Configuration conf, Properties override) {
- this.conf = conf;
- this.override = override;
- }
-
-
- public static VespaConfiguration get(Configuration conf, Properties override) {
- return new VespaConfiguration(conf, override);
- }
-
-
- public String endpoint() {
- return getString(ENDPOINT);
- }
-
-
- public int defaultPort() {
- return getInt(DEFAULT_PORT, 4080);
- }
-
-
- public Optional<Boolean> useSSL() {
- String raw = getString(USE_SSL);
- if (raw == null || raw.trim().isEmpty()) return Optional.empty();
- return Optional.of(Boolean.parseBoolean(raw));
- }
-
-
- public String proxyHost() {
- return getString(PROXY_HOST);
- }
-
-
- public int proxyPort() {
- return getInt(PROXY_PORT, 4080);
- }
-
-
- public String proxyScheme() {
- String raw = getString(PROXY_SCHEME);
- if (raw == null) return "http";
- return raw;
- }
-
-
- public boolean dryrun() {
- return getBoolean(DRYRUN, false);
- }
-
-
- public boolean useCompression() {
- return getBoolean(USE_COMPRESSION, true);
- }
-
-
- public int numConnections() {
- return getInt(CONNECTIONS, 1);
- }
-
-
- public int throttlerMinSize() {
- return getInt(THROTTLER_MIN_SIZE, 0);
- }
-
-
- public int queryConnectionTimeout() {
- return getInt(QUERY_CONNECTION_TIMEOUT, 10000);
- }
-
-
- public String route() {
- return getString(ROUTE);
- }
-
-
- public int maxSleepTimeMs() {
- return getInt(MAX_SLEEP_TIME_MS, 10000);
- }
-
-
- public int maxInFlightRequests() {
- return getInt(MAX_IN_FLIGHT_REQUESTS, 500);
- }
-
-
- public int randomStartupSleepMs() {
- return getInt(RANDOM_STARTUP_SLEEP, 30000);
- }
-
-
- public int numRetries() {
- return getInt(NUM_RETRIES, 100);
- }
-
-
- public int progressInterval() {
- return getInt(PROGRESS_REPORT, 1000);
- }
-
- public String getString(String name) {
- if (override != null && override.containsKey(name)) {
- return override.getProperty(name);
- }
- return conf != null ? conf.get(name) : null;
- }
-
-
- public int getInt(String name, int defaultValue) {
- if (override != null && override.containsKey(name)) {
- return Integer.parseInt(override.getProperty(name));
- }
- return conf != null ? conf.getInt(name, defaultValue) : defaultValue;
- }
-
-
- public boolean getBoolean(String name, boolean defaultValue) {
- if (override != null && override.containsKey(name)) {
- return Boolean.parseBoolean(override.getProperty(name));
- }
- return conf != null ? conf.getBoolean(name, defaultValue) : defaultValue;
-
- }
-
- public static Properties loadProperties(String... params) {
- Properties properties = new Properties();
- if (params != null) {
- for (String s : params) {
- try {
- properties.load(new StringReader(s));
- } catch (IOException e) {
- throw new IllegalArgumentException(e);
- }
- }
- }
- return properties;
- }
-
-
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append(ENDPOINT + ": " + endpoint() + "\n");
- sb.append(DEFAULT_PORT + ": " + defaultPort() + "\n");
- sb.append(USE_SSL + ": " + useSSL().map(Object::toString).orElse("<empty>") + "\n");
- sb.append(PROXY_HOST + ": " + proxyHost() + "\n");
- sb.append(PROXY_PORT + ": " + proxyPort() + "\n");
- sb.append(PROXY_SCHEME + ": " + proxyScheme() + "\n");
- sb.append(DRYRUN + ": " + dryrun() +"\n");
- sb.append(USE_COMPRESSION + ": " + useCompression() +"\n");
- sb.append(PROGRESS_REPORT + ": " + progressInterval() +"\n");
- sb.append(CONNECTIONS + ": " + numConnections() +"\n");
- sb.append(THROTTLER_MIN_SIZE + ": " + throttlerMinSize() +"\n");
- sb.append(QUERY_CONNECTION_TIMEOUT + ": " + queryConnectionTimeout() +"\n");
- sb.append(ROUTE + ": " + route() +"\n");
- sb.append(MAX_SLEEP_TIME_MS + ": " + maxSleepTimeMs() +"\n");
- sb.append(MAX_IN_FLIGHT_REQUESTS + ": " + maxInFlightRequests() +"\n");
- sb.append(RANDOM_STARTUP_SLEEP + ": " + randomStartupSleepMs() +"\n");
- sb.append(NUM_RETRIES + ": " + numRetries() +"\n");
- return sb.toString();
- }
-
-}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaCounters.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaCounters.java
deleted file mode 100644
index 63b4b6600fd..00000000000
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaCounters.java
+++ /dev/null
@@ -1,105 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hadoop.mapreduce.util;
-
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-
-public class VespaCounters {
-
- public static final String GROUP = "Vespa Feed Counters";
- public static final String DOCS_OK = "Documents ok";
- public static final String DOCS_SENT = "Documents sent";
- public static final String DOCS_FAILED = "Documents failed";
- public static final String DOCS_SKIPPED = "Documents skipped";
-
- private final Counter documentsSent;
- private final Counter documentsOk;
- private final Counter documentsFailed;
- private final Counter documentsSkipped;
-
-
- private VespaCounters(Job job) throws IOException {
- Counters counters = job.getCounters();
- documentsSent = counters.findCounter(GROUP, DOCS_SENT);
- documentsOk = counters.findCounter(GROUP, DOCS_OK);
- documentsFailed = counters.findCounter(GROUP, DOCS_FAILED);
- documentsSkipped = counters.findCounter(GROUP, DOCS_SKIPPED);
- }
-
-
- private VespaCounters(TaskAttemptContext context) {
- documentsSent = context.getCounter(GROUP, DOCS_SENT);
- documentsOk = context.getCounter(GROUP, DOCS_OK);
- documentsFailed = context.getCounter(GROUP, DOCS_FAILED);
- documentsSkipped = context.getCounter(GROUP, DOCS_SKIPPED);
- }
-
-
- private VespaCounters(org.apache.hadoop.mapred.Counters counters) {
- documentsSent = counters.findCounter(GROUP, DOCS_SENT);
- documentsOk = counters.findCounter(GROUP, DOCS_OK);
- documentsFailed = counters.findCounter(GROUP, DOCS_FAILED);
- documentsSkipped = counters.findCounter(GROUP, DOCS_SKIPPED);
- }
-
-
- public static VespaCounters get(Job job) throws IOException {
- return new VespaCounters(job);
- }
-
-
- public static VespaCounters get(TaskAttemptContext context) {
- return new VespaCounters(context);
- }
-
-
- public static VespaCounters get(org.apache.hadoop.mapred.Counters counters) {
- return new VespaCounters(counters);
-
- }
-
-
- public long getDocumentsSent() {
- return documentsSent.getValue();
- }
-
-
- public void incrementDocumentsSent(long incr) {
- documentsSent.increment(incr);
- }
-
-
- public long getDocumentsOk() {
- return documentsOk.getValue();
- }
-
-
- public void incrementDocumentsOk(long incr) {
- documentsOk.increment(incr);
- }
-
-
- public long getDocumentsFailed() {
- return documentsFailed.getValue();
- }
-
-
- public void incrementDocumentsFailed(long incr) {
- documentsFailed.increment(incr);
- }
-
-
- public long getDocumentsSkipped() {
- return documentsSkipped.getValue();
- }
-
-
- public void incrementDocumentsSkipped(long incr) {
- documentsSkipped.increment(incr);
- }
-
-}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java
deleted file mode 100644
index c7ed52a01c0..00000000000
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java
+++ /dev/null
@@ -1,102 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hadoop.mapreduce.util;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpHost;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.util.EntityUtils;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Scanner;
-
-public class VespaHttpClient {
-
- private final HttpClient httpClient;
-
- public VespaHttpClient() {
- this(null);
- }
-
- public VespaHttpClient(VespaConfiguration configuration) {
- httpClient = createClient(configuration);
- }
-
- public String get(String url) throws IOException {
- HttpGet httpGet = new HttpGet(url);
- HttpResponse httpResponse = httpClient.execute(httpGet);
-
- HttpEntity entity = httpResponse.getEntity();
- InputStream is = entity.getContent();
-
- String result = "";
- Scanner scanner = new Scanner(is, "UTF-8").useDelimiter("\\A");
- if (scanner.hasNext()) {
- result = scanner.next();
- }
- EntityUtils.consume(entity);
-
- if (httpResponse.getStatusLine().getStatusCode() != 200) {
- return null;
- }
-
- return result;
- }
-
- public JsonNode parseResultJson(String json, String rootNode) throws IOException {
- if (json == null || json.isEmpty()) {
- return null;
- }
- if (rootNode == null || rootNode.isEmpty()) {
- return null;
- }
-
- ObjectMapper m = new ObjectMapper();
- JsonNode node = m.readTree(json);
- if (node != null) {
- String[] path = rootNode.split("/");
- for (String p : path) {
- node = node.get(p);
-
- if (node == null) {
- return null;
- }
-
- // if node is an array, return the first node that has the correct path
- if (node.isArray()) {
- for (int i = 0; i < node.size(); ++i) {
- JsonNode n = node.get(i);
- if (n.has(p)) {
- node = n;
- break;
- }
- }
- }
-
- }
- }
- return node;
- }
-
- private HttpClient createClient(VespaConfiguration configuration) {
- HttpClientBuilder clientBuilder = HttpClientBuilder.create();
-
- RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
- if (configuration != null) {
- requestConfigBuilder.setSocketTimeout(configuration.queryConnectionTimeout());
- requestConfigBuilder.setConnectTimeout(configuration.queryConnectionTimeout());
- if (configuration.proxyHost() != null) {
- requestConfigBuilder.setProxy(new HttpHost(configuration.proxyHost(), configuration.proxyPort()));
- }
- }
- clientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());
- return clientBuilder.build();
- }
-
-}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaQuerySchema.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaQuerySchema.java
deleted file mode 100644
index cfaff44addb..00000000000
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaQuerySchema.java
+++ /dev/null
@@ -1,114 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hadoop.mapreduce.util;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.util.Utils;
-import org.apache.pig.parser.ParserException;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class VespaQuerySchema implements Iterable<VespaQuerySchema.AliasTypePair> {
-
- private final List<AliasTypePair> tupleSchema = new ArrayList<>();
-
- public VespaQuerySchema(String schema) {
- for (String e : schema.split(",")) {
- String[] pair = e.split(":");
- String alias = pair[0].trim();
- String type = pair[1].trim();
- tupleSchema.add(new AliasTypePair(alias, type));
- }
- }
-
- public Tuple buildTuple(int rank, JsonNode hit) {
- Tuple tuple = TupleFactory.getInstance().newTuple();
-
- for (VespaQuerySchema.AliasTypePair tupleElement : tupleSchema) {
- String alias = tupleElement.getAlias();
- Byte type = DataType.findTypeByName(tupleElement.getType());
-
- // reserved word
- if ("rank".equals(alias)) {
- tuple.append(rank);
- } else {
- JsonNode field = hit;
- String[] path = alias.split("/"); // move outside
- for (String p : path) {
- field = field.get(p);
- if (field == null) {
- type = DataType.NULL; // effectively skip field as it is not found
- break;
- }
- }
- switch (type) {
- case DataType.BOOLEAN:
- tuple.append(field.asBoolean());
- break;
- case DataType.INTEGER:
- tuple.append(field.asInt());
- break;
- case DataType.LONG:
- tuple.append(field.asLong());
- break;
- case DataType.FLOAT:
- case DataType.DOUBLE:
- tuple.append(field.asDouble());
- break;
- case DataType.DATETIME:
- tuple.append(field.asText());
- break;
- case DataType.CHARARRAY:
- tuple.append(field.asText());
- break;
- default:
- // the rest of the data types are currently not supported
- }
- }
- }
- return tuple;
- }
-
- public static Schema getPigSchema(String schemaString) {
- Schema schema = null;
- schemaString = schemaString.replace("/", "_");
- schemaString = "{(" + schemaString + ")}";
- try {
- schema = Utils.getSchemaFromString(schemaString);
- } catch (ParserException e) {
- e.printStackTrace();
- }
- return schema;
- }
-
- @Override
- public Iterator<AliasTypePair> iterator() {
- return tupleSchema.iterator();
- }
-
-
- public static class AliasTypePair {
- private final String alias;
- private final String type;
-
- AliasTypePair(String alias, String type) {
- this.alias = alias;
- this.type = type;
- }
-
- public String getAlias() {
- return alias;
- }
-
- public String getType() {
- return type;
- }
-
- }
-
-}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/package-info.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/package-info.java
deleted file mode 100644
index 41c621d2877..00000000000
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/package-info.java
+++ /dev/null
@@ -1,10 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/**
- * com.yahoo.vespa.hadoop contains classes and utilities
- * to enable feeding directly to Vespa endpoints from pig and mapreduce.
- * It is a minimal layer over the Vespa HTTP client.
- *
- * NOTE: This is a PUBLIC API, but not annotated as such because this is not a bundle and
- * we don't want to introduce Vespa dependencies.
- */
-package com.yahoo.vespa.hadoop;
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
deleted file mode 100644
index c95aa02215f..00000000000
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
+++ /dev/null
@@ -1,669 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hadoop.pig;
-
-import com.fasterxml.jackson.core.JsonEncoding;
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.PigWarning;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.tools.pigstats.PigStatusReporter;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.joda.time.DateTime;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.io.UncheckedIOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.*;
-
-/**
- * A Pig UDF to convert simple Pig types into a valid Vespa JSON document format.
- *
- * @author lesters
- */
-public class VespaDocumentOperation extends EvalFunc<String> {
-
- public enum Operation {
- DOCUMENT,
- PUT,
- ID,
- REMOVE,
- UPDATE;
-
- @Override
- public String toString() {
- return super.toString().toLowerCase();
- }
-
- public static Operation fromString(String text) {
- for (Operation op : Operation.values()) {
- if (op.toString().equalsIgnoreCase(text)) {
- return op;
- }
- }
- throw new IllegalArgumentException("Unknown operation: " + text);
- }
-
- public static boolean valid(String text) {
- for (Operation op : Operation.values()) {
- if (op.toString().equalsIgnoreCase(text)) {
- return true;
- }
- }
- return false;
- }
-
- }
-
- private static final String PROPERTY_CREATE_IF_NON_EXISTENT = "create-if-non-existent";
- private static final String PROPERTY_ID_TEMPLATE = "docid";
- private static final String PROPERTY_OPERATION = "operation";
- private static final String PROPERTY_VERBOSE = "verbose";
- private static final String BAG_AS_MAP_FIELDS = "bag-as-map-fields";
- private static final String SIMPLE_ARRAY_FIELDS = "simple-array-fields";
- private static final String SIMPLE_OBJECT_FIELDS = "simple-object-fields";
- private static final String CREATE_TENSOR_FIELDS = "create-tensor-fields";
- private static final String REMOVE_TENSOR_FIELDS = "remove-tensor-fields";
- private static final String UPDATE_TENSOR_FIELDS = "update-tensor-fields";
- private static final String REMOVE_MAP_FIELDS = "remove-map-fields";
- private static final String UPDATE_MAP_FIELDS = "update-map-fields";
- private static final String EXCLUDE_FIELDS = "exclude-fields";
- private static final String TESTSET_CONDITION = "condition";
- private static final String PARTIAL_UPDATE_ASSIGN = "assign";
- private static final String PARTIAL_UPDATE_ADD = "add";
- private static final String PARTIAL_UPDATE_REMOVE = "remove";
-
- private static Map<String, String> mapPartialOperationMap;
-
- static {
- mapPartialOperationMap = new HashMap<>();
- mapPartialOperationMap.put(REMOVE_MAP_FIELDS, PARTIAL_UPDATE_REMOVE);
- mapPartialOperationMap.put(UPDATE_MAP_FIELDS, PARTIAL_UPDATE_ASSIGN);
- }
-
- private static Map<String, String> partialOperationMap;
-
- static {
- partialOperationMap = new HashMap<>();
- partialOperationMap.put(REMOVE_TENSOR_FIELDS, PARTIAL_UPDATE_REMOVE);
- partialOperationMap.put(UPDATE_TENSOR_FIELDS, PARTIAL_UPDATE_ADD);
- partialOperationMap.put(REMOVE_MAP_FIELDS, PARTIAL_UPDATE_REMOVE);
- partialOperationMap.put(UPDATE_MAP_FIELDS, PARTIAL_UPDATE_ASSIGN);
- }
-
- private final boolean verbose;
- private final String template;
- private final Operation operation;
- private final Properties properties;
- private PigStatusReporter statusReporter;
-
- public VespaDocumentOperation(String... params) {
- statusReporter = PigStatusReporter.getInstance();
- if (statusReporter != null) {
- statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation ok", 0);
- statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 0);
- }
- properties = VespaConfiguration.loadProperties(params);
- template = properties.getProperty(PROPERTY_ID_TEMPLATE);
- operation = Operation.fromString(properties.getProperty(PROPERTY_OPERATION, "put"));
- verbose = Boolean.parseBoolean(properties.getProperty(PROPERTY_VERBOSE, "false"));
- }
-
- @Override
- public String exec(Tuple tuple) throws IOException {
- if (tuple == null || tuple.size() == 0) {
- if (statusReporter != null) {
- statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 1);
- }
- return null;
- }
- if (template == null || template.length() == 0) {
- if (statusReporter != null) {
- statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 1);
- }
- warnLog("No valid document id template found. Skipping.", PigWarning.UDF_WARNING_1);
- return null;
- }
- if (operation == null) {
- warnLog("No valid operation found. Skipping.", PigWarning.UDF_WARNING_2);
- return null;
- }
-
- String json = null;
-
- try {
- if (reporter != null) {
- reporter.progress();
- }
-
- Schema inputSchema = getInputSchema();
- Map<String, Object> fields = TupleTools.tupleMap(inputSchema, tuple);
- String docId = TupleTools.toString(fields, template);
- if (verbose) {
- System.out.println("Processing docId: "+ docId);
- }
- // create json
- json = create(operation, docId, fields, properties, inputSchema);
- if (json == null || json.length() == 0) {
- warnLog("No valid document operation could be created.", PigWarning.UDF_WARNING_3);
- return null;
- }
-
-
- } catch (Exception e) {
- if (statusReporter != null) {
- statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 1);
- }
- StringBuilder sb = new StringBuilder();
- sb.append("Caught exception processing input row: \n");
- sb.append(tuple.toString());
- sb.append("\nException: ");
- sb.append(getStackTraceAsString(e));
- warnLog(sb.toString(), PigWarning.UDF_WARNING_4);
- return null;
- }
- if (statusReporter != null) {
- statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation ok", 1);
- }
- return json;
- }
-
-
- /**
- * Create a JSON Vespa document operation given the supplied fields,
- * operation and document id template.
- *
- * @param op Operation (put, remove, update)
- * @param docId Document id
- * @param fields Fields to put in document operation
- * @return A valid JSON Vespa document operation
- * @throws IOException ...
- */
- public static String create(Operation op, String docId, Map<String, Object> fields, Properties properties,
- Schema schema) throws IOException {
- if (op == null) {
- return null;
- }
- if (docId == null || docId.length() == 0) {
- return null;
- }
- if (fields.isEmpty()) {
- return null;
- }
-
- // create json format
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8);
- g.writeStartObject();
-
- g.writeStringField(op.toString(), docId);
-
- boolean createIfNonExistent = Boolean.parseBoolean(properties.getProperty(PROPERTY_CREATE_IF_NON_EXISTENT, "false"));
- if (op == Operation.UPDATE && createIfNonExistent) {
- writeField("create", true, DataType.BOOLEAN, g, properties, schema, op, 0);
- }
- String testSetConditionTemplate = properties.getProperty(TESTSET_CONDITION);
- if (testSetConditionTemplate != null) {
- String testSetCondition = TupleTools.toString(fields, testSetConditionTemplate);
- writeField(TESTSET_CONDITION, testSetCondition, DataType.CHARARRAY, g, properties, schema, op, 0);
- }
- if (op != Operation.REMOVE) {
- writeField("fields", fields, DataType.MAP, g, properties, schema, op, 0);
- }
-
- g.writeEndObject();
- g.close();
-
- return out.toString();
- }
-
- private static String getPartialOperation(Map<String, String> operationMap, String name, Properties properties) {
- // This function checks if the property of the name falls into the map provided
- // if yes, return the desired operation. if no, return null
- // for example, input:
- // operationMap map{"update-map-fields":"assign","remove-map-fields":"remove"}
- // name date
- // properties "update-map-fields":"date,month"
- // output: assign
- for (String label : operationMap.keySet()) {
- if (properties.getProperty(label) != null) {
- String[] p = properties.getProperty(label).split(",");
- if (Arrays.asList(p).contains(name)) {
- return operationMap.get(label);
- }
- }
- }
- return null;
- }
-
- @SuppressWarnings("unchecked")
- private static void writeField(String name, Object value, Byte type, JsonGenerator g, Properties properties, Schema schema, Operation op, int depth) throws IOException {
- if (shouldWriteField(name, properties, depth)) {
- String operation = getPartialOperation(mapPartialOperationMap, name, properties);
- // check if the name has the property update-map-fields/remove-map-fields
- // if yes, we need special treatments here as we need to loop through the tuple
- // be aware the the operation here is not vespa operation such as "put" and "update"
- // operation here are the field name we wish use to such as "assign" and "remove"
- if (operation != null) {
- writePartialUpdateAndRemoveMap(name, value, g, properties, schema, op, depth, operation);
- } else {
- g.writeFieldName(name);
- if (shouldWritePartialUpdate(op, depth)) {
- writePartialUpdate(value, type, g, name, properties, schema, op, depth);
- } else {
- writeValue(value, type, g, name, properties, schema, op, depth);
- }
- }
-
- }
- }
-
- private static void writePartialUpdateAndRemoveMap(String name, Object value, JsonGenerator g, Properties properties, Schema schema, Operation op, int depth, String operation) throws IOException {
- schema = (schema != null) ? schema.getField(0).schema : null;
- // extract the key of map and keys in map for writing json when partial updating maps
- Schema valueSchema = (schema != null) ? schema.getField(1).schema : null;
- // data format { ( key; id, value: (abc,123,(123234,bbaa))) }
- // the first element of each tuple in the bag will be the map to update
- // the second element of each tuple in the bag will be the new value of the map
- DataBag bag = (DataBag) value;
- for (Tuple element : bag) {
- if (element.size() != 2) {
- continue;
- }
- String k = (String) element.get(0);
- Object v = element.get(1);
- Byte t = DataType.findType(v);
- if (t == DataType.TUPLE) {
- g.writeFieldName(name + "{" + k + "}");
- if (operation.equals(PARTIAL_UPDATE_REMOVE)) {
- g.writeStartObject();
- g.writeFieldName(PARTIAL_UPDATE_REMOVE);
- g.writeNumber(0);
- g.writeEndObject();
- } else {
- writePartialUpdate(v, t, g, name, properties, valueSchema, op, depth);
- }
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- private static void writeValue(Object value, Byte type, JsonGenerator g, String name, Properties properties, Schema schema, Operation op, int depth) throws IOException {
- switch (type) {
- case DataType.UNKNOWN:
- break;
- case DataType.NULL:
- g.writeNull();
- break;
- case DataType.BOOLEAN:
- g.writeBoolean((boolean) value);
- break;
- case DataType.INTEGER:
- g.writeNumber((int) value);
- break;
- case DataType.LONG:
- g.writeNumber((long) value);
- break;
- case DataType.FLOAT:
- g.writeNumber((float) value);
- break;
- case DataType.DOUBLE:
- g.writeNumber((double) value);
- break;
- case DataType.DATETIME:
- g.writeNumber(((DateTime) value).getMillis());
- break;
- case DataType.BYTEARRAY:
- DataByteArray bytes = (DataByteArray) value;
- String raw = Base64.getEncoder().encodeToString(bytes.get());
- g.writeString(raw);
- break;
- case DataType.CHARARRAY:
- g.writeString((String) value);
- break;
- case DataType.BIGINTEGER:
- g.writeNumber((BigInteger) value);
- break;
- case DataType.BIGDECIMAL:
- g.writeNumber((BigDecimal) value);
- break;
- case DataType.MAP:
- g.writeStartObject();
- Map<Object, Object> map = (Map<Object, Object>) value;
- if (shouldCreateTensor(map, name, properties)) {
- if (isRemoveTensor(name, properties)) {
- writeRemoveTensor(map, g);
- } else {
- writeTensor(map, g);
- }
- } else {
- for (Map.Entry<Object, Object> entry : map.entrySet()) {
- String k = entry.getKey().toString();
- Object v = entry.getValue();
- Byte t = DataType.findType(v);
- Schema fieldSchema = (schema != null) ? schema.getField(k).schema : null;
- writeField(k, v, t, g, properties, fieldSchema, op, depth + 1);
- }
- }
- g.writeEndObject();
- break;
- case DataType.TUPLE:
- Tuple tuple = (Tuple) value;
- if (shouldWriteTupleAsMap(name, properties)) {
- Map<String, Object> fields = TupleTools.tupleMap(schema, tuple);
- writeValue(fields, DataType.MAP, g, name, properties, schema, op, depth);
- } else {
- boolean writeStartArray = shouldWriteTupleStart(tuple, name, properties);
- if (writeStartArray) {
- g.writeStartArray();
- }
- for (Object v : tuple) {
- writeValue(v, DataType.findType(v), g, name, properties, schema, op, depth);
- }
- if (writeStartArray) {
- g.writeEndArray();
- }
- }
- break;
- case DataType.BAG:
- DataBag bag = (DataBag) value;
- // get the schema of the tuple in bag
- schema = (schema != null) ? schema.getField(0).schema : null;
- if (shouldWriteBagAsMap(name, properties)) {
- // when treating bag as map, the schema of bag should be {(key, val)....}
- // the size of tuple in bag should be 2. 1st one is key. 2nd one is val.
- Schema valueSchema = (schema != null) ? schema.getField(1).schema : null;
-
- g.writeStartObject();
- for (Tuple element : bag) {
- if (element.size() != 2) {
- continue;
- }
- String k = (String) element.get(0);
- Object v = element.get(1);
- Byte t = DataType.findType(v);
- if (t == DataType.TUPLE) {
- Map<String, Object> fields = TupleTools.tupleMap(valueSchema, (Tuple) v);
- writeField(k, fields, DataType.MAP, g, properties, valueSchema, op, depth + 1);
- } else {
- writeField(k, v, t, g, properties, valueSchema, op, depth + 1);
- }
- }
- g.writeEndObject();
- } else {
- g.writeStartArray();
- for (Tuple t : bag) {
- writeValue(t, DataType.TUPLE, g, name, properties, schema, op, depth);
- }
- g.writeEndArray();
- }
- break;
- }
-
- }
-
- private static boolean shouldWritePartialUpdate(Operation op, int depth) {
- return op == Operation.UPDATE && depth == 1;
- }
-
- private static void writePartialUpdate(Object value, Byte type, JsonGenerator g, String name, Properties properties, Schema schema, Operation op, int depth) throws IOException {
- g.writeStartObject();
- // here we check if the operation falls into the four partial operations we do on map/tensor structure
- // if no, we assume it's a update on the whole document and we write assign here
- // if yes, we write the desired operation here
- String operation = getPartialOperation(partialOperationMap, name, properties);
- if (operation != null) {
- g.writeFieldName(operation);
- } else {
- g.writeFieldName(PARTIAL_UPDATE_ASSIGN);
- }
- writeValue(value, type, g, name, properties, schema, op, depth);
- g.writeEndObject();
- }
-
- private static boolean shouldWriteTupleStart(Tuple tuple, String name, Properties properties) {
- if (tuple.size() > 1 || properties == null) {
- return true;
- }
- String simpleArrayFields = properties.getProperty(SIMPLE_ARRAY_FIELDS);
- if (simpleArrayFields == null) {
- return true;
- }
- if (simpleArrayFields.equals("*")) {
- return false;
- }
- String[] fields = simpleArrayFields.split(",");
- for (String field : fields) {
- if (field.trim().equalsIgnoreCase(name)) {
- return false;
- }
- }
- return true;
- }
-
- private static boolean shouldWriteTupleAsMap(String name, Properties properties) {
- // include UPDATE_MAP_FIELDS here because when updating the map
- // the second element in each tuple should be written as a map
- if (properties == null) {
- return false;
- }
- String addBagAsMapFields = properties.getProperty(UPDATE_MAP_FIELDS);
- String simpleObjectFields = properties.getProperty(SIMPLE_OBJECT_FIELDS);
- if (simpleObjectFields == null && addBagAsMapFields == null) {
- return false;
- }
- if (addBagAsMapFields != null) {
- if (addBagAsMapFields.equals("*")) {
- return true;
- }
- String[] fields = addBagAsMapFields.split(",");
- for (String field : fields) {
- if (field.trim().equalsIgnoreCase(name)) {
- return true;
- }
- }
-
- }
- if (simpleObjectFields != null) {
- if (simpleObjectFields.equals("*")) {
- return true;
- }
- String[] fields = simpleObjectFields.split(",");
- for (String field : fields) {
- if (field.trim().equalsIgnoreCase(name)) {
- return true;
- }
- }
- }
- return false;
- }
-
- private static boolean shouldWriteBagAsMap(String name, Properties properties) {
- if (properties == null) {
- return false;
- }
- String bagAsMapFields = properties.getProperty(BAG_AS_MAP_FIELDS);
- if (bagAsMapFields == null) {
- return false;
- }
- if (bagAsMapFields.equals("*")) {
- return true;
- }
- String[] fields = bagAsMapFields.split(",");
- for (String field : fields) {
- if (field.trim().equalsIgnoreCase(name)) {
- return true;
- }
- }
- return false;
- }
-
- private static boolean shouldCreateTensor(Map<Object, Object> map, String name, Properties properties) {
- if (properties == null) {
- return false;
- }
- String createTensorFields = properties.getProperty(CREATE_TENSOR_FIELDS);
- String addTensorFields = properties.getProperty(UPDATE_TENSOR_FIELDS);
- String removeTensorFields = properties.getProperty(REMOVE_TENSOR_FIELDS);
-
- if (createTensorFields == null && addTensorFields == null && removeTensorFields == null) {
- return false;
- }
- String[] fields;
- if (createTensorFields != null) {
- fields = createTensorFields.split(",");
- for (String field : fields) {
- if (field.trim().equalsIgnoreCase(name)) {
- return true;
- }
- }
- }
- if (addTensorFields != null) {
- fields = addTensorFields.split(",");
- for (String field : fields) {
- if (field.trim().equalsIgnoreCase(name)) {
- return true;
- }
- }
- }
- if (removeTensorFields != null) {
- fields = removeTensorFields.split(",");
- for (String field : fields) {
- if (field.trim().equalsIgnoreCase(name)) {
- return true;
- }
- }
- }
- return false;
- }
-
- private static boolean isRemoveTensor(String name, Properties properties) {
- if (properties == null) {
- return false;
- }
- String removeTensorFields = properties.getProperty(REMOVE_TENSOR_FIELDS);
- if (removeTensorFields == null) {
- return false;
- }
- String[] fields = removeTensorFields.split(",");
- for (String field : fields) {
- if (field.trim().equalsIgnoreCase(name)) {
- return true;
- }
- }
- return false;
- }
-
- private static boolean shouldWriteField(String name, Properties properties, int depth) {
- if (properties == null || depth != 1) {
- return true;
- }
- String excludeFields = properties.getProperty(EXCLUDE_FIELDS);
- if (excludeFields == null) {
- return true;
- }
- String[] fields = excludeFields.split(",");
- for (String field : fields) {
- if (field.trim().equalsIgnoreCase(name)) {
- return false;
- }
- }
- return true;
- }
-
- private static void writeTensor(Map<Object, Object> map, JsonGenerator g) throws IOException {
- g.writeFieldName("cells");
- g.writeStartArray();
- for (Map.Entry<Object, Object> entry : map.entrySet()) {
- String k = entry.getKey().toString();
- Double v = Double.parseDouble(entry.getValue().toString());
-
- g.writeStartObject();
-
- // Write address
- g.writeFieldName("address");
- g.writeStartObject();
-
- String[] dimensions = k.split(",");
- for (String dimension : dimensions) {
- if (dimension == null || dimension.isEmpty()) {
- continue;
- }
- String[] address = dimension.split(":");
- if (address.length != 2) {
- throw new IllegalArgumentException("Malformed cell address: " + dimension);
- }
- String dim = address[0];
- String label = address[1];
- if (dim == null || label == null || dim.isEmpty() || label.isEmpty()) {
- throw new IllegalArgumentException("Malformed cell address: " + dimension);
- }
- g.writeFieldName(dim.trim());
- g.writeString(label.trim());
- }
- g.writeEndObject();
-
- // Write value
- g.writeFieldName("value");
- g.writeNumber(v);
-
- g.writeEndObject();
- }
- g.writeEndArray();
- }
-
- private static void writeRemoveTensor(Map<Object, Object> map, JsonGenerator g) throws IOException {
- g.writeFieldName("addresses");
- g.writeStartArray();
- for (Map.Entry<Object, Object> entry : map.entrySet()) {
- String k = entry.getKey().toString();
- String[] dimensions = k.split(",");
- for (String dimension : dimensions) {
- g.writeStartObject();
- if (dimension == null || dimension.isEmpty()) {
- continue;
- }
- String[] address = dimension.split(":");
- if (address.length != 2) {
- throw new IllegalArgumentException("Malformed cell address: " + dimension);
- }
- String dim = address[0];
- String label = address[1];
- if (dim == null || label == null || dim.isEmpty() || label.isEmpty()) {
- throw new IllegalArgumentException("Malformed cell address: " + dimension);
- }
- g.writeFieldName(dim.trim());
- g.writeString(label.trim());
- g.writeEndObject();
- // Write address
- }
- }
- g.writeEndArray();
- }
-
- // copied from vespajlib for reducing dependency and building with JDK 8
- private static String getStackTraceAsString(Throwable throwable) {
- try (StringWriter stringWriter = new StringWriter();
- PrintWriter printWriter = new PrintWriter(stringWriter, true)) {
- throwable.printStackTrace(printWriter);
- return stringWriter.getBuffer().toString();
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
-
- // wrapper to emit logs
- private void warnLog(String msg, PigWarning warning) {
- warn(msg, warning);
- System.err.println(msg);
- }
-}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java
deleted file mode 100644
index 1d50f2909a2..00000000000
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java
+++ /dev/null
@@ -1,114 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hadoop.pig;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaHttpClient;
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaQuerySchema;
-import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.data.*;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.util.UDFContext;
-
-import java.io.IOException;
-import java.util.*;
-
-/**
- * A Pig UDF to run a query against a Vespa cluster and return the
- * results.
- *
- * @author lesters
- */
-public class VespaQuery extends EvalFunc<DataBag> {
-
- private final String PROPERTY_QUERY_TEMPLATE = "query";
- private final String PROPERTY_QUERY_SCHEMA = "schema";
- private final String PROPERTY_ROOT_NODE = "rootnode";
-
- private final VespaConfiguration configuration;
- private final Properties properties;
- private final String queryTemplate;
- private final String querySchema;
- private final String queryRootNode;
-
- private VespaHttpClient httpClient;
-
- public VespaQuery() {
- this(new String[0]);
- }
-
- public VespaQuery(String... params) {
- configuration = VespaConfiguration.get(UDFContext.getUDFContext().getJobConf(), null);
- properties = VespaConfiguration.loadProperties(params);
-
- queryTemplate = properties.getProperty(PROPERTY_QUERY_TEMPLATE);
- if (queryTemplate == null || queryTemplate.isEmpty()) {
- throw new IllegalArgumentException("Query template cannot be empty");
- }
-
- querySchema = properties.getProperty(PROPERTY_QUERY_SCHEMA, "rank:int,id:chararray");
- queryRootNode = properties.getProperty(PROPERTY_ROOT_NODE, "root/children");
- }
-
- @Override
- public DataBag exec(Tuple input) throws IOException {
- if (input == null || input.size() == 0) {
- return null;
- }
- JsonNode jsonResult = queryVespa(input);
- if (jsonResult == null) {
- return null;
- }
- return createPigRepresentation(jsonResult);
- }
-
- @Override
- public Schema outputSchema(Schema input) {
- return VespaQuerySchema.getPigSchema(querySchema);
- }
-
-
- private JsonNode queryVespa(Tuple input) throws IOException {
- String url = createVespaQueryUrl(input);
- if (url == null) {
- return null;
- }
- String result = executeVespaQuery(url);
- return parseVespaResultJson(result);
- }
-
-
- private String createVespaQueryUrl(Tuple input) throws IOException {
- return TupleTools.toString(getInputSchema(), input, queryTemplate);
- }
-
-
- private String executeVespaQuery(String url) throws IOException {
- if (httpClient == null) {
- httpClient = new VespaHttpClient(configuration);
- }
- return httpClient.get(url);
- }
-
- private JsonNode parseVespaResultJson(String result) throws IOException {
- return httpClient == null ? null : httpClient.parseResultJson(result, queryRootNode);
- }
-
- private DataBag createPigRepresentation(JsonNode hits) {
- DataBag bag = new SortedDataBag(null);
- VespaQuerySchema querySchema = new VespaQuerySchema(this.querySchema);
-
- for (int rank = 0; rank < hits.size(); ++rank) {
- JsonNode hit = hits.get(rank);
- Tuple tuple = querySchema.buildTuple(rank, hit);
- bag.add(tuple);
- }
-
- return bag;
- }
-
-
-
-
-}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaSimpleJsonLoader.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaSimpleJsonLoader.java
deleted file mode 100644
index 9dc294ce243..00000000000
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaSimpleJsonLoader.java
+++ /dev/null
@@ -1,63 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hadoop.pig;
-
-import com.yahoo.vespa.hadoop.mapreduce.VespaSimpleJsonInputFormat;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-
-import java.io.IOException;
-
-/**
- * Simple JSON loader which loads either one JSON object per line or a
- * multiline JSON consisting of objects in an array.
- *
- * Returns only the textual representation of the JSON object.
- *
- * @author lesters
- */
-@SuppressWarnings("rawtypes")
-public class VespaSimpleJsonLoader extends LoadFunc {
-
- private TupleFactory tupleFactory = TupleFactory.getInstance();
- private VespaSimpleJsonInputFormat.VespaJsonRecordReader recordReader;
-
- @Override
- public void setLocation(String location, Job job) throws IOException {
- FileInputFormat.setInputPaths(job, location);
- }
-
- @Override
- public InputFormat getInputFormat() throws IOException {
- return new VespaSimpleJsonInputFormat();
- }
-
- @Override
- public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
- recordReader = (VespaSimpleJsonInputFormat.VespaJsonRecordReader) reader;
- }
-
- @Override
- public Tuple getNext() throws IOException {
- try {
- boolean done = recordReader.nextKeyValue();
- if (done) {
- return null;
- }
- Text json = recordReader.getCurrentKey();
- if (json == null) {
- return null;
- }
- return tupleFactory.newTuple(json.toString());
-
- } catch (InterruptedException ex) {
- return null;
- }
- }
-}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java
deleted file mode 100644
index a564dfac25d..00000000000
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java
+++ /dev/null
@@ -1,178 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hadoop.pig;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat;
-import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.StoreFunc;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.util.UDFContext;
-
-import java.io.*;
-import java.util.Base64;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * A small Pig UDF wrapper around the Vespa http client for
- * feeding data into a Vespa endpoint.
- *
- * @author lesters
- */
-@SuppressWarnings("rawtypes")
-public class VespaStorage extends StoreFunc {
-
- private final boolean createDocOp;
- private final String template;
- private final VespaDocumentOperation.Operation operation;
-
- private String signature = null;
- private RecordWriter recordWriter = null;
- private ResourceSchema resourceSchema = null;
-
- private static final String PROPERTY_CREATE_DOC_OP = "create-document-operation";
- private static final String PROPERTY_ID_TEMPLATE = "docid";
- private static final String PROPERTY_OPERATION = "operation";
- private static final String PROPERTY_RESOURCE_SCHEMA = "resource_schema";
-
- Properties properties = new Properties();
-
- public VespaStorage() {
- createDocOp = false;
- template = null;
- operation = null;
- }
-
- public VespaStorage(String... params) {
- properties = VespaConfiguration.loadProperties(params);
- createDocOp = Boolean.parseBoolean(properties.getProperty(PROPERTY_CREATE_DOC_OP, "false"));
- operation = VespaDocumentOperation.Operation.fromString(properties.getProperty(PROPERTY_OPERATION, "put"));
- template = properties.getProperty(PROPERTY_ID_TEMPLATE);
- }
-
-
- @Override
- public OutputFormat getOutputFormat() throws IOException {
- return new VespaOutputFormat(properties);
- }
-
-
- @Override
- public void setStoreLocation(String endpoint, Job job) throws IOException {
- properties.setProperty(VespaConfiguration.ENDPOINT, endpoint);
- }
-
-
- @Override
- public void prepareToWrite(RecordWriter recordWriter) throws IOException {
- this.recordWriter = recordWriter;
- this.resourceSchema = getResourceSchema();
- }
-
-
- @SuppressWarnings("unchecked")
- @Override
- public void putNext(Tuple tuple) throws IOException {
- if (tuple == null || tuple.size() == 0) {
- return;
- }
-
- String data = null;
- if (createDocOp) {
- data = createDocumentOperation(tuple);
- } else if (!tuple.isNull(0)) {
- data = tuple.get(0).toString(); // assume single field with correctly formatted doc op.
- }
-
- if (data == null || data.length() == 0) {
- return;
- }
-
- try {
- recordWriter.write(0, data);
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
-
-
- @Override
- public void checkSchema(ResourceSchema resourceSchema) throws IOException {
- setResourceSchema(resourceSchema);
- }
-
-
- @Override
- public String relToAbsPathForStoreLocation(String endpoint, Path path) throws IOException {
- return endpoint;
- }
-
-
- @Override
- public void setStoreFuncUDFContextSignature(String s) {
- this.signature = s;
- }
-
-
- @Override
- public void cleanupOnFailure(String s, Job job) throws IOException {
- }
-
-
- @Override
- public void cleanupOnSuccess(String s, Job job) throws IOException {
- }
-
-
- private ResourceSchema getResourceSchema() throws IOException {
- Properties properties = getUDFProperties();
- return base64Deserialize(properties.getProperty(PROPERTY_RESOURCE_SCHEMA));
- }
-
-
- private void setResourceSchema(ResourceSchema schema) throws IOException {
- Properties properties = getUDFProperties();
- if (properties.getProperty(PROPERTY_RESOURCE_SCHEMA) == null) {
- properties.setProperty(PROPERTY_RESOURCE_SCHEMA, base64Serialize(schema));
- }
- }
-
-
- private Properties getUDFProperties() {
- String[] context = { signature };
- return UDFContext.getUDFContext().getUDFProperties(getClass(), context);
- }
-
-
- private String createDocumentOperation(Tuple tuple) throws IOException {
- if (tuple == null || tuple.size() == 0) {
- return null;
- }
- if (resourceSchema == null) {
- return null;
- }
-
- Map<String, Object> fields = TupleTools.tupleMap(resourceSchema, tuple);
- String docId = TupleTools.toString(fields, template);
-
- Schema schema = Schema.getPigSchema(resourceSchema);
- return VespaDocumentOperation.create(operation, docId, fields, properties, schema);
- }
-
- public static String base64Serialize(ResourceSchema resourceSchema) throws IOException {
- byte[] bytes = new ObjectMapper().writeValueAsBytes(resourceSchema);
- return Base64.getEncoder().encodeToString(bytes);
- }
-
- public static ResourceSchema base64Deserialize(String s) throws IOException {
- byte[] data = Base64.getDecoder().decode(s);
- return new ObjectMapper().readValue(data, ResourceSchema.class);
- }
-}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/package-info.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/package-info.java
deleted file mode 100644
index 686765ac047..00000000000
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/package-info.java
+++ /dev/null
@@ -1,10 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/**
- * com.yahoo.vespa.hadoop.pig contains classes and utilities
- * to enable feeding directly to Vespa endpoints from pig.
- * It is a minimal layer over the Vespa HTTP client.
- *
- * NOTE: This is a PUBLIC API, but not annotated as such because this is not a bundle and
- * we don't want to introduce Vespa dependencies.
- */
-package com.yahoo.vespa.hadoop.pig;
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java
deleted file mode 100644
index d56cd818de2..00000000000
--- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java
+++ /dev/null
@@ -1,200 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hadoop.pig;
-
-import com.fasterxml.jackson.core.JsonEncoding;
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat;
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.test.PathUtils;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-
-import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.StringTokenizer;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class MapReduceTest {
-
- protected static File hdfsBaseDir;
- protected static FileSystem hdfs;
- protected static Configuration conf;
- protected static MiniDFSCluster cluster;
-
- protected static Path metricsJsonPath;
- protected static Path metricsCsvPath;
-
- @BeforeAll
- public static void setUp() throws IOException {
- hdfsBaseDir = new File(PathUtils.getTestDir(MapReduceTest.class).getCanonicalPath());
-
- conf = new HdfsConfiguration();
- conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsBaseDir.getAbsolutePath());
- conf.set(VespaConfiguration.DRYRUN, "true");
- conf.set(VespaConfiguration.ENDPOINT, "endpoint-does-not-matter-in-dryrun");
-
- cluster = new MiniDFSCluster.Builder(conf).build();
- hdfs = FileSystem.get(conf);
-
- metricsJsonPath = new Path("metrics_json");
- metricsCsvPath = new Path("metrics_csv");
- copyToHdfs("src/test/resources/operations_data.json", metricsJsonPath, "data");
- copyToHdfs("src/test/resources/tabular_data.csv", metricsCsvPath, "data");
- }
-
- @AfterAll
- public static void tearDown() throws IOException {
- Path testDir = new Path(hdfsBaseDir.getParent());
- hdfs.delete(testDir, true);
- cluster.shutdown();
- LocalFileSystem localFileSystem = FileSystem.getLocal(conf);
- localFileSystem.delete(testDir, true);
- }
-
- @Test
- public void requireThatMapOnlyJobSucceeds() throws Exception {
- Job job = Job.getInstance(conf);
- job.setJarByClass(MapReduceTest.class);
- job.setMapperClass(FeedMapper.class);
- job.setOutputFormatClass(VespaOutputFormat.class);
- job.setMapOutputValueClass(Text.class);
-
- FileInputFormat.setInputPaths(job, metricsJsonPath);
-
- boolean success = job.waitForCompletion(true);
- assertTrue(success, "Job Failed");
-
- VespaCounters counters = VespaCounters.get(job);
- assertEquals(10, counters.getDocumentsSent());
- assertEquals(0, counters.getDocumentsFailed());
- assertEquals(10, counters.getDocumentsOk());
- }
-
- @Test
- public void requireThatMapReduceJobSucceeds() throws Exception {
- Job job = Job.getInstance(conf);
- job.setJarByClass(MapReduceTest.class);
- job.setMapperClass(FeedMapper.class);
- job.setOutputFormatClass(VespaOutputFormat.class);
- job.setMapOutputValueClass(Text.class);
- job.setReducerClass(FeedReducer.class);
- job.setNumReduceTasks(1);
-
- FileInputFormat.setInputPaths(job, metricsJsonPath);
-
- boolean success = job.waitForCompletion(true);
- assertTrue(success, "Job Failed");
-
- VespaCounters counters = VespaCounters.get(job);
- assertEquals(10, counters.getDocumentsSent());
- assertEquals(0, counters.getDocumentsFailed());
- assertEquals(10, counters.getDocumentsOk());
- }
-
-
- @Test
- public void requireThatTransformMapJobSucceeds() throws Exception {
- Job job = Job.getInstance(conf);
- job.setJarByClass(MapReduceTest.class);
- job.setMapperClass(ParsingMapper.class);
- job.setOutputFormatClass(VespaOutputFormat.class);
- job.setMapOutputValueClass(Text.class);
- job.setReducerClass(FeedReducer.class);
- job.setNumReduceTasks(1);
-
- FileInputFormat.setInputPaths(job, metricsCsvPath);
-
- boolean success = job.waitForCompletion(true);
- assertTrue(success, "Job Failed");
-
- VespaCounters counters = VespaCounters.get(job);
- assertEquals(10, counters.getDocumentsSent());
- assertEquals(0, counters.getDocumentsFailed());
- assertEquals(10, counters.getDocumentsOk());
- assertEquals(0, counters.getDocumentsSkipped());
- }
-
-
- private static void copyToHdfs(String localFile, Path hdfsDir, String hdfsName) throws IOException {
- Path hdfsPath = new Path(hdfsDir, hdfsName);
- FSDataOutputStream out = hdfs.create(hdfsPath);
-
- try (InputStream in = new BufferedInputStream(new FileInputStream(localFile))) {
- int len;
- byte[] buffer = new byte[1024];
- while ((len = in.read(buffer)) > 0) {
- out.write(buffer, 0, len);
- }
- } finally {
- out.close();
- }
- }
-
- public static class FeedMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
- public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- context.write(key, value);
- }
- }
-
- public static class FeedReducer extends Reducer<Object, Text, LongWritable, Text> {
- public void reduce(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- context.write(key, value);
- }
- }
-
- public static class ParsingMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
- public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- String line = value.toString();
- if (line == null || line.length() == 0)
- return;
-
- StringTokenizer tokenizer = new StringTokenizer(line);
- long date = Long.parseLong(tokenizer.nextToken());
- String metricName = tokenizer.nextToken();
- long metricValue = Long.parseLong(tokenizer.nextToken());
- String application = tokenizer.nextToken();
-
- String docid = "id:"+application+":metric::"+metricName+"-"+date;
-
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8);
-
- g.writeStartObject();
- g.writeObjectFieldStart("fields");
- g.writeNumberField("date", date);
- g.writeStringField("name", metricName);
- g.writeNumberField("value", metricValue);
- g.writeStringField("application", application);
- g.writeEndObject();
- g.writeStringField("put", docid);
- g.writeEndObject();
- g.close();
-
- context.write(key, new Text(out.toString()));
- }
- }
-
-
-}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
deleted file mode 100644
index ec20e82763c..00000000000
--- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
+++ /dev/null
@@ -1,633 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hadoop.pig;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.SortedDataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class VespaDocumentOperationTest {
- private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
- private final PrintStream originalOut = System.out;
-
- @BeforeEach
- public void setUpStreams() {
- System.setOut(new PrintStream(outContent));
- }
-
- @AfterEach
- public void restoreStreams() {
- System.setOut(originalOut);
- }
- @Test
- public void requireThatUDFReturnsCorrectJson() throws Exception {
- String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>");
- ObjectMapper m = new ObjectMapper();
- JsonNode root = m.readTree(json);
- JsonNode fields = root.path("fields");
-
- // operation put is default
- assertEquals("id:testapp:metrics::clicks-20160112", root.get("put").asText());
- assertEquals("testapp", fields.get("application").asText());
- assertEquals("clicks", fields.get("name").asText());
- assertEquals(3, fields.get("value").asInt());
- }
-
-
- @Test
- public void requireThatUDFSupportsUpdateAssign() throws IOException {
- String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>", "operation=update");
- ObjectMapper m = new ObjectMapper();
- JsonNode root = m.readTree(json);
- JsonNode fields = root.path("fields");
-
- assertEquals("id:testapp:metrics::clicks-20160112", root.get("update").asText());
- assertEquals("testapp", fields.get("application").get("assign").asText());
- assertEquals("clicks", fields.get("name").get("assign").asText());
- assertEquals(3, fields.get("value").get("assign").asInt());
- }
-
- @Test
- public void requireThatUDFSupportsConditionalUpdateAssign() throws IOException {
- String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>", "operation=update", "condition=clicks < <value>");
- ObjectMapper m = new ObjectMapper();
- JsonNode root = m.readTree(json);
- JsonNode fields = root.path("fields");
-
- assertEquals("id:testapp:metrics::clicks-20160112", root.get("update").asText());
- assertEquals("clicks < 3", root.get("condition").asText());
- assertEquals("testapp", fields.get("application").get("assign").asText());
- assertEquals("clicks", fields.get("name").get("assign").asText());
- assertEquals(3, fields.get("value").get("assign").asInt());
- }
-
- @Test
- public void requireThatUDFSupportsCreateIfNonExistent() throws IOException {
- String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>", "operation=update",
- "create-if-non-existent=true");
- ObjectMapper m = new ObjectMapper();
- JsonNode root = m.readTree(json);
- JsonNode fields = root.path("fields");
-
- assertEquals("id:testapp:metrics::clicks-20160112", root.get("update").asText());
- assertTrue(root.get("create").asBoolean());
- assertEquals("testapp", fields.get("application").get("assign").asText());
- assertEquals("clicks", fields.get("name").get("assign").asText());
- assertEquals(3, fields.get("value").get("assign").asInt());
- }
-
-
- @Test
- public void requireThatUDFReturnsNullForMissingConfig() throws Exception {
- String json = getDocumentOperationJson();
- assertNull(json);
- }
-
-
- @Test
- public void requireThatUDFCorrectlyGeneratesRemoveBagAsMapOperation() throws Exception {
- DataBag bag = BagFactory.getInstance().newDefaultBag();
-
- Schema innerObjectSchema = new Schema();
- Tuple innerObjectTuple = TupleFactory.getInstance().newTuple();
- addToTuple("year", DataType.CHARARRAY, "2020", innerObjectSchema, innerObjectTuple);
- addToTuple("month", DataType.INTEGER, 3, innerObjectSchema, innerObjectTuple);
-
- Schema objectSchema = new Schema();
- Tuple objectTuple = TupleFactory.getInstance().newTuple();
- addToTuple("key", DataType.CHARARRAY, "234566", objectSchema, objectTuple);
- addToTuple("value", DataType.TUPLE, innerObjectTuple,innerObjectSchema,objectSchema, objectTuple);
-
- Schema bagSchema = new Schema();
- addToBagWithSchema("firstLayerTuple",DataType.TUPLE,objectTuple,objectSchema,bagSchema,bag);
-
- innerObjectSchema = new Schema();
- innerObjectTuple = TupleFactory.getInstance().newTuple();
- addToTuple("year", DataType.CHARARRAY, "2020", innerObjectSchema, innerObjectTuple);
- addToTuple("month", DataType.INTEGER, 3, innerObjectSchema, innerObjectTuple);
-
- objectSchema = new Schema();
- objectTuple = TupleFactory.getInstance().newTuple();
- addToTuple("key", DataType.CHARARRAY, "123456", objectSchema, objectTuple);
- addToTuple("value", DataType.TUPLE, innerObjectTuple,innerObjectSchema,objectSchema, objectTuple);
-
- addToBagWithSchema("firstLayerTuple",DataType.TUPLE,objectTuple,objectSchema,bagSchema,bag);
-
- Schema schema = new Schema();
- Tuple tuple = TupleFactory.getInstance().newTuple();
- addToTuple("bag", DataType.BAG, bag, bagSchema, schema, tuple);
- addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
-
- VespaDocumentOperation docOp = new VespaDocumentOperation("docid=id", "remove-map-fields=bag","operation=update");
- docOp.setInputSchema(schema);
- String json = docOp.exec(tuple);
-
- ObjectMapper m = new ObjectMapper();
- JsonNode root = m.readTree(json);
- JsonNode fields = root.get("fields");
- assertEquals("{\"remove\":0}", fields.get("bag{123456}").toString());
- assertEquals("{\"remove\":0}", fields.get("bag{234566}").toString());
-
- }
-
- @Test
- public void requireThatUDFCorrectlyGeneratesAddBagAsMapOperation() throws Exception {
-
- DataBag bag = BagFactory.getInstance().newDefaultBag();
-
- Schema innerObjectSchema = new Schema();
- Tuple innerObjectTuple = TupleFactory.getInstance().newTuple();
- addToTuple("year", DataType.CHARARRAY, "2020", innerObjectSchema, innerObjectTuple);
- addToTuple("month", DataType.INTEGER, 3, innerObjectSchema, innerObjectTuple);
-
- Schema objectSchema = new Schema();
- Tuple objectTuple = TupleFactory.getInstance().newTuple();
- addToTuple("key", DataType.CHARARRAY, "123456", objectSchema, objectTuple);
- addToTuple("value", DataType.TUPLE, innerObjectTuple,innerObjectSchema,objectSchema, objectTuple);
-
- Schema bagSchema = new Schema();
- addToBagWithSchema("firstLayerTuple",DataType.TUPLE,objectTuple,objectSchema,bagSchema,bag);
-
- Schema schema = new Schema();
- Tuple tuple = TupleFactory.getInstance().newTuple();
- addToTuple("bag", DataType.BAG, bag, bagSchema, schema, tuple);
- addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
- VespaDocumentOperation docOp = new VespaDocumentOperation("docid=id", "update-map-fields=bag","operation=update");
- docOp.setInputSchema(schema);
- String json = docOp.exec(tuple);
-
- ObjectMapper m = new ObjectMapper();
- JsonNode root = m.readTree(json);
-
- JsonNode fields = root.get("fields");
- JsonNode value = fields.get("bag{123456}");
- JsonNode assign = value.get("assign");
- assertEquals("2020", assign.get("year").asText());
- assertEquals(3, assign.get("month").asInt());
- }
-
- @Test
- public void requireThatUDFCorrectlyGeneratesAddTensorOperation() throws Exception {
-
- Schema schema = new Schema();
- Tuple tuple = TupleFactory.getInstance().newTuple();
-
- // Please refer to the tensor format documentation
-
- Map<String, Double> tensor = new HashMap<String, Double>() {{
- put("x:label1,y:label2,z:label4", 2.0);
- put("x:label3", 3.0);
- }};
-
- addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
- addToTuple("tensor", DataType.MAP, tensor, schema, tuple);
-
- VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "update-tensor-fields=tensor","operation=update");
- docOp.setInputSchema(schema);
- String json = docOp.exec(tuple);
-
- ObjectMapper m = new ObjectMapper();
- JsonNode root = m.readTree(json);
- JsonNode fields = root.get("fields");
- JsonNode tensorValue = fields.get("tensor");
- JsonNode add = tensorValue.get("add");
- JsonNode cells = add.get("cells");
- Iterator<JsonNode> cellsIterator = cells.iterator();
-
- JsonNode element = cellsIterator.next();
- assertEquals("label1", element.get("address").get("x").asText());
- assertEquals("label2", element.get("address").get("y").asText());
- assertEquals("label4", element.get("address").get("z").asText());
- assertEquals("2.0", element.get("value").toString());
-
- element = cellsIterator.next();
- assertEquals("label3", element.get("address").get("x").asText());
- assertEquals("3.0", element.get("value").toString());
- }
-
- @Test
- public void requireThatUDFCorrectlyGeneratesRemoveTensorOperation() throws Exception {
-
- Schema schema = new Schema();
- Tuple tuple = TupleFactory.getInstance().newTuple();
-
- // Please refer to the tensor format documentation
-
- Map<String, Double> tensor = new HashMap<String, Double>() {{
- put("x:label1,y:label2,z:label4", 2.0);
- put("x:label3", 3.0);
- }};
-
- addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
- addToTuple("tensor", DataType.MAP, tensor, schema, tuple);
-
- VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "remove-tensor-fields=tensor","operation=update");
- docOp.setInputSchema(schema);
- String json = docOp.exec(tuple);
-
- ObjectMapper m = new ObjectMapper();
- JsonNode root = m.readTree(json);
- JsonNode fields = root.get("fields");
- JsonNode tensorValue = fields.get("tensor");
- JsonNode remove = tensorValue.get("remove");
- JsonNode address = remove.get("addresses");
-
- Iterator<JsonNode> addressIterator = address.iterator();
-
- JsonNode element = addressIterator.next();
- assertEquals("label1", element.get("x").asText());
-
- element = addressIterator.next();
- assertEquals("label2", element.get("y").asText());
-
- element = addressIterator.next();
- assertEquals("label4", element.get("z").asText());
-
- element = addressIterator.next();
- assertEquals("label3", element.get("x").asText());
- }
-
- @Test
- public void requireThatUDFReturnsNullWhenExceptionHappens() throws IOException {
- Schema schema = new Schema();
- Tuple tuple = TupleFactory.getInstance().newTuple();
-
- // broken DELTA format that would throw internally
- Map<String, Double> tensor = new HashMap<String, Double>() {{
- put("xlabel1", 2.0); // missing : between 'x' and 'label1'
- }};
-
- addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
- addToTuple("tensor", DataType.MAP, tensor, schema, tuple);
-
- VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "create-tensor-fields=tensor");
- docOp.setInputSchema(schema);
- String json = docOp.exec(tuple);
-
- assertNull(json);
- }
-
- @Test
- public void requireThatUDFCorrectlyGeneratesRemoveOperation() throws Exception {
- String json = getDocumentOperationJson("operation=remove", "docid=id:<application>:metrics::<name>-<date>");
- ObjectMapper m = new ObjectMapper();
- JsonNode root = m.readTree(json);
- JsonNode fields = root.get("fields");
-
- assertEquals("id:testapp:metrics::clicks-20160112", root.get("remove").asText());
- assertNull(fields);
- }
-
-
- @Test
- public void requireThatUDFGeneratesComplexDataTypes() throws Exception {
- Schema schema = new Schema();
- Tuple tuple = TupleFactory.getInstance().newTuple();
-
- Tuple intTuple = TupleFactory.getInstance().newTuple();
- int[] intArray = {1, 2, 3};
- for (int i : intArray) { intTuple.append(i); }
-
- Tuple stringTuple = TupleFactory.getInstance().newTuple();
- String[] stringArray = {"a", "b", "c"};
- for (String s : stringArray) { stringTuple.append(s); }
-
- DataBag bag = new SortedDataBag(null);
- bag.add(intTuple);
- bag.add(stringTuple);
-
- Map<String, Object> innerMap = new HashMap<String, Object>() {{
- put("a", "string");
- put("tuple", intTuple);
- }};
-
- DataByteArray bytes = new DataByteArray("testdata".getBytes());
-
- Map<String, Object> outerMap = new HashMap<String, Object>() {{
- put("string", "value");
- put("int", 3);
- put("float", 3.145);
- put("bool", true);
- put("byte", bytes);
- put("map", innerMap);
- put("bag", bag);
- }};
-
- addToTuple("map", DataType.MAP, outerMap, schema, tuple);
-
- VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty");
- docOp.setInputSchema(schema);
- String json = docOp.exec(tuple);
-
- ObjectMapper m = new ObjectMapper();
- JsonNode root = m.readTree(json);
- JsonNode fields = root.get("fields");
- JsonNode map = fields.get("map");
-
- assertEquals("value", map.get("string").asText());
- assertEquals(3, map.get("int").asInt());
- assertEquals(3.145, map.get("float").asDouble(), 1e-6);
- assertTrue(map.get("bool").asBoolean());
- assertEquals("dGVzdGRhdGE=", map.get("byte").asText());
-
- assertEquals("string", map.get("map").get("a").asText());
- for (int i = 0; i < intArray.length; ++i) {
- assertEquals(intArray[i], map.get("map").get("tuple").get(i).asInt());
- }
-
- JsonNode bagField = map.get("bag");
- for (int i = 0; i < intArray.length; ++i) {
- assertEquals(intArray[i], bagField.get(0).get(i).asInt());
- }
- for (int i = 0; i < stringArray.length; ++i) {
- assertEquals(stringArray[i], bagField.get(1).get(i).asText());
- }
- }
-
-
- @Test
- public void requireThatSimpleArraysMustBeConfigured() throws Exception {
- String[] stringArray = {"a", "b", "c"};
- JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty"); // simple arrays not configured
- // json: [["a"], ["b"], ["c"]]
- assertEquals("a", array.get(0).get(0).asText());
- assertEquals("b", array.get(1).get(0).asText());
- assertEquals("c", array.get(2).get(0).asText());
- }
-
-
- @Test
- public void requireThatSimpleArraysAreSupported() throws Exception {
- String[] stringArray = {"a", "b", "c"};
- JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty", "simple-array-fields=array");
- // json: ["a", "b", "c"]
- assertEquals("a", array.get(0).asText());
- assertEquals("b", array.get(1).asText());
- assertEquals("c", array.get(2).asText());
- }
-
-
- @Test
- public void requireThatSimpleArraysCanBeConfiguredWithWildcard() throws Exception {
- String[] stringArray = {"a", "b", "c"};
- JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty", "simple-array-fields=*");
- // json: ["a", "b", "c"]
- assertEquals("a", array.get(0).asText());
- assertEquals("b", array.get(1).asText());
- assertEquals("c", array.get(2).asText());
- }
-
-
- @Test
- public void requireThatMultipleSimpleArraysAreSupported() throws Exception {
- String[] stringArray = {"a", "b", "c"};
- JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty", "simple-array-fields=empty,array");
- // json: ["a", "b", "c"]
- assertEquals("a", array.get(0).asText());
- assertEquals("b", array.get(1).asText());
- assertEquals("c", array.get(2).asText());
- }
-
-
- private JsonNode setupSimpleArrayOperation(String name, String[] array, String... params) throws IOException {
- Schema schema = new Schema();
- Tuple tuple = TupleFactory.getInstance().newTuple();
-
- DataBag bag = new SortedDataBag(null);
- for (String s : array) {
- Tuple stringTuple = TupleFactory.getInstance().newTuple();
- stringTuple.append(s);
- bag.add(stringTuple);
- }
- addToTuple(name, DataType.BAG, bag, schema, tuple);
-
- VespaDocumentOperation docOp = new VespaDocumentOperation(params);
- docOp.setInputSchema(schema);
- String json = docOp.exec(tuple);
-
- ObjectMapper m = new ObjectMapper();
- JsonNode root = m.readTree(json);
- JsonNode fields = root.get("fields");
- return fields.get(name);
- }
-
-
- @Test
- public void requireThatUDFSupportsTensors() throws IOException {
- Schema schema = new Schema();
- Tuple tuple = TupleFactory.getInstance().newTuple();
-
- // Please refer to the tensor format documentation
-
- Map<String, Double> tensor = new HashMap<String, Double>() {{
- put("x:label1,y:label2,z:label4", 2.0);
- put("x:label3", 3.0);
- }};
-
- addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
- addToTuple("tensor", DataType.MAP, tensor, schema, tuple);
-
- VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "create-tensor-fields=tensor");
- docOp.setInputSchema(schema);
- String json = docOp.exec(tuple);
-
- ObjectMapper m = new ObjectMapper();
- JsonNode root = m.readTree(json);
- JsonNode fields = root.get("fields");
- JsonNode tensorNode = fields.get("tensor");
- JsonNode cells = tensorNode.get("cells");
-
- assertEquals("label1", cells.get(0).get("address").get("x").asText());
- assertEquals("label2", cells.get(0).get("address").get("y").asText());
- assertEquals("label4", cells.get(0).get("address").get("z").asText());
- assertEquals("label3", cells.get(1).get("address").get("x").asText());
-
- assertEquals(2.0, cells.get(0).get("value").asDouble(), 1e-6);
- assertEquals(3.0, cells.get(1).get("value").asDouble(), 1e-6);
- }
-
-
- @Test
- public void requireThatUDFCanExcludeFields() throws IOException {
- String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>", "exclude-fields=application,date");
- ObjectMapper m = new ObjectMapper();
- JsonNode root = m.readTree(json);
- JsonNode fields = root.path("fields");
-
- // 'application' and 'date' fields should not appear in JSON
- assertNull(fields.get("application"));
- assertNull(fields.get("date"));
- assertNotNull(fields.get("name"));
- assertNotNull(fields.get("value"));
- }
-
-
- private String getDocumentOperationJson(String... params) throws IOException {
- Schema schema = new Schema();
- Tuple tuple = TupleFactory.getInstance().newTuple();
-
- addToTuple("application", DataType.CHARARRAY, "testapp", schema, tuple);
- addToTuple("name", DataType.CHARARRAY, "clicks", schema, tuple);
- addToTuple("date", DataType.CHARARRAY, "20160112", schema, tuple);
- addToTuple("value", DataType.CHARARRAY, 3, schema, tuple);
-
- VespaDocumentOperation docOp = new VespaDocumentOperation(params);
- docOp.setInputSchema(schema);
- return docOp.exec(tuple);
- }
-
-
- @Test
- public void requireThatUDFSupportsSimpleObjectFields() throws IOException {
- Schema objectSchema = new Schema();
- Tuple objectTuple = TupleFactory.getInstance().newTuple();
- addToTuple("id", DataType.LONG, 123456789L, objectSchema, objectTuple);
- addToTuple("url", DataType.CHARARRAY, "example.com", objectSchema, objectTuple);
- addToTuple("value", DataType.INTEGER, 123, objectSchema, objectTuple);
-
- Schema schema = new Schema();
- Tuple tuple = TupleFactory.getInstance().newTuple();
- addToTuple("object", DataType.TUPLE, objectTuple, objectSchema, schema, tuple);
-
- VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "simple-object-fields=object");
- docOp.setInputSchema(schema);
- String json = docOp.exec(tuple);
-
- ObjectMapper m = new ObjectMapper();
- JsonNode root = m.readTree(json);
- JsonNode fields = root.get("fields");
- JsonNode objectNode = fields.get("object");
-
- assertEquals(123456789L, objectNode.get("id").asLong());
- assertEquals("example.com", objectNode.get("url").asText());
- assertEquals(123, objectNode.get("value").asInt());
- }
-
-
- @Test
- public void requireThatUDFSupportsBagAsMapFields() throws IOException {
- DataBag bag = BagFactory.getInstance().newDefaultBag();
-
- Schema objectSchema = new Schema();
- Tuple objectTuple = TupleFactory.getInstance().newTuple();
- addToTuple("key", DataType.CHARARRAY, "123456", objectSchema, objectTuple);
- addToTuple("value", DataType.INTEGER, 123456, objectSchema, objectTuple);
- bag.add(objectTuple);
-
- objectSchema = new Schema();
- objectTuple = TupleFactory.getInstance().newTuple();
- addToTuple("key", DataType.CHARARRAY, "234567", objectSchema, objectTuple);
- addToTuple("value", DataType.INTEGER, 234567, objectSchema, objectTuple);
- bag.add(objectTuple);
-
- Schema schema = new Schema();
- Tuple tuple = TupleFactory.getInstance().newTuple();
- addToTuple("bag", DataType.BAG, bag, objectSchema, schema, tuple);
-
- VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "bag-as-map-fields=bag");
- docOp.setInputSchema(schema);
- String json = docOp.exec(tuple);
-
- ObjectMapper m = new ObjectMapper();
- JsonNode root = m.readTree(json);
- JsonNode fields = root.get("fields");
- JsonNode bagNode = fields.get("bag");
-
- assertEquals(123456, bagNode.get("123456").asInt());
- assertEquals(234567, bagNode.get("234567").asInt());
- }
-
- @Test
- public void requireThatUDFPrintIdWhenVerbose() throws IOException {
- DataBag bag = BagFactory.getInstance().newDefaultBag();
-
- Schema objectSchema = new Schema();
- Tuple objectTuple = TupleFactory.getInstance().newTuple();
- addToTuple("key", DataType.CHARARRAY, "123456", objectSchema, objectTuple);
- addToTuple("value", DataType.INTEGER, 123456, objectSchema, objectTuple);
- bag.add(objectTuple);
-
- objectSchema = new Schema();
- objectTuple = TupleFactory.getInstance().newTuple();
- addToTuple("key", DataType.CHARARRAY, "234567", objectSchema, objectTuple);
- addToTuple("value", DataType.INTEGER, 234567, objectSchema, objectTuple);
- bag.add(objectTuple);
-
- Schema schema = new Schema();
- Tuple tuple = TupleFactory.getInstance().newTuple();
- addToTuple("bag", DataType.BAG, bag, objectSchema, schema, tuple);
-
- VespaDocumentOperation docOp = new VespaDocumentOperation("docid=7654321", "bag-as-map-fields=bag","verbose=true");
- docOp.setInputSchema(schema);
- String json = docOp.exec(tuple);
-
- assertTrue(outContent.toString().contains("Processing docId: 7654321"));
- }
-
- @Test
- public void requireThatUDFVerboseSetToFalseByDefault() throws IOException {
- DataBag bag = BagFactory.getInstance().newDefaultBag();
-
- Schema objectSchema = new Schema();
- Tuple objectTuple = TupleFactory.getInstance().newTuple();
- addToTuple("key", DataType.CHARARRAY, "123456", objectSchema, objectTuple);
- addToTuple("value", DataType.INTEGER, 123456, objectSchema, objectTuple);
- bag.add(objectTuple);
-
- objectSchema = new Schema();
- objectTuple = TupleFactory.getInstance().newTuple();
- addToTuple("key", DataType.CHARARRAY, "234567", objectSchema, objectTuple);
- addToTuple("value", DataType.INTEGER, 234567, objectSchema, objectTuple);
- bag.add(objectTuple);
-
- Schema schema = new Schema();
- Tuple tuple = TupleFactory.getInstance().newTuple();
- addToTuple("bag", DataType.BAG, bag, objectSchema, schema, tuple);
-
- VespaDocumentOperation docOp = new VespaDocumentOperation("docid=7654321", "bag-as-map-fields=bag");
- docOp.setInputSchema(schema);
- String json = docOp.exec(tuple);
-
- assertEquals("", outContent.toString());
- }
-
- private void addToTuple(String alias, byte type, Object value, Schema schema, Tuple tuple) {
- schema.add(new Schema.FieldSchema(alias, type));
- tuple.append(value);
- }
-
-
- private void addToTuple(String alias, byte type, Object value, Schema schemaInField, Schema schema, Tuple tuple)
- throws FrontendException {
- schema.add(new Schema.FieldSchema(alias, schemaInField, type));
- tuple.append(value);
- }
-
- private void addToBagWithSchema(String alias, byte type, Tuple value, Schema schemaInField, Schema schema,DataBag bag)
- throws FrontendException {
- schema.add(new Schema.FieldSchema(alias, schemaInField, type));
- bag.add(value);
- }
-}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java
deleted file mode 100644
index a0b549a737f..00000000000
--- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hadoop.pig;
-
-import com.sun.net.httpserver.HttpServer;
-import com.yahoo.vespa.hadoop.util.MockQueryHandler;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.pig.ExecType;
-import org.apache.pig.PigServer;
-import org.apache.pig.data.Tuple;
-import org.junit.jupiter.api.Test;
-
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-public class VespaQueryTest {
-
- @Test
- public void requireThatQueriesAreReturnedCorrectly() throws Exception {
- runQueryTest("src/test/pig/query.pig", createQueryHandler(""), 18901);
- }
-
- @Test
- public void requireThatQueriesAreReturnedCorrectlyWithAlternativeJsonRoot() throws Exception {
- runQueryTest("src/test/pig/query_alt_root.pig", createQueryHandler("children"), 18902);
- }
-
- private void runQueryTest(String script, MockQueryHandler queryHandler, int port) throws Exception {
- final String endpoint = "http://localhost:" + port;
-
- HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
- server.createContext("/", queryHandler);
- server.start();
-
- PigServer ps = setup(script, endpoint);
-
- Iterator<Tuple> recommendations = ps.openIterator("recommendations");
- while (recommendations.hasNext()) {
- Tuple tuple = recommendations.next();
-
- String userid = (String) tuple.get(0);
- Integer rank = (Integer) tuple.get(1);
- String docid = (String) tuple.get(2);
- Double relevance = (Double) tuple.get(3);
- String fieldId = (String) tuple.get(4);
- String fieldContent = (String) tuple.get(5);
-
- MockQueryHandler.MockQueryHit hit = queryHandler.getHit(userid, rank);
- assertEquals(docid, hit.id);
- assertEquals(relevance, hit.relevance, 1e-3);
- assertEquals(fieldId, hit.fieldId);
- assertEquals(fieldContent, hit.fieldContent);
- }
-
- if (server != null) {
- server.stop(0);
- }
-
- }
-
- private PigServer setup(String script, String endpoint) throws Exception {
- Configuration conf = new HdfsConfiguration();
- Map<String, String> parameters = new HashMap<>();
- parameters.put("ENDPOINT", endpoint);
-
- PigServer ps = new PigServer(ExecType.LOCAL, conf);
- ps.setBatchOn();
- ps.registerScript(script, parameters);
-
- return ps;
- }
-
- private MockQueryHandler createQueryHandler(String childNode) {
- MockQueryHandler queryHandler = new MockQueryHandler(childNode);
-
- List<String> userIds = Arrays.asList("5", "104", "313");
-
- int hitsPerUser = 3;
- for (int i = 0; i < hitsPerUser * userIds.size(); ++i) {
- String id = "" + (i+1);
- String userId = userIds.get(i / hitsPerUser);
- queryHandler.newHit().
- setId("id::::" + id).
- setRelevance(1.0 - (i % hitsPerUser) * 0.1).
- setFieldSddocname("doctype").
- setFieldId("" + id).
- setFieldDate("2016060" + id).
- setFieldContent("Content for user " + userId + " hit " + i % hitsPerUser + "...").
- add(userId);
- }
-
- return queryHandler;
- }
-
-}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
deleted file mode 100644
index 3183c770bc7..00000000000
--- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hadoop.pig;
-
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.pig.ExecType;
-import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecJob;
-import org.apache.pig.tools.pigstats.JobStats;
-import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-
-public class VespaStorageTest {
-
- @Test
- public void requireThatPremadeOperationsFeedSucceeds() throws Exception {
- assertAllDocumentsOk("src/test/pig/feed_operations.pig");
- }
-
-
- @Test
- public void requireThatPremadeMultilineOperationsFeedSucceeds() throws Exception {
- assertAllDocumentsOk("src/test/pig/feed_multiline_operations.pig");
- }
-
-
- @Test
- public void requireThatPremadeOperationsWithJsonLoaderFeedSucceeds() throws Exception {
- assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig");
- }
-
- @Test
- public void requireThatPremadeOperationsWithJsonLoaderFeedAndNonLegacyClientSucceeds() throws Exception {
- Configuration conf = new HdfsConfiguration();
- conf.set(VespaConfiguration.USE_SSL, Boolean.TRUE.toString());
- assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig", conf);
- }
-
- @Test
- public void requireThatCreateOperationsFeedSucceeds() throws Exception {
- assertAllDocumentsOk("src/test/pig/feed_create_operations.pig");
- }
-
-
- @Test
- public void requireThatCreateOperationsShortFormFeedSucceeds() throws Exception {
- assertAllDocumentsOk("src/test/pig/feed_create_operations_short_form.pig");
- }
-
-
- @Test
- public void requireThatFeedVisitDataSucceeds() throws Exception {
- assertAllDocumentsOk("src/test/pig/feed_visit_data.pig");
- }
-
-
- private PigServer setup(String script, Configuration conf) throws Exception {
- if (conf == null) {
- conf = new HdfsConfiguration();
- }
- conf.setIfUnset(VespaConfiguration.DRYRUN, "true");
- conf.setIfUnset(VespaConfiguration.ENDPOINT, "dummy-endpoint");
-
- // Parameter substitutions - can also be set by configuration
- Map<String, String> parameters = new HashMap<>();
- parameters.put("ENDPOINT", "endpoint-does-not-matter-in-dryrun,another-endpoint-that-does-not-matter");
-
- PigServer ps = new PigServer(ExecType.LOCAL, conf);
- ps.setBatchOn();
- ps.registerScript(script, parameters);
-
- return ps;
- }
-
-
- private void assertAllDocumentsOk(String script) throws Exception {
- assertAllDocumentsOk(script, null);
- }
-
-
- private void assertAllDocumentsOk(String script, Configuration conf) throws Exception {
- PigServer ps = setup(script, conf);
- List<ExecJob> jobs = ps.executeBatch();
- PigStats stats = jobs.get(0).getStatistics();
- for (JobStats js : stats.getJobGraph()) {
- Counters hadoopCounters = ((MRJobStats)js).getHadoopCounters();
- assertNotNull(hadoopCounters);
- VespaCounters counters = VespaCounters.get(hadoopCounters);
- assertEquals(10, counters.getDocumentsSent());
- assertEquals(0, counters.getDocumentsFailed());
- assertEquals(10, counters.getDocumentsOk());
- }
- }
-
-}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java
deleted file mode 100644
index 64c160ea14c..00000000000
--- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java
+++ /dev/null
@@ -1,219 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hadoop.util;
-
-import com.fasterxml.jackson.core.JsonEncoding;
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.sun.net.httpserver.HttpExchange;
-import com.sun.net.httpserver.HttpHandler;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class MockQueryHandler implements HttpHandler {
-
- private final Map<String, List<MockQueryHit>> hitMap;
- private final String childNode;
-
- public MockQueryHandler(String childNode) {
- this.hitMap = new HashMap<>();
- this.childNode = childNode;
- }
-
- public void handle(HttpExchange t) throws IOException {
- URI uri = t.getRequestURI();
- String query = uri.getQuery();
- String response = null;
-
- // Parse query - extract "query" element
- if (query != null) {
- String params[] = query.split("[&]");
- for (String param : params) {
- int i = param.indexOf('=');
- String name = param.substring(0, i);
- String value = URLDecoder.decode(param.substring(i + 1), "UTF-8");
-
- if ("query".equalsIgnoreCase(name)) {
- response = getResponse(URLDecoder.decode(param.substring(i + 1), "UTF-8"));
- }
- }
- }
-
- t.sendResponseHeaders(200, response == null ? 0 : response.length());
- OutputStream os = t.getResponseBody();
- os.write(response == null ? "".getBytes() : response.getBytes());
- os.close();
-
- }
-
- public MockQueryHit getHit(String query, Integer rank) {
- if (!hitMap.containsKey(query)) {
- return null;
- }
- if (rank >= hitMap.get(query).size()) {
- return null;
- }
- return hitMap.get(query).get(rank);
- }
-
- public MockQueryHit newHit() {
- return new MockQueryHit(this);
- }
-
- public void addHit(String query, MockQueryHit hit) {
- if (!hitMap.containsKey(query)) {
- hitMap.put(query, new ArrayList<>());
- }
- hitMap.get(query).add(hit);
- }
-
- private String getResponse(String query) throws IOException {
- List<MockQueryHit> hits = hitMap.get(query);
- if (hits == null) {
- return null;
- }
-
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8);
-
- writeResultStart(g, hits.size());
- for (MockQueryHit hit : hits) {
- writeHit(g, hit);
- }
- writeResultsEnd(g);
- g.close();
-
- return out.toString();
- }
-
- private void writeHit(JsonGenerator g, MockQueryHit hit) throws IOException {
- g.writeStartObject();
-
- g.writeFieldName("id");
- g.writeString(hit.id);
-
- g.writeFieldName("relevance");
- g.writeNumber(hit.relevance);
-
- g.writeFieldName("fields");
- g.writeStartObject();
-
- g.writeFieldName("sddocname");
- g.writeString(hit.fieldSddocname);
-
- g.writeFieldName("date");
- g.writeString(hit.fieldDate);
-
- g.writeFieldName("content");
- g.writeString(hit.fieldContent);
-
- g.writeFieldName("id");
- g.writeString(hit.fieldId);
-
- g.writeEndObject();
- g.writeEndObject();
- }
-
- private void writeResultStart(JsonGenerator g, int count) throws IOException {
- g.writeStartObject();
- g.writeFieldName("root");
-
- g.writeStartObject();
-
- g.writeFieldName("id");
- g.writeString("toplevel");
-
- g.writeFieldName("relevance");
- g.writeNumber(1);
-
- g.writeFieldName("fields");
- g.writeStartObject();
- g.writeFieldName("totalCount");
- g.writeNumber(count);
- g.writeEndObject();
-
- g.writeFieldName("coverage");
- g.writeStartObject();
- g.writeFieldName("coverage");
- g.writeNumber(100);
- // ... more stuff here usually
- g.writeEndObject();
-
- g.writeFieldName("children");
- g.writeStartArray();
-
- if (!childNode.isEmpty()) {
- g.writeStartObject();
- g.writeFieldName(childNode);
- g.writeStartArray();
- }
- }
-
- private void writeResultsEnd(JsonGenerator g) throws IOException {
- if (!childNode.isEmpty()) {
- g.writeEndArray();
- g.writeEndObject();
- }
- g.writeEndArray();
- g.writeEndObject();
- g.writeEndObject();
- }
-
- public static class MockQueryHit {
-
- private final MockQueryHandler handler;
-
- public String id;
- public Double relevance;
- public String fieldSddocname;
- public String fieldDate;
- public String fieldContent;
- public String fieldId;
-
- private MockQueryHit(MockQueryHandler handler) {
- this.handler = handler;
- }
-
- public void add(String query) {
- handler.addHit(query, this);
- }
-
- public MockQueryHit setId(String id) {
- this.id = id;
- return this;
- }
-
- public MockQueryHit setRelevance(Double relevance) {
- this.relevance = relevance;
- return this;
- }
-
- public MockQueryHit setFieldSddocname(String fieldSddocname) {
- this.fieldSddocname = fieldSddocname;
- return this;
- }
-
- public MockQueryHit setFieldDate(String fieldDate) {
- this.fieldDate = fieldDate;
- return this;
- }
-
- public MockQueryHit setFieldContent(String fieldContent) {
- this.fieldContent = fieldContent;
- return this;
- }
-
- public MockQueryHit setFieldId(String fieldId) {
- this.fieldId = fieldId;
- return this;
- }
- }
-
-}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java
deleted file mode 100644
index b4ccbdf2183..00000000000
--- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hadoop.util;
-
-import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-public class TupleToolsTest {
-
- @Test
- public void requireThatTupleToStringHandlesSimpleTypes() throws IOException {
- Schema schema = new Schema();
- Tuple tuple = TupleFactory.getInstance().newTuple();
-
- addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
- addToTuple("rank", DataType.INTEGER, 1, schema, tuple);
-
- String template = "Id is <id> and rank is <rank>";
- String result = TupleTools.toString(schema, tuple, template);
-
- assertEquals("Id is 123 and rank is 1", result);
- }
-
-
- private void addToTuple(String alias, byte type, Object value, Schema schema, Tuple tuple) {
- schema.add(new Schema.FieldSchema(alias, type));
- tuple.append(value);
- }
-
- @Test
- public void requireThatTupleToStringHandlesStringCharacters() throws IOException {
- Schema schema = new Schema();
- Tuple tuple = TupleFactory.getInstance().newTuple();
-
- addToTuple("id", DataType.CHARARRAY, "_!@#$%^&*()", schema, tuple);
- addToTuple("rank", DataType.INTEGER, 1, schema, tuple);
-
- String template = "Id is <id> and rank is <rank>";
- String result = TupleTools.toString(schema, tuple, template);
-
- assertEquals("Id is _!@#$%^&*() and rank is 1", result);
- }
-
-}
diff --git a/vespa-hadoop/src/test/pig/feed_create_operations.pig b/vespa-hadoop/src/test/pig/feed_create_operations.pig
deleted file mode 100644
index 4583c095133..00000000000
--- a/vespa-hadoop/src/test/pig/feed_create_operations.pig
+++ /dev/null
@@ -1,24 +0,0 @@
--- Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
--- REGISTER vespa-hadoop.jar -- Not needed in tests
-
--- Create valid Vespa put operations
-DEFINE VespaPutOperation
- com.yahoo.vespa.hadoop.pig.VespaDocumentOperation(
- 'operation=put',
- 'docid=id:<application>:metrics::<name>-<date>'
- );
-
--- By default, VespaStorage assumes it's feeding valid Vespa operations
-DEFINE VespaStorage
- com.yahoo.vespa.hadoop.pig.VespaStorage();
-
--- Load tabular data
-metrics = LOAD 'src/test/resources/tabular_data.csv' AS (date:chararray, name:chararray, value:int, application:chararray);
-
--- Transform tabular data to a Vespa document operation JSON format
-metrics = FOREACH metrics GENERATE VespaPutOperation(*);
-
--- Store into Vespa
-STORE metrics INTO '$ENDPOINT' USING VespaStorage();
-
-
diff --git a/vespa-hadoop/src/test/pig/feed_create_operations_short_form.pig b/vespa-hadoop/src/test/pig/feed_create_operations_short_form.pig
deleted file mode 100644
index 0f0e63d843a..00000000000
--- a/vespa-hadoop/src/test/pig/feed_create_operations_short_form.pig
+++ /dev/null
@@ -1,19 +0,0 @@
--- Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
--- REGISTER vespa-hadoop.jar -- Not needed in tests
-
--- Transform tabular data to a Vespa document operation JSON format
--- as part of storing the data.
-DEFINE VespaStorage
- com.yahoo.vespa.hadoop.pig.VespaStorage(
- 'create-document-operation=true',
- 'operation=put',
- 'docid=id:<application>:metrics::<name>-<date>'
- );
-
--- Load tabular data
-metrics = LOAD 'src/test/resources/tabular_data.csv' AS (date:chararray, name:chararray, value:int, application:chararray);
-
--- Store into Vespa
-STORE metrics INTO '$ENDPOINT' USING VespaStorage();
-
-
diff --git a/vespa-hadoop/src/test/pig/feed_multiline_operations.pig b/vespa-hadoop/src/test/pig/feed_multiline_operations.pig
deleted file mode 100644
index 1971270cbdc..00000000000
--- a/vespa-hadoop/src/test/pig/feed_multiline_operations.pig
+++ /dev/null
@@ -1,15 +0,0 @@
--- Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
--- REGISTER vespa-hadoop.jar -- Not needed in tests
-
--- Define short name for VespaJsonLoader
-DEFINE VespaJsonLoader com.yahoo.vespa.hadoop.pig.VespaSimpleJsonLoader();
-
--- Define short name for VespaStorage
-DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage();
-
--- Load data - one column for json data
-metrics = LOAD 'src/test/resources/operations_multiline_data.json' USING VespaJsonLoader() AS (data:chararray);
-
--- Store into Vespa
-STORE metrics INTO '$ENDPOINT' USING VespaStorage();
-
diff --git a/vespa-hadoop/src/test/pig/feed_operations.pig b/vespa-hadoop/src/test/pig/feed_operations.pig
deleted file mode 100644
index 48873fde87a..00000000000
--- a/vespa-hadoop/src/test/pig/feed_operations.pig
+++ /dev/null
@@ -1,11 +0,0 @@
--- Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
--- REGISTER vespa-hadoop.jar -- Not needed in tests
-
--- Define short name for VespaStorage
-DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage();
-
--- Load data - one column for json data
-metrics = LOAD 'src/test/resources/operations_data.json' AS (data:chararray);
-
--- Store into Vespa
-STORE metrics INTO '$ENDPOINT' USING VespaStorage();
diff --git a/vespa-hadoop/src/test/pig/feed_operations_with_json_loader.pig b/vespa-hadoop/src/test/pig/feed_operations_with_json_loader.pig
deleted file mode 100644
index da58fe3c678..00000000000
--- a/vespa-hadoop/src/test/pig/feed_operations_with_json_loader.pig
+++ /dev/null
@@ -1,14 +0,0 @@
--- Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
--- REGISTER vespa-hadoop.jar -- Not needed in tests
-
--- Define short name for VespaJsonLoader
-DEFINE VespaJsonLoader com.yahoo.vespa.hadoop.pig.VespaSimpleJsonLoader();
-
--- Define short name for VespaStorage
-DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage();
-
--- Load data - one column for json data
-metrics = LOAD 'src/test/resources/operations_data.json' USING VespaJsonLoader() AS (data:chararray);
-
--- Store into Vespa
-STORE metrics INTO '$ENDPOINT' USING VespaStorage();
diff --git a/vespa-hadoop/src/test/pig/feed_operations_xml.pig b/vespa-hadoop/src/test/pig/feed_operations_xml.pig
deleted file mode 100644
index 4e5057f4909..00000000000
--- a/vespa-hadoop/src/test/pig/feed_operations_xml.pig
+++ /dev/null
@@ -1,11 +0,0 @@
--- Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
--- REGISTER vespa-hadoop.jar -- Not needed in tests
-
--- Define short name for VespaStorage
-DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage();
-
--- Load data - one column for xml data
-data = LOAD 'src/test/resources/operations_data.xml' AS (data:chararray);
-
--- Store into Vespa
-STORE data INTO '$ENDPOINT' USING VespaStorage();
diff --git a/vespa-hadoop/src/test/pig/feed_visit_data.pig b/vespa-hadoop/src/test/pig/feed_visit_data.pig
deleted file mode 100644
index 59d144b53dc..00000000000
--- a/vespa-hadoop/src/test/pig/feed_visit_data.pig
+++ /dev/null
@@ -1,12 +0,0 @@
--- Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
--- REGISTER vespa-hadoop.jar -- Not needed in tests
-
--- Define short name for VespaStorage
-DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage();
-
--- Load data - one column for json data
-metrics = LOAD 'src/test/resources/visit_data.json' AS (data:chararray);
-
--- Store into Vespa
-STORE metrics INTO '$ENDPOINT' USING VespaStorage();
-
diff --git a/vespa-hadoop/src/test/pig/query.pig b/vespa-hadoop/src/test/pig/query.pig
deleted file mode 100644
index 96caa5cd0c4..00000000000
--- a/vespa-hadoop/src/test/pig/query.pig
+++ /dev/null
@@ -1,19 +0,0 @@
--- Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
--- REGISTER vespa-hadoop.jar -- Not needed in tests
-
--- Define Vespa query for retrieving blog posts
-DEFINE BlogPostRecommendations
- com.yahoo.vespa.hadoop.pig.VespaQuery(
- 'query=$ENDPOINT/search?query=<userid>&hits=100',
- 'schema=rank:int,id:chararray,relevance:double,fields/id:chararray,fields/content:chararray'
- );
-
--- Load data from a local file
-users = LOAD 'src/test/resources/user_ids.csv' AS (userid:chararray);
-users = FILTER users BY userid IS NOT null;
-
--- Run a set of queries against Vespa
-recommendations = FOREACH users GENERATE userid, FLATTEN(BlogPostRecommendations(*));
-
--- Output recommendations
-DUMP recommendations;
diff --git a/vespa-hadoop/src/test/pig/query_alt_root.pig b/vespa-hadoop/src/test/pig/query_alt_root.pig
deleted file mode 100644
index 2884b4a600f..00000000000
--- a/vespa-hadoop/src/test/pig/query_alt_root.pig
+++ /dev/null
@@ -1,20 +0,0 @@
--- Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
--- REGISTER vespa-hadoop.jar -- Not needed in tests
-
--- Define Vespa query for retrieving blog posts
-DEFINE BlogPostRecommendations
- com.yahoo.vespa.hadoop.pig.VespaQuery(
- 'query=$ENDPOINT/search?query=<userid>&hits=100',
- 'rootnode=root/children/children',
- 'schema=rank:int,id:chararray,relevance:double,fields/id:chararray,fields/content:chararray'
- );
-
--- Load data from a local file
-users = LOAD 'src/test/resources/user_ids.csv' AS (userid:chararray);
-users = FILTER users BY userid IS NOT null;
-
--- Run a set of queries against Vespa
-recommendations = FOREACH users GENERATE userid, FLATTEN(BlogPostRecommendations(*));
-
--- Output recommendations
-DUMP recommendations;
diff --git a/vespa-hadoop/src/test/resources/operations_data.json b/vespa-hadoop/src/test/resources/operations_data.json
deleted file mode 100644
index 5af436dbfe7..00000000000
--- a/vespa-hadoop/src/test/resources/operations_data.json
+++ /dev/null
@@ -1,10 +0,0 @@
-{"put":"id:testapp:metric::clicks-2015110414","fields":{"date":"2015110414","name":"clicks","value":1,"application":"testapp"}}
-{"fields":{"date":"2015110416","name":"clicks","value":5,"application":"testapp"},"put":"id:testapp:metric::clicks-2015110416"}
-{"put":"id:testapp:metric::clicks-2015110415","fields":{"date":"2015110415","name":"clicks","value":2,"application":"testapp"}}
-{"put":"id:testapp:metric::clicks-2015110417","fields":{"date":"2015110417","name":"clicks","value":3,"application":"testapp"}}
-{"put":"id:testapp:metric::clicks-2015110418","fields":{"date":"2015110418","name":"clicks","value":6,"application":"testapp"}}
-{"put":"id:testapp:metric::clicks-2015110419","fields":{"date":"2015110419","name":"clicks","value":3,"application":"testapp"}}
-{"put":"id:testapp:metric::clicks-2015110420","fields":{"date":"2015110420","name":"clicks","value":4,"application":"testapp"}}
-{"put":"id:testapp:metric::clicks-2015110421","fields":{"date":"2015110421","name":"clicks","value":2,"application":"testapp"}}
-{"fields":{"date":"2015110422","name":"clicks","value":5,"application":"testapp"},"condition":"metrics==0","put":"id:testapp:metric::clicks-2015110422"}
-{"put":"id:testapp:metric::clicks-2015110423","fields":{"date":"2015110423","name":"clicks","value":1,"application":"testapp"}}
diff --git a/vespa-hadoop/src/test/resources/operations_data.xml b/vespa-hadoop/src/test/resources/operations_data.xml
deleted file mode 100644
index db02b6bee73..00000000000
--- a/vespa-hadoop/src/test/resources/operations_data.xml
+++ /dev/null
@@ -1,14 +0,0 @@
-<?xml version="1.0" encoding="utf-8"?>
-<!-- Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -->
-<vespafeed>
- <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/a-ha/Scoundrel+Days"> <url>http://music.yahoo.com/a-ha/Scoundrel+Days</url> <title><![CDATA[Scoundrel Days]]></title> <artist><![CDATA[a-ha]]></artist> <year>0</year> <popularity>290</popularity> </document>
- <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Accept/Restless+And+Wild"> <url>http://music.yahoo.com/Accept/Restless+And+Wild</url> <title><![CDATA[Restless And Wild]]></title> <artist><![CDATA[Accept]]></artist> <year>0</year> <popularity>75</popularity> </document>
- <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Accept/Staying+A+Life"> <url>http://music.yahoo.com/Accept/Staying+A+Life</url> <title><![CDATA[Staying A Life]]></title> <artist><![CDATA[Accept]]></artist> <year>1985</year> <popularity>77</popularity> </document>
- <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Alice+In+Chains/Dirt"> <url>http://music.yahoo.com/Alice+In+Chains/Dirt</url> <title><![CDATA[Dirt]]></title> <artist><![CDATA[Alice In Chains]]></artist> <year>1992</year> <popularity>114</popularity> </document>
- <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Alice+In+Chains/Live"> <url>http://music.yahoo.com/Alice+In+Chains/Live</url> <title><![CDATA[Live]]></title> <artist><![CDATA[Alice In Chains]]></artist> <year>1990</year> <popularity>363</popularity> </document>
- <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Amy+MacDonald/This+Is+The+Life"> <url>http://music.yahoo.com/Amy+MacDonald/This+Is+The+Life</url> <title><![CDATA[This Is The Life]]></title> <artist><![CDATA[Amy MacDonald]]></artist> <year>2007</year> <popularity>355</popularity> </document>
- <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Ane+Brun/Duets"> <url>http://music.yahoo.com/Ane+Brun/Duets</url> <title><![CDATA[Duets]]></title> <artist><![CDATA[Ane Brun]]></artist> <year>0</year> <popularity>255</popularity> </document>
- <update documenttype="music" documentid="id:music:music::http://music.yahoo.com/bobdylan/BestOf"><assign field="title">The Best of Bob Dylan</assign><add field="tracks"><item>Man Of Constant Sorrow</item></add></update>
- <remove documentid="id:music:music::http://music.yahoo.com/Aqpop/Beautifully+Smart" />
- <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Annuals/Be+He+Me"> <url>http://music.yahoo.com/Annuals/Be+He+Me</url> <title><![CDATA[Be He Me]]></title> <artist><![CDATA[Annuals]]></artist> <year>0</year> <popularity>207</popularity> </document>
-</vespafeed>
diff --git a/vespa-hadoop/src/test/resources/operations_multiline_data.json b/vespa-hadoop/src/test/resources/operations_multiline_data.json
deleted file mode 100644
index 2b51698d9b7..00000000000
--- a/vespa-hadoop/src/test/resources/operations_multiline_data.json
+++ /dev/null
@@ -1,93 +0,0 @@
-[
- {
- "put": "id:testapp:metric::clicks-2015110414",
- "fields": {
- "date": "2015110414",
- "name": "clicks",
- "value": 1,
- "application": "testapp"
- }
- },
- {
- "fields": {
- "date": "2015110416",
- "name": "clicks",
- "value": 5,
- "application": "testapp"
- },
- "put": "id:testapp:metric::clicks-2015110416"
- },
- {
- "put": "id:testapp:metric::clicks-2015110415",
- "fields": {
- "date": "2015110415",
- "name": "clicks",
- "value": 2,
- "application": "testapp"
- }
- },
- {
- "put": "id:testapp:metric::clicks-2015110417",
- "fields": {
- "date": "2015110417",
- "name": "clicks",
- "value": 3,
- "application": "testapp"
- }
- },
- {
- "put": "id:testapp:metric::clicks-2015110418",
- "fields": {
- "date": "2015110418",
- "name": "clicks",
- "value": 6,
- "application": "testapp"
- }
- },
- {
- "put": "id:testapp:metric::clicks-2015110419",
- "fields": {
- "date": "2015110419",
- "name": "clicks",
- "value": 3,
- "application": "testapp"
- }
- },
- {
- "put": "id:testapp:metric::clicks-2015110420",
- "fields": {
- "date": "2015110420",
- "name": "clicks",
- "value": 4,
- "application": "testapp"
- }
- },
- {
- "put": "id:testapp:metric::clicks-2015110421",
- "fields": {
- "date": "2015110421",
- "name": "clicks",
- "value": 2,
- "application": "testapp"
- }
- },
- {
- "fields": {
- "date": "2015110422",
- "name": "clicks",
- "value": 5,
- "application": "testapp"
- },
- "condition": "metrics==0",
- "put": "id:testapp:metric::clicks-2015110422"
- },
- {
- "put": "id:testapp:metric::clicks-2015110423",
- "fields": {
- "date": "2015110423",
- "name": "clicks",
- "value": 1,
- "application": "testapp"
- }
- }
-]
diff --git a/vespa-hadoop/src/test/resources/tabular_data.csv b/vespa-hadoop/src/test/resources/tabular_data.csv
deleted file mode 100644
index 541597998e9..00000000000
--- a/vespa-hadoop/src/test/resources/tabular_data.csv
+++ /dev/null
@@ -1,11 +0,0 @@
-2015110414 clicks 1 testapp
-2015110415 clicks 2 testapp
-2015110416 clicks 5 testapp
-2015110417 clicks 3 testapp
-2015110418 clicks 6 testapp
-2015110419 clicks 3 testapp
-2015110420 clicks 4 testapp
-2015110421 clicks 2 testapp
-2015110422 clicks 5 testapp
-2015110423 clicks 1 testapp
-
diff --git a/vespa-hadoop/src/test/resources/user_ids.csv b/vespa-hadoop/src/test/resources/user_ids.csv
deleted file mode 100644
index 5875a3b9a7c..00000000000
--- a/vespa-hadoop/src/test/resources/user_ids.csv
+++ /dev/null
@@ -1,4 +0,0 @@
-5
-104
-313
-
diff --git a/vespa-hadoop/src/test/resources/visit_data.json b/vespa-hadoop/src/test/resources/visit_data.json
deleted file mode 100644
index 947b9326cc8..00000000000
--- a/vespa-hadoop/src/test/resources/visit_data.json
+++ /dev/null
@@ -1,10 +0,0 @@
-{"id":"id:testapp:metric::clicks-2015110414","fields":{"date":"2015110414","name":"clicks","value":1,"application":"testapp"}}
-{"id":"id:testapp:metric::clicks-2015110415","fields":{"date":"2015110415","name":"clicks","value":2,"application":"testapp"}}
-{"id":"id:testapp:metric::clicks-2015110416","fields":{"date":"2015110416","name":"clicks","value":4,"application":"testapp"}}
-{"id":"id:testapp:metric::clicks-2015110417","fields":{"date":"2015110417","name":"clicks","value":3,"application":"testapp"}}
-{"id":"id:testapp:metric::clicks-2015110418","fields":{"date":"2015110418","name":"clicks","value":6,"application":"testapp"}}
-{"id":"id:testapp:metric::clicks-2015110419","fields":{"date":"2015110419","name":"clicks","value":3,"application":"testapp"}}
-{"id":"id:testapp:metric::clicks-2015110420","fields":{"date":"2015110420","name":"clicks","value":4,"application":"testapp"}}
-{"id":"id:testapp:metric::clicks-2015110421","fields":{"date":"2015110421","name":"clicks","value":2,"application":"testapp"}}
-{"id":"id:testapp:metric::clicks-2015110422","fields":{"date":"2015110422","name":"clicks","value":7,"application":"testapp"}}
-{"id":"id:testapp:metric::clicks-2015110423","fields":{"date":"2015110423","name":"clicks","value":1,"application":"testapp"}} \ No newline at end of file
diff --git a/vespabase/CMakeLists.txt b/vespabase/CMakeLists.txt
index ce19dbb56b3..e72f02d5eeb 100644
--- a/vespabase/CMakeLists.txt
+++ b/vespabase/CMakeLists.txt
@@ -23,7 +23,6 @@ install(FILES ${CMAKE_CURRENT_BINARY_DIR}/conf/default-env.txt DESTINATION conf/
install(DIRECTORY DESTINATION logs/vespa)
install(DIRECTORY DESTINATION logs/vespa/access)
-install(DIRECTORY DESTINATION tmp/vespa)
install(DIRECTORY DESTINATION var/crash)
install(DIRECTORY DESTINATION var/db/vespa)
install(DIRECTORY DESTINATION var/db/vespa/config_server)
@@ -36,6 +35,7 @@ install(DIRECTORY DESTINATION var/db/vespa/search)
install(DIRECTORY DESTINATION var/db/vespa/tmp)
install(DIRECTORY DESTINATION var/jdisc_container)
install(DIRECTORY DESTINATION var/run)
+install(DIRECTORY DESTINATION var/tmp/vespa)
install(DIRECTORY DESTINATION var/vespa)
install(DIRECTORY DESTINATION var/vespa/application)
install(DIRECTORY DESTINATION var/vespa/bundlecache)
diff --git a/vespabase/src/rhel-prestart.sh b/vespabase/src/rhel-prestart.sh
index d6f53046b47..358e9ceccdb 100755
--- a/vespabase/src/rhel-prestart.sh
+++ b/vespabase/src/rhel-prestart.sh
@@ -124,8 +124,6 @@ fixdir ${VESPA_USER} ${VESPA_GROUP} 755 logs/vespa
fixdir ${VESPA_USER} ${VESPA_GROUP} 755 logs/vespa/access
fixdir ${VESPA_USER} ${VESPA_GROUP} 755 logs/vespa/configserver
fixdir ${VESPA_USER} ${VESPA_GROUP} 755 logs/vespa/search
-fixdir ${VESPA_USER} ${VESPA_GROUP} 755 tmp
-fixdir ${VESPA_USER} ${VESPA_GROUP} 755 tmp/vespa
fixdir ${VESPA_USER} ${VESPA_GROUP} 755 var
fixdir ${VESPA_USER} ${VESPA_GROUP} 755 var/crash
fixdir ${VESPA_USER} ${VESPA_GROUP} 755 var/db
@@ -141,6 +139,8 @@ fixdir ${VESPA_USER} ${VESPA_GROUP} 755 var/db/vespa/search
fixdir ${VESPA_USER} ${VESPA_GROUP} 755 var/db/vespa/tmp
fixdir ${VESPA_USER} ${VESPA_GROUP} 755 var/jdisc_container
fixdir ${VESPA_USER} ${VESPA_GROUP} 755 var/run
+fixdir ${VESPA_USER} ${VESPA_GROUP} 755 var/tmp
+fixdir ${VESPA_USER} ${VESPA_GROUP} 755 var/tmp/vespa
fixdir ${VESPA_USER} ${VESPA_GROUP} 755 var/vespa
fixdir ${VESPA_USER} ${VESPA_GROUP} 755 var/vespa/application
fixdir ${VESPA_USER} ${VESPA_GROUP} 755 var/vespa/bundlecache
diff --git a/vespajlib/abi-spec.json b/vespajlib/abi-spec.json
index a22e24aafc2..8a2e68a8d8c 100644
--- a/vespajlib/abi-spec.json
+++ b/vespajlib/abi-spec.json
@@ -3833,6 +3833,7 @@
"public"
],
"methods" : [
+ "public void <init>(java.util.function.Supplier)",
"public void <init>(java.util.function.Supplier, com.yahoo.yolean.concurrent.Memoized$Closer)",
"public static com.yahoo.yolean.concurrent.Memoized of(java.util.function.Supplier)",
"public static com.yahoo.yolean.concurrent.Memoized combine(com.yahoo.yolean.concurrent.Memoized, java.util.function.Function, com.yahoo.yolean.concurrent.Memoized$Closer)",
diff --git a/vespajlib/src/main/java/com/yahoo/compress/ArchiveStreamReader.java b/vespajlib/src/main/java/com/yahoo/compress/ArchiveStreamReader.java
index e65a645f5be..f8faf655415 100644
--- a/vespajlib/src/main/java/com/yahoo/compress/ArchiveStreamReader.java
+++ b/vespajlib/src/main/java/com/yahoo/compress/ArchiveStreamReader.java
@@ -136,8 +136,8 @@ public class ArchiveStreamReader implements AutoCloseable {
// Commons Compress only has limited support for symlinks as they are only detected when the ZIP file is read
// through org.apache.commons.compress.archivers.zip.ZipFile. This is not the case in this class, because it must
// support reading ZIP files from generic input streams. The check below thus always returns false.
- if (entry instanceof ZipArchiveEntry) return ((ZipArchiveEntry) entry).isUnixSymlink();
- if (entry instanceof TarArchiveEntry) return ((TarArchiveEntry) entry).isSymbolicLink();
+ if (entry instanceof ZipArchiveEntry zipEntry) return zipEntry.isUnixSymlink();
+ if (entry instanceof TarArchiveEntry tarEntry) return tarEntry.isSymbolicLink();
throw new IllegalArgumentException("Unsupported archive entry " + entry.getClass().getSimpleName() + ", cannot check for symbolic link");
}
diff --git a/vespajlib/src/main/java/com/yahoo/io/LazyInputStream.java b/vespajlib/src/main/java/com/yahoo/io/LazyInputStream.java
new file mode 100644
index 00000000000..3ff7ada6b59
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/io/LazyInputStream.java
@@ -0,0 +1,53 @@
+package com.yahoo.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.function.Supplier;
+
+/**
+ * Input stream wrapping an input stream supplier, which doesn't have content yet at declaration time.
+ *
+ * @author jonmv
+ */
+public class LazyInputStream extends InputStream {
+
+ private Supplier<InputStream> source;
+ private InputStream delegate;
+
+ public LazyInputStream(Supplier<InputStream> source) {
+ this.source = source;
+ }
+
+ private InputStream in() {
+ if (delegate == null) {
+ delegate = source.get();
+ source = null;
+ }
+ return delegate;
+ }
+
+ @Override
+ public int read() throws IOException { return in().read(); }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException { return in().read(b, off, len); }
+
+ @Override
+ public long skip(long n) throws IOException { return in().skip(n); }
+
+ @Override
+ public int available() throws IOException { return in().available(); }
+
+ @Override
+ public void close() throws IOException { in().close(); }
+
+ @Override
+ public synchronized void mark(int readlimit) { in().mark(readlimit); }
+
+ @Override
+ public synchronized void reset() throws IOException { in().reset(); }
+
+ @Override
+ public boolean markSupported() { return in().markSupported(); }
+
+}
diff --git a/vespajlib/src/main/java/com/yahoo/yolean/concurrent/Memoized.java b/vespajlib/src/main/java/com/yahoo/yolean/concurrent/Memoized.java
index 8e2b7b7a7eb..ba5ef7bab2d 100644
--- a/vespajlib/src/main/java/com/yahoo/yolean/concurrent/Memoized.java
+++ b/vespajlib/src/main/java/com/yahoo/yolean/concurrent/Memoized.java
@@ -34,15 +34,23 @@ public class Memoized<T, E extends Exception> implements Supplier<T>, AutoClosea
private volatile T wrapped;
private Supplier<T> factory;
+ /** Returns a new Memoized which has no close method. */
+ public Memoized(Supplier<T> factory) {
+ this(factory, __ -> { });
+ }
+
+ /** Returns a new Memoized with the given factory and closer. */
public Memoized(Supplier<T> factory, Closer<T, E> closer) {
this.factory = requireNonNull(factory);
this.closer = requireNonNull(closer);
}
+ /** Returns a generic AutoCloseable Memoized with the given AutoCloseable-supplier. */
public static <T extends AutoCloseable> Memoized<T, ?> of(Supplier<T> factory) {
return new Memoized<>(factory, AutoCloseable::close);
}
+ /** Composes the given memoized with a function taking its output as an argument to produce a new Memoized, with the given closer. */
public static <T, U, E extends Exception> Memoized<U, E> combine(Memoized<T, ? extends E> inner, Function<T, U> outer, Closer<U, ? extends E> closer) {
return new Memoized<>(() -> outer.apply(inner.get()), compose(closer, inner::close));
}
diff --git a/vespalib/src/tests/coro/lazy/lazy_test.cpp b/vespalib/src/tests/coro/lazy/lazy_test.cpp
index ead5e8e6427..ec27bf195ec 100644
--- a/vespalib/src/tests/coro/lazy/lazy_test.cpp
+++ b/vespalib/src/tests/coro/lazy/lazy_test.cpp
@@ -1,10 +1,11 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/coro/lazy.h>
-#include <vespa/vespalib/coro/sync_wait.h>
+#include <vespa/vespalib/coro/completion.h>
#include <vespa/vespalib/coro/schedule.h>
#include <vespa/vespalib/util/size_literals.h>
#include <vespa/vespalib/util/require.h>
+#include <vespa/vespalib/util/gate.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <mutex>
@@ -12,7 +13,9 @@
#include <thread>
using vespalib::Executor;
+using vespalib::Gate;
using vespalib::coro::Lazy;
+using vespalib::coro::Received;
using vespalib::coro::ScheduleFailedException;
using vespalib::coro::schedule;
using vespalib::coro::sync_wait;
@@ -56,7 +59,7 @@ Lazy<std::pair<bool,T>> try_schedule_on(Executor &executor, Lazy<T> value) {
std::cerr << "switching from thread " << std::this_thread::get_id() << std::endl;
bool accepted = co_await try_schedule(executor);
std::cerr << "........... to thread " << std::this_thread::get_id() << std::endl;
- co_return std::make_pair(accepted, co_await value);
+ co_return std::make_pair(accepted, co_await std::move(value));
}
template <typename T>
@@ -64,18 +67,18 @@ Lazy<T> schedule_on(Executor &executor, Lazy<T> value) {
std::cerr << "switching from thread " << std::this_thread::get_id() << std::endl;
co_await schedule(executor);
std::cerr << "........... to thread " << std::this_thread::get_id() << std::endl;
- co_return co_await value;
+ co_return co_await std::move(value);
}
TEST(LazyTest, simple_lazy_value) {
auto lazy = make_lazy(42);
- auto result = sync_wait(lazy);
+ auto result = sync_wait(std::move(lazy));
EXPECT_EQ(result, 42);
}
TEST(LazyTest, async_sum_of_async_values) {
auto lazy = async_add_values(10, 20);
- auto result = sync_wait(lazy);
+ auto result = sync_wait(std::move(lazy));
EXPECT_EQ(result, 30);
}
@@ -83,13 +86,13 @@ TEST(LazyTest, async_sum_of_external_async_values) {
auto a = make_lazy(100);
auto b = make_lazy(200);
auto lazy = async_sum(std::move(a), std::move(b));
- auto result = sync_wait(lazy);
+ auto result = sync_wait(std::move(lazy));
EXPECT_EQ(result, 300);
}
TEST(LazyTest, extract_rvalue_from_lazy_in_coroutine) {
auto lazy = extract_rvalue();
- auto result = sync_wait(lazy);
+ auto result = sync_wait(std::move(lazy));
EXPECT_EQ(result, 123);
}
@@ -110,7 +113,7 @@ TEST(LazyTest, calculate_result_in_another_thread) {
TEST(LazyTest, exceptions_are_propagated) {
vespalib::ThreadStackExecutor executor(1, 128_Ki);
auto lazy = try_schedule_on(executor, forward_value(will_throw()));
- EXPECT_THROW(sync_wait(lazy), vespalib::RequireFailedException);
+ EXPECT_THROW(sync_wait(std::move(lazy)), vespalib::RequireFailedException);
}
TEST(LazyTest, not_able_to_switch_thread_if_executor_is_shut_down) {
@@ -120,7 +123,49 @@ TEST(LazyTest, not_able_to_switch_thread_if_executor_is_shut_down) {
EXPECT_EQ(result.first, false);
EXPECT_EQ(result.second, 7);
auto lazy = schedule_on(executor, make_lazy(8));
- EXPECT_THROW(sync_wait(lazy), ScheduleFailedException);
+ EXPECT_THROW(sync_wait(std::move(lazy)), ScheduleFailedException);
+}
+
+TEST(LazyTest, async_wait_with_lambda) {
+ Gate gate;
+ Received<int> result;
+ vespalib::ThreadStackExecutor executor(1, 128_Ki);
+ auto lazy = schedule_on(executor, make_lazy(7));
+ async_wait(std::move(lazy), [&](auto res)
+ {
+ result = res;
+ gate.countDown();
+ });
+ gate.await();
+ EXPECT_EQ(result.get_value(), 7);
+}
+
+TEST(LazyTest, async_wait_with_error) {
+ Gate gate;
+ Received<int> result;
+ vespalib::ThreadStackExecutor executor(1, 128_Ki);
+ auto lazy = schedule_on(executor, will_throw());
+ async_wait(std::move(lazy), [&](auto res)
+ {
+ result = res;
+ gate.countDown();
+ });
+ gate.await();
+ EXPECT_THROW(result.get_value(), vespalib::RequireFailedException);
+}
+
+TEST(LazyTest, async_wait_with_move_only_result) {
+ Gate gate;
+ Received<std::unique_ptr<int>> result;
+ vespalib::ThreadStackExecutor executor(1, 128_Ki);
+ auto lazy = schedule_on(executor, move_only_int());
+ async_wait(std::move(lazy), [&](auto res)
+ {
+ result = std::move(res);
+ gate.countDown();
+ });
+ gate.await();
+ EXPECT_EQ(*(result.get_value()), 123);
}
GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/vespalib/src/vespa/vespalib/coro/completion.h b/vespalib/src/vespa/vespalib/coro/completion.h
new file mode 100644
index 00000000000..f323d8c68bf
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/coro/completion.h
@@ -0,0 +1,104 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "lazy.h"
+#include "detached.h"
+#include "received.h"
+
+#include <coroutine>
+#include <exception>
+#include <future>
+#include <type_traits>
+
+namespace vespalib::coro {
+
+// Resume/start the coroutine responsible for calculating the result
+// and signal the receiver when it completes or fails. Note that the
+// detached coroutine will own both the coroutine calculating the
+// result and the receiver that is later notified of the result. The
+// detached coroutine will automatically self-destroy when it returns,
+// thereby also destroying the value and receiver. This is the
+// fundamental building block used to adapt the asynchronous result of
+// a coroutine with external code. This also closely models abstract
+// execution where the coroutine represented by Lazy<T> is the
+// sender. Execution parameters can be encapsulated inside Lazy<T>
+// using composition (for example which executor should run the
+// coroutine).
+
+template <typename T, typename R>
+Detached connect_resume(Lazy<T> value, R receiver) {
+ try {
+ receiver.set_value(co_await std::move(value));
+ } catch (...) {
+ receiver.set_error(std::current_exception());
+ }
+}
+
+// replace Lazy<T> with std::future<T> to be able to synchronously
+// wait for its completion. Implemented in terms of connect_resume.
+
+template <typename T>
+std::future<T> make_future(Lazy<T> value) {
+ struct receiver {
+ std::promise<T> promise;
+ receiver() : promise() {}
+ void set_value(T value) {
+ promise.set_value(std::move(value));
+ }
+ void set_error(std::exception_ptr error) {
+ promise.set_exception(error);
+ }
+ };
+ receiver my_receiver;
+ auto future = my_receiver.promise.get_future();
+ connect_resume(std::move(value), std::move(my_receiver));
+ return future;
+}
+
+// Create a receiver from a function object (typically a lambda
+// closure) that takes a received value (stored receiver result) as
+// its only parameter.
+
+template <typename T, typename F>
+auto make_receiver(F &&f) {
+ struct receiver {
+ Received<T> result;
+ std::decay_t<F> fun;
+ receiver(F &&f)
+ : result(), fun(std::forward<F>(f)) {}
+ void set_value(T value) {
+ result.set_value(std::move(value));
+ fun(std::move(result));
+ }
+ void set_error(std::exception_ptr why) {
+ result.set_error(why);
+ fun(std::move(result));
+ }
+ };
+ return receiver(std::forward<F>(f));
+}
+
+/**
+ * Wait for a lazy value to be calculated synchronously. Make sure the
+ * thread waiting is not needed in the calculation of the value, or
+ * you will end up with a deadlock.
+ **/
+template <typename T>
+T sync_wait(Lazy<T> value) {
+ return make_future(std::move(value)).get();
+}
+
+/**
+ * Wait for a lazy value to be calculated asynchronously; the provided
+ * callback will be called with a Received<T> when the Lazy<T> is
+ * done. Both the callback itself and the Lazy<T> will be destructed
+ * afterwards; cleaning up the coroutine tree representing the
+ * calculation.
+ **/
+template <typename T, typename F>
+void async_wait(Lazy<T> value, F &&f) {
+ connect_resume(std::move(value), make_receiver<T>(std::forward<F>(f)));
+}
+
+}
diff --git a/vespalib/src/vespa/vespalib/coro/lazy.h b/vespalib/src/vespa/vespalib/coro/lazy.h
index 5a10c05bc24..144b5c945f0 100644
--- a/vespalib/src/vespa/vespalib/coro/lazy.h
+++ b/vespalib/src/vespa/vespalib/coro/lazy.h
@@ -64,6 +64,7 @@ public:
}
return std::move(*value);
}
+ ~promise_type();
};
using Handle = std::coroutine_handle<promise_type>;
@@ -108,4 +109,7 @@ public:
}
};
+template<typename T>
+Lazy<T>::promise_type::~promise_type() = default;
+
}
diff --git a/vespalib/src/vespa/vespalib/coro/received.h b/vespalib/src/vespa/vespalib/coro/received.h
new file mode 100644
index 00000000000..4f2efddcfa1
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/coro/received.h
@@ -0,0 +1,46 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <variant>
+#include <exception>
+#include <stdexcept>
+
+namespace vespalib::coro {
+
+struct UnavailableResultException : std::runtime_error {
+ using std::runtime_error::runtime_error;
+};
+
+/**
+ * Simple value wrapper that stores the result observed by a receiver
+ * (value/error/done). A receiver is the continuation of an
+ * asynchronous operation in the world of executors.
+ **/
+template <std::movable T>
+class Received {
+private:
+ std::variant<std::exception_ptr,T> _value;
+public:
+ Received() : _value() {}
+ void set_value(T value) { _value.template emplace<1>(std::move(value)); }
+ void set_error(std::exception_ptr exception) { _value.template emplace<0>(exception); }
+ void set_done() { _value.template emplace<0>(nullptr); }
+ bool has_value() const { return (_value.index() == 1); }
+ bool has_error() const { return (_value.index() == 0) && bool(std::get<0>(_value)); }
+ bool was_canceled() const { return !has_value() && !has_error(); }
+ std::exception_ptr get_error() const { return has_error() ? std::get<0>(_value) : std::exception_ptr(); }
+ T get_value() {
+ if (_value.index() == 1) {
+ return std::move(std::get<1>(_value));
+ } else {
+ if (auto ex = std::get<0>(_value)) {
+ std::rethrow_exception(ex);
+ } else {
+ throw UnavailableResultException("tried to access the result of a canceled operation");
+ }
+ }
+ }
+};
+
+}
diff --git a/vespalib/src/vespa/vespalib/coro/schedule.h b/vespalib/src/vespa/vespalib/coro/schedule.h
index 6dfa5b9536c..71a384f356f 100644
--- a/vespalib/src/vespa/vespalib/coro/schedule.h
+++ b/vespalib/src/vespa/vespalib/coro/schedule.h
@@ -5,6 +5,7 @@
#include <vespa/vespalib/util/executor.h>
#include <coroutine>
#include <exception>
+#include <stdexcept>
namespace vespalib::coro {
diff --git a/vespalib/src/vespa/vespalib/coro/sync_wait.h b/vespalib/src/vespa/vespalib/coro/sync_wait.h
deleted file mode 100644
index bdea2dfc7f0..00000000000
--- a/vespalib/src/vespa/vespalib/coro/sync_wait.h
+++ /dev/null
@@ -1,59 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include "detached.h"
-#include "lazy.h"
-#include <vespa/vespalib/util/gate.h>
-
-#include <coroutine>
-#include <exception>
-
-namespace vespalib::coro {
-
-template <typename T, typename S>
-Detached signal_when_done(Lazy<T> &value, S &sink) {
- try {
- sink(co_await value);
- } catch (...) {
- sink(std::current_exception());
- }
-}
-
-/**
- * Wait for a lazy value to be calculated (note that waiting for a
- * value will also start calculating it). Make sure the thread waiting
- * is not needed in the calculation of the value, or you will end up
- * with a deadlock.
- **/
-template <typename T>
-T &sync_wait(Lazy<T> &value) {
- struct MySink {
- Gate gate;
- T *result;
- std::exception_ptr exception;
- void operator()(T &result_in) {
- result = &result_in;
- gate.countDown();
- }
- void operator()(std::exception_ptr exception_in) {
- exception = exception_in;
- gate.countDown();
- }
- MySink() : gate(), result(nullptr), exception() {}
- };
- MySink sink;
- signal_when_done(value, sink);
- sink.gate.await();
- if (sink.exception) {
- std::rethrow_exception(sink.exception);
- }
- return *sink.result;
-}
-
-template <typename T>
-T &&sync_wait(Lazy<T> &&value) {
- return std::move(sync_wait(value));
-}
-
-}