aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
diff options
context:
space:
mode:
Diffstat (limited to 'clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java')
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java60
1 files changed, 34 insertions, 26 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
index 7989338c406..9a114eabbb5 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
@@ -29,12 +29,14 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.WARNING;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toUnmodifiableList;
import static java.util.stream.Collectors.toUnmodifiableMap;
/**
@@ -48,7 +50,7 @@ public class ReindexingMaintainer extends AbstractComponent {
private static final Logger log = Logger.getLogger(Reindexing.class.getName());
- private final Reindexer reindexer;
+ private final List<Reindexer> reindexers;
private final ScheduledExecutorService executor;
@Inject
@@ -63,51 +65,57 @@ public class ReindexingMaintainer extends AbstractComponent {
ReindexingMaintainer(Clock clock, Metric metric, DocumentAccess access, ZookeepersConfig zookeepersConfig,
ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig,
ReindexingConfig reindexingConfig) {
- this.reindexer = new Reindexer(parseCluster(reindexingConfig.clusterName(), clusterListConfig, allClustersBucketSpacesConfig, access.getDocumentTypeManager()),
- parseReady(reindexingConfig, access.getDocumentTypeManager()),
- new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()),
- reindexingConfig.clusterName(),
- access.getDocumentTypeManager()),
- access,
- metric,
- clock);
- this.executor = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("reindexer-"));
+ this.reindexers = reindexingConfig.clusters().entrySet().stream()
+ .map(cluster -> new Reindexer(parseCluster(cluster.getKey(), clusterListConfig, allClustersBucketSpacesConfig, access.getDocumentTypeManager()),
+ parseReady(cluster.getValue(), access.getDocumentTypeManager()),
+ new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()),
+ access.getDocumentTypeManager()),
+ access,
+ metric,
+ clock))
+ .collect(toUnmodifiableList());
+ this.executor = new ScheduledThreadPoolExecutor(reindexingConfig.clusters().size(), new DaemonThreadFactory("reindexer-"));
if (reindexingConfig.enabled())
scheduleStaggered((delayMillis, intervalMillis) -> executor.scheduleAtFixedRate(this::maintain, delayMillis, intervalMillis, TimeUnit.MILLISECONDS),
Duration.ofMinutes(1), clock.instant(), HostName.getLocalhost(), zookeepersConfig.zookeeperserverlist());
}
private void maintain() {
- try {
- reindexer.reindex();
- }
- catch (ReindexingLockException e) {
- log.log(FINE, "Failed to acquire reindexing lock");
- }
- catch (Exception e) {
- log.log(WARNING, "Exception when reindexing", e);
- }
+ for (Reindexer reindexer : reindexers)
+ executor.submit(() -> {
+ try {
+ reindexer.reindex();
+ }
+ catch (ReindexingLockException e) {
+ log.log(FINE, "Failed to acquire reindexing lock");
+ }
+ catch (Exception e) {
+ log.log(WARNING, "Exception when reindexing", e);
+ }
+ });
}
@Override
public void deconstruct() {
try {
- reindexer.shutdown();
+ for (Reindexer reindexer : reindexers)
+ reindexer.shutdown();
+
executor.shutdown();
if ( ! executor.awaitTermination(45, TimeUnit.SECONDS))
- log.log(WARNING, "Failed to shut down reindexer within timeout");
+ log.log(WARNING, "Failed to shut down reindexing within timeout");
}
catch (InterruptedException e) {
- log.log(WARNING, "Interrupted while waiting for reindexer to shut down");
+ log.log(WARNING, "Interrupted while waiting for reindexing to shut down");
Thread.currentThread().interrupt();
}
}
- static Map<DocumentType, Instant> parseReady(ReindexingConfig config, DocumentTypeManager manager) {
- return config.status().entrySet().stream()
- .collect(toUnmodifiableMap(typeStatus -> manager.getDocumentType(typeStatus.getKey()),
- typeStatus -> Instant.ofEpochMilli(typeStatus.getValue().readyAtMillis())));
+ static Map<DocumentType, Instant> parseReady(ReindexingConfig.Clusters cluster, DocumentTypeManager manager) {
+ return cluster.documentTypes().entrySet().stream()
+ .collect(toUnmodifiableMap(typeStatus -> manager.getDocumentType(typeStatus.getKey()),
+ typeStatus -> Instant.ofEpochMilli(typeStatus.getValue().readyAtMillis())));
}
/** Schedules a task with the given interval (across all containers in this ZK cluster). */