diff options
author | Geir Storli <geirstorli@yahoo.no> | 2017-11-06 16:42:20 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-11-06 16:42:20 +0100 |
commit | 5e9a9227722eec66390b2670359a41b4703b343a (patch) | |
tree | a1c2edd663d8bf2089afb0459c9df402b8cee529 | |
parent | 34e634df6d6c1c0494114f185330b44587116b42 (diff) | |
parent | e196f9afeb9465163b96a63d0966f3547ce8ea9e (diff) |
Merge pull request #4018 from vespa-engine/toregge/add-distributor-bucket-space-to-ideal-state-operation
Add distributor bucket space to ideal state operation
9 files changed, 37 insertions, 23 deletions
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp index d78262709e3..53d9cc018f9 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp @@ -3,6 +3,7 @@ #include "garbagecollectionoperation.h" #include <vespa/storage/distributor/idealstatemanager.h> #include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/storageapi/message/removelocation.h> #include <vespa/log/log.h> @@ -21,7 +22,7 @@ GarbageCollectionOperation::~GarbageCollectionOperation() { } void GarbageCollectionOperation::onStart(DistributorMessageSender& sender) { - BucketDatabase::Entry entry = _manager->getDistributorComponent().getBucketDatabase().get(getBucketId()); + BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(getBucketId()); std::vector<uint16_t> nodes = entry->getNodes(); for (uint32_t i = 0; i < nodes.size(); i++) { @@ -62,11 +63,11 @@ GarbageCollectionOperation::onReceive(DistributorMessageSender&, if (_tracker.finished()) { if (_ok) { - BucketDatabase::Entry dbentry = _manager->getDistributorComponent().getBucketDatabase().get(getBucketId()); + BucketDatabase::Entry dbentry = _bucketSpace->getBucketDatabase().get(getBucketId()); if (dbentry.valid()) { dbentry->setLastGarbageCollectionTime( _manager->getDistributorComponent().getClock().getTimeInSeconds().getTime()); - _manager->getDistributorComponent().getBucketDatabase().update(dbentry); + _bucketSpace->getBucketDatabase().update(dbentry); } } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp index 3095dce7b87..1e4067eb128 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp @@ -4,6 +4,7 @@ #include <vespa/storage/distributor/pendingmessagetracker.h> #include <vespa/storage/distributor/idealstatemetricsset.h> #include <vespa/storage/distributor/pendingmessagetracker.h> +#include <vespa/storage/distributor/distributor_bucket_space_repo.h> #include <vespa/storageapi/messageapi/maintenancecommand.h> #include <vespa/log/log.h> @@ -25,7 +26,8 @@ const uint32_t IdealStateOperation::MAINTENANCE_MESSAGE_TYPES[] = }; IdealStateOperation::IdealStateOperation(const BucketAndNodes& bucketAndNodes) - : _manager(NULL), + : _manager(nullptr), + _bucketSpace(nullptr), _bucketAndNodes(bucketAndNodes), _ok(true), _priority(255) @@ -78,6 +80,12 @@ BucketAndNodes::toString() const } void +IdealStateOperation::setIdealStateManager(IdealStateManager* manager) { + _manager = manager; + _bucketSpace = &_manager->getBucketSpaceRepo().get(getBucket().getBucketSpace()); +}; + +void IdealStateOperation::done() { if (_manager != NULL) { diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h index 9824ae0630f..e8480902549 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h @@ -10,6 +10,7 @@ namespace storage::distributor { +class DistributorBucketSpace; class PendingMessageTracker; class IdealStateManager; @@ -165,9 +166,7 @@ public: @param manager The ideal state manager. */ - void setIdealStateManager(IdealStateManager* manager) { - _manager = manager; - }; + void setIdealStateManager(IdealStateManager* manager); /** Returns the type of operation this is. @@ -224,6 +223,7 @@ protected: friend class IdealStateManager; IdealStateManager* _manager; + DistributorBucketSpace *_bucketSpace; BucketAndNodes _bucketAndNodes; std::string _detailedReason; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp index 77135f56399..52a4a5c195c 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp @@ -2,6 +2,7 @@ #include "joinoperation.h" #include <vespa/storageapi/message/bucketsplitting.h> +#include <vespa/storage/distributor/distributor_bucket_space.h> #include <climits> #include <vespa/log/bufferedlogger.h> LOG_SETUP(".distributor.operation.idealstate.join"); @@ -42,7 +43,7 @@ JoinOperation::NodeToBuckets JoinOperation::resolveSourceBucketsPerTargetNode() const { NodeToBuckets nodeToBuckets; - const auto& db(_manager->getDistributorComponent().getBucketDatabase()); + const auto& db(_bucketSpace->getBucketDatabase()); for (const auto& bucket : _bucketsToJoin) { BucketDatabase::Entry entry(db.get(bucket)); @@ -117,7 +118,7 @@ JoinOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP& LOG(spam, "Adding joined bucket %s", getBucketId().toString().c_str()); } } else if (rep.getResult().getResult() == api::ReturnCode::BUCKET_NOT_FOUND - && _manager->getDistributorComponent().getBucketDatabase().get(getBucketId())->getNode(node) != 0) + && _bucketSpace->getBucketDatabase().get(getBucketId())->getNode(node) != 0) { _manager->getDistributorComponent().recheckBucketInfo(node, getBucket()); LOGBP(warning, "Join failed to find %s: %s", diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp index e889dbe279b..271ac35968e 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "mergeoperation.h" #include <vespa/storage/distributor/idealstatemanager.h> +#include <vespa/storage/distributor/distributor_bucket_space.h> #include <array> #include <vespa/log/bufferedlogger.h> @@ -104,7 +105,7 @@ struct NodeIndexComparator void MergeOperation::onStart(DistributorMessageSender& sender) { - BucketDatabase::Entry entry = _manager->getDistributorComponent().getBucketDatabase().get(getBucketId()); + BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(getBucketId()); if (!entry.valid()) { LOGBP(debug, "Unable to merge nonexisting bucket %s", getBucketId().toString().c_str()); _ok = false; @@ -126,7 +127,7 @@ MergeOperation::onStart(DistributorMessageSender& sender) } _infoBefore = entry.getBucketInfo(); - generateSortedNodeList(_manager->getDistributorComponent().getDistribution(), + generateSortedNodeList(_bucketSpace->getDistribution(), clusterState, getBucketId(), _limiter, @@ -273,7 +274,7 @@ MergeOperation::onReceive(DistributorMessageSender& sender, _ok = result.success(); if (_ok) { BucketDatabase::Entry entry( - _manager->getDistributorComponent().getBucketDatabase().get(getBucketId())); + _bucketSpace->getBucketDatabase().get(getBucketId())); if (!entry.valid()) { LOG(debug, "Bucket %s no longer exists after merge", getBucketId().toString().c_str()); diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp index 6c0245cb590..dd7155d0f38 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp @@ -15,7 +15,7 @@ RemoveBucketOperation::onStartInternal(DistributorMessageSender& sender) { std::vector<std::pair<uint16_t, std::shared_ptr<api::DeleteBucketCommand> > > msgs; - BucketDatabase::Entry entry = _manager->getDistributorComponent().getBucketDatabase().get(getBucketId()); + BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(getBucketId()); for (uint32_t i = 0; i < getNodes().size(); ++i) { uint16_t node = getNodes()[i]; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp index f3528d30aba..1acb2dcc64b 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp @@ -2,6 +2,7 @@ #include "setbucketstateoperation.h" #include <vespa/storage/distributor/idealstatemanager.h> +#include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/log/log.h> LOG_SETUP(".distributor.operation.idealstate.setactive"); @@ -83,7 +84,7 @@ SetBucketStateOperation::onReceive(DistributorMessageSender& sender, bool deactivate = false; if (reply->getResult().success()) { BucketDatabase::Entry entry = - _manager->getDistributorComponent().getBucketDatabase().get(rep.getBucketId()); + _bucketSpace->getBucketDatabase().get(rep.getBucketId()); if (entry.valid()) { const BucketCopy* copy = entry->getNode(node); @@ -103,7 +104,7 @@ SetBucketStateOperation::onReceive(DistributorMessageSender& sender, node, bInfo).setTrusted(copy->trusted())); - _manager->getDistributorComponent().getBucketDatabase().update(entry); + _bucketSpace->getBucketDatabase().update(entry); } } else { LOG(debug, "%s did not exist when receiving %s", diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp index a8f547afe45..1b40f744a80 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp @@ -4,6 +4,7 @@ #include <vespa/storage/distributor/idealstatemanager.h> #include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/storageapi/message/bucketsplitting.h> +#include <vespa/storage/distributor/distributor_bucket_space.h> #include <climits> #include <vespa/log/bufferedlogger.h> @@ -26,8 +27,7 @@ SplitOperation::onStart(DistributorMessageSender& sender) { _ok = false; - BucketDatabase::Entry entry = _manager->getDistributorComponent() - .getBucketDatabase().get(getBucketId()); + BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(getBucketId()); for (uint32_t i = 0; i < entry->getNodeCount(); i++) { std::shared_ptr<api::SplitBucketCommand> msg( @@ -66,7 +66,7 @@ SplitOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP if (rep.getResult().success()) { BucketDatabase::Entry entry = - _manager->getDistributorComponent().getBucketDatabase().get(rep.getBucketId()); + _bucketSpace->getBucketDatabase().get(rep.getBucketId()); if (entry.valid()) { entry->removeNode(node); @@ -74,9 +74,9 @@ SplitOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP if (entry->getNodeCount() == 0) { LOG(spam, "Removing split bucket %s", getBucketId().toString().c_str()); - _manager->getDistributorComponent().getBucketDatabase().remove(rep.getBucketId()); + _bucketSpace->getBucketDatabase().remove(rep.getBucketId()); } else { - _manager->getDistributorComponent().getBucketDatabase().update(entry); + _bucketSpace->getBucketDatabase().update(entry); } ost << getBucketId() << " => "; @@ -115,7 +115,7 @@ SplitOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP } } else if ( rep.getResult().getResult() == api::ReturnCode::BUCKET_NOT_FOUND - && _manager->getDistributorComponent().getBucketDatabase().get(rep.getBucketId())->getNode(node) != 0) + && _bucketSpace->getBucketDatabase().get(rep.getBucketId())->getNode(node) != 0) { _manager->getDistributorComponent().recheckBucketInfo(node, getBucket()); LOGBP(debug, "Split failed for %s: bucket not found. Storage and " diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp index a1b43149963..c76e4802fb3 100644 --- a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp +++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp @@ -4,6 +4,7 @@ #include <vespa/storage/common/vectorprinter.h> #include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/storageapi/message/persistence.h> +#include "distributor_bucket_space_repo.h" #include <vespa/log/log.h> @@ -123,8 +124,9 @@ PersistenceMessageTrackerImpl::canSendReplyEarly() const LOG(spam, "Can't return early because we have already replied or failed"); return false; } - - const lib::Distribution& distribution = _manager.getDistribution(); + auto &bucketSpaceRepo(_manager.getBucketSpaceRepo()); + auto &bucketSpace(bucketSpaceRepo.get(_reply->getBucket().getBucketSpace())); + const lib::Distribution& distribution = bucketSpace.getDistribution(); if (distribution.getInitialRedundancy() == 0) { LOG(spam, "Not returning early because initial redundancy wasn't set"); |