diff options
author | Geir Storli <geirst@verizonmedia.com> | 2021-05-11 19:32:05 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-05-11 19:32:05 +0200 |
commit | 809d093a732b81711fe54bcd8e2246b574396e0c (patch) | |
tree | 29f677aa767576ffae37768dd2a6328c1ed9558b | |
parent | 839a6f9a7d1f66937f51db3766a2dfd3e7b90675 (diff) | |
parent | 284dba9f34a5eb84b83b7ae706cc6274f323ffac (diff) |
Merge pull request #17824 from vespa-engine/geirst/deterministic-distributor-shutdown-in-new-stripe-mode
Stop all stripe threads before starting shutdown (and closing) of theā¦
7 files changed, 31 insertions, 19 deletions
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp index be123d6281c..e92ba0374bc 100644 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ b/storage/src/tests/distributor/distributortestutil.cpp @@ -7,6 +7,7 @@ #include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/storage/distributor/distributor_stripe.h> #include <vespa/storage/distributor/distributor_stripe_component.h> +#include <vespa/storage/distributor/distributor_stripe_pool.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vespalib/text/stringtokenizer.h> @@ -28,10 +29,12 @@ DistributorTestUtil::createLinks() { _node.reset(new TestDistributorApp(_config.getConfigId())); _threadPool = framework::TickingThreadPool::createDefault("distributor"); + _stripe_pool = std::make_unique<DistributorStripePool>(); _distributor.reset(new Distributor( _node->getComponentRegister(), _node->node_identity(), *_threadPool, + *_stripe_pool, *this, _num_distributor_stripes, _hostInfo, diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h index de46905c870..63ca47755e6 100644 --- a/storage/src/tests/distributor/distributortestutil.h +++ b/storage/src/tests/distributor/distributortestutil.h @@ -17,16 +17,17 @@ namespace framework { struct TickingThreadPool; } namespace distributor { -class StripeBucketDBUpdater; class Distributor; class DistributorBucketSpace; class DistributorBucketSpaceRepo; -class DistributorStripeOperationContext; class DistributorStripe; class DistributorStripeComponent; +class DistributorStripeOperationContext; +class DistributorStripePool; class ExternalOperationHandler; class IdealStateManager; class Operation; +class StripeBucketDBUpdater; // TODO STRIPE rename to DistributorStripeTestUtil? class DistributorTestUtil : private DoneInitializeHandler @@ -206,6 +207,7 @@ protected: vdstestlib::DirConfig _config; std::unique_ptr<TestDistributorApp> _node; std::unique_ptr<framework::TickingThreadPool> _threadPool; + std::unique_ptr<DistributorStripePool> _stripe_pool; std::unique_ptr<Distributor> _distributor; std::unique_ptr<storage::DistributorComponent> _component; DistributorMessageSenderStub _sender; diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index f7ecd324a51..47f7fee5873 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -48,6 +48,7 @@ namespace storage::distributor { Distributor::Distributor(DistributorComponentRegister& compReg, const NodeIdentity& node_identity, framework::TickingThreadPool& threadPool, + DistributorStripePool& stripe_pool, DoneInitializeHandler& doneInitHandler, uint32_t num_distributor_stripes, HostInfo& hostInfoReporterRegistrar, @@ -60,7 +61,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _use_legacy_mode(num_distributor_stripes == 0), _stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool, doneInitHandler, *this, *this, _use_legacy_mode)), - _stripe_pool(), + _stripe_pool(stripe_pool), _stripes(), _stripe_accessor(), _message_queue(), @@ -88,8 +89,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0)); if (!_use_legacy_mode) { LOG(info, "Setting up distributor with %u stripes", num_distributor_stripes); // TODO STRIPE remove once legacy gone - _stripe_pool = std::make_unique<DistributorStripePool>(); - _stripe_accessor = std::make_unique<MultiThreadedStripeAccessor>(*_stripe_pool); + _stripe_accessor = std::make_unique<MultiThreadedStripeAccessor>(_stripe_pool); _bucket_db_updater = std::make_unique<BucketDBUpdater>(_component, _component, *this, *this, _component.getDistribution(), @@ -253,7 +253,7 @@ Distributor::onOpen() _threadPool.start(_component.getThreadPool()); if (!_use_legacy_mode) { std::vector<TickableStripe*> pool_stripes({_stripes[0].get()}); - _stripe_pool->start(pool_stripes); + _stripe_pool.start(pool_stripes); } } else { LOG(warning, "Not starting distributor thread as it's configured to " @@ -263,6 +263,8 @@ Distributor::onOpen() } void Distributor::onClose() { + // Note: In a running system this function is called by the main thread in StorageApp as part of shutdown. + // The distributor and stripe thread pools are already stopped at this point. LOG(debug, "Distributor::onClose invoked"); if (_use_legacy_mode) { _stripe->flush_and_close(); @@ -270,14 +272,11 @@ void Distributor::onClose() { // Tests may run with multiple stripes but without threads (for determinism's sake), // so only try to flush stripes if a pool is running. // TODO STRIPE probably also need to flush when running tests to handle any explicit close-tests. - if (_stripe_pool->stripe_count() > 0){ - { - auto guard = _stripe_accessor->rendezvous_and_hold_all(); - guard->flush_and_close(); + if (_stripe_pool.stripe_count() > 0) { + assert(_stripe_pool.is_stopped()); + for (auto& thread : _stripe_pool) { + thread->stripe().flush_and_close(); } - // TODO STRIPE must ensure no incoming requests can be posted on stripes between close - // and pool stop+join! - _stripe_pool->stop_and_join(); } assert(_bucket_db_updater); _bucket_db_updater->flush(); @@ -335,11 +334,11 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg) return true; } assert(_stripes.size() == 1); - assert(_stripe_pool->stripe_count() == 1); + assert(_stripe_pool.stripe_count() == 1); // TODO STRIPE correct routing with multiple stripes bool handled = first_stripe().handle_or_enqueue_message(msg); if (handled) { - _stripe_pool->stripe_thread(0).notify_event_has_triggered(); + _stripe_pool.stripe_thread(0).notify_event_has_triggered(); } return handled; } @@ -440,7 +439,7 @@ Distributor::propagateDefaultDistribution( _stripe->propagateDefaultDistribution(std::move(distribution)); } else { // Should only be called at ctor time, at which point the pool is not yet running. - assert(_stripe_pool->stripe_count() == 0); + assert(_stripe_pool.stripe_count() == 0); assert(_stripes.size() == 1); // TODO STRIPE all the stripes yes auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution)); for (auto& stripe : _stripes) { diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index b6dbc4432eb..50bd2526ff4 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -63,6 +63,7 @@ public: Distributor(DistributorComponentRegister&, const NodeIdentity& node_identity, framework::TickingThreadPool&, + DistributorStripePool& stripe_pool, DoneInitializeHandler&, uint32_t num_distributor_stripes, HostInfo& hostInfoReporterRegistrar, @@ -201,7 +202,7 @@ private: const bool _use_legacy_mode; // TODO STRIPE multiple stripes...! This is for proof of concept of wiring. std::unique_ptr<DistributorStripe> _stripe; - std::unique_ptr<DistributorStripePool> _stripe_pool; + DistributorStripePool& _stripe_pool; std::vector<std::unique_ptr<DistributorStripe>> _stripes; std::unique_ptr<StripeAccessor> _stripe_accessor; MessageQueue _message_queue; // Queue for top-level ops diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h index 72534de57ae..5e72cb47fc4 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h @@ -77,6 +77,7 @@ public: return *_stripes[idx]; } [[nodiscard]] size_t stripe_count() const noexcept { return _stripes.size(); } + [[nodiscard]] bool is_stopped() const noexcept { return _stopped; } // Applies to all threads. May be called both before and after start(). Thread safe. void set_tick_wait_duration(vespalib::duration new_tick_wait_duration) noexcept; diff --git a/storage/src/vespa/storage/storageserver/distributornode.cpp b/storage/src/vespa/storage/storageserver/distributornode.cpp index 8f4f0422f44..f49b2a23688 100644 --- a/storage/src/vespa/storage/storageserver/distributornode.cpp +++ b/storage/src/vespa/storage/storageserver/distributornode.cpp @@ -5,9 +5,10 @@ #include "communicationmanager.h" #include "opslogger.h" #include "statemanager.h" +#include <vespa/storage/common/hostreporter/hostinfo.h> #include <vespa/storage/common/i_storage_chain_builder.h> #include <vespa/storage/distributor/distributor.h> -#include <vespa/storage/common/hostreporter/hostinfo.h> +#include <vespa/storage/distributor/distributor_stripe_pool.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/log/log.h> @@ -26,6 +27,7 @@ DistributorNode::DistributorNode( std::make_unique<HostInfo>(), !communicationManager ? NORMAL : SINGLE_THREADED_TEST_MODE), _threadPool(framework::TickingThreadPool::createDefault("distributor")), + _stripe_pool(std::make_unique<distributor::DistributorStripePool>()), _context(context), _lastUniqueTimestampRequested(0), _uniqueTimestampCounter(0), @@ -52,6 +54,7 @@ void DistributorNode::shutdownDistributor() { _threadPool->stop(); + _stripe_pool->stop_and_join(); shutdown(); } @@ -100,7 +103,7 @@ DistributorNode::createChain(IStorageChainBuilder &builder) // manager, which is safe since the lifetime of said state manager // extends to the end of the process. builder.add(std::make_unique<storage::distributor::Distributor> - (dcr, *_node_identity, *_threadPool, getDoneInitializeHandler(), + (dcr, *_node_identity, *_threadPool, *_stripe_pool, getDoneInitializeHandler(), _num_distributor_stripes, stateManager->getHostInfo())); diff --git a/storage/src/vespa/storage/storageserver/distributornode.h b/storage/src/vespa/storage/storageserver/distributornode.h index 267d4400ac7..f2e483bbc9f 100644 --- a/storage/src/vespa/storage/storageserver/distributornode.h +++ b/storage/src/vespa/storage/storageserver/distributornode.h @@ -15,6 +15,8 @@ namespace storage { +namespace distributor { class DistributorStripePool; } + class IStorageChainBuilder; class DistributorNode @@ -22,6 +24,7 @@ class DistributorNode private UniqueTimeCalculator { framework::TickingThreadPool::UP _threadPool; + std::unique_ptr<distributor::DistributorStripePool> _stripe_pool; DistributorNodeContext& _context; uint64_t _lastUniqueTimestampRequested; uint32_t _uniqueTimestampCounter; |