diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-05-12 13:54:10 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-12 13:54:10 +0200 |
commit | f2334d2df98f96eb66bf097a1c4bbc0f89ef4b3e (patch) | |
tree | bc2dadbe6bbbe6a8857fada04574b975cb3ac0b1 | |
parent | bef1950a75be8b256df07ca5ef6aacd1731c5ef9 (diff) | |
parent | cb7330a28a940741fbddaeec622da0555ef93ec9 (diff) |
Merge pull request #27091 from vespa-engine/balder/ensure-that-high-priority-targets-does-not-skip-the-queue
Balder/ensure that high priority targets does not skip the queue MERGEOK
3 files changed, 73 insertions, 53 deletions
diff --git a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp index fd6274e83a6..4033fe679ca 100644 --- a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp +++ b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp @@ -333,10 +333,13 @@ public: class SimpleStrategy : public IFlushStrategy { public: + using SP = std::shared_ptr<SimpleStrategy>; + enum class OrderBy {INDEX_OF, SERIAL}; std::vector<IFlushTarget::SP> _targets; + OrderBy _orderBy; - struct CompareTarget { - CompareTarget(const SimpleStrategy &flush) : _flush(flush) { } + struct CompareIndexOf { + CompareIndexOf(const SimpleStrategy &flush) : _flush(flush) { } bool operator () (const FlushContext::SP &lhs, const FlushContext::SP &rhs) const { return _flush.compare(lhs->getTarget(), rhs->getTarget()); } @@ -347,7 +350,13 @@ public: const flushengine::TlsStatsMap&, const flushengine::ActiveFlushStats&) const override { FlushContext::List fv(targetList); - std::sort(fv.begin(), fv.end(), CompareTarget(*this)); + if (_orderBy == OrderBy::INDEX_OF) { + std::sort(fv.begin(), fv.end(), CompareIndexOf(*this)); + } else { + std::sort(fv.begin(), fv.end(), [](const auto & a, const auto & b) { + return a->getTarget()->getFlushedSerialNum() < b->getTarget()->getFlushedSerialNum(); } + ); + } return fv; } @@ -358,11 +367,7 @@ public: return indexOf(lhs) < indexOf(rhs); } - -public: - using SP = std::shared_ptr<SimpleStrategy>; - - SimpleStrategy() noexcept : _targets() {} + SimpleStrategy(OrderBy orderBy) noexcept : _targets(), _orderBy(orderBy) {} uint32_t indexOf(const IFlushTarget::SP &target) const @@ -389,6 +394,8 @@ public: class NoFlushStrategy : public SimpleStrategy { +public: + NoFlushStrategy() noexcept : SimpleStrategy(OrderBy::INDEX_OF) {} FlushContext::List getFlushTargets(const FlushContext::List &, const flushengine::TlsStatsMap &, const flushengine::ActiveFlushStats&) const override { return {}; } @@ -432,7 +439,7 @@ struct Fixture { } Fixture(uint32_t numThreads, vespalib::duration idleInterval) - : Fixture(numThreads, idleInterval, std::make_shared<SimpleStrategy>()) + : Fixture(numThreads, idleInterval, std::make_shared<SimpleStrategy>(SimpleStrategy::OrderBy::INDEX_OF)) { } void putFlushHandler(const vespalib::string &docTypeName, IFlushHandler::SP handler) { @@ -717,14 +724,14 @@ TEST_F("require that concurrency works", Fixture(2, 1ms)) target2->_proceed.countDown(); } -TEST_F("require that high pri concurrency works", Fixture(2, 1ms)) +TEST_F("require that there is room for one and only one high pri target", + Fixture(2, 1ms, std::make_unique<SimpleStrategy>(SimpleStrategy::OrderBy::SERIAL))) { auto target1 = std::make_shared<SimpleTarget>("target1", 1, false); auto target2 = std::make_shared<SimpleTarget>("target2", 2, false); - auto target3 = std::make_shared<SimpleTarget>("target3", 2, false); - auto target4 = std::make_shared<HighPriorityTarget>("target4", 3, false); - auto target5 = std::make_shared<HighPriorityTarget>("target5", 5, false); - auto handler = std::make_shared<SimpleHandler>(Targets({target1, target2, target3, target4, target5}), "handler", 9); + auto target3 = std::make_shared<HighPriorityTarget>("target3", 3, false); + auto target4 = std::make_shared<HighPriorityTarget>("target4", 4, false); + auto handler = std::make_shared<SimpleHandler>(Targets({target1, target2, target3, target4}), "handler", 9); f.putFlushHandler("handler", handler); f.engine.start(); EXPECT_EQUAL(2u, f.engine.maxConcurrentNormal()); @@ -733,28 +740,56 @@ TEST_F("require that high pri concurrency works", Fixture(2, 1ms)) EXPECT_TRUE(target1->_initDone.await(LONG_TIMEOUT)); EXPECT_TRUE(target2->_initDone.await(LONG_TIMEOUT)); - EXPECT_TRUE(target4->_initDone.await(LONG_TIMEOUT)); - EXPECT_FALSE(target3->_initDone.await(SHORT_TIMEOUT)); - EXPECT_FALSE(target5->_initDone.await(SHORT_TIMEOUT)); - assertThatHandlersInCurrentSet(f.engine, {"handler.target1", "handler.target2", "handler.target4"}); + EXPECT_TRUE(target3->_initDone.await(LONG_TIMEOUT)); + EXPECT_FALSE(target4->_initDone.await(SHORT_TIMEOUT)); + assertThatHandlersInCurrentSet(f.engine, {"handler.target1", "handler.target2", "handler.target3"}); target1->_proceed.countDown(); EXPECT_TRUE(target1->_taskDone.await(LONG_TIMEOUT)); - EXPECT_TRUE(target5->_initDone.await(LONG_TIMEOUT)); - EXPECT_FALSE(target3->_initDone.await(SHORT_TIMEOUT)); - assertThatHandlersInCurrentSet(f.engine, {"handler.target2", "handler.target4", "handler.target5"}); + EXPECT_TRUE(target4->_initDone.await(LONG_TIMEOUT)); + assertThatHandlersInCurrentSet(f.engine, {"handler.target2", "handler.target3", "handler.target4"}); target2->_proceed.countDown(); EXPECT_TRUE(target2->_taskDone.await(LONG_TIMEOUT)); - EXPECT_FALSE(target3->_initDone.await(SHORT_TIMEOUT)); - assertThatHandlersInCurrentSet(f.engine, {"handler.target4", "handler.target5"}); + assertThatHandlersInCurrentSet(f.engine, {"handler.target3", "handler.target4"}); + target3->_proceed.countDown(); + EXPECT_TRUE(target3->_taskDone.await(LONG_TIMEOUT)); + assertThatHandlersInCurrentSet(f.engine, {"handler.target4"}); target4->_proceed.countDown(); EXPECT_TRUE(target4->_taskDone.await(LONG_TIMEOUT)); + assertThatHandlersInCurrentSet(f.engine, {}); +} + +TEST_F("require that high priority does not jump the queue", + Fixture(2, 1ms, std::make_unique<SimpleStrategy>(SimpleStrategy::OrderBy::SERIAL))) +{ + auto target1 = std::make_shared<SimpleTarget>("target1", 1, false); + auto target2 = std::make_shared<SimpleTarget>("target2", 2, false); + auto target3 = std::make_shared<SimpleTarget>("target3", 3, false); + auto target4 = std::make_shared<HighPriorityTarget>("target4", 4, false); + auto handler = std::make_shared<SimpleHandler>(Targets({target1, target2, target3, target4}), "handler", 9); + f.putFlushHandler("handler", handler); + f.engine.start(); + EXPECT_EQUAL(2u, f.engine.maxConcurrentNormal()); + EXPECT_EQUAL(3u, f.engine.maxConcurrentTotal()); + EXPECT_EQUAL(f.engine.maxConcurrentTotal(), f.engine.get_executor().getNumThreads()); + + EXPECT_TRUE(target1->_initDone.await(LONG_TIMEOUT)); + EXPECT_TRUE(target2->_initDone.await(LONG_TIMEOUT)); + EXPECT_FALSE(target3->_initDone.await(SHORT_TIMEOUT)); + EXPECT_FALSE(target4->_initDone.await(SHORT_TIMEOUT)); + assertThatHandlersInCurrentSet(f.engine, {"handler.target1", "handler.target2"}); + target1->_proceed.countDown(); + EXPECT_TRUE(target1->_taskDone.await(LONG_TIMEOUT)); EXPECT_TRUE(target3->_initDone.await(LONG_TIMEOUT)); - assertThatHandlersInCurrentSet(f.engine, {"handler.target5", "handler.target3"}); + EXPECT_TRUE(target4->_initDone.await(LONG_TIMEOUT)); + assertThatHandlersInCurrentSet(f.engine, {"handler.target2", "handler.target3", "handler.target4"}); + target2->_proceed.countDown(); + EXPECT_TRUE(target2->_taskDone.await(LONG_TIMEOUT)); + assertThatHandlersInCurrentSet(f.engine, {"handler.target3", "handler.target4"}); target3->_proceed.countDown(); EXPECT_TRUE(target3->_taskDone.await(LONG_TIMEOUT)); - assertThatHandlersInCurrentSet(f.engine, {"handler.target5"}); - target5->_proceed.countDown(); - EXPECT_TRUE(target5->_taskDone.await(LONG_TIMEOUT)); + assertThatHandlersInCurrentSet(f.engine, {"handler.target4"}); + target4->_proceed.countDown(); + EXPECT_TRUE(target4->_taskDone.await(LONG_TIMEOUT)); assertThatHandlersInCurrentSet(f.engine, {}); } @@ -828,7 +863,7 @@ TEST_F("require that oldest serial is updated when finishing priority flush stra auto target1 = std::make_shared<SimpleTarget>("target1", 10, true); auto handler = f.addSimpleHandler({ target1 }); TEST_DO(f.assertOldestSerial(*handler, 10)); - f.engine.setStrategy(std::make_shared<SimpleStrategy>()); + f.engine.setStrategy(std::make_shared<SimpleStrategy>(SimpleStrategy::OrderBy::INDEX_OF)); EXPECT_EQUAL(20u, handler->_oldestSerial); } diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp index 2d47ea8fb4e..1916a324c6e 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp @@ -199,10 +199,8 @@ FlushEngine::checkAndFlush(vespalib::string prev) { prev = flushNextTarget(prev, lst.first); } else { FlushContext::List highPri; - for (const FlushContext::SP & ctx : lst.first) { - if (ctx->getTarget()->getPriority() > IFlushTarget::Priority::NORMAL) { - highPri.push_back(ctx); - } + if (lst.first.front()->getTarget()->getPriority() > IFlushTarget::Priority::NORMAL) { + highPri.push_back(lst.first.front()); } prev = flushNextTarget(prev, highPri); } diff --git a/searchcore/src/vespa/searchcore/proton/server/memoryflush.cpp b/searchcore/src/vespa/searchcore/proton/server/memoryflush.cpp index fbc6e2beaf5..0d297f02b1b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/memoryflush.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/memoryflush.cpp @@ -26,12 +26,6 @@ getSerialDiff(SerialNum localLastSerial, const IFlushTarget &target) return localLastSerial - target.getFlushedSerialNum(); } -vespalib::string -getName(const IFlushHandler & handler, const IFlushTarget & target) -{ - return (handler.getName() + "." + target.getName()); -} - uint64_t estimateNeededTlsSizeForFlushTarget(const TlsStats &tlsStats, SerialNum flushedSerialNum) { @@ -155,15 +149,14 @@ MemoryFlush::getFlushTargets(const FlushContext::List& targetList, config.maxMemoryGain, config.diskBloatFactor, vespalib::to_s(config.maxTimeGain), vespalib::to_s(_startTime.time_since_epoch())); - for (size_t i(0), m(targetList.size()); i < m; i++) { - const IFlushTarget & target(*targetList[i]->getTarget()); - const IFlushHandler & handler(*targetList[i]->getHandler()); + for (const auto & ctx : targetList) { + const IFlushTarget & target(*ctx->getTarget()); + const IFlushHandler & handler(*ctx->getHandler()); int64_t mgain(std::max(INT64_C(0), target.getApproxMemoryGain().gain())); const IFlushTarget::DiskGain dgain(target.getApproxDiskGain()); totalDisk += dgain; - SerialNum localLastSerial = targetList[i]->getLastSerial(); + SerialNum localLastSerial = ctx->getLastSerial(); int64_t serialDiff = getSerialDiff(localLastSerial, target); - vespalib::string name(getName(handler, target)); vespalib::system_time lastFlushTime = target.getLastFlushTime(); vespalib::duration timeDiff(now - (lastFlushTime > vespalib::system_time() ? lastFlushTime : _startTime)); totalMemory += mgain; @@ -194,16 +187,10 @@ MemoryFlush::getFlushTargets(const FlushContext::List& targetList, "tlsSize(%" PRIu64 "), tlsSizeNeeded(%" PRIu64 "), " "flushedSerial(%" PRIu64 "), localLastSerial(%" PRIu64 "), serialDiff(%" PRId64 "), " "lastFlushTime(%fs), nowTime(%fs), timeDiff(%fs), order(%s)", - targetList[i]->getName().c_str(), - totalMemory, - mgain, - totalDisk.gain(), - dgain.gain(), - tlsStats.getNumBytes(), - estimateNeededTlsSizeForFlushTarget(tlsStats, target.getFlushedSerialNum()), - target.getFlushedSerialNum(), - localLastSerial, - serialDiff, + ctx->getName().c_str(), totalMemory, mgain, + totalDisk.gain(), dgain.gain(), + tlsStats.getNumBytes(), estimateNeededTlsSizeForFlushTarget(tlsStats, target.getFlushedSerialNum()), + target.getFlushedSerialNum(), localLastSerial, serialDiff, vespalib::to_s(lastFlushTime.time_since_epoch()), vespalib::to_s(now.time_since_epoch()), vespalib::to_s(timeDiff), |