aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@broadpark.no>2021-01-06 16:31:24 +0100
committerTor Egge <Tor.Egge@broadpark.no>2021-01-06 16:31:24 +0100
commite096765acfc1b28e6bce7855d7450824e4d287bb (patch)
tree1c4cc0e5de0d1623d096c55933f4d6d2178df3df
parente1977829a79d9c7598fc99627861c6a822d16318 (diff)
Stop fusion when closing flush engine.
-rw-r--r--searchcore/src/tests/proton/index/fusionrunner_test.cpp13
-rw-r--r--searchcore/src/tests/proton/index/indexmanager_test.cpp24
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp19
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h5
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp17
-rw-r--r--searchlib/src/tests/diskindex/fusion/fusion_test.cpp6
-rw-r--r--searchlib/src/vespa/searchlib/util/postingpriorityqueue.h2
7 files changed, 75 insertions, 11 deletions
diff --git a/searchcore/src/tests/proton/index/fusionrunner_test.cpp b/searchcore/src/tests/proton/index/fusionrunner_test.cpp
index acd5c86fd5d..80e6e8b3db8 100644
--- a/searchcore/src/tests/proton/index/fusionrunner_test.cpp
+++ b/searchcore/src/tests/proton/index/fusionrunner_test.cpp
@@ -80,6 +80,7 @@ class Test : public vespalib::TestApp {
void requireThatFusionCanRunOnMultipleDiskIndexes();
void requireThatOldFusionIndexCanBePartOfNewFusion();
void requireThatSelectorsCanBeRebased();
+ void requireThatFusionCanBeStopped();
public:
Test()
@@ -111,6 +112,7 @@ Test::Main()
TEST_CALL(requireThatFusionCanRunOnMultipleDiskIndexes());
TEST_CALL(requireThatOldFusionIndexCanBePartOfNewFusion());
TEST_CALL(requireThatSelectorsCanBeRebased());
+ TEST_CALL(requireThatFusionCanBeStopped());
TEST_DONE();
}
@@ -324,6 +326,17 @@ void Test::requireThatSelectorsCanBeRebased() {
checkResults(fusion_id, disk_id, 3);
}
+void
+Test::requireThatFusionCanBeStopped()
+{
+ createIndex(base_dir, disk_id[0]);
+ createIndex(base_dir, disk_id[1]);
+ auto flush_token = std::make_shared<search::FlushToken>();
+ flush_token->request_stop();
+ uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops, flush_token);
+ EXPECT_EQUAL(0u, fusion_id);
+}
+
} // namespace
TEST_APPHOOK(Test);
diff --git a/searchcore/src/tests/proton/index/indexmanager_test.cpp b/searchcore/src/tests/proton/index/indexmanager_test.cpp
index 67eb11cee3e..0589bdbda96 100644
--- a/searchcore/src/tests/proton/index/indexmanager_test.cpp
+++ b/searchcore/src/tests/proton/index/indexmanager_test.cpp
@@ -830,6 +830,30 @@ TEST_F(IndexManagerTest, field_length_info_is_loaded_from_disk_index_during_star
expect_field_length_info(1, 2, *as_memory_index(*sources, 1));
}
+TEST_F(IndexManagerTest, fusion_can_be_stopped)
+{
+ resetIndexManager();
+
+ addDocument(docid);
+ flushIndexManager();
+ addDocument(docid);
+ flushIndexManager();
+
+ IndexFusionTarget target(_index_manager->getMaintainer());
+ auto flush_token = std::make_shared<search::FlushToken>();
+ flush_token->request_stop();
+ vespalib::Executor::Task::UP fusionTask = target.initFlush(1, flush_token);
+ fusionTask->run();
+
+ FusionSpec spec = _index_manager->getMaintainer().getFusionSpec();
+ set<uint32_t> fusion_ids = readDiskIds(index_dir, "fusion");
+ EXPECT_TRUE(fusion_ids.empty());
+ EXPECT_EQ(0u, spec.last_fusion_id);
+ EXPECT_EQ(2u, spec.flush_ids.size());
+ EXPECT_EQ(1u, spec.flush_ids[0]);
+ EXPECT_EQ(2u, spec.flush_ids[1]);
+}
+
} // namespace
int
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
index 5f35ecc916d..ab5b1ac5937 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
@@ -94,7 +94,9 @@ FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> tlsStats
_strategyLock(),
_strategyCond(),
_tlsStatsFactory(std::move(tlsStatsFactory)),
- _pendingPrune()
+ _pendingPrune(),
+ _normal_flush_token(std::make_shared<search::FlushToken>()),
+ _gc_flush_token(std::make_shared<search::FlushToken>())
{ }
FlushEngine::~FlushEngine()
@@ -117,6 +119,7 @@ FlushEngine::close()
{
std::lock_guard<std::mutex> strategyGuard(_strategyLock);
std::lock_guard<std::mutex> guard(_lock);
+ _gc_flush_token->request_stop();
_closed = true;
_cond.notify_all();
}
@@ -269,6 +272,16 @@ FlushEngine::getSortedTargetList()
return ret;
}
+std::shared_ptr<search::IFlushToken>
+FlushEngine::get_flush_token(const FlushContext& ctx)
+{
+ if (ctx.getTarget()->getType() == IFlushTarget::Type::GC) {
+ return _gc_flush_token;
+ } else {
+ return _normal_flush_token;
+ }
+}
+
FlushContext::SP
FlushEngine::initNextFlush(const FlushContext::List &lst)
{
@@ -277,7 +290,7 @@ FlushEngine::initNextFlush(const FlushContext::List &lst)
if (LOG_WOULD_LOG(event)) {
EventLogger::flushInit(it->getName());
}
- if (it->initFlush(std::make_shared<search::FlushToken>())) {
+ if (it->initFlush(get_flush_token(*it))) {
ctx = it;
break;
}
@@ -294,7 +307,7 @@ FlushEngine::flushAll(const FlushContext::List &lst)
LOG(debug, "%ld targets to flush.", lst.size());
for (const FlushContext::SP & ctx : lst) {
if (wait(0)) {
- if (ctx->initFlush(std::make_shared<search::FlushToken>())) {
+ if (ctx->initFlush(get_flush_token(*ctx))) {
logTarget("initiated", *ctx);
_executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx));
} else {
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
index 160423c7c68..f51e93f0fbd 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
@@ -12,6 +12,8 @@
#include <mutex>
#include <condition_variable>
+namespace search { class FlushToken; }
+
namespace proton {
namespace flushengine { class ITlsStatsFactory; }
@@ -63,9 +65,12 @@ private:
std::condition_variable _strategyCond;
std::shared_ptr<flushengine::ITlsStatsFactory> _tlsStatsFactory;
std::set<IFlushHandler::SP> _pendingPrune;
+ std::shared_ptr<search::FlushToken> _normal_flush_token;
+ std::shared_ptr<search::FlushToken> _gc_flush_token;
FlushContext::List getTargetList(bool includeFlushingTargets) const;
std::pair<FlushContext::List,bool> getSortedTargetList();
+ std::shared_ptr<search::IFlushToken> get_flush_token(const FlushContext& ctx);
FlushContext::SP initNextFlush(const FlushContext::List &lst);
vespalib::string flushNextTarget(const vespalib::string & name);
void flushAll(const FlushContext::List &lst);
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
index e2bcb8b7629..9c180c39144 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
@@ -10,6 +10,7 @@
#include "indexwriteutilities.h"
#include <vespa/fastos/file.h>
#include <vespa/searchcorespi/flush/closureflushtask.h>
+#include <vespa/searchlib/common/i_flush_token.h>
#include <vespa/searchlib/index/schemautil.h>
#include <vespa/searchlib/util/dirtraverse.h>
#include <vespa/searchlib/util/filekit.h>
@@ -984,11 +985,15 @@ IndexMaintainer::doFusion(SerialNum serialNum, std::shared_ptr<search::IFlushTok
_fusion_spec.flush_ids.clear();
}
- uint32_t new_fusion_id = runFusion(spec, std::move(flush_token));
+ uint32_t new_fusion_id = runFusion(spec, flush_token);
LockGuard lock(_fusion_lock);
if (new_fusion_id == spec.last_fusion_id) { // Error running fusion.
- LOG(warning, "Fusion failed for id %u.", spec.flush_ids.back());
+ if (flush_token->stop_requested()) {
+ LOG(info, "Fusion stopped for id %u.", spec.flush_ids.back());
+ } else {
+ LOG(warning, "Fusion failed for id %u.", spec.flush_ids.back());
+ }
// Restore fusion spec.
copy(_fusion_spec.flush_ids.begin(), _fusion_spec.flush_ids.end(), back_inserter(spec.flush_ids));
_fusion_spec.flush_ids.swap(spec.flush_ids);
@@ -1020,14 +1025,18 @@ IndexMaintainer::runFusion(const FusionSpec &fusion_spec, std::shared_ptr<search
serialNum = IndexReadUtilities::readSerialNum(lastFlushDir);
}
FusionRunner fusion_runner(_base_dir, args._schema, tuneFileAttributes, _ctx.getFileHeaderContext());
- uint32_t new_fusion_id = fusion_runner.fuse(fusion_spec, serialNum, _operations, std::move(flush_token));
+ uint32_t new_fusion_id = fusion_runner.fuse(fusion_spec, serialNum, _operations, flush_token);
bool ok = (new_fusion_id != 0);
if (ok) {
ok = IndexWriteUtilities::copySerialNumFile(getFlushDir(fusion_spec.flush_ids.back()),
getFusionDir(new_fusion_id));
}
if (!ok) {
- LOG(error, "Fusion failed.");
+ if (flush_token->stop_requested()) {
+ LOG(info, "Fusion stopped.");
+ } else {
+ LOG(error, "Fusion failed.");
+ }
string fail_dir = getFusionDir(fusion_spec.flush_ids.back());
FastOS_FileInterface::EmptyAndRemoveDirectory(fail_dir.c_str());
{
diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
index efc9e99bf88..4c62140b731 100644
--- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
+++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
@@ -602,15 +602,15 @@ TEST_F(FusionTest, require_that_fusion_can_be_stopped)
auto flush_token = std::make_shared<MyFlushToken>(10000);
make_simple_index("stopdump2", MockFieldLengthInspector());
ASSERT_TRUE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token));
- EXPECT_EQ(40, flush_token->get_checks());
+ EXPECT_EQ(48, flush_token->get_checks());
vespalib::rmdir("stopdump3", true);
flush_token = std::make_shared<MyFlushToken>(1);
ASSERT_FALSE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token));
EXPECT_EQ(12, flush_token->get_checks());
vespalib::rmdir("stopdump3", true);
- flush_token = std::make_shared<MyFlushToken>(39);
+ flush_token = std::make_shared<MyFlushToken>(47);
ASSERT_FALSE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token));
- EXPECT_EQ(41, flush_token->get_checks());
+ EXPECT_EQ(49, flush_token->get_checks());
clean_stopped_fusion_testdirs();
}
diff --git a/searchlib/src/vespa/searchlib/util/postingpriorityqueue.h b/searchlib/src/vespa/searchlib/util/postingpriorityqueue.h
index baf38035210..008e9055e57 100644
--- a/searchlib/src/vespa/searchlib/util/postingpriorityqueue.h
+++ b/searchlib/src/vespa/searchlib/util/postingpriorityqueue.h
@@ -221,7 +221,7 @@ PostingPriorityQueue<IN>::merge(OUT &out, uint32_t heapLimit, const IFlushToken&
(this->*mergeHeapFunc)(out, flush_token);
return;
}
- for (;;) {
+ while (!flush_token.stop_requested()) {
if (_vec.size() == 1) {
void (*mergeOneFunc)(OUT &out, IN &in, const IFlushToken& flush_token) =
&PostingPriorityQueue<IN>::mergeOne;