aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-reindexer
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-12-03 10:04:03 +0100
committerJon Marius Venstad <venstad@gmail.com>2020-12-03 10:04:03 +0100
commitc64e1fc68e13397d1199847448361a0f47ba13fc (patch)
tree0ab62c71648cc46f46d213ab721b22130d781273 /clustercontroller-reindexer
parenta30f94af8019b0316d893fcc45b7f84df6ba068d (diff)
Avoid writing stale reindexing status to ZK
Diffstat (limited to 'clustercontroller-reindexer')
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java17
1 files changed, 7 insertions, 10 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
index 91ac7851588..ef5e676e36b 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
@@ -92,16 +92,17 @@ public class Reindexer {
throw new IllegalStateException("Already shut down");
try (Lock lock = database.lockReindexing()) {
- Reindexing reindexing = updateWithReady(ready, database.readReindexing(), clock.instant());
- database.writeReindexing(reindexing);
- metrics.dump(reindexing);
+ AtomicReference<Reindexing> reindexing = new AtomicReference<>(database.readReindexing());
+ reindexing.set(updateWithReady(ready, reindexing.get(), clock.instant()));
+ database.writeReindexing(reindexing.get());
+ metrics.dump(reindexing.get());
for (DocumentType type : ready.keySet()) { // We consider only document types for which we have config.
if (ready.get(type).isAfter(clock.instant()))
log.log(INFO, "Received config for reindexing which is ready in the future — will process later " +
"(" + ready.get(type) + " is after " + clock.instant() + ")");
else
- progress(type, new AtomicReference<>(reindexing), new AtomicReference<>(reindexing.status().get(type)));
+ progress(type, reindexing, new AtomicReference<>(reindexing.get().status().get(type)));
if (phaser.isTerminated())
break;
@@ -126,10 +127,6 @@ public class Reindexer {
@SuppressWarnings("fallthrough") // (ノಠ ∩ಠ)ノ彡( \o°o)\
private void progress(DocumentType type, AtomicReference<Reindexing> reindexing, AtomicReference<Status> status) {
-
- database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get())));
- metrics.dump(reindexing.get());
-
switch (status.get().state()) {
default:
log.log(WARNING, "Unknown reindexing state '" + status.get().state() + "'");
@@ -140,10 +137,10 @@ public class Reindexer {
case RUNNING:
log.log(WARNING, "Unexpected state 'RUNNING' of reindexing of " + type);
case READY: // Intentional fallthrough — must just assume we failed updating state when exiting previously.
- log.log(FINE, () -> "Running reindexing of " + type);
+ log.log(FINE, () -> "Running reindexing of " + type);
}
- // Visit buckets until they're all done, or until we are interrupted.
+ // Visit buckets until they're all done, or until we are shut down.
status.updateAndGet(Status::running);
AtomicReference<Instant> progressLastStored = new AtomicReference<>(clock.instant());
VisitorControlHandler control = new VisitorControlHandler() {