summaryrefslogtreecommitdiffstats
path: root/controller-server
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2018-09-21 09:13:14 +0200
committerMartin Polden <mpolden@mpolden.no>2018-09-21 09:13:14 +0200
commit8f2d713dca812b9608f6ca2be5d877653b3a2a86 (patch)
tree36c419caf8790cd05f106a77200941fabfd5e204 /controller-server
parent9ad775311f3e2c4fd90bd1c5bdae8d10155e8e92 (diff)
Update applications in parallel
Diffstat (limited to 'controller-server')
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentMetricsMaintainer.java66
1 files changed, 42 insertions, 24 deletions
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentMetricsMaintainer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentMetricsMaintainer.java
index 04a6e4075f7..73bc29ed47e 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentMetricsMaintainer.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentMetricsMaintainer.java
@@ -15,8 +15,12 @@ import com.yahoo.yolean.Exceptions;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -30,6 +34,8 @@ public class DeploymentMetricsMaintainer extends Maintainer {
private static final Logger log = Logger.getLogger(DeploymentMetricsMaintainer.class.getName());
+ private static final int applicationsToUpdateInParallel = 10;
+
private final ApplicationController applications;
DeploymentMetricsMaintainer(Controller controller, Duration duration, JobControl jobControl) {
@@ -39,34 +45,46 @@ public class DeploymentMetricsMaintainer extends Maintainer {
@Override
protected void maintain() {
- boolean hasWarned = false;
- for (Application application : ApplicationList.from(applications.asList()).notPullRequest().asList()) {
- try {
- applications.lockIfPresent(application.id(), locked ->
- applications.store(locked.with(controller().metricsService().getApplicationMetrics(application.id()))));
-
- applications.lockIfPresent(application.id(), locked ->
- applications.store(locked.withRotationStatus(rotationStatus(application))));
+ AtomicBoolean hasWarned = new AtomicBoolean(false);
+ List<Application> applicationList = ApplicationList.from(applications.asList()).notPullRequest().asList();
- for (Deployment deployment : application.deployments().values()) {
- MetricsService.DeploymentMetrics deploymentMetrics = controller().metricsService()
- .getDeploymentMetrics(application.id(), deployment.zone());
- DeploymentMetrics newMetrics = new DeploymentMetrics(deploymentMetrics.queriesPerSecond(),
- deploymentMetrics.writesPerSecond(),
- deploymentMetrics.documentCount(),
- deploymentMetrics.queryLatencyMillis(),
- deploymentMetrics.writeLatencyMillis());
+ // Run parallel stream inside a custom ForkJoinPool so that we can control the number of threads used
+ ForkJoinPool pool = new ForkJoinPool(applicationsToUpdateInParallel);
+ pool.submit(() -> {
+ applicationList.parallelStream().forEach(application -> {
+ try {
+ applications.lockIfPresent(application.id(), locked ->
+ applications.store(locked.with(controller().metricsService().getApplicationMetrics(application.id()))));
applications.lockIfPresent(application.id(), locked ->
- applications.store(locked.with(deployment.zone(), newMetrics)
- .recordActivityAt(controller().clock().instant(), deployment.zone())));
+ applications.store(locked.withRotationStatus(rotationStatus(application))));
+
+ for (Deployment deployment : application.deployments().values()) {
+ MetricsService.DeploymentMetrics deploymentMetrics = controller().metricsService()
+ .getDeploymentMetrics(application.id(), deployment.zone());
+ DeploymentMetrics newMetrics = new DeploymentMetrics(deploymentMetrics.queriesPerSecond(),
+ deploymentMetrics.writesPerSecond(),
+ deploymentMetrics.documentCount(),
+ deploymentMetrics.queryLatencyMillis(),
+ deploymentMetrics.writeLatencyMillis());
+
+ applications.lockIfPresent(application.id(), locked ->
+ applications.store(locked.with(deployment.zone(), newMetrics)
+ .recordActivityAt(controller().clock().instant(), deployment.zone())));
+ }
+ } catch (UncheckedIOException e) {
+ if (!hasWarned.getAndSet(true)) {// produce only one warning per maintenance interval
+ log.log(Level.WARNING, "Failed to query metrics service: " + Exceptions.toMessageString(e) +
+ ". Retrying in " + maintenanceInterval());
+ }
}
- } catch (UncheckedIOException e) {
- if (!hasWarned) // produce only one warning per maintenance interval
- log.log(Level.WARNING, "Failed to query metrics service: " + Exceptions.toMessageString(e) +
- ". Retrying in " + maintenanceInterval());
- hasWarned = true;
- }
+ });
+ });
+ pool.shutdown();
+ try {
+ pool.awaitTermination(30, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
}
}