summaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@broadpark.no>2021-01-05 12:51:15 +0100
committerTor Egge <Tor.Egge@broadpark.no>2021-01-05 14:58:59 +0100
commit42a02eddf6b450682994afae68249171fe7876b8 (patch)
tree6a0a14eac83492008788f41baf81c6dcc87d0a5d /searchlib
parent6382cb8513ab166e4e4184e0ddebd60f97fb6bb3 (diff)
Add low level support for stopping a running disk index fusion.
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/tests/diskindex/fusion/fusion_test.cpp81
-rw-r--r--searchlib/src/vespa/searchlib/common/CMakeLists.txt1
-rw-r--r--searchlib/src/vespa/searchlib/common/flush_token.cpp26
-rw-r--r--searchlib/src/vespa/searchlib/common/flush_token.h20
-rw-r--r--searchlib/src/vespa/searchlib/common/i_flush_token.h15
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fieldreader.h1
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fusion.cpp39
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fusion.h16
-rw-r--r--searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp10
-rw-r--r--searchlib/src/vespa/searchlib/util/postingpriorityqueue.h47
10 files changed, 200 insertions, 56 deletions
diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
index c94d19cb3b7..efc9e99bf88 100644
--- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
+++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
@@ -1,5 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/searchlib/common/flush_token.h>
#include <vespa/searchlib/diskindex/diskindex.h>
#include <vespa/searchlib/diskindex/fusion.h>
#include <vespa/searchlib/diskindex/indexbuilder.h>
@@ -61,6 +62,7 @@ protected:
void requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap);
void make_simple_index(const vespalib::string &dump_dir, const IFieldLengthInspector &field_length_inspector);
+ bool try_merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources, std::shared_ptr<IFlushToken> flush_token);
void merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources);
public:
FusionTest();
@@ -390,7 +392,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
sources.push_back(prefix + "dump2");
ASSERT_TRUE(Fusion::merge(schema, prefix + "dump3", sources, selector,
dynamicKPosOcc,
- tuneFileIndexing,fileHeaderContext, executor));
+ tuneFileIndexing,fileHeaderContext, executor, std::make_shared<FlushToken>()));
} while (0);
do {
DiskIndex dw3(prefix + "dump3");
@@ -403,7 +405,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
sources.push_back(prefix + "dump3");
ASSERT_TRUE(Fusion::merge(schema2, prefix + "dump4", sources, selector,
dynamicKPosOcc,
- tuneFileIndexing, fileHeaderContext, executor));
+ tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>()));
} while (0);
do {
DiskIndex dw4(prefix + "dump4");
@@ -416,7 +418,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
sources.push_back(prefix + "dump3");
ASSERT_TRUE(Fusion::merge(schema3, prefix + "dump5", sources, selector,
dynamicKPosOcc,
- tuneFileIndexing, fileHeaderContext, executor));
+ tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>()));
} while (0);
do {
DiskIndex dw5(prefix + "dump5");
@@ -429,7 +431,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
sources.push_back(prefix + "dump3");
ASSERT_TRUE(Fusion::merge(schema, prefix + "dump6", sources, selector,
!dynamicKPosOcc,
- tuneFileIndexing, fileHeaderContext, executor));
+ tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>()));
} while (0);
do {
DiskIndex dw6(prefix + "dump6");
@@ -442,7 +444,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
sources.push_back(prefix + "dump2");
ASSERT_TRUE(Fusion::merge(schema, prefix + "dump3", sources, selector,
dynamicKPosOcc,
- tuneFileIndexing, fileHeaderContext, executor));
+ tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>()));
} while (0);
do {
DiskIndex dw3(prefix + "dump3");
@@ -476,16 +478,22 @@ FusionTest::make_simple_index(const vespalib::string &dump_dir, const IFieldLeng
ib.close();
}
-void
-FusionTest::merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources)
+bool
+FusionTest::try_merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources, std::shared_ptr<IFlushToken> flush_token)
{
vespalib::ThreadStackExecutor executor(4, 0x10000);
TuneFileIndexing tuneFileIndexing;
DummyFileHeaderContext fileHeaderContext;
SelectorArray selector(20, 0);
- ASSERT_TRUE(Fusion::merge(_schema, dump_dir, sources, selector,
- false,
- tuneFileIndexing, fileHeaderContext, executor));
+ return Fusion::merge(_schema, dump_dir, sources, selector,
+ false,
+ tuneFileIndexing, fileHeaderContext, executor, flush_token);
+}
+
+void
+FusionTest::merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources)
+{
+ ASSERT_TRUE(try_merge_simple_indexes(dump_dir, sources, std::make_shared<FlushToken>()));
}
FusionTest::FusionTest()
@@ -553,6 +561,59 @@ TEST_F(FusionTest, require_that_interleaved_features_can_be_reconstructed)
clean_field_length_testdirs();
}
+namespace {
+
+void clean_stopped_fusion_testdirs()
+{
+ vespalib::rmdir("stopdump2", true);
+ vespalib::rmdir("stopdump3", true);
+}
+
+class MyFlushToken : public FlushToken
+{
+ mutable std::atomic<size_t> _checks;
+ const size_t _limit;
+public:
+ MyFlushToken(size_t limit)
+ : FlushToken(),
+ _checks(0u),
+ _limit(limit)
+ {
+ }
+ ~MyFlushToken() override = default;
+ bool stop_requested() const noexcept override;
+ size_t get_checks() const noexcept { return _checks; }
+};
+
+bool
+MyFlushToken::stop_requested() const noexcept
+{
+ if (++_checks >= _limit) {
+ const_cast<MyFlushToken *>(this)->request_stop();
+ }
+ return FlushToken::stop_requested();
+}
+
+}
+
+TEST_F(FusionTest, require_that_fusion_can_be_stopped)
+{
+ clean_stopped_fusion_testdirs();
+ auto flush_token = std::make_shared<MyFlushToken>(10000);
+ make_simple_index("stopdump2", MockFieldLengthInspector());
+ ASSERT_TRUE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token));
+ EXPECT_EQ(40, flush_token->get_checks());
+ vespalib::rmdir("stopdump3", true);
+ flush_token = std::make_shared<MyFlushToken>(1);
+ ASSERT_FALSE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token));
+ EXPECT_EQ(12, flush_token->get_checks());
+ vespalib::rmdir("stopdump3", true);
+ flush_token = std::make_shared<MyFlushToken>(39);
+ ASSERT_FALSE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token));
+ EXPECT_EQ(41, flush_token->get_checks());
+ clean_stopped_fusion_testdirs();
+}
+
}
}
diff --git a/searchlib/src/vespa/searchlib/common/CMakeLists.txt b/searchlib/src/vespa/searchlib/common/CMakeLists.txt
index 7e1cfd8fec5..97af7855aea 100644
--- a/searchlib/src/vespa/searchlib/common/CMakeLists.txt
+++ b/searchlib/src/vespa/searchlib/common/CMakeLists.txt
@@ -11,6 +11,7 @@ vespa_add_library(searchlib_common OBJECT
documentsummary.cpp
featureset.cpp
fileheadercontext.cpp
+ flush_token.cpp
gatecallback.cpp
geo_location.cpp
geo_location_spec.cpp
diff --git a/searchlib/src/vespa/searchlib/common/flush_token.cpp b/searchlib/src/vespa/searchlib/common/flush_token.cpp
new file mode 100644
index 00000000000..b5b34bef9a5
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/common/flush_token.cpp
@@ -0,0 +1,26 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "flush_token.h"
+
+namespace search {
+
+FlushToken::FlushToken()
+ : _stop(false)
+{
+}
+
+FlushToken::~FlushToken() = default;
+
+bool
+FlushToken::stop_requested() const noexcept
+{
+ return _stop.load(std::memory_order_relaxed);
+}
+
+void
+FlushToken::request_stop() noexcept
+{
+ _stop.store(true, std::memory_order_relaxed);
+}
+
+}
diff --git a/searchlib/src/vespa/searchlib/common/flush_token.h b/searchlib/src/vespa/searchlib/common/flush_token.h
new file mode 100644
index 00000000000..ea882dbbe45
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/common/flush_token.h
@@ -0,0 +1,20 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "i_flush_token.h"
+#include <atomic>
+
+namespace search {
+
+/*
+ * Class for checking if current flush task should be stopped.
+ */
+class FlushToken : public IFlushToken {
+ std::atomic<bool> _stop;
+public:
+ FlushToken();
+ ~FlushToken() override;
+ bool stop_requested() const noexcept override;
+ void request_stop() noexcept;
+};
+
+}
diff --git a/searchlib/src/vespa/searchlib/common/i_flush_token.h b/searchlib/src/vespa/searchlib/common/i_flush_token.h
new file mode 100644
index 00000000000..281c1a13382
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/common/i_flush_token.h
@@ -0,0 +1,15 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+namespace search {
+
+/*
+ * Interface class for checking if current flush task should be stopped.
+ * TODO: Add methods to register transient memory usage during flush.
+ */
+class IFlushToken {
+public:
+ virtual ~IFlushToken() = default;
+ virtual bool stop_requested() const noexcept = 0;
+};
+
+}
diff --git a/searchlib/src/vespa/searchlib/diskindex/fieldreader.h b/searchlib/src/vespa/searchlib/diskindex/fieldreader.h
index 899b3708bf9..e95a8389d38 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fieldreader.h
+++ b/searchlib/src/vespa/searchlib/diskindex/fieldreader.h
@@ -6,7 +6,6 @@
#include <vespa/searchlib/index/docidandfeatures.h>
#include <vespa/searchlib/index/postinglistfile.h>
#include <vespa/searchlib/index/schemautil.h>
-#include <vespa/searchlib/util/postingpriorityqueue.h>
#include "wordnummapper.h"
#include "docidmapper.h"
#include "fieldwriter.h"
diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp
index 0ad178d14b3..c8b0674252f 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp
+++ b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp
@@ -6,9 +6,11 @@
#include "field_length_scanner.h"
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/searchlib/bitcompression/posocc_fields_params.h>
+#include <vespa/searchlib/common/i_flush_token.h>
#include <vespa/searchlib/index/field_length_info.h>
#include <vespa/searchlib/util/filekit.h>
#include <vespa/searchlib/util/dirtraverse.h>
+#include <vespa/searchlib/util/postingpriorityqueue.h>
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/searchlib/common/documentsummary.h>
#include <vespa/vespalib/util/error.h>
@@ -139,7 +141,7 @@ Fusion::openInputWordReaders(const vespalib::string & dir, const SchemaUtil::Ind
bool
Fusion::renumberFieldWordIds(const vespalib::string & dir, const SchemaUtil::IndexIterator &index,
- WordNumMappingList & list, uint64_t & numWordIds)
+ WordNumMappingList & list, uint64_t & numWordIds, const IFlushToken& flush_token)
{
vespalib::string indexName = index.getName();
LOG(debug, "Renumber word IDs for field %s", indexName.c_str());
@@ -151,7 +153,10 @@ Fusion::renumberFieldWordIds(const vespalib::string & dir, const SchemaUtil::Ind
if (!openInputWordReaders(dir, index, readers, heap)) {
return false;
}
- heap.merge(out, 4);
+ heap.merge(out, 4, flush_token);
+ if (flush_token.stop_requested()) {
+ return false;
+ }
assert(heap.empty());
numWordIds = out.getWordNum();
@@ -172,7 +177,7 @@ Fusion::renumberFieldWordIds(const vespalib::string & dir, const SchemaUtil::Ind
bool
-Fusion::mergeFields(vespalib::ThreadExecutor & executor)
+Fusion::mergeFields(vespalib::ThreadExecutor & executor, std::shared_ptr<IFlushToken> flush_token)
{
const Schema &schema = getSchema();
std::atomic<uint32_t> failed(0);
@@ -181,8 +186,8 @@ Fusion::mergeFields(vespalib::ThreadExecutor & executor)
vespalib::CountDownLatch done(schema.getNumIndexFields());
for (SchemaUtil::IndexIterator iter(schema); iter.isValid(); ++iter) {
concurrent.wait();
- executor.execute(vespalib::makeLambdaTask([this, index=iter.getIndex(), &failed, &done, &concurrent]() {
- if (!mergeField(index)) {
+ executor.execute(vespalib::makeLambdaTask([this, index=iter.getIndex(), &failed, &done, &concurrent, flush_token]() {
+ if (!mergeField(index, flush_token)) {
failed++;
}
concurrent.post();
@@ -197,7 +202,7 @@ Fusion::mergeFields(vespalib::ThreadExecutor & executor)
bool
-Fusion::mergeField(uint32_t id)
+Fusion::mergeField(uint32_t id, std::shared_ptr<IFlushToken> flush_token)
{
typedef SchemaUtil::IndexIterator IndexIterator;
typedef SchemaUtil::IndexSettings IndexSettings;
@@ -222,14 +227,20 @@ Fusion::mergeField(uint32_t id)
WordNumMappingList list(_oldIndexes.size());
uint64_t numWordIds(0);
- if (!renumberFieldWordIds(indexDir, index, list, numWordIds)) {
+ if (!renumberFieldWordIds(indexDir, index, list, numWordIds, *flush_token)) {
+ if (flush_token->stop_requested()) {
+ return false;
+ }
LOG(error, "Could not renumber field word ids for field %s dir %s", indexName.c_str(), indexDir.c_str());
return false;
}
// Tokamak
- bool res = mergeFieldPostings(index, list, numWordIds);
+ bool res = mergeFieldPostings(index, list, numWordIds, *flush_token);
if (!res) {
+ if (flush_token->stop_requested()) {
+ return false;
+ }
throw IllegalArgumentException(make_string("Could not merge field postings for field %s dir %s",
indexName.c_str(), indexDir.c_str()));
}
@@ -387,7 +398,7 @@ Fusion::setupMergeHeap(const std::vector<std::unique_ptr<FieldReader> > & reader
bool
-Fusion::mergeFieldPostings(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list, uint64_t numWordIds)
+Fusion::mergeFieldPostings(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list, uint64_t numWordIds, const IFlushToken& flush_token)
{
std::vector<std::unique_ptr<FieldReader>> readers;
PostingPriorityQueue<FieldReader> heap;
@@ -409,7 +420,10 @@ Fusion::mergeFieldPostings(const SchemaUtil::IndexIterator &index, const WordNum
return false;
}
- heap.merge(fieldWriter, 4);
+ heap.merge(fieldWriter, 4, flush_token);
+ if (flush_token.stop_requested()) {
+ return false;
+ }
assert(heap.empty());
for (auto &reader : readers) {
@@ -510,7 +524,8 @@ bool
Fusion::merge(const Schema &schema, const vespalib::string &dir, const std::vector<vespalib::string> &sources,
const SelectorArray &selector, bool dynamicKPosOccFormat,
const TuneFileIndexing &tuneFileIndexing, const FileHeaderContext &fileHeaderContext,
- vespalib::ThreadExecutor & executor)
+ vespalib::ThreadExecutor & executor,
+ std::shared_ptr<IFlushToken> flush_token)
{
assert(sources.size() <= 255);
uint32_t docIdLimit = selector.size();
@@ -550,7 +565,7 @@ Fusion::merge(const Schema &schema, const vespalib::string &dir, const std::vect
try {
auto fusion = std::make_unique<Fusion>(trimmedDocIdLimit, schema, dir, sources, selector,
dynamicKPosOccFormat, tuneFileIndexing, fileHeaderContext);
- return fusion->mergeFields(executor);
+ return fusion->mergeFields(executor, flush_token);
} catch (const std::exception & e) {
LOG(error, "%s", e.what());
return false;
diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.h b/searchlib/src/vespa/searchlib/diskindex/fusion.h
index d532384f6e9..1f517f149f9 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fusion.h
+++ b/searchlib/src/vespa/searchlib/diskindex/fusion.h
@@ -9,7 +9,10 @@
#include <vespa/vespalib/util/threadexecutor.h>
namespace search { template <class IN> class PostingPriorityQueue; }
-namespace search { class TuneFileIndexing; }
+namespace search {
+class IFlushToken;
+class TuneFileIndexing;
+}
namespace search::common { class FileHeaderContext; }
namespace search::index { class FieldLengthInfo; }
@@ -48,20 +51,20 @@ private:
using SchemaUtil = index::SchemaUtil;
using WordNumMappingList = std::vector<WordNumMapping>;
- bool mergeFields(vespalib::ThreadExecutor & executor);
- bool mergeField(uint32_t id);
+ bool mergeFields(vespalib::ThreadExecutor & executor, std::shared_ptr<IFlushToken> flush_token);
+ bool mergeField(uint32_t id, std::shared_ptr<IFlushToken> flush_token);
std::shared_ptr<FieldLengthScanner> allocate_field_length_scanner(const SchemaUtil::IndexIterator &index);
bool openInputFieldReaders(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list,
std::vector<std::unique_ptr<FieldReader> > & readers);
bool openFieldWriter(const SchemaUtil::IndexIterator &index, FieldWriter & writer, const index::FieldLengthInfo &field_length_info);
bool setupMergeHeap(const std::vector<std::unique_ptr<FieldReader> > & readers,
FieldWriter &writer, PostingPriorityQueue<FieldReader> &heap);
- bool mergeFieldPostings(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list, uint64_t numWordIds);
+ bool mergeFieldPostings(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list, uint64_t numWordIds, const IFlushToken& flush_token);
bool openInputWordReaders(const vespalib::string & dir, const SchemaUtil::IndexIterator &index,
std::vector<std::unique_ptr<DictionaryWordReader> > &readers,
PostingPriorityQueue<DictionaryWordReader> &heap);
bool renumberFieldWordIds(const vespalib::string & dir, const SchemaUtil::IndexIterator &index,
- WordNumMappingList & list, uint64_t & numWordIds);
+ WordNumMappingList & list, uint64_t& numWordIds, const IFlushToken& flush_token);
void makeTmpDirs(const vespalib::string & dir);
bool cleanTmpDirs(const vespalib::string & dir);
bool readSchemaFiles();
@@ -93,7 +96,8 @@ public:
static bool
merge(const Schema &schema, const vespalib::string &dir, const std::vector<vespalib::string> &sources,
const SelectorArray &docIdSelector, bool dynamicKPosOccFormat, const TuneFileIndexing &tuneFileIndexing,
- const common::FileHeaderContext &fileHeaderContext, vespalib::ThreadExecutor & executor);
+ const common::FileHeaderContext &fileHeaderContext, vespalib::ThreadExecutor & executor,
+ std::shared_ptr<IFlushToken> flush_token);
};
}
diff --git a/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp b/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp
index b7d444a0fca..0174f1d0aac 100644
--- a/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp
+++ b/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp
@@ -2,6 +2,7 @@
#include "fakememtreeocc.h"
#include "fpfactory.h"
+#include <vespa/searchlib/common/flush_token.h>
#include <vespa/searchlib/memoryindex/posting_iterator.h>
#include <vespa/searchlib/queryeval/iterators.h>
#include <vespa/searchlib/util/postingpriorityqueue.h>
@@ -353,16 +354,15 @@ FakeMemTreeOccFactory::setup(const std::vector<const FakeWord *> &fws)
PostingPriorityQueue<FakeWord::RandomizedReader> heap;
std::vector<FakeWord::RandomizedReader>::iterator i(r.begin());
std::vector<FakeWord::RandomizedReader>::iterator ie(r.end());
+ FlushToken flush_token;
while (i != ie) {
i->read();
- if (i->isValid())
+ if (i->isValid()) {
heap.initialAdd(&*i);
-#if 0
- heap.merge(_mgr, 4);
-#endif
+ }
++i;
}
- heap.merge(_mgr, 4);
+ heap.merge(_mgr, 4, flush_token);
assert(heap.empty());
_mgr.finalize();
}
diff --git a/searchlib/src/vespa/searchlib/util/postingpriorityqueue.h b/searchlib/src/vespa/searchlib/util/postingpriorityqueue.h
index 532c9e6711f..baf38035210 100644
--- a/searchlib/src/vespa/searchlib/util/postingpriorityqueue.h
+++ b/searchlib/src/vespa/searchlib/util/postingpriorityqueue.h
@@ -91,26 +91,27 @@ public:
template <class OUT>
void
- mergeHeap(OUT &out) __attribute__((noinline));
+ mergeHeap(OUT &out, const IFlushToken& flush_token) __attribute__((noinline));
template <class OUT>
static void
- mergeOne(OUT &out, IN &in) __attribute__((noinline));
+ mergeOne(OUT &out, IN &in, const IFlushToken &flush_token) __attribute__((noinline));
template <class OUT>
static void
- mergeTwo(OUT &out, IN &in1, IN &in2) __attribute__((noinline));
+ mergeTwo(OUT &out, IN &in1, IN &in2, const IFlushToken& flush_token) __attribute__((noinline));
template <class OUT>
static void
mergeSmall(OUT &out,
typename Vector::iterator ib,
- typename Vector::iterator ie)
+ typename Vector::iterator ie,
+ const IFlushToken &flush_token)
__attribute__((noinline));
template <class OUT>
void
- merge(OUT &out, uint32_t heapLimit) __attribute__((noinline));
+ merge(OUT &out, uint32_t heapLimit, const IFlushToken& flush_token) __attribute__((noinline));
};
@@ -143,9 +144,9 @@ PostingPriorityQueue<IN>::adjust()
template <class IN>
template <class OUT>
void
-PostingPriorityQueue<IN>::mergeHeap(OUT &out)
+PostingPriorityQueue<IN>::mergeHeap(OUT &out, const IFlushToken& flush_token)
{
- while (!empty()) {
+ while (!empty() && !flush_token.stop_requested()) {
IN *low = lowest();
low->write(out);
low->read();
@@ -157,9 +158,9 @@ PostingPriorityQueue<IN>::mergeHeap(OUT &out)
template <class IN>
template <class OUT>
void
-PostingPriorityQueue<IN>::mergeOne(OUT &out, IN &in)
+PostingPriorityQueue<IN>::mergeOne(OUT &out, IN &in, const IFlushToken& flush_token)
{
- while (in.isValid()) {
+ while (in.isValid() && !flush_token.stop_requested()) {
in.write(out);
in.read();
}
@@ -168,9 +169,9 @@ PostingPriorityQueue<IN>::mergeOne(OUT &out, IN &in)
template <class IN>
template <class OUT>
void
-PostingPriorityQueue<IN>::mergeTwo(OUT &out, IN &in1, IN &in2)
+PostingPriorityQueue<IN>::mergeTwo(OUT &out, IN &in1, IN &in2, const IFlushToken& flush_token)
{
- for (;;) {
+ while (!flush_token.stop_requested()) {
IN &low = in2 < in1 ? in2 : in1;
low.write(out);
low.read();
@@ -185,9 +186,10 @@ template <class OUT>
void
PostingPriorityQueue<IN>::mergeSmall(OUT &out,
typename Vector::iterator ib,
- typename Vector::iterator ie)
+ typename Vector::iterator ie,
+ const IFlushToken& flush_token)
{
- for (;;) {
+ while (!flush_token.stop_requested()) {
typename Vector::iterator i = ib;
IN *low = i->get();
for (++i; i != ie; ++i)
@@ -204,7 +206,7 @@ PostingPriorityQueue<IN>::mergeSmall(OUT &out,
template <class IN>
template <class OUT>
void
-PostingPriorityQueue<IN>::merge(OUT &out, uint32_t heapLimit)
+PostingPriorityQueue<IN>::merge(OUT &out, uint32_t heapLimit, const IFlushToken& flush_token)
{
if (_vec.empty())
return;
@@ -214,29 +216,30 @@ PostingPriorityQueue<IN>::merge(OUT &out, uint32_t heapLimit)
}
if (_vec.size() >= heapLimit) {
sort();
- void (PostingPriorityQueue::*mergeHeapFunc)(OUT &out) =
+ void (PostingPriorityQueue::*mergeHeapFunc)(OUT &out, const IFlushToken& flush_token) =
&PostingPriorityQueue::mergeHeap;
- (this->*mergeHeapFunc)(out);
+ (this->*mergeHeapFunc)(out, flush_token);
return;
}
for (;;) {
if (_vec.size() == 1) {
- void (*mergeOneFunc)(OUT &out, IN &in) =
+ void (*mergeOneFunc)(OUT &out, IN &in, const IFlushToken& flush_token) =
&PostingPriorityQueue<IN>::mergeOne;
- (*mergeOneFunc)(out, *_vec.front().get());
+ (*mergeOneFunc)(out, *_vec.front().get(), flush_token);
_vec.clear();
return;
}
if (_vec.size() == 2) {
- void (*mergeTwoFunc)(OUT &out, IN &in1, IN &in2) =
+ void (*mergeTwoFunc)(OUT &out, IN &in1, IN &in2, const IFlushToken& flush_token) =
&PostingPriorityQueue<IN>::mergeTwo;
- (*mergeTwoFunc)(out, *_vec[0].get(), *_vec[1].get());
+ (*mergeTwoFunc)(out, *_vec[0].get(), *_vec[1].get(), flush_token);
} else {
void (*mergeSmallFunc)(OUT &out,
typename Vector::iterator ib,
- typename Vector::iterator ie) =
+ typename Vector::iterator ie,
+ const IFlushToken& flush_token) =
&PostingPriorityQueue::mergeSmall;
- (*mergeSmallFunc)(out, _vec.begin(), _vec.end());
+ (*mergeSmallFunc)(out, _vec.begin(), _vec.end(), flush_token);
}
for (typename Vector::iterator i = _vec.begin(), ie = _vec.end();
i != ie; ++i) {