diff options
author | Geir Storli <geirst@oath.com> | 2018-03-13 12:36:54 +0000 |
---|---|---|
committer | Geir Storli <geirst@oath.com> | 2018-03-13 12:49:05 +0000 |
commit | fa5b695fa2dac2871125493ce71ef6c6bcc22497 (patch) | |
tree | 8081529d843102057f317a46b0b3ca24d3408034 /storage | |
parent | 89974a6ae0d6112a0a30f01731a23ad7794622f6 (diff) |
Remove LatencyStatisticsProvider and usage in PendingMessageTracker.
Diffstat (limited to 'storage')
8 files changed, 1 insertions, 375 deletions
diff --git a/storage/src/tests/distributor/distributor_host_info_reporter_test.cpp b/storage/src/tests/distributor/distributor_host_info_reporter_test.cpp index df01b3e30a7..c2e8367fa30 100644 --- a/storage/src/tests/distributor/distributor_host_info_reporter_test.cpp +++ b/storage/src/tests/distributor/distributor_host_info_reporter_test.cpp @@ -2,7 +2,6 @@ #include <vespa/vdstestlib/cppunit/macros.h> #include <vespa/storage/distributor/distributor_host_info_reporter.h> -#include <vespa/storage/distributor/latency_statistics_provider.h> #include <vespa/storage/distributor/min_replica_provider.h> #include <vespa/vespalib/data/slime/slime.h> #include <vespa/vespalib/io/fileutil.h> diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp index 7adadd226d7..a7fec5ac460 100644 --- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp +++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp @@ -26,17 +26,6 @@ class PendingMessageTrackerTest : public CppUnit::TestFixture { CPPUNIT_TEST(testGetPendingMessageTypes); CPPUNIT_TEST(testHasPendingMessage); CPPUNIT_TEST(testGetAllMessagesForSingleBucket); - CPPUNIT_TEST(nodeStatsCanBeOutputStreamed); - CPPUNIT_TEST(totalPutLatencyIsInitiallyZero); - CPPUNIT_TEST(statsNotAlteredBeforeReplyReceived); - CPPUNIT_TEST(totalPutLatencyIsTrackedForSingleRequest); - CPPUNIT_TEST(statsAreTrackedSeparatelyPerNode); - CPPUNIT_TEST(onlyPutMessagesAreTracked); - CPPUNIT_TEST(totalPutLatencyIsAggregatedAcrossRequests); - CPPUNIT_TEST(clearingMessagesDoesNotAffectStats); - CPPUNIT_TEST(timeTravellingClockLatenciesNotRegistered); - CPPUNIT_TEST(statsSnapshotIncludesAllNodes); - CPPUNIT_TEST(latencyProviderForwardsToImplementation); CPPUNIT_TEST(busy_reply_marks_node_as_busy); CPPUNIT_TEST(busy_node_duration_can_be_adjusted); CPPUNIT_TEST_SUITE_END(); @@ -48,40 +37,14 @@ public: void testGetPendingMessageTypes(); void testHasPendingMessage(); void testGetAllMessagesForSingleBucket(); - void nodeStatsCanBeOutputStreamed(); - void totalPutLatencyIsInitiallyZero(); - void statsNotAlteredBeforeReplyReceived(); - void totalPutLatencyIsTrackedForSingleRequest(); - void statsAreTrackedSeparatelyPerNode(); - void onlyPutMessagesAreTracked(); - void totalPutLatencyIsAggregatedAcrossRequests(); - void clearingMessagesDoesNotAffectStats(); - void timeTravellingClockLatenciesNotRegistered(); - void statsSnapshotIncludesAllNodes(); - void latencyProviderForwardsToImplementation(); void busy_reply_marks_node_as_busy(); void busy_node_duration_can_be_adjusted(); private: void insertMessages(PendingMessageTracker& tracker); - OperationStats makeOpStats(std::chrono::milliseconds totalLatency, - uint64_t numRequests) const - { - OperationStats stats; - stats.totalLatency = totalLatency; - stats.numRequests = numRequests; - return stats; - } }; -bool -operator==(const OperationStats& a, const OperationStats& b) -{ - return (a.totalLatency == b.totalLatency - && a.numRequests == b.numRequests); -} - namespace { class RequestBuilder { @@ -160,10 +123,6 @@ public: sendPutReply(*put, RequestBuilder().atTime(1000ms + latency)); } - OperationStats getNodePutOperationStats(uint16_t node) { - return _tracker->getNodeStats(node).puts; - } - PendingMessageTracker& tracker() { return *_tracker; } auto& clock() { return _clock; } @@ -541,137 +500,6 @@ PendingMessageTrackerTest::testGetAllMessagesForSingleBucket() } } -void -PendingMessageTrackerTest::nodeStatsCanBeOutputStreamed() -{ - NodeStats stats; - stats.puts = makeOpStats(56789ms, 10); - - std::ostringstream os; - os << stats; - std::string expected( - "NodeStats(puts=OperationStats(" - "totalLatency=56789ms, " - "numRequests=10))"); - CPPUNIT_ASSERT_EQUAL(expected, os.str()); -} - -void -PendingMessageTrackerTest::totalPutLatencyIsInitiallyZero() -{ - Fixture fixture; - CPPUNIT_ASSERT_EQUAL(makeOpStats(0ms, 0), - fixture.getNodePutOperationStats(0)); -} - -void -PendingMessageTrackerTest::statsNotAlteredBeforeReplyReceived() -{ - Fixture fixture; - fixture.sendPut(RequestBuilder().atTime(1000ms).toNode(0)); - CPPUNIT_ASSERT_EQUAL(makeOpStats(0ms, 0), - fixture.getNodePutOperationStats(0)); -} - -void -PendingMessageTrackerTest::totalPutLatencyIsTrackedForSingleRequest() -{ - Fixture fixture; - fixture.sendPutAndReplyWithLatency(0, 500ms); - - CPPUNIT_ASSERT_EQUAL(makeOpStats(500ms, 1), - fixture.getNodePutOperationStats(0)); -} - -void -PendingMessageTrackerTest::statsAreTrackedSeparatelyPerNode() -{ - Fixture fixture; - fixture.sendPutAndReplyWithLatency(0, 500ms); - fixture.sendPutAndReplyWithLatency(1, 600ms); - - CPPUNIT_ASSERT_EQUAL(makeOpStats(500ms, 1), - fixture.getNodePutOperationStats(0)); - CPPUNIT_ASSERT_EQUAL(makeOpStats(600ms, 1), - fixture.getNodePutOperationStats(1)); -} - -// Necessarily, this test will have to be altered when we add tracking of -// other message types as well. -void -PendingMessageTrackerTest::onlyPutMessagesAreTracked() -{ - Fixture fixture; - auto remove = fixture.sendRemove( - RequestBuilder().atTime(1000ms).toNode(0)); - fixture.sendRemoveReply(*remove, RequestBuilder().atTime(2000ms)); - CPPUNIT_ASSERT_EQUAL(makeOpStats(0ms, 0), - fixture.getNodePutOperationStats(0)); -} - -void -PendingMessageTrackerTest::totalPutLatencyIsAggregatedAcrossRequests() -{ - Fixture fixture; - // Model 2 concurrent puts to node 0. - fixture.sendPutAndReplyWithLatency(0, 500ms); - fixture.sendPutAndReplyWithLatency(0, 600ms); - CPPUNIT_ASSERT_EQUAL(makeOpStats(1100ms, 2), - fixture.getNodePutOperationStats(0)); -} - -void -PendingMessageTrackerTest::clearingMessagesDoesNotAffectStats() -{ - Fixture fixture; - fixture.sendPutAndReplyWithLatency(2, 2000ms); - fixture.tracker().clearMessagesForNode(2); - CPPUNIT_ASSERT_EQUAL(makeOpStats(2000ms, 1), - fixture.getNodePutOperationStats(2)); -} - -void -PendingMessageTrackerTest::timeTravellingClockLatenciesNotRegistered() -{ - Fixture fixture; - auto put = fixture.sendPut(RequestBuilder().atTime(1000ms).toNode(0)); - fixture.sendPutReply(*put, RequestBuilder().atTime(999ms)); - // Latency increase of zero, but we do count the request itself. - CPPUNIT_ASSERT_EQUAL(makeOpStats(0ms, 1), - fixture.getNodePutOperationStats(0)); -} - -void -PendingMessageTrackerTest::statsSnapshotIncludesAllNodes() -{ - Fixture fixture; - fixture.sendPutAndReplyWithLatency(0, 500ms); - fixture.sendPutAndReplyWithLatency(1, 600ms); - - NodeStatsSnapshot snapshot = fixture.tracker().getLatencyStatistics(); - - CPPUNIT_ASSERT_EQUAL(size_t(2), snapshot.nodeToStats.size()); - CPPUNIT_ASSERT_EQUAL(makeOpStats(500ms, 1), - snapshot.nodeToStats[0].puts); - CPPUNIT_ASSERT_EQUAL(makeOpStats(600ms, 1), - snapshot.nodeToStats[1].puts); -} - -void -PendingMessageTrackerTest::latencyProviderForwardsToImplementation() -{ - Fixture fixture; - fixture.sendPutAndReplyWithLatency(0, 500ms); - - LatencyStatisticsProvider& provider( - fixture.tracker().getLatencyStatisticsProvider()); - NodeStatsSnapshot snapshot = provider.getLatencyStatistics(); - - CPPUNIT_ASSERT_EQUAL(size_t(1), snapshot.nodeToStats.size()); - CPPUNIT_ASSERT_EQUAL(makeOpStats(500ms, 1), - snapshot.nodeToStats[0].puts); -} - // TODO don't set busy for visitor replies? These will mark the node as busy today, // but have the same actual semantics as busy merges (i.e. "queue is full", not "node // is too busy to accept new requests in general"). diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt index 8adbfcaf9da..2fd51433306 100644 --- a/storage/src/vespa/storage/distributor/CMakeLists.txt +++ b/storage/src/vespa/storage/distributor/CMakeLists.txt @@ -17,7 +17,6 @@ vespa_add_library(storage_distributor externaloperationhandler.cpp idealstatemanager.cpp idealstatemetricsset.cpp - latency_statistics_provider.cpp messagetracker.cpp nodeinfo.cpp operation_sequencer.cpp diff --git a/storage/src/vespa/storage/distributor/distributor_host_info_reporter.h b/storage/src/vespa/storage/distributor/distributor_host_info_reporter.h index f664b44135d..cfffb54799f 100644 --- a/storage/src/vespa/storage/distributor/distributor_host_info_reporter.h +++ b/storage/src/vespa/storage/distributor/distributor_host_info_reporter.h @@ -8,7 +8,6 @@ namespace storage { namespace distributor { class BucketSpacesStatsProvider; -class LatencyStatisticsProvider; class MinReplicaProvider; struct OperationStats; diff --git a/storage/src/vespa/storage/distributor/latency_statistics_provider.cpp b/storage/src/vespa/storage/distributor/latency_statistics_provider.cpp deleted file mode 100644 index 5f06e0b74a6..00000000000 --- a/storage/src/vespa/storage/distributor/latency_statistics_provider.cpp +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "latency_statistics_provider.h" -#include <ostream> - -namespace storage { -namespace distributor { - -std::ostream& -operator<<(std::ostream& os, const OperationStats& op) -{ - os << "OperationStats(" - << "totalLatency=" << op.totalLatency.count() - << "ms, numRequests=" << op.numRequests - << ')'; - return os; -} - -std::ostream& -operator<<(std::ostream& os, const NodeStats& stats) -{ - os << "NodeStats(puts=" << stats.puts << ')'; - return os; -} - -} // distributor -} // storage - diff --git a/storage/src/vespa/storage/distributor/latency_statistics_provider.h b/storage/src/vespa/storage/distributor/latency_statistics_provider.h deleted file mode 100644 index 1aa0b5a74cb..00000000000 --- a/storage/src/vespa/storage/distributor/latency_statistics_provider.h +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <chrono> -#include <unordered_map> -#include <iosfwd> -#include <stdint.h> - -namespace storage { -namespace distributor { - -struct OperationStats { - std::chrono::milliseconds totalLatency; - uint64_t numRequests; - - OperationStats() - : totalLatency(0), numRequests(0) - { - } -}; - -struct NodeStats { - OperationStats puts; -}; - -std::ostream& -operator<<(std::ostream&, const OperationStats&); - -std::ostream& -operator<<(std::ostream&, const NodeStats&); - -struct NodeStatsSnapshot -{ - std::unordered_map<uint16_t, NodeStats> nodeToStats; -}; - -class LatencyStatisticsProvider -{ -public: - virtual ~LatencyStatisticsProvider() {} - - /** - * Get a snapshot representation of the latency statistics towards a set of - * nodes at the point of the call. - * - * Can be called at any time after registration from another thread context - * and the call must thus be thread safe and data race free. - */ - NodeStatsSnapshot getLatencyStatistics() const { - return doGetLatencyStatistics(); - } - -private: - virtual NodeStatsSnapshot doGetLatencyStatistics() const = 0; -}; - -} // distributor -} // storage diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp index f7e207d7b8c..5035e6feeab 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp @@ -7,17 +7,13 @@ #include <vespa/log/log.h> LOG_SETUP(".pendingmessages"); -namespace storage { - -namespace distributor { +namespace storage::distributor { PendingMessageTracker::PendingMessageTracker(framework::ComponentRegister& cr) : framework::HtmlStatusReporter("pendingmessages", "Pending messages to storage nodes"), _component(cr, "pendingmessagetracker"), - _nodeIndexToStats(), _nodeInfo(_component.getClock()), - _statisticsForwarder(*this), _nodeBusyDuration(60), _lock() { @@ -130,7 +126,6 @@ PendingMessageTracker::reply(const api::StorageReply& r) if (iter != msgs.end()) { bucket = iter->bucket; _nodeInfo.decPending(r.getAddress()->getIndex()); - updateNodeStatsOnReply(*iter); api::ReturnCode::Result code = r.getResult().getResult(); if (code == api::ReturnCode::BUSY || code == api::ReturnCode::TIMEOUT) { _nodeInfo.setBusy(r.getAddress()->getIndex(), _nodeBusyDuration); @@ -142,49 +137,6 @@ PendingMessageTracker::reply(const api::StorageReply& r) return bucket; } -void -PendingMessageTracker::updateNodeStatsOnReply(const MessageEntry& entry) -{ - NodeStats& stats(_nodeIndexToStats[entry.nodeIdx]); - switch (entry.msgType) { - case api::MessageType::PUT_ID: - updateOperationStats(stats.puts, entry); - break; - default: - return; // Message was for type not tracked by stats. - } -} - -void -PendingMessageTracker::updateOperationStats(OperationStats& opStats, - const MessageEntry& entry) const -{ - // Time might go backwards due to clock adjustments (here assuming clock - // implementation in storage framework is non-monotonic), so avoid negative - // latencies by clamping to delta of 0. - auto now = std::max(currentTime(), entry.timeStamp); - opStats.totalLatency += (now - entry.timeStamp); - ++opStats.numRequests; -} - - -NodeStatsSnapshot -PendingMessageTracker::getLatencyStatistics() const -{ - std::lock_guard<std::mutex> guard(_lock); - NodeStatsSnapshot snapshot; - // Conveniently, snapshot data structure is exactly the same as our own. - snapshot.nodeToStats = _nodeIndexToStats; - return snapshot; -} - -NodeStatsSnapshot -PendingMessageTracker::ForwardingLatencyStatisticsProvider -::doGetLatencyStatistics() const -{ - return _messageTracker.getLatencyStatistics(); -} - namespace { template <typename Range> @@ -334,14 +286,4 @@ PendingMessageTracker::print(std::ostream& /*out*/, } -NodeStats -PendingMessageTracker::getNodeStats(uint16_t node) const -{ - std::lock_guard<std::mutex> guard(_lock); - auto nodeIter = _nodeIndexToStats.find(node); - return (nodeIter != _nodeIndexToStats.end() ? nodeIter->second - : NodeStats()); } - -} // distributor -} // storage diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h index e7bcf85a38d..59bcb206127 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h @@ -2,7 +2,6 @@ #pragma once #include "nodeinfo.h" -#include "latency_statistics_provider.h" #include <vespa/storage/common/storagelink.h> #include <vespa/storageframework/generic/status/htmlstatusreporter.h> #include <vespa/storageframework/generic/component/componentregister.h> @@ -28,19 +27,6 @@ namespace distributor { class PendingMessageTracker : public framework::HtmlStatusReporter { - class ForwardingLatencyStatisticsProvider - : public LatencyStatisticsProvider - { - PendingMessageTracker& _messageTracker; - public: - ForwardingLatencyStatisticsProvider( - PendingMessageTracker& messageTracker) - : _messageTracker(messageTracker) - { - } - - NodeStatsSnapshot doGetLatencyStatistics() const override; - }; public: class Checker { @@ -103,41 +89,11 @@ public: NodeInfo& getNodeInfo() { return _nodeInfo; } /** - * Get the statistics for all completed operations towards a specific - * storage node. "Completed" here means both successful and failed - * operations. Statistics are monotonically increasing within the scope of - * the process' lifetime and are never reset. This models how the Linux - * kernel reports its internal stats and means the caller must maintain - * value snapshots to extract meaningful time series information. - * - * If stats are requested for a node that has not had any operations - * complete towards it, the returned stats will be all zero. - * - * Method is thread safe and data race free. - * - * It is assumed that NodeStats is sufficiently small that returning it - * by value does not incur a measurable performance impact. This also - * prevents any data race issues in case returned stats are e.g. - * concurrently read by another thread such the metric snapshotting thread. - */ - NodeStats getNodeStats(uint16_t node) const; - - /** * Clears all pending messages for the given node, and returns * the messages erased. */ std::vector<uint64_t> clearMessagesForNode(uint16_t node); - /** - * Must not be called when _lock is already held or there will be a - * deadlock. - */ - NodeStatsSnapshot getLatencyStatistics() const; - - LatencyStatisticsProvider& getLatencyStatisticsProvider() { - return _statisticsForwarder; - } - void setNodeBusyDuration(std::chrono::seconds secs) noexcept { _nodeBusyDuration = secs; } @@ -212,9 +168,7 @@ private: Messages _messages; framework::Component _component; - std::unordered_map<uint16_t, NodeStats> _nodeIndexToStats; NodeInfo _nodeInfo; - ForwardingLatencyStatisticsProvider _statisticsForwarder; std::chrono::seconds _nodeBusyDuration; // Since distributor is currently single-threaded, this will only @@ -234,15 +188,6 @@ private: */ void updateNodeStatsOnReply(const MessageEntry& entry); - - /** - * Modifies opStats in-place with added latency based on delta from send - * time to current time and incremented operation count. - */ - void updateOperationStats(OperationStats& opStats, - const MessageEntry& entry) const; - - void getStatusStartPage(std::ostream& out) const; void getStatusPerNode(std::ostream& out) const; void getStatusPerBucket(std::ostream& out) const; |