summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2023-05-08 12:43:37 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2023-05-08 12:43:37 +0000
commit039a1a5c69128a7d2333f33f2a6fc46a410973eb (patch)
treee040160c4fb7f8aa0ae9c4f6e06a589566eae17f /searchcore
parent748eacb6cbd133fdc3927cb8f626fff62127a59a (diff)
Leave at least one slot available for high priority flush targets.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/flushengine/flushengine_test.cpp44
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp51
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h9
3 files changed, 83 insertions, 21 deletions
diff --git a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp
index 3fdc5a8ce9f..caa88e7305b 100644
--- a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp
+++ b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp
@@ -235,6 +235,7 @@ protected:
SimpleTarget(const std::string &name, const Type &type, search::SerialNum flushedSerial = 0, bool proceedImmediately = true) :
test::DummyFlushTarget(name, type, Component::OTHER),
_flushedSerial(flushedSerial),
+ _currentSerial(0),
_proceed(),
_initDone(),
_taskStart(),
@@ -266,7 +267,7 @@ public:
{ }
SimpleTarget(const std::string &name, search::SerialNum flushedSerial = 0, bool proceedImmediately = true)
- : SimpleTarget(name, Type::OTHER, flushedSerial, proceedImmediately)
+ : SimpleTarget(name, Type::OTHER, flushedSerial, proceedImmediately)
{ }
Time getLastFlushTime() const override { return vespalib::system_clock::now(); }
@@ -288,8 +289,19 @@ public:
class GCTarget : public SimpleTarget {
public:
GCTarget(const vespalib::string &name, search::SerialNum flushedSerial)
- : SimpleTarget(name, Type::GC, flushedSerial)
+ : SimpleTarget(name, Type::GC, flushedSerial)
+ {}
+};
+
+class HighPriorityTarget : public SimpleTarget {
+public:
+ HighPriorityTarget(const vespalib::string &name, search::SerialNum flushedSerial, bool proceed)
+ : SimpleTarget(name, Type::OTHER, flushedSerial, proceed)
{}
+
+ Priority getPriority() const override {
+ return Priority::HIGH;
+ }
};
class AssertedTarget : public SimpleTarget {
@@ -705,6 +717,34 @@ TEST_F("require that concurrency works", Fixture(2, 1ms))
target2->_proceed.countDown();
}
+TEST_F("require that high pri concurrency works", Fixture(2, 1ms))
+{
+ auto target1 = std::make_shared<SimpleTarget>("target1", 1, false);
+ auto target2 = std::make_shared<SimpleTarget>("target2", 2, false);
+ auto target3 = std::make_shared<SimpleTarget>("target3", 2, false);
+ auto target4 = std::make_shared<HighPriorityTarget>("target4", 3, false);
+ auto handler = std::make_shared<SimpleHandler>(Targets({target1, target2, target3, target4}), "handler", 9);
+ f.putFlushHandler("handler", handler);
+ f.engine.start();
+
+ EXPECT_TRUE(target1->_initDone.await(LONG_TIMEOUT));
+ EXPECT_TRUE(target2->_initDone.await(LONG_TIMEOUT));
+ EXPECT_TRUE(!target3->_initDone.await(SHORT_TIMEOUT));
+ EXPECT_TRUE(target4->_initDone.await(SHORT_TIMEOUT));
+ assertThatHandlersInCurrentSet(f.engine, {"handler.target1", "handler.target2", "handler.target4"});
+ EXPECT_TRUE(!target3->_initDone.await(SHORT_TIMEOUT));
+ target1->_proceed.countDown();
+ EXPECT_TRUE(target1->_taskDone.await(LONG_TIMEOUT));
+ EXPECT_TRUE(!target3->_initDone.await(SHORT_TIMEOUT));
+ assertThatHandlersInCurrentSet(f.engine, {"handler.target2", "handler.target4"});
+ target2->_proceed.countDown();
+ EXPECT_TRUE(target2->_taskDone.await(LONG_TIMEOUT));
+ EXPECT_TRUE(target3->_initDone.await(LONG_TIMEOUT));
+ assertThatHandlersInCurrentSet(f.engine, {"handler.target4", "handler.target3"});
+ target3->_proceed.countDown();
+ target4->_proceed.countDown();
+}
+
TEST_F("require that concurrency works with triggerFlush", Fixture(2, 1ms))
{
auto target1 = std::make_shared<SimpleTarget>("target1", 1, false);
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
index 51713f26307..905a864f470 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
@@ -147,20 +147,28 @@ FlushEngine::kick()
}
bool
-FlushEngine::canFlushMore(const std::unique_lock<std::mutex> &guard) const
+FlushEngine::canFlushMore(const std::unique_lock<std::mutex> &, IFlushTarget::Priority priority) const
{
- (void) guard;
- return _maxConcurrent > _flushing.size();
+ if (priority > IFlushTarget::Priority::NORMAL) {
+ return (_maxConcurrent + 1) > _flushing.size();
+ } else {
+ return _maxConcurrent > _flushing.size();
+ }
}
-bool
-FlushEngine::wait(vespalib::duration minimumWaitTimeIfReady, bool ignorePendingPrune)
-{
+void
+FlushEngine::idle_wait(vespalib::duration minimumWaitTimeIfReady) {
std::unique_lock<std::mutex> guard(_lock);
- if (canFlushMore(guard) && _pendingPrune.empty()) {
+ if (canFlushMore(guard, IFlushTarget::Priority::HIGH) && _pendingPrune.empty()) {
_cond.wait_for(guard, minimumWaitTimeIfReady);
}
- while ( ! canFlushMore(guard) && ( ignorePendingPrune || _pendingPrune.empty())) {
+}
+
+bool
+FlushEngine::wait_for_slot(IFlushTarget::Priority priority)
+{
+ std::unique_lock<std::mutex> guard(_lock);
+ while ( ! canFlushMore(guard, priority)) {
_cond.wait_for(guard, 1s); // broadcast when flush done
}
return !_closed.load(std::memory_order_relaxed);
@@ -169,13 +177,19 @@ FlushEngine::wait(vespalib::duration minimumWaitTimeIfReady, bool ignorePendingP
void
FlushEngine::wait_for_slot_or_pending_prune(IFlushTarget::Priority priority)
{
- (void) priority;
std::unique_lock<std::mutex> guard(_lock);
- while ( ! canFlushMore(guard) && _pendingPrune.empty()) {
+ while ( ! canFlushMore(guard, priority) && _pendingPrune.empty()) {
_cond.wait_for(guard, 1s); // broadcast when flush done
}
}
+bool
+FlushEngine::has_slot(IFlushTarget::Priority priority)
+{
+ std::unique_lock<std::mutex> guard(_lock);
+ return canFlushMore(guard, priority);
+}
+
vespalib::string
FlushEngine::checkAndFlush(vespalib::string prev) {
std::pair<FlushContext::List, bool> lst = getSortedTargetList();
@@ -183,8 +197,17 @@ FlushEngine::checkAndFlush(vespalib::string prev) {
// Everything returned from a priority strategy should be flushed
flushAll(lst.first);
} else if ( ! lst.first.empty()) {
- wait_for_slot_or_pending_prune(lst.first[0]->getTarget()->getPriority());
- prev = flushNextTarget(prev, lst.first);
+ if (has_slot(IFlushTarget::Priority::NORMAL)) {
+ prev = flushNextTarget(prev, lst.first);
+ } else {
+ FlushContext::List highPri;
+ for (const FlushContext::SP & ctx : lst.first) {
+ if (ctx->getTarget()->getPriority() > IFlushTarget::Priority::NORMAL) {
+ highPri.push_back(ctx);
+ }
+ }
+ prev = flushNextTarget(prev, highPri);
+ }
if (!prev.empty()) {
// Sleep 1 ms after a successful flush in order to avoid busy loop in case
// of strategy or target error.
@@ -208,7 +231,7 @@ FlushEngine::run()
} else {
prevFlushName = checkAndFlush(prevFlushName);
if (prevFlushName.empty()) {
- wait(idleInterval);
+ idle_wait(idleInterval);
}
}
}
@@ -351,7 +374,7 @@ FlushEngine::flushAll(const FlushContext::List &lst)
{
LOG(debug, "%ld targets to flush.", lst.size());
for (const FlushContext::SP & ctx : lst) {
- if (wait(vespalib::duration::zero(), true)) {
+ if (wait_for_slot(IFlushTarget::Priority::NORMAL)) {
if (ctx->initFlush(get_flush_token(*ctx))) {
logTarget("initiated", *ctx);
_executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx));
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
index 0be086729fa..29d65c0f9fb 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
@@ -81,12 +81,11 @@ private:
uint32_t initFlush(const FlushContext &ctx);
uint32_t initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP &target);
void flushDone(const FlushContext &ctx, uint32_t taskId);
- bool canFlushMore(const std::unique_lock<std::mutex> &guard) const;
+ bool canFlushMore(const std::unique_lock<std::mutex> &guard, IFlushTarget::Priority priority) const;
void wait_for_slot_or_pending_prune(IFlushTarget::Priority priority);
- bool wait(vespalib::duration minimumWaitTimeIfReady, bool ignorePendingPrune);
- void wait(vespalib::duration minimumWaitTimeIfReady) {
- wait(minimumWaitTimeIfReady, false);
- }
+ void idle_wait(vespalib::duration minimumWaitTimeIfReady);
+ bool wait_for_slot(IFlushTarget::Priority priority);
+ bool has_slot(IFlushTarget::Priority priority);
bool isFlushing(const std::lock_guard<std::mutex> &guard, const vespalib::string & name) const;
vespalib::string checkAndFlush(vespalib::string prev);