summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahoo-inc.com>2017-11-14 16:18:28 +0000
committerTor Brede Vekterli <vekterli@yahoo-inc.com>2017-11-14 16:18:28 +0000
commitb58dee9562801107fafac6bb3fb2d3b8a998d07a (patch)
tree9eb9de29b89c9421d01212805a65e5b217bae8ac /storage
parent3c2a0964ff6b1caed437fba425c40467560b3c37 (diff)
Track merges by Bucket instead of BucketId in MergeThrottler
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp21
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp16
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h6
3 files changed, 16 insertions, 27 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 7d04714e9a8..3d469fc4252 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -313,7 +313,7 @@ MergeThrottlerTest::testChain()
_servers[i]->setClusterState(lib::ClusterState("distributor:100 storage:100 version:123"));
}
- BucketId bid(14, 0x1337);
+ Bucket bucket(makeDocumentBucket(BucketId(14, 0x1337)));
// Use different node permutations to ensure it works no matter which node is
// set as the executor. More specifically, _all_ permutations.
@@ -321,15 +321,11 @@ MergeThrottlerTest::testChain()
uint16_t lastNodeIdx = _storageNodeCount - 1;
uint16_t executorNode = indices[0];
- //std::cout << "\n----\n";
std::vector<MergeBucketCommand::Node> nodes;
for (int i = 0; i < _storageNodeCount; ++i) {
nodes.push_back(MergeBucketCommand::Node(indices[i], (i + executorNode) % 2 == 0));
- //std::cout << indices[i] << " ";
}
- //std::cout << "\n";
- std::shared_ptr<MergeBucketCommand> cmd(
- new MergeBucketCommand(makeDocumentBucket(bid), nodes, UINT_MAX, 123));
+ auto cmd = std::make_shared<MergeBucketCommand>(bucket, nodes, UINT_MAX, 123);
cmd->setPriority(7);
cmd->setTimeout(54321);
StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
@@ -351,8 +347,6 @@ MergeThrottlerTest::testChain()
_topLinks[i]->sendDown(fwd);
_topLinks[i]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
- //std::cout << "fwd " << i << " -> " << i+1 << "\n";
-
// Forwarded merge should not be sent down. Should not be necessary
// to lock throttler here, since it should be sleeping like a champion
CPPUNIT_ASSERT_EQUAL(std::size_t(0), _bottomLinks[i]->getNumCommands());
@@ -363,7 +357,6 @@ MergeThrottlerTest::testChain()
CPPUNIT_ASSERT_EQUAL(uint16_t(i + 1), fwd->getAddress()->getIndex());
CPPUNIT_ASSERT_EQUAL(distributorIndex, dynamic_cast<const StorageCommand&>(*fwd).getSourceIndex());
{
- //uint16_t chain[] = { 0 };
std::vector<uint16_t> chain;
for (int j = 0; j <= i; ++j) {
chain.push_back(j);
@@ -416,10 +409,10 @@ MergeThrottlerTest::testChain()
// The MergeBucketCommand that is kept in the executor node should
// be the one from the node it initially got it from, NOT the one
// from the last node, since the chain has looped
- CPPUNIT_ASSERT(_throttlers[executorNode]->getActiveMerges().find(bid)
+ CPPUNIT_ASSERT(_throttlers[executorNode]->getActiveMerges().find(bucket)
!= _throttlers[executorNode]->getActiveMerges().end());
CPPUNIT_ASSERT_EQUAL(static_cast<StorageMessage*>(fwdToExec.get()),
- _throttlers[executorNode]->getActiveMerges().find(bid)->second.getMergeCmd().get());
+ _throttlers[executorNode]->getActiveMerges().find(bucket)->second.getMergeCmd().get());
}
// Send reply up from persistence layer to simulate a completed
@@ -440,7 +433,7 @@ MergeThrottlerTest::testChain()
// Merge should not be removed yet from executor, since it's pending an unwind
CPPUNIT_ASSERT_EQUAL(std::size_t(1), _throttlers[executorNode]->getActiveMerges().size());
CPPUNIT_ASSERT_EQUAL(static_cast<StorageMessage*>(fwdToExec.get()),
- _throttlers[executorNode]->getActiveMerges().find(bid)->second.getMergeCmd().get());
+ _throttlers[executorNode]->getActiveMerges().find(bucket)->second.getMergeCmd().get());
}
// MergeBucketReply waiting to be sent back to node 2. NOTE: we don't have any
// transport context stuff set up here to perform the reply mapping, so we
@@ -452,8 +445,6 @@ MergeThrottlerTest::testChain()
// eg: 0 -> 2 -> 1 -> 0. Or: 2 -> 1 -> 0 if no cycle
for (int i = (executorNode != lastNodeIdx ? _storageNodeCount - 1 : _storageNodeCount - 2); i >= 0; --i) {
- //std::cout << "unwind " << i << "\n";
-
_topLinks[i]->sendDown(unwind);
_topLinks[i]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime);
@@ -469,7 +460,7 @@ MergeThrottlerTest::testChain()
CPPUNIT_ASSERT_EQUAL(ReturnCode::OK, mbr.getResult().getResult());
CPPUNIT_ASSERT_EQUAL(vespalib::string("Great success! :D-|-<"), mbr.getResult().getMessage());
- CPPUNIT_ASSERT_EQUAL(bid, mbr.getBucketId());
+ CPPUNIT_ASSERT_EQUAL(bucket, mbr.getBucket());
} while (std::next_permutation(indices, indices + _storageNodeCount));
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index be30c459bdf..60dedab5ce8 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -431,7 +431,7 @@ bool
MergeThrottler::isMergeAlreadyKnown(const api::StorageMessage::SP& msg) const
{
auto& mergeCmd = static_cast<const api::MergeBucketCommand&>(*msg);
- return _merges.find(mergeCmd.getBucketId()) != _merges.end();
+ return _merges.find(mergeCmd.getBucket()) != _merges.end();
}
bool
@@ -830,10 +830,8 @@ MergeThrottler::processNewMergeCommand(
// and that we can fit it into our window.
// Register the merge now so that it will contribute to filling up our
// merge throttling window.
- assert(_merges.find(mergeCmd.getBucketId()) == _merges.end());
- auto state = _merges.insert(
- std::make_pair(mergeCmd.getBucketId(),
- ChainedMergeState(msg))).first;
+ assert(_merges.find(mergeCmd.getBucket()) == _merges.end());
+ auto state = _merges.emplace(mergeCmd.getBucket(), ChainedMergeState(msg)).first;
LOG(debug, "Added merge %s to internal state",
mergeCmd.toString().c_str());
@@ -911,7 +909,7 @@ MergeThrottler::processCycledMergeCommand(
MergeNodeSequence nodeSeq(mergeCmd, _component.getIndex());
- auto mergeIter = _merges.find(mergeCmd.getBucketId());
+ auto mergeIter = _merges.find(mergeCmd.getBucket());
assert(mergeIter != _merges.end());
if (mergeIter->second.isAborted()) {
@@ -964,7 +962,7 @@ MergeThrottler::processMergeReply(
{
auto& mergeReply = dynamic_cast<const api::MergeBucketReply&>(*msg);
- auto mergeIter = _merges.find(mergeReply.getBucketId());
+ auto mergeIter = _merges.find(mergeReply.getBucket());
if (mergeIter == _merges.end()) {
LOG(warning, "Received %s, which has no command mapped "
"for it. Cannot send chained reply!",
@@ -1075,7 +1073,7 @@ MergeThrottler::onDown(const std::shared_ptr<api::StorageMessage>& msg)
} else if (isDiffCommand(*msg)) {
vespalib::LockGuard lock(_stateLock);
auto& cmd = static_cast<api::StorageCommand&>(*msg);
- if (bucketIsUnknownOrAborted(cmd.getBucketId())) {
+ if (bucketIsUnknownOrAborted(cmd.getBucket())) {
sendUp(makeAbortReply(cmd, "no state recorded for bucket in merge "
"throttler, source merge probably aborted earlier"));
return true;
@@ -1104,7 +1102,7 @@ MergeThrottler::isMergeReply(const api::StorageMessage& msg) const
}
bool
-MergeThrottler::bucketIsUnknownOrAborted(const document::BucketId& bucket) const
+MergeThrottler::bucketIsUnknownOrAborted(const document::Bucket& bucket) const
{
auto it = _merges.find(bucket);
if (it == _merges.end()) {
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index 69fdfdc1b95..d62e9a042b2 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -13,7 +13,7 @@
#include <vespa/storage/distributor/messageguard.h>
#include <vespa/storageframework/generic/status/htmlstatusreporter.h>
#include <vespa/storageapi/message/bucket.h>
-#include <vespa/document/bucket/bucketid.h>
+#include <vespa/document/bucket/bucket.h>
#include <vespa/vespalib/util/document_runnable.h>
#include <vespa/messagebus/staticthrottlepolicy.h>
#include <vespa/metrics/metrics.h>
@@ -134,7 +134,7 @@ private:
const std::string& getMergeCmdString() const { return _cmdString; }
};
- typedef std::map<document::BucketId, ChainedMergeState> ActiveMergeMap;
+ typedef std::map<document::Bucket, ChainedMergeState> ActiveMergeMap;
// Use a set rather than a priority_queue, since we want to be
// able to iterate over the collection during status rendering
@@ -371,7 +371,7 @@ private:
bool isDiffCommand(const api::StorageMessage& msg) const;
bool isMergeCommand(const api::StorageMessage& msg) const;
bool isMergeReply(const api::StorageMessage& msg) const;
- bool bucketIsUnknownOrAborted(const document::BucketId& bucket) const;
+ bool bucketIsUnknownOrAborted(const document::Bucket& bucket) const;
std::shared_ptr<api::StorageMessage> makeAbortReply(
api::StorageCommand& cmd,