diff options
author | Martin Polden <mpolden@mpolden.no> | 2018-09-21 09:13:14 +0200 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2018-09-21 09:13:14 +0200 |
commit | 8f2d713dca812b9608f6ca2be5d877653b3a2a86 (patch) | |
tree | 36c419caf8790cd05f106a77200941fabfd5e204 /controller-server | |
parent | 9ad775311f3e2c4fd90bd1c5bdae8d10155e8e92 (diff) |
Update applications in parallel
Diffstat (limited to 'controller-server')
-rw-r--r-- | controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentMetricsMaintainer.java | 66 |
1 files changed, 42 insertions, 24 deletions
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentMetricsMaintainer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentMetricsMaintainer.java index 04a6e4075f7..73bc29ed47e 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentMetricsMaintainer.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentMetricsMaintainer.java @@ -15,8 +15,12 @@ import com.yahoo.yolean.Exceptions; import java.io.UncheckedIOException; import java.time.Duration; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; @@ -30,6 +34,8 @@ public class DeploymentMetricsMaintainer extends Maintainer { private static final Logger log = Logger.getLogger(DeploymentMetricsMaintainer.class.getName()); + private static final int applicationsToUpdateInParallel = 10; + private final ApplicationController applications; DeploymentMetricsMaintainer(Controller controller, Duration duration, JobControl jobControl) { @@ -39,34 +45,46 @@ public class DeploymentMetricsMaintainer extends Maintainer { @Override protected void maintain() { - boolean hasWarned = false; - for (Application application : ApplicationList.from(applications.asList()).notPullRequest().asList()) { - try { - applications.lockIfPresent(application.id(), locked -> - applications.store(locked.with(controller().metricsService().getApplicationMetrics(application.id())))); - - applications.lockIfPresent(application.id(), locked -> - applications.store(locked.withRotationStatus(rotationStatus(application)))); + AtomicBoolean hasWarned = new AtomicBoolean(false); + List<Application> applicationList = ApplicationList.from(applications.asList()).notPullRequest().asList(); - for (Deployment deployment : application.deployments().values()) { - MetricsService.DeploymentMetrics deploymentMetrics = controller().metricsService() - .getDeploymentMetrics(application.id(), deployment.zone()); - DeploymentMetrics newMetrics = new DeploymentMetrics(deploymentMetrics.queriesPerSecond(), - deploymentMetrics.writesPerSecond(), - deploymentMetrics.documentCount(), - deploymentMetrics.queryLatencyMillis(), - deploymentMetrics.writeLatencyMillis()); + // Run parallel stream inside a custom ForkJoinPool so that we can control the number of threads used + ForkJoinPool pool = new ForkJoinPool(applicationsToUpdateInParallel); + pool.submit(() -> { + applicationList.parallelStream().forEach(application -> { + try { + applications.lockIfPresent(application.id(), locked -> + applications.store(locked.with(controller().metricsService().getApplicationMetrics(application.id())))); applications.lockIfPresent(application.id(), locked -> - applications.store(locked.with(deployment.zone(), newMetrics) - .recordActivityAt(controller().clock().instant(), deployment.zone()))); + applications.store(locked.withRotationStatus(rotationStatus(application)))); + + for (Deployment deployment : application.deployments().values()) { + MetricsService.DeploymentMetrics deploymentMetrics = controller().metricsService() + .getDeploymentMetrics(application.id(), deployment.zone()); + DeploymentMetrics newMetrics = new DeploymentMetrics(deploymentMetrics.queriesPerSecond(), + deploymentMetrics.writesPerSecond(), + deploymentMetrics.documentCount(), + deploymentMetrics.queryLatencyMillis(), + deploymentMetrics.writeLatencyMillis()); + + applications.lockIfPresent(application.id(), locked -> + applications.store(locked.with(deployment.zone(), newMetrics) + .recordActivityAt(controller().clock().instant(), deployment.zone()))); + } + } catch (UncheckedIOException e) { + if (!hasWarned.getAndSet(true)) {// produce only one warning per maintenance interval + log.log(Level.WARNING, "Failed to query metrics service: " + Exceptions.toMessageString(e) + + ". Retrying in " + maintenanceInterval()); + } } - } catch (UncheckedIOException e) { - if (!hasWarned) // produce only one warning per maintenance interval - log.log(Level.WARNING, "Failed to query metrics service: " + Exceptions.toMessageString(e) + - ". Retrying in " + maintenanceInterval()); - hasWarned = true; - } + }); + }); + pool.shutdown(); + try { + pool.awaitTermination(30, TimeUnit.MINUTES); + } catch (InterruptedException e) { + throw new RuntimeException(e); } } |