aboutsummaryrefslogtreecommitdiffstats
path: root/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/persistence/CachingCurator.java
blob: b74cb6892925a98576baada5d1f1c1c77e8c1635 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.hosted.provision.persistence;

import com.google.common.cache.AbstractCache;
import com.yahoo.config.provision.HostName;
import com.yahoo.path.Path;
import com.yahoo.transaction.NestedTransaction;
import com.yahoo.vespa.curator.Curator;
import com.yahoo.vespa.curator.Lock;
import com.yahoo.vespa.curator.recipes.CuratorCounter;
import com.yahoo.vespa.curator.transaction.CuratorTransaction;
import org.apache.zookeeper.data.Stat;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
 * A caching wrapper for {@link Curator}.
 *
 * It serves reads from an in-memory cache of the content which is invalidated when changed on another node
 * using a global, shared counter. The counter is updated on all write operations, ensured by wrapping write
 * operations in a try block, with the counter increment in a finally block.
 *
 * Locks must be used to ensure consistency.
 *
 * @author bratseth
 * @author jonmv
 */
public class CachingCurator {

    private final Curator curator;

    /** A shared atomic counter which is incremented every time we write to the curator database */
    private final CuratorCounter changeGenerationCounter;

    /** A partial cache of the Curator database, which is only valid if generations match */
    private final AtomicReference<Cache> cache = new AtomicReference<>();

    /** Whether we should return data from the cache or always read from ZooKeeper */
    private final boolean enabled;

    /** Serializes replacement of an outdated cache, so concurrent readers do not each build a new one */
    private final Object cacheCreationLock = new Object();

    /**
     * Creates a curator database
     *
     * @param curator the curator instance
     * @param root the file system root of the db; the shared change counter is stored at root/changeCounter
     * @param enabled whether reads are served from an in-memory cache; if false, every read goes to curator
     */
    public CachingCurator(Curator curator, Path root, boolean enabled) {
        this.enabled = enabled;
        this.curator = curator;
        changeGenerationCounter = new CuratorCounter(curator, root.append("changeCounter"));
        cache.set(newCache(changeGenerationCounter.get()));
    }

    /** Returns all hosts configured to be part of this ZooKeeper cluster */
    public List<HostName> cluster() {
        return Arrays.stream(curator.zooKeeperEnsembleConnectionSpec().split(","))
                     .filter(hostAndPort -> !hostAndPort.isEmpty())
                     .map(hostAndPort -> hostAndPort.split(":")[0])
                     .map(HostName::of)
                     .toList();
    }

    /** Create a reentrant lock */
    public Lock lock(Path path, Duration timeout) {
        return curator.lock(path, timeout);
    }

    // --------- Write operations ------------------------------------------------------------------------------
    // These must either create a nested transaction ending in a counter increment or not depend on prior state

    /**
     * Creates a new curator transaction against this database and adds it to the given nested transaction.
     * Important: It is the nested transaction which must be committed - never the curator transaction directly.
     */
    public CuratorTransaction newCuratorTransactionIn(NestedTransaction transaction) {
        // Wrap the curator transaction with an increment of the generation counter.
        CountingCuratorTransaction curatorTransaction = new CountingCuratorTransaction(curator, changeGenerationCounter);
        transaction.add(curatorTransaction);
        return curatorTransaction;
    }

    /** Creates a path in curator and all its parents as necessary. If the path already exists this does nothing. */
    void create(Path path) {
        if (curator.create(path))
            changeGenerationCounter.next(); // Increment counter to ensure getChildren sees any change.
    }

    /** Returns whether given path exists */
    boolean exists(Path path) {
        return curator.exists(path);
    }

    // --------- Read operations -------------------------------------------------------------------------------
    // These can read from the memory file system, which accurately mirrors the ZooKeeper content IF
    // the current generation counter is the same as it was when data was put into the cache, AND
    // the data to read is protected by a lock which is held now, and during any writes of the data.

    /** Returns the immediate, local names of the children under this node in any order */
    List<String> getChildren(Path path) { return getSession().getChildren(path); }

    /** Returns a copy of the content of the node at this path, or empty if curator returns no data for it */
    Optional<byte[]> getData(Path path) { return getSession().getData(path); }

    /**
     * Returns a session whose generation matches the current value of the shared change counter,
     * replacing the current cache with a new, empty one first if it is outdated.
     */
    Session getSession() {
        if (changeGenerationCounter.get() != cache.get().generation) {
            synchronized (cacheCreationLock) {
                // Loop, since the counter may advance again while we replace the cache. Read the counter
                // once per iteration, and stamp the new cache with exactly the generation we compared.
                long generation;
                while ((generation = changeGenerationCounter.get()) != cache.get().generation)
                    cache.set(newCache(generation));
            }
        }
        return cache.get();
    }

    /** Returns statistics for the cache of the current generation */
    CacheStats cacheStats() {
        return cache.get().stats();
    }

    /** Caches must only be instantiated using this method */
    private Cache newCache(long generation) {
        return enabled ? new Cache(generation, curator) : new NoCache(generation, curator);
    }

    /**
     * A thread safe partial snapshot of the curator database content with a given generation.
     * This is merely a recording of what Curator returned at various points in time when
     * it had the counter at this generation.
     */
    private static class Cache implements Session {

        private final long generation;

        /** The curator instance used to fetch missing data */
        protected final Curator curator;

        // The data of this partial state mirror. The amount of curator state mirrored in this may grow
        // over time by multiple threads. Growing is the only operation permitted by this.
        // The content of the map is immutable.
        private final Map<Path, List<String>> children = new ConcurrentHashMap<>();
        private final Map<Path, Optional<byte[]>> data = new ConcurrentHashMap<>();
        private final Map<Path, Optional<Integer>> stats = new ConcurrentHashMap<>();

        private final AbstractCache.SimpleStatsCounter statistics = new AbstractCache.SimpleStatsCounter();

        /** Create an empty snapshot at a given generation (as an empty snapshot is a valid partial snapshot) */
        private Cache(long generation, Curator curator) {
            this.generation = generation;
            this.curator = curator;
        }

        @Override
        public List<String> getChildren(Path path) {
            return get(children, path, () -> List.copyOf(curator.getChildren(path)));
        }

        @Override
        public Optional<byte[]> getData(Path path) {
            // The cached array is shared between readers, so hand out a copy to keep the cache immutable.
            return get(data, path, () -> curator.getData(path)).map(bytes -> Arrays.copyOf(bytes, bytes.length));
        }

        @Override
        public Optional<Integer> getStat(Path path) {
            return get(stats, path, () -> curator.getStat(path).map(Stat::getVersion));
        }

        /**
         * Returns the value cached for the given path, loading and caching it with the given loader if absent.
         * Records a hit or a miss in the statistics for every call.
         */
        private <T> T get(Map<Path, T> values, Path path, Supplier<T> loader) {
            // Fast path: a plain get() on a ConcurrentHashMap takes no lock, unlike compute(),
            // which would lock the hash bin even when the value is already cached.
            T cached = values.get(path);
            if (cached != null) {
                statistics.recordHits(1);
                return cached;
            }

            // Slow path: computeIfAbsent applies the loader at most once per key.
            boolean[] loadedHere = { false };
            T value = values.computeIfAbsent(path, key -> {
                loadedHere[0] = true;
                statistics.recordMisses(1);
                return loader.get();
            });
            if ( ! loadedHere[0])
                statistics.recordHits(1); // Another thread cached the value between our get() and computeIfAbsent()
            return value;
        }

        /**
         * Returns hit rate, eviction count and size of this cache.
         * NOTE(review): size counts only children and data entries, not entries in the stat map — confirm intended.
         */
        public CacheStats stats() {
            var snapshot = this.statistics.snapshot();
            return new CacheStats(snapshot.hitRate(), snapshot.evictionCount(), children.size() + data.size());
        }

    }

    /** An implementation of the curator database cache which does no caching */
    private static class NoCache extends Cache {

        private NoCache(long generation, Curator curator) { super(generation, curator); }

        @Override
        public List<String> getChildren(Path path) { return curator.getChildren(path); }

        @Override
        public Optional<byte[]> getData(Path path) { return curator.getData(path); }

        @Override
        public Optional<Integer> getStat(Path path) {
            return curator.getStat(path).map(Stat::getVersion);
        }

    }

    interface Session {

        /**
         * Returns the children of this path, which may be empty.
         */
        List<String> getChildren(Path path);

        /**
         * Returns a copy of the content of this child - which may be empty.
         */
        Optional<byte[]> getData(Path path);

        /**
         * Returns the version from the stat of the node at this path, or empty if curator returns no stat
         * for it (presumably because the node does not exist — confirm against Curator.getStat).
         */
        Optional<Integer> getStat(Path path);

    }

}