summaryrefslogtreecommitdiffstats
path: root/configserver
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2020-12-01 17:49:00 +0100
committerBjørn Christian Seime <bjorncs@verizonmedia.com>2020-12-01 18:05:41 +0100
commit26429fb170a455760a20e2a9b8a20371c4962f54 (patch)
tree17cdff5af8ed2fd9e170d560bdf568d2e38366b2 /configserver
parent8ed17bd76ac1acb51b6d1e13b3f2414d36aa2aff (diff)
Fetch cluster reindexing status from cluster controllers
Diffstat (limited to 'configserver')
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java18
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexingStatusClient.java25
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java150
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java2
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java101
5 files changed, 292 insertions, 4 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java
index 6cc05a0f69e..1ffefd1a6b7 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java
@@ -32,8 +32,11 @@ import com.yahoo.transaction.Transaction;
import com.yahoo.vespa.config.server.application.Application;
import com.yahoo.vespa.config.server.application.ApplicationReindexing;
import com.yahoo.vespa.config.server.application.ApplicationSet;
+import com.yahoo.vespa.config.server.application.ClusterReindexing;
+import com.yahoo.vespa.config.server.application.ClusterReindexingStatusClient;
import com.yahoo.vespa.config.server.application.CompressedApplicationInputStream;
import com.yahoo.vespa.config.server.application.ConfigConvergenceChecker;
+import com.yahoo.vespa.config.server.application.DefaultClusterReindexingStatusClient;
import com.yahoo.vespa.config.server.application.FileDistributionStatus;
import com.yahoo.vespa.config.server.application.HttpProxy;
import com.yahoo.vespa.config.server.application.TenantApplications;
@@ -134,6 +137,7 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye
private final LogRetriever logRetriever;
private final TesterClient testerClient;
private final Metric metric;
+ private final ClusterReindexingStatusClient clusterReindexingStatusClient;
@Inject
public ApplicationRepository(TenantRepository tenantRepository,
@@ -157,7 +161,8 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye
Clock.systemUTC(),
testerClient,
metric,
- flagSource);
+ flagSource,
+ new DefaultClusterReindexingStatusClient());
}
private ApplicationRepository(TenantRepository tenantRepository,
@@ -171,7 +176,8 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye
Clock clock,
TesterClient testerClient,
Metric metric,
- FlagSource flagSource) {
+ FlagSource flagSource,
+ ClusterReindexingStatusClient clusterReindexingStatusClient) {
this.tenantRepository = Objects.requireNonNull(tenantRepository);
this.hostProvisioner = Objects.requireNonNull(hostProvisioner);
this.infraDeployer = Objects.requireNonNull(infraDeployer);
@@ -183,6 +189,7 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye
this.clock = Objects.requireNonNull(clock);
this.testerClient = Objects.requireNonNull(testerClient);
this.metric = Objects.requireNonNull(metric);
+ this.clusterReindexingStatusClient = clusterReindexingStatusClient;
}
public static class Builder {
@@ -266,7 +273,8 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye
clock,
testerClient,
metric,
- flagSource);
+ flagSource,
+ ClusterReindexingStatusClient.DUMMY_INSTANCE);
}
}
@@ -528,6 +536,10 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye
CLUSTERCONTROLLER_CONTAINER.serviceName, relativePath);
}
+ public Map<String, ClusterReindexing> getClusterReindexingStatus(ApplicationId applicationId) {
+ return uncheck(() -> clusterReindexingStatusClient.getReindexingStatus(getApplication(applicationId)));
+ }
+
public Long getApplicationGeneration(ApplicationId applicationId) {
return getApplication(applicationId).getApplicationGeneration();
}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexingStatusClient.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexingStatusClient.java
new file mode 100644
index 00000000000..1201bbd4814
--- /dev/null
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexingStatusClient.java
@@ -0,0 +1,25 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.server.application;
+
+import com.yahoo.vespa.config.server.modelfactory.ModelResult;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Retrieves reindexing status from content clusters
+ *
+ * @author bjorncs
+ */
+public interface ClusterReindexingStatusClient extends AutoCloseable {
+
+ Map<String, ClusterReindexing> getReindexingStatus(ModelResult application) throws IOException;
+
+ void close();
+
+ ClusterReindexingStatusClient DUMMY_INSTANCE = new ClusterReindexingStatusClient() {
+ @Override public Map<String, ClusterReindexing> getReindexingStatus(ModelResult application) { return Map.of(); }
+ @Override public void close() {}
+ };
+
+}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java
new file mode 100644
index 00000000000..96873f147f6
--- /dev/null
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java
@@ -0,0 +1,150 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.server.application;
+
+import ai.vespa.util.http.VespaAsyncHttpClientBuilder;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.yahoo.concurrent.CompletableFutures;
+import com.yahoo.concurrent.DaemonThreadFactory;
+import com.yahoo.config.model.api.PortInfo;
+import com.yahoo.config.model.api.ServiceInfo;
+import com.yahoo.vespa.applicationmodel.ClusterId;
+import com.yahoo.vespa.config.server.modelfactory.ModelResult;
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequests;
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.util.Timeout;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+import static com.yahoo.config.model.api.container.ContainerServiceType.CLUSTERCONTROLLER_CONTAINER;
+import static com.yahoo.yolean.Exceptions.throwUnchecked;
+import static com.yahoo.yolean.Exceptions.uncheck;
+
+/**
+ * Retrieves reindexing status from cluster controllers over HTTP
+ *
+ * @author bjorncs
+ */
+public class DefaultClusterReindexingStatusClient implements ClusterReindexingStatusClient {
+
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ private final Executor executor =
+ Executors.newSingleThreadExecutor(new DaemonThreadFactory("cluster-controller-reindexing-client-"));
+ private final CloseableHttpAsyncClient httpClient = createHttpClient();
+
+ public DefaultClusterReindexingStatusClient() {
+ httpClient.start();
+ }
+
+ @Override
+ public Map<String, ClusterReindexing> getReindexingStatus(ModelResult application) throws IOException {
+ Map<ClusterId, List<ServiceInfo>> clusters = clusterControllerClusters(application);
+ Map<ClusterId, CompletableFuture<ClusterReindexing>> futureStatusPerCluster = new HashMap<>();
+ clusters.forEach((clusterId, clusterNodes) -> {
+ var parallelRequests = clusterNodes.stream()
+ .map(this::getReindexingStatus)
+ .collect(Collectors.toList());
+ CompletableFuture<ClusterReindexing> combinedRequest = CompletableFutures.firstOf(parallelRequests);
+ futureStatusPerCluster.put(clusterId, combinedRequest);
+ });
+
+ try {
+ Map<String, ClusterReindexing> statusPerCluster = new HashMap<>();
+ futureStatusPerCluster.forEach((clusterId, futureStatus) -> {
+ statusPerCluster.put(clusterId.s(), futureStatus.join());
+ });
+ return statusPerCluster;
+ } catch (Exception e) {
+ throw new IOException("Failed to get reindexing status from cluster controllers: " + e.getMessage(), e);
+ }
+ }
+
+ @Override public void close() { uncheck(() -> httpClient.close()); }
+
+ private CompletableFuture<ClusterReindexing> getReindexingStatus(ServiceInfo service) {
+ URI uri = URI.create(String.format("http://%s:%d/reindexing/v1/status", service.getHostName(), getStatePort(service)));
+ CompletableFuture<SimpleHttpResponse> responsePromise = new CompletableFuture<>();
+ httpClient.execute(SimpleHttpRequests.get(uri), new FutureCallback<>() {
+ @Override public void completed(SimpleHttpResponse result) { responsePromise.complete(result); }
+ @Override public void failed(Exception ex) { responsePromise.completeExceptionally(ex); }
+ @Override public void cancelled() { responsePromise.cancel(false); }
+ });
+ return responsePromise.handleAsync((response, error) -> {
+ if (response != null) {
+ return uncheck(() -> toClusterReindexing(response));
+ } else {
+ throw throwUnchecked(new IOException(String.format("For '%s': %s", uri, error.getMessage()), error));
+ }
+ }, executor);
+ }
+
+ private static ClusterReindexing toClusterReindexing(SimpleHttpResponse response) throws IOException {
+ if (response.getCode() != HttpStatus.SC_OK) throw new IOException("Expected status code 200, got " + response.getCode());
+ if (response.getBody() == null) throw new IOException("Response has no content");
+ return toClusterReindexing(response.getBodyBytes());
+ }
+
+ private static ClusterReindexing toClusterReindexing(byte[] requestBody) throws IOException {
+ JsonNode jsonNode = mapper.readTree(requestBody);
+ Map<String, ClusterReindexing.Status> documentStatuses = new HashMap<>();
+ for (JsonNode statusJson : jsonNode.get("status")) {
+ String type = statusJson.get("type").textValue();
+ Instant startedMillis = Instant.ofEpochMilli(statusJson.get("startedMillis").longValue());
+ Instant endedMillis = Instant.ofEpochMilli(statusJson.get("endedMillis").longValue());
+ String progressToken = statusJson.get("progress").textValue();
+ ClusterReindexing.State state = ClusterReindexing.State.fromString(statusJson.get("state").textValue());
+ String message = statusJson.get("message").textValue();
+ documentStatuses.put(type, new ClusterReindexing.Status(startedMillis, endedMillis, state, message, progressToken));
+ }
+ return new ClusterReindexing(documentStatuses);
+ }
+
+ private static int getStatePort(ServiceInfo service) {
+ return service.getPorts().stream()
+ .filter(port -> port.getTags().contains("state"))
+ .map(PortInfo::getPort)
+ .findAny()
+ .orElseThrow(() -> new IllegalStateException("Cluster controller container has no container port"));
+ }
+
+ private static Map<ClusterId, List<ServiceInfo>> clusterControllerClusters(ModelResult application) {
+ return application.getModel().getHosts().stream()
+ .flatMap(host -> host.getServices().stream())
+ .filter(service -> service.getServiceType().equals(CLUSTERCONTROLLER_CONTAINER.serviceName))
+ .collect(Collectors.groupingBy(service -> new ClusterId(service.getProperty("clustername").get())));
+
+ }
+
+ private static CloseableHttpAsyncClient createHttpClient() {
+ return VespaAsyncHttpClientBuilder
+ .create()
+ .setIOReactorConfig(IOReactorConfig.custom()
+ .setSoTimeout(Timeout.ofSeconds(2))
+ .build())
+ .setDefaultRequestConfig(
+ RequestConfig.custom()
+ .setConnectTimeout(Timeout.ofSeconds(2))
+ .setConnectionRequestTimeout(Timeout.ofSeconds(2))
+ .setResponseTimeout(Timeout.ofSeconds(4))
+ .build())
+ .setUserAgent("cluster-controller-reindexing-client")
+ .build();
+
+ }
+
+}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java b/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java
index 0c56dfd7bd7..0c7d60c5313 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java
@@ -255,7 +255,7 @@ public class ApplicationHandler extends HttpHandler {
return new ReindexingResponse(tenant.getApplicationRepo().database()
.readReindexingStatus(applicationId)
.orElseThrow(() -> new NotFoundException("Reindexing status not found for " + applicationId)),
- Map.of()); // TODO jonmv/bjorncs: Get status of each cluster and fill in here.
+ applicationRepository.getClusterReindexingStatus(applicationId));
}
private HttpResponse restart(HttpRequest request, ApplicationId applicationId) {
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java
new file mode 100644
index 00000000000..21894e4a756
--- /dev/null
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java
@@ -0,0 +1,101 @@
+package com.yahoo.vespa.config.server.application;// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+import com.github.tomakehurst.wiremock.junit.WireMockRule;
+import com.yahoo.config.model.api.HostInfo;
+import com.yahoo.config.model.api.Model;
+import com.yahoo.config.model.api.PortInfo;
+import com.yahoo.config.model.api.ServiceInfo;
+import com.yahoo.documentapi.ProgressToken;
+import com.yahoo.vespa.config.server.modelfactory.ModelResult;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.okJson;
+import static com.github.tomakehurst.wiremock.client.WireMock.serverError;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
+import static com.yahoo.config.model.api.container.ContainerServiceType.CLUSTERCONTROLLER_CONTAINER;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * @author bjorncs
+ */
+public class DefaultClusterReindexingStatusClientTest {
+
+ @Rule public final WireMockRule server1 = new WireMockRule(options().dynamicPort(), true);
+ @Rule public final WireMockRule server2 = new WireMockRule(options().dynamicPort(), true);
+ @Rule public final WireMockRule server3 = new WireMockRule(options().dynamicPort(), true);
+
+ @Test
+ public void combines_result_from_multiple_cluster_controller_clusters() throws IOException {
+ var client = new DefaultClusterReindexingStatusClient();
+ MockApplication app = new MockApplication();
+ String uriPath = "/reindexing/v1/status";
+ server1.stubFor(get(urlEqualTo(uriPath)).willReturn(serverError()));
+ server2.stubFor(get(urlEqualTo(uriPath)).willReturn(okJson(
+ "{\"status\":[{" +
+ "\"type\":\"music\"," +
+ "\"startedMillis\":0," +
+ "\"endedMillis\":123," +
+ "\"progress\":\"" + new ProgressToken().serializeToString() + "\"," +
+ "\"state\": \"" + ClusterReindexing.State.FAILED.asString() + "\"," +
+ "\"message\":\"something went wrong\"}" +
+ "]}")));
+ server3.stubFor(get(urlEqualTo(uriPath)).willReturn(okJson(
+ "{\"status\":[{" +
+ "\"type\":\"artist\"," +
+ "\"startedMillis\":10," +
+ "\"endedMillis\":150," +
+ "\"progress\":\"" + new ProgressToken().serializeToString() + "\"," +
+ "\"state\": \"" + ClusterReindexing.State.SUCCESSFUL.asString() + "\"," +
+ "\"message\":\"successs\"}" +
+ "]}")));
+ Map<String, ClusterReindexing> result = client.getReindexingStatus(app);
+ System.out.println(result);
+ }
+
+
+ private class MockApplication implements ModelResult {
+ private final Collection<HostInfo> hosts;
+
+ MockApplication() {
+ this.hosts = createHosts();
+ }
+
+ @Override
+ public Model getModel() {
+ Model model = mock(Model.class);
+ when(model.getHosts()).thenReturn(hosts);
+ return model;
+ }
+
+ private Collection<HostInfo> createHosts() {
+ return List.of(
+ createHostInfo(server1.port(), "cc1.1", "cluster1"),
+ createHostInfo(server2.port(), "cc1.2", "cluster1"),
+ createHostInfo(server3.port(), "cc2.1", "cluster2"));
+ }
+
+ private HostInfo createHostInfo(int serverPort, String serviceName, String clusterId) {
+ return new HostInfo(
+ "localhost",
+ List.of(new ServiceInfo(
+ serviceName,
+ CLUSTERCONTROLLER_CONTAINER.serviceName,
+ List.of(new PortInfo(serverPort, List.of("state"))),
+ Map.of("clustername", clusterId),
+ "myconfigId",
+ "localhost")));
+ }
+
+ }
+
+
+} \ No newline at end of file