diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-12-04 15:50:46 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-12-04 15:50:46 +0100 |
commit | db40d63bc83c57360fba480ee9e3fae8ab0477b2 (patch) | |
tree | 3841c8814ce5e5d7807966373b00fd8956cdbf4f /configserver | |
parent | 5d17334d1d65b45c13c3962831fab03760c34ffb (diff) |
Merge reindexing responses with multiple clusters in client in cfg server
Diffstat (limited to 'configserver')
3 files changed, 128 insertions, 42 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java index 7f0671820d3..4b7148463f9 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java @@ -27,6 +27,26 @@ public class ClusterReindexing { public Map<String, Status> documentTypeStatus() { return documentTypeStatus; } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ClusterReindexing that = (ClusterReindexing) o; + return documentTypeStatus.equals(that.documentTypeStatus); + } + + @Override + public int hashCode() { + return Objects.hash(documentTypeStatus); + } + + @Override + public String toString() { + return "ClusterReindexing{" + + "documentTypeStatus=" + documentTypeStatus + + '}'; + } + public static class Status { @@ -49,6 +69,35 @@ public class ClusterReindexing { public Optional<State> state() { return Optional.ofNullable(state); } public Optional<String> message() { return Optional.ofNullable(message); } public Optional<String> progress() { return Optional.ofNullable(progress); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Status status = (Status) o; + return startedAt.equals(status.startedAt) && + Objects.equals(endedAt, status.endedAt) && + state == status.state && + Objects.equals(message, status.message) && + Objects.equals(progress, status.progress); + } + + @Override + public int hashCode() { + return Objects.hash(startedAt, endedAt, state, message, progress); + } + + @Override + public String toString() { + return "Status{" + + "startedAt=" + startedAt + + ", endedAt=" + endedAt + + ", state=" + state + + ", message='" + message + '\'' + + ", progress='" + progress + '\'' + + '}'; + } + } @@ -67,5 +116,7 @@ public class ClusterReindexing { } public String asString() { return stringValue; } + } + } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java index fef0120a431..0124b6822f0 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java @@ -55,21 +55,21 @@ public class DefaultClusterReindexingStatusClient implements ClusterReindexingSt @Override public Map<String, ClusterReindexing> getReindexingStatus(ModelResult application) throws IOException { Map<ClusterId, List<ServiceInfo>> clusters = clusterControllerClusters(application); - Map<ClusterId, CompletableFuture<ClusterReindexing>> futureStatusPerCluster = new HashMap<>(); + Map<ClusterId, CompletableFuture<Map<String, ClusterReindexing>>> futureStatusPerCluster = new HashMap<>(); clusters.forEach((clusterId, clusterNodes) -> { var parallelRequests = clusterNodes.stream() .map(this::getReindexingStatus) .collect(Collectors.toList()); - CompletableFuture<ClusterReindexing> combinedRequest = CompletableFutures.firstOf(parallelRequests); + CompletableFuture<Map<String, ClusterReindexing>> combinedRequest = CompletableFutures.firstOf(parallelRequests); futureStatusPerCluster.put(clusterId, combinedRequest); }); try { Map<String, ClusterReindexing> statusPerCluster = new HashMap<>(); futureStatusPerCluster.forEach((clusterId, futureStatus) -> { - statusPerCluster.put(clusterId.s(), futureStatus.join()); + statusPerCluster.putAll(futureStatus.join()); }); - return statusPerCluster; + return Map.copyOf(statusPerCluster); } catch (Exception e) { throw new IOException("Failed to get reindexing status from cluster controllers: " + e.getMessage(), e); } @@ -77,7 +77,7 @@ public class DefaultClusterReindexingStatusClient implements ClusterReindexingSt @Override public void close() { uncheck(() -> httpClient.close()); } - private CompletableFuture<ClusterReindexing> getReindexingStatus(ServiceInfo service) { + private CompletableFuture<Map<String, ClusterReindexing>> getReindexingStatus(ServiceInfo service) { URI uri = URI.create(String.format("http://%s:%d/reindexing/v1/status", service.getHostName(), getStatePort(service))); CompletableFuture<SimpleHttpResponse> responsePromise = new CompletableFuture<>(); httpClient.execute(SimpleHttpRequests.get(uri), new FutureCallback<>() { @@ -94,33 +94,40 @@ public class DefaultClusterReindexingStatusClient implements ClusterReindexingSt }, executor); } - private static ClusterReindexing toClusterReindexing(SimpleHttpResponse response) throws IOException { + private static Map<String, ClusterReindexing> toClusterReindexing(SimpleHttpResponse response) throws IOException { if (response.getCode() != HttpStatus.SC_OK) throw new IOException("Expected status code 200, got " + response.getCode()); if (response.getBody() == null) throw new IOException("Response has no content"); return toClusterReindexing(response.getBodyBytes()); } - private static ClusterReindexing toClusterReindexing(byte[] requestBody) throws IOException { + private static Map<String, ClusterReindexing> toClusterReindexing(byte[] requestBody) throws IOException { JsonNode jsonNode = mapper.readTree(requestBody); - Map<String, ClusterReindexing.Status> documentStatuses = new HashMap<>(); - for (JsonNode statusJson : jsonNode.get("status")) { - String type = statusJson.get("type").textValue(); - Instant startedMillis = Instant.ofEpochMilli(statusJson.get("startedMillis").longValue()); - Instant endedMillis = Optional.ofNullable(statusJson.get("endedMillis")) - .map(json -> Instant.ofEpochMilli(json.longValue())) - .orElse(null); - String progressToken = Optional.ofNullable(statusJson.get("progress")) - .map(JsonNode::textValue) - .orElse(null); - ClusterReindexing.State state = Optional.ofNullable(statusJson.get("state")) - .map(json -> ClusterReindexing.State.fromString(json.textValue())) - .orElse(null); - String message = Optional.ofNullable(statusJson.get("message")) - .map(JsonNode::textValue) - .orElse(null); - documentStatuses.put(type, new ClusterReindexing.Status(startedMillis, endedMillis, state, message, progressToken)); + Map<String, ClusterReindexing> clusters = new HashMap<>(); + for (var clusterNames = jsonNode.get("clusters").fieldNames(); clusterNames.hasNext(); ) { + String clusterName = clusterNames.next(); + JsonNode clusterJson = jsonNode.get("clusters").get(clusterName); + Map<String, ClusterReindexing.Status> documentStatuses = new HashMap<>(); + for (var documentTypes = clusterJson.get("documentTypes").fieldNames(); documentTypes.hasNext(); ) { + String type = documentTypes.next(); + JsonNode statusJson = clusterJson.get("documentTypes").get(type); + Instant startedMillis = Instant.ofEpochMilli(statusJson.get("startedMillis").longValue()); + Instant endedMillis = Optional.ofNullable(statusJson.get("endedMillis")) + .map(json -> Instant.ofEpochMilli(json.longValue())) + .orElse(null); + String progressToken = Optional.ofNullable(statusJson.get("progress")) + .map(JsonNode::textValue) + .orElse(null); + ClusterReindexing.State state = Optional.ofNullable(statusJson.get("state")) + .map(json -> ClusterReindexing.State.fromString(json.textValue())) + .orElse(null); + String message = Optional.ofNullable(statusJson.get("message")) + .map(JsonNode::textValue) + .orElse(null); + documentStatuses.put(type, new ClusterReindexing.Status(startedMillis, endedMillis, state, message, progressToken)); + } + clusters.put(clusterName, new ClusterReindexing(documentStatuses)); } - return new ClusterReindexing(documentStatuses); + return Map.copyOf(clusters); } private static int getStatePort(ServiceInfo service) { diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java index 21894e4a756..82e1bd96373 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java @@ -7,10 +7,12 @@ import com.yahoo.config.model.api.PortInfo; import com.yahoo.config.model.api.ServiceInfo; import com.yahoo.documentapi.ProgressToken; import com.yahoo.vespa.config.server.modelfactory.ModelResult; +import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import java.io.IOException; +import java.time.Instant; import java.util.Collection; import java.util.List; import java.util.Map; @@ -21,6 +23,7 @@ import static com.github.tomakehurst.wiremock.client.WireMock.serverError; import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options; import static com.yahoo.config.model.api.container.ContainerServiceType.CLUSTERCONTROLLER_CONTAINER; +import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -40,25 +43,50 @@ public class DefaultClusterReindexingStatusClientTest { String uriPath = "/reindexing/v1/status"; server1.stubFor(get(urlEqualTo(uriPath)).willReturn(serverError())); server2.stubFor(get(urlEqualTo(uriPath)).willReturn(okJson( - "{\"status\":[{" + - "\"type\":\"music\"," + - "\"startedMillis\":0," + - "\"endedMillis\":123," + - "\"progress\":\"" + new ProgressToken().serializeToString() + "\"," + - "\"state\": \"" + ClusterReindexing.State.FAILED.asString() + "\"," + - "\"message\":\"something went wrong\"}" + - "]}"))); + "{" + + " \"clusters\": {" + + " \"cluster1\": {" + + " \"documentTypes\": {" + + " \"music\": {" + + " \"startedMillis\":0," + + " \"state\": \"" + ClusterReindexing.State.RUNNING.asString() + "\"" + + " }" + + " }" + + " }" + + " }" + + "}"))); server3.stubFor(get(urlEqualTo(uriPath)).willReturn(okJson( - "{\"status\":[{" + - "\"type\":\"artist\"," + - "\"startedMillis\":10," + - "\"endedMillis\":150," + - "\"progress\":\"" + new ProgressToken().serializeToString() + "\"," + - "\"state\": \"" + ClusterReindexing.State.SUCCESSFUL.asString() + "\"," + - "\"message\":\"successs\"}" + - "]}"))); + "{" + + " \"clusters\": {" + + " \"cluster2\": {" + + " \"documentTypes\": {" + + " \"artist\": {" + + " \"startedMillis\":50," + + " \"endedMillis\":150," + + " \"progress\":\"half-done\"," + + " \"state\": \"" + ClusterReindexing.State.SUCCESSFUL.asString() + "\"," + + " \"message\":\"success\"" + + " }" + + " }" + + " }" + + " }" + + "}"))); + Map<String, ClusterReindexing> expected = Map.of("cluster1", + new ClusterReindexing(Map.of("music", + new ClusterReindexing.Status(Instant.ofEpochMilli(0), + null, + ClusterReindexing.State.RUNNING, + null, + null))), + "cluster2", + new ClusterReindexing(Map.of("artist", + new ClusterReindexing.Status(Instant.ofEpochMilli(50), + Instant.ofEpochMilli(150), + ClusterReindexing.State.SUCCESSFUL, + "success", + "half-done")))); Map<String, ClusterReindexing> result = client.getReindexingStatus(app); - System.out.println(result); + assertEquals(expected, result); } |