From e096765acfc1b28e6bce7855d7450824e4d287bb Mon Sep 17 00:00:00 2001 From: Tor Egge Date: Wed, 6 Jan 2021 16:31:24 +0100 Subject: Stop fusion when closing flush engine. --- .../src/tests/proton/index/fusionrunner_test.cpp | 13 ++++++++++++ .../src/tests/proton/index/indexmanager_test.cpp | 24 ++++++++++++++++++++++ .../searchcore/proton/flushengine/flushengine.cpp | 19 ++++++++++++++--- .../searchcore/proton/flushengine/flushengine.h | 5 +++++ 4 files changed, 58 insertions(+), 3 deletions(-) (limited to 'searchcore') diff --git a/searchcore/src/tests/proton/index/fusionrunner_test.cpp b/searchcore/src/tests/proton/index/fusionrunner_test.cpp index acd5c86fd5d..80e6e8b3db8 100644 --- a/searchcore/src/tests/proton/index/fusionrunner_test.cpp +++ b/searchcore/src/tests/proton/index/fusionrunner_test.cpp @@ -80,6 +80,7 @@ class Test : public vespalib::TestApp { void requireThatFusionCanRunOnMultipleDiskIndexes(); void requireThatOldFusionIndexCanBePartOfNewFusion(); void requireThatSelectorsCanBeRebased(); + void requireThatFusionCanBeStopped(); public: Test() @@ -111,6 +112,7 @@ Test::Main() TEST_CALL(requireThatFusionCanRunOnMultipleDiskIndexes()); TEST_CALL(requireThatOldFusionIndexCanBePartOfNewFusion()); TEST_CALL(requireThatSelectorsCanBeRebased()); + TEST_CALL(requireThatFusionCanBeStopped()); TEST_DONE(); } @@ -324,6 +326,17 @@ void Test::requireThatSelectorsCanBeRebased() { checkResults(fusion_id, disk_id, 3); } +void +Test::requireThatFusionCanBeStopped() +{ + createIndex(base_dir, disk_id[0]); + createIndex(base_dir, disk_id[1]); + auto flush_token = std::make_shared(); + flush_token->request_stop(); + uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops, flush_token); + EXPECT_EQUAL(0u, fusion_id); +} + } // namespace TEST_APPHOOK(Test); diff --git a/searchcore/src/tests/proton/index/indexmanager_test.cpp b/searchcore/src/tests/proton/index/indexmanager_test.cpp index 67eb11cee3e..0589bdbda96 100644 --- a/searchcore/src/tests/proton/index/indexmanager_test.cpp +++ b/searchcore/src/tests/proton/index/indexmanager_test.cpp @@ -830,6 +830,30 @@ TEST_F(IndexManagerTest, field_length_info_is_loaded_from_disk_index_during_star expect_field_length_info(1, 2, *as_memory_index(*sources, 1)); } +TEST_F(IndexManagerTest, fusion_can_be_stopped) +{ + resetIndexManager(); + + addDocument(docid); + flushIndexManager(); + addDocument(docid); + flushIndexManager(); + + IndexFusionTarget target(_index_manager->getMaintainer()); + auto flush_token = std::make_shared(); + flush_token->request_stop(); + vespalib::Executor::Task::UP fusionTask = target.initFlush(1, flush_token); + fusionTask->run(); + + FusionSpec spec = _index_manager->getMaintainer().getFusionSpec(); + set fusion_ids = readDiskIds(index_dir, "fusion"); + EXPECT_TRUE(fusion_ids.empty()); + EXPECT_EQ(0u, spec.last_fusion_id); + EXPECT_EQ(2u, spec.flush_ids.size()); + EXPECT_EQ(1u, spec.flush_ids[0]); + EXPECT_EQ(2u, spec.flush_ids[1]); +} + } // namespace int diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp index 5f35ecc916d..ab5b1ac5937 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp @@ -94,7 +94,9 @@ FlushEngine::FlushEngine(std::shared_ptr tlsStats _strategyLock(), _strategyCond(), _tlsStatsFactory(std::move(tlsStatsFactory)), - _pendingPrune() + _pendingPrune(), + _normal_flush_token(std::make_shared()), + _gc_flush_token(std::make_shared()) { } FlushEngine::~FlushEngine() @@ -117,6 +119,7 @@ FlushEngine::close() { std::lock_guard strategyGuard(_strategyLock); std::lock_guard guard(_lock); + _gc_flush_token->request_stop(); _closed = true; _cond.notify_all(); } @@ -269,6 +272,16 @@ FlushEngine::getSortedTargetList() return ret; } +std::shared_ptr +FlushEngine::get_flush_token(const FlushContext& ctx) +{ + if (ctx.getTarget()->getType() == IFlushTarget::Type::GC) { + return _gc_flush_token; + } else { + return _normal_flush_token; + } +} + FlushContext::SP FlushEngine::initNextFlush(const FlushContext::List &lst) { @@ -277,7 +290,7 @@ FlushEngine::initNextFlush(const FlushContext::List &lst) if (LOG_WOULD_LOG(event)) { EventLogger::flushInit(it->getName()); } - if (it->initFlush(std::make_shared())) { + if (it->initFlush(get_flush_token(*it))) { ctx = it; break; } @@ -294,7 +307,7 @@ FlushEngine::flushAll(const FlushContext::List &lst) LOG(debug, "%ld targets to flush.", lst.size()); for (const FlushContext::SP & ctx : lst) { if (wait(0)) { - if (ctx->initFlush(std::make_shared())) { + if (ctx->initFlush(get_flush_token(*ctx))) { logTarget("initiated", *ctx); _executor.execute(std::make_unique(initFlush(*ctx), *this, ctx)); } else { diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h index 160423c7c68..f51e93f0fbd 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h @@ -12,6 +12,8 @@ #include #include +namespace search { class FlushToken; } + namespace proton { namespace flushengine { class ITlsStatsFactory; } @@ -63,9 +65,12 @@ private: std::condition_variable _strategyCond; std::shared_ptr _tlsStatsFactory; std::set _pendingPrune; + std::shared_ptr _normal_flush_token; + std::shared_ptr _gc_flush_token; FlushContext::List getTargetList(bool includeFlushingTargets) const; std::pair getSortedTargetList(); + std::shared_ptr get_flush_token(const FlushContext& ctx); FlushContext::SP initNextFlush(const FlushContext::List &lst); vespalib::string flushNextTarget(const vespalib::string & name); void flushAll(const FlushContext::List &lst); -- cgit v1.2.3