aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirst@yahooinc.com>2022-01-10 13:38:22 +0100
committerGitHub <noreply@github.com>2022-01-10 13:38:22 +0100
commit6fc8a76d2062e6176510804be59faffe2e20662d (patch)
treeda5b4e701726fa30ab400c5e1aced5d60d589be7
parentc0c25b25dc02315e3d5bc09a720db8f02e42e2b2 (diff)
parent3a8c44513c56d4ce618eeadf8733fb0f8c59fc3b (diff)
Merge pull request #20717 from vespa-engine/toregge/simplify-use-of-search-diskindex-fusion-class
Simplify use of search::diskindex::Fusion class.
-rw-r--r--searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp61
-rw-r--r--searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp6
-rw-r--r--searchlib/src/tests/diskindex/fusion/.gitignore5
-rw-r--r--searchlib/src/tests/diskindex/fusion/fusion_test.cpp66
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/field_merger.cpp41
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/field_merger.h5
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fusion.cpp72
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fusion.h16
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fusion_input_index.cpp21
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fusion_input_index.h10
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp5
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h10
-rw-r--r--searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp7
-rw-r--r--searchlib/src/vespa/searchlib/util/posting_priority_queue.h14
-rw-r--r--searchlib/src/vespa/searchlib/util/posting_priority_queue.hpp13
-rw-r--r--searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.h21
-rw-r--r--searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.hpp60
17 files changed, 247 insertions, 186 deletions
diff --git a/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp b/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp
index 855b31310a3..aff368ceced 100644
--- a/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp
+++ b/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp
@@ -197,16 +197,16 @@ void Test::requireThatMemoryIndexCanBeDumpedAndSearched() {
bool fret1 = DocumentSummary::readDocIdLimit(index_dir, fusionDocIdLimit);
ASSERT_TRUE(fret1);
SelectorArray selector(fusionDocIdLimit, 0);
- bool fret2 = Fusion::merge(schema,
- index_dir2,
- fusionInputs,
- selector,
- false /* dynamicKPosOccFormat */,
- tuneFileIndexing,
- fileHeaderContext,
- sharedExecutor,
- std::make_shared<FlushToken>());
- ASSERT_TRUE(fret2);
+ {
+ Fusion fusion(schema,
+ index_dir2,
+ fusionInputs,
+ selector,
+ tuneFileIndexing,
+ fileHeaderContext);
+ bool fret2 = fusion.merge(sharedExecutor, std::make_shared<FlushToken>());
+ ASSERT_TRUE(fret2);
+ }
// Fusion test with all docs removed in output (doesn't affect word list)
const string index_dir3 = "test_index3";
@@ -216,16 +216,16 @@ void Test::requireThatMemoryIndexCanBeDumpedAndSearched() {
bool fret3 = DocumentSummary::readDocIdLimit(index_dir, fusionDocIdLimit);
ASSERT_TRUE(fret3);
SelectorArray selector2(fusionDocIdLimit, 1);
- bool fret4 = Fusion::merge(schema,
- index_dir3,
- fusionInputs,
- selector2,
- false /* dynamicKPosOccFormat */,
- tuneFileIndexing,
- fileHeaderContext,
- sharedExecutor,
- std::make_shared<FlushToken>());
- ASSERT_TRUE(fret4);
+ {
+ Fusion fusion(schema,
+ index_dir3,
+ fusionInputs,
+ selector2,
+ tuneFileIndexing,
+ fileHeaderContext);
+ bool fret4 = fusion.merge(sharedExecutor, std::make_shared<FlushToken>());
+ ASSERT_TRUE(fret4);
+ }
// Fusion test with all docs removed in input (affects word list)
const string index_dir4 = "test_index4";
@@ -235,16 +235,17 @@ void Test::requireThatMemoryIndexCanBeDumpedAndSearched() {
bool fret5 = DocumentSummary::readDocIdLimit(index_dir3, fusionDocIdLimit);
ASSERT_TRUE(fret5);
SelectorArray selector3(fusionDocIdLimit, 0);
- bool fret6 = Fusion::merge(schema,
- index_dir4,
- fusionInputs,
- selector3,
- false /* dynamicKPosOccFormat */,
- tuneFileIndexing,
- fileHeaderContext,
- sharedExecutor,
- std::make_shared<FlushToken>());
- ASSERT_TRUE(fret6);
+ {
+ Fusion fusion(schema,
+ index_dir4,
+ fusionInputs,
+ selector3,
+ tuneFileIndexing,
+ fileHeaderContext);
+ bool fret6 = fusion.merge(sharedExecutor,
+ std::make_shared<FlushToken>());
+ ASSERT_TRUE(fret6);
+ }
DiskIndex disk_index(index_dir);
ASSERT_TRUE(disk_index.setup(TuneFileSearch()));
diff --git a/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp b/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp
index bfca455217c..251d0475537 100644
--- a/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp
@@ -69,9 +69,9 @@ IndexManager::MaintainerOperations::runFusion(const Schema &schema,
std::shared_ptr<IFlushToken> flush_token)
{
SerialNumFileHeaderContext fileHeaderContext(_fileHeaderContext, serialNum);
- const bool dynamic_k_doc_pos_occ_format = false;
- return Fusion::merge(schema, outputDir, sources, selectorArray, dynamic_k_doc_pos_occ_format,
- _tuneFileIndexing, fileHeaderContext, _threadingService.shared(), std::move(flush_token));
+ Fusion fusion(schema, outputDir, sources, selectorArray,
+ _tuneFileIndexing, fileHeaderContext);
+ return fusion.merge(_threadingService.shared(), std::move(flush_token));
}
diff --git a/searchlib/src/tests/diskindex/fusion/.gitignore b/searchlib/src/tests/diskindex/fusion/.gitignore
index d9a33665c43..934c9efb8fa 100644
--- a/searchlib/src/tests/diskindex/fusion/.gitignore
+++ b/searchlib/src/tests/diskindex/fusion/.gitignore
@@ -21,10 +21,7 @@ mdump2
mdump3
mdump4
mdump5
-sdump2
-sdump3
-sdump4
-sdump5
+sdump[2-6]
/ddump6
/dmdump6
/dump6
diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
index 6794b9c0f5c..72867edf474 100644
--- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
+++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
@@ -24,7 +24,7 @@
#include <vespa/vespalib/util/destructor_callbacks.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
-#include <gtest/gtest.h>
+#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/log/log.h>
LOG_SETUP("fusion_test");
@@ -65,7 +65,7 @@ protected:
Schema _schema;
const Schema & getSchema() const { return _schema; }
- void requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap);
+ void requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap, bool force_short_merge_chunk);
void make_simple_index(const vespalib::string &dump_dir, const IFieldLengthInspector &field_length_inspector);
bool try_merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources, std::shared_ptr<IFlushToken> flush_token);
void merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources);
@@ -292,7 +292,7 @@ VESPA_THREAD_STACK_TAG(invert_executor)
VESPA_THREAD_STACK_TAG(push_executor)
void
-FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap)
+FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap, bool force_small_merge_chunk)
{
Schema schema;
Schema schema2;
@@ -357,7 +357,6 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
ib.setPrefix(dump2dir);
uint32_t numDocs = 12 + 1;
uint32_t numWords = fic.getNumUniqueWords();
- bool dynamicKPosOcc = false;
MockFieldLengthInspector mock_field_length_inspector;
TuneFileIndexing tuneFileIndexing;
TuneFileSearch tuneFileSearch;
@@ -392,9 +391,10 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
std::vector<vespalib::string> sources;
SelectorArray selector(numDocs, 0);
sources.push_back(prefix + "dump2");
- ASSERT_TRUE(Fusion::merge(schema, prefix + "dump3", sources, selector,
- dynamicKPosOcc,
- tuneFileIndexing,fileHeaderContext, executor, std::make_shared<FlushToken>()));
+ Fusion fusion(schema, prefix + "dump3", sources, selector,
+ tuneFileIndexing,fileHeaderContext);
+ fusion.set_force_small_merge_chunk(force_small_merge_chunk);
+ ASSERT_TRUE(fusion.merge(executor, std::make_shared<FlushToken>()));
} while (0);
do {
DiskIndex dw3(prefix + "dump3");
@@ -405,9 +405,10 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
std::vector<vespalib::string> sources;
SelectorArray selector(numDocs, 0);
sources.push_back(prefix + "dump3");
- ASSERT_TRUE(Fusion::merge(schema2, prefix + "dump4", sources, selector,
- dynamicKPosOcc,
- tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>()));
+ Fusion fusion(schema2, prefix + "dump4", sources, selector,
+ tuneFileIndexing, fileHeaderContext);
+ fusion.set_force_small_merge_chunk(force_small_merge_chunk);
+ ASSERT_TRUE(fusion.merge(executor, std::make_shared<FlushToken>()));
} while (0);
do {
DiskIndex dw4(prefix + "dump4");
@@ -418,9 +419,10 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
std::vector<vespalib::string> sources;
SelectorArray selector(numDocs, 0);
sources.push_back(prefix + "dump3");
- ASSERT_TRUE(Fusion::merge(schema3, prefix + "dump5", sources, selector,
- dynamicKPosOcc,
- tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>()));
+ Fusion fusion(schema3, prefix + "dump5", sources, selector,
+ tuneFileIndexing, fileHeaderContext);
+ fusion.set_force_small_merge_chunk(force_small_merge_chunk);
+ ASSERT_TRUE(fusion.merge(executor, std::make_shared<FlushToken>()));
} while (0);
do {
DiskIndex dw5(prefix + "dump5");
@@ -431,9 +433,11 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
std::vector<vespalib::string> sources;
SelectorArray selector(numDocs, 0);
sources.push_back(prefix + "dump3");
- ASSERT_TRUE(Fusion::merge(schema, prefix + "dump6", sources, selector,
- !dynamicKPosOcc,
- tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>()));
+ Fusion fusion(schema, prefix + "dump6", sources, selector,
+ tuneFileIndexing, fileHeaderContext);
+ fusion.set_dynamic_k_pos_index_format(true);
+ fusion.set_force_small_merge_chunk(force_small_merge_chunk);
+ ASSERT_TRUE(fusion.merge(executor, std::make_shared<FlushToken>()));
} while (0);
do {
DiskIndex dw6(prefix + "dump6");
@@ -444,9 +448,10 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
std::vector<vespalib::string> sources;
SelectorArray selector(numDocs, 0);
sources.push_back(prefix + "dump2");
- ASSERT_TRUE(Fusion::merge(schema, prefix + "dump3", sources, selector,
- dynamicKPosOcc,
- tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>()));
+ Fusion fusion(schema, prefix + "dump3", sources, selector,
+ tuneFileIndexing, fileHeaderContext);
+ fusion.set_force_small_merge_chunk(force_small_merge_chunk);
+ ASSERT_TRUE(fusion.merge(executor, std::make_shared<FlushToken>()));
} while (0);
do {
DiskIndex dw3(prefix + "dump3");
@@ -487,9 +492,9 @@ FusionTest::try_merge_simple_indexes(const vespalib::string &dump_dir, const std
TuneFileIndexing tuneFileIndexing;
DummyFileHeaderContext fileHeaderContext;
SelectorArray selector(20, 0);
- return Fusion::merge(_schema, dump_dir, sources, selector,
- false,
- tuneFileIndexing, fileHeaderContext, executor, flush_token);
+ Fusion fusion(_schema, dump_dir, sources, selector,
+ tuneFileIndexing, fileHeaderContext);
+ return fusion.merge(executor, flush_token);
}
void
@@ -506,22 +511,27 @@ FusionTest::FusionTest()
TEST_F(FusionTest, require_that_normal_fusion_is_working)
{
- requireThatFusionIsWorking("", false, false);
+ requireThatFusionIsWorking("", false, false, false);
}
TEST_F(FusionTest, require_that_directio_fusion_is_working)
{
- requireThatFusionIsWorking("d", true, false);
+ requireThatFusionIsWorking("d", true, false, false);
}
TEST_F(FusionTest, require_that_mmap_fusion_is_working)
{
- requireThatFusionIsWorking("m", false, true);
+ requireThatFusionIsWorking("m", false, true, false);
}
TEST_F(FusionTest, require_that_directiommap_fusion_is_working)
{
- requireThatFusionIsWorking("dm", true, true);
+ requireThatFusionIsWorking("dm", true, true, false);
+}
+
+TEST_F(FusionTest, require_that_small_merge_chunk_fusion_is_working)
+{
+ requireThatFusionIsWorking("s", false, false, true);
}
namespace {
@@ -608,11 +618,11 @@ TEST_F(FusionTest, require_that_fusion_can_be_stopped)
vespalib::rmdir("stopdump3", true);
flush_token = std::make_shared<MyFlushToken>(1);
ASSERT_FALSE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token));
- EXPECT_EQ(12, flush_token->get_checks());
+ EXPECT_EQ(8, flush_token->get_checks());
vespalib::rmdir("stopdump3", true);
flush_token = std::make_shared<MyFlushToken>(47);
ASSERT_FALSE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token));
- EXPECT_LT(48, flush_token->get_checks());
+ EXPECT_LE(48, flush_token->get_checks());
clean_stopped_fusion_testdirs();
}
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp
index 6179f06c9da..68672a0a930 100644
--- a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp
+++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp
@@ -39,6 +39,11 @@ namespace search::diskindex {
namespace {
+constexpr uint32_t renumber_word_ids_heap_limit = 4;
+constexpr uint32_t renumber_word_ids_merge_chunk = 1000000;
+constexpr uint32_t merge_postings_heap_limit = 4;
+constexpr uint32_t merge_postings_merge_chunk = 50000;
+
vespalib::string
createTmpPath(const vespalib::string & base, uint32_t index) {
vespalib::asciistream os;
@@ -174,18 +179,20 @@ FieldMerger::renumber_word_ids_start()
return false;
}
_word_aggregator = std::make_unique<WordAggregator>();
+ _word_heap->setup(renumber_word_ids_heap_limit);
+ _word_heap->set_merge_chunk(_fusion_out_index.get_force_small_merge_chunk() ? 1u : renumber_word_ids_merge_chunk);
return true;
}
-bool
+void
FieldMerger::renumber_word_ids_main()
{
- _word_heap->merge(*_word_aggregator, 4, *_flush_token);
+ _word_heap->merge(*_word_aggregator, *_flush_token);
if (_flush_token->stop_requested()) {
- return false;
+ _failed = true;
+ } else if (_word_heap->empty()) {
+ _state = State::RENUMBER_WORD_IDS_FINISH;
}
- assert(_word_heap->empty());
- return true;
}
bool
@@ -356,6 +363,8 @@ FieldMerger::setup_merge_heap()
_heap->initialAdd(reader.get());
}
}
+ _heap->setup(merge_postings_heap_limit);
+ _heap->set_merge_chunk(_fusion_out_index.get_force_small_merge_chunk() ? 1u : merge_postings_merge_chunk);
return true;
}
@@ -374,15 +383,15 @@ FieldMerger::merge_postings_start()
return setup_merge_heap();
}
-bool
+void
FieldMerger::merge_postings_main()
{
- _heap->merge(*_writer, 4, *_flush_token);
+ _heap->merge(*_writer, *_flush_token);
if (_flush_token->stop_requested()) {
- return false;
+ _failed = true;
+ } else if (_heap->empty()) {
+ _state = State::MERGE_POSTINGS_FINISH;
}
- assert(_heap->empty());
- return true;
}
bool
@@ -475,11 +484,7 @@ FieldMerger::process_merge_field()
merge_field_start();
break;
case State::RENUMBER_WORD_IDS:
- if (!renumber_word_ids_main()) {
- renumber_word_ids_failed();
- } else {
- _state = State::RENUMBER_WORD_IDS_FINISH;
- }
+ renumber_word_ids_main();
break;
case State::RENUMBER_WORD_IDS_FINISH:
if (!renumber_word_ids_finish()) {
@@ -491,11 +496,7 @@ FieldMerger::process_merge_field()
}
break;
case State::MERGE_POSTINGS:
- if (!merge_postings_main()) {
- merge_postings_failed();
- } else {
- _state = State::MERGE_POSTINGS_FINISH;
- }
+ merge_postings_main();
break;
case State::MERGE_POSTINGS_FINISH:
merge_field_finish();
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.h b/searchlib/src/vespa/searchlib/diskindex/field_merger.h
index c5ce337e845..5017a7d5192 100644
--- a/searchlib/src/vespa/searchlib/diskindex/field_merger.h
+++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.h
@@ -53,13 +53,14 @@ class FieldMerger
std::unique_ptr<FieldWriter> _writer;
State _state;
bool _failed;
+ bool _force_small_merge_chunk;
void make_tmp_dirs();
bool clean_tmp_dirs();
bool open_input_word_readers();
bool read_mapping_files();
bool renumber_word_ids_start();
- bool renumber_word_ids_main();
+ void renumber_word_ids_main();
bool renumber_word_ids_finish();
void renumber_word_ids_failed();
std::shared_ptr<FieldLengthScanner> allocate_field_length_scanner();
@@ -68,7 +69,7 @@ class FieldMerger
bool select_cooked_or_raw_features(FieldReader& reader);
bool setup_merge_heap();
bool merge_postings_start();
- bool merge_postings_main();
+ void merge_postings_main();
bool merge_postings_finish();
void merge_postings_failed();
public:
diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp
index 1afed18cb48..12552f09027 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp
+++ b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp
@@ -34,6 +34,7 @@ namespace {
std::vector<FusionInputIndex>
createInputIndexes(const std::vector<vespalib::string> & sources, const SelectorArray &selector)
{
+ assert(sources.size() <= 255); // due to source selector data type
std::vector<FusionInputIndex> indexes;
indexes.reserve(sources.size());
uint32_t i = 0;
@@ -43,17 +44,28 @@ createInputIndexes(const std::vector<vespalib::string> & sources, const Selector
return indexes;
}
+uint32_t calc_trimmed_doc_id_limit(const SelectorArray& selector, const std::vector<vespalib::string>& sources)
+{
+ uint32_t docIdLimit = selector.size();
+ uint32_t trimmed_doc_id_limit = docIdLimit;
+
+ // Limit docIdLimit in output based on selections that cannot be satisfied
+ uint32_t sources_size = sources.size();
+ while (trimmed_doc_id_limit > 0 && selector[trimmed_doc_id_limit - 1] >= sources_size) {
+ --trimmed_doc_id_limit;
+ }
+ return trimmed_doc_id_limit;
+}
+
}
-Fusion::Fusion(uint32_t docIdLimit, const Schema & schema, const vespalib::string & dir,
- const std::vector<vespalib::string> & sources, const SelectorArray &selector,
- bool dynamicKPosIndexFormat, const TuneFileIndexing &tuneFileIndexing,
- const FileHeaderContext &fileHeaderContext)
- : _fusion_out_index(schema, dir, createInputIndexes(sources, selector), docIdLimit, dynamicKPosIndexFormat, tuneFileIndexing, fileHeaderContext)
+Fusion::Fusion(const Schema& schema, const vespalib::string& dir,
+ const std::vector<vespalib::string>& sources, const SelectorArray& selector,
+ const TuneFileIndexing& tuneFileIndexing,
+ const FileHeaderContext& fileHeaderContext)
+ : _old_indexes(createInputIndexes(sources, selector)),
+ _fusion_out_index(schema, dir, _old_indexes, calc_trimmed_doc_id_limit(selector, sources), tuneFileIndexing, fileHeaderContext)
{
- if (!readSchemaFiles()) {
- throw IllegalArgumentException("Cannot read schema files for source indexes");
- }
}
Fusion::~Fusion() = default;
@@ -102,51 +114,41 @@ Fusion::readSchemaFiles()
}
bool
-Fusion::merge(const Schema &schema, const vespalib::string &dir, const std::vector<vespalib::string> &sources,
- const SelectorArray &selector, bool dynamicKPosOccFormat,
- const TuneFileIndexing &tuneFileIndexing, const FileHeaderContext &fileHeaderContext,
- vespalib::ThreadExecutor & executor,
- std::shared_ptr<IFlushToken> flush_token)
+Fusion::merge(vespalib::ThreadExecutor& executor, std::shared_ptr<IFlushToken> flush_token)
{
- assert(sources.size() <= 255);
- uint32_t docIdLimit = selector.size();
- uint32_t trimmedDocIdLimit = docIdLimit;
-
- // Limit docIdLimit in output based on selections that cannot be satisfied
- uint32_t sourcesSize = sources.size();
- while (trimmedDocIdLimit > 0 && selector[trimmedDocIdLimit - 1] >= sourcesSize) {
- --trimmedDocIdLimit;
- }
-
FastOS_StatInfo statInfo;
- if (!FastOS_File::Stat(dir.c_str(), &statInfo)) {
+ if (!FastOS_File::Stat(_fusion_out_index.get_path().c_str(), &statInfo)) {
if (statInfo._error != FastOS_StatInfo::FileNotFound) {
- LOG(error, "Could not stat \"%s\"", dir.c_str());
+ LOG(error, "Could not stat \"%s\"", _fusion_out_index.get_path().c_str());
return false;
}
} else {
if (!statInfo._isDirectory) {
- LOG(error, "\"%s\" is not a directory", dir.c_str());
+ LOG(error, "\"%s\" is not a directory", _fusion_out_index.get_path().c_str());
return false;
}
- search::DirectoryTraverse dt(dir.c_str());
+ search::DirectoryTraverse dt(_fusion_out_index.get_path().c_str());
if (!dt.RemoveTree()) {
- LOG(error, "Failed to clean directory \"%s\"", dir.c_str());
+ LOG(error, "Failed to clean directory \"%s\"", _fusion_out_index.get_path().c_str());
return false;
}
}
- vespalib::mkdir(dir, false);
- schema.saveToFile(dir + "/schema.txt");
- if (!DocumentSummary::writeDocIdLimit(dir, trimmedDocIdLimit)) {
- LOG(error, "Could not write docsum count in dir %s: %s", dir.c_str(), getLastErrorString().c_str());
+ vespalib::mkdir(_fusion_out_index.get_path(), false);
+ _fusion_out_index.get_schema().saveToFile(_fusion_out_index.get_path() + "/schema.txt");
+ if (!DocumentSummary::writeDocIdLimit(_fusion_out_index.get_path(), _fusion_out_index.get_doc_id_limit())) {
+ LOG(error, "Could not write docsum count in dir %s: %s", _fusion_out_index.get_path().c_str(), getLastErrorString().c_str());
return false;
}
try {
- auto fusion = std::make_unique<Fusion>(trimmedDocIdLimit, schema, dir, sources, selector,
- dynamicKPosOccFormat, tuneFileIndexing, fileHeaderContext);
- return fusion->mergeFields(executor, flush_token);
+ for (auto& old_index : _old_indexes) {
+ old_index.setup();
+ }
+ if (!readSchemaFiles()) {
+ throw IllegalArgumentException("Cannot read schema files for source indexes");
+ }
+ return mergeFields(executor, flush_token);
} catch (const std::exception & e) {
LOG(error, "%s", e.what());
return false;
diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.h b/searchlib/src/vespa/searchlib/diskindex/fusion.h
index 7e0b70dca36..1f5c4471950 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fusion.h
+++ b/searchlib/src/vespa/searchlib/diskindex/fusion.h
@@ -31,21 +31,19 @@ private:
const Schema &getSchema() const { return _fusion_out_index.get_schema(); }
+ std::vector<FusionInputIndex> _old_indexes;
FusionOutputIndex _fusion_out_index;
public:
Fusion(const Fusion &) = delete;
Fusion& operator=(const Fusion &) = delete;
- Fusion(uint32_t docIdLimit, const Schema &schema, const vespalib::string &dir,
- const std::vector<vespalib::string> & sources, const SelectorArray &selector, bool dynamicKPosIndexFormat,
- const TuneFileIndexing &tuneFileIndexing, const common::FileHeaderContext &fileHeaderContext);
+ Fusion(const Schema& schema, const vespalib::string& dir,
+ const std::vector<vespalib::string>& sources, const SelectorArray& selector,
+ const TuneFileIndexing& tuneFileIndexing, const common::FileHeaderContext& fileHeaderContext);
~Fusion();
-
- static bool
- merge(const Schema &schema, const vespalib::string &dir, const std::vector<vespalib::string> &sources,
- const SelectorArray &docIdSelector, bool dynamicKPosOccFormat, const TuneFileIndexing &tuneFileIndexing,
- const common::FileHeaderContext &fileHeaderContext, vespalib::ThreadExecutor & executor,
- std::shared_ptr<IFlushToken> flush_token);
+ void set_dynamic_k_pos_index_format(bool dynamic_k_pos_index_format) { _fusion_out_index.set_dynamic_k_pos_index_format(dynamic_k_pos_index_format); }
+ void set_force_small_merge_chunk(bool force_small_merge_chunk) { _fusion_out_index.set_force_small_merge_chunk(force_small_merge_chunk); }
+ bool merge(vespalib::ThreadExecutor& executor, std::shared_ptr<IFlushToken> flush_token);
};
}
diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.cpp
index 278d095b639..51c365957d9 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.cpp
+++ b/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.cpp
@@ -14,21 +14,28 @@ namespace search::diskindex {
FusionInputIndex::FusionInputIndex(const vespalib::string& path, uint32_t index, const SelectorArray& selector)
: _path(path),
_index(index),
- _schema()
+ _selector(&selector),
+ _schema(),
+ _docIdMapping()
{
- vespalib::string fname = path + "/schema.txt";
+}
+
+FusionInputIndex::~FusionInputIndex() = default;
+
+void
+FusionInputIndex::setup()
+{
+ vespalib::string fname = _path + "/schema.txt";
if ( ! _schema.loadFromFile(fname)) {
throw IllegalArgumentException(make_string("Failed loading schema %s", fname.c_str()));
}
if ( ! SchemaUtil::validateSchema(_schema)) {
throw IllegalArgumentException(make_string("Failed validating schema %s", fname.c_str()));
}
- if (!_docIdMapping.readDocIdLimit(path)) {
- throw IllegalArgumentException(make_string("Cannot determine docIdLimit for old index \"%s\"", path.c_str()));
+ if (!_docIdMapping.readDocIdLimit(_path)) {
+ throw IllegalArgumentException(make_string("Cannot determine docIdLimit for old index \"%s\"", _path.c_str()));
}
- _docIdMapping.setup(_docIdMapping._docIdLimit, &selector, index);
+ _docIdMapping.setup(_docIdMapping._docIdLimit, _selector, _index);
}
-FusionInputIndex::~FusionInputIndex() = default;
-
}
diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.h b/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.h
index fd7bb8f0256..6606e00d73b 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.h
+++ b/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.h
@@ -14,10 +14,11 @@ namespace search::diskindex {
class FusionInputIndex
{
private:
- vespalib::string _path;
- uint32_t _index;
- index::Schema _schema;
- DocIdMapping _docIdMapping;
+ vespalib::string _path;
+ uint32_t _index;
+ const SelectorArray* _selector;
+ index::Schema _schema;
+ DocIdMapping _docIdMapping;
public:
FusionInputIndex(const vespalib::string& path, uint32_t index, const SelectorArray& selector);
@@ -25,6 +26,7 @@ public:
FusionInputIndex & operator = (FusionInputIndex&&) = default;
~FusionInputIndex();
+ void setup();
const vespalib::string& getPath() const noexcept { return _path; }
uint32_t getIndex() const noexcept { return _index; }
const DocIdMapping& getDocIdMapping() const noexcept { return _docIdMapping; }
diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp
index 66ec0889cbe..3c75aa16b93 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp
+++ b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp
@@ -5,12 +5,13 @@
namespace search::diskindex {
-FusionOutputIndex::FusionOutputIndex(const index::Schema& schema, const vespalib::string& path, std::vector<FusionInputIndex> old_indexes, uint32_t doc_id_limit, bool dynamic_k_pos_index_format, const TuneFileIndexing& tune_file_indexing, const common::FileHeaderContext& file_header_context)
+FusionOutputIndex::FusionOutputIndex(const index::Schema& schema, const vespalib::string& path, const std::vector<FusionInputIndex>& old_indexes, uint32_t doc_id_limit, const TuneFileIndexing& tune_file_indexing, const common::FileHeaderContext& file_header_context)
: _schema(schema),
_path(path),
_old_indexes(std::move(old_indexes)),
_doc_id_limit(doc_id_limit),
- _dynamic_k_pos_index_format(dynamic_k_pos_index_format),
+ _dynamic_k_pos_index_format(false),
+ _force_small_merge_chunk(false),
_tune_file_indexing(tune_file_indexing),
_file_header_context(file_header_context)
{
diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h
index c366b111363..729ecd26524 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h
+++ b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h
@@ -22,20 +22,24 @@ class FusionOutputIndex
private:
const index::Schema& _schema;
const vespalib::string _path;
- const std::vector<FusionInputIndex> _old_indexes;
+ const std::vector<FusionInputIndex>& _old_indexes;
const uint32_t _doc_id_limit;
- const bool _dynamic_k_pos_index_format;
+ bool _dynamic_k_pos_index_format;
+ bool _force_small_merge_chunk;
const TuneFileIndexing& _tune_file_indexing;
const common::FileHeaderContext& _file_header_context;
public:
- FusionOutputIndex(const index::Schema& schema, const vespalib::string& path, std::vector<FusionInputIndex> old_indexes, uint32_t doc_id_limit, bool dynamic_k_pos_index_format, const TuneFileIndexing& tune_file_indexing, const common::FileHeaderContext& file_header_context);
+ FusionOutputIndex(const index::Schema& schema, const vespalib::string& path, const std::vector<FusionInputIndex>& old_indexes, uint32_t doc_id_limit, const TuneFileIndexing& tune_file_indexing, const common::FileHeaderContext& file_header_context);
~FusionOutputIndex();
+ void set_dynamic_k_pos_index_format(bool dynamic_k_pos_index_format) { _dynamic_k_pos_index_format = dynamic_k_pos_index_format; }
+ void set_force_small_merge_chunk(bool force_small_merge_chunk) { _force_small_merge_chunk = force_small_merge_chunk; }
const index::Schema& get_schema() const noexcept { return _schema; }
const vespalib::string& get_path() const noexcept { return _path; }
const std::vector<FusionInputIndex>& get_old_indexes() const noexcept { return _old_indexes; }
uint32_t get_doc_id_limit() const noexcept { return _doc_id_limit; }
bool get_dynamic_k_pos_index_format() const noexcept { return _dynamic_k_pos_index_format; }
+ bool get_force_small_merge_chunk() const noexcept { return _force_small_merge_chunk; }
const TuneFileIndexing& get_tune_file_indexing() const noexcept { return _tune_file_indexing; }
const common::FileHeaderContext& get_file_header_context() const noexcept { return _file_header_context; }
};
diff --git a/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp b/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp
index 64d194c5c7e..666eed8f1e8 100644
--- a/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp
+++ b/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp
@@ -363,8 +363,11 @@ FakeMemTreeOccFactory::setup(const std::vector<const FakeWord *> &fws)
}
++i;
}
- heap.merge(_mgr, 4, flush_token);
- assert(heap.empty());
+ heap.setup(4);
+ heap.set_merge_chunk(100000);
+ while (!heap.empty()) {
+ heap.merge(_mgr, flush_token);
+ }
_mgr.finalize();
}
diff --git a/searchlib/src/vespa/searchlib/util/posting_priority_queue.h b/searchlib/src/vespa/searchlib/util/posting_priority_queue.h
index 01ae0995806..c1549b32f93 100644
--- a/searchlib/src/vespa/searchlib/util/posting_priority_queue.h
+++ b/searchlib/src/vespa/searchlib/util/posting_priority_queue.h
@@ -12,7 +12,7 @@ namespace search {
template <class Reader>
class PostingPriorityQueue
{
-public:
+protected:
class Ref
{
Reader *_ref;
@@ -28,9 +28,14 @@ public:
using Vector = std::vector<Ref>;
Vector _vec;
+ uint32_t _heap_limit;
+ uint32_t _merge_chunk;
+public:
PostingPriorityQueue()
- : _vec()
+ : _vec(),
+ _heap_limit(0u),
+ _merge_chunk(0u)
{
}
@@ -40,9 +45,10 @@ public:
/*
* Sort vector after a set of initial add operations, so lowest()
- * and adjust() can be used.
+ * and adjust() can be used. Skip sort if _vec.size() < heap_limit
+ * since merging with few elements don't use heap.
*/
- void sort() { std::sort(_vec.begin(), _vec.end()); }
+ void setup(uint32_t heap_limit);
/*
* Return lowest value. Assumes vector is sorted.
diff --git a/searchlib/src/vespa/searchlib/util/posting_priority_queue.hpp b/searchlib/src/vespa/searchlib/util/posting_priority_queue.hpp
index 69bf7bc547c..33f3bce2be6 100644
--- a/searchlib/src/vespa/searchlib/util/posting_priority_queue.hpp
+++ b/searchlib/src/vespa/searchlib/util/posting_priority_queue.hpp
@@ -32,4 +32,17 @@ PostingPriorityQueue<Reader>::adjust()
*to = changed; // Save changed value at right location
}
+template <class Reader>
+void
+PostingPriorityQueue<Reader>::setup(uint32_t heap_limit)
+{
+ _heap_limit = heap_limit;
+ for (auto ref : _vec) {
+ assert(ref.get()->isValid());
+ }
+ if (_vec.size() >= heap_limit) {
+ std::sort(_vec.begin(), _vec.end());
+ }
+}
+
}
diff --git a/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.h b/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.h
index 8dd941a2c13..9debcd06ea6 100644
--- a/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.h
+++ b/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.h
@@ -13,20 +13,29 @@ namespace search {
template <class Reader, class Writer>
class PostingPriorityQueueMerger : public PostingPriorityQueue<Reader>
{
+ uint32_t _merge_chunk;
public:
using Parent = PostingPriorityQueue<Reader>;
using Vector = typename Parent::Vector;
+ using Parent::_heap_limit;
using Parent::_vec;
using Parent::adjust;
using Parent::empty;
using Parent::lowest;
- using Parent::sort;
+ using Parent::setup;
- void mergeHeap(Writer& writer, const IFlushToken& flush_token) __attribute__((noinline));
- static void mergeOne(Writer& writer, Reader& reader, const IFlushToken &flush_token) __attribute__((noinline));
- static void mergeTwo(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token) __attribute__((noinline));
- static void mergeSmall(Writer& writer, typename Vector::iterator ib, typename Vector::iterator ie, const IFlushToken &flush_token) __attribute__((noinline));
- void merge(Writer& writer, uint32_t heapLimit, const IFlushToken& flush_token) __attribute__((noinline));
+ PostingPriorityQueueMerger()
+ : Parent(),
+ _merge_chunk(0u)
+ {
+ }
+
+ void set_merge_chunk(uint32_t merge_chunk) { _merge_chunk = merge_chunk; }
+ void mergeHeap(Writer& writer, const IFlushToken& flush_token, uint32_t remaining_merge_chunk) __attribute__((noinline));
+ static void mergeOne(Writer& writer, Reader& reader, const IFlushToken &flush_token, uint32_t remaining_merge_chunk) __attribute__((noinline));
+ static void mergeTwo(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token, uint32_t& remaining_merge_chunk) __attribute__((noinline));
+ static void mergeSmall(Writer& writer, typename Vector::iterator ib, typename Vector::iterator ie, const IFlushToken &flush_token, uint32_t& remaining_merge_chunk) __attribute__((noinline));
+ void merge(Writer& writer, const IFlushToken& flush_token) __attribute__((noinline));
};
}
diff --git a/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.hpp b/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.hpp
index d33356cee4a..5676f6326df 100644
--- a/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.hpp
+++ b/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.hpp
@@ -9,44 +9,48 @@ namespace search {
template <class Reader, class Writer>
void
-PostingPriorityQueueMerger<Reader, Writer>::mergeHeap(Writer& writer, const IFlushToken& flush_token)
+PostingPriorityQueueMerger<Reader, Writer>::mergeHeap(Writer& writer, const IFlushToken& flush_token, uint32_t remaining_merge_chunk)
{
- while (!empty() && !flush_token.stop_requested()) {
+ while (remaining_merge_chunk > 0u && !empty() && !flush_token.stop_requested()) {
Reader *low = lowest();
low->write(writer);
low->read();
adjust();
+ --remaining_merge_chunk;
}
}
template <class Reader, class Writer>
void
-PostingPriorityQueueMerger<Reader, Writer>::mergeOne(Writer& writer, Reader& reader, const IFlushToken& flush_token)
+PostingPriorityQueueMerger<Reader, Writer>::mergeOne(Writer& writer, Reader& reader, const IFlushToken& flush_token, uint32_t remaining_merge_chunk)
{
- while (reader.isValid() && !flush_token.stop_requested()) {
+ while (remaining_merge_chunk > 0u && reader.isValid() && !flush_token.stop_requested()) {
reader.write(writer);
reader.read();
+ --remaining_merge_chunk;
}
}
template <class Reader, class Writer>
void
-PostingPriorityQueueMerger<Reader, Writer>::mergeTwo(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token)
+PostingPriorityQueueMerger<Reader, Writer>::mergeTwo(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token, uint32_t& remaining_merge_chunk)
{
- while (!flush_token.stop_requested()) {
+ while (remaining_merge_chunk > 0u && !flush_token.stop_requested()) {
Reader &low = reader2 < reader1 ? reader2 : reader1;
low.write(writer);
low.read();
- if (!low.isValid())
+ --remaining_merge_chunk;
+ if (!low.isValid()) {
break;
+ }
}
}
template <class Reader, class Writer>
void
-PostingPriorityQueueMerger<Reader, Writer>::mergeSmall(Writer& writer, typename Vector::iterator ib, typename Vector::iterator ie, const IFlushToken& flush_token)
+PostingPriorityQueueMerger<Reader, Writer>::mergeSmall(Writer& writer, typename Vector::iterator ib, typename Vector::iterator ie, const IFlushToken& flush_token, uint32_t& remaining_merge_chunk)
{
- while (!flush_token.stop_requested()) {
+ while (remaining_merge_chunk > 0u && !flush_token.stop_requested()) {
typename Vector::iterator i = ib;
Reader *low = i->get();
for (++i; i != ie; ++i)
@@ -54,47 +58,49 @@ PostingPriorityQueueMerger<Reader, Writer>::mergeSmall(Writer& writer, typename
low = i->get();
low->write(writer);
low->read();
- if (!low->isValid())
+ --remaining_merge_chunk;
+ if (!low->isValid()) {
break;
+ }
}
}
template <class Reader, class Writer>
void
-PostingPriorityQueueMerger<Reader, Writer>::merge(Writer& writer, uint32_t heapLimit, const IFlushToken& flush_token)
+PostingPriorityQueueMerger<Reader, Writer>::merge(Writer& writer, const IFlushToken& flush_token)
{
if (_vec.empty())
return;
- for (typename Vector::iterator i = _vec.begin(), ie = _vec.end(); i != ie;
- ++i) {
- assert(i->get()->isValid());
- }
- if (_vec.size() >= heapLimit) {
- sort();
- void (PostingPriorityQueueMerger::*mergeHeapFunc)(Writer& writer, const IFlushToken& flush_token) =
+ assert(_heap_limit > 0u);
+ uint32_t remaining_merge_chunk = _merge_chunk;
+ if (_vec.size() >= _heap_limit) {
+ void (PostingPriorityQueueMerger::*mergeHeapFunc)(Writer& writer, const IFlushToken& flush_token, uint32_t remaining_merge_chunk) =
&PostingPriorityQueueMerger::mergeHeap;
- (this->*mergeHeapFunc)(writer, flush_token);
+ (this->*mergeHeapFunc)(writer, flush_token, remaining_merge_chunk);
return;
}
- while (!flush_token.stop_requested()) {
+ while (remaining_merge_chunk > 0u && !flush_token.stop_requested()) {
if (_vec.size() == 1) {
- void (*mergeOneFunc)(Writer& writer, Reader& reader, const IFlushToken& flush_token) =
+ void (*mergeOneFunc)(Writer& writer, Reader& reader, const IFlushToken& flush_token, uint32_t remaining_merge_chunk) =
&PostingPriorityQueueMerger::mergeOne;
- (*mergeOneFunc)(writer, *_vec.front().get(), flush_token);
- _vec.clear();
+ (*mergeOneFunc)(writer, *_vec.front().get(), flush_token, remaining_merge_chunk);
+ if (!_vec.front().get()->isValid()) {
+ _vec.clear();
+ }
return;
}
if (_vec.size() == 2) {
- void (*mergeTwoFunc)(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token) =
+ void (*mergeTwoFunc)(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token, uint32_t& remaining_merge_chunk) =
&PostingPriorityQueueMerger::mergeTwo;
- (*mergeTwoFunc)(writer, *_vec[0].get(), *_vec[1].get(), flush_token);
+ (*mergeTwoFunc)(writer, *_vec[0].get(), *_vec[1].get(), flush_token, remaining_merge_chunk);
} else {
void (*mergeSmallFunc)(Writer& writer,
typename Vector::iterator ib,
typename Vector::iterator ie,
- const IFlushToken& flush_token) =
+ const IFlushToken& flush_token,
+ uint32_t& remaining_merge_chunk) =
&PostingPriorityQueueMerger::mergeSmall;
- (*mergeSmallFunc)(writer, _vec.begin(), _vec.end(), flush_token);
+ (*mergeSmallFunc)(writer, _vec.begin(), _vec.end(), flush_token, remaining_merge_chunk);
}
for (typename Vector::iterator i = _vec.begin(), ie = _vec.end();
i != ie; ++i) {