summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@broadpark.no>2021-01-06 16:31:24 +0100
committerTor Egge <Tor.Egge@broadpark.no>2021-01-06 16:31:24 +0100
commite096765acfc1b28e6bce7855d7450824e4d287bb (patch)
tree1c4cc0e5de0d1623d096c55933f4d6d2178df3df /searchcore
parente1977829a79d9c7598fc99627861c6a822d16318 (diff)
Stop fusion when closing flush engine.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/index/fusionrunner_test.cpp13
-rw-r--r--searchcore/src/tests/proton/index/indexmanager_test.cpp24
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp19
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h5
4 files changed, 58 insertions, 3 deletions
diff --git a/searchcore/src/tests/proton/index/fusionrunner_test.cpp b/searchcore/src/tests/proton/index/fusionrunner_test.cpp
index acd5c86fd5d..80e6e8b3db8 100644
--- a/searchcore/src/tests/proton/index/fusionrunner_test.cpp
+++ b/searchcore/src/tests/proton/index/fusionrunner_test.cpp
@@ -80,6 +80,7 @@ class Test : public vespalib::TestApp {
void requireThatFusionCanRunOnMultipleDiskIndexes();
void requireThatOldFusionIndexCanBePartOfNewFusion();
void requireThatSelectorsCanBeRebased();
+ void requireThatFusionCanBeStopped();
public:
Test()
@@ -111,6 +112,7 @@ Test::Main()
TEST_CALL(requireThatFusionCanRunOnMultipleDiskIndexes());
TEST_CALL(requireThatOldFusionIndexCanBePartOfNewFusion());
TEST_CALL(requireThatSelectorsCanBeRebased());
+ TEST_CALL(requireThatFusionCanBeStopped());
TEST_DONE();
}
@@ -324,6 +326,17 @@ void Test::requireThatSelectorsCanBeRebased() {
checkResults(fusion_id, disk_id, 3);
}
+void
+Test::requireThatFusionCanBeStopped()
+{
+ createIndex(base_dir, disk_id[0]);
+ createIndex(base_dir, disk_id[1]);
+ auto flush_token = std::make_shared<search::FlushToken>();
+ flush_token->request_stop();
+ uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops, flush_token);
+ EXPECT_EQUAL(0u, fusion_id);
+}
+
} // namespace
TEST_APPHOOK(Test);
diff --git a/searchcore/src/tests/proton/index/indexmanager_test.cpp b/searchcore/src/tests/proton/index/indexmanager_test.cpp
index 67eb11cee3e..0589bdbda96 100644
--- a/searchcore/src/tests/proton/index/indexmanager_test.cpp
+++ b/searchcore/src/tests/proton/index/indexmanager_test.cpp
@@ -830,6 +830,30 @@ TEST_F(IndexManagerTest, field_length_info_is_loaded_from_disk_index_during_star
expect_field_length_info(1, 2, *as_memory_index(*sources, 1));
}
+TEST_F(IndexManagerTest, fusion_can_be_stopped)
+{
+ resetIndexManager();
+
+ addDocument(docid);
+ flushIndexManager();
+ addDocument(docid);
+ flushIndexManager();
+
+ IndexFusionTarget target(_index_manager->getMaintainer());
+ auto flush_token = std::make_shared<search::FlushToken>();
+ flush_token->request_stop();
+ vespalib::Executor::Task::UP fusionTask = target.initFlush(1, flush_token);
+ fusionTask->run();
+
+ FusionSpec spec = _index_manager->getMaintainer().getFusionSpec();
+ set<uint32_t> fusion_ids = readDiskIds(index_dir, "fusion");
+ EXPECT_TRUE(fusion_ids.empty());
+ EXPECT_EQ(0u, spec.last_fusion_id);
+ EXPECT_EQ(2u, spec.flush_ids.size());
+ EXPECT_EQ(1u, spec.flush_ids[0]);
+ EXPECT_EQ(2u, spec.flush_ids[1]);
+}
+
} // namespace
int
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
index 5f35ecc916d..ab5b1ac5937 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
@@ -94,7 +94,9 @@ FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> tlsStats
_strategyLock(),
_strategyCond(),
_tlsStatsFactory(std::move(tlsStatsFactory)),
- _pendingPrune()
+ _pendingPrune(),
+ _normal_flush_token(std::make_shared<search::FlushToken>()),
+ _gc_flush_token(std::make_shared<search::FlushToken>())
{ }
FlushEngine::~FlushEngine()
@@ -117,6 +119,7 @@ FlushEngine::close()
{
std::lock_guard<std::mutex> strategyGuard(_strategyLock);
std::lock_guard<std::mutex> guard(_lock);
+ _gc_flush_token->request_stop();
_closed = true;
_cond.notify_all();
}
@@ -269,6 +272,16 @@ FlushEngine::getSortedTargetList()
return ret;
}
+std::shared_ptr<search::IFlushToken>
+FlushEngine::get_flush_token(const FlushContext& ctx)
+{
+ if (ctx.getTarget()->getType() == IFlushTarget::Type::GC) {
+ return _gc_flush_token;
+ } else {
+ return _normal_flush_token;
+ }
+}
+
FlushContext::SP
FlushEngine::initNextFlush(const FlushContext::List &lst)
{
@@ -277,7 +290,7 @@ FlushEngine::initNextFlush(const FlushContext::List &lst)
if (LOG_WOULD_LOG(event)) {
EventLogger::flushInit(it->getName());
}
- if (it->initFlush(std::make_shared<search::FlushToken>())) {
+ if (it->initFlush(get_flush_token(*it))) {
ctx = it;
break;
}
@@ -294,7 +307,7 @@ FlushEngine::flushAll(const FlushContext::List &lst)
LOG(debug, "%ld targets to flush.", lst.size());
for (const FlushContext::SP & ctx : lst) {
if (wait(0)) {
- if (ctx->initFlush(std::make_shared<search::FlushToken>())) {
+ if (ctx->initFlush(get_flush_token(*ctx))) {
logTarget("initiated", *ctx);
_executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx));
} else {
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
index 160423c7c68..f51e93f0fbd 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
@@ -12,6 +12,8 @@
#include <mutex>
#include <condition_variable>
+namespace search { class FlushToken; }
+
namespace proton {
namespace flushengine { class ITlsStatsFactory; }
@@ -63,9 +65,12 @@ private:
std::condition_variable _strategyCond;
std::shared_ptr<flushengine::ITlsStatsFactory> _tlsStatsFactory;
std::set<IFlushHandler::SP> _pendingPrune;
+ std::shared_ptr<search::FlushToken> _normal_flush_token;
+ std::shared_ptr<search::FlushToken> _gc_flush_token;
FlushContext::List getTargetList(bool includeFlushingTargets) const;
std::pair<FlushContext::List,bool> getSortedTargetList();
+ std::shared_ptr<search::IFlushToken> get_flush_token(const FlushContext& ctx);
FlushContext::SP initNextFlush(const FlushContext::List &lst);
vespalib::string flushNextTarget(const vespalib::string & name);
void flushAll(const FlushContext::List &lst);