summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2020-11-02 09:46:01 +0100
committerGitHub <noreply@github.com>2020-11-02 09:46:01 +0100
commitfd229417d0f7bb65fa1b6c428c49dfbbe6f86edc (patch)
treede3ce71217bbd7a3ec2927ca5bd0533c5ce5d4ed
parentbd7181b95f36434052b73a95fc8202995551f8cf (diff)
parentfe43131104ac22158a65c6e6c040e23a76d16101 (diff)
Merge pull request #15087 from vespa-engine/jonmv/reindexing-data-stores
Jonmv/reindexing data stores
-rw-r--r--config-model-api/src/main/java/com/yahoo/config/model/api/Reindexing.java26
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java4
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabase.java94
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java16
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/application/ReindexingStatus.java132
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java1
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabaseTest.java34
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java10
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/application/ReindexingStatusTest.java34
-rw-r--r--vespajlib/src/main/java/com/yahoo/concurrent/maintenance/Maintainer.java1
10 files changed, 339 insertions, 13 deletions
diff --git a/config-model-api/src/main/java/com/yahoo/config/model/api/Reindexing.java b/config-model-api/src/main/java/com/yahoo/config/model/api/Reindexing.java
new file mode 100644
index 00000000000..0d37f6810a7
--- /dev/null
+++ b/config-model-api/src/main/java/com/yahoo/config/model/api/Reindexing.java
@@ -0,0 +1,26 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.config.model.api;
+
+import java.time.Instant;
+import java.util.Map;
+
+/**
+ * Instants after which reindexing should be triggered, for select document types.
+ *
+ * @author jonmv
+ */
+public interface Reindexing {
+
+ /** The reindexing status for each document type for which this is known. */
+ default Map<String, ? extends Status> status() { return Map.of(); }
+
+
+ /** Reindexing status of a given document type. */
+ interface Status {
+
+ /** The instant at which reindexing of this document type may begin. */
+ default Instant ready() { return Instant.MAX; };
+
+ }
+
+}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java
index 42c5d80bd9b..5d8cd43cc44 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java
@@ -663,12 +663,12 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye
public HttpResponse checkServiceForConfigConvergence(ApplicationId applicationId, String hostAndPort, URI uri,
Duration timeout, Optional<Version> vespaVersion) {
- return convergeChecker.checkService(getApplication(applicationId, vespaVersion), hostAndPort, uri, timeout);
+ return convergeChecker.getServiceConfigGenerationResponse(getApplication(applicationId, vespaVersion), hostAndPort, uri, timeout);
}
public HttpResponse servicesToCheckForConfigConvergence(ApplicationId applicationId, URI uri,
Duration timeoutPerService, Optional<Version> vespaVersion) {
- return convergeChecker.servicesToCheck(getApplication(applicationId, vespaVersion), uri, timeoutPerService);
+ return convergeChecker.getServiceConfigGenerationsResponse(getApplication(applicationId, vespaVersion), uri, timeoutPerService);
}
// ---------------- Logs ----------------------------------------------------------------
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabase.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabase.java
new file mode 100644
index 00000000000..c4cc1cefc6e
--- /dev/null
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabase.java
@@ -0,0 +1,94 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.server.application;
+
+import com.yahoo.config.provision.ApplicationId;
+import com.yahoo.config.provision.TenantName;
+import com.yahoo.path.Path;
+import com.yahoo.slime.Cursor;
+import com.yahoo.slime.Slime;
+import com.yahoo.slime.SlimeUtils;
+import com.yahoo.vespa.config.server.application.ReindexingStatus.Status;
+import com.yahoo.vespa.config.server.tenant.TenantRepository;
+import com.yahoo.vespa.curator.Curator;
+import com.yahoo.yolean.Exceptions;
+
+import java.time.Instant;
+
+import static java.util.stream.Collectors.toUnmodifiableMap;
+
+/**
+ * Stores data and holds locks for application, backed by a {@link Curator}.
+ *
+ * @author jonmv
+ */
+public class ApplicationCuratorDatabase {
+
+ private final Curator curator;
+
+ public ApplicationCuratorDatabase(Curator curator) {
+ this.curator = curator;
+ }
+
+ public ReindexingStatus readReindexingStatus(ApplicationId id) {
+ return curator.getData(reindexingDataPath(id))
+ .map(data -> ReindexingStatusSerializer.fromBytes(data))
+ .orElse(ReindexingStatus.empty());
+ }
+
+ public void writeReindexingStatus(ApplicationId id, ReindexingStatus status) {
+ curator.set(reindexingDataPath(id), ReindexingStatusSerializer.toBytes(status));
+ }
+
+ private static Path applicationsRoot(TenantName tenant) {
+ return TenantRepository.getApplicationsPath(tenant);
+ }
+
+ private static Path applicationPath(ApplicationId id) {
+ return applicationsRoot(id.tenant()).append(id.serializedForm());
+ }
+
+ private static Path reindexingDataPath(ApplicationId id) {
+ return applicationPath(id).append("reindexing");
+ }
+
+
+ private static class ReindexingStatusSerializer {
+
+ private static final String PENDING = "pending";
+ private static final String READY = "ready";
+ private static final String TYPE = "type";
+ private static final String GENERATION = "generation";
+ private static final String EPOCH_MILLIS = "epochMillis";
+
+ private static byte[] toBytes(ReindexingStatus reindexingStatus) {
+ Cursor root = new Slime().setObject();
+ Cursor pendingArray = root.setArray(PENDING);
+ reindexingStatus.pending().forEach((type, generation) -> {
+ Cursor pendingObject = pendingArray.addObject();
+ pendingObject.setString(TYPE, type);
+ pendingObject.setLong(GENERATION, generation);
+ });
+ Cursor readyArray = root.setArray(READY);
+ reindexingStatus.status().forEach((type, status) -> {
+ Cursor readyObject = readyArray.addObject();
+ readyObject.setString(TYPE, type);
+ readyObject.setLong(EPOCH_MILLIS, status.ready().toEpochMilli());
+ });
+ return Exceptions.uncheck(() -> SlimeUtils.toJsonBytes(root));
+ }
+
+ private static ReindexingStatus fromBytes(byte[] data) {
+ Cursor root = SlimeUtils.jsonToSlimeOrThrow(data).get();
+ return new ReindexingStatus(SlimeUtils.entriesStream(root.field(PENDING))
+ .filter(entry -> entry.field(TYPE).valid() && entry.field(GENERATION).valid())
+ .collect(toUnmodifiableMap(entry -> entry.field(TYPE).asString(),
+ entry -> entry.field(GENERATION).asLong())),
+ SlimeUtils.entriesStream(root.field(READY))
+ .filter(entry -> entry.field(TYPE).valid() && entry.field(EPOCH_MILLIS).valid())
+ .collect(toUnmodifiableMap(entry -> entry.field(TYPE).asString(),
+ entry -> new Status(Instant.ofEpochMilli(entry.field(EPOCH_MILLIS).asLong())))));
+ }
+
+ }
+
+}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java
index 824cc932ecc..6b316c06b54 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java
@@ -70,26 +70,30 @@ public class ConfigConvergenceChecker extends AbstractComponent {
this.stateApiFactory = stateApiFactory;
}
- /** Check all services in given application. Returns the minimum current generation of all services */
- public ServiceListResponse servicesToCheck(Application application, URI requestUrl, Duration timeoutPerService) {
- log.log(Level.FINE, () -> "Finding services to check config convergence for in '" + application);
+ /** Fetches the active config generation for all services in the given application. */
+ public Map<ServiceInfo, Long> getServiceConfigGenerations(Application application, Duration timeoutPerService) {
List<ServiceInfo> servicesToCheck = new ArrayList<>();
application.getModel().getHosts()
.forEach(host -> host.getServices().stream()
.filter(service -> serviceTypesToCheck.contains(service.getServiceType()))
.forEach(service -> getStatePort(service).ifPresent(port -> servicesToCheck.add(service))));
- Map<ServiceInfo, Long> currentGenerations = getServiceGenerations(servicesToCheck, timeoutPerService);
+ return getServiceGenerations(servicesToCheck, timeoutPerService);
+ }
+
+ /** Check all services in given application. Returns the minimum current generation of all services */
+ public ServiceListResponse getServiceConfigGenerationsResponse(Application application, URI requestUrl, Duration timeoutPerService) {
+ Map<ServiceInfo, Long> currentGenerations = getServiceConfigGenerations(application, timeoutPerService);
long currentGeneration = currentGenerations.values().stream().mapToLong(Long::longValue).min().orElse(-1);
return new ServiceListResponse(200, currentGenerations, requestUrl, application.getApplicationGeneration(),
currentGeneration);
}
/** Check service identified by host and port in given application */
- public ServiceResponse checkService(Application application, String hostAndPortToCheck, URI requestUrl, Duration timeout) {
+ public ServiceResponse getServiceConfigGenerationResponse(Application application, String hostAndPortToCheck, URI requestUrl, Duration timeout) {
Long wantedGeneration = application.getApplicationGeneration();
try {
- if (! hostInApplication(application, hostAndPortToCheck))
+ if ( ! hostInApplication(application, hostAndPortToCheck))
return ServiceResponse.createHostNotFoundInAppResponse(requestUrl, hostAndPortToCheck, wantedGeneration);
long currentGeneration = getServiceGeneration(URI.create("http://" + hostAndPortToCheck), timeout);
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ReindexingStatus.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ReindexingStatus.java
new file mode 100644
index 00000000000..465fe3a670c
--- /dev/null
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ReindexingStatus.java
@@ -0,0 +1,132 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.server.application;
+
+import com.yahoo.config.model.api.Reindexing;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toUnmodifiableMap;
+
+/**
+ * Pending and ready reindexing per document type. Each document type can have either a pending or a ready reindexing.
+ * This is immutable.
+ *
+ * @author jonmv
+ */
+public class ReindexingStatus implements Reindexing {
+
+ private static final ReindexingStatus empty = new ReindexingStatus(Map.of(), Map.of());
+
+ private final Map<String, Long> pending;
+ private final Map<String, Status> ready;
+
+ ReindexingStatus(Map<String, Long> pending, Map<String, Status> ready) {
+ this.pending = Map.copyOf(pending);
+ this.ready = Map.copyOf(ready);
+ }
+
+ /** No reindexing pending or ready. */
+ public static ReindexingStatus empty() {
+ return empty;
+ }
+
+ /** Returns a copy of this with a pending reindexing at the given generation, for the given document type. */
+ public ReindexingStatus withPending(String documentType, long requiredGeneration) {
+ return new ReindexingStatus(with(documentType, requirePositive(requiredGeneration), pending),
+ without(documentType, ready));
+ }
+
+ /** Returns a copy of this with reindexing for the given document type set ready at the given instant. */
+ public ReindexingStatus withReady(String documentType, Instant readyAt) {
+ return new ReindexingStatus(without(documentType, pending),
+ with(documentType, new Status(readyAt), ready));
+ }
+
+ /** The config generation at which the application must have converged for the latest reindexing to begin, per document type. */
+ public Map<String, Long> pending() {
+ return pending;
+ }
+
+ @Override
+ public Map<String, ? extends Reindexing.Status> status() {
+ return ready;
+ }
+
+ private static long requirePositive(long generation) {
+ if (generation <= 0)
+ throw new IllegalArgumentException("Generation must be positive, but was " + generation);
+
+ return generation;
+ }
+
+ private static <T> Map<String, T> without(String removed, Map<String, T> map) {
+ return map.keySet().stream()
+ .filter(key -> ! removed.equals(key))
+ .collect(toUnmodifiableMap(key -> key,
+ key -> map.get(key)));
+ }
+
+ private static <T> Map<String, T> with(String added, T value, Map<String, T> map) {
+ return Stream.concat(Stream.of(added), map.keySet().stream()).distinct()
+ .collect(toUnmodifiableMap(key -> key,
+ key -> added.equals(key) ? value
+ : map.get(key)));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ReindexingStatus that = (ReindexingStatus) o;
+ return pending.equals(that.pending) &&
+ ready.equals(that.ready);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(pending, ready);
+ }
+
+ @Override
+ public String toString() {
+ return "ReindexingStatus{" +
+ "pending=" + pending +
+ ", ready=" + ready +
+ '}';
+ }
+
+ static class Status implements Reindexing.Status {
+
+ private final Instant ready;
+
+ Status(Instant ready) {
+ this.ready = Objects.requireNonNull(ready);
+ }
+
+ @Override
+ public Instant ready() { return ready; }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Status status = (Status) o;
+ return ready.equals(status.ready);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(ready);
+ }
+
+ @Override
+ public String toString() {
+ return "ready at " + ready;
+ }
+
+ }
+
+}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java
index 844964fb57c..a01ce2e2cc3 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java
@@ -232,6 +232,7 @@ public class TenantApplications implements RequestHandler, HostValidator<Applica
log.log(Level.FINE, TenantRepository.logPre(applicationId) + "Application added: " + applicationId);
}
+ // TODO jonmv: Move curator stuff to ApplicationCuratorDatabase
private Path applicationPath(ApplicationId id) {
return applicationsPath.append(id.serializedForm());
}
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabaseTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabaseTest.java
new file mode 100644
index 00000000000..d9a4f65de66
--- /dev/null
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabaseTest.java
@@ -0,0 +1,34 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.server.application;
+
+import com.yahoo.config.provision.ApplicationId;
+import com.yahoo.vespa.curator.mock.MockCurator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.time.Instant;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author jonmv
+ */
+public class ApplicationCuratorDatabaseTest {
+
+ @Test
+ public void testReindexingStatusSerialization() {
+ ApplicationId id = ApplicationId.defaultId();
+ ApplicationCuratorDatabase db = new ApplicationCuratorDatabase(new MockCurator());
+
+ assertEquals(ReindexingStatus.empty(), db.readReindexingStatus(id));
+
+ ReindexingStatus status = ReindexingStatus.empty()
+ .withPending("pending1", 1)
+ .withPending("pending2", 2)
+ .withReady("ready1", Instant.ofEpochMilli(123))
+ .withReady("ready2", Instant.ofEpochMilli(321));
+ db.writeReindexingStatus(id, status);
+ assertEquals(status, db.readReindexingStatus(id));
+ }
+
+}
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java
index 1f034a92cbb..6aeb774d2b0 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java
@@ -77,7 +77,7 @@ public class ConfigConvergenceCheckerTest {
String serviceName = hostAndPort(this.service);
URI requestUrl = testServer().resolve("/serviceconverge/" + serviceName);
wireMock.stubFor(get(urlEqualTo("/state/v1/config")).willReturn(okJson("{\"config\":{\"generation\":3}}")));
- HttpResponse serviceResponse = checker.checkService(application, hostAndPort(this.service), requestUrl, clientTimeout);
+ HttpResponse serviceResponse = checker.getServiceConfigGenerationResponse(application, hostAndPort(this.service), requestUrl, clientTimeout);
assertResponse("{\n" +
" \"url\": \"" + requestUrl.toString() + "\",\n" +
" \"host\": \"" + hostAndPort(this.service) + "\",\n" +
@@ -92,7 +92,7 @@ public class ConfigConvergenceCheckerTest {
{ // Missing service
String serviceName = "notPresent:1337";
URI requestUrl = testServer().resolve("/serviceconverge/" + serviceName);
- HttpResponse response = checker.checkService(application, "notPresent:1337", requestUrl,clientTimeout);
+ HttpResponse response = checker.getServiceConfigGenerationResponse(application, "notPresent:1337", requestUrl, clientTimeout);
assertResponse("{\n" +
" \"url\": \"" + requestUrl.toString() + "\",\n" +
" \"host\": \"" + serviceName + "\",\n" +
@@ -111,7 +111,7 @@ public class ConfigConvergenceCheckerTest {
URI requestUrl = testServer().resolve("/serviceconverge");
URI serviceUrl = testServer().resolve("/serviceconverge/" + serviceName);
wireMock.stubFor(get(urlEqualTo("/state/v1/config")).willReturn(okJson("{\"config\":{\"generation\":3}}")));
- HttpResponse response = checker.servicesToCheck(application, requestUrl, clientTimeout);
+ HttpResponse response = checker.getServiceConfigGenerationsResponse(application, requestUrl, clientTimeout);
assertResponse("{\n" +
" \"services\": [\n" +
" {\n" +
@@ -148,7 +148,7 @@ public class ConfigConvergenceCheckerTest {
URI requestUrl = testServer().resolve("/serviceconverge");
URI serviceUrl = testServer().resolve("/serviceconverge/" + hostAndPort(service));
URI serviceUrl2 = testServer().resolve("/serviceconverge/" + hostAndPort(service2));
- HttpResponse response = checker.servicesToCheck(application, requestUrl, clientTimeout);
+ HttpResponse response = checker.getServiceConfigGenerationsResponse(application, requestUrl, clientTimeout);
assertResponse("{\n" +
" \"services\": [\n" +
" {\n" +
@@ -182,7 +182,7 @@ public class ConfigConvergenceCheckerTest {
wireMock.stubFor(get(urlEqualTo("/state/v1/config")).willReturn(aResponse()
.withFixedDelay((int) clientTimeout.plus(Duration.ofSeconds(1)).toMillis())
.withBody("response too slow")));
- HttpResponse response = checker.checkService(application, hostAndPort(service), requestUrl, Duration.ofMillis(1));
+ HttpResponse response = checker.getServiceConfigGenerationResponse(application, hostAndPort(service), requestUrl, Duration.ofMillis(1));
// Message contained in a SocketTimeoutException may differ across platforms, so we do a partial match of the response here
assertResponse((responseBody) -> assertTrue("Response matches", responseBody.startsWith(
"{\"url\":\"" + requestUrl.toString() + "\",\"host\":\"" + hostAndPort(requestUrl) +
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ReindexingStatusTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ReindexingStatusTest.java
new file mode 100644
index 00000000000..2f09b5afba4
--- /dev/null
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ReindexingStatusTest.java
@@ -0,0 +1,34 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.server.application;
+
+import org.junit.Test;
+
+import java.time.Instant;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author jonmv
+ */
+public class ReindexingStatusTest {
+
+ @Test
+ public void test() {
+ ReindexingStatus status = ReindexingStatus.empty()
+ .withPending("one", 1)
+ .withPending("two", 2)
+ .withReady("two", Instant.EPOCH)
+ .withPending("three", 2)
+ .withReady("three", Instant.EPOCH)
+ .withPending("three", 3)
+ .withReady("four", Instant.MIN)
+ .withReady("four", Instant.MAX);
+ assertEquals(Map.of("one", 1L,
+ "three", 3L), status.pending());
+ assertEquals(Map.of("two", new ReindexingStatus.Status(Instant.EPOCH),
+ "four", new ReindexingStatus.Status(Instant.MAX)),
+ status.status());
+ }
+
+}
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/Maintainer.java b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/Maintainer.java
index b1d6fd99333..242bdb1161e 100644
--- a/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/Maintainer.java
+++ b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/Maintainer.java
@@ -19,6 +19,7 @@ import java.util.logging.Logger;
*
* @author bratseth
* @author mpolden
+ * @author jonmv
*/
public abstract class Maintainer implements Runnable, AutoCloseable {