diff options
Diffstat (limited to 'messagebus/src/tests/choke/choke.cpp')
-rw-r--r-- | messagebus/src/tests/choke/choke.cpp | 227 |
1 files changed, 227 insertions, 0 deletions
diff --git a/messagebus/src/tests/choke/choke.cpp b/messagebus/src/tests/choke/choke.cpp new file mode 100644 index 00000000000..567ec52db79 --- /dev/null +++ b/messagebus/src/tests/choke/choke.cpp @@ -0,0 +1,227 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("choke_test"); + +#include <vespa/messagebus/emptyreply.h> +#include <vespa/messagebus/errorcode.h> +#include <vespa/messagebus/reply.h> +#include <vespa/messagebus/testlib/receptor.h> +#include <vespa/messagebus/testlib/simplemessage.h> +#include <vespa/messagebus/testlib/simpleprotocol.h> +#include <vespa/messagebus/testlib/slobrok.h> +#include <vespa/messagebus/testlib/testserver.h> +#include <vespa/vespalib/testkit/testapp.h> + +using namespace mbus; + +//////////////////////////////////////////////////////////////////////////////// +// +// Setup +// +//////////////////////////////////////////////////////////////////////////////// + +class TestData { +public: + Slobrok _slobrok; + TestServer _srcServer; + SourceSession::UP _srcSession; + Receptor _srcHandler; + TestServer _dstServer; + DestinationSession::UP _dstSession; + Receptor _dstHandler; + +public: + TestData(); + bool start(); +}; + +class Test : public vespalib::TestApp { +private: + Message::UP createMessage(const string &msg); + +public: + int Main(); + void testMaxCount(TestData &data); + void testMaxSize(TestData &data); +}; + +TEST_APPHOOK(Test); + +TestData::TestData() : + _slobrok(), + _srcServer(MessageBusParams() + .setRetryPolicy(IRetryPolicy::SP()) + .addProtocol(IProtocol::SP(new SimpleProtocol())), + RPCNetworkParams() + .setSlobrokConfig(_slobrok.config())), + _srcSession(), + _srcHandler(), + _dstServer(MessageBusParams() + .addProtocol(IProtocol::SP(new SimpleProtocol())), + RPCNetworkParams() + .setIdentity(Identity("dst")) + .setSlobrokConfig(_slobrok.config())), + _dstSession(), + _dstHandler() +{ + // empty +} + +bool +TestData::start() +{ + _srcSession = _srcServer.mb.createSourceSession(SourceSessionParams() + .setThrottlePolicy(IThrottlePolicy::SP()) + .setReplyHandler(_srcHandler)); + if (_srcSession.get() == NULL) { + return false; + } + _dstSession = _dstServer.mb.createDestinationSession(DestinationSessionParams() + .setName("session") + .setMessageHandler(_dstHandler)); + if (_dstSession.get() == NULL) { + return false; + } + if (!_srcServer.waitSlobrok("dst/session", 1u)) { + return false; + } + return true; +} + +Message::UP +Test::createMessage(const string &msg) +{ + Message::UP ret(new SimpleMessage(msg)); + ret->getTrace().setLevel(9); + return ret; +} + +int +Test::Main() +{ + TEST_INIT("choke_test"); + + TestData data; + ASSERT_TRUE(data.start()); + + testMaxCount(data); TEST_FLUSH(); + testMaxSize(data); TEST_FLUSH(); + + TEST_DONE(); +} + +static const double TIMEOUT = 120; + +//////////////////////////////////////////////////////////////////////////////// +// +// Tests +// +//////////////////////////////////////////////////////////////////////////////// + +void +Test::testMaxCount(TestData &data) +{ + uint32_t max = 10; + data._dstServer.mb.setMaxPendingCount(max); + std::vector<Message*> lst; + for (uint32_t i = 0; i < max * 2; ++i) { + if (i < max) { + EXPECT_EQUAL(i, data._dstServer.mb.getPendingCount()); + } else { + EXPECT_EQUAL(max, data._dstServer.mb.getPendingCount()); + } + EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted()); + if (i < max) { + Message::UP msg = data._dstHandler.getMessage(TIMEOUT); + ASSERT_TRUE(msg.get() != NULL); + lst.push_back(msg.release()); + } else { + Reply::UP reply = data._srcHandler.getReply(); + ASSERT_TRUE(reply.get() != NULL); + EXPECT_EQUAL(1u, reply->getNumErrors()); + EXPECT_EQUAL((uint32_t)ErrorCode::SESSION_BUSY, reply->getError(0).getCode()); + } + } + for (uint32_t i = 0; i < 5; ++i) { + Message::UP msg(lst[0]); + lst.erase(lst.begin()); + data._dstSession->acknowledge(std::move(msg)); + + Reply::UP reply = data._srcHandler.getReply(); + ASSERT_TRUE(reply.get() != NULL); + EXPECT_TRUE(!reply->hasErrors()); + msg = reply->getMessage(); + ASSERT_TRUE(msg.get() != NULL); + EXPECT_TRUE(data._srcSession->send(std::move(msg), Route::parse("dst/session")).isAccepted()); + + msg = data._dstHandler.getMessage(TIMEOUT); + ASSERT_TRUE(msg.get() != NULL); + lst.push_back(msg.release()); + } + while (!lst.empty()) { + EXPECT_EQUAL(lst.size(), data._dstServer.mb.getPendingCount()); + Message::UP msg(lst[0]); + lst.erase(lst.begin()); + data._dstSession->acknowledge(std::move(msg)); + + Reply::UP reply = data._srcHandler.getReply(); + ASSERT_TRUE(reply.get() != NULL); + EXPECT_TRUE(!reply->hasErrors()); + } + EXPECT_EQUAL(0u, data._dstServer.mb.getPendingCount()); +} + +void +Test::testMaxSize(TestData &data) +{ + uint32_t size = createMessage("msg")->getApproxSize(); + uint32_t max = size * 10; + data._dstServer.mb.setMaxPendingSize(max); + std::vector<Message*> lst; + for (uint32_t i = 0; i < max * 2; i += size) { + if (i < max) { + EXPECT_EQUAL(i, data._dstServer.mb.getPendingSize()); + } else { + EXPECT_EQUAL(max, data._dstServer.mb.getPendingSize()); + } + EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted()); + if (i < max) { + Message::UP msg = data._dstHandler.getMessage(TIMEOUT); + ASSERT_TRUE(msg.get() != NULL); + lst.push_back(msg.release()); + } else { + Reply::UP reply = data._srcHandler.getReply(); + ASSERT_TRUE(reply.get() != NULL); + EXPECT_EQUAL(1u, reply->getNumErrors()); + EXPECT_EQUAL((uint32_t)ErrorCode::SESSION_BUSY, reply->getError(0).getCode()); + } + } + for (uint32_t i = 0; i < 5; ++i) { + Message::UP msg(lst[0]); + lst.erase(lst.begin()); + data._dstSession->acknowledge(std::move(msg)); + + Reply::UP reply = data._srcHandler.getReply(); + ASSERT_TRUE(reply.get() != NULL); + EXPECT_TRUE(!reply->hasErrors()); + msg = reply->getMessage(); + ASSERT_TRUE(msg.get() != NULL); + EXPECT_TRUE(data._srcSession->send(std::move(msg), Route::parse("dst/session")).isAccepted()); + + msg = data._dstHandler.getMessage(TIMEOUT); + ASSERT_TRUE(msg.get() != NULL); + lst.push_back(msg.release()); + } + while (!lst.empty()) { + EXPECT_EQUAL(size * lst.size(), data._dstServer.mb.getPendingSize()); + Message::UP msg(lst[0]); + lst.erase(lst.begin()); + data._dstSession->acknowledge(std::move(msg)); + + Reply::UP reply = data._srcHandler.getReply(); + ASSERT_TRUE(reply.get() != NULL); + EXPECT_TRUE(!reply->hasErrors()); + } + EXPECT_EQUAL(0u, data._dstServer.mb.getPendingSize()); +} |