summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2021-05-05 08:44:48 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2021-05-05 08:52:39 +0000
commitca7086563beac4bbae4e3d83f72de04a2ccd292c (patch)
tree4f7a61764327c9b412e0915182d06ae9868654e5 /storage
parent02a7400a60b05d4d8db5c77def32d681199e0fb8 (diff)
Run single stripe in its own thread when not using legacy mode
The (currently single) stripe is now run as part of the distributor stripe pool instead of being transitively invoked by the main thread. Introduce an explicit message mutex per stripe that is used for external messages and status requests when not using legacy mode. Use per-stripe wakeup mechanisms instead of the framework-global mutex used in the legacy code path. Additional work remains to bring back a dedicated message run-queue for the top-level distributor, so this is not yet thread safe for operations to the main `BucketDBUpdater`.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp154
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h13
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp36
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.h2
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_pool.h4
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_thread.h3
-rw-r--r--storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h2
-rw-r--r--storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h4
-rw-r--r--storage/src/vespa/storage/distributor/stripe_access_guard.h2
11 files changed, 187 insertions, 43 deletions
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 6975a2595ad..2f524f00e0e 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -7,9 +7,11 @@
#include "distributor_bucket_space.h"
#include "distributor_status.h"
#include "distributor_stripe.h"
+#include "distributor_stripe_pool.h"
+#include "distributor_stripe_thread.h"
#include "distributormetricsset.h"
#include "idealstatemetricsset.h"
-#include "legacy_single_stripe_accessor.h"
+#include "multi_threaded_stripe_access_guard.h"
#include "operation_sequencer.h"
#include "ownership_transfer_safe_time_point_calculator.h"
#include "throttlingoperationstarter.h"
@@ -57,7 +59,9 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_use_legacy_mode(num_distributor_stripes == 0),
_stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool,
doneInitHandler, *this, _use_legacy_mode)),
- _stripe_accessor(std::make_unique<LegacySingleStripeAccessor>(*_stripe)),
+ _stripe_pool(),
+ _stripes(),
+ _stripe_accessor(),
_component(*this, compReg, "distributor"),
_total_config(_component.total_distributor_config_sp()),
_bucket_db_updater(),
@@ -74,10 +78,13 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0));
if (!_use_legacy_mode) {
LOG(info, "Setting up distributor with %u stripes", num_distributor_stripes); // TODO STRIPE remove once legacy gone
+ _stripe_pool = std::make_unique<DistributorStripePool>();
+ _stripe_accessor = std::make_unique<MultiThreadedStripeAccessor>(*_stripe_pool);
_bucket_db_updater = std::make_unique<BucketDBUpdater>(_component, _component,
*this, *this,
_component.getDistribution(),
*_stripe_accessor);
+ _stripes.emplace_back(std::move(_stripe));
}
_hostInfoReporter.enableReporting(config().getEnableHostInfoReporting());
_distributorStatusDelegate.registerStatusPage();
@@ -91,6 +98,20 @@ Distributor::~Distributor()
closeNextLink();
}
+// TODO STRIPE remove
+DistributorStripe&
+Distributor::first_stripe() noexcept {
+ assert(_stripes.size() == 1);
+ return *_stripes[0];
+}
+
+// TODO STRIPE remove
+const DistributorStripe&
+Distributor::first_stripe() const noexcept {
+ assert(_stripes.size() == 1);
+ return *_stripes[0];
+}
+
// TODO STRIPE figure out how to handle inspection functions used by tests when legacy mode no longer exists.
// All functions below that assert on _use_legacy_mode are only currently used by tests
@@ -217,6 +238,10 @@ Distributor::onOpen()
if (_component.getDistributorConfig().startDistributorThread) {
_threadPool.addThread(*this);
_threadPool.start(_component.getThreadPool());
+ if (!_use_legacy_mode) {
+ std::vector<TickableStripe*> pool_stripes({_stripes[0].get()});
+ _stripe_pool->start(pool_stripes);
+ }
} else {
LOG(warning, "Not starting distributor thread as it's configured to "
"run. Unless you are just running a test tool, this is a "
@@ -226,8 +251,17 @@ Distributor::onOpen()
void Distributor::onClose() {
LOG(debug, "Distributor::onClose invoked");
- _stripe->flush_and_close();
- if (_bucket_db_updater) {
+ if (_use_legacy_mode) {
+ _stripe->flush_and_close();
+ } else {
+ {
+ auto guard = _stripe_accessor->rendezvous_and_hold_all();
+ guard->flush_and_close();
+ }
+ // TODO STRIPE must ensure no incoming requests can be posted on stripes between close
+ // and pool stop+join!
+ _stripe_pool->stop_and_join();
+ assert(_bucket_db_updater);
_bucket_db_updater->flush();
}
}
@@ -273,14 +307,25 @@ bool should_be_handled_by_top_level_bucket_db_updater(const api::StorageMessage&
bool
Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg)
{
- // FIXME STRIPE this MUST be in a separate thread to enforce processing in a single thread
- // regardless of what RPC thread (comm mgr, FRT...) this is called from!
- if (!_use_legacy_mode && should_be_handled_by_top_level_bucket_db_updater(*msg)) {
- return msg->callHandler(*_bucket_db_updater, msg);
- }
// TODO STRIPE can we route both requests and responses that are BucketCommand|Reply based on their bucket alone?
// that covers most operations already...
- return _stripe->handle_or_enqueue_message(msg);
+ if (_use_legacy_mode) {
+ return _stripe->handle_or_enqueue_message(msg);
+ } else {
+ // FIXME STRIPE this MUST be in a separate thread to enforce processing in a single thread
+ // regardless of what RPC thread (comm mgr, FRT...) this is called from!
+ if (should_be_handled_by_top_level_bucket_db_updater(*msg)) {
+ return msg->callHandler(*_bucket_db_updater, msg);
+ }
+ assert(_stripes.size() == 1);
+ assert(_stripe_pool->stripe_count() == 1);
+ // TODO STRIPE correct routing with multiple stripes
+ bool handled = first_stripe().handle_or_enqueue_message(msg);
+ if (handled) {
+ _stripe_pool->stripe_thread(0).notify_event_has_triggered();
+ }
+ return handled;
+ }
}
bool
@@ -289,6 +334,7 @@ Distributor::handleReply(const std::shared_ptr<api::StorageReply>& reply)
if (!_use_legacy_mode && should_be_handled_by_top_level_bucket_db_updater(*reply)) {
return reply->callHandler(*_bucket_db_updater, reply);
}
+ assert(_use_legacy_mode);
return _stripe->handleReply(reply);
}
@@ -375,33 +421,61 @@ Distributor::propagateDefaultDistribution(
std::shared_ptr<const lib::Distribution> distribution)
{
// TODO STRIPE cannot directly access stripe when not in legacy mode!
- _stripe->propagateDefaultDistribution(std::move(distribution));
+ if (_use_legacy_mode) {
+ _stripe->propagateDefaultDistribution(std::move(distribution));
+ } else {
+ // Should only be called at ctor time, at which point the pool is not yet running.
+ assert(_stripe_pool->stripe_count() == 0);
+ assert(_stripes.size() == 1); // TODO STRIPE all the stripes yes
+ auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution));
+ for (auto& stripe : _stripes) {
+ stripe->update_distribution_config(new_configs);
+ }
+ }
}
std::unordered_map<uint16_t, uint32_t>
Distributor::getMinReplica() const
{
// TODO STRIPE merged snapshot from all stripes
- return _stripe->getMinReplica();
+ if (_use_legacy_mode) {
+ return _stripe->getMinReplica();
+ } else {
+ return first_stripe().getMinReplica();
+ }
}
BucketSpacesStatsProvider::PerNodeBucketSpacesStats
Distributor::getBucketSpacesStats() const
{
// TODO STRIPE merged snapshot from all stripes
- return _stripe->getBucketSpacesStats();
+ if (_use_legacy_mode) {
+ return _stripe->getBucketSpacesStats();
+ } else {
+ return first_stripe().getBucketSpacesStats();
+ }
}
SimpleMaintenanceScanner::PendingMaintenanceStats
Distributor::pending_maintenance_stats() const {
// TODO STRIPE merged snapshot from all stripes
- return _stripe->pending_maintenance_stats();
+ if (_use_legacy_mode) {
+ return _stripe->pending_maintenance_stats();
+ } else {
+ return first_stripe().pending_maintenance_stats();
+ }
}
void
Distributor::propagateInternalScanMetricsToExternal()
{
- _stripe->propagateInternalScanMetricsToExternal();
+ // TODO STRIPE propagate to all stripes
+ // TODO STRIPE reconsider metric wiring...
+ if (_use_legacy_mode) {
+ _stripe->propagateInternalScanMetricsToExternal();
+ } else {
+ first_stripe().propagateInternalScanMetricsToExternal();
+ }
}
void
@@ -420,21 +494,23 @@ Distributor::doCriticalTick(framework::ThreadIndex idx)
}
// Propagates any new configs down to stripe(s)
enableNextConfig();
- // TODO STRIPE only do in legacy mode, use stripe pool ticking otherwise
- _stripe->doCriticalTick(idx);
- _tickResult.merge(_stripe->_tickResult);
+ if (_use_legacy_mode) {
+ _stripe->doCriticalTick(idx);
+ _tickResult.merge(_stripe->_tickResult);
+ }
return _tickResult;
}
framework::ThreadWaitInfo
Distributor::doNonCriticalTick(framework::ThreadIndex idx)
{
- if (!_use_legacy_mode) {
+ if (_use_legacy_mode) {
+ _stripe->doNonCriticalTick(idx);
+ _tickResult = _stripe->_tickResult;
+ } else {
_bucket_db_updater->resend_delayed_messages();
+ _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN;
}
- // TODO STRIPE stripes need their own thread loops!
- _stripe->doNonCriticalTick(idx);
- _tickResult = _stripe->_tickResult;
return _tickResult;
}
@@ -463,27 +539,53 @@ Distributor::enableNextConfig() // TODO STRIPE rename to enable_next_config_if_c
vespalib::string
Distributor::getReportContentType(const framework::HttpUrlPath& path) const
{
- return _stripe->getReportContentType(path);
+ // This is const thread safe
+ // TODO STRIPE we should probably do this in the top-level distributor
+ if (_use_legacy_mode) {
+ return _stripe->getReportContentType(path);
+ } else {
+ return first_stripe().getReportContentType(path);
+ }
}
std::string
Distributor::getActiveIdealStateOperations() const
{
- return _stripe->getActiveIdealStateOperations();
+ // TODO STRIPE need to aggregate status responses _across_ stripes..!
+ if (_use_legacy_mode) {
+ return _stripe->getActiveIdealStateOperations();
+ } else {
+ auto guard = _stripe_accessor->rendezvous_and_hold_all();
+ return first_stripe().getActiveIdealStateOperations();
+ }
}
bool
Distributor::reportStatus(std::ostream& out,
const framework::HttpUrlPath& path) const
{
- return _stripe->reportStatus(out, path);
+ // TODO STRIPE need to aggregate status responses _across_ stripes..!
+ if (_use_legacy_mode) {
+ return _stripe->reportStatus(out, path);
+ } else {
+ auto guard = _stripe_accessor->rendezvous_and_hold_all();
+ return first_stripe().reportStatus(out, path);
+ }
}
bool
Distributor::handleStatusRequest(const DelegatedStatusRequest& request) const
{
// TODO STRIPE need to aggregate status responses _across_ stripes..!
- return _stripe->handleStatusRequest(request);
+ if (_use_legacy_mode) {
+ return _stripe->handleStatusRequest(request);
+ } else {
+ // Can't hold guard here or we'll deadlock by never allowing the thread to process the request
+ bool handled = first_stripe().handleStatusRequest(request);
+ // TODO STRIPE wake up stripe thread; handleStatusRequest waits for completion
+ // (not really needed since this will be removed)
+ return handled;
+ }
}
}
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 0420f1b1f22..d9aada78c79 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -40,8 +40,9 @@ class BucketDBUpdater;
class DistributorBucketSpaceRepo;
class DistributorStatus;
class DistributorStripe;
+class DistributorStripePool;
+class StripeAccessor;
class OperationSequencer;
-class LegacySingleStripeAccessor;
class OwnershipTransferSafeTimePointCalculator;
class SimpleMaintenanceScanner;
class ThrottlingOperationStarter;
@@ -118,6 +119,10 @@ private:
friend class DistributorTestUtil;
friend class MetricUpdateHook;
+ // TODO STRIPE remove
+ DistributorStripe& first_stripe() noexcept;
+ const DistributorStripe& first_stripe() const noexcept;
+
void setNodeStateUp();
bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg);
@@ -171,8 +176,10 @@ private:
ChainedMessageSender* _messageSender;
const bool _use_legacy_mode;
// TODO STRIPE multiple stripes...! This is for proof of concept of wiring.
- std::unique_ptr<DistributorStripe> _stripe;
- std::unique_ptr<LegacySingleStripeAccessor> _stripe_accessor;
+ std::unique_ptr<DistributorStripe> _stripe;
+ std::unique_ptr<DistributorStripePool> _stripe_pool;
+ std::vector<std::unique_ptr<DistributorStripe>> _stripes;
+ std::unique_ptr<StripeAccessor> _stripe_accessor;
distributor::DistributorComponent _component;
std::shared_ptr<const DistributorConfiguration> _total_config;
std::unique_ptr<BucketDBUpdater> _bucket_db_updater;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index 1f6a5b318fd..e9969b79bd4 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -60,6 +60,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
_externalOperationHandler(_component, _component, getMetrics(), getMessageSender(),
*_operation_sequencer, *this, _component,
_idealStateManager, _operationOwner),
+ _external_message_mutex(),
_threadPool(threadPool),
_doneInitializeHandler(doneInitHandler),
_doneInitializing(false),
@@ -168,14 +169,19 @@ DistributorStripe::handle_or_enqueue_message(const std::shared_ptr<api::StorageM
if (_externalOperationHandler.try_handle_message_outside_main_thread(msg)) {
return true;
}
- // TODO STRIPE redesign how message queue guarding and wakeup is performed.
- // Currently involves a _thread pool global_ lock transitively via tick guard!
- framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks());
MBUS_TRACE(msg->getTrace(), 9,
"Distributor: Added to message queue. Thread state: "
+ _threadPool.getStatus());
- _messageQueue.push_back(msg);
- guard.broadcast();
+ if (_use_legacy_mode) {
+ // TODO STRIPE remove
+ framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks());
+ _messageQueue.push_back(msg);
+ guard.broadcast();
+ } else {
+ std::lock_guard lock(_external_message_mutex);
+ _messageQueue.push_back(msg);
+ // Caller has the responsibility to wake up correct stripe
+ }
return true;
}
@@ -727,6 +733,7 @@ DistributorStripe::scanNextBucket()
void DistributorStripe::send_updated_host_info_if_required() {
if (_must_send_updated_host_info) {
+ // TODO STRIPE how to handle with multiple stripes?
_component.getStateUpdater().immediately_send_get_node_state_replies();
_must_send_updated_host_info = false;
}
@@ -745,10 +752,9 @@ framework::ThreadWaitInfo
DistributorStripe::doCriticalTick(framework::ThreadIndex)
{
_tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN;
- if (_use_legacy_mode) {
- enableNextDistribution();
- enableNextConfig();
- }
+ assert(_use_legacy_mode);
+ enableNextDistribution();
+ enableNextConfig();
fetchStatusRequests();
fetchExternalMessages();
return _tickResult;
@@ -758,6 +764,11 @@ framework::ThreadWaitInfo
DistributorStripe::doNonCriticalTick(framework::ThreadIndex)
{
_tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN;
+ if (!_use_legacy_mode) {
+ std::lock_guard lock(_external_message_mutex);
+ fetchStatusRequests();
+ fetchExternalMessages();
+ }
handleStatusRequests();
startExternalOperations();
if (initializing()) {
@@ -920,14 +931,19 @@ DistributorStripe::reportStatus(std::ostream& out,
return true;
}
+// TODO STRIPE remove this; delegated to top-level Distributor only
bool
DistributorStripe::handleStatusRequest(const DelegatedStatusRequest& request) const
{
auto wrappedRequest = std::make_shared<DistributorStatus>(request);
- {
+ if (_use_legacy_mode) {
framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks());
_statusToDo.push_back(wrappedRequest);
guard.broadcast();
+ } else {
+ std::lock_guard lock(_external_message_mutex);
+ _statusToDo.push_back(wrappedRequest);
+ // FIXME won't be woken up explicitly, but will be processed after 1ms anyway.
}
wrappedRequest->waitForCompletion();
return true;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index 7b34367cecb..7b0cfc66a1b 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -22,6 +22,7 @@
#include <vespa/storageapi/message/state.h>
#include <vespa/storageframework/generic/metric/metricupdatehook.h>
#include <vespa/storageframework/generic/thread/tickingthread.h>
+#include <mutex>
#include <queue>
#include <unordered_map>
@@ -298,6 +299,7 @@ private:
std::vector<std::shared_ptr<api::StorageMessage>>,
IndirectHigherPriority
>;
+ mutable std::mutex _external_message_mutex;
MessageQueue _messageQueue;
ClientRequestPriorityQueue _client_request_priority_queue;
MessageQueue _fetchedMessages;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
index 9149482cd5d..4ac52b0ede8 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
@@ -62,10 +62,10 @@ public:
void park_all_threads() noexcept;
void unpark_all_threads() noexcept;
- [[nodiscard]] const DistributorStripeThread& stripe(size_t idx) const noexcept {
+ [[nodiscard]] const DistributorStripeThread& stripe_thread(size_t idx) const noexcept {
return *_stripes[idx];
}
- [[nodiscard]] DistributorStripeThread& stripe(size_t idx) noexcept {
+ [[nodiscard]] DistributorStripeThread& stripe_thread(size_t idx) noexcept {
return *_stripes[idx];
}
[[nodiscard]] size_t stripe_count() const noexcept { return _stripes.size(); }
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_thread.h b/storage/src/vespa/storage/distributor/distributor_stripe_thread.h
index b02d733895e..60f10889afd 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_thread.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_thread.h
@@ -53,6 +53,9 @@ public:
TickableStripe* operator->() noexcept { return &_stripe; }
const TickableStripe* operator->() const noexcept { return &_stripe; }
+
+ TickableStripe& stripe() noexcept { return _stripe; }
+ const TickableStripe& stripe() const noexcept { return _stripe; }
private:
[[nodiscard]] bool should_stop_thread_relaxed() const noexcept {
return _should_stop.load(std::memory_order_relaxed);
diff --git a/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp
index 0c6c0206608..29b71436fe8 100644
--- a/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp
+++ b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp
@@ -14,6 +14,10 @@ LegacySingleStripeAccessGuard::~LegacySingleStripeAccessGuard() {
_accessor.mark_guard_released();
}
+void LegacySingleStripeAccessGuard::flush_and_close() {
+ _stripe.flush_and_close();
+}
+
void LegacySingleStripeAccessGuard::update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) {
_stripe.update_total_distributor_config(std::move(config));
}
diff --git a/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h
index caf1e397e5b..f119462038e 100644
--- a/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h
+++ b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h
@@ -21,6 +21,8 @@ public:
DistributorStripe& stripe);
~LegacySingleStripeAccessGuard() override;
+ void flush_and_close() override;
+
void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) override;
void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) override;
diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
index 6bc9c03158a..03e47d2cb67 100644
--- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
+++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
@@ -21,6 +21,10 @@ MultiThreadedStripeAccessGuard::~MultiThreadedStripeAccessGuard() {
_accessor.mark_guard_released();
}
+void MultiThreadedStripeAccessGuard::flush_and_close() {
+ first_stripe().flush_and_close();
+}
+
void MultiThreadedStripeAccessGuard::update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) {
// TODO STRIPE multiple stripes
first_stripe().update_total_distributor_config(std::move(config));
@@ -95,7 +99,7 @@ void MultiThreadedStripeAccessGuard::clear_read_only_bucket_repo_databases() {
}
DistributorStripe& MultiThreadedStripeAccessGuard::first_stripe() noexcept {
- return dynamic_cast<DistributorStripe&>(_stripe_pool.stripe(0));
+ return dynamic_cast<DistributorStripe&>(_stripe_pool.stripe_thread(0).stripe());
}
std::unique_ptr<StripeAccessGuard> MultiThreadedStripeAccessor::rendezvous_and_hold_all() {
diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
index 376eccd1c4a..03e36c29bba 100644
--- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
+++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
@@ -27,6 +27,8 @@ public:
DistributorStripePool& stripe_pool);
~MultiThreadedStripeAccessGuard() override;
+ void flush_and_close() override;
+
void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) override;
void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) override;
@@ -65,7 +67,7 @@ class MultiThreadedStripeAccessor : public StripeAccessor {
friend class MultiThreadedStripeAccessGuard;
public:
- MultiThreadedStripeAccessor(DistributorStripePool& stripe_pool)
+ explicit MultiThreadedStripeAccessor(DistributorStripePool& stripe_pool)
: _stripe_pool(stripe_pool),
_guard_held(false)
{}
diff --git a/storage/src/vespa/storage/distributor/stripe_access_guard.h b/storage/src/vespa/storage/distributor/stripe_access_guard.h
index 69aae755dec..1d570cbb3bc 100644
--- a/storage/src/vespa/storage/distributor/stripe_access_guard.h
+++ b/storage/src/vespa/storage/distributor/stripe_access_guard.h
@@ -28,6 +28,8 @@ class StripeAccessGuard {
public:
virtual ~StripeAccessGuard() = default;
+ virtual void flush_and_close() = 0;
+
virtual void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) = 0;
virtual void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) = 0;