diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-11-06 18:11:37 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-11-06 18:11:37 +0100 |
commit | 7572c20d93c23756353aefbf4d12c3c214337220 (patch) | |
tree | 24b042bf197633f2d1b6685c4a06a1fae21636d3 | |
parent | 604ddaefbb59f1353a16e25e45ad0c241cc79793 (diff) |
Address review comments, and fix unit test
5 files changed, 13 insertions, 14 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java index 22b27056123..daf9730cd93 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java @@ -76,8 +76,8 @@ public class Reindexer { try (Lock lock = database.lockReindexing()) { for (DocumentType type : ready.keySet()) { // We consider only document types for which we have config. if (ready.get(type).isAfter(clock.instant())) - log.log(WARNING, "Received config for reindexing which is ready in the future — will process later " + - "(" + ready.get(type) + " is after " + clock.instant() + ")"); + log.log(INFO, "Received config for reindexing which is ready in the future — will process later " + + "(" + ready.get(type) + " is after " + clock.instant() + ")"); else progress(type); @@ -108,7 +108,7 @@ public class Reindexer { case SUCCESSFUL: // Intentional fallthrough — all three are done states. return; case RUNNING: - log.log(WARNING, "Unepxected state 'RUNNING' of reindexing of " + type); + log.log(WARNING, "Unexpected state 'RUNNING' of reindexing of " + type); case READY: // Intentional fallthrough — must just assume we failed updating state when exiting previously. log.log(FINE, () -> "Running reindexing of " + type); } diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java index 3ede1524d67..e870a89914a 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java @@ -69,7 +69,7 @@ public class ReindexingCurator { try { return curator.lock(lockPath, lockTimeout); } - catch (UncheckedTimeoutException e) { + catch (UncheckedTimeoutException e) { // TODO jonmv: Avoid use of guava classes. throw new ReindexingLockException(e); } } diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java index 054a91458d7..0addf12b4c1 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java @@ -27,17 +27,17 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; -import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Stream; +import static java.util.logging.Level.FINE; import static java.util.logging.Level.WARNING; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; import static java.util.stream.Collectors.toUnmodifiableMap; /** - * Runs in all cluster controller containers, and progresses reindexngg efforts. + * Runs in all cluster controller containers, and progresses reindexing efforts. * Work is only done by one container at a time, by requiring a shared ZooKeeper lock to be held while visiting. * Whichever maintainer gets the lock holds it until all reindexing is done, or until shutdown. * @@ -78,7 +78,7 @@ public class ReindexingMaintainer extends AbstractComponent { reindexer.reindex(); } catch (ReindexingLockException e) { - // Some other container is handling the reindexing at this moment, which is fine. + log.log(FINE, "Failed to acquire reindexing lock"); } catch (Exception e) { log.log(WARNING, "Exception when reindexing", e); diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java index 418b868b9c0..7120c2e30e9 100644 --- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.Phaser; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; /** * @author jonmv @@ -128,11 +129,7 @@ class ReindexerTest { // It's time to reindex the "music" documents — none yet, so this is a no-op, which just updates the timestamp. database.writeReindexing(reindexing); // Restore state where reindexing was complete at 5 ms after EPOCH. ExecutorService executor = Executors.newSingleThreadExecutor(); - Future<?> future = executor.submit(uncheckedReindex(reindexer)); - while (phaser.getRegisteredParties() == 1) - Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_< - phaser.arriveAndAwaitAdvance(); // Visitor has arrived — so should we. - future.get(); // Write state to database. + reindexer.reindex(); reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant())); assertEquals(reindexing, database.readReindexing()); @@ -140,7 +137,7 @@ class ReindexerTest { access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document1)); clock.advance(Duration.ofMillis(10)); reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock); - future = executor.submit(uncheckedReindex(reindexer)); + Future<?> future = executor.submit(uncheckedReindex(reindexer)); while (phaser.getRegisteredParties() == 1) Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_< database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value. @@ -168,6 +165,8 @@ class ReindexerTest { // Finally let the visit complete normally. reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock); future = executor.submit(uncheckedReindex(reindexer)); + while (phaser.getRegisteredParties() == 1) + Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_< database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value. phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document. phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete. diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java index 43bfb9ca667..d332b1fb1ca 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java @@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicReference; /** * Local visitor session that copies and iterates through all items in the local document access. - * Each document must be ack'ed for the session to be done visiting, unless the destinatino is remote. + * Each document must be ack'ed for the session to be done visiting, unless the destination is remote. * Only document puts are sent by this session, and this is done from a separate thread. * * @author jonmv |