diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-11-19 10:59:59 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-11-19 11:04:46 +0100 |
commit | e70424ae2180ddd6c04267b009c0fb715a41fa5f (patch) | |
tree | 49f66bea14acee0fe0fb4d8e47699bef6dcbbe08 /clustercontroller-reindexer | |
parent | efee08d9b656445ddd13c31c14394e6031654047 (diff) |
Status may be modified by different threads — wrap in AtomicRefence
Diffstat (limited to 'clustercontroller-reindexer')
-rw-r--r-- | clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java | 51 |
1 files changed, 24 insertions, 27 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java index 78d6a583c24..e297cfe042c 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java @@ -49,9 +49,6 @@ public class Reindexer { private final Clock clock; private final Phaser phaser = new Phaser(2); // Reindexer and visitor. - private Reindexing reindexing; - private Status status; - @Inject public Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database, DocumentAccess access, Metric metric, Clock clock) { @@ -110,20 +107,20 @@ public class Reindexer { @SuppressWarnings("fallthrough") // (ノಠ ∩ಠ)ノ彡( \o°o)\ private void progress(DocumentType type) { // If this is a new document type (or a new cluster), no reindexing is required. - reindexing = database.readReindexing(); - status = reindexing.status().getOrDefault(type, - Status.ready(clock.instant()) - .running() - .successful(clock.instant())); - if (ready.get(type).isAfter(status.startedAt())) - status = Status.ready(clock.instant()); // Need to restart, as a newer reindexing is required. - - database.writeReindexing(reindexing = reindexing.with(type, status)); - metrics.dump(reindexing); - - switch (status.state()) { + AtomicReference<Reindexing> reindexing = new AtomicReference<>(database.readReindexing()); + AtomicReference<Status> status = new AtomicReference<>(reindexing.get().status().getOrDefault(type, + Status.ready(clock.instant()) + .running() + .successful(clock.instant()))); + // Need to restart if a newer indexing is required. + status.updateAndGet(value -> ready.get(type).isAfter(value.startedAt()) ? Status.ready(clock.instant()) : value); + + database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get()))); + metrics.dump(reindexing.get()); + + switch (status.get().state()) { default: - log.log(WARNING, "Unknown reindexing state '" + status.state() + "'"); + log.log(WARNING, "Unknown reindexing state '" + status.get().state() + "'"); case FAILED: log.log(FINE, () -> "Not continuing reindexing of " + type + " due to previous failure"); case SUCCESSFUL: // Intentional fallthrough — all three are done states. @@ -135,17 +132,17 @@ public class Reindexer { } // Visit buckets until they're all done, or until we are interrupted. - status = status.running(); + status.updateAndGet(Status::running); AtomicReference<Instant> progressLastStored = new AtomicReference<>(clock.instant()); VisitorControlHandler control = new VisitorControlHandler() { @Override public void onProgress(ProgressToken token) { super.onProgress(token); - status = status.progressed(token); + status.updateAndGet(value -> value.progressed(token)); if (progressLastStored.get().isBefore(clock.instant().minusSeconds(10))) { progressLastStored.set(clock.instant()); - database.writeReindexing(reindexing = reindexing.with(type, status)); - metrics.dump(reindexing); + database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get()))); + metrics.dump(reindexing.get()); } } @Override @@ -155,7 +152,7 @@ public class Reindexer { } }; - VisitorParameters parameters = createParameters(type, status.progress().orElse(null)); + VisitorParameters parameters = createParameters(type, status.get().progress().orElse(null)); parameters.setControlHandler(control); Runnable sessionShutdown = visitorSessions.apply(parameters); // Also starts the visitor session. @@ -168,18 +165,18 @@ public class Reindexer { log.log(WARNING, "Unexpected visitor result '" + control.getResult().getCode() + "'"); case FAILURE: // Intentional fallthrough — this is an error. log.log(WARNING, "Visiting failed: " + control.getResult().getMessage()); - status = status.failed(clock.instant(), control.getResult().getMessage()); + status.updateAndGet(value -> value.failed(clock.instant(), control.getResult().getMessage())); break; case ABORTED: log.log(FINE, () -> "Halting reindexing of " + type + " due to shutdown — will continue later"); - status = status.halted(); + status.updateAndGet(Status::halted); break; case SUCCESS: - log.log(INFO, "Completed reindexing of " + type + " after " + Duration.between(status.startedAt(), clock.instant())); - status = status.successful(clock.instant()); + log.log(INFO, "Completed reindexing of " + type + " after " + Duration.between(status.get().startedAt(), clock.instant())); + status.updateAndGet(value -> value.successful(clock.instant())); } - database.writeReindexing(reindexing = reindexing.with(type, status)); - metrics.dump(reindexing); + database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get()))); + metrics.dump(reindexing.get()); } VisitorParameters createParameters(DocumentType type, ProgressToken progress) { |