diff options
author | Tor Egge <Tor.Egge@online.no> | 2021-10-13 12:29:42 +0200 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2021-10-13 12:29:42 +0200 |
commit | e302a3f50761a500448734161c30f439337bae0d (patch) | |
tree | b9f1f5b0a3906568cfa6c5e85ce8ba183081b3ad /storage | |
parent | 4b66edb00d3810b067eabc55ac9d4d657b954626 (diff) |
Add metrics for blocked and throttled operations.
Diffstat (limited to 'storage')
12 files changed, 117 insertions, 6 deletions
diff --git a/storage/src/tests/distributor/blockingoperationstartertest.cpp b/storage/src/tests/distributor/blockingoperationstartertest.cpp index ba4148af28b..15aada37c9b 100644 --- a/storage/src/tests/distributor/blockingoperationstartertest.cpp +++ b/storage/src/tests/distributor/blockingoperationstartertest.cpp @@ -14,6 +14,14 @@ using namespace ::testing; namespace storage::distributor { +namespace { + +const MockOperation& as_mock_operation(const Operation& operation) { + return dynamic_cast<const MockOperation&>(operation); +} + +} + struct FakeDistributorStripeOperationContext : public DistributorStripeOperationContext { PendingMessageTracker& _message_tracker; @@ -129,15 +137,19 @@ BlockingOperationStarterTest::SetUp() } TEST_F(BlockingOperationStarterTest, operation_not_blocked_when_no_messages_pending) { - ASSERT_TRUE(_operationStarter->start(createMockOperation(), OperationStarter::Priority(0))); + auto operation = createMockOperation(); + ASSERT_TRUE(_operationStarter->start(operation, OperationStarter::Priority(0))); EXPECT_EQ("Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)), pri 0\n", _starterImpl->toString()); + EXPECT_FALSE(as_mock_operation(*operation).get_was_blocked()); } TEST_F(BlockingOperationStarterTest, operation_blocked_when_messages_pending) { // start should return true but not forward message to underlying starter. - ASSERT_TRUE(_operationStarter->start(createBlockingMockOperation(), OperationStarter::Priority(0))); + auto operation = createBlockingMockOperation(); + ASSERT_TRUE(_operationStarter->start(operation, OperationStarter::Priority(0))); EXPECT_EQ("", _starterImpl->toString()); + EXPECT_TRUE(as_mock_operation(*operation).get_was_blocked()); } } diff --git a/storage/src/tests/distributor/maintenancemocks.h b/storage/src/tests/distributor/maintenancemocks.h index 0ea1baf7412..f708fbd41aa 100644 --- a/storage/src/tests/distributor/maintenancemocks.h +++ b/storage/src/tests/distributor/maintenancemocks.h @@ -34,10 +34,14 @@ class MockOperation : public MaintenanceOperation document::Bucket _bucket; std::string _reason; bool _shouldBlock; + bool _was_blocked; + bool _was_throttled; public: MockOperation(const document::Bucket &bucket) : _bucket(bucket), - _shouldBlock(false) + _shouldBlock(false), + _was_blocked(false), + _was_throttled(false) {} std::string toString() const override { @@ -51,12 +55,16 @@ public: } void onStart(DistributorStripeMessageSender&) override {} void onReceive(DistributorStripeMessageSender&, const std::shared_ptr<api::StorageReply>&) override {} + void on_blocked() override { _was_blocked = true; } + void on_throttled() override { _was_throttled = true; } bool isBlocked(const DistributorStripeOperationContext&, const OperationSequencer&) const override { return _shouldBlock; } void setShouldBlock(bool shouldBlock) { _shouldBlock = shouldBlock; } + bool get_was_blocked() const noexcept { return _was_blocked; } + bool get_was_throttled() const noexcept { return _was_throttled; } }; class MockMaintenanceOperationGenerator diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp index 769651afd8d..d174a6335b7 100644 --- a/storage/src/tests/distributor/mergeoperationtest.cpp +++ b/storage/src/tests/distributor/mergeoperationtest.cpp @@ -35,8 +35,19 @@ struct MergeOperationTest : Test, DistributorStripeTestUtil { void TearDown() override { close(); } + + std::shared_ptr<MergeOperation> setup_minimal_merge_op(); }; +std::shared_ptr<MergeOperation> +MergeOperationTest::setup_minimal_merge_op() +{ + document::BucketId bucket_id(16, 1); + auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(bucket_id), toVector<uint16_t>(0, 1, 2))); + op->setIdealStateManager(&getIdealStateManager()); + return op; +} + TEST_F(MergeOperationTest, simple) { getClock().setAbsoluteTimeInSeconds(10); @@ -500,4 +511,22 @@ TEST_F(MergeOperationTest, merge_operation_is_not_blocked_by_request_bucket_info EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer)); } +TEST_F(MergeOperationTest, on_blocked_updates_metrics) +{ + auto op = setup_minimal_merge_op(); + auto metrics = getIdealStateManager().getMetrics().operations[IdealStateOperation::MERGE_BUCKET]; + EXPECT_EQ(0, metrics->blocked.getValue()); + op->on_blocked(); + EXPECT_EQ(1, metrics->blocked.getValue()); +} + +TEST_F(MergeOperationTest, on_throttled_updates_metrics) +{ + auto op = setup_minimal_merge_op(); + auto metrics = getIdealStateManager().getMetrics().operations[IdealStateOperation::MERGE_BUCKET]; + EXPECT_EQ(0, metrics->throttled.getValue()); + op->on_throttled(); + EXPECT_EQ(1, metrics->throttled.getValue()); +} + } // storage::distributor diff --git a/storage/src/tests/distributor/throttlingoperationstartertest.cpp b/storage/src/tests/distributor/throttlingoperationstartertest.cpp index c657c5390fc..eac345a243f 100644 --- a/storage/src/tests/distributor/throttlingoperationstartertest.cpp +++ b/storage/src/tests/distributor/throttlingoperationstartertest.cpp @@ -11,6 +11,14 @@ using document::BucketId; using document::test::makeDocumentBucket; using namespace ::testing; +namespace { + +const MockOperation& as_mock_operation(const Operation& operation) { + return dynamic_cast<const MockOperation&>(operation); +} + +} + struct ThrottlingOperationStarterTest : Test { std::shared_ptr<Operation> createMockOperation() { return std::shared_ptr<Operation>(new MockOperation(makeDocumentBucket(BucketId(16, 1)))); @@ -39,8 +47,10 @@ ThrottlingOperationStarterTest::TearDown() } TEST_F(ThrottlingOperationStarterTest, operation_not_throttled_when_slot_available) { - EXPECT_TRUE(_operationStarter->start(createMockOperation(), + auto operation = createMockOperation(); + EXPECT_TRUE(_operationStarter->start(operation, OperationStarter::Priority(0))); + EXPECT_FALSE(as_mock_operation(*operation).get_was_throttled()); } TEST_F(ThrottlingOperationStarterTest, operation_starting_is_forwarded_to_implementation) { @@ -52,8 +62,10 @@ TEST_F(ThrottlingOperationStarterTest, operation_starting_is_forwarded_to_implem TEST_F(ThrottlingOperationStarterTest, operation_throttled_when_no_available_slots) { _operationStarter->setMaxPendingRange(0, 0); - EXPECT_FALSE(_operationStarter->start(createMockOperation(), + auto operation = createMockOperation(); + EXPECT_FALSE(_operationStarter->start(operation, OperationStarter::Priority(0))); + EXPECT_TRUE(as_mock_operation(*operation).get_was_throttled()); } TEST_F(ThrottlingOperationStarterTest, throttling_with_max_pending_range) { diff --git a/storage/src/vespa/storage/distributor/blockingoperationstarter.cpp b/storage/src/vespa/storage/distributor/blockingoperationstarter.cpp index be0d2cd55f2..1b3540253b7 100644 --- a/storage/src/vespa/storage/distributor/blockingoperationstarter.cpp +++ b/storage/src/vespa/storage/distributor/blockingoperationstarter.cpp @@ -8,6 +8,7 @@ bool BlockingOperationStarter::start(const std::shared_ptr<Operation>& operation, Priority priority) { if (operation->isBlocked(_operation_context, _operation_sequencer)) { + operation->on_blocked(); return true; } return _starterImpl.start(operation, priority); diff --git a/storage/src/vespa/storage/distributor/idealstatemetricsset.cpp b/storage/src/vespa/storage/distributor/idealstatemetricsset.cpp index 5d712f643f3..89dc1ada39d 100644 --- a/storage/src/vespa/storage/distributor/idealstatemetricsset.cpp +++ b/storage/src/vespa/storage/distributor/idealstatemetricsset.cpp @@ -15,7 +15,13 @@ OperationMetricSet::OperationMetricSet(const std::string& name, metrics::Metric: "The number of operations successfully performed", this), failed("done_failed", {{"logdefault"},{"yamasdefault"}}, - "The number of operations that failed", this) + "The number of operations that failed", this), + blocked("blocked", + {{"logdefault"},{"yamasdefault"}}, + "The number of operations blocked by blocking operation starter", this), + throttled("throttled", + {{"logdefault"},{"yamasdefault"}}, + "The number of operations throttled by throttling operation starter", this) {} OperationMetricSet::~OperationMetricSet() = default; diff --git a/storage/src/vespa/storage/distributor/idealstatemetricsset.h b/storage/src/vespa/storage/distributor/idealstatemetricsset.h index 7f9090ffc39..ada1bd50e5c 100644 --- a/storage/src/vespa/storage/distributor/idealstatemetricsset.h +++ b/storage/src/vespa/storage/distributor/idealstatemetricsset.h @@ -16,6 +16,8 @@ public: metrics::LongValueMetric pending; metrics::LongCountMetric ok; metrics::LongCountMetric failed; + metrics::LongCountMetric blocked; + metrics::LongCountMetric throttled; OperationMetricSet(const std::string& name, metrics::Metric::Tags tags, const std::string& description, MetricSet* owner); ~OperationMetricSet() override; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp index 7265afee84a..b1231fafcd9 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp @@ -94,6 +94,22 @@ IdealStateOperation::done() } } +void +IdealStateOperation::on_blocked() +{ + if (_manager) { + _manager->getMetrics().operations[getType()]->blocked.inc(1); + } +} + +void +IdealStateOperation::on_throttled() +{ + if (_manager) { + _manager->getMetrics().operations[getType()]->throttled.inc(1); + } +} + uint32_t IdealStateOperation::memorySize() const { diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h index 9ff0fb126f6..d4dc4e405df 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h @@ -159,6 +159,10 @@ public: */ virtual void done(); + void on_blocked() override; + + void on_throttled() override; + /** Called by IdealStateManager to allow the operation to call back its OperationFinished() method when done. diff --git a/storage/src/vespa/storage/distributor/operations/operation.cpp b/storage/src/vespa/storage/distributor/operations/operation.cpp index 60c9cd0db97..a48fb53a7ce 100644 --- a/storage/src/vespa/storage/distributor/operations/operation.cpp +++ b/storage/src/vespa/storage/distributor/operations/operation.cpp @@ -41,5 +41,15 @@ Operation::copyMessageSettings(const api::StorageCommand& source, api::StorageCo target.setPriority(source.getPriority()); } +void +Operation::on_blocked() +{ +} + +void +Operation::on_throttled() +{ +} + } diff --git a/storage/src/vespa/storage/distributor/operations/operation.h b/storage/src/vespa/storage/distributor/operations/operation.h index e3d452a90dc..fff0450cc04 100644 --- a/storage/src/vespa/storage/distributor/operations/operation.h +++ b/storage/src/vespa/storage/distributor/operations/operation.h @@ -66,6 +66,16 @@ public: return false; } + /* + * Called by blocking operation starter if operation was blocked + */ + virtual void on_blocked(); + + /* + * Called by throttling operation starter if operation was throttled + */ + virtual void on_throttled(); + /** Returns the timestamp on which the first message was sent from this callback. */ diff --git a/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp b/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp index 7f05a102c9a..ccf8273e386 100644 --- a/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp +++ b/storage/src/vespa/storage/distributor/throttlingoperationstarter.cpp @@ -24,6 +24,7 @@ ThrottlingOperationStarter::start(const std::shared_ptr<Operation>& operation, Priority priority) { if (!canStart(_pendingCount, priority)) { + operation->on_throttled(); return false; } auto wrappedOp = std::make_shared<ThrottlingOperation>(operation, *this); |