summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-05-10 11:25:48 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2019-05-10 12:25:38 +0000
commit4412aace869986be3a1060f78f367841353d3384 (patch)
treef4b5e1f6da5eaf1563f3b2fd64779800acfd5796 /storage
parent840d4e0578dc627b75bcd0050f1b253e84cc30ed (diff)
Simplify the supervisor responsibility
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/storageserver/fnetlistener.cpp9
-rw-r--r--storage/src/vespa/storage/storageserver/fnetlistener.h14
-rw-r--r--storage/src/vespa/storage/tools/storage-cmd.cpp26
3 files changed, 27 insertions, 22 deletions
diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.cpp b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
index ec488b25714..2f1cdc74ce1 100644
--- a/storage/src/vespa/storage/storageserver/fnetlistener.cpp
+++ b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
@@ -7,6 +7,7 @@
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/host_name.h>
#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/transport.h>
#include <sstream>
#include <vespa/log/log.h>
@@ -16,7 +17,9 @@ namespace storage {
FNetListener::FNetListener(MessageEnqueuer& messageEnqueuer, const config::ConfigUri & configUri, uint32_t port)
: _messageEnqueuer(messageEnqueuer),
- _orb(std::make_unique<FRT_Supervisor>()),
+ _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)),
+ _transport(std::make_unique<FNET_Transport>()),
+ _orb(std::make_unique<FRT_Supervisor>(_transport.get())),
_closed(false),
_slobrokRegister(*_orb, configUri)
{
@@ -26,7 +29,7 @@ FNetListener::FNetListener(MessageEnqueuer& messageEnqueuer, const config::Confi
ost << "Failed to listen to RPC port " << port << ".";
throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC);
}
- _orb->Start();
+ _transport->Start(_threadPool.get());
}
FNetListener::~FNetListener()
@@ -57,7 +60,7 @@ FNetListener::close()
{
_closed = true;
_slobrokRegister.unregisterName(_handle);
- _orb->ShutDown(true);
+ _transport->ShutDown(true);
}
void
diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.h b/storage/src/vespa/storage/storageserver/fnetlistener.h
index 2097be15491..8d24311bc57 100644
--- a/storage/src/vespa/storage/storageserver/fnetlistener.h
+++ b/storage/src/vespa/storage/storageserver/fnetlistener.h
@@ -4,6 +4,8 @@
#include <vespa/slobrok/sbregister.h>
#include <atomic>
+class FNET_Transport;
+
namespace storage {
namespace api { class StorageMessage; }
@@ -33,11 +35,13 @@ public:
int getListenPort() const;
private:
- MessageEnqueuer& _messageEnqueuer;
- std::unique_ptr<FRT_Supervisor> _orb;
- std::atomic<bool> _closed;
- slobrok::api::RegisterAPI _slobrokRegister;
- vespalib::string _handle;
+ MessageEnqueuer& _messageEnqueuer;
+ std::unique_ptr<FastOS_ThreadPool> _threadPool;
+ std::unique_ptr<FNET_Transport> _transport;
+ std::unique_ptr<FRT_Supervisor> _orb;
+ std::atomic<bool> _closed;
+ slobrok::api::RegisterAPI _slobrokRegister;
+ vespalib::string _handle;
void detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest *req);
};
diff --git a/storage/src/vespa/storage/tools/storage-cmd.cpp b/storage/src/vespa/storage/tools/storage-cmd.cpp
index 564574627a7..daaa890873f 100644
--- a/storage/src/vespa/storage/tools/storage-cmd.cpp
+++ b/storage/src/vespa/storage/tools/storage-cmd.cpp
@@ -18,22 +18,22 @@ private:
const char *value = param + 2;
switch (param[0]) {
case 'b':
- req->GetParams()->AddInt8(strtoll(value, NULL, 0));
+ req->GetParams()->AddInt8(strtoll(value, nullptr, 0));
break;
case 'h':
- req->GetParams()->AddInt16(strtoll(value, NULL, 0));
+ req->GetParams()->AddInt16(strtoll(value, nullptr, 0));
break;
case 'i':
- req->GetParams()->AddInt32(strtoll(value, NULL, 0));
+ req->GetParams()->AddInt32(strtoll(value, nullptr, 0));
break;
case 'l':
- req->GetParams()->AddInt64(strtoll(value, NULL, 0));
+ req->GetParams()->AddInt64(strtoll(value, nullptr, 0));
break;
case 'f':
- req->GetParams()->AddFloat(vespalib::locale::c::strtod(value, NULL));
+ req->GetParams()->AddFloat(vespalib::locale::c::strtod(value, nullptr));
break;
case 'd':
- req->GetParams()->AddDouble(vespalib::locale::c::strtod(value, NULL));
+ req->GetParams()->AddDouble(vespalib::locale::c::strtod(value, nullptr));
break;
case 's':
req->GetParams()->AddString(value);
@@ -55,11 +55,10 @@ public:
return 1;
}
int retCode = 0;
- FRT_Supervisor supervisor;
- supervisor.Start();
+ fnet::frt::StandaloneFRT supervisor;
slobrok::ConfiguratorFactory sbcfg("admin/slobrok.0");
- slobrok::api::MirrorAPI mirror(supervisor, sbcfg);
+ slobrok::api::MirrorAPI mirror(supervisor.supervisor(), sbcfg);
while (!mirror.ready()) {
FastOS_Thread::Sleep(10);
@@ -72,11 +71,11 @@ public:
}
for (size_t j = 0; j < list.size(); j++) {
- FRT_Target *target = supervisor.GetTarget(list[j].second.c_str());
+ FRT_Target *target = supervisor.supervisor().GetTarget(list[j].second.c_str());
// If not fleet controller, need to connect first.
- if (strstr(_argv[1], "fleetcontroller") == NULL) {
- FRT_RPCRequest *req = supervisor.AllocRPCRequest();
+ if (strstr(_argv[1], "fleetcontroller") == nullptr) {
+ FRT_RPCRequest *req = supervisor.supervisor().AllocRPCRequest();
req->SetMethodName("vespa.storage.connect");
req->GetParams()->AddString(_argv[1]);
target->InvokeSync(req, 10.0);
@@ -90,7 +89,7 @@ public:
req->SubRef();
}
- FRT_RPCRequest *req = supervisor.AllocRPCRequest();
+ FRT_RPCRequest *req = supervisor.supervisor().AllocRPCRequest();
req->SetMethodName(_argv[2]);
for (int i = 3; i < _argc; ++i) {
@@ -115,7 +114,6 @@ public:
req->SubRef();
target->SubRef();
}
- supervisor.ShutDown(true);
return retCode;
}
};