summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Marius Venstad <jvenstad@yahoo-inc.com>2017-11-13 14:15:26 +0100
committerJon Marius Venstad <jvenstad@yahoo-inc.com>2018-01-03 15:54:08 +0100
commitf2a5aa74010636d237f283367f535c22db40d886 (patch)
tree262e9791e4a87352c08af8843a0548cc866b4747
parentded1d3f4e61278c47209efd9b1b1bdb984439651 (diff)
Unfinished PushingBuildSystem
-rw-r--r--controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/BuildService.java4
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/PushingBuildSystem.java166
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/Maintainer.java2
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/CuratorDb.java22
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/JobQueueSerializer.java5
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/MockBuildService.java61
6 files changed, 205 insertions, 55 deletions
diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/BuildService.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/BuildService.java
index 7933a23c45f..0d9c316d4d8 100644
--- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/BuildService.java
+++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/BuildService.java
@@ -7,8 +7,8 @@ package com.yahoo.vespa.hosted.controller.api.integration;
public interface BuildService {
/**
- * Enqueue a job defined by "buildJob in an external build system, and return the outcome of the enqueue request.
- * This method should return @false only when a retry is in order, and @true otherwise, e.g., on success, or for
+ * Enqueue a job defined by buildJob in an external build system, and return the outcome of the enqueue request.
+ * This method should return false only when a retry is in order, and true otherwise, e.g., on success, or for
* invalid jobs.
*/
boolean trigger(BuildJob buildJob);
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/PushingBuildSystem.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/PushingBuildSystem.java
new file mode 100644
index 00000000000..1769a511886
--- /dev/null
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/PushingBuildSystem.java
@@ -0,0 +1,166 @@
+package com.yahoo.vespa.hosted.controller.deployment;
+
+
+import com.yahoo.config.provision.ApplicationId;
+import com.yahoo.vespa.curator.Lock;
+import com.yahoo.vespa.hosted.controller.Controller;
+import com.yahoo.vespa.hosted.controller.api.integration.BuildService;
+import com.yahoo.vespa.hosted.controller.application.DeploymentJobs;
+import com.yahoo.vespa.hosted.controller.maintenance.JobControl;
+import com.yahoo.vespa.hosted.controller.maintenance.Maintainer;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Stores and triggers build jobs in an external BuildService.
+ *
+ * Capacity constrained jobs are added to queues which are polled with a given interval.
+ * Other jobs are triggered right away, in the provided BuildService.
+ *
+ * All triggering (constrained and otherwise) can be turned off in the given JobControl.
+ *
+ * Each triggering spawns its own thread, as there is no guarantee the BuildService provides a timely response.
+ *
+ * @author jvenstad
+ */
+public class PushingBuildSystem extends Maintainer implements BuildSystem {
+
+ private static final Logger log = Logger.getLogger(PushingBuildSystem.class.getName());
+ // The number of jobs to offer, on each poll, for zones that have limited capacity
+ static final Duration constrainedTriggeringInterval = Duration.ofSeconds(30);
+ static final int triggeringRetries = 5;
+
+ private final ExecutorService executors;
+ private final BuildService buildService;
+
+ @SuppressWarnings("unused") // Used by DI.
+ public PushingBuildSystem(Controller controller, JobControl jobControl, BuildService buildService) {
+ super(controller, constrainedTriggeringInterval, jobControl);
+
+ this.buildService = buildService;
+
+ executors = Executors.newFixedThreadPool(20);
+ }
+
+
+ @Override
+ public void addJob(ApplicationId application, DeploymentJobs.JobType jobType, boolean first) {
+ if ( ! projectId(application).isPresent()) {
+ log.warning("Not queuing " + jobType.jobName() + " for " + application.toShortString() +
+ " because project ID is missing");
+ return;
+ }
+
+ // Store jobs that aren't triggered right away.
+ if (isCapacityConstrained(jobType) || ! jobControl().isActive(name())) {
+ try (Lock lock = curator().lockJobQueues()) {
+ Deque<ApplicationId> queue = curator().readJobQueue(jobType);
+ if ( ! queue.contains(application)) {
+ if (first)
+ queue.addFirst(application);
+ else
+ queue.add(application);
+ curator().writeJobQueue(jobType, queue);
+ }
+ else
+ throw new IllegalStateException("Was ordered to trigger " + jobType + " for " + application + ", but this was already enqueued.");
+ }
+ }
+ else
+ new Thread(() -> triggerWithRetries(new BuildService.BuildJob(projectId(application).get(), jobType.jobName()), triggeringRetries)).start();
+ }
+
+ @Override
+ public List<BuildService.BuildJob> jobs() {
+ return getJobs(false);
+ }
+
+ @Override
+ public List<BuildService.BuildJob> takeJobsToRun() {
+ return getJobs(true);
+ }
+
+
+ @Override
+ public void removeJobs(ApplicationId application) {
+ try (Lock lock = curator().lockJobQueues()) {
+ for (DeploymentJobs.JobType jobType : DeploymentJobs.JobType.values()) {
+ Deque<ApplicationId> queue = curator().readJobQueue(jobType);
+ while (queue.remove(application)) {
+ // keep removing until not found
+ }
+ curator().writeJobQueue(jobType, queue);
+ }
+ }
+ }
+
+ private void triggerWithRetries(BuildService.BuildJob buildJob, int retries) {
+ executors.submit(() -> {
+ try {
+ for (int i = 0; i < retries; i++)
+ if (buildService.trigger(buildJob))
+ return;
+
+ throw new RuntimeException("Exhausted all " + retries + " retries without success.");
+ }
+ catch (RuntimeException e) {
+ log.log(Level.WARNING, "Failed to trigger " + buildJob + "; this is likely a transient error.", e);
+ }
+ });
+ }
+
+ private List<BuildService.BuildJob> getJobs(boolean removeFromQueue) {
+ int capacityConstrainedJobsOffered = 0;
+ try (Lock lock = curator().lockJobQueues()) { // TODO: Why? Because multi-controller, perhaps?
+ List<BuildService.BuildJob> jobsToRun = new ArrayList<>();
+ for (DeploymentJobs.JobType jobType : DeploymentJobs.JobType.values()) {
+ Deque<ApplicationId> queue = curator().readJobQueue(jobType);
+ for (ApplicationId a : queue) {
+ ApplicationId application = removeFromQueue ? queue.poll() : a;
+
+ Optional<Long> projectId = projectId(application);
+ if (projectId.isPresent()) {
+ jobsToRun.add(new BuildService.BuildJob(projectId.get(), jobType.jobName()));
+ } else {
+ }
+
+ // Return a limited number of jobs at a time for capacity constrained zones
+ if (removeFromQueue && isCapacityConstrained(jobType) &&
+ ++capacityConstrainedJobsOffered >= 2) {
+ break;
+ }
+ }
+ if (removeFromQueue)
+ curator().writeJobQueue(jobType, queue);
+ }
+ return Collections.unmodifiableList(jobsToRun);
+ }
+ }
+
+ private Optional<Long> projectId(ApplicationId applicationId) {
+ return controller().applications().require(applicationId).deploymentJobs().projectId();
+ }
+
+ private static boolean isCapacityConstrained(DeploymentJobs.JobType jobType) {
+ return jobType == DeploymentJobs.JobType.stagingTest || jobType == DeploymentJobs.JobType.systemTest;
+ }
+
+ @Override
+ protected void maintain() {
+ Set<BuildService.BuildJob> jobsToTrigger = new LinkedHashSet<>();
+ // TODO: Store applications with triggering here, instead of in the DeploymentTrigger?
+ }
+
+}
+
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/Maintainer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/Maintainer.java
index ebab2054d4f..5d55a0f71f0 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/Maintainer.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/Maintainer.java
@@ -43,6 +43,8 @@ public abstract class Maintainer extends AbstractComponent implements Runnable {
protected CuratorDb curator() { return jobControl.curator(); }
+ protected JobControl jobControl() { return jobControl; }
+
@Override
public void run() {
try {
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/CuratorDb.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/CuratorDb.java
index a3bb191fc38..cd7d630b4f2 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/CuratorDb.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/CuratorDb.java
@@ -123,33 +123,29 @@ public class CuratorDb {
}
public void writeInactiveJobs(Set<String> inactiveJobs) {
- NestedTransaction transaction = new NestedTransaction();
curator.set(inactiveJobsPath(), stringSetSerializer.toJson(inactiveJobs));
- transaction.commit();
}
public Deque<ApplicationId> readJobQueue(DeploymentJobs.JobType jobType) {
try {
Optional<byte[]> data = curator.getData(jobQueuePath(jobType));
- if (! data.isPresent() || data.get().length == 0) return new ArrayDeque<>(); // job queue has never been written
+ if ( ! data.isPresent() || data.get().length == 0) return new ArrayDeque<>(); // job queue has never been written
return jobQueueSerializer.fromJson(data.get());
}
catch (RuntimeException e) {
log.log(Level.WARNING, "Error reading job queue, deleting inactive state");
- writeInactiveJobs(Collections.emptySet());
+ writeJobQueue(jobType, Collections::emptyIterator);
return new ArrayDeque<>();
}
}
- public void writeJobQueue(DeploymentJobs.JobType jobType, Deque<ApplicationId> queue) {
- NestedTransaction transaction = new NestedTransaction();
+ public void writeJobQueue(DeploymentJobs.JobType jobType, Iterable<ApplicationId> queue) {
curator.set(jobQueuePath(jobType), jobQueueSerializer.toJson(queue));
- transaction.commit();
}
public double readUpgradesPerMinute() {
Optional<byte[]> n = curator.getData(upgradesPerMinutePath());
- if (!n.isPresent() || n.get().length == 0) {
+ if ( ! n.isPresent() || n.get().length == 0) {
return 0.5; // Default if value has never been written
}
return ByteBuffer.wrap(n.get()).getDouble();
@@ -159,39 +155,33 @@ public class CuratorDb {
if (n < 0) {
throw new IllegalArgumentException("Upgrades per minute must be >= 0");
}
- NestedTransaction transaction = new NestedTransaction();
curator.set(upgradesPerMinutePath(), ByteBuffer.allocate(Double.BYTES).putDouble(n).array());
- transaction.commit();
}
public boolean readIgnoreConfidence() {
Optional<byte[]> value = curator.getData(ignoreConfidencePath());
- if (! value.isPresent() || value.get().length == 0) {
+ if ( ! value.isPresent() || value.get().length == 0) {
return false; // Default if value has never been written
}
return ByteBuffer.wrap(value.get()).getInt() == 1;
}
public void writeIgnoreConfidence(boolean value) {
- NestedTransaction transaction = new NestedTransaction();
curator.set(ignoreConfidencePath(), ByteBuffer.allocate(Integer.BYTES).putInt(value ? 1 : 0).array());
- transaction.commit();
}
public void writeVersionStatus(VersionStatus status) {
VersionStatusSerializer serializer = new VersionStatusSerializer();
- NestedTransaction transaction = new NestedTransaction();
try {
curator.set(versionStatusPath(), SlimeUtils.toJsonBytes(serializer.toSlime(status)));
} catch (IOException e) {
throw new UncheckedIOException("Failed to serialize version status", e);
}
- transaction.commit();
}
public VersionStatus readVersionStatus() {
Optional<byte[]> data = curator.getData(versionStatusPath());
- if (!data.isPresent() || data.get().length == 0) {
+ if ( ! data.isPresent() || data.get().length == 0) {
return VersionStatus.empty(); // Default if status has never been written
}
VersionStatusSerializer serializer = new VersionStatusSerializer();
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/JobQueueSerializer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/JobQueueSerializer.java
index 5017624f286..f6d6406a820 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/JobQueueSerializer.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/JobQueueSerializer.java
@@ -12,18 +12,19 @@ import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
/**
* Serialization of a queue of ApplicationIds to/from Json bytes using Slime.
*
- * The set is serialized as an array of string.
+ * The queue is serialized as an array of strings.
*
* @author bratseth
*/
public class JobQueueSerializer {
- public byte[] toJson(Deque<ApplicationId> queue) {
+ public byte[] toJson(Iterable<ApplicationId> queue) {
try {
Slime slime = new Slime();
Cursor array = slime.setArray();
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/MockBuildService.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/MockBuildService.java
index 1b1a4feaa4e..0afbaebc82c 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/MockBuildService.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/MockBuildService.java
@@ -16,8 +16,8 @@ import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
-import static com.yahoo.vespa.hosted.controller.deployment.MockBuildService.JobStatus.QUEUED;
-import static com.yahoo.vespa.hosted.controller.deployment.MockBuildService.JobStatus.RUNNING;
+import static com.yahoo.vespa.hosted.controller.deployment.MockBuildService.JobRunStatus.QUEUED;
+import static com.yahoo.vespa.hosted.controller.deployment.MockBuildService.JobRunStatus.RUNNING;
/**
* Simulates polling of build jobs from the controller and triggering and execution of
@@ -30,7 +30,7 @@ public class MockBuildService implements BuildService {
private final ControllerTester tester;
private final MockTimeline timeline;
private final Map<String, Job> jobs;
- private final Map<String, JobStatus> jobStatuses;
+ private final Map<String, JobRunStatus> jobStatuses;
private Version version;
public MockBuildService(ControllerTester tester, MockTimeline timeline) {
@@ -72,12 +72,6 @@ public class MockBuildService implements BuildService {
System.err.println(timeline.now() + ": Triggered " + key + "; it will finish at " + timeline.now().plus(job.duration));
}
- public void incrementVersion() {
- version = new Version(version.getMajor(), version.getMinor() + 1);
- }
-
- public Version version() { return version; }
-
/** Add @job to the set of @Job objects we have information about. */
private void add(Job job) {
jobs.put(job.buildJob().toString(), job);
@@ -88,9 +82,10 @@ public class MockBuildService implements BuildService {
project.jobs.values().forEach(this::add);
}
+ // TODO: Replace with something that relies on ApplicationPackage.
/** Make a @Project with the given settings, modify it if desired, and @add() it its jobs to the pool of known ones. */
- public Project project(ApplicationId applicationId, Long projectId, Duration duration, Supplier<Boolean> success) {
- return new Project(applicationId, projectId, duration, success);
+ public Project project(ApplicationId applicationId, Long projectId, Duration duration, Supplier<JobError> error) {
+ return new Project(applicationId, projectId, duration, error);
}
@@ -99,31 +94,27 @@ public class MockBuildService implements BuildService {
private final ApplicationId applicationId;
private final Long projectId;
- private final Duration duration;
- private final Supplier<Boolean> success;
private final Map<JobType, Job> jobs;
- private Project(ApplicationId applicationId, Long projectId, Duration duration, Supplier<Boolean> success) {
+ private Project(ApplicationId applicationId, Long projectId, Duration duration, Supplier<JobError> error) {
this.applicationId = applicationId;
this.projectId = projectId;
- this.duration = duration;
- this.success = success;
jobs = new EnumMap<>(JobType.class);
for (JobType jobType : JobType.values())
- jobs.put(jobType, new Job(applicationId, projectId, jobType, duration, success));
+ jobs.put(jobType, new Job(applicationId, projectId, jobType, duration, error));
}
/** Set @duration for @jobType of this @Project. */
public Project set(Duration duration, JobType jobType) {
- jobs.compute(jobType, (type, job) -> new Job(applicationId, projectId, jobType, duration, job.success));
+ jobs.compute(jobType, (__, job) -> new Job(applicationId, projectId, jobType, duration, job.error));
return this;
}
/** Set @success for @jobType of this @Project. */
- public Project set(Supplier<Boolean> success, JobType jobType) {
- jobs.compute(jobType, (type, job) -> new Job(applicationId, projectId, jobType, job.duration, success));
+ public Project set(Supplier<JobError> error, JobType jobType) {
+ jobs.compute(jobType, (__, job) -> new Job(applicationId, projectId, jobType, job.duration, error));
return this;
}
@@ -142,35 +133,35 @@ public class MockBuildService implements BuildService {
private final Long projectId;
private final JobType jobType;
private final Duration duration;
- private final Supplier<Boolean> success;
+ private final Supplier<JobError> error;
+
+ private long buildNumber = 0;
- private Job(ApplicationId applicationId, Long projectId, JobType jobType, Duration duration, Supplier<Boolean> success) {
+ private Job(ApplicationId applicationId, Long projectId, JobType jobType, Duration duration, Supplier<JobError> error) {
this.applicationId = applicationId;
this.projectId = projectId;
this.jobType = jobType;
this.duration = duration;
- this.success = success;
+ this.error = error;
}
private void outcome() {
- Boolean success = this.success.get();
- System.err.println(timeline.now() + ": Job " + projectId + ":" + jobType + " reports " + success);
- if (success != null)
- tester.controller().applications().notifyJobCompletion(
- new DeploymentJobs.JobReport(
- applicationId,
- jobType,
- projectId,
- 42,
- Optional.ofNullable(success ? null : JobError.unknown)
- ));
+ if (error == null) return; // null JobError supplier means the job doesn't report back, i.e., is aborted.
+
+ JobError jobError = this.error.get();
+ System.err.println(timeline.now() + ": Job " + projectId + ":" + jobType + " reports " + (jobError == null ? " success " : jobError));
+ tester.controller().applications().notifyJobCompletion(new DeploymentJobs.JobReport(applicationId,
+ jobType,
+ projectId,
+ ++buildNumber,
+ Optional.ofNullable(jobError)));
}
private BuildJob buildJob() { return new BuildJob(projectId, jobType.jobName()); }
}
- enum JobStatus {
+ enum JobRunStatus {
QUEUED,
RUNNING
}