aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2022-01-15 17:15:56 +0100
committerTor Egge <Tor.Egge@online.no>2022-01-16 00:16:08 +0100
commitb7bbe73ab4abf0c13fa6618b44a676def05a5958 (patch)
tree350f217ea0fcc1035f302f496269776f19681a73
parent8c23868cd630aa1241c6e26903d2b72c98862f99 (diff)
Split element length scanning into smaller chunks.
-rw-r--r--searchlib/src/tests/diskindex/fusion/fusion_test.cpp20
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/field_merger.cpp99
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/field_merger.h14
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fieldreader.cpp33
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fieldreader.h5
5 files changed, 132 insertions, 39 deletions
diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
index 72867edf474..37fac07a8d4 100644
--- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
+++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
@@ -63,12 +63,14 @@ class FusionTest : public ::testing::Test
{
protected:
Schema _schema;
+ bool _force_small_merge_chunk;
const Schema & getSchema() const { return _schema; }
void requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap, bool force_short_merge_chunk);
void make_simple_index(const vespalib::string &dump_dir, const IFieldLengthInspector &field_length_inspector);
bool try_merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources, std::shared_ptr<IFlushToken> flush_token);
void merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources);
+ void reconstruct_interleaved_features();
public:
FusionTest();
};
@@ -494,6 +496,7 @@ FusionTest::try_merge_simple_indexes(const vespalib::string &dump_dir, const std
SelectorArray selector(20, 0);
Fusion fusion(_schema, dump_dir, sources, selector,
tuneFileIndexing, fileHeaderContext);
+ fusion.set_force_small_merge_chunk(_force_small_merge_chunk);
return fusion.merge(executor, flush_token);
}
@@ -505,7 +508,8 @@ FusionTest::merge_simple_indexes(const vespalib::string &dump_dir, const std::ve
FusionTest::FusionTest()
: ::testing::Test(),
- _schema(make_schema(false))
+ _schema(make_schema(false)),
+ _force_small_merge_chunk(false)
{
}
@@ -557,7 +561,8 @@ TEST_F(FusionTest, require_that_average_field_length_is_preserved)
clean_field_length_testdirs();
}
-TEST_F(FusionTest, require_that_interleaved_features_can_be_reconstructed)
+void
+FusionTest::reconstruct_interleaved_features()
{
clean_field_length_testdirs();
make_simple_index("fldump2", MockFieldLengthInspector());
@@ -573,6 +578,17 @@ TEST_F(FusionTest, require_that_interleaved_features_can_be_reconstructed)
clean_field_length_testdirs();
}
+TEST_F(FusionTest, require_that_interleaved_features_can_be_reconstructed)
+{
+ reconstruct_interleaved_features();
+}
+
+TEST_F(FusionTest, require_that_interleaved_features_can_be_reconstructed_with_small_merge_chunk)
+{
+ _force_small_merge_chunk = true;
+ reconstruct_interleaved_features();
+}
+
namespace {
void clean_stopped_fusion_testdirs()
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp
index 68672a0a930..c5ad0c63c21 100644
--- a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp
+++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp
@@ -43,6 +43,7 @@ constexpr uint32_t renumber_word_ids_heap_limit = 4;
constexpr uint32_t renumber_word_ids_merge_chunk = 1000000;
constexpr uint32_t merge_postings_heap_limit = 4;
constexpr uint32_t merge_postings_merge_chunk = 50000;
+constexpr uint32_t scan_chunk = 80000;
vespalib::string
createTmpPath(const vespalib::string & base, uint32_t index) {
@@ -69,6 +70,8 @@ FieldMerger::FieldMerger(uint32_t id, const FusionOutputIndex& fusion_out_index,
_readers(),
_heap(),
_writer(),
+ _field_length_scanner(),
+ _open_reader_idx(std::numeric_limits<uint32_t>::max()),
_state(State::MERGE_START),
_failed(false)
{
@@ -228,7 +231,7 @@ FieldMerger::renumber_word_ids_failed()
LOG(error, "Could not renumber field word ids for field %s dir %s", _field_name.c_str(), _field_dir.c_str());
}
-std::shared_ptr<FieldLengthScanner>
+void
FieldMerger::allocate_field_length_scanner()
{
SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id);
@@ -242,35 +245,72 @@ FieldMerger::allocate_field_length_scanner()
const Schema &old_schema = old_index.getSchema();
if (index.hasOldFields(old_schema) &&
!index.has_matching_use_interleaved_features(old_schema)) {
- return std::make_shared<FieldLengthScanner>(_fusion_out_index.get_doc_id_limit());
+ _field_length_scanner = std::make_shared<FieldLengthScanner>(_fusion_out_index.get_doc_id_limit());
+ return;
}
}
}
}
- return std::shared_ptr<FieldLengthScanner>();
}
bool
+FieldMerger::open_input_field_reader()
+{
+ auto& oi = _fusion_out_index.get_old_indexes()[_open_reader_idx];
+ if (!_readers.back()->open(oi.getPath() + "/" + _field_name + "/", _fusion_out_index.get_tune_file_indexing()._read)) {
+ _readers.pop_back();
+ return false;
+ }
+ return true;
+}
+
+void
FieldMerger::open_input_field_readers()
{
- _readers.reserve(_fusion_out_index.get_old_indexes().size());
SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id);
- auto field_length_scanner = allocate_field_length_scanner();
- for (const auto &oi : _fusion_out_index.get_old_indexes()) {
+ for (; _open_reader_idx < _fusion_out_index.get_old_indexes().size(); ++_open_reader_idx) {
+ auto& oi = _fusion_out_index.get_old_indexes()[_open_reader_idx];
const Schema &oldSchema = oi.getSchema();
if (!index.hasOldFields(oldSchema)) {
continue; // drop data
}
- auto reader = FieldReader::allocFieldReader(index, oldSchema, field_length_scanner);
- reader->setup(_word_num_mappings[oi.getIndex()], oi.getDocIdMapping());
- if (!reader->open(oi.getPath() + "/" + _field_name + "/", _fusion_out_index.get_tune_file_indexing()._read)) {
- return false;
+ _readers.push_back(FieldReader::allocFieldReader(index, oldSchema, _field_length_scanner));
+ auto& reader = *_readers.back();
+ reader.setup(_word_num_mappings[oi.getIndex()], oi.getDocIdMapping());
+ if (!open_input_field_reader()) {
+ merge_postings_failed();
+ return;
+ }
+ if (reader.need_regenerate_interleaved_features_scan()) {
+ _state = State::SCAN_ELEMENT_LENGTHS;
+ return;
}
- _readers.push_back(std::move(reader));
}
- return true;
+ _field_length_scanner.reset();
+ _open_reader_idx = std::numeric_limits<uint32_t>::max();
+ _state = State::OPEN_POSTINGS_FIELD_READERS_FINISH;
+}
+
+void
+FieldMerger::scan_element_lengths()
+{
+ auto& reader = *_readers.back();
+ if (reader.isValid()) {
+ reader.scan_element_lengths(_fusion_out_index.get_force_small_merge_chunk() ? 1u : scan_chunk);
+ if (reader.isValid()) {
+ return;
+ }
+ }
+ reader.close();
+ if (!open_input_field_reader()) {
+ merge_postings_failed();
+ } else {
+ ++_open_reader_idx;
+ _state = State::OPEN_POSTINGS_FIELD_READERS;
+ }
}
+
bool
FieldMerger::open_field_writer()
{
@@ -368,19 +408,25 @@ FieldMerger::setup_merge_heap()
return true;
}
-bool
+void
FieldMerger::merge_postings_start()
{
/* OUTPUT */
_writer = std::make_unique<FieldWriter>(_fusion_out_index.get_doc_id_limit(), _num_word_ids);
+ _readers.reserve(_fusion_out_index.get_old_indexes().size());
+ allocate_field_length_scanner();
+ _open_reader_idx = 0;
+ _state = State::OPEN_POSTINGS_FIELD_READERS;
+}
- if (!open_input_field_readers()) {
- return false;
- }
- if (!open_field_writer()) {
- return false;
+void
+FieldMerger::merge_postings_open_field_readers_done()
+{
+ if (!open_field_writer() || !setup_merge_heap()) {
+ merge_postings_failed();
+ } else {
+ _state = State::MERGE_POSTINGS;
}
- return setup_merge_heap();
}
void
@@ -457,7 +503,6 @@ FieldMerger::merge_field_finish()
bool res = merge_postings_finish();
if (!res) {
merge_postings_failed();
- _failed = true;
return;
}
if (!FileKit::createStamp(_field_dir + "/.mergeocc_done")) {
@@ -489,11 +534,19 @@ FieldMerger::process_merge_field()
case State::RENUMBER_WORD_IDS_FINISH:
if (!renumber_word_ids_finish()) {
renumber_word_ids_failed();
- } else if (!merge_postings_start()) {
- merge_postings_failed();
+ break;
} else {
- _state = State::MERGE_POSTINGS;
+ merge_postings_start();
}
+ [[fallthrough]];
+ case State::OPEN_POSTINGS_FIELD_READERS:
+ open_input_field_readers();
+ break;
+ case State::SCAN_ELEMENT_LENGTHS:
+ scan_element_lengths();
+ break;
+ case State::OPEN_POSTINGS_FIELD_READERS_FINISH:
+ merge_postings_open_field_readers_done();
break;
case State::MERGE_POSTINGS:
merge_postings_main();
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.h b/searchlib/src/vespa/searchlib/diskindex/field_merger.h
index 5017a7d5192..380baa3e060 100644
--- a/searchlib/src/vespa/searchlib/diskindex/field_merger.h
+++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.h
@@ -33,6 +33,9 @@ class FieldMerger
MERGE_START,
RENUMBER_WORD_IDS,
RENUMBER_WORD_IDS_FINISH,
+ OPEN_POSTINGS_FIELD_READERS,
+ SCAN_ELEMENT_LENGTHS,
+ OPEN_POSTINGS_FIELD_READERS_FINISH,
MERGE_POSTINGS,
MERGE_POSTINGS_FINISH,
MERGE_DONE
@@ -51,6 +54,8 @@ class FieldMerger
std::vector<std::unique_ptr<FieldReader>> _readers;
std::unique_ptr<PostingPriorityQueueMerger<FieldReader, FieldWriter>> _heap;
std::unique_ptr<FieldWriter> _writer;
+ std::shared_ptr<FieldLengthScanner> _field_length_scanner;
+ uint32_t _open_reader_idx;
State _state;
bool _failed;
bool _force_small_merge_chunk;
@@ -63,12 +68,15 @@ class FieldMerger
void renumber_word_ids_main();
bool renumber_word_ids_finish();
void renumber_word_ids_failed();
- std::shared_ptr<FieldLengthScanner> allocate_field_length_scanner();
- bool open_input_field_readers();
+ void allocate_field_length_scanner();
+ bool open_input_field_reader();
+ void open_input_field_readers();
+ void scan_element_lengths();
bool open_field_writer();
bool select_cooked_or_raw_features(FieldReader& reader);
bool setup_merge_heap();
- bool merge_postings_start();
+ void merge_postings_start();
+ void merge_postings_open_field_readers_done();
void merge_postings_main();
bool merge_postings_finish();
void merge_postings_failed();
diff --git a/searchlib/src/vespa/searchlib/diskindex/fieldreader.cpp b/searchlib/src/vespa/searchlib/diskindex/fieldreader.cpp
index 202ac2b08ff..1794db8bba3 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fieldreader.cpp
+++ b/searchlib/src/vespa/searchlib/diskindex/fieldreader.cpp
@@ -99,6 +99,16 @@ FieldReader::allowRawFeatures()
return true;
}
+bool
+FieldReader::need_regenerate_interleaved_features_scan()
+{
+ return false;
+}
+
+void
+FieldReader::scan_element_lengths(uint32_t)
+{
+}
void
FieldReader::setup(const WordNumMapping &wordNumMapping,
@@ -277,27 +287,30 @@ FieldReaderStripInfo::open(const vespalib::string &prefix, const TuneFileSeqRead
_regenerate_interleaved_features = true;
}
}
- if (_regenerate_interleaved_features && _hasElements && _field_length_scanner) {
- scan_element_lengths();
- close();
- if (!FieldReader::open(prefix, tuneFileRead)) {
- return false;
- }
- }
return true;
}
+bool
+FieldReaderStripInfo::need_regenerate_interleaved_features_scan()
+{
+ return (_regenerate_interleaved_features && _hasElements && _field_length_scanner);
+}
+
void
-FieldReaderStripInfo::scan_element_lengths()
+FieldReaderStripInfo::scan_element_lengths(uint32_t scan_chunk)
{
- for (;;) {
+ if (!isValid()) {
+ return;
+ }
+ while (scan_chunk != 0u) {
FieldReader::read();
- if (_wordNum == noWordNumHigh()) {
+ if (!isValid()) {
break;
}
DocIdAndFeatures &features = _docIdAndFeatures;
assert(!features.has_raw_data());
_field_length_scanner->scan_features(features);
+ --scan_chunk;
}
}
diff --git a/searchlib/src/vespa/searchlib/diskindex/fieldreader.h b/searchlib/src/vespa/searchlib/diskindex/fieldreader.h
index ef319971fa7..75c8d35740e 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fieldreader.h
+++ b/searchlib/src/vespa/searchlib/diskindex/fieldreader.h
@@ -72,6 +72,8 @@ public:
virtual void read();
virtual bool allowRawFeatures();
+ virtual bool need_regenerate_interleaved_features_scan();
+ virtual void scan_element_lengths(uint32_t scan_chunk);
void write(FieldWriter &writer) {
if (_wordNum != writer.getSparseWordNum()) {
@@ -131,9 +133,10 @@ private:
public:
FieldReaderStripInfo(const IndexIterator &index, std::shared_ptr<FieldLengthScanner>);
bool allowRawFeatures() override;
+ bool need_regenerate_interleaved_features_scan() override;
bool open(const vespalib::string &prefix, const TuneFileSeqRead &tuneFileRead) override;
void read() override;
- void scan_element_lengths();
+ void scan_element_lengths(uint32_t scan_chunk) override;
void getFeatureParams(PostingListParams &params) override;
};