diff options
author | Tor Egge <Tor.Egge@online.no> | 2022-01-06 12:25:14 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2022-01-06 12:27:20 +0100 |
commit | ad70f29311cca2e812df502347433d9b4ecf3e54 (patch) | |
tree | a135415d108baba49c6b614a28729a33a4883b4f /searchlib | |
parent | 00fd50f8f99ec06a846404355f6c6e5c54e73d4f (diff) |
Move state from stack to field merger.
Diffstat (limited to 'searchlib')
-rw-r--r-- | searchlib/src/vespa/searchlib/diskindex/field_merger.cpp | 122 | ||||
-rw-r--r-- | searchlib/src/vespa/searchlib/diskindex/field_merger.h | 30 | ||||
-rw-r--r-- | searchlib/src/vespa/searchlib/diskindex/fusion.cpp | 4 |
3 files changed, 87 insertions, 69 deletions
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp index 375d30d3003..ee5d72f1daa 100644 --- a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp @@ -50,10 +50,18 @@ createTmpPath(const vespalib::string & base, uint32_t index) { } -FieldMerger::FieldMerger(uint32_t id, const FusionOutputIndex& fusion_out_index) +FieldMerger::FieldMerger(uint32_t id, const FusionOutputIndex& fusion_out_index, std::shared_ptr<IFlushToken> flush_token) : _id(id), _field_dir(fusion_out_index.get_path() + "/" + SchemaUtil::IndexIterator(fusion_out_index.get_schema(), id).getName()), - _fusion_out_index(fusion_out_index) + _fusion_out_index(fusion_out_index), + _flush_token(std::move(flush_token)), + _word_readers(), + _word_heap(), + _word_num_mappings(), + _num_word_ids(0), + _readers(), + _heap(), + _writer() { } @@ -96,8 +104,10 @@ FieldMerger::clean_tmp_dirs() } bool -FieldMerger::open_input_word_readers(std::vector<std::unique_ptr<DictionaryWordReader>> & readers, PostingPriorityQueue<DictionaryWordReader>& heap) +FieldMerger::open_input_word_readers() { + _word_readers.reserve(_fusion_out_index.get_old_indexes().size()); + _word_heap = std::make_unique<PostingPriorityQueue<DictionaryWordReader>>(); SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id); for (auto & oi : _fusion_out_index.get_old_indexes()) { auto reader(std::make_unique<DictionaryWordReader>()); @@ -117,16 +127,17 @@ FieldMerger::open_input_word_readers(std::vector<std::unique_ptr<DictionaryWordR } reader->read(); if (reader->isValid()) { - readers.push_back(std::move(reader)); - heap.initialAdd(readers.back().get()); + _word_readers.push_back(std::move(reader)); + _word_heap->initialAdd(_word_readers.back().get()); } } return true; } bool -FieldMerger::read_mapping_files(WordNumMappingList& list) +FieldMerger::read_mapping_files() { + _word_num_mappings.resize(_fusion_out_index.get_old_indexes().size()); SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id); for (const auto & oi : _fusion_out_index.get_old_indexes()) { std::vector<uint32_t> oldIndexes; @@ -134,7 +145,7 @@ FieldMerger::read_mapping_files(WordNumMappingList& list) if (!SchemaUtil::getIndexIds(oldSchema, DataType::STRING, oldIndexes)) { return false; } - WordNumMapping &wordNumMapping = list[oi.getIndex()]; + WordNumMapping &wordNumMapping = _word_num_mappings[oi.getIndex()]; if (oldIndexes.empty()) { wordNumMapping.noMappingFile(); continue; @@ -152,34 +163,34 @@ FieldMerger::read_mapping_files(WordNumMappingList& list) } bool -FieldMerger::renumber_word_ids(WordNumMappingList& list, uint64_t& numWordIds, const IFlushToken& flush_token) +FieldMerger::renumber_word_ids() { SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id); vespalib::string indexName = index.getName(); LOG(debug, "Renumber word IDs for field %s", indexName.c_str()); - std::vector<std::unique_ptr<DictionaryWordReader>> readers; - PostingPriorityQueue<DictionaryWordReader> heap; WordAggregator out; - if (!open_input_word_readers(readers, heap)) { + if (!open_input_word_readers()) { return false; } - heap.merge(out, 4, flush_token); - if (flush_token.stop_requested()) { + _word_heap->merge(out, 4, *_flush_token); + if (_flush_token->stop_requested()) { return false; } - assert(heap.empty()); - numWordIds = out.getWordNum(); + assert(_word_heap->empty()); + _word_heap.reset(); + _num_word_ids = out.getWordNum(); // Close files - for (auto &i : readers) { + for (auto &i : _word_readers) { i->close(); } + _word_readers.clear(); // Now read mapping files back into an array // XXX: avoid this, and instead make the array here - if (!read_mapping_files(list)) { + if (!read_mapping_files()) { return false; } LOG(debug, "Finished renumbering words IDs for field %s", indexName.c_str()); @@ -210,8 +221,9 @@ FieldMerger::allocate_field_length_scanner() } bool -FieldMerger::open_input_field_readers(const WordNumMappingList& list, std::vector<std::unique_ptr<FieldReader>>& readers) +FieldMerger::open_input_field_readers() { + _readers.reserve(_fusion_out_index.get_old_indexes().size()); SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id); auto field_length_scanner = allocate_field_length_scanner(); vespalib::string indexName = index.getName(); @@ -221,31 +233,35 @@ FieldMerger::open_input_field_readers(const WordNumMappingList& list, std::vecto continue; // drop data } auto reader = FieldReader::allocFieldReader(index, oldSchema, field_length_scanner); - reader->setup(list[oi.getIndex()], oi.getDocIdMapping()); + reader->setup(_word_num_mappings[oi.getIndex()], oi.getDocIdMapping()); if (!reader->open(oi.getPath() + "/" + indexName + "/", _fusion_out_index.get_tune_file_indexing()._read)) { return false; } - readers.push_back(std::move(reader)); + _readers.push_back(std::move(reader)); } return true; } bool -FieldMerger::open_field_writer(FieldWriter& writer, const FieldLengthInfo& field_length_info) +FieldMerger::open_field_writer() { + FieldLengthInfo field_length_info; + if (!_readers.empty()) { + field_length_info = _readers.back()->get_field_length_info(); + } SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id); - if (!writer.open(_field_dir + "/", 64, 262144, _fusion_out_index.get_dynamic_k_pos_index_format(), - index.use_interleaved_features(), index.getSchema(), - index.getIndex(), - field_length_info, - _fusion_out_index.get_tune_file_indexing()._write, _fusion_out_index.get_file_header_context())) { + if (!_writer->open(_field_dir + "/", 64, 262144, _fusion_out_index.get_dynamic_k_pos_index_format(), + index.use_interleaved_features(), index.getSchema(), + index.getIndex(), + field_length_info, + _fusion_out_index.get_tune_file_indexing()._write, _fusion_out_index.get_file_header_context())) { throw IllegalArgumentException(make_string("Could not open output posocc + dictionary in %s", _field_dir.c_str())); } return true; } bool -FieldMerger::select_cooked_or_raw_features(FieldReader& reader, FieldWriter& writer) +FieldMerger::select_cooked_or_raw_features(FieldReader& reader) { bool rawFormatOK = true; bool cookedFormatOK = true; @@ -258,7 +274,7 @@ FieldMerger::select_cooked_or_raw_features(FieldReader& reader, FieldWriter& wri return true; } { - writer.getFeatureParams(featureParams); + _writer->getFeatureParams(featureParams); cookedFormat = featureParams.getStr("cookedEncoding"); rawFormat = featureParams.getStr("encoding"); if (rawFormat == "") { @@ -304,65 +320,63 @@ FieldMerger::select_cooked_or_raw_features(FieldReader& reader, FieldWriter& wri } bool -FieldMerger::setup_merge_heap(const std::vector<std::unique_ptr<FieldReader>>& readers, FieldWriter& writer, PostingPriorityQueue<FieldReader>& heap) +FieldMerger::setup_merge_heap() { - for (auto &reader : readers) { - if (!select_cooked_or_raw_features(*reader, writer)) { + _heap = std::make_unique<PostingPriorityQueue<FieldReader>>(); + for (auto &reader : _readers) { + if (!select_cooked_or_raw_features(*reader)) { return false; } if (reader->isValid()) { reader->read(); } if (reader->isValid()) { - heap.initialAdd(reader.get()); + _heap->initialAdd(reader.get()); } } return true; } bool -FieldMerger::merge_postings(const WordNumMappingList& list, uint64_t numWordIds, const IFlushToken& flush_token) +FieldMerger::merge_postings() { SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id); - std::vector<std::unique_ptr<FieldReader>> readers; - PostingPriorityQueue<FieldReader> heap; /* OUTPUT */ - FieldWriter fieldWriter(_fusion_out_index.get_doc_id_limit(), numWordIds); + _writer = std::make_unique<FieldWriter>(_fusion_out_index.get_doc_id_limit(), _num_word_ids); vespalib::string indexName = index.getName(); - if (!open_input_field_readers(list, readers)) { + if (!open_input_field_readers()) { return false; } - FieldLengthInfo field_length_info; - if (!readers.empty()) { - field_length_info = readers.back()->get_field_length_info(); - } - if (!open_field_writer(fieldWriter, field_length_info)) { + if (!open_field_writer()) { return false; } - if (!setup_merge_heap(readers, fieldWriter, heap)) { + if (!setup_merge_heap()) { return false; } - heap.merge(fieldWriter, 4, flush_token); - if (flush_token.stop_requested()) { + _heap->merge(*_writer, 4, *_flush_token); + if (_flush_token->stop_requested()) { return false; } - assert(heap.empty()); + assert(_heap->empty()); + _heap.reset(); - for (auto &reader : readers) { + for (auto &reader : _readers) { if (!reader->close()) { return false; } } - if (!fieldWriter.close()) { + _readers.clear(); + if (!_writer->close()) { throw IllegalArgumentException(make_string("Could not close output posocc + dictionary in %s", _field_dir.c_str())); } + _writer.reset(); return true; } bool -FieldMerger::merge_field(std::shared_ptr<IFlushToken> flush_token) +FieldMerger::merge_field() { const Schema &schema = _fusion_out_index.get_schema(); SchemaUtil::IndexIterator index(schema, _id); @@ -381,10 +395,8 @@ FieldMerger::merge_field(std::shared_ptr<IFlushToken> flush_token) make_tmp_dirs(); - WordNumMappingList list(_fusion_out_index.get_old_indexes().size()); - uint64_t numWordIds(0); - if (!renumber_word_ids(list, numWordIds, *flush_token)) { - if (flush_token->stop_requested()) { + if (!renumber_word_ids()) { + if (_flush_token->stop_requested()) { return false; } LOG(error, "Could not renumber field word ids for field %s dir %s", indexName.c_str(), _field_dir.c_str()); @@ -392,9 +404,9 @@ FieldMerger::merge_field(std::shared_ptr<IFlushToken> flush_token) } // Tokamak - bool res = merge_postings(list, numWordIds, *flush_token); + bool res = merge_postings(); if (!res) { - if (flush_token->stop_requested()) { + if (_flush_token->stop_requested()) { return false; } throw IllegalArgumentException(make_string("Could not merge field postings for field %s dir %s", diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.h b/searchlib/src/vespa/searchlib/diskindex/field_merger.h index 31c2818cf17..a57005e18a4 100644 --- a/searchlib/src/vespa/searchlib/diskindex/field_merger.h +++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.h @@ -12,8 +12,6 @@ class IFlushToken; template <class IN> class PostingPriorityQueue; } -namespace search::index { class FieldLengthInfo; } - namespace search::diskindex { class DictionaryWordReader; @@ -33,22 +31,30 @@ class FieldMerger uint32_t _id; vespalib::string _field_dir; const FusionOutputIndex& _fusion_out_index; + std::shared_ptr<IFlushToken> _flush_token; + std::vector<std::unique_ptr<DictionaryWordReader>> _word_readers; + std::unique_ptr<PostingPriorityQueue<DictionaryWordReader>> _word_heap; + WordNumMappingList _word_num_mappings; + uint64_t _num_word_ids; + std::vector<std::unique_ptr<FieldReader>> _readers; + std::unique_ptr<PostingPriorityQueue<FieldReader>> _heap; + std::unique_ptr<FieldWriter> _writer; void make_tmp_dirs(); bool clean_tmp_dirs(); - bool open_input_word_readers(std::vector<std::unique_ptr<DictionaryWordReader>>& readers, PostingPriorityQueue<DictionaryWordReader>& heap); - bool read_mapping_files(WordNumMappingList& list); - bool renumber_word_ids(WordNumMappingList& list, uint64_t& numWordIds, const IFlushToken& flush_token); + bool open_input_word_readers(); + bool read_mapping_files(); + bool renumber_word_ids(); std::shared_ptr<FieldLengthScanner> allocate_field_length_scanner(); - bool open_input_field_readers(const WordNumMappingList& list, std::vector<std::unique_ptr<FieldReader>>& readers); - bool open_field_writer(FieldWriter& writer, const index::FieldLengthInfo& field_length_info); - bool select_cooked_or_raw_features(FieldReader& reader, FieldWriter& writer); - bool setup_merge_heap(const std::vector<std::unique_ptr<FieldReader>>& readers, FieldWriter& writer, PostingPriorityQueue<FieldReader>& heap); - bool merge_postings(const WordNumMappingList& list, uint64_t numWordIds, const IFlushToken& flush_token); + bool open_input_field_readers(); + bool open_field_writer(); + bool select_cooked_or_raw_features(FieldReader& reader); + bool setup_merge_heap(); + bool merge_postings(); public: - FieldMerger(uint32_t id, const FusionOutputIndex& fusion_out_index); + FieldMerger(uint32_t id, const FusionOutputIndex& fusion_out_index, std::shared_ptr<IFlushToken> flush_token); ~FieldMerger(); - bool merge_field(std::shared_ptr<IFlushToken> flush_token); + bool merge_field(); }; } diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp index 5891849959b..eafbbac361b 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp @@ -86,8 +86,8 @@ Fusion::mergeFields(vespalib::ThreadExecutor & executor, std::shared_ptr<IFlushT for (SchemaUtil::IndexIterator iter(schema); iter.isValid(); ++iter) { concurrent.wait(); executor.execute(vespalib::makeLambdaTask([this, index=iter.getIndex(), &failed, &done, &concurrent, flush_token]() { - FieldMerger merger(index, _fusion_out_index); - if (!merger.merge_field(flush_token)) { + FieldMerger merger(index, _fusion_out_index, flush_token); + if (!merger.merge_field()) { failed++; } concurrent.post(); |