summaryrefslogtreecommitdiffstats
path: root/config/src/main/java/com/yahoo/config/subscription/impl/ConfigSubscription.java
blob: 780556e93fa2f40959a2cced200b2b2c5cdfeaef (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.config.subscription.impl;

import com.yahoo.config.ConfigInstance;
import com.yahoo.config.subscription.ConfigSet;
import com.yahoo.config.subscription.ConfigSource;
import com.yahoo.config.subscription.ConfigSourceSet;
import com.yahoo.config.subscription.ConfigSubscriber;
import com.yahoo.config.subscription.DirSource;
import com.yahoo.config.subscription.FileSource;
import com.yahoo.config.subscription.JarSource;
import com.yahoo.config.subscription.RawSource;
import com.yahoo.vespa.config.ConfigKey;
import com.yahoo.vespa.config.PayloadChecksums;
import com.yahoo.vespa.config.TimingValues;
import com.yahoo.vespa.config.protocol.DefContent;

import java.io.File;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

import static com.yahoo.vespa.config.PayloadChecksum.Type.MD5;

/**
 * Represents one active subscription to one config
 *
 * @author Vegard Havdal
 */
public abstract class ConfigSubscription<T extends ConfigInstance> {

    /** Logger named after this class; protected, so it is also visible to subclasses. */
    protected static final Logger log = Logger.getLogger(ConfigSubscription.class.getName());
    /** The current config state. It is replaced as a whole whenever config, generation or flags are updated. */
    private final AtomicReference<ConfigState<T>> config = new AtomicReference<>();
    /** The config key this subscription was created with. */
    protected final ConfigKey<T> key;
    /** The config class, taken from the key in the constructor. */
    protected final Class<T> configClass;
    /** Exception handed over through setException, or null if none has been set. */
    private volatile RuntimeException exception = null;
    /**
     * OPEN until close() is called.
     * NOTE(review): not volatile, unlike 'exception' - confirm close() and isClosed() are not used across threads.
     */
    private State state = State.OPEN;

    /**
     * A snapshot of the state of one subscription: the config instance, its generation and checksums,
     * plus flags telling whether config and/or generation were marked as changed when the snapshot was made.
     * All fields are final; a state is replaced rather than modified.
     */
    public static class ConfigState<T extends ConfigInstance> {

        private final T config;
        private final Long generation;
        private final PayloadChecksums payloadChecksums;
        private final boolean configChanged;
        private final boolean generationChanged;
        private final boolean applyOnRestart;

        /** Creates a state with every property given explicitly. */
        private ConfigState(boolean generationChanged,
                            Long generation,
                            boolean applyOnRestart,
                            boolean configChanged,
                            T config,
                            PayloadChecksums payloadChecksums) {
            this.config = config;
            this.generation = generation;
            this.payloadChecksums = payloadChecksums;
            this.configChanged = configChanged;
            this.generationChanged = generationChanged;
            this.applyOnRestart = applyOnRestart;
        }

        /** Creates a state where neither of the changed flags, nor applyOnRestart, is set. */
        private ConfigState(Long generation, T config, PayloadChecksums payloadChecksums) {
            this(false, generation, false, false, config, payloadChecksums);
        }

        /** Creates the initial state: generation 0, no config, empty checksums and no flags set. */
        private ConfigState() {
            this(0L, null, PayloadChecksums.empty());
        }

        /** Returns a copy of this with the same generation, config and checksums, but with all flags cleared. */
        private ConfigState<T> createUnchanged() {
            return new ConfigState<>(generation, config, payloadChecksums);
        }

        /** Returns whether the config was flagged as changed in this state. */
        public boolean isConfigChanged() {
            return configChanged;
        }

        /** Returns whether the generation was flagged as changed in this state. */
        public boolean isGenerationChanged() {
            return generationChanged;
        }

        /** Returns the generation of this state. */
        public Long getGeneration() {
            return generation;
        }

        /** Returns the applyOnRestart flag of this state. */
        public boolean applyOnRestart() {
            return applyOnRestart;
        }

        /** Returns the config instance of this state; null in the initial state. */
        public T getConfig() {
            return config;
        }

        /** Returns the payload checksums of this state. */
        public PayloadChecksums getChecksums() {
            return payloadChecksums;
        }

    }

    /**
     * If non-null: The user has set this generation explicitly through {@link #reload(long)}.
     * {@link #checkReloaded()} reads and clears the value in a single getAndSet call.
     * The original note on this field says that nextConfig and reload() are likely to be run from
     * independent threads, which is why the value is kept in an AtomicReference.
     */
    private final AtomicReference<Long> reloadedGeneration = new AtomicReference<>();

    /** The open/closed state of a subscription. */
    enum State {
        /** The subscription has not been closed. */
        OPEN,
        /** The subscription has been closed. */
        CLOSED
    }

    /**
     * Initializes one subscription
     *
     * @param key        a {@link ConfigKey}
     */
    ConfigSubscription(ConfigKey<T> key) {
        this.key = key;
        this.configClass = key.getConfigClass();
        this.config.set(new ConfigState<>());
        getConfigState().getChecksums().removeChecksumsOfType(MD5);  // TODO: Temporary until we don't use md5 anymore
    }

    /**
     * Creates the correct type of ConfigSubscription, based on the type of source or the form of the config id.
     * A raw, file, dir or jar subscription is chosen when either the source has the matching type or the
     * config id starts with the matching prefix ("raw:", "file:", "dir:", "jar:"), checked in that order.
     * Otherwise the source must be a {@link ConfigSet} or a {@link ConfigSourceSet}.
     *
     * @param key          a {@link ConfigKey}
     * @param subscriber   the subscriber for this subscription; only passed on for {@link ConfigSourceSet} sources
     * @param source       the config source to subscribe to
     * @param timingValues timing values; only passed on for {@link ConfigSourceSet} sources
     * @return a subclass of ConfigSubscription
     * @throws IllegalArgumentException if no subscription type matches the source and config id
     */
    public static <T extends ConfigInstance> ConfigSubscription<T> get(ConfigKey<T> key, ConfigSubscriber subscriber,
                                                                       ConfigSource source, TimingValues timingValues) {
        final String id = key.getConfigId();
        if (source instanceof RawSource || id.startsWith("raw:")) {
            return getRawSub(key, source);
        }
        if (source instanceof FileSource || id.startsWith("file:")) {
            return getFileSub(key, source);
        }
        if (source instanceof DirSource || id.startsWith("dir:")) {
            return getDirFileSub(key, source);
        }
        if (source instanceof JarSource || id.startsWith("jar:")) {
            return getJarSub(key, source);
        }
        if (source instanceof ConfigSet) {
            return new ConfigSetSubscription<>(key, source);
        }
        if (source instanceof ConfigSourceSet) {
            ConfigSourceSet sourceSet = (ConfigSourceSet) source;
            return new JRTConfigSubscription<>(key, subscriber, sourceSet, timingValues);
        }
        throw new IllegalArgumentException("Unknown source type: " + source);
    }

    /**
     * Creates a jar subscription. With a {@link JarSource}, the jar name and path come from the source
     * (the path defaults to "config/" when the source has none). Otherwise both are parsed from a config id
     * of the form "jar:name!/path", where the "!/path" part is optional and defaults to "config/".
     */
    private static <T extends ConfigInstance> JarConfigSubscription<T> getJarSub(ConfigKey<T> key, ConfigSource source) {
        final String defaultPath = "config/";
        if (source instanceof JarSource) {
            JarSource jarSource = (JarSource) source;
            String nameFromSource = jarSource.getJarFile().getName();
            String pathFromSource = jarSource.getPath();
            return new JarConfigSubscription<>(key, nameFromSource, pathFromSource != null ? pathFromSource : defaultPath);
        }
        String configId = key.getConfigId();
        // Everything from the first "!/" and onwards is not part of the jar name
        String nameFromId = configId.replace("jar:", "").replaceFirst("\\!/.*", "");
        // The path is what follows the last "!/", if there is one
        String pathFromId = configId.contains("!/") ? configId.replaceFirst(".*\\!/", "") : defaultPath;
        return new JarConfigSubscription<>(key, nameFromId, pathFromId);
    }

    /**
     * Creates a file subscription. With a {@link FileSource} the file comes from the source; otherwise the
     * file path is the config id with its leading "file:" prefix removed.
     * Only the leading prefix is stripped, so a path that itself contains the text "file:" is kept intact.
     */
    private static <T extends ConfigInstance> ConfigSubscription<T> getFileSub(ConfigKey<T> key, ConfigSource source) {
        if (source instanceof FileSource) {
            return new FileConfigSubscription<>(key, ((FileSource) source).getFile());
        }
        final String prefix = "file:";
        String configId = key.getConfigId();
        String path = configId.startsWith(prefix) ? configId.substring(prefix.length()) : configId;
        return new FileConfigSubscription<>(key, new File(path));
    }

    /**
     * Creates a raw subscription. With a {@link RawSource} the payload comes from the source; otherwise the
     * payload is the config id with its leading "raw:" prefix removed.
     * Only the leading prefix is stripped, so a payload that itself contains the text "raw:" is not altered.
     */
    private static <T extends ConfigInstance> ConfigSubscription<T> getRawSub(ConfigKey<T> key, ConfigSource source) {
        if (source instanceof RawSource) {
            return new RawConfigSubscription<>(key, ((RawSource) source).payload);
        }
        final String prefix = "raw:";
        String configId = key.getConfigId();
        String payload = configId.startsWith(prefix) ? configId.substring(prefix.length()) : configId;
        return new RawConfigSubscription<>(key, payload);
    }

    /**
     * Creates a file subscription for the config file of the given key inside a directory.
     * With a {@link DirSource} the directory comes from the source; otherwise it is the config id with its
     * leading "dir:" prefix removed (only the leading prefix, so the rest of the path is kept intact).
     *
     * @throws IllegalArgumentException if the directory contains no config file for the key
     */
    private static <T extends ConfigInstance> ConfigSubscription<T> getDirFileSub(ConfigKey<T> key, ConfigSource source) {
        String dir;
        if (source instanceof DirSource) {
            dir = ((DirSource) source).getDir().toString();
        } else {
            final String prefix = "dir:";
            String configId = key.getConfigId();
            dir = configId.startsWith(prefix) ? configId.substring(prefix.length()) : configId;
        }
        if (!dir.endsWith(File.separator)) dir = dir + File.separator;
        File file = new File(dir + getConfigFilename(key));
        if (!file.exists()) {
            throw new IllegalArgumentException("Could not find a config file for '" + key.getName() + "' in '" + dir + "'");
        }
        return new FileConfigSubscription<>(key, file);
    }

    /**
     * Two subscriptions are equal if their config keys are equal.
     *
     * @param o the object to compare with
     * @return true if o is a ConfigSubscription with an equal key
     */
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ConfigSubscription)) return false;
        return key.equals(((ConfigSubscription<?>) o).key);
    }

    /**
     * Hash code based on the config key only, so that it is consistent with {@link #equals(Object)}.
     *
     * @return the hash code of the key
     */
    @Override
    public int hashCode() {
        return key.hashCode();
    }


    /**
     * Called from {@link ConfigSubscriber} when the changed status of this config is propagated to the clients.
     * If the current state has the required generation, the state is replaced by an unchanged copy
     * (same generation, config and checksums, all flags cleared).
     *
     * @param requiredGen the generation the current state must have for the reset to happen
     * @return true if the generation of the last read state differs from requiredGen, or if that state
     *         had its config flagged as changed
     */
    public boolean isConfigChangedAndReset(Long requiredGen) {
        ConfigState<T> prev = config.get();
        // Retry for as long as the state matches the required generation, but was replaced by someone else
        // between our read and the compareAndSet. The loop also ends if the generation no longer matches.
        while (prev.getGeneration().equals(requiredGen) && !config.compareAndSet(prev, prev.createUnchanged())) {
            prev = config.get();
        }
        // A false positive is a lot better than a false negative
        return !prev.getGeneration().equals(requiredGen) || prev.isConfigChanged();
    }

    /**
     * Replaces the state with the given values, flagging both generation and config as changed.
     */
    void setConfig(Long generation, boolean applyOnRestart, T config, PayloadChecksums payloadChecksums) {
        ConfigState<T> updated = new ConfigState<>(true, generation, applyOnRestart, true, config, payloadChecksums);
        this.config.set(updated);
    }

    /**
     * Replaces the state with the given values, flagging the generation as changed. The config is flagged
     * as changed only if it differs from the config of the previous state, in which case a warning is
     * logged as well (with both configs included when FINE logging is enabled).
     */
    void setConfigAndGeneration(Long generation, boolean applyOnRestart, T config, PayloadChecksums payloadChecksums) {
        ConfigState<T> prev = this.config.get();
        boolean configChanged = !Objects.equals(prev.getConfig(), config);
        if (configChanged) {
            // The message is only built when it is going to be logged
            String message = "Config has changed unexpectedly for " + key + ", generation " + generation;
            if (log.isLoggable(Level.FINE))
                message = message + ", config in state :" + prev.getConfig() + ", new config: " + config;
            log.log(Level.WARNING, message);
        }
        this.config.set(new ConfigState<>(true, generation, applyOnRestart, configChanged, config, payloadChecksums));
    }

    /**
     * Sets a new config and increments the generation by one, flagging both as changed. The applyOnRestart
     * flag and the checksums are carried over from the previous state.
     * Used by {@link FileConfigSubscription} and {@link ConfigSetSubscription}
     */
    protected void setConfigIncGen(T config) {
        ConfigState<T> previous = this.config.get();
        Long nextGeneration = previous.getGeneration() + 1;
        ConfigState<T> next = new ConfigState<>(true,
                                                nextGeneration,
                                                previous.applyOnRestart(),
                                                true,
                                                config,
                                                previous.getChecksums());
        this.config.set(next);
    }

    /**
     * Sets a new config while keeping the generation, applyOnRestart flag and checksums of the previous state.
     * The generation is flagged as changed; the config is flagged as changed only if it differs from the
     * previous config.
     */
    protected void setConfigIfChanged(T config) {
        ConfigState<T> previous = this.config.get();
        boolean differs = !Objects.equals(previous.getConfig(), config);
        ConfigState<T> next = new ConfigState<>(true,
                                                previous.getGeneration(),
                                                previous.applyOnRestart(),
                                                differs,
                                                config,
                                                previous.getChecksums());
        this.config.set(next);
    }

    /**
     * Sets a new generation and flags it as changed; everything else is carried over from the previous state.
     */
    void setGeneration(Long generation) {
        ConfigState<T> previous = this.config.get();
        ConfigState<T> next = new ConfigState<>(true,
                                                generation,
                                                previous.applyOnRestart(),
                                                previous.isConfigChanged(),
                                                previous.getConfig(),
                                                previous.getChecksums());
        this.config.set(next);
    }

    /**
     * Sets the applyOnRestart flag; everything else, including the changed flags, is carried over from
     * the previous state.
     */
    void setApplyOnRestart(boolean applyOnRestart) {
        ConfigState<T> previous = this.config.get();
        ConfigState<T> next = new ConfigState<>(previous.isGenerationChanged(),
                                                previous.getGeneration(),
                                                applyOnRestart,
                                                previous.isConfigChanged(),
                                                previous.getConfig(),
                                                previous.getChecksums());
        this.config.set(next);
    }

    /**
     * Returns the current config state object of this subscription.
     *
     * @return the current {@link ConfigState}, which holds the config instance, generation and changed flags
     */
    public ConfigState<T> getConfigState() {
        return this.config.get();
    }

    /**
     * Returns the class of the {@link ConfigInstance} this subscription is for.
     *
     * @return the config class
     */
    public Class<T> getConfigClass() {
        return this.configClass;
    }

    /**
     * Returns the key followed by the generation and changed flags of the current state, and the
     * exception if one is set.
     */
    @Override
    public String toString() {
        String description = key.toString();
        ConfigState<T> current = config.get();
        description += ", Current generation: " + current.getGeneration();
        description += ", Generation changed: " + current.isGenerationChanged();
        description += ", Config changed: " + current.isConfigChanged();
        if (exception != null) {
            description += ", Exception: " + exception;
        }
        return description;
    }

    /**
     * Returns the config key which this subscription uses to identify its config.
     *
     * @return the ConfigKey for this subscription
     */
    public ConfigKey<T> getKey() {
        return this.key;
    }

    /**
     * Polls this subscription for a change. The method is guaranteed to use all of the given timeout
     * before returning false. It will also take into account a user-set generation, which can be set by
     * {@link ConfigSubscriber#reload(long)}.
     *
     * @param timeout in milliseconds
     * @return false if timed out, true if generation or config or {@link #exception} has changed.
     *         If true, the {@link #config} field will be set also.
     */
    public abstract boolean nextConfig(long timeout);

    /**
     * Blocks until the next call to {@link #nextConfig(long)} is guaranteed to return an answer (or throw)
     * immediately, i.e. without blocking.
     *
     * @param timeout in milliseconds
     * @return false if timed out
     */
    public abstract boolean subscribe(long timeout);

    /**
     * Stores an exception on this subscription. Called by for example network threads to signal that the
     * user thread should throw this exception immediately.
     *
     * @param e a RuntimeException
     */
    public void setException(RuntimeException e) {
        exception = e;
    }

    /**
     * Returns the exception stored by {@link #setException(RuntimeException)}, set by for example a network
     * thread. A non-null value indicates that it should be thrown in the user's thread immediately.
     *
     * @return the stored RuntimeException, or null if there is none
     */
    public RuntimeException getException() {
        return this.exception;
    }

    /**
     * Tells whether an exception, set by for example a network thread, is stored on this subscription.
     *
     * @return true if there exists an exception for this subscription
     */
    boolean hasException() {
        return !(exception == null);
    }

    /** Marks this subscription as closed. */
    public void close() {
        this.state = State.CLOSED;
    }

    /** Returns true if {@link #close()} has been called on this subscription. */
    public boolean isClosed() {
        return State.CLOSED == state;
    }

    /**
     * Returns the config file name for the given key: the key's name with ".cfg" appended.
     *
     * @param key a {@link ConfigKey}
     * @return file name
     */
    static <T extends ConfigInstance> String getConfigFilename(ConfigKey<T> key) {
        String defName = key.getName();
        return defName + ".cfg";
    }

    /**
     * Requests that this subscription is forced into the given generation, used in testing.
     * The generation is only stored here; it is applied by {@link #checkReloaded()}.
     *
     * @param generation a config generation
     */
    public void reload(long generation) {
        Long requested = generation;
        reloadedGeneration.set(requested);
    }

    /**
     * Checks whether someone has set the {@link #reloadedGeneration} number by calling {@link #reload(long)},
     * and hence wants to force a given generation programmatically. If so, the pending value is cleared,
     * the generation is set and it is flagged as changed.
     *
     * @return true if {@link #reload(long)} has been called since the last check, false otherwise
     */
    protected boolean checkReloaded() {
        // Read and clear in one step, so that a pending reload is applied only once
        Long pending = reloadedGeneration.getAndSet(null);
        if (pending == null) {
            return false;
        }
        setGeneration(pending);
        return true;
    }

    /**
     * Returns the config definition schema, created from the config class of this subscription.
     *
     * @return the config definition for this subscription
     */
    public DefContent getDefContent() {
        DefContent definition = DefContent.fromClass(configClass);
        return definition;
    }
}