summaryrefslogtreecommitdiffstats
path: root/clustercontroller-reindexer
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-02-12 17:33:01 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-02-12 17:33:01 +0000
commitfb0524c28864e0311e819cc73a5f6b8f2a6e8b60 (patch)
treedb5982ca7448bbcc6e4285feeade29f50d503ecb /clustercontroller-reindexer
parent5b5c3bedbdd5bf9ae8106af7fead9dcfb3dd6a2f (diff)
Close the underlying Curator to avoid leaking threads and other resources.
Diffstat (limited to 'clustercontroller-reindexer')
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java19
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java7
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java9
-rw-r--r--clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java8
4 files changed, 33 insertions, 10 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
index af43211011a..2c34c3164aa 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
@@ -38,7 +38,7 @@ import static java.util.logging.Level.WARNING;
*
* @author jonmv
*/
-public class Reindexer {
+public class Reindexer implements AutoCloseable {
private static final Logger log = Logger.getLogger(Reindexer.class.getName());
@@ -81,11 +81,6 @@ public class Reindexer {
this.clock = clock;
}
- /** Lets the reindexer abort any ongoing visit session, wait for it to complete normally, then exit. */
- public void shutdown() {
- phaser.forceTermination(); // All parties waiting on this phaser are immediately allowed to proceed.
- }
-
/** Starts and tracks reprocessing of ready document types until done, or interrupted. */
public void reindex() throws ReindexingLockException {
if (phaser.isTerminated())
@@ -211,6 +206,18 @@ public class Reindexer {
return parameters;
}
+ @Override
+ public void close() throws Exception {
+ phaser.forceTermination(); // All parties waiting on this phaser are immediately allowed to proceed.
+ database.close();
+ }
+
+ @Override
+ public String toString() {
+ return "Reindexer{" +
+ "cluster=" + cluster +
+ '}';
+ }
static class Cluster {
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
index 22ae54fcc6b..f3edf30f3b2 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
@@ -26,7 +26,7 @@ import static java.util.stream.Collectors.toUnmodifiableMap;
*
* @author jonmv
*/
-public class ReindexingCurator {
+public class ReindexingCurator implements AutoCloseable {
private final Curator curator;
private final ReindexingSerializer serializer;
@@ -65,6 +65,11 @@ public class ReindexingCurator {
private Path statusPath(String clusterName) { return rootPath(clusterName).append("status"); }
private Path lockPath(String clusterName) { return rootPath(clusterName).append("lock"); }
+ @Override
+ public void close() throws Exception {
+ curator.close();
+ }
+
private static class ReindexingSerializer {
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
index a37a24d2b01..101989c6d17 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
@@ -98,8 +98,13 @@ public class ReindexingMaintainer extends AbstractComponent {
@Override
public void deconstruct() {
try {
- for (Reindexer reindexer : reindexers)
- reindexer.shutdown();
+ for (Reindexer reindexer : reindexers) {
+ try {
+ reindexer.close();
+ } catch (Exception e) {
+ log.log(WARNING, "Received exception while closing down reindexer " + reindexer.toString() + " : ", e);
+ }
+ }
executor.shutdown();
if ( ! executor.awaitTermination(45, TimeUnit.SECONDS))
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
index 8cf96fa8c45..c5b72db4dee 100644
--- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
+++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
@@ -28,10 +28,12 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import static java.util.logging.Level.WARNING;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
/**
* @author jonmv
@@ -140,7 +142,11 @@ class ReindexerTest {
aborted.set(new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, parameters -> {
database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer.
parameters.getControlHandler().onProgress(new ProgressToken());
- aborted.get().shutdown();
+ try {
+ aborted.get().close();
+ } catch (Exception e) {
+ fail(e);
+ }
return () -> {
shutDown.set(true);
parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.ABORTED, "Shut down");