summaryrefslogtreecommitdiffstats
path: root/clustercontroller-reindexer
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-11-19 18:09:38 +0100
committerJon Marius Venstad <venstad@gmail.com>2020-11-19 18:09:38 +0100
commitb5af700358a57ffce0bac8dd3ffcac43d6f2cce6 (patch)
tree8e4768fc403a038877d4065044be690a7e14cfe3 /clustercontroller-reindexer
parente70424ae2180ddd6c04267b009c0fb715a41fa5f (diff)
Update with new ready-state immediately when ready
Diffstat (limited to 'clustercontroller-reindexer')
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java31
1 files changed, 21 insertions, 10 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
index e297cfe042c..13ed9800db3 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
@@ -91,12 +91,16 @@ public class Reindexer {
throw new IllegalStateException("Already shut down");
try (Lock lock = database.lockReindexing()) {
+ Reindexing reindexing = updateWithReady(ready, database.readReindexing(), clock.instant());
+ database.writeReindexing(reindexing);
+ metrics.dump(reindexing);
+
for (DocumentType type : ready.keySet()) { // We consider only document types for which we have config.
if (ready.get(type).isAfter(clock.instant()))
log.log(INFO, "Received config for reindexing which is ready in the future — will process later " +
"(" + ready.get(type) + " is after " + clock.instant() + ")");
else
- progress(type);
+ progress(type, new AtomicReference<>(reindexing), new AtomicReference<>(reindexing.status().get(type)));
if (phaser.isTerminated())
break;
@@ -104,16 +108,23 @@ public class Reindexer {
}
}
+ static Reindexing updateWithReady(Map<DocumentType, Instant> ready, Reindexing reindexing, Instant now) {
+ for (DocumentType type : ready.keySet()) { // We consider update for document types for which we have config.
+ if ( ! ready.get(type).isAfter(now)) {
+ Status status = reindexing.status().getOrDefault(type, Status.ready(now)
+ .running()
+ .successful(now));
+ if (status.startedAt().isBefore(ready.get(type)))
+ status = Status.ready(now);
+
+ reindexing = reindexing.with(type, status);
+ }
+ }
+ return reindexing;
+ }
+
@SuppressWarnings("fallthrough") // (ノಠ ∩ಠ)ノ彡( \o°o)\
- private void progress(DocumentType type) {
- // If this is a new document type (or a new cluster), no reindexing is required.
- AtomicReference<Reindexing> reindexing = new AtomicReference<>(database.readReindexing());
- AtomicReference<Status> status = new AtomicReference<>(reindexing.get().status().getOrDefault(type,
- Status.ready(clock.instant())
- .running()
- .successful(clock.instant())));
- // Need to restart if a newer indexing is required.
- status.updateAndGet(value -> ready.get(type).isAfter(value.startedAt()) ? Status.ready(clock.instant()) : value);
+ private void progress(DocumentType type, AtomicReference<Reindexing> reindexing, AtomicReference<Status> status) {
database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get())));
metrics.dump(reindexing.get());