summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-10-07 14:48:05 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-10-09 14:54:50 +0000
commit0c41661e9c77aa39d26d67970628ffdd64853971 (patch)
treeaa6d6ef3f1b5492c97c5ef5a3f9db424663df15f /storage
parentecd57816948aeaa143f9686dea971fa01763e2e4 (diff)
Support thread-safe metric updates
Currently only used for code paths touched by Get operations.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.cpp39
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.h2
-rw-r--r--storage/src/vespa/storage/distributor/persistence_operation_metric_set.h20
4 files changed, 45 insertions, 20 deletions
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index c48dad70357..3267d455b45 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -302,7 +302,7 @@ std::shared_ptr<Operation> ExternalOperationHandler::try_generate_get_operation(
*ctx.pending_cluster_state());
} else {
bounce_with_wrong_distribution(*cmd, *snapshot.context().default_active_cluster_state());
- metrics.failures.wrongdistributor.inc(); // TODO thread safety for updates
+ metrics.locked()->failures.wrongdistributor.inc();
}
return std::shared_ptr<Operation>();
}
@@ -366,6 +366,8 @@ bool ExternalOperationHandler::try_handle_message_outside_main_thread(const std:
std::lock_guard g(_non_main_thread_ops_mutex);
// The Get for which this reply was created may have been sent by someone outside
// the ExternalOperationHandler, such as TwoPhaseUpdateOperation. Pass it on if so.
+ // It is undefined which thread actually invokes this, so mutex protection of reply
+ // handling is crucial!
return _non_main_thread_ops_owner.handleReply(std::dynamic_pointer_cast<api::StorageReply>(msg));
}
return false;
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
index 7ff2e298791..ad2e5cf6478 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
@@ -182,31 +182,32 @@ GetOperation::onReceive(DistributorMessageSender& sender, const std::shared_ptr<
}
}
+void GetOperation::update_internal_metrics() {
+ auto metric = _metric.locked();
+ if (_returnCode.success()) {
+ metric->ok.inc();
+ } else if (_returnCode.getResult() == api::ReturnCode::TIMEOUT) {
+ metric->failures.timeout.inc();
+ } else if (_returnCode.isBusy()) {
+ metric->failures.busy.inc();
+ } else if (_returnCode.isNodeDownOrNetwork()) {
+ metric->failures.notconnected.inc();
+ } else {
+ metric->failures.storagefailure.inc();
+ }
+ if (!_doc.get()) {
+ metric->failures.notfound.inc();
+ }
+ metric->latency.addValue(_operationTimer.getElapsedTimeAsDouble());
+}
+
void
GetOperation::sendReply(DistributorMessageSender& sender)
{
if (_msg.get()) {
auto repl = std::make_shared<api::GetReply>(*_msg, _doc, _lastModified);
repl->setResult(_returnCode);
-
- if (_returnCode.success()) {
- _metric.ok.inc();
- } else if (_returnCode.getResult() == api::ReturnCode::TIMEOUT) {
- _metric.failures.timeout.inc();
- } else if (_returnCode.isBusy()) {
- _metric.failures.busy.inc();
- } else if (_returnCode.isNodeDownOrNetwork()) {
- _metric.failures.notconnected.inc();
- } else {
- _metric.failures.storagefailure.inc();
- }
-
- if (!_doc.get()) {
- _metric.failures.notfound.inc();
- }
-
- _metric.latency.addValue(_operationTimer.getElapsedTimeAsDouble());
-
+ update_internal_metrics();
sender.sendReply(repl);
_msg.reset();
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
index fe4dab5e9f2..4f2d7c3d963 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
@@ -99,6 +99,8 @@ private:
* could be found (i.e. all targets have already been sent to).
*/
int findBestUnsentTarget(const GroupVector& candidates) const;
+
+ void update_internal_metrics();
};
}
diff --git a/storage/src/vespa/storage/distributor/persistence_operation_metric_set.h b/storage/src/vespa/storage/distributor/persistence_operation_metric_set.h
index 52249529e4f..d951d7ceba2 100644
--- a/storage/src/vespa/storage/distributor/persistence_operation_metric_set.h
+++ b/storage/src/vespa/storage/distributor/persistence_operation_metric_set.h
@@ -4,6 +4,7 @@
#include <vespa/metrics/metrics.h>
#include <vespa/documentapi/loadtypes/loadtypeset.h>
+#include <mutex>
namespace storage {
@@ -36,6 +37,7 @@ public:
class PersistenceOperationMetricSet : public metrics::MetricSet
{
+ mutable std::mutex _mutex;
public:
metrics::DoubleAverageMetric latency;
metrics::LongCountMetric ok;
@@ -54,6 +56,24 @@ public:
* Does _not_ update latency metric.
*/
void updateFromResult(const api::ReturnCode& result);
+
+ friend class LockWrapper;
+ class LockWrapper {
+ std::unique_lock<std::mutex> _lock;
+ PersistenceOperationMetricSet& _self;
+ public:
+ explicit LockWrapper(PersistenceOperationMetricSet& self)
+ : _lock(self._mutex),
+ _self(self)
+ {}
+
+ PersistenceOperationMetricSet* operator->() noexcept {
+ return &_self;
+ }
+ };
+ LockWrapper locked() noexcept {
+ return LockWrapper(*this);
+ }
};
}