From 51b034bd2ae32161990c88bfac71b057084d818b Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Mon, 18 Jan 2021 09:48:03 +0100 Subject: Improved model for reindexing data Making the users of reindexing data config-aware allows getting rid of the confusing default-for-app-and-cluster status, and the dummy first-time reindexing in the reindexer. Since the upgraded config servers will wipe the "common" status, the non-upgraded config servers will read these as EPOCH, instead of some time last year. This should not be a problem for any users of these data, as far as I can tell. --- .../main/java/ai/vespa/reindexing/Reindexer.java | 6 +-- .../java/ai/vespa/reindexing/ReindexerTest.java | 56 +++++++++++----------- 2 files changed, 29 insertions(+), 33 deletions(-) (limited to 'clustercontroller-reindexer') diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java index 37f9d830da8..306e06b7c7e 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java @@ -117,11 +117,9 @@ public class Reindexer { } static Reindexing updateWithReady(Map ready, Reindexing reindexing, Instant now) { - for (DocumentType type : ready.keySet()) { // We consider update for document types for which we have config. + for (DocumentType type : ready.keySet()) { // We update only for document types for which we have config. if ( ! 
ready.get(type).isAfter(now)) { - Status status = reindexing.status().getOrDefault(type, Status.ready(now) - .running() - .successful(now)); + Status status = reindexing.status().getOrDefault(type, Status.ready(now)); if (status.startedAt().isBefore(ready.get(type))) status = Status.ready(now); diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java index 01586e06015..0f290250e2d 100644 --- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java @@ -27,7 +27,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertSame; @@ -40,7 +39,7 @@ import static org.junit.jupiter.api.Assertions.fail; */ class ReindexerTest { - static final Function failIfCalled = __ -> () -> fail("Not supposed to run"); + static Runnable failIfCalled(VisitorParameters ignored) { throw new AssertionError("Not supposed to run"); } final DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build(); final DocumentTypeManager manager = new DocumentTypeManager(musicConfig); @@ -59,12 +58,12 @@ class ReindexerTest { @Test void throwsWhenUnknownBuckets() { assertThrows(NullPointerException.class, - () -> new Reindexer(new Cluster("cluster", Map.of()), Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock, 0.2)); + () -> new Reindexer(new Cluster("cluster", Map.of()), Map.of(music, Instant.EPOCH), database, ReindexerTest::failIfCalled, metric, clock, 0.2)); } @Test void throwsWhenLockHeldElsewhere() throws InterruptedException, 
ExecutionException { - Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock, 0.2); + Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, ReindexerTest::failIfCalled, metric, clock, 0.2); Executors.newSingleThreadExecutor().submit(() -> database.lockReindexing("cluster")).get(); assertThrows(ReindexingLockException.class, reindexer::reindex); } @@ -72,13 +71,13 @@ class ReindexerTest { @Test @Timeout(10) void nothingToDoWithEmptyConfig() throws ReindexingLockException { - new Reindexer(cluster, Map.of(), database, failIfCalled, metric, clock, 0.2).reindex(); + new Reindexer(cluster, Map.of(), database, ReindexerTest::failIfCalled, metric, clock, 0.2).reindex(); assertEquals(Map.of(), metric.metrics()); } @Test void testParameters() { - Reindexer reindexer = new Reindexer(cluster, Map.of(), database, failIfCalled, metric, clock, 0.2); + Reindexer reindexer = new Reindexer(cluster, Map.of(), database, ReindexerTest::failIfCalled, metric, clock, 0.2); ProgressToken token = new ProgressToken(); VisitorParameters parameters = reindexer.createParameters(music, token); assertEquals("music:[document]", parameters.getFieldSet()); @@ -93,33 +92,15 @@ class ReindexerTest { @Test @Timeout(10) void testReindexing() throws ReindexingLockException { - // Reindexer is told to update "music" documents no earlier than EPOCH, which is just now. - // Since "music" is a new document type, it is stored as just reindexed, and nothing else happens. - new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock, 0.2).reindex(); - Reindexing reindexing = Reindexing.empty().with(music, Status.ready(Instant.EPOCH).running().successful(Instant.EPOCH)); + // Reindexer is created without any ready document types, which means nothing should run. 
+ new Reindexer(cluster, Map.of(), database, ReindexerTest::failIfCalled, metric, clock, 0.2).reindex(); + Reindexing reindexing = Reindexing.empty(); assertEquals(reindexing, database.readReindexing("cluster")); - assertEquals(Map.of("reindexing.progress", Map.of(Map.of("documenttype", "music", - "clusterid", "cluster", - "state", "successful"), - 1.0, - Map.of("documenttype", "music", - "clusterid", "cluster", - "state", "pending"), - -1.0, - Map.of("documenttype", "music", - "clusterid", "cluster", - "state", "failed"), - -1.0, - Map.of("documenttype", "music", - "clusterid", "cluster", - "state", "running"), - -1.0)), - metric.metrics()); // New config tells reindexer to reindex "music" documents no earlier than at 10 millis after EPOCH, which isn't yet. // Nothing happens, since it's not yet time. This isn't supposed to happen unless high clock skew. clock.advance(Duration.ofMillis(5)); - new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, failIfCalled, metric, clock, 0.2).reindex(); + new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, ReindexerTest::failIfCalled, metric, clock, 0.2).reindex(); assertEquals(reindexing, database.readReindexing("cluster")); // It's time to reindex the "music" documents — let this complete successfully. 
@@ -134,6 +115,23 @@ class ReindexerTest { reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant())); assertEquals(reindexing, database.readReindexing("cluster")); assertTrue(shutDown.get(), "Session was shut down"); + assertEquals(Map.of("reindexing.progress", Map.of(Map.of("documenttype", "music", + "clusterid", "cluster", + "state", "successful"), + 1.0, + Map.of("documenttype", "music", + "clusterid", "cluster", + "state", "pending"), + -1.0, + Map.of("documenttype", "music", + "clusterid", "cluster", + "state", "failed"), + -1.0, + Map.of("documenttype", "music", + "clusterid", "cluster", + "state", "running"), + -1.0)), + metric.metrics()); // One more reindexing, this time shut down before visit completes, but after progress is reported. clock.advance(Duration.ofMillis(10)); @@ -172,7 +170,7 @@ class ReindexerTest { assertTrue(shutDown.get(), "Session was shut down"); // Document type is ignored in next run, as it has failed fatally. - new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, failIfCalled, metric, clock, 0.2).reindex(); + new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, ReindexerTest::failIfCalled, metric, clock, 0.2).reindex(); assertEquals(reindexing, database.readReindexing("cluster")); } -- cgit v1.2.3