summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java13
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java3
-rw-r--r--clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java51
-rw-r--r--config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java1
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainer.java1
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainerCluster.java2
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ReindexingContext.java7
-rw-r--r--configdefinitions/src/vespa/reindexing.def7
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java29
-rw-r--r--flags/src/main/java/com/yahoo/vespa/flags/Flags.java7
10 files changed, 70 insertions, 51 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
index c0a54d55a03..fd887a4196b 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
@@ -49,10 +49,10 @@ public class Reindexer {
private final ReindexingMetrics metrics;
private final Clock clock;
private final Phaser phaser = new Phaser(2); // Reindexer and visitor.
+ private final double windowSizeIncrement;
- @Inject
public Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database,
- DocumentAccess access, Metric metric, Clock clock) {
+ DocumentAccess access, Metric metric, Clock clock, double windowSizeIncrement) {
this(cluster,
ready,
database,
@@ -65,11 +65,13 @@ public class Reindexer {
}
},
metric,
- clock);
+ clock,
+ windowSizeIncrement);
}
Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database,
- Function<VisitorParameters, Runnable> visitorSessions, Metric metric, Clock clock) {
+ Function<VisitorParameters, Runnable> visitorSessions, Metric metric, Clock clock,
+ double windowSizeIncrement) {
for (DocumentType type : ready.keySet())
cluster.bucketSpaceOf(type); // Verifies this is known.
@@ -79,6 +81,7 @@ public class Reindexer {
this.visitorSessions = visitorSessions;
this.metrics = new ReindexingMetrics(metric, cluster.name);
this.clock = clock;
+ this.windowSizeIncrement = windowSizeIncrement;
}
/** Lets the reindexer abort any ongoing visit session, wait for it to complete normally, then exit. */
@@ -194,7 +197,7 @@ public class Reindexer {
VisitorParameters createParameters(DocumentType type, ProgressToken progress) {
VisitorParameters parameters = new VisitorParameters(type.getName());
- parameters.setThrottlePolicy(new DynamicThrottlePolicy().setWindowSizeIncrement(0.2)
+ parameters.setThrottlePolicy(new DynamicThrottlePolicy().setWindowSizeIncrement(windowSizeIncrement)
.setWindowSizeDecrementFactor(5)
.setResizeRate(10)
.setMinWindowSize(1));
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
index 9a114eabbb5..8668ed037ef 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
@@ -72,7 +72,8 @@ public class ReindexingMaintainer extends AbstractComponent {
access.getDocumentTypeManager()),
access,
metric,
- clock))
+ clock,
+ reindexingConfig.windowSizeIncrement()))
.collect(toUnmodifiableList());
this.executor = new ScheduledThreadPoolExecutor(reindexingConfig.clusters().size(), new DaemonThreadFactory("reindexer-"));
if (reindexingConfig.enabled())
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
index 7086c36af3f..9a88d8aad1f 100644
--- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
+++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
@@ -59,17 +59,12 @@ class ReindexerTest {
@Test
void throwsWhenUnknownBuckets() {
assertThrows(NullPointerException.class,
- () -> new Reindexer(new Cluster("cluster", "id", Map.of()),
- Map.of(music, Instant.EPOCH),
- database,
- failIfCalled,
- metric,
- clock));
+ () -> new Reindexer(new Cluster("cluster", "id", Map.of()), Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock, 0.2));
}
@Test
void throwsWhenLockHeldElsewhere() throws InterruptedException, ExecutionException {
- Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock);
+ Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock, 0.2);
Executors.newSingleThreadExecutor().submit(() -> database.lockReindexing("cluster")).get();
assertThrows(ReindexingLockException.class, reindexer::reindex);
}
@@ -77,13 +72,13 @@ class ReindexerTest {
@Test
@Timeout(10)
void nothingToDoWithEmptyConfig() throws ReindexingLockException {
- new Reindexer(cluster, Map.of(), database, failIfCalled, metric, clock).reindex();
+ new Reindexer(cluster, Map.of(), database, failIfCalled, metric, clock, 0.2).reindex();
assertEquals(Map.of(), metric.metrics());
}
@Test
void testParameters() {
- Reindexer reindexer = new Reindexer(cluster, Map.of(), database, failIfCalled, metric, clock);
+ Reindexer reindexer = new Reindexer(cluster, Map.of(), database, failIfCalled, metric, clock, 0.2);
ProgressToken token = new ProgressToken();
VisitorParameters parameters = reindexer.createParameters(music, token);
assertEquals("music:[document]", parameters.getFieldSet());
@@ -100,7 +95,7 @@ class ReindexerTest {
void testReindexing() throws ReindexingLockException {
// Reindexer is told to update "music" documents no earlier than EPOCH, which is just now.
// Since "music" is a new document type, it is stored as just reindexed, and nothing else happens.
- new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock).reindex();
+ new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock, 0.2).reindex();
Reindexing reindexing = Reindexing.empty().with(music, Status.ready(Instant.EPOCH).running().successful(Instant.EPOCH));
assertEquals(reindexing, database.readReindexing("cluster"));
assertEquals(Map.of("reindexing.progress", Map.of(Map.of("documenttype", "music",
@@ -124,7 +119,7 @@ class ReindexerTest {
// New config tells reindexer to reindex "music" documents no earlier than at 10 millis after EPOCH, which isn't yet.
// Nothing happens, since it's not yet time. This isn't supposed to happen unless high clock skew.
clock.advance(Duration.ofMillis(5));
- new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, failIfCalled, metric, clock).reindex();
+ new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, failIfCalled, metric, clock, 0.2).reindex();
assertEquals(reindexing, database.readReindexing("cluster"));
// It's time to reindex the "music" documents — let this complete successfully.
@@ -132,10 +127,10 @@ class ReindexerTest {
AtomicBoolean shutDown = new AtomicBoolean();
Executor executor = Executors.newSingleThreadExecutor();
new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, parameters -> {
- database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer.
- executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK"));
- return () -> shutDown.set(true);
- }, metric, clock).reindex();
+ database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer.
+ executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK"));
+ return () -> shutDown.set(true);
+ }, metric, clock, 0.2).reindex();
reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant()));
assertEquals(reindexing, database.readReindexing("cluster"));
assertTrue(shutDown.get(), "Session was shut down");
@@ -146,14 +141,14 @@ class ReindexerTest {
shutDown.set(false);
AtomicReference<Reindexer> aborted = new AtomicReference<>();
aborted.set(new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, parameters -> {
- database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer.
- parameters.getControlHandler().onProgress(new ProgressToken());
- aborted.get().shutdown();
- return () -> {
- shutDown.set(true);
- parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.ABORTED, "Shut down");
- };
- }, metric, clock));
+ database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer.
+ parameters.getControlHandler().onProgress(new ProgressToken());
+ aborted.get().shutdown();
+ return () -> {
+ shutDown.set(true);
+ parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.ABORTED, "Shut down");
+ };
+ }, metric, clock, 0.2));
aborted.get().reindex();
reindexing = reindexing.with(music, Status.ready(clock.instant()).running().progressed(new ProgressToken()).halted());
assertEquals(reindexing, database.readReindexing("cluster"));
@@ -168,16 +163,16 @@ class ReindexerTest {
clock.advance(Duration.ofMillis(10));
shutDown.set(false);
new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, parameters -> {
- database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer.
- executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "Error"));
- return () -> shutDown.set(true);
- }, metric, clock).reindex();
+ database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer.
+ executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "Error"));
+ return () -> shutDown.set(true);
+ }, metric, clock, 0.2).reindex();
reindexing = reindexing.with(music, Status.ready(clock.instant()).running().failed(clock.instant(), "Error"));
assertEquals(reindexing, database.readReindexing("cluster"));
assertTrue(shutDown.get(), "Session was shut down");
// Document type is ignored in next run, as it has failed fatally.
- new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, failIfCalled, metric, clock).reindex();
+ new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, failIfCalled, metric, clock, 0.2).reindex();
assertEquals(reindexing, database.readReindexing("cluster"));
}
diff --git a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java
index 3e0561a315a..5c04f9d1fd8 100644
--- a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java
+++ b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java
@@ -65,6 +65,7 @@ public interface ModelContext {
*/
interface FeatureFlags {
@ModelFeatureFlag(owners = {"bjorncs", "jonmv"}) default boolean enableAutomaticReindexing() { return false; }
+ @ModelFeatureFlag(owners = {"bjorncs", "jonmv"}) default double reindexerWindowSizeIncrement() { return 0.2; }
@ModelFeatureFlag(owners = {"baldersheim"}, comment = "Revisit in May or June 2020") default double defaultTermwiseLimit() { throw new UnsupportedOperationException("TODO specify default value"); }
@ModelFeatureFlag(owners = {"vekterli"}) default boolean useThreePhaseUpdates() { throw new UnsupportedOperationException("TODO specify default value"); }
@ModelFeatureFlag(owners = {"geirst"}, comment = "Remove on 7.XXX when this is default on") default boolean useDirectStorageApiRpc() { throw new UnsupportedOperationException("TODO specify default value"); }
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainer.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainer.java
index f8932b415b1..14fbeb17aaf 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainer.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainer.java
@@ -155,6 +155,7 @@ public class ClusterControllerContainer extends Container implements
}
builder.enabled(ctx.reindexing().enabled());
+ builder.windowSizeIncrement(ctx.windowSizeIncrement());
for (String clusterId : ctx.clusterIds()) {
ReindexingConfig.Clusters.Builder clusterBuilder = new ReindexingConfig.Clusters.Builder();
for (NewDocumentType type : ctx.documentTypesForCluster(clusterId)) {
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainerCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainerCluster.java
index 63fc0b4515f..3fe6ce3ff27 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainerCluster.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainerCluster.java
@@ -35,7 +35,7 @@ public class ClusterControllerContainerCluster extends ContainerCluster<ClusterC
Reindexing reindexing = deployState.featureFlags().enableAutomaticReindexing()
? deployState.reindexing().orElse(Reindexing.DISABLED_INSTANCE)
: Reindexing.DISABLED_INSTANCE;
- return new ReindexingContext(reindexing);
+ return new ReindexingContext(reindexing, deployState.featureFlags().reindexerWindowSizeIncrement());
}
}
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ReindexingContext.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ReindexingContext.java
index 712498f78cb..7380b950fb2 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ReindexingContext.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ReindexingContext.java
@@ -21,9 +21,11 @@ public class ReindexingContext {
private final Object monitor = new Object();
private final Map<String, Set<NewDocumentType>> documentTypesPerCluster = new HashMap<>();
private final Reindexing reindexing;
+ private final double windowSizeIncrement;
- public ReindexingContext(Reindexing reindexing) {
+ public ReindexingContext(Reindexing reindexing, double windowSizeIncrement) {
this.reindexing = Objects.requireNonNull(reindexing);
+ this.windowSizeIncrement = windowSizeIncrement;
}
public void addDocumentType(String clusterId, NewDocumentType type) {
@@ -46,4 +48,7 @@ public class ReindexingContext {
}
public Reindexing reindexing() { return reindexing; }
+
+ public double windowSizeIncrement() { return windowSizeIncrement; }
+
}
diff --git a/configdefinitions/src/vespa/reindexing.def b/configdefinitions/src/vespa/reindexing.def
index e020aec3f65..93dc767fed0 100644
--- a/configdefinitions/src/vespa/reindexing.def
+++ b/configdefinitions/src/vespa/reindexing.def
@@ -6,13 +6,16 @@ namespace=vespa.config.content.reindexing
# Whether reindexing should run at all
enabled bool default=false
-# TODO jonmv: remove after 7.310 is gone
+# TODO jonmv: remove after 7.324 is gone
# The name of the content cluster to reindex documents from
clusterName string default=""
-# TODO jonmv: remove after 7.310 is gone
+# TODO jonmv: remove after 7.324 is gone
# Epoch millis after which latest reprocessing may begin, per document type
status{}.readyAtMillis long
# Epoch millis after which latest reprocessing may begin, per document type, per cluster
clusters{}.documentTypes{}.readyAtMillis long
+
+# Window size increment used for the dynamic throttling policy of the reindexing visitor session
+windowSizeIncrement double default=0.2
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
index 4a6fb4bd6bf..15fedee3099 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
@@ -149,6 +149,7 @@ public class ModelContextImpl implements ModelContext {
public static class FeatureFlags implements ModelContext.FeatureFlags {
private final boolean enableAutomaticReindexing;
+ private final double reindexerWindowSizeIncrement;
private final double defaultTermwiseLimit;
private final boolean useThreePhaseUpdates;
private final boolean useDirectStorageApiRpc;
@@ -166,23 +167,25 @@ public class ModelContextImpl implements ModelContext {
public FeatureFlags(FlagSource source, ApplicationId appId) {
this.enableAutomaticReindexing = flagValue(source, appId, Flags.ENABLE_AUTOMATIC_REINDEXING);
- defaultTermwiseLimit = flagValue(source, appId, Flags.DEFAULT_TERM_WISE_LIMIT);
- useThreePhaseUpdates = flagValue(source, appId, Flags.USE_THREE_PHASE_UPDATES);
- useDirectStorageApiRpc = flagValue(source, appId, Flags.USE_DIRECT_STORAGE_API_RPC);
- feedSequencer = flagValue(source, appId, Flags.FEED_SEQUENCER_TYPE);
- responseSequencer = flagValue(source, appId, Flags.RESPONSE_SEQUENCER_TYPE);
- numResponseThreads = flagValue(source, appId, Flags.RESPONSE_NUM_THREADS);
- skipCommunicationManagerThread = flagValue(source, appId, Flags.SKIP_COMMUNICATIONMANAGER_THREAD);
- skipMbusRequestThread = flagValue(source, appId, Flags.SKIP_MBUS_REQUEST_THREAD);
- skipMbusReplyThread = flagValue(source, appId, Flags.SKIP_MBUS_REPLY_THREAD);
+ this.reindexerWindowSizeIncrement = flagValue(source, appId, Flags.REINDEXER_WINDOW_SIZE_INCREMENT);
+ this.defaultTermwiseLimit = flagValue(source, appId, Flags.DEFAULT_TERM_WISE_LIMIT);
+ this.useThreePhaseUpdates = flagValue(source, appId, Flags.USE_THREE_PHASE_UPDATES);
+ this.useDirectStorageApiRpc = flagValue(source, appId, Flags.USE_DIRECT_STORAGE_API_RPC);
+ this.feedSequencer = flagValue(source, appId, Flags.FEED_SEQUENCER_TYPE);
+ this.responseSequencer = flagValue(source, appId, Flags.RESPONSE_SEQUENCER_TYPE);
+ this.numResponseThreads = flagValue(source, appId, Flags.RESPONSE_NUM_THREADS);
+ this.skipCommunicationManagerThread = flagValue(source, appId, Flags.SKIP_COMMUNICATIONMANAGER_THREAD);
+ this.skipMbusRequestThread = flagValue(source, appId, Flags.SKIP_MBUS_REQUEST_THREAD);
+ this.skipMbusReplyThread = flagValue(source, appId, Flags.SKIP_MBUS_REPLY_THREAD);
this.useAccessControlTlsHandshakeClientAuth = flagValue(source, appId, Flags.USE_ACCESS_CONTROL_CLIENT_AUTHENTICATION);
- useAsyncMessageHandlingOnSchedule = flagValue(source, appId, Flags.USE_ASYNC_MESSAGE_HANDLING_ON_SCHEDULE);
- contentNodeBucketDBStripeBits = flagValue(source, appId, Flags.CONTENT_NODE_BUCKET_DB_STRIPE_BITS);
- mergeChunkSize = flagValue(source, appId, Flags.MERGE_CHUNK_SIZE);
- feedConcurrency = flagValue(source, appId, Flags.FEED_CONCURRENCY);
+ this.useAsyncMessageHandlingOnSchedule = flagValue(source, appId, Flags.USE_ASYNC_MESSAGE_HANDLING_ON_SCHEDULE);
+ this.contentNodeBucketDBStripeBits = flagValue(source, appId, Flags.CONTENT_NODE_BUCKET_DB_STRIPE_BITS);
+ this.mergeChunkSize = flagValue(source, appId, Flags.MERGE_CHUNK_SIZE);
+ this.feedConcurrency = flagValue(source, appId, Flags.FEED_CONCURRENCY);
}
@Override public boolean enableAutomaticReindexing() { return enableAutomaticReindexing; }
+ @Override public double reindexerWindowSizeIncrement() { return reindexerWindowSizeIncrement; }
@Override public double defaultTermwiseLimit() { return defaultTermwiseLimit; }
@Override public boolean useThreePhaseUpdates() { return useThreePhaseUpdates; }
@Override public boolean useDirectStorageApiRpc() { return useDirectStorageApiRpc; }
diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
index 728029b0ed9..90d2db27b4f 100644
--- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
+++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
@@ -265,6 +265,13 @@ public class Flags {
"Takes effect on next internal redeployment",
APPLICATION_ID);
+ public static final UnboundDoubleFlag REINDEXER_WINDOW_SIZE_INCREMENT = defineDoubleFlag(
+ "reindexer-window-size-increment", 0.2,
+ List.of("jonmv"), "2020-12-09", "2021-02-07",
+ "Window size increment for dynamic throttle policy used by reindexer visitor session — more means more aggressive reindexing",
+ "Takes effect on (re)deployment",
+ APPLICATION_ID);
+
public static final UnboundBooleanFlag USE_POWER_OF_TWO_CHOICES_LOAD_BALANCING = defineFeatureFlag(
"use-power-of-two-choices-load-balancing", false,
List.of("tokle"), "2020-12-02", "2021-02-01",