summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorEirik Nygaard <eirik@yahoo-inc.com>2016-06-20 14:38:29 +0200
committerGitHub <noreply@github.com>2016-06-20 14:38:29 +0200
commita6104447a944e8a3cbf8757211eb92b5ef740fc3 (patch)
tree5780f8b2aaee34ba803f648b1573e0d8bc4f9a5b /searchcore
parenta4a78dda858a3bc7c7c393fd4cfbd46244601e2a (diff)
parent29692bead9e9accefa1c261d5c3ca91629bd096a (diff)
Merge pull request #85 from yahoo/toregge/stabilize-flushengine-unit-test-try-2
Toregge/stabilize flushengine unit test try 2
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/flushengine/flushengine.cpp116
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp2
2 files changed, 102 insertions, 16 deletions
diff --git a/searchcore/src/tests/proton/flushengine/flushengine.cpp b/searchcore/src/tests/proton/flushengine/flushengine.cpp
index 364d8bf3717..add89d5fc58 100644
--- a/searchcore/src/tests/proton/flushengine/flushengine.cpp
+++ b/searchcore/src/tests/proton/flushengine/flushengine.cpp
@@ -68,15 +68,63 @@ class SimpleTlsStatsFactory : public flushengine::ITlsStatsFactory
}
};
+class SimpleHandler;
+
+class WrappedFlushTask : public searchcorespi::FlushTask
+{
+ searchcorespi::FlushTask::UP _task;
+ SimpleHandler &_handler;
+
+public:
+ virtual void run() override;
+ WrappedFlushTask(searchcorespi::FlushTask::UP task,
+ SimpleHandler &handler)
+ : _task(std::move(task)),
+ _handler(handler)
+ {
+ }
+
+ virtual search::SerialNum getFlushSerial() const override
+ {
+ return _task->getFlushSerial();
+ }
+};
+
+class WrappedFlushTarget : public FlushTargetProxy
+{
+ SimpleHandler &_handler;
+public:
+ WrappedFlushTarget(const IFlushTarget::SP &target,
+ SimpleHandler &handler)
+ : FlushTargetProxy(target),
+ _handler(handler)
+ {
+ }
+
+ virtual Task::UP initFlush(SerialNum currentSerial) override
+ {
+ Task::UP task(_target->initFlush(currentSerial));
+ if (task) {
+ return std::make_unique<WrappedFlushTask>(std::move(task),
+ _handler);
+ }
+ return std::move(task);
+ }
+};
+
typedef std::vector<IFlushTarget::SP> Targets;
+using FlushDoneHistory = std::vector<search::SerialNum>;
+
class SimpleHandler : public test::DummyFlushHandler {
public:
Targets _targets;
search::SerialNum _oldestSerial;
search::SerialNum _currentSerial;
+ uint32_t _pendingDone;
+ vespalib::Lock _lock;
vespalib::CountDownLatch _done;
- std::vector<search::SerialNum> _flushDoneHistory;
+ FlushDoneHistory _flushDoneHistory;
public:
typedef std::shared_ptr<SimpleHandler> SP;
@@ -87,7 +135,9 @@ public:
_targets(targets),
_oldestSerial(0),
_currentSerial(currentSerial),
- _done(targets.size() + 1),
+ _pendingDone(0u),
+ _lock(),
+ _done(targets.size()),
_flushDoneHistory()
{
// empty
@@ -106,21 +156,51 @@ public:
{
LOG(info, "SimpleHandler(%s)::getFlushTargets()",
getName().c_str());
- return _targets;
+ std::vector<IFlushTarget::SP> wrappedTargets;
+ for (const auto &target : _targets) {
+ wrappedTargets.push_back(std::make_shared<WrappedFlushTarget>
+ (target, *this));
+ }
+ return std::move(wrappedTargets);
}
+ // Called once by flush engine slave thread for each task done
+ void taskDone()
+ {
+ vespalib::LockGuard guard(_lock);
+ ++_pendingDone;
+ }
+
+ // Called by flush engine master thread after flush handler is
+ // added to flush engine and when one or more flush tasks related
+ // to flush handler have completed.
void
flushDone(search::SerialNum oldestSerial) override
{
+ vespalib::LockGuard guard(_lock);
LOG(info, "SimpleHandler(%s)::flushDone(%" PRIu64 ")",
getName().c_str(), oldestSerial);
_oldestSerial = std::max(_oldestSerial, oldestSerial);
_flushDoneHistory.push_back(oldestSerial);
- _done.countDown();
- }
+ while (_pendingDone > 0) {
+ --_pendingDone;
+ _done.countDown();
+ }
+ }
+ FlushDoneHistory getFlushDoneHistory()
+ {
+ vespalib::LockGuard guard(_lock);
+ return _flushDoneHistory;
+ }
};
+void WrappedFlushTask::run()
+{
+ _task->run();
+ _handler.taskDone();
+}
+
class SimpleTask : public searchcorespi::FlushTask {
search::SerialNum &_flushedSerial;
search::SerialNum &_currentSerial;
@@ -296,6 +376,10 @@ public:
if (cached != NULL) {
raw = cached->getFlushTarget().get();
}
+ WrappedFlushTarget *wrapped = dynamic_cast<WrappedFlushTarget *>(raw);
+ if (wrapped != nullptr) {
+ raw = wrapped->getFlushTarget().get();
+ }
for (uint32_t i = 0, len = _targets.size(); i < len; ++i) {
if (raw == _targets[i].get()) {
LOG(info, "Index of target %p is %d.", raw, i);
@@ -413,8 +497,8 @@ TEST_F("require that oldest serial is found", Fixture(1, IINTERVAL))
EXPECT_TRUE(handler->_done.await(LONG_TIMEOUT));
EXPECT_EQUAL(25ul, handler->_oldestSerial);
- EXPECT_EQUAL(std::vector<search::SerialNum>({ 10, 20, 25 }),
- handler->_flushDoneHistory);
+ FlushDoneHistory handlerFlushDoneHistory(handler->getFlushDoneHistory());
+ EXPECT_EQUAL(FlushDoneHistory({ 10, 20, 25 }), handlerFlushDoneHistory);
}
TEST_F("require that oldest serial is found in group", Fixture(2, IINTERVAL))
@@ -440,17 +524,19 @@ TEST_F("require that oldest serial is found in group", Fixture(2, IINTERVAL))
EXPECT_TRUE(fooH->_done.await(LONG_TIMEOUT));
EXPECT_EQUAL(25ul, fooH->_oldestSerial);
- // Both [ 10, 25 ] and [ 10, 20, 25 } are legal histories
- if (fooH->_flushDoneHistory != std::vector<search::SerialNum>({ 10, 25 })) {
- EXPECT_EQUAL(std::vector<search::SerialNum>({ 10, 20, 25 }),
- fooH->_flushDoneHistory);
+ // [ 10, 25 ], [ 10, 25, 25 ] and [ 10, 20, 25 ] are legal histories
+ FlushDoneHistory fooHFlushDoneHistory(fooH->getFlushDoneHistory());
+ if (fooHFlushDoneHistory != FlushDoneHistory({ 10, 25 }) &&
+ fooHFlushDoneHistory != FlushDoneHistory({ 10, 25, 25 })) {
+ EXPECT_EQUAL(FlushDoneHistory({ 10, 20, 25 }), fooHFlushDoneHistory);
}
EXPECT_TRUE(barH->_done.await(LONG_TIMEOUT));
EXPECT_EQUAL(20ul, barH->_oldestSerial);
- // Both [ 5, 20 ] and [ 5, 15, 20 } are legal histories
- if (barH->_flushDoneHistory != std::vector<search::SerialNum>({ 5, 20 })) {
- EXPECT_EQUAL(std::vector<search::SerialNum>({ 5, 15, 20 }),
- barH->_flushDoneHistory);
+ // [ 5, 20 ], [ 5, 20, 20 ] and [ 5, 15, 20 ] are legal histories
+ FlushDoneHistory barHFlushDoneHistory(barH->getFlushDoneHistory());
+ if (barHFlushDoneHistory != FlushDoneHistory({ 5, 20 }) &&
+ barHFlushDoneHistory != FlushDoneHistory({ 5, 20, 20 })) {
+ EXPECT_EQUAL(FlushDoneHistory({ 5, 15, 20 }), barHFlushDoneHistory);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
index f3013b4e5de..9d5cf4c006f 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
@@ -90,8 +90,8 @@ FlushEngine::start()
FlushEngine &
FlushEngine::close()
{
- MonitorGuard strategyGuard(_strategyMonitor);
{
+ MonitorGuard strategyGuard(_strategyMonitor);
MonitorGuard guard(_monitor);
_closed = true;
guard.broadcast();