diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-05-12 11:04:22 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2023-05-12 11:04:22 +0000 |
commit | 580f4bfdda0e99ef19a28f75938dcd2c479ac4a1 (patch) | |
tree | bfa8196b461e83062f66915eb66035b59ff1de90 /searchcore | |
parent | bef1950a75be8b256df07ca5ef6aacd1731c5ef9 (diff) |
Test that high priority tragets do not skip the queue.
Diffstat (limited to 'searchcore')
-rw-r--r-- | searchcore/src/tests/proton/flushengine/flushengine_test.cpp | 91 |
1 files changed, 63 insertions, 28 deletions
diff --git a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp index fd6274e83a6..4033fe679ca 100644 --- a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp +++ b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp @@ -333,10 +333,13 @@ public: class SimpleStrategy : public IFlushStrategy { public: + using SP = std::shared_ptr<SimpleStrategy>; + enum class OrderBy {INDEX_OF, SERIAL}; std::vector<IFlushTarget::SP> _targets; + OrderBy _orderBy; - struct CompareTarget { - CompareTarget(const SimpleStrategy &flush) : _flush(flush) { } + struct CompareIndexOf { + CompareIndexOf(const SimpleStrategy &flush) : _flush(flush) { } bool operator () (const FlushContext::SP &lhs, const FlushContext::SP &rhs) const { return _flush.compare(lhs->getTarget(), rhs->getTarget()); } @@ -347,7 +350,13 @@ public: const flushengine::TlsStatsMap&, const flushengine::ActiveFlushStats&) const override { FlushContext::List fv(targetList); - std::sort(fv.begin(), fv.end(), CompareTarget(*this)); + if (_orderBy == OrderBy::INDEX_OF) { + std::sort(fv.begin(), fv.end(), CompareIndexOf(*this)); + } else { + std::sort(fv.begin(), fv.end(), [](const auto & a, const auto & b) { + return a->getTarget()->getFlushedSerialNum() < b->getTarget()->getFlushedSerialNum(); } + ); + } return fv; } @@ -358,11 +367,7 @@ public: return indexOf(lhs) < indexOf(rhs); } - -public: - using SP = std::shared_ptr<SimpleStrategy>; - - SimpleStrategy() noexcept : _targets() {} + SimpleStrategy(OrderBy orderBy) noexcept : _targets(), _orderBy(orderBy) {} uint32_t indexOf(const IFlushTarget::SP &target) const @@ -389,6 +394,8 @@ public: class NoFlushStrategy : public SimpleStrategy { +public: + NoFlushStrategy() noexcept : SimpleStrategy(OrderBy::INDEX_OF) {} FlushContext::List getFlushTargets(const FlushContext::List &, const flushengine::TlsStatsMap &, const flushengine::ActiveFlushStats&) const override { return {}; } @@ -432,7 +439,7 @@ struct Fixture { } Fixture(uint32_t numThreads, vespalib::duration idleInterval) - : Fixture(numThreads, idleInterval, std::make_shared<SimpleStrategy>()) + : Fixture(numThreads, idleInterval, std::make_shared<SimpleStrategy>(SimpleStrategy::OrderBy::INDEX_OF)) { } void putFlushHandler(const vespalib::string &docTypeName, IFlushHandler::SP handler) { @@ -717,14 +724,14 @@ TEST_F("require that concurrency works", Fixture(2, 1ms)) target2->_proceed.countDown(); } -TEST_F("require that high pri concurrency works", Fixture(2, 1ms)) +TEST_F("require that there is room for one and only one high pri target", + Fixture(2, 1ms, std::make_unique<SimpleStrategy>(SimpleStrategy::OrderBy::SERIAL))) { auto target1 = std::make_shared<SimpleTarget>("target1", 1, false); auto target2 = std::make_shared<SimpleTarget>("target2", 2, false); - auto target3 = std::make_shared<SimpleTarget>("target3", 2, false); - auto target4 = std::make_shared<HighPriorityTarget>("target4", 3, false); - auto target5 = std::make_shared<HighPriorityTarget>("target5", 5, false); - auto handler = std::make_shared<SimpleHandler>(Targets({target1, target2, target3, target4, target5}), "handler", 9); + auto target3 = std::make_shared<HighPriorityTarget>("target3", 3, false); + auto target4 = std::make_shared<HighPriorityTarget>("target4", 4, false); + auto handler = std::make_shared<SimpleHandler>(Targets({target1, target2, target3, target4}), "handler", 9); f.putFlushHandler("handler", handler); f.engine.start(); EXPECT_EQUAL(2u, f.engine.maxConcurrentNormal()); @@ -733,28 +740,56 @@ TEST_F("require that high pri concurrency works", Fixture(2, 1ms)) EXPECT_TRUE(target1->_initDone.await(LONG_TIMEOUT)); EXPECT_TRUE(target2->_initDone.await(LONG_TIMEOUT)); - EXPECT_TRUE(target4->_initDone.await(LONG_TIMEOUT)); - EXPECT_FALSE(target3->_initDone.await(SHORT_TIMEOUT)); - EXPECT_FALSE(target5->_initDone.await(SHORT_TIMEOUT)); - assertThatHandlersInCurrentSet(f.engine, {"handler.target1", "handler.target2", "handler.target4"}); + EXPECT_TRUE(target3->_initDone.await(LONG_TIMEOUT)); + EXPECT_FALSE(target4->_initDone.await(SHORT_TIMEOUT)); + assertThatHandlersInCurrentSet(f.engine, {"handler.target1", "handler.target2", "handler.target3"}); target1->_proceed.countDown(); EXPECT_TRUE(target1->_taskDone.await(LONG_TIMEOUT)); - EXPECT_TRUE(target5->_initDone.await(LONG_TIMEOUT)); - EXPECT_FALSE(target3->_initDone.await(SHORT_TIMEOUT)); - assertThatHandlersInCurrentSet(f.engine, {"handler.target2", "handler.target4", "handler.target5"}); + EXPECT_TRUE(target4->_initDone.await(LONG_TIMEOUT)); + assertThatHandlersInCurrentSet(f.engine, {"handler.target2", "handler.target3", "handler.target4"}); target2->_proceed.countDown(); EXPECT_TRUE(target2->_taskDone.await(LONG_TIMEOUT)); - EXPECT_FALSE(target3->_initDone.await(SHORT_TIMEOUT)); - assertThatHandlersInCurrentSet(f.engine, {"handler.target4", "handler.target5"}); + assertThatHandlersInCurrentSet(f.engine, {"handler.target3", "handler.target4"}); + target3->_proceed.countDown(); + EXPECT_TRUE(target3->_taskDone.await(LONG_TIMEOUT)); + assertThatHandlersInCurrentSet(f.engine, {"handler.target4"}); target4->_proceed.countDown(); EXPECT_TRUE(target4->_taskDone.await(LONG_TIMEOUT)); + assertThatHandlersInCurrentSet(f.engine, {}); +} + +TEST_F("require that high priority does not jump the queue", + Fixture(2, 1ms, std::make_unique<SimpleStrategy>(SimpleStrategy::OrderBy::SERIAL))) +{ + auto target1 = std::make_shared<SimpleTarget>("target1", 1, false); + auto target2 = std::make_shared<SimpleTarget>("target2", 2, false); + auto target3 = std::make_shared<SimpleTarget>("target3", 3, false); + auto target4 = std::make_shared<HighPriorityTarget>("target4", 4, false); + auto handler = std::make_shared<SimpleHandler>(Targets({target1, target2, target3, target4}), "handler", 9); + f.putFlushHandler("handler", handler); + f.engine.start(); + EXPECT_EQUAL(2u, f.engine.maxConcurrentNormal()); + EXPECT_EQUAL(3u, f.engine.maxConcurrentTotal()); + EXPECT_EQUAL(f.engine.maxConcurrentTotal(), f.engine.get_executor().getNumThreads()); + + EXPECT_TRUE(target1->_initDone.await(LONG_TIMEOUT)); + EXPECT_TRUE(target2->_initDone.await(LONG_TIMEOUT)); + EXPECT_FALSE(target3->_initDone.await(SHORT_TIMEOUT)); + EXPECT_FALSE(target4->_initDone.await(SHORT_TIMEOUT)); + assertThatHandlersInCurrentSet(f.engine, {"handler.target1", "handler.target2"}); + target1->_proceed.countDown(); + EXPECT_TRUE(target1->_taskDone.await(LONG_TIMEOUT)); EXPECT_TRUE(target3->_initDone.await(LONG_TIMEOUT)); - assertThatHandlersInCurrentSet(f.engine, {"handler.target5", "handler.target3"}); + EXPECT_TRUE(target4->_initDone.await(LONG_TIMEOUT)); + assertThatHandlersInCurrentSet(f.engine, {"handler.target2", "handler.target3", "handler.target4"}); + target2->_proceed.countDown(); + EXPECT_TRUE(target2->_taskDone.await(LONG_TIMEOUT)); + assertThatHandlersInCurrentSet(f.engine, {"handler.target3", "handler.target4"}); target3->_proceed.countDown(); EXPECT_TRUE(target3->_taskDone.await(LONG_TIMEOUT)); - assertThatHandlersInCurrentSet(f.engine, {"handler.target5"}); - target5->_proceed.countDown(); - EXPECT_TRUE(target5->_taskDone.await(LONG_TIMEOUT)); + assertThatHandlersInCurrentSet(f.engine, {"handler.target4"}); + target4->_proceed.countDown(); + EXPECT_TRUE(target4->_taskDone.await(LONG_TIMEOUT)); assertThatHandlersInCurrentSet(f.engine, {}); } @@ -828,7 +863,7 @@ TEST_F("require that oldest serial is updated when finishing priority flush stra auto target1 = std::make_shared<SimpleTarget>("target1", 10, true); auto handler = f.addSimpleHandler({ target1 }); TEST_DO(f.assertOldestSerial(*handler, 10)); - f.engine.setStrategy(std::make_shared<SimpleStrategy>()); + f.engine.setStrategy(std::make_shared<SimpleStrategy>(SimpleStrategy::OrderBy::INDEX_OF)); EXPECT_EQUAL(20u, handler->_oldestSerial); } |