summaryrefslogtreecommitdiffstats
path: root/clustercontroller-reindexer/src
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-11-05 12:35:12 +0100
committerJon Marius Venstad <venstad@gmail.com>2020-11-05 12:35:12 +0100
commit2679d5fbbe935580607791669844d65c11b568f6 (patch)
tree3ed422e509730683ee051059f6822fcf2970b0ec /clustercontroller-reindexer/src
parent97ebd0f758659d00ceef13d0eb71ce0899dfbd9e (diff)
Add ReindexingMaintainer and Reindexer
Diffstat (limited to 'clustercontroller-reindexer/src')
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java186
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java27
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java130
3 files changed, 342 insertions, 1 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
new file mode 100644
index 00000000000..0386d34250c
--- /dev/null
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
@@ -0,0 +1,186 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.reindexing;
+
+import ai.vespa.reindexing.Reindexing.Status;
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.select.parser.ParseException;
+import com.yahoo.documentapi.DocumentAccess;
+import com.yahoo.documentapi.ProgressToken;
+import com.yahoo.documentapi.VisitorControlHandler;
+import com.yahoo.documentapi.VisitorControlHandler.CompletionCode;
+import com.yahoo.documentapi.VisitorControlSession;
+import com.yahoo.documentapi.VisitorParameters;
+import com.yahoo.documentapi.VisitorSession;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.vespa.curator.Lock;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.time.temporal.TemporalUnit;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.logging.Logger;
+
+import static com.yahoo.documentapi.VisitorControlHandler.CompletionCode.ABORTED;
+import static java.time.Instant.EPOCH;
+import static java.time.temporal.ChronoUnit.MICROS;
+import static java.util.Objects.requireNonNull;
+import static java.util.logging.Level.FINE;
+import static java.util.logging.Level.INFO;
+import static java.util.logging.Level.WARNING;
+
+public class Reindexer {
+
+ private static final Logger log = Logger.getLogger(Reindexer.class.getName());
+
+ private final Cluster cluster;
+ private final Map<DocumentType, Instant> ready;
+ private final ReindexingCurator database;
+ private final DocumentAccess access;
+ private final Clock clock;
+
+ public Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database,
+ DocumentAccess access, Clock clock) {
+ for (DocumentType type : ready.keySet())
+ cluster.bucketOf(type); // Verifies this is known.
+
+ this.cluster = cluster;
+ this.ready = new TreeMap<>(ready);
+ this.database = database;
+ this.access = access;
+ this.clock = clock;
+ }
+
+ /** Starts and tracks reprocessing of ready document types until done, or interrupted. */
+ public void reindex(Lock __) {
+ Reindexing reindexing = database.readReindexing();
+ for (DocumentType type : ready.keySet()) {
+ if (Thread.currentThread().isInterrupted())
+ break;
+
+ // We consider only document types for which we have config.
+ // Get status for document type, or, if this is a new type, mark it as just completed.
+ Instant readyAt = ready.get(type);
+ if (readyAt.isAfter(clock.instant())) {
+ log.log(WARNING, "Received config for reindexing which is ready in the future " +
+ "(" + readyAt + " is after " + clock.instant() + ")");
+ continue;
+ }
+
+ // If this is a new document type (or a new cluster), no reindexing is required.
+ Status status = reindexing.status().getOrDefault(type, Status.ready(clock.instant())
+ .running()
+ .successful(clock.instant()));
+ reindexing = reindexing.with(type, progress(type, status));
+ }
+ database.writeReindexing(reindexing);
+ }
+
+ @SuppressWarnings("fallthrough") // (ノಠ ∩ಠ)ノ彡( \o°o)\
+ private Status progress(DocumentType type, Status status) {
+ if (ready.get(type).isAfter(status.startedAt()))
+ status = Status.ready(clock.instant()); // Need to restart, as a newer reindexing is required.
+
+ switch (status.state()) {
+ default:
+ log.log(WARNING, "Unknown reindexing state '" + status.state() + "'");
+ case FAILED:
+ log.log(FINE, () -> "Not continuing reindexing of " + type + " due to previous failure");
+ case SUCCESSFUL: // Intentional fallthrough — both are done states.
+ return status;
+ case RUNNING:
+ log.log(WARNING, "Unepxected state 'RUNNING' of reindexing of " + type);
+ case READY: // Intentional fallthrough — must just assume we failed updating state when exiting previously.
+ }
+
+ // Visit buckets until they're all done, or until we are interrupted.
+ status = status.running();
+ VisitorControlHandler control = new VisitorControlHandler();
+ visit(type, status.progress().orElse(null), control);
+
+ // Progress is null if no buckets were successfully visited due to interrupt.
+ if (control.getProgress() != null)
+ status = status.progressed(control.getProgress());
+
+ // If we were interrupted, the result may not yet be set in the control handler.
+ CompletionCode code = control.getResult() != null ? control.getResult().getCode() : ABORTED;
+ switch (code) {
+ default:
+ log.log(WARNING, "Unexpected visitor result '" + control.getResult().getCode() + "'");
+ case FAILURE: // Intentional fallthrough — this is an error.
+ log.log(WARNING, "Visiting failed: " + control.getResult().getMessage());
+ return status.failed(clock.instant(), control.getResult().getMessage());
+ case ABORTED:
+ log.log(FINE, () -> "Aborting reindexing of " + type + " due to shutdown — will continue later");
+ return status.halted();
+ case SUCCESS:
+ log.log(INFO, "Completed reindexing of " + type + " after " + Duration.between(status.startedAt(), clock.instant()));
+ return status.successful(clock.instant());
+ }
+ }
+
+ private void visit(DocumentType type, ProgressToken progress, VisitorControlHandler control) {
+ VisitorParameters parameters = createParameters(type, progress);
+ parameters.setControlHandler(control);
+ VisitorSession session;
+ try {
+ session = access.createVisitorSession(parameters);
+ }
+ catch (ParseException e) {
+ throw new IllegalStateException(e);
+ }
+
+ // Wait until done, or interrupted, in which case we abort the visit but don't wait for it to complete.
+ try {
+ control.waitUntilDone();
+ }
+ catch (InterruptedException e) {
+ control.abort();
+ Thread.currentThread().interrupt();
+ }
+ session.destroy(); // If thread is interrupted, this will not wait, but will retain the interrupted flag.
+ }
+
+ private VisitorParameters createParameters(DocumentType type, ProgressToken progress) {
+ VisitorParameters parameters = new VisitorParameters(type.getName());
+ parameters.setRemoteDataHandler(cluster.name());
+ parameters.setResumeToken(progress);
+ parameters.setFieldSet(type.getName() + ";[document]");
+ parameters.setPriority(DocumentProtocol.Priority.LOW_1);
+ parameters.setRoute(cluster.route());
+ parameters.setBucketSpace(cluster.bucketOf(type));
+ // parameters.setVisitorLibrary("ReindexVisitor");
+ return parameters;
+ }
+
+
+ static class Cluster {
+
+ private final String name;
+ private final String configId;
+ private final Map<DocumentType, String> documentBuckets;
+
+ Cluster(String name, String configId, Map<DocumentType, String> documentBuckets) {
+ this.name = requireNonNull(name);
+ this.configId = requireNonNull(configId);
+ this.documentBuckets = Map.copyOf(documentBuckets);
+ }
+
+ String name() {
+ return name;
+ }
+
+ String route() {
+ return "[Storage:cluster=" + name + ";clusterconfigid=" + configId + "]";
+ }
+
+ String bucketOf(DocumentType documentType) {
+ return requireNonNull(documentBuckets.get(documentType), "Unknown bucket for " + documentType);
+ }
+
+ }
+
+}
+
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
index 1158b2c9d3f..269d0ef18bb 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
@@ -2,6 +2,7 @@
package ai.vespa.reindexing;
import ai.vespa.reindexing.Reindexing.Status;
+import com.google.common.util.concurrent.UncheckedTimeoutException;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.documentapi.ProgressToken;
import com.yahoo.path.Path;
@@ -10,9 +11,12 @@ import com.yahoo.slime.Inspector;
import com.yahoo.slime.Slime;
import com.yahoo.slime.SlimeUtils;
import com.yahoo.vespa.curator.Curator;
+import com.yahoo.vespa.curator.Lock;
import com.yahoo.yolean.Exceptions;
+import java.time.Duration;
import java.time.Instant;
+import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import static java.util.Objects.requireNonNull;
@@ -33,7 +37,9 @@ public class ReindexingCurator {
private static final String STATE = "state";
private static final String MESSAGE = "message";
- private static final Path statusPath = Path.fromString("/reindexing/v1/status");
+ private static final Path rootPath = Path.fromString("/reindexing/v1");
+ private static final Path statusPath = rootPath.append("status");
+ private static final Path lockPath = rootPath.append("lock");
private final Curator curator;
private final ReindexingSerializer serializer;
@@ -52,6 +58,16 @@ public class ReindexingCurator {
curator.set(statusPath, serializer.serialize(reindexing));
}
+ /** This lock must be held to manipulate reindexing state, or by whoever has a running visitor. */
+ public Lock lockReindexing() throws ReindexingLockException {
+ try {
+ return curator.lock(lockPath, Duration.ofSeconds(1));
+ }
+ catch (UncheckedTimeoutException e) {
+ throw new ReindexingLockException(e);
+ }
+ }
+
private static class ReindexingSerializer {
@@ -117,4 +133,13 @@ public class ReindexingCurator {
}
+ /** Indicates that taking the reindexing lock failed within the alotted time. */
+ static class ReindexingLockException extends Exception {
+
+ ReindexingLockException(UncheckedTimeoutException cause) {
+ super("Failed to obtain the reindexing lock", cause);
+ }
+
+ }
+
}
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
new file mode 100644
index 00000000000..882f356e312
--- /dev/null
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
@@ -0,0 +1,130 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.reindexing;
+
+import ai.vespa.reindexing.Reindexer.Cluster;
+import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException;
+import com.google.inject.Inject;
+import com.yahoo.cloud.config.ClusterListConfig;
+import com.yahoo.cloud.config.ZookeepersConfig;
+import com.yahoo.component.AbstractComponent;
+import com.yahoo.concurrent.DaemonThreadFactory;
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.config.DocumentmanagerConfig;
+import com.yahoo.documentapi.DocumentAccess;
+import com.yahoo.net.HostName;
+import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
+import com.yahoo.vespa.config.content.reindexing.ReindexingConfig;
+import com.yahoo.vespa.curator.Curator;
+import com.yahoo.vespa.curator.Lock;
+import com.yahoo.vespa.zookeeper.VespaZooKeeperServer;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toUnmodifiableMap;
+
+public class ReindexingMaintainer extends AbstractComponent {
+
+ private static final Logger log = Logger.getLogger(Reindexing.class.getName());
+
+ private final ReindexingCurator database;
+ private final Reindexer reindexer;
+ private final ScheduledExecutorService executor;
+
+ // VespaZooKeeperServer dependency to ensure the ZK cluster is running.
+ @Inject
+ public ReindexingMaintainer(VespaZooKeeperServer zooKeeperServer, DocumentAccess access, ZookeepersConfig zookeepersConfig,
+ ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig,
+ ReindexingConfig reindexingConfig, DocumentmanagerConfig documentmanagerConfig) {
+ this(Clock.systemUTC(), access, zookeepersConfig, clusterListConfig, allClustersBucketSpacesConfig, reindexingConfig, documentmanagerConfig);
+ }
+
+ ReindexingMaintainer(Clock clock, DocumentAccess access, ZookeepersConfig zookeepersConfig,
+ ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig,
+ ReindexingConfig reindexingConfig, DocumentmanagerConfig documentmanagerConfig) {
+ DocumentTypeManager manager = new DocumentTypeManager(documentmanagerConfig);
+ this.database = new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()), manager);
+ this.reindexer = new Reindexer(parseCluster(reindexingConfig.clusterName(), clusterListConfig, allClustersBucketSpacesConfig, manager),
+ parseReady(reindexingConfig, manager), database, access, clock);
+ this.executor = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("reindexer-"));
+ if (reindexingConfig.enabled())
+ scheduleStaggered(this::maintain, executor, Duration.ofMinutes(1), clock.instant(),
+ HostName.getLocalhost(), zookeepersConfig.zookeeperserverlist());
+ }
+
+ private void maintain() {
+ try (Lock lock = database.lockReindexing()){
+ reindexer.reindex(lock);
+ }
+ catch (ReindexingLockException e) {
+ // Some other container is handling the reindexing at this moment, which is fine.
+ }
+ catch (Exception e) {
+ log.log(Level.WARNING, "Exception when reindexing", e);
+ }
+ }
+
+ @Override
+ public void deconstruct() {
+ try {
+ executor.shutdownNow();
+ if ( ! executor.awaitTermination(20, TimeUnit.SECONDS))
+ log.log(Level.SEVERE, "Failed to shut down reindexer within timeout");
+ }
+ catch (InterruptedException e) {
+ log.log(Level.SEVERE, "Interrupted while waiting for reindexer to shut down");
+ Thread.currentThread().interrupt();
+ }
+
+ }
+
+ static Map<DocumentType, Instant> parseReady(ReindexingConfig config, DocumentTypeManager manager) {
+ return config.status().entrySet().stream()
+ .collect(toUnmodifiableMap(typeStatus -> manager.getDocumentType(typeStatus.getKey()),
+ typeStatus -> Instant.ofEpochMilli(typeStatus.getValue().readyAtMillis())));
+ }
+
+ /** Schedules the given task with the given interval (across all containers in this ZK cluster). */
+ static void scheduleStaggered(Runnable task, ScheduledExecutorService executor,
+ Duration interval, Instant now,
+ String hostname, String clusterHostnames) {
+ long delayMillis = 0;
+ long intervalMillis = interval.toMillis();
+ List<String> hostnames = Stream.of(clusterHostnames.split(","))
+ .map(hostPort -> hostPort.split(":")[0])
+ .collect(toList());
+ if (hostnames.contains(hostname)) {
+ long offset = hostnames.indexOf(hostname) * intervalMillis;
+ intervalMillis *= hostnames.size();
+ delayMillis = Math.floorMod(offset - now.toEpochMilli(), interval.toMillis());
+ }
+ executor.scheduleAtFixedRate(task, delayMillis, intervalMillis, TimeUnit.MILLISECONDS);
+ }
+
+ private static Cluster parseCluster(String name, ClusterListConfig clusters, AllClustersBucketSpacesConfig buckets,
+ DocumentTypeManager manager) {
+ return clusters.storage().stream()
+ .filter(storage -> storage.name().equals(name))
+ .map(storage -> new Cluster(name,
+ storage.configid(),
+ buckets.cluster(name)
+ .documentType().entrySet().stream()
+ .collect(toMap(entry -> manager.getDocumentType(entry.getKey()),
+ entry -> entry.getValue().bucketSpace()))))
+ .findAny()
+ .orElseThrow(() -> new IllegalStateException("This cluster (" + name + ") not among the list of clusters"));
+ }
+
+}