aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-reindexer
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-02-20 11:11:59 +0100
committerJon Marius Venstad <venstad@gmail.com>2021-02-20 11:11:59 +0100
commitf79953a481f337d8aefa0dade27b50a01fbe6929 (patch)
tree98337feceba6b77b109aca9f8eda14e7da377737 /clustercontroller-reindexer
parent5b9841604e931a6e61b53526680bdf130261e98c (diff)
Write initial state on construction, not run
Diffstat (limited to 'clustercontroller-reindexer')
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java4
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java27
2 files changed, 20 insertions, 11 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
index c5d484c48b0..54240ebd81c 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
@@ -79,6 +79,8 @@ public class Reindexer {
this.visitorSessions = visitorSessions;
this.metrics = new ReindexingMetrics(metric, cluster.name);
this.clock = clock;
+
+ database.initializeIfEmpty(cluster.name, ready, clock.instant());
}
/** Lets the reindexer abort any ongoing visit session, wait for it to complete normally, then exit. */
@@ -92,7 +94,7 @@ public class Reindexer {
throw new IllegalStateException("Already shut down");
// Keep metrics in sync across cluster controller containers.
- AtomicReference<Reindexing> reindexing = new AtomicReference<>(database.readReindexingOrDefault(cluster.name, ready, clock.instant()));
+ AtomicReference<Reindexing> reindexing = new AtomicReference<>(database.readReindexing(cluster.name()));
database.writeReindexing(reindexing.get(), cluster.name());
metrics.dump(reindexing.get());
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
index c735a27387a..d46ab812aca 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
@@ -46,16 +46,23 @@ public class ReindexingCurator {
}
/** If no reindexing data exists (has been wiped), assume current ready documents are already done. */
- public Reindexing readReindexingOrDefault(String cluster, Map<DocumentType, Instant> ready, Instant now) {
- return curator.getData(statusPath(cluster)).map(serializer::deserialize)
- .orElseGet(() -> {
- Reindexing reindexing = Reindexing.empty();
- for (DocumentType type : ready.keySet())
- if (ready.get(type).isBefore(now))
- reindexing = reindexing.with(type, Status.ready(now).running().successful(now));
-
- return reindexing;
- });
+ public void initializeIfEmpty(String cluster, Map<DocumentType, Instant> ready, Instant now) {
+ if ( ! curator.exists(statusPath(cluster))) {
+ try (Lock lock = lockReindexing(cluster)) {
+ if (curator.exists(statusPath(cluster)))
+ return; // Some other node already did this.
+
+ Reindexing reindexing = Reindexing.empty();
+ for (DocumentType type : ready.keySet())
+ if (ready.get(type).isBefore(now))
+ reindexing = reindexing.with(type, Status.ready(now).running().successful(now));
+
+ writeReindexing(reindexing, cluster);
+ }
+ catch (ReindexingLockException ignored) {
+ // Some other node took ownership and is doing this.
+ }
+ }
}
public Reindexing readReindexing(String cluster) {