diff options
author | Tor Egge <Tor.Egge@online.no> | 2022-01-09 15:52:25 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2022-01-09 16:48:03 +0100 |
commit | 70231237adb460ffa0cf3e289880aa3115432fba (patch) | |
tree | c3a829d9bf90057d3f7e7220ad560099cd98e495 /searchlib | |
parent | 9bfd641c4c4805f0ceb24deb607dd441bf93baa2 (diff) |
Simplify use of search::diskindex::Fusion class.
Diffstat (limited to 'searchlib')
7 files changed, 88 insertions, 79 deletions
diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp index 6794b9c0f5c..ccc5e8cbade 100644 --- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp +++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp @@ -357,7 +357,6 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire ib.setPrefix(dump2dir); uint32_t numDocs = 12 + 1; uint32_t numWords = fic.getNumUniqueWords(); - bool dynamicKPosOcc = false; MockFieldLengthInspector mock_field_length_inspector; TuneFileIndexing tuneFileIndexing; TuneFileSearch tuneFileSearch; @@ -392,9 +391,9 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire std::vector<vespalib::string> sources; SelectorArray selector(numDocs, 0); sources.push_back(prefix + "dump2"); - ASSERT_TRUE(Fusion::merge(schema, prefix + "dump3", sources, selector, - dynamicKPosOcc, - tuneFileIndexing,fileHeaderContext, executor, std::make_shared<FlushToken>())); + Fusion fusion(schema, prefix + "dump3", sources, selector, + tuneFileIndexing,fileHeaderContext); + ASSERT_TRUE(fusion.merge(executor, std::make_shared<FlushToken>())); } while (0); do { DiskIndex dw3(prefix + "dump3"); @@ -405,9 +404,9 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire std::vector<vespalib::string> sources; SelectorArray selector(numDocs, 0); sources.push_back(prefix + "dump3"); - ASSERT_TRUE(Fusion::merge(schema2, prefix + "dump4", sources, selector, - dynamicKPosOcc, - tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>())); + Fusion fusion(schema2, prefix + "dump4", sources, selector, + tuneFileIndexing, fileHeaderContext); + ASSERT_TRUE(fusion.merge(executor, std::make_shared<FlushToken>())); } while (0); do { DiskIndex dw4(prefix + "dump4"); @@ -418,9 +417,9 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire std::vector<vespalib::string> sources; SelectorArray selector(numDocs, 0); sources.push_back(prefix + "dump3"); - ASSERT_TRUE(Fusion::merge(schema3, prefix + "dump5", sources, selector, - dynamicKPosOcc, - tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>())); + Fusion fusion(schema3, prefix + "dump5", sources, selector, + tuneFileIndexing, fileHeaderContext); + ASSERT_TRUE(fusion.merge(executor, std::make_shared<FlushToken>())); } while (0); do { DiskIndex dw5(prefix + "dump5"); @@ -431,9 +430,10 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire std::vector<vespalib::string> sources; SelectorArray selector(numDocs, 0); sources.push_back(prefix + "dump3"); - ASSERT_TRUE(Fusion::merge(schema, prefix + "dump6", sources, selector, - !dynamicKPosOcc, - tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>())); + Fusion fusion(schema, prefix + "dump6", sources, selector, + tuneFileIndexing, fileHeaderContext); + fusion.set_dynamic_k_pos_index_format(true); + ASSERT_TRUE(fusion.merge(executor, std::make_shared<FlushToken>())); } while (0); do { DiskIndex dw6(prefix + "dump6"); @@ -444,9 +444,9 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire std::vector<vespalib::string> sources; SelectorArray selector(numDocs, 0); sources.push_back(prefix + "dump2"); - ASSERT_TRUE(Fusion::merge(schema, prefix + "dump3", sources, selector, - dynamicKPosOcc, - tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>())); + Fusion fusion(schema, prefix + "dump3", sources, selector, + tuneFileIndexing, fileHeaderContext); + ASSERT_TRUE(fusion.merge(executor, std::make_shared<FlushToken>())); } while (0); do { DiskIndex dw3(prefix + "dump3"); @@ -487,9 +487,9 @@ FusionTest::try_merge_simple_indexes(const vespalib::string &dump_dir, const std TuneFileIndexing tuneFileIndexing; DummyFileHeaderContext fileHeaderContext; SelectorArray selector(20, 0); - return Fusion::merge(_schema, dump_dir, sources, selector, - false, - tuneFileIndexing, fileHeaderContext, executor, flush_token); + Fusion fusion(_schema, dump_dir, sources, selector, + tuneFileIndexing, fileHeaderContext); + return fusion.merge(executor, flush_token); } void diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp index 1afed18cb48..12552f09027 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp @@ -34,6 +34,7 @@ namespace { std::vector<FusionInputIndex> createInputIndexes(const std::vector<vespalib::string> & sources, const SelectorArray &selector) { + assert(sources.size() <= 255); // due to source selector data type std::vector<FusionInputIndex> indexes; indexes.reserve(sources.size()); uint32_t i = 0; @@ -43,17 +44,28 @@ createInputIndexes(const std::vector<vespalib::string> & sources, const Selector return indexes; } +uint32_t calc_trimmed_doc_id_limit(const SelectorArray& selector, const std::vector<vespalib::string>& sources) +{ + uint32_t docIdLimit = selector.size(); + uint32_t trimmed_doc_id_limit = docIdLimit; + + // Limit docIdLimit in output based on selections that cannot be satisfied + uint32_t sources_size = sources.size(); + while (trimmed_doc_id_limit > 0 && selector[trimmed_doc_id_limit - 1] >= sources_size) { + --trimmed_doc_id_limit; + } + return trimmed_doc_id_limit; +} + } -Fusion::Fusion(uint32_t docIdLimit, const Schema & schema, const vespalib::string & dir, - const std::vector<vespalib::string> & sources, const SelectorArray &selector, - bool dynamicKPosIndexFormat, const TuneFileIndexing &tuneFileIndexing, - const FileHeaderContext &fileHeaderContext) - : _fusion_out_index(schema, dir, createInputIndexes(sources, selector), docIdLimit, dynamicKPosIndexFormat, tuneFileIndexing, fileHeaderContext) +Fusion::Fusion(const Schema& schema, const vespalib::string& dir, + const std::vector<vespalib::string>& sources, const SelectorArray& selector, + const TuneFileIndexing& tuneFileIndexing, + const FileHeaderContext& fileHeaderContext) + : _old_indexes(createInputIndexes(sources, selector)), + _fusion_out_index(schema, dir, _old_indexes, calc_trimmed_doc_id_limit(selector, sources), tuneFileIndexing, fileHeaderContext) { - if (!readSchemaFiles()) { - throw IllegalArgumentException("Cannot read schema files for source indexes"); - } } Fusion::~Fusion() = default; @@ -102,51 +114,41 @@ Fusion::readSchemaFiles() } bool -Fusion::merge(const Schema &schema, const vespalib::string &dir, const std::vector<vespalib::string> &sources, - const SelectorArray &selector, bool dynamicKPosOccFormat, - const TuneFileIndexing &tuneFileIndexing, const FileHeaderContext &fileHeaderContext, - vespalib::ThreadExecutor & executor, - std::shared_ptr<IFlushToken> flush_token) +Fusion::merge(vespalib::ThreadExecutor& executor, std::shared_ptr<IFlushToken> flush_token) { - assert(sources.size() <= 255); - uint32_t docIdLimit = selector.size(); - uint32_t trimmedDocIdLimit = docIdLimit; - - // Limit docIdLimit in output based on selections that cannot be satisfied - uint32_t sourcesSize = sources.size(); - while (trimmedDocIdLimit > 0 && selector[trimmedDocIdLimit - 1] >= sourcesSize) { - --trimmedDocIdLimit; - } - FastOS_StatInfo statInfo; - if (!FastOS_File::Stat(dir.c_str(), &statInfo)) { + if (!FastOS_File::Stat(_fusion_out_index.get_path().c_str(), &statInfo)) { if (statInfo._error != FastOS_StatInfo::FileNotFound) { - LOG(error, "Could not stat \"%s\"", dir.c_str()); + LOG(error, "Could not stat \"%s\"", _fusion_out_index.get_path().c_str()); return false; } } else { if (!statInfo._isDirectory) { - LOG(error, "\"%s\" is not a directory", dir.c_str()); + LOG(error, "\"%s\" is not a directory", _fusion_out_index.get_path().c_str()); return false; } - search::DirectoryTraverse dt(dir.c_str()); + search::DirectoryTraverse dt(_fusion_out_index.get_path().c_str()); if (!dt.RemoveTree()) { - LOG(error, "Failed to clean directory \"%s\"", dir.c_str()); + LOG(error, "Failed to clean directory \"%s\"", _fusion_out_index.get_path().c_str()); return false; } } - vespalib::mkdir(dir, false); - schema.saveToFile(dir + "/schema.txt"); - if (!DocumentSummary::writeDocIdLimit(dir, trimmedDocIdLimit)) { - LOG(error, "Could not write docsum count in dir %s: %s", dir.c_str(), getLastErrorString().c_str()); + vespalib::mkdir(_fusion_out_index.get_path(), false); + _fusion_out_index.get_schema().saveToFile(_fusion_out_index.get_path() + "/schema.txt"); + if (!DocumentSummary::writeDocIdLimit(_fusion_out_index.get_path(), _fusion_out_index.get_doc_id_limit())) { + LOG(error, "Could not write docsum count in dir %s: %s", _fusion_out_index.get_path().c_str(), getLastErrorString().c_str()); return false; } try { - auto fusion = std::make_unique<Fusion>(trimmedDocIdLimit, schema, dir, sources, selector, - dynamicKPosOccFormat, tuneFileIndexing, fileHeaderContext); - return fusion->mergeFields(executor, flush_token); + for (auto& old_index : _old_indexes) { + old_index.setup(); + } + if (!readSchemaFiles()) { + throw IllegalArgumentException("Cannot read schema files for source indexes"); + } + return mergeFields(executor, flush_token); } catch (const std::exception & e) { LOG(error, "%s", e.what()); return false; diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.h b/searchlib/src/vespa/searchlib/diskindex/fusion.h index 7e0b70dca36..e905006bf14 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion.h +++ b/searchlib/src/vespa/searchlib/diskindex/fusion.h @@ -31,21 +31,18 @@ private: const Schema &getSchema() const { return _fusion_out_index.get_schema(); } + std::vector<FusionInputIndex> _old_indexes; FusionOutputIndex _fusion_out_index; public: Fusion(const Fusion &) = delete; Fusion& operator=(const Fusion &) = delete; - Fusion(uint32_t docIdLimit, const Schema &schema, const vespalib::string &dir, - const std::vector<vespalib::string> & sources, const SelectorArray &selector, bool dynamicKPosIndexFormat, - const TuneFileIndexing &tuneFileIndexing, const common::FileHeaderContext &fileHeaderContext); + Fusion(const Schema& schema, const vespalib::string& dir, + const std::vector<vespalib::string>& sources, const SelectorArray& selector, + const TuneFileIndexing& tuneFileIndexing, const common::FileHeaderContext& fileHeaderContext); ~Fusion(); - - static bool - merge(const Schema &schema, const vespalib::string &dir, const std::vector<vespalib::string> &sources, - const SelectorArray &docIdSelector, bool dynamicKPosOccFormat, const TuneFileIndexing &tuneFileIndexing, - const common::FileHeaderContext &fileHeaderContext, vespalib::ThreadExecutor & executor, - std::shared_ptr<IFlushToken> flush_token); + void set_dynamic_k_pos_index_format(bool dynamic_k_pos_index_format) { _fusion_out_index.set_dynamic_k_pos_index_format(dynamic_k_pos_index_format); } + bool merge(vespalib::ThreadExecutor& executor, std::shared_ptr<IFlushToken> flush_token); }; } diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.cpp index 278d095b639..51c365957d9 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.cpp @@ -14,21 +14,28 @@ namespace search::diskindex { FusionInputIndex::FusionInputIndex(const vespalib::string& path, uint32_t index, const SelectorArray& selector) : _path(path), _index(index), - _schema() + _selector(&selector), + _schema(), + _docIdMapping() { - vespalib::string fname = path + "/schema.txt"; +} + +FusionInputIndex::~FusionInputIndex() = default; + +void +FusionInputIndex::setup() +{ + vespalib::string fname = _path + "/schema.txt"; if ( ! _schema.loadFromFile(fname)) { throw IllegalArgumentException(make_string("Failed loading schema %s", fname.c_str())); } if ( ! SchemaUtil::validateSchema(_schema)) { throw IllegalArgumentException(make_string("Failed validating schema %s", fname.c_str())); } - if (!_docIdMapping.readDocIdLimit(path)) { - throw IllegalArgumentException(make_string("Cannot determine docIdLimit for old index \"%s\"", path.c_str())); + if (!_docIdMapping.readDocIdLimit(_path)) { + throw IllegalArgumentException(make_string("Cannot determine docIdLimit for old index \"%s\"", _path.c_str())); } - _docIdMapping.setup(_docIdMapping._docIdLimit, &selector, index); + _docIdMapping.setup(_docIdMapping._docIdLimit, _selector, _index); } -FusionInputIndex::~FusionInputIndex() = default; - } diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.h b/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.h index fd7bb8f0256..6606e00d73b 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.h +++ b/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.h @@ -14,10 +14,11 @@ namespace search::diskindex { class FusionInputIndex { private: - vespalib::string _path; - uint32_t _index; - index::Schema _schema; - DocIdMapping _docIdMapping; + vespalib::string _path; + uint32_t _index; + const SelectorArray* _selector; + index::Schema _schema; + DocIdMapping _docIdMapping; public: FusionInputIndex(const vespalib::string& path, uint32_t index, const SelectorArray& selector); @@ -25,6 +26,7 @@ public: FusionInputIndex & operator = (FusionInputIndex&&) = default; ~FusionInputIndex(); + void setup(); const vespalib::string& getPath() const noexcept { return _path; } uint32_t getIndex() const noexcept { return _index; } const DocIdMapping& getDocIdMapping() const noexcept { return _docIdMapping; } diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp index 66ec0889cbe..7c63eb0f940 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp @@ -5,12 +5,12 @@ namespace search::diskindex { -FusionOutputIndex::FusionOutputIndex(const index::Schema& schema, const vespalib::string& path, std::vector<FusionInputIndex> old_indexes, uint32_t doc_id_limit, bool dynamic_k_pos_index_format, const TuneFileIndexing& tune_file_indexing, const common::FileHeaderContext& file_header_context) +FusionOutputIndex::FusionOutputIndex(const index::Schema& schema, const vespalib::string& path, const std::vector<FusionInputIndex>& old_indexes, uint32_t doc_id_limit, const TuneFileIndexing& tune_file_indexing, const common::FileHeaderContext& file_header_context) : _schema(schema), _path(path), _old_indexes(std::move(old_indexes)), _doc_id_limit(doc_id_limit), - _dynamic_k_pos_index_format(dynamic_k_pos_index_format), + _dynamic_k_pos_index_format(false), _tune_file_indexing(tune_file_indexing), _file_header_context(file_header_context) { diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h index c366b111363..59cda4b33de 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h +++ b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h @@ -22,15 +22,16 @@ class FusionOutputIndex private: const index::Schema& _schema; const vespalib::string _path; - const std::vector<FusionInputIndex> _old_indexes; + const std::vector<FusionInputIndex>& _old_indexes; const uint32_t _doc_id_limit; - const bool _dynamic_k_pos_index_format; + bool _dynamic_k_pos_index_format; const TuneFileIndexing& _tune_file_indexing; const common::FileHeaderContext& _file_header_context; public: - FusionOutputIndex(const index::Schema& schema, const vespalib::string& path, std::vector<FusionInputIndex> old_indexes, uint32_t doc_id_limit, bool dynamic_k_pos_index_format, const TuneFileIndexing& tune_file_indexing, const common::FileHeaderContext& file_header_context); + FusionOutputIndex(const index::Schema& schema, const vespalib::string& path, const std::vector<FusionInputIndex>& old_indexes, uint32_t doc_id_limit, const TuneFileIndexing& tune_file_indexing, const common::FileHeaderContext& file_header_context); ~FusionOutputIndex(); + void set_dynamic_k_pos_index_format(bool dynamic_k_pos_index_format) { _dynamic_k_pos_index_format = dynamic_k_pos_index_format; } const index::Schema& get_schema() const noexcept { return _schema; } const vespalib::string& get_path() const noexcept { return _path; } const std::vector<FusionInputIndex>& get_old_indexes() const noexcept { return _old_indexes; } |