From 0d970eb166d94b44af3c6a3c58ee3c8f704e5fb2 Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Fri, 9 Jul 2021 15:32:23 +0200 Subject: Always hold lock when doing read-write of status --- .../src/main/java/ai/vespa/reindexing/Reindexer.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java index 93b21c8166b..b056ba962fa 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java @@ -97,11 +97,10 @@ public class Reindexer { // Keep metrics in sync across cluster controller containers. AtomicReference reindexing = new AtomicReference<>(database.readReindexing(cluster.name())); - database.writeReindexing(reindexing.get(), cluster.name()); metrics.dump(reindexing.get()); try (Lock lock = database.lockReindexing(cluster.name())) { - reindexing.set(updateWithReady(ready, reindexing.get(), clock.instant())); + reindexing.set(updateWithReady(ready, database.readReindexing(cluster.name()), clock.instant())); database.writeReindexing(reindexing.get(), cluster.name()); metrics.dump(reindexing.get()); @@ -178,8 +177,7 @@ public class Reindexer { sessionShutdown.run(); // Shutdown aborts the session unless already complete, then waits for it to terminate normally. // Only as a last resort will we be interrupted here, and the wait for outstanding replies terminate. - CompletionCode result = control.getResult() != null ? control.getResult().getCode() - : CompletionCode.ABORTED; + CompletionCode result = control.getResult() != null ? control.getResult().getCode() : CompletionCode.ABORTED; switch (result) { default: log.log(WARNING, "Unexpected visitor result '" + control.getResult().getCode() + "'"); -- cgit v1.2.3 From 48d4f8d2efa768fa388af45a5156c0b8aa8ca25e Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Fri, 9 Jul 2021 15:32:54 +0200 Subject: Avoid thread-unsafe use of ProgressToken --- .../src/main/java/ai/vespa/reindexing/Reindexing.java | 10 ++++++---- .../src/main/java/ai/vespa/reindexing/ReindexingCurator.java | 4 +--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java index 1b5a685b69c..896b9dfc26e 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java @@ -72,11 +72,11 @@ public class Reindexing { private final Instant startedAt; private final Instant endedAt; - private final ProgressToken progress; + private final String progress; private final State state; private final String message; - Status(Instant startedAt, Instant endedAt, ProgressToken progress, State state, String message) { + Status(Instant startedAt, Instant endedAt, String progress, State state, String message) { this.startedAt = startedAt; this.endedAt = endedAt; this.progress = progress; @@ -100,7 +100,9 @@ public class Reindexing { public Status progressed(ProgressToken progress) { if (state != State.RUNNING) throw new IllegalStateException("Current state must be RUNNING when updating progress"); - return new Status(startedAt, null, requireNonNull(progress), state, null); + synchronized (progress) { + return new Status(startedAt, null, progress.serializeToString(), state, null); + } } /** Returns a copy of this in state HALTED. */ @@ -133,7 +135,7 @@ public class Reindexing { } public Optional progress() { - return Optional.ofNullable(progress); + return Optional.ofNullable(progress).map(ProgressToken::fromSerializedString); } public State state() { diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java index d46ab812aca..6074ce99101 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java @@ -5,7 +5,6 @@ import ai.vespa.reindexing.Reindexing.Status; import com.google.common.util.concurrent.UncheckedTimeoutException; import com.yahoo.document.DocumentType; import com.yahoo.document.DocumentTypeManager; -import com.yahoo.documentapi.ProgressToken; import com.yahoo.path.Path; import com.yahoo.slime.Cursor; import com.yahoo.slime.Inspector; @@ -17,7 +16,6 @@ import com.yahoo.yolean.Exceptions; import java.time.Duration; import java.time.Instant; -import java.util.Collection; import java.util.Map; import java.util.function.Function; @@ -126,7 +124,7 @@ public class ReindexingCurator { .collect(toUnmodifiableMap(object -> require(TYPE, object, field -> types.getDocumentType(field.asString())), object -> new Status(require(STARTED_MILLIS, object, field -> Instant.ofEpochMilli(field.asLong())), get(ENDED_MILLIS, object, field -> Instant.ofEpochMilli(field.asLong())), - get(PROGRESS, object, field -> ProgressToken.fromSerializedString(field.asString())), + get(PROGRESS, object, field -> field.asString()), require(STATE, object, field -> toState(field.asString())), get(MESSAGE, object, field -> field.asString()))))); } -- cgit v1.2.3