summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahoo-inc.com>2017-09-14 14:35:25 +0000
committerTor Brede Vekterli <vekterli@yahoo-inc.com>2017-09-14 14:38:00 +0000
commit42856470b2f16ae049fe25a089ee151ca1a9d6d2 (patch)
tree7b9d2b62790b086d409cb2f61c0c9114867f386b
parentafd3b362a2756b5c6c25215c1e1e78d295693ac3 (diff)
Inhibit scheduling of merges towards nodes that are marked busy
Utilizes existing maintenance operation scheduler system which checks if an operation is considered blocked and does not start it if this is the case. We now block a merge operation if any of the nodes in its node set are marked busy by the pending message tracker. Duration for which nodes are marked busy is live-configurable.
-rw-r--r--storage/src/tests/distributor/distributortest.cpp48
-rw-r--r--storage/src/tests/distributor/mergeoperationtest.cpp25
-rw-r--r--storage/src/tests/distributor/nodeinfotest.cpp6
-rw-r--r--storage/src/tests/distributor/pendingmessagetrackertest.cpp139
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.cpp7
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.h6
-rw-r--r--storage/src/vespa/storage/config/stor-distributormanager.def4
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp10
-rw-r--r--storage/src/vespa/storage/distributor/nodeinfo.cpp53
-rw-r--r--storage/src/vespa/storage/distributor/nodeinfo.h9
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp27
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h3
-rw-r--r--storage/src/vespa/storage/distributor/pendingmessagetracker.cpp14
-rw-r--r--storage/src/vespa/storage/distributor/pendingmessagetracker.h8
-rw-r--r--storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp2
15 files changed, 234 insertions, 127 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index bfa1181eca1..3aedd31f574 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -46,6 +46,8 @@ class Distributor_Test : public CppUnit::TestFixture,
CPPUNIT_TEST(max_clock_skew_config_is_propagated_to_distributor_config);
CPPUNIT_TEST(configured_safe_time_point_rejection_works_end_to_end);
CPPUNIT_TEST(sequencing_config_is_propagated_to_distributor_config);
+ CPPUNIT_TEST(merge_busy_inhibit_duration_config_is_propagated_to_distributor_config);
+ CPPUNIT_TEST(merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker);
CPPUNIT_TEST_SUITE_END();
protected:
@@ -73,6 +75,8 @@ protected:
void max_clock_skew_config_is_propagated_to_distributor_config();
void configured_safe_time_point_rejection_works_end_to_end();
void sequencing_config_is_propagated_to_distributor_config();
+ void merge_busy_inhibit_duration_config_is_propagated_to_distributor_config();
+ void merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker();
public:
void setUp() override {
@@ -177,6 +181,7 @@ private:
void assertSingleBouncedRemoveReplyPresent();
void assertNoMessageBounced();
void configure_mutation_sequencing(bool enabled);
+ void configure_merge_busy_inhibit_duration(int seconds);
};
CPPUNIT_TEST_SUITE_REGISTRATION(Distributor_Test);
@@ -819,6 +824,49 @@ void Distributor_Test::sequencing_config_is_propagated_to_distributor_config() {
CPPUNIT_ASSERT(getConfig().getSequenceMutatingOperations());
}
+void
+Distributor_Test::configure_merge_busy_inhibit_duration(int seconds) {
+ using namespace vespa::config::content::core;
+ using ConfigBuilder = StorDistributormanagerConfigBuilder;
+
+ ConfigBuilder builder;
+ builder.inhibitMergeSendingOnBusyNodeDurationSec = seconds;
+ getConfig().configure(builder);
+ _distributor->enableNextConfig();
+}
+
+void Distributor_Test::merge_busy_inhibit_duration_config_is_propagated_to_distributor_config() {
+ setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
+
+ configure_merge_busy_inhibit_duration(7);
+ CPPUNIT_ASSERT(getConfig().getInhibitMergesOnBusyNodeDuration() == std::chrono::seconds(7));
+}
+
+void Distributor_Test::merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker() {
+ setupDistributor(Redundancy(2), NodeCount(2), "storage:1 distributor:1");
+ addNodesToBucketDB(document::BucketId(16, 1), "0=1/1/1/t");
+
+ configure_merge_busy_inhibit_duration(100);
+ auto cmd = makeDummyRemoveCommand(); // Remove is for bucket 1
+ _distributor->handleMessage(cmd);
+
+ // Should send to content node 0
+ CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(api::MessageType::REMOVE, _sender.commands[0]->getType());
+ auto& fwd_cmd = dynamic_cast<api::RemoveCommand&>(*_sender.commands[0]);
+ auto reply = fwd_cmd.makeReply();
+ reply->setResult(api::ReturnCode(api::ReturnCode::BUSY));
+ _distributor->handleReply(std::shared_ptr<api::StorageReply>(std::move(reply)));
+
+ auto& node_info = _distributor->getPendingMessageTracker().getNodeInfo();
+
+ CPPUNIT_ASSERT(node_info.isBusy(0));
+ getClock().addSecondsToTime(99);
+ CPPUNIT_ASSERT(node_info.isBusy(0));
+ getClock().addSecondsToTime(2);
+ CPPUNIT_ASSERT(!node_info.isBusy(0));
+}
+
}
}
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp
index b298bec3977..7e7ca52635e 100644
--- a/storage/src/tests/distributor/mergeoperationtest.cpp
+++ b/storage/src/tests/distributor/mergeoperationtest.cpp
@@ -28,6 +28,7 @@ class MergeOperationTest : public CppUnit::TestFixture,
CPPUNIT_TEST(testMarkRedundantTrustedCopiesAsSourceOnly);
CPPUNIT_TEST(onlyMarkRedundantRetiredReplicasAsSourceOnly);
CPPUNIT_TEST(mark_post_merge_redundant_replicas_source_only);
+ CPPUNIT_TEST(merge_operation_is_blocked_by_any_busy_target_node);
CPPUNIT_TEST_SUITE_END();
std::unique_ptr<PendingMessageTracker> _pendingTracker;
@@ -41,6 +42,7 @@ protected:
void testMarkRedundantTrustedCopiesAsSourceOnly();
void onlyMarkRedundantRetiredReplicasAsSourceOnly();
void mark_post_merge_redundant_replicas_source_only();
+ void merge_operation_is_blocked_by_any_busy_target_node();
public:
void setUp() override {
@@ -478,5 +480,28 @@ void MergeOperationTest::mark_post_merge_redundant_replicas_source_only() {
getNodeList("storage:10", 4, "3,5,7,6"));
}
+void MergeOperationTest::merge_operation_is_blocked_by_any_busy_target_node() {
+ getClock().setAbsoluteTimeInSeconds(10);
+ addNodesToBucketDB(document::BucketId(16, 1), "0=10/1/1/t,1=20/1/1,2=10/1/1/t");
+ _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:3"));
+ MergeOperation op(BucketAndNodes(document::BucketId(16, 1), toVector<uint16_t>(0, 1, 2)));
+ op.setIdealStateManager(&getIdealStateManager());
+
+ // Should not block on nodes _not_ included in operation node set
+ _pendingTracker->getNodeInfo().setBusy(3, std::chrono::seconds(10));
+ CPPUNIT_ASSERT(!op.isBlocked(*_pendingTracker));
+
+ // Node 1 is included in operation node set and should cause a block
+ _pendingTracker->getNodeInfo().setBusy(0, std::chrono::seconds(10));
+ CPPUNIT_ASSERT(op.isBlocked(*_pendingTracker));
+
+ getClock().addSecondsToTime(11);
+ CPPUNIT_ASSERT(!op.isBlocked(*_pendingTracker)); // No longer busy
+
+ // Should block on other operation nodes than the first listed as well
+ _pendingTracker->getNodeInfo().setBusy(1, std::chrono::seconds(10));
+ CPPUNIT_ASSERT(op.isBlocked(*_pendingTracker));
+}
+
} // distributor
} // storage
diff --git a/storage/src/tests/distributor/nodeinfotest.cpp b/storage/src/tests/distributor/nodeinfotest.cpp
index a3eaf2505b3..0363f25831a 100644
--- a/storage/src/tests/distributor/nodeinfotest.cpp
+++ b/storage/src/tests/distributor/nodeinfotest.cpp
@@ -57,11 +57,11 @@ NodeInfoTest::testSimple()
CPPUNIT_ASSERT_EQUAL(1, (int)info.getPendingCount(7));
CPPUNIT_ASSERT_EQUAL(0, (int)info.getPendingCount(5));
- info.setBusy(5);
+ info.setBusy(5, std::chrono::seconds(60));
clock.addSecondsToTime(10);
- info.setBusy(1);
+ info.setBusy(1, std::chrono::seconds(60));
clock.addSecondsToTime(20);
- info.setBusy(42);
+ info.setBusy(42, std::chrono::seconds(60));
CPPUNIT_ASSERT_EQUAL(true, info.isBusy(5));
CPPUNIT_ASSERT_EQUAL(true, info.isBusy(1));
diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
index cda9a9e3782..fa36b4e6305 100644
--- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp
+++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
@@ -13,12 +13,10 @@
namespace storage {
namespace distributor {
-// Workaround typedef for not (yet) running with --std=c++14 which supports
-// user defined literals. Once we do, replace ms(123) with 123ms.
-using ms = std::chrono::milliseconds;
+using namespace std::chrono_literals;
-class PendingMessageTrackerCallback_Test : public CppUnit::TestFixture {
- CPPUNIT_TEST_SUITE(PendingMessageTrackerCallback_Test);
+class PendingMessageTrackerTest : public CppUnit::TestFixture {
+ CPPUNIT_TEST_SUITE(PendingMessageTrackerTest);
CPPUNIT_TEST(testSimple);
CPPUNIT_TEST(testMultipleMessages);
CPPUNIT_TEST(testStartPage);
@@ -36,6 +34,8 @@ class PendingMessageTrackerCallback_Test : public CppUnit::TestFixture {
CPPUNIT_TEST(timeTravellingClockLatenciesNotRegistered);
CPPUNIT_TEST(statsSnapshotIncludesAllNodes);
CPPUNIT_TEST(latencyProviderForwardsToImplementation);
+ CPPUNIT_TEST(busy_reply_marks_node_as_busy);
+ CPPUNIT_TEST(busy_node_duration_can_be_adjusted);
CPPUNIT_TEST_SUITE_END();
public:
@@ -56,6 +56,8 @@ public:
void timeTravellingClockLatenciesNotRegistered();
void statsSnapshotIncludesAllNodes();
void latencyProviderForwardsToImplementation();
+ void busy_reply_marks_node_as_busy();
+ void busy_node_duration_can_be_adjusted();
private:
void insertMessages(PendingMessageTracker& tracker);
@@ -122,10 +124,12 @@ public:
}
void sendPutReply(api::PutCommand& putCmd,
- const RequestBuilder& builder)
+ const RequestBuilder& builder,
+ const api::ReturnCode& result = api::ReturnCode())
{
assignMockedTime(builder.atTime());
auto putReply = putCmd.makeReply();
+ putReply->setResult(result);
_tracker->reply(*putReply);
}
@@ -149,8 +153,8 @@ public:
void sendPutAndReplyWithLatency(uint16_t node,
std::chrono::milliseconds latency)
{
- auto put = sendPut(RequestBuilder().atTime(ms(1000)).toNode(node));
- sendPutReply(*put, RequestBuilder().atTime(ms(1000) + latency));
+ auto put = sendPut(RequestBuilder().atTime(1000ms).toNode(node));
+ sendPutReply(*put, RequestBuilder().atTime(1000ms + latency));
}
OperationStats getNodePutOperationStats(uint16_t node) {
@@ -158,6 +162,7 @@ public:
}
PendingMessageTracker& tracker() { return *_tracker; }
+ auto& clock() { return _clock; }
private:
std::string createDummyIdString(const document::BucketId& bucket) const {
@@ -215,17 +220,16 @@ Fixture::Fixture()
_clock.setAbsoluteTimeInSeconds(1);
// Have to set clock in compReg before constructing tracker, or it'll
// flip out and die on an explicit nullptr check.
- _tracker = std::unique_ptr<PendingMessageTracker>(
- new PendingMessageTracker(_compReg));
+ _tracker = std::make_unique<PendingMessageTracker>(_compReg);
}
Fixture::~Fixture() {}
}
-CPPUNIT_TEST_SUITE_REGISTRATION(PendingMessageTrackerCallback_Test);
+CPPUNIT_TEST_SUITE_REGISTRATION(PendingMessageTrackerTest);
void
-PendingMessageTrackerCallback_Test::testSimple()
+PendingMessageTrackerTest::testSimple()
{
StorageComponentRegisterImpl compReg;
framework::defaultimplementation::FakeClock clock;
@@ -269,7 +273,7 @@ PendingMessageTrackerCallback_Test::testSimple()
}
void
-PendingMessageTrackerCallback_Test::insertMessages(PendingMessageTracker& tracker)
+PendingMessageTrackerTest::insertMessages(PendingMessageTracker& tracker)
{
for (uint32_t i = 0; i < 4; i++) {
std::ostringstream ost;
@@ -294,7 +298,7 @@ PendingMessageTrackerCallback_Test::insertMessages(PendingMessageTracker& tracke
}
void
-PendingMessageTrackerCallback_Test::testStartPage()
+PendingMessageTrackerTest::testStartPage()
{
StorageComponentRegisterImpl compReg;
framework::defaultimplementation::FakeClock clock;
@@ -318,7 +322,7 @@ PendingMessageTrackerCallback_Test::testStartPage()
}
void
-PendingMessageTrackerCallback_Test::testMultipleMessages()
+PendingMessageTrackerTest::testMultipleMessages()
{
StorageComponentRegisterImpl compReg;
framework::defaultimplementation::FakeClock clock;
@@ -419,7 +423,7 @@ public:
}
void
-PendingMessageTrackerCallback_Test::testGetPendingMessageTypes()
+PendingMessageTrackerTest::testGetPendingMessageTypes()
{
StorageComponentRegisterImpl compReg;
framework::defaultimplementation::FakeClock clock;
@@ -456,7 +460,7 @@ PendingMessageTrackerCallback_Test::testGetPendingMessageTypes()
}
void
-PendingMessageTrackerCallback_Test::testHasPendingMessage()
+PendingMessageTrackerTest::testHasPendingMessage()
{
StorageComponentRegisterImpl compReg;
framework::defaultimplementation::FakeClock clock;
@@ -508,7 +512,7 @@ public:
} // anon ns
void
-PendingMessageTrackerCallback_Test::testGetAllMessagesForSingleBucket()
+PendingMessageTrackerTest::testGetAllMessagesForSingleBucket()
{
StorageComponentRegisterImpl compReg;
framework::defaultimplementation::FakeClock clock;
@@ -535,10 +539,10 @@ PendingMessageTrackerCallback_Test::testGetAllMessagesForSingleBucket()
}
void
-PendingMessageTrackerCallback_Test::nodeStatsCanBeOutputStreamed()
+PendingMessageTrackerTest::nodeStatsCanBeOutputStreamed()
{
NodeStats stats;
- stats.puts = makeOpStats(ms(56789), 10);
+ stats.puts = makeOpStats(56789ms, 10);
std::ostringstream os;
os << stats;
@@ -550,120 +554,143 @@ PendingMessageTrackerCallback_Test::nodeStatsCanBeOutputStreamed()
}
void
-PendingMessageTrackerCallback_Test::totalPutLatencyIsInitiallyZero()
+PendingMessageTrackerTest::totalPutLatencyIsInitiallyZero()
{
Fixture fixture;
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(0), 0),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(0ms, 0),
fixture.getNodePutOperationStats(0));
}
void
-PendingMessageTrackerCallback_Test::statsNotAlteredBeforeReplyReceived()
+PendingMessageTrackerTest::statsNotAlteredBeforeReplyReceived()
{
Fixture fixture;
- fixture.sendPut(RequestBuilder().atTime(ms(1000)).toNode(0));
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(0), 0),
+ fixture.sendPut(RequestBuilder().atTime(1000ms).toNode(0));
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(0ms, 0),
fixture.getNodePutOperationStats(0));
}
void
-PendingMessageTrackerCallback_Test::totalPutLatencyIsTrackedForSingleRequest()
+PendingMessageTrackerTest::totalPutLatencyIsTrackedForSingleRequest()
{
Fixture fixture;
- fixture.sendPutAndReplyWithLatency(0, ms(500));
+ fixture.sendPutAndReplyWithLatency(0, 500ms);
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(500), 1),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(500ms, 1),
fixture.getNodePutOperationStats(0));
}
void
-PendingMessageTrackerCallback_Test::statsAreTrackedSeparatelyPerNode()
+PendingMessageTrackerTest::statsAreTrackedSeparatelyPerNode()
{
Fixture fixture;
- fixture.sendPutAndReplyWithLatency(0, ms(500));
- fixture.sendPutAndReplyWithLatency(1, ms(600));
+ fixture.sendPutAndReplyWithLatency(0, 500ms);
+ fixture.sendPutAndReplyWithLatency(1, 600ms);
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(500), 1),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(500ms, 1),
fixture.getNodePutOperationStats(0));
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(600), 1),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(600ms, 1),
fixture.getNodePutOperationStats(1));
}
// Necessarily, this test will have to be altered when we add tracking of
// other message types as well.
void
-PendingMessageTrackerCallback_Test::onlyPutMessagesAreTracked()
+PendingMessageTrackerTest::onlyPutMessagesAreTracked()
{
Fixture fixture;
auto remove = fixture.sendRemove(
- RequestBuilder().atTime(ms(1000)).toNode(0));
- fixture.sendRemoveReply(*remove, RequestBuilder().atTime(ms(2000)));
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(0), 0),
+ RequestBuilder().atTime(1000ms).toNode(0));
+ fixture.sendRemoveReply(*remove, RequestBuilder().atTime(2000ms));
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(0ms, 0),
fixture.getNodePutOperationStats(0));
}
void
-PendingMessageTrackerCallback_Test::totalPutLatencyIsAggregatedAcrossRequests()
+PendingMessageTrackerTest::totalPutLatencyIsAggregatedAcrossRequests()
{
Fixture fixture;
// Model 2 concurrent puts to node 0.
- fixture.sendPutAndReplyWithLatency(0, ms(500));
- fixture.sendPutAndReplyWithLatency(0, ms(600));
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(1100), 2),
+ fixture.sendPutAndReplyWithLatency(0, 500ms);
+ fixture.sendPutAndReplyWithLatency(0, 600ms);
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(1100ms, 2),
fixture.getNodePutOperationStats(0));
}
void
-PendingMessageTrackerCallback_Test::clearingMessagesDoesNotAffectStats()
+PendingMessageTrackerTest::clearingMessagesDoesNotAffectStats()
{
Fixture fixture;
- fixture.sendPutAndReplyWithLatency(2, ms(2000));
+ fixture.sendPutAndReplyWithLatency(2, 2000ms);
fixture.tracker().clearMessagesForNode(2);
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(2000), 1),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(2000ms, 1),
fixture.getNodePutOperationStats(2));
}
void
-PendingMessageTrackerCallback_Test::timeTravellingClockLatenciesNotRegistered()
+PendingMessageTrackerTest::timeTravellingClockLatenciesNotRegistered()
{
Fixture fixture;
- auto put = fixture.sendPut(RequestBuilder().atTime(ms(1000)).toNode(0));
- fixture.sendPutReply(*put, RequestBuilder().atTime(ms(999)));
+ auto put = fixture.sendPut(RequestBuilder().atTime(1000ms).toNode(0));
+ fixture.sendPutReply(*put, RequestBuilder().atTime(999ms));
// Latency increase of zero, but we do count the request itself.
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(0), 1),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(0ms, 1),
fixture.getNodePutOperationStats(0));
}
void
-PendingMessageTrackerCallback_Test::statsSnapshotIncludesAllNodes()
+PendingMessageTrackerTest::statsSnapshotIncludesAllNodes()
{
Fixture fixture;
- fixture.sendPutAndReplyWithLatency(0, ms(500));
- fixture.sendPutAndReplyWithLatency(1, ms(600));
+ fixture.sendPutAndReplyWithLatency(0, 500ms);
+ fixture.sendPutAndReplyWithLatency(1, 600ms);
NodeStatsSnapshot snapshot = fixture.tracker().getLatencyStatistics();
CPPUNIT_ASSERT_EQUAL(size_t(2), snapshot.nodeToStats.size());
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(500), 1),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(500ms, 1),
snapshot.nodeToStats[0].puts);
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(600), 1),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(600ms, 1),
snapshot.nodeToStats[1].puts);
}
void
-PendingMessageTrackerCallback_Test::latencyProviderForwardsToImplementation()
+PendingMessageTrackerTest::latencyProviderForwardsToImplementation()
{
Fixture fixture;
- fixture.sendPutAndReplyWithLatency(0, ms(500));
+ fixture.sendPutAndReplyWithLatency(0, 500ms);
LatencyStatisticsProvider& provider(
fixture.tracker().getLatencyStatisticsProvider());
NodeStatsSnapshot snapshot = provider.getLatencyStatistics();
CPPUNIT_ASSERT_EQUAL(size_t(1), snapshot.nodeToStats.size());
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(500), 1),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(500ms, 1),
snapshot.nodeToStats[0].puts);
}
+// TODO don't set busy for visitor replies? These will mark the node as busy today,
+// but have the same actual semantics as busy merges (i.e. "queue is full", not "node
+// is too busy to accept new requests in general").
+
+void PendingMessageTrackerTest::busy_reply_marks_node_as_busy() {
+ Fixture f;
+ auto cmd = f.sendPut(RequestBuilder().toNode(0));
+ CPPUNIT_ASSERT(!f.tracker().getNodeInfo().isBusy(0));
+ f.sendPutReply(*cmd, RequestBuilder(), api::ReturnCode(api::ReturnCode::BUSY));
+ CPPUNIT_ASSERT(f.tracker().getNodeInfo().isBusy(0));
+ CPPUNIT_ASSERT(!f.tracker().getNodeInfo().isBusy(1));
+}
+
+void PendingMessageTrackerTest::busy_node_duration_can_be_adjusted() {
+ Fixture f;
+ auto cmd = f.sendPut(RequestBuilder().toNode(0));
+ f.tracker().setNodeBusyDuration(std::chrono::seconds(10));
+ f.sendPutReply(*cmd, RequestBuilder(), api::ReturnCode(api::ReturnCode::BUSY));
+ CPPUNIT_ASSERT(f.tracker().getNodeInfo().isBusy(0));
+ f.clock().addSecondsToTime(11);
+ CPPUNIT_ASSERT(!f.tracker().getNodeInfo().isBusy(0));
+}
+
} // distributor
} // storage
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index 3007eff3d92..44cf56fdff8 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -27,6 +27,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_maxVisitorsPerNodePerClientVisitor(4),
_minBucketsPerVisitor(5),
_maxClusterClockSkew(0),
+ _inhibitMergeSendingOnBusyNodeDuration(std::chrono::seconds(60)),
_doInlineSplit(true),
_enableJoinForSiblingLessBuckets(false),
_enableInconsistentJoin(false),
@@ -149,8 +150,10 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist
configureMaintenancePriorities(config);
if (config.maxClusterClockSkewSec >= 0) {
- _maxClusterClockSkew = std::chrono::seconds(
- config.maxClusterClockSkewSec);
+ _maxClusterClockSkew = std::chrono::seconds(config.maxClusterClockSkewSec);
+ }
+ if (config.inhibitMergeSendingOnBusyNodeDurationSec >= 0) {
+ _inhibitMergeSendingOnBusyNodeDuration = std::chrono::seconds(config.inhibitMergeSendingOnBusyNodeDurationSec);
}
LOG(debug,
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index d33f46befac..d3e589f9d00 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -15,7 +15,7 @@ namespace distributor {
class DistributorConfiguration {
public:
- DistributorConfiguration(StorageComponent& component);
+ explicit DistributorConfiguration(StorageComponent& component);
~DistributorConfiguration();
struct MaintenancePriorities
@@ -225,6 +225,9 @@ public:
std::chrono::seconds getMaxClusterClockSkew() const noexcept {
return _maxClusterClockSkew;
}
+ std::chrono::seconds getInhibitMergesOnBusyNodeDuration() const noexcept {
+ return _inhibitMergeSendingOnBusyNodeDuration;
+ }
bool getSequenceMutatingOperations() const noexcept {
return _sequenceMutatingOperations;
@@ -263,6 +266,7 @@ private:
MaintenancePriorities _maintenancePriorities;
std::chrono::seconds _maxClusterClockSkew;
+ std::chrono::seconds _inhibitMergeSendingOnBusyNodeDuration;
bool _doInlineSplit;
bool _enableJoinForSiblingLessBuckets;
diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def
index f549442cf51..0a43a7e8aa4 100644
--- a/storage/src/vespa/storage/config/stor-distributormanager.def
+++ b/storage/src/vespa/storage/config/stor-distributormanager.def
@@ -180,3 +180,7 @@ max_cluster_clock_skew_sec int default=1
## modifications to documents when sent from multiple feed clients.
sequence_mutating_operations bool default=true
+## Number of seconds that scheduling of new merge operations should be inhibited
+## towards a node if it has indicated that its merge queues are full or it is
+## suffering from resource exhaustion.
+inhibit_merge_sending_on_busy_node_duration_sec int default=30
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index f0b80900fe4..0dc52650131 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -684,12 +684,10 @@ Distributor::doNonCriticalTick(framework::ThreadIndex)
void
Distributor::enableNextConfig()
{
- _hostInfoReporter.enableReporting(
- getConfig().getEnableHostInfoReporting());
- _bucketDBMetricUpdater.setMinimumReplicaCountingMode(
- getConfig().getMinimumReplicaCountingMode());
- _ownershipSafeTimeCalc->setMaxClusterClockSkew(
- getConfig().getMaxClusterClockSkew());
+ _hostInfoReporter.enableReporting(getConfig().getEnableHostInfoReporting());
+ _bucketDBMetricUpdater.setMinimumReplicaCountingMode(getConfig().getMinimumReplicaCountingMode());
+ _ownershipSafeTimeCalc->setMaxClusterClockSkew(getConfig().getMaxClusterClockSkew());
+ _pendingMessageTracker.setNodeBusyDuration(getConfig().getInhibitMergesOnBusyNodeDuration());
}
void
diff --git a/storage/src/vespa/storage/distributor/nodeinfo.cpp b/storage/src/vespa/storage/distributor/nodeinfo.cpp
index f018f78ef7a..9430f1691e0 100644
--- a/storage/src/vespa/storage/distributor/nodeinfo.cpp
+++ b/storage/src/vespa/storage/distributor/nodeinfo.cpp
@@ -6,21 +6,17 @@
namespace storage::distributor {
NodeInfo::NodeInfo(const framework::Clock& clock)
- : _clock(clock) {}
+ : _clock(clock) {}
-uint32_t
-NodeInfo::getPendingCount(uint16_t idx) const
-{
+uint32_t NodeInfo::getPendingCount(uint16_t idx) const {
return getNode(idx)._pending;
}
-bool
-NodeInfo::isBusy(uint16_t idx) const
-{
+bool NodeInfo::isBusy(uint16_t idx) const {
const SingleNodeInfo& info = getNode(idx);
- if (info._busyTime.isSet()) {
- if (_clock.getTimeInSeconds() > info._busyTime) {
- info._busyTime = framework::SecondTime(0);
+ if (info._busyUntilTime.time_since_epoch().count() != 0) {
+ if (_clock.getMonotonicTime() > info._busyUntilTime) {
+ info._busyUntilTime = framework::MonotonicTimePoint{};
} else {
return true;
}
@@ -29,22 +25,15 @@ NodeInfo::isBusy(uint16_t idx) const
return false;
}
-void
-NodeInfo::setBusy(uint16_t idx)
-{
- getNode(idx)._busyTime = _clock.getTimeInSeconds()
- + framework::SecondTime(60);
+void NodeInfo::setBusy(uint16_t idx, framework::MonotonicDuration for_duration) {
+ getNode(idx)._busyUntilTime = _clock.getMonotonicTime() + for_duration;
}
-void
-NodeInfo::incPending(uint16_t idx)
-{
+void NodeInfo::incPending(uint16_t idx) {
getNode(idx)._pending++;
}
-void
-NodeInfo::decPending(uint16_t idx)
-{
+void NodeInfo::decPending(uint16_t idx) {
SingleNodeInfo& info = getNode(idx);
if (info._pending > 0) {
@@ -52,28 +41,24 @@ NodeInfo::decPending(uint16_t idx)
}
}
-void
-NodeInfo::clearPending(uint16_t idx)
-{
+void NodeInfo::clearPending(uint16_t idx) {
SingleNodeInfo& info = getNode(idx);
info._pending = 0;
}
-NodeInfo::SingleNodeInfo&
-NodeInfo::getNode(uint16_t idx)
-{
- while ((int)_nodes.size() < idx + 1) {
- _nodes.push_back(SingleNodeInfo());
+NodeInfo::SingleNodeInfo& NodeInfo::getNode(uint16_t idx) {
+ const auto index_lbound = static_cast<size_t>(idx) + 1;
+ while (_nodes.size() < index_lbound) {
+ _nodes.emplace_back();
}
return _nodes[idx];
}
-const NodeInfo::SingleNodeInfo&
-NodeInfo::getNode(uint16_t idx) const
-{
- while ((int)_nodes.size() < idx + 1) {
- _nodes.push_back(SingleNodeInfo());
+const NodeInfo::SingleNodeInfo& NodeInfo::getNode(uint16_t idx) const {
+ const auto index_lbound = static_cast<size_t>(idx) + 1;
+ while (_nodes.size() < index_lbound) {
+ _nodes.emplace_back();
}
return _nodes[idx];
diff --git a/storage/src/vespa/storage/distributor/nodeinfo.h b/storage/src/vespa/storage/distributor/nodeinfo.h
index 57fd0145331..a28a8965714 100644
--- a/storage/src/vespa/storage/distributor/nodeinfo.h
+++ b/storage/src/vespa/storage/distributor/nodeinfo.h
@@ -14,13 +14,13 @@ namespace storage::distributor {
class NodeInfo {
public:
- NodeInfo(const framework::Clock& clock);
+ explicit NodeInfo(const framework::Clock& clock);
uint32_t getPendingCount(uint16_t idx) const;
bool isBusy(uint16_t idx) const;
- void setBusy(uint16_t idx);
+ void setBusy(uint16_t idx, framework::MonotonicDuration for_duration);
void incPending(uint16_t idx);
@@ -30,11 +30,10 @@ public:
private:
struct SingleNodeInfo {
- SingleNodeInfo()
- : _pending(0), _busyTime(0) {};
+ SingleNodeInfo() : _pending(0), _busyUntilTime() {}
uint32_t _pending;
- mutable framework::SecondTime _busyTime;
+ mutable framework::MonotonicTimePoint _busyUntilTime;
};
mutable std::vector<SingleNodeInfo> _nodes;
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
index 81d5c1a7ba6..28381af4021 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "mergeoperation.h"
#include <vespa/storage/distributor/idealstatemanager.h>
+#include <array>
#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".distributor.operation.idealstate.merge");
@@ -302,25 +303,21 @@ MergeOperation::onReceive(DistributorMessageSender& sender,
namespace {
-static const uint32_t WRITE_FEED_MESSAGE_TYPES[] =
-{
+constexpr std::array<uint32_t, 7> WRITE_FEED_MESSAGE_TYPES {{
api::MessageType::PUT_ID,
api::MessageType::REMOVE_ID,
api::MessageType::UPDATE_ID,
api::MessageType::REMOVELOCATION_ID,
api::MessageType::MULTIOPERATION_ID,
api::MessageType::BATCHPUTREMOVE_ID,
- api::MessageType::BATCHDOCUMENTUPDATE_ID,
- 0
-};
+ api::MessageType::BATCHDOCUMENTUPDATE_ID
+}};
}
-bool
-MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const
-{
- for (uint32_t i = 0; WRITE_FEED_MESSAGE_TYPES[i] != 0; ++i) {
- if (messageType == WRITE_FEED_MESSAGE_TYPES[i]) {
+bool MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const {
+ for (auto blocking_type : WRITE_FEED_MESSAGE_TYPES) {
+ if (messageType == blocking_type) {
return true;
}
}
@@ -328,4 +325,14 @@ MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint8_t pri) cons
return IdealStateOperation::shouldBlockThisOperation(messageType, pri);
}
+bool MergeOperation::isBlocked(const PendingMessageTracker& pending_tracker) const {
+ const auto& node_info = pending_tracker.getNodeInfo();
+ for (auto node : getNodes()) {
+ if (node_info.isBusy(node)) {
+ return true;
+ }
+ }
+ return IdealStateOperation::isBlocked(pending_tracker);
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
index 36b894efb20..d09bc0ba5c4 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
@@ -46,7 +46,8 @@ public:
const document::BucketId&, MergeLimiter&,
std::vector<MergeMetaData>&);
- bool shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const override ;
+ bool shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const override;
+ bool isBlocked(const PendingMessageTracker& pendingMessages) const override;
private:
static void addIdealNodes(
const std::vector<uint16_t>& idealNodes,
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
index b2eb416c48f..aeadca17d13 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
@@ -18,6 +18,7 @@ PendingMessageTracker::PendingMessageTracker(framework::ComponentRegister& cr)
_nodeIndexToStats(),
_nodeInfo(_component.getClock()),
_statisticsForwarder(*this),
+ _nodeBusyDuration(60),
_lock()
{
_component.registerStatusPage(*this);
@@ -54,15 +55,14 @@ PendingMessageTracker::currentTime() const
namespace {
template <typename Pair>
-struct PairAsRange
-{
+struct PairAsRange {
Pair _pair;
explicit PairAsRange(Pair pair) : _pair(std::move(pair)) {}
- auto begin() -> decltype(_pair.first) { return _pair.first; }
- auto end() -> decltype(_pair.second) { return _pair.second; }
- auto begin() const -> decltype(_pair.first) { return _pair.first; }
- auto end() const -> decltype(_pair.second) { return _pair.second; }
+ auto begin() { return _pair.first; }
+ auto end() { return _pair.second; }
+ auto begin() const { return _pair.first; }
+ auto end() const { return _pair.second; }
};
template <typename Pair>
@@ -133,7 +133,7 @@ PendingMessageTracker::reply(const api::StorageReply& r)
updateNodeStatsOnReply(*iter);
api::ReturnCode::Result code = r.getResult().getResult();
if (code == api::ReturnCode::BUSY || code == api::ReturnCode::TIMEOUT) {
- _nodeInfo.setBusy(r.getAddress()->getIndex());
+ _nodeInfo.setBusy(r.getAddress()->getIndex(), _nodeBusyDuration);
}
LOG(debug, "Erased message with id %zu", msgId);
msgs.erase(msgId);
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
index 42704a02a40..9b49e94d51f 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
@@ -21,6 +21,7 @@
#include <set>
#include <unordered_map>
+#include <chrono>
namespace storage {
namespace distributor {
@@ -106,7 +107,7 @@ public:
* storage node. "Completed" here means both successful and failed
* operations. Statistics are monotonically increasing within the scope of
* the process' lifetime and are never reset. This models how the Linux
- * kernel reports its internal stats and means the caller must maintan
+ * kernel reports its internal stats and means the caller must maintain
* value snapshots to extract meaningful time series information.
*
* If stats are requested for a node that has not had any operations
@@ -137,6 +138,10 @@ public:
return _statisticsForwarder;
}
+ void setNodeBusyDuration(std::chrono::seconds secs) noexcept {
+ _nodeBusyDuration = secs;
+ }
+
private:
struct MessageEntry {
TimePoint timeStamp;
@@ -210,6 +215,7 @@ private:
std::unordered_map<uint16_t, NodeStats> _nodeIndexToStats;
NodeInfo _nodeInfo;
ForwardingLatencyStatisticsProvider _statisticsForwarder;
+ std::chrono::seconds _nodeBusyDuration;
// Since distributor is currently single-threaded, this will only
// contend when status page is being accessed. It is, however, required
diff --git a/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp b/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp
index 282ddd871b8..abd9778d72c 100644
--- a/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp
+++ b/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp
@@ -26,7 +26,7 @@ ThrottlingOperationStarter::start(const std::shared_ptr<Operation>& operation,
if (!canStart(_pendingCount, priority)) {
return false;
}
- Operation::SP wrappedOp(new ThrottlingOperation(operation, *this));
+ auto wrappedOp = std::make_shared<ThrottlingOperation>(operation, *this);
++_pendingCount;
return _starterImpl.start(wrappedOp, priority);
}