summary refs log tree commit diff stats
path: root/storage
diff options
context:
space:
mode:
author Tor Egge <Tor.Egge@oath.com> 2017-11-07 13:01:08 +0000
committer Tor Egge <Tor.Egge@oath.com> 2017-11-07 14:29:34 +0000
commit 0dbec7f1961d2ee2588305d6a498e710089ebe5f (patch)
tree e27d1ee64819c23bcddd0ea0ed2a3a9312762fec /storage
parent 45669b2bb56034e8c0d92e964aa17eb5c6cf68d6 (diff)
Factor out portions of PendingClusterState bound to a specific bucket
space to PendingBucketSpaceDbTransition.
Diffstat (limited to 'storage')
-rw-r--r-- storage/src/vespa/storage/distributor/CMakeLists.txt 1
-rw-r--r-- storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp 233
-rw-r--r-- storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h 92
-rw-r--r-- storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h 24
-rw-r--r-- storage/src/vespa/storage/distributor/pendingclusterstate.cpp 223
-rw-r--r-- storage/src/vespa/storage/distributor/pendingclusterstate.h 73
6 files changed, 384 insertions, 262 deletions
diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt
index 3dc08d858ad..3f3f6b8458a 100644
--- a/storage/src/vespa/storage/distributor/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/CMakeLists.txt
@@ -27,6 +27,7 @@ vespa_add_library(storage_distributor
operationtargetresolverimpl.cpp
ownership_transfer_safe_time_point_calculator.cpp
pendingclusterstate.cpp
+ pending_bucket_space_db_transition.cpp
pendingmessagetracker.cpp
persistence_operation_metric_set.cpp
persistencemessagetracker.cpp
diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp
new file mode 100644
index 00000000000..f5c8da4c261
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp
@@ -0,0 +1,233 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "pending_bucket_space_db_transition.h"
+#include "clusterinformation.h"
+#include "pendingclusterstate.h"
+#include <vespa/storage/common/bucketoperationlogger.h>
+#include <algorithm>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".pendingclusterstateprocessor");
+
+namespace storage::distributor {
+
+// Sets up an empty transition: no entries collected yet and the entry cursor
+// (_iter) at index 0.
+//
+// newClusterState and pendingClusterState are stored in reference members
+// (not copied), so both must outlive this object. The outdated node set is
+// read from pendingClusterState here, at construction time, and kept in a
+// by-value member.
+PendingBucketSpaceDbTransition::PendingBucketSpaceDbTransition(const PendingClusterState &pendingClusterState,
+ std::shared_ptr<const ClusterInformation> clusterInfo,
+ const lib::ClusterState &newClusterState,
+ api::Timestamp creationTimestamp)
+ : _entries(),
+ _iter(0),
+ _removedBuckets(),
+ _missingEntries(),
+ _clusterInfo(std::move(clusterInfo)),
+ _outdatedNodes(pendingClusterState.getOutdatedNodeSet()),
+ _newClusterState(newClusterState),
+ _creationTimestamp(creationTimestamp),
+ _pendingClusterState(pendingClusterState)
+{
+}
+
+// Nothing to release explicitly; all members clean up after themselves.
+// Defaulted here (rather than in the header) to keep the definition out-of-line.
+PendingBucketSpaceDbTransition::~PendingBucketSpaceDbTransition() = default;
+
+/**
+ * Advances _iter past every consecutive entry that shares the bucket id of
+ * the entry _iter currently points to.
+ *
+ * @return the half-open index range [first, second) into _entries that was
+ *         skipped. The range is empty if _iter was already at the end of
+ *         _entries (including when _entries is empty); in that case no
+ *         element of _entries is accessed.
+ */
+PendingBucketSpaceDbTransition::Range
+PendingBucketSpaceDbTransition::skipAllForSameBucket()
+{
+    const uint32_t first = _iter;
+
+    // Only look at _entries[first] once we know it exists; forming a
+    // reference to a one-past-the-end element is undefined behaviour.
+    if (_iter < _entries.size()) {
+        const document::BucketId& bid = _entries[first].bucketId;
+        do {
+            ++_iter;
+        } while (_iter < _entries.size() && _entries[_iter].bucketId == bid);
+    }
+
+    return Range(first, _iter);
+}
+
+/**
+ * Picks out the replicas in the given [first, second) entry range that the
+ * database entry either has no copy for on that node, or has a copy for whose
+ * bucket info differs from the reported one.
+ */
+std::vector<BucketCopy>
+PendingBucketSpaceDbTransition::getCopiesThatAreNewOrAltered(BucketDatabase::Entry& info, const Range& range)
+{
+    std::vector<BucketCopy> changed;
+    for (uint32_t pos = range.first; pos < range.second; ++pos) {
+        const BucketCopy& reported(_entries[pos].copy);
+        const BucketCopy* existing = info->getNode(reported.getNode());
+
+        if (existing && (existing->getBucketInfo() == reported.getBucketInfo())) {
+            continue; // Database already holds identical info for this node.
+        }
+        changed.push_back(reported);
+    }
+    return changed;
+}
+
+/**
+ * Adds the new or altered replicas from the given entry range to the database
+ * entry, passing along the ideal storage node order for the new cluster state.
+ * Trusted status is deferred (TrustedUpdate::DEFER), so the caller is expected
+ * to update it afterwards.
+ */
+void
+PendingBucketSpaceDbTransition::insertInfo(BucketDatabase::Entry& info, const Range& range)
+{
+    const document::BucketId& bucket = _entries[range.first].bucketId;
+
+    std::vector<BucketCopy> changedCopies(getCopiesThatAreNewOrAltered(info, range));
+    std::vector<uint16_t> idealOrder(
+            _clusterInfo->getIdealStorageNodesForState(_newClusterState, bucket));
+
+    info->addNodes(changedCopies, idealOrder, TrustedUpdate::DEFER);
+
+    LOG_BUCKET_OPERATION_NO_LOCK(
+            bucket,
+            vespalib::make_string("insertInfo: %s",
+                                  info.toString().c_str()));
+}
+
+// Returns the textual list of requested nodes, as produced by the
+// PendingClusterState this transition was constructed with. Used for the
+// log messages emitted from process().
+std::string
+PendingBucketSpaceDbTransition::requestNodesToString()
+{
+ return _pendingClusterState.requestNodesToString();
+}
+
+/**
+ * Drops replicas from the database entry that live on outdated nodes and
+ * were last updated before this transition was created.
+ *
+ * @return true if at least one replica was removed. Trusted status of the
+ *         remaining replicas is not updated here.
+ */
+bool
+PendingBucketSpaceDbTransition::removeCopiesFromNodesThatWereRequested(BucketDatabase::Entry& e, const document::BucketId& bucketId)
+{
+    bool anyRemoved = false;
+    uint32_t idx = 0;
+    while (idx < e->getNodeCount()) {
+        auto& replica(e->getNodeRef(idx));
+        const uint16_t entryNode(replica.getNode());
+        // A replica that has been updated after the bucket info requests were
+        // sent must be kept; removing it would erase newer state.
+        const bool stale = nodeIsOutdated(entryNode)
+                           && (replica.getTimestamp() < _creationTimestamp);
+        // Trusted state is deferred on removal: updating it right away could
+        // erroneously mark a single remaining replica as trusted even though
+        // more replicas may be pending merge into the database.
+        if (!stale || !e->removeNode(entryNode, TrustedUpdate::DEFER)) {
+            ++idx;
+            continue;
+        }
+        // The removal shifted the next replica into slot `idx`, so the index
+        // is deliberately left untouched here.
+        LOG(spam,
+            "Removed bucket %s from node %d",
+            bucketId.toString().c_str(),
+            entryNode);
+        anyRemoved = true;
+    }
+    return anyRemoved;
+}
+
+// True when there is a current entry and its bucket key sorts strictly before
+// the key of the given (database) bucket.
+bool
+PendingBucketSpaceDbTransition::databaseIteratorHasPassedBucketInfoIterator(const document::BucketId& bucketId) const
+{
+    if (_iter >= _entries.size()) {
+        return false;
+    }
+    return _entries[_iter].bucketId.toKey() < bucketId.toKey();
+}
+
+// True when there is a current entry and it belongs to the given bucket.
+bool
+PendingBucketSpaceDbTransition::bucketInfoIteratorPointsToBucket(const document::BucketId& bucketId) const
+{
+    if (_iter >= _entries.size()) {
+        return false;
+    }
+    return _entries[_iter].bucketId == bucketId;
+}
+
+// BucketDatabase::MutableEntryProcessor callback; mergeInto() passes *this to
+// db.forEach(), after sorting _entries by bucket key.
+//
+// For the given database entry this:
+//  1. records the ranges of collected entries whose bucket key sorts before
+//     this bucket as "missing" (to be added by mergeInto() later),
+//  2. removes stale replicas on outdated nodes from the entry,
+//  3. merges in collected replicas for this bucket, if any,
+//  4. schedules the bucket for removal if it ended up without replicas,
+//     otherwise recomputes trusted status.
+//
+// Always returns true.
+// NOTE(review): presumably `true` tells the database to continue iterating,
+// and step 1 relies on forEach() visiting buckets in ascending key order;
+// confirm both against BucketDatabase.
+bool
+PendingBucketSpaceDbTransition::process(BucketDatabase::Entry& e)
+{
+ document::BucketId bucketId(e.getBucketId());
+
+ LOG(spam,
+ "Before merging info from nodes [%s], bucket %s had info %s",
+ requestNodesToString().c_str(),
+ bucketId.toString().c_str(),
+ e.getBucketInfo().toString().c_str());
+
+ // Consume every collected bucket that sorts before this database bucket.
+ // Only the index range is remembered; insertion happens in mergeInto().
+ while (databaseIteratorHasPassedBucketInfoIterator(bucketId)) {
+ LOG(spam, "Found new bucket %s, adding",
+ _entries[_iter].bucketId.toString().c_str());
+
+ _missingEntries.push_back(skipAllForSameBucket());
+ }
+
+ bool updated(removeCopiesFromNodesThatWereRequested(e, bucketId));
+
+ if (bucketInfoIteratorPointsToBucket(bucketId)) {
+ LOG(spam, "Updating bucket %s",
+ _entries[_iter].bucketId.toString().c_str());
+
+ // skipAllForSameBucket() both yields the range to merge and advances
+ // _iter past it.
+ insertInfo(e, skipAllForSameBucket());
+ updated = true;
+ }
+
+ if (updated) {
+ // Remove bucket if we've previously removed all nodes from it
+ // (the actual db.remove() call is made by mergeInto() after forEach()).
+ if (e->getNodeCount() == 0) {
+ _removedBuckets.push_back(bucketId);
+ } else {
+ // Replica removal/insertion above deferred the trusted update.
+ e.getBucketInfo().updateTrusted();
+ }
+ }
+
+ LOG(spam,
+ "After merging info from nodes [%s], bucket %s had info %s",
+ requestNodesToString().c_str(),
+ bucketId.toString().c_str(),
+ e.getBucketInfo().toString().c_str());
+
+ return true;
+}
+
+/**
+ * Creates a fresh database entry for the bucket covered by the given
+ * [first, second) entry range, fills it with the collected replicas and
+ * writes it to the database.
+ *
+ * @param db    database to update.
+ * @param range non-empty index range into _entries; all entries in it are
+ *              expected to refer to the same bucket.
+ */
+void
+PendingBucketSpaceDbTransition::addToBucketDB(BucketDatabase& db, const Range& range)
+{
+    // The copy count is an unsigned 32-bit difference, hence %u (not %d).
+    LOG(spam, "Adding new bucket %s with %u copies",
+        _entries[range.first].bucketId.toString().c_str(),
+        range.second - range.first);
+
+    BucketDatabase::Entry e(_entries[range.first].bucketId, BucketInfo());
+    insertInfo(e, range);
+    // Seed the last-GC time from the transition's creation timestamp
+    // (converted to seconds) when the entry has none set.
+    if (e->getLastGarbageCollectionTime() == 0) {
+        e->setLastGarbageCollectionTime(
+                framework::MicroSecTime(_creationTimestamp)
+                        .getSeconds().getTime());
+    }
+    // insertInfo() defers the trusted update, so do it before storing.
+    e.getBucketInfo().updateTrusted();
+    db.update(e);
+}
+
+/**
+ * Applies all collected bucket info to the given database: existing entries
+ * are updated through process(), buckets left without replicas are removed,
+ * and buckets that had no database entry are added.
+ */
+void
+PendingBucketSpaceDbTransition::mergeInto(BucketDatabase& db)
+{
+    // process() walks _entries via _iter while the database is iterated,
+    // so the entries are ordered by bucket key first.
+    std::sort(_entries.begin(), _entries.end());
+    db.forEach(*this);
+
+    for (const document::BucketId& emptied : _removedBuckets) {
+        db.remove(emptied);
+    }
+    _removedBuckets.clear();
+
+    // Entries not consumed during the database iteration were not already
+    // in the bucket database.
+    while (_iter < _entries.size()) {
+        _missingEntries.push_back(skipAllForSameBucket());
+    }
+
+    for (const Range& missing : _missingEntries) {
+        addToBucketDB(db, missing);
+    }
+}
+
+/**
+ * Records one entry per bucket reported in the reply, attributing each
+ * replica to the given node and stamping it with the creation timestamp.
+ */
+void
+PendingBucketSpaceDbTransition::onRequestBucketInfoReply(const api::RequestBucketInfoReply &reply, uint16_t node)
+{
+    for (const auto &reported : reply.getBucketInfo()) {
+        const BucketCopy replica(_creationTimestamp, node, reported._info);
+        _entries.emplace_back(reported._bucketId, replica);
+    }
+}
+
+// Appends a single (bucket, replica) entry to the collected results.
+void
+PendingBucketSpaceDbTransition::addNodeInfo(const document::BucketId& id, const BucketCopy& copy)
+{
+    _entries.push_back(Entry(id, copy));
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h
new file mode 100644
index 00000000000..04c12f9dd7d
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h
@@ -0,0 +1,92 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "pending_bucket_space_db_transition_entry.h"
+#include <vespa/storage/bucketdb/bucketdatabase.h>
+#include <unordered_set>
+
+namespace storage::api { class RequestBucketInfoReply; }
+namespace storage::lib { class ClusterState; }
+
+namespace storage::distributor {
+
+class ClusterInformation;
+class PendingClusterState;
+
+/**
+ * Class used by PendingClusterState to track request bucket info
+ * reply result within a bucket space and apply it to the distributor
+ * bucket database when switching to the pending cluster state.
+ *
+ * Usage as seen from PendingClusterState: replies are fed in through
+ * onRequestBucketInfoReply(), and the accumulated entries are applied with
+ * mergeInto(), which iterates the database using this object as the
+ * MutableEntryProcessor.
+ */
+class PendingBucketSpaceDbTransition : public BucketDatabase::MutableEntryProcessor
+{
+public:
+ using Entry = pendingbucketspacedbtransition::Entry;
+ using EntryList = std::vector<Entry>;
+private:
+ // Half-open [first, second) index range into _entries.
+ using Range = std::pair<uint32_t, uint32_t>;
+
+ // Collected (bucket, replica) pairs; sorted by bucket key in mergeInto().
+ EntryList _entries;
+ // Index into _entries of the next entry not yet consumed by
+ // skipAllForSameBucket().
+ uint32_t _iter;
+ // Buckets found to have no replicas left during process(); removed from
+ // the database by mergeInto().
+ std::vector<document::BucketId> _removedBuckets;
+ // Ranges of entries for buckets without a database entry; added to the
+ // database by mergeInto().
+ std::vector<Range> _missingEntries;
+ std::shared_ptr<const ClusterInformation> _clusterInfo;
+
+ // Set for all nodes that may have changed state since the previous
+ // active cluster state, or that were marked as outdated when the pending
+ // cluster state was constructed.
+ // May be a superset of the nodes PendingClusterState has sent requests
+ // to (its _requestedNodes), as some nodes that are outdated may be down
+ // and thus cannot get a request.
+ const std::unordered_set<uint16_t> _outdatedNodes;
+
+ // Both of the reference members below refer to objects passed to the
+ // constructor; they are not owned and must outlive this object.
+ const lib::ClusterState &_newClusterState;
+ const api::Timestamp _creationTimestamp;
+ const PendingClusterState &_pendingClusterState;
+
+ // BucketDataBase::MutableEntryProcessor API
+ bool process(BucketDatabase::Entry& e) override;
+
+ /**
+ * Skips through all entries for the same bucket and returns
+ * the range in the entry list for which they were found.
+ * The range is half-open: [from, to)
+ */
+ Range skipAllForSameBucket();
+
+ std::vector<BucketCopy> getCopiesThatAreNewOrAltered(BucketDatabase::Entry& info, const Range& range);
+ void insertInfo(BucketDatabase::Entry& info, const Range& range);
+ void addToBucketDB(BucketDatabase& db, const Range& range);
+
+ bool nodeIsOutdated(uint16_t node) const {
+ return (_outdatedNodes.find(node) != _outdatedNodes.end());
+ }
+
+ // Returns whether at least one replica was removed from the entry.
+ // Does NOT implicitly update trusted status on remaining replicas; caller must do
+ // this explicitly.
+ bool removeCopiesFromNodesThatWereRequested(BucketDatabase::Entry& e, const document::BucketId& bucketId);
+
+ // Helper methods for iterating over _entries
+ bool databaseIteratorHasPassedBucketInfoIterator(const document::BucketId& bucketId) const;
+ bool bucketInfoIteratorPointsToBucket(const document::BucketId& bucketId) const;
+ // Forwards to PendingClusterState::requestNodesToString(); used for logging.
+ std::string requestNodesToString();
+
+public:
+ // pendingClusterState and newClusterState are kept by reference, see the
+ // member comments above.
+ PendingBucketSpaceDbTransition(const PendingClusterState &pendingClusterState,
+ std::shared_ptr<const ClusterInformation> clusterInfo,
+ const lib::ClusterState &newClusterState,
+ api::Timestamp creationTimestamp);
+ ~PendingBucketSpaceDbTransition();
+
+ // Merges all the results with the given bucket database.
+ void mergeInto(BucketDatabase& db);
+
+ // Adds the info from the reply to our list of information.
+ void onRequestBucketInfoReply(const api::RequestBucketInfoReply &reply, uint16_t node);
+
+ // Methods used by unit tests.
+ const EntryList& results() const { return _entries; }
+ void addNodeInfo(const document::BucketId& id, const BucketCopy& copy);
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h
new file mode 100644
index 00000000000..a2751275ba5
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h
@@ -0,0 +1,24 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/document/bucket/bucketid.h>
+#include <vespa/storage/bucketdb/bucketcopy.h>
+
+namespace storage::distributor::pendingbucketspacedbtransition {
+
+/**
+ * A single replica (copy) reported for a bucket.
+ * Ordering considers the bucket key only, so entries for the same bucket
+ * compare as equivalent regardless of which copy they carry.
+ */
+struct Entry {
+    document::BucketId bucketId;
+    BucketCopy copy;
+
+    Entry(const document::BucketId& bid, const BucketCopy& copy_)
+        : bucketId(bid), copy(copy_)
+    {}
+
+    bool operator<(const Entry& other) const {
+        const auto lhsKey = bucketId.toKey();
+        const auto rhsKey = other.bucketId.toKey();
+        return lhsKey < rhsKey;
+    }
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
index 9ad803b7b7f..41c8974470f 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "pendingclusterstate.h"
+#include "pending_bucket_space_db_transition.h"
#include "bucketdbupdater.h"
#include <vespa/storageframework/defaultimplementation/clock/realclock.h>
#include <vespa/storage/common/bucketoperationlogger.h>
@@ -28,14 +29,14 @@ PendingClusterState::PendingClusterState(
: _cmd(newStateCmd),
_requestedNodes(newStateCmd->getSystemState().getNodeCount(lib::NodeType::STORAGE)),
_outdatedNodes(newStateCmd->getSystemState().getNodeCount(lib::NodeType::STORAGE)),
- _iter(0),
_prevClusterState(clusterInfo->getClusterState()),
_newClusterState(newStateCmd->getSystemState()),
_clock(clock),
_clusterInfo(clusterInfo),
_creationTimestamp(creationTimestamp),
_sender(sender),
- _bucketOwnershipTransfer(distributorChanged(_prevClusterState, _newClusterState))
+ _bucketOwnershipTransfer(distributorChanged(_prevClusterState, _newClusterState)),
+ _pendingTransition()
{
logConstructionInformation();
if (hasBucketOwnershipTransfer()) {
@@ -44,6 +45,7 @@ PendingClusterState::PendingClusterState(
updateSetOfNodesThatAreOutdated();
addAdditionalNodesToOutdatedSet(outdatedNodes);
}
+ _pendingTransition = std::make_unique<PendingBucketSpaceDbTransition>(*this, _clusterInfo, _newClusterState, _creationTimestamp);
if (shouldRequestBucketInfo()) {
requestNodes();
}
@@ -56,17 +58,18 @@ PendingClusterState::PendingClusterState(
api::Timestamp creationTimestamp)
: _requestedNodes(clusterInfo->getStorageNodeCount()),
_outdatedNodes(clusterInfo->getStorageNodeCount()),
- _iter(0),
_prevClusterState(clusterInfo->getClusterState()),
_newClusterState(clusterInfo->getClusterState()),
_clock(clock),
_clusterInfo(clusterInfo),
_creationTimestamp(creationTimestamp),
_sender(sender),
- _bucketOwnershipTransfer(true)
+ _bucketOwnershipTransfer(true),
+ _pendingTransition()
{
logConstructionInformation();
markAllAvailableNodesAsRequiringRequest();
+ _pendingTransition = std::make_unique<PendingBucketSpaceDbTransition>(*this, _clusterInfo, _newClusterState, _creationTimestamp);
if (shouldRequestBucketInfo()) {
requestNodes();
}
@@ -364,14 +367,7 @@ PendingClusterState::onRequestBucketInfoReply(const std::shared_ptr<api::Request
}
setNodeReplied(node);
-
- for (uint32_t i = 0; i < reply->getBucketInfo().size(); ++i) {
- addNodeInfo(reply->getBucketInfo()[i]._bucketId,
- BucketCopy(_creationTimestamp,
- node,
- reply->getBucketInfo()[i]._info));
- }
-
+ _pendingTransition->onRequestBucketInfoReply(*reply, node);
_sentMessages.erase(iter);
return true;
@@ -389,68 +385,8 @@ PendingClusterState::resendDelayedMessages() {
}
}
-void
-PendingClusterState::addNodeInfo(
- const document::BucketId& id,
- const BucketCopy& copy)
-{
- _entries.push_back(Entry(id, copy));
-}
-
-PendingClusterState::Range
-PendingClusterState::skipAllForSameBucket()
-{
- Range r(_iter, _iter);
-
- for (document::BucketId& bid = _entries[_iter].bucketId;
- _iter < _entries.size() && _entries[_iter].bucketId == bid;
- ++_iter)
- {
- }
-
- r.second = _iter;
- return r;
-}
-
-void
-PendingClusterState::insertInfo(
- BucketDatabase::Entry& info,
- const Range& range)
-{
- std::vector<BucketCopy> copiesToAddOrUpdate(
- getCopiesThatAreNewOrAltered(info, range));
-
- std::vector<uint16_t> order(
- _clusterInfo->getIdealStorageNodesForState(
- _newClusterState,
- _entries[range.first].bucketId));
- info->addNodes(copiesToAddOrUpdate, order, TrustedUpdate::DEFER);
-
- LOG_BUCKET_OPERATION_NO_LOCK(
- _entries[range.first].bucketId,
- vespalib::make_string("insertInfo: %s",
- info.toString().c_str()));
-}
-
-std::vector<BucketCopy>
-PendingClusterState::getCopiesThatAreNewOrAltered(
- BucketDatabase::Entry& info,
- const Range& range)
-{
- std::vector<BucketCopy> copiesToAdd;
- for (uint32_t i = range.first; i < range.second; ++i) {
- const BucketCopy& candidate(_entries[i].copy);
- const BucketCopy* cp = info->getNode(candidate.getNode());
-
- if (!cp || !(cp->getBucketInfo() == candidate.getBucketInfo())) {
- copiesToAdd.push_back(candidate);
- }
- }
- return copiesToAdd;
-}
-
std::string
-PendingClusterState::requestNodesToString()
+PendingClusterState::requestNodesToString() const
{
std::ostringstream ost;
for (uint32_t i = 0; i < _requestedNodes.size(); ++i) {
@@ -464,137 +400,10 @@ PendingClusterState::requestNodesToString()
return ost.str();
}
-bool
-PendingClusterState::removeCopiesFromNodesThatWereRequested(
- BucketDatabase::Entry& e,
- const document::BucketId& bucketId)
-{
- bool updated = false;
- for (uint32_t i = 0; i < e->getNodeCount();) {
- auto& info(e->getNodeRef(i));
- const uint16_t entryNode(info.getNode());
- // Don't remove an entry if it's been updated in the time after the
- // bucket info requests were sent, as this would erase newer state.
- // Don't immediately update trusted state, as that could erroneously
- // mark a single remaining replica as trusted even though there might
- // be one or more additional replicas pending merge into the database.
- if (nodeIsOutdated(entryNode)
- && (info.getTimestamp() < _creationTimestamp)
- && e->removeNode(entryNode, TrustedUpdate::DEFER))
- {
- LOG(spam,
- "Removed bucket %s from node %d",
- bucketId.toString().c_str(),
- entryNode);
- updated = true;
- // After removing current node, getNodeRef(i) will point to the _next_ node, so don't increment `i`.
- } else {
- ++i;
- }
- }
- return updated;
-}
-
-bool
-PendingClusterState::databaseIteratorHasPassedBucketInfoIterator(
- const document::BucketId& bucketId) const
-{
- return (_iter < _entries.size()
- && _entries[_iter].bucketId.toKey() < bucketId.toKey());
-}
-
-bool
-PendingClusterState::bucketInfoIteratorPointsToBucket(
- const document::BucketId& bucketId) const
-{
- return _iter < _entries.size() && _entries[_iter].bucketId == bucketId;
-}
-
-bool
-PendingClusterState::process(BucketDatabase::Entry& e)
-{
- document::BucketId bucketId(e.getBucketId());
-
- LOG(spam,
- "Before merging info from nodes [%s], bucket %s had info %s",
- requestNodesToString().c_str(),
- bucketId.toString().c_str(),
- e.getBucketInfo().toString().c_str());
-
- while (databaseIteratorHasPassedBucketInfoIterator(bucketId)) {
- LOG(spam, "Found new bucket %s, adding",
- _entries[_iter].bucketId.toString().c_str());
-
- _missingEntries.push_back(skipAllForSameBucket());
- }
-
- bool updated(removeCopiesFromNodesThatWereRequested(e, bucketId));
-
- if (bucketInfoIteratorPointsToBucket(bucketId)) {
- LOG(spam, "Updating bucket %s",
- _entries[_iter].bucketId.toString().c_str());
-
- insertInfo(e, skipAllForSameBucket());
- updated = true;
- }
-
- if (updated) {
- // Remove bucket if we've previously removed all nodes from it
- if (e->getNodeCount() == 0) {
- _removedBuckets.push_back(bucketId);
- } else {
- e.getBucketInfo().updateTrusted();
- }
- }
-
- LOG(spam,
- "After merging info from nodes [%s], bucket %s had info %s",
- requestNodesToString().c_str(),
- bucketId.toString().c_str(),
- e.getBucketInfo().toString().c_str());
-
- return true;
-}
-
-void
-PendingClusterState::addToBucketDB(BucketDatabase& db,
- const Range& range)
-{
- LOG(spam, "Adding new bucket %s with %d copies",
- _entries[range.first].bucketId.toString().c_str(),
- range.second - range.first);
-
- BucketDatabase::Entry e(_entries[range.first].bucketId, BucketInfo());
- insertInfo(e, range);
- if (e->getLastGarbageCollectionTime() == 0) {
- e->setLastGarbageCollectionTime(
- framework::MicroSecTime(_creationTimestamp)
- .getSeconds().getTime());
- }
- e.getBucketInfo().updateTrusted();
- db.update(e);
-}
-
void
PendingClusterState::mergeInto(BucketDatabase& db)
{
- std::sort(_entries.begin(), _entries.end());
-
- db.forEach(*this);
-
- for (uint32_t i = 0; i < _removedBuckets.size(); ++i) {
- db.remove(_removedBuckets[i]);
- }
- _removedBuckets.clear();
-
- // All of the remaining were not already in the bucket database.
- while (_iter < _entries.size()) {
- _missingEntries.push_back(skipAllForSameBucket());
- }
-
- for (uint32_t i = 0; i < _missingEntries.size(); ++i) {
- addToBucketDB(db, _missingEntries[i]);
- }
+ _pendingTransition->mergeInto(db);
}
void
@@ -621,4 +430,16 @@ PendingClusterState::getSummary() const
(_clock.getTimeInMicros().getTime() - _creationTimestamp));
}
+// Exposes the entry list collected by the pending bucket space transition.
+// The returned reference points into _pendingTransition.
+const PendingBucketSpaceDbTransition::EntryList &
+PendingClusterState::results() const
+{
+ return _pendingTransition->results();
+}
+
+// Forwards a single (bucket, replica) entry to the pending bucket space
+// transition.
+void
+PendingClusterState::addNodeInfo(const document::BucketId& id, const BucketCopy& copy)
+{
+ _pendingTransition->addNodeInfo(id, copy);
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h
index 316d7996d81..657d4f28179 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.h
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h
@@ -1,8 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "pending_bucket_space_db_transition_entry.h"
#include "clusterinformation.h"
-#include <vespa/storage/bucketdb/bucketdatabase.h>
#include <vespa/storage/common/storagelink.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/state.h>
@@ -12,32 +12,19 @@
#include <unordered_set>
#include <deque>
+namespace storage { class BucketDatabase; }
+
namespace storage::distributor {
class DistributorMessageSender;
+class PendingBucketSpaceDbTransition;
/**
* Class used by BucketDBUpdater to track request bucket info
* messages sent to the storage nodes.
*/
-class PendingClusterState : public vespalib::XmlSerializable,
- public BucketDatabase::MutableEntryProcessor {
+class PendingClusterState : public vespalib::XmlSerializable {
public:
- struct Entry {
- Entry(const document::BucketId& bid,
- const BucketCopy& copy_)
- : bucketId(bid),
- copy(copy_)
- {}
-
- document::BucketId bucketId;
- BucketCopy copy;
-
- bool operator<(const Entry& other) const {
- return bucketId.toKey() < other.bucketId.toKey();
- }
- };
-
struct Summary {
Summary(const std::string& prevClusterState, const std::string& newClusterState, uint32_t processingTime);
Summary(const Summary &);
@@ -51,8 +38,6 @@ public:
uint32_t _processingTime;
};
- typedef std::vector<Entry> EntryList;
-
static std::unique_ptr<PendingClusterState> createForClusterStateChange(
const framework::Clock& clock,
const ClusterInformation::CSP& clusterInfo,
@@ -99,11 +84,6 @@ public:
_requestedNodes[nodeIdx] = true;
}
- /**
- * Adds info from a node to our list of information.
- */
- void addNodeInfo(const document::BucketId& id, const BucketCopy& copy);
-
/** Called to resend delayed resends due to failures. */
void resendDelayedMessages();
@@ -146,8 +126,11 @@ public:
* Merges all the results with the given bucket database.
*/
void mergeInto(BucketDatabase& db);
- bool process(BucketDatabase::Entry& e) override;
- const EntryList& results() const { return _entries; }
+ // Get our list of information. Only used by unit test.
+ const std::vector<pendingbucketspacedbtransition::Entry>& results() const;
+ // Adds info from a node to our list of information. Only used by unit test.
+ void addNodeInfo(const document::BucketId& id, const BucketCopy& copy);
+
/**
* Returns true if this pending state was due to a distribution bit
@@ -156,6 +139,7 @@ public:
bool distributionChange() const { return _distributionChange; }
void printXml(vespalib::XmlOutputStream&) const override;
Summary getSummary() const;
+ std::string requestNodesToString() const;
private:
/**
@@ -202,41 +186,12 @@ private:
bool nodeNeedsOwnershipTransferFromGroupDown(uint16_t nodeIndex, const lib::ClusterState& state) const;
bool nodeWasUpButNowIsDown(const lib::State& old, const lib::State& nw) const;
- typedef std::pair<uint32_t, uint32_t> Range;
-
- /**
- * Skips through all entries for the same bucket and returns
- * the range in the entry list for which they were found.
- * The range is [from, to>
- */
- Range skipAllForSameBucket();
-
- void insertInfo(BucketDatabase::Entry& info, const Range& range);
- void addToBucketDB(BucketDatabase& db, const Range& range);
-
- std::vector<BucketCopy> getCopiesThatAreNewOrAltered(BucketDatabase::Entry& info, const Range& range);
-
- std::string requestNodesToString();
-
- // Returns whether at least one replica was removed from the entry.
- // Does NOT implicitly update trusted status on remaining replicas; caller must do
- // this explicitly.
- bool removeCopiesFromNodesThatWereRequested(BucketDatabase::Entry& e, const document::BucketId& bucketId);
-
- bool databaseIteratorHasPassedBucketInfoIterator(const document::BucketId& bucketId) const;
- bool bucketInfoIteratorPointsToBucket(const document::BucketId& bucketId) const;
-
- bool nodeIsOutdated(uint16_t node) const {
- return (_outdatedNodes.find(node) != _outdatedNodes.end());
- }
-
bool storageNodeUpInNewState(uint16_t node) const;
std::shared_ptr<api::SetSystemStateCommand> _cmd;
std::map<uint64_t, uint16_t> _sentMessages;
std::vector<bool> _requestedNodes;
- std::vector<document::BucketId> _removedBuckets;
std::deque<std::pair<framework::MilliSecTime, uint16_t> > _delayedRequests;
// Set for all nodes that may have changed state since that previous
@@ -246,11 +201,6 @@ private:
// may be down and thus cannot get a request.
std::unordered_set<uint16_t> _outdatedNodes;
- EntryList _entries;
- uint32_t _iter;
-
- std::vector<Range> _missingEntries;
-
lib::ClusterState _prevClusterState;
lib::ClusterState _newClusterState;
@@ -262,6 +212,7 @@ private:
bool _distributionChange;
bool _bucketOwnershipTransfer;
+ std::unique_ptr<PendingBucketSpaceDbTransition> _pendingTransition;
};
}