diff options
author | Jon Marius Venstad <jonmv@users.noreply.github.com> | 2020-12-09 14:58:40 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-09 14:58:40 +0100 |
commit | 738b5dbd6607614232c0bf147ffbcc9d77da7f8f (patch) | |
tree | 270d5b36ef804d11741cc9d65208031f7a64d54c /clustercontroller-reindexer | |
parent | b3a1ba0cd48bdad64d672a03ad69551fad942670 (diff) | |
parent | f83b66ff58d911eb3ced062bc89a65dfee9362ec (diff) |
Merge pull request #15754 from vespa-engine/jonmv/reindexing-aggressiveness-in-feature-flag
Control reindexer resource usage with a feature flag
Diffstat (limited to 'clustercontroller-reindexer')
3 files changed, 33 insertions, 34 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java index c0a54d55a03..fd887a4196b 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java @@ -49,10 +49,10 @@ public class Reindexer { private final ReindexingMetrics metrics; private final Clock clock; private final Phaser phaser = new Phaser(2); // Reindexer and visitor. + private final double windowSizeIncrement; - @Inject public Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database, - DocumentAccess access, Metric metric, Clock clock) { + DocumentAccess access, Metric metric, Clock clock, double windowSizeIncrement) { this(cluster, ready, database, @@ -65,11 +65,13 @@ public class Reindexer { } }, metric, - clock); + clock, + windowSizeIncrement); } Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database, - Function<VisitorParameters, Runnable> visitorSessions, Metric metric, Clock clock) { + Function<VisitorParameters, Runnable> visitorSessions, Metric metric, Clock clock, + double windowSizeIncrement) { for (DocumentType type : ready.keySet()) cluster.bucketSpaceOf(type); // Verifies this is known. @@ -79,6 +81,7 @@ public class Reindexer { this.visitorSessions = visitorSessions; this.metrics = new ReindexingMetrics(metric, cluster.name); this.clock = clock; + this.windowSizeIncrement = windowSizeIncrement; } /** Lets the reindexer abort any ongoing visit session, wait for it to complete normally, then exit. */ @@ -194,7 +197,7 @@ public class Reindexer { VisitorParameters createParameters(DocumentType type, ProgressToken progress) { VisitorParameters parameters = new VisitorParameters(type.getName()); - parameters.setThrottlePolicy(new DynamicThrottlePolicy().setWindowSizeIncrement(0.2) + parameters.setThrottlePolicy(new DynamicThrottlePolicy().setWindowSizeIncrement(windowSizeIncrement) .setWindowSizeDecrementFactor(5) .setResizeRate(10) .setMinWindowSize(1)); diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java index 9a114eabbb5..8668ed037ef 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java @@ -72,7 +72,8 @@ public class ReindexingMaintainer extends AbstractComponent { access.getDocumentTypeManager()), access, metric, - clock)) + clock, + reindexingConfig.windowSizeIncrement())) .collect(toUnmodifiableList()); this.executor = new ScheduledThreadPoolExecutor(reindexingConfig.clusters().size(), new DaemonThreadFactory("reindexer-")); if (reindexingConfig.enabled()) diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java index 7086c36af3f..9a88d8aad1f 100644 --- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java @@ -59,17 +59,12 @@ class ReindexerTest { @Test void throwsWhenUnknownBuckets() { assertThrows(NullPointerException.class, - () -> new Reindexer(new Cluster("cluster", "id", Map.of()), - Map.of(music, Instant.EPOCH), - database, - failIfCalled, - metric, - clock)); + () -> new Reindexer(new Cluster("cluster", "id", Map.of()), Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock, 0.2)); } @Test void throwsWhenLockHeldElsewhere() throws InterruptedException, ExecutionException { - Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock); + Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock, 0.2); Executors.newSingleThreadExecutor().submit(() -> database.lockReindexing("cluster")).get(); assertThrows(ReindexingLockException.class, reindexer::reindex); } @@ -77,13 +72,13 @@ class ReindexerTest { @Test @Timeout(10) void nothingToDoWithEmptyConfig() throws ReindexingLockException { - new Reindexer(cluster, Map.of(), database, failIfCalled, metric, clock).reindex(); + new Reindexer(cluster, Map.of(), database, failIfCalled, metric, clock, 0.2).reindex(); assertEquals(Map.of(), metric.metrics()); } @Test void testParameters() { - Reindexer reindexer = new Reindexer(cluster, Map.of(), database, failIfCalled, metric, clock); + Reindexer reindexer = new Reindexer(cluster, Map.of(), database, failIfCalled, metric, clock, 0.2); ProgressToken token = new ProgressToken(); VisitorParameters parameters = reindexer.createParameters(music, token); assertEquals("music:[document]", parameters.getFieldSet()); @@ -100,7 +95,7 @@ class ReindexerTest { void testReindexing() throws ReindexingLockException { // Reindexer is told to update "music" documents no earlier than EPOCH, which is just now. // Since "music" is a new document type, it is stored as just reindexed, and nothing else happens. - new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock).reindex(); + new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock, 0.2).reindex(); Reindexing reindexing = Reindexing.empty().with(music, Status.ready(Instant.EPOCH).running().successful(Instant.EPOCH)); assertEquals(reindexing, database.readReindexing("cluster")); assertEquals(Map.of("reindexing.progress", Map.of(Map.of("documenttype", "music", @@ -124,7 +119,7 @@ class ReindexerTest { // New config tells reindexer to reindex "music" documents no earlier than at 10 millis after EPOCH, which isn't yet. // Nothing happens, since it's not yet time. This isn't supposed to happen unless high clock skew. clock.advance(Duration.ofMillis(5)); - new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, failIfCalled, metric, clock).reindex(); + new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, failIfCalled, metric, clock, 0.2).reindex(); assertEquals(reindexing, database.readReindexing("cluster")); // It's time to reindex the "music" documents — let this complete successfully. @@ -132,10 +127,10 @@ class ReindexerTest { AtomicBoolean shutDown = new AtomicBoolean(); Executor executor = Executors.newSingleThreadExecutor(); new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, parameters -> { - database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. - executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK")); - return () -> shutDown.set(true); - }, metric, clock).reindex(); + database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. + executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK")); + return () -> shutDown.set(true); + }, metric, clock, 0.2).reindex(); reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant())); assertEquals(reindexing, database.readReindexing("cluster")); assertTrue(shutDown.get(), "Session was shut down"); @@ -146,14 +141,14 @@ class ReindexerTest { shutDown.set(false); AtomicReference<Reindexer> aborted = new AtomicReference<>(); aborted.set(new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, parameters -> { - database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. - parameters.getControlHandler().onProgress(new ProgressToken()); - aborted.get().shutdown(); - return () -> { - shutDown.set(true); - parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.ABORTED, "Shut down"); - }; - }, metric, clock)); + database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. + parameters.getControlHandler().onProgress(new ProgressToken()); + aborted.get().shutdown(); + return () -> { + shutDown.set(true); + parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.ABORTED, "Shut down"); + }; + }, metric, clock, 0.2)); aborted.get().reindex(); reindexing = reindexing.with(music, Status.ready(clock.instant()).running().progressed(new ProgressToken()).halted()); assertEquals(reindexing, database.readReindexing("cluster")); @@ -168,16 +163,16 @@ class ReindexerTest { clock.advance(Duration.ofMillis(10)); shutDown.set(false); new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, parameters -> { - database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. - executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "Error")); - return () -> shutDown.set(true); - }, metric, clock).reindex(); + database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. + executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "Error")); + return () -> shutDown.set(true); + }, metric, clock, 0.2).reindex(); reindexing = reindexing.with(music, Status.ready(clock.instant()).running().failed(clock.instant(), "Error")); assertEquals(reindexing, database.readReindexing("cluster")); assertTrue(shutDown.get(), "Session was shut down"); // Document type is ignored in next run, as it has failed fatally. - new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, failIfCalled, metric, clock).reindex(); + new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, failIfCalled, metric, clock, 0.2).reindex(); assertEquals(reindexing, database.readReindexing("cluster")); } |