summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahoo-inc.com>2017-08-30 14:28:52 +0000
committerTor Brede Vekterli <vekterli@yahoo-inc.com>2017-08-30 14:28:52 +0000
commit6d220d01be0b76921ffd0de9ecfd8256db5c176b (patch)
tree49ab123532b18b8475988e72dd74906fc570f286 /storage
parent605a60c9a2c95d7a67e674995a455c5c5f7f3545 (diff)
Do not throttle source only merges during back-pressure
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp24
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp15
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h1
3 files changed, 34 insertions, 6 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index ec16b76bf9a..5e00ba373d2 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -11,6 +11,7 @@
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/state.h>
#include <vespa/vespalib/util/exceptions.h>
+#include <unordered_set>
#include <memory>
#include <iterator>
#include <vector>
@@ -30,6 +31,7 @@ struct MergeBuilder {
api::Timestamp _maxTimestamp;
std::vector<uint16_t> _nodes;
std::vector<uint16_t> _chain;
+ std::unordered_set<uint16_t> _source_only;
uint64_t _clusterStateVersion;
MergeBuilder(const document::BucketId& bucket)
@@ -84,11 +86,17 @@ struct MergeBuilder {
_chain.push_back(n2);
return *this;
}
+ MergeBuilder& source_only(uint16_t node) {
+ _source_only.insert(node);
+ return *this;
+ }
api::MergeBucketCommand::SP create() const {
std::vector<api::MergeBucketCommand::Node> n;
for (uint32_t i = 0; i < _nodes.size(); ++i) {
- n.push_back(_nodes[i]);
+ uint16_t node = _nodes[i];
+ bool source_only = (_source_only.find(node) != _source_only.end());
+ n.emplace_back(node, source_only);
}
std::shared_ptr<MergeBucketCommand> cmd(
new MergeBucketCommand(_bucket, n, _maxTimestamp,
@@ -137,6 +145,7 @@ class MergeThrottlerTest : public CppUnit::TestFixture {
CPPUNIT_TEST(testApplyBucketDiffCommandNotInActiveSetIsRejected);
CPPUNIT_TEST(testNewClusterStateAbortsAllOutdatedActiveMerges);
CPPUNIT_TEST(backpressure_busy_bounces_merges_for_configured_duration);
+ CPPUNIT_TEST(source_only_merges_are_not_affected_by_backpressure);
CPPUNIT_TEST_SUITE_END();
public:
void setUp() override;
@@ -167,6 +176,7 @@ public:
void testApplyBucketDiffCommandNotInActiveSetIsRejected();
void testNewClusterStateAbortsAllOutdatedActiveMerges();
void backpressure_busy_bounces_merges_for_configured_duration();
+ void source_only_merges_are_not_affected_by_backpressure();
private:
static const int _storageNodeCount = 3;
static const int _messageWaitTime = 100;
@@ -1568,7 +1578,6 @@ MergeThrottlerTest::testNewClusterStateAbortsAllOutdatedActiveMerges()
void MergeThrottlerTest::backpressure_busy_bounces_merges_for_configured_duration() {
_servers[0]->getClock().setAbsoluteTimeInSeconds(1000);
_throttlers[0]->applyTimedBackpressure();
-
document::BucketId bucket(16, 6789);
CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue());
@@ -1590,7 +1599,16 @@ void MergeThrottlerTest::backpressure_busy_bounces_merges_for_configured_duratio
CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue());
}
-// TODO test source only merges allowed through
+void MergeThrottlerTest::source_only_merges_are_not_affected_by_backpressure() {
+ _servers[2]->getClock().setAbsoluteTimeInSeconds(1000);
+ _throttlers[2]->applyTimedBackpressure();
+ document::BucketId bucket(16, 6789);
+
+ _topLinks[2]->sendDown(MergeBuilder(bucket).clusterStateVersion(10).chain(0, 1).source_only(2).create());
+ _topLinks[2]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue());
+}
// TODO test message queue aborting (use rendezvous functionality--make guard)
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index 62dd6170825..cd3028dc6ba 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -654,18 +654,27 @@ MergeThrottler::run(framework::ThreadHandle& thread)
}
bool MergeThrottler::merge_is_backpressure_throttled(const api::MergeBucketCommand& cmd) const {
- (void) cmd;
if (_throttle_until_time.time_since_epoch().count() == 0) {
- return false; // Avoid sampling the clock if throttling time is zeroed already.
+ return false;
+ }
+ if (merge_has_this_node_as_source_only_node(cmd)) {
+ return false;
}
- // TODO source only bypass for own node
if (_component.getClock().getMonotonicTime() < _throttle_until_time) {
return true;
}
+ // Avoid sampling the clock when it can't do anything useful.
_throttle_until_time = std::chrono::steady_clock::time_point{};
return false;
}
+bool MergeThrottler::merge_has_this_node_as_source_only_node(const api::MergeBucketCommand& cmd) const {
+ auto self_is_source_only = [self = _component.getIndex()](auto& node) {
+ return (node.index == self) && node.sourceOnly;
+ };
+ return std::any_of(cmd.getNodes().begin(), cmd.getNodes().end(), self_is_source_only);
+}
+
void MergeThrottler::bounce_backpressure_throttled_merge(const api::MergeBucketCommand& cmd, MessageGuard& guard) {
sendReply(cmd, api::ReturnCode(api::ReturnCode::BUSY,
"Node is throttling merges due to resource exhaustion back-pressure"),
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index 22c3c106b37..67bb01dd8fa 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -336,6 +336,7 @@ private:
bool merge_is_backpressure_throttled(const api::MergeBucketCommand& cmd) const;
void bounce_backpressure_throttled_merge(const api::MergeBucketCommand& cmd, MessageGuard& guard);
+ bool merge_has_this_node_as_source_only_node(const api::MergeBucketCommand& cmd) const;
void sendReply(const api::MergeBucketCommand& cmd,
const api::ReturnCode& result,