aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-reindexer/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'clustercontroller-reindexer/src/main')
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java10
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java26
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java60
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java30
4 files changed, 69 insertions, 57 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
index ebd6837a97f..19dfd031dfc 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
@@ -91,10 +91,10 @@ public class Reindexer {
if (phaser.isTerminated())
throw new IllegalStateException("Already shut down");
- try (Lock lock = database.lockReindexing()) {
- AtomicReference<Reindexing> reindexing = new AtomicReference<>(database.readReindexing());
+ try (Lock lock = database.lockReindexing(cluster.name())) {
+ AtomicReference<Reindexing> reindexing = new AtomicReference<>(database.readReindexing(cluster.name()));
reindexing.set(updateWithReady(ready, reindexing.get(), clock.instant()));
- database.writeReindexing(reindexing.get());
+ database.writeReindexing(reindexing.get(), cluster.name());
metrics.dump(reindexing.get());
for (DocumentType type : ready.keySet()) { // We consider only document types for which we have config.
@@ -150,7 +150,7 @@ public class Reindexer {
status.updateAndGet(value -> value.progressed(token));
if (progressLastStored.get().isBefore(clock.instant().minusSeconds(10))) {
progressLastStored.set(clock.instant());
- database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get())));
+ database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get())), cluster.name());
metrics.dump(reindexing.get());
}
}
@@ -185,7 +185,7 @@ public class Reindexer {
log.log(INFO, "Completed reindexing of " + type + " after " + Duration.between(status.get().startedAt(), clock.instant()));
status.updateAndGet(value -> value.successful(clock.instant()));
}
- database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get())));
+ database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get())), cluster.name());
metrics.dump(reindexing.get());
}
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
index 5336275a9c0..22ae54fcc6b 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
@@ -29,43 +29,41 @@ import static java.util.stream.Collectors.toUnmodifiableMap;
public class ReindexingCurator {
private final Curator curator;
- private final String clusterName;
private final ReindexingSerializer serializer;
private final Duration lockTimeout;
- public ReindexingCurator(Curator curator, String clusterName, DocumentTypeManager manager) {
- this(curator, clusterName, manager, Duration.ofSeconds(1));
+ public ReindexingCurator(Curator curator, DocumentTypeManager manager) {
+ this(curator, manager, Duration.ofSeconds(1));
}
- ReindexingCurator(Curator curator, String clusterName, DocumentTypeManager manager, Duration lockTimeout) {
+ ReindexingCurator(Curator curator, DocumentTypeManager manager, Duration lockTimeout) {
this.curator = curator;
- this.clusterName = clusterName;
this.serializer = new ReindexingSerializer(manager);
this.lockTimeout = lockTimeout;
}
- public Reindexing readReindexing() {
- return curator.getData(statusPath()).map(serializer::deserialize)
+ public Reindexing readReindexing(String cluster) {
+ return curator.getData(statusPath(cluster)).map(serializer::deserialize)
.orElse(Reindexing.empty());
}
- public void writeReindexing(Reindexing reindexing) {
- curator.set(statusPath(), serializer.serialize(reindexing));
+ public void writeReindexing(Reindexing reindexing, String cluster) {
+ curator.set(statusPath(cluster), serializer.serialize(reindexing));
}
/** This lock must be held to manipulate reindexing state, or by whoever has a running visitor. */
- public Lock lockReindexing() throws ReindexingLockException {
+ public Lock lockReindexing(String cluster) throws ReindexingLockException {
try {
- return curator.lock(lockPath(), lockTimeout);
+ return curator.lock(lockPath(cluster), lockTimeout);
}
catch (UncheckedTimeoutException e) { // TODO jonmv: Avoid use of guava classes.
throw new ReindexingLockException(e);
}
}
- private Path rootPath() { return Path.fromString("/reindexing/v1/" + clusterName); }
- private Path statusPath() { return rootPath().append("status"); }
- private Path lockPath() { return rootPath().append("lock"); }
+ private Path rootPath(String clusterName) { return Path.fromString("/reindexing/v1/" + clusterName); }
+ private Path statusPath(String clusterName) { return rootPath(clusterName).append("status"); }
+ private Path lockPath(String clusterName) { return rootPath(clusterName).append("lock"); }
private static class ReindexingSerializer {
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
index 7989338c406..9a114eabbb5 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
@@ -29,12 +29,14 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.WARNING;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toUnmodifiableList;
import static java.util.stream.Collectors.toUnmodifiableMap;
/**
@@ -48,7 +50,7 @@ public class ReindexingMaintainer extends AbstractComponent {
private static final Logger log = Logger.getLogger(Reindexing.class.getName());
- private final Reindexer reindexer;
+ private final List<Reindexer> reindexers;
private final ScheduledExecutorService executor;
@Inject
@@ -63,51 +65,57 @@ public class ReindexingMaintainer extends AbstractComponent {
ReindexingMaintainer(Clock clock, Metric metric, DocumentAccess access, ZookeepersConfig zookeepersConfig,
ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig,
ReindexingConfig reindexingConfig) {
- this.reindexer = new Reindexer(parseCluster(reindexingConfig.clusterName(), clusterListConfig, allClustersBucketSpacesConfig, access.getDocumentTypeManager()),
- parseReady(reindexingConfig, access.getDocumentTypeManager()),
- new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()),
- reindexingConfig.clusterName(),
- access.getDocumentTypeManager()),
- access,
- metric,
- clock);
- this.executor = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("reindexer-"));
+ this.reindexers = reindexingConfig.clusters().entrySet().stream()
+ .map(cluster -> new Reindexer(parseCluster(cluster.getKey(), clusterListConfig, allClustersBucketSpacesConfig, access.getDocumentTypeManager()),
+ parseReady(cluster.getValue(), access.getDocumentTypeManager()),
+ new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()),
+ access.getDocumentTypeManager()),
+ access,
+ metric,
+ clock))
+ .collect(toUnmodifiableList());
+ this.executor = new ScheduledThreadPoolExecutor(reindexingConfig.clusters().size(), new DaemonThreadFactory("reindexer-"));
if (reindexingConfig.enabled())
scheduleStaggered((delayMillis, intervalMillis) -> executor.scheduleAtFixedRate(this::maintain, delayMillis, intervalMillis, TimeUnit.MILLISECONDS),
Duration.ofMinutes(1), clock.instant(), HostName.getLocalhost(), zookeepersConfig.zookeeperserverlist());
}
private void maintain() {
- try {
- reindexer.reindex();
- }
- catch (ReindexingLockException e) {
- log.log(FINE, "Failed to acquire reindexing lock");
- }
- catch (Exception e) {
- log.log(WARNING, "Exception when reindexing", e);
- }
+ for (Reindexer reindexer : reindexers)
+ executor.submit(() -> {
+ try {
+ reindexer.reindex();
+ }
+ catch (ReindexingLockException e) {
+ log.log(FINE, "Failed to acquire reindexing lock");
+ }
+ catch (Exception e) {
+ log.log(WARNING, "Exception when reindexing", e);
+ }
+ });
}
@Override
public void deconstruct() {
try {
- reindexer.shutdown();
+ for (Reindexer reindexer : reindexers)
+ reindexer.shutdown();
+
executor.shutdown();
if ( ! executor.awaitTermination(45, TimeUnit.SECONDS))
- log.log(WARNING, "Failed to shut down reindexer within timeout");
+ log.log(WARNING, "Failed to shut down reindexing within timeout");
}
catch (InterruptedException e) {
- log.log(WARNING, "Interrupted while waiting for reindexer to shut down");
+ log.log(WARNING, "Interrupted while waiting for reindexing to shut down");
Thread.currentThread().interrupt();
}
}
- static Map<DocumentType, Instant> parseReady(ReindexingConfig config, DocumentTypeManager manager) {
- return config.status().entrySet().stream()
- .collect(toUnmodifiableMap(typeStatus -> manager.getDocumentType(typeStatus.getKey()),
- typeStatus -> Instant.ofEpochMilli(typeStatus.getValue().readyAtMillis())));
+ static Map<DocumentType, Instant> parseReady(ReindexingConfig.Clusters cluster, DocumentTypeManager manager) {
+ return cluster.documentTypes().entrySet().stream()
+ .collect(toUnmodifiableMap(typeStatus -> manager.getDocumentType(typeStatus.getKey()),
+ typeStatus -> Instant.ofEpochMilli(typeStatus.getValue().readyAtMillis())));
}
/** Schedules a task with the given interval (across all containers in this ZK cluster). */
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java
index fca08f7743c..b1c0d012325 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java
@@ -22,6 +22,8 @@ import com.yahoo.vespa.config.content.reindexing.ReindexingConfig;
import com.yahoo.vespa.curator.Curator;
import com.yahoo.vespa.zookeeper.VespaZooKeeperServer;
+import java.util.Collection;
+import java.util.List;
import java.util.concurrent.Executor;
import static com.yahoo.jdisc.http.HttpRequest.Method.GET;
@@ -34,6 +36,7 @@ import static com.yahoo.jdisc.http.HttpRequest.Method.GET;
public class ReindexingV1ApiHandler extends ThreadedHttpRequestHandler {
private final ReindexingCurator database;
+ private final List<String> clusterNames;
@Inject
public ReindexingV1ApiHandler(Executor executor, Metric metric,
@@ -41,14 +44,15 @@ public class ReindexingV1ApiHandler extends ThreadedHttpRequestHandler {
ReindexingConfig reindexingConfig, DocumentmanagerConfig documentmanagerConfig) {
this(executor,
metric,
+ reindexingConfig.clusters().keySet(),
new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()),
- reindexingConfig.clusterName(),
new DocumentTypeManager(documentmanagerConfig)));
}
- ReindexingV1ApiHandler(Executor executor, Metric metric, ReindexingCurator database) {
+ ReindexingV1ApiHandler(Executor executor, Metric metric, Collection<String> clusterNames, ReindexingCurator database) {
super(executor, metric);
this.database = database;
+ this.clusterNames = List.copyOf(clusterNames);
}
@Override
@@ -71,16 +75,18 @@ public class ReindexingV1ApiHandler extends ThreadedHttpRequestHandler {
HttpResponse getStatus() {
Slime slime = new Slime();
- Cursor statusArray = slime.setObject().setArray("status");
- database.readReindexing().status().forEach((type, status) -> {
- Cursor statusObject = statusArray.addObject();
- statusObject.setString("type", type.getName());
- statusObject.setLong("startedMillis", status.startedAt().toEpochMilli());
- status.endedAt().ifPresent(endedAt -> statusObject.setLong("endedMillis", endedAt.toEpochMilli()));
- status.progress().ifPresent(progress -> statusObject.setString("progress", progress.serializeToString()));
- statusObject.setString("state", toString(status.state()));
- status.message().ifPresent(message -> statusObject.setString("message", message));
- });
+ Cursor clustersObject = slime.setObject().setObject("clusters");
+ for (String clusterName : clusterNames) {
+ Cursor documentTypesObject = clustersObject.setObject(clusterName).setObject("documentTypes");
+ database.readReindexing(clusterName).status().forEach((type, status) -> {
+ Cursor statusObject = documentTypesObject.setObject(type.getName());
+ statusObject.setLong("startedMillis", status.startedAt().toEpochMilli());
+ status.endedAt().ifPresent(endedAt -> statusObject.setLong("endedMillis", endedAt.toEpochMilli()));
+ status.progress().ifPresent(progress -> statusObject.setString("progress", progress.serializeToString()));
+ statusObject.setString("state", toString(status.state()));
+ status.message().ifPresent(message -> statusObject.setString("message", message));
+ });
+ }
return new SlimeJsonResponse(slime);
}