diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-12-15 12:19:11 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-12-15 12:19:11 +0100 |
commit | 8cffaebf88b753fb0f59ed13c46ca8b86b694193 (patch) | |
tree | fa862ea078fb7d676bba80190f247d84a070bc99 /clustercontroller-reindexer/src/test | |
parent | 4f4feea7dc41252589f14f88d7d0e4e0b107eee1 (diff) |
Support variable reindexing speed, based on config
Diffstat (limited to 'clustercontroller-reindexer/src/test')
-rw-r--r-- | clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java | 37 | ||||
-rw-r--r-- | clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java | 6 |
2 files changed, 28 insertions, 15 deletions
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java index 70a65b4a072..5737f038a17 100644 --- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java @@ -3,6 +3,7 @@ package ai.vespa.reindexing; import ai.vespa.reindexing.Reindexer.Cluster; import ai.vespa.reindexing.Reindexing.Status; +import ai.vespa.reindexing.Reindexing.Trigger; import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException; import com.yahoo.document.DocumentType; import com.yahoo.document.DocumentTypeManager; @@ -12,6 +13,7 @@ import com.yahoo.documentapi.VisitorControlHandler; import com.yahoo.documentapi.VisitorParameters; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import com.yahoo.jdisc.test.MockMetric; +import com.yahoo.messagebus.DynamicThrottlePolicy; import com.yahoo.searchdefinition.derived.Deriver; import com.yahoo.test.ManualClock; import com.yahoo.vespa.curator.mock.MockCurator; @@ -21,13 +23,17 @@ import org.junit.jupiter.api.Timeout; import java.time.Duration; import java.time.Instant; +import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import static java.util.stream.Collectors.toList; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -57,12 +63,12 @@ class ReindexerTest { @Test void throwsWhenUnknownBuckets() { assertThrows(NullPointerException.class, - () -> new Reindexer(new Cluster("cluster", Map.of()), Map.of(music, Instant.EPOCH), database, ReindexerTest::failIfCalled, metric, clock)); + () -> new Reindexer(new Cluster("cluster", Map.of()), triggers(0), database, ReindexerTest::failIfCalled, metric, clock)); } @Test void throwsWhenLockHeldElsewhere() throws InterruptedException, ExecutionException { - Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, ReindexerTest::failIfCalled, metric, clock); + Reindexer reindexer = new Reindexer(cluster, triggers(0), database, ReindexerTest::failIfCalled, metric, clock); Executors.newSingleThreadExecutor().submit(() -> database.lockReindexing("cluster")).get(); assertThrows(ReindexingLockException.class, reindexer::reindex); } @@ -70,15 +76,15 @@ class ReindexerTest { @Test @Timeout(10) void nothingToDoWithEmptyConfig() throws ReindexingLockException { - new Reindexer(cluster, Map.of(), database, ReindexerTest::failIfCalled, metric, clock).reindex(); + new Reindexer(cluster, triggers(), database, ReindexerTest::failIfCalled, metric, clock).reindex(); assertEquals(Map.of(), metric.metrics()); } @Test void testParameters() { - Reindexer reindexer = new Reindexer(cluster, Map.of(), database, ReindexerTest::failIfCalled, metric, clock); + Reindexer reindexer = new Reindexer(cluster, triggers(), database, ReindexerTest::failIfCalled, metric, clock); ProgressToken token = new ProgressToken(); - VisitorParameters parameters = reindexer.createParameters(music, token); + VisitorParameters parameters = reindexer.createParameters(music, 0.3, token); assertEquals("music:[document]", parameters.getFieldSet()); assertSame(token, parameters.getResumeToken()); assertEquals("default", parameters.getBucketSpace()); @@ -86,27 +92,28 @@ class ReindexerTest { assertEquals("cluster", parameters.getRemoteDataHandler()); assertEquals("music", parameters.getDocumentSelection()); assertEquals(DocumentProtocol.Priority.NORMAL_3, parameters.getPriority()); + assertEquals(0.3, ((DynamicThrottlePolicy) parameters.getThrottlePolicy()).getWindowSizeIncrement()); } @Test @Timeout(10) void testReindexing() throws ReindexingLockException { // Reindexer is created against en empty database, so any ready document types are assumed already done. - new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(-10)), database, ReindexerTest::failIfCalled, metric, clock).reindex(); + new Reindexer(cluster, triggers(-10), database, ReindexerTest::failIfCalled, metric, clock).reindex(); Reindexing reindexing = Reindexing.empty().with(music, Status.ready(Instant.EPOCH).running().successful(Instant.EPOCH)); assertEquals(reindexing, database.readReindexing("cluster")); // New config tells reindexer to reindex "music" documents no earlier than at 10 millis after EPOCH, which isn't yet. // Nothing happens, since it's not yet time. This isn't supposed to happen unless high clock skew. clock.advance(Duration.ofMillis(5)); - new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, ReindexerTest::failIfCalled, metric, clock).reindex(); + new Reindexer(cluster, triggers(10), database, ReindexerTest::failIfCalled, metric, clock).reindex(); assertEquals(reindexing, database.readReindexing("cluster")); // It's time to reindex the "music" documents — let this complete successfully. clock.advance(Duration.ofMillis(10)); AtomicBoolean shutDown = new AtomicBoolean(); Executor executor = Executors.newSingleThreadExecutor(); - new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, parameters -> { + new Reindexer(cluster, triggers(10), database, parameters -> { database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK")); return () -> shutDown.set(true); @@ -137,7 +144,7 @@ class ReindexerTest { metric.metrics().clear(); shutDown.set(false); AtomicReference<Reindexer> aborted = new AtomicReference<>(); - aborted.set(new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, parameters -> { + aborted.set(new Reindexer(cluster, triggers(20), database, parameters -> { database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. parameters.getControlHandler().onProgress(new ProgressToken()); aborted.get().shutdown(); @@ -157,13 +164,13 @@ class ReindexerTest { "state", "pending"))); // Reindexer is created without any ready document types, which means nothing should run. - new Reindexer(cluster, Map.of(), database, ReindexerTest::failIfCalled, metric, clock).reindex(); + new Reindexer(cluster, triggers(), database, ReindexerTest::failIfCalled, metric, clock).reindex(); assertEquals(reindexing, database.readReindexing("cluster")); // Last reindexing fails. clock.advance(Duration.ofMillis(10)); shutDown.set(false); - new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, parameters -> { + new Reindexer(cluster, triggers(30), database, parameters -> { database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "Error")); return () -> shutDown.set(true); @@ -174,13 +181,13 @@ class ReindexerTest { // Document type is ignored in next run, as it has failed, and grace period is not yet over. clock.advance(Reindexer.failureGrace.minusMillis(1)); - new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, ReindexerTest::failIfCalled, metric, clock).reindex(); + new Reindexer(cluster, triggers(30), database, ReindexerTest::failIfCalled, metric, clock).reindex(); assertEquals(reindexing, database.readReindexing("cluster")); // When failure grace period is over, reindexing resumes as usual. clock.advance(Duration.ofMillis(1)); shutDown.set(false); - new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, parameters -> { + new Reindexer(cluster, triggers(30), database, parameters -> { executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK")); return () -> shutDown.set(true); }, metric, clock).reindex(); @@ -189,4 +196,8 @@ class ReindexerTest { assertTrue(shutDown.get(), "Session was shut down"); } + List<Trigger> triggers(long... ready) { + return Arrays.stream(ready).mapToObj(at -> new Trigger(music, Instant.ofEpochMilli(at), 0.2)).collect(toList()); + } + } diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java index 2d2119a4b97..c5043f0fac3 100644 --- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java @@ -2,6 +2,7 @@ package ai.vespa.reindexing; import ai.vespa.reindexing.Reindexer.Cluster; +import ai.vespa.reindexing.Reindexing.Trigger; import com.yahoo.cloud.config.ClusterListConfig; import com.yahoo.document.DocumentTypeManager; import com.yahoo.document.config.DocumentmanagerConfig; @@ -12,6 +13,7 @@ import org.junit.jupiter.api.Test; import java.time.Duration; import java.time.Instant; +import java.util.List; import java.util.Map; import static ai.vespa.reindexing.ReindexingMaintainer.parseCluster; @@ -30,9 +32,9 @@ class ReindexingMaintainerTest { DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build(); DocumentTypeManager manager = new DocumentTypeManager(musicConfig); - assertEquals(Map.of(manager.getDocumentType("music"), Instant.ofEpochMilli(123)), + assertEquals(List.of(new Trigger(manager.getDocumentType("music"), Instant.ofEpochMilli(123), 0.5)), parseReady(new ReindexingConfig.Clusters.Builder() - .documentTypes("music", new ReindexingConfig.Clusters.DocumentTypes.Builder().readyAtMillis(123)) + .documentTypes("music", new ReindexingConfig.Clusters.DocumentTypes.Builder().readyAtMillis(123).speed(0.5)) .build(), manager)); |