summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@yahoo-inc.com>2016-06-18 18:31:57 +0000
committerTor Egge <Tor.Egge@yahoo-inc.com>2016-06-18 18:52:23 +0000
commit29692bead9e9accefa1c261d5c3ca91629bd096a (patch)
tree8217c5737162768ba9ac337991d46ba08fcd5d83 /searchcore
parentc986a7bf2f1dc6699c1718a281747d7da6c3a021 (diff)
If flush tasks for two flush targets for the same flush handler
completes at the same time then flush engine might call flushDone only once to handle transaction log pruning for handler. Adjust SimpleHandler class in unit test to better detect when flushing has completed.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/flushengine/flushengine.cpp116
1 files changed, 101 insertions, 15 deletions
diff --git a/searchcore/src/tests/proton/flushengine/flushengine.cpp b/searchcore/src/tests/proton/flushengine/flushengine.cpp
index 364d8bf3717..add89d5fc58 100644
--- a/searchcore/src/tests/proton/flushengine/flushengine.cpp
+++ b/searchcore/src/tests/proton/flushengine/flushengine.cpp
@@ -68,15 +68,63 @@ class SimpleTlsStatsFactory : public flushengine::ITlsStatsFactory
}
};
+class SimpleHandler;
+
+class WrappedFlushTask : public searchcorespi::FlushTask
+{
+ searchcorespi::FlushTask::UP _task;
+ SimpleHandler &_handler;
+
+public:
+ virtual void run() override;
+ WrappedFlushTask(searchcorespi::FlushTask::UP task,
+ SimpleHandler &handler)
+ : _task(std::move(task)),
+ _handler(handler)
+ {
+ }
+
+ virtual search::SerialNum getFlushSerial() const override
+ {
+ return _task->getFlushSerial();
+ }
+};
+
+class WrappedFlushTarget : public FlushTargetProxy
+{
+ SimpleHandler &_handler;
+public:
+ WrappedFlushTarget(const IFlushTarget::SP &target,
+ SimpleHandler &handler)
+ : FlushTargetProxy(target),
+ _handler(handler)
+ {
+ }
+
+ virtual Task::UP initFlush(SerialNum currentSerial) override
+ {
+ Task::UP task(_target->initFlush(currentSerial));
+ if (task) {
+ return std::make_unique<WrappedFlushTask>(std::move(task),
+ _handler);
+ }
+ return std::move(task);
+ }
+};
+
typedef std::vector<IFlushTarget::SP> Targets;
+using FlushDoneHistory = std::vector<search::SerialNum>;
+
class SimpleHandler : public test::DummyFlushHandler {
public:
Targets _targets;
search::SerialNum _oldestSerial;
search::SerialNum _currentSerial;
+ uint32_t _pendingDone;
+ vespalib::Lock _lock;
vespalib::CountDownLatch _done;
- std::vector<search::SerialNum> _flushDoneHistory;
+ FlushDoneHistory _flushDoneHistory;
public:
typedef std::shared_ptr<SimpleHandler> SP;
@@ -87,7 +135,9 @@ public:
_targets(targets),
_oldestSerial(0),
_currentSerial(currentSerial),
- _done(targets.size() + 1),
+ _pendingDone(0u),
+ _lock(),
+ _done(targets.size()),
_flushDoneHistory()
{
// empty
@@ -106,21 +156,51 @@ public:
{
LOG(info, "SimpleHandler(%s)::getFlushTargets()",
getName().c_str());
- return _targets;
+ std::vector<IFlushTarget::SP> wrappedTargets;
+ for (const auto &target : _targets) {
+ wrappedTargets.push_back(std::make_shared<WrappedFlushTarget>
+ (target, *this));
+ }
+ return std::move(wrappedTargets);
}
+ // Called once by flush engine slave thread for each task done
+ void taskDone()
+ {
+ vespalib::LockGuard guard(_lock);
+ ++_pendingDone;
+ }
+
+ // Called by flush engine master thread after flush handler is
+ // added to flush engine and when one or more flush tasks related
+ // to flush handler have completed.
void
flushDone(search::SerialNum oldestSerial) override
{
+ vespalib::LockGuard guard(_lock);
LOG(info, "SimpleHandler(%s)::flushDone(%" PRIu64 ")",
getName().c_str(), oldestSerial);
_oldestSerial = std::max(_oldestSerial, oldestSerial);
_flushDoneHistory.push_back(oldestSerial);
- _done.countDown();
- }
+ while (_pendingDone > 0) {
+ --_pendingDone;
+ _done.countDown();
+ }
+ }
+ FlushDoneHistory getFlushDoneHistory()
+ {
+ vespalib::LockGuard guard(_lock);
+ return _flushDoneHistory;
+ }
};
+void WrappedFlushTask::run()
+{
+ _task->run();
+ _handler.taskDone();
+}
+
class SimpleTask : public searchcorespi::FlushTask {
search::SerialNum &_flushedSerial;
search::SerialNum &_currentSerial;
@@ -296,6 +376,10 @@ public:
if (cached != NULL) {
raw = cached->getFlushTarget().get();
}
+ WrappedFlushTarget *wrapped = dynamic_cast<WrappedFlushTarget *>(raw);
+ if (wrapped != nullptr) {
+ raw = wrapped->getFlushTarget().get();
+ }
for (uint32_t i = 0, len = _targets.size(); i < len; ++i) {
if (raw == _targets[i].get()) {
LOG(info, "Index of target %p is %d.", raw, i);
@@ -413,8 +497,8 @@ TEST_F("require that oldest serial is found", Fixture(1, IINTERVAL))
EXPECT_TRUE(handler->_done.await(LONG_TIMEOUT));
EXPECT_EQUAL(25ul, handler->_oldestSerial);
- EXPECT_EQUAL(std::vector<search::SerialNum>({ 10, 20, 25 }),
- handler->_flushDoneHistory);
+ FlushDoneHistory handlerFlushDoneHistory(handler->getFlushDoneHistory());
+ EXPECT_EQUAL(FlushDoneHistory({ 10, 20, 25 }), handlerFlushDoneHistory);
}
TEST_F("require that oldest serial is found in group", Fixture(2, IINTERVAL))
@@ -440,17 +524,19 @@ TEST_F("require that oldest serial is found in group", Fixture(2, IINTERVAL))
EXPECT_TRUE(fooH->_done.await(LONG_TIMEOUT));
EXPECT_EQUAL(25ul, fooH->_oldestSerial);
- // Both [ 10, 25 ] and [ 10, 20, 25 } are legal histories
- if (fooH->_flushDoneHistory != std::vector<search::SerialNum>({ 10, 25 })) {
- EXPECT_EQUAL(std::vector<search::SerialNum>({ 10, 20, 25 }),
- fooH->_flushDoneHistory);
+ // [ 10, 25 ], [ 10, 25, 25 ] and [ 10, 20, 25 ] are legal histories
+ FlushDoneHistory fooHFlushDoneHistory(fooH->getFlushDoneHistory());
+ if (fooHFlushDoneHistory != FlushDoneHistory({ 10, 25 }) &&
+ fooHFlushDoneHistory != FlushDoneHistory({ 10, 25, 25 })) {
+ EXPECT_EQUAL(FlushDoneHistory({ 10, 20, 25 }), fooHFlushDoneHistory);
}
EXPECT_TRUE(barH->_done.await(LONG_TIMEOUT));
EXPECT_EQUAL(20ul, barH->_oldestSerial);
- // Both [ 5, 20 ] and [ 5, 15, 20 } are legal histories
- if (barH->_flushDoneHistory != std::vector<search::SerialNum>({ 5, 20 })) {
- EXPECT_EQUAL(std::vector<search::SerialNum>({ 5, 15, 20 }),
- barH->_flushDoneHistory);
+ // [ 5, 20 ], [ 5, 20, 20 ] and [ 5, 15, 20 ] are legal histories
+ FlushDoneHistory barHFlushDoneHistory(barH->getFlushDoneHistory());
+ if (barHFlushDoneHistory != FlushDoneHistory({ 5, 20 }) &&
+ barHFlushDoneHistory != FlushDoneHistory({ 5, 20, 20 })) {
+ EXPECT_EQUAL(FlushDoneHistory({ 5, 15, 20 }), barHFlushDoneHistory);
}
}