diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2021-09-17 09:53:22 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2021-09-20 09:37:44 +0000 |
commit | 830bd44f5fa5cb5a0a2ee7ca34e8afaff40c474d (patch) | |
tree | 9b28117a0e092caaa6de7d5c78119d67e0248f0f /storage | |
parent | 98d154b54670a9737317d345c8619b83ea5d4fb3 (diff) |
Remove most traces of distributor legacy mode.
Some assorted legacy bits and pieces still remain on the factory floor;
these will be cleaned up in follow-ups.
Diffstat (limited to 'storage')
21 files changed, 151 insertions, 6066 deletions
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt index 67a7fed8d0b..5a5422156d1 100644 --- a/storage/src/tests/distributor/CMakeLists.txt +++ b/storage/src/tests/distributor/CMakeLists.txt @@ -15,15 +15,12 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST distributor_stripe_pool_test.cpp distributor_stripe_test.cpp distributor_stripe_test_util.cpp - distributortestutil.cpp externaloperationhandlertest.cpp garbagecollectiontest.cpp getoperationtest.cpp gtest_runner.cpp idealstatemanagertest.cpp joinbuckettest.cpp - legacy_bucket_db_updater_test.cpp - legacy_distributor_test.cpp maintenanceschedulertest.cpp mergelimitertest.cpp mergeoperationtest.cpp diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.cpp b/storage/src/tests/distributor/distributor_stripe_test_util.cpp index 043f996a4a1..4b7e73d3e43 100644 --- a/storage/src/tests/distributor/distributor_stripe_test_util.cpp +++ b/storage/src/tests/distributor/distributor_stripe_test_util.cpp @@ -20,7 +20,6 @@ namespace storage::distributor { DistributorStripeTestUtil::DistributorStripeTestUtil() : _config(), _node(), - _threadPool(), _stripe(), _sender(), _senderDown(), @@ -37,18 +36,14 @@ void DistributorStripeTestUtil::createLinks() { _node.reset(new TestDistributorApp(_config.getConfigId())); - _threadPool = framework::TickingThreadPool::createDefault("distributor"); _metrics = std::make_shared<DistributorMetricSet>(); _ideal_state_metrics = std::make_shared<IdealStateMetricSet>(); _stripe = std::make_unique<DistributorStripe>(_node->getComponentRegister(), *_metrics, *_ideal_state_metrics, _node->node_identity(), - *_threadPool, - *this, _messageSender, *this, - false, _done_initializing); } diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.h b/storage/src/tests/distributor/distributor_stripe_test_util.h index ccade98fd01..e73d1a3baa1 100644 --- 
a/storage/src/tests/distributor/distributor_stripe_test_util.h +++ b/storage/src/tests/distributor/distributor_stripe_test_util.h @@ -212,7 +212,6 @@ public: protected: vdstestlib::DirConfig _config; std::unique_ptr<TestDistributorApp> _node; - std::unique_ptr<framework::TickingThreadPool> _threadPool; std::shared_ptr<DistributorMetricSet> _metrics; std::shared_ptr<IdealStateMetricSet> _ideal_state_metrics; std::unique_ptr<DistributorStripe> _stripe; diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp deleted file mode 100644 index 1e24bd72c9b..00000000000 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ /dev/null @@ -1,512 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "distributortestutil.h" -#include <vespa/config-stor-distribution.h> -#include <vespa/document/test/make_bucket_space.h> -#include <vespa/document/test/make_document_bucket.h> -#include <vespa/storage/distributor/top_level_distributor.h> -#include <vespa/storage/distributor/distributor_bucket_space.h> -#include <vespa/storage/distributor/distributor_stripe.h> -#include <vespa/storage/distributor/distributor_stripe_component.h> -#include <vespa/storage/distributor/distributor_stripe_pool.h> -#include <vespa/vdslib/distribution/distribution.h> -#include <vespa/vespalib/text/stringtokenizer.h> - -using document::test::makeBucketSpace; -using document::test::makeDocumentBucket; - -namespace storage::distributor { - -DistributorTestUtil::DistributorTestUtil() - : _messageSender(_sender, _senderDown), - _num_distributor_stripes(0) // TODO STRIPE change default -{ - _config = getStandardConfig(false); -} -DistributorTestUtil::~DistributorTestUtil() { } - -void -DistributorTestUtil::createLinks() -{ - _node.reset(new TestDistributorApp(_config.getConfigId())); - _threadPool = 
framework::TickingThreadPool::createDefault("distributor"); - _stripe_pool = std::make_unique<DistributorStripePool>(); - _distributor.reset(new TopLevelDistributor( - _node->getComponentRegister(), - _node->node_identity(), - *_threadPool, - *_stripe_pool, - *this, - _num_distributor_stripes, - _hostInfo, - &_messageSender)); - _component.reset(new storage::DistributorComponent(_node->getComponentRegister(), "distrtestutil")); -}; - -void -DistributorTestUtil::setupDistributor(int redundancy, - int nodeCount, - const std::string& systemState, - uint32_t earlyReturn, - bool requirePrimaryToBeWritten) -{ - setup_distributor(redundancy, nodeCount, lib::ClusterStateBundle(lib::ClusterState(systemState)), earlyReturn, requirePrimaryToBeWritten); -} - -void -DistributorTestUtil::setup_distributor(int redundancy, - int node_count, - const lib::ClusterStateBundle& state, - uint32_t early_return, - bool require_primary_to_be_written) -{ - lib::Distribution::DistributionConfigBuilder config( - lib::Distribution::getDefaultDistributionConfig(redundancy, node_count).get()); - config.redundancy = redundancy; - config.initialRedundancy = early_return; - config.ensurePrimaryPersisted = require_primary_to_be_written; - auto distribution = std::make_shared<lib::Distribution>(config); - _node->getComponentRegister().setDistribution(distribution); - enable_distributor_cluster_state(state); - // This is for all intents and purposes a hack to avoid having the - // distributor treat setting the distribution explicitly as a signal that - // it should send RequestBucketInfo to all configured nodes. - // If we called storage_distribution_changed followed by enableDistribution - // explicitly (which is what happens in "real life"), that is what would - // take place. - // The inverse case of this can be explicitly accomplished by calling - // triggerDistributionChange(). 
- // This isn't pretty, folks, but it avoids breaking the world for now, - // as many tests have implicit assumptions about this being the behavior. - _distributor->propagateDefaultDistribution(distribution); -} - -void -DistributorTestUtil::setRedundancy(uint32_t redundancy) -{ - auto distribution = std::make_shared<lib::Distribution>( - lib::Distribution::getDefaultDistributionConfig( - redundancy, 100)); - // Same rationale for not triggering a full distribution change as - // in setupDistributor() - _node->getComponentRegister().setDistribution(distribution); - _distributor->propagateDefaultDistribution(std::move(distribution)); -} - -void -DistributorTestUtil::triggerDistributionChange(lib::Distribution::SP distr) -{ - _node->getComponentRegister().setDistribution(std::move(distr)); - _distributor->storageDistributionChanged(); - _distributor->enableNextDistribution(); -} - -void -DistributorTestUtil::receive_set_system_state_command(const vespalib::string& state_str) -{ - auto state_cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(state_str)); - _distributor->handleMessage(state_cmd); // TODO move semantics -} - -void -DistributorTestUtil::handle_top_level_message(const std::shared_ptr<api::StorageMessage>& msg) -{ - _distributor->handleMessage(msg); -} - -void -DistributorTestUtil::setTypeRepo(const std::shared_ptr<const document::DocumentTypeRepo> &repo) -{ - _node->getComponentRegister().setDocumentTypeRepo(repo); -} - -void -DistributorTestUtil::close() -{ - _component.reset(0); - if (_distributor.get()) { - _distributor->onClose(); - } - _sender.clear(); - _node.reset(0); - _config = getStandardConfig(false); -} - -namespace { - std::string dumpVector(const std::vector<uint16_t>& vec) { - std::ostringstream ost; - for (uint32_t i = 0; i < vec.size(); ++i) { - if (i != 0) { - ost << ","; - } - ost << vec[i]; - } - return ost.str(); - } -} - -std::string -DistributorTestUtil::getNodes(document::BucketId id) -{ - BucketDatabase::Entry 
entry = getBucket(id); - - if (!entry.valid()) { - return id.toString(); - } else { - std::vector<uint16_t> nodes = entry->getNodes(); - std::sort(nodes.begin(), nodes.end()); - - std::ostringstream ost; - ost << id << ": " << dumpVector(nodes); - return ost.str(); - } -} - -std::string -DistributorTestUtil::getIdealStr(document::BucketId id, const lib::ClusterState& state) -{ - if (!getDistributorBucketSpace().owns_bucket_in_state(state, id)) { - return id.toString(); - } - - std::vector<uint16_t> nodes; - getDistribution().getIdealNodes( - lib::NodeType::STORAGE, state, id, nodes); - std::sort(nodes.begin(), nodes.end()); - std::ostringstream ost; - ost << id << ": " << dumpVector(nodes); - return ost.str(); -} - -void -DistributorTestUtil::addIdealNodes(const lib::ClusterState& state, - const document::BucketId& id) -{ - BucketDatabase::Entry entry = getBucket(id); - - if (!entry.valid()) { - entry = BucketDatabase::Entry(id); - } - - std::vector<uint16_t> res; - assert(_component.get()); - getDistribution().getIdealNodes( - lib::NodeType::STORAGE, state, id, res); - - for (uint32_t i = 0; i < res.size(); ++i) { - if (state.getNodeState(lib::Node(lib::NodeType::STORAGE, res[i])).getState() != - lib::State::MAINTENANCE) - { - entry->addNode(BucketCopy(0, res[i], api::BucketInfo(1,1,1)), - toVector<uint16_t>(0)); - } - } - - getBucketDatabase().update(entry); -} - -void DistributorTestUtil::addNodesToBucketDB(const document::Bucket& bucket, const std::string& nodeStr) { - BucketDatabase::Entry entry = getBucket(bucket); - - if (!entry.valid()) { - entry = BucketDatabase::Entry(bucket.getBucketId()); - } - - entry->clear(); - - vespalib::StringTokenizer tokenizer(nodeStr, ","); - for (uint32_t i = 0; i < tokenizer.size(); ++i) { - vespalib::StringTokenizer tok2(tokenizer[i], "="); - vespalib::StringTokenizer tok3(tok2[1], "/"); - - api::BucketInfo info(atoi(tok3[0].data()), - atoi(tok3.size() > 1 ? tok3[1].data() : tok3[0].data()), - atoi(tok3.size() > 2 ? 
tok3[2].data() : tok3[0].data())); - - size_t flagsIdx = 3; - - // Meta info override? For simplicity, require both meta count and size - if (tok3.size() > 4 && (!tok3[3].empty() && isdigit(tok3[3][0]))) { - info.setMetaCount(atoi(tok3[3].data())); - info.setUsedFileSize(atoi(tok3[4].data())); - flagsIdx = 5; - } - - if ((tok3.size() > flagsIdx + 1) && tok3[flagsIdx + 1] == "a") { - info.setActive(); - } else { - info.setActive(false); - } - if ((tok3.size() > flagsIdx + 2) && tok3[flagsIdx + 2] == "r") { - info.setReady(); - } else { - info.setReady(false); - } - - uint16_t idx = atoi(tok2[0].data()); - BucketCopy node( - 0, - idx, - info); - - // Allow user to manually override trusted and active. - if (tok3.size() > flagsIdx && tok3[flagsIdx] == "t") { - node.setTrusted(); - } - - entry->addNodeManual(node); - } - - getBucketDatabase(bucket.getBucketSpace()).update(entry); -} - -void -DistributorTestUtil::addNodesToBucketDB(const document::BucketId& id, - const std::string& nodeStr) -{ - addNodesToBucketDB(document::Bucket(makeBucketSpace(), id), nodeStr); -} - -void -DistributorTestUtil::removeFromBucketDB(const document::BucketId& id) -{ - getBucketDatabase().remove(id); -} - -void -DistributorTestUtil::addIdealNodes(const document::BucketId& id) -{ - // TODO STRIPE roundabout way of getting state bundle..! 
- addIdealNodes(*operation_context().cluster_state_bundle().getBaselineClusterState(), id); -} - -void -DistributorTestUtil::insertBucketInfo(document::BucketId id, - uint16_t node, - uint32_t checksum, - uint32_t count, - uint32_t size, - bool trusted, - bool active) -{ - api::BucketInfo info(checksum, count, size); - insertBucketInfo(id, node, info, trusted, active); -} - -void -DistributorTestUtil::insertBucketInfo(document::BucketId id, - uint16_t node, - const api::BucketInfo& info, - bool trusted, - bool active) -{ - BucketDatabase::Entry entry = getBucketDatabase().get(id); - if (!entry.valid()) { - entry = BucketDatabase::Entry(id, BucketInfo()); - } - - api::BucketInfo info2(info); - if (active) { - info2.setActive(); - } - BucketCopy copy(operation_context().generate_unique_timestamp(), node, info2); - - entry->addNode(copy.setTrusted(trusted), toVector<uint16_t>(0)); - - getBucketDatabase().update(entry); -} - -std::string -DistributorTestUtil::dumpBucket(const document::BucketId& bid) -{ - return getBucketDatabase().get(bid).toString(); -} - -void -DistributorTestUtil::sendReply(Operation& op, - int idx, - api::ReturnCode::Result result) -{ - if (idx == -1) { - idx = _sender.commands().size() - 1; - } - assert(idx >= 0 && idx < static_cast<int>(_sender.commands().size())); - - std::shared_ptr<api::StorageCommand> cmd = _sender.command(idx); - api::StorageReply::SP reply(cmd->makeReply().release()); - reply->setResult(result); - op.receive(_sender, reply); -} - -BucketDatabase::Entry DistributorTestUtil::getBucket(const document::Bucket& bucket) const { - return getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId()); -} - -BucketDatabase::Entry -DistributorTestUtil::getBucket(const document::BucketId& bId) const -{ - return getBucketDatabase().get(bId); -} - -void -DistributorTestUtil::disableBucketActivationInConfig(bool disable) -{ - vespa::config::content::core::StorDistributormanagerConfigBuilder config; - config.disableBucketActivation 
= disable; - getConfig().configure(config); -} - -StripeBucketDBUpdater& -DistributorTestUtil::getBucketDBUpdater() { - return _distributor->bucket_db_updater(); -} -IdealStateManager& -DistributorTestUtil::getIdealStateManager() { - return _distributor->ideal_state_manager(); -} -ExternalOperationHandler& -DistributorTestUtil::getExternalOperationHandler() { - return _distributor->external_operation_handler(); -} - -const storage::distributor::DistributorNodeContext& -DistributorTestUtil::node_context() const { - return _distributor->distributor_component(); -} - -storage::distributor::DistributorStripeOperationContext& -DistributorTestUtil::operation_context() { - return _distributor->distributor_component(); -} - -const DocumentSelectionParser& -DistributorTestUtil::doc_selection_parser() const { - return _distributor->distributor_component(); -} - -bool -DistributorTestUtil::tick() { - framework::ThreadWaitInfo res( - framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN); - { - framework::TickingLockGuard lock( - _distributor->_threadPool.freezeCriticalTicks()); - res.merge(_distributor->doCriticalTick(0)); - } - res.merge(_distributor->doNonCriticalTick(0)); - return !res.waitWanted(); -} - -DistributorConfiguration& -DistributorTestUtil::getConfig() { - // TODO STRIPE avoid const cast - return const_cast<DistributorConfiguration&>(_distributor->getConfig()); -} - -DistributorBucketSpace & -DistributorTestUtil::getDistributorBucketSpace() -{ - return getBucketSpaceRepo().get(makeBucketSpace()); -} - -BucketDatabase& -DistributorTestUtil::getBucketDatabase() { - return getDistributorBucketSpace().getBucketDatabase(); -} - -BucketDatabase& DistributorTestUtil::getBucketDatabase(document::BucketSpace space) { - return getBucketSpaceRepo().get(space).getBucketDatabase(); -} - -const BucketDatabase& -DistributorTestUtil::getBucketDatabase() const { - return getBucketSpaceRepo().get(makeBucketSpace()).getBucketDatabase(); -} - -const BucketDatabase& 
DistributorTestUtil::getBucketDatabase(document::BucketSpace space) const { - return getBucketSpaceRepo().get(space).getBucketDatabase(); -} - -DistributorBucketSpaceRepo & -DistributorTestUtil::getBucketSpaceRepo() { - return _distributor->getBucketSpaceRepo(); -} - -const DistributorBucketSpaceRepo & -DistributorTestUtil::getBucketSpaceRepo() const { - return _distributor->getBucketSpaceRepo(); -} - -DistributorBucketSpaceRepo & -DistributorTestUtil::getReadOnlyBucketSpaceRepo() { - return _distributor->getReadOnlyBucketSpaceRepo(); -} - -const DistributorBucketSpaceRepo & -DistributorTestUtil::getReadOnlyBucketSpaceRepo() const { - return _distributor->getReadOnlyBucketSpaceRepo(); -} - -bool -DistributorTestUtil::distributor_is_in_recovery_mode() const noexcept { - return _distributor->isInRecoveryMode(); -} - -const lib::ClusterStateBundle& -DistributorTestUtil::current_distributor_cluster_state_bundle() const noexcept { - return getDistributor().getClusterStateBundle(); -} - -std::string -DistributorTestUtil::active_ideal_state_operations() const { - return _distributor->getActiveIdealStateOperations(); -} - -const PendingMessageTracker& -DistributorTestUtil::pending_message_tracker() const noexcept { - return _distributor->getPendingMessageTracker(); -} - -PendingMessageTracker& -DistributorTestUtil::pending_message_tracker() noexcept { - return _distributor->getPendingMessageTracker(); -} - -std::chrono::steady_clock::duration -DistributorTestUtil::db_memory_sample_interval() const noexcept { - return _distributor->db_memory_sample_interval(); -} - -const lib::Distribution& -DistributorTestUtil::getDistribution() const { - return getBucketSpaceRepo().get(makeBucketSpace()).getDistribution(); -} - -std::vector<document::BucketSpace> -DistributorTestUtil::getBucketSpaces() const -{ - std::vector<document::BucketSpace> res; - for (const auto &repo : getBucketSpaceRepo()) { - res.push_back(repo.first); - } - return res; -} - -void 
-DistributorTestUtil::enableDistributorClusterState(vespalib::stringref state) -{ - getBucketDBUpdater().simulate_cluster_state_bundle_activation( - lib::ClusterStateBundle(lib::ClusterState(state))); -} - -void -DistributorTestUtil::enable_distributor_cluster_state(const lib::ClusterStateBundle& state) -{ - getBucketDBUpdater().simulate_cluster_state_bundle_activation(state); -} - -void -DistributorTestUtil::setSystemState(const lib::ClusterState& systemState) { - _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState)); -} - -} diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h deleted file mode 100644 index 9457bfeba83..00000000000 --- a/storage/src/tests/distributor/distributortestutil.h +++ /dev/null @@ -1,248 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include "distributor_message_sender_stub.h" -#include <tests/common/dummystoragelink.h> -#include <tests/common/testhelper.h> -#include <tests/common/teststorageapp.h> -#include <vespa/storage/common/hostreporter/hostinfo.h> -#include <vespa/storage/frameworkimpl/component/distributorcomponentregisterimpl.h> -#include <vespa/storage/storageutil/utils.h> -#include <vespa/storageapi/message/state.h> -#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h> - -namespace storage { - -namespace framework { struct TickingThreadPool; } - -namespace distributor { - -class TopLevelDistributor; -class DistributorBucketSpace; -class DistributorBucketSpaceRepo; -class DistributorNodeContext; -class DistributorStripe; -class DistributorStripeComponent; -class DistributorStripeOperationContext; -class DistributorStripePool; -class DocumentSelectionParser; -class ExternalOperationHandler; -class IdealStateManager; -class Operation; -class StripeBucketDBUpdater; - -// TODO STRIPE rename to DistributorStripeTestUtil? 
-class DistributorTestUtil : private DoneInitializeHandler -{ -public: - DistributorTestUtil(); - ~DistributorTestUtil(); - - /** - * Sets up the storage link chain. - */ - void createLinks(); - void setTypeRepo(const std::shared_ptr<const document::DocumentTypeRepo> &repo); - - void close(); - - /** - * Returns a string with the nodes currently stored in the bucket - * database for the given bucket. - */ - std::string getNodes(document::BucketId id); - - /** - * Returns a string with the ideal state nodes for the given bucket. - */ - std::string getIdealStr(document::BucketId id, const lib::ClusterState& state); - - /** - * Adds the ideal nodes for the given bucket and the given cluster state - * to the bucket database. - */ - void addIdealNodes(const lib::ClusterState& state, const document::BucketId& id); - - /** - * Adds all the ideal nodes for the given bucket to the bucket database. - */ - void addIdealNodes(const document::BucketId& id); - - /** - * Parses the given string to a set of node => bucket info data, - * and inserts them as nodes in the given bucket. - * Format: - * "node1=checksum/docs/size,node2=checksum/docs/size" - */ - void addNodesToBucketDB(const document::Bucket& bucket, const std::string& nodeStr); - // As the above, but always inserts into default bucket space - void addNodesToBucketDB(const document::BucketId& id, const std::string& nodeStr); - - /** - * Removes the given bucket from the bucket database. - */ - void removeFromBucketDB(const document::BucketId& id); - - /** - * Inserts the given bucket information for the given bucket and node in - * the bucket database. - */ - void insertBucketInfo(document::BucketId id, - uint16_t node, - uint32_t checksum, - uint32_t count, - uint32_t size, - bool trusted = false, - bool active = false); - - /** - * Inserts the given bucket information for the given bucket and node in - * the bucket database. 
- */ - void insertBucketInfo(document::BucketId id, - uint16_t node, - const api::BucketInfo& info, - bool trusted = false, - bool active = false); - - std::string dumpBucket(const document::BucketId& bucket); - - /** - * Replies to message idx sent upwards with the given result code. - * If idx = -1, replies to the last command received upwards. - */ - void sendReply(Operation& op, - int idx = -1, - api::ReturnCode::Result result = api::ReturnCode::OK); - - StripeBucketDBUpdater& getBucketDBUpdater(); - IdealStateManager& getIdealStateManager(); - ExternalOperationHandler& getExternalOperationHandler(); - const storage::distributor::DistributorNodeContext& node_context() const; - storage::distributor::DistributorStripeOperationContext& operation_context(); - const DocumentSelectionParser& doc_selection_parser() const; - - TopLevelDistributor& getDistributor() noexcept { return *_distributor; } - const TopLevelDistributor& getDistributor() const noexcept { return *_distributor; } - - bool tick(); - - DistributorConfiguration& getConfig(); - - vdstestlib::DirConfig& getDirConfig() { - return _config; - } - - // TODO explicit notion of bucket spaces for tests - DistributorBucketSpace &getDistributorBucketSpace(); - BucketDatabase& getBucketDatabase(); // Implicit default space only - BucketDatabase& getBucketDatabase(document::BucketSpace space); - const BucketDatabase& getBucketDatabase() const; // Implicit default space only - const BucketDatabase& getBucketDatabase(document::BucketSpace space) const; - DistributorBucketSpaceRepo &getBucketSpaceRepo(); - const DistributorBucketSpaceRepo &getBucketSpaceRepo() const; - DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo(); - const DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() const; - [[nodiscard]] bool distributor_is_in_recovery_mode() const noexcept; - [[nodiscard]] const lib::ClusterStateBundle& current_distributor_cluster_state_bundle() const noexcept; - [[nodiscard]] std::string 
active_ideal_state_operations() const; - [[nodiscard]] const PendingMessageTracker& pending_message_tracker() const noexcept; - [[nodiscard]] PendingMessageTracker& pending_message_tracker() noexcept; - [[nodiscard]] std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept; - - const lib::Distribution& getDistribution() const; - // "End to end" distribution change trigger, which will invoke the bucket - // DB updater as expected based on the previous and new cluster state - // and config. - void triggerDistributionChange(std::shared_ptr<lib::Distribution> distr); - - framework::defaultimplementation::FakeClock& getClock() { return _node->getClock(); } - DistributorComponentRegister& getComponentRegister() { return _node->getComponentRegister(); } - DistributorComponentRegisterImpl& getComponentRegisterImpl() { return _node->getComponentRegister(); } - - StorageComponent& getComponent() { - if (_component.get() == 0) { - _component.reset(new storage::DistributorComponent( - _node->getComponentRegister(), "distributor_test_utils")); - } - return *_component; - } - - void setupDistributor(int redundancy, - int nodeCount, - const std::string& systemState, - uint32_t earlyReturn = false, - bool requirePrimaryToBeWritten = true); - - void setup_distributor(int redundancy, - int node_count, - const lib::ClusterStateBundle& state, - uint32_t early_return = false, - bool require_primary_to_be_written = true); - - void setRedundancy(uint32_t redundancy); - - void notifyDoneInitializing() override {} - - // Must implement this for storage server interface for now - virtual api::Timestamp getUniqueTimestamp() { - return _component->getUniqueTimestamp(); - } - - void disableBucketActivationInConfig(bool disable); - - BucketDatabase::Entry getBucket(const document::Bucket& bucket) const; - // Gets bucket entry from default space only - BucketDatabase::Entry getBucket(const document::BucketId& bId) const; - - std::vector<document::BucketSpace> 
getBucketSpaces() const; - - DistributorMessageSenderStub& sender() noexcept { return _sender; } - const DistributorMessageSenderStub& sender() const noexcept { return _sender; } - - void setSystemState(const lib::ClusterState& systemState); - - // Invokes full cluster state transition pipeline rather than directly applying - // the state and just pretending everything has been completed. - void receive_set_system_state_command(const vespalib::string& state_str); - - void handle_top_level_message(const std::shared_ptr<api::StorageMessage>& msg); - - // Must be called prior to createLinks() to have any effect - void set_num_distributor_stripes(uint32_t n_stripes) noexcept { - _num_distributor_stripes = n_stripes; - } -protected: - vdstestlib::DirConfig _config; - std::unique_ptr<TestDistributorApp> _node; - std::unique_ptr<framework::TickingThreadPool> _threadPool; - std::unique_ptr<DistributorStripePool> _stripe_pool; - std::unique_ptr<TopLevelDistributor> _distributor; - std::unique_ptr<storage::DistributorComponent> _component; - DistributorMessageSenderStub _sender; - DistributorMessageSenderStub _senderDown; - HostInfo _hostInfo; - - struct MessageSenderImpl : public ChainedMessageSender { - DistributorMessageSenderStub& _sender; - DistributorMessageSenderStub& _senderDown; - MessageSenderImpl(DistributorMessageSenderStub& up, DistributorMessageSenderStub& down) - : _sender(up), _senderDown(down) {} - - void sendUp(const std::shared_ptr<api::StorageMessage>& msg) override { - _sender.send(msg); - } - void sendDown(const std::shared_ptr<api::StorageMessage>& msg) override { - _senderDown.send(msg); - } - }; - MessageSenderImpl _messageSender; - uint32_t _num_distributor_stripes; - - void enableDistributorClusterState(vespalib::stringref state); - void enable_distributor_cluster_state(const lib::ClusterStateBundle& state); -}; - -} - -} diff --git a/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp 
b/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp deleted file mode 100644 index 5fa3ae5840b..00000000000 --- a/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp +++ /dev/null @@ -1,2893 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include <vespa/storageapi/message/persistence.h> -#include <vespa/storage/distributor/top_level_bucket_db_updater.h> -#include <vespa/storage/distributor/bucket_space_distribution_context.h> -#include <vespa/storage/distributor/distributormetricsset.h> -#include <vespa/storage/distributor/pending_bucket_space_db_transition.h> -#include <vespa/storage/distributor/outdated_nodes_map.h> -#include <vespa/storage/storageutil/distributorstatecache.h> -#include <tests/distributor/distributortestutil.h> -#include <vespa/document/test/make_document_bucket.h> -#include <vespa/document/test/make_bucket_space.h> -#include <vespa/document/bucket/fixed_bucket_spaces.h> -#include <vespa/storage/distributor/simpleclusterinformation.h> -#include <vespa/storage/distributor/top_level_distributor.h> -#include <vespa/storage/distributor/distributor_stripe.h> -#include <vespa/storage/distributor/distributor_bucket_space.h> -#include <vespa/vespalib/gtest/gtest.h> -#include <vespa/vespalib/text/stringtokenizer.h> -#include <vespa/vespalib/util/benchmark_timer.h> -#include <sstream> -#include <iomanip> - -using namespace storage::api; -using namespace storage::lib; -using document::test::makeDocumentBucket; -using document::test::makeBucketSpace; -using document::BucketSpace; -using document::FixedBucketSpaces; -using document::BucketId; -using document::Bucket; - -using namespace ::testing; - -namespace storage::distributor { - -namespace { - -std::string -getStringList(std::string s, uint32_t count) -{ - std::ostringstream ost; - for (uint32_t i = 0; i < count; ++i) { - if (i > 0) { - ost << ","; - } - ost << s; - } - return ost.str(); -} - 
-std::string -getRequestBucketInfoStrings(uint32_t count) -{ - return getStringList("Request bucket info", count); -} - -} - -// TODO STRIPE: Remove this test when legacy mode is gone. -class LegacyBucketDBUpdaterTest : public Test, - public DistributorTestUtil -{ -public: - LegacyBucketDBUpdaterTest(); - ~LegacyBucketDBUpdaterTest() override; - - auto &defaultDistributorBucketSpace() { return getBucketSpaceRepo().get(makeBucketSpace()); } - - bool bucketExistsThatHasNode(int bucketCount, uint16_t node) const; - - ClusterInformation::CSP createClusterInfo(const std::string& clusterStateString) { - lib::ClusterState baselineClusterState(clusterStateString); - lib::ClusterStateBundle clusterStateBundle(baselineClusterState); - ClusterInformation::CSP clusterInfo( - new SimpleClusterInformation( - getBucketDBUpdater().node_context().node_index(), - clusterStateBundle, - "ui")); - for (auto* repo : {&mutable_repo(), &read_only_repo()}) { - for (auto& space : *repo) { - space.second->setClusterState(clusterStateBundle.getDerivedClusterState(space.first)); - } - } - return clusterInfo; - } - - DistributorBucketSpaceRepo& mutable_repo() noexcept { return getBucketSpaceRepo(); } - // Note: not calling this "immutable_repo" since it may actually be modified by the pending - // cluster state component (just not by operations), so it would not have the expected semantics. 
- DistributorBucketSpaceRepo& read_only_repo() noexcept { return getReadOnlyBucketSpaceRepo(); } - - BucketDatabase& mutable_default_db() noexcept { - return mutable_repo().get(FixedBucketSpaces::default_space()).getBucketDatabase(); - } - BucketDatabase& mutable_global_db() noexcept { - return mutable_repo().get(FixedBucketSpaces::global_space()).getBucketDatabase(); - } - BucketDatabase& read_only_default_db() noexcept { - return read_only_repo().get(FixedBucketSpaces::default_space()).getBucketDatabase(); - } - BucketDatabase& read_only_global_db() noexcept { - return read_only_repo().get(FixedBucketSpaces::global_space()).getBucketDatabase(); - } - - static std::string getNodeList(std::vector<uint16_t> nodes, size_t count); - - std::string getNodeList(std::vector<uint16_t> nodes); - - std::vector<uint16_t> - expandNodeVec(const std::vector<uint16_t> &nodes); - - std::vector<document::BucketSpace> _bucketSpaces; - - size_t messageCount(size_t messagesPerBucketSpace) const { - return messagesPerBucketSpace * _bucketSpaces.size(); - } - - void trigger_completed_but_not_yet_activated_transition( - vespalib::stringref initial_state, uint32_t initial_buckets, uint32_t initial_expected_msgs, - vespalib::stringref pending_state, uint32_t pending_buckets, uint32_t pending_expected_msgs); - -public: - using OutdatedNodesMap = dbtransition::OutdatedNodesMap; - void SetUp() override { - createLinks(); - _bucketSpaces = getBucketSpaces(); - // Disable deferred activation by default (at least for now) to avoid breaking the entire world. 
- getBucketDBUpdater().set_stale_reads_enabled(false); - }; - - void TearDown() override { - close(); - } - - std::shared_ptr<RequestBucketInfoReply> getFakeBucketReply( - const lib::ClusterState& state, - const RequestBucketInfoCommand& cmd, - int storageIndex, - uint32_t bucketCount, - uint32_t invalidBucketCount = 0) - { - auto sreply = std::make_shared<RequestBucketInfoReply>(cmd); - sreply->setAddress(storageAddress(storageIndex)); - - api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo(); - - for (uint32_t i=0; i<bucketCount + invalidBucketCount; i++) { - if (!getDistributorBucketSpace().owns_bucket_in_state(state, document::BucketId(16, i))) { - continue; - } - - std::vector<uint16_t> nodes; - defaultDistributorBucketSpace().getDistribution().getIdealNodes( - lib::NodeType::STORAGE, - state, - document::BucketId(16, i), - nodes); - - for (uint32_t j=0; j<nodes.size(); j++) { - if (nodes[j] == storageIndex) { - if (i >= bucketCount) { - vec.push_back(api::RequestBucketInfoReply::Entry( - document::BucketId(16, i), - api::BucketInfo())); - } else { - vec.push_back(api::RequestBucketInfoReply::Entry( - document::BucketId(16, i), - api::BucketInfo(10,1,1))); - } - } - } - } - - return sreply; - } - - void fakeBucketReply(const lib::ClusterState &state, - const api::StorageCommand &cmd, - uint32_t bucketCount, - uint32_t invalidBucketCount = 0) - { - ASSERT_EQ(cmd.getType(), MessageType::REQUESTBUCKETINFO); - const api::StorageMessageAddress &address(*cmd.getAddress()); - getBucketDBUpdater().onRequestBucketInfoReply( - getFakeBucketReply(state, - dynamic_cast<const RequestBucketInfoCommand &>(cmd), - address.getIndex(), - bucketCount, - invalidBucketCount)); - } - - void sendFakeReplyForSingleBucketRequest( - const api::RequestBucketInfoCommand& rbi) - { - ASSERT_EQ(size_t(1), rbi.getBuckets().size()); - const document::BucketId& bucket(rbi.getBuckets()[0]); - - std::shared_ptr<api::RequestBucketInfoReply> reply( - new 
api::RequestBucketInfoReply(rbi)); - reply->getBucketInfo().push_back( - api::RequestBucketInfoReply::Entry(bucket, - api::BucketInfo(20, 10, 12, 50, 60, true, true))); - getBucketDBUpdater().onRequestBucketInfoReply(reply); - } - - std::string verifyBucket(document::BucketId id, const lib::ClusterState& state) { - BucketDatabase::Entry entry = getBucketDatabase().get(id); - if (!entry.valid()) { - return vespalib::make_string("%s doesn't exist in DB", - id.toString().c_str()); - } - - std::vector<uint16_t> nodes; - defaultDistributorBucketSpace().getDistribution().getIdealNodes( - lib::NodeType::STORAGE, - state, - document::BucketId(id), - nodes); - - if (nodes.size() != entry->getNodeCount()) { - return vespalib::make_string("Bucket Id %s has %d nodes in " - "ideal state, but has only %d in DB", - id.toString().c_str(), - (int)nodes.size(), - (int)entry->getNodeCount()); - } - - for (uint32_t i = 0; i<nodes.size(); i++) { - bool found = false; - - for (uint32_t j = 0; j<entry->getNodeCount(); j++) { - if (nodes[i] == entry->getNodeRef(j).getNode()) { - found = true; - } - } - - if (!found) { - return vespalib::make_string( - "Bucket Id %s has no copy from node %d", - id.toString().c_str(), - nodes[i]); - } - } - - return ""; - } - - - void verifyInvalid(document::BucketId id, int storageNode) { - BucketDatabase::Entry entry = getBucketDatabase().get(id); - - ASSERT_TRUE(entry.valid()); - - bool found = false; - for (uint32_t j = 0; j<entry->getNodeCount(); j++) { - if (entry->getNodeRef(j).getNode() == storageNode) { - ASSERT_FALSE(entry->getNodeRef(j).valid()); - found = true; - } - } - - ASSERT_TRUE(found); - } - - struct OrderByIncreasingNodeIndex { - template <typename T> - bool operator()(const T& lhs, const T& rhs) { - return (lhs->getAddress()->getIndex() - < rhs->getAddress()->getIndex()); - } - }; - - void sortSentMessagesByIndex(DistributorMessageSenderStub& sender, - size_t sortFromOffset = 0) - { - std::sort(sender.commands().begin() + 
sortFromOffset, - sender.commands().end(), - OrderByIncreasingNodeIndex()); - } - - void setSystemState(const lib::ClusterState& state) { - const size_t sizeBeforeState = _sender.commands().size(); - getBucketDBUpdater().onSetSystemState( - std::make_shared<api::SetSystemStateCommand>(state)); - // A lot of test logic has the assumption that all messages sent as a - // result of cluster state changes will be in increasing index order - // (for simplicity, not because this is required for correctness). - // Only sort the messages that arrived as a result of the state, don't - // jumble the sorting with any existing messages. - sortSentMessagesByIndex(_sender, sizeBeforeState); - } - - void set_cluster_state_bundle(const lib::ClusterStateBundle& state) { - const size_t sizeBeforeState = _sender.commands().size(); - getBucketDBUpdater().onSetSystemState( - std::make_shared<api::SetSystemStateCommand>(state)); - sortSentMessagesByIndex(_sender, sizeBeforeState); - } - - bool activate_cluster_state_version(uint32_t version) { - return getBucketDBUpdater().onActivateClusterStateVersion( - std::make_shared<api::ActivateClusterStateVersionCommand>(version)); - } - - void assert_has_activate_cluster_state_reply_with_actual_version(uint32_t version) { - ASSERT_EQ(size_t(1), _sender.replies().size()); - auto* response = dynamic_cast<api::ActivateClusterStateVersionReply*>(_sender.replies().back().get()); - ASSERT_TRUE(response != nullptr); - ASSERT_EQ(version, response->actualVersion()); - _sender.clear(); - } - - void completeBucketInfoGathering(const lib::ClusterState& state, - size_t expectedMsgs, - uint32_t bucketCount = 1, - uint32_t invalidBucketCount = 0) - { - ASSERT_EQ(expectedMsgs, _sender.commands().size()); - - for (uint32_t i = 0; i < _sender.commands().size(); i++) { - ASSERT_NO_FATAL_FAILURE(fakeBucketReply(state, *_sender.command(i), - bucketCount, invalidBucketCount)); - } - } - - void setAndEnableClusterState(const lib::ClusterState& state, - uint32_t 
expectedMsgs, - uint32_t nBuckets) - { - _sender.clear(); - setSystemState(state); - ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(state, expectedMsgs, nBuckets)); - } - - void completeStateTransitionInSeconds(const std::string& stateStr, - uint32_t seconds, - uint32_t expectedMsgs) - { - _sender.clear(); - lib::ClusterState state(stateStr); - setSystemState(state); - getClock().addSecondsToTime(seconds); - ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(state, expectedMsgs)); - } - - uint64_t lastTransitionTimeInMillis() { - return uint64_t(getDistributor().getMetrics().stateTransitionTime.getLast()); - } - - void setStorageNodes(uint32_t numStorageNodes) { - _sender.clear(); - - lib::ClusterState newState( - vespalib::make_string("distributor:1 storage:%d", numStorageNodes)); - - setSystemState(newState); - - for (uint32_t i=0; i< messageCount(numStorageNodes); i++) { - ASSERT_EQ(_sender.command(i)->getType(), MessageType::REQUESTBUCKETINFO); - - const api::StorageMessageAddress *address = _sender.command(i)->getAddress(); - ASSERT_EQ((uint32_t)(i / _bucketSpaces.size()), (uint32_t)address->getIndex()); - } - } - - void initializeNodesAndBuckets(uint32_t numStorageNodes, - uint32_t numBuckets) - { - ASSERT_NO_FATAL_FAILURE(setStorageNodes(numStorageNodes)); - - vespalib::string state(vespalib::make_string( - "distributor:1 storage:%d", numStorageNodes)); - lib::ClusterState newState(state); - - for (uint32_t i=0; i< messageCount(numStorageNodes); i++) { - ASSERT_NO_FATAL_FAILURE(fakeBucketReply(newState, *_sender.command(i), numBuckets)); - } - ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(numBuckets, state)); - } - - bool bucketHasNode(document::BucketId id, uint16_t node) const { - BucketDatabase::Entry entry = getBucket(id); - assert(entry.valid()); - - for (uint32_t j=0; j<entry->getNodeCount(); j++) { - if (entry->getNodeRef(j).getNode() == node) { - return true; - } - } - - return false; - } - - api::StorageMessageAddress storageAddress(uint16_t 
node) { - static vespalib::string _storage("storage"); - return api::StorageMessageAddress(&_storage, lib::NodeType::STORAGE, node); - } - - std::string getSentNodes(const std::string& oldClusterState, - const std::string& newClusterState); - - std::string getSentNodesDistributionChanged( - const std::string& oldClusterState); - - std::vector<uint16_t> getSentNodesWithPreemption( - const std::string& oldClusterState, - uint32_t expectedOldStateMessages, - const std::string& preemptedClusterState, - const std::string& newClusterState); - - std::vector<uint16_t> getSendSet() const; - - std::string mergeBucketLists( - const lib::ClusterState& oldState, - const std::string& existingData, - const lib::ClusterState& newState, - const std::string& newData, - bool includeBucketInfo = false); - - std::string mergeBucketLists( - const std::string& existingData, - const std::string& newData, - bool includeBucketInfo = false); - - void assertCorrectBuckets(int numBuckets, const std::string& stateStr) { - lib::ClusterState state(stateStr); - for (int i=0; i<numBuckets; i++) { - ASSERT_EQ(getIdealStr(document::BucketId(16, i), state), - getNodes(document::BucketId(16, i))); - } - } - - void setDistribution(const std::string& distConfig) { - triggerDistributionChange( - std::make_shared<lib::Distribution>(distConfig)); - } - - std::string getDistConfig6Nodes2Groups() const { - return ("redundancy 2\n" - "group[3]\n" - "group[0].name \"invalid\"\n" - "group[0].index \"invalid\"\n" - "group[0].partitions 1|*\n" - "group[0].nodes[0]\n" - "group[1].name rack0\n" - "group[1].index 0\n" - "group[1].nodes[3]\n" - "group[1].nodes[0].index 0\n" - "group[1].nodes[1].index 1\n" - "group[1].nodes[2].index 2\n" - "group[2].name rack1\n" - "group[2].index 1\n" - "group[2].nodes[3]\n" - "group[2].nodes[0].index 3\n" - "group[2].nodes[1].index 4\n" - "group[2].nodes[2].index 5\n"); - } - - std::string getDistConfig6Nodes4Groups() const { - return ("redundancy 2\n" - "group[4]\n" - "group[0].name 
\"invalid\"\n" - "group[0].index \"invalid\"\n" - "group[0].partitions 1|*\n" - "group[0].nodes[0]\n" - "group[1].name rack0\n" - "group[1].index 0\n" - "group[1].nodes[2]\n" - "group[1].nodes[0].index 0\n" - "group[1].nodes[1].index 1\n" - "group[2].name rack1\n" - "group[2].index 1\n" - "group[2].nodes[2]\n" - "group[2].nodes[0].index 2\n" - "group[2].nodes[1].index 3\n" - "group[3].name rack2\n" - "group[3].index 2\n" - "group[3].nodes[2]\n" - "group[3].nodes[0].index 4\n" - "group[3].nodes[1].index 5\n"); - } - - std::string getDistConfig3Nodes1Group() const { - return ("redundancy 2\n" - "group[2]\n" - "group[0].name \"invalid\"\n" - "group[0].index \"invalid\"\n" - "group[0].partitions 1|*\n" - "group[0].nodes[0]\n" - "group[1].name rack0\n" - "group[1].index 0\n" - "group[1].nodes[3]\n" - "group[1].nodes[0].index 0\n" - "group[1].nodes[1].index 1\n" - "group[1].nodes[2].index 2\n"); - } - - struct PendingClusterStateFixture { - DistributorMessageSenderStub sender; - framework::defaultimplementation::FakeClock clock; - std::unique_ptr<PendingClusterState> state; - - PendingClusterStateFixture( - LegacyBucketDBUpdaterTest& owner, - const std::string& oldClusterState, - const std::string& newClusterState) - { - std::shared_ptr<api::SetSystemStateCommand> cmd( - new api::SetSystemStateCommand( - lib::ClusterState(newClusterState))); - - ClusterInformation::CSP clusterInfo( - owner.createClusterInfo(oldClusterState)); - - OutdatedNodesMap outdatedNodesMap; - state = PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, - owner.getBucketSpaceRepo(), - cmd, outdatedNodesMap, api::Timestamp(1)); - } - - PendingClusterStateFixture( - LegacyBucketDBUpdaterTest& owner, - const std::string& oldClusterState) - { - ClusterInformation::CSP clusterInfo( - owner.createClusterInfo(oldClusterState)); - - state = PendingClusterState::createForDistributionChange( - clock, clusterInfo, sender, owner.getBucketSpaceRepo(), api::Timestamp(1)); - } - }; - - 
std::unique_ptr<PendingClusterStateFixture> createPendingStateFixtureForStateChange( - const std::string& oldClusterState, - const std::string& newClusterState) - { - return std::make_unique<PendingClusterStateFixture>(*this, oldClusterState, newClusterState); - } - - std::unique_ptr<PendingClusterStateFixture> createPendingStateFixtureForDistributionChange( - const std::string& oldClusterState) - { - return std::make_unique<PendingClusterStateFixture>(*this, oldClusterState); - } - - uint32_t populate_bucket_db_via_request_bucket_info_for_benchmarking(); - - void complete_recovery_mode() { - _distributor->scanAllBuckets(); - } -}; - -LegacyBucketDBUpdaterTest::LegacyBucketDBUpdaterTest() - : DistributorTestUtil(), - _bucketSpaces() -{ -} - -LegacyBucketDBUpdaterTest::~LegacyBucketDBUpdaterTest() = default; - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, normal_usage) { - setSystemState(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3")); - - ASSERT_EQ(messageCount(3), _sender.commands().size()); - - // Ensure distribution hash is set correctly - ASSERT_EQ( - defaultDistributorBucketSpace().getDistribution() - .getNodeGraph().getDistributionConfigHash(), - dynamic_cast<const RequestBucketInfoCommand&>( - *_sender.command(0)).getDistributionHash()); - - ASSERT_NO_FATAL_FAILURE(fakeBucketReply(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), - *_sender.command(0), 10)); - - _sender.clear(); - - // Optimization for not refetching unneeded data after cluster state - // change is only implemented after completion of previous cluster state - setSystemState(lib::ClusterState("distributor:2 .0.s:i storage:3")); - - ASSERT_EQ(messageCount(3), _sender.commands().size()); - // Expect reply of first set SystemState request. 
- ASSERT_EQ(size_t(1), _sender.replies().size()); - - ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering( - lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), - messageCount(3), 10)); - ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(10, "distributor:2 storage:3")); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, distributor_change) { - int numBuckets = 100; - - // First sends request - setSystemState(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3")); - ASSERT_EQ(messageCount(3), _sender.commands().size()); - ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), - messageCount(3), numBuckets)); - _sender.clear(); - - // No change from initializing to up (when done with last job) - setSystemState(lib::ClusterState("distributor:2 storage:3")); - ASSERT_EQ(size_t(0), _sender.commands().size()); - _sender.clear(); - - // Adding node. No new read requests, but buckets thrown - setSystemState(lib::ClusterState("distributor:3 storage:3")); - ASSERT_EQ(size_t(0), _sender.commands().size()); - ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(numBuckets, "distributor:3 storage:3")); - _sender.clear(); - - // Removing distributor. Need to refetch new data from all nodes. 
- setSystemState(lib::ClusterState("distributor:2 storage:3")); - ASSERT_EQ(messageCount(3), _sender.commands().size()); - ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(lib::ClusterState("distributor:2 storage:3"), - messageCount(3), numBuckets)); - _sender.clear(); - ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(numBuckets, "distributor:2 storage:3")); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, distributor_change_with_grouping) { - std::string distConfig(getDistConfig6Nodes2Groups()); - setDistribution(distConfig); - int numBuckets = 100; - - setSystemState(lib::ClusterState("distributor:6 storage:6")); - ASSERT_EQ(messageCount(6), _sender.commands().size()); - ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(lib::ClusterState("distributor:6 storage:6"), - messageCount(6), numBuckets)); - _sender.clear(); - - // Distributor going down in other group, no change - setSystemState(lib::ClusterState("distributor:6 .5.s:d storage:6")); - ASSERT_EQ(size_t(0), _sender.commands().size()); - _sender.clear(); - - setSystemState(lib::ClusterState("distributor:6 storage:6")); - ASSERT_EQ(size_t(0), _sender.commands().size()); - ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(numBuckets, "distributor:6 storage:6")); - _sender.clear(); - - // Unchanged grouping cause no change. - setDistribution(distConfig); - ASSERT_EQ(size_t(0), _sender.commands().size()); - - // Changed grouping cause change - setDistribution(getDistConfig6Nodes4Groups()); - - ASSERT_EQ(messageCount(6), _sender.commands().size()); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, normal_usage_initializing) { - setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1 .0.s:i")); - - ASSERT_EQ(_bucketSpaces.size(), _sender.commands().size()); - - // Not yet passing on system state. 
- ASSERT_EQ(size_t(0), _senderDown.commands().size()); - - ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(lib::ClusterState("distributor:1 .0.s:i storage:1"), - _bucketSpaces.size(), 10, 10)); - - ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(10, "distributor:1 storage:1")); - - for (int i=10; i<20; i++) { - ASSERT_NO_FATAL_FAILURE(verifyInvalid(document::BucketId(16, i), 0)); - } - - // Pass on cluster state and recheck buckets now. - ASSERT_EQ(size_t(1), _senderDown.commands().size()); - - _sender.clear(); - _senderDown.clear(); - - setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1")); - - // Send a new request bucket info up. - ASSERT_EQ(_bucketSpaces.size(), _sender.commands().size()); - - ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(lib::ClusterState("distributor:1 .0.s:i storage:1"), - _bucketSpaces.size(), 20)); - - // Pass on cluster state and recheck buckets now. - ASSERT_EQ(size_t(1), _senderDown.commands().size()); - - ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(20, "distributor:1 storage:1")); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, failed_request_bucket_info) { - setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1")); - - // 2 messages sent up: 1 to the nodes, and one reply to the setsystemstate. - ASSERT_EQ(_bucketSpaces.size(), _sender.commands().size()); - - { - for (uint32_t i = 0; i < _bucketSpaces.size(); ++i) { - std::shared_ptr<api::RequestBucketInfoReply> reply = - getFakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"), - *((RequestBucketInfoCommand*)_sender.command(i).get()), - 0, - 10); - reply->setResult(api::ReturnCode::NOT_CONNECTED); - getBucketDBUpdater().onRequestBucketInfoReply(reply); - } - - // Trigger that delayed message is sent - getClock().addSecondsToTime(10); - getBucketDBUpdater().resendDelayedMessages(); - } - - // Should be resent. 
- ASSERT_EQ(getRequestBucketInfoStrings(messageCount(2)), _sender.getCommands()); - - ASSERT_EQ(size_t(0), _senderDown.commands().size()); - - for (uint32_t i = 0; i < _bucketSpaces.size(); ++i) { - ASSERT_NO_FATAL_FAILURE(fakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"), - *_sender.command(_bucketSpaces.size() + i), 10)); - } - - for (int i=0; i<10; i++) { - EXPECT_EQ(std::string(""), - verifyBucket(document::BucketId(16, i), - lib::ClusterState("distributor:1 storage:1"))); - } - - // Set system state should now be passed on - EXPECT_EQ(std::string("Set system state"), _senderDown.getCommands()); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, down_while_init) { - ASSERT_NO_FATAL_FAILURE(setStorageNodes(3)); - - ASSERT_NO_FATAL_FAILURE(fakeBucketReply(lib::ClusterState("distributor:1 storage:3"), - *_sender.command(0), 5)); - - setSystemState(lib::ClusterState("distributor:1 storage:3 .1.s:d")); - - ASSERT_NO_FATAL_FAILURE(fakeBucketReply(lib::ClusterState("distributor:1 storage:3"), - *_sender.command(2), 5)); - - ASSERT_NO_FATAL_FAILURE(fakeBucketReply(lib::ClusterState("distributor:1 storage:3"), - *_sender.command(1), 5)); -} - -bool -LegacyBucketDBUpdaterTest::bucketExistsThatHasNode(int bucketCount, uint16_t node) const -{ - for (int i=1; i<bucketCount; i++) { - if (bucketHasNode(document::BucketId(16, i), node)) { - return true; - } - } - - return false; -} - -std::string -LegacyBucketDBUpdaterTest::getNodeList(std::vector<uint16_t> nodes, size_t count) -{ - std::ostringstream ost; - bool first = true; - for (const auto &node : nodes) { - for (uint32_t i = 0; i < count; ++i) { - if (!first) { - ost << ","; - } - ost << node; - first = false; - } - } - return ost.str(); -} - -std::string -LegacyBucketDBUpdaterTest::getNodeList(std::vector<uint16_t> nodes) -{ - return getNodeList(std::move(nodes), _bucketSpaces.size()); -} - -std::vector<uint16_t> 
-LegacyBucketDBUpdaterTest::expandNodeVec(const std::vector<uint16_t> &nodes) -{ - std::vector<uint16_t> res; - size_t count = _bucketSpaces.size(); - for (const auto &node : nodes) { - for (uint32_t i = 0; i < count; ++i) { - res.push_back(node); - } - } - return res; -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, node_down) { - ASSERT_NO_FATAL_FAILURE(setStorageNodes(3)); - enableDistributorClusterState("distributor:1 storage:3"); - - for (int i=1; i<100; i++) { - addIdealNodes(document::BucketId(16, i)); - } - - EXPECT_TRUE(bucketExistsThatHasNode(100, 1)); - - setSystemState(lib::ClusterState("distributor:1 storage:3 .1.s:d")); - - EXPECT_FALSE(bucketExistsThatHasNode(100, 1)); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, storage_node_in_maintenance_clears_buckets_for_node) { - ASSERT_NO_FATAL_FAILURE(setStorageNodes(3)); - enableDistributorClusterState("distributor:1 storage:3"); - - for (int i=1; i<100; i++) { - addIdealNodes(document::BucketId(16, i)); - } - - EXPECT_TRUE(bucketExistsThatHasNode(100, 1)); - - setSystemState(lib::ClusterState("distributor:1 storage:3 .1.s:m")); - - EXPECT_FALSE(bucketExistsThatHasNode(100, 1)); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, node_down_copies_get_in_sync) { - ASSERT_NO_FATAL_FAILURE(setStorageNodes(3)); - - lib::ClusterState systemState("distributor:1 storage:3"); - document::BucketId bid(16, 1); - - addNodesToBucketDB(bid, "0=3,1=2,2=3"); - - setSystemState(lib::ClusterState("distributor:1 storage:3 .1.s:d")); - - EXPECT_EQ( - std::string("BucketId(0x4000000000000001) : " - "node(idx=0,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false,ready=false), " - "node(idx=2,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false,ready=false)"), - dumpBucket(bid)); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, 
initializing_while_recheck) { - lib::ClusterState systemState("distributor:1 storage:2 .0.s:i .0.i:0.1"); - setSystemState(systemState); - - ASSERT_EQ(messageCount(2), _sender.commands().size()); - ASSERT_EQ(size_t(0), _senderDown.commands().size()); - - getBucketDBUpdater().recheckBucketInfo(1, makeDocumentBucket(document::BucketId(16, 3))); - - for (uint32_t i = 0; i < messageCount(2); ++i) { - ASSERT_NO_FATAL_FAILURE(fakeBucketReply(systemState, *_sender.command(i), 100)); - } - - // Now we can pass on system state. - ASSERT_EQ(size_t(1), _senderDown.commands().size()); - EXPECT_EQ(MessageType::SETSYSTEMSTATE, _senderDown.command(0)->getType()); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, bit_change) { - std::vector<document::BucketId> bucketlist; - - { - setSystemState(lib::ClusterState("bits:14 storage:1 distributor:2")); - - ASSERT_EQ(_bucketSpaces.size(), _sender.commands().size()); - - for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) { - ASSERT_EQ(_sender.command(bsi)->getType(), MessageType::REQUESTBUCKETINFO); - const auto &req = dynamic_cast<const RequestBucketInfoCommand &>(*_sender.command(bsi)); - auto sreply = std::make_shared<RequestBucketInfoReply>(req); - sreply->setAddress(storageAddress(0)); - auto& vec = sreply->getBucketInfo(); - if (req.getBucketSpace() == FixedBucketSpaces::default_space()) { - int cnt=0; - for (int i=0; cnt < 2; i++) { - lib::Distribution distribution = defaultDistributorBucketSpace().getDistribution(); - std::vector<uint16_t> distributors; - if (distribution.getIdealDistributorNode( - lib::ClusterState("bits:14 storage:1 distributor:2"), - document::BucketId(16, i)) - == 0) - { - vec.push_back(api::RequestBucketInfoReply::Entry( - document::BucketId(16, i), - api::BucketInfo(10,1,1))); - - bucketlist.push_back(document::BucketId(16, i)); - cnt++; - } - } - } - - getBucketDBUpdater().onRequestBucketInfoReply(sreply); - } - } - - 
EXPECT_EQ(std::string("BucketId(0x4000000000000001) : " - "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), - dumpBucket(bucketlist[0])); - EXPECT_EQ(std::string("BucketId(0x4000000000000002) : " - "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), - dumpBucket(bucketlist[1])); - - { - _sender.clear(); - setSystemState(lib::ClusterState("bits:16 storage:1 distributor:2")); - - ASSERT_EQ(_bucketSpaces.size(), _sender.commands().size()); - for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) { - - ASSERT_EQ(_sender.command(bsi)->getType(), MessageType::REQUESTBUCKETINFO); - const auto &req = dynamic_cast<const RequestBucketInfoCommand &>(*_sender.command(bsi)); - auto sreply = std::make_shared<RequestBucketInfoReply>(req); - sreply->setAddress(storageAddress(0)); - sreply->setResult(api::ReturnCode::OK); - if (req.getBucketSpace() == FixedBucketSpaces::default_space()) { - api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo(); - - for (uint32_t i = 0; i < 3; ++i) { - vec.push_back(api::RequestBucketInfoReply::Entry( - document::BucketId(16, i), - api::BucketInfo(10,1,1))); - } - - vec.push_back(api::RequestBucketInfoReply::Entry( - document::BucketId(16, 4), - api::BucketInfo(10,1,1))); - } - - getBucketDBUpdater().onRequestBucketInfoReply(sreply); - } - } - - EXPECT_EQ(std::string("BucketId(0x4000000000000000) : " - "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), - dumpBucket(document::BucketId(16, 0))); - EXPECT_EQ(std::string("BucketId(0x4000000000000001) : " - "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), - dumpBucket(document::BucketId(16, 1))); - EXPECT_EQ(std::string("BucketId(0x4000000000000002) : " - "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), - dumpBucket(document::BucketId(16, 2))); - EXPECT_EQ(std::string("BucketId(0x4000000000000004) : " - 
"node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), - dumpBucket(document::BucketId(16, 4))); - - { - _sender.clear(); - setSystemState(lib::ClusterState("storage:1 distributor:2 .1.s:i")); - } - - { - _sender.clear(); - setSystemState(lib::ClusterState("storage:1 distributor:2")); - } -}; - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, recheck_node_with_failure) { - ASSERT_NO_FATAL_FAILURE(initializeNodesAndBuckets(3, 5)); - - _sender.clear(); - - getBucketDBUpdater().recheckBucketInfo(1, makeDocumentBucket(document::BucketId(16, 3))); - - ASSERT_EQ(size_t(1), _sender.commands().size()); - - uint16_t index = 0; - { - auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.command(0)); - ASSERT_EQ(size_t(1), rbi.getBuckets().size()); - EXPECT_EQ(document::BucketId(16, 3), rbi.getBuckets()[0]); - auto reply = std::make_shared<api::RequestBucketInfoReply>(rbi); - const api::StorageMessageAddress *address = _sender.command(0)->getAddress(); - index = address->getIndex(); - reply->setResult(api::ReturnCode::NOT_CONNECTED); - getBucketDBUpdater().onRequestBucketInfoReply(reply); - // Trigger that delayed message is sent - getClock().addSecondsToTime(10); - getBucketDBUpdater().resendDelayedMessages(); - } - - ASSERT_EQ(size_t(2), _sender.commands().size()); - - setSystemState( - lib::ClusterState(vespalib::make_string("distributor:1 storage:3 .%d.s:d", index))); - - // Recheck bucket. - { - auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.command(1)); - ASSERT_EQ(size_t(1), rbi.getBuckets().size()); - EXPECT_EQ(document::BucketId(16, 3), rbi.getBuckets()[0]); - auto reply = std::make_shared<api::RequestBucketInfoReply>(rbi); - reply->setResult(api::ReturnCode::NOT_CONNECTED); - getBucketDBUpdater().onRequestBucketInfoReply(reply); - } - - // Should not retry since node is down. 
- EXPECT_EQ(size_t(2), _sender.commands().size()); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, recheck_node) { - ASSERT_NO_FATAL_FAILURE(initializeNodesAndBuckets(3, 5)); - - _sender.clear(); - - getBucketDBUpdater().recheckBucketInfo(1, makeDocumentBucket(document::BucketId(16, 3))); - - ASSERT_EQ(size_t(1), _sender.commands().size()); - - auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.command(0)); - ASSERT_EQ(size_t(1), rbi.getBuckets().size()); - EXPECT_EQ(document::BucketId(16, 3), rbi.getBuckets()[0]); - - auto reply = std::make_shared<api::RequestBucketInfoReply>(rbi); - reply->getBucketInfo().push_back( - api::RequestBucketInfoReply::Entry(document::BucketId(16, 3), - api::BucketInfo(20, 10, 12, 50, 60, true, true))); - getBucketDBUpdater().onRequestBucketInfoReply(reply); - - lib::ClusterState state("distributor:1 storage:3"); - for (uint32_t i = 0; i < 3; i++) { - EXPECT_EQ(getIdealStr(document::BucketId(16, i), state), - getNodes(document::BucketId(16, i))); - } - - for (uint32_t i = 4; i < 5; i++) { - EXPECT_EQ(getIdealStr(document::BucketId(16, i), state), - getNodes(document::BucketId(16, i))); - } - - BucketDatabase::Entry entry = getBucketDatabase().get(document::BucketId(16, 3)); - ASSERT_TRUE(entry.valid()); - - const BucketCopy* copy = entry->getNode(1); - ASSERT_TRUE(copy != nullptr); - EXPECT_EQ(api::BucketInfo(20,10,12, 50, 60, true, true), copy->getBucketInfo()); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, notify_bucket_change) { - enableDistributorClusterState("distributor:1 storage:1"); - - addNodesToBucketDB(document::BucketId(16, 1), "0=1234"); - _sender.replies().clear(); - - { - api::BucketInfo info(1, 2, 3, 4, 5, true, true); - auto cmd(std::make_shared<api::NotifyBucketChangeCommand>( - makeDocumentBucket(document::BucketId(16, 1)), info)); - cmd->setSourceIndex(0); - getBucketDBUpdater().onNotifyBucketChange(cmd); - } 
- - { - api::BucketInfo info(10, 11, 12, 13, 14, false, false); - auto cmd(std::make_shared<api::NotifyBucketChangeCommand>( - makeDocumentBucket(document::BucketId(16, 2)), info)); - cmd->setSourceIndex(0); - getBucketDBUpdater().onNotifyBucketChange(cmd); - } - - // Must receive reply - ASSERT_EQ(size_t(2), _sender.replies().size()); - - for (int i = 0; i < 2; ++i) { - ASSERT_EQ(MessageType::NOTIFYBUCKETCHANGE_REPLY, - _sender.reply(i)->getType()); - } - - // No database update until request bucket info replies have been received. - EXPECT_EQ(std::string("BucketId(0x4000000000000001) : " - "node(idx=0,crc=0x4d2,docs=1234/1234,bytes=1234/1234," - "trusted=false,active=false,ready=false)"), - dumpBucket(document::BucketId(16, 1))); - EXPECT_EQ(std::string("NONEXISTING"), dumpBucket(document::BucketId(16, 2))); - - ASSERT_EQ(size_t(2), _sender.commands().size()); - - std::vector<api::BucketInfo> infos; - infos.push_back(api::BucketInfo(4567, 200, 2000, 400, 4000, true, true)); - infos.push_back(api::BucketInfo(8999, 300, 3000, 500, 5000, false, false)); - - for (int i = 0; i < 2; ++i) { - auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.command(i)); - ASSERT_EQ(size_t(1), rbi.getBuckets().size()); - EXPECT_EQ(document::BucketId(16, i + 1), rbi.getBuckets()[0]); - - auto reply = std::make_shared<api::RequestBucketInfoReply>(rbi); - reply->getBucketInfo().push_back( - api::RequestBucketInfoReply::Entry(document::BucketId(16, i + 1), - infos[i])); - getBucketDBUpdater().onRequestBucketInfoReply(reply); - } - - EXPECT_EQ(std::string("BucketId(0x4000000000000001) : " - "node(idx=0,crc=0x11d7,docs=200/400,bytes=2000/4000,trusted=true,active=true,ready=true)"), - dumpBucket(document::BucketId(16, 1))); - EXPECT_EQ(std::string("BucketId(0x4000000000000002) : " - "node(idx=0,crc=0x2327,docs=300/500,bytes=3000/5000,trusted=true,active=false,ready=false)"), - dumpBucket(document::BucketId(16, 2))); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest 
-TEST_F(LegacyBucketDBUpdaterTest, notify_bucket_change_from_node_down) { - enableDistributorClusterState("distributor:1 storage:2"); - - addNodesToBucketDB(document::BucketId(16, 1), "1=1234"); - - _sender.replies().clear(); - - { - api::BucketInfo info(8999, 300, 3000, 500, 5000, false, false); - auto cmd(std::make_shared<api::NotifyBucketChangeCommand>( - makeDocumentBucket(document::BucketId(16, 1)), info)); - cmd->setSourceIndex(0); - getBucketDBUpdater().onNotifyBucketChange(cmd); - } - // Enable here to avoid having request bucket info be silently swallowed - // (send_request_bucket_info drops message if node is down). - enableDistributorClusterState("distributor:1 storage:2 .0.s:d"); - - ASSERT_EQ(std::string("BucketId(0x4000000000000001) : " - "node(idx=1,crc=0x4d2,docs=1234/1234,bytes=1234/1234,trusted=false,active=false,ready=false)"), - dumpBucket(document::BucketId(16, 1))); - - ASSERT_EQ(size_t(1), _sender.replies().size()); - ASSERT_EQ(MessageType::NOTIFYBUCKETCHANGE_REPLY, _sender.reply(0)->getType()); - - // Currently, this pending operation will be auto-flushed when the cluster state - // changes so the behavior is still correct. Keep this test around to prevent - // regressions here. 
- ASSERT_EQ(size_t(1), _sender.commands().size()); - auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.command(0)); - ASSERT_EQ(size_t(1), rbi.getBuckets().size()); - EXPECT_EQ(document::BucketId(16, 1), rbi.getBuckets()[0]); - - auto reply = std::make_shared<api::RequestBucketInfoReply>(rbi); - reply->getBucketInfo().push_back( - api::RequestBucketInfoReply::Entry( - document::BucketId(16, 1), - api::BucketInfo(8999, 300, 3000, 500, 5000, false, false))); - getBucketDBUpdater().onRequestBucketInfoReply(reply); - - // No change - EXPECT_EQ(std::string("BucketId(0x4000000000000001) : " - "node(idx=1,crc=0x4d2,docs=1234/1234,bytes=1234/1234,trusted=false,active=false,ready=false)"), - dumpBucket(document::BucketId(16, 1))); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -/** - * Test that NotifyBucketChange received while there's a pending cluster state - * waits until the cluster state has been enabled as current before it sends off - * the single bucket info requests. This is to prevent a race condition where - * the replies to bucket info requests for buckets that would be owned by the - * distributor in the pending state but not by the current state would be - * discarded when attempted inserted into the bucket database. 
- */ -TEST_F(LegacyBucketDBUpdaterTest, notify_change_with_pending_state_queues_bucket_info_requests) { - setSystemState(lib::ClusterState("distributor:1 storage:1")); - ASSERT_EQ(_bucketSpaces.size(), _sender.commands().size()); - - { - api::BucketInfo info(8999, 300, 3000, 500, 5000, false, false); - auto cmd(std::make_shared<api::NotifyBucketChangeCommand>( - makeDocumentBucket(document::BucketId(16, 1)), info)); - cmd->setSourceIndex(0); - getBucketDBUpdater().onNotifyBucketChange(cmd); - } - - ASSERT_EQ(_bucketSpaces.size(), _sender.commands().size()); - - ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(lib::ClusterState("distributor:1 storage:1"), - _bucketSpaces.size(), 10)); - - ASSERT_EQ(_bucketSpaces.size() + 1, _sender.commands().size()); - - { - auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.command(_bucketSpaces.size())); - ASSERT_EQ(size_t(1), rbi.getBuckets().size()); - EXPECT_EQ(document::BucketId(16, 1), rbi.getBuckets()[0]); - } - _sender.clear(); - - // Queue must be cleared once pending state is enabled. 
- { - lib::ClusterState state("distributor:1 storage:2"); - uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 1; - ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(state, expectedMsgs, dummyBucketsToReturn)); - } - ASSERT_EQ(_bucketSpaces.size(), _sender.commands().size()); - { - auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.command(0)); - EXPECT_EQ(size_t(0), rbi.getBuckets().size()); - } -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, merge_reply) { - enableDistributorClusterState("distributor:1 storage:3"); - - addNodesToBucketDB(document::BucketId(16, 1234), - "0=1234,1=1234,2=1234"); - - std::vector<api::MergeBucketCommand::Node> nodes; - nodes.push_back(api::MergeBucketCommand::Node(0)); - nodes.push_back(api::MergeBucketCommand::Node(1)); - nodes.push_back(api::MergeBucketCommand::Node(2)); - - api::MergeBucketCommand cmd(makeDocumentBucket(document::BucketId(16, 1234)), nodes, 0); - - auto reply = std::make_shared<api::MergeBucketReply>(cmd); - - _sender.clear(); - getBucketDBUpdater().onMergeBucketReply(reply); - - ASSERT_EQ(size_t(3), _sender.commands().size()); - - for (uint32_t i = 0; i < 3; i++) { - auto req = std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(_sender.command(i)); - - ASSERT_TRUE(req.get() != nullptr); - ASSERT_EQ(size_t(1), req->getBuckets().size()); - EXPECT_EQ(document::BucketId(16, 1234), req->getBuckets()[0]); - - auto reqreply = std::make_shared<api::RequestBucketInfoReply>(*req); - reqreply->getBucketInfo().push_back( - api::RequestBucketInfoReply::Entry(document::BucketId(16, 1234), - api::BucketInfo(10 * (i + 1), 100 * (i +1), 1000 * (i+1)))); - - getBucketDBUpdater().onRequestBucketInfoReply(reqreply); - } - - EXPECT_EQ(std::string("BucketId(0x40000000000004d2) : " - "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), " - 
"node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false), " - "node(idx=2,crc=0x1e,docs=300/300,bytes=3000/3000,trusted=false,active=false,ready=false)"), - dumpBucket(document::BucketId(16, 1234))); -}; - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, merge_reply_node_down) { - enableDistributorClusterState("distributor:1 storage:3"); - std::vector<api::MergeBucketCommand::Node> nodes; - - addNodesToBucketDB(document::BucketId(16, 1234), "0=1234,1=1234,2=1234"); - - for (uint32_t i = 0; i < 3; ++i) { - nodes.push_back(api::MergeBucketCommand::Node(i)); - } - - api::MergeBucketCommand cmd(makeDocumentBucket(document::BucketId(16, 1234)), nodes, 0); - - auto reply = std::make_shared<api::MergeBucketReply>(cmd); - - setSystemState(lib::ClusterState("distributor:1 storage:2")); - - _sender.clear(); - getBucketDBUpdater().onMergeBucketReply(reply); - - ASSERT_EQ(size_t(2), _sender.commands().size()); - - for (uint32_t i = 0; i < 2; i++) { - auto req = std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(_sender.command(i)); - - ASSERT_TRUE(req.get() != nullptr); - ASSERT_EQ(size_t(1), req->getBuckets().size()); - EXPECT_EQ(document::BucketId(16, 1234), req->getBuckets()[0]); - - auto reqreply = std::make_shared<api::RequestBucketInfoReply>(*req); - reqreply->getBucketInfo().push_back( - api::RequestBucketInfoReply::Entry( - document::BucketId(16, 1234), - api::BucketInfo(10 * (i + 1), 100 * (i +1), 1000 * (i+1)))); - getBucketDBUpdater().onRequestBucketInfoReply(reqreply); - } - - EXPECT_EQ(std::string("BucketId(0x40000000000004d2) : " - "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), " - "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false)"), - dumpBucket(document::BucketId(16, 1234))); -}; - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, 
merge_reply_node_down_after_request_sent) { - enableDistributorClusterState("distributor:1 storage:3"); - std::vector<api::MergeBucketCommand::Node> nodes; - - addNodesToBucketDB(document::BucketId(16, 1234), "0=1234,1=1234,2=1234"); - - for (uint32_t i = 0; i < 3; ++i) { - nodes.push_back(api::MergeBucketCommand::Node(i)); - } - - api::MergeBucketCommand cmd(makeDocumentBucket(document::BucketId(16, 1234)), nodes, 0); - - auto reply(std::make_shared<api::MergeBucketReply>(cmd)); - - _sender.clear(); - getBucketDBUpdater().onMergeBucketReply(reply); - - ASSERT_EQ(size_t(3), _sender.commands().size()); - - setSystemState(lib::ClusterState("distributor:1 storage:2")); - - for (uint32_t i = 0; i < 3; i++) { - auto req = std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(_sender.command(i)); - - ASSERT_TRUE(req.get() != nullptr); - ASSERT_EQ(size_t(1), req->getBuckets().size()); - EXPECT_EQ(document::BucketId(16, 1234), req->getBuckets()[0]); - - auto reqreply = std::make_shared<api::RequestBucketInfoReply>(*req); - reqreply->getBucketInfo().push_back( - api::RequestBucketInfoReply::Entry( - document::BucketId(16, 1234), - api::BucketInfo(10 * (i + 1), 100 * (i +1), 1000 * (i+1)))); - getBucketDBUpdater().onRequestBucketInfoReply(reqreply); - } - - EXPECT_EQ(std::string("BucketId(0x40000000000004d2) : " - "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), " - "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false)"), - dumpBucket(document::BucketId(16, 1234))); -}; - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, flush) { - enableDistributorClusterState("distributor:1 storage:3"); - _sender.clear(); - - addNodesToBucketDB(document::BucketId(16, 1234), "0=1234,1=1234,2=1234"); - - std::vector<api::MergeBucketCommand::Node> nodes; - for (uint32_t i = 0; i < 3; ++i) { - nodes.push_back(api::MergeBucketCommand::Node(i)); - } - - api::MergeBucketCommand 
cmd(makeDocumentBucket(document::BucketId(16, 1234)), nodes, 0); - - auto reply(std::make_shared<api::MergeBucketReply>(cmd)); - - _sender.clear(); - getBucketDBUpdater().onMergeBucketReply(reply); - - ASSERT_EQ(size_t(3), _sender.commands().size()); - ASSERT_EQ(size_t(0), _senderDown.replies().size()); - - getBucketDBUpdater().flush(); - // Flushing should drop all merge bucket replies - EXPECT_EQ(size_t(0), _senderDown.commands().size()); -} - -std::string -LegacyBucketDBUpdaterTest::getSentNodes( - const std::string& oldClusterState, - const std::string& newClusterState) -{ - auto fixture = createPendingStateFixtureForStateChange( - oldClusterState, newClusterState); - - sortSentMessagesByIndex(fixture->sender); - - std::ostringstream ost; - for (uint32_t i = 0; i < fixture->sender.commands().size(); i++) { - auto& req = dynamic_cast<RequestBucketInfoCommand&>(*fixture->sender.command(i)); - - if (i > 0) { - ost << ","; - } - - ost << req.getAddress()->getIndex(); - } - - return ost.str(); -} - -std::string -LegacyBucketDBUpdaterTest::getSentNodesDistributionChanged( - const std::string& oldClusterState) -{ - DistributorMessageSenderStub sender; - - framework::defaultimplementation::FakeClock clock; - ClusterInformation::CSP clusterInfo(createClusterInfo(oldClusterState)); - std::unique_ptr<PendingClusterState> state( - PendingClusterState::createForDistributionChange( - clock, clusterInfo, sender, getBucketSpaceRepo(), api::Timestamp(1))); - - sortSentMessagesByIndex(sender); - - std::ostringstream ost; - for (uint32_t i = 0; i < sender.commands().size(); i++) { - auto& req = dynamic_cast<RequestBucketInfoCommand&>(*sender.command(i)); - - if (i > 0) { - ost << ","; - } - - ost << req.getAddress()->getIndex(); - } - - return ost.str(); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_send_messages) { - EXPECT_EQ(getNodeList({0, 1, 2}), - getSentNodes("cluster:d", - "distributor:1 storage:3")); 
- - EXPECT_EQ(getNodeList({0, 1}), - getSentNodes("cluster:d", - "distributor:1 storage:3 .2.s:m")); - - EXPECT_EQ(getNodeList({2}), - getSentNodes("distributor:1 storage:2", - "distributor:1 storage:3")); - - EXPECT_EQ(getNodeList({2, 3, 4, 5}), - getSentNodes("distributor:1 storage:2", - "distributor:1 storage:6")); - - EXPECT_EQ(getNodeList({0, 1, 2}), - getSentNodes("distributor:4 storage:3", - "distributor:3 storage:3")); - - EXPECT_EQ(getNodeList({0, 1, 2, 3}), - getSentNodes("distributor:4 storage:3", - "distributor:4 .2.s:d storage:4")); - - EXPECT_EQ(std::string(""), - getSentNodes("distributor:4 storage:3", - "distributor:4 .0.s:d storage:4")); - - EXPECT_EQ(std::string(""), - getSentNodes("distributor:3 storage:3", - "distributor:4 storage:3")); - - EXPECT_EQ(getNodeList({2}), - getSentNodes("distributor:3 storage:3 .2.s:i", - "distributor:3 storage:3")); - - EXPECT_EQ(getNodeList({1}), - getSentNodes("distributor:3 storage:3 .1.s:d", - "distributor:3 storage:3")); - - EXPECT_EQ(getNodeList({1, 2, 4}), - getSentNodes("distributor:3 storage:4 .1.s:d .2.s:i", - "distributor:3 storage:5")); - - EXPECT_EQ(std::string(""), - getSentNodes("distributor:1 storage:3", - "cluster:d")); - - EXPECT_EQ(std::string(""), - getSentNodes("distributor:1 storage:3", - "distributor:1 storage:3")); - - EXPECT_EQ(std::string(""), - getSentNodes("distributor:1 storage:3", - "cluster:d distributor:1 storage:6")); - - EXPECT_EQ(std::string(""), - getSentNodes("distributor:3 storage:3", - "distributor:3 .2.s:m storage:3")); - - EXPECT_EQ(getNodeList({0, 1, 2}), - getSentNodes("distributor:3 .2.s:m storage:3", - "distributor:3 .2.s:d storage:3")); - - EXPECT_EQ(std::string(""), - getSentNodes("distributor:3 .2.s:m storage:3", - "distributor:3 storage:3")); - - EXPECT_EQ(getNodeList({0, 1, 2}), - getSentNodesDistributionChanged("distributor:3 storage:3")); - - EXPECT_EQ(getNodeList({0, 1}), - getSentNodes("distributor:10 storage:2", - "distributor:10 .1.s:d storage:2")); - - 
EXPECT_EQ(std::string(""), - getSentNodes("distributor:2 storage:2", - "distributor:3 .2.s:i storage:2")); - - EXPECT_EQ(getNodeList({0, 1, 2}), - getSentNodes("distributor:3 storage:3", - "distributor:3 .2.s:s storage:3")); - - EXPECT_EQ(std::string(""), - getSentNodes("distributor:3 .2.s:s storage:3", - "distributor:3 .2.s:d storage:3")); - - EXPECT_EQ(getNodeList({1}), - getSentNodes("distributor:3 storage:3 .1.s:m", - "distributor:3 storage:3")); - - EXPECT_EQ(std::string(""), - getSentNodes("distributor:3 storage:3", - "distributor:3 storage:3 .1.s:m")); -}; - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_receive) { - DistributorMessageSenderStub sender; - - auto cmd(std::make_shared<api::SetSystemStateCommand>( - lib::ClusterState("distributor:1 storage:3"))); - - framework::defaultimplementation::FakeClock clock; - ClusterInformation::CSP clusterInfo(createClusterInfo("cluster:d")); - OutdatedNodesMap outdatedNodesMap; - std::unique_ptr<PendingClusterState> state( - PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, getBucketSpaceRepo(), - cmd, outdatedNodesMap, api::Timestamp(1))); - - ASSERT_EQ(messageCount(3), sender.commands().size()); - - sortSentMessagesByIndex(sender); - - std::ostringstream ost; - for (uint32_t i = 0; i < sender.commands().size(); i++) { - auto* req = dynamic_cast<RequestBucketInfoCommand*>(sender.command(i).get()); - ASSERT_TRUE(req != nullptr); - - auto rep = std::make_shared<RequestBucketInfoReply>(*req); - - rep->getBucketInfo().push_back( - RequestBucketInfoReply::Entry( - document::BucketId(16, i), - api::BucketInfo(i, i, i, i, i))); - - ASSERT_TRUE(state->onRequestBucketInfoReply(rep)); - ASSERT_EQ((i == (sender.commands().size() - 1)), state->done()); - } - - auto& pendingTransition = state->getPendingBucketSpaceDbTransition(makeBucketSpace()); - EXPECT_EQ(3, (int)pendingTransition.results().size()); -} - -// TODO STRIPE migrated 
to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_with_group_down) { - std::string config(getDistConfig6Nodes4Groups()); - config += "distributor_auto_ownership_transfer_on_whole_group_down true\n"; - setDistribution(config); - - // Group config has nodes {0, 1}, {2, 3}, {4, 5} - // We're node index 0. - - // Entire group 1 goes down. Must refetch from all nodes. - EXPECT_EQ(getNodeList({0, 1, 2, 3, 4, 5}), - getSentNodes("distributor:6 storage:6", - "distributor:6 .2.s:d .3.s:d storage:6")); - - // But don't fetch if not the entire group is down. - EXPECT_EQ(std::string(""), - getSentNodes("distributor:6 storage:6", - "distributor:6 .2.s:d storage:6")); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_with_group_down_and_no_handover) { - std::string config(getDistConfig6Nodes4Groups()); - config += "distributor_auto_ownership_transfer_on_whole_group_down false\n"; - setDistribution(config); - - // Group is down, but config says to not do anything about it. 
- EXPECT_EQ(getNodeList({0, 1, 2, 3, 4, 5}, _bucketSpaces.size() - 1), - getSentNodes("distributor:6 storage:6", - "distributor:6 .2.s:d .3.s:d storage:6")); -} - -namespace { - -void -parseInputData(const std::string& data, - uint64_t timestamp, - PendingClusterState& state, - bool includeBucketInfo) -{ - vespalib::StringTokenizer tokenizer(data, "|"); - for (uint32_t i = 0; i < tokenizer.size(); i++) { - vespalib::StringTokenizer tok2(tokenizer[i], ":"); - - uint16_t node = atoi(tok2[0].data()); - - state.setNodeReplied(node); - auto& pendingTransition = state.getPendingBucketSpaceDbTransition(makeBucketSpace()); - - vespalib::StringTokenizer tok3(tok2[1], ","); - for (uint32_t j = 0; j < tok3.size(); j++) { - if (includeBucketInfo) { - vespalib::StringTokenizer tok4(tok3[j], "/"); - - pendingTransition.addNodeInfo( - document::BucketId(16, atoi(tok4[0].data())), - BucketCopy( - timestamp, - node, - api::BucketInfo( - atoi(tok4[1].data()), - atoi(tok4[2].data()), - atoi(tok4[3].data()), - atoi(tok4[2].data()), - atoi(tok4[3].data())))); - } else { - pendingTransition.addNodeInfo( - document::BucketId(16, atoi(tok3[j].data())), - BucketCopy(timestamp, - node, - api::BucketInfo(3, 3, 3, 3, 3))); - } - } - } -} - -struct BucketDumper : public BucketDatabase::EntryProcessor -{ - std::ostringstream ost; - bool _includeBucketInfo; - - explicit BucketDumper(bool includeBucketInfo) - : _includeBucketInfo(includeBucketInfo) - { - } - - bool process(const BucketDatabase::ConstEntryRef& e) override { - document::BucketId bucketId(e.getBucketId()); - - ost << (uint32_t)bucketId.getRawId() << ":"; - for (uint32_t i = 0; i < e->getNodeCount(); ++i) { - if (i > 0) { - ost << ","; - } - const BucketCopy& copy(e->getNodeRef(i)); - ost << copy.getNode(); - if (_includeBucketInfo) { - ost << "/" << copy.getChecksum() - << "/" << copy.getDocumentCount() - << "/" << copy.getTotalDocumentSize() - << "/" << (copy.trusted() ? 
"t" : "u"); - } - } - ost << "|"; - return true; - } -}; - -} - -std::string -LegacyBucketDBUpdaterTest::mergeBucketLists( - const lib::ClusterState& oldState, - const std::string& existingData, - const lib::ClusterState& newState, - const std::string& newData, - bool includeBucketInfo) -{ - framework::defaultimplementation::FakeClock clock; - framework::MilliSecTimer timer(clock); - - DistributorMessageSenderStub sender; - OutdatedNodesMap outdatedNodesMap; - - { - auto cmd(std::make_shared<api::SetSystemStateCommand>(oldState)); - - api::Timestamp beforeTime(1); - - ClusterInformation::CSP clusterInfo(createClusterInfo("cluster:d")); - std::unique_ptr<PendingClusterState> state( - PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, getBucketSpaceRepo(), - cmd, outdatedNodesMap, beforeTime)); - - parseInputData(existingData, beforeTime, *state, includeBucketInfo); - state->mergeIntoBucketDatabases(); - } - - BucketDumper dumper_tmp(true); - getBucketDatabase().forEach(dumper_tmp); - - { - auto cmd(std::make_shared<api::SetSystemStateCommand>( - lib::ClusterState(newState))); - - api::Timestamp afterTime(2); - - ClusterInformation::CSP clusterInfo(createClusterInfo(oldState.toString())); - std::unique_ptr<PendingClusterState> state( - PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, getBucketSpaceRepo(), - cmd, outdatedNodesMap, afterTime)); - - parseInputData(newData, afterTime, *state, includeBucketInfo); - state->mergeIntoBucketDatabases(); - } - - BucketDumper dumper(includeBucketInfo); - auto &bucketDb(defaultDistributorBucketSpace().getBucketDatabase()); - bucketDb.forEach(dumper); - bucketDb.clear(); - return dumper.ost.str(); -} - -std::string -LegacyBucketDBUpdaterTest::mergeBucketLists(const std::string& existingData, - const std::string& newData, - bool includeBucketInfo) -{ - return mergeBucketLists( - lib::ClusterState("distributor:1 storage:3"), - existingData, - 
lib::ClusterState("distributor:1 storage:3"), - newData, - includeBucketInfo); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_merge) { - // Simple initializing case - ask all nodes for info - EXPECT_EQ( - // Result is on the form: [bucket w/o count bits]:[node indexes]|.. - std::string("4:0,1|2:0,1|6:1,2|1:0,2|5:2,0|3:2,1|"), - // Input is on the form: [node]:[bucket w/o count bits]|... - mergeBucketLists( - "", - "0:1,2,4,5|1:2,3,4,6|2:1,3,5,6")); - - // New node came up - EXPECT_EQ( - std::string("4:0,1|2:0,1|6:1,2,3|1:0,2,3|5:2,0,3|3:2,1,3|"), - mergeBucketLists( - "0:1,2,4,5|1:2,3,4,6|2:1,3,5,6", - "3:1,3,5,6")); - - // Node came up with some buckets removed and some added - // Buckets that were removed should not be removed as the node - // didn't lose a disk. - EXPECT_EQ( - std::string("8:0|4:0,1|2:0,1|6:1,0,2|1:0,2|5:2,0|3:2,1|"), - mergeBucketLists( - "0:1,2,4,5|1:2,3,4,6|2:1,3,5,6", - "0:1,2,6,8")); - - // Bucket info format is "bucketid/checksum/count/size" - // Node went from initializing to up and invalid bucket went to empty. - EXPECT_EQ( - std::string("2:0/0/0/0/t|"), - mergeBucketLists( - "0:2/0/0/1", - "0:2/0/0/0", - true)); - - EXPECT_EQ(std::string("5:1/2/3/4/u,0/0/0/0/u|"), - mergeBucketLists("", "0:5/0/0/0|1:5/2/3/4", true)); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_merge_replica_changed) { - // Node went from initializing to up and non-invalid bucket changed. 
- EXPECT_EQ( - std::string("2:0/2/3/4/t|3:0/2/4/6/t|"), - mergeBucketLists( - lib::ClusterState("distributor:1 storage:1 .0.s:i"), - "0:2/1/2/3,3/2/4/6", - lib::ClusterState("distributor:1 storage:1"), - "0:2/2/3/4,3/2/4/6", - true)); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, no_db_resurrection_for_bucket_not_owned_in_current_state) { - document::BucketId bucket(16, 3); - lib::ClusterState stateBefore("distributor:1 storage:1"); - { - uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 1; - ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn)); - } - _sender.clear(); - - getBucketDBUpdater().recheckBucketInfo(0, makeDocumentBucket(bucket)); - - ASSERT_EQ(size_t(1), _sender.commands().size()); - std::shared_ptr<api::RequestBucketInfoCommand> rbi( - std::dynamic_pointer_cast<RequestBucketInfoCommand>( - _sender.command(0))); - - lib::ClusterState stateAfter("distributor:3 storage:3"); - - { - uint32_t expectedMsgs = messageCount(2), dummyBucketsToReturn = 1; - ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(stateAfter, expectedMsgs, dummyBucketsToReturn)); - } - EXPECT_FALSE(getDistributorBucketSpace().get_bucket_ownership_flags(bucket).owned_in_current_state()); - - ASSERT_NO_FATAL_FAILURE(sendFakeReplyForSingleBucketRequest(*rbi)); - - EXPECT_EQ(std::string("NONEXISTING"), dumpBucket(bucket)); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, no_db_resurrection_for_bucket_not_owned_in_pending_state) { - document::BucketId bucket(16, 3); - lib::ClusterState stateBefore("distributor:1 storage:1"); - { - uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 1; - ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn)); - } - _sender.clear(); - - getBucketDBUpdater().recheckBucketInfo(0, makeDocumentBucket(bucket)); - - ASSERT_EQ(size_t(1), 
_sender.commands().size()); - std::shared_ptr<api::RequestBucketInfoCommand> rbi( - std::dynamic_pointer_cast<RequestBucketInfoCommand>( - _sender.command(0))); - - lib::ClusterState stateAfter("distributor:3 storage:3"); - // Set, but _don't_ enable cluster state. We want it to be pending. - setSystemState(stateAfter); - EXPECT_TRUE(getDistributorBucketSpace().get_bucket_ownership_flags(bucket).owned_in_current_state()); - EXPECT_FALSE(getDistributorBucketSpace().get_bucket_ownership_flags(bucket).owned_in_pending_state()); - - ASSERT_NO_FATAL_FAILURE(sendFakeReplyForSingleBucketRequest(*rbi)); - - EXPECT_EQ(std::string("NONEXISTING"), dumpBucket(bucket)); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -/* - * If we get a distribution config change, it's important that cluster states that - * arrive after this--but _before_ the pending cluster state has finished--must trigger - * a full bucket info fetch no matter what the cluster state change was! Otherwise, we - * will with a high likelihood end up not getting the complete view of the buckets in - * the cluster. - */ -TEST_F(LegacyBucketDBUpdaterTest, cluster_state_always_sends_full_fetch_when_distribution_change_pending) { - lib::ClusterState stateBefore("distributor:6 storage:6"); - { - uint32_t expectedMsgs = messageCount(6), dummyBucketsToReturn = 1; - ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn)); - } - _sender.clear(); - std::string distConfig(getDistConfig6Nodes2Groups()); - setDistribution(distConfig); - - sortSentMessagesByIndex(_sender); - ASSERT_EQ(messageCount(6), _sender.commands().size()); - // Suddenly, a wild cluster state change appears! Even though this state - // does not in itself imply any bucket changes, it will still overwrite the - // pending cluster state and thus its state of pending bucket info requests. 
- setSystemState(lib::ClusterState("distributor:6 .2.t:12345 storage:6")); - - ASSERT_EQ(messageCount(12), _sender.commands().size()); - - // Send replies for first messageCount(6) (outdated requests). - int numBuckets = 10; - for (uint32_t i = 0; i < messageCount(6); ++i) { - ASSERT_NO_FATAL_FAILURE(fakeBucketReply(lib::ClusterState("distributor:6 storage:6"), - *_sender.command(i), numBuckets)); - } - // No change from these. - ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(1, "distributor:6 storage:6")); - - // Send for current pending. - for (uint32_t i = 0; i < messageCount(6); ++i) { - ASSERT_NO_FATAL_FAILURE(fakeBucketReply(lib::ClusterState("distributor:6 .2.t:12345 storage:6"), - *_sender.command(i + messageCount(6)), - numBuckets)); - } - ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(numBuckets, "distributor:6 storage:6")); - _sender.clear(); - - // No more pending global fetch; this should be a no-op state. - setSystemState(lib::ClusterState("distributor:6 .3.t:12345 storage:6")); - EXPECT_EQ(size_t(0), _sender.commands().size()); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, changed_distribution_config_triggers_recovery_mode) { - ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), messageCount(6), 20)); - _sender.clear(); - EXPECT_TRUE(distributor_is_in_recovery_mode()); - complete_recovery_mode(); - EXPECT_FALSE(distributor_is_in_recovery_mode()); - - std::string distConfig(getDistConfig6Nodes4Groups()); - setDistribution(distConfig); - sortSentMessagesByIndex(_sender); - // No replies received yet, still no recovery mode. 
- EXPECT_FALSE(distributor_is_in_recovery_mode()); - - ASSERT_EQ(messageCount(6), _sender.commands().size()); - uint32_t numBuckets = 10; - for (uint32_t i = 0; i < messageCount(6); ++i) { - ASSERT_NO_FATAL_FAILURE(fakeBucketReply(lib::ClusterState("distributor:6 storage:6"), - *_sender.command(i), numBuckets)); - } - - // Pending cluster state (i.e. distribution) has been enabled, which should - // cause recovery mode to be entered. - EXPECT_TRUE(distributor_is_in_recovery_mode()); - complete_recovery_mode(); - EXPECT_FALSE(distributor_is_in_recovery_mode()); -} - -namespace { - -template <typename Func> -struct FunctorProcessor : BucketDatabase::EntryProcessor { - Func _f; - - template <typename F> - explicit FunctorProcessor(F&& f) : _f(std::forward<F>(f)) {} - - bool process(const BucketDatabase::ConstEntryRef& e) override { - _f(e); - return true; - } -}; - -template <typename Func> -std::unique_ptr<BucketDatabase::EntryProcessor> func_processor(Func&& f) { - return std::make_unique<FunctorProcessor<Func>>(std::forward<Func>(f)); -} - -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, changed_distribution_config_does_not_elide_bucket_db_pruning) { - setDistribution(getDistConfig3Nodes1Group()); - - constexpr uint32_t n_buckets = 100; - ASSERT_NO_FATAL_FAILURE( - setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), messageCount(6), n_buckets)); - _sender.clear(); - - // Config implies a different node set than the current cluster state, so it's crucial that - // DB pruning is _not_ elided. Yes, this is inherently racing with cluster state changes and - // should be changed to be atomic and controlled by the cluster controller instead of config. - // But this is where we currently are. 
- setDistribution(getDistConfig6Nodes2Groups()); - - getBucketDatabase().forEach(*func_processor([&](const auto& e) { - EXPECT_TRUE(getDistributorBucketSpace().get_bucket_ownership_flags(e.getBucketId()).owned_in_pending_state()); - })); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, newly_added_buckets_have_current_time_as_gc_timestamp) { - getClock().setAbsoluteTimeInSeconds(101234); - lib::ClusterState stateBefore("distributor:1 storage:1"); - { - uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 1; - ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn)); - } - - // setAndEnableClusterState adds n buckets with id (16, i) - document::BucketId bucket(16, 0); - BucketDatabase::Entry e(getBucket(bucket)); - ASSERT_TRUE(e.valid()); - EXPECT_EQ(uint32_t(101234), e->getLastGarbageCollectionTime()); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, newer_mutations_not_overwritten_by_earlier_bucket_fetch) { - { - lib::ClusterState stateBefore("distributor:1 storage:1 .0.s:i"); - uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 0; - // This step is required to make the distributor ready for accepting - // the below explicit database insertion towards node 0. - ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(stateBefore, expectedMsgs, - dummyBucketsToReturn)); - } - _sender.clear(); - getClock().setAbsoluteTimeInSeconds(1000); - lib::ClusterState state("distributor:1 storage:1"); - setSystemState(state); - ASSERT_EQ(_bucketSpaces.size(), _sender.commands().size()); - - // Before replying with the bucket info, simulate the arrival of a mutation - // reply that alters the state of the bucket with information that will be - // more recent that what is returned by the bucket info. This information - // must not be lost when the bucket info is later merged into the database. 
- document::BucketId bucket(16, 1); - constexpr uint64_t insertionTimestamp = 1001ULL * 1000000; - api::BucketInfo wantedInfo(5, 6, 7); - getBucketDBUpdater().operation_context().update_bucket_database( - makeDocumentBucket(bucket), - BucketCopy(insertionTimestamp, 0, wantedInfo), - DatabaseUpdate::CREATE_IF_NONEXISTING); - - getClock().setAbsoluteTimeInSeconds(1002); - constexpr uint32_t bucketsReturned = 10; // Buckets (16, 0) ... (16, 9) - // Return bucket information which on the timeline might originate from - // anywhere between [1000, 1002]. Our assumption is that any mutations - // taking place after t=1000 must have its reply received and processed - // by this distributor and timestamped strictly higher than t=1000 (modulo - // clock skew, of course, but that is outside the scope of this). A mutation - // happening before t=1000 but receiving a reply at t>1000 does not affect - // correctness, as this should contain the same bucket info as that - // contained in the full bucket reply and the DB update is thus idempotent. 
- for (uint32_t i = 0; i < _bucketSpaces.size(); ++i) { - ASSERT_NO_FATAL_FAILURE(fakeBucketReply(state, *_sender.command(i), bucketsReturned)); - } - - BucketDatabase::Entry e(getBucket(bucket)); - ASSERT_EQ(uint32_t(1), e->getNodeCount()); - EXPECT_EQ(wantedInfo, e->getNodeRef(0).getBucketInfo()); -} - -std::vector<uint16_t> -LegacyBucketDBUpdaterTest::getSendSet() const -{ - std::vector<uint16_t> nodes; - std::transform(_sender.commands().begin(), - _sender.commands().end(), - std::back_inserter(nodes), - [](auto& cmd) - { - auto& req(dynamic_cast<const api::RequestBucketInfoCommand&>(*cmd)); - return req.getAddress()->getIndex(); - }); - return nodes; -} - -std::vector<uint16_t> -LegacyBucketDBUpdaterTest::getSentNodesWithPreemption( - const std::string& oldClusterState, - uint32_t expectedOldStateMessages, - const std::string& preemptedClusterState, - const std::string& newClusterState) -{ - lib::ClusterState stateBefore(oldClusterState); - uint32_t dummyBucketsToReturn = 10; - // FIXME cannot chain assertion checks in non-void function - setAndEnableClusterState(lib::ClusterState(oldClusterState), - expectedOldStateMessages, - dummyBucketsToReturn); - - _sender.clear(); - - setSystemState(lib::ClusterState(preemptedClusterState)); - _sender.clear(); - // Do not allow the pending state to become the active state; trigger a - // new transition without ACKing the info requests first. This will - // overwrite the pending state entirely. 
- setSystemState(lib::ClusterState(newClusterState)); - return getSendSet(); -} - -using nodeVec = std::vector<uint16_t>; - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -/* - * If we don't carry over the set of nodes that we need to fetch from, - * a naive comparison between the active state and the new state will - * make it appear to the distributor that nothing has changed, as any - * database modifications caused by intermediate states will not be - * accounted for (basically the ABA problem in a distributed setting). - */ -TEST_F(LegacyBucketDBUpdaterTest, preempted_distributor_change_carries_node_set_over_to_next_state_fetch) { - EXPECT_EQ( - expandNodeVec({0, 1, 2, 3, 4, 5}), - getSentNodesWithPreemption("version:1 distributor:6 storage:6", - messageCount(6), - "version:2 distributor:6 .5.s:d storage:6", - "version:3 distributor:6 storage:6")); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, preempted_storage_change_carries_node_set_over_to_next_state_fetch) { - EXPECT_EQ( - expandNodeVec({2, 3}), - getSentNodesWithPreemption( - "version:1 distributor:6 storage:6 .2.s:d", - messageCount(5), - "version:2 distributor:6 storage:6 .2.s:d .3.s:d", - "version:3 distributor:6 storage:6")); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, preempted_storage_node_down_must_be_re_fetched) { - EXPECT_EQ( - expandNodeVec({2}), - getSentNodesWithPreemption( - "version:1 distributor:6 storage:6", - messageCount(6), - "version:2 distributor:6 storage:6 .2.s:d", - "version:3 distributor:6 storage:6")); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, do_not_send_to_preempted_node_now_in_down_state) { - EXPECT_EQ( - nodeVec{}, - getSentNodesWithPreemption( - "version:1 distributor:6 storage:6 .2.s:d", - messageCount(5), - "version:2 distributor:6 storage:6", // Sends to 2. 
- "version:3 distributor:6 storage:6 .2.s:d")); // 2 down again. -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, doNotSendToPreemptedNodeNotPartOfNewState) { - // Even though 100 nodes are preempted, not all of these should be part - // of the request afterwards when only 6 are part of the state. - EXPECT_EQ( - expandNodeVec({0, 1, 2, 3, 4, 5}), - getSentNodesWithPreemption( - "version:1 distributor:6 storage:100", - messageCount(100), - "version:2 distributor:5 .4.s:d storage:100", - "version:3 distributor:6 storage:6")); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, outdated_node_set_cleared_after_successful_state_completion) { - lib::ClusterState stateBefore( - "version:1 distributor:6 storage:6 .1.t:1234"); - uint32_t expectedMsgs = messageCount(6), dummyBucketsToReturn = 10; - ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn)); - _sender.clear(); - // New cluster state that should not by itself trigger any new fetches, - // unless outdated node set is somehow not cleared after an enabled - // (completed) cluster state has been set. - lib::ClusterState stateAfter("version:3 distributor:6 storage:6"); - setSystemState(stateAfter); - EXPECT_EQ(size_t(0), _sender.commands().size()); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest (despite being disabled) -// XXX test currently disabled since distribution config currently isn't used -// at all in order to deduce the set of nodes to send to. This might not matter -// in practice since it is assumed that the cluster state matching the new -// distribution config will follow very shortly after the config has been -// applied to the node. The new cluster state will then send out requests to -// the correct node set. 
-TEST_F(LegacyBucketDBUpdaterTest, DISABLED_cluster_config_downsize_only_sends_to_available_nodes) { - uint32_t expectedMsgs = 6, dummyBucketsToReturn = 20; - ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), - expectedMsgs, dummyBucketsToReturn)); - _sender.clear(); - - // Intentionally trigger a racing config change which arrives before the - // new cluster state representing it. - std::string distConfig(getDistConfig3Nodes1Group()); - setDistribution(distConfig); - sortSentMessagesByIndex(_sender); - - EXPECT_EQ((nodeVec{0, 1, 2}), getSendSet()); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -/** - * Test scenario where a cluster is downsized by removing a subset of the nodes - * from the distribution configuration. The system must be able to deal with - * a scenario where the set of nodes between two cluster states across a config - * change may differ. - * - * See VESPA-790 for details. - */ -TEST_F(LegacyBucketDBUpdaterTest, node_missing_from_config_is_treated_as_needing_ownership_transfer) { - uint32_t expectedMsgs = messageCount(3), dummyBucketsToReturn = 1; - ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:3 storage:3"), - expectedMsgs, dummyBucketsToReturn)); - _sender.clear(); - - // Cluster goes from {0, 1, 2} -> {0, 1}. This leaves us with a config - // that does not contain node 2 while the _active_ cluster state still - // contains this node. - const char* downsizeCfg = - "redundancy 2\n" - "distributor_auto_ownership_transfer_on_whole_group_down true\n" - "group[2]\n" - "group[0].name \"invalid\"\n" - "group[0].index \"invalid\"\n" - "group[0].partitions 1|*\n" - "group[0].nodes[0]\n" - "group[1].name rack0\n" - "group[1].index 0\n" - "group[1].nodes[2]\n" - "group[1].nodes[0].index 0\n" - "group[1].nodes[1].index 1\n"; - - setDistribution(downsizeCfg); - sortSentMessagesByIndex(_sender); - _sender.clear(); - - // Attempt to apply state with {0, 1} set. 
This will compare the new state - // with the previous state, which still has node 2. - expectedMsgs = messageCount(2); - ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:2 storage:2"), - expectedMsgs, dummyBucketsToReturn)); - - EXPECT_EQ(expandNodeVec({0, 1}), getSendSet()); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, changed_distributor_set_implies_ownership_transfer) { - auto fixture = createPendingStateFixtureForStateChange( - "distributor:2 storage:2", "distributor:1 storage:2"); - EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer()); - - fixture = createPendingStateFixtureForStateChange( - "distributor:2 storage:2", "distributor:2 .1.s:d storage:2"); - EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer()); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, unchanged_distributor_set_implies_no_ownership_transfer) { - auto fixture = createPendingStateFixtureForStateChange( - "distributor:2 storage:2", "distributor:2 storage:1"); - EXPECT_FALSE(fixture->state->hasBucketOwnershipTransfer()); - - fixture = createPendingStateFixtureForStateChange( - "distributor:2 storage:2", "distributor:2 storage:2 .1.s:d"); - EXPECT_FALSE(fixture->state->hasBucketOwnershipTransfer()); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, changed_distribution_config_implies_ownership_transfer) { - auto fixture = createPendingStateFixtureForDistributionChange( - "distributor:2 storage:2"); - EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer()); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, transition_time_tracked_for_single_state_change) { - ASSERT_NO_FATAL_FAILURE(completeStateTransitionInSeconds("distributor:2 storage:2", 5, messageCount(2))); - - EXPECT_EQ(uint64_t(5000), lastTransitionTimeInMillis()); -} - -// TODO STRIPE migrated to 
TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, transition_time_reset_across_non_preempting_state_changes) { - ASSERT_NO_FATAL_FAILURE(completeStateTransitionInSeconds("distributor:2 storage:2", 5, messageCount(2))); - ASSERT_NO_FATAL_FAILURE(completeStateTransitionInSeconds("distributor:2 storage:3", 3, messageCount(1))); - - EXPECT_EQ(uint64_t(3000), lastTransitionTimeInMillis()); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, transition_time_tracked_for_distribution_config_change) { - lib::ClusterState state("distributor:2 storage:2"); - ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(state, messageCount(2), 1)); - - _sender.clear(); - std::string distConfig(getDistConfig3Nodes1Group()); - setDistribution(distConfig); - getClock().addSecondsToTime(4); - ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(state, messageCount(2))); - EXPECT_EQ(uint64_t(4000), lastTransitionTimeInMillis()); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, transition_time_tracked_across_preempted_transitions) { - _sender.clear(); - lib::ClusterState state("distributor:2 storage:2"); - setSystemState(state); - getClock().addSecondsToTime(5); - // Pre-empted with new state here, which will push out the old pending - // state and replace it with a new one. We should still count the time - // used processing the old state. 
- ASSERT_NO_FATAL_FAILURE(completeStateTransitionInSeconds("distributor:2 storage:3", 3, messageCount(3))); - - EXPECT_EQ(uint64_t(8000), lastTransitionTimeInMillis()); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -/* - * Brief reminder on test DSL for checking bucket merge operations: - * - * mergeBucketLists() takes as input strings of the format - * <node>:<raw bucket id>/<checksum>/<num docs>/<doc size>|<node>: - * and returns a string describing the bucket DB post-merge with the format - * <raw bucket id>:<node>/<checksum>/<num docs>/<doc size>,<node>:....|<raw bucket id>:.... - * - * Yes, the order of node<->bucket id is reversed between the two, perhaps to make sure you're awake. - */ - -TEST_F(LegacyBucketDBUpdaterTest, batch_update_of_existing_diverging_replicas_does_not_mark_any_as_trusted) { - // Replacing bucket information for content node 0 should not mark existing - // untrusted replica as trusted as a side effect. - EXPECT_EQ( - std::string("5:1/7/8/9/u,0/1/2/3/u|"), - mergeBucketLists( - lib::ClusterState("distributor:1 storage:3 .0.s:i"), - "0:5/0/0/0|1:5/7/8/9", - lib::ClusterState("distributor:1 storage:3 .0.s:u"), - "0:5/1/2/3|1:5/7/8/9", true)); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, batch_add_of_new_diverging_replicas_does_not_mark_any_as_trusted) { - EXPECT_EQ(std::string("5:1/7/8/9/u,0/1/2/3/u|"), - mergeBucketLists("", "0:5/1/2/3|1:5/7/8/9", true)); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, batch_add_with_single_resulting_replica_implicitly_marks_as_trusted) { - EXPECT_EQ(std::string("5:0/1/2/3/t|"), - mergeBucketLists("", "0:5/1/2/3", true)); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, identity_update_of_single_replica_does_not_clear_trusted) { - EXPECT_EQ(std::string("5:0/1/2/3/t|"), - mergeBucketLists("0:5/1/2/3", "0:5/1/2/3", true)); -} - -// TODO STRIPE 
migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, identity_update_of_diverging_untrusted_replicas_does_not_mark_any_as_trusted) { - EXPECT_EQ(std::string("5:1/7/8/9/u,0/1/2/3/u|"), - mergeBucketLists("0:5/1/2/3|1:5/7/8/9", "0:5/1/2/3|1:5/7/8/9", true)); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, adding_diverging_replica_to_existing_trusted_does_not_remove_trusted) { - EXPECT_EQ(std::string("5:1/2/3/4/u,0/1/2/3/t|"), - mergeBucketLists("0:5/1/2/3", "0:5/1/2/3|1:5/2/3/4", true)); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted) { - // This differs from batch_update_of_existing_diverging_replicas_does_not_mark_any_as_trusted - // in that _all_ content nodes are considered outdated when distributor changes take place, - // and therefore a slightly different code path is taken. In particular, bucket info for - // outdated nodes gets removed before possibly being re-added (if present in the bucket info - // response). 
- EXPECT_EQ( - std::string("5:1/7/8/9/u,0/1/2/3/u|"), - mergeBucketLists( - lib::ClusterState("distributor:2 storage:3"), - "0:5/1/2/3|1:5/7/8/9", - lib::ClusterState("distributor:1 storage:3"), - "0:5/1/2/3|1:5/7/8/9", true)); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -// TODO remove on Vespa 8 - this is a workaround for https://github.com/vespa-engine/vespa/issues/8475 -TEST_F(LegacyBucketDBUpdaterTest, global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection) { - std::string distConfig(getDistConfig6Nodes2Groups()); - setDistribution(distConfig); - - const vespalib::string current_hash = "(0d*|*(0;0;1;2)(1;3;4;5))"; - const vespalib::string legacy_hash = "(0d3|3|*(0;0;1;2)(1;3;4;5))"; - - setSystemState(lib::ClusterState("distributor:6 storage:6")); - ASSERT_EQ(messageCount(6), _sender.commands().size()); - - api::RequestBucketInfoCommand* global_req = nullptr; - for (auto& cmd : _sender.commands()) { - auto& req_cmd = dynamic_cast<api::RequestBucketInfoCommand&>(*cmd); - if (req_cmd.getBucketSpace() == document::FixedBucketSpaces::global_space()) { - global_req = &req_cmd; - break; - } - } - ASSERT_TRUE(global_req != nullptr); - ASSERT_EQ(current_hash, global_req->getDistributionHash()); - - auto reply = std::make_shared<api::RequestBucketInfoReply>(*global_req); - reply->setResult(api::ReturnCode::REJECTED); - getBucketDBUpdater().onRequestBucketInfoReply(reply); - - getClock().addSecondsToTime(10); - getBucketDBUpdater().resendDelayedMessages(); - - // Should now be a resent request with legacy distribution hash - ASSERT_EQ(messageCount(6) + 1, _sender.commands().size()); - auto& legacy_req = dynamic_cast<api::RequestBucketInfoCommand&>(*_sender.commands().back()); - ASSERT_EQ(legacy_hash, legacy_req.getDistributionHash()); - - // Now if we reject it _again_ we should cycle back to the current hash - // in case it wasn't a hash-based rejection after all. And the circle of life continues. 
- reply = std::make_shared<api::RequestBucketInfoReply>(legacy_req); - reply->setResult(api::ReturnCode::REJECTED); - getBucketDBUpdater().onRequestBucketInfoReply(reply); - - getClock().addSecondsToTime(10); - getBucketDBUpdater().resendDelayedMessages(); - - ASSERT_EQ(messageCount(6) + 2, _sender.commands().size()); - auto& new_current_req = dynamic_cast<api::RequestBucketInfoCommand&>(*_sender.commands().back()); - ASSERT_EQ(current_hash, new_current_req.getDistributionHash()); -} - -namespace { - -template <typename Func> -void for_each_bucket(const BucketDatabase& db, const document::BucketSpace& space, Func&& f) { - BucketId last(0); - auto e = db.getNext(last); - while (e.valid()) { - f(space, e); - e = db.getNext(e.getBucketId()); - } -} - -template <typename Func> -void for_each_bucket(const DistributorBucketSpaceRepo& repo, Func&& f) { - for (const auto& space : repo) { - for_each_bucket(space.second->getBucketDatabase(), space.first, f); - } -} - -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, non_owned_buckets_moved_to_read_only_db_on_ownership_change) { - getBucketDBUpdater().set_stale_reads_enabled(true); - - lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition - set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity - - ASSERT_EQ(messageCount(4), _sender.commands().size()); - constexpr uint32_t n_buckets = 10; - ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(initial_state, messageCount(4), n_buckets)); - _sender.clear(); - - EXPECT_EQ(size_t(n_buckets), mutable_default_db().size()); - EXPECT_EQ(size_t(n_buckets), mutable_global_db().size()); - EXPECT_EQ(size_t(0), read_only_default_db().size()); - EXPECT_EQ(size_t(0), read_only_global_db().size()); - - lib::ClusterState pending_state("distributor:2 storage:4"); - - std::unordered_set<Bucket, Bucket::hash> buckets_not_owned_in_pending_state; 
- for_each_bucket(mutable_repo(), [&](const auto& space, const auto& entry) { - if (!getDistributorBucketSpace().owns_bucket_in_state(pending_state, entry.getBucketId())) { - buckets_not_owned_in_pending_state.insert(Bucket(space, entry.getBucketId())); - } - }); - EXPECT_FALSE(buckets_not_owned_in_pending_state.empty()); - - set_cluster_state_bundle(lib::ClusterStateBundle(pending_state, {}, true)); // Now requires activation - - const auto buckets_not_owned_per_space = (buckets_not_owned_in_pending_state.size() / 2); // 2 spaces - const auto expected_mutable_buckets = n_buckets - buckets_not_owned_per_space; - EXPECT_EQ(expected_mutable_buckets, mutable_default_db().size()); - EXPECT_EQ(expected_mutable_buckets, mutable_global_db().size()); - EXPECT_EQ(buckets_not_owned_per_space, read_only_default_db().size()); - EXPECT_EQ(buckets_not_owned_per_space, read_only_global_db().size()); - - for_each_bucket(read_only_repo(), [&](const auto& space, const auto& entry) { - EXPECT_TRUE(buckets_not_owned_in_pending_state.find(Bucket(space, entry.getBucketId())) - != buckets_not_owned_in_pending_state.end()); - }); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, buckets_no_longer_available_are_not_moved_to_read_only_database) { - constexpr uint32_t n_buckets = 10; - // No ownership change, just node down. Test redundancy is 2, so removing 2 nodes will - // cause some buckets to be entirely unavailable. 
- trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, - "version:2 distributor:1 storage:4 .0.s:d .1.s:m", n_buckets, 0); - - EXPECT_EQ(size_t(0), read_only_default_db().size()); - EXPECT_EQ(size_t(0), read_only_global_db().size()); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, non_owned_buckets_purged_when_read_only_support_is_config_disabled) { - getBucketDBUpdater().set_stale_reads_enabled(false); - - lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition - set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity - - ASSERT_EQ(messageCount(4), _sender.commands().size()); - constexpr uint32_t n_buckets = 10; - ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(initial_state, messageCount(4), n_buckets)); - _sender.clear(); - - // Nothing in read-only DB after first bulk load of buckets. - EXPECT_EQ(size_t(0), read_only_default_db().size()); - EXPECT_EQ(size_t(0), read_only_global_db().size()); - - lib::ClusterState pending_state("distributor:2 storage:4"); - setSystemState(pending_state); - // No buckets should be moved into read only db after ownership changes. 
- EXPECT_EQ(size_t(0), read_only_default_db().size()); - EXPECT_EQ(size_t(0), read_only_global_db().size()); -} - -void LegacyBucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition( - vespalib::stringref initial_state_str, - uint32_t initial_buckets, - uint32_t initial_expected_msgs, - vespalib::stringref pending_state_str, - uint32_t pending_buckets, - uint32_t pending_expected_msgs) -{ - lib::ClusterState initial_state(initial_state_str); - setSystemState(initial_state); - ASSERT_EQ(messageCount(initial_expected_msgs), _sender.commands().size()); - ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering( - initial_state, messageCount(initial_expected_msgs), initial_buckets)); - _sender.clear(); - - lib::ClusterState pending_state(pending_state_str); // Ownership change - set_cluster_state_bundle(lib::ClusterStateBundle(pending_state, {}, true)); - ASSERT_EQ(messageCount(pending_expected_msgs), _sender.commands().size()); - ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering( - pending_state, messageCount(pending_expected_msgs), pending_buckets)); - _sender.clear(); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until_activation_received) { - getBucketDBUpdater().set_stale_reads_enabled(true); - constexpr uint32_t n_buckets = 10; - ASSERT_NO_FATAL_FAILURE( - trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, - "version:2 distributor:1 storage:4", n_buckets, 4)); - - // Version should not be switched over yet - EXPECT_EQ(uint32_t(1), current_distributor_cluster_state_bundle().getVersion()); - - EXPECT_EQ(uint64_t(0), mutable_default_db().size()); - EXPECT_EQ(uint64_t(0), mutable_global_db().size()); - - EXPECT_FALSE(activate_cluster_state_version(2)); - - EXPECT_EQ(uint32_t(2), current_distributor_cluster_state_bundle().getVersion()); - EXPECT_EQ(uint64_t(n_buckets), mutable_default_db().size()); - 
EXPECT_EQ(uint64_t(n_buckets), mutable_global_db().size()); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_activated) { - getBucketDBUpdater().set_stale_reads_enabled(true); - constexpr uint32_t n_buckets = 10; - ASSERT_NO_FATAL_FAILURE( - trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, - "version:2 distributor:2 storage:4", n_buckets, 0)); - EXPECT_FALSE(activate_cluster_state_version(2)); - - EXPECT_EQ(uint64_t(0), read_only_default_db().size()); - EXPECT_EQ(uint64_t(0), read_only_global_db().size()); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_marked_down) { - getBucketDBUpdater().set_stale_reads_enabled(true); - constexpr uint32_t n_buckets = 10; - ASSERT_NO_FATAL_FAILURE( - trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, - "version:2 distributor:1 .0.s:d storage:4", n_buckets, 0)); - - // State not yet activated, so read-only DBs have got all the buckets we used to have. 
- EXPECT_EQ(uint64_t(0), mutable_default_db().size()); - EXPECT_EQ(uint64_t(0), mutable_global_db().size()); - EXPECT_EQ(uint64_t(n_buckets), read_only_default_db().size()); - EXPECT_EQ(uint64_t(n_buckets), read_only_global_db().size()); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, activate_cluster_state_request_with_mismatching_version_returns_actual_version) { - getBucketDBUpdater().set_stale_reads_enabled(true); - constexpr uint32_t n_buckets = 10; - ASSERT_NO_FATAL_FAILURE( - trigger_completed_but_not_yet_activated_transition("version:4 distributor:1 storage:4", n_buckets, 4, - "version:5 distributor:2 storage:4", n_buckets, 0)); - - EXPECT_TRUE(activate_cluster_state_version(4)); // Too old version - ASSERT_NO_FATAL_FAILURE(assert_has_activate_cluster_state_reply_with_actual_version(5)); - - EXPECT_TRUE(activate_cluster_state_version(6)); // More recent version than what has been observed - ASSERT_NO_FATAL_FAILURE(assert_has_activate_cluster_state_reply_with_actual_version(5)); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, activate_cluster_state_request_without_pending_transition_passes_message_through) { - getBucketDBUpdater().set_stale_reads_enabled(true); - constexpr uint32_t n_buckets = 10; - ASSERT_NO_FATAL_FAILURE( - trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, - "version:2 distributor:1 storage:4", n_buckets, 4)); - // Activate version 2; no pending cluster state after this. - EXPECT_FALSE(activate_cluster_state_version(2)); - - // No pending cluster state for version 3, just passed through to be implicitly bounced by state manager. - // Note: state manager is not modelled in this test, so we just check that the message handler returns - // false (meaning "didn't take message ownership") and there's no auto-generated reply. 
- EXPECT_FALSE(activate_cluster_state_version(3)); - EXPECT_EQ(size_t(0), _sender.replies().size()); -} - -// TODO STRIPE disabled benchmark tests are NOT migrated to new test suite -TEST_F(LegacyBucketDBUpdaterTest, DISABLED_benchmark_bulk_loading_into_empty_db) { - // Need to trigger an initial edge to complete first bucket scan - ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:2 storage:1"), - messageCount(1), 0)); - _sender.clear(); - - lib::ClusterState state("distributor:1 storage:1"); - setSystemState(state); - - constexpr uint32_t superbuckets = 1u << 16u; - constexpr uint32_t sub_buckets = 14; - constexpr uint32_t n_buckets = superbuckets * sub_buckets; - - ASSERT_EQ(_bucketSpaces.size(), _sender.commands().size()); - for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) { - ASSERT_EQ(_sender.command(bsi)->getType(), MessageType::REQUESTBUCKETINFO); - const auto& req = dynamic_cast<const RequestBucketInfoCommand&>(*_sender.command(bsi)); - - auto sreply = std::make_shared<RequestBucketInfoReply>(req); - sreply->setAddress(storageAddress(0)); - auto& vec = sreply->getBucketInfo(); - if (req.getBucketSpace() == FixedBucketSpaces::default_space()) { - for (uint32_t sb = 0; sb < superbuckets; ++sb) { - for (uint64_t i = 0; i < sub_buckets; ++i) { - document::BucketId bucket(48, (i << 32ULL) | sb); - vec.push_back(api::RequestBucketInfoReply::Entry(bucket, api::BucketInfo(10, 1, 1))); - } - } - } - - vespalib::BenchmarkTimer timer(1.0); - // Global space has no buckets but will serve as a trigger for merging - // buckets into the DB. This lets us measure the overhead of just this part. 
- if (req.getBucketSpace() == FixedBucketSpaces::global_space()) { - timer.before(); - } - getBucketDBUpdater().onRequestBucketInfoReply(sreply); - if (req.getBucketSpace() == FixedBucketSpaces::global_space()) { - timer.after(); - fprintf(stderr, "Took %g seconds to merge %u buckets into DB\n", timer.min_time(), n_buckets); - } - } - - EXPECT_EQ(size_t(n_buckets), mutable_default_db().size()); - EXPECT_EQ(size_t(0), mutable_global_db().size()); -} - -uint32_t LegacyBucketDBUpdaterTest::populate_bucket_db_via_request_bucket_info_for_benchmarking() { - // Need to trigger an initial edge to complete first bucket scan - setAndEnableClusterState(lib::ClusterState("distributor:2 storage:1"), messageCount(1), 0); - _sender.clear(); - - lib::ClusterState state("distributor:1 storage:1"); - setSystemState(state); - - constexpr uint32_t superbuckets = 1u << 16u; - constexpr uint32_t sub_buckets = 14; - constexpr uint32_t n_buckets = superbuckets * sub_buckets; - - assert(_bucketSpaces.size() == _sender.commands().size()); - for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) { - assert(_sender.command(bsi)->getType() == MessageType::REQUESTBUCKETINFO); - const auto& req = dynamic_cast<const RequestBucketInfoCommand&>(*_sender.command(bsi)); - - auto sreply = std::make_shared<RequestBucketInfoReply>(req); - sreply->setAddress(storageAddress(0)); - auto& vec = sreply->getBucketInfo(); - if (req.getBucketSpace() == FixedBucketSpaces::default_space()) { - for (uint32_t sb = 0; sb < superbuckets; ++sb) { - for (uint64_t i = 0; i < sub_buckets; ++i) { - document::BucketId bucket(48, (i << 32ULL) | sb); - vec.push_back(api::RequestBucketInfoReply::Entry(bucket, api::BucketInfo(10, 1, 1))); - } - } - } - getBucketDBUpdater().onRequestBucketInfoReply(sreply); - } - - assert(mutable_default_db().size() == n_buckets); - assert(mutable_global_db().size() == 0); - return n_buckets; -} - -TEST_F(LegacyBucketDBUpdaterTest, 
DISABLED_benchmark_removing_buckets_for_unavailable_storage_nodes) { - const uint32_t n_buckets = populate_bucket_db_via_request_bucket_info_for_benchmarking(); - - lib::ClusterState no_op_state("distributor:1 storage:1 .0.s:m"); // Removing all buckets via ownership - vespalib::BenchmarkTimer timer(1.0); - timer.before(); - setSystemState(no_op_state); - timer.after(); - fprintf(stderr, "Took %g seconds to scan and remove %u buckets\n", timer.min_time(), n_buckets); -} - -TEST_F(LegacyBucketDBUpdaterTest, DISABLED_benchmark_no_buckets_removed_during_node_remover_db_pass) { - const uint32_t n_buckets = populate_bucket_db_via_request_bucket_info_for_benchmarking(); - - // TODO this benchmark is void if we further restrict the pruning elision logic to allow - // elision when storage nodes come online. - lib::ClusterState no_op_state("distributor:1 storage:2"); // Not removing any buckets - vespalib::BenchmarkTimer timer(1.0); - timer.before(); - setSystemState(no_op_state); - timer.after(); - fprintf(stderr, "Took %g seconds to scan %u buckets with no-op action\n", timer.min_time(), n_buckets); -} - -TEST_F(LegacyBucketDBUpdaterTest, DISABLED_benchmark_all_buckets_removed_during_node_remover_db_pass) { - const uint32_t n_buckets = populate_bucket_db_via_request_bucket_info_for_benchmarking(); - - lib::ClusterState no_op_state("distributor:1 storage:1 .0.s:m"); // Removing all buckets via all replicas gone - vespalib::BenchmarkTimer timer(1.0); - timer.before(); - setSystemState(no_op_state); - timer.after(); - fprintf(stderr, "Took %g seconds to scan and remove %u buckets\n", timer.min_time(), n_buckets); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_only_when_state_is_pending) { - auto initial_baseline = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d"); - auto initial_default = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:m"); - - 
lib::ClusterStateBundle initial_bundle(*initial_baseline, {{FixedBucketSpaces::default_space(), initial_default}, - {FixedBucketSpaces::global_space(), initial_baseline}}); - set_cluster_state_bundle(initial_bundle); - - auto* state = getBucketDBUpdater().pendingClusterStateOrNull(FixedBucketSpaces::default_space()); - ASSERT_TRUE(state != nullptr); - EXPECT_EQ(*initial_default, *state); - - state = getBucketDBUpdater().pendingClusterStateOrNull(FixedBucketSpaces::global_space()); - ASSERT_TRUE(state != nullptr); - EXPECT_EQ(*initial_baseline, *state); - - ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(*initial_baseline, messageCount(1), 0)); - - state = getBucketDBUpdater().pendingClusterStateOrNull(FixedBucketSpaces::default_space()); - EXPECT_TRUE(state == nullptr); - - state = getBucketDBUpdater().pendingClusterStateOrNull(FixedBucketSpaces::global_space()); - EXPECT_TRUE(state == nullptr); -} - -struct LegacyBucketDBUpdaterSnapshotTest : LegacyBucketDBUpdaterTest { - lib::ClusterState empty_state; - std::shared_ptr<lib::ClusterState> initial_baseline; - std::shared_ptr<lib::ClusterState> initial_default; - lib::ClusterStateBundle initial_bundle; - Bucket default_bucket; - Bucket global_bucket; - - LegacyBucketDBUpdaterSnapshotTest() - : LegacyBucketDBUpdaterTest(), - empty_state(), - initial_baseline(std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d")), - initial_default(std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:m")), - initial_bundle(*initial_baseline, {{FixedBucketSpaces::default_space(), initial_default}, - {FixedBucketSpaces::global_space(), initial_baseline}}), - default_bucket(FixedBucketSpaces::default_space(), BucketId(16, 1234)), - global_bucket(FixedBucketSpaces::global_space(), BucketId(16, 1234)) - { - } - ~LegacyBucketDBUpdaterSnapshotTest() override; - - void SetUp() override { - LegacyBucketDBUpdaterTest::SetUp(); - getBucketDBUpdater().set_stale_reads_enabled(true); - }; - - // Assumes that the 
distributor owns all buckets, so it may choose any arbitrary bucket in the bucket space - uint32_t buckets_in_snapshot_matching_current_db(DistributorBucketSpaceRepo& repo, BucketSpace bucket_space) { - auto rs = getBucketDBUpdater().read_snapshot_for_bucket(Bucket(bucket_space, BucketId(16, 1234))); - if (!rs.is_routable()) { - return 0; - } - auto guard = rs.steal_read_guard(); - uint32_t found_buckets = 0; - for_each_bucket(repo, [&](const auto& space, const auto& entry) { - if (space == bucket_space) { - auto entries = guard->find_parents_and_self(entry.getBucketId()); - if (entries.size() == 1) { - ++found_buckets; - } - } - }); - return found_buckets; - } -}; - -LegacyBucketDBUpdaterSnapshotTest::~LegacyBucketDBUpdaterSnapshotTest() = default; - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterSnapshotTest, default_space_snapshot_prior_to_activated_state_is_non_routable) { - auto rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket); - EXPECT_FALSE(rs.is_routable()); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterSnapshotTest, global_space_snapshot_prior_to_activated_state_is_non_routable) { - auto rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket); - EXPECT_FALSE(rs.is_routable()); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_states) { - set_cluster_state_bundle(initial_bundle); - // State currently pending, empty initial state is active - - auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket); - EXPECT_EQ(def_rs.context().active_cluster_state()->toString(), empty_state.toString()); - EXPECT_EQ(def_rs.context().default_active_cluster_state()->toString(), empty_state.toString()); - ASSERT_TRUE(def_rs.context().has_pending_state_transition()); - EXPECT_EQ(def_rs.context().pending_cluster_state()->toString(), 
initial_default->toString()); - - auto global_rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket); - EXPECT_EQ(global_rs.context().active_cluster_state()->toString(), empty_state.toString()); - EXPECT_EQ(global_rs.context().default_active_cluster_state()->toString(), empty_state.toString()); - ASSERT_TRUE(global_rs.context().has_pending_state_transition()); - EXPECT_EQ(global_rs.context().pending_cluster_state()->toString(), initial_baseline->toString()); - - ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(*initial_baseline, messageCount(1), 0)); - // State now activated, no pending - - def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket); - EXPECT_EQ(def_rs.context().active_cluster_state()->toString(), initial_default->toString()); - EXPECT_EQ(def_rs.context().default_active_cluster_state()->toString(), initial_default->toString()); - EXPECT_FALSE(def_rs.context().has_pending_state_transition()); - - global_rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket); - EXPECT_EQ(global_rs.context().active_cluster_state()->toString(), initial_baseline->toString()); - EXPECT_EQ(global_rs.context().default_active_cluster_state()->toString(), initial_default->toString()); - EXPECT_FALSE(global_rs.context().has_pending_state_transition()); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterSnapshotTest, snapshot_with_no_pending_state_transition_returns_mutable_db_guard) { - constexpr uint32_t n_buckets = 10; - ASSERT_NO_FATAL_FAILURE( - trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, - "version:2 distributor:1 storage:4", n_buckets, 4)); - EXPECT_FALSE(activate_cluster_state_version(2)); - EXPECT_EQ(buckets_in_snapshot_matching_current_db(mutable_repo(), FixedBucketSpaces::default_space()), - n_buckets); - EXPECT_EQ(buckets_in_snapshot_matching_current_db(mutable_repo(), FixedBucketSpaces::global_space()), - n_buckets); -} - -// TODO STRIPE 
migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterSnapshotTest, snapshot_returns_unroutable_for_non_owned_bucket_in_current_state) { - ASSERT_NO_FATAL_FAILURE( - trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, - "version:2 distributor:2 .0.s:d storage:4", 0, 0)); - EXPECT_FALSE(activate_cluster_state_version(2)); - // We're down in state 2 and therefore do not own any buckets - auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket); - EXPECT_FALSE(def_rs.is_routable()); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterSnapshotTest, snapshot_with_pending_state_returns_read_only_guard_for_bucket_only_owned_in_current_state) { - constexpr uint32_t n_buckets = 10; - ASSERT_NO_FATAL_FAILURE( - trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, - "version:2 distributor:2 .0.s:d storage:4", 0, 0)); - EXPECT_EQ(buckets_in_snapshot_matching_current_db(read_only_repo(), FixedBucketSpaces::default_space()), - n_buckets); - EXPECT_EQ(buckets_in_snapshot_matching_current_db(read_only_repo(), FixedBucketSpaces::global_space()), - n_buckets); -} - -// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest -TEST_F(LegacyBucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabled_and_bucket_not_owned_in_pending_state) { - getBucketDBUpdater().set_stale_reads_enabled(false); - constexpr uint32_t n_buckets = 10; - ASSERT_NO_FATAL_FAILURE( - trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, - "version:2 distributor:2 .0.s:d storage:4", 0, 0)); - auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket); - EXPECT_FALSE(def_rs.is_routable()); -} - -} diff --git a/storage/src/tests/distributor/legacy_distributor_test.cpp b/storage/src/tests/distributor/legacy_distributor_test.cpp deleted file mode 100644 index 
90d64ddb130..00000000000 --- a/storage/src/tests/distributor/legacy_distributor_test.cpp +++ /dev/null @@ -1,1326 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include <vespa/storage/distributor/idealstatemetricsset.h> -#include <vespa/storageapi/message/persistence.h> -#include <vespa/storageapi/message/bucketsplitting.h> -#include <vespa/storageapi/message/visitor.h> -#include <vespa/storageapi/message/removelocation.h> -#include <vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h> -#include <tests/distributor/distributortestutil.h> -#include <vespa/document/bucket/fixed_bucket_spaces.h> -#include <vespa/document/fieldset/fieldsets.h> -#include <vespa/document/test/make_document_bucket.h> -#include <vespa/document/test/make_bucket_space.h> -#include <vespa/storage/config/config-stor-distributormanager.h> -#include <vespa/storage/distributor/top_level_distributor.h> -#include <vespa/storage/distributor/distributor_stripe.h> -#include <vespa/storage/distributor/distributor_status.h> -#include <vespa/storage/distributor/distributor_bucket_space.h> -#include <vespa/storage/distributor/distributormetricsset.h> -#include <vespa/vespalib/text/stringtokenizer.h> -#include <vespa/metrics/updatehook.h> -#include <thread> -#include <vespa/vespalib/gtest/gtest.h> -#include <gmock/gmock.h> - -using document::test::makeDocumentBucket; -using document::test::makeBucketSpace; -using document::FixedBucketSpaces; -using document::BucketSpace; -using document::Bucket; -using document::BucketId; -using namespace ::testing; - -namespace storage::distributor { - -// TODO STRIPE: Remove this test when legacy mode is gone. 
-struct LegacyDistributorTest : Test, DistributorTestUtil { - LegacyDistributorTest(); - ~LegacyDistributorTest() override; - - // TODO handle edge case for window between getnodestate reply already - // sent and new request not yet received - - void assertBucketSpaceStats(size_t expBucketPending, size_t expBucketTotal, uint16_t node, const vespalib::string &bucketSpace, - const BucketSpacesStatsProvider::PerNodeBucketSpacesStats &stats); - std::vector<document::BucketSpace> _bucketSpaces; - - void SetUp() override { - createLinks(); - _bucketSpaces = getBucketSpaces(); - }; - - void TearDown() override { - close(); - } - - // Simple type aliases to make interfacing with certain utility functions - // easier. Note that this is only for readability and does not provide any - // added type safety. - using NodeCount = int; - using Redundancy = int; - - using ConfigBuilder = vespa::config::content::core::StorDistributormanagerConfigBuilder; - - void configureDistributor(const ConfigBuilder& config) { - getConfig().configure(config); - _distributor->enable_next_config_if_changed(); - } - - auto currentReplicaCountingMode() const noexcept { - return _distributor->bucket_db_metric_updater().getMinimumReplicaCountingMode(); - } - - std::string testOp(std::shared_ptr<api::StorageMessage> msg) - { - _distributor->handleMessage(msg); - - std::string tmp = _sender.getCommands(); - _sender.clear(); - return tmp; - } - - void tickDistributorNTimes(uint32_t n) { - for (uint32_t i = 0; i < n; ++i) { - tick(); - } - } - - typedef bool ResetTrusted; - - std::string updateBucketDB(const std::string& firstState, - const std::string& secondState, - bool resetTrusted = false) - { - std::vector<std::string> states(toVector<std::string>(firstState, secondState)); - - for (uint32_t i = 0; i < states.size(); ++i) { - std::vector<uint16_t> removedNodes; - std::vector<BucketCopy> changedNodes; - - vespalib::StringTokenizer tokenizer(states[i], ","); - for (uint32_t j = 0; j < 
tokenizer.size(); ++j) { - vespalib::StringTokenizer tokenizer2(tokenizer[j], ":"); - - bool trusted = false; - if (tokenizer2.size() > 2) { - trusted = true; - } - - uint16_t node = atoi(tokenizer2[0].data()); - if (tokenizer2[1] == "r") { - removedNodes.push_back(node); - } else { - uint32_t checksum = atoi(tokenizer2[1].data()); - changedNodes.push_back( - BucketCopy( - i + 1, - node, - api::BucketInfo( - checksum, - checksum / 2, - checksum / 4)).setTrusted(trusted)); - } - } - - operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(document::BucketId(16, 1)), removedNodes); - - uint32_t flags(DatabaseUpdate::CREATE_IF_NONEXISTING - | (resetTrusted ? DatabaseUpdate::RESET_TRUSTED : 0)); - - operation_context().update_bucket_database(makeDocumentBucket(document::BucketId(16, 1)), - changedNodes, - flags); - } - - std::string retVal = dumpBucket(document::BucketId(16, 1)); - getBucketDatabase().clear(); - return retVal; - } - - size_t explicit_node_state_reply_send_invocations() const noexcept { - return _node->getNodeStateUpdater().explicit_node_state_reply_send_invocations(); - } - - StatusReporterDelegate& distributor_status_delegate() { - // TODO STRIPE - return _distributor->_stripe->_distributorStatusDelegate; - } - - framework::TickingThreadPool& distributor_thread_pool() { - return _distributor->_threadPool; - } - - const std::vector<std::shared_ptr<DistributorStatus>>& distributor_status_todos() { - // TODO STRIPE - return _distributor->_stripe->_statusToDo; - } - - TopLevelDistributor::MetricUpdateHook distributor_metric_update_hook() { - return _distributor->_metricUpdateHook; - } - - SimpleMaintenanceScanner::PendingMaintenanceStats distributor_maintenance_stats() { - return _distributor->pending_maintenance_stats(); - } - - BucketSpacesStatsProvider::PerNodeBucketSpacesStats distributor_bucket_spaces_stats() { - return _distributor->getBucketSpacesStats(); - } - - DistributorHostInfoReporter& distributor_host_info_reporter() { - 
return _distributor->_hostInfoReporter; - } - - bool distributor_handle_message(const std::shared_ptr<api::StorageMessage>& msg) { - return _distributor->handleMessage(msg); - } - - uint64_t db_sample_interval_sec() const noexcept { - return std::chrono::duration_cast<std::chrono::seconds>(_distributor->db_memory_sample_interval()).count(); - } - - void configure_stale_reads_enabled(bool enabled) { - ConfigBuilder builder; - builder.allowStaleReadsDuringClusterStateTransitions = enabled; - configureDistributor(builder); - } - - void configure_update_fast_path_restart_enabled(bool enabled) { - ConfigBuilder builder; - builder.restartWithFastUpdatePathIfAllGetTimestampsAreConsistent = enabled; - configureDistributor(builder); - } - - void configure_merge_operations_disabled(bool disabled) { - ConfigBuilder builder; - builder.mergeOperationsDisabled = disabled; - configureDistributor(builder); - } - - void configure_use_weak_internal_read_consistency(bool use_weak) { - ConfigBuilder builder; - builder.useWeakInternalReadConsistencyForClientGets = use_weak; - configureDistributor(builder); - } - - void configure_metadata_update_phase_enabled(bool enabled) { - ConfigBuilder builder; - builder.enableMetadataOnlyFetchPhaseForInconsistentUpdates = enabled; - configureDistributor(builder); - } - - void configure_prioritize_global_bucket_merges(bool enabled) { - ConfigBuilder builder; - builder.prioritizeGlobalBucketMerges = enabled; - configureDistributor(builder); - } - - void configure_max_activation_inhibited_out_of_sync_groups(uint32_t n_groups) { - ConfigBuilder builder; - builder.maxActivationInhibitedOutOfSyncGroups = n_groups; - configureDistributor(builder); - } - - void configureMaxClusterClockSkew(int seconds); - void replyToSingleRequestBucketInfoCommandWith1Bucket(); - void sendDownDummyRemoveCommand(); - void assertSingleBouncedRemoveReplyPresent(); - void assertNoMessageBounced(); - void configure_mutation_sequencing(bool enabled); - void 
configure_merge_busy_inhibit_duration(int seconds); - void do_test_pending_merge_getnodestate_reply_edge(BucketSpace space); - - void set_up_and_start_get_op_with_stale_reads_enabled(bool enabled); -}; - -LegacyDistributorTest::LegacyDistributorTest() - : Test(), - DistributorTestUtil(), - _bucketSpaces() -{ -} - -LegacyDistributorTest::~LegacyDistributorTest() = default; - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, operation_generation) { - setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); - - document::BucketId bid; - addNodesToBucketDB(document::BucketId(16, 1), "0=1/1/1/t"); - - EXPECT_EQ("Remove", testOp(std::make_shared<api::RemoveCommand>( - makeDocumentBucket(bid), - document::DocumentId("id:m:test:n=1:foo"), - api::Timestamp(1234)))); - - auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "foo", "bar", ""); - cmd->addBucketToBeVisited(document::BucketId(16, 1)); - cmd->addBucketToBeVisited(document::BucketId()); - - EXPECT_EQ("Visitor Create", testOp(cmd)); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, operations_generated_and_started_without_duplicates) { - setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); - - for (uint32_t i = 0; i < 6; ++i) { - addNodesToBucketDB(document::BucketId(16, i), "0=1"); - } - - tickDistributorNTimes(20); - - ASSERT_FALSE(tick()); - - ASSERT_EQ(6, _sender.commands().size()); -} - - -// Migrated to DistributorStripeTest -// Migrated to TopLevelDistributorTest -TEST_F(LegacyDistributorTest, recovery_mode_on_cluster_state_change) { - setupDistributor(Redundancy(1), NodeCount(2), - "storage:1 .0.s:d distributor:1"); - enableDistributorClusterState("storage:1 distributor:1"); - - EXPECT_TRUE(distributor_is_in_recovery_mode()); - for (uint32_t i = 0; i < 3; ++i) { - addNodesToBucketDB(document::BucketId(16, i), "0=1"); - } - for (int i = 0; i < 3; ++i) { - tick(); - 
EXPECT_TRUE(distributor_is_in_recovery_mode()); - } - tick(); - EXPECT_FALSE(distributor_is_in_recovery_mode()); - - enableDistributorClusterState("storage:2 distributor:1"); - EXPECT_TRUE(distributor_is_in_recovery_mode()); -} - -TEST_F(LegacyDistributorTest, distributor_considered_initialized_once_self_observed_up) { - setupDistributor(Redundancy(1), NodeCount(2), "distributor:1 .0.s:d storage:1"); // We're down D: - EXPECT_FALSE(_distributor->done_initializing()); - enableDistributorClusterState("distributor:1 storage:1"); // We're up :D - EXPECT_TRUE(_distributor->done_initializing()); - enableDistributorClusterState("distributor:1 .0.s:d storage:1"); // And down again :I but that does not change init state - EXPECT_TRUE(_distributor->done_initializing()); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, operations_are_throttled) { - setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); - getConfig().setMinPendingMaintenanceOps(1); - getConfig().setMaxPendingMaintenanceOps(1); - - for (uint32_t i = 0; i < 6; ++i) { - addNodesToBucketDB(document::BucketId(16, i), "0=1"); - } - tickDistributorNTimes(20); - ASSERT_EQ(1, _sender.commands().size()); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, handle_unknown_maintenance_reply) { - setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); - - { - auto cmd = std::make_shared<api::SplitBucketCommand>(makeDocumentBucket(document::BucketId(16, 1234))); - auto reply = std::make_shared<api::SplitBucketReply>(*cmd); - ASSERT_TRUE(_distributor->handleReply(reply)); - } - - { - // RemoveLocationReply must be treated as a maintenance reply since - // it's what GC is currently built around. 
- auto cmd = std::make_shared<api::RemoveLocationCommand>( - "false", makeDocumentBucket(document::BucketId(30, 1234))); - auto reply = std::shared_ptr<api::StorageReply>(cmd->makeReply()); - ASSERT_TRUE(_distributor->handleReply(reply)); - } -} - -// Migrated to TopLevelDistributorTest -TEST_F(LegacyDistributorTest, contains_time_statement) { - setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); - - EXPECT_FALSE(getConfig().containsTimeStatement("")); - EXPECT_FALSE(getConfig().containsTimeStatement("testdoctype1")); - EXPECT_FALSE(getConfig().containsTimeStatement("testdoctype1.headerfield > 42")); - EXPECT_TRUE(getConfig().containsTimeStatement("testdoctype1.headerfield > now()")); - EXPECT_TRUE(getConfig().containsTimeStatement("testdoctype1.headerfield > now() - 3600")); - EXPECT_TRUE(getConfig().containsTimeStatement("testdoctype1.headerfield == now() - 3600")); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, update_bucket_database) { - enableDistributorClusterState("distributor:1 storage:3"); - - EXPECT_EQ("BucketId(0x4000000000000001) : " - "node(idx=0,crc=0x1c8,docs=228/228,bytes=114/114,trusted=true,active=false,ready=false), " - "node(idx=1,crc=0x1c8,docs=228/228,bytes=114/114,trusted=true,active=false,ready=false)", - updateBucketDB("0:456,1:456,2:789", "2:r")); - - EXPECT_EQ("BucketId(0x4000000000000001) : " - "node(idx=0,crc=0x1c8,docs=228/228,bytes=114/114,trusted=true,active=false,ready=false), " - "node(idx=2,crc=0x1c8,docs=228/228,bytes=114/114,trusted=true,active=false,ready=false), " - "node(idx=1,crc=0x1c8,docs=228/228,bytes=114/114,trusted=true,active=false,ready=false)", - updateBucketDB("0:456,1:456", "2:456")); - - EXPECT_EQ("BucketId(0x4000000000000001) : " - "node(idx=0,crc=0x315,docs=394/394,bytes=197/197,trusted=false,active=false,ready=false), " - "node(idx=2,crc=0x14d,docs=166/166,bytes=83/83,trusted=false,active=false,ready=false), " - 
"node(idx=1,crc=0x34a,docs=421/421,bytes=210/210,trusted=false,active=false,ready=false)", - updateBucketDB("0:456:t,1:456:t,2:123", "0:789,1:842,2:333")); - - EXPECT_EQ("BucketId(0x4000000000000001) : " - "node(idx=0,crc=0x315,docs=394/394,bytes=197/197,trusted=true,active=false,ready=false), " - "node(idx=2,crc=0x14d,docs=166/166,bytes=83/83,trusted=false,active=false,ready=false), " - "node(idx=1,crc=0x315,docs=394/394,bytes=197/197,trusted=true,active=false,ready=false)", - updateBucketDB("0:456:t,1:456:t,2:123", "0:789,1:789,2:333")); - - EXPECT_EQ("BucketId(0x4000000000000001) : " - "node(idx=2,crc=0x14d,docs=166/166,bytes=83/83,trusted=true,active=false,ready=false)", - updateBucketDB("0:456:t,1:456:t", "0:r,1:r,2:333")); - - // Copies are in sync so should still be trusted even if explicitly reset. - EXPECT_EQ("BucketId(0x4000000000000001) : " - "node(idx=0,crc=0x1c8,docs=228/228,bytes=114/114,trusted=true,active=false,ready=false), " - "node(idx=2,crc=0x1c8,docs=228/228,bytes=114/114,trusted=true,active=false,ready=false), " - "node(idx=1,crc=0x1c8,docs=228/228,bytes=114/114,trusted=true,active=false,ready=false)", - updateBucketDB("0:456,1:456", "2:456", ResetTrusted(true))); - - // When resetting, first inserted copy should not end up as implicitly trusted. 
- EXPECT_EQ("BucketId(0x4000000000000001) : " - "node(idx=0,crc=0x1c8,docs=228/228,bytes=114/114,trusted=false,active=false,ready=false), " - "node(idx=2,crc=0x14d,docs=166/166,bytes=83/83,trusted=false,active=false,ready=false)", - updateBucketDB("0:456", "2:333", ResetTrusted(true))); -} - -namespace { - -using namespace framework::defaultimplementation; - -class StatusRequestThread : public framework::Runnable { - StatusReporterDelegate& _reporter; - std::string _result; -public: - explicit StatusRequestThread(StatusReporterDelegate& reporter) - : _reporter(reporter) - {} - void run(framework::ThreadHandle&) override { - framework::HttpUrlPath path("/distributor?page=buckets"); - std::ostringstream stream; - _reporter.reportStatus(stream, path); - _result = stream.str(); - } - - std::string getResult() const { - return _result; - } -}; - -} - -// Migrated to TopLevelDistributorTest -TEST_F(LegacyDistributorTest, tick_processes_status_requests) { - setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); - - addNodesToBucketDB(document::BucketId(16, 1), "0=1/1/1/t"); - - // Must go via delegate since reportStatus is now just a rendering - // function and not a request enqueuer (see Distributor::handleStatusRequest). 
- StatusRequestThread thread(distributor_status_delegate()); - FakeClock clock; - ThreadPoolImpl pool(clock); - int ticksBeforeWait = 1; - framework::Thread::UP tp(pool.startThread(thread, "statustest", 5ms, 5s, ticksBeforeWait)); - - while (true) { - std::this_thread::sleep_for(1ms); - framework::TickingLockGuard guard(distributor_thread_pool().freezeCriticalTicks()); - if (!distributor_status_todos().empty()) { - break; - } - - } - ASSERT_TRUE(tick()); - - tp->interruptAndJoin(); - - EXPECT_THAT(thread.getResult(), HasSubstr("BucketId(0x4000000000000001)")); -} - -// Migrated to TopLevelDistributorTest -TEST_F(LegacyDistributorTest, metric_update_hook_updates_pending_maintenance_metrics) { - setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); - // To ensure we count all operations, not just those fitting within the - // pending window. - getConfig().setMinPendingMaintenanceOps(1); - getConfig().setMaxPendingMaintenanceOps(1); - - // 1 bucket must be merged, 1 must be split, 1 should be activated. - addNodesToBucketDB(document::BucketId(16, 1), "0=2/2/2/t/a,1=1/1/1"); - addNodesToBucketDB(document::BucketId(16, 2), - "0=100/10000000/200000/t/a,1=100/10000000/200000/t"); - addNodesToBucketDB(document::BucketId(16, 3), - "0=200/300/400/t,1=200/300/400/t"); - - // Go many full scanner rounds to check that metrics are set, not - // added to existing. - tickDistributorNTimes(50); - - // By this point, no hook has been called so the metrics have not been - // set. 
- using MO = MaintenanceOperation; - { - const IdealStateMetricSet& metrics(getIdealStateManager().getMetrics()); - EXPECT_EQ(0, metrics.operations[MO::MERGE_BUCKET]->pending.getLast()); - EXPECT_EQ(0, metrics.operations[MO::SPLIT_BUCKET]->pending.getLast()); - EXPECT_EQ(0, metrics.operations[MO::SET_BUCKET_STATE]->pending.getLast()); - EXPECT_EQ(0, metrics.operations[MO::DELETE_BUCKET]->pending.getLast()); - EXPECT_EQ(0, metrics.operations[MO::JOIN_BUCKET]->pending.getLast()); - EXPECT_EQ(0, metrics.operations[MO::GARBAGE_COLLECTION]->pending.getLast()); - } - - // Force trigger update hook - std::mutex l; - distributor_metric_update_hook().updateMetrics(metrics::MetricLockGuard(l)); - // Metrics should now be updated to the last complete working state - { - const IdealStateMetricSet& metrics(getIdealStateManager().getMetrics()); - EXPECT_EQ(1, metrics.operations[MO::MERGE_BUCKET]->pending.getLast()); - EXPECT_EQ(1, metrics.operations[MO::SPLIT_BUCKET]->pending.getLast()); - EXPECT_EQ(1, metrics.operations[MO::SET_BUCKET_STATE]->pending.getLast()); - EXPECT_EQ(0, metrics.operations[MO::DELETE_BUCKET]->pending.getLast()); - EXPECT_EQ(0, metrics.operations[MO::JOIN_BUCKET]->pending.getLast()); - EXPECT_EQ(0, metrics.operations[MO::GARBAGE_COLLECTION]->pending.getLast()); - } -} - -// Migrated to TopLevelDistributorTest -TEST_F(LegacyDistributorTest, bucket_db_memory_usage_metrics_only_updated_at_fixed_time_intervals) { - getClock().setAbsoluteTimeInSeconds(1000); - - setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); - addNodesToBucketDB(document::BucketId(16, 1), "0=1/1/1/t/a,1=2/2/2"); - tickDistributorNTimes(10); - - std::mutex l; - distributor_metric_update_hook().updateMetrics(metrics::MetricLockGuard(l)); - auto* m = getDistributor().getMetrics().mutable_dbs.memory_usage.getMetric("used_bytes"); - ASSERT_TRUE(m != nullptr); - auto last_used = m->getLongValue("last"); - EXPECT_GT(last_used, 0); - - // Add another bucket to the DB. 
This should increase the underlying used number of - // bytes, but this should not be aggregated into the metrics until the sampling time - // interval has passed. Instead, old metric gauge values should be preserved. - addNodesToBucketDB(document::BucketId(16, 2), "0=1/1/1/t/a,1=2/2/2"); - - const auto sample_interval_sec = db_sample_interval_sec(); - getClock().setAbsoluteTimeInSeconds(1000 + sample_interval_sec - 1); // Not there yet. - tickDistributorNTimes(50); - distributor_metric_update_hook().updateMetrics(metrics::MetricLockGuard(l)); - - m = getDistributor().getMetrics().mutable_dbs.memory_usage.getMetric("used_bytes"); - auto now_used = m->getLongValue("last"); - EXPECT_EQ(now_used, last_used); - - getClock().setAbsoluteTimeInSeconds(1000 + sample_interval_sec + 1); - tickDistributorNTimes(10); - distributor_metric_update_hook().updateMetrics(metrics::MetricLockGuard(l)); - - m = getDistributor().getMetrics().mutable_dbs.memory_usage.getMetric("used_bytes"); - now_used = m->getLongValue("last"); - EXPECT_GT(now_used, last_used); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, priority_config_is_propagated_to_distributor_configuration) { - using namespace vespa::config::content::core; - - setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); - - ConfigBuilder builder; - builder.priorityMergeMoveToIdealNode = 1; - builder.priorityMergeOutOfSyncCopies = 2; - builder.priorityMergeTooFewCopies = 3; - builder.priorityActivateNoExistingActive = 4; - builder.priorityActivateWithExistingActive = 5; - builder.priorityDeleteBucketCopy = 6; - builder.priorityJoinBuckets = 7; - builder.prioritySplitDistributionBits = 8; - builder.prioritySplitLargeBucket = 9; - builder.prioritySplitInconsistentBucket = 10; - builder.priorityGarbageCollection = 11; - builder.priorityMergeGlobalBuckets = 12; - - getConfig().configure(builder); - - const auto& mp = getConfig().getMaintenancePriorities(); - EXPECT_EQ(1, 
static_cast<int>(mp.mergeMoveToIdealNode)); - EXPECT_EQ(2, static_cast<int>(mp.mergeOutOfSyncCopies)); - EXPECT_EQ(3, static_cast<int>(mp.mergeTooFewCopies)); - EXPECT_EQ(4, static_cast<int>(mp.activateNoExistingActive)); - EXPECT_EQ(5, static_cast<int>(mp.activateWithExistingActive)); - EXPECT_EQ(6, static_cast<int>(mp.deleteBucketCopy)); - EXPECT_EQ(7, static_cast<int>(mp.joinBuckets)); - EXPECT_EQ(8, static_cast<int>(mp.splitDistributionBits)); - EXPECT_EQ(9, static_cast<int>(mp.splitLargeBucket)); - EXPECT_EQ(10, static_cast<int>(mp.splitInconsistentBucket)); - EXPECT_EQ(11, static_cast<int>(mp.garbageCollection)); - EXPECT_EQ(12, static_cast<int>(mp.mergeGlobalBuckets)); -} - -// Migrated to DistributorStripeTest -// Explicit cluster state edge test added in TopLevelDistributorTest::cluster_state_lifecycle_is_propagated_to_stripes -TEST_F(LegacyDistributorTest, no_db_resurrection_for_bucket_not_owned_in_pending_state) { - setupDistributor(Redundancy(1), NodeCount(10), "storage:2 distributor:2"); - lib::ClusterState newState("storage:10 distributor:10"); - auto stateCmd = std::make_shared<api::SetSystemStateCommand>(newState); - // Force newState into being the pending state. According to the initial - // state we own the bucket, but according to the pending state, we do - // not. This must be handled correctly by the database update code. 
- getBucketDBUpdater().onSetSystemState(stateCmd); - - document::BucketId nonOwnedBucket(16, 3); - EXPECT_FALSE(getDistributorBucketSpace().get_bucket_ownership_flags(nonOwnedBucket).owned_in_pending_state()); - EXPECT_FALSE(getDistributorBucketSpace().check_ownership_in_pending_and_current_state(nonOwnedBucket).isOwned()); - - std::vector<BucketCopy> copies; - copies.emplace_back(1234, 0, api::BucketInfo(0x567, 1, 2)); - operation_context().update_bucket_database(makeDocumentBucket(nonOwnedBucket), copies, - DatabaseUpdate::CREATE_IF_NONEXISTING); - - EXPECT_EQ("NONEXISTING", dumpBucket(nonOwnedBucket)); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, added_db_buckets_without_gc_timestamp_implicitly_get_current_time) { - setupDistributor(Redundancy(1), NodeCount(10), "storage:2 distributor:2"); - getClock().setAbsoluteTimeInSeconds(101234); - document::BucketId bucket(16, 7654); - - std::vector<BucketCopy> copies; - copies.emplace_back(1234, 0, api::BucketInfo(0x567, 1, 2)); - operation_context().update_bucket_database(makeDocumentBucket(bucket), copies, - DatabaseUpdate::CREATE_IF_NONEXISTING); - BucketDatabase::Entry e(getBucket(bucket)); - EXPECT_EQ(101234, e->getLastGarbageCollectionTime()); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, merge_stats_are_accumulated_during_database_iteration) { - setupDistributor(Redundancy(2), NodeCount(3), "storage:3 distributor:1"); - // Copies out of sync. Not possible for distributor to _reliably_ tell - // which direction(s) data will flow, so for simplicity assume that we - // must sync both copies. - // Note that we mark certain copies as active to prevent the bucketstate - // checker from pre-empting the merges. 
- // -> syncing[0] += 1, syncing[2] += 1 - addNodesToBucketDB(document::BucketId(16, 1), "0=1/1/1/t/a,2=2/2/2"); - // Must add missing node 2 for bucket - // -> copyingOut[0] += 1, copyingIn[2] += 1 - addNodesToBucketDB(document::BucketId(16, 2), "0=1/1/1/t/a"); - // Moving from non-ideal node 1 to ideal node 2. Both nodes 0 and 1 will - // be involved in this merge, but only node 1 will be tagged as source only - // (i.e. to be deleted after the merge is completed). - // -> copyingOut[0] += 1, movingOut[1] += 1, copyingIn[2] += 1 - addNodesToBucketDB(document::BucketId(16, 3), "0=2/2/2/t/a,1=2/2/2/t"); - - // Go many full scanner rounds to check that stats are set, not - // added to existing. - tickDistributorNTimes(50); - - const auto& stats = distributor_maintenance_stats(); - { - NodeMaintenanceStats wanted; - wanted.syncing = 1; - wanted.copyingOut = 2; - wanted.total = 3; - EXPECT_EQ(wanted, stats.perNodeStats.forNode(0, makeBucketSpace())); - } - { - NodeMaintenanceStats wanted; - wanted.movingOut = 1; - wanted.total = 1; - EXPECT_EQ(wanted, stats.perNodeStats.forNode(1, makeBucketSpace())); - } - { - NodeMaintenanceStats wanted; - wanted.syncing = 1; - wanted.copyingIn = 2; - wanted.total = 1; - EXPECT_EQ(wanted, stats.perNodeStats.forNode(2, makeBucketSpace())); - } - auto bucketStats = distributor_bucket_spaces_stats(); - ASSERT_EQ(3, bucketStats.size()); - assertBucketSpaceStats(1, 3, 0, "default", bucketStats); - assertBucketSpaceStats(0, 1, 1, "default", bucketStats); - assertBucketSpaceStats(3, 1, 2, "default", bucketStats); -} - -void -LegacyDistributorTest::assertBucketSpaceStats(size_t expBucketPending, size_t expBucketTotal, uint16_t node, - const vespalib::string& bucketSpace, - const BucketSpacesStatsProvider::PerNodeBucketSpacesStats& stats) -{ - auto nodeItr = stats.find(node); - ASSERT_TRUE(nodeItr != stats.end()); - ASSERT_EQ(1, nodeItr->second.size()); - auto bucketSpaceItr = nodeItr->second.find(bucketSpace); - ASSERT_TRUE(bucketSpaceItr 
!= nodeItr->second.end()); - ASSERT_TRUE(bucketSpaceItr->second.valid()); - ASSERT_EQ(expBucketTotal, bucketSpaceItr->second.bucketsTotal()); - ASSERT_EQ(expBucketPending, bucketSpaceItr->second.bucketsPending()); -} - -/** - * Since maintenance operations are prioritized differently, activation - * pre-empts merging and other ops. If this also implies pre-empting running - * their state checkers at all, we won't get any statistics from any other - * operations for the bucket. - */ -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, stats_generated_for_preempted_operations) { - setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); - // For this test it suffices to have a single bucket with multiple aspects - // wrong about it. In this case, let a bucket be both out of sync _and_ - // missing an active copy. This _should_ give a statistic with both nodes 0 - // and 1 requiring a sync. If instead merge stats generation is preempted - // by activation, we'll see no merge stats at all. - addNodesToBucketDB(document::BucketId(16, 1), "0=1/1/1,1=2/2/2"); - tickDistributorNTimes(50); - const auto& stats = distributor_maintenance_stats(); - { - NodeMaintenanceStats wanted; - wanted.syncing = 1; - wanted.total = 1; - EXPECT_EQ(wanted, stats.perNodeStats.forNode(0, makeBucketSpace())); - } - { - NodeMaintenanceStats wanted; - wanted.syncing = 1; - wanted.total = 1; - EXPECT_EQ(wanted, stats.perNodeStats.forNode(1, makeBucketSpace())); - } -} - -// Migrated to TopLevelDistributorTest -TEST_F(LegacyDistributorTest, host_info_reporter_config_is_propagated_to_reporter) { - setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); - - // Default is enabled=true. 
- EXPECT_TRUE(distributor_host_info_reporter().isReportingEnabled()); - - ConfigBuilder builder; - builder.enableHostInfoReporting = false; - configureDistributor(builder); - - EXPECT_FALSE(distributor_host_info_reporter().isReportingEnabled()); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, replica_counting_mode_is_configured_to_trusted_by_default) { - setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); - EXPECT_EQ(ConfigBuilder::MinimumReplicaCountingMode::TRUSTED, currentReplicaCountingMode()); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, replica_counting_mode_config_is_propagated_to_metric_updater) { - setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); - ConfigBuilder builder; - builder.minimumReplicaCountingMode = ConfigBuilder::MinimumReplicaCountingMode::ANY; - configureDistributor(builder); - EXPECT_EQ(ConfigBuilder::MinimumReplicaCountingMode::ANY, currentReplicaCountingMode()); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, max_consecutively_inhibited_maintenance_ticks_config_is_propagated_to_internal_config) { - setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); - ConfigBuilder builder; - builder.maxConsecutivelyInhibitedMaintenanceTicks = 123; - getConfig().configure(builder); - EXPECT_EQ(getConfig().max_consecutively_inhibited_maintenance_ticks(), 123); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, bucket_activation_is_enabled_by_default) { - setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); - EXPECT_FALSE(getConfig().isBucketActivationDisabled()); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, bucket_activation_config_is_propagated_to_distributor_configuration) { - using namespace vespa::config::content::core; - - setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); - - ConfigBuilder builder; - 
builder.disableBucketActivation = true; - getConfig().configure(builder); - - EXPECT_TRUE(getConfig().isBucketActivationDisabled()); -} - -void -LegacyDistributorTest::configureMaxClusterClockSkew(int seconds) { - using namespace vespa::config::content::core; - - ConfigBuilder builder; - builder.maxClusterClockSkewSec = seconds; - getConfig().configure(builder); - _distributor->enable_next_config_if_changed(); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, max_clock_skew_config_is_propagated_to_distributor_config) { - setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); - - configureMaxClusterClockSkew(5); - EXPECT_EQ(getConfig().getMaxClusterClockSkew(), std::chrono::seconds(5)); -} - -namespace { - -auto makeDummyRemoveCommand() { - return std::make_shared<api::RemoveCommand>( - makeDocumentBucket(document::BucketId(0)), - document::DocumentId("id:foo:testdoctype1:n=1:foo"), - api::Timestamp(0)); -} - -auto make_dummy_get_command_for_bucket_1() { - return std::make_shared<api::GetCommand>( - makeDocumentBucket(document::BucketId(0)), - document::DocumentId("id:foo:testdoctype1:n=1:foo"), - document::AllFields::NAME); -} - -} - -void LegacyDistributorTest::replyToSingleRequestBucketInfoCommandWith1Bucket() { - ASSERT_EQ(_bucketSpaces.size(), _sender.commands().size()); - for (uint32_t i = 0; i < _sender.commands().size(); ++i) { - ASSERT_EQ(api::MessageType::REQUESTBUCKETINFO, _sender.command(i)->getType()); - auto& bucketReq(static_cast<api::RequestBucketInfoCommand&> - (*_sender.command(i))); - auto bucketReply = bucketReq.makeReply(); - if (bucketReq.getBucketSpace() == FixedBucketSpaces::default_space()) { - // Make sure we have a bucket to route our remove op to, or we'd get - // an immediate reply anyway. 
- dynamic_cast<api::RequestBucketInfoReply&>(*bucketReply) - .getBucketInfo().push_back( - api::RequestBucketInfoReply::Entry(document::BucketId(1, 1), - api::BucketInfo(20, 10, 12, 50, 60, true, true))); - } - _distributor->handleMessage(std::move(bucketReply)); - } - _sender.commands().clear(); -} - -void LegacyDistributorTest::sendDownDummyRemoveCommand() { - _distributor->handleMessage(makeDummyRemoveCommand()); -} - -void LegacyDistributorTest::assertSingleBouncedRemoveReplyPresent() { - ASSERT_EQ(1, _sender.replies().size()); // Rejected remove - ASSERT_EQ(api::MessageType::REMOVE_REPLY, _sender.reply(0)->getType()); - auto& reply(static_cast<api::RemoveReply&>(*_sender.reply(0))); - ASSERT_EQ(api::ReturnCode::STALE_TIMESTAMP, reply.getResult().getResult()); - _sender.replies().clear(); -} - -void LegacyDistributorTest::assertNoMessageBounced() { - ASSERT_EQ(0, _sender.replies().size()); -} - -// Migrated to TopLevelDistributorTest -TEST_F(LegacyDistributorTest, configured_safe_time_point_rejection_works_end_to_end) { - setupDistributor(Redundancy(2), NodeCount(2), - "bits:1 storage:1 distributor:2"); - getClock().setAbsoluteTimeInSeconds(1000); - configureMaxClusterClockSkew(10); - - receive_set_system_state_command("bits:1 storage:1 distributor:1"); - ASSERT_NO_FATAL_FAILURE(replyToSingleRequestBucketInfoCommandWith1Bucket()); - // SetSystemStateCommand sent down chain at this point. - sendDownDummyRemoveCommand(); - ASSERT_NO_FATAL_FAILURE(assertSingleBouncedRemoveReplyPresent()); - - // Increment time to first whole second of clock + 10 seconds of skew. - // Should now not get any feed rejections. 
- getClock().setAbsoluteTimeInSeconds(1011); - - sendDownDummyRemoveCommand(); - ASSERT_NO_FATAL_FAILURE(assertNoMessageBounced()); -} - -void LegacyDistributorTest::configure_mutation_sequencing(bool enabled) { - using namespace vespa::config::content::core; - - ConfigBuilder builder; - builder.sequenceMutatingOperations = enabled; - getConfig().configure(builder); - _distributor->enable_next_config_if_changed(); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, sequencing_config_is_propagated_to_distributor_config) { - setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); - - // Should be enabled by default - EXPECT_TRUE(getConfig().getSequenceMutatingOperations()); - - // Explicitly disabled. - configure_mutation_sequencing(false); - EXPECT_FALSE(getConfig().getSequenceMutatingOperations()); - - // Explicitly enabled. - configure_mutation_sequencing(true); - EXPECT_TRUE(getConfig().getSequenceMutatingOperations()); -} - -void -LegacyDistributorTest::configure_merge_busy_inhibit_duration(int seconds) { - using namespace vespa::config::content::core; - - ConfigBuilder builder; - builder.inhibitMergeSendingOnBusyNodeDurationSec = seconds; - getConfig().configure(builder); - _distributor->enable_next_config_if_changed(); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, merge_busy_inhibit_duration_config_is_propagated_to_distributor_config) { - setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); - - configure_merge_busy_inhibit_duration(7); - EXPECT_EQ(getConfig().getInhibitMergesOnBusyNodeDuration(), std::chrono::seconds(7)); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker) { - setupDistributor(Redundancy(2), NodeCount(2), "storage:1 distributor:1"); - addNodesToBucketDB(document::BucketId(16, 1), "0=1/1/1/t"); - - configure_merge_busy_inhibit_duration(100); - auto cmd = 
makeDummyRemoveCommand(); // Remove is for bucket 1 - distributor_handle_message(cmd); - - // Should send to content node 0 - ASSERT_EQ(1, _sender.commands().size()); - ASSERT_EQ(api::MessageType::REMOVE, _sender.command(0)->getType()); - auto& fwd_cmd = dynamic_cast<api::RemoveCommand&>(*_sender.command(0)); - auto reply = fwd_cmd.makeReply(); - reply->setResult(api::ReturnCode(api::ReturnCode::BUSY)); - _distributor->handleReply(std::shared_ptr<api::StorageReply>(std::move(reply))); - - auto& node_info = pending_message_tracker().getNodeInfo(); - - EXPECT_TRUE(node_info.isBusy(0)); - getClock().addSecondsToTime(99); - EXPECT_TRUE(node_info.isBusy(0)); - getClock().addSecondsToTime(2); - EXPECT_FALSE(node_info.isBusy(0)); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, external_client_requests_are_handled_individually_in_priority_order) { - setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); - addNodesToBucketDB(document::BucketId(16, 1), "0=1/1/1/t/a"); - - std::vector<api::StorageMessage::Priority> priorities({50, 255, 10, 40, 0}); - document::DocumentId id("id:foo:testdoctype1:n=1:foo"); - vespalib::stringref field_set = ""; - for (auto pri : priorities) { - auto cmd = std::make_shared<api::GetCommand>(makeDocumentBucket(document::BucketId()), id, field_set); - cmd->setPriority(pri); - // onDown appends to internal message FIFO queue, awaiting hand-off. - _distributor->onDown(cmd); - } - // At the hand-off point we expect client requests to be prioritized. - // For each tick, a priority-order client request is processed and sent off. 
- for (size_t i = 1; i <= priorities.size(); ++i) { - tickDistributorNTimes(1); - ASSERT_EQ(i, _sender.commands().size()); - } - - std::vector<int> expected({0, 10, 40, 50, 255}); - std::vector<int> actual; - for (auto& msg : _sender.commands()) { - actual.emplace_back(static_cast<int>(msg->getPriority())); - } - EXPECT_THAT(actual, ContainerEq(expected)); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, internal_messages_are_started_in_fifo_order_batch) { - // To test internal request ordering, we use NotifyBucketChangeCommand - // for the reason that it explicitly updates the bucket database for - // each individual invocation. - setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); - document::BucketId bucket(16, 1); - addNodesToBucketDB(bucket, "0=1/1/1/t"); - - std::vector<api::StorageMessage::Priority> priorities({50, 255, 10, 40, 1}); - for (auto pri : priorities) { - api::BucketInfo fake_info(pri, pri, pri); - auto cmd = std::make_shared<api::NotifyBucketChangeCommand>(makeDocumentBucket(bucket), fake_info); - cmd->setSourceIndex(0); - cmd->setPriority(pri); - _distributor->onDown(cmd); - } - - // Doing a single tick should process all internal requests in one batch - tickDistributorNTimes(1); - ASSERT_EQ(5, _sender.replies().size()); - - // The bucket info for priority 1 (last FIFO-order change command received, but - // highest priority) should be the end-state of the bucket database, _not_ that - // of lowest priority 255. 
- BucketDatabase::Entry e(getBucket(bucket)); - EXPECT_EQ(api::BucketInfo(1, 1, 1), e.getBucketInfo().getNode(0)->getBucketInfo()); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, closing_aborts_priority_queued_client_requests) { - setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); - document::BucketId bucket(16, 1); - addNodesToBucketDB(bucket, "0=1/1/1/t"); - - document::DocumentId id("id:foo:testdoctype1:n=1:foo"); - vespalib::stringref field_set = ""; - for (int i = 0; i < 10; ++i) { - auto cmd = std::make_shared<api::GetCommand>(makeDocumentBucket(document::BucketId()), id, field_set); - _distributor->onDown(cmd); - } - tickDistributorNTimes(1); - // Closing should trigger 1 abort via startet GetOperation and 9 aborts from pri queue - _distributor->close(); - ASSERT_EQ(10, _sender.replies().size()); - for (auto& msg : _sender.replies()) { - EXPECT_EQ(api::ReturnCode::ABORTED, dynamic_cast<api::StorageReply&>(*msg).getResult().getResult()); - } -} - -namespace { - -void assert_invalid_stats_for_all_spaces( - const BucketSpacesStatsProvider::PerNodeBucketSpacesStats& stats, - uint16_t node_index) { - auto stats_iter = stats.find(node_index); - ASSERT_TRUE(stats_iter != stats.cend()); - ASSERT_EQ(2, stats_iter->second.size()); - auto space_iter = stats_iter->second.find(document::FixedBucketSpaces::default_space_name()); - ASSERT_TRUE(space_iter != stats_iter->second.cend()); - ASSERT_FALSE(space_iter->second.valid()); - space_iter = stats_iter->second.find(document::FixedBucketSpaces::global_space_name()); - ASSERT_TRUE(space_iter != stats_iter->second.cend()); - ASSERT_FALSE(space_iter->second.valid()); -} - -} - -// Migrated to DistributorStripeTest -// Cross-stripe bucket stats test added in TopLevelDistributorTest::entering_recovery_mode_resets_bucket_space_stats_across_all_stripes -TEST_F(LegacyDistributorTest, entering_recovery_mode_resets_bucket_space_stats) { - // Set up a cluster state + DB contents which 
implies merge maintenance ops - setupDistributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2"); - addNodesToBucketDB(document::BucketId(16, 1), "0=1/1/1/t/a"); - addNodesToBucketDB(document::BucketId(16, 2), "0=1/1/1/t/a"); - addNodesToBucketDB(document::BucketId(16, 3), "0=2/2/2/t/a"); - - tickDistributorNTimes(5); // 1/3rds into second round through database - - enableDistributorClusterState("version:2 distributor:1 storage:3 .1.s:d"); - EXPECT_TRUE(distributor_is_in_recovery_mode()); - // Bucket space stats should now be invalid per space per node, pending stats - // from state version 2. Exposing stats from version 1 risks reporting stale - // information back to the cluster controller. - const auto stats = distributor_bucket_spaces_stats(); - ASSERT_EQ(2, stats.size()); - - assert_invalid_stats_for_all_spaces(stats, 0); - assert_invalid_stats_for_all_spaces(stats, 2); -} - -// Migrated to TopLevelDistributorTest -TEST_F(LegacyDistributorTest, leaving_recovery_mode_immediately_sends_getnodestate_replies) { - setupDistributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2"); - // Should not send explicit replies during init stage - ASSERT_EQ(0, explicit_node_state_reply_send_invocations()); - // Add a couple of buckets so we have something to iterate over - addNodesToBucketDB(document::BucketId(16, 1), "0=1/1/1/t/a"); - addNodesToBucketDB(document::BucketId(16, 2), "0=1/1/1/t/a"); - - enableDistributorClusterState("version:2 distributor:1 storage:3 .1.s:d"); - EXPECT_TRUE(distributor_is_in_recovery_mode()); - EXPECT_EQ(0, explicit_node_state_reply_send_invocations()); - tickDistributorNTimes(1); // DB round not yet complete - EXPECT_EQ(0, explicit_node_state_reply_send_invocations()); - tickDistributorNTimes(2); // DB round complete after 2nd bucket + "scan done" discovery tick - EXPECT_EQ(1, explicit_node_state_reply_send_invocations()); - EXPECT_FALSE(distributor_is_in_recovery_mode()); - // Now out of recovery mode, 
subsequent round completions should not send replies - tickDistributorNTimes(10); - EXPECT_EQ(1, explicit_node_state_reply_send_invocations()); -} - -void LegacyDistributorTest::do_test_pending_merge_getnodestate_reply_edge(BucketSpace space) { - setupDistributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2"); - EXPECT_TRUE(distributor_is_in_recovery_mode()); - // 2 buckets with missing replicas triggering merge pending stats - addNodesToBucketDB(Bucket(space, BucketId(16, 1)), "0=1/1/1/t/a"); - addNodesToBucketDB(Bucket(space, BucketId(16, 2)), "0=1/1/1/t/a"); - tickDistributorNTimes(3); - EXPECT_FALSE(distributor_is_in_recovery_mode()); - const auto space_name = FixedBucketSpaces::to_string(space); - assertBucketSpaceStats(2, 0, 1, space_name, _distributor->getBucketSpacesStats()); - // First completed scan sends off merge stats et al to cluster controller - EXPECT_EQ(1, explicit_node_state_reply_send_invocations()); - - // Edge not triggered when 1 bucket with missing replica left - addNodesToBucketDB(Bucket(space, BucketId(16, 1)), "0=1/1/1/t/a,1=1/1/1/t"); - tickDistributorNTimes(3); - assertBucketSpaceStats(1, 1, 1, space_name, _distributor->getBucketSpacesStats()); - EXPECT_EQ(1, explicit_node_state_reply_send_invocations()); - - // Edge triggered when no more buckets with requiring merge - addNodesToBucketDB(Bucket(space, BucketId(16, 2)), "0=1/1/1/t/a,1=1/1/1/t"); - tickDistributorNTimes(3); - assertBucketSpaceStats(0, 2, 1, space_name, _distributor->getBucketSpacesStats()); - EXPECT_EQ(2, explicit_node_state_reply_send_invocations()); - - // Should only send when edge happens, not in subsequent DB iterations - tickDistributorNTimes(10); - EXPECT_EQ(2, explicit_node_state_reply_send_invocations()); - - // Going back to merges pending should _not_ send a getnodestate reply (at least for now) - addNodesToBucketDB(Bucket(space, BucketId(16, 1)), "0=1/1/1/t/a"); - tickDistributorNTimes(3); - assertBucketSpaceStats(1, 1, 1, space_name, 
_distributor->getBucketSpacesStats()); - EXPECT_EQ(2, explicit_node_state_reply_send_invocations()); -} - -// TODO: rewrite into DistributorStripeTest -TEST_F(LegacyDistributorTest, pending_to_no_pending_default_merges_edge_immediately_sends_getnodestate_replies) { - do_test_pending_merge_getnodestate_reply_edge(FixedBucketSpaces::default_space()); -} - -// TODO: rewrite into DistributorStripeTest -TEST_F(LegacyDistributorTest, pending_to_no_pending_global_merges_edge_immediately_sends_getnodestate_replies) { - do_test_pending_merge_getnodestate_reply_edge(FixedBucketSpaces::global_space()); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, stale_reads_config_is_propagated_to_external_operation_handler) { - createLinks(); - setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); - - configure_stale_reads_enabled(true); - EXPECT_TRUE(getExternalOperationHandler().concurrent_gets_enabled()); - - configure_stale_reads_enabled(false); - EXPECT_FALSE(getExternalOperationHandler().concurrent_gets_enabled()); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, fast_path_on_consistent_gets_config_is_propagated_to_internal_config) { - createLinks(); - setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); - - configure_update_fast_path_restart_enabled(true); - EXPECT_TRUE(getConfig().update_fast_path_restart_enabled()); - - configure_update_fast_path_restart_enabled(false); - EXPECT_FALSE(getConfig().update_fast_path_restart_enabled()); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, merge_disabling_config_is_propagated_to_internal_config) { - createLinks(); - setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); - - configure_merge_operations_disabled(true); - EXPECT_TRUE(getConfig().merge_operations_disabled()); - - configure_merge_operations_disabled(false); - EXPECT_FALSE(getConfig().merge_operations_disabled()); -} - -// Migrated to 
DistributorStripeTest -TEST_F(LegacyDistributorTest, metadata_update_phase_config_is_propagated_to_internal_config) { - createLinks(); - setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); - - configure_metadata_update_phase_enabled(true); - EXPECT_TRUE(getConfig().enable_metadata_only_fetch_phase_for_inconsistent_updates()); - - configure_metadata_update_phase_enabled(false); - EXPECT_FALSE(getConfig().enable_metadata_only_fetch_phase_for_inconsistent_updates()); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, weak_internal_read_consistency_config_is_propagated_to_internal_configs) { - createLinks(); - setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); - - configure_use_weak_internal_read_consistency(true); - EXPECT_TRUE(getConfig().use_weak_internal_read_consistency_for_client_gets()); - EXPECT_TRUE(getExternalOperationHandler().use_weak_internal_read_consistency_for_gets()); - - configure_use_weak_internal_read_consistency(false); - EXPECT_FALSE(getConfig().use_weak_internal_read_consistency_for_client_gets()); - EXPECT_FALSE(getExternalOperationHandler().use_weak_internal_read_consistency_for_gets()); -} - -void LegacyDistributorTest::set_up_and_start_get_op_with_stale_reads_enabled(bool enabled) { - createLinks(); - setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); - configure_stale_reads_enabled(enabled); - - document::BucketId bucket(16, 1); - addNodesToBucketDB(bucket, "0=1/1/1/t"); - _distributor->onDown(make_dummy_get_command_for_bucket_1()); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, gets_are_started_outside_main_distributor_logic_if_stale_reads_enabled) { - set_up_and_start_get_op_with_stale_reads_enabled(true); - ASSERT_THAT(_sender.commands(), SizeIs(1)); - EXPECT_THAT(_sender.replies(), SizeIs(0)); - - // Reply is routed to the correct owner - auto reply = std::shared_ptr<api::StorageReply>(_sender.command(0)->makeReply()); - 
_distributor->onDown(reply); - ASSERT_THAT(_sender.commands(), SizeIs(1)); - EXPECT_THAT(_sender.replies(), SizeIs(1)); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, gets_are_not_started_outside_main_distributor_logic_if_stale_reads_disabled) { - set_up_and_start_get_op_with_stale_reads_enabled(false); - // Get has been placed into distributor queue, so no external messages are produced. - EXPECT_THAT(_sender.commands(), SizeIs(0)); - EXPECT_THAT(_sender.replies(), SizeIs(0)); -} - -// Migrated to DistributorStripeTest -// There's no need or desire to track "lockfree" Gets in the main pending message tracker, -// as we only have to track mutations to inhibit maintenance ops safely. Furthermore, -// the message tracker is a multi-index and therefore has some runtime cost. -TEST_F(LegacyDistributorTest, gets_started_outside_main_thread_are_not_tracked_by_main_pending_message_tracker) { - set_up_and_start_get_op_with_stale_reads_enabled(true); - Bucket bucket(FixedBucketSpaces::default_space(), BucketId(16, 1)); - EXPECT_FALSE(pending_message_tracker().hasPendingMessage( - 0, bucket, api::MessageType::GET_ID)); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, closing_aborts_gets_started_outside_main_distributor_thread) { - set_up_and_start_get_op_with_stale_reads_enabled(true); - _distributor->close(); - ASSERT_EQ(1, _sender.replies().size()); - EXPECT_EQ(api::ReturnCode::ABORTED, _sender.reply(0)->getResult().getResult()); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, prioritize_global_bucket_merges_config_is_propagated_to_internal_config) { - createLinks(); - setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); - - configure_prioritize_global_bucket_merges(true); - EXPECT_TRUE(getConfig().prioritize_global_bucket_merges()); - - configure_prioritize_global_bucket_merges(false); - EXPECT_FALSE(getConfig().prioritize_global_bucket_merges()); -} - -// Migrated to 
DistributorStripeTest -TEST_F(LegacyDistributorTest, max_activation_inhibited_out_of_sync_groups_config_is_propagated_to_internal_config) { - createLinks(); - setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); - - configure_max_activation_inhibited_out_of_sync_groups(3); - EXPECT_EQ(getConfig().max_activation_inhibited_out_of_sync_groups(), 3); - - configure_max_activation_inhibited_out_of_sync_groups(0); - EXPECT_EQ(getConfig().max_activation_inhibited_out_of_sync_groups(), 0); -} - -// Migrated to DistributorStripeTest -TEST_F(LegacyDistributorTest, wanted_split_bit_count_is_lower_bounded) { - createLinks(); - setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); - - ConfigBuilder builder; - builder.minsplitcount = 7; - configureDistributor(builder); - - EXPECT_EQ(getConfig().getMinimalBucketSplit(), 8); -} - -// Migrated to TopLevelDistributorTest -TEST_F(LegacyDistributorTest, host_info_sent_immediately_once_all_stripes_first_reported) { - set_num_distributor_stripes(4); - createLinks(); - getClock().setAbsoluteTimeInSeconds(1000); - // TODO STRIPE can't call this currently since it touches the bucket DB updater directly: - // setupDistributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2"); - - tickDistributorNTimes(1); - EXPECT_EQ(0, explicit_node_state_reply_send_invocations()); // Nothing yet - getDistributor().notify_stripe_wants_to_send_host_info(1); - getDistributor().notify_stripe_wants_to_send_host_info(2); - getDistributor().notify_stripe_wants_to_send_host_info(3); - - tickDistributorNTimes(1); - // Still nothing. Missing initial report from stripe 0 - EXPECT_EQ(0, explicit_node_state_reply_send_invocations()); - - getDistributor().notify_stripe_wants_to_send_host_info(0); - tickDistributorNTimes(1); - // All stripes have reported in, it's time to party! - EXPECT_EQ(1, explicit_node_state_reply_send_invocations()); - - // No further sends if stripes haven't requested it yet. 
- getClock().setAbsoluteTimeInSeconds(2000); - tickDistributorNTimes(10); - EXPECT_EQ(1, explicit_node_state_reply_send_invocations()); -} - -// Migrated to TopLevelDistributorTest -TEST_F(LegacyDistributorTest, non_bootstrap_host_info_send_request_delays_sending) { - set_num_distributor_stripes(4); - createLinks(); - getClock().setAbsoluteTimeInSeconds(1000); - - for (uint16_t i = 0; i < 4; ++i) { - getDistributor().notify_stripe_wants_to_send_host_info(i); - } - tickDistributorNTimes(1); - // Bootstrap case - EXPECT_EQ(1, explicit_node_state_reply_send_invocations()); - - // Stripe 1 suddenly really wants to tell the cluster controller something again - getDistributor().notify_stripe_wants_to_send_host_info(1); - tickDistributorNTimes(1); - // But its cry for attention is not yet honored since the delay hasn't passed. - EXPECT_EQ(1, explicit_node_state_reply_send_invocations()); - - getClock().addMilliSecondsToTime(999); - tickDistributorNTimes(1); - // 1 sec delay has still not passed - EXPECT_EQ(1, explicit_node_state_reply_send_invocations()); - - getClock().addMilliSecondsToTime(1); - tickDistributorNTimes(1); - // But now it has - EXPECT_EQ(2, explicit_node_state_reply_send_invocations()); -} - -} diff --git a/storage/src/tests/distributor/statusreporterdelegatetest.cpp b/storage/src/tests/distributor/statusreporterdelegatetest.cpp index 9e66f1920e2..3cac901619e 100644 --- a/storage/src/tests/distributor/statusreporterdelegatetest.cpp +++ b/storage/src/tests/distributor/statusreporterdelegatetest.cpp @@ -1,7 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
#include <tests/common/testhelper.h> -#include <tests/distributor/distributortestutil.h> +#include <tests/common/teststorageapp.h> #include <vespa/storage/distributor/statusreporterdelegate.h> #include <vespa/vespalib/gtest/gtest.h> diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp index 01f7d5a4f0a..d57ce228908 100644 --- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp +++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp @@ -630,7 +630,7 @@ TopLevelBucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition( } TEST_F(TopLevelBucketDBUpdaterTest, normal_usage) { - set_cluster_state(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3")); + set_cluster_state(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3")); // FIXME init mode why? ASSERT_EQ(message_count(3), _sender.commands().size()); @@ -638,21 +638,21 @@ TEST_F(TopLevelBucketDBUpdaterTest, normal_usage) { ASSERT_EQ(_component->getDistribution()->getNodeGraph().getDistributionConfigHash(), dynamic_cast<const RequestBucketInfoCommand&>(*_sender.command(0)).getDistributionHash()); - ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), + ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), // FIXME init mode why? *_sender.command(0), 10)); _sender.clear(); // Optimization for not refetching unneeded data after cluster state // change is only implemented after completion of previous cluster state - set_cluster_state("distributor:2 .0.s:i storage:3"); + set_cluster_state("distributor:2 .0.s:i storage:3"); // FIXME init mode why? ASSERT_EQ(message_count(3), _sender.commands().size()); // Expect reply of first set SystemState request. 
ASSERT_EQ(size_t(1), _sender.replies().size()); ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering( - lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), + lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), // FIXME init mode why? message_count(3), 10)); ASSERT_NO_FATAL_FAILURE(assert_correct_buckets(10, "distributor:2 storage:3")); } @@ -661,9 +661,9 @@ TEST_F(TopLevelBucketDBUpdaterTest, distributor_change) { int num_buckets = 100; // First sends request - set_cluster_state("distributor:2 .0.s:i .1.s:i storage:3"); + set_cluster_state("distributor:2 .0.s:i .1.s:i storage:3"); // FIXME init mode why? ASSERT_EQ(message_count(3), _sender.commands().size()); - ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), // FIXME init mode why? message_count(3), num_buckets)); _sender.clear(); @@ -718,14 +718,14 @@ TEST_F(TopLevelBucketDBUpdaterTest, distributor_change_with_grouping) { } TEST_F(TopLevelBucketDBUpdaterTest, normal_usage_initializing) { - set_cluster_state("distributor:1 .0.s:i storage:1 .0.s:i"); + set_cluster_state("distributor:1 .0.s:i storage:1 .0.s:i"); // FIXME init mode why? ASSERT_EQ(_bucket_spaces.size(), _sender.commands().size()); // Not yet passing on system state. ASSERT_EQ(size_t(0), _sender_down.commands().size()); - ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(lib::ClusterState("distributor:1 .0.s:i storage:1"), + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(lib::ClusterState("distributor:1 .0.s:i storage:1"), // FIXME init mode why? 
_bucket_spaces.size(), 10, 10)); ASSERT_NO_FATAL_FAILURE(assert_correct_buckets(10, "distributor:1 storage:1")); @@ -740,12 +740,12 @@ TEST_F(TopLevelBucketDBUpdaterTest, normal_usage_initializing) { _sender.clear(); _sender_down.clear(); - set_cluster_state("distributor:1 .0.s:i storage:1"); + set_cluster_state("distributor:1 .0.s:i storage:1"); // FIXME init mode why? // Send a new request bucket info up. ASSERT_EQ(_bucket_spaces.size(), _sender.commands().size()); - ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(lib::ClusterState("distributor:1 .0.s:i storage:1"), + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(lib::ClusterState("distributor:1 .0.s:i storage:1"), // FIXME init mode why? _bucket_spaces.size(), 20)); // Pass on cluster state and recheck buckets now. @@ -755,7 +755,7 @@ TEST_F(TopLevelBucketDBUpdaterTest, normal_usage_initializing) { } TEST_F(TopLevelBucketDBUpdaterTest, failed_request_bucket_info) { - set_cluster_state("distributor:1 .0.s:i storage:1"); + set_cluster_state("distributor:1 .0.s:i storage:1"); // FIXME init mode why? // 2 messages sent up: 1 to the nodes, and one reply to the setsystemstate. ASSERT_EQ(_bucket_spaces.size(), _sender.commands().size()); @@ -781,7 +781,7 @@ TEST_F(TopLevelBucketDBUpdaterTest, failed_request_bucket_info) { ASSERT_EQ(size_t(0), _sender_down.commands().size()); for (uint32_t i = 0; i < _bucket_spaces.size(); ++i) { - ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(lib::ClusterState("distributor:1 .0.s:i storage:1"), + ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(lib::ClusterState("distributor:1 .0.s:i storage:1"), // FIXME init mode why? 
*_sender.command(_bucket_spaces.size() + i), 10)); } diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.cpp b/storage/src/tests/distributor/top_level_distributor_test_util.cpp index b6e9beb38ae..e4002b3d1cb 100644 --- a/storage/src/tests/distributor/top_level_distributor_test_util.cpp +++ b/storage/src/tests/distributor/top_level_distributor_test_util.cpp @@ -303,11 +303,6 @@ TopLevelDistributorTestUtil::total_distributor_metrics() const return *_distributor->_total_metrics; } -const storage::distributor::DistributorNodeContext& -TopLevelDistributorTestUtil::node_context() const { - return _distributor->distributor_component(); -} - DistributorBucketSpace& TopLevelDistributorTestUtil::distributor_bucket_space(const document::BucketId& id) { diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.h b/storage/src/tests/distributor/top_level_distributor_test_util.h index 8832f8ada6e..e8794eb4199 100644 --- a/storage/src/tests/distributor/top_level_distributor_test_util.h +++ b/storage/src/tests/distributor/top_level_distributor_test_util.h @@ -69,7 +69,6 @@ public: TopLevelBucketDBUpdater& bucket_db_updater(); const IdealStateMetricSet& total_ideal_state_metrics() const; const DistributorMetricSet& total_distributor_metrics() const; - const storage::distributor::DistributorNodeContext& node_context() const; DistributorBucketSpace& distributor_bucket_space(const document::BucketId& id); const DistributorBucketSpace& distributor_bucket_space(const document::BucketId& id) const; diff --git a/storage/src/vespa/storage/bucketdb/bucketinfo.h b/storage/src/vespa/storage/bucketdb/bucketinfo.h index 690fd3e36a9..eae13cfc34c 100644 --- a/storage/src/vespa/storage/bucketdb/bucketinfo.h +++ b/storage/src/vespa/storage/bucketdb/bucketinfo.h @@ -9,7 +9,6 @@ namespace storage { namespace distributor { class DistributorStripeTestUtil; - class DistributorTestUtil; class TopLevelDistributorTestUtil; } @@ -205,7 +204,6 @@ public: private: 
friend class distributor::DistributorStripeTestUtil; - friend class distributor::DistributorTestUtil; friend class distributor::TopLevelDistributorTestUtil; /** diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index 1751b05b25d..6378a8ed3c4 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -32,15 +32,11 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg, DistributorMetricSet& metrics, IdealStateMetricSet& ideal_state_metrics, const NodeIdentity& node_identity, - framework::TickingThreadPool& threadPool, - DoneInitializeHandler& doneInitHandler, ChainedMessageSender& messageSender, StripeHostInfoNotifier& stripe_host_info_notifier, - bool use_legacy_mode, - bool& done_initializing_ref, + const bool& done_initializing_ref, uint32_t stripe_index) : DistributorStripeInterface(), - framework::StatusReporter("distributor", "Distributor"), _clusterStateBundle(lib::ClusterState()), _bucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>(node_identity.node_index())), _readOnlyBucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>(node_identity.node_index())), @@ -51,9 +47,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg, _maintenanceOperationOwner(*this, _component.getClock()), _operation_sequencer(std::make_unique<OperationSequencer>()), _pendingMessageTracker(compReg, stripe_index), - _bucketDBUpdater(_component, _component, *this, *this, use_legacy_mode), - _distributorStatusDelegate(compReg, *this, *this), - _bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater), + _bucketDBUpdater(_component, _component, *this, *this), _idealStateManager(_component, _component, ideal_state_metrics), _messageSender(messageSender), _stripe_host_info_notifier(stripe_host_info_notifier), @@ -61,8 +55,6 @@ 
DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg, *_operation_sequencer, *this, _component, _idealStateManager, _operationOwner), _external_message_mutex(), - _threadPool(threadPool), - _doneInitializeHandler(doneInitHandler), _done_initializing_ref(done_initializing_ref), _bucketPriorityDb(std::make_unique<SimpleBucketPriorityDatabase>()), _scanner(std::make_unique<SimpleMaintenanceScanner>(*_bucketPriorityDb, _idealStateManager, *_bucketSpaceRepo)), @@ -83,13 +75,8 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg, _last_db_memory_sample_time_point(), _inhibited_maintenance_tick_count(0), _must_send_updated_host_info(false), - _use_legacy_mode(use_legacy_mode), _stripe_index(stripe_index) { - if (use_legacy_mode) { - _distributorStatusDelegate.registerStatusPage(); - _bucketDBStatusDelegate.registerStatusPage(); - } propagateDefaultDistribution(_component.getDistribution()); propagateClusterStates(); }; @@ -173,15 +160,8 @@ DistributorStripe::handle_or_enqueue_message(const std::shared_ptr<api::StorageM if (_externalOperationHandler.try_handle_message_outside_main_thread(msg)) { return true; } - MBUS_TRACE(msg->getTrace(), 9, - vespalib::make_string("DistributorStripe[%u]: Added to message queue. 
Thread state: ", _stripe_index) - + _threadPool.getStatus()); - if (_use_legacy_mode) { - // TODO STRIPE remove - framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks()); - _messageQueue.push_back(msg); - guard.broadcast(); - } else { + MBUS_TRACE(msg->getTrace(), 9, vespalib::make_string("DistributorStripe[%u]: Added to message queue.", _stripe_index)); + { std::lock_guard lock(_external_message_mutex); _messageQueue.push_back(msg); // Caller has the responsibility to wake up correct stripe @@ -294,10 +274,6 @@ DistributorStripe::enableClusterStateBundle(const lib::ClusterStateBundle& state propagateClusterStates(); const auto& baseline_state = *state.getBaselineClusterState(); - if (_use_legacy_mode && !_done_initializing_ref && (baseline_state.getNodeState(my_node).getState() == lib::State::UP)) { - _done_initializing_ref = true; // TODO STRIPE remove; responsibility moved to TopLevelDistributor in non-legacy - _doneInitializeHandler.notifyDoneInitializing(); - } enterRecoveryMode(); // Clear all active messages on nodes that are down. @@ -314,18 +290,6 @@ DistributorStripe::enableClusterStateBundle(const lib::ClusterStateBundle& state } } } - - // TODO STRIPE remove when legacy is gone; the stripe bucket DB updater does not have this info! - if (_use_legacy_mode && _bucketDBUpdater.bucketOwnershipHasChanged()) { - using TimePoint = OwnershipTransferSafeTimePointCalculator::TimePoint; - // Note: this assumes that std::chrono::system_clock and the framework - // system clock have the same epoch, which should be a reasonable - // assumption. 
- const auto now = TimePoint(std::chrono::milliseconds( - _component.getClock().getTimeInMillis().getTime())); - _externalOperationHandler.rejectFeedBeforeTimeReached( - _ownershipSafeTimeCalc->safeTimePoint(now)); - } } OperationRoutingSnapshot DistributorStripe::read_snapshot_for_bucket(const document::Bucket& bucket) const { @@ -400,28 +364,6 @@ void DistributorStripe::invalidate_bucket_spaces_stats() { } void -DistributorStripe::storage_distribution_changed() -{ - assert(_use_legacy_mode); - if (!_distribution.get() - || *_component.getDistribution() != *_distribution) - { - LOG(debug, - "Distribution changed to %s, must refetch bucket information", - _component.getDistribution()->toString().c_str()); - - // FIXME this is not thread safe - _nextDistribution = _component.getDistribution(); - } else { - LOG(debug, - "Got distribution change, but the distribution %s was the same as " - "before: %s", - _component.getDistribution()->toString().c_str(), - _distribution->toString().c_str()); - } -} - -void DistributorStripe::recheckBucketInfo(uint16_t nodeIdx, const document::Bucket &bucket) { _bucketDBUpdater.recheckBucketInfo(nodeIdx, bucket); } @@ -479,21 +421,6 @@ DistributorStripe::checkBucketForSplit(document::BucketSpace bucketSpace, } } -// TODO STRIPE must only be called when operating in legacy single stripe mode! -// In other cases, distribution config switching is controlled by top-level distributor, not via framework(tm). 
-void -DistributorStripe::enableNextDistribution() -{ - assert(_use_legacy_mode); - if (_nextDistribution.get()) { - _distribution = _nextDistribution; - propagateDefaultDistribution(_distribution); - _nextDistribution = std::shared_ptr<lib::Distribution>(); - // TODO conditional on whether top-level DB updater is in charge - _bucketDBUpdater.storageDistributionChanged(); - } -} - // TODO STRIPE must be invoked by top-level bucket db updater probably void DistributorStripe::propagateDefaultDistribution( @@ -509,7 +436,6 @@ DistributorStripe::propagateDefaultDistribution( // Only called when stripe is in rendezvous freeze void DistributorStripe::update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) { - assert(!_use_legacy_mode); auto default_distr = new_configs.get_or_nullptr(document::FixedBucketSpaces::default_space()); auto global_distr = new_configs.get_or_nullptr(document::FixedBucketSpaces::global_space()); assert(default_distr && global_distr); @@ -741,11 +667,7 @@ DistributorStripe::scanNextBucket() void DistributorStripe::send_updated_host_info_if_required() { if (_must_send_updated_host_info) { - if (_use_legacy_mode) { - _component.getStateUpdater().immediately_send_get_node_state_replies(); - } else { - _stripe_host_info_notifier.notify_stripe_wants_to_send_host_info(_stripe_index); - } + _stripe_host_info_notifier.notify_stripe_wants_to_send_host_info(_stripe_index); _must_send_updated_host_info = false; } } @@ -758,30 +680,14 @@ DistributorStripe::startNextMaintenanceOperation() _scheduler->tick(_schedulingMode); } -// TODO STRIPE begone with this! 
-framework::ThreadWaitInfo -DistributorStripe::doCriticalTick(framework::ThreadIndex) -{ - _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; - assert(_use_legacy_mode); - enableNextDistribution(); - enableNextConfig(); - fetchStatusRequests(); - fetchExternalMessages(); - return _tickResult; -} - framework::ThreadWaitInfo DistributorStripe::doNonCriticalTick(framework::ThreadIndex) { _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; - if (!_use_legacy_mode) { + { std::lock_guard lock(_external_message_mutex); fetchExternalMessages(); } - if (_use_legacy_mode) { - handleStatusRequests(); - } startExternalOperations(); if (initializing()) { _bucketDBUpdater.resendDelayedMessages(); @@ -804,7 +710,6 @@ DistributorStripe::doNonCriticalTick(framework::ThreadIndex) } bool DistributorStripe::tick() { - assert(!_use_legacy_mode); auto wait_info = doNonCriticalTick(framework::ThreadIndex(0)); return !wait_info.waitWanted(); // If we don't want to wait, we presumably did some useful stuff. 
} @@ -823,14 +728,6 @@ void DistributorStripe::mark_maintenance_tick_as_no_longer_inhibited() noexcept } void -DistributorStripe::enableNextConfig() -{ - assert(_use_legacy_mode); - propagate_config_snapshot_to_internal_components(); - -} - -void DistributorStripe::update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) { _total_config = std::move(config); @@ -851,52 +748,12 @@ DistributorStripe::propagate_config_snapshot_to_internal_components() } void -DistributorStripe::fetchStatusRequests() -{ - assert(_use_legacy_mode); - if (_fetchedStatusRequests.empty()) { - _fetchedStatusRequests.swap(_statusToDo); - } -} - -void DistributorStripe::fetchExternalMessages() { assert(_fetchedMessages.empty()); _fetchedMessages.swap(_messageQueue); } -void -DistributorStripe::handleStatusRequests() -{ - assert(_use_legacy_mode); - uint32_t sz = _fetchedStatusRequests.size(); - for (uint32_t i = 0; i < sz; ++i) { - auto& s = *_fetchedStatusRequests[i]; - s.getReporter().reportStatus(s.getStream(), s.getPath()); - s.notifyCompleted(); - } - _fetchedStatusRequests.clear(); - if (sz > 0) { - signalWorkWasDone(); - } -} - -vespalib::string -DistributorStripe::getReportContentType(const framework::HttpUrlPath& path) const -{ - assert(_use_legacy_mode); - if (path.hasAttribute("page")) { - if (path.getAttribute("page") == "buckets") { - return "text/html"; - } else { - return "application/xml"; - } - } else { - return "text/html"; - } -} - std::string DistributorStripe::getActiveIdealStateOperations() const { @@ -909,54 +766,6 @@ DistributorStripe::getActiveOperations() const return _operationOwner.toString(); } -// TODO STRIPE remove this; delegated to top-level Distributor only -bool -DistributorStripe::reportStatus(std::ostream& out, - const framework::HttpUrlPath& path) const -{ - assert(_use_legacy_mode); - if (!path.hasAttribute("page") || path.getAttribute("page") == "buckets") { - framework::PartlyHtmlStatusReporter htmlReporter(*this); - 
htmlReporter.reportHtmlHeader(out, path); - if (!path.hasAttribute("page")) { - out << "<a href=\"?page=pending\">Count of pending messages to storage nodes</a><br>\n" - << "<a href=\"?page=buckets\">List all buckets, highlight non-ideal state</a><br>\n"; - } else { - const_cast<IdealStateManager&>(_idealStateManager) - .getBucketStatus(out); - } - htmlReporter.reportHtmlFooter(out, path); - } else { - framework::PartlyXmlStatusReporter xmlReporter(*this, out, path); - using namespace vespalib::xml; - std::string page(path.getAttribute("page")); - - if (page == "pending") { - xmlReporter << XmlTag("pending") - << XmlAttribute("externalload", _operationOwner.size()) - << XmlAttribute("maintenance",_maintenanceOperationOwner.size()) - << XmlEndTag(); - } - } - - return true; -} - -// TODO STRIPE remove this; delegated to top-level Distributor only -bool -DistributorStripe::handleStatusRequest(const DelegatedStatusRequest& request) const -{ - assert(_use_legacy_mode); - auto wrappedRequest = std::make_shared<DistributorStatus>(request); - { - framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks()); - _statusToDo.push_back(wrappedRequest); - guard.broadcast(); - } - wrappedRequest->waitForCompletion(); - return true; -} - StripeAccessGuard::PendingOperationStats DistributorStripe::pending_operation_stats() const { @@ -979,7 +788,6 @@ void DistributorStripe::enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state, bool has_bucket_ownership_change) { - assert(!_use_legacy_mode); // TODO STRIPE replace legacy func enableClusterStateBundle(new_state); if (has_bucket_ownership_change) { diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h index d584960726a..0dcac9ea7b7 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe.h @@ -53,8 +53,6 @@ class ThrottlingOperationStarter; */ class 
DistributorStripe final : public DistributorStripeInterface, - public StatusDelegator, - public framework::StatusReporter, public MinReplicaProvider, public BucketSpacesStatsProvider, public NonTrackingMessageSender, @@ -65,12 +63,9 @@ public: DistributorMetricSet& metrics, IdealStateMetricSet& ideal_state_metrics, const NodeIdentity& node_identity, - framework::TickingThreadPool&, - DoneInitializeHandler&, ChainedMessageSender& messageSender, StripeHostInfoNotifier& stripe_host_info_notifier, - bool use_legacy_mode, - bool& done_initializing_ref, // TODO STRIPE const ref once legacy is gone and stripe can't mutate init state + const bool& done_initializing_ref, uint32_t stripe_index = 0); ~DistributorStripe() override; @@ -114,24 +109,15 @@ public: */ void notifyDistributionChangeEnabled() override; - void storage_distribution_changed(); - void recheckBucketInfo(uint16_t nodeIdx, const document::Bucket &bucket) override; bool handleReply(const std::shared_ptr<api::StorageReply>& reply) override; - // StatusReporter implementation - vespalib::string getReportContentType(const framework::HttpUrlPath&) const override; - bool reportStatus(std::ostream&, const framework::HttpUrlPath&) const override; - - bool handleStatusRequest(const DelegatedStatusRequest& request) const override; - StripeAccessGuard::PendingOperationStats pending_operation_stats() const override; std::string getActiveIdealStateOperations() const; std::string getActiveOperations() const; - framework::ThreadWaitInfo doCriticalTick(framework::ThreadIndex); framework::ThreadWaitInfo doNonCriticalTick(framework::ThreadIndex); /** @@ -199,22 +185,17 @@ public: bool tick() override; private: - // TODO STRIPE: reduce number of friends. DistributorStripe too popular for its own good. 
friend class TopLevelDistributor; friend class DistributorStripeTestUtil; - friend class DistributorTestUtil; - friend class TopLevelDistributorTestUtil; - friend class LegacyBucketDBUpdaterTest; friend class MetricUpdateHook; friend class MultiThreadedStripeAccessGuard; friend struct DistributorStripeTest; - friend struct LegacyDistributorTest; friend struct TopLevelDistributorTest; + friend class TopLevelDistributorTestUtil; bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg); bool isMaintenanceReply(const api::StorageReply& reply) const; - void handleStatusRequests(); void send_shutdown_abort_reply(const std::shared_ptr<api::StorageMessage>&); void handle_or_propagate_message(const std::shared_ptr<api::StorageMessage>& msg); void startExternalOperations(); @@ -247,8 +228,6 @@ private: bool should_inhibit_current_maintenance_scan_tick() const noexcept; void mark_current_maintenance_tick_as_inhibited() noexcept; void mark_maintenance_tick_as_no_longer_inhibited() noexcept; - void enableNextConfig(); - void fetchStatusRequests(); void fetchExternalMessages(); void startNextMaintenanceOperation(); void signalWorkWasDone(); @@ -263,7 +242,6 @@ private: bool generateOperation(const std::shared_ptr<api::StorageMessage>& msg, Operation::SP& operation); - void enableNextDistribution(); // TODO STRIPE remove once legacy is gone void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>); // TODO STRIPE remove once legacy is gone void propagateClusterStates(); @@ -316,15 +294,12 @@ private: std::unique_ptr<OperationSequencer> _operation_sequencer; PendingMessageTracker _pendingMessageTracker; StripeBucketDBUpdater _bucketDBUpdater; - StatusReporterDelegate _distributorStatusDelegate; - StatusReporterDelegate _bucketDBStatusDelegate; IdealStateManager _idealStateManager; ChainedMessageSender& _messageSender; StripeHostInfoNotifier& _stripe_host_info_notifier; ExternalOperationHandler _externalOperationHandler; 
std::shared_ptr<lib::Distribution> _distribution; - std::shared_ptr<lib::Distribution> _nextDistribution; using MessageQueue = std::vector<std::shared_ptr<api::StorageMessage>>; struct IndirectHigherPriority { @@ -342,13 +317,7 @@ private: MessageQueue _messageQueue; ClientRequestPriorityQueue _client_request_priority_queue; MessageQueue _fetchedMessages; - framework::TickingThreadPool& _threadPool; - - mutable std::vector<std::shared_ptr<DistributorStatus>> _statusToDo; - mutable std::vector<std::shared_ptr<DistributorStatus>> _fetchedStatusRequests; - - DoneInitializeHandler& _doneInitializeHandler; // TODO STRIPE remove when legacy is gone - bool& _done_initializing_ref; + const bool& _done_initializing_ref; std::unique_ptr<BucketPriorityDatabase> _bucketPriorityDb; std::unique_ptr<SimpleMaintenanceScanner> _scanner; @@ -374,7 +343,6 @@ private: std::chrono::steady_clock::time_point _last_db_memory_sample_time_point; size_t _inhibited_maintenance_tick_count; bool _must_send_updated_host_info; - bool _use_legacy_mode; uint32_t _stripe_index; }; diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp index 1ca985947a2..0cf976c969c 100644 --- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp +++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp @@ -159,14 +159,6 @@ void MultiThreadedStripeAccessGuard::report_delayed_single_bucket_requests(vespa }); } -TickableStripe& MultiThreadedStripeAccessGuard::first_stripe() noexcept { - return _stripe_pool.stripe_thread(0).stripe(); -} - -const TickableStripe& MultiThreadedStripeAccessGuard::first_stripe() const noexcept { - return _stripe_pool.stripe_thread(0).stripe(); -} - template <typename Func> void MultiThreadedStripeAccessGuard::for_each_stripe(Func&& f) { for (auto& stripe_thread : _stripe_pool) { diff --git 
a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h index 62af21cc43f..0ecc9eb803d 100644 --- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h +++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h @@ -60,10 +60,6 @@ public: void report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const override; private: - // TODO STRIPE remove once multi threaded stripe support is implemented - TickableStripe& first_stripe() noexcept; - const TickableStripe& first_stripe() const noexcept; - template <typename Func> void for_each_stripe(Func&& f); diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp index c48434484d2..f0cc18b346e 100644 --- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp +++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp @@ -31,25 +31,19 @@ namespace storage::distributor { StripeBucketDBUpdater::StripeBucketDBUpdater(const DistributorNodeContext& node_ctx, DistributorStripeOperationContext& op_ctx, DistributorStripeInterface& owner, - DistributorMessageSender& sender, - bool use_legacy_mode) + DistributorMessageSender& sender) : framework::StatusReporter("bucketdb", "Bucket DB Updater"), _node_ctx(node_ctx), _op_ctx(op_ctx), _distributor_interface(owner), _delayedRequests(), _sentMessages(), - _pendingClusterState(), - _history(), _sender(sender), _enqueuedRechecks(), - _outdatedNodesMap(), - _transitionTimer(_node_ctx.clock()), _stale_reads_enabled(false), _active_distribution_contexts(), _explicit_transition_read_guard(), - _distribution_context_mutex(), - _use_legacy_mode(use_legacy_mode) + _distribution_context_mutex() { for (auto& elem : _op_ctx.bucket_space_repo()) { _active_distribution_contexts.emplace( @@ -223,66 +217,18 @@ public: } -void 
-StripeBucketDBUpdater::removeSuperfluousBuckets( - const lib::ClusterStateBundle& newState, - bool is_distribution_config_change) -{ - assert(_use_legacy_mode); - const bool move_to_read_only_db = shouldDeferStateEnabling(); - const char* up_states = storage_node_up_states(); - for (auto& elem : _op_ctx.bucket_space_repo()) { - const auto& newDistribution(elem.second->getDistribution()); - const auto& oldClusterState(elem.second->getClusterState()); - const auto& new_cluster_state = newState.getDerivedClusterState(elem.first); - - // Running a full DB sweep is expensive, so if the cluster state transition does - // not actually indicate that buckets should possibly be removed, we elide it entirely. - if (!is_distribution_config_change - && db_pruning_may_be_elided(oldClusterState, *new_cluster_state, up_states)) - { - LOG(debug, "[bucket space '%s']: eliding DB pruning for state transition '%s' -> '%s'", - document::FixedBucketSpaces::to_string(elem.first).data(), - oldClusterState.toString().c_str(), new_cluster_state->toString().c_str()); - continue; - } - - auto& bucketDb(elem.second->getBucketDatabase()); - auto& readOnlyDb(_op_ctx.read_only_bucket_space_repo().get(elem.first).getBucketDatabase()); - - // Remove all buckets not belonging to this distributor, or - // being on storage nodes that are no longer up. 
- MergingNodeRemover proc( - oldClusterState, - *new_cluster_state, - _node_ctx.node_index(), - newDistribution, - up_states, - move_to_read_only_db); - - bucketDb.merge(proc); - if (move_to_read_only_db) { - ReadOnlyDbMergingInserter read_only_merger(proc.getNonOwnedEntries()); - readOnlyDb.merge(read_only_merger); - } - maybe_inject_simulated_db_pruning_delay(); - } -} - PotentialDataLossReport StripeBucketDBUpdater::remove_superfluous_buckets( document::BucketSpace bucket_space, const lib::ClusterState& new_state, bool is_distribution_change) { - assert(!_use_legacy_mode); (void)is_distribution_change; // TODO remove if not needed const bool move_to_read_only_db = shouldDeferStateEnabling(); const char* up_states = storage_node_up_states(); auto& s = _op_ctx.bucket_space_repo().get(bucket_space); const auto& new_distribution = s.getDistribution(); - const auto& old_cluster_state = s.getClusterState(); // Elision of DB sweep is done at a higher level, so we don't have to do that here. auto& bucket_db = s.getBucketDatabase(); auto& read_only_db = _op_ctx.read_only_bucket_space_repo().get(bucket_space).getBucketDatabase(); @@ -290,7 +236,6 @@ StripeBucketDBUpdater::remove_superfluous_buckets( // Remove all buckets not belonging to this distributor, or // being on storage nodes that are no longer up. 
MergingNodeRemover proc( - old_cluster_state, new_state, _node_ctx.node_index(), new_distribution, @@ -303,7 +248,7 @@ StripeBucketDBUpdater::remove_superfluous_buckets( read_only_db.merge(read_only_merger); } PotentialDataLossReport report; - report.buckets = proc.removed_buckets(); + report.buckets = proc.removed_buckets(); report.documents = proc.removed_documents(); return report; } @@ -317,7 +262,6 @@ StripeBucketDBUpdater::merge_entries_into_db(document::BucketSpace bucket_space, const std::unordered_set<uint16_t>& outdated_nodes, const std::vector<dbtransition::Entry>& entries) { - assert(!_use_legacy_mode); auto& s = _op_ctx.bucket_space_repo().get(bucket_space); auto& bucket_db = s.getBucketDatabase(); @@ -326,44 +270,6 @@ StripeBucketDBUpdater::merge_entries_into_db(document::BucketSpace bucket_space, bucket_db.merge(merger); } -namespace { - -void maybe_sleep_for(std::chrono::milliseconds ms) { - if (ms.count() > 0) { - std::this_thread::sleep_for(ms); - } -} - -} - -void -StripeBucketDBUpdater::maybe_inject_simulated_db_pruning_delay() { - maybe_sleep_for(_op_ctx.distributor_config().simulated_db_pruning_latency()); -} - -void -StripeBucketDBUpdater::maybe_inject_simulated_db_merging_delay() { - maybe_sleep_for(_op_ctx.distributor_config().simulated_db_merging_latency()); -} - -void -StripeBucketDBUpdater::ensureTransitionTimerStarted() -{ - // Don't overwrite start time if we're already processing a state, as - // that will make transition times appear artificially low. 
- if (!hasPendingClusterState()) { - _transitionTimer = framework::MilliSecTimer( - _node_ctx.clock()); - } -} - -void -StripeBucketDBUpdater::completeTransitionTimer() -{ - _distributor_interface.getMetrics() - .stateTransitionTime.addValue(_transitionTimer.getElapsedTimeAsDouble()); -} - void StripeBucketDBUpdater::clearReadOnlyBucketRepoDatabases() { @@ -372,46 +278,6 @@ StripeBucketDBUpdater::clearReadOnlyBucketRepoDatabases() } } -void -StripeBucketDBUpdater::storageDistributionChanged() -{ - ensureTransitionTimerStarted(); - - removeSuperfluousBuckets(_op_ctx.cluster_state_bundle(), true); - - auto clusterInfo = std::make_shared<const SimpleClusterInformation>( - _node_ctx.node_index(), - _op_ctx.cluster_state_bundle(), - storage_node_up_states()); - _pendingClusterState = PendingClusterState::createForDistributionChange( - _node_ctx.clock(), - std::move(clusterInfo), - _sender, - _op_ctx.bucket_space_repo(), - _op_ctx.generate_unique_timestamp()); - _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap(); - _op_ctx.bucket_space_repo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle()); -} - -void -StripeBucketDBUpdater::replyToPreviousPendingClusterStateIfAny() -{ - if (_pendingClusterState.get() && _pendingClusterState->hasCommand()) { - _distributor_interface.getMessageSender().sendUp( - std::make_shared<api::SetSystemStateReply>(*_pendingClusterState->getCommand())); - } -} - -void -StripeBucketDBUpdater::replyToActivationWithActualVersion( - const api::ActivateClusterStateVersionCommand& cmd, - uint32_t actualVersion) -{ - auto reply = std::make_shared<api::ActivateClusterStateVersionReply>(cmd); - reply->setActualVersion(actualVersion); - _distributor_interface.getMessageSender().sendUp(reply); // TODO let API accept rvalues -} - void StripeBucketDBUpdater::update_read_snapshot_before_db_pruning() { std::lock_guard lock(_distribution_context_mutex); for (auto& elem : _op_ctx.bucket_space_repo()) { @@ -428,7 +294,6 
@@ void StripeBucketDBUpdater::update_read_snapshot_before_db_pruning() { } } - void StripeBucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) { std::lock_guard lock(_distribution_context_mutex); const auto old_default_state = _op_ctx.bucket_space_repo().get( @@ -467,88 +332,6 @@ void StripeBucketDBUpdater::update_read_snapshot_after_activation(const lib::Clu } } -bool -StripeBucketDBUpdater::onSetSystemState( - const std::shared_ptr<api::SetSystemStateCommand>& cmd) -{ - assert(_use_legacy_mode); - LOG(debug, - "Received new cluster state %s", - cmd->getSystemState().toString().c_str()); - - const lib::ClusterStateBundle oldState = _op_ctx.cluster_state_bundle(); - const lib::ClusterStateBundle& state = cmd->getClusterStateBundle(); - - if (state == oldState) { - return false; - } - ensureTransitionTimerStarted(); - // Separate timer since _transition_timer might span multiple pending states. - framework::MilliSecTimer process_timer(_node_ctx.clock()); - update_read_snapshot_before_db_pruning(); - const auto& bundle = cmd->getClusterStateBundle(); - removeSuperfluousBuckets(bundle, false); - update_read_snapshot_after_db_pruning(bundle); - replyToPreviousPendingClusterStateIfAny(); - - auto clusterInfo = std::make_shared<const SimpleClusterInformation>( - _node_ctx.node_index(), - _op_ctx.cluster_state_bundle(), - storage_node_up_states()); - _pendingClusterState = PendingClusterState::createForClusterStateChange( - _node_ctx.clock(), - std::move(clusterInfo), - _sender, - _op_ctx.bucket_space_repo(), - cmd, - _outdatedNodesMap, - _op_ctx.generate_unique_timestamp()); - _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap(); - - _distributor_interface.getMetrics().set_cluster_state_processing_time.addValue( - process_timer.getElapsedTimeAsDouble()); - - _op_ctx.bucket_space_repo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle()); - if (isPendingClusterStateCompleted()) { - 
processCompletedPendingClusterState(); - } - return true; -} - -bool -StripeBucketDBUpdater::onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd) -{ - assert(_use_legacy_mode); - if (hasPendingClusterState() && _pendingClusterState->isVersionedTransition()) { - const auto pending_version = _pendingClusterState->clusterStateVersion(); - if (pending_version == cmd->version()) { - if (isPendingClusterStateCompleted()) { - assert(_pendingClusterState->isDeferred()); - activatePendingClusterState(); - } else { - LOG(error, "Received cluster state activation for pending version %u " - "without pending state being complete yet. This is not expected, " - "as no activation should be sent before all distributors have " - "reported that state processing is complete.", pending_version); - replyToActivationWithActualVersion(*cmd, 0); // Invalid version, will cause re-send (hopefully when completed). - return true; - } - } else { - replyToActivationWithActualVersion(*cmd, pending_version); - return true; - } - } else if (shouldDeferStateEnabling()) { - // Likely just a resend, but log warn for now to get a feel of how common it is. - LOG(warning, "Received cluster state activation command for version %u, which " - "has no corresponding pending state. Likely resent operation.", cmd->version()); - } else { - LOG(debug, "Received cluster state activation command for version %u, but distributor " - "config does not have deferred activation enabled. Treating as no-op.", cmd->version()); - } - // Fall through to next link in call chain that cares about this message. 
- return false; -} - StripeBucketDBUpdater::MergeReplyGuard::~MergeReplyGuard() { if (_reply) { @@ -646,30 +429,9 @@ bool StripeBucketDBUpdater::onRequestBucketInfoReply( const std::shared_ptr<api::RequestBucketInfoReply> & repl) { - if (pendingClusterStateAccepted(repl)) { - return true; - } return processSingleBucketInfoReply(repl); } -bool -StripeBucketDBUpdater::pendingClusterStateAccepted( - const std::shared_ptr<api::RequestBucketInfoReply> & repl) -{ - if (_pendingClusterState.get() - && _pendingClusterState->onRequestBucketInfoReply(repl)) - { - if (isPendingClusterStateCompleted()) { - processCompletedPendingClusterState(); - } - return true; - } - LOG(spam, - "Reply %s was not accepted by pending cluster state", - repl->toString().c_str()); - return false; -} - void StripeBucketDBUpdater::handleSingleBucketInfoFailure( const std::shared_ptr<api::RequestBucketInfoReply>& repl, @@ -688,9 +450,6 @@ StripeBucketDBUpdater::handleSingleBucketInfoFailure( void StripeBucketDBUpdater::resendDelayedMessages() { - if (_pendingClusterState) { - _pendingClusterState->resendDelayedMessages(); - } if (_delayedRequests.empty()) { return; // Don't fetch time if not needed } @@ -803,100 +562,11 @@ StripeBucketDBUpdater::updateDatabase(document::BucketSpace bucketSpace, uint16_ } } -bool -StripeBucketDBUpdater::isPendingClusterStateCompleted() const -{ - return _pendingClusterState.get() && _pendingClusterState->done(); -} - -void -StripeBucketDBUpdater::processCompletedPendingClusterState() -{ - if (_pendingClusterState->isDeferred()) { - LOG(debug, "Deferring completion of pending cluster state version %u until explicitly activated", - _pendingClusterState->clusterStateVersion()); - assert(_pendingClusterState->hasCommand()); // Deferred transitions should only ever be created by state commands. - // Sending down SetSystemState command will reach the state manager and a reply - // will be auto-sent back to the cluster controller in charge. 
Once this happens, - // it will send an explicit activation command once all distributors have reported - // that their pending cluster states have completed. - // A booting distributor will treat itself as "system Up" before the state has actually - // taken effect via activation. External operation handler will keep operations from - // actually being scheduled until state has been activated. The external operation handler - // needs to be explicitly aware of the case where no state has yet to be activated. - _distributor_interface.getMessageSender().sendDown( - _pendingClusterState->getCommand()); - _pendingClusterState->clearCommand(); - return; - } - // Distribution config change or non-deferred cluster state. Immediately activate - // the pending state without being told to do so explicitly. - activatePendingClusterState(); -} - -void -StripeBucketDBUpdater::activatePendingClusterState() -{ - framework::MilliSecTimer process_timer(_node_ctx.clock()); - - _pendingClusterState->mergeIntoBucketDatabases(); - maybe_inject_simulated_db_merging_delay(); - - if (_pendingClusterState->isVersionedTransition()) { - LOG(debug, "Activating pending cluster state version %u", _pendingClusterState->clusterStateVersion()); - enableCurrentClusterStateBundleInDistributor(); - if (_pendingClusterState->hasCommand()) { - _distributor_interface.getMessageSender().sendDown( - _pendingClusterState->getCommand()); - } - addCurrentStateToClusterStateHistory(); - } else { - LOG(debug, "Activating pending distribution config"); - // TODO distribution changes cannot currently be deferred as they are not - // initiated by the cluster controller! - _distributor_interface.notifyDistributionChangeEnabled(); - } - - update_read_snapshot_after_activation(_pendingClusterState->getNewClusterStateBundle()); - _pendingClusterState.reset(); - _outdatedNodesMap.clear(); - _op_ctx.bucket_space_repo().clear_pending_cluster_state_bundle(); // TODO also read only bucket space..? 
- sendAllQueuedBucketRechecks(); - completeTransitionTimer(); - clearReadOnlyBucketRepoDatabases(); - - _distributor_interface.getMetrics().activate_cluster_state_processing_time.addValue( - process_timer.getElapsedTimeAsDouble()); -} - -void -StripeBucketDBUpdater::enableCurrentClusterStateBundleInDistributor() -{ - const lib::ClusterStateBundle& state( - _pendingClusterState->getNewClusterStateBundle()); - - LOG(debug, - "StripeBucketDBUpdater finished processing state %s", - state.getBaselineClusterState()->toString().c_str()); - - _distributor_interface.enableClusterStateBundle(state); -} - void StripeBucketDBUpdater::simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state) { update_read_snapshot_after_activation(activated_state); _distributor_interface.enableClusterStateBundle(activated_state); } -void -StripeBucketDBUpdater::addCurrentStateToClusterStateHistory() -{ - _history.push_back(_pendingClusterState->getSummary()); - - if (_history.size() > 50) { - _history.pop_front(); - } -} - vespalib::string StripeBucketDBUpdater::getReportContentType(const framework::HttpUrlPath&) const { @@ -952,19 +622,7 @@ StripeBucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos, << XmlTag("systemstate_active") << XmlContent(_op_ctx.cluster_state_bundle().getBaselineClusterState()->toString()) << XmlEndTag(); - if (_pendingClusterState) { - xos << *_pendingClusterState; - } - xos << XmlTag("systemstate_history"); - for (auto i(_history.rbegin()), e(_history.rend()); i != e; ++i) { - xos << XmlTag("change") - << XmlAttribute("from", i->_prevClusterState) - << XmlAttribute("to", i->_newClusterState) - << XmlAttribute("processingtime", i->_processingTime) - << XmlEndTag(); - } - xos << XmlEndTag() - << XmlTag("single_bucket_requests"); + xos << XmlTag("single_bucket_requests"); report_single_bucket_requests(xos); xos << XmlEndTag() << XmlTag("delayed_single_bucket_requests"); @@ -990,14 +648,12 @@ 
StripeBucketDBUpdater::report_delayed_single_bucket_requests(vespalib::xml::XmlO } StripeBucketDBUpdater::MergingNodeRemover::MergingNodeRemover( - const lib::ClusterState& oldState, const lib::ClusterState& s, uint16_t localIndex, const lib::Distribution& distribution, const char* upStates, bool track_non_owned_entries) - : _oldState(oldState), - _state(s), + : _state(s), _available_nodes(), _nonOwnedBuckets(), _removed_buckets(0), @@ -1140,15 +796,6 @@ StripeBucketDBUpdater::MergingNodeRemover::storage_node_is_available(uint16_t in return ((index < _available_nodes.size()) && _available_nodes[index]); } -StripeBucketDBUpdater::MergingNodeRemover::~MergingNodeRemover() -{ - if (_removed_buckets != 0) { - LOGBM(info, "After cluster state change %s, %zu buckets no longer " - "have available replicas. %zu documents in these buckets will " - "be unavailable until nodes come back up", - _oldState.getTextualDifference(_state).c_str(), - _removed_buckets, _removed_documents); - } -} +StripeBucketDBUpdater::MergingNodeRemover::~MergingNodeRemover() = default; } // distributor diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h index 9bc91ca78e7..1d81aa014a8 100644 --- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h +++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h @@ -35,12 +35,10 @@ class StripeBucketDBUpdater final public api::MessageHandler { public: - using OutdatedNodesMap = dbtransition::OutdatedNodesMap; StripeBucketDBUpdater(const DistributorNodeContext& node_ctx, DistributorStripeOperationContext& op_ctx, DistributorStripeInterface& owner, - DistributorMessageSender& sender, - bool use_legacy_mode); + DistributorMessageSender& sender); ~StripeBucketDBUpdater() override; void flush(); @@ -48,13 +46,10 @@ public: void recheckBucketInfo(uint32_t nodeIdx, const document::Bucket& bucket); void handle_activated_cluster_state_bundle(); - 
bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd) override; - bool onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd) override; bool onRequestBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply> & repl) override; bool onMergeBucketReply(const std::shared_ptr<api::MergeBucketReply>& reply) override; bool onNotifyBucketChange(const std::shared_ptr<api::NotifyBucketChangeCommand>&) override; void resendDelayedMessages(); - void storageDistributionChanged(); vespalib::string reportXmlStatus(vespalib::xml::XmlOutputStream&, const framework::HttpUrlPath&) const; vespalib::string getReportContentType(const framework::HttpUrlPath&) const override; @@ -68,16 +63,6 @@ public: const DistributorNodeContext& node_context() const { return _node_ctx; } DistributorStripeOperationContext& operation_context() { return _op_ctx; } - /** - * Returns whether the current PendingClusterState indicates that there has - * been a transfer of bucket ownership amongst the distributors in the - * cluster. This method only makes sense to call when _pending_cluster_state - * is active, such as from within a enableClusterState() call. 
- */ - bool bucketOwnershipHasChanged() const { - return ((_pendingClusterState.get() != nullptr) - && _pendingClusterState->hasBucketOwnershipTransfer()); - } void set_stale_reads_enabled(bool enabled) noexcept { _stale_reads_enabled.store(enabled, std::memory_order_relaxed); } @@ -144,8 +129,6 @@ private: }; friend class DistributorStripeTestUtil; - friend class DistributorTestUtil; - friend class TopLevelDistributorTestUtil; // TODO STRIPE remove asap // TODO refactor and rewire to avoid needing this direct meddling friend class DistributorStripe; @@ -156,13 +139,9 @@ private: bool shouldDeferStateEnabling() const noexcept; bool hasPendingClusterState() const; - bool pendingClusterStateAccepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl); bool processSingleBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply>& repl); void handleSingleBucketInfoFailure(const std::shared_ptr<api::RequestBucketInfoReply>& repl, const BucketRequest& req); - bool isPendingClusterStateCompleted() const; - void processCompletedPendingClusterState(); - void activatePendingClusterState(); void mergeBucketInfoWithDatabase(const std::shared_ptr<api::RequestBucketInfoReply>& repl, const BucketRequest& req); void convertBucketInfoToBucketList(const std::shared_ptr<api::RequestBucketInfoReply>& repl, @@ -171,8 +150,6 @@ private: const std::shared_ptr<MergeReplyGuard>& mergeReply); void addBucketInfoForNode(const BucketDatabase::Entry& e, uint16_t node, BucketListMerger::BucketList& existing) const; - void ensureTransitionTimerStarted(); - void completeTransitionTimer(); void clearReadOnlyBucketRepoDatabases(); /** * Adds all buckets contained in the bucket database @@ -189,15 +166,10 @@ private: */ void updateDatabase(document::BucketSpace bucketSpace, uint16_t node, BucketListMerger& merger); - void updateState(const lib::ClusterState& oldState, const lib::ClusterState& newState); - void update_read_snapshot_before_db_pruning(); - void 
removeSuperfluousBuckets(const lib::ClusterStateBundle& newState, - bool is_distribution_config_change); void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state); void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state); - // TODO STRIPE only called when stripe guard is held PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace bucket_space, const lib::ClusterState& new_state, bool is_distribution_change); @@ -209,26 +181,15 @@ private: const std::unordered_set<uint16_t>& outdated_nodes, const std::vector<dbtransition::Entry>& entries); - void replyToPreviousPendingClusterStateIfAny(); - void replyToActivationWithActualVersion( - const api::ActivateClusterStateVersionCommand& cmd, - uint32_t actualVersion); - - void enableCurrentClusterStateBundleInDistributor(); - void addCurrentStateToClusterStateHistory(); void enqueueRecheckUntilPendingStateEnabled(uint16_t node, const document::Bucket&); void sendAllQueuedBucketRechecks(); - void maybe_inject_simulated_db_pruning_delay(); - void maybe_inject_simulated_db_merging_delay(); - /** Removes all copies of buckets that are on nodes that are down. 
*/ class MergingNodeRemover : public BucketDatabase::MergingProcessor { public: - MergingNodeRemover(const lib::ClusterState& oldState, - const lib::ClusterState& s, + MergingNodeRemover(const lib::ClusterState& s, uint16_t localIndex, const lib::Distribution& distribution, const char* upStates, @@ -250,44 +211,38 @@ private: bool has_unavailable_nodes(const BucketDatabase::Entry&) const; bool storage_node_is_available(uint16_t index) const noexcept; - const lib::ClusterState _oldState; - const lib::ClusterState _state; - std::vector<bool> _available_nodes; + const lib::ClusterState _state; + std::vector<bool> _available_nodes; std::vector<BucketDatabase::Entry> _nonOwnedBuckets; - size_t _removed_buckets; - size_t _removed_documents; - - uint16_t _localIndex; - const lib::Distribution& _distribution; - const char* _upStates; - bool _track_non_owned_entries; - - mutable uint64_t _cachedDecisionSuperbucket; - mutable bool _cachedOwned; + size_t _removed_buckets; + size_t _removed_documents; + uint16_t _localIndex; + const lib::Distribution& _distribution; + const char* _upStates; + bool _track_non_owned_entries; + mutable uint64_t _cachedDecisionSuperbucket; + mutable bool _cachedOwned; }; - const DistributorNodeContext& _node_ctx; - DistributorStripeOperationContext& _op_ctx; - DistributorStripeInterface& _distributor_interface; - std::deque<std::pair<framework::MilliSecTime, BucketRequest> > _delayedRequests; - std::map<uint64_t, BucketRequest> _sentMessages; - std::unique_ptr<PendingClusterState> _pendingClusterState; - std::list<PendingClusterState::Summary> _history; - DistributorMessageSender& _sender; - std::set<EnqueuedBucketRecheck> _enqueuedRechecks; - OutdatedNodesMap _outdatedNodesMap; - framework::MilliSecTimer _transitionTimer; - std::atomic<bool> _stale_reads_enabled; using DistributionContexts = std::unordered_map<document::BucketSpace, std::shared_ptr<BucketSpaceDistributionContext>, document::BucketSpace::hash>; - DistributionContexts 
_active_distribution_contexts; using DbGuards = std::unordered_map<document::BucketSpace, std::shared_ptr<BucketDatabase::ReadGuard>, document::BucketSpace::hash>; - DbGuards _explicit_transition_read_guard; - mutable std::mutex _distribution_context_mutex; - bool _use_legacy_mode; + using DelayedRequestsQueue = std::deque<std::pair<framework::MilliSecTime, BucketRequest>>; + + const DistributorNodeContext& _node_ctx; + DistributorStripeOperationContext& _op_ctx; + DistributorStripeInterface& _distributor_interface; + DelayedRequestsQueue _delayedRequests; + std::map<uint64_t, BucketRequest> _sentMessages; + DistributorMessageSender& _sender; + std::set<EnqueuedBucketRecheck> _enqueuedRechecks; + std::atomic<bool> _stale_reads_enabled; + DistributionContexts _active_distribution_contexts; + DbGuards _explicit_transition_read_guard; + mutable std::mutex _distribution_context_mutex; }; } diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp index 191be2a6766..20ecf68c3f1 100644 --- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp +++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp @@ -91,9 +91,6 @@ TopLevelBucketDBUpdater::propagate_distribution_config(const BucketSpaceDistribu } } -// FIXME what about bucket DB replica update timestamp allocations?! Replace with u64 counter..? -// Must at the very least ensure we use stripe-local TS generation for DB inserts...! i.e. no global TS -// Or do we have to touch these at all here? Just defer all this via stripe interface? 
void TopLevelBucketDBUpdater::flush() { diff --git a/storage/src/vespa/storage/distributor/top_level_distributor.cpp b/storage/src/vespa/storage/distributor/top_level_distributor.cpp index ae414b2a85e..456464576a1 100644 --- a/storage/src/vespa/storage/distributor/top_level_distributor.cpp +++ b/storage/src/vespa/storage/distributor/top_level_distributor.cpp @@ -22,13 +22,11 @@ #include <vespa/storage/common/node_identity.h> #include <vespa/storage/common/nodestateupdater.h> #include <vespa/storage/config/distributorconfiguration.h> -#include <vespa/storage/distributor/maintenance/simplebucketprioritydatabase.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/visitor.h> #include <vespa/storageframework/generic/status/xmlstatusreporter.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vespalib/util/memoryusage.h> -#include <vespa/vespalib/util/time.h> #include <algorithm> #include <vespa/log/log.h> @@ -38,16 +36,6 @@ using namespace std::chrono_literals; namespace storage::distributor { -/* TODO STRIPE - * - need a DistributorStripeComponent per stripe - * - or better, remove entirely! - * - probably also DistributorStripeInterface since it's used to send - * - metrics aggregation - * - host info aggregation..!! - * - handled if Distributor getMinReplica etc delegates to stripes? - * - these are already thread safe - * - status aggregation - */ TopLevelDistributor::TopLevelDistributor(DistributorComponentRegister& compReg, const NodeIdentity& node_identity, framework::TickingThreadPool& threadPool, @@ -61,23 +49,11 @@ TopLevelDistributor::TopLevelDistributor(DistributorComponentRegister& compReg, _node_identity(node_identity), _comp_reg(compReg), _done_init_handler(done_init_handler), - _use_legacy_mode(num_distributor_stripes == 0), _done_initializing(false), - _metrics(std::make_shared<DistributorMetricSet>()), - _total_metrics(_use_legacy_mode ? 
std::shared_ptr<DistributorTotalMetrics>() - : std::make_shared<DistributorTotalMetrics>(num_distributor_stripes)), - _ideal_state_metrics(_use_legacy_mode ? std::make_shared<IdealStateMetricSet>() - : std::shared_ptr<IdealStateMetricSet>()), - _ideal_state_total_metrics(_use_legacy_mode ? std::shared_ptr<IdealStateTotalMetrics>() - : std::make_shared<IdealStateTotalMetrics>(num_distributor_stripes)), + _total_metrics(std::make_shared<DistributorTotalMetrics>(num_distributor_stripes)), + _ideal_state_total_metrics(std::make_shared<IdealStateTotalMetrics>(num_distributor_stripes)), _messageSender(messageSender), _n_stripe_bits(0), - _stripe(std::make_unique<DistributorStripe>(compReg, - _use_legacy_mode ? *_metrics : _total_metrics->stripe(0), - (_use_legacy_mode ? *_ideal_state_metrics - : _ideal_state_total_metrics->stripe(0)), - node_identity, threadPool, - _done_init_handler, *this, *this, _use_legacy_mode, _done_initializing)), _stripe_pool(stripe_pool), _stripes(), _stripe_accessor(), @@ -105,35 +81,33 @@ TopLevelDistributor::TopLevelDistributor(DistributorComponentRegister& compReg, _next_distribution(), _current_internal_config_generation(_component.internal_config_generation()) { - _component.registerMetric(_use_legacy_mode ? *_metrics : *_total_metrics); - _ideal_state_component.registerMetric(_use_legacy_mode ? 
*_ideal_state_metrics : - *_ideal_state_total_metrics); + _component.registerMetric(*_total_metrics); + _ideal_state_component.registerMetric(*_ideal_state_total_metrics); _component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0)); - if (!_use_legacy_mode) { - assert(num_distributor_stripes == adjusted_num_stripes(num_distributor_stripes)); - _n_stripe_bits = calc_num_stripe_bits(num_distributor_stripes); - LOG(debug, "Setting up distributor with %u stripes using %u stripe bits", - num_distributor_stripes, _n_stripe_bits); - _stripe_accessor = std::make_unique<MultiThreadedStripeAccessor>(_stripe_pool); - _bucket_db_updater = std::make_unique<TopLevelBucketDBUpdater>(_component, _component, - *this, *this, - _component.getDistribution(), - *_stripe_accessor, - this); - _stripes.emplace_back(std::move(_stripe)); - for (size_t i = 1; i < num_distributor_stripes; ++i) { - _stripes.emplace_back(std::make_unique<DistributorStripe>(compReg, - _total_metrics->stripe(i), - _ideal_state_total_metrics->stripe(i), - node_identity, threadPool, - _done_init_handler, *this, *this, _use_legacy_mode, - _done_initializing, i)); - } - _stripe_scan_stats.resize(num_distributor_stripes); - _distributorStatusDelegate.registerStatusPage(); - _bucket_db_status_delegate = std::make_unique<StatusReporterDelegate>(compReg, *this, *_bucket_db_updater); - _bucket_db_status_delegate->registerStatusPage(); - } + + assert(num_distributor_stripes == adjusted_num_stripes(num_distributor_stripes)); + _n_stripe_bits = calc_num_stripe_bits(num_distributor_stripes); + LOG(debug, "Setting up distributor with %u stripes using %u stripe bits", + num_distributor_stripes, _n_stripe_bits); + _stripe_accessor = std::make_unique<MultiThreadedStripeAccessor>(_stripe_pool); + _bucket_db_updater = std::make_unique<TopLevelBucketDBUpdater>(_component, _component, + *this, *this, + _component.getDistribution(), + *_stripe_accessor, + this); + for (size_t i = 0; i < num_distributor_stripes; 
++i) { + _stripes.emplace_back(std::make_unique<DistributorStripe>(compReg, + _total_metrics->stripe(i), + _ideal_state_total_metrics->stripe(i), + node_identity, + *this, *this, + _done_initializing, i)); + } + _stripe_scan_stats.resize(num_distributor_stripes); + _distributorStatusDelegate.registerStatusPage(); + _bucket_db_status_delegate = std::make_unique<StatusReporterDelegate>(compReg, *this, *_bucket_db_updater); + _bucket_db_status_delegate->registerStatusPage(); + _hostInfoReporter.enableReporting(config().getEnableHostInfoReporting()); hostInfoReporterRegistrar.registerReporter(&_hostInfoReporter); propagateDefaultDistribution(_component.getDistribution()); @@ -148,114 +122,7 @@ TopLevelDistributor::~TopLevelDistributor() DistributorMetricSet& TopLevelDistributor::getMetrics() { - return _use_legacy_mode ? *_metrics : _total_metrics->bucket_db_updater_metrics(); -} - -// TODO STRIPE figure out how to handle inspection functions used by tests when legacy mode no longer exists. 
-// All functions below that assert on _use_legacy_mode are only currently used by tests - -bool -TopLevelDistributor::isInRecoveryMode() const noexcept { - assert(_use_legacy_mode); // TODO STRIPE - return _stripe->isInRecoveryMode(); -} - -const PendingMessageTracker& -TopLevelDistributor::getPendingMessageTracker() const { - assert(_use_legacy_mode); // TODO STRIPE - return _stripe->getPendingMessageTracker(); -} - -PendingMessageTracker& -TopLevelDistributor::getPendingMessageTracker() { - assert(_use_legacy_mode); // TODO STRIPE - return _stripe->getPendingMessageTracker(); -} - -DistributorBucketSpaceRepo& -TopLevelDistributor::getBucketSpaceRepo() noexcept { - assert(_use_legacy_mode); // TODO STRIPE - return _stripe->getBucketSpaceRepo(); -} - -const DistributorBucketSpaceRepo& -TopLevelDistributor::getBucketSpaceRepo() const noexcept { - assert(_use_legacy_mode); // TODO STRIPE - return _stripe->getBucketSpaceRepo(); -} - -DistributorBucketSpaceRepo& -TopLevelDistributor::getReadOnlyBucketSpaceRepo() noexcept { - assert(_use_legacy_mode); // TODO STRIPE - return _stripe->getReadOnlyBucketSpaceRepo(); -} - -const DistributorBucketSpaceRepo& -TopLevelDistributor::getReadyOnlyBucketSpaceRepo() const noexcept { - assert(_use_legacy_mode); // TODO STRIPE - return _stripe->getReadOnlyBucketSpaceRepo();; -} - -storage::distributor::DistributorStripeComponent& -TopLevelDistributor::distributor_component() noexcept { - assert(_use_legacy_mode); // TODO STRIPE - // TODO STRIPE We need to grab the stripe's component since tests like to access - // these things uncomfortably directly. 
- return _stripe->_component; -} - -StripeBucketDBUpdater& -TopLevelDistributor::bucket_db_updater() { - assert(_use_legacy_mode); // TODO STRIPE - return _stripe->bucket_db_updater(); -} - -const StripeBucketDBUpdater& -TopLevelDistributor::bucket_db_updater() const { - assert(_use_legacy_mode); // TODO STRIPE - return _stripe->bucket_db_updater(); -} - -IdealStateManager& -TopLevelDistributor::ideal_state_manager() { - assert(_use_legacy_mode); // TODO STRIPE - return _stripe->ideal_state_manager(); -} - -const IdealStateManager& -TopLevelDistributor::ideal_state_manager() const { - assert(_use_legacy_mode); // TODO STRIPE - return _stripe->ideal_state_manager(); -} - -ExternalOperationHandler& -TopLevelDistributor::external_operation_handler() { - assert(_use_legacy_mode); // TODO STRIPE - return _stripe->external_operation_handler(); -} - -const ExternalOperationHandler& -TopLevelDistributor::external_operation_handler() const { - assert(_use_legacy_mode); // TODO STRIPE - return _stripe->external_operation_handler(); -} - -BucketDBMetricUpdater& -TopLevelDistributor::bucket_db_metric_updater() const noexcept { - assert(_use_legacy_mode); // TODO STRIPE - return _stripe->_bucketDBMetricUpdater; -} - -const DistributorConfiguration& -TopLevelDistributor::getConfig() const { - assert(_use_legacy_mode); // TODO STRIPE - return _stripe->getConfig(); -} - -std::chrono::steady_clock::duration -TopLevelDistributor::db_memory_sample_interval() const noexcept { - assert(_use_legacy_mode); // TODO STRIPE - return _stripe->db_memory_sample_interval(); + return _total_metrics->bucket_db_updater_metrics(); } void @@ -272,8 +139,6 @@ TopLevelDistributor::onOpen() { LOG(debug, "Distributor::onOpen invoked"); setNodeStateUp(); - framework::MilliSecTime maxProcessingTime(60 * 1000); - framework::MilliSecTime waitTime(1000); if (_component.getDistributorConfig().startDistributorThread) { _threadPool.addThread(*this); _threadPool.start(_component.getThreadPool()); @@ -289,33 
+154,27 @@ void TopLevelDistributor::onClose() { // Note: In a running system this function is called by the main thread in StorageApp as part of shutdown. // The distributor and stripe thread pools are already stopped at this point. LOG(debug, "Distributor::onClose invoked"); - if (_use_legacy_mode) { - _stripe->flush_and_close(); - } else { - // Tests may run with multiple stripes but without threads (for determinism's sake), - // so only try to flush stripes if a pool is running. - // TODO STRIPE probably also need to flush when running tests to handle any explicit close-tests. - if (_stripe_pool.stripe_count() > 0) { - assert(_stripe_pool.is_stopped()); - for (auto& thread : _stripe_pool) { - thread->stripe().flush_and_close(); - } + // Tests may run with multiple stripes but without threads (for determinism's sake), + // so only try to flush stripes if a pool is running. + // TODO STRIPE probably also need to flush when running tests to handle any explicit close-tests. + if (_stripe_pool.stripe_count() > 0) { + assert(_stripe_pool.is_stopped()); + for (auto& thread : _stripe_pool) { + thread->stripe().flush_and_close(); } - assert(_bucket_db_updater); - _bucket_db_updater->flush(); } + assert(_bucket_db_updater); + _bucket_db_updater->flush(); } void TopLevelDistributor::start_stripe_pool() { - if (!_use_legacy_mode) { - std::vector<TickableStripe*> pool_stripes; - for (auto& stripe : _stripes) { - pool_stripes.push_back(stripe.get()); - } - _stripe_pool.start(pool_stripes); // If unit testing, this won't actually start any OS threads + std::vector<TickableStripe*> pool_stripes; + for (auto& stripe : _stripes) { + pool_stripes.push_back(stripe.get()); } + _stripe_pool.start(pool_stripes); // If unit testing, this won't actually start any OS threads } void @@ -410,37 +269,19 @@ TopLevelDistributor::stripe_of_bucket_id(const document::BucketId& bucket_id, co bool TopLevelDistributor::onDown(const std::shared_ptr<api::StorageMessage>& msg) { - if 
(_use_legacy_mode) { - return _stripe->handle_or_enqueue_message(msg); - } else { - if (should_be_handled_by_top_level_bucket_db_updater(*msg)) { - dispatch_to_main_distributor_thread_queue(msg); - return true; - } - auto bucket_id = get_bucket_id_for_striping(*msg, _component); - uint32_t stripe_idx = stripe_of_bucket_id(bucket_id, *msg); - MBUS_TRACE(msg->getTrace(), 9, - vespalib::make_string("Distributor::onDown(): Dispatch message to stripe %u", stripe_idx)); - bool handled = _stripes[stripe_idx]->handle_or_enqueue_message(msg); - if (handled) { - _stripe_pool.notify_stripe_event_has_triggered(stripe_idx); - } - return handled; + if (should_be_handled_by_top_level_bucket_db_updater(*msg)) { + dispatch_to_main_distributor_thread_queue(msg); + return true; } -} - -bool -TopLevelDistributor::handleReply(const std::shared_ptr<api::StorageReply>& reply) -{ - assert(_use_legacy_mode); - return _stripe->handleReply(reply); -} - -bool -TopLevelDistributor::handleMessage(const std::shared_ptr<api::StorageMessage>& msg) -{ - assert(_use_legacy_mode); // TODO STRIPE - return _stripe->handleMessage(msg); + auto bucket_id = get_bucket_id_for_striping(*msg, _component); + uint32_t stripe_idx = stripe_of_bucket_id(bucket_id, *msg); + MBUS_TRACE(msg->getTrace(), 9, + vespalib::make_string("Distributor::onDown(): Dispatch message to stripe %u", stripe_idx)); + bool handled = _stripes[stripe_idx]->handle_or_enqueue_message(msg); + if (handled) { + _stripe_pool.notify_stripe_event_has_triggered(stripe_idx); + } + return handled; } const DistributorConfiguration& @@ -461,53 +302,28 @@ TopLevelDistributor::sendReply(const std::shared_ptr<api::StorageReply>& reply) sendUp(reply); } -const lib::ClusterStateBundle& -TopLevelDistributor::getClusterStateBundle() const -{ - assert(_use_legacy_mode); // TODO STRIPE - // TODO STRIPE must offer a single unifying state across stripes - return _stripe->getClusterStateBundle(); -} - -void -TopLevelDistributor::enableClusterStateBundle(const 
lib::ClusterStateBundle& state) -{ - assert(_use_legacy_mode); // TODO STRIPE - // TODO STRIPE make test injection/force-function - _stripe->enableClusterStateBundle(state); -} - void TopLevelDistributor::storageDistributionChanged() { - if (!_use_legacy_mode) { - if (!_distribution || (*_component.getDistribution() != *_distribution)) { - LOG(debug, "Distribution changed to %s, must re-fetch bucket information", - _component.getDistribution()->toString().c_str()); - _next_distribution = _component.getDistribution(); // FIXME this is not thread safe - } else { - LOG(debug, "Got distribution change, but the distribution %s was the same as before: %s", - _component.getDistribution()->toString().c_str(), - _distribution->toString().c_str()); - } + if (!_distribution || (*_component.getDistribution() != *_distribution)) { + LOG(debug, "Distribution changed to %s, must re-fetch bucket information", + _component.getDistribution()->toString().c_str()); + _next_distribution = _component.getDistribution(); // FIXME this is not thread safe } else { - // May happen from any thread. 
- _stripe->storage_distribution_changed(); + LOG(debug, "Got distribution change, but the distribution %s was the same as before: %s", + _component.getDistribution()->toString().c_str(), + _distribution->toString().c_str()); } } void TopLevelDistributor::enableNextDistribution() { - if (!_use_legacy_mode) { - if (_next_distribution) { - _distribution = _next_distribution; - _next_distribution = std::shared_ptr<lib::Distribution>(); - auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(_distribution); - _bucket_db_updater->storage_distribution_changed(new_configs); - } - } else { - _stripe->enableNextDistribution(); + if (_next_distribution) { + _distribution = _next_distribution; + _next_distribution = std::shared_ptr<lib::Distribution>(); + auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(_distribution); + _bucket_db_updater->storage_distribution_changed(new_configs); } } @@ -517,79 +333,51 @@ void TopLevelDistributor::propagateDefaultDistribution( std::shared_ptr<const lib::Distribution> distribution) { - // TODO STRIPE cannot directly access stripe when not in legacy mode! - if (_use_legacy_mode) { - _stripe->propagateDefaultDistribution(std::move(distribution)); - } else { - // Should only be called at ctor time, at which point the pool is not yet running. - assert(_stripe_pool.stripe_count() == 0); - auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution)); - for (auto& stripe : _stripes) { - stripe->update_distribution_config(new_configs); - } + // Should only be called at ctor time, at which point the pool is not yet running. 
+ assert(_stripe_pool.stripe_count() == 0); + auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution)); + for (auto& stripe : _stripes) { + stripe->update_distribution_config(new_configs); } } std::unordered_map<uint16_t, uint32_t> TopLevelDistributor::getMinReplica() const { - if (_use_legacy_mode) { - return _stripe->getMinReplica(); - } else { - std::unordered_map<uint16_t, uint32_t> result; - for (const auto& stripe : _stripes) { - merge_min_replica_stats(result, stripe->getMinReplica()); - } - return result; + std::unordered_map<uint16_t, uint32_t> result; + for (const auto& stripe : _stripes) { + merge_min_replica_stats(result, stripe->getMinReplica()); } + return result; } BucketSpacesStatsProvider::PerNodeBucketSpacesStats TopLevelDistributor::getBucketSpacesStats() const { - if (_use_legacy_mode) { - return _stripe->getBucketSpacesStats(); - } else { - BucketSpacesStatsProvider::PerNodeBucketSpacesStats result; - for (const auto& stripe : _stripes) { - merge_per_node_bucket_spaces_stats(result, stripe->getBucketSpacesStats()); - } - return result; + BucketSpacesStatsProvider::PerNodeBucketSpacesStats result; + for (const auto& stripe : _stripes) { + merge_per_node_bucket_spaces_stats(result, stripe->getBucketSpacesStats()); } + return result; } SimpleMaintenanceScanner::PendingMaintenanceStats TopLevelDistributor::pending_maintenance_stats() const { - if (_use_legacy_mode) { - return _stripe->pending_maintenance_stats(); - } else { - SimpleMaintenanceScanner::PendingMaintenanceStats result; - for (const auto& stripe : _stripes) { - result.merge(stripe->pending_maintenance_stats()); - } - return result; + SimpleMaintenanceScanner::PendingMaintenanceStats result; + for (const auto& stripe : _stripes) { + result.merge(stripe->pending_maintenance_stats()); } + return result; } void TopLevelDistributor::propagateInternalScanMetricsToExternal() { - if (_use_legacy_mode) { - 
_stripe->propagateInternalScanMetricsToExternal(); - } else { - for (auto &stripe : _stripes) { - stripe->propagateInternalScanMetricsToExternal(); - } - _total_metrics->aggregate(); - _ideal_state_total_metrics->aggregate(); + for (auto &stripe : _stripes) { + stripe->propagateInternalScanMetricsToExternal(); } -} - -void -TopLevelDistributor::scanAllBuckets() -{ - assert(_use_legacy_mode); // TODO STRIPE - _stripe->scanAllBuckets(); + _total_metrics->aggregate(); + _ideal_state_total_metrics->aggregate(); } void @@ -604,7 +392,6 @@ TopLevelDistributor::dispatch_to_main_distributor_thread_queue(const std::shared void TopLevelDistributor::fetch_external_messages() { - assert(!_use_legacy_mode); assert(_fetched_messages.empty()); _fetched_messages.swap(_message_queue); } @@ -612,7 +399,6 @@ TopLevelDistributor::fetch_external_messages() void TopLevelDistributor::process_fetched_external_messages() { - assert(!_use_legacy_mode); for (auto& msg : _fetched_messages) { MBUS_TRACE(msg->getTrace(), 9, "Distributor: Processing message in main thread"); if (!msg->callHandler(*_bucket_db_updater, msg)) { @@ -627,36 +413,25 @@ TopLevelDistributor::process_fetched_external_messages() } framework::ThreadWaitInfo -TopLevelDistributor::doCriticalTick(framework::ThreadIndex idx) +TopLevelDistributor::doCriticalTick([[maybe_unused]] framework::ThreadIndex idx) { _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; - if (!_use_legacy_mode) { - enableNextDistribution(); - fetch_status_requests(); - fetch_external_messages(); - } + enableNextDistribution(); + fetch_status_requests(); + fetch_external_messages(); // Propagates any new configs down to stripe(s) enable_next_config_if_changed(); - if (_use_legacy_mode) { - _stripe->doCriticalTick(idx); - _tickResult.merge(_stripe->_tickResult); - } return _tickResult; } framework::ThreadWaitInfo -TopLevelDistributor::doNonCriticalTick(framework::ThreadIndex idx) +TopLevelDistributor::doNonCriticalTick([[maybe_unused]] 
framework::ThreadIndex idx) { - if (_use_legacy_mode) { - _stripe->doNonCriticalTick(idx); - _tickResult = _stripe->_tickResult; - } else { - _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; - handle_status_requests(); - process_fetched_external_messages(); - send_host_info_if_appropriate(); - _bucket_db_updater->resend_delayed_messages(); - } + _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; + handle_status_requests(); + process_fetched_external_messages(); + send_host_info_if_appropriate(); + _bucket_db_updater->resend_delayed_messages(); return _tickResult; } @@ -666,20 +441,13 @@ TopLevelDistributor::enable_next_config_if_changed() // Only lazily trigger a config propagation and internal update if something has _actually changed_. if (_component.internal_config_generation() != _current_internal_config_generation) { _total_config = _component.total_distributor_config_sp(); - if (!_use_legacy_mode) { + { auto guard = _stripe_accessor->rendezvous_and_hold_all(); guard->update_total_distributor_config(_component.total_distributor_config_sp()); - } else { - _stripe->update_total_distributor_config(_component.total_distributor_config_sp()); } _hostInfoReporter.enableReporting(config().getEnableHostInfoReporting()); _current_internal_config_generation = _component.internal_config_generation(); } - if (_use_legacy_mode) { - // TODO STRIPE remove these once tests are fixed to trigger reconfig properly - _hostInfoReporter.enableReporting(getConfig().getEnableHostInfoReporting()); - _stripe->enableNextConfig(); // TODO STRIPE avoid redundant call - } } void @@ -688,7 +456,6 @@ TopLevelDistributor::notify_stripe_wants_to_send_host_info(uint16_t stripe_index // TODO STRIPE assert(_done_initializing); (can't currently do due to some unit test restrictions; uncomment and find out) LOG(debug, "Stripe %u has signalled an intent to send host info out-of-band", stripe_index); std::lock_guard lock(_stripe_scan_notify_mutex); - 
assert(!_use_legacy_mode); assert(stripe_index < _stripe_scan_stats.size()); auto& stats = _stripe_scan_stats[stripe_index]; stats.wants_to_send_host_info = true; @@ -735,7 +502,6 @@ TopLevelDistributor::send_host_info_if_appropriate() void TopLevelDistributor::on_cluster_state_bundle_activated(const lib::ClusterStateBundle& new_bundle) { - assert(!_use_legacy_mode); lib::Node my_node(lib::NodeType::DISTRIBUTOR, getDistributorIndex()); if (!_done_initializing && (new_bundle.getBaselineClusterState()->getNodeState(my_node).getState() == lib::State::UP)) { _done_initializing = true; @@ -780,7 +546,6 @@ TopLevelDistributor::work_was_done() const noexcept vespalib::string TopLevelDistributor::getReportContentType(const framework::HttpUrlPath& path) const { - assert(!_use_legacy_mode); if (path.hasAttribute("page")) { if (path.getAttribute("page") == "buckets") { return "text/html"; @@ -792,18 +557,10 @@ TopLevelDistributor::getReportContentType(const framework::HttpUrlPath& path) co } } -std::string -TopLevelDistributor::getActiveIdealStateOperations() const -{ - assert(_use_legacy_mode); - return _stripe->getActiveIdealStateOperations(); -} - bool TopLevelDistributor::reportStatus(std::ostream& out, const framework::HttpUrlPath& path) const { - assert(!_use_legacy_mode); if (!path.hasAttribute("page") || path.getAttribute("page") == "buckets") { framework::PartlyHtmlStatusReporter htmlReporter(*this); htmlReporter.reportHtmlHeader(out, path); @@ -842,7 +599,6 @@ TopLevelDistributor::reportStatus(std::ostream& out, bool TopLevelDistributor::handleStatusRequest(const DelegatedStatusRequest& request) const { - assert(!_use_legacy_mode); auto wrappedRequest = std::make_shared<DistributorStatus>(request); { framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks()); diff --git a/storage/src/vespa/storage/distributor/top_level_distributor.h b/storage/src/vespa/storage/distributor/top_level_distributor.h index 81a30accf01..420d0df08ed 100644 --- 
a/storage/src/vespa/storage/distributor/top_level_distributor.h +++ b/storage/src/vespa/storage/distributor/top_level_distributor.h @@ -101,8 +101,6 @@ public: void storageDistributionChanged() override; - bool handleReply(const std::shared_ptr<api::StorageReply>& reply); - // StatusReporter implementation vespalib::string getReportContentType(const framework::HttpUrlPath&) const override; bool reportStatus(std::ostream&, const framework::HttpUrlPath&) const override; @@ -134,43 +132,12 @@ public: private: friend class DistributorStripeTestUtil; - friend class DistributorTestUtil; friend class TopLevelDistributorTestUtil; - friend class LegacyBucketDBUpdaterTest; friend class MetricUpdateHook; friend struct DistributorStripeTest; - friend struct LegacyDistributorTest; friend struct TopLevelDistributorTest; void setNodeStateUp(); - bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg); - - /** - * Enables a new cluster state. Used by tests to bypass TopLevelBucketDBUpdater. 
- */ - void enableClusterStateBundle(const lib::ClusterStateBundle& clusterStateBundle); - - // Accessors used by tests - std::string getActiveIdealStateOperations() const; - const lib::ClusterStateBundle& getClusterStateBundle() const; - const DistributorConfiguration& getConfig() const; - bool isInRecoveryMode() const noexcept; - PendingMessageTracker& getPendingMessageTracker(); - const PendingMessageTracker& getPendingMessageTracker() const; - DistributorBucketSpaceRepo& getBucketSpaceRepo() noexcept; - const DistributorBucketSpaceRepo& getBucketSpaceRepo() const noexcept; - DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() noexcept; - const DistributorBucketSpaceRepo& getReadyOnlyBucketSpaceRepo() const noexcept; - storage::distributor::DistributorStripeComponent& distributor_component() noexcept; - std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept; - - StripeBucketDBUpdater& bucket_db_updater(); - const StripeBucketDBUpdater& bucket_db_updater() const; - IdealStateManager& ideal_state_manager(); - const IdealStateManager& ideal_state_manager() const; - ExternalOperationHandler& external_operation_handler(); - const ExternalOperationHandler& external_operation_handler() const; - BucketDBMetricUpdater& bucket_db_metric_updater() const noexcept; /** * Return a copy of the latest min replica data, see MinReplicaProvider. @@ -185,7 +152,6 @@ private: * Takes metric lock. 
*/ void propagateInternalScanMetricsToExternal(); - void scanAllBuckets(); void enable_next_config_if_changed(); void fetch_status_requests(); void handle_status_requests(); @@ -217,15 +183,11 @@ private: const NodeIdentity _node_identity; DistributorComponentRegister& _comp_reg; DoneInitializeHandler& _done_init_handler; - const bool _use_legacy_mode; bool _done_initializing; - std::shared_ptr<DistributorMetricSet> _metrics; std::shared_ptr<DistributorTotalMetrics> _total_metrics; - std::shared_ptr<IdealStateMetricSet> _ideal_state_metrics; std::shared_ptr<IdealStateTotalMetrics> _ideal_state_total_metrics; ChainedMessageSender* _messageSender; uint8_t _n_stripe_bits; - std::unique_ptr<DistributorStripe> _stripe; DistributorStripePool& _stripe_pool; std::vector<std::unique_ptr<DistributorStripe>> _stripes; std::unique_ptr<StripeAccessor> _stripe_accessor; |