diff options
author | Jon Marius Venstad <jonmv@users.noreply.github.com> | 2020-11-02 09:46:01 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-02 09:46:01 +0100 |
commit | fd229417d0f7bb65fa1b6c428c49dfbbe6f86edc (patch) | |
tree | de3ce71217bbd7a3ec2927ca5bd0533c5ce5d4ed | |
parent | bd7181b95f36434052b73a95fc8202995551f8cf (diff) | |
parent | fe43131104ac22158a65c6e6c040e23a76d16101 (diff) |
Merge pull request #15087 from vespa-engine/jonmv/reindexing-data-stores
Jonmv/reindexing data stores
10 files changed, 339 insertions, 13 deletions
diff --git a/config-model-api/src/main/java/com/yahoo/config/model/api/Reindexing.java b/config-model-api/src/main/java/com/yahoo/config/model/api/Reindexing.java new file mode 100644 index 00000000000..0d37f6810a7 --- /dev/null +++ b/config-model-api/src/main/java/com/yahoo/config/model/api/Reindexing.java @@ -0,0 +1,26 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.config.model.api; + +import java.time.Instant; +import java.util.Map; + +/** + * Instants after which reindexing should be triggered, for select document types. + * + * @author jonmv + */ +public interface Reindexing { + + /** The reindexing status for each document type for which this is known. */ + default Map<String, ? extends Status> status() { return Map.of(); } + + + /** Reindexing status of a given document type. */ + interface Status { + + /** The instant at which reindexing of this document type may begin. */ + default Instant ready() { return Instant.MAX; }; + + } + +} diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java index 42c5d80bd9b..5d8cd43cc44 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java @@ -663,12 +663,12 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye public HttpResponse checkServiceForConfigConvergence(ApplicationId applicationId, String hostAndPort, URI uri, Duration timeout, Optional<Version> vespaVersion) { - return convergeChecker.checkService(getApplication(applicationId, vespaVersion), hostAndPort, uri, timeout); + return convergeChecker.getServiceConfigGenerationResponse(getApplication(applicationId, vespaVersion), hostAndPort, uri, timeout); } public HttpResponse servicesToCheckForConfigConvergence(ApplicationId applicationId, URI uri, Duration timeoutPerService, Optional<Version> vespaVersion) { - return convergeChecker.servicesToCheck(getApplication(applicationId, vespaVersion), uri, timeoutPerService); + return convergeChecker.getServiceConfigGenerationsResponse(getApplication(applicationId, vespaVersion), uri, timeoutPerService); } // ---------------- Logs ---------------------------------------------------------------- diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabase.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabase.java new file mode 100644 index 00000000000..c4cc1cefc6e --- /dev/null +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabase.java @@ -0,0 +1,94 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.server.application; + +import com.yahoo.config.provision.ApplicationId; +import com.yahoo.config.provision.TenantName; +import com.yahoo.path.Path; +import com.yahoo.slime.Cursor; +import com.yahoo.slime.Slime; +import com.yahoo.slime.SlimeUtils; +import com.yahoo.vespa.config.server.application.ReindexingStatus.Status; +import com.yahoo.vespa.config.server.tenant.TenantRepository; +import com.yahoo.vespa.curator.Curator; +import com.yahoo.yolean.Exceptions; + +import java.time.Instant; + +import static java.util.stream.Collectors.toUnmodifiableMap; + +/** + * Stores data and holds locks for application, backed by a {@link Curator}. + * + * @author jonmv + */ +public class ApplicationCuratorDatabase { + + private final Curator curator; + + public ApplicationCuratorDatabase(Curator curator) { + this.curator = curator; + } + + public ReindexingStatus readReindexingStatus(ApplicationId id) { + return curator.getData(reindexingDataPath(id)) + .map(data -> ReindexingStatusSerializer.fromBytes(data)) + .orElse(ReindexingStatus.empty()); + } + + public void writeReindexingStatus(ApplicationId id, ReindexingStatus status) { + curator.set(reindexingDataPath(id), ReindexingStatusSerializer.toBytes(status)); + } + + private static Path applicationsRoot(TenantName tenant) { + return TenantRepository.getApplicationsPath(tenant); + } + + private static Path applicationPath(ApplicationId id) { + return applicationsRoot(id.tenant()).append(id.serializedForm()); + } + + private static Path reindexingDataPath(ApplicationId id) { + return applicationPath(id).append("reindexing"); + } + + + private static class ReindexingStatusSerializer { + + private static final String PENDING = "pending"; + private static final String READY = "ready"; + private static final String TYPE = "type"; + private static final String GENERATION = "generation"; + private static final String EPOCH_MILLIS = "epochMillis"; + + private static byte[] toBytes(ReindexingStatus reindexingStatus) { + Cursor root = new Slime().setObject(); + Cursor pendingArray = root.setArray(PENDING); + reindexingStatus.pending().forEach((type, generation) -> { + Cursor pendingObject = pendingArray.addObject(); + pendingObject.setString(TYPE, type); + pendingObject.setLong(GENERATION, generation); + }); + Cursor readyArray = root.setArray(READY); + reindexingStatus.status().forEach((type, status) -> { + Cursor readyObject = readyArray.addObject(); + readyObject.setString(TYPE, type); + readyObject.setLong(EPOCH_MILLIS, status.ready().toEpochMilli()); + }); + return Exceptions.uncheck(() -> SlimeUtils.toJsonBytes(root)); + } + + private static ReindexingStatus fromBytes(byte[] data) { + Cursor root = SlimeUtils.jsonToSlimeOrThrow(data).get(); + return new ReindexingStatus(SlimeUtils.entriesStream(root.field(PENDING)) + .filter(entry -> entry.field(TYPE).valid() && entry.field(GENERATION).valid()) + .collect(toUnmodifiableMap(entry -> entry.field(TYPE).asString(), + entry -> entry.field(GENERATION).asLong())), + SlimeUtils.entriesStream(root.field(READY)) + .filter(entry -> entry.field(TYPE).valid() && entry.field(EPOCH_MILLIS).valid()) + .collect(toUnmodifiableMap(entry -> entry.field(TYPE).asString(), + entry -> new Status(Instant.ofEpochMilli(entry.field(EPOCH_MILLIS).asLong()))))); + } + + } + +} diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java index 824cc932ecc..6b316c06b54 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java @@ -70,26 +70,30 @@ public class ConfigConvergenceChecker extends AbstractComponent { this.stateApiFactory = stateApiFactory; } - /** Check all services in given application. Returns the minimum current generation of all services */ - public ServiceListResponse servicesToCheck(Application application, URI requestUrl, Duration timeoutPerService) { - log.log(Level.FINE, () -> "Finding services to check config convergence for in '" + application); + /** Fetches the active config generation for all services in the given application. */ + public Map<ServiceInfo, Long> getServiceConfigGenerations(Application application, Duration timeoutPerService) { List<ServiceInfo> servicesToCheck = new ArrayList<>(); application.getModel().getHosts() .forEach(host -> host.getServices().stream() .filter(service -> serviceTypesToCheck.contains(service.getServiceType())) .forEach(service -> getStatePort(service).ifPresent(port -> servicesToCheck.add(service)))); - Map<ServiceInfo, Long> currentGenerations = getServiceGenerations(servicesToCheck, timeoutPerService); + return getServiceGenerations(servicesToCheck, timeoutPerService); + } + + /** Check all services in given application. Returns the minimum current generation of all services */ + public ServiceListResponse getServiceConfigGenerationsResponse(Application application, URI requestUrl, Duration timeoutPerService) { + Map<ServiceInfo, Long> currentGenerations = getServiceConfigGenerations(application, timeoutPerService); long currentGeneration = currentGenerations.values().stream().mapToLong(Long::longValue).min().orElse(-1); return new ServiceListResponse(200, currentGenerations, requestUrl, application.getApplicationGeneration(), currentGeneration); } /** Check service identified by host and port in given application */ - public ServiceResponse checkService(Application application, String hostAndPortToCheck, URI requestUrl, Duration timeout) { + public ServiceResponse getServiceConfigGenerationResponse(Application application, String hostAndPortToCheck, URI requestUrl, Duration timeout) { Long wantedGeneration = application.getApplicationGeneration(); try { - if (! hostInApplication(application, hostAndPortToCheck)) + if ( ! hostInApplication(application, hostAndPortToCheck)) return ServiceResponse.createHostNotFoundInAppResponse(requestUrl, hostAndPortToCheck, wantedGeneration); long currentGeneration = getServiceGeneration(URI.create("http://" + hostAndPortToCheck), timeout); diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ReindexingStatus.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ReindexingStatus.java new file mode 100644 index 00000000000..465fe3a670c --- /dev/null +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ReindexingStatus.java @@ -0,0 +1,132 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.server.application; + +import com.yahoo.config.model.api.Reindexing; + +import java.time.Instant; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toUnmodifiableMap; + +/** + * Pending and ready reindexing per document type. Each document type can have either a pending or a ready reindexing. + * This is immutable. + * + * @author jonmv + */ +public class ReindexingStatus implements Reindexing { + + private static final ReindexingStatus empty = new ReindexingStatus(Map.of(), Map.of()); + + private final Map<String, Long> pending; + private final Map<String, Status> ready; + + ReindexingStatus(Map<String, Long> pending, Map<String, Status> ready) { + this.pending = Map.copyOf(pending); + this.ready = Map.copyOf(ready); + } + + /** No reindexing pending or ready. */ + public static ReindexingStatus empty() { + return empty; + } + + /** Returns a copy of this with a pending reindexing at the given generation, for the given document type. */ + public ReindexingStatus withPending(String documentType, long requiredGeneration) { + return new ReindexingStatus(with(documentType, requirePositive(requiredGeneration), pending), + without(documentType, ready)); + } + + /** Returns a copy of this with reindexing for the given document type set ready at the given instant. */ + public ReindexingStatus withReady(String documentType, Instant readyAt) { + return new ReindexingStatus(without(documentType, pending), + with(documentType, new Status(readyAt), ready)); + } + + /** The config generation at which the application must have converged for the latest reindexing to begin, per document type. */ + public Map<String, Long> pending() { + return pending; + } + + @Override + public Map<String, ? extends Reindexing.Status> status() { + return ready; + } + + private static long requirePositive(long generation) { + if (generation <= 0) + throw new IllegalArgumentException("Generation must be positive, but was " + generation); + + return generation; + } + + private static <T> Map<String, T> without(String removed, Map<String, T> map) { + return map.keySet().stream() + .filter(key -> ! removed.equals(key)) + .collect(toUnmodifiableMap(key -> key, + key -> map.get(key))); + } + + private static <T> Map<String, T> with(String added, T value, Map<String, T> map) { + return Stream.concat(Stream.of(added), map.keySet().stream()).distinct() + .collect(toUnmodifiableMap(key -> key, + key -> added.equals(key) ? value + : map.get(key))); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ReindexingStatus that = (ReindexingStatus) o; + return pending.equals(that.pending) && + ready.equals(that.ready); + } + + @Override + public int hashCode() { + return Objects.hash(pending, ready); + } + + @Override + public String toString() { + return "ReindexingStatus{" + + "pending=" + pending + + ", ready=" + ready + + '}'; + } + + static class Status implements Reindexing.Status { + + private final Instant ready; + + Status(Instant ready) { + this.ready = Objects.requireNonNull(ready); + } + + @Override + public Instant ready() { return ready; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Status status = (Status) o; + return ready.equals(status.ready); + } + + @Override + public int hashCode() { + return Objects.hash(ready); + } + + @Override + public String toString() { + return "ready at " + ready; + } + + } + +} diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java index 844964fb57c..a01ce2e2cc3 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java @@ -232,6 +232,7 @@ public class TenantApplications implements RequestHandler, HostValidator<Applica log.log(Level.FINE, TenantRepository.logPre(applicationId) + "Application added: " + applicationId); } + // TODO jonmv: Move curator stuff to ApplicationCuratorDatabase private Path applicationPath(ApplicationId id) { return applicationsPath.append(id.serializedForm()); } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabaseTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabaseTest.java new file mode 100644 index 00000000000..d9a4f65de66 --- /dev/null +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabaseTest.java @@ -0,0 +1,34 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.server.application; + +import com.yahoo.config.provision.ApplicationId; +import com.yahoo.vespa.curator.mock.MockCurator; +import org.junit.Assert; +import org.junit.Test; + +import java.time.Instant; + +import static org.junit.Assert.assertEquals; + +/** + * @author jonmv + */ +public class ApplicationCuratorDatabaseTest { + + @Test + public void testReindexingStatusSerialization() { + ApplicationId id = ApplicationId.defaultId(); + ApplicationCuratorDatabase db = new ApplicationCuratorDatabase(new MockCurator()); + + assertEquals(ReindexingStatus.empty(), db.readReindexingStatus(id)); + + ReindexingStatus status = ReindexingStatus.empty() + .withPending("pending1", 1) + .withPending("pending2", 2) + .withReady("ready1", Instant.ofEpochMilli(123)) + .withReady("ready2", Instant.ofEpochMilli(321)); + db.writeReindexingStatus(id, status); + assertEquals(status, db.readReindexingStatus(id)); + } + +} diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java index 1f034a92cbb..6aeb774d2b0 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java @@ -77,7 +77,7 @@ public class ConfigConvergenceCheckerTest { String serviceName = hostAndPort(this.service); URI requestUrl = testServer().resolve("/serviceconverge/" + serviceName); wireMock.stubFor(get(urlEqualTo("/state/v1/config")).willReturn(okJson("{\"config\":{\"generation\":3}}"))); - HttpResponse serviceResponse = checker.checkService(application, hostAndPort(this.service), requestUrl, clientTimeout); + HttpResponse serviceResponse = checker.getServiceConfigGenerationResponse(application, hostAndPort(this.service), requestUrl, clientTimeout); assertResponse("{\n" + " \"url\": \"" + requestUrl.toString() + "\",\n" + " \"host\": \"" + hostAndPort(this.service) + "\",\n" + @@ -92,7 +92,7 @@ public class ConfigConvergenceCheckerTest { { // Missing service String serviceName = "notPresent:1337"; URI requestUrl = testServer().resolve("/serviceconverge/" + serviceName); - HttpResponse response = checker.checkService(application, "notPresent:1337", requestUrl,clientTimeout); + HttpResponse response = checker.getServiceConfigGenerationResponse(application, "notPresent:1337", requestUrl, clientTimeout); assertResponse("{\n" + " \"url\": \"" + requestUrl.toString() + "\",\n" + " \"host\": \"" + serviceName + "\",\n" + @@ -111,7 +111,7 @@ public class ConfigConvergenceCheckerTest { URI requestUrl = testServer().resolve("/serviceconverge"); URI serviceUrl = testServer().resolve("/serviceconverge/" + serviceName); wireMock.stubFor(get(urlEqualTo("/state/v1/config")).willReturn(okJson("{\"config\":{\"generation\":3}}"))); - HttpResponse response = checker.servicesToCheck(application, requestUrl, clientTimeout); + HttpResponse response = checker.getServiceConfigGenerationsResponse(application, requestUrl, clientTimeout); assertResponse("{\n" + " \"services\": [\n" + " {\n" + @@ -148,7 +148,7 @@ public class ConfigConvergenceCheckerTest { URI requestUrl = testServer().resolve("/serviceconverge"); URI serviceUrl = testServer().resolve("/serviceconverge/" + hostAndPort(service)); URI serviceUrl2 = testServer().resolve("/serviceconverge/" + hostAndPort(service2)); - HttpResponse response = checker.servicesToCheck(application, requestUrl, clientTimeout); + HttpResponse response = checker.getServiceConfigGenerationsResponse(application, requestUrl, clientTimeout); assertResponse("{\n" + " \"services\": [\n" + " {\n" + @@ -182,7 +182,7 @@ public class ConfigConvergenceCheckerTest { wireMock.stubFor(get(urlEqualTo("/state/v1/config")).willReturn(aResponse() .withFixedDelay((int) clientTimeout.plus(Duration.ofSeconds(1)).toMillis()) .withBody("response too slow"))); - HttpResponse response = checker.checkService(application, hostAndPort(service), requestUrl, Duration.ofMillis(1)); + HttpResponse response = checker.getServiceConfigGenerationResponse(application, hostAndPort(service), requestUrl, Duration.ofMillis(1)); // Message contained in a SocketTimeoutException may differ across platforms, so we do a partial match of the response here assertResponse((responseBody) -> assertTrue("Response matches", responseBody.startsWith( "{\"url\":\"" + requestUrl.toString() + "\",\"host\":\"" + hostAndPort(requestUrl) + diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ReindexingStatusTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ReindexingStatusTest.java new file mode 100644 index 00000000000..2f09b5afba4 --- /dev/null +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ReindexingStatusTest.java @@ -0,0 +1,34 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.server.application; + +import org.junit.Test; + +import java.time.Instant; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +/** + * @author jonmv + */ +public class ReindexingStatusTest { + + @Test + public void test() { + ReindexingStatus status = ReindexingStatus.empty() + .withPending("one", 1) + .withPending("two", 2) + .withReady("two", Instant.EPOCH) + .withPending("three", 2) + .withReady("three", Instant.EPOCH) + .withPending("three", 3) + .withReady("four", Instant.MIN) + .withReady("four", Instant.MAX); + assertEquals(Map.of("one", 1L, + "three", 3L), status.pending()); + assertEquals(Map.of("two", new ReindexingStatus.Status(Instant.EPOCH), + "four", new ReindexingStatus.Status(Instant.MAX)), + status.status()); + } + +} diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/Maintainer.java b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/Maintainer.java index b1d6fd99333..242bdb1161e 100644 --- a/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/Maintainer.java +++ b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/Maintainer.java @@ -19,6 +19,7 @@ import java.util.logging.Logger; * * @author bratseth * @author mpolden + * @author jonmv */ public abstract class Maintainer implements Runnable, AutoCloseable { |