aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
author Tor Egge <Tor.Egge@online.no> 2022-01-09 16:34:43 +0100
committer Tor Egge <Tor.Egge@online.no> 2022-01-09 16:48:03 +0100
commit 58c83e0fce3c23edf0565ced68db48f2edd2fba8 (patch)
tree 56ce7ff0ba5d07afee1456f95ac8cb9b3a0c9146 /searchlib
parent 70231237adb460ffa0cf3e289880aa3115432fba (diff)
Prepare for smaller tasks for field merging.
Diffstat (limited to 'searchlib')
-rw-r--r-- searchlib/src/tests/diskindex/fusion/.gitignore 5
-rw-r--r-- searchlib/src/tests/diskindex/fusion/fusion_test.cpp 28
-rw-r--r-- searchlib/src/vespa/searchlib/diskindex/field_merger.cpp 39
-rw-r--r-- searchlib/src/vespa/searchlib/diskindex/field_merger.h 5
-rw-r--r-- searchlib/src/vespa/searchlib/diskindex/fusion.h 1
-rw-r--r-- searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp 1
-rw-r--r-- searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h 3
-rw-r--r-- searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp 7
-rw-r--r-- searchlib/src/vespa/searchlib/util/posting_priority_queue.h 14
-rw-r--r-- searchlib/src/vespa/searchlib/util/posting_priority_queue.hpp 13
-rw-r--r-- searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.h 21
-rw-r--r-- searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.hpp 60
12 files changed, 123 insertions, 74 deletions
diff --git a/searchlib/src/tests/diskindex/fusion/.gitignore b/searchlib/src/tests/diskindex/fusion/.gitignore
index d9a33665c43..934c9efb8fa 100644
--- a/searchlib/src/tests/diskindex/fusion/.gitignore
+++ b/searchlib/src/tests/diskindex/fusion/.gitignore
@@ -21,10 +21,7 @@ mdump2
mdump3
mdump4
mdump5
-sdump2
-sdump3
-sdump4
-sdump5
+sdump[2-6]
/ddump6
/dmdump6
/dump6
diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
index ccc5e8cbade..72867edf474 100644
--- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
+++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
@@ -24,7 +24,7 @@
#include <vespa/vespalib/util/destructor_callbacks.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
-#include <gtest/gtest.h>
+#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/log/log.h>
LOG_SETUP("fusion_test");
@@ -65,7 +65,7 @@ protected:
Schema _schema;
const Schema & getSchema() const { return _schema; }
- void requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap);
+ void requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap, bool force_small_merge_chunk);
void make_simple_index(const vespalib::string &dump_dir, const IFieldLengthInspector &field_length_inspector);
bool try_merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources, std::shared_ptr<IFlushToken> flush_token);
void merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources);
@@ -292,7 +292,7 @@ VESPA_THREAD_STACK_TAG(invert_executor)
VESPA_THREAD_STACK_TAG(push_executor)
void
-FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap)
+FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap, bool force_small_merge_chunk)
{
Schema schema;
Schema schema2;
@@ -393,6 +393,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
sources.push_back(prefix + "dump2");
Fusion fusion(schema, prefix + "dump3", sources, selector,
tuneFileIndexing,fileHeaderContext);
+ fusion.set_force_small_merge_chunk(force_small_merge_chunk);
ASSERT_TRUE(fusion.merge(executor, std::make_shared<FlushToken>()));
} while (0);
do {
@@ -406,6 +407,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
sources.push_back(prefix + "dump3");
Fusion fusion(schema2, prefix + "dump4", sources, selector,
tuneFileIndexing, fileHeaderContext);
+ fusion.set_force_small_merge_chunk(force_small_merge_chunk);
ASSERT_TRUE(fusion.merge(executor, std::make_shared<FlushToken>()));
} while (0);
do {
@@ -419,6 +421,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
sources.push_back(prefix + "dump3");
Fusion fusion(schema3, prefix + "dump5", sources, selector,
tuneFileIndexing, fileHeaderContext);
+ fusion.set_force_small_merge_chunk(force_small_merge_chunk);
ASSERT_TRUE(fusion.merge(executor, std::make_shared<FlushToken>()));
} while (0);
do {
@@ -433,6 +436,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
Fusion fusion(schema, prefix + "dump6", sources, selector,
tuneFileIndexing, fileHeaderContext);
fusion.set_dynamic_k_pos_index_format(true);
+ fusion.set_force_small_merge_chunk(force_small_merge_chunk);
ASSERT_TRUE(fusion.merge(executor, std::make_shared<FlushToken>()));
} while (0);
do {
@@ -446,6 +450,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
sources.push_back(prefix + "dump2");
Fusion fusion(schema, prefix + "dump3", sources, selector,
tuneFileIndexing, fileHeaderContext);
+ fusion.set_force_small_merge_chunk(force_small_merge_chunk);
ASSERT_TRUE(fusion.merge(executor, std::make_shared<FlushToken>()));
} while (0);
do {
@@ -506,22 +511,27 @@ FusionTest::FusionTest()
TEST_F(FusionTest, require_that_normal_fusion_is_working)
{
- requireThatFusionIsWorking("", false, false);
+ requireThatFusionIsWorking("", false, false, false);
}
TEST_F(FusionTest, require_that_directio_fusion_is_working)
{
- requireThatFusionIsWorking("d", true, false);
+ requireThatFusionIsWorking("d", true, false, false);
}
TEST_F(FusionTest, require_that_mmap_fusion_is_working)
{
- requireThatFusionIsWorking("m", false, true);
+ requireThatFusionIsWorking("m", false, true, false);
}
TEST_F(FusionTest, require_that_directiommap_fusion_is_working)
{
- requireThatFusionIsWorking("dm", true, true);
+ requireThatFusionIsWorking("dm", true, true, false);
+}
+
+TEST_F(FusionTest, require_that_small_merge_chunk_fusion_is_working)
+{
+ requireThatFusionIsWorking("s", false, false, true);
}
namespace {
@@ -608,11 +618,11 @@ TEST_F(FusionTest, require_that_fusion_can_be_stopped)
vespalib::rmdir("stopdump3", true);
flush_token = std::make_shared<MyFlushToken>(1);
ASSERT_FALSE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token));
- EXPECT_EQ(12, flush_token->get_checks());
+ EXPECT_EQ(8, flush_token->get_checks());
vespalib::rmdir("stopdump3", true);
flush_token = std::make_shared<MyFlushToken>(47);
ASSERT_FALSE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token));
- EXPECT_LT(48, flush_token->get_checks());
+ EXPECT_LE(48, flush_token->get_checks());
clean_stopped_fusion_testdirs();
}
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp
index 6179f06c9da..d313957d528 100644
--- a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp
+++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp
@@ -39,6 +39,9 @@ namespace search::diskindex {
namespace {
+constexpr uint32_t renumber_word_ids_merge_chunk = 1000000;
+constexpr uint32_t merge_postings_merge_chunk = 50000;
+
vespalib::string
createTmpPath(const vespalib::string & base, uint32_t index) {
vespalib::asciistream os;
@@ -174,18 +177,20 @@ FieldMerger::renumber_word_ids_start()
return false;
}
_word_aggregator = std::make_unique<WordAggregator>();
+ _word_heap->setup(4);
+ _word_heap->set_merge_chunk(_fusion_out_index.get_force_small_merge_chunk() ? 1u : renumber_word_ids_merge_chunk);
return true;
}
-bool
+void
FieldMerger::renumber_word_ids_main()
{
- _word_heap->merge(*_word_aggregator, 4, *_flush_token);
+ _word_heap->merge(*_word_aggregator, *_flush_token);
if (_flush_token->stop_requested()) {
- return false;
+ _failed = true;
+ } else if (_word_heap->empty()) {
+ _state = State::RENUMBER_WORD_IDS_FINISH;
}
- assert(_word_heap->empty());
- return true;
}
bool
@@ -356,6 +361,8 @@ FieldMerger::setup_merge_heap()
_heap->initialAdd(reader.get());
}
}
+ _heap->setup(4);
+ _heap->set_merge_chunk(_fusion_out_index.get_force_small_merge_chunk() ? 1u : merge_postings_merge_chunk);
return true;
}
@@ -374,15 +381,15 @@ FieldMerger::merge_postings_start()
return setup_merge_heap();
}
-bool
+void
FieldMerger::merge_postings_main()
{
- _heap->merge(*_writer, 4, *_flush_token);
+ _heap->merge(*_writer, *_flush_token);
if (_flush_token->stop_requested()) {
- return false;
+ _failed = true;
+ } else if (_heap->empty()) {
+ _state = State::MERGE_POSTINGS_FINISH;
}
- assert(_heap->empty());
- return true;
}
bool
@@ -475,11 +482,7 @@ FieldMerger::process_merge_field()
merge_field_start();
break;
case State::RENUMBER_WORD_IDS:
- if (!renumber_word_ids_main()) {
- renumber_word_ids_failed();
- } else {
- _state = State::RENUMBER_WORD_IDS_FINISH;
- }
+ renumber_word_ids_main();
break;
case State::RENUMBER_WORD_IDS_FINISH:
if (!renumber_word_ids_finish()) {
@@ -491,11 +494,7 @@ FieldMerger::process_merge_field()
}
break;
case State::MERGE_POSTINGS:
- if (!merge_postings_main()) {
- merge_postings_failed();
- } else {
- _state = State::MERGE_POSTINGS_FINISH;
- }
+ merge_postings_main();
break;
case State::MERGE_POSTINGS_FINISH:
merge_field_finish();
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.h b/searchlib/src/vespa/searchlib/diskindex/field_merger.h
index c5ce337e845..5017a7d5192 100644
--- a/searchlib/src/vespa/searchlib/diskindex/field_merger.h
+++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.h
@@ -53,13 +53,14 @@ class FieldMerger
std::unique_ptr<FieldWriter> _writer;
State _state;
bool _failed;
+ bool _force_small_merge_chunk;
void make_tmp_dirs();
bool clean_tmp_dirs();
bool open_input_word_readers();
bool read_mapping_files();
bool renumber_word_ids_start();
- bool renumber_word_ids_main();
+ void renumber_word_ids_main();
bool renumber_word_ids_finish();
void renumber_word_ids_failed();
std::shared_ptr<FieldLengthScanner> allocate_field_length_scanner();
@@ -68,7 +69,7 @@ class FieldMerger
bool select_cooked_or_raw_features(FieldReader& reader);
bool setup_merge_heap();
bool merge_postings_start();
- bool merge_postings_main();
+ void merge_postings_main();
bool merge_postings_finish();
void merge_postings_failed();
public:
diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.h b/searchlib/src/vespa/searchlib/diskindex/fusion.h
index e905006bf14..1f5c4471950 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fusion.h
+++ b/searchlib/src/vespa/searchlib/diskindex/fusion.h
@@ -42,6 +42,7 @@ public:
~Fusion();
void set_dynamic_k_pos_index_format(bool dynamic_k_pos_index_format) { _fusion_out_index.set_dynamic_k_pos_index_format(dynamic_k_pos_index_format); }
+ void set_force_small_merge_chunk(bool force_small_merge_chunk) { _fusion_out_index.set_force_small_merge_chunk(force_small_merge_chunk); }
bool merge(vespalib::ThreadExecutor& executor, std::shared_ptr<IFlushToken> flush_token);
};
diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp
index 7c63eb0f940..3c75aa16b93 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp
+++ b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp
@@ -11,6 +11,7 @@ FusionOutputIndex::FusionOutputIndex(const index::Schema& schema, const vespalib
_old_indexes(std::move(old_indexes)),
_doc_id_limit(doc_id_limit),
_dynamic_k_pos_index_format(false),
+ _force_small_merge_chunk(false),
_tune_file_indexing(tune_file_indexing),
_file_header_context(file_header_context)
{
diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h
index 59cda4b33de..729ecd26524 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h
+++ b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h
@@ -25,6 +25,7 @@ private:
const std::vector<FusionInputIndex>& _old_indexes;
const uint32_t _doc_id_limit;
bool _dynamic_k_pos_index_format;
+ bool _force_small_merge_chunk;
const TuneFileIndexing& _tune_file_indexing;
const common::FileHeaderContext& _file_header_context;
public:
@@ -32,11 +33,13 @@ public:
~FusionOutputIndex();
void set_dynamic_k_pos_index_format(bool dynamic_k_pos_index_format) { _dynamic_k_pos_index_format = dynamic_k_pos_index_format; }
+ void set_force_small_merge_chunk(bool force_small_merge_chunk) { _force_small_merge_chunk = force_small_merge_chunk; }
const index::Schema& get_schema() const noexcept { return _schema; }
const vespalib::string& get_path() const noexcept { return _path; }
const std::vector<FusionInputIndex>& get_old_indexes() const noexcept { return _old_indexes; }
uint32_t get_doc_id_limit() const noexcept { return _doc_id_limit; }
bool get_dynamic_k_pos_index_format() const noexcept { return _dynamic_k_pos_index_format; }
+ bool get_force_small_merge_chunk() const noexcept { return _force_small_merge_chunk; }
const TuneFileIndexing& get_tune_file_indexing() const noexcept { return _tune_file_indexing; }
const common::FileHeaderContext& get_file_header_context() const noexcept { return _file_header_context; }
};
diff --git a/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp b/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp
index 64d194c5c7e..666eed8f1e8 100644
--- a/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp
+++ b/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp
@@ -363,8 +363,11 @@ FakeMemTreeOccFactory::setup(const std::vector<const FakeWord *> &fws)
}
++i;
}
- heap.merge(_mgr, 4, flush_token);
- assert(heap.empty());
+ heap.setup(4);
+ heap.set_merge_chunk(100000);
+ while (!heap.empty()) {
+ heap.merge(_mgr, flush_token);
+ }
_mgr.finalize();
}
diff --git a/searchlib/src/vespa/searchlib/util/posting_priority_queue.h b/searchlib/src/vespa/searchlib/util/posting_priority_queue.h
index 01ae0995806..c1549b32f93 100644
--- a/searchlib/src/vespa/searchlib/util/posting_priority_queue.h
+++ b/searchlib/src/vespa/searchlib/util/posting_priority_queue.h
@@ -12,7 +12,7 @@ namespace search {
template <class Reader>
class PostingPriorityQueue
{
-public:
+protected:
class Ref
{
Reader *_ref;
@@ -28,9 +28,14 @@ public:
using Vector = std::vector<Ref>;
Vector _vec;
+ uint32_t _heap_limit;
+ uint32_t _merge_chunk;
+public:
PostingPriorityQueue()
- : _vec()
+ : _vec(),
+ _heap_limit(0u),
+ _merge_chunk(0u)
{
}
@@ -40,9 +45,10 @@ public:
/*
* Sort vector after a set of initial add operations, so lowest()
- * and adjust() can be used.
+ * and adjust() can be used. Skip sort if _vec.size() < heap_limit
+ * since merging with few elements doesn't use the heap.
*/
- void sort() { std::sort(_vec.begin(), _vec.end()); }
+ void setup(uint32_t heap_limit);
/*
* Return lowest value. Assumes vector is sorted.
diff --git a/searchlib/src/vespa/searchlib/util/posting_priority_queue.hpp b/searchlib/src/vespa/searchlib/util/posting_priority_queue.hpp
index 69bf7bc547c..33f3bce2be6 100644
--- a/searchlib/src/vespa/searchlib/util/posting_priority_queue.hpp
+++ b/searchlib/src/vespa/searchlib/util/posting_priority_queue.hpp
@@ -32,4 +32,17 @@ PostingPriorityQueue<Reader>::adjust()
*to = changed; // Save changed value at right location
}
+template <class Reader>
+void
+PostingPriorityQueue<Reader>::setup(uint32_t heap_limit)
+{
+ _heap_limit = heap_limit;
+ for (auto ref : _vec) {
+ assert(ref.get()->isValid());
+ }
+ if (_vec.size() >= heap_limit) {
+ std::sort(_vec.begin(), _vec.end());
+ }
+}
+
}
diff --git a/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.h b/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.h
index 8dd941a2c13..9debcd06ea6 100644
--- a/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.h
+++ b/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.h
@@ -13,20 +13,29 @@ namespace search {
template <class Reader, class Writer>
class PostingPriorityQueueMerger : public PostingPriorityQueue<Reader>
{
+ uint32_t _merge_chunk;
public:
using Parent = PostingPriorityQueue<Reader>;
using Vector = typename Parent::Vector;
+ using Parent::_heap_limit;
using Parent::_vec;
using Parent::adjust;
using Parent::empty;
using Parent::lowest;
- using Parent::sort;
+ using Parent::setup;
- void mergeHeap(Writer& writer, const IFlushToken& flush_token) __attribute__((noinline));
- static void mergeOne(Writer& writer, Reader& reader, const IFlushToken &flush_token) __attribute__((noinline));
- static void mergeTwo(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token) __attribute__((noinline));
- static void mergeSmall(Writer& writer, typename Vector::iterator ib, typename Vector::iterator ie, const IFlushToken &flush_token) __attribute__((noinline));
- void merge(Writer& writer, uint32_t heapLimit, const IFlushToken& flush_token) __attribute__((noinline));
+ PostingPriorityQueueMerger()
+ : Parent(),
+ _merge_chunk(0u)
+ {
+ }
+
+ void set_merge_chunk(uint32_t merge_chunk) { _merge_chunk = merge_chunk; }
+ void mergeHeap(Writer& writer, const IFlushToken& flush_token, uint32_t remaining_merge_chunk) __attribute__((noinline));
+ static void mergeOne(Writer& writer, Reader& reader, const IFlushToken &flush_token, uint32_t remaining_merge_chunk) __attribute__((noinline));
+ static void mergeTwo(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token, uint32_t& remaining_merge_chunk) __attribute__((noinline));
+ static void mergeSmall(Writer& writer, typename Vector::iterator ib, typename Vector::iterator ie, const IFlushToken &flush_token, uint32_t& remaining_merge_chunk) __attribute__((noinline));
+ void merge(Writer& writer, const IFlushToken& flush_token) __attribute__((noinline));
};
}
diff --git a/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.hpp b/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.hpp
index d33356cee4a..5676f6326df 100644
--- a/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.hpp
+++ b/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.hpp
@@ -9,44 +9,48 @@ namespace search {
template <class Reader, class Writer>
void
-PostingPriorityQueueMerger<Reader, Writer>::mergeHeap(Writer& writer, const IFlushToken& flush_token)
+PostingPriorityQueueMerger<Reader, Writer>::mergeHeap(Writer& writer, const IFlushToken& flush_token, uint32_t remaining_merge_chunk)
{
- while (!empty() && !flush_token.stop_requested()) {
+ while (remaining_merge_chunk > 0u && !empty() && !flush_token.stop_requested()) {
Reader *low = lowest();
low->write(writer);
low->read();
adjust();
+ --remaining_merge_chunk;
}
}
template <class Reader, class Writer>
void
-PostingPriorityQueueMerger<Reader, Writer>::mergeOne(Writer& writer, Reader& reader, const IFlushToken& flush_token)
+PostingPriorityQueueMerger<Reader, Writer>::mergeOne(Writer& writer, Reader& reader, const IFlushToken& flush_token, uint32_t remaining_merge_chunk)
{
- while (reader.isValid() && !flush_token.stop_requested()) {
+ while (remaining_merge_chunk > 0u && reader.isValid() && !flush_token.stop_requested()) {
reader.write(writer);
reader.read();
+ --remaining_merge_chunk;
}
}
template <class Reader, class Writer>
void
-PostingPriorityQueueMerger<Reader, Writer>::mergeTwo(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token)
+PostingPriorityQueueMerger<Reader, Writer>::mergeTwo(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token, uint32_t& remaining_merge_chunk)
{
- while (!flush_token.stop_requested()) {
+ while (remaining_merge_chunk > 0u && !flush_token.stop_requested()) {
Reader &low = reader2 < reader1 ? reader2 : reader1;
low.write(writer);
low.read();
- if (!low.isValid())
+ --remaining_merge_chunk;
+ if (!low.isValid()) {
break;
+ }
}
}
template <class Reader, class Writer>
void
-PostingPriorityQueueMerger<Reader, Writer>::mergeSmall(Writer& writer, typename Vector::iterator ib, typename Vector::iterator ie, const IFlushToken& flush_token)
+PostingPriorityQueueMerger<Reader, Writer>::mergeSmall(Writer& writer, typename Vector::iterator ib, typename Vector::iterator ie, const IFlushToken& flush_token, uint32_t& remaining_merge_chunk)
{
- while (!flush_token.stop_requested()) {
+ while (remaining_merge_chunk > 0u && !flush_token.stop_requested()) {
typename Vector::iterator i = ib;
Reader *low = i->get();
for (++i; i != ie; ++i)
@@ -54,47 +58,49 @@ PostingPriorityQueueMerger<Reader, Writer>::mergeSmall(Writer& writer, typename
low = i->get();
low->write(writer);
low->read();
- if (!low->isValid())
+ --remaining_merge_chunk;
+ if (!low->isValid()) {
break;
+ }
}
}
template <class Reader, class Writer>
void
-PostingPriorityQueueMerger<Reader, Writer>::merge(Writer& writer, uint32_t heapLimit, const IFlushToken& flush_token)
+PostingPriorityQueueMerger<Reader, Writer>::merge(Writer& writer, const IFlushToken& flush_token)
{
if (_vec.empty())
return;
- for (typename Vector::iterator i = _vec.begin(), ie = _vec.end(); i != ie;
- ++i) {
- assert(i->get()->isValid());
- }
- if (_vec.size() >= heapLimit) {
- sort();
- void (PostingPriorityQueueMerger::*mergeHeapFunc)(Writer& writer, const IFlushToken& flush_token) =
+ assert(_heap_limit > 0u);
+ uint32_t remaining_merge_chunk = _merge_chunk;
+ if (_vec.size() >= _heap_limit) {
+ void (PostingPriorityQueueMerger::*mergeHeapFunc)(Writer& writer, const IFlushToken& flush_token, uint32_t remaining_merge_chunk) =
&PostingPriorityQueueMerger::mergeHeap;
- (this->*mergeHeapFunc)(writer, flush_token);
+ (this->*mergeHeapFunc)(writer, flush_token, remaining_merge_chunk);
return;
}
- while (!flush_token.stop_requested()) {
+ while (remaining_merge_chunk > 0u && !flush_token.stop_requested()) {
if (_vec.size() == 1) {
- void (*mergeOneFunc)(Writer& writer, Reader& reader, const IFlushToken& flush_token) =
+ void (*mergeOneFunc)(Writer& writer, Reader& reader, const IFlushToken& flush_token, uint32_t remaining_merge_chunk) =
&PostingPriorityQueueMerger::mergeOne;
- (*mergeOneFunc)(writer, *_vec.front().get(), flush_token);
- _vec.clear();
+ (*mergeOneFunc)(writer, *_vec.front().get(), flush_token, remaining_merge_chunk);
+ if (!_vec.front().get()->isValid()) {
+ _vec.clear();
+ }
return;
}
if (_vec.size() == 2) {
- void (*mergeTwoFunc)(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token) =
+ void (*mergeTwoFunc)(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token, uint32_t& remaining_merge_chunk) =
&PostingPriorityQueueMerger::mergeTwo;
- (*mergeTwoFunc)(writer, *_vec[0].get(), *_vec[1].get(), flush_token);
+ (*mergeTwoFunc)(writer, *_vec[0].get(), *_vec[1].get(), flush_token, remaining_merge_chunk);
} else {
void (*mergeSmallFunc)(Writer& writer,
typename Vector::iterator ib,
typename Vector::iterator ie,
- const IFlushToken& flush_token) =
+ const IFlushToken& flush_token,
+ uint32_t& remaining_merge_chunk) =
&PostingPriorityQueueMerger::mergeSmall;
- (*mergeSmallFunc)(writer, _vec.begin(), _vec.end(), flush_token);
+ (*mergeSmallFunc)(writer, _vec.begin(), _vec.end(), flush_token, remaining_merge_chunk);
}
for (typename Vector::iterator i = _vec.begin(), ie = _vec.end();
i != ie; ++i) {