diff options
author | Geir Storli <geirst@verizonmedia.com> | 2021-06-24 14:06:00 +0000 |
---|---|---|
committer | Geir Storli <geirst@verizonmedia.com> | 2021-06-25 11:53:55 +0000 |
commit | 6012a24b3a24662cc57fc5be86015ef326803342 (patch) | |
tree | deb29eb73224075416cab42e7bd52f291accdc44 /storage/src/tests/distributor/distributor_stripe_test_util.cpp | |
parent | 0ff6ec0a6fed7ea0617861fc529cf299f097ff58 (diff) |
Prepare baseline utils and tests for a single distributor stripe.
This is copied from DistributorTestUtil and LegacyDistributorTest,
and adjusted to work with one distributor stripe.
Diffstat (limited to 'storage/src/tests/distributor/distributor_stripe_test_util.cpp')
-rw-r--r-- | storage/src/tests/distributor/distributor_stripe_test_util.cpp | 495 |
1 files changed, 495 insertions, 0 deletions
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.cpp b/storage/src/tests/distributor/distributor_stripe_test_util.cpp new file mode 100644 index 00000000000..fcdbcbbec19 --- /dev/null +++ b/storage/src/tests/distributor/distributor_stripe_test_util.cpp @@ -0,0 +1,495 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "distributor_stripe_test_util.h" +#include <vespa/config-stor-distribution.h> +#include <vespa/document/test/make_bucket_space.h> +#include <vespa/document/test/make_document_bucket.h> +#include <vespa/storage/distributor/distributor_bucket_space.h> +#include <vespa/storage/distributor/distributor_stripe.h> +#include <vespa/storage/distributor/distributor_stripe_component.h> +#include <vespa/storage/distributor/distributormetricsset.h> +#include <vespa/storage/distributor/ideal_state_total_metrics.h> +#include <vespa/vdslib/distribution/distribution.h> +#include <vespa/vespalib/text/stringtokenizer.h> + +using document::test::makeBucketSpace; +using document::test::makeDocumentBucket; + +namespace storage::distributor { + +DistributorStripeTestUtil::DistributorStripeTestUtil() + : _config(), + _node(), + _threadPool(), + _stripe(), + _sender(), + _senderDown(), + _hostInfo(), + _messageSender(_sender, _senderDown) +{ + _config = getStandardConfig(false); +} + +DistributorStripeTestUtil::~DistributorStripeTestUtil() { } + +void +DistributorStripeTestUtil::createLinks() +{ + _node.reset(new TestDistributorApp(_config.getConfigId())); + _threadPool = framework::TickingThreadPool::createDefault("distributor"); + _metrics = std::make_shared<DistributorMetricSet>(); + _ideal_state_metrics = std::make_shared<IdealStateMetricSet>(); + _stripe = std::make_unique<DistributorStripe>(_node->getComponentRegister(), + *_metrics, + *_ideal_state_metrics, + _node->node_identity(), + *_threadPool, + *this, + _messageSender, + *this, + false); +} + +void +DistributorStripeTestUtil::setupDistributor(int redundancy, + int nodeCount, + const std::string& systemState, + uint32_t earlyReturn, + bool requirePrimaryToBeWritten) +{ + setup_distributor(redundancy, nodeCount, lib::ClusterStateBundle(lib::ClusterState(systemState)), earlyReturn, requirePrimaryToBeWritten); +} + +void +DistributorStripeTestUtil::setup_distributor(int redundancy, + int node_count, + const lib::ClusterStateBundle& state, + uint32_t early_return, + bool require_primary_to_be_written) +{ + lib::Distribution::DistributionConfigBuilder config( + lib::Distribution::getDefaultDistributionConfig(redundancy, node_count).get()); + config.redundancy = redundancy; + config.initialRedundancy = early_return; + config.ensurePrimaryPersisted = require_primary_to_be_written; + auto distribution = std::make_shared<lib::Distribution>(config); + _node->getComponentRegister().setDistribution(distribution); + enable_distributor_cluster_state(state); + + // TODO STRIPE: Update this comment now that stripe is used instead. + // This is for all intents and purposes a hack to avoid having the + // distributor treat setting the distribution explicitly as a signal that + // it should send RequestBucketInfo to all configured nodes. + // If we called storage_distribution_changed followed by enableDistribution + // explicitly (which is what happens in "real life"), that is what would + // take place. + // The inverse case of this can be explicitly accomplished by calling + // triggerDistributionChange(). + // This isn't pretty, folks, but it avoids breaking the world for now, + // as many tests have implicit assumptions about this being the behavior. + auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution)); + _stripe->update_distribution_config(new_configs); +} + +void +DistributorStripeTestUtil::receive_set_system_state_command(const vespalib::string& state_str) +{ + auto state_cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(state_str)); + _stripe->handleMessage(state_cmd); // TODO move semantics +} + +void +DistributorStripeTestUtil::handle_top_level_message(const std::shared_ptr<api::StorageMessage>& msg) +{ + _stripe->handleMessage(msg); +} + +void +DistributorStripeTestUtil::setTypeRepo(const std::shared_ptr<const document::DocumentTypeRepo>& repo) +{ + _node->getComponentRegister().setDocumentTypeRepo(repo); +} + +void +DistributorStripeTestUtil::close() +{ + _stripe->flush_and_close(); + _sender.clear(); + _node.reset(0); + _config = getStandardConfig(false); +} + +namespace { + +std::string dumpVector(const std::vector<uint16_t>& vec) { + std::ostringstream ost; + for (uint32_t i = 0; i < vec.size(); ++i) { + if (i != 0) { + ost << ","; + } + ost << vec[i]; + } + return ost.str(); +} + +} + +std::string +DistributorStripeTestUtil::getNodes(document::BucketId id) +{ + BucketDatabase::Entry entry = getBucket(id); + + if (!entry.valid()) { + return id.toString(); + } else { + std::vector<uint16_t> nodes = entry->getNodes(); + std::sort(nodes.begin(), nodes.end()); + + std::ostringstream ost; + ost << id << ": " << dumpVector(nodes); + return ost.str(); + } +} + +std::string +DistributorStripeTestUtil::getIdealStr(document::BucketId id, const lib::ClusterState& state) +{ + if (!getDistributorBucketSpace().owns_bucket_in_state(state, id)) { + return id.toString(); + } + + std::vector<uint16_t> nodes; + getDistribution().getIdealNodes( + lib::NodeType::STORAGE, state, id, nodes); + std::sort(nodes.begin(), nodes.end()); + std::ostringstream ost; + ost << id << ": " << dumpVector(nodes); + return ost.str(); +} + +void +DistributorStripeTestUtil::addIdealNodes(const lib::ClusterState& state, + const document::BucketId& id) +{ + BucketDatabase::Entry entry = getBucket(id); + + if (!entry.valid()) { + entry = BucketDatabase::Entry(id); + } + + std::vector<uint16_t> res; + getDistribution().getIdealNodes( + lib::NodeType::STORAGE, state, id, res); + + for (uint32_t i = 0; i < res.size(); ++i) { + if (state.getNodeState(lib::Node(lib::NodeType::STORAGE, res[i])).getState() != + lib::State::MAINTENANCE) + { + entry->addNode(BucketCopy(0, res[i], api::BucketInfo(1,1,1)), + toVector<uint16_t>(0)); + } + } + + getBucketDatabase().update(entry); +} + +void +DistributorStripeTestUtil::addNodesToBucketDB(const document::Bucket& bucket, const std::string& nodeStr) +{ + BucketDatabase::Entry entry = getBucket(bucket); + + if (!entry.valid()) { + entry = BucketDatabase::Entry(bucket.getBucketId()); + } + + entry->clear(); + + vespalib::StringTokenizer tokenizer(nodeStr, ","); + for (uint32_t i = 0; i < tokenizer.size(); ++i) { + vespalib::StringTokenizer tok2(tokenizer[i], "="); + vespalib::StringTokenizer tok3(tok2[1], "/"); + + api::BucketInfo info(atoi(tok3[0].data()), + atoi(tok3.size() > 1 ? tok3[1].data() : tok3[0].data()), + atoi(tok3.size() > 2 ? tok3[2].data() : tok3[0].data())); + + size_t flagsIdx = 3; + + // Meta info override? For simplicity, require both meta count and size + if (tok3.size() > 4 && (!tok3[3].empty() && isdigit(tok3[3][0]))) { + info.setMetaCount(atoi(tok3[3].data())); + info.setUsedFileSize(atoi(tok3[4].data())); + flagsIdx = 5; + } + + if ((tok3.size() > flagsIdx + 1) && tok3[flagsIdx + 1] == "a") { + info.setActive(); + } else { + info.setActive(false); + } + if ((tok3.size() > flagsIdx + 2) && tok3[flagsIdx + 2] == "r") { + info.setReady(); + } else { + info.setReady(false); + } + + uint16_t idx = atoi(tok2[0].data()); + BucketCopy node( + 0, + idx, + info); + + // Allow user to manually override trusted and active. + if (tok3.size() > flagsIdx && tok3[flagsIdx] == "t") { + node.setTrusted(); + } + + entry->addNodeManual(node); + } + + getBucketDatabase(bucket.getBucketSpace()).update(entry); +} + +void +DistributorStripeTestUtil::addNodesToBucketDB(const document::BucketId& id, + const std::string& nodeStr) +{ + addNodesToBucketDB(document::Bucket(makeBucketSpace(), id), nodeStr); +} + +void +DistributorStripeTestUtil::removeFromBucketDB(const document::BucketId& id) +{ + getBucketDatabase().remove(id); +} + +void +DistributorStripeTestUtil::addIdealNodes(const document::BucketId& id) +{ + // TODO STRIPE roundabout way of getting state bundle..! + addIdealNodes(*operation_context().cluster_state_bundle().getBaselineClusterState(), id); +} + +void +DistributorStripeTestUtil::insertBucketInfo(document::BucketId id, + uint16_t node, + uint32_t checksum, + uint32_t count, + uint32_t size, + bool trusted, + bool active) +{ + api::BucketInfo info(checksum, count, size); + insertBucketInfo(id, node, info, trusted, active); +} + +void +DistributorStripeTestUtil::insertBucketInfo(document::BucketId id, + uint16_t node, + const api::BucketInfo& info, + bool trusted, + bool active) +{ + BucketDatabase::Entry entry = getBucketDatabase().get(id); + if (!entry.valid()) { + entry = BucketDatabase::Entry(id, BucketInfo()); + } + + api::BucketInfo info2(info); + if (active) { + info2.setActive(); + } + BucketCopy copy(operation_context().generate_unique_timestamp(), node, info2); + + entry->addNode(copy.setTrusted(trusted), toVector<uint16_t>(0)); + + getBucketDatabase().update(entry); +} + +std::string +DistributorStripeTestUtil::dumpBucket(const document::BucketId& bid) +{ + return getBucketDatabase().get(bid).toString(); +} + +void +DistributorStripeTestUtil::sendReply(Operation& op, + int idx, + api::ReturnCode::Result result) +{ + if (idx == -1) { + idx = _sender.commands().size() - 1; + } + assert(idx >= 0 && idx < static_cast<int>(_sender.commands().size())); + + std::shared_ptr<api::StorageCommand> cmd = _sender.command(idx); + api::StorageReply::SP reply(cmd->makeReply().release()); + reply->setResult(result); + op.receive(_sender, reply); +} + +BucketDatabase::Entry +DistributorStripeTestUtil::getBucket(const document::Bucket& bucket) const +{ + return getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId()); +} + +BucketDatabase::Entry +DistributorStripeTestUtil::getBucket(const document::BucketId& bId) const +{ + return getBucketDatabase().get(bId); +} + +void +DistributorStripeTestUtil::disableBucketActivationInConfig(bool disable) +{ + vespa::config::content::core::StorDistributormanagerConfigBuilder config; + config.disableBucketActivation = disable; + getConfig().configure(config); +} + +StripeBucketDBUpdater& +DistributorStripeTestUtil::getBucketDBUpdater() { + return _stripe->bucket_db_updater(); +} + +IdealStateManager& +DistributorStripeTestUtil::getIdealStateManager() { + return _stripe->ideal_state_manager(); +} + +ExternalOperationHandler& +DistributorStripeTestUtil::getExternalOperationHandler() { + return _stripe->external_operation_handler(); +} + +const storage::distributor::DistributorNodeContext& +DistributorStripeTestUtil::node_context() const { + return _stripe->_component; +} + +storage::distributor::DistributorStripeOperationContext& +DistributorStripeTestUtil::operation_context() { + return _stripe->_component; +} + +const DocumentSelectionParser& +DistributorStripeTestUtil::doc_selection_parser() const { + return _stripe->_component; +} + +DistributorConfiguration& +DistributorStripeTestUtil::getConfig() { + // TODO STRIPE avoid const cast + return const_cast<DistributorConfiguration&>(_stripe->getConfig()); +} + +DistributorBucketSpace& +DistributorStripeTestUtil::getDistributorBucketSpace() { + return getBucketSpaceRepo().get(makeBucketSpace()); +} + +BucketDatabase& +DistributorStripeTestUtil::getBucketDatabase() { + return getDistributorBucketSpace().getBucketDatabase(); +} + +BucketDatabase& +DistributorStripeTestUtil::getBucketDatabase(document::BucketSpace space) { + return getBucketSpaceRepo().get(space).getBucketDatabase(); +} + +const BucketDatabase& +DistributorStripeTestUtil::getBucketDatabase() const { + return getBucketSpaceRepo().get(makeBucketSpace()).getBucketDatabase(); +} + +const BucketDatabase& +DistributorStripeTestUtil::getBucketDatabase(document::BucketSpace space) const { + return getBucketSpaceRepo().get(space).getBucketDatabase(); +} + +DistributorBucketSpaceRepo& +DistributorStripeTestUtil::getBucketSpaceRepo() { + return _stripe->getBucketSpaceRepo(); +} + +const DistributorBucketSpaceRepo& +DistributorStripeTestUtil::getBucketSpaceRepo() const { + return _stripe->getBucketSpaceRepo(); +} + +DistributorBucketSpaceRepo& +DistributorStripeTestUtil::getReadOnlyBucketSpaceRepo() { + return _stripe->getReadOnlyBucketSpaceRepo(); +} + +const DistributorBucketSpaceRepo& +DistributorStripeTestUtil::getReadOnlyBucketSpaceRepo() const { + return _stripe->getReadOnlyBucketSpaceRepo(); +} + +bool +DistributorStripeTestUtil::distributor_is_in_recovery_mode() const noexcept { + return _stripe->isInRecoveryMode(); +} + +const lib::ClusterStateBundle& +DistributorStripeTestUtil::current_distributor_cluster_state_bundle() const noexcept { + return _stripe->getClusterStateBundle(); +} + +std::string +DistributorStripeTestUtil::active_ideal_state_operations() const { + return _stripe->getActiveIdealStateOperations(); +} + +const PendingMessageTracker& +DistributorStripeTestUtil::pending_message_tracker() const noexcept { + return _stripe->getPendingMessageTracker(); +} + +PendingMessageTracker& +DistributorStripeTestUtil::pending_message_tracker() noexcept { + return _stripe->getPendingMessageTracker(); +} + +std::chrono::steady_clock::duration +DistributorStripeTestUtil::db_memory_sample_interval() const noexcept { + return _stripe->db_memory_sample_interval(); +} + +const lib::Distribution& +DistributorStripeTestUtil::getDistribution() const { + return getBucketSpaceRepo().get(makeBucketSpace()).getDistribution(); +} + +std::vector<document::BucketSpace> +DistributorStripeTestUtil::getBucketSpaces() const +{ + std::vector<document::BucketSpace> res; + for (const auto &repo : getBucketSpaceRepo()) { + res.push_back(repo.first); + } + return res; +} + +void +DistributorStripeTestUtil::enableDistributorClusterState(vespalib::stringref state) +{ + getBucketDBUpdater().simulate_cluster_state_bundle_activation( + lib::ClusterStateBundle(lib::ClusterState(state))); +} + +void +DistributorStripeTestUtil::enable_distributor_cluster_state(const lib::ClusterStateBundle& state) +{ + getBucketDBUpdater().simulate_cluster_state_bundle_activation(state); +} + +void +DistributorStripeTestUtil::setSystemState(const lib::ClusterState& systemState) { + _stripe->enableClusterStateBundle(lib::ClusterStateBundle(systemState)); +} + +} |