diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-11-09 20:32:38 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-11-09 20:32:38 +0100 |
commit | 282e3445253d7529d35e9e43fa77ac089bc2c4c5 (patch) | |
tree | a998d9febf06b4e31403ebee2f52c6290d5d6923 /clustercontroller-reindexer | |
parent | 299dab82f32d443be426eb29ebe3f4b8ed5b0fbe (diff) |
Make Reindexer more easily testable, and simplify unit tests
Diffstat (limited to 'clustercontroller-reindexer')
-rw-r--r-- | clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java | 51 | ||||
-rw-r--r-- | clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java | 127 |
2 files changed, 80 insertions, 98 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java index 9e7e2880036..30e21697a6d 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java @@ -3,13 +3,13 @@ package ai.vespa.reindexing; import ai.vespa.reindexing.Reindexing.Status; import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException; +import com.google.inject.Inject; import com.yahoo.document.DocumentType; import com.yahoo.document.select.parser.ParseException; import com.yahoo.documentapi.DocumentAccess; import com.yahoo.documentapi.ProgressToken; import com.yahoo.documentapi.VisitorControlHandler; import com.yahoo.documentapi.VisitorParameters; -import com.yahoo.documentapi.VisitorSession; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import com.yahoo.vespa.curator.Lock; @@ -19,16 +19,15 @@ import java.time.Instant; import java.util.Map; import java.util.Objects; import java.util.TreeMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.logging.Logger; import static java.util.Objects.requireNonNull; import static java.util.logging.Level.FINE; import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; -import static java.util.stream.Collectors.joining; /** * Progresses reindexing efforts by creating visitor sessions against its own content cluster, @@ -44,22 +43,39 @@ public class Reindexer { private final Cluster cluster; private final Map<DocumentType, Instant> ready; private final ReindexingCurator database; - private final DocumentAccess access; + private final Function<VisitorParameters, Runnable> visitorSessions; private final Clock clock; private final Phaser phaser = new 
Phaser(2); // Reindexer and visitor. private Reindexing reindexing; private Status status; + @Inject public Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database, DocumentAccess access, Clock clock) { + this(cluster, + ready, + database, + parameters -> { + try { + return access.createVisitorSession(parameters)::destroy; + } + catch (ParseException e) { + throw new IllegalStateException(e); + } + }, + clock); + } + + Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database, + Function<VisitorParameters, Runnable> visitorSessions, Clock clock) { for (DocumentType type : ready.keySet()) cluster.bucketSpaceOf(type); // Verifies this is known. this.cluster = cluster; this.ready = new TreeMap<>(ready); // Iterate through document types in consistent order. this.database = database; - this.access = access; + this.visitorSessions = visitorSessions; this.clock = clock; } @@ -132,7 +148,14 @@ public class Reindexer { phaser.arriveAndAwaitAdvance(); // Synchronize with the reindex thread. } }; - visit(type, status.progress().orElse(null), control); + + VisitorParameters parameters = createParameters(type, status.progress().orElse(null)); + parameters.setControlHandler(control); + Runnable sessionShutdown = visitorSessions.apply(parameters); + + // Wait until done; or until termination is forced, in which case we abort the visit and wait for it to complete. + phaser.arriveAndAwaitAdvance(); // Synchronize with the visitor completion thread. + sessionShutdown.run(); // If we were interrupted, the result may not yet be set in the control handler. 
switch (control.getResult().getCode()) { @@ -153,22 +176,6 @@ public class Reindexer { database.writeReindexing(reindexing.with(type, status)); } - private void visit(DocumentType type, ProgressToken progress, VisitorControlHandler control) { - VisitorParameters parameters = createParameters(type, progress); - parameters.setControlHandler(control); - VisitorSession session; - try { - session = access.createVisitorSession(parameters); - } - catch (ParseException e) { - throw new IllegalStateException(e); - } - - // Wait until done; or until termination is forced, in which case we abort the visit and wait for it to complete. - phaser.arriveAndAwaitAdvance(); // Synchronize with the visitor completion thread. - session.destroy(); - } - VisitorParameters createParameters(DocumentType type, ProgressToken progress) { VisitorParameters parameters = new VisitorParameters(type.getName()); parameters.setRemoteDataHandler(cluster.name()); diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java index 20393cba958..4e54dea837b 100644 --- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java @@ -5,15 +5,12 @@ import ai.vespa.reindexing.Reindexer.Cluster; import ai.vespa.reindexing.Reindexing.Status; import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException; import com.yahoo.document.Document; -import com.yahoo.document.DocumentPut; import com.yahoo.document.DocumentType; import com.yahoo.document.DocumentTypeManager; import com.yahoo.document.config.DocumentmanagerConfig; -import com.yahoo.documentapi.DocumentAccessParams; import com.yahoo.documentapi.ProgressToken; -import com.yahoo.documentapi.SyncParameters; +import com.yahoo.documentapi.VisitorControlHandler; import com.yahoo.documentapi.VisitorParameters; -import 
com.yahoo.documentapi.local.LocalDocumentAccess; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import com.yahoo.searchdefinition.derived.Deriver; import com.yahoo.test.ManualClock; @@ -25,16 +22,17 @@ import org.junit.jupiter.api.Timeout; import java.time.Duration; import java.time.Instant; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executor; import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; /** @@ -42,21 +40,20 @@ import static org.junit.jupiter.api.Assertions.fail; */ class ReindexerTest { + static final Function<VisitorParameters, Runnable> failIfCalled = __ -> () -> { fail("Not supposed to run"); }; + final DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build(); final DocumentTypeManager manager = new DocumentTypeManager(musicConfig); final DocumentType music = manager.getDocumentType("music"); final Document document1 = new Document(music, "id:ns:music::one"); - final Document document2 = new Document(music, "id:ns:music::two"); final Cluster cluster = new Cluster("cluster", "id", Map.of(music, "default")); final ManualClock clock = new ManualClock(Instant.EPOCH); ReindexingCurator database; - LocalDocumentAccess access; @BeforeEach void setUp() { database = new ReindexingCurator(new MockCurator(), "cluster", manager, Duration.ofMillis(1)); - access = 
new LocalDocumentAccess(new DocumentAccessParams().setDocumentmanagerConfig(musicConfig)); } @Test @@ -65,29 +62,26 @@ class ReindexerTest { () -> new Reindexer(new Cluster("cluster", "id", Map.of()), Map.of(music, Instant.EPOCH), database, - access, + failIfCalled, clock)); } @Test void throwsWhenLockHeldElsewhere() throws InterruptedException, ExecutionException { - Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, access, clock); + Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, clock); Executors.newSingleThreadExecutor().submit(database::lockReindexing).get(); assertThrows(ReindexingLockException.class, reindexer::reindex); } @Test @Timeout(10) - void nothingToDo() throws ReindexingLockException { - Reindexer reindexer = new Reindexer(cluster, Map.of(), database, access, clock); - access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document1)); - access.setPhaser(new Phaser(1)); // Would block any visiting until timeout. - reindexer.reindex(); + void nothingToDoWithEmptyConfig() throws ReindexingLockException { + new Reindexer(cluster, Map.of(), database, failIfCalled, clock).reindex(); } @Test void parameters() { - Reindexer reindexer = new Reindexer(cluster, Map.of(), database, access, clock); + Reindexer reindexer = new Reindexer(cluster, Map.of(), database, failIfCalled, clock); ProgressToken token = new ProgressToken(); VisitorParameters parameters = reindexer.createParameters(music, token); assertEquals("music:[document]", parameters.getFieldSet()); @@ -103,80 +97,61 @@ class ReindexerTest { @Timeout(10) void testReindexing() throws ReindexingLockException, ExecutionException, InterruptedException { // Reindexer is told to update "music" documents no earlier than EPOCH, which is just now. 
- Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, access, clock); - Phaser phaser = new Phaser(1); - access.setPhaser(phaser); // Will block any visiting until timeout. - reindexer.reindex(); - - // Since "music" was a new document type, it is stored as just reindexed, and nothing else happens. + // Since "music" is a new document type, it is stored as just reindexed, and nothing else happens. + new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, clock).reindex(); Reindexing reindexing = Reindexing.empty().with(music, Status.ready(Instant.EPOCH).running().successful(Instant.EPOCH)); assertEquals(reindexing, database.readReindexing()); // New config tells reindexer to reindex "music" documents no earlier than at 10 millis after EPOCH, which isn't yet. + // Nothing happens, since it's not yet time. This isn't supposed to happen unless high clock skew. clock.advance(Duration.ofMillis(5)); - reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, access, clock); - reindexer.reindex(); // Nothing happens because it's not yet time. This isn't supposed to happen unless high clock skew. + new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, failIfCalled, clock).reindex(); assertEquals(reindexing, database.readReindexing()); - // It's time to reindex the "music" documents, but since we sneak a FAILED status in there, nothing is done. + // It's time to reindex the "music" documents — let this complete successfully. clock.advance(Duration.ofMillis(10)); - Reindexing failed = Reindexing.empty().with(music, Status.ready(clock.instant()).running().failed(clock.instant(), "fail")); - database.writeReindexing(failed); - reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, access, clock); - reindexer.reindex(); // Nothing happens because status is already FAILED for the document type. 
- assertEquals(failed, database.readReindexing()); - - // It's time to reindex the "music" documents — none yet, so this is a no-op, which just updates the timestamp. - database.writeReindexing(reindexing); // Restore state where reindexing was complete at 5 ms after EPOCH. - ExecutorService executor = Executors.newSingleThreadExecutor(); - reindexer.reindex(); + AtomicBoolean shutDown = new AtomicBoolean(); + Executor executor = Executors.newSingleThreadExecutor(); + new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, parameters -> { + database.writeReindexing(Reindexing.empty()); // Wipe database to verify we write data from reindexer. + executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK")); + return () -> shutDown.set(true); + }, clock).reindex(); reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant())); assertEquals(reindexing, database.readReindexing()); + assertTrue(shutDown.get(), "Session was shut down"); - // We add a document and interrupt reindexing before the visit is complete. - access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document1)); + // One more reindexing, this time shut down before visit completes, but after progress is reported. clock.advance(Duration.ofMillis(10)); - reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock); - Future<?> future = executor.submit(uncheckedReindex(reindexer)); - while (phaser.getRegisteredParties() == 1) - Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_< - database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value. - reindexer.shutdown(); - phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document. 
- phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete. - future.get(); // Write state to database. - reindexing = reindexing.with(music, Status.ready(clock.instant()).running().halted()); - assertEquals(reindexing, database.readReindexing()); - - // Manually put a progress token in there, to verify this is used when resuming, and that we store what we retrieve. + shutDown.set(false); + AtomicReference<Reindexer> aborted = new AtomicReference<>(); + aborted.set(new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, parameters -> { + database.writeReindexing(Reindexing.empty()); // Wipe database to verify we write data from reindexer. + parameters.getControlHandler().onProgress(new ProgressToken()); + aborted.get().shutdown(); + executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.ABORTED, "Shut down")); + return () -> shutDown.set(true); + }, clock)); + aborted.get().reindex(); reindexing = reindexing.with(music, Status.ready(clock.instant()).running().progressed(new ProgressToken()).halted()); - database.writeReindexing(reindexing); - reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock); - future = executor.submit(uncheckedReindex(reindexer)); - while (phaser.getRegisteredParties() == 1) - Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_< - database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value. - reindexer.shutdown(); // Interrupt the visit before it completes. - phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document. - phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete. - future.get(); // Write state to database. 
assertEquals(reindexing, database.readReindexing()); + assertTrue(shutDown.get(), "Session was shut down"); - // Finally let the visit complete normally. - reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock); - future = executor.submit(uncheckedReindex(reindexer)); - while (phaser.getRegisteredParties() == 1) - Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_< - database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value. - phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document. - phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete. - future.get(); // Write state to database. - reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant())); + // Last reindexing fails. + clock.advance(Duration.ofMillis(10)); + shutDown.set(false); + new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, parameters -> { + database.writeReindexing(Reindexing.empty()); // Wipe database to verify we write data from reindexer. + executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "Error")); + return () -> shutDown.set(true); + }, clock).reindex(); + reindexing = reindexing.with(music, Status.ready(clock.instant()).running().failed(clock.instant(), "Error")); assertEquals(reindexing, database.readReindexing()); - } + assertTrue(shutDown.get(), "Session was shut down"); - Callable<Void> uncheckedReindex(Reindexer reindexer) { - return () -> { reindexer.reindex(); return null; }; + // Document type is ignored in next run, as it has failed fatally. + new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, failIfCalled, clock).reindex(); + assertEquals(reindexing, database.readReindexing()); } } |