summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahoo-inc.com>2017-10-27 15:31:44 +0200
committerGitHub <noreply@github.com>2017-10-27 15:31:44 +0200
commitb75f4a64a832a405da5012de6233f43296393933 (patch)
tree2e57a602230aac0ec60f6ab46368fd85498dadb2
parent88b7039c442a02ec2635024a375ed3535f91d8aa (diff)
parent7aa0bca94a914c3ec31871b06c564250dce2fc45 (diff)
Merge pull request #3901 from vespa-engine/toregge/use-document-bucket-in-ideal-state-operations
Toregge/use document bucket in ideal state operations
-rw-r--r--storage/src/tests/distributor/bucketstateoperationtest.cpp11
-rw-r--r--storage/src/tests/distributor/garbagecollectiontest.cpp5
-rw-r--r--storage/src/tests/distributor/idealstatemanagertest.cpp24
-rw-r--r--storage/src/tests/distributor/joinbuckettest.cpp7
-rw-r--r--storage/src/tests/distributor/mergeoperationtest.cpp10
-rw-r--r--storage/src/tests/distributor/pendingmessagetrackertest.cpp22
-rw-r--r--storage/src/tests/distributor/removebucketoperationtest.cpp9
-rw-r--r--storage/src/tests/distributor/splitbuckettest.cpp8
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp63
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h18
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp12
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h2
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operationtargetresolver.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/operationtargetresolver.h3
-rw-r--r--storage/src/vespa/storage/distributor/pendingmessagetracker.cpp12
-rw-r--r--storage/src/vespa/storage/distributor/pendingmessagetracker.h6
-rw-r--r--storage/src/vespa/storage/distributor/statechecker.h2
-rw-r--r--storage/src/vespa/storage/distributor/statecheckers.cpp24
-rw-r--r--storage/src/vespa/storage/distributor/statecheckers.h2
23 files changed, 154 insertions, 100 deletions
diff --git a/storage/src/tests/distributor/bucketstateoperationtest.cpp b/storage/src/tests/distributor/bucketstateoperationtest.cpp
index 32fbea1be7d..216a051be15 100644
--- a/storage/src/tests/distributor/bucketstateoperationtest.cpp
+++ b/storage/src/tests/distributor/bucketstateoperationtest.cpp
@@ -4,6 +4,9 @@
#include <tests/distributor/distributortestutil.h>
#include <vespa/storage/distributor/operations/idealstate/setbucketstateoperation.h>
#include <vespa/storage/distributor/distributor.h>
+#include <vespa/document/test/make_document_bucket.h>
+
+using document::test::makeDocumentBucket;
namespace storage {
@@ -59,7 +62,7 @@ BucketStateOperationTest::testActivateSingleNode()
document::BucketId bid(16, 1);
insertBucketInfo(bid, 0, 0xabc, 10, 1100, true, false);
- BucketAndNodes bucketAndNodes(bid, toVector<uint16_t>(0));
+ BucketAndNodes bucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(0));
std::vector<uint16_t> active;
active.push_back(0);
SetBucketStateOperation op("storage", bucketAndNodes, active);
@@ -100,7 +103,7 @@ BucketStateOperationTest::testActivateAndDeactivateNodes()
insertBucketInfo(bid, 0, 0xabc, 10, 1100, false, true);
insertBucketInfo(bid, 1, 0xdef, 15, 1500, false, false);
- BucketAndNodes bucketAndNodes(bid, toVector<uint16_t>(0, 1));
+ BucketAndNodes bucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(0, 1));
std::vector<uint16_t> active;
active.push_back(1);
SetBucketStateOperation op("storage", bucketAndNodes, active);
@@ -165,7 +168,7 @@ BucketStateOperationTest::testDoNotDeactivateIfActivateFails()
insertBucketInfo(bid, 0, 0xabc, 10, 1100, false, true);
insertBucketInfo(bid, 1, 0xdef, 15, 1500, false, false);
- BucketAndNodes bucketAndNodes(bid, toVector<uint16_t>(0, 1));
+ BucketAndNodes bucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(0, 1));
std::vector<uint16_t> active;
active.push_back(1);
SetBucketStateOperation op("storage", bucketAndNodes, active);
@@ -214,7 +217,7 @@ BucketStateOperationTest::testBucketDbNotUpdatedOnFailure()
document::BucketId bid(16, 1);
insertBucketInfo(bid, 0, 0xabc, 10, 1100, true, false);
- BucketAndNodes bucketAndNodes(bid, toVector<uint16_t>(0));
+ BucketAndNodes bucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(0));
std::vector<uint16_t> active;
active.push_back(0);
SetBucketStateOperation op("storage", bucketAndNodes, active);
diff --git a/storage/src/tests/distributor/garbagecollectiontest.cpp b/storage/src/tests/distributor/garbagecollectiontest.cpp
index 7a0ba75a988..f65fa4b0cd3 100644
--- a/storage/src/tests/distributor/garbagecollectiontest.cpp
+++ b/storage/src/tests/distributor/garbagecollectiontest.cpp
@@ -6,6 +6,9 @@
#include <vespa/storage/distributor/idealstatemanager.h>
#include <tests/distributor/distributortestutil.h>
#include <vespa/storage/distributor/distributor.h>
+#include <vespa/document/test/make_document_bucket.h>
+
+using document::test::makeDocumentBucket;
namespace storage {
namespace distributor {
@@ -39,7 +42,7 @@ GarbageCollectionOperationTest::testSimple()
getConfig().setGarbageCollection("music.date < 34", 3600);
GarbageCollectionOperation op("storage",
- BucketAndNodes(document::BucketId(16, 1),
+ BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
toVector<uint16_t>(0, 1)));
op.setIdealStateManager(&getIdealStateManager());
diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp
index d4d7a00a2df..0c695f9a3d4 100644
--- a/storage/src/tests/distributor/idealstatemanagertest.cpp
+++ b/storage/src/tests/distributor/idealstatemanagertest.cpp
@@ -143,9 +143,9 @@ IdealStateManagerTest::testClearActiveOnNodeDown()
}
CPPUNIT_ASSERT_EQUAL(
- std::string("setbucketstate to [0] BucketId(0x4000000000000001) (pri 100)\n"
- "setbucketstate to [0] BucketId(0x4000000000000002) (pri 100)\n"
- "setbucketstate to [0] BucketId(0x4000000000000003) (pri 100)\n"),
+ std::string("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000000), BucketId(0x4000000000000001)) (pri 100)\n"
+ "setbucketstate to [0] Bucket(BucketSpace(0x0000000000000000), BucketId(0x4000000000000002)) (pri 100)\n"
+ "setbucketstate to [0] Bucket(BucketSpace(0x0000000000000000), BucketId(0x4000000000000003)) (pri 100)\n"),
_distributor->getActiveIdealStateOperations());
setSystemState(lib::ClusterState("distributor:1 storage:3 .0.s:d"));
@@ -169,19 +169,19 @@ IdealStateManagerTest::testRecheckWhenActive()
tick();
CPPUNIT_ASSERT_EQUAL(
- std::string("setbucketstate to [0] BucketId(0x4000000000000001) (pri 100)\n"),
+ std::string("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000000), BucketId(0x4000000000000001)) (pri 100)\n"),
_distributor->getActiveIdealStateOperations());
tick();
CPPUNIT_ASSERT_EQUAL(
- std::string("setbucketstate to [0] BucketId(0x4000000000000001) (pri 100)\n"),
+ std::string("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000000), BucketId(0x4000000000000001)) (pri 100)\n"),
_distributor->getActiveIdealStateOperations());
tick();
CPPUNIT_ASSERT_EQUAL(
- std::string("setbucketstate to [0] BucketId(0x4000000000000001) (pri 100)\n"),
+ std::string("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000000), BucketId(0x4000000000000001)) (pri 100)\n"),
_distributor->getActiveIdealStateOperations());
}
@@ -208,14 +208,14 @@ IdealStateManagerTest::testBlockIdealStateOpsOnFullRequestBucketInfo()
{
RemoveBucketOperation op("storage",
- BucketAndNodes(bid, toVector<uint16_t>(3, 4)));
+ BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(3, 4)));
CPPUNIT_ASSERT(op.isBlocked(tracker));
}
{
// Don't trigger on requests to other nodes.
RemoveBucketOperation op("storage",
- BucketAndNodes(bid, toVector<uint16_t>(3, 5)));
+ BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(3, 5)));
CPPUNIT_ASSERT(!op.isBlocked(tracker));
}
@@ -230,7 +230,7 @@ IdealStateManagerTest::testBlockIdealStateOpsOnFullRequestBucketInfo()
{
RemoveBucketOperation op("storage",
- BucketAndNodes(bid, toVector<uint16_t>(7)));
+ BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7)));
CPPUNIT_ASSERT(!op.isBlocked(tracker));
}
}
@@ -251,11 +251,11 @@ IdealStateManagerTest::testBlockCheckForAllOperationsToSpecificBucket()
}
{
RemoveBucketOperation op("storage",
- BucketAndNodes(bid, toVector<uint16_t>(7)));
+ BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7)));
// Not blocked for exact node match.
- CPPUNIT_ASSERT(!op.checkBlock(bid, tracker));
+ CPPUNIT_ASSERT(!op.checkBlock(makeDocumentBucket(bid), tracker));
// But blocked for bucket match!
- CPPUNIT_ASSERT(op.checkBlockForAllNodes(bid, tracker));
+ CPPUNIT_ASSERT(op.checkBlockForAllNodes(makeDocumentBucket(bid), tracker));
}
}
diff --git a/storage/src/tests/distributor/joinbuckettest.cpp b/storage/src/tests/distributor/joinbuckettest.cpp
index 8b9a929d429..e0275214c34 100644
--- a/storage/src/tests/distributor/joinbuckettest.cpp
+++ b/storage/src/tests/distributor/joinbuckettest.cpp
@@ -4,6 +4,9 @@
#include <vespa/storage/distributor/operations/idealstate/joinoperation.h>
#include <vespa/storage/distributor/distributor.h>
#include <tests/distributor/distributortestutil.h>
+#include <vespa/document/test/make_document_bucket.h>
+
+using document::test::makeDocumentBucket;
namespace storage {
namespace distributor {
@@ -48,7 +51,7 @@ JoinOperationTest::testSimple()
_distributor->enableClusterState(lib::ClusterState("distributor:1 storage:1"));
JoinOperation op("storage",
- BucketAndNodes(document::BucketId(32, 0),
+ BucketAndNodes(makeDocumentBucket(document::BucketId(32, 0)),
toVector<uint16_t>(0)),
toVector(document::BucketId(33, 1),
document::BucketId(33, 0x100000001)));
@@ -109,7 +112,7 @@ JoinOperationTest::sendSparseJoinsToNodesWithoutBothSourceBuckets()
lib::ClusterState("distributor:1 storage:2"));
JoinOperation op("storage",
- BucketAndNodes(document::BucketId(32, 0),
+ BucketAndNodes(makeDocumentBucket(document::BucketId(32, 0)),
toVector<uint16_t>(0, 1)),
toVector(document::BucketId(33, 1),
document::BucketId(33, 0x100000001)));
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp
index f1db60a5ef4..7ca58ba54f3 100644
--- a/storage/src/tests/distributor/mergeoperationtest.cpp
+++ b/storage/src/tests/distributor/mergeoperationtest.cpp
@@ -72,7 +72,7 @@ MergeOperationTest::testSimple()
_distributor->enableClusterState(lib::ClusterState("distributor:1 storage:3"));
- MergeOperation op(BucketAndNodes(document::BucketId(16, 1),
+ MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
toVector<uint16_t>(0, 1, 2)));
op.setIdealStateManager(&getIdealStateManager());
op.start(_sender, framework::MilliSecTime(0));
@@ -105,7 +105,7 @@ MergeOperationTest::testFailIfSourceOnlyCopiesChanged()
_distributor->enableClusterState(lib::ClusterState("distributor:1 storage:3"));
- MergeOperation op(BucketAndNodes(document::BucketId(16, 1),
+ MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
toVector<uint16_t>(0, 1, 2)));
op.setIdealStateManager(&getIdealStateManager());
op.start(_sender, framework::MilliSecTime(0));
@@ -307,7 +307,7 @@ MergeOperationTest::doNotRemoveCopiesWithPendingMessages() {
"1=20/1/1,"
"2=10/1/1/t");
- MergeOperation op(BucketAndNodes(bucket,
+ MergeOperation op(BucketAndNodes(makeDocumentBucket(bucket),
toVector<uint16_t>(0, 1, 2)));
op.setIdealStateManager(&getIdealStateManager());
op.start(_sender, framework::MilliSecTime(0));
@@ -369,7 +369,7 @@ MergeOperationTest::allow_deleting_active_source_only_replica()
_distributor->enableClusterState(
lib::ClusterState("distributor:1 storage:3"));
- MergeOperation op(BucketAndNodes(document::BucketId(16, 1),
+ MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
toVector<uint16_t>(0, 1, 2)));
op.setIdealStateManager(&getIdealStateManager());
op.start(_sender, framework::MilliSecTime(0));
@@ -486,7 +486,7 @@ void MergeOperationTest::merge_operation_is_blocked_by_any_busy_target_node() {
getClock().setAbsoluteTimeInSeconds(10);
addNodesToBucketDB(document::BucketId(16, 1), "0=10/1/1/t,1=20/1/1,2=10/1/1/t");
_distributor->enableClusterState(lib::ClusterState("distributor:1 storage:3"));
- MergeOperation op(BucketAndNodes(document::BucketId(16, 1), toVector<uint16_t>(0, 1, 2)));
+ MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2)));
op.setIdealStateManager(&getIdealStateManager());
// Should not block on nodes _not_ included in operation node set
diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
index 6aba8e1fb98..8e9c016ea54 100644
--- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp
+++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
@@ -445,19 +445,19 @@ PendingMessageTrackerTest::testGetPendingMessageTypes()
{
TestChecker checker;
- tracker.checkPendingMessages(0, bid, checker);
+ tracker.checkPendingMessages(0, makeDocumentBucket(bid), checker);
CPPUNIT_ASSERT_EQUAL(127, (int)checker.pri);
}
{
TestChecker checker;
- tracker.checkPendingMessages(0, document::BucketId(16, 1235), checker);
+ tracker.checkPendingMessages(0, makeDocumentBucket(document::BucketId(16, 1235)), checker);
CPPUNIT_ASSERT_EQUAL(255, (int)checker.pri);
}
{
TestChecker checker;
- tracker.checkPendingMessages(1, bid, checker);
+ tracker.checkPendingMessages(1, makeDocumentBucket(bid), checker);
CPPUNIT_ASSERT_EQUAL(255, (int)checker.pri);
}
}
@@ -472,7 +472,7 @@ PendingMessageTrackerTest::testHasPendingMessage()
PendingMessageTracker tracker(compReg);
document::BucketId bid(16, 1234);
- CPPUNIT_ASSERT(!tracker.hasPendingMessage(1, bid, api::MessageType::REMOVE_ID));
+ CPPUNIT_ASSERT(!tracker.hasPendingMessage(1, makeDocumentBucket(bid), api::MessageType::REMOVE_ID));
{
std::shared_ptr<api::RemoveCommand> remove(
@@ -484,13 +484,13 @@ PendingMessageTrackerTest::testHasPendingMessage()
tracker.insert(remove);
}
- CPPUNIT_ASSERT(tracker.hasPendingMessage(1, bid, api::MessageType::REMOVE_ID));
- CPPUNIT_ASSERT(!tracker.hasPendingMessage(0, bid, api::MessageType::REMOVE_ID));
- CPPUNIT_ASSERT(!tracker.hasPendingMessage(2, bid, api::MessageType::REMOVE_ID));
+ CPPUNIT_ASSERT(tracker.hasPendingMessage(1, makeDocumentBucket(bid), api::MessageType::REMOVE_ID));
+ CPPUNIT_ASSERT(!tracker.hasPendingMessage(0, makeDocumentBucket(bid), api::MessageType::REMOVE_ID));
+ CPPUNIT_ASSERT(!tracker.hasPendingMessage(2, makeDocumentBucket(bid), api::MessageType::REMOVE_ID));
CPPUNIT_ASSERT(!tracker.hasPendingMessage(1,
- document::BucketId(16, 1233),
+ makeDocumentBucket(document::BucketId(16, 1233)),
api::MessageType::REMOVE_ID));
- CPPUNIT_ASSERT(!tracker.hasPendingMessage(1, bid, api::MessageType::DELETEBUCKET_ID));
+ CPPUNIT_ASSERT(!tracker.hasPendingMessage(1, makeDocumentBucket(bid), api::MessageType::DELETEBUCKET_ID));
}
namespace {
@@ -527,7 +527,7 @@ PendingMessageTrackerTest::testGetAllMessagesForSingleBucket()
{
OperationEnumerator enumerator;
- tracker.checkPendingMessages(document::BucketId(16, 1234), enumerator);
+ tracker.checkPendingMessages(makeDocumentBucket(document::BucketId(16, 1234)), enumerator);
CPPUNIT_ASSERT_EQUAL(std::string("Remove -> 0\n"
"Remove -> 0\n"
"Remove -> 1\n"
@@ -536,7 +536,7 @@ PendingMessageTrackerTest::testGetAllMessagesForSingleBucket()
}
{
OperationEnumerator enumerator;
- tracker.checkPendingMessages(document::BucketId(16, 9876), enumerator);
+ tracker.checkPendingMessages(makeDocumentBucket(document::BucketId(16, 9876)), enumerator);
CPPUNIT_ASSERT_EQUAL(std::string(""), enumerator.str());
}
}
diff --git a/storage/src/tests/distributor/removebucketoperationtest.cpp b/storage/src/tests/distributor/removebucketoperationtest.cpp
index d9f6518d694..f289d374509 100644
--- a/storage/src/tests/distributor/removebucketoperationtest.cpp
+++ b/storage/src/tests/distributor/removebucketoperationtest.cpp
@@ -7,6 +7,9 @@
#include <vespa/storage/distributor/idealstatemanager.h>
#include <vespa/storage/distributor/distributor.h>
#include <tests/distributor/distributortestutil.h>
+#include <vespa/document/test/make_document_bucket.h>
+
+using document::test::makeDocumentBucket;
namespace storage {
namespace distributor {
@@ -48,7 +51,7 @@ RemoveBucketOperationTest::testSimple()
_distributor->enableClusterState(lib::ClusterState("distributor:1 storage:3"));
RemoveBucketOperation op("storage",
- BucketAndNodes(document::BucketId(16, 1),
+ BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
toVector<uint16_t>(1,2)));
op.setIdealStateManager(&getIdealStateManager());
op.start(_sender, framework::MilliSecTime(0));
@@ -85,7 +88,7 @@ RemoveBucketOperationTest::testBucketInfoMismatchFailure()
_distributor->enableClusterState(lib::ClusterState("distributor:1 storage:2"));
RemoveBucketOperation op("storage",
- BucketAndNodes(document::BucketId(16, 1),
+ BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
toVector<uint16_t>(1)));
op.setIdealStateManager(&getIdealStateManager());
op.start(_sender, framework::MilliSecTime(0));
@@ -126,7 +129,7 @@ RemoveBucketOperationTest::testFailWithInvalidBucketInfo()
_distributor->enableClusterState(lib::ClusterState("distributor:1 storage:2"));
RemoveBucketOperation op("storage",
- BucketAndNodes(document::BucketId(16, 1),
+ BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
toVector<uint16_t>(1)));
op.setIdealStateManager(&getIdealStateManager());
op.start(_sender, framework::MilliSecTime(0));
diff --git a/storage/src/tests/distributor/splitbuckettest.cpp b/storage/src/tests/distributor/splitbuckettest.cpp
index 2caf8f106f1..47452aac391 100644
--- a/storage/src/tests/distributor/splitbuckettest.cpp
+++ b/storage/src/tests/distributor/splitbuckettest.cpp
@@ -76,7 +76,7 @@ SplitOperationTest::testSimple()
tooLargeBucketSize, 250);
SplitOperation op("storage",
- BucketAndNodes(document::BucketId(16, 1),
+ BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
toVector<uint16_t>(0)),
maxSplitBits,
splitCount,
@@ -156,7 +156,7 @@ SplitOperationTest::testMultiNodeFailure()
SplitOperation op("storage",
- BucketAndNodes(document::BucketId(16, 1),
+ BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
toVector<uint16_t>(0,1)),
maxSplitBits,
splitCount,
@@ -256,7 +256,7 @@ SplitOperationTest::testCopyTrustedStatusNotCarriedOverAfterSplit()
"2=550/60/70000");
SplitOperation op("storage",
- BucketAndNodes(sourceBucket, toVector<uint16_t>(0, 1)),
+ BucketAndNodes(makeDocumentBucket(sourceBucket), toVector<uint16_t>(0, 1)),
maxSplitBits,
splitCount,
splitByteSize);
@@ -329,7 +329,7 @@ SplitOperationTest::testOperationBlockedByPendingJoin()
insertBucketInfo(joinTarget, 0, 0xabc, 1000, 1234, 250);
SplitOperation op("storage",
- BucketAndNodes(joinTarget, toVector<uint16_t>(0)),
+ BucketAndNodes(makeDocumentBucket(joinTarget), toVector<uint16_t>(0)),
maxSplitBits,
splitCount,
splitByteSize);
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 3dfe5103654..fa00a0b74b8 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -518,7 +518,7 @@ Distributor::checkBucketForSplit(const BucketDatabase::Entry& e,
SplitChecker checker(priority);
for (uint32_t i = 0; i < e->getNodeCount(); ++i) {
_pendingMessageTracker.checkPendingMessages(e->getNodeRef(i).getNode(),
- e.getBucketId(),
+ document::Bucket(document::BucketSpace::placeHolder(), e.getBucketId()),
checker);
if (checker.found) {
return;
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
index b1fa0d98bb0..dafc2b71e6a 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
@@ -286,7 +286,7 @@ PutOperation::onStart(DistributorMessageSender& sender)
for (size_t i = 0; i < targets.size(); ++i) {
if (_manager.getDistributor().getPendingMessageTracker().
hasPendingMessage(targets[i].getNode().getIndex(),
- targets[i].getBucketId(),
+ targets[i].getBucket(),
api::MessageType::DELETEBUCKET_ID))
{
_tracker.fail(sender, api::ReturnCode(api::ReturnCode::BUCKET_DELETED,
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
index 2315c7ab745..3095dce7b87 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
@@ -11,6 +11,7 @@ LOG_SETUP(".distributor.operation");
using namespace storage;
using namespace storage::distributor;
+using document::BucketSpace;
const uint32_t IdealStateOperation::MAINTENANCE_MESSAGE_TYPES[] =
{
@@ -35,21 +36,28 @@ IdealStateOperation::~IdealStateOperation()
{
}
-BucketAndNodes::BucketAndNodes(const document::BucketId& id, uint16_t node)
- : _id(id)
+BucketAndNodes::BucketAndNodes(const document::Bucket &bucket, uint16_t node)
+ : _bucket(bucket)
{
_nodes.push_back(node);
}
-BucketAndNodes::BucketAndNodes(const document::BucketId& id,
+BucketAndNodes::BucketAndNodes(const document::Bucket &bucket,
const std::vector<uint16_t>& nodes)
- : _id(id),
+ : _bucket(bucket),
_nodes(nodes)
{
assert(!nodes.empty());
std::sort(_nodes.begin(), _nodes.end());
}
+void
+BucketAndNodes::setBucketId(const document::BucketId &id)
+{
+ document::Bucket newBucket(_bucket.getBucketSpace(), id);
+ _bucket = newBucket;
+}
+
std::string
BucketAndNodes::toString() const
{
@@ -65,7 +73,7 @@ BucketAndNodes::toString() const
}
ost << "] ";
- ost << _id;
+ ost << _bucket.toString();
return ost.str();
}
@@ -174,23 +182,41 @@ public:
}
};
+bool
+checkNullBucketRequestBucketInfoMessage(uint16_t node,
+ document::BucketSpace bucketSpace,
+ const PendingMessageTracker& tracker)
+{
+ RequestBucketInfoChecker rchk;
+ for (;;) {
+ // Check messages sent to null-bucket (i.e. any bucket) for the node.
+ document::Bucket nullBucket(bucketSpace, document::BucketId());
+ tracker.checkPendingMessages(node, nullBucket, rchk);
+ if (rchk.blocked) {
+ return true;
+ }
+ if (bucketSpace == BucketSpace::placeHolder()) {
+ break;
+ }
+ bucketSpace = BucketSpace::placeHolder();
+ }
+ return false;
+}
+
}
bool
-IdealStateOperation::checkBlock(const document::BucketId& bId,
+IdealStateOperation::checkBlock(const document::Bucket &bucket,
const PendingMessageTracker& tracker) const
{
IdealStateOpChecker ichk(*this);
- RequestBucketInfoChecker rchk;
const std::vector<uint16_t>& nodes(getNodes());
- for (size_t i = 0; i < nodes.size(); ++i) {
- tracker.checkPendingMessages(nodes[i], bId, ichk);
+ for (auto node : nodes) {
+ tracker.checkPendingMessages(node, bucket, ichk);
if (ichk.blocked) {
return true;
}
- // Check messages sent to null-bucket (i.e. any bucket) for the node.
- tracker.checkPendingMessages(nodes[i], document::BucketId(), rchk);
- if (rchk.blocked) {
+ if (checkNullBucketRequestBucketInfoMessage(node, bucket.getBucketSpace(), tracker)) {
return true;
}
}
@@ -199,21 +225,18 @@ IdealStateOperation::checkBlock(const document::BucketId& bId,
bool
IdealStateOperation::checkBlockForAllNodes(
- const document::BucketId& bid,
+ const document::Bucket &bucket,
const PendingMessageTracker& tracker) const
{
IdealStateOpChecker ichk(*this);
// Check messages sent to _any node_ for _this_ particular bucket.
- tracker.checkPendingMessages(bid, ichk);
+ tracker.checkPendingMessages(bucket, ichk);
if (ichk.blocked) {
return true;
}
- RequestBucketInfoChecker rchk;
- // Check messages sent to null-bucket (i.e. _any bucket_) for the node.
const std::vector<uint16_t>& nodes(getNodes());
- for (size_t i = 0; i < nodes.size(); ++i) {
- tracker.checkPendingMessages(nodes[i], document::BucketId(), rchk);
- if (rchk.blocked) {
+ for (auto node : nodes) {
+ if (checkNullBucketRequestBucketInfoMessage(node, bucket.getBucketSpace(), tracker)) {
return true;
}
}
@@ -224,7 +247,7 @@ IdealStateOperation::checkBlockForAllNodes(
bool
IdealStateOperation::isBlocked(const PendingMessageTracker& tracker) const
{
- return checkBlock(getBucketId(), tracker);
+ return checkBlock(getBucket(), tracker);
}
std::string
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
index e0d96296b83..54a85b1873f 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
@@ -32,7 +32,7 @@ public:
@param id Target bucket
@param node Target node
*/
- BucketAndNodes(const document::BucketId& id, uint16_t node);
+ BucketAndNodes(const document::Bucket &id, uint16_t node);
/**
Constructor for operations with multiple target nodes.
@@ -40,7 +40,7 @@ public:
@param id Target bucket
@param nodes Target nodes
*/
- BucketAndNodes(const document::BucketId& id,
+ BucketAndNodes(const document::Bucket &id,
const std::vector<uint16_t>& nodes);
/**
@@ -48,16 +48,16 @@ public:
@param id The new target bucket
*/
- void setBucketId(const document::BucketId& id) { _id = id; }
+ void setBucketId(const document::BucketId &id);
/**
Returns the target bucket.
@return Returns the target bucket.
*/
- const document::BucketId& getBucketId() const { return _id; }
+ document::BucketId getBucketId() const { return _bucket.getBucketId(); }
- document::Bucket getBucket() const { return document::Bucket(document::BucketSpace::placeHolder(), _id); }
+ document::Bucket getBucket() const { return _bucket; }
/**
Returns the target nodes
@@ -81,7 +81,7 @@ public:
std::string toString() const;
private:
- document::BucketId _id;
+ document::Bucket _bucket;
std::vector<uint16_t> _nodes;
};
@@ -142,7 +142,7 @@ public:
@return The target bucket.
*/
- const document::BucketId& getBucketId() const { return _bucketAndNodes.getBucketId(); }
+ document::BucketId getBucketId() const { return _bucketAndNodes.getBucketId(); }
document::Bucket getBucket() const { return _bucketAndNodes.getBucket(); }
@@ -237,8 +237,8 @@ protected:
* operations to other nodes for this bucket, these will not be part of
* the set of messages checked.
*/
- bool checkBlock(const document::BucketId& bId, const PendingMessageTracker& tracker) const;
- bool checkBlockForAllNodes(const document::BucketId& bId, const PendingMessageTracker& tracker) const;
+ bool checkBlock(const document::Bucket &bucket, const PendingMessageTracker& tracker) const;
+ bool checkBlockForAllNodes(const document::Bucket &bucket, const PendingMessageTracker& tracker) const;
};
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
index a73048e822a..aeca655d825 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
@@ -142,11 +142,17 @@ JoinOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP&
}
}
+document::Bucket
+JoinOperation::getJoinBucket(size_t idx) const
+{
+ return document::Bucket(getBucket().getBucketSpace(), _bucketsToJoin[idx]);
+}
+
bool
JoinOperation::isBlocked(const PendingMessageTracker& tracker) const
{
- return (checkBlock(getBucketId(), tracker) ||
- checkBlock(_bucketsToJoin[0], tracker) ||
- (_bucketsToJoin.size() > 1 && checkBlock(_bucketsToJoin[1], tracker)));
+ return (checkBlock(getBucket(), tracker) ||
+ checkBlock(getJoinBucket(0), tracker) ||
+ (_bucketsToJoin.size() > 1 && checkBlock(getJoinBucket(1), tracker)));
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h
index ec3f36650fe..55a7c05264a 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h
@@ -55,6 +55,8 @@ protected:
*/
bool enqueueJoinMessagePerTargetNode(const NodeToBuckets& nodeToBuckets);
+ document::Bucket getJoinBucket(size_t idx) const;
+
MessageTracker _tracker;
std::vector<document::BucketId> _bucketsToJoin;
};
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
index 1b161b2c5f7..e889dbe279b 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
@@ -228,7 +228,7 @@ MergeOperation::deleteSourceOnlyNodes(
_removeOperation.reset(
new RemoveBucketOperation(
_manager->getDistributorComponent().getClusterName(),
- BucketAndNodes(getBucketId(), sourceOnlyNodes)));
+ BucketAndNodes(getBucket(), sourceOnlyNodes)));
// Must not send removes to source only copies if something has caused
// pending load to the copy after the merge was sent!
if (_removeOperation->isBlocked(sender.getPendingMessageTracker())) {
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
index 1e6fc84adbd..d7cb42d63bd 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
@@ -166,7 +166,7 @@ SplitOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP
bool
SplitOperation::isBlocked(const PendingMessageTracker& tracker) const
{
- return checkBlockForAllNodes(getBucketId(), tracker);
+ return checkBlockForAllNodes(getBucket(), tracker);
}
bool
diff --git a/storage/src/vespa/storage/distributor/operationtargetresolver.cpp b/storage/src/vespa/storage/distributor/operationtargetresolver.cpp
index a862ffadc6f..f537c6b67b6 100644
--- a/storage/src/vespa/storage/distributor/operationtargetresolver.cpp
+++ b/storage/src/vespa/storage/distributor/operationtargetresolver.cpp
@@ -6,6 +6,12 @@
namespace storage {
namespace distributor {
+document::Bucket
+OperationTarget::getBucket() const
+{
+ return document::Bucket(document::BucketSpace::placeHolder(), _bucket);
+}
+
void
OperationTarget::print(vespalib::asciistream& out, const PrintProperties&) const {
out << "OperationTarget(" << _bucket << ", " << _node
diff --git a/storage/src/vespa/storage/distributor/operationtargetresolver.h b/storage/src/vespa/storage/distributor/operationtargetresolver.h
index 55202160fea..b9f7537b5f5 100644
--- a/storage/src/vespa/storage/distributor/operationtargetresolver.h
+++ b/storage/src/vespa/storage/distributor/operationtargetresolver.h
@@ -6,7 +6,7 @@
*/
#pragma once
-#include <vespa/document/bucket/bucketid.h>
+#include <vespa/document/bucket/bucket.h>
#include <vespa/vdslib/state/node.h>
#include <vespa/vespalib/util/printable.h>
@@ -25,6 +25,7 @@ public:
: _bucket(id), _node(node), _newCopy(newCopy) {}
const document::BucketId& getBucketId() const { return _bucket; }
+ document::Bucket getBucket() const;
const lib::Node& getNode() const { return _node; }
bool isNewCopy() const { return _newCopy; }
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
index aeadca17d13..ac3dd1ce25f 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
@@ -202,36 +202,36 @@ runCheckerOnRange(PendingMessageTracker::Checker& checker, const Range& range)
void
PendingMessageTracker::checkPendingMessages(uint16_t node,
- const document::BucketId& bid,
+ const document::Bucket &bucket,
Checker& checker) const
{
vespalib::LockGuard guard(_lock);
const MessagesByNodeAndBucket& msgs(boost::multi_index::get<1>(_messages));
- auto range = pairAsRange(msgs.equal_range(boost::make_tuple(node, bid)));
+ auto range = pairAsRange(msgs.equal_range(boost::make_tuple(node, bucket.getBucketId())));
runCheckerOnRange(checker, range);
}
void
-PendingMessageTracker::checkPendingMessages(const document::BucketId& bid,
+PendingMessageTracker::checkPendingMessages(const document::Bucket &bucket,
Checker& checker) const
{
vespalib::LockGuard guard(_lock);
const MessagesByBucketAndType& msgs(boost::multi_index::get<2>(_messages));
- auto range = pairAsRange(msgs.equal_range(boost::make_tuple(bid)));
+ auto range = pairAsRange(msgs.equal_range(boost::make_tuple(bucket.getBucketId())));
runCheckerOnRange(checker, range);
}
bool
PendingMessageTracker::hasPendingMessage(uint16_t node,
- const document::BucketId& bid,
+ const document::Bucket &bucket,
uint32_t messageType) const
{
vespalib::LockGuard guard(_lock);
const MessagesByNodeAndBucket& msgs(boost::multi_index::get<1>(_messages));
- auto range = msgs.equal_range(boost::make_tuple(node, bid, messageType));
+ auto range = msgs.equal_range(boost::make_tuple(node, bucket.getBucketId(), messageType));
return (range.first != range.second);
}
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
index 9b49e94d51f..c1c3e80114b 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
@@ -75,7 +75,7 @@ public:
* Breaks when the checker returns false.
*/
void checkPendingMessages(uint16_t node,
- const document::BucketId& bid,
+ const document::Bucket &bucket,
Checker& checker) const;
/**
@@ -83,7 +83,7 @@ public:
* and invokes the given checker with the node, message type and priority.
* Breaks when the checker returns false.
*/
- void checkPendingMessages(const document::BucketId& bid,
+ void checkPendingMessages(const document::Bucket &bucket,
Checker& checker) const;
/**
@@ -91,7 +91,7 @@ public:
* messageType pending to bucket bid on the given node.
*/
bool hasPendingMessage(uint16_t node,
- const document::BucketId& bid,
+ const document::Bucket &bucket,
uint32_t messageType) const;
/**
diff --git a/storage/src/vespa/storage/distributor/statechecker.h b/storage/src/vespa/storage/distributor/statechecker.h
index a8e198427f7..fbadd5642d4 100644
--- a/storage/src/vespa/storage/distributor/statechecker.h
+++ b/storage/src/vespa/storage/distributor/statechecker.h
@@ -82,6 +82,8 @@ public:
return siblingEntry;
}
+ document::Bucket getBucket() const { return document::Bucket(document::BucketSpace::placeHolder(), bucketId); }
+
std::string toString() const;
};
diff --git a/storage/src/vespa/storage/distributor/statecheckers.cpp b/storage/src/vespa/storage/distributor/statecheckers.cpp
index 701a147a56f..35d111a8c38 100644
--- a/storage/src/vespa/storage/distributor/statecheckers.cpp
+++ b/storage/src/vespa/storage/distributor/statecheckers.cpp
@@ -15,6 +15,8 @@
#include <vespa/log/log.h>
LOG_SETUP(".distributor.operation.checkers");
+using document::BucketSpace;
+
namespace storage {
namespace distributor {
@@ -84,7 +86,7 @@ SplitBucketStateChecker::generateMinimumBucketSplitOperation(
{
IdealStateOperation::UP so(new SplitOperation(
c.component.getClusterName(),
- BucketAndNodes(c.bucketId, c.entry->getNodes()),
+ BucketAndNodes(c.getBucket(), c.entry->getNodes()),
c.distributorConfig.getMinimalBucketSplit(),
0,
0));
@@ -103,7 +105,7 @@ SplitBucketStateChecker::generateMaxSizeExceededSplitOperation(
{
IdealStateOperation::UP so(new SplitOperation(
c.component.getClusterName(),
- BucketAndNodes(c.bucketId, c.entry->getNodes()),
+ BucketAndNodes(c.getBucket(), c.entry->getNodes()),
58,
c.distributorConfig.getSplitCount(),
c.distributorConfig.getSplitSize()));
@@ -420,7 +422,7 @@ bucketHasMultipleChildren(const document::BucketId& bucket,
}
-document::BucketId
+document::Bucket
JoinBucketsStateChecker::computeJoinBucket(const Context& c) const
{
// Always decrease by at least 1 bit, as we could not get here unless this
@@ -443,7 +445,7 @@ JoinBucketsStateChecker::computeJoinBucket(const Context& c) const
--level;
target = candidate;
}
- return target;
+ return document::Bucket(BucketSpace::placeHolder(), target);
}
StateChecker::Result
@@ -455,8 +457,8 @@ JoinBucketsStateChecker::check(StateChecker::Context& c)
return Result::noMaintenanceNeeded();
}
- document::BucketId joinedBucket(computeJoinBucket(c));
- assert(joinedBucket.getUsedBits() < c.bucketId.getUsedBits());
+ document::Bucket joinedBucket(computeJoinBucket(c));
+ assert(joinedBucket.getBucketId().getUsedBits() < c.bucketId.getUsedBits());
std::vector<document::BucketId> sourceBuckets;
if (c.getSiblingEntry().valid()) {
@@ -572,7 +574,7 @@ SplitInconsistentStateChecker::check(StateChecker::Context& c)
IdealStateOperation::UP op(new SplitOperation(
c.component.getClusterName(),
- BucketAndNodes(c.bucketId, c.entry->getNodes()),
+ BucketAndNodes(c.getBucket(), c.entry->getNodes()),
getHighestUsedBits(c.entries),
0,
0));
@@ -842,7 +844,7 @@ SynchronizeAndMoveStateChecker::check(StateChecker::Context& c)
if (result.shouldMerge()) {
IdealStateOperation::UP op(
- new MergeOperation(BucketAndNodes(c.bucketId, result.nodes()),
+ new MergeOperation(BucketAndNodes(c.getBucket(), result.nodes()),
c.distributorConfig.getMaxNodesPerMerge()));
op->setPriority(result.priority());
op->setDetailedReason(result.reason());
@@ -986,7 +988,7 @@ DeleteExtraCopiesStateChecker::check(StateChecker::Context& c)
if (!removedCopies.empty()) {
IdealStateOperation::UP ro(new RemoveBucketOperation(
c.component.getClusterName(),
- BucketAndNodes(c.bucketId, removedCopies)));
+ BucketAndNodes(c.getBucket(), removedCopies)));
ro->setPriority(c.distributorConfig.getMaintenancePriorities()
.deleteBucketCopy);
@@ -1087,7 +1089,7 @@ BucketStateStateChecker::check(StateChecker::Context& c)
}
auto op = std::make_unique<SetBucketStateOperation>(
c.component.getClusterName(),
- BucketAndNodes(c.bucketId, operationNodes),
+ BucketAndNodes(c.getBucket(), operationNodes),
activeNodeIndexes);
// If activeNodes > 1, we're dealing with a active-per-leaf group case and
@@ -1127,7 +1129,7 @@ GarbageCollectionStateChecker::check(Context& c)
IdealStateOperation::UP op(
new GarbageCollectionOperation(
c.component.getClusterName(),
- BucketAndNodes(c.bucketId, c.entry->getNodes())));
+ BucketAndNodes(c.getBucket(), c.entry->getNodes())));
vespalib::asciistream reason;
reason << "[Needs garbage collection: Last check at "
diff --git a/storage/src/vespa/storage/distributor/statecheckers.h b/storage/src/vespa/storage/distributor/statecheckers.h
index f474d75a6b6..4f72d31aac5 100644
--- a/storage/src/vespa/storage/distributor/statecheckers.h
+++ b/storage/src/vespa/storage/distributor/statecheckers.h
@@ -56,7 +56,7 @@ private:
bool smallEnoughToJoin(const Context& c) const;
bool singleBucketJoinIsEnabled(const Context&) const;
bool singleBucketJoinIsConsistent(const Context& c) const;
- document::BucketId computeJoinBucket(const Context& c) const;
+ document::Bucket computeJoinBucket(const Context& c) const;
};
class SplitBucketStateChecker : public StateChecker