diff options
Diffstat (limited to 'clustercontroller-reindexer/src/main')
4 files changed, 69 insertions, 57 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java index ebd6837a97f..19dfd031dfc 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java @@ -91,10 +91,10 @@ public class Reindexer { if (phaser.isTerminated()) throw new IllegalStateException("Already shut down"); - try (Lock lock = database.lockReindexing()) { - AtomicReference<Reindexing> reindexing = new AtomicReference<>(database.readReindexing()); + try (Lock lock = database.lockReindexing(cluster.name())) { + AtomicReference<Reindexing> reindexing = new AtomicReference<>(database.readReindexing(cluster.name())); reindexing.set(updateWithReady(ready, reindexing.get(), clock.instant())); - database.writeReindexing(reindexing.get()); + database.writeReindexing(reindexing.get(), cluster.name()); metrics.dump(reindexing.get()); for (DocumentType type : ready.keySet()) { // We consider only document types for which we have config. @@ -150,7 +150,7 @@ public class Reindexer { status.updateAndGet(value -> value.progressed(token)); if (progressLastStored.get().isBefore(clock.instant().minusSeconds(10))) { progressLastStored.set(clock.instant()); - database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get()))); + database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get())), cluster.name()); metrics.dump(reindexing.get()); } } @@ -185,7 +185,7 @@ public class Reindexer { log.log(INFO, "Completed reindexing of " + type + " after " + Duration.between(status.get().startedAt(), clock.instant())); status.updateAndGet(value -> value.successful(clock.instant())); } - database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get()))); + database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get())), cluster.name()); metrics.dump(reindexing.get()); } diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java index 5336275a9c0..22ae54fcc6b 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java @@ -29,43 +29,41 @@ import static java.util.stream.Collectors.toUnmodifiableMap; public class ReindexingCurator { private final Curator curator; - private final String clusterName; private final ReindexingSerializer serializer; private final Duration lockTimeout; - public ReindexingCurator(Curator curator, String clusterName, DocumentTypeManager manager) { - this(curator, clusterName, manager, Duration.ofSeconds(1)); + public ReindexingCurator(Curator curator, DocumentTypeManager manager) { + this(curator, manager, Duration.ofSeconds(1)); } - ReindexingCurator(Curator curator, String clusterName, DocumentTypeManager manager, Duration lockTimeout) { + ReindexingCurator(Curator curator, DocumentTypeManager manager, Duration lockTimeout) { this.curator = curator; - this.clusterName = clusterName; this.serializer = new ReindexingSerializer(manager); this.lockTimeout = lockTimeout; } - public Reindexing readReindexing() { - return curator.getData(statusPath()).map(serializer::deserialize) + public Reindexing readReindexing(String cluster) { + return curator.getData(statusPath(cluster)).map(serializer::deserialize) .orElse(Reindexing.empty()); } - public void writeReindexing(Reindexing reindexing) { - curator.set(statusPath(), serializer.serialize(reindexing)); + public void writeReindexing(Reindexing reindexing, String cluster) { + curator.set(statusPath(cluster), serializer.serialize(reindexing)); } /** This lock must be held to manipulate reindexing state, or by whoever has a running visitor. */ - public Lock lockReindexing() throws ReindexingLockException { + public Lock lockReindexing(String cluster) throws ReindexingLockException { try { - return curator.lock(lockPath(), lockTimeout); + return curator.lock(lockPath(cluster), lockTimeout); } catch (UncheckedTimeoutException e) { // TODO jonmv: Avoid use of guava classes. throw new ReindexingLockException(e); } } - private Path rootPath() { return Path.fromString("/reindexing/v1/" + clusterName); } - private Path statusPath() { return rootPath().append("status"); } - private Path lockPath() { return rootPath().append("lock"); } + private Path rootPath(String clusterName) { return Path.fromString("/reindexing/v1/" + clusterName); } + private Path statusPath(String clusterName) { return rootPath(clusterName).append("status"); } + private Path lockPath(String clusterName) { return rootPath(clusterName).append("lock"); } private static class ReindexingSerializer { diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java index 7989338c406..9a114eabbb5 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java @@ -29,12 +29,14 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.logging.Logger; +import java.util.stream.Collectors; import java.util.stream.Stream; import static java.util.logging.Level.FINE; import static java.util.logging.Level.WARNING; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; +import static java.util.stream.Collectors.toUnmodifiableList; import static java.util.stream.Collectors.toUnmodifiableMap; /** @@ -48,7 +50,7 @@ public class ReindexingMaintainer extends AbstractComponent { private static final Logger log = Logger.getLogger(Reindexing.class.getName()); - private final Reindexer reindexer; + private final List<Reindexer> reindexers; private final ScheduledExecutorService executor; @Inject @@ -63,51 +65,57 @@ public class ReindexingMaintainer extends AbstractComponent { ReindexingMaintainer(Clock clock, Metric metric, DocumentAccess access, ZookeepersConfig zookeepersConfig, ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig, ReindexingConfig reindexingConfig) { - this.reindexer = new Reindexer(parseCluster(reindexingConfig.clusterName(), clusterListConfig, allClustersBucketSpacesConfig, access.getDocumentTypeManager()), - parseReady(reindexingConfig, access.getDocumentTypeManager()), - new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()), - reindexingConfig.clusterName(), - access.getDocumentTypeManager()), - access, - metric, - clock); - this.executor = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("reindexer-")); + this.reindexers = reindexingConfig.clusters().entrySet().stream() + .map(cluster -> new Reindexer(parseCluster(cluster.getKey(), clusterListConfig, allClustersBucketSpacesConfig, access.getDocumentTypeManager()), + parseReady(cluster.getValue(), access.getDocumentTypeManager()), + new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()), + access.getDocumentTypeManager()), + access, + metric, + clock)) + .collect(toUnmodifiableList()); + this.executor = new ScheduledThreadPoolExecutor(reindexingConfig.clusters().size(), new DaemonThreadFactory("reindexer-")); if (reindexingConfig.enabled()) scheduleStaggered((delayMillis, intervalMillis) -> executor.scheduleAtFixedRate(this::maintain, delayMillis, intervalMillis, TimeUnit.MILLISECONDS), Duration.ofMinutes(1), clock.instant(), HostName.getLocalhost(), zookeepersConfig.zookeeperserverlist()); } private void maintain() { - try { - reindexer.reindex(); - } - catch (ReindexingLockException e) { - log.log(FINE, "Failed to acquire reindexing lock"); - } - catch (Exception e) { - log.log(WARNING, "Exception when reindexing", e); - } + for (Reindexer reindexer : reindexers) + executor.submit(() -> { + try { + reindexer.reindex(); + } + catch (ReindexingLockException e) { + log.log(FINE, "Failed to acquire reindexing lock"); + } + catch (Exception e) { + log.log(WARNING, "Exception when reindexing", e); + } + }); } @Override public void deconstruct() { try { - reindexer.shutdown(); + for (Reindexer reindexer : reindexers) + reindexer.shutdown(); + executor.shutdown(); if ( ! executor.awaitTermination(45, TimeUnit.SECONDS)) - log.log(WARNING, "Failed to shut down reindexer within timeout"); + log.log(WARNING, "Failed to shut down reindexing within timeout"); } catch (InterruptedException e) { - log.log(WARNING, "Interrupted while waiting for reindexer to shut down"); + log.log(WARNING, "Interrupted while waiting for reindexing to shut down"); Thread.currentThread().interrupt(); } } - static Map<DocumentType, Instant> parseReady(ReindexingConfig config, DocumentTypeManager manager) { - return config.status().entrySet().stream() - .collect(toUnmodifiableMap(typeStatus -> manager.getDocumentType(typeStatus.getKey()), - typeStatus -> Instant.ofEpochMilli(typeStatus.getValue().readyAtMillis()))); + static Map<DocumentType, Instant> parseReady(ReindexingConfig.Clusters cluster, DocumentTypeManager manager) { + return cluster.documentTypes().entrySet().stream() + .collect(toUnmodifiableMap(typeStatus -> manager.getDocumentType(typeStatus.getKey()), + typeStatus -> Instant.ofEpochMilli(typeStatus.getValue().readyAtMillis()))); } /** Schedules a task with the given interval (across all containers in this ZK cluster). */ diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java index fca08f7743c..b1c0d012325 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java @@ -22,6 +22,8 @@ import com.yahoo.vespa.config.content.reindexing.ReindexingConfig; import com.yahoo.vespa.curator.Curator; import com.yahoo.vespa.zookeeper.VespaZooKeeperServer; +import java.util.Collection; +import java.util.List; import java.util.concurrent.Executor; import static com.yahoo.jdisc.http.HttpRequest.Method.GET; @@ -34,6 +36,7 @@ import static com.yahoo.jdisc.http.HttpRequest.Method.GET; public class ReindexingV1ApiHandler extends ThreadedHttpRequestHandler { private final ReindexingCurator database; + private final List<String> clusterNames; @Inject public ReindexingV1ApiHandler(Executor executor, Metric metric, @@ -41,14 +44,15 @@ public class ReindexingV1ApiHandler extends ThreadedHttpRequestHandler { ReindexingConfig reindexingConfig, DocumentmanagerConfig documentmanagerConfig) { this(executor, metric, + reindexingConfig.clusters().keySet(), new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()), - reindexingConfig.clusterName(), new DocumentTypeManager(documentmanagerConfig))); } - ReindexingV1ApiHandler(Executor executor, Metric metric, ReindexingCurator database) { + ReindexingV1ApiHandler(Executor executor, Metric metric, Collection<String> clusterNames, ReindexingCurator database) { super(executor, metric); this.database = database; + this.clusterNames = List.copyOf(clusterNames); } @Override @@ -71,16 +75,18 @@ public class ReindexingV1ApiHandler extends ThreadedHttpRequestHandler { HttpResponse getStatus() { Slime slime = new Slime(); - Cursor statusArray = slime.setObject().setArray("status"); - database.readReindexing().status().forEach((type, status) -> { - Cursor statusObject = statusArray.addObject(); - statusObject.setString("type", type.getName()); - statusObject.setLong("startedMillis", status.startedAt().toEpochMilli()); - status.endedAt().ifPresent(endedAt -> statusObject.setLong("endedMillis", endedAt.toEpochMilli())); - status.progress().ifPresent(progress -> statusObject.setString("progress", progress.serializeToString())); - statusObject.setString("state", toString(status.state())); - status.message().ifPresent(message -> statusObject.setString("message", message)); - }); + Cursor clustersObject = slime.setObject().setObject("clusters"); + for (String clusterName : clusterNames) { + Cursor documentTypesObject = clustersObject.setObject(clusterName).setObject("documentTypes"); + database.readReindexing(clusterName).status().forEach((type, status) -> { + Cursor statusObject = documentTypesObject.setObject(type.getName()); + statusObject.setLong("startedMillis", status.startedAt().toEpochMilli()); + status.endedAt().ifPresent(endedAt -> statusObject.setLong("endedMillis", endedAt.toEpochMilli())); + status.progress().ifPresent(progress -> statusObject.setString("progress", progress.serializeToString())); + statusObject.setString("state", toString(status.state())); + status.message().ifPresent(message -> statusObject.setString("message", message)); + }); + } return new SlimeJsonResponse(slime); } |