From b7bbe73ab4abf0c13fa6618b44a676def05a5958 Mon Sep 17 00:00:00 2001 From: Tor Egge Date: Sat, 15 Jan 2022 17:15:56 +0100 Subject: Split element length scanning into smaller chunks. --- .../src/tests/diskindex/fusion/fusion_test.cpp | 20 ++++- .../src/vespa/searchlib/diskindex/field_merger.cpp | 99 +++++++++++++++++----- .../src/vespa/searchlib/diskindex/field_merger.h | 14 ++- .../src/vespa/searchlib/diskindex/fieldreader.cpp | 33 +++++--- .../src/vespa/searchlib/diskindex/fieldreader.h | 5 +- 5 files changed, 132 insertions(+), 39 deletions(-) (limited to 'searchlib') diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp index 72867edf474..37fac07a8d4 100644 --- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp +++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp @@ -63,12 +63,14 @@ class FusionTest : public ::testing::Test { protected: Schema _schema; + bool _force_small_merge_chunk; const Schema & getSchema() const { return _schema; } void requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap, bool force_short_merge_chunk); void make_simple_index(const vespalib::string &dump_dir, const IFieldLengthInspector &field_length_inspector); bool try_merge_simple_indexes(const vespalib::string &dump_dir, const std::vector &sources, std::shared_ptr flush_token); void merge_simple_indexes(const vespalib::string &dump_dir, const std::vector &sources); + void reconstruct_interleaved_features(); public: FusionTest(); }; @@ -494,6 +496,7 @@ FusionTest::try_merge_simple_indexes(const vespalib::string &dump_dir, const std SelectorArray selector(20, 0); Fusion fusion(_schema, dump_dir, sources, selector, tuneFileIndexing, fileHeaderContext); + fusion.set_force_small_merge_chunk(_force_small_merge_chunk); return fusion.merge(executor, flush_token); } @@ -505,7 +508,8 @@ FusionTest::merge_simple_indexes(const vespalib::string &dump_dir, const std::ve FusionTest::FusionTest() : ::testing::Test(), - _schema(make_schema(false)) + _schema(make_schema(false)), + _force_small_merge_chunk(false) { } @@ -557,7 +561,8 @@ TEST_F(FusionTest, require_that_average_field_length_is_preserved) clean_field_length_testdirs(); } -TEST_F(FusionTest, require_that_interleaved_features_can_be_reconstructed) +void +FusionTest::reconstruct_interleaved_features() { clean_field_length_testdirs(); make_simple_index("fldump2", MockFieldLengthInspector()); @@ -573,6 +578,17 @@ TEST_F(FusionTest, require_that_interleaved_features_can_be_reconstructed) clean_field_length_testdirs(); } +TEST_F(FusionTest, require_that_interleaved_features_can_be_reconstructed) +{ + reconstruct_interleaved_features(); +} + +TEST_F(FusionTest, require_that_interleaved_features_can_be_reconstructed_with_small_merge_chunk) +{ + _force_small_merge_chunk = true; + reconstruct_interleaved_features(); +} + namespace { void clean_stopped_fusion_testdirs() diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp index 68672a0a930..c5ad0c63c21 100644 --- a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp @@ -43,6 +43,7 @@ constexpr uint32_t renumber_word_ids_heap_limit = 4; constexpr uint32_t renumber_word_ids_merge_chunk = 1000000; constexpr uint32_t merge_postings_heap_limit = 4; constexpr uint32_t merge_postings_merge_chunk = 50000; +constexpr uint32_t scan_chunk = 80000; vespalib::string createTmpPath(const vespalib::string & base, uint32_t index) { @@ -69,6 +70,8 @@ FieldMerger::FieldMerger(uint32_t id, const FusionOutputIndex& fusion_out_index, _readers(), _heap(), _writer(), + _field_length_scanner(), + _open_reader_idx(std::numeric_limits::max()), _state(State::MERGE_START), _failed(false) { @@ -228,7 +231,7 @@ FieldMerger::renumber_word_ids_failed() LOG(error, "Could not renumber field word ids for field %s dir %s", _field_name.c_str(), _field_dir.c_str()); } -std::shared_ptr +void FieldMerger::allocate_field_length_scanner() { SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id); @@ -242,35 +245,72 @@ FieldMerger::allocate_field_length_scanner() const Schema &old_schema = old_index.getSchema(); if (index.hasOldFields(old_schema) && !index.has_matching_use_interleaved_features(old_schema)) { - return std::make_shared(_fusion_out_index.get_doc_id_limit()); + _field_length_scanner = std::make_shared(_fusion_out_index.get_doc_id_limit()); + return; } } } } - return std::shared_ptr(); } bool +FieldMerger::open_input_field_reader() +{ + auto& oi = _fusion_out_index.get_old_indexes()[_open_reader_idx]; + if (!_readers.back()->open(oi.getPath() + "/" + _field_name + "/", _fusion_out_index.get_tune_file_indexing()._read)) { + _readers.pop_back(); + return false; + } + return true; +} + +void FieldMerger::open_input_field_readers() { - _readers.reserve(_fusion_out_index.get_old_indexes().size()); SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id); - auto field_length_scanner = allocate_field_length_scanner(); - for (const auto &oi : _fusion_out_index.get_old_indexes()) { + for (; _open_reader_idx < _fusion_out_index.get_old_indexes().size(); ++_open_reader_idx) { + auto& oi = _fusion_out_index.get_old_indexes()[_open_reader_idx]; const Schema &oldSchema = oi.getSchema(); if (!index.hasOldFields(oldSchema)) { continue; // drop data } - auto reader = FieldReader::allocFieldReader(index, oldSchema, field_length_scanner); - reader->setup(_word_num_mappings[oi.getIndex()], oi.getDocIdMapping()); - if (!reader->open(oi.getPath() + "/" + _field_name + "/", _fusion_out_index.get_tune_file_indexing()._read)) { - return false; + _readers.push_back(FieldReader::allocFieldReader(index, oldSchema, _field_length_scanner)); + auto& reader = *_readers.back(); + reader.setup(_word_num_mappings[oi.getIndex()], oi.getDocIdMapping()); + if (!open_input_field_reader()) { + merge_postings_failed(); + return; + } + if (reader.need_regenerate_interleaved_features_scan()) { + _state = State::SCAN_ELEMENT_LENGTHS; + return; } - _readers.push_back(std::move(reader)); } - return true; + _field_length_scanner.reset(); + _open_reader_idx = std::numeric_limits::max(); + _state = State::OPEN_POSTINGS_FIELD_READERS_FINISH; +} + +void +FieldMerger::scan_element_lengths() +{ + auto& reader = *_readers.back(); + if (reader.isValid()) { + reader.scan_element_lengths(_fusion_out_index.get_force_small_merge_chunk() ? 1u : scan_chunk); + if (reader.isValid()) { + return; + } + } + reader.close(); + if (!open_input_field_reader()) { + merge_postings_failed(); + } else { + ++_open_reader_idx; + _state = State::OPEN_POSTINGS_FIELD_READERS; + } } + bool FieldMerger::open_field_writer() { @@ -368,19 +408,25 @@ FieldMerger::setup_merge_heap() return true; } -bool +void FieldMerger::merge_postings_start() { /* OUTPUT */ _writer = std::make_unique(_fusion_out_index.get_doc_id_limit(), _num_word_ids); + _readers.reserve(_fusion_out_index.get_old_indexes().size()); + allocate_field_length_scanner(); + _open_reader_idx = 0; + _state = State::OPEN_POSTINGS_FIELD_READERS; +} - if (!open_input_field_readers()) { - return false; - } - if (!open_field_writer()) { - return false; +void +FieldMerger::merge_postings_open_field_readers_done() +{ + if (!open_field_writer() || !setup_merge_heap()) { + merge_postings_failed(); + } else { + _state = State::MERGE_POSTINGS; } - return setup_merge_heap(); } void @@ -457,7 +503,6 @@ FieldMerger::merge_field_finish() bool res = merge_postings_finish(); if (!res) { merge_postings_failed(); - _failed = true; return; } if (!FileKit::createStamp(_field_dir + "/.mergeocc_done")) { @@ -489,11 +534,19 @@ FieldMerger::process_merge_field() case State::RENUMBER_WORD_IDS_FINISH: if (!renumber_word_ids_finish()) { renumber_word_ids_failed(); - } else if (!merge_postings_start()) { - merge_postings_failed(); + break; } else { - _state = State::MERGE_POSTINGS; + merge_postings_start(); } + [[fallthrough]]; + case State::OPEN_POSTINGS_FIELD_READERS: + open_input_field_readers(); + break; + case State::SCAN_ELEMENT_LENGTHS: + scan_element_lengths(); + break; + case State::OPEN_POSTINGS_FIELD_READERS_FINISH: + merge_postings_open_field_readers_done(); break; case State::MERGE_POSTINGS: merge_postings_main(); diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.h b/searchlib/src/vespa/searchlib/diskindex/field_merger.h index 5017a7d5192..380baa3e060 100644 --- a/searchlib/src/vespa/searchlib/diskindex/field_merger.h +++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.h @@ -33,6 +33,9 @@ class FieldMerger MERGE_START, RENUMBER_WORD_IDS, RENUMBER_WORD_IDS_FINISH, + OPEN_POSTINGS_FIELD_READERS, + SCAN_ELEMENT_LENGTHS, + OPEN_POSTINGS_FIELD_READERS_FINISH, MERGE_POSTINGS, MERGE_POSTINGS_FINISH, MERGE_DONE @@ -51,6 +54,8 @@ class FieldMerger std::vector> _readers; std::unique_ptr> _heap; std::unique_ptr _writer; + std::shared_ptr _field_length_scanner; + uint32_t _open_reader_idx; State _state; bool _failed; bool _force_small_merge_chunk; @@ -63,12 +68,15 @@ class FieldMerger void renumber_word_ids_main(); bool renumber_word_ids_finish(); void renumber_word_ids_failed(); - std::shared_ptr allocate_field_length_scanner(); - bool open_input_field_readers(); + void allocate_field_length_scanner(); + bool open_input_field_reader(); + void open_input_field_readers(); + void scan_element_lengths(); bool open_field_writer(); bool select_cooked_or_raw_features(FieldReader& reader); bool setup_merge_heap(); - bool merge_postings_start(); + void merge_postings_start(); + void merge_postings_open_field_readers_done(); void merge_postings_main(); bool merge_postings_finish(); void merge_postings_failed(); diff --git a/searchlib/src/vespa/searchlib/diskindex/fieldreader.cpp b/searchlib/src/vespa/searchlib/diskindex/fieldreader.cpp index 202ac2b08ff..1794db8bba3 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fieldreader.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/fieldreader.cpp @@ -99,6 +99,16 @@ FieldReader::allowRawFeatures() return true; } +bool +FieldReader::need_regenerate_interleaved_features_scan() +{ + return false; +} + +void +FieldReader::scan_element_lengths(uint32_t) +{ +} void FieldReader::setup(const WordNumMapping &wordNumMapping, @@ -277,27 +287,30 @@ FieldReaderStripInfo::open(const vespalib::string &prefix, const TuneFileSeqRead _regenerate_interleaved_features = true; } } - if (_regenerate_interleaved_features && _hasElements && _field_length_scanner) { - scan_element_lengths(); - close(); - if (!FieldReader::open(prefix, tuneFileRead)) { - return false; - } - } return true; } +bool +FieldReaderStripInfo::need_regenerate_interleaved_features_scan() +{ + return (_regenerate_interleaved_features && _hasElements && _field_length_scanner); +} + void -FieldReaderStripInfo::scan_element_lengths() +FieldReaderStripInfo::scan_element_lengths(uint32_t scan_chunk) { - for (;;) { + if (!isValid()) { + return; + } + while (scan_chunk != 0u) { FieldReader::read(); - if (_wordNum == noWordNumHigh()) { + if (!isValid()) { break; } DocIdAndFeatures &features = _docIdAndFeatures; assert(!features.has_raw_data()); _field_length_scanner->scan_features(features); + --scan_chunk; } } diff --git a/searchlib/src/vespa/searchlib/diskindex/fieldreader.h b/searchlib/src/vespa/searchlib/diskindex/fieldreader.h index ef319971fa7..75c8d35740e 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fieldreader.h +++ b/searchlib/src/vespa/searchlib/diskindex/fieldreader.h @@ -72,6 +72,8 @@ public: virtual void read(); virtual bool allowRawFeatures(); + virtual bool need_regenerate_interleaved_features_scan(); + virtual void scan_element_lengths(uint32_t scan_chunk); void write(FieldWriter &writer) { if (_wordNum != writer.getSparseWordNum()) { @@ -131,9 +133,10 @@ private: public: FieldReaderStripInfo(const IndexIterator &index, std::shared_ptr); bool allowRawFeatures() override; + bool need_regenerate_interleaved_features_scan() override; bool open(const vespalib::string &prefix, const TuneFileSeqRead &tuneFileRead) override; void read() override; - void scan_element_lengths(); + void scan_element_lengths(uint32_t scan_chunk) override; void getFeatureParams(PostingListParams ¶ms) override; }; -- cgit v1.2.3