diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-02-12 17:33:01 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-02-12 17:33:01 +0000 |
commit | fb0524c28864e0311e819cc73a5f6b8f2a6e8b60 (patch) | |
tree | db5982ca7448bbcc6e4285feeade29f50d503ecb /clustercontroller-reindexer/src | |
parent | 5b5c3bedbdd5bf9ae8106af7fead9dcfb3dd6a2f (diff) |
Close the underlying Curator to avoid leaking threads and other resources.
Diffstat (limited to 'clustercontroller-reindexer/src')
4 files changed, 33 insertions, 10 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java index af43211011a..2c34c3164aa 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java @@ -38,7 +38,7 @@ import static java.util.logging.Level.WARNING; * * @author jonmv */ -public class Reindexer { +public class Reindexer implements AutoCloseable { private static final Logger log = Logger.getLogger(Reindexer.class.getName()); @@ -81,11 +81,6 @@ public class Reindexer { this.clock = clock; } - /** Lets the reindexer abort any ongoing visit session, wait for it to complete normally, then exit. */ - public void shutdown() { - phaser.forceTermination(); // All parties waiting on this phaser are immediately allowed to proceed. - } - /** Starts and tracks reprocessing of ready document types until done, or interrupted. */ public void reindex() throws ReindexingLockException { if (phaser.isTerminated()) @@ -211,6 +206,18 @@ public class Reindexer { return parameters; } + @Override + public void close() throws Exception { + phaser.forceTermination(); // All parties waiting on this phaser are immediately allowed to proceed. + database.close(); + } + + @Override + public String toString() { + return "Reindexer{" + + "cluster=" + cluster + + '}'; + } static class Cluster { diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java index 22ae54fcc6b..f3edf30f3b2 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java @@ -26,7 +26,7 @@ import static java.util.stream.Collectors.toUnmodifiableMap; * * @author jonmv */ -public class ReindexingCurator { +public class ReindexingCurator implements AutoCloseable { private final Curator curator; private final ReindexingSerializer serializer; @@ -65,6 +65,11 @@ public class ReindexingCurator { private Path statusPath(String clusterName) { return rootPath(clusterName).append("status"); } private Path lockPath(String clusterName) { return rootPath(clusterName).append("lock"); } + @Override + public void close() throws Exception { + curator.close(); + } + private static class ReindexingSerializer { diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java index a37a24d2b01..101989c6d17 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java @@ -98,8 +98,13 @@ public class ReindexingMaintainer extends AbstractComponent { @Override public void deconstruct() { try { - for (Reindexer reindexer : reindexers) - reindexer.shutdown(); + for (Reindexer reindexer : reindexers) { + try { + reindexer.close(); + } catch (Exception e) { + log.log(WARNING, "Received exception while closing down reindexer " + reindexer.toString() + " : ", e); + } + } executor.shutdown(); if ( ! executor.awaitTermination(45, TimeUnit.SECONDS)) diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java index 8cf96fa8c45..c5b72db4dee 100644 --- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java @@ -28,10 +28,12 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import static java.util.logging.Level.WARNING; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; /** * @author jonmv @@ -140,7 +142,11 @@ class ReindexerTest { aborted.set(new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, parameters -> { database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. parameters.getControlHandler().onProgress(new ProgressToken()); - aborted.get().shutdown(); + try { + aborted.get().close(); + } catch (Exception e) { + fail(e); + } return () -> { shutDown.set(true); parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.ABORTED, "Shut down"); |