diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-01 14:15:34 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-01 16:50:21 +0000 |
commit | 554f5abc565b5e6c2d8996574857c25f91c02ee1 (patch) | |
tree | 305f278ffc1cf9f613d7f33a30a0ab454c371592 /messagebus/src | |
parent | 83f42c8c3a106f0685deb175d57d4f725185df5d (diff) |
- Redo the servicepool to resolve addresses first time and not loadbalance.
- Make it thread safe.
- Remove any loadbalancing tests
- Assert that no loadbalancing is requested.
Diffstat (limited to 'messagebus/src')
19 files changed, 206 insertions, 528 deletions
diff --git a/messagebus/src/tests/CMakeLists.txt b/messagebus/src/tests/CMakeLists.txt index cb2a403f55d..e05f732d8b4 100644 --- a/messagebus/src/tests/CMakeLists.txt +++ b/messagebus/src/tests/CMakeLists.txt @@ -9,7 +9,6 @@ add_subdirectory(context) add_subdirectory(emptyreply) add_subdirectory(error) add_subdirectory(identity) -add_subdirectory(loadbalance) add_subdirectory(messagebus) add_subdirectory(messageordering) add_subdirectory(messenger) diff --git a/messagebus/src/tests/loadbalance/.gitignore b/messagebus/src/tests/loadbalance/.gitignore deleted file mode 100644 index d1cbb5977f1..00000000000 --- a/messagebus/src/tests/loadbalance/.gitignore +++ /dev/null @@ -1,4 +0,0 @@ -.depend -Makefile -loadbalance_test -messagebus_loadbalance_test_app diff --git a/messagebus/src/tests/loadbalance/CMakeLists.txt b/messagebus/src/tests/loadbalance/CMakeLists.txt deleted file mode 100644 index e249a8284a6..00000000000 --- a/messagebus/src/tests/loadbalance/CMakeLists.txt +++ /dev/null @@ -1,9 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_executable(messagebus_loadbalance_test_app TEST - SOURCES - loadbalance.cpp - DEPENDS - messagebus_messagebus-test - messagebus -) -vespa_add_test(NAME messagebus_loadbalance_test_app COMMAND messagebus_loadbalance_test_app) diff --git a/messagebus/src/tests/loadbalance/loadbalance.cpp b/messagebus/src/tests/loadbalance/loadbalance.cpp deleted file mode 100644 index 05ea6d78871..00000000000 --- a/messagebus/src/tests/loadbalance/loadbalance.cpp +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include <vespa/vespalib/testkit/testapp.h> -#include <vespa/messagebus/destinationsession.h> -#include <vespa/messagebus/intermediatesession.h> -#include <vespa/messagebus/messagebus.h> -#include <vespa/messagebus/routablequeue.h> -#include <vespa/messagebus/sourcesession.h> -#include <vespa/messagebus/sourcesessionparams.h> -#include <vespa/messagebus/testlib/receptor.h> -#include <vespa/messagebus/routing/routingspec.h> -#include <vespa/messagebus/testlib/simplemessage.h> -#include <vespa/messagebus/testlib/simplereply.h> -#include <vespa/messagebus/testlib/simpleprotocol.h> -#include <vespa/messagebus/testlib/slobrok.h> -#include <vespa/messagebus/testlib/testserver.h> - -using namespace mbus; -using namespace std::chrono_literals; - -struct Handler : public IMessageHandler -{ - DestinationSession::UP session; - uint32_t cnt; - - Handler(MessageBus &mb) : session(), cnt(0) { - session = mb.createDestinationSession("session", true, *this); - } - ~Handler() { - session.reset(); - } - void handleMessage(Message::UP msg) override { - ++cnt; - session->acknowledge(std::move(msg)); - } -}; - -RoutingSpec getRouting() { - return RoutingSpec() - .addTable(RoutingTableSpec("Simple") - .addHop(HopSpec("dst", "test/*/session")) - .addRoute(RouteSpec("test").addHop("dst"))); -} - -TEST_SETUP(Test); - -int -Test::Main() -{ - TEST_INIT("loadbalance_test"); - - Slobrok slobrok; - TestServer src(Identity(""), getRouting(), slobrok); - TestServer dst1(Identity("test/dst1"), getRouting(), slobrok); - TestServer dst2(Identity("test/dst2"), getRouting(), slobrok); - TestServer dst3(Identity("test/dst3"), getRouting(), slobrok); - - Handler h1(dst1.mb); - Handler h2(dst2.mb); - Handler h3(dst3.mb); - - ASSERT_TRUE(src.waitSlobrok("test/dst1/session")); - ASSERT_TRUE(src.waitSlobrok("test/dst2/session")); - ASSERT_TRUE(src.waitSlobrok("test/dst3/session")); - - RoutableQueue queue; - SourceSessionParams params; - params.setTimeout(30s); - params.setThrottlePolicy(IThrottlePolicy::SP()); - SourceSession::UP ss = src.mb.createSourceSession(queue, params); - - uint32_t msgCnt = 90; - ASSERT_TRUE(msgCnt % 3 == 0); - for (uint32_t i = 0; i < msgCnt; ++i) { - ss->send(Message::UP(new SimpleMessage("test")), "test"); - } - for (uint32_t i = 0; i < 1000; ++i) { - if (queue.size() == msgCnt) { - break; - } - std::this_thread::sleep_for(10ms); - } - EXPECT_TRUE(queue.size() == msgCnt); - EXPECT_TRUE(h1.cnt == msgCnt / 3); - EXPECT_TRUE(h2.cnt == msgCnt / 3); - EXPECT_TRUE(h3.cnt == msgCnt / 3); - TEST_DONE(); -} diff --git a/messagebus/src/tests/messagebus/messagebus.cpp b/messagebus/src/tests/messagebus/messagebus.cpp index 86c7bf91f2a..367bfc997d0 100644 --- a/messagebus/src/tests/messagebus/messagebus.cpp +++ b/messagebus/src/tests/messagebus/messagebus.cpp @@ -112,13 +112,10 @@ public: Test(); ~Test(); int Main() override; - void testSendToAny(); void testSendToCol(); - void testSendToAnyThenCol(); void testDirectHop(); void testDirectRoute(); void testRoutingPolicyCache(); - void debugTrace(); private: void setup(); @@ -131,21 +128,18 @@ private: TEST_APPHOOK(Test); -Test::Test() {} -Test::~Test() {} +Test::Test() = default; +Test::~Test() = default; int Test::Main() { TEST_INIT("messagebus_test"); - testSendToAny(); TEST_FLUSH(); testSendToCol(); TEST_FLUSH(); - testSendToAnyThenCol(); TEST_FLUSH(); testDirectHop(); TEST_FLUSH(); testDirectRoute(); TEST_FLUSH(); testRoutingPolicyCache(); TEST_FLUSH(); - debugTrace(); TEST_FLUSH(); TEST_DONE(); } @@ -206,38 +200,6 @@ void Test::teardown() } void -Test::testSendToAny() -{ - setup(); - for (uint32_t i = 0; i < 300; ++i) { - Message::UP msg(new SimpleMessage("test")); - EXPECT_TRUE(client->session->send(std::move(msg), "DocProc").isAccepted()); - } - EXPECT_TRUE(dp0->waitQueueSize(100)); - EXPECT_TRUE(dp1->waitQueueSize(100)); - EXPECT_TRUE(dp2->waitQueueSize(100)); - for (uint32_t i = 0; i < dpVec.size(); ++i) { - DocProc *p = dpVec[i]; - while (p->queue.size() > 0) { - Routable::UP msg = p->queue.dequeue(); - ASSERT_TRUE(msg); - Reply::UP reply(new EmptyReply()); - msg->swapState(*reply); - reply->addError(Error(ErrorCode::FATAL_ERROR, "")); - p->session->forward(std::move(reply)); - } - } - EXPECT_TRUE(client->waitQueueSize(300)); - while (client->queue.size() > 0) { - Routable::UP reply = client->queue.dequeue(); - ASSERT_TRUE(reply); - ASSERT_TRUE(reply->isReply()); - EXPECT_TRUE(static_cast<Reply&>(*reply).getNumErrors() == 1); - } - teardown(); -} - -void Test::testSendToCol() { setup(); @@ -282,83 +244,6 @@ Test::testSendToCol() } void -Test::testSendToAnyThenCol() -{ - setup(); - ASSERT_TRUE(SimpleMessage("msg").getHash() % 2 == 0); - for (uint32_t i = 0; i < 150; ++i) { - Message::UP msg(new SimpleMessage("msg")); - EXPECT_TRUE(client->session->send(std::move(msg), "Index").isAccepted()); - } - EXPECT_TRUE(dp0->waitQueueSize(50)); - EXPECT_TRUE(dp1->waitQueueSize(50)); - EXPECT_TRUE(dp2->waitQueueSize(50)); - for (uint32_t i = 0; i < dpVec.size(); ++i) { - DocProc *p = dpVec[i]; - while (p->queue.size() > 0) { - Routable::UP r = p->queue.dequeue(); - ASSERT_TRUE(r); - p->session->forward(std::move(r)); - } - } - EXPECT_TRUE(search00->waitQueueSize(150)); - EXPECT_TRUE(search01->waitQueueSize(0)); - EXPECT_TRUE(search10->waitQueueSize(150)); - EXPECT_TRUE(search11->waitQueueSize(0)); - ASSERT_TRUE(SimpleMessage("msh").getHash() % 2 == 1); - for (uint32_t i = 0; i < 150; ++i) { - Message::UP msg(new SimpleMessage("msh")); - ASSERT_TRUE(client->session->send(std::move(msg), "Index").isAccepted()); - } - EXPECT_TRUE(dp0->waitQueueSize(50)); - EXPECT_TRUE(dp1->waitQueueSize(50)); - EXPECT_TRUE(dp2->waitQueueSize(50)); - for (uint32_t i = 0; i < dpVec.size(); ++i) { - DocProc *p = dpVec[i]; - while (p->queue.size() > 0) { - Routable::UP r = p->queue.dequeue(); - ASSERT_TRUE(r); - p->session->forward(std::move(r)); - } - } - EXPECT_TRUE(search00->waitQueueSize(150)); - EXPECT_TRUE(search01->waitQueueSize(150)); - EXPECT_TRUE(search10->waitQueueSize(150)); - EXPECT_TRUE(search11->waitQueueSize(150)); - for (uint32_t i = 0; i < searchVec.size(); ++i) { - Search *s = searchVec[i]; - while (s->queue.size() > 0) { - Routable::UP msg = s->queue.dequeue(); - ASSERT_TRUE(msg); - Reply::UP reply(new EmptyReply()); - msg->swapState(*reply); - s->session->reply(std::move(reply)); - } - } - EXPECT_TRUE(dp0->waitQueueSize(100)); - EXPECT_TRUE(dp1->waitQueueSize(100)); - EXPECT_TRUE(dp2->waitQueueSize(100)); - for (uint32_t i = 0; i < dpVec.size(); ++i) { - DocProc *p = dpVec[i]; - while (p->queue.size() > 0) { - Routable::UP r = p->queue.dequeue(); - ASSERT_TRUE(r); - p->session->forward(std::move(r)); - } - } - client->waitQueueSize(300); - std::this_thread::sleep_for(100ms); - client->waitQueueSize(300); - while (client->queue.size() > 0) { - Routable::UP reply = client->queue.dequeue(); - ASSERT_TRUE(reply); - ASSERT_TRUE(reply->isReply()); - EXPECT_TRUE(static_cast<Reply&>(*reply).getNumErrors() == 0); - } - teardown(); -} - -void Test::testDirectHop() { setup(); @@ -468,65 +353,3 @@ Test::testRoutingPolicyCache() teardown(); } - -void -Test::debugTrace() -{ - setup(); - ASSERT_TRUE(SimpleMessage("msg").getHash() % 2 == 0); - for (uint32_t i = 0; i < 3; ++i) { - Message::UP msg(new SimpleMessage("msg")); - msg->getTrace().setLevel(4 + i); - EXPECT_TRUE(client->session->send(std::move(msg), "Index").isAccepted()); - } - EXPECT_TRUE(dp0->waitQueueSize(1)); - EXPECT_TRUE(dp1->waitQueueSize(1)); - EXPECT_TRUE(dp2->waitQueueSize(1)); - for (uint32_t i = 0; i < dpVec.size(); ++i) { - DocProc *p = dpVec[i]; - while (p->queue.size() > 0) { - Routable::UP r = p->queue.dequeue(); - ASSERT_TRUE(r); - p->session->forward(std::move(r)); - } - } - EXPECT_TRUE(search00->waitQueueSize(3)); - EXPECT_TRUE(search01->waitQueueSize(0)); - EXPECT_TRUE(search10->waitQueueSize(3)); - EXPECT_TRUE(search11->waitQueueSize(0)); - for (uint32_t i = 0; i < searchVec.size(); ++i) { - Search *s = searchVec[i]; - while (s->queue.size() > 0) { - Routable::UP msg = s->queue.dequeue(); - ASSERT_TRUE(msg); - Reply::UP reply(new EmptyReply()); - msg->swapState(*reply); - s->session->reply(std::move(reply)); - } - } - EXPECT_TRUE(dp0->waitQueueSize(1)); - EXPECT_TRUE(dp1->waitQueueSize(1)); - EXPECT_TRUE(dp2->waitQueueSize(1)); - for (uint32_t i = 0; i < dpVec.size(); ++i) { - DocProc *p = dpVec[i]; - while (p->queue.size() > 0) { - Routable::UP r = p->queue.dequeue(); - ASSERT_TRUE(r); - p->session->forward(std::move(r)); - } - } - client->waitQueueSize(3); - Routable::UP reply = client->queue.dequeue(); - fprintf(stderr, "\nTRACE DUMP(level=%d):\n%s\n\n", - reply->getTrace().getLevel(), - reply->getTrace().toString().c_str()); - reply = client->queue.dequeue(); - fprintf(stderr, "\nTRACE DUMP(level=%d):\n%s\n\n", - reply->getTrace().getLevel(), - reply->getTrace().toString().c_str()); - reply = client->queue.dequeue(); - fprintf(stderr, "\nTRACE DUMP(level=%d):\n%s\n\n", - reply->getTrace().getLevel(), - reply->getTrace().toString().c_str()); - teardown(); -} diff --git a/messagebus/src/tests/serviceaddress/serviceaddress.cpp b/messagebus/src/tests/serviceaddress/serviceaddress.cpp index 441da5a80ac..770cac620a9 100644 --- a/messagebus/src/tests/serviceaddress/serviceaddress.cpp +++ b/messagebus/src/tests/serviceaddress/serviceaddress.cpp @@ -7,34 +7,49 @@ using namespace mbus; -class Test : public vespalib::TestApp { -public: - int Main() override; - void testAddrServiceAddress(); - void testNameServiceAddress(); - -private: - bool waitSlobrok(RPCNetwork &network, const string &pattern, size_t num); - bool testAddress(RPCNetwork& network, const string &pattern, - const string &expectedSpec, const string &expectedSession); - bool testNullAddress(RPCNetwork &network, const string &pattern); -}; - -int -Test::Main() +bool +waitSlobrok(RPCNetwork &network, const string &pattern, size_t num) { - TEST_INIT("serviceaddress_test"); - - testAddrServiceAddress(); TEST_FLUSH(); - testNameServiceAddress(); TEST_FLUSH(); + for (int i = 0; i < 1000; i++) { + slobrok::api::IMirrorAPI::SpecList res = network.getMirror().lookup(pattern); + if (res.size() == num) { + return true; + } + std::this_thread::sleep_for(10ms); + } + return false; +} - TEST_DONE(); +bool +testNullAddress(RPCNetwork &network, const string &pattern) +{ + RPCService service(network.getMirror(), pattern); + RPCServiceAddress::UP obj = service.resolve(); + if ( ! EXPECT_FALSE(obj)) { + return false; + } + return true; } -TEST_APPHOOK(Test); +bool +testAddress(RPCNetwork &network, const string &pattern, + const string &expectedSpec, const string &expectedSession) +{ + RPCService service(network.getMirror(), pattern); + RPCServiceAddress::UP obj = service.resolve(); + if (!EXPECT_TRUE(obj)) { + return false; + } + if (!EXPECT_EQUAL(expectedSpec, obj->getConnectionSpec())) { + return false; + } + if (!EXPECT_EQUAL(expectedSession, obj->getSessionName())) { + return false; + } + return true; +} -void -Test::testAddrServiceAddress() +TEST("testAddrServiceAddress") { Slobrok slobrok; RPCNetwork network(RPCNetworkParams(slobrok.config()) @@ -55,8 +70,7 @@ Test::testAddrServiceAddress() network.shutdown(); } -void -Test::testNameServiceAddress() +TEST("testNameServiceAddress") { Slobrok slobrok; RPCNetwork network(RPCNetworkParams(slobrok.config()) @@ -74,45 +88,4 @@ Test::testNameServiceAddress() network.shutdown(); } -bool -Test::waitSlobrok(RPCNetwork &network, const string &pattern, size_t num) -{ - for (int i = 0; i < 1000; i++) { - slobrok::api::IMirrorAPI::SpecList res = network.getMirror().lookup(pattern); - if (res.size() == num) { - return true; - } - std::this_thread::sleep_for(10ms); - } - return false; -} - -bool -Test::testNullAddress(RPCNetwork &network, const string &pattern) -{ - RPCService service(network.getMirror(), pattern); - RPCServiceAddress::UP obj = service.resolve(); - if (!EXPECT_TRUE(obj.get() == NULL)) { - return false; - } - return true; -} - -bool -Test::testAddress(RPCNetwork &network, const string &pattern, - const string &expectedSpec, const string &expectedSession) -{ - RPCService service(network.getMirror(), pattern); - RPCServiceAddress::UP obj = service.resolve(); - if (!EXPECT_TRUE(obj.get() != NULL)) { - return false; - } - if (!EXPECT_EQUAL(expectedSpec, obj->getConnectionSpec())) { - return false; - } - if (!EXPECT_EQUAL(expectedSession, obj->getSessionName())) { - return false; - } - return true; -} - +TEST_MAIN() { TEST_RUN_ALL(); }
\ No newline at end of file diff --git a/messagebus/src/tests/servicepool/servicepool.cpp b/messagebus/src/tests/servicepool/servicepool.cpp index 1334831c30c..d81ff84c36a 100644 --- a/messagebus/src/tests/servicepool/servicepool.cpp +++ b/messagebus/src/tests/servicepool/servicepool.cpp @@ -4,67 +4,59 @@ #include <vespa/messagebus/network/rpcnetworkparams.h> #include <vespa/messagebus/network/rpcservicepool.h> #include <vespa/messagebus/testlib/slobrok.h> +#include <vespa/messagebus/testlib/testserver.h> #include <vespa/vespalib/testkit/testapp.h> using namespace mbus; -class Test : public vespalib::TestApp { -private: - void testMaxSize(); - -public: - int Main() override { - TEST_INIT("servicepool_test"); - - testMaxSize(); TEST_FLUSH(); - - TEST_DONE(); - } -}; - -TEST_APPHOOK(Test); - -void -Test::testMaxSize() +TEST("testMaxSize") { Slobrok slobrok; - RPCNetwork net(RPCNetworkParams(slobrok.config())); - RPCServicePool pool(net, 2); + TestServer me(Identity("me"), RoutingSpec(), slobrok); + RPCNetwork & net = me.net; + net.registerSession("foo"); + net.registerSession("bar"); + net.registerSession("baz"); + me.waitSlobrok("me/baz"); + RPCServicePool pool(net.getMirror(), 2); net.start(); + net.waitUntilReady(30s); - pool.resolve("foo"); + RPCServiceAddress::UP addr = pool.resolve("me/foo"); EXPECT_EQUAL(1u, pool.getSize()); - EXPECT_TRUE(pool.hasService("foo")); - EXPECT_TRUE(!pool.hasService("bar")); - EXPECT_TRUE(!pool.hasService("baz")); + EXPECT_TRUE(pool.hasService("me/foo")); + EXPECT_TRUE(!pool.hasService("me/bar")); + EXPECT_TRUE(!pool.hasService("me/baz")); - pool.resolve("foo"); + addr = pool.resolve("me/foo"); EXPECT_EQUAL(1u, pool.getSize()); - EXPECT_TRUE(pool.hasService("foo")); - EXPECT_TRUE(!pool.hasService("bar")); - EXPECT_TRUE(!pool.hasService("baz")); + EXPECT_TRUE(pool.hasService("me/foo")); + EXPECT_TRUE(!pool.hasService("me/bar")); + EXPECT_TRUE(!pool.hasService("me/baz")); - pool.resolve("bar"); + addr = pool.resolve("me/bar"); EXPECT_EQUAL(2u, pool.getSize()); - EXPECT_TRUE(pool.hasService("foo")); - EXPECT_TRUE(pool.hasService("bar")); - EXPECT_TRUE(!pool.hasService("baz")); + EXPECT_TRUE(pool.hasService("me/foo")); + EXPECT_TRUE(pool.hasService("me/bar")); + EXPECT_TRUE(!pool.hasService("me/baz")); - pool.resolve("baz"); + addr = pool.resolve("me/baz"); EXPECT_EQUAL(2u, pool.getSize()); - EXPECT_TRUE(!pool.hasService("foo")); - EXPECT_TRUE(pool.hasService("bar")); - EXPECT_TRUE(pool.hasService("baz")); + EXPECT_TRUE(!pool.hasService("me/foo")); + EXPECT_TRUE(pool.hasService("me/bar")); + EXPECT_TRUE(pool.hasService("me/baz")); - pool.resolve("bar"); + addr = pool.resolve("me/bar"); EXPECT_EQUAL(2u, pool.getSize()); - EXPECT_TRUE(!pool.hasService("foo")); - EXPECT_TRUE(pool.hasService("bar")); - EXPECT_TRUE(pool.hasService("baz")); + EXPECT_TRUE(!pool.hasService("me/foo")); + EXPECT_TRUE(pool.hasService("me/bar")); + EXPECT_TRUE(pool.hasService("me/baz")); - pool.resolve("foo"); + addr = pool.resolve("me/foo"); EXPECT_EQUAL(2u, pool.getSize()); - EXPECT_TRUE(pool.hasService("foo")); - EXPECT_TRUE(pool.hasService("bar")); - EXPECT_TRUE(!pool.hasService("baz")); + EXPECT_TRUE(pool.hasService("me/foo")); + EXPECT_TRUE(pool.hasService("me/bar")); + EXPECT_TRUE(!pool.hasService("me/baz")); } + +TEST_MAIN() { TEST_RUN_ALL(); }
\ No newline at end of file diff --git a/messagebus/src/tests/shutdown/shutdown.cpp b/messagebus/src/tests/shutdown/shutdown.cpp index e415622707f..07d9f0fae5d 100644 --- a/messagebus/src/tests/shutdown/shutdown.cpp +++ b/messagebus/src/tests/shutdown/shutdown.cpp @@ -12,30 +12,9 @@ using namespace mbus; -class Test : public vespalib::TestApp { -private: - void requireThatListenFailedIsExceptionSafe(); - void requireThatShutdownOnSourceWithPendingIsSafe(); - void requireThatShutdownOnIntermediateWithPendingIsSafe(); - -public: - int Main() override { - TEST_INIT("shutdown_test"); - - requireThatListenFailedIsExceptionSafe(); TEST_FLUSH(); - requireThatShutdownOnSourceWithPendingIsSafe(); TEST_FLUSH(); - requireThatShutdownOnIntermediateWithPendingIsSafe(); TEST_FLUSH(); - - TEST_DONE(); - } -}; - static const duration TIMEOUT = 120s; -TEST_APPHOOK(Test); - -void -Test::requireThatListenFailedIsExceptionSafe() +TEST("requireThatListenFailedIsExceptionSafe") { fnet::frt::StandaloneFRT orb; ASSERT_TRUE(orb.supervisor().Listen(0)); @@ -51,8 +30,7 @@ Test::requireThatListenFailedIsExceptionSafe() } } -void -Test::requireThatShutdownOnSourceWithPendingIsSafe() +TEST("requireThatShutdownOnSourceWithPendingIsSafe") { Slobrok slobrok; TestServer dstServer(MessageBusParams() @@ -87,8 +65,7 @@ Test::requireThatShutdownOnSourceWithPendingIsSafe() } } -void -Test::requireThatShutdownOnIntermediateWithPendingIsSafe() +TEST("requireThatShutdownOnIntermediateWithPendingIsSafe") { Slobrok slobrok; TestServer dstServer(MessageBusParams() @@ -114,7 +91,7 @@ Test::requireThatShutdownOnIntermediateWithPendingIsSafe() ASSERT_TRUE(srcServer.waitSlobrok("dst/session", 1)); for (uint32_t i = 0; i < 10; ++i) { - Message::UP msg(new SimpleMessage("msg")); + Message::UP msg = std::make_unique<SimpleMessage>("msg"); { TestServer itrServer(MessageBusParams() .setRetryPolicy(std::make_shared<RetryTransientErrorsPolicy>()) @@ -141,3 +118,5 @@ Test::requireThatShutdownOnIntermediateWithPendingIsSafe() dstServer.mb.sync(); } } + +TEST_MAIN() { TEST_RUN_ALL(); }
\ No newline at end of file diff --git a/messagebus/src/tests/targetpool/targetpool.cpp b/messagebus/src/tests/targetpool/targetpool.cpp index 0e0e566f2be..9259f992d6c 100644 --- a/messagebus/src/tests/targetpool/targetpool.cpp +++ b/messagebus/src/tests/targetpool/targetpool.cpp @@ -22,12 +22,7 @@ public: } }; -TEST_SETUP(Test); - -int -Test::Main() -{ - TEST_INIT("targetpool_test"); +TEST("targetpool_test") { // Necessary setup to be able to resolve targets. Slobrok slobrok; @@ -46,9 +41,9 @@ Test::Main() // Assert that all connections expire. RPCTarget::SP target; - ASSERT_TRUE((target = pool.getTarget(orb, adr1)).get() != NULL); target.reset(); - ASSERT_TRUE((target = pool.getTarget(orb, adr2)).get() != NULL); target.reset(); - ASSERT_TRUE((target = pool.getTarget(orb, adr3)).get() != NULL); target.reset(); + ASSERT_TRUE((target = pool.getTarget(orb, adr1))); target.reset(); + ASSERT_TRUE((target = pool.getTarget(orb, adr2))); target.reset(); + ASSERT_TRUE((target = pool.getTarget(orb, adr3))); target.reset(); EXPECT_EQUAL(3u, pool.size()); for (uint32_t i = 0; i < 10; ++i) { pool.flushTargets(false); @@ -59,19 +54,19 @@ Test::Main() EXPECT_EQUAL(0u, pool.size()); // Assert that only idle connections expire. - ASSERT_TRUE((target = pool.getTarget(orb, adr1)).get() != NULL); target.reset(); - ASSERT_TRUE((target = pool.getTarget(orb, adr2)).get() != NULL); target.reset(); - ASSERT_TRUE((target = pool.getTarget(orb, adr3)).get() != NULL); target.reset(); + ASSERT_TRUE((target = pool.getTarget(orb, adr1))); target.reset(); + ASSERT_TRUE((target = pool.getTarget(orb, adr2))); target.reset(); + ASSERT_TRUE((target = pool.getTarget(orb, adr3))); target.reset(); EXPECT_EQUAL(3u, pool.size()); timer.millis += 444; pool.flushTargets(false); EXPECT_EQUAL(3u, pool.size()); - ASSERT_TRUE((target = pool.getTarget(orb, adr2)).get() != NULL); target.reset(); - ASSERT_TRUE((target = pool.getTarget(orb, adr3)).get() != NULL); target.reset(); + ASSERT_TRUE((target = pool.getTarget(orb, adr2))); target.reset(); + ASSERT_TRUE((target = pool.getTarget(orb, adr3))); target.reset(); timer.millis += 444; pool.flushTargets(false); EXPECT_EQUAL(2u, pool.size()); - ASSERT_TRUE((target = pool.getTarget(orb, adr3)).get() != NULL); target.reset(); + ASSERT_TRUE((target = pool.getTarget(orb, adr3))); target.reset(); timer.millis += 444; pool.flushTargets(false); EXPECT_EQUAL(1u, pool.size()); @@ -80,7 +75,7 @@ Test::Main() EXPECT_EQUAL(0u, pool.size()); // Assert that connections never expire while they are referenced. - ASSERT_TRUE((target = pool.getTarget(orb, adr1)).get() != NULL); + ASSERT_TRUE((target = pool.getTarget(orb, adr1))); EXPECT_EQUAL(1u, pool.size()); for (int i = 0; i < 10; ++i) { timer.millis += 999; @@ -91,6 +86,6 @@ Test::Main() timer.millis += 999; pool.flushTargets(false); EXPECT_EQUAL(0u, pool.size()); - - TEST_DONE(); } + +TEST_MAIN() { TEST_RUN_ALL(); }
\ No newline at end of file diff --git a/messagebus/src/vespa/messagebus/callstack.cpp b/messagebus/src/vespa/messagebus/callstack.cpp index b7179e14cad..ab22f1ace34 100644 --- a/messagebus/src/vespa/messagebus/callstack.cpp +++ b/messagebus/src/vespa/messagebus/callstack.cpp @@ -20,7 +20,7 @@ CallStack::discard() } } -CallStack::~CallStack() { } +CallStack::~CallStack() = default; IReplyHandler & CallStack::pop(Reply &reply) diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index 4b498c4c014..d1c663efc39 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -26,6 +26,8 @@ LOG_SETUP(".rpcnetwork"); using vespalib::make_string; using namespace std::chrono_literals; +namespace mbus { + namespace { /** @@ -56,9 +58,22 @@ public: } }; -} // namespace <unnamed> +struct TargetPoolTask : public FNET_Task { + RPCTargetPool &_pool; -namespace mbus { + TargetPoolTask(FNET_Scheduler &scheduler, RPCTargetPool &pool) + : FNET_Task(&scheduler), + _pool(pool) + { + ScheduleNow(); + } + void PerformTask() override { + _pool.flushTargets(false); + Schedule(1.0); + } +}; + +} RPCNetwork::SendContext::SendContext(RPCNetwork &net, const Message &msg, const std::vector<RoutingNode*> &recipients) @@ -92,20 +107,6 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version) } } -RPCNetwork::TargetPoolTask::TargetPoolTask(FNET_Scheduler &scheduler, RPCTargetPool &pool) - : FNET_Task(&scheduler), - _pool(pool) -{ - ScheduleNow(); -} - -void -RPCNetwork::TargetPoolTask::PerformTask() -{ - _pool.flushTargets(false); - Schedule(1.0); -} - RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _owner(nullptr), _ident(params.getIdentity()), @@ -113,13 +114,13 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _transport(std::make_unique<FNET_Transport>()), _orb(std::make_unique<FRT_Supervisor>(_transport.get())), _scheduler(*_transport->GetScheduler()), - _targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs())), - _targetPoolTask(_scheduler, *_targetPool), - _servicePool(std::make_unique<RPCServicePool>(*this, 4096)), _slobrokCfgFactory(std::make_unique<slobrok::ConfiguratorFactory>(params.getSlobrokConfig())), _mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, *_slobrokCfgFactory)), _regAPI(std::make_unique<slobrok::api::RegisterAPI>(*_orb, *_slobrokCfgFactory)), _requestedPort(params.getListenPort()), + _targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs())), + _targetPoolTask(std::make_unique<TargetPoolTask>(_scheduler, *_targetPool)), + _servicePool(std::make_unique<RPCServicePool>(*_mirror, 4096)), _executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 65536)), _sendV1(std::make_unique<RPCSendV1>()), _sendV2(std::make_unique<RPCSendV2>()), diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index a6c2724929d..a8eb514387c 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -50,13 +50,6 @@ private: void handleVersion(const vespalib::Version *version) override; }; - struct TargetPoolTask : public FNET_Task { - RPCTargetPool &_pool; - - TargetPoolTask(FNET_Scheduler &scheduler, RPCTargetPool &pool); - void PerformTask() override; - }; - using SendAdapterMap = std::map<vespalib::Version, RPCSendAdapter*>; INetworkOwner *_owner; @@ -65,13 +58,13 @@ private: std::unique_ptr<FNET_Transport> _transport; std::unique_ptr<FRT_Supervisor> _orb; FNET_Scheduler &_scheduler; - std::unique_ptr<RPCTargetPool> _targetPool; - TargetPoolTask _targetPoolTask; - std::unique_ptr<RPCServicePool> _servicePool; std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory; std::unique_ptr<slobrok::api::IMirrorAPI> _mirror; std::unique_ptr<slobrok::api::RegisterAPI> _regAPI; int _requestedPort; + std::unique_ptr<RPCTargetPool> _targetPool; + std::unique_ptr<FNET_Task> _targetPoolTask; + std::unique_ptr<RPCServicePool> _servicePool; std::unique_ptr<vespalib::ThreadStackExecutor> _executor; std::unique_ptr<RPCSendAdapter> _sendV1; std::unique_ptr<RPCSendAdapter> _sendV2; diff --git a/messagebus/src/vespa/messagebus/network/rpcservice.cpp b/messagebus/src/vespa/messagebus/network/rpcservice.cpp index fd1b84f545f..8a7c60392bb 100644 --- a/messagebus/src/vespa/messagebus/network/rpcservice.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcservice.cpp @@ -6,36 +6,36 @@ namespace mbus { RPCService::RPCService(const Mirror &mirror, const string &pattern) : - _mirror(mirror), - _pattern(pattern), - _addressIdx(random()), - _addressGen(0), - _addressList() -{ } + _serviceName(), + _connectionSpec() +{ + if (pattern.find("tcp/") == 0) { + size_t pos = pattern.find_last_of('/'); + if (pos != string::npos && pos < pattern.size() - 1) { + RPCServiceAddress test(pattern, pattern.substr(0, pos)); + if ( ! test.isMalformed()) { + _serviceName = pattern; + _connectionSpec = pattern.substr(0, pos); + } + } + } else { + Mirror::SpecList addressList = mirror.lookup(pattern); + if (!addressList.empty()) { + assert(addressList.size() == 1); + const auto &entry = addressList[random() % addressList.size()]; + _serviceName = entry.first; + _connectionSpec = entry.second; + } + } +} RPCService::~RPCService() = default; RPCServiceAddress::UP RPCService::resolve() { - if (_pattern.find("tcp/") == 0) { - size_t pos = _pattern.find_last_of('/'); - if (pos != string::npos && pos < _pattern.size() - 1) { - auto ret = std::make_unique<RPCServiceAddress>(_pattern, _pattern.substr(0, pos)); - if (!ret->isMalformed()) { - return ret; - } - } - } else { - if (_addressGen != _mirror.updates()) { - _addressGen = _mirror.updates(); - _addressList = _mirror.lookup(_pattern); - } - if (!_addressList.empty()) { - _addressIdx = (_addressIdx + 1) % _addressList.size(); - const AddressList::value_type &entry = _addressList[_addressIdx]; - return std::make_unique<RPCServiceAddress>(entry.first, entry.second); - } + if ( !_serviceName.empty()) { + return std::make_unique<RPCServiceAddress>(_serviceName, _connectionSpec); } return RPCServiceAddress::UP(); } diff --git a/messagebus/src/vespa/messagebus/network/rpcservice.h b/messagebus/src/vespa/messagebus/network/rpcservice.h index 18c847b0298..ad08470321c 100644 --- a/messagebus/src/vespa/messagebus/network/rpcservice.h +++ b/messagebus/src/vespa/messagebus/network/rpcservice.h @@ -16,13 +16,9 @@ class RPCNetwork; class RPCService { private: typedef slobrok::api::IMirrorAPI Mirror; - typedef Mirror::SpecList AddressList; - const Mirror &_mirror; - string _pattern; - uint32_t _addressIdx; - uint32_t _addressGen; - AddressList _addressList; + string _serviceName; + string _connectionSpec; public: using UP = std::unique_ptr<RPCService>; @@ -46,13 +42,7 @@ public: */ RPCServiceAddress::UP resolve(); - /** - * Returns the pattern used when querying for the naming server for - * addresses. This is given at construtor time. - * - * @return The service pattern. - */ - const string &getPattern() const { return _pattern; } + bool isValid() const { return ! _connectionSpec.empty(); } }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h b/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h index 99a9f383e75..77d64517c40 100644 --- a/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h +++ b/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h @@ -33,7 +33,7 @@ public: * @param connectionSpec The connection specification. */ RPCServiceAddress(const string &serviceName, const string &connectionSpec); - ~RPCServiceAddress(); + ~RPCServiceAddress() override; /** * Returns whether or not this service address is malformed. diff --git a/messagebus/src/vespa/messagebus/network/rpcservicepool.cpp b/messagebus/src/vespa/messagebus/network/rpcservicepool.cpp index fb40ccff62b..dd192a7a848 100644 --- a/messagebus/src/vespa/messagebus/network/rpcservicepool.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcservicepool.cpp @@ -6,11 +6,14 @@ namespace mbus { -RPCServicePool::RPCServicePool(RPCNetwork &net, uint32_t maxSize) : - _net(net), - _lru(maxSize) +RPCServicePool::RPCServicePool(const slobrok::api::IMirrorAPI & mirror, uint32_t maxSize) : + _mirror(mirror), + _lock(), + _lru(std::make_unique<ServiceCache>(maxSize)), + _updateGen(0), + _maxSize(maxSize) { - _lru.reserve(maxSize); + _lru->reserve(maxSize); assert(maxSize > 0); } @@ -19,27 +22,52 @@ RPCServicePool::~RPCServicePool() = default; RPCServiceAddress::UP RPCServicePool::resolve(const string &pattern) { - std::unique_ptr<RPCService> * found = _lru.findAndRef(pattern); - if (found) { - return (*found)->resolve(); + std::shared_ptr<RPCService> service; + { + LockGuard guard(_lock); + handleMirrorUpdates(guard); + std::shared_ptr<RPCService> *found = _lru->findAndRef(pattern); + if (found) { + service = *found; + } + } + + if (service) { + return service->resolve(); } else { - auto service = std::make_unique<RPCService>(_net.getMirror(), pattern); + service = std::make_shared<RPCService>(_mirror, pattern); auto result = service->resolve(); - _lru[pattern] = std::move(service); + if (service->isValid()) { + LockGuard guard(_lock); + (*_lru)[pattern] = std::move(service); + } return result; } + +} + +void +RPCServicePool::handleMirrorUpdates(const LockGuard &) { + uint32_t currentgen = _mirror.updates(); + if (_updateGen != currentgen) { + auto lru = std::make_unique<ServiceCache>(_maxSize); + _lru.swap(lru); + _updateGen = currentgen; + } } uint32_t RPCServicePool::getSize() const { - return _lru.size(); + LockGuard guard(_lock); + return _lru->size(); } bool RPCServicePool::hasService(const string &pattern) const { - return _lru.hasKey(pattern); + LockGuard guard(_lock); + return _lru->hasKey(pattern); } } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcservicepool.h b/messagebus/src/vespa/messagebus/network/rpcservicepool.h index 2614363838c..212c975a38c 100644 --- a/messagebus/src/vespa/messagebus/network/rpcservicepool.h +++ b/messagebus/src/vespa/messagebus/network/rpcservicepool.h @@ -13,12 +13,6 @@ class RPCNetwork; * the rpc network. */ class RPCServicePool { -private: - typedef vespalib::lrucache_map< vespalib::LruParam<string, RPCService::UP> > ServiceCache; - - RPCNetwork &_net; - ServiceCache _lru; - public: RPCServicePool(const RPCServicePool &) = delete; RPCServicePool & operator = (const RPCServicePool &) = delete; @@ -28,7 +22,7 @@ public: * @param net The underlying RPC network. * @param maxSize The max number of services to cache. */ - RPCServicePool(RPCNetwork &net, uint32_t maxSize); + RPCServicePool(const slobrok::api::IMirrorAPI & mirror, uint32_t maxSize); /** * Destructor. Frees any allocated resources. @@ -61,6 +55,17 @@ public: * @return True if a corresponding service is in the pool. */ bool hasService(const string &pattern) const; +private: + using ServiceCache = vespalib::lrucache_map< vespalib::LruParam<string, std::shared_ptr<RPCService> >>; + using LockGuard = std::lock_guard<std::mutex>; + + void handleMirrorUpdates(const LockGuard & guard); + + const slobrok::api::IMirrorAPI & _mirror; + mutable std::mutex _lock; + std::unique_ptr<ServiceCache> _lru; + uint32_t _updateGen; + uint32_t _maxSize; }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp index cc09e44c460..b42ac47e54d 100644 --- a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp +++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp @@ -32,7 +32,7 @@ void RPCTargetPool::flushTargets(bool force) { uint64_t currentTime = _timer->getMilliTime(); - vespalib::LockGuard guard(_lock); + LockGuard guard(_lock); TargetMap::iterator it = _targets.begin(); while (it != _targets.end()) { Entry &entry = it->second; @@ -56,7 +56,7 @@ RPCTargetPool::flushTargets(bool force) size_t RPCTargetPool::size() { - vespalib::LockGuard guard(_lock); + LockGuard guard(_lock); return _targets.size(); } @@ -65,7 +65,7 @@ RPCTargetPool::getTarget(FRT_Supervisor &orb, const RPCServiceAddress &address) { const string & spec = address.getConnectionSpec(); uint64_t currentTime = _timer->getMilliTime(); - vespalib::LockGuard guard(_lock); + LockGuard guard(_lock); auto it = _targets.find(spec); if (it != _targets.end()) { Entry &entry = it->second; diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.h b/messagebus/src/vespa/messagebus/network/rpctargetpool.h index 5f858f66993..d47fd977356 100644 --- a/messagebus/src/vespa/messagebus/network/rpctargetpool.h +++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.h @@ -28,9 +28,10 @@ private: Entry(RPCTarget::SP target, uint64_t lastUse); }; - typedef std::map<string, Entry> TargetMap; + using TargetMap = std::map<string, Entry>; + using LockGuard = std::lock_guard<std::mutex>; - vespalib::Lock _lock; + std::mutex _lock; TargetMap _targets; ITimer::UP _timer; uint64_t _expireMillis; |