diff options
61 files changed, 1331 insertions, 619 deletions
diff --git a/athenz-identity-provider-service/src/main/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/ConfigserverSslContextFactoryProvider.java b/athenz-identity-provider-service/src/main/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/ConfigserverSslContextFactoryProvider.java index 880646e37e5..ae4a5933ac2 100644 --- a/athenz-identity-provider-service/src/main/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/ConfigserverSslContextFactoryProvider.java +++ b/athenz-identity-provider-service/src/main/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/ConfigserverSslContextFactoryProvider.java @@ -3,6 +3,7 @@ package com.yahoo.vespa.hosted.athenz.instanceproviderservice; import com.google.inject.Inject; import com.yahoo.jdisc.http.ssl.impl.TlsContextBasedProvider; +import java.util.logging.Level; import com.yahoo.security.KeyStoreBuilder; import com.yahoo.security.KeyStoreType; import com.yahoo.security.KeyUtils; @@ -36,7 +37,6 @@ import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.logging.Level; import java.util.logging.Logger; /** @@ -66,8 +66,7 @@ public class ConfigserverSslContextFactoryProvider extends TlsContextBasedProvid KeyProvider keyProvider, AthenzProviderServiceConfig config) { this.athenzProviderServiceConfig = config; - this.ztsClient = new DefaultZtsClient.Builder(URI.create(athenzProviderServiceConfig.ztsUrl())) - .withIdentityProvider(bootstrapIdentity).build(); + this.ztsClient = new DefaultZtsClient(URI.create(athenzProviderServiceConfig.ztsUrl()), bootstrapIdentity); this.keyProvider = keyProvider; this.configserverIdentity = new AthenzService(athenzProviderServiceConfig.domain(), athenzProviderServiceConfig.serviceName()); diff --git a/athenz-identity-provider-service/src/test/java/com/yahoo/vespa/hosted/ca/restapi/CertificateAuthorityApiTest.java 
b/athenz-identity-provider-service/src/test/java/com/yahoo/vespa/hosted/ca/restapi/CertificateAuthorityApiTest.java index bf2a6719842..343a9feeed6 100644 --- a/athenz-identity-provider-service/src/test/java/com/yahoo/vespa/hosted/ca/restapi/CertificateAuthorityApiTest.java +++ b/athenz-identity-provider-service/src/test/java/com/yahoo/vespa/hosted/ca/restapi/CertificateAuthorityApiTest.java @@ -8,11 +8,11 @@ import com.yahoo.security.KeyUtils; import com.yahoo.security.Pkcs10Csr; import com.yahoo.security.Pkcs10CsrUtils; import com.yahoo.security.X509CertificateUtils; -import com.yahoo.slime.SlimeUtils; import com.yahoo.text.StringUtilities; import com.yahoo.vespa.athenz.api.AthenzPrincipal; import com.yahoo.vespa.athenz.api.AthenzService; import com.yahoo.vespa.athenz.client.zts.DefaultZtsClient; +import com.yahoo.slime.SlimeUtils; import com.yahoo.vespa.hosted.ca.CertificateTester; import org.apache.http.client.ResponseHandler; import org.apache.http.client.methods.HttpUriRequest; @@ -224,7 +224,7 @@ public class CertificateAuthorityApiTest extends ContainerTester { private final X509Certificate certificate; public TestZtsClient(Principal principal, X509Certificate certificate, URI ztsUrl, SSLContext sslContext) { - super(ztsUrl, () -> sslContext, null, ErrorHandler.empty()); + super(ztsUrl, sslContext); this.principal = principal; this.certificate = certificate; } diff --git a/clustercontroller-reindexer/pom.xml b/clustercontroller-reindexer/pom.xml index 3791ea7d3f4..172bff6fdb4 100644 --- a/clustercontroller-reindexer/pom.xml +++ b/clustercontroller-reindexer/pom.xml @@ -48,6 +48,12 @@ </dependency> <dependency> <groupId>com.yahoo.vespa</groupId> + <artifactId>testutil</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> <artifactId>config-model</artifactId> <version>${project.version}</version> <scope>test</scope> diff --git 
a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java new file mode 100644 index 00000000000..9e7e2880036 --- /dev/null +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java @@ -0,0 +1,235 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.reindexing; + +import ai.vespa.reindexing.Reindexing.Status; +import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException; +import com.yahoo.document.DocumentType; +import com.yahoo.document.select.parser.ParseException; +import com.yahoo.documentapi.DocumentAccess; +import com.yahoo.documentapi.ProgressToken; +import com.yahoo.documentapi.VisitorControlHandler; +import com.yahoo.documentapi.VisitorParameters; +import com.yahoo.documentapi.VisitorSession; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.vespa.curator.Lock; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Logger; + +import static java.util.Objects.requireNonNull; +import static java.util.logging.Level.FINE; +import static java.util.logging.Level.INFO; +import static java.util.logging.Level.WARNING; +import static java.util.stream.Collectors.joining; + +/** + * Progresses reindexing efforts by creating visitor sessions against its own content cluster, + * which send documents straight to storage — via indexing if the document type has "index" mode. + * The {@link #reindex} method blocks until shutdown is called, or until no more reindexing is left to do.
+ * + * @author jonmv + */ +public class Reindexer { + + private static final Logger log = Logger.getLogger(Reindexer.class.getName()); + + private final Cluster cluster; + private final Map<DocumentType, Instant> ready; + private final ReindexingCurator database; + private final DocumentAccess access; + private final Clock clock; + private final Phaser phaser = new Phaser(2); // Reindexer and visitor. + + private Reindexing reindexing; + private Status status; + + public Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database, + DocumentAccess access, Clock clock) { + for (DocumentType type : ready.keySet()) + cluster.bucketSpaceOf(type); // Verifies this is known. + + this.cluster = cluster; + this.ready = new TreeMap<>(ready); // Iterate through document types in consistent order. + this.database = database; + this.access = access; + this.clock = clock; + } + + /** Lets the reindexer abort any ongoing visit session, wait for it to complete normally, then exit. */ + public void shutdown() { + phaser.forceTermination(); // All parties waiting on this phaser are immediately allowed to proceed. + } + + /** Starts and tracks reprocessing of ready document types until done, or interrupted. */ + public void reindex() throws ReindexingLockException { + if (phaser.isTerminated()) + throw new IllegalStateException("Already shut down"); + + try (Lock lock = database.lockReindexing()) { + for (DocumentType type : ready.keySet()) { // We consider only document types for which we have config.
+ if (ready.get(type).isAfter(clock.instant())) + log.log(INFO, "Received config for reindexing which is ready in the future — will process later " + + "(" + ready.get(type) + " is after " + clock.instant() + ")"); + else + progress(type); + + if (phaser.isTerminated()) + break; + } + } + } + + @SuppressWarnings("fallthrough") // (ノಠ ∩ಠ)ノ彡( \o°o)\ + private void progress(DocumentType type) { + // If this is a new document type (or a new cluster), no reindexing is required. + reindexing = database.readReindexing(); + status = reindexing.status().getOrDefault(type, + Status.ready(clock.instant()) + .running() + .successful(clock.instant())); + if (ready.get(type).isAfter(status.startedAt())) + status = Status.ready(clock.instant()); // Need to restart, as a newer reindexing is required. + + database.writeReindexing(reindexing = reindexing.with(type, status)); + + switch (status.state()) { + default: + log.log(WARNING, "Unknown reindexing state '" + status.state() + "'"); + case FAILED: + log.log(FINE, () -> "Not continuing reindexing of " + type + " due to previous failure"); + case SUCCESSFUL: // Intentional fallthrough — all three are done states. + return; + case RUNNING: + log.log(WARNING, "Unexpected state 'RUNNING' of reindexing of " + type); + case READY: // Intentional fallthrough — must just assume we failed updating state when exiting previously. + log.log(FINE, () -> "Running reindexing of " + type); + } + + // Visit buckets until they're all done, or until we are interrupted. 
+ status = status.running(); + AtomicReference<Instant> progressLastStored = new AtomicReference<>(clock.instant()); + VisitorControlHandler control = new VisitorControlHandler() { + @Override + public void onProgress(ProgressToken token) { + super.onProgress(token); + status = status.progressed(token); + if (progressLastStored.get().isBefore(clock.instant().minusSeconds(10))) { + progressLastStored.set(clock.instant()); + database.writeReindexing(reindexing = reindexing.with(type, status)); + } + } + @Override + public void onDone(CompletionCode code, String message) { + super.onDone(code, message); + phaser.arriveAndAwaitAdvance(); // Synchronize with the reindex thread. + } + }; + visit(type, status.progress().orElse(null), control); + + // If we were interrupted, the result may not yet be set in the control handler. + switch (control.getResult().getCode()) { + default: + log.log(WARNING, "Unexpected visitor result '" + control.getResult().getCode() + "'"); + case FAILURE: // Intentional fallthrough — this is an error. 
+ log.log(WARNING, "Visiting failed: " + control.getResult().getMessage()); + status = status.failed(clock.instant(), control.getResult().getMessage()); + break; + case ABORTED: + log.log(FINE, () -> "Halting reindexing of " + type + " due to shutdown — will continue later"); + status = status.halted(); + break; + case SUCCESS: + log.log(INFO, "Completed reindexing of " + type + " after " + Duration.between(status.startedAt(), clock.instant())); + status = status.successful(clock.instant()); + } + database.writeReindexing(reindexing.with(type, status)); + } + + private void visit(DocumentType type, ProgressToken progress, VisitorControlHandler control) { + VisitorParameters parameters = createParameters(type, progress); + parameters.setControlHandler(control); + VisitorSession session; + try { + session = access.createVisitorSession(parameters); + } + catch (ParseException e) { + throw new IllegalStateException(e); + } + + // Wait until done; or until termination is forced, in which case we abort the visit and wait for it to complete. + phaser.arriveAndAwaitAdvance(); // Synchronize with the visitor completion thread. + session.destroy(); + } + + VisitorParameters createParameters(DocumentType type, ProgressToken progress) { + VisitorParameters parameters = new VisitorParameters(type.getName()); + parameters.setRemoteDataHandler(cluster.name()); + parameters.setResumeToken(progress); + parameters.setFieldSet(type.getName() + ":[document]"); + parameters.setPriority(DocumentProtocol.Priority.LOW_1); + parameters.setRoute(cluster.route()); + parameters.setBucketSpace(cluster.bucketSpaceOf(type)); + // parameters.setVisitorLibrary("ReindexVisitor"); // TODO jonmv: Use when ready, or perhaps an argument to the DumpVisitor is enough? 
+ return parameters; + } + + + static class Cluster { + + private final String name; + private final String configId; + private final Map<DocumentType, String> documentBuckets; + + Cluster(String name, String configId, Map<DocumentType, String> documentBuckets) { + this.name = requireNonNull(name); + this.configId = requireNonNull(configId); + this.documentBuckets = Map.copyOf(documentBuckets); + } + + String name() { + return name; + } + + String route() { + return "[Storage:cluster=" + name + ";clusterconfigid=" + configId + "]"; + } + + String bucketSpaceOf(DocumentType documentType) { + return requireNonNull(documentBuckets.get(documentType), "Unknown bucket space for " + documentType); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Cluster cluster = (Cluster) o; + return name.equals(cluster.name) && + configId.equals(cluster.configId) && + documentBuckets.equals(cluster.documentBuckets); + } + + @Override + public int hashCode() { + return Objects.hash(name, configId, documentBuckets); + } + + @Override + public String toString() { + return "Cluster{" + + "name='" + name + '\'' + + ", configId='" + configId + '\'' + + ", documentBuckets=" + documentBuckets + + '}'; + } + + } + +} diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java index e358beb2deb..792889e4aa8 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java @@ -86,7 +86,7 @@ public class Reindexing { /** Returns a new, empty status, with no progress or result, in state READY. 
*/ public static Status ready(Instant now) { - return new Status(requireNonNull(now), null, new ProgressToken(), State.READY, null); + return new Status(requireNonNull(now), null, null, State.READY, null); } /** Returns a copy of this, in state RUNNING. */ diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java index 1158b2c9d3f..2044e6869f6 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java @@ -2,6 +2,7 @@ package ai.vespa.reindexing; import ai.vespa.reindexing.Reindexing.Status; +import com.google.common.util.concurrent.UncheckedTimeoutException; import com.yahoo.document.DocumentTypeManager; import com.yahoo.documentapi.ProgressToken; import com.yahoo.path.Path; @@ -10,8 +11,10 @@ import com.yahoo.slime.Inspector; import com.yahoo.slime.Slime; import com.yahoo.slime.SlimeUtils; import com.yahoo.vespa.curator.Curator; +import com.yahoo.vespa.curator.Lock; import com.yahoo.yolean.Exceptions; +import java.time.Duration; import java.time.Instant; import java.util.function.Function; @@ -33,25 +36,45 @@ public class ReindexingCurator { private static final String STATE = "state"; private static final String MESSAGE = "message"; - private static final Path statusPath = Path.fromString("/reindexing/v1/status"); - private final Curator curator; + private final String clusterName; private final ReindexingSerializer serializer; + private final Duration lockTimeout; + + public ReindexingCurator(Curator curator, String clusterName, DocumentTypeManager manager) { + this(curator, clusterName, manager, Duration.ofSeconds(1)); + } - public ReindexingCurator(Curator curator, DocumentTypeManager manager) { + ReindexingCurator(Curator curator, String clusterName, DocumentTypeManager manager, Duration lockTimeout) { 
this.curator = curator; + this.clusterName = clusterName; + this.serializer = new ReindexingSerializer(manager); + this.lockTimeout = lockTimeout; + } + + public Reindexing readReindexing() { - return curator.getData(statusPath).map(serializer::deserialize) + return curator.getData(statusPath()).map(serializer::deserialize) .orElse(Reindexing.empty()); } public void writeReindexing(Reindexing reindexing) { - curator.set(statusPath, serializer.serialize(reindexing)); + curator.set(statusPath(), serializer.serialize(reindexing)); + } + + /** This lock must be held to manipulate reindexing state, or by whoever has a running visitor. */ + public Lock lockReindexing() throws ReindexingLockException { + try { + return curator.lock(lockPath(), lockTimeout); + } + catch (UncheckedTimeoutException e) { // TODO jonmv: Avoid use of guava classes. + throw new ReindexingLockException(e); + } } + + private Path rootPath() { return Path.fromString("/reindexing/v1/" + clusterName); } + private Path statusPath() { return rootPath().append("status"); } + private Path lockPath() { return rootPath().append("lock"); } + + private static class ReindexingSerializer { @@ -117,4 +140,13 @@ public class ReindexingCurator { } + + /** Indicates that taking the reindexing lock failed within the allotted time. */ + static class ReindexingLockException extends Exception { + + ReindexingLockException(UncheckedTimeoutException cause) { + super("Failed to obtain the reindexing lock", cause); + } + + } + } diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java new file mode 100644 index 00000000000..0cf7f9eed9a --- /dev/null +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java @@ -0,0 +1,142 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.reindexing; + +import ai.vespa.reindexing.Reindexer.Cluster; +import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException; +import com.google.inject.Inject; +import com.yahoo.cloud.config.ClusterListConfig; +import com.yahoo.cloud.config.ZookeepersConfig; +import com.yahoo.component.AbstractComponent; +import com.yahoo.concurrent.DaemonThreadFactory; +import com.yahoo.document.DocumentType; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.document.config.DocumentmanagerConfig; +import com.yahoo.documentapi.DocumentAccess; +import com.yahoo.net.HostName; +import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig; +import com.yahoo.vespa.config.content.reindexing.ReindexingConfig; +import com.yahoo.vespa.curator.Curator; +import com.yahoo.vespa.zookeeper.VespaZooKeeperServer; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.logging.Logger; +import java.util.stream.Stream; + +import static java.util.logging.Level.FINE; +import static java.util.logging.Level.WARNING; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; +import static java.util.stream.Collectors.toUnmodifiableMap; + +/** + * Runs in all cluster controller containers, and progresses reindexing efforts. + * Work is only done by one container at a time, by requiring a shared ZooKeeper lock to be held while visiting. + * Whichever maintainer gets the lock holds it until all reindexing is done, or until shutdown. 
+ * + * @author jonmv + */ +public class ReindexingMaintainer extends AbstractComponent { + + private static final Logger log = Logger.getLogger(Reindexing.class.getName()); + + private final Reindexer reindexer; + private final ScheduledExecutorService executor; + + // VespaZooKeeperServer dependency to ensure the ZK cluster is running. + @Inject + public ReindexingMaintainer(VespaZooKeeperServer zooKeeperServer, DocumentAccess access, ZookeepersConfig zookeepersConfig, + ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig, + ReindexingConfig reindexingConfig, DocumentmanagerConfig documentmanagerConfig) { + this(Clock.systemUTC(), access, zookeepersConfig, clusterListConfig, allClustersBucketSpacesConfig, reindexingConfig, documentmanagerConfig); + } + + ReindexingMaintainer(Clock clock, DocumentAccess access, ZookeepersConfig zookeepersConfig, + ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig, + ReindexingConfig reindexingConfig, DocumentmanagerConfig documentmanagerConfig) { + DocumentTypeManager manager = new DocumentTypeManager(documentmanagerConfig); + this.reindexer = new Reindexer(parseCluster(reindexingConfig.clusterName(), clusterListConfig, allClustersBucketSpacesConfig, manager), + parseReady(reindexingConfig, manager), + new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()), + reindexingConfig.clusterName(), + manager), + access, + clock); + this.executor = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("reindexer-")); + if (reindexingConfig.enabled()) + scheduleStaggered((delayMillis, intervalMillis) -> executor.scheduleAtFixedRate(this::maintain, delayMillis, intervalMillis, TimeUnit.MILLISECONDS), + Duration.ofMinutes(1), clock.instant(), HostName.getLocalhost(), zookeepersConfig.zookeeperserverlist()); + } + + private void maintain() { + try { + reindexer.reindex(); + } + catch (ReindexingLockException e) { + 
log.log(FINE, "Failed to acquire reindexing lock"); + } + catch (Exception e) { + log.log(WARNING, "Exception when reindexing", e); + } + } + + @Override + public void deconstruct() { + try { + reindexer.shutdown(); + executor.shutdown(); + if ( ! executor.awaitTermination(45, TimeUnit.SECONDS)) + log.log(WARNING, "Failed to shut down reindexer within timeout"); + } + catch (InterruptedException e) { + log.log(WARNING, "Interrupted while waiting for reindexer to shut down"); + Thread.currentThread().interrupt(); + } + + } + + static Map<DocumentType, Instant> parseReady(ReindexingConfig config, DocumentTypeManager manager) { + return config.status().entrySet().stream() + .collect(toUnmodifiableMap(typeStatus -> manager.getDocumentType(typeStatus.getKey()), + typeStatus -> Instant.ofEpochMilli(typeStatus.getValue().readyAtMillis()))); + } + + /** Schedules a task with the given interval (across all containers in this ZK cluster). */ + static void scheduleStaggered(BiConsumer<Long, Long> scheduler, + Duration interval, Instant now, + String hostname, String clusterHostnames) { + long delayMillis = 0; + long intervalMillis = interval.toMillis(); + List<String> hostnames = Stream.of(clusterHostnames.split(",")) + .map(hostPort -> hostPort.split(":")[0]) + .collect(toList()); + if (hostnames.contains(hostname)) { + long offset = hostnames.indexOf(hostname) * intervalMillis; + intervalMillis *= hostnames.size(); + delayMillis = Math.floorMod(offset - now.toEpochMilli(), intervalMillis); + } + scheduler.accept(delayMillis, intervalMillis); + } + + static Cluster parseCluster(String name, ClusterListConfig clusters, AllClustersBucketSpacesConfig bucketSpaces, + DocumentTypeManager manager) { + return clusters.storage().stream() + .filter(storage -> storage.name().equals(name)) + .map(storage -> new Cluster(name, + storage.configid(), + bucketSpaces.cluster(name) + .documentType().entrySet().stream() + .collect(toMap(entry -> manager.getDocumentType(entry.getKey()), + entry 
-> entry.getValue().bucketSpace())))) + .findAny() + .orElseThrow(() -> new IllegalStateException("This cluster (" + name + ") not among the list of clusters")); + } + +} diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java new file mode 100644 index 00000000000..20393cba958 --- /dev/null +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java @@ -0,0 +1,182 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.reindexing; + +import ai.vespa.reindexing.Reindexer.Cluster; +import ai.vespa.reindexing.Reindexing.Status; +import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentType; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.document.config.DocumentmanagerConfig; +import com.yahoo.documentapi.DocumentAccessParams; +import com.yahoo.documentapi.ProgressToken; +import com.yahoo.documentapi.SyncParameters; +import com.yahoo.documentapi.VisitorParameters; +import com.yahoo.documentapi.local.LocalDocumentAccess; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.searchdefinition.derived.Deriver; +import com.yahoo.test.ManualClock; +import com.yahoo.vespa.curator.mock.MockCurator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.time.Duration; +import java.time.Instant; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Phaser; + +import static 
org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + +/** + * @author jonmv + */ +class ReindexerTest { + + final DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build(); + final DocumentTypeManager manager = new DocumentTypeManager(musicConfig); + final DocumentType music = manager.getDocumentType("music"); + final Document document1 = new Document(music, "id:ns:music::one"); + final Document document2 = new Document(music, "id:ns:music::two"); + final Cluster cluster = new Cluster("cluster", "id", Map.of(music, "default")); + final ManualClock clock = new ManualClock(Instant.EPOCH); + + ReindexingCurator database; + LocalDocumentAccess access; + + @BeforeEach + void setUp() { + database = new ReindexingCurator(new MockCurator(), "cluster", manager, Duration.ofMillis(1)); + access = new LocalDocumentAccess(new DocumentAccessParams().setDocumentmanagerConfig(musicConfig)); + } + + @Test + void throwsWhenUnknownBuckets() { + assertThrows(NullPointerException.class, + () -> new Reindexer(new Cluster("cluster", "id", Map.of()), + Map.of(music, Instant.EPOCH), + database, + access, + clock)); + } + + @Test + void throwsWhenLockHeldElsewhere() throws InterruptedException, ExecutionException { + Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, access, clock); + Executors.newSingleThreadExecutor().submit(database::lockReindexing).get(); + assertThrows(ReindexingLockException.class, reindexer::reindex); + } + + @Test + @Timeout(10) + void nothingToDo() throws ReindexingLockException { + Reindexer reindexer = new Reindexer(cluster, Map.of(), database, access, clock); + access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document1)); + access.setPhaser(new Phaser(1)); // Would 
block any visiting until timeout. + reindexer.reindex(); + } + + @Test + void parameters() { + Reindexer reindexer = new Reindexer(cluster, Map.of(), database, access, clock); + ProgressToken token = new ProgressToken(); + VisitorParameters parameters = reindexer.createParameters(music, token); + assertEquals("music:[document]", parameters.getFieldSet()); + assertSame(token, parameters.getResumeToken()); + assertEquals("default", parameters.getBucketSpace()); + assertEquals("[Storage:cluster=cluster;clusterconfigid=id]", parameters.getRoute().toString()); + assertEquals("cluster", parameters.getRemoteDataHandler()); + assertEquals("music", parameters.getDocumentSelection()); + assertEquals(DocumentProtocol.Priority.LOW_1, parameters.getPriority()); + } + + @Test + @Timeout(10) + void testReindexing() throws ReindexingLockException, ExecutionException, InterruptedException { + // Reindexer is told to update "music" documents no earlier than EPOCH, which is just now. + Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, access, clock); + Phaser phaser = new Phaser(1); + access.setPhaser(phaser); // Will block any visiting until timeout. + reindexer.reindex(); + + // Since "music" was a new document type, it is stored as just reindexed, and nothing else happens. + Reindexing reindexing = Reindexing.empty().with(music, Status.ready(Instant.EPOCH).running().successful(Instant.EPOCH)); + assertEquals(reindexing, database.readReindexing()); + + // New config tells reindexer to reindex "music" documents no earlier than at 10 millis after EPOCH, which isn't yet. + clock.advance(Duration.ofMillis(5)); + reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, access, clock); + reindexer.reindex(); // Nothing happens because it's not yet time. This isn't supposed to happen unless high clock skew. 
+ assertEquals(reindexing, database.readReindexing()); + + // It's time to reindex the "music" documents, but since we sneak a FAILED status in there, nothing is done. + clock.advance(Duration.ofMillis(10)); + Reindexing failed = Reindexing.empty().with(music, Status.ready(clock.instant()).running().failed(clock.instant(), "fail")); + database.writeReindexing(failed); + reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, access, clock); + reindexer.reindex(); // Nothing happens because status is already FAILED for the document type. + assertEquals(failed, database.readReindexing()); + + // It's time to reindex the "music" documents — none yet, so this is a no-op, which just updates the timestamp. + database.writeReindexing(reindexing); // Restore state where reindexing was complete at 5 ms after EPOCH. + ExecutorService executor = Executors.newSingleThreadExecutor(); + reindexer.reindex(); + reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant())); + assertEquals(reindexing, database.readReindexing()); + + // We add a document and interrupt reindexing before the visit is complete. + access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document1)); + clock.advance(Duration.ofMillis(10)); + reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock); + Future<?> future = executor.submit(uncheckedReindex(reindexer)); + while (phaser.getRegisteredParties() == 1) + Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_< + database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value. + reindexer.shutdown(); + phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document. + phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete. 
+ future.get(); // Write state to database. + reindexing = reindexing.with(music, Status.ready(clock.instant()).running().halted()); + assertEquals(reindexing, database.readReindexing()); + + // Manually put a progress token in there, to verify this is used when resuming, and that we store what we retrieve. + reindexing = reindexing.with(music, Status.ready(clock.instant()).running().progressed(new ProgressToken()).halted()); + database.writeReindexing(reindexing); + reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock); + future = executor.submit(uncheckedReindex(reindexer)); + while (phaser.getRegisteredParties() == 1) + Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_< + database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value. + reindexer.shutdown(); // Interrupt the visit before it completes. + phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document. + phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete. + future.get(); // Write state to database. + assertEquals(reindexing, database.readReindexing()); + + // Finally let the visit complete normally. + reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock); + future = executor.submit(uncheckedReindex(reindexer)); + while (phaser.getRegisteredParties() == 1) + Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_< + database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value. + phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document. + phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete. + future.get(); // Write state to database. 
+ reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant())); + assertEquals(reindexing, database.readReindexing()); + } + + Callable<Void> uncheckedReindex(Reindexer reindexer) { + return () -> { reindexer.reindex(); return null; }; + } + +} diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java index 0501ff474ca..c5a58dcae68 100644 --- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java @@ -25,7 +25,7 @@ class ReindexingCuratorTest { DocumentTypeManager manager = new DocumentTypeManager(musicConfig); DocumentType music = manager.getDocumentType("music"); MockCurator mockCurator = new MockCurator(); - ReindexingCurator curator = new ReindexingCurator(mockCurator, manager); + ReindexingCurator curator = new ReindexingCurator(mockCurator, "cluster", manager); assertEquals(Reindexing.empty(), curator.readReindexing()); @@ -42,7 +42,7 @@ class ReindexingCuratorTest { assertEquals(reindexing, curator.readReindexing()); // Unknown document types are forgotten. - assertEquals(Reindexing.empty(), new ReindexingCurator(mockCurator, new DocumentTypeManager(emptyConfig)).readReindexing()); + assertEquals(Reindexing.empty(), new ReindexingCurator(mockCurator, "cluster", new DocumentTypeManager(emptyConfig)).readReindexing()); } } diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java new file mode 100644 index 00000000000..713fb836d62 --- /dev/null +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java @@ -0,0 +1,138 @@ +// Copyright Verizon Media. 
Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.reindexing; + +import ai.vespa.reindexing.Reindexer.Cluster; +import com.yahoo.cloud.config.ClusterListConfig; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.document.config.DocumentmanagerConfig; +import com.yahoo.searchdefinition.derived.Deriver; +import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig; +import com.yahoo.vespa.config.content.reindexing.ReindexingConfig; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.Instant; +import java.util.Map; + +import static ai.vespa.reindexing.ReindexingMaintainer.parseCluster; +import static ai.vespa.reindexing.ReindexingMaintainer.parseReady; +import static ai.vespa.reindexing.ReindexingMaintainer.scheduleStaggered; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * @author jonmv + */ +class ReindexingMaintainerTest { + + @Test + void testParsing() { + DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build(); + DocumentTypeManager manager = new DocumentTypeManager(musicConfig); + + assertEquals(Map.of(manager.getDocumentType("music"), Instant.ofEpochMilli(123)), + parseReady(new ReindexingConfig.Builder() + .enabled(true) + .clusterName("cluster") + .status("music", new ReindexingConfig.Status.Builder().readyAtMillis(123)) + .build(), + manager)); + + // Unknown document type fails + assertThrows(IllegalArgumentException.class, + () -> parseReady(new ReindexingConfig.Builder() + .status("poetry", new ReindexingConfig.Status.Builder().readyAtMillis(123)) + .build(), + manager)); + + assertEquals(new Cluster("cluster", "configId", Map.of(manager.getDocumentType("music"), "default")), + parseCluster("cluster", + new ClusterListConfig.Builder() + .storage(new ClusterListConfig.Storage.Builder() + .name("oyster") + 
.configid("configId")) + .storage(new ClusterListConfig.Storage.Builder() + .name("cluster") + .configid("configId")) + .build(), + new AllClustersBucketSpacesConfig.Builder() + .cluster("oyster", new AllClustersBucketSpacesConfig.Cluster.Builder() + .documentType("music", new AllClustersBucketSpacesConfig.Cluster.DocumentType.Builder() + .bucketSpace("global"))) + .cluster("cluster", new AllClustersBucketSpacesConfig.Cluster.Builder() + .documentType("music", new AllClustersBucketSpacesConfig.Cluster.DocumentType.Builder() + .bucketSpace("default"))) + .build(), + manager)); + + // Cluster missing in bucket space list fails. + assertThrows(NullPointerException.class, + () -> parseCluster("cluster", + new ClusterListConfig.Builder() + .storage(new ClusterListConfig.Storage.Builder() + .name("cluster") + .configid("configId")) + .build(), + new AllClustersBucketSpacesConfig.Builder() + .cluster("oyster", new AllClustersBucketSpacesConfig.Cluster.Builder() + .documentType("music", new AllClustersBucketSpacesConfig.Cluster.DocumentType.Builder() + .bucketSpace("global"))) + .build(), + manager)); + + // Cluster missing in cluster list fails. 
+ assertThrows(IllegalStateException.class, + () -> parseCluster("cluster", + new ClusterListConfig.Builder() + .storage(new ClusterListConfig.Storage.Builder() + .name("oyster") + .configid("configId")) + .build(), + new AllClustersBucketSpacesConfig.Builder() + .cluster("cluster", new AllClustersBucketSpacesConfig.Cluster.Builder() + .documentType("music", new AllClustersBucketSpacesConfig.Cluster.DocumentType.Builder() + .bucketSpace("default"))) + .build(), + manager)); + } + + @Test + void testStaggering() { + scheduleStaggered((delayMillis, intervalMillis) -> { + assertEquals(0, delayMillis); + assertEquals(10, intervalMillis); + }, + Duration.ofMillis(10), + Instant.ofEpochMilli(27), + "host", + "nys:123,hark:123"); + + scheduleStaggered((delayMillis, intervalMillis) -> { + assertEquals(3, delayMillis); + assertEquals(10, intervalMillis); + }, + Duration.ofMillis(10), + Instant.ofEpochMilli(27), + "host", + "host:123"); + + scheduleStaggered((delayMillis, intervalMillis) -> { + assertEquals(7, delayMillis); + assertEquals(20, intervalMillis); + }, + Duration.ofMillis(10), + Instant.ofEpochMilli(13), + "host", + "host:123,:nys:321"); + + scheduleStaggered((delayMillis, intervalMillis) -> { + assertEquals(17, delayMillis); + assertEquals(20, intervalMillis); + }, + Duration.ofMillis(10), + Instant.ofEpochMilli(13), + "nys", + "host:123,nys:321"); + } + +} diff --git a/clustercontroller-reindexer/src/test/resources/schemas/music.sd b/clustercontroller-reindexer/src/test/resources/schemas/music.sd index a289f5a686b..e55fff26ea5 100644 --- a/clustercontroller-reindexer/src/test/resources/schemas/music.sd +++ b/clustercontroller-reindexer/src/test/resources/schemas/music.sd @@ -1,5 +1,8 @@ # Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
search music { + field auxilliary type int { + indexing: random 123 | summary + } document music { field artist type string { } } diff --git a/config-application-package/src/main/java/com/yahoo/config/model/application/provider/FilesApplicationPackage.java b/config-application-package/src/main/java/com/yahoo/config/model/application/provider/FilesApplicationPackage.java index 91ff1c6d339..9c68fb95cce 100644 --- a/config-application-package/src/main/java/com/yahoo/config/model/application/provider/FilesApplicationPackage.java +++ b/config-application-package/src/main/java/com/yahoo/config/model/application/provider/FilesApplicationPackage.java @@ -80,7 +80,7 @@ public class FilesApplicationPackage implements ApplicationPackage { * The name of the subdirectory (below the original application package root) * where a preprocessed version of this application package is stored. * As it happens, the config model is first created with an application package in this subdirectory, - * and later used backed by an application package which is not in this subdirectory. + * and later backed by an application package which is not in this subdirectory. * To enable model code to correct for this, this constant must be publicly known. * * All of this stuff is Very Unfortunate and should be fixed. 
-Jon @@ -306,34 +306,15 @@ public class FilesApplicationPackage implements ApplicationPackage { @Override public Collection<NamedReader> searchDefinitionContents() { - Map<String, NamedReader> ret = new LinkedHashMap<>(); - Set<String> fileSds = new LinkedHashSet<>(); - Set<String> bundleSds = new LinkedHashSet<>(); + Set<NamedReader> ret = new LinkedHashSet<>(); try { for (File f : getSearchDefinitionFiles()) { - fileSds.add(f.getName()); - ret.put(f.getName(), new NamedReader(f.getName(), new FileReader(f))); + ret.add(new NamedReader(f.getName(), new FileReader(f))); } } catch (Exception e) { throw new IllegalArgumentException("Couldn't get search definition contents.", e); } - verifySdsDisjoint(fileSds, bundleSds); - return ret.values(); - } - - /** - * Verify that two sets of search definitions are disjoint (TODO: everything except error message is very generic). - * - * @param fileSds Set of search definitions from file - * @param bundleSds Set of search definitions from bundles - */ - private void verifySdsDisjoint(Set<String> fileSds, Set<String> bundleSds) { - if (!Collections.disjoint(fileSds, bundleSds)) { - Collection<String> disjoint = new ArrayList<>(fileSds); - disjoint.retainAll(bundleSds); - throw new IllegalArgumentException("For the following search definitions names there are collisions between those specified inside " + - "docproc bundles and those in searchdefinitions/ in application package: "+disjoint); - } + return ret; } /** diff --git a/configserver/pom.xml b/configserver/pom.xml index 8cd1b4b4254..43187038f97 100644 --- a/configserver/pom.xml +++ b/configserver/pom.xml @@ -220,6 +220,11 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>com.yahoo.vespa</groupId> <artifactId>http-utils</artifactId> <version>${project.version}</version> diff --git 
a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java index fae52f318bd..ed28bec0d81 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java @@ -42,14 +42,14 @@ public class ApplicationReindexing implements Reindexing { /** Returns a copy of this with common reindexing for the given cluster ready at the given instant. */ public ApplicationReindexing withReady(String cluster, Instant readyAt) { - Cluster current = clusters.getOrDefault(cluster, Cluster.empty); + Cluster current = clusters.getOrDefault(cluster, Cluster.ready(common)); Cluster modified = new Cluster(new Status(readyAt), current.pending, current.ready); return new ApplicationReindexing(common, with(cluster, modified, clusters)); } /** Returns a copy of this with reindexing for the given document type in the given cluster ready at the given instant. */ public ApplicationReindexing withReady(String cluster, String documentType, Instant readyAt) { - Cluster current = clusters.getOrDefault(cluster, Cluster.empty); + Cluster current = clusters.getOrDefault(cluster, Cluster.ready(common)); Cluster modified = new Cluster(current.common, without(documentType, current.pending), with(documentType, new Status(readyAt), current.ready)); @@ -58,7 +58,7 @@ public class ApplicationReindexing implements Reindexing { /** Returns a copy of this with a pending reindexing at the given generation, for the given document type. 
*/ public ApplicationReindexing withPending(String cluster, String documentType, long requiredGeneration) { - Cluster current = clusters.getOrDefault(cluster, Cluster.empty); + Cluster current = clusters.getOrDefault(cluster, Cluster.ready(common)); Cluster modified = new Cluster(current.common, with(documentType, requirePositive(requiredGeneration), current.pending), without(documentType, current.ready)); @@ -79,13 +79,10 @@ public class ApplicationReindexing implements Reindexing { if (clusters.get(cluster).pending().containsKey(documentType)) return Optional.empty(); - Status documentStatus = clusters.get(cluster).ready().get(documentType); - Status clusterStatus = clusters.get(cluster).common(); - if (documentStatus == null || documentStatus.ready().isBefore(clusterStatus.ready())) - documentStatus = clusterStatus; + if (clusters.get(cluster).ready().containsKey(documentType)) + return Optional.of(clusters.get(cluster).ready().get(documentType)); - if (documentStatus.ready().isAfter(common().ready())) - return Optional.of(documentStatus); + return Optional.of(clusters.get(cluster).common()); } return Optional.of(common()); } @@ -116,7 +113,7 @@ public class ApplicationReindexing implements Reindexing { /** Reindexing status for a single content cluster in an application. */ public static class Cluster { - private static final Cluster empty = new Cluster(Status.ALWAYS_READY, Map.of(), Map.of()); + private static Cluster ready(Status common) { return new Cluster(common, Map.of(), Map.of()); } private final Status common; private final Map<String, Long> pending; @@ -173,9 +170,6 @@ public class ApplicationReindexing implements Reindexing { /** Reindexing status common to an application, one of its clusters, or a single document type in a cluster. */ public static class Status implements Reindexing.Status { - /** Always ready, i.e., ignored when joining with more specific statuses. 
*/ - private static final Status ALWAYS_READY = new Status(Instant.EPOCH); - private final Instant ready; Status(Instant ready) { diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java index 6b316c06b54..7376452df42 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java @@ -1,27 +1,29 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.server.application; -import ai.vespa.util.http.VespaClientBuilderFactory; +import ai.vespa.util.http.VespaHttpClientBuilder; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import com.yahoo.component.AbstractComponent; +import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.config.model.api.HostInfo; import com.yahoo.config.model.api.PortInfo; import com.yahoo.config.model.api.ServiceInfo; -import java.util.logging.Level; +import com.yahoo.log.LogLevel; import com.yahoo.slime.Cursor; import com.yahoo.vespa.config.server.http.JSONResponse; -import org.glassfish.jersey.client.ClientProperties; -import org.glassfish.jersey.client.proxy.WebResourceFactory; - -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.ProcessingException; -import javax.ws.rs.client.Client; -import javax.ws.rs.client.ClientRequestFilter; -import javax.ws.rs.client.WebTarget; -import javax.ws.rs.core.HttpHeaders; +import org.apache.http.HttpStatus; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import 
org.apache.http.client.utils.URIBuilder; +import org.apache.http.impl.client.CloseableHttpClient; + +import java.io.IOException; +import java.io.UncheckedIOException; import java.net.URI; +import java.net.URISyntaxException; import java.time.Duration; import java.util.ArrayList; import java.util.LinkedHashMap; @@ -29,25 +31,29 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.logging.Logger; -import java.util.stream.Collectors; import static com.yahoo.config.model.api.container.ContainerServiceType.CLUSTERCONTROLLER_CONTAINER; import static com.yahoo.config.model.api.container.ContainerServiceType.CONTAINER; import static com.yahoo.config.model.api.container.ContainerServiceType.LOGSERVER_CONTAINER; import static com.yahoo.config.model.api.container.ContainerServiceType.QRSERVER; +import static java.util.stream.Collectors.toList; /** * Checks for convergence of config generation for a given application. 
* * @author Ulf Lilleengen * @author hmusum + * @author bjorncs */ public class ConfigConvergenceChecker extends AbstractComponent { private static final Logger log = Logger.getLogger(ConfigConvergenceChecker.class.getName()); - private static final String statePath = "/state/v1/"; - private static final String configSubPath = "config"; private final static Set<String> serviceTypesToCheck = Set.of( CONTAINER.serviceName, QRSERVER.serviceName, @@ -58,16 +64,13 @@ public class ConfigConvergenceChecker extends AbstractComponent { "distributor" ); - private final StateApiFactory stateApiFactory; - private final VespaClientBuilderFactory clientBuilderFactory = new VespaClientBuilderFactory(); + private final CloseableHttpClient httpClient; + private final ObjectMapper jsonMapper = new ObjectMapper(); + private final ExecutorService executor = createThreadpool(); @Inject public ConfigConvergenceChecker() { - this(ConfigConvergenceChecker::createStateApi); - } - - public ConfigConvergenceChecker(StateApiFactory stateApiFactory) { - this.stateApiFactory = stateApiFactory; + this.httpClient = createHttpClient(); } /** Fetches the active config generation for all services in the given application. */ @@ -82,7 +85,7 @@ public class ConfigConvergenceChecker extends AbstractComponent { } /** Check all services in given application. 
Returns the minimum current generation of all services */ - public ServiceListResponse getServiceConfigGenerationsResponse(Application application, URI requestUrl, Duration timeoutPerService) { + public JSONResponse getServiceConfigGenerationsResponse(Application application, URI requestUrl, Duration timeoutPerService) { Map<ServiceInfo, Long> currentGenerations = getServiceConfigGenerations(application, timeoutPerService); long currentGeneration = currentGenerations.values().stream().mapToLong(Long::longValue).min().orElse(-1); return new ServiceListResponse(200, currentGenerations, requestUrl, application.getApplicationGeneration(), @@ -90,7 +93,7 @@ public class ConfigConvergenceChecker extends AbstractComponent { } /** Check service identified by host and port in given application */ - public ServiceResponse getServiceConfigGenerationResponse(Application application, String hostAndPortToCheck, URI requestUrl, Duration timeout) { + public JSONResponse getServiceConfigGenerationResponse(Application application, String hostAndPortToCheck, URI requestUrl, Duration timeout) { Long wantedGeneration = application.getApplicationGeneration(); try { if ( ! hostInApplication(application, hostAndPortToCheck)) @@ -99,7 +102,7 @@ public class ConfigConvergenceChecker extends AbstractComponent { long currentGeneration = getServiceGeneration(URI.create("http://" + hostAndPortToCheck), timeout); boolean converged = currentGeneration >= wantedGeneration; return ServiceResponse.createOkResponse(requestUrl, hostAndPortToCheck, wantedGeneration, currentGeneration, converged); - } catch (ProcessingException e) { // e.g. if we cannot connect to the service to find generation + } catch (NonSuccessStatusCodeException | IOException e) { // e.g. 
if we cannot connect to the service to find generation return ServiceResponse.createNotFoundResponse(requestUrl, hostAndPortToCheck, wantedGeneration, e.getMessage()); } catch (Exception e) { return ServiceResponse.createErrorResponse(requestUrl, hostAndPortToCheck, wantedGeneration, e.getMessage()); @@ -108,46 +111,64 @@ public class ConfigConvergenceChecker extends AbstractComponent { @Override public void deconstruct() { - clientBuilderFactory.close(); - } - - @Path(statePath) - public interface StateApi { - @Path(configSubPath) - @GET - JsonNode config(); - } - - public interface StateApiFactory { - StateApi createStateApi(Client client, URI serviceUri); + try { + httpClient.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } - /** Gets service generation for a list of services (in parallel). */ + /** + * Gets service generation for a list of services (in parallel). + * This should ideally be implemented using an async http client + * */ private Map<ServiceInfo, Long> getServiceGenerations(List<ServiceInfo> services, Duration timeout) { - return services.parallelStream() - .collect(Collectors.toMap(service -> service, - service -> { - try { - return getServiceGeneration(URI.create("http://" + service.getHostName() - + ":" + getStatePort(service).get()), timeout); - } - catch (ProcessingException e) { // Cannot connect to service to determine service generation - return -1L; - } - }, - (v1, v2) -> { throw new IllegalStateException("Duplicate keys for values '" + v1 + "' and '" + v2 + "'."); }, - LinkedHashMap::new - )); + List<Callable<ServiceInfoWithGeneration>> tasks = services.stream() + .map(service -> + (Callable<ServiceInfoWithGeneration>) () -> { + long generation; + try { + generation = getServiceGeneration(URI.create("http://" + service.getHostName() + + ":" + getStatePort(service).get()), timeout); + } catch (IOException | NonSuccessStatusCodeException e) { + generation = -1L; + } + return new 
ServiceInfoWithGeneration(service, generation); + }) + .collect(toList()); + try { + List<Future<ServiceInfoWithGeneration>> taskResults = executor.invokeAll(tasks); + Map<ServiceInfo, Long> result = new LinkedHashMap<>(); + for (Future<ServiceInfoWithGeneration> taskResult : taskResults) { + ServiceInfoWithGeneration info = taskResult.get(); + result.put(info.service, info.generation); + } + return result; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Failed to retrieve config generation: " + e.getMessage(), e); + } catch (ExecutionException e) { + throw new RuntimeException("Failed to retrieve config generation: " + e.getMessage(), e); + } } /** Get service generation of service at given URL */ - private long getServiceGeneration(URI serviceUrl, Duration timeout) { - Client client = createClient(timeout); - try { - StateApi state = stateApiFactory.createStateApi(client, serviceUrl); - return generationFromContainerState(state.config()); - } finally { - client.close(); + private long getServiceGeneration(URI serviceUrl, Duration timeout) throws IOException, NonSuccessStatusCodeException { + HttpGet request = new HttpGet(createApiUri(serviceUrl)); + request.addHeader("Connection", "close"); + request.setConfig(createRequestConfig(timeout)); + try (CloseableHttpResponse response = httpClient.execute(request)) { + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode != HttpStatus.SC_OK) throw new NonSuccessStatusCodeException(statusCode); + if (response.getEntity() == null) throw new IOException("Response has no content"); + JsonNode jsonContent = jsonMapper.readTree(response.getEntity().getContent()); + return generationFromContainerState(jsonContent); + } catch (Exception e) { + log.log( + LogLevel.DEBUG, + e, + () -> String.format("Failed to retrieve service config generation for '%s': %s", serviceUrl, e.getMessage())); + throw e; } } @@ -166,16 +187,6 @@ public class 
ConfigConvergenceChecker extends AbstractComponent { return false; } - private Client createClient(Duration timeout) { - return clientBuilderFactory.newBuilder() - .register( - (ClientRequestFilter) ctx -> - ctx.getHeaders().put(HttpHeaders.USER_AGENT, List.of("config-convergence-checker"))) - .property(ClientProperties.CONNECT_TIMEOUT, (int) timeout.toMillis()) - .property(ClientProperties.READ_TIMEOUT, (int) timeout.toMillis()) - .build(); - } - private static Optional<Integer> getStatePort(ServiceInfo service) { return service.getPorts().stream() .filter(port -> port.getTags().contains("state")) @@ -187,9 +198,57 @@ public class ConfigConvergenceChecker extends AbstractComponent { return state.get("config").get("generation").asLong(-1); } - private static StateApi createStateApi(Client client, URI uri) { - WebTarget target = client.target(uri); - return WebResourceFactory.newResource(StateApi.class, target); + private static URI createApiUri(URI serviceUrl) { + try { + return new URIBuilder(serviceUrl) + .setPath("/state/v1/config") + .build(); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + private static ExecutorService createThreadpool() { + return Executors.newFixedThreadPool( + Runtime.getRuntime().availableProcessors(), new DaemonThreadFactory("config-convergence-checker-")); + } + + private static CloseableHttpClient createHttpClient() { + return VespaHttpClientBuilder + .create() + .setUserAgent("config-convergence-checker") + .setMaxConnPerRoute(10) + .setMaxConnTotal(400) + .setConnectionReuseStrategy((response, context) -> false) // Disable connection reuse + .build(); + } + + private static RequestConfig createRequestConfig(Duration timeout) { + int timeoutMillis = (int)timeout.toMillis(); + return RequestConfig.custom() + .setConnectionRequestTimeout(timeoutMillis) + .setConnectTimeout(timeoutMillis) + .setSocketTimeout(timeoutMillis) + .build(); + } + + private static class ServiceInfoWithGeneration { + final 
ServiceInfo service; + final long generation; + + ServiceInfoWithGeneration(ServiceInfo service, long generation) { + this.service = service; + this.generation = generation; + } + } + + private static class NonSuccessStatusCodeException extends Exception { + final int statusCode; + + NonSuccessStatusCodeException(int statusCode) { + super("Expected status code 200, got " + statusCode); + this.statusCode = statusCode; + } } private static class ServiceListResponse extends JSONResponse { diff --git a/configserver/src/main/java/com/yahoo/vespa/serviceview/ConfigServerLocation.java b/configserver/src/main/java/com/yahoo/vespa/serviceview/ConfigServerLocation.java index cc452421d2d..05d1119aa4f 100644 --- a/configserver/src/main/java/com/yahoo/vespa/serviceview/ConfigServerLocation.java +++ b/configserver/src/main/java/com/yahoo/vespa/serviceview/ConfigServerLocation.java @@ -15,6 +15,7 @@ public class ConfigServerLocation extends AbstractComponent { final int restApiPort; // The client factory must be owned by a component as StateResource is instantiated per request + @SuppressWarnings("removal") final VespaClientBuilderFactory clientBuilderFactory = new VespaClientBuilderFactory(); @Inject diff --git a/configserver/src/main/java/com/yahoo/vespa/serviceview/StateResource.java b/configserver/src/main/java/com/yahoo/vespa/serviceview/StateResource.java index 76e600d2ad8..138e6c8798c 100644 --- a/configserver/src/main/java/com/yahoo/vespa/serviceview/StateResource.java +++ b/configserver/src/main/java/com/yahoo/vespa/serviceview/StateResource.java @@ -40,6 +40,7 @@ public class StateResource implements StateClient { private static final String USER_AGENT = "service-view-config-server-client"; private static final String SINGLE_API_LINK = "url"; + @SuppressWarnings("removal") private final VespaClientBuilderFactory clientBuilderFactory; private final int restApiPort; private final String host; diff --git 
a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationReindexingTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationReindexingTest.java index b803413d6ea..82de3e2eefb 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationReindexingTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationReindexingTest.java @@ -27,12 +27,19 @@ public class ApplicationReindexingTest { .withReady("one", "a", Instant.ofEpochMilli(1)) .withReady("two", "c", Instant.ofEpochMilli(3)); - assertEquals(Instant.ofEpochMilli(1 << 20), + // Document is most specific, and is used. + assertEquals(Instant.ofEpochMilli(1), reindexing.status("one", "a").orElseThrow().ready()); + // Cluster is most specific, and inherits application's common status. assertEquals(Instant.ofEpochMilli(1 << 20), reindexing.status("one", "d").orElseThrow().ready()); + // Cluster is most specific, and has its own status set. + assertEquals(Instant.ofEpochMilli(2 << 10), + reindexing.status("two", "d").orElseThrow().ready()); + + // Application is most specific, as cluster and document are unknown. 
assertEquals(Instant.ofEpochMilli(1 << 20), reindexing.status("three", "a").orElseThrow().ready()); @@ -45,7 +52,7 @@ public class ApplicationReindexingTest { assertEquals(Set.of("one", "two"), reindexing.clusters().keySet()); - assertEquals(new Status(Instant.EPOCH), + assertEquals(new Status(Instant.ofEpochMilli(1 << 20)), reindexing.clusters().get("one").common()); assertEquals(Map.of("a", new Status(Instant.ofEpochMilli(1))), diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java index 6aeb774d2b0..4948432c646 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java @@ -2,15 +2,13 @@ package com.yahoo.vespa.config.server.application; import com.github.tomakehurst.wiremock.junit.WireMockRule; +import com.yahoo.component.Version; import com.yahoo.config.model.api.Model; import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.ApplicationName; import com.yahoo.config.provision.InstanceName; import com.yahoo.config.provision.TenantName; -import com.yahoo.component.Version; import com.yahoo.container.jdisc.HttpResponse; -import com.yahoo.slime.Slime; -import com.yahoo.slime.SlimeUtils; import com.yahoo.vespa.config.server.ServerCache; import com.yahoo.vespa.config.server.monitoring.MetricUpdater; import org.junit.Before; @@ -31,8 +29,9 @@ import static com.github.tomakehurst.wiremock.client.WireMock.get; import static com.github.tomakehurst.wiremock.client.WireMock.okJson; import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options; +import static com.yahoo.test.json.JsonTestHelper.assertJsonEquals; +import static 
org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; /** * @author Ulf Lilleengen @@ -184,10 +183,15 @@ public class ConfigConvergenceCheckerTest { .withBody("response too slow"))); HttpResponse response = checker.getServiceConfigGenerationResponse(application, hostAndPort(service), requestUrl, Duration.ofMillis(1)); // Message contained in a SocketTimeoutException may differ across platforms, so we do a partial match of the response here - assertResponse((responseBody) -> assertTrue("Response matches", responseBody.startsWith( - "{\"url\":\"" + requestUrl.toString() + "\",\"host\":\"" + hostAndPort(requestUrl) + - "\",\"wantedGeneration\":3,\"error\":\"java.net.SocketTimeoutException") && - responseBody.endsWith("\"}")), 404, response); + assertResponse( + responseBody -> + assertThat(responseBody) + .startsWith("{\"url\":\"" + requestUrl.toString() + "\",\"host\":\"" + hostAndPort(requestUrl) + + "\",\"wantedGeneration\":3,\"error\":\"") + .contains("timed out") + .endsWith("\"}"), + 404, + response); } private URI testServer() { @@ -202,16 +206,8 @@ public class ConfigConvergenceCheckerTest { return uri.getHost() + ":" + uri.getPort(); } - private static void assertResponse(String json, int status, HttpResponse response) { - assertResponse((responseBody) -> { - Slime expected = SlimeUtils.jsonToSlime(json.getBytes()); - Slime actual = SlimeUtils.jsonToSlime(responseBody.getBytes()); - try { - assertEquals(new String((SlimeUtils.toJsonBytes(expected))), new String(SlimeUtils.toJsonBytes(actual))); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }, status, response); + private static void assertResponse(String expectedJson, int status, HttpResponse response) { + assertResponse((responseBody) -> assertJsonEquals(new String(responseBody.getBytes()), expectedJson), status, response); } private static void assertResponse(Consumer<String> assertFunc, int status, 
HttpResponse response) { diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java index 7276091fed0..06a455954ac 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java @@ -1,7 +1,6 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.server.http.v2; -import com.fasterxml.jackson.databind.ObjectMapper; import com.yahoo.cloud.config.ConfigserverConfig; import com.yahoo.component.Version; import com.yahoo.config.model.api.ModelFactory; @@ -18,7 +17,6 @@ import com.yahoo.vespa.config.server.MockLogRetriever; import com.yahoo.vespa.config.server.MockProvisioner; import com.yahoo.vespa.config.server.MockTesterClient; import com.yahoo.vespa.config.server.TestComponentRegistry; -import com.yahoo.vespa.config.server.application.ConfigConvergenceChecker; import com.yahoo.vespa.config.server.application.HttpProxy; import com.yahoo.vespa.config.server.application.OrchestratorMock; import com.yahoo.vespa.config.server.deploy.DeployTester; @@ -37,12 +35,10 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import javax.ws.rs.client.Client; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.List; @@ -425,21 +421,6 @@ public class ApplicationHandlerTest { return createApplicationHandler().handle(HttpRequest.createTestRequest(restartUrl, GET)); } - private static class MockStateApiFactory implements ConfigConvergenceChecker.StateApiFactory { - boolean createdApi = false; - @Override - public 
ConfigConvergenceChecker.StateApi createStateApi(Client client, URI serviceUri) { - createdApi = true; - return () -> { - try { - return new ObjectMapper().readTree("{\"config\":{\"generation\":1}}"); - } catch (IOException e) { - throw new RuntimeException(e); - } - }; - } - } - private ApplicationHandler createApplicationHandler() { return createApplicationHandler(applicationRepository); } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainerTest.java index f4a553e25b7..57582040552 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainerTest.java @@ -16,13 +16,14 @@ public class ReindexingMaintainerTest { @Test public void testReadyComputation() { - ApplicationReindexing reindexing = ApplicationReindexing.ready(Instant.ofEpochMilli(1 << 20)) + ApplicationReindexing reindexing = ApplicationReindexing.ready(Instant.EPOCH) .withPending("one", "a", 10) .withReady("two", "b", Instant.ofEpochMilli(2)) .withPending("two", "b", 20) .withReady("two", Instant.ofEpochMilli(2 << 10)) .withReady("one", "a", Instant.ofEpochMilli(1)) - .withReady("two", "c", Instant.ofEpochMilli(3)); + .withReady("two", "c", Instant.ofEpochMilli(3)) + .withReady(Instant.ofEpochMilli(1 << 20)); assertEquals(reindexing, withReady(reindexing, () -> -1L, Instant.EPOCH)); diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/athenz/impl/AthenzClientFactoryImpl.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/athenz/impl/AthenzClientFactoryImpl.java index 323d49e4639..173729c7472 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/athenz/impl/AthenzClientFactoryImpl.java +++ 
b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/athenz/impl/AthenzClientFactoryImpl.java @@ -2,7 +2,6 @@ package com.yahoo.vespa.hosted.controller.athenz.impl; import com.google.inject.Inject; -import com.yahoo.jdisc.Metric; import com.yahoo.vespa.athenz.api.AthenzIdentity; import com.yahoo.vespa.athenz.client.zms.DefaultZmsClient; import com.yahoo.vespa.athenz.client.zms.ZmsClient; @@ -11,32 +10,21 @@ import com.yahoo.vespa.athenz.client.zts.ZtsClient; import com.yahoo.vespa.athenz.identity.ServiceIdentityProvider; import com.yahoo.vespa.hosted.controller.api.integration.athenz.AthenzClientFactory; import com.yahoo.vespa.hosted.controller.athenz.config.AthenzConfig; -import org.apache.http.client.methods.HttpUriRequest; import java.net.URI; -import java.util.HashMap; -import java.util.Map; /** * @author bjorncs */ public class AthenzClientFactoryImpl implements AthenzClientFactory { - private static final String METRIC_NAME = "athenz.request.error"; - private static final String ATHENZ_SERVICE_DIMENSION = "athenzService"; - private static final String EXCEPTION_DIMENSION = "exception"; - private final AthenzConfig config; private final ServiceIdentityProvider identityProvider; - private final Metric metrics; - private final Map<String, Metric.Context> metricContexts; @Inject - public AthenzClientFactoryImpl(ServiceIdentityProvider identityProvider, AthenzConfig config, Metric metrics) { + public AthenzClientFactoryImpl(ServiceIdentityProvider identityProvider, AthenzConfig config) { this.identityProvider = identityProvider; this.config = config; - this.metrics = metrics; - this.metricContexts = new HashMap<>(); } @Override @@ -49,7 +37,7 @@ public class AthenzClientFactoryImpl implements AthenzClientFactory { */ @Override public ZmsClient createZmsClient() { - return new DefaultZmsClient(URI.create(config.zmsUrl()), identityProvider, this::reportMetricErrorHandler); + return new DefaultZmsClient(URI.create(config.zmsUrl()), identityProvider); } 
/** @@ -57,7 +45,7 @@ public class AthenzClientFactoryImpl implements AthenzClientFactory { */ @Override public ZtsClient createZtsClient() { - return new DefaultZtsClient.Builder(URI.create(config.ztsUrl())).withIdentityProvider(identityProvider).build(); + return new DefaultZtsClient(URI.create(config.ztsUrl()), identityProvider); } @Override @@ -65,11 +53,4 @@ public class AthenzClientFactoryImpl implements AthenzClientFactory { return true; } - private void reportMetricErrorHandler(HttpUriRequest request, Exception error) { - String hostname = request.getURI().getHost(); - Metric.Context context = metricContexts.computeIfAbsent(hostname, host -> metrics.createContext( - Map.of(ATHENZ_SERVICE_DIMENSION, host, - EXCEPTION_DIMENSION, error.getClass().getSimpleName()))); - metrics.add(METRIC_NAME, 1, context); - } } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/CloudEventReporter.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/CloudEventReporter.java index 402a82a9ca1..2e62f8e54df 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/CloudEventReporter.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/CloudEventReporter.java @@ -96,6 +96,8 @@ public class CloudEventReporter extends ControllerMaintainer { } private void submitIssue(CloudEvent event) { + if (controller().system().isPublic()) + return; Issue issue = eventFetcher.createIssue(event); if (!issueHandler.issueExists(issue)) { issueHandler.file(issue); diff --git a/document/src/main/java/com/yahoo/document/fieldset/FieldSetRepo.java b/document/src/main/java/com/yahoo/document/fieldset/FieldSetRepo.java index 285c78ef0ab..905376e81ca 100644 --- a/document/src/main/java/com/yahoo/document/fieldset/FieldSetRepo.java +++ b/document/src/main/java/com/yahoo/document/fieldset/FieldSetRepo.java @@ -34,21 +34,24 @@ public class FieldSetRepo { FieldSet 
parseFieldCollection(DocumentTypeManager docMan, String docType, String fieldNames) { DocumentType type = docMan.getDocumentType(docType); if (type == null) { - throw new IllegalArgumentException("Unknown document type " + docType); + throw new IllegalArgumentException("Unknown document type " + docType); } - StringTokenizer tokenizer = new StringTokenizer(fieldNames, ","); FieldCollection collection = new FieldCollection(type); - - while (tokenizer.hasMoreTokens()) { - String token = tokenizer.nextToken(); - Field f = type.getField(token); - if (f == null) { - throw new IllegalArgumentException("No such field " + token); + if (fieldNames.equals("[document]")) { + collection.addAll(type.fieldSet()); + } + else { + StringTokenizer tokenizer = new StringTokenizer(fieldNames, ","); + while (tokenizer.hasMoreTokens()) { + String token = tokenizer.nextToken(); + Field f = type.getField(token); + if (f == null) { + throw new IllegalArgumentException("No such field " + token); + } + collection.add(f); } - collection.add(f); } - return collection; } diff --git a/document/src/vespa/document/base/globalid.cpp b/document/src/vespa/document/base/globalid.cpp index bdf81d1c3c8..4f96ac0e659 100644 --- a/document/src/vespa/document/base/globalid.cpp +++ b/document/src/vespa/document/base/globalid.cpp @@ -39,39 +39,6 @@ getHexVal(char c) namespace document { -bool -GlobalId::BucketOrderCmp::operator()(const GlobalId &lhs, const GlobalId &rhs) const -{ - const unsigned char * __restrict__ a = lhs._gid._buffer; - const unsigned char * __restrict__ b = rhs._gid._buffer; - int diff; - if ((diff = compare(a[0], b[0])) != 0) { - return diff < 0; - } - if ((diff = compare(a[1], b[1])) != 0) { - return diff < 0; - } - if ((diff = compare(a[2], b[2])) != 0) { - return diff < 0; - } - if ((diff = compare(a[3], b[3])) != 0) { - return diff < 0; - } - if ((diff = compare(a[8], b[8])) != 0) { - return diff < 0; - } - if ((diff = compare(a[9], b[9])) != 0) { - return diff < 0; - } - if ((diff = 
compare(a[10], b[10])) != 0) { - return diff < 0; - } - if ((diff = compare(a[11], b[11])) != 0) { - return diff < 0; - } - return lhs < rhs; -} - vespalib::string GlobalId::toString() const { vespalib::asciistream out; out << "gid(0x" << vespalib::hex; diff --git a/document/src/vespa/document/base/globalid.h b/document/src/vespa/document/base/globalid.h index 1bb257b7cbf..ac30e189033 100644 --- a/document/src/vespa/document/base/globalid.h +++ b/document/src/vespa/document/base/globalid.h @@ -64,7 +64,24 @@ public: * given bucket. */ struct BucketOrderCmp { - bool operator()(const GlobalId &lhs, const GlobalId &rhs) const; + bool operator()(const GlobalId &lhs, const GlobalId &rhs) const { + const uint32_t * __restrict__ a = lhs._gid._nums; + const uint32_t * __restrict__ b = rhs._gid._nums; + if (a[0] != b[0]) { + return bitswap(a[0]) < bitswap(b[0]); + } + if (a[2] != b[2]) { + return bitswap(a[2]) < bitswap(b[2]); + } + return __builtin_bswap32(a[1]) < __builtin_bswap32(b[1]); + } + static uint32_t bitswap(uint32_t value) { + value = ((value & 0x55555555) << 1) | ((value & 0xaaaaaaaa) >> 1); + value = ((value & 0x33333333) << 2) | ((value & 0xcccccccc) >> 2); + value = ((value & 0x0f0f0f0f) << 4) | ((value & 0xf0f0f0f0) >> 4); + return __builtin_bswap32(value); + } + //These 2 compare methods are exposed only for testing static int compareRaw(unsigned char a, unsigned char b) { return a - b; diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java index 0dd96275f9d..e98be6871b4 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java @@ -26,8 +26,8 @@ import java.util.logging.Logger; */ public class VisitorIterator { - private ProgressToken progressToken; - private BucketSource bucketSource; + private final ProgressToken progressToken; + private final BucketSource 
bucketSource; private int distributionBitCount; private static final Logger log = Logger.getLogger(VisitorIterator.class.getName()); diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java index e0ae0278de8..d332b1fb1ca 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java @@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicReference; /** * Local visitor session that copies and iterates through all items in the local document access. - * Each document must be ack'ed for the session to be done visiting. + * Each document must be ack'ed for the session to be done visiting, unless the destination is remote. * Only document puts are sent by this session, and this is done from a separate thread. * * @author jonmv @@ -44,20 +44,23 @@ public class LocalVisitorSession implements VisitorSession { private final FieldSet fieldSet; private final AtomicReference<State> state; private final AtomicReference<Phaser> phaser; + private final ProgressToken token; public LocalVisitorSession(LocalDocumentAccess access, VisitorParameters parameters) throws ParseException { - if (parameters.getResumeToken() != null) - throw new UnsupportedOperationException("Continuation via progress tokens is not supported"); - - if (parameters.getRemoteDataHandler() != null) - throw new UnsupportedOperationException("Remote data handlers are not supported"); - this.selector = new DocumentSelector(parameters.getDocumentSelection()); this.fieldSet = new FieldSetRepo().parse(access.getDocumentTypeManager(), parameters.fieldSet()); - - this.data = parameters.getLocalDataHandler() == null ? 
new VisitorDataQueue() : parameters.getLocalDataHandler(); - this.data.reset(); - this.data.setSession(this); + this.token = parameters.getResumeToken(); + + if (parameters.getRemoteDataHandler() == null) { + this.data = parameters.getLocalDataHandler() == null ? new VisitorDataQueue() : parameters.getLocalDataHandler(); + this.data.reset(); + this.data.setSession(this); + } + else { + if (parameters.getLocalDataHandler() != null) + throw new IllegalArgumentException("Cannot have both a remote and a local data handler"); + this.data = null; + } this.control = parameters.getControlHandler() == null ? new VisitorControlHandler() : parameters.getControlHandler(); this.control.reset(); @@ -98,8 +101,11 @@ public class LocalVisitorSession implements VisitorSession { if (synchronizer != null) synchronizer.arriveAndAwaitAdvance(); - data.onMessage(new PutDocumentMessage(new DocumentPut(copy)), - new AckToken(id)); + if (data != null) + data.onMessage(new PutDocumentMessage(new DocumentPut(copy)), + new AckToken(id)); + else + outstanding.remove(id); if (synchronizer != null) synchronizer.arriveAndAwaitAdvance(); @@ -127,9 +133,9 @@ public class LocalVisitorSession implements VisitorSession { } finally { if (synchronizer != null) - synchronizer.arriveAndDeregister(); - - data.onDone(); + synchronizer.awaitAdvance(synchronizer.arriveAndDeregister()); + if (data != null) + data.onDone(); } }).start(); } @@ -140,9 +146,10 @@ public class LocalVisitorSession implements VisitorSession { && control.isDone(); // Control handler has been notified } + /** Returns the token set in the parameters used to create this. 
*/ @Override public ProgressToken getProgress() { - throw new UnsupportedOperationException("Progress tokens are not supported"); + return token; } @Override @@ -179,6 +186,12 @@ public class LocalVisitorSession implements VisitorSession { @Override public void destroy() { abort(); + try { + control.waitUntilDone(0); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java index e1d18080faf..257d491ea93 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java @@ -1166,7 +1166,7 @@ public class MessageBusVisitorSession implements VisitorSession { } } } catch (InterruptedException e) { - e.printStackTrace(); + log.log(Level.WARNING, "Interrupted waiting for visitor session to be destroyed"); } finally { try { sender.destroy(); diff --git a/eval/src/vespa/eval/eval/fast_sparse_map.cpp b/eval/src/vespa/eval/eval/fast_sparse_map.cpp index 3215b5f8995..2e95934286c 100644 --- a/eval/src/vespa/eval/eval/fast_sparse_map.cpp +++ b/eval/src/vespa/eval/eval/fast_sparse_map.cpp @@ -7,6 +7,8 @@ namespace vespalib::eval { FastSparseMap::~FastSparseMap() = default; +const FastSparseMap::HashedLabel FastSparseMap::empty_label; + } VESPALIB_HASH_MAP_INSTANTIATE_H_E(vespalib::eval::FastSparseMap::Key, uint32_t, vespalib::eval::FastSparseMap::Hash, vespalib::eval::FastSparseMap::Equal); diff --git a/eval/src/vespa/eval/eval/fast_sparse_map.h b/eval/src/vespa/eval/eval/fast_sparse_map.h index 19c171cfed8..0d7597a19a0 100644 --- a/eval/src/vespa/eval/eval/fast_sparse_map.h +++ b/eval/src/vespa/eval/eval/fast_sparse_map.h @@ -58,6 +58,8 @@ public: HashedLabel(const vespalib::stringref *str) : label(*str), hash(hash_label(*str)) {} }; 
+ static const HashedLabel empty_label; + static uint64_t hash_label(const HashedLabel &label) { return label.hash; } diff --git a/eval/src/vespa/eval/eval/fast_value.hpp b/eval/src/vespa/eval/eval/fast_value.hpp index 559f2e0a7f6..932924cd409 100644 --- a/eval/src/vespa/eval/eval/fast_value.hpp +++ b/eval/src/vespa/eval/eval/fast_value.hpp @@ -155,17 +155,17 @@ struct FastValueIndex final : Value::Index { ConstArrayRef<LCT> lhs_cells, ConstArrayRef<RCT> rhs_cells, Stash &stash); template <typename LCT, typename RCT, typename OCT, typename Fun> - static const Value &sparse_only_merge(const ValueType &res_type, const Fun &fun, - const FastValueIndex &lhs, const FastValueIndex &rhs, - ConstArrayRef<LCT> lhs_cells, ConstArrayRef<RCT> rhs_cells, - Stash &stash) __attribute((noinline)); - - template <typename LCT, typename RCT, typename OCT, typename Fun> static const Value &sparse_no_overlap_join(const ValueType &res_type, const Fun &fun, const FastValueIndex &lhs, const FastValueIndex &rhs, const std::vector<JoinAddrSource> &addr_sources, ConstArrayRef<LCT> lhs_cells, ConstArrayRef<RCT> rhs_cells, Stash &stash); + template <typename LCT, typename RCT, typename OCT, typename Fun> + static const Value &sparse_only_merge(const ValueType &res_type, const Fun &fun, + const FastValueIndex &lhs, const FastValueIndex &rhs, + ConstArrayRef<LCT> lhs_cells, ConstArrayRef<RCT> rhs_cells, + Stash &stash) __attribute((noinline)); + size_t size() const override { return map.size(); } std::unique_ptr<View> create_view(const std::vector<size_t> &dims) const override; }; @@ -333,6 +333,59 @@ FastValueIndex::sparse_full_overlap_join(const ValueType &res_type, const Fun &f template <typename LCT, typename RCT, typename OCT, typename Fun> const Value & +FastValueIndex::sparse_no_overlap_join(const ValueType &res_type, const Fun &fun, + const FastValueIndex &lhs, const FastValueIndex &rhs, + const std::vector<JoinAddrSource> &addr_sources, + ConstArrayRef<LCT> lhs_cells, ConstArrayRef<RCT> 
rhs_cells, Stash &stash) +{ + using HashedLabelRef = std::reference_wrapper<const FastSparseMap::HashedLabel>; + size_t num_mapped_dims = addr_sources.size(); + auto &result = stash.create<FastValue<OCT>>(res_type, num_mapped_dims, 1, lhs.map.size()*rhs.map.size()); + std::vector<HashedLabelRef> output_addr(num_mapped_dims, FastSparseMap::empty_label); + std::vector<size_t> store_lhs_idx; + std::vector<size_t> store_rhs_idx; + size_t out_idx = 0; + for (JoinAddrSource source : addr_sources) { + switch (source) { + case JoinAddrSource::LHS: + store_lhs_idx.push_back(out_idx++); + break; + case JoinAddrSource::RHS: + store_rhs_idx.push_back(out_idx++); + break; + default: + abort(); + } + } + assert(out_idx == output_addr.size()); + for (size_t lhs_subspace = 0; lhs_subspace < lhs.map.size(); ++lhs_subspace) { + auto l_addr = lhs.map.make_addr(lhs_subspace); + assert(l_addr.size() == store_lhs_idx.size()); + for (size_t i = 0; i < store_lhs_idx.size(); ++i) { + size_t addr_idx = store_lhs_idx[i]; + output_addr[addr_idx] = l_addr[i]; + } + for (size_t rhs_subspace = 0; rhs_subspace < rhs.map.size(); ++rhs_subspace) { + auto r_addr = rhs.map.make_addr(rhs_subspace); + assert(r_addr.size() == store_rhs_idx.size()); + for (size_t i = 0; i < store_rhs_idx.size(); ++i) { + size_t addr_idx = store_rhs_idx[i]; + output_addr[addr_idx] = r_addr[i]; + } + auto idx = result.my_index.map.add_mapping(ConstArrayRef(output_addr)); + if (__builtin_expect((idx == result.my_cells.size), true)) { + auto cell_value = fun(lhs_cells[lhs_subspace], rhs_cells[rhs_subspace]); + result.my_cells.push_back_fast(cell_value); + } + } + } + return result; +} + +//----------------------------------------------------------------------------- + +template <typename LCT, typename RCT, typename OCT, typename Fun> +const Value & FastValueIndex::sparse_only_merge(const ValueType &res_type, const Fun &fun, const FastValueIndex &lhs, const FastValueIndex &rhs, ConstArrayRef<LCT> lhs_cells, ConstArrayRef<RCT> 
rhs_cells, Stash &stash) @@ -365,47 +418,4 @@ FastValueIndex::sparse_only_merge(const ValueType &res_type, const Fun &fun, return result; } -//----------------------------------------------------------------------------- - -template <typename LCT, typename RCT, typename OCT, typename Fun> -const Value & -FastValueIndex::sparse_no_overlap_join(const ValueType &res_type, const Fun &fun, - const FastValueIndex &lhs, const FastValueIndex &rhs, - const std::vector<JoinAddrSource> &addr_sources, - ConstArrayRef<LCT> lhs_cells, ConstArrayRef<RCT> rhs_cells, Stash &stash) -{ - using HashedLabelRef = std::reference_wrapper<const FastSparseMap::HashedLabel>; - auto &result = stash.create<FastValue<OCT>>(res_type, res_type.count_mapped_dimensions(), 1, lhs.map.size()*rhs.map.size()); - std::vector<HashedLabelRef> output_addr; - for (size_t lhs_subspace = 0; lhs_subspace < lhs.map.size(); ++lhs_subspace) { - auto l_addr = lhs.map.make_addr(lhs_subspace); - for (size_t rhs_subspace = 0; rhs_subspace < rhs.map.size(); ++rhs_subspace) { - auto r_addr = rhs.map.make_addr(rhs_subspace); - output_addr.clear(); - size_t l_idx = 0; - size_t r_idx = 0; - for (JoinAddrSource source : addr_sources) { - switch (source) { - case JoinAddrSource::LHS: - output_addr.push_back(l_addr[l_idx++]); - break; - case JoinAddrSource::RHS: - output_addr.push_back(r_addr[r_idx++]); - break; - default: - abort(); - } - } - assert(l_idx == l_addr.size()); - assert(r_idx == r_addr.size()); - auto idx = result.my_index.map.add_mapping(ConstArrayRef(output_addr)); - if (__builtin_expect((idx == result.my_cells.size), true)) { - auto cell_value = fun(lhs_cells[lhs_subspace], rhs_cells[rhs_subspace]); - result.my_cells.push_back_fast(cell_value); - } - } - } - return result; -} - } diff --git a/jaxrs_client_utils/src/main/java/ai/vespa/util/http/VespaClientBuilderFactory.java b/jaxrs_client_utils/src/main/java/ai/vespa/util/http/VespaClientBuilderFactory.java index 2bac7f66799..c6afa889041 100644 --- 
a/jaxrs_client_utils/src/main/java/ai/vespa/util/http/VespaClientBuilderFactory.java +++ b/jaxrs_client_utils/src/main/java/ai/vespa/util/http/VespaClientBuilderFactory.java @@ -27,8 +27,10 @@ import static java.util.logging.Level.CONFIG; * - hostname verification is not enabled - CN/SAN verification is assumed to be handled by the underlying x509 trust manager. * - ssl context or hostname verifier must not be overridden by the caller * + * @deprecated Use Apache httpclient based client factory instead (VespaHttpClientBuilder). * @author bjorncs */ +@Deprecated(forRemoval = true) public class VespaClientBuilderFactory implements AutoCloseable { private static final Logger log = Logger.getLogger(VespaClientBuilderFactory.class.getName()); diff --git a/jaxrs_client_utils/src/main/java/com/yahoo/vespa/jaxrs/client/VespaJerseyJaxRsClientFactory.java b/jaxrs_client_utils/src/main/java/com/yahoo/vespa/jaxrs/client/VespaJerseyJaxRsClientFactory.java index bdc89d737d4..6d1c1c71f21 100644 --- a/jaxrs_client_utils/src/main/java/com/yahoo/vespa/jaxrs/client/VespaJerseyJaxRsClientFactory.java +++ b/jaxrs_client_utils/src/main/java/com/yahoo/vespa/jaxrs/client/VespaJerseyJaxRsClientFactory.java @@ -17,10 +17,13 @@ import java.util.List; /** * Factory for creating Jersey based Vespa clients from a JAX-RS resource interface. * + * @deprecated Use Apache httpclient based client factory instead (VespaHttpClientBuilder). 
* @author bjorncs */ +@Deprecated(forRemoval = true) public class VespaJerseyJaxRsClientFactory implements JaxRsClientFactory, AutoCloseable { + @SuppressWarnings("removal") private final VespaClientBuilderFactory clientBuilder = new VespaClientBuilderFactory(); // Client is a heavy-weight object with a finalizer so we create only one and re-use it private final Client client; diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/identity/AthenzCredentialsMaintainer.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/identity/AthenzCredentialsMaintainer.java index 0a29da57220..d6c08a820cd 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/identity/AthenzCredentialsMaintainer.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/identity/AthenzCredentialsMaintainer.java @@ -198,7 +198,7 @@ public class AthenzCredentialsMaintainer implements CredentialsMaintainer { HostnameVerifier ztsHostNameVerifier = useInternalZts ? new AthenzIdentityVerifier(Set.of(configserverIdentity)) : null; - try (ZtsClient ztsClient = new DefaultZtsClient.Builder(ztsEndpoint).withIdentityProvider(hostIdentityProvider).withHostnameVerifier(ztsHostNameVerifier).build()) { + try (ZtsClient ztsClient = new DefaultZtsClient(ztsEndpoint, hostIdentityProvider, ztsHostNameVerifier)) { InstanceIdentity instanceIdentity = ztsClient.registerInstance( configserverIdentity, @@ -227,7 +227,7 @@ public class AthenzCredentialsMaintainer implements CredentialsMaintainer { HostnameVerifier ztsHostNameVerifier = useInternalZts ? 
new AthenzIdentityVerifier(Set.of(configserverIdentity)) : null; - try (ZtsClient ztsClient = new DefaultZtsClient.Builder(ztsEndpoint).withSslContext(containerIdentitySslContext).withHostnameVerifier(ztsHostNameVerifier).build()) { + try (ZtsClient ztsClient = new DefaultZtsClient(ztsEndpoint, containerIdentitySslContext, ztsHostNameVerifier)) { InstanceIdentity instanceIdentity = ztsClient.refreshInstance( configserverIdentity, diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java index 070bf98bf87..163ee74e8f1 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java @@ -114,18 +114,23 @@ public class QuestMetricsDb implements MetricsDb { // Since we remove full days at once we need to keep at least the scaling window + 1 day Instant oldestToKeep = clock.instant().minus(Autoscaler.maxScalingWindow().plus(Duration.ofDays(1))); SqlExecutionContext context = newContext(); + int partitions = 0; try (SqlCompiler compiler = new SqlCompiler(engine)) { File tableRoot = new File(dataDir, tableName); List<String> removeList = new ArrayList<>(); for (String dirEntry : tableRoot.list()) { File partitionDir = new File(tableRoot, dirEntry); if ( ! partitionDir.isDirectory()) continue; + + partitions++; DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneId.of("UTC")); Instant partitionDay = Instant.from(formatter.parse(dirEntry + "T00:00:00")); if (partitionDay.isBefore(oldestToKeep)) removeList.add(dirEntry); + } - if ( ! removeList.isEmpty()) + // Remove unless all partitions are old: Removing all partitions "will be supported in the future" + if ( removeList.size() < partitions && ! 
removeList.isEmpty()) compiler.compile("alter table " + tableName + " drop partition " + removeList.stream().map(dir -> "'" + dir + "'").collect(Collectors.joining(",")), context); diff --git a/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/controller/RetryingClusterControllerClientFactory.java b/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/controller/RetryingClusterControllerClientFactory.java index 4b82f278f23..e2e769f8556 100644 --- a/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/controller/RetryingClusterControllerClientFactory.java +++ b/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/controller/RetryingClusterControllerClientFactory.java @@ -14,6 +14,7 @@ import java.util.List; /** * @author bakksjo */ +@SuppressWarnings("removal") // VespaJerseyJaxRsClientFactory public class RetryingClusterControllerClientFactory extends AbstractComponent implements ClusterControllerClientFactory { // TODO: Figure this port out dynamically. diff --git a/orchestrator/src/test/java/com/yahoo/vespa/orchestrator/controller/RetryingClusterControllerClientFactoryTest.java b/orchestrator/src/test/java/com/yahoo/vespa/orchestrator/controller/RetryingClusterControllerClientFactoryTest.java index 95fdd61563b..309d6a756f6 100644 --- a/orchestrator/src/test/java/com/yahoo/vespa/orchestrator/controller/RetryingClusterControllerClientFactoryTest.java +++ b/orchestrator/src/test/java/com/yahoo/vespa/orchestrator/controller/RetryingClusterControllerClientFactoryTest.java @@ -27,6 +27,7 @@ public class RetryingClusterControllerClientFactoryTest { private final Clock clock = new ManualClock(); @Test + @SuppressWarnings("removal") // VespaJerseyJaxRsClientFactory public void verifyJerseyCallForSetNodeState() throws IOException { VespaJerseyJaxRsClientFactory clientFactory = mock(VespaJerseyJaxRsClientFactory.class); ClusterControllerJaxRsApi api = mock(ClusterControllerJaxRsApi.class); diff --git a/searchcore/src/apps/vespa-feed-bm/runtest.sh 
b/searchcore/src/apps/vespa-feed-bm/runtest.sh new file mode 100755 index 00000000000..c859fe723b6 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/runtest.sh @@ -0,0 +1,32 @@ +#!/bin/bash + +numdocs=2000000 +stripe_bits=8 + +base_cmd="numactl --cpunodebind=0 --localalloc perf stat -ddd env LD_PRELOAD=$HOME/vespa/lib64/vespa/malloc/libvespamalloc.so ./vespa-feed-bm --documents $numdocs --put-passes 1 --update-passes 10 --remove-passes 0 --max-pending 8000 --indexing-sequencer throughput" + +spi_only="$base_cmd --client-threads 1" +base_for_rest="$base_cmd --client-threads 2 --response-threads 3" + +chain_base="$base_for_rest --use-storage-chain " +chain_stripe="$chain_base --bucket-db-stripe-bits $stripe_bits" +chain_stripe_async="$chain_stripe --use-async-message-handling" +service_layer="$base_for_rest --enable-service-layer --bucket-db-stripe-bits $stripe_bits --use-async-message-handling" +service_layer_rpc="$service_layer --rpc-network-threads 5 --rpc-targets-per-node 8" +service_layer_mbus="$service_layer --use-message-bus" +distributor="$service_layer_rpc --enable-distributor" + +echo "Running test: spi_only" +$spi_only +#echo "Running test: chain_base" +#$chain_base +echo "Running test: chain_stripe" +$chain_stripe +echo "Running test: chain_stripe_async" +$chain_stripe_async +echo "Running test: service_layer_rpc" +$service_layer_rpc +#echo "Running test: service_layer_mbus" +#$service_layer_mbus +echo "Running test: distributor" +$distributor diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp index 7082b9bfef2..a55638651d6 100644 --- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp +++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp @@ -280,7 +280,6 @@ class BMParams { bool _use_document_api; bool _use_message_bus; bool _use_storage_chain; - bool _use_legacy_bucket_db; bool _use_async_message_handling_on_schedule; uint32_t _bucket_db_stripe_bits; uint32_t 
get_start(uint32_t thread_id) const { @@ -305,7 +304,6 @@ public: _use_document_api(false), _use_message_bus(false), _use_storage_chain(false), - _use_legacy_bucket_db(false), _use_async_message_handling_on_schedule(false), _bucket_db_stripe_bits(0) { @@ -329,7 +327,6 @@ public: bool get_use_document_api() const { return _use_document_api; } bool get_use_message_bus() const { return _use_message_bus; } bool get_use_storage_chain() const { return _use_storage_chain; } - bool get_use_legacy_bucket_db() const { return _use_legacy_bucket_db; } bool get_use_async_message_handling_on_schedule() const { return _use_async_message_handling_on_schedule; } uint32_t get_bucket_db_stripe_bits() const { return _bucket_db_stripe_bits; } void set_documents(uint32_t documents_in) { _documents = documents_in; } @@ -349,7 +346,6 @@ public: void set_use_document_api(bool value) { _use_document_api = value; } void set_use_message_bus(bool value) { _use_message_bus = value; } void set_use_storage_chain(bool value) { _use_storage_chain = value; } - void set_use_legacy_bucket_db(bool value) { _use_legacy_bucket_db = value; } void set_use_async_message_handling_on_schedule(bool value) { _use_async_message_handling_on_schedule = value; } void set_bucket_db_stripe_bits(uint32_t value) { _bucket_db_stripe_bits = value; } bool check() const; @@ -490,7 +486,6 @@ struct MyStorageConfig dc.readyCopies = 1; } stor_server.isDistributor = distributor; - stor_server.useContentNodeBtreeBucketDb = !params.get_use_legacy_bucket_db(); stor_server.contentNodeBucketDbStripeBits = params.get_bucket_db_stripe_bits(); if (distributor) { stor_server.rootFolder = "distributor"; @@ -1388,7 +1383,6 @@ App::usage() "[--enable-service-layer]\n" "[--skip-get-spi-bucket-info]\n" "[--use-document-api]\n" - "[--use-legacy-bucket-db]\n" "[--use-async-message-handling]\n" "[--use-message-bus\n" "[--use-storage-chain]" << std::endl; @@ -1418,7 +1412,6 @@ App::get_options() { "update-passes", 1, nullptr, 0 }, { 
"use-async-message-handling", 0, nullptr, 0 }, { "use-document-api", 0, nullptr, 0 }, - { "use-legacy-bucket-db", 0, nullptr, 0 }, { "use-message-bus", 0, nullptr, 0 }, { "use-storage-chain", 0, nullptr, 0 } }; @@ -1440,7 +1433,6 @@ App::get_options() LONGOPT_UPDATE_PASSES, LONGOPT_USE_ASYNC_MESSAGE_HANDLING, LONGOPT_USE_DOCUMENT_API, - LONGOPT_USE_LEGACY_BUCKET_DB, LONGOPT_USE_MESSAGE_BUS, LONGOPT_USE_STORAGE_CHAIN }; @@ -1501,9 +1493,6 @@ App::get_options() case LONGOPT_USE_DOCUMENT_API: _bm_params.set_use_document_api(true); break; - case LONGOPT_USE_LEGACY_BUCKET_DB: - _bm_params.set_use_legacy_bucket_db(true); - break; case LONGOPT_USE_MESSAGE_BUS: _bm_params.set_use_message_bus(true); break; diff --git a/searchcore/src/tests/proton/bucketdb/bucketdb/bucketdb_test.cpp b/searchcore/src/tests/proton/bucketdb/bucketdb/bucketdb_test.cpp index 998b1626aeb..aff5573e4e0 100644 --- a/searchcore/src/tests/proton/bucketdb/bucketdb/bucketdb_test.cpp +++ b/searchcore/src/tests/proton/bucketdb/bucketdb/bucketdb_test.cpp @@ -136,6 +136,10 @@ TEST_F("require that bucket checksum is a combination of sub db types", Fixture) EXPECT_EQUAL(zero, f.remove(TIME_3, SDT::REMOVED).getChecksum()); } +TEST("require that BucketState follows checksum type") { + EXPECT_EQUAL(48u, sizeof(BucketState)); +} + TEST_F("require that bucket is ready when not having docs in notready sub db", Fixture) { assertReady(true, f.get()); @@ -222,47 +226,47 @@ verifyChecksumCompliance(ChecksumAggregator::ChecksumType type) { GlobalId gid2("bbbbbbbbbbbb"); Timestamp t1(0); Timestamp t2(1); - auto ckaggr = ChecksumAggregator::create(type, BucketChecksum(0)); + BucketState::setChecksumType(type); + BucketState bs; - EXPECT_EQUAL(0u, ckaggr->getChecksum()); - ckaggr->addDoc(gid1, t1); - BucketChecksum afterAdd = ckaggr->getChecksum(); + EXPECT_EQUAL(0u, bs.getChecksum()); + bs.add(gid1, t1, 1, SubDbType::READY); + BucketChecksum afterAdd = bs.getChecksum(); EXPECT_NOT_EQUAL(0u, afterAdd); // add Changes 
checksum - ckaggr->removeDoc(gid1, t1); - EXPECT_EQUAL(0u, ckaggr->getChecksum()); // add/remove are symmetrical - ckaggr->addDoc(gid1, t2); - EXPECT_NOT_EQUAL(afterAdd, ckaggr->getChecksum()); // timestamp changes checksum - ckaggr->removeDoc(gid1, t2); - EXPECT_EQUAL(0u, ckaggr->getChecksum()); // add/remove are symmetrical - ckaggr->addDoc(gid2, t1); - EXPECT_NOT_EQUAL(afterAdd, ckaggr->getChecksum()); // gid changes checksum - ckaggr->removeDoc(gid2, t1); - EXPECT_EQUAL(0u, ckaggr->getChecksum()); // add/remove are symmetrical + bs.remove(gid1, t1, 1, SubDbType::READY); + EXPECT_EQUAL(0u, bs.getChecksum()); // add/remove are symmetrical + bs.add(gid1, t2, 1, SubDbType::READY); + EXPECT_NOT_EQUAL(afterAdd, bs.getChecksum()); // timestamp changes checksum + bs.remove(gid1, t2, 1, SubDbType::READY); + EXPECT_EQUAL(0u, bs.getChecksum()); // add/remove are symmetrical + bs.add(gid2, t1, 1, SubDbType::READY); + EXPECT_NOT_EQUAL(afterAdd, bs.getChecksum()); // gid changes checksum + bs.remove(gid2, t1, 1, SubDbType::READY); + EXPECT_EQUAL(0u, bs.getChecksum()); // add/remove are symmetrical { // Verify order does not matter, only current content. 
A,B == B,A - ckaggr->addDoc(gid1, t1); - BucketChecksum after1AddOfGid1 = ckaggr->getChecksum(); - ckaggr->addDoc(gid2, t2); - BucketChecksum after2Add1 = ckaggr->getChecksum(); - ckaggr->removeDoc(gid2, t2); - EXPECT_EQUAL(after1AddOfGid1, ckaggr->getChecksum()); - ckaggr->removeDoc(gid1, t1); - EXPECT_EQUAL(0u, ckaggr->getChecksum()); + bs.add(gid1, t1, 1, SubDbType::READY); + BucketChecksum after1AddOfGid1 = bs.getChecksum(); + bs.add(gid2, t2, 1, SubDbType::READY); + BucketChecksum after2Add1 = bs.getChecksum(); + bs.remove(gid2, t2, 1, SubDbType::READY); + EXPECT_EQUAL(after1AddOfGid1, bs.getChecksum()); + bs.remove(gid1, t1, 1, SubDbType::READY); + EXPECT_EQUAL(0u, bs.getChecksum()); - ckaggr->addDoc(gid2, t2); - EXPECT_NOT_EQUAL(after1AddOfGid1, ckaggr->getChecksum()); - ckaggr->addDoc(gid1, t1); - EXPECT_EQUAL(after2Add1, ckaggr->getChecksum()); - ckaggr->removeDoc(gid2, t2); - EXPECT_EQUAL(after1AddOfGid1, ckaggr->getChecksum()); - ckaggr->removeDoc(gid1, t1); - EXPECT_EQUAL(0u, ckaggr->getChecksum()); // add/remove are symmetrical + bs.add(gid2, t2, 1, SubDbType::READY); + EXPECT_NOT_EQUAL(after1AddOfGid1, bs.getChecksum()); + bs.add(gid1, t1, 1, SubDbType::READY); + EXPECT_EQUAL(after2Add1, bs.getChecksum()); + bs.remove(gid2, t2, 1, SubDbType::READY); + EXPECT_EQUAL(after1AddOfGid1, bs.getChecksum()); + bs.remove(gid1, t1, 1, SubDbType::READY); + EXPECT_EQUAL(0u, bs.getChecksum()); // add/remove are symmetrical } - ckaggr->addDoc(gid1, t1); // Add something so we can verify it does not change between releases. - - return ckaggr->getChecksum(); + bs.add(gid1, t1, 1, SubDbType::READY); // Add something so we can verify it does not change between releases. 
+ return bs.getChecksum(); } TEST("test that legacy checksum complies") { diff --git a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp index 54fe88050b5..b875ab8e058 100644 --- a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp @@ -355,7 +355,7 @@ struct MyAttributeWriter : public IAttributeWriter _updateLid = lid; for (const auto & fieldUpdate : upd.getUpdates()) { search::AttributeVector * attr = getWritableAttribute(fieldUpdate.getField().getName()); - onUpdate.onUpdateField(fieldUpdate.getField(), attr); + onUpdate.onUpdateField(fieldUpdate.getField().getName(), attr); } } void update(SerialNum serialNum, const document::Document &doc, DocumentIdT lid, OnWriteDoneType) override { diff --git a/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp b/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp index 60d96e37e15..db0f26a9d55 100644 --- a/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp +++ b/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp @@ -759,7 +759,6 @@ TEST(DocumentMetaStoreTest, can_sort_gids) } } -template <typename ChecksumType> void requireThatBasicBucketInfoWorks() { @@ -773,8 +772,7 @@ requireThatBasicBucketInfoWorks() GlobalId gid = createGid(lid); Timestamp timestamp(UINT64_C(123456789) * lid); Timestamp oldTimestamp; - BucketId bucketId(minNumBits, - gid.convertToBucketId().getRawId()); + BucketId bucketId(minNumBits, gid.convertToBucketId().getRawId()); uint32_t addLid = addGid(dms, gid, bucketId, timestamp); EXPECT_EQ(lid, addLid); m[std::make_pair(bucketId, gid)] = timestamp; @@ -783,37 +781,35 @@ requireThatBasicBucketInfoWorks() GlobalId gid = createGid(lid); Timestamp timestamp(UINT64_C(14735) * lid); Timestamp oldTimestamp; - BucketId bucketId(minNumBits, - 
gid.convertToBucketId().getRawId()); + BucketId bucketId(minNumBits, gid.convertToBucketId().getRawId()); uint32_t addLid = addGid(dms, gid, bucketId, timestamp); EXPECT_EQ(lid, addLid); m[std::make_pair(bucketId, gid)] = timestamp; } for (uint32_t lid = 3; lid <= numLids; lid += 5) { GlobalId gid = createGid(lid); - BucketId bucketId(minNumBits, - gid.convertToBucketId().getRawId()); + BucketId bucketId(minNumBits, gid.convertToBucketId().getRawId()); EXPECT_TRUE(dms.remove(lid, 0u)); dms.removeComplete(lid); m.erase(std::make_pair(bucketId, gid)); } assert(!m.empty()); - ChecksumType cksum(BucketChecksum(0)); + BucketState cksum; BucketId prevBucket = m.begin()->first.first; uint32_t cnt = 0u; uint32_t maxcnt = 0u; BucketDBOwner::Guard bucketDB = dms.getBucketDB().takeGuard(); for (Map::const_iterator i = m.begin(), ie = m.end(); i != ie; ++i) { if (i->first.first == prevBucket) { - cksum.addDoc(i->first.second, i->second); + cksum.add(i->first.second, i->second, 1, SubDbType::READY); ++cnt; } else { BucketInfo bi = bucketDB->get(prevBucket); EXPECT_EQ(cnt, bi.getDocumentCount()); EXPECT_EQ(cksum.getChecksum(), bi.getChecksum()); prevBucket = i->first.first; - cksum = ChecksumType(BucketChecksum(0)); - cksum.addDoc(i->first.second, i->second); + cksum = BucketState(); + cksum.add(i->first.second, i->second, 1, SubDbType::READY); maxcnt = std::max(maxcnt, cnt); cnt = 1u; } @@ -828,9 +824,9 @@ requireThatBasicBucketInfoWorks() TEST(DocumentMetaStoreTest, basic_bucket_info_works) { BucketState::setChecksumType(BucketState::ChecksumType::LEGACY); - requireThatBasicBucketInfoWorks<LegacyChecksumAggregator>(); + requireThatBasicBucketInfoWorks(); BucketState::setChecksumType(BucketState::ChecksumType::XXHASH64); - requireThatBasicBucketInfoWorks<XXH64ChecksumAggregator>(); + requireThatBasicBucketInfoWorks(); } TEST(DocumentMetaStoreTest, can_retrieve_list_of_lids_from_bucket_id) diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp 
b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp index ee7f9d0c851..42971fe3d4c 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp @@ -680,7 +680,7 @@ AttributeWriter::update(SerialNum serialNum, const DocumentUpdate &upd, Document LOG(debug, "Retrieving guard for attribute vector '%s'.", fupd.getField().getName().data()); auto found = _attrMap.find(fupd.getField().getName()); AttributeVector * attrp = (found != _attrMap.end()) ? found->second.first : nullptr; - onUpdate.onUpdateField(fupd.getField(), attrp); + onUpdate.onUpdateField(fupd.getField().getName(), attrp); if (__builtin_expect(attrp == nullptr, false)) { LOG(spam, "Failed to find attribute vector %s", fupd.getField().getName().data()); continue; diff --git a/searchcore/src/vespa/searchcore/proton/attribute/ifieldupdatecallback.h b/searchcore/src/vespa/searchcore/proton/attribute/ifieldupdatecallback.h index c7453d618ff..ffb8555cd2c 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/ifieldupdatecallback.h +++ b/searchcore/src/vespa/searchcore/proton/attribute/ifieldupdatecallback.h @@ -5,17 +5,16 @@ #include <vespa/vespalib/stllike/string.h> namespace search { class AttributeVector; } -namespace document { class Field; } namespace proton { struct IFieldUpdateCallback { - virtual ~IFieldUpdateCallback() = default; - virtual void onUpdateField(const document::Field & field, const search::AttributeVector * attr) = 0; + virtual ~IFieldUpdateCallback() { } + virtual void onUpdateField(vespalib::stringref fieldName, const search::AttributeVector * attr) = 0; }; struct DummyFieldUpdateCallback : IFieldUpdateCallback { - void onUpdateField(const document::Field & , const search::AttributeVector *) override {} + void onUpdateField(vespalib::stringref, const search::AttributeVector *) override {} }; } diff --git 
a/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt index 375515938b3..6619f9e419d 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt @@ -8,7 +8,6 @@ vespa_add_library(searchcore_bucketdb STATIC bucketdbhandler.cpp bucketsessionbase.cpp bucketstate.cpp - checksumaggregator.cpp checksumaggregators.cpp joinbucketssession.cpp splitbucketsession.cpp diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.cpp index cea30680faf..0eb9c27d9cf 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.cpp +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.cpp @@ -18,11 +18,6 @@ toIdx(SubDbType subDbType) { BucketState::ChecksumType BucketState::_checksumType = BucketState::ChecksumType::LEGACY; -std::unique_ptr<ChecksumAggregator> -BucketState::createChecksum(BucketChecksum seed) { - return ChecksumAggregator::create(_checksumType, seed); -} - void BucketState::setChecksumType(ChecksumType type) { _checksumType = type; @@ -34,11 +29,12 @@ BucketState::BucketState(BucketState && rhs) noexcept = default; BucketState & BucketState::operator=(BucketState && rhs) noexcept = default; BucketState::BucketState() - : _docCount(), + : _ch(), _docSizes(), - _checksum(createChecksum(BucketChecksum(0))), + _docCount(), _active(false) { + memset(&_ch, 0, sizeof(_ch)); for (uint32_t i = 0; i < COUNTS; ++i) { _docCount[i] = 0; _docSizes[i] = 0; @@ -46,8 +42,25 @@ BucketState::BucketState() } BucketState::BucketChecksum +BucketState::getChecksum() const { + switch (_checksumType) { + case ChecksumAggregator::ChecksumType::LEGACY: + return LegacyChecksumAggregator::get(_ch._legacy); + case ChecksumAggregator::ChecksumType::XXHASH64: + return XXH64ChecksumAggregator::get(_ch._xxh64); + } + abort(); +} + 
+BucketState::BucketChecksum BucketState::addChecksum(BucketChecksum a, BucketChecksum b) { - return createChecksum(a)->addChecksum(*createChecksum(b)).getChecksum(); + switch (_checksumType) { + case ChecksumAggregator::ChecksumType::LEGACY: + return LegacyChecksumAggregator::get(LegacyChecksumAggregator::add(b, a)); + case ChecksumAggregator::ChecksumType::XXHASH64: + return XXH64ChecksumAggregator::get(XXH64ChecksumAggregator::update(b, a)); + } + abort(); } void @@ -55,7 +68,14 @@ BucketState::add(const GlobalId &gid, const Timestamp ×tamp, uint32_t docSi { assert(subDbType < SubDbType::COUNT); if (subDbType != SubDbType::REMOVED) { - _checksum->addDoc(gid, timestamp); + switch (_checksumType) { + case ChecksumAggregator::ChecksumType::LEGACY: + _ch._legacy = LegacyChecksumAggregator::addDoc(gid, timestamp, _ch._legacy); + break; + case ChecksumAggregator::ChecksumType::XXHASH64: + _ch._xxh64 = XXH64ChecksumAggregator::update(gid, timestamp, _ch._xxh64); + break; + } } uint32_t subDbTypeIdx = toIdx(subDbType); ++_docCount[subDbTypeIdx]; @@ -70,7 +90,14 @@ BucketState::remove(const GlobalId &gid, const Timestamp ×tamp, uint32_t do assert(_docCount[subDbTypeIdx] > 0); assert(_docSizes[subDbTypeIdx] >= docSize); if (subDbType != SubDbType::REMOVED) { - _checksum->removeDoc(gid, timestamp); + switch (_checksumType) { + case ChecksumAggregator::ChecksumType::LEGACY: + _ch._legacy = LegacyChecksumAggregator::removeDoc(gid, timestamp, _ch._legacy); + break; + case ChecksumAggregator::ChecksumType::XXHASH64: + _ch._xxh64 = XXH64ChecksumAggregator::update(gid, timestamp, _ch._xxh64); + break; + } } --_docCount[subDbTypeIdx]; _docSizes[subDbTypeIdx] -= docSize; @@ -87,8 +114,16 @@ BucketState::modify(const GlobalId &gid, assert(_docCount[subDbTypeIdx] > 0); assert(_docSizes[subDbTypeIdx] >= oldDocSize); if (subDbType != SubDbType::REMOVED) { - _checksum->removeDoc(gid, oldTimestamp); - _checksum->addDoc(gid, newTimestamp); + switch (_checksumType) { + case 
ChecksumAggregator::ChecksumType::LEGACY: + _ch._legacy = LegacyChecksumAggregator::removeDoc(gid, oldTimestamp, _ch._legacy); + _ch._legacy = LegacyChecksumAggregator::addDoc(gid, newTimestamp, _ch._legacy); + break; + case ChecksumAggregator::ChecksumType::XXHASH64: + _ch._xxh64 = XXH64ChecksumAggregator::update(gid, oldTimestamp, _ch._xxh64); + _ch._xxh64 = XXH64ChecksumAggregator::update(gid, newTimestamp, _ch._xxh64); + break; + } } _docSizes[subDbTypeIdx] = _docSizes[subDbTypeIdx] + newDocSize - oldDocSize; } @@ -99,7 +134,7 @@ BucketState::empty() const if (getReadyCount() != 0 || getRemovedCount() != 0 || getNotReadyCount() != 0) return false; - assert(_checksum->empty()); + assert((_checksumType == ChecksumAggregator::ChecksumType::LEGACY) ? (_ch._legacy == 0) : (_ch._xxh64 == 0)); for (uint32_t i = 0; i < COUNTS; ++i) { assert(_docSizes[i] == 0); } @@ -115,7 +150,15 @@ BucketState::operator+=(const BucketState &rhs) for (uint32_t i = 0; i < COUNTS; ++i) { _docSizes[i] += rhs._docSizes[i]; } - _checksum->addChecksum(*rhs._checksum); + + switch (_checksumType) { + case ChecksumAggregator::ChecksumType::LEGACY: + _ch._legacy = LegacyChecksumAggregator::add(rhs._ch._legacy, _ch._legacy); + break; + case ChecksumAggregator::ChecksumType::XXHASH64: + _ch._xxh64 = XXH64ChecksumAggregator::update(rhs._ch._xxh64, _ch._xxh64); + break; + } return *this; } @@ -134,7 +177,14 @@ BucketState::operator-=(const BucketState &rhs) for (uint32_t i = 0; i < COUNTS; ++i) { _docSizes[i] -= rhs._docSizes[i]; } - _checksum->removeChecksum(*rhs._checksum); + switch (_checksumType) { + case ChecksumAggregator::ChecksumType::LEGACY: + _ch._legacy = LegacyChecksumAggregator::remove(rhs._ch._legacy, _ch._legacy); + break; + case ChecksumAggregator::ChecksumType::XXHASH64: + _ch._xxh64 = XXH64ChecksumAggregator::update(rhs._ch._xxh64, _ch._xxh64); + break; + } return *this; } diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.h 
b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.h index 8c390b288ae..7f16500fad0 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.h +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketstate.h @@ -26,9 +26,9 @@ private: static constexpr uint32_t REMOVED = static_cast<uint32_t>(SubDbType::REMOVED); static constexpr uint32_t NOTREADY = static_cast<uint32_t>(SubDbType::NOTREADY); static constexpr uint32_t COUNTS = static_cast<uint32_t>(SubDbType::COUNT); - uint32_t _docCount[COUNTS]; + union { uint32_t _legacy; uint64_t _xxh64;} _ch; size_t _docSizes[COUNTS]; - vespalib::CloneablePtr<ChecksumAggregator> _checksum; + uint32_t _docCount[COUNTS]; bool _active; static ChecksumType _checksumType; @@ -67,7 +67,7 @@ public: size_t getNotReadyDocSizes() const { return _docSizes[NOTREADY]; } uint32_t getDocumentCount() const { return getReadyCount() + getNotReadyCount(); } uint32_t getEntryCount() const { return getDocumentCount() + getRemovedCount(); } - BucketChecksum getChecksum() const { return _checksum->getChecksum(); } + BucketChecksum getChecksum() const; bool empty() const; BucketState &operator+=(const BucketState &rhs); BucketState &operator-=(const BucketState &rhs); diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.cpp deleted file mode 100644 index 795d88ab106..00000000000 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.cpp +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
- -#include "checksumaggregators.h" - -namespace proton::bucketdb { - -std::unique_ptr<ChecksumAggregator> -ChecksumAggregator::create(ChecksumType type, BucketChecksum seed) { - switch (type) { - case ChecksumType::LEGACY: - return std::make_unique<LegacyChecksumAggregator>(seed); - case ChecksumType::XXHASH64: - return std::make_unique<XXH64ChecksumAggregator>(seed); - } - abort(); -} - -} diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.h b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.h index 421cca5c6b5..b206f11f02f 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.h +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregator.h @@ -16,15 +16,6 @@ public: using GlobalId = document::GlobalId; using Timestamp = storage::spi::Timestamp; using BucketChecksum = storage::spi::BucketChecksum; - virtual ~ChecksumAggregator() = default; - virtual ChecksumAggregator & addDoc(const GlobalId &gid, const Timestamp ×tamp) = 0; - virtual ChecksumAggregator & removeDoc(const GlobalId &gid, const Timestamp ×tamp) = 0; - virtual ChecksumAggregator & addChecksum(const ChecksumAggregator & rhs) = 0; - virtual ChecksumAggregator & removeChecksum(const ChecksumAggregator & rhs) = 0; - virtual BucketChecksum getChecksum() const = 0; - virtual bool empty() const = 0;; - virtual ChecksumAggregator * clone() const = 0; - static std::unique_ptr<ChecksumAggregator> create(ChecksumType type, BucketChecksum seed); }; } diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.cpp index a118f416b6b..9246f94ca83 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.cpp +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.cpp @@ -33,79 +33,30 @@ calcChecksum(const GlobalId &gid, const Timestamp ×tamp) return gidChecksum(gid) + timestampChecksum(timestamp); } 
+uint64_t +compute(const GlobalId &gid, const Timestamp ×tamp) { + char buffer[20]; + memcpy(&buffer[0], gid.get(), GlobalId::LENGTH); + uint64_t tmp = timestamp.getValue(); + memcpy(&buffer[GlobalId::LENGTH], &tmp, sizeof(tmp)); + return XXH64(buffer, sizeof(buffer), 0); } -LegacyChecksumAggregator * -LegacyChecksumAggregator::clone() const { - return new LegacyChecksumAggregator(*this); -} -LegacyChecksumAggregator & -LegacyChecksumAggregator::addDoc(const GlobalId &gid, const Timestamp ×tamp) { - _checksum += calcChecksum(gid, timestamp); - return *this; } -LegacyChecksumAggregator & -LegacyChecksumAggregator::removeDoc(const GlobalId &gid, const Timestamp ×tamp) { - _checksum -= calcChecksum(gid, timestamp); - return *this; -} -LegacyChecksumAggregator & -LegacyChecksumAggregator::addChecksum(const ChecksumAggregator & rhs) { - _checksum += dynamic_cast<const LegacyChecksumAggregator &>(rhs)._checksum; - return *this; -} -LegacyChecksumAggregator & -LegacyChecksumAggregator::removeChecksum(const ChecksumAggregator & rhs) { - _checksum -= dynamic_cast<const LegacyChecksumAggregator &>(rhs)._checksum; - return *this; -} -BucketChecksum -LegacyChecksumAggregator::getChecksum() const { - return BucketChecksum(_checksum); -} -bool -LegacyChecksumAggregator::empty() const { return _checksum == 0; } - - -XXH64ChecksumAggregator * -XXH64ChecksumAggregator::clone() const { - return new XXH64ChecksumAggregator(*this); -} -XXH64ChecksumAggregator & -XXH64ChecksumAggregator::addDoc(const GlobalId &gid, const Timestamp ×tamp) { - _checksum ^= compute(gid, timestamp); - return *this; -} -XXH64ChecksumAggregator & -XXH64ChecksumAggregator::removeDoc(const GlobalId &gid, const Timestamp ×tamp) { - _checksum ^= compute(gid, timestamp); - return *this; -} -XXH64ChecksumAggregator & -XXH64ChecksumAggregator::addChecksum(const ChecksumAggregator & rhs) { - _checksum ^= dynamic_cast<const XXH64ChecksumAggregator &>(rhs)._checksum; - return *this; -} -XXH64ChecksumAggregator & 
-XXH64ChecksumAggregator::removeChecksum(const ChecksumAggregator & rhs) { - _checksum ^= dynamic_cast<const XXH64ChecksumAggregator &>(rhs)._checksum; - return *this; +uint32_t +LegacyChecksumAggregator::addDoc(const GlobalId &gid, const Timestamp ×tamp, uint32_t checkSum) { + return add(calcChecksum(gid, timestamp), checkSum); } -BucketChecksum -XXH64ChecksumAggregator::getChecksum() const { - return BucketChecksum((_checksum >> 32) ^ (_checksum & 0xffffffffL)); + +uint32_t +LegacyChecksumAggregator::removeDoc(const GlobalId &gid, const Timestamp ×tamp, uint32_t checkSum) { + return remove(calcChecksum(gid, timestamp), checkSum); } -bool -XXH64ChecksumAggregator::empty() const { return _checksum == 0; } uint64_t -XXH64ChecksumAggregator::compute(const GlobalId &gid, const Timestamp ×tamp) { - char buffer[20]; - memcpy(&buffer[0], gid.get(), GlobalId::LENGTH); - uint64_t tmp = timestamp.getValue(); - memcpy(&buffer[GlobalId::LENGTH], &tmp, sizeof(tmp)); - return XXH64(buffer, sizeof(buffer), 0); +XXH64ChecksumAggregator::update(const GlobalId &gid, const Timestamp ×tamp, uint64_t checkSum) { + return update(compute(gid, timestamp), checkSum); } } diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.h b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.h index 49762ad107f..1c0c2bc8f7a 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.h +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/checksumaggregators.h @@ -9,16 +9,11 @@ namespace proton::bucketdb { **/ class LegacyChecksumAggregator : public ChecksumAggregator { public: - explicit LegacyChecksumAggregator(BucketChecksum seed) : _checksum(seed) { } - LegacyChecksumAggregator * clone() const override; - LegacyChecksumAggregator & addDoc(const GlobalId &gid, const Timestamp ×tamp) override; - LegacyChecksumAggregator & removeDoc(const GlobalId &gid, const Timestamp ×tamp) override; - LegacyChecksumAggregator & addChecksum(const 
ChecksumAggregator & rhs) override; - LegacyChecksumAggregator & removeChecksum(const ChecksumAggregator & rhs) override; - BucketChecksum getChecksum() const override; - bool empty() const override; -private: - uint32_t _checksum; + static uint32_t addDoc(const GlobalId &gid, const Timestamp ×tamp, uint32_t checkSum); + static uint32_t removeDoc(const GlobalId &gid, const Timestamp ×tamp, uint32_t checkSum); + static uint32_t add(uint32_t checksum, uint32_t aggr) { return aggr + checksum; } + static uint32_t remove(uint32_t checksum, uint32_t aggr) { return aggr - checksum; } + static BucketChecksum get(uint32_t checkSum) { return BucketChecksum(checkSum); } }; /** @@ -26,17 +21,11 @@ private: **/ class XXH64ChecksumAggregator : public ChecksumAggregator { public: - explicit XXH64ChecksumAggregator(BucketChecksum seed) : _checksum(seed) { } - XXH64ChecksumAggregator * clone() const override; - XXH64ChecksumAggregator & addDoc(const GlobalId &gid, const Timestamp ×tamp) override; - XXH64ChecksumAggregator & removeDoc(const GlobalId &gid, const Timestamp ×tamp) override; - XXH64ChecksumAggregator & addChecksum(const ChecksumAggregator & rhs) override; - XXH64ChecksumAggregator & removeChecksum(const ChecksumAggregator & rhs) override; - BucketChecksum getChecksum() const override; - bool empty() const override; -private: - static uint64_t compute(const GlobalId &gid, const Timestamp ×tamp); - uint64_t _checksum; + static uint64_t update(const GlobalId &gid, const Timestamp ×tamp, uint64_t checkSum); + static uint64_t update(uint64_t a, uint64_t b) { return a ^ b; } + static BucketChecksum get(uint64_t checkSum) { + return BucketChecksum((checkSum >> 32) ^ (checkSum & 0xffffffffL)); + } }; } diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp index fdacf59fa02..7cfad4f1ac1 100644 --- a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp +++ 
b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp @@ -34,7 +34,7 @@ SearchableFeedView::SearchableFeedView(StoreOnlyFeedView::Context storeOnlyCtx, const FastAccessFeedView::Context &fastUpdateCtx, Context ctx) : Parent(std::move(storeOnlyCtx), params, fastUpdateCtx), _indexWriter(ctx._indexWriter), - _hasIndexedFields(getSchema()->getNumIndexFields() > 0) + _hasIndexedFields(_schema->getNumIndexFields() > 0) { } SearchableFeedView::~SearchableFeedView() = default; diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index 89e765e2e2b..bf357188766 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -8,14 +8,14 @@ #include "remove_batch_done_context.h" #include "removedonecontext.h" #include "updatedonecontext.h" +#include <vespa/document/datatype/documenttype.h> +#include <vespa/document/fieldvalue/document.h> +#include <vespa/document/repo/documenttyperepo.h> #include <vespa/searchcore/proton/attribute/ifieldupdatecallback.h> #include <vespa/searchcore/proton/common/feedtoken.h> #include <vespa/searchcore/proton/feedoperation/operations.h> #include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h> #include <vespa/searchlib/common/gatecallback.h> -#include <vespa/document/datatype/documenttype.h> -#include <vespa/document/fieldvalue/document.h> -#include <vespa/document/repo/documenttyperepo.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/vespalib/util/exceptions.h> @@ -148,8 +148,8 @@ SummaryPutDoneContext::SummaryPutDoneContext(FeedToken token, IPendingLidTracker SummaryPutDoneContext::~SummaryPutDoneContext() = default; -std::vector<document::GlobalId> -getGidsToRemove(const IDocumentMetaStore &metaStore, const LidVectorContext::LidVector &lidsToRemove) +std::vector<document::GlobalId> 
getGidsToRemove(const IDocumentMetaStore &metaStore, + const LidVectorContext::LidVector &lidsToRemove) { std::vector<document::GlobalId> gids; gids.reserve(lidsToRemove.size()); @@ -162,9 +162,8 @@ getGidsToRemove(const IDocumentMetaStore &metaStore, const LidVectorContext::Lid return gids; } -void -putMetaData(documentmetastore::IStore &meta_store, const DocumentId & doc_id, - const DocumentOperation &op, bool is_removed_doc) +void putMetaData(documentmetastore::IStore &meta_store, const DocumentId & doc_id, + const DocumentOperation &op, bool is_removed_doc) { documentmetastore::IStore::Result putRes( meta_store.put(doc_id.getGlobalId(), @@ -178,9 +177,8 @@ putMetaData(documentmetastore::IStore &meta_store, const DocumentId & doc_id, assert(op.getLid() == putRes._lid); } -void -removeMetaData(documentmetastore::IStore &meta_store, const GlobalId & gid, const DocumentId &doc_id, - const DocumentOperation &op, bool is_removed_doc) +void removeMetaData(documentmetastore::IStore &meta_store, const GlobalId & gid, const DocumentId &doc_id, + const DocumentOperation &op, bool is_removed_doc) { assert(meta_store.validLid(op.getPrevLid())); assert(is_removed_doc == op.getPrevMarkedAsRemoved()); @@ -209,37 +207,6 @@ moveMetaData(documentmetastore::IStore &meta_store, const DocumentId & doc_id, c meta_store.move(op.getPrevLid(), op.getLid(), op.get_prepare_serial_num()); } -class UpdateScope final : public IFieldUpdateCallback -{ -private: - const vespalib::hash_set<int32_t> & _indexedFields; - bool _nonAttributeFields; -public: - bool _hasIndexedFields; - - UpdateScope(const vespalib::hash_set<int32_t> & indexedFields, const DocumentUpdate & upd); - bool hasIndexOrNonAttributeFields() const { - return _hasIndexedFields || _nonAttributeFields; - } - void onUpdateField(const document::Field & field, const search::AttributeVector * attr) override; -}; - -UpdateScope::UpdateScope(const vespalib::hash_set<int32_t> & indexedFields, const DocumentUpdate & upd) - : 
_indexedFields(indexedFields), - _nonAttributeFields(!upd.getFieldPathUpdates().empty()), - _hasIndexedFields(false) -{} - -void -UpdateScope::onUpdateField(const document::Field & field, const search::AttributeVector * attr) { - if (!_nonAttributeFields && (attr == nullptr || !attr->isUpdateableInMemoryOnly())) { - _nonAttributeFields = true; - } - if (!_hasIndexedFields && (_indexedFields.find(field.getId()) != _indexedFields.end())) { - _hasIndexedFields = true; - } -} - } // namespace StoreOnlyFeedView::StoreOnlyFeedView(Context ctx, const PersistentParams ¶ms) @@ -253,18 +220,12 @@ StoreOnlyFeedView::StoreOnlyFeedView(Context ctx, const PersistentParams ¶ms _pendingLidsForDocStore(), _pendingLidsForCommit(std::move(ctx._pendingLidsForCommit)), _schema(std::move(ctx._schema)), - _indexedFields(), _writeService(ctx._writeService), _params(params), _metaStore(_documentMetaStoreContext->get()), _gidToLidChangeHandler(ctx._gidToLidChangeHandler) { _docType = _repo->getDocumentType(_params._docTypeName.getName()); - if (_schema && _docType) { - for (const auto &indexField : _schema->getIndexFields()) { - _indexedFields.insert(_docType->getField(indexField.getName()).getId()); - } - } } StoreOnlyFeedView::~StoreOnlyFeedView() = default; @@ -378,7 +339,7 @@ StoreOnlyFeedView::updateAttributes(SerialNum, Lid, const DocumentUpdate & upd, OnOperationDoneType, IFieldUpdateCallback & onUpdate) { for (const auto & fieldUpdate : upd.getUpdates()) { - onUpdate.onUpdateField(fieldUpdate.getField(), nullptr); + onUpdate.onUpdateField(fieldUpdate.getField().getName(), nullptr); } } @@ -447,6 +408,22 @@ void StoreOnlyFeedView::heartBeatSummary(SerialNum serialNum) { })); } +StoreOnlyFeedView::UpdateScope::UpdateScope(const search::index::Schema & schema, const DocumentUpdate & upd) + : _schema(&schema), + _indexedFields(false), + _nonAttributeFields(!upd.getFieldPathUpdates().empty()) +{} + +void +StoreOnlyFeedView::UpdateScope::onUpdateField(vespalib::stringref fieldName, const 
search::AttributeVector * attr) { + if (!_nonAttributeFields && (attr == nullptr || !attr->isUpdateableInMemoryOnly())) { + _nonAttributeFields = true; + } + if (!_indexedFields && _schema->isIndexField(fieldName)) { + _indexedFields = true; + } +} + void StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp) { if ( ! updOp.getUpdate()) { @@ -478,7 +455,7 @@ StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp) auto uncommitted = get_pending_lid_token(updOp); auto onWriteDone = createUpdateDoneContext(std::move(token), std::move(uncommitted), updOp.getUpdate()); - UpdateScope updateScope(_indexedFields, upd); + UpdateScope updateScope(*_schema, upd); updateAttributes(serialNum, lid, upd, onWriteDone, updateScope); if (updateScope.hasIndexOrNonAttributeFields()) { @@ -486,7 +463,7 @@ StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp) FutureDoc futureDoc = promisedDoc.get_future().share(); onWriteDone->setDocument(futureDoc); _pendingLidsForDocStore.waitComplete(lid); - if (updateScope._hasIndexedFields) { + if (updateScope._indexedFields) { updateIndexedFields(serialNum, lid, futureDoc, onWriteDone); } PromisedStream promisedStream; diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h index c033ae0c43f..da7d5e53a88 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h @@ -9,6 +9,7 @@ #include "searchcontext.h" #include <vespa/searchcore/proton/common/pendinglidtracker.h> #include <vespa/searchcore/proton/common/doctypename.h> +#include <vespa/searchcore/proton/attribute/ifieldupdatecallback.h> #include <vespa/searchcore/proton/common/feeddebugger.h> #include <vespa/searchcore/proton/documentmetastore/documentmetastore.h> #include 
<vespa/searchcore/proton/documentmetastore/documentmetastorecontext.h> @@ -18,10 +19,9 @@ #include <vespa/searchcore/proton/reference/pending_notify_remove_done.h> #include <vespa/searchcorespi/index/ithreadingservice.h> #include <vespa/searchlib/query/base.h> -#include <vespa/searchcore/proton/feedoperation/operations.h> #include <vespa/vespalib/util/threadstackexecutorbase.h> -#include <vespa/vespalib/stllike/hash_set.h> #include <future> +#include <vespa/searchcore/proton/feedoperation/operations.h> namespace search { class IDestructorCallback; } @@ -60,6 +60,7 @@ public: using OnOperationDoneType = const std::shared_ptr<OperationDoneContext> &; using OnPutDoneType = const std::shared_ptr<PutDoneContext> &; using OnRemoveDoneType = const std::shared_ptr<RemoveDoneContext> &; + using FeedTokenUP = std::unique_ptr<FeedToken>; using FutureDoc = std::shared_future<std::unique_ptr<const Document>>; using PromisedDoc = std::promise<std::unique_ptr<const Document>>; using FutureStream = std::future<vespalib::nbostream>; @@ -120,6 +121,22 @@ public: {} }; +protected: + class UpdateScope : public IFieldUpdateCallback + { + private: + const search::index::Schema *_schema; + public: + bool _indexedFields; + bool _nonAttributeFields; + + UpdateScope(const search::index::Schema & schema, const DocumentUpdate & upd); + bool hasIndexOrNonAttributeFields() const { + return _indexedFields || _nonAttributeFields; + } + void onUpdateField(vespalib::stringref fieldName, const search::AttributeVector * attr) override; + }; + private: const ISummaryAdapter::SP _summaryAdapter; const IDocumentMetaStoreContext::SP _documentMetaStoreContext; @@ -128,9 +145,9 @@ private: LidReuseDelayer _lidReuseDelayer; PendingLidTracker _pendingLidsForDocStore; std::shared_ptr<PendingLidTrackerBase> _pendingLidsForCommit; - const search::index::Schema::SP _schema; - vespalib::hash_set<int32_t> _indexedFields; + protected: + const search::index::Schema::SP _schema; 
searchcorespi::index::IThreadingService &_writeService; PersistentParams _params; IDocumentMetaStore &_metaStore; diff --git a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/aws/AwsCredentials.java b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/aws/AwsCredentials.java index 30ff63fb108..b027e7272ea 100644 --- a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/aws/AwsCredentials.java +++ b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/aws/AwsCredentials.java @@ -35,11 +35,11 @@ public class AwsCredentials { } public AwsCredentials(URI ztsUrl, ServiceIdentityProvider identityProvider, AthenzDomain athenzDomain, AwsRole awsRole) { - this(new DefaultZtsClient.Builder(ztsUrl).withIdentityProvider(identityProvider).build(), athenzDomain, awsRole); + this(new DefaultZtsClient(ztsUrl, identityProvider), athenzDomain, awsRole); } public AwsCredentials(URI ztsUrl, SSLContext sslContext, AthenzDomain athenzDomain, AwsRole awsRole) { - this(new DefaultZtsClient.Builder(ztsUrl).withSslContext(sslContext).build(), athenzDomain, awsRole); + this(new DefaultZtsClient(ztsUrl, sslContext), athenzDomain, awsRole); } /** diff --git a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/common/ClientBase.java b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/common/ClientBase.java index 37ef513c786..c1ce45c35da 100644 --- a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/common/ClientBase.java +++ b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/common/ClientBase.java @@ -39,15 +39,12 @@ public abstract class ClientBase implements AutoCloseable { private final CloseableHttpClient client; private final ClientExceptionFactory exceptionFactory; - private final ErrorHandler errorHandler; protected ClientBase(String userAgent, Supplier<SSLContext> sslContextSupplier, ClientExceptionFactory exceptionFactory, - HostnameVerifier hostnameVerifier, - ErrorHandler errorHandler) { + HostnameVerifier hostnameVerifier) { this.exceptionFactory = 
exceptionFactory; - this.errorHandler = errorHandler; this.client = createHttpClient(userAgent, sslContextSupplier, hostnameVerifier); } @@ -55,17 +52,10 @@ public abstract class ClientBase implements AutoCloseable { try { return client.execute(request, responseHandler); } catch (IOException e) { - try { - reportError(request, e); - } catch (Exception _ignored) {} throw new UncheckedIOException(e); } } - private void reportError(HttpUriRequest request, Exception e) { - errorHandler.reportError(request, e); - } - protected StringEntity toJsonStringEntity(Object entity) { try { return new StringEntity(objectMapper.writeValueAsString(entity), ContentType.APPLICATION_JSON); @@ -124,11 +114,4 @@ public abstract class ClientBase implements AutoCloseable { protected interface ClientExceptionFactory { RuntimeException createException(int errorCode, String description); } - - public interface ErrorHandler { - static ErrorHandler empty() { - return (r,e)->{}; - } - void reportError(HttpUriRequest request, Exception error); - } } diff --git a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/zms/DefaultZmsClient.java b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/zms/DefaultZmsClient.java index 3742996c274..33cb6d7d5d4 100644 --- a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/zms/DefaultZmsClient.java +++ b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/zms/DefaultZmsClient.java @@ -25,6 +25,7 @@ import javax.net.ssl.SSLContext; import java.net.URI; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.OptionalInt; import java.util.Set; import java.util.function.Supplier; @@ -39,16 +40,16 @@ public class DefaultZmsClient extends ClientBase implements ZmsClient { private final URI zmsUrl; private final AthenzIdentity identity; - public DefaultZmsClient(URI zmsUrl, AthenzIdentity identity, SSLContext sslContext, ErrorHandler errorHandler) { - this(zmsUrl, identity, () -> sslContext, 
errorHandler); + public DefaultZmsClient(URI zmsUrl, AthenzIdentity identity, SSLContext sslContext) { + this(zmsUrl, identity, () -> sslContext); } - public DefaultZmsClient(URI zmsUrl, ServiceIdentityProvider identityProvider, ErrorHandler errorHandler) { - this(zmsUrl, identityProvider.identity(), identityProvider::getIdentitySslContext, errorHandler); + public DefaultZmsClient(URI zmsUrl, ServiceIdentityProvider identityProvider) { + this(zmsUrl, identityProvider.identity(), identityProvider::getIdentitySslContext); } - private DefaultZmsClient(URI zmsUrl, AthenzIdentity identity, Supplier<SSLContext> sslContextSupplier, ErrorHandler errorHandler) { - super("vespa-zms-client", sslContextSupplier, ZmsClientException::new, null, errorHandler); + private DefaultZmsClient(URI zmsUrl, AthenzIdentity identity, Supplier<SSLContext> sslContextSupplier) { + super("vespa-zms-client", sslContextSupplier, ZmsClientException::new, null); this.zmsUrl = addTrailingSlash(zmsUrl); this.identity = identity; } diff --git a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/zts/DefaultZtsClient.java b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/zts/DefaultZtsClient.java index 28119dc1f5a..c05213c8008 100644 --- a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/zts/DefaultZtsClient.java +++ b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/client/zts/DefaultZtsClient.java @@ -37,7 +37,6 @@ import java.security.KeyPair; import java.security.cert.X509Certificate; import java.time.Duration; import java.util.List; -import java.util.Objects; import java.util.Optional; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -53,8 +52,25 @@ import static java.util.stream.Collectors.toList; public class DefaultZtsClient extends ClientBase implements ZtsClient { private final URI ztsUrl; - protected DefaultZtsClient(URI ztsUrl, Supplier<SSLContext> sslContextSupplier, HostnameVerifier hostnameVerifier, ErrorHandler errorHandler) { - 
super("vespa-zts-client", sslContextSupplier, ZtsClientException::new, hostnameVerifier, errorHandler); + + public DefaultZtsClient(URI ztsUrl, SSLContext sslContext) { + this(ztsUrl, sslContext, null); + } + + public DefaultZtsClient(URI ztsUrl, SSLContext sslContext, HostnameVerifier hostnameVerifier) { + this(ztsUrl, () -> sslContext, hostnameVerifier); + } + + public DefaultZtsClient(URI ztsUrl, ServiceIdentityProvider identityProvider) { + this(ztsUrl, identityProvider::getIdentitySslContext, null); + } + + public DefaultZtsClient(URI ztsUrl, ServiceIdentityProvider identityProvider, HostnameVerifier hostnameVerifier) { + this(ztsUrl, identityProvider::getIdentitySslContext, hostnameVerifier); + } + + private DefaultZtsClient(URI ztsUrl, Supplier<SSLContext> sslContextSupplier, HostnameVerifier hostnameVerifier) { + super("vespa-zts-client", sslContextSupplier, ZtsClientException::new, hostnameVerifier); this.ztsUrl = addTrailingSlash(ztsUrl); } @@ -223,41 +239,5 @@ public class DefaultZtsClient extends ClientBase implements ZtsClient { else return URI.create(ztsUrl.toString() + '/'); } - public static class Builder { - private URI ztsUrl; - private ClientBase.ErrorHandler errorHandler = ErrorHandler.empty(); - private HostnameVerifier hostnameVerifier = null; - private Supplier<SSLContext> sslContextSupplier = null; - - public Builder(URI ztsUrl) { - this.ztsUrl = ztsUrl; - } - public Builder withErrorHandler(ClientBase.ErrorHandler errorHandler) { - this.errorHandler = errorHandler; - return this; - } - - public Builder withHostnameVerifier(HostnameVerifier hostnameVerifier) { - this.hostnameVerifier = hostnameVerifier; - return this; - } - - public Builder withSslContext(SSLContext sslContext) { - this.sslContextSupplier = () -> sslContext; - return this; - } - - public Builder withIdentityProvider(ServiceIdentityProvider identityProvider) { - this.sslContextSupplier = identityProvider::getIdentitySslContext; - return this; - } - - public DefaultZtsClient 
build() { - if (Objects.isNull(sslContextSupplier)) { - throw new IllegalArgumentException("No ssl context or identity provider available to set up zts client"); - } - return new DefaultZtsClient(ztsUrl, sslContextSupplier, hostnameVerifier, errorHandler); - } - } } diff --git a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/client/AthenzCredentialsService.java b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/client/AthenzCredentialsService.java index 612f9caa691..8e029906c30 100644 --- a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/client/AthenzCredentialsService.java +++ b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/client/AthenzCredentialsService.java @@ -81,7 +81,7 @@ class AthenzCredentialsService { document.ipAddresses(), keyPair); - try (ZtsClient ztsClient = new DefaultZtsClient.Builder(ztsEndpoint).withIdentityProvider(nodeIdentityProvider).build()) { + try (ZtsClient ztsClient = new DefaultZtsClient(ztsEndpoint, nodeIdentityProvider)) { InstanceIdentity instanceIdentity = ztsClient.registerInstance( configserverIdentity, @@ -102,7 +102,7 @@ class AthenzCredentialsService { document.ipAddresses(), newKeyPair); - try (ZtsClient ztsClient = new DefaultZtsClient.Builder(ztsEndpoint).withSslContext(sslContext).build()) { + try (ZtsClient ztsClient = new DefaultZtsClient(ztsEndpoint, sslContext)) { InstanceIdentity instanceIdentity = ztsClient.refreshInstance( configserverIdentity, diff --git a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/client/AthenzIdentityProviderImpl.java b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/client/AthenzIdentityProviderImpl.java index 724a3059f6d..65574d7583e 100644 --- a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/client/AthenzIdentityProviderImpl.java +++ b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/identityprovider/client/AthenzIdentityProviderImpl.java @@ -301,7 +301,7 
@@ public final class AthenzIdentityProviderImpl extends AbstractComponent implemen } private DefaultZtsClient createZtsClient() { - return new DefaultZtsClient.Builder(ztsEndpoint).withSslContext(getIdentitySslContext()).build(); + return new DefaultZtsClient(ztsEndpoint, getIdentitySslContext()); } @Override |