summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
blob: db7e80a95e557710b565155791da11800a931c5f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;

import com.yahoo.component.annotation.Inject;
import com.yahoo.component.AbstractComponent;
import com.yahoo.component.ComponentId;
import com.yahoo.compress.Compressor;
import com.yahoo.container.handler.VipStatus;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.processing.request.CompoundName;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException;
import com.yahoo.search.dispatch.rpc.RpcInvokerFactory;
import com.yahoo.search.dispatch.rpc.RpcPingFactory;
import com.yahoo.search.dispatch.rpc.RpcResourcePool;
import com.yahoo.search.dispatch.searchcluster.Group;
import com.yahoo.search.dispatch.searchcluster.SearchGroups;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import com.yahoo.search.query.profile.types.FieldDescription;
import com.yahoo.search.query.profile.types.FieldType;
import com.yahoo.search.query.profile.types.QueryProfileType;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.vespa.config.search.DispatchConfig;
import com.yahoo.vespa.config.search.DispatchNodesConfig;

import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
 * A dispatcher communicates with search nodes to perform queries and fill hits.
 * <p>
 * This class allocates {@link SearchInvoker} and {@link FillInvoker} objects based
 * on query properties and general system status. The caller can then use the provided
 * invocation object to execute the search or fill.
 * <p>
 * This class is thread-safe.
 *
 * @author bratseth
 * @author ollvir
 */
public class Dispatcher extends AbstractComponent {

    /** Name of the dispatch query profile type, and the prefix of the dispatch query properties. */
    public static final String DISPATCH = "dispatch";
    private static final String TOP_K_PROBABILITY = "topKProbability";

    /** Upper bound on the number of groups tried for one query in {@link #getInternalInvoker}. */
    private static final int MAX_GROUP_SELECTION_ATTEMPTS = 3;

    /** If set will control computation of how many hits will be fetched from each partition.*/
    public static final CompoundName topKProbability = CompoundName.from(DISPATCH + "." + TOP_K_PROBABILITY);

    private final DispatchConfig dispatchConfig;

    /** Null when this instance was created through the test constructor. */
    private final RpcResourcePool rpcResourcePool;
    private final SearchCluster searchCluster;
    private final ClusterMonitor<Node> clusterMonitor;

    /** The load balancer and invoker factory currently in use, published together as one reference. */
    private volatile VolatileItems volatileItems;

    /** Immutable holder pairing a load balancer with an invoker factory. */
    private static class VolatileItems {
        final LoadBalancer loadBalancer;
        final InvokerFactory invokerFactory;
        VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory) {
            this.loadBalancer = loadBalancer;
            this.invokerFactory = invokerFactory;
        }
    }

    private static final QueryProfileType argumentType;

    static {
        // Strict, builtin, frozen type whose only field is the double-valued topKProbability.
        argumentType = new QueryProfileType(DISPATCH);
        argumentType.setStrict(true);
        argumentType.setBuiltin(true);
        argumentType.addField(new FieldDescription(TOP_K_PROBABILITY, FieldType.doubleType));
        argumentType.freeze();
    }

    /** Returns the frozen query profile type describing the {@code dispatch} arguments. */
    public static QueryProfileType getArgumentType() { return argumentType; }

    /**
     * Creates a dispatcher from config. This creates the RPC resource pool, the search cluster and its
     * cluster monitor, and then blocks in {@link #initialWarmup(double)} before returning.
     *
     * @param clusterId      id whose string value names the search cluster
     * @param dispatchConfig the dispatch settings
     * @param nodesConfig    the nodes (key, host, group) making up the cluster
     * @param vipStatus      passed on to the search cluster
     */
    @Inject
    public Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig,
                      DispatchNodesConfig nodesConfig, VipStatus vipStatus) {
        this.dispatchConfig = dispatchConfig;
        rpcResourcePool = new RpcResourcePool(dispatchConfig, nodesConfig);
        searchCluster = new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(),
                                          toNodes(nodesConfig), vipStatus, new RpcPingFactory(rpcResourcePool));
        clusterMonitor = new ClusterMonitor<>(searchCluster, true);
        volatileItems = update(null);
        initialWarmup(dispatchConfig.warmuptime());
    }

    /*
     * For simple mocking in tests. No RPC resource pool is created and no warmup is run.
     * Beware that the given clusterMonitor is shut down in deconstruct().
     */
    Dispatcher(ClusterMonitor<Node> clusterMonitor, SearchCluster searchCluster,
               DispatchConfig dispatchConfig, InvokerFactory invokerFactory) {
        this.dispatchConfig = dispatchConfig;
        this.rpcResourcePool = null;
        this.searchCluster = searchCluster;
        this.clusterMonitor = clusterMonitor;
        this.volatileItems = update(invokerFactory);
    }

    /**
     * Builds the load balancer and invoker factory pair for the current group list,
     * and adds the cluster monitor to the search cluster.
     *
     * @param invokerFactory the factory to use, or null to create an {@link RpcInvokerFactory}
     * @return the new pair; the caller is responsible for assigning it to {@link #volatileItems}
     */
    private VolatileItems update(InvokerFactory invokerFactory) {
        var items = new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())),
                                      (invokerFactory == null)
                                             ? new RpcInvokerFactory(rpcResourcePool, searchCluster.groupList(), dispatchConfig)
                                             : invokerFactory);
        searchCluster.addMonitoring(clusterMonitor);
        return items;
    }

    /**
     * Runs {@link #warmup(double)} in a separate thread while waiting for the search cluster to have
     * information about all nodes, then waits for the warmup thread to finish.
     * If the calling thread is interrupted, waiting stops early and the interrupt status is restored.
     *
     * @param warmupTime the warmup duration, in seconds
     */
    private void initialWarmup(double warmupTime) {
        Thread warmup = new Thread(() -> warmup(warmupTime));
        warmup.start();
        try {
            while ( ! searchCluster.hasInformationAboutAllNodes()) {
                Thread.sleep(1);
            }
            warmup.join();
        } catch (InterruptedException e) {
            // Stop waiting, but do not swallow the interrupt: leave it visible to the code constructing us.
            Thread.currentThread().interrupt();
        }

        // Now we have information from all nodes and a ping iteration has completed.
        // Instead of waiting until next ping interval to update coverage and group state,
        // we should compute the state ourselves, so that when the dispatcher is ready the state
        // of its groups are also known.
        searchCluster.pingIterationCompleted();
    }

    /** Maps the configured distribution policy to a load balancer policy. ADAPTIVE maps to LATENCY_AMORTIZED_OVER_REQUESTS. */
    private static LoadBalancer.Policy toLoadBalancerPolicy(DispatchConfig.DistributionPolicy.Enum policy) {
        return switch (policy) {
            case ROUNDROBIN -> LoadBalancer.Policy.ROUNDROBIN;
            case BEST_OF_RANDOM_2 -> LoadBalancer.Policy.BEST_OF_RANDOM_2;
            case ADAPTIVE,LATENCY_AMORTIZED_OVER_REQUESTS -> LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_REQUESTS;
            case LATENCY_AMORTIZED_OVER_TIME -> LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_TIME;
        };
    }

    /** Converts each configured node entry to a {@link Node}, preserving config order. */
    private static List<Node> toNodes(DispatchNodesConfig nodesConfig) {
        return nodesConfig.node().stream()
                .map(n -> new Node(n.key(), n.host(), n.group()))
                .toList();
    }

    /**
     * Will run important code in order to trigger JIT compilation and avoid cold start issues.
     * Currently warms up lz4 compression code.
     */
    private static void warmup(double seconds) {
        new Compressor().warmup(seconds);
    }

    /** Returns true if every group in the cluster has exactly one node (also true when there are no groups). */
    public boolean allGroupsHaveSize1() {
        return searchCluster.groupList().groups().stream().allMatch(g -> g.nodes().size() == 1);
    }

    /** Shuts down the cluster monitor, then closes the RPC resource pool if this instance has one. */
    @Override
    public void deconstruct() {
        // The clustermonitor must be shutdown first as it uses the invokerfactory through the searchCluster.
        clusterMonitor.shutdown();
        if (rpcResourcePool != null) {
            rpcResourcePool.close();
        }
    }

    /** Returns an invoker for filling the hits of the given result, created by the current invoker factory. */
    public FillInvoker getFillInvoker(Result result, VespaBackEndSearcher searcher) {
        return volatileItems.invokerFactory.createFillInvoker(searcher, result);
    }

    /**
     * Returns an invoker for executing the given query. An invoker chosen by the query's search path
     * is used if one can be made, otherwise {@link #getInternalInvoker} selects the target.
     * <p>
     * Side effect: if the query has the estimate property set, its hits and offset are set to 0
     * after the invoker has been created.
     *
     * @throws IllegalStateException if no invoker could be created (see {@link #getInternalInvoker})
     */
    public SearchInvoker getSearchInvoker(Query query, VespaBackEndSearcher searcher) {
        VolatileItems items = volatileItems; // Take a snapshot
        int maxHitsPerNode = dispatchConfig.maxHitsPerNode();
        SearchInvoker invoker = getSearchPathInvoker(query, searcher, searchCluster.groupList(), items.invokerFactory, maxHitsPerNode)
                .orElseGet(() -> getInternalInvoker(query, searcher, searchCluster, items.loadBalancer, items.invokerFactory, maxHitsPerNode));

        if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) {
            query.setHits(0);
            query.setOffset(0);
        }
        return invoker;
    }

    /**
     * Builds an invoker based on searchpath.
     *
     * @return empty if the query has no search path, if the path selects no nodes, or if the factory
     *         returns no invoker; an error invoker carrying an illegal-query message if the path is invalid
     */
    private static Optional<SearchInvoker> getSearchPathInvoker(Query query, VespaBackEndSearcher searcher, SearchGroups cluster,
                                                                InvokerFactory invokerFactory, int maxHitsPerNode) {
        String searchPath = query.getModel().getSearchPath();
        if (searchPath == null) return Optional.empty();

        try {
            List<Node> nodes = SearchPath.selectNodes(searchPath, cluster);
            if (nodes.isEmpty()) return Optional.empty();

            query.trace(false, 2, "Dispatching with search path ", searchPath);
            return invokerFactory.createSearchInvoker(searcher,
                                                      query,
                                                      nodes,
                                                      true, // accept incomplete coverage
                                                      maxHitsPerNode);
        } catch (InvalidSearchPathException e) {
            return Optional.of(new SearchErrorInvoker(ErrorMessage.createIllegalQuery(e.getMessage())));
        }
    }

    /**
     * Builds an invoker without using a search path. If the cluster has a local corpus dispatch target,
     * that single node is used. Otherwise groups are taken from the load balancer one at a time, at most
     * min(groups with sufficient coverage + 1, number of groups, {@value #MAX_GROUP_SELECTION_ATTEMPTS})
     * times, until the factory returns an invoker. Only the last attempt accepts incomplete coverage.
     * <p>
     * On success the query's search path is set to the chosen group, and the group is released
     * back to the load balancer when the invoker is torn down.
     *
     * @throws IllegalStateException if direct dispatch fails, or no group yielded an invoker
     */
    private static SearchInvoker getInternalInvoker(Query query, VespaBackEndSearcher searcher, SearchCluster cluster,
                                                    LoadBalancer loadBalancer, InvokerFactory invokerFactory, int maxHitsPerNode) {
        Optional<Node> directNode = cluster.localCorpusDispatchTarget();
        if (directNode.isPresent()) {
            Node node = directNode.get();
            query.trace(false, 2, "Dispatching to ", node);
            return invokerFactory.createSearchInvoker(searcher,
                                                      query,
                                                      List.of(node),
                                                      true, // accept incomplete coverage
                                                      maxHitsPerNode)
                                 .orElseThrow(() -> new IllegalStateException("Could not dispatch directly to " + node));
        }

        int covered = cluster.groupsWithSufficientCoverage();
        int groups = cluster.groupList().size();
        int max = Integer.min(Integer.min(covered + 1, groups), MAX_GROUP_SELECTION_ATTEMPTS);
        // May be null, meaning no group is rejected up front
        Set<Integer> rejected = rejectGroupBlockingFeed(cluster.groupList().groups());
        for (int i = 0; i < max; i++) {
            Optional<Group> groupInCluster = loadBalancer.takeGroup(rejected);
            if (groupInCluster.isEmpty()) break; // No groups available

            Group group = groupInCluster.get();
            boolean acceptIncompleteCoverage = (i == max - 1);
            Optional<SearchInvoker> invoker = invokerFactory.createSearchInvoker(searcher,
                                                                                 query,
                                                                                 group.nodes(),
                                                                                 acceptIncompleteCoverage,
                                                                                 maxHitsPerNode);
            if (invoker.isPresent()) {
                query.trace(false, 2, "Dispatching to group ", group.id(), " after retries = ", i);
                query.getModel().setSearchPath("/" + group.id());
                // The group stays allocated until the invoker is torn down
                invoker.get().teardown((success, time) -> loadBalancer.releaseGroup(group, success, time));
                return invoker.get();
            } else {
                // Hand the group back as a failure, and exclude it from the remaining attempts
                loadBalancer.releaseGroup(group, false, RequestDuration.of(Duration.ZERO));
                if (rejected == null) {
                    rejected = new HashSet<>();
                }
                rejected.add(group.id());
            }
        }
        throw new IllegalStateException("No suitable groups to dispatch query. Rejected: " + rejected);
    }

    /**
     * We want to avoid groups blocking feed because their data may be out of date.
     * If there is a single group blocking feed, we want to reject it.
     * If multiple groups are blocking feed we should use them anyway as we may not have remaining
     * capacity otherwise. Same if there are no other groups.
     *
     * @return a modifiable set containing the single group to reject, or null otherwise
     */
    private static Set<Integer> rejectGroupBlockingFeed(Collection<Group> groups) {
        if (groups.size() == 1) return null;
        List<Group> groupsRejectingFeed = groups.stream().filter(Group::isBlockingWrites).toList();
        if (groupsRejectingFeed.size() != 1) return null;
        Set<Integer> rejected = new HashSet<>();
        rejected.add(groupsRejectingFeed.get(0).id());
        return rejected;
    }

}