aboutsummaryrefslogtreecommitdiffstats
path: root/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabase.java
blob: e17fda76a01b237a3dcb65e7a3f04f717e827f34 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.server.application;

import com.yahoo.config.provision.ApplicationId;
import com.yahoo.config.provision.TenantName;
import com.yahoo.path.Path;
import com.yahoo.slime.Cursor;
import com.yahoo.slime.Inspector;
import com.yahoo.slime.Slime;
import com.yahoo.slime.SlimeUtils;
import com.yahoo.text.Utf8;
import com.yahoo.transaction.Transaction;
import com.yahoo.vespa.config.server.application.ApplicationReindexing.Cluster;
import com.yahoo.vespa.config.server.application.ApplicationReindexing.Status;
import com.yahoo.vespa.config.server.tenant.TenantRepository;
import com.yahoo.vespa.curator.Curator;
import com.yahoo.vespa.curator.Lock;
import com.yahoo.vespa.curator.transaction.CuratorOperations;
import com.yahoo.vespa.curator.transaction.CuratorTransaction;
import com.yahoo.yolean.Exceptions;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ExecutorService;
import java.util.function.UnaryOperator;

import static com.yahoo.vespa.curator.transaction.CuratorOperations.setData;
import static java.util.stream.Collectors.toUnmodifiableMap;

/**
 * Stores data and holds locks for the applications of a tenant, backed by a {@link Curator}.
 *
 * Each application is stored under /config/v2/tenants/<tenant>/applications/<application>,
 * the root contains the currently active session, if any. Children of this node may hold more data.
 * Locks for synchronising writes to these paths, and changes to the config of this application, are found
 * under /config/v2/tenants/<tenant>/locks/<application>.
 *
 * @author jonmv
 */
public class ApplicationCuratorDatabase {

    final TenantName tenant;
    final Path applicationsPath;
    final Path locksPath;

    private final Curator curator;

    public ApplicationCuratorDatabase(TenantName tenant, Curator curator) {
        this.tenant = tenant;
        this.applicationsPath = TenantRepository.getApplicationsPath(tenant);
        this.locksPath = TenantRepository.getLocksPath(tenant);
        this.curator = curator;
    }

    /** Returns the lock for changing the session status of the given application. */
    public Lock lock(ApplicationId id) {
        return curator.lock(lockPath(id), Duration.ofMinutes(1)); // These locks shouldn't be held for very long.
    }

    /** Reads, modifies and writes the application reindexing for this application, while holding its lock. */
    public void modifyReindexing(ApplicationId id, ApplicationReindexing emptyValue, UnaryOperator<ApplicationReindexing> modifications) {
        try (Lock lock = curator.lock(reindexingLockPath(id), Duration.ofMinutes(1))) {
            writeReindexingStatus(id, modifications.apply(readReindexingStatus(id).orElse(emptyValue)));
        }
    }

    public boolean exists(ApplicationId id) {
        return curator.exists(applicationPath(id));
    }

    /**
     * Creates a node for the given application, marking its existence.
     */
    public void createApplication(ApplicationId id) {
        if ( ! id.tenant().equals(tenant))
            throw new IllegalArgumentException("Cannot write application id '" + id + "' for tenant '" + tenant + "'");

        try (Lock lock = lock(id)) {
            if (curator.exists(applicationPath(id))) return;

            var applicationData = new ApplicationData(id, OptionalLong.empty(), OptionalLong.empty());
            curator.set(applicationPath(id), applicationData.toJson());
            modifyReindexing(id, ApplicationReindexing.empty(), UnaryOperator.identity());
        }
    }

    /**
     * Creates a node for the given application, marking its existence.
     */
    // TODO: Remove in Vespa 9
    public void createApplicationInOldFormat(ApplicationId id) {
        if (! id.tenant().equals(tenant))
            throw new IllegalArgumentException("Cannot write application id '" + id + "' for tenant '" + tenant + "'");

        try (Lock lock = lock(id)) {
            if (curator.exists(applicationPath(id))) return;

            curator.create(applicationPath(id));
            modifyReindexing(id, ApplicationReindexing.empty(), UnaryOperator.identity());
        }
    }

    /**
     * Returns a transaction which writes the given session id as the currently active for the given application.
     *
     * @param applicationId An {@link ApplicationId} that represents an active application.
     * @param sessionId     session id belonging to the application package for this application id.
     */
    public Transaction createWriteActiveTransaction(Transaction transaction, ApplicationId applicationId, long sessionId) {
        String path = applicationPath(applicationId).getAbsolute();
        return transaction.add(setData(path, new ApplicationData(applicationId, OptionalLong.of(sessionId), OptionalLong.of(sessionId)).toJson()));
    }

    /**
     * Returns a transaction which writes the given session id as the currently active for the given application.
     *
     * @param applicationId An {@link ApplicationId} that represents an active application.
     * @param sessionId session id belonging to the application package for this application id.
     */
    // TODO: Remove in Vespa 9
    public Transaction createWriteActiveTransactionInOldFormat(Transaction transaction, ApplicationId applicationId, long sessionId) {
        String path = applicationPath(applicationId).getAbsolute();
        return transaction.add(setData(path, Utf8.toAsciiBytes(sessionId)));
    }

    /**
     * Returns a transaction which writes the given session id as the currently active for the given application.
     *
     * @param applicationId An {@link ApplicationId} that represents an active application.
     * @param sessionId session id belonging to the application package for this application id.
     */
    public Transaction createWritePrepareTransaction(Transaction transaction,
                                                     ApplicationId applicationId,
                                                     long sessionId,
                                                     OptionalLong activeSessionId) {
        // Needs to read or be supplied current active session id, to avoid overwriting a newer session id.
        String path = applicationPath(applicationId).getAbsolute();
        return transaction.add(setData(path, new ApplicationData(applicationId, activeSessionId, OptionalLong.of(sessionId)).toJson()));
    }

    /**
     * Returns a transaction which deletes this application.
     */
    public CuratorTransaction createDeleteTransaction(ApplicationId applicationId) {
        return CuratorTransaction.from(CuratorOperations.deleteAll(applicationPath(applicationId).getAbsolute(), curator), curator);
    }

    /**
     * Returns the active session id for the given application.
     * Returns Optional.empty() if application not found or no active session exists.
     */
    public Optional<Long> activeSessionOf(ApplicationId id) {
        return applicationData(id).flatMap(ApplicationData::activeSession);
    }

    /**
     * Returns application data for the given application.
     * Returns Optional.empty() if application not found or no application data exists.
     */
    public Optional<ApplicationData> applicationData(ApplicationId id) {
        Optional<byte[]> data = curator.getData(applicationPath(id));
        if (data.isEmpty() || data.get().length == 0) return Optional.empty();

        try {
            return Optional.of(ApplicationData.fromBytes(data.get()));
        } catch (IllegalArgumentException e) {
            return applicationDataOldFormat(id);
        }
    }

    /**
     * Returns application data for the given application.
     * Returns Optional.empty() if application not found or no application data exists.
     */
    public Optional<ApplicationData> applicationDataOldFormat(ApplicationId id) {
        Optional<byte[]> data = curator.getData(applicationPath(id));
        if (data.isEmpty() || data.get().length == 0) return Optional.empty();

        return Optional.of(new ApplicationData(id,
                                               OptionalLong.of(data.map(bytes -> Long.parseLong(Utf8.toString(bytes))).get()),
                                               OptionalLong.empty()));
    }

    /**
     * List the active applications of a tenant in this config server.
     *
     * @return a list of {@link ApplicationId}s that are active.
     */
    public List<ApplicationId> activeApplications() {
        return curator.getChildren(applicationsPath).stream()
                      .sorted()
                      .map(ApplicationId::fromSerializedForm)
                      .filter(id -> activeSessionOf(id).isPresent())
                      .toList();
    }

    public Optional<ApplicationReindexing> readReindexingStatus(ApplicationId id) {
        return curator.getData(reindexingDataPath(id))
                      .map(ReindexingStatusSerializer::fromBytes);
    }

    void writeReindexingStatus(ApplicationId id, ApplicationReindexing status) {
        curator.set(reindexingDataPath(id), ReindexingStatusSerializer.toBytes(status));
    }

    /** Sets up a listenable cache with the given listener, over the applications path of this tenant. */
    public Curator.DirectoryCache createApplicationsPathCache(ExecutorService zkCacheExecutor) {
        return curator.createDirectoryCache(applicationsPath.getAbsolute(), false, false, zkCacheExecutor);
    }

    private Path reindexingLockPath(ApplicationId id) {
        return locksPath.append(id.serializedForm()).append("reindexing");
    }

    private Path lockPath(ApplicationId id) {
        return locksPath.append(id.serializedForm());
    }

    private Path applicationPath(ApplicationId id) {
        return applicationsPath.append(id.serializedForm());
    }

    private Path reindexingDataPath(ApplicationId id) {
        return applicationPath(id).append("reindexing");
    }

    private static class ReindexingStatusSerializer {

        private static final String ENABLED = "enabled";
        private static final String CLUSTERS = "clusters";
        private static final String PENDING = "pending";
        private static final String READY = "ready";
        private static final String TYPE = "type";
        private static final String NAME = "name";
        private static final String GENERATION = "generation";
        private static final String EPOCH_MILLIS = "epochMillis";
        private static final String SPEED = "speed";
        private static final String CAUSE = "cause";

        private static byte[] toBytes(ApplicationReindexing reindexing) {
            Cursor root = new Slime().setObject();
            root.setBool(ENABLED, reindexing.enabled());

            Cursor clustersArray = root.setArray(CLUSTERS);
            reindexing.clusters().forEach((name, cluster) -> {
                Cursor clusterObject = clustersArray.addObject();
                clusterObject.setString(NAME, name);

                Cursor pendingArray = clusterObject.setArray(PENDING);
                cluster.pending().forEach((type, generation) -> {
                    Cursor pendingObject =  pendingArray.addObject();
                    pendingObject.setString(TYPE, type);
                    pendingObject.setLong(GENERATION, generation);
                });

                Cursor readyArray = clusterObject.setArray(READY);
                cluster.ready().forEach((type, status) -> {
                    Cursor statusObject = readyArray.addObject();
                    statusObject.setString(TYPE, type);
                    setStatus(statusObject, status);
                });
            });
            return Exceptions.uncheck(() -> SlimeUtils.toJsonBytes(root));
        }

        private static void setStatus(Cursor statusObject, Status status) {
            statusObject.setLong(EPOCH_MILLIS, status.ready().toEpochMilli());
            statusObject.setDouble(SPEED, status.speed());
            statusObject.setString(CAUSE, status.cause());
        }

        private static ApplicationReindexing fromBytes(byte[] data) {
            Cursor root = SlimeUtils.jsonToSlimeOrThrow(data).get();
            return new ApplicationReindexing(root.field(ENABLED).valid() ? root.field(ENABLED).asBool() : true,
                                             SlimeUtils.entriesStream(root.field(CLUSTERS))
                                                       .collect(toUnmodifiableMap(object -> object.field(NAME).asString(),
                                                                                  object -> getCluster(object))));
        }

        private static Cluster getCluster(Inspector object) {
            return new Cluster(SlimeUtils.entriesStream(object.field(PENDING))
                                         .collect(toUnmodifiableMap(entry -> entry.field(TYPE).asString(),
                                                                    entry -> entry.field(GENERATION).asLong())),
                               SlimeUtils.entriesStream(object.field(READY))
                                         .collect(toUnmodifiableMap(entry -> entry.field(TYPE).asString(),
                                                                    entry -> getStatus(entry))));
        }

        private static Status getStatus(Inspector statusObject) {
            return new Status(Instant.ofEpochMilli(statusObject.field(EPOCH_MILLIS).asLong()),
                              statusObject.field(SPEED).valid() ? statusObject.field(SPEED).asDouble() : 0.2,
                              statusObject.field(CAUSE).asString());
        }

    }

}