summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorGeir Storli <geirstorli@yahoo.no>2017-11-14 13:52:06 +0100
committerGitHub <noreply@github.com>2017-11-14 13:52:06 +0100
commit3aefb034de5176d6225140312096adc37f520d93 (patch)
treeb9c3c9337e0e42b4fe9f18a394deada0d19e73c5 /storage
parentb66d6bd93cbffbbcb4b3276acde6d343149406f9 (diff)
parent1d2a276f4fcf5a10fb987c3a15c149ed66e04f88 (diff)
Merge pull request #4102 from vespa-engine/toregge/change-distributor-bucket-db-updater-to-handle-multiple-bucket-spaces
Change distributor BucketDBUpdater to handle multiple bucket spaces
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/bucketdbupdatertest.cpp13
-rw-r--r--storage/src/tests/distributor/distributortest.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.cpp85
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.h29
-rw-r--r--storage/src/vespa/storage/distributor/clusterinformation.cpp24
-rw-r--r--storage/src/vespa/storage/distributor/clusterinformation.h10
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.cpp8
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.h3
-rw-r--r--storage/src/vespa/storage/distributor/simpleclusterinformation.h7
10 files changed, 71 insertions, 116 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index 03a51cf6180..4a3f93d3f45 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -151,7 +151,6 @@ protected:
ClusterInformation::CSP clusterInfo(
new SimpleClusterInformation(
getBucketDBUpdater().getDistributorComponent().getIndex(),
- getBucketDBUpdater().getDistributorComponent().getDistribution(),
lib::ClusterState(clusterState),
"ui"));
return clusterInfo;
@@ -880,7 +879,7 @@ BucketDBUpdaterTest::testInitializingWhileRecheck()
CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.commands.size());
CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size());
- getBucketDBUpdater().recheckBucketInfo(1, document::BucketId(16, 3));
+ getBucketDBUpdater().recheckBucketInfo(1, makeDocumentBucket(document::BucketId(16, 3)));
for (int i=0; i<2; i++) {
fakeBucketReply(systemState,
@@ -1010,7 +1009,7 @@ BucketDBUpdaterTest::testRecheckNodeWithFailure()
_sender.clear();
- getBucketDBUpdater().recheckBucketInfo(1, document::BucketId(16, 3));
+ getBucketDBUpdater().recheckBucketInfo(1, makeDocumentBucket(document::BucketId(16, 3)));
CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
@@ -1060,7 +1059,7 @@ BucketDBUpdaterTest::testRecheckNode()
_sender.clear();
- getBucketDBUpdater().recheckBucketInfo(1, document::BucketId(16, 3));
+ getBucketDBUpdater().recheckBucketInfo(1, makeDocumentBucket(document::BucketId(16, 3)));
CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
@@ -1954,7 +1953,7 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInCurrentState()
}
_sender.clear();
- getBucketDBUpdater().recheckBucketInfo(0, bucket);
+ getBucketDBUpdater().recheckBucketInfo(0, makeDocumentBucket(bucket));
CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
std::shared_ptr<api::RequestBucketInfoCommand> rbi(
@@ -1986,7 +1985,7 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInPendingState()
}
_sender.clear();
- getBucketDBUpdater().recheckBucketInfo(0, bucket);
+ getBucketDBUpdater().recheckBucketInfo(0, makeDocumentBucket(bucket));
CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
std::shared_ptr<api::RequestBucketInfoCommand> rbi(
@@ -1999,7 +1998,7 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInPendingState()
CPPUNIT_ASSERT(getBucketDBUpdater().getDistributorComponent()
.ownsBucketInCurrentState(makeDocumentBucket(bucket)));
CPPUNIT_ASSERT(!getBucketDBUpdater()
- .checkOwnershipInPendingState(bucket).isOwned());
+ .checkOwnershipInPendingState(makeDocumentBucket(bucket)).isOwned());
sendFakeReplyForSingleBucketRequest(*rbi);
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index cbc78157911..1640af0f871 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -556,7 +556,7 @@ Distributor_Test::testNoDbResurrectionForBucketNotOwnedInPendingState()
document::BucketId nonOwnedBucket(16, 3);
CPPUNIT_ASSERT(!getBucketDBUpdater()
- .checkOwnershipInPendingState(nonOwnedBucket).isOwned());
+ .checkOwnershipInPendingState(makeDocumentBucket(nonOwnedBucket)).isOwned());
CPPUNIT_ASSERT(!getBucketDBUpdater().getDistributorComponent()
.checkOwnershipInPendingAndCurrentState(makeDocumentBucket(nonOwnedBucket))
.isOwned());
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index b68e53c7136..85d7af571e4 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -2,6 +2,7 @@
#include "bucketdbupdater.h"
#include "distributor.h"
+#include "distributor_bucket_space_repo.h"
#include "simpleclusterinformation.h"
#include <vespa/storage/common/bucketoperationlogger.h>
#include <vespa/storageapi/message/persistence.h>
@@ -59,13 +60,11 @@ BucketDBUpdater::hasPendingClusterState() const
}
BucketOwnership
-BucketDBUpdater::checkOwnershipInPendingState(const document::BucketId& b) const
+BucketDBUpdater::checkOwnershipInPendingState(const document::Bucket& b) const
{
if (hasPendingClusterState()) {
const lib::ClusterState& state(_pendingClusterState->getNewClusterState());
- const lib::Distribution& distribution(_pendingClusterState->getDistribution());
- document::Bucket bucket(BucketSpace::placeHolder(), b);
- if (!_bucketSpaceComponent.ownsBucketInState(distribution, state, bucket)) {
+ if (!_bucketSpaceComponent.ownsBucketInState(state, b)) {
return BucketOwnership::createNotOwnedInState(state);
}
}
@@ -75,7 +74,7 @@ BucketDBUpdater::checkOwnershipInPendingState(const document::BucketId& b) const
void
BucketDBUpdater::sendRequestBucketInfo(
uint16_t node,
- const document::BucketId& bucket,
+ const document::Bucket& bucket,
const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard)
{
if (!_bucketSpaceComponent.storageNodeIsUp(node)) {
@@ -83,10 +82,10 @@ BucketDBUpdater::sendRequestBucketInfo(
}
std::vector<document::BucketId> buckets;
- buckets.push_back(bucket);
+ buckets.push_back(bucket.getBucketId());
std::shared_ptr<api::RequestBucketInfoCommand> msg(
- new api::RequestBucketInfoCommand(BucketSpace::placeHolder(), buckets));
+ new api::RequestBucketInfoCommand(bucket.getBucketSpace(), buckets));
LOG(debug,
"Sending request bucket info command %lu for "
@@ -106,30 +105,33 @@ BucketDBUpdater::sendRequestBucketInfo(
void
BucketDBUpdater::recheckBucketInfo(uint32_t nodeIdx,
- const document::BucketId& bid)
+ const document::Bucket& bucket)
{
- sendRequestBucketInfo(nodeIdx, bid, std::shared_ptr<MergeReplyGuard>());
+ sendRequestBucketInfo(nodeIdx, bucket, std::shared_ptr<MergeReplyGuard>());
}
void
BucketDBUpdater::removeSuperfluousBuckets(
- const lib::Distribution& newDistribution,
const lib::ClusterState& newState)
{
- // Remove all buckets not belonging to this distributor, or
- // being on storage nodes that are no longer up.
- NodeRemover proc(
- _bucketSpaceComponent.getClusterState(),
- newState,
- _bucketSpaceComponent.getBucketIdFactory(),
- _bucketSpaceComponent.getIndex(),
- newDistribution,
- _bucketSpaceComponent.getDistributor().getStorageNodeUpStates());
+ for (auto &elem : _bucketSpaceComponent.getBucketSpaceRepo()) {
+ const auto &newDistribution(elem.second->getDistribution());
+ auto &bucketDb(elem.second->getBucketDatabase());
- _bucketSpaceComponent.getBucketDatabase().forEach(proc);
+ // Remove all buckets not belonging to this distributor, or
+ // being on storage nodes that are no longer up.
+ NodeRemover proc(
+ _bucketSpaceComponent.getClusterState(),
+ newState,
+ _bucketSpaceComponent.getBucketIdFactory(),
+ _bucketSpaceComponent.getIndex(),
+ newDistribution,
+ _bucketSpaceComponent.getDistributor().getStorageNodeUpStates());
+ bucketDb.forEach(proc);
- for (const auto & entry :proc.getBucketsToRemove()) {
- _bucketSpaceComponent.getBucketDatabase().remove(entry);
+ for (const auto & entry :proc.getBucketsToRemove()) {
+ bucketDb.remove(entry);
+ }
}
}
@@ -152,17 +154,14 @@ BucketDBUpdater::completeTransitionTimer()
}
void
-BucketDBUpdater::storageDistributionChanged(
- const lib::Distribution& distribution)
+BucketDBUpdater::storageDistributionChanged()
{
ensureTransitionTimerStarted();
- removeSuperfluousBuckets(distribution,
- _bucketSpaceComponent.getClusterState());
+ removeSuperfluousBuckets(_bucketSpaceComponent.getClusterState());
ClusterInformation::CSP clusterInfo(new SimpleClusterInformation(
_bucketSpaceComponent.getIndex(),
- distribution,
_bucketSpaceComponent.getClusterState(),
_bucketSpaceComponent.getDistributor().getStorageNodeUpStates()));
_pendingClusterState = PendingClusterState::createForDistributionChange(
@@ -201,15 +200,12 @@ BucketDBUpdater::onSetSystemState(
}
ensureTransitionTimerStarted();
- removeSuperfluousBuckets(
- _bucketSpaceComponent.getDistribution(),
- cmd->getSystemState());
+ removeSuperfluousBuckets(cmd->getSystemState());
replyToPreviousPendingClusterStateIfAny();
ClusterInformation::CSP clusterInfo(
new SimpleClusterInformation(
_bucketSpaceComponent.getIndex(),
- _bucketSpaceComponent.getDistribution(),
_bucketSpaceComponent.getClusterState(),
_bucketSpaceComponent.getDistributor()
.getStorageNodeUpStates()));
@@ -248,7 +244,7 @@ BucketDBUpdater::onMergeBucketReply(
// bucket again to make sure it's ok.
for (uint32_t i = 0; i < reply->getNodes().size(); i++) {
sendRequestBucketInfo(reply->getNodes()[i].index,
- reply->getBucketId(),
+ reply->getBucket(),
replyGuard);
}
@@ -258,7 +254,7 @@ BucketDBUpdater::onMergeBucketReply(
void
BucketDBUpdater::enqueueRecheckUntilPendingStateEnabled(
uint16_t node,
- const document::BucketId& bucket)
+ const document::Bucket& bucket)
{
LOG(spam,
"DB updater has a pending cluster state, enqueuing recheck "
@@ -305,10 +301,10 @@ BucketDBUpdater::onNotifyBucketChange(
if (hasPendingClusterState()) {
enqueueRecheckUntilPendingStateEnabled(cmd->getSourceIndex(),
- cmd->getBucketId());
+ cmd->getBucket());
} else {
sendRequestBucketInfo(cmd->getSourceIndex(),
- cmd->getBucketId(),
+ cmd->getBucket(),
std::shared_ptr<MergeReplyGuard>());
}
@@ -357,7 +353,7 @@ BucketDBUpdater::handleSingleBucketInfoFailure(
LOG(debug, "Request bucket info failed towards node %d: error was %s",
req.targetNode, repl->getResult().toString().c_str());
- if (req.bucket != document::BucketId(0)) {
+ if (req.bucket.getBucketId() != document::BucketId(0)) {
framework::MilliSecTime sendTime(_bucketSpaceComponent.getClock());
sendTime += framework::MilliSecTime(100);
_delayedRequests.emplace_back(sendTime, req);
@@ -409,7 +405,7 @@ BucketDBUpdater::mergeBucketInfoWithDatabase(
std::sort(newList.begin(), newList.end(), sort_pred);
BucketListMerger merger(newList, existing, req.timestamp);
- updateDatabase(req.targetNode, merger);
+ updateDatabase(req.bucket.getBucketSpace(), req.targetNode, merger);
}
bool
@@ -451,11 +447,12 @@ BucketDBUpdater::addBucketInfoForNode(
}
void
-BucketDBUpdater::findRelatedBucketsInDatabase(uint16_t node, const document::BucketId& bucketId,
+BucketDBUpdater::findRelatedBucketsInDatabase(uint16_t node, const document::Bucket& bucket,
BucketListMerger::BucketList& existing)
{
+ auto &distributorBucketSpace(_bucketSpaceComponent.getBucketSpaceRepo().get(bucket.getBucketSpace()));
std::vector<BucketDatabase::Entry> entries;
- _bucketSpaceComponent.getBucketDatabase().getAll(bucketId, entries);
+ distributorBucketSpace.getBucketDatabase().getAll(bucket.getBucketId(), entries);
for (const BucketDatabase::Entry & entry : entries) {
addBucketInfoForNode(entry, node, existing);
@@ -463,15 +460,15 @@ BucketDBUpdater::findRelatedBucketsInDatabase(uint16_t node, const document::Buc
}
void
-BucketDBUpdater::updateDatabase(uint16_t node, BucketListMerger& merger)
+BucketDBUpdater::updateDatabase(document::BucketSpace bucketSpace, uint16_t node, BucketListMerger& merger)
{
for (const document::BucketId & bucketId : merger.getRemovedEntries()) {
- document::Bucket bucket(BucketSpace::placeHolder(), bucketId);
+ document::Bucket bucket(bucketSpace, bucketId);
_bucketSpaceComponent.removeNodeFromDB(bucket, node);
}
for (const BucketListMerger::BucketEntry& entry : merger.getAddedEntries()) {
- document::Bucket bucket(BucketSpace::placeHolder(), entry.first);
+ document::Bucket bucket(bucketSpace, entry.first);
_bucketSpaceComponent.updateBucketDatabase(
bucket,
BucketCopy(merger.getTimestamp(), node, entry.second),
@@ -585,10 +582,10 @@ BucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos,
{
xos << XmlTag("storagenode")
<< XmlAttribute("index", entry.second.targetNode);
- if (entry.second.bucket.getRawId() == 0) {
+ if (entry.second.bucket.getBucketId().getRawId() == 0) {
xos << XmlAttribute("bucket", ALL);
} else {
- xos << XmlAttribute("bucket", entry.second.bucket.getId(), XmlAttribute::HEX);
+ xos << XmlAttribute("bucket", entry.second.bucket.getBucketId().getId(), XmlAttribute::HEX);
}
xos << XmlAttribute("sendtimestamp", entry.second.timestamp)
<< XmlEndTag();
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index ee07c46754f..c4b387c9a2f 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -8,7 +8,7 @@
#include "pendingclusterstate.h"
#include "distributor_bucket_space_component.h"
#include "outdated_nodes_map.h"
-#include <vespa/document/bucket/bucketid.h>
+#include <vespa/document/bucket/bucket.h>
#include <vespa/storageapi/messageapi/returncode.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/vdslib/state/clusterstate.h>
@@ -38,15 +38,15 @@ public:
~BucketDBUpdater();
void flush();
- BucketOwnership checkOwnershipInPendingState(const document::BucketId&) const;
- void recheckBucketInfo(uint32_t nodeIdx, const document::BucketId& bid);
+ BucketOwnership checkOwnershipInPendingState(const document::Bucket&) const;
+ void recheckBucketInfo(uint32_t nodeIdx, const document::Bucket& bucket);
bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd) override;
bool onRequestBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply> & repl) override;
bool onMergeBucketReply(const std::shared_ptr<api::MergeBucketReply>& reply) override;
bool onNotifyBucketChange(const std::shared_ptr<api::NotifyBucketChangeCommand>&) override;
void resendDelayedMessages();
- void storageDistributionChanged(const lib::Distribution&);
+ void storageDistributionChanged();
vespalib::string reportXmlStatus(vespalib::xml::XmlOutputStream&, const framework::HttpUrlPath&) const;
vespalib::string getReportContentType(const framework::HttpUrlPath&) const override;
@@ -84,9 +84,9 @@ private:
struct BucketRequest {
BucketRequest()
- : targetNode(0), bucket(0), timestamp(0) {};
+ : targetNode(0), bucket(), timestamp(0) {};
- BucketRequest(uint16_t t, uint64_t currentTime, const document::BucketId& b,
+ BucketRequest(uint16_t t, uint64_t currentTime, const document::Bucket& b,
const std::shared_ptr<MergeReplyGuard>& guard)
: targetNode(t),
bucket(b),
@@ -94,7 +94,7 @@ private:
_mergeReplyGuard(guard) {};
uint16_t targetNode;
- document::BucketId bucket;
+ document::Bucket bucket;
uint64_t timestamp;
std::shared_ptr<MergeReplyGuard> _mergeReplyGuard;
@@ -102,11 +102,11 @@ private:
struct EnqueuedBucketRecheck {
uint16_t node;
- document::BucketId bucket;
+ document::Bucket bucket;
EnqueuedBucketRecheck() : node(0), bucket() {}
- EnqueuedBucketRecheck(uint16_t _node, const document::BucketId& _bucket)
+ EnqueuedBucketRecheck(uint16_t _node, const document::Bucket& _bucket)
: node(_node),
bucket(_bucket)
{}
@@ -124,7 +124,6 @@ private:
bool hasPendingClusterState() const;
bool pendingClusterStateAccepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
- bool bucketOwnedAccordingToPendingState(const document::BucketId& bucketId) const;
bool processSingleBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
void handleSingleBucketInfoFailure(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
const BucketRequest& req);
@@ -134,7 +133,7 @@ private:
const BucketRequest& req);
void convertBucketInfoToBucketList(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
uint16_t targetNode, BucketListMerger::BucketList& newList);
- void sendRequestBucketInfo(uint16_t node, const document::BucketId& bucket,
+ void sendRequestBucketInfo(uint16_t node, const document::Bucket& bucket,
const std::shared_ptr<MergeReplyGuard>& mergeReply);
void addBucketInfoForNode(const BucketDatabase::Entry& e, uint16_t node,
BucketListMerger::BucketList& existing) const;
@@ -146,24 +145,24 @@ private:
* in bucketId, or that bucketId is contained in, that have copies
* on the given node.
*/
- void findRelatedBucketsInDatabase(uint16_t node, const document::BucketId& bucketId,
+ void findRelatedBucketsInDatabase(uint16_t node, const document::Bucket& bucket,
BucketListMerger::BucketList& existing);
/**
Updates the bucket database from the information generated by the given
bucket list merger.
*/
- void updateDatabase(uint16_t node, BucketListMerger& merger);
+ void updateDatabase(document::BucketSpace bucketSpace, uint16_t node, BucketListMerger& merger);
void updateState(const lib::ClusterState& oldState, const lib::ClusterState& newState);
- void removeSuperfluousBuckets(const lib::Distribution& newDistribution, const lib::ClusterState& newState);
+ void removeSuperfluousBuckets(const lib::ClusterState& newState);
void replyToPreviousPendingClusterStateIfAny();
void enableCurrentClusterStateInDistributor();
void addCurrentStateToClusterStateHistory();
- void enqueueRecheckUntilPendingStateEnabled(uint16_t node, const document::BucketId&);
+ void enqueueRecheckUntilPendingStateEnabled(uint16_t node, const document::Bucket&);
void sendAllQueuedBucketRechecks();
friend class BucketDBUpdater_Test;
diff --git a/storage/src/vespa/storage/distributor/clusterinformation.cpp b/storage/src/vespa/storage/distributor/clusterinformation.cpp
index 2e22d7db51d..cd09e4f46d4 100644
--- a/storage/src/vespa/storage/distributor/clusterinformation.cpp
+++ b/storage/src/vespa/storage/distributor/clusterinformation.cpp
@@ -6,30 +6,6 @@
namespace storage::distributor {
-bool
-ClusterInformation::nodeInSameGroupAsSelf(uint16_t otherNode) const
-{
- return (getDistribution().getNodeGraph().getGroupForNode(otherNode)
- == getDistribution().getNodeGraph().getGroupForNode(getDistributorIndex()));
-}
-
-vespalib::string
-ClusterInformation::getDistributionHash() const
-{
- return getDistribution().getNodeGraph().getDistributionConfigHash();
-}
-
-std::vector<uint16_t>
-ClusterInformation::getIdealStorageNodesForState(
- const lib::ClusterState& clusterState,
- const document::BucketId& bucketId) const
-{
- return getDistribution().getIdealStorageNodes(
- clusterState,
- bucketId,
- getStorageUpStates());
-}
-
uint16_t
ClusterInformation::getStorageNodeCount() const
{
diff --git a/storage/src/vespa/storage/distributor/clusterinformation.h b/storage/src/vespa/storage/distributor/clusterinformation.h
index 8b416e26147..25f303d0f52 100644
--- a/storage/src/vespa/storage/distributor/clusterinformation.h
+++ b/storage/src/vespa/storage/distributor/clusterinformation.h
@@ -26,20 +26,10 @@ public:
virtual uint16_t getDistributorIndex() const = 0;
- virtual const lib::Distribution& getDistribution() const = 0;
-
virtual const lib::ClusterState& getClusterState() const = 0;
virtual const char* getStorageUpStates() const = 0;
- bool nodeInSameGroupAsSelf(uint16_t otherNode) const;
-
- vespalib::string getDistributionHash() const;
-
- std::vector<uint16_t> getIdealStorageNodesForState(
- const lib::ClusterState& clusterState,
- const document::BucketId& bucketId) const;
-
uint16_t getStorageNodeCount() const;
};
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 11c029c5c94..c9930b7299c 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -154,7 +154,7 @@ const DistributorBucketSpace& Distributor::getDefaultBucketSpace() const noexcep
BucketOwnership
Distributor::checkOwnershipInPendingState(const document::Bucket &b) const
{
- return _bucketDBUpdater.checkOwnershipInPendingState(b.getBucketId());
+ return _bucketDBUpdater.checkOwnershipInPendingState(b);
}
void
@@ -455,7 +455,7 @@ Distributor::storageDistributionChanged()
void
Distributor::recheckBucketInfo(uint16_t nodeIdx, const document::Bucket &bucket) {
- _bucketDBUpdater.recheckBucketInfo(nodeIdx, bucket.getBucketId());
+ _bucketDBUpdater.recheckBucketInfo(nodeIdx, bucket);
}
namespace {
@@ -553,7 +553,7 @@ Distributor::enableNextDistribution()
_distribution = _nextDistribution;
propagateDefaultDistribution(_distribution);
_nextDistribution = std::shared_ptr<lib::Distribution>();
- _bucketDBUpdater.storageDistributionChanged(getDistribution());
+ _bucketDBUpdater.storageDistributionChanged();
}
}
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
index daa85822264..daaf723274d 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
@@ -91,13 +91,15 @@ PendingClusterState::initializeBucketSpaceTransitions(bool distributionChanged,
void
PendingClusterState::logConstructionInformation() const
{
+ const auto &distributorBucketSpace(_bucketSpaceRepo.get(BucketSpace::placeHolder()));
+ const auto &distribution(distributorBucketSpace.getDistribution());
LOG(debug,
"New PendingClusterState constructed with previous cluster "
"state '%s', new cluster state '%s', distribution config "
"hash: '%s'",
_prevClusterState.toString().c_str(),
_newClusterState.toString().c_str(),
- _clusterInfo->getDistribution().getNodeGraph().getDistributionConfigHash().c_str());
+ distribution.getNodeGraph().getDistributionConfigHash().c_str());
}
bool
@@ -179,7 +181,9 @@ PendingClusterState::requestBucketInfoFromStorageNodesWithChangedState()
void
PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode)
{
- vespalib::string distributionHash(_clusterInfo->getDistributionHash());
+ const auto &distributorBucketSpace(_bucketSpaceRepo.get(bucketSpaceAndNode.bucketSpace));
+ const auto &distribution(distributorBucketSpace.getDistribution());
+ vespalib::string distributionHash(distribution.getNodeGraph().getDistributionConfigHash());
LOG(debug,
"Requesting bucket info for bucket space %" PRIu64 " node %d with cluster state '%s' "
"and distribution hash '%s'",
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h
index 7a23d48c9fd..2d75c795745 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.h
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h
@@ -113,9 +113,6 @@ public:
const lib::ClusterState& getPrevClusterState() const {
return _prevClusterState;
}
- const lib::Distribution& getDistribution() const {
- return _clusterInfo->getDistribution();
- }
/**
* Returns the union set of the outdated node set provided at construction
diff --git a/storage/src/vespa/storage/distributor/simpleclusterinformation.h b/storage/src/vespa/storage/distributor/simpleclusterinformation.h
index e6e46890c3f..2946abf620c 100644
--- a/storage/src/vespa/storage/distributor/simpleclusterinformation.h
+++ b/storage/src/vespa/storage/distributor/simpleclusterinformation.h
@@ -11,11 +11,9 @@ class SimpleClusterInformation : public ClusterInformation
{
public:
SimpleClusterInformation(uint16_t myIndex,
- const lib::Distribution& distribution,
const lib::ClusterState& clusterState,
const char* storageUpStates)
: _myIndex(myIndex),
- _distribution(distribution.serialize()),
_clusterState(clusterState),
_storageUpStates(storageUpStates)
{}
@@ -24,10 +22,6 @@ public:
return _myIndex;
}
- const lib::Distribution& getDistribution() const override {
- return _distribution;
- }
-
const lib::ClusterState& getClusterState() const override {
return _clusterState;
}
@@ -38,7 +32,6 @@ public:
private:
uint16_t _myIndex;
- lib::Distribution _distribution;
lib::ClusterState _clusterState;
const char* _storageUpStates;
};