aboutsummaryrefslogtreecommitdiffstats
path: root/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/status/ZkStatusService.java
blob: b6dc1b20fc6a755eb1a6e992e47f167e34309cdd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.orchestrator.status;

import ai.vespa.metrics.ConfigServerMetrics;
import com.yahoo.concurrent.UncheckedTimeoutException;
import com.yahoo.config.provision.ApplicationId;
import com.yahoo.jdisc.Metric;
import com.yahoo.jdisc.Timer;
import com.yahoo.path.Path;
import com.yahoo.vespa.applicationmodel.ApplicationInstanceReference;
import com.yahoo.vespa.applicationmodel.HostName;
import com.yahoo.vespa.curator.Curator;
import com.yahoo.vespa.curator.Lock;
import com.yahoo.vespa.orchestrator.OrchestratorContext;
import com.yahoo.vespa.orchestrator.OrchestratorUtil;
import com.yahoo.vespa.service.monitor.AntiServiceMonitor;
import com.yahoo.vespa.service.monitor.CriticalRegion;
import org.apache.zookeeper.data.Stat;

import javax.inject.Inject;
import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Stores instance suspension status and which hosts are allowed to go down in zookeeper.
 *
 * <p>Application-level suspension is represented by the existence of a node below
 * {@link #APPLICATION_STATUS_BASE_PATH}; host-level information is delegated to the
 * {@link HostInfosCache}. Mutating operations are guarded by a per-application lock
 * located below {@link #HOST_STATUS_BASE_PATH}.</p>
 *
 * TODO: expiry of old application instances
 * @author Tony Vaagenes
 */
public class ZkStatusService implements StatusService {

    private static final Logger log = Logger.getLogger(ZkStatusService.class.getName());

    static final String HOST_STATUS_BASE_PATH = "/vespa/host-status-service";
    static final String APPLICATION_STATUS_BASE_PATH = "/vespa/application-status-service";

    private final Curator curator;
    private final HostInfosCache hostInfosCache;
    private final Metric metric;
    private final Timer timer;
    private final AntiServiceMonitor antiServiceMonitor;

    /**
     * A cache of metric contexts for each possible dimension map. In practice, there is one dimension map
     * for each application, so up to hundreds of elements.
     */
    private final ConcurrentHashMap<Map<String, String>, Metric.Context> cachedContexts = new ConcurrentHashMap<>();

    /** Creates a status service with a host infos cache backed by the given curator and timer. */
    @Inject
    public ZkStatusService(Curator curator, Metric metric, Timer timer, AntiServiceMonitor antiServiceMonitor) {
        this(curator, metric, timer,
             new HostInfosCache(curator, new HostInfosServiceImpl(curator, timer)),
             antiServiceMonitor);
    }

    /** Non-private for testing only. */
    ZkStatusService(Curator curator, Metric metric, Timer timer, HostInfosCache hostInfosCache,
                    AntiServiceMonitor antiServiceMonitor) {
        this.curator = curator;
        this.metric = metric;
        this.timer = timer;
        this.hostInfosCache = hostInfosCache;
        this.antiServiceMonitor = antiServiceMonitor;
    }

    /**
     * Lists the applications that have a node below {@link #APPLICATION_STATUS_BASE_PATH}.
     *
     * @return a mutable set with one reference per child node; empty if the base path does not exist
     * @throws RuntimeException wrapping any exception thrown while reading or parsing the children
     */
    @Override
    public Set<ApplicationInstanceReference> getAllSuspendedApplications() {
        try {
            Set<ApplicationInstanceReference> resultSet = new HashSet<>();

            // Return empty set if the base path does not exist
            Stat stat = curator.framework().checkExists().forPath(APPLICATION_STATUS_BASE_PATH);
            if (stat == null) return resultSet;

            // The path exists and we may have children
            for (String referenceString : curator.framework().getChildren().forPath(APPLICATION_STATUS_BASE_PATH)) {
                ApplicationInstanceReference reference = OrchestratorUtil.parseApplicationInstanceReference(referenceString);
                resultSet.add(reference);
            }

            return resultSet;
        } catch (Exception e) {
            // Logged at FINE only, since the exception is propagated to the caller.
            log.log(Level.FINE, "Something went wrong while listing out applications in suspend.", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Cache is checked for freshness when this mapping is created, and may be invalidated again later
     * by other users of the cache. Since this function is backed by the cache, any such invalidation
     * will be reflected in the returned mapping; all users of the cache collaborate in repopulating it.
     */
    @Override
    public Function<ApplicationInstanceReference, HostInfos> getHostInfosByApplicationResolver() {
        hostInfosCache.refreshCache();
        return hostInfosCache::getCachedHostInfos;
    }


    /**
     *  1) locks the status service for an application instance.
     *  2) fails all operations in this thread when the session is lost,
     *     since session loss might cause the lock to be lost.
     *     Since it only fails operations in this thread,
     *     all operations depending on a lock, including the locking itself, must be done in this thread.
     *     Note that since it is the thread that fails, all status operations in this thread will fail
     *     even if they're not supposed to be guarded by this lock
     *     (i.e. the request is for another applicationInstanceReference)
     *
     * @param context   the context of the operation; supplies the acquire timeout and probe flag,
     *                  and may already hold (or take over ownership of) the lock
     * @param reference the application instance to lock
     * @return a lock handle for the application
     * @throws UncheckedTimeoutException declared by this method and by the lock acquisition it delegates to
     */
    @Override
    public ApplicationLock lockApplication(OrchestratorContext context, ApplicationInstanceReference reference)
            throws UncheckedTimeoutException {

        Runnable onRegistryClose;

        // A multi-application operation, aka batch suspension, will first issue a probe
        // then a non-probe. With "large locks", the lock is not released in between -
        // no lock is taken on the non-probe. Instead, the release is done on the multi-application
        // context close.
        if (context.hasLock(reference)) {
            onRegistryClose = () -> {};
        } else {
            Runnable unlock = acquireLock(context, reference);
            if (context.registerLockAcquisition(reference, unlock)) {
                // The context has taken over the unlock callback: nothing to do when the returned lock closes.
                onRegistryClose = () -> {};
            } else {
                onRegistryClose = unlock;
            }
        }

        try {
            return new ZkApplicationLock(
                    this,
                    curator,
                    onRegistryClose,
                    reference,
                    context.isProbe(),
                    hostInfosCache);
        } catch (Throwable t) {
            // In case the constructor throws an exception.
            onRegistryClose.run();
            throw t;
        }
    }

    /**
     * Acquires the application lock, waiting at most the time left in the context, and reports
     * acquisition metrics whether or not the acquisition succeeded.
     *
     * @return a callback that closes the lock, closes the critical region entered here,
     *         and reports the lock hold latency
     * @throws UncheckedTimeoutException declared for the underlying lock acquisition
     */
    private Runnable acquireLock(OrchestratorContext context, ApplicationInstanceReference reference)
            throws UncheckedTimeoutException {
        ApplicationId applicationId = OrchestratorUtil.toApplicationId(reference);
        String app = applicationId.application().value() + "." + applicationId.instance().value();
        Map<String, String> dimensions = Map.of(
                "tenantName", applicationId.tenant().value(),
                "applicationId", applicationId.toFullString(),
                "app", app);
        Metric.Context metricContext = cachedContexts.computeIfAbsent(dimensions, metric::createContext);

        Duration duration = context.getTimeLeft();
        String lockPath = applicationInstanceLock2Path(reference);
        Lock lock = new Lock(lockPath, curator);

        Instant startTime = timer.currentTime();
        Instant acquireEndTime;
        boolean lockAcquired = false;
        try {
            lock.acquire(duration);
            lockAcquired = true;
        } finally {
            // Runs on both success and failure, so the metrics below cover every acquire attempt.
            acquireEndTime = timer.currentTime();
            double seconds = durationInSeconds(startTime, acquireEndTime);
            // TODO: These metrics are redundant with Lock's metrics
            metric.set(ConfigServerMetrics.ORCHESTRATOR_LOCK_ACQUIRE_LATENCY.baseName(), seconds, metricContext);
            metric.set(ConfigServerMetrics.ORCHESTRATOR_LOCK_ACQUIRED.baseName(), lockAcquired ? 1 : 0, metricContext);

            metric.add(ConfigServerMetrics.ORCHESTRATOR_LOCK_ACQUIRE.baseName(), 1, metricContext);
            String acquireResultMetricName = lockAcquired
                    ? ConfigServerMetrics.ORCHESTRATOR_LOCK_ACQUIRE_SUCCESS.baseName()
                    : ConfigServerMetrics.ORCHESTRATOR_LOCK_ACQUIRE_TIMEOUT.baseName();
            metric.add(acquireResultMetricName, 1, metricContext);
        }

        CriticalRegion inaccessibleDuperModelRegion;
        try {
            inaccessibleDuperModelRegion = antiServiceMonitor
                    .disallowDuperModelLockAcquisition(ZkStatusService.class.getSimpleName() + " application lock");
        } catch (RuntimeException | Error e) {
            // The lock is held at this point, but the caller never receives the unlock callback
            // when we throw: release the lock here to avoid leaking it.
            try {
                lock.close();
            } catch (RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }

        return () -> {
            try {
                lock.close();
            } catch (RuntimeException e) {
                // We may want to avoid logging some exceptions that may be expected, like when session expires.
                log.log(Level.WARNING,
                        "Failed to close application lock for " +
                                ZkStatusService.class.getSimpleName() + ", will ignore and continue",
                        e);
            }

            inaccessibleDuperModelRegion.close();

            // Hold time is measured from the end of the acquire attempt until the release completed.
            Instant lockReleasedTime = timer.currentTime();
            double seconds = durationInSeconds(acquireEndTime, lockReleasedTime);
            metric.set(ConfigServerMetrics.ORCHESTRATOR_LOCK_HOLD_LATENCY.baseName(), seconds, metricContext);
        };
    }

    /** Returns the time between the two instants in seconds, with millisecond resolution. */
    private static double durationInSeconds(Instant startInstant, Instant endInstant) {
        return Duration.between(startInstant, endInstant).toMillis() / 1000.0;
    }

    /** Returns the host info of the given host, or the "no remarks" value the host infos provide for it. */
    @Override
    public HostInfo getHostInfo(ApplicationInstanceReference reference, HostName hostName) {
        return hostInfosCache.getHostInfos(reference).getOrNoRemarks(hostName);
    }

    /**
     * Returns {@code ALLOWED_TO_BE_DOWN} if the suspended-path of the application exists in ZooKeeper,
     * and {@code NO_REMARKS} otherwise.
     *
     * @throws RuntimeException wrapping any exception thrown by the existence check
     */
    @Override
    public ApplicationInstanceStatus getApplicationInstanceStatus(ApplicationInstanceReference reference) {
        try {
            Stat statOrNull = curator.framework().checkExists().forPath(
                    applicationInstanceSuspendedPath(reference));

            return (statOrNull == null) ? ApplicationInstanceStatus.NO_REMARKS : ApplicationInstanceStatus.ALLOWED_TO_BE_DOWN;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Remove all host-related data in ZooKeeper for all hostnames outside the given set.
     *
     * <p>Failures to take the lock or to clean up are logged and not propagated.</p>
     */
    @Override
    public void onApplicationActivate(ApplicationInstanceReference reference, Set<HostName> hostnames) {
        withLockForAdminOp(reference, " was activated", () -> {
            HostInfos hostInfos = hostInfosCache.getCachedHostInfos(reference);
            // Copy before removing, so the set held by hostInfos is not modified.
            Set<HostName> toRemove = new HashSet<>(hostInfos.getZkHostnames());
            toRemove.removeAll(hostnames);
            if (!toRemove.isEmpty()) {
                hostInfosCache.removeHosts(reference, toRemove);
            }
        });
    }

    /**
     * Remove the application from ZooKeeper.
     *
     * <ol>
     *     <li>/vespa/host-status/APPLICATION_ID (should just be ./hosts/*)</li>
     *     <li>/vespa/host-status-service/REFERENCE/hosts-allowed-down  (should just be ./*)</li>
     *     <li>/vespa/application-status-service/REFERENCE  (should just be .)</li>
     * </ol>
     *
     * <p>Failures to take the lock or to clean up are logged and not propagated.</p>
     */
    @Override
    public void onApplicationRemove(ApplicationInstanceReference reference) {
        withLockForAdminOp(reference, " was removed", () -> {
            // /vespa/application-status-service/REFERENCE
            curator.delete(Path.fromString(applicationInstanceSuspendedPath(reference)));

            // /vespa/host-status-service/REFERENCE/hosts-allowed-down
            curator.delete(Path.fromString(hostsAllowedDownPath(reference)));

            // /vespa/host-status/APPLICATION_ID
            hostInfosCache.removeApplication(reference);
        });
    }

    /**
     * Runs the given task while holding the application lock, taken with an admin-operation context.
     * Runtime exceptions from taking the lock, from the task, or from releasing the lock are logged
     * (including the stack trace) and swallowed, so this method is best-effort.
     *
     * @param reference        the application to lock
     * @param eventDescription text appended directly after the reference in log messages
     * @param runnable         the task to run under the lock
     */
    private void withLockForAdminOp(ApplicationInstanceReference reference,
                                    String eventDescription,
                                    Runnable runnable) {
        OrchestratorContext context = OrchestratorContext.createContextForAdminOp(timer.toUtcClock());

        final ApplicationLock lock;
        try {
            lock = lockApplication(context, reference);
        } catch (RuntimeException e) {
            // Pass the exception itself to the logger to keep the stack trace and cause chain.
            log.log(Level.SEVERE, "Failed to get Orchestrator lock on when " + reference +
                    eventDescription + ": " + e.getMessage(), e);
            return;
        }

        try (lock) {
            runnable.run();
        } catch (RuntimeException e) {
            log.log(Level.SEVERE, "Failed to clean up after " + reference + eventDescription +
                    ": " + e.getMessage(), e);
        }
    }

    /** Returns the ZooKeeper path below {@link #HOST_STATUS_BASE_PATH} for the given application. */
    static String applicationInstanceReferencePath(ApplicationInstanceReference reference) {
        return HOST_STATUS_BASE_PATH + '/' + reference.asString();
    }

    /** Returns the "hosts-allowed-down" path of the given application. */
    private static String hostsAllowedDownPath(ApplicationInstanceReference reference) {
        return applicationInstanceReferencePath(reference) + "/hosts-allowed-down";
    }

    /** Returns the path of the application lock ("lock2") of the given application. */
    private static String applicationInstanceLock2Path(ApplicationInstanceReference reference) {
        return applicationInstanceReferencePath(reference) + "/lock2";
    }

    /** Returns the path below {@link #APPLICATION_STATUS_BASE_PATH} whose existence marks the application as suspended. */
    String applicationInstanceSuspendedPath(ApplicationInstanceReference reference) {
        return APPLICATION_STATUS_BASE_PATH + "/" + OrchestratorUtil.toRestApiFormat(reference);
    }

    /** Returns the path of the given host below the "hosts-allowed-down" path. Not referenced within this class. */
    private static String hostAllowedDownPath(ApplicationInstanceReference reference, HostName hostname) {
        return hostsAllowedDownPath(reference) + '/' + hostname.s();
    }

}