diff options
author | Håvard Pettersen <havardpe@oath.com> | 2019-05-14 12:55:05 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@oath.com> | 2019-05-14 12:55:05 +0000 |
commit | 1503a9df2da8c06c8336e521811d2e589a1ce674 (patch) | |
tree | 3198e446513421954f3208b273c9e746b18f0591 | |
parent | 3830d8e3ddfb430d09b2ee188a7325b57d11832a (diff) |
introduce online state in proto rpc adapter
this is to ensure we do not start server stuff until we are officially
online.
6 files changed, 78 insertions, 31 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index f39d9d8620c..772be9049db 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -332,6 +332,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) _executor.sync(); waitForOnlineState(); _isReplayDone = true; + _rpcHooks->set_online(); if ( ! _fs4Server->start() ) { throw vespalib::PortListenException(protonConfig.ptport, "FS4"); } diff --git a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp index 99cc62ce30c..fca49a52553 100644 --- a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp @@ -216,6 +216,12 @@ RPCHooksBase::open(Params & params) LOG(debug, "started monitoring interface"); } +void +RPCHooksBase::set_online() +{ + _proto_rpc_adapter->set_online(); +} + RPCHooksBase::~RPCHooksBase() = default; void diff --git a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h index f16237381d6..21b433acf7b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h +++ b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h @@ -99,6 +99,7 @@ public: RPCHooksBase(const RPCHooksBase &) = delete; RPCHooksBase & operator = (const RPCHooksBase &) = delete; RPCHooksBase(Params ¶ms); + void set_online(); virtual ~RPCHooksBase(); void close(); diff --git a/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp b/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp index 6747e4e741e..89763f54f3d 100644 --- a/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp +++ b/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp @@ -92,46 +92,70 @@ TEST_F(ProtoRpcAdapterTest, require_that_plain_rpc_ping_works) { TEST_F(ProtoRpcAdapterTest, require_that_proto_rpc_search_works) { auto target = connect(); - auto *rpc = new FRT_RPCRequest(); - ProtoSearchRequest req; - req.set_offset(42); - ProtoRpcAdapter::encode_search_request(req, *rpc); - target->InvokeSync(rpc, 60.0); - ProtoSearchReply reply; - EXPECT_TRUE(ProtoRpcAdapter::decode_search_reply(*rpc, reply)); - EXPECT_EQ(reply.total_hit_count(), 42); - rpc->SubRef(); + for (bool online: {false, true, true}) { + auto *rpc = new FRT_RPCRequest(); + ProtoSearchRequest req; + req.set_offset(42); + ProtoRpcAdapter::encode_search_request(req, *rpc); + target->InvokeSync(rpc, 60.0); + if (online) { + ProtoSearchReply reply; + EXPECT_TRUE(ProtoRpcAdapter::decode_search_reply(*rpc, reply)); + EXPECT_EQ(reply.total_hit_count(), 42); + } else { + EXPECT_EQ(rpc->GetErrorCode(), FRTE_RPC_METHOD_FAILED); + EXPECT_EQ(std::string(rpc->GetErrorMessage()), std::string("Server not online")); + adapter.set_online(); + } + rpc->SubRef(); + } target->SubRef(); } TEST_F(ProtoRpcAdapterTest, require_that_proto_rpc_getDocsums_works) { auto target = connect(); - auto *rpc = new FRT_RPCRequest(); - ProtoDocsumRequest req; - req.set_rank_profile("mlr"); - ProtoRpcAdapter::encode_docsum_request(req, *rpc); - target->InvokeSync(rpc, 60.0); - ProtoDocsumReply reply; - EXPECT_TRUE(ProtoRpcAdapter::decode_docsum_reply(*rpc, reply)); - const auto &mem = reply.slime_summaries(); - Slime slime; - EXPECT_EQ(BinaryFormat::decode(Memory(mem.data(), mem.size()), slime), mem.size()); - EXPECT_EQ(slime.get()[0]["use_root_slime"].asBool(), true); - EXPECT_EQ(slime.get()[1]["ranking"].asString().make_string(), "mlr"); - rpc->SubRef(); + for (bool online: {false, true, true}) { + auto *rpc = new FRT_RPCRequest(); + ProtoDocsumRequest req; + req.set_rank_profile("mlr"); + ProtoRpcAdapter::encode_docsum_request(req, *rpc); + target->InvokeSync(rpc, 60.0); + if (online) { + ProtoDocsumReply reply; + EXPECT_TRUE(ProtoRpcAdapter::decode_docsum_reply(*rpc, reply)); + const auto &mem = reply.slime_summaries(); + Slime slime; + EXPECT_EQ(BinaryFormat::decode(Memory(mem.data(), mem.size()), slime), mem.size()); + EXPECT_EQ(slime.get()[0]["use_root_slime"].asBool(), true); + EXPECT_EQ(slime.get()[1]["ranking"].asString().make_string(), "mlr"); + } else { + EXPECT_EQ(rpc->GetErrorCode(), FRTE_RPC_METHOD_FAILED); + EXPECT_EQ(std::string(rpc->GetErrorMessage()), std::string("Server not online")); + adapter.set_online(); + } + rpc->SubRef(); + } target->SubRef(); } TEST_F(ProtoRpcAdapterTest, require_that_proto_rpc_ping_works) { auto target = connect(); - auto *rpc = new FRT_RPCRequest(); - ProtoMonitorRequest req; - ProtoRpcAdapter::encode_monitor_request(req, *rpc); - target->InvokeSync(rpc, 60.0); - ProtoMonitorReply reply; - EXPECT_TRUE(ProtoRpcAdapter::decode_monitor_reply(*rpc, reply)); - EXPECT_EQ(reply.active_docs(), 53); - rpc->SubRef(); + for (bool online: {false, true, true}) { + auto *rpc = new FRT_RPCRequest(); + ProtoMonitorRequest req; + ProtoRpcAdapter::encode_monitor_request(req, *rpc); + target->InvokeSync(rpc, 60.0); + if (online) { + ProtoMonitorReply reply; + EXPECT_TRUE(ProtoRpcAdapter::decode_monitor_reply(*rpc, reply)); + EXPECT_EQ(reply.active_docs(), 53); + } else { + EXPECT_EQ(rpc->GetErrorCode(), FRTE_RPC_METHOD_FAILED); + EXPECT_EQ(std::string(rpc->GetErrorMessage()), std::string("Server not online")); + adapter.set_online(); + } + rpc->SubRef(); + } target->SubRef(); } diff --git a/searchlib/src/vespa/searchlib/engine/proto_rpc_adapter.cpp b/searchlib/src/vespa/searchlib/engine/proto_rpc_adapter.cpp index 4e637fe19cf..fabfa638c33 100644 --- a/searchlib/src/vespa/searchlib/engine/proto_rpc_adapter.cpp +++ b/searchlib/src/vespa/searchlib/engine/proto_rpc_adapter.cpp @@ -178,7 +178,8 @@ ProtoRpcAdapter::ProtoRpcAdapter(SearchServer &search_server, FRT_Supervisor &orb) : _search_server(search_server), _docsum_server(docsum_server), - _monitor_server(monitor_server) + _monitor_server(monitor_server), + _online(false) { FRT_ReflectionBuilder rb(&orb); //------------------------------------------------------------------------- @@ -202,6 +203,9 @@ ProtoRpcAdapter::ProtoRpcAdapter(SearchServer &search_server, void ProtoRpcAdapter::rpc_search(FRT_RPCRequest *req) { + if (!is_online()) { + return req->SetError(FRTE_RPC_METHOD_FAILED, "Server not online"); + } req->Detach(); auto &client = req->getStash().create<SearchCompletionHandler>(*req); auto reply = _search_server.search(search_request_decoder(*req), client); @@ -213,6 +217,9 @@ ProtoRpcAdapter::rpc_search(FRT_RPCRequest *req) void ProtoRpcAdapter::rpc_getDocsums(FRT_RPCRequest *req) { + if (!is_online()) { + return req->SetError(FRTE_RPC_METHOD_FAILED, "Server not online"); + } req->Detach(); auto &client = req->getStash().create<GetDocsumsCompletionHandler>(*req); auto reply = _docsum_server.getDocsums(docsum_request_decoder(*req), client); @@ -224,6 +231,9 @@ ProtoRpcAdapter::rpc_getDocsums(FRT_RPCRequest *req) void ProtoRpcAdapter::rpc_ping(FRT_RPCRequest *rpc) { + if (!is_online()) { + return rpc->SetError(FRTE_RPC_METHOD_FAILED, "Server not online"); + } rpc->Detach(); ProtoMonitorRequest msg; if (decode_message(*rpc->GetParams(), msg)) { diff --git a/searchlib/src/vespa/searchlib/engine/proto_rpc_adapter.h b/searchlib/src/vespa/searchlib/engine/proto_rpc_adapter.h index cd864f3d5bf..77b26b743f2 100644 --- a/searchlib/src/vespa/searchlib/engine/proto_rpc_adapter.h +++ b/searchlib/src/vespa/searchlib/engine/proto_rpc_adapter.h @@ -4,6 +4,7 @@ #include <vespa/fnet/frt/invokable.h> #include "proto_converter.h" +#include <atomic> class FRT_Supervisor; @@ -31,12 +32,16 @@ private: SearchServer &_search_server; DocsumServer &_docsum_server; MonitorServer &_monitor_server; + std::atomic<bool> _online; public: ProtoRpcAdapter(SearchServer &search_server, DocsumServer &docsum_server, MonitorServer &monitor_server, FRT_Supervisor &orb); + void set_online() { _online.store(std::memory_order_release); } + bool is_online() const { return _online.load(std::memory_order_acquire); } + void rpc_search(FRT_RPCRequest *req); void rpc_getDocsums(FRT_RPCRequest *req); void rpc_ping(FRT_RPCRequest *req); |