summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
blob: 0dd682dee0e4dd3fcce79dafb474eb440c1ce281 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;

import com.yahoo.component.AbstractComponent;
import com.yahoo.container.handler.VipStatus;
import com.yahoo.prelude.fastsearch.DocumentDatabase;
import com.yahoo.prelude.fastsearch.FS4InvokerFactory;
import com.yahoo.prelude.fastsearch.FS4ResourcePool;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.processing.request.CompoundName;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException;
import com.yahoo.search.dispatch.searchcluster.Group;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import com.yahoo.vespa.config.search.DispatchConfig;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
 * A dispatcher communicates with search nodes to perform queries and fill hits.
 *
 * This class allocates {@link SearchInvoker} and {@link FillInvoker} objects based
 * on query properties and general system status. The caller can then use the provided
 * invocation object to execute the search or fill.
 *
 * This class is multithread safe.
 *
 * @author bratseth
 * @author ollvir
 */
public class Dispatcher extends AbstractComponent {

    /** Upper bound on the number of groups tried for a single query before giving up */
    private static final int MAX_GROUP_SELECTION_ATTEMPTS = 3;

    /** If enabled, this internal dispatcher will be preferred over fdispatch whenever possible */
    private static final CompoundName DISPATCH_INTERNAL = new CompoundName("dispatch.internal");

    /** A model of the search cluster this dispatches to */
    private final SearchCluster searchCluster;

    /** Selects the group that should serve a query, and tracks groups handed out */
    private final LoadBalancer loadBalancer;

    /** Source of RPC based fill invokers; released in {@link #deconstruct()} */
    private final RpcResourcePool rpcResourcePool;

    /**
     * Creates a dispatcher from configuration.
     *
     * @param dispatchConfig the dispatch configuration; its distribution policy decides whether
     *                       the load balancer is created in round robin mode
     * @param fs4ResourcePool passed on to the search cluster model
     * @param containerClusterSize passed on to the search cluster model
     * @param vipStatus passed on to the search cluster model
     */
    public Dispatcher(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus) {
        this.searchCluster = new SearchCluster(dispatchConfig, fs4ResourcePool, containerClusterSize, vipStatus);
        boolean roundRobin = dispatchConfig.distributionPolicy() == DispatchConfig.DistributionPolicy.ROUNDROBIN;
        this.loadBalancer = new LoadBalancer(searchCluster, roundRobin);
        this.rpcResourcePool = new RpcResourcePool(dispatchConfig);
    }

    /**
     * For testing.
     *
     * Note that this leaves the search cluster model unset (null): {@link #searchCluster()}
     * returns null, and the load balancer is created with a null cluster.
     */
    public Dispatcher(Map<Integer, Client.NodeConnection> nodeConnections, Client client) {
        this.searchCluster = null;
        this.loadBalancer = new LoadBalancer(searchCluster, true);
        this.rpcResourcePool = new RpcResourcePool(client, nodeConnections);
    }

    /** Returns the search cluster this dispatches to */
    public SearchCluster searchCluster() {
        return searchCluster;
    }

    /** Releases the RPC resource pool held by this dispatcher */
    @Override
    public void deconstruct() {
        rpcResourcePool.release();
    }

    /**
     * Returns an invoker for filling the hits of the given result, if one is available.
     *
     * The RPC resource pool is asked first. If it has no invoker, and the query has the
     * <code>dispatch.internal</code> property set to true, the FS4 invoker factory is asked.
     *
     * @return the fill invoker to use, or empty if neither source provided one
     */
    public Optional<FillInvoker> getFillInvoker(Result result, VespaBackEndSearcher searcher, DocumentDatabase documentDb,
            FS4InvokerFactory fs4InvokerFactory) {
        Optional<FillInvoker> rpcInvoker = rpcResourcePool.getFillInvoker(result.getQuery(), searcher, documentDb);
        if (rpcInvoker.isPresent()) {
            return rpcInvoker;
        }
        if (result.getQuery().properties().getBoolean(DISPATCH_INTERNAL, false)) {
            // Whatever the factory answers (an invoker or empty) is the final answer
            return fs4InvokerFactory.getFillInvoker(result);
        }
        return Optional.empty();
    }

    /**
     * Returns an invoker for executing the given query, if internal dispatch is enabled for it
     * (the <code>dispatch.internal</code> query property is true) and an invoker could be built.
     *
     * An invoker based on the query's search path is preferred; otherwise one is built by
     * regular group selection. When an invoker is returned and the query has the estimate
     * property set, the hits and offset of the query are both reset to 0.
     *
     * @return the search invoker to use, or empty if internal dispatch is disabled or not possible
     */
    public Optional<SearchInvoker> getSearchInvoker(Query query, FS4InvokerFactory fs4InvokerFactory) {
        if (!query.properties().getBoolean(DISPATCH_INTERNAL, false)) {
            return Optional.empty();
        }

        Optional<SearchInvoker> invoker = getSearchPathInvoker(query, fs4InvokerFactory::getSearchInvoker);
        if (!invoker.isPresent()) {
            invoker = getInternalInvoker(query, fs4InvokerFactory::getSearchInvoker);
        }
        if (invoker.isPresent() && query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) {
            query.setHits(0);
            query.setOffset(0);
        }
        return invoker;
    }

    /** Creates a search invoker for a query over a set of nodes, or returns empty if that is not possible */
    @FunctionalInterface
    private interface SearchInvokerSupplier {
        Optional<SearchInvoker> supply(Query query, int groupId, List<Node> nodes, boolean acceptIncompleteCoverage);
    }

    /**
     * Builds an invoker based on the search path of the query.
     *
     * @return empty if the query has no search path or the path selects no nodes, an error
     *         invoker carrying the exception message if the search path is invalid, and
     *         otherwise whatever the invoker factory supplies for the selected nodes
     */
    private Optional<SearchInvoker> getSearchPathInvoker(Query query, SearchInvokerSupplier invokerFactory) {
        String searchPath = query.getModel().getSearchPath();
        if (searchPath == null) {
            return Optional.empty();
        }
        try {
            List<Node> nodes = SearchPath.selectNodes(searchPath, searchCluster);
            if (nodes.isEmpty()) {
                return Optional.empty();
            }
            // No group is taken from the load balancer here, hence group id -1
            return invokerFactory.supply(query, -1, nodes, true);
        } catch (InvalidSearchPathException e) {
            return Optional.of(new SearchErrorInvoker(e.getMessage()));
        }
    }

    /**
     * Builds an invoker by dispatching directly to a single node when the search cluster offers
     * a direct dispatch target, and otherwise by asking the load balancer for a group.
     *
     * At most {@link #MAX_GROUP_SELECTION_ATTEMPTS} groups (and never more than the number of
     * groups in the cluster) are tried. Groups for which no invoker could be built are released
     * and excluded from subsequent attempts. Incomplete coverage is only accepted on the last attempt.
     *
     * @return the invoker to use, or empty if no group could supply one
     */
    private Optional<SearchInvoker> getInternalInvoker(Query query, SearchInvokerSupplier invokerFactory) {
        Optional<Node> directNode = searchCluster.directDispatchTarget();
        if (directNode.isPresent()) {
            Node node = directNode.get();
            query.trace(false, 2, "Dispatching directly to ", node);
            return invokerFactory.supply(query, -1, Arrays.asList(node), true);
        }

        int max = Integer.min(searchCluster.orderedGroups().size(), MAX_GROUP_SELECTION_ATTEMPTS);
        Set<Integer> rejected = null; // allocated lazily: most queries succeed on the first group
        for (int i = 0; i < max; i++) {
            Optional<Group> groupInCluster = loadBalancer.takeGroupForQuery(query, rejected);
            if (!groupInCluster.isPresent()) {
                // No groups available
                break;
            }
            Group group = groupInCluster.get();
            boolean acceptIncompleteCoverage = (i == max - 1);

            Optional<SearchInvoker> invoker;
            try {
                invoker = invokerFactory.supply(query, group.id(), group.nodes(), acceptIncompleteCoverage);
            } catch (RuntimeException | Error e) {
                // The group was taken above; hand it back before propagating,
                // otherwise it would never be released
                loadBalancer.releaseGroup(group);
                throw e;
            }

            if (invoker.isPresent()) {
                query.trace(false, 2, "Dispatching internally to ", group);
                // The group stays taken while the invoker is in use and is released on teardown
                invoker.get().teardown(() -> loadBalancer.releaseGroup(group));
                return invoker;
            }

            loadBalancer.releaseGroup(group);
            if (rejected == null) {
                rejected = new HashSet<>();
            }
            rejected.add(group.id());
        }

        return Optional.empty();
    }
}