diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-01-31 21:33:58 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-01-31 21:33:58 +0000 |
commit | a344dd17d18cf12805b977986f0bf36d0191bfe8 (patch) | |
tree | 4dacb9ca69e0aeecd3f0e116761acfc3f40e5fc1 | |
parent | b39f3bd47b9b915a061bc28c3203f5eca5c541ca (diff) |
Add test that verifies that concurrency is upheld during triggerFlush too.
-rw-r--r-- | searchcore/src/tests/proton/flushengine/flushengine_test.cpp | 106 |
1 files changed, 51 insertions, 55 deletions
diff --git a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp index b1c188b2f9f..d5823a8e055 100644 --- a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp +++ b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp @@ -42,8 +42,7 @@ public: public: SimpleExecutor() : _done() - { - } + { } Task::UP execute(Task::UP task) override @@ -83,8 +82,7 @@ public: SimpleHandler &handler) : _task(std::move(task)), _handler(handler) - { - } + { } search::SerialNum getFlushSerial() const override { return _task->getFlushSerial(); @@ -95,19 +93,15 @@ class WrappedFlushTarget : public FlushTargetProxy { SimpleHandler &_handler; public: - WrappedFlushTarget(const IFlushTarget::SP &target, - SimpleHandler &handler) + WrappedFlushTarget(const IFlushTarget::SP &target, SimpleHandler &handler) : FlushTargetProxy(target), _handler(handler) - { - } + { } - Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override - { + Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override { Task::UP task(_target->initFlush(currentSerial, std::move(flush_token))); if (task) { - return std::make_unique<WrappedFlushTask>(std::move(task), - _handler); + return std::make_unique<WrappedFlushTask>(std::move(task), _handler); } return task; } @@ -140,33 +134,25 @@ public: _lock(), _done(targets.size()), _flushDoneHistory() - { - } + { } - search::SerialNum - getCurrentSerialNumber() const override - { - LOG(info, "SimpleHandler(%s)::getCurrentSerialNumber()", - getName().c_str()); + search::SerialNum getCurrentSerialNumber() const override { + LOG(info, "SimpleHandler(%s)::getCurrentSerialNumber()", getName().c_str()); return _currentSerial; } std::vector<IFlushTarget::SP> - getFlushTargets() override - { - LOG(info, "SimpleHandler(%s)::getFlushTargets()", - getName().c_str()); + getFlushTargets() override { + LOG(info, "SimpleHandler(%s)::getFlushTargets()", getName().c_str()); std::vector<IFlushTarget::SP> wrappedTargets; for (const auto &target : _targets) { - wrappedTargets.push_back(std::make_shared<WrappedFlushTarget> - (target, *this)); + wrappedTargets.push_back(std::make_shared<WrappedFlushTarget>(target, *this)); } return wrappedTargets; } // Called once by flush engine thread for each task done - void taskDone() - { + void taskDone() { std::lock_guard<std::mutex> guard(_lock); ++_pendingDone; } @@ -174,12 +160,9 @@ public: // Called by flush engine master thread after flush handler is // added to flush engine and when one or more flush tasks related // to flush handler have completed. - void - flushDone(search::SerialNum oldestSerial) override - { + void flushDone(search::SerialNum oldestSerial) override { std::lock_guard<std::mutex> guard(_lock); - LOG(info, "SimpleHandler(%s)::flushDone(%" PRIu64 ")", - getName().c_str(), oldestSerial); + LOG(info, "SimpleHandler(%s)::flushDone(%" PRIu64 ")", getName().c_str(), oldestSerial); _oldestSerial = std::max(_oldestSerial, oldestSerial); _flushDoneHistory.push_back(oldestSerial); while (_pendingDone > 0) { @@ -188,8 +171,7 @@ public: } } - FlushDoneHistory getFlushDoneHistory() - { + FlushDoneHistory getFlushDoneHistory() { std::lock_guard<std::mutex> guard(_lock); return _flushDoneHistory; } @@ -217,12 +199,11 @@ public: search::SerialNum ¤tSerial) : _flushedSerial(flushedSerial), _currentSerial(currentSerial), _start(start), _done(done), _proceed(proceed) - { - } + { } void run() override { _start.countDown(); - if (_proceed != NULL) { + if (_proceed != nullptr) { _proceed->await(); } _flushedSerial = _currentSerial; @@ -270,8 +251,7 @@ public: _taskStart(), _taskDone(), _task(std::move(task)) - { - } + { } SimpleTarget(search::SerialNum flushedSerial = 0, bool proceedImmediately = true) : SimpleTarget("anon", flushedSerial, proceedImmediately) @@ -316,8 +296,7 @@ public: : SimpleTarget("anon"), _mgain(false), _serial(false) - { - } + { } MemoryGain getApproxMemoryGain() const override { LOG_ASSERT(_mgain == false); @@ -389,8 +368,7 @@ public: class NoFlushStrategy : public SimpleStrategy { - FlushContext::List getFlushTargets(const FlushContext::List &, - const flushengine::TlsStatsMap &) const override { + FlushContext::List getFlushTargets(const FlushContext::List &, const flushengine::TlsStatsMap &) const override { return FlushContext::List(); } }; @@ -430,13 +408,11 @@ struct Fixture : tlsStatsFactory(std::make_shared<SimpleTlsStatsFactory>()), strategy(strategy_), engine(tlsStatsFactory, strategy, numThreads, idleInterval) - { - } + { } Fixture(uint32_t numThreads, vespalib::duration idleInterval) : Fixture(numThreads, idleInterval, std::make_shared<SimpleStrategy>()) - { - } + { } void putFlushHandler(const vespalib::string &docTypeName, IFlushHandler::SP handler) { engine.putFlushHandler(DocTypeName(docTypeName), handler); @@ -446,17 +422,14 @@ struct Fixture strategy->_targets.push_back(std::move(target)); } - std::shared_ptr<SimpleHandler> - addSimpleHandler(Targets targets) - { + std::shared_ptr<SimpleHandler> addSimpleHandler(Targets targets) { auto handler = std::make_shared<SimpleHandler>(targets, "handler", 20); engine.putFlushHandler(DocTypeName("handler"), handler); engine.start(); return handler; } - void assertOldestSerial(SimpleHandler &handler, search::SerialNum expOldestSerial) - { + void assertOldestSerial(SimpleHandler &handler, search::SerialNum expOldestSerial) { using namespace std::chrono_literals; for (int pass = 0; pass < 600; ++pass) { std::this_thread::sleep_for(100ms); @@ -593,8 +566,7 @@ TEST_F("require that target can refuse flush", Fixture(2, IINTERVAL)) EXPECT_TRUE(!handler->_done.await(SHORT_TIMEOUT)); } -TEST_F("require that targets are flushed when nothing new to flush", - Fixture(2, IINTERVAL)) +TEST_F("require that targets are flushed when nothing new to flush", Fixture(2, IINTERVAL)) { auto target = std::make_shared<SimpleTarget>("anon", 5); // oldest unflushed serial num = 5 auto handler = std::make_shared<SimpleHandler>(Targets({target}), "anon", 4); // current serial num = 4 @@ -640,7 +612,7 @@ TEST("require that threaded target works") auto target = std::make_shared<ThreadedFlushTarget>(executor, getSerialNum, std::make_shared<SimpleTarget>()); EXPECT_FALSE(executor._done.await(SHORT_TIMEOUT)); - EXPECT_TRUE(target->initFlush(0, std::make_shared<search::FlushToken>()).get() != NULL); + EXPECT_TRUE(target->initFlush(0, std::make_shared<search::FlushToken>())); EXPECT_TRUE(executor._done.await(LONG_TIMEOUT)); } @@ -713,6 +685,30 @@ TEST_F("require that concurrency works", Fixture(2, 1ms)) target2->_proceed.countDown(); } +TEST_F("require that concurrency works with triggerFlush", Fixture(2, 1ms)) +{ + auto target1 = std::make_shared<SimpleTarget>("target1", 1, false); + auto target2 = std::make_shared<SimpleTarget>("target2", 2, false); + auto target3 = std::make_shared<SimpleTarget>("target3", 3, false); + auto handler = std::make_shared<SimpleHandler>(Targets({target1, target2, target3}), "handler", 9); + f.putFlushHandler("handler", handler); + std::thread thread([this]() { f.engine.triggerFlush(); }); + std::this_thread::sleep_for(1s); + f.engine.start(); + + EXPECT_TRUE(target1->_initDone.await(LONG_TIMEOUT)); + EXPECT_TRUE(target2->_initDone.await(LONG_TIMEOUT)); + EXPECT_TRUE(!target3->_initDone.await(SHORT_TIMEOUT)); + assertThatHandlersInCurrentSet(f.engine, {"handler.target1", "handler.target2"}); + EXPECT_TRUE(!target3->_initDone.await(SHORT_TIMEOUT)); + target1->_proceed.countDown(); + EXPECT_TRUE(target1->_taskDone.await(LONG_TIMEOUT)); + assertThatHandlersInCurrentSet(f.engine, {"handler.target2", "handler.target3"}); + target3->_proceed.countDown(); + target2->_proceed.countDown(); + thread.join(); +} + TEST_F("require that state explorer can list flush targets", Fixture(1, 1ms)) { auto target = std::make_shared<SimpleTarget>("target1", 100, false); |