aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2022-01-06 22:14:54 +0100
committerTor Egge <Tor.Egge@online.no>2022-01-06 22:14:54 +0100
commitd4463837e5e6ccbfd3ece320f6a399ff2b1376b4 (patch)
treee5b7ab368d574f5693b3063b19cd1810185cfe7f /searchlib
parenteb85f48b3a6e4e69b2d45f2d9393d8b4d8e27daa (diff)
Refactor field merger.
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/field_merger.cpp95
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/field_merger.h15
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fusion.cpp27
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fusion.h2
-rw-r--r--searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp4
-rw-r--r--searchlib/src/vespa/searchlib/util/posting_priority_queue.h59
-rw-r--r--searchlib/src/vespa/searchlib/util/posting_priority_queue.hpp35
-rw-r--r--searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.h32
-rw-r--r--searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.hpp114
-rw-r--r--searchlib/src/vespa/searchlib/util/postingpriorityqueue.h261
10 files changed, 326 insertions, 318 deletions
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp
index ee5d72f1daa..67ec7faa621 100644
--- a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp
+++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp
@@ -13,7 +13,7 @@
#include <vespa/searchlib/index/schemautil.h>
#include <vespa/searchlib/util/filekit.h>
#include <vespa/searchlib/util/dirtraverse.h>
-#include <vespa/searchlib/util/postingpriorityqueue.h>
+#include <vespa/searchlib/util/posting_priority_queue_merger.hpp>
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/util/exceptions.h>
@@ -52,11 +52,13 @@ createTmpPath(const vespalib::string & base, uint32_t index) {
FieldMerger::FieldMerger(uint32_t id, const FusionOutputIndex& fusion_out_index, std::shared_ptr<IFlushToken> flush_token)
: _id(id),
- _field_dir(fusion_out_index.get_path() + "/" + SchemaUtil::IndexIterator(fusion_out_index.get_schema(), id).getName()),
+ _field_name(SchemaUtil::IndexIterator(fusion_out_index.get_schema(), id).getName()),
+ _field_dir(fusion_out_index.get_path() + "/" + _field_name),
_fusion_out_index(fusion_out_index),
_flush_token(std::move(flush_token)),
_word_readers(),
_word_heap(),
+ _word_aggregator(),
_word_num_mappings(),
_num_word_ids(0),
_readers(),
@@ -107,14 +109,14 @@ bool
FieldMerger::open_input_word_readers()
{
_word_readers.reserve(_fusion_out_index.get_old_indexes().size());
- _word_heap = std::make_unique<PostingPriorityQueue<DictionaryWordReader>>();
+ _word_heap = std::make_unique<PostingPriorityQueueMerger<DictionaryWordReader, WordAggregator>>();
SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id);
for (auto & oi : _fusion_out_index.get_old_indexes()) {
auto reader(std::make_unique<DictionaryWordReader>());
const vespalib::string &tmpindexpath = createTmpPath(_field_dir, oi.getIndex());
const vespalib::string &oldindexpath = oi.getPath();
vespalib::string wordMapName = tmpindexpath + "/old2new.dat";
- vespalib::string fieldDir(oldindexpath + "/" + index.getName());
+ vespalib::string fieldDir(oldindexpath + "/" + _field_name);
vespalib::string dictName(fieldDir + "/dictionary");
const Schema &oldSchema = oi.getSchema();
if (!index.hasOldFields(oldSchema)) {
@@ -163,24 +165,33 @@ FieldMerger::read_mapping_files()
}
bool
-FieldMerger::renumber_word_ids()
+FieldMerger::renumber_word_ids_start()
{
- SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id);
- vespalib::string indexName = index.getName();
- LOG(debug, "Renumber word IDs for field %s", indexName.c_str());
-
- WordAggregator out;
-
+ LOG(debug, "Renumber word IDs for field %s", _field_name.c_str());
if (!open_input_word_readers()) {
return false;
}
- _word_heap->merge(out, 4, *_flush_token);
+ _word_aggregator = std::make_unique<WordAggregator>();
+ return true;
+}
+
+bool
+FieldMerger::renumber_word_ids_main()
+{
+ _word_heap->merge(*_word_aggregator, 4, *_flush_token);
if (_flush_token->stop_requested()) {
return false;
}
assert(_word_heap->empty());
+ return true;
+}
+
+bool
+FieldMerger::renumber_word_ids_finish()
+{
_word_heap.reset();
- _num_word_ids = out.getWordNum();
+ _num_word_ids = _word_aggregator->getWordNum();
+ _word_aggregator.reset();
// Close files
for (auto &i : _word_readers) {
@@ -193,11 +204,23 @@ FieldMerger::renumber_word_ids()
if (!read_mapping_files()) {
return false;
}
- LOG(debug, "Finished renumbering words IDs for field %s", indexName.c_str());
+ LOG(debug, "Finished renumbering words IDs for field %s", _field_name.c_str());
return true;
}
+bool
+FieldMerger::renumber_word_ids()
+{
+ if (!renumber_word_ids_start()) {
+ return false;
+ }
+ if (!renumber_word_ids_main()) {
+ return false;
+ }
+ return renumber_word_ids_finish();
+}
+
std::shared_ptr<FieldLengthScanner>
FieldMerger::allocate_field_length_scanner()
{
@@ -226,7 +249,6 @@ FieldMerger::open_input_field_readers()
_readers.reserve(_fusion_out_index.get_old_indexes().size());
SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id);
auto field_length_scanner = allocate_field_length_scanner();
- vespalib::string indexName = index.getName();
for (const auto &oi : _fusion_out_index.get_old_indexes()) {
const Schema &oldSchema = oi.getSchema();
if (!index.hasOldFields(oldSchema)) {
@@ -234,7 +256,7 @@ FieldMerger::open_input_field_readers()
}
auto reader = FieldReader::allocFieldReader(index, oldSchema, field_length_scanner);
reader->setup(_word_num_mappings[oi.getIndex()], oi.getDocIdMapping());
- if (!reader->open(oi.getPath() + "/" + indexName + "/", _fusion_out_index.get_tune_file_indexing()._read)) {
+ if (!reader->open(oi.getPath() + "/" + _field_name + "/", _fusion_out_index.get_tune_file_indexing()._read)) {
return false;
}
_readers.push_back(std::move(reader));
@@ -322,7 +344,7 @@ FieldMerger::select_cooked_or_raw_features(FieldReader& reader)
bool
FieldMerger::setup_merge_heap()
{
- _heap = std::make_unique<PostingPriorityQueue<FieldReader>>();
+ _heap = std::make_unique<PostingPriorityQueueMerger<FieldReader, FieldWriter>>();
for (auto &reader : _readers) {
if (!select_cooked_or_raw_features(*reader)) {
return false;
@@ -338,12 +360,10 @@ FieldMerger::setup_merge_heap()
}
bool
-FieldMerger::merge_postings()
+FieldMerger::merge_postings_start()
{
- SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id);
/* OUTPUT */
_writer = std::make_unique<FieldWriter>(_fusion_out_index.get_doc_id_limit(), _num_word_ids);
- vespalib::string indexName = index.getName();
if (!open_input_field_readers()) {
return false;
@@ -351,15 +371,23 @@ FieldMerger::merge_postings()
if (!open_field_writer()) {
return false;
}
- if (!setup_merge_heap()) {
- return false;
- }
+ return setup_merge_heap();
+}
+bool
+FieldMerger::merge_postings_main()
+{
_heap->merge(*_writer, 4, *_flush_token);
if (_flush_token->stop_requested()) {
return false;
}
assert(_heap->empty());
+ return true;
+}
+
+bool
+FieldMerger::merge_postings_finish()
+{
_heap.reset();
for (auto &reader : _readers) {
@@ -376,11 +404,22 @@ FieldMerger::merge_postings()
}
bool
+FieldMerger::merge_postings()
+{
+ if (!merge_postings_start()) {
+ return false;
+ }
+ if (!merge_postings_main()) {
+ return false;
+ }
+ return merge_postings_finish();
+}
+
+bool
FieldMerger::merge_field()
{
const Schema &schema = _fusion_out_index.get_schema();
SchemaUtil::IndexIterator index(schema, _id);
- const vespalib::string &indexName = index.getName();
SchemaUtil::IndexSettings settings = index.getIndexSettings();
if (settings.hasError()) {
return false;
@@ -391,7 +430,7 @@ FieldMerger::merge_field()
}
vespalib::mkdir(_field_dir, false);
- LOG(debug, "merge_field for field %s dir %s", indexName.c_str(), _field_dir.c_str());
+ LOG(debug, "merge_field for field %s dir %s", _field_name.c_str(), _field_dir.c_str());
make_tmp_dirs();
@@ -399,7 +438,7 @@ FieldMerger::merge_field()
if (_flush_token->stop_requested()) {
return false;
}
- LOG(error, "Could not renumber field word ids for field %s dir %s", indexName.c_str(), _field_dir.c_str());
+ LOG(error, "Could not renumber field word ids for field %s dir %s", _field_name.c_str(), _field_dir.c_str());
return false;
}
@@ -410,7 +449,7 @@ FieldMerger::merge_field()
return false;
}
throw IllegalArgumentException(make_string("Could not merge field postings for field %s dir %s",
- indexName.c_str(), _field_dir.c_str()));
+ _field_name.c_str(), _field_dir.c_str()));
}
if (!FileKit::createStamp(_field_dir + "/.mergeocc_done")) {
return false;
@@ -421,7 +460,7 @@ FieldMerger::merge_field()
return false;
}
- LOG(debug, "Finished merge_field for field %s dir %s", indexName.c_str(), _field_dir.c_str());
+ LOG(debug, "Finished merge_field for field %s dir %s", _field_name.c_str(), _field_dir.c_str());
return true;
}
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.h b/searchlib/src/vespa/searchlib/diskindex/field_merger.h
index a57005e18a4..bbb2f210511 100644
--- a/searchlib/src/vespa/searchlib/diskindex/field_merger.h
+++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.h
@@ -9,7 +9,7 @@
namespace search {
class IFlushToken;
-template <class IN> class PostingPriorityQueue;
+template <class Reader, class Writer> class PostingPriorityQueueMerger;
}
namespace search::diskindex {
@@ -19,6 +19,7 @@ class FieldLengthScanner;
class FieldReader;
class FieldWriter;
class FusionOutputIndex;
+class WordAggregator;
class WordNumMapping;
/*
@@ -29,27 +30,35 @@ class FieldMerger
using WordNumMappingList = std::vector<WordNumMapping>;
uint32_t _id;
+ vespalib::string _field_name;
vespalib::string _field_dir;
const FusionOutputIndex& _fusion_out_index;
std::shared_ptr<IFlushToken> _flush_token;
std::vector<std::unique_ptr<DictionaryWordReader>> _word_readers;
- std::unique_ptr<PostingPriorityQueue<DictionaryWordReader>> _word_heap;
+ std::unique_ptr<PostingPriorityQueueMerger<DictionaryWordReader, WordAggregator>> _word_heap;
+ std::unique_ptr<WordAggregator> _word_aggregator;
WordNumMappingList _word_num_mappings;
uint64_t _num_word_ids;
std::vector<std::unique_ptr<FieldReader>> _readers;
- std::unique_ptr<PostingPriorityQueue<FieldReader>> _heap;
+ std::unique_ptr<PostingPriorityQueueMerger<FieldReader, FieldWriter>> _heap;
std::unique_ptr<FieldWriter> _writer;
void make_tmp_dirs();
bool clean_tmp_dirs();
bool open_input_word_readers();
bool read_mapping_files();
+ bool renumber_word_ids_start();
+ bool renumber_word_ids_main();
+ bool renumber_word_ids_finish();
bool renumber_word_ids();
std::shared_ptr<FieldLengthScanner> allocate_field_length_scanner();
bool open_input_field_readers();
bool open_field_writer();
bool select_cooked_or_raw_features(FieldReader& reader);
bool setup_merge_heap();
+ bool merge_postings_start();
+ bool merge_postings_main();
+ bool merge_postings_finish();
bool merge_postings();
public:
FieldMerger(uint32_t id, const FusionOutputIndex& fusion_out_index, std::shared_ptr<IFlushToken> flush_token);
diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp
index eafbbac361b..1afed18cb48 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp
+++ b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp
@@ -3,46 +3,29 @@
#include "fusion.h"
#include "fusion_input_index.h"
#include "field_merger.h"
-#include "fieldreader.h"
-#include "dictionarywordreader.h"
-#include "field_length_scanner.h"
-#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/searchlib/bitcompression/posocc_fields_params.h>
+#include <vespa/fastos/file.h>
+#include <vespa/searchlib/common/documentsummary.h>
#include <vespa/searchlib/common/i_flush_token.h>
-#include <vespa/searchlib/index/field_length_info.h>
-#include <vespa/searchlib/util/filekit.h>
+#include <vespa/searchlib/index/schemautil.h>
#include <vespa/searchlib/util/dirtraverse.h>
-#include <vespa/searchlib/util/postingpriorityqueue.h>
#include <vespa/vespalib/io/fileutil.h>
-#include <vespa/searchlib/common/documentsummary.h>
+#include <vespa/vespalib/util/count_down_latch.h>
#include <vespa/vespalib/util/error.h>
+#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/lambdatask.h>
-#include <vespa/vespalib/util/count_down_latch.h>
-#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/document/util/queue.h>
-#include <sstream>
#include <vespa/log/log.h>
-#include <vespa/vespalib/util/exceptions.h>
LOG_SETUP(".diskindex.fusion");
-using search::FileKit;
-using search::PostingPriorityQueue;
using search::common::FileHeaderContext;
-using search::diskindex::DocIdMapping;
-using search::diskindex::WordNumMapping;
using search::docsummary::DocumentSummary;
-using search::index::FieldLengthInfo;
-using search::bitcompression::PosOccFieldParams;
-using search::bitcompression::PosOccFieldsParams;
-using search::index::PostingListParams;
using search::index::Schema;
using search::index::SchemaUtil;
using search::index::schema::DataType;
using vespalib::getLastErrorString;
using vespalib::IllegalArgumentException;
-using vespalib::make_string;
namespace search::diskindex {
diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.h b/searchlib/src/vespa/searchlib/diskindex/fusion.h
index 22dda4d6edf..7e0b70dca36 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fusion.h
+++ b/searchlib/src/vespa/searchlib/diskindex/fusion.h
@@ -3,12 +3,10 @@
#pragma once
#include "fusion_output_index.h"
-
#include <vespa/vespalib/util/threadexecutor.h>
namespace search {
class IFlushToken;
-template <class IN> class PostingPriorityQueue;
class TuneFileIndexing;
}
diff --git a/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp b/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp
index 8ae3fe0bdad..64d194c5c7e 100644
--- a/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp
+++ b/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp
@@ -5,7 +5,7 @@
#include <vespa/searchlib/common/flush_token.h>
#include <vespa/searchlib/memoryindex/posting_iterator.h>
#include <vespa/searchlib/queryeval/iterators.h>
-#include <vespa/searchlib/util/postingpriorityqueue.h>
+#include <vespa/searchlib/util/posting_priority_queue_merger.hpp>
#include <vespa/vespalib/datastore/buffer_type.hpp>
#include <vespa/vespalib/btree/btreeiterator.hpp>
#include <vespa/vespalib/btree/btreenode.hpp>
@@ -352,7 +352,7 @@ FakeMemTreeOccFactory::setup(const std::vector<const FakeWord *> &fws)
++wordIdx;
}
- PostingPriorityQueue<FakeWord::RandomizedReader> heap;
+ PostingPriorityQueueMerger<FakeWord::RandomizedReader, FakeWord::RandomizedWriter> heap;
std::vector<FakeWord::RandomizedReader>::iterator i(r.begin());
std::vector<FakeWord::RandomizedReader>::iterator ie(r.end());
FlushToken flush_token;
diff --git a/searchlib/src/vespa/searchlib/util/posting_priority_queue.h b/searchlib/src/vespa/searchlib/util/posting_priority_queue.h
new file mode 100644
index 00000000000..01ae0995806
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/util/posting_priority_queue.h
@@ -0,0 +1,59 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vector>
+
+namespace search {
+
+/*
+ * Provide priority queue semantics for a set of posting readers.
+ */
+template <class Reader>
+class PostingPriorityQueue
+{
+public:
+ class Ref
+ {
+ Reader *_ref;
+ public:
+ Ref(Reader *ref)
+ : _ref(ref)
+ {
+ }
+
+ bool operator<(const Ref &rhs) const { return *_ref < *rhs._ref; }
+ Reader *get() const noexcept { return _ref; }
+ };
+
+ using Vector = std::vector<Ref>;
+ Vector _vec;
+
+ PostingPriorityQueue()
+ : _vec()
+ {
+ }
+
+ bool empty() const { return _vec.empty(); }
+ void clear() { _vec.clear(); }
+ void initialAdd(Reader *it) { _vec.push_back(Ref(it)); }
+
+ /*
+ * Sort vector after a set of initial add operations, so lowest()
+ * and adjust() can be used.
+ */
+ void sort() { std::sort(_vec.begin(), _vec.end()); }
+
+ /*
+ * Return lowest value. Assumes vector is sorted.
+ */
+ Reader *lowest() const { return _vec.front().get(); }
+
+ /*
+ * The vector might no longer be sorted since the first element has changed
+ * value. Perform adjustments to make vector sorted again.
+ */
+ void adjust();
+};
+
+}
diff --git a/searchlib/src/vespa/searchlib/util/posting_priority_queue.hpp b/searchlib/src/vespa/searchlib/util/posting_priority_queue.hpp
new file mode 100644
index 00000000000..69bf7bc547c
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/util/posting_priority_queue.hpp
@@ -0,0 +1,35 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "posting_priority_queue.h"
+
+namespace search {
+
+template <class Reader>
+void
+PostingPriorityQueue<Reader>::adjust()
+{
+ typedef typename Vector::iterator VIT;
+ if (!_vec.front().get()->isValid()) {
+ _vec.erase(_vec.begin()); // Iterator no longer valid
+ return;
+ }
+ if (_vec.size() == 1) { // Only one iterator left
+ return;
+ }
+ // Peform binary search to find first element higher than changed value
+ VIT gt = std::upper_bound(_vec.begin() + 1, _vec.end(), _vec.front());
+ VIT to = _vec.begin();
+ VIT from = to;
+ ++from;
+ Ref changed = *to; // Remember changed value
+ while (from != gt) { // Shift elements to make space for changed value
+ *to = *from;
+ ++from;
+ ++to;
+ }
+ *to = changed; // Save changed value at right location
+}
+
+}
diff --git a/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.h b/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.h
new file mode 100644
index 00000000000..8dd941a2c13
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.h
@@ -0,0 +1,32 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "posting_priority_queue.h"
+
+namespace search {
+
+/*
+ * Provide priority queue semantics for a set of posting readers with
+ * merging to a posting writer.
+ */
+template <class Reader, class Writer>
+class PostingPriorityQueueMerger : public PostingPriorityQueue<Reader>
+{
+public:
+ using Parent = PostingPriorityQueue<Reader>;
+ using Vector = typename Parent::Vector;
+ using Parent::_vec;
+ using Parent::adjust;
+ using Parent::empty;
+ using Parent::lowest;
+ using Parent::sort;
+
+ void mergeHeap(Writer& writer, const IFlushToken& flush_token) __attribute__((noinline));
+ static void mergeOne(Writer& writer, Reader& reader, const IFlushToken &flush_token) __attribute__((noinline));
+ static void mergeTwo(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token) __attribute__((noinline));
+ static void mergeSmall(Writer& writer, typename Vector::iterator ib, typename Vector::iterator ie, const IFlushToken &flush_token) __attribute__((noinline));
+ void merge(Writer& writer, uint32_t heapLimit, const IFlushToken& flush_token) __attribute__((noinline));
+};
+
+}
diff --git a/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.hpp b/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.hpp
new file mode 100644
index 00000000000..d33356cee4a
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/util/posting_priority_queue_merger.hpp
@@ -0,0 +1,114 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "posting_priority_queue.hpp"
+#include "posting_priority_queue_merger.h"
+
+namespace search {
+
+template <class Reader, class Writer>
+void
+PostingPriorityQueueMerger<Reader, Writer>::mergeHeap(Writer& writer, const IFlushToken& flush_token)
+{
+ while (!empty() && !flush_token.stop_requested()) {
+ Reader *low = lowest();
+ low->write(writer);
+ low->read();
+ adjust();
+ }
+}
+
+template <class Reader, class Writer>
+void
+PostingPriorityQueueMerger<Reader, Writer>::mergeOne(Writer& writer, Reader& reader, const IFlushToken& flush_token)
+{
+ while (reader.isValid() && !flush_token.stop_requested()) {
+ reader.write(writer);
+ reader.read();
+ }
+}
+
+template <class Reader, class Writer>
+void
+PostingPriorityQueueMerger<Reader, Writer>::mergeTwo(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token)
+{
+ while (!flush_token.stop_requested()) {
+ Reader &low = reader2 < reader1 ? reader2 : reader1;
+ low.write(writer);
+ low.read();
+ if (!low.isValid())
+ break;
+ }
+}
+
+template <class Reader, class Writer>
+void
+PostingPriorityQueueMerger<Reader, Writer>::mergeSmall(Writer& writer, typename Vector::iterator ib, typename Vector::iterator ie, const IFlushToken& flush_token)
+{
+ while (!flush_token.stop_requested()) {
+ typename Vector::iterator i = ib;
+ Reader *low = i->get();
+ for (++i; i != ie; ++i)
+ if (*i->get() < *low)
+ low = i->get();
+ low->write(writer);
+ low->read();
+ if (!low->isValid())
+ break;
+ }
+}
+
+template <class Reader, class Writer>
+void
+PostingPriorityQueueMerger<Reader, Writer>::merge(Writer& writer, uint32_t heapLimit, const IFlushToken& flush_token)
+{
+ if (_vec.empty())
+ return;
+ for (typename Vector::iterator i = _vec.begin(), ie = _vec.end(); i != ie;
+ ++i) {
+ assert(i->get()->isValid());
+ }
+ if (_vec.size() >= heapLimit) {
+ sort();
+ void (PostingPriorityQueueMerger::*mergeHeapFunc)(Writer& writer, const IFlushToken& flush_token) =
+ &PostingPriorityQueueMerger::mergeHeap;
+ (this->*mergeHeapFunc)(writer, flush_token);
+ return;
+ }
+ while (!flush_token.stop_requested()) {
+ if (_vec.size() == 1) {
+ void (*mergeOneFunc)(Writer& writer, Reader& reader, const IFlushToken& flush_token) =
+ &PostingPriorityQueueMerger::mergeOne;
+ (*mergeOneFunc)(writer, *_vec.front().get(), flush_token);
+ _vec.clear();
+ return;
+ }
+ if (_vec.size() == 2) {
+ void (*mergeTwoFunc)(Writer& writer, Reader& reader1, Reader& reader2, const IFlushToken& flush_token) =
+ &PostingPriorityQueueMerger::mergeTwo;
+ (*mergeTwoFunc)(writer, *_vec[0].get(), *_vec[1].get(), flush_token);
+ } else {
+ void (*mergeSmallFunc)(Writer& writer,
+ typename Vector::iterator ib,
+ typename Vector::iterator ie,
+ const IFlushToken& flush_token) =
+ &PostingPriorityQueueMerger::mergeSmall;
+ (*mergeSmallFunc)(writer, _vec.begin(), _vec.end(), flush_token);
+ }
+ for (typename Vector::iterator i = _vec.begin(), ie = _vec.end();
+ i != ie; ++i) {
+ if (!i->get()->isValid()) {
+ _vec.erase(i);
+ break;
+ }
+ }
+ for (typename Vector::iterator i = _vec.begin(), ie = _vec.end();
+ i != ie; ++i) {
+ assert(i->get()->isValid());
+ }
+ assert(!_vec.empty());
+ }
+}
+
+}
diff --git a/searchlib/src/vespa/searchlib/util/postingpriorityqueue.h b/searchlib/src/vespa/searchlib/util/postingpriorityqueue.h
deleted file mode 100644
index c263c6bc470..00000000000
--- a/searchlib/src/vespa/searchlib/util/postingpriorityqueue.h
+++ /dev/null
@@ -1,261 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include <vector>
-
-namespace search
-{
-
-/*
- * Provide priority queue semantics for a set of posting inputs.
- */
-template <class IN>
-class PostingPriorityQueue
-{
-public:
- class Ref
- {
- IN *_ref;
- public:
- Ref(IN *ref)
- : _ref(ref)
- {
- }
-
- bool
- operator<(const Ref &rhs) const
- {
- return *_ref < *rhs._ref;
- }
-
- IN *
- get() const
- {
- return _ref;
- }
- };
-
- typedef std::vector<Ref> Vector;
- Vector _vec;
-
- PostingPriorityQueue()
- : _vec()
- {
- }
-
- bool
- empty() const
- {
- return _vec.empty();
- }
-
- void
- clear()
- {
- _vec.clear();
- }
-
- void
- initialAdd(IN *it)
- {
- _vec.push_back(Ref(it));
- }
-
- /*
- * Sort vector after a set of initial add operations, so lowest()
- * and adjust() can be used.
- */
- void
- sort()
- {
- std::sort(_vec.begin(), _vec.end());
- }
-
- /*
- * Return lowest value. Assumes vector is sorted.
- */
- IN *
- lowest() const
- {
- return _vec.front().get();
- }
-
- /*
- * The vector might no longer be sorted since the first element has changed
- * value. Perform adjustments to make vector sorted again.
- */
- void
- adjust();
-
-
- template <class OUT>
- void
- mergeHeap(OUT &out, const IFlushToken& flush_token) __attribute__((noinline));
-
- template <class OUT>
- static void
- mergeOne(OUT &out, IN &in, const IFlushToken &flush_token) __attribute__((noinline));
-
- template <class OUT>
- static void
- mergeTwo(OUT &out, IN &in1, IN &in2, const IFlushToken& flush_token) __attribute__((noinline));
-
- template <class OUT>
- static void
- mergeSmall(OUT &out,
- typename Vector::iterator ib,
- typename Vector::iterator ie,
- const IFlushToken &flush_token)
- __attribute__((noinline));
-
- template <class OUT>
- void
- merge(OUT &out, uint32_t heapLimit, const IFlushToken& flush_token) __attribute__((noinline));
-};
-
-
-template <class IN>
-void
-PostingPriorityQueue<IN>::adjust()
-{
- typedef typename Vector::iterator VIT;
- if (!_vec.front().get()->isValid()) {
- _vec.erase(_vec.begin()); // Iterator no longer valid
- return;
- }
- if (_vec.size() == 1) // Only one iterator left
- return;
- // Peform binary search to find first element higher than changed value
- VIT gt = std::upper_bound(_vec.begin() + 1, _vec.end(), _vec.front());
- VIT to = _vec.begin();
- VIT from = to;
- ++from;
- Ref changed = *to; // Remember changed value
- while (from != gt) { // Shift elements to make space for changed value
- *to = *from;
- ++from;
- ++to;
- }
- *to = changed; // Save changed value at right location
-}
-
-
-template <class IN>
-template <class OUT>
-void
-PostingPriorityQueue<IN>::mergeHeap(OUT &out, const IFlushToken& flush_token)
-{
- while (!empty() && !flush_token.stop_requested()) {
- IN *low = lowest();
- low->write(out);
- low->read();
- adjust();
- }
-}
-
-
-template <class IN>
-template <class OUT>
-void
-PostingPriorityQueue<IN>::mergeOne(OUT &out, IN &in, const IFlushToken& flush_token)
-{
- while (in.isValid() && !flush_token.stop_requested()) {
- in.write(out);
- in.read();
- }
-}
-
-template <class IN>
-template <class OUT>
-void
-PostingPriorityQueue<IN>::mergeTwo(OUT &out, IN &in1, IN &in2, const IFlushToken& flush_token)
-{
- while (!flush_token.stop_requested()) {
- IN &low = in2 < in1 ? in2 : in1;
- low.write(out);
- low.read();
- if (!low.isValid())
- break;
- }
-}
-
-
-template <class IN>
-template <class OUT>
-void
-PostingPriorityQueue<IN>::mergeSmall(OUT &out,
- typename Vector::iterator ib,
- typename Vector::iterator ie,
- const IFlushToken& flush_token)
-{
- while (!flush_token.stop_requested()) {
- typename Vector::iterator i = ib;
- IN *low = i->get();
- for (++i; i != ie; ++i)
- if (*i->get() < *low)
- low = i->get();
- low->write(out);
- low->read();
- if (!low->isValid())
- break;
- }
-}
-
-
-template <class IN>
-template <class OUT>
-void
-PostingPriorityQueue<IN>::merge(OUT &out, uint32_t heapLimit, const IFlushToken& flush_token)
-{
- if (_vec.empty())
- return;
- for (typename Vector::iterator i = _vec.begin(), ie = _vec.end(); i != ie;
- ++i) {
- assert(i->get()->isValid());
- }
- if (_vec.size() >= heapLimit) {
- sort();
- void (PostingPriorityQueue::*mergeHeapFunc)(OUT &out, const IFlushToken& flush_token) =
- &PostingPriorityQueue::mergeHeap;
- (this->*mergeHeapFunc)(out, flush_token);
- return;
- }
- while (!flush_token.stop_requested()) {
- if (_vec.size() == 1) {
- void (*mergeOneFunc)(OUT &out, IN &in, const IFlushToken& flush_token) =
- &PostingPriorityQueue<IN>::mergeOne;
- (*mergeOneFunc)(out, *_vec.front().get(), flush_token);
- _vec.clear();
- return;
- }
- if (_vec.size() == 2) {
- void (*mergeTwoFunc)(OUT &out, IN &in1, IN &in2, const IFlushToken& flush_token) =
- &PostingPriorityQueue<IN>::mergeTwo;
- (*mergeTwoFunc)(out, *_vec[0].get(), *_vec[1].get(), flush_token);
- } else {
- void (*mergeSmallFunc)(OUT &out,
- typename Vector::iterator ib,
- typename Vector::iterator ie,
- const IFlushToken& flush_token) =
- &PostingPriorityQueue::mergeSmall;
- (*mergeSmallFunc)(out, _vec.begin(), _vec.end(), flush_token);
- }
- for (typename Vector::iterator i = _vec.begin(), ie = _vec.end();
- i != ie; ++i) {
- if (!i->get()->isValid()) {
- _vec.erase(i);
- break;
- }
- }
- for (typename Vector::iterator i = _vec.begin(), ie = _vec.end();
- i != ie; ++i) {
- assert(i->get()->isValid());
- }
- assert(!_vec.empty());
- }
-}
-
-
-} // namespace search
-