diff options
Diffstat (limited to 'storage/src')
9 files changed, 821 insertions, 1 deletions
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt index fad8ca0bb25..0d314e050d4 100644 --- a/storage/src/tests/distributor/CMakeLists.txt +++ b/storage/src/tests/distributor/CMakeLists.txt @@ -13,6 +13,8 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST distributor_host_info_reporter_test.cpp distributor_message_sender_stub.cpp distributor_stripe_pool_test.cpp + distributor_stripe_test.cpp + distributor_stripe_test_util.cpp distributortestutil.cpp externaloperationhandlertest.cpp garbagecollectiontest.cpp diff --git a/storage/src/tests/distributor/distributor_stripe_test.cpp b/storage/src/tests/distributor/distributor_stripe_test.cpp new file mode 100644 index 00000000000..606117cc693 --- /dev/null +++ b/storage/src/tests/distributor/distributor_stripe_test.cpp @@ -0,0 +1,85 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <tests/distributor/distributor_stripe_test_util.h> +#include <vespa/document/test/make_bucket_space.h> +#include <vespa/document/test/make_document_bucket.h> +#include <vespa/storage/distributor/bucket_spaces_stats_provider.h> +#include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/distributor_stripe.h> +#include <vespa/storageapi/message/persistence.h> +#include <vespa/storageapi/message/visitor.h> +#include <vespa/vespalib/gtest/gtest.h> +#include <vespa/vespalib/text/stringtokenizer.h> + +using document::Bucket; +using document::BucketId; +using document::BucketSpace; +using document::FixedBucketSpaces; +using document::test::makeBucketSpace; +using document::test::makeDocumentBucket; +using namespace ::testing; + +namespace storage::distributor { + +/** + * This was copied from LegacyDistributorTest and adjusted to work with one distributor stripe. + */ +struct DistributorStripeTest : Test, DistributorStripeTestUtil { + DistributorStripeTest(); + ~DistributorStripeTest() override; + + std::vector<document::BucketSpace> _bucketSpaces; + + void SetUp() override { + createLinks(); + _bucketSpaces = getBucketSpaces(); + }; + + void TearDown() override { + close(); + } + + // Simple type aliases to make interfacing with certain utility functions + // easier. Note that this is only for readability and does not provide any + // added type safety. + using NodeCount = int; + using Redundancy = int; + + std::string testOp(std::shared_ptr<api::StorageMessage> msg) { + _stripe->handleMessage(msg); + + std::string tmp = _sender.getCommands(); + _sender.clear(); + return tmp; + } + +}; + +DistributorStripeTest::DistributorStripeTest() + : Test(), + DistributorStripeTestUtil(), + _bucketSpaces() +{ +} + +DistributorStripeTest::~DistributorStripeTest() = default; + +TEST_F(DistributorStripeTest, operation_generation) { + setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); + + document::BucketId bid; + addNodesToBucketDB(document::BucketId(16, 1), "0=1/1/1/t"); + + EXPECT_EQ("Remove", testOp(std::make_shared<api::RemoveCommand>( + makeDocumentBucket(bid), + document::DocumentId("id:m:test:n=1:foo"), + api::Timestamp(1234)))); + + auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "foo", "bar", ""); + cmd->addBucketToBeVisited(document::BucketId(16, 1)); + cmd->addBucketToBeVisited(document::BucketId()); + + EXPECT_EQ("Visitor Create", testOp(cmd)); +} + +} diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.cpp b/storage/src/tests/distributor/distributor_stripe_test_util.cpp new file mode 100644 index 00000000000..fcdbcbbec19 --- /dev/null +++ b/storage/src/tests/distributor/distributor_stripe_test_util.cpp @@ -0,0 +1,495 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "distributor_stripe_test_util.h" +#include <vespa/config-stor-distribution.h> +#include <vespa/document/test/make_bucket_space.h> +#include <vespa/document/test/make_document_bucket.h> +#include <vespa/storage/distributor/distributor_bucket_space.h> +#include <vespa/storage/distributor/distributor_stripe.h> +#include <vespa/storage/distributor/distributor_stripe_component.h> +#include <vespa/storage/distributor/distributormetricsset.h> +#include <vespa/storage/distributor/ideal_state_total_metrics.h> +#include <vespa/vdslib/distribution/distribution.h> +#include <vespa/vespalib/text/stringtokenizer.h> + +using document::test::makeBucketSpace; +using document::test::makeDocumentBucket; + +namespace storage::distributor { + +DistributorStripeTestUtil::DistributorStripeTestUtil() + : _config(), + _node(), + _threadPool(), + _stripe(), + _sender(), + _senderDown(), + _hostInfo(), + _messageSender(_sender, _senderDown) +{ + _config = getStandardConfig(false); +} + +DistributorStripeTestUtil::~DistributorStripeTestUtil() { } + +void +DistributorStripeTestUtil::createLinks() +{ + _node.reset(new TestDistributorApp(_config.getConfigId())); + _threadPool = framework::TickingThreadPool::createDefault("distributor"); + _metrics = std::make_shared<DistributorMetricSet>(); + _ideal_state_metrics = std::make_shared<IdealStateMetricSet>(); + _stripe = std::make_unique<DistributorStripe>(_node->getComponentRegister(), + *_metrics, + *_ideal_state_metrics, + _node->node_identity(), + *_threadPool, + *this, + _messageSender, + *this, + false); +} + +void +DistributorStripeTestUtil::setupDistributor(int redundancy, + int nodeCount, + const std::string& systemState, + uint32_t earlyReturn, + bool requirePrimaryToBeWritten) +{ + setup_distributor(redundancy, nodeCount, lib::ClusterStateBundle(lib::ClusterState(systemState)), earlyReturn, requirePrimaryToBeWritten); +} + +void +DistributorStripeTestUtil::setup_distributor(int redundancy, + int node_count, + const lib::ClusterStateBundle& state, + uint32_t early_return, + bool require_primary_to_be_written) +{ + lib::Distribution::DistributionConfigBuilder config( + lib::Distribution::getDefaultDistributionConfig(redundancy, node_count).get()); + config.redundancy = redundancy; + config.initialRedundancy = early_return; + config.ensurePrimaryPersisted = require_primary_to_be_written; + auto distribution = std::make_shared<lib::Distribution>(config); + _node->getComponentRegister().setDistribution(distribution); + enable_distributor_cluster_state(state); + + // TODO STRIPE: Update this comment now that stripe is used instead. + // This is for all intents and purposes a hack to avoid having the + // distributor treat setting the distribution explicitly as a signal that + // it should send RequestBucketInfo to all configured nodes. + // If we called storage_distribution_changed followed by enableDistribution + // explicitly (which is what happens in "real life"), that is what would + // take place. + // The inverse case of this can be explicitly accomplished by calling + // triggerDistributionChange(). + // This isn't pretty, folks, but it avoids breaking the world for now, + // as many tests have implicit assumptions about this being the behavior. + auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution)); + _stripe->update_distribution_config(new_configs); +} + +void +DistributorStripeTestUtil::receive_set_system_state_command(const vespalib::string& state_str) +{ + auto state_cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(state_str)); + _stripe->handleMessage(state_cmd); // TODO move semantics +} + +void +DistributorStripeTestUtil::handle_top_level_message(const std::shared_ptr<api::StorageMessage>& msg) +{ + _stripe->handleMessage(msg); +} + +void +DistributorStripeTestUtil::setTypeRepo(const std::shared_ptr<const document::DocumentTypeRepo>& repo) +{ + _node->getComponentRegister().setDocumentTypeRepo(repo); +} + +void +DistributorStripeTestUtil::close() +{ + _stripe->flush_and_close(); + _sender.clear(); + _node.reset(0); + _config = getStandardConfig(false); +} + +namespace { + +std::string dumpVector(const std::vector<uint16_t>& vec) { + std::ostringstream ost; + for (uint32_t i = 0; i < vec.size(); ++i) { + if (i != 0) { + ost << ","; + } + ost << vec[i]; + } + return ost.str(); +} + +} + +std::string +DistributorStripeTestUtil::getNodes(document::BucketId id) +{ + BucketDatabase::Entry entry = getBucket(id); + + if (!entry.valid()) { + return id.toString(); + } else { + std::vector<uint16_t> nodes = entry->getNodes(); + std::sort(nodes.begin(), nodes.end()); + + std::ostringstream ost; + ost << id << ": " << dumpVector(nodes); + return ost.str(); + } +} + +std::string +DistributorStripeTestUtil::getIdealStr(document::BucketId id, const lib::ClusterState& state) +{ + if (!getDistributorBucketSpace().owns_bucket_in_state(state, id)) { + return id.toString(); + } + + std::vector<uint16_t> nodes; + getDistribution().getIdealNodes( + lib::NodeType::STORAGE, state, id, nodes); + std::sort(nodes.begin(), nodes.end()); + std::ostringstream ost; + ost << id << ": " << dumpVector(nodes); + return ost.str(); +} + +void +DistributorStripeTestUtil::addIdealNodes(const lib::ClusterState& state, + const document::BucketId& id) +{ + BucketDatabase::Entry entry = getBucket(id); + + if (!entry.valid()) { + entry = BucketDatabase::Entry(id); + } + + std::vector<uint16_t> res; + getDistribution().getIdealNodes( + lib::NodeType::STORAGE, state, id, res); + + for (uint32_t i = 0; i < res.size(); ++i) { + if (state.getNodeState(lib::Node(lib::NodeType::STORAGE, res[i])).getState() != + lib::State::MAINTENANCE) + { + entry->addNode(BucketCopy(0, res[i], api::BucketInfo(1,1,1)), + toVector<uint16_t>(0)); + } + } + + getBucketDatabase().update(entry); +} + +void +DistributorStripeTestUtil::addNodesToBucketDB(const document::Bucket& bucket, const std::string& nodeStr) +{ + BucketDatabase::Entry entry = getBucket(bucket); + + if (!entry.valid()) { + entry = BucketDatabase::Entry(bucket.getBucketId()); + } + + entry->clear(); + + vespalib::StringTokenizer tokenizer(nodeStr, ","); + for (uint32_t i = 0; i < tokenizer.size(); ++i) { + vespalib::StringTokenizer tok2(tokenizer[i], "="); + vespalib::StringTokenizer tok3(tok2[1], "/"); + + api::BucketInfo info(atoi(tok3[0].data()), + atoi(tok3.size() > 1 ? tok3[1].data() : tok3[0].data()), + atoi(tok3.size() > 2 ? tok3[2].data() : tok3[0].data())); + + size_t flagsIdx = 3; + + // Meta info override? For simplicity, require both meta count and size + if (tok3.size() > 4 && (!tok3[3].empty() && isdigit(tok3[3][0]))) { + info.setMetaCount(atoi(tok3[3].data())); + info.setUsedFileSize(atoi(tok3[4].data())); + flagsIdx = 5; + } + + if ((tok3.size() > flagsIdx + 1) && tok3[flagsIdx + 1] == "a") { + info.setActive(); + } else { + info.setActive(false); + } + if ((tok3.size() > flagsIdx + 2) && tok3[flagsIdx + 2] == "r") { + info.setReady(); + } else { + info.setReady(false); + } + + uint16_t idx = atoi(tok2[0].data()); + BucketCopy node( + 0, + idx, + info); + + // Allow user to manually override trusted and active. + if (tok3.size() > flagsIdx && tok3[flagsIdx] == "t") { + node.setTrusted(); + } + + entry->addNodeManual(node); + } + + getBucketDatabase(bucket.getBucketSpace()).update(entry); +} + +void +DistributorStripeTestUtil::addNodesToBucketDB(const document::BucketId& id, + const std::string& nodeStr) +{ + addNodesToBucketDB(document::Bucket(makeBucketSpace(), id), nodeStr); +} + +void +DistributorStripeTestUtil::removeFromBucketDB(const document::BucketId& id) +{ + getBucketDatabase().remove(id); +} + +void +DistributorStripeTestUtil::addIdealNodes(const document::BucketId& id) +{ + // TODO STRIPE roundabout way of getting state bundle..! + addIdealNodes(*operation_context().cluster_state_bundle().getBaselineClusterState(), id); +} + +void +DistributorStripeTestUtil::insertBucketInfo(document::BucketId id, + uint16_t node, + uint32_t checksum, + uint32_t count, + uint32_t size, + bool trusted, + bool active) +{ + api::BucketInfo info(checksum, count, size); + insertBucketInfo(id, node, info, trusted, active); +} + +void +DistributorStripeTestUtil::insertBucketInfo(document::BucketId id, + uint16_t node, + const api::BucketInfo& info, + bool trusted, + bool active) +{ + BucketDatabase::Entry entry = getBucketDatabase().get(id); + if (!entry.valid()) { + entry = BucketDatabase::Entry(id, BucketInfo()); + } + + api::BucketInfo info2(info); + if (active) { + info2.setActive(); + } + BucketCopy copy(operation_context().generate_unique_timestamp(), node, info2); + + entry->addNode(copy.setTrusted(trusted), toVector<uint16_t>(0)); + + getBucketDatabase().update(entry); +} + +std::string +DistributorStripeTestUtil::dumpBucket(const document::BucketId& bid) +{ + return getBucketDatabase().get(bid).toString(); +} + +void +DistributorStripeTestUtil::sendReply(Operation& op, + int idx, + api::ReturnCode::Result result) +{ + if (idx == -1) { + idx = _sender.commands().size() - 1; + } + assert(idx >= 0 && idx < static_cast<int>(_sender.commands().size())); + + std::shared_ptr<api::StorageCommand> cmd = _sender.command(idx); + api::StorageReply::SP reply(cmd->makeReply().release()); + reply->setResult(result); + op.receive(_sender, reply); +} + +BucketDatabase::Entry +DistributorStripeTestUtil::getBucket(const document::Bucket& bucket) const +{ + return getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId()); +} + +BucketDatabase::Entry +DistributorStripeTestUtil::getBucket(const document::BucketId& bId) const +{ + return getBucketDatabase().get(bId); +} + +void +DistributorStripeTestUtil::disableBucketActivationInConfig(bool disable) +{ + vespa::config::content::core::StorDistributormanagerConfigBuilder config; + config.disableBucketActivation = disable; + getConfig().configure(config); +} + +StripeBucketDBUpdater& +DistributorStripeTestUtil::getBucketDBUpdater() { + return _stripe->bucket_db_updater(); +} + +IdealStateManager& +DistributorStripeTestUtil::getIdealStateManager() { + return _stripe->ideal_state_manager(); +} + +ExternalOperationHandler& +DistributorStripeTestUtil::getExternalOperationHandler() { + return _stripe->external_operation_handler(); +} + +const storage::distributor::DistributorNodeContext& +DistributorStripeTestUtil::node_context() const { + return _stripe->_component; +} + +storage::distributor::DistributorStripeOperationContext& +DistributorStripeTestUtil::operation_context() { + return _stripe->_component; +} + +const DocumentSelectionParser& +DistributorStripeTestUtil::doc_selection_parser() const { + return _stripe->_component; +} + +DistributorConfiguration& +DistributorStripeTestUtil::getConfig() { + // TODO STRIPE avoid const cast + return const_cast<DistributorConfiguration&>(_stripe->getConfig()); +} + +DistributorBucketSpace& +DistributorStripeTestUtil::getDistributorBucketSpace() { + return getBucketSpaceRepo().get(makeBucketSpace()); +} + +BucketDatabase& +DistributorStripeTestUtil::getBucketDatabase() { + return getDistributorBucketSpace().getBucketDatabase(); +} + +BucketDatabase& +DistributorStripeTestUtil::getBucketDatabase(document::BucketSpace space) { + return getBucketSpaceRepo().get(space).getBucketDatabase(); +} + +const BucketDatabase& +DistributorStripeTestUtil::getBucketDatabase() const { + return getBucketSpaceRepo().get(makeBucketSpace()).getBucketDatabase(); +} + +const BucketDatabase& +DistributorStripeTestUtil::getBucketDatabase(document::BucketSpace space) const { + return getBucketSpaceRepo().get(space).getBucketDatabase(); +} + +DistributorBucketSpaceRepo& +DistributorStripeTestUtil::getBucketSpaceRepo() { + return _stripe->getBucketSpaceRepo(); +} + +const DistributorBucketSpaceRepo& +DistributorStripeTestUtil::getBucketSpaceRepo() const { + return _stripe->getBucketSpaceRepo(); +} + +DistributorBucketSpaceRepo& +DistributorStripeTestUtil::getReadOnlyBucketSpaceRepo() { + return _stripe->getReadOnlyBucketSpaceRepo(); +} + +const DistributorBucketSpaceRepo& +DistributorStripeTestUtil::getReadOnlyBucketSpaceRepo() const { + return _stripe->getReadOnlyBucketSpaceRepo(); +} + +bool +DistributorStripeTestUtil::distributor_is_in_recovery_mode() const noexcept { + return _stripe->isInRecoveryMode(); +} + +const lib::ClusterStateBundle& +DistributorStripeTestUtil::current_distributor_cluster_state_bundle() const noexcept { + return _stripe->getClusterStateBundle(); +} + +std::string +DistributorStripeTestUtil::active_ideal_state_operations() const { + return _stripe->getActiveIdealStateOperations(); +} + +const PendingMessageTracker& +DistributorStripeTestUtil::pending_message_tracker() const noexcept { + return _stripe->getPendingMessageTracker(); +} + +PendingMessageTracker& +DistributorStripeTestUtil::pending_message_tracker() noexcept { + return _stripe->getPendingMessageTracker(); +} + +std::chrono::steady_clock::duration +DistributorStripeTestUtil::db_memory_sample_interval() const noexcept { + return _stripe->db_memory_sample_interval(); +} + +const lib::Distribution& +DistributorStripeTestUtil::getDistribution() const { + return getBucketSpaceRepo().get(makeBucketSpace()).getDistribution(); +} + +std::vector<document::BucketSpace> +DistributorStripeTestUtil::getBucketSpaces() const +{ + std::vector<document::BucketSpace> res; + for (const auto &repo : getBucketSpaceRepo()) { + res.push_back(repo.first); + } + return res; +} + +void +DistributorStripeTestUtil::enableDistributorClusterState(vespalib::stringref state) +{ + getBucketDBUpdater().simulate_cluster_state_bundle_activation( + lib::ClusterStateBundle(lib::ClusterState(state))); +} + +void +DistributorStripeTestUtil::enable_distributor_cluster_state(const lib::ClusterStateBundle& state) +{ + getBucketDBUpdater().simulate_cluster_state_bundle_activation(state); +} + +void +DistributorStripeTestUtil::setSystemState(const lib::ClusterState& systemState) { + _stripe->enableClusterStateBundle(lib::ClusterStateBundle(systemState)); +} + +} diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.h b/storage/src/tests/distributor/distributor_stripe_test_util.h new file mode 100644 index 00000000000..32222d85e95 --- /dev/null +++ b/storage/src/tests/distributor/distributor_stripe_test_util.h @@ -0,0 +1,228 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "distributor_message_sender_stub.h" +#include <tests/common/dummystoragelink.h> +#include <tests/common/testhelper.h> +#include <tests/common/teststorageapp.h> +#include <vespa/storage/common/hostreporter/hostinfo.h> +#include <vespa/storage/distributor/stripe_host_info_notifier.h> + +namespace storage { + +namespace framework { struct TickingThreadPool; } + +namespace distributor { + +class DistributorBucketSpace; +class DistributorBucketSpaceRepo; +class DistributorMetricSet; +class DistributorNodeContext; +class DistributorStripe; +class DistributorStripeComponent; +class DistributorStripeOperationContext; +class DistributorStripePool; +class DocumentSelectionParser; +class ExternalOperationHandler; +class IdealStateManager; +class IdealStateMetricSet; +class Operation; +class StripeBucketDBUpdater; + +/** + * Helper class with utilities needed when testing DistributorStripe. + * + * This was copied from DistributorTestUtil (used in LegacyDistributorTest) + * and adjusted to work with one distributor stripe. + */ +class DistributorStripeTestUtil : public DoneInitializeHandler, + public StripeHostInfoNotifier { +public: + DistributorStripeTestUtil(); + ~DistributorStripeTestUtil(); + + /** + * Sets up the storage link chain. + */ + void createLinks(); + void setTypeRepo(const std::shared_ptr<const document::DocumentTypeRepo>& repo); + + void close(); + + /** + * Returns a string with the nodes currently stored in the bucket + * database for the given bucket. + */ + std::string getNodes(document::BucketId id); + + /** + * Returns a string with the ideal state nodes for the given bucket. + */ + std::string getIdealStr(document::BucketId id, const lib::ClusterState& state); + + /** + * Adds the ideal nodes for the given bucket and the given cluster state + * to the bucket database. + */ + void addIdealNodes(const lib::ClusterState& state, const document::BucketId& id); + + /** + * Adds all the ideal nodes for the given bucket to the bucket database. + */ + void addIdealNodes(const document::BucketId& id); + + /** + * Parses the given string to a set of node => bucket info data, + * and inserts them as nodes in the given bucket. + * Format: + * "node1=checksum/docs/size,node2=checksum/docs/size" + */ + void addNodesToBucketDB(const document::Bucket& bucket, const std::string& nodeStr); + // As the above, but always inserts into default bucket space + void addNodesToBucketDB(const document::BucketId& id, const std::string& nodeStr); + + /** + * Removes the given bucket from the bucket database. + */ + void removeFromBucketDB(const document::BucketId& id); + + /** + * Inserts the given bucket information for the given bucket and node in + * the bucket database. + */ + void insertBucketInfo(document::BucketId id, + uint16_t node, + uint32_t checksum, + uint32_t count, + uint32_t size, + bool trusted = false, + bool active = false); + + /** + * Inserts the given bucket information for the given bucket and node in + * the bucket database. + */ + void insertBucketInfo(document::BucketId id, + uint16_t node, + const api::BucketInfo& info, + bool trusted = false, + bool active = false); + + std::string dumpBucket(const document::BucketId& bucket); + + /** + * Replies to message idx sent upwards with the given result code. + * If idx = -1, replies to the last command received upwards. + */ + void sendReply(Operation& op, + int idx = -1, + api::ReturnCode::Result result = api::ReturnCode::OK); + + StripeBucketDBUpdater& getBucketDBUpdater(); + IdealStateManager& getIdealStateManager(); + ExternalOperationHandler& getExternalOperationHandler(); + const storage::distributor::DistributorNodeContext& node_context() const; + storage::distributor::DistributorStripeOperationContext& operation_context(); + const DocumentSelectionParser& doc_selection_parser() const; + + DistributorConfiguration& getConfig(); + + vdstestlib::DirConfig& getDirConfig() { + return _config; + } + + // TODO explicit notion of bucket spaces for tests + DistributorBucketSpace& getDistributorBucketSpace(); + BucketDatabase& getBucketDatabase(); // Implicit default space only + BucketDatabase& getBucketDatabase(document::BucketSpace space); + const BucketDatabase& getBucketDatabase() const; // Implicit default space only + const BucketDatabase& getBucketDatabase(document::BucketSpace space) const; + DistributorBucketSpaceRepo& getBucketSpaceRepo(); + const DistributorBucketSpaceRepo& getBucketSpaceRepo() const; + DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo(); + const DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() const; + [[nodiscard]] bool distributor_is_in_recovery_mode() const noexcept; + [[nodiscard]] const lib::ClusterStateBundle& current_distributor_cluster_state_bundle() const noexcept; + [[nodiscard]] std::string active_ideal_state_operations() const; + [[nodiscard]] const PendingMessageTracker& pending_message_tracker() const noexcept; + [[nodiscard]] PendingMessageTracker& pending_message_tracker() noexcept; + [[nodiscard]] std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept; + + const lib::Distribution& getDistribution() const; + + framework::defaultimplementation::FakeClock& getClock() { return _node->getClock(); } + DistributorComponentRegister& getComponentRegister() { return _node->getComponentRegister(); } + DistributorComponentRegisterImpl& getComponentRegisterImpl() { return _node->getComponentRegister(); } + + void setupDistributor(int redundancy, + int nodeCount, + const std::string& systemState, + uint32_t earlyReturn = false, + bool requirePrimaryToBeWritten = true); + + void setup_distributor(int redundancy, + int node_count, + const lib::ClusterStateBundle& state, + uint32_t early_return = false, + bool require_primary_to_be_written = true); + + // Implements DoneInitializeHandler + void notifyDoneInitializing() override {} + + // Implements StripeHostInfoNotifier + void notify_stripe_wants_to_send_host_info(uint16_t stripe_index) override { + (void) stripe_index; + } + + void disableBucketActivationInConfig(bool disable); + + BucketDatabase::Entry getBucket(const document::Bucket& bucket) const; + // Gets bucket entry from default space only + BucketDatabase::Entry getBucket(const document::BucketId& bId) const; + + std::vector<document::BucketSpace> getBucketSpaces() const; + + DistributorMessageSenderStub& sender() noexcept { return _sender; } + const DistributorMessageSenderStub& sender() const noexcept { return _sender; } + + void setSystemState(const lib::ClusterState& systemState); + + // Invokes full cluster state transition pipeline rather than directly applying + // the state and just pretending everything has been completed. + void receive_set_system_state_command(const vespalib::string& state_str); + + void handle_top_level_message(const std::shared_ptr<api::StorageMessage>& msg); + +protected: + vdstestlib::DirConfig _config; + std::unique_ptr<TestDistributorApp> _node; + std::unique_ptr<framework::TickingThreadPool> _threadPool; + std::shared_ptr<DistributorMetricSet> _metrics; + std::shared_ptr<IdealStateMetricSet> _ideal_state_metrics; + std::unique_ptr<DistributorStripe> _stripe; + DistributorMessageSenderStub _sender; + DistributorMessageSenderStub _senderDown; + HostInfo _hostInfo; + + struct MessageSenderImpl : public ChainedMessageSender { + DistributorMessageSenderStub& _sender; + DistributorMessageSenderStub& _senderDown; + MessageSenderImpl(DistributorMessageSenderStub& up, DistributorMessageSenderStub& down) + : _sender(up), _senderDown(down) {} + + void sendUp(const std::shared_ptr<api::StorageMessage>& msg) override { + _sender.send(msg); + } + void sendDown(const std::shared_ptr<api::StorageMessage>& msg) override { + _senderDown.send(msg); + } + }; + MessageSenderImpl _messageSender; + + void enableDistributorClusterState(vespalib::stringref state); + void enable_distributor_cluster_state(const lib::ClusterStateBundle& state); +}; + +} + +} diff --git a/storage/src/vespa/storage/bucketdb/bucketinfo.h b/storage/src/vespa/storage/bucketdb/bucketinfo.h index 1fd6c824904..533192eda89 100644 --- a/storage/src/vespa/storage/bucketdb/bucketinfo.h +++ b/storage/src/vespa/storage/bucketdb/bucketinfo.h @@ -7,7 +7,10 @@ namespace storage { -namespace distributor { class DistributorTestUtil; } +namespace distributor { + class DistributorStripeTestUtil; + class DistributorTestUtil; +} enum class TrustedUpdate { UPDATE, @@ -200,6 +203,7 @@ public: void clear() { _nodes.clear(); } private: + friend class distributor::DistributorStripeTestUtil; friend class distributor::DistributorTestUtil; /** diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h index 1ec48aa4d63..787f0362da6 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h @@ -72,6 +72,7 @@ public: private: + friend class DistributorStripeTestUtil; friend class DistributorTestUtil; // Only to be used by tests that want to ensure both the BucketDBUpdater _and_ the Distributor // components agree on the currently active cluster state bundle. diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index 41d88f5dba1..8a522a34be1 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -125,9 +125,11 @@ public: }; private: + friend class DistributorStripeTestUtil; friend class DistributorTestUtil; friend class LegacyBucketDBUpdaterTest; friend class MetricUpdateHook; + friend struct DistributorStripeTest; friend struct LegacyDistributorTest; void setNodeStateUp(); diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h index 8f3de38aec7..9deb0754a1f 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe.h @@ -196,10 +196,12 @@ public: private: // TODO STRIPE: reduce number of friends. DistributorStripe too popular for its own good. friend class Distributor; + friend class DistributorStripeTestUtil; friend class DistributorTestUtil; friend class LegacyBucketDBUpdaterTest; friend class MetricUpdateHook; friend class MultiThreadedStripeAccessGuard; + friend struct DistributorStripeTest; friend struct LegacyDistributorTest; bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg); diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h index ed059a32d14..cf2be06bda9 100644 --- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h +++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h @@ -142,6 +142,7 @@ private: } }; + friend class DistributorStripeTestUtil; friend class DistributorTestUtil; // TODO refactor and rewire to avoid needing this direct meddling friend class DistributorStripe; |