about summary refs log tree commit diff stats
path: root/searchlib
diff options
context:
space:
mode:
author Tor Egge <Tor.Egge@online.no> 2022-01-07 12:29:26 +0100
committer Tor Egge <Tor.Egge@online.no> 2022-01-07 12:29:26 +0100
commit 2e57bf027b5020731ab950d34af7b0fe8d708234 (patch)
tree b6b7e79140adedec1df326477db601e855e980d1 /searchlib
parent ea8199921bb9b46e86ef7f2c715c2e00b0708ade (diff)
Add FieldMerger state machine.
Diffstat (limited to 'searchlib')
-rw-r--r-- searchlib/src/vespa/searchlib/diskindex/field_merger.cpp 124
-rw-r--r-- searchlib/src/vespa/searchlib/diskindex/field_merger.h 18
2 files changed, 103 insertions, 39 deletions
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp
index 67ec7faa621..6179f06c9da 100644
--- a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp
+++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp
@@ -63,7 +63,9 @@ FieldMerger::FieldMerger(uint32_t id, const FusionOutputIndex& fusion_out_index,
_num_word_ids(0),
_readers(),
_heap(),
- _writer()
+ _writer(),
+ _state(State::MERGE_START),
+ _failed(false)
{
}
@@ -209,16 +211,14 @@ FieldMerger::renumber_word_ids_finish()
return true;
}
-bool
-FieldMerger::renumber_word_ids()
+void
+FieldMerger::renumber_word_ids_failed()
{
- if (!renumber_word_ids_start()) {
- return false;
- }
- if (!renumber_word_ids_main()) {
- return false;
+ _failed = true;
+ if (_flush_token->stop_requested()) {
+ return;
}
- return renumber_word_ids_finish();
+ LOG(error, "Could not renumber field word ids for field %s dir %s", _field_name.c_str(), _field_dir.c_str());
}
std::shared_ptr<FieldLengthScanner>
@@ -403,30 +403,31 @@ FieldMerger::merge_postings_finish()
return true;
}
-bool
-FieldMerger::merge_postings()
+void
+FieldMerger::merge_postings_failed()
{
- if (!merge_postings_start()) {
- return false;
- }
- if (!merge_postings_main()) {
- return false;
+ _failed = true;
+ if (_flush_token->stop_requested()) {
+ return;
}
- return merge_postings_finish();
+ throw IllegalArgumentException(make_string("Could not merge field postings for field %s dir %s",
+ _field_name.c_str(), _field_dir.c_str()));
}
-bool
-FieldMerger::merge_field()
+void
+FieldMerger::merge_field_start()
{
const Schema &schema = _fusion_out_index.get_schema();
SchemaUtil::IndexIterator index(schema, _id);
SchemaUtil::IndexSettings settings = index.getIndexSettings();
if (settings.hasError()) {
- return false;
+ _failed = true;
+ return;
}
if (FileKit::hasStamp(_field_dir + "/.mergeocc_done")) {
- return true;
+ _state = State::MERGE_DONE;
+ return;
}
vespalib::mkdir(_field_dir, false);
@@ -434,35 +435,84 @@ FieldMerger::merge_field()
make_tmp_dirs();
- if (!renumber_word_ids()) {
- if (_flush_token->stop_requested()) {
- return false;
- }
- LOG(error, "Could not renumber field word ids for field %s dir %s", _field_name.c_str(), _field_dir.c_str());
- return false;
+ if (!renumber_word_ids_start()) {
+ renumber_word_ids_failed();
+ return;
}
+ _state = State::RENUMBER_WORD_IDS;
+}
- // Tokamak
- bool res = merge_postings();
+void
+FieldMerger::merge_field_finish()
+{
+ bool res = merge_postings_finish();
if (!res) {
- if (_flush_token->stop_requested()) {
- return false;
- }
- throw IllegalArgumentException(make_string("Could not merge field postings for field %s dir %s",
- _field_name.c_str(), _field_dir.c_str()));
+ merge_postings_failed();
+ _failed = true;
+ return;
}
if (!FileKit::createStamp(_field_dir + "/.mergeocc_done")) {
- return false;
+ _failed = true;
+ return;
}
vespalib::File::sync(_field_dir);
if (!clean_tmp_dirs()) {
- return false;
+ _failed = true;
+ return;
}
LOG(debug, "Finished merge_field for field %s dir %s", _field_name.c_str(), _field_dir.c_str());
- return true;
+ _state = State::MERGE_DONE;
+}
+
+void
+FieldMerger::process_merge_field()
+{
+ switch (_state) {
+ case State::MERGE_START:
+ merge_field_start();
+ break;
+ case State::RENUMBER_WORD_IDS:
+ if (!renumber_word_ids_main()) {
+ renumber_word_ids_failed();
+ } else {
+ _state = State::RENUMBER_WORD_IDS_FINISH;
+ }
+ break;
+ case State::RENUMBER_WORD_IDS_FINISH:
+ if (!renumber_word_ids_finish()) {
+ renumber_word_ids_failed();
+ } else if (!merge_postings_start()) {
+ merge_postings_failed();
+ } else {
+ _state = State::MERGE_POSTINGS;
+ }
+ break;
+ case State::MERGE_POSTINGS:
+ if (!merge_postings_main()) {
+ merge_postings_failed();
+ } else {
+ _state = State::MERGE_POSTINGS_FINISH;
+ }
+ break;
+ case State::MERGE_POSTINGS_FINISH:
+ merge_field_finish();
+ break;
+ case State::MERGE_DONE:
+ default:
+ LOG_ABORT("should not be reached");
+ }
+}
+
+bool
+FieldMerger::merge_field()
+{
+ while (!_failed && _state != State::MERGE_DONE) {
+ process_merge_field();
+ }
+ return !_failed;
}
}
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.h b/searchlib/src/vespa/searchlib/diskindex/field_merger.h
index bbb2f210511..c5ce337e845 100644
--- a/searchlib/src/vespa/searchlib/diskindex/field_merger.h
+++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.h
@@ -29,6 +29,15 @@ class FieldMerger
{
using WordNumMappingList = std::vector<WordNumMapping>;
+ enum class State {
+ MERGE_START,
+ RENUMBER_WORD_IDS,
+ RENUMBER_WORD_IDS_FINISH,
+ MERGE_POSTINGS,
+ MERGE_POSTINGS_FINISH,
+ MERGE_DONE
+ };
+
uint32_t _id;
vespalib::string _field_name;
vespalib::string _field_dir;
@@ -42,6 +51,8 @@ class FieldMerger
std::vector<std::unique_ptr<FieldReader>> _readers;
std::unique_ptr<PostingPriorityQueueMerger<FieldReader, FieldWriter>> _heap;
std::unique_ptr<FieldWriter> _writer;
+ State _state;
+ bool _failed;
void make_tmp_dirs();
bool clean_tmp_dirs();
@@ -50,7 +61,7 @@ class FieldMerger
bool renumber_word_ids_start();
bool renumber_word_ids_main();
bool renumber_word_ids_finish();
- bool renumber_word_ids();
+ void renumber_word_ids_failed();
std::shared_ptr<FieldLengthScanner> allocate_field_length_scanner();
bool open_input_field_readers();
bool open_field_writer();
@@ -59,10 +70,13 @@ class FieldMerger
bool merge_postings_start();
bool merge_postings_main();
bool merge_postings_finish();
- bool merge_postings();
+ void merge_postings_failed();
public:
FieldMerger(uint32_t id, const FusionOutputIndex& fusion_out_index, std::shared_ptr<IFlushToken> flush_token);
~FieldMerger();
+ void merge_field_start();
+ void merge_field_finish();
+ void process_merge_field(); // Called multiple times
bool merge_field();
};