summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--storage/src/tests/bucketdb/bucketmanagertest.cpp45
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketmanager.cpp15
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.cpp2
3 files changed, 50 insertions, 12 deletions
diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp
index e14bb8e11d1..b83ccccbf48 100644
--- a/storage/src/tests/bucketdb/bucketmanagertest.cpp
+++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp
@@ -556,7 +556,7 @@ class ConcurrentOperationFixture {
public:
explicit ConcurrentOperationFixture(BucketManagerTest& self)
: _self(self),
- _state("distributor:1 storage:1")
+ _state(std::make_shared<lib::ClusterState>("distributor:1 storage:1"))
{
_self.setupTestEnvironment();
_self._top->open();
@@ -566,9 +566,7 @@ public:
// Need a cluster state to work with initially, so that processing
// bucket requests can calculate a target distributor.
- _self._node->setClusterState(_state);
- _self._manager->onDown(
- std::make_shared<api::SetSystemStateCommand>(_state));
+ updater_internal_cluster_state_with_current();
}
void setUp(const WithBuckets& buckets) {
@@ -577,6 +575,16 @@ public:
}
}
+ void updater_internal_cluster_state_with_current() {
+ _self._node->setClusterState(*_state);
+ _self._manager->onDown(std::make_shared<api::SetSystemStateCommand>(*_state));
+ }
+
+ void update_cluster_state(const lib::ClusterState& state) {
+ _state = std::make_shared<lib::ClusterState>(state);
+ updater_internal_cluster_state_with_current();
+ }
+
auto acquireBucketLock(const document::BucketId& bucket) {
return _self._node->getStorageBucketDatabase().get(bucket, "foo");
}
@@ -610,15 +618,15 @@ public:
}
auto createFullFetchCommand() const {
- return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, _state);
+ return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, *_state);
}
auto createFullFetchCommandWithHash(vespalib::stringref hash) const {
- return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, _state, hash);
+ return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, *_state, hash);
}
auto createFullFetchCommandWithHash(document::BucketSpace space, vespalib::stringref hash) const {
- return std::make_shared<api::RequestBucketInfoCommand>(space, 0, _state, hash);
+ return std::make_shared<api::RequestBucketInfoCommand>(space, 0, *_state, hash);
}
auto acquireBucketLockAndSendInfoRequest(const document::BucketId& bucket) {
@@ -726,7 +734,7 @@ group[2].nodes[2].index 5
private:
BucketManagerTest& _self;
- lib::ClusterState _state;
+ std::shared_ptr<lib::ClusterState> _state;
};
TEST_F(BucketManagerTest, split_reply_ordered_after_bucket_reply) {
@@ -1214,4 +1222,25 @@ TEST_F(BucketManagerTest, fall_back_to_legacy_global_distribution_hash_on_mismat
EXPECT_EQ(api::ReturnCode::OK, reply.getResult().getResult()); // _not_ REJECTED
}
+// It's possible for the request processing thread and onSetSystemState (which use
+// the same mutex) to race with the actual internal component cluster state switch-over.
+// Ensure we detect and handle this by bouncing the request back to the distributor.
+// It's for all intents and purposes guaranteed that the internal state has converged
+// once the distributor has gotten around to retrying the operation.
+TEST_F(BucketManagerTest, bounce_request_on_internal_cluster_state_version_mismatch) {
+ ConcurrentOperationFixture f(*this);
+
+ // Make manager-internal and component-internal version state inconsistent
+ f.update_cluster_state(lib::ClusterState("version:2 distributor:1 storage:1"));
+ _manager->onDown(std::make_shared<api::SetSystemStateCommand>(lib::ClusterState("version:3 distributor:1 storage:1")));
+
+ // Info command is sent with state version 2, which mismatches that of internal state 3
+ // even though it's the same as the component's current version.
+ _top->sendDown(f.createFullFetchCommand());
+
+ auto replies = f.awaitAndGetReplies(1);
+ auto& reply = dynamic_cast<api::RequestBucketInfoReply&>(*replies[0]);
+ EXPECT_EQ(api::ReturnCode::REJECTED, reply.getResult().getResult());
+}
+
} // storage
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
index 2d70ee8d3ba..4680414baa1 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
@@ -411,7 +411,7 @@ void BucketManager::startWorkerThread()
bool BucketManager::onRequestBucketInfo(
const std::shared_ptr<api::RequestBucketInfoCommand>& cmd)
{
- LOG(debug, "Got request bucket info command");
+ LOG(debug, "Got request bucket info command %s", cmd->toString().c_str());
if (cmd->getBuckets().size() == 0 && cmd->hasSystemState()) {
std::lock_guard<std::mutex> guard(_workerLock);
@@ -542,9 +542,10 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac
const auto our_hash = distribution->getNodeGraph().getDistributionConfigHash();
- LOG(debug, "Processing %zu queued request bucket info commands. "
+ LOG(debug, "Processing %zu queued request bucket info commands for bucket space %s. "
"Using cluster state '%s' and distribution hash '%s'",
reqs.size(),
+ bucketSpace.toString().c_str(),
clusterState->toString().c_str(),
our_hash.c_str());
@@ -555,7 +556,15 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac
const auto their_hash = (*it)->getDistributionHash();
std::ostringstream error;
- if ((*it)->getSystemState().getVersion() > _lastClusterStateSeen) {
+ if (clusterState->getVersion() != _lastClusterStateSeen) {
+ // Calling onSetSystemState() on _this_ component and actually switching over
+ // to another cluster state version does not happen atomically. Detect and
+ // gracefully deal with the case where we're not internally in sync.
+ error << "Inconsistent internal cluster state on node during transition; "
+ << "failing request from distributor " << (*it)->getDistributor()
+ << " so it can be retried. Node version is " << clusterState->getVersion()
+ << ", but last version seen by the bucket manager is " << _lastClusterStateSeen;
+ } else if ((*it)->getSystemState().getVersion() > _lastClusterStateSeen) {
error << "Ignoring bucket info request for cluster state version "
<< (*it)->getSystemState().getVersion() << " as newest "
<< "version we know of is " << _lastClusterStateSeen;
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
index c694fda2c1b..175c7d27033 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
@@ -219,7 +219,7 @@ PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode)
"and distribution hash '%s'",
bucketSpaceAndNode.bucketSpace.getId(),
bucketSpaceAndNode.node,
- getNewClusterStateBundleString().c_str(),
+ _newClusterStateBundle.getDerivedClusterState(bucketSpaceAndNode.bucketSpace)->toString().c_str(),
distributionHash.c_str());
std::shared_ptr<api::RequestBucketInfoCommand> cmd(