summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--athenz-identity-provider-service/src/main/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/ConfigserverSslContextFactoryProvider.java5
-rw-r--r--athenz-identity-provider-service/src/test/java/com/yahoo/vespa/hosted/ca/restapi/CertificateAuthorityApiTest.java4
-rw-r--r--clustercontroller-reindexer/pom.xml6
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java235
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java2
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java42
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java142
-rw-r--r--clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java182
-rw-r--r--clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java4
-rw-r--r--clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java138
-rw-r--r--clustercontroller-reindexer/src/test/resources/schemas/music.sd3
-rw-r--r--config-application-package/src/main/java/com/yahoo/config/model/application/provider/FilesApplicationPackage.java27
-rw-r--r--configserver/pom.xml5
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java20
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java203
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/serviceview/ConfigServerLocation.java1
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/serviceview/StateResource.java1
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationReindexingTest.java11
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java32
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java19
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainerTest.java5
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/athenz/impl/AthenzClientFactoryImpl.java25
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/CloudEventReporter.java2
-rw-r--r--document/src/main/java/com/yahoo/document/fieldset/FieldSetRepo.java23
-rw-r--r--document/src/vespa/document/base/globalid.cpp33
-rw-r--r--document/src/vespa/document/base/globalid.h19
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java4
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java47
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java2
-rw-r--r--eval/src/vespa/eval/eval/fast_sparse_map.cpp2
-rw-r--r--eval/src/vespa/eval/eval/fast_sparse_map.h2
-rw-r--r--eval/src/vespa/eval/eval/fast_value.hpp108
-rw-r--r--jaxrs_client_utils/src/main/java/ai/vespa/util/http/VespaClientBuilderFactory.java2
-rw-r--r--jaxrs_client_utils/src/main/java/com/yahoo/vespa/jaxrs/client/VespaJerseyJaxRsClientFactory.java3
-rw-r--r--node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/identity/AthenzCredentialsMaintainer.java4
-rw-r--r--node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java7
-rw-r--r--orchestrator/src/main/java/com/yahoo/vespa/orchestrator/controller/RetryingClusterControllerClientFactory.java1
-rw-r--r--orchestrator/src/test/java/com/yahoo/vespa/orchestrator/controller/RetryingClusterControllerClientFactoryTest.java1
-rwxr-xr-xsearchcore/src/apps/vespa-feed-bm/runtest.sh32
-rw-r--r--searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp11
-rw-r--r--searchcore/src/tests/proton/bucketdb/bucketdb/bucketdb_test.cpp70
-rw-r--r--searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp2
-rw-r--r--searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp22
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/ifieldupdatecallback.h7
-rw-r--r--searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.cpp80
-rw-r--r--searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.h6
-rw-r--r--searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.cpp18
-rw-r--r--searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.h9
-rw-r--r--searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.cpp81
-rw-r--r--searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.h31
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp79
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h25
-rw-r--r--vespa-athenz/src/main/java/com/yahoo/vespa/athenz/aws/AwsCredentials.java4
-rw-r--r--vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/common/ClientBase.java19
-rw-r--r--vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/zms/DefaultZmsClient.java13
-rw-r--r--vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/zts/DefaultZtsClient.java58
-rw-r--r--vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/client/AthenzCredentialsService.java4
-rw-r--r--vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/client/AthenzIdentityProviderImpl.java2
61 files changed, 1331 insertions, 619 deletions
diff --git a/athenz-identity-provider-service/src/main/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/ConfigserverSslContextFactoryProvider.java b/athenz-identity-provider-service/src/main/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/ConfigserverSslContextFactoryProvider.java
index 880646e37e5..ae4a5933ac2 100644
--- a/athenz-identity-provider-service/src/main/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/ConfigserverSslContextFactoryProvider.java
+++ b/athenz-identity-provider-service/src/main/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/ConfigserverSslContextFactoryProvider.java
@@ -3,6 +3,7 @@ package com.yahoo.vespa.hosted.athenz.instanceproviderservice;
import com.google.inject.Inject;
import com.yahoo.jdisc.http.ssl.impl.TlsContextBasedProvider;
+import java.util.logging.Level;
import com.yahoo.security.KeyStoreBuilder;
import com.yahoo.security.KeyStoreType;
import com.yahoo.security.KeyUtils;
@@ -36,7 +37,6 @@ import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
import java.util.logging.Logger;
/**
@@ -66,8 +66,7 @@ public class ConfigserverSslContextFactoryProvider extends TlsContextBasedProvid
KeyProvider keyProvider,
AthenzProviderServiceConfig config) {
this.athenzProviderServiceConfig = config;
- this.ztsClient = new DefaultZtsClient.Builder(URI.create(athenzProviderServiceConfig.ztsUrl()))
- .withIdentityProvider(bootstrapIdentity).build();
+ this.ztsClient = new DefaultZtsClient(URI.create(athenzProviderServiceConfig.ztsUrl()), bootstrapIdentity);
this.keyProvider = keyProvider;
this.configserverIdentity = new AthenzService(athenzProviderServiceConfig.domain(), athenzProviderServiceConfig.serviceName());
diff --git a/athenz-identity-provider-service/src/test/java/com/yahoo/vespa/hosted/ca/restapi/CertificateAuthorityApiTest.java b/athenz-identity-provider-service/src/test/java/com/yahoo/vespa/hosted/ca/restapi/CertificateAuthorityApiTest.java
index bf2a6719842..343a9feeed6 100644
--- a/athenz-identity-provider-service/src/test/java/com/yahoo/vespa/hosted/ca/restapi/CertificateAuthorityApiTest.java
+++ b/athenz-identity-provider-service/src/test/java/com/yahoo/vespa/hosted/ca/restapi/CertificateAuthorityApiTest.java
@@ -8,11 +8,11 @@ import com.yahoo.security.KeyUtils;
import com.yahoo.security.Pkcs10Csr;
import com.yahoo.security.Pkcs10CsrUtils;
import com.yahoo.security.X509CertificateUtils;
-import com.yahoo.slime.SlimeUtils;
import com.yahoo.text.StringUtilities;
import com.yahoo.vespa.athenz.api.AthenzPrincipal;
import com.yahoo.vespa.athenz.api.AthenzService;
import com.yahoo.vespa.athenz.client.zts.DefaultZtsClient;
+import com.yahoo.slime.SlimeUtils;
import com.yahoo.vespa.hosted.ca.CertificateTester;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpUriRequest;
@@ -224,7 +224,7 @@ public class CertificateAuthorityApiTest extends ContainerTester {
private final X509Certificate certificate;
public TestZtsClient(Principal principal, X509Certificate certificate, URI ztsUrl, SSLContext sslContext) {
- super(ztsUrl, () -> sslContext, null, ErrorHandler.empty());
+ super(ztsUrl, sslContext);
this.principal = principal;
this.certificate = certificate;
}
diff --git a/clustercontroller-reindexer/pom.xml b/clustercontroller-reindexer/pom.xml
index 3791ea7d3f4..172bff6fdb4 100644
--- a/clustercontroller-reindexer/pom.xml
+++ b/clustercontroller-reindexer/pom.xml
@@ -48,6 +48,12 @@
</dependency>
<dependency>
<groupId>com.yahoo.vespa</groupId>
+ <artifactId>testutil</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
<artifactId>config-model</artifactId>
<version>${project.version}</version>
<scope>test</scope>
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
new file mode 100644
index 00000000000..9e7e2880036
--- /dev/null
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
@@ -0,0 +1,235 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.reindexing;
+
+import ai.vespa.reindexing.Reindexing.Status;
+import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException;
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.select.parser.ParseException;
+import com.yahoo.documentapi.DocumentAccess;
+import com.yahoo.documentapi.ProgressToken;
+import com.yahoo.documentapi.VisitorControlHandler;
+import com.yahoo.documentapi.VisitorParameters;
+import com.yahoo.documentapi.VisitorSession;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.vespa.curator.Lock;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Logger;
+
+import static java.util.Objects.requireNonNull;
+import static java.util.logging.Level.FINE;
+import static java.util.logging.Level.INFO;
+import static java.util.logging.Level.WARNING;
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Progresses reindexing efforts by creating visitor sessions against its own content cluster,
+ * which send documents straight to storage — via indexing if the document type has "index" mode.
+ * The {@link #reindex} method blocks until shutdown is called, or until no more reindexing is left to do.
+ *
+ * @author jonmv
+ */
+public class Reindexer {
+
+ private static final Logger log = Logger.getLogger(Reindexer.class.getName());
+
+ private final Cluster cluster;
+ private final Map<DocumentType, Instant> ready;
+ private final ReindexingCurator database;
+ private final DocumentAccess access;
+ private final Clock clock;
+ private final Phaser phaser = new Phaser(2); // Reindexer and visitor.
+
+ private Reindexing reindexing;
+ private Status status;
+
+ public Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database,
+ DocumentAccess access, Clock clock) {
+ for (DocumentType type : ready.keySet())
+ cluster.bucketSpaceOf(type); // Verifies this is known.
+
+ this.cluster = cluster;
+ this.ready = new TreeMap<>(ready); // Iterate through document types in consistent order.
+ this.database = database;
+ this.access = access;
+ this.clock = clock;
+ }
+
+ /** Lets the reindexer abort any ongoing visit session, wait for it to complete normally, then exit. */
+ public void shutdown() {
+ phaser.forceTermination(); // All parties waiting on this phaser are immediately allowed to proceed.
+ }
+
+ /** Starts and tracks reprocessing of ready document types until done, or interrupted. */
+ public void reindex() throws ReindexingLockException {
+ if (phaser.isTerminated())
+ throw new IllegalStateException("Already shut down");
+
+ try (Lock lock = database.lockReindexing()) {
+ for (DocumentType type : ready.keySet()) { // We consider only document types for which we have config.
+ if (ready.get(type).isAfter(clock.instant()))
+ log.log(INFO, "Received config for reindexing which is ready in the future — will process later " +
+ "(" + ready.get(type) + " is after " + clock.instant() + ")");
+ else
+ progress(type);
+
+ if (phaser.isTerminated())
+ break;
+ }
+ }
+ }
+
+ @SuppressWarnings("fallthrough") // (ノಠ ∩ಠ)ノ彡( \o°o)\
+ private void progress(DocumentType type) {
+ // If this is a new document type (or a new cluster), no reindexing is required.
+ reindexing = database.readReindexing();
+ status = reindexing.status().getOrDefault(type,
+ Status.ready(clock.instant())
+ .running()
+ .successful(clock.instant()));
+ if (ready.get(type).isAfter(status.startedAt()))
+ status = Status.ready(clock.instant()); // Need to restart, as a newer reindexing is required.
+
+ database.writeReindexing(reindexing = reindexing.with(type, status));
+
+ switch (status.state()) {
+ default:
+ log.log(WARNING, "Unknown reindexing state '" + status.state() + "'");
+ case FAILED:
+ log.log(FINE, () -> "Not continuing reindexing of " + type + " due to previous failure");
+ case SUCCESSFUL: // Intentional fallthrough — all three are done states.
+ return;
+ case RUNNING:
+ log.log(WARNING, "Unexpected state 'RUNNING' of reindexing of " + type);
+ case READY: // Intentional fallthrough — must just assume we failed updating state when exiting previously.
+ log.log(FINE, () -> "Running reindexing of " + type);
+ }
+
+ // Visit buckets until they're all done, or until we are interrupted.
+ status = status.running();
+ AtomicReference<Instant> progressLastStored = new AtomicReference<>(clock.instant());
+ VisitorControlHandler control = new VisitorControlHandler() {
+ @Override
+ public void onProgress(ProgressToken token) {
+ super.onProgress(token);
+ status = status.progressed(token);
+ if (progressLastStored.get().isBefore(clock.instant().minusSeconds(10))) {
+ progressLastStored.set(clock.instant());
+ database.writeReindexing(reindexing = reindexing.with(type, status));
+ }
+ }
+ @Override
+ public void onDone(CompletionCode code, String message) {
+ super.onDone(code, message);
+ phaser.arriveAndAwaitAdvance(); // Synchronize with the reindex thread.
+ }
+ };
+ visit(type, status.progress().orElse(null), control);
+
+ // If we were interrupted, the result may not yet be set in the control handler.
+ switch (control.getResult().getCode()) {
+ default:
+ log.log(WARNING, "Unexpected visitor result '" + control.getResult().getCode() + "'");
+ case FAILURE: // Intentional fallthrough — this is an error.
+ log.log(WARNING, "Visiting failed: " + control.getResult().getMessage());
+ status = status.failed(clock.instant(), control.getResult().getMessage());
+ break;
+ case ABORTED:
+ log.log(FINE, () -> "Halting reindexing of " + type + " due to shutdown — will continue later");
+ status = status.halted();
+ break;
+ case SUCCESS:
+ log.log(INFO, "Completed reindexing of " + type + " after " + Duration.between(status.startedAt(), clock.instant()));
+ status = status.successful(clock.instant());
+ }
+ database.writeReindexing(reindexing.with(type, status));
+ }
+
+ private void visit(DocumentType type, ProgressToken progress, VisitorControlHandler control) {
+ VisitorParameters parameters = createParameters(type, progress);
+ parameters.setControlHandler(control);
+ VisitorSession session;
+ try {
+ session = access.createVisitorSession(parameters);
+ }
+ catch (ParseException e) {
+ throw new IllegalStateException(e);
+ }
+
+ // Wait until done; or until termination is forced, in which case we abort the visit and wait for it to complete.
+ phaser.arriveAndAwaitAdvance(); // Synchronize with the visitor completion thread.
+ session.destroy();
+ }
+
+ VisitorParameters createParameters(DocumentType type, ProgressToken progress) {
+ VisitorParameters parameters = new VisitorParameters(type.getName());
+ parameters.setRemoteDataHandler(cluster.name());
+ parameters.setResumeToken(progress);
+ parameters.setFieldSet(type.getName() + ":[document]");
+ parameters.setPriority(DocumentProtocol.Priority.LOW_1);
+ parameters.setRoute(cluster.route());
+ parameters.setBucketSpace(cluster.bucketSpaceOf(type));
+ // parameters.setVisitorLibrary("ReindexVisitor"); // TODO jonmv: Use when ready, or perhaps an argument to the DumpVisitor is enough?
+ return parameters;
+ }
+
+
+ static class Cluster {
+
+ private final String name;
+ private final String configId;
+ private final Map<DocumentType, String> documentBuckets;
+
+ Cluster(String name, String configId, Map<DocumentType, String> documentBuckets) {
+ this.name = requireNonNull(name);
+ this.configId = requireNonNull(configId);
+ this.documentBuckets = Map.copyOf(documentBuckets);
+ }
+
+ String name() {
+ return name;
+ }
+
+ String route() {
+ return "[Storage:cluster=" + name + ";clusterconfigid=" + configId + "]";
+ }
+
+ String bucketSpaceOf(DocumentType documentType) {
+ return requireNonNull(documentBuckets.get(documentType), "Unknown bucket space for " + documentType);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Cluster cluster = (Cluster) o;
+ return name.equals(cluster.name) &&
+ configId.equals(cluster.configId) &&
+ documentBuckets.equals(cluster.documentBuckets);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, configId, documentBuckets);
+ }
+
+ @Override
+ public String toString() {
+ return "Cluster{" +
+ "name='" + name + '\'' +
+ ", configId='" + configId + '\'' +
+ ", documentBuckets=" + documentBuckets +
+ '}';
+ }
+
+ }
+
+}
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java
index e358beb2deb..792889e4aa8 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java
@@ -86,7 +86,7 @@ public class Reindexing {
/** Returns a new, empty status, with no progress or result, in state READY. */
public static Status ready(Instant now) {
- return new Status(requireNonNull(now), null, new ProgressToken(), State.READY, null);
+ return new Status(requireNonNull(now), null, null, State.READY, null);
}
/** Returns a copy of this, in state RUNNING. */
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
index 1158b2c9d3f..2044e6869f6 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
@@ -2,6 +2,7 @@
package ai.vespa.reindexing;
import ai.vespa.reindexing.Reindexing.Status;
+import com.google.common.util.concurrent.UncheckedTimeoutException;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.documentapi.ProgressToken;
import com.yahoo.path.Path;
@@ -10,8 +11,10 @@ import com.yahoo.slime.Inspector;
import com.yahoo.slime.Slime;
import com.yahoo.slime.SlimeUtils;
import com.yahoo.vespa.curator.Curator;
+import com.yahoo.vespa.curator.Lock;
import com.yahoo.yolean.Exceptions;
+import java.time.Duration;
import java.time.Instant;
import java.util.function.Function;
@@ -33,25 +36,45 @@ public class ReindexingCurator {
private static final String STATE = "state";
private static final String MESSAGE = "message";
- private static final Path statusPath = Path.fromString("/reindexing/v1/status");
-
private final Curator curator;
+ private final String clusterName;
private final ReindexingSerializer serializer;
+ private final Duration lockTimeout;
+
+ public ReindexingCurator(Curator curator, String clusterName, DocumentTypeManager manager) {
+ this(curator, clusterName, manager, Duration.ofSeconds(1));
+ }
- public ReindexingCurator(Curator curator, DocumentTypeManager manager) {
+ ReindexingCurator(Curator curator, String clusterName, DocumentTypeManager manager, Duration lockTimeout) {
this.curator = curator;
+ this.clusterName = clusterName;
this.serializer = new ReindexingSerializer(manager);
+ this.lockTimeout = lockTimeout;
}
public Reindexing readReindexing() {
- return curator.getData(statusPath).map(serializer::deserialize)
+ return curator.getData(statusPath()).map(serializer::deserialize)
.orElse(Reindexing.empty());
}
public void writeReindexing(Reindexing reindexing) {
- curator.set(statusPath, serializer.serialize(reindexing));
+ curator.set(statusPath(), serializer.serialize(reindexing));
+ }
+
+ /** This lock must be held to manipulate reindexing state, or by whoever has a running visitor. */
+ public Lock lockReindexing() throws ReindexingLockException {
+ try {
+ return curator.lock(lockPath(), lockTimeout);
+ }
+ catch (UncheckedTimeoutException e) { // TODO jonmv: Avoid use of guava classes.
+ throw new ReindexingLockException(e);
+ }
}
+ private Path rootPath() { return Path.fromString("/reindexing/v1/" + clusterName); }
+ private Path statusPath() { return rootPath().append("status"); }
+ private Path lockPath() { return rootPath().append("lock"); }
+
private static class ReindexingSerializer {
@@ -117,4 +140,13 @@ public class ReindexingCurator {
}
+ /** Indicates that taking the reindexing lock failed within the allotted time. */
+ static class ReindexingLockException extends Exception {
+
+ ReindexingLockException(UncheckedTimeoutException cause) {
+ super("Failed to obtain the reindexing lock", cause);
+ }
+
+ }
+
}
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
new file mode 100644
index 00000000000..0cf7f9eed9a
--- /dev/null
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
@@ -0,0 +1,142 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.reindexing;
+
+import ai.vespa.reindexing.Reindexer.Cluster;
+import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException;
+import com.google.inject.Inject;
+import com.yahoo.cloud.config.ClusterListConfig;
+import com.yahoo.cloud.config.ZookeepersConfig;
+import com.yahoo.component.AbstractComponent;
+import com.yahoo.concurrent.DaemonThreadFactory;
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.config.DocumentmanagerConfig;
+import com.yahoo.documentapi.DocumentAccess;
+import com.yahoo.net.HostName;
+import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
+import com.yahoo.vespa.config.content.reindexing.ReindexingConfig;
+import com.yahoo.vespa.curator.Curator;
+import com.yahoo.vespa.zookeeper.VespaZooKeeperServer;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.logging.Logger;
+import java.util.stream.Stream;
+
+import static java.util.logging.Level.FINE;
+import static java.util.logging.Level.WARNING;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toUnmodifiableMap;
+
+/**
+ * Runs in all cluster controller containers, and progresses reindexing efforts.
+ * Work is only done by one container at a time, by requiring a shared ZooKeeper lock to be held while visiting.
+ * Whichever maintainer gets the lock holds it until all reindexing is done, or until shutdown.
+ *
+ * @author jonmv
+ */
+public class ReindexingMaintainer extends AbstractComponent {
+
+ private static final Logger log = Logger.getLogger(Reindexing.class.getName());
+
+ private final Reindexer reindexer;
+ private final ScheduledExecutorService executor;
+
+ // VespaZooKeeperServer dependency to ensure the ZK cluster is running.
+ @Inject
+ public ReindexingMaintainer(VespaZooKeeperServer zooKeeperServer, DocumentAccess access, ZookeepersConfig zookeepersConfig,
+ ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig,
+ ReindexingConfig reindexingConfig, DocumentmanagerConfig documentmanagerConfig) {
+ this(Clock.systemUTC(), access, zookeepersConfig, clusterListConfig, allClustersBucketSpacesConfig, reindexingConfig, documentmanagerConfig);
+ }
+
+ ReindexingMaintainer(Clock clock, DocumentAccess access, ZookeepersConfig zookeepersConfig,
+ ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig,
+ ReindexingConfig reindexingConfig, DocumentmanagerConfig documentmanagerConfig) {
+ DocumentTypeManager manager = new DocumentTypeManager(documentmanagerConfig);
+ this.reindexer = new Reindexer(parseCluster(reindexingConfig.clusterName(), clusterListConfig, allClustersBucketSpacesConfig, manager),
+ parseReady(reindexingConfig, manager),
+ new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()),
+ reindexingConfig.clusterName(),
+ manager),
+ access,
+ clock);
+ this.executor = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("reindexer-"));
+ if (reindexingConfig.enabled())
+ scheduleStaggered((delayMillis, intervalMillis) -> executor.scheduleAtFixedRate(this::maintain, delayMillis, intervalMillis, TimeUnit.MILLISECONDS),
+ Duration.ofMinutes(1), clock.instant(), HostName.getLocalhost(), zookeepersConfig.zookeeperserverlist());
+ }
+
+ private void maintain() {
+ try {
+ reindexer.reindex();
+ }
+ catch (ReindexingLockException e) {
+ log.log(FINE, "Failed to acquire reindexing lock");
+ }
+ catch (Exception e) {
+ log.log(WARNING, "Exception when reindexing", e);
+ }
+ }
+
+ @Override
+ public void deconstruct() {
+ try {
+ reindexer.shutdown();
+ executor.shutdown();
+ if ( ! executor.awaitTermination(45, TimeUnit.SECONDS))
+ log.log(WARNING, "Failed to shut down reindexer within timeout");
+ }
+ catch (InterruptedException e) {
+ log.log(WARNING, "Interrupted while waiting for reindexer to shut down");
+ Thread.currentThread().interrupt();
+ }
+
+ }
+
+ static Map<DocumentType, Instant> parseReady(ReindexingConfig config, DocumentTypeManager manager) {
+ return config.status().entrySet().stream()
+ .collect(toUnmodifiableMap(typeStatus -> manager.getDocumentType(typeStatus.getKey()),
+ typeStatus -> Instant.ofEpochMilli(typeStatus.getValue().readyAtMillis())));
+ }
+
+ /** Schedules a task with the given interval (across all containers in this ZK cluster). */
+ static void scheduleStaggered(BiConsumer<Long, Long> scheduler,
+ Duration interval, Instant now,
+ String hostname, String clusterHostnames) {
+ long delayMillis = 0;
+ long intervalMillis = interval.toMillis();
+ List<String> hostnames = Stream.of(clusterHostnames.split(","))
+ .map(hostPort -> hostPort.split(":")[0])
+ .collect(toList());
+ if (hostnames.contains(hostname)) {
+ long offset = hostnames.indexOf(hostname) * intervalMillis;
+ intervalMillis *= hostnames.size();
+ delayMillis = Math.floorMod(offset - now.toEpochMilli(), intervalMillis);
+ }
+ scheduler.accept(delayMillis, intervalMillis);
+ }
+
+ static Cluster parseCluster(String name, ClusterListConfig clusters, AllClustersBucketSpacesConfig bucketSpaces,
+ DocumentTypeManager manager) {
+ return clusters.storage().stream()
+ .filter(storage -> storage.name().equals(name))
+ .map(storage -> new Cluster(name,
+ storage.configid(),
+ bucketSpaces.cluster(name)
+ .documentType().entrySet().stream()
+ .collect(toMap(entry -> manager.getDocumentType(entry.getKey()),
+ entry -> entry.getValue().bucketSpace()))))
+ .findAny()
+ .orElseThrow(() -> new IllegalStateException("This cluster (" + name + ") not among the list of clusters"));
+ }
+
+}
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
new file mode 100644
index 00000000000..20393cba958
--- /dev/null
+++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
@@ -0,0 +1,182 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.reindexing;
+
+import ai.vespa.reindexing.Reindexer.Cluster;
+import ai.vespa.reindexing.Reindexing.Status;
+import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.config.DocumentmanagerConfig;
+import com.yahoo.documentapi.DocumentAccessParams;
+import com.yahoo.documentapi.ProgressToken;
+import com.yahoo.documentapi.SyncParameters;
+import com.yahoo.documentapi.VisitorParameters;
+import com.yahoo.documentapi.local.LocalDocumentAccess;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.searchdefinition.derived.Deriver;
+import com.yahoo.test.ManualClock;
+import com.yahoo.vespa.curator.mock.MockCurator;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * @author jonmv
+ */
+class ReindexerTest {
+
+ final DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build();
+ final DocumentTypeManager manager = new DocumentTypeManager(musicConfig);
+ final DocumentType music = manager.getDocumentType("music");
+ final Document document1 = new Document(music, "id:ns:music::one");
+ final Document document2 = new Document(music, "id:ns:music::two");
+ final Cluster cluster = new Cluster("cluster", "id", Map.of(music, "default"));
+ final ManualClock clock = new ManualClock(Instant.EPOCH);
+
+ ReindexingCurator database;
+ LocalDocumentAccess access;
+
+ @BeforeEach
+ void setUp() {
+ database = new ReindexingCurator(new MockCurator(), "cluster", manager, Duration.ofMillis(1));
+ access = new LocalDocumentAccess(new DocumentAccessParams().setDocumentmanagerConfig(musicConfig));
+ }
+
+ @Test
+ void throwsWhenUnknownBuckets() {
+ assertThrows(NullPointerException.class,
+ () -> new Reindexer(new Cluster("cluster", "id", Map.of()),
+ Map.of(music, Instant.EPOCH),
+ database,
+ access,
+ clock));
+ }
+
+ @Test
+ void throwsWhenLockHeldElsewhere() throws InterruptedException, ExecutionException {
+ Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, access, clock);
+ Executors.newSingleThreadExecutor().submit(database::lockReindexing).get();
+ assertThrows(ReindexingLockException.class, reindexer::reindex);
+ }
+
+ @Test
+ @Timeout(10)
+ void nothingToDo() throws ReindexingLockException {
+ Reindexer reindexer = new Reindexer(cluster, Map.of(), database, access, clock);
+ access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document1));
+ access.setPhaser(new Phaser(1)); // Would block any visiting until timeout.
+ reindexer.reindex();
+ }
+
+ @Test
+ void parameters() {
+ Reindexer reindexer = new Reindexer(cluster, Map.of(), database, access, clock);
+ ProgressToken token = new ProgressToken();
+ VisitorParameters parameters = reindexer.createParameters(music, token);
+ assertEquals("music:[document]", parameters.getFieldSet());
+ assertSame(token, parameters.getResumeToken());
+ assertEquals("default", parameters.getBucketSpace());
+ assertEquals("[Storage:cluster=cluster;clusterconfigid=id]", parameters.getRoute().toString());
+ assertEquals("cluster", parameters.getRemoteDataHandler());
+ assertEquals("music", parameters.getDocumentSelection());
+ assertEquals(DocumentProtocol.Priority.LOW_1, parameters.getPriority());
+ }
+
+ @Test
+ @Timeout(10)
+ void testReindexing() throws ReindexingLockException, ExecutionException, InterruptedException {
+ // Reindexer is told to update "music" documents no earlier than EPOCH, which is just now.
+ Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, access, clock);
+ Phaser phaser = new Phaser(1);
+ access.setPhaser(phaser); // Will block any visiting until timeout.
+ reindexer.reindex();
+
+ // Since "music" was a new document type, it is stored as just reindexed, and nothing else happens.
+ Reindexing reindexing = Reindexing.empty().with(music, Status.ready(Instant.EPOCH).running().successful(Instant.EPOCH));
+ assertEquals(reindexing, database.readReindexing());
+
+ // New config tells reindexer to reindex "music" documents no earlier than at 10 millis after EPOCH, which isn't yet.
+ clock.advance(Duration.ofMillis(5));
+ reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, access, clock);
+ reindexer.reindex(); // Nothing happens because it's not yet time. This isn't supposed to happen unless high clock skew.
+ assertEquals(reindexing, database.readReindexing());
+
+ // It's time to reindex the "music" documents, but since we sneak a FAILED status in there, nothing is done.
+ clock.advance(Duration.ofMillis(10));
+ Reindexing failed = Reindexing.empty().with(music, Status.ready(clock.instant()).running().failed(clock.instant(), "fail"));
+ database.writeReindexing(failed);
+ reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, access, clock);
+ reindexer.reindex(); // Nothing happens because status is already FAILED for the document type.
+ assertEquals(failed, database.readReindexing());
+
+ // It's time to reindex the "music" documents — none yet, so this is a no-op, which just updates the timestamp.
+ database.writeReindexing(reindexing); // Restore state where reindexing was complete at 5 ms after EPOCH.
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ reindexer.reindex();
+ reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant()));
+ assertEquals(reindexing, database.readReindexing());
+
+ // We add a document and interrupt reindexing before the visit is complete.
+ access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document1));
+ clock.advance(Duration.ofMillis(10));
+ reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock);
+ Future<?> future = executor.submit(uncheckedReindex(reindexer));
+ while (phaser.getRegisteredParties() == 1)
+ Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_<
+ database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value.
+ reindexer.shutdown();
+ phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document.
+ phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete.
+ future.get(); // Write state to database.
+ reindexing = reindexing.with(music, Status.ready(clock.instant()).running().halted());
+ assertEquals(reindexing, database.readReindexing());
+
+ // Manually put a progress token in there, to verify this is used when resuming, and that we store what we retrieve.
+ reindexing = reindexing.with(music, Status.ready(clock.instant()).running().progressed(new ProgressToken()).halted());
+ database.writeReindexing(reindexing);
+ reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock);
+ future = executor.submit(uncheckedReindex(reindexer));
+ while (phaser.getRegisteredParties() == 1)
+ Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_<
+ database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value.
+ reindexer.shutdown(); // Interrupt the visit before it completes.
+ phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document.
+ phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete.
+ future.get(); // Write state to database.
+ assertEquals(reindexing, database.readReindexing());
+
+ // Finally let the visit complete normally.
+ reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock);
+ future = executor.submit(uncheckedReindex(reindexer));
+ while (phaser.getRegisteredParties() == 1)
+ Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_<
+ database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value.
+ phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document.
+ phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete.
+ future.get(); // Write state to database.
+ reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant()));
+ assertEquals(reindexing, database.readReindexing());
+ }
+
+ Callable<Void> uncheckedReindex(Reindexer reindexer) {
+ return () -> { reindexer.reindex(); return null; };
+ }
+
+}
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java
index 0501ff474ca..c5a58dcae68 100644
--- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java
+++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java
@@ -25,7 +25,7 @@ class ReindexingCuratorTest {
DocumentTypeManager manager = new DocumentTypeManager(musicConfig);
DocumentType music = manager.getDocumentType("music");
MockCurator mockCurator = new MockCurator();
- ReindexingCurator curator = new ReindexingCurator(mockCurator, manager);
+ ReindexingCurator curator = new ReindexingCurator(mockCurator, "cluster", manager);
assertEquals(Reindexing.empty(), curator.readReindexing());
@@ -42,7 +42,7 @@ class ReindexingCuratorTest {
assertEquals(reindexing, curator.readReindexing());
// Unknown document types are forgotten.
- assertEquals(Reindexing.empty(), new ReindexingCurator(mockCurator, new DocumentTypeManager(emptyConfig)).readReindexing());
+ assertEquals(Reindexing.empty(), new ReindexingCurator(mockCurator, "cluster", new DocumentTypeManager(emptyConfig)).readReindexing());
}
}
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java
new file mode 100644
index 00000000000..713fb836d62
--- /dev/null
+++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java
@@ -0,0 +1,138 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.reindexing;
+
+import ai.vespa.reindexing.Reindexer.Cluster;
+import com.yahoo.cloud.config.ClusterListConfig;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.config.DocumentmanagerConfig;
+import com.yahoo.searchdefinition.derived.Deriver;
+import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
+import com.yahoo.vespa.config.content.reindexing.ReindexingConfig;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+
+import static ai.vespa.reindexing.ReindexingMaintainer.parseCluster;
+import static ai.vespa.reindexing.ReindexingMaintainer.parseReady;
+import static ai.vespa.reindexing.ReindexingMaintainer.scheduleStaggered;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * @author jonmv
+ */
+class ReindexingMaintainerTest {
+
+ @Test
+ void testParsing() {
+ DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build();
+ DocumentTypeManager manager = new DocumentTypeManager(musicConfig);
+
+ assertEquals(Map.of(manager.getDocumentType("music"), Instant.ofEpochMilli(123)),
+ parseReady(new ReindexingConfig.Builder()
+ .enabled(true)
+ .clusterName("cluster")
+ .status("music", new ReindexingConfig.Status.Builder().readyAtMillis(123))
+ .build(),
+ manager));
+
+ // Unknown document type fails
+ assertThrows(IllegalArgumentException.class,
+ () -> parseReady(new ReindexingConfig.Builder()
+ .status("poetry", new ReindexingConfig.Status.Builder().readyAtMillis(123))
+ .build(),
+ manager));
+
+ assertEquals(new Cluster("cluster", "configId", Map.of(manager.getDocumentType("music"), "default")),
+ parseCluster("cluster",
+ new ClusterListConfig.Builder()
+ .storage(new ClusterListConfig.Storage.Builder()
+ .name("oyster")
+ .configid("configId"))
+ .storage(new ClusterListConfig.Storage.Builder()
+ .name("cluster")
+ .configid("configId"))
+ .build(),
+ new AllClustersBucketSpacesConfig.Builder()
+ .cluster("oyster", new AllClustersBucketSpacesConfig.Cluster.Builder()
+ .documentType("music", new AllClustersBucketSpacesConfig.Cluster.DocumentType.Builder()
+ .bucketSpace("global")))
+ .cluster("cluster", new AllClustersBucketSpacesConfig.Cluster.Builder()
+ .documentType("music", new AllClustersBucketSpacesConfig.Cluster.DocumentType.Builder()
+ .bucketSpace("default")))
+ .build(),
+ manager));
+
+ // Cluster missing in bucket space list fails.
+ assertThrows(NullPointerException.class,
+ () -> parseCluster("cluster",
+ new ClusterListConfig.Builder()
+ .storage(new ClusterListConfig.Storage.Builder()
+ .name("cluster")
+ .configid("configId"))
+ .build(),
+ new AllClustersBucketSpacesConfig.Builder()
+ .cluster("oyster", new AllClustersBucketSpacesConfig.Cluster.Builder()
+ .documentType("music", new AllClustersBucketSpacesConfig.Cluster.DocumentType.Builder()
+ .bucketSpace("global")))
+ .build(),
+ manager));
+
+ // Cluster missing in cluster list fails.
+ assertThrows(IllegalStateException.class,
+ () -> parseCluster("cluster",
+ new ClusterListConfig.Builder()
+ .storage(new ClusterListConfig.Storage.Builder()
+ .name("oyster")
+ .configid("configId"))
+ .build(),
+ new AllClustersBucketSpacesConfig.Builder()
+ .cluster("cluster", new AllClustersBucketSpacesConfig.Cluster.Builder()
+ .documentType("music", new AllClustersBucketSpacesConfig.Cluster.DocumentType.Builder()
+ .bucketSpace("default")))
+ .build(),
+ manager));
+ }
+
+ @Test
+ void testStaggering() {
+ scheduleStaggered((delayMillis, intervalMillis) -> {
+ assertEquals(0, delayMillis);
+ assertEquals(10, intervalMillis);
+ },
+ Duration.ofMillis(10),
+ Instant.ofEpochMilli(27),
+ "host",
+ "nys:123,hark:123");
+
+ scheduleStaggered((delayMillis, intervalMillis) -> {
+ assertEquals(3, delayMillis);
+ assertEquals(10, intervalMillis);
+ },
+ Duration.ofMillis(10),
+ Instant.ofEpochMilli(27),
+ "host",
+ "host:123");
+
+ scheduleStaggered((delayMillis, intervalMillis) -> {
+ assertEquals(7, delayMillis);
+ assertEquals(20, intervalMillis);
+ },
+ Duration.ofMillis(10),
+ Instant.ofEpochMilli(13),
+ "host",
+ "host:123,:nys:321");
+
+ scheduleStaggered((delayMillis, intervalMillis) -> {
+ assertEquals(17, delayMillis);
+ assertEquals(20, intervalMillis);
+ },
+ Duration.ofMillis(10),
+ Instant.ofEpochMilli(13),
+ "nys",
+ "host:123,nys:321");
+ }
+
+}
diff --git a/clustercontroller-reindexer/src/test/resources/schemas/music.sd b/clustercontroller-reindexer/src/test/resources/schemas/music.sd
index a289f5a686b..e55fff26ea5 100644
--- a/clustercontroller-reindexer/src/test/resources/schemas/music.sd
+++ b/clustercontroller-reindexer/src/test/resources/schemas/music.sd
@@ -1,5 +1,8 @@
# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
search music {
+ field auxilliary type int {
+ indexing: random 123 | summary
+ }
document music {
field artist type string { }
}
diff --git a/config-application-package/src/main/java/com/yahoo/config/model/application/provider/FilesApplicationPackage.java b/config-application-package/src/main/java/com/yahoo/config/model/application/provider/FilesApplicationPackage.java
index 91ff1c6d339..9c68fb95cce 100644
--- a/config-application-package/src/main/java/com/yahoo/config/model/application/provider/FilesApplicationPackage.java
+++ b/config-application-package/src/main/java/com/yahoo/config/model/application/provider/FilesApplicationPackage.java
@@ -80,7 +80,7 @@ public class FilesApplicationPackage implements ApplicationPackage {
* The name of the subdirectory (below the original application package root)
* where a preprocessed version of this application package is stored.
* As it happens, the config model is first created with an application package in this subdirectory,
- * and later used backed by an application package which is not in this subdirectory.
+ * and later backed by an application package which is not in this subdirectory.
* To enable model code to correct for this, this constant must be publicly known.
*
* All of this stuff is Very Unfortunate and should be fixed. -Jon
@@ -306,34 +306,15 @@ public class FilesApplicationPackage implements ApplicationPackage {
@Override
public Collection<NamedReader> searchDefinitionContents() {
- Map<String, NamedReader> ret = new LinkedHashMap<>();
- Set<String> fileSds = new LinkedHashSet<>();
- Set<String> bundleSds = new LinkedHashSet<>();
+ Set<NamedReader> ret = new LinkedHashSet<>();
try {
for (File f : getSearchDefinitionFiles()) {
- fileSds.add(f.getName());
- ret.put(f.getName(), new NamedReader(f.getName(), new FileReader(f)));
+ ret.add(new NamedReader(f.getName(), new FileReader(f)));
}
} catch (Exception e) {
throw new IllegalArgumentException("Couldn't get search definition contents.", e);
}
- verifySdsDisjoint(fileSds, bundleSds);
- return ret.values();
- }
-
- /**
- * Verify that two sets of search definitions are disjoint (TODO: everything except error message is very generic).
- *
- * @param fileSds Set of search definitions from file
- * @param bundleSds Set of search definitions from bundles
- */
- private void verifySdsDisjoint(Set<String> fileSds, Set<String> bundleSds) {
- if (!Collections.disjoint(fileSds, bundleSds)) {
- Collection<String> disjoint = new ArrayList<>(fileSds);
- disjoint.retainAll(bundleSds);
- throw new IllegalArgumentException("For the following search definitions names there are collisions between those specified inside " +
- "docproc bundles and those in searchdefinitions/ in application package: "+disjoint);
- }
+ return ret;
}
/**
diff --git a/configserver/pom.xml b/configserver/pom.xml
index 8cd1b4b4254..43187038f97 100644
--- a/configserver/pom.xml
+++ b/configserver/pom.xml
@@ -220,6 +220,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>com.yahoo.vespa</groupId>
<artifactId>http-utils</artifactId>
<version>${project.version}</version>
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java
index fae52f318bd..ed28bec0d81 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java
@@ -42,14 +42,14 @@ public class ApplicationReindexing implements Reindexing {
/** Returns a copy of this with common reindexing for the given cluster ready at the given instant. */
public ApplicationReindexing withReady(String cluster, Instant readyAt) {
- Cluster current = clusters.getOrDefault(cluster, Cluster.empty);
+ Cluster current = clusters.getOrDefault(cluster, Cluster.ready(common));
Cluster modified = new Cluster(new Status(readyAt), current.pending, current.ready);
return new ApplicationReindexing(common, with(cluster, modified, clusters));
}
/** Returns a copy of this with reindexing for the given document type in the given cluster ready at the given instant. */
public ApplicationReindexing withReady(String cluster, String documentType, Instant readyAt) {
- Cluster current = clusters.getOrDefault(cluster, Cluster.empty);
+ Cluster current = clusters.getOrDefault(cluster, Cluster.ready(common));
Cluster modified = new Cluster(current.common,
without(documentType, current.pending),
with(documentType, new Status(readyAt), current.ready));
@@ -58,7 +58,7 @@ public class ApplicationReindexing implements Reindexing {
/** Returns a copy of this with a pending reindexing at the given generation, for the given document type. */
public ApplicationReindexing withPending(String cluster, String documentType, long requiredGeneration) {
- Cluster current = clusters.getOrDefault(cluster, Cluster.empty);
+ Cluster current = clusters.getOrDefault(cluster, Cluster.ready(common));
Cluster modified = new Cluster(current.common,
with(documentType, requirePositive(requiredGeneration), current.pending),
without(documentType, current.ready));
@@ -79,13 +79,10 @@ public class ApplicationReindexing implements Reindexing {
if (clusters.get(cluster).pending().containsKey(documentType))
return Optional.empty();
- Status documentStatus = clusters.get(cluster).ready().get(documentType);
- Status clusterStatus = clusters.get(cluster).common();
- if (documentStatus == null || documentStatus.ready().isBefore(clusterStatus.ready()))
- documentStatus = clusterStatus;
+ if (clusters.get(cluster).ready().containsKey(documentType))
+ return Optional.of(clusters.get(cluster).ready().get(documentType));
- if (documentStatus.ready().isAfter(common().ready()))
- return Optional.of(documentStatus);
+ return Optional.of(clusters.get(cluster).common());
}
return Optional.of(common());
}
@@ -116,7 +113,7 @@ public class ApplicationReindexing implements Reindexing {
/** Reindexing status for a single content cluster in an application. */
public static class Cluster {
- private static final Cluster empty = new Cluster(Status.ALWAYS_READY, Map.of(), Map.of());
+ private static Cluster ready(Status common) { return new Cluster(common, Map.of(), Map.of()); }
private final Status common;
private final Map<String, Long> pending;
@@ -173,9 +170,6 @@ public class ApplicationReindexing implements Reindexing {
/** Reindexing status common to an application, one of its clusters, or a single document type in a cluster. */
public static class Status implements Reindexing.Status {
- /** Always ready, i.e., ignored when joining with more specific statuses. */
- private static final Status ALWAYS_READY = new Status(Instant.EPOCH);
-
private final Instant ready;
Status(Instant ready) {
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java
index 6b316c06b54..7376452df42 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java
@@ -1,27 +1,29 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.server.application;
-import ai.vespa.util.http.VespaClientBuilderFactory;
+import ai.vespa.util.http.VespaHttpClientBuilder;
import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.yahoo.component.AbstractComponent;
+import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.config.model.api.HostInfo;
import com.yahoo.config.model.api.PortInfo;
import com.yahoo.config.model.api.ServiceInfo;
-import java.util.logging.Level;
+import com.yahoo.log.LogLevel;
import com.yahoo.slime.Cursor;
import com.yahoo.vespa.config.server.http.JSONResponse;
-import org.glassfish.jersey.client.ClientProperties;
-import org.glassfish.jersey.client.proxy.WebResourceFactory;
-
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.ProcessingException;
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientRequestFilter;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.HttpHeaders;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.net.URI;
+import java.net.URISyntaxException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashMap;
@@ -29,25 +31,29 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.logging.Logger;
-import java.util.stream.Collectors;
import static com.yahoo.config.model.api.container.ContainerServiceType.CLUSTERCONTROLLER_CONTAINER;
import static com.yahoo.config.model.api.container.ContainerServiceType.CONTAINER;
import static com.yahoo.config.model.api.container.ContainerServiceType.LOGSERVER_CONTAINER;
import static com.yahoo.config.model.api.container.ContainerServiceType.QRSERVER;
+import static java.util.stream.Collectors.toList;
/**
* Checks for convergence of config generation for a given application.
*
* @author Ulf Lilleengen
* @author hmusum
+ * @author bjorncs
*/
public class ConfigConvergenceChecker extends AbstractComponent {
private static final Logger log = Logger.getLogger(ConfigConvergenceChecker.class.getName());
- private static final String statePath = "/state/v1/";
- private static final String configSubPath = "config";
private final static Set<String> serviceTypesToCheck = Set.of(
CONTAINER.serviceName,
QRSERVER.serviceName,
@@ -58,16 +64,13 @@ public class ConfigConvergenceChecker extends AbstractComponent {
"distributor"
);
- private final StateApiFactory stateApiFactory;
- private final VespaClientBuilderFactory clientBuilderFactory = new VespaClientBuilderFactory();
+ private final CloseableHttpClient httpClient;
+ private final ObjectMapper jsonMapper = new ObjectMapper();
+ private final ExecutorService executor = createThreadpool();
@Inject
public ConfigConvergenceChecker() {
- this(ConfigConvergenceChecker::createStateApi);
- }
-
- public ConfigConvergenceChecker(StateApiFactory stateApiFactory) {
- this.stateApiFactory = stateApiFactory;
+ this.httpClient = createHttpClient();
}
/** Fetches the active config generation for all services in the given application. */
@@ -82,7 +85,7 @@ public class ConfigConvergenceChecker extends AbstractComponent {
}
/** Check all services in given application. Returns the minimum current generation of all services */
- public ServiceListResponse getServiceConfigGenerationsResponse(Application application, URI requestUrl, Duration timeoutPerService) {
+ public JSONResponse getServiceConfigGenerationsResponse(Application application, URI requestUrl, Duration timeoutPerService) {
Map<ServiceInfo, Long> currentGenerations = getServiceConfigGenerations(application, timeoutPerService);
long currentGeneration = currentGenerations.values().stream().mapToLong(Long::longValue).min().orElse(-1);
return new ServiceListResponse(200, currentGenerations, requestUrl, application.getApplicationGeneration(),
@@ -90,7 +93,7 @@ public class ConfigConvergenceChecker extends AbstractComponent {
}
/** Check service identified by host and port in given application */
- public ServiceResponse getServiceConfigGenerationResponse(Application application, String hostAndPortToCheck, URI requestUrl, Duration timeout) {
+ public JSONResponse getServiceConfigGenerationResponse(Application application, String hostAndPortToCheck, URI requestUrl, Duration timeout) {
Long wantedGeneration = application.getApplicationGeneration();
try {
if ( ! hostInApplication(application, hostAndPortToCheck))
@@ -99,7 +102,7 @@ public class ConfigConvergenceChecker extends AbstractComponent {
long currentGeneration = getServiceGeneration(URI.create("http://" + hostAndPortToCheck), timeout);
boolean converged = currentGeneration >= wantedGeneration;
return ServiceResponse.createOkResponse(requestUrl, hostAndPortToCheck, wantedGeneration, currentGeneration, converged);
- } catch (ProcessingException e) { // e.g. if we cannot connect to the service to find generation
+ } catch (NonSuccessStatusCodeException | IOException e) { // e.g. if we cannot connect to the service to find generation
return ServiceResponse.createNotFoundResponse(requestUrl, hostAndPortToCheck, wantedGeneration, e.getMessage());
} catch (Exception e) {
return ServiceResponse.createErrorResponse(requestUrl, hostAndPortToCheck, wantedGeneration, e.getMessage());
@@ -108,46 +111,64 @@ public class ConfigConvergenceChecker extends AbstractComponent {
@Override
public void deconstruct() {
- clientBuilderFactory.close();
- }
-
- @Path(statePath)
- public interface StateApi {
- @Path(configSubPath)
- @GET
- JsonNode config();
- }
-
- public interface StateApiFactory {
- StateApi createStateApi(Client client, URI serviceUri);
+ try {
+ httpClient.close();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
}
- /** Gets service generation for a list of services (in parallel). */
+ /**
+ * Gets service generation for a list of services (in parallel).
+ * This should ideally be implemented using an async http client
+ * */
private Map<ServiceInfo, Long> getServiceGenerations(List<ServiceInfo> services, Duration timeout) {
- return services.parallelStream()
- .collect(Collectors.toMap(service -> service,
- service -> {
- try {
- return getServiceGeneration(URI.create("http://" + service.getHostName()
- + ":" + getStatePort(service).get()), timeout);
- }
- catch (ProcessingException e) { // Cannot connect to service to determine service generation
- return -1L;
- }
- },
- (v1, v2) -> { throw new IllegalStateException("Duplicate keys for values '" + v1 + "' and '" + v2 + "'."); },
- LinkedHashMap::new
- ));
+ List<Callable<ServiceInfoWithGeneration>> tasks = services.stream()
+ .map(service ->
+ (Callable<ServiceInfoWithGeneration>) () -> {
+ long generation;
+ try {
+ generation = getServiceGeneration(URI.create("http://" + service.getHostName()
+ + ":" + getStatePort(service).get()), timeout);
+ } catch (IOException | NonSuccessStatusCodeException e) {
+ generation = -1L;
+ }
+ return new ServiceInfoWithGeneration(service, generation);
+ })
+ .collect(toList());
+ try {
+ List<Future<ServiceInfoWithGeneration>> taskResults = executor.invokeAll(tasks);
+ Map<ServiceInfo, Long> result = new LinkedHashMap<>();
+ for (Future<ServiceInfoWithGeneration> taskResult : taskResults) {
+ ServiceInfoWithGeneration info = taskResult.get();
+ result.put(info.service, info.generation);
+ }
+ return result;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Failed to retrieve config generation: " + e.getMessage(), e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException("Failed to retrieve config generation: " + e.getMessage(), e);
+ }
}
/** Get service generation of service at given URL */
- private long getServiceGeneration(URI serviceUrl, Duration timeout) {
- Client client = createClient(timeout);
- try {
- StateApi state = stateApiFactory.createStateApi(client, serviceUrl);
- return generationFromContainerState(state.config());
- } finally {
- client.close();
+ private long getServiceGeneration(URI serviceUrl, Duration timeout) throws IOException, NonSuccessStatusCodeException {
+ HttpGet request = new HttpGet(createApiUri(serviceUrl));
+ request.addHeader("Connection", "close");
+ request.setConfig(createRequestConfig(timeout));
+ try (CloseableHttpResponse response = httpClient.execute(request)) {
+ int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode != HttpStatus.SC_OK) throw new NonSuccessStatusCodeException(statusCode);
+ if (response.getEntity() == null) throw new IOException("Response has no content");
+ JsonNode jsonContent = jsonMapper.readTree(response.getEntity().getContent());
+ return generationFromContainerState(jsonContent);
+ } catch (Exception e) {
+ log.log(
+ LogLevel.DEBUG,
+ e,
+ () -> String.format("Failed to retrieve service config generation for '%s': %s", serviceUrl, e.getMessage()));
+ throw e;
}
}
@@ -166,16 +187,6 @@ public class ConfigConvergenceChecker extends AbstractComponent {
return false;
}
- private Client createClient(Duration timeout) {
- return clientBuilderFactory.newBuilder()
- .register(
- (ClientRequestFilter) ctx ->
- ctx.getHeaders().put(HttpHeaders.USER_AGENT, List.of("config-convergence-checker")))
- .property(ClientProperties.CONNECT_TIMEOUT, (int) timeout.toMillis())
- .property(ClientProperties.READ_TIMEOUT, (int) timeout.toMillis())
- .build();
- }
-
private static Optional<Integer> getStatePort(ServiceInfo service) {
return service.getPorts().stream()
.filter(port -> port.getTags().contains("state"))
@@ -187,9 +198,57 @@ public class ConfigConvergenceChecker extends AbstractComponent {
return state.get("config").get("generation").asLong(-1);
}
- private static StateApi createStateApi(Client client, URI uri) {
- WebTarget target = client.target(uri);
- return WebResourceFactory.newResource(StateApi.class, target);
+ private static URI createApiUri(URI serviceUrl) {
+ try {
+ return new URIBuilder(serviceUrl)
+ .setPath("/state/v1/config")
+ .build();
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static ExecutorService createThreadpool() {
+ return Executors.newFixedThreadPool(
+ Runtime.getRuntime().availableProcessors(), new DaemonThreadFactory("config-convergence-checker-"));
+ }
+
+ private static CloseableHttpClient createHttpClient() {
+ return VespaHttpClientBuilder
+ .create()
+ .setUserAgent("config-convergence-checker")
+ .setMaxConnPerRoute(10)
+ .setMaxConnTotal(400)
+ .setConnectionReuseStrategy((response, context) -> false) // Disable connection reuse
+ .build();
+ }
+
+ private static RequestConfig createRequestConfig(Duration timeout) {
+ int timeoutMillis = (int)timeout.toMillis();
+ return RequestConfig.custom()
+ .setConnectionRequestTimeout(timeoutMillis)
+ .setConnectTimeout(timeoutMillis)
+ .setSocketTimeout(timeoutMillis)
+ .build();
+ }
+
+ private static class ServiceInfoWithGeneration {
+ final ServiceInfo service;
+ final long generation;
+
+ ServiceInfoWithGeneration(ServiceInfo service, long generation) {
+ this.service = service;
+ this.generation = generation;
+ }
+ }
+
+ private static class NonSuccessStatusCodeException extends Exception {
+ final int statusCode;
+
+ NonSuccessStatusCodeException(int statusCode) {
+ super("Expected status code 200, got " + statusCode);
+ this.statusCode = statusCode;
+ }
}
private static class ServiceListResponse extends JSONResponse {
diff --git a/configserver/src/main/java/com/yahoo/vespa/serviceview/ConfigServerLocation.java b/configserver/src/main/java/com/yahoo/vespa/serviceview/ConfigServerLocation.java
index cc452421d2d..05d1119aa4f 100644
--- a/configserver/src/main/java/com/yahoo/vespa/serviceview/ConfigServerLocation.java
+++ b/configserver/src/main/java/com/yahoo/vespa/serviceview/ConfigServerLocation.java
@@ -15,6 +15,7 @@ public class ConfigServerLocation extends AbstractComponent {
final int restApiPort;
// The client factory must be owned by a component as StateResource is instantiated per request
+ @SuppressWarnings("removal")
final VespaClientBuilderFactory clientBuilderFactory = new VespaClientBuilderFactory();
@Inject
diff --git a/configserver/src/main/java/com/yahoo/vespa/serviceview/StateResource.java b/configserver/src/main/java/com/yahoo/vespa/serviceview/StateResource.java
index 76e600d2ad8..138e6c8798c 100644
--- a/configserver/src/main/java/com/yahoo/vespa/serviceview/StateResource.java
+++ b/configserver/src/main/java/com/yahoo/vespa/serviceview/StateResource.java
@@ -40,6 +40,7 @@ public class StateResource implements StateClient {
private static final String USER_AGENT = "service-view-config-server-client";
private static final String SINGLE_API_LINK = "url";
+ @SuppressWarnings("removal")
private final VespaClientBuilderFactory clientBuilderFactory;
private final int restApiPort;
private final String host;
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationReindexingTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationReindexingTest.java
index b803413d6ea..82de3e2eefb 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationReindexingTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationReindexingTest.java
@@ -27,12 +27,19 @@ public class ApplicationReindexingTest {
.withReady("one", "a", Instant.ofEpochMilli(1))
.withReady("two", "c", Instant.ofEpochMilli(3));
- assertEquals(Instant.ofEpochMilli(1 << 20),
+ // Document is most specific, and is used.
+ assertEquals(Instant.ofEpochMilli(1),
reindexing.status("one", "a").orElseThrow().ready());
+ // Cluster is most specific, and inherits application's common status.
assertEquals(Instant.ofEpochMilli(1 << 20),
reindexing.status("one", "d").orElseThrow().ready());
+ // Cluster is most specific, and has its own status set.
+ assertEquals(Instant.ofEpochMilli(2 << 10),
+ reindexing.status("two", "d").orElseThrow().ready());
+
+ // Application is most specific, as cluster and document are unknown.
assertEquals(Instant.ofEpochMilli(1 << 20),
reindexing.status("three", "a").orElseThrow().ready());
@@ -45,7 +52,7 @@ public class ApplicationReindexingTest {
assertEquals(Set.of("one", "two"),
reindexing.clusters().keySet());
- assertEquals(new Status(Instant.EPOCH),
+ assertEquals(new Status(Instant.ofEpochMilli(1 << 20)),
reindexing.clusters().get("one").common());
assertEquals(Map.of("a", new Status(Instant.ofEpochMilli(1))),
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java
index 6aeb774d2b0..4948432c646 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java
@@ -2,15 +2,13 @@
package com.yahoo.vespa.config.server.application;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
+import com.yahoo.component.Version;
import com.yahoo.config.model.api.Model;
import com.yahoo.config.provision.ApplicationId;
import com.yahoo.config.provision.ApplicationName;
import com.yahoo.config.provision.InstanceName;
import com.yahoo.config.provision.TenantName;
-import com.yahoo.component.Version;
import com.yahoo.container.jdisc.HttpResponse;
-import com.yahoo.slime.Slime;
-import com.yahoo.slime.SlimeUtils;
import com.yahoo.vespa.config.server.ServerCache;
import com.yahoo.vespa.config.server.monitoring.MetricUpdater;
import org.junit.Before;
@@ -31,8 +29,9 @@ import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.okJson;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
+import static com.yahoo.test.json.JsonTestHelper.assertJsonEquals;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
/**
* @author Ulf Lilleengen
@@ -184,10 +183,15 @@ public class ConfigConvergenceCheckerTest {
.withBody("response too slow")));
HttpResponse response = checker.getServiceConfigGenerationResponse(application, hostAndPort(service), requestUrl, Duration.ofMillis(1));
// Message contained in a SocketTimeoutException may differ across platforms, so we do a partial match of the response here
- assertResponse((responseBody) -> assertTrue("Response matches", responseBody.startsWith(
- "{\"url\":\"" + requestUrl.toString() + "\",\"host\":\"" + hostAndPort(requestUrl) +
- "\",\"wantedGeneration\":3,\"error\":\"java.net.SocketTimeoutException") &&
- responseBody.endsWith("\"}")), 404, response);
+ assertResponse(
+ responseBody ->
+ assertThat(responseBody)
+ .startsWith("{\"url\":\"" + requestUrl.toString() + "\",\"host\":\"" + hostAndPort(requestUrl) +
+ "\",\"wantedGeneration\":3,\"error\":\"")
+ .contains("timed out")
+ .endsWith("\"}"),
+ 404,
+ response);
}
private URI testServer() {
@@ -202,16 +206,8 @@ public class ConfigConvergenceCheckerTest {
return uri.getHost() + ":" + uri.getPort();
}
- private static void assertResponse(String json, int status, HttpResponse response) {
- assertResponse((responseBody) -> {
- Slime expected = SlimeUtils.jsonToSlime(json.getBytes());
- Slime actual = SlimeUtils.jsonToSlime(responseBody.getBytes());
- try {
- assertEquals(new String((SlimeUtils.toJsonBytes(expected))), new String(SlimeUtils.toJsonBytes(actual)));
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }, status, response);
+ private static void assertResponse(String expectedJson, int status, HttpResponse response) {
+ assertResponse((responseBody) -> assertJsonEquals(new String(responseBody.getBytes()), expectedJson), status, response);
}
private static void assertResponse(Consumer<String> assertFunc, int status, HttpResponse response) {
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java
index 7276091fed0..06a455954ac 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java
@@ -1,7 +1,6 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.server.http.v2;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.yahoo.cloud.config.ConfigserverConfig;
import com.yahoo.component.Version;
import com.yahoo.config.model.api.ModelFactory;
@@ -18,7 +17,6 @@ import com.yahoo.vespa.config.server.MockLogRetriever;
import com.yahoo.vespa.config.server.MockProvisioner;
import com.yahoo.vespa.config.server.MockTesterClient;
import com.yahoo.vespa.config.server.TestComponentRegistry;
-import com.yahoo.vespa.config.server.application.ConfigConvergenceChecker;
import com.yahoo.vespa.config.server.application.HttpProxy;
import com.yahoo.vespa.config.server.application.OrchestratorMock;
import com.yahoo.vespa.config.server.deploy.DeployTester;
@@ -37,12 +35,10 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import javax.ws.rs.client.Client;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
-import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
@@ -425,21 +421,6 @@ public class ApplicationHandlerTest {
return createApplicationHandler().handle(HttpRequest.createTestRequest(restartUrl, GET));
}
- private static class MockStateApiFactory implements ConfigConvergenceChecker.StateApiFactory {
- boolean createdApi = false;
- @Override
- public ConfigConvergenceChecker.StateApi createStateApi(Client client, URI serviceUri) {
- createdApi = true;
- return () -> {
- try {
- return new ObjectMapper().readTree("{\"config\":{\"generation\":1}}");
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- };
- }
- }
-
private ApplicationHandler createApplicationHandler() {
return createApplicationHandler(applicationRepository);
}
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainerTest.java
index f4a553e25b7..57582040552 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainerTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainerTest.java
@@ -16,13 +16,14 @@ public class ReindexingMaintainerTest {
@Test
public void testReadyComputation() {
- ApplicationReindexing reindexing = ApplicationReindexing.ready(Instant.ofEpochMilli(1 << 20))
+ ApplicationReindexing reindexing = ApplicationReindexing.ready(Instant.EPOCH)
.withPending("one", "a", 10)
.withReady("two", "b", Instant.ofEpochMilli(2))
.withPending("two", "b", 20)
.withReady("two", Instant.ofEpochMilli(2 << 10))
.withReady("one", "a", Instant.ofEpochMilli(1))
- .withReady("two", "c", Instant.ofEpochMilli(3));
+ .withReady("two", "c", Instant.ofEpochMilli(3))
+ .withReady(Instant.ofEpochMilli(1 << 20));
assertEquals(reindexing,
withReady(reindexing, () -> -1L, Instant.EPOCH));
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/athenz/impl/AthenzClientFactoryImpl.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/athenz/impl/AthenzClientFactoryImpl.java
index 323d49e4639..173729c7472 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/athenz/impl/AthenzClientFactoryImpl.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/athenz/impl/AthenzClientFactoryImpl.java
@@ -2,7 +2,6 @@
package com.yahoo.vespa.hosted.controller.athenz.impl;
import com.google.inject.Inject;
-import com.yahoo.jdisc.Metric;
import com.yahoo.vespa.athenz.api.AthenzIdentity;
import com.yahoo.vespa.athenz.client.zms.DefaultZmsClient;
import com.yahoo.vespa.athenz.client.zms.ZmsClient;
@@ -11,32 +10,21 @@ import com.yahoo.vespa.athenz.client.zts.ZtsClient;
import com.yahoo.vespa.athenz.identity.ServiceIdentityProvider;
import com.yahoo.vespa.hosted.controller.api.integration.athenz.AthenzClientFactory;
import com.yahoo.vespa.hosted.controller.athenz.config.AthenzConfig;
-import org.apache.http.client.methods.HttpUriRequest;
import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
/**
* @author bjorncs
*/
public class AthenzClientFactoryImpl implements AthenzClientFactory {
- private static final String METRIC_NAME = "athenz.request.error";
- private static final String ATHENZ_SERVICE_DIMENSION = "athenzService";
- private static final String EXCEPTION_DIMENSION = "exception";
-
private final AthenzConfig config;
private final ServiceIdentityProvider identityProvider;
- private final Metric metrics;
- private final Map<String, Metric.Context> metricContexts;
@Inject
- public AthenzClientFactoryImpl(ServiceIdentityProvider identityProvider, AthenzConfig config, Metric metrics) {
+ public AthenzClientFactoryImpl(ServiceIdentityProvider identityProvider, AthenzConfig config) {
this.identityProvider = identityProvider;
this.config = config;
- this.metrics = metrics;
- this.metricContexts = new HashMap<>();
}
@Override
@@ -49,7 +37,7 @@ public class AthenzClientFactoryImpl implements AthenzClientFactory {
*/
@Override
public ZmsClient createZmsClient() {
- return new DefaultZmsClient(URI.create(config.zmsUrl()), identityProvider, this::reportMetricErrorHandler);
+ return new DefaultZmsClient(URI.create(config.zmsUrl()), identityProvider);
}
/**
@@ -57,7 +45,7 @@ public class AthenzClientFactoryImpl implements AthenzClientFactory {
*/
@Override
public ZtsClient createZtsClient() {
- return new DefaultZtsClient.Builder(URI.create(config.ztsUrl())).withIdentityProvider(identityProvider).build();
+ return new DefaultZtsClient(URI.create(config.ztsUrl()), identityProvider);
}
@Override
@@ -65,11 +53,4 @@ public class AthenzClientFactoryImpl implements AthenzClientFactory {
return true;
}
- private void reportMetricErrorHandler(HttpUriRequest request, Exception error) {
- String hostname = request.getURI().getHost();
- Metric.Context context = metricContexts.computeIfAbsent(hostname, host -> metrics.createContext(
- Map.of(ATHENZ_SERVICE_DIMENSION, host,
- EXCEPTION_DIMENSION, error.getClass().getSimpleName())));
- metrics.add(METRIC_NAME, 1, context);
- }
}
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/CloudEventReporter.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/CloudEventReporter.java
index 402a82a9ca1..2e62f8e54df 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/CloudEventReporter.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/CloudEventReporter.java
@@ -96,6 +96,8 @@ public class CloudEventReporter extends ControllerMaintainer {
}
private void submitIssue(CloudEvent event) {
+ if (controller().system().isPublic())
+ return;
Issue issue = eventFetcher.createIssue(event);
if (!issueHandler.issueExists(issue)) {
issueHandler.file(issue);
diff --git a/document/src/main/java/com/yahoo/document/fieldset/FieldSetRepo.java b/document/src/main/java/com/yahoo/document/fieldset/FieldSetRepo.java
index 285c78ef0ab..905376e81ca 100644
--- a/document/src/main/java/com/yahoo/document/fieldset/FieldSetRepo.java
+++ b/document/src/main/java/com/yahoo/document/fieldset/FieldSetRepo.java
@@ -34,21 +34,24 @@ public class FieldSetRepo {
FieldSet parseFieldCollection(DocumentTypeManager docMan, String docType, String fieldNames) {
DocumentType type = docMan.getDocumentType(docType);
if (type == null) {
- throw new IllegalArgumentException("Unknown document type " + docType);
+ throw new IllegalArgumentException("Unknown document type " + docType);
}
- StringTokenizer tokenizer = new StringTokenizer(fieldNames, ",");
FieldCollection collection = new FieldCollection(type);
-
- while (tokenizer.hasMoreTokens()) {
- String token = tokenizer.nextToken();
- Field f = type.getField(token);
- if (f == null) {
- throw new IllegalArgumentException("No such field " + token);
+ if (fieldNames.equals("[document]")) {
+ collection.addAll(type.fieldSet());
+ }
+ else {
+ StringTokenizer tokenizer = new StringTokenizer(fieldNames, ",");
+ while (tokenizer.hasMoreTokens()) {
+ String token = tokenizer.nextToken();
+ Field f = type.getField(token);
+ if (f == null) {
+ throw new IllegalArgumentException("No such field " + token);
+ }
+ collection.add(f);
}
- collection.add(f);
}
-
return collection;
}
diff --git a/document/src/vespa/document/base/globalid.cpp b/document/src/vespa/document/base/globalid.cpp
index bdf81d1c3c8..4f96ac0e659 100644
--- a/document/src/vespa/document/base/globalid.cpp
+++ b/document/src/vespa/document/base/globalid.cpp
@@ -39,39 +39,6 @@ getHexVal(char c)
namespace document {
-bool
-GlobalId::BucketOrderCmp::operator()(const GlobalId &lhs, const GlobalId &rhs) const
-{
- const unsigned char * __restrict__ a = lhs._gid._buffer;
- const unsigned char * __restrict__ b = rhs._gid._buffer;
- int diff;
- if ((diff = compare(a[0], b[0])) != 0) {
- return diff < 0;
- }
- if ((diff = compare(a[1], b[1])) != 0) {
- return diff < 0;
- }
- if ((diff = compare(a[2], b[2])) != 0) {
- return diff < 0;
- }
- if ((diff = compare(a[3], b[3])) != 0) {
- return diff < 0;
- }
- if ((diff = compare(a[8], b[8])) != 0) {
- return diff < 0;
- }
- if ((diff = compare(a[9], b[9])) != 0) {
- return diff < 0;
- }
- if ((diff = compare(a[10], b[10])) != 0) {
- return diff < 0;
- }
- if ((diff = compare(a[11], b[11])) != 0) {
- return diff < 0;
- }
- return lhs < rhs;
-}
-
vespalib::string GlobalId::toString() const {
vespalib::asciistream out;
out << "gid(0x" << vespalib::hex;
diff --git a/document/src/vespa/document/base/globalid.h b/document/src/vespa/document/base/globalid.h
index 1bb257b7cbf..ac30e189033 100644
--- a/document/src/vespa/document/base/globalid.h
+++ b/document/src/vespa/document/base/globalid.h
@@ -64,7 +64,24 @@ public:
* given bucket.
*/
struct BucketOrderCmp {
- bool operator()(const GlobalId &lhs, const GlobalId &rhs) const;
+ bool operator()(const GlobalId &lhs, const GlobalId &rhs) const {
+ const uint32_t * __restrict__ a = lhs._gid._nums;
+ const uint32_t * __restrict__ b = rhs._gid._nums;
+ if (a[0] != b[0]) {
+ return bitswap(a[0]) < bitswap(b[0]);
+ }
+ if (a[2] != b[2]) {
+ return bitswap(a[2]) < bitswap(b[2]);
+ }
+ return __builtin_bswap32(a[1]) < __builtin_bswap32(b[1]);
+ }
+ static uint32_t bitswap(uint32_t value) {
+ value = ((value & 0x55555555) << 1) | ((value & 0xaaaaaaaa) >> 1);
+ value = ((value & 0x33333333) << 2) | ((value & 0xcccccccc) >> 2);
+ value = ((value & 0x0f0f0f0f) << 4) | ((value & 0xf0f0f0f0) >> 4);
+ return __builtin_bswap32(value);
+ }
+
//These 2 compare methods are exposed only for testing
static int compareRaw(unsigned char a, unsigned char b) {
return a - b;
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java
index 0dd96275f9d..e98be6871b4 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java
@@ -26,8 +26,8 @@ import java.util.logging.Logger;
*/
public class VisitorIterator {
- private ProgressToken progressToken;
- private BucketSource bucketSource;
+ private final ProgressToken progressToken;
+ private final BucketSource bucketSource;
private int distributionBitCount;
private static final Logger log = Logger.getLogger(VisitorIterator.class.getName());
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
index e0ae0278de8..d332b1fb1ca 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
@@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
/**
* Local visitor session that copies and iterates through all items in the local document access.
- * Each document must be ack'ed for the session to be done visiting.
+ * Each document must be ack'ed for the session to be done visiting, unless the destination is remote.
* Only document puts are sent by this session, and this is done from a separate thread.
*
* @author jonmv
@@ -44,20 +44,23 @@ public class LocalVisitorSession implements VisitorSession {
private final FieldSet fieldSet;
private final AtomicReference<State> state;
private final AtomicReference<Phaser> phaser;
+ private final ProgressToken token;
public LocalVisitorSession(LocalDocumentAccess access, VisitorParameters parameters) throws ParseException {
- if (parameters.getResumeToken() != null)
- throw new UnsupportedOperationException("Continuation via progress tokens is not supported");
-
- if (parameters.getRemoteDataHandler() != null)
- throw new UnsupportedOperationException("Remote data handlers are not supported");
-
this.selector = new DocumentSelector(parameters.getDocumentSelection());
this.fieldSet = new FieldSetRepo().parse(access.getDocumentTypeManager(), parameters.fieldSet());
-
- this.data = parameters.getLocalDataHandler() == null ? new VisitorDataQueue() : parameters.getLocalDataHandler();
- this.data.reset();
- this.data.setSession(this);
+ this.token = parameters.getResumeToken();
+
+ if (parameters.getRemoteDataHandler() == null) {
+ this.data = parameters.getLocalDataHandler() == null ? new VisitorDataQueue() : parameters.getLocalDataHandler();
+ this.data.reset();
+ this.data.setSession(this);
+ }
+ else {
+ if (parameters.getLocalDataHandler() != null)
+ throw new IllegalArgumentException("Cannot have both a remote and a local data handler");
+ this.data = null;
+ }
this.control = parameters.getControlHandler() == null ? new VisitorControlHandler() : parameters.getControlHandler();
this.control.reset();
@@ -98,8 +101,11 @@ public class LocalVisitorSession implements VisitorSession {
if (synchronizer != null)
synchronizer.arriveAndAwaitAdvance();
- data.onMessage(new PutDocumentMessage(new DocumentPut(copy)),
- new AckToken(id));
+ if (data != null)
+ data.onMessage(new PutDocumentMessage(new DocumentPut(copy)),
+ new AckToken(id));
+ else
+ outstanding.remove(id);
if (synchronizer != null)
synchronizer.arriveAndAwaitAdvance();
@@ -127,9 +133,9 @@ public class LocalVisitorSession implements VisitorSession {
}
finally {
if (synchronizer != null)
- synchronizer.arriveAndDeregister();
-
- data.onDone();
+ synchronizer.awaitAdvance(synchronizer.arriveAndDeregister());
+ if (data != null)
+ data.onDone();
}
}).start();
}
@@ -140,9 +146,10 @@ public class LocalVisitorSession implements VisitorSession {
&& control.isDone(); // Control handler has been notified
}
+ /** Returns the token set in the parameters used to create this. */
@Override
public ProgressToken getProgress() {
- throw new UnsupportedOperationException("Progress tokens are not supported");
+ return token;
}
@Override
@@ -179,6 +186,12 @@ public class LocalVisitorSession implements VisitorSession {
@Override
public void destroy() {
abort();
+ try {
+ control.waitUntilDone(0);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
index e1d18080faf..257d491ea93 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
@@ -1166,7 +1166,7 @@ public class MessageBusVisitorSession implements VisitorSession {
}
}
} catch (InterruptedException e) {
- e.printStackTrace();
+ log.log(Level.WARNING, "Interrupted waiting for visitor session to be destroyed");
} finally {
try {
sender.destroy();
diff --git a/eval/src/vespa/eval/eval/fast_sparse_map.cpp b/eval/src/vespa/eval/eval/fast_sparse_map.cpp
index 3215b5f8995..2e95934286c 100644
--- a/eval/src/vespa/eval/eval/fast_sparse_map.cpp
+++ b/eval/src/vespa/eval/eval/fast_sparse_map.cpp
@@ -7,6 +7,8 @@ namespace vespalib::eval {
FastSparseMap::~FastSparseMap() = default;
+const FastSparseMap::HashedLabel FastSparseMap::empty_label;
+
}
VESPALIB_HASH_MAP_INSTANTIATE_H_E(vespalib::eval::FastSparseMap::Key, uint32_t, vespalib::eval::FastSparseMap::Hash, vespalib::eval::FastSparseMap::Equal);
diff --git a/eval/src/vespa/eval/eval/fast_sparse_map.h b/eval/src/vespa/eval/eval/fast_sparse_map.h
index 19c171cfed8..0d7597a19a0 100644
--- a/eval/src/vespa/eval/eval/fast_sparse_map.h
+++ b/eval/src/vespa/eval/eval/fast_sparse_map.h
@@ -58,6 +58,8 @@ public:
HashedLabel(const vespalib::stringref *str) : label(*str), hash(hash_label(*str)) {}
};
+ static const HashedLabel empty_label;
+
static uint64_t hash_label(const HashedLabel &label) {
return label.hash;
}
diff --git a/eval/src/vespa/eval/eval/fast_value.hpp b/eval/src/vespa/eval/eval/fast_value.hpp
index 559f2e0a7f6..932924cd409 100644
--- a/eval/src/vespa/eval/eval/fast_value.hpp
+++ b/eval/src/vespa/eval/eval/fast_value.hpp
@@ -155,17 +155,17 @@ struct FastValueIndex final : Value::Index {
ConstArrayRef<LCT> lhs_cells, ConstArrayRef<RCT> rhs_cells, Stash &stash);
template <typename LCT, typename RCT, typename OCT, typename Fun>
- static const Value &sparse_only_merge(const ValueType &res_type, const Fun &fun,
- const FastValueIndex &lhs, const FastValueIndex &rhs,
- ConstArrayRef<LCT> lhs_cells, ConstArrayRef<RCT> rhs_cells,
- Stash &stash) __attribute((noinline));
-
- template <typename LCT, typename RCT, typename OCT, typename Fun>
static const Value &sparse_no_overlap_join(const ValueType &res_type, const Fun &fun,
const FastValueIndex &lhs, const FastValueIndex &rhs,
const std::vector<JoinAddrSource> &addr_sources,
ConstArrayRef<LCT> lhs_cells, ConstArrayRef<RCT> rhs_cells, Stash &stash);
+ template <typename LCT, typename RCT, typename OCT, typename Fun>
+ static const Value &sparse_only_merge(const ValueType &res_type, const Fun &fun,
+ const FastValueIndex &lhs, const FastValueIndex &rhs,
+ ConstArrayRef<LCT> lhs_cells, ConstArrayRef<RCT> rhs_cells,
+ Stash &stash) __attribute((noinline));
+
size_t size() const override { return map.size(); }
std::unique_ptr<View> create_view(const std::vector<size_t> &dims) const override;
};
@@ -333,6 +333,59 @@ FastValueIndex::sparse_full_overlap_join(const ValueType &res_type, const Fun &f
template <typename LCT, typename RCT, typename OCT, typename Fun>
const Value &
+FastValueIndex::sparse_no_overlap_join(const ValueType &res_type, const Fun &fun,
+ const FastValueIndex &lhs, const FastValueIndex &rhs,
+ const std::vector<JoinAddrSource> &addr_sources,
+ ConstArrayRef<LCT> lhs_cells, ConstArrayRef<RCT> rhs_cells, Stash &stash)
+{
+ using HashedLabelRef = std::reference_wrapper<const FastSparseMap::HashedLabel>;
+ size_t num_mapped_dims = addr_sources.size();
+ auto &result = stash.create<FastValue<OCT>>(res_type, num_mapped_dims, 1, lhs.map.size()*rhs.map.size());
+ std::vector<HashedLabelRef> output_addr(num_mapped_dims, FastSparseMap::empty_label);
+ std::vector<size_t> store_lhs_idx;
+ std::vector<size_t> store_rhs_idx;
+ size_t out_idx = 0;
+ for (JoinAddrSource source : addr_sources) {
+ switch (source) {
+ case JoinAddrSource::LHS:
+ store_lhs_idx.push_back(out_idx++);
+ break;
+ case JoinAddrSource::RHS:
+ store_rhs_idx.push_back(out_idx++);
+ break;
+ default:
+ abort();
+ }
+ }
+ assert(out_idx == output_addr.size());
+ for (size_t lhs_subspace = 0; lhs_subspace < lhs.map.size(); ++lhs_subspace) {
+ auto l_addr = lhs.map.make_addr(lhs_subspace);
+ assert(l_addr.size() == store_lhs_idx.size());
+ for (size_t i = 0; i < store_lhs_idx.size(); ++i) {
+ size_t addr_idx = store_lhs_idx[i];
+ output_addr[addr_idx] = l_addr[i];
+ }
+ for (size_t rhs_subspace = 0; rhs_subspace < rhs.map.size(); ++rhs_subspace) {
+ auto r_addr = rhs.map.make_addr(rhs_subspace);
+ assert(r_addr.size() == store_rhs_idx.size());
+ for (size_t i = 0; i < store_rhs_idx.size(); ++i) {
+ size_t addr_idx = store_rhs_idx[i];
+ output_addr[addr_idx] = r_addr[i];
+ }
+ auto idx = result.my_index.map.add_mapping(ConstArrayRef(output_addr));
+ if (__builtin_expect((idx == result.my_cells.size), true)) {
+ auto cell_value = fun(lhs_cells[lhs_subspace], rhs_cells[rhs_subspace]);
+ result.my_cells.push_back_fast(cell_value);
+ }
+ }
+ }
+ return result;
+}
+
+//-----------------------------------------------------------------------------
+
+template <typename LCT, typename RCT, typename OCT, typename Fun>
+const Value &
FastValueIndex::sparse_only_merge(const ValueType &res_type, const Fun &fun,
const FastValueIndex &lhs, const FastValueIndex &rhs,
ConstArrayRef<LCT> lhs_cells, ConstArrayRef<RCT> rhs_cells, Stash &stash)
@@ -365,47 +418,4 @@ FastValueIndex::sparse_only_merge(const ValueType &res_type, const Fun &fun,
return result;
}
-//-----------------------------------------------------------------------------
-
-template <typename LCT, typename RCT, typename OCT, typename Fun>
-const Value &
-FastValueIndex::sparse_no_overlap_join(const ValueType &res_type, const Fun &fun,
- const FastValueIndex &lhs, const FastValueIndex &rhs,
- const std::vector<JoinAddrSource> &addr_sources,
- ConstArrayRef<LCT> lhs_cells, ConstArrayRef<RCT> rhs_cells, Stash &stash)
-{
- using HashedLabelRef = std::reference_wrapper<const FastSparseMap::HashedLabel>;
- auto &result = stash.create<FastValue<OCT>>(res_type, res_type.count_mapped_dimensions(), 1, lhs.map.size()*rhs.map.size());
- std::vector<HashedLabelRef> output_addr;
- for (size_t lhs_subspace = 0; lhs_subspace < lhs.map.size(); ++lhs_subspace) {
- auto l_addr = lhs.map.make_addr(lhs_subspace);
- for (size_t rhs_subspace = 0; rhs_subspace < rhs.map.size(); ++rhs_subspace) {
- auto r_addr = rhs.map.make_addr(rhs_subspace);
- output_addr.clear();
- size_t l_idx = 0;
- size_t r_idx = 0;
- for (JoinAddrSource source : addr_sources) {
- switch (source) {
- case JoinAddrSource::LHS:
- output_addr.push_back(l_addr[l_idx++]);
- break;
- case JoinAddrSource::RHS:
- output_addr.push_back(r_addr[r_idx++]);
- break;
- default:
- abort();
- }
- }
- assert(l_idx == l_addr.size());
- assert(r_idx == r_addr.size());
- auto idx = result.my_index.map.add_mapping(ConstArrayRef(output_addr));
- if (__builtin_expect((idx == result.my_cells.size), true)) {
- auto cell_value = fun(lhs_cells[lhs_subspace], rhs_cells[rhs_subspace]);
- result.my_cells.push_back_fast(cell_value);
- }
- }
- }
- return result;
-}
-
}
diff --git a/jaxrs_client_utils/src/main/java/ai/vespa/util/http/VespaClientBuilderFactory.java b/jaxrs_client_utils/src/main/java/ai/vespa/util/http/VespaClientBuilderFactory.java
index 2bac7f66799..c6afa889041 100644
--- a/jaxrs_client_utils/src/main/java/ai/vespa/util/http/VespaClientBuilderFactory.java
+++ b/jaxrs_client_utils/src/main/java/ai/vespa/util/http/VespaClientBuilderFactory.java
@@ -27,8 +27,10 @@ import static java.util.logging.Level.CONFIG;
* - hostname verification is not enabled - CN/SAN verification is assumed to be handled by the underlying x509 trust manager.
* - ssl context or hostname verifier must not be overridden by the caller
*
+ * @deprecated Use Apache httpclient based client factory instead (VespaHttpClientBuilder).
* @author bjorncs
*/
+@Deprecated(forRemoval = true)
public class VespaClientBuilderFactory implements AutoCloseable {
private static final Logger log = Logger.getLogger(VespaClientBuilderFactory.class.getName());
diff --git a/jaxrs_client_utils/src/main/java/com/yahoo/vespa/jaxrs/client/VespaJerseyJaxRsClientFactory.java b/jaxrs_client_utils/src/main/java/com/yahoo/vespa/jaxrs/client/VespaJerseyJaxRsClientFactory.java
index bdc89d737d4..6d1c1c71f21 100644
--- a/jaxrs_client_utils/src/main/java/com/yahoo/vespa/jaxrs/client/VespaJerseyJaxRsClientFactory.java
+++ b/jaxrs_client_utils/src/main/java/com/yahoo/vespa/jaxrs/client/VespaJerseyJaxRsClientFactory.java
@@ -17,10 +17,13 @@ import java.util.List;
/**
* Factory for creating Jersey based Vespa clients from a JAX-RS resource interface.
*
+ * @deprecated Use Apache httpclient based client factory instead (VespaHttpClientBuilder).
* @author bjorncs
*/
+@Deprecated(forRemoval = true)
public class VespaJerseyJaxRsClientFactory implements JaxRsClientFactory, AutoCloseable {
+ @SuppressWarnings("removal")
private final VespaClientBuilderFactory clientBuilder = new VespaClientBuilderFactory();
// Client is a heavy-weight object with a finalizer so we create only one and re-use it
private final Client client;
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/identity/AthenzCredentialsMaintainer.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/identity/AthenzCredentialsMaintainer.java
index 0a29da57220..d6c08a820cd 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/identity/AthenzCredentialsMaintainer.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/identity/AthenzCredentialsMaintainer.java
@@ -198,7 +198,7 @@ public class AthenzCredentialsMaintainer implements CredentialsMaintainer {
HostnameVerifier ztsHostNameVerifier = useInternalZts
? new AthenzIdentityVerifier(Set.of(configserverIdentity))
: null;
- try (ZtsClient ztsClient = new DefaultZtsClient.Builder(ztsEndpoint).withIdentityProvider(hostIdentityProvider).withHostnameVerifier(ztsHostNameVerifier).build()) {
+ try (ZtsClient ztsClient = new DefaultZtsClient(ztsEndpoint, hostIdentityProvider, ztsHostNameVerifier)) {
InstanceIdentity instanceIdentity =
ztsClient.registerInstance(
configserverIdentity,
@@ -227,7 +227,7 @@ public class AthenzCredentialsMaintainer implements CredentialsMaintainer {
HostnameVerifier ztsHostNameVerifier = useInternalZts
? new AthenzIdentityVerifier(Set.of(configserverIdentity))
: null;
- try (ZtsClient ztsClient = new DefaultZtsClient.Builder(ztsEndpoint).withSslContext(containerIdentitySslContext).withHostnameVerifier(ztsHostNameVerifier).build()) {
+ try (ZtsClient ztsClient = new DefaultZtsClient(ztsEndpoint, containerIdentitySslContext, ztsHostNameVerifier)) {
InstanceIdentity instanceIdentity =
ztsClient.refreshInstance(
configserverIdentity,
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java
index 070bf98bf87..163ee74e8f1 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java
@@ -114,18 +114,23 @@ public class QuestMetricsDb implements MetricsDb {
// Since we remove full days at once we need to keep at least the scaling window + 1 day
Instant oldestToKeep = clock.instant().minus(Autoscaler.maxScalingWindow().plus(Duration.ofDays(1)));
SqlExecutionContext context = newContext();
+ int partitions = 0;
try (SqlCompiler compiler = new SqlCompiler(engine)) {
File tableRoot = new File(dataDir, tableName);
List<String> removeList = new ArrayList<>();
for (String dirEntry : tableRoot.list()) {
File partitionDir = new File(tableRoot, dirEntry);
if ( ! partitionDir.isDirectory()) continue;
+
+ partitions++;
DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneId.of("UTC"));
Instant partitionDay = Instant.from(formatter.parse(dirEntry + "T00:00:00"));
if (partitionDay.isBefore(oldestToKeep))
removeList.add(dirEntry);
+
}
- if ( ! removeList.isEmpty())
+ // Remove unless all partitions are old: Removing all partitions "will be supported in the future"
+ if ( removeList.size() < partitions && ! removeList.isEmpty())
compiler.compile("alter table " + tableName + " drop partition " +
removeList.stream().map(dir -> "'" + dir + "'").collect(Collectors.joining(",")),
context);
diff --git a/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/controller/RetryingClusterControllerClientFactory.java b/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/controller/RetryingClusterControllerClientFactory.java
index 4b82f278f23..e2e769f8556 100644
--- a/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/controller/RetryingClusterControllerClientFactory.java
+++ b/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/controller/RetryingClusterControllerClientFactory.java
@@ -14,6 +14,7 @@ import java.util.List;
/**
* @author bakksjo
*/
+@SuppressWarnings("removal") // VespaJerseyJaxRsClientFactory
public class RetryingClusterControllerClientFactory extends AbstractComponent implements ClusterControllerClientFactory {
// TODO: Figure this port out dynamically.
diff --git a/orchestrator/src/test/java/com/yahoo/vespa/orchestrator/controller/RetryingClusterControllerClientFactoryTest.java b/orchestrator/src/test/java/com/yahoo/vespa/orchestrator/controller/RetryingClusterControllerClientFactoryTest.java
index 95fdd61563b..309d6a756f6 100644
--- a/orchestrator/src/test/java/com/yahoo/vespa/orchestrator/controller/RetryingClusterControllerClientFactoryTest.java
+++ b/orchestrator/src/test/java/com/yahoo/vespa/orchestrator/controller/RetryingClusterControllerClientFactoryTest.java
@@ -27,6 +27,7 @@ public class RetryingClusterControllerClientFactoryTest {
private final Clock clock = new ManualClock();
@Test
+ @SuppressWarnings("removal") // VespaJerseyJaxRsClientFactory
public void verifyJerseyCallForSetNodeState() throws IOException {
VespaJerseyJaxRsClientFactory clientFactory = mock(VespaJerseyJaxRsClientFactory.class);
ClusterControllerJaxRsApi api = mock(ClusterControllerJaxRsApi.class);
diff --git a/searchcore/src/apps/vespa-feed-bm/runtest.sh b/searchcore/src/apps/vespa-feed-bm/runtest.sh
new file mode 100755
index 00000000000..c859fe723b6
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/runtest.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+
+numdocs=2000000
+stripe_bits=8
+
+base_cmd="numactl --cpunodebind=0 --localalloc perf stat -ddd env LD_PRELOAD=$HOME/vespa/lib64/vespa/malloc/libvespamalloc.so ./vespa-feed-bm --documents $numdocs --put-passes 1 --update-passes 10 --remove-passes 0 --max-pending 8000 --indexing-sequencer throughput"
+
+spi_only="$base_cmd --client-threads 1"
+base_for_rest="$base_cmd --client-threads 2 --response-threads 3"
+
+chain_base="$base_for_rest --use-storage-chain "
+chain_stripe="$chain_base --bucket-db-stripe-bits $stripe_bits"
+chain_stripe_async="$chain_stripe --use-async-message-handling"
+service_layer="$base_for_rest --enable-service-layer --bucket-db-stripe-bits $stripe_bits --use-async-message-handling"
+service_layer_rpc="$service_layer --rpc-network-threads 5 --rpc-targets-per-node 8"
+service_layer_mbus="$service_layer --use-message-bus"
+distributor="$service_layer_rpc --enable-distributor"
+
+echo "Running test: spi_only"
+$spi_only
+#echo "Running test: chain_base"
+#$chain_base
+echo "Running test: chain_stripe"
+$chain_stripe
+echo "Running test: chain_stripe_async"
+$chain_stripe_async
+echo "Running test: service_layer_rpc"
+$service_layer_rpc
+#echo "Running test: service_layer_mbus"
+#$service_layer_mbus
+echo "Running test: distributor"
+$distributor
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
index 7082b9bfef2..a55638651d6 100644
--- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
@@ -280,7 +280,6 @@ class BMParams {
bool _use_document_api;
bool _use_message_bus;
bool _use_storage_chain;
- bool _use_legacy_bucket_db;
bool _use_async_message_handling_on_schedule;
uint32_t _bucket_db_stripe_bits;
uint32_t get_start(uint32_t thread_id) const {
@@ -305,7 +304,6 @@ public:
_use_document_api(false),
_use_message_bus(false),
_use_storage_chain(false),
- _use_legacy_bucket_db(false),
_use_async_message_handling_on_schedule(false),
_bucket_db_stripe_bits(0)
{
@@ -329,7 +327,6 @@ public:
bool get_use_document_api() const { return _use_document_api; }
bool get_use_message_bus() const { return _use_message_bus; }
bool get_use_storage_chain() const { return _use_storage_chain; }
- bool get_use_legacy_bucket_db() const { return _use_legacy_bucket_db; }
bool get_use_async_message_handling_on_schedule() const { return _use_async_message_handling_on_schedule; }
uint32_t get_bucket_db_stripe_bits() const { return _bucket_db_stripe_bits; }
void set_documents(uint32_t documents_in) { _documents = documents_in; }
@@ -349,7 +346,6 @@ public:
void set_use_document_api(bool value) { _use_document_api = value; }
void set_use_message_bus(bool value) { _use_message_bus = value; }
void set_use_storage_chain(bool value) { _use_storage_chain = value; }
- void set_use_legacy_bucket_db(bool value) { _use_legacy_bucket_db = value; }
void set_use_async_message_handling_on_schedule(bool value) { _use_async_message_handling_on_schedule = value; }
void set_bucket_db_stripe_bits(uint32_t value) { _bucket_db_stripe_bits = value; }
bool check() const;
@@ -490,7 +486,6 @@ struct MyStorageConfig
dc.readyCopies = 1;
}
stor_server.isDistributor = distributor;
- stor_server.useContentNodeBtreeBucketDb = !params.get_use_legacy_bucket_db();
stor_server.contentNodeBucketDbStripeBits = params.get_bucket_db_stripe_bits();
if (distributor) {
stor_server.rootFolder = "distributor";
@@ -1388,7 +1383,6 @@ App::usage()
"[--enable-service-layer]\n"
"[--skip-get-spi-bucket-info]\n"
"[--use-document-api]\n"
- "[--use-legacy-bucket-db]\n"
"[--use-async-message-handling]\n"
"[--use-message-bus\n"
"[--use-storage-chain]" << std::endl;
@@ -1418,7 +1412,6 @@ App::get_options()
{ "update-passes", 1, nullptr, 0 },
{ "use-async-message-handling", 0, nullptr, 0 },
{ "use-document-api", 0, nullptr, 0 },
- { "use-legacy-bucket-db", 0, nullptr, 0 },
{ "use-message-bus", 0, nullptr, 0 },
{ "use-storage-chain", 0, nullptr, 0 }
};
@@ -1440,7 +1433,6 @@ App::get_options()
LONGOPT_UPDATE_PASSES,
LONGOPT_USE_ASYNC_MESSAGE_HANDLING,
LONGOPT_USE_DOCUMENT_API,
- LONGOPT_USE_LEGACY_BUCKET_DB,
LONGOPT_USE_MESSAGE_BUS,
LONGOPT_USE_STORAGE_CHAIN
};
@@ -1501,9 +1493,6 @@ App::get_options()
case LONGOPT_USE_DOCUMENT_API:
_bm_params.set_use_document_api(true);
break;
- case LONGOPT_USE_LEGACY_BUCKET_DB:
- _bm_params.set_use_legacy_bucket_db(true);
- break;
case LONGOPT_USE_MESSAGE_BUS:
_bm_params.set_use_message_bus(true);
break;
diff --git a/searchcore/src/tests/proton/bucketdb/bucketdb/bucketdb_test.cpp b/searchcore/src/tests/proton/bucketdb/bucketdb/bucketdb_test.cpp
index 998b1626aeb..aff5573e4e0 100644
--- a/searchcore/src/tests/proton/bucketdb/bucketdb/bucketdb_test.cpp
+++ b/searchcore/src/tests/proton/bucketdb/bucketdb/bucketdb_test.cpp
@@ -136,6 +136,10 @@ TEST_F("require that bucket checksum is a combination of sub db types", Fixture)
EXPECT_EQUAL(zero, f.remove(TIME_3, SDT::REMOVED).getChecksum());
}
+TEST("require that BucketState follows checksum type") {
+ EXPECT_EQUAL(48u, sizeof(BucketState));
+}
+
TEST_F("require that bucket is ready when not having docs in notready sub db", Fixture)
{
assertReady(true, f.get());
@@ -222,47 +226,47 @@ verifyChecksumCompliance(ChecksumAggregator::ChecksumType type) {
GlobalId gid2("bbbbbbbbbbbb");
Timestamp t1(0);
Timestamp t2(1);
- auto ckaggr = ChecksumAggregator::create(type, BucketChecksum(0));
+ BucketState::setChecksumType(type);
+ BucketState bs;
- EXPECT_EQUAL(0u, ckaggr->getChecksum());
- ckaggr->addDoc(gid1, t1);
- BucketChecksum afterAdd = ckaggr->getChecksum();
+ EXPECT_EQUAL(0u, bs.getChecksum());
+ bs.add(gid1, t1, 1, SubDbType::READY);
+ BucketChecksum afterAdd = bs.getChecksum();
EXPECT_NOT_EQUAL(0u, afterAdd); // add Changes checksum
- ckaggr->removeDoc(gid1, t1);
- EXPECT_EQUAL(0u, ckaggr->getChecksum()); // add/remove are symmetrical
- ckaggr->addDoc(gid1, t2);
- EXPECT_NOT_EQUAL(afterAdd, ckaggr->getChecksum()); // timestamp changes checksum
- ckaggr->removeDoc(gid1, t2);
- EXPECT_EQUAL(0u, ckaggr->getChecksum()); // add/remove are symmetrical
- ckaggr->addDoc(gid2, t1);
- EXPECT_NOT_EQUAL(afterAdd, ckaggr->getChecksum()); // gid changes checksum
- ckaggr->removeDoc(gid2, t1);
- EXPECT_EQUAL(0u, ckaggr->getChecksum()); // add/remove are symmetrical
+ bs.remove(gid1, t1, 1, SubDbType::READY);
+ EXPECT_EQUAL(0u, bs.getChecksum()); // add/remove are symmetrical
+ bs.add(gid1, t2, 1, SubDbType::READY);
+ EXPECT_NOT_EQUAL(afterAdd, bs.getChecksum()); // timestamp changes checksum
+ bs.remove(gid1, t2, 1, SubDbType::READY);
+ EXPECT_EQUAL(0u, bs.getChecksum()); // add/remove are symmetrical
+ bs.add(gid2, t1, 1, SubDbType::READY);
+ EXPECT_NOT_EQUAL(afterAdd, bs.getChecksum()); // gid changes checksum
+ bs.remove(gid2, t1, 1, SubDbType::READY);
+ EXPECT_EQUAL(0u, bs.getChecksum()); // add/remove are symmetrical
{
// Verify order does not matter, only current content. A,B == B,A
- ckaggr->addDoc(gid1, t1);
- BucketChecksum after1AddOfGid1 = ckaggr->getChecksum();
- ckaggr->addDoc(gid2, t2);
- BucketChecksum after2Add1 = ckaggr->getChecksum();
- ckaggr->removeDoc(gid2, t2);
- EXPECT_EQUAL(after1AddOfGid1, ckaggr->getChecksum());
- ckaggr->removeDoc(gid1, t1);
- EXPECT_EQUAL(0u, ckaggr->getChecksum());
+ bs.add(gid1, t1, 1, SubDbType::READY);
+ BucketChecksum after1AddOfGid1 = bs.getChecksum();
+ bs.add(gid2, t2, 1, SubDbType::READY);
+ BucketChecksum after2Add1 = bs.getChecksum();
+ bs.remove(gid2, t2, 1, SubDbType::READY);
+ EXPECT_EQUAL(after1AddOfGid1, bs.getChecksum());
+ bs.remove(gid1, t1, 1, SubDbType::READY);
+ EXPECT_EQUAL(0u, bs.getChecksum());
- ckaggr->addDoc(gid2, t2);
- EXPECT_NOT_EQUAL(after1AddOfGid1, ckaggr->getChecksum());
- ckaggr->addDoc(gid1, t1);
- EXPECT_EQUAL(after2Add1, ckaggr->getChecksum());
- ckaggr->removeDoc(gid2, t2);
- EXPECT_EQUAL(after1AddOfGid1, ckaggr->getChecksum());
- ckaggr->removeDoc(gid1, t1);
- EXPECT_EQUAL(0u, ckaggr->getChecksum()); // add/remove are symmetrical
+ bs.add(gid2, t2, 1, SubDbType::READY);
+ EXPECT_NOT_EQUAL(after1AddOfGid1, bs.getChecksum());
+ bs.add(gid1, t1, 1, SubDbType::READY);
+ EXPECT_EQUAL(after2Add1, bs.getChecksum());
+ bs.remove(gid2, t2, 1, SubDbType::READY);
+ EXPECT_EQUAL(after1AddOfGid1, bs.getChecksum());
+ bs.remove(gid1, t1, 1, SubDbType::READY);
+ EXPECT_EQUAL(0u, bs.getChecksum()); // add/remove are symmetrical
}
- ckaggr->addDoc(gid1, t1); // Add something so we can verify it does not change between releases.
-
- return ckaggr->getChecksum();
+ bs.add(gid1, t1, 1, SubDbType::READY); // Add something so we can verify it does not change between releases.
+ return bs.getChecksum();
}
TEST("test that legacy checksum complies") {
diff --git a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
index 54fe88050b5..b875ab8e058 100644
--- a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
@@ -355,7 +355,7 @@ struct MyAttributeWriter : public IAttributeWriter
_updateLid = lid;
for (const auto & fieldUpdate : upd.getUpdates()) {
search::AttributeVector * attr = getWritableAttribute(fieldUpdate.getField().getName());
- onUpdate.onUpdateField(fieldUpdate.getField(), attr);
+ onUpdate.onUpdateField(fieldUpdate.getField().getName(), attr);
}
}
void update(SerialNum serialNum, const document::Document &doc, DocumentIdT lid, OnWriteDoneType) override {
diff --git a/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp b/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp
index 60d96e37e15..db0f26a9d55 100644
--- a/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp
+++ b/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp
@@ -759,7 +759,6 @@ TEST(DocumentMetaStoreTest, can_sort_gids)
}
}
-template <typename ChecksumType>
void
requireThatBasicBucketInfoWorks()
{
@@ -773,8 +772,7 @@ requireThatBasicBucketInfoWorks()
GlobalId gid = createGid(lid);
Timestamp timestamp(UINT64_C(123456789) * lid);
Timestamp oldTimestamp;
- BucketId bucketId(minNumBits,
- gid.convertToBucketId().getRawId());
+ BucketId bucketId(minNumBits, gid.convertToBucketId().getRawId());
uint32_t addLid = addGid(dms, gid, bucketId, timestamp);
EXPECT_EQ(lid, addLid);
m[std::make_pair(bucketId, gid)] = timestamp;
@@ -783,37 +781,35 @@ requireThatBasicBucketInfoWorks()
GlobalId gid = createGid(lid);
Timestamp timestamp(UINT64_C(14735) * lid);
Timestamp oldTimestamp;
- BucketId bucketId(minNumBits,
- gid.convertToBucketId().getRawId());
+ BucketId bucketId(minNumBits, gid.convertToBucketId().getRawId());
uint32_t addLid = addGid(dms, gid, bucketId, timestamp);
EXPECT_EQ(lid, addLid);
m[std::make_pair(bucketId, gid)] = timestamp;
}
for (uint32_t lid = 3; lid <= numLids; lid += 5) {
GlobalId gid = createGid(lid);
- BucketId bucketId(minNumBits,
- gid.convertToBucketId().getRawId());
+ BucketId bucketId(minNumBits, gid.convertToBucketId().getRawId());
EXPECT_TRUE(dms.remove(lid, 0u));
dms.removeComplete(lid);
m.erase(std::make_pair(bucketId, gid));
}
assert(!m.empty());
- ChecksumType cksum(BucketChecksum(0));
+ BucketState cksum;
BucketId prevBucket = m.begin()->first.first;
uint32_t cnt = 0u;
uint32_t maxcnt = 0u;
BucketDBOwner::Guard bucketDB = dms.getBucketDB().takeGuard();
for (Map::const_iterator i = m.begin(), ie = m.end(); i != ie; ++i) {
if (i->first.first == prevBucket) {
- cksum.addDoc(i->first.second, i->second);
+ cksum.add(i->first.second, i->second, 1, SubDbType::READY);
++cnt;
} else {
BucketInfo bi = bucketDB->get(prevBucket);
EXPECT_EQ(cnt, bi.getDocumentCount());
EXPECT_EQ(cksum.getChecksum(), bi.getChecksum());
prevBucket = i->first.first;
- cksum = ChecksumType(BucketChecksum(0));
- cksum.addDoc(i->first.second, i->second);
+ cksum = BucketState();
+ cksum.add(i->first.second, i->second, 1, SubDbType::READY);
maxcnt = std::max(maxcnt, cnt);
cnt = 1u;
}
@@ -828,9 +824,9 @@ requireThatBasicBucketInfoWorks()
TEST(DocumentMetaStoreTest, basic_bucket_info_works)
{
BucketState::setChecksumType(BucketState::ChecksumType::LEGACY);
- requireThatBasicBucketInfoWorks<LegacyChecksumAggregator>();
+ requireThatBasicBucketInfoWorks();
BucketState::setChecksumType(BucketState::ChecksumType::XXHASH64);
- requireThatBasicBucketInfoWorks<XXH64ChecksumAggregator>();
+ requireThatBasicBucketInfoWorks();
}
TEST(DocumentMetaStoreTest, can_retrieve_list_of_lids_from_bucket_id)
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
index ee7f9d0c851..42971fe3d4c 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
@@ -680,7 +680,7 @@ AttributeWriter::update(SerialNum serialNum, const DocumentUpdate &upd, Document
LOG(debug, "Retrieving guard for attribute vector '%s'.", fupd.getField().getName().data());
auto found = _attrMap.find(fupd.getField().getName());
AttributeVector * attrp = (found != _attrMap.end()) ? found->second.first : nullptr;
- onUpdate.onUpdateField(fupd.getField(), attrp);
+ onUpdate.onUpdateField(fupd.getField().getName(), attrp);
if (__builtin_expect(attrp == nullptr, false)) {
LOG(spam, "Failed to find attribute vector %s", fupd.getField().getName().data());
continue;
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/ifieldupdatecallback.h b/searchcore/src/vespa/searchcore/proton/attribute/ifieldupdatecallback.h
index c7453d618ff..ffb8555cd2c 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/ifieldupdatecallback.h
+++ b/searchcore/src/vespa/searchcore/proton/attribute/ifieldupdatecallback.h
@@ -5,17 +5,16 @@
#include <vespa/vespalib/stllike/string.h>
namespace search { class AttributeVector; }
-namespace document { class Field; }
namespace proton {
struct IFieldUpdateCallback {
- virtual ~IFieldUpdateCallback() = default;
- virtual void onUpdateField(const document::Field & field, const search::AttributeVector * attr) = 0;
+ virtual ~IFieldUpdateCallback() { }
+ virtual void onUpdateField(vespalib::stringref fieldName, const search::AttributeVector * attr) = 0;
};
struct DummyFieldUpdateCallback : IFieldUpdateCallback {
- void onUpdateField(const document::Field & , const search::AttributeVector *) override {}
+ void onUpdateField(vespalib::stringref, const search::AttributeVector *) override {}
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt
index 375515938b3..6619f9e419d 100644
--- a/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt
@@ -8,7 +8,6 @@ vespa_add_library(searchcore_bucketdb STATIC
bucketdbhandler.cpp
bucketsessionbase.cpp
bucketstate.cpp
- checksumaggregator.cpp
checksumaggregators.cpp
joinbucketssession.cpp
splitbucketsession.cpp
diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.cpp
index cea30680faf..0eb9c27d9cf 100644
--- a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.cpp
+++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.cpp
@@ -18,11 +18,6 @@ toIdx(SubDbType subDbType) {
BucketState::ChecksumType BucketState::_checksumType = BucketState::ChecksumType::LEGACY;
-std::unique_ptr<ChecksumAggregator>
-BucketState::createChecksum(BucketChecksum seed) {
- return ChecksumAggregator::create(_checksumType, seed);
-}
-
void
BucketState::setChecksumType(ChecksumType type) {
_checksumType = type;
@@ -34,11 +29,12 @@ BucketState::BucketState(BucketState && rhs) noexcept = default;
BucketState & BucketState::operator=(BucketState && rhs) noexcept = default;
BucketState::BucketState()
- : _docCount(),
+ : _ch(),
_docSizes(),
- _checksum(createChecksum(BucketChecksum(0))),
+ _docCount(),
_active(false)
{
+ memset(&_ch, 0, sizeof(_ch));
for (uint32_t i = 0; i < COUNTS; ++i) {
_docCount[i] = 0;
_docSizes[i] = 0;
@@ -46,8 +42,25 @@ BucketState::BucketState()
}
BucketState::BucketChecksum
+BucketState::getChecksum() const {
+ switch (_checksumType) {
+ case ChecksumAggregator::ChecksumType::LEGACY:
+ return LegacyChecksumAggregator::get(_ch._legacy);
+ case ChecksumAggregator::ChecksumType::XXHASH64:
+ return XXH64ChecksumAggregator::get(_ch._xxh64);
+ }
+ abort();
+}
+
+BucketState::BucketChecksum
BucketState::addChecksum(BucketChecksum a, BucketChecksum b) {
- return createChecksum(a)->addChecksum(*createChecksum(b)).getChecksum();
+ switch (_checksumType) {
+ case ChecksumAggregator::ChecksumType::LEGACY:
+ return LegacyChecksumAggregator::get(LegacyChecksumAggregator::add(b, a));
+ case ChecksumAggregator::ChecksumType::XXHASH64:
+ return XXH64ChecksumAggregator::get(XXH64ChecksumAggregator::update(b, a));
+ }
+ abort();
}
void
@@ -55,7 +68,14 @@ BucketState::add(const GlobalId &gid, const Timestamp &timestamp, uint32_t docSi
{
assert(subDbType < SubDbType::COUNT);
if (subDbType != SubDbType::REMOVED) {
- _checksum->addDoc(gid, timestamp);
+ switch (_checksumType) {
+ case ChecksumAggregator::ChecksumType::LEGACY:
+ _ch._legacy = LegacyChecksumAggregator::addDoc(gid, timestamp, _ch._legacy);
+ break;
+ case ChecksumAggregator::ChecksumType::XXHASH64:
+ _ch._xxh64 = XXH64ChecksumAggregator::update(gid, timestamp, _ch._xxh64);
+ break;
+ }
}
uint32_t subDbTypeIdx = toIdx(subDbType);
++_docCount[subDbTypeIdx];
@@ -70,7 +90,14 @@ BucketState::remove(const GlobalId &gid, const Timestamp &timestamp, uint32_t do
assert(_docCount[subDbTypeIdx] > 0);
assert(_docSizes[subDbTypeIdx] >= docSize);
if (subDbType != SubDbType::REMOVED) {
- _checksum->removeDoc(gid, timestamp);
+ switch (_checksumType) {
+ case ChecksumAggregator::ChecksumType::LEGACY:
+ _ch._legacy = LegacyChecksumAggregator::removeDoc(gid, timestamp, _ch._legacy);
+ break;
+ case ChecksumAggregator::ChecksumType::XXHASH64:
+ _ch._xxh64 = XXH64ChecksumAggregator::update(gid, timestamp, _ch._xxh64);
+ break;
+ }
}
--_docCount[subDbTypeIdx];
_docSizes[subDbTypeIdx] -= docSize;
@@ -87,8 +114,16 @@ BucketState::modify(const GlobalId &gid,
assert(_docCount[subDbTypeIdx] > 0);
assert(_docSizes[subDbTypeIdx] >= oldDocSize);
if (subDbType != SubDbType::REMOVED) {
- _checksum->removeDoc(gid, oldTimestamp);
- _checksum->addDoc(gid, newTimestamp);
+ switch (_checksumType) {
+ case ChecksumAggregator::ChecksumType::LEGACY:
+ _ch._legacy = LegacyChecksumAggregator::removeDoc(gid, oldTimestamp, _ch._legacy);
+ _ch._legacy = LegacyChecksumAggregator::addDoc(gid, newTimestamp, _ch._legacy);
+ break;
+ case ChecksumAggregator::ChecksumType::XXHASH64:
+ _ch._xxh64 = XXH64ChecksumAggregator::update(gid, oldTimestamp, _ch._xxh64);
+ _ch._xxh64 = XXH64ChecksumAggregator::update(gid, newTimestamp, _ch._xxh64);
+ break;
+ }
}
_docSizes[subDbTypeIdx] = _docSizes[subDbTypeIdx] + newDocSize - oldDocSize;
}
@@ -99,7 +134,7 @@ BucketState::empty() const
if (getReadyCount() != 0 || getRemovedCount() != 0 ||
getNotReadyCount() != 0)
return false;
- assert(_checksum->empty());
+ assert((_checksumType == ChecksumAggregator::ChecksumType::LEGACY) ? (_ch._legacy == 0) : (_ch._xxh64 == 0));
for (uint32_t i = 0; i < COUNTS; ++i) {
assert(_docSizes[i] == 0);
}
@@ -115,7 +150,15 @@ BucketState::operator+=(const BucketState &rhs)
for (uint32_t i = 0; i < COUNTS; ++i) {
_docSizes[i] += rhs._docSizes[i];
}
- _checksum->addChecksum(*rhs._checksum);
+
+ switch (_checksumType) {
+ case ChecksumAggregator::ChecksumType::LEGACY:
+ _ch._legacy = LegacyChecksumAggregator::add(rhs._ch._legacy, _ch._legacy);
+ break;
+ case ChecksumAggregator::ChecksumType::XXHASH64:
+ _ch._xxh64 = XXH64ChecksumAggregator::update(rhs._ch._xxh64, _ch._xxh64);
+ break;
+ }
return *this;
}
@@ -134,7 +177,14 @@ BucketState::operator-=(const BucketState &rhs)
for (uint32_t i = 0; i < COUNTS; ++i) {
_docSizes[i] -= rhs._docSizes[i];
}
- _checksum->removeChecksum(*rhs._checksum);
+ switch (_checksumType) {
+ case ChecksumAggregator::ChecksumType::LEGACY:
+ _ch._legacy = LegacyChecksumAggregator::remove(rhs._ch._legacy, _ch._legacy);
+ break;
+ case ChecksumAggregator::ChecksumType::XXHASH64:
+ _ch._xxh64 = XXH64ChecksumAggregator::update(rhs._ch._xxh64, _ch._xxh64);
+ break;
+ }
return *this;
}
diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.h b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.h
index 8c390b288ae..7f16500fad0 100644
--- a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.h
+++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.h
@@ -26,9 +26,9 @@ private:
static constexpr uint32_t REMOVED = static_cast<uint32_t>(SubDbType::REMOVED);
static constexpr uint32_t NOTREADY = static_cast<uint32_t>(SubDbType::NOTREADY);
static constexpr uint32_t COUNTS = static_cast<uint32_t>(SubDbType::COUNT);
- uint32_t _docCount[COUNTS];
+ union { uint32_t _legacy; uint64_t _xxh64;} _ch;
size_t _docSizes[COUNTS];
- vespalib::CloneablePtr<ChecksumAggregator> _checksum;
+ uint32_t _docCount[COUNTS];
bool _active;
static ChecksumType _checksumType;
@@ -67,7 +67,7 @@ public:
size_t getNotReadyDocSizes() const { return _docSizes[NOTREADY]; }
uint32_t getDocumentCount() const { return getReadyCount() + getNotReadyCount(); }
uint32_t getEntryCount() const { return getDocumentCount() + getRemovedCount(); }
- BucketChecksum getChecksum() const { return _checksum->getChecksum(); }
+ BucketChecksum getChecksum() const;
bool empty() const;
BucketState &operator+=(const BucketState &rhs);
BucketState &operator-=(const BucketState &rhs);
diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.cpp
deleted file mode 100644
index 795d88ab106..00000000000
--- a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.cpp
+++ /dev/null
@@ -1,18 +0,0 @@
-// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "checksumaggregators.h"
-
-namespace proton::bucketdb {
-
-std::unique_ptr<ChecksumAggregator>
-ChecksumAggregator::create(ChecksumType type, BucketChecksum seed) {
- switch (type) {
- case ChecksumType::LEGACY:
- return std::make_unique<LegacyChecksumAggregator>(seed);
- case ChecksumType::XXHASH64:
- return std::make_unique<XXH64ChecksumAggregator>(seed);
- }
- abort();
-}
-
-}
diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.h b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.h
index 421cca5c6b5..b206f11f02f 100644
--- a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.h
+++ b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.h
@@ -16,15 +16,6 @@ public:
using GlobalId = document::GlobalId;
using Timestamp = storage::spi::Timestamp;
using BucketChecksum = storage::spi::BucketChecksum;
- virtual ~ChecksumAggregator() = default;
- virtual ChecksumAggregator & addDoc(const GlobalId &gid, const Timestamp &timestamp) = 0;
- virtual ChecksumAggregator & removeDoc(const GlobalId &gid, const Timestamp &timestamp) = 0;
- virtual ChecksumAggregator & addChecksum(const ChecksumAggregator & rhs) = 0;
- virtual ChecksumAggregator & removeChecksum(const ChecksumAggregator & rhs) = 0;
- virtual BucketChecksum getChecksum() const = 0;
- virtual bool empty() const = 0;;
- virtual ChecksumAggregator * clone() const = 0;
- static std::unique_ptr<ChecksumAggregator> create(ChecksumType type, BucketChecksum seed);
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.cpp
index a118f416b6b..9246f94ca83 100644
--- a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.cpp
+++ b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.cpp
@@ -33,79 +33,30 @@ calcChecksum(const GlobalId &gid, const Timestamp &timestamp)
return gidChecksum(gid) + timestampChecksum(timestamp);
}
+uint64_t
+compute(const GlobalId &gid, const Timestamp &timestamp) {
+ char buffer[20];
+ memcpy(&buffer[0], gid.get(), GlobalId::LENGTH);
+ uint64_t tmp = timestamp.getValue();
+ memcpy(&buffer[GlobalId::LENGTH], &tmp, sizeof(tmp));
+ return XXH64(buffer, sizeof(buffer), 0);
}
-LegacyChecksumAggregator *
-LegacyChecksumAggregator::clone() const {
- return new LegacyChecksumAggregator(*this);
-}
-LegacyChecksumAggregator &
-LegacyChecksumAggregator::addDoc(const GlobalId &gid, const Timestamp &timestamp) {
- _checksum += calcChecksum(gid, timestamp);
- return *this;
}
-LegacyChecksumAggregator &
-LegacyChecksumAggregator::removeDoc(const GlobalId &gid, const Timestamp &timestamp) {
- _checksum -= calcChecksum(gid, timestamp);
- return *this;
-}
-LegacyChecksumAggregator &
-LegacyChecksumAggregator::addChecksum(const ChecksumAggregator & rhs) {
- _checksum += dynamic_cast<const LegacyChecksumAggregator &>(rhs)._checksum;
- return *this;
-}
-LegacyChecksumAggregator &
-LegacyChecksumAggregator::removeChecksum(const ChecksumAggregator & rhs) {
- _checksum -= dynamic_cast<const LegacyChecksumAggregator &>(rhs)._checksum;
- return *this;
-}
-BucketChecksum
-LegacyChecksumAggregator::getChecksum() const {
- return BucketChecksum(_checksum);
-}
-bool
-LegacyChecksumAggregator::empty() const { return _checksum == 0; }
-
-
-XXH64ChecksumAggregator *
-XXH64ChecksumAggregator::clone() const {
- return new XXH64ChecksumAggregator(*this);
-}
-XXH64ChecksumAggregator &
-XXH64ChecksumAggregator::addDoc(const GlobalId &gid, const Timestamp &timestamp) {
- _checksum ^= compute(gid, timestamp);
- return *this;
-}
-XXH64ChecksumAggregator &
-XXH64ChecksumAggregator::removeDoc(const GlobalId &gid, const Timestamp &timestamp) {
- _checksum ^= compute(gid, timestamp);
- return *this;
-}
-XXH64ChecksumAggregator &
-XXH64ChecksumAggregator::addChecksum(const ChecksumAggregator & rhs) {
- _checksum ^= dynamic_cast<const XXH64ChecksumAggregator &>(rhs)._checksum;
- return *this;
-}
-XXH64ChecksumAggregator &
-XXH64ChecksumAggregator::removeChecksum(const ChecksumAggregator & rhs) {
- _checksum ^= dynamic_cast<const XXH64ChecksumAggregator &>(rhs)._checksum;
- return *this;
+uint32_t
+LegacyChecksumAggregator::addDoc(const GlobalId &gid, const Timestamp &timestamp, uint32_t checkSum) {
+ return add(calcChecksum(gid, timestamp), checkSum);
}
-BucketChecksum
-XXH64ChecksumAggregator::getChecksum() const {
- return BucketChecksum((_checksum >> 32) ^ (_checksum & 0xffffffffL));
+
+uint32_t
+LegacyChecksumAggregator::removeDoc(const GlobalId &gid, const Timestamp &timestamp, uint32_t checkSum) {
+ return remove(calcChecksum(gid, timestamp), checkSum);
}
-bool
-XXH64ChecksumAggregator::empty() const { return _checksum == 0; }
uint64_t
-XXH64ChecksumAggregator::compute(const GlobalId &gid, const Timestamp &timestamp) {
- char buffer[20];
- memcpy(&buffer[0], gid.get(), GlobalId::LENGTH);
- uint64_t tmp = timestamp.getValue();
- memcpy(&buffer[GlobalId::LENGTH], &tmp, sizeof(tmp));
- return XXH64(buffer, sizeof(buffer), 0);
+XXH64ChecksumAggregator::update(const GlobalId &gid, const Timestamp &timestamp, uint64_t checkSum) {
+ return update(compute(gid, timestamp), checkSum);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.h b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.h
index 49762ad107f..1c0c2bc8f7a 100644
--- a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.h
+++ b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.h
@@ -9,16 +9,11 @@ namespace proton::bucketdb {
**/
class LegacyChecksumAggregator : public ChecksumAggregator {
public:
- explicit LegacyChecksumAggregator(BucketChecksum seed) : _checksum(seed) { }
- LegacyChecksumAggregator * clone() const override;
- LegacyChecksumAggregator & addDoc(const GlobalId &gid, const Timestamp &timestamp) override;
- LegacyChecksumAggregator & removeDoc(const GlobalId &gid, const Timestamp &timestamp) override;
- LegacyChecksumAggregator & addChecksum(const ChecksumAggregator & rhs) override;
- LegacyChecksumAggregator & removeChecksum(const ChecksumAggregator & rhs) override;
- BucketChecksum getChecksum() const override;
- bool empty() const override;
-private:
- uint32_t _checksum;
+ static uint32_t addDoc(const GlobalId &gid, const Timestamp &timestamp, uint32_t checkSum);
+ static uint32_t removeDoc(const GlobalId &gid, const Timestamp &timestamp, uint32_t checkSum);
+ static uint32_t add(uint32_t checksum, uint32_t aggr) { return aggr + checksum; }
+ static uint32_t remove(uint32_t checksum, uint32_t aggr) { return aggr - checksum; }
+ static BucketChecksum get(uint32_t checkSum) { return BucketChecksum(checkSum); }
};
/**
@@ -26,17 +21,11 @@ private:
**/
class XXH64ChecksumAggregator : public ChecksumAggregator {
public:
- explicit XXH64ChecksumAggregator(BucketChecksum seed) : _checksum(seed) { }
- XXH64ChecksumAggregator * clone() const override;
- XXH64ChecksumAggregator & addDoc(const GlobalId &gid, const Timestamp &timestamp) override;
- XXH64ChecksumAggregator & removeDoc(const GlobalId &gid, const Timestamp &timestamp) override;
- XXH64ChecksumAggregator & addChecksum(const ChecksumAggregator & rhs) override;
- XXH64ChecksumAggregator & removeChecksum(const ChecksumAggregator & rhs) override;
- BucketChecksum getChecksum() const override;
- bool empty() const override;
-private:
- static uint64_t compute(const GlobalId &gid, const Timestamp &timestamp);
- uint64_t _checksum;
+ static uint64_t update(const GlobalId &gid, const Timestamp &timestamp, uint64_t checkSum);
+ static uint64_t update(uint64_t a, uint64_t b) { return a ^ b; }
+ static BucketChecksum get(uint64_t checkSum) {
+ return BucketChecksum((checkSum >> 32) ^ (checkSum & 0xffffffffL));
+ }
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp
index fdacf59fa02..7cfad4f1ac1 100644
--- a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp
@@ -34,7 +34,7 @@ SearchableFeedView::SearchableFeedView(StoreOnlyFeedView::Context storeOnlyCtx,
const FastAccessFeedView::Context &fastUpdateCtx, Context ctx)
: Parent(std::move(storeOnlyCtx), params, fastUpdateCtx),
_indexWriter(ctx._indexWriter),
- _hasIndexedFields(getSchema()->getNumIndexFields() > 0)
+ _hasIndexedFields(_schema->getNumIndexFields() > 0)
{ }
SearchableFeedView::~SearchableFeedView() = default;
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
index 89e765e2e2b..bf357188766 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
@@ -8,14 +8,14 @@
#include "remove_batch_done_context.h"
#include "removedonecontext.h"
#include "updatedonecontext.h"
+#include <vespa/document/datatype/documenttype.h>
+#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/searchcore/proton/attribute/ifieldupdatecallback.h>
#include <vespa/searchcore/proton/common/feedtoken.h>
#include <vespa/searchcore/proton/feedoperation/operations.h>
#include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h>
#include <vespa/searchlib/common/gatecallback.h>
-#include <vespa/document/datatype/documenttype.h>
-#include <vespa/document/fieldvalue/document.h>
-#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <vespa/vespalib/util/exceptions.h>
@@ -148,8 +148,8 @@ SummaryPutDoneContext::SummaryPutDoneContext(FeedToken token, IPendingLidTracker
SummaryPutDoneContext::~SummaryPutDoneContext() = default;
-std::vector<document::GlobalId>
-getGidsToRemove(const IDocumentMetaStore &metaStore, const LidVectorContext::LidVector &lidsToRemove)
+std::vector<document::GlobalId> getGidsToRemove(const IDocumentMetaStore &metaStore,
+ const LidVectorContext::LidVector &lidsToRemove)
{
std::vector<document::GlobalId> gids;
gids.reserve(lidsToRemove.size());
@@ -162,9 +162,8 @@ getGidsToRemove(const IDocumentMetaStore &metaStore, const LidVectorContext::Lid
return gids;
}
-void
-putMetaData(documentmetastore::IStore &meta_store, const DocumentId & doc_id,
- const DocumentOperation &op, bool is_removed_doc)
+void putMetaData(documentmetastore::IStore &meta_store, const DocumentId & doc_id,
+ const DocumentOperation &op, bool is_removed_doc)
{
documentmetastore::IStore::Result putRes(
meta_store.put(doc_id.getGlobalId(),
@@ -178,9 +177,8 @@ putMetaData(documentmetastore::IStore &meta_store, const DocumentId & doc_id,
assert(op.getLid() == putRes._lid);
}
-void
-removeMetaData(documentmetastore::IStore &meta_store, const GlobalId & gid, const DocumentId &doc_id,
- const DocumentOperation &op, bool is_removed_doc)
+void removeMetaData(documentmetastore::IStore &meta_store, const GlobalId & gid, const DocumentId &doc_id,
+ const DocumentOperation &op, bool is_removed_doc)
{
assert(meta_store.validLid(op.getPrevLid()));
assert(is_removed_doc == op.getPrevMarkedAsRemoved());
@@ -209,37 +207,6 @@ moveMetaData(documentmetastore::IStore &meta_store, const DocumentId & doc_id, c
meta_store.move(op.getPrevLid(), op.getLid(), op.get_prepare_serial_num());
}
-class UpdateScope final : public IFieldUpdateCallback
-{
-private:
- const vespalib::hash_set<int32_t> & _indexedFields;
- bool _nonAttributeFields;
-public:
- bool _hasIndexedFields;
-
- UpdateScope(const vespalib::hash_set<int32_t> & indexedFields, const DocumentUpdate & upd);
- bool hasIndexOrNonAttributeFields() const {
- return _hasIndexedFields || _nonAttributeFields;
- }
- void onUpdateField(const document::Field & field, const search::AttributeVector * attr) override;
-};
-
-UpdateScope::UpdateScope(const vespalib::hash_set<int32_t> & indexedFields, const DocumentUpdate & upd)
- : _indexedFields(indexedFields),
- _nonAttributeFields(!upd.getFieldPathUpdates().empty()),
- _hasIndexedFields(false)
-{}
-
-void
-UpdateScope::onUpdateField(const document::Field & field, const search::AttributeVector * attr) {
- if (!_nonAttributeFields && (attr == nullptr || !attr->isUpdateableInMemoryOnly())) {
- _nonAttributeFields = true;
- }
- if (!_hasIndexedFields && (_indexedFields.find(field.getId()) != _indexedFields.end())) {
- _hasIndexedFields = true;
- }
-}
-
} // namespace
StoreOnlyFeedView::StoreOnlyFeedView(Context ctx, const PersistentParams &params)
@@ -253,18 +220,12 @@ StoreOnlyFeedView::StoreOnlyFeedView(Context ctx, const PersistentParams &params
_pendingLidsForDocStore(),
_pendingLidsForCommit(std::move(ctx._pendingLidsForCommit)),
_schema(std::move(ctx._schema)),
- _indexedFields(),
_writeService(ctx._writeService),
_params(params),
_metaStore(_documentMetaStoreContext->get()),
_gidToLidChangeHandler(ctx._gidToLidChangeHandler)
{
_docType = _repo->getDocumentType(_params._docTypeName.getName());
- if (_schema && _docType) {
- for (const auto &indexField : _schema->getIndexFields()) {
- _indexedFields.insert(_docType->getField(indexField.getName()).getId());
- }
- }
}
StoreOnlyFeedView::~StoreOnlyFeedView() = default;
@@ -378,7 +339,7 @@ StoreOnlyFeedView::updateAttributes(SerialNum, Lid, const DocumentUpdate & upd,
OnOperationDoneType, IFieldUpdateCallback & onUpdate)
{
for (const auto & fieldUpdate : upd.getUpdates()) {
- onUpdate.onUpdateField(fieldUpdate.getField(), nullptr);
+ onUpdate.onUpdateField(fieldUpdate.getField().getName(), nullptr);
}
}
@@ -447,6 +408,22 @@ void StoreOnlyFeedView::heartBeatSummary(SerialNum serialNum) {
}));
}
+StoreOnlyFeedView::UpdateScope::UpdateScope(const search::index::Schema & schema, const DocumentUpdate & upd)
+ : _schema(&schema),
+ _indexedFields(false),
+ _nonAttributeFields(!upd.getFieldPathUpdates().empty())
+{}
+
+void
+StoreOnlyFeedView::UpdateScope::onUpdateField(vespalib::stringref fieldName, const search::AttributeVector * attr) {
+ if (!_nonAttributeFields && (attr == nullptr || !attr->isUpdateableInMemoryOnly())) {
+ _nonAttributeFields = true;
+ }
+ if (!_indexedFields && _schema->isIndexField(fieldName)) {
+ _indexedFields = true;
+ }
+}
+
void
StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp) {
if ( ! updOp.getUpdate()) {
@@ -478,7 +455,7 @@ StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp)
auto uncommitted = get_pending_lid_token(updOp);
auto onWriteDone = createUpdateDoneContext(std::move(token), std::move(uncommitted), updOp.getUpdate());
- UpdateScope updateScope(_indexedFields, upd);
+ UpdateScope updateScope(*_schema, upd);
updateAttributes(serialNum, lid, upd, onWriteDone, updateScope);
if (updateScope.hasIndexOrNonAttributeFields()) {
@@ -486,7 +463,7 @@ StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp)
FutureDoc futureDoc = promisedDoc.get_future().share();
onWriteDone->setDocument(futureDoc);
_pendingLidsForDocStore.waitComplete(lid);
- if (updateScope._hasIndexedFields) {
+ if (updateScope._indexedFields) {
updateIndexedFields(serialNum, lid, futureDoc, onWriteDone);
}
PromisedStream promisedStream;
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
index c033ae0c43f..da7d5e53a88 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
@@ -9,6 +9,7 @@
#include "searchcontext.h"
#include <vespa/searchcore/proton/common/pendinglidtracker.h>
#include <vespa/searchcore/proton/common/doctypename.h>
+#include <vespa/searchcore/proton/attribute/ifieldupdatecallback.h>
#include <vespa/searchcore/proton/common/feeddebugger.h>
#include <vespa/searchcore/proton/documentmetastore/documentmetastore.h>
#include <vespa/searchcore/proton/documentmetastore/documentmetastorecontext.h>
@@ -18,10 +19,9 @@
#include <vespa/searchcore/proton/reference/pending_notify_remove_done.h>
#include <vespa/searchcorespi/index/ithreadingservice.h>
#include <vespa/searchlib/query/base.h>
-#include <vespa/searchcore/proton/feedoperation/operations.h>
#include <vespa/vespalib/util/threadstackexecutorbase.h>
-#include <vespa/vespalib/stllike/hash_set.h>
#include <future>
+#include <vespa/searchcore/proton/feedoperation/operations.h>
namespace search { class IDestructorCallback; }
@@ -60,6 +60,7 @@ public:
using OnOperationDoneType = const std::shared_ptr<OperationDoneContext> &;
using OnPutDoneType = const std::shared_ptr<PutDoneContext> &;
using OnRemoveDoneType = const std::shared_ptr<RemoveDoneContext> &;
+ using FeedTokenUP = std::unique_ptr<FeedToken>;
using FutureDoc = std::shared_future<std::unique_ptr<const Document>>;
using PromisedDoc = std::promise<std::unique_ptr<const Document>>;
using FutureStream = std::future<vespalib::nbostream>;
@@ -120,6 +121,22 @@ public:
{}
};
+protected:
+ class UpdateScope : public IFieldUpdateCallback
+ {
+ private:
+ const search::index::Schema *_schema;
+ public:
+ bool _indexedFields;
+ bool _nonAttributeFields;
+
+ UpdateScope(const search::index::Schema & schema, const DocumentUpdate & upd);
+ bool hasIndexOrNonAttributeFields() const {
+ return _indexedFields || _nonAttributeFields;
+ }
+ void onUpdateField(vespalib::stringref fieldName, const search::AttributeVector * attr) override;
+ };
+
private:
const ISummaryAdapter::SP _summaryAdapter;
const IDocumentMetaStoreContext::SP _documentMetaStoreContext;
@@ -128,9 +145,9 @@ private:
LidReuseDelayer _lidReuseDelayer;
PendingLidTracker _pendingLidsForDocStore;
std::shared_ptr<PendingLidTrackerBase> _pendingLidsForCommit;
- const search::index::Schema::SP _schema;
- vespalib::hash_set<int32_t> _indexedFields;
+
protected:
+ const search::index::Schema::SP _schema;
searchcorespi::index::IThreadingService &_writeService;
PersistentParams _params;
IDocumentMetaStore &_metaStore;
diff --git a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/aws/AwsCredentials.java b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/aws/AwsCredentials.java
index 30ff63fb108..b027e7272ea 100644
--- a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/aws/AwsCredentials.java
+++ b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/aws/AwsCredentials.java
@@ -35,11 +35,11 @@ public class AwsCredentials {
}
public AwsCredentials(URI ztsUrl, ServiceIdentityProvider identityProvider, AthenzDomain athenzDomain, AwsRole awsRole) {
- this(new DefaultZtsClient.Builder(ztsUrl).withIdentityProvider(identityProvider).build(), athenzDomain, awsRole);
+ this(new DefaultZtsClient(ztsUrl, identityProvider), athenzDomain, awsRole);
}
public AwsCredentials(URI ztsUrl, SSLContext sslContext, AthenzDomain athenzDomain, AwsRole awsRole) {
- this(new DefaultZtsClient.Builder(ztsUrl).withSslContext(sslContext).build(), athenzDomain, awsRole);
+ this(new DefaultZtsClient(ztsUrl, sslContext), athenzDomain, awsRole);
}
/**
diff --git a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/common/ClientBase.java b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/common/ClientBase.java
index 37ef513c786..c1ce45c35da 100644
--- a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/common/ClientBase.java
+++ b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/common/ClientBase.java
@@ -39,15 +39,12 @@ public abstract class ClientBase implements AutoCloseable {
private final CloseableHttpClient client;
private final ClientExceptionFactory exceptionFactory;
- private final ErrorHandler errorHandler;
protected ClientBase(String userAgent,
Supplier<SSLContext> sslContextSupplier,
ClientExceptionFactory exceptionFactory,
- HostnameVerifier hostnameVerifier,
- ErrorHandler errorHandler) {
+ HostnameVerifier hostnameVerifier) {
this.exceptionFactory = exceptionFactory;
- this.errorHandler = errorHandler;
this.client = createHttpClient(userAgent, sslContextSupplier, hostnameVerifier);
}
@@ -55,17 +52,10 @@ public abstract class ClientBase implements AutoCloseable {
try {
return client.execute(request, responseHandler);
} catch (IOException e) {
- try {
- reportError(request, e);
- } catch (Exception _ignored) {}
throw new UncheckedIOException(e);
}
}
- private void reportError(HttpUriRequest request, Exception e) {
- errorHandler.reportError(request, e);
- }
-
protected StringEntity toJsonStringEntity(Object entity) {
try {
return new StringEntity(objectMapper.writeValueAsString(entity), ContentType.APPLICATION_JSON);
@@ -124,11 +114,4 @@ public abstract class ClientBase implements AutoCloseable {
protected interface ClientExceptionFactory {
RuntimeException createException(int errorCode, String description);
}
-
- public interface ErrorHandler {
- static ErrorHandler empty() {
- return (r,e)->{};
- }
- void reportError(HttpUriRequest request, Exception error);
- }
}
diff --git a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/zms/DefaultZmsClient.java b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/zms/DefaultZmsClient.java
index 3742996c274..33cb6d7d5d4 100644
--- a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/zms/DefaultZmsClient.java
+++ b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/zms/DefaultZmsClient.java
@@ -25,6 +25,7 @@ import javax.net.ssl.SSLContext;
import java.net.URI;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.Supplier;
@@ -39,16 +40,16 @@ public class DefaultZmsClient extends ClientBase implements ZmsClient {
private final URI zmsUrl;
private final AthenzIdentity identity;
- public DefaultZmsClient(URI zmsUrl, AthenzIdentity identity, SSLContext sslContext, ErrorHandler errorHandler) {
- this(zmsUrl, identity, () -> sslContext, errorHandler);
+ public DefaultZmsClient(URI zmsUrl, AthenzIdentity identity, SSLContext sslContext) {
+ this(zmsUrl, identity, () -> sslContext);
}
- public DefaultZmsClient(URI zmsUrl, ServiceIdentityProvider identityProvider, ErrorHandler errorHandler) {
- this(zmsUrl, identityProvider.identity(), identityProvider::getIdentitySslContext, errorHandler);
+ public DefaultZmsClient(URI zmsUrl, ServiceIdentityProvider identityProvider) {
+ this(zmsUrl, identityProvider.identity(), identityProvider::getIdentitySslContext);
}
- private DefaultZmsClient(URI zmsUrl, AthenzIdentity identity, Supplier<SSLContext> sslContextSupplier, ErrorHandler errorHandler) {
- super("vespa-zms-client", sslContextSupplier, ZmsClientException::new, null, errorHandler);
+ private DefaultZmsClient(URI zmsUrl, AthenzIdentity identity, Supplier<SSLContext> sslContextSupplier) {
+ super("vespa-zms-client", sslContextSupplier, ZmsClientException::new, null);
this.zmsUrl = addTrailingSlash(zmsUrl);
this.identity = identity;
}
diff --git a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/zts/DefaultZtsClient.java b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/zts/DefaultZtsClient.java
index 28119dc1f5a..c05213c8008 100644
--- a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/zts/DefaultZtsClient.java
+++ b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/zts/DefaultZtsClient.java
@@ -37,7 +37,6 @@ import java.security.KeyPair;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.util.List;
-import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -53,8 +52,25 @@ import static java.util.stream.Collectors.toList;
public class DefaultZtsClient extends ClientBase implements ZtsClient {
private final URI ztsUrl;
- protected DefaultZtsClient(URI ztsUrl, Supplier<SSLContext> sslContextSupplier, HostnameVerifier hostnameVerifier, ErrorHandler errorHandler) {
- super("vespa-zts-client", sslContextSupplier, ZtsClientException::new, hostnameVerifier, errorHandler);
+
+ public DefaultZtsClient(URI ztsUrl, SSLContext sslContext) {
+ this(ztsUrl, sslContext, null);
+ }
+
+ public DefaultZtsClient(URI ztsUrl, SSLContext sslContext, HostnameVerifier hostnameVerifier) {
+ this(ztsUrl, () -> sslContext, hostnameVerifier);
+ }
+
+ public DefaultZtsClient(URI ztsUrl, ServiceIdentityProvider identityProvider) {
+ this(ztsUrl, identityProvider::getIdentitySslContext, null);
+ }
+
+ public DefaultZtsClient(URI ztsUrl, ServiceIdentityProvider identityProvider, HostnameVerifier hostnameVerifier) {
+ this(ztsUrl, identityProvider::getIdentitySslContext, hostnameVerifier);
+ }
+
+ private DefaultZtsClient(URI ztsUrl, Supplier<SSLContext> sslContextSupplier, HostnameVerifier hostnameVerifier) {
+ super("vespa-zts-client", sslContextSupplier, ZtsClientException::new, hostnameVerifier);
this.ztsUrl = addTrailingSlash(ztsUrl);
}
@@ -223,41 +239,5 @@ public class DefaultZtsClient extends ClientBase implements ZtsClient {
else
return URI.create(ztsUrl.toString() + '/');
}
- public static class Builder {
- private URI ztsUrl;
- private ClientBase.ErrorHandler errorHandler = ErrorHandler.empty();
- private HostnameVerifier hostnameVerifier = null;
- private Supplier<SSLContext> sslContextSupplier = null;
-
- public Builder(URI ztsUrl) {
- this.ztsUrl = ztsUrl;
- }
- public Builder withErrorHandler(ClientBase.ErrorHandler errorHandler) {
- this.errorHandler = errorHandler;
- return this;
- }
-
- public Builder withHostnameVerifier(HostnameVerifier hostnameVerifier) {
- this.hostnameVerifier = hostnameVerifier;
- return this;
- }
-
- public Builder withSslContext(SSLContext sslContext) {
- this.sslContextSupplier = () -> sslContext;
- return this;
- }
-
- public Builder withIdentityProvider(ServiceIdentityProvider identityProvider) {
- this.sslContextSupplier = identityProvider::getIdentitySslContext;
- return this;
- }
-
- public DefaultZtsClient build() {
- if (Objects.isNull(sslContextSupplier)) {
- throw new IllegalArgumentException("No ssl context or identity provider available to set up zts client");
- }
- return new DefaultZtsClient(ztsUrl, sslContextSupplier, hostnameVerifier, errorHandler);
- }
- }
}
diff --git a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/client/AthenzCredentialsService.java b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/client/AthenzCredentialsService.java
index 612f9caa691..8e029906c30 100644
--- a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/client/AthenzCredentialsService.java
+++ b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/client/AthenzCredentialsService.java
@@ -81,7 +81,7 @@ class AthenzCredentialsService {
document.ipAddresses(),
keyPair);
- try (ZtsClient ztsClient = new DefaultZtsClient.Builder(ztsEndpoint).withIdentityProvider(nodeIdentityProvider).build()) {
+ try (ZtsClient ztsClient = new DefaultZtsClient(ztsEndpoint, nodeIdentityProvider)) {
InstanceIdentity instanceIdentity =
ztsClient.registerInstance(
configserverIdentity,
@@ -102,7 +102,7 @@ class AthenzCredentialsService {
document.ipAddresses(),
newKeyPair);
- try (ZtsClient ztsClient = new DefaultZtsClient.Builder(ztsEndpoint).withSslContext(sslContext).build()) {
+ try (ZtsClient ztsClient = new DefaultZtsClient(ztsEndpoint, sslContext)) {
InstanceIdentity instanceIdentity =
ztsClient.refreshInstance(
configserverIdentity,
diff --git a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/client/AthenzIdentityProviderImpl.java b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/client/AthenzIdentityProviderImpl.java
index 724a3059f6d..65574d7583e 100644
--- a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/client/AthenzIdentityProviderImpl.java
+++ b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/client/AthenzIdentityProviderImpl.java
@@ -301,7 +301,7 @@ public final class AthenzIdentityProviderImpl extends AbstractComponent implemen
}
private DefaultZtsClient createZtsClient() {
- return new DefaultZtsClient.Builder(ztsEndpoint).withSslContext(getIdentitySslContext()).build();
+ return new DefaultZtsClient(ztsEndpoint, getIdentitySslContext());
}
@Override