diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-12-15 12:19:11 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-12-15 12:19:11 +0100 |
commit | 8cffaebf88b753fb0f59ed13c46ca8b86b694193 (patch) | |
tree | fa862ea078fb7d676bba80190f247d84a070bc99 /clustercontroller-reindexer/src/main | |
parent | 4f4feea7dc41252589f14f88d7d0e4e0b107eee1 (diff) |
Support variable reindexing speed, based on config
Diffstat (limited to 'clustercontroller-reindexer/src/main')
4 files changed, 89 insertions, 34 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java index 1d4571e3fc6..638968cc03e 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java @@ -2,6 +2,7 @@ package ai.vespa.reindexing; import ai.vespa.reindexing.Reindexing.Status; +import ai.vespa.reindexing.Reindexing.Trigger; import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException; import com.yahoo.document.DocumentType; import com.yahoo.document.select.parser.ParseException; @@ -18,18 +19,20 @@ import com.yahoo.vespa.curator.Lock; import java.time.Clock; import java.time.Duration; import java.time.Instant; +import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.TreeMap; import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.logging.Logger; +import static java.util.Comparator.comparingDouble; import static java.util.Objects.requireNonNull; import static java.util.logging.Level.FINE; import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; +import static java.util.stream.Collectors.toUnmodifiableList; /** * Progresses reindexing efforts by creating visitor sessions against its own content cluster, @@ -45,14 +48,14 @@ public class Reindexer { static final Duration failureGrace = Duration.ofMinutes(10); private final Cluster cluster; - private final Map<DocumentType, Instant> ready; + private final List<Trigger> ready; private final ReindexingCurator database; private final Function<VisitorParameters, Runnable> visitorSessions; private final ReindexingMetrics metrics; private final Clock clock; private final Phaser phaser = new Phaser(2); // Reindexer and visitor. - public Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database, + public Reindexer(Cluster cluster, List<Trigger> ready, ReindexingCurator database, DocumentAccess access, Metric metric, Clock clock) { this(cluster, ready, @@ -70,13 +73,15 @@ public class Reindexer { ); } - Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database, + Reindexer(Cluster cluster, List<Trigger> ready, ReindexingCurator database, Function<VisitorParameters, Runnable> visitorSessions, Metric metric, Clock clock) { - for (DocumentType type : ready.keySet()) - cluster.bucketSpaceOf(type); // Verifies this is known. + for (Trigger trigger : ready) + cluster.bucketSpaceOf(trigger.type()); // Verifies this is known. this.cluster = cluster; - this.ready = new TreeMap<>(ready); // Iterate through document types in consistent order. + this.ready = ready.stream() // Iterate through document types in consistent order. + .sorted(comparingDouble(Trigger::speed).reversed().thenComparing(Trigger::readyAt).thenComparing(Trigger::type)) + .collect(toUnmodifiableList()); this.database = database; this.visitorSessions = visitorSessions; this.metrics = new ReindexingMetrics(metric, cluster.name); @@ -104,12 +109,13 @@ public class Reindexer { database.writeReindexing(reindexing.get(), cluster.name()); metrics.dump(reindexing.get()); - for (DocumentType type : ready.keySet()) { // We consider only document types for which we have config. - if (ready.get(type).isAfter(clock.instant())) + // We consider only document types for which we have config. + for (Trigger trigger : ready) { + if (trigger.readyAt().isAfter(clock.instant())) log.log(INFO, "Received config for reindexing which is ready in the future — will process later " + - "(" + ready.get(type) + " is after " + clock.instant() + ")"); + "(" + trigger.readyAt() + " is after " + clock.instant() + ")"); else - progress(type, reindexing, new AtomicReference<>(reindexing.get().status().get(type))); + progress(trigger.type(), trigger.speed(), reindexing, new AtomicReference<>(reindexing.get().status().get(trigger.type()))); if (phaser.isTerminated()) break; @@ -117,21 +123,21 @@ public class Reindexer { } } - static Reindexing updateWithReady(Map<DocumentType, Instant> ready, Reindexing reindexing, Instant now) { - for (DocumentType type : ready.keySet()) { // We update only for document types for which we have config. - if ( ! ready.get(type).isAfter(now)) { - Status status = reindexing.status().getOrDefault(type, Status.ready(now)); - if (status.startedAt().isBefore(ready.get(type))) + static Reindexing updateWithReady(List<Trigger> ready, Reindexing reindexing, Instant now) { + for (Trigger trigger : ready) { // We update only for document types for which we have config. + if ( ! trigger.readyAt().isAfter(now)) { + Status status = reindexing.status().get(trigger.type()); + if (status == null || status.startedAt().isBefore(trigger.readyAt())) status = Status.ready(now); - reindexing = reindexing.with(type, status); + reindexing = reindexing.with(trigger.type(), status); } } return reindexing; } @SuppressWarnings("fallthrough") // (ノಠ ∩ಠ)ノ彡( \o°o)\ - private void progress(DocumentType type, AtomicReference<Reindexing> reindexing, AtomicReference<Status> status) { + private void progress(DocumentType type, double speed, AtomicReference<Reindexing> reindexing, AtomicReference<Status> status) { switch (status.get().state()) { default: log.log(WARNING, "Unknown reindexing state '" + status.get().state() + "'—not continuing reindexing of " + type); @@ -167,7 +173,7 @@ public class Reindexer { } }; - VisitorParameters parameters = createParameters(type, status.get().progress().orElse(null)); + VisitorParameters parameters = createParameters(type, speed, status.get().progress().orElse(null)); parameters.setControlHandler(control); Runnable sessionShutdown = visitorSessions.apply(parameters); // Also starts the visitor session. log.log(FINE, () -> "Running reindexing of " + type); @@ -197,12 +203,12 @@ public class Reindexer { metrics.dump(reindexing.get()); } - VisitorParameters createParameters(DocumentType type, ProgressToken progress) { + VisitorParameters createParameters(DocumentType type, double speed, ProgressToken progress) { VisitorParameters parameters = new VisitorParameters(type.getName()); - parameters.setThrottlePolicy(new DynamicThrottlePolicy().setWindowSizeIncrement(0.2) + parameters.setThrottlePolicy(new DynamicThrottlePolicy().setWindowSizeIncrement(speed) .setWindowSizeDecrementFactor(5) .setResizeRate(10) - .setMinWindowSize(1)); + .setMinWindowSize((int) (5 + speed))); parameters.setRemoteDataHandler(cluster.name()); parameters.setMaxPending(8); parameters.setResumeToken(progress); diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java index 2fd38bb6aa5..6f13a07414c 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java @@ -191,4 +191,53 @@ public class Reindexing { } + + public static class Trigger { + + private final DocumentType type; + private final Instant readyAt; + private final double speed; + + public Trigger(DocumentType type, Instant readyAt, double speed) { + this.type = requireNonNull(type); + this.readyAt = requireNonNull(readyAt); + this.speed = speed; + } + + public DocumentType type() { + return type; + } + + public Instant readyAt() { + return readyAt; + } + + public double speed() { + return speed; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Trigger trigger = (Trigger) o; + return Double.compare(trigger.speed, speed) == 0 && type.equals(trigger.type) && readyAt.equals(trigger.readyAt); + } + + @Override + public int hashCode() { + return Objects.hash(type, readyAt, speed); + } + + @Override + public String toString() { + return "Trigger{" + + "type=" + type + + ", readyAt=" + readyAt + + ", speed=" + speed + + '}'; + } + + } + } diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java index 6dd4079835d..0d77792de6d 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java @@ -2,6 +2,7 @@ package ai.vespa.reindexing; import ai.vespa.reindexing.Reindexing.Status; +import ai.vespa.reindexing.Reindexing.Trigger; import com.google.common.util.concurrent.UncheckedTimeoutException; import com.yahoo.document.DocumentType; import com.yahoo.document.DocumentTypeManager; @@ -16,6 +17,7 @@ import com.yahoo.yolean.Exceptions; import java.time.Duration; import java.time.Instant; +import java.util.List; import java.util.Map; import java.util.function.Function; import java.util.logging.Level; @@ -48,16 +50,16 @@ public class ReindexingCurator { } /** If no reindexing data exists (has been wiped), assume current ready documents are already done. */ - public void initializeIfEmpty(String cluster, Map<DocumentType, Instant> ready, Instant now) { + public void initializeIfEmpty(String cluster, List<Trigger> ready, Instant now) { if ( ! curator.exists(statusPath(cluster))) { try (Lock lock = lockReindexing(cluster)) { if (curator.exists(statusPath(cluster))) return; // Some other node already did this. Reindexing reindexing = Reindexing.empty(); - for (DocumentType type : ready.keySet()) - if (ready.get(type).isBefore(now)) - reindexing = reindexing.with(type, Status.ready(now).running().successful(now)); + for (Trigger trigger : ready) + if (trigger.readyAt().isBefore(now)) + reindexing = reindexing.with(trigger.type(), Status.ready(now).running().successful(now)); log.log(Level.INFO, "Creating initial reindexing status at '" + statusPath(cluster) + "'"); writeReindexing(reindexing, cluster); diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java index 405a6991d23..e784f070188 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java @@ -2,15 +2,14 @@ package ai.vespa.reindexing; import ai.vespa.reindexing.Reindexer.Cluster; +import ai.vespa.reindexing.Reindexing.Trigger; import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException; import com.google.inject.Inject; import com.yahoo.cloud.config.ClusterListConfig; import com.yahoo.cloud.config.ZookeepersConfig; import com.yahoo.component.AbstractComponent; import com.yahoo.concurrent.DaemonThreadFactory; -import com.yahoo.document.DocumentType; import com.yahoo.document.DocumentTypeManager; -import com.yahoo.document.config.DocumentmanagerConfig; import com.yahoo.documentapi.DocumentAccess; import com.yahoo.jdisc.Metric; import com.yahoo.net.HostName; @@ -23,13 +22,11 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.util.List; -import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.logging.Logger; -import java.util.stream.Collectors; import java.util.stream.Stream; import static java.util.logging.Level.FINE; @@ -37,7 +34,6 @@ import static java.util.logging.Level.WARNING; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; import static java.util.stream.Collectors.toUnmodifiableList; -import static java.util.stream.Collectors.toUnmodifiableMap; /** * Runs in all cluster controller containers, and progresses reindexing efforts. @@ -117,10 +113,12 @@ public class ReindexingMaintainer extends AbstractComponent { curator.close(); } - static Map<DocumentType, Instant> parseReady(ReindexingConfig.Clusters cluster, DocumentTypeManager manager) { + static List<Trigger> parseReady(ReindexingConfig.Clusters cluster, DocumentTypeManager manager) { return cluster.documentTypes().entrySet().stream() - .collect(toUnmodifiableMap(typeStatus -> manager.getDocumentType(typeStatus.getKey()), - typeStatus -> Instant.ofEpochMilli(typeStatus.getValue().readyAtMillis()))); + .map(typeStatus -> new Trigger(manager.getDocumentType(typeStatus.getKey()), + Instant.ofEpochMilli(typeStatus.getValue().readyAtMillis()), + typeStatus.getValue().speed())) + .collect(toUnmodifiableList()); } /** Schedules a task with the given interval (across all containers in this ZK cluster). */ |