diff options
author | Tor Egge <Tor.Egge@broadpark.no> | 2021-01-05 12:51:15 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@broadpark.no> | 2021-01-05 14:58:59 +0100 |
commit | 42a02eddf6b450682994afae68249171fe7876b8 (patch) | |
tree | 6a0a14eac83492008788f41baf81c6dcc87d0a5d /searchlib | |
parent | 6382cb8513ab166e4e4184e0ddebd60f97fb6bb3 (diff) |
Add low level support for stopping a running disk index fusion.
Diffstat (limited to 'searchlib')
10 files changed, 200 insertions, 56 deletions
diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp index c94d19cb3b7..efc9e99bf88 100644 --- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp +++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp @@ -1,5 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/searchlib/common/flush_token.h> #include <vespa/searchlib/diskindex/diskindex.h> #include <vespa/searchlib/diskindex/fusion.h> #include <vespa/searchlib/diskindex/indexbuilder.h> @@ -61,6 +62,7 @@ protected: void requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap); void make_simple_index(const vespalib::string &dump_dir, const IFieldLengthInspector &field_length_inspector); + bool try_merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources, std::shared_ptr<IFlushToken> flush_token); void merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources); public: FusionTest(); @@ -390,7 +392,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire sources.push_back(prefix + "dump2"); ASSERT_TRUE(Fusion::merge(schema, prefix + "dump3", sources, selector, dynamicKPosOcc, - tuneFileIndexing,fileHeaderContext, executor)); + tuneFileIndexing,fileHeaderContext, executor, std::make_shared<FlushToken>())); } while (0); do { DiskIndex dw3(prefix + "dump3"); @@ -403,7 +405,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire sources.push_back(prefix + "dump3"); ASSERT_TRUE(Fusion::merge(schema2, prefix + "dump4", sources, selector, dynamicKPosOcc, - tuneFileIndexing, fileHeaderContext, executor)); + tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>())); } while (0); do { DiskIndex dw4(prefix + "dump4"); @@ -416,7 +418,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire sources.push_back(prefix + "dump3"); ASSERT_TRUE(Fusion::merge(schema3, prefix + "dump5", sources, selector, dynamicKPosOcc, - tuneFileIndexing, fileHeaderContext, executor)); + tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>())); } while (0); do { DiskIndex dw5(prefix + "dump5"); @@ -429,7 +431,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire sources.push_back(prefix + "dump3"); ASSERT_TRUE(Fusion::merge(schema, prefix + "dump6", sources, selector, !dynamicKPosOcc, - tuneFileIndexing, fileHeaderContext, executor)); + tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>())); } while (0); do { DiskIndex dw6(prefix + "dump6"); @@ -442,7 +444,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire sources.push_back(prefix + "dump2"); ASSERT_TRUE(Fusion::merge(schema, prefix + "dump3", sources, selector, dynamicKPosOcc, - tuneFileIndexing, fileHeaderContext, executor)); + tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>())); } while (0); do { DiskIndex dw3(prefix + "dump3"); @@ -476,16 +478,22 @@ FusionTest::make_simple_index(const vespalib::string &dump_dir, const IFieldLeng ib.close(); } -void -FusionTest::merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources) +bool +FusionTest::try_merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources, std::shared_ptr<IFlushToken> flush_token) { vespalib::ThreadStackExecutor executor(4, 0x10000); TuneFileIndexing tuneFileIndexing; DummyFileHeaderContext fileHeaderContext; SelectorArray selector(20, 0); - ASSERT_TRUE(Fusion::merge(_schema, dump_dir, sources, selector, - false, - tuneFileIndexing, fileHeaderContext, executor)); + return Fusion::merge(_schema, dump_dir, sources, selector, + false, + tuneFileIndexing, fileHeaderContext, executor, flush_token); +} + +void +FusionTest::merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources) +{ + ASSERT_TRUE(try_merge_simple_indexes(dump_dir, sources, std::make_shared<FlushToken>())); } FusionTest::FusionTest() @@ -553,6 +561,59 @@ TEST_F(FusionTest, require_that_interleaved_features_can_be_reconstructed) clean_field_length_testdirs(); } +namespace { + +void clean_stopped_fusion_testdirs() +{ + vespalib::rmdir("stopdump2", true); + vespalib::rmdir("stopdump3", true); +} + +class MyFlushToken : public FlushToken +{ + mutable std::atomic<size_t> _checks; + const size_t _limit; +public: + MyFlushToken(size_t limit) + : FlushToken(), + _checks(0u), + _limit(limit) + { + } + ~MyFlushToken() override = default; + bool stop_requested() const noexcept override; + size_t get_checks() const noexcept { return _checks; } +}; + +bool +MyFlushToken::stop_requested() const noexcept +{ + if (++_checks >= _limit) { + const_cast<MyFlushToken *>(this)->request_stop(); + } + return FlushToken::stop_requested(); +} + +} + +TEST_F(FusionTest, require_that_fusion_can_be_stopped) +{ + clean_stopped_fusion_testdirs(); + auto flush_token = std::make_shared<MyFlushToken>(10000); + make_simple_index("stopdump2", MockFieldLengthInspector()); + ASSERT_TRUE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token)); + EXPECT_EQ(40, flush_token->get_checks()); + vespalib::rmdir("stopdump3", true); + flush_token = std::make_shared<MyFlushToken>(1); + ASSERT_FALSE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token)); + EXPECT_EQ(12, flush_token->get_checks()); + vespalib::rmdir("stopdump3", true); + flush_token = std::make_shared<MyFlushToken>(39); + ASSERT_FALSE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token)); + EXPECT_EQ(41, flush_token->get_checks()); + clean_stopped_fusion_testdirs(); +} + } } diff --git a/searchlib/src/vespa/searchlib/common/CMakeLists.txt b/searchlib/src/vespa/searchlib/common/CMakeLists.txt index 7e1cfd8fec5..97af7855aea 100644 --- a/searchlib/src/vespa/searchlib/common/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/common/CMakeLists.txt @@ -11,6 +11,7 @@ vespa_add_library(searchlib_common OBJECT documentsummary.cpp featureset.cpp fileheadercontext.cpp + flush_token.cpp gatecallback.cpp geo_location.cpp geo_location_spec.cpp diff --git a/searchlib/src/vespa/searchlib/common/flush_token.cpp b/searchlib/src/vespa/searchlib/common/flush_token.cpp new file mode 100644 index 00000000000..b5b34bef9a5 --- /dev/null +++ b/searchlib/src/vespa/searchlib/common/flush_token.cpp @@ -0,0 +1,26 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "flush_token.h" + +namespace search { + +FlushToken::FlushToken() + : _stop(false) +{ +} + +FlushToken::~FlushToken() = default; + +bool +FlushToken::stop_requested() const noexcept +{ + return _stop.load(std::memory_order_relaxed); +} + +void +FlushToken::request_stop() noexcept +{ + _stop.store(true, std::memory_order_relaxed); +} + +} diff --git a/searchlib/src/vespa/searchlib/common/flush_token.h b/searchlib/src/vespa/searchlib/common/flush_token.h new file mode 100644 index 00000000000..ea882dbbe45 --- /dev/null +++ b/searchlib/src/vespa/searchlib/common/flush_token.h @@ -0,0 +1,20 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "i_flush_token.h" +#include <atomic> + +namespace search { + +/* + * Class for checking if current flush task should be stopped. + */ +class FlushToken : public IFlushToken { + std::atomic<bool> _stop; +public: + FlushToken(); + ~FlushToken() override; + bool stop_requested() const noexcept override; + void request_stop() noexcept; +}; + +} diff --git a/searchlib/src/vespa/searchlib/common/i_flush_token.h b/searchlib/src/vespa/searchlib/common/i_flush_token.h new file mode 100644 index 00000000000..281c1a13382 --- /dev/null +++ b/searchlib/src/vespa/searchlib/common/i_flush_token.h @@ -0,0 +1,15 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +namespace search { + +/* + * Interface class for checking if current flush task should be stopped. + * TODO: Add methods to register transient memory usage during flush. + */ +class IFlushToken { +public: + virtual ~IFlushToken() = default; + virtual bool stop_requested() const noexcept = 0; +}; + +} diff --git a/searchlib/src/vespa/searchlib/diskindex/fieldreader.h b/searchlib/src/vespa/searchlib/diskindex/fieldreader.h index 899b3708bf9..e95a8389d38 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fieldreader.h +++ b/searchlib/src/vespa/searchlib/diskindex/fieldreader.h @@ -6,7 +6,6 @@ #include <vespa/searchlib/index/docidandfeatures.h> #include <vespa/searchlib/index/postinglistfile.h> #include <vespa/searchlib/index/schemautil.h> -#include <vespa/searchlib/util/postingpriorityqueue.h> #include "wordnummapper.h" #include "docidmapper.h" #include "fieldwriter.h" diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp index 0ad178d14b3..c8b0674252f 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp @@ -6,9 +6,11 @@ #include "field_length_scanner.h" #include <vespa/vespalib/util/stringfmt.h> #include <vespa/searchlib/bitcompression/posocc_fields_params.h> +#include <vespa/searchlib/common/i_flush_token.h> #include <vespa/searchlib/index/field_length_info.h> #include <vespa/searchlib/util/filekit.h> #include <vespa/searchlib/util/dirtraverse.h> +#include <vespa/searchlib/util/postingpriorityqueue.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/searchlib/common/documentsummary.h> #include <vespa/vespalib/util/error.h> @@ -139,7 +141,7 @@ Fusion::openInputWordReaders(const vespalib::string & dir, const SchemaUtil::Ind bool Fusion::renumberFieldWordIds(const vespalib::string & dir, const SchemaUtil::IndexIterator &index, - WordNumMappingList & list, uint64_t & numWordIds) + WordNumMappingList & list, uint64_t & numWordIds, const IFlushToken& flush_token) { vespalib::string indexName = index.getName(); LOG(debug, "Renumber word IDs for field %s", indexName.c_str()); @@ -151,7 +153,10 @@ Fusion::renumberFieldWordIds(const vespalib::string & dir, const SchemaUtil::Ind if (!openInputWordReaders(dir, index, readers, heap)) { return false; } - heap.merge(out, 4); + heap.merge(out, 4, flush_token); + if (flush_token.stop_requested()) { + return false; + } assert(heap.empty()); numWordIds = out.getWordNum(); @@ -172,7 +177,7 @@ Fusion::renumberFieldWordIds(const vespalib::string & dir, const SchemaUtil::Ind bool -Fusion::mergeFields(vespalib::ThreadExecutor & executor) +Fusion::mergeFields(vespalib::ThreadExecutor & executor, std::shared_ptr<IFlushToken> flush_token) { const Schema &schema = getSchema(); std::atomic<uint32_t> failed(0); @@ -181,8 +186,8 @@ Fusion::mergeFields(vespalib::ThreadExecutor & executor) vespalib::CountDownLatch done(schema.getNumIndexFields()); for (SchemaUtil::IndexIterator iter(schema); iter.isValid(); ++iter) { concurrent.wait(); - executor.execute(vespalib::makeLambdaTask([this, index=iter.getIndex(), &failed, &done, &concurrent]() { - if (!mergeField(index)) { + executor.execute(vespalib::makeLambdaTask([this, index=iter.getIndex(), &failed, &done, &concurrent, flush_token]() { + if (!mergeField(index, flush_token)) { failed++; } concurrent.post(); @@ -197,7 +202,7 @@ Fusion::mergeFields(vespalib::ThreadExecutor & executor) bool -Fusion::mergeField(uint32_t id) +Fusion::mergeField(uint32_t id, std::shared_ptr<IFlushToken> flush_token) { typedef SchemaUtil::IndexIterator IndexIterator; typedef SchemaUtil::IndexSettings IndexSettings; @@ -222,14 +227,20 @@ Fusion::mergeField(uint32_t id) WordNumMappingList list(_oldIndexes.size()); uint64_t numWordIds(0); - if (!renumberFieldWordIds(indexDir, index, list, numWordIds)) { + if (!renumberFieldWordIds(indexDir, index, list, numWordIds, *flush_token)) { + if (flush_token->stop_requested()) { + return false; + } LOG(error, "Could not renumber field word ids for field %s dir %s", indexName.c_str(), indexDir.c_str()); return false; } // Tokamak - bool res = mergeFieldPostings(index, list, numWordIds); + bool res = mergeFieldPostings(index, list, numWordIds, *flush_token); if (!res) { + if (flush_token->stop_requested()) { + return false; + } throw IllegalArgumentException(make_string("Could not merge field postings for field %s dir %s", indexName.c_str(), indexDir.c_str())); } @@ -387,7 +398,7 @@ Fusion::setupMergeHeap(const std::vector<std::unique_ptr<FieldReader> > & reader bool -Fusion::mergeFieldPostings(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list, uint64_t numWordIds) +Fusion::mergeFieldPostings(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list, uint64_t numWordIds, const IFlushToken& flush_token) { std::vector<std::unique_ptr<FieldReader>> readers; PostingPriorityQueue<FieldReader> heap; @@ -409,7 +420,10 @@ Fusion::mergeFieldPostings(const SchemaUtil::IndexIterator &index, const WordNum return false; } - heap.merge(fieldWriter, 4); + heap.merge(fieldWriter, 4, flush_token); + if (flush_token.stop_requested()) { + return false; + } assert(heap.empty()); for (auto &reader : readers) { @@ -510,7 +524,8 @@ bool Fusion::merge(const Schema &schema, const vespalib::string &dir, const std::vector<vespalib::string> &sources, const SelectorArray &selector, bool dynamicKPosOccFormat, const TuneFileIndexing &tuneFileIndexing, const FileHeaderContext &fileHeaderContext, - vespalib::ThreadExecutor & executor) + vespalib::ThreadExecutor & executor, + std::shared_ptr<IFlushToken> flush_token) { assert(sources.size() <= 255); uint32_t docIdLimit = selector.size(); @@ -550,7 +565,7 @@ Fusion::merge(const Schema &schema, const vespalib::string &dir, const std::vect try { auto fusion = std::make_unique<Fusion>(trimmedDocIdLimit, schema, dir, sources, selector, dynamicKPosOccFormat, tuneFileIndexing, fileHeaderContext); - return fusion->mergeFields(executor); + return fusion->mergeFields(executor, flush_token); } catch (const std::exception & e) { LOG(error, "%s", e.what()); return false; diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.h b/searchlib/src/vespa/searchlib/diskindex/fusion.h index d532384f6e9..1f517f149f9 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion.h +++ b/searchlib/src/vespa/searchlib/diskindex/fusion.h @@ -9,7 +9,10 @@ #include <vespa/vespalib/util/threadexecutor.h> namespace search { template <class IN> class PostingPriorityQueue; } -namespace search { class TuneFileIndexing; } +namespace search { +class IFlushToken; +class TuneFileIndexing; +} namespace search::common { class FileHeaderContext; } namespace search::index { class FieldLengthInfo; } @@ -48,20 +51,20 @@ private: using SchemaUtil = index::SchemaUtil; using WordNumMappingList = std::vector<WordNumMapping>; - bool mergeFields(vespalib::ThreadExecutor & executor); - bool mergeField(uint32_t id); + bool mergeFields(vespalib::ThreadExecutor & executor, std::shared_ptr<IFlushToken> flush_token); + bool mergeField(uint32_t id, std::shared_ptr<IFlushToken> flush_token); std::shared_ptr<FieldLengthScanner> allocate_field_length_scanner(const SchemaUtil::IndexIterator &index); bool openInputFieldReaders(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list, std::vector<std::unique_ptr<FieldReader> > & readers); bool openFieldWriter(const SchemaUtil::IndexIterator &index, FieldWriter & writer, const index::FieldLengthInfo &field_length_info); bool setupMergeHeap(const std::vector<std::unique_ptr<FieldReader> > & readers, FieldWriter &writer, PostingPriorityQueue<FieldReader> &heap); - bool mergeFieldPostings(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list, uint64_t numWordIds); + bool mergeFieldPostings(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list, uint64_t numWordIds, const IFlushToken& flush_token); bool openInputWordReaders(const vespalib::string & dir, const SchemaUtil::IndexIterator &index, std::vector<std::unique_ptr<DictionaryWordReader> > &readers, PostingPriorityQueue<DictionaryWordReader> &heap); bool renumberFieldWordIds(const vespalib::string & dir, const SchemaUtil::IndexIterator &index, - WordNumMappingList & list, uint64_t & numWordIds); + WordNumMappingList & list, uint64_t& numWordIds, const IFlushToken& flush_token); void makeTmpDirs(const vespalib::string & dir); bool cleanTmpDirs(const vespalib::string & dir); bool readSchemaFiles(); @@ -93,7 +96,8 @@ public: static bool merge(const Schema &schema, const vespalib::string &dir, const std::vector<vespalib::string> &sources, const SelectorArray &docIdSelector, bool dynamicKPosOccFormat, const TuneFileIndexing &tuneFileIndexing, - const common::FileHeaderContext &fileHeaderContext, vespalib::ThreadExecutor & executor); + const common::FileHeaderContext &fileHeaderContext, vespalib::ThreadExecutor & executor, + std::shared_ptr<IFlushToken> flush_token); }; } diff --git a/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp b/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp index b7d444a0fca..0174f1d0aac 100644 --- a/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp +++ b/searchlib/src/vespa/searchlib/test/fakedata/fakememtreeocc.cpp @@ -2,6 +2,7 @@ #include "fakememtreeocc.h" #include "fpfactory.h" +#include <vespa/searchlib/common/flush_token.h> #include <vespa/searchlib/memoryindex/posting_iterator.h> #include <vespa/searchlib/queryeval/iterators.h> #include <vespa/searchlib/util/postingpriorityqueue.h> @@ -353,16 +354,15 @@ FakeMemTreeOccFactory::setup(const std::vector<const FakeWord *> &fws) PostingPriorityQueue<FakeWord::RandomizedReader> heap; std::vector<FakeWord::RandomizedReader>::iterator i(r.begin()); std::vector<FakeWord::RandomizedReader>::iterator ie(r.end()); + FlushToken flush_token; while (i != ie) { i->read(); - if (i->isValid()) + if (i->isValid()) { heap.initialAdd(&*i); -#if 0 - heap.merge(_mgr, 4); -#endif + } ++i; } - heap.merge(_mgr, 4); + heap.merge(_mgr, 4, flush_token); assert(heap.empty()); _mgr.finalize(); } diff --git a/searchlib/src/vespa/searchlib/util/postingpriorityqueue.h b/searchlib/src/vespa/searchlib/util/postingpriorityqueue.h index 532c9e6711f..baf38035210 100644 --- a/searchlib/src/vespa/searchlib/util/postingpriorityqueue.h +++ b/searchlib/src/vespa/searchlib/util/postingpriorityqueue.h @@ -91,26 +91,27 @@ public: template <class OUT> void - mergeHeap(OUT &out) __attribute__((noinline)); + mergeHeap(OUT &out, const IFlushToken& flush_token) __attribute__((noinline)); template <class OUT> static void - mergeOne(OUT &out, IN &in) __attribute__((noinline)); + mergeOne(OUT &out, IN &in, const IFlushToken &flush_token) __attribute__((noinline)); template <class OUT> static void - mergeTwo(OUT &out, IN &in1, IN &in2) __attribute__((noinline)); + mergeTwo(OUT &out, IN &in1, IN &in2, const IFlushToken& flush_token) __attribute__((noinline)); template <class OUT> static void mergeSmall(OUT &out, typename Vector::iterator ib, - typename Vector::iterator ie) + typename Vector::iterator ie, + const IFlushToken &flush_token) __attribute__((noinline)); template <class OUT> void - merge(OUT &out, uint32_t heapLimit) __attribute__((noinline)); + merge(OUT &out, uint32_t heapLimit, const IFlushToken& flush_token) __attribute__((noinline)); }; @@ -143,9 +144,9 @@ PostingPriorityQueue<IN>::adjust() template <class IN> template <class OUT> void -PostingPriorityQueue<IN>::mergeHeap(OUT &out) +PostingPriorityQueue<IN>::mergeHeap(OUT &out, const IFlushToken& flush_token) { - while (!empty()) { + while (!empty() && !flush_token.stop_requested()) { IN *low = lowest(); low->write(out); low->read(); @@ -157,9 +158,9 @@ PostingPriorityQueue<IN>::mergeHeap(OUT &out) template <class IN> template <class OUT> void -PostingPriorityQueue<IN>::mergeOne(OUT &out, IN &in) +PostingPriorityQueue<IN>::mergeOne(OUT &out, IN &in, const IFlushToken& flush_token) { - while (in.isValid()) { + while (in.isValid() && !flush_token.stop_requested()) { in.write(out); in.read(); } @@ -168,9 +169,9 @@ PostingPriorityQueue<IN>::mergeOne(OUT &out, IN &in) template <class IN> template <class OUT> void -PostingPriorityQueue<IN>::mergeTwo(OUT &out, IN &in1, IN &in2) +PostingPriorityQueue<IN>::mergeTwo(OUT &out, IN &in1, IN &in2, const IFlushToken& flush_token) { - for (;;) { + while (!flush_token.stop_requested()) { IN &low = in2 < in1 ? in2 : in1; low.write(out); low.read(); @@ -185,9 +186,10 @@ template <class OUT> void PostingPriorityQueue<IN>::mergeSmall(OUT &out, typename Vector::iterator ib, - typename Vector::iterator ie) + typename Vector::iterator ie, + const IFlushToken& flush_token) { - for (;;) { + while (!flush_token.stop_requested()) { typename Vector::iterator i = ib; IN *low = i->get(); for (++i; i != ie; ++i) @@ -204,7 +206,7 @@ PostingPriorityQueue<IN>::mergeSmall(OUT &out, template <class IN> template <class OUT> void -PostingPriorityQueue<IN>::merge(OUT &out, uint32_t heapLimit) +PostingPriorityQueue<IN>::merge(OUT &out, uint32_t heapLimit, const IFlushToken& flush_token) { if (_vec.empty()) return; @@ -214,29 +216,30 @@ PostingPriorityQueue<IN>::merge(OUT &out, uint32_t heapLimit) } if (_vec.size() >= heapLimit) { sort(); - void (PostingPriorityQueue::*mergeHeapFunc)(OUT &out) = + void (PostingPriorityQueue::*mergeHeapFunc)(OUT &out, const IFlushToken& flush_token) = &PostingPriorityQueue::mergeHeap; - (this->*mergeHeapFunc)(out); + (this->*mergeHeapFunc)(out, flush_token); return; } for (;;) { if (_vec.size() == 1) { - void (*mergeOneFunc)(OUT &out, IN &in) = + void (*mergeOneFunc)(OUT &out, IN &in, const IFlushToken& flush_token) = &PostingPriorityQueue<IN>::mergeOne; - (*mergeOneFunc)(out, *_vec.front().get()); + (*mergeOneFunc)(out, *_vec.front().get(), flush_token); _vec.clear(); return; } if (_vec.size() == 2) { - void (*mergeTwoFunc)(OUT &out, IN &in1, IN &in2) = + void (*mergeTwoFunc)(OUT &out, IN &in1, IN &in2, const IFlushToken& flush_token) = &PostingPriorityQueue<IN>::mergeTwo; - (*mergeTwoFunc)(out, *_vec[0].get(), *_vec[1].get()); + (*mergeTwoFunc)(out, *_vec[0].get(), *_vec[1].get(), flush_token); } else { void (*mergeSmallFunc)(OUT &out, typename Vector::iterator ib, - typename Vector::iterator ie) = + typename Vector::iterator ie, + const IFlushToken& flush_token) = &PostingPriorityQueue::mergeSmall; - (*mergeSmallFunc)(out, _vec.begin(), _vec.end()); + (*mergeSmallFunc)(out, _vec.begin(), _vec.end(), flush_token); } for (typename Vector::iterator i = _vec.begin(), ie = _vec.end(); i != ie; ++i) { |