diff options
author | Tor Egge <Tor.Egge@yahoo-inc.com> | 2016-06-18 18:31:57 +0000 |
---|---|---|
committer | Tor Egge <Tor.Egge@yahoo-inc.com> | 2016-06-18 18:52:23 +0000 |
commit | 29692bead9e9accefa1c261d5c3ca91629bd096a (patch) | |
tree | 8217c5737162768ba9ac337991d46ba08fcd5d83 /searchcore | |
parent | c986a7bf2f1dc6699c1718a281747d7da6c3a021 (diff) |
If flush tasks for two flush targets for the same flush handler
completes at the same time then flush engine might call flushDone
only once to handle transaction log pruning for handler.
Adjust SimpleHandler class in unit test to better detect when
flushing has completed.
Diffstat (limited to 'searchcore')
-rw-r--r-- | searchcore/src/tests/proton/flushengine/flushengine.cpp | 116 |
1 files changed, 101 insertions, 15 deletions
diff --git a/searchcore/src/tests/proton/flushengine/flushengine.cpp b/searchcore/src/tests/proton/flushengine/flushengine.cpp index 364d8bf3717..add89d5fc58 100644 --- a/searchcore/src/tests/proton/flushengine/flushengine.cpp +++ b/searchcore/src/tests/proton/flushengine/flushengine.cpp @@ -68,15 +68,63 @@ class SimpleTlsStatsFactory : public flushengine::ITlsStatsFactory } }; +class SimpleHandler; + +class WrappedFlushTask : public searchcorespi::FlushTask +{ + searchcorespi::FlushTask::UP _task; + SimpleHandler &_handler; + +public: + virtual void run() override; + WrappedFlushTask(searchcorespi::FlushTask::UP task, + SimpleHandler &handler) + : _task(std::move(task)), + _handler(handler) + { + } + + virtual search::SerialNum getFlushSerial() const override + { + return _task->getFlushSerial(); + } +}; + +class WrappedFlushTarget : public FlushTargetProxy +{ + SimpleHandler &_handler; +public: + WrappedFlushTarget(const IFlushTarget::SP &target, + SimpleHandler &handler) + : FlushTargetProxy(target), + _handler(handler) + { + } + + virtual Task::UP initFlush(SerialNum currentSerial) override + { + Task::UP task(_target->initFlush(currentSerial)); + if (task) { + return std::make_unique<WrappedFlushTask>(std::move(task), + _handler); + } + return std::move(task); + } +}; + typedef std::vector<IFlushTarget::SP> Targets; +using FlushDoneHistory = std::vector<search::SerialNum>; + class SimpleHandler : public test::DummyFlushHandler { public: Targets _targets; search::SerialNum _oldestSerial; search::SerialNum _currentSerial; + uint32_t _pendingDone; + vespalib::Lock _lock; vespalib::CountDownLatch _done; - std::vector<search::SerialNum> _flushDoneHistory; + FlushDoneHistory _flushDoneHistory; public: typedef std::shared_ptr<SimpleHandler> SP; @@ -87,7 +135,9 @@ public: _targets(targets), _oldestSerial(0), _currentSerial(currentSerial), - _done(targets.size() + 1), + _pendingDone(0u), + _lock(), + _done(targets.size()), _flushDoneHistory() { // empty @@ -106,21 +156,51 @@ public: { LOG(info, "SimpleHandler(%s)::getFlushTargets()", getName().c_str()); - return _targets; + std::vector<IFlushTarget::SP> wrappedTargets; + for (const auto &target : _targets) { + wrappedTargets.push_back(std::make_shared<WrappedFlushTarget> + (target, *this)); + } + return std::move(wrappedTargets); } + // Called once by flush engine slave thread for each task done + void taskDone() + { + vespalib::LockGuard guard(_lock); + ++_pendingDone; + } + + // Called by flush engine master thread after flush handler is + // added to flush engine and when one or more flush tasks related + // to flush handler have completed. void flushDone(search::SerialNum oldestSerial) override { + vespalib::LockGuard guard(_lock); LOG(info, "SimpleHandler(%s)::flushDone(%" PRIu64 ")", getName().c_str(), oldestSerial); _oldestSerial = std::max(_oldestSerial, oldestSerial); _flushDoneHistory.push_back(oldestSerial); - _done.countDown(); - } + while (_pendingDone > 0) { + --_pendingDone; + _done.countDown(); + } + } + FlushDoneHistory getFlushDoneHistory() + { + vespalib::LockGuard guard(_lock); + return _flushDoneHistory; + } }; +void WrappedFlushTask::run() +{ + _task->run(); + _handler.taskDone(); +} + class SimpleTask : public searchcorespi::FlushTask { search::SerialNum &_flushedSerial; search::SerialNum &_currentSerial; @@ -296,6 +376,10 @@ public: if (cached != NULL) { raw = cached->getFlushTarget().get(); } + WrappedFlushTarget *wrapped = dynamic_cast<WrappedFlushTarget *>(raw); + if (wrapped != nullptr) { + raw = wrapped->getFlushTarget().get(); + } for (uint32_t i = 0, len = _targets.size(); i < len; ++i) { if (raw == _targets[i].get()) { LOG(info, "Index of target %p is %d.", raw, i); @@ -413,8 +497,8 @@ TEST_F("require that oldest serial is found", Fixture(1, IINTERVAL)) EXPECT_TRUE(handler->_done.await(LONG_TIMEOUT)); EXPECT_EQUAL(25ul, handler->_oldestSerial); - EXPECT_EQUAL(std::vector<search::SerialNum>({ 10, 20, 25 }), - handler->_flushDoneHistory); + FlushDoneHistory handlerFlushDoneHistory(handler->getFlushDoneHistory()); + EXPECT_EQUAL(FlushDoneHistory({ 10, 20, 25 }), handlerFlushDoneHistory); } TEST_F("require that oldest serial is found in group", Fixture(2, IINTERVAL)) @@ -440,17 +524,19 @@ TEST_F("require that oldest serial is found in group", Fixture(2, IINTERVAL)) EXPECT_TRUE(fooH->_done.await(LONG_TIMEOUT)); EXPECT_EQUAL(25ul, fooH->_oldestSerial); - // Both [ 10, 25 ] and [ 10, 20, 25 } are legal histories - if (fooH->_flushDoneHistory != std::vector<search::SerialNum>({ 10, 25 })) { - EXPECT_EQUAL(std::vector<search::SerialNum>({ 10, 20, 25 }), - fooH->_flushDoneHistory); + // [ 10, 25 ], [ 10, 25, 25 ] and [ 10, 20, 25 ] are legal histories + FlushDoneHistory fooHFlushDoneHistory(fooH->getFlushDoneHistory()); + if (fooHFlushDoneHistory != FlushDoneHistory({ 10, 25 }) && + fooHFlushDoneHistory != FlushDoneHistory({ 10, 25, 25 })) { + EXPECT_EQUAL(FlushDoneHistory({ 10, 20, 25 }), fooHFlushDoneHistory); } EXPECT_TRUE(barH->_done.await(LONG_TIMEOUT)); EXPECT_EQUAL(20ul, barH->_oldestSerial); - // Both [ 5, 20 ] and [ 5, 15, 20 } are legal histories - if (barH->_flushDoneHistory != std::vector<search::SerialNum>({ 5, 20 })) { - EXPECT_EQUAL(std::vector<search::SerialNum>({ 5, 15, 20 }), - barH->_flushDoneHistory); + // [ 5, 20 ], [ 5, 20, 20 ] and [ 5, 15, 20 ] are legal histories + FlushDoneHistory barHFlushDoneHistory(barH->getFlushDoneHistory()); + if (barHFlushDoneHistory != FlushDoneHistory({ 5, 20 }) && + barHFlushDoneHistory != FlushDoneHistory({ 5, 20, 20 })) { + EXPECT_EQUAL(FlushDoneHistory({ 5, 15, 20 }), barHFlushDoneHistory); } } |