diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-11-06 16:55:46 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-11-06 16:55:46 +0100 |
commit | 604ddaefbb59f1353a16e25e45ad0c241cc79793 (patch) | |
tree | 97ac3f57e69447872436944a78911b7b49b6a3dd /clustercontroller-reindexer/src/test | |
parent | cab5c5ae03d0b6bb76f0fbda57e99f6302349bab (diff) |
Avoid interrupts for control flow
Diffstat (limited to 'clustercontroller-reindexer/src/test')
-rw-r--r-- | clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java | 30 |
1 files changed, 13 insertions, 17 deletions
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java index 263c32739a7..418b868b9c0 100644 --- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java @@ -44,7 +44,8 @@ class ReindexerTest { final DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build(); final DocumentTypeManager manager = new DocumentTypeManager(musicConfig); final DocumentType music = manager.getDocumentType("music"); - final Document document = new Document(music, "id:ns:music::one"); + final Document document1 = new Document(music, "id:ns:music::one"); + final Document document2 = new Document(music, "id:ns:music::two"); final Cluster cluster = new Cluster("cluster", "id", Map.of(music, "default")); final ManualClock clock = new ManualClock(Instant.EPOCH); @@ -78,7 +79,7 @@ class ReindexerTest { @Timeout(10) void nothingToDo() throws ReindexingLockException { Reindexer reindexer = new Reindexer(cluster, Map.of(), database, access, clock); - access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document)); + access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document1)); access.setPhaser(new Phaser(1)); // Would block any visiting until timeout. reindexer.reindex(); } @@ -124,7 +125,7 @@ class ReindexerTest { reindexer.reindex(); // Nothing happens because status is already FAILED for the document type. assertEquals(failed, database.readReindexing()); - // It's time to reindex the "music" documents — none yet, so this is a no-op. + // It's time to reindex the "music" documents — none yet, so this is a no-op, which just updates the timestamp. database.writeReindexing(reindexing); // Restore state where reindexing was complete at 5 ms after EPOCH. ExecutorService executor = Executors.newSingleThreadExecutor(); Future<?> future = executor.submit(uncheckedReindex(reindexer)); @@ -135,16 +136,17 @@ class ReindexerTest { reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant())); assertEquals(reindexing, database.readReindexing()); - // We add a document and interrupt visiting before this document is visited. - access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document)); + // We add a document and interrupt reindexing before the visit is complete. + access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document1)); clock.advance(Duration.ofMillis(10)); reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock); future = executor.submit(uncheckedReindex(reindexer)); while (phaser.getRegisteredParties() == 1) Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_< database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value. + reindexer.shutdown(); phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document. - executor.shutdownNow(); // Interrupt the visit before it completes. + phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete. future.get(); // Write state to database. reindexing = reindexing.with(music, Status.ready(clock.instant()).running().halted()); assertEquals(reindexing, database.readReindexing()); @@ -152,29 +154,23 @@ class ReindexerTest { // Manually put a progress token in there, to verify this is used when resuming, and that we store what we retrieve. reindexing = reindexing.with(music, Status.ready(clock.instant()).running().progressed(new ProgressToken()).halted()); database.writeReindexing(reindexing); - phaser = new Phaser(1); - access.setPhaser(phaser); - executor = Executors.newSingleThreadExecutor(); + reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock); future = executor.submit(uncheckedReindex(reindexer)); while (phaser.getRegisteredParties() == 1) Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_< database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value. + reindexer.shutdown(); // Interrupt the visit before it completes. phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document. - executor.shutdownNow(); // Interrupt the visit before it completes. + phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete. future.get(); // Write state to database. assertEquals(reindexing, database.readReindexing()); - // Finally let the visit complete. - phaser = new Phaser(1); - access.setPhaser(phaser); - executor = Executors.newSingleThreadExecutor(); + // Finally let the visit complete normally. + reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock); future = executor.submit(uncheckedReindex(reindexer)); - while (phaser.getRegisteredParties() == 1) - Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_< database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value. phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document. phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete. - phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which has now completed. future.get(); // Write state to database. reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant())); assertEquals(reindexing, database.readReindexing()); |