diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2021-08-27 09:07:57 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2021-08-27 11:05:33 +0000 |
commit | a52abb01475098fa77be306bb9ae61611a08cb1a (patch) | |
tree | d6674fd2c9b0f70b6691edc661db40b4c1cdce5f /storage | |
parent | a0c5413398a2a3e4a4754dbf819c4a22b00b984a (diff) |
Add test of top-level distributor functionality
This is a subset of the legacy distributor tests, adapted to
explicitly test cross-stripe functionality. Once all relevant
tests have been ported to be cross-stripe, the legacy test
code will be removed.
Diffstat (limited to 'storage')
15 files changed, 800 insertions, 27 deletions
diff --git a/storage/src/tests/common/message_sender_stub.cpp b/storage/src/tests/common/message_sender_stub.cpp index a82d45b0b99..c0ed0a9418a 100644 --- a/storage/src/tests/common/message_sender_stub.cpp +++ b/storage/src/tests/common/message_sender_stub.cpp @@ -22,7 +22,7 @@ MessageSenderStub::getLastCommand(bool verbose) const } std::string -MessageSenderStub::dumpMessage(const api::StorageMessage& msg, bool includeAddress, bool verbose) const +MessageSenderStub::dumpMessage(const api::StorageMessage& msg, bool includeAddress, bool verbose) { std::ostringstream ost; diff --git a/storage/src/tests/common/message_sender_stub.h b/storage/src/tests/common/message_sender_stub.h index 73b1fcff9f4..3a73ff884bc 100644 --- a/storage/src/tests/common/message_sender_stub.h +++ b/storage/src/tests/common/message_sender_stub.h @@ -38,9 +38,9 @@ struct MessageSenderStub : MessageSender { std::string getReplies(bool includeAddress = false, bool verbose = false) const; - std::string dumpMessage(const api::StorageMessage& msg, - bool includeAddress, - bool verbose) const; + static std::string dumpMessage(const api::StorageMessage& msg, + bool includeAddress, + bool verbose); }; diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt index 0d314e050d4..678c19d4c6f 100644 --- a/storage/src/tests/distributor/CMakeLists.txt +++ b/storage/src/tests/distributor/CMakeLists.txt @@ -47,6 +47,8 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST statoperationtest.cpp statusreporterdelegatetest.cpp throttlingoperationstartertest.cpp + top_level_distributor_test.cpp + top_level_distributor_test_util.cpp twophaseupdateoperationtest.cpp updateoperationtest.cpp visitoroperationtest.cpp diff --git a/storage/src/tests/distributor/top_level_distributor_test.cpp b/storage/src/tests/distributor/top_level_distributor_test.cpp new file mode 100644 index 00000000000..8572420eaba --- /dev/null +++ b/storage/src/tests/distributor/top_level_distributor_test.cpp @@ -0,0 +1,268 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/storage/distributor/idealstatemetricsset.h> +#include <vespa/storageapi/message/persistence.h> +#include <vespa/storageapi/message/bucketsplitting.h> +#include <vespa/storageapi/message/visitor.h> +#include <vespa/storageapi/message/removelocation.h> +#include <vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h> +#include <tests/distributor/top_level_distributor_test_util.h> +#include <vespa/document/bucket/fixed_bucket_spaces.h> +#include <vespa/document/fieldset/fieldsets.h> +#include <vespa/document/test/make_document_bucket.h> +#include <vespa/document/test/make_bucket_space.h> +#include <vespa/storage/config/config-stor-distributormanager.h> +#include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/distributor_stripe.h> +#include <vespa/storage/distributor/distributor_status.h> +#include <vespa/storage/distributor/distributor_bucket_space.h> +#include <vespa/storage/distributor/distributormetricsset.h> +#include <vespa/storage/distributor/distributor_stripe_pool.h> +#include <vespa/storage/distributor/distributor_stripe_thread.h> +#include <vespa/vespalib/text/stringtokenizer.h> +#include <vespa/vespalib/stllike/asciistream.h> +#include <vespa/metrics/updatehook.h> +#include <thread> +#include <vespa/vespalib/gtest/gtest.h> +#include <gmock/gmock.h> + +using document::test::makeDocumentBucket; +using document::test::makeBucketSpace; +using document::FixedBucketSpaces; +using document::BucketSpace; +using document::Bucket; +using document::BucketId; +using namespace ::testing; + +namespace storage::distributor { + +struct TopLevelDistributorTest : Test, TopLevelDistributorTestUtil { + TopLevelDistributorTest(); + ~TopLevelDistributorTest() override; + + void SetUp() override { + create_links(); + }; + + void TearDown() override { + close(); + } + + // Simple type aliases to make interfacing with certain utility functions + // easier. Note that this is only for readability and does not provide any + // added type safety. + using NodeCount = int; + using Redundancy = int; + using ConfigBuilder = vespa::config::content::core::StorDistributormanagerConfigBuilder; + + std::string resolve_stripe_operation_routing(std::shared_ptr<api::StorageMessage> msg) { + handle_top_level_message(msg); + + vespalib::asciistream posted_msgs; + auto stripes = distributor_stripes(); + for (size_t i = 0; i < stripes.size(); ++i) { + // TODO less intrusive, this is brittle. + for (auto& qmsg : stripes[i]->_messageQueue) { + posted_msgs << "Stripe " << i << ": " << MessageSenderStub::dumpMessage(*qmsg, false, false); + } + stripes[i]->_messageQueue.clear(); + } + return posted_msgs.str(); + } + + void tick_distributor_n_times(uint32_t n) { + for (uint32_t i = 0; i < n; ++i) { + tick(); + } + } + + StatusReporterDelegate& distributor_status_delegate() { + return _distributor->_distributorStatusDelegate; + } + + framework::TickingThreadPool& distributor_thread_pool() { + return _distributor->_threadPool; + } + + const std::vector<std::shared_ptr<DistributorStatus>>& distributor_status_todos() { + return _distributor->_status_to_do; + } + + Distributor::MetricUpdateHook distributor_metric_update_hook() { + return _distributor->_metricUpdateHook; + } +}; + +TopLevelDistributorTest::TopLevelDistributorTest() + : Test(), + TopLevelDistributorTestUtil() +{ +} + +TopLevelDistributorTest::~TopLevelDistributorTest() = default; + +TEST_F(TopLevelDistributorTest, external_operation_is_routed_to_expected_stripe) { + setup_distributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); + + auto op = std::make_shared<api::RemoveCommand>( + makeDocumentBucket(document::BucketId()), + document::DocumentId("id:m:test:n=1:foo"), + api::Timestamp(1234)); + + // We expect stripe mapping to be deterministic. + EXPECT_EQ("Stripe 2: Remove", resolve_stripe_operation_routing(op)); + + auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "foo", "bar", ""); + cmd->addBucketToBeVisited(document::BucketId(16, 1234)); + cmd->addBucketToBeVisited(document::BucketId()); + + EXPECT_EQ("Stripe 1: Visitor Create", resolve_stripe_operation_routing(cmd)); +} + +TEST_F(TopLevelDistributorTest, recovery_mode_on_cluster_state_change_is_triggered_across_all_stripes) { + setup_distributor(Redundancy(1), NodeCount(2), + "storage:1 .0.s:d distributor:1"); + enable_distributor_cluster_state("storage:1 distributor:1"); + + EXPECT_TRUE(all_distributor_stripes_are_in_recovery_mode()); + tick(); + EXPECT_FALSE(all_distributor_stripes_are_in_recovery_mode()); + + enable_distributor_cluster_state("storage:2 distributor:1"); + EXPECT_TRUE(all_distributor_stripes_are_in_recovery_mode()); +} + +// TODO STRIPE consider moving to generic test, not specific to top-level distributor or stripe +TEST_F(TopLevelDistributorTest, contains_time_statement) { + setup_distributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); + + auto cfg = _component->total_distributor_config_sp(); + EXPECT_FALSE(cfg->containsTimeStatement("")); + EXPECT_FALSE(cfg->containsTimeStatement("testdoctype1")); + EXPECT_FALSE(cfg->containsTimeStatement("testdoctype1.headerfield > 42")); + EXPECT_TRUE(cfg->containsTimeStatement("testdoctype1.headerfield > now()")); + EXPECT_TRUE(cfg->containsTimeStatement("testdoctype1.headerfield > now() - 3600")); + EXPECT_TRUE(cfg->containsTimeStatement("testdoctype1.headerfield == now() - 3600")); +} + +TEST_F(TopLevelDistributorTest, config_changes_are_propagated_to_all_stripes) { + setup_distributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); + + for (auto* s : distributor_stripes()) { + ASSERT_NE(s->getConfig().getSplitCount(), 1234); + ASSERT_NE(s->getConfig().getJoinCount(), 123); + } + + auto cfg = current_distributor_config(); + cfg.splitcount = 1234; + cfg.joincount = 123; + reconfigure(cfg); + + for (auto* s : distributor_stripes()) { + ASSERT_EQ(s->getConfig().getSplitCount(), 1234); + ASSERT_EQ(s->getConfig().getJoinCount(), 123); + } +} + +namespace { + +using namespace framework::defaultimplementation; + +class StatusRequestThread : public framework::Runnable { + StatusReporterDelegate& _reporter; + std::string _result; +public: + explicit StatusRequestThread(StatusReporterDelegate& reporter) + : _reporter(reporter) + {} + void run(framework::ThreadHandle&) override { + framework::HttpUrlPath path("/distributor?page=buckets"); + std::ostringstream stream; + _reporter.reportStatus(stream, path); + _result = stream.str(); + } + + std::string getResult() const { + return _result; + } +}; + +} + +TEST_F(TopLevelDistributorTest, tick_aggregates_status_requests_from_all_stripes) { + setup_distributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); + + ASSERT_NE(stripe_of_bucket(document::BucketId(16, 1)), + stripe_of_bucket(document::BucketId(16, 2))); + + add_nodes_to_stripe_bucket_db(document::BucketId(16, 1), "0=1/1/1/t"); + add_nodes_to_stripe_bucket_db(document::BucketId(16, 2), "0=2/2/2/t"); + + // Must go via delegate since reportStatus is now just a rendering + // function and not a request enqueuer (see Distributor::handleStatusRequest). + StatusRequestThread thread(distributor_status_delegate()); + FakeClock clock; + ThreadPoolImpl pool(clock); + int ticksBeforeWait = 1; + framework::Thread::UP tp(pool.startThread(thread, "statustest", 5ms, 5s, ticksBeforeWait)); + + while (true) { + std::this_thread::sleep_for(1ms); + framework::TickingLockGuard guard(distributor_thread_pool().freezeCriticalTicks()); + if (!distributor_status_todos().empty()) { + break; + } + + } + ASSERT_TRUE(tick()); + + tp->interruptAndJoin(); + + // Result contains buckets from DBs of multiple stripes. + EXPECT_THAT(thread.getResult(), HasSubstr("BucketId(0x4000000000000001)")); + EXPECT_THAT(thread.getResult(), HasSubstr("BucketId(0x4000000000000002)")); +} + +TEST_F(TopLevelDistributorTest, metric_update_hook_updates_pending_maintenance_metrics) { + setup_distributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); + // To ensure we count all operations, not just those fitting within the pending window. + auto cfg = current_distributor_config(); + cfg.maxpendingidealstateoperations = 1; // FIXME STRIPE this does not actually seem to be used...! + reconfigure(cfg); + + // 1 bucket must be merged, 1 must be split, 1 should be activated. + add_nodes_to_stripe_bucket_db(document::BucketId(16, 1), "0=2/2/2/t/a,1=1/1/1"); + add_nodes_to_stripe_bucket_db(document::BucketId(16, 2), "0=100/10000000/200000/t/a,1=100/10000000/200000/t"); + add_nodes_to_stripe_bucket_db(document::BucketId(16, 3), "0=200/300/400/t,1=200/300/400/t"); + + // Go many full scanner rounds to check that metrics are set, not added to existing. + tick_distributor_n_times(50); + + // By this point, no hook has been called so the metrics have not been set. + using MO = MaintenanceOperation; + { + const IdealStateMetricSet& metrics = total_ideal_state_metrics(); + EXPECT_EQ(0, metrics.operations[MO::MERGE_BUCKET]->pending.getLast()); + EXPECT_EQ(0, metrics.operations[MO::SPLIT_BUCKET]->pending.getLast()); + EXPECT_EQ(0, metrics.operations[MO::SET_BUCKET_STATE]->pending.getLast()); + EXPECT_EQ(0, metrics.operations[MO::DELETE_BUCKET]->pending.getLast()); + EXPECT_EQ(0, metrics.operations[MO::JOIN_BUCKET]->pending.getLast()); + EXPECT_EQ(0, metrics.operations[MO::GARBAGE_COLLECTION]->pending.getLast()); + } + + // Force trigger update hook + std::mutex l; + distributor_metric_update_hook().updateMetrics(metrics::MetricLockGuard(l)); + // Metrics should now be updated to the last complete working state + { + const IdealStateMetricSet& metrics = total_ideal_state_metrics(); + EXPECT_EQ(1, metrics.operations[MO::MERGE_BUCKET]->pending.getLast()); + EXPECT_EQ(1, metrics.operations[MO::SPLIT_BUCKET]->pending.getLast()); + EXPECT_EQ(1, metrics.operations[MO::SET_BUCKET_STATE]->pending.getLast()); + EXPECT_EQ(0, metrics.operations[MO::DELETE_BUCKET]->pending.getLast()); + EXPECT_EQ(0, metrics.operations[MO::JOIN_BUCKET]->pending.getLast()); + EXPECT_EQ(0, metrics.operations[MO::GARBAGE_COLLECTION]->pending.getLast()); + } +} + +} diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.cpp b/storage/src/tests/distributor/top_level_distributor_test_util.cpp new file mode 100644 index 00000000000..e7670a78a51 --- /dev/null +++ b/storage/src/tests/distributor/top_level_distributor_test_util.cpp @@ -0,0 +1,310 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "top_level_distributor_test_util.h" +#include <vespa/config-stor-distribution.h> +#include <vespa/document/test/make_bucket_space.h> +#include <vespa/document/test/make_document_bucket.h> +#include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/distributor_bucket_space.h> +#include <vespa/storage/distributor/distributor_stripe.h> +#include <vespa/storage/distributor/distributor_stripe_component.h> +#include <vespa/storage/distributor/distributor_stripe_pool.h> +#include <vespa/storage/distributor/distributor_stripe_thread.h> +#include <vespa/storage/common/bucket_stripe_utils.h> +#include <vespa/vdslib/distribution/distribution.h> +#include <vespa/vespalib/text/stringtokenizer.h> + +using document::test::makeBucketSpace; +using document::test::makeDocumentBucket; + +namespace storage::distributor { + +TopLevelDistributorTestUtil::TopLevelDistributorTestUtil() + : _message_sender(_sender, _sender_down), + _num_distributor_stripes(4) +{ + _config = getStandardConfig(false); +} + +TopLevelDistributorTestUtil::~TopLevelDistributorTestUtil() = default; + +void +TopLevelDistributorTestUtil::create_links() +{ + _node.reset(new TestDistributorApp(_config.getConfigId())); + _thread_pool = framework::TickingThreadPool::createDefault("distributor"); + _stripe_pool = DistributorStripePool::make_non_threaded_pool_for_testing(); + _distributor.reset(new Distributor( + _node->getComponentRegister(), + _node->node_identity(), + *_thread_pool, + *_stripe_pool, + *this, + _num_distributor_stripes, + _host_info, + &_message_sender)); + _component.reset(new storage::DistributorComponent(_node->getComponentRegister(), "distrtestutil")); +}; + +void +TopLevelDistributorTestUtil::setup_distributor(int redundancy, + int node_count, + const std::string& cluster_state, + uint32_t early_return, + bool require_primary_to_be_written) +{ + setup_distributor(redundancy, node_count, lib::ClusterStateBundle(lib::ClusterState(cluster_state)), + early_return, require_primary_to_be_written); +} + +void +TopLevelDistributorTestUtil::setup_distributor(int redundancy, + int node_count, + const lib::ClusterStateBundle& state, + uint32_t early_return, + bool require_primary_to_be_written) +{ + lib::Distribution::DistributionConfigBuilder config( + lib::Distribution::getDefaultDistributionConfig(redundancy, node_count).get()); + config.redundancy = redundancy; + config.initialRedundancy = early_return; + config.ensurePrimaryPersisted = require_primary_to_be_written; + auto distribution = std::make_shared<lib::Distribution>(config); + _node->getComponentRegister().setDistribution(distribution); + // This is for all intents and purposes a hack to avoid having the + // distributor treat setting the distribution explicitly as a signal that + // it should send RequestBucketInfo to all configured nodes. + // If we called storage_distribution_changed followed by enableDistribution + // explicitly (which is what happens in "real life"), that is what would + // take place. + // The inverse case of this can be explicitly accomplished by calling + // triggerDistributionChange(). + // This isn't pretty, folks, but it avoids breaking the world for now, + // as many tests have implicit assumptions about this being the behavior. + _distributor->propagateDefaultDistribution(distribution); + // Explicitly init the stripe pool since onOpen isn't called during testing + _distributor->start_stripe_pool(); + enable_distributor_cluster_state(state); +} + +size_t +TopLevelDistributorTestUtil::stripe_of_bucket(const document::BucketId& id) const noexcept +{ + return stripe_of_bucket_key(id.toKey(), _distributor->_n_stripe_bits); +} + +size_t +TopLevelDistributorTestUtil::stripe_of_bucket(const document::Bucket& bucket) const noexcept +{ + return stripe_of_bucket_key(bucket.getBucketId().toKey(), _distributor->_n_stripe_bits); +} + +void +TopLevelDistributorTestUtil::receive_set_system_state_command(const vespalib::string& state_str) +{ + auto state_cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(state_str)); + handle_top_level_message(state_cmd); // TODO move semantics +} + +bool +TopLevelDistributorTestUtil::handle_top_level_message(const std::shared_ptr<api::StorageMessage>& msg) +{ + return _distributor->onDown(msg); +} + +void +TopLevelDistributorTestUtil::close() +{ + _component.reset(0); + if (_distributor.get()) { + _stripe_pool->stop_and_join(); // Must be tagged as stopped prior to onClose + _distributor->onClose(); + } + _sender.clear(); + _node.reset(0); + _config = getStandardConfig(false); +} + +void +TopLevelDistributorTestUtil::add_nodes_to_stripe_bucket_db(const document::Bucket& bucket, + const std::string& nodeStr) +{ + BucketDatabase::Entry entry = get_bucket(bucket); + + if (!entry.valid()) { + entry = BucketDatabase::Entry(bucket.getBucketId()); + } + + entry->clear(); + + vespalib::StringTokenizer tokenizer(nodeStr, ","); + for (uint32_t i = 0; i < tokenizer.size(); ++i) { + vespalib::StringTokenizer tok2(tokenizer[i], "="); + vespalib::StringTokenizer tok3(tok2[1], "/"); + + api::BucketInfo info(atoi(tok3[0].data()), + atoi(tok3.size() > 1 ? tok3[1].data() : tok3[0].data()), + atoi(tok3.size() > 2 ? tok3[2].data() : tok3[0].data())); + + size_t flagsIdx = 3; + + // Meta info override? For simplicity, require both meta count and size + if (tok3.size() > 4 && (!tok3[3].empty() && isdigit(tok3[3][0]))) { + info.setMetaCount(atoi(tok3[3].data())); + info.setUsedFileSize(atoi(tok3[4].data())); + flagsIdx = 5; + } + + if ((tok3.size() > flagsIdx + 1) && tok3[flagsIdx + 1] == "a") { + info.setActive(); + } else { + info.setActive(false); + } + if ((tok3.size() > flagsIdx + 2) && tok3[flagsIdx + 2] == "r") { + info.setReady(); + } else { + info.setReady(false); + } + + uint16_t idx = atoi(tok2[0].data()); + BucketCopy node(0, idx, info); + + // Allow user to manually override trusted and active. + if (tok3.size() > flagsIdx && tok3[flagsIdx] == "t") { + node.setTrusted(); + } + + entry->addNodeManual(node); + } + + stripe_bucket_database(stripe_of_bucket(bucket), bucket.getBucketSpace()).update(entry); +} + +void +TopLevelDistributorTestUtil::add_nodes_to_stripe_bucket_db(const document::BucketId& id, + const std::string& nodeStr) +{ + add_nodes_to_stripe_bucket_db(document::Bucket(makeBucketSpace(), id), nodeStr); +} + +BucketDatabase::Entry +TopLevelDistributorTestUtil::get_bucket(const document::Bucket& bucket) const +{ + return stripe_bucket_database(stripe_of_bucket(bucket), bucket.getBucketSpace()).get(bucket.getBucketId()); +} + +BucketDatabase::Entry +TopLevelDistributorTestUtil::get_bucket(const document::BucketId& bId) const +{ + return stripe_bucket_database(stripe_of_bucket(bId)).get(bId); +} + +BucketDBUpdater& +TopLevelDistributorTestUtil::bucket_db_updater() { + return *_distributor->_bucket_db_updater; +} + +const IdealStateMetricSet& +TopLevelDistributorTestUtil::total_ideal_state_metrics() const +{ + assert(_distributor->_ideal_state_total_metrics); + return *_distributor->_ideal_state_total_metrics; +} + +const storage::distributor::DistributorNodeContext& +TopLevelDistributorTestUtil::node_context() const { + return _distributor->distributor_component(); +} + +storage::distributor::DistributorStripeOperationContext& +TopLevelDistributorTestUtil::operation_context() { + return _distributor->distributor_component(); +} + +bool +TopLevelDistributorTestUtil::tick() { + framework::ThreadWaitInfo res( + framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN); + { + framework::TickingLockGuard lock(_distributor->_threadPool.freezeCriticalTicks()); + res.merge(_distributor->doCriticalTick(0)); + } + res.merge(_distributor->doNonCriticalTick(0)); + bool did_work = !res.waitWanted(); + for (auto& s : *_stripe_pool) { + did_work |= s->stripe().tick(); + } + return did_work; +} + +const DistributorConfig& +TopLevelDistributorTestUtil::current_distributor_config() const +{ + return _component->getDistributorConfig(); +} + +void +TopLevelDistributorTestUtil::reconfigure(const DistributorConfig& cfg) +{ + _node->getComponentRegister().setDistributorConfig(cfg); + tick(); // Config is propagated upon next top-level tick +} + +BucketDatabase& +TopLevelDistributorTestUtil::stripe_bucket_database(uint16_t stripe_idx) { + assert(stripe_idx < _distributor->_stripes.size()); + return _distributor->_stripes[stripe_idx]->getBucketSpaceRepo().get(makeBucketSpace()).getBucketDatabase(); +} + +BucketDatabase& +TopLevelDistributorTestUtil::stripe_bucket_database(uint16_t stripe_idx, document::BucketSpace space) { + assert(stripe_idx < _distributor->_stripes.size()); + return _distributor->_stripes[stripe_idx]->getBucketSpaceRepo().get(space).getBucketDatabase(); +} + +const BucketDatabase& +TopLevelDistributorTestUtil::stripe_bucket_database(uint16_t stripe_idx) const { + assert(stripe_idx < _distributor->_stripes.size()); + return _distributor->_stripes[stripe_idx]->getBucketSpaceRepo().get(makeBucketSpace()).getBucketDatabase(); +} + +const BucketDatabase& +TopLevelDistributorTestUtil::stripe_bucket_database(uint16_t stripe_idx, document::BucketSpace space) const { + assert(stripe_idx < _distributor->_stripes.size()); + return _distributor->_stripes[stripe_idx]->getBucketSpaceRepo().get(space).getBucketDatabase(); +} + +// Hide how the sausages are made when directly accessing internal stripes +std::vector<DistributorStripe*> +TopLevelDistributorTestUtil::distributor_stripes() const { + std::vector<DistributorStripe*> stripes; + stripes.reserve(_distributor->_stripes.size()); + for (auto& s : _distributor->_stripes) { + stripes.emplace_back(s.get()); + } + return stripes; +} + +bool +TopLevelDistributorTestUtil::all_distributor_stripes_are_in_recovery_mode() const { + for (auto* s : distributor_stripes()) { + if (!s->isInRecoveryMode()) { + return false; + } + } + return true; +} + +void +TopLevelDistributorTestUtil::enable_distributor_cluster_state(vespalib::stringref state) +{ + bucket_db_updater().simulate_cluster_state_bundle_activation( + lib::ClusterStateBundle(lib::ClusterState(state))); +} + +void +TopLevelDistributorTestUtil::enable_distributor_cluster_state(const lib::ClusterStateBundle& state) +{ + bucket_db_updater().simulate_cluster_state_bundle_activation(state); +} + +} diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.h b/storage/src/tests/distributor/top_level_distributor_test_util.h new file mode 100644 index 00000000000..0cc736464a1 --- /dev/null +++ b/storage/src/tests/distributor/top_level_distributor_test_util.h @@ -0,0 +1,131 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "distributor_message_sender_stub.h" +#include <tests/common/dummystoragelink.h> +#include <tests/common/testhelper.h> +#include <tests/common/teststorageapp.h> +#include <vespa/storage/common/hostreporter/hostinfo.h> +#include <vespa/storage/frameworkimpl/component/distributorcomponentregisterimpl.h> +#include <vespa/storage/storageutil/utils.h> +#include <vespa/storageapi/message/state.h> +#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h> + +namespace storage { + +namespace framework { struct TickingThreadPool; } + +namespace distributor { + +class Distributor; +class DistributorNodeContext; +class DistributorStripe; +class DistributorStripeComponent; +class DistributorStripeOperationContext; +class DistributorStripePool; +class IdealStateMetricSet; +class Operation; +class BucketDBUpdater; + +class TopLevelDistributorTestUtil : private DoneInitializeHandler +{ +public: + TopLevelDistributorTestUtil(); + ~TopLevelDistributorTestUtil(); + + void create_links(); + + void close(); + + size_t stripe_of_bucket(const document::BucketId& id) const noexcept; + size_t stripe_of_bucket(const document::Bucket& bucket) const noexcept; + + /** + * Parses the given string to a set of node => bucket info data, + * and inserts them as nodes in the given bucket. + * Format: + * "node1=checksum/docs/size,node2=checksum/docs/size" + */ + void add_nodes_to_stripe_bucket_db(const document::Bucket& bucket, + const std::string& nodeStr); + // As the above, but always inserts into default bucket space + void add_nodes_to_stripe_bucket_db(const document::BucketId& id, const std::string& nodeStr); + + BucketDBUpdater& bucket_db_updater(); + const IdealStateMetricSet& total_ideal_state_metrics() const; + const storage::distributor::DistributorNodeContext& node_context() const; + storage::distributor::DistributorStripeOperationContext& operation_context(); + + std::vector<DistributorStripe*> distributor_stripes() const; + + bool tick(); + + const DistributorConfig& current_distributor_config() const; + void reconfigure(const DistributorConfig&); + + BucketDatabase& stripe_bucket_database(uint16_t stripe_idx); // Implicit default space only + BucketDatabase& stripe_bucket_database(uint16_t stripe_idx, document::BucketSpace space); + const BucketDatabase& stripe_bucket_database(uint16_t stripe_idx) const; // Implicit default space only + const BucketDatabase& stripe_bucket_database(uint16_t stripe_idx, document::BucketSpace space) const; + [[nodiscard]] bool all_distributor_stripes_are_in_recovery_mode() const; + + void setup_distributor(int redundancy, + int node_count, + const std::string& systemState, + uint32_t early_return = false, + bool require_primary_to_be_written = true); + + void setup_distributor(int redundancy, + int node_count, + const lib::ClusterStateBundle& state, + uint32_t early_return = false, + bool require_primary_to_be_written = true); + + void notifyDoneInitializing() override {} + + BucketDatabase::Entry get_bucket(const document::Bucket& bucket) const; + // Gets bucket entry from default space only + BucketDatabase::Entry get_bucket(const document::BucketId& bId) const; + + DistributorMessageSenderStub& sender() noexcept { return _sender; } + const DistributorMessageSenderStub& sender() const noexcept { return _sender; } + + // Invokes full cluster state transition pipeline rather than directly applying + // the state and just pretending everything has been completed. + void receive_set_system_state_command(const vespalib::string& state_str); + bool handle_top_level_message(const std::shared_ptr<api::StorageMessage>& msg); + +protected: + vdstestlib::DirConfig _config; + std::unique_ptr<TestDistributorApp> _node; + std::unique_ptr<framework::TickingThreadPool> _thread_pool; + std::unique_ptr<DistributorStripePool> _stripe_pool; + std::unique_ptr<Distributor> _distributor; + std::unique_ptr<storage::DistributorComponent> _component; + DistributorMessageSenderStub _sender; + DistributorMessageSenderStub _sender_down; + HostInfo _host_info; + + struct MessageSenderImpl : public ChainedMessageSender { + DistributorMessageSenderStub& _sender; + DistributorMessageSenderStub& _senderDown; + MessageSenderImpl(DistributorMessageSenderStub& up, DistributorMessageSenderStub& down) + : _sender(up), _senderDown(down) {} + + void sendUp(const std::shared_ptr<api::StorageMessage>& msg) override { + _sender.send(msg); + } + void sendDown(const std::shared_ptr<api::StorageMessage>& msg) override { + _senderDown.send(msg); + } + }; + MessageSenderImpl _message_sender; + uint32_t _num_distributor_stripes; + + void enable_distributor_cluster_state(vespalib::stringref state); + void enable_distributor_cluster_state(const lib::ClusterStateBundle& state); +}; + +} + +} diff --git a/storage/src/vespa/storage/bucketdb/bucketinfo.h b/storage/src/vespa/storage/bucketdb/bucketinfo.h index 533192eda89..690fd3e36a9 100644 --- a/storage/src/vespa/storage/bucketdb/bucketinfo.h +++ b/storage/src/vespa/storage/bucketdb/bucketinfo.h @@ -10,6 +10,7 @@ namespace storage { namespace distributor { class DistributorStripeTestUtil; class DistributorTestUtil; + class TopLevelDistributorTestUtil; } enum class TrustedUpdate { @@ -205,6 +206,7 @@ public: private: friend class distributor::DistributorStripeTestUtil; friend class distributor::DistributorTestUtil; + friend class distributor::TopLevelDistributorTestUtil; /** * Returns the bucket copy struct for the given node, null if nonexisting diff --git a/storage/src/vespa/storage/common/distributorcomponent.h b/storage/src/vespa/storage/common/distributorcomponent.h index 403ffa3376c..cbea739866f 100644 --- a/storage/src/vespa/storage/common/distributorcomponent.h +++ b/storage/src/vespa/storage/common/distributorcomponent.h @@ -41,17 +41,17 @@ namespace lib { class IdealNodeCalculator; } -typedef vespa::config::content::core::internal::InternalStorDistributormanagerType DistributorConfig; -typedef vespa::config::content::core::internal::InternalStorVisitordispatcherType VisitorConfig; +using DistributorConfig = vespa::config::content::core::internal::InternalStorDistributormanagerType; +using VisitorConfig = vespa::config::content::core::internal::InternalStorVisitordispatcherType; struct UniqueTimeCalculator { - virtual ~UniqueTimeCalculator() {} + virtual ~UniqueTimeCalculator() = default; [[nodiscard]] virtual api::Timestamp generate_unique_timestamp() = 0; }; struct DistributorManagedComponent { - virtual ~DistributorManagedComponent() {} + virtual ~DistributorManagedComponent() = default; virtual void setTimeCalculator(UniqueTimeCalculator&) = 0; virtual void setDistributorConfig(const DistributorConfig&)= 0; diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h index 787f0362da6..0fded3e5a65 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h @@ -74,6 +74,7 @@ private: friend class DistributorStripeTestUtil; friend class DistributorTestUtil; + friend class TopLevelDistributorTestUtil; // Only to be used by tests that want to ensure both the BucketDBUpdater _and_ the Distributor // components agree on the currently active cluster state bundle. // Transitively invokes Distributor::enableClusterStateBundle diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 6f9cbf3b0f2..a5ef9915704 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -61,16 +61,18 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _comp_reg(compReg), _use_legacy_mode(num_distributor_stripes == 0), _metrics(std::make_shared<DistributorMetricSet>()), - _total_metrics(_use_legacy_mode ? std::shared_ptr<DistributorTotalMetrics>() : - std::make_shared<DistributorTotalMetrics>(num_distributor_stripes)), - _ideal_state_metrics(_use_legacy_mode ? std::make_shared<IdealStateMetricSet>() : std::shared_ptr<IdealStateMetricSet>()), - _ideal_state_total_metrics(_use_legacy_mode ? std::shared_ptr<IdealStateTotalMetrics>() : - std::make_shared<IdealStateTotalMetrics>(num_distributor_stripes)), + _total_metrics(_use_legacy_mode ? std::shared_ptr<DistributorTotalMetrics>() + : std::make_shared<DistributorTotalMetrics>(num_distributor_stripes)), + _ideal_state_metrics(_use_legacy_mode ? std::make_shared<IdealStateMetricSet>() + : std::shared_ptr<IdealStateMetricSet>()), + _ideal_state_total_metrics(_use_legacy_mode ? std::shared_ptr<IdealStateTotalMetrics>() + : std::make_shared<IdealStateTotalMetrics>(num_distributor_stripes)), _messageSender(messageSender), _n_stripe_bits(0), _stripe(std::make_unique<DistributorStripe>(compReg, _use_legacy_mode ? *_metrics : _total_metrics->stripe(0), - _use_legacy_mode ? *_ideal_state_metrics : _ideal_state_total_metrics->stripe(0), + (_use_legacy_mode ? *_ideal_state_metrics + : _ideal_state_total_metrics->stripe(0)), node_identity, threadPool, doneInitHandler, *this, *this, _use_legacy_mode)), _stripe_pool(stripe_pool), @@ -107,8 +109,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg, if (!_use_legacy_mode) { assert(num_distributor_stripes == adjusted_num_stripes(num_distributor_stripes)); _n_stripe_bits = calc_num_stripe_bits(num_distributor_stripes); - LOG(info, "Setting up distributor with %u stripes using %u stripe bits", - num_distributor_stripes, _n_stripe_bits); // TODO STRIPE remove once legacy gone + LOG(debug, "Setting up distributor with %u stripes using %u stripe bits", + num_distributor_stripes, _n_stripe_bits); _stripe_accessor = std::make_unique<MultiThreadedStripeAccessor>(_stripe_pool); _bucket_db_updater = std::make_unique<BucketDBUpdater>(_component, _component, *this, *this, @@ -270,13 +272,7 @@ Distributor::onOpen() if (_component.getDistributorConfig().startDistributorThread) { _threadPool.addThread(*this); _threadPool.start(_component.getThreadPool()); - if (!_use_legacy_mode) { - std::vector<TickableStripe*> pool_stripes; - for (auto& stripe : _stripes) { - pool_stripes.push_back(stripe.get()); - } - _stripe_pool.start(pool_stripes); - } + start_stripe_pool(); } else { LOG(warning, "Not starting distributor thread as it's configured to " "run. Unless you are just running a test tool, this is a " @@ -306,6 +302,18 @@ void Distributor::onClose() { } void +Distributor::start_stripe_pool() +{ + if (!_use_legacy_mode) { + std::vector<TickableStripe*> pool_stripes; + for (auto& stripe : _stripes) { + pool_stripes.push_back(stripe.get()); + } + _stripe_pool.start(pool_stripes); // If unit testing, this won't actually start any OS threads + } +} + +void Distributor::sendUp(const std::shared_ptr<api::StorageMessage>& msg) { if (_messageSender) { @@ -412,7 +420,7 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg) vespalib::make_string("Distributor::onDown(): Dispatch message to stripe %u", stripe_idx)); bool handled = _stripes[stripe_idx]->handle_or_enqueue_message(msg); if (handled) { - _stripe_pool.stripe_thread(stripe_idx).notify_event_has_triggered(); + _stripe_pool.notify_stripe_event_has_triggered(stripe_idx); } return handled; } @@ -750,6 +758,12 @@ Distributor::signal_work_was_done() _tickResult = framework::ThreadWaitInfo::MORE_WORK_ENQUEUED; } +bool +Distributor::work_was_done() const noexcept +{ + return !_tickResult.waitWanted(); +} + vespalib::string Distributor::getReportContentType(const framework::HttpUrlPath& path) const { diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index 8a522a34be1..ad19af7a656 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -80,6 +80,8 @@ public: void sendUp(const std::shared_ptr<api::StorageMessage>&) override; void sendDown(const std::shared_ptr<api::StorageMessage>&) override; + void start_stripe_pool(); + DistributorMetricSet& getMetrics(); // Implements DistributorInterface and DistributorMessageSender. @@ -127,10 +129,12 @@ public: private: friend class DistributorStripeTestUtil; friend class DistributorTestUtil; + friend class TopLevelDistributorTestUtil; friend class LegacyBucketDBUpdaterTest; friend class MetricUpdateHook; friend struct DistributorStripeTest; friend struct LegacyDistributorTest; + friend struct TopLevelDistributorTest; void setNodeStateUp(); bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg); @@ -180,6 +184,7 @@ private: void fetch_status_requests(); void handle_status_requests(); void signal_work_was_done(); + [[nodiscard]] bool work_was_done() const noexcept; void enableNextDistribution(); void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>); @@ -205,7 +210,7 @@ private: std::shared_ptr<DistributorMetricSet> _metrics; std::shared_ptr<DistributorTotalMetrics> _total_metrics; std::shared_ptr<IdealStateMetricSet> _ideal_state_metrics; - std::shared_ptr<IdealStateTotalMetrics> _ideal_state_total_metrics; + std::shared_ptr<IdealStateTotalMetrics> _ideal_state_total_metrics; ChainedMessageSender* _messageSender; // TODO STRIPE multiple stripes...! This is for proof of concept of wiring. uint8_t _n_stripe_bits; diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h index 9deb0754a1f..38667859cfd 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe.h @@ -203,6 +203,7 @@ private: friend class MultiThreadedStripeAccessGuard; friend struct DistributorStripeTest; friend struct LegacyDistributorTest; + friend struct TopLevelDistributorTest; bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg); bool isMaintenanceReply(const api::StorageReply& reply) const; diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp index 13162de4208..e0d3e3d2508 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp @@ -7,7 +7,7 @@ namespace storage::distributor { -DistributorStripePool::DistributorStripePool() +DistributorStripePool::DistributorStripePool(bool test_mode, PrivateCtorTag) : _thread_pool(512_Ki), _n_stripe_bits(0), _stripes(), @@ -17,17 +17,30 @@ DistributorStripePool::DistributorStripePool() _parked_threads(0), _bootstrap_tick_wait_duration(1ms), _bootstrap_ticks_before_wait(10), + _single_threaded_test_mode(test_mode), _stopped(false) {} +DistributorStripePool::DistributorStripePool() + : DistributorStripePool(false, PrivateCtorTag()) +{} + DistributorStripePool::~DistributorStripePool() { if (!_stopped) { stop_and_join(); } } +std::unique_ptr<DistributorStripePool> +DistributorStripePool::make_non_threaded_pool_for_testing() { + return std::make_unique<DistributorStripePool>(true, PrivateCtorTag()); +} + void DistributorStripePool::park_all_threads() noexcept { assert(!_stripes.empty()); + if (_single_threaded_test_mode) { + return; + } // Thread pool is not dynamic and signal_wants_park() is thread safe. for (auto& s : _stripes) { s->signal_wants_park(); @@ -37,6 +50,9 @@ void DistributorStripePool::park_all_threads() noexcept { } void DistributorStripePool::unpark_all_threads() noexcept { + if (_single_threaded_test_mode) { + return; + } // Thread pool is not dynamic and unpark_thread() is thread safe. for (auto& s : _stripes) { s->unpark_thread(); @@ -58,7 +74,17 @@ TickableStripe& DistributorStripePool::stripe_of_key(uint64_t key) noexcept { return stripe_thread(stripe_of_bucket_key(key, _n_stripe_bits)).stripe(); } +void DistributorStripePool::notify_stripe_event_has_triggered(size_t stripe_idx) noexcept { + if (_single_threaded_test_mode) { + return; + } + stripe_thread(stripe_idx).notify_event_has_triggered(); +} + void DistributorStripePool::park_thread_until_released(DistributorStripeThread& thread) noexcept { + if (_single_threaded_test_mode) { + return; + } std::unique_lock lock(_mutex); assert(_parked_threads < _threads.size()); ++_parked_threads; @@ -88,19 +114,25 @@ void DistributorStripePool::start(const std::vector<TickableStripe*>& stripes) { new_stripe->set_ticks_before_wait(_bootstrap_ticks_before_wait); _stripes.emplace_back(std::move(new_stripe)); } + if (_single_threaded_test_mode) { + return; // We want all the control structures in place, but none of the actual OS threads. + } for (auto& s : _stripes) { _threads.emplace_back(_thread_pool.NewThread(s.get())); } } void DistributorStripePool::stop_and_join() { + _stopped = true; + if (_single_threaded_test_mode) { + return; + } for (auto& s : _stripes) { s->signal_should_stop(); } for (auto* t : _threads) { t->Join(); } - _stopped = true; } void DistributorStripePool::set_tick_wait_duration(vespalib::duration new_tick_wait_duration) noexcept { diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h index 75328479296..1b0fa0dd5d3 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h @@ -46,15 +46,20 @@ class DistributorStripePool { size_t _parked_threads; // Must be protected by _park_mutex vespalib::duration _bootstrap_tick_wait_duration; uint32_t _bootstrap_ticks_before_wait; + bool _single_threaded_test_mode; bool _stopped; friend class DistributorStripeThread; + struct PrivateCtorTag{}; public: using const_iterator = StripeVector::const_iterator; + DistributorStripePool(bool test_mode, PrivateCtorTag); DistributorStripePool(); ~DistributorStripePool(); + static std::unique_ptr<DistributorStripePool> make_non_threaded_pool_for_testing(); + // Set up the stripe pool with a 1-1 relationship between the provided // stripes and running threads. Can only be called once per pool. // @@ -78,6 +83,7 @@ public: [[nodiscard]] DistributorStripeThread& stripe_thread(size_t idx) noexcept { return *_stripes[idx]; } + void notify_stripe_event_has_triggered(size_t stripe_idx) noexcept; [[nodiscard]] const TickableStripe& stripe_of_key(uint64_t key) const noexcept; [[nodiscard]] TickableStripe& stripe_of_key(uint64_t key) noexcept; [[nodiscard]] size_t stripe_count() const noexcept { return _stripes.size(); } diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h index cf2be06bda9..1456308c3d0 100644 --- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h +++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h @@ -144,6 +144,7 @@ private: friend class DistributorStripeTestUtil; friend class DistributorTestUtil; + friend class TopLevelDistributorTestUtil; // TODO STRIPE remove asap // TODO refactor and rewire to avoid needing this direct meddling friend class DistributorStripe; |