summaryrefslogtreecommitdiffstats
path: root/controller-server
diff options
context:
space:
mode:
authorJon Marius Venstad <jvenstad@yahoo-inc.com>2018-04-09 15:20:28 +0200
committerJon Marius Venstad <jvenstad@yahoo-inc.com>2018-04-09 15:28:58 +0200
commit9e460d4c7a5eaae4cf8475ec66ef8129e4fef8aa (patch)
tree89967051fc57284d1bff0bc775cf8ca1cc76cbce /controller-server
parent5c06fe4525c4b27d7f57e2ed96bd5ad4fb20e118 (diff)
Simplify
Diffstat (limited to 'controller-server')
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java56
1 files changed, 12 insertions, 44 deletions
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java
index 09f49391d14..409ca0ea1ad 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java
@@ -23,25 +23,18 @@ import com.yahoo.vespa.hosted.controller.application.JobList;
import com.yahoo.vespa.hosted.controller.application.JobStatus;
import com.yahoo.vespa.hosted.controller.persistence.CuratorDb;
-import java.io.UncheckedIOException;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import java.util.stream.Stream;
@@ -50,7 +43,6 @@ import static java.util.Comparator.naturalOrder;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.partitioningBy;
-import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
/**
@@ -76,7 +68,6 @@ public class DeploymentTrigger {
private final Clock clock;
private final DeploymentOrder order;
private final BuildService buildService;
- private final ExecutorService executor;
public DeploymentTrigger(Controller controller, CuratorDb curator, BuildService buildService, Clock clock) {
Objects.requireNonNull(controller, "controller cannot be null");
@@ -87,7 +78,6 @@ public class DeploymentTrigger {
this.order = new DeploymentOrder(controller::system);
this.buildService = buildService;
this.jobTimeout = controller.system().equals(SystemName.main) ? Duration.ofHours(12) : Duration.ofHours(1);
- this.executor = Executors.newFixedThreadPool(20);
}
/**
@@ -137,46 +127,24 @@ public class DeploymentTrigger {
* Only one job is triggered each run for test jobs, since those environments have limited capacity.
*/
public void triggerReadyJobs() {
- List<Callable<Void>> tasks = new ArrayList<>();
-
computeReadyJobs().collect(partitioningBy(job -> job.jobType().isTest()))
- .forEach((isConstrained, jobList) -> {
- if (isConstrained) {
- jobList.stream()
+ .entrySet().stream()
+ .flatMap(entry -> (entry.getKey()
+ // True for capacity constrained zones -- sort by priority and make a task for each job type.
+ ? entry.getValue().stream()
.sorted(comparing(Job::isRetry)
.thenComparing(Job::applicationUpgrade)
.reversed()
.thenComparing(Job::availableSince))
.collect(groupingBy(Job::jobType))
- .values().forEach(jobs -> tasks.add(
- () -> {
- for (Job job : jobs)
- if (canTrigger(job) && trigger(job))
- break;
- return null;
- }));
- }
- else {
- jobList.stream()
- .collect(groupingBy(Job::id))
- .values().forEach(jobs -> tasks.add(
- () -> {
- for (Job job : jobs)
- applications().lockIfPresent(job.id, ignored -> {
- if (canTrigger(job))
- trigger(job);
- });
- return null;
- }));
- }
- });
-
- try {
- executor.invokeAll(tasks);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ // False for production jobs -- keep step order and make a task for each application.
+ : entry.getValue().stream()
+ .collect(groupingBy(Job::id)))
+ .values().stream()
+ .map(jobs -> (Runnable) jobs.stream()
+ .filter(job -> canTrigger(job) && trigger(job))
+ .limit(entry.getKey() ? 1 : Long.MAX_VALUE)::count))
+ .parallel().forEach(Runnable::run);
}
/**