summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-10-16 23:13:46 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2017-10-16 23:13:46 +0200
commitb6f37867a3de0426b0c865169230e7b3dd1013de (patch)
treeacb280ef69307ec794f8ebb605f547eacbe8532f /messagebus
parent879f08be533050935226998773fd1e00c0668d7c (diff)
Remove OSS from c++
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/tests/CMakeLists.txt1
-rw-r--r--messagebus/src/tests/oos/.gitignore4
-rw-r--r--messagebus/src/tests/oos/CMakeLists.txt9
-rw-r--r--messagebus/src/tests/oos/DESC1
-rw-r--r--messagebus/src/tests/oos/FILES1
-rw-r--r--messagebus/src/tests/oos/oos.cpp228
-rw-r--r--messagebus/src/vespa/messagebus/network/CMakeLists.txt2
-rw-r--r--messagebus/src/vespa/messagebus/network/oosclient.cpp109
-rw-r--r--messagebus/src/vespa/messagebus/network/oosclient.h126
-rw-r--r--messagebus/src/vespa/messagebus/network/oosmanager.cpp101
-rw-r--r--messagebus/src/vespa/messagebus/network/oosmanager.h90
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp37
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h10
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp1
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.h21
-rw-r--r--messagebus/src/vespa/messagebus/testlib/CMakeLists.txt2
-rw-r--r--messagebus/src/vespa/messagebus/testlib/oosserver.cpp80
-rw-r--r--messagebus/src/vespa/messagebus/testlib/oosserver.h39
-rw-r--r--messagebus/src/vespa/messagebus/testlib/oosstate.cpp30
-rw-r--r--messagebus/src/vespa/messagebus/testlib/oosstate.h27
-rw-r--r--messagebus/src/vespa/messagebus/testlib/testserver.cpp32
-rw-r--r--messagebus/src/vespa/messagebus/testlib/testserver.h12
22 files changed, 17 insertions, 946 deletions
diff --git a/messagebus/src/tests/CMakeLists.txt b/messagebus/src/tests/CMakeLists.txt
index 2f315fda133..cb2a403f55d 100644
--- a/messagebus/src/tests/CMakeLists.txt
+++ b/messagebus/src/tests/CMakeLists.txt
@@ -13,7 +13,6 @@ add_subdirectory(loadbalance)
add_subdirectory(messagebus)
add_subdirectory(messageordering)
add_subdirectory(messenger)
-add_subdirectory(oos)
add_subdirectory(protocolrepository)
add_subdirectory(queue)
add_subdirectory(replygate)
diff --git a/messagebus/src/tests/oos/.gitignore b/messagebus/src/tests/oos/.gitignore
deleted file mode 100644
index a4771a9176b..00000000000
--- a/messagebus/src/tests/oos/.gitignore
+++ /dev/null
@@ -1,4 +0,0 @@
-.depend
-Makefile
-oos_test
-messagebus_oos_test_app
diff --git a/messagebus/src/tests/oos/CMakeLists.txt b/messagebus/src/tests/oos/CMakeLists.txt
deleted file mode 100644
index 9fd8e198c58..00000000000
--- a/messagebus/src/tests/oos/CMakeLists.txt
+++ /dev/null
@@ -1,9 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_executable(messagebus_oos_test_app TEST
- SOURCES
- oos.cpp
- DEPENDS
- messagebus_messagebus-test
- messagebus
-)
-vespa_add_test(NAME messagebus_oos_test_app NO_VALGRIND COMMAND messagebus_oos_test_app)
diff --git a/messagebus/src/tests/oos/DESC b/messagebus/src/tests/oos/DESC
deleted file mode 100644
index 16cd7a2f30d..00000000000
--- a/messagebus/src/tests/oos/DESC
+++ /dev/null
@@ -1 +0,0 @@
-oos test. Take a look at oos.cpp for details.
diff --git a/messagebus/src/tests/oos/FILES b/messagebus/src/tests/oos/FILES
deleted file mode 100644
index 08cf509e1fd..00000000000
--- a/messagebus/src/tests/oos/FILES
+++ /dev/null
@@ -1 +0,0 @@
-oos.cpp
diff --git a/messagebus/src/tests/oos/oos.cpp b/messagebus/src/tests/oos/oos.cpp
deleted file mode 100644
index 7606c291877..00000000000
--- a/messagebus/src/tests/oos/oos.cpp
+++ /dev/null
@@ -1,228 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include <vespa/messagebus/destinationsession.h>
-#include <vespa/messagebus/errorcode.h>
-#include <vespa/messagebus/intermediatesession.h>
-#include <vespa/messagebus/messagebus.h>
-#include <vespa/messagebus/routablequeue.h>
-#include <vespa/messagebus/routing/routingspec.h>
-#include <vespa/messagebus/sourcesession.h>
-#include <vespa/messagebus/sourcesessionparams.h>
-#include <vespa/messagebus/testlib/oosserver.h>
-#include <vespa/messagebus/testlib/receptor.h>
-#include <vespa/messagebus/testlib/simplemessage.h>
-#include <vespa/messagebus/testlib/simpleprotocol.h>
-#include <vespa/messagebus/testlib/simplereply.h>
-#include <vespa/messagebus/testlib/slobrok.h>
-#include <vespa/messagebus/testlib/testserver.h>
-#include <vespa/vespalib/testkit/testapp.h>
-
-using namespace mbus;
-
-struct Handler : public IMessageHandler
-{
- DestinationSession::UP session;
- Handler(MessageBus &mb) : session() {
- session = mb.createDestinationSession("session", true, *this);
- }
- ~Handler() {
- session.reset();
- }
- void handleMessage(Message::UP msg) override {
- session->acknowledge(std::move(msg));
- }
-};
-
-
-class Test : public vespalib::TestApp {
-private:
- SourceSession::UP _session;
- RoutableQueue _handler;
-
- bool checkError(const string &dst, uint32_t error);
-
-public:
- Test();
- ~Test();
- int Main() override;
-};
-
-TEST_APPHOOK(Test);
-
-Test::Test() :
- _session(),
- _handler()
-{}
-
-Test::~Test() {}
-bool
-Test::checkError(const string &dst, uint32_t error)
-{
- if (!EXPECT_TRUE(_session.get() != NULL)) {
- return false;
- }
- Message::UP msg(new SimpleMessage("msg"));
- msg->getTrace().setLevel(9);
- if (!EXPECT_TRUE(_session->send(std::move(msg), Route::parse(dst)).isAccepted())) {
- return false;
- }
- Routable::UP reply = _handler.dequeue(10000);
- if (!EXPECT_TRUE(reply.get() != NULL)) {
- return false;
- }
- if (!EXPECT_TRUE(reply->isReply())) {
- return false;
- }
- Reply &ref = static_cast<Reply&>(*reply);
- printf("%s", ref.getTrace().toString().c_str());
- if (error == ErrorCode::NONE) {
- if (!EXPECT_TRUE(!ref.hasErrors())) {
- return false;
- }
- } else {
- if (!EXPECT_TRUE(ref.hasErrors())) {
- return false;
- }
- if (!EXPECT_EQUAL(error, ref.getError(0).getCode())) {
- return false;
- }
- }
- return true;
-}
-
-int
-Test::Main()
-{
- TEST_INIT("oos_test");
-
- Slobrok slobrok;
- TestServer src(Identity(""), RoutingSpec(), slobrok, "oos/*");
- TestServer dst1(Identity("dst1"), RoutingSpec(), slobrok);
- TestServer dst2(Identity("dst2"), RoutingSpec(), slobrok);
- TestServer dst3(Identity("dst3"), RoutingSpec(), slobrok);
- TestServer dst4(Identity("dst4"), RoutingSpec(), slobrok);
- TestServer dst5(Identity("dst5"), RoutingSpec(), slobrok);
- Handler h1(dst1.mb);
- Handler h2(dst2.mb);
- Handler h3(dst3.mb);
- Handler h4(dst4.mb);
- Handler h5(dst5.mb);
- EXPECT_TRUE(src.waitSlobrok("*/session", 5));
-
- _session = src.mb.createSourceSession(_handler);
- EXPECT_TRUE(checkError("dst1/session", ErrorCode::NONE));
- EXPECT_TRUE(checkError("dst2/session", ErrorCode::NONE));
- EXPECT_TRUE(checkError("dst3/session", ErrorCode::NONE));
- EXPECT_TRUE(checkError("dst4/session", ErrorCode::NONE));
- EXPECT_TRUE(checkError("dst5/session", ErrorCode::NONE));
- TEST_FLUSH();
- OOSServer oosServer(slobrok, "oos/1", OOSState()
- .add("dst2/session")
- .add("dst3/session"));
- EXPECT_TRUE(src.waitSlobrok("oos/*", 1));
- EXPECT_TRUE(src.waitState(OOSState()
- .add("dst2/session")
- .add("dst3/session")));
- EXPECT_TRUE(checkError("dst1/session", ErrorCode::NONE)); // test 9
- EXPECT_TRUE(checkError("dst2/session", ErrorCode::SERVICE_OOS)); // return without reply?!?
- EXPECT_TRUE(checkError("dst3/session", ErrorCode::SERVICE_OOS));
- EXPECT_TRUE(checkError("dst4/session", ErrorCode::NONE));
- EXPECT_TRUE(checkError("dst5/session", ErrorCode::NONE));
- TEST_FLUSH();
- oosServer.setState(OOSState()
- .add("dst2/session"));
- EXPECT_TRUE(src.waitState(OOSState()
- .add("dst2/session", true)
- .add("dst3/session", false)));
- EXPECT_TRUE(checkError("dst1/session", ErrorCode::NONE));
- EXPECT_TRUE(checkError("dst2/session", ErrorCode::SERVICE_OOS));
- EXPECT_TRUE(checkError("dst3/session", ErrorCode::NONE));
- EXPECT_TRUE(checkError("dst4/session", ErrorCode::NONE));
- EXPECT_TRUE(checkError("dst5/session", ErrorCode::NONE));
- TEST_FLUSH();
- {
- OOSServer oosServer2(slobrok, "oos/2", OOSState()
- .add("dst4/session")
- .add("dst5/session"));
- EXPECT_TRUE(src.waitSlobrok("oos/*", 2));
- EXPECT_TRUE(src.waitState(OOSState()
- .add("dst2/session")
- .add("dst4/session")
- .add("dst5/session")));
- EXPECT_TRUE(checkError("dst1/session", ErrorCode::NONE));
- EXPECT_TRUE(checkError("dst2/session", ErrorCode::SERVICE_OOS));
- EXPECT_TRUE(checkError("dst3/session", ErrorCode::NONE));
- EXPECT_TRUE(checkError("dst4/session", ErrorCode::SERVICE_OOS));
- EXPECT_TRUE(checkError("dst5/session", ErrorCode::SERVICE_OOS));
- TEST_FLUSH();
- }
- EXPECT_TRUE(src.waitSlobrok("oos/*", 1));
- EXPECT_TRUE(src.waitState(OOSState()
- .add("dst1/session", false)
- .add("dst2/session", true)
- .add("dst3/session", false)
- .add("dst4/session", false)
- .add("dst5/session", false)));
- EXPECT_TRUE(checkError("dst1/session", ErrorCode::NONE));
- EXPECT_TRUE(checkError("dst2/session", ErrorCode::SERVICE_OOS));
- EXPECT_TRUE(checkError("dst3/session", ErrorCode::NONE));
- EXPECT_TRUE(checkError("dst4/session", ErrorCode::NONE));
- EXPECT_TRUE(checkError("dst5/session", ErrorCode::NONE));
- TEST_FLUSH();
- {
- OOSServer oosServer3(slobrok, "oos/3", OOSState()
- .add("dst2/session")
- .add("dst4/session"));
- OOSServer oosServer4(slobrok, "oos/4", OOSState()
- .add("dst2/session")
- .add("dst3/session")
- .add("dst5/session"));
- EXPECT_TRUE(src.waitSlobrok("oos/*", 3));
- EXPECT_TRUE(src.waitState(OOSState()
- .add("dst2/session")
- .add("dst3/session")
- .add("dst4/session")
- .add("dst5/session")));
- EXPECT_TRUE(checkError("dst1/session", ErrorCode::NONE));
- EXPECT_TRUE(checkError("dst2/session", ErrorCode::SERVICE_OOS));
- EXPECT_TRUE(checkError("dst3/session", ErrorCode::SERVICE_OOS));
- EXPECT_TRUE(checkError("dst4/session", ErrorCode::SERVICE_OOS));
- EXPECT_TRUE(checkError("dst5/session", ErrorCode::SERVICE_OOS));
- TEST_FLUSH();
- oosServer3.setState(OOSState()
- .add("dst2/session"));
- oosServer4.setState(OOSState()
- .add("dst1/session"));
- EXPECT_TRUE(src.waitState(OOSState()
- .add("dst1/session", true)
- .add("dst2/session", true)
- .add("dst3/session", false)
- .add("dst4/session", false)
- .add("dst5/session", false)));
- EXPECT_TRUE(checkError("dst1/session", ErrorCode::SERVICE_OOS));
- EXPECT_TRUE(checkError("dst2/session", ErrorCode::SERVICE_OOS));
- EXPECT_TRUE(checkError("dst3/session", ErrorCode::NONE));
- EXPECT_TRUE(checkError("dst4/session", ErrorCode::NONE));
- EXPECT_TRUE(checkError("dst5/session", ErrorCode::NONE));
- TEST_FLUSH();
- }
- EXPECT_TRUE(src.waitSlobrok("oos/*", 1));
- EXPECT_TRUE(src.waitState(OOSState()
- .add("dst1/session", false)
- .add("dst2/session", true)
- .add("dst3/session", false)
- .add("dst4/session", false)
- .add("dst5/session", false)));
- EXPECT_TRUE(checkError("dst1/session", ErrorCode::NONE));
- EXPECT_TRUE(checkError("dst2/session", ErrorCode::SERVICE_OOS));
- EXPECT_TRUE(checkError("dst3/session", ErrorCode::NONE));
- EXPECT_TRUE(checkError("dst4/session", ErrorCode::NONE));
- EXPECT_TRUE(checkError("dst5/session", ErrorCode::NONE));
-
- h2.session.reset();
- EXPECT_TRUE(src.waitSlobrok("*/session", 4));
- EXPECT_TRUE(checkError("dst2/session", ErrorCode::SERVICE_OOS));
-
- _session.reset();
- TEST_DONE();
-}
diff --git a/messagebus/src/vespa/messagebus/network/CMakeLists.txt b/messagebus/src/vespa/messagebus/network/CMakeLists.txt
index 750ff20240f..4c8e146b8eb 100644
--- a/messagebus/src/vespa/messagebus/network/CMakeLists.txt
+++ b/messagebus/src/vespa/messagebus/network/CMakeLists.txt
@@ -2,8 +2,6 @@
vespa_add_library(messagebus_network OBJECT
SOURCES
identity.cpp
- oosclient.cpp
- oosmanager.cpp
rpcnetwork.cpp
rpcnetworkparams.cpp
rpcsend.cpp
diff --git a/messagebus/src/vespa/messagebus/network/oosclient.cpp b/messagebus/src/vespa/messagebus/network/oosclient.cpp
deleted file mode 100644
index d9f67101ea2..00000000000
--- a/messagebus/src/vespa/messagebus/network/oosclient.cpp
+++ /dev/null
@@ -1,109 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "oosclient.h"
-#include <vespa/fnet/frt/supervisor.h>
-
-namespace mbus {
-
-void
-OOSClient::handleReply()
-{
- if (!_req->CheckReturnTypes("Si")) {
- _target->SubRef();
- _target = 0;
- Schedule(1.0);
- return;
- }
- FRT_Values &ret = *(_req->GetReturn());
- uint32_t retGen = ret[1]._intval32;
- if (_reqGen != retGen) {
- StringList oos;
- uint32_t numNames = ret[0]._string_array._len;
- FRT_StringValue *names = ret[0]._string_array._pt;
- for (uint32_t idx = 0; idx < numNames; ++idx) {
- oos.push_back(string(names[idx]._str));
- }
- _oosList.swap(oos);
- _reqGen = retGen;
- _listGen = retGen;
- }
- Schedule(0.1);
-}
-
-void
-OOSClient::handleConnect()
-{
- if (_target == 0) {
- _target = _orb.GetTarget(_spec.c_str());
- _reqGen = 0;
- }
-}
-
-void
-OOSClient::handleInvoke()
-{
- assert(_target != 0);
- _req = _orb.AllocRPCRequest(_req);
- _req->SetMethodName("fleet.getOOSList");
- _req->GetParams()->AddInt32(_reqGen); // gencnt
- _req->GetParams()->AddInt32(60000); // mstimeout
- _target->InvokeAsync(_req, 70.0, this);
-}
-
-void
-OOSClient::PerformTask()
-{
- if (_reqDone) {
- _reqDone = false;
- handleReply();
- return;
- }
- handleConnect();
- handleInvoke();
-}
-
-void
-OOSClient::RequestDone(FRT_RPCRequest *req)
-{
- assert(req == _req && !_reqDone);
- (void) req;
- _reqDone = true;
- ScheduleNow();
-}
-
-OOSClient::OOSClient(FRT_Supervisor &orb,
- const string &mySpec)
- : FNET_Task(orb.GetScheduler()),
- _orb(orb),
- _spec(mySpec),
- _oosList(),
- _reqGen(0),
- _listGen(0),
- _dumpGen(0),
- _reqDone(false),
- _target(0),
- _req(0)
-{
- ScheduleNow();
-}
-
-OOSClient::~OOSClient()
-{
- Kill();
- if (_req != 0) {
- _req->Abort();
- _req->SubRef();
- }
- if (_target != 0) {
- _target->SubRef();
- }
-}
-
-void
-OOSClient::dumpState(StringSet &dst)
-{
- dst.insert(_oosList.begin(), _oosList.end());
- _dumpGen = _listGen;
-}
-
-} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/oosclient.h b/messagebus/src/vespa/messagebus/network/oosclient.h
deleted file mode 100644
index 80cf6015783..00000000000
--- a/messagebus/src/vespa/messagebus/network/oosclient.h
+++ /dev/null
@@ -1,126 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include <vespa/fnet/frt/invoker.h>
-#include <vespa/fnet/frt/target.h>
-#include <vespa/messagebus/common.h>
-#include <vector>
-#include <set>
-
-namespace mbus {
-
-/**
- * This class keeps track of OOS information obtained from a single
- * server. This class is used by the OOSManager class. Note that since
- * this class is only used inside the transport thread it has no
- * synchronization. Using it directly will lead to race conditions and
- * possible crashes.
- **/
-class OOSClient : public FNET_Task,
- public FRT_IRequestWait
-{
-private:
- typedef std::vector<string> StringList;
-
- FRT_Supervisor &_orb;
- string _spec;
- StringList _oosList;
- uint32_t _reqGen; // server gen used for request
- uint32_t _listGen; // server gen of the oosList
- uint32_t _dumpGen; // server gen used for the last dump
- bool _reqDone;
- FRT_Target *_target;
- FRT_RPCRequest *_req;
-
- OOSClient(const OOSClient &);
- OOSClient &operator=(const OOSClient &);
-
- /**
- * Handle a server reply.
- **/
- void handleReply();
-
- /**
- * Handle server (re)connect.
- **/
- void handleConnect();
-
- /**
- * Handle server invocation.
- **/
- void handleInvoke();
-
- /**
- * From FNET_Task, performs overall server poll logic.
- **/
- void PerformTask() override;
-
- /**
- * From FRT_IRequestWait, picks up server replies.
- *
- * @param req the request that has completed
- **/
- void RequestDone(FRT_RPCRequest *req) override;
-
-public:
- /**
- * Data structure used to aggregate OOS information
- **/
- typedef std::set<string> StringSet;
-
- /**
- * Convenience typedef for a shared pointer to a OOSClient object.
- **/
- typedef std::shared_ptr<OOSClient> SP;
-
- /**
- * Create a new OOSClient polling oos information from the given
- * server.
- *
- * @param orb object used for RPC operations
- * @param spec fnet connect spec for oos server
- **/
- OOSClient(FRT_Supervisor &orb, const string &spec);
-
- /**
- * Destructor.
- **/
- virtual ~OOSClient();
-
- /**
- * Obtain the connect spec of the OOS server this client is
- * talking to.
- *
- * @return OOS server connect spec
- **/
- const string &getSpec() const { return _spec; }
-
- /**
- * Check if this client has changed. A client has changed if it
- * has obtain now information after the dumpState method was last
- * invoked.
- *
- * @return true is this client has changed
- **/
- bool isChanged() const { return (_listGen != _dumpGen); }
-
- /**
- * Returns whether or not this client has receieved any reply
- * at all from the server it is connected to.
- *
- * @return True if initial request has returned.
- */
- bool isReady() const { return _listGen != 0; }
-
- /**
- * Dump the current oos information known by this client into the
- * given string set.
- *
- * @param dst object used to aggregate oos information
- **/
- void dumpState(StringSet &dst);
-};
-
-} // namespace mbus
-
diff --git a/messagebus/src/vespa/messagebus/network/oosmanager.cpp b/messagebus/src/vespa/messagebus/network/oosmanager.cpp
deleted file mode 100644
index 250df147675..00000000000
--- a/messagebus/src/vespa/messagebus/network/oosmanager.cpp
+++ /dev/null
@@ -1,101 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "oosmanager.h"
-#include "rpcnetwork.h"
-#include <algorithm>
-#include <vespa/fnet/frt/frt.h>
-
-namespace mbus {
-
-OOSClient::SP
-OOSManager::getClient(const string &spec)
-{
- for (uint32_t i = 0; i < _clients.size(); ++i) {
- if (_clients[i]->getSpec() == spec) {
- return _clients[i];
- }
- }
- return OOSClient::SP(new OOSClient(_orb, spec));
-}
-
-void
-OOSManager::PerformTask()
-{
- bool changed = false;
- if (_slobrokGen != _mirror.updates()) {
- _slobrokGen = _mirror.updates();
- SpecList newServices = _mirror.lookup(_servicePattern);
- std::sort(newServices.begin(), newServices.end());
- if (newServices != _services) {
- ClientList newClients;
- for (uint32_t i = 0; i < newServices.size(); ++i) {
- newClients.push_back(getClient(newServices[i].second));
- }
- _services.swap(newServices);
- _clients.swap(newClients);
- changed = true;
- }
- }
- bool allOk = _mirror.ready();
- for (uint32_t i = 0; i < _clients.size(); ++i) {
- if (_clients[i]->isChanged()) {
- changed = true;
- }
- if (!_clients[i]->isReady()) {
- allOk = false;
- }
- }
- if (changed) {
- OOSSet oos(new StringSet());
- for (uint32_t i = 0; i < _clients.size(); ++i) {
- _clients[i]->dumpState(*oos);
- }
- vespalib::LockGuard guard(_lock);
- _oosSet.swap(oos);
- }
- if (allOk && !_ready) {
- _ready = true;
- }
- Schedule(_ready ? 1.0 : 0.1);
-}
-
-OOSManager::OOSManager(FRT_Supervisor &orb,
- IMirrorAPI &mirror,
- const string &servicePattern)
- : FNET_Task(orb.GetScheduler()),
- _orb(orb),
- _mirror(mirror),
- _disabled(servicePattern.empty()),
- _ready(_disabled),
- _lock("mbus::OOSManager::_lock", false),
- _servicePattern(servicePattern),
- _slobrokGen(0),
- _clients(),
- _oosSet()
-{
- if (!_disabled) {
- ScheduleNow();
- }
-}
-
-OOSManager::~OOSManager()
-{
- Kill();
-}
-
-bool
-OOSManager::isOOS(const string &service)
-{
- if (_disabled) {
- return false;
- }
- vespalib::LockGuard guard(_lock);
- if (_oosSet.get() == nullptr) {
- return false;
- }
- if (_oosSet->find(service) == _oosSet->end()) {
- return false;
- }
- return true;
-}
-
-} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/oosmanager.h b/messagebus/src/vespa/messagebus/network/oosmanager.h
deleted file mode 100644
index eac00b93896..00000000000
--- a/messagebus/src/vespa/messagebus/network/oosmanager.h
+++ /dev/null
@@ -1,90 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include "oosclient.h"
-#include <vespa/fnet/task.h>
-#include <vespa/slobrok/imirrorapi.h>
-#include <vespa/vespalib/util/sync.h>
-#include <set>
-
-class FRT_Supervisor;
-
-namespace mbus {
-
-class RPCNetwork;
-
-/**
- * This class keeps track of OOS information. A set of servers having OOS information are identified by looking up a
- * service pattern in the slobrok. These servers are then polled for information. The information is compiled into a
- * local repository for fast lookup.
- */
-class OOSManager : public FNET_Task {
-public:
- using IMirrorAPI = slobrok::api::IMirrorAPI;
- using SpecList = IMirrorAPI::SpecList;
- using ClientList = std::vector<OOSClient::SP>;
- using StringSet = std::set<string>;
- using OOSSet = std::shared_ptr<StringSet>;
-
-private:
- FRT_Supervisor &_orb;
- IMirrorAPI &_mirror;
- bool _disabled;
- bool _ready;
- vespalib::Lock _lock;
- string _servicePattern;
- uint32_t _slobrokGen;
- SpecList _services;
- ClientList _clients;
- OOSSet _oosSet;
-
- /**
- * Reuse or create a client against the given server.
- *
- * @param spec The connection spec of the OOS server we want to talk to.
- * @return A shared oosclient object.
- */
- OOSClient::SP getClient(const string &spec);
-
- /**
- * Method invoked when this object is run as a task. This method will update the oos information held by
- * this object.
- */
- void PerformTask() override;
-
-public:
- /**
- * Create a new OOSManager. The given service pattern will be looked up in the given slobrok mirror. The
- * resulting set of services will be polled for oos information.
- *
- * @param orb The supervisor used for RPC operations.
- * @param mirror The slobrok mirror.
- * @param servicePattern The service pattern for oos servers.
- */
- OOSManager(FRT_Supervisor &orb,
- IMirrorAPI &mirror,
- const string &servicePattern);
-
- /**
- * Destructor.
- */
- virtual ~OOSManager();
-
- /**
- * Returns whether or not some initial state has been returned.
- *
- * @return True, if initial state has been found.
- */
- bool isReady() const { return _ready; }
-
- /**
- * Returns whether or not the given service has been marked as out of service.
- *
- * @param service The service to check.
- * @return True if the service is out of service.
- */
- bool isOOS(const string &service);
-};
-
-} // namespace mbus
-
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index 8ff7ac87edc..fae8d73d540 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "rpcnetwork.h"
#include "rpcservicepool.h"
-#include "oosmanager.h"
#include "rpcsendv1.h"
#include "rpcsendv2.h"
#include "rpctargetpool.h"
@@ -19,11 +18,13 @@
#include <vespa/fnet/scheduler.h>
#include <vespa/fnet/transport.h>
#include <vespa/fnet/frt/supervisor.h>
+#include <thread>
#include <vespa/log/log.h>
LOG_SETUP(".rpcnetwork");
using vespalib::make_string;
+using namespace std::chrono_literals;
namespace {
@@ -44,7 +45,7 @@ public:
_gate() {
ScheduleNow();
}
- ~SyncTask() {}
+ ~SyncTask() = default;
void await() {
_gate.await();
@@ -106,7 +107,7 @@ RPCNetwork::TargetPoolTask::PerformTask()
}
RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
- _owner(0),
+ _owner(nullptr),
_ident(params.getIdentity()),
_threadPool(std::make_unique<FastOS_ThreadPool>(128000, 0)),
_transport(std::make_unique<FNET_Transport>()),
@@ -118,7 +119,6 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_slobrokCfgFactory(std::make_unique<slobrok::ConfiguratorFactory>(params.getSlobrokConfig())),
_mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, *_slobrokCfgFactory)),
_regAPI(std::make_unique<slobrok::api::RegisterAPI>(*_orb, *_slobrokCfgFactory)),
- _oosManager(std::make_unique<OOSManager>(*_orb, *_mirror, params.getOOSServerPattern())),
_requestedPort(params.getListenPort()),
_executor(std::make_unique<vespalib::ThreadStackExecutor>(4,65536)),
_sendV1(std::make_unique<RPCSendV1>()),
@@ -177,7 +177,7 @@ RPCNetwork::getVersion() const
void
RPCNetwork::attach(INetworkOwner &owner)
{
- LOG_ASSERT(_owner == 0);
+ LOG_ASSERT(_owner == nullptr);
_owner = &owner;
_sendV1->attach(*this);
@@ -239,19 +239,16 @@ RPCNetwork::waitUntilReady(double seconds) const
if (configurator->poll()) {
hasConfig = true;
}
- if (_mirror->ready() && _oosManager->isReady()) {
+ if (_mirror->ready()) {
return true;
}
- FastOS_Thread::Sleep(10);
+ std::this_thread::sleep_for(10ms);
}
if (! hasConfig) {
LOG(error, "failed to get config for slobroks in %d seconds", (int)seconds);
} else if (! _mirror->ready()) {
- std::string brokers = brokerList.logString();
- LOG(error, "mirror (of %s) failed to become ready in %d seconds",
- brokers.c_str(), (int)seconds);
- } else if (! _oosManager->isReady()) {
- LOG(error, "OOS manager failed to become ready in %d seconds", (int)seconds);
+ auto brokers = brokerList.logString();
+ LOG(error, "mirror (of %s) failed to become ready in %d seconds", brokers.c_str(), (int)seconds);
}
return false;
}
@@ -259,9 +256,8 @@ RPCNetwork::waitUntilReady(double seconds) const
void
RPCNetwork::registerSession(const string &session)
{
- if (_ident.getServicePrefix().size() == 0) {
- LOG(warning, "The session (%s) will not be registered"
- "in the Slobrok since this network has no identity.",
+ if (_ident.getServicePrefix().empty()) {
+ LOG(warning, "The session (%s) will not be registered in the Slobrok since this network has no identity.",
session.c_str());
return;
}
@@ -274,7 +270,7 @@ RPCNetwork::registerSession(const string &session)
void
RPCNetwork::unregisterSession(const string &session)
{
- if (_ident.getServicePrefix().size() == 0) {
+ if (_ident.getServicePrefix().empty()) {
return;
}
string name = _ident.getServicePrefix();
@@ -299,13 +295,8 @@ RPCNetwork::allocServiceAddress(RoutingNode &recipient)
Error
RPCNetwork::resolveServiceAddress(RoutingNode &recipient, const string &serviceName)
{
- if (_oosManager->isOOS(serviceName)) {
- return Error(ErrorCode::SERVICE_OOS,
- make_string("The service '%s' has been marked as out of service.",
- serviceName.c_str()));
- }
RPCServiceAddress::UP ret = _servicePool->resolve(serviceName);
- if (ret.get() == nullptr) {
+ if ( ! ret) {
return Error(ErrorCode::NO_ADDRESS_FOR_SERVICE,
make_string("The address of service '%s' could not be resolved. It is not currently "
"registered with the Vespa name server. "
@@ -313,7 +304,7 @@ RPCNetwork::resolveServiceAddress(RoutingNode &recipient, const string &serviceN
serviceName.c_str()));
}
RPCTarget::SP target = _targetPool->getTarget(*_orb, *ret);
- if (target.get() == nullptr) {
+ if ( ! target) {
return Error(ErrorCode::CONNECTION_ERROR,
make_string("Failed to connect to service '%s'.", serviceName.c_str()));
}
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index 13fab018c3b..e29d01c8b04 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -23,7 +23,6 @@ namespace slobrok {
namespace mbus {
-class OOSManager;
class RPCServicePool;
class RPCTargetPool;
class RPCNetworkParams;
@@ -72,7 +71,6 @@ private:
std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory;
std::unique_ptr<slobrok::api::IMirrorAPI> _mirror;
std::unique_ptr<slobrok::api::RegisterAPI> _regAPI;
- std::unique_ptr<OOSManager> _oosManager;
int _requestedPort;
std::unique_ptr<vespalib::ThreadStackExecutor> _executor;
std::unique_ptr<RPCSendAdapter> _sendV1;
@@ -187,14 +185,6 @@ public:
FNET_Scheduler &getScheduler() { return _scheduler; }
/**
- * Obtain a reference to the internal OOS manager object. This will be
- * mostly used for testing.
- *
- * @return internal OOS manager
- **/
- OOSManager &getOOSManager() { return *_oosManager; }
-
- /**
* Obtain a reference to the internal supervisor. This is used by
* the request adapters to register FRT methods.
*
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
index df35d51cb54..2ea18b952d4 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
@@ -7,7 +7,6 @@ namespace mbus {
RPCNetworkParams::RPCNetworkParams() :
_identity(Identity("")),
_slobrokConfig("admin/slobrok.0"),
- _oosServerPattern(""),
_listenPort(0),
_maxInputBufferSize(256*1024),
_maxOutputBufferSize(256*1024),
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
index bfc624a6523..0a4ed806c27 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
@@ -16,7 +16,6 @@ private:
using CompressionConfig = vespalib::compression::CompressionConfig;
Identity _identity;
config::ConfigUri _slobrokConfig;
- string _oosServerPattern;
int _listenPort;
uint32_t _maxInputBufferSize;
uint32_t _maxOutputBufferSize;
@@ -78,26 +77,6 @@ public:
}
/**
- * Returns the config id pattern used to lookup OOS servers.
- *
- * @return The config id.
- */
- const string &getOOSServerPattern() const {
- return _oosServerPattern;
- }
-
- /**
- * Sets the config id pattern used to lookup OOS servers.
- *
- * @param oosServerPattern The server pattern.
- * @return This, to allow chaining.
- */
- RPCNetworkParams &setOOSServerPattern(const string &oosServerPattern) {
- _oosServerPattern = oosServerPattern;
- return *this;
- }
-
- /**
* Returns the port to listen to.
*
* @return The port.
diff --git a/messagebus/src/vespa/messagebus/testlib/CMakeLists.txt b/messagebus/src/vespa/messagebus/testlib/CMakeLists.txt
index 83106db43ce..cfc47b2db25 100644
--- a/messagebus/src/vespa/messagebus/testlib/CMakeLists.txt
+++ b/messagebus/src/vespa/messagebus/testlib/CMakeLists.txt
@@ -2,8 +2,6 @@
vespa_add_library(messagebus_messagebus-test
SOURCES
custompolicy.cpp
- oosserver.cpp
- oosstate.cpp
receptor.cpp
simplemessage.cpp
simpleprotocol.cpp
diff --git a/messagebus/src/vespa/messagebus/testlib/oosserver.cpp b/messagebus/src/vespa/messagebus/testlib/oosserver.cpp
deleted file mode 100644
index a3d3a580666..00000000000
--- a/messagebus/src/vespa/messagebus/testlib/oosserver.cpp
+++ /dev/null
@@ -1,80 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "oosserver.h"
-#include "slobrok.h"
-
-namespace mbus {
-
-OOSServer::OOSServer(const Slobrok &slobrok, const string service,
- const OOSState &state)
- : _lock("mbus::OOSServer::_lock", false),
- _orb(),
- _port(0),
- _regAPI(_orb, slobrok::ConfiguratorFactory(slobrok.config())),
- _genCnt(1),
- _state()
-{
- setState(state);
- {
- FRT_ReflectionBuilder rb(&_orb);
- //-------------------------------------------------------------------
- rb.DefineMethod("fleet.getOOSList", "ii", "Si", true,
- FRT_METHOD(OOSServer::rpc_poll), this);
- rb.MethodDesc("fetch OOS information");
- rb.ParamDesc("gencnt", "generation already known by client");
- rb.ParamDesc("timeout", "How many milliseconds to wait for changes "
- "before returning if nothing has changed (max=10000)");
- rb.ReturnDesc("names", "list of services that are OOS "
- "(empty if generation has not changed)");
- rb.ReturnDesc("newgen", "generation of the returned list");
- //-------------------------------------------------------------------
- }
- _orb.Listen(0);
- _port = _orb.GetListenPort();
- _orb.Start();
- _regAPI.registerName(service);
-}
-
-OOSServer::~OOSServer()
-{
- _orb.ShutDown(true);
-}
-
-int
-OOSServer::port() const
-{
- return _port;
-}
-
-void
-OOSServer::rpc_poll(FRT_RPCRequest *req)
-{
- vespalib::LockGuard guard(_lock);
- FRT_Values &dst = *req->GetReturn();
- FRT_StringValue *names = dst.AddStringArray(_state.size());
- for (uint32_t i = 0; i < _state.size(); ++i) {
- dst.SetString(&names[i], _state[i].c_str());
- }
- dst.AddInt32(_genCnt);
-}
-
-void
-OOSServer::setState(const OOSState &state)
-{
- std::vector<string> newState;
- for (OOSState::ITR itr = state.begin();
- itr != state.end(); ++itr)
- {
- if (itr->second) {
- newState.push_back(itr->first);
- }
- }
- vespalib::LockGuard guard(_lock);
- _state = newState;
- ++_genCnt;
- if (_genCnt == 0) {
- ++_genCnt;
- }
-}
-
-} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/testlib/oosserver.h b/messagebus/src/vespa/messagebus/testlib/oosserver.h
deleted file mode 100644
index 512b3c43e7a..00000000000
--- a/messagebus/src/vespa/messagebus/testlib/oosserver.h
+++ /dev/null
@@ -1,39 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include <vespa/vespalib/util/sync.h>
-#include <vespa/slobrok/sbregister.h>
-#include <vespa/fnet/frt/supervisor.h>
-#include <string>
-#include <vector>
-#include "oosstate.h"
-
-namespace mbus {
-
-class Slobrok;
-
-class OOSServer : public FRT_Invokable
-{
-private:
- OOSServer(const OOSServer &);
- OOSServer &operator=(const OOSServer &);
-
- vespalib::Lock _lock;
- FRT_Supervisor _orb;
- int _port;
- slobrok::api::RegisterAPI _regAPI;
- uint32_t _genCnt;
- std::vector<string> _state;
-
-public:
- OOSServer(const Slobrok &slobrok, const string service,
- const OOSState &state = OOSState());
- ~OOSServer();
- int port() const;
- void rpc_poll(FRT_RPCRequest *req);
- void setState(const OOSState &state);
-};
-
-} // namespace mbus
-
diff --git a/messagebus/src/vespa/messagebus/testlib/oosstate.cpp b/messagebus/src/vespa/messagebus/testlib/oosstate.cpp
deleted file mode 100644
index d4258a31ff0..00000000000
--- a/messagebus/src/vespa/messagebus/testlib/oosstate.cpp
+++ /dev/null
@@ -1,30 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "oosstate.h"
-
-namespace mbus {
-
-OOSState::OOSState()
- : _data()
-{ }
-
-OOSState &
-OOSState::add(const string &service, bool oos)
-{
- _data.push_back(std::make_pair(service, oos));
- return *this;
-}
-
-OOSState::ITR
-OOSState::begin() const
-{
- return _data.begin();
-}
-
-OOSState::ITR
-OOSState::end() const
-{
- return _data.end();
-}
-
-} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/testlib/oosstate.h b/messagebus/src/vespa/messagebus/testlib/oosstate.h
deleted file mode 100644
index 9dfc58d4070..00000000000
--- a/messagebus/src/vespa/messagebus/testlib/oosstate.h
+++ /dev/null
@@ -1,27 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include <vector>
-#include <vespa/messagebus/common.h>
-
-namespace mbus {
-
-class OOSState
-{
-public:
- typedef std::vector<std::pair<string, bool> > TYPE;
- typedef TYPE::const_iterator ITR;
-
-private:
- TYPE _data;
-
-public:
- OOSState();
- OOSState &add(const string &service, bool oos = true);
- ITR begin() const;
- ITR end() const;
-};
-
-} // namespace mbus
-
diff --git a/messagebus/src/vespa/messagebus/testlib/testserver.cpp b/messagebus/src/vespa/messagebus/testlib/testserver.cpp
index dbc741f2dd4..a2489aac9ce 100644
--- a/messagebus/src/vespa/messagebus/testlib/testserver.cpp
+++ b/messagebus/src/vespa/messagebus/testlib/testserver.cpp
@@ -1,10 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "testserver.h"
-#include "oosstate.h"
#include "simpleprotocol.h"
#include "slobrok.h"
#include "slobrokstate.h"
-#include <vespa/messagebus/network/oosmanager.h>
#include <vespa/vespalib/component/vtag.h>
namespace mbus {
@@ -24,12 +22,10 @@ VersionedRPCNetwork::setVersion(const vespalib::Version &version)
TestServer::TestServer(const Identity &ident,
const RoutingSpec &spec,
const Slobrok &slobrok,
- const string &oosServerPattern,
IProtocol::SP protocol) :
net(RPCNetworkParams()
.setIdentity(ident)
- .setSlobrokConfig(slobrok.config())
- .setOOSServerPattern(oosServerPattern)),
+ .setSlobrokConfig(slobrok.config())),
mb(net, ProtocolSet().add(IProtocol::SP(new SimpleProtocol())).add(protocol))
{
mb.setupRouting(spec);
@@ -50,12 +46,6 @@ TestServer::waitSlobrok(const string &pattern, uint32_t cnt)
}
bool
-TestServer::waitOOS(const string &service)
-{
- return waitState(OOSState().add(service, true));
-}
-
-bool
TestServer::waitState(const SlobrokState &slobrokState)
{
for (uint32_t i = 0; i < 12000; ++i) {
@@ -76,24 +66,4 @@ TestServer::waitState(const SlobrokState &slobrokState)
return false;
}
-bool
-TestServer::waitState(const OOSState &oosState)
-{
- for (uint32_t i = 0; i < 12000; ++i) {
- bool done = true;
- for (OOSState::ITR itr = oosState.begin();
- itr != oosState.end(); ++itr)
- {
- if (net.getOOSManager().isOOS(itr->first) != itr->second) {
- done = false;
- }
- }
- if (done) {
- return true;
- }
- FastOS_Thread::Sleep(10);
- }
- return false;
-}
-
}
diff --git a/messagebus/src/vespa/messagebus/testlib/testserver.h b/messagebus/src/vespa/messagebus/testlib/testserver.h
index 400e2b274c5..757e74c3554 100644
--- a/messagebus/src/vespa/messagebus/testlib/testserver.h
+++ b/messagebus/src/vespa/messagebus/testlib/testserver.h
@@ -13,7 +13,6 @@ class Identity;
class RoutingTableSpec;
class Slobrok;
class SlobrokState;
-class OOSState;
class VersionedRPCNetwork : public RPCNetwork {
private:
@@ -36,20 +35,13 @@ public:
VersionedRPCNetwork net;
MessageBus mb;
- TestServer(const Identity &ident,
- const RoutingSpec &spec,
- const Slobrok &slobrok,
- const string &oosServerPattern = "",
+ TestServer(const Identity &ident, const RoutingSpec &spec, const Slobrok &slobrok,
IProtocol::SP protocol = IProtocol::SP());
- TestServer(const MessageBusParams &mbusParams,
- const RPCNetworkParams &netParams);
+ TestServer(const MessageBusParams &mbusParams, const RPCNetworkParams &netParams);
~TestServer();
bool waitSlobrok(const string &pattern, uint32_t cnt = 1);
- bool waitOOS(const string &service);
-
bool waitState(const SlobrokState &slobrokState);
- bool waitState(const OOSState &oosState);
};
} // namespace mbus