aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2023-05-12 13:54:10 +0200
committerGitHub <noreply@github.com>2023-05-12 13:54:10 +0200
commitf2334d2df98f96eb66bf097a1c4bbc0f89ef4b3e (patch)
treebc2dadbe6bbbe6a8857fada04574b975cb3ac0b1
parentbef1950a75be8b256df07ca5ef6aacd1731c5ef9 (diff)
parentcb7330a28a940741fbddaeec622da0555ef93ec9 (diff)
Merge pull request #27091 from vespa-engine/balder/ensure-that-high-priority-targets-does-not-skip-the-queue
Balder/ensure that high priority targets does not skip the queue MERGEOK
-rw-r--r--searchcore/src/tests/proton/flushengine/flushengine_test.cpp91
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/memoryflush.cpp29
3 files changed, 73 insertions, 53 deletions
diff --git a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp
index fd6274e83a6..4033fe679ca 100644
--- a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp
+++ b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp
@@ -333,10 +333,13 @@ public:
class SimpleStrategy : public IFlushStrategy {
public:
+ using SP = std::shared_ptr<SimpleStrategy>;
+ enum class OrderBy {INDEX_OF, SERIAL};
std::vector<IFlushTarget::SP> _targets;
+ OrderBy _orderBy;
- struct CompareTarget {
- CompareTarget(const SimpleStrategy &flush) : _flush(flush) { }
+ struct CompareIndexOf {
+ CompareIndexOf(const SimpleStrategy &flush) : _flush(flush) { }
bool operator () (const FlushContext::SP &lhs, const FlushContext::SP &rhs) const {
return _flush.compare(lhs->getTarget(), rhs->getTarget());
}
@@ -347,7 +350,13 @@ public:
const flushengine::TlsStatsMap&,
const flushengine::ActiveFlushStats&) const override {
FlushContext::List fv(targetList);
- std::sort(fv.begin(), fv.end(), CompareTarget(*this));
+ if (_orderBy == OrderBy::INDEX_OF) {
+ std::sort(fv.begin(), fv.end(), CompareIndexOf(*this));
+ } else {
+ std::sort(fv.begin(), fv.end(), [](const auto & a, const auto & b) {
+ return a->getTarget()->getFlushedSerialNum() < b->getTarget()->getFlushedSerialNum(); }
+ );
+ }
return fv;
}
@@ -358,11 +367,7 @@ public:
return indexOf(lhs) < indexOf(rhs);
}
-
-public:
- using SP = std::shared_ptr<SimpleStrategy>;
-
- SimpleStrategy() noexcept : _targets() {}
+ SimpleStrategy(OrderBy orderBy) noexcept : _targets(), _orderBy(orderBy) {}
uint32_t
indexOf(const IFlushTarget::SP &target) const
@@ -389,6 +394,8 @@ public:
class NoFlushStrategy : public SimpleStrategy
{
+public:
+ NoFlushStrategy() noexcept : SimpleStrategy(OrderBy::INDEX_OF) {}
FlushContext::List getFlushTargets(const FlushContext::List &, const flushengine::TlsStatsMap &, const flushengine::ActiveFlushStats&) const override {
return {};
}
@@ -432,7 +439,7 @@ struct Fixture
{ }
Fixture(uint32_t numThreads, vespalib::duration idleInterval)
- : Fixture(numThreads, idleInterval, std::make_shared<SimpleStrategy>())
+ : Fixture(numThreads, idleInterval, std::make_shared<SimpleStrategy>(SimpleStrategy::OrderBy::INDEX_OF))
{ }
void putFlushHandler(const vespalib::string &docTypeName, IFlushHandler::SP handler) {
@@ -717,14 +724,14 @@ TEST_F("require that concurrency works", Fixture(2, 1ms))
target2->_proceed.countDown();
}
-TEST_F("require that high pri concurrency works", Fixture(2, 1ms))
+TEST_F("require that there is room for one and only one high pri target",
+ Fixture(2, 1ms, std::make_unique<SimpleStrategy>(SimpleStrategy::OrderBy::SERIAL)))
{
auto target1 = std::make_shared<SimpleTarget>("target1", 1, false);
auto target2 = std::make_shared<SimpleTarget>("target2", 2, false);
- auto target3 = std::make_shared<SimpleTarget>("target3", 2, false);
- auto target4 = std::make_shared<HighPriorityTarget>("target4", 3, false);
- auto target5 = std::make_shared<HighPriorityTarget>("target5", 5, false);
- auto handler = std::make_shared<SimpleHandler>(Targets({target1, target2, target3, target4, target5}), "handler", 9);
+ auto target3 = std::make_shared<HighPriorityTarget>("target3", 3, false);
+ auto target4 = std::make_shared<HighPriorityTarget>("target4", 4, false);
+ auto handler = std::make_shared<SimpleHandler>(Targets({target1, target2, target3, target4}), "handler", 9);
f.putFlushHandler("handler", handler);
f.engine.start();
EXPECT_EQUAL(2u, f.engine.maxConcurrentNormal());
@@ -733,28 +740,56 @@ TEST_F("require that high pri concurrency works", Fixture(2, 1ms))
EXPECT_TRUE(target1->_initDone.await(LONG_TIMEOUT));
EXPECT_TRUE(target2->_initDone.await(LONG_TIMEOUT));
- EXPECT_TRUE(target4->_initDone.await(LONG_TIMEOUT));
- EXPECT_FALSE(target3->_initDone.await(SHORT_TIMEOUT));
- EXPECT_FALSE(target5->_initDone.await(SHORT_TIMEOUT));
- assertThatHandlersInCurrentSet(f.engine, {"handler.target1", "handler.target2", "handler.target4"});
+ EXPECT_TRUE(target3->_initDone.await(LONG_TIMEOUT));
+ EXPECT_FALSE(target4->_initDone.await(SHORT_TIMEOUT));
+ assertThatHandlersInCurrentSet(f.engine, {"handler.target1", "handler.target2", "handler.target3"});
target1->_proceed.countDown();
EXPECT_TRUE(target1->_taskDone.await(LONG_TIMEOUT));
- EXPECT_TRUE(target5->_initDone.await(LONG_TIMEOUT));
- EXPECT_FALSE(target3->_initDone.await(SHORT_TIMEOUT));
- assertThatHandlersInCurrentSet(f.engine, {"handler.target2", "handler.target4", "handler.target5"});
+ EXPECT_TRUE(target4->_initDone.await(LONG_TIMEOUT));
+ assertThatHandlersInCurrentSet(f.engine, {"handler.target2", "handler.target3", "handler.target4"});
target2->_proceed.countDown();
EXPECT_TRUE(target2->_taskDone.await(LONG_TIMEOUT));
- EXPECT_FALSE(target3->_initDone.await(SHORT_TIMEOUT));
- assertThatHandlersInCurrentSet(f.engine, {"handler.target4", "handler.target5"});
+ assertThatHandlersInCurrentSet(f.engine, {"handler.target3", "handler.target4"});
+ target3->_proceed.countDown();
+ EXPECT_TRUE(target3->_taskDone.await(LONG_TIMEOUT));
+ assertThatHandlersInCurrentSet(f.engine, {"handler.target4"});
target4->_proceed.countDown();
EXPECT_TRUE(target4->_taskDone.await(LONG_TIMEOUT));
+ assertThatHandlersInCurrentSet(f.engine, {});
+}
+
+TEST_F("require that high priority does not jump the queue",
+ Fixture(2, 1ms, std::make_unique<SimpleStrategy>(SimpleStrategy::OrderBy::SERIAL)))
+{
+ auto target1 = std::make_shared<SimpleTarget>("target1", 1, false);
+ auto target2 = std::make_shared<SimpleTarget>("target2", 2, false);
+ auto target3 = std::make_shared<SimpleTarget>("target3", 3, false);
+ auto target4 = std::make_shared<HighPriorityTarget>("target4", 4, false);
+ auto handler = std::make_shared<SimpleHandler>(Targets({target1, target2, target3, target4}), "handler", 9);
+ f.putFlushHandler("handler", handler);
+ f.engine.start();
+ EXPECT_EQUAL(2u, f.engine.maxConcurrentNormal());
+ EXPECT_EQUAL(3u, f.engine.maxConcurrentTotal());
+ EXPECT_EQUAL(f.engine.maxConcurrentTotal(), f.engine.get_executor().getNumThreads());
+
+ EXPECT_TRUE(target1->_initDone.await(LONG_TIMEOUT));
+ EXPECT_TRUE(target2->_initDone.await(LONG_TIMEOUT));
+ EXPECT_FALSE(target3->_initDone.await(SHORT_TIMEOUT));
+ EXPECT_FALSE(target4->_initDone.await(SHORT_TIMEOUT));
+ assertThatHandlersInCurrentSet(f.engine, {"handler.target1", "handler.target2"});
+ target1->_proceed.countDown();
+ EXPECT_TRUE(target1->_taskDone.await(LONG_TIMEOUT));
EXPECT_TRUE(target3->_initDone.await(LONG_TIMEOUT));
- assertThatHandlersInCurrentSet(f.engine, {"handler.target5", "handler.target3"});
+ EXPECT_TRUE(target4->_initDone.await(LONG_TIMEOUT));
+ assertThatHandlersInCurrentSet(f.engine, {"handler.target2", "handler.target3", "handler.target4"});
+ target2->_proceed.countDown();
+ EXPECT_TRUE(target2->_taskDone.await(LONG_TIMEOUT));
+ assertThatHandlersInCurrentSet(f.engine, {"handler.target3", "handler.target4"});
target3->_proceed.countDown();
EXPECT_TRUE(target3->_taskDone.await(LONG_TIMEOUT));
- assertThatHandlersInCurrentSet(f.engine, {"handler.target5"});
- target5->_proceed.countDown();
- EXPECT_TRUE(target5->_taskDone.await(LONG_TIMEOUT));
+ assertThatHandlersInCurrentSet(f.engine, {"handler.target4"});
+ target4->_proceed.countDown();
+ EXPECT_TRUE(target4->_taskDone.await(LONG_TIMEOUT));
assertThatHandlersInCurrentSet(f.engine, {});
}
@@ -828,7 +863,7 @@ TEST_F("require that oldest serial is updated when finishing priority flush stra
auto target1 = std::make_shared<SimpleTarget>("target1", 10, true);
auto handler = f.addSimpleHandler({ target1 });
TEST_DO(f.assertOldestSerial(*handler, 10));
- f.engine.setStrategy(std::make_shared<SimpleStrategy>());
+ f.engine.setStrategy(std::make_shared<SimpleStrategy>(SimpleStrategy::OrderBy::INDEX_OF));
EXPECT_EQUAL(20u, handler->_oldestSerial);
}
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
index 2d47ea8fb4e..1916a324c6e 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
@@ -199,10 +199,8 @@ FlushEngine::checkAndFlush(vespalib::string prev) {
prev = flushNextTarget(prev, lst.first);
} else {
FlushContext::List highPri;
- for (const FlushContext::SP & ctx : lst.first) {
- if (ctx->getTarget()->getPriority() > IFlushTarget::Priority::NORMAL) {
- highPri.push_back(ctx);
- }
+ if (lst.first.front()->getTarget()->getPriority() > IFlushTarget::Priority::NORMAL) {
+ highPri.push_back(lst.first.front());
}
prev = flushNextTarget(prev, highPri);
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/memoryflush.cpp b/searchcore/src/vespa/searchcore/proton/server/memoryflush.cpp
index fbc6e2beaf5..0d297f02b1b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/memoryflush.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/memoryflush.cpp
@@ -26,12 +26,6 @@ getSerialDiff(SerialNum localLastSerial, const IFlushTarget &target)
return localLastSerial - target.getFlushedSerialNum();
}
-vespalib::string
-getName(const IFlushHandler & handler, const IFlushTarget & target)
-{
- return (handler.getName() + "." + target.getName());
-}
-
uint64_t
estimateNeededTlsSizeForFlushTarget(const TlsStats &tlsStats, SerialNum flushedSerialNum)
{
@@ -155,15 +149,14 @@ MemoryFlush::getFlushTargets(const FlushContext::List& targetList,
config.maxMemoryGain, config.diskBloatFactor,
vespalib::to_s(config.maxTimeGain),
vespalib::to_s(_startTime.time_since_epoch()));
- for (size_t i(0), m(targetList.size()); i < m; i++) {
- const IFlushTarget & target(*targetList[i]->getTarget());
- const IFlushHandler & handler(*targetList[i]->getHandler());
+ for (const auto & ctx : targetList) {
+ const IFlushTarget & target(*ctx->getTarget());
+ const IFlushHandler & handler(*ctx->getHandler());
int64_t mgain(std::max(INT64_C(0), target.getApproxMemoryGain().gain()));
const IFlushTarget::DiskGain dgain(target.getApproxDiskGain());
totalDisk += dgain;
- SerialNum localLastSerial = targetList[i]->getLastSerial();
+ SerialNum localLastSerial = ctx->getLastSerial();
int64_t serialDiff = getSerialDiff(localLastSerial, target);
- vespalib::string name(getName(handler, target));
vespalib::system_time lastFlushTime = target.getLastFlushTime();
vespalib::duration timeDiff(now - (lastFlushTime > vespalib::system_time() ? lastFlushTime : _startTime));
totalMemory += mgain;
@@ -194,16 +187,10 @@ MemoryFlush::getFlushTargets(const FlushContext::List& targetList,
"tlsSize(%" PRIu64 "), tlsSizeNeeded(%" PRIu64 "), "
"flushedSerial(%" PRIu64 "), localLastSerial(%" PRIu64 "), serialDiff(%" PRId64 "), "
"lastFlushTime(%fs), nowTime(%fs), timeDiff(%fs), order(%s)",
- targetList[i]->getName().c_str(),
- totalMemory,
- mgain,
- totalDisk.gain(),
- dgain.gain(),
- tlsStats.getNumBytes(),
- estimateNeededTlsSizeForFlushTarget(tlsStats, target.getFlushedSerialNum()),
- target.getFlushedSerialNum(),
- localLastSerial,
- serialDiff,
+ ctx->getName().c_str(), totalMemory, mgain,
+ totalDisk.gain(), dgain.gain(),
+ tlsStats.getNumBytes(), estimateNeededTlsSizeForFlushTarget(tlsStats, target.getFlushedSerialNum()),
+ target.getFlushedSerialNum(), localLastSerial, serialDiff,
vespalib::to_s(lastFlushTime.time_since_epoch()),
vespalib::to_s(now.time_since_epoch()),
vespalib::to_s(timeDiff),