aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-04-01 14:15:34 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-04-01 16:50:21 +0000
commit554f5abc565b5e6c2d8996574857c25f91c02ee1 (patch)
tree305f278ffc1cf9f613d7f33a30a0ab454c371592
parent83f42c8c3a106f0685deb175d57d4f725185df5d (diff)
- Redo the servicepool to resolve addresses first time and not loadbalance.
- Make it thread safe. - Remove any loadbalancing tests - Assert that no loadbalancing is requested.
-rw-r--r--messagebus/src/tests/CMakeLists.txt1
-rw-r--r--messagebus/src/tests/loadbalance/.gitignore4
-rw-r--r--messagebus/src/tests/loadbalance/CMakeLists.txt9
-rw-r--r--messagebus/src/tests/loadbalance/loadbalance.cpp88
-rw-r--r--messagebus/src/tests/messagebus/messagebus.cpp181
-rw-r--r--messagebus/src/tests/serviceaddress/serviceaddress.cpp107
-rw-r--r--messagebus/src/tests/servicepool/servicepool.cpp80
-rw-r--r--messagebus/src/tests/shutdown/shutdown.cpp33
-rw-r--r--messagebus/src/tests/targetpool/targetpool.cpp31
-rw-r--r--messagebus/src/vespa/messagebus/callstack.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp39
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h13
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcservice.cpp48
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcservice.h16
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcserviceaddress.h2
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcservicepool.cpp50
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcservicepool.h19
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctargetpool.cpp6
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctargetpool.h5
-rw-r--r--slobrok/src/tests/configure/configure.cpp12
20 files changed, 209 insertions, 537 deletions
diff --git a/messagebus/src/tests/CMakeLists.txt b/messagebus/src/tests/CMakeLists.txt
index cb2a403f55d..e05f732d8b4 100644
--- a/messagebus/src/tests/CMakeLists.txt
+++ b/messagebus/src/tests/CMakeLists.txt
@@ -9,7 +9,6 @@ add_subdirectory(context)
add_subdirectory(emptyreply)
add_subdirectory(error)
add_subdirectory(identity)
-add_subdirectory(loadbalance)
add_subdirectory(messagebus)
add_subdirectory(messageordering)
add_subdirectory(messenger)
diff --git a/messagebus/src/tests/loadbalance/.gitignore b/messagebus/src/tests/loadbalance/.gitignore
deleted file mode 100644
index d1cbb5977f1..00000000000
--- a/messagebus/src/tests/loadbalance/.gitignore
+++ /dev/null
@@ -1,4 +0,0 @@
-.depend
-Makefile
-loadbalance_test
-messagebus_loadbalance_test_app
diff --git a/messagebus/src/tests/loadbalance/CMakeLists.txt b/messagebus/src/tests/loadbalance/CMakeLists.txt
deleted file mode 100644
index e249a8284a6..00000000000
--- a/messagebus/src/tests/loadbalance/CMakeLists.txt
+++ /dev/null
@@ -1,9 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_executable(messagebus_loadbalance_test_app TEST
- SOURCES
- loadbalance.cpp
- DEPENDS
- messagebus_messagebus-test
- messagebus
-)
-vespa_add_test(NAME messagebus_loadbalance_test_app COMMAND messagebus_loadbalance_test_app)
diff --git a/messagebus/src/tests/loadbalance/loadbalance.cpp b/messagebus/src/tests/loadbalance/loadbalance.cpp
deleted file mode 100644
index 05ea6d78871..00000000000
--- a/messagebus/src/tests/loadbalance/loadbalance.cpp
+++ /dev/null
@@ -1,88 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include <vespa/vespalib/testkit/testapp.h>
-#include <vespa/messagebus/destinationsession.h>
-#include <vespa/messagebus/intermediatesession.h>
-#include <vespa/messagebus/messagebus.h>
-#include <vespa/messagebus/routablequeue.h>
-#include <vespa/messagebus/sourcesession.h>
-#include <vespa/messagebus/sourcesessionparams.h>
-#include <vespa/messagebus/testlib/receptor.h>
-#include <vespa/messagebus/routing/routingspec.h>
-#include <vespa/messagebus/testlib/simplemessage.h>
-#include <vespa/messagebus/testlib/simplereply.h>
-#include <vespa/messagebus/testlib/simpleprotocol.h>
-#include <vespa/messagebus/testlib/slobrok.h>
-#include <vespa/messagebus/testlib/testserver.h>
-
-using namespace mbus;
-using namespace std::chrono_literals;
-
-struct Handler : public IMessageHandler
-{
- DestinationSession::UP session;
- uint32_t cnt;
-
- Handler(MessageBus &mb) : session(), cnt(0) {
- session = mb.createDestinationSession("session", true, *this);
- }
- ~Handler() {
- session.reset();
- }
- void handleMessage(Message::UP msg) override {
- ++cnt;
- session->acknowledge(std::move(msg));
- }
-};
-
-RoutingSpec getRouting() {
- return RoutingSpec()
- .addTable(RoutingTableSpec("Simple")
- .addHop(HopSpec("dst", "test/*/session"))
- .addRoute(RouteSpec("test").addHop("dst")));
-}
-
-TEST_SETUP(Test);
-
-int
-Test::Main()
-{
- TEST_INIT("loadbalance_test");
-
- Slobrok slobrok;
- TestServer src(Identity(""), getRouting(), slobrok);
- TestServer dst1(Identity("test/dst1"), getRouting(), slobrok);
- TestServer dst2(Identity("test/dst2"), getRouting(), slobrok);
- TestServer dst3(Identity("test/dst3"), getRouting(), slobrok);
-
- Handler h1(dst1.mb);
- Handler h2(dst2.mb);
- Handler h3(dst3.mb);
-
- ASSERT_TRUE(src.waitSlobrok("test/dst1/session"));
- ASSERT_TRUE(src.waitSlobrok("test/dst2/session"));
- ASSERT_TRUE(src.waitSlobrok("test/dst3/session"));
-
- RoutableQueue queue;
- SourceSessionParams params;
- params.setTimeout(30s);
- params.setThrottlePolicy(IThrottlePolicy::SP());
- SourceSession::UP ss = src.mb.createSourceSession(queue, params);
-
- uint32_t msgCnt = 90;
- ASSERT_TRUE(msgCnt % 3 == 0);
- for (uint32_t i = 0; i < msgCnt; ++i) {
- ss->send(Message::UP(new SimpleMessage("test")), "test");
- }
- for (uint32_t i = 0; i < 1000; ++i) {
- if (queue.size() == msgCnt) {
- break;
- }
- std::this_thread::sleep_for(10ms);
- }
- EXPECT_TRUE(queue.size() == msgCnt);
- EXPECT_TRUE(h1.cnt == msgCnt / 3);
- EXPECT_TRUE(h2.cnt == msgCnt / 3);
- EXPECT_TRUE(h3.cnt == msgCnt / 3);
- TEST_DONE();
-}
diff --git a/messagebus/src/tests/messagebus/messagebus.cpp b/messagebus/src/tests/messagebus/messagebus.cpp
index 86c7bf91f2a..367bfc997d0 100644
--- a/messagebus/src/tests/messagebus/messagebus.cpp
+++ b/messagebus/src/tests/messagebus/messagebus.cpp
@@ -112,13 +112,10 @@ public:
Test();
~Test();
int Main() override;
- void testSendToAny();
void testSendToCol();
- void testSendToAnyThenCol();
void testDirectHop();
void testDirectRoute();
void testRoutingPolicyCache();
- void debugTrace();
private:
void setup();
@@ -131,21 +128,18 @@ private:
TEST_APPHOOK(Test);
-Test::Test() {}
-Test::~Test() {}
+Test::Test() = default;
+Test::~Test() = default;
int
Test::Main()
{
TEST_INIT("messagebus_test");
- testSendToAny(); TEST_FLUSH();
testSendToCol(); TEST_FLUSH();
- testSendToAnyThenCol(); TEST_FLUSH();
testDirectHop(); TEST_FLUSH();
testDirectRoute(); TEST_FLUSH();
testRoutingPolicyCache(); TEST_FLUSH();
- debugTrace(); TEST_FLUSH();
TEST_DONE();
}
@@ -206,38 +200,6 @@ void Test::teardown()
}
void
-Test::testSendToAny()
-{
- setup();
- for (uint32_t i = 0; i < 300; ++i) {
- Message::UP msg(new SimpleMessage("test"));
- EXPECT_TRUE(client->session->send(std::move(msg), "DocProc").isAccepted());
- }
- EXPECT_TRUE(dp0->waitQueueSize(100));
- EXPECT_TRUE(dp1->waitQueueSize(100));
- EXPECT_TRUE(dp2->waitQueueSize(100));
- for (uint32_t i = 0; i < dpVec.size(); ++i) {
- DocProc *p = dpVec[i];
- while (p->queue.size() > 0) {
- Routable::UP msg = p->queue.dequeue();
- ASSERT_TRUE(msg);
- Reply::UP reply(new EmptyReply());
- msg->swapState(*reply);
- reply->addError(Error(ErrorCode::FATAL_ERROR, ""));
- p->session->forward(std::move(reply));
- }
- }
- EXPECT_TRUE(client->waitQueueSize(300));
- while (client->queue.size() > 0) {
- Routable::UP reply = client->queue.dequeue();
- ASSERT_TRUE(reply);
- ASSERT_TRUE(reply->isReply());
- EXPECT_TRUE(static_cast<Reply&>(*reply).getNumErrors() == 1);
- }
- teardown();
-}
-
-void
Test::testSendToCol()
{
setup();
@@ -282,83 +244,6 @@ Test::testSendToCol()
}
void
-Test::testSendToAnyThenCol()
-{
- setup();
- ASSERT_TRUE(SimpleMessage("msg").getHash() % 2 == 0);
- for (uint32_t i = 0; i < 150; ++i) {
- Message::UP msg(new SimpleMessage("msg"));
- EXPECT_TRUE(client->session->send(std::move(msg), "Index").isAccepted());
- }
- EXPECT_TRUE(dp0->waitQueueSize(50));
- EXPECT_TRUE(dp1->waitQueueSize(50));
- EXPECT_TRUE(dp2->waitQueueSize(50));
- for (uint32_t i = 0; i < dpVec.size(); ++i) {
- DocProc *p = dpVec[i];
- while (p->queue.size() > 0) {
- Routable::UP r = p->queue.dequeue();
- ASSERT_TRUE(r);
- p->session->forward(std::move(r));
- }
- }
- EXPECT_TRUE(search00->waitQueueSize(150));
- EXPECT_TRUE(search01->waitQueueSize(0));
- EXPECT_TRUE(search10->waitQueueSize(150));
- EXPECT_TRUE(search11->waitQueueSize(0));
- ASSERT_TRUE(SimpleMessage("msh").getHash() % 2 == 1);
- for (uint32_t i = 0; i < 150; ++i) {
- Message::UP msg(new SimpleMessage("msh"));
- ASSERT_TRUE(client->session->send(std::move(msg), "Index").isAccepted());
- }
- EXPECT_TRUE(dp0->waitQueueSize(50));
- EXPECT_TRUE(dp1->waitQueueSize(50));
- EXPECT_TRUE(dp2->waitQueueSize(50));
- for (uint32_t i = 0; i < dpVec.size(); ++i) {
- DocProc *p = dpVec[i];
- while (p->queue.size() > 0) {
- Routable::UP r = p->queue.dequeue();
- ASSERT_TRUE(r);
- p->session->forward(std::move(r));
- }
- }
- EXPECT_TRUE(search00->waitQueueSize(150));
- EXPECT_TRUE(search01->waitQueueSize(150));
- EXPECT_TRUE(search10->waitQueueSize(150));
- EXPECT_TRUE(search11->waitQueueSize(150));
- for (uint32_t i = 0; i < searchVec.size(); ++i) {
- Search *s = searchVec[i];
- while (s->queue.size() > 0) {
- Routable::UP msg = s->queue.dequeue();
- ASSERT_TRUE(msg);
- Reply::UP reply(new EmptyReply());
- msg->swapState(*reply);
- s->session->reply(std::move(reply));
- }
- }
- EXPECT_TRUE(dp0->waitQueueSize(100));
- EXPECT_TRUE(dp1->waitQueueSize(100));
- EXPECT_TRUE(dp2->waitQueueSize(100));
- for (uint32_t i = 0; i < dpVec.size(); ++i) {
- DocProc *p = dpVec[i];
- while (p->queue.size() > 0) {
- Routable::UP r = p->queue.dequeue();
- ASSERT_TRUE(r);
- p->session->forward(std::move(r));
- }
- }
- client->waitQueueSize(300);
- std::this_thread::sleep_for(100ms);
- client->waitQueueSize(300);
- while (client->queue.size() > 0) {
- Routable::UP reply = client->queue.dequeue();
- ASSERT_TRUE(reply);
- ASSERT_TRUE(reply->isReply());
- EXPECT_TRUE(static_cast<Reply&>(*reply).getNumErrors() == 0);
- }
- teardown();
-}
-
-void
Test::testDirectHop()
{
setup();
@@ -468,65 +353,3 @@ Test::testRoutingPolicyCache()
teardown();
}
-
-void
-Test::debugTrace()
-{
- setup();
- ASSERT_TRUE(SimpleMessage("msg").getHash() % 2 == 0);
- for (uint32_t i = 0; i < 3; ++i) {
- Message::UP msg(new SimpleMessage("msg"));
- msg->getTrace().setLevel(4 + i);
- EXPECT_TRUE(client->session->send(std::move(msg), "Index").isAccepted());
- }
- EXPECT_TRUE(dp0->waitQueueSize(1));
- EXPECT_TRUE(dp1->waitQueueSize(1));
- EXPECT_TRUE(dp2->waitQueueSize(1));
- for (uint32_t i = 0; i < dpVec.size(); ++i) {
- DocProc *p = dpVec[i];
- while (p->queue.size() > 0) {
- Routable::UP r = p->queue.dequeue();
- ASSERT_TRUE(r);
- p->session->forward(std::move(r));
- }
- }
- EXPECT_TRUE(search00->waitQueueSize(3));
- EXPECT_TRUE(search01->waitQueueSize(0));
- EXPECT_TRUE(search10->waitQueueSize(3));
- EXPECT_TRUE(search11->waitQueueSize(0));
- for (uint32_t i = 0; i < searchVec.size(); ++i) {
- Search *s = searchVec[i];
- while (s->queue.size() > 0) {
- Routable::UP msg = s->queue.dequeue();
- ASSERT_TRUE(msg);
- Reply::UP reply(new EmptyReply());
- msg->swapState(*reply);
- s->session->reply(std::move(reply));
- }
- }
- EXPECT_TRUE(dp0->waitQueueSize(1));
- EXPECT_TRUE(dp1->waitQueueSize(1));
- EXPECT_TRUE(dp2->waitQueueSize(1));
- for (uint32_t i = 0; i < dpVec.size(); ++i) {
- DocProc *p = dpVec[i];
- while (p->queue.size() > 0) {
- Routable::UP r = p->queue.dequeue();
- ASSERT_TRUE(r);
- p->session->forward(std::move(r));
- }
- }
- client->waitQueueSize(3);
- Routable::UP reply = client->queue.dequeue();
- fprintf(stderr, "\nTRACE DUMP(level=%d):\n%s\n\n",
- reply->getTrace().getLevel(),
- reply->getTrace().toString().c_str());
- reply = client->queue.dequeue();
- fprintf(stderr, "\nTRACE DUMP(level=%d):\n%s\n\n",
- reply->getTrace().getLevel(),
- reply->getTrace().toString().c_str());
- reply = client->queue.dequeue();
- fprintf(stderr, "\nTRACE DUMP(level=%d):\n%s\n\n",
- reply->getTrace().getLevel(),
- reply->getTrace().toString().c_str());
- teardown();
-}
diff --git a/messagebus/src/tests/serviceaddress/serviceaddress.cpp b/messagebus/src/tests/serviceaddress/serviceaddress.cpp
index 441da5a80ac..770cac620a9 100644
--- a/messagebus/src/tests/serviceaddress/serviceaddress.cpp
+++ b/messagebus/src/tests/serviceaddress/serviceaddress.cpp
@@ -7,34 +7,49 @@
using namespace mbus;
-class Test : public vespalib::TestApp {
-public:
- int Main() override;
- void testAddrServiceAddress();
- void testNameServiceAddress();
-
-private:
- bool waitSlobrok(RPCNetwork &network, const string &pattern, size_t num);
- bool testAddress(RPCNetwork& network, const string &pattern,
- const string &expectedSpec, const string &expectedSession);
- bool testNullAddress(RPCNetwork &network, const string &pattern);
-};
-
-int
-Test::Main()
+bool
+waitSlobrok(RPCNetwork &network, const string &pattern, size_t num)
{
- TEST_INIT("serviceaddress_test");
-
- testAddrServiceAddress(); TEST_FLUSH();
- testNameServiceAddress(); TEST_FLUSH();
+ for (int i = 0; i < 1000; i++) {
+ slobrok::api::IMirrorAPI::SpecList res = network.getMirror().lookup(pattern);
+ if (res.size() == num) {
+ return true;
+ }
+ std::this_thread::sleep_for(10ms);
+ }
+ return false;
+}
- TEST_DONE();
+bool
+testNullAddress(RPCNetwork &network, const string &pattern)
+{
+ RPCService service(network.getMirror(), pattern);
+ RPCServiceAddress::UP obj = service.resolve();
+ if ( ! EXPECT_FALSE(obj)) {
+ return false;
+ }
+ return true;
}
-TEST_APPHOOK(Test);
+bool
+testAddress(RPCNetwork &network, const string &pattern,
+ const string &expectedSpec, const string &expectedSession)
+{
+ RPCService service(network.getMirror(), pattern);
+ RPCServiceAddress::UP obj = service.resolve();
+ if (!EXPECT_TRUE(obj)) {
+ return false;
+ }
+ if (!EXPECT_EQUAL(expectedSpec, obj->getConnectionSpec())) {
+ return false;
+ }
+ if (!EXPECT_EQUAL(expectedSession, obj->getSessionName())) {
+ return false;
+ }
+ return true;
+}
-void
-Test::testAddrServiceAddress()
+TEST("testAddrServiceAddress")
{
Slobrok slobrok;
RPCNetwork network(RPCNetworkParams(slobrok.config())
@@ -55,8 +70,7 @@ Test::testAddrServiceAddress()
network.shutdown();
}
-void
-Test::testNameServiceAddress()
+TEST("testNameServiceAddress")
{
Slobrok slobrok;
RPCNetwork network(RPCNetworkParams(slobrok.config())
@@ -74,45 +88,4 @@ Test::testNameServiceAddress()
network.shutdown();
}
-bool
-Test::waitSlobrok(RPCNetwork &network, const string &pattern, size_t num)
-{
- for (int i = 0; i < 1000; i++) {
- slobrok::api::IMirrorAPI::SpecList res = network.getMirror().lookup(pattern);
- if (res.size() == num) {
- return true;
- }
- std::this_thread::sleep_for(10ms);
- }
- return false;
-}
-
-bool
-Test::testNullAddress(RPCNetwork &network, const string &pattern)
-{
- RPCService service(network.getMirror(), pattern);
- RPCServiceAddress::UP obj = service.resolve();
- if (!EXPECT_TRUE(obj.get() == NULL)) {
- return false;
- }
- return true;
-}
-
-bool
-Test::testAddress(RPCNetwork &network, const string &pattern,
- const string &expectedSpec, const string &expectedSession)
-{
- RPCService service(network.getMirror(), pattern);
- RPCServiceAddress::UP obj = service.resolve();
- if (!EXPECT_TRUE(obj.get() != NULL)) {
- return false;
- }
- if (!EXPECT_EQUAL(expectedSpec, obj->getConnectionSpec())) {
- return false;
- }
- if (!EXPECT_EQUAL(expectedSession, obj->getSessionName())) {
- return false;
- }
- return true;
-}
-
+TEST_MAIN() { TEST_RUN_ALL(); } \ No newline at end of file
diff --git a/messagebus/src/tests/servicepool/servicepool.cpp b/messagebus/src/tests/servicepool/servicepool.cpp
index 1334831c30c..d81ff84c36a 100644
--- a/messagebus/src/tests/servicepool/servicepool.cpp
+++ b/messagebus/src/tests/servicepool/servicepool.cpp
@@ -4,67 +4,59 @@
#include <vespa/messagebus/network/rpcnetworkparams.h>
#include <vespa/messagebus/network/rpcservicepool.h>
#include <vespa/messagebus/testlib/slobrok.h>
+#include <vespa/messagebus/testlib/testserver.h>
#include <vespa/vespalib/testkit/testapp.h>
using namespace mbus;
-class Test : public vespalib::TestApp {
-private:
- void testMaxSize();
-
-public:
- int Main() override {
- TEST_INIT("servicepool_test");
-
- testMaxSize(); TEST_FLUSH();
-
- TEST_DONE();
- }
-};
-
-TEST_APPHOOK(Test);
-
-void
-Test::testMaxSize()
+TEST("testMaxSize")
{
Slobrok slobrok;
- RPCNetwork net(RPCNetworkParams(slobrok.config()));
- RPCServicePool pool(net, 2);
+ TestServer me(Identity("me"), RoutingSpec(), slobrok);
+ RPCNetwork & net = me.net;
+ net.registerSession("foo");
+ net.registerSession("bar");
+ net.registerSession("baz");
+ me.waitSlobrok("me/baz");
+ RPCServicePool pool(net.getMirror(), 2);
net.start();
+ net.waitUntilReady(30s);
- pool.resolve("foo");
+ RPCServiceAddress::UP addr = pool.resolve("me/foo");
EXPECT_EQUAL(1u, pool.getSize());
- EXPECT_TRUE(pool.hasService("foo"));
- EXPECT_TRUE(!pool.hasService("bar"));
- EXPECT_TRUE(!pool.hasService("baz"));
+ EXPECT_TRUE(pool.hasService("me/foo"));
+ EXPECT_TRUE(!pool.hasService("me/bar"));
+ EXPECT_TRUE(!pool.hasService("me/baz"));
- pool.resolve("foo");
+ addr = pool.resolve("me/foo");
EXPECT_EQUAL(1u, pool.getSize());
- EXPECT_TRUE(pool.hasService("foo"));
- EXPECT_TRUE(!pool.hasService("bar"));
- EXPECT_TRUE(!pool.hasService("baz"));
+ EXPECT_TRUE(pool.hasService("me/foo"));
+ EXPECT_TRUE(!pool.hasService("me/bar"));
+ EXPECT_TRUE(!pool.hasService("me/baz"));
- pool.resolve("bar");
+ addr = pool.resolve("me/bar");
EXPECT_EQUAL(2u, pool.getSize());
- EXPECT_TRUE(pool.hasService("foo"));
- EXPECT_TRUE(pool.hasService("bar"));
- EXPECT_TRUE(!pool.hasService("baz"));
+ EXPECT_TRUE(pool.hasService("me/foo"));
+ EXPECT_TRUE(pool.hasService("me/bar"));
+ EXPECT_TRUE(!pool.hasService("me/baz"));
- pool.resolve("baz");
+ addr = pool.resolve("me/baz");
EXPECT_EQUAL(2u, pool.getSize());
- EXPECT_TRUE(!pool.hasService("foo"));
- EXPECT_TRUE(pool.hasService("bar"));
- EXPECT_TRUE(pool.hasService("baz"));
+ EXPECT_TRUE(!pool.hasService("me/foo"));
+ EXPECT_TRUE(pool.hasService("me/bar"));
+ EXPECT_TRUE(pool.hasService("me/baz"));
- pool.resolve("bar");
+ addr = pool.resolve("me/bar");
EXPECT_EQUAL(2u, pool.getSize());
- EXPECT_TRUE(!pool.hasService("foo"));
- EXPECT_TRUE(pool.hasService("bar"));
- EXPECT_TRUE(pool.hasService("baz"));
+ EXPECT_TRUE(!pool.hasService("me/foo"));
+ EXPECT_TRUE(pool.hasService("me/bar"));
+ EXPECT_TRUE(pool.hasService("me/baz"));
- pool.resolve("foo");
+ addr = pool.resolve("me/foo");
EXPECT_EQUAL(2u, pool.getSize());
- EXPECT_TRUE(pool.hasService("foo"));
- EXPECT_TRUE(pool.hasService("bar"));
- EXPECT_TRUE(!pool.hasService("baz"));
+ EXPECT_TRUE(pool.hasService("me/foo"));
+ EXPECT_TRUE(pool.hasService("me/bar"));
+ EXPECT_TRUE(!pool.hasService("me/baz"));
}
+
+TEST_MAIN() { TEST_RUN_ALL(); } \ No newline at end of file
diff --git a/messagebus/src/tests/shutdown/shutdown.cpp b/messagebus/src/tests/shutdown/shutdown.cpp
index e415622707f..07d9f0fae5d 100644
--- a/messagebus/src/tests/shutdown/shutdown.cpp
+++ b/messagebus/src/tests/shutdown/shutdown.cpp
@@ -12,30 +12,9 @@
using namespace mbus;
-class Test : public vespalib::TestApp {
-private:
- void requireThatListenFailedIsExceptionSafe();
- void requireThatShutdownOnSourceWithPendingIsSafe();
- void requireThatShutdownOnIntermediateWithPendingIsSafe();
-
-public:
- int Main() override {
- TEST_INIT("shutdown_test");
-
- requireThatListenFailedIsExceptionSafe(); TEST_FLUSH();
- requireThatShutdownOnSourceWithPendingIsSafe(); TEST_FLUSH();
- requireThatShutdownOnIntermediateWithPendingIsSafe(); TEST_FLUSH();
-
- TEST_DONE();
- }
-};
-
static const duration TIMEOUT = 120s;
-TEST_APPHOOK(Test);
-
-void
-Test::requireThatListenFailedIsExceptionSafe()
+TEST("requireThatListenFailedIsExceptionSafe")
{
fnet::frt::StandaloneFRT orb;
ASSERT_TRUE(orb.supervisor().Listen(0));
@@ -51,8 +30,7 @@ Test::requireThatListenFailedIsExceptionSafe()
}
}
-void
-Test::requireThatShutdownOnSourceWithPendingIsSafe()
+TEST("requireThatShutdownOnSourceWithPendingIsSafe")
{
Slobrok slobrok;
TestServer dstServer(MessageBusParams()
@@ -87,8 +65,7 @@ Test::requireThatShutdownOnSourceWithPendingIsSafe()
}
}
-void
-Test::requireThatShutdownOnIntermediateWithPendingIsSafe()
+TEST("requireThatShutdownOnIntermediateWithPendingIsSafe")
{
Slobrok slobrok;
TestServer dstServer(MessageBusParams()
@@ -114,7 +91,7 @@ Test::requireThatShutdownOnIntermediateWithPendingIsSafe()
ASSERT_TRUE(srcServer.waitSlobrok("dst/session", 1));
for (uint32_t i = 0; i < 10; ++i) {
- Message::UP msg(new SimpleMessage("msg"));
+ Message::UP msg = std::make_unique<SimpleMessage>("msg");
{
TestServer itrServer(MessageBusParams()
.setRetryPolicy(std::make_shared<RetryTransientErrorsPolicy>())
@@ -141,3 +118,5 @@ Test::requireThatShutdownOnIntermediateWithPendingIsSafe()
dstServer.mb.sync();
}
}
+
+TEST_MAIN() { TEST_RUN_ALL(); } \ No newline at end of file
diff --git a/messagebus/src/tests/targetpool/targetpool.cpp b/messagebus/src/tests/targetpool/targetpool.cpp
index 0e0e566f2be..9259f992d6c 100644
--- a/messagebus/src/tests/targetpool/targetpool.cpp
+++ b/messagebus/src/tests/targetpool/targetpool.cpp
@@ -22,12 +22,7 @@ public:
}
};
-TEST_SETUP(Test);
-
-int
-Test::Main()
-{
- TEST_INIT("targetpool_test");
+TEST("targetpool_test") {
// Necessary setup to be able to resolve targets.
Slobrok slobrok;
@@ -46,9 +41,9 @@ Test::Main()
// Assert that all connections expire.
RPCTarget::SP target;
- ASSERT_TRUE((target = pool.getTarget(orb, adr1)).get() != NULL); target.reset();
- ASSERT_TRUE((target = pool.getTarget(orb, adr2)).get() != NULL); target.reset();
- ASSERT_TRUE((target = pool.getTarget(orb, adr3)).get() != NULL); target.reset();
+ ASSERT_TRUE((target = pool.getTarget(orb, adr1))); target.reset();
+ ASSERT_TRUE((target = pool.getTarget(orb, adr2))); target.reset();
+ ASSERT_TRUE((target = pool.getTarget(orb, adr3))); target.reset();
EXPECT_EQUAL(3u, pool.size());
for (uint32_t i = 0; i < 10; ++i) {
pool.flushTargets(false);
@@ -59,19 +54,19 @@ Test::Main()
EXPECT_EQUAL(0u, pool.size());
// Assert that only idle connections expire.
- ASSERT_TRUE((target = pool.getTarget(orb, adr1)).get() != NULL); target.reset();
- ASSERT_TRUE((target = pool.getTarget(orb, adr2)).get() != NULL); target.reset();
- ASSERT_TRUE((target = pool.getTarget(orb, adr3)).get() != NULL); target.reset();
+ ASSERT_TRUE((target = pool.getTarget(orb, adr1))); target.reset();
+ ASSERT_TRUE((target = pool.getTarget(orb, adr2))); target.reset();
+ ASSERT_TRUE((target = pool.getTarget(orb, adr3))); target.reset();
EXPECT_EQUAL(3u, pool.size());
timer.millis += 444;
pool.flushTargets(false);
EXPECT_EQUAL(3u, pool.size());
- ASSERT_TRUE((target = pool.getTarget(orb, adr2)).get() != NULL); target.reset();
- ASSERT_TRUE((target = pool.getTarget(orb, adr3)).get() != NULL); target.reset();
+ ASSERT_TRUE((target = pool.getTarget(orb, adr2))); target.reset();
+ ASSERT_TRUE((target = pool.getTarget(orb, adr3))); target.reset();
timer.millis += 444;
pool.flushTargets(false);
EXPECT_EQUAL(2u, pool.size());
- ASSERT_TRUE((target = pool.getTarget(orb, adr3)).get() != NULL); target.reset();
+ ASSERT_TRUE((target = pool.getTarget(orb, adr3))); target.reset();
timer.millis += 444;
pool.flushTargets(false);
EXPECT_EQUAL(1u, pool.size());
@@ -80,7 +75,7 @@ Test::Main()
EXPECT_EQUAL(0u, pool.size());
// Assert that connections never expire while they are referenced.
- ASSERT_TRUE((target = pool.getTarget(orb, adr1)).get() != NULL);
+ ASSERT_TRUE((target = pool.getTarget(orb, adr1)));
EXPECT_EQUAL(1u, pool.size());
for (int i = 0; i < 10; ++i) {
timer.millis += 999;
@@ -91,6 +86,6 @@ Test::Main()
timer.millis += 999;
pool.flushTargets(false);
EXPECT_EQUAL(0u, pool.size());
-
- TEST_DONE();
}
+
+TEST_MAIN() { TEST_RUN_ALL(); } \ No newline at end of file
diff --git a/messagebus/src/vespa/messagebus/callstack.cpp b/messagebus/src/vespa/messagebus/callstack.cpp
index b7179e14cad..ab22f1ace34 100644
--- a/messagebus/src/vespa/messagebus/callstack.cpp
+++ b/messagebus/src/vespa/messagebus/callstack.cpp
@@ -20,7 +20,7 @@ CallStack::discard()
}
}
-CallStack::~CallStack() { }
+CallStack::~CallStack() = default;
IReplyHandler &
CallStack::pop(Reply &reply)
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index 4b498c4c014..d1c663efc39 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -26,6 +26,8 @@ LOG_SETUP(".rpcnetwork");
using vespalib::make_string;
using namespace std::chrono_literals;
+namespace mbus {
+
namespace {
/**
@@ -56,9 +58,22 @@ public:
}
};
-} // namespace <unnamed>
+struct TargetPoolTask : public FNET_Task {
+ RPCTargetPool &_pool;
-namespace mbus {
+ TargetPoolTask(FNET_Scheduler &scheduler, RPCTargetPool &pool)
+ : FNET_Task(&scheduler),
+ _pool(pool)
+ {
+ ScheduleNow();
+ }
+ void PerformTask() override {
+ _pool.flushTargets(false);
+ Schedule(1.0);
+ }
+};
+
+}
RPCNetwork::SendContext::SendContext(RPCNetwork &net, const Message &msg,
const std::vector<RoutingNode*> &recipients)
@@ -92,20 +107,6 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version)
}
}
-RPCNetwork::TargetPoolTask::TargetPoolTask(FNET_Scheduler &scheduler, RPCTargetPool &pool)
- : FNET_Task(&scheduler),
- _pool(pool)
-{
- ScheduleNow();
-}
-
-void
-RPCNetwork::TargetPoolTask::PerformTask()
-{
- _pool.flushTargets(false);
- Schedule(1.0);
-}
-
RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_owner(nullptr),
_ident(params.getIdentity()),
@@ -113,13 +114,13 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_transport(std::make_unique<FNET_Transport>()),
_orb(std::make_unique<FRT_Supervisor>(_transport.get())),
_scheduler(*_transport->GetScheduler()),
- _targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs())),
- _targetPoolTask(_scheduler, *_targetPool),
- _servicePool(std::make_unique<RPCServicePool>(*this, 4096)),
_slobrokCfgFactory(std::make_unique<slobrok::ConfiguratorFactory>(params.getSlobrokConfig())),
_mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, *_slobrokCfgFactory)),
_regAPI(std::make_unique<slobrok::api::RegisterAPI>(*_orb, *_slobrokCfgFactory)),
_requestedPort(params.getListenPort()),
+ _targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs())),
+ _targetPoolTask(std::make_unique<TargetPoolTask>(_scheduler, *_targetPool)),
+ _servicePool(std::make_unique<RPCServicePool>(*_mirror, 4096)),
_executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 65536)),
_sendV1(std::make_unique<RPCSendV1>()),
_sendV2(std::make_unique<RPCSendV2>()),
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index a6c2724929d..a8eb514387c 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -50,13 +50,6 @@ private:
void handleVersion(const vespalib::Version *version) override;
};
- struct TargetPoolTask : public FNET_Task {
- RPCTargetPool &_pool;
-
- TargetPoolTask(FNET_Scheduler &scheduler, RPCTargetPool &pool);
- void PerformTask() override;
- };
-
using SendAdapterMap = std::map<vespalib::Version, RPCSendAdapter*>;
INetworkOwner *_owner;
@@ -65,13 +58,13 @@ private:
std::unique_ptr<FNET_Transport> _transport;
std::unique_ptr<FRT_Supervisor> _orb;
FNET_Scheduler &_scheduler;
- std::unique_ptr<RPCTargetPool> _targetPool;
- TargetPoolTask _targetPoolTask;
- std::unique_ptr<RPCServicePool> _servicePool;
std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory;
std::unique_ptr<slobrok::api::IMirrorAPI> _mirror;
std::unique_ptr<slobrok::api::RegisterAPI> _regAPI;
int _requestedPort;
+ std::unique_ptr<RPCTargetPool> _targetPool;
+ std::unique_ptr<FNET_Task> _targetPoolTask;
+ std::unique_ptr<RPCServicePool> _servicePool;
std::unique_ptr<vespalib::ThreadStackExecutor> _executor;
std::unique_ptr<RPCSendAdapter> _sendV1;
std::unique_ptr<RPCSendAdapter> _sendV2;
diff --git a/messagebus/src/vespa/messagebus/network/rpcservice.cpp b/messagebus/src/vespa/messagebus/network/rpcservice.cpp
index fd1b84f545f..8a7c60392bb 100644
--- a/messagebus/src/vespa/messagebus/network/rpcservice.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcservice.cpp
@@ -6,36 +6,36 @@
namespace mbus {
RPCService::RPCService(const Mirror &mirror, const string &pattern) :
- _mirror(mirror),
- _pattern(pattern),
- _addressIdx(random()),
- _addressGen(0),
- _addressList()
-{ }
+ _serviceName(),
+ _connectionSpec()
+{
+ if (pattern.find("tcp/") == 0) {
+ size_t pos = pattern.find_last_of('/');
+ if (pos != string::npos && pos < pattern.size() - 1) {
+ RPCServiceAddress test(pattern, pattern.substr(0, pos));
+ if ( ! test.isMalformed()) {
+ _serviceName = pattern;
+ _connectionSpec = pattern.substr(0, pos);
+ }
+ }
+ } else {
+ Mirror::SpecList addressList = mirror.lookup(pattern);
+ if (!addressList.empty()) {
+ assert(addressList.size() == 1);
+ const auto &entry = addressList[random() % addressList.size()];
+ _serviceName = entry.first;
+ _connectionSpec = entry.second;
+ }
+ }
+}
RPCService::~RPCService() = default;
RPCServiceAddress::UP
RPCService::resolve()
{
- if (_pattern.find("tcp/") == 0) {
- size_t pos = _pattern.find_last_of('/');
- if (pos != string::npos && pos < _pattern.size() - 1) {
- auto ret = std::make_unique<RPCServiceAddress>(_pattern, _pattern.substr(0, pos));
- if (!ret->isMalformed()) {
- return ret;
- }
- }
- } else {
- if (_addressGen != _mirror.updates()) {
- _addressGen = _mirror.updates();
- _addressList = _mirror.lookup(_pattern);
- }
- if (!_addressList.empty()) {
- _addressIdx = (_addressIdx + 1) % _addressList.size();
- const AddressList::value_type &entry = _addressList[_addressIdx];
- return std::make_unique<RPCServiceAddress>(entry.first, entry.second);
- }
+ if ( !_serviceName.empty()) {
+ return std::make_unique<RPCServiceAddress>(_serviceName, _connectionSpec);
}
return RPCServiceAddress::UP();
}
diff --git a/messagebus/src/vespa/messagebus/network/rpcservice.h b/messagebus/src/vespa/messagebus/network/rpcservice.h
index 18c847b0298..ad08470321c 100644
--- a/messagebus/src/vespa/messagebus/network/rpcservice.h
+++ b/messagebus/src/vespa/messagebus/network/rpcservice.h
@@ -16,13 +16,9 @@ class RPCNetwork;
class RPCService {
private:
typedef slobrok::api::IMirrorAPI Mirror;
- typedef Mirror::SpecList AddressList;
- const Mirror &_mirror;
- string _pattern;
- uint32_t _addressIdx;
- uint32_t _addressGen;
- AddressList _addressList;
+ string _serviceName;
+ string _connectionSpec;
public:
using UP = std::unique_ptr<RPCService>;
@@ -46,13 +42,7 @@ public:
*/
RPCServiceAddress::UP resolve();
- /**
- * Returns the pattern used when querying for the naming server for
- * addresses. This is given at construtor time.
- *
- * @return The service pattern.
- */
- const string &getPattern() const { return _pattern; }
+ bool isValid() const { return ! _connectionSpec.empty(); }
};
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h b/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h
index 99a9f383e75..77d64517c40 100644
--- a/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h
+++ b/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h
@@ -33,7 +33,7 @@ public:
* @param connectionSpec The connection specification.
*/
RPCServiceAddress(const string &serviceName, const string &connectionSpec);
- ~RPCServiceAddress();
+ ~RPCServiceAddress() override;
/**
* Returns whether or not this service address is malformed.
diff --git a/messagebus/src/vespa/messagebus/network/rpcservicepool.cpp b/messagebus/src/vespa/messagebus/network/rpcservicepool.cpp
index fb40ccff62b..dd192a7a848 100644
--- a/messagebus/src/vespa/messagebus/network/rpcservicepool.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcservicepool.cpp
@@ -6,11 +6,14 @@
namespace mbus {
-RPCServicePool::RPCServicePool(RPCNetwork &net, uint32_t maxSize) :
- _net(net),
- _lru(maxSize)
+RPCServicePool::RPCServicePool(const slobrok::api::IMirrorAPI & mirror, uint32_t maxSize) :
+ _mirror(mirror),
+ _lock(),
+ _lru(std::make_unique<ServiceCache>(maxSize)),
+ _updateGen(0),
+ _maxSize(maxSize)
{
- _lru.reserve(maxSize);
+ _lru->reserve(maxSize);
assert(maxSize > 0);
}
@@ -19,27 +22,52 @@ RPCServicePool::~RPCServicePool() = default;
RPCServiceAddress::UP
RPCServicePool::resolve(const string &pattern)
{
- std::unique_ptr<RPCService> * found = _lru.findAndRef(pattern);
- if (found) {
- return (*found)->resolve();
+ std::shared_ptr<RPCService> service;
+ {
+ LockGuard guard(_lock);
+ handleMirrorUpdates(guard);
+ std::shared_ptr<RPCService> *found = _lru->findAndRef(pattern);
+ if (found) {
+ service = *found;
+ }
+ }
+
+ if (service) {
+ return service->resolve();
} else {
- auto service = std::make_unique<RPCService>(_net.getMirror(), pattern);
+ service = std::make_shared<RPCService>(_mirror, pattern);
auto result = service->resolve();
- _lru[pattern] = std::move(service);
+ if (service->isValid()) {
+ LockGuard guard(_lock);
+ (*_lru)[pattern] = std::move(service);
+ }
return result;
}
+
+}
+
+void
+RPCServicePool::handleMirrorUpdates(const LockGuard &) {
+ uint32_t currentgen = _mirror.updates();
+ if (_updateGen != currentgen) {
+ auto lru = std::make_unique<ServiceCache>(_maxSize);
+ _lru.swap(lru);
+ _updateGen = currentgen;
+ }
}
uint32_t
RPCServicePool::getSize() const
{
- return _lru.size();
+ LockGuard guard(_lock);
+ return _lru->size();
}
bool
RPCServicePool::hasService(const string &pattern) const
{
- return _lru.hasKey(pattern);
+ LockGuard guard(_lock);
+ return _lru->hasKey(pattern);
}
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcservicepool.h b/messagebus/src/vespa/messagebus/network/rpcservicepool.h
index 2614363838c..212c975a38c 100644
--- a/messagebus/src/vespa/messagebus/network/rpcservicepool.h
+++ b/messagebus/src/vespa/messagebus/network/rpcservicepool.h
@@ -13,12 +13,6 @@ class RPCNetwork;
* the rpc network.
*/
class RPCServicePool {
-private:
- typedef vespalib::lrucache_map< vespalib::LruParam<string, RPCService::UP> > ServiceCache;
-
- RPCNetwork &_net;
- ServiceCache _lru;
-
public:
RPCServicePool(const RPCServicePool &) = delete;
RPCServicePool & operator = (const RPCServicePool &) = delete;
@@ -28,7 +22,7 @@ public:
* @param net The underlying RPC network.
* @param maxSize The max number of services to cache.
*/
- RPCServicePool(RPCNetwork &net, uint32_t maxSize);
+ RPCServicePool(const slobrok::api::IMirrorAPI & mirror, uint32_t maxSize);
/**
* Destructor. Frees any allocated resources.
@@ -61,6 +55,17 @@ public:
* @return True if a corresponding service is in the pool.
*/
bool hasService(const string &pattern) const;
+private:
+ using ServiceCache = vespalib::lrucache_map< vespalib::LruParam<string, std::shared_ptr<RPCService> >>;
+ using LockGuard = std::lock_guard<std::mutex>;
+
+ void handleMirrorUpdates(const LockGuard & guard);
+
+ const slobrok::api::IMirrorAPI & _mirror;
+ mutable std::mutex _lock;
+ std::unique_ptr<ServiceCache> _lru;
+ uint32_t _updateGen;
+ uint32_t _maxSize;
};
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
index cc09e44c460..b42ac47e54d 100644
--- a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
@@ -32,7 +32,7 @@ void
RPCTargetPool::flushTargets(bool force)
{
uint64_t currentTime = _timer->getMilliTime();
- vespalib::LockGuard guard(_lock);
+ LockGuard guard(_lock);
TargetMap::iterator it = _targets.begin();
while (it != _targets.end()) {
Entry &entry = it->second;
@@ -56,7 +56,7 @@ RPCTargetPool::flushTargets(bool force)
size_t
RPCTargetPool::size()
{
- vespalib::LockGuard guard(_lock);
+ LockGuard guard(_lock);
return _targets.size();
}
@@ -65,7 +65,7 @@ RPCTargetPool::getTarget(FRT_Supervisor &orb, const RPCServiceAddress &address)
{
const string & spec = address.getConnectionSpec();
uint64_t currentTime = _timer->getMilliTime();
- vespalib::LockGuard guard(_lock);
+ LockGuard guard(_lock);
auto it = _targets.find(spec);
if (it != _targets.end()) {
Entry &entry = it->second;
diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.h b/messagebus/src/vespa/messagebus/network/rpctargetpool.h
index 5f858f66993..d47fd977356 100644
--- a/messagebus/src/vespa/messagebus/network/rpctargetpool.h
+++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.h
@@ -28,9 +28,10 @@ private:
Entry(RPCTarget::SP target, uint64_t lastUse);
};
- typedef std::map<string, Entry> TargetMap;
+ using TargetMap = std::map<string, Entry>;
+ using LockGuard = std::lock_guard<std::mutex>;
- vespalib::Lock _lock;
+ std::mutex _lock;
TargetMap _targets;
ITimer::UP _timer;
uint64_t _expireMillis;
diff --git a/slobrok/src/tests/configure/configure.cpp b/slobrok/src/tests/configure/configure.cpp
index 579e468db45..aa9826045ef 100644
--- a/slobrok/src/tests/configure/configure.cpp
+++ b/slobrok/src/tests/configure/configure.cpp
@@ -23,9 +23,6 @@ using slobrok::ConfigShim;
using slobrok::SlobrokServer;
using slobrok::ConfiguratorFactory;
-TEST_SETUP(Test);
-
-
std::string
createSpec(int port)
{
@@ -93,10 +90,7 @@ compare(MirrorAPI &api, const char *pattern, SpecList expect)
return false;
}
-int
-Test::Main()
-{
- TEST_INIT("configure_test");
+TEST("configure_test") {
fnet::frt::StandaloneFRT orb1;
fnet::frt::StandaloneFRT orb2;
@@ -214,10 +208,10 @@ Test::Main()
serverOne.stop();
serverTwo.stop();
- TEST_DONE();
-
orb4.supervisor().GetTransport()->ShutDown(true);
orb3.supervisor().GetTransport()->ShutDown(true);
orb2.supervisor().GetTransport()->ShutDown(true);
orb1.supervisor().GetTransport()->ShutDown(true);
}
+
+TEST_MAIN() { TEST_RUN_ALL(); } \ No newline at end of file