aboutsummaryrefslogtreecommitdiffstats
path: root/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/status/ZookeeperStatusService.java
blob: c56ff661bba8d04985720445b237c67fdaf2e5cb (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.orchestrator.status;

import com.yahoo.container.jaxrs.annotation.Component;
import com.yahoo.log.LogLevel;
import com.yahoo.vespa.applicationmodel.ApplicationInstanceReference;
import com.yahoo.vespa.applicationmodel.HostName;
import com.yahoo.vespa.curator.Curator;
import com.yahoo.vespa.curator.Lock;
import com.yahoo.vespa.orchestrator.OrchestratorContext;
import com.yahoo.vespa.orchestrator.OrchestratorUtil;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.data.Stat;

import javax.inject.Inject;
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;

/**
 * Stores instance suspension status and which hosts are allowed to go down in zookeeper.
 *
 * TODO: expiry of old application instances
 * @author Tony Vaagenes
 */
public class ZookeeperStatusService implements StatusService {

    private static final Logger log = Logger.getLogger(ZookeeperStatusService.class.getName());

    final static String HOST_STATUS_BASE_PATH = "/vespa/host-status-service";
    final static String APPLICATION_STATUS_BASE_PATH = "/vespa/application-status-service";

    private final Curator curator;

    @Inject
    public ZookeeperStatusService(@Component Curator curator) {
        this.curator = curator;
    }

    @Override
    public ReadOnlyStatusRegistry forApplicationInstance(ApplicationInstanceReference applicationInstanceReference) {
        return new ReadOnlyStatusRegistry() {
            @Override
            public HostStatus getHostStatus(HostName hostName) {
                return getInternalHostStatus(applicationInstanceReference, hostName);
            }

            @Override
            public ApplicationInstanceStatus getApplicationInstanceStatus() {
                return getInternalApplicationInstanceStatus(applicationInstanceReference);
            }
        };
    }

    @Override
    public Set<ApplicationInstanceReference> getAllSuspendedApplications() {
        try {
            Set<ApplicationInstanceReference> resultSet = new HashSet<>();

            // Return empty set if the base path does not exist
            Stat stat = curator.framework().checkExists().forPath(APPLICATION_STATUS_BASE_PATH);
            if (stat == null) return resultSet;

            // The path exist and we may have children
            for (String appRefStr : curator.framework().getChildren().forPath(APPLICATION_STATUS_BASE_PATH)) {
                ApplicationInstanceReference appRef = OrchestratorUtil.parseAppInstanceReference(appRefStr);
                resultSet.add(appRef);
            }

            return resultSet;
        } catch (Exception e) {
            log.log(LogLevel.DEBUG, "Something went wrong while listing out applications in suspend.", e);
            throw new RuntimeException(e);
        }
    }

    /**
     *  1) locks the status service for an application instance.
     *  2) fails all operations in this thread when the session is lost,
     *     since session loss might cause the lock to be lost.
     *     Since it only fails operations in this thread,
     *     all operations depending on a lock, including the locking itself, must be done in this thread.
     *     Note that since it is the thread that fails, all status operations in this thread will fail
     *     even if they're not supposed to be guarded by this lock
     *     (i.e. the request is for another applicationInstanceReference)
     */
    @Override
    public MutableStatusRegistry lockApplicationInstance_forCurrentThreadOnly(
            OrchestratorContext context,
            ApplicationInstanceReference applicationInstanceReference) {
        Duration duration = context.getTimeLeft();
        String lockPath = applicationInstanceLock2Path(applicationInstanceReference);
        Lock lock = new Lock(lockPath, curator);
        lock.acquire(duration);

        try {
            return new ZkMutableStatusRegistry(lock, applicationInstanceReference, context.isProbe());
        } catch (Throwable t) {
            // In case the constructor throws an exception.
            lock.close();
            throw t;
        }
    }

    private InterProcessSemaphoreMutex acquireMutexOrThrow(long timeout, TimeUnit timeoutTimeUnit, String lockPath) throws Exception {
        InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(curator.framework(), lockPath);

        log.log(LogLevel.DEBUG, "Waiting for lock on " + lockPath);
        boolean acquired = mutex.acquire(timeout, timeoutTimeUnit);
        if (!acquired) {
            log.log(LogLevel.DEBUG, "Timed out waiting for lock on " + lockPath);
            throw new TimeoutException("Timed out waiting for lock on " + lockPath);
        }
        log.log(LogLevel.DEBUG, "Successfully acquired lock on " + lockPath);
        return mutex;
    }

    private void setHostStatus(ApplicationInstanceReference applicationInstanceReference,
                               HostName hostName,
                               HostStatus status) {
        String path = hostAllowedDownPath(applicationInstanceReference, hostName);

        try {
            switch (status) {
                case NO_REMARKS:
                    deleteNode_ignoreNoNodeException(path,"Host already has state NO_REMARKS, path = " + path);
                    break;
                case ALLOWED_TO_BE_DOWN:
                    createNode_ignoreNodeExistsException(path,
                                                         "Host already has state ALLOWED_TO_BE_DOWN, path = " + path);
            }
        } catch (Exception e) {
            //TODO: IOException with explanation
            throw new RuntimeException(e);
        }
    }

    private void deleteNode_ignoreNoNodeException(String path, String debugLogMessageIfNotExists) throws Exception {
        try {
            curator.framework().delete().forPath(path);
        } catch (NoNodeException e) {
            log.log(LogLevel.DEBUG, debugLogMessageIfNotExists, e);
        }
    }

    private void createNode_ignoreNodeExistsException(String path, String debugLogMessageIfExists) throws Exception {
        try {
            curator.framework().create()
                    .creatingParentsIfNeeded()
                    .forPath(path);
        } catch (NodeExistsException e) {
            log.log(LogLevel.DEBUG, debugLogMessageIfExists, e);
        }
    }

    //TODO: Eliminate repeated calls to getHostStatus, replace with bulk operation.
    private HostStatus getInternalHostStatus(ApplicationInstanceReference applicationInstanceReference, HostName hostName) {
        try {
            Stat statOrNull = curator.framework().checkExists().forPath(
                    hostAllowedDownPath(applicationInstanceReference, hostName));

            return (statOrNull == null) ? HostStatus.NO_REMARKS : HostStatus.ALLOWED_TO_BE_DOWN;
        } catch (Exception e) {
            //TODO: IOException with explanation - Should we only catch IOExceptions or are they a special case?
            throw new RuntimeException(e);
        }
    }

    /** Common implementation for the two internal classes that sets ApplicationInstanceStatus. */
    private ApplicationInstanceStatus getInternalApplicationInstanceStatus(ApplicationInstanceReference applicationInstanceReference) {
        try {
            Stat statOrNull = curator.framework().checkExists().forPath(
                    applicationInstanceSuspendedPath(applicationInstanceReference));

            return (statOrNull == null) ? ApplicationInstanceStatus.NO_REMARKS : ApplicationInstanceStatus.ALLOWED_TO_BE_DOWN;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private HostStatus getHostStatusWithLock(
            final ApplicationInstanceReference applicationInstanceReference,
            final HostName hostName) {
        return getInternalHostStatus(applicationInstanceReference, hostName);
    }

    private static String applicationInstancePath(ApplicationInstanceReference applicationInstanceReference) {
        return HOST_STATUS_BASE_PATH + '/' +
                applicationInstanceReference.tenantId() + ":" + applicationInstanceReference.applicationInstanceId();
    }

    private static String hostsAllowedDownPath(ApplicationInstanceReference applicationInstanceReference) {
        return applicationInstancePath(applicationInstanceReference) + "/hosts-allowed-down";
    }

    private static String applicationInstanceLockPath(ApplicationInstanceReference applicationInstanceReference) {
        return applicationInstancePath(applicationInstanceReference) + "/lock";
    }

    private static String applicationInstanceLock2Path(ApplicationInstanceReference applicationInstanceReference) {
        return applicationInstancePath(applicationInstanceReference) + "/lock2";
    }

    private String applicationInstanceSuspendedPath(ApplicationInstanceReference applicationInstanceReference) {
        return APPLICATION_STATUS_BASE_PATH + "/" + OrchestratorUtil.toRestApiFormat(applicationInstanceReference);
    }

    private static String hostAllowedDownPath(ApplicationInstanceReference applicationInstanceReference, HostName hostname) {
        return hostsAllowedDownPath(applicationInstanceReference) + '/' + hostname.s();
    }

    private class ZkMutableStatusRegistry implements MutableStatusRegistry {

        private final Lock lock;
        private final ApplicationInstanceReference applicationInstanceReference;
        private final boolean probe;

        public ZkMutableStatusRegistry(Lock lock,
                                       ApplicationInstanceReference applicationInstanceReference,
                                       boolean probe) {
            this.lock = lock;
            this.applicationInstanceReference = applicationInstanceReference;
            this.probe = probe;
        }

        @Override
        public void setHostState(final HostName hostName, final HostStatus status) {
            if (probe) return;
            log.log(LogLevel.INFO, "Setting host " + hostName + " to status " + status);
            setHostStatus(applicationInstanceReference, hostName, status);
        }

        @Override
        public void setApplicationInstanceStatus(ApplicationInstanceStatus applicationInstanceStatus) {
            if (probe) return;

            log.log(LogLevel.INFO, "Setting app " + applicationInstanceReference.asString() + " to status " + applicationInstanceStatus);

            String path = applicationInstanceSuspendedPath(applicationInstanceReference);
            try {
                switch (applicationInstanceStatus) {
                    case NO_REMARKS:
                        deleteNode_ignoreNoNodeException(path,
                                "Instance is already in state NO_REMARKS, path = " + path);
                        break;
                    case ALLOWED_TO_BE_DOWN:
                        createNode_ignoreNodeExistsException(path,
                                "Instance is already in state ALLOWED_TO_BE_DOWN, path = " + path);
                        break;
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public HostStatus getHostStatus(final HostName hostName) {
            return getHostStatusWithLock(applicationInstanceReference, hostName);
        }

        @Override
        public ApplicationInstanceStatus getApplicationInstanceStatus() {
            return getInternalApplicationInstanceStatus(applicationInstanceReference);
        }

        @Override
        @NoThrow
        public void close()  {
            try {
                lock.close();
            } catch (RuntimeException e) {
                // We may want to avoid logging some exceptions that may be expected, like when session expires.
                log.log(LogLevel.WARNING,
                        "Failed to close application lock for " +
                        ZookeeperStatusService.class.getSimpleName() + ", will ignore and continue",
                        e);
            }
        }
    }

}