summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-04-24 23:23:28 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2017-04-24 23:23:28 +0200
commit8cbba246fcc23b3cd1eab0186f800769b69d9cd7 (patch)
treebcc3aa532162618e44fd77ad166d046a693735c6
parent8625ee710864e39712869888a203431c52485750 (diff)
- Deinline large metrics constructors/destructors.
- Optimize includes - remove virtual when override.
-rw-r--r--storage/src/vespa/storage/storageserver/CMakeLists.txt2
-rw-r--r--storage/src/vespa/storage/storageserver/applicationgenerationfetcher.h5
-rw-r--r--storage/src/vespa/storage/storageserver/bucketintegritychecker.h34
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h51
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanagermetrics.h31
-rw-r--r--storage/src/vespa/storage/storageserver/distributornode.h18
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp72
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h134
-rw-r--r--storage/src/vespa/storage/storageserver/opslogger.h10
-rw-r--r--storage/src/vespa/storage/storageserver/statemanager.h27
-rw-r--r--storage/src/vespa/storage/storageserver/statereporter.h27
-rw-r--r--storage/src/vespa/storage/storageserver/storagemetricsset.h82
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.h32
13 files changed, 165 insertions, 360 deletions
diff --git a/storage/src/vespa/storage/storageserver/CMakeLists.txt b/storage/src/vespa/storage/storageserver/CMakeLists.txt
index 17638f55b3a..79ce0206005 100644
--- a/storage/src/vespa/storage/storageserver/CMakeLists.txt
+++ b/storage/src/vespa/storage/storageserver/CMakeLists.txt
@@ -8,6 +8,7 @@ vespa_add_library(storage_storageserver
fnetlistener.cpp
rpcrequestwrapper.cpp
communicationmanager.cpp
+ communicationmanagermetrics.cpp
statemanager.cpp
documentapiconverter.cpp
opslogger.cpp
@@ -20,6 +21,7 @@ vespa_add_library(storage_storageserver
distributornode.cpp
servicelayernode.cpp
statereporter.cpp
+ storagemetricsset.cpp
changedbucketownershiphandler.cpp
INSTALL lib64
DEPENDS
diff --git a/storage/src/vespa/storage/storageserver/applicationgenerationfetcher.h b/storage/src/vespa/storage/storageserver/applicationgenerationfetcher.h
index 0e33c8c86c6..7d5b64c1d17 100644
--- a/storage/src/vespa/storage/storageserver/applicationgenerationfetcher.h
+++ b/storage/src/vespa/storage/storageserver/applicationgenerationfetcher.h
@@ -9,11 +9,14 @@
#pragma once
+#include <cstdint>
+#include <string>
+
namespace storage {
class ApplicationGenerationFetcher {
public:
- virtual ~ApplicationGenerationFetcher() {};
+ virtual ~ApplicationGenerationFetcher() {}
virtual int64_t getGeneration() const = 0;
virtual std::string getComponentName() const = 0;
diff --git a/storage/src/vespa/storage/storageserver/bucketintegritychecker.h b/storage/src/vespa/storage/storageserver/bucketintegritychecker.h
index 6b87be804d0..fd9516a85aa 100644
--- a/storage/src/vespa/storage/storageserver/bucketintegritychecker.h
+++ b/storage/src/vespa/storage/storageserver/bucketintegritychecker.h
@@ -112,47 +112,29 @@ public:
ServiceLayerComponentRegister&);
~BucketIntegrityChecker();
- virtual void onClose() override;
-
- virtual void print(std::ostream& out, bool verbose, const std::string& indent) const override;
-
+ void onClose() override;
+ void print(std::ostream& out, bool verbose, const std::string& indent) const override;
SchedulingOptions& getSchedulingOptions() { return _scheduleOptions; }
-
bool isWorkingOnCycle() const;
-
uint32_t getCycleCount() const;
/** Give thread a bump by signalling it. */
void bump() const;
- void setMaxThreadWaitTime(framework::MilliSecTime milliSecs)
- { _maxThreadWaitTime = milliSecs; }
+ void setMaxThreadWaitTime(framework::MilliSecTime milliSecs) { _maxThreadWaitTime = milliSecs; }
framework::Clock& getClock() { return _component.getClock(); }
private:
- virtual void configure(std::unique_ptr<vespa::config::content::core::StorIntegritycheckerConfig>) override;
-
+ void configure(std::unique_ptr<vespa::config::content::core::StorIntegritycheckerConfig>) override;
void onDoneInit() override;
-
bool onInternalReply(const std::shared_ptr<api::InternalReply>&) override;
bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>&) override;
- bool onNotifyBucketChangeReply(
- const std::shared_ptr<api::NotifyBucketChangeReply>&) override
- { return true; }
-
- SchedulingOptions::RunState getCurrentRunState(
- framework::SecondTime time) const;
-
- virtual void run(framework::ThreadHandle&) override;
-
+ bool onNotifyBucketChangeReply(const std::shared_ptr<api::NotifyBucketChangeReply>&) override { return true; }
+ SchedulingOptions::RunState getCurrentRunState(framework::SecondTime time) const;
+ void run(framework::ThreadHandle&) override;
uint32_t getTotalPendingCount() const;
-
- // Status::Reporter implementation
- virtual void reportHtmlStatus(std::ostream&,
- const framework::HttpUrlPath&) const override;
-
+ void reportHtmlStatus(std::ostream&, const framework::HttpUrlPath&) const override;
};
}
-
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index 31100de78d1..3cae63d3586 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -1,8 +1,6 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
-
-
-* @class CommunicationManager
+ * @class CommunicationManager
* @ingroup storageserver
*
* @brief Class used for sending messages over the network.
@@ -12,26 +10,26 @@
#pragma once
-#include <vespa/vespalib/util/document_runnable.h>
-#include <map>
-#include <memory>
-#include <queue>
-#include <vector>
-#include <atomic>
-#include <vespa/metrics/metrics.h>
-#include <vespa/messagebus/rpcmessagebus.h>
-#include <vespa/storageframework/storageframework.h>
+#include <vespa/documentapi/documentapi.h>
+#include "communicationmanagermetrics.h"
+#include "messageallocationtypes.h"
+#include "documentapiconverter.h"
#include <vespa/storage/common/storagelink.h>
#include <vespa/storage/config/config-stor-communicationmanager.h>
+#include <vespa/storageframework/storageframework.h>
+#include <vespa/storageframework/storageframework.h>
#include <vespa/storageapi/messageapi/storagecommand.h>
#include <vespa/storageapi/messageapi/storagemessage.h>
#include <vespa/storageapi/mbusprot/storagecommand.h>
#include <vespa/storageapi/mbusprot/storagereply.h>
-#include <vespa/documentapi/documentapi.h>
-#include <vespa/storage/storageserver/communicationmanagermetrics.h>
-#include <vespa/storage/storageserver/messageallocationtypes.h>
-#include "documentapiconverter.h"
-#include <vespa/storageframework/storageframework.h>
+#include <vespa/messagebus/rpcmessagebus.h>
+#include <vespa/metrics/metrics.h>
+#include <vespa/vespalib/util/document_runnable.h>
+#include <map>
+#include <memory>
+#include <queue>
+#include <vector>
+#include <atomic>
namespace storage {
@@ -189,43 +187,32 @@ private:
public:
CommunicationManager(StorageComponentRegister& compReg,
const config::ConfigUri & configUri);
- virtual ~CommunicationManager();
+ ~CommunicationManager();
void enqueue(const std::shared_ptr<api::StorageMessage> & msg);
-
mbus::RPCMessageBus& getMessageBus() { assert(_mbus.get()); return *_mbus; }
-
const PriorityConverter& getPriorityConverter() const { return _docApiConverter.getPriorityConverter(); }
/**
* From StorageLink. Called when messages arrive from storage
* modules. Will convert and dispatch messages to MessageServer
*/
- virtual bool onUp(const std::shared_ptr<api::StorageMessage>&) override;
-
+ bool onUp(const std::shared_ptr<api::StorageMessage>&) override;
bool sendCommand(const std::shared_ptr<api::StorageCommand>& command);
-
bool sendReply(const std::shared_ptr<api::StorageReply>& reply);
void sendDirectRPCReply(RPCRequestWrapper& request, const std::shared_ptr<api::StorageReply>& reply);
void sendMessageBusReply(StorageTransportContext& context, const std::shared_ptr<api::StorageReply>& reply);
// Pump thread
void run(framework::ThreadHandle&) override;
+ void print(std::ostream& out, bool verbose, const std::string& indent) const override;
- virtual void print(std::ostream& out, bool verbose, const std::string& indent) const override;
-
- /** Get messages from messagebus. */
void handleMessage(std::unique_ptr<mbus::Message> msg) override;
-
void sendMessageBusMessage(const std::shared_ptr<api::StorageCommand>& msg,
- std::unique_ptr<mbus::Message> mbusMsg, const mbus::Route& route);
+ std::unique_ptr<mbus::Message> mbusMsg, const mbus::Route& route);
- /** Get replies from messagebus. */
void handleReply(std::unique_ptr<mbus::Reply> msg) override;
-
void updateMessagebusProtocol(const document::DocumentTypeRepo::SP &repo);
-
};
} // storage
-
diff --git a/storage/src/vespa/storage/storageserver/communicationmanagermetrics.h b/storage/src/vespa/storage/storageserver/communicationmanagermetrics.h
index 4b580f79904..40c3646647e 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanagermetrics.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanagermetrics.h
@@ -9,7 +9,6 @@
#pragma once
#include <vespa/metrics/metrics.h>
-#include <vespa/documentapi/loadtypes/loadtypeset.h>
namespace storage {
@@ -22,34 +21,8 @@ struct CommunicationManagerMetrics : public metrics::MetricSet {
metrics::DoubleAverageMetric sendCommandLatency;
metrics::DoubleAverageMetric sendReplyLatency;
- CommunicationManagerMetrics(const metrics::LoadTypeSet& loadTypes,
- metrics::MetricSet* owner = 0)
- : metrics::MetricSet("communication", "",
- "Metrics for the communication manager", owner),
- queueSize("messagequeue", "", "Size of input message queue.", this),
- messageProcessTime(loadTypes, metrics::DoubleAverageMetric(
- "messageprocesstime", "",
- "Time transport thread uses to process a single message"),
- this),
- exceptionMessageProcessTime(loadTypes, metrics::DoubleAverageMetric(
- "exceptionmessageprocesstime", "",
- "Time transport thread uses to process a single message "
- "that fails with an exception thrown into communication "
- "manager"),
- this),
- failedDueToTooLittleMemory("toolittlememory", "",
- "Number of messages failed due to too little memory "
- "available", this),
- convertToStorageAPIFailures("convertfailures", "",
- "Number of messages that failed to get converted to "
- "storage API messages", this),
- sendCommandLatency("sendcommandlatency", "",
- "Average ms used to send commands to MBUS", this),
- sendReplyLatency("sendreplylatency", "",
- "Average ms used to send replies to MBUS", this)
- {
- }
-
+ CommunicationManagerMetrics(const metrics::LoadTypeSet& loadTypes, metrics::MetricSet* owner = 0);
+ ~CommunicationManagerMetrics();
};
}
diff --git a/storage/src/vespa/storage/storageserver/distributornode.h b/storage/src/vespa/storage/storageserver/distributornode.h
index 03792d8fb56..31fe8fe7878 100644
--- a/storage/src/vespa/storage/storageserver/distributornode.h
+++ b/storage/src/vespa/storage/storageserver/distributornode.h
@@ -8,9 +8,9 @@
#pragma once
+#include "distributornodecontext.h"
+#include "storagenode.h"
#include <vespa/storage/common/distributorcomponent.h>
-#include <vespa/storage/storageserver/distributornodecontext.h>
-#include <vespa/storage/storageserver/storagenode.h>
#include <vespa/storageframework/generic/thread/tickingthread.h>
namespace storage {
@@ -41,19 +41,16 @@ public:
StorageLink::UP communicationManager = StorageLink::UP());
~DistributorNode();
- virtual const lib::NodeType& getNodeType() const override
- { return lib::NodeType::DISTRIBUTOR; }
-
- virtual ResumeGuard pause() override;
+ const lib::NodeType& getNodeType() const override { return lib::NodeType::DISTRIBUTOR; }
+ ResumeGuard pause() override;
void handleConfigChange(vespa::config::content::core::StorDistributormanagerConfig&);
void handleConfigChange(vespa::config::content::core::StorVisitordispatcherConfig&);
private:
- virtual void initializeNodeSpecific() override;
- virtual StorageLink::UP createChain() override;
-
- virtual api::Timestamp getUniqueTimestamp() override;
+ void initializeNodeSpecific() override;
+ StorageLink::UP createChain() override;
+ api::Timestamp getUniqueTimestamp() override;
/**
* Shut down necessary distributor-specific components before shutting
@@ -63,4 +60,3 @@ private:
};
} // storage
-
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index ebfa10fe814..577fedb58e1 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -1,15 +1,12 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/fastos/fastos.h>
-#include <vespa/storage/storageserver/mergethrottler.h>
-
+#include "mergethrottler.h"
+#include "storagemetricsset.h"
#include <iostream>
#include <sstream>
-#include <iterator>
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/storage/common/nodestateupdater.h>
#include <vespa/storage/persistence/messages.h>
-#include <vespa/storage/storageserver/storagemetricsset.h>
#include <vespa/log/log.h>
LOG_SETUP(".mergethrottler");
@@ -46,6 +43,71 @@ const mbus::string DummyMbusMessage<Base>::NAME = "SkyNet";
}
+MergeThrottler::ChainedMergeState::ChainedMergeState()
+ : _cmd(),
+ _cmdString(),
+ _clusterStateVersion(0),
+ _inCycle(false),
+ _executingLocally(false),
+ _unwinding(false),
+ _cycleBroken(false),
+ _aborted(false)
+{ }
+
+MergeThrottler::ChainedMergeState::ChainedMergeState(const api::StorageMessage::SP& cmd, bool executing)
+ : _cmd(cmd),
+ _cmdString(cmd->toString()),
+ _clusterStateVersion(static_cast<const api::MergeBucketCommand&>(*cmd).getClusterStateVersion()),
+ _inCycle(false),
+ _executingLocally(executing),
+ _unwinding(false),
+ _cycleBroken(false),
+ _aborted(false)
+{ }
+MergeThrottler::ChainedMergeState::~ChainedMergeState() {}
+
+MergeThrottler::Metrics::Metrics(metrics::MetricSet* owner)
+ : metrics::MetricSet("mergethrottler", "", "", owner),
+ averageQueueWaitingTime("averagequeuewaitingtime", "", "Average time a merge spends in the throttler queue", this),
+ chaining("mergechains", this),
+ local("locallyexecutedmerges", this)
+{ }
+MergeThrottler::Metrics::~Metrics() {}
+
+MergeThrottler::MergeFailureMetrics::MergeFailureMetrics(metrics::MetricSet* owner)
+ : metrics::MetricSet("failures", "", "Detailed failure statistics", owner),
+ sum("total", "", "Sum of all failures", this),
+ notready("notready", "", "The number of merges discarded because distributor was not ready", this),
+ timeout("timeout", "", "The number of merges that failed because they timed out towards storage", this),
+ aborted("aborted", "", "The number of merges that failed because the storage node was (most likely) shutting down", this),
+ wrongdistribution("wrongdistribution", "", "The number of merges that were discarded (flushed) because they were initiated at an older cluster state than the current", this),
+ bucketnotfound("bucketnotfound", "", "The number of operations that failed because the bucket did not exist", this),
+ busy("busy", "", "The number of merges that failed because the storage node was busy", this),
+ exists("exists", "", "The number of merges that were rejected due to a merge operation for their bucket already being processed", this),
+ rejected("rejected", "", "The number of merges that were rejected", this),
+ other("other", "", "The number of other failures", this)
+{
+ sum.addMetricToSum(notready);
+ sum.addMetricToSum(timeout);
+ sum.addMetricToSum(aborted);
+ sum.addMetricToSum(wrongdistribution);
+ sum.addMetricToSum(bucketnotfound);
+ sum.addMetricToSum(busy);
+ sum.addMetricToSum(exists);
+ sum.addMetricToSum(rejected);
+ sum.addMetricToSum(other);
+}
+MergeThrottler::MergeFailureMetrics::~MergeFailureMetrics() { }
+
+
+MergeThrottler::MergeOperationMetrics::MergeOperationMetrics(const std::string& name, metrics::MetricSet* owner)
+ : metrics::MetricSet(name, "", vespalib::make_string("Statistics for %s", name.c_str()), owner),
+ ok("ok", "", vespalib::make_string("The number of successful merges for '%s'", name.c_str()), this),
+ failures(this)
+{
+}
+MergeThrottler::MergeOperationMetrics::~MergeOperationMetrics() { }
+
MergeThrottler::MergeNodeSequence::MergeNodeSequence(
const api::MergeBucketCommand& cmd,
uint16_t thisIndex)
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index 8ee4ca7be71..74ffe095d7c 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -7,11 +7,6 @@
*/
#pragma once
-#include <map>
-#include <utility>
-#include <vector>
-#include <set>
-#include <memory>
#include <vespa/vespalib/util/document_runnable.h>
#include <vespa/storage/config/config-stor-server.h>
#include <vespa/storage/common/storagelink.h>
@@ -48,37 +43,8 @@ public:
metrics::LongCountMetric rejected;
metrics::LongCountMetric other;
- MergeFailureMetrics(metrics::MetricSet* owner)
- : metrics::MetricSet("failures", "", "Detailed failure statistics", owner),
- sum("total", "", "Sum of all failures", this),
- notready("notready", "", "The number of merges discarded "
- "because distributor was not ready", this),
- timeout("timeout", "", "The number of merges that failed because "
- "they timed out towards storage", this),
- aborted("aborted", "", "The number of merges that failed "
- "because the storage node was (most likely) shutting down", this),
- wrongdistribution("wrongdistribution", "", "The number of merges that "
- "were discarded (flushed) because they were initiated at an "
- "older cluster state than the current", this),
- bucketnotfound("bucketnotfound", "", "The number of operations that failed "
- "because the bucket did not exist", this),
- busy("busy", "", "The number of merges that failed because the "
- "storage node was busy", this),
- exists("exists", "", "The number of merges that were rejected due to a "
- "merge operation for their bucket already being processed", this),
- rejected("rejected", "", "The number of merges that were rejected", this),
- other("other", "", "The number of other failures", this)
- {
- sum.addMetricToSum(notready);
- sum.addMetricToSum(timeout);
- sum.addMetricToSum(aborted);
- sum.addMetricToSum(wrongdistribution);
- sum.addMetricToSum(bucketnotfound);
- sum.addMetricToSum(busy);
- sum.addMetricToSum(exists);
- sum.addMetricToSum(rejected);
- sum.addMetricToSum(other);
- }
+ MergeFailureMetrics(metrics::MetricSet* owner);
+ ~MergeFailureMetrics();
};
class MergeOperationMetrics : public metrics::MetricSet
@@ -87,12 +53,8 @@ public:
metrics::LongCountMetric ok;
MergeFailureMetrics failures;
- MergeOperationMetrics(const std::string& name, metrics::MetricSet* owner)
- : metrics::MetricSet(name, "", vespalib::make_string("Statistics for %s", name.c_str()), owner),
- ok("ok", "", vespalib::make_string("The number of successful merges for '%s'", name.c_str()), this),
- failures(this)
- {
- }
+ MergeOperationMetrics(const std::string& name, metrics::MetricSet* owner);
+ ~MergeOperationMetrics();
};
class Metrics : public metrics::MetricSet
@@ -102,15 +64,8 @@ public:
MergeOperationMetrics chaining;
MergeOperationMetrics local;
- Metrics(metrics::MetricSet* owner = 0)
- : metrics::MetricSet("mergethrottler", "", "", owner),
- averageQueueWaitingTime(
- "averagequeuewaitingtime", "", "Average time a merge spends in "
- "the throttler queue", this),
- chaining("mergechains", this),
- local("locallyexecutedmerges", this)
- {
- }
+ Metrics(metrics::MetricSet* owner = 0);
+ ~Metrics();
};
private:
@@ -151,30 +106,9 @@ private:
bool _cycleBroken;
bool _aborted;
- ChainedMergeState()
- : _cmd(),
- _cmdString(),
- _clusterStateVersion(0),
- _inCycle(false),
- _executingLocally(false),
- _unwinding(false),
- _cycleBroken(false),
- _aborted(false)
- {
- }
-
- ChainedMergeState(const api::StorageMessage::SP& cmd, bool executing = false)
- : _cmd(cmd),
- _cmdString(cmd->toString()),
- _clusterStateVersion(static_cast<const api::MergeBucketCommand&>(
- *cmd).getClusterStateVersion()),
- _inCycle(false),
- _executingLocally(executing),
- _unwinding(false),
- _cycleBroken(false),
- _aborted(false)
- {
- }
+ ChainedMergeState();
+ ChainedMergeState(const api::StorageMessage::SP& cmd, bool executing = false);
+ ~ChainedMergeState();
// Use default copy-constructor/assignment operator
bool isExecutingLocally() const { return _executingLocally; }
@@ -239,9 +173,7 @@ public:
* windowSizeIncrement used for allowing unit tests to start out with more
* than 1 as their window size.
*/
- MergeThrottler(const config::ConfigUri & configUri,
- StorageComponentRegister&);
-
+ MergeThrottler(const config::ConfigUri & configUri, StorageComponentRegister&);
~MergeThrottler();
/** Implements document::Runnable::run */
@@ -267,12 +199,8 @@ public:
vespalib::Lock& getStateLock() { return _stateLock; }
Metrics& getMetrics() { return *_metrics; }
-
std::size_t getMaxQueueSize() const { return _maxQueueSize; }
-
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
-
- // HtmlStatusReporter implementation
void reportHtmlStatus(std::ostream&, const framework::HttpUrlPath&) const override;
private:
friend class ThreadRendezvousGuard; // impl in .cpp file
@@ -285,9 +213,7 @@ private:
std::size_t _sortedIndex; // Index of current storage node in the sorted node sequence
const uint16_t _thisIndex; // Index of the current storage node
- MergeNodeSequence(
- const api::MergeBucketCommand& cmd,
- uint16_t thisIndex);
+ MergeNodeSequence(const api::MergeBucketCommand& cmd, uint16_t thisIndex);
std::size_t getSortedIndex() const { return _sortedIndex; }
const std::vector<api::MergeBucketCommand::Node>& getSortedNodes() const {
@@ -332,13 +258,8 @@ private:
// NOTE: unless explicitly specified, all the below functions require
// _sync lock to be held upon call (usually implicitly via MessageGuard)
- void handleMessageDown(
- const std::shared_ptr<api::StorageMessage>& msg,
- MessageGuard& msgGuard);
-
- void handleMessageUp(
- const std::shared_ptr<api::StorageMessage>& msg,
- MessageGuard& msgGuard);
+ void handleMessageDown(const std::shared_ptr<api::StorageMessage>& msg, MessageGuard& msgGuard);
+ void handleMessageUp(const std::shared_ptr<api::StorageMessage>& msg, MessageGuard& msgGuard);
/**
* Handle the receival of MergeBucketReply, be it from another node
@@ -372,17 +293,13 @@ private:
*
* Precondition: no existing merge state exists for msg's bucketid.
*/
- void processNewMergeCommand(
- const api::StorageMessage::SP& msg,
- MessageGuard& msgGuard);
+ void processNewMergeCommand(const api::StorageMessage::SP& msg, MessageGuard& msgGuard);
/**
* Precondition: an existing merge state exists for msg's bucketid.
* @return true if message was handled, false otherwise (see onUp/onDown).
*/
- bool processCycledMergeCommand(
- const api::StorageMessage::SP& msg,
- MessageGuard& msgGuard);
+ bool processCycledMergeCommand(const api::StorageMessage::SP& msg, MessageGuard& msgGuard);
/**
* Forwards the given MergeBucketCommand to the storage node given
@@ -403,10 +320,7 @@ private:
* @return Highest priority waiting merge or null SP if queue is empty
*/
api::StorageMessage::SP getNextQueuedMerge();
-
- void enqueueMerge(
- const api::StorageMessage::SP& msg,
- MessageGuard& msgGuard);
+ void enqueueMerge(const api::StorageMessage::SP& msg, MessageGuard& msgGuard);
/**
* @return true if throttle policy says at least one additional
@@ -434,25 +348,15 @@ private:
* Immediately reject all queued merges whose cluster state version is
* less than that of rejectLessThanVersion
*/
- void rejectOutdatedQueuedMerges(MessageGuard& msgGuard,
- uint32_t rejectLessThanVersion);
-
+ void rejectOutdatedQueuedMerges(MessageGuard& msgGuard, uint32_t rejectLessThanVersion);
bool attemptProcessNextQueuedMerge(MessageGuard& msgGuard);
-
bool processQueuedMerges(MessageGuard& msgGuard);
-
void handleRendezvous(vespalib::MonitorGuard& guard);
-
void rendezvousWithWorkerThread(vespalib::MonitorGuard&);
-
void releaseWorkerThreadRendezvous(vespalib::MonitorGuard&);
-
bool isDiffCommand(const api::StorageMessage& msg) const;
-
bool isMergeCommand(const api::StorageMessage& msg) const;
-
bool isMergeReply(const api::StorageMessage& msg) const;
-
bool bucketIsUnknownOrAborted(const document::BucketId& bucket) const;
std::shared_ptr<api::StorageMessage> makeAbortReply(
@@ -460,8 +364,7 @@ private:
vespalib::stringref reason) const;
void handleOutdatedMerges(const api::SetSystemStateCommand&);
- void rejectOperationsInThreadQueue(MessageGuard&,
- uint32_t minimumStateVersion);
+ void rejectOperationsInThreadQueue(MessageGuard&, uint32_t minimumStateVersion);
void markActiveMergesAsAborted(uint32_t minimumStateVersion);
// const function, but metrics are mutable
@@ -471,4 +374,3 @@ private:
};
} // namespace storage
-
diff --git a/storage/src/vespa/storage/storageserver/opslogger.h b/storage/src/vespa/storage/storageserver/opslogger.h
index 57f304a04aa..905c0fd4e85 100644
--- a/storage/src/vespa/storage/storageserver/opslogger.h
+++ b/storage/src/vespa/storage/storageserver/opslogger.h
@@ -27,20 +27,15 @@ public:
~OpsLogger();
void onClose() override;
-
- virtual void print(std::ostream& out, bool verbose, const std::string& indent) const override;
-
+ void print(std::ostream& out, bool verbose, const std::string& indent) const override;
bool onPutReply(const std::shared_ptr<api::PutReply>& msg) override;
bool onUpdateReply(const std::shared_ptr<api::UpdateReply>& msg) override;
bool onRemoveReply(const std::shared_ptr<api::RemoveReply>& msg) override;
bool onGetReply(const std::shared_ptr<api::GetReply>& msg) override;
/** Ignore all replies on the way down the storage chain. */
- bool onDown(const std::shared_ptr<api::StorageMessage>&) override
- { return false; };
-
+ bool onDown(const std::shared_ptr<api::StorageMessage>&) override { return false; };
void configure(std::unique_ptr<vespa::config::content::core::StorOpsloggerConfig> config) override;
-
private:
vespalib::Lock _lock;
std::string _fileName;
@@ -51,4 +46,3 @@ private:
};
}
-
diff --git a/storage/src/vespa/storage/storageserver/statemanager.h b/storage/src/vespa/storage/storageserver/statemanager.h
index 028168937b5..974b8ca2393 100644
--- a/storage/src/vespa/storage/storageserver/statemanager.h
+++ b/storage/src/vespa/storage/storageserver/statemanager.h
@@ -72,23 +72,19 @@ public:
void tick();
- virtual void print(std::ostream& out, bool verbose, const std::string& indent) const override;
+ void print(std::ostream& out, bool verbose, const std::string& indent) const override;
+ void reportHtmlStatus(std::ostream&, const framework::HttpUrlPath&) const override;
- /** Implementation of HtmlStatusReporter */
- virtual void reportHtmlStatus(std::ostream&, const framework::HttpUrlPath&) const override;
+ lib::NodeState::CSP getReportedNodeState() const override;
+ lib::NodeState::CSP getCurrentNodeState() const override;
+ lib::ClusterState::CSP getSystemState() const override;
- virtual lib::NodeState::CSP getReportedNodeState() const override;
- virtual lib::NodeState::CSP getCurrentNodeState() const override;
- virtual lib::ClusterState::CSP getSystemState() const override;
-
- virtual void addStateListener(StateListener&) override;
- virtual void removeStateListener(StateListener&) override;
-
- virtual Lock::SP grabStateChangeLock() override;
- virtual void setReportedNodeState(const lib::NodeState& state) override;
+ void addStateListener(StateListener&) override;
+ void removeStateListener(StateListener&) override;
+ Lock::SP grabStateChangeLock() override;
+ void setReportedNodeState(const lib::NodeState& state) override;
void setClusterState(const lib::ClusterState& c);
-
HostInfo& getHostInfo() { return *_hostInfo; }
private:
@@ -136,10 +132,7 @@ private:
*/
std::string getNodeInfo() const;
- virtual void run(framework::ThreadHandle&) override;
-
+ void run(framework::ThreadHandle&) override;
};
} // storage
-
-
diff --git a/storage/src/vespa/storage/storageserver/statereporter.h b/storage/src/vespa/storage/storageserver/statereporter.h
index 51a9e93a197..c46a878ef40 100644
--- a/storage/src/vespa/storage/storageserver/statereporter.h
+++ b/storage/src/vespa/storage/storageserver/statereporter.h
@@ -10,11 +10,11 @@
#pragma once
-#include <vespa/metrics/metrics.h>
-#include <vespa/metrics/state_api_adapter.h>
+#include "applicationgenerationfetcher.h"
#include <vespa/storage/common/storagecomponent.h>
-#include <vespa/storage/storageserver/applicationgenerationfetcher.h>
#include <vespa/storageframework/storageframework.h>
+#include <vespa/metrics/metrics.h>
+#include <vespa/metrics/state_api_adapter.h>
#include <vespa/vespalib/net/metrics_producer.h>
#include <vespa/vespalib/net/state_api.h>
@@ -37,11 +37,8 @@ public:
const std::string& name = "status");
~StateReporter();
- vespalib::string getReportContentType(
- const framework::HttpUrlPath&) const override;
- bool reportStatus(std::ostream& out,
- const framework::HttpUrlPath& path) const override;
-
+ vespalib::string getReportContentType(const framework::HttpUrlPath&) const override;
+ bool reportStatus(std::ostream& out, const framework::HttpUrlPath& path) const override;
private:
metrics::MetricManager &_manager;
metrics::StateApiAdapter _metricsAdapter;
@@ -50,16 +47,10 @@ private:
ApplicationGenerationFetcher& _generationFetcher;
std::string _name;
- // Implements vespalib::MetricsProducer
- virtual vespalib::string getMetrics(const vespalib::string &consumer) override;
- virtual vespalib::string getTotalMetrics(const vespalib::string &consumer) override;
-
- // Implements vespalib::HealthProducer
- virtual Health getHealth() const override;
-
- // Implements vespalib::ComponentConfigProducer
- virtual void getComponentConfig(Consumer &consumer) override;
+ vespalib::string getMetrics(const vespalib::string &consumer) override;
+ vespalib::string getTotalMetrics(const vespalib::string &consumer) override;
+ Health getHealth() const override;
+ void getComponentConfig(Consumer &consumer) override;
};
} // storage
-
diff --git a/storage/src/vespa/storage/storageserver/storagemetricsset.h b/storage/src/vespa/storage/storageserver/storagemetricsset.h
index 18be3e21ada..f7083705763 100644
--- a/storage/src/vespa/storage/storageserver/storagemetricsset.h
+++ b/storage/src/vespa/storage/storageserver/storagemetricsset.h
@@ -3,7 +3,6 @@
#pragma once
#include <vespa/metrics/metrics.h>
-#include <vespa/document/fieldvalue/serializablearray.h>
namespace storage {
@@ -16,22 +15,8 @@ public:
metrics::LongValueMetric highpri;
metrics::LongValueMetric veryhighpri;
- MessageMemoryUseMetricSet(metrics::MetricSet* owner)
- : metrics::MetricSet("message_memory_use", "memory",
- "Message use from storage messages", owner),
- total("total", "memory",
- "Message use from storage messages", this),
- lowpri("lowpri", "memory",
- "Message use from low priority storage messages", this),
- normalpri("normalpri", "memory",
- "Message use from normal priority storage messages", this),
- highpri("highpri", "memory",
- "Message use from high priority storage messages", this),
- veryhighpri("veryhighpri", "memory",
- "Message use from very high priority storage messages", this)
- {
- }
-
+ MessageMemoryUseMetricSet(metrics::MetricSet* owner);
+ ~MessageMemoryUseMetricSet();
};
struct DocumentSerializationMetricSet : public metrics::MetricSet
@@ -43,36 +28,8 @@ struct DocumentSerializationMetricSet : public metrics::MetricSet
metrics::LongCountMetric serializedUncompressed;
metrics::LongCountMetric inputWronglySerialized;
- DocumentSerializationMetricSet(metrics::MetricSet* owner)
- : metrics::MetricSet("document_serialization", "docserialization",
- "Counts of document serialization of various types", owner),
- usedCachedSerializationCount(
- "cached_serialization_count", "docserialization",
- "Number of times we didn't need to serialize the document as "
- "we already had serialized version cached", this),
- compressedDocumentCount(
- "compressed_serialization_count", "docserialization",
- "Number of times we compressed document when serializing",
- this),
- compressionDidntHelpCount(
- "compressed_didnthelp_count", "docserialization",
- "Number of times we compressed document when serializing, but "
- "the compressed version was bigger, so it was dumped", this),
- uncompressableCount(
- "uncompressable_serialization_count", "docserialization",
- "Number of times we didn't attempt compression as document "
- "had already been tagged uncompressable", this),
- serializedUncompressed(
- "uncompressed_serialization_count", "docserialization",
- "Number of times we serialized a document uncompressed", this),
- inputWronglySerialized(
- "input_wrongly_serialized_count", "docserialization",
- "Number of times we reserialized a document because the "
- "compression it had in cache did not match what was configured",
- this)
- {
- }
-
+ DocumentSerializationMetricSet(metrics::MetricSet* owner);
+ ~DocumentSerializationMetricSet();
};
struct StorageMetricSet : public metrics::MetricSet
@@ -82,34 +39,9 @@ struct StorageMetricSet : public metrics::MetricSet
metrics::LongValueMetric memoryUse_visiting;
DocumentSerializationMetricSet documentSerialization;
- StorageMetricSet()
- : metrics::MetricSet("server", "memory",
- "Metrics for VDS applications"),
- memoryUse("memoryusage", "memory", "", this),
- memoryUse_messages(this),
- memoryUse_visiting("memoryusage_visiting", "memory",
- "Message use from visiting", this),
- documentSerialization(this)
- {
- }
-
- void updateMetrics() {
- document::SerializableArray::Statistics stats(
- document::SerializableArray::getStatistics());
-
- documentSerialization.usedCachedSerializationCount.set(
- stats._usedCachedSerializationCount);
- documentSerialization.compressedDocumentCount.set(
- stats._compressedDocumentCount);
- documentSerialization.compressionDidntHelpCount.set(
- stats._compressionDidntHelpCount);
- documentSerialization.uncompressableCount.set(
- stats._uncompressableCount);
- documentSerialization.serializedUncompressed.set(
- stats._serializedUncompressed);
- documentSerialization.inputWronglySerialized.set(
- stats._inputWronglySerialized);
- }
+ StorageMetricSet();
+ ~StorageMetricSet();
+ void updateMetrics();
};
} // storage
diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h
index 5df29191489..5eea62a17ad 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.h
+++ b/storage/src/vespa/storage/storageserver/storagenode.h
@@ -12,9 +12,10 @@
#pragma once
+#include "storagemetricsset.h"
+#include "storagenodecontext.h"
+#include "applicationgenerationfetcher.h"
#include <vespa/document/bucket/bucketidfactory.h>
-#include <memory>
-#include <string>
#include <vespa/storage/config/config-stor-server.h>
#include <vespa/config/helper/legacysubscriber.h>
@@ -34,9 +35,6 @@
#include <vespa/storageframework/defaultimplementation/memory/memorymanager.h>
#include <vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h>
#include <vespa/storage/frameworkimpl/memory/memorystatusviewer.h>
-#include <vespa/storage/storageserver/applicationgenerationfetcher.h>
-#include <vespa/storage/storageserver/storagenodecontext.h>
-#include <vespa/storage/storageserver/storagemetricsset.h>
#include <vespa/storage/visiting/visitormessagesessionfactory.h>
#include <vespa/storageframework/storageframework.h>
#include <vespa/storage/storageutil/resumeguard.h>
@@ -78,12 +76,9 @@ public:
virtual ~StorageNode();
virtual const lib::NodeType& getNodeType() const = 0;
-
bool attemptedStopped() const;
-
- virtual void notifyDoneInitializing() override;
+ void notifyDoneInitializing() override;
void waitUntilInitialized(uint32_t timeoutSeconds = 15);
-
void updateMetrics(const MetricLockGuard & guard) override;
/** Updates the document type repo. */
@@ -94,17 +89,12 @@ public:
* is alive, no calls will be made towards the persistence provider.
*/
virtual ResumeGuard pause() = 0;
-
void requestShutdown(vespalib::stringref reason) override;
-
- void
- notifyPartitionDown(int partId, vespalib::stringref reason);
-
+ void notifyPartitionDown(int partId, vespalib::stringref reason);
DoneInitializeHandler& getDoneInitializeHandler() { return *this; }
- // For testing
+ // For testing
StorageLink* getChain() { return _chain.get(); }
-
virtual void initializeStatusWebServer();
private:
@@ -140,10 +130,10 @@ private:
std::unique_ptr<StorageLink> _chain;
/** Implementation of config callbacks. */
- virtual void configure(std::unique_ptr<vespa::config::content::core::StorServerConfig> config) override;
- virtual void configure(std::unique_ptr<vespa::config::content::UpgradingConfig> config) override;
- virtual void configure(std::unique_ptr<vespa::config::content::StorDistributionConfig> config) override;
- virtual void configure(std::unique_ptr<vespa::config::content::core::StorPrioritymappingConfig>) override;
+ void configure(std::unique_ptr<vespa::config::content::core::StorServerConfig> config) override;
+ void configure(std::unique_ptr<vespa::config::content::UpgradingConfig> config) override;
+ void configure(std::unique_ptr<vespa::config::content::StorDistributionConfig> config) override;
+ void configure(std::unique_ptr<vespa::config::content::core::StorPrioritymappingConfig>) override;
virtual void configure(std::unique_ptr<document::DocumenttypesConfig> config,
bool hasChanged, int64_t generation);
void updateUpgradeFlag(const vespa::config::content::UpgradingConfig&);
@@ -184,8 +174,6 @@ protected:
virtual void handleLiveConfigUpdate();
void shutdown();
virtual void removeConfigSubscriptions();
-
};
} // storage
-