diff options
author | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-10-27 15:31:44 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-27 15:31:44 +0200 |
commit | b75f4a64a832a405da5012de6233f43296393933 (patch) | |
tree | 2e57a602230aac0ec60f6ab46368fd85498dadb2 | |
parent | 88b7039c442a02ec2635024a375ed3535f91d8aa (diff) | |
parent | 7aa0bca94a914c3ec31871b06c564250dce2fc45 (diff) |
Merge pull request #3901 from vespa-engine/toregge/use-document-bucket-in-ideal-state-operations
Toregge/use document bucket in ideal state operations
23 files changed, 154 insertions, 100 deletions
diff --git a/storage/src/tests/distributor/bucketstateoperationtest.cpp b/storage/src/tests/distributor/bucketstateoperationtest.cpp index 32fbea1be7d..216a051be15 100644 --- a/storage/src/tests/distributor/bucketstateoperationtest.cpp +++ b/storage/src/tests/distributor/bucketstateoperationtest.cpp @@ -4,6 +4,9 @@ #include <tests/distributor/distributortestutil.h> #include <vespa/storage/distributor/operations/idealstate/setbucketstateoperation.h> #include <vespa/storage/distributor/distributor.h> +#include <vespa/document/test/make_document_bucket.h> + +using document::test::makeDocumentBucket; namespace storage { @@ -59,7 +62,7 @@ BucketStateOperationTest::testActivateSingleNode() document::BucketId bid(16, 1); insertBucketInfo(bid, 0, 0xabc, 10, 1100, true, false); - BucketAndNodes bucketAndNodes(bid, toVector<uint16_t>(0)); + BucketAndNodes bucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(0)); std::vector<uint16_t> active; active.push_back(0); SetBucketStateOperation op("storage", bucketAndNodes, active); @@ -100,7 +103,7 @@ BucketStateOperationTest::testActivateAndDeactivateNodes() insertBucketInfo(bid, 0, 0xabc, 10, 1100, false, true); insertBucketInfo(bid, 1, 0xdef, 15, 1500, false, false); - BucketAndNodes bucketAndNodes(bid, toVector<uint16_t>(0, 1)); + BucketAndNodes bucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(0, 1)); std::vector<uint16_t> active; active.push_back(1); SetBucketStateOperation op("storage", bucketAndNodes, active); @@ -165,7 +168,7 @@ BucketStateOperationTest::testDoNotDeactivateIfActivateFails() insertBucketInfo(bid, 0, 0xabc, 10, 1100, false, true); insertBucketInfo(bid, 1, 0xdef, 15, 1500, false, false); - BucketAndNodes bucketAndNodes(bid, toVector<uint16_t>(0, 1)); + BucketAndNodes bucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(0, 1)); std::vector<uint16_t> active; active.push_back(1); SetBucketStateOperation op("storage", bucketAndNodes, active); @@ -214,7 +217,7 @@ BucketStateOperationTest::testBucketDbNotUpdatedOnFailure() document::BucketId bid(16, 1); insertBucketInfo(bid, 0, 0xabc, 10, 1100, true, false); - BucketAndNodes bucketAndNodes(bid, toVector<uint16_t>(0)); + BucketAndNodes bucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(0)); std::vector<uint16_t> active; active.push_back(0); SetBucketStateOperation op("storage", bucketAndNodes, active); diff --git a/storage/src/tests/distributor/garbagecollectiontest.cpp b/storage/src/tests/distributor/garbagecollectiontest.cpp index 7a0ba75a988..f65fa4b0cd3 100644 --- a/storage/src/tests/distributor/garbagecollectiontest.cpp +++ b/storage/src/tests/distributor/garbagecollectiontest.cpp @@ -6,6 +6,9 @@ #include <vespa/storage/distributor/idealstatemanager.h> #include <tests/distributor/distributortestutil.h> #include <vespa/storage/distributor/distributor.h> +#include <vespa/document/test/make_document_bucket.h> + +using document::test::makeDocumentBucket; namespace storage { namespace distributor { @@ -39,7 +42,7 @@ GarbageCollectionOperationTest::testSimple() getConfig().setGarbageCollection("music.date < 34", 3600); GarbageCollectionOperation op("storage", - BucketAndNodes(document::BucketId(16, 1), + BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1))); op.setIdealStateManager(&getIdealStateManager()); diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp index d4d7a00a2df..0c695f9a3d4 100644 --- a/storage/src/tests/distributor/idealstatemanagertest.cpp +++ b/storage/src/tests/distributor/idealstatemanagertest.cpp @@ -143,9 +143,9 @@ IdealStateManagerTest::testClearActiveOnNodeDown() } CPPUNIT_ASSERT_EQUAL( - std::string("setbucketstate to [0] BucketId(0x4000000000000001) (pri 100)\n" - "setbucketstate to [0] BucketId(0x4000000000000002) (pri 100)\n" - "setbucketstate to [0] BucketId(0x4000000000000003) (pri 100)\n"), + std::string("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000000), BucketId(0x4000000000000001)) (pri 100)\n" + "setbucketstate to [0] Bucket(BucketSpace(0x0000000000000000), BucketId(0x4000000000000002)) (pri 100)\n" + "setbucketstate to [0] Bucket(BucketSpace(0x0000000000000000), BucketId(0x4000000000000003)) (pri 100)\n"), _distributor->getActiveIdealStateOperations()); setSystemState(lib::ClusterState("distributor:1 storage:3 .0.s:d")); @@ -169,19 +169,19 @@ IdealStateManagerTest::testRecheckWhenActive() tick(); CPPUNIT_ASSERT_EQUAL( - std::string("setbucketstate to [0] BucketId(0x4000000000000001) (pri 100)\n"), + std::string("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000000), BucketId(0x4000000000000001)) (pri 100)\n"), _distributor->getActiveIdealStateOperations()); tick(); CPPUNIT_ASSERT_EQUAL( - std::string("setbucketstate to [0] BucketId(0x4000000000000001) (pri 100)\n"), + std::string("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000000), BucketId(0x4000000000000001)) (pri 100)\n"), _distributor->getActiveIdealStateOperations()); tick(); CPPUNIT_ASSERT_EQUAL( - std::string("setbucketstate to [0] BucketId(0x4000000000000001) (pri 100)\n"), + std::string("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000000), BucketId(0x4000000000000001)) (pri 100)\n"), _distributor->getActiveIdealStateOperations()); } @@ -208,14 +208,14 @@ IdealStateManagerTest::testBlockIdealStateOpsOnFullRequestBucketInfo() { RemoveBucketOperation op("storage", - BucketAndNodes(bid, toVector<uint16_t>(3, 4))); + BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(3, 4))); CPPUNIT_ASSERT(op.isBlocked(tracker)); } { // Don't trigger on requests to other nodes. RemoveBucketOperation op("storage", - BucketAndNodes(bid, toVector<uint16_t>(3, 5))); + BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(3, 5))); CPPUNIT_ASSERT(!op.isBlocked(tracker)); } @@ -230,7 +230,7 @@ IdealStateManagerTest::testBlockIdealStateOpsOnFullRequestBucketInfo() { RemoveBucketOperation op("storage", - BucketAndNodes(bid, toVector<uint16_t>(7))); + BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7))); CPPUNIT_ASSERT(!op.isBlocked(tracker)); } } @@ -251,11 +251,11 @@ IdealStateManagerTest::testBlockCheckForAllOperationsToSpecificBucket() } { RemoveBucketOperation op("storage", - BucketAndNodes(bid, toVector<uint16_t>(7))); + BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7))); // Not blocked for exact node match. - CPPUNIT_ASSERT(!op.checkBlock(bid, tracker)); + CPPUNIT_ASSERT(!op.checkBlock(makeDocumentBucket(bid), tracker)); // But blocked for bucket match! - CPPUNIT_ASSERT(op.checkBlockForAllNodes(bid, tracker)); + CPPUNIT_ASSERT(op.checkBlockForAllNodes(makeDocumentBucket(bid), tracker)); } } diff --git a/storage/src/tests/distributor/joinbuckettest.cpp b/storage/src/tests/distributor/joinbuckettest.cpp index 8b9a929d429..e0275214c34 100644 --- a/storage/src/tests/distributor/joinbuckettest.cpp +++ b/storage/src/tests/distributor/joinbuckettest.cpp @@ -4,6 +4,9 @@ #include <vespa/storage/distributor/operations/idealstate/joinoperation.h> #include <vespa/storage/distributor/distributor.h> #include <tests/distributor/distributortestutil.h> +#include <vespa/document/test/make_document_bucket.h> + +using document::test::makeDocumentBucket; namespace storage { namespace distributor { @@ -48,7 +51,7 @@ JoinOperationTest::testSimple() _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:1")); JoinOperation op("storage", - BucketAndNodes(document::BucketId(32, 0), + BucketAndNodes(makeDocumentBucket(document::BucketId(32, 0)), toVector<uint16_t>(0)), toVector(document::BucketId(33, 1), document::BucketId(33, 0x100000001))); @@ -109,7 +112,7 @@ JoinOperationTest::sendSparseJoinsToNodesWithoutBothSourceBuckets() lib::ClusterState("distributor:1 storage:2")); JoinOperation op("storage", - BucketAndNodes(document::BucketId(32, 0), + BucketAndNodes(makeDocumentBucket(document::BucketId(32, 0)), toVector<uint16_t>(0, 1)), toVector(document::BucketId(33, 1), document::BucketId(33, 0x100000001))); diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp index f1db60a5ef4..7ca58ba54f3 100644 --- a/storage/src/tests/distributor/mergeoperationtest.cpp +++ b/storage/src/tests/distributor/mergeoperationtest.cpp @@ -72,7 +72,7 @@ MergeOperationTest::testSimple() _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:3")); - MergeOperation op(BucketAndNodes(document::BucketId(16, 1), + MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2))); op.setIdealStateManager(&getIdealStateManager()); op.start(_sender, framework::MilliSecTime(0)); @@ -105,7 +105,7 @@ MergeOperationTest::testFailIfSourceOnlyCopiesChanged() _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:3")); - MergeOperation op(BucketAndNodes(document::BucketId(16, 1), + MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2))); op.setIdealStateManager(&getIdealStateManager()); op.start(_sender, framework::MilliSecTime(0)); @@ -307,7 +307,7 @@ MergeOperationTest::doNotRemoveCopiesWithPendingMessages() { "1=20/1/1," "2=10/1/1/t"); - MergeOperation op(BucketAndNodes(bucket, + MergeOperation op(BucketAndNodes(makeDocumentBucket(bucket), toVector<uint16_t>(0, 1, 2))); op.setIdealStateManager(&getIdealStateManager()); op.start(_sender, framework::MilliSecTime(0)); @@ -369,7 +369,7 @@ MergeOperationTest::allow_deleting_active_source_only_replica() _distributor->enableClusterState( lib::ClusterState("distributor:1 storage:3")); - MergeOperation op(BucketAndNodes(document::BucketId(16, 1), + MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2))); op.setIdealStateManager(&getIdealStateManager()); op.start(_sender, framework::MilliSecTime(0)); @@ -486,7 +486,7 @@ void MergeOperationTest::merge_operation_is_blocked_by_any_busy_target_node() { getClock().setAbsoluteTimeInSeconds(10); addNodesToBucketDB(document::BucketId(16, 1), "0=10/1/1/t,1=20/1/1,2=10/1/1/t"); _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:3")); - MergeOperation op(BucketAndNodes(document::BucketId(16, 1), toVector<uint16_t>(0, 1, 2))); + MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2))); op.setIdealStateManager(&getIdealStateManager()); // Should not block on nodes _not_ included in operation node set diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp index 6aba8e1fb98..8e9c016ea54 100644 --- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp +++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp @@ -445,19 +445,19 @@ PendingMessageTrackerTest::testGetPendingMessageTypes() { TestChecker checker; - tracker.checkPendingMessages(0, bid, checker); + tracker.checkPendingMessages(0, makeDocumentBucket(bid), checker); CPPUNIT_ASSERT_EQUAL(127, (int)checker.pri); } { TestChecker checker; - tracker.checkPendingMessages(0, document::BucketId(16, 1235), checker); + tracker.checkPendingMessages(0, makeDocumentBucket(document::BucketId(16, 1235)), checker); CPPUNIT_ASSERT_EQUAL(255, (int)checker.pri); } { TestChecker checker; - tracker.checkPendingMessages(1, bid, checker); + tracker.checkPendingMessages(1, makeDocumentBucket(bid), checker); CPPUNIT_ASSERT_EQUAL(255, (int)checker.pri); } } @@ -472,7 +472,7 @@ PendingMessageTrackerTest::testHasPendingMessage() PendingMessageTracker tracker(compReg); document::BucketId bid(16, 1234); - CPPUNIT_ASSERT(!tracker.hasPendingMessage(1, bid, api::MessageType::REMOVE_ID)); + CPPUNIT_ASSERT(!tracker.hasPendingMessage(1, makeDocumentBucket(bid), api::MessageType::REMOVE_ID)); { std::shared_ptr<api::RemoveCommand> remove( @@ -484,13 +484,13 @@ PendingMessageTrackerTest::testHasPendingMessage() tracker.insert(remove); } - CPPUNIT_ASSERT(tracker.hasPendingMessage(1, bid, api::MessageType::REMOVE_ID)); - CPPUNIT_ASSERT(!tracker.hasPendingMessage(0, bid, api::MessageType::REMOVE_ID)); - CPPUNIT_ASSERT(!tracker.hasPendingMessage(2, bid, api::MessageType::REMOVE_ID)); + CPPUNIT_ASSERT(tracker.hasPendingMessage(1, makeDocumentBucket(bid), api::MessageType::REMOVE_ID)); + CPPUNIT_ASSERT(!tracker.hasPendingMessage(0, makeDocumentBucket(bid), api::MessageType::REMOVE_ID)); + CPPUNIT_ASSERT(!tracker.hasPendingMessage(2, makeDocumentBucket(bid), api::MessageType::REMOVE_ID)); CPPUNIT_ASSERT(!tracker.hasPendingMessage(1, - document::BucketId(16, 1233), + makeDocumentBucket(document::BucketId(16, 1233)), api::MessageType::REMOVE_ID)); - CPPUNIT_ASSERT(!tracker.hasPendingMessage(1, bid, api::MessageType::DELETEBUCKET_ID)); + CPPUNIT_ASSERT(!tracker.hasPendingMessage(1, makeDocumentBucket(bid), api::MessageType::DELETEBUCKET_ID)); } namespace { @@ -527,7 +527,7 @@ PendingMessageTrackerTest::testGetAllMessagesForSingleBucket() { OperationEnumerator enumerator; - tracker.checkPendingMessages(document::BucketId(16, 1234), enumerator); + tracker.checkPendingMessages(makeDocumentBucket(document::BucketId(16, 1234)), enumerator); CPPUNIT_ASSERT_EQUAL(std::string("Remove -> 0\n" "Remove -> 0\n" "Remove -> 1\n" @@ -536,7 +536,7 @@ PendingMessageTrackerTest::testGetAllMessagesForSingleBucket() } { OperationEnumerator enumerator; - tracker.checkPendingMessages(document::BucketId(16, 9876), enumerator); + tracker.checkPendingMessages(makeDocumentBucket(document::BucketId(16, 9876)), enumerator); CPPUNIT_ASSERT_EQUAL(std::string(""), enumerator.str()); } } diff --git a/storage/src/tests/distributor/removebucketoperationtest.cpp b/storage/src/tests/distributor/removebucketoperationtest.cpp index d9f6518d694..f289d374509 100644 --- a/storage/src/tests/distributor/removebucketoperationtest.cpp +++ b/storage/src/tests/distributor/removebucketoperationtest.cpp @@ -7,6 +7,9 @@ #include <vespa/storage/distributor/idealstatemanager.h> #include <vespa/storage/distributor/distributor.h> #include <tests/distributor/distributortestutil.h> +#include <vespa/document/test/make_document_bucket.h> + +using document::test::makeDocumentBucket; namespace storage { namespace distributor { @@ -48,7 +51,7 @@ RemoveBucketOperationTest::testSimple() _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:3")); RemoveBucketOperation op("storage", - BucketAndNodes(document::BucketId(16, 1), + BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(1,2))); op.setIdealStateManager(&getIdealStateManager()); op.start(_sender, framework::MilliSecTime(0)); @@ -85,7 +88,7 @@ RemoveBucketOperationTest::testBucketInfoMismatchFailure() _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:2")); RemoveBucketOperation op("storage", - BucketAndNodes(document::BucketId(16, 1), + BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(1))); op.setIdealStateManager(&getIdealStateManager()); op.start(_sender, framework::MilliSecTime(0)); @@ -126,7 +129,7 @@ RemoveBucketOperationTest::testFailWithInvalidBucketInfo() _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:2")); RemoveBucketOperation op("storage", - BucketAndNodes(document::BucketId(16, 1), + BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(1))); op.setIdealStateManager(&getIdealStateManager()); op.start(_sender, framework::MilliSecTime(0)); diff --git a/storage/src/tests/distributor/splitbuckettest.cpp b/storage/src/tests/distributor/splitbuckettest.cpp index 2caf8f106f1..47452aac391 100644 --- a/storage/src/tests/distributor/splitbuckettest.cpp +++ b/storage/src/tests/distributor/splitbuckettest.cpp @@ -76,7 +76,7 @@ SplitOperationTest::testSimple() tooLargeBucketSize, 250); SplitOperation op("storage", - BucketAndNodes(document::BucketId(16, 1), + BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0)), maxSplitBits, splitCount, @@ -156,7 +156,7 @@ SplitOperationTest::testMultiNodeFailure() SplitOperation op("storage", - BucketAndNodes(document::BucketId(16, 1), + BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0,1)), maxSplitBits, splitCount, @@ -256,7 +256,7 @@ SplitOperationTest::testCopyTrustedStatusNotCarriedOverAfterSplit() "2=550/60/70000"); SplitOperation op("storage", - BucketAndNodes(sourceBucket, toVector<uint16_t>(0, 1)), + BucketAndNodes(makeDocumentBucket(sourceBucket), toVector<uint16_t>(0, 1)), maxSplitBits, splitCount, splitByteSize); @@ -329,7 +329,7 @@ SplitOperationTest::testOperationBlockedByPendingJoin() insertBucketInfo(joinTarget, 0, 0xabc, 1000, 1234, 250); SplitOperation op("storage", - BucketAndNodes(joinTarget, toVector<uint16_t>(0)), + BucketAndNodes(makeDocumentBucket(joinTarget), toVector<uint16_t>(0)), maxSplitBits, splitCount, splitByteSize); diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 3dfe5103654..fa00a0b74b8 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -518,7 +518,7 @@ Distributor::checkBucketForSplit(const BucketDatabase::Entry& e, SplitChecker checker(priority); for (uint32_t i = 0; i < e->getNodeCount(); ++i) { _pendingMessageTracker.checkPendingMessages(e->getNodeRef(i).getNode(), - e.getBucketId(), + document::Bucket(document::BucketSpace::placeHolder(), e.getBucketId()), checker); if (checker.found) { return; diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp index b1fa0d98bb0..dafc2b71e6a 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp @@ -286,7 +286,7 @@ PutOperation::onStart(DistributorMessageSender& sender) for (size_t i = 0; i < targets.size(); ++i) { if (_manager.getDistributor().getPendingMessageTracker(). hasPendingMessage(targets[i].getNode().getIndex(), - targets[i].getBucketId(), + targets[i].getBucket(), api::MessageType::DELETEBUCKET_ID)) { _tracker.fail(sender, api::ReturnCode(api::ReturnCode::BUCKET_DELETED, diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp index 2315c7ab745..3095dce7b87 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp @@ -11,6 +11,7 @@ LOG_SETUP(".distributor.operation"); using namespace storage; using namespace storage::distributor; +using document::BucketSpace; const uint32_t IdealStateOperation::MAINTENANCE_MESSAGE_TYPES[] = { @@ -35,21 +36,28 @@ IdealStateOperation::~IdealStateOperation() { } -BucketAndNodes::BucketAndNodes(const document::BucketId& id, uint16_t node) - : _id(id) +BucketAndNodes::BucketAndNodes(const document::Bucket &bucket, uint16_t node) + : _bucket(bucket) { _nodes.push_back(node); } -BucketAndNodes::BucketAndNodes(const document::BucketId& id, +BucketAndNodes::BucketAndNodes(const document::Bucket &bucket, const std::vector<uint16_t>& nodes) - : _id(id), + : _bucket(bucket), _nodes(nodes) { assert(!nodes.empty()); std::sort(_nodes.begin(), _nodes.end()); } +void +BucketAndNodes::setBucketId(const document::BucketId &id) +{ + document::Bucket newBucket(_bucket.getBucketSpace(), id); + _bucket = newBucket; +} + std::string BucketAndNodes::toString() const { @@ -65,7 +73,7 @@ BucketAndNodes::toString() const } ost << "] "; - ost << _id; + ost << _bucket.toString(); return ost.str(); } @@ -174,23 +182,41 @@ public: } }; +bool +checkNullBucketRequestBucketInfoMessage(uint16_t node, + document::BucketSpace bucketSpace, + const PendingMessageTracker& tracker) +{ + RequestBucketInfoChecker rchk; + for (;;) { + // Check messages sent to null-bucket (i.e. any bucket) for the node. + document::Bucket nullBucket(bucketSpace, document::BucketId()); + tracker.checkPendingMessages(node, nullBucket, rchk); + if (rchk.blocked) { + return true; + } + if (bucketSpace == BucketSpace::placeHolder()) { + break; + } + bucketSpace = BucketSpace::placeHolder(); + } + return false; +} + } bool -IdealStateOperation::checkBlock(const document::BucketId& bId, +IdealStateOperation::checkBlock(const document::Bucket &bucket, const PendingMessageTracker& tracker) const { IdealStateOpChecker ichk(*this); - RequestBucketInfoChecker rchk; const std::vector<uint16_t>& nodes(getNodes()); - for (size_t i = 0; i < nodes.size(); ++i) { - tracker.checkPendingMessages(nodes[i], bId, ichk); + for (auto node : nodes) { + tracker.checkPendingMessages(node, bucket, ichk); if (ichk.blocked) { return true; } - // Check messages sent to null-bucket (i.e. any bucket) for the node. - tracker.checkPendingMessages(nodes[i], document::BucketId(), rchk); - if (rchk.blocked) { + if (checkNullBucketRequestBucketInfoMessage(node, bucket.getBucketSpace(), tracker)) { return true; } } @@ -199,21 +225,18 @@ IdealStateOperation::checkBlock(const document::BucketId& bId, bool IdealStateOperation::checkBlockForAllNodes( - const document::BucketId& bid, + const document::Bucket &bucket, const PendingMessageTracker& tracker) const { IdealStateOpChecker ichk(*this); // Check messages sent to _any node_ for _this_ particular bucket. - tracker.checkPendingMessages(bid, ichk); + tracker.checkPendingMessages(bucket, ichk); if (ichk.blocked) { return true; } - RequestBucketInfoChecker rchk; - // Check messages sent to null-bucket (i.e. _any bucket_) for the node. const std::vector<uint16_t>& nodes(getNodes()); - for (size_t i = 0; i < nodes.size(); ++i) { - tracker.checkPendingMessages(nodes[i], document::BucketId(), rchk); - if (rchk.blocked) { + for (auto node : nodes) { + if (checkNullBucketRequestBucketInfoMessage(node, bucket.getBucketSpace(), tracker)) { return true; } } @@ -224,7 +247,7 @@ IdealStateOperation::checkBlockForAllNodes( bool IdealStateOperation::isBlocked(const PendingMessageTracker& tracker) const { - return checkBlock(getBucketId(), tracker); + return checkBlock(getBucket(), tracker); } std::string diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h index e0d96296b83..54a85b1873f 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h @@ -32,7 +32,7 @@ public: @param id Target bucket @param node Target node */ - BucketAndNodes(const document::BucketId& id, uint16_t node); + BucketAndNodes(const document::Bucket &id, uint16_t node); /** Constructor for operations with multiple target nodes. @@ -40,7 +40,7 @@ public: @param id Target bucket @param nodes Target nodes */ - BucketAndNodes(const document::BucketId& id, + BucketAndNodes(const document::Bucket &id, const std::vector<uint16_t>& nodes); /** @@ -48,16 +48,16 @@ public: @param id The new target bucket */ - void setBucketId(const document::BucketId& id) { _id = id; } + void setBucketId(const document::BucketId &id); /** Returns the target bucket. @return Returns the target bucket. */ - const document::BucketId& getBucketId() const { return _id; } + document::BucketId getBucketId() const { return _bucket.getBucketId(); } - document::Bucket getBucket() const { return document::Bucket(document::BucketSpace::placeHolder(), _id); } + document::Bucket getBucket() const { return _bucket; } /** Returns the target nodes @@ -81,7 +81,7 @@ public: std::string toString() const; private: - document::BucketId _id; + document::Bucket _bucket; std::vector<uint16_t> _nodes; }; @@ -142,7 +142,7 @@ public: @return The target bucket. */ - const document::BucketId& getBucketId() const { return _bucketAndNodes.getBucketId(); } + document::BucketId getBucketId() const { return _bucketAndNodes.getBucketId(); } document::Bucket getBucket() const { return _bucketAndNodes.getBucket(); } @@ -237,8 +237,8 @@ protected: * operations to other nodes for this bucket, these will not be part of * the set of messages checked. */ - bool checkBlock(const document::BucketId& bId, const PendingMessageTracker& tracker) const; - bool checkBlockForAllNodes(const document::BucketId& bId, const PendingMessageTracker& tracker) const; + bool checkBlock(const document::Bucket &bucket, const PendingMessageTracker& tracker) const; + bool checkBlockForAllNodes(const document::Bucket &bucket, const PendingMessageTracker& tracker) const; }; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp index a73048e822a..aeca655d825 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp @@ -142,11 +142,17 @@ JoinOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP& } } +document::Bucket +JoinOperation::getJoinBucket(size_t idx) const +{ + return document::Bucket(getBucket().getBucketSpace(), _bucketsToJoin[idx]); +} + bool JoinOperation::isBlocked(const PendingMessageTracker& tracker) const { - return (checkBlock(getBucketId(), tracker) || - checkBlock(_bucketsToJoin[0], tracker) || - (_bucketsToJoin.size() > 1 && checkBlock(_bucketsToJoin[1], tracker))); + return (checkBlock(getBucket(), tracker) || + checkBlock(getJoinBucket(0), tracker) || + (_bucketsToJoin.size() > 1 && checkBlock(getJoinBucket(1), tracker))); } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h index ec3f36650fe..55a7c05264a 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h @@ -55,6 +55,8 @@ protected: */ bool enqueueJoinMessagePerTargetNode(const NodeToBuckets& nodeToBuckets); + document::Bucket getJoinBucket(size_t idx) const; + MessageTracker _tracker; std::vector<document::BucketId> _bucketsToJoin; }; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp index 1b161b2c5f7..e889dbe279b 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp @@ -228,7 +228,7 @@ MergeOperation::deleteSourceOnlyNodes( _removeOperation.reset( new RemoveBucketOperation( _manager->getDistributorComponent().getClusterName(), - BucketAndNodes(getBucketId(), sourceOnlyNodes))); + BucketAndNodes(getBucket(), sourceOnlyNodes))); // Must not send removes to source only copies if something has caused // pending load to the copy after the merge was sent! if (_removeOperation->isBlocked(sender.getPendingMessageTracker())) { diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp index 1e6fc84adbd..d7cb42d63bd 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp @@ -166,7 +166,7 @@ SplitOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP bool SplitOperation::isBlocked(const PendingMessageTracker& tracker) const { - return checkBlockForAllNodes(getBucketId(), tracker); + return checkBlockForAllNodes(getBucket(), tracker); } bool diff --git a/storage/src/vespa/storage/distributor/operationtargetresolver.cpp b/storage/src/vespa/storage/distributor/operationtargetresolver.cpp index a862ffadc6f..f537c6b67b6 100644 --- a/storage/src/vespa/storage/distributor/operationtargetresolver.cpp +++ b/storage/src/vespa/storage/distributor/operationtargetresolver.cpp @@ -6,6 +6,12 @@ namespace storage { namespace distributor { +document::Bucket +OperationTarget::getBucket() const +{ + return document::Bucket(document::BucketSpace::placeHolder(), _bucket); +} + void OperationTarget::print(vespalib::asciistream& out, const PrintProperties&) const { out << "OperationTarget(" << _bucket << ", " << _node diff --git a/storage/src/vespa/storage/distributor/operationtargetresolver.h b/storage/src/vespa/storage/distributor/operationtargetresolver.h index 55202160fea..b9f7537b5f5 100644 --- a/storage/src/vespa/storage/distributor/operationtargetresolver.h +++ b/storage/src/vespa/storage/distributor/operationtargetresolver.h @@ -6,7 +6,7 @@ */ #pragma once -#include <vespa/document/bucket/bucketid.h> +#include <vespa/document/bucket/bucket.h> #include <vespa/vdslib/state/node.h> #include <vespa/vespalib/util/printable.h> @@ -25,6 +25,7 @@ public: : _bucket(id), _node(node), _newCopy(newCopy) {} const document::BucketId& getBucketId() const { return _bucket; } + document::Bucket getBucket() const; const lib::Node& getNode() const { return _node; } bool isNewCopy() const { return _newCopy; } diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp index aeadca17d13..ac3dd1ce25f 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp @@ -202,36 +202,36 @@ runCheckerOnRange(PendingMessageTracker::Checker& checker, const Range& range) void PendingMessageTracker::checkPendingMessages(uint16_t node, - const document::BucketId& bid, + const document::Bucket &bucket, Checker& checker) const { vespalib::LockGuard guard(_lock); const MessagesByNodeAndBucket& msgs(boost::multi_index::get<1>(_messages)); - auto range = pairAsRange(msgs.equal_range(boost::make_tuple(node, bid))); + auto range = pairAsRange(msgs.equal_range(boost::make_tuple(node, bucket.getBucketId()))); runCheckerOnRange(checker, range); } void -PendingMessageTracker::checkPendingMessages(const document::BucketId& bid, +PendingMessageTracker::checkPendingMessages(const document::Bucket &bucket, Checker& checker) const { vespalib::LockGuard guard(_lock); const MessagesByBucketAndType& msgs(boost::multi_index::get<2>(_messages)); - auto range = pairAsRange(msgs.equal_range(boost::make_tuple(bid))); + auto range = pairAsRange(msgs.equal_range(boost::make_tuple(bucket.getBucketId()))); runCheckerOnRange(checker, range); } bool PendingMessageTracker::hasPendingMessage(uint16_t node, - const document::BucketId& bid, + const document::Bucket &bucket, uint32_t messageType) const { vespalib::LockGuard guard(_lock); const MessagesByNodeAndBucket& msgs(boost::multi_index::get<1>(_messages)); - auto range = msgs.equal_range(boost::make_tuple(node, bid, messageType)); + auto range = msgs.equal_range(boost::make_tuple(node, bucket.getBucketId(), messageType)); return (range.first != range.second); } diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h index 9b49e94d51f..c1c3e80114b 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h @@ -75,7 +75,7 @@ public: * Breaks when the checker returns false. */ void checkPendingMessages(uint16_t node, - const document::BucketId& bid, + const document::Bucket &bucket, Checker& checker) const; /** @@ -83,7 +83,7 @@ public: * and invokes the given checker with the node, message type and priority. * Breaks when the checker returns false. */ - void checkPendingMessages(const document::BucketId& bid, + void checkPendingMessages(const document::Bucket &bucket, Checker& checker) const; /** @@ -91,7 +91,7 @@ public: * messageType pending to bucket bid on the given node. */ bool hasPendingMessage(uint16_t node, - const document::BucketId& bid, + const document::Bucket &bucket, uint32_t messageType) const; /** diff --git a/storage/src/vespa/storage/distributor/statechecker.h b/storage/src/vespa/storage/distributor/statechecker.h index a8e198427f7..fbadd5642d4 100644 --- a/storage/src/vespa/storage/distributor/statechecker.h +++ b/storage/src/vespa/storage/distributor/statechecker.h @@ -82,6 +82,8 @@ public: return siblingEntry; } + document::Bucket getBucket() const { return document::Bucket(document::BucketSpace::placeHolder(), bucketId); } + std::string toString() const; }; diff --git a/storage/src/vespa/storage/distributor/statecheckers.cpp b/storage/src/vespa/storage/distributor/statecheckers.cpp index 701a147a56f..35d111a8c38 100644 --- a/storage/src/vespa/storage/distributor/statecheckers.cpp +++ b/storage/src/vespa/storage/distributor/statecheckers.cpp @@ -15,6 +15,8 @@ #include <vespa/log/log.h> LOG_SETUP(".distributor.operation.checkers"); +using document::BucketSpace; + namespace storage { namespace distributor { @@ -84,7 +86,7 @@ SplitBucketStateChecker::generateMinimumBucketSplitOperation( { IdealStateOperation::UP so(new SplitOperation( c.component.getClusterName(), - BucketAndNodes(c.bucketId, c.entry->getNodes()), + BucketAndNodes(c.getBucket(), c.entry->getNodes()), c.distributorConfig.getMinimalBucketSplit(), 0, 0)); @@ -103,7 +105,7 @@ SplitBucketStateChecker::generateMaxSizeExceededSplitOperation( { IdealStateOperation::UP so(new SplitOperation( c.component.getClusterName(), - BucketAndNodes(c.bucketId, c.entry->getNodes()), + BucketAndNodes(c.getBucket(), c.entry->getNodes()), 58, c.distributorConfig.getSplitCount(), c.distributorConfig.getSplitSize())); @@ -420,7 +422,7 @@ bucketHasMultipleChildren(const document::BucketId& bucket, } -document::BucketId +document::Bucket JoinBucketsStateChecker::computeJoinBucket(const Context& c) const { // Always decrease by at least 1 bit, as we could not get here unless this @@ -443,7 +445,7 @@ JoinBucketsStateChecker::computeJoinBucket(const Context& c) const --level; target = candidate; } - return target; + return document::Bucket(BucketSpace::placeHolder(), target); } StateChecker::Result @@ -455,8 +457,8 @@ JoinBucketsStateChecker::check(StateChecker::Context& c) return Result::noMaintenanceNeeded(); } - document::BucketId joinedBucket(computeJoinBucket(c)); - assert(joinedBucket.getUsedBits() < c.bucketId.getUsedBits()); + document::Bucket joinedBucket(computeJoinBucket(c)); + assert(joinedBucket.getBucketId().getUsedBits() < c.bucketId.getUsedBits()); std::vector<document::BucketId> sourceBuckets; if (c.getSiblingEntry().valid()) { @@ -572,7 +574,7 @@ SplitInconsistentStateChecker::check(StateChecker::Context& c) IdealStateOperation::UP op(new SplitOperation( c.component.getClusterName(), - BucketAndNodes(c.bucketId, c.entry->getNodes()), + BucketAndNodes(c.getBucket(), c.entry->getNodes()), getHighestUsedBits(c.entries), 0, 0)); @@ -842,7 +844,7 @@ SynchronizeAndMoveStateChecker::check(StateChecker::Context& c) if (result.shouldMerge()) { IdealStateOperation::UP op( - new MergeOperation(BucketAndNodes(c.bucketId, result.nodes()), + new MergeOperation(BucketAndNodes(c.getBucket(), result.nodes()), c.distributorConfig.getMaxNodesPerMerge())); op->setPriority(result.priority()); op->setDetailedReason(result.reason()); @@ -986,7 +988,7 @@ DeleteExtraCopiesStateChecker::check(StateChecker::Context& c) if (!removedCopies.empty()) { IdealStateOperation::UP ro(new RemoveBucketOperation( c.component.getClusterName(), - BucketAndNodes(c.bucketId, removedCopies))); + BucketAndNodes(c.getBucket(), removedCopies))); ro->setPriority(c.distributorConfig.getMaintenancePriorities() .deleteBucketCopy); @@ -1087,7 +1089,7 @@ BucketStateStateChecker::check(StateChecker::Context& c) } auto op = std::make_unique<SetBucketStateOperation>( c.component.getClusterName(), - BucketAndNodes(c.bucketId, operationNodes), + BucketAndNodes(c.getBucket(), operationNodes), activeNodeIndexes); // If activeNodes > 1, we're dealing with a active-per-leaf group case and @@ -1127,7 +1129,7 @@ GarbageCollectionStateChecker::check(Context& c) IdealStateOperation::UP op( new GarbageCollectionOperation( c.component.getClusterName(), - BucketAndNodes(c.bucketId, c.entry->getNodes()))); + BucketAndNodes(c.getBucket(), c.entry->getNodes()))); vespalib::asciistream reason; reason << "[Needs garbage collection: Last check at " diff --git a/storage/src/vespa/storage/distributor/statecheckers.h b/storage/src/vespa/storage/distributor/statecheckers.h index f474d75a6b6..4f72d31aac5 100644 --- a/storage/src/vespa/storage/distributor/statecheckers.h +++ b/storage/src/vespa/storage/distributor/statecheckers.h @@ -56,7 +56,7 @@ private: bool smallEnoughToJoin(const Context& c) const; bool singleBucketJoinIsEnabled(const Context&) const; bool singleBucketJoinIsConsistent(const Context& c) const; - document::BucketId computeJoinBucket(const Context& c) const; + document::Bucket computeJoinBucket(const Context& c) const; }; class SplitBucketStateChecker : public StateChecker |