aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2020-12-04 23:26:35 +0100
committerGitHub <noreply@github.com>2020-12-04 23:26:35 +0100
commit638b4f6cc2b5c5926c4802ef407c596649e169c8 (patch)
treef794039500efc7990246e96c81668d9baea3d885
parenta90709008ec0d108ee9a2e26bda20e39a10424b5 (diff)
Revert "Jonmv/reindexing over multiple clusters"
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java10
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java26
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java60
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java30
-rw-r--r--clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java22
-rw-r--r--clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java14
-rw-r--r--clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java12
-rw-r--r--clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/http/ReindexingV1ApiTest.java36
-rw-r--r--configdefinitions/src/vespa/reindexing.def7
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java51
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java57
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java62
12 files changed, 137 insertions, 250 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
index eaca5e3a847..ef5e676e36b 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
@@ -91,10 +91,10 @@ public class Reindexer {
if (phaser.isTerminated())
throw new IllegalStateException("Already shut down");
- try (Lock lock = database.lockReindexing(cluster.name())) {
- AtomicReference<Reindexing> reindexing = new AtomicReference<>(database.readReindexing(cluster.name()));
+ try (Lock lock = database.lockReindexing()) {
+ AtomicReference<Reindexing> reindexing = new AtomicReference<>(database.readReindexing());
reindexing.set(updateWithReady(ready, reindexing.get(), clock.instant()));
- database.writeReindexing(reindexing.get(), cluster.name());
+ database.writeReindexing(reindexing.get());
metrics.dump(reindexing.get());
for (DocumentType type : ready.keySet()) { // We consider only document types for which we have config.
@@ -150,7 +150,7 @@ public class Reindexer {
status.updateAndGet(value -> value.progressed(token));
if (progressLastStored.get().isBefore(clock.instant().minusSeconds(10))) {
progressLastStored.set(clock.instant());
- database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get())), cluster.name());
+ database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get())));
metrics.dump(reindexing.get());
}
}
@@ -184,7 +184,7 @@ public class Reindexer {
log.log(INFO, "Completed reindexing of " + type + " after " + Duration.between(status.get().startedAt(), clock.instant()));
status.updateAndGet(value -> value.successful(clock.instant()));
}
- database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get())), cluster.name());
+ database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get())));
metrics.dump(reindexing.get());
}
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
index 22ae54fcc6b..5336275a9c0 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
@@ -29,41 +29,43 @@ import static java.util.stream.Collectors.toUnmodifiableMap;
public class ReindexingCurator {
private final Curator curator;
+ private final String clusterName;
private final ReindexingSerializer serializer;
private final Duration lockTimeout;
- public ReindexingCurator(Curator curator, DocumentTypeManager manager) {
- this(curator, manager, Duration.ofSeconds(1));
+ public ReindexingCurator(Curator curator, String clusterName, DocumentTypeManager manager) {
+ this(curator, clusterName, manager, Duration.ofSeconds(1));
}
- ReindexingCurator(Curator curator, DocumentTypeManager manager, Duration lockTimeout) {
+ ReindexingCurator(Curator curator, String clusterName, DocumentTypeManager manager, Duration lockTimeout) {
this.curator = curator;
+ this.clusterName = clusterName;
this.serializer = new ReindexingSerializer(manager);
this.lockTimeout = lockTimeout;
}
- public Reindexing readReindexing(String cluster) {
- return curator.getData(statusPath(cluster)).map(serializer::deserialize)
+ public Reindexing readReindexing() {
+ return curator.getData(statusPath()).map(serializer::deserialize)
.orElse(Reindexing.empty());
}
- public void writeReindexing(Reindexing reindexing, String cluster) {
- curator.set(statusPath(cluster), serializer.serialize(reindexing));
+ public void writeReindexing(Reindexing reindexing) {
+ curator.set(statusPath(), serializer.serialize(reindexing));
}
/** This lock must be held to manipulate reindexing state, or by whoever has a running visitor. */
- public Lock lockReindexing(String cluster) throws ReindexingLockException {
+ public Lock lockReindexing() throws ReindexingLockException {
try {
- return curator.lock(lockPath(cluster), lockTimeout);
+ return curator.lock(lockPath(), lockTimeout);
}
catch (UncheckedTimeoutException e) { // TODO jonmv: Avoid use of guava classes.
throw new ReindexingLockException(e);
}
}
- private Path rootPath(String clusterName) { return Path.fromString("/reindexing/v1/" + clusterName); }
- private Path statusPath(String clusterName) { return rootPath(clusterName).append("status"); }
- private Path lockPath(String clusterName) { return rootPath(clusterName).append("lock"); }
+ private Path rootPath() { return Path.fromString("/reindexing/v1/" + clusterName); }
+ private Path statusPath() { return rootPath().append("status"); }
+ private Path lockPath() { return rootPath().append("lock"); }
private static class ReindexingSerializer {
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
index 9a114eabbb5..7989338c406 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
@@ -29,14 +29,12 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.logging.Logger;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.WARNING;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
-import static java.util.stream.Collectors.toUnmodifiableList;
import static java.util.stream.Collectors.toUnmodifiableMap;
/**
@@ -50,7 +48,7 @@ public class ReindexingMaintainer extends AbstractComponent {
private static final Logger log = Logger.getLogger(Reindexing.class.getName());
- private final List<Reindexer> reindexers;
+ private final Reindexer reindexer;
private final ScheduledExecutorService executor;
@Inject
@@ -65,57 +63,51 @@ public class ReindexingMaintainer extends AbstractComponent {
ReindexingMaintainer(Clock clock, Metric metric, DocumentAccess access, ZookeepersConfig zookeepersConfig,
ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig,
ReindexingConfig reindexingConfig) {
- this.reindexers = reindexingConfig.clusters().entrySet().stream()
- .map(cluster -> new Reindexer(parseCluster(cluster.getKey(), clusterListConfig, allClustersBucketSpacesConfig, access.getDocumentTypeManager()),
- parseReady(cluster.getValue(), access.getDocumentTypeManager()),
- new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()),
- access.getDocumentTypeManager()),
- access,
- metric,
- clock))
- .collect(toUnmodifiableList());
- this.executor = new ScheduledThreadPoolExecutor(reindexingConfig.clusters().size(), new DaemonThreadFactory("reindexer-"));
+ this.reindexer = new Reindexer(parseCluster(reindexingConfig.clusterName(), clusterListConfig, allClustersBucketSpacesConfig, access.getDocumentTypeManager()),
+ parseReady(reindexingConfig, access.getDocumentTypeManager()),
+ new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()),
+ reindexingConfig.clusterName(),
+ access.getDocumentTypeManager()),
+ access,
+ metric,
+ clock);
+ this.executor = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("reindexer-"));
if (reindexingConfig.enabled())
scheduleStaggered((delayMillis, intervalMillis) -> executor.scheduleAtFixedRate(this::maintain, delayMillis, intervalMillis, TimeUnit.MILLISECONDS),
Duration.ofMinutes(1), clock.instant(), HostName.getLocalhost(), zookeepersConfig.zookeeperserverlist());
}
private void maintain() {
- for (Reindexer reindexer : reindexers)
- executor.submit(() -> {
- try {
- reindexer.reindex();
- }
- catch (ReindexingLockException e) {
- log.log(FINE, "Failed to acquire reindexing lock");
- }
- catch (Exception e) {
- log.log(WARNING, "Exception when reindexing", e);
- }
- });
+ try {
+ reindexer.reindex();
+ }
+ catch (ReindexingLockException e) {
+ log.log(FINE, "Failed to acquire reindexing lock");
+ }
+ catch (Exception e) {
+ log.log(WARNING, "Exception when reindexing", e);
+ }
}
@Override
public void deconstruct() {
try {
- for (Reindexer reindexer : reindexers)
- reindexer.shutdown();
-
+ reindexer.shutdown();
executor.shutdown();
if ( ! executor.awaitTermination(45, TimeUnit.SECONDS))
- log.log(WARNING, "Failed to shut down reindexing within timeout");
+ log.log(WARNING, "Failed to shut down reindexer within timeout");
}
catch (InterruptedException e) {
- log.log(WARNING, "Interrupted while waiting for reindexing to shut down");
+ log.log(WARNING, "Interrupted while waiting for reindexer to shut down");
Thread.currentThread().interrupt();
}
}
- static Map<DocumentType, Instant> parseReady(ReindexingConfig.Clusters cluster, DocumentTypeManager manager) {
- return cluster.documentTypes().entrySet().stream()
- .collect(toUnmodifiableMap(typeStatus -> manager.getDocumentType(typeStatus.getKey()),
- typeStatus -> Instant.ofEpochMilli(typeStatus.getValue().readyAtMillis())));
+ static Map<DocumentType, Instant> parseReady(ReindexingConfig config, DocumentTypeManager manager) {
+ return config.status().entrySet().stream()
+ .collect(toUnmodifiableMap(typeStatus -> manager.getDocumentType(typeStatus.getKey()),
+ typeStatus -> Instant.ofEpochMilli(typeStatus.getValue().readyAtMillis())));
}
/** Schedules a task with the given interval (across all containers in this ZK cluster). */
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java
index b1c0d012325..fca08f7743c 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java
@@ -22,8 +22,6 @@ import com.yahoo.vespa.config.content.reindexing.ReindexingConfig;
import com.yahoo.vespa.curator.Curator;
import com.yahoo.vespa.zookeeper.VespaZooKeeperServer;
-import java.util.Collection;
-import java.util.List;
import java.util.concurrent.Executor;
import static com.yahoo.jdisc.http.HttpRequest.Method.GET;
@@ -36,7 +34,6 @@ import static com.yahoo.jdisc.http.HttpRequest.Method.GET;
public class ReindexingV1ApiHandler extends ThreadedHttpRequestHandler {
private final ReindexingCurator database;
- private final List<String> clusterNames;
@Inject
public ReindexingV1ApiHandler(Executor executor, Metric metric,
@@ -44,15 +41,14 @@ public class ReindexingV1ApiHandler extends ThreadedHttpRequestHandler {
ReindexingConfig reindexingConfig, DocumentmanagerConfig documentmanagerConfig) {
this(executor,
metric,
- reindexingConfig.clusters().keySet(),
new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()),
+ reindexingConfig.clusterName(),
new DocumentTypeManager(documentmanagerConfig)));
}
- ReindexingV1ApiHandler(Executor executor, Metric metric, Collection<String> clusterNames, ReindexingCurator database) {
+ ReindexingV1ApiHandler(Executor executor, Metric metric, ReindexingCurator database) {
super(executor, metric);
this.database = database;
- this.clusterNames = List.copyOf(clusterNames);
}
@Override
@@ -75,18 +71,16 @@ public class ReindexingV1ApiHandler extends ThreadedHttpRequestHandler {
HttpResponse getStatus() {
Slime slime = new Slime();
- Cursor clustersObject = slime.setObject().setObject("clusters");
- for (String clusterName : clusterNames) {
- Cursor documentTypesObject = clustersObject.setObject(clusterName).setObject("documentTypes");
- database.readReindexing(clusterName).status().forEach((type, status) -> {
- Cursor statusObject = documentTypesObject.setObject(type.getName());
- statusObject.setLong("startedMillis", status.startedAt().toEpochMilli());
- status.endedAt().ifPresent(endedAt -> statusObject.setLong("endedMillis", endedAt.toEpochMilli()));
- status.progress().ifPresent(progress -> statusObject.setString("progress", progress.serializeToString()));
- statusObject.setString("state", toString(status.state()));
- status.message().ifPresent(message -> statusObject.setString("message", message));
- });
- }
+ Cursor statusArray = slime.setObject().setArray("status");
+ database.readReindexing().status().forEach((type, status) -> {
+ Cursor statusObject = statusArray.addObject();
+ statusObject.setString("type", type.getName());
+ statusObject.setLong("startedMillis", status.startedAt().toEpochMilli());
+ status.endedAt().ifPresent(endedAt -> statusObject.setLong("endedMillis", endedAt.toEpochMilli()));
+ status.progress().ifPresent(progress -> statusObject.setString("progress", progress.serializeToString()));
+ statusObject.setString("state", toString(status.state()));
+ status.message().ifPresent(message -> statusObject.setString("message", message));
+ });
return new SlimeJsonResponse(slime);
}
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
index 7086c36af3f..3ba4083121c 100644
--- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
+++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
@@ -53,7 +53,7 @@ class ReindexerTest {
@BeforeEach
void setUp() {
- database = new ReindexingCurator(new MockCurator(), manager, Duration.ofMillis(1));
+ database = new ReindexingCurator(new MockCurator(), "cluster", manager, Duration.ofMillis(1));
}
@Test
@@ -70,7 +70,7 @@ class ReindexerTest {
@Test
void throwsWhenLockHeldElsewhere() throws InterruptedException, ExecutionException {
Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock);
- Executors.newSingleThreadExecutor().submit(() -> database.lockReindexing("cluster")).get();
+ Executors.newSingleThreadExecutor().submit(database::lockReindexing).get();
assertThrows(ReindexingLockException.class, reindexer::reindex);
}
@@ -102,7 +102,7 @@ class ReindexerTest {
// Since "music" is a new document type, it is stored as just reindexed, and nothing else happens.
new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock).reindex();
Reindexing reindexing = Reindexing.empty().with(music, Status.ready(Instant.EPOCH).running().successful(Instant.EPOCH));
- assertEquals(reindexing, database.readReindexing("cluster"));
+ assertEquals(reindexing, database.readReindexing());
assertEquals(Map.of("reindexing.progress", Map.of(Map.of("documenttype", "music",
"clusterid", "cluster",
"state", "successful"),
@@ -125,19 +125,19 @@ class ReindexerTest {
// Nothing happens, since it's not yet time. This isn't supposed to happen unless high clock skew.
clock.advance(Duration.ofMillis(5));
new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, failIfCalled, metric, clock).reindex();
- assertEquals(reindexing, database.readReindexing("cluster"));
+ assertEquals(reindexing, database.readReindexing());
// It's time to reindex the "music" documents — let this complete successfully.
clock.advance(Duration.ofMillis(10));
AtomicBoolean shutDown = new AtomicBoolean();
Executor executor = Executors.newSingleThreadExecutor();
new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, parameters -> {
- database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer.
+ database.writeReindexing(Reindexing.empty()); // Wipe database to verify we write data from reindexer.
executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK"));
return () -> shutDown.set(true);
}, metric, clock).reindex();
reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant()));
- assertEquals(reindexing, database.readReindexing("cluster"));
+ assertEquals(reindexing, database.readReindexing());
assertTrue(shutDown.get(), "Session was shut down");
// One more reindexing, this time shut down before visit completes, but after progress is reported.
@@ -146,7 +146,7 @@ class ReindexerTest {
shutDown.set(false);
AtomicReference<Reindexer> aborted = new AtomicReference<>();
aborted.set(new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, parameters -> {
- database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer.
+ database.writeReindexing(Reindexing.empty()); // Wipe database to verify we write data from reindexer.
parameters.getControlHandler().onProgress(new ProgressToken());
aborted.get().shutdown();
return () -> {
@@ -156,7 +156,7 @@ class ReindexerTest {
}, metric, clock));
aborted.get().reindex();
reindexing = reindexing.with(music, Status.ready(clock.instant()).running().progressed(new ProgressToken()).halted());
- assertEquals(reindexing, database.readReindexing("cluster"));
+ assertEquals(reindexing, database.readReindexing());
assertTrue(shutDown.get(), "Session was shut down");
assertEquals(1.0, // new ProgressToken() is 100% done.
metric.metrics().get("reindexing.progress")
@@ -168,17 +168,17 @@ class ReindexerTest {
clock.advance(Duration.ofMillis(10));
shutDown.set(false);
new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, parameters -> {
- database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer.
+ database.writeReindexing(Reindexing.empty()); // Wipe database to verify we write data from reindexer.
executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "Error"));
return () -> shutDown.set(true);
}, metric, clock).reindex();
reindexing = reindexing.with(music, Status.ready(clock.instant()).running().failed(clock.instant(), "Error"));
- assertEquals(reindexing, database.readReindexing("cluster"));
+ assertEquals(reindexing, database.readReindexing());
assertTrue(shutDown.get(), "Session was shut down");
// Document type is ignored in next run, as it has failed fatally.
new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, failIfCalled, metric, clock).reindex();
- assertEquals(reindexing, database.readReindexing("cluster"));
+ assertEquals(reindexing, database.readReindexing());
}
}
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java
index 7d4cb2af47e..c5a58dcae68 100644
--- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java
+++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java
@@ -25,24 +25,24 @@ class ReindexingCuratorTest {
DocumentTypeManager manager = new DocumentTypeManager(musicConfig);
DocumentType music = manager.getDocumentType("music");
MockCurator mockCurator = new MockCurator();
- ReindexingCurator curator = new ReindexingCurator(mockCurator, manager);
+ ReindexingCurator curator = new ReindexingCurator(mockCurator, "cluster", manager);
- assertEquals(Reindexing.empty(), curator.readReindexing("cluster"));
+ assertEquals(Reindexing.empty(), curator.readReindexing());
Reindexing.Status status = Reindexing.Status.ready(Instant.ofEpochMilli(123))
.running()
.progressed(new ProgressToken());
Reindexing reindexing = Reindexing.empty().with(music, status);
- curator.writeReindexing(reindexing, "cluster");
- assertEquals(reindexing, curator.readReindexing("cluster"));
+ curator.writeReindexing(reindexing);
+ assertEquals(reindexing, curator.readReindexing());
status = status.halted().running().failed(Instant.ofEpochMilli(321), "error");
reindexing = reindexing.with(music, status);
- curator.writeReindexing(reindexing, "cluster");
- assertEquals(reindexing, curator.readReindexing("cluster"));
+ curator.writeReindexing(reindexing);
+ assertEquals(reindexing, curator.readReindexing());
// Unknown document types are forgotten.
- assertEquals(Reindexing.empty(), new ReindexingCurator(mockCurator, new DocumentTypeManager(emptyConfig)).readReindexing("cluster"));
+ assertEquals(Reindexing.empty(), new ReindexingCurator(mockCurator, "cluster", new DocumentTypeManager(emptyConfig)).readReindexing());
}
}
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java
index afa68debadb..713fb836d62 100644
--- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java
+++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java
@@ -31,15 +31,17 @@ class ReindexingMaintainerTest {
DocumentTypeManager manager = new DocumentTypeManager(musicConfig);
assertEquals(Map.of(manager.getDocumentType("music"), Instant.ofEpochMilli(123)),
- parseReady(new ReindexingConfig.Clusters.Builder()
- .documentTypes("music", new ReindexingConfig.Clusters.DocumentTypes.Builder().readyAtMillis(123))
+ parseReady(new ReindexingConfig.Builder()
+ .enabled(true)
+ .clusterName("cluster")
+ .status("music", new ReindexingConfig.Status.Builder().readyAtMillis(123))
.build(),
manager));
// Unknown document type fails
- assertThrows(NullPointerException.class,
- () -> parseReady(new ReindexingConfig.Clusters.Builder()
- .documentTypes("poetry", new ReindexingConfig.Clusters.DocumentTypes.Builder().readyAtMillis(123))
+ assertThrows(IllegalArgumentException.class,
+ () -> parseReady(new ReindexingConfig.Builder()
+ .status("poetry", new ReindexingConfig.Status.Builder().readyAtMillis(123))
.build(),
manager));
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/http/ReindexingV1ApiTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/http/ReindexingV1ApiTest.java
index b8f62050347..1b6379d21e5 100644
--- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/http/ReindexingV1ApiTest.java
+++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/http/ReindexingV1ApiTest.java
@@ -15,7 +15,6 @@ import com.yahoo.vespa.curator.mock.MockCurator;
import org.junit.jupiter.api.Test;
import java.time.Instant;
-import java.util.List;
import java.util.concurrent.Executors;
import static com.yahoo.jdisc.http.HttpRequest.Method.POST;
@@ -29,9 +28,8 @@ class ReindexingV1ApiTest {
DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build();
DocumentTypeManager manager = new DocumentTypeManager(musicConfig);
DocumentType musicType = manager.getDocumentType("music");
- ReindexingCurator database = new ReindexingCurator(new MockCurator(), manager);
- ReindexingV1ApiHandler handler = new ReindexingV1ApiHandler(Executors.newSingleThreadExecutor(), new MockMetric(),
- List.of("cluster", "oyster"), database);
+ ReindexingCurator database = new ReindexingCurator(new MockCurator(), "cluster", manager);
+ ReindexingV1ApiHandler handler = new ReindexingV1ApiHandler(Executors.newSingleThreadExecutor(), new MockMetric(), database);
@Test
void testResponses() {
@@ -45,33 +43,23 @@ class ReindexingV1ApiTest {
// GET at status with empty database
response = driver.sendRequest("http://localhost/reindexing/v1/status");
- assertEquals("{\"clusters\":{\"cluster\":{\"documentTypes\":{}},\"oyster\":{\"documentTypes\":{}}}}", response.readAll());
+ assertEquals("{\"status\":[]}", response.readAll());
assertEquals(200, response.getStatus());
// GET at status with a failed status
database.writeReindexing(Reindexing.empty().with(musicType, Status.ready(Instant.EPOCH)
.running()
.progressed(new ProgressToken())
- .failed(Instant.ofEpochMilli(123), "ヽ(。_°)ノ")),
- "cluster");
+ .failed(Instant.ofEpochMilli(123), "ヽ(。_°)ノ")));
response = driver.sendRequest("http://localhost/reindexing/v1/status");
- assertEquals("{" +
- "\"clusters\":{" +
- "\"cluster\":{" +
- "\"documentTypes\":{" +
- "\"music\":{" +
- "\"startedMillis\":0," +
- "\"endedMillis\":123," +
- "\"progress\":\"" + new ProgressToken().serializeToString() + "\"," +
- "\"state\":\"failed\"," +
- "\"message\":\"ヽ(。_°)ノ\"}" +
- "}" +
- "}," +
- "\"oyster\":{" +
- "\"documentTypes\":{}" +
- "}" +
- "}" +
- "}",
+ assertEquals("{\"status\":[{" +
+ "\"type\":\"music\"," +
+ "\"startedMillis\":0," +
+ "\"endedMillis\":123," +
+ "\"progress\":\"" + new ProgressToken().serializeToString() + "\"," +
+ "\"state\":\"failed\"," +
+ "\"message\":\"ヽ(。_°)ノ\"}" +
+ "]}",
response.readAll());
assertEquals(200, response.getStatus());
diff --git a/configdefinitions/src/vespa/reindexing.def b/configdefinitions/src/vespa/reindexing.def
index e020aec3f65..d577f62b10b 100644
--- a/configdefinitions/src/vespa/reindexing.def
+++ b/configdefinitions/src/vespa/reindexing.def
@@ -6,13 +6,8 @@ namespace=vespa.config.content.reindexing
# Whether reindexing should run at all
enabled bool default=false
-# TODO jonmv: remove after 7.310 is gone
# The name of the content cluster to reindex documents from
-clusterName string default=""
+clusterName string
-# TODO jonmv: remove after 7.310 is gone
# Epoch millis after which latest reprocessing may begin, per document type
status{}.readyAtMillis long
-
-# Epoch millis after which latest reprocessing may begin, per document type, per cluster
-clusters{}.documentTypes{}.readyAtMillis long
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java
index 4b7148463f9..7f0671820d3 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java
@@ -27,26 +27,6 @@ public class ClusterReindexing {
public Map<String, Status> documentTypeStatus() { return documentTypeStatus; }
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- ClusterReindexing that = (ClusterReindexing) o;
- return documentTypeStatus.equals(that.documentTypeStatus);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(documentTypeStatus);
- }
-
- @Override
- public String toString() {
- return "ClusterReindexing{" +
- "documentTypeStatus=" + documentTypeStatus +
- '}';
- }
-
public static class Status {
@@ -69,35 +49,6 @@ public class ClusterReindexing {
public Optional<State> state() { return Optional.ofNullable(state); }
public Optional<String> message() { return Optional.ofNullable(message); }
public Optional<String> progress() { return Optional.ofNullable(progress); }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- Status status = (Status) o;
- return startedAt.equals(status.startedAt) &&
- Objects.equals(endedAt, status.endedAt) &&
- state == status.state &&
- Objects.equals(message, status.message) &&
- Objects.equals(progress, status.progress);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(startedAt, endedAt, state, message, progress);
- }
-
- @Override
- public String toString() {
- return "Status{" +
- "startedAt=" + startedAt +
- ", endedAt=" + endedAt +
- ", state=" + state +
- ", message='" + message + '\'' +
- ", progress='" + progress + '\'' +
- '}';
- }
-
}
@@ -116,7 +67,5 @@ public class ClusterReindexing {
}
public String asString() { return stringValue; }
-
}
-
}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java
index 0124b6822f0..fef0120a431 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java
@@ -55,21 +55,21 @@ public class DefaultClusterReindexingStatusClient implements ClusterReindexingSt
@Override
public Map<String, ClusterReindexing> getReindexingStatus(ModelResult application) throws IOException {
Map<ClusterId, List<ServiceInfo>> clusters = clusterControllerClusters(application);
- Map<ClusterId, CompletableFuture<Map<String, ClusterReindexing>>> futureStatusPerCluster = new HashMap<>();
+ Map<ClusterId, CompletableFuture<ClusterReindexing>> futureStatusPerCluster = new HashMap<>();
clusters.forEach((clusterId, clusterNodes) -> {
var parallelRequests = clusterNodes.stream()
.map(this::getReindexingStatus)
.collect(Collectors.toList());
- CompletableFuture<Map<String, ClusterReindexing>> combinedRequest = CompletableFutures.firstOf(parallelRequests);
+ CompletableFuture<ClusterReindexing> combinedRequest = CompletableFutures.firstOf(parallelRequests);
futureStatusPerCluster.put(clusterId, combinedRequest);
});
try {
Map<String, ClusterReindexing> statusPerCluster = new HashMap<>();
futureStatusPerCluster.forEach((clusterId, futureStatus) -> {
- statusPerCluster.putAll(futureStatus.join());
+ statusPerCluster.put(clusterId.s(), futureStatus.join());
});
- return Map.copyOf(statusPerCluster);
+ return statusPerCluster;
} catch (Exception e) {
throw new IOException("Failed to get reindexing status from cluster controllers: " + e.getMessage(), e);
}
@@ -77,7 +77,7 @@ public class DefaultClusterReindexingStatusClient implements ClusterReindexingSt
@Override public void close() { uncheck(() -> httpClient.close()); }
- private CompletableFuture<Map<String, ClusterReindexing>> getReindexingStatus(ServiceInfo service) {
+ private CompletableFuture<ClusterReindexing> getReindexingStatus(ServiceInfo service) {
URI uri = URI.create(String.format("http://%s:%d/reindexing/v1/status", service.getHostName(), getStatePort(service)));
CompletableFuture<SimpleHttpResponse> responsePromise = new CompletableFuture<>();
httpClient.execute(SimpleHttpRequests.get(uri), new FutureCallback<>() {
@@ -94,40 +94,33 @@ public class DefaultClusterReindexingStatusClient implements ClusterReindexingSt
}, executor);
}
- private static Map<String, ClusterReindexing> toClusterReindexing(SimpleHttpResponse response) throws IOException {
+ private static ClusterReindexing toClusterReindexing(SimpleHttpResponse response) throws IOException {
if (response.getCode() != HttpStatus.SC_OK) throw new IOException("Expected status code 200, got " + response.getCode());
if (response.getBody() == null) throw new IOException("Response has no content");
return toClusterReindexing(response.getBodyBytes());
}
- private static Map<String, ClusterReindexing> toClusterReindexing(byte[] requestBody) throws IOException {
+ private static ClusterReindexing toClusterReindexing(byte[] requestBody) throws IOException {
JsonNode jsonNode = mapper.readTree(requestBody);
- Map<String, ClusterReindexing> clusters = new HashMap<>();
- for (var clusterNames = jsonNode.get("clusters").fieldNames(); clusterNames.hasNext(); ) {
- String clusterName = clusterNames.next();
- JsonNode clusterJson = jsonNode.get("clusters").get(clusterName);
- Map<String, ClusterReindexing.Status> documentStatuses = new HashMap<>();
- for (var documentTypes = clusterJson.get("documentTypes").fieldNames(); documentTypes.hasNext(); ) {
- String type = documentTypes.next();
- JsonNode statusJson = clusterJson.get("documentTypes").get(type);
- Instant startedMillis = Instant.ofEpochMilli(statusJson.get("startedMillis").longValue());
- Instant endedMillis = Optional.ofNullable(statusJson.get("endedMillis"))
- .map(json -> Instant.ofEpochMilli(json.longValue()))
- .orElse(null);
- String progressToken = Optional.ofNullable(statusJson.get("progress"))
- .map(JsonNode::textValue)
- .orElse(null);
- ClusterReindexing.State state = Optional.ofNullable(statusJson.get("state"))
- .map(json -> ClusterReindexing.State.fromString(json.textValue()))
- .orElse(null);
- String message = Optional.ofNullable(statusJson.get("message"))
- .map(JsonNode::textValue)
- .orElse(null);
- documentStatuses.put(type, new ClusterReindexing.Status(startedMillis, endedMillis, state, message, progressToken));
- }
- clusters.put(clusterName, new ClusterReindexing(documentStatuses));
+ Map<String, ClusterReindexing.Status> documentStatuses = new HashMap<>();
+ for (JsonNode statusJson : jsonNode.get("status")) {
+ String type = statusJson.get("type").textValue();
+ Instant startedMillis = Instant.ofEpochMilli(statusJson.get("startedMillis").longValue());
+ Instant endedMillis = Optional.ofNullable(statusJson.get("endedMillis"))
+ .map(json -> Instant.ofEpochMilli(json.longValue()))
+ .orElse(null);
+ String progressToken = Optional.ofNullable(statusJson.get("progress"))
+ .map(JsonNode::textValue)
+ .orElse(null);
+ ClusterReindexing.State state = Optional.ofNullable(statusJson.get("state"))
+ .map(json -> ClusterReindexing.State.fromString(json.textValue()))
+ .orElse(null);
+ String message = Optional.ofNullable(statusJson.get("message"))
+ .map(JsonNode::textValue)
+ .orElse(null);
+ documentStatuses.put(type, new ClusterReindexing.Status(startedMillis, endedMillis, state, message, progressToken));
}
- return Map.copyOf(clusters);
+ return new ClusterReindexing(documentStatuses);
}
private static int getStatePort(ServiceInfo service) {
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java
index 82e1bd96373..21894e4a756 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java
@@ -7,12 +7,10 @@ import com.yahoo.config.model.api.PortInfo;
import com.yahoo.config.model.api.ServiceInfo;
import com.yahoo.documentapi.ProgressToken;
import com.yahoo.vespa.config.server.modelfactory.ModelResult;
-import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import java.io.IOException;
-import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -23,7 +21,6 @@ import static com.github.tomakehurst.wiremock.client.WireMock.serverError;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
import static com.yahoo.config.model.api.container.ContainerServiceType.CLUSTERCONTROLLER_CONTAINER;
-import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -43,50 +40,25 @@ public class DefaultClusterReindexingStatusClientTest {
String uriPath = "/reindexing/v1/status";
server1.stubFor(get(urlEqualTo(uriPath)).willReturn(serverError()));
server2.stubFor(get(urlEqualTo(uriPath)).willReturn(okJson(
- "{" +
- " \"clusters\": {" +
- " \"cluster1\": {" +
- " \"documentTypes\": {" +
- " \"music\": {" +
- " \"startedMillis\":0," +
- " \"state\": \"" + ClusterReindexing.State.RUNNING.asString() + "\"" +
- " }" +
- " }" +
- " }" +
- " }" +
- "}")));
+ "{\"status\":[{" +
+ "\"type\":\"music\"," +
+ "\"startedMillis\":0," +
+ "\"endedMillis\":123," +
+ "\"progress\":\"" + new ProgressToken().serializeToString() + "\"," +
+ "\"state\": \"" + ClusterReindexing.State.FAILED.asString() + "\"," +
+ "\"message\":\"something went wrong\"}" +
+ "]}")));
server3.stubFor(get(urlEqualTo(uriPath)).willReturn(okJson(
- "{" +
- " \"clusters\": {" +
- " \"cluster2\": {" +
- " \"documentTypes\": {" +
- " \"artist\": {" +
- " \"startedMillis\":50," +
- " \"endedMillis\":150," +
- " \"progress\":\"half-done\"," +
- " \"state\": \"" + ClusterReindexing.State.SUCCESSFUL.asString() + "\"," +
- " \"message\":\"success\"" +
- " }" +
- " }" +
- " }" +
- " }" +
- "}")));
- Map<String, ClusterReindexing> expected = Map.of("cluster1",
- new ClusterReindexing(Map.of("music",
- new ClusterReindexing.Status(Instant.ofEpochMilli(0),
- null,
- ClusterReindexing.State.RUNNING,
- null,
- null))),
- "cluster2",
- new ClusterReindexing(Map.of("artist",
- new ClusterReindexing.Status(Instant.ofEpochMilli(50),
- Instant.ofEpochMilli(150),
- ClusterReindexing.State.SUCCESSFUL,
- "success",
- "half-done"))));
+ "{\"status\":[{" +
+ "\"type\":\"artist\"," +
+ "\"startedMillis\":10," +
+ "\"endedMillis\":150," +
+ "\"progress\":\"" + new ProgressToken().serializeToString() + "\"," +
+ "\"state\": \"" + ClusterReindexing.State.SUCCESSFUL.asString() + "\"," +
+ "\"message\":\"successs\"}" +
+ "]}")));
Map<String, ClusterReindexing> result = client.getReindexingStatus(app);
- assertEquals(expected, result);
+ System.out.println(result);
}