diff options
author | Geir Storli <geirstorli@yahoo.no> | 2017-11-03 16:22:39 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-11-03 16:22:39 +0100 |
commit | 5179b365632673472e9e64286fee7eec64cc09f9 (patch) | |
tree | bf1239842c068880bf76a239d0506093afe6fa28 /storage | |
parent | dc7b61ade7a38dcd34f3ca1c6c3745cda5006d85 (diff) | |
parent | 8ef4d03ca2afbc3b1c6a8d720c68f5bcb0037e21 (diff) |
Merge pull request #4007 from vespa-engine/toregge/pass-distributor-bucket-space-to-external-operations
Pass distributor bucket space to external operations
Diffstat (limited to 'storage')
31 files changed, 155 insertions, 69 deletions
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp index 613ce7aad4e..5deb31f8579 100644 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ b/storage/src/tests/distributor/distributortestutil.cpp @@ -343,6 +343,12 @@ DistributorTestUtil::getConfig() { return const_cast<DistributorConfiguration&>(_distributor->getConfig()); } +DistributorBucketSpace & +DistributorTestUtil::getDistributorBucketSpace() +{ + return _distributor->getDefaultBucketSpace(); +} + BucketDatabase& DistributorTestUtil::getBucketDatabase() { return _distributor->getDefaultBucketSpace().getBucketDatabase(); diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h index 4de84e47bfe..4f09c11ac03 100644 --- a/storage/src/tests/distributor/distributortestutil.h +++ b/storage/src/tests/distributor/distributortestutil.h @@ -19,6 +19,7 @@ namespace distributor { class BucketDBUpdater; class Distributor; +class DistributorBucketSpace; class IdealStateManager; class ExternalOperationHandler; class Operation; @@ -121,6 +122,7 @@ public: } // TODO explicit notion of bucket spaces for tests + DistributorBucketSpace &getDistributorBucketSpace(); BucketDatabase& getBucketDatabase(); const BucketDatabase& getBucketDatabase() const; diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp index ee1ea70163f..8bb8e24c17a 100644 --- a/storage/src/tests/distributor/getoperationtest.cpp +++ b/storage/src/tests/distributor/getoperationtest.cpp @@ -75,6 +75,7 @@ public: new api::GetCommand(makeDocumentBucket(document::BucketId(0)), docId, "[all]")); op.reset(new GetOperation(getExternalOperationHandler(), + getDistributorBucketSpace(), msg, getDistributor().getMetrics(). gets[msg->getLoadType()])); diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp index 755802127fe..7f54e163006 100644 --- a/storage/src/tests/distributor/putoperationtest.cpp +++ b/storage/src/tests/distributor/putoperationtest.cpp @@ -131,6 +131,7 @@ public: void sendPut(std::shared_ptr<api::PutCommand> msg) { op.reset(new PutOperation(getExternalOperationHandler(), + getDistributorBucketSpace(), msg, getDistributor().getMetrics(). puts[msg->getLoadType()])); diff --git a/storage/src/tests/distributor/removelocationtest.cpp b/storage/src/tests/distributor/removelocationtest.cpp index 8c55b7e715c..52612048daa 100644 --- a/storage/src/tests/distributor/removelocationtest.cpp +++ b/storage/src/tests/distributor/removelocationtest.cpp @@ -40,6 +40,7 @@ public: new api::RemoveLocationCommand(selection, makeDocumentBucket(document::BucketId(0)))); op.reset(new RemoveLocationOperation(getExternalOperationHandler(), + getDistributorBucketSpace(), msg, getDistributor().getMetrics(). removelocations[msg->getLoadType()])); diff --git a/storage/src/tests/distributor/removeoperationtest.cpp b/storage/src/tests/distributor/removeoperationtest.cpp index 4c2f1bba00d..423e0816c13 100644 --- a/storage/src/tests/distributor/removeoperationtest.cpp +++ b/storage/src/tests/distributor/removeoperationtest.cpp @@ -58,6 +58,7 @@ public: new api::RemoveCommand(makeDocumentBucket(document::BucketId(0)), dId, 100)); op.reset(new RemoveOperation(getExternalOperationHandler(), + getDistributorBucketSpace(), msg, getDistributor().getMetrics(). removes[msg->getLoadType()])); diff --git a/storage/src/tests/distributor/statoperationtest.cpp b/storage/src/tests/distributor/statoperationtest.cpp index 442d74503ab..9ae8fc2fa4a 100644 --- a/storage/src/tests/distributor/statoperationtest.cpp +++ b/storage/src/tests/distributor/statoperationtest.cpp @@ -46,6 +46,7 @@ StatOperationTest::testBucketInfo() StatBucketOperation op( getExternalOperationHandler(), + getDistributorBucketSpace(), std::shared_ptr<api::StatBucketCommand>( new api::StatBucketCommand(makeDocumentBucket(document::BucketId(16, 5)), ""))); @@ -90,7 +91,7 @@ StatOperationTest::testBucketList() { new api::GetBucketListCommand(makeDocumentBucket(document::BucketId(16, 5)))); StatBucketListOperation op( - getExternalOperationHandler().getBucketDatabase(), + getDistributorBucketSpace().getBucketDatabase(), getIdealStateManager(), getExternalOperationHandler().getIndex(), msg); diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp index 9f6a7010179..a7418629f81 100644 --- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp +++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp @@ -337,7 +337,7 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState, ExternalOperationHandler& handler = getExternalOperationHandler(); return std::make_shared<TwoPhaseUpdateOperation>( - handler, msg, getDistributor().getMetrics()); + handler, getDistributorBucketSpace(), msg, getDistributor().getMetrics()); } diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp index 62590eabdad..c15a2b4057c 100644 --- a/storage/src/tests/distributor/updateoperationtest.cpp +++ b/storage/src/tests/distributor/updateoperationtest.cpp @@ -93,6 +93,7 @@ UpdateOperation_Test::sendUpdate(const std::string& bucketState) ExternalOperationHandler& handler = getExternalOperationHandler(); return std::shared_ptr<UpdateOperation>( new UpdateOperation(handler, + getDistributorBucketSpace(), msg, getDistributor().getMetrics().updates[msg->getLoadType()])); } diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp index f12e1fa4e33..26f4fb3e784 100644 --- a/storage/src/tests/distributor/visitoroperationtest.cpp +++ b/storage/src/tests/distributor/visitoroperationtest.cpp @@ -202,6 +202,7 @@ private: { return std::make_unique<VisitorOperation>( getExternalOperationHandler(), + getDistributorBucketSpace(), msg, config, getDistributor().getMetrics().visits[msg->getLoadType()]); diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.cpp b/storage/src/vespa/storage/distributor/distributorcomponent.cpp index baa9f9916b2..f8a0a5504ec 100644 --- a/storage/src/vespa/storage/distributor/distributorcomponent.cpp +++ b/storage/src/vespa/storage/distributor/distributorcomponent.cpp @@ -3,10 +3,13 @@ #include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/storageapi/messageapi/storagereply.h> #include <vespa/vdslib/distribution/distribution.h> +#include <vespa/storage/distributor/distributor_bucket_space_repo.h> #include <vespa/log/log.h> LOG_SETUP(".distributorstoragelink"); +using document::BucketSpace; + namespace storage { namespace distributor { @@ -45,7 +48,8 @@ DistributorComponent::getClusterState() const std::vector<uint16_t> DistributorComponent::getIdealNodes(const document::Bucket &bucket) const { - return getDistribution().getIdealStorageNodes( + auto &bucketSpace(_bucketSpaceRepo.get(bucket.getBucketSpace())); + return bucketSpace.getDistribution().getIdealStorageNodes( getClusterState(), bucket.getBucketId(), _distributor.getStorageNodeUpStates()); @@ -82,8 +86,9 @@ BucketOwnership DistributorComponent::checkOwnershipInPendingAndCurrentState( const document::Bucket &bucket) const { + auto &bucketSpace(_bucketSpaceRepo.get(bucket.getBucketSpace())); return checkOwnershipInPendingAndGivenState( - getDistribution(), getClusterState(), bucket); + bucketSpace.getDistribution(), getClusterState(), bucket); } bool @@ -112,14 +117,15 @@ DistributorComponent::ownsBucketInState( const lib::ClusterState& clusterState, const document::Bucket &bucket) const { - return ownsBucketInState(getDistribution(), clusterState, bucket); + auto &bucketSpace(_bucketSpaceRepo.get(bucket.getBucketSpace())); + return ownsBucketInState(bucketSpace.getDistribution(), clusterState, bucket); } bool -DistributorComponent::ownsBucketInCurrentState( - const document::Bucket &bucket) const +DistributorComponent::ownsBucketInCurrentState(const document::Bucket &bucket) const { - return ownsBucketInState(getDistribution(), getClusterState(), bucket); + auto &bucketSpace(_bucketSpaceRepo.get(bucket.getBucketSpace())); + return ownsBucketInState(bucketSpace.getDistribution(), getClusterState(), bucket); } api::StorageMessageAddress @@ -165,7 +171,8 @@ void DistributorComponent::removeNodesFromDB(const document::Bucket &bucket, const std::vector<uint16_t>& nodes) { - BucketDatabase::Entry dbentry = getBucketDatabase().get(bucket.getBucketId()); + auto &bucketSpace(_bucketSpaceRepo.get(bucket.getBucketSpace())); + BucketDatabase::Entry dbentry = bucketSpace.getBucketDatabase().get(bucket.getBucketId()); if (dbentry.valid()) { for (uint32_t i = 0; i < nodes.size(); ++i) { @@ -179,14 +186,14 @@ DistributorComponent::removeNodesFromDB(const document::Bucket &bucket, } if (dbentry->getNodeCount() != 0) { - getBucketDatabase().update(dbentry); + bucketSpace.getBucketDatabase().update(dbentry); } else { LOG(debug, "After update, bucket %s now has no copies. " "Removing from database.", bucket.toString().c_str()); - getBucketDatabase().remove(bucket.getBucketId()); + bucketSpace.getBucketDatabase().remove(bucket.getBucketId()); } } } @@ -222,8 +229,9 @@ DistributorComponent::updateBucketDatabase( const std::vector<BucketCopy>& changedNodes, uint32_t updateFlags) { + auto &bucketSpace(_bucketSpaceRepo.get(bucket.getBucketSpace())); assert(!(bucket.getBucketId() == document::BucketId())); - BucketDatabase::Entry dbentry = getBucketDatabase().get(bucket.getBucketId()); + BucketDatabase::Entry dbentry = bucketSpace.getBucketDatabase().get(bucket.getBucketId()); BucketOwnership ownership(checkOwnershipInPendingAndCurrentState(bucket)); if (!ownership.isOwned()) { @@ -277,7 +285,7 @@ DistributorComponent::updateBucketDatabase( if (updateFlags & DatabaseUpdate::RESET_TRUSTED) { dbentry->resetTrusted(); } - getBucketDatabase().update(dbentry); + bucketSpace.getBucketDatabase().update(dbentry); } void @@ -332,19 +340,12 @@ DistributorComponent::getSibling(const document::BucketId& bid) const { }; BucketDatabase::Entry -DistributorComponent::createAppropriateBucket(const document::BucketId& bid) -{ - return getBucketDatabase().createAppropriateBucket( - _distributor.getConfig().getMinimalBucketSplit(), - bid); -} - -document::BucketId -DistributorComponent::getAppropriateBucket(const document::BucketId& bid) +DistributorComponent::createAppropriateBucket(const document::Bucket &bucket) { - return getBucketDatabase().getAppropriateBucket( + auto &bucketSpace(_bucketSpaceRepo.get(bucket.getBucketSpace())); + return bucketSpace.getBucketDatabase().createAppropriateBucket( _distributor.getConfig().getMinimalBucketSplit(), - bid); + bucket.getBucketId()); } bool diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.h b/storage/src/vespa/storage/distributor/distributorcomponent.h index b89e7d0a5f2..307ddc20299 100644 --- a/storage/src/vespa/storage/distributor/distributorcomponent.h +++ b/storage/src/vespa/storage/distributor/distributorcomponent.h @@ -173,13 +173,10 @@ public: document::BucketId getSibling(const document::BucketId& bid) const; /** - * Gets a bucket that is split correctly according to other buckets that - * are in the bucket database. For instance, if you have a sibling bucket of - * the bucket, a similarly split bucket should be created. + * Create a bucket that is split correctly according to other buckets that + * are in the bucket database. */ - document::BucketId getAppropriateBucket(const document::BucketId& bid); - - BucketDatabase::Entry createAppropriateBucket(const document::BucketId& bid); + BucketDatabase::Entry createAppropriateBucket(const document::Bucket &bucket); /** * Returns true if the node is currently initializing. diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp index bd42885aede..77a86a3756d 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp @@ -19,6 +19,7 @@ #include <vespa/storageapi/message/removelocation.h> #include <vespa/storageapi/message/batch.h> #include <vespa/storageapi/message/stat.h> +#include "distributor_bucket_space_repo.h" #include <vespa/log/log.h> LOG_SETUP(".distributor.manager"); @@ -137,7 +138,9 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Put) auto handle = _mutationSequencer.try_acquire(cmd->getDocumentId()); if (allowMutation(handle)) { - _op = std::make_shared<PutOperation>(*this, cmd, getMetrics().puts[cmd->getLoadType()], std::move(handle)); + _op = std::make_shared<PutOperation>(*this, + _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()), + cmd, getMetrics().puts[cmd->getLoadType()], std::move(handle)); } else { sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId())); } @@ -160,7 +163,9 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Update) } auto handle = _mutationSequencer.try_acquire(cmd->getDocumentId()); if (allowMutation(handle)) { - _op = std::make_shared<TwoPhaseUpdateOperation>(*this, cmd, getMetrics(), std::move(handle)); + _op = std::make_shared<TwoPhaseUpdateOperation>(*this, + _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()), + cmd, getMetrics(), std::move(handle)); } else { sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId())); } @@ -183,8 +188,10 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Remove) } auto handle = _mutationSequencer.try_acquire(cmd->getDocumentId()); if (allowMutation(handle)) { + auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace())); _op = std::make_shared<RemoveOperation>( *this, + distributorBucketSpace, cmd, getMetrics().removes[cmd->getLoadType()], std::move(handle)); @@ -213,6 +220,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, RemoveLocation) _op = Operation::SP(new RemoveLocationOperation( *this, + _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()), cmd, getMetrics().removelocations[cmd->getLoadType()])); return true; @@ -234,6 +242,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Get) _op = Operation::SP(new GetOperation( *this, + _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()), cmd, getMetrics().gets[cmd->getLoadType()])); return true; @@ -251,6 +260,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, MultiOperation) _op = Operation::SP(new MultiOperationOperation( *this, + _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()), cmd, getMetrics().multioperations[cmd->getLoadType()])); return true; @@ -261,8 +271,8 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, StatBucket) if (!checkDistribution(*cmd, cmd->getBucket())) { return true; } - - _op = Operation::SP(new StatBucketOperation(*this, cmd)); + auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace())); + _op = Operation::SP(new StatBucketOperation(*this, distributorBucketSpace, cmd)); return true; } @@ -271,8 +281,11 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, GetBucketList) if (!checkDistribution(*cmd, cmd->getBucket())) { return true; } + auto bucketSpace(cmd->getBucket().getBucketSpace()); + auto &distributorBucketSpace(_bucketSpaceRepo.get(bucketSpace)); + auto &bucketDatabase(distributorBucketSpace.getBucketDatabase()); _op = Operation::SP(new StatBucketListOperation( - getBucketDatabase(), _operationGenerator, getIndex(), cmd)); + bucketDatabase, _operationGenerator, getIndex(), cmd)); return true; } @@ -281,7 +294,8 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, CreateVisitor) const DistributorConfiguration& config(getDistributor().getConfig()); VisitorOperation::Config visitorConfig(config.getMinBucketsPerVisitor(), config.getMaxVisitorsPerNodePerClientVisitor()); - _op = Operation::SP(new VisitorOperation(*this, cmd, visitorConfig, getMetrics().visits[cmd->getLoadType()])); + auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace())); + _op = Operation::SP(new VisitorOperation(*this, distributorBucketSpace, cmd, visitorConfig, getMetrics().visits[cmd->getLoadType()])); return true; } diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp index b71e2728f5b..246020c191c 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp @@ -5,6 +5,7 @@ #include <vespa/storageapi/message/persistence.h> #include <vespa/vdslib/state/nodestate.h> #include <vespa/document/fieldvalue/document.h> +#include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/log/log.h> LOG_SETUP(".distributor.callback.doc.get"); @@ -46,10 +47,12 @@ GetOperation::GroupId::operator==(const GroupId& other) const } GetOperation::GetOperation(DistributorComponent& manager, + DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::GetCommand> & msg, PersistenceOperationMetricSet& metric) : Operation(), _manager(manager), + _bucketSpace(bucketSpace), _msg(msg), _returnCode(api::ReturnCode::OK), _doc((document::Document*)NULL), @@ -239,7 +242,7 @@ GetOperation::assignTargetNodeGroups() document::BucketId bid = bucketIdFactory.getBucketId(_msg->getDocumentId()); std::vector<BucketDatabase::Entry> entries; - _manager.getBucketDatabase().getParents(bid, entries); + _bucketSpace.getBucketDatabase().getParents(bid, entries); for (uint32_t j = 0; j < entries.size(); ++j) { const BucketDatabase::Entry& e = entries[j]; diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h index 12748c38b90..03279a87152 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h @@ -18,11 +18,13 @@ class PersistenceOperationMetricSet; namespace distributor { class DistributorComponent; +class DistributorBucketSpace; class GetOperation : public Operation { public: GetOperation(DistributorComponent& manager, + DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::GetCommand> & msg, PersistenceOperationMetricSet& metric); @@ -71,6 +73,7 @@ private: std::map<GroupId, GroupVector> _responses; DistributorComponent& _manager; + DistributorBucketSpace &_bucketSpace; std::shared_ptr<api::GetCommand> _msg; diff --git a/storage/src/vespa/storage/distributor/operations/external/multioperationoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/multioperationoperation.cpp index 19c693e2a7f..5d93d3e3a5a 100644 --- a/storage/src/vespa/storage/distributor/operations/external/multioperationoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/multioperationoperation.cpp @@ -4,6 +4,7 @@ #include "putoperation.h" #include <vespa/storageapi/message/multioperation.h> #include <vespa/storageapi/message/persistence.h> +#include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/log/log.h> LOG_SETUP(".distributor.callback.doc.multioperation"); @@ -14,6 +15,7 @@ namespace storage::distributor { MultiOperationOperation::MultiOperationOperation( DistributorComponent& manager, + DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::MultiOperationCommand> & msg, PersistenceOperationMetricSet& metric) : Operation(), @@ -22,6 +24,7 @@ MultiOperationOperation::MultiOperationOperation( _tracker(_trackerInstance), _msg(msg), _manager(manager), + _bucketSpace(bucketSpace), _minUseBits(manager.getDistributor().getConfig().getMinimalBucketSplit()) { } @@ -36,14 +39,14 @@ MultiOperationOperation::sendToBucket( std::vector<uint16_t> targetNodes; std::vector<MessageTracker::ToSend> createBucketBatch; - if (PutOperation::checkCreateBucket(_manager.getDistribution(), + if (PutOperation::checkCreateBucket(_bucketSpace.getDistribution(), _manager.getClusterState(), e, targetNodes, createBucketBatch, *moCommand)) { - _manager.getBucketDatabase().update(e); + _bucketSpace.getBucketDatabase().update(e); } if (createBucketBatch.size()) { @@ -143,18 +146,18 @@ MultiOperationOperation::onStart(DistributorMessageSender& sender) { if (operationIt->valid()) { document::DocumentId docId = operationIt->getDocumentId(); - document::BucketId bucketId( - _manager.getBucketIdFactory().getBucketId(docId)); + document::Bucket bucket(_msg->getBucket().getBucketSpace(), + _manager.getBucketIdFactory().getBucketId(docId)); - LOG(debug, "Operation with documentid %s mapped to bucketid %s", docId.toString().c_str(), bucketId.toString().c_str()); + LOG(debug, "Operation with documentid %s mapped to bucket %s", docId.toString().c_str(), bucket.toString().c_str()); // OK, we have a bucket ID, must now know which buckets this belongs // to std::vector<BucketDatabase::Entry> entries; - _manager.getBucketDatabase().getParents(bucketId, entries); + _bucketSpace.getBucketDatabase().getParents(bucket.getBucketId(), entries); if (entries.empty()) { - entries.push_back(_manager.createAppropriateBucket(bucketId)); + entries.push_back(_manager.createAppropriateBucket(bucket)); } for (uint32_t i = 0; i < entries.size(); ++i) { diff --git a/storage/src/vespa/storage/distributor/operations/external/multioperationoperation.h b/storage/src/vespa/storage/distributor/operations/external/multioperationoperation.h index a967c7f076f..f63fbbc5458 100644 --- a/storage/src/vespa/storage/distributor/operations/external/multioperationoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/multioperationoperation.h @@ -17,10 +17,13 @@ namespace api { namespace distributor { +class DistributorBucketSpace; + class MultiOperationOperation : public Operation { public: MultiOperationOperation(DistributorComponent& manager, + DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::MultiOperationCommand> & msg, PersistenceOperationMetricSet& metric); ~MultiOperationOperation(); @@ -36,6 +39,7 @@ private: PersistenceMessageTracker& _tracker; std::shared_ptr<api::MultiOperationCommand> _msg; DistributorComponent& _manager; + DistributorBucketSpace &_bucketSpace; uint32_t _minUseBits; uint32_t getMinimumUsedBits(const vdslib::DocumentList& opList) const; diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp index 8467f9684bf..7ef03cb696a 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp @@ -12,6 +12,7 @@ #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/vdslib/distribution/idealnodecalculatorimpl.h> +#include <vespa/storage/distributor/distributor_bucket_space.h> LOG_SETUP(".distributor.callback.doc.put"); @@ -21,6 +22,7 @@ using namespace storage; using document::BucketSpace; PutOperation::PutOperation(DistributorComponent& manager, + DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::PutCommand> & msg, PersistenceOperationMetricSet& metric, SequencingHandle sequencingHandle) @@ -31,7 +33,8 @@ PutOperation::PutOperation(DistributorComponent& manager, msg->getTimestamp()), _tracker(_trackerInstance), _msg(msg), - _manager(manager) + _manager(manager), + _bucketSpace(bucketSpace) { }; @@ -190,11 +193,11 @@ PutOperation::insertDatabaseEntryAndScheduleCreateBucket( assert(!multipleBuckets); (void) multipleBuckets; BucketDatabase::Entry entry( - _manager.getBucketDatabase().get(lastBucket)); + _bucketSpace.getBucketDatabase().get(lastBucket)); std::vector<uint16_t> idealState( - _manager.getDistribution().getIdealStorageNodes( + _bucketSpace.getDistribution().getIdealStorageNodes( _manager.getClusterState(), lastBucket, "ui")); - active = ActiveCopy::calculate(idealState, _manager.getDistribution(), + active = ActiveCopy::calculate(idealState, _bucketSpace.getDistribution(), entry); LOG(debug, "Active copies for bucket %s: %s", entry.getBucketId().toString().c_str(), active.toString().c_str()); @@ -203,7 +206,7 @@ PutOperation::insertDatabaseEntryAndScheduleCreateBucket( copy.setActive(true); entry->updateNode(copy); } - _manager.getBucketDatabase().update(entry); + _bucketSpace.getBucketDatabase().update(entry); } for (uint32_t i=0, n=copies.size(); i<n; ++i) { if (!copies[i].isNewCopy()) continue; @@ -274,13 +277,13 @@ PutOperation::onStart(DistributorMessageSender& sender) std::vector<document::BucketId> bucketsToCheckForSplit; lib::IdealNodeCalculatorImpl idealNodeCalculator; - idealNodeCalculator.setDistribution(_manager.getDistribution()); + idealNodeCalculator.setDistribution(_bucketSpace.getDistribution()); idealNodeCalculator.setClusterState(_manager.getClusterState()); OperationTargetResolverImpl targetResolver( - _manager.getBucketDatabase(), + _bucketSpace.getBucketDatabase(), idealNodeCalculator, _manager.getDistributor().getConfig().getMinimalBucketSplit(), - _manager.getDistribution().getRedundancy()); + _bucketSpace.getDistribution().getRedundancy()); OperationTargetList targets(targetResolver.getTargets( OperationTargetResolver::PUT, bid)); @@ -299,7 +302,7 @@ PutOperation::onStart(DistributorMessageSender& sender) // Mark any entries we're not feeding to as not trusted. std::vector<BucketDatabase::Entry> entries; - _manager.getBucketDatabase().getParents(bid, entries); + _bucketSpace.getBucketDatabase().getParents(bid, entries); std::vector<PersistenceMessageTracker::ToSend> createBucketBatch; if (targets.hasAnyNewCopies()) { diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h index 191a0116031..8beffe8b2c3 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h @@ -20,10 +20,13 @@ namespace api { } namespace distributor { +class DistributorBucketSpace; + class PutOperation : public SequencedOperation { public: PutOperation(DistributorComponent& manager, + DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::PutCommand> & msg, PersistenceOperationMetricSet& metric, SequencingHandle sequencingHandle = SequencingHandle()); @@ -72,6 +75,7 @@ private: std::shared_ptr<api::PutCommand> _msg; DistributorComponent& _manager; + DistributorBucketSpace &_bucketSpace; }; } // distributor diff --git a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp index fdf7cf4860f..2584244023b 100644 --- a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp @@ -6,6 +6,7 @@ #include <vespa/document/fieldvalue/document.h> #include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/select/parser.h> +#include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/log/log.h> LOG_SETUP(".distributor.callback.doc.removelocation"); @@ -17,6 +18,7 @@ using document::BucketSpace; RemoveLocationOperation::RemoveLocationOperation( DistributorComponent& manager, + DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::RemoveLocationCommand> & msg, PersistenceOperationMetricSet& metric) : Operation(), @@ -26,7 +28,8 @@ RemoveLocationOperation::RemoveLocationOperation( 0), _tracker(_trackerInstance), _msg(msg), - _manager(manager) + _manager(manager), + _bucketSpace(bucketSpace) {} RemoveLocationOperation::~RemoveLocationOperation() {} @@ -68,7 +71,7 @@ RemoveLocationOperation::onStart(DistributorMessageSender& sender) } std::vector<BucketDatabase::Entry> entries; - _manager.getBucketDatabase().getAll(bid, entries); + _bucketSpace.getBucketDatabase().getAll(bid, entries); bool sent = false; for (uint32_t j = 0; j < entries.size(); ++j) { diff --git a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h index 5feff9ba642..64aeb19bae9 100644 --- a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h @@ -10,10 +10,13 @@ namespace api { class RemoveLocationCommand; } namespace distributor { +class DistributorBucketSpace; + class RemoveLocationOperation : public Operation { public: RemoveLocationOperation(DistributorComponent& manager, + DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::RemoveLocationCommand> & msg, PersistenceOperationMetricSet& metric); ~RemoveLocationOperation(); @@ -34,6 +37,7 @@ private: std::shared_ptr<api::RemoveLocationCommand> _msg; DistributorComponent& _manager; + DistributorBucketSpace &_bucketSpace; }; } diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp index d2af1f6f9c5..4b9a7b3f173 100644 --- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "removeoperation.h" #include <vespa/storageapi/message/persistence.h> +#include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/log/log.h> LOG_SETUP(".distributor.operation.external.remove"); @@ -11,6 +12,7 @@ using namespace storage; using document::BucketSpace; RemoveOperation::RemoveOperation(DistributorComponent& manager, + DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::RemoveCommand> & msg, PersistenceOperationMetricSet& metric, SequencingHandle sequencingHandle) @@ -20,7 +22,8 @@ RemoveOperation::RemoveOperation(DistributorComponent& manager, manager, msg->getTimestamp()), _tracker(_trackerInstance), _msg(msg), - _manager(manager) + _manager(manager), + _bucketSpace(bucketSpace) { } @@ -36,7 +39,7 @@ RemoveOperation::onStart(DistributorMessageSender& sender) _msg->getDocumentId())); std::vector<BucketDatabase::Entry> entries; - _manager.getBucketDatabase().getParents(bucketId, entries); + _bucketSpace.getBucketDatabase().getParents(bucketId, entries); bool sent = false; diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h index f48193ee2bf..7794be73ac8 100644 --- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h @@ -10,10 +10,13 @@ namespace api { class RemoveCommand; } namespace distributor { +class DistributorBucketSpace; + class RemoveOperation : public SequencedOperation { public: RemoveOperation(DistributorComponent& manager, + DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::RemoveCommand> & msg, PersistenceOperationMetricSet& metric, SequencingHandle sequencingHandle = SequencingHandle()); @@ -33,6 +36,7 @@ private: std::shared_ptr<api::RemoveCommand> _msg; DistributorComponent& _manager; + DistributorBucketSpace &_bucketSpace; }; } diff --git a/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp index cb395d42c9a..4e2f8a3169a 100644 --- a/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp @@ -3,6 +3,7 @@ #include <vespa/storage/distributor/distributorcomponent.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/stat.h> +#include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/log/log.h> LOG_SETUP(".distributor.callback.statbucket"); @@ -12,9 +13,11 @@ namespace distributor { StatBucketOperation::StatBucketOperation( DistributorComponent& manager, + DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::StatBucketCommand> & cmd) : Operation(), _manager(manager), + _bucketSpace(bucketSpace), _command(cmd) { } @@ -35,7 +38,7 @@ StatBucketOperation::onStart(DistributorMessageSender& sender) std::vector<uint16_t> nodes; BucketDatabase::Entry entry( - _manager.getBucketDatabase().get(_command->getBucketId())); + _bucketSpace.getBucketDatabase().get(_command->getBucketId())); if (entry.valid()) { nodes = entry->getNodes(); diff --git a/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.h b/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.h index 79fb2b4e642..af448c2dd55 100644 --- a/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.h @@ -16,11 +16,13 @@ namespace api { class StatBucketCommand; } namespace distributor { class DistributorComponent; +class DistributorBucketSpace; class StatBucketOperation : public Operation { public: StatBucketOperation(DistributorComponent& manager, + DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::StatBucketCommand> & cmd); ~StatBucketOperation(); @@ -31,6 +33,7 @@ public: void onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) override; private: DistributorComponent& _manager; + DistributorBucketSpace &_bucketSpace; std::shared_ptr<api::StatBucketCommand> _command; diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp index 838cfd5a55f..79ffee7430c 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp @@ -10,6 +10,7 @@ #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/batch.h> #include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/log/log.h> LOG_SETUP(".distributor.callback.twophaseupdate"); @@ -22,6 +23,7 @@ namespace distributor { TwoPhaseUpdateOperation::TwoPhaseUpdateOperation( DistributorComponent& manager, + DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::UpdateCommand>& msg, DistributorMetricSet& metrics, SequencingHandle sequencingHandle) @@ -32,6 +34,7 @@ TwoPhaseUpdateOperation::TwoPhaseUpdateOperation( _updateCmd(msg), _updateReply(), _manager(manager), + _bucketSpace(bucketSpace), _sendState(SendState::NONE_SENT), _mode(Mode::FAST_PATH), _replySent(false) @@ -148,7 +151,7 @@ TwoPhaseUpdateOperation::isFastPathPossible() const { // Fast path iff bucket exists AND is consistent (split and copies). std::vector<BucketDatabase::Entry> entries; - _manager.getBucketDatabase().getParents(_updateDocBucketId, entries); + _bucketSpace.getBucketDatabase().getParents(_updateDocBucketId, entries); if (entries.size() != 1) { return false; @@ -161,7 +164,7 @@ TwoPhaseUpdateOperation::startFastPathUpdate(DistributorMessageSender& sender) { _mode = Mode::FAST_PATH; std::shared_ptr<UpdateOperation> updateOperation( - new UpdateOperation(_manager, _updateCmd, _updateMetric)); + new UpdateOperation(_manager, _bucketSpace, _updateCmd, _updateMetric)); IntermediateMessageSender intermediate( _sentMessageMap, updateOperation, sender); @@ -189,7 +192,7 @@ TwoPhaseUpdateOperation::startSafePathUpdate(DistributorMessageSender& sender) "[all]")); copyMessageSettings(*_updateCmd, *get); std::shared_ptr<GetOperation> getOperation( - std::make_shared<GetOperation>(_manager, get, _getMetric)); + std::make_shared<GetOperation>(_manager, _bucketSpace, get, _getMetric)); IntermediateMessageSender intermediate( _sentMessageMap, getOperation, sender); @@ -257,7 +260,7 @@ TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument( new api::PutCommand(bucket, doc, putTimestamp)); copyMessageSettings(*_updateCmd, *put); std::shared_ptr<PutOperation> putOperation( - new PutOperation(_manager, put, _putMetric)); + new PutOperation(_manager, _bucketSpace, put, _putMetric)); IntermediateMessageSender intermediate( _sentMessageMap, putOperation, sender); diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h index 820dce051eb..e3fb6c93a3a 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h @@ -21,6 +21,8 @@ class CreateBucketReply; namespace distributor { +class DistributorBucketSpace; + /* * General functional outline: * @@ -49,6 +51,7 @@ class TwoPhaseUpdateOperation : public SequencedOperation { public: TwoPhaseUpdateOperation(DistributorComponent& manager, + DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::UpdateCommand> & msg, DistributorMetricSet& metrics, SequencingHandle sequencingHandle = SequencingHandle()); @@ -124,6 +127,7 @@ private: std::shared_ptr<api::UpdateCommand> _updateCmd; std::shared_ptr<api::StorageReply> _updateReply; DistributorComponent& _manager; + DistributorBucketSpace &_bucketSpace; SentMessageMap _sentMessageMap; SendState _sendState; Mode _mode; diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp index 89bff0d1382..d622c42b321 100644 --- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp @@ -5,6 +5,7 @@ #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/document/fieldvalue/document.h> +#include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/log/log.h> LOG_SETUP(".distributor.callback.doc.update"); @@ -15,6 +16,7 @@ using namespace storage; using document::BucketSpace; UpdateOperation::UpdateOperation(DistributorComponent& manager, + DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::UpdateCommand> & msg, PersistenceOperationMetricSet& metric) : Operation(), @@ -24,7 +26,8 @@ UpdateOperation::UpdateOperation(DistributorComponent& manager, msg->getTimestamp()), _tracker(_trackerInstance), _msg(msg), - _manager(manager) + _manager(manager), + _bucketSpace(bucketSpace) { } @@ -69,7 +72,7 @@ UpdateOperation::onStart(DistributorMessageSender& sender) _msg->getDocumentId())); std::vector<BucketDatabase::Entry> entries; - _manager.getBucketDatabase().getParents(bucketId, entries); + _bucketSpace.getBucketDatabase().getParents(bucketId, entries); if (entries.empty()) { _tracker.fail(sender, diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h index 802927918ca..a23fd2ab876 100644 --- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h @@ -17,10 +17,13 @@ class CreateBucketReply; namespace distributor { +class DistributorBucketSpace; + class UpdateOperation : public Operation { public: UpdateOperation(DistributorComponent& manager, + DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::UpdateCommand> & msg, PersistenceOperationMetricSet& metric); @@ -40,6 +43,7 @@ private: std::shared_ptr<api::UpdateCommand> _msg; DistributorComponent& _manager; + DistributorBucketSpace &_bucketSpace; std::pair<document::BucketId, uint16_t> _newestTimestampLocation; bool anyStorageNodesAvailable() const; diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp index 816693f95e6..a4cebcc7c3e 100644 --- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp @@ -45,11 +45,13 @@ VisitorOperation::BucketInfo::toString() const VisitorOperation::VisitorOperation( DistributorComponent& owner, + DistributorBucketSpace &bucketSpace, const api::CreateVisitorCommand::SP& m, const Config& config, VisitorMetricSet& metrics) : Operation(), _owner(owner), + _bucketSpace(bucketSpace), _msg(m), _sentReply(false), _config(config), @@ -458,7 +460,7 @@ bool VisitorOperation::expandBucketAll() { std::vector<BucketDatabase::Entry> entries; - _owner.getBucketDatabase().getAll(_superBucket.bid, entries); + _bucketSpace.getBucketDatabase().getAll(_superBucket.bid, entries); return pickBucketsToVisit(entries); } @@ -466,7 +468,7 @@ bool VisitorOperation::expandBucketContaining() { std::vector<BucketDatabase::Entry> entries; - _owner.getBucketDatabase().getParents(_superBucket.bid, entries); + _bucketSpace.getBucketDatabase().getParents(_superBucket.bid, entries); return pickBucketsToVisit(entries); } @@ -519,7 +521,7 @@ VisitorOperation::expandBucketContained() uint32_t maxBuckets = _msg->getMaxBucketsPerVisitor(); std::unique_ptr<document::BucketId> bid = getBucketIdAndLast( - _owner.getBucketDatabase(), + _bucketSpace.getBucketDatabase(), _superBucket.bid, _lastBucket); @@ -536,7 +538,7 @@ VisitorOperation::expandBucketContained() _superBucket.subBucketsVisitOrder.push_back(*bid); _superBucket.subBuckets[*bid] = BucketInfo(); - bid = getBucketIdAndLast(_owner.getBucketDatabase(), + bid = getBucketIdAndLast(_bucketSpace.getBucketDatabase(), _superBucket.bid, *bid); } @@ -844,7 +846,7 @@ VisitorOperation::assignBucketsToNodes(NodeToBucketsMap& nodeToBucketsMap) continue; } - BucketDatabase::Entry entry(_owner.getBucketDatabase().get(subBucket)); + BucketDatabase::Entry entry(_bucketSpace.getBucketDatabase().get(subBucket)); if (!bucketIsValidAndConsistent(entry)) { return false; } diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h index 006e2916335..f35a9dcb3ec 100644 --- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h @@ -18,6 +18,7 @@ class VisitorMetricSet; namespace distributor { class DistributorComponent; +class DistributorBucketSpace; class VisitorOperation : public Operation { @@ -33,6 +34,7 @@ public: }; VisitorOperation(DistributorComponent& manager, + DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::CreateVisitorCommand> & msg, const Config& config, VisitorMetricSet& metrics); @@ -147,6 +149,7 @@ private: std::unique_ptr<document::OrderingSpecification> _ordering; DistributorComponent& _owner; + DistributorBucketSpace &_bucketSpace; SentMessagesMap _sentMessages; api::CreateVisitorCommand::SP _msg; |