diff options
author | Tor Egge <Tor.Egge@oath.com> | 2017-10-26 12:28:44 +0000 |
---|---|---|
committer | Tor Egge <Tor.Egge@oath.com> | 2017-10-26 12:31:55 +0000 |
commit | c5518c77b93ce135fc02f28b121614074d772392 (patch) | |
tree | e00dc8387513ecd804d60e8517e1da9d766b53ef | |
parent | 4184bb24b0e5cead726f204457d34103ee6d3234 (diff) |
Use document::Bucket instead of document::BucketId in api
checking for pending messages in PendingMessageTracker.
Bucket space portion is still ignored by implementation.
13 files changed, 70 insertions, 38 deletions
diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp index 4f9d8bdf4d4..0c695f9a3d4 100644 --- a/storage/src/tests/distributor/idealstatemanagertest.cpp +++ b/storage/src/tests/distributor/idealstatemanagertest.cpp @@ -253,9 +253,9 @@ IdealStateManagerTest::testBlockCheckForAllOperationsToSpecificBucket() RemoveBucketOperation op("storage", BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7))); // Not blocked for exact node match. - CPPUNIT_ASSERT(!op.checkBlock(bid, tracker)); + CPPUNIT_ASSERT(!op.checkBlock(makeDocumentBucket(bid), tracker)); // But blocked for bucket match! - CPPUNIT_ASSERT(op.checkBlockForAllNodes(bid, tracker)); + CPPUNIT_ASSERT(op.checkBlockForAllNodes(makeDocumentBucket(bid), tracker)); } } diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp index 6aba8e1fb98..8e9c016ea54 100644 --- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp +++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp @@ -445,19 +445,19 @@ PendingMessageTrackerTest::testGetPendingMessageTypes() { TestChecker checker; - tracker.checkPendingMessages(0, bid, checker); + tracker.checkPendingMessages(0, makeDocumentBucket(bid), checker); CPPUNIT_ASSERT_EQUAL(127, (int)checker.pri); } { TestChecker checker; - tracker.checkPendingMessages(0, document::BucketId(16, 1235), checker); + tracker.checkPendingMessages(0, makeDocumentBucket(document::BucketId(16, 1235)), checker); CPPUNIT_ASSERT_EQUAL(255, (int)checker.pri); } { TestChecker checker; - tracker.checkPendingMessages(1, bid, checker); + tracker.checkPendingMessages(1, makeDocumentBucket(bid), checker); CPPUNIT_ASSERT_EQUAL(255, (int)checker.pri); } } @@ -472,7 +472,7 @@ PendingMessageTrackerTest::testHasPendingMessage() PendingMessageTracker tracker(compReg); document::BucketId bid(16, 1234); - CPPUNIT_ASSERT(!tracker.hasPendingMessage(1, bid, api::MessageType::REMOVE_ID)); + CPPUNIT_ASSERT(!tracker.hasPendingMessage(1, makeDocumentBucket(bid), api::MessageType::REMOVE_ID)); { std::shared_ptr<api::RemoveCommand> remove( @@ -484,13 +484,13 @@ PendingMessageTrackerTest::testHasPendingMessage() tracker.insert(remove); } - CPPUNIT_ASSERT(tracker.hasPendingMessage(1, bid, api::MessageType::REMOVE_ID)); - CPPUNIT_ASSERT(!tracker.hasPendingMessage(0, bid, api::MessageType::REMOVE_ID)); - CPPUNIT_ASSERT(!tracker.hasPendingMessage(2, bid, api::MessageType::REMOVE_ID)); + CPPUNIT_ASSERT(tracker.hasPendingMessage(1, makeDocumentBucket(bid), api::MessageType::REMOVE_ID)); + CPPUNIT_ASSERT(!tracker.hasPendingMessage(0, makeDocumentBucket(bid), api::MessageType::REMOVE_ID)); + CPPUNIT_ASSERT(!tracker.hasPendingMessage(2, makeDocumentBucket(bid), api::MessageType::REMOVE_ID)); CPPUNIT_ASSERT(!tracker.hasPendingMessage(1, - document::BucketId(16, 1233), + makeDocumentBucket(document::BucketId(16, 1233)), api::MessageType::REMOVE_ID)); - CPPUNIT_ASSERT(!tracker.hasPendingMessage(1, bid, api::MessageType::DELETEBUCKET_ID)); + CPPUNIT_ASSERT(!tracker.hasPendingMessage(1, makeDocumentBucket(bid), api::MessageType::DELETEBUCKET_ID)); } namespace { @@ -527,7 +527,7 @@ PendingMessageTrackerTest::testGetAllMessagesForSingleBucket() { OperationEnumerator enumerator; - tracker.checkPendingMessages(document::BucketId(16, 1234), enumerator); + tracker.checkPendingMessages(makeDocumentBucket(document::BucketId(16, 1234)), enumerator); CPPUNIT_ASSERT_EQUAL(std::string("Remove -> 0\n" "Remove -> 0\n" "Remove -> 1\n" @@ -536,7 +536,7 @@ PendingMessageTrackerTest::testGetAllMessagesForSingleBucket() } { OperationEnumerator enumerator; - tracker.checkPendingMessages(document::BucketId(16, 9876), enumerator); + tracker.checkPendingMessages(makeDocumentBucket(document::BucketId(16, 9876)), enumerator); CPPUNIT_ASSERT_EQUAL(std::string(""), enumerator.str()); } } diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 3dfe5103654..fa00a0b74b8 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -518,7 +518,7 @@ Distributor::checkBucketForSplit(const BucketDatabase::Entry& e, SplitChecker checker(priority); for (uint32_t i = 0; i < e->getNodeCount(); ++i) { _pendingMessageTracker.checkPendingMessages(e->getNodeRef(i).getNode(), - e.getBucketId(), + document::Bucket(document::BucketSpace::placeHolder(), e.getBucketId()), checker); if (checker.found) { return; diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp index b1fa0d98bb0..dafc2b71e6a 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp @@ -286,7 +286,7 @@ PutOperation::onStart(DistributorMessageSender& sender) for (size_t i = 0; i < targets.size(); ++i) { if (_manager.getDistributor().getPendingMessageTracker(). hasPendingMessage(targets[i].getNode().getIndex(), - targets[i].getBucketId(), + targets[i].getBucket(), api::MessageType::DELETEBUCKET_ID)) { _tracker.fail(sender, api::ReturnCode(api::ReturnCode::BUCKET_DELETED, diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp index 09ea3566883..2b6739623ec 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp @@ -11,6 +11,7 @@ LOG_SETUP(".distributor.operation"); using namespace storage; using namespace storage::distributor; +using document::BucketSpace; const uint32_t IdealStateOperation::MAINTENANCE_MESSAGE_TYPES[] = { @@ -184,34 +185,42 @@ public: } bool -IdealStateOperation::checkBlock(const document::BucketId& bId, +IdealStateOperation::checkBlock(const document::Bucket &bucket, const PendingMessageTracker& tracker) const { IdealStateOpChecker ichk(*this); RequestBucketInfoChecker rchk; const std::vector<uint16_t>& nodes(getNodes()); for (size_t i = 0; i < nodes.size(); ++i) { - tracker.checkPendingMessages(nodes[i], bId, ichk); + tracker.checkPendingMessages(nodes[i], bucket, ichk); if (ichk.blocked) { return true; } // Check messages sent to null-bucket (i.e. any bucket) for the node. - tracker.checkPendingMessages(nodes[i], document::BucketId(), rchk); + document::Bucket nullBucket(bucket.getBucketSpace(), document::BucketId()); + tracker.checkPendingMessages(nodes[i], nullBucket, rchk); if (rchk.blocked) { return true; } + if (bucket.getBucketSpace() != BucketSpace::placeHolder()) { + nullBucket = document::Bucket(BucketSpace::placeHolder(), document::BucketId()); + tracker.checkPendingMessages(nodes[i], nullBucket, rchk); + if (rchk.blocked) { + return true; + } + } } return false; } bool IdealStateOperation::checkBlockForAllNodes( - const document::BucketId& bid, + const document::Bucket &bucket, const PendingMessageTracker& tracker) const { IdealStateOpChecker ichk(*this); // Check messages sent to _any node_ for _this_ particular bucket. - tracker.checkPendingMessages(bid, ichk); + tracker.checkPendingMessages(bucket, ichk); if (ichk.blocked) { return true; } @@ -219,10 +228,18 @@ IdealStateOperation::checkBlockForAllNodes( // Check messages sent to null-bucket (i.e. _any bucket_) for the node. const std::vector<uint16_t>& nodes(getNodes()); for (size_t i = 0; i < nodes.size(); ++i) { - tracker.checkPendingMessages(nodes[i], document::BucketId(), rchk); + document::Bucket nullBucket(bucket.getBucketSpace(), document::BucketId()); + tracker.checkPendingMessages(nodes[i], nullBucket, rchk); if (rchk.blocked) { return true; } + if (bucket.getBucketSpace() != BucketSpace::placeHolder()) { + nullBucket = document::Bucket(BucketSpace::placeHolder(), document::BucketId()); + tracker.checkPendingMessages(nodes[i], nullBucket, rchk); + if (rchk.blocked) { + return true; + } + } } return false; } @@ -231,7 +248,7 @@ IdealStateOperation::checkBlockForAllNodes( bool IdealStateOperation::isBlocked(const PendingMessageTracker& tracker) const { - return checkBlock(getBucketId(), tracker); + return checkBlock(getBucket(), tracker); } std::string diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h index 52f65d8781b..54a85b1873f 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h @@ -237,8 +237,8 @@ protected: * operations to other nodes for this bucket, these will not be part of * the set of messages checked. */ - bool checkBlock(const document::BucketId& bId, const PendingMessageTracker& tracker) const; - bool checkBlockForAllNodes(const document::BucketId& bId, const PendingMessageTracker& tracker) const; + bool checkBlock(const document::Bucket &bucket, const PendingMessageTracker& tracker) const; + bool checkBlockForAllNodes(const document::Bucket &bucket, const PendingMessageTracker& tracker) const; }; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp index a73048e822a..aeca655d825 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp @@ -142,11 +142,17 @@ JoinOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP& } } +document::Bucket +JoinOperation::getJoinBucket(size_t idx) const +{ + return document::Bucket(getBucket().getBucketSpace(), _bucketsToJoin[idx]); +} + bool JoinOperation::isBlocked(const PendingMessageTracker& tracker) const { - return (checkBlock(getBucketId(), tracker) || - checkBlock(_bucketsToJoin[0], tracker) || - (_bucketsToJoin.size() > 1 && checkBlock(_bucketsToJoin[1], tracker))); + return (checkBlock(getBucket(), tracker) || + checkBlock(getJoinBucket(0), tracker) || + (_bucketsToJoin.size() > 1 && checkBlock(getJoinBucket(1), tracker))); } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h index ec3f36650fe..55a7c05264a 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h @@ -55,6 +55,8 @@ protected: */ bool enqueueJoinMessagePerTargetNode(const NodeToBuckets& nodeToBuckets); + document::Bucket getJoinBucket(size_t idx) const; + MessageTracker _tracker; std::vector<document::BucketId> _bucketsToJoin; }; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp index 1e6fc84adbd..d7cb42d63bd 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp @@ -166,7 +166,7 @@ SplitOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP bool SplitOperation::isBlocked(const PendingMessageTracker& tracker) const { - return checkBlockForAllNodes(getBucketId(), tracker); + return checkBlockForAllNodes(getBucket(), tracker); } bool diff --git a/storage/src/vespa/storage/distributor/operationtargetresolver.cpp b/storage/src/vespa/storage/distributor/operationtargetresolver.cpp index a862ffadc6f..f537c6b67b6 100644 --- a/storage/src/vespa/storage/distributor/operationtargetresolver.cpp +++ b/storage/src/vespa/storage/distributor/operationtargetresolver.cpp @@ -6,6 +6,12 @@ namespace storage { namespace distributor { +document::Bucket +OperationTarget::getBucket() const +{ + return document::Bucket(document::BucketSpace::placeHolder(), _bucket); +} + void OperationTarget::print(vespalib::asciistream& out, const PrintProperties&) const { out << "OperationTarget(" << _bucket << ", " << _node diff --git a/storage/src/vespa/storage/distributor/operationtargetresolver.h b/storage/src/vespa/storage/distributor/operationtargetresolver.h index 55202160fea..b9f7537b5f5 100644 --- a/storage/src/vespa/storage/distributor/operationtargetresolver.h +++ b/storage/src/vespa/storage/distributor/operationtargetresolver.h @@ -6,7 +6,7 @@ */ #pragma once -#include <vespa/document/bucket/bucketid.h> +#include <vespa/document/bucket/bucket.h> #include <vespa/vdslib/state/node.h> #include <vespa/vespalib/util/printable.h> @@ -25,6 +25,7 @@ public: : _bucket(id), _node(node), _newCopy(newCopy) {} const document::BucketId& getBucketId() const { return _bucket; } + document::Bucket getBucket() const; const lib::Node& getNode() const { return _node; } bool isNewCopy() const { return _newCopy; } diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp index aeadca17d13..ac3dd1ce25f 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp @@ -202,36 +202,36 @@ runCheckerOnRange(PendingMessageTracker::Checker& checker, const Range& range) void PendingMessageTracker::checkPendingMessages(uint16_t node, - const document::BucketId& bid, + const document::Bucket &bucket, Checker& checker) const { vespalib::LockGuard guard(_lock); const MessagesByNodeAndBucket& msgs(boost::multi_index::get<1>(_messages)); - auto range = pairAsRange(msgs.equal_range(boost::make_tuple(node, bid))); + auto range = pairAsRange(msgs.equal_range(boost::make_tuple(node, bucket.getBucketId()))); runCheckerOnRange(checker, range); } void -PendingMessageTracker::checkPendingMessages(const document::BucketId& bid, +PendingMessageTracker::checkPendingMessages(const document::Bucket &bucket, Checker& checker) const { vespalib::LockGuard guard(_lock); const MessagesByBucketAndType& msgs(boost::multi_index::get<2>(_messages)); - auto range = pairAsRange(msgs.equal_range(boost::make_tuple(bid))); + auto range = pairAsRange(msgs.equal_range(boost::make_tuple(bucket.getBucketId()))); runCheckerOnRange(checker, range); } bool PendingMessageTracker::hasPendingMessage(uint16_t node, - const document::BucketId& bid, + const document::Bucket &bucket, uint32_t messageType) const { vespalib::LockGuard guard(_lock); const MessagesByNodeAndBucket& msgs(boost::multi_index::get<1>(_messages)); - auto range = msgs.equal_range(boost::make_tuple(node, bid, messageType)); + auto range = msgs.equal_range(boost::make_tuple(node, bucket.getBucketId(), messageType)); return (range.first != range.second); } diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h index 9b49e94d51f..c1c3e80114b 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h @@ -75,7 +75,7 @@ public: * Breaks when the checker returns false. */ void checkPendingMessages(uint16_t node, - const document::BucketId& bid, + const document::Bucket &bucket, Checker& checker) const; /** @@ -83,7 +83,7 @@ public: * and invokes the given checker with the node, message type and priority. * Breaks when the checker returns false. */ - void checkPendingMessages(const document::BucketId& bid, + void checkPendingMessages(const document::Bucket &bucket, Checker& checker) const; /** @@ -91,7 +91,7 @@ public: * messageType pending to bucket bid on the given node. */ bool hasPendingMessage(uint16_t node, - const document::BucketId& bid, + const document::Bucket &bucket, uint32_t messageType) const; /** |