diff options
Diffstat (limited to 'clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java')
-rw-r--r-- | clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java | 60 |
1 files changed, 34 insertions, 26 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java index 7989338c406..9a114eabbb5 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java @@ -29,12 +29,14 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.logging.Logger; +import java.util.stream.Collectors; import java.util.stream.Stream; import static java.util.logging.Level.FINE; import static java.util.logging.Level.WARNING; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; +import static java.util.stream.Collectors.toUnmodifiableList; import static java.util.stream.Collectors.toUnmodifiableMap; /** @@ -48,7 +50,7 @@ public class ReindexingMaintainer extends AbstractComponent { private static final Logger log = Logger.getLogger(Reindexing.class.getName()); - private final Reindexer reindexer; + private final List<Reindexer> reindexers; private final ScheduledExecutorService executor; @Inject @@ -63,51 +65,57 @@ public class ReindexingMaintainer extends AbstractComponent { ReindexingMaintainer(Clock clock, Metric metric, DocumentAccess access, ZookeepersConfig zookeepersConfig, ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig, ReindexingConfig reindexingConfig) { - this.reindexer = new Reindexer(parseCluster(reindexingConfig.clusterName(), clusterListConfig, allClustersBucketSpacesConfig, access.getDocumentTypeManager()), - parseReady(reindexingConfig, access.getDocumentTypeManager()), - new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()), - reindexingConfig.clusterName(), - access.getDocumentTypeManager()), - access, - metric, - clock); - this.executor = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("reindexer-")); + this.reindexers = reindexingConfig.clusters().entrySet().stream() + .map(cluster -> new Reindexer(parseCluster(cluster.getKey(), clusterListConfig, allClustersBucketSpacesConfig, access.getDocumentTypeManager()), + parseReady(cluster.getValue(), access.getDocumentTypeManager()), + new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()), + access.getDocumentTypeManager()), + access, + metric, + clock)) + .collect(toUnmodifiableList()); + this.executor = new ScheduledThreadPoolExecutor(reindexingConfig.clusters().size(), new DaemonThreadFactory("reindexer-")); if (reindexingConfig.enabled()) scheduleStaggered((delayMillis, intervalMillis) -> executor.scheduleAtFixedRate(this::maintain, delayMillis, intervalMillis, TimeUnit.MILLISECONDS), Duration.ofMinutes(1), clock.instant(), HostName.getLocalhost(), zookeepersConfig.zookeeperserverlist()); } private void maintain() { - try { - reindexer.reindex(); - } - catch (ReindexingLockException e) { - log.log(FINE, "Failed to acquire reindexing lock"); - } - catch (Exception e) { - log.log(WARNING, "Exception when reindexing", e); - } + for (Reindexer reindexer : reindexers) + executor.submit(() -> { + try { + reindexer.reindex(); + } + catch (ReindexingLockException e) { + log.log(FINE, "Failed to acquire reindexing lock"); + } + catch (Exception e) { + log.log(WARNING, "Exception when reindexing", e); + } + }); } @Override public void deconstruct() { try { - reindexer.shutdown(); + for (Reindexer reindexer : reindexers) + reindexer.shutdown(); + executor.shutdown(); if ( ! executor.awaitTermination(45, TimeUnit.SECONDS)) - log.log(WARNING, "Failed to shut down reindexer within timeout"); + log.log(WARNING, "Failed to shut down reindexing within timeout"); } catch (InterruptedException e) { - log.log(WARNING, "Interrupted while waiting for reindexer to shut down"); + log.log(WARNING, "Interrupted while waiting for reindexing to shut down"); Thread.currentThread().interrupt(); } } - static Map<DocumentType, Instant> parseReady(ReindexingConfig config, DocumentTypeManager manager) { - return config.status().entrySet().stream() - .collect(toUnmodifiableMap(typeStatus -> manager.getDocumentType(typeStatus.getKey()), - typeStatus -> Instant.ofEpochMilli(typeStatus.getValue().readyAtMillis()))); + static Map<DocumentType, Instant> parseReady(ReindexingConfig.Clusters cluster, DocumentTypeManager manager) { + return cluster.documentTypes().entrySet().stream() + .collect(toUnmodifiableMap(typeStatus -> manager.getDocumentType(typeStatus.getKey()), + typeStatus -> Instant.ofEpochMilli(typeStatus.getValue().readyAtMillis()))); } /** Schedules a task with the given interval (across all containers in this ZK cluster). */ |