summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp154
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h13
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp36
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.h2
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_pool.h4
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_thread.h3
-rw-r--r--storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h2
-rw-r--r--storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h4
-rw-r--r--storage/src/vespa/storage/distributor/stripe_access_guard.h2
11 files changed, 187 insertions, 43 deletions
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 6975a2595ad..2f524f00e0e 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -7,9 +7,11 @@
#include "distributor_bucket_space.h"
#include "distributor_status.h"
#include "distributor_stripe.h"
+#include "distributor_stripe_pool.h"
+#include "distributor_stripe_thread.h"
#include "distributormetricsset.h"
#include "idealstatemetricsset.h"
-#include "legacy_single_stripe_accessor.h"
+#include "multi_threaded_stripe_access_guard.h"
#include "operation_sequencer.h"
#include "ownership_transfer_safe_time_point_calculator.h"
#include "throttlingoperationstarter.h"
@@ -57,7 +59,9 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_use_legacy_mode(num_distributor_stripes == 0),
_stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool,
doneInitHandler, *this, _use_legacy_mode)),
- _stripe_accessor(std::make_unique<LegacySingleStripeAccessor>(*_stripe)),
+ _stripe_pool(),
+ _stripes(),
+ _stripe_accessor(),
_component(*this, compReg, "distributor"),
_total_config(_component.total_distributor_config_sp()),
_bucket_db_updater(),
@@ -74,10 +78,13 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0));
if (!_use_legacy_mode) {
LOG(info, "Setting up distributor with %u stripes", num_distributor_stripes); // TODO STRIPE remove once legacy gone
+ _stripe_pool = std::make_unique<DistributorStripePool>();
+ _stripe_accessor = std::make_unique<MultiThreadedStripeAccessor>(*_stripe_pool);
_bucket_db_updater = std::make_unique<BucketDBUpdater>(_component, _component,
*this, *this,
_component.getDistribution(),
*_stripe_accessor);
+ _stripes.emplace_back(std::move(_stripe));
}
_hostInfoReporter.enableReporting(config().getEnableHostInfoReporting());
_distributorStatusDelegate.registerStatusPage();
@@ -91,6 +98,20 @@ Distributor::~Distributor()
closeNextLink();
}
+// TODO STRIPE remove
+DistributorStripe&
+Distributor::first_stripe() noexcept {
+ assert(_stripes.size() == 1);
+ return *_stripes[0];
+}
+
+// TODO STRIPE remove
+const DistributorStripe&
+Distributor::first_stripe() const noexcept {
+ assert(_stripes.size() == 1);
+ return *_stripes[0];
+}
+
// TODO STRIPE figure out how to handle inspection functions used by tests when legacy mode no longer exists.
// All functions below that assert on _use_legacy_mode are only currently used by tests
@@ -217,6 +238,10 @@ Distributor::onOpen()
if (_component.getDistributorConfig().startDistributorThread) {
_threadPool.addThread(*this);
_threadPool.start(_component.getThreadPool());
+ if (!_use_legacy_mode) {
+ std::vector<TickableStripe*> pool_stripes({_stripes[0].get()});
+ _stripe_pool->start(pool_stripes);
+ }
} else {
LOG(warning, "Not starting distributor thread as it's configured to "
"run. Unless you are just running a test tool, this is a "
@@ -226,8 +251,17 @@ Distributor::onOpen()
void Distributor::onClose() {
LOG(debug, "Distributor::onClose invoked");
- _stripe->flush_and_close();
- if (_bucket_db_updater) {
+ if (_use_legacy_mode) {
+ _stripe->flush_and_close();
+ } else {
+ {
+ auto guard = _stripe_accessor->rendezvous_and_hold_all();
+ guard->flush_and_close();
+ }
+ // TODO STRIPE must ensure no incoming requests can be posted on stripes between close
+ // and pool stop+join!
+ _stripe_pool->stop_and_join();
+ assert(_bucket_db_updater);
_bucket_db_updater->flush();
}
}
@@ -273,14 +307,25 @@ bool should_be_handled_by_top_level_bucket_db_updater(const api::StorageMessage&
bool
Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg)
{
- // FIXME STRIPE this MUST be in a separate thread to enforce processing in a single thread
- // regardless of what RPC thread (comm mgr, FRT...) this is called from!
- if (!_use_legacy_mode && should_be_handled_by_top_level_bucket_db_updater(*msg)) {
- return msg->callHandler(*_bucket_db_updater, msg);
- }
// TODO STRIPE can we route both requests and responses that are BucketCommand|Reply based on their bucket alone?
// that covers most operations already...
- return _stripe->handle_or_enqueue_message(msg);
+ if (_use_legacy_mode) {
+ return _stripe->handle_or_enqueue_message(msg);
+ } else {
+ // FIXME STRIPE this MUST be in a separate thread to enforce processing in a single thread
+ // regardless of what RPC thread (comm mgr, FRT...) this is called from!
+ if (should_be_handled_by_top_level_bucket_db_updater(*msg)) {
+ return msg->callHandler(*_bucket_db_updater, msg);
+ }
+ assert(_stripes.size() == 1);
+ assert(_stripe_pool->stripe_count() == 1);
+ // TODO STRIPE correct routing with multiple stripes
+ bool handled = first_stripe().handle_or_enqueue_message(msg);
+ if (handled) {
+ _stripe_pool->stripe_thread(0).notify_event_has_triggered();
+ }
+ return handled;
+ }
}
bool
@@ -289,6 +334,7 @@ Distributor::handleReply(const std::shared_ptr<api::StorageReply>& reply)
if (!_use_legacy_mode && should_be_handled_by_top_level_bucket_db_updater(*reply)) {
return reply->callHandler(*_bucket_db_updater, reply);
}
+ assert(_use_legacy_mode);
return _stripe->handleReply(reply);
}
@@ -375,33 +421,61 @@ Distributor::propagateDefaultDistribution(
std::shared_ptr<const lib::Distribution> distribution)
{
// TODO STRIPE cannot directly access stripe when not in legacy mode!
- _stripe->propagateDefaultDistribution(std::move(distribution));
+ if (_use_legacy_mode) {
+ _stripe->propagateDefaultDistribution(std::move(distribution));
+ } else {
+ // Should only be called at ctor time, at which point the pool is not yet running.
+ assert(_stripe_pool->stripe_count() == 0);
+ assert(_stripes.size() == 1); // TODO STRIPE all the stripes yes
+ auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution));
+ for (auto& stripe : _stripes) {
+ stripe->update_distribution_config(new_configs);
+ }
+ }
}
std::unordered_map<uint16_t, uint32_t>
Distributor::getMinReplica() const
{
// TODO STRIPE merged snapshot from all stripes
- return _stripe->getMinReplica();
+ if (_use_legacy_mode) {
+ return _stripe->getMinReplica();
+ } else {
+ return first_stripe().getMinReplica();
+ }
}
BucketSpacesStatsProvider::PerNodeBucketSpacesStats
Distributor::getBucketSpacesStats() const
{
// TODO STRIPE merged snapshot from all stripes
- return _stripe->getBucketSpacesStats();
+ if (_use_legacy_mode) {
+ return _stripe->getBucketSpacesStats();
+ } else {
+ return first_stripe().getBucketSpacesStats();
+ }
}
SimpleMaintenanceScanner::PendingMaintenanceStats
Distributor::pending_maintenance_stats() const {
// TODO STRIPE merged snapshot from all stripes
- return _stripe->pending_maintenance_stats();
+ if (_use_legacy_mode) {
+ return _stripe->pending_maintenance_stats();
+ } else {
+ return first_stripe().pending_maintenance_stats();
+ }
}
void
Distributor::propagateInternalScanMetricsToExternal()
{
- _stripe->propagateInternalScanMetricsToExternal();
+ // TODO STRIPE propagate to all stripes
+ // TODO STRIPE reconsider metric wiring...
+ if (_use_legacy_mode) {
+ _stripe->propagateInternalScanMetricsToExternal();
+ } else {
+ first_stripe().propagateInternalScanMetricsToExternal();
+ }
}
void
@@ -420,21 +494,23 @@ Distributor::doCriticalTick(framework::ThreadIndex idx)
}
// Propagates any new configs down to stripe(s)
enableNextConfig();
- // TODO STRIPE only do in legacy mode, use stripe pool ticking otherwise
- _stripe->doCriticalTick(idx);
- _tickResult.merge(_stripe->_tickResult);
+ if (_use_legacy_mode) {
+ _stripe->doCriticalTick(idx);
+ _tickResult.merge(_stripe->_tickResult);
+ }
return _tickResult;
}
framework::ThreadWaitInfo
Distributor::doNonCriticalTick(framework::ThreadIndex idx)
{
- if (!_use_legacy_mode) {
+ if (_use_legacy_mode) {
+ _stripe->doNonCriticalTick(idx);
+ _tickResult = _stripe->_tickResult;
+ } else {
_bucket_db_updater->resend_delayed_messages();
+ _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN;
}
- // TODO STRIPE stripes need their own thread loops!
- _stripe->doNonCriticalTick(idx);
- _tickResult = _stripe->_tickResult;
return _tickResult;
}
@@ -463,27 +539,53 @@ Distributor::enableNextConfig() // TODO STRIPE rename to enable_next_config_if_c
vespalib::string
Distributor::getReportContentType(const framework::HttpUrlPath& path) const
{
- return _stripe->getReportContentType(path);
+ // This is const thread safe
+ // TODO STRIPE we should probably do this in the top-level distributor
+ if (_use_legacy_mode) {
+ return _stripe->getReportContentType(path);
+ } else {
+ return first_stripe().getReportContentType(path);
+ }
}
std::string
Distributor::getActiveIdealStateOperations() const
{
- return _stripe->getActiveIdealStateOperations();
+ // TODO STRIPE need to aggregate status responses _across_ stripes..!
+ if (_use_legacy_mode) {
+ return _stripe->getActiveIdealStateOperations();
+ } else {
+ auto guard = _stripe_accessor->rendezvous_and_hold_all();
+ return first_stripe().getActiveIdealStateOperations();
+ }
}
bool
Distributor::reportStatus(std::ostream& out,
const framework::HttpUrlPath& path) const
{
- return _stripe->reportStatus(out, path);
+ // TODO STRIPE need to aggregate status responses _across_ stripes..!
+ if (_use_legacy_mode) {
+ return _stripe->reportStatus(out, path);
+ } else {
+ auto guard = _stripe_accessor->rendezvous_and_hold_all();
+ return first_stripe().reportStatus(out, path);
+ }
}
bool
Distributor::handleStatusRequest(const DelegatedStatusRequest& request) const
{
// TODO STRIPE need to aggregate status responses _across_ stripes..!
- return _stripe->handleStatusRequest(request);
+ if (_use_legacy_mode) {
+ return _stripe->handleStatusRequest(request);
+ } else {
+ // Can't hold guard here or we'll deadlock by never allowing the thread to process the request
+ bool handled = first_stripe().handleStatusRequest(request);
+ // TODO STRIPE wake up stripe thread; handleStatusRequest waits for completion
+ // (not really needed since this will be removed)
+ return handled;
+ }
}
}
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 0420f1b1f22..d9aada78c79 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -40,8 +40,9 @@ class BucketDBUpdater;
class DistributorBucketSpaceRepo;
class DistributorStatus;
class DistributorStripe;
+class DistributorStripePool;
+class StripeAccessor;
class OperationSequencer;
-class LegacySingleStripeAccessor;
class OwnershipTransferSafeTimePointCalculator;
class SimpleMaintenanceScanner;
class ThrottlingOperationStarter;
@@ -118,6 +119,10 @@ private:
friend class DistributorTestUtil;
friend class MetricUpdateHook;
+ // TODO STRIPE remove
+ DistributorStripe& first_stripe() noexcept;
+ const DistributorStripe& first_stripe() const noexcept;
+
void setNodeStateUp();
bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg);
@@ -171,8 +176,10 @@ private:
ChainedMessageSender* _messageSender;
const bool _use_legacy_mode;
// TODO STRIPE multiple stripes...! This is for proof of concept of wiring.
- std::unique_ptr<DistributorStripe> _stripe;
- std::unique_ptr<LegacySingleStripeAccessor> _stripe_accessor;
+ std::unique_ptr<DistributorStripe> _stripe;
+ std::unique_ptr<DistributorStripePool> _stripe_pool;
+ std::vector<std::unique_ptr<DistributorStripe>> _stripes;
+ std::unique_ptr<StripeAccessor> _stripe_accessor;
distributor::DistributorComponent _component;
std::shared_ptr<const DistributorConfiguration> _total_config;
std::unique_ptr<BucketDBUpdater> _bucket_db_updater;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index 1f6a5b318fd..e9969b79bd4 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -60,6 +60,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
_externalOperationHandler(_component, _component, getMetrics(), getMessageSender(),
*_operation_sequencer, *this, _component,
_idealStateManager, _operationOwner),
+ _external_message_mutex(),
_threadPool(threadPool),
_doneInitializeHandler(doneInitHandler),
_doneInitializing(false),
@@ -168,14 +169,19 @@ DistributorStripe::handle_or_enqueue_message(const std::shared_ptr<api::StorageM
if (_externalOperationHandler.try_handle_message_outside_main_thread(msg)) {
return true;
}
- // TODO STRIPE redesign how message queue guarding and wakeup is performed.
- // Currently involves a _thread pool global_ lock transitively via tick guard!
- framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks());
MBUS_TRACE(msg->getTrace(), 9,
"Distributor: Added to message queue. Thread state: "
+ _threadPool.getStatus());
- _messageQueue.push_back(msg);
- guard.broadcast();
+ if (_use_legacy_mode) {
+ // TODO STRIPE remove
+ framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks());
+ _messageQueue.push_back(msg);
+ guard.broadcast();
+ } else {
+ std::lock_guard lock(_external_message_mutex);
+ _messageQueue.push_back(msg);
+ // Caller has the responsibility to wake up correct stripe
+ }
return true;
}
@@ -727,6 +733,7 @@ DistributorStripe::scanNextBucket()
void DistributorStripe::send_updated_host_info_if_required() {
if (_must_send_updated_host_info) {
+ // TODO STRIPE how to handle with multiple stripes?
_component.getStateUpdater().immediately_send_get_node_state_replies();
_must_send_updated_host_info = false;
}
@@ -745,10 +752,9 @@ framework::ThreadWaitInfo
DistributorStripe::doCriticalTick(framework::ThreadIndex)
{
_tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN;
- if (_use_legacy_mode) {
- enableNextDistribution();
- enableNextConfig();
- }
+ assert(_use_legacy_mode);
+ enableNextDistribution();
+ enableNextConfig();
fetchStatusRequests();
fetchExternalMessages();
return _tickResult;
@@ -758,6 +764,11 @@ framework::ThreadWaitInfo
DistributorStripe::doNonCriticalTick(framework::ThreadIndex)
{
_tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN;
+ if (!_use_legacy_mode) {
+ std::lock_guard lock(_external_message_mutex);
+ fetchStatusRequests();
+ fetchExternalMessages();
+ }
handleStatusRequests();
startExternalOperations();
if (initializing()) {
@@ -920,14 +931,19 @@ DistributorStripe::reportStatus(std::ostream& out,
return true;
}
+// TODO STRIPE remove this; delegated to top-level Distributor only
bool
DistributorStripe::handleStatusRequest(const DelegatedStatusRequest& request) const
{
auto wrappedRequest = std::make_shared<DistributorStatus>(request);
- {
+ if (_use_legacy_mode) {
framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks());
_statusToDo.push_back(wrappedRequest);
guard.broadcast();
+ } else {
+ std::lock_guard lock(_external_message_mutex);
+ _statusToDo.push_back(wrappedRequest);
+ // FIXME won't be woken up explicitly, but will be processed after 1ms anyway.
}
wrappedRequest->waitForCompletion();
return true;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index 7b34367cecb..7b0cfc66a1b 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -22,6 +22,7 @@
#include <vespa/storageapi/message/state.h>
#include <vespa/storageframework/generic/metric/metricupdatehook.h>
#include <vespa/storageframework/generic/thread/tickingthread.h>
+#include <mutex>
#include <queue>
#include <unordered_map>
@@ -298,6 +299,7 @@ private:
std::vector<std::shared_ptr<api::StorageMessage>>,
IndirectHigherPriority
>;
+ mutable std::mutex _external_message_mutex;
MessageQueue _messageQueue;
ClientRequestPriorityQueue _client_request_priority_queue;
MessageQueue _fetchedMessages;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
index 9149482cd5d..4ac52b0ede8 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
@@ -62,10 +62,10 @@ public:
void park_all_threads() noexcept;
void unpark_all_threads() noexcept;
- [[nodiscard]] const DistributorStripeThread& stripe(size_t idx) const noexcept {
+ [[nodiscard]] const DistributorStripeThread& stripe_thread(size_t idx) const noexcept {
return *_stripes[idx];
}
- [[nodiscard]] DistributorStripeThread& stripe(size_t idx) noexcept {
+ [[nodiscard]] DistributorStripeThread& stripe_thread(size_t idx) noexcept {
return *_stripes[idx];
}
[[nodiscard]] size_t stripe_count() const noexcept { return _stripes.size(); }
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_thread.h b/storage/src/vespa/storage/distributor/distributor_stripe_thread.h
index b02d733895e..60f10889afd 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_thread.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_thread.h
@@ -53,6 +53,9 @@ public:
TickableStripe* operator->() noexcept { return &_stripe; }
const TickableStripe* operator->() const noexcept { return &_stripe; }
+
+ TickableStripe& stripe() noexcept { return _stripe; }
+ const TickableStripe& stripe() const noexcept { return _stripe; }
private:
[[nodiscard]] bool should_stop_thread_relaxed() const noexcept {
return _should_stop.load(std::memory_order_relaxed);
diff --git a/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp
index 0c6c0206608..29b71436fe8 100644
--- a/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp
+++ b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp
@@ -14,6 +14,10 @@ LegacySingleStripeAccessGuard::~LegacySingleStripeAccessGuard() {
_accessor.mark_guard_released();
}
+void LegacySingleStripeAccessGuard::flush_and_close() {
+ _stripe.flush_and_close();
+}
+
void LegacySingleStripeAccessGuard::update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) {
_stripe.update_total_distributor_config(std::move(config));
}
diff --git a/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h
index caf1e397e5b..f119462038e 100644
--- a/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h
+++ b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h
@@ -21,6 +21,8 @@ public:
DistributorStripe& stripe);
~LegacySingleStripeAccessGuard() override;
+ void flush_and_close() override;
+
void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) override;
void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) override;
diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
index 6bc9c03158a..03e47d2cb67 100644
--- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
+++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
@@ -21,6 +21,10 @@ MultiThreadedStripeAccessGuard::~MultiThreadedStripeAccessGuard() {
_accessor.mark_guard_released();
}
+void MultiThreadedStripeAccessGuard::flush_and_close() {
+ first_stripe().flush_and_close();
+}
+
void MultiThreadedStripeAccessGuard::update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) {
// TODO STRIPE multiple stripes
first_stripe().update_total_distributor_config(std::move(config));
@@ -95,7 +99,7 @@ void MultiThreadedStripeAccessGuard::clear_read_only_bucket_repo_databases() {
}
DistributorStripe& MultiThreadedStripeAccessGuard::first_stripe() noexcept {
- return dynamic_cast<DistributorStripe&>(_stripe_pool.stripe(0));
+ return dynamic_cast<DistributorStripe&>(_stripe_pool.stripe_thread(0).stripe());
}
std::unique_ptr<StripeAccessGuard> MultiThreadedStripeAccessor::rendezvous_and_hold_all() {
diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
index 376eccd1c4a..03e36c29bba 100644
--- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
+++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
@@ -27,6 +27,8 @@ public:
DistributorStripePool& stripe_pool);
~MultiThreadedStripeAccessGuard() override;
+ void flush_and_close() override;
+
void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) override;
void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) override;
@@ -65,7 +67,7 @@ class MultiThreadedStripeAccessor : public StripeAccessor {
friend class MultiThreadedStripeAccessGuard;
public:
- MultiThreadedStripeAccessor(DistributorStripePool& stripe_pool)
+ explicit MultiThreadedStripeAccessor(DistributorStripePool& stripe_pool)
: _stripe_pool(stripe_pool),
_guard_held(false)
{}
diff --git a/storage/src/vespa/storage/distributor/stripe_access_guard.h b/storage/src/vespa/storage/distributor/stripe_access_guard.h
index 69aae755dec..1d570cbb3bc 100644
--- a/storage/src/vespa/storage/distributor/stripe_access_guard.h
+++ b/storage/src/vespa/storage/distributor/stripe_access_guard.h
@@ -28,6 +28,8 @@ class StripeAccessGuard {
public:
virtual ~StripeAccessGuard() = default;
+ virtual void flush_and_close() = 0;
+
virtual void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) = 0;
virtual void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) = 0;