diff options
Diffstat (limited to 'clustercontroller-reindexer')
-rw-r--r-- | clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java | 4 | ||||
-rw-r--r-- | clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java | 27 |
2 files changed, 20 insertions, 11 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java index c5d484c48b0..54240ebd81c 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java @@ -79,6 +79,8 @@ public class Reindexer { this.visitorSessions = visitorSessions; this.metrics = new ReindexingMetrics(metric, cluster.name); this.clock = clock; + + database.initializeIfEmpty(cluster.name, ready, clock.instant()); } /** Lets the reindexer abort any ongoing visit session, wait for it to complete normally, then exit. */ @@ -92,7 +94,7 @@ public class Reindexer { throw new IllegalStateException("Already shut down"); // Keep metrics in sync across cluster controller containers. - AtomicReference<Reindexing> reindexing = new AtomicReference<>(database.readReindexingOrDefault(cluster.name, ready, clock.instant())); + AtomicReference<Reindexing> reindexing = new AtomicReference<>(database.readReindexing(cluster.name())); database.writeReindexing(reindexing.get(), cluster.name()); metrics.dump(reindexing.get()); diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java index c735a27387a..d46ab812aca 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java @@ -46,16 +46,23 @@ public class ReindexingCurator { } /** If no reindexing data exists (has been wiped), assume current ready documents are already done. */ - public Reindexing readReindexingOrDefault(String cluster, Map<DocumentType, Instant> ready, Instant now) { - return curator.getData(statusPath(cluster)).map(serializer::deserialize) - .orElseGet(() -> { - Reindexing reindexing = Reindexing.empty(); - for (DocumentType type : ready.keySet()) - if (ready.get(type).isBefore(now)) - reindexing = reindexing.with(type, Status.ready(now).running().successful(now)); - - return reindexing; - }); + public void initializeIfEmpty(String cluster, Map<DocumentType, Instant> ready, Instant now) { + if ( ! curator.exists(statusPath(cluster))) { + try (Lock lock = lockReindexing(cluster)) { + if (curator.exists(statusPath(cluster))) + return; // Some other node already did this. + + Reindexing reindexing = Reindexing.empty(); + for (DocumentType type : ready.keySet()) + if (ready.get(type).isBefore(now)) + reindexing = reindexing.with(type, Status.ready(now).running().successful(now)); + + writeReindexing(reindexing, cluster); + } + catch (ReindexingLockException ignored) { + // Some other node took ownership and is doing this. + } + } } public Reindexing readReindexing(String cluster) { |