diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-11-09 20:32:38 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-11-09 20:32:38 +0100 |
commit | 282e3445253d7529d35e9e43fa77ac089bc2c4c5 (patch) | |
tree | a998d9febf06b4e31403ebee2f52c6290d5d6923 /clustercontroller-reindexer/src/test | |
parent | 299dab82f32d443be426eb29ebe3f4b8ed5b0fbe (diff) |
Make Reindexer more easily testable, andd simplify unit tests
Diffstat (limited to 'clustercontroller-reindexer/src/test')
-rw-r--r-- | clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java | 127 |
1 files changed, 51 insertions, 76 deletions
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java index 20393cba958..4e54dea837b 100644 --- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java @@ -5,15 +5,12 @@ import ai.vespa.reindexing.Reindexer.Cluster; import ai.vespa.reindexing.Reindexing.Status; import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException; import com.yahoo.document.Document; -import com.yahoo.document.DocumentPut; import com.yahoo.document.DocumentType; import com.yahoo.document.DocumentTypeManager; import com.yahoo.document.config.DocumentmanagerConfig; -import com.yahoo.documentapi.DocumentAccessParams; import com.yahoo.documentapi.ProgressToken; -import com.yahoo.documentapi.SyncParameters; +import com.yahoo.documentapi.VisitorControlHandler; import com.yahoo.documentapi.VisitorParameters; -import com.yahoo.documentapi.local.LocalDocumentAccess; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import com.yahoo.searchdefinition.derived.Deriver; import com.yahoo.test.ManualClock; @@ -25,16 +22,17 @@ import org.junit.jupiter.api.Timeout; import java.time.Duration; import java.time.Instant; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executor; import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; /** @@ -42,21 +40,20 @@ import static org.junit.jupiter.api.Assertions.fail; */ class ReindexerTest { + static final Function<VisitorParameters, Runnable> failIfCalled = __ -> () -> { fail("Not supposed to run"); }; + final DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build(); final DocumentTypeManager manager = new DocumentTypeManager(musicConfig); final DocumentType music = manager.getDocumentType("music"); final Document document1 = new Document(music, "id:ns:music::one"); - final Document document2 = new Document(music, "id:ns:music::two"); final Cluster cluster = new Cluster("cluster", "id", Map.of(music, "default")); final ManualClock clock = new ManualClock(Instant.EPOCH); ReindexingCurator database; - LocalDocumentAccess access; @BeforeEach void setUp() { database = new ReindexingCurator(new MockCurator(), "cluster", manager, Duration.ofMillis(1)); - access = new LocalDocumentAccess(new DocumentAccessParams().setDocumentmanagerConfig(musicConfig)); } @Test @@ -65,29 +62,26 @@ class ReindexerTest { () -> new Reindexer(new Cluster("cluster", "id", Map.of()), Map.of(music, Instant.EPOCH), database, - access, + failIfCalled, clock)); } @Test void throwsWhenLockHeldElsewhere() throws InterruptedException, ExecutionException { - Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, access, clock); + Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, clock); Executors.newSingleThreadExecutor().submit(database::lockReindexing).get(); assertThrows(ReindexingLockException.class, reindexer::reindex); } @Test @Timeout(10) - void nothingToDo() throws ReindexingLockException { - Reindexer reindexer = new Reindexer(cluster, Map.of(), database, access, clock); - access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document1)); - access.setPhaser(new Phaser(1)); // Would block any visiting until timeout. - reindexer.reindex(); + void nothingToDoWithEmptyConfig() throws ReindexingLockException { + new Reindexer(cluster, Map.of(), database, failIfCalled, clock).reindex(); } @Test void parameters() { - Reindexer reindexer = new Reindexer(cluster, Map.of(), database, access, clock); + Reindexer reindexer = new Reindexer(cluster, Map.of(), database, failIfCalled, clock); ProgressToken token = new ProgressToken(); VisitorParameters parameters = reindexer.createParameters(music, token); assertEquals("music:[document]", parameters.getFieldSet()); @@ -103,80 +97,61 @@ class ReindexerTest { @Timeout(10) void testReindexing() throws ReindexingLockException, ExecutionException, InterruptedException { // Reindexer is told to update "music" documents no earlier than EPOCH, which is just now. - Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, access, clock); - Phaser phaser = new Phaser(1); - access.setPhaser(phaser); // Will block any visiting until timeout. - reindexer.reindex(); - - // Since "music" was a new document type, it is stored as just reindexed, and nothing else happens. + // Since "music" is a new document type, it is stored as just reindexed, and nothing else happens. + new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, clock).reindex(); Reindexing reindexing = Reindexing.empty().with(music, Status.ready(Instant.EPOCH).running().successful(Instant.EPOCH)); assertEquals(reindexing, database.readReindexing()); // New config tells reindexer to reindex "music" documents no earlier than at 10 millis after EPOCH, which isn't yet. + // Nothing happens, since it's not yet time. This isn't supposed to happen unless high clock skew. clock.advance(Duration.ofMillis(5)); - reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, access, clock); - reindexer.reindex(); // Nothing happens because it's not yet time. This isn't supposed to happen unless high clock skew. + new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, failIfCalled, clock).reindex(); assertEquals(reindexing, database.readReindexing()); - // It's time to reindex the "music" documents, but since we sneak a FAILED status in there, nothing is done. + // It's time to reindex the "music" documents — let this complete successfully. clock.advance(Duration.ofMillis(10)); - Reindexing failed = Reindexing.empty().with(music, Status.ready(clock.instant()).running().failed(clock.instant(), "fail")); - database.writeReindexing(failed); - reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, access, clock); - reindexer.reindex(); // Nothing happens because status is already FAILED for the document type. - assertEquals(failed, database.readReindexing()); - - // It's time to reindex the "music" documents — none yet, so this is a no-op, which just updates the timestamp. - database.writeReindexing(reindexing); // Restore state where reindexing was complete at 5 ms after EPOCH. - ExecutorService executor = Executors.newSingleThreadExecutor(); - reindexer.reindex(); + AtomicBoolean shutDown = new AtomicBoolean(); + Executor executor = Executors.newSingleThreadExecutor(); + new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, parameters -> { + database.writeReindexing(Reindexing.empty()); // Wipe database to verify we write data from reindexer. + executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK")); + return () -> shutDown.set(true); + }, clock).reindex(); reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant())); assertEquals(reindexing, database.readReindexing()); + assertTrue(shutDown.get(), "Session was shut down"); - // We add a document and interrupt reindexing before the visit is complete. - access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document1)); + // One more reindexing, this time shut down before visit completes, but after progress is reported. clock.advance(Duration.ofMillis(10)); - reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock); - Future<?> future = executor.submit(uncheckedReindex(reindexer)); - while (phaser.getRegisteredParties() == 1) - Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_< - database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value. - reindexer.shutdown(); - phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document. - phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete. - future.get(); // Write state to database. - reindexing = reindexing.with(music, Status.ready(clock.instant()).running().halted()); - assertEquals(reindexing, database.readReindexing()); - - // Manually put a progress token in there, to verify this is used when resuming, and that we store what we retrieve. + shutDown.set(false); + AtomicReference<Reindexer> aborted = new AtomicReference<>(); + aborted.set(new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, parameters -> { + database.writeReindexing(Reindexing.empty()); // Wipe database to verify we write data from reindexer. + parameters.getControlHandler().onProgress(new ProgressToken()); + aborted.get().shutdown(); + executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.ABORTED, "Shut down")); + return () -> shutDown.set(true); + }, clock)); + aborted.get().reindex(); reindexing = reindexing.with(music, Status.ready(clock.instant()).running().progressed(new ProgressToken()).halted()); - database.writeReindexing(reindexing); - reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock); - future = executor.submit(uncheckedReindex(reindexer)); - while (phaser.getRegisteredParties() == 1) - Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_< - database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value. - reindexer.shutdown(); // Interrupt the visit before it completes. - phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document. - phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete. - future.get(); // Write state to database. assertEquals(reindexing, database.readReindexing()); + assertTrue(shutDown.get(), "Session was shut down"); - // Finally let the visit complete normally. - reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock); - future = executor.submit(uncheckedReindex(reindexer)); - while (phaser.getRegisteredParties() == 1) - Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_< - database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value. - phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document. - phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete. - future.get(); // Write state to database. - reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant())); + // Last reindexing fails. + clock.advance(Duration.ofMillis(10)); + shutDown.set(false); + new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, parameters -> { + database.writeReindexing(Reindexing.empty()); // Wipe database to verify we write data from reindexer. + executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "Error")); + return () -> shutDown.set(true); + }, clock).reindex(); + reindexing = reindexing.with(music, Status.ready(clock.instant()).running().failed(clock.instant(), "Error")); assertEquals(reindexing, database.readReindexing()); - } + assertTrue(shutDown.get(), "Session was shut down"); - Callable<Void> uncheckedReindex(Reindexer reindexer) { - return () -> { reindexer.reindex(); return null; }; + // Document type is ignored in next run, as it has failed fatally. + new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, failIfCalled, clock).reindex(); + assertEquals(reindexing, database.readReindexing()); } } |