summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2021-05-11 15:04:47 +0000
committerGeir Storli <geirst@verizonmedia.com>2021-05-11 15:04:47 +0000
commit284dba9f34a5eb84b83b7ae706cc6274f323ffac (patch)
tree29f677aa767576ffae37768dd2a6328c1ed9558b
parent839a6f9a7d1f66937f51db3766a2dfd3e7b90675 (diff)
Stop all stripe threads before starting shutdown (and closing) of the storage link chain.
This is required to avoid stripe threads being able to send up messages while the communication manager is being closed. Such messages will fail at the RPC layer (already closed) and an error reply is sent down from the communication manager. This triggers an assert in StorageLink::sendDown() which is already CLOSED.
-rw-r--r--storage/src/tests/distributor/distributortestutil.cpp3
-rw-r--r--storage/src/tests/distributor/distributortestutil.h6
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp27
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h3
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_pool.h1
-rw-r--r--storage/src/vespa/storage/storageserver/distributornode.cpp7
-rw-r--r--storage/src/vespa/storage/storageserver/distributornode.h3
7 files changed, 31 insertions, 19 deletions
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index be123d6281c..e92ba0374bc 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -7,6 +7,7 @@
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/storage/distributor/distributor_stripe.h>
#include <vespa/storage/distributor/distributor_stripe_component.h>
+#include <vespa/storage/distributor/distributor_stripe_pool.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/text/stringtokenizer.h>
@@ -28,10 +29,12 @@ DistributorTestUtil::createLinks()
{
_node.reset(new TestDistributorApp(_config.getConfigId()));
_threadPool = framework::TickingThreadPool::createDefault("distributor");
+ _stripe_pool = std::make_unique<DistributorStripePool>();
_distributor.reset(new Distributor(
_node->getComponentRegister(),
_node->node_identity(),
*_threadPool,
+ *_stripe_pool,
*this,
_num_distributor_stripes,
_hostInfo,
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index de46905c870..63ca47755e6 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -17,16 +17,17 @@ namespace framework { struct TickingThreadPool; }
namespace distributor {
-class StripeBucketDBUpdater;
class Distributor;
class DistributorBucketSpace;
class DistributorBucketSpaceRepo;
-class DistributorStripeOperationContext;
class DistributorStripe;
class DistributorStripeComponent;
+class DistributorStripeOperationContext;
+class DistributorStripePool;
class ExternalOperationHandler;
class IdealStateManager;
class Operation;
+class StripeBucketDBUpdater;
// TODO STRIPE rename to DistributorStripeTestUtil?
class DistributorTestUtil : private DoneInitializeHandler
@@ -206,6 +207,7 @@ protected:
vdstestlib::DirConfig _config;
std::unique_ptr<TestDistributorApp> _node;
std::unique_ptr<framework::TickingThreadPool> _threadPool;
+ std::unique_ptr<DistributorStripePool> _stripe_pool;
std::unique_ptr<Distributor> _distributor;
std::unique_ptr<storage::DistributorComponent> _component;
DistributorMessageSenderStub _sender;
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index f7ecd324a51..47f7fee5873 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -48,6 +48,7 @@ namespace storage::distributor {
Distributor::Distributor(DistributorComponentRegister& compReg,
const NodeIdentity& node_identity,
framework::TickingThreadPool& threadPool,
+ DistributorStripePool& stripe_pool,
DoneInitializeHandler& doneInitHandler,
uint32_t num_distributor_stripes,
HostInfo& hostInfoReporterRegistrar,
@@ -60,7 +61,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_use_legacy_mode(num_distributor_stripes == 0),
_stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool,
doneInitHandler, *this, *this, _use_legacy_mode)),
- _stripe_pool(),
+ _stripe_pool(stripe_pool),
_stripes(),
_stripe_accessor(),
_message_queue(),
@@ -88,8 +89,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0));
if (!_use_legacy_mode) {
LOG(info, "Setting up distributor with %u stripes", num_distributor_stripes); // TODO STRIPE remove once legacy gone
- _stripe_pool = std::make_unique<DistributorStripePool>();
- _stripe_accessor = std::make_unique<MultiThreadedStripeAccessor>(*_stripe_pool);
+ _stripe_accessor = std::make_unique<MultiThreadedStripeAccessor>(_stripe_pool);
_bucket_db_updater = std::make_unique<BucketDBUpdater>(_component, _component,
*this, *this,
_component.getDistribution(),
@@ -253,7 +253,7 @@ Distributor::onOpen()
_threadPool.start(_component.getThreadPool());
if (!_use_legacy_mode) {
std::vector<TickableStripe*> pool_stripes({_stripes[0].get()});
- _stripe_pool->start(pool_stripes);
+ _stripe_pool.start(pool_stripes);
}
} else {
LOG(warning, "Not starting distributor thread as it's configured to "
@@ -263,6 +263,8 @@ Distributor::onOpen()
}
void Distributor::onClose() {
+ // Note: In a running system this function is called by the main thread in StorageApp as part of shutdown.
+ // The distributor and stripe thread pools are already stopped at this point.
LOG(debug, "Distributor::onClose invoked");
if (_use_legacy_mode) {
_stripe->flush_and_close();
@@ -270,14 +272,11 @@ void Distributor::onClose() {
// Tests may run with multiple stripes but without threads (for determinism's sake),
// so only try to flush stripes if a pool is running.
// TODO STRIPE probably also need to flush when running tests to handle any explicit close-tests.
- if (_stripe_pool->stripe_count() > 0){
- {
- auto guard = _stripe_accessor->rendezvous_and_hold_all();
- guard->flush_and_close();
+ if (_stripe_pool.stripe_count() > 0) {
+ assert(_stripe_pool.is_stopped());
+ for (auto& thread : _stripe_pool) {
+ thread->stripe().flush_and_close();
}
- // TODO STRIPE must ensure no incoming requests can be posted on stripes between close
- // and pool stop+join!
- _stripe_pool->stop_and_join();
}
assert(_bucket_db_updater);
_bucket_db_updater->flush();
@@ -335,11 +334,11 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg)
return true;
}
assert(_stripes.size() == 1);
- assert(_stripe_pool->stripe_count() == 1);
+ assert(_stripe_pool.stripe_count() == 1);
// TODO STRIPE correct routing with multiple stripes
bool handled = first_stripe().handle_or_enqueue_message(msg);
if (handled) {
- _stripe_pool->stripe_thread(0).notify_event_has_triggered();
+ _stripe_pool.stripe_thread(0).notify_event_has_triggered();
}
return handled;
}
@@ -440,7 +439,7 @@ Distributor::propagateDefaultDistribution(
_stripe->propagateDefaultDistribution(std::move(distribution));
} else {
// Should only be called at ctor time, at which point the pool is not yet running.
- assert(_stripe_pool->stripe_count() == 0);
+ assert(_stripe_pool.stripe_count() == 0);
assert(_stripes.size() == 1); // TODO STRIPE all the stripes yes
auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution));
for (auto& stripe : _stripes) {
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index b6dbc4432eb..50bd2526ff4 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -63,6 +63,7 @@ public:
Distributor(DistributorComponentRegister&,
const NodeIdentity& node_identity,
framework::TickingThreadPool&,
+ DistributorStripePool& stripe_pool,
DoneInitializeHandler&,
uint32_t num_distributor_stripes,
HostInfo& hostInfoReporterRegistrar,
@@ -201,7 +202,7 @@ private:
const bool _use_legacy_mode;
// TODO STRIPE multiple stripes...! This is for proof of concept of wiring.
std::unique_ptr<DistributorStripe> _stripe;
- std::unique_ptr<DistributorStripePool> _stripe_pool;
+ DistributorStripePool& _stripe_pool;
std::vector<std::unique_ptr<DistributorStripe>> _stripes;
std::unique_ptr<StripeAccessor> _stripe_accessor;
MessageQueue _message_queue; // Queue for top-level ops
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
index 72534de57ae..5e72cb47fc4 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
@@ -77,6 +77,7 @@ public:
return *_stripes[idx];
}
[[nodiscard]] size_t stripe_count() const noexcept { return _stripes.size(); }
+ [[nodiscard]] bool is_stopped() const noexcept { return _stopped; }
// Applies to all threads. May be called both before and after start(). Thread safe.
void set_tick_wait_duration(vespalib::duration new_tick_wait_duration) noexcept;
diff --git a/storage/src/vespa/storage/storageserver/distributornode.cpp b/storage/src/vespa/storage/storageserver/distributornode.cpp
index 8f4f0422f44..f49b2a23688 100644
--- a/storage/src/vespa/storage/storageserver/distributornode.cpp
+++ b/storage/src/vespa/storage/storageserver/distributornode.cpp
@@ -5,9 +5,10 @@
#include "communicationmanager.h"
#include "opslogger.h"
#include "statemanager.h"
+#include <vespa/storage/common/hostreporter/hostinfo.h>
#include <vespa/storage/common/i_storage_chain_builder.h>
#include <vespa/storage/distributor/distributor.h>
-#include <vespa/storage/common/hostreporter/hostinfo.h>
+#include <vespa/storage/distributor/distributor_stripe_pool.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/log/log.h>
@@ -26,6 +27,7 @@ DistributorNode::DistributorNode(
std::make_unique<HostInfo>(),
!communicationManager ? NORMAL : SINGLE_THREADED_TEST_MODE),
_threadPool(framework::TickingThreadPool::createDefault("distributor")),
+ _stripe_pool(std::make_unique<distributor::DistributorStripePool>()),
_context(context),
_lastUniqueTimestampRequested(0),
_uniqueTimestampCounter(0),
@@ -52,6 +54,7 @@ void
DistributorNode::shutdownDistributor()
{
_threadPool->stop();
+ _stripe_pool->stop_and_join();
shutdown();
}
@@ -100,7 +103,7 @@ DistributorNode::createChain(IStorageChainBuilder &builder)
// manager, which is safe since the lifetime of said state manager
// extends to the end of the process.
builder.add(std::make_unique<storage::distributor::Distributor>
- (dcr, *_node_identity, *_threadPool, getDoneInitializeHandler(),
+ (dcr, *_node_identity, *_threadPool, *_stripe_pool, getDoneInitializeHandler(),
_num_distributor_stripes,
stateManager->getHostInfo()));
diff --git a/storage/src/vespa/storage/storageserver/distributornode.h b/storage/src/vespa/storage/storageserver/distributornode.h
index 267d4400ac7..f2e483bbc9f 100644
--- a/storage/src/vespa/storage/storageserver/distributornode.h
+++ b/storage/src/vespa/storage/storageserver/distributornode.h
@@ -15,6 +15,8 @@
namespace storage {
+namespace distributor { class DistributorStripePool; }
+
class IStorageChainBuilder;
class DistributorNode
@@ -22,6 +24,7 @@ class DistributorNode
private UniqueTimeCalculator
{
framework::TickingThreadPool::UP _threadPool;
+ std::unique_ptr<distributor::DistributorStripePool> _stripe_pool;
DistributorNodeContext& _context;
uint64_t _lastUniqueTimestampRequested;
uint32_t _uniqueTimestampCounter;