diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2020-12-01 17:49:00 +0100 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2020-12-02 10:26:09 +0100 |
commit | a040d7d09ad97c2b06e47e0c44d47882eee16617 (patch) | |
tree | 613c645c28d8e49c903be6381fd8552f5d361ac5 | |
parent | 03c00de5438637108875365dbe949e390f10d4bc (diff) |
Fetch cluster reindexing status from cluster controllers
5 files changed, 292 insertions, 4 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java index 6cc05a0f69e..1ffefd1a6b7 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java @@ -32,8 +32,11 @@ import com.yahoo.transaction.Transaction; import com.yahoo.vespa.config.server.application.Application; import com.yahoo.vespa.config.server.application.ApplicationReindexing; import com.yahoo.vespa.config.server.application.ApplicationSet; +import com.yahoo.vespa.config.server.application.ClusterReindexing; +import com.yahoo.vespa.config.server.application.ClusterReindexingStatusClient; import com.yahoo.vespa.config.server.application.CompressedApplicationInputStream; import com.yahoo.vespa.config.server.application.ConfigConvergenceChecker; +import com.yahoo.vespa.config.server.application.DefaultClusterReindexingStatusClient; import com.yahoo.vespa.config.server.application.FileDistributionStatus; import com.yahoo.vespa.config.server.application.HttpProxy; import com.yahoo.vespa.config.server.application.TenantApplications; @@ -134,6 +137,7 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye private final LogRetriever logRetriever; private final TesterClient testerClient; private final Metric metric; + private final ClusterReindexingStatusClient clusterReindexingStatusClient; @Inject public ApplicationRepository(TenantRepository tenantRepository, @@ -157,7 +161,8 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye Clock.systemUTC(), testerClient, metric, - flagSource); + flagSource, + new DefaultClusterReindexingStatusClient()); } private ApplicationRepository(TenantRepository tenantRepository, @@ -171,7 +176,8 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye Clock clock, TesterClient testerClient, Metric metric, - FlagSource flagSource) { + FlagSource flagSource, + ClusterReindexingStatusClient clusterReindexingStatusClient) { this.tenantRepository = Objects.requireNonNull(tenantRepository); this.hostProvisioner = Objects.requireNonNull(hostProvisioner); this.infraDeployer = Objects.requireNonNull(infraDeployer); @@ -183,6 +189,7 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye this.clock = Objects.requireNonNull(clock); this.testerClient = Objects.requireNonNull(testerClient); this.metric = Objects.requireNonNull(metric); + this.clusterReindexingStatusClient = clusterReindexingStatusClient; } public static class Builder { @@ -266,7 +273,8 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye clock, testerClient, metric, - flagSource); + flagSource, + ClusterReindexingStatusClient.DUMMY_INSTANCE); } } @@ -528,6 +536,10 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye CLUSTERCONTROLLER_CONTAINER.serviceName, relativePath); } + public Map<String, ClusterReindexing> getClusterReindexingStatus(ApplicationId applicationId) { + return uncheck(() -> clusterReindexingStatusClient.getReindexingStatus(getApplication(applicationId))); + } + public Long getApplicationGeneration(ApplicationId applicationId) { return getApplication(applicationId).getApplicationGeneration(); } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexingStatusClient.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexingStatusClient.java new file mode 100644 index 00000000000..1201bbd4814 --- /dev/null +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexingStatusClient.java @@ -0,0 +1,25 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.server.application; + +import com.yahoo.vespa.config.server.modelfactory.ModelResult; + +import java.io.IOException; +import java.util.Map; + +/** + * Retrieves reindexing status from content clusters + * + * @author bjorncs + */ +public interface ClusterReindexingStatusClient extends AutoCloseable { + + Map<String, ClusterReindexing> getReindexingStatus(ModelResult application) throws IOException; + + void close(); + + ClusterReindexingStatusClient DUMMY_INSTANCE = new ClusterReindexingStatusClient() { + @Override public Map<String, ClusterReindexing> getReindexingStatus(ModelResult application) { return Map.of(); } + @Override public void close() {} + }; + +} diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java new file mode 100644 index 00000000000..96873f147f6 --- /dev/null +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java @@ -0,0 +1,150 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.server.application; + +import ai.vespa.util.http.VespaAsyncHttpClientBuilder; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.yahoo.concurrent.CompletableFutures; +import com.yahoo.concurrent.DaemonThreadFactory; +import com.yahoo.config.model.api.PortInfo; +import com.yahoo.config.model.api.ServiceInfo; +import com.yahoo.vespa.applicationmodel.ClusterId; +import com.yahoo.vespa.config.server.modelfactory.ModelResult; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequests; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.util.Timeout; + +import java.io.IOException; +import java.net.URI; +import java.time.Instant; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +import static com.yahoo.config.model.api.container.ContainerServiceType.CLUSTERCONTROLLER_CONTAINER; +import static com.yahoo.yolean.Exceptions.throwUnchecked; +import static com.yahoo.yolean.Exceptions.uncheck; + +/** + * Retrieves reindexing status from cluster controllers over HTTP + * + * @author bjorncs + */ +public class DefaultClusterReindexingStatusClient implements ClusterReindexingStatusClient { + + private static final ObjectMapper mapper = new ObjectMapper(); + + private final Executor executor = + Executors.newSingleThreadExecutor(new DaemonThreadFactory("cluster-controller-reindexing-client-")); + private final CloseableHttpAsyncClient httpClient = createHttpClient(); + + public DefaultClusterReindexingStatusClient() { + httpClient.start(); + } + + @Override + public Map<String, ClusterReindexing> getReindexingStatus(ModelResult application) throws IOException { + Map<ClusterId, List<ServiceInfo>> clusters = clusterControllerClusters(application); + Map<ClusterId, CompletableFuture<ClusterReindexing>> futureStatusPerCluster = new HashMap<>(); + clusters.forEach((clusterId, clusterNodes) -> { + var parallelRequests = clusterNodes.stream() + .map(this::getReindexingStatus) + .collect(Collectors.toList()); + CompletableFuture<ClusterReindexing> combinedRequest = CompletableFutures.firstOf(parallelRequests); + futureStatusPerCluster.put(clusterId, combinedRequest); + }); + + try { + Map<String, ClusterReindexing> statusPerCluster = new HashMap<>(); + futureStatusPerCluster.forEach((clusterId, futureStatus) -> { + statusPerCluster.put(clusterId.s(), futureStatus.join()); + }); + return statusPerCluster; + } catch (Exception e) { + throw new IOException("Failed to get reindexing status from cluster controllers: " + e.getMessage(), e); + } + } + + @Override public void close() { uncheck(() -> httpClient.close()); } + + private CompletableFuture<ClusterReindexing> getReindexingStatus(ServiceInfo service) { + URI uri = URI.create(String.format("http://%s:%d/reindexing/v1/status", service.getHostName(), getStatePort(service))); + CompletableFuture<SimpleHttpResponse> responsePromise = new CompletableFuture<>(); + httpClient.execute(SimpleHttpRequests.get(uri), new FutureCallback<>() { + @Override public void completed(SimpleHttpResponse result) { responsePromise.complete(result); } + @Override public void failed(Exception ex) { responsePromise.completeExceptionally(ex); } + @Override public void cancelled() { responsePromise.cancel(false); } + }); + return responsePromise.handleAsync((response, error) -> { + if (response != null) { + return uncheck(() -> toClusterReindexing(response)); + } else { + throw throwUnchecked(new IOException(String.format("For '%s': %s", uri, error.getMessage()), error)); + } + }, executor); + } + + private static ClusterReindexing toClusterReindexing(SimpleHttpResponse response) throws IOException { + if (response.getCode() != HttpStatus.SC_OK) throw new IOException("Expected status code 200, got " + response.getCode()); + if (response.getBody() == null) throw new IOException("Response has no content"); + return toClusterReindexing(response.getBodyBytes()); + } + + private static ClusterReindexing toClusterReindexing(byte[] requestBody) throws IOException { + JsonNode jsonNode = mapper.readTree(requestBody); + Map<String, ClusterReindexing.Status> documentStatuses = new HashMap<>(); + for (JsonNode statusJson : jsonNode.get("status")) { + String type = statusJson.get("type").textValue(); + Instant startedMillis = Instant.ofEpochMilli(statusJson.get("startedMillis").longValue()); + Instant endedMillis = Instant.ofEpochMilli(statusJson.get("endedMillis").longValue()); + String progressToken = statusJson.get("progress").textValue(); + ClusterReindexing.State state = ClusterReindexing.State.fromString(statusJson.get("state").textValue()); + String message = statusJson.get("message").textValue(); + documentStatuses.put(type, new ClusterReindexing.Status(startedMillis, endedMillis, state, message, progressToken)); + } + return new ClusterReindexing(documentStatuses); + } + + private static int getStatePort(ServiceInfo service) { + return service.getPorts().stream() + .filter(port -> port.getTags().contains("state")) + .map(PortInfo::getPort) + .findAny() + .orElseThrow(() -> new IllegalStateException("Cluster controller container has no container port")); + } + + private static Map<ClusterId, List<ServiceInfo>> clusterControllerClusters(ModelResult application) { + return application.getModel().getHosts().stream() + .flatMap(host -> host.getServices().stream()) + .filter(service -> service.getServiceType().equals(CLUSTERCONTROLLER_CONTAINER.serviceName)) + .collect(Collectors.groupingBy(service -> new ClusterId(service.getProperty("clustername").get()))); + + } + + private static CloseableHttpAsyncClient createHttpClient() { + return VespaAsyncHttpClientBuilder + .create() + .setIOReactorConfig(IOReactorConfig.custom() + .setSoTimeout(Timeout.ofSeconds(2)) + .build()) + .setDefaultRequestConfig( + RequestConfig.custom() + .setConnectTimeout(Timeout.ofSeconds(2)) + .setConnectionRequestTimeout(Timeout.ofSeconds(2)) + .setResponseTimeout(Timeout.ofSeconds(4)) + .build()) + .setUserAgent("cluster-controller-reindexing-client") + .build(); + + } + +} diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java b/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java index 0c56dfd7bd7..0c7d60c5313 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java @@ -255,7 +255,7 @@ public class ApplicationHandler extends HttpHandler { return new ReindexingResponse(tenant.getApplicationRepo().database() .readReindexingStatus(applicationId) .orElseThrow(() -> new NotFoundException("Reindexing status not found for " + applicationId)), - Map.of()); // TODO jonmv/bjorncs: Get status of each cluster and fill in here. + applicationRepository.getClusterReindexingStatus(applicationId)); } private HttpResponse restart(HttpRequest request, ApplicationId applicationId) { diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java new file mode 100644 index 00000000000..21894e4a756 --- /dev/null +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java @@ -0,0 +1,101 @@ +package com.yahoo.vespa.config.server.application;// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +import com.github.tomakehurst.wiremock.junit.WireMockRule; +import com.yahoo.config.model.api.HostInfo; +import com.yahoo.config.model.api.Model; +import com.yahoo.config.model.api.PortInfo; +import com.yahoo.config.model.api.ServiceInfo; +import com.yahoo.documentapi.ProgressToken; +import com.yahoo.vespa.config.server.modelfactory.ModelResult; +import org.junit.Rule; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.okJson; +import static com.github.tomakehurst.wiremock.client.WireMock.serverError; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options; +import static com.yahoo.config.model.api.container.ContainerServiceType.CLUSTERCONTROLLER_CONTAINER; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * @author bjorncs + */ +public class DefaultClusterReindexingStatusClientTest { + + @Rule public final WireMockRule server1 = new WireMockRule(options().dynamicPort(), true); + @Rule public final WireMockRule server2 = new WireMockRule(options().dynamicPort(), true); + @Rule public final WireMockRule server3 = new WireMockRule(options().dynamicPort(), true); + + @Test + public void combines_result_from_multiple_cluster_controller_clusters() throws IOException { + var client = new DefaultClusterReindexingStatusClient(); + MockApplication app = new MockApplication(); + String uriPath = "/reindexing/v1/status"; + server1.stubFor(get(urlEqualTo(uriPath)).willReturn(serverError())); + server2.stubFor(get(urlEqualTo(uriPath)).willReturn(okJson( + "{\"status\":[{" + + "\"type\":\"music\"," + + "\"startedMillis\":0," + + "\"endedMillis\":123," + + "\"progress\":\"" + new ProgressToken().serializeToString() + "\"," + + "\"state\": \"" + ClusterReindexing.State.FAILED.asString() + "\"," + + "\"message\":\"something went wrong\"}" + + "]}"))); + server3.stubFor(get(urlEqualTo(uriPath)).willReturn(okJson( + "{\"status\":[{" + + "\"type\":\"artist\"," + + "\"startedMillis\":10," + + "\"endedMillis\":150," + + "\"progress\":\"" + new ProgressToken().serializeToString() + "\"," + + "\"state\": \"" + ClusterReindexing.State.SUCCESSFUL.asString() + "\"," + + "\"message\":\"successs\"}" + + "]}"))); + Map<String, ClusterReindexing> result = client.getReindexingStatus(app); + System.out.println(result); + } + + + private class MockApplication implements ModelResult { + private final Collection<HostInfo> hosts; + + MockApplication() { + this.hosts = createHosts(); + } + + @Override + public Model getModel() { + Model model = mock(Model.class); + when(model.getHosts()).thenReturn(hosts); + return model; + } + + private Collection<HostInfo> createHosts() { + return List.of( + createHostInfo(server1.port(), "cc1.1", "cluster1"), + createHostInfo(server2.port(), "cc1.2", "cluster1"), + createHostInfo(server3.port(), "cc2.1", "cluster2")); + } + + private HostInfo createHostInfo(int serverPort, String serviceName, String clusterId) { + return new HostInfo( + "localhost", + List.of(new ServiceInfo( + serviceName, + CLUSTERCONTROLLER_CONTAINER.serviceName, + List.of(new PortInfo(serverPort, List.of("state"))), + Map.of("clustername", clusterId), + "myconfigId", + "localhost"))); + } + + } + + +}
\ No newline at end of file |