summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-05-14 20:07:46 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2019-05-15 20:53:34 +0000
commit8e5370fa867ed87d0b9b2b86f742ee14a3825bd7 (patch)
tree826571fed6668410946d8412f2037e3ac4106a71
parenta65db0dc5b5a272da545e93408526d738f1f696c (diff)
- Provide an executor and use it for fusion.
- Use per field temporary directories. - Keep WordNumMapping on the stack to make it thread safe.
-rw-r--r--searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp52
-rw-r--r--searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp8
-rw-r--r--searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h7
-rw-r--r--searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp2
-rw-r--r--searchlib/src/tests/diskindex/fusion/fusion_test.cpp52
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/docidmapper.cpp10
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/docidmapper.h13
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fieldreader.cpp26
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fieldreader.h2
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fieldwriter.cpp21
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fusion.cpp354
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fusion.h160
12 files changed, 255 insertions, 452 deletions
diff --git a/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp b/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp
index b2334ed025e..d0b19d77181 100644
--- a/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp
+++ b/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp
@@ -17,6 +17,7 @@ LOG_SETUP("feed_and_search_test");
#include <vespa/searchlib/queryeval/searchiterator.h>
#include <vespa/searchlib/queryeval/fake_requestcontext.h>
#include <vespa/vespalib/testkit/testapp.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
#include <sstream>
#include <vespa/searchlib/diskindex/fusion.h>
#include <vespa/searchlib/common/documentsummary.h>
@@ -144,6 +145,7 @@ void Test::testSearch(Searchable &source,
// again.
void Test::requireThatMemoryIndexCanBeDumpedAndSearched() {
Schema schema = getSchema();
+ vespalib::ThreadStackExecutor sharedExecutor(2, 0x10000);
search::SequencedTaskExecutor indexFieldInverter(2);
search::SequencedTaskExecutor indexFieldWriter(2);
MemoryIndex memory_index(schema, indexFieldInverter, indexFieldWriter);
@@ -167,8 +169,7 @@ void Test::requireThatMemoryIndexCanBeDumpedAndSearched() {
const uint64_t num_words = memory_index.getNumWords();
search::TuneFileIndexing tuneFileIndexing;
DummyFileHeaderContext fileHeaderContext;
- index_builder.open(docIdLimit, num_words, tuneFileIndexing,
- fileHeaderContext);
+ index_builder.open(docIdLimit, num_words, tuneFileIndexing, fileHeaderContext);
memory_index.dump(index_builder);
index_builder.close();
@@ -177,17 +178,18 @@ void Test::requireThatMemoryIndexCanBeDumpedAndSearched() {
std::vector<string> fusionInputs;
fusionInputs.push_back(index_dir);
uint32_t fusionDocIdLimit = 0;
- typedef search::diskindex::Fusion FastS_Fusion;
+ using Fusion = search::diskindex::Fusion;
bool fret1 = DocumentSummary::readDocIdLimit(index_dir, fusionDocIdLimit);
ASSERT_TRUE(fret1);
SelectorArray selector(fusionDocIdLimit, 0);
- bool fret2 = FastS_Fusion::merge(schema,
- index_dir2,
- fusionInputs,
- selector,
- false /* dynamicKPosOccFormat */,
- tuneFileIndexing,
- fileHeaderContext);
+ bool fret2 = Fusion::merge(schema,
+ index_dir2,
+ fusionInputs,
+ selector,
+ false /* dynamicKPosOccFormat */,
+ tuneFileIndexing,
+ fileHeaderContext,
+ sharedExecutor);
ASSERT_TRUE(fret2);
// Fusion test with all docs removed in output (doesn't affect word list)
@@ -198,13 +200,14 @@ void Test::requireThatMemoryIndexCanBeDumpedAndSearched() {
bool fret3 = DocumentSummary::readDocIdLimit(index_dir, fusionDocIdLimit);
ASSERT_TRUE(fret3);
SelectorArray selector2(fusionDocIdLimit, 1);
- bool fret4 = FastS_Fusion::merge(schema,
- index_dir3,
- fusionInputs,
- selector2,
- false /* dynamicKPosOccFormat */,
- tuneFileIndexing,
- fileHeaderContext);
+ bool fret4 = Fusion::merge(schema,
+ index_dir3,
+ fusionInputs,
+ selector2,
+ false /* dynamicKPosOccFormat */,
+ tuneFileIndexing,
+ fileHeaderContext,
+ sharedExecutor);
ASSERT_TRUE(fret4);
// Fusion test with all docs removed in input (affects word list)
@@ -215,13 +218,14 @@ void Test::requireThatMemoryIndexCanBeDumpedAndSearched() {
bool fret5 = DocumentSummary::readDocIdLimit(index_dir3, fusionDocIdLimit);
ASSERT_TRUE(fret5);
SelectorArray selector3(fusionDocIdLimit, 0);
- bool fret6 = FastS_Fusion::merge(schema,
- index_dir4,
- fusionInputs,
- selector3,
- false /* dynamicKPosOccFormat */,
- tuneFileIndexing,
- fileHeaderContext);
+ bool fret6 = Fusion::merge(schema,
+ index_dir4,
+ fusionInputs,
+ selector3,
+ false /* dynamicKPosOccFormat */,
+ tuneFileIndexing,
+ fileHeaderContext,
+ sharedExecutor);
ASSERT_TRUE(fret6);
DiskIndex disk_index(index_dir);
diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp
index 1abe9540859..113855d7e2b 100644
--- a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp
@@ -1,9 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "summarymanager.h"
#include "documentstoreadapter.h"
#include "summarycompacttarget.h"
#include "summaryflushtarget.h"
-#include "summarymanager.h"
#include <vespa/config/print/ostreamconfigwriter.h>
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/juniper/rpinterface.h>
@@ -12,6 +12,8 @@
#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/searchsummary/docsummary/docsumconfig.h>
#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/fastlib/text/normwordfolder.h>
+
#include <sstream>
#include <vespa/log/log.h>
@@ -84,7 +86,7 @@ SummarySetup(const vespalib::string & baseDir, const DocTypeName & docTypeName,
const search::IAttributeManager::SP &attributeMgr, const search::IDocumentStore::SP & docStore,
const std::shared_ptr<const DocumentTypeRepo> &repo)
: _docsumWriter(),
- _wordFolder(),
+ _wordFolder(std::make_unique<Fast_NormalizeWordFolder>()),
_juniperProps(juniperCfg),
_juniperConfig(),
_attributeMgr(attributeMgr),
@@ -103,7 +105,7 @@ SummarySetup(const vespalib::string & baseDir, const DocTypeName & docTypeName,
baseDir.c_str(), oss.str().c_str()));
}
- _juniperConfig = std::make_unique<juniper::Juniper>(&_juniperProps, &_wordFolder);
+ _juniperConfig = std::make_unique<juniper::Juniper>(&_juniperProps, _wordFolder.get());
_docsumWriter = std::make_unique<DynamicDocsumWriter>(resultConfig.release(), nullptr);
DynamicDocsumConfig dynCfg(this, _docsumWriter.get());
dynCfg.configure(summarymapCfg);
diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h
index 8df65e3e2ff..4fde28e69de 100644
--- a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h
+++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h
@@ -10,13 +10,14 @@
#include <vespa/searchlib/docstore/logdocumentstore.h>
#include <vespa/searchlib/transactionlog/syncproxy.h>
#include <vespa/document/fieldvalue/document.h>
-#include <vespa/vespalib/util/threadstackexecutor.h>
-#include <vespa/fastlib/text/normwordfolder.h>
+#include <vespa/vespalib/util/threadexecutor.h>
namespace searchcorespi::index { struct IThreadService; }
namespace search { class IBucketizer; }
namespace search::common { class FileHeaderContext; }
+class Fast_NormalizeWordFolder;
+
namespace proton {
class SummaryManager : public ISummaryManager
@@ -25,7 +26,7 @@ public:
class SummarySetup : public ISummarySetup {
private:
std::unique_ptr<search::docsummary::DynamicDocsumWriter> _docsumWriter;
- Fast_NormalizeWordFolder _wordFolder;
+ std::unique_ptr<Fast_NormalizeWordFolder> _wordFolder;
search::docsummary::JuniperProperties _juniperProps;
std::unique_ptr<juniper::Juniper> _juniperConfig;
search::IAttributeManager::SP _attributeMgr;
diff --git a/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp b/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp
index 2139e34818f..b74cc2c603f 100644
--- a/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp
@@ -66,7 +66,7 @@ IndexManager::MaintainerOperations::runFusion(const Schema &schema,
SerialNumFileHeaderContext fileHeaderContext(_fileHeaderContext, serialNum);
const bool dynamic_k_doc_pos_occ_format = false;
return Fusion::merge(schema, outputDir, sources, selectorArray, dynamic_k_doc_pos_occ_format,
- _tuneFileIndexing, fileHeaderContext);
+ _tuneFileIndexing, fileHeaderContext, _threadingService.shared());
}
diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
index 24146e516a0..c46869392f7 100644
--- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
+++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
@@ -16,6 +16,7 @@
#include <vespa/searchlib/memoryindex/field_index_collection.h>
#include <vespa/searchlib/memoryindex/posting_iterator.h>
#include <vespa/searchlib/util/filekit.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/log/log.h>
@@ -38,7 +39,6 @@ using namespace index;
namespace diskindex {
-
class Test : public vespalib::TestApp
{
private:
@@ -62,8 +62,7 @@ myPushDocument(DocumentInverter &inv, FieldIndexCollection &fieldIndexes)
}
vespalib::string
-toString(FieldPositionsIterator posItr,
- bool hasElements = false, bool hasWeights = false)
+toString(FieldPositionsIterator posItr, bool hasElements = false, bool hasWeights = false)
{
vespalib::asciistream ss;
ss << "{";
@@ -121,9 +120,7 @@ toString(DocIdAndFeatures &features)
void
-validateDiskIndex(DiskIndex &dw,
- bool f2HasElements,
- bool f3HasWeights)
+validateDiskIndex(DiskIndex &dw, bool f2HasElements, bool f3HasWeights)
{
typedef DiskIndex::LookupResult LR;
typedef index::PostingListHandle PH;
@@ -239,16 +236,13 @@ validateDiskIndex(DiskIndex &dw,
void
-Test::requireThatFusionIsWorking(const vespalib::string &prefix,
- bool directio,
- bool readmmap)
+Test::requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap)
{
Schema schema;
Schema schema2;
Schema schema3;
for (SchemaUtil::IndexIterator it(getSchema()); it.isValid(); ++it) {
- const Schema::IndexField &iField =
- _schema.getIndexField(it.getIndex());
+ const Schema::IndexField &iField = _schema.getIndexField(it.getIndex());
schema.addIndexField(Schema::IndexField(iField.getName(),
iField.getDataType(),
iField.getCollectionType()));
@@ -350,6 +344,7 @@ Test::requireThatFusionIsWorking(const vespalib::string &prefix,
EXPECT_TRUE(FileKit::hasStamp(tsName));
EXPECT_TRUE(FileKit::removeStamp(tsName));
EXPECT_FALSE(FileKit::hasStamp(tsName));
+ vespalib::ThreadStackExecutor executor(4, 0x10000);
do {
DiskIndex dw2(prefix + "dump2");
@@ -362,12 +357,9 @@ Test::requireThatFusionIsWorking(const vespalib::string &prefix,
std::vector<vespalib::string> sources;
SelectorArray selector(numDocs, 0);
sources.push_back(prefix + "dump2");
- if (!EXPECT_TRUE(Fusion::merge(schema,
- prefix + "dump3",
- sources, selector,
+ if (!EXPECT_TRUE(Fusion::merge(schema, prefix + "dump3", sources, selector,
dynamicKPosOcc,
- tuneFileIndexing,
- fileHeaderContext)))
+ tuneFileIndexing,fileHeaderContext, executor)))
return;
} while (0);
do {
@@ -380,12 +372,9 @@ Test::requireThatFusionIsWorking(const vespalib::string &prefix,
std::vector<vespalib::string> sources;
SelectorArray selector(numDocs, 0);
sources.push_back(prefix + "dump3");
- if (!EXPECT_TRUE(Fusion::merge(schema2,
- prefix + "dump4",
- sources, selector,
+ if (!EXPECT_TRUE(Fusion::merge(schema2, prefix + "dump4", sources, selector,
dynamicKPosOcc,
- tuneFileIndexing,
- fileHeaderContext)))
+ tuneFileIndexing, fileHeaderContext, executor)))
return;
} while (0);
do {
@@ -398,12 +387,9 @@ Test::requireThatFusionIsWorking(const vespalib::string &prefix,
std::vector<vespalib::string> sources;
SelectorArray selector(numDocs, 0);
sources.push_back(prefix + "dump3");
- if (!EXPECT_TRUE(Fusion::merge(schema3,
- prefix + "dump5",
- sources, selector,
+ if (!EXPECT_TRUE(Fusion::merge(schema3, prefix + "dump5", sources, selector,
dynamicKPosOcc,
- tuneFileIndexing,
- fileHeaderContext)))
+ tuneFileIndexing, fileHeaderContext, executor)))
return;
} while (0);
do {
@@ -416,12 +402,9 @@ Test::requireThatFusionIsWorking(const vespalib::string &prefix,
std::vector<vespalib::string> sources;
SelectorArray selector(numDocs, 0);
sources.push_back(prefix + "dump3");
- if (!EXPECT_TRUE(Fusion::merge(schema,
- prefix + "dump6",
- sources, selector,
+ if (!EXPECT_TRUE(Fusion::merge(schema, prefix + "dump6", sources, selector,
!dynamicKPosOcc,
- tuneFileIndexing,
- fileHeaderContext)))
+ tuneFileIndexing, fileHeaderContext, executor)))
return;
} while (0);
do {
@@ -434,12 +417,9 @@ Test::requireThatFusionIsWorking(const vespalib::string &prefix,
std::vector<vespalib::string> sources;
SelectorArray selector(numDocs, 0);
sources.push_back(prefix + "dump2");
- if (!EXPECT_TRUE(Fusion::merge(schema,
- prefix + "dump3",
- sources, selector,
+ if (!EXPECT_TRUE(Fusion::merge(schema, prefix + "dump3", sources, selector,
dynamicKPosOcc,
- tuneFileIndexing,
- fileHeaderContext)))
+ tuneFileIndexing, fileHeaderContext, executor)))
return;
} while (0);
do {
diff --git a/searchlib/src/vespa/searchlib/diskindex/docidmapper.cpp b/searchlib/src/vespa/searchlib/diskindex/docidmapper.cpp
index f8534ed62ff..489986f1f5c 100644
--- a/searchlib/src/vespa/searchlib/diskindex/docidmapper.cpp
+++ b/searchlib/src/vespa/searchlib/diskindex/docidmapper.cpp
@@ -2,16 +2,14 @@
#include "docidmapper.h"
#include <vespa/searchlib/common/documentsummary.h>
-#include <vespa/searchlib/common/bitvector.h>
-#include <vespa/fastlib/io/bufferedfile.h>
#define NO_DOC static_cast<uint32_t>(-1)
namespace search::diskindex {
DocIdMapping::DocIdMapping()
- : _docIdLimit(0u),
- _selector(nullptr),
+ : _selector(nullptr),
+ _docIdLimit(0u),
_selectorId(0)
{
}
@@ -36,9 +34,7 @@ DocIdMapping::setup(uint32_t docIdLimit)
void
-DocIdMapping::setup(uint32_t docIdLimit,
- const SelectorArray *selector,
- uint8_t selectorId)
+DocIdMapping::setup(uint32_t docIdLimit, const SelectorArray *selector, uint8_t selectorId)
{
_docIdLimit = docIdLimit;
_selector = selector;
diff --git a/searchlib/src/vespa/searchlib/diskindex/docidmapper.h b/searchlib/src/vespa/searchlib/diskindex/docidmapper.h
index 08b4865cb0a..a8c33eaed2f 100644
--- a/searchlib/src/vespa/searchlib/diskindex/docidmapper.h
+++ b/searchlib/src/vespa/searchlib/diskindex/docidmapper.h
@@ -9,13 +9,13 @@ namespace search { class BitVector; }
namespace search::diskindex {
-typedef vespalib::Array<uint8_t> SelectorArray;
+using SelectorArray = vespalib::Array<uint8_t>;
class DocIdMapping
{
public:
- uint32_t _docIdLimit;
const SelectorArray *_selector; // External ownership
+ uint32_t _docIdLimit;
uint8_t _selectorId;
DocIdMapping();
@@ -32,7 +32,7 @@ public:
const uint8_t *_selector;
uint32_t _docIdLimit; // Limit on legal input values
uint32_t _selectorLimit; // Limit on output
- uint8_t _selectorId;
+ uint8_t _selectorId;
DocIdMapper()
: _selector(nullptr),
@@ -42,12 +42,9 @@ public:
{ }
void setup(const DocIdMapping &mapping) {
- _selector = (mapping._selector != nullptr) ?
- &((*mapping._selector)[0]) : nullptr;
+ _selector = (mapping._selector != nullptr) ? &((*mapping._selector)[0]) : nullptr;
_docIdLimit = mapping._docIdLimit;
- _selectorLimit = (mapping._selector != nullptr) ?
- (*mapping._selector).size() :
- 0u;
+ _selectorLimit = (mapping._selector != nullptr) ? (*mapping._selector).size() : 0u;
_selectorId = mapping._selectorId;
}
diff --git a/searchlib/src/vespa/searchlib/diskindex/fieldreader.cpp b/searchlib/src/vespa/searchlib/diskindex/fieldreader.cpp
index 04af3a18026..c4cd6d3a22e 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fieldreader.cpp
+++ b/searchlib/src/vespa/searchlib/diskindex/fieldreader.cpp
@@ -110,36 +110,26 @@ FieldReader::open(const vespalib::string &prefix,
{
vespalib::string name = prefix + "posocc.dat.compressed";
FastOS_StatInfo statInfo;
- bool statres;
- statres = FastOS_File::Stat(name.c_str(), &statInfo);
+ bool statres = FastOS_File::Stat(name.c_str(), &statInfo);
if (!statres) {
- LOG(error,
- "Could not stat compressed posocc file %s: %s",
- name.c_str(), getLastErrorString().c_str());
+ LOG(error, "Could not stat compressed posocc file %s: %s", name.c_str(), getLastErrorString().c_str());
return false;
}
_dictFile = std::make_unique<PageDict4FileSeqRead>();
PostingListParams featureParams;
- _oldposoccfile = makePosOccRead(name,
- _dictFile.get(),
- featureParams,
- tuneFileRead);
+ _oldposoccfile = makePosOccRead(name, _dictFile.get(), featureParams, tuneFileRead);
vespalib::string cname = prefix + "dictionary";
if (!_dictFile->open(cname, tuneFileRead)) {
- LOG(error,
- "Could not open posocc count file %s for read",
- cname.c_str());
+ LOG(error, "Could not open posocc count file %s for read", cname.c_str());
return false;
}
// open posocc.dat
if (!_oldposoccfile || !_oldposoccfile->open(name, tuneFileRead)) {
- LOG(error,
- "Could not open posocc file %s for read",
- name.c_str());
+ LOG(error, "Could not open posocc file %s for read", name.c_str());
return false;
}
_oldWordNum = noWordNum();
@@ -158,8 +148,7 @@ FieldReader::close()
if (_oldposoccfile) {
bool closeRes = _oldposoccfile->close();
if (!closeRes) {
- LOG(error,
- "Could not close posocc file for read");
+ LOG(error, "Could not close posocc file for read");
ret = false;
}
_oldposoccfile.reset();
@@ -167,8 +156,7 @@ FieldReader::close()
if (_dictFile) {
bool closeRes = _dictFile->close();
if (!closeRes) {
- LOG(error,
- "Could not close posocc file for read");
+ LOG(error, "Could not close posocc file for read");
ret = false;
}
_dictFile.reset();
diff --git a/searchlib/src/vespa/searchlib/diskindex/fieldreader.h b/searchlib/src/vespa/searchlib/diskindex/fieldreader.h
index 8e79c892f6e..243da21731b 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fieldreader.h
+++ b/searchlib/src/vespa/searchlib/diskindex/fieldreader.h
@@ -39,7 +39,7 @@ public:
typedef index::PostingListCounts PostingListCounts;
typedef index::PostingListParams PostingListParams;
- uint64_t _wordNum;
+ uint64_t _wordNum;
DocIdAndFeatures _docIdAndFeatures;
protected:
std::unique_ptr<DictionaryFileSeqRead> _dictFile;
diff --git a/searchlib/src/vespa/searchlib/diskindex/fieldwriter.cpp b/searchlib/src/vespa/searchlib/diskindex/fieldwriter.cpp
index bd30074836e..7fb575da7f3 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fieldwriter.cpp
+++ b/searchlib/src/vespa/searchlib/diskindex/fieldwriter.cpp
@@ -14,8 +14,7 @@ namespace search::diskindex {
using vespalib::getLastErrorString;
using common::FileHeaderContext;
-FieldWriter::FieldWriter(uint32_t docIdLimit,
- uint64_t numWordIds)
+FieldWriter::FieldWriter(uint32_t docIdLimit, uint64_t numWordIds)
: _wordNum(noWordNum()),
_prevDocId(0),
_dictFile(),
@@ -50,10 +49,7 @@ FieldWriter::open(const vespalib::string &prefix,
PostingListParams featureParams;
PostingListParams countParams;
- diskindex::setupDefaultPosOccParameters(&countParams,
- &params,
- _numWordIds,
- _docIdLimit);
+ diskindex::setupDefaultPosOccParameters(&countParams, &params, _numWordIds, _docIdLimit);
if (minSkipDocs != 0) {
countParams.set("minSkipDocs", minSkipDocs);
@@ -70,12 +66,7 @@ FieldWriter::open(const vespalib::string &prefix,
_dictFile = std::make_unique<PageDict4FileSeqWrite>();
_dictFile->setParams(countParams);
- _posoccfile = diskindex::makePosOccWrite(_dictFile.get(),
- dynamicKPosOccFormat,
- params,
- featureParams,
- schema,
- indexId);
+ _posoccfile = makePosOccWrite(_dictFile.get(), dynamicKPosOccFormat, params, featureParams, schema, indexId);
vespalib::string cname = _prefix + "dictionary";
// Open output dictionary file
@@ -149,8 +140,7 @@ FieldWriter::close()
if (_posoccfile) {
bool closeRes = _posoccfile->close();
if (!closeRes) {
- LOG(error,
- "Could not close posocc file for write");
+ LOG(error, "Could not close posocc file for write");
ret = false;
}
_posoccfile.reset();
@@ -158,8 +148,7 @@ FieldWriter::close()
if (_dictFile) {
bool closeRes = _dictFile->close();
if (!closeRes) {
- LOG(error,
- "Could not close posocc count file for write");
+ LOG(error, "Could not close posocc count file for write");
ret = false;
}
_dictFile.reset();
diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp
index 6d95f87b6a6..79e8ca5d718 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp
+++ b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp
@@ -9,9 +9,14 @@
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/searchlib/common/documentsummary.h>
#include <vespa/vespalib/util/error.h>
+#include <vespa/vespalib/util/lambdatask.h>
+#include <vespa/vespalib/util/count_down_latch.h>
+#include <vespa/vespalib/stllike/asciistream.h>
#include <sstream>
#include <vespa/log/log.h>
+#include <vespa/vespalib/util/exceptions.h>
+
LOG_SETUP(".diskindex.fusion");
using search::FileKit;
@@ -25,78 +30,83 @@ using search::index::Schema;
using search::index::SchemaUtil;
using search::index::schema::DataType;
using vespalib::getLastErrorString;
+using vespalib::IllegalArgumentException;
+using vespalib::make_string;
namespace search::diskindex {
-void
-FusionInputIndex::setSchema(const Schema::SP &schema)
-{
- _schema = schema;
-}
+namespace {
-Fusion::Fusion(bool dynamicKPosIndexFormat,
- const TuneFileIndexing &tuneFileIndexing,
- const FileHeaderContext &fileHeaderContext)
- : _schema(nullptr),
- _oldIndexes(),
- _docIdLimit(0u),
- _numWordIds(0u),
- _dynamicKPosIndexFormat(dynamicKPosIndexFormat),
- _outDir("merged"),
- _tuneFileIndexing(tuneFileIndexing),
- _fileHeaderContext(fileHeaderContext)
-{ }
+vespalib::string
+createTmpPath(const vespalib::string & base, uint32_t index) {
+ vespalib::asciistream os;
+ os << base;
+ os << "/tmpindex";
+ os << index;
+ return os.str();
+}
-Fusion::~Fusion()
+std::vector<FusionInputIndex>
+createInputIndexes(const std::vector<vespalib::string> & sources, const SelectorArray &selector)
{
- ReleaseMappingTables();
+ std::vector<FusionInputIndex> indexes;
+ indexes.reserve(sources.size());
+ uint32_t i = 0;
+ for (const auto & source : sources) {
+ indexes.emplace_back(source, i++, selector);
+ }
+ return indexes;
}
-void
-Fusion::setSchema(const Schema *schema)
-{
- _schema = schema;
}
-void
-Fusion::setOutDir(const vespalib::string &outDir)
+FusionInputIndex::FusionInputIndex(const vespalib::string &path, uint32_t index, const SelectorArray &selector)
+ : _path(path),
+ _index(index),
+ _schema()
{
- _outDir = outDir;
+ vespalib::string fname = path + "/schema.txt";
+ if ( ! _schema.loadFromFile(fname)) {
+ throw IllegalArgumentException(make_string("Failed loading schema %s", fname.c_str()));
+ }
+ if ( ! SchemaUtil::validateSchema(_schema)) {
+ throw IllegalArgumentException(make_string("Failed validating schema %s", fname.c_str()));
+ }
+ if (!_docIdMapping.readDocIdLimit(path)) {
+ throw IllegalArgumentException(make_string("Cannot determine docIdLimit for old index \"%s\"", path.c_str()));
+ }
+ _docIdMapping.setup(_docIdMapping._docIdLimit, &selector, index);
}
-void
-Fusion::SetOldIndexList(const std::vector<vespalib::string> &oldIndexList)
+FusionInputIndex::~FusionInputIndex() = default;
+
+Fusion::Fusion(uint32_t docIdLimit, const Schema & schema, const vespalib::string & dir,
+ const std::vector<vespalib::string> & sources, const SelectorArray &selector,
+ bool dynamicKPosIndexFormat, const TuneFileIndexing &tuneFileIndexing,
+ const FileHeaderContext &fileHeaderContext)
+ : _schema(schema),
+ _oldIndexes(createInputIndexes(sources, selector)),
+ _docIdLimit(docIdLimit),
+ _dynamicKPosIndexFormat(dynamicKPosIndexFormat),
+ _outDir(dir),
+ _tuneFileIndexing(tuneFileIndexing),
+ _fileHeaderContext(fileHeaderContext)
{
- _oldIndexes.resize(oldIndexList.size());
- OldIndexIterator oldIndexIt = _oldIndexes.begin();
- uint32_t i = 0;
- for (std::vector<vespalib::string>::const_iterator
- it = oldIndexList.begin(), ite = oldIndexList.end();
- it != ite;
- ++it, ++oldIndexIt, ++i) {
- oldIndexIt->reset(allocOldIndex());
- OldIndex &oi = **oldIndexIt;
- oi.setPath(*it);
- std::ostringstream tmpindexpath0;
- tmpindexpath0 << _outDir;
- tmpindexpath0 << "/tmpindex";
- tmpindexpath0 << i;
- oi.setTmpPath(tmpindexpath0.str());
+ if (!readSchemaFiles()) {
+ throw IllegalArgumentException("Cannot read schema files for source indexes");
}
}
+Fusion::~Fusion() = default;
bool
-Fusion::openInputWordReaders(const SchemaUtil::IndexIterator &index,
- std::vector<
- std::unique_ptr<DictionaryWordReader> > &
- readers,
+Fusion::openInputWordReaders(const vespalib::string & dir, const SchemaUtil::IndexIterator &index,
+ std::vector<std::unique_ptr<DictionaryWordReader> > & readers,
PostingPriorityQueue<DictionaryWordReader> &heap)
{
- for (auto &i : getOldIndexes()) {
- OldIndex &oi = *i;
+ for (auto & oi : _oldIndexes) {
auto reader(std::make_unique<DictionaryWordReader>());
- const vespalib::string &tmpindexpath = oi.getTmpPath();
+ const vespalib::string &tmpindexpath = createTmpPath(dir, oi.getIndex());
const vespalib::string &oldindexpath = oi.getPath();
vespalib::string wordMapName = tmpindexpath + "/old2new.dat";
vespalib::string fieldDir(oldindexpath + "/" + index.getName());
@@ -105,12 +115,9 @@ Fusion::openInputWordReaders(const SchemaUtil::IndexIterator &index,
if (!index.hasOldFields(oldSchema)) {
continue; // drop data
}
- bool res = reader->open(dictName,
- wordMapName,
- _tuneFileIndexing._read);
+ bool res = reader->open(dictName, wordMapName, _tuneFileIndexing._read);
if (!res) {
- LOG(error, "Could not open dictionary %s to generate %s",
- dictName.c_str(), wordMapName.c_str());
+ LOG(error, "Could not open dictionary %s to generate %s", dictName.c_str(), wordMapName.c_str());
return false;
}
reader->read();
@@ -124,7 +131,8 @@ Fusion::openInputWordReaders(const SchemaUtil::IndexIterator &index,
bool
-Fusion::renumberFieldWordIds(const SchemaUtil::IndexIterator &index)
+Fusion::renumberFieldWordIds(const vespalib::string & dir, const SchemaUtil::IndexIterator &index,
+ WordNumMappingList & list, uint64_t & numWordIds)
{
vespalib::string indexName = index.getName();
LOG(debug, "Renumber word IDs for field %s", indexName.c_str());
@@ -133,12 +141,12 @@ Fusion::renumberFieldWordIds(const SchemaUtil::IndexIterator &index)
PostingPriorityQueue<DictionaryWordReader> heap;
WordAggregator out;
- if (!openInputWordReaders(index, readers, heap)) {
+ if (!openInputWordReaders(dir, index, readers, heap)) {
return false;
}
heap.merge(out, 4);
assert(heap.empty());
- _numWordIds = out.getWordNum();
+ numWordIds = out.getWordNum();
// Close files
for (auto &i : readers) {
@@ -147,28 +155,33 @@ Fusion::renumberFieldWordIds(const SchemaUtil::IndexIterator &index)
// Now read mapping files back into an array
// XXX: avoid this, and instead make the array here
- if (!ReadMappingFiles(&index)) {
+ if (!readMappingFiles(dir, &index, list)) {
return false;
}
- LOG(debug, "Finished renumbering words IDs for field %s",
- indexName.c_str());
+ LOG(debug, "Finished renumbering words IDs for field %s", indexName.c_str());
return true;
}
bool
-Fusion::mergeFields()
+Fusion::mergeFields(vespalib::ThreadExecutor & executor)
{
- typedef SchemaUtil::IndexIterator IndexIterator;
-
const Schema &schema = getSchema();
- for (IndexIterator index(schema); index.isValid(); ++index) {
- if (!mergeField(index.getIndex())) {
- return false;
- }
+ std::atomic<uint32_t> failed(0);
+ vespalib::CountDownLatch done(schema.getNumIndexFields());
+ for (SchemaUtil::IndexIterator iter(schema); iter.isValid(); ++iter) {
+ executor.execute(vespalib::makeLambdaTask([this, index=iter.getIndex(), &failed, &done]() {
+ if (!mergeField(index)) {
+ failed++;
+ }
+ done.countDown();
+ }));
}
- return true;
+ LOG(info, "Waiting for %u fields", schema.getNumIndexFields());
+ done.await();
+ LOG(info, "Done waiting for %u fields", schema.getNumIndexFields());
+ return (failed == 0u);
}
@@ -190,37 +203,35 @@ Fusion::mergeField(uint32_t id)
if (FileKit::hasStamp(indexDir + "/.mergeocc_done")) {
return true;
}
- vespalib::mkdir(indexDir.c_str(), false);
+ vespalib::mkdir(indexDir, false);
- LOG(debug, "mergeField for field %s dir %s",
- indexName.c_str(), indexDir.c_str());
+ LOG(info, "mergeField for field %s dir %s", indexName.c_str(), indexDir.c_str());
- makeTmpDirs();
+ makeTmpDirs(indexDir);
- if (!renumberFieldWordIds(index)) {
- LOG(error, "Could not renumber field word ids for field %s dir %s",
- indexName.c_str(), indexDir.c_str());
+ WordNumMappingList list(_oldIndexes.size());
+ uint64_t numWordIds(0);
+ if (!renumberFieldWordIds(indexDir, index, list, numWordIds)) {
+ LOG(error, "Could not renumber field word ids for field %s dir %s", indexName.c_str(), indexDir.c_str());
return false;
}
// Tokamak
- bool res = mergeFieldPostings(index);
+ bool res = mergeFieldPostings(index, list, numWordIds);
if (!res) {
- LOG(error, "Could not merge field postings for field %s dir %s",
- indexName.c_str(), indexDir.c_str());
- LOG_ABORT("should not be reached");
+ throw IllegalArgumentException(make_string("Could not merge field postings for field %s dir %s",
+ indexName.c_str(), indexDir.c_str()));
}
if (!FileKit::createStamp(indexDir + "/.mergeocc_done")) {
return false;
}
vespalib::File::sync(indexDir);
- if (!CleanTmpDirs()) {
+ if (!cleanTmpDirs(indexDir)) {
return false;
}
- LOG(debug, "Finished mergeField for field %s dir %s",
- indexName.c_str(), indexDir.c_str());
+ LOG(info, "Finished mergeField for field %s dir %s", indexName.c_str(), indexDir.c_str());
return true;
}
@@ -264,8 +275,7 @@ Fusion::selectCookedOrRawFeatures(Reader &reader, Writer &writer)
}
}
if (!cookedFormatOK) {
- LOG(error,
- "Cannot perform fusion, cooked feature formats don't match");
+ LOG(error, "Cannot perform fusion, cooked feature formats don't match");
return false;
}
if (rawFormatOK) {
@@ -288,20 +298,17 @@ Fusion::selectCookedOrRawFeatures(Reader &reader, Writer &writer)
bool
-Fusion::openInputFieldReaders(const SchemaUtil::IndexIterator &index,
- std::vector<std::unique_ptr<FieldReader> > &
- readers)
+Fusion::openInputFieldReaders(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list,
+ std::vector<std::unique_ptr<FieldReader> > & readers)
{
vespalib::string indexName = index.getName();
- for (auto &i : _oldIndexes) {
- OldIndex &oi = *i;
+ for (const auto &oi : _oldIndexes) {
const Schema &oldSchema = oi.getSchema();
if (!index.hasOldFields(oldSchema)) {
continue; // drop data
}
auto reader = FieldReader::allocFieldReader(index, oldSchema);
- reader->setup(oi.getWordNumMapping(),
- oi.getDocIdMapping());
+ reader->setup(list[oi.getIndex()], oi.getDocIdMapping());
if (!reader->open(oi.getPath() + "/" + indexName + "/", _tuneFileIndexing._read)) {
return false;
}
@@ -312,33 +319,21 @@ Fusion::openInputFieldReaders(const SchemaUtil::IndexIterator &index,
bool
-Fusion::openFieldWriter(const SchemaUtil::IndexIterator &index,
- FieldWriter &writer)
+Fusion::openFieldWriter(const SchemaUtil::IndexIterator &index, FieldWriter &writer)
{
vespalib::string dir = _outDir + "/" + index.getName();
- if (!writer.open(dir + "/",
- 64,
- 262144,
- _dynamicKPosIndexFormat, false,
- index.getSchema(),
- index.getIndex(),
- _tuneFileIndexing._write,
- _fileHeaderContext)) {
- LOG(error, "Could not open output posocc + dictionary in %s",
- dir.c_str());
- LOG_ABORT("should not be reached");
- return false;
+ if (!writer.open(dir + "/", 64, 262144, _dynamicKPosIndexFormat, false, index.getSchema(),
+ index.getIndex(), _tuneFileIndexing._write, _fileHeaderContext)) {
+ throw IllegalArgumentException(make_string("Could not open output posocc + dictionary in %s", dir.c_str()));
}
return true;
}
bool
-Fusion::setupMergeHeap(const std::vector<std::unique_ptr<FieldReader> > &
- readers,
- FieldWriter &writer,
- PostingPriorityQueue<FieldReader> &heap)
+Fusion::setupMergeHeap(const std::vector<std::unique_ptr<FieldReader> > & readers,
+ FieldWriter &writer, PostingPriorityQueue<FieldReader> &heap)
{
for (auto &reader : readers) {
if (!selectCookedOrRawFeatures(*reader, writer)) {
@@ -356,15 +351,15 @@ Fusion::setupMergeHeap(const std::vector<std::unique_ptr<FieldReader> > &
bool
-Fusion::mergeFieldPostings(const SchemaUtil::IndexIterator &index)
+Fusion::mergeFieldPostings(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list, uint64_t numWordIds)
{
std::vector<std::unique_ptr<FieldReader>> readers;
PostingPriorityQueue<FieldReader> heap;
/* OUTPUT */
- FieldWriter fieldWriter(_docIdLimit, _numWordIds);
+ FieldWriter fieldWriter(_docIdLimit, numWordIds);
vespalib::string indexName = index.getName();
- if (!openInputFieldReaders(index, readers)) {
+ if (!openInputFieldReaders(index, list, readers)) {
return false;
}
if (!openFieldWriter(index, fieldWriter)) {
@@ -383,32 +378,23 @@ Fusion::mergeFieldPostings(const SchemaUtil::IndexIterator &index)
}
}
if (!fieldWriter.close()) {
- LOG(error, "Could not close output posocc + dictionary in %s/%s",
- _outDir.c_str(), indexName.c_str());
- LOG_ABORT("should not be reached");
+ throw IllegalArgumentException(make_string("Could not close output posocc + dictionary in %s/%s",
+ _outDir.c_str(), indexName.c_str()));
}
return true;
}
bool
-Fusion::ReadMappingFiles(const SchemaUtil::IndexIterator *index)
+Fusion::readMappingFiles(const vespalib::string & dir, const SchemaUtil::IndexIterator *index, WordNumMappingList & list)
{
- ReleaseMappingTables();
-
- size_t numberOfOldIndexes = _oldIndexes.size();
- for (uint32_t i = 0; i < numberOfOldIndexes; i++)
- {
- OldIndex &oi = *_oldIndexes[i];
- WordNumMapping &wordNumMapping = oi.getWordNumMapping();
+ for (const auto & oi : _oldIndexes) {
std::vector<uint32_t> oldIndexes;
const Schema &oldSchema = oi.getSchema();
- if (!SchemaUtil::getIndexIds(oldSchema,
- DataType::STRING,
- oldIndexes))
- {
+ if (!SchemaUtil::getIndexIds(oldSchema, DataType::STRING, oldIndexes)) {
return false;
}
+ WordNumMapping &wordNumMapping = list[oi.getIndex()];
if (oldIndexes.empty()) {
wordNumMapping.noMappingFile();
continue;
@@ -418,7 +404,7 @@ Fusion::ReadMappingFiles(const SchemaUtil::IndexIterator *index)
}
// Open word mapping file
- vespalib::string old2newname = oi.getTmpPath() + "/old2new.dat";
+ vespalib::string old2newname = createTmpPath(dir, oi.getIndex()) + "/old2new.dat";
wordNumMapping.readMappingFile(old2newname, _tuneFileIndexing._read);
}
@@ -426,40 +412,20 @@ Fusion::ReadMappingFiles(const SchemaUtil::IndexIterator *index)
}
-bool
-Fusion::ReleaseMappingTables()
-{
- size_t numberOfOldIndexes = _oldIndexes.size();
- for (uint32_t i = 0; i < numberOfOldIndexes; i++)
- {
- OldIndex &oi = *_oldIndexes[i];
- oi.getWordNumMapping().clear();
- }
- return true;
-}
-
-
void
-Fusion::makeTmpDirs()
+Fusion::makeTmpDirs(const vespalib::string & dir)
{
- for (auto &i : getOldIndexes()) {
- OldIndex &oi = *i;
- // Make tmpindex directories
- const vespalib::string &tmpindexpath = oi.getTmpPath();
- vespalib::mkdir(tmpindexpath, false);
+ for (const auto & index : _oldIndexes) {
+ vespalib::mkdir(createTmpPath(dir, index.getIndex()), false);
}
}
bool
-Fusion::CleanTmpDirs()
+Fusion::cleanTmpDirs(const vespalib::string & dir)
{
uint32_t i = 0;
for (;;) {
- std::ostringstream tmpindexpath0;
- tmpindexpath0 << _outDir;
- tmpindexpath0 << "/tmpindex";
- tmpindexpath0 << i;
- const vespalib::string &tmpindexpath = tmpindexpath0.str();
+ vespalib::string tmpindexpath = createTmpPath(dir, i);
FastOS_StatInfo statInfo;
if (!FastOS_File::Stat(tmpindexpath.c_str(), &statInfo)) {
if (statInfo._error == FastOS_StatInfo::FileNotFound) {
@@ -472,12 +438,7 @@ Fusion::CleanTmpDirs()
}
while (i > 0) {
i--;
- // Remove tmpindex directories
- std::ostringstream tmpindexpath0;
- tmpindexpath0 << _outDir;
- tmpindexpath0 << "/tmpindex";
- tmpindexpath0 << i;
- const vespalib::string &tmpindexpath = tmpindexpath0.str();
+ vespalib::string tmpindexpath = createTmpPath(dir, i);
search::DirectoryTraverse dt(tmpindexpath.c_str());
if (!dt.RemoveTree()) {
LOG(error, "Failed to clean tmpdir %s", tmpindexpath.c_str());
@@ -491,30 +452,13 @@ Fusion::CleanTmpDirs()
bool
Fusion::checkSchemaCompat()
{
+ /* TODO: Check compatibility */
return true;
}
-
bool
Fusion::readSchemaFiles()
{
- OldIndexIterator oldIndexIt = _oldIndexes.begin();
- OldIndexIterator oldIndexIte = _oldIndexes.end();
-
- for (; oldIndexIt != oldIndexIte; ++oldIndexIt) {
- OldIndex &oi = **oldIndexIt;
- vespalib::string oldcfname = oi.getPath() + "/schema.txt";
- Schema::SP schema(new Schema);
- if (!schema->loadFromFile(oldcfname)) {
- return false;
- }
- if (!SchemaUtil::validateSchema(*_schema)) {
- return false;
- }
- oi.setSchema(schema);
- }
-
- /* TODO: Check compatibility */
bool res = checkSchemaCompat();
if (!res) {
LOG(error, "Index fusion cannot continue due to incompatible indexes");
@@ -522,15 +466,11 @@ Fusion::readSchemaFiles()
return res;
}
-
bool
-Fusion::merge(const Schema &schema,
- const vespalib::string &dir,
- const std::vector<vespalib::string> &sources,
- const SelectorArray &selector,
- bool dynamicKPosOccFormat,
- const TuneFileIndexing &tuneFileIndexing,
- const FileHeaderContext &fileHeaderContext)
+Fusion::merge(const Schema &schema, const vespalib::string &dir, const std::vector<vespalib::string> &sources,
+ const SelectorArray &selector, bool dynamicKPosOccFormat,
+ const TuneFileIndexing &tuneFileIndexing, const FileHeaderContext &fileHeaderContext,
+ vespalib::ThreadExecutor & executor)
{
assert(sources.size() <= 255);
uint32_t docIdLimit = selector.size();
@@ -563,46 +503,18 @@ Fusion::merge(const Schema &schema,
vespalib::mkdir(dir, false);
schema.saveToFile(dir + "/schema.txt");
if (!DocumentSummary::writeDocIdLimit(dir, trimmedDocIdLimit)) {
- LOG(error, "Could not write docsum count in dir %s: %s",
- dir.c_str(), getLastErrorString().c_str());
+ LOG(error, "Could not write docsum count in dir %s: %s", dir.c_str(), getLastErrorString().c_str());
return false;
}
- std::unique_ptr<Fusion> fusion(new Fusion(dynamicKPosOccFormat,
- tuneFileIndexing,
- fileHeaderContext));
- fusion->setSchema(&schema);
- fusion->setOutDir(dir);
- fusion->SetOldIndexList(sources);
- if (!fusion->readSchemaFiles()) {
- LOG(error, "Cannot read schema files for source indexes");
+ try {
+ auto fusion = std::make_unique<Fusion>(trimmedDocIdLimit, schema, dir, sources, selector,
+ dynamicKPosOccFormat, tuneFileIndexing, fileHeaderContext);
+ return fusion->mergeFields(executor);
+ } catch (const std::exception & e) {
+ LOG(error, "%s", e.what());
return false;
}
- uint32_t idx = 0;
- std::vector<std::shared_ptr<OldIndex> > &oldIndexes =
- fusion->getOldIndexes();
-
- for (OldIndexIterator i = oldIndexes.begin(), ie = oldIndexes.end();
- i != ie; ++i, ++idx) {
- OldIndex &oi = **i;
- // Make tmpindex directories
- const vespalib::string &tmpindexpath = oi.getTmpPath();
- vespalib::mkdir(tmpindexpath, false);
- DocIdMapping &docIdMapping = oi.getDocIdMapping();
- if (!docIdMapping.readDocIdLimit(oi.getPath())) {
- LOG(error, "Cannot determine docIdLimit for old index \"%s\"",
- oi.getPath().c_str());
- return false;
- }
- docIdMapping.setup(docIdMapping._docIdLimit,
- &selector,
- idx);
- }
- fusion->setDocIdLimit(trimmedDocIdLimit);
- if (!fusion->mergeFields()) {
- return false;
- }
- return true;
}
}
diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.h b/searchlib/src/vespa/searchlib/diskindex/fusion.h
index 520aa7b2203..53f9db75758 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fusion.h
+++ b/searchlib/src/vespa/searchlib/diskindex/fusion.h
@@ -6,15 +6,11 @@
#include "wordnummapper.h"
#include <vespa/searchlib/index/schemautil.h>
-#include <vector>
-#include <string>
+#include <vespa/vespalib/util/threadexecutor.h>
namespace search { template <class IN> class PostingPriorityQueue; }
-
-namespace search::common {
- class TuneFileIndexing;
- class FileHeaderContext;
-}
+namespace search { class TuneFileIndexing; }
+namespace search::common { class FileHeaderContext; }
namespace search::diskindex {
@@ -24,139 +20,77 @@ class DictionaryWordReader;
class FusionInputIndex
{
-public:
- typedef diskindex::WordNumMapping WordNumMapping;
- typedef diskindex::DocIdMapping DocIdMapping;
-private:
- vespalib::string _path;
- WordNumMapping _wordNumMapping;
- DocIdMapping _docIdMapping;
- vespalib::string _tmpPath;
- index::Schema::SP _schema;
+private :
+ vespalib::string _path;
+ uint32_t _index;
+ index::Schema _schema;
+ DocIdMapping _docIdMapping;
public:
- FusionInputIndex()
- : _path(),
- _wordNumMapping(),
- _docIdMapping(),
- _tmpPath(),
- _schema()
- {
- }
-
- virtual ~FusionInputIndex() {}
-
- void setPath(const vespalib::string &path) { _path = path; }
+ FusionInputIndex(const vespalib::string &path, uint32_t index, const SelectorArray & selector);
+ FusionInputIndex(FusionInputIndex &&) = default;
+ FusionInputIndex & operator = (FusionInputIndex &&) = default;
+ ~FusionInputIndex();
+
const vespalib::string & getPath() const { return _path; }
- void setTmpPath(const vespalib::string &tmpPath) { _tmpPath = tmpPath; }
- const vespalib::string &getTmpPath() const { return _tmpPath; }
- const WordNumMapping & getWordNumMapping() const { return _wordNumMapping; }
- WordNumMapping & getWordNumMapping() { return _wordNumMapping; }
+ uint32_t getIndex() const { return _index; }
const DocIdMapping & getDocIdMapping() const { return _docIdMapping; }
-
- DocIdMapping & getDocIdMapping() { return _docIdMapping; }
-
- const index::Schema &getSchema() const {
- assert(_schema);
- return *_schema;
- }
-
- void setSchema(const index::Schema::SP &schema);
+ const index::Schema &getSchema() const { return _schema; }
};
class Fusion
{
-public:
- typedef search::index::Schema Schema;
- typedef search::index::SchemaUtil SchemaUtil;
-
private:
- Fusion(const Fusion &);
- Fusion& operator=(const Fusion &);
-
-public:
- Fusion(bool dynamicKPosIndexFormat,
- const TuneFileIndexing &tuneFileIndexing,
- const search::common::FileHeaderContext &fileHeaderContext);
+ using Schema = index::Schema;
+ using SchemaUtil = index::SchemaUtil;
+ using WordNumMappingList = std::vector<WordNumMapping>;
- virtual ~Fusion();
-
- void SetOldIndexList(const std::vector<vespalib::string> &oldIndexList);
- bool mergeFields();
+ bool mergeFields(vespalib::ThreadExecutor & executor);
bool mergeField(uint32_t id);
- bool openInputFieldReaders(const SchemaUtil::IndexIterator &index,
- std::vector<std::unique_ptr<FieldReader> > &
- readers);
+ bool openInputFieldReaders(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list,
+ std::vector<std::unique_ptr<FieldReader> > & readers);
bool openFieldWriter(const SchemaUtil::IndexIterator &index, FieldWriter & writer);
bool setupMergeHeap(const std::vector<std::unique_ptr<FieldReader> > & readers,
FieldWriter &writer, PostingPriorityQueue<FieldReader> &heap);
- bool mergeFieldPostings(const SchemaUtil::IndexIterator &index);
- bool openInputWordReaders(const SchemaUtil::IndexIterator &index,
+ bool mergeFieldPostings(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list, uint64_t numWordIds);
+ bool openInputWordReaders(const vespalib::string & dir, const SchemaUtil::IndexIterator &index,
std::vector<std::unique_ptr<DictionaryWordReader> > &readers,
PostingPriorityQueue<DictionaryWordReader> &heap);
- bool renumberFieldWordIds(const SchemaUtil::IndexIterator &index);
- void setSchema(const Schema *schema);
- void setOutDir(const vespalib::string &outDir);
- void makeTmpDirs();
- bool CleanTmpDirs();
+ bool renumberFieldWordIds(const vespalib::string & dir, const SchemaUtil::IndexIterator &index,
+ WordNumMappingList & list, uint64_t & numWordIds);
+ void makeTmpDirs(const vespalib::string & dir);
+ bool cleanTmpDirs(const vespalib::string & dir);
bool readSchemaFiles();
bool checkSchemaCompat();
template <class Reader, class Writer>
- static bool
- selectCookedOrRawFeatures(Reader &reader, Writer &writer);
-
-protected:
- bool ReadMappingFiles(const SchemaUtil::IndexIterator *index);
- bool ReleaseMappingTables();
-protected:
-
- typedef FusionInputIndex OldIndex;
-
- const Schema *_schema; // External ownership
- std::vector<std::shared_ptr<OldIndex> > _oldIndexes;
- typedef std::vector<std::shared_ptr<OldIndex> >::iterator
- OldIndexIterator;
-
- // OUTPUT:
+ static bool selectCookedOrRawFeatures(Reader &reader, Writer &writer);
- uint32_t _docIdLimit;
- uint64_t _numWordIds;
+ bool readMappingFiles(const vespalib::string & dir, const SchemaUtil::IndexIterator *index, WordNumMappingList & list);
+ const Schema &getSchema() const { return _schema; }
- // Index format parameters.
- bool _dynamicKPosIndexFormat;
+ const Schema &_schema; // External ownership
+ std::vector<FusionInputIndex> _oldIndexes;
+ const uint32_t _docIdLimit;
+ const bool _dynamicKPosIndexFormat;
+ vespalib::string _outDir;
- // Index location parameters
-
- /*
- * Output location
- */
- vespalib::string _outDir;
-
- const TuneFileIndexing &_tuneFileIndexing;
+ const TuneFileIndexing &_tuneFileIndexing;
const common::FileHeaderContext &_fileHeaderContext;
-
- const Schema &getSchema() const {
- assert(_schema != nullptr);
- return *_schema;
- }
public:
+ Fusion(const Fusion &) = delete;
+ Fusion& operator=(const Fusion &) = delete;
+ Fusion(uint32_t docIdLimit, const Schema &schema, const vespalib::string &dir,
+ const std::vector<vespalib::string> & sources, const SelectorArray &selector, bool dynamicKPosIndexFormat,
+ const TuneFileIndexing &tuneFileIndexing, const common::FileHeaderContext &fileHeaderContext);
- void setDocIdLimit(uint32_t docIdLimit) { _docIdLimit = docIdLimit; }
- std::vector<std::shared_ptr<OldIndex> > & getOldIndexes() { return _oldIndexes; }
- virtual OldIndex *allocOldIndex() { return new OldIndex; }
-
- /**
- * This method is used by new indexing pipeline to merge indexes.
- */
- static bool merge(const Schema &schema,
- const vespalib::string &dir,
- const std::vector<vespalib::string> &sources,
- const SelectorArray &docIdSelector,
- bool dynamicKPosOccFormat,
- const TuneFileIndexing &tuneFileIndexing,
- const common::FileHeaderContext &fileHeaderContext);
+ ~Fusion();
+
+ static bool
+ merge(const Schema &schema, const vespalib::string &dir, const std::vector<vespalib::string> &sources,
+ const SelectorArray &docIdSelector, bool dynamicKPosOccFormat, const TuneFileIndexing &tuneFileIndexing,
+ const common::FileHeaderContext &fileHeaderContext, vespalib::ThreadExecutor & executor);
};
}