diff options
author | bjormel <bjormel@yahooinc.com> | 2023-10-01 12:23:12 +0000 |
---|---|---|
committer | bjormel <bjormel@yahooinc.com> | 2023-10-01 12:23:12 +0000 |
commit | e9058b555d4dfea2f6c872d9a677e8678b569569 (patch) | |
tree | fa1b67c6e39712c1e0d9f308b0dd55573b43f913 /searchcore/src/vespa/searchcore/proton/matching | |
parent | 0ad931fa86658904fe9212b014d810236b0e00e4 (diff) | |
parent | 16030193ec04ee41e98779a3d7ee6a6c1d0d0d6f (diff) |
Merge branch 'master' into bjormel/aws-main-controller
Diffstat (limited to 'searchcore/src/vespa/searchcore/proton/matching')
6 files changed, 63 insertions, 35 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/matching/attribute_limiter.cpp b/searchcore/src/vespa/searchcore/proton/matching/attribute_limiter.cpp index 1528b327747..b8027bff04a 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/attribute_limiter.cpp +++ b/searchcore/src/vespa/searchcore/proton/matching/attribute_limiter.cpp @@ -6,6 +6,7 @@ #include <vespa/searchlib/fef/matchdatalayout.h> #include <vespa/searchlib/queryeval/searchable.h> #include <vespa/searchlib/queryeval/blueprint.h> +#include <vespa/searchlib/queryeval/irequestcontext.h> #include <vespa/searchlib/query/tree/range.h> #include <vespa/searchlib/query/tree/simplequery.h> @@ -98,7 +99,7 @@ AttributeLimiter::create_search(size_t want_hits, size_t max_group_size, bool st FieldSpecList field; // single field API is protected field.add(FieldSpec(_attribute_name, my_field_id, my_handle)); _blueprint = _searchable_attributes.createBlueprint(_requestContext, field, node); - _blueprint->fetchPostings(ExecuteInfo::create(strictSearch)); + _blueprint->fetchPostings(ExecuteInfo::create(strictSearch, &_requestContext.getDoom())); _estimatedHits.store(_blueprint->getState().estimate().estHits, std::memory_order_relaxed); _blueprint->freeze(); } diff --git a/searchcore/src/vespa/searchcore/proton/matching/match_thread.cpp b/searchcore/src/vespa/searchcore/proton/matching/match_thread.cpp index b57346611f1..14a238330ba 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/match_thread.cpp +++ b/searchcore/src/vespa/searchcore/proton/matching/match_thread.cpp @@ -222,6 +222,8 @@ MatchThread::match_loop(MatchTools &tools, HitCollector &hits) !docid_range.empty(); docid_range = scheduler.next_range(thread_id)) { + // Due to some schedulers communicating across threads, it is vital that all complete this + // loop. Do not break out. if (!softDoomed) { uint32_t lastCovered = inner_match_loop<Strategy, do_rank, do_limit, do_share_work, use_rank_drop_limit>(context, tools, docid_range); softDoomed = (lastCovered < docid_range.end); @@ -311,6 +313,41 @@ MatchThread::match_loop_helper(MatchTools &tools, HitCollector &hits) } } +void +MatchThread::secondPhase(MatchTools & tools, HitCollector & hits) { + trace->addEvent(4, "Start second phase rerank"); + auto sorted_hit_seq = matchToolsFactory.should_diversify() + ? hits.getSortedHitSequence(matchParams.arraySize) + : hits.getSortedHitSequence(matchParams.heapSize); + trace->addEvent(5, "Synchronize before second phase rerank"); + WaitTimer get_second_phase_work_timer(wait_time_s); + /** + * All, or none of the threads in the bundle should call communicator.get_second_phase_work and + * communicator.complete_second_phase. + * Avoid early return and handle doom with care. + */ + auto my_work = communicator.get_second_phase_work(sorted_hit_seq, thread_id); + get_second_phase_work_timer.done(); + if (tools.getDoom().hard_doom()) { + my_work.clear(); + } + if (!my_work.empty()) { + tools.setup_second_phase(second_phase_profiler.get()); + DocumentScorer scorer(tools.rank_program(), tools.search()); + scorer.score(my_work); + } + thread_stats.docsReRanked(my_work.size()); + trace->addEvent(5, "Synchronize before rank scaling"); + WaitTimer complete_second_phase_timer(wait_time_s); + auto [kept_hits, ranges] = communicator.complete_second_phase(my_work, thread_id); + complete_second_phase_timer.done(); + hits.setReRankedHits(std::move(kept_hits)); + hits.setRanges(ranges); + if (auto onReRankTask = matchToolsFactory.createOnSecondPhaseTask()) { + onReRankTask->run(hits.getReRankedHits()); + } +} + search::ResultSet::UP MatchThread::findMatches(MatchTools &tools) { @@ -332,34 +369,15 @@ MatchThread::findMatches(MatchTools &tools) } HitCollector hits(matchParams.numDocs, matchParams.arraySize); trace->addEvent(4, "Start match and first phase rank"); + /** + * All, or none of the threads in the bundle must execute the match loop. + * The same goes for secondPhase. + * This is due to all the threads in the bundle needs to meet up and exchange information. + * If not you will have deadlock. + */ match_loop_helper(tools, hits); if (tools.has_second_phase_rank()) { - trace->addEvent(4, "Start second phase rerank"); - auto sorted_hit_seq = matchToolsFactory.should_diversify() - ? hits.getSortedHitSequence(matchParams.arraySize) - : hits.getSortedHitSequence(matchParams.heapSize); - trace->addEvent(5, "Synchronize before second phase rerank"); - WaitTimer get_second_phase_work_timer(wait_time_s); - auto my_work = communicator.get_second_phase_work(sorted_hit_seq, thread_id); - get_second_phase_work_timer.done(); - if (tools.getDoom().hard_doom()) { - my_work.clear(); - } - if (!my_work.empty()) { - tools.setup_second_phase(second_phase_profiler.get()); - DocumentScorer scorer(tools.rank_program(), tools.search()); - scorer.score(my_work); - } - thread_stats.docsReRanked(my_work.size()); - trace->addEvent(5, "Synchronize before rank scaling"); - WaitTimer complete_second_phase_timer(wait_time_s); - auto [kept_hits, ranges] = communicator.complete_second_phase(my_work, thread_id); - complete_second_phase_timer.done(); - hits.setReRankedHits(std::move(kept_hits)); - hits.setRanges(ranges); - if (auto onReRankTask = matchToolsFactory.createOnSecondPhaseTask()) { - onReRankTask->run(hits.getReRankedHits()); - } + secondPhase(tools, hits); } trace->addEvent(4, "Create result set"); return hits.getResultSet(fallback_rank_value()); @@ -463,6 +481,11 @@ MatchThread::run() auto capture_issues = vespalib::Issue::listen(my_issues); trace->addEvent(4, "Start MatchThread::run"); MatchTools::UP matchTools = matchToolsFactory.createMatchTools(); + /** + * All, or none of the threads in the bundle must call findMatches. + * All, or none of the threads in the bundle must call mergeDirector.dualMerge. + * Avoid early return and handle doom with care. + */ search::ResultSet::UP result = findMatches(*matchTools); match_time_s = vespalib::to_s(match_time.elapsed()); resultContext = resultProcessor.createThreadContext(matchTools->getDoom(), thread_id, _distributionKey); @@ -475,6 +498,7 @@ MatchThread::run() result->getNumHits(), resultContext->sort->hasSortData(), bool(resultContext->grouping))); + (void) processToken; // Avoid unused warning get_token_timer.done(); trace->addEvent(5, "Start result processing"); processResult(matchTools->getDoom(), std::move(result), *resultContext); diff --git a/searchcore/src/vespa/searchcore/proton/matching/match_thread.h b/searchcore/src/vespa/searchcore/proton/matching/match_thread.h index 03ba34eca1f..ad864a98227 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/match_thread.h +++ b/searchcore/src/vespa/searchcore/proton/matching/match_thread.h @@ -113,6 +113,7 @@ private: void match_loop_helper(MatchTools &tools, HitCollector &hits); search::ResultSet::UP findMatches(MatchTools &tools); + void secondPhase(MatchTools & tools, HitCollector & hits); void processResult(const Doom & doom, search::ResultSet::UP result, ResultProcessor::Context &context); diff --git a/searchcore/src/vespa/searchcore/proton/matching/match_tools.cpp b/searchcore/src/vespa/searchcore/proton/matching/match_tools.cpp index 5ae671b88cb..758ef35ebc9 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/match_tools.cpp +++ b/searchcore/src/vespa/searchcore/proton/matching/match_tools.cpp @@ -201,9 +201,9 @@ MatchToolsFactory(QueryLimiter & queryLimiter, trace.addEvent(5, "Optimize query execution plan"); _query.optimize(); trace.addEvent(4, "Perform dictionary lookups and posting lists initialization"); - _query.fetchPostings(); + _query.fetchPostings(_requestContext.getDoom()); if (is_search) { - _query.handle_global_filter(searchContext.getDocIdLimit(), + _query.handle_global_filter(_requestContext.getDoom(), searchContext.getDocIdLimit(), _attribute_blueprint_params.global_filter_lower_limit, _attribute_blueprint_params.global_filter_upper_limit, thread_bundle, trace); diff --git a/searchcore/src/vespa/searchcore/proton/matching/query.cpp b/searchcore/src/vespa/searchcore/proton/matching/query.cpp index d0738f1857f..22f6ec9cc88 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/query.cpp +++ b/searchcore/src/vespa/searchcore/proton/matching/query.cpp @@ -247,13 +247,14 @@ Query::optimize() } void -Query::fetchPostings() +Query::fetchPostings(const vespalib::Doom & doom) { - _blueprint->fetchPostings(search::queryeval::ExecuteInfo::create(true, 1.0)); + _blueprint->fetchPostings(search::queryeval::ExecuteInfo::create(true, &doom)); } void -Query::handle_global_filter(uint32_t docid_limit, double global_filter_lower_limit, double global_filter_upper_limit, +Query::handle_global_filter(const vespalib::Doom & doom, uint32_t docid_limit, + double global_filter_lower_limit, double global_filter_upper_limit, vespalib::ThreadBundle &thread_bundle, search::engine::Trace& trace) { if (!handle_global_filter(*_blueprint, docid_limit, global_filter_lower_limit, global_filter_upper_limit, thread_bundle, &trace)) { @@ -264,7 +265,7 @@ Query::handle_global_filter(uint32_t docid_limit, double global_filter_lower_lim _blueprint = Blueprint::optimize(std::move(_blueprint)); LOG(debug, "blueprint after handle_global_filter:\n%s\n", _blueprint->asString().c_str()); // strictness may change if optimized order changed: - fetchPostings(); + fetchPostings(doom); } bool diff --git a/searchcore/src/vespa/searchcore/proton/matching/query.h b/searchcore/src/vespa/searchcore/proton/matching/query.h index b0299307e92..1a3136042a7 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/query.h +++ b/searchcore/src/vespa/searchcore/proton/matching/query.h @@ -98,9 +98,10 @@ public: * test to verify the original query without optimization. **/ void optimize(); - void fetchPostings(); + void fetchPostings(const vespalib::Doom & doom); - void handle_global_filter(uint32_t docid_limit, double global_filter_lower_limit, double global_filter_upper_limit, + void handle_global_filter(const vespalib::Doom & doom, uint32_t docid_limit, + double global_filter_lower_limit, double global_filter_upper_limit, vespalib::ThreadBundle &thread_bundle, search::engine::Trace& trace); /** |