summaryrefslogtreecommitdiffstats
path: root/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ApplicationMaintainer.java
blob: f729c30de20952d5565bd5fa6db0bf38bb5667c5 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.hosted.provision.maintenance;

import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.config.provision.ApplicationId;
import com.yahoo.config.provision.Deployer;
import com.yahoo.config.provision.Deployment;
import com.yahoo.log.LogLevel;
import com.yahoo.transaction.Mutex;
import com.yahoo.vespa.hosted.provision.Node;
import com.yahoo.vespa.hosted.provision.NodeRepository;

import java.time.Duration;
import java.time.Instant;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * Base class for maintainers which redeploy applications.
 *
 * Subclasses supply the nodes needing maintenance; the applications owning those nodes are
 * then redeployed one at a time on a single background thread.
 *
 * @author bratseth
 * @author mpolden
 */
public abstract class ApplicationMaintainer extends Maintainer {

    private final Deployer deployer;

    /** Applications which are queued for, or currently undergoing, deployment by this */
    private final CopyOnWriteArrayList<ApplicationId> pendingDeployments = new CopyOnWriteArrayList<>();

    // Use a fixed thread pool to avoid overload on config servers. Resource usage when deploying varies
    // a lot between applications, so doing one by one avoids issues where one or more resource-demanding
    // deployments happen simultaneously
    private final ThreadPoolExecutor deploymentExecutor = new ThreadPoolExecutor(1, 1,
                                                                                 0L, TimeUnit.MILLISECONDS,
                                                                                 new LinkedBlockingQueue<>(),
                                                                                 new DaemonThreadFactory("node repo application maintainer"));

    /**
     * Creates an application maintainer.
     *
     * @param deployer       the deployer used to create and look up deployments
     * @param nodeRepository the node repository passed on to the superclass
     * @param interval       the maintenance interval passed on to the superclass
     * @param jobControl     the job control passed on to the superclass
     */
    protected ApplicationMaintainer(Deployer deployer, NodeRepository nodeRepository, Duration interval, JobControl jobControl) {
        super(nodeRepository, interval, jobControl);
        this.deployer = deployer;
    }

    /** Requests deployment of each application returned by {@link #applicationsNeedingMaintenance()} */
    @Override
    protected final void maintain() {
        applicationsNeedingMaintenance().forEach(this::deploy);
    }

    /** Returns the number of deployments that are pending execution */
    public int pendingDeployments() {
        return pendingDeployments.size();
    }

    /** Returns whether given application should be deployed at this moment in time */
    protected boolean canDeployNow(ApplicationId application) {
        return true;
    }

    /**
     * Redeploy this application.
     *
     * The default implementation deploys asynchronously to make sure we do all applications timely
     * even when deployments are slow. An application which is already pending deployment is not
     * queued again.
     *
     * @param application the application to redeploy
     * @throws RuntimeException if the deployment could not be queued; the application is then
     *                          no longer counted as pending
     */
    protected void deploy(ApplicationId application) {
        if ( ! pendingDeployments.addIfAbsent(application)) return; // Avoid queuing multiple deployments for same application

        try {
            log.log(LogLevel.INFO, application + " will be deployed, last deploy time " +
                                   getLastDeployTime(application));
            deploymentExecutor.execute(() -> deployWithLock(application));
        } catch (RuntimeException e) {
            // The task was never queued, so deployWithLock will not run and clear the pending entry.
            // Clear it here, or this application would be considered pending forever and never retried.
            pendingDeployments.remove(application);
            throw e;
        }
    }

    /** Returns the deployer used by this */
    protected Deployer deployer() { return deployer; }

    /**
     * Returns the owners of the nodes returned by {@link #nodesNeedingMaintenance()},
     * without duplicates and in the order they are first encountered.
     *
     * Every such node must have an allocation; an unallocated node causes an exception here.
     */
    protected Set<ApplicationId> applicationsNeedingMaintenance() {
        return nodesNeedingMaintenance().stream()
                                        .map(node -> node.allocation().get().owner())
                                        .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    /**
     * Returns the nodes whose applications should be maintained by this now.
     * This should be some subset of the allocated nodes.
     */
    protected abstract List<Node> nodesNeedingMaintenance();

    /**
     * Redeploy this application. A lock will be taken for the duration of the deployment activation.
     *
     * Nothing is done if the application no longer has active nodes, if {@link #canDeployNow} returns
     * false, or if the deployer returns no deployment. Runtime exceptions are logged and not propagated.
     * The application is always removed from the set of pending deployments when this returns.
     *
     * @param application the application to redeploy
     */
    protected final void deployWithLock(ApplicationId application) {
        // An application might change its state between the time the set of applications is retrieved and the
        // time deployment happens. Lock the application and check if it's still active.
        //
        // Lock is acquired with a low timeout to reduce the chance of colliding with an external deployment.
        try (Mutex lock = nodeRepository().lock(application, Duration.ofSeconds(1))) {
            if ( ! isActive(application)) return; // became inactive since deployment was requested
            if ( ! canDeployNow(application)) return; // redeployment is no longer needed
            Optional<Deployment> deployment = deployer.deployFromLocalActive(application);
            if ( ! deployment.isPresent()) return; // this will be done at another config server
            log.log(LogLevel.DEBUG, this.getClass().getSimpleName() + " deploying " + application);
            deployment.get().activate();
        } catch (RuntimeException e) {
            log.log(LogLevel.WARNING, "Exception on maintenance redeploy", e);
        } finally {
            // Runs on every exit path above, so the application can be queued again later
            pendingDeployments.remove(application);
        }
    }

    /** Returns the last time application was deployed. Epoch is returned if the application has never been deployed. */
    protected final Instant getLastDeployTime(ApplicationId application) {
        return deployer.lastDeployTime(application).orElse(Instant.EPOCH);
    }

    /** Returns true when application has at least one active node */
    private boolean isActive(ApplicationId application) {
        return ! nodeRepository().getNodes(application, Node.State.active).isEmpty();
    }

    /**
     * Shuts down the deployment executor and waits up to one minute for it to terminate.
     *
     * @throws RuntimeException wrapping the InterruptedException if the wait is interrupted;
     *                          the interrupt status of the calling thread is restored first
     */
    @Override
    public void deconstruct() {
        super.deconstruct();
        this.deploymentExecutor.shutdownNow();
        try {
            // Give deployments in progress some time to complete
            this.deploymentExecutor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the interrupt status; restore it so code further
            // up the stack can still observe that this thread was interrupted
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

}