aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
blob: df7357f31da5fc3d791aa85fa1b600981ddb4e38 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;

import com.yahoo.jrt.Acceptor;
import com.yahoo.jrt.ErrorCode;
import com.yahoo.jrt.Int32Value;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.Method;
import com.yahoo.jrt.Request;
import com.yahoo.jrt.Spec;
import com.yahoo.jrt.StringValue;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Transport;
import com.yahoo.jrt.slobrok.api.BackOffPolicy;
import com.yahoo.jrt.slobrok.api.Register;
import com.yahoo.jrt.slobrok.api.SlobrokList;
import com.yahoo.vdslib.state.ClusterState;
import com.yahoo.vdslib.state.Node;
import com.yahoo.vdslib.state.NodeState;
import com.yahoo.vdslib.state.NodeType;
import com.yahoo.vdslib.state.State;
import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator;
import com.yahoo.vespa.clustercontroller.core.rpc.RPCUtil;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 *
 * Used to fake a node in VDS, such that we can test the fleetcontroller without dummy interface for talking to
 * VDS nodes.
 */
public class DummyVdsNode {

    private static final Logger log = Logger.getLogger(DummyVdsNode.class.getName());

    private final String[] slobrokConnectionSpecs;
    private final String clusterName;
    private final NodeType type;
    private final int index;
    private NodeState nodeState;
    private String hostInfo = "{}";
    private Supervisor supervisor;
    private Acceptor acceptor;
    private Register register;
    private final Timer timer;
    private boolean failSetSystemStateRequests = false;
    private boolean resetTimestampOnReconnect = false;
    private final Map<Node, Long> highestStartTimestamps = new TreeMap<>();
    int timedOutStateReplies = 0;
    int outdatedStateReplies = 0;
    int immediateStateReplies = 0;
    int setNodeStateReplies = 0;
    private boolean registeredInSlobrok = false;

    /** A detached node state request together with the (timer) time at which it must be answered. */
    static class Req {
        final Request request;
        final long timeToReply;

        Req(Request r, long timeToReply) {
            request = r;
            this.timeToReply = timeToReply;
        }
    }

    /** Slobrok back-off policy with a constant, very short delay and no warnings or info logging. */
    static class BackOff implements BackOffPolicy {
        public void reset() {}
        public double get() { return 0.01; }
        public boolean shouldWarn(double v) { return false; }
        public boolean shouldInform(double v) { return false; }
    }

    /** List of requests that should be replied to after a specified time. Guarded by {@code timer}. */
    private final List<Req> waitingRequests = new LinkedList<>();

    /**
     * History of received cluster states, most recent first.
     * Any access to this list or to its members must be synchronized on the timer variable.
     */
    private final List<ClusterStateBundle> clusterStateBundles = new LinkedList<>();
    private int activatedClusterStateVersion = 0;

    /**
     * Background thread that polls the queued node state requests roughly every 10 ms (timer wait)
     * and answers those whose reply time has been reached. Stops when interrupted.
     */
    private final Thread messageResponder = new Thread() {
        @Override
        public void run() {
            log.log(Level.FINE, () -> "Dummy node " + DummyVdsNode.this + ": starting message responder thread");
            while (true) {
                synchronized (timer) {
                    if (isInterrupted()) break;
                    long currentTime = timer.getCurrentTimeInMillis();
                    for (Iterator<Req> it = waitingRequests.iterator(); it.hasNext(); ) {
                        Req r = it.next();
                        if (currentTime >= r.timeToReply) {
                            log.log(Level.FINE, () -> "Dummy node " + DummyVdsNode.this + ": Responding to node state request at time " + currentTime);
                            r.request.returnValues().add(new StringValue(nodeState.serialize()));
                            if (r.request.methodName().equals("getnodestate3")) {
                                r.request.returnValues().add(new StringValue(hostInfo));
                            }
                            r.request.returnRequest();
                            it.remove();
                            ++timedOutStateReplies;
                        }
                    }
                    try {
                        timer.wait(10);
                    } catch (InterruptedException e) {
                        break;
                    }
                }
            }
            log.log(Level.FINE, () -> "Dummy node " + DummyVdsNode.this + ": shut down message responder thread");
        }
    };

    /**
     * Creates a node in state UP with the current timer time (in seconds) as its start timestamp,
     * and starts its message responder thread. The node is not reachable until {@link #connect()}.
     */
    public DummyVdsNode(Timer timer, String[] slobrokConnectionSpecs, String clusterName,
                        NodeType nodeType, int index) {
        this.timer = timer;
        this.slobrokConnectionSpecs = slobrokConnectionSpecs;
        this.clusterName = clusterName;
        type = nodeType;
        this.index = index;
        this.nodeState = new NodeState(type, State.UP);
        nodeState.setStartTimestamp(timer.getCurrentTimeInMillis() / 1000);
        // Start the responder only once the node is fully initialized, so it can never observe
        // (or race with) a node state that is still being set up.
        messageResponder.start();
    }

    /** Returns the number of node state requests currently queued for a later reply. */
    int getPendingNodeStateCount() {
        synchronized (timer) {
            return waitingRequests.size();
        }
    }

    /** Stops the message responder thread, waits for it to exit, then disconnects the node. */
    public void shutdown() {
        messageResponder.interrupt();
        try {
            messageResponder.join();
        } catch (InterruptedException e) {
            // Keep the caller's interrupt status rather than silently dropping it.
            Thread.currentThread().interrupt();
        }
        disconnect();
    }

    /**
     * Starts an RPC server on an ephemeral port, registers the node in slobrok and returns the port.
     * If a timestamp reset was requested, the start timestamp is first set to the current timer time.
     */
    public int connect() throws ListenFailedException {
        if (resetTimestampOnReconnect) {
            long startTimestamp = timer.getCurrentTimeInMillis() / 1000;
            nodeState.setStartTimestamp(startTimestamp);
            resetTimestampOnReconnect = false;
        }
        supervisor = new Supervisor(new Transport());
        addMethods();
        acceptor = supervisor.listen(new Spec(0));
        SlobrokList slist = new SlobrokList();
        slist.setup(slobrokConnectionSpecs);
        register = new Register(supervisor, slist, new Spec("localhost", acceptor.port()), new BackOff());
        registerSlobrok();
        return acceptor.port();
    }

    /** Returns whether the node is both registered in slobrok and has a live RPC supervisor. */
    public boolean isConnected() {
        return (registeredInSlobrok && supervisor != null);
    }

    /** Registers the node's slobrok name and its "/default" variant. */
    void registerSlobrok() {
        register.registerName(getSlobrokName());
        register.registerName(getSlobrokName() + "/default");
        registeredInSlobrok = true;
    }

    /** Unregisters both slobrok names registered by {@link #registerSlobrok()}. */
    void disconnectSlobrok() {
        register.unregisterName(getSlobrokName());
        register.unregisterName(getSlobrokName() + "/default");
        registeredInSlobrok = false;
    }

    void disconnectImmediately() { disconnect(); }

    /** Shuts down slobrok registration, the acceptor and the transport. No-op if not connected. */
    void disconnect() {
        log.log(Level.FINE, () -> "Dummy node " + DummyVdsNode.this + ": Breaking connection.");
        if (supervisor == null) return;
        register.shutdown();
        acceptor.shutdown().join();
        supervisor.transport().shutdown().join();
        supervisor = null;
        log.log(Level.FINE, () -> "Dummy node " + DummyVdsNode.this + ": Done breaking connection.");
    }

    @Override
    public String toString() {
        return type + "." + index;
    }

    public boolean isDistributor() { return type.equals(NodeType.DISTRIBUTOR); }
    public NodeType getType() { return type; }

    public Node getNode() {
        return new Node(type, index);
    }

    /**
     * Blocks (polling every 10 ms of wall-clock time) until a cluster state with at least the given
     * version has been received.
     *
     * @throws RuntimeException if {@code timeout} elapses first
     */
    void waitForSystemStateVersion(int version, Duration timeout) {
        try {
            Instant endTime = Instant.now().plus(timeout);
            while (getLatestSystemStateVersion().orElse(-1) < version) {
                if (Instant.now().isAfter(endTime))
                    throw new RuntimeException("Timed out waiting for state version " + version + " in " + this);
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            // Stop waiting, but let the caller see that it was interrupted.
            Thread.currentThread().interrupt();
        }
    }

    /** Returns the latest system state version received, or empty if none are received yet. */
    private Optional<Integer> getLatestSystemStateVersion() {
        synchronized(timer) {
            if (clusterStateBundles.isEmpty()) {
                return Optional.empty();
            }
            return Optional.of(clusterStateBundles.get(0).getVersion());
        }
    }

    public boolean hasPendingGetNodeStateRequest() {
        synchronized (timer) {
            return !waitingRequests.isEmpty();
        }
    }

    /** Answers every queued node state request with the current node state (and host info for getnodestate3). */
    void replyToPendingNodeStateRequests() {
        synchronized (timer) {
            for (Req req : waitingRequests) {
                log.log(Level.FINE, () -> "Dummy node " + this + " answering pending node state request.");
                req.request.returnValues().add(new StringValue(nodeState.serialize()));
                if (req.request.methodName().equals("getnodestate3")) {
                    req.request.returnValues().add(new StringValue(hostInfo));
                }
                req.request.returnRequest();
                ++setNodeStateReplies;
            }
            waitingRequests.clear();
        }
    }

    /** Sets a new node state and host info, and immediately answers all queued node state requests. */
    public void setNodeState(NodeState state, String hostInfo) {
        log.log(Level.FINE, () -> "Dummy node " + this + " got new state: " + state);
        synchronized(timer) {
            this.nodeState = state;
            this.hostInfo = hostInfo;
            replyToPendingNodeStateRequests();
        }
    }

    public void setNodeState(NodeState state) {
        setNodeState(state, "{}");
    }

    public void setNodeState(State state) {
        setNodeState(new NodeState(type, state));
    }

    /** Returns the baseline cluster states of all received bundles, most recent first. */
    List<ClusterState> getSystemStatesReceived() {
        synchronized(timer) {
            return clusterStateBundles.stream()
                    .map(ClusterStateBundle::getBaselineClusterState)
                    .toList();
        }
    }

    public ClusterStateBundle getClusterStateBundle() {
        synchronized(timer) {
            // In a two-phase state activation scenario, bundles are added to `clusterStateBundles` _before_
            // the version has been activated. Since we want this method to only return _activated_ bundles
            // we filter out versions that are not yet activated. In a non two-phase scenario the activated
            // version is implicitly the same as the most recently received bundle, so the filter is a no-op.
            return clusterStateBundles.stream()
                    .filter(b -> b.getVersion() <= activatedClusterStateVersion)
                    .findFirst() // Most recent cluster state bundle first in list
                    .orElse(null);
        }
    }

    /** Returns the baseline state of the most recent activated bundle, or null if there is none. */
    public ClusterState getClusterState() {
        return Optional.ofNullable(getClusterStateBundle())
                .map(ClusterStateBundle::getBaselineClusterState)
                .orElse(null);
    }

    String getSlobrokName() {
        return "storage/cluster." + clusterName + "/" + type + "/" + index;
    }

    /** Registers the RPC methods this fake node serves on the current supervisor. */
    private void addMethods() {
        Method m;

        m = new Method("vespa.storage.connect", "s", "i", this::rpc_storageConnect);
        m.methodDesc("Binds connection to a storage API handle");
        m.paramDesc(0, "somearg", "Argument looking like slobrok address of the ones we're asking for some reason");
        m.returnDesc(0, "returnCode", "Returncode of request. Should be 0 = OK");
        supervisor.addMethod(m);

        m = new Method("getnodestate3", "sii", "ss", this::rpc_getNodeState3);
        m.methodDesc("Get nodeState of a node, answer when state changes from given state.");
        m.paramDesc(0, "nodeStateIn", "The node state of the given node");
        m.paramDesc(1, "timeout", "Time timeout in milliseconds set by the state requester.");
        m.paramDesc(2, "index", "Node index.");
        m.returnDesc(0, "nodeStateOut", "The node state of the given node");
        m.returnDesc(1, "hostinfo", "Information on the host node is running on");
        supervisor.addMethod(m);

        m = new Method(RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_METHOD_NAME, "bix", "", this::rpc_setDistributionStates);
        m.methodDesc("Set distribution states for cluster and bucket spaces");
        m.paramDesc(0, "compressionType", "Compression type for payload");
        m.paramDesc(1, "uncompressedSize", "Uncompressed size of payload");
        m.paramDesc(2, "payload", "Slime format payload");
        supervisor.addMethod(m);

        m = new Method(RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_METHOD_NAME, "i", "i", this::rpc_activateClusterStateVersion);
        m.methodDesc("Activate a given cluster state version");
        m.paramDesc(0, "stateVersion", "Cluster state version to activate");
        m.returnDesc(0, "actualVersion", "Actual cluster state version on node");
        supervisor.addMethod(m);
    }

    private void rpc_storageConnect(Request req) {
        synchronized(timer) {
            log.log(Level.FINEST, () -> "Dummy node " + this + " got old type handle connect message.");
            req.returnValues().add(new Int32Value(0));
        }
    }

    /**
     * Answers the first queued request that came from the controller with the given index (third
     * request parameter).
     *
     * @return true if such a request was found and answered
     */
    boolean sendGetNodeStateReply(int index) {
        synchronized (timer) {
            for (Iterator<Req> it = waitingRequests.iterator(); it.hasNext(); ) {
                Req r = it.next();
                if (r.request.parameters().size() > 2 && r.request.parameters().get(2).asInt32() == index) {
                    log.log(Level.FINE, () -> "Dummy node " + DummyVdsNode.this + ": Responding to node state reply from controller " + index + " as we received new one");
                    r.request.returnValues().add(new StringValue(nodeState.serialize()));
                    r.request.returnValues().add(new StringValue(hostInfo));
                    r.request.returnRequest();
                    it.remove();
                    ++outdatedStateReplies;
                    return true;
                }
            }
            return false;
        }
    }

    /**
     * Handles getnodestate3. If the requester's known state equals the current one (or an older
     * request from the same controller was just answered), the request is detached and queued to be
     * answered after 80% of its timeout. Otherwise it is answered immediately.
     */
    private void rpc_getNodeState3(Request req) {
        log.log(Level.FINE, () -> "Dummy node " + this + ": Got " + req.methodName() + " request");
        try {
            String oldState = req.parameters().get(0).asString();
            int timeout = req.parameters().get(1).asInt32();
            int index = req.parameters().get(2).asInt32();
            synchronized(timer) {
                boolean sentReply = sendGetNodeStateReply(index);
                NodeState givenState = (oldState.equals("unknown") ? null : NodeState.deserialize(type, oldState));
                if (givenState != null && (givenState.equals(nodeState) || sentReply)) {
                    long timeToReply = timer.getCurrentTimeInMillis() + timeout * 800L / 1000;
                    log.log(Level.FINE, () -> "Dummy node " + this + " has same state as reported (" + givenState + "). Queuing request. Timeout is " + timeout + " ms. "
                            + "Will be answered at time " + timeToReply);
                    req.detach();
                    waitingRequests.add(new Req(req, timeToReply));
                    log.log(Level.FINE, () -> "Dummy node " + this + " has " + waitingRequests.size() + " requests waiting to be answered");
                    timer.notifyAll();
                } else {
                    log.log(Level.FINE, () -> "Dummy node " + this + ": Request had " + (givenState == null ? "no state" : "different state(" + givenState +")") + ". Answering with " + nodeState);
                    req.returnValues().add(new StringValue(nodeState.serialize()));
                    req.returnValues().add(new StringValue(hostInfo));
                    ++immediateStateReplies;
                }
            }
        } catch (Exception e) {
            log.log(Level.SEVERE, "Dummy node " + this + ": An error occurred when answering " + req.methodName() + " request: " + e.getMessage(), e);
            req.setError(ErrorCode.METHOD_FAILED, e.getMessage());
        }
    }

    /**
     * Records the highest non-zero start timestamp seen per node in the given state.
     * Throws an Error (deliberately not caught by the RPC handlers' Exception catch) if a timestamp decreases.
     */
    private void updateStartTimestamps(ClusterState state) {
        for (int i = 0; i < 2; ++i) {
            NodeType nodeType = (i == 0 ? NodeType.DISTRIBUTOR : NodeType.STORAGE);
            for (int j = 0, n = state.getNodeCount(nodeType); j < n; ++j) {
                Node node = new Node(nodeType, j);
                NodeState ns = state.getNodeState(node);
                if (ns.getStartTimestamp() != 0) {
                    Long oldValue = highestStartTimestamps.get(node);
                    if (oldValue != null && oldValue > ns.getStartTimestamp()) {
                        throw new Error("Somehow start timestamp of node " + node + " has gone down");
                    }
                    highestStartTimestamps.put(node, ns.getStartTimestamp());
                }
            }
        }
    }

    void failSetSystemState(boolean failSystemStateRequests) {
        synchronized (timer) {
            this.failSetSystemStateRequests = failSystemStateRequests;
        }
    }

    private boolean shouldFailSetSystemStateRequests() {
        synchronized (timer) {
            return failSetSystemStateRequests;
        }
    }

    /** Decodes a pushed cluster state bundle and stores it as the most recent one received. */
    private void rpc_setDistributionStates(Request req) {
        try {
            if (shouldFailSetSystemStateRequests()) {
                req.setError(ErrorCode.GENERAL_ERROR, "Dummy node configured to fail setDistributionStates() calls");
                return;
            }
            ClusterStateBundle stateBundle = RPCUtil.decodeStateBundleFromSetDistributionStatesRequest(req);
            synchronized(timer) {
                updateStartTimestamps(stateBundle.getBaselineClusterState());
                clusterStateBundles.add(0, stateBundle);
                timer.notifyAll();
            }
            log.log(Level.FINE, () -> "Dummy node " + this + ": Got new cluster state " + stateBundle);
        } catch (Exception e) {
            log.log(Level.SEVERE, "Dummy node " + this + ": An error occurred when answering setdistributionstates request: " + e.getMessage(), e);
            req.setError(ErrorCode.METHOD_FAILED, e.getMessage());
        }
    }

    /**
     * Marks the requested version as activated only if it equals the latest received version;
     * always returns the latest received version (0 if none).
     */
    private void rpc_activateClusterStateVersion(Request req) {
        try {
            if (shouldFailSetSystemStateRequests()) {
                // We assume that failing setDistributionStates also implies failing version activations
                req.setError(ErrorCode.GENERAL_ERROR, "Dummy node configured to fail activateClusterStateVersion() calls");
                return;
            }
            int activateVersion = req.parameters().get(0).asInt32();
            synchronized(timer) {
                int actualVersion = getLatestSystemStateVersion().orElse(0);
                req.returnValues().add(new Int32Value(actualVersion));
                if (activateVersion == actualVersion) {
                    activatedClusterStateVersion = activateVersion;
                    timer.notifyAll();
                } else {
                    log.log(Level.FINE, () -> String.format("Dummy node %s: got a mismatching activation (request version %d, " +
                            "actual %d), not marking version as active", this, activateVersion, actualVersion));
                }
            }
            log.log(Level.FINE, () -> "Dummy node " + this + ": Activating cluster state version " + activateVersion);
        } catch (Exception e) {
            log.log(Level.SEVERE, "Dummy node " + this + ": An error occurred when answering activate_cluster_state_version request: " + e.getMessage(), e);
            req.setError(ErrorCode.METHOD_FAILED, e.getMessage());
        }
    }
}