summaryrefslogtreecommitdiffstats
path: root/configserver
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-12-04 15:50:46 +0100
committerJon Marius Venstad <venstad@gmail.com>2020-12-04 15:50:46 +0100
commitdb40d63bc83c57360fba480ee9e3fae8ab0477b2 (patch)
tree3841c8814ce5e5d7807966373b00fd8956cdbf4f /configserver
parent5d17334d1d65b45c13c3962831fab03760c34ffb (diff)
Merge reindexing responses with multiple clusters in client in cfg server
Diffstat (limited to 'configserver')
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java51
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java57
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java62
3 files changed, 128 insertions, 42 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java
index 7f0671820d3..4b7148463f9 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java
@@ -27,6 +27,26 @@ public class ClusterReindexing {
public Map<String, Status> documentTypeStatus() { return documentTypeStatus; }
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ClusterReindexing that = (ClusterReindexing) o;
+ return documentTypeStatus.equals(that.documentTypeStatus);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(documentTypeStatus);
+ }
+
+ @Override
+ public String toString() {
+ return "ClusterReindexing{" +
+ "documentTypeStatus=" + documentTypeStatus +
+ '}';
+ }
+
public static class Status {
@@ -49,6 +69,35 @@ public class ClusterReindexing {
public Optional<State> state() { return Optional.ofNullable(state); }
public Optional<String> message() { return Optional.ofNullable(message); }
public Optional<String> progress() { return Optional.ofNullable(progress); }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Status status = (Status) o;
+ return startedAt.equals(status.startedAt) &&
+ Objects.equals(endedAt, status.endedAt) &&
+ state == status.state &&
+ Objects.equals(message, status.message) &&
+ Objects.equals(progress, status.progress);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(startedAt, endedAt, state, message, progress);
+ }
+
+ @Override
+ public String toString() {
+ return "Status{" +
+ "startedAt=" + startedAt +
+ ", endedAt=" + endedAt +
+ ", state=" + state +
+ ", message='" + message + '\'' +
+ ", progress='" + progress + '\'' +
+ '}';
+ }
+
}
@@ -67,5 +116,7 @@ public class ClusterReindexing {
}
public String asString() { return stringValue; }
+
}
+
}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java
index fef0120a431..0124b6822f0 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java
@@ -55,21 +55,21 @@ public class DefaultClusterReindexingStatusClient implements ClusterReindexingSt
@Override
public Map<String, ClusterReindexing> getReindexingStatus(ModelResult application) throws IOException {
Map<ClusterId, List<ServiceInfo>> clusters = clusterControllerClusters(application);
- Map<ClusterId, CompletableFuture<ClusterReindexing>> futureStatusPerCluster = new HashMap<>();
+ Map<ClusterId, CompletableFuture<Map<String, ClusterReindexing>>> futureStatusPerCluster = new HashMap<>();
clusters.forEach((clusterId, clusterNodes) -> {
var parallelRequests = clusterNodes.stream()
.map(this::getReindexingStatus)
.collect(Collectors.toList());
- CompletableFuture<ClusterReindexing> combinedRequest = CompletableFutures.firstOf(parallelRequests);
+ CompletableFuture<Map<String, ClusterReindexing>> combinedRequest = CompletableFutures.firstOf(parallelRequests);
futureStatusPerCluster.put(clusterId, combinedRequest);
});
try {
Map<String, ClusterReindexing> statusPerCluster = new HashMap<>();
futureStatusPerCluster.forEach((clusterId, futureStatus) -> {
- statusPerCluster.put(clusterId.s(), futureStatus.join());
+ statusPerCluster.putAll(futureStatus.join());
});
- return statusPerCluster;
+ return Map.copyOf(statusPerCluster);
} catch (Exception e) {
throw new IOException("Failed to get reindexing status from cluster controllers: " + e.getMessage(), e);
}
@@ -77,7 +77,7 @@ public class DefaultClusterReindexingStatusClient implements ClusterReindexingSt
@Override public void close() { uncheck(() -> httpClient.close()); }
- private CompletableFuture<ClusterReindexing> getReindexingStatus(ServiceInfo service) {
+ private CompletableFuture<Map<String, ClusterReindexing>> getReindexingStatus(ServiceInfo service) {
URI uri = URI.create(String.format("http://%s:%d/reindexing/v1/status", service.getHostName(), getStatePort(service)));
CompletableFuture<SimpleHttpResponse> responsePromise = new CompletableFuture<>();
httpClient.execute(SimpleHttpRequests.get(uri), new FutureCallback<>() {
@@ -94,33 +94,40 @@ public class DefaultClusterReindexingStatusClient implements ClusterReindexingSt
}, executor);
}
- private static ClusterReindexing toClusterReindexing(SimpleHttpResponse response) throws IOException {
+ private static Map<String, ClusterReindexing> toClusterReindexing(SimpleHttpResponse response) throws IOException {
if (response.getCode() != HttpStatus.SC_OK) throw new IOException("Expected status code 200, got " + response.getCode());
if (response.getBody() == null) throw new IOException("Response has no content");
return toClusterReindexing(response.getBodyBytes());
}
- private static ClusterReindexing toClusterReindexing(byte[] requestBody) throws IOException {
+ private static Map<String, ClusterReindexing> toClusterReindexing(byte[] requestBody) throws IOException {
JsonNode jsonNode = mapper.readTree(requestBody);
- Map<String, ClusterReindexing.Status> documentStatuses = new HashMap<>();
- for (JsonNode statusJson : jsonNode.get("status")) {
- String type = statusJson.get("type").textValue();
- Instant startedMillis = Instant.ofEpochMilli(statusJson.get("startedMillis").longValue());
- Instant endedMillis = Optional.ofNullable(statusJson.get("endedMillis"))
- .map(json -> Instant.ofEpochMilli(json.longValue()))
- .orElse(null);
- String progressToken = Optional.ofNullable(statusJson.get("progress"))
- .map(JsonNode::textValue)
- .orElse(null);
- ClusterReindexing.State state = Optional.ofNullable(statusJson.get("state"))
- .map(json -> ClusterReindexing.State.fromString(json.textValue()))
- .orElse(null);
- String message = Optional.ofNullable(statusJson.get("message"))
- .map(JsonNode::textValue)
- .orElse(null);
- documentStatuses.put(type, new ClusterReindexing.Status(startedMillis, endedMillis, state, message, progressToken));
+ Map<String, ClusterReindexing> clusters = new HashMap<>();
+ for (var clusterNames = jsonNode.get("clusters").fieldNames(); clusterNames.hasNext(); ) {
+ String clusterName = clusterNames.next();
+ JsonNode clusterJson = jsonNode.get("clusters").get(clusterName);
+ Map<String, ClusterReindexing.Status> documentStatuses = new HashMap<>();
+ for (var documentTypes = clusterJson.get("documentTypes").fieldNames(); documentTypes.hasNext(); ) {
+ String type = documentTypes.next();
+ JsonNode statusJson = clusterJson.get("documentTypes").get(type);
+ Instant startedMillis = Instant.ofEpochMilli(statusJson.get("startedMillis").longValue());
+ Instant endedMillis = Optional.ofNullable(statusJson.get("endedMillis"))
+ .map(json -> Instant.ofEpochMilli(json.longValue()))
+ .orElse(null);
+ String progressToken = Optional.ofNullable(statusJson.get("progress"))
+ .map(JsonNode::textValue)
+ .orElse(null);
+ ClusterReindexing.State state = Optional.ofNullable(statusJson.get("state"))
+ .map(json -> ClusterReindexing.State.fromString(json.textValue()))
+ .orElse(null);
+ String message = Optional.ofNullable(statusJson.get("message"))
+ .map(JsonNode::textValue)
+ .orElse(null);
+ documentStatuses.put(type, new ClusterReindexing.Status(startedMillis, endedMillis, state, message, progressToken));
+ }
+ clusters.put(clusterName, new ClusterReindexing(documentStatuses));
}
- return new ClusterReindexing(documentStatuses);
+ return Map.copyOf(clusters);
}
private static int getStatePort(ServiceInfo service) {
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java
index 21894e4a756..82e1bd96373 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java
@@ -7,10 +7,12 @@ import com.yahoo.config.model.api.PortInfo;
import com.yahoo.config.model.api.ServiceInfo;
import com.yahoo.documentapi.ProgressToken;
import com.yahoo.vespa.config.server.modelfactory.ModelResult;
+import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import java.io.IOException;
+import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -21,6 +23,7 @@ import static com.github.tomakehurst.wiremock.client.WireMock.serverError;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
import static com.yahoo.config.model.api.container.ContainerServiceType.CLUSTERCONTROLLER_CONTAINER;
+import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -40,25 +43,50 @@ public class DefaultClusterReindexingStatusClientTest {
String uriPath = "/reindexing/v1/status";
server1.stubFor(get(urlEqualTo(uriPath)).willReturn(serverError()));
server2.stubFor(get(urlEqualTo(uriPath)).willReturn(okJson(
- "{\"status\":[{" +
- "\"type\":\"music\"," +
- "\"startedMillis\":0," +
- "\"endedMillis\":123," +
- "\"progress\":\"" + new ProgressToken().serializeToString() + "\"," +
- "\"state\": \"" + ClusterReindexing.State.FAILED.asString() + "\"," +
- "\"message\":\"something went wrong\"}" +
- "]}")));
+ "{" +
+ " \"clusters\": {" +
+ " \"cluster1\": {" +
+ " \"documentTypes\": {" +
+ " \"music\": {" +
+ " \"startedMillis\":0," +
+ " \"state\": \"" + ClusterReindexing.State.RUNNING.asString() + "\"" +
+ " }" +
+ " }" +
+ " }" +
+ " }" +
+ "}")));
server3.stubFor(get(urlEqualTo(uriPath)).willReturn(okJson(
- "{\"status\":[{" +
- "\"type\":\"artist\"," +
- "\"startedMillis\":10," +
- "\"endedMillis\":150," +
- "\"progress\":\"" + new ProgressToken().serializeToString() + "\"," +
- "\"state\": \"" + ClusterReindexing.State.SUCCESSFUL.asString() + "\"," +
- "\"message\":\"successs\"}" +
- "]}")));
+ "{" +
+ " \"clusters\": {" +
+ " \"cluster2\": {" +
+ " \"documentTypes\": {" +
+ " \"artist\": {" +
+ " \"startedMillis\":50," +
+ " \"endedMillis\":150," +
+ " \"progress\":\"half-done\"," +
+ " \"state\": \"" + ClusterReindexing.State.SUCCESSFUL.asString() + "\"," +
+ " \"message\":\"success\"" +
+ " }" +
+ " }" +
+ " }" +
+ " }" +
+ "}")));
+ Map<String, ClusterReindexing> expected = Map.of("cluster1",
+ new ClusterReindexing(Map.of("music",
+ new ClusterReindexing.Status(Instant.ofEpochMilli(0),
+ null,
+ ClusterReindexing.State.RUNNING,
+ null,
+ null))),
+ "cluster2",
+ new ClusterReindexing(Map.of("artist",
+ new ClusterReindexing.Status(Instant.ofEpochMilli(50),
+ Instant.ofEpochMilli(150),
+ ClusterReindexing.State.SUCCESSFUL,
+ "success",
+ "half-done"))));
Map<String, ClusterReindexing> result = client.getReindexingStatus(app);
- System.out.println(result);
+ assertEquals(expected, result);
}