diff options
7 files changed, 322 insertions, 0 deletions
diff --git a/config-model-api/src/main/java/com/yahoo/config/model/api/Reindexing.java b/config-model-api/src/main/java/com/yahoo/config/model/api/Reindexing.java new file mode 100644 index 00000000000..0d37f6810a7 --- /dev/null +++ b/config-model-api/src/main/java/com/yahoo/config/model/api/Reindexing.java @@ -0,0 +1,26 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.config.model.api; + +import java.time.Instant; +import java.util.Map; + +/** + * Instants after which reindexing should be triggered, for select document types. + * + * @author jonmv + */ +public interface Reindexing { + + /** The reindexing status for each document type for which this is known. */ + default Map<String, ? extends Status> status() { return Map.of(); } + + + /** Reindexing status of a given document type. */ + interface Status { + + /** The instant at which reindexing of this document type may begin. */ + default Instant ready() { return Instant.MAX; }; + + } + +} diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabase.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabase.java new file mode 100644 index 00000000000..c4cc1cefc6e --- /dev/null +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabase.java @@ -0,0 +1,94 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.server.application; + +import com.yahoo.config.provision.ApplicationId; +import com.yahoo.config.provision.TenantName; +import com.yahoo.path.Path; +import com.yahoo.slime.Cursor; +import com.yahoo.slime.Slime; +import com.yahoo.slime.SlimeUtils; +import com.yahoo.vespa.config.server.application.ReindexingStatus.Status; +import com.yahoo.vespa.config.server.tenant.TenantRepository; +import com.yahoo.vespa.curator.Curator; +import com.yahoo.yolean.Exceptions; + +import java.time.Instant; + +import static java.util.stream.Collectors.toUnmodifiableMap; + +/** + * Stores data and holds locks for application, backed by a {@link Curator}. + * + * @author jonmv + */ +public class ApplicationCuratorDatabase { + + private final Curator curator; + + public ApplicationCuratorDatabase(Curator curator) { + this.curator = curator; + } + + public ReindexingStatus readReindexingStatus(ApplicationId id) { + return curator.getData(reindexingDataPath(id)) + .map(data -> ReindexingStatusSerializer.fromBytes(data)) + .orElse(ReindexingStatus.empty()); + } + + public void writeReindexingStatus(ApplicationId id, ReindexingStatus status) { + curator.set(reindexingDataPath(id), ReindexingStatusSerializer.toBytes(status)); + } + + private static Path applicationsRoot(TenantName tenant) { + return TenantRepository.getApplicationsPath(tenant); + } + + private static Path applicationPath(ApplicationId id) { + return applicationsRoot(id.tenant()).append(id.serializedForm()); + } + + private static Path reindexingDataPath(ApplicationId id) { + return applicationPath(id).append("reindexing"); + } + + + private static class ReindexingStatusSerializer { + + private static final String PENDING = "pending"; + private static final String READY = "ready"; + private static final String TYPE = "type"; + private static final String GENERATION = "generation"; + private static final String EPOCH_MILLIS = "epochMillis"; + + private static byte[] toBytes(ReindexingStatus reindexingStatus) { + Cursor root = new Slime().setObject(); + Cursor pendingArray = root.setArray(PENDING); + reindexingStatus.pending().forEach((type, generation) -> { + Cursor pendingObject = pendingArray.addObject(); + pendingObject.setString(TYPE, type); + pendingObject.setLong(GENERATION, generation); + }); + Cursor readyArray = root.setArray(READY); + reindexingStatus.status().forEach((type, status) -> { + Cursor readyObject = readyArray.addObject(); + readyObject.setString(TYPE, type); + readyObject.setLong(EPOCH_MILLIS, status.ready().toEpochMilli()); + }); + return Exceptions.uncheck(() -> SlimeUtils.toJsonBytes(root)); + } + + private static ReindexingStatus fromBytes(byte[] data) { + Cursor root = SlimeUtils.jsonToSlimeOrThrow(data).get(); + return new ReindexingStatus(SlimeUtils.entriesStream(root.field(PENDING)) + .filter(entry -> entry.field(TYPE).valid() && entry.field(GENERATION).valid()) + .collect(toUnmodifiableMap(entry -> entry.field(TYPE).asString(), + entry -> entry.field(GENERATION).asLong())), + SlimeUtils.entriesStream(root.field(READY)) + .filter(entry -> entry.field(TYPE).valid() && entry.field(EPOCH_MILLIS).valid()) + .collect(toUnmodifiableMap(entry -> entry.field(TYPE).asString(), + entry -> new Status(Instant.ofEpochMilli(entry.field(EPOCH_MILLIS).asLong()))))); + } + + } + +} diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ReindexingStatus.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ReindexingStatus.java new file mode 100644 index 00000000000..ead1ec88474 --- /dev/null +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ReindexingStatus.java @@ -0,0 +1,132 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.server.application; + +import com.yahoo.config.model.api.Reindexing; + +import java.time.Instant; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toUnmodifiableMap; + +/** + * Pending and ready reindexing per document type. Each document type can have either a pending or a ready reindexing. + * This is immutable. + * + * @author jonmv + */ +public class ReindexingStatus implements Reindexing { + + private static final ReindexingStatus empty = new ReindexingStatus(Map.of(), Map.of()); + + private final Map<String, Long> pending; + private final Map<String, Status> ready; + + ReindexingStatus(Map<String, Long> pending, Map<String, Status> ready) { + this.pending = Map.copyOf(pending); + this.ready = ready; + } + + /** No reindexing pending or ready. */ + public static ReindexingStatus empty() { + return empty; + } + + /** Returns a copy of this with a pending reindexing at the given generation, for the given document type. */ + public ReindexingStatus withPending(String documentType, long requiredGeneration) { + return new ReindexingStatus(with(documentType, requirePositive(requiredGeneration), pending), + without(documentType, ready)); + } + + /** Returns a copy of this with reindexing for the given document type set ready at the given instant. */ + public ReindexingStatus withReady(String documentType, Instant readyAt) { + return new ReindexingStatus(without(documentType, pending), + with(documentType, new Status(readyAt), ready)); + } + + /** The config generation at which the application must have converged for the latest reindexing to begin, per document type. */ + public Map<String, Long> pending() { + return pending; + } + + @Override + public Map<String, ? extends Reindexing.Status> status() { + return ready; + } + + private static long requirePositive(long generation) { + if (generation <= 0) + throw new IllegalArgumentException("Generation must be positive, but was " + generation); + + return generation; + } + + private static <T> Map<String, T> without(String removed, Map<String, T> map) { + return map.keySet().stream() + .filter(key -> ! removed.equals(key)) + .collect(toUnmodifiableMap(key -> key, + key -> map.get(key))); + } + + private static <T> Map<String, T> with(String added, T value, Map<String, T> map) { + return Stream.concat(Stream.of(added), map.keySet().stream()).distinct() + .collect(toUnmodifiableMap(key -> key, + key -> added.equals(key) ? value + : map.get(key))); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ReindexingStatus that = (ReindexingStatus) o; + return pending.equals(that.pending) && + ready.equals(that.ready); + } + + @Override + public int hashCode() { + return Objects.hash(pending, ready); + } + + @Override + public String toString() { + return "ReindexingStatus{" + + "pending=" + pending + + ", ready=" + ready + + '}'; + } + + static class Status implements Reindexing.Status { + + private final Instant ready; + + Status(Instant ready) { + this.ready = Objects.requireNonNull(ready); + } + + @Override + public Instant ready() { return ready; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Status status = (Status) o; + return ready.equals(status.ready); + } + + @Override + public int hashCode() { + return Objects.hash(ready); + } + + @Override + public String toString() { + return "ready at " + ready; + } + + } + +} diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java index 844964fb57c..a01ce2e2cc3 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java @@ -232,6 +232,7 @@ public class TenantApplications implements RequestHandler, HostValidator<Applica log.log(Level.FINE, TenantRepository.logPre(applicationId) + "Application added: " + applicationId); } + // TODO jonmv: Move curator stuff to ApplicationCuratorDatabase private Path applicationPath(ApplicationId id) { return applicationsPath.append(id.serializedForm()); } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabaseTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabaseTest.java new file mode 100644 index 00000000000..d9a4f65de66 --- /dev/null +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabaseTest.java @@ -0,0 +1,34 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.server.application; + +import com.yahoo.config.provision.ApplicationId; +import com.yahoo.vespa.curator.mock.MockCurator; +import org.junit.Assert; +import org.junit.Test; + +import java.time.Instant; + +import static org.junit.Assert.assertEquals; + +/** + * @author jonmv + */ +public class ApplicationCuratorDatabaseTest { + + @Test + public void testReindexingStatusSerialization() { + ApplicationId id = ApplicationId.defaultId(); + ApplicationCuratorDatabase db = new ApplicationCuratorDatabase(new MockCurator()); + + assertEquals(ReindexingStatus.empty(), db.readReindexingStatus(id)); + + ReindexingStatus status = ReindexingStatus.empty() + .withPending("pending1", 1) + .withPending("pending2", 2) + .withReady("ready1", Instant.ofEpochMilli(123)) + .withReady("ready2", Instant.ofEpochMilli(321)); + db.writeReindexingStatus(id, status); + assertEquals(status, db.readReindexingStatus(id)); + } + +} diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ReindexingStatusTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ReindexingStatusTest.java new file mode 100644 index 00000000000..2f09b5afba4 --- /dev/null +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ReindexingStatusTest.java @@ -0,0 +1,34 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.server.application; + +import org.junit.Test; + +import java.time.Instant; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +/** + * @author jonmv + */ +public class ReindexingStatusTest { + + @Test + public void test() { + ReindexingStatus status = ReindexingStatus.empty() + .withPending("one", 1) + .withPending("two", 2) + .withReady("two", Instant.EPOCH) + .withPending("three", 2) + .withReady("three", Instant.EPOCH) + .withPending("three", 3) + .withReady("four", Instant.MIN) + .withReady("four", Instant.MAX); + assertEquals(Map.of("one", 1L, + "three", 3L), status.pending()); + assertEquals(Map.of("two", new ReindexingStatus.Status(Instant.EPOCH), + "four", new ReindexingStatus.Status(Instant.MAX)), + status.status()); + } + +} diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/Maintainer.java b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/Maintainer.java index b1d6fd99333..242bdb1161e 100644 --- a/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/Maintainer.java +++ b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/Maintainer.java @@ -19,6 +19,7 @@ import java.util.logging.Logger; * * @author bratseth * @author mpolden + * @author jonmv */ public abstract class Maintainer implements Runnable, AutoCloseable { |