diff options
author | Tor Egge <Tor.Egge@broadpark.no> | 2021-01-06 16:31:24 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@broadpark.no> | 2021-01-06 16:31:24 +0100 |
commit | e096765acfc1b28e6bce7855d7450824e4d287bb (patch) | |
tree | 1c4cc0e5de0d1623d096c55933f4d6d2178df3df | |
parent | e1977829a79d9c7598fc99627861c6a822d16318 (diff) |
Stop fusion when closing flush engine.
7 files changed, 75 insertions, 11 deletions
diff --git a/searchcore/src/tests/proton/index/fusionrunner_test.cpp b/searchcore/src/tests/proton/index/fusionrunner_test.cpp index acd5c86fd5d..80e6e8b3db8 100644 --- a/searchcore/src/tests/proton/index/fusionrunner_test.cpp +++ b/searchcore/src/tests/proton/index/fusionrunner_test.cpp @@ -80,6 +80,7 @@ class Test : public vespalib::TestApp { void requireThatFusionCanRunOnMultipleDiskIndexes(); void requireThatOldFusionIndexCanBePartOfNewFusion(); void requireThatSelectorsCanBeRebased(); + void requireThatFusionCanBeStopped(); public: Test() @@ -111,6 +112,7 @@ Test::Main() TEST_CALL(requireThatFusionCanRunOnMultipleDiskIndexes()); TEST_CALL(requireThatOldFusionIndexCanBePartOfNewFusion()); TEST_CALL(requireThatSelectorsCanBeRebased()); + TEST_CALL(requireThatFusionCanBeStopped()); TEST_DONE(); } @@ -324,6 +326,17 @@ void Test::requireThatSelectorsCanBeRebased() { checkResults(fusion_id, disk_id, 3); } +void +Test::requireThatFusionCanBeStopped() +{ + createIndex(base_dir, disk_id[0]); + createIndex(base_dir, disk_id[1]); + auto flush_token = std::make_shared<search::FlushToken>(); + flush_token->request_stop(); + uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops, flush_token); + EXPECT_EQUAL(0u, fusion_id); +} + } // namespace TEST_APPHOOK(Test); diff --git a/searchcore/src/tests/proton/index/indexmanager_test.cpp b/searchcore/src/tests/proton/index/indexmanager_test.cpp index 67eb11cee3e..0589bdbda96 100644 --- a/searchcore/src/tests/proton/index/indexmanager_test.cpp +++ b/searchcore/src/tests/proton/index/indexmanager_test.cpp @@ -830,6 +830,30 @@ TEST_F(IndexManagerTest, field_length_info_is_loaded_from_disk_index_during_star expect_field_length_info(1, 2, *as_memory_index(*sources, 1)); } +TEST_F(IndexManagerTest, fusion_can_be_stopped) +{ + resetIndexManager(); + + addDocument(docid); + flushIndexManager(); + addDocument(docid); + flushIndexManager(); + + IndexFusionTarget target(_index_manager->getMaintainer()); + auto flush_token = std::make_shared<search::FlushToken>(); + flush_token->request_stop(); + vespalib::Executor::Task::UP fusionTask = target.initFlush(1, flush_token); + fusionTask->run(); + + FusionSpec spec = _index_manager->getMaintainer().getFusionSpec(); + set<uint32_t> fusion_ids = readDiskIds(index_dir, "fusion"); + EXPECT_TRUE(fusion_ids.empty()); + EXPECT_EQ(0u, spec.last_fusion_id); + EXPECT_EQ(2u, spec.flush_ids.size()); + EXPECT_EQ(1u, spec.flush_ids[0]); + EXPECT_EQ(2u, spec.flush_ids[1]); +} + } // namespace int diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp index 5f35ecc916d..ab5b1ac5937 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp @@ -94,7 +94,9 @@ FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> tlsStats _strategyLock(), _strategyCond(), _tlsStatsFactory(std::move(tlsStatsFactory)), - _pendingPrune() + _pendingPrune(), + _normal_flush_token(std::make_shared<search::FlushToken>()), + _gc_flush_token(std::make_shared<search::FlushToken>()) { } FlushEngine::~FlushEngine() @@ -117,6 +119,7 @@ FlushEngine::close() { std::lock_guard<std::mutex> strategyGuard(_strategyLock); std::lock_guard<std::mutex> guard(_lock); + _gc_flush_token->request_stop(); _closed = true; _cond.notify_all(); } @@ -269,6 +272,16 @@ FlushEngine::getSortedTargetList() return ret; } +std::shared_ptr<search::IFlushToken> +FlushEngine::get_flush_token(const FlushContext& ctx) +{ + if (ctx.getTarget()->getType() == IFlushTarget::Type::GC) { + return _gc_flush_token; + } else { + return _normal_flush_token; + } +} + FlushContext::SP FlushEngine::initNextFlush(const FlushContext::List &lst) { @@ -277,7 +290,7 @@ FlushEngine::initNextFlush(const FlushContext::List &lst) if (LOG_WOULD_LOG(event)) { EventLogger::flushInit(it->getName()); } - if (it->initFlush(std::make_shared<search::FlushToken>())) { + if (it->initFlush(get_flush_token(*it))) { ctx = it; break; } @@ -294,7 +307,7 @@ FlushEngine::flushAll(const FlushContext::List &lst) LOG(debug, "%ld targets to flush.", lst.size()); for (const FlushContext::SP & ctx : lst) { if (wait(0)) { - if (ctx->initFlush(std::make_shared<search::FlushToken>())) { + if (ctx->initFlush(get_flush_token(*ctx))) { logTarget("initiated", *ctx); _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx)); } else { diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h index 160423c7c68..f51e93f0fbd 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h @@ -12,6 +12,8 @@ #include <mutex> #include <condition_variable> +namespace search { class FlushToken; } + namespace proton { namespace flushengine { class ITlsStatsFactory; } @@ -63,9 +65,12 @@ private: std::condition_variable _strategyCond; std::shared_ptr<flushengine::ITlsStatsFactory> _tlsStatsFactory; std::set<IFlushHandler::SP> _pendingPrune; + std::shared_ptr<search::FlushToken> _normal_flush_token; + std::shared_ptr<search::FlushToken> _gc_flush_token; FlushContext::List getTargetList(bool includeFlushingTargets) const; std::pair<FlushContext::List,bool> getSortedTargetList(); + std::shared_ptr<search::IFlushToken> get_flush_token(const FlushContext& ctx); FlushContext::SP initNextFlush(const FlushContext::List &lst); vespalib::string flushNextTarget(const vespalib::string & name); void flushAll(const FlushContext::List &lst); diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp index e2bcb8b7629..9c180c39144 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp @@ -10,6 +10,7 @@ #include "indexwriteutilities.h" #include <vespa/fastos/file.h> #include <vespa/searchcorespi/flush/closureflushtask.h> +#include <vespa/searchlib/common/i_flush_token.h> #include <vespa/searchlib/index/schemautil.h> #include <vespa/searchlib/util/dirtraverse.h> #include <vespa/searchlib/util/filekit.h> @@ -984,11 +985,15 @@ IndexMaintainer::doFusion(SerialNum serialNum, std::shared_ptr<search::IFlushTok _fusion_spec.flush_ids.clear(); } - uint32_t new_fusion_id = runFusion(spec, std::move(flush_token)); + uint32_t new_fusion_id = runFusion(spec, flush_token); LockGuard lock(_fusion_lock); if (new_fusion_id == spec.last_fusion_id) { // Error running fusion. - LOG(warning, "Fusion failed for id %u.", spec.flush_ids.back()); + if (flush_token->stop_requested()) { + LOG(info, "Fusion stopped for id %u.", spec.flush_ids.back()); + } else { + LOG(warning, "Fusion failed for id %u.", spec.flush_ids.back()); + } // Restore fusion spec. copy(_fusion_spec.flush_ids.begin(), _fusion_spec.flush_ids.end(), back_inserter(spec.flush_ids)); _fusion_spec.flush_ids.swap(spec.flush_ids); @@ -1020,14 +1025,18 @@ IndexMaintainer::runFusion(const FusionSpec &fusion_spec, std::shared_ptr<search serialNum = IndexReadUtilities::readSerialNum(lastFlushDir); } FusionRunner fusion_runner(_base_dir, args._schema, tuneFileAttributes, _ctx.getFileHeaderContext()); - uint32_t new_fusion_id = fusion_runner.fuse(fusion_spec, serialNum, _operations, std::move(flush_token)); + uint32_t new_fusion_id = fusion_runner.fuse(fusion_spec, serialNum, _operations, flush_token); bool ok = (new_fusion_id != 0); if (ok) { ok = IndexWriteUtilities::copySerialNumFile(getFlushDir(fusion_spec.flush_ids.back()), getFusionDir(new_fusion_id)); } if (!ok) { - LOG(error, "Fusion failed."); + if (flush_token->stop_requested()) { + LOG(info, "Fusion stopped."); + } else { + LOG(error, "Fusion failed."); + } string fail_dir = getFusionDir(fusion_spec.flush_ids.back()); FastOS_FileInterface::EmptyAndRemoveDirectory(fail_dir.c_str()); { diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp index efc9e99bf88..4c62140b731 100644 --- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp +++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp @@ -602,15 +602,15 @@ TEST_F(FusionTest, require_that_fusion_can_be_stopped) auto flush_token = std::make_shared<MyFlushToken>(10000); make_simple_index("stopdump2", MockFieldLengthInspector()); ASSERT_TRUE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token)); - EXPECT_EQ(40, flush_token->get_checks()); + EXPECT_EQ(48, flush_token->get_checks()); vespalib::rmdir("stopdump3", true); flush_token = std::make_shared<MyFlushToken>(1); ASSERT_FALSE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token)); EXPECT_EQ(12, flush_token->get_checks()); vespalib::rmdir("stopdump3", true); - flush_token = std::make_shared<MyFlushToken>(39); + flush_token = std::make_shared<MyFlushToken>(47); ASSERT_FALSE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token)); - EXPECT_EQ(41, flush_token->get_checks()); + EXPECT_EQ(49, flush_token->get_checks()); clean_stopped_fusion_testdirs(); } diff --git a/searchlib/src/vespa/searchlib/util/postingpriorityqueue.h b/searchlib/src/vespa/searchlib/util/postingpriorityqueue.h index baf38035210..008e9055e57 100644 --- a/searchlib/src/vespa/searchlib/util/postingpriorityqueue.h +++ b/searchlib/src/vespa/searchlib/util/postingpriorityqueue.h @@ -221,7 +221,7 @@ PostingPriorityQueue<IN>::merge(OUT &out, uint32_t heapLimit, const IFlushToken& (this->*mergeHeapFunc)(out, flush_token); return; } - for (;;) { + while (!flush_token.stop_requested()) { if (_vec.size() == 1) { void (*mergeOneFunc)(OUT &out, IN &in, const IFlushToken& flush_token) = &PostingPriorityQueue<IN>::mergeOne; |