summaryrefslogtreecommitdiffstats
path: root/clustercontroller-reindexer
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-11-06 18:11:37 +0100
committerJon Marius Venstad <venstad@gmail.com>2020-11-06 18:11:37 +0100
commit7572c20d93c23756353aefbf4d12c3c214337220 (patch)
tree24b042bf197633f2d1b6685c4a06a1fae21636d3 /clustercontroller-reindexer
parent604ddaefbb59f1353a16e25e45ad0c241cc79793 (diff)
Address review comments, and fix unit test
Diffstat (limited to 'clustercontroller-reindexer')
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java6
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java2
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java6
-rw-r--r--clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java11
4 files changed, 12 insertions, 13 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
index 22b27056123..daf9730cd93 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
@@ -76,8 +76,8 @@ public class Reindexer {
try (Lock lock = database.lockReindexing()) {
for (DocumentType type : ready.keySet()) { // We consider only document types for which we have config.
if (ready.get(type).isAfter(clock.instant()))
- log.log(WARNING, "Received config for reindexing which is ready in the future — will process later " +
- "(" + ready.get(type) + " is after " + clock.instant() + ")");
+ log.log(INFO, "Received config for reindexing which is ready in the future — will process later " +
+ "(" + ready.get(type) + " is after " + clock.instant() + ")");
else
progress(type);
@@ -108,7 +108,7 @@ public class Reindexer {
case SUCCESSFUL: // Intentional fallthrough — all three are done states.
return;
case RUNNING:
- log.log(WARNING, "Unepxected state 'RUNNING' of reindexing of " + type);
+ log.log(WARNING, "Unexpected state 'RUNNING' of reindexing of " + type);
case READY: // Intentional fallthrough — must just assume we failed updating state when exiting previously.
log.log(FINE, () -> "Running reindexing of " + type);
}
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
index 3ede1524d67..e870a89914a 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
@@ -69,7 +69,7 @@ public class ReindexingCurator {
try {
return curator.lock(lockPath, lockTimeout);
}
- catch (UncheckedTimeoutException e) {
+ catch (UncheckedTimeoutException e) { // TODO jonmv: Avoid use of guava classes.
throw new ReindexingLockException(e);
}
}
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
index 054a91458d7..0addf12b4c1 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
@@ -27,17 +27,17 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
-import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Stream;
+import static java.util.logging.Level.FINE;
import static java.util.logging.Level.WARNING;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toUnmodifiableMap;
/**
- * Runs in all cluster controller containers, and progresses reindexngg efforts.
+ * Runs in all cluster controller containers, and progresses reindexing efforts.
* Work is only done by one container at a time, by requiring a shared ZooKeeper lock to be held while visiting.
* Whichever maintainer gets the lock holds it until all reindexing is done, or until shutdown.
*
@@ -78,7 +78,7 @@ public class ReindexingMaintainer extends AbstractComponent {
reindexer.reindex();
}
catch (ReindexingLockException e) {
- // Some other container is handling the reindexing at this moment, which is fine.
+ log.log(FINE, "Failed to acquire reindexing lock");
}
catch (Exception e) {
log.log(WARNING, "Exception when reindexing", e);
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
index 418b868b9c0..7120c2e30e9 100644
--- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
+++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Phaser;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
/**
* @author jonmv
@@ -128,11 +129,7 @@ class ReindexerTest {
// It's time to reindex the "music" documents — none yet, so this is a no-op, which just updates the timestamp.
database.writeReindexing(reindexing); // Restore state where reindexing was complete at 5 ms after EPOCH.
ExecutorService executor = Executors.newSingleThreadExecutor();
- Future<?> future = executor.submit(uncheckedReindex(reindexer));
- while (phaser.getRegisteredParties() == 1)
- Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_<
- phaser.arriveAndAwaitAdvance(); // Visitor has arrived — so should we.
- future.get(); // Write state to database.
+ reindexer.reindex();
reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant()));
assertEquals(reindexing, database.readReindexing());
@@ -140,7 +137,7 @@ class ReindexerTest {
access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document1));
clock.advance(Duration.ofMillis(10));
reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock);
- future = executor.submit(uncheckedReindex(reindexer));
+ Future<?> future = executor.submit(uncheckedReindex(reindexer));
while (phaser.getRegisteredParties() == 1)
Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_<
database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value.
@@ -168,6 +165,8 @@ class ReindexerTest {
// Finally let the visit complete normally.
reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock);
future = executor.submit(uncheckedReindex(reindexer));
+ while (phaser.getRegisteredParties() == 1)
+ Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_<
database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value.
phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document.
phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete.