summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2021-07-09 15:47:02 +0200
committerGitHub <noreply@github.com>2021-07-09 15:47:02 +0200
commitfa59320199ed5b9bf644c53e93bb3656d8fd890c (patch)
tree82afc06b01e0a70bcd1312125828e3de3d728518
parent69d8d5cd460a0df75e4957bfbbeae26f495b5eeb (diff)
parent48d4f8d2efa768fa388af45a5156c0b8aa8ca25e (diff)
Merge pull request #18585 from vespa-engine/jonmv/more-reindexer-synch
Jonmv/more reindexer synch
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java6
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java10
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java4
3 files changed, 9 insertions, 11 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
index 93b21c8166b..b056ba962fa 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
@@ -97,11 +97,10 @@ public class Reindexer {
// Keep metrics in sync across cluster controller containers.
AtomicReference<Reindexing> reindexing = new AtomicReference<>(database.readReindexing(cluster.name()));
- database.writeReindexing(reindexing.get(), cluster.name());
metrics.dump(reindexing.get());
try (Lock lock = database.lockReindexing(cluster.name())) {
- reindexing.set(updateWithReady(ready, reindexing.get(), clock.instant()));
+ reindexing.set(updateWithReady(ready, database.readReindexing(cluster.name()), clock.instant()));
database.writeReindexing(reindexing.get(), cluster.name());
metrics.dump(reindexing.get());
@@ -178,8 +177,7 @@ public class Reindexer {
sessionShutdown.run(); // Shutdown aborts the session unless already complete, then waits for it to terminate normally.
// Only as a last resort will we be interrupted here, and the wait for outstanding replies terminate.
- CompletionCode result = control.getResult() != null ? control.getResult().getCode()
- : CompletionCode.ABORTED;
+ CompletionCode result = control.getResult() != null ? control.getResult().getCode() : CompletionCode.ABORTED;
switch (result) {
default:
log.log(WARNING, "Unexpected visitor result '" + control.getResult().getCode() + "'");
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java
index 1b5a685b69c..896b9dfc26e 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java
@@ -72,11 +72,11 @@ public class Reindexing {
private final Instant startedAt;
private final Instant endedAt;
- private final ProgressToken progress;
+ private final String progress;
private final State state;
private final String message;
- Status(Instant startedAt, Instant endedAt, ProgressToken progress, State state, String message) {
+ Status(Instant startedAt, Instant endedAt, String progress, State state, String message) {
this.startedAt = startedAt;
this.endedAt = endedAt;
this.progress = progress;
@@ -100,7 +100,9 @@ public class Reindexing {
public Status progressed(ProgressToken progress) {
if (state != State.RUNNING)
throw new IllegalStateException("Current state must be RUNNING when updating progress");
- return new Status(startedAt, null, requireNonNull(progress), state, null);
+ synchronized (progress) {
+ return new Status(startedAt, null, progress.serializeToString(), state, null);
+ }
}
/** Returns a copy of this in state HALTED. */
@@ -133,7 +135,7 @@ public class Reindexing {
}
public Optional<ProgressToken> progress() {
- return Optional.ofNullable(progress);
+ return Optional.ofNullable(progress).map(ProgressToken::fromSerializedString);
}
public State state() {
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
index d46ab812aca..6074ce99101 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
@@ -5,7 +5,6 @@ import ai.vespa.reindexing.Reindexing.Status;
import com.google.common.util.concurrent.UncheckedTimeoutException;
import com.yahoo.document.DocumentType;
import com.yahoo.document.DocumentTypeManager;
-import com.yahoo.documentapi.ProgressToken;
import com.yahoo.path.Path;
import com.yahoo.slime.Cursor;
import com.yahoo.slime.Inspector;
@@ -17,7 +16,6 @@ import com.yahoo.yolean.Exceptions;
import java.time.Duration;
import java.time.Instant;
-import java.util.Collection;
import java.util.Map;
import java.util.function.Function;
@@ -126,7 +124,7 @@ public class ReindexingCurator {
.collect(toUnmodifiableMap(object -> require(TYPE, object, field -> types.getDocumentType(field.asString())),
object -> new Status(require(STARTED_MILLIS, object, field -> Instant.ofEpochMilli(field.asLong())),
get(ENDED_MILLIS, object, field -> Instant.ofEpochMilli(field.asLong())),
- get(PROGRESS, object, field -> ProgressToken.fromSerializedString(field.asString())),
+ get(PROGRESS, object, field -> field.asString()),
require(STATE, object, field -> toState(field.asString())),
get(MESSAGE, object, field -> field.asString())))));
}