diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-13 16:00:04 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-14 14:44:29 +0000 |
commit | c5d95cd19e86f2d3c337122226efd946f47d752e (patch) | |
tree | f6d5ab49215e41d67a8208248c58267f32617d25 /storage/src/tests/storageserver | |
parent | 4a18ca637cff723bcc45acc425689a69bcf4db66 (diff) |
Basic handling of activate_cluster_state_version RPC in backend
Diffstat (limited to 'storage/src/tests/storageserver')
-rw-r--r-- | storage/src/tests/storageserver/fnet_listener_test.cpp | 88 |
1 files changed, 77 insertions, 11 deletions
diff --git a/storage/src/tests/storageserver/fnet_listener_test.cpp b/storage/src/tests/storageserver/fnet_listener_test.cpp index 84051041d25..d40b230d725 100644 --- a/storage/src/tests/storageserver/fnet_listener_test.cpp +++ b/storage/src/tests/storageserver/fnet_listener_test.cpp @@ -27,6 +27,9 @@ public: CPPUNIT_TEST(set_distribution_rpc_is_immediately_failed_if_listener_is_closed); CPPUNIT_TEST(overly_large_uncompressed_bundle_size_parameter_returns_rpc_error); CPPUNIT_TEST(mismatching_uncompressed_bundle_size_parameter_returns_rpc_error); + CPPUNIT_TEST(true_deferred_activation_flag_can_be_roundtrip_encoded); + CPPUNIT_TEST(false_deferred_activation_flag_can_be_roundtrip_encoded); + CPPUNIT_TEST(activate_cluster_state_version_rpc_enqueues_command_with_version); CPPUNIT_TEST_SUITE_END(); void baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle(); @@ -35,6 +38,9 @@ public: void set_distribution_rpc_is_immediately_failed_if_listener_is_closed(); void overly_large_uncompressed_bundle_size_parameter_returns_rpc_error(); void mismatching_uncompressed_bundle_size_parameter_returns_rpc_error(); + void true_deferred_activation_flag_can_be_roundtrip_encoded(); + void false_deferred_activation_flag_can_be_roundtrip_encoded(); + void activate_cluster_state_version_rpc_enqueues_command_with_version(); }; CPPUNIT_TEST_SUITE_REGISTRATION(FNetListenerTest); @@ -54,24 +60,25 @@ struct DummyReturnHandler : FRT_IReturnHandler { FNET_Connection* GetConnection() override { return nullptr; } }; -struct Fixture { +struct FixtureBase { // TODO factor out Slobrok code to avoid need to set up live ports for unrelated tests mbus::Slobrok slobrok; vdstestlib::DirConfig config; MockOperationEnqueuer enqueuer; std::unique_ptr<FNetListener> fnet_listener; - SlimeClusterStateBundleCodec codec; DummyReturnHandler return_handler; bool request_is_detached{false}; FRT_RPCRequest* bound_request{nullptr}; - Fixture() : config(getStandardConfig(true)) { + FixtureBase() + : config(getStandardConfig(true)) + { config.getConfig("stor-server").set("node_index", "1"); addSlobrokConfig(config, slobrok); fnet_listener = std::make_unique<FNetListener>(enqueuer, config.getConfigId(), 0); } - ~Fixture() { + virtual ~FixtureBase() { // Must destroy any associated message contexts that may have refs to FRT_Request // instance _before_ we destroy the request itself. enqueuer._enqueued.clear(); @@ -79,6 +86,12 @@ struct Fixture { bound_request->SubRef(); } } +}; + +struct SetStateFixture : FixtureBase { + SlimeClusterStateBundleCodec codec; + + SetStateFixture() : FixtureBase() {} void bind_request_params(EncodedClusterStateBundle& encoded_bundle, uint32_t uncompressed_length) { bound_request = new FRT_RPCRequest(); // Naked new isn't pretty, but FRT_RPCRequest has internal refcounting @@ -123,6 +136,10 @@ struct Fixture { lib::ClusterStateBundle dummy_baseline_bundle() const { return lib::ClusterStateBundle(lib::ClusterState("version:123 distributor:3 storage:3")); } + + lib::ClusterStateBundle dummy_baseline_bundle_with_deferred_activation(bool deferred) const { + return lib::ClusterStateBundle(lib::ClusterState("version:123 distributor:3 storage:3"), {}, deferred); + } }; std::shared_ptr<const lib::ClusterState> state_of(vespalib::stringref state) { @@ -138,17 +155,17 @@ vespalib::string make_compressable_state_string() { ss.str().data(), ss.str().data()); } -} +} // anon namespace void FNetListenerTest::baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle() { - Fixture f; + SetStateFixture f; auto baseline = f.dummy_baseline_bundle(); f.assert_request_received_and_propagated(baseline); } void FNetListenerTest::set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle() { - Fixture f; + SetStateFixture f; lib::ClusterStateBundle spaces_bundle( lib::ClusterState("version:123 distributor:3 storage:3"), {{FixedBucketSpaces::default_space(), state_of("version:123 distributor:3 storage:3 .0.s:d")}, @@ -158,7 +175,7 @@ void FNetListenerTest::set_distribution_states_rpc_with_derived_enqueues_command } void FNetListenerTest::compressed_bundle_is_transparently_uncompressed() { - Fixture f; + SetStateFixture f; auto state_str = make_compressable_state_string(); lib::ClusterStateBundle compressable_bundle{lib::ClusterState(state_str)}; @@ -171,24 +188,73 @@ void FNetListenerTest::compressed_bundle_is_transparently_uncompressed() { } void FNetListenerTest::set_distribution_rpc_is_immediately_failed_if_listener_is_closed() { - Fixture f; + SetStateFixture f; f.create_request(f.dummy_baseline_bundle()); f.fnet_listener->close(); f.assert_request_returns_error_response(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN); } void FNetListenerTest::overly_large_uncompressed_bundle_size_parameter_returns_rpc_error() { - Fixture f; + SetStateFixture f; auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle()); f.bind_request_params(encoded_bundle, FNetListener::StateBundleMaxUncompressedSize + 1); f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST); } void FNetListenerTest::mismatching_uncompressed_bundle_size_parameter_returns_rpc_error() { - Fixture f; + SetStateFixture f; auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle()); f.bind_request_params(encoded_bundle, encoded_bundle._buffer->getDataLen() + 100); f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST); } +void FNetListenerTest::true_deferred_activation_flag_can_be_roundtrip_encoded() { + SetStateFixture f; + f.assert_request_received_and_propagated(f.dummy_baseline_bundle_with_deferred_activation(true)); + +} + +void FNetListenerTest::false_deferred_activation_flag_can_be_roundtrip_encoded() { + SetStateFixture f; + f.assert_request_received_and_propagated(f.dummy_baseline_bundle_with_deferred_activation(false)); +} + +struct ActivateStateFixture : FixtureBase { + ActivateStateFixture() : FixtureBase() {} + + void bind_request_params(uint32_t activate_version) { + bound_request = new FRT_RPCRequest(); // Naked new isn't pretty, but FRT_RPCRequest has internal refcounting + auto* params = bound_request->GetParams(); + params->AddInt32(activate_version); + + bound_request->SetDetachedPT(&request_is_detached); + bound_request->SetReturnHandler(&return_handler); + } + + void create_request(uint32_t activate_version) { + // Only 1 request allowed per fixture due to lifetime handling snags + assert(bound_request == nullptr); + bind_request_params(activate_version); + } + + void assert_enqueued_operation_has_activate_version(uint32_t version) { + CPPUNIT_ASSERT(bound_request != nullptr); + CPPUNIT_ASSERT(request_is_detached); + CPPUNIT_ASSERT_EQUAL(size_t(1), enqueuer._enqueued.size()); + auto& state_request = dynamic_cast<const api::ActivateClusterStateVersionCommand&>(*enqueuer._enqueued[0]); + CPPUNIT_ASSERT_EQUAL(version, state_request.version()); + } + + void assert_request_received_and_propagated(uint32_t activate_version) { + create_request(activate_version); + fnet_listener->RPC_activateClusterStateVersion(bound_request); + assert_enqueued_operation_has_activate_version(activate_version); + } +}; + +void FNetListenerTest::activate_cluster_state_version_rpc_enqueues_command_with_version() { + ActivateStateFixture f; + f.assert_request_received_and_propagated(1234567); +} + } |