summaryrefslogtreecommitdiffstats
path: root/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
blob: 93aac5c83ed8f2855610b897186a36ba68dc56c6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;

import com.yahoo.vdslib.state.*;
import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler;
import com.yahoo.vespa.clustercontroller.core.listeners.NodeAddedOrRemovedListener;
import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler;
import org.junit.Test;

import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

public class SystemStateBroadcasterTest {

    private static class Fixture {
        FakeTimer timer = new FakeTimer();
        final Object monitor = new Object();
        SystemStateBroadcaster broadcaster = new SystemStateBroadcaster(timer, monitor);
        Communicator mockCommunicator = mock(Communicator.class);

        void simulateNodePartitionedAwaySilently(ClusterFixture cf) {
            cf.cluster().getNodeInfo(Node.ofStorage(0)).setStartTimestamp(600);
            cf.cluster().getNodeInfo(Node.ofStorage(1)).setStartTimestamp(700);
            // Simulate a distributor being partitioned away from the controller without actually going down. It will
            // need to observe all startup timestamps to infer if it should fetch bucket info from nodes.
            cf.cluster().getNodeInfo(Node.ofDistributor(0)).setStartTimestamp(500); // FIXME multiple sources of timestamps are... rather confusing
            cf.cluster().getNodeInfo(Node.ofDistributor(0)).setReportedState(new NodeState(NodeType.DISTRIBUTOR, State.UP).setStartTimestamp(500), 1000);
            cf.cluster().getNodeInfo(Node.ofDistributor(0)).setReportedState(new NodeState(NodeType.DISTRIBUTOR, State.DOWN).setStartTimestamp(500), 2000);
            cf.cluster().getNodeInfo(Node.ofDistributor(0)).setReportedState(new NodeState(NodeType.DISTRIBUTOR, State.UP).setStartTimestamp(500), 3000);
        }
    }

    private static DatabaseHandler.Context dbContextFrom(ContentCluster cluster) {
        return new DatabaseHandler.Context() {
            @Override
            public ContentCluster getCluster() {
                return cluster;
            }

            @Override
            public FleetController getFleetController() {
                return null; // We assume the broadcaster doesn't use this for our test purposes
            }

            @Override
            public NodeAddedOrRemovedListener getNodeAddedOrRemovedListener() {
                return null;
            }

            @Override
            public NodeStateOrHostInfoChangeHandler getNodeStateUpdateListener() {
                return null;
            }
        };
    }

    private static Stream<NodeInfo> clusterNodeInfos(ContentCluster c, Node... nodes) {
        return Stream.of(nodes).map(c::getNodeInfo);
    }

    private static class StateMapping {
        final String bucketSpace;
        final ClusterState state;

        StateMapping(String bucketSpace, ClusterState state) {
            this.bucketSpace = bucketSpace;
            this.state = state;
        }
    }

    private static StateMapping mapping(String bucketSpace, String state) {
        return new StateMapping(bucketSpace, ClusterState.stateFromString(state));
    }

    private static ClusterStateBundle makeBundle(String baselineState, StateMapping... bucketSpaceStates) {
        return ClusterStateBundle.of(AnnotatedClusterState.withoutAnnotations(ClusterState.stateFromString(baselineState)),
                Stream.of(bucketSpaceStates).collect(Collectors.toMap(sm -> sm.bucketSpace, sm -> sm.state)));
    }

    @Test
    public void always_publish_baseline_cluster_state() {
        Fixture f = new Fixture();
        ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2");
        ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
        f.broadcaster.handleNewClusterStates(stateBundle);
        f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator);
        cf.cluster().getNodeInfo().forEach(nodeInfo -> verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any()));
    }

    @Test
    public void non_observed_startup_timestamps_are_published_per_node_for_baseline_state() {
        Fixture f = new Fixture();
        ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2");
        ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
        f.simulateNodePartitionedAwaySilently(cf);
        f.broadcaster.handleNewClusterStates(stateBundle);
        f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator);

        clusterNodeInfos(cf.cluster(), Node.ofDistributor(1), Node.ofStorage(0), Node.ofStorage(1)).forEach(nodeInfo -> {
            // Only distributor 0 should observe startup timestamps
            verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any());
        });
        ClusterStateBundle expectedDistr0Bundle = makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700");
        verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any());
    }

    @Test
    public void bucket_space_states_are_published_verbatim_when_no_additional_timestamps_needed() {
        Fixture f = new Fixture();
        ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2",
                mapping("default", "distributor:2 storage:2 .0.s:d"),
                mapping("upsidedown", "distributor:2 .0.s:d storage:2"));
        ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
        f.broadcaster.handleNewClusterStates(stateBundle);
        f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator);

        cf.cluster().getNodeInfo().forEach(nodeInfo -> verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any()));
    }

    @Test
    public void non_observed_startup_timestamps_are_published_per_bucket_space_state() {
        Fixture f = new Fixture();
        ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2",
                mapping("default", "distributor:2 storage:2 .0.s:d"),
                mapping("upsidedown", "distributor:2 .0.s:d storage:2"));
        ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
        f.simulateNodePartitionedAwaySilently(cf);
        f.broadcaster.handleNewClusterStates(stateBundle);
        f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator);

        clusterNodeInfos(cf.cluster(), Node.ofDistributor(1), Node.ofStorage(0), Node.ofStorage(1)).forEach(nodeInfo -> {
            // Only distributor 0 should observe startup timestamps
            verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any());
        });
        ClusterStateBundle expectedDistr0Bundle = makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700",
                mapping("default", "distributor:2 storage:2 .0.s:d .0.t:600 .1.t:700"),
                mapping("upsidedown", "distributor:2 .0.s:d storage:2 .0.t:600 .1.t:700"));
        verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any());
    }
}