aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/tests/storageserver/testvisitormessagesession.cpp
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
commit72231250ed81e10d66bfe70701e64fa5fe50f712 (patch)
tree2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /storage/src/tests/storageserver/testvisitormessagesession.cpp
Publish
Diffstat (limited to 'storage/src/tests/storageserver/testvisitormessagesession.cpp')
-rw-r--r--storage/src/tests/storageserver/testvisitormessagesession.cpp78
1 files changed, 78 insertions, 0 deletions
diff --git a/storage/src/tests/storageserver/testvisitormessagesession.cpp b/storage/src/tests/storageserver/testvisitormessagesession.cpp
new file mode 100644
index 00000000000..e814f6cf229
--- /dev/null
+++ b/storage/src/tests/storageserver/testvisitormessagesession.cpp
@@ -0,0 +1,78 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <tests/storageserver/testvisitormessagesession.h>
+#include <vespa/storageframework/defaultimplementation/clock/realclock.h>
+
+namespace storage {
+
+TestVisitorMessageSession::~TestVisitorMessageSession()
+{
+}
+
+TestVisitorMessageSession::TestVisitorMessageSession(VisitorThread& t,
+ Visitor& v,
+ const mbus::Error& autoReplyError,
+ bool autoReply)
+ : _autoReplyError(autoReplyError),
+ _autoReply(autoReply),
+ thread(t),
+ visitor(v),
+ pendingCount(0)
+{
+}
+
+void
+TestVisitorMessageSession::reply(mbus::Reply::UP rep) {
+ {
+ vespalib::MonitorGuard guard(_waitMonitor);
+ pendingCount--;
+ }
+ thread.handleMessageBusReply(std::move(rep), visitor);
+}
+
+mbus::Result
+TestVisitorMessageSession::send(
+ std::unique_ptr<documentapi::DocumentMessage> message)
+{
+ vespalib::MonitorGuard guard(_waitMonitor);
+ if (_autoReply) {
+ pendingCount++;
+ mbus::Reply::UP rep = message->createReply();
+ rep->setMessage(mbus::Message::UP(message.release()));
+ if (_autoReplyError.getCode() == mbus::ErrorCode::NONE) {
+ reply(std::move(rep));
+ return mbus::Result();
+ } else {
+ return mbus::Result(_autoReplyError,
+ std::unique_ptr<mbus::Message>(message.release()));
+ }
+ } else {
+ pendingCount++;
+ sentMessages.push_back(
+ vespalib::LinkedPtr<documentapi::DocumentMessage>(
+ message.release()));
+ guard.broadcast();
+ return mbus::Result();
+ }
+}
+
+void
+TestVisitorMessageSession::waitForMessages(unsigned int msgCount) {
+ framework::defaultimplementation::RealClock clock;
+ framework::MilliSecTime endTime(
+ clock.getTimeInMillis() + framework::MilliSecTime(60 * 1000));
+
+ vespalib::MonitorGuard guard(_waitMonitor);
+ while (sentMessages.size() < msgCount) {
+ if (clock.getTimeInMillis() > endTime) {
+ throw vespalib::IllegalStateException(
+ vespalib::make_string("Timed out waiting for %u messages "
+ "in test visitor session", msgCount),
+ VESPA_STRLOC);
+ }
+ guard.wait(1000);
+ }
+};
+
+}