summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-03-13 16:00:04 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-03-14 14:44:29 +0000
commitc5d95cd19e86f2d3c337122226efd946f47d752e (patch)
treef6d5ab49215e41d67a8208248c58267f32617d25 /storage
parent4a18ca637cff723bcc45acc425689a69bcf4db66 (diff)
Basic handling of activate_cluster_state_version RPC in backend
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/storageserver/fnet_listener_test.cpp88
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp9
-rw-r--r--storage/src/vespa/storage/storageserver/fnetlistener.cpp19
-rw-r--r--storage/src/vespa/storage/storageserver/fnetlistener.h1
-rw-r--r--storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp9
-rw-r--r--storage/src/vespa/storage/storageserver/statemanager.cpp9
-rw-r--r--storage/src/vespa/storage/storageserver/statemanager.h1
7 files changed, 122 insertions, 14 deletions
diff --git a/storage/src/tests/storageserver/fnet_listener_test.cpp b/storage/src/tests/storageserver/fnet_listener_test.cpp
index 84051041d25..d40b230d725 100644
--- a/storage/src/tests/storageserver/fnet_listener_test.cpp
+++ b/storage/src/tests/storageserver/fnet_listener_test.cpp
@@ -27,6 +27,9 @@ public:
CPPUNIT_TEST(set_distribution_rpc_is_immediately_failed_if_listener_is_closed);
CPPUNIT_TEST(overly_large_uncompressed_bundle_size_parameter_returns_rpc_error);
CPPUNIT_TEST(mismatching_uncompressed_bundle_size_parameter_returns_rpc_error);
+ CPPUNIT_TEST(true_deferred_activation_flag_can_be_roundtrip_encoded);
+ CPPUNIT_TEST(false_deferred_activation_flag_can_be_roundtrip_encoded);
+ CPPUNIT_TEST(activate_cluster_state_version_rpc_enqueues_command_with_version);
CPPUNIT_TEST_SUITE_END();
void baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle();
@@ -35,6 +38,9 @@ public:
void set_distribution_rpc_is_immediately_failed_if_listener_is_closed();
void overly_large_uncompressed_bundle_size_parameter_returns_rpc_error();
void mismatching_uncompressed_bundle_size_parameter_returns_rpc_error();
+ void true_deferred_activation_flag_can_be_roundtrip_encoded();
+ void false_deferred_activation_flag_can_be_roundtrip_encoded();
+ void activate_cluster_state_version_rpc_enqueues_command_with_version();
};
CPPUNIT_TEST_SUITE_REGISTRATION(FNetListenerTest);
@@ -54,24 +60,25 @@ struct DummyReturnHandler : FRT_IReturnHandler {
FNET_Connection* GetConnection() override { return nullptr; }
};
-struct Fixture {
+struct FixtureBase {
// TODO factor out Slobrok code to avoid need to set up live ports for unrelated tests
mbus::Slobrok slobrok;
vdstestlib::DirConfig config;
MockOperationEnqueuer enqueuer;
std::unique_ptr<FNetListener> fnet_listener;
- SlimeClusterStateBundleCodec codec;
DummyReturnHandler return_handler;
bool request_is_detached{false};
FRT_RPCRequest* bound_request{nullptr};
- Fixture() : config(getStandardConfig(true)) {
+ FixtureBase()
+ : config(getStandardConfig(true))
+ {
config.getConfig("stor-server").set("node_index", "1");
addSlobrokConfig(config, slobrok);
fnet_listener = std::make_unique<FNetListener>(enqueuer, config.getConfigId(), 0);
}
- ~Fixture() {
+ virtual ~FixtureBase() {
// Must destroy any associated message contexts that may have refs to FRT_Request
// instance _before_ we destroy the request itself.
enqueuer._enqueued.clear();
@@ -79,6 +86,12 @@ struct Fixture {
bound_request->SubRef();
}
}
+};
+
+struct SetStateFixture : FixtureBase {
+ SlimeClusterStateBundleCodec codec;
+
+ SetStateFixture() : FixtureBase() {}
void bind_request_params(EncodedClusterStateBundle& encoded_bundle, uint32_t uncompressed_length) {
bound_request = new FRT_RPCRequest(); // Naked new isn't pretty, but FRT_RPCRequest has internal refcounting
@@ -123,6 +136,10 @@ struct Fixture {
lib::ClusterStateBundle dummy_baseline_bundle() const {
return lib::ClusterStateBundle(lib::ClusterState("version:123 distributor:3 storage:3"));
}
+
+ lib::ClusterStateBundle dummy_baseline_bundle_with_deferred_activation(bool deferred) const {
+ return lib::ClusterStateBundle(lib::ClusterState("version:123 distributor:3 storage:3"), {}, deferred);
+ }
};
std::shared_ptr<const lib::ClusterState> state_of(vespalib::stringref state) {
@@ -138,17 +155,17 @@ vespalib::string make_compressable_state_string() {
ss.str().data(), ss.str().data());
}
-}
+} // anon namespace
void FNetListenerTest::baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle() {
- Fixture f;
+ SetStateFixture f;
auto baseline = f.dummy_baseline_bundle();
f.assert_request_received_and_propagated(baseline);
}
void FNetListenerTest::set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle() {
- Fixture f;
+ SetStateFixture f;
lib::ClusterStateBundle spaces_bundle(
lib::ClusterState("version:123 distributor:3 storage:3"),
{{FixedBucketSpaces::default_space(), state_of("version:123 distributor:3 storage:3 .0.s:d")},
@@ -158,7 +175,7 @@ void FNetListenerTest::set_distribution_states_rpc_with_derived_enqueues_command
}
void FNetListenerTest::compressed_bundle_is_transparently_uncompressed() {
- Fixture f;
+ SetStateFixture f;
auto state_str = make_compressable_state_string();
lib::ClusterStateBundle compressable_bundle{lib::ClusterState(state_str)};
@@ -171,24 +188,73 @@ void FNetListenerTest::compressed_bundle_is_transparently_uncompressed() {
}
void FNetListenerTest::set_distribution_rpc_is_immediately_failed_if_listener_is_closed() {
- Fixture f;
+ SetStateFixture f;
f.create_request(f.dummy_baseline_bundle());
f.fnet_listener->close();
f.assert_request_returns_error_response(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN);
}
void FNetListenerTest::overly_large_uncompressed_bundle_size_parameter_returns_rpc_error() {
- Fixture f;
+ SetStateFixture f;
auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle());
f.bind_request_params(encoded_bundle, FNetListener::StateBundleMaxUncompressedSize + 1);
f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST);
}
void FNetListenerTest::mismatching_uncompressed_bundle_size_parameter_returns_rpc_error() {
- Fixture f;
+ SetStateFixture f;
auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle());
f.bind_request_params(encoded_bundle, encoded_bundle._buffer->getDataLen() + 100);
f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST);
}
+void FNetListenerTest::true_deferred_activation_flag_can_be_roundtrip_encoded() {
+ SetStateFixture f;
+ f.assert_request_received_and_propagated(f.dummy_baseline_bundle_with_deferred_activation(true));
+
+}
+
+void FNetListenerTest::false_deferred_activation_flag_can_be_roundtrip_encoded() {
+ SetStateFixture f;
+ f.assert_request_received_and_propagated(f.dummy_baseline_bundle_with_deferred_activation(false));
+}
+
+struct ActivateStateFixture : FixtureBase {
+ ActivateStateFixture() : FixtureBase() {}
+
+ void bind_request_params(uint32_t activate_version) {
+ bound_request = new FRT_RPCRequest(); // Naked new isn't pretty, but FRT_RPCRequest has internal refcounting
+ auto* params = bound_request->GetParams();
+ params->AddInt32(activate_version);
+
+ bound_request->SetDetachedPT(&request_is_detached);
+ bound_request->SetReturnHandler(&return_handler);
+ }
+
+ void create_request(uint32_t activate_version) {
+ // Only 1 request allowed per fixture due to lifetime handling snags
+ assert(bound_request == nullptr);
+ bind_request_params(activate_version);
+ }
+
+ void assert_enqueued_operation_has_activate_version(uint32_t version) {
+ CPPUNIT_ASSERT(bound_request != nullptr);
+ CPPUNIT_ASSERT(request_is_detached);
+ CPPUNIT_ASSERT_EQUAL(size_t(1), enqueuer._enqueued.size());
+ auto& state_request = dynamic_cast<const api::ActivateClusterStateVersionCommand&>(*enqueuer._enqueued[0]);
+ CPPUNIT_ASSERT_EQUAL(version, state_request.version());
+ }
+
+ void assert_request_received_and_propagated(uint32_t activate_version) {
+ create_request(activate_version);
+ fnet_listener->RPC_activateClusterStateVersion(bound_request);
+ assert_enqueued_operation_has_activate_version(activate_version);
+ }
+};
+
+void FNetListenerTest::activate_cluster_state_version_rpc_enqueues_command_with_version() {
+ ActivateStateFixture f;
+ f.assert_request_received_and_propagated(1234567);
+}
+
}
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 7fb85ef0ecc..978d434847e 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -622,20 +622,25 @@ CommunicationManager::sendDirectRPCReply(
{
std::string requestName(request.getMethodName());
if (requestName == "getnodestate3") {
- api::GetNodeStateReply& gns(static_cast<api::GetNodeStateReply&>(*reply));
+ auto& gns(dynamic_cast<api::GetNodeStateReply&>(*reply));
std::ostringstream ns;
serializeNodeState(gns, ns, true, true, false);
request.addReturnString(ns.str().c_str());
request.addReturnString(gns.getNodeInfo().c_str());
LOGBP(debug, "Sending getnodestate3 reply with host info '%s'.", gns.getNodeInfo().c_str());
} else if (requestName == "getnodestate2") {
- api::GetNodeStateReply& gns(static_cast<api::GetNodeStateReply&>(*reply));
+ auto& gns(dynamic_cast<api::GetNodeStateReply&>(*reply));
std::ostringstream ns;
serializeNodeState(gns, ns, true, true, false);
request.addReturnString(ns.str().c_str());
LOGBP(debug, "Sending getnodestate2 reply with no host info.");
} else if (requestName == "setsystemstate2" || requestName == "setdistributionstates") {
// No data to return
+ } else if (requestName == "activate_cluster_state_version") {
+ auto& activate_reply(dynamic_cast<api::ActivateClusterStateVersionReply&>(*reply));
+ request.addReturnInt(activate_reply.actualVersion());
+ LOGBP(debug, "sending activate_cluster_state_version reply for version %u with actual version %u ",
+ activate_reply.activateVersion(), activate_reply.actualVersion());
} else {
request.addReturnInt(reply->getResult().getResult());
request.addReturnString(reply->getResult().getMessage().c_str());
diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.cpp b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
index e31bded772c..3654b17a2a7 100644
--- a/storage/src/vespa/storage/storageserver/fnetlistener.cpp
+++ b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
@@ -92,6 +92,11 @@ FNetListener::initRPC()
rb.ParamDesc("uncompressedSize", "Uncompressed size for payload");
rb.ParamDesc("payload", "Binary Slime format payload");
//-------------------------------------------------------------------------
+ rb.DefineMethod("activate_cluster_state_version", "i", "i", FRT_METHOD(FNetListener::RPC_activateClusterStateVersion), this);
+ rb.MethodDesc("Explicitly activates an already prepared cluster state version");
+ rb.ParamDesc("activate_version", "Expected cluster state version to activate");
+ rb.ReturnDesc("actual_version", "Cluster state version that was prepared on the node prior to receiving RPC");
+ //-------------------------------------------------------------------------
rb.DefineMethod("getcurrenttime", "", "lis", FRT_METHOD(FNetListener::RPC_getCurrentTime), this);
rb.MethodDesc("Get current time on this node");
rb.ReturnDesc("seconds", "Current time in seconds since epoch");
@@ -211,4 +216,18 @@ void FNetListener::RPC_setDistributionStates(FRT_RPCRequest* req) {
detach_and_forward_to_enqueuer(std::move(cmd), req);
}
+void FNetListener::RPC_activateClusterStateVersion(FRT_RPCRequest* req) {
+ if (_closed) {
+ LOG(debug, "Not handling RPC call activate_cluster_state_version() as we have closed");
+ req->SetError(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN, "Node shutting down");
+ return;
+ }
+
+ const int32_t activate_version = req->GetParams()->GetValue(0)._intval32;
+ auto cmd = std::make_shared<api::ActivateClusterStateVersionCommand>(activate_version);
+ cmd->setPriority(api::StorageMessage::VERYHIGH);
+
+ detach_and_forward_to_enqueuer(std::move(cmd), req);
+}
+
}
diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.h b/storage/src/vespa/storage/storageserver/fnetlistener.h
index abcba18e0be..2097be15491 100644
--- a/storage/src/vespa/storage/storageserver/fnetlistener.h
+++ b/storage/src/vespa/storage/storageserver/fnetlistener.h
@@ -26,6 +26,7 @@ public:
void RPC_setSystemState2(FRT_RPCRequest *req);
void RPC_getCurrentTime(FRT_RPCRequest *req);
void RPC_setDistributionStates(FRT_RPCRequest* req);
+ void RPC_activateClusterStateVersion(FRT_RPCRequest* req);
void registerHandle(vespalib::stringref handle);
void close();
diff --git a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp
index 5b7e0ab4621..1f854bc724e 100644
--- a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp
+++ b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp
@@ -53,6 +53,9 @@ EncodedClusterStateBundle SlimeClusterStateBundleCodec::encode(
{
vespalib::Slime slime;
Cursor& root = slime.setObject();
+ if (bundle.deferredActivation()) {
+ root.setBool("deferred-activation", bundle.deferredActivation());
+ }
Cursor& states = root.setObject("states");
states.setString("baseline", serialize_state(*bundle.getBaselineClusterState()));
Cursor& spaces = states.setObject("spaces");
@@ -79,6 +82,7 @@ namespace {
static const Memory StatesField("states");
static const Memory BaselineField("baseline");
static const Memory SpacesField("spaces");
+static const Memory DeferredActivationField("deferred-activation");
struct StateInserter : vespalib::slime::ObjectTraverser {
lib::ClusterStateBundle::BucketSpaceStateMapping& _space_states;
@@ -118,8 +122,11 @@ std::shared_ptr<const lib::ClusterStateBundle> SlimeClusterStateBundleCodec::dec
lib::ClusterStateBundle::BucketSpaceStateMapping space_states;
StateInserter inserter(space_states);
spaces.traverse(inserter);
+
+ const bool deferred_activation = root[DeferredActivationField].asBool(); // Defaults to false if not set.
+
// TODO add shared_ptr constructor for baseline?
- return std::make_shared<lib::ClusterStateBundle>(baseline, std::move(space_states));
+ return std::make_shared<lib::ClusterStateBundle>(baseline, std::move(space_states), deferred_activation);
}
}
diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp
index 95cb5dec696..9532f99dd2a 100644
--- a/storage/src/vespa/storage/storageserver/statemanager.cpp
+++ b/storage/src/vespa/storage/storageserver/statemanager.cpp
@@ -514,6 +514,15 @@ StateManager::onSetSystemState(
return true;
}
+bool
+StateManager::onActivateClusterStateVersion(
+ const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd)
+{
+ // TODO invoke listeners and set actual version
+ sendUp(std::make_shared<api::ActivateClusterStateVersionReply>(*cmd));
+ return true;
+}
+
void
StateManager::run(framework::ThreadHandle& thread)
{
diff --git a/storage/src/vespa/storage/storageserver/statemanager.h b/storage/src/vespa/storage/storageserver/statemanager.h
index 0bacd41f6d9..57f0e02a136 100644
--- a/storage/src/vespa/storage/storageserver/statemanager.h
+++ b/storage/src/vespa/storage/storageserver/statemanager.h
@@ -137,6 +137,7 @@ private:
bool onGetNodeState(const std::shared_ptr<api::GetNodeStateCommand>&) override;
bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>&) override;
+ bool onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>&) override;
/**
* _stateLock MUST NOT be held while calling.