summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java
blob: 51048db3cb7721301761c8b06ad29e49600851c1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.prelude.fastsearch;

import com.google.common.collect.ImmutableMap;
import com.yahoo.fs4.mplex.Backend;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.search.dispatch.InterleavedFillInvoker;
import com.yahoo.search.dispatch.InterleavedSearchInvoker;
import com.yahoo.search.dispatch.SearchInvoker;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import com.yahoo.search.result.Hit;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
 * FS4InvokerFactory constructs {@link FillInvoker} and {@link SearchInvoker} objects that communicate with
 * content nodes or dispatchers over the fnet/FS4 protocol.
 *
 * @author ollivir
 */
public class FS4InvokerFactory {
    private final FS4ResourcePool fs4ResourcePool;
    private final VespaBackEndSearcher searcher;
    private final SearchCluster searchCluster;
    /** Every node of every group in the search cluster, indexed by {@link Node#key()}. */
    private final ImmutableMap<Integer, Node> nodesByKey;

    /**
     * Creates a factory and indexes all nodes of all groups in the given search cluster by their key.
     *
     * @param fs4ResourcePool the pool from which FS4 backends are obtained
     * @param searchCluster the search cluster whose nodes the created invokers will address
     * @param searcher the searcher handed to every invoker created by this factory
     */
    public FS4InvokerFactory(FS4ResourcePool fs4ResourcePool, SearchCluster searchCluster, VespaBackEndSearcher searcher) {
        this.fs4ResourcePool = fs4ResourcePool;
        this.searcher = searcher;
        this.searchCluster = searchCluster;

        // ImmutableMap.Builder rejects duplicate keys when building, so node keys must be unique across groups.
        ImmutableMap.Builder<Integer, Node> builder = ImmutableMap.builder();
        searchCluster.groups().values().forEach(group -> group.nodes().forEach(node -> builder.put(node.key(), node)));
        this.nodesByKey = builder.build();
    }

    /**
     * Create a {@link SearchInvoker} for a single content node. No availability checks are made on the node.
     *
     * @param query the search query being processed
     * @param node the node to search
     * @return a SearchInvoker using a newly opened channel on the node's backend
     */
    public SearchInvoker getSearchInvoker(Query query, Node node) {
        Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port());
        return new FS4SearchInvoker(searcher, query, backend.openChannel(), node);
    }

    /**
     * Create a {@link SearchInvoker} for a list of content nodes.
     * <p>
     * A node is considered unavailable if it does not report itself as working or if probing the connection
     * of its backend fails. Unavailable nodes get no invoker.
     *
     * @param query
     *            the search query being processed
     * @param groupId
     *            the id of the node group to which the nodes belong
     * @param nodes
     *            pre-selected list of content nodes
     * @param acceptIncompleteCoverage
     *            if some of the nodes are unavailable and this parameter is
     *            <b>false</b>, verify that the remaining set of nodes has enough
     *            coverage
     * @return Optional containing the SearchInvoker or <i>empty</i> if some node in the
     *         list is invalid and the remaining coverage is not sufficient
     */
    public Optional<SearchInvoker> getSearchInvoker(Query query, int groupId, List<Node> nodes, boolean acceptIncompleteCoverage) {
        // First pass: only find out which nodes can be used. 'backends' is kept parallel to 'available'.
        List<Node> available = new ArrayList<>(nodes.size());
        List<Backend> backends = new ArrayList<>(nodes.size());
        for (Node node : nodes) {
            if (!node.isWorking()) {
                continue; // counted as unavailable, just like a node whose connection probe fails
            }
            Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port());
            if (backend.probeConnection()) {
                available.add(node);
                backends.add(backend);
            }
        }

        boolean someUnavailable = available.size() < nodes.size();
        if (someUnavailable && !acceptIncompleteCoverage
                && !searchCluster.isPartialGroupCoverageSufficient(groupId, available)) {
            return Optional.empty();
        }

        // Channels are opened only after the coverage check has passed, so a rejected request
        // does not leave opened channels behind.
        Map<Integer, SearchInvoker> invokers = new HashMap<>();
        for (int i = 0; i < available.size(); i++) {
            Node node = available.get(i);
            invokers.put(node.key(), new FS4SearchInvoker(searcher, query, backends.get(i).openChannel(), node));
        }

        // A single node needs no interleaving; hand out its invoker directly.
        // NOTE(review): when no node is available and incomplete coverage is accepted, this returns an
        // InterleavedSearchInvoker over an empty map, as before - confirm that callers handle this.
        if (invokers.size() == 1) {
            return Optional.of(invokers.values().iterator().next());
        } else {
            return Optional.of(new InterleavedSearchInvoker(invokers));
        }
    }

    /**
     * Create a {@link FillInvoker} for a single content node.
     *
     * @param query the query whose hits are to be filled
     * @param node the node holding the hits
     * @return a FillInvoker addressing the given node
     */
    public FillInvoker getFillInvoker(Query query, Node node) {
        return new FS4FillInvoker(searcher, query, fs4ResourcePool, node.hostname(), node.fs4port(), node.key());
    }

    /**
     * Create a {@link FillInvoker} for the hits in a {@link Result}.
     *
     * @param result the Result containing hits that need to be filled
     * @return Optional containing the FillInvoker or <i>empty</i> if some hit is from an unknown content node
     */
    public Optional<FillInvoker> getFillInvoker(Result result) {
        Collection<Integer> requiredNodes = requiredFillNodes(result);

        Query query = result.getQuery();
        Map<Integer, FillInvoker> invokers = new HashMap<>();
        for (Integer distKey : requiredNodes) {
            Node node = nodesByKey.get(distKey);
            if (node == null) {
                // The hit refers to a distribution key that is not part of the search cluster seen at construction
                return Optional.empty();
            }
            invokers.put(distKey, getFillInvoker(query, node));
        }

        // A single node needs no interleaving; hand out its invoker directly.
        if (invokers.size() == 1) {
            return Optional.of(invokers.values().iterator().next());
        } else {
            return Optional.of(new InterleavedFillInvoker(invokers));
        }
    }

    /**
     * Collects the distribution keys of all {@link FastHit}s found anywhere in the hit tree of the result.
     * Hits of other types are ignored.
     *
     * @param result the Result whose hits are inspected
     * @return the distinct distribution keys of the nodes that produced the FastHits
     */
    private static Collection<Integer> requiredFillNodes(Result result) {
        Set<Integer> requiredNodes = new HashSet<>();
        for (Iterator<Hit> i = result.hits().unorderedDeepIterator(); i.hasNext();) {
            Hit h = i.next();
            if (h instanceof FastHit) {
                FastHit hit = (FastHit) h;
                requiredNodes.add(hit.getDistributionKey());
            }
        }
        return requiredNodes;
    }
}