diff options
7 files changed, 357 insertions, 207 deletions
diff --git a/config-model-api/src/main/java/com/yahoo/config/model/api/Reindexing.java b/config-model-api/src/main/java/com/yahoo/config/model/api/Reindexing.java index 0d37f6810a7..4dc06eae841 100644 --- a/config-model-api/src/main/java/com/yahoo/config/model/api/Reindexing.java +++ b/config-model-api/src/main/java/com/yahoo/config/model/api/Reindexing.java @@ -5,21 +5,20 @@ import java.time.Instant; import java.util.Map; /** - * Instants after which reindexing should be triggered, for select document types. + * Status of reindexing for the documents of an application. * * @author jonmv */ public interface Reindexing { - /** The reindexing status for each document type for which this is known. */ - default Map<String, ? extends Status> status() { return Map.of(); } + /** Reindexing status for a given application, cluster and document type. */ + default Status status(String cluster, String documentType) { return () -> Instant.MAX; } - - /** Reindexing status of a given document type. */ + /** Reindexing status of a given document type in a given cluster in a given application. */ interface Status { - /** The instant at which reindexing of this document type may begin. */ - default Instant ready() { return Instant.MAX; }; + /** The instant at which reindexing may begin. */ + Instant ready(); } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabase.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabase.java index c4cc1cefc6e..94eb6eeea57 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabase.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabase.java @@ -5,9 +5,11 @@ import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.TenantName; import com.yahoo.path.Path; import com.yahoo.slime.Cursor; +import com.yahoo.slime.Inspector; import com.yahoo.slime.Slime; import com.yahoo.slime.SlimeUtils; -import com.yahoo.vespa.config.server.application.ReindexingStatus.Status; +import com.yahoo.vespa.config.server.application.ApplicationReindexing.Cluster; +import com.yahoo.vespa.config.server.application.ApplicationReindexing.Status; import com.yahoo.vespa.config.server.tenant.TenantRepository; import com.yahoo.vespa.curator.Curator; import com.yahoo.yolean.Exceptions; @@ -29,13 +31,13 @@ public class ApplicationCuratorDatabase { this.curator = curator; } - public ReindexingStatus readReindexingStatus(ApplicationId id) { + public ApplicationReindexing readReindexingStatus(ApplicationId id) { return curator.getData(reindexingDataPath(id)) .map(data -> ReindexingStatusSerializer.fromBytes(data)) - .orElse(ReindexingStatus.empty()); + .orElse(ApplicationReindexing.empty()); } - public void writeReindexingStatus(ApplicationId id, ReindexingStatus status) { + public void writeReindexingStatus(ApplicationId id, ApplicationReindexing status) { curator.set(reindexingDataPath(id), ReindexingStatusSerializer.toBytes(status)); } @@ -54,39 +56,66 @@ public class ApplicationCuratorDatabase { private static class ReindexingStatusSerializer { + private static final String COMMON = "common"; + private static final String CLUSTERS = "clusters"; private static final String PENDING = "pending"; private static final String READY = "ready"; private static final String TYPE = "type"; + private static final String NAME = "name"; private static final String GENERATION = "generation"; private static final String EPOCH_MILLIS = "epochMillis"; - private static byte[] toBytes(ReindexingStatus reindexingStatus) { + private static byte[] toBytes(ApplicationReindexing reindexing) { Cursor root = new Slime().setObject(); - Cursor pendingArray = root.setArray(PENDING); - reindexingStatus.pending().forEach((type, generation) -> { - Cursor pendingObject = pendingArray.addObject(); - pendingObject.setString(TYPE, type); - pendingObject.setLong(GENERATION, generation); - }); - Cursor readyArray = root.setArray(READY); - reindexingStatus.status().forEach((type, status) -> { - Cursor readyObject = readyArray.addObject(); - readyObject.setString(TYPE, type); - readyObject.setLong(EPOCH_MILLIS, status.ready().toEpochMilli()); + setStatus(root.setObject(COMMON), reindexing.common()); + + Cursor clustersArray = root.setArray(CLUSTERS); + reindexing.clusters().forEach((name, cluster) -> { + Cursor clusterObject = clustersArray.addObject(); + clusterObject.setString(NAME, name); + setStatus(clusterObject.setObject(COMMON), cluster.common()); + + Cursor pendingArray = clusterObject.setArray(PENDING); + cluster.pending().forEach((type, generation) -> { + Cursor pendingObject = pendingArray.addObject(); + pendingObject.setString(TYPE, type); + pendingObject.setLong(GENERATION, generation); + }); + + Cursor readyArray = clusterObject.setArray(READY); + cluster.ready().forEach((type, status) -> { + Cursor statusObject = readyArray.addObject(); + statusObject.setString(TYPE, type); + setStatus(statusObject, status); + }); }); return Exceptions.uncheck(() -> SlimeUtils.toJsonBytes(root)); } - private static ReindexingStatus fromBytes(byte[] data) { + private static void setStatus(Cursor statusObject, Status status) { + statusObject.setLong(EPOCH_MILLIS, status.ready().toEpochMilli()); + } + + private static ApplicationReindexing fromBytes(byte[] data) { Cursor root = SlimeUtils.jsonToSlimeOrThrow(data).get(); - return new ReindexingStatus(SlimeUtils.entriesStream(root.field(PENDING)) - .filter(entry -> entry.field(TYPE).valid() && entry.field(GENERATION).valid()) - .collect(toUnmodifiableMap(entry -> entry.field(TYPE).asString(), - entry -> entry.field(GENERATION).asLong())), - SlimeUtils.entriesStream(root.field(READY)) - .filter(entry -> entry.field(TYPE).valid() && entry.field(EPOCH_MILLIS).valid()) - .collect(toUnmodifiableMap(entry -> entry.field(TYPE).asString(), - entry -> new Status(Instant.ofEpochMilli(entry.field(EPOCH_MILLIS).asLong()))))); + return new ApplicationReindexing(getStatus(root.field(COMMON)), + SlimeUtils.entriesStream(root.field(CLUSTERS)) + .collect(toUnmodifiableMap(object -> object.field(NAME).asString(), + object -> getCluster(object)))); + } + + private static Cluster getCluster(Inspector object) { + return new Cluster(getStatus(object.field(COMMON)), + SlimeUtils.entriesStream(object.field(PENDING)) + .collect(toUnmodifiableMap(entry -> entry.field(TYPE).asString(), + entry -> entry.field(GENERATION).asLong())), + SlimeUtils.entriesStream(object.field(READY)) + .collect(toUnmodifiableMap(entry -> entry.field(TYPE).asString(), + entry -> getStatus(entry)))); + } + + private static Status getStatus(Inspector statusObject) { + return new Status(Instant.ofEpochMilli(statusObject.field(EPOCH_MILLIS).asLong())); } } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java new file mode 100644 index 00000000000..918aee73874 --- /dev/null +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java @@ -0,0 +1,231 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.server.application; + +import com.yahoo.config.model.api.Reindexing; + +import java.time.Instant; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Stream; + +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toUnmodifiableMap; + +/** + * Pending and ready reindexing per document type. Each document type can have either a pending or a ready reindexing. + * Each cluster may also have a global status, which is merged with its document type-specific status, by selecting + * whichever status is ready the latest. The application may also have a global status, which is merged likewise. + * This is immutable. + * + * @author jonmv + */ +public class ApplicationReindexing implements Reindexing { + + private static final ApplicationReindexing empty = new ApplicationReindexing(Status.ALWAYS_READY, Map.of()); + + private final Status common; + private final Map<String, Cluster> clusters; + + public ApplicationReindexing(Status common, Map<String, Cluster> clusters) { + this.common = requireNonNull(common); + this.clusters = Map.copyOf(clusters); + } + + /** No reindexing pending or ready. */ + public static ApplicationReindexing empty() { + return empty; + } + + /** Returns a copy of this with common reindexing for the whole application ready at the given instant. */ + public ApplicationReindexing withReady(Instant readyAt) { + return new ApplicationReindexing(new Status(readyAt), clusters); + } + + /** Returns a copy of this with common reindexing for the given cluster ready at the given instant. */ + public ApplicationReindexing withReady(String cluster, Instant readyAt) { + Cluster current = clusters.getOrDefault(cluster, Cluster.empty); + Cluster modified = new Cluster(new Status(readyAt), current.pending, current.ready); + return new ApplicationReindexing(common, with(cluster, modified, clusters)); + } + + /** Returns a copy of this with reindexing for the given document type in the given cluster ready at the given instant. */ + public ApplicationReindexing withReady(String cluster, String documentType, Instant readyAt) { + Cluster current = clusters.getOrDefault(cluster, Cluster.empty); + Cluster modified = new Cluster(current.common, + without(documentType, current.pending), + with(documentType, new Status(readyAt), current.ready)); + return new ApplicationReindexing(common, with(cluster, modified, clusters)); + } + + /** Returns a copy of this with a pending reindexing at the given generation, for the given document type. */ + public ApplicationReindexing withPending(String cluster, String documentType, long requiredGeneration) { + Cluster current = clusters.getOrDefault(cluster, Cluster.empty); + Cluster modified = new Cluster(current.common, + with(documentType, requirePositive(requiredGeneration), current.pending), + without(documentType, current.ready)); + return new ApplicationReindexing(common, with(cluster, modified, clusters)); + } + + /** The common reindexing status for the whole application. */ + public Status common() { + return common; + } + + /** The reindexing status of each of the clusters of this application. */ + public Map<String, Cluster> clusters() { return clusters; } + + @Override + public Reindexing.Status status(String cluster, String documentType) { + if (clusters.containsKey(cluster)) { + if (clusters.get(cluster).pending().containsKey(documentType)) + return () -> Instant.MAX; + + Status documentStatus = clusters.get(cluster).ready().get(documentType); + Status clusterStatus = clusters.get(cluster).common(); + if (documentStatus == null || documentStatus.ready().isBefore(clusterStatus.ready())) + documentStatus = clusterStatus; + + if (documentStatus.ready().isAfter(common().ready())) + return documentStatus; + } + return common(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ApplicationReindexing that = (ApplicationReindexing) o; + return common.equals(that.common) && + clusters.equals(that.clusters); + } + + @Override + public int hashCode() { + return Objects.hash(common, clusters); + } + + @Override + public String toString() { + return "ApplicationReindexing{" + + "common=" + common + + ", clusters=" + clusters + + '}'; + } + + + /** Reindexing status for a single content cluster in an application. */ + public static class Cluster { + + private static final Cluster empty = new Cluster(Status.ALWAYS_READY, Map.of(), Map.of()); + + private final Status common; + private final Map<String, Long> pending; + private final Map<String, Status> ready; + + Cluster(Status common, Map<String, Long> pending, Map<String, Status> ready) { + this.common = requireNonNull(common); + this.pending = Map.copyOf(pending); + this.ready = Map.copyOf(ready); + } + + /** The common reindexing status for all document types in this cluster. */ + public Status common() { + return common; + } + + /** The config generation at which the application must have converged for the latest reindexing to begin, per document type. */ + public Map<String, Long> pending() { + return pending; + } + + /** The reindexing status for ready document types in this cluster. */ + public Map<String, Status> ready() { + return ready; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Cluster cluster = (Cluster) o; + return common.equals(cluster.common) && + pending.equals(cluster.pending) && + ready.equals(cluster.ready); + } + + @Override + public int hashCode() { + return Objects.hash(common, pending, ready); + } + + @Override + public String toString() { + return "Cluster{" + + "common=" + common + + ", pending=" + pending + + ", ready=" + ready + + '}'; + } + + } + + + /** Reindexing status common to an application, one of its clusters, or a single document type in a cluster. */ + public static class Status implements Reindexing.Status { + + /** Always ready, i.e., ignored when joining with more specific statuses. */ + private static final Status ALWAYS_READY = new Status(Instant.EPOCH); + + private final Instant ready; + + Status(Instant ready) { + this.ready = requireNonNull(ready); + } + + @Override + public Instant ready() { return ready; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Status status = (Status) o; + return ready.equals(status.ready); + } + + @Override + public int hashCode() { + return Objects.hash(ready); + } + + @Override + public String toString() { + return "ready at " + ready; + } + + } + + + private static long requirePositive(long generation) { + if (generation <= 0) + throw new IllegalArgumentException("Generation must be positive, but was " + generation); + + return generation; + } + + private static <T> Map<String, T> without(String removed, Map<String, T> map) { + return map.keySet().stream() + .filter(key -> ! removed.equals(key)) + .collect(toUnmodifiableMap(key -> key, + key -> map.get(key))); + } + + private static <T> Map<String, T> with(String added, T value, Map<String, T> map) { + return Stream.concat(Stream.of(added), map.keySet().stream()).distinct() + .collect(toUnmodifiableMap(key -> key, + key -> added.equals(key) ? value + : map.get(key))); + } + +} diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ReindexingStatus.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ReindexingStatus.java deleted file mode 100644 index 465fe3a670c..00000000000 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ReindexingStatus.java +++ /dev/null @@ -1,132 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.config.server.application; - -import com.yahoo.config.model.api.Reindexing; - -import java.time.Instant; -import java.util.Map; -import java.util.Objects; -import java.util.stream.Stream; - -import static java.util.stream.Collectors.toUnmodifiableMap; - -/** - * Pending and ready reindexing per document type. Each document type can have either a pending or a ready reindexing. - * This is immutable. - * - * @author jonmv - */ -public class ReindexingStatus implements Reindexing { - - private static final ReindexingStatus empty = new ReindexingStatus(Map.of(), Map.of()); - - private final Map<String, Long> pending; - private final Map<String, Status> ready; - - ReindexingStatus(Map<String, Long> pending, Map<String, Status> ready) { - this.pending = Map.copyOf(pending); - this.ready = Map.copyOf(ready); - } - - /** No reindexing pending or ready. */ - public static ReindexingStatus empty() { - return empty; - } - - /** Returns a copy of this with a pending reindexing at the given generation, for the given document type. */ - public ReindexingStatus withPending(String documentType, long requiredGeneration) { - return new ReindexingStatus(with(documentType, requirePositive(requiredGeneration), pending), - without(documentType, ready)); - } - - /** Returns a copy of this with reindexing for the given document type set ready at the given instant. */ - public ReindexingStatus withReady(String documentType, Instant readyAt) { - return new ReindexingStatus(without(documentType, pending), - with(documentType, new Status(readyAt), ready)); - } - - /** The config generation at which the application must have converged for the latest reindexing to begin, per document type. */ - public Map<String, Long> pending() { - return pending; - } - - @Override - public Map<String, ? extends Reindexing.Status> status() { - return ready; - } - - private static long requirePositive(long generation) { - if (generation <= 0) - throw new IllegalArgumentException("Generation must be positive, but was " + generation); - - return generation; - } - - private static <T> Map<String, T> without(String removed, Map<String, T> map) { - return map.keySet().stream() - .filter(key -> ! removed.equals(key)) - .collect(toUnmodifiableMap(key -> key, - key -> map.get(key))); - } - - private static <T> Map<String, T> with(String added, T value, Map<String, T> map) { - return Stream.concat(Stream.of(added), map.keySet().stream()).distinct() - .collect(toUnmodifiableMap(key -> key, - key -> added.equals(key) ? value - : map.get(key))); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - ReindexingStatus that = (ReindexingStatus) o; - return pending.equals(that.pending) && - ready.equals(that.ready); - } - - @Override - public int hashCode() { - return Objects.hash(pending, ready); - } - - @Override - public String toString() { - return "ReindexingStatus{" + - "pending=" + pending + - ", ready=" + ready + - '}'; - } - - static class Status implements Reindexing.Status { - - private final Instant ready; - - Status(Instant ready) { - this.ready = Objects.requireNonNull(ready); - } - - @Override - public Instant ready() { return ready; } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Status status = (Status) o; - return ready.equals(status.ready); - } - - @Override - public int hashCode() { - return Objects.hash(ready); - } - - @Override - public String toString() { - return "ready at " + ready; - } - - } - -} diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabaseTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabaseTest.java index d9a4f65de66..96de2a8e4a4 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabaseTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabaseTest.java @@ -3,7 +3,6 @@ package com.yahoo.vespa.config.server.application; import com.yahoo.config.provision.ApplicationId; import com.yahoo.vespa.curator.mock.MockCurator; -import org.junit.Assert; import org.junit.Test; import java.time.Instant; @@ -20,15 +19,19 @@ public class ApplicationCuratorDatabaseTest { ApplicationId id = ApplicationId.defaultId(); ApplicationCuratorDatabase db = new ApplicationCuratorDatabase(new MockCurator()); - assertEquals(ReindexingStatus.empty(), db.readReindexingStatus(id)); + assertEquals(ApplicationReindexing.empty(), db.readReindexingStatus(id)); - ReindexingStatus status = ReindexingStatus.empty() - .withPending("pending1", 1) - .withPending("pending2", 2) - .withReady("ready1", Instant.ofEpochMilli(123)) - .withReady("ready2", Instant.ofEpochMilli(321)); - db.writeReindexingStatus(id, status); - assertEquals(status, db.readReindexingStatus(id)); + ApplicationReindexing reindexing = ApplicationReindexing.empty() + .withReady(Instant.ofEpochMilli(1 << 20)) + .withPending("one", "a", 10) + .withReady("two", "b", Instant.ofEpochMilli(2)) + .withPending("two", "b", 20) + .withReady("two", Instant.ofEpochMilli(2 << 10)) + .withReady("one", "a", Instant.ofEpochMilli(1)) + .withReady("two", "c", Instant.ofEpochMilli(3)); + + db.writeReindexingStatus(id, reindexing); + assertEquals(reindexing, db.readReindexingStatus(id)); } } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationReindexingTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationReindexingTest.java new file mode 100644 index 00000000000..1b0bad95acc --- /dev/null +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationReindexingTest.java @@ -0,0 +1,54 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.server.application; + +import com.yahoo.vespa.config.server.application.ApplicationReindexing.Status; +import org.junit.Test; + +import java.time.Instant; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +/** + * @author jonmv + */ +public class ApplicationReindexingTest { + + @Test + public void test() { + ApplicationReindexing reindexing = ApplicationReindexing.empty() + .withReady(Instant.ofEpochMilli(1 << 20)) + .withPending("one", "a", 10) + .withReady("two", "b", Instant.ofEpochMilli(2)) + .withPending("two", "b", 20) + .withReady("two", Instant.ofEpochMilli(2 << 10)) + .withReady("one", "a", Instant.ofEpochMilli(1)) + .withReady("two", "c", Instant.ofEpochMilli(3)); + + assertEquals(new Status(Instant.ofEpochMilli(1 << 20)), + reindexing.common()); + + assertEquals(Set.of("one", "two"), + reindexing.clusters().keySet()); + + assertEquals(new Status(Instant.EPOCH), + reindexing.clusters().get("one").common()); + + assertEquals(Map.of("a", new Status(Instant.ofEpochMilli(1))), + reindexing.clusters().get("one").ready()); + + assertEquals(Map.of(), + reindexing.clusters().get("one").pending()); + + assertEquals(new Status(Instant.ofEpochMilli(2 << 10)), + reindexing.clusters().get("two").common()); + + assertEquals(Map.of("c", new Status(Instant.ofEpochMilli(3))), + reindexing.clusters().get("two").ready()); + + assertEquals(Map.of("b", 20L), + reindexing.clusters().get("two").pending()); + } + +} diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ReindexingStatusTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ReindexingStatusTest.java deleted file mode 100644 index 2f09b5afba4..00000000000 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ReindexingStatusTest.java +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.config.server.application; - -import org.junit.Test; - -import java.time.Instant; -import java.util.Map; - -import static org.junit.Assert.assertEquals; - -/** - * @author jonmv - */ -public class ReindexingStatusTest { - - @Test - public void test() { - ReindexingStatus status = ReindexingStatus.empty() - .withPending("one", 1) - .withPending("two", 2) - .withReady("two", Instant.EPOCH) - .withPending("three", 2) - .withReady("three", Instant.EPOCH) - .withPending("three", 3) - .withReady("four", Instant.MIN) - .withReady("four", Instant.MAX); - assertEquals(Map.of("one", 1L, - "three", 3L), status.pending()); - assertEquals(Map.of("two", new ReindexingStatus.Status(Instant.EPOCH), - "four", new ReindexingStatus.Status(Instant.MAX)), - status.status()); - } - -} |