diff options
author | Jon Marius Venstad <jonmv@users.noreply.github.com> | 2020-12-04 23:40:36 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-04 23:40:36 +0100 |
commit | e94865ec953f076d7dd75370f5d4260fc522a81c (patch) | |
tree | 55cdeb2b1020bc687388c38075d04366d4c75527 | |
parent | 57a44319f652817b5841d4d3d476a070235970f5 (diff) | |
parent | 638b4f6cc2b5c5926c4802ef407c596649e169c8 (diff) |
Merge pull request #15691 from vespa-engine/revert-15677-jonmv/reindexing-over-multiple-clusters
Revert "Jonmv/reindexing over multiple clusters"
12 files changed, 137 insertions, 250 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java index eaca5e3a847..ef5e676e36b 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java @@ -91,10 +91,10 @@ public class Reindexer { if (phaser.isTerminated()) throw new IllegalStateException("Already shut down"); - try (Lock lock = database.lockReindexing(cluster.name())) { - AtomicReference<Reindexing> reindexing = new AtomicReference<>(database.readReindexing(cluster.name())); + try (Lock lock = database.lockReindexing()) { + AtomicReference<Reindexing> reindexing = new AtomicReference<>(database.readReindexing()); reindexing.set(updateWithReady(ready, reindexing.get(), clock.instant())); - database.writeReindexing(reindexing.get(), cluster.name()); + database.writeReindexing(reindexing.get()); metrics.dump(reindexing.get()); for (DocumentType type : ready.keySet()) { // We consider only document types for which we have config. @@ -150,7 +150,7 @@ public class Reindexer { status.updateAndGet(value -> value.progressed(token)); if (progressLastStored.get().isBefore(clock.instant().minusSeconds(10))) { progressLastStored.set(clock.instant()); - database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get())), cluster.name()); + database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get()))); metrics.dump(reindexing.get()); } } @@ -184,7 +184,7 @@ public class Reindexer { log.log(INFO, "Completed reindexing of " + type + " after " + Duration.between(status.get().startedAt(), clock.instant())); status.updateAndGet(value -> value.successful(clock.instant())); } - database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get())), cluster.name()); + database.writeReindexing(reindexing.updateAndGet(value -> value.with(type, status.get()))); metrics.dump(reindexing.get()); } diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java index 22ae54fcc6b..5336275a9c0 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java @@ -29,41 +29,43 @@ import static java.util.stream.Collectors.toUnmodifiableMap; public class ReindexingCurator { private final Curator curator; + private final String clusterName; private final ReindexingSerializer serializer; private final Duration lockTimeout; - public ReindexingCurator(Curator curator, DocumentTypeManager manager) { - this(curator, manager, Duration.ofSeconds(1)); + public ReindexingCurator(Curator curator, String clusterName, DocumentTypeManager manager) { + this(curator, clusterName, manager, Duration.ofSeconds(1)); } - ReindexingCurator(Curator curator, DocumentTypeManager manager, Duration lockTimeout) { + ReindexingCurator(Curator curator, String clusterName, DocumentTypeManager manager, Duration lockTimeout) { this.curator = curator; + this.clusterName = clusterName; this.serializer = new ReindexingSerializer(manager); this.lockTimeout = lockTimeout; } - public Reindexing readReindexing(String cluster) { - return curator.getData(statusPath(cluster)).map(serializer::deserialize) + public Reindexing readReindexing() { + return curator.getData(statusPath()).map(serializer::deserialize) .orElse(Reindexing.empty()); } - public void writeReindexing(Reindexing reindexing, String cluster) { - curator.set(statusPath(cluster), serializer.serialize(reindexing)); + public void writeReindexing(Reindexing reindexing) { + curator.set(statusPath(), serializer.serialize(reindexing)); } /** This lock must be held to manipulate reindexing state, or by whoever has a running visitor. */ - public Lock lockReindexing(String cluster) throws ReindexingLockException { + public Lock lockReindexing() throws ReindexingLockException { try { - return curator.lock(lockPath(cluster), lockTimeout); + return curator.lock(lockPath(), lockTimeout); } catch (UncheckedTimeoutException e) { // TODO jonmv: Avoid use of guava classes. throw new ReindexingLockException(e); } } - private Path rootPath(String clusterName) { return Path.fromString("/reindexing/v1/" + clusterName); } - private Path statusPath(String clusterName) { return rootPath(clusterName).append("status"); } - private Path lockPath(String clusterName) { return rootPath(clusterName).append("lock"); } + private Path rootPath() { return Path.fromString("/reindexing/v1/" + clusterName); } + private Path statusPath() { return rootPath().append("status"); } + private Path lockPath() { return rootPath().append("lock"); } private static class ReindexingSerializer { diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java index 9a114eabbb5..7989338c406 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java @@ -29,14 +29,12 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.logging.Logger; -import java.util.stream.Collectors; import java.util.stream.Stream; import static java.util.logging.Level.FINE; import static java.util.logging.Level.WARNING; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; -import static java.util.stream.Collectors.toUnmodifiableList; import static java.util.stream.Collectors.toUnmodifiableMap; /** @@ -50,7 +48,7 @@ public class ReindexingMaintainer extends AbstractComponent { private static final Logger log = Logger.getLogger(Reindexing.class.getName()); - private final List<Reindexer> reindexers; + private final Reindexer reindexer; private final ScheduledExecutorService executor; @Inject @@ -65,57 +63,51 @@ public class ReindexingMaintainer extends AbstractComponent { ReindexingMaintainer(Clock clock, Metric metric, DocumentAccess access, ZookeepersConfig zookeepersConfig, ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig, ReindexingConfig reindexingConfig) { - this.reindexers = reindexingConfig.clusters().entrySet().stream() - .map(cluster -> new Reindexer(parseCluster(cluster.getKey(), clusterListConfig, allClustersBucketSpacesConfig, access.getDocumentTypeManager()), - parseReady(cluster.getValue(), access.getDocumentTypeManager()), - new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()), - access.getDocumentTypeManager()), - access, - metric, - clock)) - .collect(toUnmodifiableList()); - this.executor = new ScheduledThreadPoolExecutor(reindexingConfig.clusters().size(), new DaemonThreadFactory("reindexer-")); + this.reindexer = new Reindexer(parseCluster(reindexingConfig.clusterName(), clusterListConfig, allClustersBucketSpacesConfig, access.getDocumentTypeManager()), + parseReady(reindexingConfig, access.getDocumentTypeManager()), + new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()), + reindexingConfig.clusterName(), + access.getDocumentTypeManager()), + access, + metric, + clock); + this.executor = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("reindexer-")); if (reindexingConfig.enabled()) scheduleStaggered((delayMillis, intervalMillis) -> executor.scheduleAtFixedRate(this::maintain, delayMillis, intervalMillis, TimeUnit.MILLISECONDS), Duration.ofMinutes(1), clock.instant(), HostName.getLocalhost(), zookeepersConfig.zookeeperserverlist()); } private void maintain() { - for (Reindexer reindexer : reindexers) - executor.submit(() -> { - try { - reindexer.reindex(); - } - catch (ReindexingLockException e) { - log.log(FINE, "Failed to acquire reindexing lock"); - } - catch (Exception e) { - log.log(WARNING, "Exception when reindexing", e); - } - }); + try { + reindexer.reindex(); + } + catch (ReindexingLockException e) { + log.log(FINE, "Failed to acquire reindexing lock"); + } + catch (Exception e) { + log.log(WARNING, "Exception when reindexing", e); + } } @Override public void deconstruct() { try { - for (Reindexer reindexer : reindexers) - reindexer.shutdown(); - + reindexer.shutdown(); executor.shutdown(); if ( ! executor.awaitTermination(45, TimeUnit.SECONDS)) - log.log(WARNING, "Failed to shut down reindexing within timeout"); + log.log(WARNING, "Failed to shut down reindexer within timeout"); } catch (InterruptedException e) { - log.log(WARNING, "Interrupted while waiting for reindexing to shut down"); + log.log(WARNING, "Interrupted while waiting for reindexer to shut down"); Thread.currentThread().interrupt(); } } - static Map<DocumentType, Instant> parseReady(ReindexingConfig.Clusters cluster, DocumentTypeManager manager) { - return cluster.documentTypes().entrySet().stream() - .collect(toUnmodifiableMap(typeStatus -> manager.getDocumentType(typeStatus.getKey()), - typeStatus -> Instant.ofEpochMilli(typeStatus.getValue().readyAtMillis()))); + static Map<DocumentType, Instant> parseReady(ReindexingConfig config, DocumentTypeManager manager) { + return config.status().entrySet().stream() + .collect(toUnmodifiableMap(typeStatus -> manager.getDocumentType(typeStatus.getKey()), + typeStatus -> Instant.ofEpochMilli(typeStatus.getValue().readyAtMillis()))); } /** Schedules a task with the given interval (across all containers in this ZK cluster). */ diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java index b1c0d012325..fca08f7743c 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java @@ -22,8 +22,6 @@ import com.yahoo.vespa.config.content.reindexing.ReindexingConfig; import com.yahoo.vespa.curator.Curator; import com.yahoo.vespa.zookeeper.VespaZooKeeperServer; -import java.util.Collection; -import java.util.List; import java.util.concurrent.Executor; import static com.yahoo.jdisc.http.HttpRequest.Method.GET; @@ -36,7 +34,6 @@ import static com.yahoo.jdisc.http.HttpRequest.Method.GET; public class ReindexingV1ApiHandler extends ThreadedHttpRequestHandler { private final ReindexingCurator database; - private final List<String> clusterNames; @Inject public ReindexingV1ApiHandler(Executor executor, Metric metric, @@ -44,15 +41,14 @@ public class ReindexingV1ApiHandler extends ThreadedHttpRequestHandler { ReindexingConfig reindexingConfig, DocumentmanagerConfig documentmanagerConfig) { this(executor, metric, - reindexingConfig.clusters().keySet(), new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()), + reindexingConfig.clusterName(), new DocumentTypeManager(documentmanagerConfig))); } - ReindexingV1ApiHandler(Executor executor, Metric metric, Collection<String> clusterNames, ReindexingCurator database) { + ReindexingV1ApiHandler(Executor executor, Metric metric, ReindexingCurator database) { super(executor, metric); this.database = database; - this.clusterNames = List.copyOf(clusterNames); } @Override @@ -75,18 +71,16 @@ public class ReindexingV1ApiHandler extends ThreadedHttpRequestHandler { HttpResponse getStatus() { Slime slime = new Slime(); - Cursor clustersObject = slime.setObject().setObject("clusters"); - for (String clusterName : clusterNames) { - Cursor documentTypesObject = clustersObject.setObject(clusterName).setObject("documentTypes"); - database.readReindexing(clusterName).status().forEach((type, status) -> { - Cursor statusObject = documentTypesObject.setObject(type.getName()); - statusObject.setLong("startedMillis", status.startedAt().toEpochMilli()); - status.endedAt().ifPresent(endedAt -> statusObject.setLong("endedMillis", endedAt.toEpochMilli())); - status.progress().ifPresent(progress -> statusObject.setString("progress", progress.serializeToString())); - statusObject.setString("state", toString(status.state())); - status.message().ifPresent(message -> statusObject.setString("message", message)); - }); - } + Cursor statusArray = slime.setObject().setArray("status"); + database.readReindexing().status().forEach((type, status) -> { + Cursor statusObject = statusArray.addObject(); + statusObject.setString("type", type.getName()); + statusObject.setLong("startedMillis", status.startedAt().toEpochMilli()); + status.endedAt().ifPresent(endedAt -> statusObject.setLong("endedMillis", endedAt.toEpochMilli())); + status.progress().ifPresent(progress -> statusObject.setString("progress", progress.serializeToString())); + statusObject.setString("state", toString(status.state())); + status.message().ifPresent(message -> statusObject.setString("message", message)); + }); return new SlimeJsonResponse(slime); } diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java index 7086c36af3f..3ba4083121c 100644 --- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java @@ -53,7 +53,7 @@ class ReindexerTest { @BeforeEach void setUp() { - database = new ReindexingCurator(new MockCurator(), manager, Duration.ofMillis(1)); + database = new ReindexingCurator(new MockCurator(), "cluster", manager, Duration.ofMillis(1)); } @Test @@ -70,7 +70,7 @@ class ReindexerTest { @Test void throwsWhenLockHeldElsewhere() throws InterruptedException, ExecutionException { Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock); - Executors.newSingleThreadExecutor().submit(() -> database.lockReindexing("cluster")).get(); + Executors.newSingleThreadExecutor().submit(database::lockReindexing).get(); assertThrows(ReindexingLockException.class, reindexer::reindex); } @@ -102,7 +102,7 @@ class ReindexerTest { // Since "music" is a new document type, it is stored as just reindexed, and nothing else happens. new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock).reindex(); Reindexing reindexing = Reindexing.empty().with(music, Status.ready(Instant.EPOCH).running().successful(Instant.EPOCH)); - assertEquals(reindexing, database.readReindexing("cluster")); + assertEquals(reindexing, database.readReindexing()); assertEquals(Map.of("reindexing.progress", Map.of(Map.of("documenttype", "music", "clusterid", "cluster", "state", "successful"), @@ -125,19 +125,19 @@ class ReindexerTest { // Nothing happens, since it's not yet time. This isn't supposed to happen unless high clock skew. clock.advance(Duration.ofMillis(5)); new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, failIfCalled, metric, clock).reindex(); - assertEquals(reindexing, database.readReindexing("cluster")); + assertEquals(reindexing, database.readReindexing()); // It's time to reindex the "music" documents — let this complete successfully. clock.advance(Duration.ofMillis(10)); AtomicBoolean shutDown = new AtomicBoolean(); Executor executor = Executors.newSingleThreadExecutor(); new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, parameters -> { - database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. + database.writeReindexing(Reindexing.empty()); // Wipe database to verify we write data from reindexer. executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK")); return () -> shutDown.set(true); }, metric, clock).reindex(); reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant())); - assertEquals(reindexing, database.readReindexing("cluster")); + assertEquals(reindexing, database.readReindexing()); assertTrue(shutDown.get(), "Session was shut down"); // One more reindexing, this time shut down before visit completes, but after progress is reported. @@ -146,7 +146,7 @@ class ReindexerTest { shutDown.set(false); AtomicReference<Reindexer> aborted = new AtomicReference<>(); aborted.set(new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, parameters -> { - database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. + database.writeReindexing(Reindexing.empty()); // Wipe database to verify we write data from reindexer. parameters.getControlHandler().onProgress(new ProgressToken()); aborted.get().shutdown(); return () -> { @@ -156,7 +156,7 @@ class ReindexerTest { }, metric, clock)); aborted.get().reindex(); reindexing = reindexing.with(music, Status.ready(clock.instant()).running().progressed(new ProgressToken()).halted()); - assertEquals(reindexing, database.readReindexing("cluster")); + assertEquals(reindexing, database.readReindexing()); assertTrue(shutDown.get(), "Session was shut down"); assertEquals(1.0, // new ProgressToken() is 100% done. metric.metrics().get("reindexing.progress") @@ -168,17 +168,17 @@ class ReindexerTest { clock.advance(Duration.ofMillis(10)); shutDown.set(false); new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, parameters -> { - database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer. + database.writeReindexing(Reindexing.empty()); // Wipe database to verify we write data from reindexer. executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "Error")); return () -> shutDown.set(true); }, metric, clock).reindex(); reindexing = reindexing.with(music, Status.ready(clock.instant()).running().failed(clock.instant(), "Error")); - assertEquals(reindexing, database.readReindexing("cluster")); + assertEquals(reindexing, database.readReindexing()); assertTrue(shutDown.get(), "Session was shut down"); // Document type is ignored in next run, as it has failed fatally. new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, failIfCalled, metric, clock).reindex(); - assertEquals(reindexing, database.readReindexing("cluster")); + assertEquals(reindexing, database.readReindexing()); } } diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java index 7d4cb2af47e..c5a58dcae68 100644 --- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java @@ -25,24 +25,24 @@ class ReindexingCuratorTest { DocumentTypeManager manager = new DocumentTypeManager(musicConfig); DocumentType music = manager.getDocumentType("music"); MockCurator mockCurator = new MockCurator(); - ReindexingCurator curator = new ReindexingCurator(mockCurator, manager); + ReindexingCurator curator = new ReindexingCurator(mockCurator, "cluster", manager); - assertEquals(Reindexing.empty(), curator.readReindexing("cluster")); + assertEquals(Reindexing.empty(), curator.readReindexing()); Reindexing.Status status = Reindexing.Status.ready(Instant.ofEpochMilli(123)) .running() .progressed(new ProgressToken()); Reindexing reindexing = Reindexing.empty().with(music, status); - curator.writeReindexing(reindexing, "cluster"); - assertEquals(reindexing, curator.readReindexing("cluster")); + curator.writeReindexing(reindexing); + assertEquals(reindexing, curator.readReindexing()); status = status.halted().running().failed(Instant.ofEpochMilli(321), "error"); reindexing = reindexing.with(music, status); - curator.writeReindexing(reindexing, "cluster"); - assertEquals(reindexing, curator.readReindexing("cluster")); + curator.writeReindexing(reindexing); + assertEquals(reindexing, curator.readReindexing()); // Unknown document types are forgotten. - assertEquals(Reindexing.empty(), new ReindexingCurator(mockCurator, new DocumentTypeManager(emptyConfig)).readReindexing("cluster")); + assertEquals(Reindexing.empty(), new ReindexingCurator(mockCurator, "cluster", new DocumentTypeManager(emptyConfig)).readReindexing()); } } diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java index afa68debadb..713fb836d62 100644 --- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java @@ -31,15 +31,17 @@ class ReindexingMaintainerTest { DocumentTypeManager manager = new DocumentTypeManager(musicConfig); assertEquals(Map.of(manager.getDocumentType("music"), Instant.ofEpochMilli(123)), - parseReady(new ReindexingConfig.Clusters.Builder() - .documentTypes("music", new ReindexingConfig.Clusters.DocumentTypes.Builder().readyAtMillis(123)) + parseReady(new ReindexingConfig.Builder() + .enabled(true) + .clusterName("cluster") + .status("music", new ReindexingConfig.Status.Builder().readyAtMillis(123)) .build(), manager)); // Unknown document type fails - assertThrows(NullPointerException.class, - () -> parseReady(new ReindexingConfig.Clusters.Builder() - .documentTypes("poetry", new ReindexingConfig.Clusters.DocumentTypes.Builder().readyAtMillis(123)) + assertThrows(IllegalArgumentException.class, + () -> parseReady(new ReindexingConfig.Builder() + .status("poetry", new ReindexingConfig.Status.Builder().readyAtMillis(123)) .build(), manager)); diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/http/ReindexingV1ApiTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/http/ReindexingV1ApiTest.java index b8f62050347..1b6379d21e5 100644 --- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/http/ReindexingV1ApiTest.java +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/http/ReindexingV1ApiTest.java @@ -15,7 +15,6 @@ import com.yahoo.vespa.curator.mock.MockCurator; import org.junit.jupiter.api.Test; import java.time.Instant; -import java.util.List; import java.util.concurrent.Executors; import static com.yahoo.jdisc.http.HttpRequest.Method.POST; @@ -29,9 +28,8 @@ class ReindexingV1ApiTest { DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build(); DocumentTypeManager manager = new DocumentTypeManager(musicConfig); DocumentType musicType = manager.getDocumentType("music"); - ReindexingCurator database = new ReindexingCurator(new MockCurator(), manager); - ReindexingV1ApiHandler handler = new ReindexingV1ApiHandler(Executors.newSingleThreadExecutor(), new MockMetric(), - List.of("cluster", "oyster"), database); + ReindexingCurator database = new ReindexingCurator(new MockCurator(), "cluster", manager); + ReindexingV1ApiHandler handler = new ReindexingV1ApiHandler(Executors.newSingleThreadExecutor(), new MockMetric(), database); @Test void testResponses() { @@ -45,33 +43,23 @@ class ReindexingV1ApiTest { // GET at status with empty database response = driver.sendRequest("http://localhost/reindexing/v1/status"); - assertEquals("{\"clusters\":{\"cluster\":{\"documentTypes\":{}},\"oyster\":{\"documentTypes\":{}}}}", response.readAll()); + assertEquals("{\"status\":[]}", response.readAll()); assertEquals(200, response.getStatus()); // GET at status with a failed status database.writeReindexing(Reindexing.empty().with(musicType, Status.ready(Instant.EPOCH) .running() .progressed(new ProgressToken()) - .failed(Instant.ofEpochMilli(123), "ヽ(。_°)ノ")), - "cluster"); + .failed(Instant.ofEpochMilli(123), "ヽ(。_°)ノ"))); response = driver.sendRequest("http://localhost/reindexing/v1/status"); - assertEquals("{" + - "\"clusters\":{" + - "\"cluster\":{" + - "\"documentTypes\":{" + - "\"music\":{" + - "\"startedMillis\":0," + - "\"endedMillis\":123," + - "\"progress\":\"" + new ProgressToken().serializeToString() + "\"," + - "\"state\":\"failed\"," + - "\"message\":\"ヽ(。_°)ノ\"}" + - "}" + - "}," + - "\"oyster\":{" + - "\"documentTypes\":{}" + - "}" + - "}" + - "}", + assertEquals("{\"status\":[{" + + "\"type\":\"music\"," + + "\"startedMillis\":0," + + "\"endedMillis\":123," + + "\"progress\":\"" + new ProgressToken().serializeToString() + "\"," + + "\"state\":\"failed\"," + + "\"message\":\"ヽ(。_°)ノ\"}" + + "]}", response.readAll()); assertEquals(200, response.getStatus()); diff --git a/configdefinitions/src/vespa/reindexing.def b/configdefinitions/src/vespa/reindexing.def index e020aec3f65..d577f62b10b 100644 --- a/configdefinitions/src/vespa/reindexing.def +++ b/configdefinitions/src/vespa/reindexing.def @@ -6,13 +6,8 @@ namespace=vespa.config.content.reindexing # Whether reindexing should run at all enabled bool default=false -# TODO jonmv: remove after 7.310 is gone # The name of the content cluster to reindex documents from -clusterName string default="" +clusterName string -# TODO jonmv: remove after 7.310 is gone # Epoch millis after which latest reprocessing may begin, per document type status{}.readyAtMillis long - -# Epoch millis after which latest reprocessing may begin, per document type, per cluster -clusters{}.documentTypes{}.readyAtMillis long diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java index 4b7148463f9..7f0671820d3 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java @@ -27,26 +27,6 @@ public class ClusterReindexing { public Map<String, Status> documentTypeStatus() { return documentTypeStatus; } - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - ClusterReindexing that = (ClusterReindexing) o; - return documentTypeStatus.equals(that.documentTypeStatus); - } - - @Override - public int hashCode() { - return Objects.hash(documentTypeStatus); - } - - @Override - public String toString() { - return "ClusterReindexing{" + - "documentTypeStatus=" + documentTypeStatus + - '}'; - } - public static class Status { @@ -69,35 +49,6 @@ public class ClusterReindexing { public Optional<State> state() { return Optional.ofNullable(state); } public Optional<String> message() { return Optional.ofNullable(message); } public Optional<String> progress() { return Optional.ofNullable(progress); } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Status status = (Status) o; - return startedAt.equals(status.startedAt) && - Objects.equals(endedAt, status.endedAt) && - state == status.state && - Objects.equals(message, status.message) && - Objects.equals(progress, status.progress); - } - - @Override - public int hashCode() { - return Objects.hash(startedAt, endedAt, state, message, progress); - } - - @Override - public String toString() { - return "Status{" + - "startedAt=" + startedAt + - ", endedAt=" + endedAt + - ", state=" + state + - ", message='" + message + '\'' + - ", progress='" + progress + '\'' + - '}'; - } - } @@ -116,7 +67,5 @@ public class ClusterReindexing { } public String asString() { return stringValue; } - } - } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java index 0124b6822f0..fef0120a431 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java @@ -55,21 +55,21 @@ public class DefaultClusterReindexingStatusClient implements ClusterReindexingSt @Override public Map<String, ClusterReindexing> getReindexingStatus(ModelResult application) throws IOException { Map<ClusterId, List<ServiceInfo>> clusters = clusterControllerClusters(application); - Map<ClusterId, CompletableFuture<Map<String, ClusterReindexing>>> futureStatusPerCluster = new HashMap<>(); + Map<ClusterId, CompletableFuture<ClusterReindexing>> futureStatusPerCluster = new HashMap<>(); clusters.forEach((clusterId, clusterNodes) -> { var parallelRequests = clusterNodes.stream() .map(this::getReindexingStatus) .collect(Collectors.toList()); - CompletableFuture<Map<String, ClusterReindexing>> combinedRequest = CompletableFutures.firstOf(parallelRequests); + CompletableFuture<ClusterReindexing> combinedRequest = CompletableFutures.firstOf(parallelRequests); futureStatusPerCluster.put(clusterId, combinedRequest); }); try { Map<String, ClusterReindexing> statusPerCluster = new HashMap<>(); futureStatusPerCluster.forEach((clusterId, futureStatus) -> { - statusPerCluster.putAll(futureStatus.join()); + statusPerCluster.put(clusterId.s(), futureStatus.join()); }); - return Map.copyOf(statusPerCluster); + return statusPerCluster; } catch (Exception e) { throw new IOException("Failed to get reindexing status from cluster controllers: " + e.getMessage(), e); } @@ -77,7 +77,7 @@ public class DefaultClusterReindexingStatusClient implements ClusterReindexingSt @Override public void close() { uncheck(() -> httpClient.close()); } - private CompletableFuture<Map<String, ClusterReindexing>> getReindexingStatus(ServiceInfo service) { + private CompletableFuture<ClusterReindexing> getReindexingStatus(ServiceInfo service) { URI uri = URI.create(String.format("http://%s:%d/reindexing/v1/status", service.getHostName(), getStatePort(service))); CompletableFuture<SimpleHttpResponse> responsePromise = new CompletableFuture<>(); httpClient.execute(SimpleHttpRequests.get(uri), new FutureCallback<>() { @@ -94,40 +94,33 @@ public class DefaultClusterReindexingStatusClient implements ClusterReindexingSt }, executor); } - private static Map<String, ClusterReindexing> toClusterReindexing(SimpleHttpResponse response) throws IOException { + private static ClusterReindexing toClusterReindexing(SimpleHttpResponse response) throws IOException { if (response.getCode() != HttpStatus.SC_OK) throw new IOException("Expected status code 200, got " + response.getCode()); if (response.getBody() == null) throw new IOException("Response has no content"); return toClusterReindexing(response.getBodyBytes()); } - private static Map<String, ClusterReindexing> toClusterReindexing(byte[] requestBody) throws IOException { + private static ClusterReindexing toClusterReindexing(byte[] requestBody) throws IOException { JsonNode jsonNode = mapper.readTree(requestBody); - Map<String, ClusterReindexing> clusters = new HashMap<>(); - for (var clusterNames = jsonNode.get("clusters").fieldNames(); clusterNames.hasNext(); ) { - String clusterName = clusterNames.next(); - JsonNode clusterJson = jsonNode.get("clusters").get(clusterName); - Map<String, ClusterReindexing.Status> documentStatuses = new HashMap<>(); - for (var documentTypes = clusterJson.get("documentTypes").fieldNames(); documentTypes.hasNext(); ) { - String type = documentTypes.next(); - JsonNode statusJson = clusterJson.get("documentTypes").get(type); - Instant startedMillis = Instant.ofEpochMilli(statusJson.get("startedMillis").longValue()); - Instant endedMillis = Optional.ofNullable(statusJson.get("endedMillis")) - .map(json -> Instant.ofEpochMilli(json.longValue())) - .orElse(null); - String progressToken = Optional.ofNullable(statusJson.get("progress")) - .map(JsonNode::textValue) - .orElse(null); - ClusterReindexing.State state = Optional.ofNullable(statusJson.get("state")) - .map(json -> ClusterReindexing.State.fromString(json.textValue())) - .orElse(null); - String message = Optional.ofNullable(statusJson.get("message")) - .map(JsonNode::textValue) - .orElse(null); - documentStatuses.put(type, new ClusterReindexing.Status(startedMillis, endedMillis, state, message, progressToken)); - } - clusters.put(clusterName, new ClusterReindexing(documentStatuses)); + Map<String, ClusterReindexing.Status> documentStatuses = new HashMap<>(); + for (JsonNode statusJson : jsonNode.get("status")) { + String type = statusJson.get("type").textValue(); + Instant startedMillis = Instant.ofEpochMilli(statusJson.get("startedMillis").longValue()); + Instant endedMillis = Optional.ofNullable(statusJson.get("endedMillis")) + .map(json -> Instant.ofEpochMilli(json.longValue())) + .orElse(null); + String progressToken = Optional.ofNullable(statusJson.get("progress")) + .map(JsonNode::textValue) + .orElse(null); + ClusterReindexing.State state = Optional.ofNullable(statusJson.get("state")) + .map(json -> ClusterReindexing.State.fromString(json.textValue())) + .orElse(null); + String message = Optional.ofNullable(statusJson.get("message")) + .map(JsonNode::textValue) + .orElse(null); + documentStatuses.put(type, new ClusterReindexing.Status(startedMillis, endedMillis, state, message, progressToken)); } - return Map.copyOf(clusters); + return new ClusterReindexing(documentStatuses); } private static int getStatePort(ServiceInfo service) { diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java index 82e1bd96373..21894e4a756 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java @@ -7,12 +7,10 @@ import com.yahoo.config.model.api.PortInfo; import com.yahoo.config.model.api.ServiceInfo; import com.yahoo.documentapi.ProgressToken; import com.yahoo.vespa.config.server.modelfactory.ModelResult; -import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import java.io.IOException; -import java.time.Instant; import java.util.Collection; import java.util.List; import java.util.Map; @@ -23,7 +21,6 @@ import static com.github.tomakehurst.wiremock.client.WireMock.serverError; import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options; import static com.yahoo.config.model.api.container.ContainerServiceType.CLUSTERCONTROLLER_CONTAINER; -import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -43,50 +40,25 @@ public class DefaultClusterReindexingStatusClientTest { String uriPath = "/reindexing/v1/status"; server1.stubFor(get(urlEqualTo(uriPath)).willReturn(serverError())); server2.stubFor(get(urlEqualTo(uriPath)).willReturn(okJson( - "{" + - " \"clusters\": {" + - " \"cluster1\": {" + - " \"documentTypes\": {" + - " \"music\": {" + - " \"startedMillis\":0," + - " \"state\": \"" + ClusterReindexing.State.RUNNING.asString() + "\"" + - " }" + - " }" + - " }" + - " }" + - "}"))); + "{\"status\":[{" + + "\"type\":\"music\"," + + "\"startedMillis\":0," + + "\"endedMillis\":123," + + "\"progress\":\"" + new ProgressToken().serializeToString() + "\"," + + "\"state\": \"" + ClusterReindexing.State.FAILED.asString() + "\"," + + "\"message\":\"something went wrong\"}" + + "]}"))); server3.stubFor(get(urlEqualTo(uriPath)).willReturn(okJson( - "{" + - " \"clusters\": {" + - " \"cluster2\": {" + - " \"documentTypes\": {" + - " \"artist\": {" + - " \"startedMillis\":50," + - " \"endedMillis\":150," + - " \"progress\":\"half-done\"," + - " \"state\": \"" + ClusterReindexing.State.SUCCESSFUL.asString() + "\"," + - " \"message\":\"success\"" + - " }" + - " }" + - " }" + - " }" + - "}"))); - Map<String, ClusterReindexing> expected = Map.of("cluster1", - new ClusterReindexing(Map.of("music", - new ClusterReindexing.Status(Instant.ofEpochMilli(0), - null, - ClusterReindexing.State.RUNNING, - null, - null))), - "cluster2", - new ClusterReindexing(Map.of("artist", - new ClusterReindexing.Status(Instant.ofEpochMilli(50), - Instant.ofEpochMilli(150), - ClusterReindexing.State.SUCCESSFUL, - "success", - "half-done")))); + "{\"status\":[{" + + "\"type\":\"artist\"," + + "\"startedMillis\":10," + + "\"endedMillis\":150," + + "\"progress\":\"" + new ProgressToken().serializeToString() + "\"," + + "\"state\": \"" + ClusterReindexing.State.SUCCESSFUL.asString() + "\"," + + "\"message\":\"successs\"}" + + "]}"))); Map<String, ClusterReindexing> result = client.getReindexingStatus(app); - assertEquals(expected, result); + System.out.println(result); } |