diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-13 16:00:04 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-14 14:44:29 +0000 |
commit | c5d95cd19e86f2d3c337122226efd946f47d752e (patch) | |
tree | f6d5ab49215e41d67a8208248c58267f32617d25 /storage | |
parent | 4a18ca637cff723bcc45acc425689a69bcf4db66 (diff) |
Basic handling of activate_cluster_state_version RPC in backend
Diffstat (limited to 'storage')
7 files changed, 122 insertions, 14 deletions
diff --git a/storage/src/tests/storageserver/fnet_listener_test.cpp b/storage/src/tests/storageserver/fnet_listener_test.cpp index 84051041d25..d40b230d725 100644 --- a/storage/src/tests/storageserver/fnet_listener_test.cpp +++ b/storage/src/tests/storageserver/fnet_listener_test.cpp @@ -27,6 +27,9 @@ public: CPPUNIT_TEST(set_distribution_rpc_is_immediately_failed_if_listener_is_closed); CPPUNIT_TEST(overly_large_uncompressed_bundle_size_parameter_returns_rpc_error); CPPUNIT_TEST(mismatching_uncompressed_bundle_size_parameter_returns_rpc_error); + CPPUNIT_TEST(true_deferred_activation_flag_can_be_roundtrip_encoded); + CPPUNIT_TEST(false_deferred_activation_flag_can_be_roundtrip_encoded); + CPPUNIT_TEST(activate_cluster_state_version_rpc_enqueues_command_with_version); CPPUNIT_TEST_SUITE_END(); void baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle(); @@ -35,6 +38,9 @@ public: void set_distribution_rpc_is_immediately_failed_if_listener_is_closed(); void overly_large_uncompressed_bundle_size_parameter_returns_rpc_error(); void mismatching_uncompressed_bundle_size_parameter_returns_rpc_error(); + void true_deferred_activation_flag_can_be_roundtrip_encoded(); + void false_deferred_activation_flag_can_be_roundtrip_encoded(); + void activate_cluster_state_version_rpc_enqueues_command_with_version(); }; CPPUNIT_TEST_SUITE_REGISTRATION(FNetListenerTest); @@ -54,24 +60,25 @@ struct DummyReturnHandler : FRT_IReturnHandler { FNET_Connection* GetConnection() override { return nullptr; } }; -struct Fixture { +struct FixtureBase { // TODO factor out Slobrok code to avoid need to set up live ports for unrelated tests mbus::Slobrok slobrok; vdstestlib::DirConfig config; MockOperationEnqueuer enqueuer; std::unique_ptr<FNetListener> fnet_listener; - SlimeClusterStateBundleCodec codec; DummyReturnHandler return_handler; bool request_is_detached{false}; FRT_RPCRequest* bound_request{nullptr}; - Fixture() : config(getStandardConfig(true)) { + FixtureBase() + : config(getStandardConfig(true)) + { config.getConfig("stor-server").set("node_index", "1"); addSlobrokConfig(config, slobrok); fnet_listener = std::make_unique<FNetListener>(enqueuer, config.getConfigId(), 0); } - ~Fixture() { + virtual ~FixtureBase() { // Must destroy any associated message contexts that may have refs to FRT_Request // instance _before_ we destroy the request itself. enqueuer._enqueued.clear(); @@ -79,6 +86,12 @@ struct Fixture { bound_request->SubRef(); } } +}; + +struct SetStateFixture : FixtureBase { + SlimeClusterStateBundleCodec codec; + + SetStateFixture() : FixtureBase() {} void bind_request_params(EncodedClusterStateBundle& encoded_bundle, uint32_t uncompressed_length) { bound_request = new FRT_RPCRequest(); // Naked new isn't pretty, but FRT_RPCRequest has internal refcounting @@ -123,6 +136,10 @@ struct Fixture { lib::ClusterStateBundle dummy_baseline_bundle() const { return lib::ClusterStateBundle(lib::ClusterState("version:123 distributor:3 storage:3")); } + + lib::ClusterStateBundle dummy_baseline_bundle_with_deferred_activation(bool deferred) const { + return lib::ClusterStateBundle(lib::ClusterState("version:123 distributor:3 storage:3"), {}, deferred); + } }; std::shared_ptr<const lib::ClusterState> state_of(vespalib::stringref state) { @@ -138,17 +155,17 @@ vespalib::string make_compressable_state_string() { ss.str().data(), ss.str().data()); } -} +} // anon namespace void FNetListenerTest::baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle() { - Fixture f; + SetStateFixture f; auto baseline = f.dummy_baseline_bundle(); f.assert_request_received_and_propagated(baseline); } void FNetListenerTest::set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle() { - Fixture f; + SetStateFixture f; lib::ClusterStateBundle spaces_bundle( lib::ClusterState("version:123 distributor:3 storage:3"), {{FixedBucketSpaces::default_space(), state_of("version:123 distributor:3 storage:3 .0.s:d")}, @@ -158,7 +175,7 @@ void FNetListenerTest::set_distribution_states_rpc_with_derived_enqueues_command } void FNetListenerTest::compressed_bundle_is_transparently_uncompressed() { - Fixture f; + SetStateFixture f; auto state_str = make_compressable_state_string(); lib::ClusterStateBundle compressable_bundle{lib::ClusterState(state_str)}; @@ -171,24 +188,73 @@ void FNetListenerTest::compressed_bundle_is_transparently_uncompressed() { } void FNetListenerTest::set_distribution_rpc_is_immediately_failed_if_listener_is_closed() { - Fixture f; + SetStateFixture f; f.create_request(f.dummy_baseline_bundle()); f.fnet_listener->close(); f.assert_request_returns_error_response(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN); } void FNetListenerTest::overly_large_uncompressed_bundle_size_parameter_returns_rpc_error() { - Fixture f; + SetStateFixture f; auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle()); f.bind_request_params(encoded_bundle, FNetListener::StateBundleMaxUncompressedSize + 1); f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST); } void FNetListenerTest::mismatching_uncompressed_bundle_size_parameter_returns_rpc_error() { - Fixture f; + SetStateFixture f; auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle()); f.bind_request_params(encoded_bundle, encoded_bundle._buffer->getDataLen() + 100); f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST); } +void FNetListenerTest::true_deferred_activation_flag_can_be_roundtrip_encoded() { + SetStateFixture f; + f.assert_request_received_and_propagated(f.dummy_baseline_bundle_with_deferred_activation(true)); + +} + +void FNetListenerTest::false_deferred_activation_flag_can_be_roundtrip_encoded() { + SetStateFixture f; + f.assert_request_received_and_propagated(f.dummy_baseline_bundle_with_deferred_activation(false)); +} + +struct ActivateStateFixture : FixtureBase { + ActivateStateFixture() : FixtureBase() {} + + void bind_request_params(uint32_t activate_version) { + bound_request = new FRT_RPCRequest(); // Naked new isn't pretty, but FRT_RPCRequest has internal refcounting + auto* params = bound_request->GetParams(); + params->AddInt32(activate_version); + + bound_request->SetDetachedPT(&request_is_detached); + bound_request->SetReturnHandler(&return_handler); + } + + void create_request(uint32_t activate_version) { + // Only 1 request allowed per fixture due to lifetime handling snags + assert(bound_request == nullptr); + bind_request_params(activate_version); + } + + void assert_enqueued_operation_has_activate_version(uint32_t version) { + CPPUNIT_ASSERT(bound_request != nullptr); + CPPUNIT_ASSERT(request_is_detached); + CPPUNIT_ASSERT_EQUAL(size_t(1), enqueuer._enqueued.size()); + auto& state_request = dynamic_cast<const api::ActivateClusterStateVersionCommand&>(*enqueuer._enqueued[0]); + CPPUNIT_ASSERT_EQUAL(version, state_request.version()); + } + + void assert_request_received_and_propagated(uint32_t activate_version) { + create_request(activate_version); + fnet_listener->RPC_activateClusterStateVersion(bound_request); + assert_enqueued_operation_has_activate_version(activate_version); + } +}; + +void FNetListenerTest::activate_cluster_state_version_rpc_enqueues_command_with_version() { + ActivateStateFixture f; + f.assert_request_received_and_propagated(1234567); +} + } diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 7fb85ef0ecc..978d434847e 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -622,20 +622,25 @@ CommunicationManager::sendDirectRPCReply( { std::string requestName(request.getMethodName()); if (requestName == "getnodestate3") { - api::GetNodeStateReply& gns(static_cast<api::GetNodeStateReply&>(*reply)); + auto& gns(dynamic_cast<api::GetNodeStateReply&>(*reply)); std::ostringstream ns; serializeNodeState(gns, ns, true, true, false); request.addReturnString(ns.str().c_str()); request.addReturnString(gns.getNodeInfo().c_str()); LOGBP(debug, "Sending getnodestate3 reply with host info '%s'.", gns.getNodeInfo().c_str()); } else if (requestName == "getnodestate2") { - api::GetNodeStateReply& gns(static_cast<api::GetNodeStateReply&>(*reply)); + auto& gns(dynamic_cast<api::GetNodeStateReply&>(*reply)); std::ostringstream ns; serializeNodeState(gns, ns, true, true, false); request.addReturnString(ns.str().c_str()); LOGBP(debug, "Sending getnodestate2 reply with no host info."); } else if (requestName == "setsystemstate2" || requestName == "setdistributionstates") { // No data to return + } else if (requestName == "activate_cluster_state_version") { + auto& activate_reply(dynamic_cast<api::ActivateClusterStateVersionReply&>(*reply)); + request.addReturnInt(activate_reply.actualVersion()); + LOGBP(debug, "sending activate_cluster_state_version reply for version %u with actual version %u ", + activate_reply.activateVersion(), activate_reply.actualVersion()); } else { request.addReturnInt(reply->getResult().getResult()); request.addReturnString(reply->getResult().getMessage().c_str()); diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.cpp b/storage/src/vespa/storage/storageserver/fnetlistener.cpp index e31bded772c..3654b17a2a7 100644 --- a/storage/src/vespa/storage/storageserver/fnetlistener.cpp +++ b/storage/src/vespa/storage/storageserver/fnetlistener.cpp @@ -92,6 +92,11 @@ FNetListener::initRPC() rb.ParamDesc("uncompressedSize", "Uncompressed size for payload"); rb.ParamDesc("payload", "Binary Slime format payload"); //------------------------------------------------------------------------- + rb.DefineMethod("activate_cluster_state_version", "i", "i", FRT_METHOD(FNetListener::RPC_activateClusterStateVersion), this); + rb.MethodDesc("Explicitly activates an already prepared cluster state version"); + rb.ParamDesc("activate_version", "Expected cluster state version to activate"); + rb.ReturnDesc("actual_version", "Cluster state version that was prepared on the node prior to receiving RPC"); + //------------------------------------------------------------------------- rb.DefineMethod("getcurrenttime", "", "lis", FRT_METHOD(FNetListener::RPC_getCurrentTime), this); rb.MethodDesc("Get current time on this node"); rb.ReturnDesc("seconds", "Current time in seconds since epoch"); @@ -211,4 +216,18 @@ void FNetListener::RPC_setDistributionStates(FRT_RPCRequest* req) { detach_and_forward_to_enqueuer(std::move(cmd), req); } +void FNetListener::RPC_activateClusterStateVersion(FRT_RPCRequest* req) { + if (_closed) { + LOG(debug, "Not handling RPC call activate_cluster_state_version() as we have closed"); + req->SetError(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN, "Node shutting down"); + return; + } + + const int32_t activate_version = req->GetParams()->GetValue(0)._intval32; + auto cmd = std::make_shared<api::ActivateClusterStateVersionCommand>(activate_version); + cmd->setPriority(api::StorageMessage::VERYHIGH); + + detach_and_forward_to_enqueuer(std::move(cmd), req); +} + } diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.h b/storage/src/vespa/storage/storageserver/fnetlistener.h index abcba18e0be..2097be15491 100644 --- a/storage/src/vespa/storage/storageserver/fnetlistener.h +++ b/storage/src/vespa/storage/storageserver/fnetlistener.h @@ -26,6 +26,7 @@ public: void RPC_setSystemState2(FRT_RPCRequest *req); void RPC_getCurrentTime(FRT_RPCRequest *req); void RPC_setDistributionStates(FRT_RPCRequest* req); + void RPC_activateClusterStateVersion(FRT_RPCRequest* req); void registerHandle(vespalib::stringref handle); void close(); diff --git a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp index 5b7e0ab4621..1f854bc724e 100644 --- a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp +++ b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp @@ -53,6 +53,9 @@ EncodedClusterStateBundle SlimeClusterStateBundleCodec::encode( { vespalib::Slime slime; Cursor& root = slime.setObject(); + if (bundle.deferredActivation()) { + root.setBool("deferred-activation", bundle.deferredActivation()); + } Cursor& states = root.setObject("states"); states.setString("baseline", serialize_state(*bundle.getBaselineClusterState())); Cursor& spaces = states.setObject("spaces"); @@ -79,6 +82,7 @@ namespace { static const Memory StatesField("states"); static const Memory BaselineField("baseline"); static const Memory SpacesField("spaces"); +static const Memory DeferredActivationField("deferred-activation"); struct StateInserter : vespalib::slime::ObjectTraverser { lib::ClusterStateBundle::BucketSpaceStateMapping& _space_states; @@ -118,8 +122,11 @@ std::shared_ptr<const lib::ClusterStateBundle> SlimeClusterStateBundleCodec::dec lib::ClusterStateBundle::BucketSpaceStateMapping space_states; StateInserter inserter(space_states); spaces.traverse(inserter); + + const bool deferred_activation = root[DeferredActivationField].asBool(); // Defaults to false if not set. + // TODO add shared_ptr constructor for baseline? - return std::make_shared<lib::ClusterStateBundle>(baseline, std::move(space_states)); + return std::make_shared<lib::ClusterStateBundle>(baseline, std::move(space_states), deferred_activation); } } diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp index 95cb5dec696..9532f99dd2a 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.cpp +++ b/storage/src/vespa/storage/storageserver/statemanager.cpp @@ -514,6 +514,15 @@ StateManager::onSetSystemState( return true; } +bool +StateManager::onActivateClusterStateVersion( + const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd) +{ + // TODO invoke listeners and set actual version + sendUp(std::make_shared<api::ActivateClusterStateVersionReply>(*cmd)); + return true; +} + void StateManager::run(framework::ThreadHandle& thread) { diff --git a/storage/src/vespa/storage/storageserver/statemanager.h b/storage/src/vespa/storage/storageserver/statemanager.h index 0bacd41f6d9..57f0e02a136 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.h +++ b/storage/src/vespa/storage/storageserver/statemanager.h @@ -137,6 +137,7 @@ private: bool onGetNodeState(const std::shared_ptr<api::GetNodeStateCommand>&) override; bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>&) override; + bool onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>&) override; /** * _stateLock MUST NOT be held while calling. |