author     Tor Brede Vekterli <vekterli@yahooinc.com>   2022-09-21 17:16:39 +0200
committer  GitHub <noreply@github.com>                  2022-09-21 17:16:39 +0200
commit     e9e69c17db2ef07566491633b21658d93aec3c39 (patch)
tree       304729820dc27ac09823a5a0d11a9ad744cd7448
parent     d9db475220d68a54ba2c9f820d3bae78f80abd96 (diff)
parent     d040de10d2c4ff2a07582809068871a645a01afb (diff)
Merge pull request #24158 from vespa-engine/vekterli/avoid-bucket-db-race-during-cluster-state-transition
Avoid bucket DB race during content node cluster state transition [run-systemtest]
-rw-r--r--  storage/src/tests/bucketdb/bucketmanagertest.cpp       60
-rw-r--r--  storage/src/vespa/storage/bucketdb/bucketmanager.cpp   62
-rw-r--r--  storage/src/vespa/storage/bucketdb/bucketmanager.h     15
3 files changed, 109 insertions(+), 28 deletions(-)
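
The core of the change is a two-phase visibility barrier in the bucket manager: the version of a SetSystemStateCommand is recorded on its way down the chain, and full bucket info requests are rejected until the matching SetSystemStateReply has been observed on the way back up. A rough, standalone sketch of that idea follows; class and member names are illustrative only and are not the actual BucketManager API:

// Minimal sketch of the barrier idea (illustrative names; not the actual
// BucketManager members or the storage API).
#include <cstdint>
#include <mutex>

class StateBarrier {
public:
    // Called when a SetSystemStateCommand is observed travelling down the chain.
    void on_state_command(uint32_t version) {
        std::lock_guard guard(_lock);
        _version_initiated = version;
    }
    // Called when the matching SetSystemStateReply is observed travelling back up,
    // i.e. after all side effects of enabling the state are visible in the bucket DB.
    void on_state_reply(uint32_t version) {
        std::lock_guard guard(_lock);
        _version_completed = version;
    }
    // Full bucket info requests may only be served when the node's enabled version
    // and both observed versions agree; otherwise the request is rejected so the
    // distributor can retry.
    bool may_serve(uint32_t enabled_version) const {
        std::lock_guard guard(_lock);
        return (enabled_version == _version_initiated) &&
               (_version_initiated == _version_completed);
    }
private:
    mutable std::mutex _lock;
    uint32_t _version_initiated = 0;
    uint32_t _version_completed = 0;
};

may_serve() corresponds to the (negated) rejection check added to processRequestBucketInfoCommands() further down in the diff.
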
diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp
index 05c5e9e8850..6404dba7935 100644
--- a/storage/src/tests/bucketdb/bucketmanagertest.cpp
+++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp
@@ -566,7 +566,7 @@ public:
// Need a cluster state to work with initially, so that processing
// bucket requests can calculate a target distributor.
- updater_internal_cluster_state_with_current();
+ update_internal_cluster_state_with_current();
}
void setUp(const WithBuckets& buckets) {
@@ -575,14 +575,24 @@ public:
}
}
- void updater_internal_cluster_state_with_current() {
+ void update_internal_cluster_state_with_current() {
_self._node->setClusterState(*_state);
- _self._manager->onDown(std::make_shared<api::SetSystemStateCommand>(*_state));
+ auto cmd = std::make_shared<api::SetSystemStateCommand>(*_state);
+ _self._manager->onDown(cmd);
+ // Also send up reply to release internal state transition barrier.
+ // We expect there to be no other pending messages at this point.
+ std::shared_ptr<api::StorageReply> reply(cmd->makeReply());
+ auto as_state_reply = std::dynamic_pointer_cast<api::SetSystemStateReply>(reply);
+ assert(as_state_reply);
+ assert(_self._top->getNumReplies() == 0);
+ _self._manager->onUp(as_state_reply);
+ assert(_self._top->getNumReplies() == 1);
+ (void)_self._top->getRepliesOnce(); // Clear state reply sent up chain
}
void update_cluster_state(const lib::ClusterState& state) {
_state = std::make_shared<lib::ClusterState>(state);
- updater_internal_cluster_state_with_current();
+ update_internal_cluster_state_with_current();
}
auto acquireBucketLock(const document::BucketId& bucket) {
@@ -1227,4 +1237,46 @@ TEST_F(BucketManagerTest, bounce_request_on_internal_cluster_state_version_misma
EXPECT_EQ(api::ReturnCode::REJECTED, reply.getResult().getResult());
}
+// This tests a slightly different inconsistency than the above test; the node has
+// locally enabled the cluster state (i.e. initially observed version == enabled version),
+// but is not yet done processing side effects from doing so.
+// See comments in BucketManager::onSetSystemState[Reply]() for rationale
+TEST_F(BucketManagerTest, bounce_request_on_state_change_barrier_not_reached) {
+ ConcurrentOperationFixture f(*this);
+
+ // Make manager-internal and component-internal version state inconsistent
+ f.update_cluster_state(lib::ClusterState("version:2 distributor:1 storage:1"));
+ auto new_state = lib::ClusterState("version:3 distributor:1 storage:1");
+ auto state_cmd = std::make_shared<api::SetSystemStateCommand>(new_state);
+ _top->sendDown(state_cmd);
+ _bottom->waitForMessage(api::MessageType::SETSYSTEMSTATE, MESSAGE_WAIT_TIME);
+ (void)_bottom->getCommandsOnce();
+ _node->setClusterState(new_state);
+
+ // At this point, the node's internal cluster state matches that of the state command
+ // which was observed on the way down. But there may still be side effects pending from
+ // enabling the cluster state. So we must still reject requests until we have observed
+ // the reply for the state command (which must order after any and all side effects).
+
+ _top->sendDown(f.createFullFetchCommand());
+ auto replies = f.awaitAndGetReplies(1);
+ {
+ auto& reply = dynamic_cast<api::RequestBucketInfoReply&>(*replies[0]);
+ EXPECT_EQ(api::ReturnCode::REJECTED, reply.getResult().getResult());
+ }
+ (void)_top->getRepliesOnce();
+
+ // Once the cluster state reply has been observed, requests can go through as expected.
+ _manager->onUp(std::shared_ptr<api::StorageReply>(state_cmd->makeReply()));
+ _top->waitForMessage(api::MessageType::SETSYSTEMSTATE_REPLY, MESSAGE_WAIT_TIME);
+ (void)_top->getRepliesOnce();
+
+ _top->sendDown(f.createFullFetchCommand());
+ replies = f.awaitAndGetReplies(1);
+ {
+ auto& reply = dynamic_cast<api::RequestBucketInfoReply&>(*replies[0]);
+ EXPECT_EQ(api::ReturnCode::OK, reply.getResult().getResult());
+ }
+}
+
} // storage
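
The new test leans on the property spelled out in its comments: the SetSystemStateReply travels up the same FIFO chain as the bucket DB mutations caused by enabling the state, so by the time the reply is observed every side effect has already been applied. A minimal, self-contained illustration of that ordering argument (the queue and lambdas below are illustrative, not storage-layer classes):

// FIFO ordering sketch: side effects are enqueued before the reply, so the
// reply cannot be observed until the side effects have run.
#include <cassert>
#include <functional>
#include <queue>

int main() {
    std::queue<std::function<void()>> upward_chain;  // FIFO link towards the bucket manager
    int buckets_updated = 0;
    bool reply_seen = false;

    // Enabling a cluster state first enqueues its side effects...
    upward_chain.push([&] { ++buckets_updated; });
    upward_chain.push([&] { ++buckets_updated; });
    // ...and only then the SetSystemStateReply.
    upward_chain.push([&] { reply_seen = true; });

    while (!upward_chain.empty()) {
        upward_chain.front()();
        upward_chain.pop();
        if (reply_seen) {
            // By the time the reply is observed, every queued side effect
            // has already been applied.
            assert(buckets_updated == 2);
        }
    }
}
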
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
index 51422de07e6..9e2b8566fdd 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
@@ -41,7 +41,8 @@ BucketManager::BucketManager(const config::ConfigUri & configUri,
_queueProcessingLock(),
_queuedReplies(),
_firstEqualClusterStateVersion(0),
- _lastClusterStateSeen(0),
+ _last_cluster_state_version_initiated(0),
+ _last_cluster_state_version_completed(0),
_lastUnifiedClusterState(""),
_doneInitialized(false),
_requestsCurrentlyProcessing(0),
@@ -289,7 +290,7 @@ void BucketManager::run(framework::ThreadHandle& thread)
bool didWork = false;
BucketInfoRequestMap infoReqs;
{
- std::lock_guard<std::mutex> guard(_workerLock);
+ std::lock_guard guard(_workerLock);
infoReqs.swap(_bucketInfoRequests);
}
@@ -298,7 +299,7 @@ void BucketManager::run(framework::ThreadHandle& thread)
}
{
- std::unique_lock<std::mutex> guard(_workerLock);
+ std::unique_lock guard(_workerLock);
for (const auto &req : infoReqs) {
assert(req.second.empty());
}
@@ -413,7 +414,7 @@ bool BucketManager::onRequestBucketInfo(
LOG(debug, "Got request bucket info command %s", cmd->toString().c_str());
if (cmd->getBuckets().size() == 0 && cmd->hasSystemState()) {
- std::lock_guard<std::mutex> guard(_workerLock);
+ std::lock_guard guard(_workerLock);
_bucketInfoRequests[cmd->getBucketSpace()].push_back(cmd);
_workerCond.notify_all();
LOG(spam, "Scheduled request bucket info request for retrieval");
@@ -492,7 +493,7 @@ BucketManager::ScopedQueueDispatchGuard::~ScopedQueueDispatchGuard()
void
BucketManager::enterQueueProtectedSection()
{
- std::lock_guard<std::mutex> guard(_queueProcessingLock);
+ std::lock_guard guard(_queueProcessingLock);
++_requestsCurrentlyProcessing;
}
@@ -500,7 +501,7 @@ void
BucketManager::leaveQueueProtectedSection(ScopedQueueDispatchGuard& queueGuard)
{
(void) queueGuard; // Only used to enforce guard is held while calling.
- std::lock_guard<std::mutex> guard(_queueProcessingLock);
+ std::lock_guard guard(_queueProcessingLock);
assert(_requestsCurrentlyProcessing > 0);
// Full bucket info fetches may be concurrently interleaved with bucket-
// specific fetches outside of the processing thread. We only allow queued
@@ -548,25 +549,28 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac
clusterState->toString().c_str(),
our_hash.c_str());
- std::lock_guard<std::mutex> clusterStateGuard(_clusterStateLock);
+ std::lock_guard clusterStateGuard(_clusterStateLock);
for (auto it = reqs.rbegin(); it != reqs.rend(); ++it) {
// Currently small requests should not be forwarded to worker thread
assert((*it)->hasSystemState());
const auto their_hash = (*it)->getDistributionHash();
std::ostringstream error;
- if (clusterState->getVersion() != _lastClusterStateSeen) {
+ if ((clusterState->getVersion() != _last_cluster_state_version_initiated) ||
+ (_last_cluster_state_version_initiated != _last_cluster_state_version_completed))
+ {
// Calling onSetSystemState() on _this_ component and actually switching over
// to another cluster state version does not happen atomically. Detect and
// gracefully deal with the case where we're not internally in sync.
error << "Inconsistent internal cluster state on node during transition; "
<< "failing request from distributor " << (*it)->getDistributor()
<< " so it can be retried. Node version is " << clusterState->getVersion()
- << ", but last version seen by the bucket manager is " << _lastClusterStateSeen;
- } else if ((*it)->getSystemState().getVersion() > _lastClusterStateSeen) {
+ << ", last version seen by the bucket manager is " << _last_cluster_state_version_initiated
+ << ", last internally converged version is " << _last_cluster_state_version_completed;
+ } else if ((*it)->getSystemState().getVersion() > _last_cluster_state_version_initiated) {
error << "Ignoring bucket info request for cluster state version "
<< (*it)->getSystemState().getVersion() << " as newest "
- << "version we know of is " << _lastClusterStateSeen;
+ << "version we know of is " << _last_cluster_state_version_initiated;
} else if ((*it)->getSystemState().getVersion()
< _firstEqualClusterStateVersion)
{
@@ -714,20 +718,42 @@ BucketManager::verifyAndUpdateLastModified(api::StorageCommand& cmd,
}
bool
-BucketManager::onSetSystemState(
- const std::shared_ptr<api::SetSystemStateCommand>& cmd)
+BucketManager::onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd)
{
LOG(debug, "onSetSystemState(%s)", cmd->toString().c_str());
const lib::ClusterState& state(cmd->getSystemState());
std::string unified(unifyState(state));
- std::lock_guard<std::mutex> lock(_clusterStateLock);
+ std::lock_guard lock(_clusterStateLock);
if (unified != _lastUnifiedClusterState
- || state.getVersion() != _lastClusterStateSeen + 1)
+ || state.getVersion() != _last_cluster_state_version_initiated + 1)
{
_lastUnifiedClusterState = unified;
_firstEqualClusterStateVersion = state.getVersion();
}
- _lastClusterStateSeen = state.getVersion();
+ // At this point, the incoming cluster state version has not yet been enabled on this node;
+ // it's on its merry way down to the StateManager which handles internal transitions.
+ // We must take note of the fact that it will _soon_ be enabled, and must avoid processing
+ // any bucket info requests until any and all side effects caused by this process are
+ // visible in the bucket DB. Failure to enforce this may cause bucket DB iterations to happen
+ // concurrently with transition-induced mutations, causing inconsistent results.
+ // We will not allow processing bucket info fetches until we've observed the _reply_ of the
+ // state command. Such replies are always ordered _after_ side effects of state enabling have
+ // become visible in the bucket DB. See onSetSystemStateReply().
+ _last_cluster_state_version_initiated = state.getVersion();
+ return false;
+}
+
+bool
+BucketManager::onSetSystemStateReply(const std::shared_ptr<api::SetSystemStateReply>& reply)
+{
+ LOG(debug, "onSetSystemStateReply(%s)", reply->toString().c_str());
+ std::lock_guard lock(_clusterStateLock);
+ // This forms part 2 of the cluster state visibility barrier that was initiated when
+ // we first observed the new cluster state version in onSetSystemState(). After this
+ // point, bucket info requests may be processed (assuming no new cluster state versions
+ // have arrived concurrently). In the case of racing cluster states, internal FIFO ordering
+ // of messages shall ensure we eventually end up with a consistent view of observed versions.
+ _last_cluster_state_version_completed = reply->getSystemState().getVersion();
return false;
}
@@ -830,7 +856,7 @@ BucketManager::enqueueIfBucketHasConflicts(const api::BucketReply::SP& reply)
{
// Should very rarely contend, since persistence replies are all sent up
// via a single dispatcher thread.
- std::lock_guard<std::mutex> guard(_queueProcessingLock);
+ std::lock_guard guard(_queueProcessingLock);
if (_requestsCurrentlyProcessing == 0) {
return false; // Nothing to do here; pass through reply.
}
@@ -867,7 +893,7 @@ bool
BucketManager::enqueueAsConflictIfProcessingRequest(
const api::StorageReply::SP& reply)
{
- std::lock_guard<std::mutex> guard(_queueProcessingLock);
+ std::lock_guard guard(_queueProcessingLock);
if (_requestsCurrentlyProcessing != 0) {
LOG(debug, "Enqueued %s due to concurrent RequestBucketInfo",
reply->toString().c_str());
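
The rejection path added above explicitly fails the request "from distributor ... so it can be retried". A hedged sketch of what such caller-side handling could look like; the Result enum, callback type and backoff policy here are assumptions for illustration, not part of the API touched by this diff:

// Hypothetical caller-side handling of a REJECTED full bucket info fetch.
#include <chrono>
#include <functional>
#include <thread>

enum class Result { OK, REJECTED, FAILED };

bool fetch_bucket_info_with_retry(const std::function<Result()>& send_request,
                                  int max_attempts) {
    for (int attempt = 0; attempt < max_attempts; ++attempt) {
        switch (send_request()) {
            case Result::OK:
                return true;   // Node had a consistent view; reply is usable.
            case Result::FAILED:
                return false;  // Hard failure; do not retry.
            case Result::REJECTED:
                // Node is mid cluster-state transition; back off and retry.
                std::this_thread::sleep_for(std::chrono::milliseconds(50));
                break;
        }
    }
    return false;
}
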
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.h b/storage/src/vespa/storage/bucketdb/bucketmanager.h
index 8bd892e0f09..4a04331e08f 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.h
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.h
@@ -62,11 +62,12 @@ private:
* after distributor unification is equal to all cluster states seen after.
*/
uint32_t _firstEqualClusterStateVersion;
- /**
- * The last cluster state version seen. We must ensure we dont answer to
- * cluster states we haven't seen.
- */
- uint32_t _lastClusterStateSeen;
+ // The most recent cluster state version observed on the way _down_ through
+ // the chain, i.e. prior to being enabled on the node.
+ uint32_t _last_cluster_state_version_initiated;
+ // The most recent cluster state version observed on the way _up_ through the
+ // chain, i.e. after being enabled on the node.
+ uint32_t _last_cluster_state_version_completed;
/**
* The unified version of the last cluster state.
*/
@@ -105,6 +106,8 @@ public:
void force_db_sweep_and_metric_update() { updateMetrics(true); }
+ bool onUp(const std::shared_ptr<api::StorageMessage>&) override;
+
private:
friend struct BucketManagerTest;
@@ -198,8 +201,8 @@ private:
*/
bool replyConflictsWithConcurrentOperation(const api::BucketReply& reply) const;
bool enqueueIfBucketHasConflicts(const api::BucketReply::SP& reply);
- bool onUp(const std::shared_ptr<api::StorageMessage>&) override;
bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>&) override;
+ bool onSetSystemStateReply(const std::shared_ptr<api::SetSystemStateReply>&) override;
bool onCreateBucket(const std::shared_ptr<api::CreateBucketCommand>&) override;
bool onMergeBucket(const std::shared_ptr<api::MergeBucketCommand>&) override;
bool onRemove(const std::shared_ptr<api::RemoveCommand>&) override;
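
To summarize the header changes: the single _lastClusterStateSeen counter is split into an "initiated" and a "completed" version, and onSetSystemStateReply() is now intercepted to advance the latter. A small GTest-style sketch (hypothetical StateBarrier struct, mirroring the sketch near the top of this page) of the three situations these counters distinguish:

// Illustrative test only; StateBarrier is not a class in this diff.
#include <cstdint>
#include <gtest/gtest.h>

struct StateBarrier {
    uint32_t initiated = 0;
    uint32_t completed = 0;
    bool may_serve(uint32_t enabled) const {
        return enabled == initiated && initiated == completed;
    }
};

TEST(StateBarrierSketch, serves_only_when_versions_converge) {
    StateBarrier b;
    b.initiated = 3;              // SetSystemStateCommand for version 3 seen going down.
    EXPECT_FALSE(b.may_serve(2)); // Node still has version 2 enabled: version mismatch.
    EXPECT_FALSE(b.may_serve(3)); // Version 3 enabled, but reply (barrier) not yet observed.
    b.completed = 3;              // SetSystemStateReply observed going up.
    EXPECT_TRUE(b.may_serve(3));  // Fully converged; bucket info requests may be served.
}
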