diff options
-rw-r--r-- | searchlib/src/vespa/searchlib/diskindex/field_merger.cpp | 124 | ||||
-rw-r--r-- | searchlib/src/vespa/searchlib/diskindex/field_merger.h | 18 |
2 files changed, 103 insertions, 39 deletions
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp index 67ec7faa621..6179f06c9da 100644 --- a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp @@ -63,7 +63,9 @@ FieldMerger::FieldMerger(uint32_t id, const FusionOutputIndex& fusion_out_index, _num_word_ids(0), _readers(), _heap(), - _writer() + _writer(), + _state(State::MERGE_START), + _failed(false) { } @@ -209,16 +211,14 @@ FieldMerger::renumber_word_ids_finish() return true; } -bool -FieldMerger::renumber_word_ids() +void +FieldMerger::renumber_word_ids_failed() { - if (!renumber_word_ids_start()) { - return false; - } - if (!renumber_word_ids_main()) { - return false; + _failed = true; + if (_flush_token->stop_requested()) { + return; } - return renumber_word_ids_finish(); + LOG(error, "Could not renumber field word ids for field %s dir %s", _field_name.c_str(), _field_dir.c_str()); } std::shared_ptr<FieldLengthScanner> @@ -403,30 +403,31 @@ FieldMerger::merge_postings_finish() return true; } -bool -FieldMerger::merge_postings() +void +FieldMerger::merge_postings_failed() { - if (!merge_postings_start()) { - return false; - } - if (!merge_postings_main()) { - return false; + _failed = true; + if (_flush_token->stop_requested()) { + return; } - return merge_postings_finish(); + throw IllegalArgumentException(make_string("Could not merge field postings for field %s dir %s", + _field_name.c_str(), _field_dir.c_str())); } -bool -FieldMerger::merge_field() +void +FieldMerger::merge_field_start() { const Schema &schema = _fusion_out_index.get_schema(); SchemaUtil::IndexIterator index(schema, _id); SchemaUtil::IndexSettings settings = index.getIndexSettings(); if (settings.hasError()) { - return false; + _failed = true; + return; } if (FileKit::hasStamp(_field_dir + "/.mergeocc_done")) { - return true; + _state = State::MERGE_DONE; + return; } vespalib::mkdir(_field_dir, false); @@ -434,35 +435,84 @@ FieldMerger::merge_field() make_tmp_dirs(); - if (!renumber_word_ids()) { - if (_flush_token->stop_requested()) { - return false; - } - LOG(error, "Could not renumber field word ids for field %s dir %s", _field_name.c_str(), _field_dir.c_str()); - return false; + if (!renumber_word_ids_start()) { + renumber_word_ids_failed(); + return; } + _state = State::RENUMBER_WORD_IDS; +} - // Tokamak - bool res = merge_postings(); +void +FieldMerger::merge_field_finish() +{ + bool res = merge_postings_finish(); if (!res) { - if (_flush_token->stop_requested()) { - return false; - } - throw IllegalArgumentException(make_string("Could not merge field postings for field %s dir %s", - _field_name.c_str(), _field_dir.c_str())); + merge_postings_failed(); + _failed = true; + return; } if (!FileKit::createStamp(_field_dir + "/.mergeocc_done")) { - return false; + _failed = true; + return; } vespalib::File::sync(_field_dir); if (!clean_tmp_dirs()) { - return false; + _failed = true; + return; } LOG(debug, "Finished merge_field for field %s dir %s", _field_name.c_str(), _field_dir.c_str()); - return true; + _state = State::MERGE_DONE; +} + +void +FieldMerger::process_merge_field() +{ + switch (_state) { + case State::MERGE_START: + merge_field_start(); + break; + case State::RENUMBER_WORD_IDS: + if (!renumber_word_ids_main()) { + renumber_word_ids_failed(); + } else { + _state = State::RENUMBER_WORD_IDS_FINISH; + } + break; + case State::RENUMBER_WORD_IDS_FINISH: + if (!renumber_word_ids_finish()) { + renumber_word_ids_failed(); + } else if (!merge_postings_start()) { + merge_postings_failed(); + } else { + _state = State::MERGE_POSTINGS; + } + break; + case State::MERGE_POSTINGS: + if (!merge_postings_main()) { + merge_postings_failed(); + } else { + _state = State::MERGE_POSTINGS_FINISH; + } + break; + case State::MERGE_POSTINGS_FINISH: + merge_field_finish(); + break; + case State::MERGE_DONE: + default: + LOG_ABORT("should not be reached"); + } +} + +bool +FieldMerger::merge_field() +{ + while (!_failed && _state != State::MERGE_DONE) { + process_merge_field(); + } + return !_failed; } } diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.h b/searchlib/src/vespa/searchlib/diskindex/field_merger.h index bbb2f210511..c5ce337e845 100644 --- a/searchlib/src/vespa/searchlib/diskindex/field_merger.h +++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.h @@ -29,6 +29,15 @@ class FieldMerger { using WordNumMappingList = std::vector<WordNumMapping>; + enum class State { + MERGE_START, + RENUMBER_WORD_IDS, + RENUMBER_WORD_IDS_FINISH, + MERGE_POSTINGS, + MERGE_POSTINGS_FINISH, + MERGE_DONE + }; + uint32_t _id; vespalib::string _field_name; vespalib::string _field_dir; @@ -42,6 +51,8 @@ class FieldMerger std::vector<std::unique_ptr<FieldReader>> _readers; std::unique_ptr<PostingPriorityQueueMerger<FieldReader, FieldWriter>> _heap; std::unique_ptr<FieldWriter> _writer; + State _state; + bool _failed; void make_tmp_dirs(); bool clean_tmp_dirs(); @@ -50,7 +61,7 @@ class FieldMerger bool renumber_word_ids_start(); bool renumber_word_ids_main(); bool renumber_word_ids_finish(); - bool renumber_word_ids(); + void renumber_word_ids_failed(); std::shared_ptr<FieldLengthScanner> allocate_field_length_scanner(); bool open_input_field_readers(); bool open_field_writer(); @@ -59,10 +70,13 @@ class FieldMerger bool merge_postings_start(); bool merge_postings_main(); bool merge_postings_finish(); - bool merge_postings(); + void merge_postings_failed(); public: FieldMerger(uint32_t id, const FusionOutputIndex& fusion_out_index, std::shared_ptr<IFlushToken> flush_token); ~FieldMerger(); + void merge_field_start(); + void merge_field_finish(); + void process_merge_field(); // Called multiple times bool merge_field(); }; |