summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahoo-inc.com>2017-08-31 12:06:53 +0000
committerTor Brede Vekterli <vekterli@yahoo-inc.com>2017-08-31 15:22:15 +0000
commitbd2827d46947646a538e256a4e656c8f94438917 (patch)
treedbaad465b045df560188cb94718b54d280dfe9ad /storage
parent239ccaca10f97e9e4038ae48604e248e14074eb3 (diff)
Wire together new listener to propagate errors
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/CMakeLists.txt2
-rw-r--r--storage/src/tests/persistence/provider_error_wrapper_test.cpp (renamed from storage/src/tests/persistence/providershutdownwrappertest.cpp)63
-rw-r--r--storage/src/tests/storageserver/CMakeLists.txt1
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp8
-rw-r--r--storage/src/tests/storageserver/service_layer_error_listener_test.cpp82
-rw-r--r--storage/src/vespa/storage/persistence/CMakeLists.txt2
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp4
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.h7
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.h2
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.cpp (renamed from storage/src/vespa/storage/persistence/providershutdownwrapper.cpp)24
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.h (renamed from storage/src/vespa/storage/persistence/providershutdownwrapper.h)19
-rw-r--r--storage/src/vespa/storage/storageserver/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp11
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h6
-rw-r--r--storage/src/vespa/storage/storageserver/service_layer_error_listener.cpp36
-rw-r--r--storage/src/vespa/storage/storageserver/service_layer_error_listener.h36
-rw-r--r--storage/src/vespa/storage/storageserver/servicelayernode.cpp23
17 files changed, 210 insertions, 117 deletions
diff --git a/storage/src/tests/persistence/CMakeLists.txt b/storage/src/tests/persistence/CMakeLists.txt
index 44aed95ba77..f5715ca3531 100644
--- a/storage/src/tests/persistence/CMakeLists.txt
+++ b/storage/src/tests/persistence/CMakeLists.txt
@@ -6,7 +6,7 @@ vespa_add_library(storage_testpersistence TEST
splitbitdetectortest.cpp
legacyoperationhandlertest.cpp
diskmoveoperationhandlertest.cpp
- providershutdownwrappertest.cpp
+ provider_error_wrapper_test.cpp
mergehandlertest.cpp
persistencethread_splittest.cpp
bucketownershipnotifiertest.cpp
diff --git a/storage/src/tests/persistence/providershutdownwrappertest.cpp b/storage/src/tests/persistence/provider_error_wrapper_test.cpp
index f7998607184..94a2586f0d8 100644
--- a/storage/src/tests/persistence/providershutdownwrappertest.cpp
+++ b/storage/src/tests/persistence/provider_error_wrapper_test.cpp
@@ -6,11 +6,9 @@
namespace storage {
-// TODO rename file
class ProviderErrorWrapperTest : public SingleDiskPersistenceTestUtils {
public:
CPPUNIT_TEST_SUITE(ProviderErrorWrapperTest);
- CPPUNIT_TEST(testShutdownOnFatalError);
CPPUNIT_TEST(fatal_error_invokes_listener);
CPPUNIT_TEST(resource_exhaustion_error_invokes_listener);
CPPUNIT_TEST(listener_not_invoked_on_success);
@@ -18,7 +16,6 @@ public:
CPPUNIT_TEST(multiple_listeners_can_be_registered);
CPPUNIT_TEST_SUITE_END();
- void testShutdownOnFatalError();
void fatal_error_invokes_listener();
void resource_exhaustion_error_invokes_listener();
void listener_not_invoked_on_success();
@@ -30,22 +27,6 @@ CPPUNIT_TEST_SUITE_REGISTRATION(ProviderErrorWrapperTest);
namespace {
-class TestShutdownListener
- : public framework::defaultimplementation::ShutdownListener
-{
-public:
- TestShutdownListener() : _reason() {}
-
- void requestShutdown(vespalib::stringref reason) override {
- _reason = reason;
- }
-
- bool shutdownRequested() const { return !_reason.empty(); }
- const vespalib::string& getReason() const { return _reason; }
-private:
- vespalib::string _reason;
-};
-
struct MockErrorListener : ProviderErrorListener {
void on_fatal_error(vespalib::stringref message) override {
_seen_fatal_error = true;
@@ -73,19 +54,17 @@ struct Fixture {
: providerWrapper(provider),
app(),
component(app.getComponentRegister(), "dummy"),
- errorWrapper(providerWrapper, component)
+ errorWrapper(providerWrapper)
{
providerWrapper.setFailureMask(PersistenceProviderWrapper::FAIL_ALL_OPERATIONS);
}
- ~Fixture();
+ ~Fixture() {}
void perform_spi_operation() {
errorWrapper.getBucketInfo(spi::Bucket(document::BucketId(16, 1234), spi::PartitionId(0)));
}
};
-Fixture::~Fixture() {}
-
}
void ProviderErrorWrapperTest::fatal_error_invokes_listener() {
@@ -157,44 +136,6 @@ void ProviderErrorWrapperTest::multiple_listeners_can_be_registered() {
CPPUNIT_ASSERT(listener2->_seen_resource_exhaustion_error);
}
-
-
-// TODO rewrite, move component interaction testing elsewhere
-void
-ProviderErrorWrapperTest::testShutdownOnFatalError() {
- Fixture f(getPersistenceProvider());
-
- TestShutdownListener shutdownListener;
-
- f.app.getComponentRegister().registerShutdownListener(shutdownListener);
-
- f.providerWrapper.setResult(
- spi::Result(spi::Result::FATAL_ERROR, "eject! eject!"));
- f.providerWrapper.setFailureMask(
- PersistenceProviderWrapper::FAIL_ALL_OPERATIONS);
-
- CPPUNIT_ASSERT(!shutdownListener.shutdownRequested());
- // This should cause the node to implicitly be shut down
- f.errorWrapper.getBucketInfo(
- spi::Bucket(document::BucketId(16, 1234),
- spi::PartitionId(0)));
-
- CPPUNIT_ASSERT(shutdownListener.shutdownRequested());
- CPPUNIT_ASSERT_EQUAL(vespalib::string("eject! eject!"),
- shutdownListener.getReason());
-
- // Triggering a new error should not cause shutdown to be requested twice.
- f.providerWrapper.setResult(
- spi::Result(spi::Result::FATAL_ERROR, "boom!"));
-
- f.errorWrapper.getBucketInfo(
- spi::Bucket(document::BucketId(16, 1234),
- spi::PartitionId(0)));
-
- CPPUNIT_ASSERT_EQUAL(vespalib::string("eject! eject!"),
- shutdownListener.getReason());
-}
-
} // ns storage
diff --git a/storage/src/tests/storageserver/CMakeLists.txt b/storage/src/tests/storageserver/CMakeLists.txt
index eedc29989eb..959261dc8ff 100644
--- a/storage/src/tests/storageserver/CMakeLists.txt
+++ b/storage/src/tests/storageserver/CMakeLists.txt
@@ -11,6 +11,7 @@ vespa_add_library(storage_teststorageserver TEST
priorityconvertertest.cpp
statereportertest.cpp
changedbucketownershiphandlertest.cpp
+ service_layer_error_listener_test.cpp
DEPENDS
storage_storageserver
storage_testcommon
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 5e00ba373d2..29ff780807f 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -1577,7 +1577,10 @@ MergeThrottlerTest::testNewClusterStateAbortsAllOutdatedActiveMerges()
void MergeThrottlerTest::backpressure_busy_bounces_merges_for_configured_duration() {
_servers[0]->getClock().setAbsoluteTimeInSeconds(1000);
- _throttlers[0]->applyTimedBackpressure();
+
+ CPPUNIT_ASSERT(!_throttlers[0]->backpressure_mode_active());
+ _throttlers[0]->apply_timed_backpressure();
+ CPPUNIT_ASSERT(_throttlers[0]->backpressure_mode_active());
document::BucketId bucket(16, 6789);
CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue());
@@ -1596,12 +1599,13 @@ void MergeThrottlerTest::backpressure_busy_bounces_merges_for_configured_duratio
sendMerge(MergeBuilder(bucket).clusterStateVersion(10));
_topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+ CPPUNIT_ASSERT(!_throttlers[0]->backpressure_mode_active());
CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue());
}
void MergeThrottlerTest::source_only_merges_are_not_affected_by_backpressure() {
_servers[2]->getClock().setAbsoluteTimeInSeconds(1000);
- _throttlers[2]->applyTimedBackpressure();
+ _throttlers[2]->apply_timed_backpressure();
document::BucketId bucket(16, 6789);
_topLinks[2]->sendDown(MergeBuilder(bucket).clusterStateVersion(10).chain(0, 1).source_only(2).create());
diff --git a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp
new file mode 100644
index 00000000000..eb39490da08
--- /dev/null
+++ b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp
@@ -0,0 +1,82 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/storage/storageserver/service_layer_error_listener.h>
+#include <vespa/storage/storageserver/mergethrottler.h>
+#include <vespa/storageframework/defaultimplementation/component/componentregisterimpl.h>
+#include <vespa/vdstestlib/cppunit/macros.h>
+#include <vespa/vdstestlib/cppunit/dirconfig.h>
+#include <tests/common/testhelper.h>
+#include <tests/common/teststorageapp.h>
+
+namespace storage {
+
+class ServiceLayerErrorListenerTest : public CppUnit::TestFixture {
+public:
+ CPPUNIT_TEST_SUITE(ServiceLayerErrorListenerTest);
+ CPPUNIT_TEST(shutdown_invoked_on_fatal_error);
+ CPPUNIT_TEST(merge_throttle_backpressure_invoked_on_resource_exhaution_error);
+ CPPUNIT_TEST_SUITE_END();
+
+ void shutdown_invoked_on_fatal_error();
+ void merge_throttle_backpressure_invoked_on_resource_exhaution_error();
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(ServiceLayerErrorListenerTest);
+
+namespace {
+
+class TestShutdownListener
+ : public framework::defaultimplementation::ShutdownListener
+{
+public:
+ TestShutdownListener() : _reason() {}
+
+ void requestShutdown(vespalib::stringref reason) override {
+ _reason = reason;
+ }
+
+ bool shutdown_requested() const { return !_reason.empty(); }
+ const vespalib::string& reason() const { return _reason; }
+private:
+ vespalib::string _reason;
+};
+
+struct Fixture {
+ vdstestlib::DirConfig config{getStandardConfig(true)};
+ TestServiceLayerApp app;
+ ServiceLayerComponent component{app.getComponentRegister(), "dummy"};
+ MergeThrottler merge_throttler{config.getConfigId(), app.getComponentRegister()};
+ TestShutdownListener shutdown_listener;
+ ServiceLayerErrorListener error_listener{component, merge_throttler};
+
+ ~Fixture();
+};
+
+Fixture::~Fixture() {}
+
+}
+
+void ServiceLayerErrorListenerTest::shutdown_invoked_on_fatal_error() {
+ Fixture f;
+
+ f.app.getComponentRegister().registerShutdownListener(f.shutdown_listener);
+ CPPUNIT_ASSERT(!f.shutdown_listener.shutdown_requested());
+
+ f.error_listener.on_fatal_error("eject! eject!");
+ CPPUNIT_ASSERT(f.shutdown_listener.shutdown_requested());
+ CPPUNIT_ASSERT_EQUAL(vespalib::string("eject! eject!"), f.shutdown_listener.reason());
+
+ // Should only be invoked once
+ f.error_listener.on_fatal_error("here be dragons");
+ CPPUNIT_ASSERT_EQUAL(vespalib::string("eject! eject!"), f.shutdown_listener.reason());
+}
+
+void ServiceLayerErrorListenerTest::merge_throttle_backpressure_invoked_on_resource_exhaution_error() {
+ Fixture f;
+
+ CPPUNIT_ASSERT(!f.merge_throttler.backpressure_mode_active());
+ f.error_listener.on_resource_exhaustion_error("buy more RAM!");
+ CPPUNIT_ASSERT(f.merge_throttler.backpressure_mode_active());
+}
+
+}
diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt
index 42ee585988f..1f2299bf46d 100644
--- a/storage/src/vespa/storage/persistence/CMakeLists.txt
+++ b/storage/src/vespa/storage/persistence/CMakeLists.txt
@@ -9,7 +9,7 @@ vespa_add_library(storage_spersistence OBJECT
types.cpp
mergehandler.cpp
bucketprocessor.cpp
- providershutdownwrapper.cpp
+ provider_error_wrapper.cpp
bucketownershipnotifier.cpp
fieldvisitor.cpp
testandsethelper.cpp
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 1e74c957b01..b46217f6443 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -37,9 +37,9 @@ FileStorManager(const config::ConfigUri & configUri,
_component(compReg, "filestormanager"),
_partitions(partitions),
_providerCore(provider),
- _providerShutdown(_providerCore, _component),
+ _providerErrorWrapper(_providerCore),
_nodeUpInLastNodeStateSeenByProvider(false),
- _providerMetric(new spi::MetricPersistenceProvider(_providerShutdown)),
+ _providerMetric(new spi::MetricPersistenceProvider(_providerErrorWrapper)),
_provider(_providerMetric.get()),
_bucketIdFactory(_component.getBucketIdFactory()),
_configUri(configUri),
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
index 233ce5edf65..05b9b9bc430 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
@@ -23,7 +23,7 @@
#include <vespa/config-stor-filestor.h>
#include <vespa/storage/persistence/diskthread.h>
-#include <vespa/storage/persistence/providershutdownwrapper.h>
+#include <vespa/storage/persistence/provider_error_wrapper.h>
#include <vespa/storage/common/nodestateupdater.h>
#include <vespa/storageframework/generic/status/htmlstatusreporter.h>
@@ -55,7 +55,7 @@ class FileStorManager : public StorageLinkQueued,
ServiceLayerComponent _component;
const spi::PartitionStateList& _partitions;
spi::PersistenceProvider& _providerCore;
- ProviderErrorWrapper _providerShutdown;
+ ProviderErrorWrapper _providerErrorWrapper;
bool _nodeUpInLastNodeStateSeenByProvider;
spi::MetricPersistenceProvider::UP _providerMetric;
spi::PersistenceProvider* _provider;
@@ -118,6 +118,9 @@ public:
spi::PersistenceProvider& getPersistenceProvider() {
return *_provider;
}
+ ProviderErrorWrapper& error_wrapper() noexcept {
+ return _providerErrorWrapper;
+ }
void handleNewState() override;
diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h
index f545d673032..218d2f7dd23 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.h
+++ b/storage/src/vespa/storage/persistence/persistencethread.h
@@ -7,7 +7,7 @@
#include "mergehandler.h"
#include "diskmoveoperationhandler.h"
#include "persistenceutil.h"
-#include "providershutdownwrapper.h"
+#include "provider_error_wrapper.h"
#include <vespa/storage/common/storagecomponent.h>
#include <vespa/storage/common/statusmessages.h>
diff --git a/storage/src/vespa/storage/persistence/providershutdownwrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
index 521e1928e23..a7ba13de6fc 100644
--- a/storage/src/vespa/storage/persistence/providershutdownwrapper.cpp
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
@@ -1,6 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "providershutdownwrapper.h"
+#include "provider_error_wrapper.h"
#include "persistenceutil.h"
#include <vespa/log/log.h>
@@ -20,28 +20,10 @@ ProviderErrorWrapper::checkResult(ResultType&& result) const
return std::forward<ResultType>(result);
}
-// TODO move this out as error listener instead
void ProviderErrorWrapper::trigger_shutdown_listeners_once(vespalib::stringref reason) const {
std::lock_guard<std::mutex> guard(_mutex);
- // TODO decide if this behavior even belongs here. Move to dedicated listener?
- if (_shutdownTriggered) {
- LOG(debug,
- "Received FATAL_ERROR from persistence provider: %s. "
- "Node has already been instructed to shut down so "
- "not doing anything now.",
- reason.c_str());
- } else {
- for (auto& listener : _listeners) {
- listener->on_fatal_error(reason);
- }
- // TODO move out
- LOG(info,
- "Received FATAL_ERROR from persistence provider, "
- "shutting down node: %s",
- reason.c_str());
- const_cast<ProviderErrorWrapper*>(this)->
- _component.requestShutdown(reason);
- _shutdownTriggered = true;
+ for (auto& listener : _listeners) {
+ listener->on_fatal_error(reason);
}
}
diff --git a/storage/src/vespa/storage/persistence/providershutdownwrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
index 41ef265ff3e..96f14b71908 100644
--- a/storage/src/vespa/storage/persistence/providershutdownwrapper.h
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
@@ -6,12 +6,8 @@
* provider implementation, transparently checking the result of each
* operation to see if the result is FATAL_ERROR or RESOURCE_EXHAUSTED.
*
- * If FATAL_ERROR is received, the wrapper transparently initiates a
- * shutdown of the process (but still returns the response up to the caller
- * as if it were just a non-wrapped call). FIXME update comment!
- *
- * If RESOURCE_EXHAUSTED is received, the wrapper will invoke any and all
- * resource exhaustion listeners synchronously, before returning the response
+ * If FATAL_ERROR or RESOURCE_EXHAUSTED is observed, the wrapper will invoke any
+ * and all resource exhaustion listeners synchronously, before returning the response
* to the caller as usual.
*/
#pragma once
@@ -39,16 +35,11 @@ public:
}
};
-// TODO rename file once name is settled!
-
class ProviderErrorWrapper : public spi::PersistenceProvider {
public:
- ProviderErrorWrapper(spi::PersistenceProvider& impl,
- ServiceLayerComponent& component)
+ explicit ProviderErrorWrapper(spi::PersistenceProvider& impl)
: _impl(impl),
- _component(component),
- _mutex(),
- _shutdownTriggered(false)
+ _mutex()
{
}
@@ -98,10 +89,8 @@ private:
void trigger_resource_exhaustion_listeners(vespalib::stringref reason) const;
spi::PersistenceProvider& _impl;
- ServiceLayerComponent& _component;
std::vector<std::shared_ptr<ProviderErrorListener>> _listeners;
mutable std::mutex _mutex;
- mutable bool _shutdownTriggered;
};
} // storage
diff --git a/storage/src/vespa/storage/storageserver/CMakeLists.txt b/storage/src/vespa/storage/storageserver/CMakeLists.txt
index 4b8905eb6d7..2fd3fe43a57 100644
--- a/storage/src/vespa/storage/storageserver/CMakeLists.txt
+++ b/storage/src/vespa/storage/storageserver/CMakeLists.txt
@@ -23,6 +23,7 @@ vespa_add_library(storage_storageserver
statereporter.cpp
storagemetricsset.cpp
changedbucketownershiphandler.cpp
+ service_layer_error_listener.cpp
INSTALL lib64
DEPENDS
storage
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index cd3028dc6ba..f40fede6e8f 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -677,7 +677,7 @@ bool MergeThrottler::merge_has_this_node_as_source_only_node(const api::MergeBuc
void MergeThrottler::bounce_backpressure_throttled_merge(const api::MergeBucketCommand& cmd, MessageGuard& guard) {
sendReply(cmd, api::ReturnCode(api::ReturnCode::BUSY,
- "Node is throttling merges due to resource exhaustion back-pressure"),
+ "Node is throttling merges due to resource exhaustion"),
guard, _metrics->local);
_metrics->bounced_due_to_back_pressure.inc();
}
@@ -1233,10 +1233,15 @@ MergeThrottler::markActiveMergesAsAborted(uint32_t minimumStateVersion)
}
}
-void MergeThrottler::applyTimedBackpressure() {
+void MergeThrottler::apply_timed_backpressure() {
vespalib::LockGuard lock(_stateLock);
_throttle_until_time = _component.getClock().getMonotonicTime() + _backpressure_duration;
- // TODO decide if we should abort active merges
+ // TODO add exponential backoff if we deem it necessary
+}
+
+bool MergeThrottler::backpressure_mode_active() const {
+ vespalib::LockGuard lock(_stateLock);
+ return (_component.getClock().getMonotonicTime() < _throttle_until_time);
}
void
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index 67bb01dd8fa..d4eb4000f15 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -190,9 +190,11 @@ public:
* When invoked, merges to the node will be BUSY-bounced by the throttler
* for a configurable period of time instead of being processed.
*
- * Must not be called if _stateLock is already held, or deadlock will occur.
+ * Thread safe, but must not be called if _stateLock is already held, or
+ * deadlock will occur.
*/
- void applyTimedBackpressure();
+ void apply_timed_backpressure();
+ bool backpressure_mode_active() const;
// For unit testing only
const ActiveMergeMap& getActiveMerges() const { return _merges; }
diff --git a/storage/src/vespa/storage/storageserver/service_layer_error_listener.cpp b/storage/src/vespa/storage/storageserver/service_layer_error_listener.cpp
new file mode 100644
index 00000000000..41177fe46b8
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/service_layer_error_listener.cpp
@@ -0,0 +1,36 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "service_layer_error_listener.h"
+#include <vespa/storage/common/storagecomponent.h>
+#include <vespa/storage/storageserver/mergethrottler.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".node.errorlistener");
+
+namespace storage {
+
+void ServiceLayerErrorListener::on_fatal_error(vespalib::stringref message) {
+ bool expected = false;
+ if (_shutdown_initiated.compare_exchange_strong(expected, true)) {
+ LOG(info,
+ "Received FATAL_ERROR from persistence provider, "
+ "shutting down node: %s",
+ message.c_str());
+ _component.requestShutdown(message); // Thread safe
+ } else {
+ LOG(debug,
+ "Received FATAL_ERROR from persistence provider: %s. "
+ "Node has already been instructed to shut down so "
+ "not doing anything now.",
+ message.c_str());
+ }
+}
+
+void ServiceLayerErrorListener::on_resource_exhaustion_error(vespalib::stringref message) {
+ LOG(debug, "SPI reports resource exhaustion ('%s'). "
+ "Applying back-pressure to merge throttler",
+ message.c_str());
+ _merge_throttler.apply_timed_backpressure(); // Thread safe
+}
+
+}
diff --git a/storage/src/vespa/storage/storageserver/service_layer_error_listener.h b/storage/src/vespa/storage/storageserver/service_layer_error_listener.h
new file mode 100644
index 00000000000..6995459e333
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/service_layer_error_listener.h
@@ -0,0 +1,36 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/storage/persistence/provider_error_wrapper.h>
+#include <atomic>
+
+namespace storage {
+
+class StorageComponent;
+class MergeThrottler;
+
+/*
+ * Listener implementation for SPI errors that require action beyond simply
+ * responding to the command that generated them.
+ *
+ * - Fatal errors will trigger a process shutdown.
+ * - Resource exhaustion errors will trigger merge back-pressure.
+ */
+class ServiceLayerErrorListener : public ProviderErrorListener {
+ StorageComponent& _component;
+ MergeThrottler& _merge_throttler;
+ std::atomic<bool> _shutdown_initiated;
+public:
+ ServiceLayerErrorListener(StorageComponent& component,
+ MergeThrottler& merge_throttler)
+ : _component(component),
+ _merge_throttler(merge_throttler),
+ _shutdown_initiated(false)
+ {}
+
+ void on_fatal_error(vespalib::stringref message) override;
+ void on_resource_exhaustion_error(vespalib::stringref message) override;
+};
+
+}
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
index 4f87b9af6f2..bd4d6bd5200 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
@@ -9,6 +9,7 @@
#include "opslogger.h"
#include "statemanager.h"
#include "priorityconverter.h"
+#include "service_layer_error_listener.h"
#include <vespa/storage/visiting/messagebusvisitormessagesession.h>
#include <vespa/storage/visiting/visitormanager.h>
#include <vespa/storage/bucketdb/bucketmanager.h>
@@ -16,6 +17,7 @@
#include <vespa/storage/bucketmover/bucketmover.h>
#include <vespa/storage/persistence/filestorage/filestormanager.h>
#include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h>
+#include <vespa/storage/persistence/provider_error_wrapper.h>
#include <vespa/persistence/spi/exceptions.h>
#include <vespa/messagebus/rpcmessagebus.h>
@@ -35,7 +37,7 @@ ServiceLayerNode::ServiceLayerNode(
_persistenceProvider(persistenceProvider),
_partitions(0),
_externalVisitors(externalVisitors),
- _fileStorManager(0),
+ _fileStorManager(nullptr),
_init_has_been_called(false),
_noUsablePartitionMode(false)
{
@@ -112,7 +114,7 @@ void
ServiceLayerNode::removeConfigSubscriptions()
{
StorageNode::removeConfigSubscriptions();
- _configFetcher.reset(0);
+ _configFetcher.reset();
}
void
@@ -166,7 +168,7 @@ ServiceLayerNode::initializeNodeSpecific()
void
ServiceLayerNode::handleLiveConfigUpdate()
{
- if (_newServerConfig.get() != 0) {
+ if (_newServerConfig) {
bool updated = false;
vespa::config::content::core::StorServerConfigBuilder oldC(*_serverConfig);
vespa::config::content::core::StorServerConfig& newC(*_newServerConfig);
@@ -218,9 +220,11 @@ ServiceLayerNode::configure(
// updates
{
vespalib::LockGuard configLockGuard(_configLock);
- _newDevicesConfig.reset(config.release());
+ _newDevicesConfig = std::move(config);
+ }
+ if (_distributionConfig) {
+ handleLiveConfigUpdate();
}
- if (_distributionConfig.get() != 0) handleLiveConfigUpdate();
}
VisitorMessageSession::UP
@@ -258,8 +262,9 @@ ServiceLayerNode::createChain()
chain->push_back(StorageLink::UP(releaseStateManager().release()));
return chain;
}
+ MergeThrottler* merge_throttler;
chain->push_back(StorageLink::UP(new OpsLogger(compReg, _configUri)));
- chain->push_back(StorageLink::UP(new MergeThrottler(_configUri, compReg)));
+ chain->push_back(StorageLink::UP(merge_throttler = new MergeThrottler(_configUri, compReg)));
chain->push_back(StorageLink::UP(new ChangedBucketOwnershipHandler(_configUri, compReg)));
chain->push_back(StorageLink::UP(new BucketIntegrityChecker(_configUri, compReg)));
chain->push_back(StorageLink::UP(new bucketmover::BucketMover(_configUri, compReg)));
@@ -277,6 +282,12 @@ ServiceLayerNode::createChain()
_configUri, _partitions, _persistenceProvider,
_context.getComponentRegister())));
chain->push_back(StorageLink::UP(releaseStateManager().release()));
+
+ // Lifetimes of all referenced components shall outlive the last call going
+ // through the SPI, as queues are flushed and worker threads joined when
+ // the storage link chain is closed prior to destruction.
+ auto error_listener = std::make_shared<ServiceLayerErrorListener>(*_component, *merge_throttler);
+ _fileStorManager->error_wrapper().register_error_listener(std::move(error_listener));
return chain;
}