diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-11-03 11:11:51 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-03 11:11:51 +0100 |
commit | 6150297581695cb6c3c10cc08f1d4bf8e71c43e3 (patch) | |
tree | 6f5d26f21bee82587be63177e227d50d89b28b5f /searchcore | |
parent | b5cd73513c0a8c701829edc572de5095897c818b (diff) | |
parent | 354bfd789c8ca637a7d719e6c17b494710ffd40b (diff) |
Merge pull request #15143 from vespa-engine/havardpe/balance-second-phase-ranking-workload
balance second phase ranking workload
Diffstat (limited to 'searchcore')
10 files changed, 241 insertions, 202 deletions
diff --git a/searchcore/src/tests/proton/matching/docid_range_scheduler/docid_range_scheduler_test.cpp b/searchcore/src/tests/proton/matching/docid_range_scheduler/docid_range_scheduler_test.cpp index 1841d8bb531..4814ab4cb49 100644 --- a/searchcore/src/tests/proton/matching/docid_range_scheduler/docid_range_scheduler_test.cpp +++ b/searchcore/src/tests/proton/matching/docid_range_scheduler/docid_range_scheduler_test.cpp @@ -85,10 +85,6 @@ TEST("require that the docid range splitter gives empty ranges if accessed with TEST("require that the partition scheduler acts as expected") { PartitionDocidRangeScheduler scheduler(4, 16); - TEST_DO(verify_range(scheduler.total_span(0), DocidRange(1, 5))); - TEST_DO(verify_range(scheduler.total_span(1), DocidRange(5, 9))); - TEST_DO(verify_range(scheduler.total_span(2), DocidRange(9, 13))); - TEST_DO(verify_range(scheduler.total_span(3), DocidRange(13, 16))); EXPECT_EQUAL(scheduler.total_size(0), 4u); EXPECT_EQUAL(scheduler.total_size(1), 4u); EXPECT_EQUAL(scheduler.total_size(2), 4u); @@ -106,8 +102,6 @@ TEST("require that the partition scheduler acts as expected") { TEST("require that the partition scheduler protects against documents underflow") { PartitionDocidRangeScheduler scheduler(2, 0); - TEST_DO(verify_range(scheduler.total_span(0), DocidRange(1,1))); - TEST_DO(verify_range(scheduler.total_span(1), DocidRange(1,1))); EXPECT_EQUAL(scheduler.total_size(0), 0u); EXPECT_EQUAL(scheduler.total_size(1), 0u); EXPECT_EQUAL(scheduler.unassigned_size(), 0u); @@ -122,8 +116,6 @@ TEST("require that the partition scheduler protects against documents underflow" TEST("require that the task scheduler acts as expected") { TaskDocidRangeScheduler scheduler(2, 5, 20); EXPECT_EQUAL(scheduler.unassigned_size(), 19u); - TEST_DO(verify_range(scheduler.total_span(0), DocidRange(1, 20))); - TEST_DO(verify_range(scheduler.total_span(1), DocidRange(1, 20))); EXPECT_EQUAL(scheduler.total_size(0), 0u); EXPECT_EQUAL(scheduler.total_size(1), 0u); TEST_DO(verify_range(scheduler.first_range(1), DocidRange(1, 5))); @@ -141,8 +133,6 @@ TEST("require that the task scheduler acts as expected") { TEST("require that the task scheduler protects against documents underflow") { TaskDocidRangeScheduler scheduler(2, 4, 0); - TEST_DO(verify_range(scheduler.total_span(0), DocidRange(1,1))); - TEST_DO(verify_range(scheduler.total_span(1), DocidRange(1,1))); EXPECT_EQUAL(scheduler.total_size(0), 0u); EXPECT_EQUAL(scheduler.total_size(1), 0u); EXPECT_EQUAL(scheduler.unassigned_size(), 0u); @@ -167,13 +157,6 @@ TEST("require that the adaptive scheduler starts by dividing the docid space equ TEST_DO(verify_range(scheduler.first_range(3), DocidRange(13, 16))); } -TEST("require that the adaptive scheduler reports the full span to all threads") { - AdaptiveDocidRangeScheduler scheduler(3, 1, 16); - TEST_DO(verify_range(scheduler.total_span(0), DocidRange(1,16))); - TEST_DO(verify_range(scheduler.total_span(1), DocidRange(1,16))); - TEST_DO(verify_range(scheduler.total_span(2), DocidRange(1,16))); -} - TEST_MT_FF("require that the adaptive scheduler terminates when all workers request more work", 4, AdaptiveDocidRangeScheduler(num_threads, 1, 16), TimeBomb(60)) { diff --git a/searchcore/src/tests/proton/matching/match_loop_communicator/match_loop_communicator_test.cpp b/searchcore/src/tests/proton/matching/match_loop_communicator/match_loop_communicator_test.cpp index f5564ac22a7..0b0f28962c4 100644 --- a/searchcore/src/tests/proton/matching/match_loop_communicator/match_loop_communicator_test.cpp +++ b/searchcore/src/tests/proton/matching/match_loop_communicator/match_loop_communicator_test.cpp @@ -1,48 +1,63 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/test_kit.h> #include <vespa/searchcore/proton/matching/match_loop_communicator.h> -#include <vespa/vespalib/util/box.h> +#include <algorithm> using namespace proton::matching; -using vespalib::Box; -using vespalib::make_box; - using Range = MatchLoopCommunicator::Range; using RangePair = MatchLoopCommunicator::RangePair; using Matches = MatchLoopCommunicator::Matches; using Hit = MatchLoopCommunicator::Hit; using Hits = MatchLoopCommunicator::Hits; +using TaggedHit = MatchLoopCommunicator::TaggedHit; +using TaggedHits = MatchLoopCommunicator::TaggedHits; using search::queryeval::SortedHitSequence; +std::vector<Hit> hit_vec(std::vector<Hit> list) { return list; } + Hits makeScores(size_t id) { switch (id) { - case 0: return make_box<Hit>({1, 5.4}, {2, 4.4}, {3, 3.4}, {4, 2.4}, {5, 1.4}); - case 1: return make_box<Hit>({11, 5.3}, {12, 4.3}, {13, 3.3}, {14, 2.3}, {15, 1.3}); - case 2: return make_box<Hit>({21, 5.2}, {22, 4.2}, {23, 3.2}, {24, 2.2}, {25, 1.2}); - case 3: return make_box<Hit>({31, 5.1}, {32, 4.1}, {33, 3.1}, {34, 2.1}, {35, 1.1}); - case 4: return make_box<Hit>({41, 5.0}, {42, 4.0}, {43, 3.0}, {44, 2.0}, {45, 1.0}); + case 0: return {{1, 5.4}, {2, 4.4}, {3, 3.4}, {4, 2.4}, {5, 1.4}}; + case 1: return {{11, 5.3}, {12, 4.3}, {13, 3.3}, {14, 2.3}, {15, 1.3}}; + case 2: return {{21, 5.2}, {22, 4.2}, {23, 3.2}, {24, 2.2}, {25, 1.2}}; + case 3: return {{31, 5.1}, {32, 4.1}, {33, 3.1}, {34, 2.1}, {35, 1.1}}; + case 4: return {{41, 5.0}, {42, 4.0}, {43, 3.0}, {44, 2.0}, {45, 1.0}}; } - return Box<Hit>(); + return {}; } -Hits selectBest(MatchLoopCommunicator &com, const Hits &hits) { +std::tuple<size_t,Hits,RangePair> second_phase(MatchLoopCommunicator &com, const Hits &hits, size_t thread_id, double delta = 0.0) { std::vector<uint32_t> refs; for (size_t i = 0; i < hits.size(); ++i) { refs.push_back(i); } - return com.selectBest(SortedHitSequence(&hits[0], &refs[0], refs.size())); + auto my_work = com.get_second_phase_work(SortedHitSequence(&hits[0], &refs[0], refs.size()), thread_id); + // the DocumentScorer used by the match thread will sort on docid here to ensure increasing seek order, this is not needed here + size_t work_size = my_work.size(); + for (auto &[hit, tag]: my_work) { + hit.second += delta; // second phase ranking is first phase + delta + } + auto [best_hits, ranges] = com.complete_second_phase(std::move(my_work), thread_id); + // the HitCollector will sort on docid to prepare for result merging, we do it to simplify comparing with expected results + auto sort_on_docid = [](const auto &a, const auto &b){ return (a.first < b.first); }; + std::sort(best_hits.begin(), best_hits.end(), sort_on_docid); + return {work_size, best_hits, ranges}; } -RangePair makeRanges(size_t id) { - switch (id) { - case 0: return std::make_pair(Range(5, 5), Range(7, 7)); - case 1: return std::make_pair(Range(2, 2), Range(8, 8)); - case 2: return std::make_pair(Range(3, 3), Range(6, 6)); - case 3: return std::make_pair(Range(1, 1), Range(5, 5)); - case 4: return std::make_pair(Range(4, 4), Range(9, 9)); - } - return std::make_pair(Range(-50, -60), Range(60, 50)); +Hits selectBest(MatchLoopCommunicator &com, const Hits &hits, size_t thread_id) { + auto [work_size, best_hits, ranges] = second_phase(com, hits, thread_id); + return best_hits; +} + +RangePair rangeCover(MatchLoopCommunicator &com, const Hits &hits, size_t thread_id, double delta) { + auto [work_size, best_hits, ranges] = second_phase(com, hits, thread_id, delta); + return ranges; +} + +size_t my_work_size(MatchLoopCommunicator &com, const Hits &hits, size_t thread_id) { + auto [work_size, best_hits, ranges] = second_phase(com, hits, thread_id); + return work_size; } void equal(size_t count, const Hits & a, const Hits & b) { @@ -59,114 +74,137 @@ void equal_range(const Range &a, const Range &b) { EXPECT_EQUAL(a.high, b.high); } +void equal_ranges(const RangePair &a, const RangePair &b) { + TEST_DO(equal_range(a.first, b.first)); + TEST_DO(equal_range(a.second, b.second)); +} + struct EveryOdd : public search::queryeval::IDiversifier { bool accepted(uint32_t docId) override { return docId & 0x01; } }; +struct None : public search::queryeval::IDiversifier { + bool accepted(uint32_t) override { return false; } +}; + TEST_F("require that selectBest gives appropriate results for single thread", MatchLoopCommunicator(num_threads, 3)) { - TEST_DO(equal(2u, make_box<Hit>({1, 5}, {2, 4}), selectBest(f1, make_box<Hit>({1, 5}, {2, 4})))); - TEST_DO(equal(3u, make_box<Hit>({1, 5}, {2, 4}, {3, 3}), selectBest(f1, make_box<Hit>({1, 5}, {2, 4}, {3, 3})))); - TEST_DO(equal(3u, make_box<Hit>({1, 5}, {2, 4}, {3, 3}), selectBest(f1, make_box<Hit>({1, 5}, {2, 4}, {3, 3}, {4, 2})))); + TEST_DO(equal(2u, hit_vec({{1, 5}, {2, 4}}), selectBest(f1, hit_vec({{1, 5}, {2, 4}}), thread_id))); + TEST_DO(equal(3u, hit_vec({{1, 5}, {2, 4}, {3, 3}}), selectBest(f1, hit_vec({{1, 5}, {2, 4}, {3, 3}}), thread_id))); + TEST_DO(equal(3u, hit_vec({{1, 5}, {2, 4}, {3, 3}}), selectBest(f1, hit_vec({{1, 5}, {2, 4}, {3, 3}, {4, 2}}), thread_id))); } TEST_F("require that selectBest gives appropriate results for single thread with filter", MatchLoopCommunicator(num_threads, 3, std::make_unique<EveryOdd>())) { - TEST_DO(equal(1u, make_box<Hit>({1, 5}), selectBest(f1, make_box<Hit>({1, 5}, {2, 4})))); - TEST_DO(equal(2u, make_box<Hit>({1, 5}, {3, 3}), selectBest(f1, make_box<Hit>({1, 5}, {2, 4}, {3, 3})))); - TEST_DO(equal(3u, make_box<Hit>({1, 5}, {3, 3}, {5, 1}), selectBest(f1, make_box<Hit>({1, 5}, {2, 4}, {3, 3}, {4, 2}, {5, 1}, {6, 0})))); + TEST_DO(equal(1u, hit_vec({{1, 5}}), selectBest(f1, hit_vec({{1, 5}, {2, 4}}), thread_id))); + TEST_DO(equal(2u, hit_vec({{1, 5}, {3, 3}}), selectBest(f1, hit_vec({{1, 5}, {2, 4}, {3, 3}}), thread_id))); + TEST_DO(equal(3u, hit_vec({{1, 5}, {3, 3}, {5, 1}}), selectBest(f1, hit_vec({{1, 5}, {2, 4}, {3, 3}, {4, 2}, {5, 1}, {6, 0}}), thread_id))); } TEST_MT_F("require that selectBest works with no hits", 10, MatchLoopCommunicator(num_threads, 10)) { - EXPECT_TRUE(selectBest(f1, Box<Hit>()).empty()); + EXPECT_TRUE(selectBest(f1, hit_vec({}), thread_id).empty()); } TEST_MT_F("require that selectBest works with too many hits from all threads", 5, MatchLoopCommunicator(num_threads, 13)) { if (thread_id < 3) { - TEST_DO(equal(3u, makeScores(thread_id), selectBest(f1, makeScores(thread_id)))); + TEST_DO(equal(3u, makeScores(thread_id), selectBest(f1, makeScores(thread_id), thread_id))); } else { - TEST_DO(equal(2u, makeScores(thread_id), selectBest(f1, makeScores(thread_id)))); + TEST_DO(equal(2u, makeScores(thread_id), selectBest(f1, makeScores(thread_id), thread_id))); } } TEST_MT_F("require that selectBest works with some exhausted threads", 5, MatchLoopCommunicator(num_threads, 22)) { if (thread_id < 2) { - TEST_DO(equal(5u, makeScores(thread_id), selectBest(f1, makeScores(thread_id)))); + TEST_DO(equal(5u, makeScores(thread_id), selectBest(f1, makeScores(thread_id), thread_id))); } else { - TEST_DO(equal(4u, makeScores(thread_id), selectBest(f1, makeScores(thread_id)))); + TEST_DO(equal(4u, makeScores(thread_id), selectBest(f1, makeScores(thread_id), thread_id))); } } TEST_MT_F("require that selectBest can select all hits from all threads", 5, MatchLoopCommunicator(num_threads, 100)) { - EXPECT_EQUAL(5u, selectBest(f1, makeScores(thread_id)).size()); + EXPECT_EQUAL(5u, selectBest(f1, makeScores(thread_id), thread_id).size()); } TEST_MT_F("require that selectBest works with some empty threads", 10, MatchLoopCommunicator(num_threads, 7)) { if (thread_id < 2) { - TEST_DO(equal(2u, makeScores(thread_id), selectBest(f1, makeScores(thread_id)))); + TEST_DO(equal(2u, makeScores(thread_id), selectBest(f1, makeScores(thread_id), thread_id))); } else if (thread_id < 5) { - TEST_DO(equal(1u, makeScores(thread_id), selectBest(f1, makeScores(thread_id)))); + TEST_DO(equal(1u, makeScores(thread_id), selectBest(f1, makeScores(thread_id), thread_id))); } else { - EXPECT_TRUE(selectBest(f1, makeScores(thread_id)).empty()); + EXPECT_TRUE(selectBest(f1, makeScores(thread_id), thread_id).empty()); } } -TEST_F("require that rangeCover is identity function for single thread", MatchLoopCommunicator(num_threads, 5)) { - RangePair res = f1.rangeCover(std::make_pair(Range(2, 4), Range(3, 5))); - TEST_DO(equal_range(Range(2, 4), res.first)); - TEST_DO(equal_range(Range(3, 5), res.second)); +TEST_F("require that rangeCover works with a single thread", MatchLoopCommunicator(num_threads, 5)) { + RangePair res = rangeCover(f1, hit_vec({{1, 7.5}, {2, 1.5}}), thread_id, 10); + TEST_DO(equal_ranges(RangePair({1.5, 7.5}, {11.5, 17.5}), res)); } -TEST_MT_F("require that rangeCover can mix ranges from multiple threads", 5, MatchLoopCommunicator(num_threads, 5)) { - RangePair res = f1.rangeCover(makeRanges(thread_id)); - TEST_DO(equal_range(Range(1, 5), res.first)); - TEST_DO(equal_range(Range(5, 9), res.second)); +TEST_MT_F("require that rangeCover works with multiple threads", 5, MatchLoopCommunicator(num_threads, 10)) { + RangePair res = rangeCover(f1, hit_vec({{thread_id * 100 + 1, 100.0 + thread_id}, {thread_id * 100 + 2, 100.0 - thread_id}}), thread_id, 10); + TEST_DO(equal_ranges(RangePair({96.0, 104.0}, {106.0, 114.0}), res)); } -TEST_MT_F("require that invalid ranges are ignored", 10, MatchLoopCommunicator(num_threads, 5)) { - RangePair res = f1.rangeCover(makeRanges(thread_id)); - TEST_DO(equal_range(Range(1, 5), res.first)); - TEST_DO(equal_range(Range(5, 9), res.second)); +TEST_MT_F("require that rangeCover works with no hits", 10, MatchLoopCommunicator(num_threads, 5)) { + RangePair res = rangeCover(f1, hit_vec({}), thread_id, 10); + TEST_DO(equal_ranges(RangePair({}, {}), res)); } -TEST_MT_F("require that only invalid ranges produce default invalid range", 3, MatchLoopCommunicator(num_threads, 5)) { - RangePair res = f1.rangeCover(makeRanges(10)); - Range expect; - TEST_DO(equal_range(expect, res.first)); - TEST_DO(equal_range(expect, res.second)); -} - -TEST_F("require that hits dropped due to lack of diversity affects range cover result", - MatchLoopCommunicator(num_threads, 3, std::make_unique<EveryOdd>())) +TEST_FFF("require that hits dropped due to lack of diversity affects range cover result", + MatchLoopCommunicator(num_threads, 3), + MatchLoopCommunicator(num_threads, 3, std::make_unique<EveryOdd>()), + MatchLoopCommunicator(num_threads, 3, std::make_unique<None>())) { - TEST_DO(equal(3u, make_box<Hit>({1, 5}, {3, 3}, {5, 1}), selectBest(f1, make_box<Hit>({1, 5}, {2, 4}, {3, 3}, {4, 2}, {5, 1})))); - // best dropped: 4 - std::vector<RangePair> input = { - std::make_pair(Range(), Range()), - std::make_pair(Range(3, 5), Range(1, 10)), - std::make_pair(Range(5, 10), Range(1, 10)), - std::make_pair(Range(1, 3), Range(1, 10)) - }; - std::vector<RangePair> expect = { - std::make_pair(Range(), Range()), - std::make_pair(Range(4, 5), Range(1, 10)), - std::make_pair(Range(5, 10), Range(1, 10)), - std::make_pair(Range(4, 4), Range(1, 10)) - }; - ASSERT_EQUAL(input.size(), expect.size()); - for (size_t i = 0; i < input.size(); ++i) { - auto output = f1.rangeCover(input[i]); - TEST_STATE(vespalib::make_string("case: %zu", i).c_str()); - TEST_DO(equal_range(expect[i].first, output.first)); - TEST_DO(equal_range(expect[i].second, output.second)); - } + auto hits_in = hit_vec({{1, 5}, {2, 4}, {3, 3}, {4, 2}, {5, 1}}); + auto [my_work1, hits1, ranges1] = second_phase(f1, hits_in, thread_id, 10); + auto [my_work2, hits2, ranges2] = second_phase(f2, hits_in, thread_id, 10); + auto [my_work3, hits3, ranges3] = second_phase(f3, hits_in, thread_id, 10); + + EXPECT_EQUAL(my_work1, 3u); + EXPECT_EQUAL(my_work2, 3u); + EXPECT_EQUAL(my_work3, 0u); + + TEST_DO(equal(3u, hit_vec({{1, 15}, {2, 14}, {3, 13}}), hits1)); + TEST_DO(equal(3u, hit_vec({{1, 15}, {3, 13}, {5, 11}}), hits2)); + TEST_DO(equal(0u, hit_vec({}), hits3)); + + TEST_DO(equal_ranges(RangePair({3,5},{13,15}), ranges1)); + TEST_DO(equal_ranges(RangePair({4,5},{11,15}), ranges2)); // best dropped: 4 + + // note that the 'drops all hits due to diversity' case will + // trigger much of the same code path as dropping second phase + // ranking due to hard doom. + + TEST_DO(equal_ranges(RangePair({},{}), ranges3)); } -TEST_MT_F("require that count_matches will count hits and docs across threads", 4, MatchLoopCommunicator(num_threads, 5)) { +TEST_MT_F("require that estimate_match_frequency will count hits and docs across threads", 4, MatchLoopCommunicator(num_threads, 5)) { double freq = (0.0/10.0 + 1.0/11.0 + 2.0/12.0 + 3.0/13.0) / 4.0; EXPECT_APPROX(freq, f1.estimate_match_frequency(Matches(thread_id, thread_id + 10)), 0.00001); } +TEST_MT_F("require that second phase work is evenly distributed among search threads", 5, MatchLoopCommunicator(num_threads, 20)) { + size_t num_hits = thread_id * 5; + size_t docid = thread_id * 100; + double score = thread_id * 100.0; + Hits my_hits; + for(size_t i = 0; i < num_hits; ++i) { + my_hits.emplace_back(++docid, score); + score -= 1.0; + } + auto [my_work, best_hits, ranges] = second_phase(f1, my_hits, thread_id, 1000.0); + EXPECT_EQUAL(my_work, 4u); + TEST_DO(equal_ranges(RangePair({381,400},{1381,1400}), ranges)); + if (thread_id == 4) { + for (auto &hit: my_hits) { + hit.second += 1000.0; + } + TEST_DO(equal(num_hits, my_hits, best_hits)); + } else { + EXPECT_TRUE(best_hits.empty()); + } +} + TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchcore/src/vespa/searchcore/proton/matching/docid_range_scheduler.h b/searchcore/src/vespa/searchcore/proton/matching/docid_range_scheduler.h index e6fce89e82d..19e5ec0343d 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/docid_range_scheduler.h +++ b/searchcore/src/vespa/searchcore/proton/matching/docid_range_scheduler.h @@ -71,17 +71,15 @@ public: * 'next_range' function. When a worker is assigned an empty range, * its work is done. * - * The 'total_span' function returns a range that is guaranteed to - * contain all ranges assigned to the given worker. The 'total_size' - * function returns the accumulated size of all ranges assigned to the - * given worker. The 'unassigned_size' function returns the - * accumulated size of all currently unassigned ranges. + * The 'total_size' function returns the accumulated size of all + * ranges assigned to the given worker. The 'unassigned_size' function + * returns the accumulated size of all currently unassigned ranges. * - * Note that the return values from 'total_span', 'total_size' and - * 'unassigned_size' may or may not account for the range returned - * from 'first_range' since the scheduler is allowed to pre-assign - * ranges to workers. Calling 'first_range' first ensures that all - * other return values make sense. + * Note that the return values from 'total_size' and 'unassigned_size' + * may or may not account for the range returned from 'first_range' + * since the scheduler is allowed to pre-assign ranges to + * workers. Calling 'first_range' first ensures that all other return + * values make sense. * * The 'idle_observer' and 'share_range' functions are used for * work-sharing, where a worker thread potentially can offload some of @@ -109,7 +107,6 @@ struct DocidRangeScheduler { typedef std::unique_ptr<DocidRangeScheduler> UP; virtual DocidRange first_range(size_t thread_id) = 0; virtual DocidRange next_range(size_t thread_id) = 0; - virtual DocidRange total_span(size_t thread_id) const = 0; virtual size_t total_size(size_t thread_id) const = 0; virtual size_t unassigned_size() const = 0; virtual IdleObserver make_idle_observer() const = 0; @@ -130,7 +127,6 @@ public: PartitionDocidRangeScheduler(size_t num_threads, uint32_t docid_limit); DocidRange first_range(size_t thread_id) override { return _ranges[thread_id]; } DocidRange next_range(size_t) override { return DocidRange(); } - DocidRange total_span(size_t thread_id) const override { return _ranges[thread_id]; } size_t total_size(size_t thread_id) const override { return _ranges[thread_id].size(); } size_t unassigned_size() const override { return 0; } IdleObserver make_idle_observer() const override { return IdleObserver(); } @@ -157,7 +153,6 @@ public: TaskDocidRangeScheduler(size_t num_threads, size_t num_tasks, uint32_t docid_limit); DocidRange first_range(size_t thread_id) override { return next_task(thread_id); } DocidRange next_range(size_t thread_id) override { return next_task(thread_id); } - DocidRange total_span(size_t) const override { return _splitter.full_range(); } size_t total_size(size_t thread_id) const override { return _assigned[thread_id]; } size_t unassigned_size() const override { return _unassigned.load(std::memory_order::memory_order_relaxed); } IdleObserver make_idle_observer() const override { return IdleObserver(); } @@ -197,7 +192,6 @@ public: ~AdaptiveDocidRangeScheduler(); DocidRange first_range(size_t thread_id) override; DocidRange next_range(size_t thread_id) override; - DocidRange total_span(size_t) const override { return _splitter.full_range(); } size_t total_size(size_t thread_id) const override { return _assigned[thread_id]; } size_t unassigned_size() const override { return 0; } IdleObserver make_idle_observer() const override { return IdleObserver(_num_idle); } diff --git a/searchcore/src/vespa/searchcore/proton/matching/document_scorer.cpp b/searchcore/src/vespa/searchcore/proton/matching/document_scorer.cpp index c7721b428b9..608e8adf5c2 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/document_scorer.cpp +++ b/searchcore/src/vespa/searchcore/proton/matching/document_scorer.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "document_scorer.h" +#include <algorithm> #include <cassert> using search::feature_t; @@ -30,10 +31,14 @@ DocumentScorer::DocumentScorer(RankProgram &rankProgram, { } -feature_t -DocumentScorer::score(uint32_t docId) +void +DocumentScorer::score(TaggedHits &hits) { - return doScore(docId); + auto sort_on_docid = [](const TaggedHit &a, const TaggedHit &b){ return (a.first.first < b.first.first); }; + std::sort(hits.begin(), hits.end(), sort_on_docid); + for (auto &hit: hits) { + hit.first.second = doScore(hit.first.first); + } } } diff --git a/searchcore/src/vespa/searchcore/proton/matching/document_scorer.h b/searchcore/src/vespa/searchcore/proton/matching/document_scorer.h index f29f70b2cfa..63ee00c3412 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/document_scorer.h +++ b/searchcore/src/vespa/searchcore/proton/matching/document_scorer.h @@ -2,24 +2,28 @@ #pragma once +#include "i_match_loop_communicator.h" #include <vespa/searchlib/fef/rank_program.h> -#include <vespa/searchlib/queryeval/hitcollector.h> #include <vespa/searchlib/queryeval/searchiterator.h> namespace proton::matching { /** * Class used to calculate the rank score for a set of documents using - * a rank program for calculation and a search iterator for unpacking match data. - * The calculateScore() function is always called in increasing docId order. + * a rank program for calculation and a search iterator for unpacking + * match data. The doScore function must be called with increasing + * docid. */ -class DocumentScorer : public search::queryeval::HitCollector::DocumentScorer +class DocumentScorer { private: search::queryeval::SearchIterator &_searchItr; search::fef::LazyValue _scoreFeature; public: + using TaggedHit = IMatchLoopCommunicator::TaggedHit; + using TaggedHits = IMatchLoopCommunicator::TaggedHits; + DocumentScorer(search::fef::RankProgram &rankProgram, search::queryeval::SearchIterator &searchItr); @@ -28,7 +32,8 @@ public: return _scoreFeature.as_number(docId); } - virtual search::feature_t score(uint32_t docId) override; + // annotate hits with rank score, may change order + void score(TaggedHits &hits); }; } diff --git a/searchcore/src/vespa/searchcore/proton/matching/i_match_loop_communicator.h b/searchcore/src/vespa/searchcore/proton/matching/i_match_loop_communicator.h index 15ca9921524..c9a4a61f9c5 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/i_match_loop_communicator.h +++ b/searchcore/src/vespa/searchcore/proton/matching/i_match_loop_communicator.h @@ -17,6 +17,8 @@ struct IMatchLoopCommunicator { using SortedHitSequence = search::queryeval::SortedHitSequence; using Hit = SortedHitSequence::Hit; using Hits = std::vector<Hit>; + using TaggedHit = std::pair<Hit,size_t>; + using TaggedHits = std::vector<TaggedHit>; struct Matches { size_t hits; size_t docs; @@ -28,8 +30,8 @@ struct IMatchLoopCommunicator { } }; virtual double estimate_match_frequency(const Matches &matches) = 0; - virtual Hits selectBest(SortedHitSequence sortedHits) = 0; - virtual RangePair rangeCover(const RangePair &ranges) = 0; + virtual TaggedHits get_second_phase_work(SortedHitSequence sortedHits, size_t thread_id) = 0; + virtual std::pair<Hits,RangePair> complete_second_phase(TaggedHits my_results, size_t thread_id) = 0; virtual ~IMatchLoopCommunicator() {} }; diff --git a/searchcore/src/vespa/searchcore/proton/matching/match_loop_communicator.cpp b/searchcore/src/vespa/searchcore/proton/matching/match_loop_communicator.cpp index 07a8b224b89..4db26f6308a 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/match_loop_communicator.cpp +++ b/searchcore/src/vespa/searchcore/proton/matching/match_loop_communicator.cpp @@ -9,10 +9,11 @@ MatchLoopCommunicator::MatchLoopCommunicator(size_t threads, size_t topN) : MatchLoopCommunicator(threads, topN, std::unique_ptr<IDiversifier>()) {} MatchLoopCommunicator::MatchLoopCommunicator(size_t threads, size_t topN, std::unique_ptr<IDiversifier> diversifier) - : _best_dropped(), + : _best_scores(), + _best_dropped(), _estimate_match_frequency(threads), - _selectBest(threads, topN, _best_dropped, std::move(diversifier)), - _rangeCover(threads, _best_dropped) + _get_second_phase_work(threads, topN, _best_scores, _best_dropped, std::move(diversifier)), + _complete_second_phase(threads, topN, _best_scores, _best_dropped) {} MatchLoopCommunicator::~MatchLoopCommunicator() = default; @@ -33,25 +34,30 @@ MatchLoopCommunicator::EstimateMatchFrequency::mingle() } } -MatchLoopCommunicator::SelectBest::SelectBest(size_t n, size_t topN_in, BestDropped &best_dropped_in, std::unique_ptr<IDiversifier> diversifier) - : vespalib::Rendezvous<SortedHitSequence, Hits>(n), +MatchLoopCommunicator::GetSecondPhaseWork::GetSecondPhaseWork(size_t n, size_t topN_in, Range &best_scores_in, BestDropped &best_dropped_in, std::unique_ptr<IDiversifier> diversifier) + : vespalib::Rendezvous<SortedHitSequence, TaggedHits, true>(n), topN(topN_in), + best_scores(best_scores_in), best_dropped(best_dropped_in), _diversifier(std::move(diversifier)) {} -MatchLoopCommunicator::SelectBest::~SelectBest() = default; +MatchLoopCommunicator::GetSecondPhaseWork::~GetSecondPhaseWork() = default; template<typename Q, typename F> void -MatchLoopCommunicator::SelectBest::mingle(Q &queue, F &&accept) +MatchLoopCommunicator::GetSecondPhaseWork::mingle(Q &queue, F &&accept) { - best_dropped.valid = false; - for (size_t picked = 0; picked < topN && !queue.empty(); ) { + size_t picked = 0; + search::feature_t last_score = 0.0; + while ((picked < topN) && !queue.empty()) { uint32_t i = queue.front(); const Hit & hit = in(i).get(); if (accept(hit.first)) { - out(i).push_back(hit); - ++picked; + out(picked % size()).emplace_back(hit, i); + last_score = hit.second; + if (++picked == 1) { + best_scores.high = hit.second; + } } else if (!best_dropped.valid) { best_dropped.valid = true; best_dropped.score = hit.second; @@ -63,16 +69,21 @@ MatchLoopCommunicator::SelectBest::mingle(Q &queue, F &&accept) queue.pop_front(); } } + if (picked > 0) { + best_scores.low = last_score; + } } void -MatchLoopCommunicator::SelectBest::mingle() +MatchLoopCommunicator::GetSecondPhaseWork::mingle() { - size_t est_out = (topN / size()) + 16; + best_scores = Range(); + best_dropped.valid = false; + size_t est_out = (topN / size()) + 1; vespalib::PriorityQueue<uint32_t, SelectCmp> queue(SelectCmp(*this)); for (size_t i = 0; i < size(); ++i) { + out(i).reserve(est_out); if (in(i).valid()) { - out(i).reserve(est_out); queue.push(i); } } @@ -84,28 +95,26 @@ MatchLoopCommunicator::SelectBest::mingle() } void -MatchLoopCommunicator::RangeCover::mingle() +MatchLoopCommunicator::CompleteSecondPhase::mingle() { - size_t i = 0; - while (i < size() && (!in(i).first.isValid() || !in(i).second.isValid())) { - ++i; + RangePair score_ranges(best_scores, Range()); + Range &new_scores = score_ranges.second; + size_t est_out = (topN / size()) + 16; + for (size_t i = 0; i < size(); ++i) { + out(i).first.reserve(est_out); } - if (i < size()) { - RangePair result = in(i++); - for (; i < size(); ++i) { - if (in(i).first.isValid() && in(i).second.isValid()) { - result.first.low = std::min(result.first.low, in(i).first.low); - result.first.high = std::max(result.first.high, in(i).first.high); - result.second.low = std::min(result.second.low, in(i).second.low); - result.second.high = std::max(result.second.high, in(i).second.high); - } + for (size_t i = 0; i < size(); ++i) { + for (const auto &[hit, tag]: in(i)) { + out(tag).first.push_back(hit); + new_scores.update(hit.second); } + } + if (score_ranges.first.isValid() && score_ranges.second.isValid()) { if (best_dropped.valid) { - result.first.low = std::max(result.first.low, best_dropped.score); - result.first.high = std::max(result.first.low, result.first.high); + score_ranges.first.low = std::max(score_ranges.first.low, best_dropped.score); } - for (size_t j = 0; j < size(); ++j) { - out(j) = result; + for (size_t i = 0; i < size(); ++i) { + out(i).second = score_ranges; } } } diff --git a/searchcore/src/vespa/searchcore/proton/matching/match_loop_communicator.h b/searchcore/src/vespa/searchcore/proton/matching/match_loop_communicator.h index 425197fac3b..42dd82587fa 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/match_loop_communicator.h +++ b/searchcore/src/vespa/searchcore/proton/matching/match_loop_communicator.h @@ -20,12 +20,13 @@ private: EstimateMatchFrequency(size_t n) : vespalib::Rendezvous<Matches, double>(n) {} void mingle() override; }; - struct SelectBest : vespalib::Rendezvous<SortedHitSequence, Hits> { + struct GetSecondPhaseWork : vespalib::Rendezvous<SortedHitSequence, TaggedHits, true> { size_t topN; + Range &best_scores; BestDropped &best_dropped; std::unique_ptr<IDiversifier> _diversifier; - SelectBest(size_t n, size_t topN_in, BestDropped &best_dropped_in, std::unique_ptr<IDiversifier>); - ~SelectBest() override; + GetSecondPhaseWork(size_t n, size_t topN_in, Range &best_scores_in, BestDropped &best_dropped_in, std::unique_ptr<IDiversifier>); + ~GetSecondPhaseWork() override; void mingle() override; template<typename Q, typename F> void mingle(Q &queue, F &&accept); @@ -34,23 +35,27 @@ private: } }; struct SelectCmp { - SelectBest &sb; - SelectCmp(SelectBest &sb_in) : sb(sb_in) {} + GetSecondPhaseWork &sb; + SelectCmp(GetSecondPhaseWork &sb_in) : sb(sb_in) {} bool operator()(uint32_t a, uint32_t b) const { return (sb.cmp(a, b)); } }; - struct RangeCover : vespalib::Rendezvous<RangePair, RangePair> { - BestDropped &best_dropped; - RangeCover(size_t n, BestDropped &best_dropped_in) - : vespalib::Rendezvous<RangePair, RangePair>(n), best_dropped(best_dropped_in) {} + struct CompleteSecondPhase : vespalib::Rendezvous<TaggedHits, std::pair<Hits,RangePair>, true> { + size_t topN; + const Range &best_scores; + const BestDropped &best_dropped; + CompleteSecondPhase(size_t n, size_t topN_in, const Range &best_scores_in, const BestDropped &best_dropped_in) + : vespalib::Rendezvous<TaggedHits, std::pair<Hits,RangePair>, true>(n), + topN(topN_in), best_scores(best_scores_in), best_dropped(best_dropped_in) {} void mingle() override; }; - BestDropped _best_dropped; - EstimateMatchFrequency _estimate_match_frequency; - SelectBest _selectBest; - RangeCover _rangeCover; + Range _best_scores; + BestDropped _best_dropped; + EstimateMatchFrequency _estimate_match_frequency; + GetSecondPhaseWork _get_second_phase_work; + CompleteSecondPhase _complete_second_phase; public: MatchLoopCommunicator(size_t threads, size_t topN); @@ -60,11 +65,13 @@ public: double estimate_match_frequency(const Matches &matches) override { return _estimate_match_frequency.rendezvous(matches); } - Hits selectBest(SortedHitSequence sortedHits) override { - return _selectBest.rendezvous(sortedHits); + + TaggedHits get_second_phase_work(SortedHitSequence sortedHits, size_t thread_id) override { + return _get_second_phase_work.rendezvous(sortedHits, thread_id); } - RangePair rangeCover(const RangePair &ranges) override { - return _rangeCover.rendezvous(ranges); + + std::pair<Hits,RangePair> complete_second_phase(TaggedHits my_results, size_t thread_id) override { + return _complete_second_phase.rendezvous(std::move(my_results), thread_id); } }; diff --git a/searchcore/src/vespa/searchcore/proton/matching/match_master.cpp b/searchcore/src/vespa/searchcore/proton/matching/match_master.cpp index 3cbf88facd5..6c86f6c2d1c 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/match_master.cpp +++ b/searchcore/src/vespa/searchcore/proton/matching/match_master.cpp @@ -32,13 +32,13 @@ struct TimedMatchLoopCommunicator : IMatchLoopCommunicator { double estimate_match_frequency(const Matches &matches) override { return communicator.estimate_match_frequency(matches); } - Hits selectBest(SortedHitSequence sortedHits) override { - auto result = communicator.selectBest(sortedHits); + TaggedHits get_second_phase_work(SortedHitSequence sortedHits, size_t thread_id) override { + auto result = communicator.get_second_phase_work(sortedHits, thread_id); timer = vespalib::Timer(); return result; } - RangePair rangeCover(const RangePair &ranges) override { - RangePair result = communicator.rangeCover(ranges); + std::pair<Hits,RangePair> complete_second_phase(TaggedHits my_results, size_t thread_id) override { + auto result = communicator.complete_second_phase(std::move(my_results), thread_id); elapsed = timer.elapsed(); return result; } diff --git a/searchcore/src/vespa/searchcore/proton/matching/match_thread.cpp b/searchcore/src/vespa/searchcore/proton/matching/match_thread.cpp index 0e80d31a063..5cb4394880f 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/match_thread.cpp +++ b/searchcore/src/vespa/searchcore/proton/matching/match_thread.cpp @@ -202,8 +202,8 @@ MatchThread::match_loop(MatchTools &tools, HitCollector &hits) uint32_t matches = context.matches; if (do_limit && context.isBelowLimit()) { const size_t searchedSoFar = scheduler.total_size(thread_id); - LOG(debug, "Limit not reached (had %d) at docid=%d which is after %zu docs.", - matches, scheduler.total_span(thread_id).end, searchedSoFar); + LOG(debug, "Limit not reached (had %d) after %zu docs.", + matches, searchedSoFar); estimate_match_frequency(matches, searchedSoFar); tools.match_limiter().updateDocIdSpaceEstimate(searchedSoFar, 0); } @@ -293,35 +293,31 @@ MatchThread::findMatches(MatchTools &tools) trace->addEvent(4, "Start match and first phase rank"); match_loop_helper(tools, hits); if (tools.has_second_phase_rank()) { - { // 2nd phase ranking - trace->addEvent(4, "Start second phase rerank"); - tools.setup_second_phase(); - DocidRange docid_range = scheduler.total_span(thread_id); - tools.search().initRange(docid_range.begin, docid_range.end); - auto sorted_hit_seq = matchToolsFactory.should_diversify() - ? hits.getSortedHitSequence(matchParams.arraySize) - : hits.getSortedHitSequence(matchParams.heapSize); - trace->addEvent(5, "Synchronize before second phase rerank"); - WaitTimer select_best_timer(wait_time_s); - auto kept_hits = communicator.selectBest(sorted_hit_seq); - select_best_timer.done(); - DocumentScorer scorer(tools.rank_program(), tools.search()); - if (tools.getDoom().hard_doom()) { - kept_hits.clear(); - } - uint32_t reRanked = hits.reRank(scorer, std::move(kept_hits)); - if (auto onReRankTask = matchToolsFactory.createOnReRankTask()) { - onReRankTask->run(hits.getReRankedHits()); - } - thread_stats.docsReRanked(reRanked); + trace->addEvent(4, "Start second phase rerank"); + tools.setup_second_phase(); + DocidRange docid_range(1, matchParams.numDocs); + tools.search().initRange(docid_range.begin, docid_range.end); + auto sorted_hit_seq = matchToolsFactory.should_diversify() + ? hits.getSortedHitSequence(matchParams.arraySize) + : hits.getSortedHitSequence(matchParams.heapSize); + trace->addEvent(5, "Synchronize before second phase rerank"); + WaitTimer get_second_phase_work_timer(wait_time_s); + auto my_work = communicator.get_second_phase_work(sorted_hit_seq, thread_id); + get_second_phase_work_timer.done(); + DocumentScorer scorer(tools.rank_program(), tools.search()); + if (tools.getDoom().hard_doom()) { + my_work.clear(); } - { // rank scaling - trace->addEvent(5, "Synchronize before rank scaling"); - auto my_ranges = hits.getRanges(); - WaitTimer range_cover_timer(wait_time_s); - auto ranges = communicator.rangeCover(my_ranges); - range_cover_timer.done(); - hits.setRanges(ranges); + scorer.score(my_work); + thread_stats.docsReRanked(my_work.size()); + trace->addEvent(5, "Synchronize before rank scaling"); + WaitTimer complete_second_phase_timer(wait_time_s); + auto [kept_hits, ranges] = communicator.complete_second_phase(my_work, thread_id); + complete_second_phase_timer.done(); + hits.setReRankedHits(std::move(kept_hits)); + hits.setRanges(ranges); + if (auto onReRankTask = matchToolsFactory.createOnReRankTask()) { + onReRankTask->run(hits.getReRankedHits()); } } trace->addEvent(4, "Create result set"); |