summaryrefslogtreecommitdiffstats
path: root/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
blob: 5fd7c26617ee56a2505f1b3e50a785ee3cd67082 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;

import com.yahoo.jrt.*;
import com.yahoo.jrt.StringValue;
import com.yahoo.jrt.slobrok.api.BackOffPolicy;
import com.yahoo.jrt.slobrok.api.Register;
import com.yahoo.jrt.slobrok.api.SlobrokList;
import com.yahoo.log.LogLevel;
import com.yahoo.vdslib.state.*;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.logging.Logger;

/**
 * Fakes a single VDS node (a distributor or a storage node), so that the fleet controller can be tested
 * over real RPC connections instead of through a stubbed-out interface for talking to VDS nodes.
 */
public class DummyVdsNode {

    // Logger shared by all dummy nodes.
    public static Logger log = Logger.getLogger(DummyVdsNode.class.getName());

    // Slobrok connection specs handed to SlobrokList.setup() each time connect() is called.
    private String slobrokConnectionSpecs[];
    // Cluster name, node type and node index; together they make up the slobrok name of this node.
    private String clusterName;
    private NodeType type;
    private int index;
    // The node state this dummy node reports in its get-node-state replies.
    private NodeState nodeState;
    // RPC plumbing; created by connect(). disconnect(...) shuts these down and sets supervisor back to null.
    private Supervisor supervisor;
    private Acceptor acceptor;
    private Register register;
    // Decides which generations of the state RPC methods are registered (see addMethods()).
    private int stateCommunicationVersion;
    // Set once vespa.storage.connect has been called; the old getnodestate/setsystemstate calls require it.
    private boolean negotiatedHandle = false;
    // Time source for request deadlines; also the monitor the shared state of this class is synchronized on.
    private final Timer timer;
    // When true, both setsystemstate RPC variants answer with an error.
    private boolean failSetSystemStateRequests = false;
    // When true, the next connect() gives the node a fresh start timestamp.
    private boolean resetTimestampOnReconnect = false;
    // Start timestamp in seconds (timer milliseconds / 1000); only assigned by connect() after a reset request.
    private long startTimestamp;
    // Highest start timestamp seen per node in the cluster states received so far.
    private Map<Node, Long> highestStartTimestamps = new TreeMap<Node, Long>();
    // Reply counters, one per reason a get-node-state reply was sent. Public so tests can inspect them.
    public int timedOutStateReplies = 0;
    public int outdatedStateReplies = 0;
    public int immediateStateReplies = 0;
    public int setNodeStateReplies = 0;
    // Tracks whether the slobrok names of this node are currently registered.
    private boolean registeredInSlobrok = false;

    /**
     * A detached get-node-state request paired with the point in time, in timer milliseconds,
     * at which it must be answered even if the node state has not changed.
     */
    class Req {
        Request request;
        long timeout;

        Req(Request request, long timeout) {
            this.request = request;
            this.timeout = timeout;
        }
    }
    /**
     * Back-off policy given to the slobrok register: a constant delay of 0.01
     * (presumably seconds - confirm against BackOffPolicy) and never any warning.
     */
    class BackOff implements BackOffPolicy {

        public void reset() {
            // Stateless policy; nothing to reset.
        }

        public double get() {
            return 0.01;
        }

        public boolean shouldWarn(double v) {
            return false;
        }
    }
    /**
     * Get-node-state requests that have been detached and are waiting to be answered, either when their
     * deadline passes or when the node state changes.
     * NOTE(review): most accessors hold the timer monitor while touching this list - treat it as guarded by timer.
     */
    private final List<Req> waitingRequests = new LinkedList<>();

    /**
     * History of received system states, newest first: every accepted state is inserted at index 0,
     * so element 0 is always the latest state received.
     * Any access to this list or to its members must be synchronized on the timer variable.
     */
    private List<ClusterState> systemState = new LinkedList<>();

    /**
     * Background thread that answers queued get-node-state requests once their deadline has passed.
     * It rechecks the queue every time the timer monitor is notified, and at least every 100 ms,
     * and exits when it is interrupted.
     */
    private Thread messageResponder = new Thread() {
        @Override
        public void run() {
            log.log(LogLevel.DEBUG, "Dummy node " + DummyVdsNode.this.toString() + ": starting message reponder thread");
            boolean keepRunning = true;
            while (keepRunning) {
                synchronized (timer) {
                    if (isInterrupted()) {
                        keepRunning = false;
                    } else {
                        long now = timer.getCurrentTimeInMillis();
                        Iterator<Req> pending = waitingRequests.iterator();
                        while (pending.hasNext()) {
                            Req candidate = pending.next();
                            if (candidate.timeout > now) {
                                continue; // Deadline not reached yet; leave it queued.
                            }
                            log.log(LogLevel.DEBUG, "Dummy node " + DummyVdsNode.this.toString() + ": Responding to node state request at time " + now);
                            candidate.request.returnValues().add(new StringValue(nodeState.serialize()));
                            // Only getnodestate3 declares a second (host info) return value.
                            if (candidate.request.methodName().equals("getnodestate3")) {
                                candidate.request.returnValues().add(new StringValue("No host info in dummy implementation"));
                            }
                            candidate.request.returnRequest();
                            pending.remove();
                            ++timedOutStateReplies;
                        }
                        try {
                            timer.wait(100);
                        } catch (InterruptedException e) {
                            keepRunning = false;
                        }
                    }
                }
            }
            log.log(LogLevel.DEBUG, "Dummy node " + DummyVdsNode.this.toString() + ": shut down message reponder thread");
        }
    };

    /**
     * Creates a dummy node in state UP and starts its message responder thread.
     * The node is not reachable over RPC until {@link #connect()} is called.
     *
     * @param timer time source for request deadlines; also used as the monitor guarding shared state
     * @param options supplies the state communication version the node should speak
     * @param slobrokConnectionSpecs connection specs of the slobroks to register in on connect
     * @param clusterName name of the cluster, used in the slobrok name of the node
     * @param distributor true to fake a distributor, false to fake a storage node
     * @param index index of the node within its type
     */
    public DummyVdsNode(Timer timer, DummyVdsNodeOptions options, String slobrokConnectionSpecs[], String clusterName, boolean distributor, int index) throws Exception {
        this.timer = timer;
        this.slobrokConnectionSpecs = slobrokConnectionSpecs;
        this.clusterName = clusterName;
        type = distributor ? NodeType.DISTRIBUTOR : NodeType.STORAGE;
        this.index = index;
        this.nodeState = new NodeState(type, State.UP);
        this.stateCommunicationVersion = options.stateCommunicationVersion;
        nodeState.setStartTimestamp(timer.getCurrentTimeInMillis() / 1000);
        // Start the responder last, so the thread never observes a partially initialized node.
        messageResponder.start();
    }

    /** Requests that the next call to {@link #connect()} gives the node a new start timestamp. */
    public void resetStartTimestamp() {
        this.resetTimestampOnReconnect = true;
    }

    /**
     * Returns the number of get-node-state requests currently queued for a later answer.
     * Reads the queue under the timer monitor, like the other accessors of the queue, since the
     * responder and RPC threads modify it concurrently.
     */
    public int getPendingNodeStateCount() {
        synchronized (timer) {
            return waitingRequests.size();
        }
    }

    /**
     * Stops the message responder thread and then disconnects the node.
     * If the calling thread is interrupted while waiting for the responder to exit, the disconnect is
     * still carried out and the interrupt status is restored afterwards instead of being lost.
     */
    public void shutdown() {
        boolean interrupted = false;
        messageResponder.interrupt();
        try {
            messageResponder.join();
        } catch (InterruptedException e) {
            interrupted = true;
        }
        disconnect();
        // Restored only after disconnect(), so the joins done during teardown are not cut short.
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Brings the node online: creates a new supervisor with the RPC methods registered, starts
     * listening on a free port and registers the node in slobrok. If a start timestamp reset has been
     * requested, the node state gets a new start timestamp first.
     *
     * @return the port the node listens on
     */
    public int connect() throws ListenFailedException, UnknownHostException {
        if (resetTimestampOnReconnect) {
            startTimestamp = timer.getCurrentTimeInMillis() / 1000;
            nodeState.setStartTimestamp(startTimestamp);
            resetTimestampOnReconnect = false;
        }

        supervisor = new Supervisor(new Transport());
        addMethods();
        acceptor = supervisor.listen(new Spec(0));

        SlobrokList slobroks = new SlobrokList();
        slobroks.setup(slobrokConnectionSpecs);
        int listenPort = acceptor.port();
        register = new Register(supervisor, slobroks, new Spec("localhost", listenPort), new BackOff());
        registerSlobrok();

        // A new connection must negotiate its handle again before the old style calls are accepted.
        negotiatedHandle = false;
        return listenPort;
    }

    /** Returns true when the node has a live supervisor and its names are registered in slobrok. */
    public boolean isConnected() {
        if (supervisor == null) {
            return false;
        }
        return registeredInSlobrok;
    }

    /** Registers the slobrok name of the node, and that name with "/default" appended. */
    public void registerSlobrok() {
        String name = getSlobrokName();
        register.registerName(name);
        register.registerName(name + "/default");
        registeredInSlobrok = true;
    }

    /** Unregisters both slobrok names of the node, leaving the RPC server itself running. */
    public void disconnectSlobrok() {
        String name = getSlobrokName();
        register.unregisterName(name);
        register.unregisterName(name + "/default");
        registeredInSlobrok = false;
    }

    /** Disconnects the node at once; same as {@link #disconnectImmediately()}. */
    public void disconnect() {
        disconnectImmediately();
    }
    /** Disconnects without waiting for a pending node state request and without reporting a stopping state. */
    public void disconnectImmediately() {
        disconnect(false, 0, false);
    }
    /** Waits (up to the test timeout) for a pending node state request, then breaks the connection. */
    public void disconnectBreakConnection() {
        disconnect(true, FleetControllerTest.timeoutMS, false);
    }
    /**
     * Waits (up to the test timeout) for a pending node state request, reports a stopping state
     * and then disconnects, mimicking a controlled shutdown.
     */
    public void disconnectAsShutdown() {
        disconnect(true, FleetControllerTest.timeoutMS, true);
    }
    /**
     * Takes the node offline by shutting down the slobrok register, the acceptor and the transport.
     * Does nothing beyond the optional steps below if the node is not connected.
     *
     * @param waitForPendingNodeStateRequest whether to first wait until a get-node-state request is queued
     * @param timeoutms how long to wait for that request, in milliseconds
     * @param setStoppingStateFirst whether to report state STOPPING before the connection is broken
     */
    public void disconnect(boolean waitForPendingNodeStateRequest, long timeoutms, boolean setStoppingStateFirst) {
        String waitNote = waitForPendingNodeStateRequest ? " Waiting for pending state first." : "";
        log.log(LogLevel.DEBUG, "Dummy node " + this + ": Breaking connection." + waitNote);
        if (waitForPendingNodeStateRequest) {
            waitForPendingGetNodeStateRequest(timeoutms);
        }
        if (setStoppingStateFirst) {
            NodeState stopping = nodeState.clone();
            stopping.setState(State.STOPPING);
            // The description used to be "Received signal 15 (SIGTERM - Termination signal)".
            // It was altered in the storageserver implementation, and is updated here to fit.
            stopping.setDescription("controlled shutdown");
            setNodeState(stopping);
            // Sleep a bit in hopes of answer being written before shutting down socket
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
            }
        }
        if (supervisor != null) {
            register.shutdown();
            acceptor.shutdown().join();
            supervisor.transport().shutdown().join();
            supervisor = null;
            log.log(LogLevel.DEBUG, "Dummy node " + this + ": Done breaking connection.");
        }
    }

    /** Returns the node type and the node index, separated by a dot. */
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append(type).append(".").append(index);
        return text.toString();
    }

    /** Returns true if this node fakes a distributor, false if it fakes a storage node. */
    public boolean isDistributor() {
        return type.equals(NodeType.DISTRIBUTOR);
    }
    /** Returns the type of node being faked. */
    public NodeType getType() {
        return type;
    }

    /** Returns a new Node identifying this dummy node by its type and index. */
    public Node getNode() {
        Node self = new Node(type, index);
        return self;
    }

    /** Returns the state communication version this node was configured with. */
    public int getStateCommunicationVersion() {
        return stateCommunicationVersion;
    }

    /**
     * Polls every 10 ms until the node has received a system state with at least the given version.
     * If the calling thread is interrupted, the wait is abandoned and the interrupt status is restored
     * so the caller can observe it (it used to be silently discarded).
     *
     * @param version the lowest acceptable system state version
     * @param timeout maximum time to wait, in milliseconds of wall clock time
     * @throws RuntimeException if the version has not been received within the timeout
     */
    public void waitForSystemStateVersion(int version, long timeout) {
        long startTime = System.currentTimeMillis();
        try {
            while (getLatestSystemStateVersion().orElse(-1) < version) {
                if ((System.currentTimeMillis() - startTime) > timeout) {
                    throw new RuntimeException("Timed out waiting for state version " + version + " in " + this);
                }
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns the version of the most recently received system state, or empty if none has been received yet. */
    private Optional<Integer> getLatestSystemStateVersion() {
        synchronized (timer) {
            return systemState.isEmpty()
                    ? Optional.<Integer>empty()
                    : Optional.<Integer>of(systemState.get(0).getVersion());
        }
    }

    /** Returns true if at least one get-node-state request is queued for a later answer. */
    public boolean hasPendingGetNodeStateRequest() {
        boolean anyQueued;
        synchronized (timer) {
            anyQueued = !waitingRequests.isEmpty();
        }
        return anyQueued;
    }

    /**
     * Blocks until at least one get-node-state request is queued on this node.
     *
     * The queue is only inspected while holding the timer monitor, and it is rechecked after every
     * wakeup before a timeout is declared, so a request arriving at the very end of the wait is
     * still detected. A timeout of zero or less checks the queue once without blocking; previously
     * a zero timeout ended up in an unbounded {@code wait(0)}. An interrupt does not cut the wait
     * short, but the interrupt status is restored before returning or throwing.
     *
     * @param timeout maximum time to wait, in milliseconds of wall clock time
     * @throws IllegalStateException if no request is queued when the timeout has passed
     */
    public void waitForPendingGetNodeStateRequest(long timeout) {
        final long endTime = System.currentTimeMillis() + timeout;
        boolean interrupted = false;
        log.log(LogLevel.DEBUG, "Dummy node " + this + " waiting for pending node state request.");
        try {
            synchronized (timer) {
                while (waitingRequests.isEmpty()) {
                    long remaining = endTime - System.currentTimeMillis();
                    if (remaining <= 0) {
                        log.log(LogLevel.DEBUG, "Dummy node " + this + " timeout passed. Don't have pending request.");
                        throw new IllegalStateException("Timeout. No pending get node state request pending after waiting " + timeout + " milliseconds.");
                    }
                    try {
                        log.log(LogLevel.DEBUG, "Dummy node " + this + " waiting " + remaining + " ms for pending request.");
                        // remaining is strictly positive here, so this can never be the unbounded wait(0).
                        timer.wait(remaining);
                    } catch (InterruptedException e) {
                        // Keep waiting for the full timeout; the interrupt is re-asserted on exit.
                        interrupted = true;
                    }
                    log.log(LogLevel.DEBUG, "Dummy node " + this + " woke up to recheck.");
                }
                log.log(LogLevel.DEBUG, "Dummy node " + this + " has pending request, returning.");
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Answers every queued get-node-state request with the current node state and empties the queue.
     * Each answered request is counted in {@code setNodeStateReplies}.
     *
     * The work is done under the timer monitor, because the responder thread and the RPC threads
     * modify the queue concurrently. The monitor is reentrant, so callers that already hold it
     * (such as setNodeState) are unaffected.
     */
    public void replyToPendingNodeStateRequests() {
        synchronized (timer) {
            for (Req req : waitingRequests) {
                log.log(LogLevel.DEBUG, "Dummy node " + this + " answering pending node state request.");
                req.request.returnValues().add(new StringValue(nodeState.serialize()));
                // Only getnodestate3 declares a second (host info) return value.
                if (req.request.methodName().equals("getnodestate3")) {
                    req.request.returnValues().add(new StringValue("Dummy node host info"));
                }
                req.request.returnRequest();
                ++setNodeStateReplies;
            }
            waitingRequests.clear();
        }
    }

    /**
     * Replaces the state this node reports, and immediately answers all queued
     * get-node-state requests with the new state.
     */
    public void setNodeState(NodeState state) {
        log.log(LogLevel.DEBUG, "Dummy node " + this + " got new state: " + state);
        synchronized (timer) {
            nodeState = state;
            replyToPendingNodeStateRequests();
        }
    }

    /** Convenience overload: reports a fresh NodeState of this node's type with the given state. */
    public void setNodeState(State state) {
        NodeState replacement = new NodeState(type, state);
        setNodeState(replacement);
    }

    /** Returns the node state this node currently reports. */
    public NodeState getNodeState() {
        NodeState current;
        synchronized (timer) {
            current = nodeState;
        }
        return current;
    }

    /** Returns a snapshot copy of all system states received so far, newest first. */
    public List<ClusterState> getSystemStatesReceived() {
        synchronized (timer) {
            return new ArrayList<>(systemState);
        }
    }

    /** Returns the most recently received system state, or null if none has been received yet. */
    public ClusterState getClusterState() {
        synchronized (timer) {
            if (systemState.isEmpty()) {
                return null;
            }
            return systemState.get(0);
        }
    }

    /** Returns the slobrok name of this node: storage/cluster.&lt;cluster name&gt;/&lt;type&gt;/&lt;index&gt;. */
    public String getSlobrokName() {
        StringBuilder name = new StringBuilder("storage/cluster.");
        name.append(clusterName);
        name.append("/").append(type);
        name.append("/").append(index);
        return name.toString();
    }

    /**
     * Registers the RPC methods this node answers on the current supervisor.
     * The original connect/getnodestate/setsystemstate calls are always registered. The "2" variants
     * are added when the state communication version is above 0, and getnodestate3 when it is above 1.
     */
    private void addMethods() {
        Method connect = new Method("vespa.storage.connect", "s", "i", this, "rpc_storageConnect");
        connect.methodDesc("Binds connection to a storage API handle");
        connect.paramDesc(0, "somearg", "Argument looking like slobrok address of the ones we're asking for some reason");
        connect.returnDesc(0, "returnCode", "Returncode of request. Should be 0 = OK");
        supervisor.addMethod(connect);

        Method getNodeState = new Method("getnodestate", "", "issi", this, "rpc_getNodeState");
        getNodeState.methodDesc("Get nodeState of a node");
        getNodeState.returnDesc(0, "returnCode", "Returncode of request. Should be 1 = OK");
        getNodeState.returnDesc(1, "returnMessage", "Textual error message if returncode is not ok.");
        getNodeState.returnDesc(2, "nodeState", "The node state of the given node");
        getNodeState.returnDesc(3, "progress", "Progress in percent of node initialization");
        supervisor.addMethod(getNodeState);

        Method setSystemState = new Method("setsystemstate", "s", "is", this, "rpc_setSystemState");
        setSystemState.methodDesc("Set system state of entire system");
        setSystemState.paramDesc(0, "systemState", "new systemstate");
        setSystemState.returnDesc(0, "returnCode", "Returncode of request. Should be 1 = OK");
        setSystemState.returnDesc(1, "returnMessage", "Textual error message if returncode is not ok.");
        supervisor.addMethod(setSystemState);

        if (stateCommunicationVersion <= 0) {
            return;
        }

        Method getNodeState2 = new Method("getnodestate2", "si", "s", this, "rpc_getNodeState2");
        getNodeState2.methodDesc("Get nodeState of a node, answer when state changes from given state.");
        getNodeState2.paramDesc(0, "nodeStateIn", "The node state of the given node");
        getNodeState2.paramDesc(1, "timeout", "Time timeout in milliseconds set by the state requester.");
        getNodeState2.returnDesc(0, "nodeStateOut", "The node state of the given node");
        supervisor.addMethod(getNodeState2);

        Method setSystemState2 = new Method("setsystemstate2", "s", "", this, "rpc_setSystemState2");
        setSystemState2.methodDesc("Set system state of entire system");
        setSystemState2.paramDesc(0, "systemState", "new systemstate");
        supervisor.addMethod(setSystemState2);

        if (stateCommunicationVersion <= 1) {
            return;
        }

        // getnodestate3 shares its handler with getnodestate2; the handler checks the method name.
        Method getNodeState3 = new Method("getnodestate3", "sii", "ss", this, "rpc_getNodeState2");
        getNodeState3.methodDesc("Get nodeState of a node, answer when state changes from given state.");
        getNodeState3.paramDesc(0, "nodeStateIn", "The node state of the given node");
        getNodeState3.paramDesc(1, "timeout", "Time timeout in milliseconds set by the state requester.");
        getNodeState3.returnDesc(0, "nodeStateOut", "The node state of the given node");
        getNodeState3.returnDesc(1, "hostinfo", "Information on the host node is running on");
        supervisor.addMethod(getNodeState3);
    }

    /**
     * RPC handler for vespa.storage.connect: answers with return code 0 and marks the handle as
     * negotiated, which the old style getnodestate and setsystemstate calls require.
     */
    public void rpc_storageConnect(Request req) {
        synchronized (timer) {
            log.log(LogLevel.SPAM, "Dummy node " + this + " got old type handle connect message.");
            Int32Value ok = new Int32Value(0);
            req.returnValues().add(ok);
            negotiatedHandle = true;
        }
    }

    /**
     * RPC handler for the old style getnodestate call. Fails with error 75000 unless a handle has been
     * negotiated; otherwise answers with return code 1, an empty message, the serialized node state
     * and a progress value of 0.
     */
    public void rpc_getNodeState(Request req) {
        synchronized (timer) {
            if (negotiatedHandle) {
                String serialized = nodeState.serialize(-1, true);
                log.log(LogLevel.DEBUG, "Dummy node " + this + " got old type get node state request, answering: " + serialized);
                req.returnValues().add(new Int32Value(1));
                req.returnValues().add(new StringValue(""));
                req.returnValues().add(new StringValue(serialized));
                req.returnValues().add(new Int32Value(0));
            } else {
                req.setError(75000, "Connection not bound to a handle");
            }
        }
    }

    /**
     * Answers and removes the first queued get-node-state request whose third parameter equals the
     * given index, because a newer request carrying the same index has arrived.
     *
     * @param index the value to match against the third request parameter (the log text calls it the controller)
     * @return true if a queued request was answered, false if none matched
     */
    public boolean sendGetNodeStateReply(int index) {
        Iterator<Req> pending = waitingRequests.iterator();
        while (pending.hasNext()) {
            Request queued = pending.next().request;
            boolean sameIndex = queued.parameters().size() > 2
                    && queued.parameters().get(2).asInt32() == index;
            if (!sameIndex) {
                continue;
            }
            log.log(LogLevel.DEBUG, "Dummy node " + DummyVdsNode.this.toString() + ": Responding to node state reply from controller " + index + " as we received new one");
            queued.returnValues().add(new StringValue(nodeState.serialize()));
            queued.returnValues().add(new StringValue("No host info from dummy implementation"));
            queued.returnRequest();
            pending.remove();
            ++outdatedStateReplies;
            return true;
        }
        return false;
    }

    /**
     * RPC handler shared by getnodestate2 and getnodestate3.
     *
     * If the caller already knows the current node state, or an older queued request with the same
     * index was just answered, the request is detached and queued. It is then answered when the state
     * changes or when 80% of the caller's timeout has passed. Otherwise it is answered immediately
     * with the current node state.
     * Any exception is logged and reported to the caller as METHOD_FAILED.
     */
    public void rpc_getNodeState2(Request req) {
        log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got " + req.methodName() + " request");
        try{
            String oldState = req.parameters().get(0).asString();
            int timeout = req.parameters().get(1).asInt32();
            // The third parameter only exists for getnodestate3; -1 is used when it is absent.
            int index = -1;
            if (req.parameters().size() > 2) {
                index = req.parameters().get(2).asInt32();
            }
            synchronized(timer) {
                // First answer any older queued request carrying the same index.
                boolean sentReply = sendGetNodeStateReply(index);
                // "unknown" means the caller has no previous state to compare against.
                NodeState givenState = (oldState.equals("unknown") ? null : NodeState.deserialize(type, oldState));
                if (givenState != null && (givenState.equals(nodeState) || sentReply)) {
                    log.log(LogLevel.DEBUG, "Dummy node " + this + ": Has same state as reported " + givenState + ". Queing request. Timeout is " + timeout + " ms. "
                            + "Will be answered at time " + (timer.getCurrentTimeInMillis() + timeout * 800l / 1000));
                    // Detach so the reply can be sent later, from the responder thread or from setNodeState.
                    req.detach();
                    waitingRequests.add(new Req(req, timer.getCurrentTimeInMillis() + timeout * 800l / 1000));
                    log.log(LogLevel.DEBUG, "Dummy node " + this + " has now " + waitingRequests.size() + " entries and is " + (waitingRequests.isEmpty() ? "empty" : "not empty"));
                    // Wake up threads waiting on the timer, e.g. waitForPendingGetNodeStateRequest.
                    timer.notifyAll();
                } else {
                    log.log(LogLevel.DEBUG, "Dummy node " + this + ": Request had " + (givenState == null ? "no state" : "different state(" + givenState +")") + ". Answering with " + nodeState);
                    req.returnValues().add(new StringValue(nodeState.serialize()));
                    // Only getnodestate3 declares a second (host info) return value.
                    if (req.methodName().equals("getnodestate3")) {
                        req.returnValues().add(new StringValue("Dummy node host info"));
                    }
                    ++immediateStateReplies;
                }
            }
        } catch (Exception e) {
            log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering " + req.methodName() + " request: " + e.getMessage());
            e.printStackTrace(System.err);
            req.setError(ErrorCode.METHOD_FAILED, e.getMessage());
        }
    }

    /**
     * Returns the highest start timestamp seen for the given node in the cluster states received so
     * far, or 0 if none has been seen.
     *
     * The map is read under the timer monitor, because the RPC threads update it while holding that
     * monitor and the underlying TreeMap is not safe for unsynchronized concurrent access.
     */
    public long getStartTimestamp(Node n) {
        synchronized (timer) {
            Long ts = highestStartTimestamps.get(n);
            return (ts == null ? 0 : ts);
        }
    }

    /**
     * Records the start timestamp of every distributor and storage node in the given cluster state,
     * skipping nodes whose timestamp is 0.
     *
     * @throws Error if a node reports a start timestamp lower than one recorded earlier
     */
    private void updateStartTimestamps(ClusterState state) {
        NodeType[] nodeTypes = { NodeType.DISTRIBUTOR, NodeType.STORAGE };
        for (NodeType nodeType : nodeTypes) {
            int nodeCount = state.getNodeCount(nodeType);
            for (int nodeIndex = 0; nodeIndex < nodeCount; ++nodeIndex) {
                Node node = new Node(nodeType, nodeIndex);
                long reported = state.getNodeState(node).getStartTimestamp();
                if (reported == 0) {
                    continue;
                }
                Long previous = highestStartTimestamps.get(node);
                if (previous != null && previous > reported) {
                    throw new Error("Somehow start timestamp of node " + node + " has gone down");
                }
                highestStartTimestamps.put(node, reported);
            }
        }
    }

    /** Turns on or off failing of all incoming set-system-state requests. */
    public void failSetSystemState(boolean failSystemStateRequests) {
        synchronized (timer) {
            failSetSystemStateRequests = failSystemStateRequests;
        }
    }

    /** Returns whether incoming set-system-state requests are currently configured to fail. */
    private boolean shouldFailSetSystemStateRequests() {
        boolean fail;
        synchronized (timer) {
            fail = failSetSystemStateRequests;
        }
        return fail;
    }

    /**
     * RPC handler for the old style setsystemstate call. Rejects the call if the node is configured
     * to fail such requests or if no handle has been negotiated. Otherwise it stores the new state as
     * the latest one, wakes up waiters on the timer and answers with return code 1 and "OK".
     * Failures while handling the state are reported through the return values, not as an RPC error.
     */
    public void rpc_setSystemState(Request req) {
        try {
            if (shouldFailSetSystemStateRequests()) {
                req.setError(ErrorCode.GENERAL_ERROR, "Dummy node configured to fail setSystemState() calls");
                return;
            }
            if (!negotiatedHandle) {
                req.setError(75000, "Connection not bound to a handle");
                return;
            }
            String serialized = req.parameters().get(0).asString();
            ClusterState received = new ClusterState(serialized);
            synchronized (timer) {
                updateStartTimestamps(received);
                systemState.add(0, received);
                timer.notifyAll();
            }
            req.returnValues().add(new Int32Value(1));
            req.returnValues().add(new StringValue("OK"));
            log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new system state (through old setsystemstate call) " + received);
        } catch (Exception failure) {
            log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setsystemstate request: " + failure.getMessage());
            failure.printStackTrace(System.err);
            req.returnValues().add(new Int32Value(ErrorCode.METHOD_FAILED));
            req.returnValues().add(new StringValue(failure.getMessage()));
        }
    }

    /**
     * RPC handler for setsystemstate2. Rejects the call if the node is configured to fail such
     * requests. Otherwise it stores the new state as the latest one and wakes up waiters on the timer.
     * The call has no return values; failures are reported as a METHOD_FAILED RPC error.
     */
    public void rpc_setSystemState2(Request req) {
        try {
            if (shouldFailSetSystemStateRequests()) {
                req.setError(ErrorCode.GENERAL_ERROR, "Dummy node configured to fail setSystemState2() calls");
                return;
            }
            String serialized = req.parameters().get(0).asString();
            ClusterState received = new ClusterState(serialized);
            synchronized (timer) {
                updateStartTimestamps(received);
                systemState.add(0, received);
                timer.notifyAll();
            }
            log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new system state " + received);
        } catch (Exception failure) {
            log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setsystemstate request: " + failure.getMessage());
            failure.printStackTrace(System.err);
            req.setError(ErrorCode.METHOD_FAILED, failure.getMessage());
        }
    }
}