summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2021-03-03 15:58:38 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2021-03-03 15:58:38 +0000
commit1725ce1df537041cc4894be60b16f247af241c9a (patch)
tree2c11f313bd2d184201a3adafef864f88c62eb329
parent5d72bdae4df1e699202bc2f38212e763018036cd (diff)
Guard against processing bucket requests with inconsistent internal state version
There's a tiny window of time between when the bucket manager observes a new state version and when the state version actually is visible in the rest of the process. We must ensure that we don't end up processing requests when these two differ, or we might erroneously process requests for version X using a state only valid for version Y < X.
-rw-r--r--storage/src/tests/bucketdb/bucketmanagertest.cpp45
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketmanager.cpp15
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.cpp2
3 files changed, 50 insertions, 12 deletions
diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp
index e14bb8e11d1..b83ccccbf48 100644
--- a/storage/src/tests/bucketdb/bucketmanagertest.cpp
+++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp
@@ -556,7 +556,7 @@ class ConcurrentOperationFixture {
public:
explicit ConcurrentOperationFixture(BucketManagerTest& self)
: _self(self),
- _state("distributor:1 storage:1")
+ _state(std::make_shared<lib::ClusterState>("distributor:1 storage:1"))
{
_self.setupTestEnvironment();
_self._top->open();
@@ -566,9 +566,7 @@ public:
// Need a cluster state to work with initially, so that processing
// bucket requests can calculate a target distributor.
- _self._node->setClusterState(_state);
- _self._manager->onDown(
- std::make_shared<api::SetSystemStateCommand>(_state));
+ updater_internal_cluster_state_with_current();
}
void setUp(const WithBuckets& buckets) {
@@ -577,6 +575,16 @@ public:
}
}
+ void updater_internal_cluster_state_with_current() {
+ _self._node->setClusterState(*_state);
+ _self._manager->onDown(std::make_shared<api::SetSystemStateCommand>(*_state));
+ }
+
+ void update_cluster_state(const lib::ClusterState& state) {
+ _state = std::make_shared<lib::ClusterState>(state);
+ updater_internal_cluster_state_with_current();
+ }
+
auto acquireBucketLock(const document::BucketId& bucket) {
return _self._node->getStorageBucketDatabase().get(bucket, "foo");
}
@@ -610,15 +618,15 @@ public:
}
auto createFullFetchCommand() const {
- return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, _state);
+ return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, *_state);
}
auto createFullFetchCommandWithHash(vespalib::stringref hash) const {
- return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, _state, hash);
+ return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, *_state, hash);
}
auto createFullFetchCommandWithHash(document::BucketSpace space, vespalib::stringref hash) const {
- return std::make_shared<api::RequestBucketInfoCommand>(space, 0, _state, hash);
+ return std::make_shared<api::RequestBucketInfoCommand>(space, 0, *_state, hash);
}
auto acquireBucketLockAndSendInfoRequest(const document::BucketId& bucket) {
@@ -726,7 +734,7 @@ group[2].nodes[2].index 5
private:
BucketManagerTest& _self;
- lib::ClusterState _state;
+ std::shared_ptr<lib::ClusterState> _state;
};
TEST_F(BucketManagerTest, split_reply_ordered_after_bucket_reply) {
@@ -1214,4 +1222,25 @@ TEST_F(BucketManagerTest, fall_back_to_legacy_global_distribution_hash_on_mismat
EXPECT_EQ(api::ReturnCode::OK, reply.getResult().getResult()); // _not_ REJECTED
}
+// It's possible for the request processing thread and onSetSystemState (which use
+// the same mutex) to race with the actual internal component cluster state switch-over.
+// Ensure we detect and handle this by bouncing the request back to the distributor.
+// It's for all intents and purposes guaranteed that the internal state has converged
+// once the distributor has gotten around to retrying the operation.
+TEST_F(BucketManagerTest, bounce_request_on_internal_cluster_state_version_mismatch) {
+ ConcurrentOperationFixture f(*this);
+
+ // Make manager-internal and component-internal version state inconsistent
+ f.update_cluster_state(lib::ClusterState("version:2 distributor:1 storage:1"));
+ _manager->onDown(std::make_shared<api::SetSystemStateCommand>(lib::ClusterState("version:3 distributor:1 storage:1")));
+
+ // Info command is sent with state version 2, which mismatches that of internal state 3
+ // even though it's the same as the component's current version.
+ _top->sendDown(f.createFullFetchCommand());
+
+ auto replies = f.awaitAndGetReplies(1);
+ auto& reply = dynamic_cast<api::RequestBucketInfoReply&>(*replies[0]);
+ EXPECT_EQ(api::ReturnCode::REJECTED, reply.getResult().getResult());
+}
+
} // storage
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
index 2d70ee8d3ba..4680414baa1 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
@@ -411,7 +411,7 @@ void BucketManager::startWorkerThread()
bool BucketManager::onRequestBucketInfo(
const std::shared_ptr<api::RequestBucketInfoCommand>& cmd)
{
- LOG(debug, "Got request bucket info command");
+ LOG(debug, "Got request bucket info command %s", cmd->toString().c_str());
if (cmd->getBuckets().size() == 0 && cmd->hasSystemState()) {
std::lock_guard<std::mutex> guard(_workerLock);
@@ -542,9 +542,10 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac
const auto our_hash = distribution->getNodeGraph().getDistributionConfigHash();
- LOG(debug, "Processing %zu queued request bucket info commands. "
+ LOG(debug, "Processing %zu queued request bucket info commands for bucket space %s. "
"Using cluster state '%s' and distribution hash '%s'",
reqs.size(),
+ bucketSpace.toString().c_str(),
clusterState->toString().c_str(),
our_hash.c_str());
@@ -555,7 +556,15 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac
const auto their_hash = (*it)->getDistributionHash();
std::ostringstream error;
- if ((*it)->getSystemState().getVersion() > _lastClusterStateSeen) {
+ if (clusterState->getVersion() != _lastClusterStateSeen) {
+ // Calling onSetSystemState() on _this_ component and actually switching over
+ // to another cluster state version does not happen atomically. Detect and
+ // gracefully deal with the case where we're not internally in sync.
+ error << "Inconsistent internal cluster state on node during transition; "
+ << "failing request from distributor " << (*it)->getDistributor()
+ << " so it can be retried. Node version is " << clusterState->getVersion()
+ << ", but last version seen by the bucket manager is " << _lastClusterStateSeen;
+ } else if ((*it)->getSystemState().getVersion() > _lastClusterStateSeen) {
error << "Ignoring bucket info request for cluster state version "
<< (*it)->getSystemState().getVersion() << " as newest "
<< "version we know of is " << _lastClusterStateSeen;
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
index c694fda2c1b..175c7d27033 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
@@ -219,7 +219,7 @@ PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode)
"and distribution hash '%s'",
bucketSpaceAndNode.bucketSpace.getId(),
bucketSpaceAndNode.node,
- getNewClusterStateBundleString().c_str(),
+ _newClusterStateBundle.getDerivedClusterState(bucketSpaceAndNode.bucketSpace)->toString().c_str(),
distributionHash.c_str());
std::shared_ptr<api::RequestBucketInfoCommand> cmd(