diff options
author | Jon Marius Venstad <jonmv@users.noreply.github.com> | 2020-12-04 23:26:35 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-04 23:26:35 +0100 |
commit | 638b4f6cc2b5c5926c4802ef407c596649e169c8 (patch) | |
tree | f794039500efc7990246e96c81668d9baea3d885 /configserver | |
parent | a90709008ec0d108ee9a2e26bda20e39a10424b5 (diff) |
Revert "Jonmv/reindexing over multiple clusters"
Diffstat (limited to 'configserver')
3 files changed, 42 insertions, 128 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java index 4b7148463f9..7f0671820d3 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java @@ -27,26 +27,6 @@ public class ClusterReindexing { public Map<String, Status> documentTypeStatus() { return documentTypeStatus; } - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - ClusterReindexing that = (ClusterReindexing) o; - return documentTypeStatus.equals(that.documentTypeStatus); - } - - @Override - public int hashCode() { - return Objects.hash(documentTypeStatus); - } - - @Override - public String toString() { - return "ClusterReindexing{" + - "documentTypeStatus=" + documentTypeStatus + - '}'; - } - public static class Status { @@ -69,35 +49,6 @@ public class ClusterReindexing { public Optional<State> state() { return Optional.ofNullable(state); } public Optional<String> message() { return Optional.ofNullable(message); } public Optional<String> progress() { return Optional.ofNullable(progress); } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Status status = (Status) o; - return startedAt.equals(status.startedAt) && - Objects.equals(endedAt, status.endedAt) && - state == status.state && - Objects.equals(message, status.message) && - Objects.equals(progress, status.progress); - } - - @Override - public int hashCode() { - return Objects.hash(startedAt, endedAt, state, message, progress); - } - - @Override - public String toString() { - return "Status{" + - "startedAt=" + startedAt + - ", endedAt=" + endedAt + - ", state=" + state + - ", message='" + message + '\'' + - ", progress='" + progress + '\'' + - '}'; - } - } @@ -116,7 +67,5 @@ public class ClusterReindexing { } public String asString() { return stringValue; } - } - } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java index 0124b6822f0..fef0120a431 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java @@ -55,21 +55,21 @@ public class DefaultClusterReindexingStatusClient implements ClusterReindexingSt @Override public Map<String, ClusterReindexing> getReindexingStatus(ModelResult application) throws IOException { Map<ClusterId, List<ServiceInfo>> clusters = clusterControllerClusters(application); - Map<ClusterId, CompletableFuture<Map<String, ClusterReindexing>>> futureStatusPerCluster = new HashMap<>(); + Map<ClusterId, CompletableFuture<ClusterReindexing>> futureStatusPerCluster = new HashMap<>(); clusters.forEach((clusterId, clusterNodes) -> { var parallelRequests = clusterNodes.stream() .map(this::getReindexingStatus) .collect(Collectors.toList()); - CompletableFuture<Map<String, ClusterReindexing>> combinedRequest = CompletableFutures.firstOf(parallelRequests); + CompletableFuture<ClusterReindexing> combinedRequest = CompletableFutures.firstOf(parallelRequests); futureStatusPerCluster.put(clusterId, combinedRequest); }); try { Map<String, ClusterReindexing> statusPerCluster = new HashMap<>(); futureStatusPerCluster.forEach((clusterId, futureStatus) -> { - statusPerCluster.putAll(futureStatus.join()); + statusPerCluster.put(clusterId.s(), futureStatus.join()); }); - return Map.copyOf(statusPerCluster); + return statusPerCluster; } catch (Exception e) { throw new IOException("Failed to get reindexing status from cluster controllers: " + e.getMessage(), e); } @@ -77,7 +77,7 @@ public class DefaultClusterReindexingStatusClient implements ClusterReindexingSt @Override public void close() { uncheck(() -> httpClient.close()); } - private CompletableFuture<Map<String, ClusterReindexing>> getReindexingStatus(ServiceInfo service) { + private CompletableFuture<ClusterReindexing> getReindexingStatus(ServiceInfo service) { URI uri = URI.create(String.format("http://%s:%d/reindexing/v1/status", service.getHostName(), getStatePort(service))); CompletableFuture<SimpleHttpResponse> responsePromise = new CompletableFuture<>(); httpClient.execute(SimpleHttpRequests.get(uri), new FutureCallback<>() { @@ -94,40 +94,33 @@ public class DefaultClusterReindexingStatusClient implements ClusterReindexingSt }, executor); } - private static Map<String, ClusterReindexing> toClusterReindexing(SimpleHttpResponse response) throws IOException { + private static ClusterReindexing toClusterReindexing(SimpleHttpResponse response) throws IOException { if (response.getCode() != HttpStatus.SC_OK) throw new IOException("Expected status code 200, got " + response.getCode()); if (response.getBody() == null) throw new IOException("Response has no content"); return toClusterReindexing(response.getBodyBytes()); } - private static Map<String, ClusterReindexing> toClusterReindexing(byte[] requestBody) throws IOException { + private static ClusterReindexing toClusterReindexing(byte[] requestBody) throws IOException { JsonNode jsonNode = mapper.readTree(requestBody); - Map<String, ClusterReindexing> clusters = new HashMap<>(); - for (var clusterNames = jsonNode.get("clusters").fieldNames(); clusterNames.hasNext(); ) { - String clusterName = clusterNames.next(); - JsonNode clusterJson = jsonNode.get("clusters").get(clusterName); - Map<String, ClusterReindexing.Status> documentStatuses = new HashMap<>(); - for (var documentTypes = clusterJson.get("documentTypes").fieldNames(); documentTypes.hasNext(); ) { - String type = documentTypes.next(); - JsonNode statusJson = clusterJson.get("documentTypes").get(type); - Instant startedMillis = Instant.ofEpochMilli(statusJson.get("startedMillis").longValue()); - Instant endedMillis = Optional.ofNullable(statusJson.get("endedMillis")) - .map(json -> Instant.ofEpochMilli(json.longValue())) - .orElse(null); - String progressToken = Optional.ofNullable(statusJson.get("progress")) - .map(JsonNode::textValue) - .orElse(null); - ClusterReindexing.State state = Optional.ofNullable(statusJson.get("state")) - .map(json -> ClusterReindexing.State.fromString(json.textValue())) - .orElse(null); - String message = Optional.ofNullable(statusJson.get("message")) - .map(JsonNode::textValue) - .orElse(null); - documentStatuses.put(type, new ClusterReindexing.Status(startedMillis, endedMillis, state, message, progressToken)); - } - clusters.put(clusterName, new ClusterReindexing(documentStatuses)); + Map<String, ClusterReindexing.Status> documentStatuses = new HashMap<>(); + for (JsonNode statusJson : jsonNode.get("status")) { + String type = statusJson.get("type").textValue(); + Instant startedMillis = Instant.ofEpochMilli(statusJson.get("startedMillis").longValue()); + Instant endedMillis = Optional.ofNullable(statusJson.get("endedMillis")) + .map(json -> Instant.ofEpochMilli(json.longValue())) + .orElse(null); + String progressToken = Optional.ofNullable(statusJson.get("progress")) + .map(JsonNode::textValue) + .orElse(null); + ClusterReindexing.State state = Optional.ofNullable(statusJson.get("state")) + .map(json -> ClusterReindexing.State.fromString(json.textValue())) + .orElse(null); + String message = Optional.ofNullable(statusJson.get("message")) + .map(JsonNode::textValue) + .orElse(null); + documentStatuses.put(type, new ClusterReindexing.Status(startedMillis, endedMillis, state, message, progressToken)); } - return Map.copyOf(clusters); + return new ClusterReindexing(documentStatuses); } private static int getStatePort(ServiceInfo service) { diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java index 82e1bd96373..21894e4a756 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java @@ -7,12 +7,10 @@ import com.yahoo.config.model.api.PortInfo; import com.yahoo.config.model.api.ServiceInfo; import com.yahoo.documentapi.ProgressToken; import com.yahoo.vespa.config.server.modelfactory.ModelResult; -import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import java.io.IOException; -import java.time.Instant; import java.util.Collection; import java.util.List; import java.util.Map; @@ -23,7 +21,6 @@ import static com.github.tomakehurst.wiremock.client.WireMock.serverError; import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options; import static com.yahoo.config.model.api.container.ContainerServiceType.CLUSTERCONTROLLER_CONTAINER; -import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -43,50 +40,25 @@ public class DefaultClusterReindexingStatusClientTest { String uriPath = "/reindexing/v1/status"; server1.stubFor(get(urlEqualTo(uriPath)).willReturn(serverError())); server2.stubFor(get(urlEqualTo(uriPath)).willReturn(okJson( - "{" + - " \"clusters\": {" + - " \"cluster1\": {" + - " \"documentTypes\": {" + - " \"music\": {" + - " \"startedMillis\":0," + - " \"state\": \"" + ClusterReindexing.State.RUNNING.asString() + "\"" + - " }" + - " }" + - " }" + - " }" + - "}"))); + "{\"status\":[{" + + "\"type\":\"music\"," + + "\"startedMillis\":0," + + "\"endedMillis\":123," + + "\"progress\":\"" + new ProgressToken().serializeToString() + "\"," + + "\"state\": \"" + ClusterReindexing.State.FAILED.asString() + "\"," + + "\"message\":\"something went wrong\"}" + + "]}"))); server3.stubFor(get(urlEqualTo(uriPath)).willReturn(okJson( - "{" + - " \"clusters\": {" + - " \"cluster2\": {" + - " \"documentTypes\": {" + - " \"artist\": {" + - " \"startedMillis\":50," + - " \"endedMillis\":150," + - " \"progress\":\"half-done\"," + - " \"state\": \"" + ClusterReindexing.State.SUCCESSFUL.asString() + "\"," + - " \"message\":\"success\"" + - " }" + - " }" + - " }" + - " }" + - "}"))); - Map<String, ClusterReindexing> expected = Map.of("cluster1", - new ClusterReindexing(Map.of("music", - new ClusterReindexing.Status(Instant.ofEpochMilli(0), - null, - ClusterReindexing.State.RUNNING, - null, - null))), - "cluster2", - new ClusterReindexing(Map.of("artist", - new ClusterReindexing.Status(Instant.ofEpochMilli(50), - Instant.ofEpochMilli(150), - ClusterReindexing.State.SUCCESSFUL, - "success", - "half-done")))); + "{\"status\":[{" + + "\"type\":\"artist\"," + + "\"startedMillis\":10," + + "\"endedMillis\":150," + + "\"progress\":\"" + new ProgressToken().serializeToString() + "\"," + + "\"state\": \"" + ClusterReindexing.State.SUCCESSFUL.asString() + "\"," + + "\"message\":\"successs\"}" + + "]}"))); Map<String, ClusterReindexing> result = client.getReindexingStatus(app); - assertEquals(expected, result); + System.out.println(result); } |