diff options
author | Tor Egge <Tor.Egge@oath.com> | 2017-11-03 14:23:07 +0000 |
---|---|---|
committer | Tor Egge <Tor.Egge@oath.com> | 2017-11-03 14:23:07 +0000 |
commit | 27ec059fa2656d1a5009bbbf3abff20ae0c699b3 (patch) | |
tree | 7bc3b5a24f0b3eea171949403a2f01b6add1527e | |
parent | af114174557efbecdac25a1164cc7fdb5f7e21ab (diff) |
Pass distributor bucket space to get operation, put operation, two phase
update operation and update operation.
15 files changed, 62 insertions, 19 deletions
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp index 613ce7aad4e..5deb31f8579 100644 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ b/storage/src/tests/distributor/distributortestutil.cpp @@ -343,6 +343,12 @@ DistributorTestUtil::getConfig() { return const_cast<DistributorConfiguration&>(_distributor->getConfig()); } +DistributorBucketSpace & +DistributorTestUtil::getDistributorBucketSpace() +{ + return _distributor->getDefaultBucketSpace(); +} + BucketDatabase& DistributorTestUtil::getBucketDatabase() { return _distributor->getDefaultBucketSpace().getBucketDatabase(); diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h index 4de84e47bfe..4f09c11ac03 100644 --- a/storage/src/tests/distributor/distributortestutil.h +++ b/storage/src/tests/distributor/distributortestutil.h @@ -19,6 +19,7 @@ namespace distributor { class BucketDBUpdater; class Distributor; +class DistributorBucketSpace; class IdealStateManager; class ExternalOperationHandler; class Operation; @@ -121,6 +122,7 @@ public: } // TODO explicit notion of bucket spaces for tests + DistributorBucketSpace &getDistributorBucketSpace(); BucketDatabase& getBucketDatabase(); const BucketDatabase& getBucketDatabase() const; diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp index ee1ea70163f..8bb8e24c17a 100644 --- a/storage/src/tests/distributor/getoperationtest.cpp +++ b/storage/src/tests/distributor/getoperationtest.cpp @@ -75,6 +75,7 @@ public: new api::GetCommand(makeDocumentBucket(document::BucketId(0)), docId, "[all]")); op.reset(new GetOperation(getExternalOperationHandler(), + getDistributorBucketSpace(), msg, getDistributor().getMetrics(). gets[msg->getLoadType()])); diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp index 755802127fe..7f54e163006 100644 --- a/storage/src/tests/distributor/putoperationtest.cpp +++ b/storage/src/tests/distributor/putoperationtest.cpp @@ -131,6 +131,7 @@ public: void sendPut(std::shared_ptr<api::PutCommand> msg) { op.reset(new PutOperation(getExternalOperationHandler(), + getDistributorBucketSpace(), msg, getDistributor().getMetrics(). puts[msg->getLoadType()])); diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp index 9f6a7010179..a7418629f81 100644 --- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp +++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp @@ -337,7 +337,7 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState, ExternalOperationHandler& handler = getExternalOperationHandler(); return std::make_shared<TwoPhaseUpdateOperation>( - handler, msg, getDistributor().getMetrics()); + handler, getDistributorBucketSpace(), msg, getDistributor().getMetrics()); } diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp index 62590eabdad..c15a2b4057c 100644 --- a/storage/src/tests/distributor/updateoperationtest.cpp +++ b/storage/src/tests/distributor/updateoperationtest.cpp @@ -93,6 +93,7 @@ UpdateOperation_Test::sendUpdate(const std::string& bucketState) ExternalOperationHandler& handler = getExternalOperationHandler(); return std::shared_ptr<UpdateOperation>( new UpdateOperation(handler, + getDistributorBucketSpace(), msg, getDistributor().getMetrics().updates[msg->getLoadType()])); } diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp index 0d9135a375f..d0c14630451 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp @@ -138,7 +138,9 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Put) auto handle = _mutationSequencer.try_acquire(cmd->getDocumentId()); if (allowMutation(handle)) { - _op = std::make_shared<PutOperation>(*this, cmd, getMetrics().puts[cmd->getLoadType()], std::move(handle)); + _op = std::make_shared<PutOperation>(*this, + _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()), + cmd, getMetrics().puts[cmd->getLoadType()], std::move(handle)); } else { sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId())); } @@ -161,7 +163,9 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Update) } auto handle = _mutationSequencer.try_acquire(cmd->getDocumentId()); if (allowMutation(handle)) { - _op = std::make_shared<TwoPhaseUpdateOperation>(*this, cmd, getMetrics(), std::move(handle)); + _op = std::make_shared<TwoPhaseUpdateOperation>(*this, + _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()), + cmd, getMetrics(), std::move(handle)); } else { sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId())); } @@ -235,6 +239,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Get) _op = Operation::SP(new GetOperation( *this, + _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()), cmd, getMetrics().gets[cmd->getLoadType()])); return true; diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp index b71e2728f5b..246020c191c 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp @@ -5,6 +5,7 @@ #include <vespa/storageapi/message/persistence.h> #include <vespa/vdslib/state/nodestate.h> #include <vespa/document/fieldvalue/document.h> +#include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/log/log.h> LOG_SETUP(".distributor.callback.doc.get"); @@ -46,10 +47,12 @@ GetOperation::GroupId::operator==(const GroupId& other) const } GetOperation::GetOperation(DistributorComponent& manager, + DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::GetCommand> & msg, PersistenceOperationMetricSet& metric) : Operation(), _manager(manager), + _bucketSpace(bucketSpace), _msg(msg), _returnCode(api::ReturnCode::OK), _doc((document::Document*)NULL), @@ -239,7 +242,7 @@ GetOperation::assignTargetNodeGroups() document::BucketId bid = bucketIdFactory.getBucketId(_msg->getDocumentId()); std::vector<BucketDatabase::Entry> entries; - _manager.getBucketDatabase().getParents(bid, entries); + _bucketSpace.getBucketDatabase().getParents(bid, entries); for (uint32_t j = 0; j < entries.size(); ++j) { const BucketDatabase::Entry& e = entries[j]; diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h index 12748c38b90..03279a87152 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h @@ -18,11 +18,13 @@ class PersistenceOperationMetricSet; namespace distributor { class DistributorComponent; +class DistributorBucketSpace; class GetOperation : public Operation { public: GetOperation(DistributorComponent& manager, + DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::GetCommand> & msg, PersistenceOperationMetricSet& metric); @@ -71,6 +73,7 @@ private: std::map<GroupId, GroupVector> _responses; DistributorComponent& _manager; + DistributorBucketSpace &_bucketSpace; std::shared_ptr<api::GetCommand> _msg; diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp index 8467f9684bf..7ef03cb696a 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp @@ -12,6 +12,7 @@ #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/vdslib/distribution/idealnodecalculatorimpl.h> +#include <vespa/storage/distributor/distributor_bucket_space.h> LOG_SETUP(".distributor.callback.doc.put"); @@ -21,6 +22,7 @@ using namespace storage; using document::BucketSpace; PutOperation::PutOperation(DistributorComponent& manager, + DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::PutCommand> & msg, PersistenceOperationMetricSet& metric, SequencingHandle sequencingHandle) @@ -31,7 +33,8 @@ PutOperation::PutOperation(DistributorComponent& manager, msg->getTimestamp()), _tracker(_trackerInstance), _msg(msg), - _manager(manager) + _manager(manager), + _bucketSpace(bucketSpace) { }; @@ -190,11 +193,11 @@ PutOperation::insertDatabaseEntryAndScheduleCreateBucket( assert(!multipleBuckets); (void) multipleBuckets; BucketDatabase::Entry entry( - _manager.getBucketDatabase().get(lastBucket)); + _bucketSpace.getBucketDatabase().get(lastBucket)); std::vector<uint16_t> idealState( - _manager.getDistribution().getIdealStorageNodes( + _bucketSpace.getDistribution().getIdealStorageNodes( _manager.getClusterState(), lastBucket, "ui")); - active = ActiveCopy::calculate(idealState, _manager.getDistribution(), + active = ActiveCopy::calculate(idealState, _bucketSpace.getDistribution(), entry); LOG(debug, "Active copies for bucket %s: %s", entry.getBucketId().toString().c_str(), active.toString().c_str()); @@ -203,7 +206,7 @@ PutOperation::insertDatabaseEntryAndScheduleCreateBucket( copy.setActive(true); entry->updateNode(copy); } - _manager.getBucketDatabase().update(entry); + _bucketSpace.getBucketDatabase().update(entry); } for (uint32_t i=0, n=copies.size(); i<n; ++i) { if (!copies[i].isNewCopy()) continue; @@ -274,13 +277,13 @@ PutOperation::onStart(DistributorMessageSender& sender) std::vector<document::BucketId> bucketsToCheckForSplit; lib::IdealNodeCalculatorImpl idealNodeCalculator; - idealNodeCalculator.setDistribution(_manager.getDistribution()); + idealNodeCalculator.setDistribution(_bucketSpace.getDistribution()); idealNodeCalculator.setClusterState(_manager.getClusterState()); OperationTargetResolverImpl targetResolver( - _manager.getBucketDatabase(), + _bucketSpace.getBucketDatabase(), idealNodeCalculator, _manager.getDistributor().getConfig().getMinimalBucketSplit(), - _manager.getDistribution().getRedundancy()); + _bucketSpace.getDistribution().getRedundancy()); OperationTargetList targets(targetResolver.getTargets( OperationTargetResolver::PUT, bid)); @@ -299,7 +302,7 @@ PutOperation::onStart(DistributorMessageSender& sender) // Mark any entries we're not feeding to as not trusted. std::vector<BucketDatabase::Entry> entries; - _manager.getBucketDatabase().getParents(bid, entries); + _bucketSpace.getBucketDatabase().getParents(bid, entries); std::vector<PersistenceMessageTracker::ToSend> createBucketBatch; if (targets.hasAnyNewCopies()) { diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h index 191a0116031..8beffe8b2c3 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h @@ -20,10 +20,13 @@ namespace api { } namespace distributor { +class DistributorBucketSpace; + class PutOperation : public SequencedOperation { public: PutOperation(DistributorComponent& manager, + DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::PutCommand> & msg, PersistenceOperationMetricSet& metric, SequencingHandle sequencingHandle = SequencingHandle()); @@ -72,6 +75,7 @@ private: std::shared_ptr<api::PutCommand> _msg; DistributorComponent& _manager; + DistributorBucketSpace &_bucketSpace; }; } // distributor diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp index 838cfd5a55f..79ffee7430c 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp @@ -10,6 +10,7 @@ #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/batch.h> #include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/log/log.h> LOG_SETUP(".distributor.callback.twophaseupdate"); @@ -22,6 +23,7 @@ namespace distributor { TwoPhaseUpdateOperation::TwoPhaseUpdateOperation( DistributorComponent& manager, + DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::UpdateCommand>& msg, DistributorMetricSet& metrics, SequencingHandle sequencingHandle) @@ -32,6 +34,7 @@ TwoPhaseUpdateOperation::TwoPhaseUpdateOperation( _updateCmd(msg), _updateReply(), _manager(manager), + _bucketSpace(bucketSpace), _sendState(SendState::NONE_SENT), _mode(Mode::FAST_PATH), _replySent(false) @@ -148,7 +151,7 @@ TwoPhaseUpdateOperation::isFastPathPossible() const { // Fast path iff bucket exists AND is consistent (split and copies). std::vector<BucketDatabase::Entry> entries; - _manager.getBucketDatabase().getParents(_updateDocBucketId, entries); + _bucketSpace.getBucketDatabase().getParents(_updateDocBucketId, entries); if (entries.size() != 1) { return false; @@ -161,7 +164,7 @@ TwoPhaseUpdateOperation::startFastPathUpdate(DistributorMessageSender& sender) { _mode = Mode::FAST_PATH; std::shared_ptr<UpdateOperation> updateOperation( - new UpdateOperation(_manager, _updateCmd, _updateMetric)); + new UpdateOperation(_manager, _bucketSpace, _updateCmd, _updateMetric)); IntermediateMessageSender intermediate( _sentMessageMap, updateOperation, sender); @@ -189,7 +192,7 @@ TwoPhaseUpdateOperation::startSafePathUpdate(DistributorMessageSender& sender) "[all]")); copyMessageSettings(*_updateCmd, *get); std::shared_ptr<GetOperation> getOperation( - std::make_shared<GetOperation>(_manager, get, _getMetric)); + std::make_shared<GetOperation>(_manager, _bucketSpace, get, _getMetric)); IntermediateMessageSender intermediate( _sentMessageMap, getOperation, sender); @@ -257,7 +260,7 @@ TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument( new api::PutCommand(bucket, doc, putTimestamp)); copyMessageSettings(*_updateCmd, *put); std::shared_ptr<PutOperation> putOperation( - new PutOperation(_manager, put, _putMetric)); + new PutOperation(_manager, _bucketSpace, put, _putMetric)); IntermediateMessageSender intermediate( _sentMessageMap, putOperation, sender); diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h index 820dce051eb..e3fb6c93a3a 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h @@ -21,6 +21,8 @@ class CreateBucketReply; namespace distributor { +class DistributorBucketSpace; + /* * General functional outline: * @@ -49,6 +51,7 @@ class TwoPhaseUpdateOperation : public SequencedOperation { public: TwoPhaseUpdateOperation(DistributorComponent& manager, + DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::UpdateCommand> & msg, DistributorMetricSet& metrics, SequencingHandle sequencingHandle = SequencingHandle()); @@ -124,6 +127,7 @@ private: std::shared_ptr<api::UpdateCommand> _updateCmd; std::shared_ptr<api::StorageReply> _updateReply; DistributorComponent& _manager; + DistributorBucketSpace &_bucketSpace; SentMessageMap _sentMessageMap; SendState _sendState; Mode _mode; diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp index 89bff0d1382..d622c42b321 100644 --- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp @@ -5,6 +5,7 @@ #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/document/fieldvalue/document.h> +#include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/log/log.h> LOG_SETUP(".distributor.callback.doc.update"); @@ -15,6 +16,7 @@ using namespace storage; using document::BucketSpace; UpdateOperation::UpdateOperation(DistributorComponent& manager, + DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::UpdateCommand> & msg, PersistenceOperationMetricSet& metric) : Operation(), @@ -24,7 +26,8 @@ UpdateOperation::UpdateOperation(DistributorComponent& manager, msg->getTimestamp()), _tracker(_trackerInstance), _msg(msg), - _manager(manager) + _manager(manager), + _bucketSpace(bucketSpace) { } @@ -69,7 +72,7 @@ UpdateOperation::onStart(DistributorMessageSender& sender) _msg->getDocumentId())); std::vector<BucketDatabase::Entry> entries; - _manager.getBucketDatabase().getParents(bucketId, entries); + _bucketSpace.getBucketDatabase().getParents(bucketId, entries); if (entries.empty()) { _tracker.fail(sender, diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h index 802927918ca..a23fd2ab876 100644 --- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h @@ -17,10 +17,13 @@ class CreateBucketReply; namespace distributor { +class DistributorBucketSpace; + class UpdateOperation : public Operation { public: UpdateOperation(DistributorComponent& manager, + DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::UpdateCommand> & msg, PersistenceOperationMetricSet& metric); @@ -40,6 +43,7 @@ private: std::shared_ptr<api::UpdateCommand> _msg; DistributorComponent& _manager; + DistributorBucketSpace &_bucketSpace; std::pair<document::BucketId, uint16_t> _newestTimestampLocation; bool anyStorageNodesAvailable() const; |