diff options
Diffstat (limited to 'messagebus/src/tests/sourcesession/sourcesession.cpp')
-rw-r--r-- | messagebus/src/tests/sourcesession/sourcesession.cpp | 339 |
1 files changed, 339 insertions, 0 deletions
diff --git a/messagebus/src/tests/sourcesession/sourcesession.cpp b/messagebus/src/tests/sourcesession/sourcesession.cpp new file mode 100644 index 00000000000..f70789d81af --- /dev/null +++ b/messagebus/src/tests/sourcesession/sourcesession.cpp @@ -0,0 +1,339 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("sourcesession_test"); + +#include <vespa/messagebus/destinationsession.h> +#include <vespa/messagebus/error.h> +#include <vespa/messagebus/errorcode.h> +#include <vespa/messagebus/emptyreply.h> +#include <vespa/messagebus/messagebus.h> +#include <vespa/messagebus/routablequeue.h> +#include <vespa/messagebus/sourcesession.h> +#include <vespa/messagebus/sourcesessionparams.h> +#include <vespa/messagebus/routing/retrytransienterrorspolicy.h> +#include <vespa/messagebus/routing/routingcontext.h> +#include <vespa/messagebus/routing/routingspec.h> +#include <vespa/messagebus/testlib/simplemessage.h> +#include <vespa/messagebus/testlib/simpleprotocol.h> +#include <vespa/messagebus/testlib/simplereply.h> +#include <vespa/messagebus/testlib/slobrok.h> +#include <vespa/messagebus/testlib/testserver.h> +#include <vespa/vespalib/testkit/testapp.h> + +using namespace mbus; + +struct DelayedHandler : public IMessageHandler +{ + DestinationSession::UP session; + uint32_t delay; + + DelayedHandler(MessageBus &mb, uint32_t d) : session(), delay(d) { + session = mb.createDestinationSession("session", true, *this); + } + ~DelayedHandler() { + session.reset(); + } + virtual void handleMessage(Message::UP msg) { + // this will block the transport thread in the server messagebus, + // but that should be ok, as we only want to test the timing in the + // client messagebus... + FastOS_Thread::Sleep(delay); + session->acknowledge(std::move(msg)); + } +}; + +RoutingSpec getRouting() { + return RoutingSpec() + .addTable(RoutingTableSpec("Simple") + .addHop(HopSpec("dst", "dst/session")) + .addRoute(RouteSpec("dst").addHop("dst"))); +} + +RoutingSpec getBadRouting() { + return RoutingSpec() + .addTable(RoutingTableSpec("Simple") + .addHop(HopSpec("dst", "dst/session")) + .addRoute(RouteSpec("dst").addHop("dst"))); +} + +bool waitQueueSize(RoutableQueue &queue, uint32_t size) { + for (uint32_t i = 0; i < 60000; ++i) { + if (queue.size() == size) { + return true; + } + FastOS_Thread::Sleep(1); + } + return false; +} + +class Test : public vespalib::TestApp +{ +public: + void testSequencing(); + void testResendError(); + void testResendConnDown(); + void testIllegalRoute(); + void testNoServices(); + void testBlockingClose(); + void testNonBlockingClose(); + int Main(); +}; + +void +Test::testSequencing() +{ + Slobrok slobrok; + TestServer src(Identity(""), getRouting(), slobrok); + TestServer dst(Identity("dst"), getRouting(), slobrok); + + RoutableQueue srcQ; + RoutableQueue dstQ; + + SourceSessionParams params; + params.setThrottlePolicy(IThrottlePolicy::SP()); + + SourceSession::UP ss = src.mb.createSourceSession(srcQ, params); + DestinationSession::UP ds = dst.mb.createDestinationSession("session", true, dstQ); + + ASSERT_TRUE(src.waitSlobrok("dst/session")); + + EXPECT_TRUE(ss->send(Message::UP(new SimpleMessage("foo", true, 1)), "dst").isAccepted()); + EXPECT_TRUE(ss->send(Message::UP(new SimpleMessage("foo", true, 2)), "dst").isAccepted()); + EXPECT_TRUE(ss->send(Message::UP(new SimpleMessage("foo", true, 1)), "dst").isAccepted()); + EXPECT_TRUE(waitQueueSize(dstQ, 2)); + FastOS_Thread::Sleep(250); + EXPECT_TRUE(waitQueueSize(dstQ, 2)); + EXPECT_TRUE(waitQueueSize(srcQ, 0)); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + EXPECT_TRUE(waitQueueSize(srcQ, 2)); + EXPECT_TRUE(waitQueueSize(dstQ, 1)); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ASSERT_TRUE(waitQueueSize(srcQ, 3)); + ASSERT_TRUE(waitQueueSize(dstQ, 0)); +} + +void +Test::testResendError() +{ + Slobrok slobrok; + RetryTransientErrorsPolicy::SP retryPolicy(new RetryTransientErrorsPolicy()); + retryPolicy->setBaseDelay(0); + TestServer src(MessageBusParams().addProtocol(IProtocol::SP(new SimpleProtocol())).setRetryPolicy(retryPolicy), + RPCNetworkParams().setSlobrokConfig(slobrok.config())); + src.mb.setupRouting(getRouting()); + TestServer dst(Identity("dst"), getRouting(), slobrok); + + RoutableQueue srcQ; + RoutableQueue dstQ; + + SourceSession::UP ss = src.mb.createSourceSession(srcQ); + DestinationSession::UP ds = dst.mb.createDestinationSession("session", true, dstQ); + + ASSERT_TRUE(src.waitSlobrok("dst/session")); + + { + Message::UP msg(new SimpleMessage("foo")); + msg->getTrace().setLevel(9); + EXPECT_TRUE(ss->send(std::move(msg), "dst").isAccepted()); + } + EXPECT_TRUE(waitQueueSize(dstQ, 1)); + { + Routable::UP r = dstQ.dequeue(0); + Reply::UP reply(new EmptyReply()); + r->swapState(*reply); + reply->addError(Error(ErrorCode::FATAL_ERROR, "error")); + ds->reply(std::move(reply)); + } + EXPECT_TRUE(waitQueueSize(srcQ, 1)); + EXPECT_TRUE(waitQueueSize(dstQ, 0)); + + { + Message::UP msg(new SimpleMessage("foo")); + msg->getTrace().setLevel(9); + EXPECT_TRUE(ss->send(std::move(msg), "dst").isAccepted()); + } + EXPECT_TRUE(waitQueueSize(dstQ, 1)); + { + Routable::UP r = dstQ.dequeue(0); + Reply::UP reply(new EmptyReply()); + r->swapState(*reply); + reply->addError(Error(ErrorCode::TRANSIENT_ERROR, "error")); + ds->reply(std::move(reply)); + } + EXPECT_TRUE(waitQueueSize(dstQ, 1)); + EXPECT_TRUE(waitQueueSize(srcQ, 1)); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ASSERT_TRUE(waitQueueSize(srcQ, 2)); + ASSERT_TRUE(waitQueueSize(dstQ, 0)); + { + string trace1 = srcQ.dequeue(0)->getTrace().toString(); + string trace2 = srcQ.dequeue(0)->getTrace().toString(); + fprintf(stderr, "\nTRACE DUMP:\n%s\n\n", trace1.c_str()); + fprintf(stderr, "\nTRACE DUMP:\n%s\n\n", trace2.c_str()); + } +} + +void +Test::testResendConnDown() +{ + Slobrok slobrok; + RetryTransientErrorsPolicy::SP retryPolicy(new RetryTransientErrorsPolicy()); + retryPolicy->setBaseDelay(0); + TestServer src(MessageBusParams().addProtocol(IProtocol::SP(new SimpleProtocol())).setRetryPolicy(retryPolicy), + RPCNetworkParams().setSlobrokConfig(slobrok.config())); + src.mb.setupRouting(RoutingSpec().addTable(RoutingTableSpec(SimpleProtocol::NAME) + .addHop(HopSpec("dst", "dst2/session")) + .addHop(HopSpec("pxy", "[All]").addRecipient("dst")) + .addRoute(RouteSpec("dst").addHop("pxy")))); + RoutableQueue srcQ; + SourceSession::UP ss = src.mb.createSourceSession(srcQ); + + TestServer dst(Identity("dst"), RoutingSpec(), slobrok); + RoutableQueue dstQ; + DestinationSession::UP ds = dst.mb.createDestinationSession("session", true, dstQ); + ASSERT_TRUE(src.waitSlobrok("dst/session", 1)); + + { + TestServer dst2(Identity("dst2"), RoutingSpec(), slobrok); + RoutableQueue dst2Q; + DestinationSession::UP ds2 = dst2.mb.createDestinationSession("session", true, dst2Q); + ASSERT_TRUE(src.waitSlobrok("dst2/session", 1)); + + Message::UP msg(new SimpleMessage("foo")); + msg->getTrace().setLevel(9); + EXPECT_TRUE(ss->send(std::move(msg), "dst").isAccepted()); + EXPECT_TRUE(waitQueueSize(dst2Q, 1)); + Routable::UP obj = dst2Q.dequeue(0); + obj->discard(); + src.mb.setupRouting(RoutingSpec().addTable(RoutingTableSpec(SimpleProtocol::NAME) + .addHop(HopSpec("dst", "dst/session")))); + } // dst2 goes down, resend with new config + + ASSERT_TRUE(waitQueueSize(dstQ, 1)); // fails + ASSERT_TRUE(waitQueueSize(srcQ, 0)); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ASSERT_TRUE(waitQueueSize(srcQ, 1)); + ASSERT_TRUE(waitQueueSize(dstQ, 0)); + + string trace = srcQ.dequeue(0)->getTrace().toString(); + fprintf(stderr, "\nTRACE DUMP:\n%s\n\n", trace.c_str()); +} + +void +Test::testIllegalRoute() +{ + Slobrok slobrok; + TestServer src(MessageBusParams() + .addProtocol(IProtocol::SP(new SimpleProtocol())) + .setRetryPolicy(IRetryPolicy::SP()), + RPCNetworkParams() + .setSlobrokConfig(slobrok.config())); + src.mb.setupRouting(getRouting()); + + RoutableQueue srcQ; + SourceSession::UP ss = src.mb.createSourceSession(srcQ, SourceSessionParams()); + { + // no such hop + Message::UP msg(new SimpleMessage("foo")); + msg->getTrace().setLevel(9); + msg->setRoute(Route::parse("bogus")); + EXPECT_TRUE(ss->send(std::move(msg)).isAccepted()); + } + ASSERT_TRUE(waitQueueSize(srcQ, 1)); + { + while (srcQ.size() > 0) { + Routable::UP routable = srcQ.dequeue(0); + ASSERT_TRUE(routable->isReply()); + Reply::UP r(static_cast<Reply*>(routable.release())); + EXPECT_EQUAL(1u, r->getNumErrors()); + EXPECT_EQUAL((uint32_t)ErrorCode::NO_ADDRESS_FOR_SERVICE, r->getError(0).getCode()); + string trace = r->getTrace().toString(); + fprintf(stderr, "\nTRACE DUMP:\n%s\n\n", trace.c_str()); + } + } +} + +void +Test::testNoServices() +{ + Slobrok slobrok; + TestServer src(MessageBusParams() + .addProtocol(IProtocol::SP(new SimpleProtocol())) + .setRetryPolicy(IRetryPolicy::SP()), + RPCNetworkParams() + .setSlobrokConfig(slobrok.config())); + src.mb.setupRouting(getBadRouting()); + + RoutableQueue srcQ; + SourceSession::UP ss = src.mb.createSourceSession(srcQ); + { + // no services for hop + Message::UP msg(new SimpleMessage("foo")); + msg->getTrace().setLevel(9); + EXPECT_TRUE(ss->send(std::move(msg), "dst").isAccepted()); + } + ASSERT_TRUE(waitQueueSize(srcQ, 1)); + { + while (srcQ.size() > 0) { + Routable::UP routable = srcQ.dequeue(0); + ASSERT_TRUE(routable->isReply()); + Reply::UP r(static_cast<Reply*>(routable.release())); + EXPECT_TRUE(r->getNumErrors() == 1); + EXPECT_TRUE(r->getError(0).getCode() == ErrorCode::NO_ADDRESS_FOR_SERVICE); + string trace = r->getTrace().toString(); + fprintf(stderr, "\nTRACE DUMP:\n%s\n\n", trace.c_str()); + } + } +} + +void +Test::testBlockingClose() +{ + Slobrok slobrok; + TestServer src(Identity(""), getRouting(), slobrok); + TestServer dst(Identity("dst"), getRouting(), slobrok); + + RoutableQueue srcQ; + DelayedHandler dstH(dst.mb, 1000); + ASSERT_TRUE(src.waitSlobrok("dst/session")); + + SourceSessionParams params; + SourceSession::UP ss = src.mb.createSourceSession(srcQ, params); + + EXPECT_TRUE(ss->send(Message::UP(new SimpleMessage("foo")), "dst").isAccepted()); + ss->close(); + srcQ.handleMessage(Message::UP(new SimpleMessage("bogus"))); + Routable::UP routable = srcQ.dequeue(0); + EXPECT_TRUE(routable->isReply()); +} + +void +Test::testNonBlockingClose() +{ + Slobrok slobrok; + TestServer src(Identity(""), getRouting(), slobrok); + + RoutableQueue srcQ; + + SourceSessionParams params; + SourceSession::UP ss = src.mb.createSourceSession(srcQ, params); + ss->close(); // this should not hang +} + +int +Test::Main() +{ + TEST_INIT("sourcesession_test"); + testSequencing(); TEST_FLUSH(); + testResendError(); TEST_FLUSH(); + testResendConnDown(); TEST_FLUSH(); + testIllegalRoute(); TEST_FLUSH(); + testNoServices(); TEST_FLUSH(); + testBlockingClose(); TEST_FLUSH(); + testNonBlockingClose(); TEST_FLUSH(); + TEST_DONE(); +} + +TEST_APPHOOK(Test); |