summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
author Tor Brede Vekterli <vekterli@verizonmedia.com> 2019-03-14 14:40:52 +0000
committer Tor Brede Vekterli <vekterli@verizonmedia.com> 2019-03-14 14:44:30 +0000
commit 33b3bdca77a0141e4f20185024f8e772209ffbef (patch)
tree 97e2169325a257ee345e4840a8616d0187df08a8 /storage
parent c5d95cd19e86f2d3c337122226efd946f47d752e (diff)
WIP on BucketDBUpdater explicit activation support
Diffstat (limited to 'storage')
-rw-r--r-- storage/src/tests/distributor/bucketdbupdatertest.cpp 2
-rw-r--r-- storage/src/vespa/storage/distributor/bucketdbupdater.cpp 77
-rw-r--r-- storage/src/vespa/storage/distributor/bucketdbupdater.h 6
-rw-r--r-- storage/src/vespa/storage/distributor/pendingclusterstate.cpp 4
-rw-r--r-- storage/src/vespa/storage/distributor/pendingclusterstate.h 28
-rw-r--r-- storage/src/vespa/storage/storageserver/statemanager.cpp 5
6 files changed, 115 insertions, 7 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index 6c70988cbc6..fd9512b2bd6 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -233,6 +233,8 @@ public:
void setUp() override {
createLinks();
_bucketSpaces = getBucketSpaces();
+ // Disable deferred activation by default (at least for now) to avoid breaking the entire world.
+ getConfig().setAllowStaleReadsDuringClusterStateTransitions(false);
};
void tearDown() override {
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index 9c6ec4a7f0e..20ec67fdb14 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -54,6 +54,13 @@ BucketDBUpdater::print(std::ostream& out, bool verbose, const std::string& inden
}
bool
+BucketDBUpdater::shouldDeferStateEnabling() const noexcept
+{
+ return _distributorComponent.getDistributor().getConfig()
+ .allowStaleReadsDuringClusterStateTransitions();
+}
+
+bool
BucketDBUpdater::hasPendingClusterState() const
{
return static_cast<bool>(_pendingClusterState);
@@ -195,6 +202,16 @@ BucketDBUpdater::replyToPreviousPendingClusterStateIfAny()
}
}
+void
+BucketDBUpdater::replyToActivationWithActualVersion(
+ const api::ActivateClusterStateVersionCommand& cmd,
+ uint32_t actualVersion)
+{
+ auto reply = std::make_shared<api::ActivateClusterStateVersionReply>(cmd);
+ reply->setActualVersion(actualVersion);
+ _distributorComponent.sendUp(reply); // TODO let API accept rvalues
+}
+
bool
BucketDBUpdater::onSetSystemState(
const std::shared_ptr<api::SetSystemStateCommand>& cmd)
@@ -237,6 +254,37 @@ BucketDBUpdater::onSetSystemState(
return true;
}
+bool
+BucketDBUpdater::onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd)
+{
+ // TODO test edges!
+ if (hasPendingClusterState() && _pendingClusterState->isVersionedTransition()) {
+ const auto pending_version = _pendingClusterState->clusterStateVersion();
+ if (pending_version == cmd->version()) {
+ if (isPendingClusterStateCompleted()) {
+ assert(_pendingClusterState->isDeferred());
+ activatePendingClusterState();
+ } else {
+ LOG(error, "Received cluster state activation for pending version %u "
+ "without pending state being complete yet. This is not expected, "
+ "as no activation should be sent before all distributors have "
+ "reported that state processing is complete.", pending_version);
+ replyToActivationWithActualVersion(*cmd, 0); // Invalid version, will cause re-send (hopefully when completed).
+ return true;
+ }
+ } else {
+ replyToActivationWithActualVersion(*cmd, pending_version);
+ return true;
+ }
+ } else {
+ // Likely just a resend, but log a warning for now to get a feel for how common it is.
+ LOG(warning, "Received cluster state activation command for version %u, which "
+ "has no corresponding pending state. Resent operation?", cmd->version());
+ }
+ // Fall through to next link in call chain that cares about this message.
+ return false;
+}
+
BucketDBUpdater::MergeReplyGuard::~MergeReplyGuard()
{
if (_reply) {
@@ -497,14 +545,37 @@ BucketDBUpdater::isPendingClusterStateCompleted() const
void
BucketDBUpdater::processCompletedPendingClusterState()
{
+ if (_pendingClusterState->isDeferred()) {
+ assert(_pendingClusterState->hasCommand()); // Deferred transitions should only ever be created by state commands.
+ // Sending down the SetSystemState command will reach the state manager, and a reply
+ // will be auto-sent back to the cluster controller in charge. The cluster controller
+ // will then send an explicit activation command once all distributors have reported
+ // that their pending cluster states have completed.
+ _distributorComponent.getDistributor().getMessageSender().sendDown(
+ _pendingClusterState->getCommand());
+ _pendingClusterState->clearCommand();
+ return;
+ }
+ // Distribution config change or non-deferred cluster state. Immediately activate
+ // the pending state without being told to do so explicitly.
+ activatePendingClusterState();
+}
+
+void
+BucketDBUpdater::activatePendingClusterState()
+{
_pendingClusterState->mergeIntoBucketDatabases();
- if (_pendingClusterState->getCommand().get()) {
+ if (_pendingClusterState->isVersionedTransition()) {
enableCurrentClusterStateBundleInDistributor();
- _distributorComponent.getDistributor().getMessageSender().sendDown(
- _pendingClusterState->getCommand());
+ if (_pendingClusterState->hasCommand()) {
+ _distributorComponent.getDistributor().getMessageSender().sendDown(
+ _pendingClusterState->getCommand());
+ }
addCurrentStateToClusterStateHistory();
} else {
+ // TODO distribution changes cannot currently be deferred as they are not
+ // initiated by the cluster controller!
_distributorComponent.getDistributor().notifyDistributionChangeEnabled();
}
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index 18893edaeda..973495d2007 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -44,6 +44,7 @@ public:
void recheckBucketInfo(uint32_t nodeIdx, const document::Bucket& bucket);
bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd) override;
+ bool onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd) override;
bool onRequestBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply> & repl) override;
bool onMergeBucketReply(const std::shared_ptr<api::MergeBucketReply>& reply) override;
bool onNotifyBucketChange(const std::shared_ptr<api::NotifyBucketChangeCommand>&) override;
@@ -125,6 +126,7 @@ private:
}
};
+ bool shouldDeferStateEnabling() const noexcept;
bool hasPendingClusterState() const;
bool pendingClusterStateAccepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
bool processSingleBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
@@ -132,6 +134,7 @@ private:
const BucketRequest& req);
bool isPendingClusterStateCompleted() const;
void processCompletedPendingClusterState();
+ void activatePendingClusterState();
void mergeBucketInfoWithDatabase(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
const BucketRequest& req);
void convertBucketInfoToBucketList(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
@@ -162,6 +165,9 @@ private:
void removeSuperfluousBuckets(const lib::ClusterStateBundle& newState);
void replyToPreviousPendingClusterStateIfAny();
+ void replyToActivationWithActualVersion(
+ const api::ActivateClusterStateVersionCommand& cmd,
+ uint32_t actualVersion);
void enableCurrentClusterStateBundleInDistributor();
void addCurrentStateToClusterStateHistory();
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
index ddbf87d4f24..6cba7084037 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
@@ -42,6 +42,8 @@ PendingClusterState::PendingClusterState(
_sender(sender),
_bucketSpaceRepo(bucketSpaceRepo),
_readOnlyBucketSpaceRepo(readOnlyBucketSpaceRepo),
+ _clusterStateVersion(_cmd->getClusterStateBundle().getVersion()),
+ _isVersionedTransition(true),
_bucketOwnershipTransfer(false),
_pendingTransitions()
{
@@ -65,6 +67,8 @@ PendingClusterState::PendingClusterState(
_sender(sender),
_bucketSpaceRepo(bucketSpaceRepo),
_readOnlyBucketSpaceRepo(readOnlyBucketSpaceRepo),
+ _clusterStateVersion(0),
+ _isVersionedTransition(false),
_bucketOwnershipTransfer(true),
_pendingTransitions()
{
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h
index cfccf36a61e..cedc0573381 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.h
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h
@@ -108,10 +108,31 @@ public:
return _bucketOwnershipTransfer;
}
+ bool hasCommand() const noexcept {
+ return (_cmd.get() != nullptr);
+ }
+
std::shared_ptr<api::SetSystemStateCommand> getCommand() {
return _cmd;
}
+ bool isVersionedTransition() const noexcept {
+ return _isVersionedTransition;
+ }
+
+ uint32_t clusterStateVersion() const noexcept {
+ return _clusterStateVersion;
+ }
+
+ bool isDeferred() const noexcept {
+ return (isVersionedTransition()
+ && _newClusterStateBundle.deferredActivation());
+ }
+
+ void clearCommand() {
+ _cmd.reset();
+ }
+
const lib::ClusterStateBundle& getNewClusterStateBundle() const {
return _newClusterStateBundle;
}
@@ -210,9 +231,10 @@ private:
api::Timestamp _creationTimestamp;
DistributorMessageSender& _sender;
- DistributorBucketSpaceRepo &_bucketSpaceRepo;
- DistributorBucketSpaceRepo &_readOnlyBucketSpaceRepo;
-
+ DistributorBucketSpaceRepo& _bucketSpaceRepo;
+ DistributorBucketSpaceRepo& _readOnlyBucketSpaceRepo;
+ uint32_t _clusterStateVersion;
+ bool _isVersionedTransition;
bool _bucketOwnershipTransfer;
std::unordered_map<document::BucketSpace, std::unique_ptr<PendingBucketSpaceDbTransition>, document::BucketSpace::hash> _pendingTransitions;
};
diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp
index 9532f99dd2a..02b9a765dd8 100644
--- a/storage/src/vespa/storage/storageserver/statemanager.cpp
+++ b/storage/src/vespa/storage/storageserver/statemanager.cpp
@@ -518,7 +518,10 @@ bool
StateManager::onActivateClusterStateVersion(
const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd)
{
- // TODO invoke listeners and set actual version
+ // TODO: we probably don't want to invoke listeners here, but rather just bounce
+ // with the currently activated bundle version?
+ // Must ensure that the layer above (i.e. the distributor) maintains strict
+ // operation ordering.
sendUp(std::make_shared<api::ActivateClusterStateVersionReply>(*cmd));
return true;
}