diff options
author | Tor Egge <Tor.Egge@online.no> | 2022-01-06 22:14:54 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2022-01-06 22:14:54 +0100 |
commit | d4463837e5e6ccbfd3ece320f6a399ff2b1376b4 (patch) | |
tree | e5b7ab368d574f5693b3063b19cd1810185cfe7f /searchlib | |
parent | eb85f48b3a6e4e69b2d45f2d9393d8b4d8e27daa (diff) |
Refactor field merger.
Diffstat (limited to 'searchlib')
10 files changed, 326 insertions, 318 deletions
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp index ee5d72f1daa..67ec7faa621 100644 --- a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp @@ -13,7 +13,7 @@ #include <vespa/searchlib/index/schemautil.h> #include <vespa/searchlib/util/filekit.h> #include <vespa/searchlib/util/dirtraverse.h> -#include <vespa/searchlib/util/postingpriorityqueue.h> +#include <vespa/searchlib/util/posting_priority_queue_merger.hpp> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/util/exceptions.h> @@ -52,11 +52,13 @@ createTmpPath(const vespalib::string & base, uint32_t index) { FieldMerger::FieldMerger(uint32_t id, const FusionOutputIndex& fusion_out_index, std::shared_ptr<IFlushToken> flush_token) : _id(id), - _field_dir(fusion_out_index.get_path() + "/" + SchemaUtil::IndexIterator(fusion_out_index.get_schema(), id).getName()), + _field_name(SchemaUtil::IndexIterator(fusion_out_index.get_schema(), id).getName()), + _field_dir(fusion_out_index.get_path() + "/" + _field_name), _fusion_out_index(fusion_out_index), _flush_token(std::move(flush_token)), _word_readers(), _word_heap(), + _word_aggregator(), _word_num_mappings(), _num_word_ids(0), _readers(), @@ -107,14 +109,14 @@ bool FieldMerger::open_input_word_readers() { _word_readers.reserve(_fusion_out_index.get_old_indexes().size()); - _word_heap = std::make_unique<PostingPriorityQueue<DictionaryWordReader>>(); + _word_heap = std::make_unique<PostingPriorityQueueMerger<DictionaryWordReader, WordAggregator>>(); SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id); for (auto & oi : _fusion_out_index.get_old_indexes()) { auto reader(std::make_unique<DictionaryWordReader>()); const vespalib::string &tmpindexpath = createTmpPath(_field_dir, oi.getIndex()); const vespalib::string &oldindexpath = oi.getPath(); vespalib::string wordMapName = tmpindexpath + "/old2new.dat"; - vespalib::string fieldDir(oldindexpath + "/" + index.getName()); + vespalib::string fieldDir(oldindexpath + "/" + _field_name); vespalib::string dictName(fieldDir + "/dictionary"); const Schema &oldSchema = oi.getSchema(); if (!index.hasOldFields(oldSchema)) { @@ -163,24 +165,33 @@ FieldMerger::read_mapping_files() } bool -FieldMerger::renumber_word_ids() +FieldMerger::renumber_word_ids_start() { - SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id); - vespalib::string indexName = index.getName(); - LOG(debug, "Renumber word IDs for field %s", indexName.c_str()); - - WordAggregator out; - + LOG(debug, "Renumber word IDs for field %s", _field_name.c_str()); if (!open_input_word_readers()) { return false; } - _word_heap->merge(out, 4, *_flush_token); + _word_aggregator = std::make_unique<WordAggregator>(); + return true; +} + +bool +FieldMerger::renumber_word_ids_main() +{ + _word_heap->merge(*_word_aggregator, 4, *_flush_token); if (_flush_token->stop_requested()) { return false; } assert(_word_heap->empty()); + return true; +} + +bool +FieldMerger::renumber_word_ids_finish() +{ _word_heap.reset(); - _num_word_ids = out.getWordNum(); + _num_word_ids = _word_aggregator->getWordNum(); + _word_aggregator.reset(); // Close files for (auto &i : _word_readers) { @@ -193,11 +204,23 @@ FieldMerger::renumber_word_ids() if (!read_mapping_files()) { return false; } - LOG(debug, "Finished renumbering words IDs for field %s", indexName.c_str()); + LOG(debug, "Finished renumbering words IDs for field %s", _field_name.c_str()); return true; } +bool +FieldMerger::renumber_word_ids() +{ + if (!renumber_word_ids_start()) { + return false; + } + if (!renumber_word_ids_main()) { + return false; + } + return renumber_word_ids_finish(); +} + std::shared_ptr<FieldLengthScanner> FieldMerger::allocate_field_length_scanner() { @@ -226,7 +249,6 @@ FieldMerger::open_input_field_readers() _readers.reserve(_fusion_out_index.get_old_indexes().size()); SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id); auto field_length_scanner = allocate_field_length_scanner(); - vespalib::string indexName = index.getName(); for (const auto &oi : _fusion_out_index.get_old_indexes()) { const Schema &oldSchema = oi.getSchema(); if (!index.hasOldFields(oldSchema)) { @@ -234,7 +256,7 @@ FieldMerger::open_input_field_readers() } auto reader = FieldReader::allocFieldReader(index, oldSchema, field_length_scanner); reader->setup(_word_num_mappings[oi.getIndex()], oi.getDocIdMapping()); - if (!reader->open(oi.getPath() + "/" + indexName + "/", _fusion_out_index.get_tune_file_indexing()._read)) { + if (!reader->open(oi.getPath() + "/" + _field_name + "/", _fusion_out_index.get_tune_file_indexing()._read)) { return false; } _readers.push_back(std::move(reader)); @@ -322,7 +344,7 @@ FieldMerger::select_cooked_or_raw_features(FieldReader& reader) bool FieldMerger::setup_merge_heap() { - _heap = std::make_unique<PostingPriorityQueue<FieldReader>>(); + _heap = std::make_unique<PostingPriorityQueueMerger<FieldReader, FieldWriter>>(); for (auto &reader : _readers) { if (!select_cooked_or_raw_features(*reader)) { return false; @@ -338,12 +360,10 @@ FieldMerger::setup_merge_heap() } bool -FieldMerger::merge_postings() +FieldMerger::merge_postings_start() { - SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id); /* OUTPUT */ _writer = std::make_unique<FieldWriter>(_fusion_out_index.get_doc_id_limit(), _num_word_ids); - vespalib::string indexName = index.getName(); if (!open_input_field_readers()) { return false; @@ -351,15 +371,23 @@ FieldMerger::merge_postings() if (!open_field_writer()) { return false; } - if (!setup_merge_heap()) { - return false; - } + return setup_merge_heap(); +} +bool +FieldMerger::merge_postings_main() +{ _heap->merge(*_writer, 4, *_flush_token); if (_flush_token->stop_requested()) { return false; } assert(_heap->empty()); + return true; +} + +bool +FieldMerger::merge_postings_finish() +{ _heap.reset(); for (auto &reader : _readers) { @@ -376,11 +404,22 @@ FieldMerger::merge_postings() } bool +FieldMerger::merge_postings() +{ + if (!merge_postings_start()) { + return false; + } + if (!merge_postings_main()) { + return false; + } + return merge_postings_finish(); +} + +bool FieldMerger::merge_field() { const Schema &schema = _fusion_out_index.get_schema(); SchemaUtil::IndexIterator index(schema, _id); - const vespalib::string &indexName = index.getName(); SchemaUtil::IndexSettings settings = index.getIndexSettings(); if (settings.hasError()) { return false; @@ -391,7 +430,7 @@ FieldMerger::merge_field() } vespalib::mkdir(_field_dir, false); - LOG(debug, "merge_field for field %s dir %s", indexName.c_str(), _field_dir.c_str()); + LOG(debug, "merge_field for field %s dir %s", _field_name.c_str(), _field_dir.c_str()); make_tmp_dirs(); @@ -399,7 +438,7 @@ FieldMerger::merge_field() if (_flush_token->stop_requested()) { return false; } - LOG(error, "Could not renumber field word ids for field %s dir %s", indexName.c_str(), _field_dir.c_str()); + LOG(error, "Could not renumber field word ids for field %s dir %s", _field_name.c_str(), _field_dir.c_str()); return false; } @@ -410,7 +449,7 @@ FieldMerger::merge_field() return false; } throw IllegalArgumentException(make_string("Could not merge field postings for field %s dir %s", - indexName.c_str(), _field_dir.c_str())); + _field_name.c_str(), _field_dir.c_str())); } if (!FileKit::createStamp(_field_dir + "/.mergeocc_done")) { return false; @@ -421,7 +460,7 @@ FieldMerger::merge_field() return false; } - LOG(debug, "Finished merge_field for field %s dir %s", indexName.c_str(), _field_dir.c_str()); + LOG(debug, "Finished merge_field for field %s dir %s", _field_name.c_str(), _field_dir.c_str()); return true; } diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.h b/searchlib/src/vespa/searchlib/diskindex/field_merger.h index a57005e18a4..bbb2f210511 100644 --- a/searchlib/src/vespa/searchlib/diskindex/field_merger.h +++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.h @@ -9,7 +9,7 @@ namespace search { class IFlushToken; -template <class IN> class PostingPriorityQueue; +template <class Reader, class Writer> class PostingPriorityQueueMerger; } namespace search::diskindex { @@ -19,6 +19,7 @@ class FieldLengthScanner; class FieldReader; class FieldWriter; class FusionOutputIndex; +class WordAggregator; class WordNumMapping; /* @@ -29,27 +30,35 @@ class FieldMerger using WordNumMappingList = std::vector<WordNumMapping>; uint32_t _id; + vespalib::string _field_name; vespalib::string _field_dir; const FusionOutputIndex& _fusion_out_index; std::shared_ptr<IFlushToken> _flush_token; std::vector<std::unique_ptr<DictionaryWordReader>> _word_readers; - std::unique_ptr<PostingPriorityQueue<DictionaryWordReader>> _word_heap; + std::unique_ptr<PostingPriorityQueueMerger<DictionaryWordReader, WordAggregator>> _word_heap; + std::unique_ptr<WordAggregator> _word_aggregator; WordNumMappingList _word_num_mappings; uint64_t _num_word_ids; std::vector<std::unique_ptr<FieldReader>> _readers; - std::unique_ptr<PostingPriorityQueue<FieldReader>> _heap; + std::unique_ptr<PostingPriorityQueueMerger<FieldReader, FieldWriter>> _heap; std::unique_ptr<FieldWriter> _writer; void make_tmp_dirs(); bool clean_tmp_dirs(); bool open_input_word_readers(); bool read_mapping_files(); + bool renumber_word_ids_start(); + bool renumber_word_ids_main(); + bool renumber_word_ids_finish(); bool renumber_word_ids(); std::shared_ptr<FieldLengthScanner> allocate_field_length_scanner(); bool open_input_field_readers(); bool open_field_writer(); bool select_cooked_or_raw_features(FieldReader& reader); bool setup_merge_heap(); + bool merge_postings_start(); + bool merge_postings_main(); + bool merge_postings_finish(); bool merge_postings(); public: FieldMerger(uint32_t id, const FusionOutputIndex& fusion_out_index, std::shared_ptr<IFlushToken> flush_token); diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp index eafbbac361b..1afed18cb48 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp @@ -3,46 +3,29 @@ #include "fusion.h" #include "fusion_input_index.h" #include "field_merger.h" -#include "fieldreader.h" -#include "dictionarywordreader.h" -#include "field_length_scanner.h" -#include <vespa/vespalib/util/stringfmt.h> -#include <vespa/searchlib/bitcompression/posocc_fields_params.h> +#include <vespa/fastos/file.h> +#include <vespa/searchlib/common/documentsummary.h> #include <vespa/searchlib/common/i_flush_token.h> -#include <vespa/searchlib/index/field_length_info.h> -#include <vespa/searchlib/util/filekit.h> +#include <vespa/searchlib/index/schemautil.h> #include <vespa/searchlib/util/dirtraverse.h> -#include <vespa/searchlib/util/postingpriorityqueue.h> #include <vespa/vespalib/io/fileutil.h> -#include <vespa/searchlib/common/documentsummary.h> +#include <vespa/vespalib/util/count_down_latch.h> #include <vespa/vespalib/util/error.h> +#include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/lambdatask.h> -#include <vespa/vespalib/util/count_down_latch.h> -#include <vespa/vespalib/stllike/asciistream.h> #include <vespa/document/util/queue.h> -#include <sstream> #include <vespa/log/log.h> -#include <vespa/vespalib/util/exceptions.h> LOG_SETUP(".diskindex.fusion"); -using search::FileKit; -using search::PostingPriorityQueue; using search::common::FileHeaderContext; -using search::diskindex::DocIdMapping; -using search::diskindex::WordNumMapping; using search::docsummary::DocumentSummary; -using search::index::FieldLengthInfo; -using search::bitcompression::PosOccFieldParams; -using search::bitcompression::PosOccFieldsParams; -using search::index::PostingListParams; using search::index::Schema; using search::index::SchemaUtil; using search::index::schema::DataType; using vespalib::getLastErrorString; using vespalib::IllegalArgumentException; -using vespalib::make_string; namespace search::diskindex { diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.h b/searchlib/src/vespa/searchlib/diskindex/fusion.h index 22dda4d6edf..7e0b70dca36 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion.h +++ b/searchlib/src/vespa/searchlib/diskindex/fusion.h @@ -3,12 +3,10 @@ #pragma once #include "fusion_output_index.h" - #include <vespa/vespalib/util/threadexecutor.h> namespace search { class IFlushToken; -template <class IN> class PostingPriorityQueue; class TuneFileIndexing; } diff --git a/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp b/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp index 8ae3fe0bdad..64d194c5c7e 100644 --- a/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp +++ b/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp @@ -5,7 +5,7 @@ #include <vespa/searchlib/common/flush_token.h> #include <vespa/searchlib/memoryindex/posting_iterator.h> #include <vespa/searchlib/queryeval/iterators.h> -#include <vespa/searchlib/util/postingpriorityqueue.h> +#include <vespa/searchlib/util/posting_priority_queue_merger.hpp> #include <vespa/vespalib/datastore/buffer_type.hpp> #include <vespa/vespalib/btree/btreeiterator.hpp> #include <vespa/vespalib/btree/btreenode.hpp> @@ -352,7 +352,7 @@ FakeMemTreeOccFactory::setup(const std::vector<const FakeWord *> &fws) ++wordIdx; } - PostingPriorityQueue<FakeWord::RandomizedReader> heap; + PostingPriorityQueueMerger<FakeWord::RandomizedReader, FakeWord::RandomizedWriter> heap; std::vector<FakeWord::RandomizedReader>::iterator i(r.begin()); std::vector<FakeWord::RandomizedReader>::iterator ie(r.end()); FlushToken flush_token; diff --git a/searchlib/src/vespa/searchlib/util/posting_priority_queue.h b/searchlib/src/vespa/searchlib/util/posting_priority_queue.h new file mode 100644 index 00000000000..01ae0995806 --- /dev/null +++ b/searchlib/src/vespa/searchlib/util/posting_priority_queue.h @@ -0,0 +1,59 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vector> + +namespace search { + +/* + * Provide priority queue semantics for a set of posting readers. + */ +template <class Reader> +class PostingPriorityQueue +{ +public: + class Ref + { + Reader *_ref; + public: + Ref(Reader *ref) + : _ref(ref) + { + } + + bool operator<(const Ref &rhs) const { return *_ref < *rhs._ref; } + Reader *get() const noexcept { return _ref; } + }; + + using Vector = std::vector<Ref>; + Vector _vec; + + PostingPriorityQueue() + : _vec() + { + } + + bool empty() const { return _vec.empty(); } + void clear() { _vec.clear(); } + void initialAdd(Reader *it) { _vec.push_back(Ref(it)); } + + /* + * Sort vector after a set of initial add operations, so lowest() + * and adjust() can be used. + */ + void sort() { std::sort(_vec.begin(), _vec.end()); } + + /* + * Return lowest value. Assumes vector is sorted. + */ + Reader *lowest() const { return _vec.front().get(); } + + /* + * The vector might no longer be sorted since the first element has changed + * value. Perform adjustments to make vector sorted again. + */ + void adjust(); +}; + +} diff --git a/searchlib/src/vespa/searchlib/util/posting_priority_queue.hpp b/searchlib/src/vespa/searchlib/util/posting_priority_queue.hpp new file mode 100644 index 00000000000..69bf7bc547c --- /dev/null +++ b/searchlib/src/vespa/searchlib/util/posting_priority_queue.hpp @@ -0,0 +1,35 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "posting_priority_queue.h" + +namespace search { + +template <class Reader> +void +PostingPriorityQueue<Reader>::adjust() +{ + typedef typename Vector::iterator VIT; + if (!_vec.front().get()->isValid()) { + _vec.erase(_vec.begin()); // Iterator no longer valid + return; + } + if (_vec.size() == 1) { // Only one iterator left + return; + } + // Peform binary search to find first element higher than changed value + VIT gt = std::upper_bound(_vec.begin() + 1, _vec.end(), _vec.front()); + VIT to = _vec.begin(); + VIT from = to; + ++from; + Ref changed = *to; // Remember changed value + while (from != gt) { // Shift elements to make space for changed value + *to = *from; + ++from; + ++to; + } + *to = changed; // Save changed value at right location +} + +} diff --git a/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.h b/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.h new file mode 100644 index 00000000000..8dd941a2c13 --- /dev/null +++ b/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.h @@ -0,0 +1,32 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "posting_priority_queue.h" + +namespace search { + +/* + * Provide priority queue semantics for a set of posting readers with + * merging to a posting writer. + */ +template <class Reader, class Writer> +class PostingPriorityQueueMerger : public PostingPriorityQueue<Reader> +{ +public: + using Parent = PostingPriorityQueue<Reader>; + using Vector = typename Parent::Vector; + using Parent::_vec; + using Parent::adjust; + using Parent::empty; + using Parent::lowest; + using Parent::sort; + + void mergeHeap(Writer& writer, const IFlushToken& flush_token) __attribute__((noinline)); + static void mergeOne(Writer& writer, Reader& reader, const IFlushToken &flush_token) __attribute__((noinline)); + static void mergeTwo(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token) __attribute__((noinline)); + static void mergeSmall(Writer& writer, typename Vector::iterator ib, typename Vector::iterator ie, const IFlushToken &flush_token) __attribute__((noinline)); + void merge(Writer& writer, uint32_t heapLimit, const IFlushToken& flush_token) __attribute__((noinline)); +}; + +} diff --git a/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.hpp b/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.hpp new file mode 100644 index 00000000000..d33356cee4a --- /dev/null +++ b/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.hpp @@ -0,0 +1,114 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "posting_priority_queue.hpp" +#include "posting_priority_queue_merger.h" + +namespace search { + +template <class Reader, class Writer> +void +PostingPriorityQueueMerger<Reader, Writer>::mergeHeap(Writer& writer, const IFlushToken& flush_token) +{ + while (!empty() && !flush_token.stop_requested()) { + Reader *low = lowest(); + low->write(writer); + low->read(); + adjust(); + } +} + +template <class Reader, class Writer> +void +PostingPriorityQueueMerger<Reader, Writer>::mergeOne(Writer& writer, Reader& reader, const IFlushToken& flush_token) +{ + while (reader.isValid() && !flush_token.stop_requested()) { + reader.write(writer); + reader.read(); + } +} + +template <class Reader, class Writer> +void +PostingPriorityQueueMerger<Reader, Writer>::mergeTwo(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token) +{ + while (!flush_token.stop_requested()) { + Reader &low = reader2 < reader1 ? reader2 : reader1; + low.write(writer); + low.read(); + if (!low.isValid()) + break; + } +} + +template <class Reader, class Writer> +void +PostingPriorityQueueMerger<Reader, Writer>::mergeSmall(Writer& writer, typename Vector::iterator ib, typename Vector::iterator ie, const IFlushToken& flush_token) +{ + while (!flush_token.stop_requested()) { + typename Vector::iterator i = ib; + Reader *low = i->get(); + for (++i; i != ie; ++i) + if (*i->get() < *low) + low = i->get(); + low->write(writer); + low->read(); + if (!low->isValid()) + break; + } +} + +template <class Reader, class Writer> +void +PostingPriorityQueueMerger<Reader, Writer>::merge(Writer& writer, uint32_t heapLimit, const IFlushToken& flush_token) +{ + if (_vec.empty()) + return; + for (typename Vector::iterator i = _vec.begin(), ie = _vec.end(); i != ie; + ++i) { + assert(i->get()->isValid()); + } + if (_vec.size() >= heapLimit) { + sort(); + void (PostingPriorityQueueMerger::*mergeHeapFunc)(Writer& writer, const IFlushToken& flush_token) = + &PostingPriorityQueueMerger::mergeHeap; + (this->*mergeHeapFunc)(writer, flush_token); + return; + } + while (!flush_token.stop_requested()) { + if (_vec.size() == 1) { + void (*mergeOneFunc)(Writer& writer, Reader& reader, const IFlushToken& flush_token) = + &PostingPriorityQueueMerger::mergeOne; + (*mergeOneFunc)(writer, *_vec.front().get(), flush_token); + _vec.clear(); + return; + } + if (_vec.size() == 2) { + void (*mergeTwoFunc)(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token) = + &PostingPriorityQueueMerger::mergeTwo; + (*mergeTwoFunc)(writer, *_vec[0].get(), *_vec[1].get(), flush_token); + } else { + void (*mergeSmallFunc)(Writer& writer, + typename Vector::iterator ib, + typename Vector::iterator ie, + const IFlushToken& flush_token) = + &PostingPriorityQueueMerger::mergeSmall; + (*mergeSmallFunc)(writer, _vec.begin(), _vec.end(), flush_token); + } + for (typename Vector::iterator i = _vec.begin(), ie = _vec.end(); + i != ie; ++i) { + if (!i->get()->isValid()) { + _vec.erase(i); + break; + } + } + for (typename Vector::iterator i = _vec.begin(), ie = _vec.end(); + i != ie; ++i) { + assert(i->get()->isValid()); + } + assert(!_vec.empty()); + } +} + +} diff --git a/searchlib/src/vespa/searchlib/util/postingpriorityqueue.h b/searchlib/src/vespa/searchlib/util/postingpriorityqueue.h deleted file mode 100644 index c263c6bc470..00000000000 --- a/searchlib/src/vespa/searchlib/util/postingpriorityqueue.h +++ /dev/null @@ -1,261 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include <vector> - -namespace search -{ - -/* - * Provide priority queue semantics for a set of posting inputs. - */ -template <class IN> -class PostingPriorityQueue -{ -public: - class Ref - { - IN *_ref; - public: - Ref(IN *ref) - : _ref(ref) - { - } - - bool - operator<(const Ref &rhs) const - { - return *_ref < *rhs._ref; - } - - IN * - get() const - { - return _ref; - } - }; - - typedef std::vector<Ref> Vector; - Vector _vec; - - PostingPriorityQueue() - : _vec() - { - } - - bool - empty() const - { - return _vec.empty(); - } - - void - clear() - { - _vec.clear(); - } - - void - initialAdd(IN *it) - { - _vec.push_back(Ref(it)); - } - - /* - * Sort vector after a set of initial add operations, so lowest() - * and adjust() can be used. - */ - void - sort() - { - std::sort(_vec.begin(), _vec.end()); - } - - /* - * Return lowest value. Assumes vector is sorted. - */ - IN * - lowest() const - { - return _vec.front().get(); - } - - /* - * The vector might no longer be sorted since the first element has changed - * value. Perform adjustments to make vector sorted again. - */ - void - adjust(); - - - template <class OUT> - void - mergeHeap(OUT &out, const IFlushToken& flush_token) __attribute__((noinline)); - - template <class OUT> - static void - mergeOne(OUT &out, IN &in, const IFlushToken &flush_token) __attribute__((noinline)); - - template <class OUT> - static void - mergeTwo(OUT &out, IN &in1, IN &in2, const IFlushToken& flush_token) __attribute__((noinline)); - - template <class OUT> - static void - mergeSmall(OUT &out, - typename Vector::iterator ib, - typename Vector::iterator ie, - const IFlushToken &flush_token) - __attribute__((noinline)); - - template <class OUT> - void - merge(OUT &out, uint32_t heapLimit, const IFlushToken& flush_token) __attribute__((noinline)); -}; - - -template <class IN> -void -PostingPriorityQueue<IN>::adjust() -{ - typedef typename Vector::iterator VIT; - if (!_vec.front().get()->isValid()) { - _vec.erase(_vec.begin()); // Iterator no longer valid - return; - } - if (_vec.size() == 1) // Only one iterator left - return; - // Peform binary search to find first element higher than changed value - VIT gt = std::upper_bound(_vec.begin() + 1, _vec.end(), _vec.front()); - VIT to = _vec.begin(); - VIT from = to; - ++from; - Ref changed = *to; // Remember changed value - while (from != gt) { // Shift elements to make space for changed value - *to = *from; - ++from; - ++to; - } - *to = changed; // Save changed value at right location -} - - -template <class IN> -template <class OUT> -void -PostingPriorityQueue<IN>::mergeHeap(OUT &out, const IFlushToken& flush_token) -{ - while (!empty() && !flush_token.stop_requested()) { - IN *low = lowest(); - low->write(out); - low->read(); - adjust(); - } -} - - -template <class IN> -template <class OUT> -void -PostingPriorityQueue<IN>::mergeOne(OUT &out, IN &in, const IFlushToken& flush_token) -{ - while (in.isValid() && !flush_token.stop_requested()) { - in.write(out); - in.read(); - } -} - -template <class IN> -template <class OUT> -void -PostingPriorityQueue<IN>::mergeTwo(OUT &out, IN &in1, IN &in2, const IFlushToken& flush_token) -{ - while (!flush_token.stop_requested()) { - IN &low = in2 < in1 ? in2 : in1; - low.write(out); - low.read(); - if (!low.isValid()) - break; - } -} - - -template <class IN> -template <class OUT> -void -PostingPriorityQueue<IN>::mergeSmall(OUT &out, - typename Vector::iterator ib, - typename Vector::iterator ie, - const IFlushToken& flush_token) -{ - while (!flush_token.stop_requested()) { - typename Vector::iterator i = ib; - IN *low = i->get(); - for (++i; i != ie; ++i) - if (*i->get() < *low) - low = i->get(); - low->write(out); - low->read(); - if (!low->isValid()) - break; - } -} - - -template <class IN> -template <class OUT> -void -PostingPriorityQueue<IN>::merge(OUT &out, uint32_t heapLimit, const IFlushToken& flush_token) -{ - if (_vec.empty()) - return; - for (typename Vector::iterator i = _vec.begin(), ie = _vec.end(); i != ie; - ++i) { - assert(i->get()->isValid()); - } - if (_vec.size() >= heapLimit) { - sort(); - void (PostingPriorityQueue::*mergeHeapFunc)(OUT &out, const IFlushToken& flush_token) = - &PostingPriorityQueue::mergeHeap; - (this->*mergeHeapFunc)(out, flush_token); - return; - } - while (!flush_token.stop_requested()) { - if (_vec.size() == 1) { - void (*mergeOneFunc)(OUT &out, IN &in, const IFlushToken& flush_token) = - &PostingPriorityQueue<IN>::mergeOne; - (*mergeOneFunc)(out, *_vec.front().get(), flush_token); - _vec.clear(); - return; - } - if (_vec.size() == 2) { - void (*mergeTwoFunc)(OUT &out, IN &in1, IN &in2, const IFlushToken& flush_token) = - &PostingPriorityQueue<IN>::mergeTwo; - (*mergeTwoFunc)(out, *_vec[0].get(), *_vec[1].get(), flush_token); - } else { - void (*mergeSmallFunc)(OUT &out, - typename Vector::iterator ib, - typename Vector::iterator ie, - const IFlushToken& flush_token) = - &PostingPriorityQueue::mergeSmall; - (*mergeSmallFunc)(out, _vec.begin(), _vec.end(), flush_token); - } - for (typename Vector::iterator i = _vec.begin(), ie = _vec.end(); - i != ie; ++i) { - if (!i->get()->isValid()) { - _vec.erase(i); - break; - } - } - for (typename Vector::iterator i = _vec.begin(), ie = _vec.end(); - i != ie; ++i) { - assert(i->get()->isValid()); - } - assert(!_vec.empty()); - } -} - - -} // namespace search - |