diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-05-14 20:07:46 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-05-15 20:53:34 +0000 |
commit | 8e5370fa867ed87d0b9b2b86f742ee14a3825bd7 (patch) | |
tree | 826571fed6668410946d8412f2037e3ac4106a71 | |
parent | a65db0dc5b5a272da545e93408526d738f1f696c (diff) |
- Provide an executor and use it for fusion.
- Use per-field temporary directories.
- Keep WordNumMapping on the stack to make it thread-safe.
12 files changed, 255 insertions, 452 deletions
diff --git a/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp b/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp index b2334ed025e..d0b19d77181 100644 --- a/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp +++ b/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp @@ -17,6 +17,7 @@ LOG_SETUP("feed_and_search_test"); #include <vespa/searchlib/queryeval/searchiterator.h> #include <vespa/searchlib/queryeval/fake_requestcontext.h> #include <vespa/vespalib/testkit/testapp.h> +#include <vespa/vespalib/util/threadstackexecutor.h> #include <sstream> #include <vespa/searchlib/diskindex/fusion.h> #include <vespa/searchlib/common/documentsummary.h> @@ -144,6 +145,7 @@ void Test::testSearch(Searchable &source, // again. void Test::requireThatMemoryIndexCanBeDumpedAndSearched() { Schema schema = getSchema(); + vespalib::ThreadStackExecutor sharedExecutor(2, 0x10000); search::SequencedTaskExecutor indexFieldInverter(2); search::SequencedTaskExecutor indexFieldWriter(2); MemoryIndex memory_index(schema, indexFieldInverter, indexFieldWriter); @@ -167,8 +169,7 @@ void Test::requireThatMemoryIndexCanBeDumpedAndSearched() { const uint64_t num_words = memory_index.getNumWords(); search::TuneFileIndexing tuneFileIndexing; DummyFileHeaderContext fileHeaderContext; - index_builder.open(docIdLimit, num_words, tuneFileIndexing, - fileHeaderContext); + index_builder.open(docIdLimit, num_words, tuneFileIndexing, fileHeaderContext); memory_index.dump(index_builder); index_builder.close(); @@ -177,17 +178,18 @@ void Test::requireThatMemoryIndexCanBeDumpedAndSearched() { std::vector<string> fusionInputs; fusionInputs.push_back(index_dir); uint32_t fusionDocIdLimit = 0; - typedef search::diskindex::Fusion FastS_Fusion; + using Fusion = search::diskindex::Fusion; bool fret1 = DocumentSummary::readDocIdLimit(index_dir, fusionDocIdLimit); ASSERT_TRUE(fret1); SelectorArray selector(fusionDocIdLimit, 0); - bool fret2 = 
FastS_Fusion::merge(schema, - index_dir2, - fusionInputs, - selector, - false /* dynamicKPosOccFormat */, - tuneFileIndexing, - fileHeaderContext); + bool fret2 = Fusion::merge(schema, + index_dir2, + fusionInputs, + selector, + false /* dynamicKPosOccFormat */, + tuneFileIndexing, + fileHeaderContext, + sharedExecutor); ASSERT_TRUE(fret2); // Fusion test with all docs removed in output (doesn't affect word list) @@ -198,13 +200,14 @@ void Test::requireThatMemoryIndexCanBeDumpedAndSearched() { bool fret3 = DocumentSummary::readDocIdLimit(index_dir, fusionDocIdLimit); ASSERT_TRUE(fret3); SelectorArray selector2(fusionDocIdLimit, 1); - bool fret4 = FastS_Fusion::merge(schema, - index_dir3, - fusionInputs, - selector2, - false /* dynamicKPosOccFormat */, - tuneFileIndexing, - fileHeaderContext); + bool fret4 = Fusion::merge(schema, + index_dir3, + fusionInputs, + selector2, + false /* dynamicKPosOccFormat */, + tuneFileIndexing, + fileHeaderContext, + sharedExecutor); ASSERT_TRUE(fret4); // Fusion test with all docs removed in input (affects word list) @@ -215,13 +218,14 @@ void Test::requireThatMemoryIndexCanBeDumpedAndSearched() { bool fret5 = DocumentSummary::readDocIdLimit(index_dir3, fusionDocIdLimit); ASSERT_TRUE(fret5); SelectorArray selector3(fusionDocIdLimit, 0); - bool fret6 = FastS_Fusion::merge(schema, - index_dir4, - fusionInputs, - selector3, - false /* dynamicKPosOccFormat */, - tuneFileIndexing, - fileHeaderContext); + bool fret6 = Fusion::merge(schema, + index_dir4, + fusionInputs, + selector3, + false /* dynamicKPosOccFormat */, + tuneFileIndexing, + fileHeaderContext, + sharedExecutor); ASSERT_TRUE(fret6); DiskIndex disk_index(index_dir); diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp index 1abe9540859..113855d7e2b 100644 --- a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp +++ 
b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp @@ -1,9 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "summarymanager.h" #include "documentstoreadapter.h" #include "summarycompacttarget.h" #include "summaryflushtarget.h" -#include "summarymanager.h" #include <vespa/config/print/ostreamconfigwriter.h> #include <vespa/document/repo/documenttyperepo.h> #include <vespa/juniper/rpinterface.h> @@ -12,6 +12,8 @@ #include <vespa/vespalib/util/lambdatask.h> #include <vespa/searchsummary/docsummary/docsumconfig.h> #include <vespa/vespalib/util/exceptions.h> +#include <vespa/fastlib/text/normwordfolder.h> + #include <sstream> #include <vespa/log/log.h> @@ -84,7 +86,7 @@ SummarySetup(const vespalib::string & baseDir, const DocTypeName & docTypeName, const search::IAttributeManager::SP &attributeMgr, const search::IDocumentStore::SP & docStore, const std::shared_ptr<const DocumentTypeRepo> &repo) : _docsumWriter(), - _wordFolder(), + _wordFolder(std::make_unique<Fast_NormalizeWordFolder>()), _juniperProps(juniperCfg), _juniperConfig(), _attributeMgr(attributeMgr), @@ -103,7 +105,7 @@ SummarySetup(const vespalib::string & baseDir, const DocTypeName & docTypeName, baseDir.c_str(), oss.str().c_str())); } - _juniperConfig = std::make_unique<juniper::Juniper>(&_juniperProps, &_wordFolder); + _juniperConfig = std::make_unique<juniper::Juniper>(&_juniperProps, _wordFolder.get()); _docsumWriter = std::make_unique<DynamicDocsumWriter>(resultConfig.release(), nullptr); DynamicDocsumConfig dynCfg(this, _docsumWriter.get()); dynCfg.configure(summarymapCfg); diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h index 8df65e3e2ff..4fde28e69de 100644 --- a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h +++ 
b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h @@ -10,13 +10,14 @@ #include <vespa/searchlib/docstore/logdocumentstore.h> #include <vespa/searchlib/transactionlog/syncproxy.h> #include <vespa/document/fieldvalue/document.h> -#include <vespa/vespalib/util/threadstackexecutor.h> -#include <vespa/fastlib/text/normwordfolder.h> +#include <vespa/vespalib/util/threadexecutor.h> namespace searchcorespi::index { struct IThreadService; } namespace search { class IBucketizer; } namespace search::common { class FileHeaderContext; } +class Fast_NormalizeWordFolder; + namespace proton { class SummaryManager : public ISummaryManager @@ -25,7 +26,7 @@ public: class SummarySetup : public ISummarySetup { private: std::unique_ptr<search::docsummary::DynamicDocsumWriter> _docsumWriter; - Fast_NormalizeWordFolder _wordFolder; + std::unique_ptr<Fast_NormalizeWordFolder> _wordFolder; search::docsummary::JuniperProperties _juniperProps; std::unique_ptr<juniper::Juniper> _juniperConfig; search::IAttributeManager::SP _attributeMgr; diff --git a/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp b/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp index 2139e34818f..b74cc2c603f 100644 --- a/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp @@ -66,7 +66,7 @@ IndexManager::MaintainerOperations::runFusion(const Schema &schema, SerialNumFileHeaderContext fileHeaderContext(_fileHeaderContext, serialNum); const bool dynamic_k_doc_pos_occ_format = false; return Fusion::merge(schema, outputDir, sources, selectorArray, dynamic_k_doc_pos_occ_format, - _tuneFileIndexing, fileHeaderContext); + _tuneFileIndexing, fileHeaderContext, _threadingService.shared()); } diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp index 24146e516a0..c46869392f7 100644 --- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp +++ 
b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp @@ -16,6 +16,7 @@ #include <vespa/searchlib/memoryindex/field_index_collection.h> #include <vespa/searchlib/memoryindex/posting_iterator.h> #include <vespa/searchlib/util/filekit.h> +#include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/testkit/testapp.h> #include <vespa/log/log.h> @@ -38,7 +39,6 @@ using namespace index; namespace diskindex { - class Test : public vespalib::TestApp { private: @@ -62,8 +62,7 @@ myPushDocument(DocumentInverter &inv, FieldIndexCollection &fieldIndexes) } vespalib::string -toString(FieldPositionsIterator posItr, - bool hasElements = false, bool hasWeights = false) +toString(FieldPositionsIterator posItr, bool hasElements = false, bool hasWeights = false) { vespalib::asciistream ss; ss << "{"; @@ -121,9 +120,7 @@ toString(DocIdAndFeatures &features) void -validateDiskIndex(DiskIndex &dw, - bool f2HasElements, - bool f3HasWeights) +validateDiskIndex(DiskIndex &dw, bool f2HasElements, bool f3HasWeights) { typedef DiskIndex::LookupResult LR; typedef index::PostingListHandle PH; @@ -239,16 +236,13 @@ validateDiskIndex(DiskIndex &dw, void -Test::requireThatFusionIsWorking(const vespalib::string &prefix, - bool directio, - bool readmmap) +Test::requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap) { Schema schema; Schema schema2; Schema schema3; for (SchemaUtil::IndexIterator it(getSchema()); it.isValid(); ++it) { - const Schema::IndexField &iField = - _schema.getIndexField(it.getIndex()); + const Schema::IndexField &iField = _schema.getIndexField(it.getIndex()); schema.addIndexField(Schema::IndexField(iField.getName(), iField.getDataType(), iField.getCollectionType())); @@ -350,6 +344,7 @@ Test::requireThatFusionIsWorking(const vespalib::string &prefix, EXPECT_TRUE(FileKit::hasStamp(tsName)); EXPECT_TRUE(FileKit::removeStamp(tsName)); EXPECT_FALSE(FileKit::hasStamp(tsName)); + vespalib::ThreadStackExecutor executor(4, 0x10000); 
do { DiskIndex dw2(prefix + "dump2"); @@ -362,12 +357,9 @@ Test::requireThatFusionIsWorking(const vespalib::string &prefix, std::vector<vespalib::string> sources; SelectorArray selector(numDocs, 0); sources.push_back(prefix + "dump2"); - if (!EXPECT_TRUE(Fusion::merge(schema, - prefix + "dump3", - sources, selector, + if (!EXPECT_TRUE(Fusion::merge(schema, prefix + "dump3", sources, selector, dynamicKPosOcc, - tuneFileIndexing, - fileHeaderContext))) + tuneFileIndexing,fileHeaderContext, executor))) return; } while (0); do { @@ -380,12 +372,9 @@ Test::requireThatFusionIsWorking(const vespalib::string &prefix, std::vector<vespalib::string> sources; SelectorArray selector(numDocs, 0); sources.push_back(prefix + "dump3"); - if (!EXPECT_TRUE(Fusion::merge(schema2, - prefix + "dump4", - sources, selector, + if (!EXPECT_TRUE(Fusion::merge(schema2, prefix + "dump4", sources, selector, dynamicKPosOcc, - tuneFileIndexing, - fileHeaderContext))) + tuneFileIndexing, fileHeaderContext, executor))) return; } while (0); do { @@ -398,12 +387,9 @@ Test::requireThatFusionIsWorking(const vespalib::string &prefix, std::vector<vespalib::string> sources; SelectorArray selector(numDocs, 0); sources.push_back(prefix + "dump3"); - if (!EXPECT_TRUE(Fusion::merge(schema3, - prefix + "dump5", - sources, selector, + if (!EXPECT_TRUE(Fusion::merge(schema3, prefix + "dump5", sources, selector, dynamicKPosOcc, - tuneFileIndexing, - fileHeaderContext))) + tuneFileIndexing, fileHeaderContext, executor))) return; } while (0); do { @@ -416,12 +402,9 @@ Test::requireThatFusionIsWorking(const vespalib::string &prefix, std::vector<vespalib::string> sources; SelectorArray selector(numDocs, 0); sources.push_back(prefix + "dump3"); - if (!EXPECT_TRUE(Fusion::merge(schema, - prefix + "dump6", - sources, selector, + if (!EXPECT_TRUE(Fusion::merge(schema, prefix + "dump6", sources, selector, !dynamicKPosOcc, - tuneFileIndexing, - fileHeaderContext))) + tuneFileIndexing, fileHeaderContext, executor))) return; 
} while (0); do { @@ -434,12 +417,9 @@ Test::requireThatFusionIsWorking(const vespalib::string &prefix, std::vector<vespalib::string> sources; SelectorArray selector(numDocs, 0); sources.push_back(prefix + "dump2"); - if (!EXPECT_TRUE(Fusion::merge(schema, - prefix + "dump3", - sources, selector, + if (!EXPECT_TRUE(Fusion::merge(schema, prefix + "dump3", sources, selector, dynamicKPosOcc, - tuneFileIndexing, - fileHeaderContext))) + tuneFileIndexing, fileHeaderContext, executor))) return; } while (0); do { diff --git a/searchlib/src/vespa/searchlib/diskindex/docidmapper.cpp b/searchlib/src/vespa/searchlib/diskindex/docidmapper.cpp index f8534ed62ff..489986f1f5c 100644 --- a/searchlib/src/vespa/searchlib/diskindex/docidmapper.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/docidmapper.cpp @@ -2,16 +2,14 @@ #include "docidmapper.h" #include <vespa/searchlib/common/documentsummary.h> -#include <vespa/searchlib/common/bitvector.h> -#include <vespa/fastlib/io/bufferedfile.h> #define NO_DOC static_cast<uint32_t>(-1) namespace search::diskindex { DocIdMapping::DocIdMapping() - : _docIdLimit(0u), - _selector(nullptr), + : _selector(nullptr), + _docIdLimit(0u), _selectorId(0) { } @@ -36,9 +34,7 @@ DocIdMapping::setup(uint32_t docIdLimit) void -DocIdMapping::setup(uint32_t docIdLimit, - const SelectorArray *selector, - uint8_t selectorId) +DocIdMapping::setup(uint32_t docIdLimit, const SelectorArray *selector, uint8_t selectorId) { _docIdLimit = docIdLimit; _selector = selector; diff --git a/searchlib/src/vespa/searchlib/diskindex/docidmapper.h b/searchlib/src/vespa/searchlib/diskindex/docidmapper.h index 08b4865cb0a..a8c33eaed2f 100644 --- a/searchlib/src/vespa/searchlib/diskindex/docidmapper.h +++ b/searchlib/src/vespa/searchlib/diskindex/docidmapper.h @@ -9,13 +9,13 @@ namespace search { class BitVector; } namespace search::diskindex { -typedef vespalib::Array<uint8_t> SelectorArray; +using SelectorArray = vespalib::Array<uint8_t>; class DocIdMapping { public: - 
uint32_t _docIdLimit; const SelectorArray *_selector; // External ownership + uint32_t _docIdLimit; uint8_t _selectorId; DocIdMapping(); @@ -32,7 +32,7 @@ public: const uint8_t *_selector; uint32_t _docIdLimit; // Limit on legal input values uint32_t _selectorLimit; // Limit on output - uint8_t _selectorId; + uint8_t _selectorId; DocIdMapper() : _selector(nullptr), @@ -42,12 +42,9 @@ public: { } void setup(const DocIdMapping &mapping) { - _selector = (mapping._selector != nullptr) ? - &((*mapping._selector)[0]) : nullptr; + _selector = (mapping._selector != nullptr) ? &((*mapping._selector)[0]) : nullptr; _docIdLimit = mapping._docIdLimit; - _selectorLimit = (mapping._selector != nullptr) ? - (*mapping._selector).size() : - 0u; + _selectorLimit = (mapping._selector != nullptr) ? (*mapping._selector).size() : 0u; _selectorId = mapping._selectorId; } diff --git a/searchlib/src/vespa/searchlib/diskindex/fieldreader.cpp b/searchlib/src/vespa/searchlib/diskindex/fieldreader.cpp index 04af3a18026..c4cd6d3a22e 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fieldreader.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/fieldreader.cpp @@ -110,36 +110,26 @@ FieldReader::open(const vespalib::string &prefix, { vespalib::string name = prefix + "posocc.dat.compressed"; FastOS_StatInfo statInfo; - bool statres; - statres = FastOS_File::Stat(name.c_str(), &statInfo); + bool statres = FastOS_File::Stat(name.c_str(), &statInfo); if (!statres) { - LOG(error, - "Could not stat compressed posocc file %s: %s", - name.c_str(), getLastErrorString().c_str()); + LOG(error, "Could not stat compressed posocc file %s: %s", name.c_str(), getLastErrorString().c_str()); return false; } _dictFile = std::make_unique<PageDict4FileSeqRead>(); PostingListParams featureParams; - _oldposoccfile = makePosOccRead(name, - _dictFile.get(), - featureParams, - tuneFileRead); + _oldposoccfile = makePosOccRead(name, _dictFile.get(), featureParams, tuneFileRead); vespalib::string cname = prefix + 
"dictionary"; if (!_dictFile->open(cname, tuneFileRead)) { - LOG(error, - "Could not open posocc count file %s for read", - cname.c_str()); + LOG(error, "Could not open posocc count file %s for read", cname.c_str()); return false; } // open posocc.dat if (!_oldposoccfile || !_oldposoccfile->open(name, tuneFileRead)) { - LOG(error, - "Could not open posocc file %s for read", - name.c_str()); + LOG(error, "Could not open posocc file %s for read", name.c_str()); return false; } _oldWordNum = noWordNum(); @@ -158,8 +148,7 @@ FieldReader::close() if (_oldposoccfile) { bool closeRes = _oldposoccfile->close(); if (!closeRes) { - LOG(error, - "Could not close posocc file for read"); + LOG(error, "Could not close posocc file for read"); ret = false; } _oldposoccfile.reset(); @@ -167,8 +156,7 @@ FieldReader::close() if (_dictFile) { bool closeRes = _dictFile->close(); if (!closeRes) { - LOG(error, - "Could not close posocc file for read"); + LOG(error, "Could not close posocc file for read"); ret = false; } _dictFile.reset(); diff --git a/searchlib/src/vespa/searchlib/diskindex/fieldreader.h b/searchlib/src/vespa/searchlib/diskindex/fieldreader.h index 8e79c892f6e..243da21731b 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fieldreader.h +++ b/searchlib/src/vespa/searchlib/diskindex/fieldreader.h @@ -39,7 +39,7 @@ public: typedef index::PostingListCounts PostingListCounts; typedef index::PostingListParams PostingListParams; - uint64_t _wordNum; + uint64_t _wordNum; DocIdAndFeatures _docIdAndFeatures; protected: std::unique_ptr<DictionaryFileSeqRead> _dictFile; diff --git a/searchlib/src/vespa/searchlib/diskindex/fieldwriter.cpp b/searchlib/src/vespa/searchlib/diskindex/fieldwriter.cpp index bd30074836e..7fb575da7f3 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fieldwriter.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/fieldwriter.cpp @@ -14,8 +14,7 @@ namespace search::diskindex { using vespalib::getLastErrorString; using common::FileHeaderContext; 
-FieldWriter::FieldWriter(uint32_t docIdLimit, - uint64_t numWordIds) +FieldWriter::FieldWriter(uint32_t docIdLimit, uint64_t numWordIds) : _wordNum(noWordNum()), _prevDocId(0), _dictFile(), @@ -50,10 +49,7 @@ FieldWriter::open(const vespalib::string &prefix, PostingListParams featureParams; PostingListParams countParams; - diskindex::setupDefaultPosOccParameters(&countParams, - ¶ms, - _numWordIds, - _docIdLimit); + diskindex::setupDefaultPosOccParameters(&countParams, ¶ms, _numWordIds, _docIdLimit); if (minSkipDocs != 0) { countParams.set("minSkipDocs", minSkipDocs); @@ -70,12 +66,7 @@ FieldWriter::open(const vespalib::string &prefix, _dictFile = std::make_unique<PageDict4FileSeqWrite>(); _dictFile->setParams(countParams); - _posoccfile = diskindex::makePosOccWrite(_dictFile.get(), - dynamicKPosOccFormat, - params, - featureParams, - schema, - indexId); + _posoccfile = makePosOccWrite(_dictFile.get(), dynamicKPosOccFormat, params, featureParams, schema, indexId); vespalib::string cname = _prefix + "dictionary"; // Open output dictionary file @@ -149,8 +140,7 @@ FieldWriter::close() if (_posoccfile) { bool closeRes = _posoccfile->close(); if (!closeRes) { - LOG(error, - "Could not close posocc file for write"); + LOG(error, "Could not close posocc file for write"); ret = false; } _posoccfile.reset(); @@ -158,8 +148,7 @@ FieldWriter::close() if (_dictFile) { bool closeRes = _dictFile->close(); if (!closeRes) { - LOG(error, - "Could not close posocc count file for write"); + LOG(error, "Could not close posocc count file for write"); ret = false; } _dictFile.reset(); diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp index 6d95f87b6a6..79e8ca5d718 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp @@ -9,9 +9,14 @@ #include <vespa/vespalib/io/fileutil.h> #include <vespa/searchlib/common/documentsummary.h> #include 
<vespa/vespalib/util/error.h> +#include <vespa/vespalib/util/lambdatask.h> +#include <vespa/vespalib/util/count_down_latch.h> +#include <vespa/vespalib/stllike/asciistream.h> #include <sstream> #include <vespa/log/log.h> +#include <vespa/vespalib/util/exceptions.h> + LOG_SETUP(".diskindex.fusion"); using search::FileKit; @@ -25,78 +30,83 @@ using search::index::Schema; using search::index::SchemaUtil; using search::index::schema::DataType; using vespalib::getLastErrorString; +using vespalib::IllegalArgumentException; +using vespalib::make_string; namespace search::diskindex { -void -FusionInputIndex::setSchema(const Schema::SP &schema) -{ - _schema = schema; -} +namespace { -Fusion::Fusion(bool dynamicKPosIndexFormat, - const TuneFileIndexing &tuneFileIndexing, - const FileHeaderContext &fileHeaderContext) - : _schema(nullptr), - _oldIndexes(), - _docIdLimit(0u), - _numWordIds(0u), - _dynamicKPosIndexFormat(dynamicKPosIndexFormat), - _outDir("merged"), - _tuneFileIndexing(tuneFileIndexing), - _fileHeaderContext(fileHeaderContext) -{ } +vespalib::string +createTmpPath(const vespalib::string & base, uint32_t index) { + vespalib::asciistream os; + os << base; + os << "/tmpindex"; + os << index; + return os.str(); +} -Fusion::~Fusion() +std::vector<FusionInputIndex> +createInputIndexes(const std::vector<vespalib::string> & sources, const SelectorArray &selector) { - ReleaseMappingTables(); + std::vector<FusionInputIndex> indexes; + indexes.reserve(sources.size()); + uint32_t i = 0; + for (const auto & source : sources) { + indexes.emplace_back(source, i++, selector); + } + return indexes; } -void -Fusion::setSchema(const Schema *schema) -{ - _schema = schema; } -void -Fusion::setOutDir(const vespalib::string &outDir) +FusionInputIndex::FusionInputIndex(const vespalib::string &path, uint32_t index, const SelectorArray &selector) + : _path(path), + _index(index), + _schema() { - _outDir = outDir; + vespalib::string fname = path + "/schema.txt"; + if ( ! 
_schema.loadFromFile(fname)) { + throw IllegalArgumentException(make_string("Failed loading schema %s", fname.c_str())); + } + if ( ! SchemaUtil::validateSchema(_schema)) { + throw IllegalArgumentException(make_string("Failed validating schema %s", fname.c_str())); + } + if (!_docIdMapping.readDocIdLimit(path)) { + throw IllegalArgumentException(make_string("Cannot determine docIdLimit for old index \"%s\"", path.c_str())); + } + _docIdMapping.setup(_docIdMapping._docIdLimit, &selector, index); } -void -Fusion::SetOldIndexList(const std::vector<vespalib::string> &oldIndexList) +FusionInputIndex::~FusionInputIndex() = default; + +Fusion::Fusion(uint32_t docIdLimit, const Schema & schema, const vespalib::string & dir, + const std::vector<vespalib::string> & sources, const SelectorArray &selector, + bool dynamicKPosIndexFormat, const TuneFileIndexing &tuneFileIndexing, + const FileHeaderContext &fileHeaderContext) + : _schema(schema), + _oldIndexes(createInputIndexes(sources, selector)), + _docIdLimit(docIdLimit), + _dynamicKPosIndexFormat(dynamicKPosIndexFormat), + _outDir(dir), + _tuneFileIndexing(tuneFileIndexing), + _fileHeaderContext(fileHeaderContext) { - _oldIndexes.resize(oldIndexList.size()); - OldIndexIterator oldIndexIt = _oldIndexes.begin(); - uint32_t i = 0; - for (std::vector<vespalib::string>::const_iterator - it = oldIndexList.begin(), ite = oldIndexList.end(); - it != ite; - ++it, ++oldIndexIt, ++i) { - oldIndexIt->reset(allocOldIndex()); - OldIndex &oi = **oldIndexIt; - oi.setPath(*it); - std::ostringstream tmpindexpath0; - tmpindexpath0 << _outDir; - tmpindexpath0 << "/tmpindex"; - tmpindexpath0 << i; - oi.setTmpPath(tmpindexpath0.str()); + if (!readSchemaFiles()) { + throw IllegalArgumentException("Cannot read schema files for source indexes"); } } +Fusion::~Fusion() = default; bool -Fusion::openInputWordReaders(const SchemaUtil::IndexIterator &index, - std::vector< - std::unique_ptr<DictionaryWordReader> > & - readers, 
+Fusion::openInputWordReaders(const vespalib::string & dir, const SchemaUtil::IndexIterator &index, + std::vector<std::unique_ptr<DictionaryWordReader> > & readers, PostingPriorityQueue<DictionaryWordReader> &heap) { - for (auto &i : getOldIndexes()) { - OldIndex &oi = *i; + for (auto & oi : _oldIndexes) { auto reader(std::make_unique<DictionaryWordReader>()); - const vespalib::string &tmpindexpath = oi.getTmpPath(); + const vespalib::string &tmpindexpath = createTmpPath(dir, oi.getIndex()); const vespalib::string &oldindexpath = oi.getPath(); vespalib::string wordMapName = tmpindexpath + "/old2new.dat"; vespalib::string fieldDir(oldindexpath + "/" + index.getName()); @@ -105,12 +115,9 @@ Fusion::openInputWordReaders(const SchemaUtil::IndexIterator &index, if (!index.hasOldFields(oldSchema)) { continue; // drop data } - bool res = reader->open(dictName, - wordMapName, - _tuneFileIndexing._read); + bool res = reader->open(dictName, wordMapName, _tuneFileIndexing._read); if (!res) { - LOG(error, "Could not open dictionary %s to generate %s", - dictName.c_str(), wordMapName.c_str()); + LOG(error, "Could not open dictionary %s to generate %s", dictName.c_str(), wordMapName.c_str()); return false; } reader->read(); @@ -124,7 +131,8 @@ Fusion::openInputWordReaders(const SchemaUtil::IndexIterator &index, bool -Fusion::renumberFieldWordIds(const SchemaUtil::IndexIterator &index) +Fusion::renumberFieldWordIds(const vespalib::string & dir, const SchemaUtil::IndexIterator &index, + WordNumMappingList & list, uint64_t & numWordIds) { vespalib::string indexName = index.getName(); LOG(debug, "Renumber word IDs for field %s", indexName.c_str()); @@ -133,12 +141,12 @@ Fusion::renumberFieldWordIds(const SchemaUtil::IndexIterator &index) PostingPriorityQueue<DictionaryWordReader> heap; WordAggregator out; - if (!openInputWordReaders(index, readers, heap)) { + if (!openInputWordReaders(dir, index, readers, heap)) { return false; } heap.merge(out, 4); assert(heap.empty()); - 
_numWordIds = out.getWordNum(); + numWordIds = out.getWordNum(); // Close files for (auto &i : readers) { @@ -147,28 +155,33 @@ Fusion::renumberFieldWordIds(const SchemaUtil::IndexIterator &index) // Now read mapping files back into an array // XXX: avoid this, and instead make the array here - if (!ReadMappingFiles(&index)) { + if (!readMappingFiles(dir, &index, list)) { return false; } - LOG(debug, "Finished renumbering words IDs for field %s", - indexName.c_str()); + LOG(debug, "Finished renumbering words IDs for field %s", indexName.c_str()); return true; } bool -Fusion::mergeFields() +Fusion::mergeFields(vespalib::ThreadExecutor & executor) { - typedef SchemaUtil::IndexIterator IndexIterator; - const Schema &schema = getSchema(); - for (IndexIterator index(schema); index.isValid(); ++index) { - if (!mergeField(index.getIndex())) { - return false; - } + std::atomic<uint32_t> failed(0); + vespalib::CountDownLatch done(schema.getNumIndexFields()); + for (SchemaUtil::IndexIterator iter(schema); iter.isValid(); ++iter) { + executor.execute(vespalib::makeLambdaTask([this, index=iter.getIndex(), &failed, &done]() { + if (!mergeField(index)) { + failed++; + } + done.countDown(); + })); } - return true; + LOG(info, "Waiting for %u fields", schema.getNumIndexFields()); + done.await(); + LOG(info, "Done waiting for %u fields", schema.getNumIndexFields()); + return (failed == 0u); } @@ -190,37 +203,35 @@ Fusion::mergeField(uint32_t id) if (FileKit::hasStamp(indexDir + "/.mergeocc_done")) { return true; } - vespalib::mkdir(indexDir.c_str(), false); + vespalib::mkdir(indexDir, false); - LOG(debug, "mergeField for field %s dir %s", - indexName.c_str(), indexDir.c_str()); + LOG(info, "mergeField for field %s dir %s", indexName.c_str(), indexDir.c_str()); - makeTmpDirs(); + makeTmpDirs(indexDir); - if (!renumberFieldWordIds(index)) { - LOG(error, "Could not renumber field word ids for field %s dir %s", - indexName.c_str(), indexDir.c_str()); + WordNumMappingList 
list(_oldIndexes.size()); + uint64_t numWordIds(0); + if (!renumberFieldWordIds(indexDir, index, list, numWordIds)) { + LOG(error, "Could not renumber field word ids for field %s dir %s", indexName.c_str(), indexDir.c_str()); return false; } // Tokamak - bool res = mergeFieldPostings(index); + bool res = mergeFieldPostings(index, list, numWordIds); if (!res) { - LOG(error, "Could not merge field postings for field %s dir %s", - indexName.c_str(), indexDir.c_str()); - LOG_ABORT("should not be reached"); + throw IllegalArgumentException(make_string("Could not merge field postings for field %s dir %s", + indexName.c_str(), indexDir.c_str())); } if (!FileKit::createStamp(indexDir + "/.mergeocc_done")) { return false; } vespalib::File::sync(indexDir); - if (!CleanTmpDirs()) { + if (!cleanTmpDirs(indexDir)) { return false; } - LOG(debug, "Finished mergeField for field %s dir %s", - indexName.c_str(), indexDir.c_str()); + LOG(info, "Finished mergeField for field %s dir %s", indexName.c_str(), indexDir.c_str()); return true; } @@ -264,8 +275,7 @@ Fusion::selectCookedOrRawFeatures(Reader &reader, Writer &writer) } } if (!cookedFormatOK) { - LOG(error, - "Cannot perform fusion, cooked feature formats don't match"); + LOG(error, "Cannot perform fusion, cooked feature formats don't match"); return false; } if (rawFormatOK) { @@ -288,20 +298,17 @@ Fusion::selectCookedOrRawFeatures(Reader &reader, Writer &writer) bool -Fusion::openInputFieldReaders(const SchemaUtil::IndexIterator &index, - std::vector<std::unique_ptr<FieldReader> > & - readers) +Fusion::openInputFieldReaders(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list, + std::vector<std::unique_ptr<FieldReader> > & readers) { vespalib::string indexName = index.getName(); - for (auto &i : _oldIndexes) { - OldIndex &oi = *i; + for (const auto &oi : _oldIndexes) { const Schema &oldSchema = oi.getSchema(); if (!index.hasOldFields(oldSchema)) { continue; // drop data } auto reader = 
FieldReader::allocFieldReader(index, oldSchema); - reader->setup(oi.getWordNumMapping(), - oi.getDocIdMapping()); + reader->setup(list[oi.getIndex()], oi.getDocIdMapping()); if (!reader->open(oi.getPath() + "/" + indexName + "/", _tuneFileIndexing._read)) { return false; } @@ -312,33 +319,21 @@ Fusion::openInputFieldReaders(const SchemaUtil::IndexIterator &index, bool -Fusion::openFieldWriter(const SchemaUtil::IndexIterator &index, - FieldWriter &writer) +Fusion::openFieldWriter(const SchemaUtil::IndexIterator &index, FieldWriter &writer) { vespalib::string dir = _outDir + "/" + index.getName(); - if (!writer.open(dir + "/", - 64, - 262144, - _dynamicKPosIndexFormat, false, - index.getSchema(), - index.getIndex(), - _tuneFileIndexing._write, - _fileHeaderContext)) { - LOG(error, "Could not open output posocc + dictionary in %s", - dir.c_str()); - LOG_ABORT("should not be reached"); - return false; + if (!writer.open(dir + "/", 64, 262144, _dynamicKPosIndexFormat, false, index.getSchema(), + index.getIndex(), _tuneFileIndexing._write, _fileHeaderContext)) { + throw IllegalArgumentException(make_string("Could not open output posocc + dictionary in %s", dir.c_str())); } return true; } bool -Fusion::setupMergeHeap(const std::vector<std::unique_ptr<FieldReader> > & - readers, - FieldWriter &writer, - PostingPriorityQueue<FieldReader> &heap) +Fusion::setupMergeHeap(const std::vector<std::unique_ptr<FieldReader> > & readers, + FieldWriter &writer, PostingPriorityQueue<FieldReader> &heap) { for (auto &reader : readers) { if (!selectCookedOrRawFeatures(*reader, writer)) { @@ -356,15 +351,15 @@ Fusion::setupMergeHeap(const std::vector<std::unique_ptr<FieldReader> > & bool -Fusion::mergeFieldPostings(const SchemaUtil::IndexIterator &index) +Fusion::mergeFieldPostings(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list, uint64_t numWordIds) { std::vector<std::unique_ptr<FieldReader>> readers; PostingPriorityQueue<FieldReader> heap; /* OUTPUT */ - 
FieldWriter fieldWriter(_docIdLimit, _numWordIds); + FieldWriter fieldWriter(_docIdLimit, numWordIds); vespalib::string indexName = index.getName(); - if (!openInputFieldReaders(index, readers)) { + if (!openInputFieldReaders(index, list, readers)) { return false; } if (!openFieldWriter(index, fieldWriter)) { @@ -383,32 +378,23 @@ Fusion::mergeFieldPostings(const SchemaUtil::IndexIterator &index) } } if (!fieldWriter.close()) { - LOG(error, "Could not close output posocc + dictionary in %s/%s", - _outDir.c_str(), indexName.c_str()); - LOG_ABORT("should not be reached"); + throw IllegalArgumentException(make_string("Could not close output posocc + dictionary in %s/%s", + _outDir.c_str(), indexName.c_str())); } return true; } bool -Fusion::ReadMappingFiles(const SchemaUtil::IndexIterator *index) +Fusion::readMappingFiles(const vespalib::string & dir, const SchemaUtil::IndexIterator *index, WordNumMappingList & list) { - ReleaseMappingTables(); - - size_t numberOfOldIndexes = _oldIndexes.size(); - for (uint32_t i = 0; i < numberOfOldIndexes; i++) - { - OldIndex &oi = *_oldIndexes[i]; - WordNumMapping &wordNumMapping = oi.getWordNumMapping(); + for (const auto & oi : _oldIndexes) { std::vector<uint32_t> oldIndexes; const Schema &oldSchema = oi.getSchema(); - if (!SchemaUtil::getIndexIds(oldSchema, - DataType::STRING, - oldIndexes)) - { + if (!SchemaUtil::getIndexIds(oldSchema, DataType::STRING, oldIndexes)) { return false; } + WordNumMapping &wordNumMapping = list[oi.getIndex()]; if (oldIndexes.empty()) { wordNumMapping.noMappingFile(); continue; @@ -418,7 +404,7 @@ Fusion::ReadMappingFiles(const SchemaUtil::IndexIterator *index) } // Open word mapping file - vespalib::string old2newname = oi.getTmpPath() + "/old2new.dat"; + vespalib::string old2newname = createTmpPath(dir, oi.getIndex()) + "/old2new.dat"; wordNumMapping.readMappingFile(old2newname, _tuneFileIndexing._read); } @@ -426,40 +412,20 @@ Fusion::ReadMappingFiles(const SchemaUtil::IndexIterator *index) } 
-bool -Fusion::ReleaseMappingTables() -{ - size_t numberOfOldIndexes = _oldIndexes.size(); - for (uint32_t i = 0; i < numberOfOldIndexes; i++) - { - OldIndex &oi = *_oldIndexes[i]; - oi.getWordNumMapping().clear(); - } - return true; -} - - void -Fusion::makeTmpDirs() +Fusion::makeTmpDirs(const vespalib::string & dir) { - for (auto &i : getOldIndexes()) { - OldIndex &oi = *i; - // Make tmpindex directories - const vespalib::string &tmpindexpath = oi.getTmpPath(); - vespalib::mkdir(tmpindexpath, false); + for (const auto & index : _oldIndexes) { + vespalib::mkdir(createTmpPath(dir, index.getIndex()), false); } } bool -Fusion::CleanTmpDirs() +Fusion::cleanTmpDirs(const vespalib::string & dir) { uint32_t i = 0; for (;;) { - std::ostringstream tmpindexpath0; - tmpindexpath0 << _outDir; - tmpindexpath0 << "/tmpindex"; - tmpindexpath0 << i; - const vespalib::string &tmpindexpath = tmpindexpath0.str(); + vespalib::string tmpindexpath = createTmpPath(dir, i); FastOS_StatInfo statInfo; if (!FastOS_File::Stat(tmpindexpath.c_str(), &statInfo)) { if (statInfo._error == FastOS_StatInfo::FileNotFound) { @@ -472,12 +438,7 @@ Fusion::CleanTmpDirs() } while (i > 0) { i--; - // Remove tmpindex directories - std::ostringstream tmpindexpath0; - tmpindexpath0 << _outDir; - tmpindexpath0 << "/tmpindex"; - tmpindexpath0 << i; - const vespalib::string &tmpindexpath = tmpindexpath0.str(); + vespalib::string tmpindexpath = createTmpPath(dir, i); search::DirectoryTraverse dt(tmpindexpath.c_str()); if (!dt.RemoveTree()) { LOG(error, "Failed to clean tmpdir %s", tmpindexpath.c_str()); @@ -491,30 +452,13 @@ Fusion::CleanTmpDirs() bool Fusion::checkSchemaCompat() { + /* TODO: Check compatibility */ return true; } - bool Fusion::readSchemaFiles() { - OldIndexIterator oldIndexIt = _oldIndexes.begin(); - OldIndexIterator oldIndexIte = _oldIndexes.end(); - - for (; oldIndexIt != oldIndexIte; ++oldIndexIt) { - OldIndex &oi = **oldIndexIt; - vespalib::string oldcfname = oi.getPath() + "/schema.txt"; - 
Schema::SP schema(new Schema); - if (!schema->loadFromFile(oldcfname)) { - return false; - } - if (!SchemaUtil::validateSchema(*_schema)) { - return false; - } - oi.setSchema(schema); - } - - /* TODO: Check compatibility */ bool res = checkSchemaCompat(); if (!res) { LOG(error, "Index fusion cannot continue due to incompatible indexes"); @@ -522,15 +466,11 @@ Fusion::readSchemaFiles() return res; } - bool -Fusion::merge(const Schema &schema, - const vespalib::string &dir, - const std::vector<vespalib::string> &sources, - const SelectorArray &selector, - bool dynamicKPosOccFormat, - const TuneFileIndexing &tuneFileIndexing, - const FileHeaderContext &fileHeaderContext) +Fusion::merge(const Schema &schema, const vespalib::string &dir, const std::vector<vespalib::string> &sources, + const SelectorArray &selector, bool dynamicKPosOccFormat, + const TuneFileIndexing &tuneFileIndexing, const FileHeaderContext &fileHeaderContext, + vespalib::ThreadExecutor & executor) { assert(sources.size() <= 255); uint32_t docIdLimit = selector.size(); @@ -563,46 +503,18 @@ Fusion::merge(const Schema &schema, vespalib::mkdir(dir, false); schema.saveToFile(dir + "/schema.txt"); if (!DocumentSummary::writeDocIdLimit(dir, trimmedDocIdLimit)) { - LOG(error, "Could not write docsum count in dir %s: %s", - dir.c_str(), getLastErrorString().c_str()); + LOG(error, "Could not write docsum count in dir %s: %s", dir.c_str(), getLastErrorString().c_str()); return false; } - std::unique_ptr<Fusion> fusion(new Fusion(dynamicKPosOccFormat, - tuneFileIndexing, - fileHeaderContext)); - fusion->setSchema(&schema); - fusion->setOutDir(dir); - fusion->SetOldIndexList(sources); - if (!fusion->readSchemaFiles()) { - LOG(error, "Cannot read schema files for source indexes"); + try { + auto fusion = std::make_unique<Fusion>(trimmedDocIdLimit, schema, dir, sources, selector, + dynamicKPosOccFormat, tuneFileIndexing, fileHeaderContext); + return fusion->mergeFields(executor); + } catch (const std::exception & 
e) { + LOG(error, "%s", e.what()); return false; } - uint32_t idx = 0; - std::vector<std::shared_ptr<OldIndex> > &oldIndexes = - fusion->getOldIndexes(); - - for (OldIndexIterator i = oldIndexes.begin(), ie = oldIndexes.end(); - i != ie; ++i, ++idx) { - OldIndex &oi = **i; - // Make tmpindex directories - const vespalib::string &tmpindexpath = oi.getTmpPath(); - vespalib::mkdir(tmpindexpath, false); - DocIdMapping &docIdMapping = oi.getDocIdMapping(); - if (!docIdMapping.readDocIdLimit(oi.getPath())) { - LOG(error, "Cannot determine docIdLimit for old index \"%s\"", - oi.getPath().c_str()); - return false; - } - docIdMapping.setup(docIdMapping._docIdLimit, - &selector, - idx); - } - fusion->setDocIdLimit(trimmedDocIdLimit); - if (!fusion->mergeFields()) { - return false; - } - return true; } } diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.h b/searchlib/src/vespa/searchlib/diskindex/fusion.h index 520aa7b2203..53f9db75758 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion.h +++ b/searchlib/src/vespa/searchlib/diskindex/fusion.h @@ -6,15 +6,11 @@ #include "wordnummapper.h" #include <vespa/searchlib/index/schemautil.h> -#include <vector> -#include <string> +#include <vespa/vespalib/util/threadexecutor.h> namespace search { template <class IN> class PostingPriorityQueue; } - -namespace search::common { - class TuneFileIndexing; - class FileHeaderContext; -} +namespace search { class TuneFileIndexing; } +namespace search::common { class FileHeaderContext; } namespace search::diskindex { @@ -24,139 +20,77 @@ class DictionaryWordReader; class FusionInputIndex { -public: - typedef diskindex::WordNumMapping WordNumMapping; - typedef diskindex::DocIdMapping DocIdMapping; -private: - vespalib::string _path; - WordNumMapping _wordNumMapping; - DocIdMapping _docIdMapping; - vespalib::string _tmpPath; - index::Schema::SP _schema; +private : + vespalib::string _path; + uint32_t _index; + index::Schema _schema; + DocIdMapping _docIdMapping; public: - 
FusionInputIndex() - : _path(), - _wordNumMapping(), - _docIdMapping(), - _tmpPath(), - _schema() - { - } - - virtual ~FusionInputIndex() {} - - void setPath(const vespalib::string &path) { _path = path; } + FusionInputIndex(const vespalib::string &path, uint32_t index, const SelectorArray & selector); + FusionInputIndex(FusionInputIndex &&) = default; + FusionInputIndex & operator = (FusionInputIndex &&) = default; + ~FusionInputIndex(); + const vespalib::string & getPath() const { return _path; } - void setTmpPath(const vespalib::string &tmpPath) { _tmpPath = tmpPath; } - const vespalib::string &getTmpPath() const { return _tmpPath; } - const WordNumMapping & getWordNumMapping() const { return _wordNumMapping; } - WordNumMapping & getWordNumMapping() { return _wordNumMapping; } + uint32_t getIndex() const { return _index; } const DocIdMapping & getDocIdMapping() const { return _docIdMapping; } - - DocIdMapping & getDocIdMapping() { return _docIdMapping; } - - const index::Schema &getSchema() const { - assert(_schema); - return *_schema; - } - - void setSchema(const index::Schema::SP &schema); + const index::Schema &getSchema() const { return _schema; } }; class Fusion { -public: - typedef search::index::Schema Schema; - typedef search::index::SchemaUtil SchemaUtil; - private: - Fusion(const Fusion &); - Fusion& operator=(const Fusion &); - -public: - Fusion(bool dynamicKPosIndexFormat, - const TuneFileIndexing &tuneFileIndexing, - const search::common::FileHeaderContext &fileHeaderContext); + using Schema = index::Schema; + using SchemaUtil = index::SchemaUtil; + using WordNumMappingList = std::vector<WordNumMapping>; - virtual ~Fusion(); - - void SetOldIndexList(const std::vector<vespalib::string> &oldIndexList); - bool mergeFields(); + bool mergeFields(vespalib::ThreadExecutor & executor); bool mergeField(uint32_t id); - bool openInputFieldReaders(const SchemaUtil::IndexIterator &index, - std::vector<std::unique_ptr<FieldReader> > & - readers); + bool 
openInputFieldReaders(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list, + std::vector<std::unique_ptr<FieldReader> > & readers); bool openFieldWriter(const SchemaUtil::IndexIterator &index, FieldWriter & writer); bool setupMergeHeap(const std::vector<std::unique_ptr<FieldReader> > & readers, FieldWriter &writer, PostingPriorityQueue<FieldReader> &heap); - bool mergeFieldPostings(const SchemaUtil::IndexIterator &index); - bool openInputWordReaders(const SchemaUtil::IndexIterator &index, + bool mergeFieldPostings(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list, uint64_t numWordIds); + bool openInputWordReaders(const vespalib::string & dir, const SchemaUtil::IndexIterator &index, std::vector<std::unique_ptr<DictionaryWordReader> > &readers, PostingPriorityQueue<DictionaryWordReader> &heap); - bool renumberFieldWordIds(const SchemaUtil::IndexIterator &index); - void setSchema(const Schema *schema); - void setOutDir(const vespalib::string &outDir); - void makeTmpDirs(); - bool CleanTmpDirs(); + bool renumberFieldWordIds(const vespalib::string & dir, const SchemaUtil::IndexIterator &index, + WordNumMappingList & list, uint64_t & numWordIds); + void makeTmpDirs(const vespalib::string & dir); + bool cleanTmpDirs(const vespalib::string & dir); bool readSchemaFiles(); bool checkSchemaCompat(); template <class Reader, class Writer> - static bool - selectCookedOrRawFeatures(Reader &reader, Writer &writer); - -protected: - bool ReadMappingFiles(const SchemaUtil::IndexIterator *index); - bool ReleaseMappingTables(); -protected: - - typedef FusionInputIndex OldIndex; - - const Schema *_schema; // External ownership - std::vector<std::shared_ptr<OldIndex> > _oldIndexes; - typedef std::vector<std::shared_ptr<OldIndex> >::iterator - OldIndexIterator; - - // OUTPUT: + static bool selectCookedOrRawFeatures(Reader &reader, Writer &writer); - uint32_t _docIdLimit; - uint64_t _numWordIds; + bool readMappingFiles(const vespalib::string & dir, 
const SchemaUtil::IndexIterator *index, WordNumMappingList & list); + const Schema &getSchema() const { return _schema; } - // Index format parameters. - bool _dynamicKPosIndexFormat; + const Schema &_schema; // External ownership + std::vector<FusionInputIndex> _oldIndexes; + const uint32_t _docIdLimit; + const bool _dynamicKPosIndexFormat; + vespalib::string _outDir; - // Index location parameters - - /* - * Output location - */ - vespalib::string _outDir; - - const TuneFileIndexing &_tuneFileIndexing; + const TuneFileIndexing &_tuneFileIndexing; const common::FileHeaderContext &_fileHeaderContext; - - const Schema &getSchema() const { - assert(_schema != nullptr); - return *_schema; - } public: + Fusion(const Fusion &) = delete; + Fusion& operator=(const Fusion &) = delete; + Fusion(uint32_t docIdLimit, const Schema &schema, const vespalib::string &dir, + const std::vector<vespalib::string> & sources, const SelectorArray &selector, bool dynamicKPosIndexFormat, + const TuneFileIndexing &tuneFileIndexing, const common::FileHeaderContext &fileHeaderContext); - void setDocIdLimit(uint32_t docIdLimit) { _docIdLimit = docIdLimit; } - std::vector<std::shared_ptr<OldIndex> > & getOldIndexes() { return _oldIndexes; } - virtual OldIndex *allocOldIndex() { return new OldIndex; } - - /** - * This method is used by new indexing pipeline to merge indexes. - */ - static bool merge(const Schema &schema, - const vespalib::string &dir, - const std::vector<vespalib::string> &sources, - const SelectorArray &docIdSelector, - bool dynamicKPosOccFormat, - const TuneFileIndexing &tuneFileIndexing, - const common::FileHeaderContext &fileHeaderContext); + ~Fusion(); + + static bool + merge(const Schema &schema, const vespalib::string &dir, const std::vector<vespalib::string> &sources, + const SelectorArray &docIdSelector, bool dynamicKPosOccFormat, const TuneFileIndexing &tuneFileIndexing, + const common::FileHeaderContext &fileHeaderContext, vespalib::ThreadExecutor & executor); }; } |