From f2a5aa74010636d237f283367f535c22db40d886 Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Mon, 13 Nov 2017 14:15:26 +0100 Subject: Unfinished PushingBuildSystem --- .../controller/api/integration/BuildService.java | 4 +- .../controller/deployment/PushingBuildSystem.java | 166 +++++++++++++++++++++ .../hosted/controller/maintenance/Maintainer.java | 2 + .../hosted/controller/persistence/CuratorDb.java | 22 +-- .../controller/persistence/JobQueueSerializer.java | 5 +- .../controller/deployment/MockBuildService.java | 61 ++++---- 6 files changed, 205 insertions(+), 55 deletions(-) create mode 100644 controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/PushingBuildSystem.java diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/BuildService.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/BuildService.java index 7933a23c45f..0d9c316d4d8 100644 --- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/BuildService.java +++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/BuildService.java @@ -7,8 +7,8 @@ package com.yahoo.vespa.hosted.controller.api.integration; public interface BuildService { /** - * Enqueue a job defined by "buildJob in an external build system, and return the outcome of the enqueue request. - * This method should return @false only when a retry is in order, and @true otherwise, e.g., on success, or for + * Enqueue a job defined by buildJob in an external build system, and return the outcome of the enqueue request. + * This method should return false only when a retry is in order, and true otherwise, e.g., on success, or for * invalid jobs. 
*/ boolean trigger(BuildJob buildJob); diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/PushingBuildSystem.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/PushingBuildSystem.java new file mode 100644 index 00000000000..1769a511886 --- /dev/null +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/PushingBuildSystem.java @@ -0,0 +1,201 @@
+package com.yahoo.vespa.hosted.controller.deployment;
+
+
+import com.yahoo.config.provision.ApplicationId;
+import com.yahoo.vespa.curator.Lock;
+import com.yahoo.vespa.hosted.controller.Controller;
+import com.yahoo.vespa.hosted.controller.api.integration.BuildService;
+import com.yahoo.vespa.hosted.controller.application.DeploymentJobs;
+import com.yahoo.vespa.hosted.controller.maintenance.JobControl;
+import com.yahoo.vespa.hosted.controller.maintenance.Maintainer;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Stores and triggers build jobs in an external BuildService.
+ *
+ * Capacity constrained jobs are added to queues which are polled with a given interval.
+ * Other jobs are triggered right away, in the provided BuildService.
+ *
+ * All triggering (constrained and otherwise) can be turned off in the given JobControl.
+ *
+ * Triggering is handed to a fixed-size thread pool, as there is no guarantee the BuildService
+ * provides a timely response.
+ *
+ * @author jvenstad
+ */
+public class PushingBuildSystem extends Maintainer implements BuildSystem {
+
+    private static final Logger log = Logger.getLogger(PushingBuildSystem.class.getName());
+
+    // The number of jobs to offer, on each poll, for zones that have limited capacity
+    private static final int maxCapacityConstrainedJobsToOffer = 2;
+    // Passed to the Maintainer constructor; presumably the interval between maintenance runs -- TODO confirm
+    static final Duration constrainedTriggeringInterval = Duration.ofSeconds(30);
+    // The number of trigger attempts made against the BuildService before giving up on a job
+    static final int triggeringRetries = 5;
+
+    private final ExecutorService executors;
+    private final BuildService buildService;
+
+    @SuppressWarnings("unused") // Used by DI.
+    public PushingBuildSystem(Controller controller, JobControl jobControl, BuildService buildService) {
+        super(controller, constrainedTriggeringInterval, jobControl);
+
+        this.buildService = buildService;
+
+        // NOTE(review): nothing in this class shuts the pool down -- confirm whether the component lifecycle should.
+        executors = Executors.newFixedThreadPool(20);
+    }
+
+
+    /**
+     * Queues the job if it is capacity constrained or this maintainer is not active in the JobControl;
+     * otherwise hands it to the thread pool for triggering in the BuildService.
+     *
+     * Jobs for applications without a project ID are logged and dropped.
+     *
+     * @param first whether to put the application first in the queue, rather than last
+     * @throws IllegalStateException if the application is already in the queue for the given job type
+     */
+    @Override
+    public void addJob(ApplicationId application, DeploymentJobs.JobType jobType, boolean first) {
+        Optional<Long> projectId = projectId(application);
+        if ( ! projectId.isPresent()) {
+            log.warning("Not queuing " + jobType.jobName() + " for " + application.toShortString() +
+                        " because project ID is missing");
+            return;
+        }
+
+        // Store jobs that aren't triggered right away.
+        if (isCapacityConstrained(jobType) || ! jobControl().isActive(name())) {
+            try (Lock lock = curator().lockJobQueues()) {
+                Deque<ApplicationId> queue = curator().readJobQueue(jobType);
+                if (queue.contains(application))
+                    throw new IllegalStateException("Was ordered to trigger " + jobType + " for " + application + ", but this was already enqueued.");
+
+                if (first)
+                    queue.addFirst(application);
+                else
+                    queue.add(application);
+                curator().writeJobQueue(jobType, queue);
+            }
+        }
+        else // triggerWithRetries already runs on the thread pool, so no dedicated thread is needed here.
+            triggerWithRetries(new BuildService.BuildJob(projectId.get(), jobType.jobName()), triggeringRetries);
+    }
+
+    /** Returns the queued jobs, without removing them from their queues. */
+    @Override
+    public List<BuildService.BuildJob> jobs() {
+        return getJobs(false);
+    }
+
+    /** Removes and returns queued jobs, limiting how many capacity constrained jobs are taken per call. */
+    @Override
+    public List<BuildService.BuildJob> takeJobsToRun() {
+        return getJobs(true);
+    }
+
+
+    /** Removes every occurrence of the given application from the queues of all job types. */
+    @Override
+    public void removeJobs(ApplicationId application) {
+        try (Lock lock = curator().lockJobQueues()) {
+            for (DeploymentJobs.JobType jobType : DeploymentJobs.JobType.values()) {
+                Deque<ApplicationId> queue = curator().readJobQueue(jobType);
+                while (queue.remove(application)) {
+                    // keep removing until not found
+                }
+                curator().writeJobQueue(jobType, queue);
+            }
+        }
+    }
+
+    /**
+     * Submits a task to the thread pool which calls the BuildService until it returns true,
+     * at most the given number of times. Failures are logged, never propagated to the caller.
+     */
+    private void triggerWithRetries(BuildService.BuildJob buildJob, int retries) {
+        executors.submit(() -> {
+            try {
+                for (int i = 0; i < retries; i++)
+                    if (buildService.trigger(buildJob))
+                        return;
+
+                throw new RuntimeException("Exhausted all " + retries + " retries without success.");
+            }
+            catch (RuntimeException e) {
+                log.log(Level.WARNING, "Failed to trigger " + buildJob + "; this is likely a transient error.", e);
+            }
+        });
+    }
+
+    /**
+     * Returns the jobs in the queues of all job types, as an unmodifiable list.
+     *
+     * @param removeFromQueue whether returned jobs are also removed from the stored queues; if so, taking stops for
+     *                        a capacity constrained job type once the limit on such jobs, counted across all
+     *                        job types in this call, is reached
+     */
+    private List<BuildService.BuildJob> getJobs(boolean removeFromQueue) {
+        int capacityConstrainedJobsOffered = 0;
+        try (Lock lock = curator().lockJobQueues()) { // TODO: Why? Because multi-controller, perhaps?
+            List<BuildService.BuildJob> jobsToRun = new ArrayList<>();
+            for (DeploymentJobs.JobType jobType : DeploymentJobs.JobType.values()) {
+                Deque<ApplicationId> queue = curator().readJobQueue(jobType);
+                // Remove through the iterator, rather than polling the queue while iterating over it.
+                for (Iterator<ApplicationId> queued = queue.iterator(); queued.hasNext(); ) {
+                    ApplicationId application = queued.next();
+                    if (removeFromQueue)
+                        queued.remove();
+
+                    Optional<Long> projectId = projectId(application);
+                    if (projectId.isPresent())
+                        jobsToRun.add(new BuildService.BuildJob(projectId.get(), jobType.jobName()));
+                    else if (removeFromQueue)
+                        log.warning("Dropping " + jobType.jobName() + " for " + application.toShortString() +
+                                    " because project ID is missing");
+
+                    // Return a limited number of jobs at a time for capacity constrained zones
+                    if (removeFromQueue && isCapacityConstrained(jobType) &&
+                        ++capacityConstrainedJobsOffered >= maxCapacityConstrainedJobsToOffer) {
+                        break;
+                    }
+                }
+                if (removeFromQueue)
+                    curator().writeJobQueue(jobType, queue);
+            }
+            return Collections.unmodifiableList(jobsToRun);
+        }
+    }
+
+    /** Returns the project ID stored in the deployment jobs of the given application, if any. */
+    private Optional<Long> projectId(ApplicationId applicationId) {
+        return controller().applications().require(applicationId).deploymentJobs().projectId();
+    }
+
+    /** Returns whether the job type is one of the test jobs: system test or staging test. */
+    private static boolean isCapacityConstrained(DeploymentJobs.JobType jobType) {
+        return jobType == DeploymentJobs.JobType.stagingTest || jobType == DeploymentJobs.JobType.systemTest;
+    }
+
+    @Override
+    protected void maintain() {
+        // TODO: Store applications with triggering here, instead of in the DeploymentTrigger?
+    }
+
+}
+
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/Maintainer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/Maintainer.java index ebab2054d4f..5d55a0f71f0 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/Maintainer.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/Maintainer.java @@ -43,6 +43,8 @@ public abstract class Maintainer extends AbstractComponent implements Runnable { protected CuratorDb curator() { return jobControl.curator(); } + protected JobControl jobControl() { return jobControl; } + @Override public void run() { try { diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/CuratorDb.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/CuratorDb.java index a3bb191fc38..cd7d630b4f2 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/CuratorDb.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/CuratorDb.java @@ -123,33 +123,29 @@ public class CuratorDb { } public void writeInactiveJobs(Set inactiveJobs) { - NestedTransaction transaction = new NestedTransaction(); curator.set(inactiveJobsPath(), stringSetSerializer.toJson(inactiveJobs)); - transaction.commit(); } public Deque readJobQueue(DeploymentJobs.JobType jobType) { try { Optional data = curator.getData(jobQueuePath(jobType)); - if (! data.isPresent() || data.get().length == 0) return new ArrayDeque<>(); // job queue has never been written + if ( ! 
data.isPresent() || data.get().length == 0) return new ArrayDeque<>(); // job queue has never been written return jobQueueSerializer.fromJson(data.get()); } catch (RuntimeException e) { log.log(Level.WARNING, "Error reading job queue, deleting inactive state"); - writeInactiveJobs(Collections.emptySet()); + writeJobQueue(jobType, Collections::emptyIterator); return new ArrayDeque<>(); } } - public void writeJobQueue(DeploymentJobs.JobType jobType, Deque queue) { - NestedTransaction transaction = new NestedTransaction(); + public void writeJobQueue(DeploymentJobs.JobType jobType, Iterable queue) { curator.set(jobQueuePath(jobType), jobQueueSerializer.toJson(queue)); - transaction.commit(); } public double readUpgradesPerMinute() { Optional n = curator.getData(upgradesPerMinutePath()); - if (!n.isPresent() || n.get().length == 0) { + if ( ! n.isPresent() || n.get().length == 0) { return 0.5; // Default if value has never been written } return ByteBuffer.wrap(n.get()).getDouble(); @@ -159,39 +155,33 @@ public class CuratorDb { if (n < 0) { throw new IllegalArgumentException("Upgrades per minute must be >= 0"); } - NestedTransaction transaction = new NestedTransaction(); curator.set(upgradesPerMinutePath(), ByteBuffer.allocate(Double.BYTES).putDouble(n).array()); - transaction.commit(); } public boolean readIgnoreConfidence() { Optional value = curator.getData(ignoreConfidencePath()); - if (! value.isPresent() || value.get().length == 0) { + if ( ! value.isPresent() || value.get().length == 0) { return false; // Default if value has never been written } return ByteBuffer.wrap(value.get()).getInt() == 1; } public void writeIgnoreConfidence(boolean value) { - NestedTransaction transaction = new NestedTransaction(); curator.set(ignoreConfidencePath(), ByteBuffer.allocate(Integer.BYTES).putInt(value ? 
1 : 0).array()); - transaction.commit(); } public void writeVersionStatus(VersionStatus status) { VersionStatusSerializer serializer = new VersionStatusSerializer(); - NestedTransaction transaction = new NestedTransaction(); try { curator.set(versionStatusPath(), SlimeUtils.toJsonBytes(serializer.toSlime(status))); } catch (IOException e) { throw new UncheckedIOException("Failed to serialize version status", e); } - transaction.commit(); } public VersionStatus readVersionStatus() { Optional data = curator.getData(versionStatusPath()); - if (!data.isPresent() || data.get().length == 0) { + if ( ! data.isPresent() || data.get().length == 0) { return VersionStatus.empty(); // Default if status has never been written } VersionStatusSerializer serializer = new VersionStatusSerializer(); diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/JobQueueSerializer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/JobQueueSerializer.java index 5017624f286..f6d6406a820 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/JobQueueSerializer.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/JobQueueSerializer.java @@ -12,18 +12,19 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.Deque; import java.util.HashSet; +import java.util.List; import java.util.Set; /** * Serialization of a queue of ApplicationIds to/from Json bytes using Slime. * - * The set is serialized as an array of string. + * The queue is serialized as an array of strings. 
* * @author bratseth */ public class JobQueueSerializer { - public byte[] toJson(Deque queue) { + public byte[] toJson(Iterable queue) { try { Slime slime = new Slime(); Cursor array = slime.setArray(); diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/MockBuildService.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/MockBuildService.java index 1b1a4feaa4e..0afbaebc82c 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/MockBuildService.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/MockBuildService.java @@ -16,8 +16,8 @@ import java.util.Map; import java.util.Optional; import java.util.function.Supplier; -import static com.yahoo.vespa.hosted.controller.deployment.MockBuildService.JobStatus.QUEUED; -import static com.yahoo.vespa.hosted.controller.deployment.MockBuildService.JobStatus.RUNNING; +import static com.yahoo.vespa.hosted.controller.deployment.MockBuildService.JobRunStatus.QUEUED; +import static com.yahoo.vespa.hosted.controller.deployment.MockBuildService.JobRunStatus.RUNNING; /** * Simulates polling of build jobs from the controller and triggering and execution of @@ -30,7 +30,7 @@ public class MockBuildService implements BuildService { private final ControllerTester tester; private final MockTimeline timeline; private final Map jobs; - private final Map jobStatuses; + private final Map jobStatuses; private Version version; public MockBuildService(ControllerTester tester, MockTimeline timeline) { @@ -72,12 +72,6 @@ public class MockBuildService implements BuildService { System.err.println(timeline.now() + ": Triggered " + key + "; it will finish at " + timeline.now().plus(job.duration)); } - public void incrementVersion() { - version = new Version(version.getMajor(), version.getMinor() + 1); - } - - public Version version() { return version; } - /** Add @job to the set of @Job objects we have information 
about. */ private void add(Job job) { jobs.put(job.buildJob().toString(), job); @@ -88,9 +82,10 @@ public class MockBuildService implements BuildService { project.jobs.values().forEach(this::add); } + // TODO: Replace with something that relies on ApplicationPackage. /** Make a @Project with the given settings, modify it if desired, and @add() it its jobs to the pool of known ones. */ - public Project project(ApplicationId applicationId, Long projectId, Duration duration, Supplier success) { - return new Project(applicationId, projectId, duration, success); + public Project project(ApplicationId applicationId, Long projectId, Duration duration, Supplier error) { + return new Project(applicationId, projectId, duration, error); } @@ -99,31 +94,27 @@ public class MockBuildService implements BuildService { private final ApplicationId applicationId; private final Long projectId; - private final Duration duration; - private final Supplier success; private final Map jobs; - private Project(ApplicationId applicationId, Long projectId, Duration duration, Supplier success) { + private Project(ApplicationId applicationId, Long projectId, Duration duration, Supplier error) { this.applicationId = applicationId; this.projectId = projectId; - this.duration = duration; - this.success = success; jobs = new EnumMap<>(JobType.class); for (JobType jobType : JobType.values()) - jobs.put(jobType, new Job(applicationId, projectId, jobType, duration, success)); + jobs.put(jobType, new Job(applicationId, projectId, jobType, duration, error)); } /** Set @duration for @jobType of this @Project. */ public Project set(Duration duration, JobType jobType) { - jobs.compute(jobType, (type, job) -> new Job(applicationId, projectId, jobType, duration, job.success)); + jobs.compute(jobType, (__, job) -> new Job(applicationId, projectId, jobType, duration, job.error)); return this; } /** Set @success for @jobType of this @Project. 
*/ - public Project set(Supplier success, JobType jobType) { - jobs.compute(jobType, (type, job) -> new Job(applicationId, projectId, jobType, job.duration, success)); + public Project set(Supplier error, JobType jobType) { + jobs.compute(jobType, (__, job) -> new Job(applicationId, projectId, jobType, job.duration, error)); return this; } @@ -142,35 +133,35 @@ public class MockBuildService implements BuildService { private final Long projectId; private final JobType jobType; private final Duration duration; - private final Supplier success; + private final Supplier error; + + private long buildNumber = 0; - private Job(ApplicationId applicationId, Long projectId, JobType jobType, Duration duration, Supplier success) { + private Job(ApplicationId applicationId, Long projectId, JobType jobType, Duration duration, Supplier error) { this.applicationId = applicationId; this.projectId = projectId; this.jobType = jobType; this.duration = duration; - this.success = success; + this.error = error; } private void outcome() { - Boolean success = this.success.get(); - System.err.println(timeline.now() + ": Job " + projectId + ":" + jobType + " reports " + success); - if (success != null) - tester.controller().applications().notifyJobCompletion( - new DeploymentJobs.JobReport( - applicationId, - jobType, - projectId, - 42, - Optional.ofNullable(success ? null : JobError.unknown) - )); + if (error == null) return; // null JobError supplier means the job doesn't report back, i.e., is aborted. + + JobError jobError = this.error.get(); + System.err.println(timeline.now() + ": Job " + projectId + ":" + jobType + " reports " + (jobError == null ? 
" success " : jobError)); + tester.controller().applications().notifyJobCompletion(new DeploymentJobs.JobReport(applicationId, + jobType, + projectId, + ++buildNumber, + Optional.ofNullable(jobError))); } private BuildJob buildJob() { return new BuildJob(projectId, jobType.jobName()); } } - enum JobStatus { + enum JobRunStatus { QUEUED, RUNNING } -- cgit v1.2.3