aboutsummaryrefslogtreecommitdiffstats
path: root/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java')
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java57
1 files changed, 25 insertions, 32 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java
index 0124b6822f0..fef0120a431 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java
@@ -55,21 +55,21 @@ public class DefaultClusterReindexingStatusClient implements ClusterReindexingSt
@Override
public Map<String, ClusterReindexing> getReindexingStatus(ModelResult application) throws IOException {
Map<ClusterId, List<ServiceInfo>> clusters = clusterControllerClusters(application);
- Map<ClusterId, CompletableFuture<Map<String, ClusterReindexing>>> futureStatusPerCluster = new HashMap<>();
+ Map<ClusterId, CompletableFuture<ClusterReindexing>> futureStatusPerCluster = new HashMap<>();
clusters.forEach((clusterId, clusterNodes) -> {
var parallelRequests = clusterNodes.stream()
.map(this::getReindexingStatus)
.collect(Collectors.toList());
- CompletableFuture<Map<String, ClusterReindexing>> combinedRequest = CompletableFutures.firstOf(parallelRequests);
+ CompletableFuture<ClusterReindexing> combinedRequest = CompletableFutures.firstOf(parallelRequests);
futureStatusPerCluster.put(clusterId, combinedRequest);
});
try {
Map<String, ClusterReindexing> statusPerCluster = new HashMap<>();
futureStatusPerCluster.forEach((clusterId, futureStatus) -> {
- statusPerCluster.putAll(futureStatus.join());
+ statusPerCluster.put(clusterId.s(), futureStatus.join());
});
- return Map.copyOf(statusPerCluster);
+ return statusPerCluster;
} catch (Exception e) {
throw new IOException("Failed to get reindexing status from cluster controllers: " + e.getMessage(), e);
}
@@ -77,7 +77,7 @@ public class DefaultClusterReindexingStatusClient implements ClusterReindexingSt
@Override public void close() { uncheck(() -> httpClient.close()); }
- private CompletableFuture<Map<String, ClusterReindexing>> getReindexingStatus(ServiceInfo service) {
+ private CompletableFuture<ClusterReindexing> getReindexingStatus(ServiceInfo service) {
URI uri = URI.create(String.format("http://%s:%d/reindexing/v1/status", service.getHostName(), getStatePort(service)));
CompletableFuture<SimpleHttpResponse> responsePromise = new CompletableFuture<>();
httpClient.execute(SimpleHttpRequests.get(uri), new FutureCallback<>() {
@@ -94,40 +94,33 @@ public class DefaultClusterReindexingStatusClient implements ClusterReindexingSt
}, executor);
}
- private static Map<String, ClusterReindexing> toClusterReindexing(SimpleHttpResponse response) throws IOException {
+ private static ClusterReindexing toClusterReindexing(SimpleHttpResponse response) throws IOException {
if (response.getCode() != HttpStatus.SC_OK) throw new IOException("Expected status code 200, got " + response.getCode());
if (response.getBody() == null) throw new IOException("Response has no content");
return toClusterReindexing(response.getBodyBytes());
}
- private static Map<String, ClusterReindexing> toClusterReindexing(byte[] requestBody) throws IOException {
+ private static ClusterReindexing toClusterReindexing(byte[] requestBody) throws IOException {
JsonNode jsonNode = mapper.readTree(requestBody);
- Map<String, ClusterReindexing> clusters = new HashMap<>();
- for (var clusterNames = jsonNode.get("clusters").fieldNames(); clusterNames.hasNext(); ) {
- String clusterName = clusterNames.next();
- JsonNode clusterJson = jsonNode.get("clusters").get(clusterName);
- Map<String, ClusterReindexing.Status> documentStatuses = new HashMap<>();
- for (var documentTypes = clusterJson.get("documentTypes").fieldNames(); documentTypes.hasNext(); ) {
- String type = documentTypes.next();
- JsonNode statusJson = clusterJson.get("documentTypes").get(type);
- Instant startedMillis = Instant.ofEpochMilli(statusJson.get("startedMillis").longValue());
- Instant endedMillis = Optional.ofNullable(statusJson.get("endedMillis"))
- .map(json -> Instant.ofEpochMilli(json.longValue()))
- .orElse(null);
- String progressToken = Optional.ofNullable(statusJson.get("progress"))
- .map(JsonNode::textValue)
- .orElse(null);
- ClusterReindexing.State state = Optional.ofNullable(statusJson.get("state"))
- .map(json -> ClusterReindexing.State.fromString(json.textValue()))
- .orElse(null);
- String message = Optional.ofNullable(statusJson.get("message"))
- .map(JsonNode::textValue)
- .orElse(null);
- documentStatuses.put(type, new ClusterReindexing.Status(startedMillis, endedMillis, state, message, progressToken));
- }
- clusters.put(clusterName, new ClusterReindexing(documentStatuses));
+ Map<String, ClusterReindexing.Status> documentStatuses = new HashMap<>();
+ for (JsonNode statusJson : jsonNode.get("status")) {
+ String type = statusJson.get("type").textValue();
+ Instant startedMillis = Instant.ofEpochMilli(statusJson.get("startedMillis").longValue());
+ Instant endedMillis = Optional.ofNullable(statusJson.get("endedMillis"))
+ .map(json -> Instant.ofEpochMilli(json.longValue()))
+ .orElse(null);
+ String progressToken = Optional.ofNullable(statusJson.get("progress"))
+ .map(JsonNode::textValue)
+ .orElse(null);
+ ClusterReindexing.State state = Optional.ofNullable(statusJson.get("state"))
+ .map(json -> ClusterReindexing.State.fromString(json.textValue()))
+ .orElse(null);
+ String message = Optional.ofNullable(statusJson.get("message"))
+ .map(JsonNode::textValue)
+ .orElse(null);
+ documentStatuses.put(type, new ClusterReindexing.Status(startedMillis, endedMillis, state, message, progressToken));
}
- return Map.copyOf(clusters);
+ return new ClusterReindexing(documentStatuses);
}
private static int getStatePort(ServiceInfo service) {