summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2021-02-14 16:20:12 +0100
committerGitHub <noreply@github.com>2021-02-14 16:20:12 +0100
commit76b1a42ae483ae66a2d1e43ef749f41661754e1c (patch)
treefd8c70a3b5124305a6ec8e5f91ae40bcc7f8becb
parentb51ce665456070fb6b59001558309fcb8429d509 (diff)
parent91c7af56b20a0d93a0db550cecca2059a7665992 (diff)
Merge pull request #16506 from vespa-engine/jonmv/shut-down-reincexing-curator
Jonmv/shut down reincexing curator
-rw-r--r--clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterController.java5
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java19
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java7
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java17
-rw-r--r--clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java8
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java1
6 files changed, 20 insertions, 37 deletions
diff --git a/clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterController.java b/clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterController.java
index ff7cbbf83da..8ec93b99aff 100644
--- a/clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterController.java
+++ b/clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterController.java
@@ -100,8 +100,9 @@ public class ClusterController extends AbstractComponent
*/
private void verifyThatZooKeeperWorks(FleetControllerOptions options) throws Exception {
if (options.zooKeeperServerAddress != null && !"".equals(options.zooKeeperServerAddress)) {
- Curator curator = Curator.create(options.zooKeeperServerAddress);
- curator.framework().blockUntilConnected();
+ try (Curator curator = Curator.create(options.zooKeeperServerAddress)) {
+ curator.framework().blockUntilConnected();
+ }
}
}
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
index 2c34c3164aa..af43211011a 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
@@ -38,7 +38,7 @@ import static java.util.logging.Level.WARNING;
*
* @author jonmv
*/
-public class Reindexer implements AutoCloseable {
+public class Reindexer {
private static final Logger log = Logger.getLogger(Reindexer.class.getName());
@@ -81,6 +81,11 @@ public class Reindexer implements AutoCloseable {
this.clock = clock;
}
+ /** Lets the reindexer abort any ongoing visit session, wait for it to complete normally, then exit. */
+ public void shutdown() {
+ phaser.forceTermination(); // All parties waiting on this phaser are immediately allowed to proceed.
+ }
+
/** Starts and tracks reprocessing of ready document types until done, or interrupted. */
public void reindex() throws ReindexingLockException {
if (phaser.isTerminated())
@@ -206,18 +211,6 @@ public class Reindexer implements AutoCloseable {
return parameters;
}
- @Override
- public void close() throws Exception {
- phaser.forceTermination(); // All parties waiting on this phaser are immediately allowed to proceed.
- database.close();
- }
-
- @Override
- public String toString() {
- return "Reindexer{" +
- "cluster=" + cluster +
- '}';
- }
static class Cluster {
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
index f3edf30f3b2..22ae54fcc6b 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
@@ -26,7 +26,7 @@ import static java.util.stream.Collectors.toUnmodifiableMap;
*
* @author jonmv
*/
-public class ReindexingCurator implements AutoCloseable {
+public class ReindexingCurator {
private final Curator curator;
private final ReindexingSerializer serializer;
@@ -65,11 +65,6 @@ public class ReindexingCurator implements AutoCloseable {
private Path statusPath(String clusterName) { return rootPath(clusterName).append("status"); }
private Path lockPath(String clusterName) { return rootPath(clusterName).append("lock"); }
- @Override
- public void close() throws Exception {
- curator.close();
- }
-
private static class ReindexingSerializer {
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
index 101989c6d17..4e263fb865d 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
@@ -50,6 +50,7 @@ public class ReindexingMaintainer extends AbstractComponent {
private static final Logger log = Logger.getLogger(Reindexing.class.getName());
+ private final Curator curator;
private final List<Reindexer> reindexers;
private final ScheduledExecutorService executor;
@@ -65,11 +66,12 @@ public class ReindexingMaintainer extends AbstractComponent {
ReindexingMaintainer(Clock clock, Metric metric, DocumentAccess access, ZookeepersConfig zookeepersConfig,
ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig,
ReindexingConfig reindexingConfig) {
+ this.curator = Curator.create(zookeepersConfig.zookeeperserverlist());
+ ReindexingCurator reindexingCurator = new ReindexingCurator(curator, access.getDocumentTypeManager());
this.reindexers = reindexingConfig.clusters().entrySet().stream()
.map(cluster -> new Reindexer(parseCluster(cluster.getKey(), clusterListConfig, allClustersBucketSpacesConfig, access.getDocumentTypeManager()),
parseReady(cluster.getValue(), access.getDocumentTypeManager()),
- new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()),
- access.getDocumentTypeManager()),
+ reindexingCurator,
access,
metric,
clock))
@@ -98,13 +100,8 @@ public class ReindexingMaintainer extends AbstractComponent {
@Override
public void deconstruct() {
try {
- for (Reindexer reindexer : reindexers) {
- try {
- reindexer.close();
- } catch (Exception e) {
- log.log(WARNING, "Received exception while closing down reindexer " + reindexer.toString() + " : ", e);
- }
- }
+ for (Reindexer reindexer : reindexers)
+ reindexer.shutdown();
executor.shutdown();
if ( ! executor.awaitTermination(45, TimeUnit.SECONDS))
@@ -116,6 +113,8 @@ public class ReindexingMaintainer extends AbstractComponent {
}
if ( ! executor.isShutdown())
executor.shutdownNow();
+
+ curator.close();
}
static Map<DocumentType, Instant> parseReady(ReindexingConfig.Clusters cluster, DocumentTypeManager manager) {
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
index c5b72db4dee..8cf96fa8c45 100644
--- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
+++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
@@ -28,12 +28,10 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import static java.util.logging.Level.WARNING;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
/**
* @author jonmv
@@ -142,11 +140,7 @@ class ReindexerTest {
aborted.set(new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, parameters -> {
database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer.
parameters.getControlHandler().onProgress(new ProgressToken());
- try {
- aborted.get().close();
- } catch (Exception e) {
- fail(e);
- }
+ aborted.get().shutdown();
return () -> {
shutDown.set(true);
parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.ABORTED, "Shut down");
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
index 90eec5760fc..adfd9bd051f 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
@@ -78,6 +78,7 @@ public class Curator implements VespaCurator, AutoCloseable {
}
@Inject
+ // TODO jonmv: Use a Provider for this, due to required shutdown.
public Curator(CuratorConfig curatorConfig, @SuppressWarnings("unused") VespaZooKeeperServer server) {
// Depends on ZooKeeperServer to make sure it is started first
this(ConnectionSpec.create(curatorConfig.server(),