aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-11-03 11:11:51 +0100
committerGitHub <noreply@github.com>2020-11-03 11:11:51 +0100
commit6150297581695cb6c3c10cc08f1d4bf8e71c43e3 (patch)
tree6f5d26f21bee82587be63177e227d50d89b28b5f /searchcore
parentb5cd73513c0a8c701829edc572de5095897c818b (diff)
parent354bfd789c8ca637a7d719e6c17b494710ffd40b (diff)
Merge pull request #15143 from vespa-engine/havardpe/balance-second-phase-ranking-workload
balance second phase ranking workload
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/matching/docid_range_scheduler/docid_range_scheduler_test.cpp17
-rw-r--r--searchcore/src/tests/proton/matching/match_loop_communicator/match_loop_communicator_test.cpp196
-rw-r--r--searchcore/src/vespa/searchcore/proton/matching/docid_range_scheduler.h22
-rw-r--r--searchcore/src/vespa/searchcore/proton/matching/document_scorer.cpp11
-rw-r--r--searchcore/src/vespa/searchcore/proton/matching/document_scorer.h15
-rw-r--r--searchcore/src/vespa/searchcore/proton/matching/i_match_loop_communicator.h6
-rw-r--r--searchcore/src/vespa/searchcore/proton/matching/match_loop_communicator.cpp71
-rw-r--r--searchcore/src/vespa/searchcore/proton/matching/match_loop_communicator.h41
-rw-r--r--searchcore/src/vespa/searchcore/proton/matching/match_master.cpp8
-rw-r--r--searchcore/src/vespa/searchcore/proton/matching/match_thread.cpp56
10 files changed, 241 insertions, 202 deletions
diff --git a/searchcore/src/tests/proton/matching/docid_range_scheduler/docid_range_scheduler_test.cpp b/searchcore/src/tests/proton/matching/docid_range_scheduler/docid_range_scheduler_test.cpp
index 1841d8bb531..4814ab4cb49 100644
--- a/searchcore/src/tests/proton/matching/docid_range_scheduler/docid_range_scheduler_test.cpp
+++ b/searchcore/src/tests/proton/matching/docid_range_scheduler/docid_range_scheduler_test.cpp
@@ -85,10 +85,6 @@ TEST("require that the docid range splitter gives empty ranges if accessed with
TEST("require that the partition scheduler acts as expected") {
PartitionDocidRangeScheduler scheduler(4, 16);
- TEST_DO(verify_range(scheduler.total_span(0), DocidRange(1, 5)));
- TEST_DO(verify_range(scheduler.total_span(1), DocidRange(5, 9)));
- TEST_DO(verify_range(scheduler.total_span(2), DocidRange(9, 13)));
- TEST_DO(verify_range(scheduler.total_span(3), DocidRange(13, 16)));
EXPECT_EQUAL(scheduler.total_size(0), 4u);
EXPECT_EQUAL(scheduler.total_size(1), 4u);
EXPECT_EQUAL(scheduler.total_size(2), 4u);
@@ -106,8 +102,6 @@ TEST("require that the partition scheduler acts as expected") {
TEST("require that the partition scheduler protects against documents underflow") {
PartitionDocidRangeScheduler scheduler(2, 0);
- TEST_DO(verify_range(scheduler.total_span(0), DocidRange(1,1)));
- TEST_DO(verify_range(scheduler.total_span(1), DocidRange(1,1)));
EXPECT_EQUAL(scheduler.total_size(0), 0u);
EXPECT_EQUAL(scheduler.total_size(1), 0u);
EXPECT_EQUAL(scheduler.unassigned_size(), 0u);
@@ -122,8 +116,6 @@ TEST("require that the partition scheduler protects against documents underflow"
TEST("require that the task scheduler acts as expected") {
TaskDocidRangeScheduler scheduler(2, 5, 20);
EXPECT_EQUAL(scheduler.unassigned_size(), 19u);
- TEST_DO(verify_range(scheduler.total_span(0), DocidRange(1, 20)));
- TEST_DO(verify_range(scheduler.total_span(1), DocidRange(1, 20)));
EXPECT_EQUAL(scheduler.total_size(0), 0u);
EXPECT_EQUAL(scheduler.total_size(1), 0u);
TEST_DO(verify_range(scheduler.first_range(1), DocidRange(1, 5)));
@@ -141,8 +133,6 @@ TEST("require that the task scheduler acts as expected") {
TEST("require that the task scheduler protects against documents underflow") {
TaskDocidRangeScheduler scheduler(2, 4, 0);
- TEST_DO(verify_range(scheduler.total_span(0), DocidRange(1,1)));
- TEST_DO(verify_range(scheduler.total_span(1), DocidRange(1,1)));
EXPECT_EQUAL(scheduler.total_size(0), 0u);
EXPECT_EQUAL(scheduler.total_size(1), 0u);
EXPECT_EQUAL(scheduler.unassigned_size(), 0u);
@@ -167,13 +157,6 @@ TEST("require that the adaptive scheduler starts by dividing the docid space equ
TEST_DO(verify_range(scheduler.first_range(3), DocidRange(13, 16)));
}
-TEST("require that the adaptive scheduler reports the full span to all threads") {
- AdaptiveDocidRangeScheduler scheduler(3, 1, 16);
- TEST_DO(verify_range(scheduler.total_span(0), DocidRange(1,16)));
- TEST_DO(verify_range(scheduler.total_span(1), DocidRange(1,16)));
- TEST_DO(verify_range(scheduler.total_span(2), DocidRange(1,16)));
-}
-
TEST_MT_FF("require that the adaptive scheduler terminates when all workers request more work",
4, AdaptiveDocidRangeScheduler(num_threads, 1, 16), TimeBomb(60))
{
diff --git a/searchcore/src/tests/proton/matching/match_loop_communicator/match_loop_communicator_test.cpp b/searchcore/src/tests/proton/matching/match_loop_communicator/match_loop_communicator_test.cpp
index f5564ac22a7..0b0f28962c4 100644
--- a/searchcore/src/tests/proton/matching/match_loop_communicator/match_loop_communicator_test.cpp
+++ b/searchcore/src/tests/proton/matching/match_loop_communicator/match_loop_communicator_test.cpp
@@ -1,48 +1,63 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/test_kit.h>
#include <vespa/searchcore/proton/matching/match_loop_communicator.h>
-#include <vespa/vespalib/util/box.h>
+#include <algorithm>
using namespace proton::matching;
-using vespalib::Box;
-using vespalib::make_box;
-
using Range = MatchLoopCommunicator::Range;
using RangePair = MatchLoopCommunicator::RangePair;
using Matches = MatchLoopCommunicator::Matches;
using Hit = MatchLoopCommunicator::Hit;
using Hits = MatchLoopCommunicator::Hits;
+using TaggedHit = MatchLoopCommunicator::TaggedHit;
+using TaggedHits = MatchLoopCommunicator::TaggedHits;
using search::queryeval::SortedHitSequence;
+std::vector<Hit> hit_vec(std::vector<Hit> list) { return list; }
+
Hits makeScores(size_t id) {
switch (id) {
- case 0: return make_box<Hit>({1, 5.4}, {2, 4.4}, {3, 3.4}, {4, 2.4}, {5, 1.4});
- case 1: return make_box<Hit>({11, 5.3}, {12, 4.3}, {13, 3.3}, {14, 2.3}, {15, 1.3});
- case 2: return make_box<Hit>({21, 5.2}, {22, 4.2}, {23, 3.2}, {24, 2.2}, {25, 1.2});
- case 3: return make_box<Hit>({31, 5.1}, {32, 4.1}, {33, 3.1}, {34, 2.1}, {35, 1.1});
- case 4: return make_box<Hit>({41, 5.0}, {42, 4.0}, {43, 3.0}, {44, 2.0}, {45, 1.0});
+ case 0: return {{1, 5.4}, {2, 4.4}, {3, 3.4}, {4, 2.4}, {5, 1.4}};
+ case 1: return {{11, 5.3}, {12, 4.3}, {13, 3.3}, {14, 2.3}, {15, 1.3}};
+ case 2: return {{21, 5.2}, {22, 4.2}, {23, 3.2}, {24, 2.2}, {25, 1.2}};
+ case 3: return {{31, 5.1}, {32, 4.1}, {33, 3.1}, {34, 2.1}, {35, 1.1}};
+ case 4: return {{41, 5.0}, {42, 4.0}, {43, 3.0}, {44, 2.0}, {45, 1.0}};
}
- return Box<Hit>();
+ return {};
}
-Hits selectBest(MatchLoopCommunicator &com, const Hits &hits) {
+std::tuple<size_t,Hits,RangePair> second_phase(MatchLoopCommunicator &com, const Hits &hits, size_t thread_id, double delta = 0.0) {
std::vector<uint32_t> refs;
for (size_t i = 0; i < hits.size(); ++i) {
refs.push_back(i);
}
- return com.selectBest(SortedHitSequence(&hits[0], &refs[0], refs.size()));
+ auto my_work = com.get_second_phase_work(SortedHitSequence(&hits[0], &refs[0], refs.size()), thread_id);
+ // the DocumentScorer used by the match thread will sort on docid here to ensure increasing seek order, this is not needed here
+ size_t work_size = my_work.size();
+ for (auto &[hit, tag]: my_work) {
+ hit.second += delta; // second phase ranking is first phase + delta
+ }
+ auto [best_hits, ranges] = com.complete_second_phase(std::move(my_work), thread_id);
+ // the HitCollector will sort on docid to prepare for result merging, we do it to simplify comparing with expected results
+ auto sort_on_docid = [](const auto &a, const auto &b){ return (a.first < b.first); };
+ std::sort(best_hits.begin(), best_hits.end(), sort_on_docid);
+ return {work_size, best_hits, ranges};
}
-RangePair makeRanges(size_t id) {
- switch (id) {
- case 0: return std::make_pair(Range(5, 5), Range(7, 7));
- case 1: return std::make_pair(Range(2, 2), Range(8, 8));
- case 2: return std::make_pair(Range(3, 3), Range(6, 6));
- case 3: return std::make_pair(Range(1, 1), Range(5, 5));
- case 4: return std::make_pair(Range(4, 4), Range(9, 9));
- }
- return std::make_pair(Range(-50, -60), Range(60, 50));
+Hits selectBest(MatchLoopCommunicator &com, const Hits &hits, size_t thread_id) {
+ auto [work_size, best_hits, ranges] = second_phase(com, hits, thread_id);
+ return best_hits;
+}
+
+RangePair rangeCover(MatchLoopCommunicator &com, const Hits &hits, size_t thread_id, double delta) {
+ auto [work_size, best_hits, ranges] = second_phase(com, hits, thread_id, delta);
+ return ranges;
+}
+
+size_t my_work_size(MatchLoopCommunicator &com, const Hits &hits, size_t thread_id) {
+ auto [work_size, best_hits, ranges] = second_phase(com, hits, thread_id);
+ return work_size;
}
void equal(size_t count, const Hits & a, const Hits & b) {
@@ -59,114 +74,137 @@ void equal_range(const Range &a, const Range &b) {
EXPECT_EQUAL(a.high, b.high);
}
+void equal_ranges(const RangePair &a, const RangePair &b) {
+ TEST_DO(equal_range(a.first, b.first));
+ TEST_DO(equal_range(a.second, b.second));
+}
+
struct EveryOdd : public search::queryeval::IDiversifier {
bool accepted(uint32_t docId) override {
return docId & 0x01;
}
};
+struct None : public search::queryeval::IDiversifier {
+ bool accepted(uint32_t) override { return false; }
+};
+
TEST_F("require that selectBest gives appropriate results for single thread", MatchLoopCommunicator(num_threads, 3)) {
- TEST_DO(equal(2u, make_box<Hit>({1, 5}, {2, 4}), selectBest(f1, make_box<Hit>({1, 5}, {2, 4}))));
- TEST_DO(equal(3u, make_box<Hit>({1, 5}, {2, 4}, {3, 3}), selectBest(f1, make_box<Hit>({1, 5}, {2, 4}, {3, 3}))));
- TEST_DO(equal(3u, make_box<Hit>({1, 5}, {2, 4}, {3, 3}), selectBest(f1, make_box<Hit>({1, 5}, {2, 4}, {3, 3}, {4, 2}))));
+ TEST_DO(equal(2u, hit_vec({{1, 5}, {2, 4}}), selectBest(f1, hit_vec({{1, 5}, {2, 4}}), thread_id)));
+ TEST_DO(equal(3u, hit_vec({{1, 5}, {2, 4}, {3, 3}}), selectBest(f1, hit_vec({{1, 5}, {2, 4}, {3, 3}}), thread_id)));
+ TEST_DO(equal(3u, hit_vec({{1, 5}, {2, 4}, {3, 3}}), selectBest(f1, hit_vec({{1, 5}, {2, 4}, {3, 3}, {4, 2}}), thread_id)));
}
TEST_F("require that selectBest gives appropriate results for single thread with filter",
MatchLoopCommunicator(num_threads, 3, std::make_unique<EveryOdd>()))
{
- TEST_DO(equal(1u, make_box<Hit>({1, 5}), selectBest(f1, make_box<Hit>({1, 5}, {2, 4}))));
- TEST_DO(equal(2u, make_box<Hit>({1, 5}, {3, 3}), selectBest(f1, make_box<Hit>({1, 5}, {2, 4}, {3, 3}))));
- TEST_DO(equal(3u, make_box<Hit>({1, 5}, {3, 3}, {5, 1}), selectBest(f1, make_box<Hit>({1, 5}, {2, 4}, {3, 3}, {4, 2}, {5, 1}, {6, 0}))));
+ TEST_DO(equal(1u, hit_vec({{1, 5}}), selectBest(f1, hit_vec({{1, 5}, {2, 4}}), thread_id)));
+ TEST_DO(equal(2u, hit_vec({{1, 5}, {3, 3}}), selectBest(f1, hit_vec({{1, 5}, {2, 4}, {3, 3}}), thread_id)));
+ TEST_DO(equal(3u, hit_vec({{1, 5}, {3, 3}, {5, 1}}), selectBest(f1, hit_vec({{1, 5}, {2, 4}, {3, 3}, {4, 2}, {5, 1}, {6, 0}}), thread_id)));
}
TEST_MT_F("require that selectBest works with no hits", 10, MatchLoopCommunicator(num_threads, 10)) {
- EXPECT_TRUE(selectBest(f1, Box<Hit>()).empty());
+ EXPECT_TRUE(selectBest(f1, hit_vec({}), thread_id).empty());
}
TEST_MT_F("require that selectBest works with too many hits from all threads", 5, MatchLoopCommunicator(num_threads, 13)) {
if (thread_id < 3) {
- TEST_DO(equal(3u, makeScores(thread_id), selectBest(f1, makeScores(thread_id))));
+ TEST_DO(equal(3u, makeScores(thread_id), selectBest(f1, makeScores(thread_id), thread_id)));
} else {
- TEST_DO(equal(2u, makeScores(thread_id), selectBest(f1, makeScores(thread_id))));
+ TEST_DO(equal(2u, makeScores(thread_id), selectBest(f1, makeScores(thread_id), thread_id)));
}
}
TEST_MT_F("require that selectBest works with some exhausted threads", 5, MatchLoopCommunicator(num_threads, 22)) {
if (thread_id < 2) {
- TEST_DO(equal(5u, makeScores(thread_id), selectBest(f1, makeScores(thread_id))));
+ TEST_DO(equal(5u, makeScores(thread_id), selectBest(f1, makeScores(thread_id), thread_id)));
} else {
- TEST_DO(equal(4u, makeScores(thread_id), selectBest(f1, makeScores(thread_id))));
+ TEST_DO(equal(4u, makeScores(thread_id), selectBest(f1, makeScores(thread_id), thread_id)));
}
}
TEST_MT_F("require that selectBest can select all hits from all threads", 5, MatchLoopCommunicator(num_threads, 100)) {
- EXPECT_EQUAL(5u, selectBest(f1, makeScores(thread_id)).size());
+ EXPECT_EQUAL(5u, selectBest(f1, makeScores(thread_id), thread_id).size());
}
TEST_MT_F("require that selectBest works with some empty threads", 10, MatchLoopCommunicator(num_threads, 7)) {
if (thread_id < 2) {
- TEST_DO(equal(2u, makeScores(thread_id), selectBest(f1, makeScores(thread_id))));
+ TEST_DO(equal(2u, makeScores(thread_id), selectBest(f1, makeScores(thread_id), thread_id)));
} else if (thread_id < 5) {
- TEST_DO(equal(1u, makeScores(thread_id), selectBest(f1, makeScores(thread_id))));
+ TEST_DO(equal(1u, makeScores(thread_id), selectBest(f1, makeScores(thread_id), thread_id)));
} else {
- EXPECT_TRUE(selectBest(f1, makeScores(thread_id)).empty());
+ EXPECT_TRUE(selectBest(f1, makeScores(thread_id), thread_id).empty());
}
}
-TEST_F("require that rangeCover is identity function for single thread", MatchLoopCommunicator(num_threads, 5)) {
- RangePair res = f1.rangeCover(std::make_pair(Range(2, 4), Range(3, 5)));
- TEST_DO(equal_range(Range(2, 4), res.first));
- TEST_DO(equal_range(Range(3, 5), res.second));
+TEST_F("require that rangeCover works with a single thread", MatchLoopCommunicator(num_threads, 5)) {
+ RangePair res = rangeCover(f1, hit_vec({{1, 7.5}, {2, 1.5}}), thread_id, 10);
+ TEST_DO(equal_ranges(RangePair({1.5, 7.5}, {11.5, 17.5}), res));
}
-TEST_MT_F("require that rangeCover can mix ranges from multiple threads", 5, MatchLoopCommunicator(num_threads, 5)) {
- RangePair res = f1.rangeCover(makeRanges(thread_id));
- TEST_DO(equal_range(Range(1, 5), res.first));
- TEST_DO(equal_range(Range(5, 9), res.second));
+TEST_MT_F("require that rangeCover works with multiple threads", 5, MatchLoopCommunicator(num_threads, 10)) {
+ RangePair res = rangeCover(f1, hit_vec({{thread_id * 100 + 1, 100.0 + thread_id}, {thread_id * 100 + 2, 100.0 - thread_id}}), thread_id, 10);
+ TEST_DO(equal_ranges(RangePair({96.0, 104.0}, {106.0, 114.0}), res));
}
-TEST_MT_F("require that invalid ranges are ignored", 10, MatchLoopCommunicator(num_threads, 5)) {
- RangePair res = f1.rangeCover(makeRanges(thread_id));
- TEST_DO(equal_range(Range(1, 5), res.first));
- TEST_DO(equal_range(Range(5, 9), res.second));
+TEST_MT_F("require that rangeCover works with no hits", 10, MatchLoopCommunicator(num_threads, 5)) {
+ RangePair res = rangeCover(f1, hit_vec({}), thread_id, 10);
+ TEST_DO(equal_ranges(RangePair({}, {}), res));
}
-TEST_MT_F("require that only invalid ranges produce default invalid range", 3, MatchLoopCommunicator(num_threads, 5)) {
- RangePair res = f1.rangeCover(makeRanges(10));
- Range expect;
- TEST_DO(equal_range(expect, res.first));
- TEST_DO(equal_range(expect, res.second));
-}
-
-TEST_F("require that hits dropped due to lack of diversity affects range cover result",
- MatchLoopCommunicator(num_threads, 3, std::make_unique<EveryOdd>()))
+TEST_FFF("require that hits dropped due to lack of diversity affects range cover result",
+ MatchLoopCommunicator(num_threads, 3),
+ MatchLoopCommunicator(num_threads, 3, std::make_unique<EveryOdd>()),
+ MatchLoopCommunicator(num_threads, 3, std::make_unique<None>()))
{
- TEST_DO(equal(3u, make_box<Hit>({1, 5}, {3, 3}, {5, 1}), selectBest(f1, make_box<Hit>({1, 5}, {2, 4}, {3, 3}, {4, 2}, {5, 1}))));
- // best dropped: 4
- std::vector<RangePair> input = {
- std::make_pair(Range(), Range()),
- std::make_pair(Range(3, 5), Range(1, 10)),
- std::make_pair(Range(5, 10), Range(1, 10)),
- std::make_pair(Range(1, 3), Range(1, 10))
- };
- std::vector<RangePair> expect = {
- std::make_pair(Range(), Range()),
- std::make_pair(Range(4, 5), Range(1, 10)),
- std::make_pair(Range(5, 10), Range(1, 10)),
- std::make_pair(Range(4, 4), Range(1, 10))
- };
- ASSERT_EQUAL(input.size(), expect.size());
- for (size_t i = 0; i < input.size(); ++i) {
- auto output = f1.rangeCover(input[i]);
- TEST_STATE(vespalib::make_string("case: %zu", i).c_str());
- TEST_DO(equal_range(expect[i].first, output.first));
- TEST_DO(equal_range(expect[i].second, output.second));
- }
+ auto hits_in = hit_vec({{1, 5}, {2, 4}, {3, 3}, {4, 2}, {5, 1}});
+ auto [my_work1, hits1, ranges1] = second_phase(f1, hits_in, thread_id, 10);
+ auto [my_work2, hits2, ranges2] = second_phase(f2, hits_in, thread_id, 10);
+ auto [my_work3, hits3, ranges3] = second_phase(f3, hits_in, thread_id, 10);
+
+ EXPECT_EQUAL(my_work1, 3u);
+ EXPECT_EQUAL(my_work2, 3u);
+ EXPECT_EQUAL(my_work3, 0u);
+
+ TEST_DO(equal(3u, hit_vec({{1, 15}, {2, 14}, {3, 13}}), hits1));
+ TEST_DO(equal(3u, hit_vec({{1, 15}, {3, 13}, {5, 11}}), hits2));
+ TEST_DO(equal(0u, hit_vec({}), hits3));
+
+ TEST_DO(equal_ranges(RangePair({3,5},{13,15}), ranges1));
+ TEST_DO(equal_ranges(RangePair({4,5},{11,15}), ranges2)); // best dropped: 4
+
+ // note that the 'drops all hits due to diversity' case will
+ // trigger much of the same code path as dropping second phase
+ // ranking due to hard doom.
+
+ TEST_DO(equal_ranges(RangePair({},{}), ranges3));
}
-TEST_MT_F("require that count_matches will count hits and docs across threads", 4, MatchLoopCommunicator(num_threads, 5)) {
+TEST_MT_F("require that estimate_match_frequency will count hits and docs across threads", 4, MatchLoopCommunicator(num_threads, 5)) {
double freq = (0.0/10.0 + 1.0/11.0 + 2.0/12.0 + 3.0/13.0) / 4.0;
EXPECT_APPROX(freq, f1.estimate_match_frequency(Matches(thread_id, thread_id + 10)), 0.00001);
}
+TEST_MT_F("require that second phase work is evenly distributed among search threads", 5, MatchLoopCommunicator(num_threads, 20)) {
+ size_t num_hits = thread_id * 5;
+ size_t docid = thread_id * 100;
+ double score = thread_id * 100.0;
+ Hits my_hits;
+ for(size_t i = 0; i < num_hits; ++i) {
+ my_hits.emplace_back(++docid, score);
+ score -= 1.0;
+ }
+ auto [my_work, best_hits, ranges] = second_phase(f1, my_hits, thread_id, 1000.0);
+ EXPECT_EQUAL(my_work, 4u);
+ TEST_DO(equal_ranges(RangePair({381,400},{1381,1400}), ranges));
+ if (thread_id == 4) {
+ for (auto &hit: my_hits) {
+ hit.second += 1000.0;
+ }
+ TEST_DO(equal(num_hits, my_hits, best_hits));
+ } else {
+ EXPECT_TRUE(best_hits.empty());
+ }
+}
+
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/searchcore/src/vespa/searchcore/proton/matching/docid_range_scheduler.h b/searchcore/src/vespa/searchcore/proton/matching/docid_range_scheduler.h
index e6fce89e82d..19e5ec0343d 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/docid_range_scheduler.h
+++ b/searchcore/src/vespa/searchcore/proton/matching/docid_range_scheduler.h
@@ -71,17 +71,15 @@ public:
* 'next_range' function. When a worker is assigned an empty range,
* its work is done.
*
- * The 'total_span' function returns a range that is guaranteed to
- * contain all ranges assigned to the given worker. The 'total_size'
- * function returns the accumulated size of all ranges assigned to the
- * given worker. The 'unassigned_size' function returns the
- * accumulated size of all currently unassigned ranges.
+ * The 'total_size' function returns the accumulated size of all
+ * ranges assigned to the given worker. The 'unassigned_size' function
+ * returns the accumulated size of all currently unassigned ranges.
*
- * Note that the return values from 'total_span', 'total_size' and
- * 'unassigned_size' may or may not account for the range returned
- * from 'first_range' since the scheduler is allowed to pre-assign
- * ranges to workers. Calling 'first_range' first ensures that all
- * other return values make sense.
+ * Note that the return values from 'total_size' and 'unassigned_size'
+ * may or may not account for the range returned from 'first_range'
+ * since the scheduler is allowed to pre-assign ranges to
+ * workers. Calling 'first_range' first ensures that all other return
+ * values make sense.
*
* The 'idle_observer' and 'share_range' functions are used for
* work-sharing, where a worker thread potentially can offload some of
@@ -109,7 +107,6 @@ struct DocidRangeScheduler {
typedef std::unique_ptr<DocidRangeScheduler> UP;
virtual DocidRange first_range(size_t thread_id) = 0;
virtual DocidRange next_range(size_t thread_id) = 0;
- virtual DocidRange total_span(size_t thread_id) const = 0;
virtual size_t total_size(size_t thread_id) const = 0;
virtual size_t unassigned_size() const = 0;
virtual IdleObserver make_idle_observer() const = 0;
@@ -130,7 +127,6 @@ public:
PartitionDocidRangeScheduler(size_t num_threads, uint32_t docid_limit);
DocidRange first_range(size_t thread_id) override { return _ranges[thread_id]; }
DocidRange next_range(size_t) override { return DocidRange(); }
- DocidRange total_span(size_t thread_id) const override { return _ranges[thread_id]; }
size_t total_size(size_t thread_id) const override { return _ranges[thread_id].size(); }
size_t unassigned_size() const override { return 0; }
IdleObserver make_idle_observer() const override { return IdleObserver(); }
@@ -157,7 +153,6 @@ public:
TaskDocidRangeScheduler(size_t num_threads, size_t num_tasks, uint32_t docid_limit);
DocidRange first_range(size_t thread_id) override { return next_task(thread_id); }
DocidRange next_range(size_t thread_id) override { return next_task(thread_id); }
- DocidRange total_span(size_t) const override { return _splitter.full_range(); }
size_t total_size(size_t thread_id) const override { return _assigned[thread_id]; }
size_t unassigned_size() const override { return _unassigned.load(std::memory_order::memory_order_relaxed); }
IdleObserver make_idle_observer() const override { return IdleObserver(); }
@@ -197,7 +192,6 @@ public:
~AdaptiveDocidRangeScheduler();
DocidRange first_range(size_t thread_id) override;
DocidRange next_range(size_t thread_id) override;
- DocidRange total_span(size_t) const override { return _splitter.full_range(); }
size_t total_size(size_t thread_id) const override { return _assigned[thread_id]; }
size_t unassigned_size() const override { return 0; }
IdleObserver make_idle_observer() const override { return IdleObserver(_num_idle); }
diff --git a/searchcore/src/vespa/searchcore/proton/matching/document_scorer.cpp b/searchcore/src/vespa/searchcore/proton/matching/document_scorer.cpp
index c7721b428b9..608e8adf5c2 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/document_scorer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/matching/document_scorer.cpp
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "document_scorer.h"
+#include <algorithm>
#include <cassert>
using search::feature_t;
@@ -30,10 +31,14 @@ DocumentScorer::DocumentScorer(RankProgram &rankProgram,
{
}
-feature_t
-DocumentScorer::score(uint32_t docId)
+void
+DocumentScorer::score(TaggedHits &hits)
{
- return doScore(docId);
+ auto sort_on_docid = [](const TaggedHit &a, const TaggedHit &b){ return (a.first.first < b.first.first); };
+ std::sort(hits.begin(), hits.end(), sort_on_docid);
+ for (auto &hit: hits) {
+ hit.first.second = doScore(hit.first.first);
+ }
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/matching/document_scorer.h b/searchcore/src/vespa/searchcore/proton/matching/document_scorer.h
index f29f70b2cfa..63ee00c3412 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/document_scorer.h
+++ b/searchcore/src/vespa/searchcore/proton/matching/document_scorer.h
@@ -2,24 +2,28 @@
#pragma once
+#include "i_match_loop_communicator.h"
#include <vespa/searchlib/fef/rank_program.h>
-#include <vespa/searchlib/queryeval/hitcollector.h>
#include <vespa/searchlib/queryeval/searchiterator.h>
namespace proton::matching {
/**
* Class used to calculate the rank score for a set of documents using
- * a rank program for calculation and a search iterator for unpacking match data.
- * The calculateScore() function is always called in increasing docId order.
+ * a rank program for calculation and a search iterator for unpacking
+ * match data. The doScore function must be called with increasing
+ * docid.
*/
-class DocumentScorer : public search::queryeval::HitCollector::DocumentScorer
+class DocumentScorer
{
private:
search::queryeval::SearchIterator &_searchItr;
search::fef::LazyValue _scoreFeature;
public:
+ using TaggedHit = IMatchLoopCommunicator::TaggedHit;
+ using TaggedHits = IMatchLoopCommunicator::TaggedHits;
+
DocumentScorer(search::fef::RankProgram &rankProgram,
search::queryeval::SearchIterator &searchItr);
@@ -28,7 +32,8 @@ public:
return _scoreFeature.as_number(docId);
}
- virtual search::feature_t score(uint32_t docId) override;
+ // annotate hits with rank score, may change order
+ void score(TaggedHits &hits);
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/matching/i_match_loop_communicator.h b/searchcore/src/vespa/searchcore/proton/matching/i_match_loop_communicator.h
index 15ca9921524..c9a4a61f9c5 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/i_match_loop_communicator.h
+++ b/searchcore/src/vespa/searchcore/proton/matching/i_match_loop_communicator.h
@@ -17,6 +17,8 @@ struct IMatchLoopCommunicator {
using SortedHitSequence = search::queryeval::SortedHitSequence;
using Hit = SortedHitSequence::Hit;
using Hits = std::vector<Hit>;
+ using TaggedHit = std::pair<Hit,size_t>;
+ using TaggedHits = std::vector<TaggedHit>;
struct Matches {
size_t hits;
size_t docs;
@@ -28,8 +30,8 @@ struct IMatchLoopCommunicator {
}
};
virtual double estimate_match_frequency(const Matches &matches) = 0;
- virtual Hits selectBest(SortedHitSequence sortedHits) = 0;
- virtual RangePair rangeCover(const RangePair &ranges) = 0;
+ virtual TaggedHits get_second_phase_work(SortedHitSequence sortedHits, size_t thread_id) = 0;
+ virtual std::pair<Hits,RangePair> complete_second_phase(TaggedHits my_results, size_t thread_id) = 0;
virtual ~IMatchLoopCommunicator() {}
};
diff --git a/searchcore/src/vespa/searchcore/proton/matching/match_loop_communicator.cpp b/searchcore/src/vespa/searchcore/proton/matching/match_loop_communicator.cpp
index 07a8b224b89..4db26f6308a 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/match_loop_communicator.cpp
+++ b/searchcore/src/vespa/searchcore/proton/matching/match_loop_communicator.cpp
@@ -9,10 +9,11 @@ MatchLoopCommunicator::MatchLoopCommunicator(size_t threads, size_t topN)
: MatchLoopCommunicator(threads, topN, std::unique_ptr<IDiversifier>())
{}
MatchLoopCommunicator::MatchLoopCommunicator(size_t threads, size_t topN, std::unique_ptr<IDiversifier> diversifier)
- : _best_dropped(),
+ : _best_scores(),
+ _best_dropped(),
_estimate_match_frequency(threads),
- _selectBest(threads, topN, _best_dropped, std::move(diversifier)),
- _rangeCover(threads, _best_dropped)
+ _get_second_phase_work(threads, topN, _best_scores, _best_dropped, std::move(diversifier)),
+ _complete_second_phase(threads, topN, _best_scores, _best_dropped)
{}
MatchLoopCommunicator::~MatchLoopCommunicator() = default;
@@ -33,25 +34,30 @@ MatchLoopCommunicator::EstimateMatchFrequency::mingle()
}
}
-MatchLoopCommunicator::SelectBest::SelectBest(size_t n, size_t topN_in, BestDropped &best_dropped_in, std::unique_ptr<IDiversifier> diversifier)
- : vespalib::Rendezvous<SortedHitSequence, Hits>(n),
+MatchLoopCommunicator::GetSecondPhaseWork::GetSecondPhaseWork(size_t n, size_t topN_in, Range &best_scores_in, BestDropped &best_dropped_in, std::unique_ptr<IDiversifier> diversifier)
+ : vespalib::Rendezvous<SortedHitSequence, TaggedHits, true>(n),
topN(topN_in),
+ best_scores(best_scores_in),
best_dropped(best_dropped_in),
_diversifier(std::move(diversifier))
{}
-MatchLoopCommunicator::SelectBest::~SelectBest() = default;
+MatchLoopCommunicator::GetSecondPhaseWork::~GetSecondPhaseWork() = default;
template<typename Q, typename F>
void
-MatchLoopCommunicator::SelectBest::mingle(Q &queue, F &&accept)
+MatchLoopCommunicator::GetSecondPhaseWork::mingle(Q &queue, F &&accept)
{
- best_dropped.valid = false;
- for (size_t picked = 0; picked < topN && !queue.empty(); ) {
+ size_t picked = 0;
+ search::feature_t last_score = 0.0;
+ while ((picked < topN) && !queue.empty()) {
uint32_t i = queue.front();
const Hit & hit = in(i).get();
if (accept(hit.first)) {
- out(i).push_back(hit);
- ++picked;
+ out(picked % size()).emplace_back(hit, i);
+ last_score = hit.second;
+ if (++picked == 1) {
+ best_scores.high = hit.second;
+ }
} else if (!best_dropped.valid) {
best_dropped.valid = true;
best_dropped.score = hit.second;
@@ -63,16 +69,21 @@ MatchLoopCommunicator::SelectBest::mingle(Q &queue, F &&accept)
queue.pop_front();
}
}
+ if (picked > 0) {
+ best_scores.low = last_score;
+ }
}
void
-MatchLoopCommunicator::SelectBest::mingle()
+MatchLoopCommunicator::GetSecondPhaseWork::mingle()
{
- size_t est_out = (topN / size()) + 16;
+ best_scores = Range();
+ best_dropped.valid = false;
+ size_t est_out = (topN / size()) + 1;
vespalib::PriorityQueue<uint32_t, SelectCmp> queue(SelectCmp(*this));
for (size_t i = 0; i < size(); ++i) {
+ out(i).reserve(est_out);
if (in(i).valid()) {
- out(i).reserve(est_out);
queue.push(i);
}
}
@@ -84,28 +95,26 @@ MatchLoopCommunicator::SelectBest::mingle()
}
void
-MatchLoopCommunicator::RangeCover::mingle()
+MatchLoopCommunicator::CompleteSecondPhase::mingle()
{
- size_t i = 0;
- while (i < size() && (!in(i).first.isValid() || !in(i).second.isValid())) {
- ++i;
+ RangePair score_ranges(best_scores, Range());
+ Range &new_scores = score_ranges.second;
+ size_t est_out = (topN / size()) + 16;
+ for (size_t i = 0; i < size(); ++i) {
+ out(i).first.reserve(est_out);
}
- if (i < size()) {
- RangePair result = in(i++);
- for (; i < size(); ++i) {
- if (in(i).first.isValid() && in(i).second.isValid()) {
- result.first.low = std::min(result.first.low, in(i).first.low);
- result.first.high = std::max(result.first.high, in(i).first.high);
- result.second.low = std::min(result.second.low, in(i).second.low);
- result.second.high = std::max(result.second.high, in(i).second.high);
- }
+ for (size_t i = 0; i < size(); ++i) {
+ for (const auto &[hit, tag]: in(i)) {
+ out(tag).first.push_back(hit);
+ new_scores.update(hit.second);
}
+ }
+ if (score_ranges.first.isValid() && score_ranges.second.isValid()) {
if (best_dropped.valid) {
- result.first.low = std::max(result.first.low, best_dropped.score);
- result.first.high = std::max(result.first.low, result.first.high);
+ score_ranges.first.low = std::max(score_ranges.first.low, best_dropped.score);
}
- for (size_t j = 0; j < size(); ++j) {
- out(j) = result;
+ for (size_t i = 0; i < size(); ++i) {
+ out(i).second = score_ranges;
}
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/matching/match_loop_communicator.h b/searchcore/src/vespa/searchcore/proton/matching/match_loop_communicator.h
index 425197fac3b..42dd82587fa 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/match_loop_communicator.h
+++ b/searchcore/src/vespa/searchcore/proton/matching/match_loop_communicator.h
@@ -20,12 +20,13 @@ private:
EstimateMatchFrequency(size_t n) : vespalib::Rendezvous<Matches, double>(n) {}
void mingle() override;
};
- struct SelectBest : vespalib::Rendezvous<SortedHitSequence, Hits> {
+ struct GetSecondPhaseWork : vespalib::Rendezvous<SortedHitSequence, TaggedHits, true> {
size_t topN;
+ Range &best_scores;
BestDropped &best_dropped;
std::unique_ptr<IDiversifier> _diversifier;
- SelectBest(size_t n, size_t topN_in, BestDropped &best_dropped_in, std::unique_ptr<IDiversifier>);
- ~SelectBest() override;
+ GetSecondPhaseWork(size_t n, size_t topN_in, Range &best_scores_in, BestDropped &best_dropped_in, std::unique_ptr<IDiversifier>);
+ ~GetSecondPhaseWork() override;
void mingle() override;
template<typename Q, typename F>
void mingle(Q &queue, F &&accept);
@@ -34,23 +35,27 @@ private:
}
};
struct SelectCmp {
- SelectBest &sb;
- SelectCmp(SelectBest &sb_in) : sb(sb_in) {}
+ GetSecondPhaseWork &sb;
+ SelectCmp(GetSecondPhaseWork &sb_in) : sb(sb_in) {}
bool operator()(uint32_t a, uint32_t b) const {
return (sb.cmp(a, b));
}
};
- struct RangeCover : vespalib::Rendezvous<RangePair, RangePair> {
- BestDropped &best_dropped;
- RangeCover(size_t n, BestDropped &best_dropped_in)
- : vespalib::Rendezvous<RangePair, RangePair>(n), best_dropped(best_dropped_in) {}
+ struct CompleteSecondPhase : vespalib::Rendezvous<TaggedHits, std::pair<Hits,RangePair>, true> {
+ size_t topN;
+ const Range &best_scores;
+ const BestDropped &best_dropped;
+ CompleteSecondPhase(size_t n, size_t topN_in, const Range &best_scores_in, const BestDropped &best_dropped_in)
+ : vespalib::Rendezvous<TaggedHits, std::pair<Hits,RangePair>, true>(n),
+ topN(topN_in), best_scores(best_scores_in), best_dropped(best_dropped_in) {}
void mingle() override;
};
- BestDropped _best_dropped;
- EstimateMatchFrequency _estimate_match_frequency;
- SelectBest _selectBest;
- RangeCover _rangeCover;
+ Range _best_scores;
+ BestDropped _best_dropped;
+ EstimateMatchFrequency _estimate_match_frequency;
+ GetSecondPhaseWork _get_second_phase_work;
+ CompleteSecondPhase _complete_second_phase;
public:
MatchLoopCommunicator(size_t threads, size_t topN);
@@ -60,11 +65,13 @@ public:
double estimate_match_frequency(const Matches &matches) override {
return _estimate_match_frequency.rendezvous(matches);
}
- Hits selectBest(SortedHitSequence sortedHits) override {
- return _selectBest.rendezvous(sortedHits);
+
+ TaggedHits get_second_phase_work(SortedHitSequence sortedHits, size_t thread_id) override {
+ return _get_second_phase_work.rendezvous(sortedHits, thread_id);
}
- RangePair rangeCover(const RangePair &ranges) override {
- return _rangeCover.rendezvous(ranges);
+
+ std::pair<Hits,RangePair> complete_second_phase(TaggedHits my_results, size_t thread_id) override {
+ return _complete_second_phase.rendezvous(std::move(my_results), thread_id);
}
};
diff --git a/searchcore/src/vespa/searchcore/proton/matching/match_master.cpp b/searchcore/src/vespa/searchcore/proton/matching/match_master.cpp
index 3cbf88facd5..6c86f6c2d1c 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/match_master.cpp
+++ b/searchcore/src/vespa/searchcore/proton/matching/match_master.cpp
@@ -32,13 +32,13 @@ struct TimedMatchLoopCommunicator : IMatchLoopCommunicator {
double estimate_match_frequency(const Matches &matches) override {
return communicator.estimate_match_frequency(matches);
}
- Hits selectBest(SortedHitSequence sortedHits) override {
- auto result = communicator.selectBest(sortedHits);
+ TaggedHits get_second_phase_work(SortedHitSequence sortedHits, size_t thread_id) override {
+ auto result = communicator.get_second_phase_work(sortedHits, thread_id);
timer = vespalib::Timer();
return result;
}
- RangePair rangeCover(const RangePair &ranges) override {
- RangePair result = communicator.rangeCover(ranges);
+ std::pair<Hits,RangePair> complete_second_phase(TaggedHits my_results, size_t thread_id) override {
+ auto result = communicator.complete_second_phase(std::move(my_results), thread_id);
elapsed = timer.elapsed();
return result;
}
diff --git a/searchcore/src/vespa/searchcore/proton/matching/match_thread.cpp b/searchcore/src/vespa/searchcore/proton/matching/match_thread.cpp
index 0e80d31a063..5cb4394880f 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/match_thread.cpp
+++ b/searchcore/src/vespa/searchcore/proton/matching/match_thread.cpp
@@ -202,8 +202,8 @@ MatchThread::match_loop(MatchTools &tools, HitCollector &hits)
uint32_t matches = context.matches;
if (do_limit && context.isBelowLimit()) {
const size_t searchedSoFar = scheduler.total_size(thread_id);
- LOG(debug, "Limit not reached (had %d) at docid=%d which is after %zu docs.",
- matches, scheduler.total_span(thread_id).end, searchedSoFar);
+ LOG(debug, "Limit not reached (had %d) after %zu docs.",
+ matches, searchedSoFar);
estimate_match_frequency(matches, searchedSoFar);
tools.match_limiter().updateDocIdSpaceEstimate(searchedSoFar, 0);
}
@@ -293,35 +293,31 @@ MatchThread::findMatches(MatchTools &tools)
trace->addEvent(4, "Start match and first phase rank");
match_loop_helper(tools, hits);
if (tools.has_second_phase_rank()) {
- { // 2nd phase ranking
- trace->addEvent(4, "Start second phase rerank");
- tools.setup_second_phase();
- DocidRange docid_range = scheduler.total_span(thread_id);
- tools.search().initRange(docid_range.begin, docid_range.end);
- auto sorted_hit_seq = matchToolsFactory.should_diversify()
- ? hits.getSortedHitSequence(matchParams.arraySize)
- : hits.getSortedHitSequence(matchParams.heapSize);
- trace->addEvent(5, "Synchronize before second phase rerank");
- WaitTimer select_best_timer(wait_time_s);
- auto kept_hits = communicator.selectBest(sorted_hit_seq);
- select_best_timer.done();
- DocumentScorer scorer(tools.rank_program(), tools.search());
- if (tools.getDoom().hard_doom()) {
- kept_hits.clear();
- }
- uint32_t reRanked = hits.reRank(scorer, std::move(kept_hits));
- if (auto onReRankTask = matchToolsFactory.createOnReRankTask()) {
- onReRankTask->run(hits.getReRankedHits());
- }
- thread_stats.docsReRanked(reRanked);
+ trace->addEvent(4, "Start second phase rerank");
+ tools.setup_second_phase();
+ DocidRange docid_range(1, matchParams.numDocs);
+ tools.search().initRange(docid_range.begin, docid_range.end);
+ auto sorted_hit_seq = matchToolsFactory.should_diversify()
+ ? hits.getSortedHitSequence(matchParams.arraySize)
+ : hits.getSortedHitSequence(matchParams.heapSize);
+ trace->addEvent(5, "Synchronize before second phase rerank");
+ WaitTimer get_second_phase_work_timer(wait_time_s);
+ auto my_work = communicator.get_second_phase_work(sorted_hit_seq, thread_id);
+ get_second_phase_work_timer.done();
+ DocumentScorer scorer(tools.rank_program(), tools.search());
+ if (tools.getDoom().hard_doom()) {
+ my_work.clear();
}
- { // rank scaling
- trace->addEvent(5, "Synchronize before rank scaling");
- auto my_ranges = hits.getRanges();
- WaitTimer range_cover_timer(wait_time_s);
- auto ranges = communicator.rangeCover(my_ranges);
- range_cover_timer.done();
- hits.setRanges(ranges);
+ scorer.score(my_work);
+ thread_stats.docsReRanked(my_work.size());
+ trace->addEvent(5, "Synchronize before rank scaling");
+ WaitTimer complete_second_phase_timer(wait_time_s);
+ auto [kept_hits, ranges] = communicator.complete_second_phase(my_work, thread_id);
+ complete_second_phase_timer.done();
+ hits.setReRankedHits(std::move(kept_hits));
+ hits.setRanges(ranges);
+ if (auto onReRankTask = matchToolsFactory.createOnReRankTask()) {
+ onReRankTask->run(hits.getReRankedHits());
}
}
trace->addEvent(4, "Create result set");