aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-reindexer
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2020-12-08 11:45:04 +0100
committerGitHub <noreply@github.com>2020-12-08 11:45:04 +0100
commit51b0057b5c3d0e5a53af6f6e32c92af6108b6d3d (patch)
tree0c0c823aac9027a011efce270fa0db8f8214f1b3 /clustercontroller-reindexer
parentec953c84bcf1f24a5d343882f2308c2d5f354dab (diff)
parentc66e845c151a1049a49fc087e98b5931c8d9270d (diff)
Merge pull request #15704 from vespa-engine/revert-15691-revert-15677-jonmv/reindexing-over-multiple-clusters
Revert "Revert "Jonmv/reindexing over multiple clusters""
Diffstat (limited to 'clustercontroller-reindexer')
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java10
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java26
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java60
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java30
-rw-r--r--clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java22
-rw-r--r--clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java14
-rw-r--r--clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java12
-rw-r--r--clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/http/ReindexingV1ApiTest.java36
8 files changed, 116 insertions, 94 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
index ebd6837a97f..19dfd031dfc 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
@@ -91,10 +91,10 @@ public class Reindexer {
if (phaser.isTerminated())
throw new IllegalStateException("Already shut down");
- try (Lock lock = database.lockReindexing()) {
- AtomicReference<Reindexing> reindexing = new AtomicReference<>(database.readReindexing());
+ try (Lock lock = database.lockReindexing(cluster.name())) {
+ AtomicReference<Reindexing> reindexing = new AtomicReference<>(database.readReindexing(cluster.name()));
reindexing.set(updateWithReady(ready, reindexing.get(), clock.instant()));
- database.writeReindexing(reindexing.get());
+ database.writeReindexing(reindexing.get(), cluster.name());
metrics.dump(reindexing.get());
for (DocumentType type : ready.keySet()) { // We consider only document types for which we have config.
@@ -150,7 +150,7 @@ public class Reindexer {
status.updateAndGet(value -> value.progressed(token));
if (progressLastStored.get().isBefore(clock.instant().minusSeconds(10))) {
progressLastStored.set(clock.instant());
- database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get())));
+ database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get())), cluster.name());
metrics.dump(reindexing.get());
}
}
@@ -185,7 +185,7 @@ public class Reindexer {
log.log(INFO, "Completed reindexing of " + type + " after " + Duration.between(status.get().startedAt(), clock.instant()));
status.updateAndGet(value -> value.successful(clock.instant()));
}
- database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get())));
+ database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get())), cluster.name());
metrics.dump(reindexing.get());
}
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
index 5336275a9c0..22ae54fcc6b 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
@@ -29,43 +29,41 @@ import static java.util.stream.Collectors.toUnmodifiableMap;
public class ReindexingCurator {
private final Curator curator;
- private final String clusterName;
private final ReindexingSerializer serializer;
private final Duration lockTimeout;
- public ReindexingCurator(Curator curator, String clusterName, DocumentTypeManager manager) {
- this(curator, clusterName, manager, Duration.ofSeconds(1));
+ public ReindexingCurator(Curator curator, DocumentTypeManager manager) {
+ this(curator, manager, Duration.ofSeconds(1));
}
- ReindexingCurator(Curator curator, String clusterName, DocumentTypeManager manager, Duration lockTimeout) {
+ ReindexingCurator(Curator curator, DocumentTypeManager manager, Duration lockTimeout) {
this.curator = curator;
- this.clusterName = clusterName;
this.serializer = new ReindexingSerializer(manager);
this.lockTimeout = lockTimeout;
}
- public Reindexing readReindexing() {
- return curator.getData(statusPath()).map(serializer::deserialize)
+ public Reindexing readReindexing(String cluster) {
+ return curator.getData(statusPath(cluster)).map(serializer::deserialize)
.orElse(Reindexing.empty());
}
- public void writeReindexing(Reindexing reindexing) {
- curator.set(statusPath(), serializer.serialize(reindexing));
+ public void writeReindexing(Reindexing reindexing, String cluster) {
+ curator.set(statusPath(cluster), serializer.serialize(reindexing));
}
/** This lock must be held to manipulate reindexing state, or by whoever has a running visitor. */
- public Lock lockReindexing() throws ReindexingLockException {
+ public Lock lockReindexing(String cluster) throws ReindexingLockException {
try {
- return curator.lock(lockPath(), lockTimeout);
+ return curator.lock(lockPath(cluster), lockTimeout);
}
catch (UncheckedTimeoutException e) { // TODO jonmv: Avoid use of guava classes.
throw new ReindexingLockException(e);
}
}
- private Path rootPath() { return Path.fromString("/reindexing/v1/" + clusterName); }
- private Path statusPath() { return rootPath().append("status"); }
- private Path lockPath() { return rootPath().append("lock"); }
+ private Path rootPath(String clusterName) { return Path.fromString("/reindexing/v1/" + clusterName); }
+ private Path statusPath(String clusterName) { return rootPath(clusterName).append("status"); }
+ private Path lockPath(String clusterName) { return rootPath(clusterName).append("lock"); }
private static class ReindexingSerializer {
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
index 7989338c406..9a114eabbb5 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
@@ -29,12 +29,14 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.WARNING;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toUnmodifiableList;
import static java.util.stream.Collectors.toUnmodifiableMap;
/**
@@ -48,7 +50,7 @@ public class ReindexingMaintainer extends AbstractComponent {
private static final Logger log = Logger.getLogger(Reindexing.class.getName());
- private final Reindexer reindexer;
+ private final List<Reindexer> reindexers;
private final ScheduledExecutorService executor;
@Inject
@@ -63,51 +65,57 @@ public class ReindexingMaintainer extends AbstractComponent {
ReindexingMaintainer(Clock clock, Metric metric, DocumentAccess access, ZookeepersConfig zookeepersConfig,
ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig,
ReindexingConfig reindexingConfig) {
- this.reindexer = new Reindexer(parseCluster(reindexingConfig.clusterName(), clusterListConfig, allClustersBucketSpacesConfig, access.getDocumentTypeManager()),
- parseReady(reindexingConfig, access.getDocumentTypeManager()),
- new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()),
- reindexingConfig.clusterName(),
- access.getDocumentTypeManager()),
- access,
- metric,
- clock);
- this.executor = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("reindexer-"));
+ this.reindexers = reindexingConfig.clusters().entrySet().stream()
+ .map(cluster -> new Reindexer(parseCluster(cluster.getKey(), clusterListConfig, allClustersBucketSpacesConfig, access.getDocumentTypeManager()),
+ parseReady(cluster.getValue(), access.getDocumentTypeManager()),
+ new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()),
+ access.getDocumentTypeManager()),
+ access,
+ metric,
+ clock))
+ .collect(toUnmodifiableList());
+ this.executor = new ScheduledThreadPoolExecutor(reindexingConfig.clusters().size(), new DaemonThreadFactory("reindexer-"));
if (reindexingConfig.enabled())
scheduleStaggered((delayMillis, intervalMillis) -> executor.scheduleAtFixedRate(this::maintain, delayMillis, intervalMillis, TimeUnit.MILLISECONDS),
Duration.ofMinutes(1), clock.instant(), HostName.getLocalhost(), zookeepersConfig.zookeeperserverlist());
}
private void maintain() {
- try {
- reindexer.reindex();
- }
- catch (ReindexingLockException e) {
- log.log(FINE, "Failed to acquire reindexing lock");
- }
- catch (Exception e) {
- log.log(WARNING, "Exception when reindexing", e);
- }
+ for (Reindexer reindexer : reindexers)
+ executor.submit(() -> {
+ try {
+ reindexer.reindex();
+ }
+ catch (ReindexingLockException e) {
+ log.log(FINE, "Failed to acquire reindexing lock");
+ }
+ catch (Exception e) {
+ log.log(WARNING, "Exception when reindexing", e);
+ }
+ });
}
@Override
public void deconstruct() {
try {
- reindexer.shutdown();
+ for (Reindexer reindexer : reindexers)
+ reindexer.shutdown();
+
executor.shutdown();
if ( ! executor.awaitTermination(45, TimeUnit.SECONDS))
- log.log(WARNING, "Failed to shut down reindexer within timeout");
+ log.log(WARNING, "Failed to shut down reindexing within timeout");
}
catch (InterruptedException e) {
- log.log(WARNING, "Interrupted while waiting for reindexer to shut down");
+ log.log(WARNING, "Interrupted while waiting for reindexing to shut down");
Thread.currentThread().interrupt();
}
}
- static Map<DocumentType, Instant> parseReady(ReindexingConfig config, DocumentTypeManager manager) {
- return config.status().entrySet().stream()
- .collect(toUnmodifiableMap(typeStatus -> manager.getDocumentType(typeStatus.getKey()),
- typeStatus -> Instant.ofEpochMilli(typeStatus.getValue().readyAtMillis())));
+ static Map<DocumentType, Instant> parseReady(ReindexingConfig.Clusters cluster, DocumentTypeManager manager) {
+ return cluster.documentTypes().entrySet().stream()
+ .collect(toUnmodifiableMap(typeStatus -> manager.getDocumentType(typeStatus.getKey()),
+ typeStatus -> Instant.ofEpochMilli(typeStatus.getValue().readyAtMillis())));
}
/** Schedules a task with the given interval (across all containers in this ZK cluster). */
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java
index fca08f7743c..b1c0d012325 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java
@@ -22,6 +22,8 @@ import com.yahoo.vespa.config.content.reindexing.ReindexingConfig;
import com.yahoo.vespa.curator.Curator;
import com.yahoo.vespa.zookeeper.VespaZooKeeperServer;
+import java.util.Collection;
+import java.util.List;
import java.util.concurrent.Executor;
import static com.yahoo.jdisc.http.HttpRequest.Method.GET;
@@ -34,6 +36,7 @@ import static com.yahoo.jdisc.http.HttpRequest.Method.GET;
public class ReindexingV1ApiHandler extends ThreadedHttpRequestHandler {
private final ReindexingCurator database;
+ private final List<String> clusterNames;
@Inject
public ReindexingV1ApiHandler(Executor executor, Metric metric,
@@ -41,14 +44,15 @@ public class ReindexingV1ApiHandler extends ThreadedHttpRequestHandler {
ReindexingConfig reindexingConfig, DocumentmanagerConfig documentmanagerConfig) {
this(executor,
metric,
+ reindexingConfig.clusters().keySet(),
new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()),
- reindexingConfig.clusterName(),
new DocumentTypeManager(documentmanagerConfig)));
}
- ReindexingV1ApiHandler(Executor executor, Metric metric, ReindexingCurator database) {
+ ReindexingV1ApiHandler(Executor executor, Metric metric, Collection<String> clusterNames, ReindexingCurator database) {
super(executor, metric);
this.database = database;
+ this.clusterNames = List.copyOf(clusterNames);
}
@Override
@@ -71,16 +75,18 @@ public class ReindexingV1ApiHandler extends ThreadedHttpRequestHandler {
HttpResponse getStatus() {
Slime slime = new Slime();
- Cursor statusArray = slime.setObject().setArray("status");
- database.readReindexing().status().forEach((type, status) -> {
- Cursor statusObject = statusArray.addObject();
- statusObject.setString("type", type.getName());
- statusObject.setLong("startedMillis", status.startedAt().toEpochMilli());
- status.endedAt().ifPresent(endedAt -> statusObject.setLong("endedMillis", endedAt.toEpochMilli()));
- status.progress().ifPresent(progress -> statusObject.setString("progress", progress.serializeToString()));
- statusObject.setString("state", toString(status.state()));
- status.message().ifPresent(message -> statusObject.setString("message", message));
- });
+ Cursor clustersObject = slime.setObject().setObject("clusters");
+ for (String clusterName : clusterNames) {
+ Cursor documentTypesObject = clustersObject.setObject(clusterName).setObject("documentTypes");
+ database.readReindexing(clusterName).status().forEach((type, status) -> {
+ Cursor statusObject = documentTypesObject.setObject(type.getName());
+ statusObject.setLong("startedMillis", status.startedAt().toEpochMilli());
+ status.endedAt().ifPresent(endedAt -> statusObject.setLong("endedMillis", endedAt.toEpochMilli()));
+ status.progress().ifPresent(progress -> statusObject.setString("progress", progress.serializeToString()));
+ statusObject.setString("state", toString(status.state()));
+ status.message().ifPresent(message -> statusObject.setString("message", message));
+ });
+ }
return new SlimeJsonResponse(slime);
}
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
index 3ba4083121c..7086c36af3f 100644
--- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
+++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
@@ -53,7 +53,7 @@ class ReindexerTest {
@BeforeEach
void setUp() {
- database = new ReindexingCurator(new MockCurator(), "cluster", manager, Duration.ofMillis(1));
+ database = new ReindexingCurator(new MockCurator(), manager, Duration.ofMillis(1));
}
@Test
@@ -70,7 +70,7 @@ class ReindexerTest {
@Test
void throwsWhenLockHeldElsewhere() throws InterruptedException, ExecutionException {
Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock);
- Executors.newSingleThreadExecutor().submit(database::lockReindexing).get();
+ Executors.newSingleThreadExecutor().submit(() -> database.lockReindexing("cluster")).get();
assertThrows(ReindexingLockException.class, reindexer::reindex);
}
@@ -102,7 +102,7 @@ class ReindexerTest {
// Since "music" is a new document type, it is stored as just reindexed, and nothing else happens.
new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock).reindex();
Reindexing reindexing = Reindexing.empty().with(music, Status.ready(Instant.EPOCH).running().successful(Instant.EPOCH));
- assertEquals(reindexing, database.readReindexing());
+ assertEquals(reindexing, database.readReindexing("cluster"));
assertEquals(Map.of("reindexing.progress", Map.of(Map.of("documenttype", "music",
"clusterid", "cluster",
"state", "successful"),
@@ -125,19 +125,19 @@ class ReindexerTest {
// Nothing happens, since it's not yet time. This isn't supposed to happen unless high clock skew.
clock.advance(Duration.ofMillis(5));
new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, failIfCalled, metric, clock).reindex();
- assertEquals(reindexing, database.readReindexing());
+ assertEquals(reindexing, database.readReindexing("cluster"));
// It's time to reindex the "music" documents — let this complete successfully.
clock.advance(Duration.ofMillis(10));
AtomicBoolean shutDown = new AtomicBoolean();
Executor executor = Executors.newSingleThreadExecutor();
new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, parameters -> {
- database.writeReindexing(Reindexing.empty()); // Wipe database to verify we write data from reindexer.
+ database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer.
executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK"));
return () -> shutDown.set(true);
}, metric, clock).reindex();
reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant()));
- assertEquals(reindexing, database.readReindexing());
+ assertEquals(reindexing, database.readReindexing("cluster"));
assertTrue(shutDown.get(), "Session was shut down");
// One more reindexing, this time shut down before visit completes, but after progress is reported.
@@ -146,7 +146,7 @@ class ReindexerTest {
shutDown.set(false);
AtomicReference<Reindexer> aborted = new AtomicReference<>();
aborted.set(new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, parameters -> {
- database.writeReindexing(Reindexing.empty()); // Wipe database to verify we write data from reindexer.
+ database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer.
parameters.getControlHandler().onProgress(new ProgressToken());
aborted.get().shutdown();
return () -> {
@@ -156,7 +156,7 @@ class ReindexerTest {
}, metric, clock));
aborted.get().reindex();
reindexing = reindexing.with(music, Status.ready(clock.instant()).running().progressed(new ProgressToken()).halted());
- assertEquals(reindexing, database.readReindexing());
+ assertEquals(reindexing, database.readReindexing("cluster"));
assertTrue(shutDown.get(), "Session was shut down");
assertEquals(1.0, // new ProgressToken() is 100% done.
metric.metrics().get("reindexing.progress")
@@ -168,17 +168,17 @@ class ReindexerTest {
clock.advance(Duration.ofMillis(10));
shutDown.set(false);
new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, parameters -> {
- database.writeReindexing(Reindexing.empty()); // Wipe database to verify we write data from reindexer.
+ database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer.
executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "Error"));
return () -> shutDown.set(true);
}, metric, clock).reindex();
reindexing = reindexing.with(music, Status.ready(clock.instant()).running().failed(clock.instant(), "Error"));
- assertEquals(reindexing, database.readReindexing());
+ assertEquals(reindexing, database.readReindexing("cluster"));
assertTrue(shutDown.get(), "Session was shut down");
// Document type is ignored in next run, as it has failed fatally.
new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, failIfCalled, metric, clock).reindex();
- assertEquals(reindexing, database.readReindexing());
+ assertEquals(reindexing, database.readReindexing("cluster"));
}
}
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java
index c5a58dcae68..7d4cb2af47e 100644
--- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java
+++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java
@@ -25,24 +25,24 @@ class ReindexingCuratorTest {
DocumentTypeManager manager = new DocumentTypeManager(musicConfig);
DocumentType music = manager.getDocumentType("music");
MockCurator mockCurator = new MockCurator();
- ReindexingCurator curator = new ReindexingCurator(mockCurator, "cluster", manager);
+ ReindexingCurator curator = new ReindexingCurator(mockCurator, manager);
- assertEquals(Reindexing.empty(), curator.readReindexing());
+ assertEquals(Reindexing.empty(), curator.readReindexing("cluster"));
Reindexing.Status status = Reindexing.Status.ready(Instant.ofEpochMilli(123))
.running()
.progressed(new ProgressToken());
Reindexing reindexing = Reindexing.empty().with(music, status);
- curator.writeReindexing(reindexing);
- assertEquals(reindexing, curator.readReindexing());
+ curator.writeReindexing(reindexing, "cluster");
+ assertEquals(reindexing, curator.readReindexing("cluster"));
status = status.halted().running().failed(Instant.ofEpochMilli(321), "error");
reindexing = reindexing.with(music, status);
- curator.writeReindexing(reindexing);
- assertEquals(reindexing, curator.readReindexing());
+ curator.writeReindexing(reindexing, "cluster");
+ assertEquals(reindexing, curator.readReindexing("cluster"));
// Unknown document types are forgotten.
- assertEquals(Reindexing.empty(), new ReindexingCurator(mockCurator, "cluster", new DocumentTypeManager(emptyConfig)).readReindexing());
+ assertEquals(Reindexing.empty(), new ReindexingCurator(mockCurator, new DocumentTypeManager(emptyConfig)).readReindexing("cluster"));
}
}
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java
index 713fb836d62..afa68debadb 100644
--- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java
+++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java
@@ -31,17 +31,15 @@ class ReindexingMaintainerTest {
DocumentTypeManager manager = new DocumentTypeManager(musicConfig);
assertEquals(Map.of(manager.getDocumentType("music"), Instant.ofEpochMilli(123)),
- parseReady(new ReindexingConfig.Builder()
- .enabled(true)
- .clusterName("cluster")
- .status("music", new ReindexingConfig.Status.Builder().readyAtMillis(123))
+ parseReady(new ReindexingConfig.Clusters.Builder()
+ .documentTypes("music", new ReindexingConfig.Clusters.DocumentTypes.Builder().readyAtMillis(123))
.build(),
manager));
// Unknown document type fails
- assertThrows(IllegalArgumentException.class,
- () -> parseReady(new ReindexingConfig.Builder()
- .status("poetry", new ReindexingConfig.Status.Builder().readyAtMillis(123))
+ assertThrows(NullPointerException.class,
+ () -> parseReady(new ReindexingConfig.Clusters.Builder()
+ .documentTypes("poetry", new ReindexingConfig.Clusters.DocumentTypes.Builder().readyAtMillis(123))
.build(),
manager));
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/http/ReindexingV1ApiTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/http/ReindexingV1ApiTest.java
index 1b6379d21e5..b8f62050347 100644
--- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/http/ReindexingV1ApiTest.java
+++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/http/ReindexingV1ApiTest.java
@@ -15,6 +15,7 @@ import com.yahoo.vespa.curator.mock.MockCurator;
import org.junit.jupiter.api.Test;
import java.time.Instant;
+import java.util.List;
import java.util.concurrent.Executors;
import static com.yahoo.jdisc.http.HttpRequest.Method.POST;
@@ -28,8 +29,9 @@ class ReindexingV1ApiTest {
DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build();
DocumentTypeManager manager = new DocumentTypeManager(musicConfig);
DocumentType musicType = manager.getDocumentType("music");
- ReindexingCurator database = new ReindexingCurator(new MockCurator(), "cluster", manager);
- ReindexingV1ApiHandler handler = new ReindexingV1ApiHandler(Executors.newSingleThreadExecutor(), new MockMetric(), database);
+ ReindexingCurator database = new ReindexingCurator(new MockCurator(), manager);
+ ReindexingV1ApiHandler handler = new ReindexingV1ApiHandler(Executors.newSingleThreadExecutor(), new MockMetric(),
+ List.of("cluster", "oyster"), database);
@Test
void testResponses() {
@@ -43,23 +45,33 @@ class ReindexingV1ApiTest {
// GET at status with empty database
response = driver.sendRequest("http://localhost/reindexing/v1/status");
- assertEquals("{\"status\":[]}", response.readAll());
+ assertEquals("{\"clusters\":{\"cluster\":{\"documentTypes\":{}},\"oyster\":{\"documentTypes\":{}}}}", response.readAll());
assertEquals(200, response.getStatus());
// GET at status with a failed status
database.writeReindexing(Reindexing.empty().with(musicType, Status.ready(Instant.EPOCH)
.running()
.progressed(new ProgressToken())
- .failed(Instant.ofEpochMilli(123), "ヽ(。_°)ノ")));
+ .failed(Instant.ofEpochMilli(123), "ヽ(。_°)ノ")),
+ "cluster");
response = driver.sendRequest("http://localhost/reindexing/v1/status");
- assertEquals("{\"status\":[{" +
- "\"type\":\"music\"," +
- "\"startedMillis\":0," +
- "\"endedMillis\":123," +
- "\"progress\":\"" + new ProgressToken().serializeToString() + "\"," +
- "\"state\":\"failed\"," +
- "\"message\":\"ヽ(。_°)ノ\"}" +
- "]}",
+ assertEquals("{" +
+ "\"clusters\":{" +
+ "\"cluster\":{" +
+ "\"documentTypes\":{" +
+ "\"music\":{" +
+ "\"startedMillis\":0," +
+ "\"endedMillis\":123," +
+ "\"progress\":\"" + new ProgressToken().serializeToString() + "\"," +
+ "\"state\":\"failed\"," +
+ "\"message\":\"ヽ(。_°)ノ\"}" +
+ "}" +
+ "}," +
+ "\"oyster\":{" +
+ "\"documentTypes\":{}" +
+ "}" +
+ "}" +
+ "}",
response.readAll());
assertEquals(200, response.getStatus());