aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-reindexer
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-02-19 17:42:49 +0100
committerJon Marius Venstad <venstad@gmail.com>2021-02-19 17:42:49 +0100
commiteaa8348f763045387ae231be51da1782ffe434ec (patch)
treefec0c5f00c0712a32732460bef8079aabe3734c3 /clustercontroller-reindexer
parenta219da1a7379606860c874a64f94aafb8973d205 (diff)
Assume reindexing is done for ready types when no data/data lost
Diffstat (limited to 'clustercontroller-reindexer')
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java5
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java16
-rw-r--r--clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java10
3 files changed, 26 insertions, 5 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
index af43211011a..c5d484c48b0 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
@@ -92,10 +92,11 @@ public class Reindexer {
throw new IllegalStateException("Already shut down");
// Keep metrics in sync across cluster controller containers.
- metrics.dump(database.readReindexing(cluster.name));
+ AtomicReference<Reindexing> reindexing = new AtomicReference<>(database.readReindexingOrDefault(cluster.name, ready, clock.instant()));
+ database.writeReindexing(reindexing.get(), cluster.name());
+ metrics.dump(reindexing.get());
try (Lock lock = database.lockReindexing(cluster.name())) {
- AtomicReference<Reindexing> reindexing = new AtomicReference<>(database.readReindexing(cluster.name()));
reindexing.set(updateWithReady(ready, reindexing.get(), clock.instant()));
database.writeReindexing(reindexing.get(), cluster.name());
metrics.dump(reindexing.get());
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
index 22ae54fcc6b..c735a27387a 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
@@ -3,6 +3,7 @@ package ai.vespa.reindexing;
import ai.vespa.reindexing.Reindexing.Status;
import com.google.common.util.concurrent.UncheckedTimeoutException;
+import com.yahoo.document.DocumentType;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.documentapi.ProgressToken;
import com.yahoo.path.Path;
@@ -16,6 +17,8 @@ import com.yahoo.yolean.Exceptions;
import java.time.Duration;
import java.time.Instant;
+import java.util.Collection;
+import java.util.Map;
import java.util.function.Function;
import static java.util.Objects.requireNonNull;
@@ -42,6 +45,19 @@ public class ReindexingCurator {
this.lockTimeout = lockTimeout;
}
+ /** If no reindexing data exists (has been wiped), assume current ready documents are already done. */
+ public Reindexing readReindexingOrDefault(String cluster, Map<DocumentType, Instant> ready, Instant now) {
+ return curator.getData(statusPath(cluster)).map(serializer::deserialize)
+ .orElseGet(() -> {
+ Reindexing reindexing = Reindexing.empty();
+ for (DocumentType type : ready.keySet())
+ if (ready.get(type).isBefore(now))
+ reindexing = reindexing.with(type, Status.ready(now).running().successful(now));
+
+ return reindexing;
+ });
+ }
+
public Reindexing readReindexing(String cluster) {
return curator.getData(statusPath(cluster)).map(serializer::deserialize)
.orElse(Reindexing.empty());
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
index 8cf96fa8c45..cd9cf9f845d 100644
--- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
+++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
@@ -91,9 +91,9 @@ class ReindexerTest {
@Test
@Timeout(10)
void testReindexing() throws ReindexingLockException {
- // Reindexer is created without any ready document types, which means nothing should run.
- new Reindexer(cluster, Map.of(), database, ReindexerTest::failIfCalled, metric, clock).reindex();
- Reindexing reindexing = Reindexing.empty();
+ // Reindexer is created against en empty database, so any ready document types are assumed already done.
+ new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(-10)), database, ReindexerTest::failIfCalled, metric, clock).reindex();
+ Reindexing reindexing = Reindexing.empty().with(music, Status.ready(Instant.EPOCH).running().successful(Instant.EPOCH));
assertEquals(reindexing, database.readReindexing("cluster"));
// New config tells reindexer to reindex "music" documents no earlier than at 10 millis after EPOCH, which isn't yet.
@@ -156,6 +156,10 @@ class ReindexerTest {
"clusterid", "cluster",
"state", "pending")));
+ // Reindexer is created without any ready document types, which means nothing should run.
+ new Reindexer(cluster, Map.of(), database, ReindexerTest::failIfCalled, metric, clock).reindex();
+ assertEquals(reindexing, database.readReindexing("cluster"));
+
// Last reindexing fails.
clock.advance(Duration.ofMillis(10));
shutDown.set(false);