aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2023-05-12 11:04:22 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2023-05-12 11:04:22 +0000
commit580f4bfdda0e99ef19a28f75938dcd2c479ac4a1 (patch)
treebfa8196b461e83062f66915eb66035b59ff1de90
parentbef1950a75be8b256df07ca5ef6aacd1731c5ef9 (diff)
Test that high priority tragets do not skip the queue.
-rw-r--r--searchcore/src/tests/proton/flushengine/flushengine_test.cpp91
1 files changed, 63 insertions, 28 deletions
diff --git a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp
index fd6274e83a6..4033fe679ca 100644
--- a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp
+++ b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp
@@ -333,10 +333,13 @@ public:
class SimpleStrategy : public IFlushStrategy {
public:
+ using SP = std::shared_ptr<SimpleStrategy>;
+ enum class OrderBy {INDEX_OF, SERIAL};
std::vector<IFlushTarget::SP> _targets;
+ OrderBy _orderBy;
- struct CompareTarget {
- CompareTarget(const SimpleStrategy &flush) : _flush(flush) { }
+ struct CompareIndexOf {
+ CompareIndexOf(const SimpleStrategy &flush) : _flush(flush) { }
bool operator () (const FlushContext::SP &lhs, const FlushContext::SP &rhs) const {
return _flush.compare(lhs->getTarget(), rhs->getTarget());
}
@@ -347,7 +350,13 @@ public:
const flushengine::TlsStatsMap&,
const flushengine::ActiveFlushStats&) const override {
FlushContext::List fv(targetList);
- std::sort(fv.begin(), fv.end(), CompareTarget(*this));
+ if (_orderBy == OrderBy::INDEX_OF) {
+ std::sort(fv.begin(), fv.end(), CompareIndexOf(*this));
+ } else {
+ std::sort(fv.begin(), fv.end(), [](const auto & a, const auto & b) {
+ return a->getTarget()->getFlushedSerialNum() < b->getTarget()->getFlushedSerialNum(); }
+ );
+ }
return fv;
}
@@ -358,11 +367,7 @@ public:
return indexOf(lhs) < indexOf(rhs);
}
-
-public:
- using SP = std::shared_ptr<SimpleStrategy>;
-
- SimpleStrategy() noexcept : _targets() {}
+ SimpleStrategy(OrderBy orderBy) noexcept : _targets(), _orderBy(orderBy) {}
uint32_t
indexOf(const IFlushTarget::SP &target) const
@@ -389,6 +394,8 @@ public:
class NoFlushStrategy : public SimpleStrategy
{
+public:
+ NoFlushStrategy() noexcept : SimpleStrategy(OrderBy::INDEX_OF) {}
FlushContext::List getFlushTargets(const FlushContext::List &, const flushengine::TlsStatsMap &, const flushengine::ActiveFlushStats&) const override {
return {};
}
@@ -432,7 +439,7 @@ struct Fixture
{ }
Fixture(uint32_t numThreads, vespalib::duration idleInterval)
- : Fixture(numThreads, idleInterval, std::make_shared<SimpleStrategy>())
+ : Fixture(numThreads, idleInterval, std::make_shared<SimpleStrategy>(SimpleStrategy::OrderBy::INDEX_OF))
{ }
void putFlushHandler(const vespalib::string &docTypeName, IFlushHandler::SP handler) {
@@ -717,14 +724,14 @@ TEST_F("require that concurrency works", Fixture(2, 1ms))
target2->_proceed.countDown();
}
-TEST_F("require that high pri concurrency works", Fixture(2, 1ms))
+TEST_F("require that there is room for one and only one high pri target",
+ Fixture(2, 1ms, std::make_unique<SimpleStrategy>(SimpleStrategy::OrderBy::SERIAL)))
{
auto target1 = std::make_shared<SimpleTarget>("target1", 1, false);
auto target2 = std::make_shared<SimpleTarget>("target2", 2, false);
- auto target3 = std::make_shared<SimpleTarget>("target3", 2, false);
- auto target4 = std::make_shared<HighPriorityTarget>("target4", 3, false);
- auto target5 = std::make_shared<HighPriorityTarget>("target5", 5, false);
- auto handler = std::make_shared<SimpleHandler>(Targets({target1, target2, target3, target4, target5}), "handler", 9);
+ auto target3 = std::make_shared<HighPriorityTarget>("target3", 3, false);
+ auto target4 = std::make_shared<HighPriorityTarget>("target4", 4, false);
+ auto handler = std::make_shared<SimpleHandler>(Targets({target1, target2, target3, target4}), "handler", 9);
f.putFlushHandler("handler", handler);
f.engine.start();
EXPECT_EQUAL(2u, f.engine.maxConcurrentNormal());
@@ -733,28 +740,56 @@ TEST_F("require that high pri concurrency works", Fixture(2, 1ms))
EXPECT_TRUE(target1->_initDone.await(LONG_TIMEOUT));
EXPECT_TRUE(target2->_initDone.await(LONG_TIMEOUT));
- EXPECT_TRUE(target4->_initDone.await(LONG_TIMEOUT));
- EXPECT_FALSE(target3->_initDone.await(SHORT_TIMEOUT));
- EXPECT_FALSE(target5->_initDone.await(SHORT_TIMEOUT));
- assertThatHandlersInCurrentSet(f.engine, {"handler.target1", "handler.target2", "handler.target4"});
+ EXPECT_TRUE(target3->_initDone.await(LONG_TIMEOUT));
+ EXPECT_FALSE(target4->_initDone.await(SHORT_TIMEOUT));
+ assertThatHandlersInCurrentSet(f.engine, {"handler.target1", "handler.target2", "handler.target3"});
target1->_proceed.countDown();
EXPECT_TRUE(target1->_taskDone.await(LONG_TIMEOUT));
- EXPECT_TRUE(target5->_initDone.await(LONG_TIMEOUT));
- EXPECT_FALSE(target3->_initDone.await(SHORT_TIMEOUT));
- assertThatHandlersInCurrentSet(f.engine, {"handler.target2", "handler.target4", "handler.target5"});
+ EXPECT_TRUE(target4->_initDone.await(LONG_TIMEOUT));
+ assertThatHandlersInCurrentSet(f.engine, {"handler.target2", "handler.target3", "handler.target4"});
target2->_proceed.countDown();
EXPECT_TRUE(target2->_taskDone.await(LONG_TIMEOUT));
- EXPECT_FALSE(target3->_initDone.await(SHORT_TIMEOUT));
- assertThatHandlersInCurrentSet(f.engine, {"handler.target4", "handler.target5"});
+ assertThatHandlersInCurrentSet(f.engine, {"handler.target3", "handler.target4"});
+ target3->_proceed.countDown();
+ EXPECT_TRUE(target3->_taskDone.await(LONG_TIMEOUT));
+ assertThatHandlersInCurrentSet(f.engine, {"handler.target4"});
target4->_proceed.countDown();
EXPECT_TRUE(target4->_taskDone.await(LONG_TIMEOUT));
+ assertThatHandlersInCurrentSet(f.engine, {});
+}
+
+TEST_F("require that high priority does not jump the queue",
+ Fixture(2, 1ms, std::make_unique<SimpleStrategy>(SimpleStrategy::OrderBy::SERIAL)))
+{
+ auto target1 = std::make_shared<SimpleTarget>("target1", 1, false);
+ auto target2 = std::make_shared<SimpleTarget>("target2", 2, false);
+ auto target3 = std::make_shared<SimpleTarget>("target3", 3, false);
+ auto target4 = std::make_shared<HighPriorityTarget>("target4", 4, false);
+ auto handler = std::make_shared<SimpleHandler>(Targets({target1, target2, target3, target4}), "handler", 9);
+ f.putFlushHandler("handler", handler);
+ f.engine.start();
+ EXPECT_EQUAL(2u, f.engine.maxConcurrentNormal());
+ EXPECT_EQUAL(3u, f.engine.maxConcurrentTotal());
+ EXPECT_EQUAL(f.engine.maxConcurrentTotal(), f.engine.get_executor().getNumThreads());
+
+ EXPECT_TRUE(target1->_initDone.await(LONG_TIMEOUT));
+ EXPECT_TRUE(target2->_initDone.await(LONG_TIMEOUT));
+ EXPECT_FALSE(target3->_initDone.await(SHORT_TIMEOUT));
+ EXPECT_FALSE(target4->_initDone.await(SHORT_TIMEOUT));
+ assertThatHandlersInCurrentSet(f.engine, {"handler.target1", "handler.target2"});
+ target1->_proceed.countDown();
+ EXPECT_TRUE(target1->_taskDone.await(LONG_TIMEOUT));
EXPECT_TRUE(target3->_initDone.await(LONG_TIMEOUT));
- assertThatHandlersInCurrentSet(f.engine, {"handler.target5", "handler.target3"});
+ EXPECT_TRUE(target4->_initDone.await(LONG_TIMEOUT));
+ assertThatHandlersInCurrentSet(f.engine, {"handler.target2", "handler.target3", "handler.target4"});
+ target2->_proceed.countDown();
+ EXPECT_TRUE(target2->_taskDone.await(LONG_TIMEOUT));
+ assertThatHandlersInCurrentSet(f.engine, {"handler.target3", "handler.target4"});
target3->_proceed.countDown();
EXPECT_TRUE(target3->_taskDone.await(LONG_TIMEOUT));
- assertThatHandlersInCurrentSet(f.engine, {"handler.target5"});
- target5->_proceed.countDown();
- EXPECT_TRUE(target5->_taskDone.await(LONG_TIMEOUT));
+ assertThatHandlersInCurrentSet(f.engine, {"handler.target4"});
+ target4->_proceed.countDown();
+ EXPECT_TRUE(target4->_taskDone.await(LONG_TIMEOUT));
assertThatHandlersInCurrentSet(f.engine, {});
}
@@ -828,7 +863,7 @@ TEST_F("require that oldest serial is updated when finishing priority flush stra
auto target1 = std::make_shared<SimpleTarget>("target1", 10, true);
auto handler = f.addSimpleHandler({ target1 });
TEST_DO(f.assertOldestSerial(*handler, 10));
- f.engine.setStrategy(std::make_shared<SimpleStrategy>());
+ f.engine.setStrategy(std::make_shared<SimpleStrategy>(SimpleStrategy::OrderBy::INDEX_OF));
EXPECT_EQUAL(20u, handler->_oldestSerial);
}