summaryrefslogtreecommitdiffstats
path: root/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java')
-rw-r--r--clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java37
1 files changed, 24 insertions, 13 deletions
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
index 70a65b4a072..5737f038a17 100644
--- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
+++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
@@ -3,6 +3,7 @@ package ai.vespa.reindexing;
import ai.vespa.reindexing.Reindexer.Cluster;
import ai.vespa.reindexing.Reindexing.Status;
+import ai.vespa.reindexing.Reindexing.Trigger;
import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException;
import com.yahoo.document.DocumentType;
import com.yahoo.document.DocumentTypeManager;
@@ -12,6 +13,7 @@ import com.yahoo.documentapi.VisitorControlHandler;
import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.jdisc.test.MockMetric;
+import com.yahoo.messagebus.DynamicThrottlePolicy;
import com.yahoo.searchdefinition.derived.Deriver;
import com.yahoo.test.ManualClock;
import com.yahoo.vespa.curator.mock.MockCurator;
@@ -21,13 +23,17 @@ import org.junit.jupiter.api.Timeout;
import java.time.Duration;
import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import static java.util.stream.Collectors.toList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -57,12 +63,12 @@ class ReindexerTest {
@Test
void throwsWhenUnknownBuckets() {
assertThrows(NullPointerException.class,
- () -> new Reindexer(new Cluster("cluster", Map.of()), Map.of(music, Instant.EPOCH), database, ReindexerTest::failIfCalled, metric, clock));
+ () -> new Reindexer(new Cluster("cluster", Map.of()), triggers(0), database, ReindexerTest::failIfCalled, metric, clock));
}
@Test
void throwsWhenLockHeldElsewhere() throws InterruptedException, ExecutionException {
- Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, ReindexerTest::failIfCalled, metric, clock);
+ Reindexer reindexer = new Reindexer(cluster, triggers(0), database, ReindexerTest::failIfCalled, metric, clock);
Executors.newSingleThreadExecutor().submit(() -> database.lockReindexing("cluster")).get();
assertThrows(ReindexingLockException.class, reindexer::reindex);
}
@@ -70,15 +76,15 @@ class ReindexerTest {
@Test
@Timeout(10)
void nothingToDoWithEmptyConfig() throws ReindexingLockException {
- new Reindexer(cluster, Map.of(), database, ReindexerTest::failIfCalled, metric, clock).reindex();
+ new Reindexer(cluster, triggers(), database, ReindexerTest::failIfCalled, metric, clock).reindex();
assertEquals(Map.of(), metric.metrics());
}
@Test
void testParameters() {
- Reindexer reindexer = new Reindexer(cluster, Map.of(), database, ReindexerTest::failIfCalled, metric, clock);
+ Reindexer reindexer = new Reindexer(cluster, triggers(), database, ReindexerTest::failIfCalled, metric, clock);
ProgressToken token = new ProgressToken();
- VisitorParameters parameters = reindexer.createParameters(music, token);
+ VisitorParameters parameters = reindexer.createParameters(music, 0.3, token);
assertEquals("music:[document]", parameters.getFieldSet());
assertSame(token, parameters.getResumeToken());
assertEquals("default", parameters.getBucketSpace());
@@ -86,27 +92,28 @@ class ReindexerTest {
assertEquals("cluster", parameters.getRemoteDataHandler());
assertEquals("music", parameters.getDocumentSelection());
assertEquals(DocumentProtocol.Priority.NORMAL_3, parameters.getPriority());
+ assertEquals(0.3, ((DynamicThrottlePolicy) parameters.getThrottlePolicy()).getWindowSizeIncrement());
}
@Test
@Timeout(10)
void testReindexing() throws ReindexingLockException {
// Reindexer is created against en empty database, so any ready document types are assumed already done.
- new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(-10)), database, ReindexerTest::failIfCalled, metric, clock).reindex();
+ new Reindexer(cluster, triggers(-10), database, ReindexerTest::failIfCalled, metric, clock).reindex();
Reindexing reindexing = Reindexing.empty().with(music, Status.ready(Instant.EPOCH).running().successful(Instant.EPOCH));
assertEquals(reindexing, database.readReindexing("cluster"));
// New config tells reindexer to reindex "music" documents no earlier than at 10 millis after EPOCH, which isn't yet.
// Nothing happens, since it's not yet time. This isn't supposed to happen unless high clock skew.
clock.advance(Duration.ofMillis(5));
- new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, ReindexerTest::failIfCalled, metric, clock).reindex();
+ new Reindexer(cluster, triggers(10), database, ReindexerTest::failIfCalled, metric, clock).reindex();
assertEquals(reindexing, database.readReindexing("cluster"));
// It's time to reindex the "music" documents — let this complete successfully.
clock.advance(Duration.ofMillis(10));
AtomicBoolean shutDown = new AtomicBoolean();
Executor executor = Executors.newSingleThreadExecutor();
- new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, parameters -> {
+ new Reindexer(cluster, triggers(10), database, parameters -> {
database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer.
executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK"));
return () -> shutDown.set(true);
@@ -137,7 +144,7 @@ class ReindexerTest {
metric.metrics().clear();
shutDown.set(false);
AtomicReference<Reindexer> aborted = new AtomicReference<>();
- aborted.set(new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, parameters -> {
+ aborted.set(new Reindexer(cluster, triggers(20), database, parameters -> {
database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer.
parameters.getControlHandler().onProgress(new ProgressToken());
aborted.get().shutdown();
@@ -157,13 +164,13 @@ class ReindexerTest {
"state", "pending")));
// Reindexer is created without any ready document types, which means nothing should run.
- new Reindexer(cluster, Map.of(), database, ReindexerTest::failIfCalled, metric, clock).reindex();
+ new Reindexer(cluster, triggers(), database, ReindexerTest::failIfCalled, metric, clock).reindex();
assertEquals(reindexing, database.readReindexing("cluster"));
// Last reindexing fails.
clock.advance(Duration.ofMillis(10));
shutDown.set(false);
- new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, parameters -> {
+ new Reindexer(cluster, triggers(30), database, parameters -> {
database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer.
executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "Error"));
return () -> shutDown.set(true);
@@ -174,13 +181,13 @@ class ReindexerTest {
// Document type is ignored in next run, as it has failed, and grace period is not yet over.
clock.advance(Reindexer.failureGrace.minusMillis(1));
- new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, ReindexerTest::failIfCalled, metric, clock).reindex();
+ new Reindexer(cluster, triggers(30), database, ReindexerTest::failIfCalled, metric, clock).reindex();
assertEquals(reindexing, database.readReindexing("cluster"));
// When failure grace period is over, reindexing resumes as usual.
clock.advance(Duration.ofMillis(1));
shutDown.set(false);
- new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, parameters -> {
+ new Reindexer(cluster, triggers(30), database, parameters -> {
executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK"));
return () -> shutDown.set(true);
}, metric, clock).reindex();
@@ -189,4 +196,8 @@ class ReindexerTest {
assertTrue(shutDown.get(), "Session was shut down");
}
+ List<Trigger> triggers(long... ready) {
+ return Arrays.stream(ready).mapToObj(at -> new Trigger(music, Instant.ofEpochMilli(at), 0.2)).collect(toList());
+ }
+
}