diff options
author | Tor Egge <Tor.Egge@online.no> | 2022-01-09 16:34:43 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2022-01-09 16:48:03 +0100 |
commit | 58c83e0fce3c23edf0565ced68db48f2edd2fba8 (patch) | |
tree | 56ce7ff0ba5d07afee1456f95ac8cb9b3a0c9146 /searchlib | |
parent | 70231237adb460ffa0cf3e289880aa3115432fba (diff) |
Prepare for smaller tasks for field merging.
Diffstat (limited to 'searchlib')
12 files changed, 123 insertions, 74 deletions
diff --git a/searchlib/src/tests/diskindex/fusion/.gitignore b/searchlib/src/tests/diskindex/fusion/.gitignore index d9a33665c43..934c9efb8fa 100644 --- a/searchlib/src/tests/diskindex/fusion/.gitignore +++ b/searchlib/src/tests/diskindex/fusion/.gitignore @@ -21,10 +21,7 @@ mdump2 mdump3 mdump4 mdump5 -sdump2 -sdump3 -sdump4 -sdump5 +sdump[2-6] /ddump6 /dmdump6 /dump6 diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp index ccc5e8cbade..72867edf474 100644 --- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp +++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp @@ -24,7 +24,7 @@ #include <vespa/vespalib/util/destructor_callbacks.h> #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> -#include <gtest/gtest.h> +#include <vespa/vespalib/gtest/gtest.h> #include <vespa/log/log.h> LOG_SETUP("fusion_test"); @@ -65,7 +65,7 @@ protected: Schema _schema; const Schema & getSchema() const { return _schema; } - void requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap); + void requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap, bool force_short_merge_chunk); void make_simple_index(const vespalib::string &dump_dir, const IFieldLengthInspector &field_length_inspector); bool try_merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources, std::shared_ptr<IFlushToken> flush_token); void merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources); @@ -292,7 +292,7 @@ VESPA_THREAD_STACK_TAG(invert_executor) VESPA_THREAD_STACK_TAG(push_executor) void -FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap) +FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap, bool force_small_merge_chunk) { Schema schema; Schema schema2; @@ -393,6 +393,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire sources.push_back(prefix + "dump2"); Fusion fusion(schema, prefix + "dump3", sources, selector, tuneFileIndexing,fileHeaderContext); + fusion.set_force_small_merge_chunk(force_small_merge_chunk); ASSERT_TRUE(fusion.merge(executor, std::make_shared<FlushToken>())); } while (0); do { @@ -406,6 +407,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire sources.push_back(prefix + "dump3"); Fusion fusion(schema2, prefix + "dump4", sources, selector, tuneFileIndexing, fileHeaderContext); + fusion.set_force_small_merge_chunk(force_small_merge_chunk); ASSERT_TRUE(fusion.merge(executor, std::make_shared<FlushToken>())); } while (0); do { @@ -419,6 +421,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire sources.push_back(prefix + "dump3"); Fusion fusion(schema3, prefix + "dump5", sources, selector, tuneFileIndexing, fileHeaderContext); + fusion.set_force_small_merge_chunk(force_small_merge_chunk); ASSERT_TRUE(fusion.merge(executor, std::make_shared<FlushToken>())); } while (0); do { @@ -433,6 +436,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire Fusion fusion(schema, prefix + "dump6", sources, selector, tuneFileIndexing, fileHeaderContext); fusion.set_dynamic_k_pos_index_format(true); + fusion.set_force_small_merge_chunk(force_small_merge_chunk); ASSERT_TRUE(fusion.merge(executor, std::make_shared<FlushToken>())); } while (0); do { @@ -446,6 +450,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire sources.push_back(prefix + "dump2"); Fusion fusion(schema, prefix + "dump3", sources, selector, tuneFileIndexing, fileHeaderContext); + fusion.set_force_small_merge_chunk(force_small_merge_chunk); ASSERT_TRUE(fusion.merge(executor, std::make_shared<FlushToken>())); } while (0); do { @@ -506,22 +511,27 @@ FusionTest::FusionTest() TEST_F(FusionTest, require_that_normal_fusion_is_working) { - requireThatFusionIsWorking("", false, false); + requireThatFusionIsWorking("", false, false, false); } TEST_F(FusionTest, require_that_directio_fusion_is_working) { - requireThatFusionIsWorking("d", true, false); + requireThatFusionIsWorking("d", true, false, false); } TEST_F(FusionTest, require_that_mmap_fusion_is_working) { - requireThatFusionIsWorking("m", false, true); + requireThatFusionIsWorking("m", false, true, false); } TEST_F(FusionTest, require_that_directiommap_fusion_is_working) { - requireThatFusionIsWorking("dm", true, true); + requireThatFusionIsWorking("dm", true, true, false); +} + +TEST_F(FusionTest, require_that_small_merge_chunk_fusion_is_working) +{ + requireThatFusionIsWorking("s", false, false, true); } namespace { @@ -608,11 +618,11 @@ TEST_F(FusionTest, require_that_fusion_can_be_stopped) vespalib::rmdir("stopdump3", true); flush_token = std::make_shared<MyFlushToken>(1); ASSERT_FALSE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token)); - EXPECT_EQ(12, flush_token->get_checks()); + EXPECT_EQ(8, flush_token->get_checks()); vespalib::rmdir("stopdump3", true); flush_token = std::make_shared<MyFlushToken>(47); ASSERT_FALSE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token)); - EXPECT_LT(48, flush_token->get_checks()); + EXPECT_LE(48, flush_token->get_checks()); clean_stopped_fusion_testdirs(); } diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp index 6179f06c9da..d313957d528 100644 --- a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp @@ -39,6 +39,9 @@ namespace search::diskindex { namespace { +constexpr uint32_t renumber_word_ids_merge_chunk = 1000000; +constexpr uint32_t merge_postings_merge_chunk = 50000; + vespalib::string createTmpPath(const vespalib::string & base, uint32_t index) { vespalib::asciistream os; @@ -174,18 +177,20 @@ FieldMerger::renumber_word_ids_start() return false; } _word_aggregator = std::make_unique<WordAggregator>(); + _word_heap->setup(4); + _word_heap->set_merge_chunk(_fusion_out_index.get_force_small_merge_chunk() ? 1u : renumber_word_ids_merge_chunk); return true; } -bool +void FieldMerger::renumber_word_ids_main() { - _word_heap->merge(*_word_aggregator, 4, *_flush_token); + _word_heap->merge(*_word_aggregator, *_flush_token); if (_flush_token->stop_requested()) { - return false; + _failed = true; + } else if (_word_heap->empty()) { + _state = State::RENUMBER_WORD_IDS_FINISH; } - assert(_word_heap->empty()); - return true; } bool @@ -356,6 +361,8 @@ FieldMerger::setup_merge_heap() _heap->initialAdd(reader.get()); } } + _heap->setup(4); + _heap->set_merge_chunk(_fusion_out_index.get_force_small_merge_chunk() ? 1u : merge_postings_merge_chunk); return true; } @@ -374,15 +381,15 @@ FieldMerger::merge_postings_start() return setup_merge_heap(); } -bool +void FieldMerger::merge_postings_main() { - _heap->merge(*_writer, 4, *_flush_token); + _heap->merge(*_writer, *_flush_token); if (_flush_token->stop_requested()) { - return false; + _failed = true; + } else if (_heap->empty()) { + _state = State::MERGE_POSTINGS_FINISH; } - assert(_heap->empty()); - return true; } bool @@ -475,11 +482,7 @@ FieldMerger::process_merge_field() merge_field_start(); break; case State::RENUMBER_WORD_IDS: - if (!renumber_word_ids_main()) { - renumber_word_ids_failed(); - } else { - _state = State::RENUMBER_WORD_IDS_FINISH; - } + renumber_word_ids_main(); break; case State::RENUMBER_WORD_IDS_FINISH: if (!renumber_word_ids_finish()) { @@ -491,11 +494,7 @@ FieldMerger::process_merge_field() } break; case State::MERGE_POSTINGS: - if (!merge_postings_main()) { - merge_postings_failed(); - } else { - _state = State::MERGE_POSTINGS_FINISH; - } + merge_postings_main(); break; case State::MERGE_POSTINGS_FINISH: merge_field_finish(); diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.h b/searchlib/src/vespa/searchlib/diskindex/field_merger.h index c5ce337e845..5017a7d5192 100644 --- a/searchlib/src/vespa/searchlib/diskindex/field_merger.h +++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.h @@ -53,13 +53,14 @@ class FieldMerger std::unique_ptr<FieldWriter> _writer; State _state; bool _failed; + bool _force_small_merge_chunk; void make_tmp_dirs(); bool clean_tmp_dirs(); bool open_input_word_readers(); bool read_mapping_files(); bool renumber_word_ids_start(); - bool renumber_word_ids_main(); + void renumber_word_ids_main(); bool renumber_word_ids_finish(); void renumber_word_ids_failed(); std::shared_ptr<FieldLengthScanner> allocate_field_length_scanner(); @@ -68,7 +69,7 @@ class FieldMerger bool select_cooked_or_raw_features(FieldReader& reader); bool setup_merge_heap(); bool merge_postings_start(); - bool merge_postings_main(); + void merge_postings_main(); bool merge_postings_finish(); void merge_postings_failed(); public: diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.h b/searchlib/src/vespa/searchlib/diskindex/fusion.h index e905006bf14..1f5c4471950 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion.h +++ b/searchlib/src/vespa/searchlib/diskindex/fusion.h @@ -42,6 +42,7 @@ public: ~Fusion(); void set_dynamic_k_pos_index_format(bool dynamic_k_pos_index_format) { _fusion_out_index.set_dynamic_k_pos_index_format(dynamic_k_pos_index_format); } + void set_force_small_merge_chunk(bool force_small_merge_chunk) { _fusion_out_index.set_force_small_merge_chunk(force_small_merge_chunk); } bool merge(vespalib::ThreadExecutor& executor, std::shared_ptr<IFlushToken> flush_token); }; diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp index 7c63eb0f940..3c75aa16b93 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp @@ -11,6 +11,7 @@ FusionOutputIndex::FusionOutputIndex(const index::Schema& schema, const vespalib _old_indexes(std::move(old_indexes)), _doc_id_limit(doc_id_limit), _dynamic_k_pos_index_format(false), + _force_small_merge_chunk(false), _tune_file_indexing(tune_file_indexing), _file_header_context(file_header_context) { diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h index 59cda4b33de..729ecd26524 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h +++ b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h @@ -25,6 +25,7 @@ private: const std::vector<FusionInputIndex>& _old_indexes; const uint32_t _doc_id_limit; bool _dynamic_k_pos_index_format; + bool _force_small_merge_chunk; const TuneFileIndexing& _tune_file_indexing; const common::FileHeaderContext& _file_header_context; public: @@ -32,11 +33,13 @@ public: ~FusionOutputIndex(); void set_dynamic_k_pos_index_format(bool dynamic_k_pos_index_format) { _dynamic_k_pos_index_format = dynamic_k_pos_index_format; } + void set_force_small_merge_chunk(bool force_small_merge_chunk) { _force_small_merge_chunk = force_small_merge_chunk; } const index::Schema& get_schema() const noexcept { return _schema; } const vespalib::string& get_path() const noexcept { return _path; } const std::vector<FusionInputIndex>& get_old_indexes() const noexcept { return _old_indexes; } uint32_t get_doc_id_limit() const noexcept { return _doc_id_limit; } bool get_dynamic_k_pos_index_format() const noexcept { return _dynamic_k_pos_index_format; } + bool get_force_small_merge_chunk() const noexcept { return _force_small_merge_chunk; } const TuneFileIndexing& get_tune_file_indexing() const noexcept { return _tune_file_indexing; } const common::FileHeaderContext& get_file_header_context() const noexcept { return _file_header_context; } }; diff --git a/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp b/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp index 64d194c5c7e..666eed8f1e8 100644 --- a/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp +++ b/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp @@ -363,8 +363,11 @@ FakeMemTreeOccFactory::setup(const std::vector<const FakeWord *> &fws) } ++i; } - heap.merge(_mgr, 4, flush_token); - assert(heap.empty()); + heap.setup(4); + heap.set_merge_chunk(100000); + while (!heap.empty()) { + heap.merge(_mgr, flush_token); + } _mgr.finalize(); } diff --git a/searchlib/src/vespa/searchlib/util/posting_priority_queue.h b/searchlib/src/vespa/searchlib/util/posting_priority_queue.h index 01ae0995806..c1549b32f93 100644 --- a/searchlib/src/vespa/searchlib/util/posting_priority_queue.h +++ b/searchlib/src/vespa/searchlib/util/posting_priority_queue.h @@ -12,7 +12,7 @@ namespace search { template <class Reader> class PostingPriorityQueue { -public: +protected: class Ref { Reader *_ref; @@ -28,9 +28,14 @@ public: using Vector = std::vector<Ref>; Vector _vec; + uint32_t _heap_limit; + uint32_t _merge_chunk; +public: PostingPriorityQueue() - : _vec() + : _vec(), + _heap_limit(0u), + _merge_chunk(0u) { } @@ -40,9 +45,10 @@ public: /* * Sort vector after a set of initial add operations, so lowest() - * and adjust() can be used. + * and adjust() can be used. Skip sort if _vec.size() < heap_limit + * since merging with few elements don't use heap. */ - void sort() { std::sort(_vec.begin(), _vec.end()); } + void setup(uint32_t heap_limit); /* * Return lowest value. Assumes vector is sorted. diff --git a/searchlib/src/vespa/searchlib/util/posting_priority_queue.hpp b/searchlib/src/vespa/searchlib/util/posting_priority_queue.hpp index 69bf7bc547c..33f3bce2be6 100644 --- a/searchlib/src/vespa/searchlib/util/posting_priority_queue.hpp +++ b/searchlib/src/vespa/searchlib/util/posting_priority_queue.hpp @@ -32,4 +32,17 @@ PostingPriorityQueue<Reader>::adjust() *to = changed; // Save changed value at right location } +template <class Reader> +void +PostingPriorityQueue<Reader>::setup(uint32_t heap_limit) +{ + _heap_limit = heap_limit; + for (auto ref : _vec) { + assert(ref.get()->isValid()); + } + if (_vec.size() >= heap_limit) { + std::sort(_vec.begin(), _vec.end()); + } +} + } diff --git a/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.h b/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.h index 8dd941a2c13..9debcd06ea6 100644 --- a/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.h +++ b/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.h @@ -13,20 +13,29 @@ namespace search { template <class Reader, class Writer> class PostingPriorityQueueMerger : public PostingPriorityQueue<Reader> { + uint32_t _merge_chunk; public: using Parent = PostingPriorityQueue<Reader>; using Vector = typename Parent::Vector; + using Parent::_heap_limit; using Parent::_vec; using Parent::adjust; using Parent::empty; using Parent::lowest; - using Parent::sort; + using Parent::setup; - void mergeHeap(Writer& writer, const IFlushToken& flush_token) __attribute__((noinline)); - static void mergeOne(Writer& writer, Reader& reader, const IFlushToken &flush_token) __attribute__((noinline)); - static void mergeTwo(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token) __attribute__((noinline)); - static void mergeSmall(Writer& writer, typename Vector::iterator ib, typename Vector::iterator ie, const IFlushToken &flush_token) __attribute__((noinline)); - void merge(Writer& writer, uint32_t heapLimit, const IFlushToken& flush_token) __attribute__((noinline)); + PostingPriorityQueueMerger() + : Parent(), + _merge_chunk(0u) + { + } + + void set_merge_chunk(uint32_t merge_chunk) { _merge_chunk = merge_chunk; } + void mergeHeap(Writer& writer, const IFlushToken& flush_token, uint32_t remaining_merge_chunk) __attribute__((noinline)); + static void mergeOne(Writer& writer, Reader& reader, const IFlushToken &flush_token, uint32_t remaining_merge_chunk) __attribute__((noinline)); + static void mergeTwo(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token, uint32_t& remaining_merge_chunk) __attribute__((noinline)); + static void mergeSmall(Writer& writer, typename Vector::iterator ib, typename Vector::iterator ie, const IFlushToken &flush_token, uint32_t& remaining_merge_chunk) __attribute__((noinline)); + void merge(Writer& writer, const IFlushToken& flush_token) __attribute__((noinline)); }; } diff --git a/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.hpp b/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.hpp index d33356cee4a..5676f6326df 100644 --- a/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.hpp +++ b/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.hpp @@ -9,44 +9,48 @@ namespace search { template <class Reader, class Writer> void -PostingPriorityQueueMerger<Reader, Writer>::mergeHeap(Writer& writer, const IFlushToken& flush_token) +PostingPriorityQueueMerger<Reader, Writer>::mergeHeap(Writer& writer, const IFlushToken& flush_token, uint32_t remaining_merge_chunk) { - while (!empty() && !flush_token.stop_requested()) { + while (remaining_merge_chunk > 0u && !empty() && !flush_token.stop_requested()) { Reader *low = lowest(); low->write(writer); low->read(); adjust(); + --remaining_merge_chunk; } } template <class Reader, class Writer> void -PostingPriorityQueueMerger<Reader, Writer>::mergeOne(Writer& writer, Reader& reader, const IFlushToken& flush_token) +PostingPriorityQueueMerger<Reader, Writer>::mergeOne(Writer& writer, Reader& reader, const IFlushToken& flush_token, uint32_t remaining_merge_chunk) { - while (reader.isValid() && !flush_token.stop_requested()) { + while (remaining_merge_chunk > 0u && reader.isValid() && !flush_token.stop_requested()) { reader.write(writer); reader.read(); + --remaining_merge_chunk; } } template <class Reader, class Writer> void -PostingPriorityQueueMerger<Reader, Writer>::mergeTwo(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token) +PostingPriorityQueueMerger<Reader, Writer>::mergeTwo(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token, uint32_t& remaining_merge_chunk) { - while (!flush_token.stop_requested()) { + while (remaining_merge_chunk > 0u && !flush_token.stop_requested()) { Reader &low = reader2 < reader1 ? reader2 : reader1; low.write(writer); low.read(); - if (!low.isValid()) + --remaining_merge_chunk; + if (!low.isValid()) { break; + } } } template <class Reader, class Writer> void -PostingPriorityQueueMerger<Reader, Writer>::mergeSmall(Writer& writer, typename Vector::iterator ib, typename Vector::iterator ie, const IFlushToken& flush_token) +PostingPriorityQueueMerger<Reader, Writer>::mergeSmall(Writer& writer, typename Vector::iterator ib, typename Vector::iterator ie, const IFlushToken& flush_token, uint32_t& remaining_merge_chunk) { - while (!flush_token.stop_requested()) { + while (remaining_merge_chunk > 0u && !flush_token.stop_requested()) { typename Vector::iterator i = ib; Reader *low = i->get(); for (++i; i != ie; ++i) @@ -54,47 +58,49 @@ PostingPriorityQueueMerger<Reader, Writer>::mergeSmall(Writer& writer, typename low = i->get(); low->write(writer); low->read(); - if (!low->isValid()) + --remaining_merge_chunk; + if (!low->isValid()) { break; + } } } template <class Reader, class Writer> void -PostingPriorityQueueMerger<Reader, Writer>::merge(Writer& writer, uint32_t heapLimit, const IFlushToken& flush_token) +PostingPriorityQueueMerger<Reader, Writer>::merge(Writer& writer, const IFlushToken& flush_token) { if (_vec.empty()) return; - for (typename Vector::iterator i = _vec.begin(), ie = _vec.end(); i != ie; - ++i) { - assert(i->get()->isValid()); - } - if (_vec.size() >= heapLimit) { - sort(); - void (PostingPriorityQueueMerger::*mergeHeapFunc)(Writer& writer, const IFlushToken& flush_token) = + assert(_heap_limit > 0u); + uint32_t remaining_merge_chunk = _merge_chunk; + if (_vec.size() >= _heap_limit) { + void (PostingPriorityQueueMerger::*mergeHeapFunc)(Writer& writer, const IFlushToken& flush_token, uint32_t remaining_merge_chunk) = &PostingPriorityQueueMerger::mergeHeap; - (this->*mergeHeapFunc)(writer, flush_token); + (this->*mergeHeapFunc)(writer, flush_token, remaining_merge_chunk); return; } - while (!flush_token.stop_requested()) { + while (remaining_merge_chunk > 0u && !flush_token.stop_requested()) { if (_vec.size() == 1) { - void (*mergeOneFunc)(Writer& writer, Reader& reader, const IFlushToken& flush_token) = + void (*mergeOneFunc)(Writer& writer, Reader& reader, const IFlushToken& flush_token, uint32_t remaining_merge_chunk) = &PostingPriorityQueueMerger::mergeOne; - (*mergeOneFunc)(writer, *_vec.front().get(), flush_token); - _vec.clear(); + (*mergeOneFunc)(writer, *_vec.front().get(), flush_token, remaining_merge_chunk); + if (!_vec.front().get()->isValid()) { + _vec.clear(); + } return; } if (_vec.size() == 2) { - void (*mergeTwoFunc)(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token) = + void (*mergeTwoFunc)(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token, uint32_t& remaining_merge_chunk) = &PostingPriorityQueueMerger::mergeTwo; - (*mergeTwoFunc)(writer, *_vec[0].get(), *_vec[1].get(), flush_token); + (*mergeTwoFunc)(writer, *_vec[0].get(), *_vec[1].get(), flush_token, remaining_merge_chunk); } else { void (*mergeSmallFunc)(Writer& writer, typename Vector::iterator ib, typename Vector::iterator ie, - const IFlushToken& flush_token) = + const IFlushToken& flush_token, + uint32_t& remaining_merge_chunk) = &PostingPriorityQueueMerger::mergeSmall; - (*mergeSmallFunc)(writer, _vec.begin(), _vec.end(), flush_token); + (*mergeSmallFunc)(writer, _vec.begin(), _vec.end(), flush_token, remaining_merge_chunk); } for (typename Vector::iterator i = _vec.begin(), ie = _vec.end(); i != ie; ++i) { |