diff options
author | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-08-30 14:45:35 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-08-30 14:45:35 +0000 |
commit | 239ccaca10f97e9e4038ae48604e248e14074eb3 (patch) | |
tree | ea56ad802c316489d035ea8f7d4edf6af9e29c1e /storage | |
parent | 6d220d01be0b76921ffd0de9ecfd8256db5c176b (diff) |
Generalize SPI shutdown wrapper into error source for listeners
Diffstat (limited to 'storage')
5 files changed, 239 insertions, 74 deletions
diff --git a/storage/src/tests/persistence/providershutdownwrappertest.cpp b/storage/src/tests/persistence/providershutdownwrappertest.cpp index 3475aa58dfd..f7998607184 100644 --- a/storage/src/tests/persistence/providershutdownwrappertest.cpp +++ b/storage/src/tests/persistence/providershutdownwrappertest.cpp @@ -6,17 +6,27 @@ namespace storage { -class ProviderShutdownWrapperTest : public SingleDiskPersistenceTestUtils -{ +// TODO rename file +class ProviderErrorWrapperTest : public SingleDiskPersistenceTestUtils { public: - CPPUNIT_TEST_SUITE(ProviderShutdownWrapperTest); + CPPUNIT_TEST_SUITE(ProviderErrorWrapperTest); CPPUNIT_TEST(testShutdownOnFatalError); + CPPUNIT_TEST(fatal_error_invokes_listener); + CPPUNIT_TEST(resource_exhaustion_error_invokes_listener); + CPPUNIT_TEST(listener_not_invoked_on_success); + CPPUNIT_TEST(listener_not_invoked_on_regular_errors); + CPPUNIT_TEST(multiple_listeners_can_be_registered); CPPUNIT_TEST_SUITE_END(); void testShutdownOnFatalError(); + void fatal_error_invokes_listener(); + void resource_exhaustion_error_invokes_listener(); + void listener_not_invoked_on_success(); + void listener_not_invoked_on_regular_errors(); + void multiple_listeners_can_be_registered(); }; -CPPUNIT_TEST_SUITE_REGISTRATION(ProviderShutdownWrapperTest); +CPPUNIT_TEST_SUITE_REGISTRATION(ProviderErrorWrapperTest); namespace { @@ -36,31 +46,136 @@ private: vespalib::string _reason; }; -} +struct MockErrorListener : ProviderErrorListener { + void on_fatal_error(vespalib::stringref message) override { + _seen_fatal_error = true; + _fatal_error = message; + } + void on_resource_exhaustion_error(vespalib::stringref message) override { + _seen_resource_exhaustion_error = true; + _resource_exhaustion_error = message; + } -void -ProviderShutdownWrapperTest::testShutdownOnFatalError() -{ + vespalib::string _fatal_error; + vespalib::string _resource_exhaustion_error; + bool _seen_fatal_error{false}; + bool _seen_resource_exhaustion_error{false}; +}; + +struct Fixture { // We wrap the wrapper. It's turtles all the way down! - PersistenceProviderWrapper providerWrapper( - getPersistenceProvider()); + PersistenceProviderWrapper providerWrapper; TestServiceLayerApp app; - ServiceLayerComponent component(app.getComponentRegister(), "dummy"); + ServiceLayerComponent component; + ProviderErrorWrapper errorWrapper; + + Fixture(spi::PersistenceProvider& provider) + : providerWrapper(provider), + app(), + component(app.getComponentRegister(), "dummy"), + errorWrapper(providerWrapper, component) + { + providerWrapper.setFailureMask(PersistenceProviderWrapper::FAIL_ALL_OPERATIONS); + } + ~Fixture(); - ProviderShutdownWrapper shutdownWrapper(providerWrapper, component); + void perform_spi_operation() { + errorWrapper.getBucketInfo(spi::Bucket(document::BucketId(16, 1234), spi::PartitionId(0))); + } +}; + +Fixture::~Fixture() {} + +} + +void ProviderErrorWrapperTest::fatal_error_invokes_listener() { + Fixture f(getPersistenceProvider()); + auto listener = std::make_shared<MockErrorListener>(); + f.errorWrapper.register_error_listener(listener); + f.providerWrapper.setResult(spi::Result(spi::Result::FATAL_ERROR, "eject! eject!")); + + CPPUNIT_ASSERT(!listener->_seen_fatal_error); + f.perform_spi_operation(); + + CPPUNIT_ASSERT(!listener->_seen_resource_exhaustion_error); + CPPUNIT_ASSERT(listener->_seen_fatal_error); + CPPUNIT_ASSERT_EQUAL(vespalib::string("eject! eject!"), listener->_fatal_error); +} + +void ProviderErrorWrapperTest::resource_exhaustion_error_invokes_listener() { + Fixture f(getPersistenceProvider()); + auto listener = std::make_shared<MockErrorListener>(); + f.errorWrapper.register_error_listener(listener); + f.providerWrapper.setResult(spi::Result(spi::Result::RESOURCE_EXHAUSTED, "out of juice")); + + CPPUNIT_ASSERT(!listener->_seen_resource_exhaustion_error); + f.perform_spi_operation(); + + CPPUNIT_ASSERT(!listener->_seen_fatal_error); + CPPUNIT_ASSERT(listener->_seen_resource_exhaustion_error); + CPPUNIT_ASSERT_EQUAL(vespalib::string("out of juice"), listener->_resource_exhaustion_error); +} + +void ProviderErrorWrapperTest::listener_not_invoked_on_success() { + Fixture f(getPersistenceProvider()); + auto listener = std::make_shared<MockErrorListener>(); + f.errorWrapper.register_error_listener(listener); + f.perform_spi_operation(); + + CPPUNIT_ASSERT(!listener->_seen_fatal_error); + CPPUNIT_ASSERT(!listener->_seen_resource_exhaustion_error); +} + +void ProviderErrorWrapperTest::listener_not_invoked_on_regular_errors() { + Fixture f(getPersistenceProvider()); + auto listener = std::make_shared<MockErrorListener>(); + f.errorWrapper.register_error_listener(listener); + + // TODO dedupe + f.providerWrapper.setResult(spi::Result(spi::Result::TRANSIENT_ERROR, "beep boop")); + f.perform_spi_operation(); + CPPUNIT_ASSERT(!listener->_seen_fatal_error); + CPPUNIT_ASSERT(!listener->_seen_resource_exhaustion_error); + + f.providerWrapper.setResult(spi::Result(spi::Result::PERMANENT_ERROR, "beep boop!!")); + f.perform_spi_operation(); + CPPUNIT_ASSERT(!listener->_seen_fatal_error); + CPPUNIT_ASSERT(!listener->_seen_resource_exhaustion_error); +} + +void ProviderErrorWrapperTest::multiple_listeners_can_be_registered() { + Fixture f(getPersistenceProvider()); + auto listener1 = std::make_shared<MockErrorListener>(); + auto listener2 = std::make_shared<MockErrorListener>(); + f.errorWrapper.register_error_listener(listener1); + f.errorWrapper.register_error_listener(listener2); + + f.providerWrapper.setResult(spi::Result(spi::Result::RESOURCE_EXHAUSTED, "out of juice")); + f.perform_spi_operation(); + + CPPUNIT_ASSERT(listener1->_seen_resource_exhaustion_error); + CPPUNIT_ASSERT(listener2->_seen_resource_exhaustion_error); +} + + + +// TODO rewrite, move component interaction testing elsewhere +void +ProviderErrorWrapperTest::testShutdownOnFatalError() { + Fixture f(getPersistenceProvider()); TestShutdownListener shutdownListener; - app.getComponentRegister().registerShutdownListener(shutdownListener); + f.app.getComponentRegister().registerShutdownListener(shutdownListener); - providerWrapper.setResult( + f.providerWrapper.setResult( spi::Result(spi::Result::FATAL_ERROR, "eject! eject!")); - providerWrapper.setFailureMask( + f.providerWrapper.setFailureMask( PersistenceProviderWrapper::FAIL_ALL_OPERATIONS); CPPUNIT_ASSERT(!shutdownListener.shutdownRequested()); // This should cause the node to implicitly be shut down - shutdownWrapper.getBucketInfo( + f.errorWrapper.getBucketInfo( spi::Bucket(document::BucketId(16, 1234), spi::PartitionId(0))); @@ -69,10 +184,10 @@ ProviderShutdownWrapperTest::testShutdownOnFatalError() shutdownListener.getReason()); // Triggering a new error should not cause shutdown to be requested twice. - providerWrapper.setResult( + f.providerWrapper.setResult( spi::Result(spi::Result::FATAL_ERROR, "boom!")); - shutdownWrapper.getBucketInfo( + f.errorWrapper.getBucketInfo( spi::Bucket(document::BucketId(16, 1234), spi::PartitionId(0))); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h index 82ea5ecba7f..233ce5edf65 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h @@ -55,7 +55,7 @@ class FileStorManager : public StorageLinkQueued, ServiceLayerComponent _component; const spi::PartitionStateList& _partitions; spi::PersistenceProvider& _providerCore; - ProviderShutdownWrapper _providerShutdown; + ProviderErrorWrapper _providerShutdown; bool _nodeUpInLastNodeStateSeenByProvider; spi::MetricPersistenceProvider::UP _providerMetric; spi::PersistenceProvider* _provider; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h index fe09758e842..d4dadf94184 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h @@ -95,10 +95,10 @@ struct FileStorThreadMetrics : public metrics::MetricSet metrics::LongCountMetric bytesMerged; metrics::LongCountMetric getBucketDiffReply; metrics::LongCountMetric applyBucketDiffReply; - metrics::LongAverageMetric mergeLatencyTotal; - metrics::LongAverageMetric mergeMetadataReadLatency; - metrics::LongAverageMetric mergeDataReadLatency; - metrics::LongAverageMetric mergeDataWriteLatency; + metrics::DoubleAverageMetric mergeLatencyTotal; + metrics::DoubleAverageMetric mergeMetadataReadLatency; + metrics::DoubleAverageMetric mergeDataReadLatency; + metrics::DoubleAverageMetric mergeDataWriteLatency; metrics::DoubleAverageMetric mergeAverageDataReceivedNeeded; metrics::LongAverageMetric batchingSize; diff --git a/storage/src/vespa/storage/persistence/providershutdownwrapper.cpp b/storage/src/vespa/storage/persistence/providershutdownwrapper.cpp index 1a2f9620e65..521e1928e23 100644 --- a/storage/src/vespa/storage/persistence/providershutdownwrapper.cpp +++ b/storage/src/vespa/storage/persistence/providershutdownwrapper.cpp @@ -10,68 +10,92 @@ namespace storage { template <typename ResultType> ResultType -ProviderShutdownWrapper::checkResult(ResultType&& result) const +ProviderErrorWrapper::checkResult(ResultType&& result) const { if (result.getErrorCode() == spi::Result::FATAL_ERROR) { - vespalib::LockGuard guard(_shutdownLock); - if (_shutdownTriggered) { - LOG(debug, - "Received FATAL_ERROR from persistence provider: %s. " - "Node has already been instructed to shut down so " - "not doing anything now.", - result.getErrorMessage().c_str()); - } else { - LOG(info, - "Received FATAL_ERROR from persistence provider, " - "shutting down node: %s", - result.getErrorMessage().c_str()); - const_cast<ProviderShutdownWrapper*>(this)-> - _component.requestShutdown(result.getErrorMessage()); - _shutdownTriggered = true; + trigger_shutdown_listeners_once(result.getErrorMessage()); + } else if (result.getErrorCode() == spi::Result::RESOURCE_EXHAUSTED) { + trigger_resource_exhaustion_listeners(result.getErrorMessage()); + } + return std::forward<ResultType>(result); +} + +// TODO move this out as error listener instead +void ProviderErrorWrapper::trigger_shutdown_listeners_once(vespalib::stringref reason) const { + std::lock_guard<std::mutex> guard(_mutex); + // TODO decide if this behavior even belongs here. Move to dedicated listener? + if (_shutdownTriggered) { + LOG(debug, + "Received FATAL_ERROR from persistence provider: %s. " + "Node has already been instructed to shut down so " + "not doing anything now.", + reason.c_str()); + } else { + for (auto& listener : _listeners) { + listener->on_fatal_error(reason); } + // TODO move out + LOG(info, + "Received FATAL_ERROR from persistence provider, " + "shutting down node: %s", + reason.c_str()); + const_cast<ProviderErrorWrapper*>(this)-> + _component.requestShutdown(reason); + _shutdownTriggered = true; + } +} + +void ProviderErrorWrapper::trigger_resource_exhaustion_listeners(vespalib::stringref reason) const { + std::lock_guard<std::mutex> guard(_mutex); + for (auto& listener : _listeners) { + listener->on_resource_exhaustion_error(reason); } - return std::move(result); +} + +void ProviderErrorWrapper::register_error_listener(std::shared_ptr<ProviderErrorListener> listener) { + std::lock_guard<std::mutex> guard(_mutex); + _listeners.emplace_back(std::move(listener)); } spi::Result -ProviderShutdownWrapper::initialize() +ProviderErrorWrapper::initialize() { return checkResult(_impl.initialize()); } spi::PartitionStateListResult -ProviderShutdownWrapper::getPartitionStates() const +ProviderErrorWrapper::getPartitionStates() const { return checkResult(_impl.getPartitionStates()); } spi::BucketIdListResult -ProviderShutdownWrapper::listBuckets(spi::PartitionId partitionId) const +ProviderErrorWrapper::listBuckets(spi::PartitionId partitionId) const { return checkResult(_impl.listBuckets(partitionId)); } spi::Result -ProviderShutdownWrapper::setClusterState(const spi::ClusterState& state) +ProviderErrorWrapper::setClusterState(const spi::ClusterState& state) { return checkResult(_impl.setClusterState(state)); } spi::Result -ProviderShutdownWrapper::setActiveState(const spi::Bucket& bucket, +ProviderErrorWrapper::setActiveState(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState) { return checkResult(_impl.setActiveState(bucket, newState)); } spi::BucketInfoResult -ProviderShutdownWrapper::getBucketInfo(const spi::Bucket& bucket) const +ProviderErrorWrapper::getBucketInfo(const spi::Bucket& bucket) const { return checkResult(_impl.getBucketInfo(bucket)); } spi::Result -ProviderShutdownWrapper::put(const spi::Bucket& bucket, +ProviderErrorWrapper::put(const spi::Bucket& bucket, spi::Timestamp ts, const spi::DocumentSP& doc, spi::Context& context) @@ -80,7 +104,7 @@ ProviderShutdownWrapper::put(const spi::Bucket& bucket, } spi::RemoveResult -ProviderShutdownWrapper::remove(const spi::Bucket& bucket, +ProviderErrorWrapper::remove(const spi::Bucket& bucket, spi::Timestamp ts, const document::DocumentId& docId, spi::Context& context) @@ -89,7 +113,7 @@ ProviderShutdownWrapper::remove(const spi::Bucket& bucket, } spi::RemoveResult -ProviderShutdownWrapper::removeIfFound(const spi::Bucket& bucket, +ProviderErrorWrapper::removeIfFound(const spi::Bucket& bucket, spi::Timestamp ts, const document::DocumentId& docId, spi::Context& context) @@ -98,7 +122,7 @@ ProviderShutdownWrapper::removeIfFound(const spi::Bucket& bucket, } spi::UpdateResult -ProviderShutdownWrapper::update(const spi::Bucket& bucket, +ProviderErrorWrapper::update(const spi::Bucket& bucket, spi::Timestamp ts, const spi::DocumentUpdateSP& docUpdate, spi::Context& context) @@ -107,7 +131,7 @@ ProviderShutdownWrapper::update(const spi::Bucket& bucket, } spi::GetResult -ProviderShutdownWrapper::get(const spi::Bucket& bucket, +ProviderErrorWrapper::get(const spi::Bucket& bucket, const document::FieldSet& fieldSet, const document::DocumentId& docId, spi::Context& context) const @@ -116,13 +140,13 @@ ProviderShutdownWrapper::get(const spi::Bucket& bucket, } spi::Result -ProviderShutdownWrapper::flush(const spi::Bucket& bucket, spi::Context& context) +ProviderErrorWrapper::flush(const spi::Bucket& bucket, spi::Context& context) { return checkResult(_impl.flush(bucket, context)); } spi::CreateIteratorResult -ProviderShutdownWrapper::createIterator(const spi::Bucket& bucket, +ProviderErrorWrapper::createIterator(const spi::Bucket& bucket, const document::FieldSet& fieldSet, const spi::Selection& selection, spi::IncludedVersions versions, @@ -132,7 +156,7 @@ ProviderShutdownWrapper::createIterator(const spi::Bucket& bucket, } spi::IterateResult -ProviderShutdownWrapper::iterate(spi::IteratorId iteratorId, +ProviderErrorWrapper::iterate(spi::IteratorId iteratorId, uint64_t maxByteSize, spi::Context& context) const { @@ -140,41 +164,41 @@ ProviderShutdownWrapper::iterate(spi::IteratorId iteratorId, } spi::Result -ProviderShutdownWrapper::destroyIterator(spi::IteratorId iteratorId, +ProviderErrorWrapper::destroyIterator(spi::IteratorId iteratorId, spi::Context& context) { return checkResult(_impl.destroyIterator(iteratorId, context)); } spi::Result -ProviderShutdownWrapper::createBucket(const spi::Bucket& bucket, +ProviderErrorWrapper::createBucket(const spi::Bucket& bucket, spi::Context& context) { return checkResult(_impl.createBucket(bucket, context)); } spi::Result -ProviderShutdownWrapper::deleteBucket(const spi::Bucket& bucket, +ProviderErrorWrapper::deleteBucket(const spi::Bucket& bucket, spi::Context& context) { return checkResult(_impl.deleteBucket(bucket, context)); } spi::BucketIdListResult -ProviderShutdownWrapper::getModifiedBuckets() const +ProviderErrorWrapper::getModifiedBuckets() const { return checkResult(_impl.getModifiedBuckets()); } spi::Result -ProviderShutdownWrapper::maintain(const spi::Bucket& bucket, +ProviderErrorWrapper::maintain(const spi::Bucket& bucket, spi::MaintenanceLevel level) { return checkResult(_impl.maintain(bucket, level)); } spi::Result -ProviderShutdownWrapper::split(const spi::Bucket& source, +ProviderErrorWrapper::split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context& context) @@ -183,7 +207,7 @@ ProviderShutdownWrapper::split(const spi::Bucket& source, } spi::Result -ProviderShutdownWrapper::join(const spi::Bucket& source1, +ProviderErrorWrapper::join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target, spi::Context& context) { @@ -191,14 +215,14 @@ ProviderShutdownWrapper::join(const spi::Bucket& source1, } spi::Result -ProviderShutdownWrapper::move(const spi::Bucket& source, +ProviderErrorWrapper::move(const spi::Bucket& source, spi::PartitionId target, spi::Context& context) { return checkResult(_impl.move(source, target, context)); } spi::Result -ProviderShutdownWrapper::removeEntry(const spi::Bucket& bucket, +ProviderErrorWrapper::removeEntry(const spi::Bucket& bucket, spi::Timestamp ts, spi::Context& context) { return checkResult(_impl.removeEntry(bucket, ts, context)); diff --git a/storage/src/vespa/storage/persistence/providershutdownwrapper.h b/storage/src/vespa/storage/persistence/providershutdownwrapper.h index 0700bcdcda2..41ef265ff3e 100644 --- a/storage/src/vespa/storage/persistence/providershutdownwrapper.h +++ b/storage/src/vespa/storage/persistence/providershutdownwrapper.h @@ -1,33 +1,53 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** - * \class storage::ProviderShutdownWrapper + * \class storage::ProviderErrorWrapper * * \brief Utility class which forwards all calls to the real persistence * provider implementation, transparently checking the result of each - * operation to see if the result is FATAL_ERROR. If so, it initiates a + * operation to see if the result is FATAL_ERROR or RESOURCE_EXHAUSTED. + * + * If FATAL_ERROR is received, the wrapper transparently initiates a * shutdown of the process (but still returns the response up to the caller - * as if it were just a non-wrapped call). + * as if it were just a non-wrapped call). FIXME update comment! * + * If RESOURCE_EXHAUSTED is received, the wrapper will invoke any and all + * resource exhaustion listeners synchronously, before returning the response + * to the caller as usual. */ #pragma once +#include <vespa/persistence/spi/persistenceprovider.h> #include <vector> #include <string> -#include <vespa/persistence/spi/persistenceprovider.h> -#include <vespa/vespalib/util/sync.h> +#include <memory> +#include <mutex> namespace storage { class ServiceLayerComponent; -class ProviderShutdownWrapper : public spi::PersistenceProvider -{ +class ProviderErrorListener { public: - ProviderShutdownWrapper(spi::PersistenceProvider& impl, - ServiceLayerComponent& component) + virtual ~ProviderErrorListener() = default; + // Note: fatal error listener will only be called _once_, as it is assumed + // that such errors are indeed fatal and will lead to the process' termination. + virtual void on_fatal_error(vespalib::stringref message) { + (void)message; + } + virtual void on_resource_exhaustion_error(vespalib::stringref message) { + (void)message; + } +}; + +// TODO rename file once name is settled! + +class ProviderErrorWrapper : public spi::PersistenceProvider { +public: + ProviderErrorWrapper(spi::PersistenceProvider& impl, + ServiceLayerComponent& component) : _impl(impl), _component(component), - _shutdownLock(), + _mutex(), _shutdownTriggered(false) { } @@ -63,6 +83,8 @@ public: const spi::PersistenceProvider& getProviderImplementation() const { return _impl; } + + void register_error_listener(std::shared_ptr<ProviderErrorListener> listener); private: /** * Check whether result has a FATAL_ERROR return code and invoke @@ -72,9 +94,13 @@ private: template <typename ResultType> ResultType checkResult(ResultType&& result) const; + void trigger_shutdown_listeners_once(vespalib::stringref reason) const; + void trigger_resource_exhaustion_listeners(vespalib::stringref reason) const; + spi::PersistenceProvider& _impl; ServiceLayerComponent& _component; - vespalib::Lock _shutdownLock; + std::vector<std::shared_ptr<ProviderErrorListener>> _listeners; + mutable std::mutex _mutex; mutable bool _shutdownTriggered; }; |