diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /storage/src/tests/storageserver/testvisitormessagesession.cpp |
Publish
Diffstat (limited to 'storage/src/tests/storageserver/testvisitormessagesession.cpp')
-rw-r--r-- | storage/src/tests/storageserver/testvisitormessagesession.cpp | 78 |
1 files changed, 78 insertions, 0 deletions
diff --git a/storage/src/tests/storageserver/testvisitormessagesession.cpp b/storage/src/tests/storageserver/testvisitormessagesession.cpp new file mode 100644 index 00000000000..e814f6cf229 --- /dev/null +++ b/storage/src/tests/storageserver/testvisitormessagesession.cpp @@ -0,0 +1,78 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <tests/storageserver/testvisitormessagesession.h> +#include <vespa/storageframework/defaultimplementation/clock/realclock.h> + +namespace storage { + +TestVisitorMessageSession::~TestVisitorMessageSession() +{ +} + +TestVisitorMessageSession::TestVisitorMessageSession(VisitorThread& t, + Visitor& v, + const mbus::Error& autoReplyError, + bool autoReply) + : _autoReplyError(autoReplyError), + _autoReply(autoReply), + thread(t), + visitor(v), + pendingCount(0) +{ +} + +void +TestVisitorMessageSession::reply(mbus::Reply::UP rep) { + { + vespalib::MonitorGuard guard(_waitMonitor); + pendingCount--; + } + thread.handleMessageBusReply(std::move(rep), visitor); +} + +mbus::Result +TestVisitorMessageSession::send( + std::unique_ptr<documentapi::DocumentMessage> message) +{ + vespalib::MonitorGuard guard(_waitMonitor); + if (_autoReply) { + pendingCount++; + mbus::Reply::UP rep = message->createReply(); + rep->setMessage(mbus::Message::UP(message.release())); + if (_autoReplyError.getCode() == mbus::ErrorCode::NONE) { + reply(std::move(rep)); + return mbus::Result(); + } else { + return mbus::Result(_autoReplyError, + std::unique_ptr<mbus::Message>(message.release())); + } + } else { + pendingCount++; + sentMessages.push_back( + vespalib::LinkedPtr<documentapi::DocumentMessage>( + message.release())); + guard.broadcast(); + return mbus::Result(); + } +} + +void +TestVisitorMessageSession::waitForMessages(unsigned int msgCount) { + framework::defaultimplementation::RealClock clock; + framework::MilliSecTime endTime( + clock.getTimeInMillis() + framework::MilliSecTime(60 * 1000)); + + vespalib::MonitorGuard guard(_waitMonitor); + while (sentMessages.size() < msgCount) { + if (clock.getTimeInMillis() > endTime) { + throw vespalib::IllegalStateException( + vespalib::make_string("Timed out waiting for %u messages " + "in test visitor session", msgCount), + VESPA_STRLOC); + } + guard.wait(1000); + } +}; + +} |