diff options
Diffstat (limited to 'clustercontroller-reindexer/src')
3 files changed, 16 insertions, 21 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java index 306e06b7c7e..af43211011a 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java @@ -49,10 +49,9 @@ public class Reindexer { private final ReindexingMetrics metrics; private final Clock clock; private final Phaser phaser = new Phaser(2); // Reindexer and visitor. - private final double windowSizeIncrement; public Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database, - DocumentAccess access, Metric metric, Clock clock, double windowSizeIncrement) { + DocumentAccess access, Metric metric, Clock clock) { this(cluster, ready, database, @@ -65,13 +64,12 @@ public class Reindexer { } }, metric, - clock, - windowSizeIncrement); + clock + ); } Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database, - Function<VisitorParameters, Runnable> visitorSessions, Metric metric, Clock clock, - double windowSizeIncrement) { + Function<VisitorParameters, Runnable> visitorSessions, Metric metric, Clock clock) { for (DocumentType type : ready.keySet()) cluster.bucketSpaceOf(type); // Verifies this is known. @@ -81,7 +79,6 @@ public class Reindexer { this.visitorSessions = visitorSessions; this.metrics = new ReindexingMetrics(metric, cluster.name); this.clock = clock; - this.windowSizeIncrement = windowSizeIncrement; } /** Lets the reindexer abort any ongoing visit session, wait for it to complete normally, then exit. */ @@ -198,7 +195,7 @@ public class Reindexer { VisitorParameters createParameters(DocumentType type, ProgressToken progress) { VisitorParameters parameters = new VisitorParameters(type.getName()); - parameters.setThrottlePolicy(new DynamicThrottlePolicy().setWindowSizeIncrement(windowSizeIncrement) + parameters.setThrottlePolicy(new DynamicThrottlePolicy().setWindowSizeIncrement(0.2) .setWindowSizeDecrementFactor(5) .setResizeRate(10) .setMinWindowSize(1)); diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java index 9cb1981c230..a37a24d2b01 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java @@ -72,8 +72,7 @@ public class ReindexingMaintainer extends AbstractComponent { access.getDocumentTypeManager()), access, metric, - clock, - reindexingConfig.windowSizeIncrement())) + clock)) .collect(toUnmodifiableList()); this.executor = new ScheduledThreadPoolExecutor(reindexingConfig.clusters().size(), new DaemonThreadFactory("reindexer-")); if (reindexingConfig.enabled()) diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java index 0f290250e2d..8cf96fa8c45 100644 --- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java @@ -32,7 +32,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; /** * @author jonmv @@ -58,12 +57,12 @@ class ReindexerTest { @Test void throwsWhenUnknownBuckets() { assertThrows(NullPointerException.class, - () -> new Reindexer(new Cluster("cluster", Map.of()), Map.of(music, Instant.EPOCH), database, ReindexerTest::failIfCalled, metric, clock, 0.2)); + () -> new Reindexer(new Cluster("cluster", Map.of()), Map.of(music, Instant.EPOCH), database, ReindexerTest::failIfCalled, metric, clock)); } @Test void throwsWhenLockHeldElsewhere() throws InterruptedException, ExecutionException { - Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, ReindexerTest::failIfCalled, metric, clock, 0.2); + Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, ReindexerTest::failIfCalled, metric, clock); Executors.newSingleThreadExecutor().submit(() -> database.lockReindexing("cluster")).get(); assertThrows(ReindexingLockException.class, reindexer::reindex); } @@ -71,13 +70,13 @@ class ReindexerTest { @Test @Timeout(10) void nothingToDoWithEmptyConfig() throws ReindexingLockException { - new Reindexer(cluster, Map.of(), database, ReindexerTest::failIfCalled, metric, clock, 0.2).reindex(); + new Reindexer(cluster, Map.of(), database, ReindexerTest::failIfCalled, metric, clock).reindex(); assertEquals(Map.of(), metric.metrics()); } @Test void testParameters() { - Reindexer reindexer = new Reindexer(cluster, Map.of(), database, ReindexerTest::failIfCalled, metric, clock, 0.2); + Reindexer reindexer = new Reindexer(cluster, Map.of(), database, ReindexerTest::failIfCalled, metric, clock); ProgressToken token = new ProgressToken(); VisitorParameters parameters = reindexer.createParameters(music, token); assertEquals("music:[document]", parameters.getFieldSet()); @@ -93,14 +92,14 @@ class ReindexerTest { @Timeout(10) void testReindexing() throws ReindexingLockException { // Reindexer is created without any ready document types, which means nothing should run. - new Reindexer(cluster, Map.of(), database, ReindexerTest::failIfCalled, metric, clock, 0.2).reindex(); + new Reindexer(cluster, Map.of(), database, ReindexerTest::failIfCalled, metric, clock).reindex(); Reindexing reindexing = Reindexing.empty(); assertEquals(reindexing, database.readReindexing("cluster")); // New config tells reindexer to reindex "music" documents no earlier than at 10 millis after EPOCH, which isn't yet. // Nothing happens, since it's not yet time. This isn't supposed to happen unless high clock skew. clock.advance(Duration.ofMillis(5)); - new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, ReindexerTest::failIfCalled, metric, clock, 0.2).reindex(); + new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, ReindexerTest::failIfCalled, metric, clock).reindex(); assertEquals(reindexing, database.readReindexing("cluster")); // It's time to reindex the "music" documents — let this complete successfully. @@ -111,7 +110,7 @@ class ReindexerTest { database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK")); return () -> shutDown.set(true); - }, metric, clock, 0.2).reindex(); + }, metric, clock).reindex(); reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant())); assertEquals(reindexing, database.readReindexing("cluster")); assertTrue(shutDown.get(), "Session was shut down"); @@ -146,7 +145,7 @@ class ReindexerTest { shutDown.set(true); parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.ABORTED, "Shut down"); }; - }, metric, clock, 0.2)); + }, metric, clock)); aborted.get().reindex(); reindexing = reindexing.with(music, Status.ready(clock.instant()).running().progressed(new ProgressToken()).halted()); assertEquals(reindexing, database.readReindexing("cluster")); @@ -164,13 +163,13 @@ class ReindexerTest { database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "Error")); return () -> shutDown.set(true); - }, metric, clock, 0.2).reindex(); + }, metric, clock).reindex(); reindexing = reindexing.with(music, Status.ready(clock.instant()).running().failed(clock.instant(), "Error")); assertEquals(reindexing, database.readReindexing("cluster")); assertTrue(shutDown.get(), "Session was shut down"); // Document type is ignored in next run, as it has failed fatally. - new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, ReindexerTest::failIfCalled, metric, clock, 0.2).reindex(); + new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, ReindexerTest::failIfCalled, metric, clock).reindex(); assertEquals(reindexing, database.readReindexing("cluster")); } |