summaryrefslogtreecommitdiffstats
path: root/clustercontroller-reindexer/src/test
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-11-06 16:55:46 +0100
committerJon Marius Venstad <venstad@gmail.com>2020-11-06 16:55:46 +0100
commit604ddaefbb59f1353a16e25e45ad0c241cc79793 (patch)
tree97ac3f57e69447872436944a78911b7b49b6a3dd /clustercontroller-reindexer/src/test
parentcab5c5ae03d0b6bb76f0fbda57e99f6302349bab (diff)
Avoid interrupts for control flow
Diffstat (limited to 'clustercontroller-reindexer/src/test')
-rw-r--r--clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java30
1 files changed, 13 insertions, 17 deletions
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
index 263c32739a7..418b868b9c0 100644
--- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
+++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
@@ -44,7 +44,8 @@ class ReindexerTest {
final DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build();
final DocumentTypeManager manager = new DocumentTypeManager(musicConfig);
final DocumentType music = manager.getDocumentType("music");
- final Document document = new Document(music, "id:ns:music::one");
+ final Document document1 = new Document(music, "id:ns:music::one");
+ final Document document2 = new Document(music, "id:ns:music::two");
final Cluster cluster = new Cluster("cluster", "id", Map.of(music, "default"));
final ManualClock clock = new ManualClock(Instant.EPOCH);
@@ -78,7 +79,7 @@ class ReindexerTest {
@Timeout(10)
void nothingToDo() throws ReindexingLockException {
Reindexer reindexer = new Reindexer(cluster, Map.of(), database, access, clock);
- access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document));
+ access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document1));
access.setPhaser(new Phaser(1)); // Would block any visiting until timeout.
reindexer.reindex();
}
@@ -124,7 +125,7 @@ class ReindexerTest {
reindexer.reindex(); // Nothing happens because status is already FAILED for the document type.
assertEquals(failed, database.readReindexing());
- // It's time to reindex the "music" documents — none yet, so this is a no-op.
+ // It's time to reindex the "music" documents — none yet, so this is a no-op, which just updates the timestamp.
database.writeReindexing(reindexing); // Restore state where reindexing was complete at 5 ms after EPOCH.
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(uncheckedReindex(reindexer));
@@ -135,16 +136,17 @@ class ReindexerTest {
reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant()));
assertEquals(reindexing, database.readReindexing());
- // We add a document and interrupt visiting before this document is visited.
- access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document));
+ // We add a document and interrupt reindexing before the visit is complete.
+ access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document1));
clock.advance(Duration.ofMillis(10));
reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock);
future = executor.submit(uncheckedReindex(reindexer));
while (phaser.getRegisteredParties() == 1)
Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_<
database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value.
+ reindexer.shutdown();
phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document.
- executor.shutdownNow(); // Interrupt the visit before it completes.
+ phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete.
future.get(); // Write state to database.
reindexing = reindexing.with(music, Status.ready(clock.instant()).running().halted());
assertEquals(reindexing, database.readReindexing());
@@ -152,29 +154,23 @@ class ReindexerTest {
// Manually put a progress token in there, to verify this is used when resuming, and that we store what we retrieve.
reindexing = reindexing.with(music, Status.ready(clock.instant()).running().progressed(new ProgressToken()).halted());
database.writeReindexing(reindexing);
- phaser = new Phaser(1);
- access.setPhaser(phaser);
- executor = Executors.newSingleThreadExecutor();
+ reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock);
future = executor.submit(uncheckedReindex(reindexer));
while (phaser.getRegisteredParties() == 1)
Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_<
database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value.
+ reindexer.shutdown(); // Interrupt the visit before it completes.
phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document.
- executor.shutdownNow(); // Interrupt the visit before it completes.
+ phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete.
future.get(); // Write state to database.
assertEquals(reindexing, database.readReindexing());
- // Finally let the visit complete.
- phaser = new Phaser(1);
- access.setPhaser(phaser);
- executor = Executors.newSingleThreadExecutor();
+ // Finally let the visit complete normally.
+ reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock);
future = executor.submit(uncheckedReindex(reindexer));
- while (phaser.getRegisteredParties() == 1)
- Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_<
database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value.
phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document.
phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete.
- phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which has now completed.
future.get(); // Write state to database.
reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant()));
assertEquals(reindexing, database.readReindexing());