diff options
10 files changed, 70 insertions, 51 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java index c0a54d55a03..fd887a4196b 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java @@ -49,10 +49,10 @@ public class Reindexer { private final ReindexingMetrics metrics; private final Clock clock; private final Phaser phaser = new Phaser(2); // Reindexer and visitor. + private final double windowSizeIncrement; - @Inject public Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database, - DocumentAccess access, Metric metric, Clock clock) { + DocumentAccess access, Metric metric, Clock clock, double windowSizeIncrement) { this(cluster, ready, database, @@ -65,11 +65,13 @@ public class Reindexer { } }, metric, - clock); + clock, + windowSizeIncrement); } Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database, - Function<VisitorParameters, Runnable> visitorSessions, Metric metric, Clock clock) { + Function<VisitorParameters, Runnable> visitorSessions, Metric metric, Clock clock, + double windowSizeIncrement) { for (DocumentType type : ready.keySet()) cluster.bucketSpaceOf(type); // Verifies this is known. @@ -79,6 +81,7 @@ public class Reindexer { this.visitorSessions = visitorSessions; this.metrics = new ReindexingMetrics(metric, cluster.name); this.clock = clock; + this.windowSizeIncrement = windowSizeIncrement; } /** Lets the reindexer abort any ongoing visit session, wait for it to complete normally, then exit. */ @@ -194,7 +197,7 @@ public class Reindexer { VisitorParameters createParameters(DocumentType type, ProgressToken progress) { VisitorParameters parameters = new VisitorParameters(type.getName()); - parameters.setThrottlePolicy(new DynamicThrottlePolicy().setWindowSizeIncrement(0.2) + parameters.setThrottlePolicy(new DynamicThrottlePolicy().setWindowSizeIncrement(windowSizeIncrement) .setWindowSizeDecrementFactor(5) .setResizeRate(10) .setMinWindowSize(1)); diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java index 9a114eabbb5..8668ed037ef 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java @@ -72,7 +72,8 @@ public class ReindexingMaintainer extends AbstractComponent { access.getDocumentTypeManager()), access, metric, - clock)) + clock, + reindexingConfig.windowSizeIncrement())) .collect(toUnmodifiableList()); this.executor = new ScheduledThreadPoolExecutor(reindexingConfig.clusters().size(), new DaemonThreadFactory("reindexer-")); if (reindexingConfig.enabled()) diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java index 7086c36af3f..9a88d8aad1f 100644 --- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java @@ -59,17 +59,12 @@ class ReindexerTest { @Test void throwsWhenUnknownBuckets() { assertThrows(NullPointerException.class, - () -> new Reindexer(new Cluster("cluster", "id", Map.of()), - Map.of(music, Instant.EPOCH), - database, - failIfCalled, - metric, - clock)); + () -> new Reindexer(new Cluster("cluster", "id", Map.of()), Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock, 0.2)); } @Test void throwsWhenLockHeldElsewhere() throws InterruptedException, ExecutionException { - Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock); + Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock, 0.2); Executors.newSingleThreadExecutor().submit(() -> database.lockReindexing("cluster")).get(); assertThrows(ReindexingLockException.class, reindexer::reindex); } @@ -77,13 +72,13 @@ class ReindexerTest { @Test @Timeout(10) void nothingToDoWithEmptyConfig() throws ReindexingLockException { - new Reindexer(cluster, Map.of(), database, failIfCalled, metric, clock).reindex(); + new Reindexer(cluster, Map.of(), database, failIfCalled, metric, clock, 0.2).reindex(); assertEquals(Map.of(), metric.metrics()); } @Test void testParameters() { - Reindexer reindexer = new Reindexer(cluster, Map.of(), database, failIfCalled, metric, clock); + Reindexer reindexer = new Reindexer(cluster, Map.of(), database, failIfCalled, metric, clock, 0.2); ProgressToken token = new ProgressToken(); VisitorParameters parameters = reindexer.createParameters(music, token); assertEquals("music:[document]", parameters.getFieldSet()); @@ -100,7 +95,7 @@ class ReindexerTest { void testReindexing() throws ReindexingLockException { // Reindexer is told to update "music" documents no earlier than EPOCH, which is just now. // Since "music" is a new document type, it is stored as just reindexed, and nothing else happens. - new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock).reindex(); + new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock, 0.2).reindex(); Reindexing reindexing = Reindexing.empty().with(music, Status.ready(Instant.EPOCH).running().successful(Instant.EPOCH)); assertEquals(reindexing, database.readReindexing("cluster")); assertEquals(Map.of("reindexing.progress", Map.of(Map.of("documenttype", "music", @@ -124,7 +119,7 @@ class ReindexerTest { // New config tells reindexer to reindex "music" documents no earlier than at 10 millis after EPOCH, which isn't yet. // Nothing happens, since it's not yet time. This isn't supposed to happen unless high clock skew. clock.advance(Duration.ofMillis(5)); - new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, failIfCalled, metric, clock).reindex(); + new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, failIfCalled, metric, clock, 0.2).reindex(); assertEquals(reindexing, database.readReindexing("cluster")); // It's time to reindex the "music" documents — let this complete successfully. @@ -132,10 +127,10 @@ class ReindexerTest { AtomicBoolean shutDown = new AtomicBoolean(); Executor executor = Executors.newSingleThreadExecutor(); new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, parameters -> { - database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. - executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK")); - return () -> shutDown.set(true); - }, metric, clock).reindex(); + database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. + executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK")); + return () -> shutDown.set(true); + }, metric, clock, 0.2).reindex(); reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant())); assertEquals(reindexing, database.readReindexing("cluster")); assertTrue(shutDown.get(), "Session was shut down"); @@ -146,14 +141,14 @@ class ReindexerTest { shutDown.set(false); AtomicReference<Reindexer> aborted = new AtomicReference<>(); aborted.set(new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, parameters -> { - database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. - parameters.getControlHandler().onProgress(new ProgressToken()); - aborted.get().shutdown(); - return () -> { - shutDown.set(true); - parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.ABORTED, "Shut down"); - }; - }, metric, clock)); + database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. + parameters.getControlHandler().onProgress(new ProgressToken()); + aborted.get().shutdown(); + return () -> { + shutDown.set(true); + parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.ABORTED, "Shut down"); + }; + }, metric, clock, 0.2)); aborted.get().reindex(); reindexing = reindexing.with(music, Status.ready(clock.instant()).running().progressed(new ProgressToken()).halted()); assertEquals(reindexing, database.readReindexing("cluster")); @@ -168,16 +163,16 @@ class ReindexerTest { clock.advance(Duration.ofMillis(10)); shutDown.set(false); new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, parameters -> { - database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. - executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "Error")); - return () -> shutDown.set(true); - }, metric, clock).reindex(); + database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. + executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "Error")); + return () -> shutDown.set(true); + }, metric, clock, 0.2).reindex(); reindexing = reindexing.with(music, Status.ready(clock.instant()).running().failed(clock.instant(), "Error")); assertEquals(reindexing, database.readReindexing("cluster")); assertTrue(shutDown.get(), "Session was shut down"); // Document type is ignored in next run, as it has failed fatally. - new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, failIfCalled, metric, clock).reindex(); + new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, failIfCalled, metric, clock, 0.2).reindex(); assertEquals(reindexing, database.readReindexing("cluster")); } diff --git a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java index 3e0561a315a..5c04f9d1fd8 100644 --- a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java +++ b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java @@ -65,6 +65,7 @@ public interface ModelContext { */ interface FeatureFlags { @ModelFeatureFlag(owners = {"bjorncs", "jonmv"}) default boolean enableAutomaticReindexing() { return false; } + @ModelFeatureFlag(owners = {"bjorncs", "jonmv"}) default double reindexerWindowSizeIncrement() { return 0.2; } @ModelFeatureFlag(owners = {"baldersheim"}, comment = "Revisit in May or June 2020") default double defaultTermwiseLimit() { throw new UnsupportedOperationException("TODO specify default value"); } @ModelFeatureFlag(owners = {"vekterli"}) default boolean useThreePhaseUpdates() { throw new UnsupportedOperationException("TODO specify default value"); } @ModelFeatureFlag(owners = {"geirst"}, comment = "Remove on 7.XXX when this is default on") default boolean useDirectStorageApiRpc() { throw new UnsupportedOperationException("TODO specify default value"); } diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainer.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainer.java index f8932b415b1..14fbeb17aaf 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainer.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainer.java @@ -155,6 +155,7 @@ public class ClusterControllerContainer extends Container implements } builder.enabled(ctx.reindexing().enabled()); + builder.windowSizeIncrement(ctx.windowSizeIncrement()); for (String clusterId : ctx.clusterIds()) { ReindexingConfig.Clusters.Builder clusterBuilder = new ReindexingConfig.Clusters.Builder(); for (NewDocumentType type : ctx.documentTypesForCluster(clusterId)) { diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainerCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainerCluster.java index 63fc0b4515f..3fe6ce3ff27 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainerCluster.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainerCluster.java @@ -35,7 +35,7 @@ public class ClusterControllerContainerCluster extends ContainerCluster<ClusterC Reindexing reindexing = deployState.featureFlags().enableAutomaticReindexing() ? deployState.reindexing().orElse(Reindexing.DISABLED_INSTANCE) : Reindexing.DISABLED_INSTANCE; - return new ReindexingContext(reindexing); + return new ReindexingContext(reindexing, deployState.featureFlags().reindexerWindowSizeIncrement()); } } diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ReindexingContext.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ReindexingContext.java index 712498f78cb..7380b950fb2 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ReindexingContext.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ReindexingContext.java @@ -21,9 +21,11 @@ public class ReindexingContext { private final Object monitor = new Object(); private final Map<String, Set<NewDocumentType>> documentTypesPerCluster = new HashMap<>(); private final Reindexing reindexing; + private final double windowSizeIncrement; - public ReindexingContext(Reindexing reindexing) { + public ReindexingContext(Reindexing reindexing, double windowSizeIncrement) { this.reindexing = Objects.requireNonNull(reindexing); + this.windowSizeIncrement = windowSizeIncrement; } public void addDocumentType(String clusterId, NewDocumentType type) { @@ -46,4 +48,7 @@ public class ReindexingContext { } public Reindexing reindexing() { return reindexing; } + + public double windowSizeIncrement() { return windowSizeIncrement; } + } diff --git a/configdefinitions/src/vespa/reindexing.def b/configdefinitions/src/vespa/reindexing.def index e020aec3f65..93dc767fed0 100644 --- a/configdefinitions/src/vespa/reindexing.def +++ b/configdefinitions/src/vespa/reindexing.def @@ -6,13 +6,16 @@ namespace=vespa.config.content.reindexing # Whether reindexing should run at all enabled bool default=false -# TODO jonmv: remove after 7.310 is gone +# TODO jonmv: remove after 7.324 is gone # The name of the content cluster to reindex documents from clusterName string default="" -# TODO jonmv: remove after 7.310 is gone +# TODO jonmv: remove after 7.324 is gone # Epoch millis after which latest reprocessing may begin, per document type status{}.readyAtMillis long # Epoch millis after which latest reprocessing may begin, per document type, per cluster clusters{}.documentTypes{}.readyAtMillis long + +# Window size increment used for the dynamic throttling policy of the reindexing visitor session +windowSizeIncrement double default=0.2 diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java index 4a6fb4bd6bf..15fedee3099 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java @@ -149,6 +149,7 @@ public class ModelContextImpl implements ModelContext { public static class FeatureFlags implements ModelContext.FeatureFlags { private final boolean enableAutomaticReindexing; + private final double reindexerWindowSizeIncrement; private final double defaultTermwiseLimit; private final boolean useThreePhaseUpdates; private final boolean useDirectStorageApiRpc; @@ -166,23 +167,25 @@ public class ModelContextImpl implements ModelContext { public FeatureFlags(FlagSource source, ApplicationId appId) { this.enableAutomaticReindexing = flagValue(source, appId, Flags.ENABLE_AUTOMATIC_REINDEXING); - defaultTermwiseLimit = flagValue(source, appId, Flags.DEFAULT_TERM_WISE_LIMIT); - useThreePhaseUpdates = flagValue(source, appId, Flags.USE_THREE_PHASE_UPDATES); - useDirectStorageApiRpc = flagValue(source, appId, Flags.USE_DIRECT_STORAGE_API_RPC); - feedSequencer = flagValue(source, appId, Flags.FEED_SEQUENCER_TYPE); - responseSequencer = flagValue(source, appId, Flags.RESPONSE_SEQUENCER_TYPE); - numResponseThreads = flagValue(source, appId, Flags.RESPONSE_NUM_THREADS); - skipCommunicationManagerThread = flagValue(source, appId, Flags.SKIP_COMMUNICATIONMANAGER_THREAD); - skipMbusRequestThread = flagValue(source, appId, Flags.SKIP_MBUS_REQUEST_THREAD); - skipMbusReplyThread = flagValue(source, appId, Flags.SKIP_MBUS_REPLY_THREAD); + this.reindexerWindowSizeIncrement = flagValue(source, appId, Flags.REINDEXER_WINDOW_SIZE_INCREMENT); + this.defaultTermwiseLimit = flagValue(source, appId, Flags.DEFAULT_TERM_WISE_LIMIT); + this.useThreePhaseUpdates = flagValue(source, appId, Flags.USE_THREE_PHASE_UPDATES); + this.useDirectStorageApiRpc = flagValue(source, appId, Flags.USE_DIRECT_STORAGE_API_RPC); + this.feedSequencer = flagValue(source, appId, Flags.FEED_SEQUENCER_TYPE); + this.responseSequencer = flagValue(source, appId, Flags.RESPONSE_SEQUENCER_TYPE); + this.numResponseThreads = flagValue(source, appId, Flags.RESPONSE_NUM_THREADS); + this.skipCommunicationManagerThread = flagValue(source, appId, Flags.SKIP_COMMUNICATIONMANAGER_THREAD); + this.skipMbusRequestThread = flagValue(source, appId, Flags.SKIP_MBUS_REQUEST_THREAD); + this.skipMbusReplyThread = flagValue(source, appId, Flags.SKIP_MBUS_REPLY_THREAD); this.useAccessControlTlsHandshakeClientAuth = flagValue(source, appId, Flags.USE_ACCESS_CONTROL_CLIENT_AUTHENTICATION); - useAsyncMessageHandlingOnSchedule = flagValue(source, appId, Flags.USE_ASYNC_MESSAGE_HANDLING_ON_SCHEDULE); - contentNodeBucketDBStripeBits = flagValue(source, appId, Flags.CONTENT_NODE_BUCKET_DB_STRIPE_BITS); - mergeChunkSize = flagValue(source, appId, Flags.MERGE_CHUNK_SIZE); - feedConcurrency = flagValue(source, appId, Flags.FEED_CONCURRENCY); + this.useAsyncMessageHandlingOnSchedule = flagValue(source, appId, Flags.USE_ASYNC_MESSAGE_HANDLING_ON_SCHEDULE); + this.contentNodeBucketDBStripeBits = flagValue(source, appId, Flags.CONTENT_NODE_BUCKET_DB_STRIPE_BITS); + this.mergeChunkSize = flagValue(source, appId, Flags.MERGE_CHUNK_SIZE); + this.feedConcurrency = flagValue(source, appId, Flags.FEED_CONCURRENCY); } @Override public boolean enableAutomaticReindexing() { return enableAutomaticReindexing; } + @Override public double reindexerWindowSizeIncrement() { return reindexerWindowSizeIncrement; } @Override public double defaultTermwiseLimit() { return defaultTermwiseLimit; } @Override public boolean useThreePhaseUpdates() { return useThreePhaseUpdates; } @Override public boolean useDirectStorageApiRpc() { return useDirectStorageApiRpc; } diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java index 728029b0ed9..90d2db27b4f 100644 --- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java +++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java @@ -265,6 +265,13 @@ public class Flags { "Takes effect on next internal redeployment", APPLICATION_ID); + public static final UnboundDoubleFlag REINDEXER_WINDOW_SIZE_INCREMENT = defineDoubleFlag( + "reindexer-window-size-increment", 0.2, + List.of("jonmv"), "2020-12-09", "2021-02-07", + "Window size increment for dynamic throttle policy used by reindexer visitor session — more means more aggressive reindexing", + "Takes effect on (re)deployment", + APPLICATION_ID); + public static final UnboundBooleanFlag USE_POWER_OF_TWO_CHOICES_LOAD_BALANCING = defineFeatureFlag( "use-power-of-two-choices-load-balancing", false, List.of("tokle"), "2020-12-02", "2021-02-01", |