diff options
Diffstat (limited to 'clustercontroller-reindexer')
-rw-r--r-- | clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java | 31 |
1 files changed, 21 insertions, 10 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java index e297cfe042c..13ed9800db3 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java @@ -91,12 +91,16 @@ public class Reindexer { throw new IllegalStateException("Already shut down"); try (Lock lock = database.lockReindexing()) { + Reindexing reindexing = updateWithReady(ready, database.readReindexing(), clock.instant()); + database.writeReindexing(reindexing); + metrics.dump(reindexing); + for (DocumentType type : ready.keySet()) { // We consider only document types for which we have config. if (ready.get(type).isAfter(clock.instant())) log.log(INFO, "Received config for reindexing which is ready in the future — will process later " + "(" + ready.get(type) + " is after " + clock.instant() + ")"); else - progress(type); + progress(type, new AtomicReference<>(reindexing), new AtomicReference<>(reindexing.status().get(type))); if (phaser.isTerminated()) break; @@ -104,16 +108,23 @@ public class Reindexer { } } + static Reindexing updateWithReady(Map<DocumentType, Instant> ready, Reindexing reindexing, Instant now) { + for (DocumentType type : ready.keySet()) { // We consider update for document types for which we have config. + if ( ! ready.get(type).isAfter(now)) { + Status status = reindexing.status().getOrDefault(type, Status.ready(now) + .running() + .successful(now)); + if (status.startedAt().isBefore(ready.get(type))) + status = Status.ready(now); + + reindexing = reindexing.with(type, status); + } + } + return reindexing; + } + @SuppressWarnings("fallthrough") // (ノಠ ∩ಠ)ノ彡( \o°o)\ - private void progress(DocumentType type) { - // If this is a new document type (or a new cluster), no reindexing is required. - AtomicReference<Reindexing> reindexing = new AtomicReference<>(database.readReindexing()); - AtomicReference<Status> status = new AtomicReference<>(reindexing.get().status().getOrDefault(type, - Status.ready(clock.instant()) - .running() - .successful(clock.instant()))); - // Need to restart if a newer indexing is required. - status.updateAndGet(value -> ready.get(type).isAfter(value.startedAt()) ? Status.ready(clock.instant()) : value); + private void progress(DocumentType type, AtomicReference<Reindexing> reindexing, AtomicReference<Status> status) { database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get()))); metrics.dump(reindexing.get()); |