diff options
17 files changed, 247 insertions, 186 deletions
diff --git a/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp b/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp index 855b31310a3..aff368ceced 100644 --- a/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp +++ b/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp @@ -197,16 +197,16 @@ void Test::requireThatMemoryIndexCanBeDumpedAndSearched() { bool fret1 = DocumentSummary::readDocIdLimit(index_dir, fusionDocIdLimit); ASSERT_TRUE(fret1); SelectorArray selector(fusionDocIdLimit, 0); - bool fret2 = Fusion::merge(schema, - index_dir2, - fusionInputs, - selector, - false /* dynamicKPosOccFormat */, - tuneFileIndexing, - fileHeaderContext, - sharedExecutor, - std::make_shared<FlushToken>()); - ASSERT_TRUE(fret2); + { + Fusion fusion(schema, + index_dir2, + fusionInputs, + selector, + tuneFileIndexing, + fileHeaderContext); + bool fret2 = fusion.merge(sharedExecutor, std::make_shared<FlushToken>()); + ASSERT_TRUE(fret2); + } // Fusion test with all docs removed in output (doesn't affect word list) const string index_dir3 = "test_index3"; @@ -216,16 +216,16 @@ void Test::requireThatMemoryIndexCanBeDumpedAndSearched() { bool fret3 = DocumentSummary::readDocIdLimit(index_dir, fusionDocIdLimit); ASSERT_TRUE(fret3); SelectorArray selector2(fusionDocIdLimit, 1); - bool fret4 = Fusion::merge(schema, - index_dir3, - fusionInputs, - selector2, - false /* dynamicKPosOccFormat */, - tuneFileIndexing, - fileHeaderContext, - sharedExecutor, - std::make_shared<FlushToken>()); - ASSERT_TRUE(fret4); + { + Fusion fusion(schema, + index_dir3, + fusionInputs, + selector2, + tuneFileIndexing, + fileHeaderContext); + bool fret4 = fusion.merge(sharedExecutor, std::make_shared<FlushToken>()); + ASSERT_TRUE(fret4); + } // Fusion test with all docs removed in input (affects word list) const string index_dir4 = "test_index4"; @@ -235,16 +235,17 @@ void Test::requireThatMemoryIndexCanBeDumpedAndSearched() { bool fret5 = DocumentSummary::readDocIdLimit(index_dir3, fusionDocIdLimit); ASSERT_TRUE(fret5); SelectorArray selector3(fusionDocIdLimit, 0); - bool fret6 = Fusion::merge(schema, - index_dir4, - fusionInputs, - selector3, - false /* dynamicKPosOccFormat */, - tuneFileIndexing, - fileHeaderContext, - sharedExecutor, - std::make_shared<FlushToken>()); - ASSERT_TRUE(fret6); + { + Fusion fusion(schema, + index_dir4, + fusionInputs, + selector3, + tuneFileIndexing, + fileHeaderContext); + bool fret6 = fusion.merge(sharedExecutor, + std::make_shared<FlushToken>()); + ASSERT_TRUE(fret6); + } DiskIndex disk_index(index_dir); ASSERT_TRUE(disk_index.setup(TuneFileSearch())); diff --git a/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp b/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp index bfca455217c..251d0475537 100644 --- a/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp @@ -69,9 +69,9 @@ IndexManager::MaintainerOperations::runFusion(const Schema &schema, std::shared_ptr<IFlushToken> flush_token) { SerialNumFileHeaderContext fileHeaderContext(_fileHeaderContext, serialNum); - const bool dynamic_k_doc_pos_occ_format = false; - return Fusion::merge(schema, outputDir, sources, selectorArray, dynamic_k_doc_pos_occ_format, - _tuneFileIndexing, fileHeaderContext, _threadingService.shared(), std::move(flush_token)); + Fusion fusion(schema, outputDir, sources, selectorArray, + _tuneFileIndexing, fileHeaderContext); + return fusion.merge(_threadingService.shared(), std::move(flush_token)); } diff --git a/searchlib/src/tests/diskindex/fusion/.gitignore b/searchlib/src/tests/diskindex/fusion/.gitignore index d9a33665c43..934c9efb8fa 100644 --- a/searchlib/src/tests/diskindex/fusion/.gitignore +++ b/searchlib/src/tests/diskindex/fusion/.gitignore @@ -21,10 +21,7 @@ mdump2 mdump3 mdump4 mdump5 -sdump2 -sdump3 -sdump4 -sdump5 +sdump[2-6] /ddump6 /dmdump6 /dump6 diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp index 6794b9c0f5c..72867edf474 100644 --- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp +++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp @@ -24,7 +24,7 @@ #include <vespa/vespalib/util/destructor_callbacks.h> #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> -#include <gtest/gtest.h> +#include <vespa/vespalib/gtest/gtest.h> #include <vespa/log/log.h> LOG_SETUP("fusion_test"); @@ -65,7 +65,7 @@ protected: Schema _schema; const Schema & getSchema() const { return _schema; } - void requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap); + void requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap, bool force_short_merge_chunk); void make_simple_index(const vespalib::string &dump_dir, const IFieldLengthInspector &field_length_inspector); bool try_merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources, std::shared_ptr<IFlushToken> flush_token); void merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources); @@ -292,7 +292,7 @@ VESPA_THREAD_STACK_TAG(invert_executor) VESPA_THREAD_STACK_TAG(push_executor) void -FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap) +FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap, bool force_small_merge_chunk) { Schema schema; Schema schema2; @@ -357,7 +357,6 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire ib.setPrefix(dump2dir); uint32_t numDocs = 12 + 1; uint32_t numWords = fic.getNumUniqueWords(); - bool dynamicKPosOcc = false; MockFieldLengthInspector mock_field_length_inspector; TuneFileIndexing tuneFileIndexing; TuneFileSearch tuneFileSearch; @@ -392,9 +391,10 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire std::vector<vespalib::string> sources; SelectorArray selector(numDocs, 0); sources.push_back(prefix + "dump2"); - ASSERT_TRUE(Fusion::merge(schema, prefix + "dump3", sources, selector, - dynamicKPosOcc, - tuneFileIndexing,fileHeaderContext, executor, std::make_shared<FlushToken>())); + Fusion fusion(schema, prefix + "dump3", sources, selector, + tuneFileIndexing,fileHeaderContext); + fusion.set_force_small_merge_chunk(force_small_merge_chunk); + ASSERT_TRUE(fusion.merge(executor, std::make_shared<FlushToken>())); } while (0); do { DiskIndex dw3(prefix + "dump3"); @@ -405,9 +405,10 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire std::vector<vespalib::string> sources; SelectorArray selector(numDocs, 0); sources.push_back(prefix + "dump3"); - ASSERT_TRUE(Fusion::merge(schema2, prefix + "dump4", sources, selector, - dynamicKPosOcc, - tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>())); + Fusion fusion(schema2, prefix + "dump4", sources, selector, + tuneFileIndexing, fileHeaderContext); + fusion.set_force_small_merge_chunk(force_small_merge_chunk); + ASSERT_TRUE(fusion.merge(executor, std::make_shared<FlushToken>())); } while (0); do { DiskIndex dw4(prefix + "dump4"); @@ -418,9 +419,10 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire std::vector<vespalib::string> sources; SelectorArray selector(numDocs, 0); sources.push_back(prefix + "dump3"); - ASSERT_TRUE(Fusion::merge(schema3, prefix + "dump5", sources, selector, - dynamicKPosOcc, - tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>())); + Fusion fusion(schema3, prefix + "dump5", sources, selector, + tuneFileIndexing, fileHeaderContext); + fusion.set_force_small_merge_chunk(force_small_merge_chunk); + ASSERT_TRUE(fusion.merge(executor, std::make_shared<FlushToken>())); } while (0); do { DiskIndex dw5(prefix + "dump5"); @@ -431,9 +433,11 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire std::vector<vespalib::string> sources; SelectorArray selector(numDocs, 0); sources.push_back(prefix + "dump3"); - ASSERT_TRUE(Fusion::merge(schema, prefix + "dump6", sources, selector, - !dynamicKPosOcc, - tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>())); + Fusion fusion(schema, prefix + "dump6", sources, selector, + tuneFileIndexing, fileHeaderContext); + fusion.set_dynamic_k_pos_index_format(true); + fusion.set_force_small_merge_chunk(force_small_merge_chunk); + ASSERT_TRUE(fusion.merge(executor, std::make_shared<FlushToken>())); } while (0); do { DiskIndex dw6(prefix + "dump6"); @@ -444,9 +448,10 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire std::vector<vespalib::string> sources; SelectorArray selector(numDocs, 0); sources.push_back(prefix + "dump2"); - ASSERT_TRUE(Fusion::merge(schema, prefix + "dump3", sources, selector, - dynamicKPosOcc, - tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>())); + Fusion fusion(schema, prefix + "dump3", sources, selector, + tuneFileIndexing, fileHeaderContext); + fusion.set_force_small_merge_chunk(force_small_merge_chunk); + ASSERT_TRUE(fusion.merge(executor, std::make_shared<FlushToken>())); } while (0); do { DiskIndex dw3(prefix + "dump3"); @@ -487,9 +492,9 @@ FusionTest::try_merge_simple_indexes(const vespalib::string &dump_dir, const std TuneFileIndexing tuneFileIndexing; DummyFileHeaderContext fileHeaderContext; SelectorArray selector(20, 0); - return Fusion::merge(_schema, dump_dir, sources, selector, - false, - tuneFileIndexing, fileHeaderContext, executor, flush_token); + Fusion fusion(_schema, dump_dir, sources, selector, + tuneFileIndexing, fileHeaderContext); + return fusion.merge(executor, flush_token); } void @@ -506,22 +511,27 @@ FusionTest::FusionTest() TEST_F(FusionTest, require_that_normal_fusion_is_working) { - requireThatFusionIsWorking("", false, false); + requireThatFusionIsWorking("", false, false, false); } TEST_F(FusionTest, require_that_directio_fusion_is_working) { - requireThatFusionIsWorking("d", true, false); + requireThatFusionIsWorking("d", true, false, false); } TEST_F(FusionTest, require_that_mmap_fusion_is_working) { - requireThatFusionIsWorking("m", false, true); + requireThatFusionIsWorking("m", false, true, false); } TEST_F(FusionTest, require_that_directiommap_fusion_is_working) { - requireThatFusionIsWorking("dm", true, true); + requireThatFusionIsWorking("dm", true, true, false); +} + +TEST_F(FusionTest, require_that_small_merge_chunk_fusion_is_working) +{ + requireThatFusionIsWorking("s", false, false, true); } namespace { @@ -608,11 +618,11 @@ TEST_F(FusionTest, require_that_fusion_can_be_stopped) vespalib::rmdir("stopdump3", true); flush_token = std::make_shared<MyFlushToken>(1); ASSERT_FALSE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token)); - EXPECT_EQ(12, flush_token->get_checks()); + EXPECT_EQ(8, flush_token->get_checks()); vespalib::rmdir("stopdump3", true); flush_token = std::make_shared<MyFlushToken>(47); ASSERT_FALSE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token)); - EXPECT_LT(48, flush_token->get_checks()); + EXPECT_LE(48, flush_token->get_checks()); clean_stopped_fusion_testdirs(); } diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp index 6179f06c9da..68672a0a930 100644 --- a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp @@ -39,6 +39,11 @@ namespace search::diskindex { namespace { +constexpr uint32_t renumber_word_ids_heap_limit = 4; +constexpr uint32_t renumber_word_ids_merge_chunk = 1000000; +constexpr uint32_t merge_postings_heap_limit = 4; +constexpr uint32_t merge_postings_merge_chunk = 50000; + vespalib::string createTmpPath(const vespalib::string & base, uint32_t index) { vespalib::asciistream os; @@ -174,18 +179,20 @@ FieldMerger::renumber_word_ids_start() return false; } _word_aggregator = std::make_unique<WordAggregator>(); + _word_heap->setup(renumber_word_ids_heap_limit); + _word_heap->set_merge_chunk(_fusion_out_index.get_force_small_merge_chunk() ? 1u : renumber_word_ids_merge_chunk); return true; } -bool +void FieldMerger::renumber_word_ids_main() { - _word_heap->merge(*_word_aggregator, 4, *_flush_token); + _word_heap->merge(*_word_aggregator, *_flush_token); if (_flush_token->stop_requested()) { - return false; + _failed = true; + } else if (_word_heap->empty()) { + _state = State::RENUMBER_WORD_IDS_FINISH; } - assert(_word_heap->empty()); - return true; } bool @@ -356,6 +363,8 @@ FieldMerger::setup_merge_heap() _heap->initialAdd(reader.get()); } } + _heap->setup(merge_postings_heap_limit); + _heap->set_merge_chunk(_fusion_out_index.get_force_small_merge_chunk() ? 1u : merge_postings_merge_chunk); return true; } @@ -374,15 +383,15 @@ FieldMerger::merge_postings_start() return setup_merge_heap(); } -bool +void FieldMerger::merge_postings_main() { - _heap->merge(*_writer, 4, *_flush_token); + _heap->merge(*_writer, *_flush_token); if (_flush_token->stop_requested()) { - return false; + _failed = true; + } else if (_heap->empty()) { + _state = State::MERGE_POSTINGS_FINISH; } - assert(_heap->empty()); - return true; } bool @@ -475,11 +484,7 @@ FieldMerger::process_merge_field() merge_field_start(); break; case State::RENUMBER_WORD_IDS: - if (!renumber_word_ids_main()) { - renumber_word_ids_failed(); - } else { - _state = State::RENUMBER_WORD_IDS_FINISH; - } + renumber_word_ids_main(); break; case State::RENUMBER_WORD_IDS_FINISH: if (!renumber_word_ids_finish()) { @@ -491,11 +496,7 @@ FieldMerger::process_merge_field() } break; case State::MERGE_POSTINGS: - if (!merge_postings_main()) { - merge_postings_failed(); - } else { - _state = State::MERGE_POSTINGS_FINISH; - } + merge_postings_main(); break; case State::MERGE_POSTINGS_FINISH: merge_field_finish(); diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.h b/searchlib/src/vespa/searchlib/diskindex/field_merger.h index c5ce337e845..5017a7d5192 100644 --- a/searchlib/src/vespa/searchlib/diskindex/field_merger.h +++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.h @@ -53,13 +53,14 @@ class FieldMerger std::unique_ptr<FieldWriter> _writer; State _state; bool _failed; + bool _force_small_merge_chunk; void make_tmp_dirs(); bool clean_tmp_dirs(); bool open_input_word_readers(); bool read_mapping_files(); bool renumber_word_ids_start(); - bool renumber_word_ids_main(); + void renumber_word_ids_main(); bool renumber_word_ids_finish(); void renumber_word_ids_failed(); std::shared_ptr<FieldLengthScanner> allocate_field_length_scanner(); @@ -68,7 +69,7 @@ class FieldMerger bool select_cooked_or_raw_features(FieldReader& reader); bool setup_merge_heap(); bool merge_postings_start(); - bool merge_postings_main(); + void merge_postings_main(); bool merge_postings_finish(); void merge_postings_failed(); public: diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp index 1afed18cb48..12552f09027 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp @@ -34,6 +34,7 @@ namespace { std::vector<FusionInputIndex> createInputIndexes(const std::vector<vespalib::string> & sources, const SelectorArray &selector) { + assert(sources.size() <= 255); // due to source selector data type std::vector<FusionInputIndex> indexes; indexes.reserve(sources.size()); uint32_t i = 0; @@ -43,17 +44,28 @@ createInputIndexes(const std::vector<vespalib::string> & sources, const Selector return indexes; } +uint32_t calc_trimmed_doc_id_limit(const SelectorArray& selector, const std::vector<vespalib::string>& sources) +{ + uint32_t docIdLimit = selector.size(); + uint32_t trimmed_doc_id_limit = docIdLimit; + + // Limit docIdLimit in output based on selections that cannot be satisfied + uint32_t sources_size = sources.size(); + while (trimmed_doc_id_limit > 0 && selector[trimmed_doc_id_limit - 1] >= sources_size) { + --trimmed_doc_id_limit; + } + return trimmed_doc_id_limit; +} + } -Fusion::Fusion(uint32_t docIdLimit, const Schema & schema, const vespalib::string & dir, - const std::vector<vespalib::string> & sources, const SelectorArray &selector, - bool dynamicKPosIndexFormat, const TuneFileIndexing &tuneFileIndexing, - const FileHeaderContext &fileHeaderContext) - : _fusion_out_index(schema, dir, createInputIndexes(sources, selector), docIdLimit, dynamicKPosIndexFormat, tuneFileIndexing, fileHeaderContext) +Fusion::Fusion(const Schema& schema, const vespalib::string& dir, + const std::vector<vespalib::string>& sources, const SelectorArray& selector, + const TuneFileIndexing& tuneFileIndexing, + const FileHeaderContext& fileHeaderContext) + : _old_indexes(createInputIndexes(sources, selector)), + _fusion_out_index(schema, dir, _old_indexes, calc_trimmed_doc_id_limit(selector, sources), tuneFileIndexing, fileHeaderContext) { - if (!readSchemaFiles()) { - throw IllegalArgumentException("Cannot read schema files for source indexes"); - } } Fusion::~Fusion() = default; @@ -102,51 +114,41 @@ Fusion::readSchemaFiles() } bool -Fusion::merge(const Schema &schema, const vespalib::string &dir, const std::vector<vespalib::string> &sources, - const SelectorArray &selector, bool dynamicKPosOccFormat, - const TuneFileIndexing &tuneFileIndexing, const FileHeaderContext &fileHeaderContext, - vespalib::ThreadExecutor & executor, - std::shared_ptr<IFlushToken> flush_token) +Fusion::merge(vespalib::ThreadExecutor& executor, std::shared_ptr<IFlushToken> flush_token) { - assert(sources.size() <= 255); - uint32_t docIdLimit = selector.size(); - uint32_t trimmedDocIdLimit = docIdLimit; - - // Limit docIdLimit in output based on selections that cannot be satisfied - uint32_t sourcesSize = sources.size(); - while (trimmedDocIdLimit > 0 && selector[trimmedDocIdLimit - 1] >= sourcesSize) { - --trimmedDocIdLimit; - } - FastOS_StatInfo statInfo; - if (!FastOS_File::Stat(dir.c_str(), &statInfo)) { + if (!FastOS_File::Stat(_fusion_out_index.get_path().c_str(), &statInfo)) { if (statInfo._error != FastOS_StatInfo::FileNotFound) { - LOG(error, "Could not stat \"%s\"", dir.c_str()); + LOG(error, "Could not stat \"%s\"", _fusion_out_index.get_path().c_str()); return false; } } else { if (!statInfo._isDirectory) { - LOG(error, "\"%s\" is not a directory", dir.c_str()); + LOG(error, "\"%s\" is not a directory", _fusion_out_index.get_path().c_str()); return false; } - search::DirectoryTraverse dt(dir.c_str()); + search::DirectoryTraverse dt(_fusion_out_index.get_path().c_str()); if (!dt.RemoveTree()) { - LOG(error, "Failed to clean directory \"%s\"", dir.c_str()); + LOG(error, "Failed to clean directory \"%s\"", _fusion_out_index.get_path().c_str()); return false; } } - vespalib::mkdir(dir, false); - schema.saveToFile(dir + "/schema.txt"); - if (!DocumentSummary::writeDocIdLimit(dir, trimmedDocIdLimit)) { - LOG(error, "Could not write docsum count in dir %s: %s", dir.c_str(), getLastErrorString().c_str()); + vespalib::mkdir(_fusion_out_index.get_path(), false); + _fusion_out_index.get_schema().saveToFile(_fusion_out_index.get_path() + "/schema.txt"); + if (!DocumentSummary::writeDocIdLimit(_fusion_out_index.get_path(), _fusion_out_index.get_doc_id_limit())) { + LOG(error, "Could not write docsum count in dir %s: %s", _fusion_out_index.get_path().c_str(), getLastErrorString().c_str()); return false; } try { - auto fusion = std::make_unique<Fusion>(trimmedDocIdLimit, schema, dir, sources, selector, - dynamicKPosOccFormat, tuneFileIndexing, fileHeaderContext); - return fusion->mergeFields(executor, flush_token); + for (auto& old_index : _old_indexes) { + old_index.setup(); + } + if (!readSchemaFiles()) { + throw IllegalArgumentException("Cannot read schema files for source indexes"); + } + return mergeFields(executor, flush_token); } catch (const std::exception & e) { LOG(error, "%s", e.what()); return false; diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.h b/searchlib/src/vespa/searchlib/diskindex/fusion.h index 7e0b70dca36..1f5c4471950 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion.h +++ b/searchlib/src/vespa/searchlib/diskindex/fusion.h @@ -31,21 +31,19 @@ private: const Schema &getSchema() const { return _fusion_out_index.get_schema(); } + std::vector<FusionInputIndex> _old_indexes; FusionOutputIndex _fusion_out_index; public: Fusion(const Fusion &) = delete; Fusion& operator=(const Fusion &) = delete; - Fusion(uint32_t docIdLimit, const Schema &schema, const vespalib::string &dir, - const std::vector<vespalib::string> & sources, const SelectorArray &selector, bool dynamicKPosIndexFormat, - const TuneFileIndexing &tuneFileIndexing, const common::FileHeaderContext &fileHeaderContext); + Fusion(const Schema& schema, const vespalib::string& dir, + const std::vector<vespalib::string>& sources, const SelectorArray& selector, + const TuneFileIndexing& tuneFileIndexing, const common::FileHeaderContext& fileHeaderContext); ~Fusion(); - - static bool - merge(const Schema &schema, const vespalib::string &dir, const std::vector<vespalib::string> &sources, - const SelectorArray &docIdSelector, bool dynamicKPosOccFormat, const TuneFileIndexing &tuneFileIndexing, - const common::FileHeaderContext &fileHeaderContext, vespalib::ThreadExecutor & executor, - std::shared_ptr<IFlushToken> flush_token); + void set_dynamic_k_pos_index_format(bool dynamic_k_pos_index_format) { _fusion_out_index.set_dynamic_k_pos_index_format(dynamic_k_pos_index_format); } + void set_force_small_merge_chunk(bool force_small_merge_chunk) { _fusion_out_index.set_force_small_merge_chunk(force_small_merge_chunk); } + bool merge(vespalib::ThreadExecutor& executor, std::shared_ptr<IFlushToken> flush_token); }; } diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.cpp index 278d095b639..51c365957d9 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.cpp @@ -14,21 +14,28 @@ namespace search::diskindex { FusionInputIndex::FusionInputIndex(const vespalib::string& path, uint32_t index, const SelectorArray& selector) : _path(path), _index(index), - _schema() + _selector(&selector), + _schema(), + _docIdMapping() { - vespalib::string fname = path + "/schema.txt"; +} + +FusionInputIndex::~FusionInputIndex() = default; + +void +FusionInputIndex::setup() +{ + vespalib::string fname = _path + "/schema.txt"; if ( ! _schema.loadFromFile(fname)) { throw IllegalArgumentException(make_string("Failed loading schema %s", fname.c_str())); } if ( ! SchemaUtil::validateSchema(_schema)) { throw IllegalArgumentException(make_string("Failed validating schema %s", fname.c_str())); } - if (!_docIdMapping.readDocIdLimit(path)) { - throw IllegalArgumentException(make_string("Cannot determine docIdLimit for old index \"%s\"", path.c_str())); + if (!_docIdMapping.readDocIdLimit(_path)) { + throw IllegalArgumentException(make_string("Cannot determine docIdLimit for old index \"%s\"", _path.c_str())); } - _docIdMapping.setup(_docIdMapping._docIdLimit, &selector, index); + _docIdMapping.setup(_docIdMapping._docIdLimit, _selector, _index); } -FusionInputIndex::~FusionInputIndex() = default; - } diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.h b/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.h index fd7bb8f0256..6606e00d73b 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.h +++ b/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.h @@ -14,10 +14,11 @@ namespace search::diskindex { class FusionInputIndex { private: - vespalib::string _path; - uint32_t _index; - index::Schema _schema; - DocIdMapping _docIdMapping; + vespalib::string _path; + uint32_t _index; + const SelectorArray* _selector; + index::Schema _schema; + DocIdMapping _docIdMapping; public: FusionInputIndex(const vespalib::string& path, uint32_t index, const SelectorArray& selector); @@ -25,6 +26,7 @@ public: FusionInputIndex & operator = (FusionInputIndex&&) = default; ~FusionInputIndex(); + void setup(); const vespalib::string& getPath() const noexcept { return _path; } uint32_t getIndex() const noexcept { return _index; } const DocIdMapping& getDocIdMapping() const noexcept { return _docIdMapping; } diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp index 66ec0889cbe..3c75aa16b93 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp @@ -5,12 +5,13 @@ namespace search::diskindex { -FusionOutputIndex::FusionOutputIndex(const index::Schema& schema, const vespalib::string& path, std::vector<FusionInputIndex> old_indexes, uint32_t doc_id_limit, bool dynamic_k_pos_index_format, const TuneFileIndexing& tune_file_indexing, const common::FileHeaderContext& file_header_context) +FusionOutputIndex::FusionOutputIndex(const index::Schema& schema, const vespalib::string& path, const std::vector<FusionInputIndex>& old_indexes, uint32_t doc_id_limit, const TuneFileIndexing& tune_file_indexing, const common::FileHeaderContext& file_header_context) : _schema(schema), _path(path), _old_indexes(std::move(old_indexes)), _doc_id_limit(doc_id_limit), - _dynamic_k_pos_index_format(dynamic_k_pos_index_format), + _dynamic_k_pos_index_format(false), + _force_small_merge_chunk(false), _tune_file_indexing(tune_file_indexing), _file_header_context(file_header_context) { diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h index c366b111363..729ecd26524 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h +++ b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h @@ -22,20 +22,24 @@ class FusionOutputIndex private: const index::Schema& _schema; const vespalib::string _path; - const std::vector<FusionInputIndex> _old_indexes; + const std::vector<FusionInputIndex>& _old_indexes; const uint32_t _doc_id_limit; - const bool _dynamic_k_pos_index_format; + bool _dynamic_k_pos_index_format; + bool _force_small_merge_chunk; const TuneFileIndexing& _tune_file_indexing; const common::FileHeaderContext& _file_header_context; public: - FusionOutputIndex(const index::Schema& schema, const vespalib::string& path, std::vector<FusionInputIndex> old_indexes, uint32_t doc_id_limit, bool dynamic_k_pos_index_format, const TuneFileIndexing& tune_file_indexing, const common::FileHeaderContext& file_header_context); + FusionOutputIndex(const index::Schema& schema, const vespalib::string& path, const std::vector<FusionInputIndex>& old_indexes, uint32_t doc_id_limit, const TuneFileIndexing& tune_file_indexing, const common::FileHeaderContext& file_header_context); ~FusionOutputIndex(); + void set_dynamic_k_pos_index_format(bool dynamic_k_pos_index_format) { _dynamic_k_pos_index_format = dynamic_k_pos_index_format; } + void set_force_small_merge_chunk(bool force_small_merge_chunk) { _force_small_merge_chunk = force_small_merge_chunk; } const index::Schema& get_schema() const noexcept { return _schema; } const vespalib::string& get_path() const noexcept { return _path; } const std::vector<FusionInputIndex>& get_old_indexes() const noexcept { return _old_indexes; } uint32_t get_doc_id_limit() const noexcept { return _doc_id_limit; } bool get_dynamic_k_pos_index_format() const noexcept { return _dynamic_k_pos_index_format; } + bool get_force_small_merge_chunk() const noexcept { return _force_small_merge_chunk; } const TuneFileIndexing& get_tune_file_indexing() const noexcept { return _tune_file_indexing; } const common::FileHeaderContext& get_file_header_context() const noexcept { return _file_header_context; } }; diff --git a/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp b/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp index 64d194c5c7e..666eed8f1e8 100644 --- a/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp +++ b/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp @@ -363,8 +363,11 @@ FakeMemTreeOccFactory::setup(const std::vector<const FakeWord *> &fws) } ++i; } - heap.merge(_mgr, 4, flush_token); - assert(heap.empty()); + heap.setup(4); + heap.set_merge_chunk(100000); + while (!heap.empty()) { + heap.merge(_mgr, flush_token); + } _mgr.finalize(); } diff --git a/searchlib/src/vespa/searchlib/util/posting_priority_queue.h b/searchlib/src/vespa/searchlib/util/posting_priority_queue.h index 01ae0995806..c1549b32f93 100644 --- a/searchlib/src/vespa/searchlib/util/posting_priority_queue.h +++ b/searchlib/src/vespa/searchlib/util/posting_priority_queue.h @@ -12,7 +12,7 @@ namespace search { template <class Reader> class PostingPriorityQueue { -public: +protected: class Ref { Reader *_ref; @@ -28,9 +28,14 @@ public: using Vector = std::vector<Ref>; Vector _vec; + uint32_t _heap_limit; + uint32_t _merge_chunk; +public: PostingPriorityQueue() - : _vec() + : _vec(), + _heap_limit(0u), + _merge_chunk(0u) { } @@ -40,9 +45,10 @@ public: /* * Sort vector after a set of initial add operations, so lowest() - * and adjust() can be used. + * and adjust() can be used. Skip sort if _vec.size() < heap_limit + * since merging with few elements don't use heap. */ - void sort() { std::sort(_vec.begin(), _vec.end()); } + void setup(uint32_t heap_limit); /* * Return lowest value. Assumes vector is sorted. diff --git a/searchlib/src/vespa/searchlib/util/posting_priority_queue.hpp b/searchlib/src/vespa/searchlib/util/posting_priority_queue.hpp index 69bf7bc547c..33f3bce2be6 100644 --- a/searchlib/src/vespa/searchlib/util/posting_priority_queue.hpp +++ b/searchlib/src/vespa/searchlib/util/posting_priority_queue.hpp @@ -32,4 +32,17 @@ PostingPriorityQueue<Reader>::adjust() *to = changed; // Save changed value at right location } +template <class Reader> +void +PostingPriorityQueue<Reader>::setup(uint32_t heap_limit) +{ + _heap_limit = heap_limit; + for (auto ref : _vec) { + assert(ref.get()->isValid()); + } + if (_vec.size() >= heap_limit) { + std::sort(_vec.begin(), _vec.end()); + } +} + } diff --git a/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.h b/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.h index 8dd941a2c13..9debcd06ea6 100644 --- a/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.h +++ b/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.h @@ -13,20 +13,29 @@ namespace search { template <class Reader, class Writer> class PostingPriorityQueueMerger : public PostingPriorityQueue<Reader> { + uint32_t _merge_chunk; public: using Parent = PostingPriorityQueue<Reader>; using Vector = typename Parent::Vector; + using Parent::_heap_limit; using Parent::_vec; using Parent::adjust; using Parent::empty; using Parent::lowest; - using Parent::sort; + using Parent::setup; - void mergeHeap(Writer& writer, const IFlushToken& flush_token) __attribute__((noinline)); - static void mergeOne(Writer& writer, Reader& reader, const IFlushToken &flush_token) __attribute__((noinline)); - static void mergeTwo(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token) __attribute__((noinline)); - static void mergeSmall(Writer& writer, typename Vector::iterator ib, typename Vector::iterator ie, const IFlushToken &flush_token) __attribute__((noinline)); - void merge(Writer& writer, uint32_t heapLimit, const IFlushToken& flush_token) __attribute__((noinline)); + PostingPriorityQueueMerger() + : Parent(), + _merge_chunk(0u) + { + } + + void set_merge_chunk(uint32_t merge_chunk) { _merge_chunk = merge_chunk; } + void mergeHeap(Writer& writer, const IFlushToken& flush_token, uint32_t remaining_merge_chunk) __attribute__((noinline)); + static void mergeOne(Writer& writer, Reader& reader, const IFlushToken &flush_token, uint32_t remaining_merge_chunk) __attribute__((noinline)); + static void mergeTwo(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token, uint32_t& remaining_merge_chunk) __attribute__((noinline)); + static void mergeSmall(Writer& writer, typename Vector::iterator ib, typename Vector::iterator ie, const IFlushToken &flush_token, uint32_t& remaining_merge_chunk) __attribute__((noinline)); + void merge(Writer& writer, const IFlushToken& flush_token) __attribute__((noinline)); }; } diff --git a/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.hpp b/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.hpp index d33356cee4a..5676f6326df 100644 --- a/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.hpp +++ b/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.hpp @@ -9,44 +9,48 @@ namespace search { template <class Reader, class Writer> void -PostingPriorityQueueMerger<Reader, Writer>::mergeHeap(Writer& writer, const IFlushToken& flush_token) +PostingPriorityQueueMerger<Reader, Writer>::mergeHeap(Writer& writer, const IFlushToken& flush_token, uint32_t remaining_merge_chunk) { - while (!empty() && !flush_token.stop_requested()) { + while (remaining_merge_chunk > 0u && !empty() && !flush_token.stop_requested()) { Reader *low = lowest(); low->write(writer); low->read(); adjust(); + --remaining_merge_chunk; } } template <class Reader, class Writer> void -PostingPriorityQueueMerger<Reader, Writer>::mergeOne(Writer& writer, Reader& reader, const IFlushToken& flush_token) +PostingPriorityQueueMerger<Reader, Writer>::mergeOne(Writer& writer, Reader& reader, const IFlushToken& flush_token, uint32_t remaining_merge_chunk) { - while (reader.isValid() && !flush_token.stop_requested()) { + while (remaining_merge_chunk > 0u && reader.isValid() && !flush_token.stop_requested()) { reader.write(writer); reader.read(); + --remaining_merge_chunk; } } template <class Reader, class Writer> void -PostingPriorityQueueMerger<Reader, Writer>::mergeTwo(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token) +PostingPriorityQueueMerger<Reader, Writer>::mergeTwo(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token, uint32_t& remaining_merge_chunk) { - while (!flush_token.stop_requested()) { + while (remaining_merge_chunk > 0u && !flush_token.stop_requested()) { Reader &low = reader2 < reader1 ? reader2 : reader1; low.write(writer); low.read(); - if (!low.isValid()) + --remaining_merge_chunk; + if (!low.isValid()) { break; + } } } template <class Reader, class Writer> void -PostingPriorityQueueMerger<Reader, Writer>::mergeSmall(Writer& writer, typename Vector::iterator ib, typename Vector::iterator ie, const IFlushToken& flush_token) +PostingPriorityQueueMerger<Reader, Writer>::mergeSmall(Writer& writer, typename Vector::iterator ib, typename Vector::iterator ie, const IFlushToken& flush_token, uint32_t& remaining_merge_chunk) { - while (!flush_token.stop_requested()) { + while (remaining_merge_chunk > 0u && !flush_token.stop_requested()) { typename Vector::iterator i = ib; Reader *low = i->get(); for (++i; i != ie; ++i) @@ -54,47 +58,49 @@ PostingPriorityQueueMerger<Reader, Writer>::mergeSmall(Writer& writer, typename low = i->get(); low->write(writer); low->read(); - if (!low->isValid()) + --remaining_merge_chunk; + if (!low->isValid()) { break; + } } } template <class Reader, class Writer> void -PostingPriorityQueueMerger<Reader, Writer>::merge(Writer& writer, uint32_t heapLimit, const IFlushToken& flush_token) +PostingPriorityQueueMerger<Reader, Writer>::merge(Writer& writer, const IFlushToken& flush_token) { if (_vec.empty()) return; - for (typename Vector::iterator i = _vec.begin(), ie = _vec.end(); i != ie; - ++i) { - assert(i->get()->isValid()); - } - if (_vec.size() >= heapLimit) { - sort(); - void (PostingPriorityQueueMerger::*mergeHeapFunc)(Writer& writer, const IFlushToken& flush_token) = + assert(_heap_limit > 0u); + uint32_t remaining_merge_chunk = _merge_chunk; + if (_vec.size() >= _heap_limit) { + void (PostingPriorityQueueMerger::*mergeHeapFunc)(Writer& writer, const IFlushToken& flush_token, uint32_t remaining_merge_chunk) = &PostingPriorityQueueMerger::mergeHeap; - (this->*mergeHeapFunc)(writer, flush_token); + (this->*mergeHeapFunc)(writer, flush_token, remaining_merge_chunk); return; } - while (!flush_token.stop_requested()) { + while (remaining_merge_chunk > 0u && !flush_token.stop_requested()) { if (_vec.size() == 1) { - void (*mergeOneFunc)(Writer& writer, Reader& reader, const IFlushToken& flush_token) = + void (*mergeOneFunc)(Writer& writer, Reader& reader, const IFlushToken& flush_token, uint32_t remaining_merge_chunk) = &PostingPriorityQueueMerger::mergeOne; - (*mergeOneFunc)(writer, *_vec.front().get(), flush_token); - _vec.clear(); + (*mergeOneFunc)(writer, *_vec.front().get(), flush_token, remaining_merge_chunk); + if (!_vec.front().get()->isValid()) { + _vec.clear(); + } return; } if (_vec.size() == 2) { - void (*mergeTwoFunc)(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token) = + void (*mergeTwoFunc)(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token, uint32_t& remaining_merge_chunk) = &PostingPriorityQueueMerger::mergeTwo; - (*mergeTwoFunc)(writer, *_vec[0].get(), *_vec[1].get(), flush_token); + (*mergeTwoFunc)(writer, *_vec[0].get(), *_vec[1].get(), flush_token, remaining_merge_chunk); } else { void (*mergeSmallFunc)(Writer& writer, typename Vector::iterator ib, typename Vector::iterator ie, - const IFlushToken& flush_token) = + const IFlushToken& flush_token, + uint32_t& remaining_merge_chunk) = &PostingPriorityQueueMerger::mergeSmall; - (*mergeSmallFunc)(writer, _vec.begin(), _vec.end(), flush_token); + (*mergeSmallFunc)(writer, _vec.begin(), _vec.end(), flush_token, remaining_merge_chunk); } for (typename Vector::iterator i = _vec.begin(), ie = _vec.end(); i != ie; ++i) { |