diff options
author | Tor Egge <Tor.Egge@oath.com> | 2017-11-13 11:14:29 +0000 |
---|---|---|
committer | Tor Egge <Tor.Egge@oath.com> | 2017-11-13 13:14:46 +0000 |
commit | 61a4023b39195912d642ad5256f30730c5567feb (patch) | |
tree | 555f02ae217ed1411944d48966c504c861169419 /storage | |
parent | 372b70b09f7abba1aafe4931b44f01d7f9415d4a (diff) |
Partially change BucketDBUpdater to use document::Bucket instead of
document::BucketId.
Diffstat (limited to 'storage')
5 files changed, 38 insertions, 39 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index 03a51cf6180..9f931c3f5c0 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -880,7 +880,7 @@ BucketDBUpdaterTest::testInitializingWhileRecheck() CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.commands.size()); CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size()); - getBucketDBUpdater().recheckBucketInfo(1, document::BucketId(16, 3)); + getBucketDBUpdater().recheckBucketInfo(1, makeDocumentBucket(document::BucketId(16, 3))); for (int i=0; i<2; i++) { fakeBucketReply(systemState, @@ -1010,7 +1010,7 @@ BucketDBUpdaterTest::testRecheckNodeWithFailure() _sender.clear(); - getBucketDBUpdater().recheckBucketInfo(1, document::BucketId(16, 3)); + getBucketDBUpdater().recheckBucketInfo(1, makeDocumentBucket(document::BucketId(16, 3))); CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); @@ -1060,7 +1060,7 @@ BucketDBUpdaterTest::testRecheckNode() _sender.clear(); - getBucketDBUpdater().recheckBucketInfo(1, document::BucketId(16, 3)); + getBucketDBUpdater().recheckBucketInfo(1, makeDocumentBucket(document::BucketId(16, 3))); CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); @@ -1954,7 +1954,7 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInCurrentState() } _sender.clear(); - getBucketDBUpdater().recheckBucketInfo(0, bucket); + getBucketDBUpdater().recheckBucketInfo(0, makeDocumentBucket(bucket)); CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); std::shared_ptr<api::RequestBucketInfoCommand> rbi( @@ -1986,7 +1986,7 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInPendingState() } _sender.clear(); - getBucketDBUpdater().recheckBucketInfo(0, bucket); + getBucketDBUpdater().recheckBucketInfo(0, makeDocumentBucket(bucket)); CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); std::shared_ptr<api::RequestBucketInfoCommand> rbi( @@ -1999,7 +1999,7 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInPendingState() CPPUNIT_ASSERT(getBucketDBUpdater().getDistributorComponent() .ownsBucketInCurrentState(makeDocumentBucket(bucket))); CPPUNIT_ASSERT(!getBucketDBUpdater() - .checkOwnershipInPendingState(bucket).isOwned()); + .checkOwnershipInPendingState(makeDocumentBucket(bucket)).isOwned()); sendFakeReplyForSingleBucketRequest(*rbi); diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index cbc78157911..1640af0f871 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -556,7 +556,7 @@ Distributor_Test::testNoDbResurrectionForBucketNotOwnedInPendingState() document::BucketId nonOwnedBucket(16, 3); CPPUNIT_ASSERT(!getBucketDBUpdater() - .checkOwnershipInPendingState(nonOwnedBucket).isOwned()); + .checkOwnershipInPendingState(makeDocumentBucket(nonOwnedBucket)).isOwned()); CPPUNIT_ASSERT(!getBucketDBUpdater().getDistributorComponent() .checkOwnershipInPendingAndCurrentState(makeDocumentBucket(nonOwnedBucket)) .isOwned()); diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index b68e53c7136..faf508cf4e6 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -2,6 +2,7 @@ #include "bucketdbupdater.h" #include "distributor.h" +#include "distributor_bucket_space_repo.h" #include "simpleclusterinformation.h" #include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/storageapi/message/persistence.h> @@ -59,13 +60,11 @@ BucketDBUpdater::hasPendingClusterState() const } BucketOwnership -BucketDBUpdater::checkOwnershipInPendingState(const document::BucketId& b) const +BucketDBUpdater::checkOwnershipInPendingState(const document::Bucket& b) const { if (hasPendingClusterState()) { const lib::ClusterState& state(_pendingClusterState->getNewClusterState()); - const lib::Distribution& distribution(_pendingClusterState->getDistribution()); - document::Bucket bucket(BucketSpace::placeHolder(), b); - if (!_bucketSpaceComponent.ownsBucketInState(distribution, state, bucket)) { + if (!_bucketSpaceComponent.ownsBucketInState(state, b)) { return BucketOwnership::createNotOwnedInState(state); } } @@ -75,7 +74,7 @@ BucketDBUpdater::checkOwnershipInPendingState(const document::BucketId& b) const void BucketDBUpdater::sendRequestBucketInfo( uint16_t node, - const document::BucketId& bucket, + const document::Bucket& bucket, const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard) { if (!_bucketSpaceComponent.storageNodeIsUp(node)) { @@ -83,10 +82,10 @@ BucketDBUpdater::sendRequestBucketInfo( } std::vector<document::BucketId> buckets; - buckets.push_back(bucket); + buckets.push_back(bucket.getBucketId()); std::shared_ptr<api::RequestBucketInfoCommand> msg( - new api::RequestBucketInfoCommand(BucketSpace::placeHolder(), buckets)); + new api::RequestBucketInfoCommand(bucket.getBucketSpace(), buckets)); LOG(debug, "Sending request bucket info command %lu for " @@ -106,9 +105,9 @@ BucketDBUpdater::sendRequestBucketInfo( void BucketDBUpdater::recheckBucketInfo(uint32_t nodeIdx, - const document::BucketId& bid) + const document::Bucket& bucket) { - sendRequestBucketInfo(nodeIdx, bid, std::shared_ptr<MergeReplyGuard>()); + sendRequestBucketInfo(nodeIdx, bucket, std::shared_ptr<MergeReplyGuard>()); } void @@ -248,7 +247,7 @@ BucketDBUpdater::onMergeBucketReply( // bucket again to make sure it's ok. for (uint32_t i = 0; i < reply->getNodes().size(); i++) { sendRequestBucketInfo(reply->getNodes()[i].index, - reply->getBucketId(), + reply->getBucket(), replyGuard); } @@ -258,7 +257,7 @@ BucketDBUpdater::onMergeBucketReply( void BucketDBUpdater::enqueueRecheckUntilPendingStateEnabled( uint16_t node, - const document::BucketId& bucket) + const document::Bucket& bucket) { LOG(spam, "DB updater has a pending cluster state, enqueuing recheck " @@ -305,10 +304,10 @@ BucketDBUpdater::onNotifyBucketChange( if (hasPendingClusterState()) { enqueueRecheckUntilPendingStateEnabled(cmd->getSourceIndex(), - cmd->getBucketId()); + cmd->getBucket()); } else { sendRequestBucketInfo(cmd->getSourceIndex(), - cmd->getBucketId(), + cmd->getBucket(), std::shared_ptr<MergeReplyGuard>()); } @@ -357,7 +356,7 @@ BucketDBUpdater::handleSingleBucketInfoFailure( LOG(debug, "Request bucket info failed towards node %d: error was %s", req.targetNode, repl->getResult().toString().c_str()); - if (req.bucket != document::BucketId(0)) { + if (req.bucket.getBucketId() != document::BucketId(0)) { framework::MilliSecTime sendTime(_bucketSpaceComponent.getClock()); sendTime += framework::MilliSecTime(100); _delayedRequests.emplace_back(sendTime, req); @@ -451,11 +450,12 @@ BucketDBUpdater::addBucketInfoForNode( } void -BucketDBUpdater::findRelatedBucketsInDatabase(uint16_t node, const document::BucketId& bucketId, +BucketDBUpdater::findRelatedBucketsInDatabase(uint16_t node, const document::Bucket& bucket, BucketListMerger::BucketList& existing) { + auto &distributorBucketSpace(_bucketSpaceComponent.getBucketSpaceRepo().get(bucket.getBucketSpace())); std::vector<BucketDatabase::Entry> entries; - _bucketSpaceComponent.getBucketDatabase().getAll(bucketId, entries); + distributorBucketSpace.getBucketDatabase().getAll(bucket.getBucketId(), entries); for (const BucketDatabase::Entry & entry : entries) { addBucketInfoForNode(entry, node, existing); @@ -585,10 +585,10 @@ BucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos, { xos << XmlTag("storagenode") << XmlAttribute("index", entry.second.targetNode); - if (entry.second.bucket.getRawId() == 0) { + if (entry.second.bucket.getBucketId().getRawId() == 0) { xos << XmlAttribute("bucket", ALL); } else { - xos << XmlAttribute("bucket", entry.second.bucket.getId(), XmlAttribute::HEX); + xos << XmlAttribute("bucket", entry.second.bucket.getBucketId().getId(), XmlAttribute::HEX); } xos << XmlAttribute("sendtimestamp", entry.second.timestamp) << XmlEndTag(); diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h index ee07c46754f..0f85bfb2494 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h @@ -8,7 +8,7 @@ #include "pendingclusterstate.h" #include "distributor_bucket_space_component.h" #include "outdated_nodes_map.h" -#include <vespa/document/bucket/bucketid.h> +#include <vespa/document/bucket/bucket.h> #include <vespa/storageapi/messageapi/returncode.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/vdslib/state/clusterstate.h> @@ -38,8 +38,8 @@ public: ~BucketDBUpdater(); void flush(); - BucketOwnership checkOwnershipInPendingState(const document::BucketId&) const; - void recheckBucketInfo(uint32_t nodeIdx, const document::BucketId& bid); + BucketOwnership checkOwnershipInPendingState(const document::Bucket&) const; + void recheckBucketInfo(uint32_t nodeIdx, const document::Bucket& bucket); bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd) override; bool onRequestBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply> & repl) override; @@ -84,9 +84,9 @@ private: struct BucketRequest { BucketRequest() - : targetNode(0), bucket(0), timestamp(0) {}; + : targetNode(0), bucket(), timestamp(0) {}; - BucketRequest(uint16_t t, uint64_t currentTime, const document::BucketId& b, + BucketRequest(uint16_t t, uint64_t currentTime, const document::Bucket& b, const std::shared_ptr<MergeReplyGuard>& guard) : targetNode(t), bucket(b), @@ -94,7 +94,7 @@ private: _mergeReplyGuard(guard) {}; uint16_t targetNode; - document::BucketId bucket; + document::Bucket bucket; uint64_t timestamp; std::shared_ptr<MergeReplyGuard> _mergeReplyGuard; @@ -102,11 +102,11 @@ private: struct EnqueuedBucketRecheck { uint16_t node; - document::BucketId bucket; + document::Bucket bucket; EnqueuedBucketRecheck() : node(0), bucket() {} - EnqueuedBucketRecheck(uint16_t _node, const document::BucketId& _bucket) + EnqueuedBucketRecheck(uint16_t _node, const document::Bucket& _bucket) : node(_node), bucket(_bucket) {} @@ -124,7 +124,6 @@ private: bool hasPendingClusterState() const; bool pendingClusterStateAccepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl); - bool bucketOwnedAccordingToPendingState(const document::BucketId& bucketId) const; bool processSingleBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply>& repl); void handleSingleBucketInfoFailure(const std::shared_ptr<api::RequestBucketInfoReply>& repl, const BucketRequest& req); @@ -134,7 +133,7 @@ private: const BucketRequest& req); void convertBucketInfoToBucketList(const std::shared_ptr<api::RequestBucketInfoReply>& repl, uint16_t targetNode, BucketListMerger::BucketList& newList); - void sendRequestBucketInfo(uint16_t node, const document::BucketId& bucket, + void sendRequestBucketInfo(uint16_t node, const document::Bucket& bucket, const std::shared_ptr<MergeReplyGuard>& mergeReply); void addBucketInfoForNode(const BucketDatabase::Entry& e, uint16_t node, BucketListMerger::BucketList& existing) const; @@ -146,7 +145,7 @@ private: * in bucketId, or that bucketId is contained in, that have copies * on the given node. */ - void findRelatedBucketsInDatabase(uint16_t node, const document::BucketId& bucketId, + void findRelatedBucketsInDatabase(uint16_t node, const document::Bucket& bucket, BucketListMerger::BucketList& existing); /** @@ -163,7 +162,7 @@ private: void enableCurrentClusterStateInDistributor(); void addCurrentStateToClusterStateHistory(); - void enqueueRecheckUntilPendingStateEnabled(uint16_t node, const document::BucketId&); + void enqueueRecheckUntilPendingStateEnabled(uint16_t node, const document::Bucket&); void sendAllQueuedBucketRechecks(); friend class BucketDBUpdater_Test; diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 11c029c5c94..fe3a44e83ab 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -154,7 +154,7 @@ const DistributorBucketSpace& Distributor::getDefaultBucketSpace() const noexcep BucketOwnership Distributor::checkOwnershipInPendingState(const document::Bucket &b) const { - return _bucketDBUpdater.checkOwnershipInPendingState(b.getBucketId()); + return _bucketDBUpdater.checkOwnershipInPendingState(b); } void @@ -455,7 +455,7 @@ Distributor::storageDistributionChanged() void Distributor::recheckBucketInfo(uint16_t nodeIdx, const document::Bucket &bucket) { - _bucketDBUpdater.recheckBucketInfo(nodeIdx, bucket.getBucketId()); + _bucketDBUpdater.recheckBucketInfo(nodeIdx, bucket); } namespace { |