summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2021-05-11 19:32:05 +0200
committerGitHub <noreply@github.com>2021-05-11 19:32:05 +0200
commit809d093a732b81711fe54bcd8e2246b574396e0c (patch)
tree29f677aa767576ffae37768dd2a6328c1ed9558b
parent839a6f9a7d1f66937f51db3766a2dfd3e7b90675 (diff)
parent284dba9f34a5eb84b83b7ae706cc6274f323ffac (diff)
Merge pull request #17824 from vespa-engine/geirst/deterministic-distributor-shutdown-in-new-stripe-mode
Stop all stripe threads before starting shutdown (and closing) of theā€¦
-rw-r--r--storage/src/tests/distributor/distributortestutil.cpp3
-rw-r--r--storage/src/tests/distributor/distributortestutil.h6
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp27
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h3
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_pool.h1
-rw-r--r--storage/src/vespa/storage/storageserver/distributornode.cpp7
-rw-r--r--storage/src/vespa/storage/storageserver/distributornode.h3
7 files changed, 31 insertions, 19 deletions
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index be123d6281c..e92ba0374bc 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -7,6 +7,7 @@
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/storage/distributor/distributor_stripe.h>
#include <vespa/storage/distributor/distributor_stripe_component.h>
+#include <vespa/storage/distributor/distributor_stripe_pool.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/text/stringtokenizer.h>
@@ -28,10 +29,12 @@ DistributorTestUtil::createLinks()
{
_node.reset(new TestDistributorApp(_config.getConfigId()));
_threadPool = framework::TickingThreadPool::createDefault("distributor");
+ _stripe_pool = std::make_unique<DistributorStripePool>();
_distributor.reset(new Distributor(
_node->getComponentRegister(),
_node->node_identity(),
*_threadPool,
+ *_stripe_pool,
*this,
_num_distributor_stripes,
_hostInfo,
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index de46905c870..63ca47755e6 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -17,16 +17,17 @@ namespace framework { struct TickingThreadPool; }
namespace distributor {
-class StripeBucketDBUpdater;
class Distributor;
class DistributorBucketSpace;
class DistributorBucketSpaceRepo;
-class DistributorStripeOperationContext;
class DistributorStripe;
class DistributorStripeComponent;
+class DistributorStripeOperationContext;
+class DistributorStripePool;
class ExternalOperationHandler;
class IdealStateManager;
class Operation;
+class StripeBucketDBUpdater;
// TODO STRIPE rename to DistributorStripeTestUtil?
class DistributorTestUtil : private DoneInitializeHandler
@@ -206,6 +207,7 @@ protected:
vdstestlib::DirConfig _config;
std::unique_ptr<TestDistributorApp> _node;
std::unique_ptr<framework::TickingThreadPool> _threadPool;
+ std::unique_ptr<DistributorStripePool> _stripe_pool;
std::unique_ptr<Distributor> _distributor;
std::unique_ptr<storage::DistributorComponent> _component;
DistributorMessageSenderStub _sender;
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index f7ecd324a51..47f7fee5873 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -48,6 +48,7 @@ namespace storage::distributor {
Distributor::Distributor(DistributorComponentRegister& compReg,
const NodeIdentity& node_identity,
framework::TickingThreadPool& threadPool,
+ DistributorStripePool& stripe_pool,
DoneInitializeHandler& doneInitHandler,
uint32_t num_distributor_stripes,
HostInfo& hostInfoReporterRegistrar,
@@ -60,7 +61,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_use_legacy_mode(num_distributor_stripes == 0),
_stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool,
doneInitHandler, *this, *this, _use_legacy_mode)),
- _stripe_pool(),
+ _stripe_pool(stripe_pool),
_stripes(),
_stripe_accessor(),
_message_queue(),
@@ -88,8 +89,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0));
if (!_use_legacy_mode) {
LOG(info, "Setting up distributor with %u stripes", num_distributor_stripes); // TODO STRIPE remove once legacy gone
- _stripe_pool = std::make_unique<DistributorStripePool>();
- _stripe_accessor = std::make_unique<MultiThreadedStripeAccessor>(*_stripe_pool);
+ _stripe_accessor = std::make_unique<MultiThreadedStripeAccessor>(_stripe_pool);
_bucket_db_updater = std::make_unique<BucketDBUpdater>(_component, _component,
*this, *this,
_component.getDistribution(),
@@ -253,7 +253,7 @@ Distributor::onOpen()
_threadPool.start(_component.getThreadPool());
if (!_use_legacy_mode) {
std::vector<TickableStripe*> pool_stripes({_stripes[0].get()});
- _stripe_pool->start(pool_stripes);
+ _stripe_pool.start(pool_stripes);
}
} else {
LOG(warning, "Not starting distributor thread as it's configured to "
@@ -263,6 +263,8 @@ Distributor::onOpen()
}
void Distributor::onClose() {
+ // Note: In a running system this function is called by the main thread in StorageApp as part of shutdown.
+ // The distributor and stripe thread pools are already stopped at this point.
LOG(debug, "Distributor::onClose invoked");
if (_use_legacy_mode) {
_stripe->flush_and_close();
@@ -270,14 +272,11 @@ void Distributor::onClose() {
// Tests may run with multiple stripes but without threads (for determinism's sake),
// so only try to flush stripes if a pool is running.
// TODO STRIPE probably also need to flush when running tests to handle any explicit close-tests.
- if (_stripe_pool->stripe_count() > 0){
- {
- auto guard = _stripe_accessor->rendezvous_and_hold_all();
- guard->flush_and_close();
+ if (_stripe_pool.stripe_count() > 0) {
+ assert(_stripe_pool.is_stopped());
+ for (auto& thread : _stripe_pool) {
+ thread->stripe().flush_and_close();
}
- // TODO STRIPE must ensure no incoming requests can be posted on stripes between close
- // and pool stop+join!
- _stripe_pool->stop_and_join();
}
assert(_bucket_db_updater);
_bucket_db_updater->flush();
@@ -335,11 +334,11 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg)
return true;
}
assert(_stripes.size() == 1);
- assert(_stripe_pool->stripe_count() == 1);
+ assert(_stripe_pool.stripe_count() == 1);
// TODO STRIPE correct routing with multiple stripes
bool handled = first_stripe().handle_or_enqueue_message(msg);
if (handled) {
- _stripe_pool->stripe_thread(0).notify_event_has_triggered();
+ _stripe_pool.stripe_thread(0).notify_event_has_triggered();
}
return handled;
}
@@ -440,7 +439,7 @@ Distributor::propagateDefaultDistribution(
_stripe->propagateDefaultDistribution(std::move(distribution));
} else {
// Should only be called at ctor time, at which point the pool is not yet running.
- assert(_stripe_pool->stripe_count() == 0);
+ assert(_stripe_pool.stripe_count() == 0);
assert(_stripes.size() == 1); // TODO STRIPE all the stripes yes
auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution));
for (auto& stripe : _stripes) {
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index b6dbc4432eb..50bd2526ff4 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -63,6 +63,7 @@ public:
Distributor(DistributorComponentRegister&,
const NodeIdentity& node_identity,
framework::TickingThreadPool&,
+ DistributorStripePool& stripe_pool,
DoneInitializeHandler&,
uint32_t num_distributor_stripes,
HostInfo& hostInfoReporterRegistrar,
@@ -201,7 +202,7 @@ private:
const bool _use_legacy_mode;
// TODO STRIPE multiple stripes...! This is for proof of concept of wiring.
std::unique_ptr<DistributorStripe> _stripe;
- std::unique_ptr<DistributorStripePool> _stripe_pool;
+ DistributorStripePool& _stripe_pool;
std::vector<std::unique_ptr<DistributorStripe>> _stripes;
std::unique_ptr<StripeAccessor> _stripe_accessor;
MessageQueue _message_queue; // Queue for top-level ops
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
index 72534de57ae..5e72cb47fc4 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
@@ -77,6 +77,7 @@ public:
return *_stripes[idx];
}
[[nodiscard]] size_t stripe_count() const noexcept { return _stripes.size(); }
+ [[nodiscard]] bool is_stopped() const noexcept { return _stopped; }
// Applies to all threads. May be called both before and after start(). Thread safe.
void set_tick_wait_duration(vespalib::duration new_tick_wait_duration) noexcept;
diff --git a/storage/src/vespa/storage/storageserver/distributornode.cpp b/storage/src/vespa/storage/storageserver/distributornode.cpp
index 8f4f0422f44..f49b2a23688 100644
--- a/storage/src/vespa/storage/storageserver/distributornode.cpp
+++ b/storage/src/vespa/storage/storageserver/distributornode.cpp
@@ -5,9 +5,10 @@
#include "communicationmanager.h"
#include "opslogger.h"
#include "statemanager.h"
+#include <vespa/storage/common/hostreporter/hostinfo.h>
#include <vespa/storage/common/i_storage_chain_builder.h>
#include <vespa/storage/distributor/distributor.h>
-#include <vespa/storage/common/hostreporter/hostinfo.h>
+#include <vespa/storage/distributor/distributor_stripe_pool.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/log/log.h>
@@ -26,6 +27,7 @@ DistributorNode::DistributorNode(
std::make_unique<HostInfo>(),
!communicationManager ? NORMAL : SINGLE_THREADED_TEST_MODE),
_threadPool(framework::TickingThreadPool::createDefault("distributor")),
+ _stripe_pool(std::make_unique<distributor::DistributorStripePool>()),
_context(context),
_lastUniqueTimestampRequested(0),
_uniqueTimestampCounter(0),
@@ -52,6 +54,7 @@ void
DistributorNode::shutdownDistributor()
{
_threadPool->stop();
+ _stripe_pool->stop_and_join();
shutdown();
}
@@ -100,7 +103,7 @@ DistributorNode::createChain(IStorageChainBuilder &builder)
// manager, which is safe since the lifetime of said state manager
// extends to the end of the process.
builder.add(std::make_unique<storage::distributor::Distributor>
- (dcr, *_node_identity, *_threadPool, getDoneInitializeHandler(),
+ (dcr, *_node_identity, *_threadPool, *_stripe_pool, getDoneInitializeHandler(),
_num_distributor_stripes,
stateManager->getHostInfo()));
diff --git a/storage/src/vespa/storage/storageserver/distributornode.h b/storage/src/vespa/storage/storageserver/distributornode.h
index 267d4400ac7..f2e483bbc9f 100644
--- a/storage/src/vespa/storage/storageserver/distributornode.h
+++ b/storage/src/vespa/storage/storageserver/distributornode.h
@@ -15,6 +15,8 @@
namespace storage {
+namespace distributor { class DistributorStripePool; }
+
class IStorageChainBuilder;
class DistributorNode
@@ -22,6 +24,7 @@ class DistributorNode
private UniqueTimeCalculator
{
framework::TickingThreadPool::UP _threadPool;
+ std::unique_ptr<distributor::DistributorStripePool> _stripe_pool;
DistributorNodeContext& _context;
uint64_t _lastUniqueTimestampRequested;
uint32_t _uniqueTimestampCounter;