aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--storage/src/tests/storageserver/fnet_listener_test.cpp88
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp9
-rw-r--r--storage/src/vespa/storage/storageserver/fnetlistener.cpp19
-rw-r--r--storage/src/vespa/storage/storageserver/fnetlistener.h1
-rw-r--r--storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp9
-rw-r--r--storage/src/vespa/storage/storageserver/statemanager.cpp9
-rw-r--r--storage/src/vespa/storage/storageserver/statemanager.h1
-rw-r--r--storageapi/src/vespa/storageapi/message/state.cpp36
-rw-r--r--storageapi/src/vespa/storageapi/message/state.h24
-rw-r--r--storageapi/src/vespa/storageapi/messageapi/messagehandler.h8
-rw-r--r--storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp2
-rw-r--r--storageapi/src/vespa/storageapi/messageapi/storagemessage.h4
-rw-r--r--vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp24
-rw-r--r--vdslib/src/vespa/vdslib/state/cluster_state_bundle.h5
14 files changed, 222 insertions, 17 deletions
diff --git a/storage/src/tests/storageserver/fnet_listener_test.cpp b/storage/src/tests/storageserver/fnet_listener_test.cpp
index 84051041d25..d40b230d725 100644
--- a/storage/src/tests/storageserver/fnet_listener_test.cpp
+++ b/storage/src/tests/storageserver/fnet_listener_test.cpp
@@ -27,6 +27,9 @@ public:
CPPUNIT_TEST(set_distribution_rpc_is_immediately_failed_if_listener_is_closed);
CPPUNIT_TEST(overly_large_uncompressed_bundle_size_parameter_returns_rpc_error);
CPPUNIT_TEST(mismatching_uncompressed_bundle_size_parameter_returns_rpc_error);
+ CPPUNIT_TEST(true_deferred_activation_flag_can_be_roundtrip_encoded);
+ CPPUNIT_TEST(false_deferred_activation_flag_can_be_roundtrip_encoded);
+ CPPUNIT_TEST(activate_cluster_state_version_rpc_enqueues_command_with_version);
CPPUNIT_TEST_SUITE_END();
void baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle();
@@ -35,6 +38,9 @@ public:
void set_distribution_rpc_is_immediately_failed_if_listener_is_closed();
void overly_large_uncompressed_bundle_size_parameter_returns_rpc_error();
void mismatching_uncompressed_bundle_size_parameter_returns_rpc_error();
+ void true_deferred_activation_flag_can_be_roundtrip_encoded();
+ void false_deferred_activation_flag_can_be_roundtrip_encoded();
+ void activate_cluster_state_version_rpc_enqueues_command_with_version();
};
CPPUNIT_TEST_SUITE_REGISTRATION(FNetListenerTest);
@@ -54,24 +60,25 @@ struct DummyReturnHandler : FRT_IReturnHandler {
FNET_Connection* GetConnection() override { return nullptr; }
};
-struct Fixture {
+struct FixtureBase {
// TODO factor out Slobrok code to avoid need to set up live ports for unrelated tests
mbus::Slobrok slobrok;
vdstestlib::DirConfig config;
MockOperationEnqueuer enqueuer;
std::unique_ptr<FNetListener> fnet_listener;
- SlimeClusterStateBundleCodec codec;
DummyReturnHandler return_handler;
bool request_is_detached{false};
FRT_RPCRequest* bound_request{nullptr};
- Fixture() : config(getStandardConfig(true)) {
+ FixtureBase()
+ : config(getStandardConfig(true))
+ {
config.getConfig("stor-server").set("node_index", "1");
addSlobrokConfig(config, slobrok);
fnet_listener = std::make_unique<FNetListener>(enqueuer, config.getConfigId(), 0);
}
- ~Fixture() {
+ virtual ~FixtureBase() {
// Must destroy any associated message contexts that may have refs to FRT_Request
// instance _before_ we destroy the request itself.
enqueuer._enqueued.clear();
@@ -79,6 +86,12 @@ struct Fixture {
bound_request->SubRef();
}
}
+};
+
+struct SetStateFixture : FixtureBase {
+ SlimeClusterStateBundleCodec codec;
+
+ SetStateFixture() : FixtureBase() {}
void bind_request_params(EncodedClusterStateBundle& encoded_bundle, uint32_t uncompressed_length) {
bound_request = new FRT_RPCRequest(); // Naked new isn't pretty, but FRT_RPCRequest has internal refcounting
@@ -123,6 +136,10 @@ struct Fixture {
lib::ClusterStateBundle dummy_baseline_bundle() const {
return lib::ClusterStateBundle(lib::ClusterState("version:123 distributor:3 storage:3"));
}
+
+ lib::ClusterStateBundle dummy_baseline_bundle_with_deferred_activation(bool deferred) const {
+ return lib::ClusterStateBundle(lib::ClusterState("version:123 distributor:3 storage:3"), {}, deferred);
+ }
};
std::shared_ptr<const lib::ClusterState> state_of(vespalib::stringref state) {
@@ -138,17 +155,17 @@ vespalib::string make_compressable_state_string() {
ss.str().data(), ss.str().data());
}
-}
+} // anon namespace
void FNetListenerTest::baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle() {
- Fixture f;
+ SetStateFixture f;
auto baseline = f.dummy_baseline_bundle();
f.assert_request_received_and_propagated(baseline);
}
void FNetListenerTest::set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle() {
- Fixture f;
+ SetStateFixture f;
lib::ClusterStateBundle spaces_bundle(
lib::ClusterState("version:123 distributor:3 storage:3"),
{{FixedBucketSpaces::default_space(), state_of("version:123 distributor:3 storage:3 .0.s:d")},
@@ -158,7 +175,7 @@ void FNetListenerTest::set_distribution_states_rpc_with_derived_enqueues_command
}
void FNetListenerTest::compressed_bundle_is_transparently_uncompressed() {
- Fixture f;
+ SetStateFixture f;
auto state_str = make_compressable_state_string();
lib::ClusterStateBundle compressable_bundle{lib::ClusterState(state_str)};
@@ -171,24 +188,73 @@ void FNetListenerTest::compressed_bundle_is_transparently_uncompressed() {
}
void FNetListenerTest::set_distribution_rpc_is_immediately_failed_if_listener_is_closed() {
- Fixture f;
+ SetStateFixture f;
f.create_request(f.dummy_baseline_bundle());
f.fnet_listener->close();
f.assert_request_returns_error_response(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN);
}
void FNetListenerTest::overly_large_uncompressed_bundle_size_parameter_returns_rpc_error() {
- Fixture f;
+ SetStateFixture f;
auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle());
f.bind_request_params(encoded_bundle, FNetListener::StateBundleMaxUncompressedSize + 1);
f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST);
}
void FNetListenerTest::mismatching_uncompressed_bundle_size_parameter_returns_rpc_error() {
- Fixture f;
+ SetStateFixture f;
auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle());
f.bind_request_params(encoded_bundle, encoded_bundle._buffer->getDataLen() + 100);
f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST);
}
+void FNetListenerTest::true_deferred_activation_flag_can_be_roundtrip_encoded() {
+ SetStateFixture f;
+ f.assert_request_received_and_propagated(f.dummy_baseline_bundle_with_deferred_activation(true));
+
+}
+
+void FNetListenerTest::false_deferred_activation_flag_can_be_roundtrip_encoded() {
+ SetStateFixture f;
+ f.assert_request_received_and_propagated(f.dummy_baseline_bundle_with_deferred_activation(false));
+}
+
+struct ActivateStateFixture : FixtureBase {
+ ActivateStateFixture() : FixtureBase() {}
+
+ void bind_request_params(uint32_t activate_version) {
+ bound_request = new FRT_RPCRequest(); // Naked new isn't pretty, but FRT_RPCRequest has internal refcounting
+ auto* params = bound_request->GetParams();
+ params->AddInt32(activate_version);
+
+ bound_request->SetDetachedPT(&request_is_detached);
+ bound_request->SetReturnHandler(&return_handler);
+ }
+
+ void create_request(uint32_t activate_version) {
+ // Only 1 request allowed per fixture due to lifetime handling snags
+ assert(bound_request == nullptr);
+ bind_request_params(activate_version);
+ }
+
+ void assert_enqueued_operation_has_activate_version(uint32_t version) {
+ CPPUNIT_ASSERT(bound_request != nullptr);
+ CPPUNIT_ASSERT(request_is_detached);
+ CPPUNIT_ASSERT_EQUAL(size_t(1), enqueuer._enqueued.size());
+ auto& state_request = dynamic_cast<const api::ActivateClusterStateVersionCommand&>(*enqueuer._enqueued[0]);
+ CPPUNIT_ASSERT_EQUAL(version, state_request.version());
+ }
+
+ void assert_request_received_and_propagated(uint32_t activate_version) {
+ create_request(activate_version);
+ fnet_listener->RPC_activateClusterStateVersion(bound_request);
+ assert_enqueued_operation_has_activate_version(activate_version);
+ }
+};
+
+void FNetListenerTest::activate_cluster_state_version_rpc_enqueues_command_with_version() {
+ ActivateStateFixture f;
+ f.assert_request_received_and_propagated(1234567);
+}
+
}
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 7fb85ef0ecc..978d434847e 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -622,20 +622,25 @@ CommunicationManager::sendDirectRPCReply(
{
std::string requestName(request.getMethodName());
if (requestName == "getnodestate3") {
- api::GetNodeStateReply& gns(static_cast<api::GetNodeStateReply&>(*reply));
+ auto& gns(dynamic_cast<api::GetNodeStateReply&>(*reply));
std::ostringstream ns;
serializeNodeState(gns, ns, true, true, false);
request.addReturnString(ns.str().c_str());
request.addReturnString(gns.getNodeInfo().c_str());
LOGBP(debug, "Sending getnodestate3 reply with host info '%s'.", gns.getNodeInfo().c_str());
} else if (requestName == "getnodestate2") {
- api::GetNodeStateReply& gns(static_cast<api::GetNodeStateReply&>(*reply));
+ auto& gns(dynamic_cast<api::GetNodeStateReply&>(*reply));
std::ostringstream ns;
serializeNodeState(gns, ns, true, true, false);
request.addReturnString(ns.str().c_str());
LOGBP(debug, "Sending getnodestate2 reply with no host info.");
} else if (requestName == "setsystemstate2" || requestName == "setdistributionstates") {
// No data to return
+ } else if (requestName == "activate_cluster_state_version") {
+ auto& activate_reply(dynamic_cast<api::ActivateClusterStateVersionReply&>(*reply));
+ request.addReturnInt(activate_reply.actualVersion());
+ LOGBP(debug, "sending activate_cluster_state_version reply for version %u with actual version %u ",
+ activate_reply.activateVersion(), activate_reply.actualVersion());
} else {
request.addReturnInt(reply->getResult().getResult());
request.addReturnString(reply->getResult().getMessage().c_str());
diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.cpp b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
index e31bded772c..3654b17a2a7 100644
--- a/storage/src/vespa/storage/storageserver/fnetlistener.cpp
+++ b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
@@ -92,6 +92,11 @@ FNetListener::initRPC()
rb.ParamDesc("uncompressedSize", "Uncompressed size for payload");
rb.ParamDesc("payload", "Binary Slime format payload");
//-------------------------------------------------------------------------
+ rb.DefineMethod("activate_cluster_state_version", "i", "i", FRT_METHOD(FNetListener::RPC_activateClusterStateVersion), this);
+ rb.MethodDesc("Explicitly activates an already prepared cluster state version");
+ rb.ParamDesc("activate_version", "Expected cluster state version to activate");
+ rb.ReturnDesc("actual_version", "Cluster state version that was prepared on the node prior to receiving RPC");
+ //-------------------------------------------------------------------------
rb.DefineMethod("getcurrenttime", "", "lis", FRT_METHOD(FNetListener::RPC_getCurrentTime), this);
rb.MethodDesc("Get current time on this node");
rb.ReturnDesc("seconds", "Current time in seconds since epoch");
@@ -211,4 +216,18 @@ void FNetListener::RPC_setDistributionStates(FRT_RPCRequest* req) {
detach_and_forward_to_enqueuer(std::move(cmd), req);
}
+void FNetListener::RPC_activateClusterStateVersion(FRT_RPCRequest* req) {
+ if (_closed) {
+ LOG(debug, "Not handling RPC call activate_cluster_state_version() as we have closed");
+ req->SetError(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN, "Node shutting down");
+ return;
+ }
+
+ const int32_t activate_version = req->GetParams()->GetValue(0)._intval32;
+ auto cmd = std::make_shared<api::ActivateClusterStateVersionCommand>(activate_version);
+ cmd->setPriority(api::StorageMessage::VERYHIGH);
+
+ detach_and_forward_to_enqueuer(std::move(cmd), req);
+}
+
}
diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.h b/storage/src/vespa/storage/storageserver/fnetlistener.h
index abcba18e0be..2097be15491 100644
--- a/storage/src/vespa/storage/storageserver/fnetlistener.h
+++ b/storage/src/vespa/storage/storageserver/fnetlistener.h
@@ -26,6 +26,7 @@ public:
void RPC_setSystemState2(FRT_RPCRequest *req);
void RPC_getCurrentTime(FRT_RPCRequest *req);
void RPC_setDistributionStates(FRT_RPCRequest* req);
+ void RPC_activateClusterStateVersion(FRT_RPCRequest* req);
void registerHandle(vespalib::stringref handle);
void close();
diff --git a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp
index 5b7e0ab4621..1f854bc724e 100644
--- a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp
+++ b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp
@@ -53,6 +53,9 @@ EncodedClusterStateBundle SlimeClusterStateBundleCodec::encode(
{
vespalib::Slime slime;
Cursor& root = slime.setObject();
+ if (bundle.deferredActivation()) {
+ root.setBool("deferred-activation", bundle.deferredActivation());
+ }
Cursor& states = root.setObject("states");
states.setString("baseline", serialize_state(*bundle.getBaselineClusterState()));
Cursor& spaces = states.setObject("spaces");
@@ -79,6 +82,7 @@ namespace {
static const Memory StatesField("states");
static const Memory BaselineField("baseline");
static const Memory SpacesField("spaces");
+static const Memory DeferredActivationField("deferred-activation");
struct StateInserter : vespalib::slime::ObjectTraverser {
lib::ClusterStateBundle::BucketSpaceStateMapping& _space_states;
@@ -118,8 +122,11 @@ std::shared_ptr<const lib::ClusterStateBundle> SlimeClusterStateBundleCodec::dec
lib::ClusterStateBundle::BucketSpaceStateMapping space_states;
StateInserter inserter(space_states);
spaces.traverse(inserter);
+
+ const bool deferred_activation = root[DeferredActivationField].asBool(); // Defaults to false if not set.
+
// TODO add shared_ptr constructor for baseline?
- return std::make_shared<lib::ClusterStateBundle>(baseline, std::move(space_states));
+ return std::make_shared<lib::ClusterStateBundle>(baseline, std::move(space_states), deferred_activation);
}
}
diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp
index 95cb5dec696..9532f99dd2a 100644
--- a/storage/src/vespa/storage/storageserver/statemanager.cpp
+++ b/storage/src/vespa/storage/storageserver/statemanager.cpp
@@ -514,6 +514,15 @@ StateManager::onSetSystemState(
return true;
}
+bool
+StateManager::onActivateClusterStateVersion(
+ const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd)
+{
+ // TODO invoke listeners and set actual version
+ sendUp(std::make_shared<api::ActivateClusterStateVersionReply>(*cmd));
+ return true;
+}
+
void
StateManager::run(framework::ThreadHandle& thread)
{
diff --git a/storage/src/vespa/storage/storageserver/statemanager.h b/storage/src/vespa/storage/storageserver/statemanager.h
index 0bacd41f6d9..57f0e02a136 100644
--- a/storage/src/vespa/storage/storageserver/statemanager.h
+++ b/storage/src/vespa/storage/storageserver/statemanager.h
@@ -137,6 +137,7 @@ private:
bool onGetNodeState(const std::shared_ptr<api::GetNodeStateCommand>&) override;
bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>&) override;
+ bool onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>&) override;
/**
* _stateLock MUST NOT be held while calling.
diff --git a/storageapi/src/vespa/storageapi/message/state.cpp b/storageapi/src/vespa/storageapi/message/state.cpp
index efa9a45764f..071dba16b91 100644
--- a/storageapi/src/vespa/storageapi/message/state.cpp
+++ b/storageapi/src/vespa/storageapi/message/state.cpp
@@ -12,6 +12,8 @@ IMPLEMENT_COMMAND(GetNodeStateCommand, GetNodeStateReply)
IMPLEMENT_REPLY(GetNodeStateReply)
IMPLEMENT_COMMAND(SetSystemStateCommand, SetSystemStateReply)
IMPLEMENT_REPLY(SetSystemStateReply)
+IMPLEMENT_COMMAND(ActivateClusterStateVersionCommand, ActivateClusterStateVersionReply)
+IMPLEMENT_REPLY(ActivateClusterStateVersionReply)
GetNodeStateCommand::GetNodeStateCommand(lib::NodeState::UP expectedState)
: StorageCommand(MessageType::GETNODESTATE),
@@ -102,5 +104,39 @@ SetSystemStateReply::print(std::ostream& out, bool verbose,
}
}
+ActivateClusterStateVersionCommand::ActivateClusterStateVersionCommand(uint32_t version)
+ : StorageCommand(MessageType::ACTIVATE_CLUSTER_STATE_VERSION),
+ _version(version)
+{
+}
+
+void ActivateClusterStateVersionCommand::print(std::ostream& out, bool verbose,
+ const std::string& indent) const
+{
+ out << "ActivateClusterStateVersionCommand(" << _version << ")";
+ if (verbose) {
+ out << " : ";
+ StorageCommand::print(out, verbose, indent);
+ }
+}
+
+ActivateClusterStateVersionReply::ActivateClusterStateVersionReply(const ActivateClusterStateVersionCommand& cmd)
+ : StorageReply(cmd),
+ _activateVersion(cmd.version()),
+ _actualVersion(0) // Must be set explicitly
+{
+}
+
+void ActivateClusterStateVersionReply::print(std::ostream& out, bool verbose,
+ const std::string& indent) const
+{
+ out << "ActivateClusterStateVersionReply(activate " << _activateVersion
+ << ", actual " << _actualVersion << ")";
+ if (verbose) {
+ out << " : ";
+ StorageReply::print(out, verbose, indent);
+ }
+}
+
} // api
} // storage
diff --git a/storageapi/src/vespa/storageapi/message/state.h b/storageapi/src/vespa/storageapi/message/state.h
index 4e5ad92b259..e48dce76fbb 100644
--- a/storageapi/src/vespa/storageapi/message/state.h
+++ b/storageapi/src/vespa/storageapi/message/state.h
@@ -93,4 +93,28 @@ public:
DECLARE_STORAGEREPLY(SetSystemStateReply, onSetSystemStateReply)
};
+class ActivateClusterStateVersionCommand : public StorageCommand {
+ uint32_t _version;
+public:
+ explicit ActivateClusterStateVersionCommand(uint32_t version);
+ uint32_t version() const noexcept { return _version; }
+ void print(std::ostream& out, bool verbose, const std::string& indent) const override;
+
+ DECLARE_STORAGECOMMAND(ActivateClusterStateVersionCommand, onActivateClusterStateVersion);
+
+};
+
+class ActivateClusterStateVersionReply : public StorageReply {
+ uint32_t _activateVersion;
+ uint32_t _actualVersion;
+public:
+ explicit ActivateClusterStateVersionReply(const ActivateClusterStateVersionCommand&);
+ uint32_t activateVersion() const noexcept { return _activateVersion; }
+ void setActualVersion(uint32_t version) noexcept { _actualVersion = version; }
+ uint32_t actualVersion() const noexcept { return _actualVersion; }
+ void print(std::ostream& out, bool verbose, const std::string& indent) const override;
+
+ DECLARE_STORAGEREPLY(ActivateClusterStateVersionReply, onActivateClusterStateVersionReply);
+};
+
}
diff --git a/storageapi/src/vespa/storageapi/messageapi/messagehandler.h b/storageapi/src/vespa/storageapi/messageapi/messagehandler.h
index a9c1dfb8f26..27ee509e859 100644
--- a/storageapi/src/vespa/storageapi/messageapi/messagehandler.h
+++ b/storageapi/src/vespa/storageapi/messageapi/messagehandler.h
@@ -50,6 +50,8 @@ class NotifyBucketChangeCommand;
class SetNodeStateCommand;
class GetNodeStateCommand;
class SetSystemStateCommand;
+class ActivateClusterStateVersionCommand;
+class ActivateClusterStateVersionReply;
class GetSystemStateCommand;
class GetBucketNodesCommand;
class BucketsAddedCommand;
@@ -276,6 +278,12 @@ public:
virtual bool onSetSystemStateReply(
const std::shared_ptr<api::SetSystemStateReply>&)
{ return false; }
+ virtual bool onActivateClusterStateVersion(
+ const std::shared_ptr<api::ActivateClusterStateVersionCommand>&)
+ { return false; }
+ virtual bool onActivateClusterStateVersionReply(
+ const std::shared_ptr<api::ActivateClusterStateVersionReply>&)
+ { return false; }
virtual bool onGetSystemState(
const std::shared_ptr<api::GetSystemStateCommand>&)
{ return false; }
diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp
index bab475eea32..40422ce06c4 100644
--- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp
+++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp
@@ -77,6 +77,8 @@ const MessageType MessageType::SETSYSTEMSTATE("Set system state", SETSYSTEMSTATE
const MessageType MessageType::SETSYSTEMSTATE_REPLY("Set system state reply", SETSYSTEMSTATE_REPLY_ID, &MessageType::SETSYSTEMSTATE);
const MessageType MessageType::GETSYSTEMSTATE("Get system state", GETSYSTEMSTATE_ID);
const MessageType MessageType::GETSYSTEMSTATE_REPLY("get system state reply", GETSYSTEMSTATE_REPLY_ID, &MessageType::GETSYSTEMSTATE);
+const MessageType MessageType::ACTIVATE_CLUSTER_STATE_VERSION("Activate cluster state version", ACTIVATE_CLUSTER_STATE_VERSION_ID);
+const MessageType MessageType::ACTIVATE_CLUSTER_STATE_VERSION_REPLY("Activate cluster state version reply", ACTIVATE_CLUSTER_STATE_VERSION_REPLY_ID, &MessageType::ACTIVATE_CLUSTER_STATE_VERSION);
const MessageType MessageType::GETBUCKETDIFF("GetBucketDiff", GETBUCKETDIFF_ID);
const MessageType MessageType::GETBUCKETDIFF_REPLY("GetBucketDiff reply", GETBUCKETDIFF_REPLY_ID, &MessageType::GETBUCKETDIFF);
const MessageType MessageType::APPLYBUCKETDIFF("ApplyBucketDiff", APPLYBUCKETDIFF_ID);
diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h
index c9f6e737a47..8c2338a020c 100644
--- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h
+++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h
@@ -149,6 +149,8 @@ public:
QUERYRESULT_REPLY_ID = 89,
SETBUCKETSTATE_ID = 94,
SETBUCKETSTATE_REPLY_ID = 95,
+ ACTIVATE_CLUSTER_STATE_VERSION_ID = 96,
+ ACTIVATE_CLUSTER_STATE_VERSION_REPLY_ID = 97,
MESSAGETYPE_MAX_ID
};
@@ -195,6 +197,8 @@ public:
static const MessageType SETSYSTEMSTATE_REPLY;
static const MessageType GETSYSTEMSTATE;
static const MessageType GETSYSTEMSTATE_REPLY;
+ static const MessageType ACTIVATE_CLUSTER_STATE_VERSION;
+ static const MessageType ACTIVATE_CLUSTER_STATE_VERSION_REPLY;
static const MessageType BUCKETSADDED;
static const MessageType BUCKETSADDED_REPLY;
static const MessageType BUCKETSREMOVED;
diff --git a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp
index ed561d67f6d..68a279f04f8 100644
--- a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp
+++ b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp
@@ -8,14 +8,25 @@
namespace storage::lib {
ClusterStateBundle::ClusterStateBundle(const ClusterState &baselineClusterState)
- : _baselineClusterState(std::make_shared<const ClusterState>(baselineClusterState))
+ : _baselineClusterState(std::make_shared<const ClusterState>(baselineClusterState)),
+ _deferredActivation(false)
{
}
ClusterStateBundle::ClusterStateBundle(const ClusterState& baselineClusterState,
BucketSpaceStateMapping derivedBucketSpaceStates)
: _baselineClusterState(std::make_shared<const ClusterState>(baselineClusterState)),
- _derivedBucketSpaceStates(std::move(derivedBucketSpaceStates))
+ _derivedBucketSpaceStates(std::move(derivedBucketSpaceStates)),
+ _deferredActivation(false)
+{
+}
+
+ClusterStateBundle::ClusterStateBundle(const ClusterState& baselineClusterState,
+ BucketSpaceStateMapping derivedBucketSpaceStates,
+ bool deferredActivation)
+ : _baselineClusterState(std::make_shared<const ClusterState>(baselineClusterState)),
+ _derivedBucketSpaceStates(std::move(derivedBucketSpaceStates)),
+ _deferredActivation(deferredActivation)
{
}
@@ -52,6 +63,9 @@ ClusterStateBundle::operator==(const ClusterStateBundle &rhs) const
if (_derivedBucketSpaceStates.size() != rhs._derivedBucketSpaceStates.size()) {
return false;
}
+ if (_deferredActivation != rhs._deferredActivation) {
+ return false;
+ }
// Can't do a regular operator== comparison since we must check equality
// of cluster state _values_, not their _pointers_.
for (auto& lhs_ds : _derivedBucketSpaceStates) {
@@ -74,7 +88,11 @@ std::ostream& operator<<(std::ostream& os, const ClusterStateBundle& bundle) {
os << " '" << *ds.second;
}
}
- os << "')";
+ os << '\'';
+ if (bundle.deferredActivation()) {
+ os << " (deferred activation)";
+ }
+ os << ")";
return os;
}
diff --git a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h
index a64416762b8..a9e84225c1f 100644
--- a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h
+++ b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h
@@ -24,10 +24,14 @@ public:
>;
std::shared_ptr<const ClusterState> _baselineClusterState;
BucketSpaceStateMapping _derivedBucketSpaceStates;
+ bool _deferredActivation;
public:
explicit ClusterStateBundle(const ClusterState &baselineClusterState);
ClusterStateBundle(const ClusterState& baselineClusterState,
BucketSpaceStateMapping derivedBucketSpaceStates);
+ ClusterStateBundle(const ClusterState& baselineClusterState,
+ BucketSpaceStateMapping derivedBucketSpaceStates,
+ bool deferredActivation);
~ClusterStateBundle();
const std::shared_ptr<const ClusterState> &getBaselineClusterState() const;
const std::shared_ptr<const ClusterState> &getDerivedClusterState(document::BucketSpace bucketSpace) const;
@@ -35,6 +39,7 @@ public:
return _derivedBucketSpaceStates;
}
uint32_t getVersion() const;
+ bool deferredActivation() const noexcept { return _deferredActivation; }
bool operator==(const ClusterStateBundle &rhs) const;
bool operator!=(const ClusterStateBundle &rhs) const { return !operator==(rhs); }
};