diff options
53 files changed, 1084 insertions, 647 deletions
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/Application.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/Application.java index ee8e1b19e58..364e91f1828 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/Application.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/Application.java @@ -8,24 +8,18 @@ import com.yahoo.config.application.api.ValidationOverrides; import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.Environment; import com.yahoo.config.provision.Zone; -import com.yahoo.vespa.hosted.controller.api.integration.organization.IssueId; -import com.yahoo.vespa.hosted.controller.application.ApplicationRevision; import com.yahoo.vespa.hosted.controller.application.Change; import com.yahoo.vespa.hosted.controller.application.Deployment; import com.yahoo.vespa.hosted.controller.application.DeploymentJobs; -import com.yahoo.vespa.hosted.controller.application.DeploymentJobs.JobReport; -import com.yahoo.vespa.hosted.controller.application.DeploymentJobs.JobType; import java.time.Instant; import java.util.Collections; import java.util.Comparator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.function.Function; -import java.util.stream.Collector; import java.util.stream.Collectors; /** @@ -61,9 +55,9 @@ public class Application { deploymentJobs, deploying, outstandingChange); } - private Application(ApplicationId id, DeploymentSpec deploymentSpec, ValidationOverrides validationOverrides, - Map<Zone, Deployment> deployments, - DeploymentJobs deploymentJobs, Optional<Change> deploying, boolean outstandingChange) { + Application(ApplicationId id, DeploymentSpec deploymentSpec, ValidationOverrides validationOverrides, + Map<Zone, Deployment> deployments, DeploymentJobs deploymentJobs, Optional<Change> deploying, + boolean outstandingChange) { Objects.requireNonNull(id, "id cannot be null"); Objects.requireNonNull(deploymentSpec, "deploymentSpec cannot be null"); Objects.requireNonNull(validationOverrides, "validationOverrides cannot be null"); @@ -137,87 +131,8 @@ public class Application { return deployedVersion().orElse(controller.systemVersion()); } - public Application withProjectId(long projectId) { - return new Application(id, deploymentSpec, validationOverrides, deployments, deploymentJobs.withProjectId(projectId), deploying, outstandingChange); - } - - public Application with(IssueId issueId) { - return new Application(id, deploymentSpec, validationOverrides, deployments, deploymentJobs.with(issueId), deploying, outstandingChange); - } - - public Application withJobCompletion(JobReport report, Instant notificationTime, Controller controller) { - return new Application(id, - deploymentSpec, - validationOverrides, - deployments, - deploymentJobs.withCompletion(report, notificationTime, controller), - deploying, - outstandingChange); - } - - public Application withJobTriggering(long runId, JobType type, Optional<Change> change, String reason, - Instant triggerTime, Controller controller) { - return new Application(id, - deploymentSpec, - validationOverrides, - deployments, - deploymentJobs.withTriggering(type, - change, - runId, - determineTriggerVersion(type, controller), - determineTriggerRevision(type, controller), - reason, - triggerTime), - deploying, - outstandingChange); - } - - public Application with(Deployment deployment) { - Map<Zone, Deployment> deployments = new LinkedHashMap<>(this.deployments); - deployments.put(deployment.zone(), deployment); - return new Application(id, deploymentSpec, validationOverrides, deployments, deploymentJobs, deploying, outstandingChange); - } - - public Application with(DeploymentJobs deploymentJobs) { - return new Application(id, deploymentSpec, validationOverrides, deployments, deploymentJobs, deploying, outstandingChange); - } - - public Application withoutDeploymentIn(Zone zone) { - Map<Zone, Deployment> deployments = new LinkedHashMap<>(this.deployments); - deployments.remove(zone); - return new Application(id, deploymentSpec, validationOverrides, deployments, deploymentJobs, deploying, outstandingChange); - } - - public Application withoutDeploymentJob(JobType jobType) { - DeploymentJobs deploymentJobs = this.deploymentJobs.without(jobType); - return new Application(id, deploymentSpec, validationOverrides, deployments, deploymentJobs, deploying, outstandingChange); - } - - public Application with(DeploymentSpec deploymentSpec) { - return new Application(id, deploymentSpec, validationOverrides, deployments, deploymentJobs, deploying, outstandingChange); - } - - public Application with(ValidationOverrides validationOverrides) { - return new Application(id, deploymentSpec, validationOverrides, deployments, deploymentJobs, deploying, outstandingChange); - } - - public Application withDeploying(Optional<Change> deploying) { - return new Application(id, deploymentSpec, validationOverrides, deployments, deploymentJobs, deploying, outstandingChange); - } - - public Application withOutstandingChange(boolean outstandingChange) { - return new Application(id, deploymentSpec, validationOverrides, deployments, deploymentJobs, deploying, outstandingChange); - } - - private Version determineTriggerVersion(JobType jobType, Controller controller) { - Optional<Zone> zone = jobType.zone(controller.system()); - if ( ! zone.isPresent()) // a sloppy test TODO: Fix - return controller.systemVersion(); - return currentDeployVersion(controller, zone.get()); - } - /** Returns the version a deployment to this zone should use for this application */ - Version currentDeployVersion(Controller controller, Zone zone) { + public Version currentDeployVersion(Controller controller, Zone zone) { if ( ! deploying().isPresent()) return currentVersion(controller, zone); else if ( deploying().get() instanceof Change.ApplicationChange) @@ -227,7 +142,7 @@ public class Application { } /** Returns the current version this application has, or if none; should use, in the given zone */ - Version currentVersion(Controller controller, Zone zone) { + public Version currentVersion(Controller controller, Zone zone) { Deployment currentDeployment = deployments().get(zone); if (currentDeployment != null) // Already deployed in this zone: Use that version return currentDeployment.version(); @@ -235,34 +150,6 @@ public class Application { return deployedVersion().orElse(controller.systemVersion()); } - private Optional<ApplicationRevision> determineTriggerRevision(JobType jobType, Controller controller) { - Optional<Zone> zone = jobType.zone(controller.system()); - if ( ! zone.isPresent()) // a sloppy test TODO: Fix - return Optional.empty(); - return currentDeployRevision(jobType.zone(controller.system()).get()); - } - - /** Returns the version a deployment to this zone should use for this application, or empty if we don't know */ - Optional<ApplicationRevision> currentDeployRevision(Zone zone) { - if ( ! deploying().isPresent()) - return currentRevision(zone); - else if ( deploying().get() instanceof Change.VersionChange) - return currentRevision(zone); - else - return ((Change.ApplicationChange)deploying().get()).revision(); - } - - /** - * Returns the current revision this application has, or if none; should use assuming no change, - * in the given zone. Empty if not known - */ - Optional<ApplicationRevision> currentRevision(Zone zone) { - Deployment currentDeployment = deployments().get(zone); - if (currentDeployment != null) // Already deployed in this zone: Use that revision - return Optional.of(currentDeployment.revision()); - return Optional.empty(); - } - @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java index 3339e59d581..12ebfa625ac 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java @@ -22,7 +22,6 @@ import com.yahoo.vespa.hosted.controller.api.identifiers.DeploymentId; import com.yahoo.vespa.hosted.controller.api.identifiers.Hostname; import com.yahoo.vespa.hosted.controller.api.identifiers.RevisionId; import com.yahoo.vespa.hosted.controller.api.identifiers.TenantId; -import com.yahoo.vespa.hosted.controller.api.integration.organization.IssueId; import com.yahoo.vespa.hosted.controller.api.integration.configserver.ConfigServerClient; import com.yahoo.vespa.hosted.controller.api.integration.configserver.Log; import com.yahoo.vespa.hosted.controller.api.integration.configserver.NoInstanceException; @@ -112,9 +111,10 @@ public class ApplicationController { for (Application application : db.listApplications()) { try (Lock lock = lock(application.id())) { - Optional<Application> optionalApplication = db.getApplication(application.id()); // re-get inside lock - if ( ! optionalApplication.isPresent()) continue; // was removed since listing; ok - store(optionalApplication.get(), lock); // re-write all applications to update storage format + Optional<LockedApplication> lockedApplication = db.getApplication(application.id()) + .map(app -> new LockedApplication(app, lock)); + if ( ! lockedApplication.isPresent()) continue; // was removed since listing; ok + store(lockedApplication.get()); // re-write all applications to update storage format } } } @@ -124,6 +124,12 @@ public class ApplicationController { return db.getApplication(id); } + + /** Returns an locked application with the given id that be updated and stored */ + public Optional<LockedApplication> get(ApplicationId id, Lock lock) { + return db.getApplication(id).map(application -> new LockedApplication(application, lock)); + } + /** * Returns the application with the given id * @@ -133,6 +139,16 @@ public class ApplicationController { return get(id).orElseThrow(() -> new IllegalArgumentException(id + " not found")); } + /** + * Returns a locked application that be updated and stored + * + * @throws IllegalArgumentException if it does not exist + * + */ + public LockedApplication require(ApplicationId id, Lock lock) { + return get(id, lock).orElseThrow(() -> new IllegalArgumentException(id + " not found")); + } + /** Returns a snapshot of all applications */ public List<Application> asList() { return db.listApplications(); @@ -260,8 +276,8 @@ public class ApplicationController { zmsClient.addApplication(tenant.get().getAthensDomain().get(), new com.yahoo.vespa.hosted.controller.api.identifiers.ApplicationId(id.application().value())); } - Application application = new Application(id); - store(application, lock); + LockedApplication application = new LockedApplication(new Application(id), lock); + store(application); log.info("Created " + application); return application; } @@ -273,7 +289,9 @@ public class ApplicationController { ApplicationPackage applicationPackage, DeployOptions options) { try (Lock lock = lock(applicationId)) { // Determine what we are doing - Application application = get(applicationId).orElse(new Application(applicationId)); + LockedApplication application = get(applicationId, lock).orElse(new LockedApplication( + new Application(applicationId), lock) + ); Version version; if (options.deployCurrentVersion) @@ -311,12 +329,12 @@ public class ApplicationController { // Delete zones not listed in DeploymentSpec, if allowed // We do this at deployment time to be able to return a validation failure message when necessary - application = deleteRemovedDeployments(application, lock); + application = deleteRemovedDeployments(application); // Clean up deployment jobs that are no longer referenced by deployment spec application = deleteUnreferencedDeploymentJobs(application); - store(application, lock); // store missing information even if we fail deployment below + store(application); // store missing information even if we fail deployment below } // Ensure that the deploying change is tested @@ -341,7 +359,7 @@ public class ApplicationController { previousDeployment.clusterUtils(), previousDeployment.clusterInfo(), previousDeployment.metrics()); application = application.with(newDeployment); - store(application, lock); + store(application); return new ActivateResult(new RevisionId(applicationPackage.hash()), preparedApplication.prepareResponse()); } @@ -358,7 +376,7 @@ public class ApplicationController { return new ActivateResult(new RevisionId(applicationPackage.hash()), prepareResponse); } - private Application deleteRemovedDeployments(Application application, Lock lock) { + private LockedApplication deleteRemovedDeployments(LockedApplication application) { List<Deployment> deploymentsToRemove = application.productionDeployments().values().stream() .filter(deployment -> ! application.deploymentSpec().includes(deployment.zone().environment(), Optional.of(deployment.zone().region()))) @@ -376,13 +394,13 @@ public class ApplicationController { (deploymentsToRemove.size() > 1 ? "these zones" : "this zone") + " in deployment.xml"); - Application applicationWithRemoval = application; + LockedApplication applicationWithRemoval = application; for (Deployment deployment : deploymentsToRemove) - applicationWithRemoval = deactivate(applicationWithRemoval, deployment.zone(), lock); + applicationWithRemoval = deactivate(applicationWithRemoval, deployment.zone()); return applicationWithRemoval; } - private Application deleteUnreferencedDeploymentJobs(Application application) { + private LockedApplication deleteUnreferencedDeploymentJobs(LockedApplication application) { for (DeploymentJobs.JobType job : application.deploymentJobs().jobStatus().keySet()) { Optional<Zone> zone = job.zone(controller.system()); @@ -510,20 +528,12 @@ public class ApplicationController { } } - public void setIssueId(ApplicationId id, IssueId issueId) { - try (Lock lock = lock(id)) { - get(id).ifPresent(application -> store(application.with(issueId), lock)); - } - } - /** * Replace any previous version of this application by this instance * - * @param application the application version to store - * @param lock the lock held on this application since before modification started + * @param application a locked application to store */ - @SuppressWarnings("unused") // lock is part of the signature to remind people to acquire it, not needed internally - public void store(Application application, Lock lock) { + public void store(LockedApplication application) { db.store(application); } @@ -555,26 +565,25 @@ public class ApplicationController { } /** Deactivate application in the given zone */ - public Application deactivate(Application application, Zone zone) { - return deactivate(application, zone, Optional.empty(), false); + public void deactivate(Application application, Zone zone) { + deactivate(application, zone, Optional.empty(), false); } /** Deactivate a known deployment of the given application */ - public Application deactivate(Application application, Deployment deployment, boolean requireThatDeploymentHasExpired) { - return deactivate(application, deployment.zone(), Optional.of(deployment), requireThatDeploymentHasExpired); + public void deactivate(Application application, Deployment deployment, boolean requireThatDeploymentHasExpired) { + deactivate(application, deployment.zone(), Optional.of(deployment), requireThatDeploymentHasExpired); } - private Application deactivate(Application application, Zone zone, Optional<Deployment> deployment, - boolean requireThatDeploymentHasExpired) { + private void deactivate(Application application, Zone zone, Optional<Deployment> deployment, + boolean requireThatDeploymentHasExpired) { try (Lock lock = lock(application.id())) { - application = controller.applications().require(application.id()); // re-get with lock + LockedApplication lockedApplication = controller.applications().require(application.id(), lock); if (deployment.isPresent() && requireThatDeploymentHasExpired && ! DeploymentExpirer.hasExpired(controller.zoneRegistry(), deployment.get(), clock.instant())) { - return application; + return; } - application = deactivate(application, zone, lock); - store(application, lock); - return application; + lockedApplication = deactivate(lockedApplication, zone); + store(lockedApplication); } } @@ -583,7 +592,7 @@ public class ApplicationController { * * @return the application with the deployment in the given zone removed */ - private Application deactivate(Application application, Zone zone, Lock lock) { + private LockedApplication deactivate(LockedApplication application, Zone zone) { try { configserverClient.deactivate(new DeploymentId(application.id(), zone)); } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/LockedApplication.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/LockedApplication.java new file mode 100644 index 00000000000..b0424282ace --- /dev/null +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/LockedApplication.java @@ -0,0 +1,168 @@ +package com.yahoo.vespa.hosted.controller; + +import com.yahoo.component.Version; +import com.yahoo.config.application.api.DeploymentSpec; +import com.yahoo.config.application.api.ValidationOverrides; +import com.yahoo.config.provision.Zone; +import com.yahoo.vespa.curator.Lock; +import com.yahoo.vespa.hosted.controller.api.integration.organization.IssueId; +import com.yahoo.vespa.hosted.controller.application.ApplicationRevision; +import com.yahoo.vespa.hosted.controller.application.Change; +import com.yahoo.vespa.hosted.controller.application.Deployment; +import com.yahoo.vespa.hosted.controller.application.DeploymentJobs; + +import java.time.Instant; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** + * A combination of an application instance and a lock for that application. Provides methods for updating application + * fields. + * + * @author mpolden + */ +public class LockedApplication extends Application { + + private final Lock lock; + + /** + * LockedApplication should be acquired through ApplicationController and never constructed directly + * + * @param application Application instance for which lock has been acquired + * @param lock Unused, but must be held when constructing this + */ + LockedApplication(Application application, Lock lock) { + super(application.id(), application.deploymentSpec(), application.validationOverrides(), + application.deployments(), application.deploymentJobs(), application.deploying(), + application.hasOutstandingChange()); + this.lock = Objects.requireNonNull(lock, "lock cannot be null"); + } + + public LockedApplication withProjectId(long projectId) { + return new LockedApplication(new Application(id(), deploymentSpec(), validationOverrides(), deployments(), + deploymentJobs().withProjectId(projectId), deploying(), + hasOutstandingChange()), lock); + } + + public LockedApplication with(IssueId issueId) { + return new LockedApplication(new Application(id(), deploymentSpec(), validationOverrides(), deployments(), + deploymentJobs().with(issueId), deploying(), + hasOutstandingChange()), lock); + } + + public LockedApplication withJobCompletion(DeploymentJobs.JobReport report, Instant notificationTime, + Controller controller) { + return new LockedApplication(new Application(id(), deploymentSpec(), validationOverrides(), + deployments(), + deploymentJobs().withCompletion(report, notificationTime, + controller), + deploying(), hasOutstandingChange()), lock); + } + + public LockedApplication withJobTriggering(long runId, DeploymentJobs.JobType type, Optional<Change> change, + String reason, Instant triggerTime, Controller controller) { + return new LockedApplication(new Application(id(), deploymentSpec(), validationOverrides(), deployments(), + deploymentJobs().withTriggering(type, + change, + runId, + determineTriggerVersion(type, controller), + determineTriggerRevision(type, controller), + reason, + triggerTime), + deploying(), hasOutstandingChange()), lock); + } + + public LockedApplication with(Deployment deployment) { + Map<Zone, Deployment> deployments = new LinkedHashMap<>(deployments()); + deployments.put(deployment.zone(), deployment); + return new LockedApplication(new Application(id(), deploymentSpec(), validationOverrides(), + deployments, deploymentJobs(), deploying(), + hasOutstandingChange()), lock); + } + + public LockedApplication with(DeploymentJobs deploymentJobs) { + return new LockedApplication(new Application(id(), deploymentSpec(), validationOverrides(), + deployments(), deploymentJobs, deploying(), + hasOutstandingChange()), lock); + } + + public LockedApplication withoutDeploymentIn(Zone zone) { + Map<Zone, Deployment> deployments = new LinkedHashMap<>(deployments()); + deployments.remove(zone); + return new LockedApplication(new Application(id(), deploymentSpec(), validationOverrides(), + deployments, deploymentJobs(), deploying(), + hasOutstandingChange()), lock); + } + + public LockedApplication withoutDeploymentJob(DeploymentJobs.JobType jobType) { + DeploymentJobs deploymentJobs = deploymentJobs().without(jobType); + return new LockedApplication(new Application(id(), deploymentSpec(), validationOverrides(), + deployments(), deploymentJobs, deploying(), + hasOutstandingChange()), lock); + } + + public LockedApplication with(DeploymentSpec deploymentSpec) { + return new LockedApplication(new Application(id(), deploymentSpec, validationOverrides(), + deployments(), deploymentJobs(), deploying(), + hasOutstandingChange()), lock); + } + + public LockedApplication with(ValidationOverrides validationOverrides) { + return new LockedApplication(new Application(id(), deploymentSpec(), validationOverrides, + deployments(), deploymentJobs(), deploying(), + hasOutstandingChange()), lock); + } + + public LockedApplication withDeploying(Optional<Change> deploying) { + return new LockedApplication(new Application(id(), deploymentSpec(), validationOverrides(), + deployments(), deploymentJobs(), deploying, + hasOutstandingChange()), lock); + } + + public LockedApplication withOutstandingChange(boolean outstandingChange) { + return new LockedApplication(new Application(id(), deploymentSpec(), + validationOverrides(), deployments(), + deploymentJobs(), deploying(), outstandingChange), lock); + } + + private Version determineTriggerVersion(DeploymentJobs.JobType jobType, Controller controller) { + Optional<Zone> zone = jobType.zone(controller.system()); + if ( ! zone.isPresent()) // a sloppy test TODO: Fix + return controller.systemVersion(); + return currentDeployVersion(controller, zone.get()); + } + + private Optional<ApplicationRevision> determineTriggerRevision(DeploymentJobs.JobType jobType, + Controller controller) { + Optional<Zone> zone = jobType.zone(controller.system()); + if ( ! zone.isPresent()) // a sloppy test TODO: Fix + return Optional.empty(); + return currentDeployRevision(jobType.zone(controller.system()).get()); + } + + /** Returns the version a deployment to this zone should use for this application, or empty if we don't know */ + private Optional<ApplicationRevision> currentDeployRevision(Zone zone) { + if (!deploying().isPresent()) { + return currentRevision(zone); + } else if (deploying().get() instanceof Change.VersionChange) { + return currentRevision(zone); + } else { + return ((Change.ApplicationChange) deploying().get()).revision(); + } + } + + /** + * Returns the current revision this application has, or if none; should use assuming no change, + * in the given zone. Empty if not known + */ + private Optional<ApplicationRevision> currentRevision(Zone zone) { + Deployment currentDeployment = deployments().get(zone); + if (currentDeployment != null) { // Already deployed in this zone: Use that revision + return Optional.of(currentDeployment.revision()); + } + return Optional.empty(); + } + +} diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentOrder.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentOrder.java index f9a3ba0e8d3..bb84c9e17d4 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentOrder.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentOrder.java @@ -6,6 +6,7 @@ import com.yahoo.config.provision.Environment; import com.yahoo.config.provision.Zone; import com.yahoo.vespa.hosted.controller.Application; import com.yahoo.vespa.hosted.controller.Controller; +import com.yahoo.vespa.hosted.controller.LockedApplication; import com.yahoo.vespa.hosted.controller.application.Change; import com.yahoo.vespa.hosted.controller.application.Deployment; import com.yahoo.vespa.hosted.controller.application.DeploymentJobs; @@ -49,7 +50,7 @@ public class DeploymentOrder { /** Returns a list of jobs to trigger after the given job */ // TODO: This does too much - should just tell us the order, as advertised - public List<JobType> nextAfter(JobType job, Application application) { + public List<JobType> nextAfter(JobType job, LockedApplication application) { if ( ! application.deploying().isPresent()) { // Change was cancelled return Collections.emptyList(); } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java index 11074e1a68e..131d89fd650 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java @@ -3,13 +3,13 @@ package com.yahoo.vespa.hosted.controller.deployment; import com.yahoo.config.application.api.DeploymentSpec; import com.yahoo.config.provision.ApplicationId; -import com.yahoo.config.provision.Environment; import com.yahoo.config.provision.SystemName; import com.yahoo.config.provision.Zone; import com.yahoo.vespa.curator.Lock; import com.yahoo.vespa.hosted.controller.Application; import com.yahoo.vespa.hosted.controller.ApplicationController; import com.yahoo.vespa.hosted.controller.Controller; +import com.yahoo.vespa.hosted.controller.LockedApplication; import com.yahoo.vespa.hosted.controller.application.ApplicationList; import com.yahoo.vespa.hosted.controller.application.Change; import com.yahoo.vespa.hosted.controller.application.DeploymentJobs; @@ -74,7 +74,7 @@ public class DeploymentTrigger { */ public void triggerFromCompletion(JobReport report) { try (Lock lock = applications().lock(report.applicationId())) { - Application application = applications().require(report.applicationId()); + LockedApplication application = applications().require(report.applicationId(), lock); application = application.withJobCompletion(report, clock.instant(), controller); // Handle successful starting and ending @@ -87,7 +87,7 @@ public class DeploymentTrigger { application = application.withDeploying(Optional.of(Change.ApplicationChange.unknown())); } else { // postpone - applications().store(application.withOutstandingChange(true), lock); + applications().store(application.withOutstandingChange(true)); return; } } @@ -101,17 +101,17 @@ public class DeploymentTrigger { if (report.success()) application = trigger(order.nextAfter(report.jobType(), application), application, String.format("%s completed successfully in build %d", - report.jobType(), report.buildNumber()), lock); + report.jobType(), report.buildNumber())); else if (isCapacityConstrained(report.jobType()) && shouldRetryOnOutOfCapacity(application, report.jobType())) application = trigger(report.jobType(), application, true, String.format("Retrying due to out of capacity in build %d", - report.buildNumber()), lock); + report.buildNumber())); else if (shouldRetryNow(application)) application = trigger(report.jobType(), application, false, String.format("Retrying as build %d just started failing", - report.buildNumber()), lock); + report.buildNumber())); - applications().store(application, lock); + applications().store(application); } } @@ -123,14 +123,14 @@ public class DeploymentTrigger { applications = applications.notPullRequest(); for (Application application : applications.asList()) { try (Lock lock = applications().lock(application.id())) { - application = controller.applications().get(application.id()).orElse(null); // re-get with lock - if (application == null) continue; // application removed - triggerReadyJobs(application, lock); + Optional<LockedApplication> lockedApplication = controller.applications().get(application.id(), lock); + if (!lockedApplication.isPresent()) continue; // application removed + triggerReadyJobs(lockedApplication.get()); } } } - private void triggerReadyJobs(Application application, Lock lock) { + private void triggerReadyJobs(LockedApplication application) { if ( ! application.deploying().isPresent()) return; for (JobType jobType : order.jobsFrom(application.deploymentSpec())) { JobStatus jobStatus = application.deploymentJobs().jobStatus().get(jobType); @@ -145,8 +145,8 @@ public class DeploymentTrigger { nextToTrigger.add(nextJobType); } // Trigger them in parallel - application = trigger(nextToTrigger, application, "Triggering previously blocked jobs", lock); - controller.applications().store(application, lock); + application = trigger(nextToTrigger, application, "Triggering previously blocked jobs"); + controller.applications().store(application); } } @@ -180,7 +180,7 @@ public class DeploymentTrigger { */ public void triggerFailing(ApplicationId applicationId) { try (Lock lock = applications().lock(applicationId)) { - Application application = applications().require(applicationId); + LockedApplication application = applications().require(applicationId, lock); if ( ! application.deploying().isPresent()) return; // No ongoing change, no need to retry // Retry first failing job @@ -188,8 +188,8 @@ public class DeploymentTrigger { JobStatus jobStatus = application.deploymentJobs().jobStatus().get(jobType); if (isFailing(application.deploying().get(), jobStatus)) { if (shouldRetryNow(jobStatus)) { - application = trigger(jobType, application, false, "Retrying failing job", lock); - applications().store(application, lock); + application = trigger(jobType, application, false, "Retrying failing job"); + applications().store(application); } break; } @@ -198,9 +198,8 @@ public class DeploymentTrigger { // Retry dead job Optional<JobStatus> firstDeadJob = firstDeadJob(application.deploymentJobs()); if (firstDeadJob.isPresent()) { - application = trigger(firstDeadJob.get().type(), application, false, "Retrying dead job", - lock); - applications().store(application, lock); + application = trigger(firstDeadJob.get().type(), application, false, "Retrying dead job"); + applications().store(application); } } } @@ -224,10 +223,10 @@ public class DeploymentTrigger { // Trigger next try (Lock lock = applications().lock(application.id())) { - application = applications().require(application.id()); - application = trigger(order.nextAfter(lastSuccessfulJob.get().type(), application), application, - "Resuming delayed deployment", lock); - applications().store(application, lock); + LockedApplication lockedApplication = applications().require(application.id(), lock); + lockedApplication = trigger(order.nextAfter(lastSuccessfulJob.get().type(), lockedApplication), + lockedApplication, "Resuming delayed deployment"); + applications().store(lockedApplication); } } } @@ -240,15 +239,15 @@ public class DeploymentTrigger { */ public void triggerChange(ApplicationId applicationId, Change change) { try (Lock lock = applications().lock(applicationId)) { - Application application = applications().require(applicationId); + LockedApplication application = applications().require(applicationId, lock); if (application.deploying().isPresent() && ! application.deploymentJobs().hasFailures()) throw new IllegalArgumentException("Could not start " + change + " on " + application + ": " + application.deploying().get() + " is already in progress"); application = application.withDeploying(Optional.of(change)); if (change instanceof Change.ApplicationChange) application = application.withOutstandingChange(false); - application = trigger(JobType.systemTest, application, false, "Deploying change", lock); - applications().store(application, lock); + application = trigger(JobType.systemTest, application, false, "Deploying change"); + applications().store(application); } } @@ -259,10 +258,10 @@ public class DeploymentTrigger { */ public void cancelChange(ApplicationId applicationId) { try (Lock lock = applications().lock(applicationId)) { - Application application = applications().require(applicationId); + LockedApplication application = applications().require(applicationId, lock); buildSystem.removeJobs(application.id()); application = application.withDeploying(Optional.empty()); - applications().store(application, lock); + applications().store(application); } } @@ -344,15 +343,15 @@ public class DeploymentTrigger { * @param cause describes why the job is triggered * @return the application in the triggered state, which *must* be stored by the caller */ - private Application trigger(JobType jobType, Application application, boolean first, String cause, Lock lock) { + private LockedApplication trigger(JobType jobType, LockedApplication application, boolean first, String cause) { if (isRunningProductionJob(application)) return application; - return triggerAllowParallel(jobType, application, first, false, cause, lock); + return triggerAllowParallel(jobType, application, first, false, cause); } - private Application trigger(List<JobType> jobs, Application application, String cause, Lock lock) { + private LockedApplication trigger(List<JobType> jobs, LockedApplication application, String cause) { if (isRunningProductionJob(application)) return application; for (JobType job : jobs) - application = triggerAllowParallel(job, application, false, false, cause, lock); + application = triggerAllowParallel(job, application, false, false, cause); return application; } @@ -366,8 +365,8 @@ public class DeploymentTrigger { * @param reason describes why the job is triggered * @return the application in the triggered state, if actually triggered. This *must* be stored by the caller */ - public Application triggerAllowParallel(JobType jobType, Application application, - boolean first, boolean force, String reason, Lock lock) { + public LockedApplication triggerAllowParallel(JobType jobType, LockedApplication application, + boolean first, boolean force, String reason) { if (jobType == null) return application; // we are passed null when the last job has been reached // Never allow untested changes to go through // Note that this may happen because a new change catches up and prevents an older one from continuing @@ -382,11 +381,12 @@ public class DeploymentTrigger { application.deploying().map(d -> "deploying " + d).orElse("restarted deployment"), reason)); buildSystem.addJob(application.id(), jobType, first); - return application.withJobTriggering(-1, jobType, application.deploying(), reason, clock.instant(), controller); + return application.withJobTriggering(-1, jobType, application.deploying(), reason, clock.instant(), + controller); } /** Returns true if the given proposed job triggering should be effected */ - private boolean allowedTriggering(JobType jobType, Application application) { + private boolean allowedTriggering(JobType jobType, LockedApplication application) { // Note: We could make a more fine-grained and more correct determination about whether to block // by instead basing the decision on what is currently deployed in the zone. However, // this leads to some additional corner cases, and the possibility of blocking an application @@ -405,7 +405,7 @@ public class DeploymentTrigger { .anyMatch(entry -> entry.getKey().isProduction() && entry.getValue().isRunning(jobTimeoutLimit())); } - private boolean acceptNewRevisionNow(Application application) { + private boolean acceptNewRevisionNow(LockedApplication application) { if ( ! application.deploying().isPresent()) return true; if ( application.deploying().get() instanceof Change.ApplicationChange) return true; // more changes are ok diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterInfoMaintainer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterInfoMaintainer.java index a6bf0112e5d..275aedfc812 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterInfoMaintainer.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterInfoMaintainer.java @@ -7,6 +7,7 @@ import com.yahoo.config.provision.Zone; import com.yahoo.vespa.curator.Lock; import com.yahoo.vespa.hosted.controller.Application; import com.yahoo.vespa.hosted.controller.Controller; +import com.yahoo.vespa.hosted.controller.LockedApplication; import com.yahoo.vespa.hosted.controller.api.identifiers.DeploymentId; import com.yahoo.vespa.hosted.controller.api.integration.configserver.NodeList; import com.yahoo.vespa.hosted.controller.application.ClusterInfo; @@ -87,16 +88,16 @@ public class ClusterInfoMaintainer extends Maintainer { protected void maintain() { for (Application application : controller().applications().asList()) { try (Lock lock = controller().applications().lock(application.id())) { - application = controller.applications().get(application.id()).orElse(null); // re-get inside lock - if (application == null) continue; // application removed + Optional<LockedApplication> lockedApplication = controller.applications().get(application.id(), lock); + if (!lockedApplication.isPresent()) continue; // application removed - for (Deployment deployment : application.deployments().values()) { + for (Deployment deployment : lockedApplication.get().deployments().values()) { DeploymentId deploymentId = new DeploymentId(application.id(), deployment.zone()); try { NodeList nodes = controller().applications().configserverClient().getNodeList(deploymentId); Map<ClusterSpec.Id, ClusterInfo> clusterInfo = getClusterInfo(nodes, deployment.zone()); - Application app = application.with(deployment.withClusterInfo(clusterInfo)); - controller.applications().store(app, lock); + controller.applications().store(lockedApplication.get() + .with(deployment.withClusterInfo(clusterInfo))); } catch (IOException | IllegalArgumentException e) { log.log(Level.WARNING, "Failing getting cluster info of for " + deploymentId, e); diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterUtilizationMaintainer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterUtilizationMaintainer.java index f9a3de87cab..60b890f10fb 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterUtilizationMaintainer.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterUtilizationMaintainer.java @@ -7,6 +7,7 @@ import com.yahoo.config.provision.Zone; import com.yahoo.vespa.curator.Lock; import com.yahoo.vespa.hosted.controller.Application; import com.yahoo.vespa.hosted.controller.Controller; +import com.yahoo.vespa.hosted.controller.LockedApplication; import com.yahoo.vespa.hosted.controller.api.integration.MetricsService; import com.yahoo.vespa.hosted.controller.application.ClusterUtilization; import com.yahoo.vespa.hosted.controller.application.Deployment; @@ -14,6 +15,7 @@ import com.yahoo.vespa.hosted.controller.application.Deployment; import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.Optional; /** * Fetch utilization metrics and update applications with this data. @@ -46,12 +48,12 @@ public class ClusterUtilizationMaintainer extends Maintainer { protected void maintain() { for (Application application : controller().applications().asList()) { try (Lock lock = controller().applications().lock(application.id())) { - application = controller.applications().get(application.id()).orElse(null); // re-get inside lock - if (application == null) continue; // application removed + Optional<LockedApplication> lockedApplication = controller.applications().get(application.id(), lock); + if (!lockedApplication.isPresent()) continue; // application removed for (Deployment deployment : application.deployments().values()) { Map<ClusterSpec.Id, ClusterUtilization> clusterUtilization = getUpdatedClusterUtilizations(application.id(), deployment.zone()); - Application app = application.with(deployment.withClusterUtils(clusterUtilization)); - controller.applications().store(app, lock); + controller.applications().store(lockedApplication.get() + .with(deployment.withClusterUtils(clusterUtilization))); } } } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentIssueReporter.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentIssueReporter.java index 23068b9567f..3e42fda73b3 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentIssueReporter.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentIssueReporter.java @@ -2,6 +2,7 @@ package com.yahoo.vespa.hosted.controller.maintenance; import com.yahoo.config.provision.ApplicationId; +import com.yahoo.vespa.curator.Lock; import com.yahoo.vespa.hosted.controller.Application; import com.yahoo.vespa.hosted.controller.Controller; import com.yahoo.vespa.hosted.controller.api.Tenant; @@ -62,7 +63,7 @@ public class DeploymentIssueReporter extends Maintainer { if (oldApplicationChangeFailuresIn(application.deploymentJobs())) failingApplications.add(application.id()); else - controller().applications().setIssueId(application.id(), null); + storeIssueId(application.id(), null); failingApplications.forEach(this::fileDeploymentIssueFor); @@ -116,7 +117,7 @@ public class DeploymentIssueReporter extends Maintainer { IssueId issueId = tenant.tenantType() == TenantType.USER ? deploymentIssues.fileUnlessOpen(ourIssueId, applicationId, userFor(tenant)) : deploymentIssues.fileUnlessOpen(ourIssueId, applicationId, propertyIdFor(tenant)); - controller().applications().setIssueId(applicationId, issueId); + storeIssueId(applicationId, issueId); } catch (RuntimeException e) { // Catch errors due to wrong data in the controller, or issues client timeout. log.log(Level.WARNING, "Exception caught when attempting to file an issue for " + applicationId, e); @@ -135,4 +136,12 @@ public class DeploymentIssueReporter extends Maintainer { })); } + private void storeIssueId(ApplicationId id, IssueId issueId) { + try (Lock lock = controller().applications().lock(id)) { + controller().applications().get(id, lock).ifPresent( + application -> controller().applications().store(application.with(issueId)) + ); + } + } + } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentMetricsMaintainer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentMetricsMaintainer.java index 8c28ec63517..2e6e378272d 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentMetricsMaintainer.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentMetricsMaintainer.java @@ -4,6 +4,7 @@ package com.yahoo.vespa.hosted.controller.maintenance;// Copyright 2017 Yahoo Ho import com.yahoo.vespa.curator.Lock; import com.yahoo.vespa.hosted.controller.Application; import com.yahoo.vespa.hosted.controller.Controller; +import com.yahoo.vespa.hosted.controller.LockedApplication; import com.yahoo.vespa.hosted.controller.api.integration.MetricsService; import com.yahoo.vespa.hosted.controller.application.Deployment; import com.yahoo.vespa.hosted.controller.application.DeploymentMetrics; @@ -11,6 +12,7 @@ import com.yahoo.yolean.Exceptions; import java.io.UncheckedIOException; import java.time.Duration; +import java.util.Optional; import java.util.logging.Level; import java.util.logging.Logger; @@ -42,15 +44,15 @@ public class DeploymentMetricsMaintainer extends Maintainer { try (Lock lock = controller().applications().lock(application.id())) { // Deployment or application may have changed (or be gone) now: - application = controller().applications().get(application.id()).orElse(null); - if (application == null) - break; + Optional<LockedApplication> lockedApplication = controller().applications() + .get(application.id(), lock); + if (!lockedApplication.isPresent()) continue; - deployment = application.deployments().get(deployment.zone()); - if (deployment == null) - continue; + deployment = lockedApplication.get().deployments().get(deployment.zone()); + if (deployment == null) continue; - controller().applications().store(application.with(deployment.withMetrics(appMetrics)), lock); + controller().applications().store(lockedApplication.get() + .with(deployment.withMetrics(appMetrics))); } } catch (UncheckedIOException e) { diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/filter/UnauthenticatedUserPrincipal.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/filter/UnauthenticatedUserPrincipal.java deleted file mode 100644 index a88e881ce9d..00000000000 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/filter/UnauthenticatedUserPrincipal.java +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hosted.controller.restapi.filter; - -import java.security.Principal; -import java.util.Objects; - -/** - * A principal for an unauthenticated user (typically from a trusted host). - * This principal should only be used in combination with machine authentication! - * - * @author bjorncs - */ -public class UnauthenticatedUserPrincipal implements Principal { - private final String username; - - public UnauthenticatedUserPrincipal(String username) { - this.username = username; - } - - @Override - public String getName() { - return username; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - UnauthenticatedUserPrincipal that = (UnauthenticatedUserPrincipal) o; - return Objects.equals(username, that.username); - } - - @Override - public int hashCode() { - return Objects.hash(username); - } - - @Override - public String toString() { - return "UnauthenticatedUserPrincipal{" + - "username='" + username + '\'' + - '}'; - } -} diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/filter/UserIdRequestFilter.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/filter/UserIdRequestFilter.java deleted file mode 100644 index 46df4d7a603..00000000000 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/filter/UserIdRequestFilter.java +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hosted.controller.restapi.filter; - -import com.yahoo.jdisc.handler.ResponseHandler; -import com.yahoo.jdisc.http.filter.DiscFilterRequest; -import com.yahoo.jdisc.http.filter.SecurityRequestFilter; -import com.yahoo.vespa.hosted.controller.api.nonpublic.HeaderFields; -import com.yahoo.yolean.chain.Before; - -/** - * Allows hosts using host-based authentication to set user ID. - * - * @author Tony Vaagenes - */ -@Before("CreateSecurityContextFilter") -public class UserIdRequestFilter implements SecurityRequestFilter { - - @Override - public void filter(DiscFilterRequest request, ResponseHandler handler) { - String userName = request.getHeader(HeaderFields.USER_ID_HEADER_FIELD); - request.setUserPrincipal(new UnauthenticatedUserPrincipal(userName)); - } -} diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/screwdriver/ScrewdriverApiHandler.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/screwdriver/ScrewdriverApiHandler.java index 06e87ffe2f8..3dbff0b4aa3 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/screwdriver/ScrewdriverApiHandler.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/screwdriver/ScrewdriverApiHandler.java @@ -13,8 +13,8 @@ import com.yahoo.slime.Inspector; import com.yahoo.slime.Slime; import com.yahoo.vespa.config.SlimeUtils; import com.yahoo.vespa.curator.Lock; -import com.yahoo.vespa.hosted.controller.Application; import com.yahoo.vespa.hosted.controller.Controller; +import com.yahoo.vespa.hosted.controller.LockedApplication; import com.yahoo.vespa.hosted.controller.api.integration.BuildService.BuildJob; import com.yahoo.vespa.hosted.controller.application.DeploymentJobs.JobError; import com.yahoo.vespa.hosted.controller.application.DeploymentJobs.JobReport; @@ -105,24 +105,22 @@ public class ScrewdriverApiHandler extends LoggingRequestHandler { private HttpResponse trigger(HttpRequest request, String tenantName, String applicationName) { ApplicationId applicationId = ApplicationId.from(tenantName, applicationName, "default"); try (Lock lock = controller.applications().lock(applicationId)) { - Application application = controller.applications().require(applicationId); + LockedApplication application = controller.applications().require(applicationId, lock); JobType jobType = Optional.of(asString(request.getData())) .filter(s -> !s.isEmpty()) .map(JobType::fromId) .orElse(JobType.component); // Since this is a manual operation we likely want it to trigger as soon as possible so we add it at to the // front of the queue - application = controller.applications().deploymentTrigger().triggerAllowParallel(jobType, - application, - true, - true, - "Triggered from the screwdriver/v1 web service", - lock); - controller.applications().store(application, lock); + application = controller.applications().deploymentTrigger().triggerAllowParallel( + jobType, application, true, true, + "Triggered from the screwdriver/v1 web service" + ); + controller.applications().store(application); Slime slime = new Slime(); Cursor cursor = slime.setObject(); - cursor.setString("message", "Triggered " + jobType.id() + " for " + application.id()); + cursor.setString("message", "Triggered " + jobType.id() + " for " + applicationId); return new SlimeJsonResponse(slime); } } diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java index 572e96edf1c..aea66f3cd67 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java @@ -527,13 +527,16 @@ public class ControllerTest { TenantId tenant = tester.createTenant("tenant1", "domain1", 11L); Application app = tester.createApplication(tenant, "app1", "default", 1); - app = app.withDeploying(Optional.of(new Change.VersionChange(Version.fromString("6.3")))); - applications.store(app, applications.lock(app.id())); - try { - tester.deploy(app, new Zone(Environment.prod, RegionName.from("us-east-3"))); - fail("Expected exception"); - } catch (IllegalArgumentException e) { - assertEquals("Rejecting deployment of application 'tenant1.app1' to zone prod.us-east-3 as version change to 6.3 is not tested", e.getMessage()); + try (Lock lock = tester.controller().applications().lock(app.id())) { + LockedApplication application = tester.controller().applications().require(app.id(), lock); + application = application.withDeploying(Optional.of(new Change.VersionChange(Version.fromString("6.3")))); + applications.store(application); + try { + tester.deploy(app, new Zone(Environment.prod, RegionName.from("us-east-3"))); + fail("Expected exception"); + } catch (IllegalArgumentException e) { + assertEquals("Rejecting deployment of application 'tenant1.app1' to zone prod.us-east-3 as version change to 6.3 is not tested", e.getMessage()); + } } } @@ -557,7 +560,7 @@ public class ControllerTest { Slime slime = SlimeUtils.jsonToSlime(json); Application application = serializer.fromSlime(slime); try (Lock lock = tester.controller().applications().lock(application.id())) { - tester.controller().applications().store(application, lock); + tester.controller().applications().store(new LockedApplication(application, lock)); } ApplicationPackage applicationPackage = new ApplicationPackageBuilder() diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTester.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTester.java index deb7b77eb56..4e3c15ea1a4 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTester.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTester.java @@ -8,7 +8,10 @@ import com.yahoo.config.provision.InstanceName; import com.yahoo.config.provision.RegionName; import com.yahoo.config.provision.TenantName; import com.yahoo.config.provision.Zone; +import com.yahoo.slime.Slime; import com.yahoo.test.ManualClock; +import com.yahoo.vespa.curator.Lock; +import com.yahoo.vespa.curator.mock.MockCurator; import com.yahoo.vespa.hosted.controller.api.Tenant; import com.yahoo.vespa.hosted.controller.api.application.v4.model.DeployOptions; import com.yahoo.vespa.hosted.controller.api.application.v4.model.GitRevision; @@ -28,9 +31,10 @@ import com.yahoo.vespa.hosted.controller.api.integration.github.GitHubMock; import com.yahoo.vespa.hosted.controller.api.integration.organization.MockOrganization; import com.yahoo.vespa.hosted.controller.api.integration.routing.MemoryGlobalRoutingService; import com.yahoo.vespa.hosted.controller.application.ApplicationPackage; -import com.yahoo.vespa.hosted.controller.athenz.mock.AthenzDbMock; import com.yahoo.vespa.hosted.controller.athenz.mock.AthenzClientFactoryMock; +import com.yahoo.vespa.hosted.controller.athenz.mock.AthenzDbMock; import com.yahoo.vespa.hosted.controller.integration.MockMetricsService; +import com.yahoo.vespa.hosted.controller.persistence.ApplicationSerializer; import com.yahoo.vespa.hosted.controller.persistence.ControllerDb; import com.yahoo.vespa.hosted.controller.persistence.CuratorDb; import com.yahoo.vespa.hosted.controller.persistence.MemoryControllerDb; @@ -42,7 +46,6 @@ import com.yahoo.vespa.hosted.rotation.MemoryRotationRepository; import java.util.Optional; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; /** * Convenience methods for controller tests. @@ -139,6 +142,16 @@ public final class ControllerTester { return createAndDeploy(tenantName, domainName, applicationName, environment, projectId, null); } + /** Create application from slime */ + public Application createApplication(Slime slime) { + ApplicationSerializer serializer = new ApplicationSerializer(); + Application application = serializer.fromSlime(slime); + try (Lock lock = controller().applications().lock(application.id())) { + controller().applications().store(new LockedApplication(application, lock)); + } + return application; + } + public Zone toZone(Environment environment) { switch (environment) { case dev: case test: return new Zone(environment, RegionName.from("us-east-1")); @@ -167,10 +180,13 @@ public final class ControllerTester { public Application createApplication(TenantId tenant, String applicationName, String instanceName, long projectId) { ApplicationId applicationId = applicationId(tenant.id(), applicationName, instanceName); - Application application = controller().applications().createApplication(applicationId, Optional.of(TestIdentities.userNToken)) - .withProjectId(projectId); - assertTrue(controller().applications().get(applicationId).isPresent()); - return application; + controller().applications().createApplication(applicationId, Optional.of(TestIdentities.userNToken)); + try (Lock lock = controller().applications().lock(applicationId)) { + LockedApplication lockedApplication = controller().applications().require(applicationId, lock) + .withProjectId(projectId); + controller().applications().store(lockedApplication); + return lockedApplication; + } } public void deploy(Application application, Zone zone) { @@ -196,6 +212,11 @@ public final class ControllerTester { InstanceName.from(instance)); } + // Used by ApplicationSerializerTest to avoid breaking encapsulation. Should not be used by anything else + public static LockedApplication writable(Application application) { + return new LockedApplication(application, new Lock("/test", new MockCurator())); + } + private static Controller createController(ControllerDb db, CuratorDb curator, ConfigServerClientMock configServerClientMock, ManualClock clock, GitHubMock gitHubClientMock, ZoneRegistryMock zoneRegistryMock, diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/FailureRedeployerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/FailureRedeployerTest.java index 4c935747ac6..2782dd6ec3b 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/FailureRedeployerTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/FailureRedeployerTest.java @@ -6,13 +6,11 @@ import com.yahoo.config.provision.Environment; import com.yahoo.config.provision.SystemName; import com.yahoo.slime.Slime; import com.yahoo.vespa.config.SlimeUtils; -import com.yahoo.vespa.curator.Lock; import com.yahoo.vespa.hosted.controller.Application; import com.yahoo.vespa.hosted.controller.application.ApplicationPackage; import com.yahoo.vespa.hosted.controller.application.DeploymentJobs; import com.yahoo.vespa.hosted.controller.deployment.ApplicationPackageBuilder; import com.yahoo.vespa.hosted.controller.deployment.DeploymentTester; -import com.yahoo.vespa.hosted.controller.persistence.ApplicationSerializer; import org.junit.Test; import java.nio.file.Files; @@ -186,13 +184,9 @@ public class FailureRedeployerTest { assertEquals(version, tester.controller().versionStatus().systemVersion().get().versionNumber()); // Load test data data - ApplicationSerializer serializer = new ApplicationSerializer(); byte[] json = Files.readAllBytes(Paths.get("src/test/java/com/yahoo/vespa/hosted/controller/maintenance/testdata/canary-with-stale-data.json")); Slime slime = SlimeUtils.jsonToSlime(json); - Application application = serializer.fromSlime(slime); - try (Lock lock = tester.controller().applications().lock(application.id())) { - tester.controller().applications().store(application, lock); - } + Application application = tester.controllerTester().createApplication(slime); ApplicationPackage applicationPackage = new ApplicationPackageBuilder() .upgradePolicy("canary") .region("cd-us-central-1") @@ -242,14 +236,9 @@ public class FailureRedeployerTest { assertEquals(version, tester.controller().versionStatus().systemVersion().get().versionNumber()); // Load test data data - ApplicationSerializer serializer = new ApplicationSerializer(); byte[] json = Files.readAllBytes(Paths.get("src/test/java/com/yahoo/vespa/hosted/controller/maintenance/testdata/pr-instance-with-dead-locked-job.json")); Slime slime = SlimeUtils.jsonToSlime(json); - Application application = serializer.fromSlime(slime); - - try (Lock lock = tester.controller().applications().lock(application.id())) { - tester.controller().applications().store(application, lock); - } + Application application = tester.controllerTester().createApplication(slime); // Failure redeployer does not restart deployment tester.failureRedeployer().maintain(); @@ -267,14 +256,9 @@ public class FailureRedeployerTest { assertEquals(version, tester.controller().versionStatus().systemVersion().get().versionNumber()); // Load test data data - ApplicationSerializer serializer = new ApplicationSerializer(); byte[] json = Files.readAllBytes(Paths.get("src/test/java/com/yahoo/vespa/hosted/controller/maintenance/testdata/application-without-project-id.json")); Slime slime = SlimeUtils.jsonToSlime(json); - Application application = serializer.fromSlime(slime); - - try (Lock lock = tester.controller().applications().lock(application.id())) { - tester.controller().applications().store(application, lock); - } + tester.controllerTester().createApplication(slime); // Failure redeployer does not restart deployment tester.failureRedeployer().maintain(); diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/persistence/ApplicationSerializerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/persistence/ApplicationSerializerTest.java index f65996feac8..b38a38c3120 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/persistence/ApplicationSerializerTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/persistence/ApplicationSerializerTest.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import static com.yahoo.vespa.hosted.controller.ControllerTester.writable; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -135,20 +136,20 @@ public class ApplicationSerializerTest { assertEquals(6, serialized.deployments().get(zone2).metrics().writeLatencyMillis(), Double.MIN_VALUE); { // test more deployment serialization cases - Application original2 = original.withDeploying(Optional.of(Change.ApplicationChange.of(ApplicationRevision.from("hash1")))); + Application original2 = writable(original).withDeploying(Optional.of(Change.ApplicationChange.of(ApplicationRevision.from("hash1")))); Application serialized2 = applicationSerializer.fromSlime(applicationSerializer.toSlime(original2)); assertEquals(original2.deploying(), serialized2.deploying()); assertEquals(((Change.ApplicationChange)serialized2.deploying().get()).revision().get().source(), ((Change.ApplicationChange)original2.deploying().get()).revision().get().source()); - Application original3 = original.withDeploying(Optional.of(Change.ApplicationChange.of(ApplicationRevision.from("hash1", - new SourceRevision("a", "b", "c"))))); + Application original3 = writable(original).withDeploying(Optional.of(Change.ApplicationChange.of(ApplicationRevision.from("hash1", + new SourceRevision("a", "b", "c"))))); Application serialized3 = applicationSerializer.fromSlime(applicationSerializer.toSlime(original3)); assertEquals(original3.deploying(), serialized2.deploying()); assertEquals(((Change.ApplicationChange)serialized3.deploying().get()).revision().get().source(), ((Change.ApplicationChange)original3.deploying().get()).revision().get().source()); - Application original4 = original.withDeploying(Optional.empty()); + Application original4 = writable(original).withDeploying(Optional.empty()); Application serialized4 = applicationSerializer.fromSlime(applicationSerializer.toSlime(original4)); assertEquals(original4.deploying(), serialized4.deploying()); } diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiTest.java index eeeed08c2b8..278df2f9b1e 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiTest.java @@ -8,6 +8,7 @@ import com.yahoo.config.provision.Environment; import com.yahoo.vespa.curator.Lock; import com.yahoo.vespa.hosted.controller.Application; import com.yahoo.vespa.hosted.controller.ConfigServerClientMock; +import com.yahoo.vespa.hosted.controller.LockedApplication; import com.yahoo.vespa.hosted.controller.api.identifiers.AthenzDomain; import com.yahoo.vespa.hosted.controller.api.identifiers.PropertyId; import com.yahoo.vespa.hosted.controller.api.identifiers.UserId; @@ -753,6 +754,8 @@ public class ApplicationApiTest extends ControllerContainerTest { private void setDeploymentMaintainedInfo(ContainerControllerTester controllerTester) { for (Application application : controllerTester.controller().applications().asList()) { try (Lock lock = controllerTester.controller().applications().lock(application.id())) { + LockedApplication lockedApplication = controllerTester.controller().applications() + .require(application.id(), lock); for (Deployment deployment : application.deployments().values()) { Map<ClusterSpec.Id, ClusterInfo> clusterInfo = new HashMap<>(); List<String> hostnames = new ArrayList<>(); @@ -764,8 +767,7 @@ public class ApplicationApiTest extends ControllerContainerTest { deployment = deployment.withClusterInfo(clusterInfo); deployment = deployment.withClusterUtils(clusterUtils); deployment = deployment.withMetrics(new DeploymentMetrics(1,2,3,4,5)); - application = application.with(deployment); - controllerTester.controller().applications().store(application, lock); + controllerTester.controller().applications().store(lockedApplication.with(deployment)); } } } diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/screwdriver/ScrewdriverApiTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/screwdriver/ScrewdriverApiTest.java index 3dc6a3326c9..1638a2845ed 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/screwdriver/ScrewdriverApiTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/screwdriver/ScrewdriverApiTest.java @@ -149,8 +149,9 @@ public class ScrewdriverApiTest extends ControllerContainerTest { Application app = tester.createApplication(); try (Lock lock = tester.controller().applications().lock(app.id())) { - app = app.withProjectId(1); - tester.controller().applications().store(app, lock); + tester.controller().applications().store( + tester.controller().applications().require(app.id(), lock).withProjectId(1) + ); } // Unknown application diff --git a/fastos/src/tests/job.h b/fastos/src/tests/job.h index 1d35ec95270..a5b84fa0f9c 100644 --- a/fastos/src/tests/job.h +++ b/fastos/src/tests/job.h @@ -2,9 +2,6 @@ #pragma once -#include <mutex> -#include <condition_variable> - enum JobCode { PRINT_MESSAGE_AND_WAIT3SEC, @@ -31,8 +28,9 @@ private: public: JobCode code; char *message; - std::mutex *mutex; - std::condition_variable *condition; + FastOS_Mutex *mutex; + FastOS_Cond *condition; + FastOS_BoolCond *boolcondition; FastOS_ThreadInterface *otherThread, *ownThread; double *timebuf; double average; @@ -47,6 +45,7 @@ public: message(nullptr), mutex(nullptr), condition(nullptr), + boolcondition(nullptr), otherThread(nullptr), ownThread(nullptr), timebuf(nullptr), diff --git a/fastos/src/tests/processtest.cpp b/fastos/src/tests/processtest.cpp index cd6839fd0aa..11e1307027d 100644 --- a/fastos/src/tests/processtest.cpp +++ b/fastos/src/tests/processtest.cpp @@ -17,14 +17,15 @@ public: static int _allocCount; static int _successCount; static int _failCount; - static std::mutex *_counterLock; + static FastOS_Mutex *_counterLock; MyListener (const char *title) : _title(title), _receivedBytes(0) { - std::lock_guard<std::mutex> guard(*_counterLock); - _allocCount++; + _counterLock->Lock(); + _allocCount++; + _counterLock->Unlock(); } virtual ~MyListener () @@ -33,13 +34,14 @@ public: const int correctByteCount = 16; - std::lock_guard<std::mutex> guard(*_counterLock); + _counterLock->Lock(); if(_receivedBytes == (isStdout ? correctByteCount : 0)) _successCount++; else _failCount++; _allocCount--; + _counterLock->Unlock(); } void OnReceiveData (const void *data, size_t length) override @@ -60,7 +62,7 @@ public: int MyListener::_allocCount = 0; int MyListener::_successCount = 0; int MyListener::_failCount = 0; -std::mutex *MyListener::_counterLock = nullptr; +FastOS_Mutex *MyListener::_counterLock = nullptr; class ThreadRunJob : public FastOS_Runnable @@ -120,7 +122,7 @@ private: // or not. bool _gotMessage; int _receivedMessages; - std::mutex *_counterLock; + FastOS_Mutex *_counterLock; bool _isChild; public: ProcessTest () @@ -154,8 +156,9 @@ public: // We only have the counter lock if we are the parent process. if(_counterLock != nullptr) { - std::lock_guard<std::mutex> guard(*_counterLock); - _receivedMessages++; + _counterLock->Lock(); + _receivedMessages++; + _counterLock->Unlock(); } } @@ -216,7 +219,7 @@ public: const int numLoops = 100; const int numEachTime = 40; - MyListener::_counterLock = new std::mutex; + MyListener::_counterLock = new FastOS_Mutex(); char testHeader[200]; strcpy(testHeader, "Process Test"); @@ -378,7 +381,7 @@ public: TestHeader ("IPC Test"); const char *childProgram = _argv[1]; - _counterLock = new std::mutex; + _counterLock = new FastOS_Mutex(); int i; for(i=0; i<30; i++) diff --git a/fastos/src/tests/thread_bounce_test.cpp b/fastos/src/tests/thread_bounce_test.cpp index 423221d55cb..bf94f3e1aab 100644 --- a/fastos/src/tests/thread_bounce_test.cpp +++ b/fastos/src/tests/thread_bounce_test.cpp @@ -14,10 +14,8 @@ class Thread_Bounce_Test : public ThreadTestBase TestHeader("Bounce Test"); FastOS_ThreadPool pool(128 * 1024); - std::mutex mutex1; - std::condition_variable cond1; - std::mutex mutex2; - std::condition_variable cond2; + FastOS_Cond cond1; + FastOS_Cond cond2; Job job1; Job job2; FastOS_Time checkTime; @@ -30,9 +28,7 @@ class Thread_Bounce_Test : public ThreadTestBase job2.code = BOUNCE_CONDITIONS; job1.otherjob = &job2; job2.otherjob = &job1; - job1.mutex = &mutex1; job1.condition = &cond1; - job2.mutex = &mutex2; job2.condition = &cond2; job1.ownThread = pool.NewThread(this, static_cast<void *>(&job1)); @@ -48,28 +44,28 @@ class Thread_Bounce_Test : public ThreadTestBase left = static_cast<int>(checkTime.MilliSecsToNow()); } - mutex1.lock(); + cond1.Lock(); cnt1 = job1.bouncewakeupcnt; - mutex1.unlock(); - mutex2.lock(); + cond1.Unlock(); + cond2.Lock(); cnt2 = job2.bouncewakeupcnt; - mutex2.unlock(); + cond2.Unlock(); cntsum = cnt1 + cnt2; Progress(lastcntsum != cntsum, "%d bounces", cntsum); lastcntsum = cntsum; } job1.ownThread->SetBreakFlag(); - mutex1.lock(); + cond1.Lock(); job1.bouncewakeup = true; - cond1.notify_one(); - mutex1.unlock(); + cond1.Signal(); + cond1.Unlock(); job2.ownThread->SetBreakFlag(); - mutex2.lock(); + cond2.Lock(); job2.bouncewakeup = true; - cond2.notify_one(); - mutex2.unlock(); + cond2.Signal(); + cond2.Unlock(); pool.Close(); Progress(true, "Pool closed."); diff --git a/fastos/src/tests/thread_joinwait_test.cpp b/fastos/src/tests/thread_joinwait_test.cpp index 7153a05f836..05ab1627334 100644 --- a/fastos/src/tests/thread_joinwait_test.cpp +++ b/fastos/src/tests/thread_joinwait_test.cpp @@ -25,11 +25,11 @@ class Thread_JoinWait_Test : public ThreadTestBase Job jobs[testThreads]; - std::mutex jobMutex; + FastOS_Mutex jobMutex; // The mutex is used to pause the first threads until we have created // the last one. - jobMutex.lock(); + jobMutex.Lock(); for(i=0; i<lastThreadNum; i++) { @@ -68,7 +68,7 @@ class Thread_JoinWait_Test : public ThreadTestBase } } - jobMutex.unlock(); + jobMutex.Unlock(); if((variant & 1) != 0) { diff --git a/fastos/src/tests/thread_mutex_test.cpp b/fastos/src/tests/thread_mutex_test.cpp index d49cf37163d..b8ac575038b 100644 --- a/fastos/src/tests/thread_mutex_test.cpp +++ b/fastos/src/tests/thread_mutex_test.cpp @@ -25,11 +25,10 @@ class Thread_Mutex_Test : public ThreadTestBase { int i; Job jobs[MUTEX_TEST_THREADS]; - std::mutex *myMutex=nullptr; + FastOS_Mutex *myMutex=nullptr; - if(usingMutex) { - myMutex = new std::mutex; - } + if(usingMutex) + myMutex = new FastOS_Mutex(); for(i=0; i<MUTEX_TEST_THREADS; i++) { @@ -118,7 +117,7 @@ class Thread_Mutex_Test : public ThreadTestBase FastOS_ThreadPool pool(128*1024); Job job; - std::mutex mtx; + FastOS_Mutex mtx; job.code = HOLD_MUTEX_FOR2SEC; job.result = -1; @@ -136,28 +135,28 @@ class Thread_Mutex_Test : public ThreadTestBase for(int i=0; i<5; i++) { - lockrc = mtx.try_lock(); + lockrc = mtx.TryLock(); Progress(!lockrc, "We should not get the mutex lock just yet (%s)", lockrc ? "got it" : "didn't get it"); if(lockrc) { - mtx.unlock(); + mtx.Unlock(); break; } } FastOS_Thread::Sleep(2000); - lockrc = mtx.try_lock(); + lockrc = mtx.TryLock(); Progress(lockrc, "We should get the mutex lock now (%s)", lockrc ? "got it" : "didn't get it"); if(lockrc) - mtx.unlock(); + mtx.Unlock(); Progress(true, "Attempting to do normal lock..."); - mtx.lock(); + mtx.Lock(); Progress(true, "Got lock. Attempt to do normal unlock..."); - mtx.unlock(); + mtx.Unlock(); Progress(true, "Unlock OK."); } diff --git a/fastos/src/tests/thread_test_base.hpp b/fastos/src/tests/thread_test_base.hpp index 7966e95b369..5305b132d3c 100644 --- a/fastos/src/tests/thread_test_base.hpp +++ b/fastos/src/tests/thread_test_base.hpp @@ -2,17 +2,13 @@ #pragma once -#include <chrono> - static volatile int64_t number; #define INCREASE_NUMBER_AMOUNT 10000 -using namespace std::chrono_literals; - class ThreadTestBase : public BaseTest, public FastOS_Runnable { private: - std::mutex printMutex; + FastOS_Mutex printMutex; public: ThreadTestBase(void) @@ -23,8 +19,9 @@ public: void PrintProgress (char *string) override { - std::lock_guard<std::mutex> guard(printMutex); + printMutex.Lock(); BaseTest::PrintProgress(string); + printMutex.Unlock(); } void Run (FastOS_ThreadInterface *thread, void *arg) override; @@ -96,10 +93,8 @@ void ThreadTestBase::Run (FastOS_ThreadInterface *thread, void *arg) { int result; - std::unique_lock<std::mutex> guard; - if(job->mutex != nullptr) { - guard = std::unique_lock<std::mutex>(*job->mutex); - } + if(job->mutex != nullptr) + job->mutex->Lock(); result = static_cast<int>(number); @@ -112,7 +107,8 @@ void ThreadTestBase::Run (FastOS_ThreadInterface *thread, void *arg) FastOS_Thread::Sleep(1000); } - guard = std::unique_lock<std::mutex>(); + if(job->mutex != nullptr) + job->mutex->Unlock(); job->result = result; // This marks the end of the thread @@ -136,23 +132,26 @@ void ThreadTestBase::Run (FastOS_ThreadInterface *thread, void *arg) case WAIT_FOR_THREAD_TO_FINISH: { - std::unique_lock<std::mutex> guard; - if (job->mutex != nullptr) { - guard = std::unique_lock<std::mutex>(*job->mutex); - } + if(job->mutex) + job->mutex->Lock(); if(job->otherThread != nullptr) job->otherThread->Join(); + if(job->mutex) + job->mutex->Unlock(); break; } case WAIT_FOR_CONDITION: { - std::unique_lock<std::mutex> guard(*job->mutex); + job->condition->Lock(); + job->result = 1; - job->condition->wait(guard); - guard.unlock(); + + job->condition->Wait(); + job->condition->Unlock(); + job->result = 0; break; @@ -161,25 +160,25 @@ void ThreadTestBase::Run (FastOS_ThreadInterface *thread, void *arg) case BOUNCE_CONDITIONS: { while (!thread->GetBreakFlag()) { - { - std::lock_guard<std::mutex> guard(*job->otherjob->mutex); - job->otherjob->bouncewakeupcnt++; - job->otherjob->bouncewakeup = true; - job->otherjob->condition->notify_one(); - } - std::unique_lock<std::mutex> guard(*job->mutex); - while (!job->bouncewakeup) { - job->condition->wait_for(guard, 1ms); - } - job->bouncewakeup = false; + job->otherjob->condition->Lock(); + job->otherjob->bouncewakeupcnt++; + job->otherjob->bouncewakeup = true; + job->otherjob->condition->Signal(); + job->otherjob->condition->Unlock(); + + job->condition->Lock(); + while (!job->bouncewakeup) + job->condition->TimedWait(1); + job->bouncewakeup = false; + job->condition->Unlock(); } break; } case TEST_ID: { - job->mutex->lock(); // Initially the parent threads owns the lock - job->mutex->unlock(); // It is unlocked when we should start + job->mutex->Lock(); // Initially the parent threads owns the lock + job->mutex->Unlock(); // It is unlocked when we should start FastOS_ThreadId currentId = FastOS_Thread::GetCurrentThreadId(); @@ -193,19 +192,18 @@ void ThreadTestBase::Run (FastOS_ThreadInterface *thread, void *arg) case WAIT2SEC_AND_SIGNALCOND: { FastOS_Thread::Sleep(2000); - job->condition->notify_one(); + job->condition->Signal(); job->result = 1; break; } case HOLD_MUTEX_FOR2SEC: { - { - std::lock_guard<std::mutex> guard(*job->mutex); - FastOS_Thread::Sleep(2000); - } - job->result = 1; - break; + job->mutex->Lock(); + FastOS_Thread::Sleep(2000); + job->mutex->Unlock(); + job->result = 1; + break; } case WAIT_2_SEC: diff --git a/fastos/src/tests/threadtest.cpp b/fastos/src/tests/threadtest.cpp index b0b64697129..81ea234fb97 100644 --- a/fastos/src/tests/threadtest.cpp +++ b/fastos/src/tests/threadtest.cpp @@ -5,18 +5,18 @@ #include "thread_test_base.hpp" #include <vespa/fastos/time.h> #include <cstdlib> -#include <chrono> #define MUTEX_TEST_THREADS 6 #define MAX_THREADS 7 + class ThreadTest : public ThreadTestBase { int Main () override; void WaitForXThreadsToHaveWait (Job *jobs, int jobCount, - std::mutex &mutex, + FastOS_Cond *condition, int numWait) { Progress(true, "Waiting for %d threads to be in wait state", numWait); @@ -26,15 +26,16 @@ class ThreadTest : public ThreadTestBase { int waitingThreads=0; + condition->Lock(); + + for(int i=0; i<jobCount; i++) { - std::lock_guard<std::mutex> guard(mutex); - for(int i=0; i<jobCount; i++) - { - if(jobs[i].result == 1) - waitingThreads++; - } + if(jobs[i].result == 1) + waitingThreads++; } + condition->Unlock(); + if(waitingThreads != oldNumber) Progress(true, "%d threads are waiting", waitingThreads); @@ -322,14 +323,12 @@ class ThreadTest : public ThreadTestBase } void SharedSignalAndBroadcastTest (Job *jobs, int numThreads, - std::mutex *mutex, - std::condition_variable *condition, + FastOS_Cond *condition, FastOS_ThreadPool *pool) { for(int i=0; i<numThreads; i++) { jobs[i].code = WAIT_FOR_CONDITION; - jobs[i].mutex = mutex; jobs[i].condition = condition; jobs[i].ownThread = pool->NewThread(this, static_cast<void *>(&jobs[i])); @@ -339,7 +338,7 @@ class ThreadTest : public ThreadTestBase } WaitForXThreadsToHaveWait (jobs, numThreads, - *mutex, numThreads); + condition, numThreads); // Threads are not guaranteed to have entered sleep yet, // as this test only tests for result code @@ -355,16 +354,15 @@ class ThreadTest : public ThreadTestBase FastOS_ThreadPool pool(128*1024); Job jobs[numThreads]; - std::mutex mutex; - std::condition_variable condition; + FastOS_Cond condition; - SharedSignalAndBroadcastTest(jobs, numThreads, &mutex, &condition, &pool); + SharedSignalAndBroadcastTest(jobs, numThreads, &condition, &pool); for(int i=0; i<numThreads; i++) { - condition.notify_one(); + condition.Signal(); WaitForXThreadsToHaveWait(jobs, numThreads, - mutex, numThreads-1-i); + &condition, numThreads-1-i); } Progress(true, "Waiting for threads to finish using pool.Close()..."); @@ -381,13 +379,12 @@ class ThreadTest : public ThreadTestBase FastOS_ThreadPool pool(128*1024); Job jobs[numThreads]; - std::mutex mutex; - std::condition_variable condition; + FastOS_Cond condition; - SharedSignalAndBroadcastTest(jobs, numThreads, &mutex, &condition, &pool); + SharedSignalAndBroadcastTest(jobs, numThreads, &condition, &pool); - condition.notify_all(); - WaitForXThreadsToHaveWait(jobs, numThreads, mutex, 0); + condition.Broadcast(); + WaitForXThreadsToHaveWait(jobs, numThreads, &condition, 0); Progress(true, "Waiting for threads to finish using pool.Close()..."); pool.Close(); @@ -404,9 +401,9 @@ class ThreadTest : public ThreadTestBase FastOS_ThreadPool pool(128*1024); Job jobs[numThreads]; - std::mutex slowStartMutex; + FastOS_Mutex slowStartMutex; - slowStartMutex.lock(); // Halt all threads until we want them to run + slowStartMutex.Lock(); // Halt all threads until we want them to run for(i=0; i<numThreads; i++) { jobs[i].code = TEST_ID; @@ -431,7 +428,7 @@ class ThreadTest : public ThreadTestBase } } - slowStartMutex.unlock(); // Allow threads to run + slowStartMutex.Unlock(); // Allow threads to run Progress(true, "Waiting for threads to finish using pool.Close()..."); pool.Close(); @@ -452,12 +449,10 @@ class ThreadTest : public ThreadTestBase FastOS_ThreadPool pool(128*1024); Job job; - std::mutex mutex; - std::condition_variable condition; + FastOS_Cond condition; job.code = WAIT2SEC_AND_SIGNALCOND; job.result = -1; - job.mutex = &mutex; job.condition = &condition; job.ownThread = pool.NewThread(this, static_cast<void *>(&job)); @@ -466,17 +461,18 @@ class ThreadTest : public ThreadTestBase if(job.ownThread != nullptr) { - std::unique_lock<std::mutex> guard(mutex); - bool gotCond = condition.wait_for(guard, 500ms) == std::cv_status::no_timeout; + condition.Lock(); + bool gotCond = condition.TimedWait(500); Progress(!gotCond, "We should not get the condition just yet (%s)", gotCond ? "got it" : "didn't get it"); - gotCond = condition.wait_for(guard, 500ms) == std::cv_status::no_timeout; + gotCond = condition.TimedWait(500); Progress(!gotCond, "We should not get the condition just yet (%s)", gotCond ? "got it" : "didn't get it"); - gotCond = condition.wait_for(guard, 5000ms) == std::cv_status::no_timeout; + gotCond = condition.TimedWait(5000); Progress(gotCond, "We should have got the condition now (%s)", gotCond ? "got it" : "didn't get it"); - } + condition.Unlock(); + } Progress(true, "Waiting for threads to finish using pool.Close()..."); pool.Close(); @@ -495,22 +491,31 @@ class ThreadTest : public ThreadTestBase for(i=0; i<allocCount; i++) { - std::mutex *mtx = new std::mutex; - mtx->lock(); - mtx->unlock(); + FastOS_Mutex *mtx = new FastOS_Mutex(); + mtx->Lock(); + mtx->Unlock(); delete mtx; if((i % progressIndex) == (progressIndex - 1)) - Progress(true, "Tested %d std::mutex instances", i + 1); + Progress(true, "Tested %d FastOS_Mutex instances", i + 1); + } + + for(i=0; i<allocCount; i++) + { + FastOS_Cond *cond = new FastOS_Cond(); + delete cond; + + if((i % progressIndex) == (progressIndex - 1)) + Progress(true, "Tested %d FastOS_Cond instances", i+1); } for(i=0; i<allocCount; i++) { - std::condition_variable *cond = new std::condition_variable; + FastOS_BoolCond *cond = new FastOS_BoolCond(); delete cond; if((i % progressIndex) == (progressIndex - 1)) - Progress(true, "Tested %d std::condition_variable instances", i+1); + Progress(true, "Tested %d FastOS_BoolCond instances", i+1); } PrintSeparator(); @@ -523,13 +528,13 @@ class ThreadTest : public ThreadTestBase const int allocCount = 150000; int i; - std::mutex **mutexes = new std::mutex*[allocCount]; + FastOS_Mutex **mutexes = new FastOS_Mutex*[allocCount]; FastOS_Time startTime, nowTime; startTime.SetNow(); for(i=0; i<allocCount; i++) - mutexes[i] = new std::mutex; + mutexes[i] = new FastOS_Mutex(); nowTime.SetNow(); Progress(true, "Allocated %d mutexes at time: %d ms", allocCount, @@ -538,10 +543,10 @@ class ThreadTest : public ThreadTestBase for(int e=0; e<4; e++) { for(i=0; i<allocCount; i++) - mutexes[i]->lock(); + mutexes[i]->Lock(); for(i=0; i<allocCount; i++) - mutexes[i]->unlock(); + mutexes[i]->Unlock(); nowTime.SetNow(); Progress(true, "Tested %d mutexes at time: %d ms", allocCount, diff --git a/fastos/src/tests/typetest.cpp b/fastos/src/tests/typetest.cpp index 503c9a30d24..209af305501 100644 --- a/fastos/src/tests/typetest.cpp +++ b/fastos/src/tests/typetest.cpp @@ -16,8 +16,11 @@ private: TestHeader("Object Sizes (bytes)"); Progress(true, "FastOS_Application: %d", sizeof(FastOS_Application)); + Progress(true, "FastOS_BoolCond %d", sizeof(FastOS_BoolCond)); + Progress(true, "FastOS_Cond %d", sizeof(FastOS_Cond)); Progress(true, "FastOS_DirectoryScan %d", sizeof(FastOS_DirectoryScan)); Progress(true, "FastOS_File: %d", sizeof(FastOS_File)); + Progress(true, "FastOS_Mutex: %d", sizeof(FastOS_Mutex)); Progress(true, "FastOS_Runnable %d", sizeof(FastOS_Runnable)); Progress(true, "FastOS_ServerSocket %d", sizeof(FastOS_ServerSocket)); Progress(true, "FastOS_Socket: %d", sizeof(FastOS_Socket)); diff --git a/fastos/src/vespa/fastos/CMakeLists.txt b/fastos/src/vespa/fastos/CMakeLists.txt index f98e5b8d97b..2a0ff2d370a 100644 --- a/fastos/src/vespa/fastos/CMakeLists.txt +++ b/fastos/src/vespa/fastos/CMakeLists.txt @@ -13,9 +13,11 @@ vespa_add_library(fastos_objects OBJECT time.cpp timestamp.cpp unix_app.cpp + unix_cond.cpp unix_dynamiclibrary.cpp unix_file.cpp unix_ipc.cpp + unix_mutex.cpp unix_process.cpp unix_socket.cpp unix_thread.cpp diff --git a/fastos/src/vespa/fastos/app.cpp b/fastos/src/vespa/fastos/app.cpp index 822683540f7..824d009591f 100644 --- a/fastos/src/vespa/fastos/app.cpp +++ b/fastos/src/vespa/fastos/app.cpp @@ -65,7 +65,7 @@ bool FastOS_ApplicationInterface::Init () if(errorMsg == nullptr) { - _processListMutex = new std::mutex; + _processListMutex = new FastOS_Mutex(); _threadPool = new FastOS_ThreadPool(128 * 1024); rc = true; } diff --git a/fastos/src/vespa/fastos/app.h b/fastos/src/vespa/fastos/app.h index 9560d1ced6a..283db64985c 100644 --- a/fastos/src/vespa/fastos/app.h +++ b/fastos/src/vespa/fastos/app.h @@ -15,7 +15,7 @@ class FastOS_ProcessInterface; class FastOS_ThreadPool; -#include <mutex> +#include <vespa/fastos/mutex.h> /** * FastOS application wrapper class. @@ -143,7 +143,7 @@ protected: FastOS_ThreadPool *_threadPool; FastOS_ProcessInterface *_processList; - std::mutex *_processListMutex; + FastOS_Mutex *_processListMutex; bool _disableLeakReporting; virtual bool PreThreadInit () { return true; } @@ -248,7 +248,8 @@ public: void AddChildProcess (FastOS_ProcessInterface *node); void RemoveChildProcess (FastOS_ProcessInterface *node); - std::unique_lock<std::mutex> getProcessGuard() { return std::unique_lock<std::mutex>(*_processListMutex); } + void ProcessLock () { _processListMutex->Lock(); } + void ProcessUnlock() { _processListMutex->Unlock(); } FastOS_ProcessInterface *GetProcessList () { return _processList; } FastOS_ThreadPool *GetThreadPool (); diff --git a/fastos/src/vespa/fastos/cond.h b/fastos/src/vespa/fastos/cond.h new file mode 100644 index 00000000000..c9405728223 --- /dev/null +++ b/fastos/src/vespa/fastos/cond.h @@ -0,0 +1,165 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +//************************************************************************ +/** + * @file + * Class definitions for FastOS_CondInterface and FastOS_BoolCond. + * + * @author Div, Oivind H. Danielsen + */ + +#pragma once + +#include "mutex.h" + + +/** + * This class implements a synchronization mechanism used by threads to wait + * until a condition expression involving shared data attains a particular state. + * + * Condition variables provide a different type of synchronization + * than locking mechanisms like mutexes. For instance, a mutex is used + * to cause other threads to wait while the thread holding the mutex + * executes code in a critical section. In contrast, a condition + * variable is typically used by a thread to make itself wait until an + * expression involving shared data attains a particular state. + */ +class FastOS_CondInterface : public FastOS_Mutex +{ +public: + FastOS_CondInterface(void) : FastOS_Mutex() { } + + virtual ~FastOS_CondInterface () {} + + /** + * Wait for the condition to be signalled. If the wait takes + * longer than [milliseconds] ms, the wait is aborted and false + * is returned. + * @param milliseconds Max time to wait. + * @return Boolean success/failure + */ + virtual bool TimedWait (int milliseconds) = 0; + + /** + * Wait for the condition to be signalled. + */ + virtual void Wait (void)=0; + + /** + * Send a signal to one thread waiting on the condition (if any). + */ + virtual void Signal (void)=0; + + /** + * Send a signal to all threads waiting on the condition. + */ + virtual void Broadcast (void)=0; +}; + +#include <vespa/fastos/unix_cond.h> +typedef FastOS_UNIX_Cond FASTOS_PREFIX(Cond); + +/** + * This class implements a condition variable with a boolean + * value. + */ +class FastOS_BoolCond : public FastOS_Cond +{ + bool _busy; + +public: + /** + * Constructor. Initially the boolean variable is + * set to non-busy. + */ + FastOS_BoolCond(void) : _busy(false) { } + + ~FastOS_BoolCond(void) { } + + /** + * If the variable is busy, wait for it to be non-busy, + * then set the variable to busy. */ + void SetBusy(void) + { + Lock(); + + while (_busy == true) + Wait(); + + _busy = true; + Unlock(); + } + + /** + * If the variable is busy, wait until it is no longer busy. + * If it was non-busy to begin with, no wait is performed. + */ + void WaitBusy(void) + { + Lock(); + + while (_busy == true) + Wait(); + + Unlock(); + } + + /** + * If the variable is busy, wait until it is no longer busy or a + * timeout occurs. If it was non-busy to begin with, no wait is + * performed. + * @param ms Time to wait + * @return True=non-busy, false=timeout + */ + bool TimedWaitBusy(int ms) + { + bool success = true; + + Lock(); + if (_busy == true) { + success = TimedWait(ms); + } + Unlock(); + + return success; + } + + /** + * Return busy status. + * @return True=busy, false=non-busy + */ + bool PollBusy (void) + { + bool rc; + Lock(); + rc = _busy; + Unlock(); + return rc; + } + + /** + * Set the variable to non-busy, and signal one thread + * waiting (if there are any). + * (if any). + */ + void ClearBusy(void) + { + Lock(); + _busy = false; + Signal(); + Unlock(); + } + + /** + * Set the variable to non-busy, and broadcast to all + * threads waiting (if there are any). + */ + void ClearBusyBroadcast(void) + { + Lock(); + _busy = false; + Broadcast(); + Unlock(); + } +}; + + diff --git a/fastos/src/vespa/fastos/mutex.h b/fastos/src/vespa/fastos/mutex.h new file mode 100644 index 00000000000..530e8d007bc --- /dev/null +++ b/fastos/src/vespa/fastos/mutex.h @@ -0,0 +1,64 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +//************************************************************************ +/** + * @file + * Class definition for FastOS_Mutex. + * + * @author Div, Oivind H. Danielsen + */ + +#pragma once + +#include "types.h" + +/** + * This class defines a mutual-exclusion object. + * + * Facilitates synchronized access to mutual-exclusion zones in the program. + * Before entering code sections where only a single thread at the time can + * operate, use @ref Lock(). If another thread is holding the lock at the + * time, the calling thread will sleep until the current holder of the mutex + * is through using it. + * + * Use @ref Unlock() to release the mutex lock. This will allow other threads + * to obtain the lock. + */ + +class FastOS_MutexInterface +{ +public: + /** + * Destructor + */ + virtual ~FastOS_MutexInterface () {} + + /** + * Obtain an exclusive lock on the mutex. The result of a recursive lock + * is currently undefined. The caller should assume this will result + * in a deadlock situation. + * A recursive lock occurs when a thread, currently owning the lock, + * attempts to lock the mutex a second time. + * + * Use @ref Unlock() to unlock the mutex when done. + */ + virtual void Lock ()=0; + + /** + * Try to obtain an exclusive lock on the mutex. If a lock cannot be + * obtained right away, the method will return false. There will + * be no blocking/waiting for the mutex lock to be available. If + * the mutex was locked in the attempt, true is returned. + * @return Boolean success/failure + */ + virtual bool TryLock ()=0; + + /** + * Unlock a locked mutex. The result of unlocking a mutex not already + * locked by the calling thread is undefined. + */ + virtual void Unlock ()=0; +}; + +#include "unix_mutex.h" +typedef FastOS_UNIX_Mutex FASTOS_PREFIX(Mutex); + diff --git a/fastos/src/vespa/fastos/ringbuffer.h b/fastos/src/vespa/fastos/ringbuffer.h index 41c0af7385b..53ee003915e 100644 --- a/fastos/src/vespa/fastos/ringbuffer.h +++ b/fastos/src/vespa/fastos/ringbuffer.h @@ -32,7 +32,7 @@ private: return (_dataIndex + offset) % _bufferSize; } - std::mutex _mutex; + FastOS_Mutex _mutex; public: void Reset () @@ -128,6 +128,14 @@ public: return _closed; } - std::unique_lock<std::mutex> getGuard() { return std::unique_lock<std::mutex>(_mutex); } + void Lock () + { + _mutex.Lock(); + } + + void Unlock () + { + _mutex.Unlock(); + } }; diff --git a/fastos/src/vespa/fastos/socketevent.cpp b/fastos/src/vespa/fastos/socketevent.cpp index 5e542390a53..a80cb015782 100644 --- a/fastos/src/vespa/fastos/socketevent.cpp +++ b/fastos/src/vespa/fastos/socketevent.cpp @@ -7,7 +7,7 @@ FastOS_SocketEventObjects *FastOS_SocketEventObjects::_objects = nullptr; -std::mutex FastOS_SocketEventObjects::_listMutex; +FastOS_Mutex FastOS_SocketEventObjects::_listMutex; int FastOS_SocketEventObjects::_objectCount = 0; bool FastOS_SocketEventObjects::_initialized = false; @@ -55,12 +55,12 @@ bool FastOS_SocketEvent::HandleWakeUp () FastOS_SocketEventObjects *FastOS_SocketEventObjects::ObtainObject (FastOS_SocketEvent *event) { FastOS_SocketEventObjects *node; - std::unique_lock<std::mutex> guard(_listMutex); + _listMutex.Lock(); if(_objects == nullptr) { _objectCount++; - guard.unlock(); + _listMutex.Unlock(); node = new FastOS_SocketEventObjects(event); node->_next = nullptr; @@ -70,6 +70,8 @@ FastOS_SocketEventObjects *FastOS_SocketEventObjects::ObtainObject (FastOS_Socke node = _objects; _objects = node->_next; node->_next = nullptr; + + _listMutex.Unlock(); } return node; @@ -79,7 +81,7 @@ void FastOS_SocketEventObjects::ReleaseObject (FastOS_SocketEventObjects *node) { if (node != nullptr) node->ReleasedCleanup(); - std::lock_guard<std::mutex> guard(_listMutex); + _listMutex.Lock(); if (_initialized) { node->_next = _objects; @@ -88,6 +90,8 @@ void FastOS_SocketEventObjects::ReleaseObject (FastOS_SocketEventObjects *node) delete node; _objectCount--; } + + _listMutex.Unlock(); } @@ -209,14 +213,15 @@ FastOS_SocketEvent::epollFini() void FastOS_SocketEventObjects::InitializeClass(void) { - std::lock_guard<std::mutex> guard(_listMutex); + _listMutex.Lock(); _initialized = true; + _listMutex.Unlock(); } void FastOS_SocketEventObjects::ClassCleanup(void) { - std::lock_guard<std::mutex> guard(_listMutex); + _listMutex.Lock(); _initialized = false; for (;;) { @@ -231,6 +236,7 @@ void FastOS_SocketEventObjects::ClassCleanup(void) _objectCount--; } } + _listMutex.Unlock(); } diff --git a/fastos/src/vespa/fastos/socketevent.h b/fastos/src/vespa/fastos/socketevent.h index 267f948caf9..5e457908ace 100644 --- a/fastos/src/vespa/fastos/socketevent.h +++ b/fastos/src/vespa/fastos/socketevent.h @@ -3,11 +3,11 @@ #pragma once #include "types.h" +#include "mutex.h" #include <poll.h> #include <sys/epoll.h> #include <vector> -#include <mutex> class FastOS_IOEvent { @@ -25,7 +25,7 @@ private: FastOS_SocketEventObjects(const FastOS_SocketEventObjects&); FastOS_SocketEventObjects& operator=(const FastOS_SocketEventObjects&); - static std::mutex _listMutex; + static FastOS_Mutex _listMutex; static int _objectCount; static bool _initialized; diff --git a/fastos/src/vespa/fastos/thread.cpp b/fastos/src/vespa/fastos/thread.cpp index 5e3400b70e3..3cd3bb4b85b 100644 --- a/fastos/src/vespa/fastos/thread.cpp +++ b/fastos/src/vespa/fastos/thread.cpp @@ -20,7 +20,6 @@ FastOS_ThreadPool::FastOS_ThreadPool(int stackSize, int maxThreads) _stackSize(stackSize), _closeCalledFlag(false), _freeMutex(), - _liveMutex(), _liveCond(), _freeThreads(nullptr), _activeThreads(nullptr), @@ -41,20 +40,21 @@ void FastOS_ThreadPool::ThreadIsAboutToTerminate(FastOS_ThreadInterface *) { assert(isClosed()); - std::lock_guard<std::mutex> guard(_liveMutex); + _liveCond.Lock(); _numTerminated++; _numLive--; - if (_numLive == 0) { - _liveCond.notify_all(); - } + if (_numLive == 0) + _liveCond.Broadcast(); + + _liveCond.Unlock(); } // This is a NOP if the thread isn't active. void FastOS_ThreadPool::FreeThread (FastOS_ThreadInterface *thread) { - std::lock_guard<std::mutex> guard(_freeMutex); + _freeMutex.Lock(); if(thread->_active) { LinkOutThread(thread, &_activeThreads); @@ -65,6 +65,8 @@ void FastOS_ThreadPool::FreeThread (FastOS_ThreadInterface *thread) LinkInThread(thread, &_freeThreads); _numFree++; } + + _freeMutex.Unlock(); } void FastOS_ThreadPool::LinkOutThread (FastOS_ThreadInterface *thread, FastOS_ThreadInterface **listHead) @@ -108,7 +110,7 @@ FastOS_ThreadInterface *FastOS_ThreadPool::NewThread (FastOS_Runnable *owner, vo { FastOS_ThreadInterface *thread=nullptr; - std::unique_lock<std::mutex> freeGuard(_freeMutex); + _freeMutex.Lock(); if (!isClosed()) { if ((thread = _freeThreads) != nullptr) { @@ -124,21 +126,24 @@ FastOS_ThreadInterface *FastOS_ThreadPool::NewThread (FastOS_Runnable *owner, vo fprintf(stderr, "Error: Maximum number of threads (%d)" " already allocated.\n", _maxThreads); } else { - freeGuard.unlock(); - { - std::lock_guard<std::mutex> liveGuard(_liveMutex); - _numLive++; - } + _freeMutex.Unlock(); + + _liveCond.Lock(); + _numLive++; + _liveCond.Unlock(); + thread = FastOS_Thread::CreateThread(this); if (thread == nullptr) { - std::lock_guard<std::mutex> liveGuard(_liveMutex); + _liveCond.Lock(); _numLive--; if (_numLive == 0) { - _liveCond.notify_all(); + _liveCond.Broadcast(); } + _liveCond.Unlock(); } - freeGuard.lock(); + + _freeMutex.Lock(); if(thread != nullptr) ActivateThread(thread); @@ -146,10 +151,11 @@ FastOS_ThreadInterface *FastOS_ThreadPool::NewThread (FastOS_Runnable *owner, vo } } - freeGuard.unlock(); + _freeMutex.Unlock(); if(thread != nullptr) { - std::lock_guard<std::mutex> liveGuard(_liveMutex); + _liveCond.Lock(); thread->Dispatch(owner, arg); + _liveCond.Unlock(); } return thread; @@ -160,7 +166,7 @@ void FastOS_ThreadPool::BreakThreads () { FastOS_ThreadInterface *thread; - std::lock_guard<std::mutex> freeGuard(_freeMutex); + _freeMutex.Lock(); // Notice all active threads that they should quit for(thread=_activeThreads; thread != nullptr; thread=thread->_next) { @@ -171,22 +177,26 @@ void FastOS_ThreadPool::BreakThreads () for(thread=_freeThreads; thread != nullptr; thread=thread->_next) { thread->SetBreakFlag(); } + + _freeMutex.Unlock(); } void FastOS_ThreadPool::JoinThreads () { - std::unique_lock<std::mutex> liveGuard(_liveMutex); - while (_numLive > 0) { - _liveCond.wait(liveGuard); - } + _liveCond.Lock(); + + while (_numLive > 0) + _liveCond.Wait(); + + _liveCond.Unlock(); } void FastOS_ThreadPool::DeleteThreads () { FastOS_ThreadInterface *thread; - std::lock_guard<std::mutex> freeGuard(_freeMutex); + _freeMutex.Lock(); assert(_numActive == 0); assert(_numLive == 0); @@ -199,25 +209,30 @@ void FastOS_ThreadPool::DeleteThreads () } assert(_numFree == 0); + + _freeMutex.Unlock(); } void FastOS_ThreadPool::Close () { - std::unique_lock<std::mutex> closeFlagGuard(_closeFlagMutex); + _closeFlagMutex.Lock(); if (!_closeCalledFlag) { _closeCalledFlag = true; - closeFlagGuard.unlock(); + _closeFlagMutex.Unlock(); BreakThreads(); JoinThreads(); DeleteThreads(); + } else { + _closeFlagMutex.Unlock(); } } bool FastOS_ThreadPool::isClosed() { - std::lock_guard<std::mutex> closeFlagGuard(_closeFlagMutex); + _closeFlagMutex.Lock(); bool closed(_closeCalledFlag); + _closeFlagMutex.Unlock(); return closed; } @@ -247,19 +262,20 @@ void FastOS_ThreadInterface::Hook () while(!finished) { - std::unique_lock<std::mutex> dispatchedGuard(_dispatchedMutex); // BEGIN lock + _dispatched.Lock(); // BEGIN lock + while (_owner == nullptr && !(finished = _pool->isClosed())) { - _dispatchedCond.wait(dispatchedGuard); + _dispatched.Wait(); } - dispatchedGuard.unlock(); // END lock + _dispatched.Unlock(); // END lock if(!finished) { PreEntry(); deleteOnCompletion = _owner->DeleteOnCompletion(); _owner->Run(this, _startArg); - dispatchedGuard.lock(); // BEGIN lock + _dispatched.Lock(); // BEGIN lock if (deleteOnCompletion) { delete _owner; @@ -269,13 +285,9 @@ void FastOS_ThreadInterface::Hook () _breakFlag = false; finished = _pool->isClosed(); - dispatchedGuard.unlock(); // END lock + _dispatched.Unlock(); // END lock - { - std::lock_guard<std::mutex> runningGuard(_runningMutex); - _runningFlag = false; - _runningCond.notify_all(); - } + _runningCond.ClearBusyBroadcast(); _pool->FreeThread(this); // printf("Thread given back to FastOS_ThreadPool: %p\n", this); @@ -295,15 +307,9 @@ void FastOS_ThreadInterface::Hook () void FastOS_ThreadInterface::Dispatch(FastOS_Runnable *newOwner, void *arg) { - std::lock_guard<std::mutex> dispatchedGuard(_dispatchedMutex); + _dispatched.Lock(); - { - std::unique_lock<std::mutex> runningGuard(_runningMutex); - while (_runningFlag) { - _runningCond.wait(runningGuard); - } - _runningFlag = true; - } + _runningCond.SetBusy(); _owner = newOwner; _startArg = arg; @@ -316,14 +322,18 @@ void FastOS_ThreadInterface::Dispatch(FastOS_Runnable *newOwner, void *arg) // it the safe way we just do that, instead of keeping a unneccessary long // suppressionslist. It will be long enough anyway. - _dispatchedCond.notify_one(); + _dispatched.Signal(); + + _dispatched.Unlock(); } void FastOS_ThreadInterface::SetBreakFlag() { - std::lock_guard<std::mutex> dispatchedGuard(_dispatchedMutex); + _dispatched.Lock(); _breakFlag = true; - _dispatchedCond.notify_one(); + + _dispatched.Signal(); + _dispatched.Unlock(); } @@ -341,10 +351,7 @@ FastOS_ThreadInterface *FastOS_ThreadInterface::CreateThread(FastOS_ThreadPool * void FastOS_ThreadInterface::Join () { - std::unique_lock<std::mutex> runningGuard(_runningMutex); - while (_runningFlag) { - _runningCond.wait(runningGuard); - } + _runningCond.WaitBusy(); } diff --git a/fastos/src/vespa/fastos/thread.h b/fastos/src/vespa/fastos/thread.h index 2726efe3cf0..eb43fc6b664 100644 --- a/fastos/src/vespa/fastos/thread.h +++ b/fastos/src/vespa/fastos/thread.h @@ -12,8 +12,8 @@ #include "types.h" -#include <mutex> -#include <condition_variable> +#include "mutex.h" +#include "cond.h" typedef pthread_t FastOS_ThreadId; @@ -41,7 +41,7 @@ private: FastOS_ThreadPool& operator=(const FastOS_ThreadPool&); int _startedThreadsCount; - std::mutex _closeFlagMutex; + FastOS_Mutex _closeFlagMutex; /** * The stack size for threads in this pool. */ @@ -49,9 +49,8 @@ private: bool _closeCalledFlag; // Always lock in this order - std::mutex _freeMutex; - std::mutex _liveMutex; - std::condition_variable _liveCond; + FastOS_Mutex _freeMutex; + FastOS_Cond _liveCond; /** * List of free (available) threads. */ @@ -233,8 +232,7 @@ protected: * The thread does not start (call @ref FastOS_Runnable::Run()) * until this event has been triggered. */ - std::mutex _dispatchedMutex; - std::condition_variable _dispatchedCond; + FastOS_Cond _dispatched; FastOS_ThreadInterface *_next; FastOS_ThreadInterface *_prev; @@ -305,9 +303,7 @@ protected: * Is the thread running? This is used by @ref Join(), to wait for threads * to finish. */ - std::mutex _runningMutex; - std::condition_variable _runningCond; - bool _runningFlag; + FastOS_BoolCond _runningCond; public: /** @@ -328,8 +324,7 @@ public: * Constructor. Resets internal attributes. */ FastOS_ThreadInterface (FastOS_ThreadPool *pool) - : _dispatchedMutex(), - _dispatchedCond(), + : _dispatched(), _next(nullptr), _prev(nullptr), _owner(nullptr), @@ -337,9 +332,7 @@ public: _startArg(nullptr), _breakFlag(false), _active(false), - _runningMutex(), - _runningCond(), - _runningFlag(false) + _runningCond() { } diff --git a/fastos/src/vespa/fastos/unix_app.cpp b/fastos/src/vespa/fastos/unix_app.cpp index c60035aa5ab..7682b2d5b8f 100644 --- a/fastos/src/vespa/fastos/unix_app.cpp +++ b/fastos/src/vespa/fastos/unix_app.cpp @@ -162,13 +162,9 @@ void FastOS_UNIX_Application::Cleanup () _ipcHelper->Exit(); if (_processStarter != nullptr) { - { - std::unique_lock<std::mutex> guard; - if (_processListMutex) { - guard = getProcessGuard(); - } - _processStarter->Stop(); - } + if (_processListMutex) ProcessLock(); + _processStarter->Stop(); + if (_processListMutex) ProcessUnlock(); delete _processStarter; _processStarter = nullptr; } diff --git a/fastos/src/vespa/fastos/unix_cond.cpp b/fastos/src/vespa/fastos/unix_cond.cpp new file mode 100644 index 00000000000..5eb1f5b0218 --- /dev/null +++ b/fastos/src/vespa/fastos/unix_cond.cpp @@ -0,0 +1,49 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "cond.h" +#include <sys/time.h> +#include <cstdint> + +FastOS_UNIX_Cond::FastOS_UNIX_Cond(void) + : FastOS_CondInterface(), + _cond() +{ + pthread_cond_init(&_cond, nullptr); +} + +FastOS_UNIX_Cond::~FastOS_UNIX_Cond(void) +{ + pthread_cond_destroy(&_cond); +} + +void +FastOS_UNIX_Cond::Wait(void) +{ + pthread_cond_wait(&_cond, &_mutex); +} + +bool +FastOS_UNIX_Cond::TimedWait(int milliseconds) +{ + + struct timeval currentTime; + struct timespec absTime; + int error; + + gettimeofday(¤tTime, nullptr); + + int64_t ns = (static_cast<int64_t>(currentTime.tv_sec) * + static_cast<int64_t>(1000 * 1000 * 1000) + + static_cast<int64_t>(currentTime.tv_usec) * + static_cast<int64_t>(1000) + + static_cast<int64_t>(milliseconds) * + static_cast<int64_t>(1000 * 1000)); + + absTime.tv_sec = static_cast<int> + (ns / static_cast<int64_t>(1000 * 1000 * 1000)); + absTime.tv_nsec = static_cast<int> + (ns % static_cast<int64_t>(1000 * 1000 * 1000)); + + error = pthread_cond_timedwait(&_cond, &_mutex, &absTime); + return error == 0; +} diff --git a/fastos/src/vespa/fastos/unix_cond.h b/fastos/src/vespa/fastos/unix_cond.h new file mode 100644 index 00000000000..7367d812959 --- /dev/null +++ b/fastos/src/vespa/fastos/unix_cond.h @@ -0,0 +1,42 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +//************************************************************************ +/** + * Class definition and implementation for FastOS_UNIX_Cond. + * + * @author Div, Oivind H. Danielsen + */ + +#pragma once + +#include <vespa/fastos/cond.h> + + +class FastOS_UNIX_Cond : public FastOS_CondInterface +{ +private: + FastOS_UNIX_Cond(const FastOS_UNIX_Cond &); + FastOS_UNIX_Cond& operator=(const FastOS_UNIX_Cond &); + + pthread_cond_t _cond; + +public: + FastOS_UNIX_Cond (); + + ~FastOS_UNIX_Cond(); + + void Wait() override; + + bool TimedWait(int milliseconds) override; + + void Signal() override + { + pthread_cond_signal(&_cond); + } + + void Broadcast() override + { + pthread_cond_broadcast(&_cond); + } +}; + + diff --git a/fastos/src/vespa/fastos/unix_ipc.cpp b/fastos/src/vespa/fastos/unix_ipc.cpp index 79fbe3ee076..695d395674f 100644 --- a/fastos/src/vespa/fastos/unix_ipc.cpp +++ b/fastos/src/vespa/fastos/unix_ipc.cpp @@ -5,8 +5,6 @@ #include <cstring> #include <unistd.h> #include <fcntl.h> -#include <memory> -#include <future> FastOS_UNIX_IPCHelper:: FastOS_UNIX_IPCHelper (FastOS_ApplicationInterface *app, int descriptor) @@ -57,7 +55,7 @@ DoWrite(FastOS_UNIX_Process::DescriptorHandle &desc) bool rc = true; FastOS_RingBuffer *buffer = desc._writeBuffer.get(); - auto bufferGuard = buffer->getGuard(); + buffer->Lock(); int writeBytes = buffer->GetReadSpace(); if(writeBytes > 0) { @@ -80,6 +78,8 @@ DoWrite(FastOS_UNIX_Process::DescriptorHandle &desc) else if(bytesWritten == 0) desc.CloseHandle(); } + buffer->Unlock(); + return rc; } @@ -90,7 +90,7 @@ DoRead (FastOS_UNIX_Process::DescriptorHandle &desc) FastOS_RingBuffer *buffer = desc._readBuffer.get(); - auto bufferGuard = buffer->getGuard(); + buffer->Lock(); int readBytes = buffer->GetWriteSpace(); if(readBytes > 0) { int bytesRead; @@ -108,6 +108,7 @@ DoRead (FastOS_UNIX_Process::DescriptorHandle &desc) desc.CloseHandle(); } } + buffer->Unlock(); return rc; } @@ -429,7 +430,8 @@ RemoveClosingProcesses(void) if(!stillBusy) { - if (xproc->_closing) { + if(xproc->_closing != nullptr) + { // We already have the process lock at this point, // so modifying the list is safe. _app->RemoveChildProcess(node); @@ -448,8 +450,7 @@ RemoveClosingProcesses(void) } // The process destructor can now proceed - auto closingPromise(std::move(xproc->_closing)); - closingPromise->set_value(); + xproc->_closing->ClearBusy(); } } } @@ -473,32 +474,31 @@ Run(FastOS_ThreadInterface *thisThread, void *arg) for(;;) { // Deliver messages to from child processes and parent. + _app->ProcessLock(); + for(node = _app->GetProcessList(); node != nullptr; node = node->_next) { - auto guard = _app->getProcessGuard(); - for(node = _app->GetProcessList(); node != nullptr; node = node->_next) - { - FastOS_UNIX_Process *xproc = static_cast<FastOS_UNIX_Process *>(node); - FastOS_UNIX_Process::DescriptorHandle &desc = - xproc->GetDescriptorHandle(FastOS_UNIX_Process::TYPE_IPC); - DeliverMessages(desc._readBuffer.get()); - PipeData(xproc, FastOS_UNIX_Process::TYPE_STDOUT); - PipeData(xproc, FastOS_UNIX_Process::TYPE_STDERR); - } - DeliverMessages(_appParentIPCDescriptor._readBuffer.get()); + FastOS_UNIX_Process *xproc = static_cast<FastOS_UNIX_Process *>(node); + FastOS_UNIX_Process::DescriptorHandle &desc = + xproc->GetDescriptorHandle(FastOS_UNIX_Process::TYPE_IPC); + DeliverMessages(desc._readBuffer.get()); + PipeData(xproc, FastOS_UNIX_Process::TYPE_STDOUT); + PipeData(xproc, FastOS_UNIX_Process::TYPE_STDERR); + } + DeliverMessages(_appParentIPCDescriptor._readBuffer.get()); - // Setup file descriptor sets for the next select() call - BuildPollChecks(); + // Setup file descriptor sets for the next select() call + BuildPollChecks(); - // Close and signal closing processes - RemoveClosingProcesses(); + // Close and signal closing processes + RemoveClosingProcesses(); - BuildPollArray(&fds, &nfds, &allocnfds); - } - bool exitFlag = false; - { - std::lock_guard<std::mutex> guard(_lock); - exitFlag = _exitFlag; - } + BuildPollArray(&fds, &nfds, &allocnfds); + + _app->ProcessUnlock(); + + _lock.Lock(); + bool exitFlag(_exitFlag); + _lock.Unlock(); if (exitFlag) { if (_appParentIPCDescriptor._fd != -1) @@ -546,13 +546,11 @@ Run(FastOS_ThreadInterface *thisThread, void *arg) break; } - bool woken = false; - { - auto guard = _app->getProcessGuard(); - woken = SavePollArray(fds, nfds); - // Do actual IO (based on file descriptor sets and buffer contents) - PerformAsyncIO(); - } + _app->ProcessLock(); + bool woken = SavePollArray(fds, nfds); + // Do actual IO (based on file descriptor sets and buffer contents) + PerformAsyncIO(); + _app->ProcessUnlock(); PerformAsyncIPCIO(); // Did someone want to wake us up from the poll() call? @@ -586,7 +584,7 @@ SendMessage (FastOS_UNIX_Process *xproc, const void *buffer, ipcBuffer = desc._writeBuffer.get(); if(ipcBuffer != nullptr) { - auto ipcBufferGuard = ipcBuffer->getGuard(); + ipcBuffer->Lock(); if(ipcBuffer->GetWriteSpace() >= int((length + sizeof(int)))) { memcpy(ipcBuffer->GetWritePtr(), &length, sizeof(int)); @@ -597,6 +595,7 @@ SendMessage (FastOS_UNIX_Process *xproc, const void *buffer, NotifyProcessListChange(); rc = true; } + ipcBuffer->Unlock(); } return rc; } @@ -612,9 +611,10 @@ void FastOS_UNIX_IPCHelper::NotifyProcessListChange () void FastOS_UNIX_IPCHelper::Exit () { - std::lock_guard<std::mutex> guard(_lock); + _lock.Lock(); _exitFlag = true; NotifyProcessListChange(); + _lock.Unlock(); } void FastOS_UNIX_IPCHelper::AddProcess (FastOS_UNIX_Process *xproc) @@ -639,11 +639,16 @@ void FastOS_UNIX_IPCHelper::AddProcess (FastOS_UNIX_Process *xproc) void FastOS_UNIX_IPCHelper::RemoveProcess (FastOS_UNIX_Process *xproc) { - auto closePromise = std::make_unique<std::promise<void>>(); - auto closeFuture = closePromise->get_future(); - xproc->_closing = std::move(closePromise); + (void)xproc; + + FastOS_BoolCond closeWait; + + closeWait.SetBusy(); + xproc->_closing = &closeWait; + NotifyProcessListChange(); - closeFuture.wait(); + + closeWait.WaitBusy(); } void FastOS_UNIX_IPCHelper::DeliverMessages (FastOS_RingBuffer *buffer) @@ -651,7 +656,7 @@ void FastOS_UNIX_IPCHelper::DeliverMessages (FastOS_RingBuffer *buffer) if(buffer == nullptr) return; - auto bufferGuard = buffer->getGuard(); + buffer->Lock(); unsigned int readSpace; while((readSpace = buffer->GetReadSpace()) > sizeof(int)) @@ -668,6 +673,8 @@ void FastOS_UNIX_IPCHelper::DeliverMessages (FastOS_RingBuffer *buffer) else break; } + + buffer->Unlock(); } void FastOS_UNIX_IPCHelper:: @@ -683,7 +690,7 @@ PipeData (FastOS_UNIX_Process *process, if(listener == nullptr) return; - auto bufferGuard = buffer->getGuard(); + buffer->Lock(); unsigned int readSpace; while((readSpace = buffer->GetReadSpace()) > 0) { @@ -693,4 +700,6 @@ PipeData (FastOS_UNIX_Process *process, if(buffer->GetCloseFlag()) process->CloseListener(type); + + buffer->Unlock(); } diff --git a/fastos/src/vespa/fastos/unix_ipc.h b/fastos/src/vespa/fastos/unix_ipc.h index 218096e2145..35e77e11cb2 100644 --- a/fastos/src/vespa/fastos/unix_ipc.h +++ b/fastos/src/vespa/fastos/unix_ipc.h @@ -13,7 +13,7 @@ private: FastOS_UNIX_IPCHelper& operator=(const FastOS_UNIX_IPCHelper&); protected: - std::mutex _lock; + FastOS_Mutex _lock; volatile bool _exitFlag; FastOS_ApplicationInterface *_app; diff --git a/fastos/src/vespa/fastos/unix_mutex.cpp b/fastos/src/vespa/fastos/unix_mutex.cpp new file mode 100644 index 00000000000..535a39ce592 --- /dev/null +++ b/fastos/src/vespa/fastos/unix_mutex.cpp @@ -0,0 +1,18 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "mutex.h" +#include <cassert> + +FastOS_UNIX_Mutex::FastOS_UNIX_Mutex(void) + : FastOS_MutexInterface(), + _mutex() +{ + int error = pthread_mutex_init(&_mutex, nullptr); + assert(error == 0); + (void) error; +} + +FastOS_UNIX_Mutex::~FastOS_UNIX_Mutex(void) +{ + pthread_mutex_destroy(&_mutex); +} diff --git a/fastos/src/vespa/fastos/unix_mutex.h b/fastos/src/vespa/fastos/unix_mutex.h new file mode 100644 index 00000000000..30150bc1590 --- /dev/null +++ b/fastos/src/vespa/fastos/unix_mutex.h @@ -0,0 +1,44 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** +****************************************************************************** +* @author Oivind H. Danielsen +* @date Creation date: 2000-02-02 +* @file +* Class definition and implementation for FastOS_UNIX_Mutex +*****************************************************************************/ + + + +#pragma once + + +#include "mutex.h" +#include <pthread.h> + +class FastOS_UNIX_Mutex : public FastOS_MutexInterface +{ +private: + FastOS_UNIX_Mutex(const FastOS_UNIX_Mutex &other); + FastOS_UNIX_Mutex & operator = (const FastOS_UNIX_Mutex &other); +protected: + pthread_mutex_t _mutex; + +public: + FastOS_UNIX_Mutex(); + + ~FastOS_UNIX_Mutex(); + + bool TryLock () override { + return pthread_mutex_trylock(&_mutex) == 0; + } + + void Lock() override { + pthread_mutex_lock(&_mutex); + } + + void Unlock() override { + pthread_mutex_unlock(&_mutex); + } +}; + + diff --git a/fastos/src/vespa/fastos/unix_process.cpp b/fastos/src/vespa/fastos/unix_process.cpp index 80ad0605f78..df32cb935ff 100644 --- a/fastos/src/vespa/fastos/unix_process.cpp +++ b/fastos/src/vespa/fastos/unix_process.cpp @@ -805,10 +805,9 @@ FastOS_UNIX_Process (const char *cmdLine, bool pipeStdin, if (stderrListener != nullptr) _descriptor[TYPE_STDERR]._readBuffer.reset(new FastOS_RingBuffer(bufferSize)); - { - auto guard = _app->getProcessGuard(); - _app->AddChildProcess(this); - } + _app->ProcessLock(); + _app->AddChildProcess(this); + _app->ProcessUnlock(); // App::AddToIPCComm() is performed when the process is started } @@ -826,8 +825,9 @@ FastOS_UNIX_Process::~FastOS_UNIX_Process () static_cast<FastOS_UNIX_Application *>(_app)->RemoveFromIPCComm(this); } else { // No IPC descriptor, do it ourselves - auto guard = _app->getProcessGuard(); + _app->ProcessLock(); _app->RemoveChildProcess(this); + _app->ProcessUnlock(); } for(int i=0; i<int(TYPE_COUNT); i++) { @@ -897,7 +897,7 @@ bool FastOS_UNIX_Process::Signal(int sig) bool rc = false; pid_t pid; - auto guard = _app->getProcessGuard(); + _app->ProcessLock(); pid = GetProcessId(); if (pid == 0) { /* Do nothing */ @@ -908,6 +908,7 @@ bool FastOS_UNIX_Process::Signal(int sig) _killed = true; rc = true; } + _app->ProcessUnlock(); return rc; } @@ -1721,7 +1722,7 @@ CreateProcess (FastOS_UNIX_Process *process, const char *cmdLine = process->GetCommandLine(); - auto guard = _app->getProcessGuard(); + process->_app->ProcessLock(); if (process->GetDirectChild()) { _hasDirectChildren = true; @@ -1769,7 +1770,7 @@ CreateProcess (FastOS_UNIX_Process *process, "Forkandexec %s failed\n", cmdLine); } - guard.unlock(); + process->_app->ProcessUnlock(); delete rprocess; FreeEnvironmentVariables(env); return rc; @@ -1846,6 +1847,8 @@ CreateProcess (FastOS_UNIX_Process *process, } } } + process->_app->ProcessUnlock(); + return rc; } @@ -1923,13 +1926,13 @@ FastOS_UNIX_ProcessStarter::Wait(FastOS_UNIX_Process *process, *pollStillRunning = true; for (;;) { - { - auto guard = process->_app->getProcessGuard(); + process->_app->ProcessLock(); - if (_hasDirectChildren) PollReapDirectChildren(); + if (_hasDirectChildren) PollReapDirectChildren(); - if (_hasProxiedChildren) PollReapProxiedChildren(); - } + if (_hasProxiedChildren) PollReapProxiedChildren(); + + process->_app->ProcessUnlock(); if (process->GetDeathFlag()) { if (pollStillRunning != nullptr) @@ -1968,14 +1971,16 @@ bool FastOS_UNIX_ProcessStarter::Detach(FastOS_UNIX_Process *process) bool rc = true; pid_t pid; - auto guard = process->_app->getProcessGuard(); + process->_app->ProcessLock(); pid = process->GetProcessId(); if (pid == 0) { + process->_app->ProcessUnlock(); return false; // Cannot detach nonstarted process. } if (process->GetDeathFlag()) { + process->_app->ProcessUnlock(); return true; } @@ -1999,6 +2004,7 @@ bool FastOS_UNIX_ProcessStarter::Detach(FastOS_UNIX_Process *process) ReadBytes(_mainSocket, &returnCode, sizeof(int)); process->DeathNotification(returnCode); } + process->_app->ProcessUnlock(); return rc; } @@ -2038,4 +2044,4 @@ FastOS_UNIX_Process::DescriptorHandle::CloseHandleDirectChild() close(_fd); _fd = -1; } -} +}
\ No newline at end of file diff --git a/fastos/src/vespa/fastos/unix_process.h b/fastos/src/vespa/fastos/unix_process.h index bff5a1d276e..16614deb1a2 100644 --- a/fastos/src/vespa/fastos/unix_process.h +++ b/fastos/src/vespa/fastos/unix_process.h @@ -12,8 +12,8 @@ #include "app.h" #include <string> #include <memory> -#include <future> +class FastOS_BoolCond; class FastOS_UNIX_RealProcess; class FastOS_RingBuffer; @@ -78,7 +78,7 @@ public: { TYPE_READCOUNT = 3 }; - std::unique_ptr<std::promise<void>> _closing; + FastOS_BoolCond *_closing; FastOS_ProcessRedirectListener *GetListener (DescriptorType type) { if(type == TYPE_STDOUT) diff --git a/fnet/src/vespa/fnet/fnet.h b/fnet/src/vespa/fnet/fnet.h index 5a3a8b28942..7b8404376d7 100644 --- a/fnet/src/vespa/fnet/fnet.h +++ b/fnet/src/vespa/fnet/fnet.h @@ -3,6 +3,8 @@ #pragma once #include <vespa/vespalib/component/vtag.h> +#include <vespa/fastos/mutex.h> +#include <vespa/fastos/cond.h> // DEPRECATED diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_manager_initializer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_manager_initializer.cpp index 9feaa738bf1..c59e3718b1e 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_manager_initializer.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_manager_initializer.cpp @@ -46,14 +46,14 @@ public: class AttributeManagerInitializerTask : public vespalib::Executor::Task { - std::promise<bool> _promise; + std::promise<void> _promise; search::SerialNum _configSerialNum; DocumentMetaStore::SP _documentMetaStore; AttributeManager::SP _attrMgr; InitializedAttributesResult &_attributesResult; public: - AttributeManagerInitializerTask(std::promise<bool> &&promise, + AttributeManagerInitializerTask(std::promise<void> &&promise, search::SerialNum configSerialNum, DocumentMetaStore::SP documentMetaStore, AttributeManager::SP attrMgr, @@ -63,7 +63,7 @@ public: }; -AttributeManagerInitializerTask::AttributeManagerInitializerTask(std::promise<bool> &&promise, +AttributeManagerInitializerTask::AttributeManagerInitializerTask(std::promise<void> &&promise, search::SerialNum configSerialNum, DocumentMetaStore::SP documentMetaStore, AttributeManager::SP attrMgr, @@ -86,7 +86,7 @@ AttributeManagerInitializerTask::run() _attrMgr->addExtraAttribute(_documentMetaStore); _attrMgr->addInitializedAttributes(_attributesResult.get()); _attrMgr->pruneRemovedFields(_configSerialNum); - _promise.set_value(true); + _promise.set_value(); } class AttributeInitializerTasksBuilder : public IAttributeInitializerRegistry @@ -168,8 +168,8 @@ AttributeManagerInitializer::AttributeManagerInitializer(SerialNum configSerialN void AttributeManagerInitializer::run() { - std::promise<bool> promise; - std::future<bool> future = promise.get_future(); + std::promise<void> promise; + auto future = promise.get_future(); /* * Attribute manager and some its members (e.g. _attributeFieldWriter) assumes that work is performed * by document db master thread and lacks locking to handle calls from multiple threads. @@ -179,7 +179,7 @@ AttributeManagerInitializer::run() _documentMetaStore, _attrMgr, _attributesResult)); - (void) future.get(); + future.wait(); *_attrMgrResult = _attrMgr; } diff --git a/searchcore/src/vespa/searchcore/proton/initializer/task_runner.cpp b/searchcore/src/vespa/searchcore/proton/initializer/task_runner.cpp index 7deb0afa7af..770f00dc264 100644 --- a/searchcore/src/vespa/searchcore/proton/initializer/task_runner.cpp +++ b/searchcore/src/vespa/searchcore/proton/initializer/task_runner.cpp @@ -90,11 +90,11 @@ void TaskRunner::runTask(InitializerTask::SP task) { vespalib::ThreadStackExecutor executor(1, 128 * 1024); - std::promise<bool> promise; - std::future<bool> future = promise.get_future(); + std::promise<void> promise; + auto future = promise.get_future(); runTask(task, executor, - makeLambdaTask([&]() { promise.set_value(true); })); - (void) future.get(); + makeLambdaTask([&]() { promise.set_value(); })); + future.wait(); } void diff --git a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp index cb5c5de0f5a..5941ef13365 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp +++ b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp @@ -28,31 +28,31 @@ GidToLidChangeListener::~GidToLidChangeListener() void GidToLidChangeListener::notifyPutDone(document::GlobalId gid, uint32_t lid) { - std::promise<bool> promise; - std::future<bool> future = promise.get_future(); + std::promise<void> promise; + auto future = promise.get_future(); _attributeFieldWriter.executeLambda(_executorId, - [this, &promise, gid, lid]() { _attr->notifyReferencedPut(gid, lid); promise.set_value(true); }); - (void) future.get(); + [this, &promise, gid, lid]() { _attr->notifyReferencedPut(gid, lid); promise.set_value(); }); + future.wait(); } void GidToLidChangeListener::notifyRemove(document::GlobalId gid) { - std::promise<bool> promise; - std::future<bool> future = promise.get_future(); + std::promise<void> promise; + auto future = promise.get_future(); _attributeFieldWriter.executeLambda(_executorId, - [this, &promise, gid]() { _attr->notifyReferencedRemove(gid); promise.set_value(true); }); - (void) future.get(); + [this, &promise, gid]() { _attr->notifyReferencedRemove(gid); promise.set_value(); }); + future.wait(); } void GidToLidChangeListener::notifyRegistered() { - std::promise<bool> promise; - std::future<bool> future = promise.get_future(); + std::promise<void> promise; + auto future = promise.get_future(); _attributeFieldWriter.executeLambda(_executorId, - [this, &promise]() { _attr->populateReferencedLids(); promise.set_value(true); }); - (void) future.get(); + [this, &promise]() { _attr->populateReferencedLids(); promise.set_value(); }); + future.wait(); } const vespalib::string & diff --git a/searchcore/src/vespa/searchcore/proton/server/document_subdb_initializer.cpp b/searchcore/src/vespa/searchcore/proton/server/document_subdb_initializer.cpp index a9609bf0cc5..510be013c52 100644 --- a/searchcore/src/vespa/searchcore/proton/server/document_subdb_initializer.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/document_subdb_initializer.cpp @@ -29,10 +29,10 @@ addDocumentMetaStoreInitTask(InitTask::SP documentMetaStoreInitTask) void DocumentSubDbInitializer::run() { - std::promise<bool> promise; - std::future<bool> future = promise.get_future(); - _master.execute(makeLambdaTask([&]() { _subDB.setup(_result); promise.set_value(true); })); - (void) future.get(); + std::promise<void> promise; + auto future = promise.get_future(); + _master.execute(makeLambdaTask([&]() { _subDB.setup(_result); promise.set_value(); })); + future.wait(); } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp index 436cf3865e5..00360a9c405 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp @@ -179,10 +179,10 @@ ProtonConfigurer::applyInitialConfig(InitializeThreads initializeThreads) { // called by proton app main thread assert(!_executor.isCurrentThread()); - std::promise<bool> promise; - std::future<bool> future = promise.get_future(); - _executor.execute(makeLambdaTask([this, initializeThreads, &promise]() { applyConfig(getPendingConfigSnapshot(), initializeThreads, true); promise.set_value(true); })); - (void) future.get(); + std::promise<void> promise; + auto future = promise.get_future(); + _executor.execute(makeLambdaTask([this, initializeThreads, &promise]() { applyConfig(getPendingConfigSnapshot(), initializeThreads, true); promise.set_value(); })); + future.wait(); } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp index bba03621f8a..e78f8136d26 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp @@ -200,10 +200,10 @@ StoreOnlyDocSubDB::onReplayDone() _dms->shrinkLidSpace(); uint32_t docIdLimit = _dms->getCommittedDocIdLimit(); auto &docStore = _rSummaryMgr->getBackingStore(); - std::promise<bool> promise; - std::future<bool> future = promise.get_future(); - _writeService.summary().execute(makeLambdaTask([&]() { docStoreReplayDone(docStore, docIdLimit); promise.set_value(true); })); - (void) future.get(); + std::promise<void> promise; + auto future = promise.get_future(); + _writeService.summary().execute(makeLambdaTask([&]() { docStoreReplayDone(docStore, docIdLimit); promise.set_value(); })); + future.wait(); } diff --git a/searchlib/src/vespa/searchlib/common/threaded_compactable_lid_space.cpp b/searchlib/src/vespa/searchlib/common/threaded_compactable_lid_space.cpp index 531d7be38cc..079a1f493de 100644 --- a/searchlib/src/vespa/searchlib/common/threaded_compactable_lid_space.cpp +++ b/searchlib/src/vespa/searchlib/common/threaded_compactable_lid_space.cpp @@ -23,10 +23,10 @@ ThreadedCompactableLidSpace::~ThreadedCompactableLidSpace() void ThreadedCompactableLidSpace::compactLidSpace(uint32_t wantedDocLidLimit) { - std::promise<bool> promise; - std::future<bool> future = promise.get_future(); - _executor.executeLambda(_executorId, [this, wantedDocLidLimit, &promise]() { _target->compactLidSpace(wantedDocLidLimit); promise.set_value(true); }); - (void) future.get(); + std::promise<void> promise; + auto future = promise.get_future(); + _executor.executeLambda(_executorId, [this, wantedDocLidLimit, &promise]() { _target->compactLidSpace(wantedDocLidLimit); promise.set_value(); }); + future.wait(); } bool @@ -44,10 +44,10 @@ ThreadedCompactableLidSpace::getEstimatedShrinkLidSpaceGain() const void ThreadedCompactableLidSpace::shrinkLidSpace() { - std::promise<bool> promise; - std::future<bool> future = promise.get_future(); - _executor.executeLambda(_executorId, [this, &promise]() { _target->shrinkLidSpace(); promise.set_value(true); }); - (void) future.get(); + std::promise<void> promise; + auto future = promise.get_future(); + _executor.executeLambda(_executorId, [this, &promise]() { _target->shrinkLidSpace(); promise.set_value(); }); + future.wait(); } } |