author    Tor Egge <Tor.Egge@online.no>    2022-01-06 12:25:14 +0100
committer Tor Egge <Tor.Egge@online.no>    2022-01-06 12:27:20 +0100
commit    ad70f29311cca2e812df502347433d9b4ecf3e54 (patch)
tree      a135415d108baba49c6b614a28729a33a4883b4f /searchlib
parent    00fd50f8f99ec06a846404355f6c6e5c54e73d4f (diff)
Move state from stack to field merger.
Diffstat (limited to 'searchlib')
-rw-r--r--    searchlib/src/vespa/searchlib/diskindex/field_merger.cpp    122
-rw-r--r--    searchlib/src/vespa/searchlib/diskindex/field_merger.h        30
-rw-r--r--    searchlib/src/vespa/searchlib/diskindex/fusion.cpp              4
3 files changed, 87 insertions, 69 deletions
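
In short, the commit changes FieldMerger so that the state used during a field merge (dictionary word readers, the two posting priority queues, the word-number mappings, the input field readers, the field writer, and the flush token) is no longer kept in stack variables and threaded through function parameters; instead it is held in member fields that are set up, consumed, and released as the merge moves from step to step, with the flush token captured once at construction. A minimal sketch of that pattern follows; the types and names are simplified stand-ins, not the actual Vespa classes.

#include <memory>
#include <vector>

struct Reader { bool close() { return true; } };            // stand-in for FieldReader
struct Writer { bool close() { return true; } };            // stand-in for FieldWriter
struct FlushToken { bool stop_requested() const { return false; } };

// Before: state on the stack, threaded through parameters, e.g.
//   bool merge_postings(Readers& readers, Writer& writer, const FlushToken& token);
// After: the merger owns the state and tears it down as soon as each step is done.
class Merger {
    std::shared_ptr<FlushToken>          _flush_token;   // captured at construction
    std::vector<std::unique_ptr<Reader>> _readers;       // filled by an "open readers" step
    std::unique_ptr<Writer>              _writer;        // created by the merge step
public:
    explicit Merger(std::shared_ptr<FlushToken> flush_token)
        : _flush_token(std::move(flush_token)) {}

    bool merge_postings() {
        _writer = std::make_unique<Writer>();
        if (_flush_token->stop_requested()) {
            return false;
        }
        for (auto& reader : _readers) {
            if (!reader->close()) {
                return false;
            }
        }
        _readers.clear();            // release input state once it is no longer needed
        bool ok = _writer->close();
        _writer.reset();             // release output state as well
        return ok;
    }
};
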
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp
index 375d30d3003..ee5d72f1daa 100644
--- a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp
+++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp
@@ -50,10 +50,18 @@ createTmpPath(const vespalib::string & base, uint32_t index) {
}
-FieldMerger::FieldMerger(uint32_t id, const FusionOutputIndex& fusion_out_index)
+FieldMerger::FieldMerger(uint32_t id, const FusionOutputIndex& fusion_out_index, std::shared_ptr<IFlushToken> flush_token)
: _id(id),
_field_dir(fusion_out_index.get_path() + "/" + SchemaUtil::IndexIterator(fusion_out_index.get_schema(), id).getName()),
- _fusion_out_index(fusion_out_index)
+ _fusion_out_index(fusion_out_index),
+ _flush_token(std::move(flush_token)),
+ _word_readers(),
+ _word_heap(),
+ _word_num_mappings(),
+ _num_word_ids(0),
+ _readers(),
+ _heap(),
+ _writer()
{
}
@@ -96,8 +104,10 @@ FieldMerger::clean_tmp_dirs()
}
bool
-FieldMerger::open_input_word_readers(std::vector<std::unique_ptr<DictionaryWordReader>> & readers, PostingPriorityQueue<DictionaryWordReader>& heap)
+FieldMerger::open_input_word_readers()
{
+ _word_readers.reserve(_fusion_out_index.get_old_indexes().size());
+ _word_heap = std::make_unique<PostingPriorityQueue<DictionaryWordReader>>();
SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id);
for (auto & oi : _fusion_out_index.get_old_indexes()) {
auto reader(std::make_unique<DictionaryWordReader>());
@@ -117,16 +127,17 @@ FieldMerger::open_input_word_readers(std::vector<std::unique_ptr<DictionaryWordR
}
reader->read();
if (reader->isValid()) {
- readers.push_back(std::move(reader));
- heap.initialAdd(readers.back().get());
+ _word_readers.push_back(std::move(reader));
+ _word_heap->initialAdd(_word_readers.back().get());
}
}
return true;
}
bool
-FieldMerger::read_mapping_files(WordNumMappingList& list)
+FieldMerger::read_mapping_files()
{
+ _word_num_mappings.resize(_fusion_out_index.get_old_indexes().size());
SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id);
for (const auto & oi : _fusion_out_index.get_old_indexes()) {
std::vector<uint32_t> oldIndexes;
@@ -134,7 +145,7 @@ FieldMerger::read_mapping_files(WordNumMappingList& list)
if (!SchemaUtil::getIndexIds(oldSchema, DataType::STRING, oldIndexes)) {
return false;
}
- WordNumMapping &wordNumMapping = list[oi.getIndex()];
+ WordNumMapping &wordNumMapping = _word_num_mappings[oi.getIndex()];
if (oldIndexes.empty()) {
wordNumMapping.noMappingFile();
continue;
@@ -152,34 +163,34 @@ FieldMerger::read_mapping_files(WordNumMappingList& list)
}
bool
-FieldMerger::renumber_word_ids(WordNumMappingList& list, uint64_t& numWordIds, const IFlushToken& flush_token)
+FieldMerger::renumber_word_ids()
{
SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id);
vespalib::string indexName = index.getName();
LOG(debug, "Renumber word IDs for field %s", indexName.c_str());
- std::vector<std::unique_ptr<DictionaryWordReader>> readers;
- PostingPriorityQueue<DictionaryWordReader> heap;
WordAggregator out;
- if (!open_input_word_readers(readers, heap)) {
+ if (!open_input_word_readers()) {
return false;
}
- heap.merge(out, 4, flush_token);
- if (flush_token.stop_requested()) {
+ _word_heap->merge(out, 4, *_flush_token);
+ if (_flush_token->stop_requested()) {
return false;
}
- assert(heap.empty());
- numWordIds = out.getWordNum();
+ assert(_word_heap->empty());
+ _word_heap.reset();
+ _num_word_ids = out.getWordNum();
// Close files
- for (auto &i : readers) {
+ for (auto &i : _word_readers) {
i->close();
}
+ _word_readers.clear();
// Now read mapping files back into an array
// XXX: avoid this, and instead make the array here
- if (!read_mapping_files(list)) {
+ if (!read_mapping_files()) {
return false;
}
LOG(debug, "Finished renumbering words IDs for field %s", indexName.c_str());
@@ -210,8 +221,9 @@ FieldMerger::allocate_field_length_scanner()
}
bool
-FieldMerger::open_input_field_readers(const WordNumMappingList& list, std::vector<std::unique_ptr<FieldReader>>& readers)
+FieldMerger::open_input_field_readers()
{
+ _readers.reserve(_fusion_out_index.get_old_indexes().size());
SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id);
auto field_length_scanner = allocate_field_length_scanner();
vespalib::string indexName = index.getName();
@@ -221,31 +233,35 @@ FieldMerger::open_input_field_readers(const WordNumMappingList& list, std::vecto
continue; // drop data
}
auto reader = FieldReader::allocFieldReader(index, oldSchema, field_length_scanner);
- reader->setup(list[oi.getIndex()], oi.getDocIdMapping());
+ reader->setup(_word_num_mappings[oi.getIndex()], oi.getDocIdMapping());
if (!reader->open(oi.getPath() + "/" + indexName + "/", _fusion_out_index.get_tune_file_indexing()._read)) {
return false;
}
- readers.push_back(std::move(reader));
+ _readers.push_back(std::move(reader));
}
return true;
}
bool
-FieldMerger::open_field_writer(FieldWriter& writer, const FieldLengthInfo& field_length_info)
+FieldMerger::open_field_writer()
{
+ FieldLengthInfo field_length_info;
+ if (!_readers.empty()) {
+ field_length_info = _readers.back()->get_field_length_info();
+ }
SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id);
- if (!writer.open(_field_dir + "/", 64, 262144, _fusion_out_index.get_dynamic_k_pos_index_format(),
- index.use_interleaved_features(), index.getSchema(),
- index.getIndex(),
- field_length_info,
- _fusion_out_index.get_tune_file_indexing()._write, _fusion_out_index.get_file_header_context())) {
+ if (!_writer->open(_field_dir + "/", 64, 262144, _fusion_out_index.get_dynamic_k_pos_index_format(),
+ index.use_interleaved_features(), index.getSchema(),
+ index.getIndex(),
+ field_length_info,
+ _fusion_out_index.get_tune_file_indexing()._write, _fusion_out_index.get_file_header_context())) {
throw IllegalArgumentException(make_string("Could not open output posocc + dictionary in %s", _field_dir.c_str()));
}
return true;
}
bool
-FieldMerger::select_cooked_or_raw_features(FieldReader& reader, FieldWriter& writer)
+FieldMerger::select_cooked_or_raw_features(FieldReader& reader)
{
bool rawFormatOK = true;
bool cookedFormatOK = true;
@@ -258,7 +274,7 @@ FieldMerger::select_cooked_or_raw_features(FieldReader& reader, FieldWriter& wri
return true;
}
{
- writer.getFeatureParams(featureParams);
+ _writer->getFeatureParams(featureParams);
cookedFormat = featureParams.getStr("cookedEncoding");
rawFormat = featureParams.getStr("encoding");
if (rawFormat == "") {
@@ -304,65 +320,63 @@ FieldMerger::select_cooked_or_raw_features(FieldReader& reader, FieldWriter& wri
}
bool
-FieldMerger::setup_merge_heap(const std::vector<std::unique_ptr<FieldReader>>& readers, FieldWriter& writer, PostingPriorityQueue<FieldReader>& heap)
+FieldMerger::setup_merge_heap()
{
- for (auto &reader : readers) {
- if (!select_cooked_or_raw_features(*reader, writer)) {
+ _heap = std::make_unique<PostingPriorityQueue<FieldReader>>();
+ for (auto &reader : _readers) {
+ if (!select_cooked_or_raw_features(*reader)) {
return false;
}
if (reader->isValid()) {
reader->read();
}
if (reader->isValid()) {
- heap.initialAdd(reader.get());
+ _heap->initialAdd(reader.get());
}
}
return true;
}
bool
-FieldMerger::merge_postings(const WordNumMappingList& list, uint64_t numWordIds, const IFlushToken& flush_token)
+FieldMerger::merge_postings()
{
SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id);
- std::vector<std::unique_ptr<FieldReader>> readers;
- PostingPriorityQueue<FieldReader> heap;
/* OUTPUT */
- FieldWriter fieldWriter(_fusion_out_index.get_doc_id_limit(), numWordIds);
+ _writer = std::make_unique<FieldWriter>(_fusion_out_index.get_doc_id_limit(), _num_word_ids);
vespalib::string indexName = index.getName();
- if (!open_input_field_readers(list, readers)) {
+ if (!open_input_field_readers()) {
return false;
}
- FieldLengthInfo field_length_info;
- if (!readers.empty()) {
- field_length_info = readers.back()->get_field_length_info();
- }
- if (!open_field_writer(fieldWriter, field_length_info)) {
+ if (!open_field_writer()) {
return false;
}
- if (!setup_merge_heap(readers, fieldWriter, heap)) {
+ if (!setup_merge_heap()) {
return false;
}
- heap.merge(fieldWriter, 4, flush_token);
- if (flush_token.stop_requested()) {
+ _heap->merge(*_writer, 4, *_flush_token);
+ if (_flush_token->stop_requested()) {
return false;
}
- assert(heap.empty());
+ assert(_heap->empty());
+ _heap.reset();
- for (auto &reader : readers) {
+ for (auto &reader : _readers) {
if (!reader->close()) {
return false;
}
}
- if (!fieldWriter.close()) {
+ _readers.clear();
+ if (!_writer->close()) {
throw IllegalArgumentException(make_string("Could not close output posocc + dictionary in %s", _field_dir.c_str()));
}
+ _writer.reset();
return true;
}
bool
-FieldMerger::merge_field(std::shared_ptr<IFlushToken> flush_token)
+FieldMerger::merge_field()
{
const Schema &schema = _fusion_out_index.get_schema();
SchemaUtil::IndexIterator index(schema, _id);
@@ -381,10 +395,8 @@ FieldMerger::merge_field(std::shared_ptr<IFlushToken> flush_token)
make_tmp_dirs();
- WordNumMappingList list(_fusion_out_index.get_old_indexes().size());
- uint64_t numWordIds(0);
- if (!renumber_word_ids(list, numWordIds, *flush_token)) {
- if (flush_token->stop_requested()) {
+ if (!renumber_word_ids()) {
+ if (_flush_token->stop_requested()) {
return false;
}
LOG(error, "Could not renumber field word ids for field %s dir %s", indexName.c_str(), _field_dir.c_str());
@@ -392,9 +404,9 @@ FieldMerger::merge_field(std::shared_ptr<IFlushToken> flush_token)
}
// Tokamak
- bool res = merge_postings(list, numWordIds, *flush_token);
+ bool res = merge_postings();
if (!res) {
- if (flush_token->stop_requested()) {
+ if (_flush_token->stop_requested()) {
return false;
}
throw IllegalArgumentException(make_string("Could not merge field postings for field %s dir %s",
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.h b/searchlib/src/vespa/searchlib/diskindex/field_merger.h
index 31c2818cf17..a57005e18a4 100644
--- a/searchlib/src/vespa/searchlib/diskindex/field_merger.h
+++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.h
@@ -12,8 +12,6 @@ class IFlushToken;
template <class IN> class PostingPriorityQueue;
}
-namespace search::index { class FieldLengthInfo; }
-
namespace search::diskindex {
class DictionaryWordReader;
@@ -33,22 +31,30 @@ class FieldMerger
uint32_t _id;
vespalib::string _field_dir;
const FusionOutputIndex& _fusion_out_index;
+ std::shared_ptr<IFlushToken> _flush_token;
+ std::vector<std::unique_ptr<DictionaryWordReader>> _word_readers;
+ std::unique_ptr<PostingPriorityQueue<DictionaryWordReader>> _word_heap;
+ WordNumMappingList _word_num_mappings;
+ uint64_t _num_word_ids;
+ std::vector<std::unique_ptr<FieldReader>> _readers;
+ std::unique_ptr<PostingPriorityQueue<FieldReader>> _heap;
+ std::unique_ptr<FieldWriter> _writer;
void make_tmp_dirs();
bool clean_tmp_dirs();
- bool open_input_word_readers(std::vector<std::unique_ptr<DictionaryWordReader>>& readers, PostingPriorityQueue<DictionaryWordReader>& heap);
- bool read_mapping_files(WordNumMappingList& list);
- bool renumber_word_ids(WordNumMappingList& list, uint64_t& numWordIds, const IFlushToken& flush_token);
+ bool open_input_word_readers();
+ bool read_mapping_files();
+ bool renumber_word_ids();
std::shared_ptr<FieldLengthScanner> allocate_field_length_scanner();
- bool open_input_field_readers(const WordNumMappingList& list, std::vector<std::unique_ptr<FieldReader>>& readers);
- bool open_field_writer(FieldWriter& writer, const index::FieldLengthInfo& field_length_info);
- bool select_cooked_or_raw_features(FieldReader& reader, FieldWriter& writer);
- bool setup_merge_heap(const std::vector<std::unique_ptr<FieldReader>>& readers, FieldWriter& writer, PostingPriorityQueue<FieldReader>& heap);
- bool merge_postings(const WordNumMappingList& list, uint64_t numWordIds, const IFlushToken& flush_token);
+ bool open_input_field_readers();
+ bool open_field_writer();
+ bool select_cooked_or_raw_features(FieldReader& reader);
+ bool setup_merge_heap();
+ bool merge_postings();
public:
- FieldMerger(uint32_t id, const FusionOutputIndex& fusion_out_index);
+ FieldMerger(uint32_t id, const FusionOutputIndex& fusion_out_index, std::shared_ptr<IFlushToken> flush_token);
~FieldMerger();
- bool merge_field(std::shared_ptr<IFlushToken> flush_token);
+ bool merge_field();
};
}
diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp
index 5891849959b..eafbbac361b 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp
+++ b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp
@@ -86,8 +86,8 @@ Fusion::mergeFields(vespalib::ThreadExecutor & executor, std::shared_ptr<IFlushT
for (SchemaUtil::IndexIterator iter(schema); iter.isValid(); ++iter) {
concurrent.wait();
executor.execute(vespalib::makeLambdaTask([this, index=iter.getIndex(), &failed, &done, &concurrent, flush_token]() {
- FieldMerger merger(index, _fusion_out_index);
- if (!merger.merge_field(flush_token)) {
+ FieldMerger merger(index, _fusion_out_index, flush_token);
+ if (!merger.merge_field()) {
failed++;
}
concurrent.post();
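
A side note on the two priority queues that now live in fields (_word_heap and _heap): PostingPriorityQueue performs a k-way merge of sorted readers into a single writer and consults the flush token so a fusion can be stopped cooperatively. The following rough, self-contained illustration shows that idea only; it is not the Vespa implementation, and the real merge() signature (including its second argument) is simplified away.

#include <atomic>
#include <queue>
#include <vector>

// Hypothetical stand-ins for the reader, writer and flush-token interfaces.
struct Reader {
    std::vector<int> values;                          // already sorted
    size_t pos = 0;
    bool valid() const { return pos < values.size(); }
    int current() const { return values[pos]; }
    void read() { ++pos; }
};
struct Writer {
    std::vector<int> out;
    void write(int v) { out.push_back(v); }
};
struct FlushToken {
    std::atomic<bool> stop{false};
    bool stop_requested() const { return stop; }
};

// k-way merge: repeatedly pop the reader with the smallest current element,
// emit that element, advance the reader and re-insert it while it stays valid;
// bail out early if a stop has been requested.
inline void merge(std::vector<Reader>& readers, Writer& writer, const FlushToken& token) {
    auto cmp = [](const Reader* a, const Reader* b) { return a->current() > b->current(); };
    std::priority_queue<Reader*, std::vector<Reader*>, decltype(cmp)> heap(cmp);
    for (auto& r : readers) {
        if (r.valid()) {
            heap.push(&r);
        }
    }
    while (!heap.empty() && !token.stop_requested()) {
        Reader* r = heap.top();
        heap.pop();
        writer.write(r->current());
        r->read();
        if (r->valid()) {
            heap.push(r);
        }
    }
}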