aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore/src/vespa/searchcore/proton/matching/match_master.cpp
blob: 26555a0b9f0fbf3cfbcb4c5bc35b07912d116084 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include "match_master.h"
#include "docid_range_scheduler.h"
#include "match_loop_communicator.h"
#include "match_thread.h"
#include "match_tools.h"
#include "extract_features.h"
#include "partial_result.h"
#include <vespa/searchlib/engine/trace.h>
#include <vespa/searchlib/engine/searchreply.h>
#include <vespa/vespalib/util/thread_bundle.h>
#include <vespa/vespalib/util/issue.h>
#include <vespa/vespalib/data/slime/inserter.h>
#include <vespa/vespalib/data/slime/cursor.h>

namespace proton::matching {

using namespace search::fef;
using search::queryeval::SearchIterator;
using vespalib::FeatureSet;
using vespalib::ThreadBundle;
using vespalib::Issue;

namespace {

using namespace vespalib::literals;

struct TimedMatchLoopCommunicator final : IMatchLoopCommunicator {
    IMatchLoopCommunicator &communicator;
    vespalib::Timer timer;
    vespalib::duration elapsed;
    TimedMatchLoopCommunicator(IMatchLoopCommunicator &com) : communicator(com), elapsed(vespalib::duration::zero()) {}
    double estimate_match_frequency(const Matches &matches) override {
        return communicator.estimate_match_frequency(matches);
    }
    TaggedHits get_second_phase_work(SortedHitSequence sortedHits, size_t thread_id) override {
        auto result = communicator.get_second_phase_work(sortedHits, thread_id);
        timer = vespalib::Timer();
        return result;
    }
    std::pair<Hits,RangePair> complete_second_phase(TaggedHits my_results, size_t thread_id) override {
        auto result = communicator.complete_second_phase(std::move(my_results), thread_id);
        elapsed = timer.elapsed();
        return result;
    }
};

DocidRangeScheduler::UP
createScheduler(uint32_t numThreads, uint32_t numSearchPartitions, uint32_t numDocs)
{
    if (numSearchPartitions == 0) {
        return std::make_unique<AdaptiveDocidRangeScheduler>(numThreads, 1, numDocs);
    }
    if (numSearchPartitions <= numThreads) {
        return std::make_unique<PartitionDocidRangeScheduler>(numThreads, numDocs);
    }
    return std::make_unique<TaskDocidRangeScheduler>(numThreads, numSearchPartitions, numDocs);
}

template <class FullResult>
auto make_reply(const MatchToolsFactory &mtf, ResultProcessor &processor, ThreadBundle &bundle, FullResult full_result) {
    if (mtf.has_match_features()) {
        auto docs = processor.extract_docid_ordering(*full_result);
        auto reply = processor.makeReply(std::move(std::move(full_result)));
        if ((docs.size() > 0) && reply->_reply) {
            reply->_reply->match_features = ExtractFeatures::get_match_features(mtf, docs, bundle);
        }
        return reply;
    } else {
        return processor.makeReply(std::move(full_result));
    }
}

} // namespace proton::matching::<unnamed>

ResultProcessor::Result::UP
MatchMaster::match(search::engine::Trace & trace,
                   const MatchParams &params,
                   ThreadBundle &threadBundle,
                   const MatchToolsFactory &mtf,
                   ResultProcessor &resultProcessor,
                   uint32_t distributionKey,
                   uint32_t numSearchPartitions)
{
    vespalib::Timer query_latency_time;
    vespalib::DualMergeDirector mergeDirector(threadBundle.size());
    MatchLoopCommunicator communicator(threadBundle.size(), params.heapSize, mtf.createDiversifier(params.heapSize));
    TimedMatchLoopCommunicator timedCommunicator(communicator);
    DocidRangeScheduler::UP scheduler = createScheduler(threadBundle.size(), numSearchPartitions, params.numDocs);

    std::vector<MatchThread::UP> threadState;
    for (size_t i = 0; i < threadBundle.size(); ++i) {
        IMatchLoopCommunicator &com = (i == 0)
            ? static_cast<IMatchLoopCommunicator&>(timedCommunicator)
            : static_cast<IMatchLoopCommunicator&>(communicator);
        threadState.emplace_back(std::make_unique<MatchThread>(i, threadBundle.size(), params, mtf, com, *scheduler,
                                                               resultProcessor, mergeDirector, distributionKey, trace));
    }
    resultProcessor.prepareThreadContextCreation(threadBundle.size());
    threadBundle.run(threadState);
    auto reply = make_reply(mtf, resultProcessor, threadBundle, threadState[0]->extract_result());
    double query_time_s = vespalib::to_s(query_latency_time.elapsed());
    double rerank_time_s = vespalib::to_s(timedCommunicator.elapsed);
    double match_time_s = 0.0;
    auto inserter = trace.make_inserter("query_execution"_ssv);
    for (size_t i = 0; i < threadState.size(); ++i) {
        const MatchThread & matchThread = *threadState[i];
        match_time_s = std::max(match_time_s, matchThread.get_match_time());
        _stats.merge_partition(matchThread.get_thread_stats(), i);
        inserter.handle_thread(matchThread.getTrace());
        matchThread.get_issues().for_each_message([](const auto &msg){ Issue::report(Issue(msg)); });
    }
    _stats.queryLatency(query_time_s);
    _stats.matchTime(match_time_s - rerank_time_s);
    _stats.rerankTime(rerank_time_s);
    _stats.groupingTime(query_time_s - match_time_s);
    _stats.queries(1);
    if (mtf.match_limiter().was_limited()) {
        _stats.limited_queries(1);        
    }
    return reply;
}

}