From 51b034bd2ae32161990c88bfac71b057084d818b Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Mon, 18 Jan 2021 09:48:03 +0100 Subject: Improved model for reindexing data Making the users of reindexing data config aware allows getting rid of the confusing default-for-app-and-cluster status, and the dummy first-time reindexing in the reindexer. Since the upgraded config servers will wipe the "common" status, the non-upgraded config servers will read these as EPOCH, instead of some time last year. This should not be a problem for any users of these data, as far as I can tell --- .../main/java/ai/vespa/reindexing/Reindexer.java | 6 +- .../java/ai/vespa/reindexing/ReindexerTest.java | 56 +++++----- .../vespa/config/server/ApplicationRepository.java | 14 ++- .../application/ApplicationCuratorDatabase.java | 11 +- .../server/application/ApplicationReindexing.java | 113 ++++++--------------- .../server/application/TenantApplications.java | 2 +- .../config/server/http/v2/ApplicationHandler.java | 67 ++++++------ .../config/server/tenant/TenantRepository.java | 1 + .../ApplicationCuratorDatabaseTest.java | 4 +- .../application/ApplicationReindexingTest.java | 46 ++++----- .../config/server/deploy/HostedDeployTest.java | 2 +- .../server/http/v2/ApplicationHandlerTest.java | 29 ++++-- .../maintenance/ReindexingMaintainerTest.java | 17 ++-- 13 files changed, 161 insertions(+), 207 deletions(-) diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java index 37f9d830da8..306e06b7c7e 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java @@ -117,11 +117,9 @@ public class Reindexer { } static Reindexing updateWithReady(Map ready, Reindexing reindexing, Instant now) { - for (DocumentType type : ready.keySet()) { // We consider update for document types for which we have config. + for (DocumentType type : ready.keySet()) { // We update only for document types for which we have config. if ( ! ready.get(type).isAfter(now)) { - Status status = reindexing.status().getOrDefault(type, Status.ready(now) - .running() - .successful(now)); + Status status = reindexing.status().getOrDefault(type, Status.ready(now)); if (status.startedAt().isBefore(ready.get(type))) status = Status.ready(now); diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java index 01586e06015..0f290250e2d 100644 --- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java @@ -27,7 +27,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertSame; @@ -40,7 +39,7 @@ import static org.junit.jupiter.api.Assertions.fail; */ class ReindexerTest { - static final Function failIfCalled = __ -> () -> fail("Not supposed to run"); + static Runnable failIfCalled(VisitorParameters ignored) { throw new AssertionError("Not supposed to run"); } final DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build(); final DocumentTypeManager manager = new DocumentTypeManager(musicConfig); @@ -59,12 +58,12 @@ class ReindexerTest { @Test void throwsWhenUnknownBuckets() { assertThrows(NullPointerException.class, - () -> new Reindexer(new Cluster("cluster", Map.of()), Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock, 0.2)); + () -> new Reindexer(new Cluster("cluster", Map.of()), Map.of(music, Instant.EPOCH), database, ReindexerTest::failIfCalled, metric, clock, 0.2)); } @Test void throwsWhenLockHeldElsewhere() throws InterruptedException, ExecutionException { - Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock, 0.2); + Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, ReindexerTest::failIfCalled, metric, clock, 0.2); Executors.newSingleThreadExecutor().submit(() -> database.lockReindexing("cluster")).get(); assertThrows(ReindexingLockException.class, reindexer::reindex); } @@ -72,13 +71,13 @@ class ReindexerTest { @Test @Timeout(10) void nothingToDoWithEmptyConfig() throws ReindexingLockException { - new Reindexer(cluster, Map.of(), database, failIfCalled, metric, clock, 0.2).reindex(); + new Reindexer(cluster, Map.of(), database, ReindexerTest::failIfCalled, metric, clock, 0.2).reindex(); assertEquals(Map.of(), metric.metrics()); } @Test void testParameters() { - Reindexer reindexer = new Reindexer(cluster, Map.of(), database, failIfCalled, metric, clock, 0.2); + Reindexer reindexer = new Reindexer(cluster, Map.of(), database, ReindexerTest::failIfCalled, metric, clock, 0.2); ProgressToken token = new ProgressToken(); VisitorParameters parameters = reindexer.createParameters(music, token); assertEquals("music:[document]", parameters.getFieldSet()); @@ -93,33 +92,15 @@ class ReindexerTest { @Test @Timeout(10) void testReindexing() throws ReindexingLockException { - // Reindexer is told to update "music" documents no earlier than EPOCH, which is just now. - // Since "music" is a new document type, it is stored as just reindexed, and nothing else happens. - new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock, 0.2).reindex(); - Reindexing reindexing = Reindexing.empty().with(music, Status.ready(Instant.EPOCH).running().successful(Instant.EPOCH)); + // Reindexer is created without any ready document types, which means nothing should run. + new Reindexer(cluster, Map.of(), database, ReindexerTest::failIfCalled, metric, clock, 0.2).reindex(); + Reindexing reindexing = Reindexing.empty(); assertEquals(reindexing, database.readReindexing("cluster")); - assertEquals(Map.of("reindexing.progress", Map.of(Map.of("documenttype", "music", - "clusterid", "cluster", - "state", "successful"), - 1.0, - Map.of("documenttype", "music", - "clusterid", "cluster", - "state", "pending"), - -1.0, - Map.of("documenttype", "music", - "clusterid", "cluster", - "state", "failed"), - -1.0, - Map.of("documenttype", "music", - "clusterid", "cluster", - "state", "running"), - -1.0)), - metric.metrics()); // New config tells reindexer to reindex "music" documents no earlier than at 10 millis after EPOCH, which isn't yet. // Nothing happens, since it's not yet time. This isn't supposed to happen unless high clock skew. clock.advance(Duration.ofMillis(5)); - new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, failIfCalled, metric, clock, 0.2).reindex(); + new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, ReindexerTest::failIfCalled, metric, clock, 0.2).reindex(); assertEquals(reindexing, database.readReindexing("cluster")); // It's time to reindex the "music" documents — let this complete successfully. @@ -134,6 +115,23 @@ class ReindexerTest { reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant())); assertEquals(reindexing, database.readReindexing("cluster")); assertTrue(shutDown.get(), "Session was shut down"); + assertEquals(Map.of("reindexing.progress", Map.of(Map.of("documenttype", "music", + "clusterid", "cluster", + "state", "successful"), + 1.0, + Map.of("documenttype", "music", + "clusterid", "cluster", + "state", "pending"), + -1.0, + Map.of("documenttype", "music", + "clusterid", "cluster", + "state", "failed"), + -1.0, + Map.of("documenttype", "music", + "clusterid", "cluster", + "state", "running"), + -1.0)), + metric.metrics()); // One more reindexing, this time shut down before visit completes, but after progress is reported. clock.advance(Duration.ofMillis(10)); @@ -172,7 +170,7 @@ class ReindexerTest { assertTrue(shutDown.get(), "Session was shut down"); // Document type is ignored in next run, as it has failed fatally. - new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, failIfCalled, metric, clock, 0.2).reindex(); + new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, ReindexerTest::failIfCalled, metric, clock, 0.2).reindex(); assertEquals(reindexing, database.readReindexing("cluster")); } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java index 1ffefd1a6b7..a341063ddd7 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java @@ -915,12 +915,20 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye } public ApplicationReindexing getReindexing(ApplicationId id) { - return getTenant(id).getApplicationRepo().database().readReindexingStatus(id) - .orElse(ApplicationReindexing.ready(clock.instant())); + Tenant tenant = getTenant(id); + if (tenant == null) + throw new NotFoundException("Tenant '" + id.tenant().value() + "' not found"); + + return tenant.getApplicationRepo().database().readReindexingStatus(id) + .orElseThrow(() -> new NotFoundException("Reindexing status not found for " + id)); } public void modifyReindexing(ApplicationId id, UnaryOperator modifications) { - getTenant(id).getApplicationRepo().database().modifyReindexing(id, ApplicationReindexing.ready(clock.instant()), modifications); + Tenant tenant = getTenant(id); + if (tenant == null) + throw new NotFoundException("Tenant '" + id.tenant().value() + "' not found"); + + tenant.getApplicationRepo().database().modifyReindexing(id, ApplicationReindexing.empty(), modifications); } public ConfigserverConfig configserverConfig() { diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabase.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabase.java index f98d58d9fb4..099de9bc2bf 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabase.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabase.java @@ -73,12 +73,12 @@ public class ApplicationCuratorDatabase { /** * Creates a node for the given application, marking its existence. */ - public void createApplication(ApplicationId id, Instant now) { + public void createApplication(ApplicationId id) { if ( ! id.tenant().equals(tenant)) throw new IllegalArgumentException("Cannot write application id '" + id + "' for tenant '" + tenant + "'"); try (Lock lock = lock(id)) { curator.create(applicationPath(id)); - modifyReindexing(id, ApplicationReindexing.ready(now), UnaryOperator.identity()); + modifyReindexing(id, ApplicationReindexing.empty(), UnaryOperator.identity()); } } @@ -158,7 +158,6 @@ public class ApplicationCuratorDatabase { private static class ReindexingStatusSerializer { - private static final String COMMON = "common"; private static final String ENABLED = "enabled"; private static final String CLUSTERS = "clusters"; private static final String PENDING = "pending"; @@ -171,13 +170,11 @@ public class ApplicationCuratorDatabase { private static byte[] toBytes(ApplicationReindexing reindexing) { Cursor root = new Slime().setObject(); root.setBool(ENABLED, reindexing.enabled()); - setStatus(root.setObject(COMMON), reindexing.common()); Cursor clustersArray = root.setArray(CLUSTERS); reindexing.clusters().forEach((name, cluster) -> { Cursor clusterObject = clustersArray.addObject(); clusterObject.setString(NAME, name); - setStatus(clusterObject.setObject(COMMON), cluster.common()); Cursor pendingArray = clusterObject.setArray(PENDING); cluster.pending().forEach((type, generation) -> { @@ -203,15 +200,13 @@ public class ApplicationCuratorDatabase { private static ApplicationReindexing fromBytes(byte[] data) { Cursor root = SlimeUtils.jsonToSlimeOrThrow(data).get(); return new ApplicationReindexing(root.field(ENABLED).valid() ? root.field(ENABLED).asBool() : true, - getStatus(root.field(COMMON)), SlimeUtils.entriesStream(root.field(CLUSTERS)) .collect(toUnmodifiableMap(object -> object.field(NAME).asString(), object -> getCluster(object)))); } private static Cluster getCluster(Inspector object) { - return new Cluster(getStatus(object.field(COMMON)), - SlimeUtils.entriesStream(object.field(PENDING)) + return new Cluster(SlimeUtils.entriesStream(object.field(PENDING)) .collect(toUnmodifiableMap(entry -> entry.field(TYPE).asString(), entry -> entry.field(GENERATION).asLong())), SlimeUtils.entriesStream(object.field(READY)) diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java index c603b4af6a5..63aca62768d 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java @@ -12,6 +12,7 @@ import com.yahoo.vespa.model.search.DocumentDatabase; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.Collection; +import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -26,9 +27,8 @@ import static java.util.stream.Collectors.toUnmodifiableMap; import static java.util.stream.Collectors.toUnmodifiableSet; /** - * Pending and ready reindexing per document type. Each document type can have either a pending or a ready reindexing. - * Each cluster may also have a global status, which is merged with its document type-specific status, by selecting - * whichever status is ready the latest. The application may also have a global status, which is merged likewise. + * Pending reindexing: convergence to the stored config generation allows reindexing to start. + * Ready reindexing: reindexing may start after this timestamp. * This is immutable. * * @author jonmv @@ -36,72 +36,48 @@ import static java.util.stream.Collectors.toUnmodifiableSet; public class ApplicationReindexing implements Reindexing { private final boolean enabled; - private final Status common; private final Map clusters; - ApplicationReindexing(boolean enabled, Status common, Map clusters) { + ApplicationReindexing(boolean enabled, Map clusters) { this.enabled = enabled; - this.common = requireNonNull(common); this.clusters = Map.copyOf(clusters); } /** Reindexing for the whole application ready now. */ - public static ApplicationReindexing ready(Instant now) { - return new ApplicationReindexing(true, new Status(now), Map.of()); - } - - /** Returns a copy of this with reindexing for the whole application ready at the given instant. */ - public ApplicationReindexing withReady(Instant readyAt) { - return new ApplicationReindexing(enabled, - new Status(readyAt), - clusters.entrySet().stream() - .filter(cluster -> ! cluster.getValue().pending.isEmpty()) - .collect(toUnmodifiableMap(cluster -> cluster.getKey(), - cluster -> new Cluster(new Status(readyAt), - cluster.getValue().pending, - Map.of())))); - } - - /** Returns a copy of this with reindexing for the given cluster ready at the given instant. */ - public ApplicationReindexing withReady(String cluster, Instant readyAt) { - Cluster current = clusters.getOrDefault(cluster, Cluster.ready(common)); - Cluster modified = new Cluster(new Status(readyAt), current.pending, Map.of()); - return new ApplicationReindexing(enabled, common, with(cluster, modified, clusters)); + public static ApplicationReindexing empty() { + return new ApplicationReindexing(true, Map.of()); } /** Returns a copy of this with reindexing for the given document type in the given cluster ready at the given instant. */ public ApplicationReindexing withReady(String cluster, String documentType, Instant readyAt) { - Cluster current = clusters.getOrDefault(cluster, Cluster.ready(common)); - Cluster modified = new Cluster(current.common, - current.pending, + Cluster current = clusters.getOrDefault(cluster, Cluster.empty()); + Cluster modified = new Cluster(current.pending, with(documentType, new Status(readyAt), current.ready)); - return new ApplicationReindexing(enabled, common, with(cluster, modified, clusters)); + return new ApplicationReindexing(enabled, with(cluster, modified, clusters)); } /** Returns a copy of this with a pending reindexing at the given generation, for the given document type. */ public ApplicationReindexing withPending(String cluster, String documentType, long requiredGeneration) { - Cluster current = clusters.getOrDefault(cluster, Cluster.ready(common)); - Cluster modified = new Cluster(current.common, - with(documentType, requirePositive(requiredGeneration), current.pending), + Cluster current = clusters.getOrDefault(cluster, Cluster.empty()); + Cluster modified = new Cluster(with(documentType, requirePositive(requiredGeneration), current.pending), current.ready); - return new ApplicationReindexing(enabled, common, with(cluster, modified, clusters)); + return new ApplicationReindexing(enabled, with(cluster, modified, clusters)); } /** Returns a copy of this with no pending reindexing for the given document type. */ public ApplicationReindexing withoutPending(String cluster, String documentType) { - Cluster current = clusters.getOrDefault(cluster, Cluster.ready(common)); + Cluster current = clusters.getOrDefault(cluster, Cluster.empty()); if (current == null) return this; - Cluster modified = new Cluster(current.common, - without(documentType, current.pending), + Cluster modified = new Cluster(without(documentType, current.pending), current.ready); - return new ApplicationReindexing(enabled, common, with(cluster, modified, clusters)); + return new ApplicationReindexing(enabled, with(cluster, modified, clusters)); } /** Returns a copy of this without the given cluster. */ public ApplicationReindexing without(String cluster) { - return new ApplicationReindexing(enabled, common, without(cluster, clusters)); + return new ApplicationReindexing(enabled, without(cluster, clusters)); } /** Returns a copy of this without the given document type in the given cluster. */ @@ -110,15 +86,14 @@ public class ApplicationReindexing implements Reindexing { if (current == null) return this; - Cluster modified = new Cluster(current.common, - current.pending, + Cluster modified = new Cluster(current.pending, without(documentType, current.ready)); - return new ApplicationReindexing(enabled, common, with(cluster, modified, clusters)); + return new ApplicationReindexing(enabled, with(cluster, modified, clusters)); } /** Returns a copy of this with the enabled-state set to the given value. */ public ApplicationReindexing enabled(boolean enabled) { - return new ApplicationReindexing(enabled, common, clusters); + return new ApplicationReindexing(enabled, clusters); } @Override @@ -126,21 +101,12 @@ public class ApplicationReindexing implements Reindexing { return enabled; } - /** The common reindexing status for the whole application. */ - public Status common() { - return common; - } - /** The reindexing status of each of the clusters of this application. */ public Map clusters() { return clusters; } @Override - public Optional status(String cluster, String documentType) { - return ! clusters.containsKey(cluster) - ? Optional.of(common()) - : ! clusters.get(cluster).ready().containsKey(documentType) - ? Optional.of(clusters.get(cluster).common()) - : Optional.of(clusters.get(cluster).ready().get(documentType)); + public Optional status(String clusterName, String documentType) { + return Optional.ofNullable(clusters.get(clusterName)).map(cluster -> cluster.ready().get(documentType)); } @Override @@ -149,20 +115,18 @@ public class ApplicationReindexing implements Reindexing { if (o == null || getClass() != o.getClass()) return false; ApplicationReindexing that = (ApplicationReindexing) o; return enabled == that.enabled && - common.equals(that.common) && clusters.equals(that.clusters); } @Override public int hashCode() { - return Objects.hash(enabled, common, clusters); + return Objects.hash(enabled, clusters); } @Override public String toString() { return "ApplicationReindexing{" + "enabled=" + enabled + - ", common=" + common + ", clusters=" + clusters + '}'; } @@ -170,23 +134,16 @@ public class ApplicationReindexing implements Reindexing { /** Reindexing status for a single content cluster in an application. */ public static class Cluster { - private static Cluster ready(Status common) { return new Cluster(common, Map.of(), Map.of()); } + private static Cluster empty() { return new Cluster(Map.of(), Map.of()); } - private final Status common; private final Map pending; private final Map ready; - Cluster(Status common, Map pending, Map ready) { - this.common = requireNonNull(common); + Cluster(Map pending, Map ready) { this.pending = Map.copyOf(pending); this.ready = Map.copyOf(ready); } - /** The common reindexing status for all document types in this cluster. */ - public Status common() { - return common; - } - /** The config generation at which the application must have converged for the latest reindexing to begin, per document type. */ public Map pending() { return pending; @@ -202,21 +159,19 @@ public class ApplicationReindexing implements Reindexing { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Cluster cluster = (Cluster) o; - return common.equals(cluster.common) && - pending.equals(cluster.pending) && + return pending.equals(cluster.pending) && ready.equals(cluster.ready); } @Override public int hashCode() { - return Objects.hash(common, pending, ready); + return Objects.hash(pending, ready); } @Override public String toString() { return "Cluster{" + - "common=" + common + - ", pending=" + pending + + "pending=" + pending + ", ready=" + ready + '}'; } @@ -265,17 +220,15 @@ public class ApplicationReindexing implements Reindexing { } private static Map without(String removed, Map map) { - return map.keySet().stream() - .filter(key -> ! removed.equals(key)) - .collect(toUnmodifiableMap(key -> key, - key -> map.get(key))); + Map modified = new HashMap<>(map); + modified.remove(removed); + return Map.copyOf(modified); } private static Map with(String added, T value, Map map) { - return Stream.concat(Stream.of(added), map.keySet().stream()).distinct() - .collect(toUnmodifiableMap(key -> key, - key -> added.equals(key) ? value - : map.get(key))); + Map modified = new HashMap<>(map); + modified.put(added, value); + return Map.copyOf(modified); } } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java index 0e5315c0dcc..c4908b8827b 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java @@ -142,7 +142,7 @@ public class TenantApplications implements RequestHandler, HostValidator new NotFoundException("Application '" + id + "' not found")) + .getForVersionOrLatest(Optional.empty(), applicationRepository.clock().instant()) + .getModel(); + } + private HttpResponse triggerReindexing(HttpRequest request, ApplicationId applicationId) { - Application application = applicationRepository.getActiveApplicationSet(applicationId) - .orElseThrow(() -> new NotFoundException(applicationId + " not found")) - .getForVersionOrLatest(Optional.empty(), applicationRepository.clock().instant()); - Map> documentTypes = application.getModel().documentTypesByCluster(); - Map> indexedDocumentTypes = application.getModel().indexedDocumentTypesByCluster(); + Model model = getActiveModelOrThrow(applicationId); + Map> documentTypes = model.documentTypesByCluster(); + Map> indexedDocumentTypes = model.indexedDocumentTypesByCluster(); boolean indexedOnly = request.getBooleanProperty("indexedOnly"); Set clusters = StringUtilities.split(request.getProperty("clusterId")); @@ -279,9 +283,8 @@ public class ApplicationHandler extends HttpHandler { if (tenant == null) throw new NotFoundException("Tenant '" + applicationId.tenant().value() + "' not found"); - return new ReindexingResponse(tenant.getApplicationRepo().database() - .readReindexingStatus(applicationId) - .orElseThrow(() -> new NotFoundException("Reindexing status not found for " + applicationId)), + return new ReindexingResponse(getActiveModelOrThrow(applicationId).documentTypesByCluster(), + applicationRepository.getReindexing(applicationId), applicationRepository.getClusterReindexingStatus(applicationId)); } @@ -471,33 +474,33 @@ public class ApplicationHandler extends HttpHandler { } static class ReindexingResponse extends JSONResponse { - ReindexingResponse(ApplicationReindexing reindexing, Map clusters) { + ReindexingResponse(Map> documentTypes, ApplicationReindexing reindexing, + Map clusters) { super(Response.Status.OK); object.setBool("enabled", reindexing.enabled()); Cursor clustersObject = object.setObject("clusters"); - Stream clusterNames = Stream.concat(clusters.keySet().stream(), reindexing.clusters().keySet().stream()); - clusterNames.sorted() - .forEach(clusterName -> { - Cursor clusterObject = clustersObject.setObject(clusterName); - Cursor pendingObject = clusterObject.setObject("pending"); - Cursor readyObject = clusterObject.setObject("ready"); - - Map statuses = new HashMap<>(); - if (reindexing.clusters().containsKey(clusterName)) { - reindexing.clusters().get(clusterName).pending().entrySet().stream().sorted(comparingByKey()) - .forEach(pending -> pendingObject.setLong(pending.getKey(), pending.getValue())); - - reindexing.clusters().get(clusterName).ready().entrySet().stream().sorted(comparingByKey()) - .forEach(ready -> setStatus(statuses.computeIfAbsent(ready.getKey(), readyObject::setObject), ready.getValue())); - } - if (clusters.containsKey(clusterName)) - clusters.get(clusterName).documentTypeStatus().entrySet().stream().sorted(comparingByKey()) - .forEach(status -> setStatus(statuses.computeIfAbsent(status.getKey(), readyObject::setObject), status.getValue())); - - }); + documentTypes.forEach((cluster, types) -> { + Cursor clusterObject = clustersObject.setObject(cluster); + Cursor pendingObject = clusterObject.setObject("pending"); + Cursor readyObject = clusterObject.setObject("ready"); + + for (String type : types) { + Cursor statusObject = readyObject.setObject(type); + if (reindexing.clusters().containsKey(cluster)) { + if (reindexing.clusters().get(cluster).pending().containsKey(type)) + pendingObject.setLong(type, reindexing.clusters().get(cluster).pending().get(type)); + + if (reindexing.clusters().get(cluster).ready().containsKey(type)) + setStatus(statusObject, reindexing.clusters().get(cluster).ready().get(type)); + } + if (clusters.containsKey(cluster)) + if (clusters.get(cluster).documentTypeStatus().containsKey(type)) + setStatus(statusObject, clusters.get(cluster).documentTypeStatus().get(type)); + } + }); } - private static void setStatus(Cursor object, ApplicationReindexing.Status readyStatus) { + private static void setStatus(Cursor object, ApplicationReindexing.Status readyStatus) { object.setLong("readyMillis", readyStatus.ready().toEpochMilli()); } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java index a2fa7851173..ce382e8698e 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java @@ -427,6 +427,7 @@ public class TenantRepository { return tenants.containsKey(tenant); } + /** Returns the tenant with the given name, or {@code null} if this does not exist. */ public Tenant getTenant(TenantName tenantName) { return tenants.get(tenantName); } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabaseTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabaseTest.java index 0c4472398c1..13803f29794 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabaseTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabaseTest.java @@ -22,12 +22,10 @@ public class ApplicationCuratorDatabaseTest { assertEquals(Optional.empty(), db.readReindexingStatus(id)); - ApplicationReindexing reindexing = ApplicationReindexing.ready(Instant.EPOCH) - .withReady(Instant.ofEpochMilli(1 << 20)) + ApplicationReindexing reindexing = ApplicationReindexing.empty() .withPending("one", "a", 10) .withReady("two", "b", Instant.ofEpochMilli(2)) .withPending("two", "b", 20) - .withReady("two", Instant.ofEpochMilli(2 << 10)) .withReady("one", "a", Instant.ofEpochMilli(1)) .withReady("two", "c", Instant.ofEpochMilli(3)) .enabled(false); diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationReindexingTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationReindexingTest.java index 128ba9c0c9d..e8b71dc5433 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationReindexingTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationReindexingTest.java @@ -20,55 +20,45 @@ public class ApplicationReindexingTest { @Test public void test() { - ApplicationReindexing reindexing = ApplicationReindexing.ready(Instant.EPOCH) - .withReady(Instant.ofEpochMilli(1 << 20)) + ApplicationReindexing reindexing = ApplicationReindexing.empty() .withPending("one", "a", 10) .withReady("two", "b", Instant.ofEpochMilli(2)) .withPending("two", "b", 20) - .withReady("two", Instant.ofEpochMilli(2 << 10)) .withReady("one", "a", Instant.ofEpochMilli(1)) - .withReady("two", "c", Instant.ofEpochMilli(3)) + .withReady("two", "a", Instant.ofEpochMilli(3)) .withoutPending("one", "a"); - // Document is most specific, and is used. assertEquals(Instant.ofEpochMilli(1), reindexing.status("one", "a").orElseThrow().ready()); + assertEquals(Optional.empty(), + reindexing.status("one", "b")); - // Cluster is most specific, and inherits application's common status. - assertEquals(Instant.ofEpochMilli(1 << 20), - reindexing.status("one", "d").orElseThrow().ready()); - - // Cluster is most specific, and has its own status set. - assertEquals(Instant.ofEpochMilli(2 << 10), - reindexing.status("two", "d").orElseThrow().ready()); - - // Application is most specific, as cluster and documeent are unknown. - assertEquals(Instant.ofEpochMilli(1 << 20), - reindexing.status("three", "a").orElseThrow().ready()); - - // Cluster is most specific, and is used, also when pending reindexing exists for document. - assertEquals(Instant.ofEpochMilli(2 << 10), + assertEquals(Instant.ofEpochMilli(2), reindexing.status("two", "b").orElseThrow().ready()); + assertEquals(Instant.ofEpochMilli(3), + reindexing.status("two", "a").orElseThrow().ready()); - assertEquals(new Status(Instant.ofEpochMilli(1 << 20)), - reindexing.common()); + assertEquals(Optional.empty(), + reindexing.status("three", "a")); + // Remove "a" in "one", and "one" entirely. + assertEquals(Optional.empty(), + reindexing.without("one", "a").status("one", "a")); + assertEquals(Optional.empty(), + reindexing.without("one").status("one", "a")); + + // Verify content of "reindexing". assertEquals(Set.of("one", "two"), reindexing.clusters().keySet()); - assertEquals(new Status(Instant.ofEpochMilli(1 << 20)), - reindexing.clusters().get("one").common()); - assertEquals(Map.of("a", new Status(Instant.ofEpochMilli(1))), reindexing.clusters().get("one").ready()); assertEquals(Map.of(), reindexing.clusters().get("one").pending()); - assertEquals(new Status(Instant.ofEpochMilli(2 << 10)), - reindexing.clusters().get("two").common()); - - assertEquals(Map.of("c", new Status(Instant.ofEpochMilli(3))), + assertEquals(Map.of("a", new Status(Instant.ofEpochMilli(3)), + "b", new Status(Instant.ofEpochMilli(2))), reindexing.clusters().get("two").ready()); assertEquals(Map.of("b", 20L), diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/deploy/HostedDeployTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/deploy/HostedDeployTest.java index 341fa7109da..bb6c85961a5 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/deploy/HostedDeployTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/deploy/HostedDeployTest.java @@ -411,7 +411,7 @@ public class HostedDeployTest { assertEquals(4, tester.getAllocatedHostsOf(tester.applicationId()).getHosts().size()); assertTrue(prepareResult.configChangeActions().getRestartActions().isEmpty()); // Handled by deployment. - assertEquals(Optional.of(ApplicationReindexing.ready(clock.instant()) + assertEquals(Optional.of(ApplicationReindexing.empty() .withPending("cluster", "music", prepareResult.sessionId())), tester.tenant().getApplicationRepo().database().readReindexingStatus(tester.applicationId())); } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java index 714526b24d4..293219eade3 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java @@ -53,6 +53,9 @@ import java.time.Duration; import java.time.Instant; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; import java.util.stream.Stream; import static com.yahoo.config.model.api.container.ContainerServiceType.CLUSTERCONTROLLER_CONTAINER; @@ -214,10 +217,10 @@ public class ApplicationHandlerTest { @Test public void testReindex() throws Exception { ApplicationCuratorDatabase database = applicationRepository.getTenant(applicationId).getApplicationRepo().database(); - reindexing(applicationId, GET, "{\"error-code\": \"NOT_FOUND\", \"message\": \"Reindexing status not found for default.default\"}", 404); + reindexing(applicationId, GET, "{\"error-code\": \"NOT_FOUND\", \"message\": \"Application 'default.default' not found\"}", 404); applicationRepository.deploy(testAppMultipleClusters, prepareParams(applicationId)); - ApplicationReindexing expected = ApplicationReindexing.ready(clock.instant()); + ApplicationReindexing expected = ApplicationReindexing.empty(); assertEquals(expected, database.readReindexingStatus(applicationId).orElseThrow()); @@ -453,9 +456,8 @@ public class ApplicationHandlerTest { @Test public void testReindexingSerialization() throws IOException { Instant now = Instant.ofEpochMilli(123456); - ApplicationReindexing applicationReindexing = ApplicationReindexing.ready(now.minusSeconds(10)) + ApplicationReindexing applicationReindexing = ApplicationReindexing.empty() .withPending("foo", "bar", 123L) - .withReady("moo", now.minusSeconds(1)) .withReady("moo", "baz", now); ClusterReindexing clusterReindexing = new ClusterReindexing(Map.of("bax", new Status(now, null, null, null, null), "baz", new Status(now.plusSeconds(1), @@ -463,7 +465,12 @@ public class ApplicationHandlerTest { ClusterReindexing.State.FAILED, "message", 0.1))); - assertJsonEquals(getRenderedString(new ReindexingResponse(applicationReindexing, + Map> documentTypes = new TreeMap<>(Map.of("boo", new TreeSet<>(Set.of("bar", "baz", "bax")), + "foo", new TreeSet<>(Set.of("bar", "hax")), + "moo", new TreeSet<>(Set.of("baz", "bax")))); + + assertJsonEquals(getRenderedString(new ReindexingResponse(documentTypes, + applicationReindexing, Map.of("boo", clusterReindexing, "moo", clusterReindexing))), "{\n" + @@ -472,6 +479,7 @@ public class ApplicationHandlerTest { " \"boo\": {\n" + " \"pending\": {},\n" + " \"ready\": {\n" + + " \"bar\": {},\n" + " \"bax\": {\n" + " \"startedMillis\": 123456\n" + " },\n" + @@ -488,11 +496,17 @@ public class ApplicationHandlerTest { " \"pending\": {\n" + " \"bar\": 123\n" + " },\n" + - " \"ready\": {},\n" + + " \"ready\": {\n" + + " \"bar\": {},\n" + + " \"hax\": {}\n" + + " },\n" + " },\n" + " \"moo\": {\n" + " \"pending\": {},\n" + " \"ready\": {\n" + + " \"bax\": {\n" + + " \"startedMillis\": 123456\n" + + " },\n" + " \"baz\": {\n" + " \"readyMillis\": 123456,\n" + " \"startedMillis\": 124456,\n" + @@ -500,9 +514,6 @@ public class ApplicationHandlerTest { " \"state\": \"failed\",\n" + " \"message\": \"message\",\n" + " \"progress\": 0.1\n" + - " },\n" + - " \"bax\": {\n" + - " \"startedMillis\": 123456\n" + " }\n" + " }\n" + " }\n" + diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainerTest.java index d0a4cd59dbd..dc4932c8c04 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainerTest.java @@ -19,12 +19,10 @@ public class ReindexingMaintainerTest { @Test public void testReadyComputation() { - ApplicationReindexing reindexing = ApplicationReindexing.ready(Instant.EPOCH) + ApplicationReindexing reindexing = ApplicationReindexing.empty() .withPending("one", "a", 10) .withPending("two", "b", 20) - .withReady("one", Instant.ofEpochMilli(2)) .withReady("one", "a", Instant.ofEpochMilli(3)) - .withReady("two", Instant.ofEpochMilli(2 << 10)) .withReady("two", "b", Instant.ofEpochMilli(2)) .withReady("two", "c", Instant.ofEpochMilli(3)); @@ -39,9 +37,10 @@ public class ReindexingMaintainerTest { withNewReady(reindexing, () -> 19L, later)); assertEquals(reindexing.withoutPending("one", "a") // Converged, no longer pending. + .withReady("one", "a", later) .withoutPending("two", "b") // Converged, no Longer pending. - .withReady(later), // Outsider calls withReady(later), overriding more specific status. - withNewReady(reindexing, () -> 20L, later).withReady(later)); + .withReady("two", "b", later), + withNewReady(reindexing, () -> 20L, later)); // Verify generation supplier isn't called when no pending document types. withNewReady(reindexing.withoutPending("one", "a").withoutPending("two", "b"), @@ -51,7 +50,7 @@ public class ReindexingMaintainerTest { @Test public void testGarbageRemoval() { - ApplicationReindexing reindexing = ApplicationReindexing.ready(Instant.EPOCH) + ApplicationReindexing reindexing = ApplicationReindexing.empty() .withPending("one", "a", 10) .withPending("two", "b", 20) .withReady("one", "a", Instant.ofEpochMilli(3)) @@ -67,13 +66,13 @@ public class ReindexingMaintainerTest { withOnlyCurrentData(reindexing, Map.of("one", List.of("a"), "two", List.of("b", "c")))); - assertEquals(ApplicationReindexing.ready(Instant.EPOCH) + assertEquals(ApplicationReindexing.empty() .withPending("two", "b", 20) .withReady("two", "b", Instant.ofEpochMilli(2)), withOnlyCurrentData(reindexing, Map.of("two", List.of("a", "b")))); - assertEquals(ApplicationReindexing.ready(Instant.EPOCH) - .withReady("one", Instant.EPOCH) + assertEquals(ApplicationReindexing.empty() + .withReady("one", "a", Instant.EPOCH).without("one", "a") .withReady("two", "c", Instant.ofEpochMilli(3)), withOnlyCurrentData(reindexing, Map.of("one", List.of("c"), "two", List.of("c")))); -- cgit v1.2.3