diff options
author | Eirik Nygaard <eirik@yahoo-inc.com> | 2016-06-20 14:38:29 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-06-20 14:38:29 +0200 |
commit | a6104447a944e8a3cbf8757211eb92b5ef740fc3 (patch) | |
tree | 5780f8b2aaee34ba803f648b1573e0d8bc4f9a5b /searchcore | |
parent | a4a78dda858a3bc7c7c393fd4cfbd46244601e2a (diff) | |
parent | 29692bead9e9accefa1c261d5c3ca91629bd096a (diff) |
Merge pull request #85 from yahoo/toregge/stabilize-flushengine-unit-test-try-2
Toregge/stabilize flushengine unit test try 2
Diffstat (limited to 'searchcore')
-rw-r--r-- | searchcore/src/tests/proton/flushengine/flushengine.cpp | 116 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp | 2 |
2 files changed, 102 insertions, 16 deletions
diff --git a/searchcore/src/tests/proton/flushengine/flushengine.cpp b/searchcore/src/tests/proton/flushengine/flushengine.cpp index 364d8bf3717..add89d5fc58 100644 --- a/searchcore/src/tests/proton/flushengine/flushengine.cpp +++ b/searchcore/src/tests/proton/flushengine/flushengine.cpp @@ -68,15 +68,63 @@ class SimpleTlsStatsFactory : public flushengine::ITlsStatsFactory } }; +class SimpleHandler; + +class WrappedFlushTask : public searchcorespi::FlushTask +{ + searchcorespi::FlushTask::UP _task; + SimpleHandler &_handler; + +public: + virtual void run() override; + WrappedFlushTask(searchcorespi::FlushTask::UP task, + SimpleHandler &handler) + : _task(std::move(task)), + _handler(handler) + { + } + + virtual search::SerialNum getFlushSerial() const override + { + return _task->getFlushSerial(); + } +}; + +class WrappedFlushTarget : public FlushTargetProxy +{ + SimpleHandler &_handler; +public: + WrappedFlushTarget(const IFlushTarget::SP &target, + SimpleHandler &handler) + : FlushTargetProxy(target), + _handler(handler) + { + } + + virtual Task::UP initFlush(SerialNum currentSerial) override + { + Task::UP task(_target->initFlush(currentSerial)); + if (task) { + return std::make_unique<WrappedFlushTask>(std::move(task), + _handler); + } + return std::move(task); + } +}; + typedef std::vector<IFlushTarget::SP> Targets; +using FlushDoneHistory = std::vector<search::SerialNum>; + class SimpleHandler : public test::DummyFlushHandler { public: Targets _targets; search::SerialNum _oldestSerial; search::SerialNum _currentSerial; + uint32_t _pendingDone; + vespalib::Lock _lock; vespalib::CountDownLatch _done; - std::vector<search::SerialNum> _flushDoneHistory; + FlushDoneHistory _flushDoneHistory; public: typedef std::shared_ptr<SimpleHandler> SP; @@ -87,7 +135,9 @@ public: _targets(targets), _oldestSerial(0), _currentSerial(currentSerial), - _done(targets.size() + 1), + _pendingDone(0u), + _lock(), + _done(targets.size()), _flushDoneHistory() { // empty @@ -106,21 +156,51 @@ public: { LOG(info, "SimpleHandler(%s)::getFlushTargets()", getName().c_str()); - return _targets; + std::vector<IFlushTarget::SP> wrappedTargets; + for (const auto &target : _targets) { + wrappedTargets.push_back(std::make_shared<WrappedFlushTarget> + (target, *this)); + } + return std::move(wrappedTargets); } + // Called once by flush engine slave thread for each task done + void taskDone() + { + vespalib::LockGuard guard(_lock); + ++_pendingDone; + } + + // Called by flush engine master thread after flush handler is + // added to flush engine and when one or more flush tasks related + // to flush handler have completed. void flushDone(search::SerialNum oldestSerial) override { + vespalib::LockGuard guard(_lock); LOG(info, "SimpleHandler(%s)::flushDone(%" PRIu64 ")", getName().c_str(), oldestSerial); _oldestSerial = std::max(_oldestSerial, oldestSerial); _flushDoneHistory.push_back(oldestSerial); - _done.countDown(); - } + while (_pendingDone > 0) { + --_pendingDone; + _done.countDown(); + } + } + FlushDoneHistory getFlushDoneHistory() + { + vespalib::LockGuard guard(_lock); + return _flushDoneHistory; + } }; +void WrappedFlushTask::run() +{ + _task->run(); + _handler.taskDone(); +} + class SimpleTask : public searchcorespi::FlushTask { search::SerialNum &_flushedSerial; search::SerialNum &_currentSerial; @@ -296,6 +376,10 @@ public: if (cached != NULL) { raw = cached->getFlushTarget().get(); } + WrappedFlushTarget *wrapped = dynamic_cast<WrappedFlushTarget *>(raw); + if (wrapped != nullptr) { + raw = wrapped->getFlushTarget().get(); + } for (uint32_t i = 0, len = _targets.size(); i < len; ++i) { if (raw == _targets[i].get()) { LOG(info, "Index of target %p is %d.", raw, i); @@ -413,8 +497,8 @@ TEST_F("require that oldest serial is found", Fixture(1, IINTERVAL)) EXPECT_TRUE(handler->_done.await(LONG_TIMEOUT)); EXPECT_EQUAL(25ul, handler->_oldestSerial); - EXPECT_EQUAL(std::vector<search::SerialNum>({ 10, 20, 25 }), - handler->_flushDoneHistory); + FlushDoneHistory handlerFlushDoneHistory(handler->getFlushDoneHistory()); + EXPECT_EQUAL(FlushDoneHistory({ 10, 20, 25 }), handlerFlushDoneHistory); } TEST_F("require that oldest serial is found in group", Fixture(2, IINTERVAL)) @@ -440,17 +524,19 @@ TEST_F("require that oldest serial is found in group", Fixture(2, IINTERVAL)) EXPECT_TRUE(fooH->_done.await(LONG_TIMEOUT)); EXPECT_EQUAL(25ul, fooH->_oldestSerial); - // Both [ 10, 25 ] and [ 10, 20, 25 } are legal histories - if (fooH->_flushDoneHistory != std::vector<search::SerialNum>({ 10, 25 })) { - EXPECT_EQUAL(std::vector<search::SerialNum>({ 10, 20, 25 }), - fooH->_flushDoneHistory); + // [ 10, 25 ], [ 10, 25, 25 ] and [ 10, 20, 25 ] are legal histories + FlushDoneHistory fooHFlushDoneHistory(fooH->getFlushDoneHistory()); + if (fooHFlushDoneHistory != FlushDoneHistory({ 10, 25 }) && + fooHFlushDoneHistory != FlushDoneHistory({ 10, 25, 25 })) { + EXPECT_EQUAL(FlushDoneHistory({ 10, 20, 25 }), fooHFlushDoneHistory); } EXPECT_TRUE(barH->_done.await(LONG_TIMEOUT)); EXPECT_EQUAL(20ul, barH->_oldestSerial); - // Both [ 5, 20 ] and [ 5, 15, 20 } are legal histories - if (barH->_flushDoneHistory != std::vector<search::SerialNum>({ 5, 20 })) { - EXPECT_EQUAL(std::vector<search::SerialNum>({ 5, 15, 20 }), - barH->_flushDoneHistory); + // [ 5, 20 ], [ 5, 20, 20 ] and [ 5, 15, 20 ] are legal histories + FlushDoneHistory barHFlushDoneHistory(barH->getFlushDoneHistory()); + if (barHFlushDoneHistory != FlushDoneHistory({ 5, 20 }) && + barHFlushDoneHistory != FlushDoneHistory({ 5, 20, 20 })) { + EXPECT_EQUAL(FlushDoneHistory({ 5, 15, 20 }), barHFlushDoneHistory); } } diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp index f3013b4e5de..9d5cf4c006f 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp @@ -90,8 +90,8 @@ FlushEngine::start() FlushEngine & FlushEngine::close() { - MonitorGuard strategyGuard(_strategyMonitor); { + MonitorGuard strategyGuard(_strategyMonitor); MonitorGuard guard(_monitor); _closed = true; guard.broadcast(); |