From 284dba9f34a5eb84b83b7ae706cc6274f323ffac Mon Sep 17 00:00:00 2001 From: Geir Storli Date: Tue, 11 May 2021 15:04:47 +0000 Subject: Stop all stripe threads before starting shutdown (and closing) of the storage link chain. This is required to prevent stripe threads from sending messages up the chain while the communication manager is being closed. Such messages fail at the RPC layer (which is already closed), and the communication manager then sends an error reply down the chain. That reply triggers an assert in StorageLink::sendDown(), because that storage link is already CLOSED. --- .../src/tests/distributor/distributortestutil.cpp | 3 +++ .../src/tests/distributor/distributortestutil.h | 6 +++-- .../src/vespa/storage/distributor/distributor.cpp | 27 +++++++++++----------- .../src/vespa/storage/distributor/distributor.h | 3 ++- .../storage/distributor/distributor_stripe_pool.h | 1 + .../storage/storageserver/distributornode.cpp | 7 ++++-- .../vespa/storage/storageserver/distributornode.h | 3 +++ 7 files changed, 31 insertions(+), 19 deletions(-) (limited to 'storage/src') diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp index be123d6281c..e92ba0374bc 100644 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ b/storage/src/tests/distributor/distributortestutil.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -28,10 +29,12 @@ DistributorTestUtil::createLinks() { _node.reset(new TestDistributorApp(_config.getConfigId())); _threadPool = framework::TickingThreadPool::createDefault("distributor"); + _stripe_pool = std::make_unique(); _distributor.reset(new Distributor( _node->getComponentRegister(), _node->node_identity(), *_threadPool, + *_stripe_pool, *this, _num_distributor_stripes, _hostInfo, diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h index de46905c870..63ca47755e6 100644 --- 
a/storage/src/tests/distributor/distributortestutil.h +++ b/storage/src/tests/distributor/distributortestutil.h @@ -17,16 +17,17 @@ namespace framework { struct TickingThreadPool; } namespace distributor { -class StripeBucketDBUpdater; class Distributor; class DistributorBucketSpace; class DistributorBucketSpaceRepo; -class DistributorStripeOperationContext; class DistributorStripe; class DistributorStripeComponent; +class DistributorStripeOperationContext; +class DistributorStripePool; class ExternalOperationHandler; class IdealStateManager; class Operation; +class StripeBucketDBUpdater; // TODO STRIPE rename to DistributorStripeTestUtil? class DistributorTestUtil : private DoneInitializeHandler @@ -206,6 +207,7 @@ protected: vdstestlib::DirConfig _config; std::unique_ptr _node; std::unique_ptr _threadPool; + std::unique_ptr _stripe_pool; std::unique_ptr _distributor; std::unique_ptr _component; DistributorMessageSenderStub _sender; diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index f7ecd324a51..47f7fee5873 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -48,6 +48,7 @@ namespace storage::distributor { Distributor::Distributor(DistributorComponentRegister& compReg, const NodeIdentity& node_identity, framework::TickingThreadPool& threadPool, + DistributorStripePool& stripe_pool, DoneInitializeHandler& doneInitHandler, uint32_t num_distributor_stripes, HostInfo& hostInfoReporterRegistrar, @@ -60,7 +61,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _use_legacy_mode(num_distributor_stripes == 0), _stripe(std::make_unique(compReg, *_metrics, node_identity, threadPool, doneInitHandler, *this, *this, _use_legacy_mode)), - _stripe_pool(), + _stripe_pool(stripe_pool), _stripes(), _stripe_accessor(), _message_queue(), @@ -88,8 +89,7 @@ Distributor::Distributor(DistributorComponentRegister& 
compReg, _component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0)); if (!_use_legacy_mode) { LOG(info, "Setting up distributor with %u stripes", num_distributor_stripes); // TODO STRIPE remove once legacy gone - _stripe_pool = std::make_unique(); - _stripe_accessor = std::make_unique(*_stripe_pool); + _stripe_accessor = std::make_unique(_stripe_pool); _bucket_db_updater = std::make_unique(_component, _component, *this, *this, _component.getDistribution(), @@ -253,7 +253,7 @@ Distributor::onOpen() _threadPool.start(_component.getThreadPool()); if (!_use_legacy_mode) { std::vector pool_stripes({_stripes[0].get()}); - _stripe_pool->start(pool_stripes); + _stripe_pool.start(pool_stripes); } } else { LOG(warning, "Not starting distributor thread as it's configured to " @@ -263,6 +263,8 @@ Distributor::onOpen() } void Distributor::onClose() { + // Note: In a running system this function is called by the main thread in StorageApp as part of shutdown. + // The distributor and stripe thread pools are already stopped at this point. LOG(debug, "Distributor::onClose invoked"); if (_use_legacy_mode) { _stripe->flush_and_close(); @@ -270,14 +272,11 @@ void Distributor::onClose() { // Tests may run with multiple stripes but without threads (for determinism's sake), // so only try to flush stripes if a pool is running. // TODO STRIPE probably also need to flush when running tests to handle any explicit close-tests. - if (_stripe_pool->stripe_count() > 0){ - { - auto guard = _stripe_accessor->rendezvous_and_hold_all(); - guard->flush_and_close(); + if (_stripe_pool.stripe_count() > 0) { + assert(_stripe_pool.is_stopped()); + for (auto& thread : _stripe_pool) { + thread->stripe().flush_and_close(); } - // TODO STRIPE must ensure no incoming requests can be posted on stripes between close - // and pool stop+join! 
- _stripe_pool->stop_and_join(); } assert(_bucket_db_updater); _bucket_db_updater->flush(); @@ -335,11 +334,11 @@ Distributor::onDown(const std::shared_ptr& msg) return true; } assert(_stripes.size() == 1); - assert(_stripe_pool->stripe_count() == 1); + assert(_stripe_pool.stripe_count() == 1); // TODO STRIPE correct routing with multiple stripes bool handled = first_stripe().handle_or_enqueue_message(msg); if (handled) { - _stripe_pool->stripe_thread(0).notify_event_has_triggered(); + _stripe_pool.stripe_thread(0).notify_event_has_triggered(); } return handled; } @@ -440,7 +439,7 @@ Distributor::propagateDefaultDistribution( _stripe->propagateDefaultDistribution(std::move(distribution)); } else { // Should only be called at ctor time, at which point the pool is not yet running. - assert(_stripe_pool->stripe_count() == 0); + assert(_stripe_pool.stripe_count() == 0); assert(_stripes.size() == 1); // TODO STRIPE all the stripes yes auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution)); for (auto& stripe : _stripes) { diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index b6dbc4432eb..50bd2526ff4 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -63,6 +63,7 @@ public: Distributor(DistributorComponentRegister&, const NodeIdentity& node_identity, framework::TickingThreadPool&, + DistributorStripePool& stripe_pool, DoneInitializeHandler&, uint32_t num_distributor_stripes, HostInfo& hostInfoReporterRegistrar, @@ -201,7 +202,7 @@ private: const bool _use_legacy_mode; // TODO STRIPE multiple stripes...! This is for proof of concept of wiring. 
std::unique_ptr _stripe; - std::unique_ptr _stripe_pool; + DistributorStripePool& _stripe_pool; std::vector> _stripes; std::unique_ptr _stripe_accessor; MessageQueue _message_queue; // Queue for top-level ops diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h index 72534de57ae..5e72cb47fc4 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h @@ -77,6 +77,7 @@ public: return *_stripes[idx]; } [[nodiscard]] size_t stripe_count() const noexcept { return _stripes.size(); } + [[nodiscard]] bool is_stopped() const noexcept { return _stopped; } // Applies to all threads. May be called both before and after start(). Thread safe. void set_tick_wait_duration(vespalib::duration new_tick_wait_duration) noexcept; diff --git a/storage/src/vespa/storage/storageserver/distributornode.cpp b/storage/src/vespa/storage/storageserver/distributornode.cpp index 8f4f0422f44..f49b2a23688 100644 --- a/storage/src/vespa/storage/storageserver/distributornode.cpp +++ b/storage/src/vespa/storage/storageserver/distributornode.cpp @@ -5,9 +5,10 @@ #include "communicationmanager.h" #include "opslogger.h" #include "statemanager.h" +#include #include #include -#include +#include #include #include @@ -26,6 +27,7 @@ DistributorNode::DistributorNode( std::make_unique(), !communicationManager ? 
NORMAL : SINGLE_THREADED_TEST_MODE), _threadPool(framework::TickingThreadPool::createDefault("distributor")), + _stripe_pool(std::make_unique()), _context(context), _lastUniqueTimestampRequested(0), _uniqueTimestampCounter(0), @@ -52,6 +54,7 @@ void DistributorNode::shutdownDistributor() { _threadPool->stop(); + _stripe_pool->stop_and_join(); shutdown(); } @@ -100,7 +103,7 @@ DistributorNode::createChain(IStorageChainBuilder &builder) // manager, which is safe since the lifetime of said state manager // extends to the end of the process. builder.add(std::make_unique - (dcr, *_node_identity, *_threadPool, getDoneInitializeHandler(), + (dcr, *_node_identity, *_threadPool, *_stripe_pool, getDoneInitializeHandler(), _num_distributor_stripes, stateManager->getHostInfo())); diff --git a/storage/src/vespa/storage/storageserver/distributornode.h b/storage/src/vespa/storage/storageserver/distributornode.h index 267d4400ac7..f2e483bbc9f 100644 --- a/storage/src/vespa/storage/storageserver/distributornode.h +++ b/storage/src/vespa/storage/storageserver/distributornode.h @@ -15,6 +15,8 @@ namespace storage { +namespace distributor { class DistributorStripePool; } + class IStorageChainBuilder; class DistributorNode @@ -22,6 +24,7 @@ class DistributorNode private UniqueTimeCalculator { framework::TickingThreadPool::UP _threadPool; + std::unique_ptr _stripe_pool; DistributorNodeContext& _context; uint64_t _lastUniqueTimestampRequested; uint32_t _uniqueTimestampCounter; -- cgit v1.2.3