summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
blob: 7369b33e82d9233659ba75014dbc0f936860433d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;

import com.yahoo.component.AbstractComponent;
import com.yahoo.container.handler.VipStatus;
import com.yahoo.jdisc.Metric;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.processing.request.CompoundName;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException;
import com.yahoo.search.dispatch.rpc.RpcInvokerFactory;
import com.yahoo.search.dispatch.rpc.RpcResourcePool;
import com.yahoo.search.dispatch.searchcluster.Group;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.dispatch.searchcluster.PingFactory;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import com.yahoo.search.query.profile.types.FieldDescription;
import com.yahoo.search.query.profile.types.FieldType;
import com.yahoo.search.query.profile.types.QueryProfileType;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.vespa.config.search.DispatchConfig;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;

/**
 * A dispatcher communicates with search nodes to perform queries and fill hits.
 *
 * This class allocates {@link SearchInvoker} and {@link FillInvoker} objects based
 * on query properties and general system status. The caller can then use the provided
 * invocation object to execute the search or fill.
 *
 * This class is multithread safe.
 *
 * @author bratseth
 * @author ollvir
 */
public class Dispatcher extends AbstractComponent {

    public static final String DISPATCH = "dispatch";
    private static final String INTERNAL = "internal";
    private static final String PROTOBUF = "protobuf";

    private static final String FDISPATCH_METRIC = "dispatch_fdispatch";
    private static final String INTERNAL_METRIC = "dispatch_internal";

    private static final int MAX_GROUP_SELECTION_ATTEMPTS = 3;

    /** If enabled, search queries will use protobuf rpc */
    public static final CompoundName dispatchProtobuf = CompoundName.fromComponents(DISPATCH, PROTOBUF);

    /** A model of the search cluster this dispatches to */
    private final SearchCluster searchCluster;

    private final LoadBalancer loadBalancer;
    private final boolean multilevelDispatch;

    private final InvokerFactory invokerFactory;

    private final Metric metric;
    private final Metric.Context metricContext;

    private static final QueryProfileType argumentType;

    static {
        argumentType = new QueryProfileType(DISPATCH);
        argumentType.setStrict(true);
        argumentType.setBuiltin(true);
        argumentType.addField(new FieldDescription(INTERNAL, FieldType.booleanType));
        argumentType.addField(new FieldDescription(PROTOBUF, FieldType.booleanType));
        argumentType.freeze();
    }

    public static QueryProfileType getArgumentType() { return argumentType; }

    public static Dispatcher create(String clusterId,
                                    DispatchConfig dispatchConfig,
                                    int containerClusterSize,
                                    VipStatus vipStatus,
                                    Metric metric) {
        var searchCluster = new SearchCluster(clusterId, dispatchConfig, containerClusterSize, vipStatus);
        var rpcFactory = new RpcInvokerFactory(new RpcResourcePool(dispatchConfig), searchCluster);

        return new Dispatcher(searchCluster, dispatchConfig, rpcFactory, rpcFactory, metric);
    }

    public Dispatcher(SearchCluster searchCluster,
                      DispatchConfig dispatchConfig,
                      InvokerFactory invokerFactory,
                      PingFactory pingFactory,
                      Metric metric) {
        this.searchCluster = searchCluster;
        this.loadBalancer = new LoadBalancer(searchCluster,
                                  dispatchConfig.distributionPolicy() == DispatchConfig.DistributionPolicy.ROUNDROBIN);
        this.invokerFactory = invokerFactory;
        this.multilevelDispatch = dispatchConfig.useMultilevelDispatch();
        this.metric = metric;
        this.metricContext = metric.createContext(null);

        searchCluster.startClusterMonitoring(pingFactory);
    }

    /** Returns the search cluster this dispatches to */
    public SearchCluster searchCluster() {
        return searchCluster;
    }

    @Override
    public void deconstruct() {
        invokerFactory.release();
    }

    public Optional<FillInvoker> getFillInvoker(Result result, VespaBackEndSearcher searcher) {
        return invokerFactory.createFillInvoker(searcher, result);
    }

    public Optional<SearchInvoker> getSearchInvoker(Query query, VespaBackEndSearcher searcher) {
        if (multilevelDispatch) {
            emitDispatchMetric(Optional.empty());
            return Optional.empty();
        }

        Optional<SearchInvoker> invoker = getSearchPathInvoker(query, searcher);

        if (invoker.isEmpty()) {
            invoker = getInternalInvoker(query, searcher);
        }
        if (invoker.isPresent() && query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) {
            query.setHits(0);
            query.setOffset(0);
        }
        emitDispatchMetric(invoker);

        return invoker;
    }

    /** Builds an invoker based on searchpath */
    private Optional<SearchInvoker> getSearchPathInvoker(Query query, VespaBackEndSearcher searcher) {
        String searchPath = query.getModel().getSearchPath();
        if (searchPath == null) return Optional.empty();

        try {
            List<Node> nodes = SearchPath.selectNodes(searchPath, searchCluster);
            if (nodes.isEmpty()) return Optional.empty();

            query.trace(false, 2, "Dispatching internally with search path ", searchPath);
            return invokerFactory.createSearchInvoker(searcher, query, OptionalInt.empty(), nodes, true);
        } catch (InvalidSearchPathException e) {
            return Optional.of(new SearchErrorInvoker(ErrorMessage.createIllegalQuery(e.getMessage())));
        }
    }

    private Optional<SearchInvoker> getInternalInvoker(Query query, VespaBackEndSearcher searcher) {
        Optional<Node> directNode = searchCluster.localCorpusDispatchTarget();
        if (directNode.isPresent()) {
            Node node = directNode.get();
            query.trace(false, 2, "Dispatching directly to ", node);
            return invokerFactory.createSearchInvoker(searcher, query, OptionalInt.empty(), Arrays.asList(node), true);
        }

        int covered = searchCluster.groupsWithSufficientCoverage();
        int groups = searchCluster.orderedGroups().size();
        int max = Integer.min(Integer.min(covered + 1, groups), MAX_GROUP_SELECTION_ATTEMPTS);
        Set<Integer> rejected = null;
        for (int i = 0; i < max; i++) {
            Optional<Group> groupInCluster = loadBalancer.takeGroup(rejected);
            if (groupInCluster.isEmpty()) break; // No groups available

            Group group = groupInCluster.get();
            boolean acceptIncompleteCoverage = (i == max - 1);
            Optional<SearchInvoker> invoker = invokerFactory.createSearchInvoker(searcher,
                                                                                 query,
                                                                                 OptionalInt.of(group.id()),
                                                                                 group.nodes(),
                                                                                 acceptIncompleteCoverage);
            if (invoker.isPresent()) {
                query.trace(false, 2, "Dispatching internally to search group ", group.id());
                query.getModel().setSearchPath("/" + group.id());
                invoker.get().teardown((success, time) -> loadBalancer.releaseGroup(group, success, time));
                return invoker;
            } else {
                loadBalancer.releaseGroup(group, false, 0);
                if (rejected == null) {
                    rejected = new HashSet<>();
                }
                rejected.add(group.id());
            }
        }

        return Optional.empty();
    }

    private void emitDispatchMetric(Optional<SearchInvoker> invoker) {
        if (invoker.isEmpty()) {
            metric.add(FDISPATCH_METRIC, 1, metricContext);
        } else {
            metric.add(INTERNAL_METRIC, 1, metricContext);
        }
    }

}