summaryrefslogtreecommitdiffstats
path: root/clustercontroller-reindexer/src/test
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-11-09 20:32:38 +0100
committerJon Marius Venstad <venstad@gmail.com>2020-11-09 20:32:38 +0100
commit282e3445253d7529d35e9e43fa77ac089bc2c4c5 (patch)
treea998d9febf06b4e31403ebee2f52c6290d5d6923 /clustercontroller-reindexer/src/test
parent299dab82f32d443be426eb29ebe3f4b8ed5b0fbe (diff)
Make Reindexer more easily testable, andd simplify unit tests
Diffstat (limited to 'clustercontroller-reindexer/src/test')
-rw-r--r--clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java127
1 files changed, 51 insertions, 76 deletions
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
index 20393cba958..4e54dea837b 100644
--- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
+++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
@@ -5,15 +5,12 @@ import ai.vespa.reindexing.Reindexer.Cluster;
import ai.vespa.reindexing.Reindexing.Status;
import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException;
import com.yahoo.document.Document;
-import com.yahoo.document.DocumentPut;
import com.yahoo.document.DocumentType;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.document.config.DocumentmanagerConfig;
-import com.yahoo.documentapi.DocumentAccessParams;
import com.yahoo.documentapi.ProgressToken;
-import com.yahoo.documentapi.SyncParameters;
+import com.yahoo.documentapi.VisitorControlHandler;
import com.yahoo.documentapi.VisitorParameters;
-import com.yahoo.documentapi.local.LocalDocumentAccess;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.searchdefinition.derived.Deriver;
import com.yahoo.test.ManualClock;
@@ -25,16 +22,17 @@ import org.junit.jupiter.api.Timeout;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
@@ -42,21 +40,20 @@ import static org.junit.jupiter.api.Assertions.fail;
*/
class ReindexerTest {
+ static final Function<VisitorParameters, Runnable> failIfCalled = __ -> () -> { fail("Not supposed to run"); };
+
final DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build();
final DocumentTypeManager manager = new DocumentTypeManager(musicConfig);
final DocumentType music = manager.getDocumentType("music");
final Document document1 = new Document(music, "id:ns:music::one");
- final Document document2 = new Document(music, "id:ns:music::two");
final Cluster cluster = new Cluster("cluster", "id", Map.of(music, "default"));
final ManualClock clock = new ManualClock(Instant.EPOCH);
ReindexingCurator database;
- LocalDocumentAccess access;
@BeforeEach
void setUp() {
database = new ReindexingCurator(new MockCurator(), "cluster", manager, Duration.ofMillis(1));
- access = new LocalDocumentAccess(new DocumentAccessParams().setDocumentmanagerConfig(musicConfig));
}
@Test
@@ -65,29 +62,26 @@ class ReindexerTest {
() -> new Reindexer(new Cluster("cluster", "id", Map.of()),
Map.of(music, Instant.EPOCH),
database,
- access,
+ failIfCalled,
clock));
}
@Test
void throwsWhenLockHeldElsewhere() throws InterruptedException, ExecutionException {
- Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, access, clock);
+ Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, clock);
Executors.newSingleThreadExecutor().submit(database::lockReindexing).get();
assertThrows(ReindexingLockException.class, reindexer::reindex);
}
@Test
@Timeout(10)
- void nothingToDo() throws ReindexingLockException {
- Reindexer reindexer = new Reindexer(cluster, Map.of(), database, access, clock);
- access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document1));
- access.setPhaser(new Phaser(1)); // Would block any visiting until timeout.
- reindexer.reindex();
+ void nothingToDoWithEmptyConfig() throws ReindexingLockException {
+ new Reindexer(cluster, Map.of(), database, failIfCalled, clock).reindex();
}
@Test
void parameters() {
- Reindexer reindexer = new Reindexer(cluster, Map.of(), database, access, clock);
+ Reindexer reindexer = new Reindexer(cluster, Map.of(), database, failIfCalled, clock);
ProgressToken token = new ProgressToken();
VisitorParameters parameters = reindexer.createParameters(music, token);
assertEquals("music:[document]", parameters.getFieldSet());
@@ -103,80 +97,61 @@ class ReindexerTest {
@Timeout(10)
void testReindexing() throws ReindexingLockException, ExecutionException, InterruptedException {
// Reindexer is told to update "music" documents no earlier than EPOCH, which is just now.
- Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, access, clock);
- Phaser phaser = new Phaser(1);
- access.setPhaser(phaser); // Will block any visiting until timeout.
- reindexer.reindex();
-
- // Since "music" was a new document type, it is stored as just reindexed, and nothing else happens.
+ // Since "music" is a new document type, it is stored as just reindexed, and nothing else happens.
+ new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, clock).reindex();
Reindexing reindexing = Reindexing.empty().with(music, Status.ready(Instant.EPOCH).running().successful(Instant.EPOCH));
assertEquals(reindexing, database.readReindexing());
// New config tells reindexer to reindex "music" documents no earlier than at 10 millis after EPOCH, which isn't yet.
+ // Nothing happens, since it's not yet time. This isn't supposed to happen unless high clock skew.
clock.advance(Duration.ofMillis(5));
- reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, access, clock);
- reindexer.reindex(); // Nothing happens because it's not yet time. This isn't supposed to happen unless high clock skew.
+ new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, failIfCalled, clock).reindex();
assertEquals(reindexing, database.readReindexing());
- // It's time to reindex the "music" documents, but since we sneak a FAILED status in there, nothing is done.
+ // It's time to reindex the "music" documents — let this complete successfully.
clock.advance(Duration.ofMillis(10));
- Reindexing failed = Reindexing.empty().with(music, Status.ready(clock.instant()).running().failed(clock.instant(), "fail"));
- database.writeReindexing(failed);
- reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, access, clock);
- reindexer.reindex(); // Nothing happens because status is already FAILED for the document type.
- assertEquals(failed, database.readReindexing());
-
- // It's time to reindex the "music" documents — none yet, so this is a no-op, which just updates the timestamp.
- database.writeReindexing(reindexing); // Restore state where reindexing was complete at 5 ms after EPOCH.
- ExecutorService executor = Executors.newSingleThreadExecutor();
- reindexer.reindex();
+ AtomicBoolean shutDown = new AtomicBoolean();
+ Executor executor = Executors.newSingleThreadExecutor();
+ new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, parameters -> {
+ database.writeReindexing(Reindexing.empty()); // Wipe database to verify we write data from reindexer.
+ executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK"));
+ return () -> shutDown.set(true);
+ }, clock).reindex();
reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant()));
assertEquals(reindexing, database.readReindexing());
+ assertTrue(shutDown.get(), "Session was shut down");
- // We add a document and interrupt reindexing before the visit is complete.
- access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document1));
+ // One more reindexing, this time shut down before visit completes, but after progress is reported.
clock.advance(Duration.ofMillis(10));
- reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock);
- Future<?> future = executor.submit(uncheckedReindex(reindexer));
- while (phaser.getRegisteredParties() == 1)
- Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_<
- database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value.
- reindexer.shutdown();
- phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document.
- phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete.
- future.get(); // Write state to database.
- reindexing = reindexing.with(music, Status.ready(clock.instant()).running().halted());
- assertEquals(reindexing, database.readReindexing());
-
- // Manually put a progress token in there, to verify this is used when resuming, and that we store what we retrieve.
+ shutDown.set(false);
+ AtomicReference<Reindexer> aborted = new AtomicReference<>();
+ aborted.set(new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, parameters -> {
+ database.writeReindexing(Reindexing.empty()); // Wipe database to verify we write data from reindexer.
+ parameters.getControlHandler().onProgress(new ProgressToken());
+ aborted.get().shutdown();
+ executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.ABORTED, "Shut down"));
+ return () -> shutDown.set(true);
+ }, clock));
+ aborted.get().reindex();
reindexing = reindexing.with(music, Status.ready(clock.instant()).running().progressed(new ProgressToken()).halted());
- database.writeReindexing(reindexing);
- reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock);
- future = executor.submit(uncheckedReindex(reindexer));
- while (phaser.getRegisteredParties() == 1)
- Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_<
- database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value.
- reindexer.shutdown(); // Interrupt the visit before it completes.
- phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document.
- phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete.
- future.get(); // Write state to database.
assertEquals(reindexing, database.readReindexing());
+ assertTrue(shutDown.get(), "Session was shut down");
- // Finally let the visit complete normally.
- reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock);
- future = executor.submit(uncheckedReindex(reindexer));
- while (phaser.getRegisteredParties() == 1)
- Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_<
- database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value.
- phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document.
- phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete.
- future.get(); // Write state to database.
- reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant()));
+ // Last reindexing fails.
+ clock.advance(Duration.ofMillis(10));
+ shutDown.set(false);
+ new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, parameters -> {
+ database.writeReindexing(Reindexing.empty()); // Wipe database to verify we write data from reindexer.
+ executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "Error"));
+ return () -> shutDown.set(true);
+ }, clock).reindex();
+ reindexing = reindexing.with(music, Status.ready(clock.instant()).running().failed(clock.instant(), "Error"));
assertEquals(reindexing, database.readReindexing());
- }
+ assertTrue(shutDown.get(), "Session was shut down");
- Callable<Void> uncheckedReindex(Reindexer reindexer) {
- return () -> { reindexer.reindex(); return null; };
+ // Document type is ignored in next run, as it has failed fatally.
+ new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, failIfCalled, clock).reindex();
+ assertEquals(reindexing, database.readReindexing());
}
}