diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2021-08-27 09:07:57 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2021-08-27 11:05:33 +0000 |
commit | a52abb01475098fa77be306bb9ae61611a08cb1a (patch) | |
tree | d6674fd2c9b0f70b6691edc661db40b4c1cdce5f /storage/src/tests/distributor | |
parent | a0c5413398a2a3e4a4754dbf819c4a22b00b984a (diff) |
Add test of top-level distributor functionality
This is a subset of the legacy distributor tests, adapted to
explicitly test cross-stripe functionality. Once all relevant
tests have been ported to be cross-stripe, the legacy test
code will be removed.
Diffstat (limited to 'storage/src/tests/distributor')
4 files changed, 711 insertions, 0 deletions
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt index 0d314e050d4..678c19d4c6f 100644 --- a/storage/src/tests/distributor/CMakeLists.txt +++ b/storage/src/tests/distributor/CMakeLists.txt @@ -47,6 +47,8 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST statoperationtest.cpp statusreporterdelegatetest.cpp throttlingoperationstartertest.cpp + top_level_distributor_test.cpp + top_level_distributor_test_util.cpp twophaseupdateoperationtest.cpp updateoperationtest.cpp visitoroperationtest.cpp diff --git a/storage/src/tests/distributor/top_level_distributor_test.cpp b/storage/src/tests/distributor/top_level_distributor_test.cpp new file mode 100644 index 00000000000..8572420eaba --- /dev/null +++ b/storage/src/tests/distributor/top_level_distributor_test.cpp @@ -0,0 +1,268 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/storage/distributor/idealstatemetricsset.h> +#include <vespa/storageapi/message/persistence.h> +#include <vespa/storageapi/message/bucketsplitting.h> +#include <vespa/storageapi/message/visitor.h> +#include <vespa/storageapi/message/removelocation.h> +#include <vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h> +#include <tests/distributor/top_level_distributor_test_util.h> +#include <vespa/document/bucket/fixed_bucket_spaces.h> +#include <vespa/document/fieldset/fieldsets.h> +#include <vespa/document/test/make_document_bucket.h> +#include <vespa/document/test/make_bucket_space.h> +#include <vespa/storage/config/config-stor-distributormanager.h> +#include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/distributor_stripe.h> +#include <vespa/storage/distributor/distributor_status.h> +#include <vespa/storage/distributor/distributor_bucket_space.h> +#include <vespa/storage/distributor/distributormetricsset.h> +#include <vespa/storage/distributor/distributor_stripe_pool.h> +#include <vespa/storage/distributor/distributor_stripe_thread.h> +#include <vespa/vespalib/text/stringtokenizer.h> +#include <vespa/vespalib/stllike/asciistream.h> +#include <vespa/metrics/updatehook.h> +#include <thread> +#include <vespa/vespalib/gtest/gtest.h> +#include <gmock/gmock.h> + +using document::test::makeDocumentBucket; +using document::test::makeBucketSpace; +using document::FixedBucketSpaces; +using document::BucketSpace; +using document::Bucket; +using document::BucketId; +using namespace ::testing; + +namespace storage::distributor { + +struct TopLevelDistributorTest : Test, TopLevelDistributorTestUtil { + TopLevelDistributorTest(); + ~TopLevelDistributorTest() override; + + void SetUp() override { + create_links(); + }; + + void TearDown() override { + close(); + } + + // Simple type aliases to make interfacing with certain utility functions + // easier. Note that this is only for readability and does not provide any + // added type safety. + using NodeCount = int; + using Redundancy = int; + using ConfigBuilder = vespa::config::content::core::StorDistributormanagerConfigBuilder; + + std::string resolve_stripe_operation_routing(std::shared_ptr<api::StorageMessage> msg) { + handle_top_level_message(msg); + + vespalib::asciistream posted_msgs; + auto stripes = distributor_stripes(); + for (size_t i = 0; i < stripes.size(); ++i) { + // TODO less intrusive, this is brittle. + for (auto& qmsg : stripes[i]->_messageQueue) { + posted_msgs << "Stripe " << i << ": " << MessageSenderStub::dumpMessage(*qmsg, false, false); + } + stripes[i]->_messageQueue.clear(); + } + return posted_msgs.str(); + } + + void tick_distributor_n_times(uint32_t n) { + for (uint32_t i = 0; i < n; ++i) { + tick(); + } + } + + StatusReporterDelegate& distributor_status_delegate() { + return _distributor->_distributorStatusDelegate; + } + + framework::TickingThreadPool& distributor_thread_pool() { + return _distributor->_threadPool; + } + + const std::vector<std::shared_ptr<DistributorStatus>>& distributor_status_todos() { + return _distributor->_status_to_do; + } + + Distributor::MetricUpdateHook distributor_metric_update_hook() { + return _distributor->_metricUpdateHook; + } +}; + +TopLevelDistributorTest::TopLevelDistributorTest() + : Test(), + TopLevelDistributorTestUtil() +{ +} + +TopLevelDistributorTest::~TopLevelDistributorTest() = default; + +TEST_F(TopLevelDistributorTest, external_operation_is_routed_to_expected_stripe) { + setup_distributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); + + auto op = std::make_shared<api::RemoveCommand>( + makeDocumentBucket(document::BucketId()), + document::DocumentId("id:m:test:n=1:foo"), + api::Timestamp(1234)); + + // We expect stripe mapping to be deterministic. + EXPECT_EQ("Stripe 2: Remove", resolve_stripe_operation_routing(op)); + + auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "foo", "bar", ""); + cmd->addBucketToBeVisited(document::BucketId(16, 1234)); + cmd->addBucketToBeVisited(document::BucketId()); + + EXPECT_EQ("Stripe 1: Visitor Create", resolve_stripe_operation_routing(cmd)); +} + +TEST_F(TopLevelDistributorTest, recovery_mode_on_cluster_state_change_is_triggered_across_all_stripes) { + setup_distributor(Redundancy(1), NodeCount(2), + "storage:1 .0.s:d distributor:1"); + enable_distributor_cluster_state("storage:1 distributor:1"); + + EXPECT_TRUE(all_distributor_stripes_are_in_recovery_mode()); + tick(); + EXPECT_FALSE(all_distributor_stripes_are_in_recovery_mode()); + + enable_distributor_cluster_state("storage:2 distributor:1"); + EXPECT_TRUE(all_distributor_stripes_are_in_recovery_mode()); +} + +// TODO STRIPE consider moving to generic test, not specific to top-level distributor or stripe +TEST_F(TopLevelDistributorTest, contains_time_statement) { + setup_distributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); + + auto cfg = _component->total_distributor_config_sp(); + EXPECT_FALSE(cfg->containsTimeStatement("")); + EXPECT_FALSE(cfg->containsTimeStatement("testdoctype1")); + EXPECT_FALSE(cfg->containsTimeStatement("testdoctype1.headerfield > 42")); + EXPECT_TRUE(cfg->containsTimeStatement("testdoctype1.headerfield > now()")); + EXPECT_TRUE(cfg->containsTimeStatement("testdoctype1.headerfield > now() - 3600")); + EXPECT_TRUE(cfg->containsTimeStatement("testdoctype1.headerfield == now() - 3600")); +} + +TEST_F(TopLevelDistributorTest, config_changes_are_propagated_to_all_stripes) { + setup_distributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); + + for (auto* s : distributor_stripes()) { + ASSERT_NE(s->getConfig().getSplitCount(), 1234); + ASSERT_NE(s->getConfig().getJoinCount(), 123); + } + + auto cfg = current_distributor_config(); + cfg.splitcount = 1234; + cfg.joincount = 123; + reconfigure(cfg); + + for (auto* s : distributor_stripes()) { + ASSERT_EQ(s->getConfig().getSplitCount(), 1234); + ASSERT_EQ(s->getConfig().getJoinCount(), 123); + } +} + +namespace { + +using namespace framework::defaultimplementation; + +class StatusRequestThread : public framework::Runnable { + StatusReporterDelegate& _reporter; + std::string _result; +public: + explicit StatusRequestThread(StatusReporterDelegate& reporter) + : _reporter(reporter) + {} + void run(framework::ThreadHandle&) override { + framework::HttpUrlPath path("/distributor?page=buckets"); + std::ostringstream stream; + _reporter.reportStatus(stream, path); + _result = stream.str(); + } + + std::string getResult() const { + return _result; + } +}; + +} + +TEST_F(TopLevelDistributorTest, tick_aggregates_status_requests_from_all_stripes) { + setup_distributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); + + ASSERT_NE(stripe_of_bucket(document::BucketId(16, 1)), + stripe_of_bucket(document::BucketId(16, 2))); + + add_nodes_to_stripe_bucket_db(document::BucketId(16, 1), "0=1/1/1/t"); + add_nodes_to_stripe_bucket_db(document::BucketId(16, 2), "0=2/2/2/t"); + + // Must go via delegate since reportStatus is now just a rendering + // function and not a request enqueuer (see Distributor::handleStatusRequest). + StatusRequestThread thread(distributor_status_delegate()); + FakeClock clock; + ThreadPoolImpl pool(clock); + int ticksBeforeWait = 1; + framework::Thread::UP tp(pool.startThread(thread, "statustest", 5ms, 5s, ticksBeforeWait)); + + while (true) { + std::this_thread::sleep_for(1ms); + framework::TickingLockGuard guard(distributor_thread_pool().freezeCriticalTicks()); + if (!distributor_status_todos().empty()) { + break; + } + + } + ASSERT_TRUE(tick()); + + tp->interruptAndJoin(); + + // Result contains buckets from DBs of multiple stripes. + EXPECT_THAT(thread.getResult(), HasSubstr("BucketId(0x4000000000000001)")); + EXPECT_THAT(thread.getResult(), HasSubstr("BucketId(0x4000000000000002)")); +} + +TEST_F(TopLevelDistributorTest, metric_update_hook_updates_pending_maintenance_metrics) { + setup_distributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); + // To ensure we count all operations, not just those fitting within the pending window. + auto cfg = current_distributor_config(); + cfg.maxpendingidealstateoperations = 1; // FIXME STRIPE this does not actually seem to be used...! + reconfigure(cfg); + + // 1 bucket must be merged, 1 must be split, 1 should be activated. + add_nodes_to_stripe_bucket_db(document::BucketId(16, 1), "0=2/2/2/t/a,1=1/1/1"); + add_nodes_to_stripe_bucket_db(document::BucketId(16, 2), "0=100/10000000/200000/t/a,1=100/10000000/200000/t"); + add_nodes_to_stripe_bucket_db(document::BucketId(16, 3), "0=200/300/400/t,1=200/300/400/t"); + + // Go many full scanner rounds to check that metrics are set, not added to existing. + tick_distributor_n_times(50); + + // By this point, no hook has been called so the metrics have not been set. + using MO = MaintenanceOperation; + { + const IdealStateMetricSet& metrics = total_ideal_state_metrics(); + EXPECT_EQ(0, metrics.operations[MO::MERGE_BUCKET]->pending.getLast()); + EXPECT_EQ(0, metrics.operations[MO::SPLIT_BUCKET]->pending.getLast()); + EXPECT_EQ(0, metrics.operations[MO::SET_BUCKET_STATE]->pending.getLast()); + EXPECT_EQ(0, metrics.operations[MO::DELETE_BUCKET]->pending.getLast()); + EXPECT_EQ(0, metrics.operations[MO::JOIN_BUCKET]->pending.getLast()); + EXPECT_EQ(0, metrics.operations[MO::GARBAGE_COLLECTION]->pending.getLast()); + } + + // Force trigger update hook + std::mutex l; + distributor_metric_update_hook().updateMetrics(metrics::MetricLockGuard(l)); + // Metrics should now be updated to the last complete working state + { + const IdealStateMetricSet& metrics = total_ideal_state_metrics(); + EXPECT_EQ(1, metrics.operations[MO::MERGE_BUCKET]->pending.getLast()); + EXPECT_EQ(1, metrics.operations[MO::SPLIT_BUCKET]->pending.getLast()); + EXPECT_EQ(1, metrics.operations[MO::SET_BUCKET_STATE]->pending.getLast()); + EXPECT_EQ(0, metrics.operations[MO::DELETE_BUCKET]->pending.getLast()); + EXPECT_EQ(0, metrics.operations[MO::JOIN_BUCKET]->pending.getLast()); + EXPECT_EQ(0, metrics.operations[MO::GARBAGE_COLLECTION]->pending.getLast()); + } +} + +} diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.cpp b/storage/src/tests/distributor/top_level_distributor_test_util.cpp new file mode 100644 index 00000000000..e7670a78a51 --- /dev/null +++ b/storage/src/tests/distributor/top_level_distributor_test_util.cpp @@ -0,0 +1,310 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "top_level_distributor_test_util.h" +#include <vespa/config-stor-distribution.h> +#include <vespa/document/test/make_bucket_space.h> +#include <vespa/document/test/make_document_bucket.h> +#include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/distributor_bucket_space.h> +#include <vespa/storage/distributor/distributor_stripe.h> +#include <vespa/storage/distributor/distributor_stripe_component.h> +#include <vespa/storage/distributor/distributor_stripe_pool.h> +#include <vespa/storage/distributor/distributor_stripe_thread.h> +#include <vespa/storage/common/bucket_stripe_utils.h> +#include <vespa/vdslib/distribution/distribution.h> +#include <vespa/vespalib/text/stringtokenizer.h> + +using document::test::makeBucketSpace; +using document::test::makeDocumentBucket; + +namespace storage::distributor { + +TopLevelDistributorTestUtil::TopLevelDistributorTestUtil() + : _message_sender(_sender, _sender_down), + _num_distributor_stripes(4) +{ + _config = getStandardConfig(false); +} + +TopLevelDistributorTestUtil::~TopLevelDistributorTestUtil() = default; + +void +TopLevelDistributorTestUtil::create_links() +{ + _node.reset(new TestDistributorApp(_config.getConfigId())); + _thread_pool = framework::TickingThreadPool::createDefault("distributor"); + _stripe_pool = DistributorStripePool::make_non_threaded_pool_for_testing(); + _distributor.reset(new Distributor( + _node->getComponentRegister(), + _node->node_identity(), + *_thread_pool, + *_stripe_pool, + *this, + _num_distributor_stripes, + _host_info, + &_message_sender)); + _component.reset(new storage::DistributorComponent(_node->getComponentRegister(), "distrtestutil")); +}; + +void +TopLevelDistributorTestUtil::setup_distributor(int redundancy, + int node_count, + const std::string& cluster_state, + uint32_t early_return, + bool require_primary_to_be_written) +{ + setup_distributor(redundancy, node_count, lib::ClusterStateBundle(lib::ClusterState(cluster_state)), + early_return, require_primary_to_be_written); +} + +void +TopLevelDistributorTestUtil::setup_distributor(int redundancy, + int node_count, + const lib::ClusterStateBundle& state, + uint32_t early_return, + bool require_primary_to_be_written) +{ + lib::Distribution::DistributionConfigBuilder config( + lib::Distribution::getDefaultDistributionConfig(redundancy, node_count).get()); + config.redundancy = redundancy; + config.initialRedundancy = early_return; + config.ensurePrimaryPersisted = require_primary_to_be_written; + auto distribution = std::make_shared<lib::Distribution>(config); + _node->getComponentRegister().setDistribution(distribution); + // This is for all intents and purposes a hack to avoid having the + // distributor treat setting the distribution explicitly as a signal that + // it should send RequestBucketInfo to all configured nodes. + // If we called storage_distribution_changed followed by enableDistribution + // explicitly (which is what happens in "real life"), that is what would + // take place. + // The inverse case of this can be explicitly accomplished by calling + // triggerDistributionChange(). + // This isn't pretty, folks, but it avoids breaking the world for now, + // as many tests have implicit assumptions about this being the behavior. + _distributor->propagateDefaultDistribution(distribution); + // Explicitly init the stripe pool since onOpen isn't called during testing + _distributor->start_stripe_pool(); + enable_distributor_cluster_state(state); +} + +size_t +TopLevelDistributorTestUtil::stripe_of_bucket(const document::BucketId& id) const noexcept +{ + return stripe_of_bucket_key(id.toKey(), _distributor->_n_stripe_bits); +} + +size_t +TopLevelDistributorTestUtil::stripe_of_bucket(const document::Bucket& bucket) const noexcept +{ + return stripe_of_bucket_key(bucket.getBucketId().toKey(), _distributor->_n_stripe_bits); +} + +void +TopLevelDistributorTestUtil::receive_set_system_state_command(const vespalib::string& state_str) +{ + auto state_cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(state_str)); + handle_top_level_message(state_cmd); // TODO move semantics +} + +bool +TopLevelDistributorTestUtil::handle_top_level_message(const std::shared_ptr<api::StorageMessage>& msg) +{ + return _distributor->onDown(msg); +} + +void +TopLevelDistributorTestUtil::close() +{ + _component.reset(0); + if (_distributor.get()) { + _stripe_pool->stop_and_join(); // Must be tagged as stopped prior to onClose + _distributor->onClose(); + } + _sender.clear(); + _node.reset(0); + _config = getStandardConfig(false); +} + +void +TopLevelDistributorTestUtil::add_nodes_to_stripe_bucket_db(const document::Bucket& bucket, + const std::string& nodeStr) +{ + BucketDatabase::Entry entry = get_bucket(bucket); + + if (!entry.valid()) { + entry = BucketDatabase::Entry(bucket.getBucketId()); + } + + entry->clear(); + + vespalib::StringTokenizer tokenizer(nodeStr, ","); + for (uint32_t i = 0; i < tokenizer.size(); ++i) { + vespalib::StringTokenizer tok2(tokenizer[i], "="); + vespalib::StringTokenizer tok3(tok2[1], "/"); + + api::BucketInfo info(atoi(tok3[0].data()), + atoi(tok3.size() > 1 ? tok3[1].data() : tok3[0].data()), + atoi(tok3.size() > 2 ? tok3[2].data() : tok3[0].data())); + + size_t flagsIdx = 3; + + // Meta info override? For simplicity, require both meta count and size + if (tok3.size() > 4 && (!tok3[3].empty() && isdigit(tok3[3][0]))) { + info.setMetaCount(atoi(tok3[3].data())); + info.setUsedFileSize(atoi(tok3[4].data())); + flagsIdx = 5; + } + + if ((tok3.size() > flagsIdx + 1) && tok3[flagsIdx + 1] == "a") { + info.setActive(); + } else { + info.setActive(false); + } + if ((tok3.size() > flagsIdx + 2) && tok3[flagsIdx + 2] == "r") { + info.setReady(); + } else { + info.setReady(false); + } + + uint16_t idx = atoi(tok2[0].data()); + BucketCopy node(0, idx, info); + + // Allow user to manually override trusted and active. + if (tok3.size() > flagsIdx && tok3[flagsIdx] == "t") { + node.setTrusted(); + } + + entry->addNodeManual(node); + } + + stripe_bucket_database(stripe_of_bucket(bucket), bucket.getBucketSpace()).update(entry); +} + +void +TopLevelDistributorTestUtil::add_nodes_to_stripe_bucket_db(const document::BucketId& id, + const std::string& nodeStr) +{ + add_nodes_to_stripe_bucket_db(document::Bucket(makeBucketSpace(), id), nodeStr); +} + +BucketDatabase::Entry +TopLevelDistributorTestUtil::get_bucket(const document::Bucket& bucket) const +{ + return stripe_bucket_database(stripe_of_bucket(bucket), bucket.getBucketSpace()).get(bucket.getBucketId()); +} + +BucketDatabase::Entry +TopLevelDistributorTestUtil::get_bucket(const document::BucketId& bId) const +{ + return stripe_bucket_database(stripe_of_bucket(bId)).get(bId); +} + +BucketDBUpdater& +TopLevelDistributorTestUtil::bucket_db_updater() { + return *_distributor->_bucket_db_updater; +} + +const IdealStateMetricSet& +TopLevelDistributorTestUtil::total_ideal_state_metrics() const +{ + assert(_distributor->_ideal_state_total_metrics); + return *_distributor->_ideal_state_total_metrics; +} + +const storage::distributor::DistributorNodeContext& +TopLevelDistributorTestUtil::node_context() const { + return _distributor->distributor_component(); +} + +storage::distributor::DistributorStripeOperationContext& +TopLevelDistributorTestUtil::operation_context() { + return _distributor->distributor_component(); +} + +bool +TopLevelDistributorTestUtil::tick() { + framework::ThreadWaitInfo res( + framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN); + { + framework::TickingLockGuard lock(_distributor->_threadPool.freezeCriticalTicks()); + res.merge(_distributor->doCriticalTick(0)); + } + res.merge(_distributor->doNonCriticalTick(0)); + bool did_work = !res.waitWanted(); + for (auto& s : *_stripe_pool) { + did_work |= s->stripe().tick(); + } + return did_work; +} + +const DistributorConfig& +TopLevelDistributorTestUtil::current_distributor_config() const +{ + return _component->getDistributorConfig(); +} + +void +TopLevelDistributorTestUtil::reconfigure(const DistributorConfig& cfg) +{ + _node->getComponentRegister().setDistributorConfig(cfg); + tick(); // Config is propagated upon next top-level tick +} + +BucketDatabase& +TopLevelDistributorTestUtil::stripe_bucket_database(uint16_t stripe_idx) { + assert(stripe_idx < _distributor->_stripes.size()); + return _distributor->_stripes[stripe_idx]->getBucketSpaceRepo().get(makeBucketSpace()).getBucketDatabase(); +} + +BucketDatabase& +TopLevelDistributorTestUtil::stripe_bucket_database(uint16_t stripe_idx, document::BucketSpace space) { + assert(stripe_idx < _distributor->_stripes.size()); + return _distributor->_stripes[stripe_idx]->getBucketSpaceRepo().get(space).getBucketDatabase(); +} + +const BucketDatabase& +TopLevelDistributorTestUtil::stripe_bucket_database(uint16_t stripe_idx) const { + assert(stripe_idx < _distributor->_stripes.size()); + return _distributor->_stripes[stripe_idx]->getBucketSpaceRepo().get(makeBucketSpace()).getBucketDatabase(); +} + +const BucketDatabase& +TopLevelDistributorTestUtil::stripe_bucket_database(uint16_t stripe_idx, document::BucketSpace space) const { + assert(stripe_idx < _distributor->_stripes.size()); + return _distributor->_stripes[stripe_idx]->getBucketSpaceRepo().get(space).getBucketDatabase(); +} + +// Hide how the sausages are made when directly accessing internal stripes +std::vector<DistributorStripe*> +TopLevelDistributorTestUtil::distributor_stripes() const { + std::vector<DistributorStripe*> stripes; + stripes.reserve(_distributor->_stripes.size()); + for (auto& s : _distributor->_stripes) { + stripes.emplace_back(s.get()); + } + return stripes; +} + +bool +TopLevelDistributorTestUtil::all_distributor_stripes_are_in_recovery_mode() const { + for (auto* s : distributor_stripes()) { + if (!s->isInRecoveryMode()) { + return false; + } + } + return true; +} + +void +TopLevelDistributorTestUtil::enable_distributor_cluster_state(vespalib::stringref state) +{ + bucket_db_updater().simulate_cluster_state_bundle_activation( + lib::ClusterStateBundle(lib::ClusterState(state))); +} + +void +TopLevelDistributorTestUtil::enable_distributor_cluster_state(const lib::ClusterStateBundle& state) +{ + bucket_db_updater().simulate_cluster_state_bundle_activation(state); +} + +} diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.h b/storage/src/tests/distributor/top_level_distributor_test_util.h new file mode 100644 index 00000000000..0cc736464a1 --- /dev/null +++ b/storage/src/tests/distributor/top_level_distributor_test_util.h @@ -0,0 +1,131 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "distributor_message_sender_stub.h" +#include <tests/common/dummystoragelink.h> +#include <tests/common/testhelper.h> +#include <tests/common/teststorageapp.h> +#include <vespa/storage/common/hostreporter/hostinfo.h> +#include <vespa/storage/frameworkimpl/component/distributorcomponentregisterimpl.h> +#include <vespa/storage/storageutil/utils.h> +#include <vespa/storageapi/message/state.h> +#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h> + +namespace storage { + +namespace framework { struct TickingThreadPool; } + +namespace distributor { + +class Distributor; +class DistributorNodeContext; +class DistributorStripe; +class DistributorStripeComponent; +class DistributorStripeOperationContext; +class DistributorStripePool; +class IdealStateMetricSet; +class Operation; +class BucketDBUpdater; + +class TopLevelDistributorTestUtil : private DoneInitializeHandler +{ +public: + TopLevelDistributorTestUtil(); + ~TopLevelDistributorTestUtil(); + + void create_links(); + + void close(); + + size_t stripe_of_bucket(const document::BucketId& id) const noexcept; + size_t stripe_of_bucket(const document::Bucket& bucket) const noexcept; + + /** + * Parses the given string to a set of node => bucket info data, + * and inserts them as nodes in the given bucket. + * Format: + * "node1=checksum/docs/size,node2=checksum/docs/size" + */ + void add_nodes_to_stripe_bucket_db(const document::Bucket& bucket, + const std::string& nodeStr); + // As the above, but always inserts into default bucket space + void add_nodes_to_stripe_bucket_db(const document::BucketId& id, const std::string& nodeStr); + + BucketDBUpdater& bucket_db_updater(); + const IdealStateMetricSet& total_ideal_state_metrics() const; + const storage::distributor::DistributorNodeContext& node_context() const; + storage::distributor::DistributorStripeOperationContext& operation_context(); + + std::vector<DistributorStripe*> distributor_stripes() const; + + bool tick(); + + const DistributorConfig& current_distributor_config() const; + void reconfigure(const DistributorConfig&); + + BucketDatabase& stripe_bucket_database(uint16_t stripe_idx); // Implicit default space only + BucketDatabase& stripe_bucket_database(uint16_t stripe_idx, document::BucketSpace space); + const BucketDatabase& stripe_bucket_database(uint16_t stripe_idx) const; // Implicit default space only + const BucketDatabase& stripe_bucket_database(uint16_t stripe_idx, document::BucketSpace space) const; + [[nodiscard]] bool all_distributor_stripes_are_in_recovery_mode() const; + + void setup_distributor(int redundancy, + int node_count, + const std::string& systemState, + uint32_t early_return = false, + bool require_primary_to_be_written = true); + + void setup_distributor(int redundancy, + int node_count, + const lib::ClusterStateBundle& state, + uint32_t early_return = false, + bool require_primary_to_be_written = true); + + void notifyDoneInitializing() override {} + + BucketDatabase::Entry get_bucket(const document::Bucket& bucket) const; + // Gets bucket entry from default space only + BucketDatabase::Entry get_bucket(const document::BucketId& bId) const; + + DistributorMessageSenderStub& sender() noexcept { return _sender; } + const DistributorMessageSenderStub& sender() const noexcept { return _sender; } + + // Invokes full cluster state transition pipeline rather than directly applying + // the state and just pretending everything has been completed. + void receive_set_system_state_command(const vespalib::string& state_str); + bool handle_top_level_message(const std::shared_ptr<api::StorageMessage>& msg); + +protected: + vdstestlib::DirConfig _config; + std::unique_ptr<TestDistributorApp> _node; + std::unique_ptr<framework::TickingThreadPool> _thread_pool; + std::unique_ptr<DistributorStripePool> _stripe_pool; + std::unique_ptr<Distributor> _distributor; + std::unique_ptr<storage::DistributorComponent> _component; + DistributorMessageSenderStub _sender; + DistributorMessageSenderStub _sender_down; + HostInfo _host_info; + + struct MessageSenderImpl : public ChainedMessageSender { + DistributorMessageSenderStub& _sender; + DistributorMessageSenderStub& _senderDown; + MessageSenderImpl(DistributorMessageSenderStub& up, DistributorMessageSenderStub& down) + : _sender(up), _senderDown(down) {} + + void sendUp(const std::shared_ptr<api::StorageMessage>& msg) override { + _sender.send(msg); + } + void sendDown(const std::shared_ptr<api::StorageMessage>& msg) override { + _senderDown.send(msg); + } + }; + MessageSenderImpl _message_sender; + uint32_t _num_distributor_stripes; + + void enable_distributor_cluster_state(vespalib::stringref state); + void enable_distributor_cluster_state(const lib::ClusterStateBundle& state); +}; + +} + +} |