diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-12-03 10:04:03 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-12-03 10:04:03 +0100 |
commit | c64e1fc68e13397d1199847448361a0f47ba13fc (patch) | |
tree | 0ab62c71648cc46f46d213ab721b22130d781273 /clustercontroller-reindexer | |
parent | a30f94af8019b0316d893fcc45b7f84df6ba068d (diff) |
Avoid writing stale reindexing status to ZK
Diffstat (limited to 'clustercontroller-reindexer')
-rw-r--r-- | clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java | 17 |
1 files changed, 7 insertions, 10 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java index 91ac7851588..ef5e676e36b 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java @@ -92,16 +92,17 @@ public class Reindexer { throw new IllegalStateException("Already shut down"); try (Lock lock = database.lockReindexing()) { - Reindexing reindexing = updateWithReady(ready, database.readReindexing(), clock.instant()); - database.writeReindexing(reindexing); - metrics.dump(reindexing); + AtomicReference<Reindexing> reindexing = new AtomicReference<>(database.readReindexing()); + reindexing.set(updateWithReady(ready, reindexing.get(), clock.instant())); + database.writeReindexing(reindexing.get()); + metrics.dump(reindexing.get()); for (DocumentType type : ready.keySet()) { // We consider only document types for which we have config. if (ready.get(type).isAfter(clock.instant())) log.log(INFO, "Received config for reindexing which is ready in the future — will process later " + "(" + ready.get(type) + " is after " + clock.instant() + ")"); else - progress(type, new AtomicReference<>(reindexing), new AtomicReference<>(reindexing.status().get(type))); + progress(type, reindexing, new AtomicReference<>(reindexing.get().status().get(type))); if (phaser.isTerminated()) break; @@ -126,10 +127,6 @@ public class Reindexer { @SuppressWarnings("fallthrough") // (ノಠ ∩ಠ)ノ彡( \o°o)\ private void progress(DocumentType type, AtomicReference<Reindexing> reindexing, AtomicReference<Status> status) { - - database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get()))); - metrics.dump(reindexing.get()); - switch (status.get().state()) { default: log.log(WARNING, "Unknown reindexing state '" + status.get().state() + "'"); @@ -140,10 +137,10 @@ public class Reindexer { case RUNNING: log.log(WARNING, "Unexpected state 'RUNNING' of reindexing of " + type); case READY: // Intentional fallthrough — must just assume we failed updating state when exiting previously. - log.log(FINE, () -> "Running reindexing of " + type); + log.log(FINE, () -> "Running reindexing of " + type); } - // Visit buckets until they're all done, or until we are interrupted. + // Visit buckets until they're all done, or until we are shut down. status.updateAndGet(Status::running); AtomicReference<Instant> progressLastStored = new AtomicReference<>(clock.instant()); VisitorControlHandler control = new VisitorControlHandler() { |