diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-11-05 12:35:12 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-11-05 12:35:12 +0100 |
commit | 2679d5fbbe935580607791669844d65c11b568f6 (patch) | |
tree | 3ed422e509730683ee051059f6822fcf2970b0ec /clustercontroller-reindexer | |
parent | 97ebd0f758659d00ceef13d0eb71ce0899dfbd9e (diff) |
Add ReindexingMaintainer and Reindexer
Diffstat (limited to 'clustercontroller-reindexer')
3 files changed, 342 insertions, 1 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java new file mode 100644 index 00000000000..0386d34250c --- /dev/null +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java @@ -0,0 +1,186 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.reindexing; + +import ai.vespa.reindexing.Reindexing.Status; +import com.yahoo.document.DocumentType; +import com.yahoo.document.select.parser.ParseException; +import com.yahoo.documentapi.DocumentAccess; +import com.yahoo.documentapi.ProgressToken; +import com.yahoo.documentapi.VisitorControlHandler; +import com.yahoo.documentapi.VisitorControlHandler.CompletionCode; +import com.yahoo.documentapi.VisitorControlSession; +import com.yahoo.documentapi.VisitorParameters; +import com.yahoo.documentapi.VisitorSession; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.vespa.curator.Lock; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.time.temporal.TemporalUnit; +import java.util.Map; +import java.util.TreeMap; +import java.util.logging.Logger; + +import static com.yahoo.documentapi.VisitorControlHandler.CompletionCode.ABORTED; +import static java.time.Instant.EPOCH; +import static java.time.temporal.ChronoUnit.MICROS; +import static java.util.Objects.requireNonNull; +import static java.util.logging.Level.FINE; +import static java.util.logging.Level.INFO; +import static java.util.logging.Level.WARNING; + +public class Reindexer { + + private static final Logger log = Logger.getLogger(Reindexer.class.getName()); + + private final Cluster cluster; + private final Map<DocumentType, Instant> ready; + private final ReindexingCurator database; + private final DocumentAccess access; + private final 
Clock clock; + + public Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database, + DocumentAccess access, Clock clock) { + for (DocumentType type : ready.keySet()) + cluster.bucketOf(type); // Verifies this is known. + + this.cluster = cluster; + this.ready = new TreeMap<>(ready); + this.database = database; + this.access = access; + this.clock = clock; + } + + /** Starts and tracks reprocessing of ready document types until done, or interrupted. */ + public void reindex(Lock __) { + Reindexing reindexing = database.readReindexing(); + for (DocumentType type : ready.keySet()) { + if (Thread.currentThread().isInterrupted()) + break; + + // We consider only document types for which we have config. + // Get status for document type, or, if this is a new type, mark it as just completed. + Instant readyAt = ready.get(type); + if (readyAt.isAfter(clock.instant())) { + log.log(WARNING, "Received config for reindexing which is ready in the future " + + "(" + readyAt + " is after " + clock.instant() + ")"); + continue; + } + + // If this is a new document type (or a new cluster), no reindexing is required. + Status status = reindexing.status().getOrDefault(type, Status.ready(clock.instant()) + .running() + .successful(clock.instant())); + reindexing = reindexing.with(type, progress(type, status)); + } + database.writeReindexing(reindexing); + } + + @SuppressWarnings("fallthrough") // (ノಠ ∩ಠ)ノ彡( \o°o)\ + private Status progress(DocumentType type, Status status) { + if (ready.get(type).isAfter(status.startedAt())) + status = Status.ready(clock.instant()); // Need to restart, as a newer reindexing is required. + + switch (status.state()) { + default: + log.log(WARNING, "Unknown reindexing state '" + status.state() + "'"); + case FAILED: + log.log(FINE, () -> "Not continuing reindexing of " + type + " due to previous failure"); + case SUCCESSFUL: // Intentional fallthrough — both are done states. 
+ return status; + case RUNNING: + log.log(WARNING, "Unepxected state 'RUNNING' of reindexing of " + type); + case READY: // Intentional fallthrough — must just assume we failed updating state when exiting previously. + } + + // Visit buckets until they're all done, or until we are interrupted. + status = status.running(); + VisitorControlHandler control = new VisitorControlHandler(); + visit(type, status.progress().orElse(null), control); + + // Progress is null if no buckets were successfully visited due to interrupt. + if (control.getProgress() != null) + status = status.progressed(control.getProgress()); + + // If we were interrupted, the result may not yet be set in the control handler. + CompletionCode code = control.getResult() != null ? control.getResult().getCode() : ABORTED; + switch (code) { + default: + log.log(WARNING, "Unexpected visitor result '" + control.getResult().getCode() + "'"); + case FAILURE: // Intentional fallthrough — this is an error. + log.log(WARNING, "Visiting failed: " + control.getResult().getMessage()); + return status.failed(clock.instant(), control.getResult().getMessage()); + case ABORTED: + log.log(FINE, () -> "Aborting reindexing of " + type + " due to shutdown — will continue later"); + return status.halted(); + case SUCCESS: + log.log(INFO, "Completed reindexing of " + type + " after " + Duration.between(status.startedAt(), clock.instant())); + return status.successful(clock.instant()); + } + } + + private void visit(DocumentType type, ProgressToken progress, VisitorControlHandler control) { + VisitorParameters parameters = createParameters(type, progress); + parameters.setControlHandler(control); + VisitorSession session; + try { + session = access.createVisitorSession(parameters); + } + catch (ParseException e) { + throw new IllegalStateException(e); + } + + // Wait until done, or interrupted, in which case we abort the visit but don't wait for it to complete. 
+ try { + control.waitUntilDone(); + } + catch (InterruptedException e) { + control.abort(); + Thread.currentThread().interrupt(); + } + session.destroy(); // If thread is interrupted, this will not wait, but will retain the interrupted flag. + } + + private VisitorParameters createParameters(DocumentType type, ProgressToken progress) { + VisitorParameters parameters = new VisitorParameters(type.getName()); + parameters.setRemoteDataHandler(cluster.name()); + parameters.setResumeToken(progress); + parameters.setFieldSet(type.getName() + ";[document]"); + parameters.setPriority(DocumentProtocol.Priority.LOW_1); + parameters.setRoute(cluster.route()); + parameters.setBucketSpace(cluster.bucketOf(type)); + // parameters.setVisitorLibrary("ReindexVisitor"); + return parameters; + } + + + static class Cluster { + + private final String name; + private final String configId; + private final Map<DocumentType, String> documentBuckets; + + Cluster(String name, String configId, Map<DocumentType, String> documentBuckets) { + this.name = requireNonNull(name); + this.configId = requireNonNull(configId); + this.documentBuckets = Map.copyOf(documentBuckets); + } + + String name() { + return name; + } + + String route() { + return "[Storage:cluster=" + name + ";clusterconfigid=" + configId + "]"; + } + + String bucketOf(DocumentType documentType) { + return requireNonNull(documentBuckets.get(documentType), "Unknown bucket for " + documentType); + } + + } + +} + diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java index 1158b2c9d3f..269d0ef18bb 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java @@ -2,6 +2,7 @@ package ai.vespa.reindexing; import ai.vespa.reindexing.Reindexing.Status; +import 
com.google.common.util.concurrent.UncheckedTimeoutException; import com.yahoo.document.DocumentTypeManager; import com.yahoo.documentapi.ProgressToken; import com.yahoo.path.Path; @@ -10,9 +11,12 @@ import com.yahoo.slime.Inspector; import com.yahoo.slime.Slime; import com.yahoo.slime.SlimeUtils; import com.yahoo.vespa.curator.Curator; +import com.yahoo.vespa.curator.Lock; import com.yahoo.yolean.Exceptions; +import java.time.Duration; import java.time.Instant; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import static java.util.Objects.requireNonNull; @@ -33,7 +37,9 @@ public class ReindexingCurator { private static final String STATE = "state"; private static final String MESSAGE = "message"; - private static final Path statusPath = Path.fromString("/reindexing/v1/status"); + private static final Path rootPath = Path.fromString("/reindexing/v1"); + private static final Path statusPath = rootPath.append("status"); + private static final Path lockPath = rootPath.append("lock"); private final Curator curator; private final ReindexingSerializer serializer; @@ -52,6 +58,16 @@ public class ReindexingCurator { curator.set(statusPath, serializer.serialize(reindexing)); } + /** This lock must be held to manipulate reindexing state, or by whoever has a running visitor. */ + public Lock lockReindexing() throws ReindexingLockException { + try { + return curator.lock(lockPath, Duration.ofSeconds(1)); + } + catch (UncheckedTimeoutException e) { + throw new ReindexingLockException(e); + } + } + private static class ReindexingSerializer { @@ -117,4 +133,13 @@ public class ReindexingCurator { } + /** Indicates that taking the reindexing lock failed within the allotted time. 
*/ + static class ReindexingLockException extends Exception { + + ReindexingLockException(UncheckedTimeoutException cause) { + super("Failed to obtain the reindexing lock", cause); + } + + } + } diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java new file mode 100644 index 00000000000..882f356e312 --- /dev/null +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java @@ -0,0 +1,130 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.reindexing; + +import ai.vespa.reindexing.Reindexer.Cluster; +import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException; +import com.google.inject.Inject; +import com.yahoo.cloud.config.ClusterListConfig; +import com.yahoo.cloud.config.ZookeepersConfig; +import com.yahoo.component.AbstractComponent; +import com.yahoo.concurrent.DaemonThreadFactory; +import com.yahoo.document.DocumentType; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.document.config.DocumentmanagerConfig; +import com.yahoo.documentapi.DocumentAccess; +import com.yahoo.net.HostName; +import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig; +import com.yahoo.vespa.config.content.reindexing.ReindexingConfig; +import com.yahoo.vespa.curator.Curator; +import com.yahoo.vespa.curator.Lock; +import com.yahoo.vespa.zookeeper.VespaZooKeeperServer; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toList; +import 
static java.util.stream.Collectors.toMap; +import static java.util.stream.Collectors.toUnmodifiableMap; + +public class ReindexingMaintainer extends AbstractComponent { + + private static final Logger log = Logger.getLogger(Reindexing.class.getName()); + + private final ReindexingCurator database; + private final Reindexer reindexer; + private final ScheduledExecutorService executor; + + // VespaZooKeeperServer dependency to ensure the ZK cluster is running. + @Inject + public ReindexingMaintainer(VespaZooKeeperServer zooKeeperServer, DocumentAccess access, ZookeepersConfig zookeepersConfig, + ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig, + ReindexingConfig reindexingConfig, DocumentmanagerConfig documentmanagerConfig) { + this(Clock.systemUTC(), access, zookeepersConfig, clusterListConfig, allClustersBucketSpacesConfig, reindexingConfig, documentmanagerConfig); + } + + ReindexingMaintainer(Clock clock, DocumentAccess access, ZookeepersConfig zookeepersConfig, + ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig, + ReindexingConfig reindexingConfig, DocumentmanagerConfig documentmanagerConfig) { + DocumentTypeManager manager = new DocumentTypeManager(documentmanagerConfig); + this.database = new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()), manager); + this.reindexer = new Reindexer(parseCluster(reindexingConfig.clusterName(), clusterListConfig, allClustersBucketSpacesConfig, manager), + parseReady(reindexingConfig, manager), database, access, clock); + this.executor = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("reindexer-")); + if (reindexingConfig.enabled()) + scheduleStaggered(this::maintain, executor, Duration.ofMinutes(1), clock.instant(), + HostName.getLocalhost(), zookeepersConfig.zookeeperserverlist()); + } + + private void maintain() { + try (Lock lock = database.lockReindexing()){ + reindexer.reindex(lock); + } + 
catch (ReindexingLockException e) { + // Some other container is handling the reindexing at this moment, which is fine. + } + catch (Exception e) { + log.log(Level.WARNING, "Exception when reindexing", e); + } + } + + @Override + public void deconstruct() { + try { + executor.shutdownNow(); + if ( ! executor.awaitTermination(20, TimeUnit.SECONDS)) + log.log(Level.SEVERE, "Failed to shut down reindexer within timeout"); + } + catch (InterruptedException e) { + log.log(Level.SEVERE, "Interrupted while waiting for reindexer to shut down"); + Thread.currentThread().interrupt(); + } + + } + + static Map<DocumentType, Instant> parseReady(ReindexingConfig config, DocumentTypeManager manager) { + return config.status().entrySet().stream() + .collect(toUnmodifiableMap(typeStatus -> manager.getDocumentType(typeStatus.getKey()), + typeStatus -> Instant.ofEpochMilli(typeStatus.getValue().readyAtMillis()))); + } + + /** Schedules the given task with the given interval (across all containers in this ZK cluster). 
*/ + static void scheduleStaggered(Runnable task, ScheduledExecutorService executor, + Duration interval, Instant now, + String hostname, String clusterHostnames) { + long delayMillis = 0; + long intervalMillis = interval.toMillis(); + List<String> hostnames = Stream.of(clusterHostnames.split(",")) + .map(hostPort -> hostPort.split(":")[0]) + .collect(toList()); + if (hostnames.contains(hostname)) { + long offset = hostnames.indexOf(hostname) * intervalMillis; + intervalMillis *= hostnames.size(); + delayMillis = Math.floorMod(offset - now.toEpochMilli(), interval.toMillis()); + } + executor.scheduleAtFixedRate(task, delayMillis, intervalMillis, TimeUnit.MILLISECONDS); + } + + private static Cluster parseCluster(String name, ClusterListConfig clusters, AllClustersBucketSpacesConfig buckets, + DocumentTypeManager manager) { + return clusters.storage().stream() + .filter(storage -> storage.name().equals(name)) + .map(storage -> new Cluster(name, + storage.configid(), + buckets.cluster(name) + .documentType().entrySet().stream() + .collect(toMap(entry -> manager.getDocumentType(entry.getKey()), + entry -> entry.getValue().bucketSpace())))) + .findAny() + .orElseThrow(() -> new IllegalStateException("This cluster (" + name + ") not among the list of clusters")); + } + +} |