summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2021-06-24 14:06:00 +0000
committerGeir Storli <geirst@verizonmedia.com>2021-06-25 11:53:55 +0000
commit6012a24b3a24662cc57fc5be86015ef326803342 (patch)
treedeb29eb73224075416cab42e7bd52f291accdc44 /storage
parent0ff6ec0a6fed7ea0617861fc529cf299f097ff58 (diff)
Prepare baseline utils and tests for a single distributor stripe.
This is copied from DistributorTestUtil and LegacyDistributorTest, and adjusted to work with one distributor stripe.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/CMakeLists.txt2
-rw-r--r--storage/src/tests/distributor/distributor_stripe_test.cpp85
-rw-r--r--storage/src/tests/distributor/distributor_stripe_test_util.cpp495
-rw-r--r--storage/src/tests/distributor/distributor_stripe_test_util.h228
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketinfo.h6
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.h1
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h2
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.h2
-rw-r--r--storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h1
9 files changed, 821 insertions, 1 deletions
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt
index fad8ca0bb25..0d314e050d4 100644
--- a/storage/src/tests/distributor/CMakeLists.txt
+++ b/storage/src/tests/distributor/CMakeLists.txt
@@ -13,6 +13,8 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST
distributor_host_info_reporter_test.cpp
distributor_message_sender_stub.cpp
distributor_stripe_pool_test.cpp
+ distributor_stripe_test.cpp
+ distributor_stripe_test_util.cpp
distributortestutil.cpp
externaloperationhandlertest.cpp
garbagecollectiontest.cpp
diff --git a/storage/src/tests/distributor/distributor_stripe_test.cpp b/storage/src/tests/distributor/distributor_stripe_test.cpp
new file mode 100644
index 00000000000..606117cc693
--- /dev/null
+++ b/storage/src/tests/distributor/distributor_stripe_test.cpp
@@ -0,0 +1,85 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <tests/distributor/distributor_stripe_test_util.h>
+#include <vespa/document/test/make_bucket_space.h>
+#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/storage/distributor/bucket_spaces_stats_provider.h>
+#include <vespa/storage/distributor/distributor.h>
+#include <vespa/storage/distributor/distributor_stripe.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <vespa/storageapi/message/visitor.h>
+#include <vespa/vespalib/gtest/gtest.h>
+#include <vespa/vespalib/text/stringtokenizer.h>
+
+using document::Bucket;
+using document::BucketId;
+using document::BucketSpace;
+using document::FixedBucketSpaces;
+using document::test::makeBucketSpace;
+using document::test::makeDocumentBucket;
+using namespace ::testing;
+
+namespace storage::distributor {
+
+/**
+ * This was copied from LegacyDistributorTest and adjusted to work with one distributor stripe.
+ */
+struct DistributorStripeTest : Test, DistributorStripeTestUtil {
+ DistributorStripeTest();
+ ~DistributorStripeTest() override;
+
+ std::vector<document::BucketSpace> _bucketSpaces;
+
+ void SetUp() override {
+ createLinks();
+ _bucketSpaces = getBucketSpaces();
+ };
+
+ void TearDown() override {
+ close();
+ }
+
+ // Simple type aliases to make interfacing with certain utility functions
+ // easier. Note that this is only for readability and does not provide any
+ // added type safety.
+ using NodeCount = int;
+ using Redundancy = int;
+
+ std::string testOp(std::shared_ptr<api::StorageMessage> msg) {
+ _stripe->handleMessage(msg);
+
+ std::string tmp = _sender.getCommands();
+ _sender.clear();
+ return tmp;
+ }
+
+};
+
+DistributorStripeTest::DistributorStripeTest()
+ : Test(),
+ DistributorStripeTestUtil(),
+ _bucketSpaces()
+{
+}
+
+DistributorStripeTest::~DistributorStripeTest() = default;
+
+TEST_F(DistributorStripeTest, operation_generation) {
+ setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
+
+ document::BucketId bid;
+ addNodesToBucketDB(document::BucketId(16, 1), "0=1/1/1/t");
+
+ EXPECT_EQ("Remove", testOp(std::make_shared<api::RemoveCommand>(
+ makeDocumentBucket(bid),
+ document::DocumentId("id:m:test:n=1:foo"),
+ api::Timestamp(1234))));
+
+ auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "foo", "bar", "");
+ cmd->addBucketToBeVisited(document::BucketId(16, 1));
+ cmd->addBucketToBeVisited(document::BucketId());
+
+ EXPECT_EQ("Visitor Create", testOp(cmd));
+}
+
+}
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.cpp b/storage/src/tests/distributor/distributor_stripe_test_util.cpp
new file mode 100644
index 00000000000..fcdbcbbec19
--- /dev/null
+++ b/storage/src/tests/distributor/distributor_stripe_test_util.cpp
@@ -0,0 +1,495 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "distributor_stripe_test_util.h"
+#include <vespa/config-stor-distribution.h>
+#include <vespa/document/test/make_bucket_space.h>
+#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/storage/distributor/distributor_bucket_space.h>
+#include <vespa/storage/distributor/distributor_stripe.h>
+#include <vespa/storage/distributor/distributor_stripe_component.h>
+#include <vespa/storage/distributor/distributormetricsset.h>
+#include <vespa/storage/distributor/ideal_state_total_metrics.h>
+#include <vespa/vdslib/distribution/distribution.h>
+#include <vespa/vespalib/text/stringtokenizer.h>
+
+using document::test::makeBucketSpace;
+using document::test::makeDocumentBucket;
+
+namespace storage::distributor {
+
+DistributorStripeTestUtil::DistributorStripeTestUtil()
+ : _config(),
+ _node(),
+ _threadPool(),
+ _stripe(),
+ _sender(),
+ _senderDown(),
+ _hostInfo(),
+ _messageSender(_sender, _senderDown)
+{
+ _config = getStandardConfig(false);
+}
+
+DistributorStripeTestUtil::~DistributorStripeTestUtil() { }
+
+void
+DistributorStripeTestUtil::createLinks()
+{
+ _node.reset(new TestDistributorApp(_config.getConfigId()));
+ _threadPool = framework::TickingThreadPool::createDefault("distributor");
+ _metrics = std::make_shared<DistributorMetricSet>();
+ _ideal_state_metrics = std::make_shared<IdealStateMetricSet>();
+ _stripe = std::make_unique<DistributorStripe>(_node->getComponentRegister(),
+ *_metrics,
+ *_ideal_state_metrics,
+ _node->node_identity(),
+ *_threadPool,
+ *this,
+ _messageSender,
+ *this,
+ false);
+}
+
+void
+DistributorStripeTestUtil::setupDistributor(int redundancy,
+ int nodeCount,
+ const std::string& systemState,
+ uint32_t earlyReturn,
+ bool requirePrimaryToBeWritten)
+{
+ setup_distributor(redundancy, nodeCount, lib::ClusterStateBundle(lib::ClusterState(systemState)), earlyReturn, requirePrimaryToBeWritten);
+}
+
+void
+DistributorStripeTestUtil::setup_distributor(int redundancy,
+ int node_count,
+ const lib::ClusterStateBundle& state,
+ uint32_t early_return,
+ bool require_primary_to_be_written)
+{
+ lib::Distribution::DistributionConfigBuilder config(
+ lib::Distribution::getDefaultDistributionConfig(redundancy, node_count).get());
+ config.redundancy = redundancy;
+ config.initialRedundancy = early_return;
+ config.ensurePrimaryPersisted = require_primary_to_be_written;
+ auto distribution = std::make_shared<lib::Distribution>(config);
+ _node->getComponentRegister().setDistribution(distribution);
+ enable_distributor_cluster_state(state);
+
+ // TODO STRIPE: Update this comment now that stripe is used instead.
+ // This is for all intents and purposes a hack to avoid having the
+ // distributor treat setting the distribution explicitly as a signal that
+ // it should send RequestBucketInfo to all configured nodes.
+ // If we called storage_distribution_changed followed by enableDistribution
+ // explicitly (which is what happens in "real life"), that is what would
+ // take place.
+ // The inverse case of this can be explicitly accomplished by calling
+ // triggerDistributionChange().
+ // This isn't pretty, folks, but it avoids breaking the world for now,
+ // as many tests have implicit assumptions about this being the behavior.
+ auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution));
+ _stripe->update_distribution_config(new_configs);
+}
+
+void
+DistributorStripeTestUtil::receive_set_system_state_command(const vespalib::string& state_str)
+{
+ auto state_cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(state_str));
+ _stripe->handleMessage(state_cmd); // TODO move semantics
+}
+
+void
+DistributorStripeTestUtil::handle_top_level_message(const std::shared_ptr<api::StorageMessage>& msg)
+{
+ _stripe->handleMessage(msg);
+}
+
+void
+DistributorStripeTestUtil::setTypeRepo(const std::shared_ptr<const document::DocumentTypeRepo>& repo)
+{
+ _node->getComponentRegister().setDocumentTypeRepo(repo);
+}
+
+void
+DistributorStripeTestUtil::close()
+{
+ _stripe->flush_and_close();
+ _sender.clear();
+ _node.reset(0);
+ _config = getStandardConfig(false);
+}
+
+namespace {
+
+std::string dumpVector(const std::vector<uint16_t>& vec) {
+ std::ostringstream ost;
+ for (uint32_t i = 0; i < vec.size(); ++i) {
+ if (i != 0) {
+ ost << ",";
+ }
+ ost << vec[i];
+ }
+ return ost.str();
+}
+
+}
+
+std::string
+DistributorStripeTestUtil::getNodes(document::BucketId id)
+{
+ BucketDatabase::Entry entry = getBucket(id);
+
+ if (!entry.valid()) {
+ return id.toString();
+ } else {
+ std::vector<uint16_t> nodes = entry->getNodes();
+ std::sort(nodes.begin(), nodes.end());
+
+ std::ostringstream ost;
+ ost << id << ": " << dumpVector(nodes);
+ return ost.str();
+ }
+}
+
+std::string
+DistributorStripeTestUtil::getIdealStr(document::BucketId id, const lib::ClusterState& state)
+{
+ if (!getDistributorBucketSpace().owns_bucket_in_state(state, id)) {
+ return id.toString();
+ }
+
+ std::vector<uint16_t> nodes;
+ getDistribution().getIdealNodes(
+ lib::NodeType::STORAGE, state, id, nodes);
+ std::sort(nodes.begin(), nodes.end());
+ std::ostringstream ost;
+ ost << id << ": " << dumpVector(nodes);
+ return ost.str();
+}
+
+void
+DistributorStripeTestUtil::addIdealNodes(const lib::ClusterState& state,
+ const document::BucketId& id)
+{
+ BucketDatabase::Entry entry = getBucket(id);
+
+ if (!entry.valid()) {
+ entry = BucketDatabase::Entry(id);
+ }
+
+ std::vector<uint16_t> res;
+ getDistribution().getIdealNodes(
+ lib::NodeType::STORAGE, state, id, res);
+
+ for (uint32_t i = 0; i < res.size(); ++i) {
+ if (state.getNodeState(lib::Node(lib::NodeType::STORAGE, res[i])).getState() !=
+ lib::State::MAINTENANCE)
+ {
+ entry->addNode(BucketCopy(0, res[i], api::BucketInfo(1,1,1)),
+ toVector<uint16_t>(0));
+ }
+ }
+
+ getBucketDatabase().update(entry);
+}
+
+void
+DistributorStripeTestUtil::addNodesToBucketDB(const document::Bucket& bucket, const std::string& nodeStr)
+{
+ BucketDatabase::Entry entry = getBucket(bucket);
+
+ if (!entry.valid()) {
+ entry = BucketDatabase::Entry(bucket.getBucketId());
+ }
+
+ entry->clear();
+
+ vespalib::StringTokenizer tokenizer(nodeStr, ",");
+ for (uint32_t i = 0; i < tokenizer.size(); ++i) {
+ vespalib::StringTokenizer tok2(tokenizer[i], "=");
+ vespalib::StringTokenizer tok3(tok2[1], "/");
+
+ api::BucketInfo info(atoi(tok3[0].data()),
+ atoi(tok3.size() > 1 ? tok3[1].data() : tok3[0].data()),
+ atoi(tok3.size() > 2 ? tok3[2].data() : tok3[0].data()));
+
+ size_t flagsIdx = 3;
+
+ // Meta info override? For simplicity, require both meta count and size
+ if (tok3.size() > 4 && (!tok3[3].empty() && isdigit(tok3[3][0]))) {
+ info.setMetaCount(atoi(tok3[3].data()));
+ info.setUsedFileSize(atoi(tok3[4].data()));
+ flagsIdx = 5;
+ }
+
+ if ((tok3.size() > flagsIdx + 1) && tok3[flagsIdx + 1] == "a") {
+ info.setActive();
+ } else {
+ info.setActive(false);
+ }
+ if ((tok3.size() > flagsIdx + 2) && tok3[flagsIdx + 2] == "r") {
+ info.setReady();
+ } else {
+ info.setReady(false);
+ }
+
+ uint16_t idx = atoi(tok2[0].data());
+ BucketCopy node(
+ 0,
+ idx,
+ info);
+
+ // Allow user to manually override trusted and active.
+ if (tok3.size() > flagsIdx && tok3[flagsIdx] == "t") {
+ node.setTrusted();
+ }
+
+ entry->addNodeManual(node);
+ }
+
+ getBucketDatabase(bucket.getBucketSpace()).update(entry);
+}
+
+void
+DistributorStripeTestUtil::addNodesToBucketDB(const document::BucketId& id,
+ const std::string& nodeStr)
+{
+ addNodesToBucketDB(document::Bucket(makeBucketSpace(), id), nodeStr);
+}
+
+void
+DistributorStripeTestUtil::removeFromBucketDB(const document::BucketId& id)
+{
+ getBucketDatabase().remove(id);
+}
+
+void
+DistributorStripeTestUtil::addIdealNodes(const document::BucketId& id)
+{
+ // TODO STRIPE roundabout way of getting state bundle..!
+ addIdealNodes(*operation_context().cluster_state_bundle().getBaselineClusterState(), id);
+}
+
+void
+DistributorStripeTestUtil::insertBucketInfo(document::BucketId id,
+ uint16_t node,
+ uint32_t checksum,
+ uint32_t count,
+ uint32_t size,
+ bool trusted,
+ bool active)
+{
+ api::BucketInfo info(checksum, count, size);
+ insertBucketInfo(id, node, info, trusted, active);
+}
+
+void
+DistributorStripeTestUtil::insertBucketInfo(document::BucketId id,
+ uint16_t node,
+ const api::BucketInfo& info,
+ bool trusted,
+ bool active)
+{
+ BucketDatabase::Entry entry = getBucketDatabase().get(id);
+ if (!entry.valid()) {
+ entry = BucketDatabase::Entry(id, BucketInfo());
+ }
+
+ api::BucketInfo info2(info);
+ if (active) {
+ info2.setActive();
+ }
+ BucketCopy copy(operation_context().generate_unique_timestamp(), node, info2);
+
+ entry->addNode(copy.setTrusted(trusted), toVector<uint16_t>(0));
+
+ getBucketDatabase().update(entry);
+}
+
+std::string
+DistributorStripeTestUtil::dumpBucket(const document::BucketId& bid)
+{
+ return getBucketDatabase().get(bid).toString();
+}
+
+void
+DistributorStripeTestUtil::sendReply(Operation& op,
+ int idx,
+ api::ReturnCode::Result result)
+{
+ if (idx == -1) {
+ idx = _sender.commands().size() - 1;
+ }
+ assert(idx >= 0 && idx < static_cast<int>(_sender.commands().size()));
+
+ std::shared_ptr<api::StorageCommand> cmd = _sender.command(idx);
+ api::StorageReply::SP reply(cmd->makeReply().release());
+ reply->setResult(result);
+ op.receive(_sender, reply);
+}
+
+BucketDatabase::Entry
+DistributorStripeTestUtil::getBucket(const document::Bucket& bucket) const
+{
+ return getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId());
+}
+
+BucketDatabase::Entry
+DistributorStripeTestUtil::getBucket(const document::BucketId& bId) const
+{
+ return getBucketDatabase().get(bId);
+}
+
+void
+DistributorStripeTestUtil::disableBucketActivationInConfig(bool disable)
+{
+ vespa::config::content::core::StorDistributormanagerConfigBuilder config;
+ config.disableBucketActivation = disable;
+ getConfig().configure(config);
+}
+
+StripeBucketDBUpdater&
+DistributorStripeTestUtil::getBucketDBUpdater() {
+ return _stripe->bucket_db_updater();
+}
+
+IdealStateManager&
+DistributorStripeTestUtil::getIdealStateManager() {
+ return _stripe->ideal_state_manager();
+}
+
+ExternalOperationHandler&
+DistributorStripeTestUtil::getExternalOperationHandler() {
+ return _stripe->external_operation_handler();
+}
+
+const storage::distributor::DistributorNodeContext&
+DistributorStripeTestUtil::node_context() const {
+ return _stripe->_component;
+}
+
+storage::distributor::DistributorStripeOperationContext&
+DistributorStripeTestUtil::operation_context() {
+ return _stripe->_component;
+}
+
+const DocumentSelectionParser&
+DistributorStripeTestUtil::doc_selection_parser() const {
+ return _stripe->_component;
+}
+
+DistributorConfiguration&
+DistributorStripeTestUtil::getConfig() {
+ // TODO STRIPE avoid const cast
+ return const_cast<DistributorConfiguration&>(_stripe->getConfig());
+}
+
+DistributorBucketSpace&
+DistributorStripeTestUtil::getDistributorBucketSpace() {
+ return getBucketSpaceRepo().get(makeBucketSpace());
+}
+
+BucketDatabase&
+DistributorStripeTestUtil::getBucketDatabase() {
+ return getDistributorBucketSpace().getBucketDatabase();
+}
+
+BucketDatabase&
+DistributorStripeTestUtil::getBucketDatabase(document::BucketSpace space) {
+ return getBucketSpaceRepo().get(space).getBucketDatabase();
+}
+
+const BucketDatabase&
+DistributorStripeTestUtil::getBucketDatabase() const {
+ return getBucketSpaceRepo().get(makeBucketSpace()).getBucketDatabase();
+}
+
+const BucketDatabase&
+DistributorStripeTestUtil::getBucketDatabase(document::BucketSpace space) const {
+ return getBucketSpaceRepo().get(space).getBucketDatabase();
+}
+
+DistributorBucketSpaceRepo&
+DistributorStripeTestUtil::getBucketSpaceRepo() {
+ return _stripe->getBucketSpaceRepo();
+}
+
+const DistributorBucketSpaceRepo&
+DistributorStripeTestUtil::getBucketSpaceRepo() const {
+ return _stripe->getBucketSpaceRepo();
+}
+
+DistributorBucketSpaceRepo&
+DistributorStripeTestUtil::getReadOnlyBucketSpaceRepo() {
+ return _stripe->getReadOnlyBucketSpaceRepo();
+}
+
+const DistributorBucketSpaceRepo&
+DistributorStripeTestUtil::getReadOnlyBucketSpaceRepo() const {
+ return _stripe->getReadOnlyBucketSpaceRepo();
+}
+
+bool
+DistributorStripeTestUtil::distributor_is_in_recovery_mode() const noexcept {
+ return _stripe->isInRecoveryMode();
+}
+
+const lib::ClusterStateBundle&
+DistributorStripeTestUtil::current_distributor_cluster_state_bundle() const noexcept {
+ return _stripe->getClusterStateBundle();
+}
+
+std::string
+DistributorStripeTestUtil::active_ideal_state_operations() const {
+ return _stripe->getActiveIdealStateOperations();
+}
+
+const PendingMessageTracker&
+DistributorStripeTestUtil::pending_message_tracker() const noexcept {
+ return _stripe->getPendingMessageTracker();
+}
+
+PendingMessageTracker&
+DistributorStripeTestUtil::pending_message_tracker() noexcept {
+ return _stripe->getPendingMessageTracker();
+}
+
+std::chrono::steady_clock::duration
+DistributorStripeTestUtil::db_memory_sample_interval() const noexcept {
+ return _stripe->db_memory_sample_interval();
+}
+
+const lib::Distribution&
+DistributorStripeTestUtil::getDistribution() const {
+ return getBucketSpaceRepo().get(makeBucketSpace()).getDistribution();
+}
+
+std::vector<document::BucketSpace>
+DistributorStripeTestUtil::getBucketSpaces() const
+{
+ std::vector<document::BucketSpace> res;
+ for (const auto &repo : getBucketSpaceRepo()) {
+ res.push_back(repo.first);
+ }
+ return res;
+}
+
+void
+DistributorStripeTestUtil::enableDistributorClusterState(vespalib::stringref state)
+{
+ getBucketDBUpdater().simulate_cluster_state_bundle_activation(
+ lib::ClusterStateBundle(lib::ClusterState(state)));
+}
+
+void
+DistributorStripeTestUtil::enable_distributor_cluster_state(const lib::ClusterStateBundle& state)
+{
+ getBucketDBUpdater().simulate_cluster_state_bundle_activation(state);
+}
+
+void
+DistributorStripeTestUtil::setSystemState(const lib::ClusterState& systemState) {
+ _stripe->enableClusterStateBundle(lib::ClusterStateBundle(systemState));
+}
+
+}
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.h b/storage/src/tests/distributor/distributor_stripe_test_util.h
new file mode 100644
index 00000000000..32222d85e95
--- /dev/null
+++ b/storage/src/tests/distributor/distributor_stripe_test_util.h
@@ -0,0 +1,228 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "distributor_message_sender_stub.h"
+#include <tests/common/dummystoragelink.h>
+#include <tests/common/testhelper.h>
+#include <tests/common/teststorageapp.h>
+#include <vespa/storage/common/hostreporter/hostinfo.h>
+#include <vespa/storage/distributor/stripe_host_info_notifier.h>
+
+namespace storage {
+
+namespace framework { struct TickingThreadPool; }
+
+namespace distributor {
+
+class DistributorBucketSpace;
+class DistributorBucketSpaceRepo;
+class DistributorMetricSet;
+class DistributorNodeContext;
+class DistributorStripe;
+class DistributorStripeComponent;
+class DistributorStripeOperationContext;
+class DistributorStripePool;
+class DocumentSelectionParser;
+class ExternalOperationHandler;
+class IdealStateManager;
+class IdealStateMetricSet;
+class Operation;
+class StripeBucketDBUpdater;
+
+/**
+ * Helper class with utilities needed when testing DistributorStripe.
+ *
+ * This was copied from DistributorTestUtil (used in LegacyDistributorTest)
+ * and adjusted to work with one distributor stripe.
+ */
+class DistributorStripeTestUtil : public DoneInitializeHandler,
+ public StripeHostInfoNotifier {
+public:
+ DistributorStripeTestUtil();
+ ~DistributorStripeTestUtil();
+
+ /**
+ * Sets up the storage link chain.
+ */
+ void createLinks();
+ void setTypeRepo(const std::shared_ptr<const document::DocumentTypeRepo>& repo);
+
+ void close();
+
+ /**
+ * Returns a string with the nodes currently stored in the bucket
+ * database for the given bucket.
+ */
+ std::string getNodes(document::BucketId id);
+
+ /**
+ * Returns a string with the ideal state nodes for the given bucket.
+ */
+ std::string getIdealStr(document::BucketId id, const lib::ClusterState& state);
+
+ /**
+ * Adds the ideal nodes for the given bucket and the given cluster state
+ * to the bucket database.
+ */
+ void addIdealNodes(const lib::ClusterState& state, const document::BucketId& id);
+
+ /**
+ * Adds all the ideal nodes for the given bucket to the bucket database.
+ */
+ void addIdealNodes(const document::BucketId& id);
+
+ /**
+ * Parses the given string to a set of node => bucket info data,
+ * and inserts them as nodes in the given bucket.
+ * Format:
+ * "node1=checksum/docs/size,node2=checksum/docs/size"
+ */
+ void addNodesToBucketDB(const document::Bucket& bucket, const std::string& nodeStr);
+ // As the above, but always inserts into default bucket space
+ void addNodesToBucketDB(const document::BucketId& id, const std::string& nodeStr);
+
+ /**
+ * Removes the given bucket from the bucket database.
+ */
+ void removeFromBucketDB(const document::BucketId& id);
+
+ /**
+ * Inserts the given bucket information for the given bucket and node in
+ * the bucket database.
+ */
+ void insertBucketInfo(document::BucketId id,
+ uint16_t node,
+ uint32_t checksum,
+ uint32_t count,
+ uint32_t size,
+ bool trusted = false,
+ bool active = false);
+
+ /**
+ * Inserts the given bucket information for the given bucket and node in
+ * the bucket database.
+ */
+ void insertBucketInfo(document::BucketId id,
+ uint16_t node,
+ const api::BucketInfo& info,
+ bool trusted = false,
+ bool active = false);
+
+ std::string dumpBucket(const document::BucketId& bucket);
+
+ /**
+ * Replies to message idx sent upwards with the given result code.
+ * If idx = -1, replies to the last command received upwards.
+ */
+ void sendReply(Operation& op,
+ int idx = -1,
+ api::ReturnCode::Result result = api::ReturnCode::OK);
+
+ StripeBucketDBUpdater& getBucketDBUpdater();
+ IdealStateManager& getIdealStateManager();
+ ExternalOperationHandler& getExternalOperationHandler();
+ const storage::distributor::DistributorNodeContext& node_context() const;
+ storage::distributor::DistributorStripeOperationContext& operation_context();
+ const DocumentSelectionParser& doc_selection_parser() const;
+
+ DistributorConfiguration& getConfig();
+
+ vdstestlib::DirConfig& getDirConfig() {
+ return _config;
+ }
+
+ // TODO explicit notion of bucket spaces for tests
+ DistributorBucketSpace& getDistributorBucketSpace();
+ BucketDatabase& getBucketDatabase(); // Implicit default space only
+ BucketDatabase& getBucketDatabase(document::BucketSpace space);
+ const BucketDatabase& getBucketDatabase() const; // Implicit default space only
+ const BucketDatabase& getBucketDatabase(document::BucketSpace space) const;
+ DistributorBucketSpaceRepo& getBucketSpaceRepo();
+ const DistributorBucketSpaceRepo& getBucketSpaceRepo() const;
+ DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo();
+ const DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() const;
+ [[nodiscard]] bool distributor_is_in_recovery_mode() const noexcept;
+ [[nodiscard]] const lib::ClusterStateBundle& current_distributor_cluster_state_bundle() const noexcept;
+ [[nodiscard]] std::string active_ideal_state_operations() const;
+ [[nodiscard]] const PendingMessageTracker& pending_message_tracker() const noexcept;
+ [[nodiscard]] PendingMessageTracker& pending_message_tracker() noexcept;
+ [[nodiscard]] std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept;
+
+ const lib::Distribution& getDistribution() const;
+
+ framework::defaultimplementation::FakeClock& getClock() { return _node->getClock(); }
+ DistributorComponentRegister& getComponentRegister() { return _node->getComponentRegister(); }
+ DistributorComponentRegisterImpl& getComponentRegisterImpl() { return _node->getComponentRegister(); }
+
+ void setupDistributor(int redundancy,
+ int nodeCount,
+ const std::string& systemState,
+ uint32_t earlyReturn = false,
+ bool requirePrimaryToBeWritten = true);
+
+ void setup_distributor(int redundancy,
+ int node_count,
+ const lib::ClusterStateBundle& state,
+ uint32_t early_return = false,
+ bool require_primary_to_be_written = true);
+
+ // Implements DoneInitializeHandler
+ void notifyDoneInitializing() override {}
+
+ // Implements StripeHostInfoNotifier
+ void notify_stripe_wants_to_send_host_info(uint16_t stripe_index) override {
+ (void) stripe_index;
+ }
+
+ void disableBucketActivationInConfig(bool disable);
+
+ BucketDatabase::Entry getBucket(const document::Bucket& bucket) const;
+ // Gets bucket entry from default space only
+ BucketDatabase::Entry getBucket(const document::BucketId& bId) const;
+
+ std::vector<document::BucketSpace> getBucketSpaces() const;
+
+ DistributorMessageSenderStub& sender() noexcept { return _sender; }
+ const DistributorMessageSenderStub& sender() const noexcept { return _sender; }
+
+ void setSystemState(const lib::ClusterState& systemState);
+
+ // Invokes full cluster state transition pipeline rather than directly applying
+ // the state and just pretending everything has been completed.
+ void receive_set_system_state_command(const vespalib::string& state_str);
+
+ void handle_top_level_message(const std::shared_ptr<api::StorageMessage>& msg);
+
+protected:
+ vdstestlib::DirConfig _config;
+ std::unique_ptr<TestDistributorApp> _node;
+ std::unique_ptr<framework::TickingThreadPool> _threadPool;
+ std::shared_ptr<DistributorMetricSet> _metrics;
+ std::shared_ptr<IdealStateMetricSet> _ideal_state_metrics;
+ std::unique_ptr<DistributorStripe> _stripe;
+ DistributorMessageSenderStub _sender;
+ DistributorMessageSenderStub _senderDown;
+ HostInfo _hostInfo;
+
+ struct MessageSenderImpl : public ChainedMessageSender {
+ DistributorMessageSenderStub& _sender;
+ DistributorMessageSenderStub& _senderDown;
+ MessageSenderImpl(DistributorMessageSenderStub& up, DistributorMessageSenderStub& down)
+ : _sender(up), _senderDown(down) {}
+
+ void sendUp(const std::shared_ptr<api::StorageMessage>& msg) override {
+ _sender.send(msg);
+ }
+ void sendDown(const std::shared_ptr<api::StorageMessage>& msg) override {
+ _senderDown.send(msg);
+ }
+ };
+ MessageSenderImpl _messageSender;
+
+ void enableDistributorClusterState(vespalib::stringref state);
+ void enable_distributor_cluster_state(const lib::ClusterStateBundle& state);
+};
+
+}
+
+}
diff --git a/storage/src/vespa/storage/bucketdb/bucketinfo.h b/storage/src/vespa/storage/bucketdb/bucketinfo.h
index 1fd6c824904..533192eda89 100644
--- a/storage/src/vespa/storage/bucketdb/bucketinfo.h
+++ b/storage/src/vespa/storage/bucketdb/bucketinfo.h
@@ -7,7 +7,10 @@
namespace storage {
-namespace distributor { class DistributorTestUtil; }
+namespace distributor {
+ class DistributorStripeTestUtil;
+ class DistributorTestUtil;
+}
enum class TrustedUpdate {
UPDATE,
@@ -200,6 +203,7 @@ public:
void clear() { _nodes.clear(); }
private:
+ friend class distributor::DistributorStripeTestUtil;
friend class distributor::DistributorTestUtil;
/**
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index 1ec48aa4d63..787f0362da6 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -72,6 +72,7 @@ public:
private:
+ friend class DistributorStripeTestUtil;
friend class DistributorTestUtil;
// Only to be used by tests that want to ensure both the BucketDBUpdater _and_ the Distributor
// components agree on the currently active cluster state bundle.
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 41d88f5dba1..8a522a34be1 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -125,9 +125,11 @@ public:
};
private:
+ friend class DistributorStripeTestUtil;
friend class DistributorTestUtil;
friend class LegacyBucketDBUpdaterTest;
friend class MetricUpdateHook;
+ friend struct DistributorStripeTest;
friend struct LegacyDistributorTest;
void setNodeStateUp();
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index 8f3de38aec7..9deb0754a1f 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -196,10 +196,12 @@ public:
private:
// TODO STRIPE: reduce number of friends. DistributorStripe too popular for its own good.
friend class Distributor;
+ friend class DistributorStripeTestUtil;
friend class DistributorTestUtil;
friend class LegacyBucketDBUpdaterTest;
friend class MetricUpdateHook;
friend class MultiThreadedStripeAccessGuard;
+ friend struct DistributorStripeTest;
friend struct LegacyDistributorTest;
bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg);
diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
index ed059a32d14..cf2be06bda9 100644
--- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
+++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
@@ -142,6 +142,7 @@ private:
}
};
+ friend class DistributorStripeTestUtil;
friend class DistributorTestUtil;
// TODO refactor and rewire to avoid needing this direct meddling
friend class DistributorStripe;