summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@oath.com>2017-10-26 12:28:44 +0000
committerTor Egge <Tor.Egge@oath.com>2017-10-26 12:31:55 +0000
commitc5518c77b93ce135fc02f28b121614074d772392 (patch)
treee00dc8387513ecd804d60e8517e1da9d766b53ef
parent4184bb24b0e5cead726f204457d34103ee6d3234 (diff)
Use document::Bucket instead of document::BucketId in api
checking for pending messages in PendingMessageTracker. Bucket space portion is still ignored by implementation.
-rw-r--r--storage/src/tests/distributor/idealstatemanagertest.cpp4
-rw-r--r--storage/src/tests/distributor/pendingmessagetrackertest.cpp22
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp31
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h4
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp12
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h2
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operationtargetresolver.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/operationtargetresolver.h3
-rw-r--r--storage/src/vespa/storage/distributor/pendingmessagetracker.cpp12
-rw-r--r--storage/src/vespa/storage/distributor/pendingmessagetracker.h6
13 files changed, 70 insertions, 38 deletions
diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp
index 4f9d8bdf4d4..0c695f9a3d4 100644
--- a/storage/src/tests/distributor/idealstatemanagertest.cpp
+++ b/storage/src/tests/distributor/idealstatemanagertest.cpp
@@ -253,9 +253,9 @@ IdealStateManagerTest::testBlockCheckForAllOperationsToSpecificBucket()
RemoveBucketOperation op("storage",
BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7)));
// Not blocked for exact node match.
- CPPUNIT_ASSERT(!op.checkBlock(bid, tracker));
+ CPPUNIT_ASSERT(!op.checkBlock(makeDocumentBucket(bid), tracker));
// But blocked for bucket match!
- CPPUNIT_ASSERT(op.checkBlockForAllNodes(bid, tracker));
+ CPPUNIT_ASSERT(op.checkBlockForAllNodes(makeDocumentBucket(bid), tracker));
}
}
diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
index 6aba8e1fb98..8e9c016ea54 100644
--- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp
+++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
@@ -445,19 +445,19 @@ PendingMessageTrackerTest::testGetPendingMessageTypes()
{
TestChecker checker;
- tracker.checkPendingMessages(0, bid, checker);
+ tracker.checkPendingMessages(0, makeDocumentBucket(bid), checker);
CPPUNIT_ASSERT_EQUAL(127, (int)checker.pri);
}
{
TestChecker checker;
- tracker.checkPendingMessages(0, document::BucketId(16, 1235), checker);
+ tracker.checkPendingMessages(0, makeDocumentBucket(document::BucketId(16, 1235)), checker);
CPPUNIT_ASSERT_EQUAL(255, (int)checker.pri);
}
{
TestChecker checker;
- tracker.checkPendingMessages(1, bid, checker);
+ tracker.checkPendingMessages(1, makeDocumentBucket(bid), checker);
CPPUNIT_ASSERT_EQUAL(255, (int)checker.pri);
}
}
@@ -472,7 +472,7 @@ PendingMessageTrackerTest::testHasPendingMessage()
PendingMessageTracker tracker(compReg);
document::BucketId bid(16, 1234);
- CPPUNIT_ASSERT(!tracker.hasPendingMessage(1, bid, api::MessageType::REMOVE_ID));
+ CPPUNIT_ASSERT(!tracker.hasPendingMessage(1, makeDocumentBucket(bid), api::MessageType::REMOVE_ID));
{
std::shared_ptr<api::RemoveCommand> remove(
@@ -484,13 +484,13 @@ PendingMessageTrackerTest::testHasPendingMessage()
tracker.insert(remove);
}
- CPPUNIT_ASSERT(tracker.hasPendingMessage(1, bid, api::MessageType::REMOVE_ID));
- CPPUNIT_ASSERT(!tracker.hasPendingMessage(0, bid, api::MessageType::REMOVE_ID));
- CPPUNIT_ASSERT(!tracker.hasPendingMessage(2, bid, api::MessageType::REMOVE_ID));
+ CPPUNIT_ASSERT(tracker.hasPendingMessage(1, makeDocumentBucket(bid), api::MessageType::REMOVE_ID));
+ CPPUNIT_ASSERT(!tracker.hasPendingMessage(0, makeDocumentBucket(bid), api::MessageType::REMOVE_ID));
+ CPPUNIT_ASSERT(!tracker.hasPendingMessage(2, makeDocumentBucket(bid), api::MessageType::REMOVE_ID));
CPPUNIT_ASSERT(!tracker.hasPendingMessage(1,
- document::BucketId(16, 1233),
+ makeDocumentBucket(document::BucketId(16, 1233)),
api::MessageType::REMOVE_ID));
- CPPUNIT_ASSERT(!tracker.hasPendingMessage(1, bid, api::MessageType::DELETEBUCKET_ID));
+ CPPUNIT_ASSERT(!tracker.hasPendingMessage(1, makeDocumentBucket(bid), api::MessageType::DELETEBUCKET_ID));
}
namespace {
@@ -527,7 +527,7 @@ PendingMessageTrackerTest::testGetAllMessagesForSingleBucket()
{
OperationEnumerator enumerator;
- tracker.checkPendingMessages(document::BucketId(16, 1234), enumerator);
+ tracker.checkPendingMessages(makeDocumentBucket(document::BucketId(16, 1234)), enumerator);
CPPUNIT_ASSERT_EQUAL(std::string("Remove -> 0\n"
"Remove -> 0\n"
"Remove -> 1\n"
@@ -536,7 +536,7 @@ PendingMessageTrackerTest::testGetAllMessagesForSingleBucket()
}
{
OperationEnumerator enumerator;
- tracker.checkPendingMessages(document::BucketId(16, 9876), enumerator);
+ tracker.checkPendingMessages(makeDocumentBucket(document::BucketId(16, 9876)), enumerator);
CPPUNIT_ASSERT_EQUAL(std::string(""), enumerator.str());
}
}
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 3dfe5103654..fa00a0b74b8 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -518,7 +518,7 @@ Distributor::checkBucketForSplit(const BucketDatabase::Entry& e,
SplitChecker checker(priority);
for (uint32_t i = 0; i < e->getNodeCount(); ++i) {
_pendingMessageTracker.checkPendingMessages(e->getNodeRef(i).getNode(),
- e.getBucketId(),
+ document::Bucket(document::BucketSpace::placeHolder(), e.getBucketId()),
checker);
if (checker.found) {
return;
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
index b1fa0d98bb0..dafc2b71e6a 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
@@ -286,7 +286,7 @@ PutOperation::onStart(DistributorMessageSender& sender)
for (size_t i = 0; i < targets.size(); ++i) {
if (_manager.getDistributor().getPendingMessageTracker().
hasPendingMessage(targets[i].getNode().getIndex(),
- targets[i].getBucketId(),
+ targets[i].getBucket(),
api::MessageType::DELETEBUCKET_ID))
{
_tracker.fail(sender, api::ReturnCode(api::ReturnCode::BUCKET_DELETED,
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
index 09ea3566883..2b6739623ec 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
@@ -11,6 +11,7 @@ LOG_SETUP(".distributor.operation");
using namespace storage;
using namespace storage::distributor;
+using document::BucketSpace;
const uint32_t IdealStateOperation::MAINTENANCE_MESSAGE_TYPES[] =
{
@@ -184,34 +185,42 @@ public:
}
bool
-IdealStateOperation::checkBlock(const document::BucketId& bId,
+IdealStateOperation::checkBlock(const document::Bucket &bucket,
const PendingMessageTracker& tracker) const
{
IdealStateOpChecker ichk(*this);
RequestBucketInfoChecker rchk;
const std::vector<uint16_t>& nodes(getNodes());
for (size_t i = 0; i < nodes.size(); ++i) {
- tracker.checkPendingMessages(nodes[i], bId, ichk);
+ tracker.checkPendingMessages(nodes[i], bucket, ichk);
if (ichk.blocked) {
return true;
}
// Check messages sent to null-bucket (i.e. any bucket) for the node.
- tracker.checkPendingMessages(nodes[i], document::BucketId(), rchk);
+ document::Bucket nullBucket(bucket.getBucketSpace(), document::BucketId());
+ tracker.checkPendingMessages(nodes[i], nullBucket, rchk);
if (rchk.blocked) {
return true;
}
+ if (bucket.getBucketSpace() != BucketSpace::placeHolder()) {
+ nullBucket = document::Bucket(BucketSpace::placeHolder(), document::BucketId());
+ tracker.checkPendingMessages(nodes[i], nullBucket, rchk);
+ if (rchk.blocked) {
+ return true;
+ }
+ }
}
return false;
}
bool
IdealStateOperation::checkBlockForAllNodes(
- const document::BucketId& bid,
+ const document::Bucket &bucket,
const PendingMessageTracker& tracker) const
{
IdealStateOpChecker ichk(*this);
// Check messages sent to _any node_ for _this_ particular bucket.
- tracker.checkPendingMessages(bid, ichk);
+ tracker.checkPendingMessages(bucket, ichk);
if (ichk.blocked) {
return true;
}
@@ -219,10 +228,18 @@ IdealStateOperation::checkBlockForAllNodes(
// Check messages sent to null-bucket (i.e. _any bucket_) for the node.
const std::vector<uint16_t>& nodes(getNodes());
for (size_t i = 0; i < nodes.size(); ++i) {
- tracker.checkPendingMessages(nodes[i], document::BucketId(), rchk);
+ document::Bucket nullBucket(bucket.getBucketSpace(), document::BucketId());
+ tracker.checkPendingMessages(nodes[i], nullBucket, rchk);
if (rchk.blocked) {
return true;
}
+ if (bucket.getBucketSpace() != BucketSpace::placeHolder()) {
+ nullBucket = document::Bucket(BucketSpace::placeHolder(), document::BucketId());
+ tracker.checkPendingMessages(nodes[i], nullBucket, rchk);
+ if (rchk.blocked) {
+ return true;
+ }
+ }
}
return false;
}
@@ -231,7 +248,7 @@ IdealStateOperation::checkBlockForAllNodes(
bool
IdealStateOperation::isBlocked(const PendingMessageTracker& tracker) const
{
- return checkBlock(getBucketId(), tracker);
+ return checkBlock(getBucket(), tracker);
}
std::string
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
index 52f65d8781b..54a85b1873f 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
@@ -237,8 +237,8 @@ protected:
* operations to other nodes for this bucket, these will not be part of
* the set of messages checked.
*/
- bool checkBlock(const document::BucketId& bId, const PendingMessageTracker& tracker) const;
- bool checkBlockForAllNodes(const document::BucketId& bId, const PendingMessageTracker& tracker) const;
+ bool checkBlock(const document::Bucket &bucket, const PendingMessageTracker& tracker) const;
+ bool checkBlockForAllNodes(const document::Bucket &bucket, const PendingMessageTracker& tracker) const;
};
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
index a73048e822a..aeca655d825 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
@@ -142,11 +142,17 @@ JoinOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP&
}
}
+document::Bucket
+JoinOperation::getJoinBucket(size_t idx) const
+{
+ return document::Bucket(getBucket().getBucketSpace(), _bucketsToJoin[idx]);
+}
+
bool
JoinOperation::isBlocked(const PendingMessageTracker& tracker) const
{
- return (checkBlock(getBucketId(), tracker) ||
- checkBlock(_bucketsToJoin[0], tracker) ||
- (_bucketsToJoin.size() > 1 && checkBlock(_bucketsToJoin[1], tracker)));
+ return (checkBlock(getBucket(), tracker) ||
+ checkBlock(getJoinBucket(0), tracker) ||
+ (_bucketsToJoin.size() > 1 && checkBlock(getJoinBucket(1), tracker)));
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h
index ec3f36650fe..55a7c05264a 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h
@@ -55,6 +55,8 @@ protected:
*/
bool enqueueJoinMessagePerTargetNode(const NodeToBuckets& nodeToBuckets);
+ document::Bucket getJoinBucket(size_t idx) const;
+
MessageTracker _tracker;
std::vector<document::BucketId> _bucketsToJoin;
};
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
index 1e6fc84adbd..d7cb42d63bd 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
@@ -166,7 +166,7 @@ SplitOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP
bool
SplitOperation::isBlocked(const PendingMessageTracker& tracker) const
{
- return checkBlockForAllNodes(getBucketId(), tracker);
+ return checkBlockForAllNodes(getBucket(), tracker);
}
bool
diff --git a/storage/src/vespa/storage/distributor/operationtargetresolver.cpp b/storage/src/vespa/storage/distributor/operationtargetresolver.cpp
index a862ffadc6f..f537c6b67b6 100644
--- a/storage/src/vespa/storage/distributor/operationtargetresolver.cpp
+++ b/storage/src/vespa/storage/distributor/operationtargetresolver.cpp
@@ -6,6 +6,12 @@
namespace storage {
namespace distributor {
+document::Bucket
+OperationTarget::getBucket() const
+{
+ return document::Bucket(document::BucketSpace::placeHolder(), _bucket);
+}
+
void
OperationTarget::print(vespalib::asciistream& out, const PrintProperties&) const {
out << "OperationTarget(" << _bucket << ", " << _node
diff --git a/storage/src/vespa/storage/distributor/operationtargetresolver.h b/storage/src/vespa/storage/distributor/operationtargetresolver.h
index 55202160fea..b9f7537b5f5 100644
--- a/storage/src/vespa/storage/distributor/operationtargetresolver.h
+++ b/storage/src/vespa/storage/distributor/operationtargetresolver.h
@@ -6,7 +6,7 @@
*/
#pragma once
-#include <vespa/document/bucket/bucketid.h>
+#include <vespa/document/bucket/bucket.h>
#include <vespa/vdslib/state/node.h>
#include <vespa/vespalib/util/printable.h>
@@ -25,6 +25,7 @@ public:
: _bucket(id), _node(node), _newCopy(newCopy) {}
const document::BucketId& getBucketId() const { return _bucket; }
+ document::Bucket getBucket() const;
const lib::Node& getNode() const { return _node; }
bool isNewCopy() const { return _newCopy; }
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
index aeadca17d13..ac3dd1ce25f 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
@@ -202,36 +202,36 @@ runCheckerOnRange(PendingMessageTracker::Checker& checker, const Range& range)
void
PendingMessageTracker::checkPendingMessages(uint16_t node,
- const document::BucketId& bid,
+ const document::Bucket &bucket,
Checker& checker) const
{
vespalib::LockGuard guard(_lock);
const MessagesByNodeAndBucket& msgs(boost::multi_index::get<1>(_messages));
- auto range = pairAsRange(msgs.equal_range(boost::make_tuple(node, bid)));
+ auto range = pairAsRange(msgs.equal_range(boost::make_tuple(node, bucket.getBucketId())));
runCheckerOnRange(checker, range);
}
void
-PendingMessageTracker::checkPendingMessages(const document::BucketId& bid,
+PendingMessageTracker::checkPendingMessages(const document::Bucket &bucket,
Checker& checker) const
{
vespalib::LockGuard guard(_lock);
const MessagesByBucketAndType& msgs(boost::multi_index::get<2>(_messages));
- auto range = pairAsRange(msgs.equal_range(boost::make_tuple(bid)));
+ auto range = pairAsRange(msgs.equal_range(boost::make_tuple(bucket.getBucketId())));
runCheckerOnRange(checker, range);
}
bool
PendingMessageTracker::hasPendingMessage(uint16_t node,
- const document::BucketId& bid,
+ const document::Bucket &bucket,
uint32_t messageType) const
{
vespalib::LockGuard guard(_lock);
const MessagesByNodeAndBucket& msgs(boost::multi_index::get<1>(_messages));
- auto range = msgs.equal_range(boost::make_tuple(node, bid, messageType));
+ auto range = msgs.equal_range(boost::make_tuple(node, bucket.getBucketId(), messageType));
return (range.first != range.second);
}
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
index 9b49e94d51f..c1c3e80114b 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
@@ -75,7 +75,7 @@ public:
* Breaks when the checker returns false.
*/
void checkPendingMessages(uint16_t node,
- const document::BucketId& bid,
+ const document::Bucket &bucket,
Checker& checker) const;
/**
@@ -83,7 +83,7 @@ public:
* and invokes the given checker with the node, message type and priority.
* Breaks when the checker returns false.
*/
- void checkPendingMessages(const document::BucketId& bid,
+ void checkPendingMessages(const document::Bucket &bucket,
Checker& checker) const;
/**
@@ -91,7 +91,7 @@ public:
* messageType pending to bucket bid on the given node.
*/
bool hasPendingMessage(uint16_t node,
- const document::BucketId& bid,
+ const document::Bucket &bucket,
uint32_t messageType) const;
/**