summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2021-08-27 09:07:57 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2021-08-27 11:05:33 +0000
commita52abb01475098fa77be306bb9ae61611a08cb1a (patch)
treed6674fd2c9b0f70b6691edc661db40b4c1cdce5f /storage
parenta0c5413398a2a3e4a4754dbf819c4a22b00b984a (diff)
Add test of top-level distributor functionality
This is a subset of the legacy distributor tests, adapted to explicitly test cross-stripe functionality. Once all relevant tests have been ported to be cross-stripe, the legacy test code will be removed.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/common/message_sender_stub.cpp2
-rw-r--r--storage/src/tests/common/message_sender_stub.h6
-rw-r--r--storage/src/tests/distributor/CMakeLists.txt2
-rw-r--r--storage/src/tests/distributor/top_level_distributor_test.cpp268
-rw-r--r--storage/src/tests/distributor/top_level_distributor_test_util.cpp310
-rw-r--r--storage/src/tests/distributor/top_level_distributor_test_util.h131
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketinfo.h2
-rw-r--r--storage/src/vespa/storage/common/distributorcomponent.h8
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.h1
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp46
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h7
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.h1
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp36
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_pool.h6
-rw-r--r--storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h1
15 files changed, 800 insertions, 27 deletions
diff --git a/storage/src/tests/common/message_sender_stub.cpp b/storage/src/tests/common/message_sender_stub.cpp
index a82d45b0b99..c0ed0a9418a 100644
--- a/storage/src/tests/common/message_sender_stub.cpp
+++ b/storage/src/tests/common/message_sender_stub.cpp
@@ -22,7 +22,7 @@ MessageSenderStub::getLastCommand(bool verbose) const
}
std::string
-MessageSenderStub::dumpMessage(const api::StorageMessage& msg, bool includeAddress, bool verbose) const
+MessageSenderStub::dumpMessage(const api::StorageMessage& msg, bool includeAddress, bool verbose)
{
std::ostringstream ost;
diff --git a/storage/src/tests/common/message_sender_stub.h b/storage/src/tests/common/message_sender_stub.h
index 73b1fcff9f4..3a73ff884bc 100644
--- a/storage/src/tests/common/message_sender_stub.h
+++ b/storage/src/tests/common/message_sender_stub.h
@@ -38,9 +38,9 @@ struct MessageSenderStub : MessageSender {
std::string getReplies(bool includeAddress = false,
bool verbose = false) const;
- std::string dumpMessage(const api::StorageMessage& msg,
- bool includeAddress,
- bool verbose) const;
+ static std::string dumpMessage(const api::StorageMessage& msg,
+ bool includeAddress,
+ bool verbose);
};
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt
index 0d314e050d4..678c19d4c6f 100644
--- a/storage/src/tests/distributor/CMakeLists.txt
+++ b/storage/src/tests/distributor/CMakeLists.txt
@@ -47,6 +47,8 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST
statoperationtest.cpp
statusreporterdelegatetest.cpp
throttlingoperationstartertest.cpp
+ top_level_distributor_test.cpp
+ top_level_distributor_test_util.cpp
twophaseupdateoperationtest.cpp
updateoperationtest.cpp
visitoroperationtest.cpp
diff --git a/storage/src/tests/distributor/top_level_distributor_test.cpp b/storage/src/tests/distributor/top_level_distributor_test.cpp
new file mode 100644
index 00000000000..8572420eaba
--- /dev/null
+++ b/storage/src/tests/distributor/top_level_distributor_test.cpp
@@ -0,0 +1,268 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/storage/distributor/idealstatemetricsset.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <vespa/storageapi/message/bucketsplitting.h>
+#include <vespa/storageapi/message/visitor.h>
+#include <vespa/storageapi/message/removelocation.h>
+#include <vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h>
+#include <tests/distributor/top_level_distributor_test_util.h>
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/document/fieldset/fieldsets.h>
+#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/document/test/make_bucket_space.h>
+#include <vespa/storage/config/config-stor-distributormanager.h>
+#include <vespa/storage/distributor/distributor.h>
+#include <vespa/storage/distributor/distributor_stripe.h>
+#include <vespa/storage/distributor/distributor_status.h>
+#include <vespa/storage/distributor/distributor_bucket_space.h>
+#include <vespa/storage/distributor/distributormetricsset.h>
+#include <vespa/storage/distributor/distributor_stripe_pool.h>
+#include <vespa/storage/distributor/distributor_stripe_thread.h>
+#include <vespa/vespalib/text/stringtokenizer.h>
+#include <vespa/vespalib/stllike/asciistream.h>
+#include <vespa/metrics/updatehook.h>
+#include <thread>
+#include <vespa/vespalib/gtest/gtest.h>
+#include <gmock/gmock.h>
+
+using document::test::makeDocumentBucket;
+using document::test::makeBucketSpace;
+using document::FixedBucketSpaces;
+using document::BucketSpace;
+using document::Bucket;
+using document::BucketId;
+using namespace ::testing;
+
+namespace storage::distributor {
+
+struct TopLevelDistributorTest : Test, TopLevelDistributorTestUtil {
+ TopLevelDistributorTest();
+ ~TopLevelDistributorTest() override;
+
+ void SetUp() override {
+ create_links();
+ };
+
+ void TearDown() override {
+ close();
+ }
+
+ // Simple type aliases to make interfacing with certain utility functions
+ // easier. Note that this is only for readability and does not provide any
+ // added type safety.
+ using NodeCount = int;
+ using Redundancy = int;
+ using ConfigBuilder = vespa::config::content::core::StorDistributormanagerConfigBuilder;
+
+ std::string resolve_stripe_operation_routing(std::shared_ptr<api::StorageMessage> msg) {
+ handle_top_level_message(msg);
+
+ vespalib::asciistream posted_msgs;
+ auto stripes = distributor_stripes();
+ for (size_t i = 0; i < stripes.size(); ++i) {
+ // TODO less intrusive, this is brittle.
+ for (auto& qmsg : stripes[i]->_messageQueue) {
+ posted_msgs << "Stripe " << i << ": " << MessageSenderStub::dumpMessage(*qmsg, false, false);
+ }
+ stripes[i]->_messageQueue.clear();
+ }
+ return posted_msgs.str();
+ }
+
+ void tick_distributor_n_times(uint32_t n) {
+ for (uint32_t i = 0; i < n; ++i) {
+ tick();
+ }
+ }
+
+ StatusReporterDelegate& distributor_status_delegate() {
+ return _distributor->_distributorStatusDelegate;
+ }
+
+ framework::TickingThreadPool& distributor_thread_pool() {
+ return _distributor->_threadPool;
+ }
+
+ const std::vector<std::shared_ptr<DistributorStatus>>& distributor_status_todos() {
+ return _distributor->_status_to_do;
+ }
+
+ Distributor::MetricUpdateHook distributor_metric_update_hook() {
+ return _distributor->_metricUpdateHook;
+ }
+};
+
+TopLevelDistributorTest::TopLevelDistributorTest()
+ : Test(),
+ TopLevelDistributorTestUtil()
+{
+}
+
+TopLevelDistributorTest::~TopLevelDistributorTest() = default;
+
+TEST_F(TopLevelDistributorTest, external_operation_is_routed_to_expected_stripe) {
+ setup_distributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
+
+ auto op = std::make_shared<api::RemoveCommand>(
+ makeDocumentBucket(document::BucketId()),
+ document::DocumentId("id:m:test:n=1:foo"),
+ api::Timestamp(1234));
+
+ // We expect stripe mapping to be deterministic.
+ EXPECT_EQ("Stripe 2: Remove", resolve_stripe_operation_routing(op));
+
+ auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "foo", "bar", "");
+ cmd->addBucketToBeVisited(document::BucketId(16, 1234));
+ cmd->addBucketToBeVisited(document::BucketId());
+
+ EXPECT_EQ("Stripe 1: Visitor Create", resolve_stripe_operation_routing(cmd));
+}
+
+TEST_F(TopLevelDistributorTest, recovery_mode_on_cluster_state_change_is_triggered_across_all_stripes) {
+ setup_distributor(Redundancy(1), NodeCount(2),
+ "storage:1 .0.s:d distributor:1");
+ enable_distributor_cluster_state("storage:1 distributor:1");
+
+ EXPECT_TRUE(all_distributor_stripes_are_in_recovery_mode());
+ tick();
+ EXPECT_FALSE(all_distributor_stripes_are_in_recovery_mode());
+
+ enable_distributor_cluster_state("storage:2 distributor:1");
+ EXPECT_TRUE(all_distributor_stripes_are_in_recovery_mode());
+}
+
+// TODO STRIPE consider moving to generic test, not specific to top-level distributor or stripe
+TEST_F(TopLevelDistributorTest, contains_time_statement) {
+ setup_distributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
+
+ auto cfg = _component->total_distributor_config_sp();
+ EXPECT_FALSE(cfg->containsTimeStatement(""));
+ EXPECT_FALSE(cfg->containsTimeStatement("testdoctype1"));
+ EXPECT_FALSE(cfg->containsTimeStatement("testdoctype1.headerfield > 42"));
+ EXPECT_TRUE(cfg->containsTimeStatement("testdoctype1.headerfield > now()"));
+ EXPECT_TRUE(cfg->containsTimeStatement("testdoctype1.headerfield > now() - 3600"));
+ EXPECT_TRUE(cfg->containsTimeStatement("testdoctype1.headerfield == now() - 3600"));
+}
+
+TEST_F(TopLevelDistributorTest, config_changes_are_propagated_to_all_stripes) {
+ setup_distributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
+
+ for (auto* s : distributor_stripes()) {
+ ASSERT_NE(s->getConfig().getSplitCount(), 1234);
+ ASSERT_NE(s->getConfig().getJoinCount(), 123);
+ }
+
+ auto cfg = current_distributor_config();
+ cfg.splitcount = 1234;
+ cfg.joincount = 123;
+ reconfigure(cfg);
+
+ for (auto* s : distributor_stripes()) {
+ ASSERT_EQ(s->getConfig().getSplitCount(), 1234);
+ ASSERT_EQ(s->getConfig().getJoinCount(), 123);
+ }
+}
+
+namespace {
+
+using namespace framework::defaultimplementation;
+
+class StatusRequestThread : public framework::Runnable {
+ StatusReporterDelegate& _reporter;
+ std::string _result;
+public:
+ explicit StatusRequestThread(StatusReporterDelegate& reporter)
+ : _reporter(reporter)
+ {}
+ void run(framework::ThreadHandle&) override {
+ framework::HttpUrlPath path("/distributor?page=buckets");
+ std::ostringstream stream;
+ _reporter.reportStatus(stream, path);
+ _result = stream.str();
+ }
+
+ std::string getResult() const {
+ return _result;
+ }
+};
+
+}
+
+TEST_F(TopLevelDistributorTest, tick_aggregates_status_requests_from_all_stripes) {
+ setup_distributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
+
+ ASSERT_NE(stripe_of_bucket(document::BucketId(16, 1)),
+ stripe_of_bucket(document::BucketId(16, 2)));
+
+ add_nodes_to_stripe_bucket_db(document::BucketId(16, 1), "0=1/1/1/t");
+ add_nodes_to_stripe_bucket_db(document::BucketId(16, 2), "0=2/2/2/t");
+
+ // Must go via delegate since reportStatus is now just a rendering
+ // function and not a request enqueuer (see Distributor::handleStatusRequest).
+ StatusRequestThread thread(distributor_status_delegate());
+ FakeClock clock;
+ ThreadPoolImpl pool(clock);
+ int ticksBeforeWait = 1;
+ framework::Thread::UP tp(pool.startThread(thread, "statustest", 5ms, 5s, ticksBeforeWait));
+
+ while (true) {
+ std::this_thread::sleep_for(1ms);
+ framework::TickingLockGuard guard(distributor_thread_pool().freezeCriticalTicks());
+ if (!distributor_status_todos().empty()) {
+ break;
+ }
+
+ }
+ ASSERT_TRUE(tick());
+
+ tp->interruptAndJoin();
+
+ // Result contains buckets from DBs of multiple stripes.
+ EXPECT_THAT(thread.getResult(), HasSubstr("BucketId(0x4000000000000001)"));
+ EXPECT_THAT(thread.getResult(), HasSubstr("BucketId(0x4000000000000002)"));
+}
+
+TEST_F(TopLevelDistributorTest, metric_update_hook_updates_pending_maintenance_metrics) {
+ setup_distributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
+ // To ensure we count all operations, not just those fitting within the pending window.
+ auto cfg = current_distributor_config();
+ cfg.maxpendingidealstateoperations = 1; // FIXME STRIPE this does not actually seem to be used...!
+ reconfigure(cfg);
+
+ // 1 bucket must be merged, 1 must be split, 1 should be activated.
+ add_nodes_to_stripe_bucket_db(document::BucketId(16, 1), "0=2/2/2/t/a,1=1/1/1");
+ add_nodes_to_stripe_bucket_db(document::BucketId(16, 2), "0=100/10000000/200000/t/a,1=100/10000000/200000/t");
+ add_nodes_to_stripe_bucket_db(document::BucketId(16, 3), "0=200/300/400/t,1=200/300/400/t");
+
+ // Go many full scanner rounds to check that metrics are set, not added to existing.
+ tick_distributor_n_times(50);
+
+ // By this point, no hook has been called so the metrics have not been set.
+ using MO = MaintenanceOperation;
+ {
+ const IdealStateMetricSet& metrics = total_ideal_state_metrics();
+ EXPECT_EQ(0, metrics.operations[MO::MERGE_BUCKET]->pending.getLast());
+ EXPECT_EQ(0, metrics.operations[MO::SPLIT_BUCKET]->pending.getLast());
+ EXPECT_EQ(0, metrics.operations[MO::SET_BUCKET_STATE]->pending.getLast());
+ EXPECT_EQ(0, metrics.operations[MO::DELETE_BUCKET]->pending.getLast());
+ EXPECT_EQ(0, metrics.operations[MO::JOIN_BUCKET]->pending.getLast());
+ EXPECT_EQ(0, metrics.operations[MO::GARBAGE_COLLECTION]->pending.getLast());
+ }
+
+ // Force trigger update hook
+ std::mutex l;
+ distributor_metric_update_hook().updateMetrics(metrics::MetricLockGuard(l));
+ // Metrics should now be updated to the last complete working state
+ {
+ const IdealStateMetricSet& metrics = total_ideal_state_metrics();
+ EXPECT_EQ(1, metrics.operations[MO::MERGE_BUCKET]->pending.getLast());
+ EXPECT_EQ(1, metrics.operations[MO::SPLIT_BUCKET]->pending.getLast());
+ EXPECT_EQ(1, metrics.operations[MO::SET_BUCKET_STATE]->pending.getLast());
+ EXPECT_EQ(0, metrics.operations[MO::DELETE_BUCKET]->pending.getLast());
+ EXPECT_EQ(0, metrics.operations[MO::JOIN_BUCKET]->pending.getLast());
+ EXPECT_EQ(0, metrics.operations[MO::GARBAGE_COLLECTION]->pending.getLast());
+ }
+}
+
+}
diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.cpp b/storage/src/tests/distributor/top_level_distributor_test_util.cpp
new file mode 100644
index 00000000000..e7670a78a51
--- /dev/null
+++ b/storage/src/tests/distributor/top_level_distributor_test_util.cpp
@@ -0,0 +1,310 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "top_level_distributor_test_util.h"
+#include <vespa/config-stor-distribution.h>
+#include <vespa/document/test/make_bucket_space.h>
+#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/storage/distributor/distributor.h>
+#include <vespa/storage/distributor/distributor_bucket_space.h>
+#include <vespa/storage/distributor/distributor_stripe.h>
+#include <vespa/storage/distributor/distributor_stripe_component.h>
+#include <vespa/storage/distributor/distributor_stripe_pool.h>
+#include <vespa/storage/distributor/distributor_stripe_thread.h>
+#include <vespa/storage/common/bucket_stripe_utils.h>
+#include <vespa/vdslib/distribution/distribution.h>
+#include <vespa/vespalib/text/stringtokenizer.h>
+
+using document::test::makeBucketSpace;
+using document::test::makeDocumentBucket;
+
+namespace storage::distributor {
+
+TopLevelDistributorTestUtil::TopLevelDistributorTestUtil()
+ : _message_sender(_sender, _sender_down),
+ _num_distributor_stripes(4)
+{
+ _config = getStandardConfig(false);
+}
+
+TopLevelDistributorTestUtil::~TopLevelDistributorTestUtil() = default;
+
+void
+TopLevelDistributorTestUtil::create_links()
+{
+ _node.reset(new TestDistributorApp(_config.getConfigId()));
+ _thread_pool = framework::TickingThreadPool::createDefault("distributor");
+ _stripe_pool = DistributorStripePool::make_non_threaded_pool_for_testing();
+ _distributor.reset(new Distributor(
+ _node->getComponentRegister(),
+ _node->node_identity(),
+ *_thread_pool,
+ *_stripe_pool,
+ *this,
+ _num_distributor_stripes,
+ _host_info,
+ &_message_sender));
+ _component.reset(new storage::DistributorComponent(_node->getComponentRegister(), "distrtestutil"));
+};
+
+void
+TopLevelDistributorTestUtil::setup_distributor(int redundancy,
+ int node_count,
+ const std::string& cluster_state,
+ uint32_t early_return,
+ bool require_primary_to_be_written)
+{
+ setup_distributor(redundancy, node_count, lib::ClusterStateBundle(lib::ClusterState(cluster_state)),
+ early_return, require_primary_to_be_written);
+}
+
+void
+TopLevelDistributorTestUtil::setup_distributor(int redundancy,
+ int node_count,
+ const lib::ClusterStateBundle& state,
+ uint32_t early_return,
+ bool require_primary_to_be_written)
+{
+ lib::Distribution::DistributionConfigBuilder config(
+ lib::Distribution::getDefaultDistributionConfig(redundancy, node_count).get());
+ config.redundancy = redundancy;
+ config.initialRedundancy = early_return;
+ config.ensurePrimaryPersisted = require_primary_to_be_written;
+ auto distribution = std::make_shared<lib::Distribution>(config);
+ _node->getComponentRegister().setDistribution(distribution);
+ // This is for all intents and purposes a hack to avoid having the
+ // distributor treat setting the distribution explicitly as a signal that
+ // it should send RequestBucketInfo to all configured nodes.
+ // If we called storage_distribution_changed followed by enableDistribution
+ // explicitly (which is what happens in "real life"), that is what would
+ // take place.
+ // The inverse case of this can be explicitly accomplished by calling
+ // triggerDistributionChange().
+ // This isn't pretty, folks, but it avoids breaking the world for now,
+ // as many tests have implicit assumptions about this being the behavior.
+ _distributor->propagateDefaultDistribution(distribution);
+ // Explicitly init the stripe pool since onOpen isn't called during testing
+ _distributor->start_stripe_pool();
+ enable_distributor_cluster_state(state);
+}
+
+size_t
+TopLevelDistributorTestUtil::stripe_of_bucket(const document::BucketId& id) const noexcept
+{
+ return stripe_of_bucket_key(id.toKey(), _distributor->_n_stripe_bits);
+}
+
+size_t
+TopLevelDistributorTestUtil::stripe_of_bucket(const document::Bucket& bucket) const noexcept
+{
+ return stripe_of_bucket_key(bucket.getBucketId().toKey(), _distributor->_n_stripe_bits);
+}
+
+void
+TopLevelDistributorTestUtil::receive_set_system_state_command(const vespalib::string& state_str)
+{
+ auto state_cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(state_str));
+ handle_top_level_message(state_cmd); // TODO move semantics
+}
+
+bool
+TopLevelDistributorTestUtil::handle_top_level_message(const std::shared_ptr<api::StorageMessage>& msg)
+{
+ return _distributor->onDown(msg);
+}
+
+void
+TopLevelDistributorTestUtil::close()
+{
+ _component.reset(0);
+ if (_distributor.get()) {
+ _stripe_pool->stop_and_join(); // Must be tagged as stopped prior to onClose
+ _distributor->onClose();
+ }
+ _sender.clear();
+ _node.reset(0);
+ _config = getStandardConfig(false);
+}
+
+void
+TopLevelDistributorTestUtil::add_nodes_to_stripe_bucket_db(const document::Bucket& bucket,
+ const std::string& nodeStr)
+{
+ BucketDatabase::Entry entry = get_bucket(bucket);
+
+ if (!entry.valid()) {
+ entry = BucketDatabase::Entry(bucket.getBucketId());
+ }
+
+ entry->clear();
+
+ vespalib::StringTokenizer tokenizer(nodeStr, ",");
+ for (uint32_t i = 0; i < tokenizer.size(); ++i) {
+ vespalib::StringTokenizer tok2(tokenizer[i], "=");
+ vespalib::StringTokenizer tok3(tok2[1], "/");
+
+ api::BucketInfo info(atoi(tok3[0].data()),
+ atoi(tok3.size() > 1 ? tok3[1].data() : tok3[0].data()),
+ atoi(tok3.size() > 2 ? tok3[2].data() : tok3[0].data()));
+
+ size_t flagsIdx = 3;
+
+ // Meta info override? For simplicity, require both meta count and size
+ if (tok3.size() > 4 && (!tok3[3].empty() && isdigit(tok3[3][0]))) {
+ info.setMetaCount(atoi(tok3[3].data()));
+ info.setUsedFileSize(atoi(tok3[4].data()));
+ flagsIdx = 5;
+ }
+
+ if ((tok3.size() > flagsIdx + 1) && tok3[flagsIdx + 1] == "a") {
+ info.setActive();
+ } else {
+ info.setActive(false);
+ }
+ if ((tok3.size() > flagsIdx + 2) && tok3[flagsIdx + 2] == "r") {
+ info.setReady();
+ } else {
+ info.setReady(false);
+ }
+
+ uint16_t idx = atoi(tok2[0].data());
+ BucketCopy node(0, idx, info);
+
+ // Allow user to manually override trusted and active.
+ if (tok3.size() > flagsIdx && tok3[flagsIdx] == "t") {
+ node.setTrusted();
+ }
+
+ entry->addNodeManual(node);
+ }
+
+ stripe_bucket_database(stripe_of_bucket(bucket), bucket.getBucketSpace()).update(entry);
+}
+
+void
+TopLevelDistributorTestUtil::add_nodes_to_stripe_bucket_db(const document::BucketId& id,
+ const std::string& nodeStr)
+{
+ add_nodes_to_stripe_bucket_db(document::Bucket(makeBucketSpace(), id), nodeStr);
+}
+
+BucketDatabase::Entry
+TopLevelDistributorTestUtil::get_bucket(const document::Bucket& bucket) const
+{
+ return stripe_bucket_database(stripe_of_bucket(bucket), bucket.getBucketSpace()).get(bucket.getBucketId());
+}
+
+BucketDatabase::Entry
+TopLevelDistributorTestUtil::get_bucket(const document::BucketId& bId) const
+{
+ return stripe_bucket_database(stripe_of_bucket(bId)).get(bId);
+}
+
+BucketDBUpdater&
+TopLevelDistributorTestUtil::bucket_db_updater() {
+ return *_distributor->_bucket_db_updater;
+}
+
+const IdealStateMetricSet&
+TopLevelDistributorTestUtil::total_ideal_state_metrics() const
+{
+ assert(_distributor->_ideal_state_total_metrics);
+ return *_distributor->_ideal_state_total_metrics;
+}
+
+const storage::distributor::DistributorNodeContext&
+TopLevelDistributorTestUtil::node_context() const {
+ return _distributor->distributor_component();
+}
+
+storage::distributor::DistributorStripeOperationContext&
+TopLevelDistributorTestUtil::operation_context() {
+ return _distributor->distributor_component();
+}
+
+bool
+TopLevelDistributorTestUtil::tick() {
+ framework::ThreadWaitInfo res(
+ framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN);
+ {
+ framework::TickingLockGuard lock(_distributor->_threadPool.freezeCriticalTicks());
+ res.merge(_distributor->doCriticalTick(0));
+ }
+ res.merge(_distributor->doNonCriticalTick(0));
+ bool did_work = !res.waitWanted();
+ for (auto& s : *_stripe_pool) {
+ did_work |= s->stripe().tick();
+ }
+ return did_work;
+}
+
+const DistributorConfig&
+TopLevelDistributorTestUtil::current_distributor_config() const
+{
+ return _component->getDistributorConfig();
+}
+
+void
+TopLevelDistributorTestUtil::reconfigure(const DistributorConfig& cfg)
+{
+ _node->getComponentRegister().setDistributorConfig(cfg);
+ tick(); // Config is propagated upon next top-level tick
+}
+
+BucketDatabase&
+TopLevelDistributorTestUtil::stripe_bucket_database(uint16_t stripe_idx) {
+ assert(stripe_idx < _distributor->_stripes.size());
+ return _distributor->_stripes[stripe_idx]->getBucketSpaceRepo().get(makeBucketSpace()).getBucketDatabase();
+}
+
+BucketDatabase&
+TopLevelDistributorTestUtil::stripe_bucket_database(uint16_t stripe_idx, document::BucketSpace space) {
+ assert(stripe_idx < _distributor->_stripes.size());
+ return _distributor->_stripes[stripe_idx]->getBucketSpaceRepo().get(space).getBucketDatabase();
+}
+
+const BucketDatabase&
+TopLevelDistributorTestUtil::stripe_bucket_database(uint16_t stripe_idx) const {
+ assert(stripe_idx < _distributor->_stripes.size());
+ return _distributor->_stripes[stripe_idx]->getBucketSpaceRepo().get(makeBucketSpace()).getBucketDatabase();
+}
+
+const BucketDatabase&
+TopLevelDistributorTestUtil::stripe_bucket_database(uint16_t stripe_idx, document::BucketSpace space) const {
+ assert(stripe_idx < _distributor->_stripes.size());
+ return _distributor->_stripes[stripe_idx]->getBucketSpaceRepo().get(space).getBucketDatabase();
+}
+
+// Hide how the sausages are made when directly accessing internal stripes
+std::vector<DistributorStripe*>
+TopLevelDistributorTestUtil::distributor_stripes() const {
+ std::vector<DistributorStripe*> stripes;
+ stripes.reserve(_distributor->_stripes.size());
+ for (auto& s : _distributor->_stripes) {
+ stripes.emplace_back(s.get());
+ }
+ return stripes;
+}
+
+bool
+TopLevelDistributorTestUtil::all_distributor_stripes_are_in_recovery_mode() const {
+ for (auto* s : distributor_stripes()) {
+ if (!s->isInRecoveryMode()) {
+ return false;
+ }
+ }
+ return true;
+}
+
+void
+TopLevelDistributorTestUtil::enable_distributor_cluster_state(vespalib::stringref state)
+{
+ bucket_db_updater().simulate_cluster_state_bundle_activation(
+ lib::ClusterStateBundle(lib::ClusterState(state)));
+}
+
+void
+TopLevelDistributorTestUtil::enable_distributor_cluster_state(const lib::ClusterStateBundle& state)
+{
+ bucket_db_updater().simulate_cluster_state_bundle_activation(state);
+}
+
+}
diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.h b/storage/src/tests/distributor/top_level_distributor_test_util.h
new file mode 100644
index 00000000000..0cc736464a1
--- /dev/null
+++ b/storage/src/tests/distributor/top_level_distributor_test_util.h
@@ -0,0 +1,131 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "distributor_message_sender_stub.h"
+#include <tests/common/dummystoragelink.h>
+#include <tests/common/testhelper.h>
+#include <tests/common/teststorageapp.h>
+#include <vespa/storage/common/hostreporter/hostinfo.h>
+#include <vespa/storage/frameworkimpl/component/distributorcomponentregisterimpl.h>
+#include <vespa/storage/storageutil/utils.h>
+#include <vespa/storageapi/message/state.h>
+#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h>
+
+namespace storage {
+
+namespace framework { struct TickingThreadPool; }
+
+namespace distributor {
+
+class Distributor;
+class DistributorNodeContext;
+class DistributorStripe;
+class DistributorStripeComponent;
+class DistributorStripeOperationContext;
+class DistributorStripePool;
+class IdealStateMetricSet;
+class Operation;
+class BucketDBUpdater;
+
+class TopLevelDistributorTestUtil : private DoneInitializeHandler
+{
+public:
+ TopLevelDistributorTestUtil();
+ ~TopLevelDistributorTestUtil();
+
+ void create_links();
+
+ void close();
+
+ size_t stripe_of_bucket(const document::BucketId& id) const noexcept;
+ size_t stripe_of_bucket(const document::Bucket& bucket) const noexcept;
+
+ /**
+ * Parses the given string to a set of node => bucket info data,
+ * and inserts them as nodes in the given bucket.
+ * Format:
+ * "node1=checksum/docs/size,node2=checksum/docs/size"
+ */
+ void add_nodes_to_stripe_bucket_db(const document::Bucket& bucket,
+ const std::string& nodeStr);
+ // As the above, but always inserts into default bucket space
+ void add_nodes_to_stripe_bucket_db(const document::BucketId& id, const std::string& nodeStr);
+
+ BucketDBUpdater& bucket_db_updater();
+ const IdealStateMetricSet& total_ideal_state_metrics() const;
+ const storage::distributor::DistributorNodeContext& node_context() const;
+ storage::distributor::DistributorStripeOperationContext& operation_context();
+
+ std::vector<DistributorStripe*> distributor_stripes() const;
+
+ bool tick();
+
+ const DistributorConfig& current_distributor_config() const;
+ void reconfigure(const DistributorConfig&);
+
+ BucketDatabase& stripe_bucket_database(uint16_t stripe_idx); // Implicit default space only
+ BucketDatabase& stripe_bucket_database(uint16_t stripe_idx, document::BucketSpace space);
+ const BucketDatabase& stripe_bucket_database(uint16_t stripe_idx) const; // Implicit default space only
+ const BucketDatabase& stripe_bucket_database(uint16_t stripe_idx, document::BucketSpace space) const;
+ [[nodiscard]] bool all_distributor_stripes_are_in_recovery_mode() const;
+
+ void setup_distributor(int redundancy,
+ int node_count,
+ const std::string& systemState,
+ uint32_t early_return = false,
+ bool require_primary_to_be_written = true);
+
+ void setup_distributor(int redundancy,
+ int node_count,
+ const lib::ClusterStateBundle& state,
+ uint32_t early_return = false,
+ bool require_primary_to_be_written = true);
+
+ void notifyDoneInitializing() override {}
+
+ BucketDatabase::Entry get_bucket(const document::Bucket& bucket) const;
+ // Gets bucket entry from default space only
+ BucketDatabase::Entry get_bucket(const document::BucketId& bId) const;
+
+ DistributorMessageSenderStub& sender() noexcept { return _sender; }
+ const DistributorMessageSenderStub& sender() const noexcept { return _sender; }
+
+ // Invokes full cluster state transition pipeline rather than directly applying
+ // the state and just pretending everything has been completed.
+ void receive_set_system_state_command(const vespalib::string& state_str);
+ bool handle_top_level_message(const std::shared_ptr<api::StorageMessage>& msg);
+
+protected:
+ vdstestlib::DirConfig _config;
+ std::unique_ptr<TestDistributorApp> _node;
+ std::unique_ptr<framework::TickingThreadPool> _thread_pool;
+ std::unique_ptr<DistributorStripePool> _stripe_pool;
+ std::unique_ptr<Distributor> _distributor;
+ std::unique_ptr<storage::DistributorComponent> _component;
+ DistributorMessageSenderStub _sender;
+ DistributorMessageSenderStub _sender_down;
+ HostInfo _host_info;
+
+ struct MessageSenderImpl : public ChainedMessageSender {
+ DistributorMessageSenderStub& _sender;
+ DistributorMessageSenderStub& _senderDown;
+ MessageSenderImpl(DistributorMessageSenderStub& up, DistributorMessageSenderStub& down)
+ : _sender(up), _senderDown(down) {}
+
+ void sendUp(const std::shared_ptr<api::StorageMessage>& msg) override {
+ _sender.send(msg);
+ }
+ void sendDown(const std::shared_ptr<api::StorageMessage>& msg) override {
+ _senderDown.send(msg);
+ }
+ };
+ MessageSenderImpl _message_sender;
+ uint32_t _num_distributor_stripes;
+
+ void enable_distributor_cluster_state(vespalib::stringref state);
+ void enable_distributor_cluster_state(const lib::ClusterStateBundle& state);
+};
+
+}
+
+}
diff --git a/storage/src/vespa/storage/bucketdb/bucketinfo.h b/storage/src/vespa/storage/bucketdb/bucketinfo.h
index 533192eda89..690fd3e36a9 100644
--- a/storage/src/vespa/storage/bucketdb/bucketinfo.h
+++ b/storage/src/vespa/storage/bucketdb/bucketinfo.h
@@ -10,6 +10,7 @@ namespace storage {
namespace distributor {
class DistributorStripeTestUtil;
class DistributorTestUtil;
+ class TopLevelDistributorTestUtil;
}
enum class TrustedUpdate {
@@ -205,6 +206,7 @@ public:
private:
friend class distributor::DistributorStripeTestUtil;
friend class distributor::DistributorTestUtil;
+ friend class distributor::TopLevelDistributorTestUtil;
/**
* Returns the bucket copy struct for the given node, null if nonexisting
diff --git a/storage/src/vespa/storage/common/distributorcomponent.h b/storage/src/vespa/storage/common/distributorcomponent.h
index 403ffa3376c..cbea739866f 100644
--- a/storage/src/vespa/storage/common/distributorcomponent.h
+++ b/storage/src/vespa/storage/common/distributorcomponent.h
@@ -41,17 +41,17 @@ namespace lib {
class IdealNodeCalculator;
}
-typedef vespa::config::content::core::internal::InternalStorDistributormanagerType DistributorConfig;
-typedef vespa::config::content::core::internal::InternalStorVisitordispatcherType VisitorConfig;
+using DistributorConfig = vespa::config::content::core::internal::InternalStorDistributormanagerType;
+using VisitorConfig = vespa::config::content::core::internal::InternalStorVisitordispatcherType;
struct UniqueTimeCalculator {
- virtual ~UniqueTimeCalculator() {}
+ virtual ~UniqueTimeCalculator() = default;
[[nodiscard]] virtual api::Timestamp generate_unique_timestamp() = 0;
};
struct DistributorManagedComponent
{
- virtual ~DistributorManagedComponent() {}
+ virtual ~DistributorManagedComponent() = default;
virtual void setTimeCalculator(UniqueTimeCalculator&) = 0;
virtual void setDistributorConfig(const DistributorConfig&)= 0;
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index 787f0362da6..0fded3e5a65 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -74,6 +74,7 @@ private:
friend class DistributorStripeTestUtil;
friend class DistributorTestUtil;
+ friend class TopLevelDistributorTestUtil;
// Only to be used by tests that want to ensure both the BucketDBUpdater _and_ the Distributor
// components agree on the currently active cluster state bundle.
// Transitively invokes Distributor::enableClusterStateBundle
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 6f9cbf3b0f2..a5ef9915704 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -61,16 +61,18 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_comp_reg(compReg),
_use_legacy_mode(num_distributor_stripes == 0),
_metrics(std::make_shared<DistributorMetricSet>()),
- _total_metrics(_use_legacy_mode ? std::shared_ptr<DistributorTotalMetrics>() :
- std::make_shared<DistributorTotalMetrics>(num_distributor_stripes)),
- _ideal_state_metrics(_use_legacy_mode ? std::make_shared<IdealStateMetricSet>() : std::shared_ptr<IdealStateMetricSet>()),
- _ideal_state_total_metrics(_use_legacy_mode ? std::shared_ptr<IdealStateTotalMetrics>() :
- std::make_shared<IdealStateTotalMetrics>(num_distributor_stripes)),
+ _total_metrics(_use_legacy_mode ? std::shared_ptr<DistributorTotalMetrics>()
+ : std::make_shared<DistributorTotalMetrics>(num_distributor_stripes)),
+ _ideal_state_metrics(_use_legacy_mode ? std::make_shared<IdealStateMetricSet>()
+ : std::shared_ptr<IdealStateMetricSet>()),
+ _ideal_state_total_metrics(_use_legacy_mode ? std::shared_ptr<IdealStateTotalMetrics>()
+ : std::make_shared<IdealStateTotalMetrics>(num_distributor_stripes)),
_messageSender(messageSender),
_n_stripe_bits(0),
_stripe(std::make_unique<DistributorStripe>(compReg,
_use_legacy_mode ? *_metrics : _total_metrics->stripe(0),
- _use_legacy_mode ? *_ideal_state_metrics : _ideal_state_total_metrics->stripe(0),
+ (_use_legacy_mode ? *_ideal_state_metrics
+ : _ideal_state_total_metrics->stripe(0)),
node_identity, threadPool,
doneInitHandler, *this, *this, _use_legacy_mode)),
_stripe_pool(stripe_pool),
@@ -107,8 +109,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
if (!_use_legacy_mode) {
assert(num_distributor_stripes == adjusted_num_stripes(num_distributor_stripes));
_n_stripe_bits = calc_num_stripe_bits(num_distributor_stripes);
- LOG(info, "Setting up distributor with %u stripes using %u stripe bits",
- num_distributor_stripes, _n_stripe_bits); // TODO STRIPE remove once legacy gone
+ LOG(debug, "Setting up distributor with %u stripes using %u stripe bits",
+ num_distributor_stripes, _n_stripe_bits);
_stripe_accessor = std::make_unique<MultiThreadedStripeAccessor>(_stripe_pool);
_bucket_db_updater = std::make_unique<BucketDBUpdater>(_component, _component,
*this, *this,
@@ -270,13 +272,7 @@ Distributor::onOpen()
if (_component.getDistributorConfig().startDistributorThread) {
_threadPool.addThread(*this);
_threadPool.start(_component.getThreadPool());
- if (!_use_legacy_mode) {
- std::vector<TickableStripe*> pool_stripes;
- for (auto& stripe : _stripes) {
- pool_stripes.push_back(stripe.get());
- }
- _stripe_pool.start(pool_stripes);
- }
+ start_stripe_pool();
} else {
LOG(warning, "Not starting distributor thread as it's configured to "
"run. Unless you are just running a test tool, this is a "
@@ -306,6 +302,18 @@ void Distributor::onClose() {
}
void
+Distributor::start_stripe_pool()
+{
+ if (!_use_legacy_mode) {
+ std::vector<TickableStripe*> pool_stripes;
+ for (auto& stripe : _stripes) {
+ pool_stripes.push_back(stripe.get());
+ }
+ _stripe_pool.start(pool_stripes); // If unit testing, this won't actually start any OS threads
+ }
+}
+
+void
Distributor::sendUp(const std::shared_ptr<api::StorageMessage>& msg)
{
if (_messageSender) {
@@ -412,7 +420,7 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg)
vespalib::make_string("Distributor::onDown(): Dispatch message to stripe %u", stripe_idx));
bool handled = _stripes[stripe_idx]->handle_or_enqueue_message(msg);
if (handled) {
- _stripe_pool.stripe_thread(stripe_idx).notify_event_has_triggered();
+ _stripe_pool.notify_stripe_event_has_triggered(stripe_idx);
}
return handled;
}
@@ -750,6 +758,12 @@ Distributor::signal_work_was_done()
_tickResult = framework::ThreadWaitInfo::MORE_WORK_ENQUEUED;
}
+bool
+Distributor::work_was_done() const noexcept
+{
+ return !_tickResult.waitWanted();
+}
+
vespalib::string
Distributor::getReportContentType(const framework::HttpUrlPath& path) const
{
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 8a522a34be1..ad19af7a656 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -80,6 +80,8 @@ public:
void sendUp(const std::shared_ptr<api::StorageMessage>&) override;
void sendDown(const std::shared_ptr<api::StorageMessage>&) override;
+ void start_stripe_pool();
+
DistributorMetricSet& getMetrics();
// Implements DistributorInterface and DistributorMessageSender.
@@ -127,10 +129,12 @@ public:
private:
friend class DistributorStripeTestUtil;
friend class DistributorTestUtil;
+ friend class TopLevelDistributorTestUtil;
friend class LegacyBucketDBUpdaterTest;
friend class MetricUpdateHook;
friend struct DistributorStripeTest;
friend struct LegacyDistributorTest;
+ friend struct TopLevelDistributorTest;
void setNodeStateUp();
bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg);
@@ -180,6 +184,7 @@ private:
void fetch_status_requests();
void handle_status_requests();
void signal_work_was_done();
+ [[nodiscard]] bool work_was_done() const noexcept;
void enableNextDistribution();
void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>);
@@ -205,7 +210,7 @@ private:
std::shared_ptr<DistributorMetricSet> _metrics;
std::shared_ptr<DistributorTotalMetrics> _total_metrics;
std::shared_ptr<IdealStateMetricSet> _ideal_state_metrics;
- std::shared_ptr<IdealStateTotalMetrics> _ideal_state_total_metrics;
+ std::shared_ptr<IdealStateTotalMetrics> _ideal_state_total_metrics;
ChainedMessageSender* _messageSender;
// TODO STRIPE multiple stripes...! This is for proof of concept of wiring.
uint8_t _n_stripe_bits;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index 9deb0754a1f..38667859cfd 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -203,6 +203,7 @@ private:
friend class MultiThreadedStripeAccessGuard;
friend struct DistributorStripeTest;
friend struct LegacyDistributorTest;
+ friend struct TopLevelDistributorTest;
bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg);
bool isMaintenanceReply(const api::StorageReply& reply) const;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp
index 13162de4208..e0d3e3d2508 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp
@@ -7,7 +7,7 @@
namespace storage::distributor {
-DistributorStripePool::DistributorStripePool()
+DistributorStripePool::DistributorStripePool(bool test_mode, PrivateCtorTag)
: _thread_pool(512_Ki),
_n_stripe_bits(0),
_stripes(),
@@ -17,17 +17,30 @@ DistributorStripePool::DistributorStripePool()
_parked_threads(0),
_bootstrap_tick_wait_duration(1ms),
_bootstrap_ticks_before_wait(10),
+ _single_threaded_test_mode(test_mode),
_stopped(false)
{}
+DistributorStripePool::DistributorStripePool()
+ : DistributorStripePool(false, PrivateCtorTag())
+{}
+
DistributorStripePool::~DistributorStripePool() {
if (!_stopped) {
stop_and_join();
}
}
+std::unique_ptr<DistributorStripePool>
+DistributorStripePool::make_non_threaded_pool_for_testing() {
+ return std::make_unique<DistributorStripePool>(true, PrivateCtorTag());
+}
+
void DistributorStripePool::park_all_threads() noexcept {
assert(!_stripes.empty());
+ if (_single_threaded_test_mode) {
+ return;
+ }
// Thread pool is not dynamic and signal_wants_park() is thread safe.
for (auto& s : _stripes) {
s->signal_wants_park();
@@ -37,6 +50,9 @@ void DistributorStripePool::park_all_threads() noexcept {
}
void DistributorStripePool::unpark_all_threads() noexcept {
+ if (_single_threaded_test_mode) {
+ return;
+ }
// Thread pool is not dynamic and unpark_thread() is thread safe.
for (auto& s : _stripes) {
s->unpark_thread();
@@ -58,7 +74,17 @@ TickableStripe& DistributorStripePool::stripe_of_key(uint64_t key) noexcept {
return stripe_thread(stripe_of_bucket_key(key, _n_stripe_bits)).stripe();
}
+void DistributorStripePool::notify_stripe_event_has_triggered(size_t stripe_idx) noexcept {
+ if (_single_threaded_test_mode) {
+ return;
+ }
+ stripe_thread(stripe_idx).notify_event_has_triggered();
+}
+
void DistributorStripePool::park_thread_until_released(DistributorStripeThread& thread) noexcept {
+ if (_single_threaded_test_mode) {
+ return;
+ }
std::unique_lock lock(_mutex);
assert(_parked_threads < _threads.size());
++_parked_threads;
@@ -88,19 +114,25 @@ void DistributorStripePool::start(const std::vector<TickableStripe*>& stripes) {
new_stripe->set_ticks_before_wait(_bootstrap_ticks_before_wait);
_stripes.emplace_back(std::move(new_stripe));
}
+ if (_single_threaded_test_mode) {
+ return; // We want all the control structures in place, but none of the actual OS threads.
+ }
for (auto& s : _stripes) {
_threads.emplace_back(_thread_pool.NewThread(s.get()));
}
}
void DistributorStripePool::stop_and_join() {
+ _stopped = true;
+ if (_single_threaded_test_mode) {
+ return;
+ }
for (auto& s : _stripes) {
s->signal_should_stop();
}
for (auto* t : _threads) {
t->Join();
}
- _stopped = true;
}
void DistributorStripePool::set_tick_wait_duration(vespalib::duration new_tick_wait_duration) noexcept {
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
index 75328479296..1b0fa0dd5d3 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
@@ -46,15 +46,20 @@ class DistributorStripePool {
size_t _parked_threads; // Must be protected by _park_mutex
vespalib::duration _bootstrap_tick_wait_duration;
uint32_t _bootstrap_ticks_before_wait;
+ bool _single_threaded_test_mode;
bool _stopped;
friend class DistributorStripeThread;
+ struct PrivateCtorTag{};
public:
using const_iterator = StripeVector::const_iterator;
+ DistributorStripePool(bool test_mode, PrivateCtorTag);
DistributorStripePool();
~DistributorStripePool();
+ static std::unique_ptr<DistributorStripePool> make_non_threaded_pool_for_testing();
+
// Set up the stripe pool with a 1-1 relationship between the provided
// stripes and running threads. Can only be called once per pool.
//
@@ -78,6 +83,7 @@ public:
[[nodiscard]] DistributorStripeThread& stripe_thread(size_t idx) noexcept {
return *_stripes[idx];
}
+ void notify_stripe_event_has_triggered(size_t stripe_idx) noexcept;
[[nodiscard]] const TickableStripe& stripe_of_key(uint64_t key) const noexcept;
[[nodiscard]] TickableStripe& stripe_of_key(uint64_t key) noexcept;
[[nodiscard]] size_t stripe_count() const noexcept { return _stripes.size(); }
diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
index cf2be06bda9..1456308c3d0 100644
--- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
+++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
@@ -144,6 +144,7 @@ private:
friend class DistributorStripeTestUtil;
friend class DistributorTestUtil;
+ friend class TopLevelDistributorTestUtil; // TODO STRIPE remove asap
// TODO refactor and rewire to avoid needing this direct meddling
friend class DistributorStripe;