aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-reindexer
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-11-09 20:32:38 +0100
committerJon Marius Venstad <venstad@gmail.com>2020-11-09 20:32:38 +0100
commit282e3445253d7529d35e9e43fa77ac089bc2c4c5 (patch)
treea998d9febf06b4e31403ebee2f52c6290d5d6923 /clustercontroller-reindexer
parent299dab82f32d443be426eb29ebe3f4b8ed5b0fbe (diff)
Make Reindexer more easily testable, andd simplify unit tests
Diffstat (limited to 'clustercontroller-reindexer')
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java51
-rw-r--r--clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java127
2 files changed, 80 insertions, 98 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
index 9e7e2880036..30e21697a6d 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
@@ -3,13 +3,13 @@ package ai.vespa.reindexing;
import ai.vespa.reindexing.Reindexing.Status;
import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException;
+import com.google.inject.Inject;
import com.yahoo.document.DocumentType;
import com.yahoo.document.select.parser.ParseException;
import com.yahoo.documentapi.DocumentAccess;
import com.yahoo.documentapi.ProgressToken;
import com.yahoo.documentapi.VisitorControlHandler;
import com.yahoo.documentapi.VisitorParameters;
-import com.yahoo.documentapi.VisitorSession;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.vespa.curator.Lock;
@@ -19,16 +19,15 @@ import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import java.util.logging.Logger;
import static java.util.Objects.requireNonNull;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
-import static java.util.stream.Collectors.joining;
/**
* Progresses reindexing efforts by creating visitor sessions against its own content cluster,
@@ -44,22 +43,39 @@ public class Reindexer {
private final Cluster cluster;
private final Map<DocumentType, Instant> ready;
private final ReindexingCurator database;
- private final DocumentAccess access;
+ private final Function<VisitorParameters, Runnable> visitorSessions;
private final Clock clock;
private final Phaser phaser = new Phaser(2); // Reindexer and visitor.
private Reindexing reindexing;
private Status status;
+ @Inject
public Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database,
DocumentAccess access, Clock clock) {
+ this(cluster,
+ ready,
+ database,
+ parameters -> {
+ try {
+ return access.createVisitorSession(parameters)::destroy;
+ }
+ catch (ParseException e) {
+ throw new IllegalStateException(e);
+ }
+ },
+ clock);
+ }
+
+ Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database,
+ Function<VisitorParameters, Runnable> visitorSessions, Clock clock) {
for (DocumentType type : ready.keySet())
cluster.bucketSpaceOf(type); // Verifies this is known.
this.cluster = cluster;
this.ready = new TreeMap<>(ready); // Iterate through document types in consistent order.
this.database = database;
- this.access = access;
+ this.visitorSessions = visitorSessions;
this.clock = clock;
}
@@ -132,7 +148,14 @@ public class Reindexer {
phaser.arriveAndAwaitAdvance(); // Synchronize with the reindex thread.
}
};
- visit(type, status.progress().orElse(null), control);
+
+ VisitorParameters parameters = createParameters(type, status.progress().orElse(null));
+ parameters.setControlHandler(control);
+ Runnable sessionShutdown = visitorSessions.apply(parameters);
+
+ // Wait until done; or until termination is forced, in which case we abort the visit and wait for it to complete.
+ phaser.arriveAndAwaitAdvance(); // Synchronize with the visitor completion thread.
+ sessionShutdown.run();
// If we were interrupted, the result may not yet be set in the control handler.
switch (control.getResult().getCode()) {
@@ -153,22 +176,6 @@ public class Reindexer {
database.writeReindexing(reindexing.with(type, status));
}
- private void visit(DocumentType type, ProgressToken progress, VisitorControlHandler control) {
- VisitorParameters parameters = createParameters(type, progress);
- parameters.setControlHandler(control);
- VisitorSession session;
- try {
- session = access.createVisitorSession(parameters);
- }
- catch (ParseException e) {
- throw new IllegalStateException(e);
- }
-
- // Wait until done; or until termination is forced, in which case we abort the visit and wait for it to complete.
- phaser.arriveAndAwaitAdvance(); // Synchronize with the visitor completion thread.
- session.destroy();
- }
-
VisitorParameters createParameters(DocumentType type, ProgressToken progress) {
VisitorParameters parameters = new VisitorParameters(type.getName());
parameters.setRemoteDataHandler(cluster.name());
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
index 20393cba958..4e54dea837b 100644
--- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
+++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
@@ -5,15 +5,12 @@ import ai.vespa.reindexing.Reindexer.Cluster;
import ai.vespa.reindexing.Reindexing.Status;
import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException;
import com.yahoo.document.Document;
-import com.yahoo.document.DocumentPut;
import com.yahoo.document.DocumentType;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.document.config.DocumentmanagerConfig;
-import com.yahoo.documentapi.DocumentAccessParams;
import com.yahoo.documentapi.ProgressToken;
-import com.yahoo.documentapi.SyncParameters;
+import com.yahoo.documentapi.VisitorControlHandler;
import com.yahoo.documentapi.VisitorParameters;
-import com.yahoo.documentapi.local.LocalDocumentAccess;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.searchdefinition.derived.Deriver;
import com.yahoo.test.ManualClock;
@@ -25,16 +22,17 @@ import org.junit.jupiter.api.Timeout;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
@@ -42,21 +40,20 @@ import static org.junit.jupiter.api.Assertions.fail;
*/
class ReindexerTest {
+ static final Function<VisitorParameters, Runnable> failIfCalled = __ -> () -> { fail("Not supposed to run"); };
+
final DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build();
final DocumentTypeManager manager = new DocumentTypeManager(musicConfig);
final DocumentType music = manager.getDocumentType("music");
final Document document1 = new Document(music, "id:ns:music::one");
- final Document document2 = new Document(music, "id:ns:music::two");
final Cluster cluster = new Cluster("cluster", "id", Map.of(music, "default"));
final ManualClock clock = new ManualClock(Instant.EPOCH);
ReindexingCurator database;
- LocalDocumentAccess access;
@BeforeEach
void setUp() {
database = new ReindexingCurator(new MockCurator(), "cluster", manager, Duration.ofMillis(1));
- access = new LocalDocumentAccess(new DocumentAccessParams().setDocumentmanagerConfig(musicConfig));
}
@Test
@@ -65,29 +62,26 @@ class ReindexerTest {
() -> new Reindexer(new Cluster("cluster", "id", Map.of()),
Map.of(music, Instant.EPOCH),
database,
- access,
+ failIfCalled,
clock));
}
@Test
void throwsWhenLockHeldElsewhere() throws InterruptedException, ExecutionException {
- Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, access, clock);
+ Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, clock);
Executors.newSingleThreadExecutor().submit(database::lockReindexing).get();
assertThrows(ReindexingLockException.class, reindexer::reindex);
}
@Test
@Timeout(10)
- void nothingToDo() throws ReindexingLockException {
- Reindexer reindexer = new Reindexer(cluster, Map.of(), database, access, clock);
- access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document1));
- access.setPhaser(new Phaser(1)); // Would block any visiting until timeout.
- reindexer.reindex();
+ void nothingToDoWithEmptyConfig() throws ReindexingLockException {
+ new Reindexer(cluster, Map.of(), database, failIfCalled, clock).reindex();
}
@Test
void parameters() {
- Reindexer reindexer = new Reindexer(cluster, Map.of(), database, access, clock);
+ Reindexer reindexer = new Reindexer(cluster, Map.of(), database, failIfCalled, clock);
ProgressToken token = new ProgressToken();
VisitorParameters parameters = reindexer.createParameters(music, token);
assertEquals("music:[document]", parameters.getFieldSet());
@@ -103,80 +97,61 @@ class ReindexerTest {
@Timeout(10)
void testReindexing() throws ReindexingLockException, ExecutionException, InterruptedException {
// Reindexer is told to update "music" documents no earlier than EPOCH, which is just now.
- Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, access, clock);
- Phaser phaser = new Phaser(1);
- access.setPhaser(phaser); // Will block any visiting until timeout.
- reindexer.reindex();
-
- // Since "music" was a new document type, it is stored as just reindexed, and nothing else happens.
+ // Since "music" is a new document type, it is stored as just reindexed, and nothing else happens.
+ new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, clock).reindex();
Reindexing reindexing = Reindexing.empty().with(music, Status.ready(Instant.EPOCH).running().successful(Instant.EPOCH));
assertEquals(reindexing, database.readReindexing());
// New config tells reindexer to reindex "music" documents no earlier than at 10 millis after EPOCH, which isn't yet.
+ // Nothing happens, since it's not yet time. This isn't supposed to happen unless high clock skew.
clock.advance(Duration.ofMillis(5));
- reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, access, clock);
- reindexer.reindex(); // Nothing happens because it's not yet time. This isn't supposed to happen unless high clock skew.
+ new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, failIfCalled, clock).reindex();
assertEquals(reindexing, database.readReindexing());
- // It's time to reindex the "music" documents, but since we sneak a FAILED status in there, nothing is done.
+ // It's time to reindex the "music" documents — let this complete successfully.
clock.advance(Duration.ofMillis(10));
- Reindexing failed = Reindexing.empty().with(music, Status.ready(clock.instant()).running().failed(clock.instant(), "fail"));
- database.writeReindexing(failed);
- reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, access, clock);
- reindexer.reindex(); // Nothing happens because status is already FAILED for the document type.
- assertEquals(failed, database.readReindexing());
-
- // It's time to reindex the "music" documents — none yet, so this is a no-op, which just updates the timestamp.
- database.writeReindexing(reindexing); // Restore state where reindexing was complete at 5 ms after EPOCH.
- ExecutorService executor = Executors.newSingleThreadExecutor();
- reindexer.reindex();
+ AtomicBoolean shutDown = new AtomicBoolean();
+ Executor executor = Executors.newSingleThreadExecutor();
+ new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, parameters -> {
+ database.writeReindexing(Reindexing.empty()); // Wipe database to verify we write data from reindexer.
+ executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK"));
+ return () -> shutDown.set(true);
+ }, clock).reindex();
reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant()));
assertEquals(reindexing, database.readReindexing());
+ assertTrue(shutDown.get(), "Session was shut down");
- // We add a document and interrupt reindexing before the visit is complete.
- access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document1));
+ // One more reindexing, this time shut down before visit completes, but after progress is reported.
clock.advance(Duration.ofMillis(10));
- reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock);
- Future<?> future = executor.submit(uncheckedReindex(reindexer));
- while (phaser.getRegisteredParties() == 1)
- Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_<
- database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value.
- reindexer.shutdown();
- phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document.
- phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete.
- future.get(); // Write state to database.
- reindexing = reindexing.with(music, Status.ready(clock.instant()).running().halted());
- assertEquals(reindexing, database.readReindexing());
-
- // Manually put a progress token in there, to verify this is used when resuming, and that we store what we retrieve.
+ shutDown.set(false);
+ AtomicReference<Reindexer> aborted = new AtomicReference<>();
+ aborted.set(new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, parameters -> {
+ database.writeReindexing(Reindexing.empty()); // Wipe database to verify we write data from reindexer.
+ parameters.getControlHandler().onProgress(new ProgressToken());
+ aborted.get().shutdown();
+ executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.ABORTED, "Shut down"));
+ return () -> shutDown.set(true);
+ }, clock));
+ aborted.get().reindex();
reindexing = reindexing.with(music, Status.ready(clock.instant()).running().progressed(new ProgressToken()).halted());
- database.writeReindexing(reindexing);
- reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock);
- future = executor.submit(uncheckedReindex(reindexer));
- while (phaser.getRegisteredParties() == 1)
- Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_<
- database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value.
- reindexer.shutdown(); // Interrupt the visit before it completes.
- phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document.
- phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete.
- future.get(); // Write state to database.
assertEquals(reindexing, database.readReindexing());
+ assertTrue(shutDown.get(), "Session was shut down");
- // Finally let the visit complete normally.
- reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock);
- future = executor.submit(uncheckedReindex(reindexer));
- while (phaser.getRegisteredParties() == 1)
- Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_<
- database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value.
- phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document.
- phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete.
- future.get(); // Write state to database.
- reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant()));
+ // Last reindexing fails.
+ clock.advance(Duration.ofMillis(10));
+ shutDown.set(false);
+ new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, parameters -> {
+ database.writeReindexing(Reindexing.empty()); // Wipe database to verify we write data from reindexer.
+ executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "Error"));
+ return () -> shutDown.set(true);
+ }, clock).reindex();
+ reindexing = reindexing.with(music, Status.ready(clock.instant()).running().failed(clock.instant(), "Error"));
assertEquals(reindexing, database.readReindexing());
- }
+ assertTrue(shutDown.get(), "Session was shut down");
- Callable<Void> uncheckedReindex(Reindexer reindexer) {
- return () -> { reindexer.reindex(); return null; };
+ // Document type is ignored in next run, as it has failed fatally.
+ new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, failIfCalled, clock).reindex();
+ assertEquals(reindexing, database.readReindexing());
}
}