diff options
author | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-11-14 16:18:28 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-11-14 16:18:28 +0000 |
commit | b58dee9562801107fafac6bb3fb2d3b8a998d07a (patch) | |
tree | 9eb9de29b89c9421d01212805a65e5b217bae8ac /storage | |
parent | 3c2a0964ff6b1caed437fba425c40467560b3c37 (diff) |
Track merges by Bucket instead of BucketId in MergeThrottler
Diffstat (limited to 'storage')
3 files changed, 16 insertions, 27 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index 7d04714e9a8..3d469fc4252 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -313,7 +313,7 @@ MergeThrottlerTest::testChain() _servers[i]->setClusterState(lib::ClusterState("distributor:100 storage:100 version:123")); } - BucketId bid(14, 0x1337); + Bucket bucket(makeDocumentBucket(BucketId(14, 0x1337))); // Use different node permutations to ensure it works no matter which node is // set as the executor. More specifically, _all_ permutations. @@ -321,15 +321,11 @@ MergeThrottlerTest::testChain() uint16_t lastNodeIdx = _storageNodeCount - 1; uint16_t executorNode = indices[0]; - //std::cout << "\n----\n"; std::vector<MergeBucketCommand::Node> nodes; for (int i = 0; i < _storageNodeCount; ++i) { nodes.push_back(MergeBucketCommand::Node(indices[i], (i + executorNode) % 2 == 0)); - //std::cout << indices[i] << " "; } - //std::cout << "\n"; - std::shared_ptr<MergeBucketCommand> cmd( - new MergeBucketCommand(makeDocumentBucket(bid), nodes, UINT_MAX, 123)); + auto cmd = std::make_shared<MergeBucketCommand>(bucket, nodes, UINT_MAX, 123); cmd->setPriority(7); cmd->setTimeout(54321); StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); @@ -351,8 +347,6 @@ MergeThrottlerTest::testChain() _topLinks[i]->sendDown(fwd); _topLinks[i]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); - //std::cout << "fwd " << i << " -> " << i+1 << "\n"; - // Forwarded merge should not be sent down. Should not be necessary // to lock throttler here, since it should be sleeping like a champion CPPUNIT_ASSERT_EQUAL(std::size_t(0), _bottomLinks[i]->getNumCommands()); @@ -363,7 +357,6 @@ MergeThrottlerTest::testChain() CPPUNIT_ASSERT_EQUAL(uint16_t(i + 1), fwd->getAddress()->getIndex()); CPPUNIT_ASSERT_EQUAL(distributorIndex, dynamic_cast<const StorageCommand&>(*fwd).getSourceIndex()); { - //uint16_t chain[] = { 0 }; std::vector<uint16_t> chain; for (int j = 0; j <= i; ++j) { chain.push_back(j); @@ -416,10 +409,10 @@ MergeThrottlerTest::testChain() // The MergeBucketCommand that is kept in the executor node should // be the one from the node it initially got it from, NOT the one // from the last node, since the chain has looped - CPPUNIT_ASSERT(_throttlers[executorNode]->getActiveMerges().find(bid) + CPPUNIT_ASSERT(_throttlers[executorNode]->getActiveMerges().find(bucket) != _throttlers[executorNode]->getActiveMerges().end()); CPPUNIT_ASSERT_EQUAL(static_cast<StorageMessage*>(fwdToExec.get()), - _throttlers[executorNode]->getActiveMerges().find(bid)->second.getMergeCmd().get()); + _throttlers[executorNode]->getActiveMerges().find(bucket)->second.getMergeCmd().get()); } // Send reply up from persistence layer to simulate a completed @@ -440,7 +433,7 @@ MergeThrottlerTest::testChain() // Merge should not be removed yet from executor, since it's pending an unwind CPPUNIT_ASSERT_EQUAL(std::size_t(1), _throttlers[executorNode]->getActiveMerges().size()); CPPUNIT_ASSERT_EQUAL(static_cast<StorageMessage*>(fwdToExec.get()), - _throttlers[executorNode]->getActiveMerges().find(bid)->second.getMergeCmd().get()); + _throttlers[executorNode]->getActiveMerges().find(bucket)->second.getMergeCmd().get()); } // MergeBucketReply waiting to be sent back to node 2. NOTE: we don't have any // transport context stuff set up here to perform the reply mapping, so we @@ -452,8 +445,6 @@ MergeThrottlerTest::testChain() // eg: 0 -> 2 -> 1 -> 0. Or: 2 -> 1 -> 0 if no cycle for (int i = (executorNode != lastNodeIdx ? _storageNodeCount - 1 : _storageNodeCount - 2); i >= 0; --i) { - //std::cout << "unwind " << i << "\n"; - _topLinks[i]->sendDown(unwind); _topLinks[i]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); @@ -469,7 +460,7 @@ MergeThrottlerTest::testChain() CPPUNIT_ASSERT_EQUAL(ReturnCode::OK, mbr.getResult().getResult()); CPPUNIT_ASSERT_EQUAL(vespalib::string("Great success! :D-|-<"), mbr.getResult().getMessage()); - CPPUNIT_ASSERT_EQUAL(bid, mbr.getBucketId()); + CPPUNIT_ASSERT_EQUAL(bucket, mbr.getBucket()); } while (std::next_permutation(indices, indices + _storageNodeCount)); diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index be30c459bdf..60dedab5ce8 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -431,7 +431,7 @@ bool MergeThrottler::isMergeAlreadyKnown(const api::StorageMessage::SP& msg) const { auto& mergeCmd = static_cast<const api::MergeBucketCommand&>(*msg); - return _merges.find(mergeCmd.getBucketId()) != _merges.end(); + return _merges.find(mergeCmd.getBucket()) != _merges.end(); } bool @@ -830,10 +830,8 @@ MergeThrottler::processNewMergeCommand( // and that we can fit it into our window. // Register the merge now so that it will contribute to filling up our // merge throttling window. - assert(_merges.find(mergeCmd.getBucketId()) == _merges.end()); - auto state = _merges.insert( - std::make_pair(mergeCmd.getBucketId(), - ChainedMergeState(msg))).first; + assert(_merges.find(mergeCmd.getBucket()) == _merges.end()); + auto state = _merges.emplace(mergeCmd.getBucket(), ChainedMergeState(msg)).first; LOG(debug, "Added merge %s to internal state", mergeCmd.toString().c_str()); @@ -911,7 +909,7 @@ MergeThrottler::processCycledMergeCommand( MergeNodeSequence nodeSeq(mergeCmd, _component.getIndex()); - auto mergeIter = _merges.find(mergeCmd.getBucketId()); + auto mergeIter = _merges.find(mergeCmd.getBucket()); assert(mergeIter != _merges.end()); if (mergeIter->second.isAborted()) { @@ -964,7 +962,7 @@ MergeThrottler::processMergeReply( { auto& mergeReply = dynamic_cast<const api::MergeBucketReply&>(*msg); - auto mergeIter = _merges.find(mergeReply.getBucketId()); + auto mergeIter = _merges.find(mergeReply.getBucket()); if (mergeIter == _merges.end()) { LOG(warning, "Received %s, which has no command mapped " "for it. Cannot send chained reply!", @@ -1075,7 +1073,7 @@ MergeThrottler::onDown(const std::shared_ptr<api::StorageMessage>& msg) } else if (isDiffCommand(*msg)) { vespalib::LockGuard lock(_stateLock); auto& cmd = static_cast<api::StorageCommand&>(*msg); - if (bucketIsUnknownOrAborted(cmd.getBucketId())) { + if (bucketIsUnknownOrAborted(cmd.getBucket())) { sendUp(makeAbortReply(cmd, "no state recorded for bucket in merge " "throttler, source merge probably aborted earlier")); return true; @@ -1104,7 +1102,7 @@ MergeThrottler::isMergeReply(const api::StorageMessage& msg) const } bool -MergeThrottler::bucketIsUnknownOrAborted(const document::BucketId& bucket) const +MergeThrottler::bucketIsUnknownOrAborted(const document::Bucket& bucket) const { auto it = _merges.find(bucket); if (it == _merges.end()) { diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index 69fdfdc1b95..d62e9a042b2 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -13,7 +13,7 @@ #include <vespa/storage/distributor/messageguard.h> #include <vespa/storageframework/generic/status/htmlstatusreporter.h> #include <vespa/storageapi/message/bucket.h> -#include <vespa/document/bucket/bucketid.h> +#include <vespa/document/bucket/bucket.h> #include <vespa/vespalib/util/document_runnable.h> #include <vespa/messagebus/staticthrottlepolicy.h> #include <vespa/metrics/metrics.h> @@ -134,7 +134,7 @@ private: const std::string& getMergeCmdString() const { return _cmdString; } }; - typedef std::map<document::BucketId, ChainedMergeState> ActiveMergeMap; + typedef std::map<document::Bucket, ChainedMergeState> ActiveMergeMap; // Use a set rather than a priority_queue, since we want to be // able to iterate over the collection during status rendering @@ -371,7 +371,7 @@ private: bool isDiffCommand(const api::StorageMessage& msg) const; bool isMergeCommand(const api::StorageMessage& msg) const; bool isMergeReply(const api::StorageMessage& msg) const; - bool bucketIsUnknownOrAborted(const document::BucketId& bucket) const; + bool bucketIsUnknownOrAborted(const document::Bucket& bucket) const; std::shared_ptr<api::StorageMessage> makeAbortReply( api::StorageCommand& cmd, |