summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahoo-inc.com>2017-09-15 10:45:20 +0200
committerGitHub <noreply@github.com>2017-09-15 10:45:20 +0200
commit02fc546e8545be4d466c178c838b382657becbcb (patch)
tree956c8ec5a211df460ad1b9ee46b0ebce0baad7d7
parentb0154a2c2259e67b2923823daf52724bebee1c07 (diff)
parent42856470b2f16ae049fe25a089ee151ca1a9d6d2 (diff)
Merge pull request #3412 from vespa-engine/vekterli/inhibit-merge-scheduling-towards-busy-nodes
Inhibit scheduling of merges towards nodes that are marked busy
-rw-r--r--storage/src/tests/distributor/distributortest.cpp48
-rw-r--r--storage/src/tests/distributor/mergeoperationtest.cpp25
-rw-r--r--storage/src/tests/distributor/nodeinfotest.cpp6
-rw-r--r--storage/src/tests/distributor/pendingmessagetrackertest.cpp139
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.cpp7
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.h6
-rw-r--r--storage/src/vespa/storage/config/stor-distributormanager.def4
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp10
-rw-r--r--storage/src/vespa/storage/distributor/nodeinfo.cpp53
-rw-r--r--storage/src/vespa/storage/distributor/nodeinfo.h9
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp27
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h3
-rw-r--r--storage/src/vespa/storage/distributor/pendingmessagetracker.cpp14
-rw-r--r--storage/src/vespa/storage/distributor/pendingmessagetracker.h8
-rw-r--r--storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp2
15 files changed, 234 insertions, 127 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index bfa1181eca1..3aedd31f574 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -46,6 +46,8 @@ class Distributor_Test : public CppUnit::TestFixture,
CPPUNIT_TEST(max_clock_skew_config_is_propagated_to_distributor_config);
CPPUNIT_TEST(configured_safe_time_point_rejection_works_end_to_end);
CPPUNIT_TEST(sequencing_config_is_propagated_to_distributor_config);
+ CPPUNIT_TEST(merge_busy_inhibit_duration_config_is_propagated_to_distributor_config);
+ CPPUNIT_TEST(merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker);
CPPUNIT_TEST_SUITE_END();
protected:
@@ -73,6 +75,8 @@ protected:
void max_clock_skew_config_is_propagated_to_distributor_config();
void configured_safe_time_point_rejection_works_end_to_end();
void sequencing_config_is_propagated_to_distributor_config();
+ void merge_busy_inhibit_duration_config_is_propagated_to_distributor_config();
+ void merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker();
public:
void setUp() override {
@@ -177,6 +181,7 @@ private:
void assertSingleBouncedRemoveReplyPresent();
void assertNoMessageBounced();
void configure_mutation_sequencing(bool enabled);
+ void configure_merge_busy_inhibit_duration(int seconds);
};
CPPUNIT_TEST_SUITE_REGISTRATION(Distributor_Test);
@@ -819,6 +824,49 @@ void Distributor_Test::sequencing_config_is_propagated_to_distributor_config() {
CPPUNIT_ASSERT(getConfig().getSequenceMutatingOperations());
}
+void
+Distributor_Test::configure_merge_busy_inhibit_duration(int seconds) {
+ using namespace vespa::config::content::core;
+ using ConfigBuilder = StorDistributormanagerConfigBuilder;
+
+ ConfigBuilder builder;
+ builder.inhibitMergeSendingOnBusyNodeDurationSec = seconds;
+ getConfig().configure(builder);
+ _distributor->enableNextConfig();
+}
+
+void Distributor_Test::merge_busy_inhibit_duration_config_is_propagated_to_distributor_config() {
+ setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
+
+ configure_merge_busy_inhibit_duration(7);
+ CPPUNIT_ASSERT(getConfig().getInhibitMergesOnBusyNodeDuration() == std::chrono::seconds(7));
+}
+
+void Distributor_Test::merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker() {
+ setupDistributor(Redundancy(2), NodeCount(2), "storage:1 distributor:1");
+ addNodesToBucketDB(document::BucketId(16, 1), "0=1/1/1/t");
+
+ configure_merge_busy_inhibit_duration(100);
+ auto cmd = makeDummyRemoveCommand(); // Remove is for bucket 1
+ _distributor->handleMessage(cmd);
+
+ // Should send to content node 0
+ CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(api::MessageType::REMOVE, _sender.commands[0]->getType());
+ auto& fwd_cmd = dynamic_cast<api::RemoveCommand&>(*_sender.commands[0]);
+ auto reply = fwd_cmd.makeReply();
+ reply->setResult(api::ReturnCode(api::ReturnCode::BUSY));
+ _distributor->handleReply(std::shared_ptr<api::StorageReply>(std::move(reply)));
+
+ auto& node_info = _distributor->getPendingMessageTracker().getNodeInfo();
+
+ CPPUNIT_ASSERT(node_info.isBusy(0));
+ getClock().addSecondsToTime(99);
+ CPPUNIT_ASSERT(node_info.isBusy(0));
+ getClock().addSecondsToTime(2);
+ CPPUNIT_ASSERT(!node_info.isBusy(0));
+}
+
}
}
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp
index b298bec3977..7e7ca52635e 100644
--- a/storage/src/tests/distributor/mergeoperationtest.cpp
+++ b/storage/src/tests/distributor/mergeoperationtest.cpp
@@ -28,6 +28,7 @@ class MergeOperationTest : public CppUnit::TestFixture,
CPPUNIT_TEST(testMarkRedundantTrustedCopiesAsSourceOnly);
CPPUNIT_TEST(onlyMarkRedundantRetiredReplicasAsSourceOnly);
CPPUNIT_TEST(mark_post_merge_redundant_replicas_source_only);
+ CPPUNIT_TEST(merge_operation_is_blocked_by_any_busy_target_node);
CPPUNIT_TEST_SUITE_END();
std::unique_ptr<PendingMessageTracker> _pendingTracker;
@@ -41,6 +42,7 @@ protected:
void testMarkRedundantTrustedCopiesAsSourceOnly();
void onlyMarkRedundantRetiredReplicasAsSourceOnly();
void mark_post_merge_redundant_replicas_source_only();
+ void merge_operation_is_blocked_by_any_busy_target_node();
public:
void setUp() override {
@@ -478,5 +480,28 @@ void MergeOperationTest::mark_post_merge_redundant_replicas_source_only() {
getNodeList("storage:10", 4, "3,5,7,6"));
}
+void MergeOperationTest::merge_operation_is_blocked_by_any_busy_target_node() {
+ getClock().setAbsoluteTimeInSeconds(10);
+ addNodesToBucketDB(document::BucketId(16, 1), "0=10/1/1/t,1=20/1/1,2=10/1/1/t");
+ _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:3"));
+ MergeOperation op(BucketAndNodes(document::BucketId(16, 1), toVector<uint16_t>(0, 1, 2)));
+ op.setIdealStateManager(&getIdealStateManager());
+
+ // Should not block on nodes _not_ included in operation node set
+ _pendingTracker->getNodeInfo().setBusy(3, std::chrono::seconds(10));
+ CPPUNIT_ASSERT(!op.isBlocked(*_pendingTracker));
+
+ // Node 1 is included in operation node set and should cause a block
+ _pendingTracker->getNodeInfo().setBusy(0, std::chrono::seconds(10));
+ CPPUNIT_ASSERT(op.isBlocked(*_pendingTracker));
+
+ getClock().addSecondsToTime(11);
+ CPPUNIT_ASSERT(!op.isBlocked(*_pendingTracker)); // No longer busy
+
+ // Should block on other operation nodes than the first listed as well
+ _pendingTracker->getNodeInfo().setBusy(1, std::chrono::seconds(10));
+ CPPUNIT_ASSERT(op.isBlocked(*_pendingTracker));
+}
+
} // distributor
} // storage
diff --git a/storage/src/tests/distributor/nodeinfotest.cpp b/storage/src/tests/distributor/nodeinfotest.cpp
index a3eaf2505b3..0363f25831a 100644
--- a/storage/src/tests/distributor/nodeinfotest.cpp
+++ b/storage/src/tests/distributor/nodeinfotest.cpp
@@ -57,11 +57,11 @@ NodeInfoTest::testSimple()
CPPUNIT_ASSERT_EQUAL(1, (int)info.getPendingCount(7));
CPPUNIT_ASSERT_EQUAL(0, (int)info.getPendingCount(5));
- info.setBusy(5);
+ info.setBusy(5, std::chrono::seconds(60));
clock.addSecondsToTime(10);
- info.setBusy(1);
+ info.setBusy(1, std::chrono::seconds(60));
clock.addSecondsToTime(20);
- info.setBusy(42);
+ info.setBusy(42, std::chrono::seconds(60));
CPPUNIT_ASSERT_EQUAL(true, info.isBusy(5));
CPPUNIT_ASSERT_EQUAL(true, info.isBusy(1));
diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
index cda9a9e3782..fa36b4e6305 100644
--- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp
+++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
@@ -13,12 +13,10 @@
namespace storage {
namespace distributor {
-// Workaround typedef for not (yet) running with --std=c++14 which supports
-// user defined literals. Once we do, replace ms(123) with 123ms.
-using ms = std::chrono::milliseconds;
+using namespace std::chrono_literals;
-class PendingMessageTrackerCallback_Test : public CppUnit::TestFixture {
- CPPUNIT_TEST_SUITE(PendingMessageTrackerCallback_Test);
+class PendingMessageTrackerTest : public CppUnit::TestFixture {
+ CPPUNIT_TEST_SUITE(PendingMessageTrackerTest);
CPPUNIT_TEST(testSimple);
CPPUNIT_TEST(testMultipleMessages);
CPPUNIT_TEST(testStartPage);
@@ -36,6 +34,8 @@ class PendingMessageTrackerCallback_Test : public CppUnit::TestFixture {
CPPUNIT_TEST(timeTravellingClockLatenciesNotRegistered);
CPPUNIT_TEST(statsSnapshotIncludesAllNodes);
CPPUNIT_TEST(latencyProviderForwardsToImplementation);
+ CPPUNIT_TEST(busy_reply_marks_node_as_busy);
+ CPPUNIT_TEST(busy_node_duration_can_be_adjusted);
CPPUNIT_TEST_SUITE_END();
public:
@@ -56,6 +56,8 @@ public:
void timeTravellingClockLatenciesNotRegistered();
void statsSnapshotIncludesAllNodes();
void latencyProviderForwardsToImplementation();
+ void busy_reply_marks_node_as_busy();
+ void busy_node_duration_can_be_adjusted();
private:
void insertMessages(PendingMessageTracker& tracker);
@@ -122,10 +124,12 @@ public:
}
void sendPutReply(api::PutCommand& putCmd,
- const RequestBuilder& builder)
+ const RequestBuilder& builder,
+ const api::ReturnCode& result = api::ReturnCode())
{
assignMockedTime(builder.atTime());
auto putReply = putCmd.makeReply();
+ putReply->setResult(result);
_tracker->reply(*putReply);
}
@@ -149,8 +153,8 @@ public:
void sendPutAndReplyWithLatency(uint16_t node,
std::chrono::milliseconds latency)
{
- auto put = sendPut(RequestBuilder().atTime(ms(1000)).toNode(node));
- sendPutReply(*put, RequestBuilder().atTime(ms(1000) + latency));
+ auto put = sendPut(RequestBuilder().atTime(1000ms).toNode(node));
+ sendPutReply(*put, RequestBuilder().atTime(1000ms + latency));
}
OperationStats getNodePutOperationStats(uint16_t node) {
@@ -158,6 +162,7 @@ public:
}
PendingMessageTracker& tracker() { return *_tracker; }
+ auto& clock() { return _clock; }
private:
std::string createDummyIdString(const document::BucketId& bucket) const {
@@ -215,17 +220,16 @@ Fixture::Fixture()
_clock.setAbsoluteTimeInSeconds(1);
// Have to set clock in compReg before constructing tracker, or it'll
// flip out and die on an explicit nullptr check.
- _tracker = std::unique_ptr<PendingMessageTracker>(
- new PendingMessageTracker(_compReg));
+ _tracker = std::make_unique<PendingMessageTracker>(_compReg);
}
Fixture::~Fixture() {}
}
-CPPUNIT_TEST_SUITE_REGISTRATION(PendingMessageTrackerCallback_Test);
+CPPUNIT_TEST_SUITE_REGISTRATION(PendingMessageTrackerTest);
void
-PendingMessageTrackerCallback_Test::testSimple()
+PendingMessageTrackerTest::testSimple()
{
StorageComponentRegisterImpl compReg;
framework::defaultimplementation::FakeClock clock;
@@ -269,7 +273,7 @@ PendingMessageTrackerCallback_Test::testSimple()
}
void
-PendingMessageTrackerCallback_Test::insertMessages(PendingMessageTracker& tracker)
+PendingMessageTrackerTest::insertMessages(PendingMessageTracker& tracker)
{
for (uint32_t i = 0; i < 4; i++) {
std::ostringstream ost;
@@ -294,7 +298,7 @@ PendingMessageTrackerCallback_Test::insertMessages(PendingMessageTracker& tracke
}
void
-PendingMessageTrackerCallback_Test::testStartPage()
+PendingMessageTrackerTest::testStartPage()
{
StorageComponentRegisterImpl compReg;
framework::defaultimplementation::FakeClock clock;
@@ -318,7 +322,7 @@ PendingMessageTrackerCallback_Test::testStartPage()
}
void
-PendingMessageTrackerCallback_Test::testMultipleMessages()
+PendingMessageTrackerTest::testMultipleMessages()
{
StorageComponentRegisterImpl compReg;
framework::defaultimplementation::FakeClock clock;
@@ -419,7 +423,7 @@ public:
}
void
-PendingMessageTrackerCallback_Test::testGetPendingMessageTypes()
+PendingMessageTrackerTest::testGetPendingMessageTypes()
{
StorageComponentRegisterImpl compReg;
framework::defaultimplementation::FakeClock clock;
@@ -456,7 +460,7 @@ PendingMessageTrackerCallback_Test::testGetPendingMessageTypes()
}
void
-PendingMessageTrackerCallback_Test::testHasPendingMessage()
+PendingMessageTrackerTest::testHasPendingMessage()
{
StorageComponentRegisterImpl compReg;
framework::defaultimplementation::FakeClock clock;
@@ -508,7 +512,7 @@ public:
} // anon ns
void
-PendingMessageTrackerCallback_Test::testGetAllMessagesForSingleBucket()
+PendingMessageTrackerTest::testGetAllMessagesForSingleBucket()
{
StorageComponentRegisterImpl compReg;
framework::defaultimplementation::FakeClock clock;
@@ -535,10 +539,10 @@ PendingMessageTrackerCallback_Test::testGetAllMessagesForSingleBucket()
}
void
-PendingMessageTrackerCallback_Test::nodeStatsCanBeOutputStreamed()
+PendingMessageTrackerTest::nodeStatsCanBeOutputStreamed()
{
NodeStats stats;
- stats.puts = makeOpStats(ms(56789), 10);
+ stats.puts = makeOpStats(56789ms, 10);
std::ostringstream os;
os << stats;
@@ -550,120 +554,143 @@ PendingMessageTrackerCallback_Test::nodeStatsCanBeOutputStreamed()
}
void
-PendingMessageTrackerCallback_Test::totalPutLatencyIsInitiallyZero()
+PendingMessageTrackerTest::totalPutLatencyIsInitiallyZero()
{
Fixture fixture;
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(0), 0),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(0ms, 0),
fixture.getNodePutOperationStats(0));
}
void
-PendingMessageTrackerCallback_Test::statsNotAlteredBeforeReplyReceived()
+PendingMessageTrackerTest::statsNotAlteredBeforeReplyReceived()
{
Fixture fixture;
- fixture.sendPut(RequestBuilder().atTime(ms(1000)).toNode(0));
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(0), 0),
+ fixture.sendPut(RequestBuilder().atTime(1000ms).toNode(0));
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(0ms, 0),
fixture.getNodePutOperationStats(0));
}
void
-PendingMessageTrackerCallback_Test::totalPutLatencyIsTrackedForSingleRequest()
+PendingMessageTrackerTest::totalPutLatencyIsTrackedForSingleRequest()
{
Fixture fixture;
- fixture.sendPutAndReplyWithLatency(0, ms(500));
+ fixture.sendPutAndReplyWithLatency(0, 500ms);
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(500), 1),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(500ms, 1),
fixture.getNodePutOperationStats(0));
}
void
-PendingMessageTrackerCallback_Test::statsAreTrackedSeparatelyPerNode()
+PendingMessageTrackerTest::statsAreTrackedSeparatelyPerNode()
{
Fixture fixture;
- fixture.sendPutAndReplyWithLatency(0, ms(500));
- fixture.sendPutAndReplyWithLatency(1, ms(600));
+ fixture.sendPutAndReplyWithLatency(0, 500ms);
+ fixture.sendPutAndReplyWithLatency(1, 600ms);
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(500), 1),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(500ms, 1),
fixture.getNodePutOperationStats(0));
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(600), 1),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(600ms, 1),
fixture.getNodePutOperationStats(1));
}
// Necessarily, this test will have to be altered when we add tracking of
// other message types as well.
void
-PendingMessageTrackerCallback_Test::onlyPutMessagesAreTracked()
+PendingMessageTrackerTest::onlyPutMessagesAreTracked()
{
Fixture fixture;
auto remove = fixture.sendRemove(
- RequestBuilder().atTime(ms(1000)).toNode(0));
- fixture.sendRemoveReply(*remove, RequestBuilder().atTime(ms(2000)));
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(0), 0),
+ RequestBuilder().atTime(1000ms).toNode(0));
+ fixture.sendRemoveReply(*remove, RequestBuilder().atTime(2000ms));
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(0ms, 0),
fixture.getNodePutOperationStats(0));
}
void
-PendingMessageTrackerCallback_Test::totalPutLatencyIsAggregatedAcrossRequests()
+PendingMessageTrackerTest::totalPutLatencyIsAggregatedAcrossRequests()
{
Fixture fixture;
// Model 2 concurrent puts to node 0.
- fixture.sendPutAndReplyWithLatency(0, ms(500));
- fixture.sendPutAndReplyWithLatency(0, ms(600));
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(1100), 2),
+ fixture.sendPutAndReplyWithLatency(0, 500ms);
+ fixture.sendPutAndReplyWithLatency(0, 600ms);
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(1100ms, 2),
fixture.getNodePutOperationStats(0));
}
void
-PendingMessageTrackerCallback_Test::clearingMessagesDoesNotAffectStats()
+PendingMessageTrackerTest::clearingMessagesDoesNotAffectStats()
{
Fixture fixture;
- fixture.sendPutAndReplyWithLatency(2, ms(2000));
+ fixture.sendPutAndReplyWithLatency(2, 2000ms);
fixture.tracker().clearMessagesForNode(2);
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(2000), 1),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(2000ms, 1),
fixture.getNodePutOperationStats(2));
}
void
-PendingMessageTrackerCallback_Test::timeTravellingClockLatenciesNotRegistered()
+PendingMessageTrackerTest::timeTravellingClockLatenciesNotRegistered()
{
Fixture fixture;
- auto put = fixture.sendPut(RequestBuilder().atTime(ms(1000)).toNode(0));
- fixture.sendPutReply(*put, RequestBuilder().atTime(ms(999)));
+ auto put = fixture.sendPut(RequestBuilder().atTime(1000ms).toNode(0));
+ fixture.sendPutReply(*put, RequestBuilder().atTime(999ms));
// Latency increase of zero, but we do count the request itself.
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(0), 1),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(0ms, 1),
fixture.getNodePutOperationStats(0));
}
void
-PendingMessageTrackerCallback_Test::statsSnapshotIncludesAllNodes()
+PendingMessageTrackerTest::statsSnapshotIncludesAllNodes()
{
Fixture fixture;
- fixture.sendPutAndReplyWithLatency(0, ms(500));
- fixture.sendPutAndReplyWithLatency(1, ms(600));
+ fixture.sendPutAndReplyWithLatency(0, 500ms);
+ fixture.sendPutAndReplyWithLatency(1, 600ms);
NodeStatsSnapshot snapshot = fixture.tracker().getLatencyStatistics();
CPPUNIT_ASSERT_EQUAL(size_t(2), snapshot.nodeToStats.size());
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(500), 1),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(500ms, 1),
snapshot.nodeToStats[0].puts);
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(600), 1),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(600ms, 1),
snapshot.nodeToStats[1].puts);
}
void
-PendingMessageTrackerCallback_Test::latencyProviderForwardsToImplementation()
+PendingMessageTrackerTest::latencyProviderForwardsToImplementation()
{
Fixture fixture;
- fixture.sendPutAndReplyWithLatency(0, ms(500));
+ fixture.sendPutAndReplyWithLatency(0, 500ms);
LatencyStatisticsProvider& provider(
fixture.tracker().getLatencyStatisticsProvider());
NodeStatsSnapshot snapshot = provider.getLatencyStatistics();
CPPUNIT_ASSERT_EQUAL(size_t(1), snapshot.nodeToStats.size());
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(500), 1),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(500ms, 1),
snapshot.nodeToStats[0].puts);
}
+// TODO don't set busy for visitor replies? These will mark the node as busy today,
+// but have the same actual semantics as busy merges (i.e. "queue is full", not "node
+// is too busy to accept new requests in general").
+
+void PendingMessageTrackerTest::busy_reply_marks_node_as_busy() {
+ Fixture f;
+ auto cmd = f.sendPut(RequestBuilder().toNode(0));
+ CPPUNIT_ASSERT(!f.tracker().getNodeInfo().isBusy(0));
+ f.sendPutReply(*cmd, RequestBuilder(), api::ReturnCode(api::ReturnCode::BUSY));
+ CPPUNIT_ASSERT(f.tracker().getNodeInfo().isBusy(0));
+ CPPUNIT_ASSERT(!f.tracker().getNodeInfo().isBusy(1));
+}
+
+void PendingMessageTrackerTest::busy_node_duration_can_be_adjusted() {
+ Fixture f;
+ auto cmd = f.sendPut(RequestBuilder().toNode(0));
+ f.tracker().setNodeBusyDuration(std::chrono::seconds(10));
+ f.sendPutReply(*cmd, RequestBuilder(), api::ReturnCode(api::ReturnCode::BUSY));
+ CPPUNIT_ASSERT(f.tracker().getNodeInfo().isBusy(0));
+ f.clock().addSecondsToTime(11);
+ CPPUNIT_ASSERT(!f.tracker().getNodeInfo().isBusy(0));
+}
+
} // distributor
} // storage
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index 3007eff3d92..44cf56fdff8 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -27,6 +27,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_maxVisitorsPerNodePerClientVisitor(4),
_minBucketsPerVisitor(5),
_maxClusterClockSkew(0),
+ _inhibitMergeSendingOnBusyNodeDuration(std::chrono::seconds(60)),
_doInlineSplit(true),
_enableJoinForSiblingLessBuckets(false),
_enableInconsistentJoin(false),
@@ -149,8 +150,10 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist
configureMaintenancePriorities(config);
if (config.maxClusterClockSkewSec >= 0) {
- _maxClusterClockSkew = std::chrono::seconds(
- config.maxClusterClockSkewSec);
+ _maxClusterClockSkew = std::chrono::seconds(config.maxClusterClockSkewSec);
+ }
+ if (config.inhibitMergeSendingOnBusyNodeDurationSec >= 0) {
+ _inhibitMergeSendingOnBusyNodeDuration = std::chrono::seconds(config.inhibitMergeSendingOnBusyNodeDurationSec);
}
LOG(debug,
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index d33f46befac..d3e589f9d00 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -15,7 +15,7 @@ namespace distributor {
class DistributorConfiguration {
public:
- DistributorConfiguration(StorageComponent& component);
+ explicit DistributorConfiguration(StorageComponent& component);
~DistributorConfiguration();
struct MaintenancePriorities
@@ -225,6 +225,9 @@ public:
std::chrono::seconds getMaxClusterClockSkew() const noexcept {
return _maxClusterClockSkew;
}
+ std::chrono::seconds getInhibitMergesOnBusyNodeDuration() const noexcept {
+ return _inhibitMergeSendingOnBusyNodeDuration;
+ }
bool getSequenceMutatingOperations() const noexcept {
return _sequenceMutatingOperations;
@@ -263,6 +266,7 @@ private:
MaintenancePriorities _maintenancePriorities;
std::chrono::seconds _maxClusterClockSkew;
+ std::chrono::seconds _inhibitMergeSendingOnBusyNodeDuration;
bool _doInlineSplit;
bool _enableJoinForSiblingLessBuckets;
diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def
index f549442cf51..0a43a7e8aa4 100644
--- a/storage/src/vespa/storage/config/stor-distributormanager.def
+++ b/storage/src/vespa/storage/config/stor-distributormanager.def
@@ -180,3 +180,7 @@ max_cluster_clock_skew_sec int default=1
## modifications to documents when sent from multiple feed clients.
sequence_mutating_operations bool default=true
+## Number of seconds that scheduling of new merge operations should be inhibited
+## towards a node if it has indicated that its merge queues are full or it is
+## suffering from resource exhaustion.
+inhibit_merge_sending_on_busy_node_duration_sec int default=30
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index f0b80900fe4..0dc52650131 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -684,12 +684,10 @@ Distributor::doNonCriticalTick(framework::ThreadIndex)
void
Distributor::enableNextConfig()
{
- _hostInfoReporter.enableReporting(
- getConfig().getEnableHostInfoReporting());
- _bucketDBMetricUpdater.setMinimumReplicaCountingMode(
- getConfig().getMinimumReplicaCountingMode());
- _ownershipSafeTimeCalc->setMaxClusterClockSkew(
- getConfig().getMaxClusterClockSkew());
+ _hostInfoReporter.enableReporting(getConfig().getEnableHostInfoReporting());
+ _bucketDBMetricUpdater.setMinimumReplicaCountingMode(getConfig().getMinimumReplicaCountingMode());
+ _ownershipSafeTimeCalc->setMaxClusterClockSkew(getConfig().getMaxClusterClockSkew());
+ _pendingMessageTracker.setNodeBusyDuration(getConfig().getInhibitMergesOnBusyNodeDuration());
}
void
diff --git a/storage/src/vespa/storage/distributor/nodeinfo.cpp b/storage/src/vespa/storage/distributor/nodeinfo.cpp
index f018f78ef7a..9430f1691e0 100644
--- a/storage/src/vespa/storage/distributor/nodeinfo.cpp
+++ b/storage/src/vespa/storage/distributor/nodeinfo.cpp
@@ -6,21 +6,17 @@
namespace storage::distributor {
NodeInfo::NodeInfo(const framework::Clock& clock)
- : _clock(clock) {}
+ : _clock(clock) {}
-uint32_t
-NodeInfo::getPendingCount(uint16_t idx) const
-{
+uint32_t NodeInfo::getPendingCount(uint16_t idx) const {
return getNode(idx)._pending;
}
-bool
-NodeInfo::isBusy(uint16_t idx) const
-{
+bool NodeInfo::isBusy(uint16_t idx) const {
const SingleNodeInfo& info = getNode(idx);
- if (info._busyTime.isSet()) {
- if (_clock.getTimeInSeconds() > info._busyTime) {
- info._busyTime = framework::SecondTime(0);
+ if (info._busyUntilTime.time_since_epoch().count() != 0) {
+ if (_clock.getMonotonicTime() > info._busyUntilTime) {
+ info._busyUntilTime = framework::MonotonicTimePoint{};
} else {
return true;
}
@@ -29,22 +25,15 @@ NodeInfo::isBusy(uint16_t idx) const
return false;
}
-void
-NodeInfo::setBusy(uint16_t idx)
-{
- getNode(idx)._busyTime = _clock.getTimeInSeconds()
- + framework::SecondTime(60);
+void NodeInfo::setBusy(uint16_t idx, framework::MonotonicDuration for_duration) {
+ getNode(idx)._busyUntilTime = _clock.getMonotonicTime() + for_duration;
}
-void
-NodeInfo::incPending(uint16_t idx)
-{
+void NodeInfo::incPending(uint16_t idx) {
getNode(idx)._pending++;
}
-void
-NodeInfo::decPending(uint16_t idx)
-{
+void NodeInfo::decPending(uint16_t idx) {
SingleNodeInfo& info = getNode(idx);
if (info._pending > 0) {
@@ -52,28 +41,24 @@ NodeInfo::decPending(uint16_t idx)
}
}
-void
-NodeInfo::clearPending(uint16_t idx)
-{
+void NodeInfo::clearPending(uint16_t idx) {
SingleNodeInfo& info = getNode(idx);
info._pending = 0;
}
-NodeInfo::SingleNodeInfo&
-NodeInfo::getNode(uint16_t idx)
-{
- while ((int)_nodes.size() < idx + 1) {
- _nodes.push_back(SingleNodeInfo());
+NodeInfo::SingleNodeInfo& NodeInfo::getNode(uint16_t idx) {
+ const auto index_lbound = static_cast<size_t>(idx) + 1;
+ while (_nodes.size() < index_lbound) {
+ _nodes.emplace_back();
}
return _nodes[idx];
}
-const NodeInfo::SingleNodeInfo&
-NodeInfo::getNode(uint16_t idx) const
-{
- while ((int)_nodes.size() < idx + 1) {
- _nodes.push_back(SingleNodeInfo());
+const NodeInfo::SingleNodeInfo& NodeInfo::getNode(uint16_t idx) const {
+ const auto index_lbound = static_cast<size_t>(idx) + 1;
+ while (_nodes.size() < index_lbound) {
+ _nodes.emplace_back();
}
return _nodes[idx];
diff --git a/storage/src/vespa/storage/distributor/nodeinfo.h b/storage/src/vespa/storage/distributor/nodeinfo.h
index 57fd0145331..a28a8965714 100644
--- a/storage/src/vespa/storage/distributor/nodeinfo.h
+++ b/storage/src/vespa/storage/distributor/nodeinfo.h
@@ -14,13 +14,13 @@ namespace storage::distributor {
class NodeInfo {
public:
- NodeInfo(const framework::Clock& clock);
+ explicit NodeInfo(const framework::Clock& clock);
uint32_t getPendingCount(uint16_t idx) const;
bool isBusy(uint16_t idx) const;
- void setBusy(uint16_t idx);
+ void setBusy(uint16_t idx, framework::MonotonicDuration for_duration);
void incPending(uint16_t idx);
@@ -30,11 +30,10 @@ public:
private:
struct SingleNodeInfo {
- SingleNodeInfo()
- : _pending(0), _busyTime(0) {};
+ SingleNodeInfo() : _pending(0), _busyUntilTime() {}
uint32_t _pending;
- mutable framework::SecondTime _busyTime;
+ mutable framework::MonotonicTimePoint _busyUntilTime;
};
mutable std::vector<SingleNodeInfo> _nodes;
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
index 81d5c1a7ba6..28381af4021 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "mergeoperation.h"
#include <vespa/storage/distributor/idealstatemanager.h>
+#include <array>
#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".distributor.operation.idealstate.merge");
@@ -302,25 +303,21 @@ MergeOperation::onReceive(DistributorMessageSender& sender,
namespace {
-static const uint32_t WRITE_FEED_MESSAGE_TYPES[] =
-{
+constexpr std::array<uint32_t, 7> WRITE_FEED_MESSAGE_TYPES {{
api::MessageType::PUT_ID,
api::MessageType::REMOVE_ID,
api::MessageType::UPDATE_ID,
api::MessageType::REMOVELOCATION_ID,
api::MessageType::MULTIOPERATION_ID,
api::MessageType::BATCHPUTREMOVE_ID,
- api::MessageType::BATCHDOCUMENTUPDATE_ID,
- 0
-};
+ api::MessageType::BATCHDOCUMENTUPDATE_ID
+}};
}
-bool
-MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const
-{
- for (uint32_t i = 0; WRITE_FEED_MESSAGE_TYPES[i] != 0; ++i) {
- if (messageType == WRITE_FEED_MESSAGE_TYPES[i]) {
+bool MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const {
+ for (auto blocking_type : WRITE_FEED_MESSAGE_TYPES) {
+ if (messageType == blocking_type) {
return true;
}
}
@@ -328,4 +325,14 @@ MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint8_t pri) cons
return IdealStateOperation::shouldBlockThisOperation(messageType, pri);
}
+bool MergeOperation::isBlocked(const PendingMessageTracker& pending_tracker) const {
+ const auto& node_info = pending_tracker.getNodeInfo();
+ for (auto node : getNodes()) {
+ if (node_info.isBusy(node)) {
+ return true;
+ }
+ }
+ return IdealStateOperation::isBlocked(pending_tracker);
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
index 36b894efb20..d09bc0ba5c4 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
@@ -46,7 +46,8 @@ public:
const document::BucketId&, MergeLimiter&,
std::vector<MergeMetaData>&);
- bool shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const override ;
+ bool shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const override;
+ bool isBlocked(const PendingMessageTracker& pendingMessages) const override;
private:
static void addIdealNodes(
const std::vector<uint16_t>& idealNodes,
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
index b2eb416c48f..aeadca17d13 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
@@ -18,6 +18,7 @@ PendingMessageTracker::PendingMessageTracker(framework::ComponentRegister& cr)
_nodeIndexToStats(),
_nodeInfo(_component.getClock()),
_statisticsForwarder(*this),
+ _nodeBusyDuration(60),
_lock()
{
_component.registerStatusPage(*this);
@@ -54,15 +55,14 @@ PendingMessageTracker::currentTime() const
namespace {
template <typename Pair>
-struct PairAsRange
-{
+struct PairAsRange {
Pair _pair;
explicit PairAsRange(Pair pair) : _pair(std::move(pair)) {}
- auto begin() -> decltype(_pair.first) { return _pair.first; }
- auto end() -> decltype(_pair.second) { return _pair.second; }
- auto begin() const -> decltype(_pair.first) { return _pair.first; }
- auto end() const -> decltype(_pair.second) { return _pair.second; }
+ auto begin() { return _pair.first; }
+ auto end() { return _pair.second; }
+ auto begin() const { return _pair.first; }
+ auto end() const { return _pair.second; }
};
template <typename Pair>
@@ -133,7 +133,7 @@ PendingMessageTracker::reply(const api::StorageReply& r)
updateNodeStatsOnReply(*iter);
api::ReturnCode::Result code = r.getResult().getResult();
if (code == api::ReturnCode::BUSY || code == api::ReturnCode::TIMEOUT) {
- _nodeInfo.setBusy(r.getAddress()->getIndex());
+ _nodeInfo.setBusy(r.getAddress()->getIndex(), _nodeBusyDuration);
}
LOG(debug, "Erased message with id %zu", msgId);
msgs.erase(msgId);
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
index 42704a02a40..9b49e94d51f 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
@@ -21,6 +21,7 @@
#include <set>
#include <unordered_map>
+#include <chrono>
namespace storage {
namespace distributor {
@@ -106,7 +107,7 @@ public:
* storage node. "Completed" here means both successful and failed
* operations. Statistics are monotonically increasing within the scope of
* the process' lifetime and are never reset. This models how the Linux
- * kernel reports its internal stats and means the caller must maintan
+ * kernel reports its internal stats and means the caller must maintain
* value snapshots to extract meaningful time series information.
*
* If stats are requested for a node that has not had any operations
@@ -137,6 +138,10 @@ public:
return _statisticsForwarder;
}
+ void setNodeBusyDuration(std::chrono::seconds secs) noexcept {
+ _nodeBusyDuration = secs;
+ }
+
private:
struct MessageEntry {
TimePoint timeStamp;
@@ -210,6 +215,7 @@ private:
std::unordered_map<uint16_t, NodeStats> _nodeIndexToStats;
NodeInfo _nodeInfo;
ForwardingLatencyStatisticsProvider _statisticsForwarder;
+ std::chrono::seconds _nodeBusyDuration;
// Since distributor is currently single-threaded, this will only
// contend when status page is being accessed. It is, however, required
diff --git a/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp b/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp
index 282ddd871b8..abd9778d72c 100644
--- a/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp
+++ b/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp
@@ -26,7 +26,7 @@ ThrottlingOperationStarter::start(const std::shared_ptr<Operation>& operation,
if (!canStart(_pendingCount, priority)) {
return false;
}
- Operation::SP wrappedOp(new ThrottlingOperation(operation, *this));
+ auto wrappedOp = std::make_shared<ThrottlingOperation>(operation, *this);
++_pendingCount;
return _starterImpl.start(wrappedOp, priority);
}