aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java
blob: 6eb67b245f249afb7fa1cfef51f79f731e193261 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
package com.yahoo.search.dispatch;

import com.google.common.annotations.Beta;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.cluster.NodeManager;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.vespa.config.search.DispatchConfig;

// Only needed until query requests are moved to rpc
import com.yahoo.prelude.Ping;
import com.yahoo.prelude.fastsearch.FastSearcher;
import com.yahoo.yolean.Exceptions;
import com.yahoo.prelude.Pong;
import com.yahoo.prelude.fastsearch.FS4ResourcePool;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/**
 * A model of a search cluster we might want to dispatch queries to.
 * The cluster is made up of nodes which are organized in groups. The working status of each node
 * is kept up to date by pinging the nodes and reporting the outcome to a cluster monitor.
 *
 * @author bratseth
 */
@Beta
public class SearchCluster implements NodeManager<SearchCluster.Node> {

    private static final Logger log = Logger.getLogger(SearchCluster.class.getName());
    
    /** The total number of nodes in this cluster, across all groups */
    private final int size;
    /** The groups of this cluster, indexed by group id */
    private final ImmutableMap<Integer, Group> groups;
    /** The nodes of this cluster indexed by hostname; a host may hold several nodes */
    private final ImmutableMultimap<String, Node> nodesByHost;
    /** The monitor all nodes are registered with, and which receives the outcome of each ping */
    private final ClusterMonitor<Node> clusterMonitor;

    // Only needed until query requests are moved to rpc
    // (supplies the fs4 backends the nodes are pinged through)
    private final FS4ResourcePool fs4ResourcePool;

    /**
     * Creates a search cluster from the nodes listed in the given dispatch config.
     *
     * @param dispatchConfig the config whose node entries (host, port and group) make up this cluster
     * @param fs4ResourcePool the pool supplying the fs4 backends used when pinging nodes
     */
    public SearchCluster(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool) {
        this(toNodes(dispatchConfig), fs4ResourcePool);
    }
    
    /**
     * Creates a search cluster consisting of the given nodes.
     * The nodes are partitioned into groups by their group id, indexed by hostname,
     * and registered with a cluster monitor.
     *
     * @param nodes the nodes making up this cluster
     * @param fs4ResourcePool the pool supplying the fs4 backends used when pinging nodes
     */
    public SearchCluster(List<Node> nodes, FS4ResourcePool fs4ResourcePool) {
        this.size = nodes.size();
        this.fs4ResourcePool = fs4ResourcePool;

        // Partition the nodes by group id and wrap each partition in a Group
        Map<Integer, List<Node>> nodesByGroupId = nodes.stream().collect(Collectors.groupingBy(Node::group));
        ImmutableMap.Builder<Integer, Group> groupById = new ImmutableMap.Builder<>();
        nodesByGroupId.forEach((groupId, members) -> groupById.put(groupId, new Group(groupId, members)));
        this.groups = groupById.build();

        // Make the nodes retrievable by the host they run on
        ImmutableMultimap.Builder<String, Node> byHost = new ImmutableMultimap.Builder<>();
        nodes.forEach(node -> byHost.put(node.hostname(), node));
        this.nodesByHost = byHost.build();

        // Monitor the fs4 interface of the nodes for now;
        // the rpc interface can be monitored instead once the query phase is moved to rpc.
        // This is done last such that all other state is in place when monitoring begins.
        this.clusterMonitor = new ClusterMonitor<>(this);
        for (Node node : nodes) {
            clusterMonitor.add(node, true);
        }
    }
    
    /** Converts each node entry of the given dispatch config into a node of this cluster model */
    private static ImmutableList<Node> toNodes(DispatchConfig dispatchConfig) {
        ImmutableList.Builder<Node> converted = ImmutableList.builder();
        for (DispatchConfig.Node configured : dispatchConfig.node()) {
            Node node = new Node(configured.host(), configured.port(), configured.group());
            converted.add(node);
        }
        return converted.build();
    }
    
    /**
     * Returns the total node count of this cluster, counting the nodes of every group.
     */
    public int size() {
        return size;
    }
    
    /**
     * Returns an immutable map holding the groups of this cluster, keyed by group id.
     */
    public ImmutableMap<Integer, Group> groups() {
        return groups;
    }

    /**
     * Returns an immutable multi-map holding the nodes of this cluster, keyed by host.
     * A multi-map is needed because a single host may run several nodes (on different ports).
     */
    public ImmutableMultimap<String, Node> nodesByHost() {
        return nodesByHost;
    }

    /**
     * Invoked by the cluster monitor to manage node status: flags the given node as working.
     */
    @Override
    public void working(Node node) {
        node.setWorking(true);
    }

    /**
     * Invoked by the cluster monitor to manage node status: flags the given node as not working.
     */
    @Override
    public void failed(Node node) {
        node.setWorking(false);
    }

    /**
     * Used by the cluster monitor to manage node status: pings the given node and reports the
     * outcome to the cluster monitor.
     * The ping is run as a task on the given executor, and this waits for its result for at most
     * the fail limit of the cluster monitor configuration. A ping which is interrupted, fails or
     * times out is reported to the cluster monitor as a failure.
     * If the calling thread is interrupted while waiting, its interrupt status is restored before returning.
     *
     * @param node the node to ping
     * @param executor the executor on which the ping task is run
     */
    @Override
    public void ping(Node node, Executor executor) {
        Pinger pinger = new Pinger(node);
        FutureTask<Pong> future = new FutureTask<>(pinger);

        executor.execute(future);
        Pong pong;
        try {
            pong = future.get(clusterMonitor.getConfiguration().getFailLimit(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            pong = new Pong();
            pong.addError(ErrorMessage.createUnspecifiedError("Ping was interrupted: " + node));
            log.log(Level.WARNING, "Exception pinging " + node, e);
            // Catching InterruptedException clears the interrupt status: restore it such that
            // the owner of this thread can still observe that it was asked to stop
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            pong = new Pong();
            pong.addError(ErrorMessage.createUnspecifiedError("Execution was interrupted: " + node));
            log.log(Level.WARNING, "Exception pinging " + node, e);
        } catch (TimeoutException e) {
            pong = new Pong();
            pong.addError(ErrorMessage.createNoAnswerWhenPingingNode("Ping thread timed out"));
        }
        // Stop a ping which is still running (after a timeout or interruption); no effect if it has completed
        future.cancel(true);

        if (pong.badResponse())
            clusterMonitor.failed(node, pong.getError(0));
        else
            clusterMonitor.responded(node);
    }

    /**
     * A task which pings a single node over fs4 and returns the resulting pong.
     * A runtime exception thrown while pinging is returned as an error in the pong rather than propagated.
     */
    private class Pinger implements Callable<Pong> {

        private final Node node;

        public Pinger(Node node) {
            this.node = node;
        }

        /** Pings the node of this, using the request timeout of the cluster monitor configuration */
        public Pong call() {
            try {
                Ping ping = new Ping(clusterMonitor.getConfiguration().getRequestTimeout());
                return FastSearcher.ping(ping,
                                         fs4ResourcePool.getBackend(node.hostname(), node.port()),
                                         node.toString());
            } catch (RuntimeException e) {
                Pong failure = new Pong();
                String message = "Exception when pinging " + node + ": " + Exceptions.toMessageString(e);
                failure.addError(ErrorMessage.createBackendCommunicationError(message));
                return failure;
            }
        }

    }

    /** A group of nodes in a search cluster, identified by an integer id */
    public static class Group {

        private final int id;
        private final ImmutableList<Node> nodes;

        /** Creates a group with the given id, holding an immutable copy of the given nodes */
        public Group(int id, List<Node> nodes) {
            this.nodes = ImmutableList.copyOf(nodes);
            this.id = id;
        }

        /** Returns the id identifying this group */
        public int id() {
            return id;
        }

        /** Returns an immutable list of the nodes belonging to this group */
        public ImmutableList<Node> nodes() {
            return nodes;
        }

        @Override
        public String toString() {
            return "search group " + id;
        }

    }
    
    /**
     * A node in a search cluster, located by a hostname and port and belonging to a group.
     * Two nodes are equal if they have the same hostname and port, regardless of group.
     * A node also carries a working status, which is false until it is set otherwise.
     */
    public static class Node {

        private final String hostname;
        private final int port;
        private final int group;

        private final AtomicBoolean working = new AtomicBoolean();

        /**
         * Creates a node.
         *
         * @param hostname the name of the host this node runs on
         * @param port the port of this node on that host
         * @param group the id of the group this node belongs to
         */
        public Node(String hostname, int port, int group) {
            this.hostname = hostname;
            this.port = port;
            this.group = group;
        }

        /** Returns the name of the host this node runs on */
        public String hostname() { return hostname; }

        /** Returns the port of this node */
        public int port() { return port; }

        /** Returns the id of the group this node belongs to */
        public int group() { return group; }

        private void setWorking(boolean working) {
            this.working.lazySet(working);
        }

        /** Returns whether this node is currently responding to requests */
        public boolean isWorking() { return working.get(); }

        @Override
        public int hashCode() { return Objects.hash(hostname, port); }

        @Override
        public boolean equals(Object o) {
            if (o == this) return true;
            if ( ! (o instanceof Node)) return false;
            Node other = (Node)o;
            // Ports are primitives: compare them directly instead of boxing them
            return this.port == other.port && Objects.equals(this.hostname, other.hostname);
        }

        /** Returns a description including both host and port, as several nodes may run on the same host */
        @Override
        public String toString() { return "search node " + hostname + ":" + port; }

    }

}