diff options
Diffstat (limited to 'storage')
-rw-r--r-- | storage/src/tests/bucketdb/bucketmanagertest.cpp | 6 | ||||
-rw-r--r-- | storage/src/vespa/storage/bucketdb/bucketmanager.cpp | 36 | ||||
-rw-r--r-- | storage/src/vespa/storage/bucketdb/bucketmanager.h | 9 |
3 files changed, 6 insertions, 45 deletions
diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp index 6404dba7935..dc33bfd04e2 100644 --- a/storage/src/tests/bucketdb/bucketmanagertest.cpp +++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp @@ -631,6 +631,10 @@ public: return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, *_state); } + auto createFullFetchCommand(const lib::ClusterState& explicit_state) const { + return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, explicit_state); + } + auto createFullFetchCommandWithHash(vespalib::stringref hash) const { return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, *_state, hash); } @@ -1271,7 +1275,7 @@ TEST_F(BucketManagerTest, bounce_request_on_state_change_barrier_not_reached) { _top->waitForMessage(api::MessageType::SETSYSTEMSTATE_REPLY, MESSAGE_WAIT_TIME); (void)_top->getRepliesOnce(); - _top->sendDown(f.createFullFetchCommand()); + _top->sendDown(f.createFullFetchCommand(new_state)); replies = f.awaitAndGetReplies(1); { auto& reply = dynamic_cast<api::RequestBucketInfoReply&>(*replies[0]); diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp index 9e2b8566fdd..9b3ad5c8e22 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp @@ -40,10 +40,8 @@ BucketManager::BucketManager(const config::ConfigUri & configUri, _clusterStateLock(), _queueProcessingLock(), _queuedReplies(), - _firstEqualClusterStateVersion(0), _last_cluster_state_version_initiated(0), _last_cluster_state_version_completed(0), - _lastUnifiedClusterState(""), _doneInitialized(false), _requestsCurrentlyProcessing(0), _component(compReg, "bucketmanager"), @@ -460,24 +458,6 @@ bool BucketManager::onRequestBucketInfo( return true; } -namespace { - std::string unifyState(const lib::ClusterState& state) { - std::vector<char> distributors( - state.getNodeCount(lib::NodeType::DISTRIBUTOR), 'd'); - - uint32_t length = 0; - for (uint32_t i = 0; i < distributors.size(); ++i) { - const lib::NodeState& ns(state.getNodeState( - lib::Node(lib::NodeType::DISTRIBUTOR, i))); - if (ns.getState().oneOf("uirm")) { - distributors[i] = 'u'; - length = i + 1; - } - } - return std::string(&distributors[0], length); - } -} - BucketManager::ScopedQueueDispatchGuard::ScopedQueueDispatchGuard( BucketManager& mgr) : _mgr(mgr) @@ -567,17 +547,10 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac << " so it can be retried. Node version is " << clusterState->getVersion() << ", last version seen by the bucket manager is " << _last_cluster_state_version_initiated << ", last internally converged version is " << _last_cluster_state_version_completed; - } else if ((*it)->getSystemState().getVersion() > _last_cluster_state_version_initiated) { + } else if ((*it)->getSystemState().getVersion() != _last_cluster_state_version_initiated) { error << "Ignoring bucket info request for cluster state version " << (*it)->getSystemState().getVersion() << " as newest " << "version we know of is " << _last_cluster_state_version_initiated; - } else if ((*it)->getSystemState().getVersion() - < _firstEqualClusterStateVersion) - { - error << "Ignoring bucket info request for cluster state version " - << (*it)->getSystemState().getVersion() << " as versions " - << "from version " << _firstEqualClusterStateVersion - << " differs from this state."; } else if (!their_hash.empty() && their_hash != our_hash) { // Mismatching config hash indicates nodes are out of sync with their config generations error << "Distributor config hash is not equal to our own; must reject request (our hash: " @@ -722,14 +695,7 @@ BucketManager::onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand { LOG(debug, "onSetSystemState(%s)", cmd->toString().c_str()); const lib::ClusterState& state(cmd->getSystemState()); - std::string unified(unifyState(state)); std::lock_guard lock(_clusterStateLock); - if (unified != _lastUnifiedClusterState - || state.getVersion() != _last_cluster_state_version_initiated + 1) - { - _lastUnifiedClusterState = unified; - _firstEqualClusterStateVersion = state.getVersion(); - } // At this point, the incoming cluster state version has not yet been enabled on this node; // it's on its merry way down to the StateManager which handles internal transitions. // We must take note of the fact that it will _soon_ be enabled, and must avoid processing diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.h b/storage/src/vespa/storage/bucketdb/bucketmanager.h index 4a04331e08f..fda13e09c45 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.h +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.h @@ -57,21 +57,12 @@ private: using ConflictingBuckets = std::unordered_set<document::BucketId, document::BucketId::hash>; ReplyQueue _queuedReplies; ConflictingBuckets _conflictingBuckets; - /** - * Keeps the version number of the first cluster state version seen that - * after distributor unification is equal to all cluster states seen after. - */ - uint32_t _firstEqualClusterStateVersion; // The most current cluster state versions that we've observed on the way _down_ // through the chain, i.e. prior to being enabled on the node. uint32_t _last_cluster_state_version_initiated; // The most current cluster state we've observed on the way _up_ through the // chain, i.e. after being enabled on the node. uint32_t _last_cluster_state_version_completed; - /** - * The unified version of the last cluster state. - */ - std::string _lastUnifiedClusterState; bool _doneInitialized; size_t _requestsCurrentlyProcessing; ServiceLayerComponent _component; |