diff options
author | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-08-31 12:06:53 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-08-31 15:22:15 +0000 |
commit | bd2827d46947646a538e256a4e656c8f94438917 (patch) | |
tree | dbaad465b045df560188cb94718b54d280dfe9ad /storage | |
parent | 239ccaca10f97e9e4038ae48604e248e14074eb3 (diff) |
Wire together new listener to propagate errors
Diffstat (limited to 'storage')
17 files changed, 210 insertions, 117 deletions
diff --git a/storage/src/tests/persistence/CMakeLists.txt b/storage/src/tests/persistence/CMakeLists.txt index 44aed95ba77..f5715ca3531 100644 --- a/storage/src/tests/persistence/CMakeLists.txt +++ b/storage/src/tests/persistence/CMakeLists.txt @@ -6,7 +6,7 @@ vespa_add_library(storage_testpersistence TEST splitbitdetectortest.cpp legacyoperationhandlertest.cpp diskmoveoperationhandlertest.cpp - providershutdownwrappertest.cpp + provider_error_wrapper_test.cpp mergehandlertest.cpp persistencethread_splittest.cpp bucketownershipnotifiertest.cpp diff --git a/storage/src/tests/persistence/providershutdownwrappertest.cpp b/storage/src/tests/persistence/provider_error_wrapper_test.cpp index f7998607184..94a2586f0d8 100644 --- a/storage/src/tests/persistence/providershutdownwrappertest.cpp +++ b/storage/src/tests/persistence/provider_error_wrapper_test.cpp @@ -6,11 +6,9 @@ namespace storage { -// TODO rename file class ProviderErrorWrapperTest : public SingleDiskPersistenceTestUtils { public: CPPUNIT_TEST_SUITE(ProviderErrorWrapperTest); - CPPUNIT_TEST(testShutdownOnFatalError); CPPUNIT_TEST(fatal_error_invokes_listener); CPPUNIT_TEST(resource_exhaustion_error_invokes_listener); CPPUNIT_TEST(listener_not_invoked_on_success); @@ -18,7 +16,6 @@ public: CPPUNIT_TEST(multiple_listeners_can_be_registered); CPPUNIT_TEST_SUITE_END(); - void testShutdownOnFatalError(); void fatal_error_invokes_listener(); void resource_exhaustion_error_invokes_listener(); void listener_not_invoked_on_success(); @@ -30,22 +27,6 @@ CPPUNIT_TEST_SUITE_REGISTRATION(ProviderErrorWrapperTest); namespace { -class TestShutdownListener - : public framework::defaultimplementation::ShutdownListener -{ -public: - TestShutdownListener() : _reason() {} - - void requestShutdown(vespalib::stringref reason) override { - _reason = reason; - } - - bool shutdownRequested() const { return !_reason.empty(); } - const vespalib::string& getReason() const { return _reason; } -private: - vespalib::string _reason; -}; - struct MockErrorListener : ProviderErrorListener { void on_fatal_error(vespalib::stringref message) override { _seen_fatal_error = true; @@ -73,19 +54,17 @@ struct Fixture { : providerWrapper(provider), app(), component(app.getComponentRegister(), "dummy"), - errorWrapper(providerWrapper, component) + errorWrapper(providerWrapper) { providerWrapper.setFailureMask(PersistenceProviderWrapper::FAIL_ALL_OPERATIONS); } - ~Fixture(); + ~Fixture() {} void perform_spi_operation() { errorWrapper.getBucketInfo(spi::Bucket(document::BucketId(16, 1234), spi::PartitionId(0))); } }; -Fixture::~Fixture() {} - } void ProviderErrorWrapperTest::fatal_error_invokes_listener() { @@ -157,44 +136,6 @@ void ProviderErrorWrapperTest::multiple_listeners_can_be_registered() { CPPUNIT_ASSERT(listener2->_seen_resource_exhaustion_error); } - - -// TODO rewrite, move component interaction testing elsewhere -void -ProviderErrorWrapperTest::testShutdownOnFatalError() { - Fixture f(getPersistenceProvider()); - - TestShutdownListener shutdownListener; - - f.app.getComponentRegister().registerShutdownListener(shutdownListener); - - f.providerWrapper.setResult( - spi::Result(spi::Result::FATAL_ERROR, "eject! eject!")); - f.providerWrapper.setFailureMask( - PersistenceProviderWrapper::FAIL_ALL_OPERATIONS); - - CPPUNIT_ASSERT(!shutdownListener.shutdownRequested()); - // This should cause the node to implicitly be shut down - f.errorWrapper.getBucketInfo( - spi::Bucket(document::BucketId(16, 1234), - spi::PartitionId(0))); - - CPPUNIT_ASSERT(shutdownListener.shutdownRequested()); - CPPUNIT_ASSERT_EQUAL(vespalib::string("eject! eject!"), - shutdownListener.getReason()); - - // Triggering a new error should not cause shutdown to be requested twice. - f.providerWrapper.setResult( - spi::Result(spi::Result::FATAL_ERROR, "boom!")); - - f.errorWrapper.getBucketInfo( - spi::Bucket(document::BucketId(16, 1234), - spi::PartitionId(0))); - - CPPUNIT_ASSERT_EQUAL(vespalib::string("eject! eject!"), - shutdownListener.getReason()); -} - } // ns storage diff --git a/storage/src/tests/storageserver/CMakeLists.txt b/storage/src/tests/storageserver/CMakeLists.txt index eedc29989eb..959261dc8ff 100644 --- a/storage/src/tests/storageserver/CMakeLists.txt +++ b/storage/src/tests/storageserver/CMakeLists.txt @@ -11,6 +11,7 @@ vespa_add_library(storage_teststorageserver TEST priorityconvertertest.cpp statereportertest.cpp changedbucketownershiphandlertest.cpp + service_layer_error_listener_test.cpp DEPENDS storage_storageserver storage_testcommon diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index 5e00ba373d2..29ff780807f 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -1577,7 +1577,10 @@ MergeThrottlerTest::testNewClusterStateAbortsAllOutdatedActiveMerges() void MergeThrottlerTest::backpressure_busy_bounces_merges_for_configured_duration() { _servers[0]->getClock().setAbsoluteTimeInSeconds(1000); - _throttlers[0]->applyTimedBackpressure(); + + CPPUNIT_ASSERT(!_throttlers[0]->backpressure_mode_active()); + _throttlers[0]->apply_timed_backpressure(); + CPPUNIT_ASSERT(_throttlers[0]->backpressure_mode_active()); document::BucketId bucket(16, 6789); CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue()); @@ -1596,12 +1599,13 @@ void MergeThrottlerTest::backpressure_busy_bounces_merges_for_configured_duratio sendMerge(MergeBuilder(bucket).clusterStateVersion(10)); _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + CPPUNIT_ASSERT(!_throttlers[0]->backpressure_mode_active()); CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue()); } void MergeThrottlerTest::source_only_merges_are_not_affected_by_backpressure() { _servers[2]->getClock().setAbsoluteTimeInSeconds(1000); - _throttlers[2]->applyTimedBackpressure(); + _throttlers[2]->apply_timed_backpressure(); document::BucketId bucket(16, 6789); _topLinks[2]->sendDown(MergeBuilder(bucket).clusterStateVersion(10).chain(0, 1).source_only(2).create()); diff --git a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp new file mode 100644 index 00000000000..eb39490da08 --- /dev/null +++ b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp @@ -0,0 +1,82 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/storage/storageserver/service_layer_error_listener.h> +#include <vespa/storage/storageserver/mergethrottler.h> +#include <vespa/storageframework/defaultimplementation/component/componentregisterimpl.h> +#include <vespa/vdstestlib/cppunit/macros.h> +#include <vespa/vdstestlib/cppunit/dirconfig.h> +#include <tests/common/testhelper.h> +#include <tests/common/teststorageapp.h> + +namespace storage { + +class ServiceLayerErrorListenerTest : public CppUnit::TestFixture { +public: + CPPUNIT_TEST_SUITE(ServiceLayerErrorListenerTest); + CPPUNIT_TEST(shutdown_invoked_on_fatal_error); + CPPUNIT_TEST(merge_throttle_backpressure_invoked_on_resource_exhaution_error); + CPPUNIT_TEST_SUITE_END(); + + void shutdown_invoked_on_fatal_error(); + void merge_throttle_backpressure_invoked_on_resource_exhaution_error(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(ServiceLayerErrorListenerTest); + +namespace { + +class TestShutdownListener + : public framework::defaultimplementation::ShutdownListener +{ +public: + TestShutdownListener() : _reason() {} + + void requestShutdown(vespalib::stringref reason) override { + _reason = reason; + } + + bool shutdown_requested() const { return !_reason.empty(); } + const vespalib::string& reason() const { return _reason; } +private: + vespalib::string _reason; +}; + +struct Fixture { + vdstestlib::DirConfig config{getStandardConfig(true)}; + TestServiceLayerApp app; + ServiceLayerComponent component{app.getComponentRegister(), "dummy"}; + MergeThrottler merge_throttler{config.getConfigId(), app.getComponentRegister()}; + TestShutdownListener shutdown_listener; + ServiceLayerErrorListener error_listener{component, merge_throttler}; + + ~Fixture(); +}; + +Fixture::~Fixture() {} + +} + +void ServiceLayerErrorListenerTest::shutdown_invoked_on_fatal_error() { + Fixture f; + + f.app.getComponentRegister().registerShutdownListener(f.shutdown_listener); + CPPUNIT_ASSERT(!f.shutdown_listener.shutdown_requested()); + + f.error_listener.on_fatal_error("eject! eject!"); + CPPUNIT_ASSERT(f.shutdown_listener.shutdown_requested()); + CPPUNIT_ASSERT_EQUAL(vespalib::string("eject! eject!"), f.shutdown_listener.reason()); + + // Should only be invoked once + f.error_listener.on_fatal_error("here be dragons"); + CPPUNIT_ASSERT_EQUAL(vespalib::string("eject! eject!"), f.shutdown_listener.reason()); +} + +void ServiceLayerErrorListenerTest::merge_throttle_backpressure_invoked_on_resource_exhaution_error() { + Fixture f; + + CPPUNIT_ASSERT(!f.merge_throttler.backpressure_mode_active()); + f.error_listener.on_resource_exhaustion_error("buy more RAM!"); + CPPUNIT_ASSERT(f.merge_throttler.backpressure_mode_active()); +} + +} diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt index 42ee585988f..1f2299bf46d 100644 --- a/storage/src/vespa/storage/persistence/CMakeLists.txt +++ b/storage/src/vespa/storage/persistence/CMakeLists.txt @@ -9,7 +9,7 @@ vespa_add_library(storage_spersistence OBJECT types.cpp mergehandler.cpp bucketprocessor.cpp - providershutdownwrapper.cpp + provider_error_wrapper.cpp bucketownershipnotifier.cpp fieldvisitor.cpp testandsethelper.cpp diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 1e74c957b01..b46217f6443 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -37,9 +37,9 @@ FileStorManager(const config::ConfigUri & configUri, _component(compReg, "filestormanager"), _partitions(partitions), _providerCore(provider), - _providerShutdown(_providerCore, _component), + _providerErrorWrapper(_providerCore), _nodeUpInLastNodeStateSeenByProvider(false), - _providerMetric(new spi::MetricPersistenceProvider(_providerShutdown)), + _providerMetric(new spi::MetricPersistenceProvider(_providerErrorWrapper)), _provider(_providerMetric.get()), _bucketIdFactory(_component.getBucketIdFactory()), _configUri(configUri), diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h index 233ce5edf65..05b9b9bc430 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h @@ -23,7 +23,7 @@ #include <vespa/config-stor-filestor.h> #include <vespa/storage/persistence/diskthread.h> -#include <vespa/storage/persistence/providershutdownwrapper.h> +#include <vespa/storage/persistence/provider_error_wrapper.h> #include <vespa/storage/common/nodestateupdater.h> #include <vespa/storageframework/generic/status/htmlstatusreporter.h> @@ -55,7 +55,7 @@ class FileStorManager : public StorageLinkQueued, ServiceLayerComponent _component; const spi::PartitionStateList& _partitions; spi::PersistenceProvider& _providerCore; - ProviderErrorWrapper _providerShutdown; + ProviderErrorWrapper _providerErrorWrapper; bool _nodeUpInLastNodeStateSeenByProvider; spi::MetricPersistenceProvider::UP _providerMetric; spi::PersistenceProvider* _provider; @@ -118,6 +118,9 @@ public: spi::PersistenceProvider& getPersistenceProvider() { return *_provider; } + ProviderErrorWrapper& error_wrapper() noexcept { + return _providerErrorWrapper; + } void handleNewState() override; diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h index f545d673032..218d2f7dd23 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.h +++ b/storage/src/vespa/storage/persistence/persistencethread.h @@ -7,7 +7,7 @@ #include "mergehandler.h" #include "diskmoveoperationhandler.h" #include "persistenceutil.h" -#include "providershutdownwrapper.h" +#include "provider_error_wrapper.h" #include <vespa/storage/common/storagecomponent.h> #include <vespa/storage/common/statusmessages.h> diff --git a/storage/src/vespa/storage/persistence/providershutdownwrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp index 521e1928e23..a7ba13de6fc 100644 --- a/storage/src/vespa/storage/persistence/providershutdownwrapper.cpp +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp @@ -1,6 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "providershutdownwrapper.h" +#include "provider_error_wrapper.h" #include "persistenceutil.h" #include <vespa/log/log.h> @@ -20,28 +20,10 @@ ProviderErrorWrapper::checkResult(ResultType&& result) const return std::forward<ResultType>(result); } -// TODO move this out as error listener instead void ProviderErrorWrapper::trigger_shutdown_listeners_once(vespalib::stringref reason) const { std::lock_guard<std::mutex> guard(_mutex); - // TODO decide if this behavior even belongs here. Move to dedicated listener? - if (_shutdownTriggered) { - LOG(debug, - "Received FATAL_ERROR from persistence provider: %s. " - "Node has already been instructed to shut down so " - "not doing anything now.", - reason.c_str()); - } else { - for (auto& listener : _listeners) { - listener->on_fatal_error(reason); - } - // TODO move out - LOG(info, - "Received FATAL_ERROR from persistence provider, " - "shutting down node: %s", - reason.c_str()); - const_cast<ProviderErrorWrapper*>(this)-> - _component.requestShutdown(reason); - _shutdownTriggered = true; + for (auto& listener : _listeners) { + listener->on_fatal_error(reason); } } diff --git a/storage/src/vespa/storage/persistence/providershutdownwrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index 41ef265ff3e..96f14b71908 100644 --- a/storage/src/vespa/storage/persistence/providershutdownwrapper.h +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -6,12 +6,8 @@ * provider implementation, transparently checking the result of each * operation to see if the result is FATAL_ERROR or RESOURCE_EXHAUSTED. * - * If FATAL_ERROR is received, the wrapper transparently initiates a - * shutdown of the process (but still returns the response up to the caller - * as if it were just a non-wrapped call). FIXME update comment! - * - * If RESOURCE_EXHAUSTED is received, the wrapper will invoke any and all - * resource exhaustion listeners synchronously, before returning the response + * If FATAL_ERROR or RESOURCE_EXHAUSTED is observed, the wrapper will invoke any + * and all resource exhaustion listeners synchronously, before returning the response * to the caller as usual. */ #pragma once @@ -39,16 +35,11 @@ public: } }; -// TODO rename file once name is settled! - class ProviderErrorWrapper : public spi::PersistenceProvider { public: - ProviderErrorWrapper(spi::PersistenceProvider& impl, - ServiceLayerComponent& component) + explicit ProviderErrorWrapper(spi::PersistenceProvider& impl) : _impl(impl), - _component(component), - _mutex(), - _shutdownTriggered(false) + _mutex() { } @@ -98,10 +89,8 @@ private: void trigger_resource_exhaustion_listeners(vespalib::stringref reason) const; spi::PersistenceProvider& _impl; - ServiceLayerComponent& _component; std::vector<std::shared_ptr<ProviderErrorListener>> _listeners; mutable std::mutex _mutex; - mutable bool _shutdownTriggered; }; } // storage diff --git a/storage/src/vespa/storage/storageserver/CMakeLists.txt b/storage/src/vespa/storage/storageserver/CMakeLists.txt index 4b8905eb6d7..2fd3fe43a57 100644 --- a/storage/src/vespa/storage/storageserver/CMakeLists.txt +++ b/storage/src/vespa/storage/storageserver/CMakeLists.txt @@ -23,6 +23,7 @@ vespa_add_library(storage_storageserver statereporter.cpp storagemetricsset.cpp changedbucketownershiphandler.cpp + service_layer_error_listener.cpp INSTALL lib64 DEPENDS storage diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index cd3028dc6ba..f40fede6e8f 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -677,7 +677,7 @@ bool MergeThrottler::merge_has_this_node_as_source_only_node(const api::MergeBuc void MergeThrottler::bounce_backpressure_throttled_merge(const api::MergeBucketCommand& cmd, MessageGuard& guard) { sendReply(cmd, api::ReturnCode(api::ReturnCode::BUSY, - "Node is throttling merges due to resource exhaustion back-pressure"), + "Node is throttling merges due to resource exhaustion"), guard, _metrics->local); _metrics->bounced_due_to_back_pressure.inc(); } @@ -1233,10 +1233,15 @@ MergeThrottler::markActiveMergesAsAborted(uint32_t minimumStateVersion) } } -void MergeThrottler::applyTimedBackpressure() { +void MergeThrottler::apply_timed_backpressure() { vespalib::LockGuard lock(_stateLock); _throttle_until_time = _component.getClock().getMonotonicTime() + _backpressure_duration; - // TODO decide if we should abort active merges + // TODO add exponential backoff if we deem it necessary +} + +bool MergeThrottler::backpressure_mode_active() const { + vespalib::LockGuard lock(_stateLock); + return (_component.getClock().getMonotonicTime() < _throttle_until_time); } void diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index 67bb01dd8fa..d4eb4000f15 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -190,9 +190,11 @@ public: * When invoked, merges to the node will be BUSY-bounced by the throttler * for a configurable period of time instead of being processed. * - * Must not be called if _stateLock is already held, or deadlock will occur. + * Thread safe, but must not be called if _stateLock is already held, or + * deadlock will occur. */ - void applyTimedBackpressure(); + void apply_timed_backpressure(); + bool backpressure_mode_active() const; // For unit testing only const ActiveMergeMap& getActiveMerges() const { return _merges; } diff --git a/storage/src/vespa/storage/storageserver/service_layer_error_listener.cpp b/storage/src/vespa/storage/storageserver/service_layer_error_listener.cpp new file mode 100644 index 00000000000..41177fe46b8 --- /dev/null +++ b/storage/src/vespa/storage/storageserver/service_layer_error_listener.cpp @@ -0,0 +1,36 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "service_layer_error_listener.h" +#include <vespa/storage/common/storagecomponent.h> +#include <vespa/storage/storageserver/mergethrottler.h> + +#include <vespa/log/log.h> +LOG_SETUP(".node.errorlistener"); + +namespace storage { + +void ServiceLayerErrorListener::on_fatal_error(vespalib::stringref message) { + bool expected = false; + if (_shutdown_initiated.compare_exchange_strong(expected, true)) { + LOG(info, + "Received FATAL_ERROR from persistence provider, " + "shutting down node: %s", + message.c_str()); + _component.requestShutdown(message); // Thread safe + } else { + LOG(debug, + "Received FATAL_ERROR from persistence provider: %s. " + "Node has already been instructed to shut down so " + "not doing anything now.", + message.c_str()); + } +} + +void ServiceLayerErrorListener::on_resource_exhaustion_error(vespalib::stringref message) { + LOG(debug, "SPI reports resource exhaustion ('%s'). " + "Applying back-pressure to merge throttler", + message.c_str()); + _merge_throttler.apply_timed_backpressure(); // Thread safe +} + +} diff --git a/storage/src/vespa/storage/storageserver/service_layer_error_listener.h b/storage/src/vespa/storage/storageserver/service_layer_error_listener.h new file mode 100644 index 00000000000..6995459e333 --- /dev/null +++ b/storage/src/vespa/storage/storageserver/service_layer_error_listener.h @@ -0,0 +1,36 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/storage/persistence/provider_error_wrapper.h> +#include <atomic> + +namespace storage { + +class StorageComponent; +class MergeThrottler; + +/* + * Listener implementation for SPI errors that require action beyond simply + * responding to the command that generated them. + * + * - Fatal errors will trigger a process shutdown. + * - Resource exhaustion errors will trigger merge back-pressure. + */ +class ServiceLayerErrorListener : public ProviderErrorListener { + StorageComponent& _component; + MergeThrottler& _merge_throttler; + std::atomic<bool> _shutdown_initiated; +public: + ServiceLayerErrorListener(StorageComponent& component, + MergeThrottler& merge_throttler) + : _component(component), + _merge_throttler(merge_throttler), + _shutdown_initiated(false) + {} + + void on_fatal_error(vespalib::stringref message) override; + void on_resource_exhaustion_error(vespalib::stringref message) override; +}; + +} diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp index 4f87b9af6f2..bd4d6bd5200 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp +++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp @@ -9,6 +9,7 @@ #include "opslogger.h" #include "statemanager.h" #include "priorityconverter.h" +#include "service_layer_error_listener.h" #include <vespa/storage/visiting/messagebusvisitormessagesession.h> #include <vespa/storage/visiting/visitormanager.h> #include <vespa/storage/bucketdb/bucketmanager.h> @@ -16,6 +17,7 @@ #include <vespa/storage/bucketmover/bucketmover.h> #include <vespa/storage/persistence/filestorage/filestormanager.h> #include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h> +#include <vespa/storage/persistence/provider_error_wrapper.h> #include <vespa/persistence/spi/exceptions.h> #include <vespa/messagebus/rpcmessagebus.h> @@ -35,7 +37,7 @@ ServiceLayerNode::ServiceLayerNode( _persistenceProvider(persistenceProvider), _partitions(0), _externalVisitors(externalVisitors), - _fileStorManager(0), + _fileStorManager(nullptr), _init_has_been_called(false), _noUsablePartitionMode(false) { @@ -112,7 +114,7 @@ void ServiceLayerNode::removeConfigSubscriptions() { StorageNode::removeConfigSubscriptions(); - _configFetcher.reset(0); + _configFetcher.reset(); } void @@ -166,7 +168,7 @@ ServiceLayerNode::initializeNodeSpecific() void ServiceLayerNode::handleLiveConfigUpdate() { - if (_newServerConfig.get() != 0) { + if (_newServerConfig) { bool updated = false; vespa::config::content::core::StorServerConfigBuilder oldC(*_serverConfig); vespa::config::content::core::StorServerConfig& newC(*_newServerConfig); @@ -218,9 +220,11 @@ ServiceLayerNode::configure( // updates { vespalib::LockGuard configLockGuard(_configLock); - _newDevicesConfig.reset(config.release()); + _newDevicesConfig = std::move(config); + } + if (_distributionConfig) { + handleLiveConfigUpdate(); } - if (_distributionConfig.get() != 0) handleLiveConfigUpdate(); } VisitorMessageSession::UP @@ -258,8 +262,9 @@ ServiceLayerNode::createChain() chain->push_back(StorageLink::UP(releaseStateManager().release())); return chain; } + MergeThrottler* merge_throttler; chain->push_back(StorageLink::UP(new OpsLogger(compReg, _configUri))); - chain->push_back(StorageLink::UP(new MergeThrottler(_configUri, compReg))); + chain->push_back(StorageLink::UP(merge_throttler = new MergeThrottler(_configUri, compReg))); chain->push_back(StorageLink::UP(new ChangedBucketOwnershipHandler(_configUri, compReg))); chain->push_back(StorageLink::UP(new BucketIntegrityChecker(_configUri, compReg))); chain->push_back(StorageLink::UP(new bucketmover::BucketMover(_configUri, compReg))); @@ -277,6 +282,12 @@ ServiceLayerNode::createChain() _configUri, _partitions, _persistenceProvider, _context.getComponentRegister()))); chain->push_back(StorageLink::UP(releaseStateManager().release())); + + // Lifetimes of all referenced components shall outlive the last call going + // through the SPI, as queues are flushed and worker threads joined when + // the storage link chain is closed prior to destruction. + auto error_listener = std::make_shared<ServiceLayerErrorListener>(*_component, *merge_throttler); + _fileStorManager->error_wrapper().register_error_listener(std::move(error_listener)); return chain; } |