diff options
author | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-04-25 12:45:13 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-04-25 12:45:13 +0200 |
commit | d738318dcdeff8a2d68e32c2fc9c7f1358a662b2 (patch) | |
tree | bd4087a7f6e76fa702bea403bfb4cfa5696d6b35 | |
parent | c9a881c21a8366b35f54d6d4ffd1ca6101ed48bf (diff) | |
parent | 21703df86c2bd7558636108362ff206fd4439f3a (diff) |
Merge pull request #2260 from yahoo/balder/deinline-in-storageserver
- Deinline large metrics constructors/destructors.
15 files changed, 274 insertions, 360 deletions
diff --git a/storage/src/vespa/storage/storageserver/CMakeLists.txt b/storage/src/vespa/storage/storageserver/CMakeLists.txt index 17638f55b3a..79ce0206005 100644 --- a/storage/src/vespa/storage/storageserver/CMakeLists.txt +++ b/storage/src/vespa/storage/storageserver/CMakeLists.txt @@ -8,6 +8,7 @@ vespa_add_library(storage_storageserver fnetlistener.cpp rpcrequestwrapper.cpp communicationmanager.cpp + communicationmanagermetrics.cpp statemanager.cpp documentapiconverter.cpp opslogger.cpp @@ -20,6 +21,7 @@ vespa_add_library(storage_storageserver distributornode.cpp servicelayernode.cpp statereporter.cpp + storagemetricsset.cpp changedbucketownershiphandler.cpp INSTALL lib64 DEPENDS diff --git a/storage/src/vespa/storage/storageserver/applicationgenerationfetcher.h b/storage/src/vespa/storage/storageserver/applicationgenerationfetcher.h index 0e33c8c86c6..7d5b64c1d17 100644 --- a/storage/src/vespa/storage/storageserver/applicationgenerationfetcher.h +++ b/storage/src/vespa/storage/storageserver/applicationgenerationfetcher.h @@ -9,11 +9,14 @@ #pragma once +#include <cstdint> +#include <string> + namespace storage { class ApplicationGenerationFetcher { public: - virtual ~ApplicationGenerationFetcher() {}; + virtual ~ApplicationGenerationFetcher() {} virtual int64_t getGeneration() const = 0; virtual std::string getComponentName() const = 0; diff --git a/storage/src/vespa/storage/storageserver/bucketintegritychecker.h b/storage/src/vespa/storage/storageserver/bucketintegritychecker.h index 6b87be804d0..fd9516a85aa 100644 --- a/storage/src/vespa/storage/storageserver/bucketintegritychecker.h +++ b/storage/src/vespa/storage/storageserver/bucketintegritychecker.h @@ -112,47 +112,29 @@ public: ServiceLayerComponentRegister&); ~BucketIntegrityChecker(); - virtual void onClose() override; - - virtual void print(std::ostream& out, bool verbose, const std::string& indent) const override; - + void onClose() override; + void print(std::ostream& out, bool verbose, const std::string& indent) const override; SchedulingOptions& getSchedulingOptions() { return _scheduleOptions; } - bool isWorkingOnCycle() const; - uint32_t getCycleCount() const; /** Give thread a bump by signalling it. */ void bump() const; - void setMaxThreadWaitTime(framework::MilliSecTime milliSecs) - { _maxThreadWaitTime = milliSecs; } + void setMaxThreadWaitTime(framework::MilliSecTime milliSecs) { _maxThreadWaitTime = milliSecs; } framework::Clock& getClock() { return _component.getClock(); } private: - virtual void configure(std::unique_ptr<vespa::config::content::core::StorIntegritycheckerConfig>) override; - + void configure(std::unique_ptr<vespa::config::content::core::StorIntegritycheckerConfig>) override; void onDoneInit() override; - bool onInternalReply(const std::shared_ptr<api::InternalReply>&) override; bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>&) override; - bool onNotifyBucketChangeReply( - const std::shared_ptr<api::NotifyBucketChangeReply>&) override - { return true; } - - SchedulingOptions::RunState getCurrentRunState( - framework::SecondTime time) const; - - virtual void run(framework::ThreadHandle&) override; - + bool onNotifyBucketChangeReply(const std::shared_ptr<api::NotifyBucketChangeReply>&) override { return true; } + SchedulingOptions::RunState getCurrentRunState(framework::SecondTime time) const; + void run(framework::ThreadHandle&) override; uint32_t getTotalPendingCount() const; - - // Status::Reporter implementation - virtual void reportHtmlStatus(std::ostream&, - const framework::HttpUrlPath&) const override; - + void reportHtmlStatus(std::ostream&, const framework::HttpUrlPath&) const override; }; } - diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index 31100de78d1..3cae63d3586 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -1,8 +1,6 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** - - -* @class CommunicationManager + * @class CommunicationManager * @ingroup storageserver * * @brief Class used for sending messages over the network. @@ -12,26 +10,26 @@ #pragma once -#include <vespa/vespalib/util/document_runnable.h> -#include <map> -#include <memory> -#include <queue> -#include <vector> -#include <atomic> -#include <vespa/metrics/metrics.h> -#include <vespa/messagebus/rpcmessagebus.h> -#include <vespa/storageframework/storageframework.h> +#include <vespa/documentapi/documentapi.h> +#include "communicationmanagermetrics.h" +#include "messageallocationtypes.h" +#include "documentapiconverter.h" #include <vespa/storage/common/storagelink.h> #include <vespa/storage/config/config-stor-communicationmanager.h> +#include <vespa/storageframework/storageframework.h> +#include <vespa/storageframework/storageframework.h> #include <vespa/storageapi/messageapi/storagecommand.h> #include <vespa/storageapi/messageapi/storagemessage.h> #include <vespa/storageapi/mbusprot/storagecommand.h> #include <vespa/storageapi/mbusprot/storagereply.h> -#include <vespa/documentapi/documentapi.h> -#include <vespa/storage/storageserver/communicationmanagermetrics.h> -#include <vespa/storage/storageserver/messageallocationtypes.h> -#include "documentapiconverter.h" -#include <vespa/storageframework/storageframework.h> +#include <vespa/messagebus/rpcmessagebus.h> +#include <vespa/metrics/metrics.h> +#include <vespa/vespalib/util/document_runnable.h> +#include <map> +#include <memory> +#include <queue> +#include <vector> +#include <atomic> namespace storage { @@ -189,43 +187,32 @@ private: public: CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri); - virtual ~CommunicationManager(); + ~CommunicationManager(); void enqueue(const std::shared_ptr<api::StorageMessage> & msg); - mbus::RPCMessageBus& getMessageBus() { assert(_mbus.get()); return *_mbus; } - const PriorityConverter& getPriorityConverter() const { return _docApiConverter.getPriorityConverter(); } /** * From StorageLink. Called when messages arrive from storage * modules. Will convert and dispatch messages to MessageServer */ - virtual bool onUp(const std::shared_ptr<api::StorageMessage>&) override; - + bool onUp(const std::shared_ptr<api::StorageMessage>&) override; bool sendCommand(const std::shared_ptr<api::StorageCommand>& command); - bool sendReply(const std::shared_ptr<api::StorageReply>& reply); void sendDirectRPCReply(RPCRequestWrapper& request, const std::shared_ptr<api::StorageReply>& reply); void sendMessageBusReply(StorageTransportContext& context, const std::shared_ptr<api::StorageReply>& reply); // Pump thread void run(framework::ThreadHandle&) override; + void print(std::ostream& out, bool verbose, const std::string& indent) const override; - virtual void print(std::ostream& out, bool verbose, const std::string& indent) const override; - - /** Get messages from messagebus. */ void handleMessage(std::unique_ptr<mbus::Message> msg) override; - void sendMessageBusMessage(const std::shared_ptr<api::StorageCommand>& msg, - std::unique_ptr<mbus::Message> mbusMsg, const mbus::Route& route); + std::unique_ptr<mbus::Message> mbusMsg, const mbus::Route& route); - /** Get replies from messagebus. */ void handleReply(std::unique_ptr<mbus::Reply> msg) override; - void updateMessagebusProtocol(const document::DocumentTypeRepo::SP &repo); - }; } // storage - diff --git a/storage/src/vespa/storage/storageserver/communicationmanagermetrics.cpp b/storage/src/vespa/storage/storageserver/communicationmanagermetrics.cpp new file mode 100644 index 00000000000..d1e71b6e8a5 --- /dev/null +++ b/storage/src/vespa/storage/storageserver/communicationmanagermetrics.cpp @@ -0,0 +1,32 @@ +// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "communicationmanagermetrics.h" +#include <vespa/documentapi/loadtypes/loadtypeset.h> + +using namespace metrics; +namespace storage { + +CommunicationManagerMetrics::CommunicationManagerMetrics(const LoadTypeSet& loadTypes, MetricSet* owner) + : MetricSet("communication", "", "Metrics for the communication manager", owner), + queueSize("messagequeue", "", "Size of input message queue.", this), + messageProcessTime(loadTypes, + DoubleAverageMetric("messageprocesstime", "", + "Time transport thread uses to process a single message"), + this), + exceptionMessageProcessTime(loadTypes, + DoubleAverageMetric("exceptionmessageprocesstime", "", + "Time transport thread uses to process a single message " + "that fails with an exception thrown into communication manager"), + this), + failedDueToTooLittleMemory("toolittlememory", "", "Number of messages failed due to too little memory available", this), + convertToStorageAPIFailures("convertfailures", "", + "Number of messages that failed to get converted to storage API messages", this), + sendCommandLatency("sendcommandlatency", "", "Average ms used to send commands to MBUS", this), + sendReplyLatency("sendreplylatency", "", "Average ms used to send replies to MBUS", this) +{ +} + +CommunicationManagerMetrics::~CommunicationManagerMetrics() { } + +} + diff --git a/storage/src/vespa/storage/storageserver/communicationmanagermetrics.h b/storage/src/vespa/storage/storageserver/communicationmanagermetrics.h index 4b580f79904..40c3646647e 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanagermetrics.h +++ b/storage/src/vespa/storage/storageserver/communicationmanagermetrics.h @@ -9,7 +9,6 @@ #pragma once #include <vespa/metrics/metrics.h> -#include <vespa/documentapi/loadtypes/loadtypeset.h> namespace storage { @@ -22,34 +21,8 @@ struct CommunicationManagerMetrics : public metrics::MetricSet { metrics::DoubleAverageMetric sendCommandLatency; metrics::DoubleAverageMetric sendReplyLatency; - CommunicationManagerMetrics(const metrics::LoadTypeSet& loadTypes, - metrics::MetricSet* owner = 0) - : metrics::MetricSet("communication", "", - "Metrics for the communication manager", owner), - queueSize("messagequeue", "", "Size of input message queue.", this), - messageProcessTime(loadTypes, metrics::DoubleAverageMetric( - "messageprocesstime", "", - "Time transport thread uses to process a single message"), - this), - exceptionMessageProcessTime(loadTypes, metrics::DoubleAverageMetric( - "exceptionmessageprocesstime", "", - "Time transport thread uses to process a single message " - "that fails with an exception thrown into communication " - "manager"), - this), - failedDueToTooLittleMemory("toolittlememory", "", - "Number of messages failed due to too little memory " - "available", this), - convertToStorageAPIFailures("convertfailures", "", - "Number of messages that failed to get converted to " - "storage API messages", this), - sendCommandLatency("sendcommandlatency", "", - "Average ms used to send commands to MBUS", this), - sendReplyLatency("sendreplylatency", "", - "Average ms used to send replies to MBUS", this) - { - } - + CommunicationManagerMetrics(const metrics::LoadTypeSet& loadTypes, metrics::MetricSet* owner = 0); + ~CommunicationManagerMetrics(); }; } diff --git a/storage/src/vespa/storage/storageserver/distributornode.h b/storage/src/vespa/storage/storageserver/distributornode.h index 03792d8fb56..31fe8fe7878 100644 --- a/storage/src/vespa/storage/storageserver/distributornode.h +++ b/storage/src/vespa/storage/storageserver/distributornode.h @@ -8,9 +8,9 @@ #pragma once +#include "distributornodecontext.h" +#include "storagenode.h" #include <vespa/storage/common/distributorcomponent.h> -#include <vespa/storage/storageserver/distributornodecontext.h> -#include <vespa/storage/storageserver/storagenode.h> #include <vespa/storageframework/generic/thread/tickingthread.h> namespace storage { @@ -41,19 +41,16 @@ public: StorageLink::UP communicationManager = StorageLink::UP()); ~DistributorNode(); - virtual const lib::NodeType& getNodeType() const override - { return lib::NodeType::DISTRIBUTOR; } - - virtual ResumeGuard pause() override; + const lib::NodeType& getNodeType() const override { return lib::NodeType::DISTRIBUTOR; } + ResumeGuard pause() override; void handleConfigChange(vespa::config::content::core::StorDistributormanagerConfig&); void handleConfigChange(vespa::config::content::core::StorVisitordispatcherConfig&); private: - virtual void initializeNodeSpecific() override; - virtual StorageLink::UP createChain() override; - - virtual api::Timestamp getUniqueTimestamp() override; + void initializeNodeSpecific() override; + StorageLink::UP createChain() override; + api::Timestamp getUniqueTimestamp() override; /** * Shut down necessary distributor-specific components before shutting @@ -63,4 +60,3 @@ private: }; } // storage - diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index ebfa10fe814..577fedb58e1 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -1,15 +1,12 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fastos/fastos.h> -#include <vespa/storage/storageserver/mergethrottler.h> - +#include "mergethrottler.h" +#include "storagemetricsset.h" #include <iostream> #include <sstream> -#include <iterator> #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/storage/common/nodestateupdater.h> #include <vespa/storage/persistence/messages.h> -#include <vespa/storage/storageserver/storagemetricsset.h> #include <vespa/log/log.h> LOG_SETUP(".mergethrottler"); @@ -46,6 +43,71 @@ const mbus::string DummyMbusMessage<Base>::NAME = "SkyNet"; } +MergeThrottler::ChainedMergeState::ChainedMergeState() + : _cmd(), + _cmdString(), + _clusterStateVersion(0), + _inCycle(false), + _executingLocally(false), + _unwinding(false), + _cycleBroken(false), + _aborted(false) +{ } + +MergeThrottler::ChainedMergeState::ChainedMergeState(const api::StorageMessage::SP& cmd, bool executing) + : _cmd(cmd), + _cmdString(cmd->toString()), + _clusterStateVersion(static_cast<const api::MergeBucketCommand&>(*cmd).getClusterStateVersion()), + _inCycle(false), + _executingLocally(executing), + _unwinding(false), + _cycleBroken(false), + _aborted(false) +{ } +MergeThrottler::ChainedMergeState::~ChainedMergeState() {} + +MergeThrottler::Metrics::Metrics(metrics::MetricSet* owner) + : metrics::MetricSet("mergethrottler", "", "", owner), + averageQueueWaitingTime("averagequeuewaitingtime", "", "Average time a merge spends in the throttler queue", this), + chaining("mergechains", this), + local("locallyexecutedmerges", this) +{ } +MergeThrottler::Metrics::~Metrics() {} + +MergeThrottler::MergeFailureMetrics::MergeFailureMetrics(metrics::MetricSet* owner) + : metrics::MetricSet("failures", "", "Detailed failure statistics", owner), + sum("total", "", "Sum of all failures", this), + notready("notready", "", "The number of merges discarded because distributor was not ready", this), + timeout("timeout", "", "The number of merges that failed because they timed out towards storage", this), + aborted("aborted", "", "The number of merges that failed because the storage node was (most likely) shutting down", this), + wrongdistribution("wrongdistribution", "", "The number of merges that were discarded (flushed) because they were initiated at an older cluster state than the current", this), + bucketnotfound("bucketnotfound", "", "The number of operations that failed because the bucket did not exist", this), + busy("busy", "", "The number of merges that failed because the storage node was busy", this), + exists("exists", "", "The number of merges that were rejected due to a merge operation for their bucket already being processed", this), + rejected("rejected", "", "The number of merges that were rejected", this), + other("other", "", "The number of other failures", this) +{ + sum.addMetricToSum(notready); + sum.addMetricToSum(timeout); + sum.addMetricToSum(aborted); + sum.addMetricToSum(wrongdistribution); + sum.addMetricToSum(bucketnotfound); + sum.addMetricToSum(busy); + sum.addMetricToSum(exists); + sum.addMetricToSum(rejected); + sum.addMetricToSum(other); +} +MergeThrottler::MergeFailureMetrics::~MergeFailureMetrics() { } + + +MergeThrottler::MergeOperationMetrics::MergeOperationMetrics(const std::string& name, metrics::MetricSet* owner) + : metrics::MetricSet(name, "", vespalib::make_string("Statistics for %s", name.c_str()), owner), + ok("ok", "", vespalib::make_string("The number of successful merges for '%s'", name.c_str()), this), + failures(this) +{ +} +MergeThrottler::MergeOperationMetrics::~MergeOperationMetrics() { } + MergeThrottler::MergeNodeSequence::MergeNodeSequence( const api::MergeBucketCommand& cmd, uint16_t thisIndex) diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index 8ee4ca7be71..74ffe095d7c 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -7,11 +7,6 @@ */ #pragma once -#include <map> -#include <utility> -#include <vector> -#include <set> -#include <memory> #include <vespa/vespalib/util/document_runnable.h> #include <vespa/storage/config/config-stor-server.h> #include <vespa/storage/common/storagelink.h> @@ -48,37 +43,8 @@ public: metrics::LongCountMetric rejected; metrics::LongCountMetric other; - MergeFailureMetrics(metrics::MetricSet* owner) - : metrics::MetricSet("failures", "", "Detailed failure statistics", owner), - sum("total", "", "Sum of all failures", this), - notready("notready", "", "The number of merges discarded " - "because distributor was not ready", this), - timeout("timeout", "", "The number of merges that failed because " - "they timed out towards storage", this), - aborted("aborted", "", "The number of merges that failed " - "because the storage node was (most likely) shutting down", this), - wrongdistribution("wrongdistribution", "", "The number of merges that " - "were discarded (flushed) because they were initiated at an " - "older cluster state than the current", this), - bucketnotfound("bucketnotfound", "", "The number of operations that failed " - "because the bucket did not exist", this), - busy("busy", "", "The number of merges that failed because the " - "storage node was busy", this), - exists("exists", "", "The number of merges that were rejected due to a " - "merge operation for their bucket already being processed", this), - rejected("rejected", "", "The number of merges that were rejected", this), - other("other", "", "The number of other failures", this) - { - sum.addMetricToSum(notready); - sum.addMetricToSum(timeout); - sum.addMetricToSum(aborted); - sum.addMetricToSum(wrongdistribution); - sum.addMetricToSum(bucketnotfound); - sum.addMetricToSum(busy); - sum.addMetricToSum(exists); - sum.addMetricToSum(rejected); - sum.addMetricToSum(other); - } + MergeFailureMetrics(metrics::MetricSet* owner); + ~MergeFailureMetrics(); }; class MergeOperationMetrics : public metrics::MetricSet @@ -87,12 +53,8 @@ public: metrics::LongCountMetric ok; MergeFailureMetrics failures; - MergeOperationMetrics(const std::string& name, metrics::MetricSet* owner) - : metrics::MetricSet(name, "", vespalib::make_string("Statistics for %s", name.c_str()), owner), - ok("ok", "", vespalib::make_string("The number of successful merges for '%s'", name.c_str()), this), - failures(this) - { - } + MergeOperationMetrics(const std::string& name, metrics::MetricSet* owner); + ~MergeOperationMetrics(); }; class Metrics : public metrics::MetricSet @@ -102,15 +64,8 @@ public: MergeOperationMetrics chaining; MergeOperationMetrics local; - Metrics(metrics::MetricSet* owner = 0) - : metrics::MetricSet("mergethrottler", "", "", owner), - averageQueueWaitingTime( - "averagequeuewaitingtime", "", "Average time a merge spends in " - "the throttler queue", this), - chaining("mergechains", this), - local("locallyexecutedmerges", this) - { - } + Metrics(metrics::MetricSet* owner = 0); + ~Metrics(); }; private: @@ -151,30 +106,9 @@ private: bool _cycleBroken; bool _aborted; - ChainedMergeState() - : _cmd(), - _cmdString(), - _clusterStateVersion(0), - _inCycle(false), - _executingLocally(false), - _unwinding(false), - _cycleBroken(false), - _aborted(false) - { - } - - ChainedMergeState(const api::StorageMessage::SP& cmd, bool executing = false) - : _cmd(cmd), - _cmdString(cmd->toString()), - _clusterStateVersion(static_cast<const api::MergeBucketCommand&>( - *cmd).getClusterStateVersion()), - _inCycle(false), - _executingLocally(executing), - _unwinding(false), - _cycleBroken(false), - _aborted(false) - { - } + ChainedMergeState(); + ChainedMergeState(const api::StorageMessage::SP& cmd, bool executing = false); + ~ChainedMergeState(); // Use default copy-constructor/assignment operator bool isExecutingLocally() const { return _executingLocally; } @@ -239,9 +173,7 @@ public: * windowSizeIncrement used for allowing unit tests to start out with more * than 1 as their window size. */ - MergeThrottler(const config::ConfigUri & configUri, - StorageComponentRegister&); - + MergeThrottler(const config::ConfigUri & configUri, StorageComponentRegister&); ~MergeThrottler(); /** Implements document::Runnable::run */ @@ -267,12 +199,8 @@ public: vespalib::Lock& getStateLock() { return _stateLock; } Metrics& getMetrics() { return *_metrics; } - std::size_t getMaxQueueSize() const { return _maxQueueSize; } - void print(std::ostream& out, bool verbose, const std::string& indent) const override; - - // HtmlStatusReporter implementation void reportHtmlStatus(std::ostream&, const framework::HttpUrlPath&) const override; private: friend class ThreadRendezvousGuard; // impl in .cpp file @@ -285,9 +213,7 @@ private: std::size_t _sortedIndex; // Index of current storage node in the sorted node sequence const uint16_t _thisIndex; // Index of the current storage node - MergeNodeSequence( - const api::MergeBucketCommand& cmd, - uint16_t thisIndex); + MergeNodeSequence(const api::MergeBucketCommand& cmd, uint16_t thisIndex); std::size_t getSortedIndex() const { return _sortedIndex; } const std::vector<api::MergeBucketCommand::Node>& getSortedNodes() const { @@ -332,13 +258,8 @@ private: // NOTE: unless explicitly specified, all the below functions require // _sync lock to be held upon call (usually implicitly via MessageGuard) - void handleMessageDown( - const std::shared_ptr<api::StorageMessage>& msg, - MessageGuard& msgGuard); - - void handleMessageUp( - const std::shared_ptr<api::StorageMessage>& msg, - MessageGuard& msgGuard); + void handleMessageDown(const std::shared_ptr<api::StorageMessage>& msg, MessageGuard& msgGuard); + void handleMessageUp(const std::shared_ptr<api::StorageMessage>& msg, MessageGuard& msgGuard); /** * Handle the receival of MergeBucketReply, be it from another node @@ -372,17 +293,13 @@ private: * * Precondition: no existing merge state exists for msg's bucketid. */ - void processNewMergeCommand( - const api::StorageMessage::SP& msg, - MessageGuard& msgGuard); + void processNewMergeCommand(const api::StorageMessage::SP& msg, MessageGuard& msgGuard); /** * Precondition: an existing merge state exists for msg's bucketid. * @return true if message was handled, false otherwise (see onUp/onDown). */ - bool processCycledMergeCommand( - const api::StorageMessage::SP& msg, - MessageGuard& msgGuard); + bool processCycledMergeCommand(const api::StorageMessage::SP& msg, MessageGuard& msgGuard); /** * Forwards the given MergeBucketCommand to the storage node given @@ -403,10 +320,7 @@ private: * @return Highest priority waiting merge or null SP if queue is empty */ api::StorageMessage::SP getNextQueuedMerge(); - - void enqueueMerge( - const api::StorageMessage::SP& msg, - MessageGuard& msgGuard); + void enqueueMerge(const api::StorageMessage::SP& msg, MessageGuard& msgGuard); /** * @return true if throttle policy says at least one additional @@ -434,25 +348,15 @@ private: * Immediately reject all queued merges whose cluster state version is * less than that of rejectLessThanVersion */ - void rejectOutdatedQueuedMerges(MessageGuard& msgGuard, - uint32_t rejectLessThanVersion); - + void rejectOutdatedQueuedMerges(MessageGuard& msgGuard, uint32_t rejectLessThanVersion); bool attemptProcessNextQueuedMerge(MessageGuard& msgGuard); - bool processQueuedMerges(MessageGuard& msgGuard); - void handleRendezvous(vespalib::MonitorGuard& guard); - void rendezvousWithWorkerThread(vespalib::MonitorGuard&); - void releaseWorkerThreadRendezvous(vespalib::MonitorGuard&); - bool isDiffCommand(const api::StorageMessage& msg) const; - bool isMergeCommand(const api::StorageMessage& msg) const; - bool isMergeReply(const api::StorageMessage& msg) const; - bool bucketIsUnknownOrAborted(const document::BucketId& bucket) const; std::shared_ptr<api::StorageMessage> makeAbortReply( @@ -460,8 +364,7 @@ private: vespalib::stringref reason) const; void handleOutdatedMerges(const api::SetSystemStateCommand&); - void rejectOperationsInThreadQueue(MessageGuard&, - uint32_t minimumStateVersion); + void rejectOperationsInThreadQueue(MessageGuard&, uint32_t minimumStateVersion); void markActiveMergesAsAborted(uint32_t minimumStateVersion); // const function, but metrics are mutable @@ -471,4 +374,3 @@ private: }; } // namespace storage - diff --git a/storage/src/vespa/storage/storageserver/opslogger.h b/storage/src/vespa/storage/storageserver/opslogger.h index 57f304a04aa..905c0fd4e85 100644 --- a/storage/src/vespa/storage/storageserver/opslogger.h +++ b/storage/src/vespa/storage/storageserver/opslogger.h @@ -27,20 +27,15 @@ public: ~OpsLogger(); void onClose() override; - - virtual void print(std::ostream& out, bool verbose, const std::string& indent) const override; - + void print(std::ostream& out, bool verbose, const std::string& indent) const override; bool onPutReply(const std::shared_ptr<api::PutReply>& msg) override; bool onUpdateReply(const std::shared_ptr<api::UpdateReply>& msg) override; bool onRemoveReply(const std::shared_ptr<api::RemoveReply>& msg) override; bool onGetReply(const std::shared_ptr<api::GetReply>& msg) override; /** Ignore all replies on the way down the storage chain. */ - bool onDown(const std::shared_ptr<api::StorageMessage>&) override - { return false; }; - + bool onDown(const std::shared_ptr<api::StorageMessage>&) override { return false; }; void configure(std::unique_ptr<vespa::config::content::core::StorOpsloggerConfig> config) override; - private: vespalib::Lock _lock; std::string _fileName; @@ -51,4 +46,3 @@ private: }; } - diff --git a/storage/src/vespa/storage/storageserver/statemanager.h b/storage/src/vespa/storage/storageserver/statemanager.h index 028168937b5..974b8ca2393 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.h +++ b/storage/src/vespa/storage/storageserver/statemanager.h @@ -72,23 +72,19 @@ public: void tick(); - virtual void print(std::ostream& out, bool verbose, const std::string& indent) const override; + void print(std::ostream& out, bool verbose, const std::string& indent) const override; + void reportHtmlStatus(std::ostream&, const framework::HttpUrlPath&) const override; - /** Implementation of HtmlStatusReporter */ - virtual void reportHtmlStatus(std::ostream&, const framework::HttpUrlPath&) const override; + lib::NodeState::CSP getReportedNodeState() const override; + lib::NodeState::CSP getCurrentNodeState() const override; + lib::ClusterState::CSP getSystemState() const override; - virtual lib::NodeState::CSP getReportedNodeState() const override; - virtual lib::NodeState::CSP getCurrentNodeState() const override; - virtual lib::ClusterState::CSP getSystemState() const override; - - virtual void addStateListener(StateListener&) override; - virtual void removeStateListener(StateListener&) override; - - virtual Lock::SP grabStateChangeLock() override; - virtual void setReportedNodeState(const lib::NodeState& state) override; + void addStateListener(StateListener&) override; + void removeStateListener(StateListener&) override; + Lock::SP grabStateChangeLock() override; + void setReportedNodeState(const lib::NodeState& state) override; void setClusterState(const lib::ClusterState& c); - HostInfo& getHostInfo() { return *_hostInfo; } private: @@ -136,10 +132,7 @@ private: */ std::string getNodeInfo() const; - virtual void run(framework::ThreadHandle&) override; - + void run(framework::ThreadHandle&) override; }; } // storage - - diff --git a/storage/src/vespa/storage/storageserver/statereporter.h b/storage/src/vespa/storage/storageserver/statereporter.h index 51a9e93a197..c46a878ef40 100644 --- a/storage/src/vespa/storage/storageserver/statereporter.h +++ b/storage/src/vespa/storage/storageserver/statereporter.h @@ -10,11 +10,11 @@ #pragma once -#include <vespa/metrics/metrics.h> -#include <vespa/metrics/state_api_adapter.h> +#include "applicationgenerationfetcher.h" #include <vespa/storage/common/storagecomponent.h> -#include <vespa/storage/storageserver/applicationgenerationfetcher.h> #include <vespa/storageframework/storageframework.h> +#include <vespa/metrics/metrics.h> +#include <vespa/metrics/state_api_adapter.h> #include <vespa/vespalib/net/metrics_producer.h> #include <vespa/vespalib/net/state_api.h> @@ -37,11 +37,8 @@ public: const std::string& name = "status"); ~StateReporter(); - vespalib::string getReportContentType( - const framework::HttpUrlPath&) const override; - bool reportStatus(std::ostream& out, - const framework::HttpUrlPath& path) const override; - + vespalib::string getReportContentType(const framework::HttpUrlPath&) const override; + bool reportStatus(std::ostream& out, const framework::HttpUrlPath& path) const override; private: metrics::MetricManager &_manager; metrics::StateApiAdapter _metricsAdapter; @@ -50,16 +47,10 @@ private: ApplicationGenerationFetcher& _generationFetcher; std::string _name; - // Implements vespalib::MetricsProducer - virtual vespalib::string getMetrics(const vespalib::string &consumer) override; - virtual vespalib::string getTotalMetrics(const vespalib::string &consumer) override; - - // Implements vespalib::HealthProducer - virtual Health getHealth() const override; - - // Implements vespalib::ComponentConfigProducer - virtual void getComponentConfig(Consumer &consumer) override; + vespalib::string getMetrics(const vespalib::string &consumer) override; + vespalib::string getTotalMetrics(const vespalib::string &consumer) override; + Health getHealth() const override; + void getComponentConfig(Consumer &consumer) override; }; } // storage - diff --git a/storage/src/vespa/storage/storageserver/storagemetricsset.cpp b/storage/src/vespa/storage/storageserver/storagemetricsset.cpp new file mode 100644 index 00000000000..aa57e3ebcd5 --- /dev/null +++ b/storage/src/vespa/storage/storageserver/storagemetricsset.cpp @@ -0,0 +1,77 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "storagemetricsset.h" +#include <vespa/document/fieldvalue/serializablearray.h> + +namespace storage { + +MessageMemoryUseMetricSet::MessageMemoryUseMetricSet(metrics::MetricSet* owner) + : metrics::MetricSet("message_memory_use", "memory", "Message use from storage messages", owner), + total("total", "memory", "Message use from storage messages", this), + lowpri("lowpri", "memory", "Message use from low priority storage messages", this), + normalpri("normalpri", "memory", "Message use from normal priority storage messages", this), + highpri("highpri", "memory", "Message use from high priority storage messages", this), + veryhighpri("veryhighpri", "memory", "Message use from very high priority storage messages", this) +{ } +MessageMemoryUseMetricSet::~MessageMemoryUseMetricSet() {} + +DocumentSerializationMetricSet::DocumentSerializationMetricSet(metrics::MetricSet* owner) + : metrics::MetricSet("document_serialization", "docserialization", + "Counts of document serialization of various types", owner), + usedCachedSerializationCount( + "cached_serialization_count", "docserialization", + "Number of times we didn't need to serialize the document as " + "we already had serialized version cached", this), + compressedDocumentCount( + "compressed_serialization_count", "docserialization", + "Number of times we compressed document when serializing", + this), + compressionDidntHelpCount( + "compressed_didnthelp_count", "docserialization", + "Number of times we compressed document when serializing, but " + "the compressed version was bigger, so it was dumped", this), + uncompressableCount( + "uncompressable_serialization_count", "docserialization", + "Number of times we didn't attempt compression as document " + "had already been tagged uncompressable", this), + serializedUncompressed( + "uncompressed_serialization_count", "docserialization", + "Number of times we serialized a document uncompressed", this), + inputWronglySerialized( + "input_wrongly_serialized_count", "docserialization", + "Number of times we reserialized a document because the " + "compression it had in cache did not match what was configured", + this) +{ } +DocumentSerializationMetricSet::~DocumentSerializationMetricSet() { } + +StorageMetricSet::StorageMetricSet() + : metrics::MetricSet("server", "memory", + "Metrics for VDS applications"), + memoryUse("memoryusage", "memory", "", this), + memoryUse_messages(this), + memoryUse_visiting("memoryusage_visiting", "memory", + "Message use from visiting", this), + documentSerialization(this) +{ } +StorageMetricSet::~StorageMetricSet() { } + +void StorageMetricSet::updateMetrics() { + document::SerializableArray::Statistics stats( + document::SerializableArray::getStatistics()); + + documentSerialization.usedCachedSerializationCount.set( + stats._usedCachedSerializationCount); + documentSerialization.compressedDocumentCount.set( + stats._compressedDocumentCount); + documentSerialization.compressionDidntHelpCount.set( + stats._compressionDidntHelpCount); + documentSerialization.uncompressableCount.set( + stats._uncompressableCount); + documentSerialization.serializedUncompressed.set( + stats._serializedUncompressed); + documentSerialization.inputWronglySerialized.set( + stats._inputWronglySerialized); +} + +} // storage diff --git a/storage/src/vespa/storage/storageserver/storagemetricsset.h b/storage/src/vespa/storage/storageserver/storagemetricsset.h index 18be3e21ada..f7083705763 100644 --- a/storage/src/vespa/storage/storageserver/storagemetricsset.h +++ b/storage/src/vespa/storage/storageserver/storagemetricsset.h @@ -3,7 +3,6 @@ #pragma once #include <vespa/metrics/metrics.h> -#include <vespa/document/fieldvalue/serializablearray.h> namespace storage { @@ -16,22 +15,8 @@ public: metrics::LongValueMetric highpri; metrics::LongValueMetric veryhighpri; - MessageMemoryUseMetricSet(metrics::MetricSet* owner) - : metrics::MetricSet("message_memory_use", "memory", - "Message use from storage messages", owner), - total("total", "memory", - "Message use from storage messages", this), - lowpri("lowpri", "memory", - "Message use from low priority storage messages", this), - normalpri("normalpri", "memory", - "Message use from normal priority storage messages", this), - highpri("highpri", "memory", - "Message use from high priority storage messages", this), - veryhighpri("veryhighpri", "memory", - "Message use from very high priority storage messages", this) - { - } - + MessageMemoryUseMetricSet(metrics::MetricSet* owner); + ~MessageMemoryUseMetricSet(); }; struct DocumentSerializationMetricSet : public metrics::MetricSet @@ -43,36 +28,8 @@ struct DocumentSerializationMetricSet : public metrics::MetricSet metrics::LongCountMetric serializedUncompressed; metrics::LongCountMetric inputWronglySerialized; - DocumentSerializationMetricSet(metrics::MetricSet* owner) - : metrics::MetricSet("document_serialization", "docserialization", - "Counts of document serialization of various types", owner), - usedCachedSerializationCount( - "cached_serialization_count", "docserialization", - "Number of times we didn't need to serialize the document as " - "we already had serialized version cached", this), - compressedDocumentCount( - "compressed_serialization_count", "docserialization", - "Number of times we compressed document when serializing", - this), - compressionDidntHelpCount( - "compressed_didnthelp_count", "docserialization", - "Number of times we compressed document when serializing, but " - "the compressed version was bigger, so it was dumped", this), - uncompressableCount( - "uncompressable_serialization_count", "docserialization", - "Number of times we didn't attempt compression as document " - "had already been tagged uncompressable", this), - serializedUncompressed( - "uncompressed_serialization_count", "docserialization", - "Number of times we serialized a document uncompressed", this), - inputWronglySerialized( - "input_wrongly_serialized_count", "docserialization", - "Number of times we reserialized a document because the " - "compression it had in cache did not match what was configured", - this) - { - } - + DocumentSerializationMetricSet(metrics::MetricSet* owner); + ~DocumentSerializationMetricSet(); }; struct StorageMetricSet : public metrics::MetricSet @@ -82,34 +39,9 @@ struct StorageMetricSet : public metrics::MetricSet metrics::LongValueMetric memoryUse_visiting; DocumentSerializationMetricSet documentSerialization; - StorageMetricSet() - : metrics::MetricSet("server", "memory", - "Metrics for VDS applications"), - memoryUse("memoryusage", "memory", "", this), - memoryUse_messages(this), - memoryUse_visiting("memoryusage_visiting", "memory", - "Message use from visiting", this), - documentSerialization(this) - { - } - - void updateMetrics() { - document::SerializableArray::Statistics stats( - document::SerializableArray::getStatistics()); - - documentSerialization.usedCachedSerializationCount.set( - stats._usedCachedSerializationCount); - documentSerialization.compressedDocumentCount.set( - stats._compressedDocumentCount); - documentSerialization.compressionDidntHelpCount.set( - stats._compressionDidntHelpCount); - documentSerialization.uncompressableCount.set( - stats._uncompressableCount); - documentSerialization.serializedUncompressed.set( - stats._serializedUncompressed); - documentSerialization.inputWronglySerialized.set( - stats._inputWronglySerialized); - } + StorageMetricSet(); + ~StorageMetricSet(); + void updateMetrics(); }; } // storage diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h index 5df29191489..5eea62a17ad 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.h +++ b/storage/src/vespa/storage/storageserver/storagenode.h @@ -12,9 +12,10 @@ #pragma once +#include "storagemetricsset.h" +#include "storagenodecontext.h" +#include "applicationgenerationfetcher.h" #include <vespa/document/bucket/bucketidfactory.h> -#include <memory> -#include <string> #include <vespa/storage/config/config-stor-server.h> #include <vespa/config/helper/legacysubscriber.h> @@ -34,9 +35,6 @@ #include <vespa/storageframework/defaultimplementation/memory/memorymanager.h> #include <vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h> #include <vespa/storage/frameworkimpl/memory/memorystatusviewer.h> -#include <vespa/storage/storageserver/applicationgenerationfetcher.h> -#include <vespa/storage/storageserver/storagenodecontext.h> -#include <vespa/storage/storageserver/storagemetricsset.h> #include <vespa/storage/visiting/visitormessagesessionfactory.h> #include <vespa/storageframework/storageframework.h> #include <vespa/storage/storageutil/resumeguard.h> @@ -78,12 +76,9 @@ public: virtual ~StorageNode(); virtual const lib::NodeType& getNodeType() const = 0; - bool attemptedStopped() const; - - virtual void notifyDoneInitializing() override; + void notifyDoneInitializing() override; void waitUntilInitialized(uint32_t timeoutSeconds = 15); - void updateMetrics(const MetricLockGuard & guard) override; /** Updates the document type repo. */ @@ -94,17 +89,12 @@ public: * is alive, no calls will be made towards the persistence provider. */ virtual ResumeGuard pause() = 0; - void requestShutdown(vespalib::stringref reason) override; - - void - notifyPartitionDown(int partId, vespalib::stringref reason); - + void notifyPartitionDown(int partId, vespalib::stringref reason); DoneInitializeHandler& getDoneInitializeHandler() { return *this; } - // For testing + // For testing StorageLink* getChain() { return _chain.get(); } - virtual void initializeStatusWebServer(); private: @@ -140,10 +130,10 @@ private: std::unique_ptr<StorageLink> _chain; /** Implementation of config callbacks. */ - virtual void configure(std::unique_ptr<vespa::config::content::core::StorServerConfig> config) override; - virtual void configure(std::unique_ptr<vespa::config::content::UpgradingConfig> config) override; - virtual void configure(std::unique_ptr<vespa::config::content::StorDistributionConfig> config) override; - virtual void configure(std::unique_ptr<vespa::config::content::core::StorPrioritymappingConfig>) override; + void configure(std::unique_ptr<vespa::config::content::core::StorServerConfig> config) override; + void configure(std::unique_ptr<vespa::config::content::UpgradingConfig> config) override; + void configure(std::unique_ptr<vespa::config::content::StorDistributionConfig> config) override; + void configure(std::unique_ptr<vespa::config::content::core::StorPrioritymappingConfig>) override; virtual void configure(std::unique_ptr<document::DocumenttypesConfig> config, bool hasChanged, int64_t generation); void updateUpgradeFlag(const vespa::config::content::UpgradingConfig&); @@ -184,8 +174,6 @@ protected: virtual void handleLiveConfigUpdate(); void shutdown(); virtual void removeConfigSubscriptions(); - }; } // storage - |