summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r-- storage/src/tests/distributor/bucketdbupdatertest.cpp 1
-rw-r--r-- storage/src/tests/distributor/distributortest.cpp 17
-rw-r--r-- storage/src/tests/distributor/distributortestutil.cpp 12
-rw-r--r-- storage/src/tests/distributor/distributortestutil.h 2
-rw-r--r-- storage/src/tests/distributor/externaloperationhandlertest.cpp 1
-rw-r--r-- storage/src/tests/distributor/getoperationtest.cpp 1
-rw-r--r-- storage/src/tests/distributor/idealstatemanagertest.cpp 1
-rw-r--r-- storage/src/tests/distributor/putoperationtest.cpp 1
-rw-r--r-- storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp 1
-rw-r--r-- storage/src/tests/distributor/removelocationtest.cpp 1
-rw-r--r-- storage/src/tests/distributor/removeoperationtest.cpp 1
-rw-r--r-- storage/src/tests/distributor/statecheckerstest.cpp 1
-rw-r--r-- storage/src/tests/distributor/twophaseupdateoperationtest.cpp 1
-rw-r--r-- storage/src/tests/distributor/updateoperationtest.cpp 1
-rw-r--r-- storage/src/tests/distributor/visitoroperationtest.cpp 1
-rw-r--r-- storage/src/vespa/storage/distributor/CMakeLists.txt 2
-rw-r--r-- storage/src/vespa/storage/distributor/bucketdbupdater.cpp 2
-rw-r--r-- storage/src/vespa/storage/distributor/bucketdbupdater.h 4
-rw-r--r-- storage/src/vespa/storage/distributor/distributor.cpp 862
-rw-r--r-- storage/src/vespa/storage/distributor/distributor.h 160
-rw-r--r-- storage/src/vespa/storage/distributor/distributor_status.cpp 32
-rw-r--r-- storage/src/vespa/storage/distributor/distributor_status.h 41
-rw-r--r-- storage/src/vespa/storage/distributor/distributor_stripe.cpp 903
-rw-r--r-- storage/src/vespa/storage/distributor/distributor_stripe.h 352
-rw-r--r-- storage/src/vespa/storage/distributor/distributorcomponent.cpp 11
-rw-r--r-- storage/src/vespa/storage/distributor/distributorcomponent.h 5
-rw-r--r-- storage/src/vespa/storage/distributor/idealstatemanager.cpp 2
-rw-r--r-- storage/src/vespa/storage/distributor/idealstatemanager.h 4
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp 9
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp 12
-rw-r--r-- storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp 1
31 files changed, 1590 insertions, 855 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index 1b93e728a04..4ec49b5c6f8 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -13,6 +13,7 @@
#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/storage/distributor/simpleclusterinformation.h>
#include <vespa/storage/distributor/distributor.h>
+#include <vespa/storage/distributor/distributor_stripe.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/text/stringtokenizer.h>
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 564cd2bc876..61c74a263cf 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -13,6 +13,8 @@
#include <vespa/document/test/make_bucket_space.h>
#include <vespa/storage/config/config-stor-distributormanager.h>
#include <vespa/storage/distributor/distributor.h>
+#include <vespa/storage/distributor/distributor_stripe.h>
+#include <vespa/storage/distributor/distributor_status.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/storage/distributor/distributormetricsset.h>
#include <vespa/vespalib/text/stringtokenizer.h>
@@ -65,8 +67,7 @@ struct DistributorTest : Test, DistributorTestUtil {
}
auto currentReplicaCountingMode() const noexcept {
- return _distributor->_bucketDBMetricUpdater
- .getMinimumReplicaCountingMode();
+ return _distributor->bucket_db_metric_updater().getMinimumReplicaCountingMode();
}
std::string testOp(std::shared_ptr<api::StorageMessage> msg)
@@ -141,23 +142,25 @@ struct DistributorTest : Test, DistributorTestUtil {
}
StatusReporterDelegate& distributor_status_delegate() {
- return _distributor->_distributorStatusDelegate;
+ // TODO STRIPE
+ return _distributor->_stripe->_distributorStatusDelegate;
}
framework::TickingThreadPool& distributor_thread_pool() {
return _distributor->_threadPool;
}
- const std::vector<std::shared_ptr<Distributor::Status>>& distributor_status_todos() {
- return _distributor->_statusToDo;
+ const std::vector<std::shared_ptr<DistributorStatus>>& distributor_status_todos() {
+ // TODO STRIPE
+ return _distributor->_stripe->_statusToDo;
}
Distributor::MetricUpdateHook distributor_metric_update_hook() {
return _distributor->_metricUpdateHook;
}
- SimpleMaintenanceScanner::PendingMaintenanceStats& distributor_maintenance_stats() {
- return _distributor->_maintenanceStats;
+ SimpleMaintenanceScanner::PendingMaintenanceStats distributor_maintenance_stats() {
+ return _distributor->pending_maintenance_stats();
}
BucketSpacesStatsProvider::PerNodeBucketSpacesStats distributor_bucket_spaces_stats() {
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index 2802b976256..b465edf5d16 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -4,6 +4,7 @@
#include <vespa/document/test/make_bucket_space.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/storage/distributor/distributor.h>
+#include <vespa/storage/distributor/distributor_stripe.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/storage/distributor/distributorcomponent.h>
#include <vespa/vdslib/distribution/distribution.h>
@@ -257,6 +258,7 @@ DistributorTestUtil::removeFromBucketDB(const document::BucketId& id)
void
DistributorTestUtil::addIdealNodes(const document::BucketId& id)
{
+ // TODO STRIPE roundabout way of getting state bundle..!
addIdealNodes(*distributor_component().getClusterStateBundle().getBaselineClusterState(), id);
}
@@ -338,20 +340,21 @@ DistributorTestUtil::disableBucketActivationInConfig(bool disable)
BucketDBUpdater&
DistributorTestUtil::getBucketDBUpdater() {
- return _distributor->_bucketDBUpdater;
+ return _distributor->bucket_db_updater();
}
IdealStateManager&
DistributorTestUtil::getIdealStateManager() {
- return _distributor->_idealStateManager;
+ return _distributor->ideal_state_manager();
}
ExternalOperationHandler&
DistributorTestUtil::getExternalOperationHandler() {
- return _distributor->_externalOperationHandler;
+ return _distributor->external_operation_handler();
}
storage::distributor::DistributorComponent&
DistributorTestUtil::distributor_component() {
- return _distributor->_component;
+ // TODO STRIPE tests use this to indirectly access bucket space repos/DBs!
+ return _distributor->distributor_component();
}
bool
@@ -369,6 +372,7 @@ DistributorTestUtil::tick() {
DistributorConfiguration&
DistributorTestUtil::getConfig() {
+ // TODO STRIPE avoid const cast
return const_cast<DistributorConfiguration&>(_distributor->getConfig());
}
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index 630d466a72e..f450f2545db 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -22,10 +22,12 @@ class Distributor;
class DistributorBucketSpace;
class DistributorBucketSpaceRepo;
class DistributorComponent;
+class DistributorStripe;
class IdealStateManager;
class ExternalOperationHandler;
class Operation;
+// TODO STRIPE rename to DistributorStripeTestUtil?
class DistributorTestUtil : private DoneInitializeHandler
{
public:
diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp
index a95418b0b74..1829808990a 100644
--- a/storage/src/tests/distributor/externaloperationhandlertest.cpp
+++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp
@@ -8,6 +8,7 @@
#include <vespa/document/update/documentupdate.h>
#include <vespa/storage/common/reindexing_constants.h>
#include <vespa/storage/distributor/distributor.h>
+#include <vespa/storage/distributor/distributor_stripe.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/storage/distributor/distributormetricsset.h>
#include <vespa/storage/distributor/externaloperationhandler.h>
diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp
index fe87de5f18a..1123c354ef4 100644
--- a/storage/src/tests/distributor/getoperationtest.cpp
+++ b/storage/src/tests/distributor/getoperationtest.cpp
@@ -8,6 +8,7 @@
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/storage/distributor/externaloperationhandler.h>
#include <vespa/storage/distributor/distributor.h>
+#include <vespa/storage/distributor/distributor_stripe.h>
#include <vespa/storage/distributor/distributormetricsset.h>
#include <vespa/storage/distributor/operations/external/getoperation.h>
#include <tests/distributor/distributortestutil.h>
diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp
index cae6bf9f226..fd23dd5d656 100644
--- a/storage/src/tests/distributor/idealstatemanagertest.cpp
+++ b/storage/src/tests/distributor/idealstatemanagertest.cpp
@@ -3,6 +3,7 @@
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storage/distributor/bucketdbupdater.h>
#include <vespa/storage/distributor/distributor.h>
+#include <vespa/storage/distributor/distributor_stripe.h>
#include <vespa/storage/distributor/operations/idealstate/mergeoperation.h>
#include <vespa/storage/distributor/operation_sequencer.h>
#include <vespa/storageapi/message/stat.h>
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp
index bdb5eb9eb4d..c510e08ab2a 100644
--- a/storage/src/tests/distributor/putoperationtest.cpp
+++ b/storage/src/tests/distributor/putoperationtest.cpp
@@ -3,6 +3,7 @@
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/storage/distributor/operations/external/putoperation.h>
#include <vespa/storage/distributor/distributor.h>
+#include <vespa/storage/distributor/distributor_stripe.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/persistence.h>
diff --git a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
index 91fb560f381..02491b670c6 100644
--- a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
+++ b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
@@ -8,6 +8,7 @@
#include <vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h>
#include <vespa/storage/distributor/operations/external/visitoroperation.h>
#include <vespa/storage/distributor/distributor.h>
+#include <vespa/storage/distributor/distributor_stripe.h>
#include <vespa/storage/distributor/distributormetricsset.h>
#include <vespa/storage/distributor/pendingmessagetracker.h>
#include <vespa/storage/distributor/uuid_generator.h>
diff --git a/storage/src/tests/distributor/removelocationtest.cpp b/storage/src/tests/distributor/removelocationtest.cpp
index 779d6f60be0..7ba2995d8e3 100644
--- a/storage/src/tests/distributor/removelocationtest.cpp
+++ b/storage/src/tests/distributor/removelocationtest.cpp
@@ -5,6 +5,7 @@
#include <tests/distributor/distributortestutil.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/storage/distributor/distributor.h>
+#include <vespa/storage/distributor/distributor_stripe.h>
#include <vespa/vespalib/gtest/gtest.h>
using document::test::makeDocumentBucket;
diff --git a/storage/src/tests/distributor/removeoperationtest.cpp b/storage/src/tests/distributor/removeoperationtest.cpp
index cc5452e2bfe..de76379e854 100644
--- a/storage/src/tests/distributor/removeoperationtest.cpp
+++ b/storage/src/tests/distributor/removeoperationtest.cpp
@@ -3,6 +3,7 @@
#include <iomanip>
#include <tests/common/dummystoragelink.h>
#include <vespa/storage/distributor/distributor.h>
+#include <vespa/storage/distributor/distributor_stripe.h>
#include <vespa/storageapi/message/persistence.h>
#include <tests/distributor/distributortestutil.h>
#include <vespa/document/test/make_document_bucket.h>
diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp
index 314e5b27e25..2f4c386e1ed 100644
--- a/storage/src/tests/distributor/statecheckerstest.cpp
+++ b/storage/src/tests/distributor/statecheckerstest.cpp
@@ -7,6 +7,7 @@
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/storage/distributor/bucketdbupdater.h>
#include <vespa/storage/distributor/distributor.h>
+#include <vespa/storage/distributor/distributor_stripe.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/storage/distributor/operations/idealstate/mergeoperation.h>
#include <vespa/storage/distributor/statecheckers.h>
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
index 924678a6cd0..58556832f2d 100644
--- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
+++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
@@ -10,6 +10,7 @@
#include <tests/distributor/distributortestutil.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/storage/distributor/distributor.h>
+#include <vespa/storage/distributor/distributor_stripe.h>
#include <vespa/document/fieldset/fieldsets.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <gmock/gmock.h>
diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp
index d3a8b270ad0..ea9f0d86ac4 100644
--- a/storage/src/tests/distributor/updateoperationtest.cpp
+++ b/storage/src/tests/distributor/updateoperationtest.cpp
@@ -10,6 +10,7 @@
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/storage/distributor/operations/external/updateoperation.h>
#include <vespa/storage/distributor/distributor.h>
+#include <vespa/storage/distributor/distributor_stripe.h>
#include <vespa/config/helper/configgetter.hpp>
#include <vespa/vespalib/gtest/gtest.h>
diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp
index c4f72c312c7..ccbb64e8970 100644
--- a/storage/src/tests/distributor/visitoroperationtest.cpp
+++ b/storage/src/tests/distributor/visitoroperationtest.cpp
@@ -10,6 +10,7 @@
#include <vespa/storage/distributor/distributormetricsset.h>
#include <tests/distributor/distributortestutil.h>
#include <vespa/storage/distributor/distributor.h>
+#include <vespa/storage/distributor/distributor_stripe.h>
#include <vespa/document/fieldset/fieldsets.h>
#include <vespa/document/test/make_bucket_space.h>
#include <vespa/vespalib/gtest/gtest.h>
diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt
index f3ba6af6e0c..2b5423c60e4 100644
--- a/storage/src/vespa/storage/distributor/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/CMakeLists.txt
@@ -14,6 +14,8 @@ vespa_add_library(storage_distributor
distributor_bucket_space_repo.cpp
distributor.cpp
distributor_host_info_reporter.cpp
+ distributor_status.cpp
+ distributor_stripe.cpp
distributorcomponent.cpp
distributormessagesender.cpp
distributormetricsset.cpp
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index 12fd14c260e..2a76ed5a26a 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -24,7 +24,7 @@ using document::BucketSpace;
namespace storage::distributor {
-BucketDBUpdater::BucketDBUpdater(Distributor& owner,
+BucketDBUpdater::BucketDBUpdater(DistributorInterface& owner,
DistributorBucketSpaceRepo& bucketSpaceRepo,
DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
DistributorMessageSender& sender,
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index 8fab76575e9..d80d823a7d1 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -26,7 +26,7 @@ class XmlAttribute;
namespace storage::distributor {
-class Distributor;
+class DistributorInterface;
class BucketSpaceDistributionContext;
class BucketDBUpdater : public framework::StatusReporter,
@@ -34,7 +34,7 @@ class BucketDBUpdater : public framework::StatusReporter,
{
public:
using OutdatedNodesMap = dbtransition::OutdatedNodesMap;
- BucketDBUpdater(Distributor& owner,
+ BucketDBUpdater(DistributorInterface& owner,
DistributorBucketSpaceRepo& bucketSpaceRepo,
DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
DistributorMessageSender& sender,
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index cb4336d9f1e..665478e68a2 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -3,6 +3,8 @@
#include "blockingoperationstarter.h"
#include "distributor.h"
#include "distributor_bucket_space.h"
+#include "distributor_status.h"
+#include "distributor_stripe.h"
#include "distributormetricsset.h"
#include "idealstatemetricsset.h"
#include "operation_sequencer.h"
@@ -26,45 +28,16 @@ using namespace std::chrono_literals;
namespace storage::distributor {
-class Distributor::Status {
- const DelegatedStatusRequest& _request;
- std::mutex _lock;
- std::condition_variable _cond;
- bool _done;
-
-public:
- Status(const DelegatedStatusRequest& request) noexcept
- : _request(request),
- _lock(),
- _cond(),
- _done(false)
- {}
-
- std::ostream& getStream() {
- return _request.outputStream;
- }
- const framework::HttpUrlPath& getPath() const {
- return _request.path;
- }
- const framework::StatusReporter& getReporter() const {
- return _request.reporter;
- }
-
- void notifyCompleted() {
- {
- std::lock_guard guard(_lock);
- _done = true;
- }
- _cond.notify_all();
- }
- void waitForCompletion() {
- std::unique_lock guard(_lock);
- while (!_done) {
- _cond.wait(guard);
- }
- }
-};
-
+/* TODO STRIPE
+ * - need a DistributorComponent per stripe
+ * - or better, remove entirely!
+ * - probably also DistributorInterface since it's used to send
+ * - metrics aggregation
+ * - host info aggregation..!!
+ * - handled if Distributor getMinReplica etc delegates to stripes?
+ * - these are already thread safe
+ * - status aggregation
+ */
Distributor::Distributor(DistributorComponentRegister& compReg,
const NodeIdentity& node_identity,
framework::TickingThreadPool& threadPool,
@@ -75,56 +48,27 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
: StorageLink("distributor"),
DistributorInterface(),
framework::StatusReporter("distributor", "Distributor"),
- _clusterStateBundle(lib::ClusterState()),
+ _metrics(std::make_shared<DistributorMetricSet>()),
+ _messageSender(messageSender),
+ _stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool, doneInitHandler,
+ manageActiveBucketCopies, *this)),
+ // TODO STRIPE remove once DistributorComponent no longer references bucket space repos
_bucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>(node_identity.node_index())),
_readOnlyBucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>(node_identity.node_index())),
+ // TODO STRIPE slim down
_component(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, "distributor"),
- _metrics(std::make_shared<DistributorMetricSet>()),
- _operationOwner(*this, _component.getClock()),
- _maintenanceOperationOwner(*this, _component.getClock()),
- _operation_sequencer(std::make_unique<OperationSequencer>()),
- _pendingMessageTracker(compReg),
- _bucketDBUpdater(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, *this, compReg),
_distributorStatusDelegate(compReg, *this, *this),
- _bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater),
- _idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, manageActiveBucketCopies),
- _messageSender(messageSender),
- _externalOperationHandler(_component, _component, getMetrics(), getMessageSender(),
- *_operation_sequencer, *this, _component,
- _idealStateManager, _operationOwner),
_threadPool(threadPool),
- _initializingIsUp(true),
- _doneInitializeHandler(doneInitHandler),
- _doneInitializing(false),
- _bucketPriorityDb(std::make_unique<SimpleBucketPriorityDatabase>()),
- _scanner(std::make_unique<SimpleMaintenanceScanner>(*_bucketPriorityDb, _idealStateManager, *_bucketSpaceRepo)),
- _throttlingStarter(std::make_unique<ThrottlingOperationStarter>(_maintenanceOperationOwner)),
- _blockingStarter(std::make_unique<BlockingOperationStarter>(_pendingMessageTracker, *_operation_sequencer,
- *_throttlingStarter)),
- _scheduler(std::make_unique<MaintenanceScheduler>(_idealStateManager, *_bucketPriorityDb, *_blockingStarter)),
- _schedulingMode(MaintenanceScheduler::NORMAL_SCHEDULING_MODE),
- _recoveryTimeStarted(_component.getClock()),
_tickResult(framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN),
- _bucketIdHasher(std::make_unique<BucketGcTimeCalculator::BucketIdIdentityHasher>()),
_metricUpdateHook(*this),
_metricLock(),
- _maintenanceStats(),
- _bucketSpacesStats(),
- _bucketDbStats(),
- _hostInfoReporter(*this, *this),
- _ownershipSafeTimeCalc(std::make_unique<OwnershipTransferSafeTimePointCalculator>(0s)), // Set by config later
- _db_memory_sample_interval(30s),
- _last_db_memory_sample_time_point(),
- _inhibited_maintenance_tick_count(0),
- _must_send_updated_host_info(false)
+ _hostInfoReporter(*this, *this)
{
_component.registerMetric(*_metrics);
_component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0));
_distributorStatusDelegate.registerStatusPage();
- _bucketDBStatusDelegate.registerStatusPage();
hostInfoReporterRegistrar.registerReporter(&_hostInfoReporter);
propagateDefaultDistribution(_component.getDistribution());
- propagateClusterStates();
};
Distributor::~Distributor()
@@ -133,37 +77,117 @@ Distributor::~Distributor()
closeNextLink();
}
+bool
+Distributor::isInRecoveryMode() const noexcept {
+ return _stripe->isInRecoveryMode();
+}
+
int
-Distributor::getDistributorIndex() const
-{
+Distributor::getDistributorIndex() const {
return _component.getIndex();
}
const PendingMessageTracker&
-Distributor::getPendingMessageTracker() const
-{
- return _pendingMessageTracker;
+Distributor::getPendingMessageTracker() const {
+ return _stripe->getPendingMessageTracker();
+}
+
+PendingMessageTracker&
+Distributor::getPendingMessageTracker() {
+ return _stripe->getPendingMessageTracker();
+}
+
+DistributorBucketSpaceRepo&
+Distributor::getBucketSpaceRepo() noexcept {
+ return _stripe->getBucketSpaceRepo();
+}
+
+const DistributorBucketSpaceRepo&
+Distributor::getBucketSpaceRepo() const noexcept {
+ return _stripe->getBucketSpaceRepo();
+}
+
+DistributorBucketSpaceRepo&
+Distributor::getReadOnlyBucketSpaceRepo() noexcept {
+ return _stripe->getReadOnlyBucketSpaceRepo();
+}
+
+const DistributorBucketSpaceRepo&
+Distributor::getReadyOnlyBucketSpaceRepo() const noexcept {
+ return _stripe->getReadOnlyBucketSpaceRepo();;
+}
+
+storage::distributor::DistributorComponent&
+Distributor::distributor_component() noexcept {
+ // TODO STRIPE We need to grab the stripe's component since tests like to access
+ // these things uncomfortably directly.
+ return _stripe->_component;
+}
+
+BucketDBUpdater&
+Distributor::bucket_db_updater() {
+ return _stripe->bucket_db_updater();
+}
+
+const BucketDBUpdater&
+Distributor::bucket_db_updater() const {
+ return _stripe->bucket_db_updater();
+}
+
+IdealStateManager&
+Distributor::ideal_state_manager() {
+ return _stripe->ideal_state_manager();
+}
+
+const IdealStateManager&
+Distributor::ideal_state_manager() const {
+ return _stripe->ideal_state_manager();
+}
+
+ExternalOperationHandler&
+Distributor::external_operation_handler() {
+ return _stripe->external_operation_handler();
+}
+
+const ExternalOperationHandler&
+Distributor::external_operation_handler() const {
+ return _stripe->external_operation_handler();
+}
+
+BucketDBMetricUpdater&
+Distributor::bucket_db_metric_updater() const noexcept {
+ return _stripe->_bucketDBMetricUpdater;
+}
+
+const DistributorConfiguration&
+Distributor::getConfig() const {
+ return _stripe->getConfig();
+}
+
+std::chrono::steady_clock::duration
+Distributor::db_memory_sample_interval() const noexcept {
+ return _stripe->db_memory_sample_interval();
+}
+
+bool Distributor::initializing() const {
+ return _stripe->initializing();
}
const lib::ClusterState*
Distributor::pendingClusterStateOrNull(const document::BucketSpace& space) const {
- return _bucketDBUpdater.pendingClusterStateOrNull(space);
+ return bucket_db_updater().pendingClusterStateOrNull(space);
}
void
-Distributor::sendCommand(const std::shared_ptr<api::StorageCommand>& cmd)
+Distributor::sendCommand(const std::shared_ptr<api::StorageCommand>&)
{
- if (cmd->getType() == api::MessageType::MERGEBUCKET) {
- api::MergeBucketCommand& merge(static_cast<api::MergeBucketCommand&>(*cmd));
- _idealStateManager.getMetrics().nodesPerMerge.addValue(merge.getNodes().size());
- }
- sendUp(cmd);
+ assert(false); // TODO STRIPE
}
void
-Distributor::sendReply(const std::shared_ptr<api::StorageReply>& reply)
+Distributor::sendReply(const std::shared_ptr<api::StorageReply>&)
{
- sendUp(reply);
+ assert(false); // TODO STRIPE
}
void
@@ -179,6 +203,7 @@ void
Distributor::onOpen()
{
LOG(debug, "Distributor::onOpen invoked");
+ _stripe->open();
setNodeStateUp();
framework::MilliSecTime maxProcessingTime(60 * 1000);
framework::MilliSecTime waitTime(1000);
@@ -200,38 +225,22 @@ void Distributor::send_shutdown_abort_reply(const std::shared_ptr<api::StorageMe
}
void Distributor::onClose() {
- for (auto& msg : _messageQueue) {
- if (!msg->getType().isReply()) {
- send_shutdown_abort_reply(msg);
- }
- }
- _messageQueue.clear();
- while (!_client_request_priority_queue.empty()) {
- send_shutdown_abort_reply(_client_request_priority_queue.top());
- _client_request_priority_queue.pop();
- }
-
LOG(debug, "Distributor::onClose invoked");
- _pendingMessageTracker.abort_deferred_tasks();
- _bucketDBUpdater.flush();
- _externalOperationHandler.close_pending();
- _operationOwner.onClose();
- _maintenanceOperationOwner.onClose();
+ _stripe->close();
}
-void Distributor::send_up_without_tracking(const std::shared_ptr<api::StorageMessage>& msg) {
- if (_messageSender) {
- _messageSender->sendUp(msg);
- } else {
- StorageLink::sendUp(msg);
- }
+void Distributor::send_up_without_tracking(const std::shared_ptr<api::StorageMessage>&) {
+ assert(false);
}
void
Distributor::sendUp(const std::shared_ptr<api::StorageMessage>& msg)
{
- _pendingMessageTracker.insert(msg);
- send_up_without_tracking(msg);
+ if (_messageSender) {
+ _messageSender->sendUp(msg);
+ } else {
+ StorageLink::sendUp(msg);
+ }
}
void
@@ -247,281 +256,72 @@ Distributor::sendDown(const std::shared_ptr<api::StorageMessage>& msg)
bool
Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg)
{
- if (_externalOperationHandler.try_handle_message_outside_main_thread(msg)) {
- return true;
- }
- framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks());
- MBUS_TRACE(msg->getTrace(), 9,
- "Distributor: Added to message queue. Thread state: "
- + _threadPool.getStatus());
- _messageQueue.push_back(msg);
- guard.broadcast();
- return true;
+ return _stripe->onDown(msg);
}
void
Distributor::handleCompletedMerge(
- const std::shared_ptr<api::MergeBucketReply>& reply)
+ const std::shared_ptr<api::MergeBucketReply>&)
{
- _maintenanceOperationOwner.handleReply(reply);
-}
-
-bool
-Distributor::isMaintenanceReply(const api::StorageReply& reply) const
-{
- switch (reply.getType().getId()) {
- case api::MessageType::CREATEBUCKET_REPLY_ID:
- case api::MessageType::MERGEBUCKET_REPLY_ID:
- case api::MessageType::DELETEBUCKET_REPLY_ID:
- case api::MessageType::REQUESTBUCKETINFO_REPLY_ID:
- case api::MessageType::SPLITBUCKET_REPLY_ID:
- case api::MessageType::JOINBUCKETS_REPLY_ID:
- case api::MessageType::SETBUCKETSTATE_REPLY_ID:
- case api::MessageType::REMOVELOCATION_REPLY_ID:
- return true;
- default:
- return false;
- }
+ assert(false);
}
bool
-Distributor::handleReply(const std::shared_ptr<api::StorageReply>& reply)
+Distributor::isMaintenanceReply(const api::StorageReply&) const
{
- document::Bucket bucket = _pendingMessageTracker.reply(*reply);
-
- if (reply->getResult().getResult() == api::ReturnCode::BUCKET_NOT_FOUND &&
- bucket.getBucketId() != document::BucketId(0) &&
- reply->getAddress())
- {
- recheckBucketInfo(reply->getAddress()->getIndex(), bucket);
- }
-
- if (reply->callHandler(_bucketDBUpdater, reply)) {
- return true;
- }
-
- if (_operationOwner.handleReply(reply)) {
- return true;
- }
-
- if (_maintenanceOperationOwner.handleReply(reply)) {
- _scanner->prioritizeBucket(bucket);
- return true;
- }
-
- // If it's a maintenance operation reply, it's most likely a reply to an
- // operation whose state was flushed from the distributor when its node
- // went down in the cluster state. Just swallow the reply to avoid getting
- // warnings about unhandled messages at the bottom of the link chain.
- return isMaintenanceReply(*reply);
+ assert(false);
}
bool
-Distributor::generateOperation(
- const std::shared_ptr<api::StorageMessage>& msg,
- Operation::SP& operation)
+Distributor::handleReply(const std::shared_ptr<api::StorageReply>& reply)
{
- return _externalOperationHandler.handleMessage(msg, operation);
+ return _stripe->handleReply(reply);
}
bool
Distributor::handleMessage(const std::shared_ptr<api::StorageMessage>& msg)
{
- if (msg->getType().isReply()) {
- auto reply = std::dynamic_pointer_cast<api::StorageReply>(msg);
- if (handleReply(reply)) {
- return true;
- }
- }
-
- if (msg->callHandler(_bucketDBUpdater, msg)) {
- return true;
- }
-
- Operation::SP operation;
- if (generateOperation(msg, operation)) {
- if (operation.get()) {
- _operationOwner.start(operation, msg->getPriority());
- }
- return true;
- }
-
- return false;
+ return _stripe->handleMessage(msg);
}
const lib::ClusterStateBundle&
Distributor::getClusterStateBundle() const
{
- return _clusterStateBundle;
+ // TODO STRIPE must offer a single unifying state across stripes
+ return _stripe->getClusterStateBundle();
}
void
Distributor::enableClusterStateBundle(const lib::ClusterStateBundle& state)
{
- lib::ClusterStateBundle oldState = _clusterStateBundle;
- _clusterStateBundle = state;
- propagateClusterStates();
-
- lib::Node myNode(lib::NodeType::DISTRIBUTOR, _component.getIndex());
- const auto &baselineState = *_clusterStateBundle.getBaselineClusterState();
-
- if (!_doneInitializing &&
- baselineState.getNodeState(myNode).getState() == lib::State::UP)
- {
- _doneInitializing = true;
- _doneInitializeHandler.notifyDoneInitializing();
- }
- enterRecoveryMode();
-
- // Clear all active messages on nodes that are down.
- const uint16_t old_node_count = oldState.getBaselineClusterState()->getNodeCount(lib::NodeType::STORAGE);
- const uint16_t new_node_count = baselineState.getNodeCount(lib::NodeType::STORAGE);
- for (uint16_t i = 0; i < std::max(old_node_count, new_node_count); ++i) {
- const auto& node_state = baselineState.getNodeState(lib::Node(lib::NodeType::STORAGE, i)).getState();
- if (!node_state.oneOf(getStorageNodeUpStates())) {
- std::vector<uint64_t> msgIds = _pendingMessageTracker.clearMessagesForNode(i);
- LOG(debug, "Node %u is down, clearing %zu pending maintenance operations", i, msgIds.size());
-
- for (uint32_t j = 0; j < msgIds.size(); ++j) {
- _maintenanceOperationOwner.erase(msgIds[j]);
- }
- }
- }
-
- if (_bucketDBUpdater.bucketOwnershipHasChanged()) {
- using TimePoint = OwnershipTransferSafeTimePointCalculator::TimePoint;
- // Note: this assumes that std::chrono::system_clock and the framework
- // system clock have the same epoch, which should be a reasonable
- // assumption.
- const auto now = TimePoint(std::chrono::milliseconds(
- _component.getClock().getTimeInMillis().getTime()));
- _externalOperationHandler.rejectFeedBeforeTimeReached(
- _ownershipSafeTimeCalc->safeTimePoint(now));
- }
+ // TODO STRIPE make test injection/force-function
+ _stripe->enableClusterStateBundle(state);
}
-OperationRoutingSnapshot Distributor::read_snapshot_for_bucket(const document::Bucket& bucket) const {
- return _bucketDBUpdater.read_snapshot_for_bucket(bucket);
+OperationRoutingSnapshot Distributor::read_snapshot_for_bucket(const document::Bucket&) const {
+ abort();
}
void
Distributor::notifyDistributionChangeEnabled()
{
- LOG(debug, "Pending cluster state for distribution change has been enabled");
- // Trigger a re-scan of bucket database, just like we do when a new cluster
- // state has been enabled.
- enterRecoveryMode();
-}
-
-void
-Distributor::enterRecoveryMode()
-{
- LOG(debug, "Entering recovery mode");
- _schedulingMode = MaintenanceScheduler::RECOVERY_SCHEDULING_MODE;
- _scanner->reset();
- _bucketDBMetricUpdater.reset();
- // TODO reset _bucketDbStats?
- invalidate_bucket_spaces_stats();
-
- _recoveryTimeStarted = framework::MilliSecTimer(_component.getClock());
-}
-
-void
-Distributor::leaveRecoveryMode()
-{
- if (isInRecoveryMode()) {
- LOG(debug, "Leaving recovery mode");
- _metrics->recoveryModeTime.addValue(
- _recoveryTimeStarted.getElapsedTimeAsDouble());
- if (_doneInitializing) {
- _must_send_updated_host_info = true;
- }
- }
- _schedulingMode = MaintenanceScheduler::NORMAL_SCHEDULING_MODE;
-}
-
-template <typename NodeFunctor>
-void Distributor::for_each_available_content_node_in(const lib::ClusterState& state, NodeFunctor&& func) {
- const auto node_count = state.getNodeCount(lib::NodeType::STORAGE);
- for (uint16_t i = 0; i < node_count; ++i) {
- lib::Node node(lib::NodeType::STORAGE, i);
- if (state.getNodeState(node).getState().oneOf("uir")) {
- func(node);
- }
- }
-}
-
-BucketSpacesStatsProvider::BucketSpacesStats Distributor::make_invalid_stats_per_configured_space() const {
- BucketSpacesStatsProvider::BucketSpacesStats invalid_space_stats;
- for (auto& space : *_bucketSpaceRepo) {
- invalid_space_stats.emplace(document::FixedBucketSpaces::to_string(space.first),
- BucketSpaceStats::make_invalid());
- }
- return invalid_space_stats;
-}
-
-void Distributor::invalidate_bucket_spaces_stats() {
- std::lock_guard guard(_metricLock);
- _bucketSpacesStats = BucketSpacesStatsProvider::PerNodeBucketSpacesStats();
- auto invalid_space_stats = make_invalid_stats_per_configured_space();
-
- const auto& baseline = *_clusterStateBundle.getBaselineClusterState();
- for_each_available_content_node_in(baseline, [this, &invalid_space_stats](const lib::Node& node) {
- _bucketSpacesStats[node.getIndex()] = invalid_space_stats;
- });
+ _stripe->notifyDistributionChangeEnabled();
}
void
Distributor::storageDistributionChanged()
{
- if (!_distribution.get()
- || *_component.getDistribution() != *_distribution)
- {
- LOG(debug,
- "Distribution changed to %s, must refetch bucket information",
- _component.getDistribution()->toString().c_str());
-
- // FIXME this is not thread safe
- _nextDistribution = _component.getDistribution();
- } else {
- LOG(debug,
- "Got distribution change, but the distribution %s was the same as "
- "before: %s",
- _component.getDistribution()->toString().c_str(),
- _distribution->toString().c_str());
- }
+ // May happen from any thread.
+ _stripe->storageDistributionChanged();
}
void
Distributor::recheckBucketInfo(uint16_t nodeIdx, const document::Bucket &bucket) {
- _bucketDBUpdater.recheckBucketInfo(nodeIdx, bucket);
+ bucket_db_updater().recheckBucketInfo(nodeIdx, bucket);
}
namespace {
-class MaintenanceChecker : public PendingMessageTracker::Checker
-{
-public:
- bool found;
-
- MaintenanceChecker() : found(false) {};
-
- bool check(uint32_t msgType, uint16_t node, uint8_t pri) override {
- (void) node;
- (void) pri;
- for (uint32_t i = 0;
- IdealStateOperation::MAINTENANCE_MESSAGE_TYPES[i] != 0;
- ++i)
- {
- if (msgType == IdealStateOperation::MAINTENANCE_MESSAGE_TYPES[i]) {
- found = true;
- return false;
- }
- }
- return true;
- }
-};
-
class SplitChecker : public PendingMessageTracker::Checker
{
public:
@@ -545,64 +345,31 @@ public:
}
void
-Distributor::checkBucketForSplit(document::BucketSpace bucketSpace,
- const BucketDatabase::Entry& e,
- uint8_t priority)
+Distributor::checkBucketForSplit(document::BucketSpace,
+ const BucketDatabase::Entry&,
+ uint8_t)
{
- if (!getConfig().doInlineSplit()) {
- return;
- }
-
- // Verify that there are no existing pending splits at the
- // appropriate priority.
- SplitChecker checker(priority);
- for (uint32_t i = 0; i < e->getNodeCount(); ++i) {
- _pendingMessageTracker.checkPendingMessages(e->getNodeRef(i).getNode(),
- document::Bucket(bucketSpace, e.getBucketId()),
- checker);
- if (checker.found) {
- return;
- }
- }
-
- Operation::SP operation =
- _idealStateManager.generateInterceptingSplit(bucketSpace, e, priority);
-
- if (operation.get()) {
- _maintenanceOperationOwner.start(operation, priority);
- }
+ assert(false);
}
void
Distributor::enableNextDistribution()
{
- if (_nextDistribution.get()) {
- _distribution = _nextDistribution;
- propagateDefaultDistribution(_distribution);
- _nextDistribution = std::shared_ptr<lib::Distribution>();
- _bucketDBUpdater.storageDistributionChanged();
- }
+ _stripe->enableNextDistribution();
}
+// TODO STRIPE only used by tests to directly inject new distribution config
void
Distributor::propagateDefaultDistribution(
std::shared_ptr<const lib::Distribution> distribution)
{
- auto global_distr = GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution);
- for (auto* repo : {_bucketSpaceRepo.get(), _readOnlyBucketSpaceRepo.get()}) {
- repo->get(document::FixedBucketSpaces::default_space()).setDistribution(distribution);
- repo->get(document::FixedBucketSpaces::global_space()).setDistribution(global_distr);
- }
+ _stripe->propagateDefaultDistribution(std::move(distribution));
}
void
Distributor::propagateClusterStates()
{
- for (auto* repo : {_bucketSpaceRepo.get(), _readOnlyBucketSpaceRepo.get()}) {
- for (auto& iter : *repo) {
- iter.second->setClusterState(_clusterStateBundle.getDerivedClusterState(iter.first));
- }
- }
+ assert(false);
}
void
@@ -617,378 +384,99 @@ Distributor::workWasDone() const noexcept
return !_tickResult.waitWanted();
}
-namespace {
-
-bool is_client_request(const api::StorageMessage& msg) noexcept {
- // Despite having been converted to StorageAPI messages, the following
- // set of messages are never sent to the distributor by other processes
- // than clients.
- switch (msg.getType().getId()) {
- case api::MessageType::GET_ID:
- case api::MessageType::PUT_ID:
- case api::MessageType::REMOVE_ID:
- case api::MessageType::VISITOR_CREATE_ID:
- case api::MessageType::VISITOR_DESTROY_ID:
- case api::MessageType::GETBUCKETLIST_ID:
- case api::MessageType::STATBUCKET_ID:
- case api::MessageType::UPDATE_ID:
- case api::MessageType::REMOVELOCATION_ID:
- return true;
- default:
- return false;
- }
-}
-
-}
-
-void Distributor::handle_or_propagate_message(const std::shared_ptr<api::StorageMessage>& msg) {
- if (!handleMessage(msg)) {
- MBUS_TRACE(msg->getTrace(), 9, "Distributor: Not handling it. Sending further down.");
- sendDown(msg);
- }
-}
-
-void Distributor::startExternalOperations() {
- for (auto& msg : _fetchedMessages) {
- if (is_client_request(*msg)) {
- MBUS_TRACE(msg->getTrace(), 9, "Distributor: adding to client request priority queue");
- _client_request_priority_queue.emplace(std::move(msg));
- } else {
- MBUS_TRACE(msg->getTrace(), 9, "Distributor: Grabbed from queue to be processed.");
- handle_or_propagate_message(msg);
- }
- }
-
- const bool start_single_client_request = !_client_request_priority_queue.empty();
- if (start_single_client_request) {
- const auto& msg = _client_request_priority_queue.top();
- MBUS_TRACE(msg->getTrace(), 9, "Distributor: Grabbed from "
- "client request priority queue to be processed.");
- handle_or_propagate_message(msg);
- _client_request_priority_queue.pop();
- }
-
- if (!_fetchedMessages.empty() || start_single_client_request) {
- signalWorkWasDone();
- }
- _fetchedMessages.clear();
-}
-
std::unordered_map<uint16_t, uint32_t>
Distributor::getMinReplica() const
{
- std::lock_guard guard(_metricLock);
- return _bucketDbStats._minBucketReplica;
+ // TODO STRIPE merged snapshot from all stripes
+ return _stripe->getMinReplica();
}
BucketSpacesStatsProvider::PerNodeBucketSpacesStats
Distributor::getBucketSpacesStats() const
{
- std::lock_guard guard(_metricLock);
- return _bucketSpacesStats;
-}
-
-void
-Distributor::propagateInternalScanMetricsToExternal()
-{
- std::lock_guard guard(_metricLock);
-
- // All shared values are written when _metricLock is held, so no races.
- if (_bucketDBMetricUpdater.hasCompletedRound()) {
- _bucketDbStats.propagateMetrics(_idealStateManager.getMetrics(),
- getMetrics());
- _idealStateManager.getMetrics().setPendingOperations(
- _maintenanceStats.global.pending);
- }
-}
-
-namespace {
-
-BucketSpaceStats
-toBucketSpaceStats(const NodeMaintenanceStats &stats)
-{
- return BucketSpaceStats(stats.total, stats.syncing + stats.copyingIn);
-}
-
-using PerNodeBucketSpacesStats = BucketSpacesStatsProvider::PerNodeBucketSpacesStats;
-
-PerNodeBucketSpacesStats
-toBucketSpacesStats(const NodeMaintenanceStatsTracker &maintenanceStats)
-{
- PerNodeBucketSpacesStats result;
- for (const auto &nodeEntry : maintenanceStats.perNodeStats()) {
- for (const auto &bucketSpaceEntry : nodeEntry.second) {
- auto bucketSpace = document::FixedBucketSpaces::to_string(bucketSpaceEntry.first);
- result[nodeEntry.first][bucketSpace] = toBucketSpaceStats(bucketSpaceEntry.second);
- }
- }
- return result;
-}
-
-size_t spaces_with_merges_pending(const PerNodeBucketSpacesStats& stats) {
- std::unordered_set<document::BucketSpace, document::BucketSpace::hash> spaces_with_pending;
- for (auto& node : stats) {
- for (auto& space : node.second) {
- if (space.second.valid() && space.second.bucketsPending() != 0) {
- // TODO avoid bucket space string roundtrip
- spaces_with_pending.emplace(document::FixedBucketSpaces::from_string(space.first));
- }
- }
- }
- return spaces_with_pending.size();
-}
-
-// TODO should we also trigger on !pending --> pending edge?
-bool merge_no_longer_pending_edge(const PerNodeBucketSpacesStats& prev_stats,
- const PerNodeBucketSpacesStats& curr_stats) {
- const auto prev_pending = spaces_with_merges_pending(prev_stats);
- const auto curr_pending = spaces_with_merges_pending(curr_stats);
- return curr_pending < prev_pending;
+ // TODO STRIPE merged snapshot from all stripes
+ return _stripe->getBucketSpacesStats();
}
+SimpleMaintenanceScanner::PendingMaintenanceStats
+Distributor::pending_maintenance_stats() const {
+ // TODO STRIPE merged snapshot from all stripes
+ return _stripe->pending_maintenance_stats();
}
void
-Distributor::updateInternalMetricsForCompletedScan()
+Distributor::propagateInternalScanMetricsToExternal()
{
- std::lock_guard guard(_metricLock);
-
- _bucketDBMetricUpdater.completeRound();
- _bucketDbStats = _bucketDBMetricUpdater.getLastCompleteStats();
- _maintenanceStats = _scanner->getPendingMaintenanceStats();
- auto new_space_stats = toBucketSpacesStats(_maintenanceStats.perNodeStats);
- if (merge_no_longer_pending_edge(_bucketSpacesStats, new_space_stats)) {
- _must_send_updated_host_info = true;
- }
- _bucketSpacesStats = std::move(new_space_stats);
- maybe_update_bucket_db_memory_usage_stats();
+ _stripe->propagateInternalScanMetricsToExternal();
}
void Distributor::maybe_update_bucket_db_memory_usage_stats() {
- auto now = _component.getClock().getMonotonicTime();
- if ((now - _last_db_memory_sample_time_point) > _db_memory_sample_interval) {
- for (auto& space : *_bucketSpaceRepo) {
- _bucketDBMetricUpdater.update_db_memory_usage(space.second->getBucketDatabase().memory_usage(), true);
- }
- for (auto& space : *_readOnlyBucketSpaceRepo) {
- _bucketDBMetricUpdater.update_db_memory_usage(space.second->getBucketDatabase().memory_usage(), false);
- }
- _last_db_memory_sample_time_point = now;
- } else {
- // Reuse previous memory statistics instead of sampling new.
- _bucketDBMetricUpdater.update_db_memory_usage(_bucketDbStats._mutable_db_mem_usage, true);
- _bucketDBMetricUpdater.update_db_memory_usage(_bucketDbStats._read_only_db_mem_usage, false);
- }
+ assert(false);
}
void
Distributor::scanAllBuckets()
{
- enterRecoveryMode();
- while (!scanNextBucket().isDone()) {}
-}
-
-MaintenanceScanner::ScanResult
-Distributor::scanNextBucket()
-{
- MaintenanceScanner::ScanResult scanResult(_scanner->scanNext());
- if (scanResult.isDone()) {
- updateInternalMetricsForCompletedScan();
- leaveRecoveryMode();
- send_updated_host_info_if_required();
- _scanner->reset();
- } else {
- const auto &distribution(_bucketSpaceRepo->get(scanResult.getBucketSpace()).getDistribution());
- _bucketDBMetricUpdater.visit(
- scanResult.getEntry(),
- distribution.getRedundancy());
- }
- return scanResult;
-}
-
-void Distributor::send_updated_host_info_if_required() {
- if (_must_send_updated_host_info) {
- _component.getStateUpdater().immediately_send_get_node_state_replies();
- _must_send_updated_host_info = false;
- }
-}
-
-void
-Distributor::startNextMaintenanceOperation()
-{
- _throttlingStarter->setMaxPendingRange(getConfig().getMinPendingMaintenanceOps(),
- getConfig().getMaxPendingMaintenanceOps());
- _scheduler->tick(_schedulingMode);
+ _stripe->scanAllBuckets();
}
framework::ThreadWaitInfo
-Distributor::doCriticalTick(framework::ThreadIndex)
+Distributor::doCriticalTick(framework::ThreadIndex idx)
{
_tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN;
- enableNextDistribution();
+ // Propagates any new configs down to stripe(s)
enableNextConfig();
- fetchStatusRequests();
- fetchExternalMessages();
+ _stripe->doCriticalTick(idx);
+ _tickResult.merge(_stripe->_tickResult);
return _tickResult;
}
framework::ThreadWaitInfo
-Distributor::doNonCriticalTick(framework::ThreadIndex)
+Distributor::doNonCriticalTick(framework::ThreadIndex idx)
{
- _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN;
- handleStatusRequests();
- startExternalOperations();
- if (initializing()) {
- _bucketDBUpdater.resendDelayedMessages();
- return _tickResult;
- }
- // Ordering note: since maintenance inhibiting checks whether startExternalOperations()
- // did any useful work with incoming data, this check must be performed _after_ the call.
- if (!should_inhibit_current_maintenance_scan_tick()) {
- scanNextBucket();
- startNextMaintenanceOperation();
- if (isInRecoveryMode()) {
- signalWorkWasDone();
- }
- mark_maintenance_tick_as_no_longer_inhibited();
- _bucketDBUpdater.resendDelayedMessages();
- } else {
- mark_current_maintenance_tick_as_inhibited();
- }
+ // TODO STRIPE stripes need their own thread loops!
+ _stripe->doNonCriticalTick(idx);
+ _tickResult = _stripe->_tickResult;
return _tickResult;
}
-bool Distributor::should_inhibit_current_maintenance_scan_tick() const noexcept {
- return (workWasDone() && (_inhibited_maintenance_tick_count
- < getConfig().max_consecutively_inhibited_maintenance_ticks()));
-}
-
-void Distributor::mark_current_maintenance_tick_as_inhibited() noexcept {
- ++_inhibited_maintenance_tick_count;
-}
-
-void Distributor::mark_maintenance_tick_as_no_longer_inhibited() noexcept {
- _inhibited_maintenance_tick_count = 0;
-}
-
void
Distributor::enableNextConfig()
{
_hostInfoReporter.enableReporting(getConfig().getEnableHostInfoReporting());
- _bucketDBMetricUpdater.setMinimumReplicaCountingMode(getConfig().getMinimumReplicaCountingMode());
- _ownershipSafeTimeCalc->setMaxClusterClockSkew(getConfig().getMaxClusterClockSkew());
- _pendingMessageTracker.setNodeBusyDuration(getConfig().getInhibitMergesOnBusyNodeDuration());
- _bucketDBUpdater.set_stale_reads_enabled(getConfig().allowStaleReadsDuringClusterStateTransitions());
- _externalOperationHandler.set_concurrent_gets_enabled(
- getConfig().allowStaleReadsDuringClusterStateTransitions());
- _externalOperationHandler.set_use_weak_internal_read_consistency_for_gets(
- getConfig().use_weak_internal_read_consistency_for_client_gets());
-}
-
-void
-Distributor::fetchStatusRequests()
-{
- if (_fetchedStatusRequests.empty()) {
- _fetchedStatusRequests.swap(_statusToDo);
- }
-}
-
-void
-Distributor::fetchExternalMessages()
-{
- assert(_fetchedMessages.empty());
- _fetchedMessages.swap(_messageQueue);
+ _stripe->enableNextConfig(); // TODO STRIPE avoid redundant call
}
void
Distributor::handleStatusRequests()
{
- uint32_t sz = _fetchedStatusRequests.size();
- for (uint32_t i = 0; i < sz; ++i) {
- Status& s(*_fetchedStatusRequests[i]);
- s.getReporter().reportStatus(s.getStream(), s.getPath());
- s.notifyCompleted();
- }
- _fetchedStatusRequests.clear();
- if (sz > 0) {
- signalWorkWasDone();
- }
+ assert(false);
}
vespalib::string
Distributor::getReportContentType(const framework::HttpUrlPath& path) const
{
- if (path.hasAttribute("page")) {
- if (path.getAttribute("page") == "buckets") {
- return "text/html";
- } else {
- return "application/xml";
- }
- } else {
- return "text/html";
- }
+ return _stripe->getReportContentType(path);
}
std::string
Distributor::getActiveIdealStateOperations() const
{
- return _maintenanceOperationOwner.toString();
-}
-
-std::string
-Distributor::getActiveOperations() const
-{
- return _operationOwner.toString();
+ return _stripe->getActiveIdealStateOperations();
}
bool
Distributor::reportStatus(std::ostream& out,
const framework::HttpUrlPath& path) const
{
- if (!path.hasAttribute("page") || path.getAttribute("page") == "buckets") {
- framework::PartlyHtmlStatusReporter htmlReporter(*this);
- htmlReporter.reportHtmlHeader(out, path);
- if (!path.hasAttribute("page")) {
- out << "<a href=\"?page=pending\">Count of pending messages to "
- << "storage nodes</a><br><a href=\"?page=maintenance&show=50\">"
- << "List maintenance queue (adjust show parameter to see more "
- << "operations, -1 for all)</a><br>\n<a href=\"?page=buckets\">"
- << "List all buckets, highlight non-ideal state</a><br>\n";
- } else {
- const_cast<IdealStateManager&>(_idealStateManager)
- .getBucketStatus(out);
- }
- htmlReporter.reportHtmlFooter(out, path);
- } else {
- framework::PartlyXmlStatusReporter xmlReporter(*this, out, path);
- using namespace vespalib::xml;
- std::string page(path.getAttribute("page"));
-
- if (page == "pending") {
- xmlReporter << XmlTag("pending")
- << XmlAttribute("externalload", _operationOwner.size())
- << XmlAttribute("maintenance",
- _maintenanceOperationOwner.size())
- << XmlEndTag();
- } else if (page == "maintenance") {
- // Need new page
- }
- }
-
- return true;
+ return _stripe->reportStatus(out, path);
}
bool
Distributor::handleStatusRequest(const DelegatedStatusRequest& request) const
{
- auto wrappedRequest = std::make_shared<Status>(request);
- {
- framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks());
- _statusToDo.push_back(wrappedRequest);
- guard.broadcast();
- }
- wrappedRequest->waitForCompletion();
- return true;
+ // TODO STRIPE need to aggregate status responses _across_ stripes..!
+ return _stripe->handleStatusRequest(request);
}
}
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 05340d7dcd2..c758dbd75e2 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -34,6 +34,8 @@ namespace storage::distributor {
class BlockingOperationStarter;
class BucketPriorityDatabase;
class DistributorBucketSpaceRepo;
+class DistributorStatus;
+class DistributorStripe;
class OperationSequencer;
class OwnershipTransferSafeTimePointCalculator;
class SimpleMaintenanceScanner;
@@ -71,17 +73,13 @@ public:
void send_up_without_tracking(const std::shared_ptr<api::StorageMessage>&) override;
ChainedMessageSender& getMessageSender() override {
- return (_messageSender == 0 ? *this : *_messageSender);
+ abort(); // TODO STRIPE
}
DistributorMetricSet& getMetrics() override { return *_metrics; }
- PendingMessageTracker& getPendingMessageTracker() override {
- return _pendingMessageTracker;
- }
-
const OperationSequencer& operation_sequencer() const noexcept override {
- return *_operation_sequencer;
+ abort(); // TODO STRIPE
}
const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const override;
@@ -112,10 +110,7 @@ public:
bool handleStatusRequest(const DelegatedStatusRequest& request) const override;
- uint32_t pendingMaintenanceCount() const;
-
std::string getActiveIdealStateOperations() const;
- std::string getActiveOperations() const;
virtual framework::ThreadWaitInfo doCriticalTick(framework::ThreadIndex) override;
virtual framework::ThreadWaitInfo doNonCriticalTick(framework::ThreadIndex) override;
@@ -135,7 +130,7 @@ public:
* storage nodes to be up.
*/
const char* getStorageNodeUpStates() const override {
- return _initializingIsUp ? "uri" : "ur";
+ return "uri";
}
/**
@@ -145,42 +140,33 @@ public:
*/
void handleCompletedMerge(const std::shared_ptr<api::MergeBucketReply>& reply) override;
-
- bool initializing() const override {
- return !_doneInitializing;
- }
+ bool initializing() const override;
- const DistributorConfiguration& getConfig() const override {
- return _component.getTotalDistributorConfig();
- }
+ const DistributorConfiguration& getConfig() const override;
- bool isInRecoveryMode() const {
- return _schedulingMode == MaintenanceScheduler::RECOVERY_SCHEDULING_MODE;
- }
+ bool isInRecoveryMode() const noexcept;
int getDistributorIndex() const override;
+ PendingMessageTracker& getPendingMessageTracker() override;
const PendingMessageTracker& getPendingMessageTracker() const override;
+
+ DistributorBucketSpaceRepo& getBucketSpaceRepo() noexcept;
+ const DistributorBucketSpaceRepo& getBucketSpaceRepo() const noexcept;
+ DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() noexcept;
+ const DistributorBucketSpaceRepo& getReadyOnlyBucketSpaceRepo() const noexcept;
+
+ storage::distributor::DistributorComponent& distributor_component() noexcept;
+
void sendCommand(const std::shared_ptr<api::StorageCommand>&) override;
void sendReply(const std::shared_ptr<api::StorageReply>&) override;
const BucketGcTimeCalculator::BucketIdHasher&
getBucketIdHasher() const override {
- return *_bucketIdHasher;
- }
-
- DistributorBucketSpaceRepo &getBucketSpaceRepo() noexcept { return *_bucketSpaceRepo; }
- const DistributorBucketSpaceRepo &getBucketSpaceRepo() const noexcept { return *_bucketSpaceRepo; }
-
- DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() noexcept {
- return *_readOnlyBucketSpaceRepo;
- }
- const DistributorBucketSpaceRepo& getReadyOnlyBucketSpaceRepo() const noexcept {
- return *_readOnlyBucketSpaceRepo;
+ abort(); // TODO STRIPE
}
OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const override;
- class Status;
class MetricUpdateHook : public framework::MetricUpdateHook
{
public:
@@ -197,9 +183,7 @@ public:
Distributor& _self;
};
- std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept {
- return _db_memory_sample_interval;
- }
+ std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept;
private:
friend struct DistributorTest;
@@ -216,59 +200,43 @@ private:
void handle_or_propagate_message(const std::shared_ptr<api::StorageMessage>& msg);
void startExternalOperations();
+ // Accessors used by tests
+ BucketDBUpdater& bucket_db_updater();
+ const BucketDBUpdater& bucket_db_updater() const;
+ IdealStateManager& ideal_state_manager();
+ const IdealStateManager& ideal_state_manager() const;
+ ExternalOperationHandler& external_operation_handler();
+ const ExternalOperationHandler& external_operation_handler() const;
+
+ BucketDBMetricUpdater& bucket_db_metric_updater() const noexcept;
+
/**
* Return a copy of the latest min replica data, see MinReplicaProvider.
*/
std::unordered_map<uint16_t, uint32_t> getMinReplica() const override;
PerNodeBucketSpacesStats getBucketSpacesStats() const override;
+ SimpleMaintenanceScanner::PendingMaintenanceStats pending_maintenance_stats() const;
/**
* Atomically publish internal metrics to external ideal state metrics.
* Takes metric lock.
*/
void propagateInternalScanMetricsToExternal();
- /**
- * Atomically updates internal metrics (not externally visible metrics;
- * these are not changed until a snapshot triggers
- * propagateIdealStateMetrics()).
- *
- * Takes metric lock.
- */
- void updateInternalMetricsForCompletedScan();
void maybe_update_bucket_db_memory_usage_stats();
void scanAllBuckets();
- MaintenanceScanner::ScanResult scanNextBucket();
- bool should_inhibit_current_maintenance_scan_tick() const noexcept;
- void mark_current_maintenance_tick_as_inhibited() noexcept;
- void mark_maintenance_tick_as_no_longer_inhibited() noexcept;
void enableNextConfig();
- void fetchStatusRequests();
- void fetchExternalMessages();
- void startNextMaintenanceOperation();
void signalWorkWasDone();
bool workWasDone() const noexcept;
- void enterRecoveryMode();
- void leaveRecoveryMode();
-
- // Tries to generate an operation from the given message. Returns true
- // if we either returned an operation, or the message was otherwise handled
- // (for instance, wrong distribution).
- bool generateOperation(const std::shared_ptr<api::StorageMessage>& msg,
- Operation::SP& operation);
-
void enableNextDistribution();
void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>);
void propagateClusterStates();
- BucketSpacesStatsProvider::BucketSpacesStats make_invalid_stats_per_configured_space() const;
- template <typename NodeFunctor>
- void for_each_available_content_node_in(const lib::ClusterState&, NodeFunctor&&);
- void invalidate_bucket_spaces_stats();
- void send_updated_host_info_if_required();
-
- lib::ClusterStateBundle _clusterStateBundle;
+ std::shared_ptr<DistributorMetricSet> _metrics;
+ ChainedMessageSender* _messageSender;
+ // TODO STRIPE multiple stripes...! This is for proof of concept of wiring.
+ std::unique_ptr<DistributorStripe> _stripe;
std::unique_ptr<DistributorBucketSpaceRepo> _bucketSpaceRepo;
// Read-only bucket space repo with DBs that only contain buckets transiently
@@ -276,74 +244,18 @@ private:
// and the DBs are empty during non-transition phases.
std::unique_ptr<DistributorBucketSpaceRepo> _readOnlyBucketSpaceRepo;
storage::distributor::DistributorComponent _component;
- std::shared_ptr<DistributorMetricSet> _metrics;
-
- OperationOwner _operationOwner;
- OperationOwner _maintenanceOperationOwner;
- std::unique_ptr<OperationSequencer> _operation_sequencer;
- PendingMessageTracker _pendingMessageTracker;
- BucketDBUpdater _bucketDBUpdater;
StatusReporterDelegate _distributorStatusDelegate;
- StatusReporterDelegate _bucketDBStatusDelegate;
- IdealStateManager _idealStateManager;
- ChainedMessageSender* _messageSender;
- ExternalOperationHandler _externalOperationHandler;
-
- std::shared_ptr<lib::Distribution> _distribution;
- std::shared_ptr<lib::Distribution> _nextDistribution;
- using MessageQueue = std::vector<std::shared_ptr<api::StorageMessage>>;
- struct IndirectHigherPriority {
- template <typename Lhs, typename Rhs>
- bool operator()(const Lhs& lhs, const Rhs& rhs) const noexcept {
- return lhs->getPriority() > rhs->getPriority();
- }
- };
- using ClientRequestPriorityQueue = std::priority_queue<
- std::shared_ptr<api::StorageMessage>,
- std::vector<std::shared_ptr<api::StorageMessage>>,
- IndirectHigherPriority
- >;
- MessageQueue _messageQueue;
- ClientRequestPriorityQueue _client_request_priority_queue;
- MessageQueue _fetchedMessages;
framework::TickingThreadPool& _threadPool;
- mutable std::vector<std::shared_ptr<Status>> _statusToDo;
- mutable std::vector<std::shared_ptr<Status>> _fetchedStatusRequests;
+ mutable std::vector<std::shared_ptr<DistributorStatus>> _statusToDo;
+ mutable std::vector<std::shared_ptr<DistributorStatus>> _fetchedStatusRequests;
- bool _initializingIsUp;
-
- DoneInitializeHandler& _doneInitializeHandler;
- bool _doneInitializing;
-
- std::unique_ptr<BucketPriorityDatabase> _bucketPriorityDb;
- std::unique_ptr<SimpleMaintenanceScanner> _scanner;
- std::unique_ptr<ThrottlingOperationStarter> _throttlingStarter;
- std::unique_ptr<BlockingOperationStarter> _blockingStarter;
- std::unique_ptr<MaintenanceScheduler> _scheduler;
- MaintenanceScheduler::SchedulingMode _schedulingMode;
- framework::MilliSecTimer _recoveryTimeStarted;
framework::ThreadWaitInfo _tickResult;
- BucketDBMetricUpdater _bucketDBMetricUpdater;
- std::unique_ptr<BucketGcTimeCalculator::BucketIdHasher> _bucketIdHasher;
MetricUpdateHook _metricUpdateHook;
mutable std::mutex _metricLock;
- /**
- * Maintenance stats for last completed database scan iteration.
- * Access must be protected by _metricLock as it is read by metric
- * manager thread but written by distributor thread.
- */
- SimpleMaintenanceScanner::PendingMaintenanceStats _maintenanceStats;
- BucketSpacesStatsProvider::PerNodeBucketSpacesStats _bucketSpacesStats;
- BucketDBMetricUpdater::Stats _bucketDbStats;
DistributorHostInfoReporter _hostInfoReporter;
- std::unique_ptr<OwnershipTransferSafeTimePointCalculator> _ownershipSafeTimeCalc;
- std::chrono::steady_clock::duration _db_memory_sample_interval;
- std::chrono::steady_clock::time_point _last_db_memory_sample_time_point;
- size_t _inhibited_maintenance_tick_count;
- bool _must_send_updated_host_info;
};
}
diff --git a/storage/src/vespa/storage/distributor/distributor_status.cpp b/storage/src/vespa/storage/distributor/distributor_status.cpp
new file mode 100644
index 00000000000..811608822d8
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/distributor_status.cpp
@@ -0,0 +1,32 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "distributor_status.h"
+#include "delegatedstatusrequest.h"
+
+namespace storage::distributor {
+
+// Output stream carried by the wrapped status request.
+std::ostream&
+DistributorStatus::getStream()
+{
+    return _request.outputStream;
+}
+// URL path carried by the wrapped status request.
+const framework::HttpUrlPath&
+DistributorStatus::getPath() const
+{
+    return _request.path;
+}
+// Status reporter carried by the wrapped status request.
+const framework::StatusReporter&
+DistributorStatus::getReporter() const
+{
+    return _request.reporter;
+}
+
+// Marks the request as done and wakes every thread blocked in waitForCompletion().
+void
+DistributorStatus::notifyCompleted()
+{
+    std::unique_lock guard(_lock);
+    _done = true;
+    // Release the mutex before notifying, so woken waiters do not immediately block on it.
+    guard.unlock();
+    _cond.notify_all();
+}
+// Blocks the calling thread until notifyCompleted() has set the done flag.
+void
+DistributorStatus::waitForCompletion()
+{
+    std::unique_lock guard(_lock);
+    // The predicate overload re-checks the flag after every (possibly spurious) wakeup.
+    _cond.wait(guard, [this] { return _done; });
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributor_status.h b/storage/src/vespa/storage/distributor/distributor_status.h
new file mode 100644
index 00000000000..6783789949b
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/distributor_status.h
@@ -0,0 +1,41 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <condition_variable>
+#include <iosfwd>
+#include <mutex>
+
+namespace storage::framework {
+class HttpUrlPath;
+class StatusReporter;
+}
+
+namespace storage::distributor {
+
+class DelegatedStatusRequest;
+
+/**
+ * Wraps a DelegatedStatusRequest together with a completion flag that one
+ * thread can block on (waitForCompletion) until another thread has signalled
+ * that the request is finished (notifyCompleted).
+ *
+ * Only a reference to the request is stored, so the request object must
+ * outlive this wrapper.
+ */
+class DistributorStatus {
+    const DelegatedStatusRequest& _request;
+    std::mutex                    _lock;  // Guards _done
+    std::condition_variable       _cond;  // Signalled once _done becomes true
+    bool                          _done;
+
+public:
+    // explicit: a request should never silently convert into a status wrapper.
+    explicit DistributorStatus(const DelegatedStatusRequest& request) noexcept
+        : _request(request),
+          _lock(),
+          _cond(),
+          _done(false)
+    {}
+
+    // Accessors forwarding to the corresponding fields of the wrapped request.
+    std::ostream& getStream();
+    const framework::HttpUrlPath& getPath() const;
+    const framework::StatusReporter& getReporter() const;
+
+    // Sets the done flag and wakes all waiters.
+    void notifyCompleted();
+    // Blocks until notifyCompleted() has been called.
+    void waitForCompletion();
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
new file mode 100644
index 00000000000..4671e5ec9ca
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -0,0 +1,903 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "blockingoperationstarter.h"
+#include "distributor_stripe.h"
+#include "distributor_status.h"
+#include "distributor_bucket_space.h"
+#include "distributormetricsset.h"
+#include "idealstatemetricsset.h"
+#include "operation_sequencer.h"
+#include "ownership_transfer_safe_time_point_calculator.h"
+#include "throttlingoperationstarter.h"
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/storage/common/global_bucket_space_distribution_converter.h>
+#include <vespa/storage/common/hostreporter/hostinfo.h>
+#include <vespa/storage/common/node_identity.h>
+#include <vespa/storage/common/nodestateupdater.h>
+#include <vespa/storage/distributor/maintenance/simplebucketprioritydatabase.h>
+#include <vespa/storageframework/generic/status/xmlstatusreporter.h>
+#include <vespa/vdslib/distribution/distribution.h>
+#include <vespa/vespalib/util/memoryusage.h>
+#include <algorithm>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".distributor_stripe");
+
+using namespace std::chrono_literals;
+
+namespace storage::distributor {
+
+/* TODO STRIPE
+ * - need a DistributorComponent per stripe
+ * - or better, remove entirely!
+ * - probably also DistributorInterface since it's used to send
+ * - metrics aggregation
+ */
+/**
+ * Wires up all stripe-local components (bucket space repos, operation owners,
+ * bucket DB updater, ideal state manager, maintenance scanner/scheduler chain).
+ * The body then registers the bucket DB status page and propagates the
+ * component's current distribution and the (initially default-constructed)
+ * cluster state bundle to both bucket space repos.
+ */
+DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
+                                     DistributorMetricSet& metrics,
+                                     const NodeIdentity& node_identity,
+                                     framework::TickingThreadPool& threadPool,
+                                     DoneInitializeHandler& doneInitHandler,
+                                     bool manageActiveBucketCopies,
+                                     ChainedMessageSender& messageSender)
+    : StorageLink("distributor"),
+      DistributorInterface(),
+      framework::StatusReporter("distributor", "Distributor"),
+      _clusterStateBundle(lib::ClusterState()),
+      _bucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>(node_identity.node_index())),
+      _readOnlyBucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>(node_identity.node_index())),
+      _component(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, "distributor"),
+      _metrics(metrics),
+      _operationOwner(*this, _component.getClock()),
+      _maintenanceOperationOwner(*this, _component.getClock()),
+      _operation_sequencer(std::make_unique<OperationSequencer>()),
+      _pendingMessageTracker(compReg),
+      _bucketDBUpdater(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, *this, compReg),
+      _distributorStatusDelegate(compReg, *this, *this),
+      _bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater),
+      _idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, manageActiveBucketCopies),
+      _messageSender(messageSender),
+      _externalOperationHandler(_component, _component, getMetrics(), getMessageSender(),
+                                *_operation_sequencer, *this, _component,
+                                _idealStateManager, _operationOwner),
+      _threadPool(threadPool),
+      _doneInitializeHandler(doneInitHandler),
+      _doneInitializing(false),
+      _bucketPriorityDb(std::make_unique<SimpleBucketPriorityDatabase>()),
+      _scanner(std::make_unique<SimpleMaintenanceScanner>(*_bucketPriorityDb, _idealStateManager, *_bucketSpaceRepo)),
+      _throttlingStarter(std::make_unique<ThrottlingOperationStarter>(_maintenanceOperationOwner)),
+      _blockingStarter(std::make_unique<BlockingOperationStarter>(_pendingMessageTracker, *_operation_sequencer,
+                                                                  *_throttlingStarter)),
+      _scheduler(std::make_unique<MaintenanceScheduler>(_idealStateManager, *_bucketPriorityDb, *_blockingStarter)),
+      _schedulingMode(MaintenanceScheduler::NORMAL_SCHEDULING_MODE),
+      _recoveryTimeStarted(_component.getClock()),
+      _tickResult(framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN),
+      _bucketIdHasher(std::make_unique<BucketGcTimeCalculator::BucketIdIdentityHasher>()),
+      _metricUpdateHook(*this),
+      _metricLock(),
+      _maintenanceStats(),
+      _bucketSpacesStats(),
+      _bucketDbStats(),
+      _ownershipSafeTimeCalc(std::make_unique<OwnershipTransferSafeTimePointCalculator>(0s)), // Set by config later
+      _db_memory_sample_interval(30s),
+      _last_db_memory_sample_time_point(),
+      _inhibited_maintenance_tick_count(0),
+      _must_send_updated_host_info(false)
+{
+    // Only the bucket DB status page is registered here; _distributorStatusDelegate is
+    // constructed above but not registered by this constructor.
+    _bucketDBStatusDelegate.registerStatusPage();
+    propagateDefaultDistribution(_component.getDistribution());
+    propagateClusterStates();
+}
+
+DistributorStripe::~DistributorStripe() = default; // Defaulted out-of-line; presumably so the header can forward-declare the types held via unique_ptr (verify against the header).
+
+// Index of this distributor, as reported by the stripe's component.
+int DistributorStripe::getDistributorIndex() const {
+    return _component.getIndex();
+}
+
+// Read-only access to the tracker of messages sent by this stripe.
+const PendingMessageTracker& DistributorStripe::getPendingMessageTracker() const {
+    return _pendingMessageTracker;
+}
+
+// Forwards to the bucket DB updater; the result may be null (see the name).
+const lib::ClusterState*
+DistributorStripe::pendingClusterStateOrNull(const document::BucketSpace& space) const
+{
+    return _bucketDBUpdater.pendingClusterStateOrNull(space);
+}
+
+// Sends a command upwards. Merge commands additionally have their node count
+// sampled into the ideal state "nodesPerMerge" metric first.
+void DistributorStripe::sendCommand(const std::shared_ptr<api::StorageCommand>& cmd) {
+    const bool is_merge = (cmd->getType() == api::MessageType::MERGEBUCKET);
+    if (is_merge) {
+        auto& merge_cmd = static_cast<api::MergeBucketCommand&>(*cmd);
+        _idealStateManager.getMetrics().nodesPerMerge.addValue(merge_cmd.getNodes().size());
+    }
+    sendUp(cmd);
+}
+
+// Replies take the same tracked upward path as commands.
+void DistributorStripe::sendReply(const std::shared_ptr<api::StorageReply>& reply) {
+    sendUp(reply);
+}
+
+// Called when the link is opened. Currently only logs; a warning is emitted
+// when the config says the distributor thread should not be started.
+void DistributorStripe::onOpen() {
+    LOG(debug, "DistributorStripe::onOpen invoked");
+    if (!_component.getDistributorConfig().startDistributorThread) {
+        LOG(warning, "Not starting distributor stripe thread as it's not configured to "
+                     "run. Unless you are just running a test tool, this is a "
+                     "fatal error.");
+        return;
+    }
+    // TODO STRIPE own thread per stripe!
+}
+
+// Creates a reply for the given command, marks it ABORTED and sends it up.
+// NOTE(review): the cast result is dereferenced unchecked, so callers must
+// only pass messages that are StorageCommands.
+void
+DistributorStripe::send_shutdown_abort_reply(const std::shared_ptr<api::StorageMessage>& msg)
+{
+    auto cmd = std::dynamic_pointer_cast<api::StorageCommand>(msg);
+    api::StorageReply::UP abort_reply(cmd->makeReply());
+    abort_reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Distributor is shutting down"));
+    std::shared_ptr<api::StorageMessage> shared_reply(abort_reply.release());
+    sendUp(shared_reply);
+}
+
+// Shutdown hook: answers every queued command with an abort reply, then
+// flushes/closes the stripe's sub-components.
+void
+DistributorStripe::onClose()
+{
+    // Queued replies are dropped; only queued commands get an abort reply.
+    for (auto& queued : _messageQueue) {
+        if (queued->getType().isReply()) {
+            continue;
+        }
+        send_shutdown_abort_reply(queued);
+    }
+    _messageQueue.clear();
+
+    for (; !_client_request_priority_queue.empty(); _client_request_priority_queue.pop()) {
+        send_shutdown_abort_reply(_client_request_priority_queue.top());
+    }
+
+    LOG(debug, "DistributorStripe::onClose invoked");
+    _pendingMessageTracker.abort_deferred_tasks();
+    _bucketDBUpdater.flush();
+    _externalOperationHandler.close_pending();
+    _operationOwner.onClose();
+    _maintenanceOperationOwner.onClose();
+}
+
+// Hands the message to the chained sender without registering it in the
+// pending message tracker (contrast with sendUp()).
+void
+DistributorStripe::send_up_without_tracking(const std::shared_ptr<api::StorageMessage>& msg)
+{
+    _messageSender.sendUp(msg);
+}
+
+// Registers the message with the pending message tracker, then sends it up.
+void DistributorStripe::sendUp(const std::shared_ptr<api::StorageMessage>& msg) {
+    _pendingMessageTracker.insert(msg);
+    send_up_without_tracking(msg);
+}
+
+// Entry point for messages travelling down the chain. Messages that the
+// external operation handler accepts directly are done here; everything else
+// is appended to the message queue under the ticking lock. Always returns true.
+bool DistributorStripe::onDown(const std::shared_ptr<api::StorageMessage>& msg) {
+    const bool handled_outside_main_thread =
+            _externalOperationHandler.try_handle_message_outside_main_thread(msg);
+    if (!handled_outside_main_thread) {
+        framework::TickingLockGuard tick_guard(_threadPool.freezeCriticalTicks());
+        MBUS_TRACE(msg->getTrace(), 9,
+                   "Distributor: Added to message queue. Thread state: "
+                   + _threadPool.getStatus());
+        _messageQueue.push_back(msg);
+        tick_guard.broadcast();
+    }
+    return true;
+}
+
+// Routes a merge reply to the maintenance operation owner.
+void DistributorStripe::handleCompletedMerge(const std::shared_ptr<api::MergeBucketReply>& reply) {
+    _maintenanceOperationOwner.handleReply(reply);
+}
+
+// True iff the reply's message type is one of the reply types listed below.
+bool DistributorStripe::isMaintenanceReply(const api::StorageReply& reply) const {
+    const auto type_id = reply.getType().getId();
+    return (type_id == api::MessageType::CREATEBUCKET_REPLY_ID)
+        || (type_id == api::MessageType::MERGEBUCKET_REPLY_ID)
+        || (type_id == api::MessageType::DELETEBUCKET_REPLY_ID)
+        || (type_id == api::MessageType::REQUESTBUCKETINFO_REPLY_ID)
+        || (type_id == api::MessageType::SPLITBUCKET_REPLY_ID)
+        || (type_id == api::MessageType::JOINBUCKETS_REPLY_ID)
+        || (type_id == api::MessageType::SETBUCKETSTATE_REPLY_ID)
+        || (type_id == api::MessageType::REMOVELOCATION_REPLY_ID);
+}
+
+// Offers a reply, in order, to: the bucket DB updater, the external operation
+// owner and the maintenance operation owner. Returns true when one of them
+// consumed it, or when it is a maintenance reply nobody claimed.
+bool DistributorStripe::handleReply(const std::shared_ptr<api::StorageReply>& reply) {
+    document::Bucket bucket = _pendingMessageTracker.reply(*reply);
+
+    // A node reporting BUCKET_NOT_FOUND for a known bucket triggers a recheck
+    // of that node's bucket info.
+    const bool bucket_not_found = (reply->getResult().getResult() == api::ReturnCode::BUCKET_NOT_FOUND);
+    if (bucket_not_found
+        && bucket.getBucketId() != document::BucketId(0)
+        && reply->getAddress())
+    {
+        recheckBucketInfo(reply->getAddress()->getIndex(), bucket);
+    }
+
+    if (reply->callHandler(_bucketDBUpdater, reply)) {
+        return true;
+    }
+    if (_operationOwner.handleReply(reply)) {
+        return true;
+    }
+    if (_maintenanceOperationOwner.handleReply(reply)) {
+        _scanner->prioritizeBucket(bucket);
+        return true;
+    }
+
+    // An unclaimed maintenance reply most likely belongs to an operation whose
+    // state was flushed when its node went down in the cluster state. Swallow
+    // it so it does not surface as an unhandled message at the bottom of the
+    // link chain.
+    return isMaintenanceReply(*reply);
+}
+
+// Delegates to the external operation handler; `operation` is an out-parameter
+// passed through to it.
+bool DistributorStripe::generateOperation(const std::shared_ptr<api::StorageMessage>& msg,
+                                          Operation::SP& operation)
+{
+    return _externalOperationHandler.handleMessage(msg, operation);
+}
+
+// Tries, in order: reply handling, the bucket DB updater, and finally
+// generating an external operation. Returns false when nobody handled it.
+bool DistributorStripe::handleMessage(const std::shared_ptr<api::StorageMessage>& msg) {
+    if (msg->getType().isReply()) {
+        auto reply = std::dynamic_pointer_cast<api::StorageReply>(msg);
+        if (handleReply(reply)) {
+            return true;
+        }
+    }
+
+    if (msg->callHandler(_bucketDBUpdater, msg)) {
+        return true;
+    }
+
+    Operation::SP operation;
+    if (!generateOperation(msg, operation)) {
+        return false;
+    }
+    // The message counts as handled even when no operation was produced for it.
+    if (operation.get() != nullptr) {
+        _operationOwner.start(operation, msg->getPriority());
+    }
+    return true;
+}
+
+// The cluster state bundle most recently enabled on this stripe.
+const lib::ClusterStateBundle& DistributorStripe::getClusterStateBundle() const {
+    return _clusterStateBundle;
+}
+
+/**
+ * Makes `state` the active cluster state bundle for this stripe.
+ *
+ * Steps, in order:
+ *  - store the bundle and propagate derived states to the bucket space repos
+ *  - signal "done initializing" the first time this distributor is UP
+ *  - enter recovery mode
+ *  - drop pending-message tracking and maintenance operations for storage
+ *    nodes that are not in an "up" state in the new baseline state
+ *  - if bucket ownership changed, have the external operation handler reject
+ *    feed until the computed safe time point
+ */
+void
+DistributorStripe::enableClusterStateBundle(const lib::ClusterStateBundle& state)
+{
+    const lib::Node my_node(lib::NodeType::DISTRIBUTOR, getDistributorIndex());
+    // Only the storage node count of the outgoing bundle is needed below, so
+    // sample it now instead of copying the entire old bundle.
+    const uint16_t old_node_count =
+            _clusterStateBundle.getBaselineClusterState()->getNodeCount(lib::NodeType::STORAGE);
+    _clusterStateBundle = state;
+    propagateClusterStates();
+
+    const auto& baseline_state = *state.getBaselineClusterState();
+    if (!_doneInitializing && (baseline_state.getNodeState(my_node).getState() == lib::State::UP)) {
+        _doneInitializing = true;
+        _doneInitializeHandler.notifyDoneInitializing();
+    }
+    enterRecoveryMode();
+
+    // Clear all active messages on nodes that are down. The larger of the old
+    // and new node counts is used so nodes removed from the state are covered.
+    const uint16_t new_node_count = baseline_state.getNodeCount(lib::NodeType::STORAGE);
+    const uint16_t max_node_count = std::max(old_node_count, new_node_count);
+    for (uint16_t i = 0; i < max_node_count; ++i) {
+        const auto& node_state = baseline_state.getNodeState(lib::Node(lib::NodeType::STORAGE, i)).getState();
+        if (node_state.oneOf(getStorageNodeUpStates())) {
+            continue;
+        }
+        std::vector<uint64_t> msgIds = _pendingMessageTracker.clearMessagesForNode(i);
+        LOG(debug, "Node %u is down, clearing %zu pending maintenance operations", i, msgIds.size());
+
+        for (const uint64_t msg_id : msgIds) {
+            _maintenanceOperationOwner.erase(msg_id);
+        }
+    }
+
+    if (_bucketDBUpdater.bucketOwnershipHasChanged()) {
+        using TimePoint = OwnershipTransferSafeTimePointCalculator::TimePoint;
+        // Note: this assumes that std::chrono::system_clock and the framework
+        // system clock have the same epoch, which should be a reasonable
+        // assumption.
+        const auto now = TimePoint(std::chrono::milliseconds(
+                _component.getClock().getTimeInMillis().getTime()));
+        _externalOperationHandler.rejectFeedBeforeTimeReached(
+                _ownershipSafeTimeCalc->safeTimePoint(now));
+    }
+}
+
+// Forwards to the bucket DB updater's snapshot lookup for the given bucket.
+OperationRoutingSnapshot
+DistributorStripe::read_snapshot_for_bucket(const document::Bucket& bucket) const
+{
+    return _bucketDBUpdater.read_snapshot_for_bucket(bucket);
+}
+
+// A newly enabled distribution is handled like a newly enabled cluster state:
+// recovery mode is entered, which re-scans the bucket database.
+void DistributorStripe::notifyDistributionChangeEnabled() {
+    LOG(debug, "Pending cluster state for distribution change has been enabled");
+    enterRecoveryMode();
+}
+
+// Switches the scheduler to recovery mode, restarts the scanner and metric
+// round, invalidates the bucket space stats and restarts the recovery timer.
+void DistributorStripe::enterRecoveryMode() {
+    LOG(debug, "Entering recovery mode");
+    _schedulingMode = MaintenanceScheduler::RECOVERY_SCHEDULING_MODE;
+
+    _scanner->reset();
+    _bucketDBMetricUpdater.reset();
+    // TODO reset _bucketDbStats?
+    invalidate_bucket_spaces_stats();
+
+    _recoveryTimeStarted = framework::MilliSecTimer(_component.getClock());
+}
+
+// Returns the scheduler to normal mode. When actually leaving recovery mode,
+// the elapsed recovery time is recorded and, once initialization is done, an
+// updated host info send is requested.
+void DistributorStripe::leaveRecoveryMode() {
+    if (isInRecoveryMode()) {
+        LOG(debug, "Leaving recovery mode");
+        // FIXME don't use shared metric for this
+        const auto elapsed = _recoveryTimeStarted.getElapsedTimeAsDouble();
+        _metrics.recoveryModeTime.addValue(elapsed);
+        if (_doneInitializing) {
+            _must_send_updated_host_info = true;
+        }
+    }
+    // Set unconditionally, also when we were not in recovery mode.
+    _schedulingMode = MaintenanceScheduler::NORMAL_SCHEDULING_MODE;
+}
+
+// Invokes `func` with every storage node in `state` whose node state matches
+// one of the state characters in "uir".
+template <typename NodeFunctor>
+void
+DistributorStripe::for_each_available_content_node_in(const lib::ClusterState& state, NodeFunctor&& func)
+{
+    const auto storage_node_count = state.getNodeCount(lib::NodeType::STORAGE);
+    for (uint16_t idx = 0; idx < storage_node_count; ++idx) {
+        lib::Node node(lib::NodeType::STORAGE, idx);
+        if (!state.getNodeState(node).getState().oneOf("uir")) {
+            continue;
+        }
+        func(node);
+    }
+}
+
+// Builds a stats map holding one "invalid" entry per bucket space in the
+// mutable bucket space repo, keyed by the space's string name.
+BucketSpacesStatsProvider::BucketSpacesStats
+DistributorStripe::make_invalid_stats_per_configured_space() const
+{
+    BucketSpacesStatsProvider::BucketSpacesStats result;
+    for (auto& repo_entry : *_bucketSpaceRepo) {
+        result.emplace(document::FixedBucketSpaces::to_string(repo_entry.first),
+                       BucketSpaceStats::make_invalid());
+    }
+    return result;
+}
+
+// Replaces the per-node bucket space stats with "invalid" placeholders for
+// every available content node in the current baseline cluster state.
+// Takes _metricLock for the duration.
+void
+DistributorStripe::invalidate_bucket_spaces_stats()
+{
+    std::lock_guard guard(_metricLock);
+    _bucketSpacesStats = BucketSpacesStatsProvider::PerNodeBucketSpacesStats();
+    auto invalid_space_stats = make_invalid_stats_per_configured_space();
+
+    auto mark_node_invalid = [this, &invalid_space_stats](const lib::Node& node) {
+        _bucketSpacesStats[node.getIndex()] = invalid_space_stats;
+    };
+    const auto& baseline = *_clusterStateBundle.getBaselineClusterState();
+    for_each_available_content_node_in(baseline, mark_node_invalid);
+}
+
+// Compares the component's distribution with the active one. When there is no
+// active distribution yet, or the two differ, the new one is staged in
+// _nextDistribution; otherwise the change is only logged.
+void DistributorStripe::storageDistributionChanged() {
+    const bool distribution_differs = !_distribution.get()
+                                      || (*_component.getDistribution() != *_distribution);
+    if (!distribution_differs) {
+        LOG(debug,
+            "Got distribution change, but the distribution %s was the same as "
+            "before: %s",
+            _component.getDistribution()->toString().c_str(),
+            _distribution->toString().c_str());
+        return;
+    }
+    LOG(debug,
+        "Distribution changed to %s, must refetch bucket information",
+        _component.getDistribution()->toString().c_str());
+
+    // FIXME this is not thread safe
+    _nextDistribution = _component.getDistribution();
+}
+
+// Asks the bucket DB updater to recheck the given bucket's info on a node.
+void DistributorStripe::recheckBucketInfo(uint16_t nodeIdx, const document::Bucket &bucket)
+{
+    _bucketDBUpdater.recheckBucketInfo(nodeIdx, bucket);
+}
+
+namespace {
+
+/**
+ * Pending message checker that records whether any inspected message is a
+ * split command with a priority value of at most `maxPri`.
+ */
+class SplitChecker : public PendingMessageTracker::Checker
+{
+public:
+    bool found;      // Set once a matching pending split has been seen
+    uint8_t maxPri;  // Highest priority value that still counts as a match
+
+    explicit SplitChecker(uint8_t maxP) : found(false), maxPri(maxP) {}
+
+    // Returns false on the first match (presumably telling the tracker to
+    // stop checking; confirm against PendingMessageTracker::Checker),
+    // otherwise true.
+    bool check(uint32_t msgType, uint16_t node, uint8_t pri) override {
+        (void) node; // Node identity is irrelevant for this check
+        if (msgType == api::MessageType::SPLITBUCKET_ID && pri <= maxPri) {
+            found = true;
+            return false;
+        }
+
+        return true;
+    }
+};
+
+}
+
+// When inline splitting is enabled and no split of at most the given priority
+// value is already pending towards any node holding the bucket, asks the
+// ideal state manager for an intercepting split and starts it if one is made.
+void DistributorStripe::checkBucketForSplit(document::BucketSpace bucketSpace,
+                                            const BucketDatabase::Entry& e,
+                                            uint8_t priority)
+{
+    if (!getConfig().doInlineSplit()) {
+        return;
+    }
+
+    // Bail out if any node already has a pending split at the appropriate priority.
+    SplitChecker checker(priority);
+    const uint32_t replica_count = e->getNodeCount();
+    for (uint32_t replica = 0; replica < replica_count; ++replica) {
+        _pendingMessageTracker.checkPendingMessages(e->getNodeRef(replica).getNode(),
+                                                    document::Bucket(bucketSpace, e.getBucketId()),
+                                                    checker);
+        if (checker.found) {
+            return;
+        }
+    }
+
+    Operation::SP split_op = _idealStateManager.generateInterceptingSplit(bucketSpace, e, priority);
+    if (split_op.get() != nullptr) {
+        _maintenanceOperationOwner.start(split_op, priority);
+    }
+}
+
+// Activates a distribution staged by storageDistributionChanged(), if any:
+// it becomes the active one, is propagated to the bucket space repos, the
+// staging slot is cleared and the bucket DB updater is notified.
+void DistributorStripe::enableNextDistribution() {
+    if (!_nextDistribution.get()) {
+        return; // Nothing staged
+    }
+    _distribution = _nextDistribution;
+    propagateDefaultDistribution(_distribution);
+    _nextDistribution = std::shared_ptr<lib::Distribution>();
+    _bucketDBUpdater.storageDistributionChanged();
+}
+
+// Installs `distribution` for the default bucket space and its converted
+// global counterpart for the global bucket space, in both the mutable and the
+// read-only bucket space repo.
+void DistributorStripe::propagateDefaultDistribution(std::shared_ptr<const lib::Distribution> distribution)
+{
+    auto global_distr = GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution);
+    auto install_into = [&](auto* repo) {
+        repo->get(document::FixedBucketSpaces::default_space()).setDistribution(distribution);
+        repo->get(document::FixedBucketSpaces::global_space()).setDistribution(global_distr);
+    };
+    install_into(_bucketSpaceRepo.get());
+    install_into(_readOnlyBucketSpaceRepo.get());
+}
+
+// Gives every bucket space in both repos the cluster state derived for that
+// space from the current bundle.
+void DistributorStripe::propagateClusterStates() {
+    for (auto* repo : {_bucketSpaceRepo.get(), _readOnlyBucketSpaceRepo.get()}) {
+        for (auto& space_entry : *repo) {
+            auto derived_state = _clusterStateBundle.getDerivedClusterState(space_entry.first);
+            space_entry.second->setClusterState(derived_state);
+        }
+    }
+}
+
+// Flags the current tick result as "more work enqueued".
+void DistributorStripe::signalWorkWasDone() {
+    _tickResult = framework::ThreadWaitInfo::MORE_WORK_ENQUEUED;
+}
+
+// Work counts as done when the current tick result does not want a wait.
+bool DistributorStripe::workWasDone() const noexcept {
+    const bool wait_wanted = _tickResult.waitWanted();
+    return !wait_wanted;
+}
+
+namespace {
+
+// Despite having been converted to StorageAPI messages, the message types
+// listed here are never sent to the distributor by other processes than
+// clients.
+bool is_client_request(const api::StorageMessage& msg) noexcept {
+    const auto type_id = msg.getType().getId();
+    return (type_id == api::MessageType::GET_ID)
+        || (type_id == api::MessageType::PUT_ID)
+        || (type_id == api::MessageType::REMOVE_ID)
+        || (type_id == api::MessageType::VISITOR_CREATE_ID)
+        || (type_id == api::MessageType::VISITOR_DESTROY_ID)
+        || (type_id == api::MessageType::GETBUCKETLIST_ID)
+        || (type_id == api::MessageType::STATBUCKET_ID)
+        || (type_id == api::MessageType::UPDATE_ID)
+        || (type_id == api::MessageType::REMOVELOCATION_ID);
+}
+
+}
+
+// Handles the message locally if possible; otherwise passes it further down
+// via the chained message sender.
+void
+DistributorStripe::handle_or_propagate_message(const std::shared_ptr<api::StorageMessage>& msg)
+{
+    if (handleMessage(msg)) {
+        return;
+    }
+    MBUS_TRACE(msg->getTrace(), 9, "Distributor: Not handling it. Sending further down.");
+    _messageSender.sendDown(msg);
+}
+
+// Processes the messages fetched by the last critical tick. Client requests
+// are moved into the priority queue, everything else is handled right away.
+// Afterwards at most one client request is taken off the priority queue and
+// handled.
+void
+DistributorStripe::startExternalOperations()
+{
+    for (auto& fetched : _fetchedMessages) {
+        if (!is_client_request(*fetched)) {
+            MBUS_TRACE(fetched->getTrace(), 9, "Distributor: Grabbed from queue to be processed.");
+            handle_or_propagate_message(fetched);
+            continue;
+        }
+        // Trace before moving; `fetched` is empty after the emplace.
+        MBUS_TRACE(fetched->getTrace(), 9, "Distributor: adding to client request priority queue");
+        _client_request_priority_queue.emplace(std::move(fetched));
+    }
+
+    const bool start_single_client_request = !_client_request_priority_queue.empty();
+    if (start_single_client_request) {
+        const auto& next_request = _client_request_priority_queue.top();
+        MBUS_TRACE(next_request->getTrace(), 9, "Distributor: Grabbed from "
+                   "client request priority queue to be processed.");
+        handle_or_propagate_message(next_request);
+        _client_request_priority_queue.pop();
+    }
+
+    const bool had_fetched_messages = !_fetchedMessages.empty();
+    if (had_fetched_messages || start_single_client_request) {
+        signalWorkWasDone();
+    }
+    _fetchedMessages.clear();
+}
+
+// Copy of the minimum-replica map from the last bucket DB stats, taken under
+// _metricLock.
+std::unordered_map<uint16_t, uint32_t> DistributorStripe::getMinReplica() const {
+    std::lock_guard metric_guard(_metricLock);
+    return _bucketDbStats._minBucketReplica;
+}
+
+// Copy of the per-node bucket space stats, taken under _metricLock.
+BucketSpacesStatsProvider::PerNodeBucketSpacesStats DistributorStripe::getBucketSpacesStats() const {
+    std::lock_guard metric_guard(_metricLock);
+    return _bucketSpacesStats;
+}
+
+// Copy of the stored pending maintenance stats, taken under _metricLock.
+SimpleMaintenanceScanner::PendingMaintenanceStats
+DistributorStripe::pending_maintenance_stats() const
+{
+    std::lock_guard metric_guard(_metricLock);
+    return _maintenanceStats;
+}
+
+// Publishes the stats from the last completed scan round to the external
+// metric sets. Does nothing until a round has completed.
+void DistributorStripe::propagateInternalScanMetricsToExternal() {
+    std::lock_guard metric_guard(_metricLock);
+
+    // All shared values are written when _metricLock is held, so no races.
+    if (!_bucketDBMetricUpdater.hasCompletedRound()) {
+        return;
+    }
+    _bucketDbStats.propagateMetrics(_idealStateManager.getMetrics(), getMetrics());
+    _idealStateManager.getMetrics().setPendingOperations(_maintenanceStats.global.pending);
+}
+
+namespace {
+
+// Converts node maintenance stats: total passes through unchanged, and the
+// second constructor argument is syncing + copyingIn.
+BucketSpaceStats toBucketSpaceStats(const NodeMaintenanceStats &stats) {
+    const auto pending = stats.syncing + stats.copyingIn;
+    return BucketSpaceStats(stats.total, pending);
+}
+
+using PerNodeBucketSpacesStats = BucketSpacesStatsProvider::PerNodeBucketSpacesStats;
+
+// Re-keys the tracker's per-node, per-bucket-space stats by bucket space name
+// and converts each entry to BucketSpaceStats.
+PerNodeBucketSpacesStats toBucketSpacesStats(const NodeMaintenanceStatsTracker &maintenanceStats) {
+    PerNodeBucketSpacesStats converted;
+    for (const auto& [node_index, stats_per_space] : maintenanceStats.perNodeStats()) {
+        for (const auto& [space_id, node_stats] : stats_per_space) {
+            auto space_name = document::FixedBucketSpaces::to_string(space_id);
+            converted[node_index][space_name] = toBucketSpaceStats(node_stats);
+        }
+    }
+    return converted;
+}
+
+// Counts the distinct bucket spaces that have, on at least one node, valid
+// stats reporting a non-zero number of pending buckets.
+size_t
+spaces_with_merges_pending(const PerNodeBucketSpacesStats& stats)
+{
+    std::unordered_set<document::BucketSpace, document::BucketSpace::hash> pending_spaces;
+    for (auto& [node_index, stats_per_space] : stats) {
+        (void) node_index;
+        for (auto& [space_name, space_stats] : stats_per_space) {
+            const bool has_pending = space_stats.valid() && (space_stats.bucketsPending() != 0);
+            if (!has_pending) {
+                continue;
+            }
+            // TODO avoid bucket space string roundtrip
+            pending_spaces.emplace(document::FixedBucketSpaces::from_string(space_name));
+        }
+    }
+    return pending_spaces.size();
+}
+
+// True when fewer bucket spaces have merges pending now than before.
+// TODO should we also trigger on !pending --> pending edge?
+bool
+merge_no_longer_pending_edge(const PerNodeBucketSpacesStats& prev_stats,
+                             const PerNodeBucketSpacesStats& curr_stats)
+{
+    const auto pending_before = spaces_with_merges_pending(prev_stats);
+    const auto pending_now = spaces_with_merges_pending(curr_stats);
+    return pending_now < pending_before;
+}
+
+}
+
+// Snapshots the stats of a just-completed scan round under _metricLock, and
+// requests a host info update when fewer bucket spaces have merges pending
+// than in the previous snapshot.
+void DistributorStripe::updateInternalMetricsForCompletedScan() {
+    std::lock_guard metric_guard(_metricLock);
+
+    _bucketDBMetricUpdater.completeRound();
+    _bucketDbStats = _bucketDBMetricUpdater.getLastCompleteStats();
+    _maintenanceStats = _scanner->getPendingMaintenanceStats();
+
+    auto fresh_space_stats = toBucketSpacesStats(_maintenanceStats.perNodeStats);
+    const bool merges_completed = merge_no_longer_pending_edge(_bucketSpacesStats, fresh_space_stats);
+    if (merges_completed) {
+        _must_send_updated_host_info = true;
+    }
+    _bucketSpacesStats = std::move(fresh_space_stats);
+    maybe_update_bucket_db_memory_usage_stats();
+}
+
+// Feeds bucket DB memory usage into the metric updater. Fresh numbers are
+// sampled from the databases only when more than the sample interval has
+// passed since the last sampling; otherwise the previous values are reused.
+void
+DistributorStripe::maybe_update_bucket_db_memory_usage_stats()
+{
+    const auto now = _component.getClock().getMonotonicTime();
+    const bool sample_due = (now - _last_db_memory_sample_time_point) > _db_memory_sample_interval;
+    if (!sample_due) {
+        // Reuse previous memory statistics instead of sampling new.
+        _bucketDBMetricUpdater.update_db_memory_usage(_bucketDbStats._mutable_db_mem_usage, true);
+        _bucketDBMetricUpdater.update_db_memory_usage(_bucketDbStats._read_only_db_mem_usage, false);
+        return;
+    }
+    for (auto& mutable_space : *_bucketSpaceRepo) {
+        _bucketDBMetricUpdater.update_db_memory_usage(mutable_space.second->getBucketDatabase().memory_usage(), true);
+    }
+    for (auto& read_only_space : *_readOnlyBucketSpaceRepo) {
+        _bucketDBMetricUpdater.update_db_memory_usage(read_only_space.second->getBucketDatabase().memory_usage(), false);
+    }
+    _last_db_memory_sample_time_point = now;
+}
+
+// Enters recovery mode and then scans until a scan step reports done.
+void DistributorStripe::scanAllBuckets() {
+    enterRecoveryMode();
+    for (;;) {
+        if (scanNextBucket().isDone()) {
+            break;
+        }
+    }
+}
+
+// Advances the maintenance scanner by one step. A regular step feeds the
+// visited entry to the metric updater; a "done" step finalizes the round
+// (metrics, recovery mode, host info) and resets the scanner.
+MaintenanceScanner::ScanResult DistributorStripe::scanNextBucket() {
+    MaintenanceScanner::ScanResult step(_scanner->scanNext());
+    if (!step.isDone()) {
+        const auto& distribution(_bucketSpaceRepo->get(step.getBucketSpace()).getDistribution());
+        _bucketDBMetricUpdater.visit(step.getEntry(), distribution.getRedundancy());
+    } else {
+        updateInternalMetricsForCompletedScan();
+        leaveRecoveryMode();
+        send_updated_host_info_if_required();
+        _scanner->reset();
+    }
+    return step;
+}
+
+// If a host info update has been requested, asks the state updater to send
+// get-node-state replies immediately and clears the request flag.
+void
+DistributorStripe::send_updated_host_info_if_required()
+{
+    if (!_must_send_updated_host_info) {
+        return;
+    }
+    _component.getStateUpdater().immediately_send_get_node_state_replies();
+    _must_send_updated_host_info = false;
+}
+
+// Refreshes the throttling window from config, then ticks the maintenance
+// scheduler in the current scheduling mode.
+void DistributorStripe::startNextMaintenanceOperation() {
+    _throttlingStarter->setMaxPendingRange(getConfig().getMinPendingMaintenanceOps(),
+                                           getConfig().getMaxPendingMaintenanceOps());
+    _scheduler->tick(_schedulingMode);
+}
+
+// Critical tick: resets the tick result, enables any staged distribution and
+// config, and fetches queued status requests and external messages for the
+// following non-critical tick.
+framework::ThreadWaitInfo DistributorStripe::doCriticalTick(framework::ThreadIndex) {
+    _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN;
+
+    enableNextDistribution();
+    enableNextConfig();
+
+    fetchStatusRequests();
+    fetchExternalMessages();
+
+    return _tickResult;
+}
+
+// Non-critical tick: serves status requests and external operations, then,
+// unless still initializing or inhibited, advances the maintenance scan and
+// scheduling by one step.
+framework::ThreadWaitInfo DistributorStripe::doNonCriticalTick(framework::ThreadIndex) {
+    _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN;
+    handleStatusRequests();
+    startExternalOperations();
+
+    if (initializing()) {
+        _bucketDBUpdater.resendDelayedMessages();
+        return _tickResult;
+    }
+
+    // Ordering note: since maintenance inhibiting checks whether startExternalOperations()
+    // did any useful work with incoming data, this check must be performed _after_ the call.
+    if (should_inhibit_current_maintenance_scan_tick()) {
+        mark_current_maintenance_tick_as_inhibited();
+        return _tickResult;
+    }
+
+    scanNextBucket();
+    startNextMaintenanceOperation();
+    if (isInRecoveryMode()) {
+        signalWorkWasDone();
+    }
+    mark_maintenance_tick_as_no_longer_inhibited();
+    _bucketDBUpdater.resendDelayedMessages();
+    return _tickResult;
+}
+
+// Maintenance is inhibited for this tick when work was done in it and the
+// number of consecutively inhibited ticks is still below the configured limit.
+bool
+DistributorStripe::should_inhibit_current_maintenance_scan_tick() const noexcept
+{
+    if (!workWasDone()) {
+        return false;
+    }
+    return _inhibited_maintenance_tick_count
+           < getConfig().max_consecutively_inhibited_maintenance_ticks();
+}
+
+// Counts one more consecutively inhibited maintenance tick.
+void
+DistributorStripe::mark_current_maintenance_tick_as_inhibited() noexcept
+{
+    ++_inhibited_maintenance_tick_count;
+}
+
+// Resets the count of consecutively inhibited maintenance ticks.
+void
+DistributorStripe::mark_maintenance_tick_as_no_longer_inhibited() noexcept
+{
+    _inhibited_maintenance_tick_count = 0;
+}
+
+// Pushes the current config values into the components that consume them.
+void DistributorStripe::enableNextConfig() {
+    _bucketDBMetricUpdater.setMinimumReplicaCountingMode(
+            getConfig().getMinimumReplicaCountingMode());
+    _ownershipSafeTimeCalc->setMaxClusterClockSkew(
+            getConfig().getMaxClusterClockSkew());
+    _pendingMessageTracker.setNodeBusyDuration(
+            getConfig().getInhibitMergesOnBusyNodeDuration());
+    // The stale-reads setting drives both the bucket DB updater and concurrent gets.
+    _bucketDBUpdater.set_stale_reads_enabled(
+            getConfig().allowStaleReadsDuringClusterStateTransitions());
+    _externalOperationHandler.set_concurrent_gets_enabled(
+            getConfig().allowStaleReadsDuringClusterStateTransitions());
+    _externalOperationHandler.set_use_weak_internal_read_consistency_for_gets(
+            getConfig().use_weak_internal_read_consistency_for_client_gets());
+}
+
+// Takes over all queued status requests, but only once the previously
+// fetched batch has been fully processed.
+void DistributorStripe::fetchStatusRequests() {
+    if (!_fetchedStatusRequests.empty()) {
+        return;
+    }
+    _fetchedStatusRequests.swap(_statusToDo);
+}
+
+// Takes over the whole message queue. The previous batch must already have
+// been consumed (asserted).
+void DistributorStripe::fetchExternalMessages() {
+    assert(_fetchedMessages.empty());
+    _fetchedMessages.swap(_messageQueue);
+}
+
+// Runs every fetched status request through its reporter and signals the
+// waiting requester. Counts as work done if there was at least one request.
+void DistributorStripe::handleStatusRequests() {
+    const bool had_requests = !_fetchedStatusRequests.empty();
+    for (const auto& request : _fetchedStatusRequests) {
+        request->getReporter().reportStatus(request->getStream(), request->getPath());
+        request->notifyCompleted();
+    }
+    _fetchedStatusRequests.clear();
+    if (had_requests) {
+        signalWorkWasDone();
+    }
+}
+
+// HTML for the front page (no "page" attribute) and for page=buckets;
+// XML for every other page.
+vespalib::string DistributorStripe::getReportContentType(const framework::HttpUrlPath& path) const {
+    if (!path.hasAttribute("page")) {
+        return "text/html";
+    }
+    if (path.getAttribute("page") == "buckets") {
+        return "text/html";
+    }
+    return "application/xml";
+}
+
+// String rendering of the maintenance operation owner.
+std::string DistributorStripe::getActiveIdealStateOperations() const {
+    return _maintenanceOperationOwner.toString();
+}
+
+// String rendering of the external operation owner.
+std::string DistributorStripe::getActiveOperations() const {
+    return _operationOwner.toString();
+}
+
+// Renders a status page to `out`:
+//  - no "page" attribute: HTML index with links to the sub-pages
+//  - page=buckets:        HTML bucket status from the ideal state manager
+//  - page=pending:        XML with the sizes of both operation owners
+//  - any other page:      an XML report without page-specific content
+// Always returns true.
+bool DistributorStripe::reportStatus(std::ostream& out, const framework::HttpUrlPath& path) const {
+    if (!path.hasAttribute("page") || path.getAttribute("page") == "buckets") {
+        framework::PartlyHtmlStatusReporter htmlReporter(*this);
+        htmlReporter.reportHtmlHeader(out, path);
+        if (path.hasAttribute("page")) {
+            // getBucketStatus() is non-const, hence the cast.
+            const_cast<IdealStateManager&>(_idealStateManager)
+                .getBucketStatus(out);
+        } else {
+            out << "<a href=\"?page=pending\">Count of pending messages to "
+                << "storage nodes</a><br><a href=\"?page=maintenance&show=50\">"
+                << "List maintenance queue (adjust show parameter to see more "
+                << "operations, -1 for all)</a><br>\n<a href=\"?page=buckets\">"
+                << "List all buckets, highlight non-ideal state</a><br>\n";
+        }
+        htmlReporter.reportHtmlFooter(out, path);
+        return true;
+    }
+
+    framework::PartlyXmlStatusReporter xmlReporter(*this, out, path);
+    using namespace vespalib::xml;
+    const std::string requested_page(path.getAttribute("page"));
+
+    if (requested_page == "pending") {
+        xmlReporter << XmlTag("pending")
+                    << XmlAttribute("externalload", _operationOwner.size())
+                    << XmlAttribute("maintenance",
+                                    _maintenanceOperationOwner.size())
+                    << XmlEndTag();
+    } else if (requested_page == "maintenance") {
+        // Need new page
+    }
+    return true;
+}
+
+// Queues the request for handleStatusRequests() and blocks the calling thread
+// until it has been marked completed. Always returns true.
+bool DistributorStripe::handleStatusRequest(const DelegatedStatusRequest& request) const {
+    auto pending_status = std::make_shared<DistributorStatus>(request);
+    {
+        // Hold the ticking lock only while enqueueing; waiting happens outside it.
+        framework::TickingLockGuard tick_guard(_threadPool.freezeCriticalTicks());
+        _statusToDo.push_back(pending_status);
+        tick_guard.broadcast();
+    }
+    pending_status->waitForCompletion();
+    return true;
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
new file mode 100644
index 00000000000..dbf899a6de2
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -0,0 +1,352 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "bucket_spaces_stats_provider.h"
+#include "bucketdbupdater.h"
+#include "distributor_host_info_reporter.h"
+#include "distributorinterface.h"
+#include "externaloperationhandler.h"
+#include "idealstatemanager.h"
+#include "min_replica_provider.h"
+#include "pendingmessagetracker.h"
+#include "statusreporterdelegate.h"
+#include <vespa/config/config.h>
+#include <vespa/storage/common/doneinitializehandler.h>
+#include <vespa/storage/common/messagesender.h>
+#include <vespa/storage/distributor/bucketdb/bucketdbmetricupdater.h>
+#include <vespa/storage/distributor/distributorcomponent.h>
+#include <vespa/storage/distributor/maintenance/maintenancescheduler.h>
+#include <vespa/storageapi/message/state.h>
+#include <vespa/storageframework/generic/metric/metricupdatehook.h>
+#include <vespa/storageframework/generic/thread/tickingthread.h>
+#include <queue>
+#include <unordered_map>
+
+namespace storage {
+ struct DoneInitializeHandler;
+ class HostInfo;
+ class NodeIdentity;
+}
+
+namespace storage::distributor {
+
+class BlockingOperationStarter;
+class BucketPriorityDatabase;
+class DistributorStatus;
+class DistributorBucketSpaceRepo;
+class OperationSequencer;
+class OwnershipTransferSafeTimePointCalculator;
+class SimpleMaintenanceScanner;
+class ThrottlingOperationStarter;
+
+/**
+ * Distributor stripe: a storage link that also acts as distributor interface,
+ * status delegator/reporter, ticking thread, min-replica provider, bucket
+ * space stats provider and non-tracking message sender.
+ *
+ * NOTE(review): presumably holds the state and logic factored out of
+ * Distributor (which is a friend of this class) - confirm.
+ */
+class DistributorStripe final
+    : public StorageLink, // TODO decouple
+      public DistributorInterface,
+      public StatusDelegator,
+      public framework::StatusReporter,
+      public framework::TickingThread,
+      public MinReplicaProvider,
+      public BucketSpacesStatsProvider,
+      public NonTrackingMessageSender
+{
+public:
+    DistributorStripe(DistributorComponentRegister&,
+                      DistributorMetricSet& metrics,
+                      const NodeIdentity& node_identity,
+                      framework::TickingThreadPool&,
+                      DoneInitializeHandler&,
+                      bool manageActiveBucketCopies,
+                      ChainedMessageSender& messageSender);
+
+    ~DistributorStripe() override;
+
+    const ClusterContext& cluster_context() const override {
+        return _component.cluster_context();
+    }
+    void onOpen() override;
+    void onClose() override;
+    bool onDown(const std::shared_ptr<api::StorageMessage>&) override;
+    void sendUp(const std::shared_ptr<api::StorageMessage>&) override;
+    // Bypasses message tracker component. Thread safe.
+    void send_up_without_tracking(const std::shared_ptr<api::StorageMessage>&) override;
+
+    ChainedMessageSender& getMessageSender() override {
+        return _messageSender;
+    }
+
+    DistributorMetricSet& getMetrics() override { return _metrics; }
+
+    PendingMessageTracker& getPendingMessageTracker() override {
+        return _pendingMessageTracker;
+    }
+
+    const OperationSequencer& operation_sequencer() const noexcept override {
+        return *_operation_sequencer;
+    }
+
+    const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const override;
+
+    /**
+     * Enables a new cluster state. Called after the bucket db updater has
+     * retrieved all bucket info related to the change.
+     */
+    void enableClusterStateBundle(const lib::ClusterStateBundle& clusterStateBundle) override;
+
+    /**
+     * Invoked when a pending cluster state for a distribution (config)
+     * change has been enabled. An invocation of storageDistributionChanged
+     * will eventually cause this method to be called, assuming the pending
+     * cluster state completed successfully.
+     */
+    void notifyDistributionChangeEnabled() override;
+
+    void storageDistributionChanged() override;
+
+    void recheckBucketInfo(uint16_t nodeIdx, const document::Bucket &bucket) override;
+
+    bool handleReply(const std::shared_ptr<api::StorageReply>& reply) override;
+
+    // StatusReporter implementation
+    vespalib::string getReportContentType(const framework::HttpUrlPath&) const override;
+    bool reportStatus(std::ostream&, const framework::HttpUrlPath&) const override;
+
+    bool handleStatusRequest(const DelegatedStatusRequest& request) const override;
+
+    std::string getActiveIdealStateOperations() const;
+    std::string getActiveOperations() const;
+
+    // TickingThread implementation. `override` alone is sufficient; the
+    // redundant `virtual` specifier has been dropped to match the other
+    // overrides in this class.
+    framework::ThreadWaitInfo doCriticalTick(framework::ThreadIndex) override;
+    framework::ThreadWaitInfo doNonCriticalTick(framework::ThreadIndex) override;
+
+    /**
+     * Checks whether a bucket needs to be split, and sends a split
+     * if so.
+     */
+    void checkBucketForSplit(document::BucketSpace bucketSpace,
+                             const BucketDatabase::Entry& e,
+                             uint8_t priority) override;
+
+    const lib::ClusterStateBundle& getClusterStateBundle() const override;
+
+    /**
+     * @return Returns the states in which the distributors consider
+     * storage nodes to be up.
+     */
+    const char* getStorageNodeUpStates() const override {
+        return "uri";
+    }
+
+    /**
+     * Called by bucket db updater after a merge has finished, and all the
+     * request bucket info operations have been performed as well. Passes the
+     * merge back to the operation that created it.
+     */
+    void handleCompletedMerge(const std::shared_ptr<api::MergeBucketReply>& reply) override;
+
+    bool initializing() const override {
+        return !_doneInitializing;
+    }
+
+    const DistributorConfiguration& getConfig() const override {
+        return _component.getTotalDistributorConfig();
+    }
+
+    // True while the scheduling mode is RECOVERY_SCHEDULING_MODE.
+    bool isInRecoveryMode() const noexcept {
+        return _schedulingMode == MaintenanceScheduler::RECOVERY_SCHEDULING_MODE;
+    }
+
+    int getDistributorIndex() const override;
+    const PendingMessageTracker& getPendingMessageTracker() const override;
+    void sendCommand(const std::shared_ptr<api::StorageCommand>&) override;
+    void sendReply(const std::shared_ptr<api::StorageReply>&) override;
+
+    const BucketGcTimeCalculator::BucketIdHasher&
+    getBucketIdHasher() const override {
+        return *_bucketIdHasher;
+    }
+
+    // Direct accessors for the owned sub-components.
+    BucketDBUpdater& bucket_db_updater() { return _bucketDBUpdater; }
+    const BucketDBUpdater& bucket_db_updater() const { return _bucketDBUpdater; }
+    IdealStateManager& ideal_state_manager() { return _idealStateManager; }
+    const IdealStateManager& ideal_state_manager() const { return _idealStateManager; }
+    ExternalOperationHandler& external_operation_handler() { return _externalOperationHandler; }
+    const ExternalOperationHandler& external_operation_handler() const { return _externalOperationHandler; }
+
+    DistributorBucketSpaceRepo &getBucketSpaceRepo() noexcept { return *_bucketSpaceRepo; }
+    const DistributorBucketSpaceRepo &getBucketSpaceRepo() const noexcept { return *_bucketSpaceRepo; }
+
+    DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() noexcept {
+        return *_readOnlyBucketSpaceRepo;
+    }
+    // NOTE(review): "ReadyOnly" looks like a typo for "ReadOnly" (compare the
+    // non-const accessor above). The name is kept as-is so existing callers
+    // keep compiling.
+    const DistributorBucketSpaceRepo& getReadyOnlyBucketSpaceRepo() const noexcept {
+        return *_readOnlyBucketSpaceRepo;
+    }
+
+    OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const override;
+
+    /**
+     * Metric update hook that forwards updateMetrics() to
+     * DistributorStripe::propagateInternalScanMetricsToExternal().
+     */
+    class MetricUpdateHook : public framework::MetricUpdateHook
+    {
+    public:
+        MetricUpdateHook(DistributorStripe& self)
+            : _self(self)
+        {
+        }
+
+        void updateMetrics(const MetricLockGuard &) override {
+            _self.propagateInternalScanMetricsToExternal();
+        }
+
+    private:
+        DistributorStripe& _self;
+    };
+
+    std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept {
+        return _db_memory_sample_interval;
+    }
+
+private:
+    friend struct DistributorTest;
+    friend class BucketDBUpdaterTest;
+    friend class DistributorTestUtil;
+    friend class MetricUpdateHook;
+    friend class Distributor;
+
+    bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg);
+    bool isMaintenanceReply(const api::StorageReply& reply) const;
+
+    void handleStatusRequests();
+    void send_shutdown_abort_reply(const std::shared_ptr<api::StorageMessage>&);
+    void handle_or_propagate_message(const std::shared_ptr<api::StorageMessage>& msg);
+    void startExternalOperations();
+
+    /**
+     * Return a copy of the latest min replica data, see MinReplicaProvider.
+     */
+    std::unordered_map<uint16_t, uint32_t> getMinReplica() const override;
+
+    PerNodeBucketSpacesStats getBucketSpacesStats() const override;
+
+    SimpleMaintenanceScanner::PendingMaintenanceStats pending_maintenance_stats() const;
+
+    /**
+     * Atomically publish internal metrics to external ideal state metrics.
+     * Takes metric lock.
+     */
+    void propagateInternalScanMetricsToExternal();
+    /**
+     * Atomically updates internal metrics (not externally visible metrics;
+     * these are not changed until a snapshot triggers
+     * propagateInternalScanMetricsToExternal()).
+     *
+     * Takes metric lock.
+     */
+    void updateInternalMetricsForCompletedScan();
+    void maybe_update_bucket_db_memory_usage_stats();
+    void scanAllBuckets();
+    MaintenanceScanner::ScanResult scanNextBucket();
+    bool should_inhibit_current_maintenance_scan_tick() const noexcept;
+    void mark_current_maintenance_tick_as_inhibited() noexcept;
+    void mark_maintenance_tick_as_no_longer_inhibited() noexcept;
+    void enableNextConfig();
+    void fetchStatusRequests();
+    void fetchExternalMessages();
+    void startNextMaintenanceOperation();
+    void signalWorkWasDone();
+    bool workWasDone() const noexcept;
+
+    void enterRecoveryMode();
+    void leaveRecoveryMode();
+
+    // Tries to generate an operation from the given message. Returns true
+    // if we either returned an operation, or the message was otherwise handled
+    // (for instance, wrong distribution).
+    bool generateOperation(const std::shared_ptr<api::StorageMessage>& msg,
+                           Operation::SP& operation);
+
+    void enableNextDistribution();
+    void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>);
+    void propagateClusterStates();
+
+    BucketSpacesStatsProvider::BucketSpacesStats make_invalid_stats_per_configured_space() const;
+    template <typename NodeFunctor>
+    void for_each_available_content_node_in(const lib::ClusterState&, NodeFunctor&&);
+    void invalidate_bucket_spaces_stats();
+    void send_updated_host_info_if_required();
+
+    lib::ClusterStateBundle _clusterStateBundle;
+
+    std::unique_ptr<DistributorBucketSpaceRepo> _bucketSpaceRepo;
+    // Read-only bucket space repo with DBs that only contain buckets transiently
+    // during cluster state transitions. Bucket set does not overlap that of _bucketSpaceRepo
+    // and the DBs are empty during non-transition phases.
+    std::unique_ptr<DistributorBucketSpaceRepo> _readOnlyBucketSpaceRepo;
+    storage::distributor::DistributorComponent _component;
+    DistributorMetricSet& _metrics;
+
+    OperationOwner _operationOwner;
+    OperationOwner _maintenanceOperationOwner;
+
+    std::unique_ptr<OperationSequencer> _operation_sequencer;
+    PendingMessageTracker _pendingMessageTracker;
+    BucketDBUpdater _bucketDBUpdater;
+    StatusReporterDelegate _distributorStatusDelegate;
+    StatusReporterDelegate _bucketDBStatusDelegate;
+    IdealStateManager _idealStateManager;
+    ChainedMessageSender& _messageSender;
+    ExternalOperationHandler _externalOperationHandler;
+
+    std::shared_ptr<lib::Distribution> _distribution;
+    std::shared_ptr<lib::Distribution> _nextDistribution;
+
+    using MessageQueue = std::vector<std::shared_ptr<api::StorageMessage>>;
+    // Comparator for ClientRequestPriorityQueue: orders two pointer-like
+    // operands by comparing their getPriority() values with operator>.
+    struct IndirectHigherPriority {
+        template <typename Lhs, typename Rhs>
+        bool operator()(const Lhs& lhs, const Rhs& rhs) const noexcept {
+            return lhs->getPriority() > rhs->getPriority();
+        }
+    };
+    using ClientRequestPriorityQueue = std::priority_queue<
+            std::shared_ptr<api::StorageMessage>,
+            std::vector<std::shared_ptr<api::StorageMessage>>,
+            IndirectHigherPriority
+    >;
+    MessageQueue _messageQueue;
+    ClientRequestPriorityQueue _client_request_priority_queue;
+    MessageQueue _fetchedMessages;
+    framework::TickingThreadPool& _threadPool;
+
+    // Mutable so that the const handleStatusRequest() can enqueue requests.
+    mutable std::vector<std::shared_ptr<DistributorStatus>> _statusToDo;
+    mutable std::vector<std::shared_ptr<DistributorStatus>> _fetchedStatusRequests;
+
+    DoneInitializeHandler& _doneInitializeHandler;
+    bool _doneInitializing;
+
+    std::unique_ptr<BucketPriorityDatabase> _bucketPriorityDb;
+    std::unique_ptr<SimpleMaintenanceScanner> _scanner;
+    std::unique_ptr<ThrottlingOperationStarter> _throttlingStarter;
+    std::unique_ptr<BlockingOperationStarter> _blockingStarter;
+    std::unique_ptr<MaintenanceScheduler> _scheduler;
+    MaintenanceScheduler::SchedulingMode _schedulingMode;
+    framework::MilliSecTimer _recoveryTimeStarted;
+    framework::ThreadWaitInfo _tickResult;
+    BucketDBMetricUpdater _bucketDBMetricUpdater;
+    std::unique_ptr<BucketGcTimeCalculator::BucketIdHasher> _bucketIdHasher;
+    MetricUpdateHook _metricUpdateHook;
+    mutable std::mutex _metricLock;
+    /**
+     * Maintenance stats for last completed database scan iteration.
+     * Access must be protected by _metricLock as it is read by metric
+     * manager thread but written by distributor thread.
+     */
+    SimpleMaintenanceScanner::PendingMaintenanceStats _maintenanceStats;
+    BucketSpacesStatsProvider::PerNodeBucketSpacesStats _bucketSpacesStats;
+    BucketDBMetricUpdater::Stats _bucketDbStats;
+    std::unique_ptr<OwnershipTransferSafeTimePointCalculator> _ownershipSafeTimeCalc;
+    std::chrono::steady_clock::duration _db_memory_sample_interval;
+    std::chrono::steady_clock::time_point _last_db_memory_sample_time_point;
+    size_t _inhibited_maintenance_tick_count;
+    bool _must_send_updated_host_info;
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.cpp b/storage/src/vespa/storage/distributor/distributorcomponent.cpp
index 3f6c125bbfa..e5fe3c6c43c 100644
--- a/storage/src/vespa/storage/distributor/distributorcomponent.cpp
+++ b/storage/src/vespa/storage/distributor/distributorcomponent.cpp
@@ -139,9 +139,9 @@ class UpdateBucketDatabaseProcessor : public BucketDatabase::EntryUpdateProcesso
bool _reset_trusted;
public:
UpdateBucketDatabaseProcessor(const framework::Clock& clock, const std::vector<BucketCopy>& changed_nodes, std::vector<uint16_t> ideal_nodes, bool reset_trusted);
- virtual ~UpdateBucketDatabaseProcessor();
- virtual BucketDatabase::Entry create_entry(const document::BucketId& bucket) const override;
- virtual bool process_entry(BucketDatabase::Entry &entry) const override;
+ ~UpdateBucketDatabaseProcessor() override;
+ BucketDatabase::Entry create_entry(const document::BucketId& bucket) const override;
+ bool process_entry(BucketDatabase::Entry &entry) const override;
};
UpdateBucketDatabaseProcessor::UpdateBucketDatabaseProcessor(const framework::Clock& clock, const std::vector<BucketCopy>& changed_nodes, std::vector<uint16_t> ideal_nodes, bool reset_trusted)
@@ -291,11 +291,6 @@ DistributorComponent::createAppropriateBucket(const document::Bucket &bucket)
}
bool
-DistributorComponent::initializing() const {
- return _distributor.initializing();
-}
-
-bool
DistributorComponent::has_pending_message(uint16_t node_index,
const document::Bucket& bucket,
uint32_t message_type) const
diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.h b/storage/src/vespa/storage/distributor/distributorcomponent.h
index ca953ed01ef..6a3620cbcf7 100644
--- a/storage/src/vespa/storage/distributor/distributorcomponent.h
+++ b/storage/src/vespa/storage/distributor/distributorcomponent.h
@@ -139,11 +139,6 @@ public:
*/
BucketDatabase::Entry createAppropriateBucket(const document::Bucket &bucket);
- /**
- * Returns true if the node is currently initializing.
- */
- bool initializing() const;
-
// Implements DistributorNodeContext
const framework::Clock& clock() const noexcept override { return getClock(); }
const vespalib::string * cluster_name_ptr() const noexcept override { return cluster_context().cluster_name_ptr(); }
diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.cpp b/storage/src/vespa/storage/distributor/idealstatemanager.cpp
index 6dbe6b1c2a5..c92f6a3dfcf 100644
--- a/storage/src/vespa/storage/distributor/idealstatemanager.cpp
+++ b/storage/src/vespa/storage/distributor/idealstatemanager.cpp
@@ -25,7 +25,7 @@ namespace storage {
namespace distributor {
IdealStateManager::IdealStateManager(
- Distributor& owner,
+ DistributorInterface& owner,
DistributorBucketSpaceRepo& bucketSpaceRepo,
DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
DistributorComponentRegister& compReg,
diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.h b/storage/src/vespa/storage/distributor/idealstatemanager.h
index 7ed28d845d7..19f88334889 100644
--- a/storage/src/vespa/storage/distributor/idealstatemanager.h
+++ b/storage/src/vespa/storage/distributor/idealstatemanager.h
@@ -11,7 +11,7 @@ namespace storage::distributor {
class IdealStateMetricSet;
class IdealStateOperation;
-class Distributor;
+class DistributorInterface;
class SplitBucketStateChecker;
/**
@@ -34,7 +34,7 @@ class IdealStateManager : public framework::HtmlStatusReporter,
{
public:
- IdealStateManager(Distributor& owner,
+ IdealStateManager(DistributorInterface& owner,
DistributorBucketSpaceRepo& bucketSpaceRepo,
DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
DistributorComponentRegister& compReg,
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index d80666518ee..7762918405d 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -167,18 +167,17 @@ FileStorHandlerImpl::flush(bool killPendingMerges)
if (killPendingMerges) {
api::ReturnCode code(api::ReturnCode::ABORTED, "Storage node is shutting down");
- for (auto & entry : _mergeStates)
- {
+ for (auto & entry : _mergeStates) {
MergeStatus& s(*entry.second);
- if (s.pendingGetDiff.get() != 0) {
+ if (s.pendingGetDiff) {
s.pendingGetDiff->setResult(code);
_messageSender.sendReply(s.pendingGetDiff);
}
- if (s.pendingApplyDiff.get() != 0) {
+ if (s.pendingApplyDiff) {
s.pendingApplyDiff->setResult(code);
_messageSender.sendReply(s.pendingApplyDiff);
}
- if (s.reply.get() != 0) {
+ if (s.reply) {
s.reply->setResult(code);
_messageSender.sendReply(s.reply);
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 83f268358cb..f1063bf9c10 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -835,18 +835,12 @@ void FileStorManager::onFlush(bool downwards)
LOG(debug, "flushed thread[%s]", thread->getThread().getId().c_str());
}
}
- uint32_t queueSize = _filestorHandler->getQueueSize();
- std::ostringstream ost;
- if (queueSize > 0) {
- ost << "Queue size " << queueSize;
- }
- std::string result = ost.str();
- if (result.size() > 0) {
+ uint32_t queue_size = _filestorHandler->getQueueSize();
+ if (queue_size > 0) {
LOG(error, "Operations in persistence layer after flush. This is ok "
"during load, but should not happen when flush is called "
"during shutdown as load then is supposed to have been "
- "stopped: %s",
- result.c_str());
+ "stopped: Queue size is %u", queue_size);
}
StorageLinkQueued::onFlush(downwards);
LOG(debug, "Done Flushing");
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
index 4c075f44d35..db4cddce032 100644
--- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
@@ -112,6 +112,7 @@ void SharedRpcResources::shutdown() {
_slobrok_register->unregisterName(_handle);
}
_transport->ShutDown(true);
+ // FIXME need to reset to break weak_ptrs? But ShutDown should already sync pending resolves...!
_shutdown = true;
}