summaryrefslogtreecommitdiffstats
path: root/clustercontroller-reindexer
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-11-19 10:59:59 +0100
committerJon Marius Venstad <venstad@gmail.com>2020-11-19 11:04:46 +0100
commite70424ae2180ddd6c04267b009c0fb715a41fa5f (patch)
tree49f66bea14acee0fe0fb4d8e47699bef6dcbbe08 /clustercontroller-reindexer
parentefee08d9b656445ddd13c31c14394e6031654047 (diff)
Status may be modified by different threads — wrap in AtomicRefence
Diffstat (limited to 'clustercontroller-reindexer')
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java51
1 files changed, 24 insertions, 27 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
index 78d6a583c24..e297cfe042c 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
@@ -49,9 +49,6 @@ public class Reindexer {
private final Clock clock;
private final Phaser phaser = new Phaser(2); // Reindexer and visitor.
- private Reindexing reindexing;
- private Status status;
-
@Inject
public Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database,
DocumentAccess access, Metric metric, Clock clock) {
@@ -110,20 +107,20 @@ public class Reindexer {
@SuppressWarnings("fallthrough") // (ノಠ ∩ಠ)ノ彡( \o°o)\
private void progress(DocumentType type) {
// If this is a new document type (or a new cluster), no reindexing is required.
- reindexing = database.readReindexing();
- status = reindexing.status().getOrDefault(type,
- Status.ready(clock.instant())
- .running()
- .successful(clock.instant()));
- if (ready.get(type).isAfter(status.startedAt()))
- status = Status.ready(clock.instant()); // Need to restart, as a newer reindexing is required.
-
- database.writeReindexing(reindexing = reindexing.with(type, status));
- metrics.dump(reindexing);
-
- switch (status.state()) {
+ AtomicReference<Reindexing> reindexing = new AtomicReference<>(database.readReindexing());
+ AtomicReference<Status> status = new AtomicReference<>(reindexing.get().status().getOrDefault(type,
+ Status.ready(clock.instant())
+ .running()
+ .successful(clock.instant())));
+ // Need to restart if a newer indexing is required.
+ status.updateAndGet(value -> ready.get(type).isAfter(value.startedAt()) ? Status.ready(clock.instant()) : value);
+
+ database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get())));
+ metrics.dump(reindexing.get());
+
+ switch (status.get().state()) {
default:
- log.log(WARNING, "Unknown reindexing state '" + status.state() + "'");
+ log.log(WARNING, "Unknown reindexing state '" + status.get().state() + "'");
case FAILED:
log.log(FINE, () -> "Not continuing reindexing of " + type + " due to previous failure");
case SUCCESSFUL: // Intentional fallthrough — all three are done states.
@@ -135,17 +132,17 @@ public class Reindexer {
}
// Visit buckets until they're all done, or until we are interrupted.
- status = status.running();
+ status.updateAndGet(Status::running);
AtomicReference<Instant> progressLastStored = new AtomicReference<>(clock.instant());
VisitorControlHandler control = new VisitorControlHandler() {
@Override
public void onProgress(ProgressToken token) {
super.onProgress(token);
- status = status.progressed(token);
+ status.updateAndGet(value -> value.progressed(token));
if (progressLastStored.get().isBefore(clock.instant().minusSeconds(10))) {
progressLastStored.set(clock.instant());
- database.writeReindexing(reindexing = reindexing.with(type, status));
- metrics.dump(reindexing);
+ database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get())));
+ metrics.dump(reindexing.get());
}
}
@Override
@@ -155,7 +152,7 @@ public class Reindexer {
}
};
- VisitorParameters parameters = createParameters(type, status.progress().orElse(null));
+ VisitorParameters parameters = createParameters(type, status.get().progress().orElse(null));
parameters.setControlHandler(control);
Runnable sessionShutdown = visitorSessions.apply(parameters); // Also starts the visitor session.
@@ -168,18 +165,18 @@ public class Reindexer {
log.log(WARNING, "Unexpected visitor result '" + control.getResult().getCode() + "'");
case FAILURE: // Intentional fallthrough — this is an error.
log.log(WARNING, "Visiting failed: " + control.getResult().getMessage());
- status = status.failed(clock.instant(), control.getResult().getMessage());
+ status.updateAndGet(value -> value.failed(clock.instant(), control.getResult().getMessage()));
break;
case ABORTED:
log.log(FINE, () -> "Halting reindexing of " + type + " due to shutdown — will continue later");
- status = status.halted();
+ status.updateAndGet(Status::halted);
break;
case SUCCESS:
- log.log(INFO, "Completed reindexing of " + type + " after " + Duration.between(status.startedAt(), clock.instant()));
- status = status.successful(clock.instant());
+ log.log(INFO, "Completed reindexing of " + type + " after " + Duration.between(status.get().startedAt(), clock.instant()));
+ status.updateAndGet(value -> value.successful(clock.instant()));
}
- database.writeReindexing(reindexing = reindexing.with(type, status));
- metrics.dump(reindexing);
+ database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get())));
+ metrics.dump(reindexing.get());
}
VisitorParameters createParameters(DocumentType type, ProgressToken progress) {