diff options
author | Tor Egge <Tor.Egge@oath.com> | 2017-11-07 13:01:08 +0000 |
---|---|---|
committer | Tor Egge <Tor.Egge@oath.com> | 2017-11-07 14:29:34 +0000 |
commit | 0dbec7f1961d2ee2588305d6a498e710089ebe5f (patch) | |
tree | e27d1ee64819c23bcddd0ea0ed2a3a9312762fec /storage | |
parent | 45669b2bb56034e8c0d92e964aa17eb5c6cf68d6 (diff) |
Factor out portions of PendingClusterState bound to a specific bucket
space to PendingBucketSpaceDbTransition.
Diffstat (limited to 'storage')
6 files changed, 384 insertions, 262 deletions
diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt index 3dc08d858ad..3f3f6b8458a 100644 --- a/storage/src/vespa/storage/distributor/CMakeLists.txt +++ b/storage/src/vespa/storage/distributor/CMakeLists.txt @@ -27,6 +27,7 @@ vespa_add_library(storage_distributor operationtargetresolverimpl.cpp ownership_transfer_safe_time_point_calculator.cpp pendingclusterstate.cpp + pending_bucket_space_db_transition.cpp pendingmessagetracker.cpp persistence_operation_metric_set.cpp persistencemessagetracker.cpp diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp new file mode 100644 index 00000000000..f5c8da4c261 --- /dev/null +++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp @@ -0,0 +1,233 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "pending_bucket_space_db_transition.h" +#include "clusterinformation.h" +#include "pendingclusterstate.h" +#include <vespa/storage/common/bucketoperationlogger.h> +#include <algorithm> + +#include <vespa/log/log.h> +LOG_SETUP(".pendingclusterstateprocessor"); + +namespace storage::distributor { + +PendingBucketSpaceDbTransition::PendingBucketSpaceDbTransition(const PendingClusterState &pendingClusterState, + std::shared_ptr<const ClusterInformation> clusterInfo, + const lib::ClusterState &newClusterState, + api::Timestamp creationTimestamp) + : _entries(), + _iter(0), + _removedBuckets(), + _missingEntries(), + _clusterInfo(std::move(clusterInfo)), + _outdatedNodes(pendingClusterState.getOutdatedNodeSet()), + _newClusterState(newClusterState), + _creationTimestamp(creationTimestamp), + _pendingClusterState(pendingClusterState) +{ +} + +PendingBucketSpaceDbTransition::~PendingBucketSpaceDbTransition() +{ +} + +PendingBucketSpaceDbTransition::Range +PendingBucketSpaceDbTransition::skipAllForSameBucket() +{ + Range r(_iter, _iter); + + for (document::BucketId& bid = _entries[_iter].bucketId; + _iter < _entries.size() && _entries[_iter].bucketId == bid; + ++_iter) + { + } + + r.second = _iter; + return r; +} + +std::vector<BucketCopy> +PendingBucketSpaceDbTransition::getCopiesThatAreNewOrAltered(BucketDatabase::Entry& info, const Range& range) +{ + std::vector<BucketCopy> copiesToAdd; + for (uint32_t i = range.first; i < range.second; ++i) { + const BucketCopy& candidate(_entries[i].copy); + const BucketCopy* cp = info->getNode(candidate.getNode()); + + if (!cp || !(cp->getBucketInfo() == candidate.getBucketInfo())) { + copiesToAdd.push_back(candidate); + } + } + return copiesToAdd; +} + +void +PendingBucketSpaceDbTransition::insertInfo(BucketDatabase::Entry& info, const Range& range) +{ + std::vector<BucketCopy> copiesToAddOrUpdate( + getCopiesThatAreNewOrAltered(info, range)); + + std::vector<uint16_t> order( + _clusterInfo->getIdealStorageNodesForState( + _newClusterState, + _entries[range.first].bucketId)); + info->addNodes(copiesToAddOrUpdate, order, TrustedUpdate::DEFER); + + LOG_BUCKET_OPERATION_NO_LOCK( + _entries[range.first].bucketId, + vespalib::make_string("insertInfo: %s", + info.toString().c_str())); +} + +std::string +PendingBucketSpaceDbTransition::requestNodesToString() +{ + return _pendingClusterState.requestNodesToString(); +} + +bool +PendingBucketSpaceDbTransition::removeCopiesFromNodesThatWereRequested(BucketDatabase::Entry& e, const document::BucketId& bucketId) +{ + bool updated = false; + for (uint32_t i = 0; i < e->getNodeCount();) { + auto& info(e->getNodeRef(i)); + const uint16_t entryNode(info.getNode()); + // Don't remove an entry if it's been updated in the time after the + // bucket info requests were sent, as this would erase newer state. + // Don't immediately update trusted state, as that could erroneously + // mark a single remaining replica as trusted even though there might + // be one or more additional replicas pending merge into the database. + if (nodeIsOutdated(entryNode) + && (info.getTimestamp() < _creationTimestamp) + && e->removeNode(entryNode, TrustedUpdate::DEFER)) + { + LOG(spam, + "Removed bucket %s from node %d", + bucketId.toString().c_str(), + entryNode); + updated = true; + // After removing current node, getNodeRef(i) will point to the _next_ node, so don't increment `i`. + } else { + ++i; + } + } + return updated; +} + +bool +PendingBucketSpaceDbTransition::databaseIteratorHasPassedBucketInfoIterator(const document::BucketId& bucketId) const +{ + return (_iter < _entries.size() + && _entries[_iter].bucketId.toKey() < bucketId.toKey()); +} + +bool +PendingBucketSpaceDbTransition::bucketInfoIteratorPointsToBucket(const document::BucketId& bucketId) const +{ + return _iter < _entries.size() && _entries[_iter].bucketId == bucketId; +} + +bool +PendingBucketSpaceDbTransition::process(BucketDatabase::Entry& e) +{ + document::BucketId bucketId(e.getBucketId()); + + LOG(spam, + "Before merging info from nodes [%s], bucket %s had info %s", + requestNodesToString().c_str(), + bucketId.toString().c_str(), + e.getBucketInfo().toString().c_str()); + + while (databaseIteratorHasPassedBucketInfoIterator(bucketId)) { + LOG(spam, "Found new bucket %s, adding", + _entries[_iter].bucketId.toString().c_str()); + + _missingEntries.push_back(skipAllForSameBucket()); + } + + bool updated(removeCopiesFromNodesThatWereRequested(e, bucketId)); + + if (bucketInfoIteratorPointsToBucket(bucketId)) { + LOG(spam, "Updating bucket %s", + _entries[_iter].bucketId.toString().c_str()); + + insertInfo(e, skipAllForSameBucket()); + updated = true; + } + + if (updated) { + // Remove bucket if we've previously removed all nodes from it + if (e->getNodeCount() == 0) { + _removedBuckets.push_back(bucketId); + } else { + e.getBucketInfo().updateTrusted(); + } + } + + LOG(spam, + "After merging info from nodes [%s], bucket %s had info %s", + requestNodesToString().c_str(), + bucketId.toString().c_str(), + e.getBucketInfo().toString().c_str()); + + return true; +} + +void +PendingBucketSpaceDbTransition::addToBucketDB(BucketDatabase& db, const Range& range) +{ + LOG(spam, "Adding new bucket %s with %d copies", + _entries[range.first].bucketId.toString().c_str(), + range.second - range.first); + + BucketDatabase::Entry e(_entries[range.first].bucketId, BucketInfo()); + insertInfo(e, range); + if (e->getLastGarbageCollectionTime() == 0) { + e->setLastGarbageCollectionTime( + framework::MicroSecTime(_creationTimestamp) + .getSeconds().getTime()); + } + e.getBucketInfo().updateTrusted(); + db.update(e); +} + +void +PendingBucketSpaceDbTransition::mergeInto(BucketDatabase& db) +{ + std::sort(_entries.begin(), _entries.end()); + + db.forEach(*this); + + for (uint32_t i = 0; i < _removedBuckets.size(); ++i) { + db.remove(_removedBuckets[i]); + } + _removedBuckets.clear(); + + // All of the remaining were not already in the bucket database. + while (_iter < _entries.size()) { + _missingEntries.push_back(skipAllForSameBucket()); + } + + for (uint32_t i = 0; i < _missingEntries.size(); ++i) { + addToBucketDB(db, _missingEntries[i]); + } +} + +void +PendingBucketSpaceDbTransition::onRequestBucketInfoReply(const api::RequestBucketInfoReply &reply, uint16_t node) +{ + for (const auto &entry : reply.getBucketInfo()) { + _entries.emplace_back(entry._bucketId, + BucketCopy(_creationTimestamp, + node, + entry._info)); + } +} + +void +PendingBucketSpaceDbTransition::addNodeInfo(const document::BucketId& id, const BucketCopy& copy) +{ + _entries.emplace_back(id, copy); +} + +} diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h new file mode 100644 index 00000000000..04c12f9dd7d --- /dev/null +++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h @@ -0,0 +1,92 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "pending_bucket_space_db_transition_entry.h" +#include <vespa/storage/bucketdb/bucketdatabase.h> +#include <unordered_set> + +namespace storage::api { class RequestBucketInfoReply; } +namespace storage::lib { class ClusterState; } + +namespace storage::distributor { + +class ClusterInformation; +class PendingClusterState; + +/** + * Class used by PendingClusterState to track request bucket info + * reply result within a bucket space and apply it to the distributor + * bucket database when switching to the pending cluster state. + */ +class PendingBucketSpaceDbTransition : public BucketDatabase::MutableEntryProcessor +{ +public: + using Entry = pendingbucketspacedbtransition::Entry; + using EntryList = std::vector<Entry>; +private: + using Range = std::pair<uint32_t, uint32_t>; + + EntryList _entries; + uint32_t _iter; + std::vector<document::BucketId> _removedBuckets; + std::vector<Range> _missingEntries; + std::shared_ptr<const ClusterInformation> _clusterInfo; + + // Set for all nodes that may have changed state since that previous + // active cluster state, or that were marked as outdated when the pending + // cluster state was constructed. + // May be a superset of _requestedNodes, as some nodes that are outdated + // may be down and thus cannot get a request. + const std::unordered_set<uint16_t> _outdatedNodes; + + const lib::ClusterState &_newClusterState; + const api::Timestamp _creationTimestamp; + const PendingClusterState &_pendingClusterState; + + // BucketDataBase::MutableEntryProcessor API + bool process(BucketDatabase::Entry& e) override; + + /** + * Skips through all entries for the same bucket and returns + * the range in the entry list for which they were found. + * The range is [from, to> + */ + Range skipAllForSameBucket(); + + std::vector<BucketCopy> getCopiesThatAreNewOrAltered(BucketDatabase::Entry& info, const Range& range); + void insertInfo(BucketDatabase::Entry& info, const Range& range); + void addToBucketDB(BucketDatabase& db, const Range& range); + + bool nodeIsOutdated(uint16_t node) const { + return (_outdatedNodes.find(node) != _outdatedNodes.end()); + } + + // Returns whether at least one replica was removed from the entry. + // Does NOT implicitly update trusted status on remaining replicas; caller must do + // this explicitly. + bool removeCopiesFromNodesThatWereRequested(BucketDatabase::Entry& e, const document::BucketId& bucketId); + + // Helper methods for iterating over _entries + bool databaseIteratorHasPassedBucketInfoIterator(const document::BucketId& bucketId) const; + bool bucketInfoIteratorPointsToBucket(const document::BucketId& bucketId) const; + std::string requestNodesToString(); + +public: + PendingBucketSpaceDbTransition(const PendingClusterState &pendingClusterState, + std::shared_ptr<const ClusterInformation> clusterInfo, + const lib::ClusterState &newClusterState, + api::Timestamp creationTimestamp); + ~PendingBucketSpaceDbTransition(); + + // Merges all the results with the given bucket database. + void mergeInto(BucketDatabase& db); + + // Adds the info from the reply to our list of information. + void onRequestBucketInfoReply(const api::RequestBucketInfoReply &reply, uint16_t node); + + // Methods used by unit tests. + const EntryList& results() const { return _entries; } + void addNodeInfo(const document::BucketId& id, const BucketCopy& copy); +}; + +} diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h new file mode 100644 index 00000000000..a2751275ba5 --- /dev/null +++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h @@ -0,0 +1,24 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/document/bucket/bucketid.h> +#include <vespa/storage/bucketdb/bucketcopy.h> + +namespace storage::distributor::pendingbucketspacedbtransition { + +struct Entry { + Entry(const document::BucketId& bid, + const BucketCopy& copy_) + : bucketId(bid), + copy(copy_) + {} + + document::BucketId bucketId; + BucketCopy copy; + + bool operator<(const Entry& other) const { + return bucketId.toKey() < other.bucketId.toKey(); + } +}; + +} diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp index 9ad803b7b7f..41c8974470f 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "pendingclusterstate.h" +#include "pending_bucket_space_db_transition.h" #include "bucketdbupdater.h" #include <vespa/storageframework/defaultimplementation/clock/realclock.h> #include <vespa/storage/common/bucketoperationlogger.h> @@ -28,14 +29,14 @@ PendingClusterState::PendingClusterState( : _cmd(newStateCmd), _requestedNodes(newStateCmd->getSystemState().getNodeCount(lib::NodeType::STORAGE)), _outdatedNodes(newStateCmd->getSystemState().getNodeCount(lib::NodeType::STORAGE)), - _iter(0), _prevClusterState(clusterInfo->getClusterState()), _newClusterState(newStateCmd->getSystemState()), _clock(clock), _clusterInfo(clusterInfo), _creationTimestamp(creationTimestamp), _sender(sender), - _bucketOwnershipTransfer(distributorChanged(_prevClusterState, _newClusterState)) + _bucketOwnershipTransfer(distributorChanged(_prevClusterState, _newClusterState)), + _pendingTransition() { logConstructionInformation(); if (hasBucketOwnershipTransfer()) { @@ -44,6 +45,7 @@ PendingClusterState::PendingClusterState( updateSetOfNodesThatAreOutdated(); addAdditionalNodesToOutdatedSet(outdatedNodes); } + _pendingTransition = std::make_unique<PendingBucketSpaceDbTransition>(*this, _clusterInfo, _newClusterState, _creationTimestamp); if (shouldRequestBucketInfo()) { requestNodes(); } @@ -56,17 +58,18 @@ PendingClusterState::PendingClusterState( api::Timestamp creationTimestamp) : _requestedNodes(clusterInfo->getStorageNodeCount()), _outdatedNodes(clusterInfo->getStorageNodeCount()), - _iter(0), _prevClusterState(clusterInfo->getClusterState()), _newClusterState(clusterInfo->getClusterState()), _clock(clock), _clusterInfo(clusterInfo), _creationTimestamp(creationTimestamp), _sender(sender), - _bucketOwnershipTransfer(true) + _bucketOwnershipTransfer(true), + _pendingTransition() { logConstructionInformation(); markAllAvailableNodesAsRequiringRequest(); + _pendingTransition = std::make_unique<PendingBucketSpaceDbTransition>(*this, _clusterInfo, _newClusterState, _creationTimestamp); if (shouldRequestBucketInfo()) { requestNodes(); } @@ -364,14 +367,7 @@ PendingClusterState::onRequestBucketInfoReply(const std::shared_ptr<api::Request } setNodeReplied(node); - - for (uint32_t i = 0; i < reply->getBucketInfo().size(); ++i) { - addNodeInfo(reply->getBucketInfo()[i]._bucketId, - BucketCopy(_creationTimestamp, - node, - reply->getBucketInfo()[i]._info)); - } - + _pendingTransition->onRequestBucketInfoReply(*reply, node); _sentMessages.erase(iter); return true; @@ -389,68 +385,8 @@ PendingClusterState::resendDelayedMessages() { } } -void -PendingClusterState::addNodeInfo( - const document::BucketId& id, - const BucketCopy& copy) -{ - _entries.push_back(Entry(id, copy)); -} - -PendingClusterState::Range -PendingClusterState::skipAllForSameBucket() -{ - Range r(_iter, _iter); - - for (document::BucketId& bid = _entries[_iter].bucketId; - _iter < _entries.size() && _entries[_iter].bucketId == bid; - ++_iter) - { - } - - r.second = _iter; - return r; -} - -void -PendingClusterState::insertInfo( - BucketDatabase::Entry& info, - const Range& range) -{ - std::vector<BucketCopy> copiesToAddOrUpdate( - getCopiesThatAreNewOrAltered(info, range)); - - std::vector<uint16_t> order( - _clusterInfo->getIdealStorageNodesForState( - _newClusterState, - _entries[range.first].bucketId)); - info->addNodes(copiesToAddOrUpdate, order, TrustedUpdate::DEFER); - - LOG_BUCKET_OPERATION_NO_LOCK( - _entries[range.first].bucketId, - vespalib::make_string("insertInfo: %s", - info.toString().c_str())); -} - -std::vector<BucketCopy> -PendingClusterState::getCopiesThatAreNewOrAltered( - BucketDatabase::Entry& info, - const Range& range) -{ - std::vector<BucketCopy> copiesToAdd; - for (uint32_t i = range.first; i < range.second; ++i) { - const BucketCopy& candidate(_entries[i].copy); - const BucketCopy* cp = info->getNode(candidate.getNode()); - - if (!cp || !(cp->getBucketInfo() == candidate.getBucketInfo())) { - copiesToAdd.push_back(candidate); - } - } - return copiesToAdd; -} - std::string -PendingClusterState::requestNodesToString() +PendingClusterState::requestNodesToString() const { std::ostringstream ost; for (uint32_t i = 0; i < _requestedNodes.size(); ++i) { @@ -464,137 +400,10 @@ PendingClusterState::requestNodesToString() return ost.str(); } -bool -PendingClusterState::removeCopiesFromNodesThatWereRequested( - BucketDatabase::Entry& e, - const document::BucketId& bucketId) -{ - bool updated = false; - for (uint32_t i = 0; i < e->getNodeCount();) { - auto& info(e->getNodeRef(i)); - const uint16_t entryNode(info.getNode()); - // Don't remove an entry if it's been updated in the time after the - // bucket info requests were sent, as this would erase newer state. - // Don't immediately update trusted state, as that could erroneously - // mark a single remaining replica as trusted even though there might - // be one or more additional replicas pending merge into the database. - if (nodeIsOutdated(entryNode) - && (info.getTimestamp() < _creationTimestamp) - && e->removeNode(entryNode, TrustedUpdate::DEFER)) - { - LOG(spam, - "Removed bucket %s from node %d", - bucketId.toString().c_str(), - entryNode); - updated = true; - // After removing current node, getNodeRef(i) will point to the _next_ node, so don't increment `i`. - } else { - ++i; - } - } - return updated; -} - -bool -PendingClusterState::databaseIteratorHasPassedBucketInfoIterator( - const document::BucketId& bucketId) const -{ - return (_iter < _entries.size() - && _entries[_iter].bucketId.toKey() < bucketId.toKey()); -} - -bool -PendingClusterState::bucketInfoIteratorPointsToBucket( - const document::BucketId& bucketId) const -{ - return _iter < _entries.size() && _entries[_iter].bucketId == bucketId; -} - -bool -PendingClusterState::process(BucketDatabase::Entry& e) -{ - document::BucketId bucketId(e.getBucketId()); - - LOG(spam, - "Before merging info from nodes [%s], bucket %s had info %s", - requestNodesToString().c_str(), - bucketId.toString().c_str(), - e.getBucketInfo().toString().c_str()); - - while (databaseIteratorHasPassedBucketInfoIterator(bucketId)) { - LOG(spam, "Found new bucket %s, adding", - _entries[_iter].bucketId.toString().c_str()); - - _missingEntries.push_back(skipAllForSameBucket()); - } - - bool updated(removeCopiesFromNodesThatWereRequested(e, bucketId)); - - if (bucketInfoIteratorPointsToBucket(bucketId)) { - LOG(spam, "Updating bucket %s", - _entries[_iter].bucketId.toString().c_str()); - - insertInfo(e, skipAllForSameBucket()); - updated = true; - } - - if (updated) { - // Remove bucket if we've previously removed all nodes from it - if (e->getNodeCount() == 0) { - _removedBuckets.push_back(bucketId); - } else { - e.getBucketInfo().updateTrusted(); - } - } - - LOG(spam, - "After merging info from nodes [%s], bucket %s had info %s", - requestNodesToString().c_str(), - bucketId.toString().c_str(), - e.getBucketInfo().toString().c_str()); - - return true; -} - -void -PendingClusterState::addToBucketDB(BucketDatabase& db, - const Range& range) -{ - LOG(spam, "Adding new bucket %s with %d copies", - _entries[range.first].bucketId.toString().c_str(), - range.second - range.first); - - BucketDatabase::Entry e(_entries[range.first].bucketId, BucketInfo()); - insertInfo(e, range); - if (e->getLastGarbageCollectionTime() == 0) { - e->setLastGarbageCollectionTime( - framework::MicroSecTime(_creationTimestamp) - .getSeconds().getTime()); - } - e.getBucketInfo().updateTrusted(); - db.update(e); -} - void PendingClusterState::mergeInto(BucketDatabase& db) { - std::sort(_entries.begin(), _entries.end()); - - db.forEach(*this); - - for (uint32_t i = 0; i < _removedBuckets.size(); ++i) { - db.remove(_removedBuckets[i]); - } - _removedBuckets.clear(); - - // All of the remaining were not already in the bucket database. - while (_iter < _entries.size()) { - _missingEntries.push_back(skipAllForSameBucket()); - } - - for (uint32_t i = 0; i < _missingEntries.size(); ++i) { - addToBucketDB(db, _missingEntries[i]); - } + _pendingTransition->mergeInto(db); } void @@ -621,4 +430,16 @@ PendingClusterState::getSummary() const (_clock.getTimeInMicros().getTime() - _creationTimestamp)); } +const PendingBucketSpaceDbTransition::EntryList & +PendingClusterState::results() const +{ + return _pendingTransition->results(); +} + +void +PendingClusterState::addNodeInfo(const document::BucketId& id, const BucketCopy& copy) +{ + _pendingTransition->addNodeInfo(id, copy); +} + } diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h index 316d7996d81..657d4f28179 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.h +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h @@ -1,8 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include "pending_bucket_space_db_transition_entry.h" #include "clusterinformation.h" -#include <vespa/storage/bucketdb/bucketdatabase.h> #include <vespa/storage/common/storagelink.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/state.h> @@ -12,32 +12,19 @@ #include <unordered_set> #include <deque> +namespace storage { class BucketDatabase; } + namespace storage::distributor { class DistributorMessageSender; +class PendingBucketSpaceDbTransition; /** * Class used by BucketDBUpdater to track request bucket info * messages sent to the storage nodes. */ -class PendingClusterState : public vespalib::XmlSerializable, - public BucketDatabase::MutableEntryProcessor { +class PendingClusterState : public vespalib::XmlSerializable { public: - struct Entry { - Entry(const document::BucketId& bid, - const BucketCopy& copy_) - : bucketId(bid), - copy(copy_) - {} - - document::BucketId bucketId; - BucketCopy copy; - - bool operator<(const Entry& other) const { - return bucketId.toKey() < other.bucketId.toKey(); - } - }; - struct Summary { Summary(const std::string& prevClusterState, const std::string& newClusterState, uint32_t processingTime); Summary(const Summary &); @@ -51,8 +38,6 @@ public: uint32_t _processingTime; }; - typedef std::vector<Entry> EntryList; - static std::unique_ptr<PendingClusterState> createForClusterStateChange( const framework::Clock& clock, const ClusterInformation::CSP& clusterInfo, @@ -99,11 +84,6 @@ public: _requestedNodes[nodeIdx] = true; } - /** - * Adds info from a node to our list of information. - */ - void addNodeInfo(const document::BucketId& id, const BucketCopy& copy); - /** Called to resend delayed resends due to failures. */ void resendDelayedMessages(); @@ -146,8 +126,11 @@ public: * Merges all the results with the given bucket database. */ void mergeInto(BucketDatabase& db); - bool process(BucketDatabase::Entry& e) override; - const EntryList& results() const { return _entries; } + // Get our list of information. Only used by unit test. + const std::vector<pendingbucketspacedbtransition::Entry>& results() const; + // Adds info from a node to our list of information. Only used by unit test. + void addNodeInfo(const document::BucketId& id, const BucketCopy& copy); + /** * Returns true if this pending state was due to a distribution bit @@ -156,6 +139,7 @@ public: bool distributionChange() const { return _distributionChange; } void printXml(vespalib::XmlOutputStream&) const override; Summary getSummary() const; + std::string requestNodesToString() const; private: /** @@ -202,41 +186,12 @@ private: bool nodeNeedsOwnershipTransferFromGroupDown(uint16_t nodeIndex, const lib::ClusterState& state) const; bool nodeWasUpButNowIsDown(const lib::State& old, const lib::State& nw) const; - typedef std::pair<uint32_t, uint32_t> Range; - - /** - * Skips through all entries for the same bucket and returns - * the range in the entry list for which they were found. - * The range is [from, to> - */ - Range skipAllForSameBucket(); - - void insertInfo(BucketDatabase::Entry& info, const Range& range); - void addToBucketDB(BucketDatabase& db, const Range& range); - - std::vector<BucketCopy> getCopiesThatAreNewOrAltered(BucketDatabase::Entry& info, const Range& range); - - std::string requestNodesToString(); - - // Returns whether at least one replica was removed from the entry. - // Does NOT implicitly update trusted status on remaining replicas; caller must do - // this explicitly. - bool removeCopiesFromNodesThatWereRequested(BucketDatabase::Entry& e, const document::BucketId& bucketId); - - bool databaseIteratorHasPassedBucketInfoIterator(const document::BucketId& bucketId) const; - bool bucketInfoIteratorPointsToBucket(const document::BucketId& bucketId) const; - - bool nodeIsOutdated(uint16_t node) const { - return (_outdatedNodes.find(node) != _outdatedNodes.end()); - } - bool storageNodeUpInNewState(uint16_t node) const; std::shared_ptr<api::SetSystemStateCommand> _cmd; std::map<uint64_t, uint16_t> _sentMessages; std::vector<bool> _requestedNodes; - std::vector<document::BucketId> _removedBuckets; std::deque<std::pair<framework::MilliSecTime, uint16_t> > _delayedRequests; // Set for all nodes that may have changed state since that previous @@ -246,11 +201,6 @@ private: // may be down and thus cannot get a request. std::unordered_set<uint16_t> _outdatedNodes; - EntryList _entries; - uint32_t _iter; - - std::vector<Range> _missingEntries; - lib::ClusterState _prevClusterState; lib::ClusterState _newClusterState; @@ -262,6 +212,7 @@ private: bool _distributionChange; bool _bucketOwnershipTransfer; + std::unique_ptr<PendingBucketSpaceDbTransition> _pendingTransition; }; } |