diff options
Diffstat (limited to 'storage')
96 files changed, 1378 insertions, 1360 deletions
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt index eeec705b13f..c59d095fb4c 100644 --- a/storage/CMakeLists.txt +++ b/storage/CMakeLists.txt @@ -54,7 +54,6 @@ vespa_define_module( TEST_DEPENDS messagebus_messagebus-test - vdstestlib TEST_EXTERNAL_DEPENDS ${VESPA_ATOMIC_LIB} diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp index 92547a83d25..6aff5c52598 100644 --- a/storage/src/tests/bucketdb/bucketmanagertest.cpp +++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp @@ -3,6 +3,7 @@ #include <tests/common/dummystoragelink.h> #include <tests/common/testhelper.h> #include <tests/common/teststorageapp.h> +#include <tests/common/storage_config_set.h> #include <vespa/config/helper/configgetter.hpp> #include <vespa/document/config/config-documenttypes.h> #include <vespa/document/datatype/documenttype.h> @@ -13,17 +14,17 @@ #include <vespa/document/update/documentupdate.h> #include <vespa/metrics/updatehook.h> #include <vespa/storage/bucketdb/bucketmanager.h> -#include <vespa/storage/common/global_bucket_space_distribution_converter.h> #include <vespa/storage/persistence/filestorage/filestormanager.h> #include <vespa/storageapi/message/bucketsplitting.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/state.h> #include <vespa/vdslib/distribution/distribution.h> +#include <vespa/vdslib/distribution/global_bucket_space_distribution_converter.h> #include <vespa/vdslib/state/clusterstate.h> #include <vespa/vdslib/state/random.h> #include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/stllike/asciistream.h> -#include <vespa/config-stor-filestor.h> +#include <vespa/config-stor-distribution.h> #include <future> #include <vespa/log/log.h> @@ -59,6 +60,7 @@ struct TestParams; struct BucketManagerTest : public Test { public: + std::unique_ptr<StorageConfigSet> _config; std::unique_ptr<TestServiceLayerApp> _node; std::unique_ptr<DummyStorageLink> _top; BucketManager *_manager; @@ -69,8 +71,7 @@ public: ~BucketManagerTest() override; - void setupTestEnvironment(bool fakePersistenceLayer = true, - bool noDelete = false); + void setupTestEnvironment(); void addBucketsToDB(uint32_t count); bool wasBlockedDueToLastModified(api::StorageMessage* msg, uint64_t lastModified); void insertSingleBucket(const document::BucketId& bucket, const api::BucketInfo& info); @@ -125,46 +126,24 @@ BucketManagerTest::~BucketManagerTest() = default; FAIL() << ost.str(); \ } -std::string getMkDirDisk(const std::string & rootFolder, int disk) { - std::ostringstream os; - os << "mkdir -p " << rootFolder << "/disks/d" << disk; - return os.str(); -} - -void BucketManagerTest::setupTestEnvironment(bool fakePersistenceLayer, bool noDelete) +void BucketManagerTest::setupTestEnvironment() { - vdstestlib::DirConfig config(getStandardConfig(true, "bucketmanagertest")); - std::string rootFolder = getRootFolder(config); - if (!noDelete) { - assert(system(("rm -rf " + rootFolder).c_str()) == 0); - } - assert(system(getMkDirDisk(rootFolder, 0).c_str()) == 0); - assert(system(getMkDirDisk(rootFolder, 1).c_str()) == 0); - + _config = StorageConfigSet::make_storage_node_config(); auto repo = std::make_shared<const DocumentTypeRepo>( *ConfigGetter<DocumenttypesConfig>::getConfig("config-doctypes", FileSpec("../config-doctypes.cfg"))); _top = std::make_unique<DummyStorageLink>(); - _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), config.getConfigId()); + _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), _config->config_uri()); _node->setTypeRepo(repo); _node->setupDummyPersistence(); // Set up the 3 links - auto config_uri = config::ConfigUri(config.getConfigId()); using vespa::config::content::core::StorServerConfig; - auto manager = std::make_unique<BucketManager>(*config_from<StorServerConfig>(config_uri), _node->getComponentRegister()); + auto manager = std::make_unique<BucketManager>(*config_from<StorServerConfig>(_config->config_uri()), _node->getComponentRegister()); _manager = manager.get(); _top->push_back(std::move(manager)); - if (fakePersistenceLayer) { - auto bottom = std::make_unique<DummyStorageLink>(); - _bottom = bottom.get(); - _top->push_back(std::move(bottom)); - } else { - using StorFilestorConfig = vespa::config::content::internal::InternalStorFilestorType; - auto bottom = std::make_unique<FileStorManager>(*config_from<StorFilestorConfig>(config_uri), - _node->getPersistenceProvider(), _node->getComponentRegister(), - *_node, _node->get_host_info()); - _top->push_back(std::move(bottom)); - } - // Generate a doc to use for testing.. + auto bottom = std::make_unique<DummyStorageLink>(); + _bottom = bottom.get(); + _top->push_back(std::move(bottom)); + const DocumentType &type(*_node->getTypeRepo()->getDocumentType("text/html")); _document = std::make_shared<document::Document>(*_node->getTypeRepo(), type, document::DocumentId("id:ns:text/html::ntnu")); } @@ -689,7 +668,7 @@ public: static std::unique_ptr<lib::Distribution> default_grouped_distribution() { return std::make_unique<lib::Distribution>( - lib::Distribution::ConfigWrapper(GlobalBucketSpaceDistributionConverter::string_to_config(vespalib::string( + lib::Distribution::ConfigWrapper(lib::GlobalBucketSpaceDistributionConverter::string_to_config(vespalib::string( R"(redundancy 2 group[3] group[0].name "invalid" @@ -713,7 +692,7 @@ group[2].nodes[2].index 5 static std::shared_ptr<lib::Distribution> derived_global_grouped_distribution() { auto default_distr = default_grouped_distribution(); - return GlobalBucketSpaceDistributionConverter::convert_to_global(*default_distr); + return lib::GlobalBucketSpaceDistributionConverter::convert_to_global(*default_distr); } private: diff --git a/storage/src/tests/common/CMakeLists.txt b/storage/src/tests/common/CMakeLists.txt index 36487660cce..42f2ac7b40b 100644 --- a/storage/src/tests/common/CMakeLists.txt +++ b/storage/src/tests/common/CMakeLists.txt @@ -3,7 +3,7 @@ vespa_add_library(storage_testcommon TEST SOURCES dummystoragelink.cpp message_sender_stub.cpp - testhelper.cpp + storage_config_set.cpp testnodestateupdater.cpp teststorageapp.cpp DEPENDS @@ -14,7 +14,6 @@ vespa_add_executable(storage_common_gtest_runner_app TEST SOURCES bucket_stripe_utils_test.cpp bucket_utils_test.cpp - global_bucket_space_distribution_converter_test.cpp gtest_runner.cpp metricstest.cpp storagelinktest.cpp diff --git a/storage/src/tests/common/global_bucket_space_distribution_converter_test.cpp b/storage/src/tests/common/global_bucket_space_distribution_converter_test.cpp deleted file mode 100644 index 774f90821fa..00000000000 --- a/storage/src/tests/common/global_bucket_space_distribution_converter_test.cpp +++ /dev/null @@ -1,350 +0,0 @@ -// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include <vespa/storage/common/global_bucket_space_distribution_converter.h> -#include <vespa/vdslib/distribution/distribution.h> -#include <vespa/vdslib/state/clusterstate.h> -#include <vespa/vespalib/gtest/gtest.h> - -using namespace ::testing; - -namespace storage { - -using DistributionConfig = vespa::config::content::StorDistributionConfig; - -namespace { - -vespalib::string default_to_global_config(const vespalib::string& default_config) { - auto default_cfg = GlobalBucketSpaceDistributionConverter::string_to_config(default_config); - auto as_global = GlobalBucketSpaceDistributionConverter::convert_to_global(*default_cfg); - return GlobalBucketSpaceDistributionConverter::config_to_string(*as_global); -} - -vespalib::string default_flat_config( -R"(redundancy 1 -group[1] -group[0].name "invalid" -group[0].index "invalid" -group[0].partitions 1|* -group[0].nodes[3] -group[0].nodes[0].index 0 -group[0].nodes[1].index 1 -group[0].nodes[2].index 2 -)"); - -vespalib::string expected_flat_global_config( -R"(redundancy 3 -initial_redundancy 0 -ensure_primary_persisted true -ready_copies 3 -active_per_leaf_group true -group[0].index "invalid" -group[0].name "invalid" -group[0].capacity 1 -group[0].partitions "*" -group[0].nodes[0].index 0 -group[0].nodes[0].retired false -group[0].nodes[1].index 1 -group[0].nodes[1].retired false -group[0].nodes[2].index 2 -group[0].nodes[2].retired false -)"); - -} - -TEST(GlobalBucketSpaceDistributionConverterTest, can_transform_flat_cluster_config) { - EXPECT_EQ(expected_flat_global_config, default_to_global_config(default_flat_config)); -} - - -TEST(GlobalBucketSpaceDistributionConverterTest, can_transform_single_level_multi_group_config) { - vespalib::string default_config( -R"(redundancy 2 -group[3] -group[0].name "invalid" -group[0].index "invalid" -group[0].partitions 1|* -group[0].nodes[0] -group[1].name rack0 -group[1].index 0 -group[1].nodes[3] -group[1].nodes[0].index 0 -group[1].nodes[1].index 1 -group[1].nodes[2].index 2 -group[2].name rack1 -group[2].index 1 -group[2].nodes[3] -group[2].nodes[0].index 3 -group[2].nodes[1].index 4 -group[2].nodes[2].index 5 -)"); - - // The config converter cannot distinguish between default values - // and explicitly set ones, so we get a few more entries in our output - // config string. - // Most crucial parts of the transformed config is the root redundancy - // and the new partition config. We test _all_ config fields here so that - // we catch anything we miss transferring state of. - vespalib::string expected_global_config( -R"(redundancy 6 -initial_redundancy 0 -ensure_primary_persisted true -ready_copies 6 -active_per_leaf_group true -group[0].index "invalid" -group[0].name "invalid" -group[0].capacity 1 -group[0].partitions "*|*" -group[1].index "0" -group[1].name "rack0" -group[1].capacity 1 -group[1].partitions "" -group[1].nodes[0].index 0 -group[1].nodes[0].retired false -group[1].nodes[1].index 1 -group[1].nodes[1].retired false -group[1].nodes[2].index 2 -group[1].nodes[2].retired false -group[2].index "1" -group[2].name "rack1" -group[2].capacity 1 -group[2].partitions "" -group[2].nodes[0].index 3 -group[2].nodes[0].retired false -group[2].nodes[1].index 4 -group[2].nodes[1].retired false -group[2].nodes[2].index 5 -group[2].nodes[2].retired false -)"); - EXPECT_EQ(expected_global_config, default_to_global_config(default_config)); -} - -TEST(GlobalBucketSpaceDistributionConverterTest, can_transform_multi_level_multi_group_config) { - vespalib::string default_config( -R"(redundancy 2 -group[5] -group[0].name "invalid" -group[0].index "invalid" -group[0].partitions *|* -group[0].nodes[0] -group[1].name switch0 -group[1].index 0 -group[1].partitions 1|* -group[1].nodes[0] -group[2].name rack0 -group[2].index 0.0 -group[2].nodes[1] -group[2].nodes[0].index 0 -group[3].name rack1 -group[3].index 0.1 -group[3].nodes[1] -group[3].nodes[0].index 1 -group[4].name switch1 -group[4].index 1 -group[4].partitions * -group[4].nodes[0] -group[5].name rack0 -group[5].index 1.0 -group[5].nodes[1] -group[5].nodes[0].index 2 -group[6].name rack1 -group[6].index 1.1 -group[6].nodes[1] -group[6].nodes[0].index 3 -)"); - - // Note: leaf groups do not have a partition spec, only inner groups. - vespalib::string expected_global_config( -R"(redundancy 4 -initial_redundancy 0 -ensure_primary_persisted true -ready_copies 4 -active_per_leaf_group true -group[0].index "invalid" -group[0].name "invalid" -group[0].capacity 1 -group[0].partitions "*|*" -group[1].index "0" -group[1].name "switch0" -group[1].capacity 1 -group[1].partitions "*|*" -group[2].index "0.0" -group[2].name "rack0" -group[2].capacity 1 -group[2].partitions "" -group[2].nodes[0].index 0 -group[2].nodes[0].retired false -group[3].index "0.1" -group[3].name "rack1" -group[3].capacity 1 -group[3].partitions "" -group[3].nodes[0].index 1 -group[3].nodes[0].retired false -group[4].index "1" -group[4].name "switch1" -group[4].capacity 1 -group[4].partitions "*|*" -group[5].index "1.0" -group[5].name "rack0" -group[5].capacity 1 -group[5].partitions "" -group[5].nodes[0].index 2 -group[5].nodes[0].retired false -group[6].index "1.1" -group[6].name "rack1" -group[6].capacity 1 -group[6].partitions "" -group[6].nodes[0].index 3 -group[6].nodes[0].retired false -)"); - EXPECT_EQ(expected_global_config, default_to_global_config(default_config)); -} - -// FIXME partition specs are order-invariant with regards to groups, so heterogenous -// setups will not produce the expected replica distribution. -// TODO Consider disallowing entirely when using global docs. -TEST(GlobalBucketSpaceDistributionConverterTest, can_transform_heterogenous_multi_group_config) { - vespalib::string default_config( -R"(redundancy 2 -ready_copies 2 -group[3] -group[0].name "invalid" -group[0].index "invalid" -group[0].partitions "1|*" -group[0].nodes[0] -group[1].name rack0 -group[1].index 0 -group[1].nodes[2] -group[1].nodes[0].index 0 -group[1].nodes[1].index 1 -group[2].name rack1 -group[2].index 1 -group[2].nodes[1] -group[2].nodes[1].index 2 -)"); - - vespalib::string expected_global_config( -R"(redundancy 3 -initial_redundancy 0 -ensure_primary_persisted true -ready_copies 3 -active_per_leaf_group true -group[0].index "invalid" -group[0].name "invalid" -group[0].capacity 1 -group[0].partitions "*|*" -group[1].index "0" -group[1].name "rack0" -group[1].capacity 1 -group[1].partitions "" -group[1].nodes[0].index 0 -group[1].nodes[0].retired false -group[1].nodes[1].index 1 -group[1].nodes[1].retired false -group[2].index "1" -group[2].name "rack1" -group[2].capacity 1 -group[2].partitions "" -group[2].nodes[0].index 2 -group[2].nodes[0].retired false -)"); - EXPECT_EQ(expected_global_config, default_to_global_config(default_config)); -} - -TEST(GlobalBucketSpaceDistributionConverterTest, can_transform_concrete_distribution_instance) { - auto default_cfg = GlobalBucketSpaceDistributionConverter::string_to_config(default_flat_config); - lib::Distribution flat_distr(*default_cfg); - auto global_distr = GlobalBucketSpaceDistributionConverter::convert_to_global(flat_distr); - EXPECT_EQ(expected_flat_global_config, global_distr->serialize()); -} - -TEST(GlobalBucketSpaceDistributionConverterTest, config_retired_state_is_propagated) { - vespalib::string default_config( -R"(redundancy 1 -group[1] -group[0].name "invalid" -group[0].index "invalid" -group[0].partitions 1|* -group[0].nodes[3] -group[0].nodes[0].index 0 -group[0].nodes[0].retired false -group[0].nodes[1].index 1 -group[0].nodes[1].retired true -group[0].nodes[2].index 2 -group[0].nodes[2].retired true -)"); - - auto default_cfg = GlobalBucketSpaceDistributionConverter::string_to_config(default_config); - auto as_global = GlobalBucketSpaceDistributionConverter::convert_to_global(*default_cfg); - - ASSERT_EQ(1, as_global->group.size()); - ASSERT_EQ(3, as_global->group[0].nodes.size()); - EXPECT_FALSE(as_global->group[0].nodes[0].retired); - EXPECT_TRUE(as_global->group[0].nodes[1].retired); - EXPECT_TRUE(as_global->group[0].nodes[2].retired); -} - -TEST(GlobalBucketSpaceDistributionConverterTest, group_capacities_are_propagated) { - vespalib::string default_config( -R"(redundancy 2 -group[3] -group[0].name "invalid" -group[0].index "invalid" -group[0].partitions 1|* -group[0].capacity 5 -group[0].nodes[0] -group[1].name rack0 -group[1].index 0 -group[1].capacity 2 -group[1].nodes[1] -group[1].nodes[0].index 0 -group[2].name rack1 -group[2].capacity 3 -group[2].index 1 -group[2].nodes[1] -group[2].nodes[0].index 1 -)"); - auto default_cfg = GlobalBucketSpaceDistributionConverter::string_to_config(default_config); - auto as_global = GlobalBucketSpaceDistributionConverter::convert_to_global(*default_cfg); - - ASSERT_EQ(3, as_global->group.size()); - EXPECT_DOUBLE_EQ(5.0, as_global->group[0].capacity); - EXPECT_DOUBLE_EQ(2.0, as_global->group[1].capacity); - EXPECT_DOUBLE_EQ(3.0, as_global->group[2].capacity); -} - -TEST(GlobalBucketSpaceDistributionConverterTest, global_distribution_has_same_owner_distributors_as_default) { - vespalib::string default_config( -R"(redundancy 2 -ready_copies 2 -group[3] -group[0].name "invalid" -group[0].index "invalid" -group[0].partitions 1|* -group[0].nodes[0] -group[1].name rack0 -group[1].index 0 -group[1].nodes[1] -group[1].nodes[0].index 0 -group[2].name rack1 -group[2].index 1 -group[2].nodes[2] -group[2].nodes[0].index 1 -group[2].nodes[1].index 2 -)"); - - auto default_cfg = GlobalBucketSpaceDistributionConverter::string_to_config(default_config); - auto global_cfg = GlobalBucketSpaceDistributionConverter::convert_to_global(*default_cfg); - - lib::Distribution default_distr(*default_cfg); - lib::Distribution global_distr(*global_cfg); - lib::ClusterState state("distributor:6 storage:6"); - - for (unsigned int i = 0; i < UINT16_MAX; ++i) { - document::BucketId bucket(16, i); - const auto default_index = default_distr.getIdealDistributorNode(state, bucket, "ui"); - const auto global_index = global_distr.getIdealDistributorNode(state, bucket, "ui"); - ASSERT_EQ(default_index, global_index); - } -} - -} diff --git a/storage/src/tests/common/metricstest.cpp b/storage/src/tests/common/metricstest.cpp index 899c1979e86..6ca84f0304a 100644 --- a/storage/src/tests/common/metricstest.cpp +++ b/storage/src/tests/common/metricstest.cpp @@ -1,5 +1,9 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <tests/common/dummystoragelink.h> +#include <tests/common/storage_config_set.h> +#include <tests/common/teststorageapp.h> +#include <tests/common/testhelper.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageframework/defaultimplementation/clock/fakeclock.h> #include <vespa/storage/bucketdb/bucketmanager.h> @@ -7,9 +11,6 @@ #include <vespa/storage/persistence/filestorage/filestormanager.h> #include <vespa/storage/persistence/filestorage/filestormetrics.h> #include <vespa/storage/visiting/visitormetrics.h> -#include <tests/common/teststorageapp.h> -#include <tests/common/testhelper.h> -#include <tests/common/dummystoragelink.h> #include <vespa/metrics/metricmanager.h> #include <vespa/config/common/exceptions.h> #include <vespa/vespalib/gtest/gtest.h> @@ -26,10 +27,10 @@ namespace storage { struct MetricsTest : public Test { framework::defaultimplementation::FakeClock* _clock; + std::unique_ptr<StorageConfigSet> _config; std::unique_ptr<TestServiceLayerApp> _node; std::unique_ptr<DummyStorageLink> _top; std::unique_ptr<StatusMetricConsumer> _metricsConsumer; - std::unique_ptr<vdstestlib::DirConfig> _config; std::unique_ptr<metrics::MetricSet> _topSet; std::unique_ptr<metrics::MetricManager> _metricManager; std::shared_ptr<FileStorMetrics> _filestorMetrics; @@ -66,17 +67,13 @@ MetricsTest::MetricsTest() MetricsTest::~MetricsTest() = default; void MetricsTest::SetUp() { - _config = std::make_unique<vdstestlib::DirConfig>(getStandardConfig(true, "metricstest")); - std::filesystem::remove_all(std::filesystem::path(getRootFolder(*_config))); - try { - _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), _config->getConfigId()); - _node->setupDummyPersistence(); - _clock = &_node->getClock(); - _clock->setAbsoluteTimeInSeconds(1000000); - _top = std::make_unique<DummyStorageLink>(); - } catch (config::InvalidConfigException& e) { - fprintf(stderr, "%s\n", e.what()); - } + _config = StorageConfigSet::make_storage_node_config(); + _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), _config->config_uri()); + _node->setupDummyPersistence(); + _clock = &_node->getClock(); + _clock->setAbsoluteTimeInSeconds(1000000); + _top = std::make_unique<DummyStorageLink>(); + _metricManager = std::make_unique<metrics::MetricManager>(std::make_unique<MetricClock>(*_clock)); _topSet.reset(new metrics::MetricSet("vds", {}, "")); { @@ -96,7 +93,7 @@ void MetricsTest::SetUp() { _visitorMetrics = std::make_shared<VisitorMetrics>(); _visitorMetrics->initThreads(4); _topSet->registerMetric(*_visitorMetrics); - _metricManager->init(config::ConfigUri(_config->getConfigId())); + _metricManager->init(_config->config_uri()); } void MetricsTest::TearDown() { diff --git a/storage/src/tests/common/storage_config_set.cpp b/storage/src/tests/common/storage_config_set.cpp new file mode 100644 index 00000000000..ed10e113867 --- /dev/null +++ b/storage/src/tests/common/storage_config_set.cpp @@ -0,0 +1,153 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "storage_config_set.h" +#include <vespa/config-bucketspaces.h> +#include <vespa/config-persistence.h> +#include <vespa/config-slobroks.h> +#include <vespa/config-stor-distribution.h> +#include <vespa/config-stor-filestor.h> +#include <vespa/config-upgrading.h> +#include <vespa/document/repo/configbuilder.h> +#include <vespa/document/repo/document_type_repo_factory.h> +#include <vespa/document/repo/documenttyperepo.h> +#include <vespa/messagebus/config-messagebus.h> +#include <vespa/messagebus/testlib/slobrok.h> +#include <vespa/metrics/config-metricsmanager.h> +#include <vespa/storage/config/config-stor-bouncer.h> +#include <vespa/storage/config/config-stor-communicationmanager.h> +#include <vespa/storage/config/config-stor-distributormanager.h> +#include <vespa/storage/config/config-stor-prioritymapping.h> +#include <vespa/storage/config/config-stor-server.h> +#include <vespa/storage/config/config-stor-status.h> +#include <vespa/storage/config/config-stor-visitordispatcher.h> +#include <vespa/storage/visiting/config-stor-visitor.h> +#include <vespa/document/base/testdocrepo.h> +#include <vespa/vespalib/util/stringfmt.h> + +namespace storage { + +StorageConfigSet::StorageConfigSet(vespalib::string config_id_str, bool is_storage_node) + : _document_type_config(std::make_unique<DocumenttypesConfigBuilder>()), + _slobroks_config(std::make_unique<SlobroksConfigBuilder>()), + _messagebus_config(std::make_unique<MessagebusConfigBuilder>()), + _metrics_config(std::make_unique<MetricsmanagerConfigBuilder>()), + _persistence_config(std::make_unique<PersistenceConfigBuilder>()), + _distribution_config(std::make_unique<StorDistributionConfigBuilder>()), + _filestor_config(std::make_unique<StorFilestorConfigBuilder>()), + _upgrading_config(std::make_unique<UpgradingConfigBuilder>()), + _bucket_spaces_config(std::make_unique<BucketspacesConfigBuilder>()), + _bouncer_config(std::make_unique<StorBouncerConfigBuilder>()), + _communication_manager_config(std::make_unique<StorCommunicationmanagerConfigBuilder>()), + _distributor_manager_config(std::make_unique<StorDistributormanagerConfigBuilder>()), + _priority_mapping_config(std::make_unique<StorPrioritymappingConfigBuilder>()), + _server_config(std::make_unique<StorServerConfigBuilder>()), + _status_config(std::make_unique<StorStatusConfigBuilder>()), + _visitor_config(std::make_unique<StorVisitorConfigBuilder>()), + _visitor_dispatcher_config(std::make_unique<StorVisitordispatcherConfigBuilder>()), + _config_id_str(std::move(config_id_str)), + _config_ctx(std::make_shared<config::ConfigContext>(_config_set)), + _config_uri(_config_id_str, _config_ctx) +{ + _config_set.addBuilder(_config_id_str, _document_type_config.get()); + _config_set.addBuilder(_config_id_str, _slobroks_config.get()); + _config_set.addBuilder(_config_id_str, _messagebus_config.get()); + _config_set.addBuilder(_config_id_str, _metrics_config.get()); + _config_set.addBuilder(_config_id_str, _persistence_config.get()); + _config_set.addBuilder(_config_id_str, _distribution_config.get()); + _config_set.addBuilder(_config_id_str, _filestor_config.get()); + _config_set.addBuilder(_config_id_str, _upgrading_config.get()); + _config_set.addBuilder(_config_id_str, _bucket_spaces_config.get()); + _config_set.addBuilder(_config_id_str, _bouncer_config.get()); + _config_set.addBuilder(_config_id_str, _communication_manager_config.get()); + _config_set.addBuilder(_config_id_str, _distributor_manager_config.get()); + _config_set.addBuilder(_config_id_str, _priority_mapping_config.get()); + _config_set.addBuilder(_config_id_str, _server_config.get()); + _config_set.addBuilder(_config_id_str, _status_config.get()); + _config_set.addBuilder(_config_id_str, _visitor_config.get()); + _config_set.addBuilder(_config_id_str, _visitor_dispatcher_config.get()); + + init_default_configs(is_storage_node); + _config_ctx->reload(); +} + +StorageConfigSet::~StorageConfigSet() = default; + +void StorageConfigSet::init_default_configs(bool is_storage_node) { + // Most configs are left with their default values, with explicit values being a + // union of the legacy DirConfig test helpers. + *_document_type_config = document::TestDocRepo().getTypeConfig(); + + add_metric_consumer("status", {"*"}); + add_metric_consumer("statereporter", {"*"}); + + add_distribution_config(50); + add_bucket_space_mapping("testdoctype1", "default"); + + _communication_manager_config->rpcport = 0; + _communication_manager_config->mbusport = 0; + + _distributor_manager_config->splitcount = 1000; + _distributor_manager_config->splitsize = 10000000; + _distributor_manager_config->joincount = 500; + _distributor_manager_config->joinsize = 5000000; + _distributor_manager_config->maxClusterClockSkewSec = 0; + + _filestor_config->numThreads = 1; + _filestor_config->numResponseThreads = 1; + + _persistence_config->abortOperationsWithChangedBucketOwnership = true; + + _server_config->clusterName = "storage"; + _server_config->nodeIndex = 0; + _server_config->isDistributor = !is_storage_node; + _server_config->maxMergesPerNode = 25; + _server_config->maxMergeQueueSize = 20; + _server_config->resourceExhaustionMergeBackPressureDurationSecs = 15.0; + _server_config->writePidFileOnStartup = false; + + _status_config->httpport = 0; + + _visitor_config->maxconcurrentvisitorsFixed = 4; + _visitor_config->maxconcurrentvisitorsVariable = 0; +} + +void StorageConfigSet::add_bucket_space_mapping(vespalib::string doc_type, vespalib::string bucket_space_name) { + BucketspacesConfigBuilder::Documenttype type; + type.name = std::move(doc_type); + type.bucketspace = std::move(bucket_space_name); + _bucket_spaces_config->documenttype.emplace_back(std::move(type)); +} + +void StorageConfigSet::add_distribution_config(uint16_t nodes_in_top_level_group) { + StorDistributionConfigBuilder::Group group; + group.name = "invalid"; + group.index = "invalid"; + for (uint16_t i = 0; i < nodes_in_top_level_group; ++i) { + StorDistributionConfigBuilder::Group::Nodes node; + node.index = i; + group.nodes.emplace_back(std::move(node)); + } + _distribution_config->group.clear(); + _distribution_config->group.emplace_back(std::move(group)); + _distribution_config->redundancy = 2; +} + +void StorageConfigSet::add_metric_consumer(vespalib::string name, const std::vector<vespalib::string>& added_metrics) { + MetricsmanagerConfigBuilder::Consumer consumer; + consumer.name = std::move(name); + consumer.addedmetrics.assign(added_metrics.begin(), added_metrics.end()); + _metrics_config->consumer.emplace_back(std::move(consumer)); +} + +void StorageConfigSet::set_node_index(uint16_t node_index) { + _server_config->nodeIndex = node_index; +} + +void StorageConfigSet::set_slobrok_config_port(int slobrok_port) { + SlobroksConfigBuilder::Slobrok slobrok; + slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port); + _slobroks_config->slobrok.clear(); + _slobroks_config->slobrok.emplace_back(std::move(slobrok)); +} + +} // storage diff --git a/storage/src/tests/common/storage_config_set.h b/storage/src/tests/common/storage_config_set.h new file mode 100644 index 00000000000..66cdeaf527f --- /dev/null +++ b/storage/src/tests/common/storage_config_set.h @@ -0,0 +1,122 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/config/common/configcontext.h> +#include <vespa/config/subscription/configuri.h> +#include <vespa/config/subscription/sourcespec.h> +#include <memory> + +// FIXME "internal" here is very counter-productive since it precludes easy fwd decls. +// Currently have to punch holes in internal abstractions to make this work at all. +namespace cloud::config::internal { class InternalSlobroksType; } +namespace messagebus::internal { class InternalMessagebusType; } +namespace metrics::internal { class InternalMetricsmanagerType; } +namespace document::config::internal { class InternalDocumenttypesType; } +namespace vespa::config::content::internal { +class InternalPersistenceType; +class InternalStorDistributionType; +class InternalStorFilestorType; +class InternalUpgradingType; +} +namespace vespa::config::content::core::internal { +class InternalBucketspacesType; +class InternalStorBouncerType; +class InternalStorCommunicationmanagerType; +class InternalStorDistributormanagerType; +class InternalStorPrioritymappingType; +class InternalStorServerType; +class InternalStorStatusType; +class InternalStorVisitorType; +class InternalStorVisitordispatcherType; +} + +namespace storage { + +class StorageConfigSet { + using SlobroksConfigBuilder = cloud::config::internal::InternalSlobroksType; + using MessagebusConfigBuilder = messagebus::internal::InternalMessagebusType; + using MetricsmanagerConfigBuilder = metrics::internal::InternalMetricsmanagerType; + using DocumenttypesConfigBuilder = document::config::internal::InternalDocumenttypesType; + using PersistenceConfigBuilder = vespa::config::content::internal::InternalPersistenceType; + using StorDistributionConfigBuilder = vespa::config::content::internal::InternalStorDistributionType; + using StorFilestorConfigBuilder = vespa::config::content::internal::InternalStorFilestorType; + using UpgradingConfigBuilder = vespa::config::content::internal::InternalUpgradingType; + using BucketspacesConfigBuilder = vespa::config::content::core::internal::InternalBucketspacesType; + using StorBouncerConfigBuilder = vespa::config::content::core::internal::InternalStorBouncerType; + using StorCommunicationmanagerConfigBuilder = vespa::config::content::core::internal::InternalStorCommunicationmanagerType; + using StorDistributormanagerConfigBuilder = vespa::config::content::core::internal::InternalStorDistributormanagerType; + using StorPrioritymappingConfigBuilder = vespa::config::content::core::internal::InternalStorPrioritymappingType; + using StorServerConfigBuilder = vespa::config::content::core::internal::InternalStorServerType; + using StorStatusConfigBuilder = vespa::config::content::core::internal::InternalStorStatusType; + using StorVisitorConfigBuilder = vespa::config::content::core::internal::InternalStorVisitorType; + using StorVisitordispatcherConfigBuilder = vespa::config::content::core::internal::InternalStorVisitordispatcherType; + + std::unique_ptr<DocumenttypesConfigBuilder> _document_type_config; + std::unique_ptr<SlobroksConfigBuilder> _slobroks_config; + std::unique_ptr<MessagebusConfigBuilder> _messagebus_config; + std::unique_ptr<MetricsmanagerConfigBuilder> _metrics_config; + std::unique_ptr<PersistenceConfigBuilder> _persistence_config; + std::unique_ptr<StorDistributionConfigBuilder> _distribution_config; + std::unique_ptr<StorFilestorConfigBuilder> _filestor_config; + std::unique_ptr<UpgradingConfigBuilder> _upgrading_config; + std::unique_ptr<BucketspacesConfigBuilder> _bucket_spaces_config; + std::unique_ptr<StorBouncerConfigBuilder> _bouncer_config; + std::unique_ptr<StorCommunicationmanagerConfigBuilder> _communication_manager_config; + std::unique_ptr<StorDistributormanagerConfigBuilder> _distributor_manager_config; + std::unique_ptr<StorPrioritymappingConfigBuilder> _priority_mapping_config; // TODO removable? + std::unique_ptr<StorServerConfigBuilder> _server_config; + std::unique_ptr<StorStatusConfigBuilder> _status_config; + std::unique_ptr<StorVisitorConfigBuilder> _visitor_config; + std::unique_ptr<StorVisitordispatcherConfigBuilder> _visitor_dispatcher_config; + + vespalib::string _config_id_str; + config::ConfigSet _config_set; + std::shared_ptr<config::ConfigContext> _config_ctx; + config::ConfigUri _config_uri; + +public: + StorageConfigSet(vespalib::string config_id_str, bool is_storage_node); + ~StorageConfigSet(); + + void init_default_configs(bool is_storage_node); + void add_bucket_space_mapping(vespalib::string doc_type, vespalib::string bucket_space_name); + void add_metric_consumer(vespalib::string name, const std::vector<vespalib::string>& added_metrics); + void add_distribution_config(uint16_t nodes_in_top_level_group); + void set_slobrok_config_port(int slobrok_port); + void set_node_index(uint16_t node_index); + + [[nodiscard]] const config::ConfigUri& config_uri() const noexcept { + return _config_uri; + } + + DocumenttypesConfigBuilder& document_type_config() noexcept { return *_document_type_config; } + SlobroksConfigBuilder& slobroks_config() noexcept { return *_slobroks_config; } + MessagebusConfigBuilder& messagebus_config() noexcept {return *_messagebus_config; } + MetricsmanagerConfigBuilder& metrics_config() noexcept { return *_metrics_config; } + PersistenceConfigBuilder& persistence_config() noexcept { return *_persistence_config; } + StorDistributionConfigBuilder& distribution_config() noexcept { return *_distribution_config; } + StorFilestorConfigBuilder& filestor_config() noexcept { return *_filestor_config; } + BucketspacesConfigBuilder& bucket_spaces_config() noexcept { return *_bucket_spaces_config; } + StorBouncerConfigBuilder& bouncer_config() noexcept { return *_bouncer_config; }; + StorCommunicationmanagerConfigBuilder& communication_manager_config() noexcept { return *_communication_manager_config; } + StorDistributormanagerConfigBuilder& distributor_manager_config() noexcept { return *_distributor_manager_config; } + StorServerConfigBuilder& server_config() noexcept { return *_server_config; } + StorStatusConfigBuilder& status_config() noexcept { return *_status_config; } + StorVisitorConfigBuilder& visitor_config() noexcept { return *_visitor_config; } + StorVisitordispatcherConfigBuilder& visitor_dispatcher_config() noexcept { return *_visitor_dispatcher_config; } + + [[nodiscard]] static std::unique_ptr<StorageConfigSet> make_node_config(bool is_storage_node) { + return std::make_unique<StorageConfigSet>("my-node", is_storage_node); + } + + [[nodiscard]] static std::unique_ptr<StorageConfigSet> make_storage_node_config() { + return make_node_config(true); + } + + [[nodiscard]] static std::unique_ptr<StorageConfigSet> make_distributor_node_config() { + return make_node_config(false); + } +}; + +} diff --git a/storage/src/tests/common/testhelper.cpp b/storage/src/tests/common/testhelper.cpp deleted file mode 100644 index 4ca935b7904..00000000000 --- a/storage/src/tests/common/testhelper.cpp +++ /dev/null @@ -1,148 +0,0 @@ -// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <tests/common/testhelper.h> - -#include <vespa/log/log.h> -#include <vespa/vespalib/io/fileutil.h> -#include <vespa/vespalib/testkit/test_kit.h> - -LOG_SETUP(".testhelper"); - -namespace storage { - -void addStorageDistributionConfig(vdstestlib::DirConfig& dc) -{ - vdstestlib::DirConfig::Config* config; - config = &dc.getConfig("stor-distribution", true); - config->clear(); - config->set("group[1]"); - config->set("group[0].name", "invalid"); - config->set("group[0].index", "invalid"); - config->set("group[0].nodes[50]"); - config->set("redundancy", "2"); - - for (uint32_t i = 0; i < 50; i++) { - std::ostringstream key; key << "group[0].nodes[" << i << "].index"; - std::ostringstream val; val << i; - config->set(key.str(), val.str()); - } -} - -std::string getRootFolder(vdstestlib::DirConfig & dc) { - std::string defaultValue(""); - return dc.getConfig("stor-server").getValue("root_folder", defaultValue); -} - -vdstestlib::DirConfig getStandardConfig(bool storagenode, const std::string & rootOfRoot) { - std::string clusterName("storage"); - vdstestlib::DirConfig dc; - vdstestlib::DirConfig::Config* config; - config = &dc.addConfig("fleetcontroller"); - config->set("cluster_name", clusterName); - config->set("index", "0"); - config->set("zookeeper_server", "\"\""); - config->set("total_distributor_count", "10"); - config->set("total_storage_count", "10"); - config = &dc.addConfig("upgrading"); - config = &dc.addConfig("load-type"); - config = &dc.addConfig("bucket"); - config = &dc.addConfig("messagebus"); - config = &dc.addConfig("stor-prioritymapping"); - config = &dc.addConfig("stor-bucketdbupdater"); - config = &dc.addConfig("metricsmanager"); - config->set("consumer[2]"); - config->set("consumer[0].name", "\"status\""); - config->set("consumer[0].addedmetrics[1]"); - config->set("consumer[0].addedmetrics[0]", "\"*\""); - config->set("consumer[1].name", "\"statereporter\""); - config->set("consumer[1].addedmetrics[1]"); - config->set("consumer[1].addedmetrics[0]", "\"*\""); - config = &dc.addConfig("stor-communicationmanager"); - config->set("rpcport", "0"); - config->set("mbusport", "0"); - config = &dc.addConfig("stor-distributormanager"); - config->set("splitcount", "1000"); - config->set("splitsize", "10000000"); - config->set("joincount", "500"); - config->set("joinsize", "5000000"); - config->set("max_clock_skew_sec", "0"); - config = &dc.addConfig("persistence"); - config->set("abort_operations_with_changed_bucket_ownership", "true"); - config = &dc.addConfig("stor-filestor"); - // Easier to see what goes wrong with only 1 thread per disk. - config->set("num_threads", "1"); - config->set("num_response_threads", "1"); - config->set("maximum_versions_of_single_document_stored", "0"); - config->set("keep_remove_time_period", "2000000000"); - config->set("revert_time_period", "2000000000"); - // Don't want test to call exit() - config->set("fail_disk_after_error_count", "0"); - config = &dc.addConfig("stor-bouncer"); - config = &dc.addConfig("stor-server"); - config->set("cluster_name", clusterName); - config->set("enable_dead_lock_detector", "false"); - config->set("enable_dead_lock_detector_warnings", "false"); - config->set("max_merges_per_node", "25"); - config->set("max_merge_queue_size", "20"); - config->set("resource_exhaustion_merge_back_pressure_duration_secs", "15.0"); - vespalib::string rootFolder = rootOfRoot + "_"; - rootFolder += (storagenode ? "vdsroot" : "vdsroot.distributor"); - config->set("root_folder", rootFolder); - config->set("is_distributor", (storagenode ? "false" : "true")); - config = &dc.addConfig("stor-devices"); - config->set("root_folder", rootFolder); - config = &dc.addConfig("stor-status"); - config->set("httpport", "0"); - config = &dc.addConfig("stor-visitor"); - config->set("defaultdocblocksize", "8192"); - // By default, need "old" behaviour of maxconcurrent - config->set("maxconcurrentvisitors_fixed", "4"); - config->set("maxconcurrentvisitors_variable", "0"); - config = &dc.addConfig("stor-visitordispatcher"); - addFileConfig(dc, "documenttypes", TEST_PATH("config-doctypes.cfg")); - addStorageDistributionConfig(dc); - return dc; -} - -void addSlobrokConfig(vdstestlib::DirConfig& dc, - const mbus::Slobrok& slobrok) -{ - std::ostringstream ost; - ost << "tcp/localhost:" << slobrok.port(); - vdstestlib::DirConfig::Config* config; - config = &dc.getConfig("slobroks", true); - config->clear(); - config->set("slobrok[1]"); - config->set("slobrok[0].connectionspec", ost.str()); -} - -void addFileConfig(vdstestlib::DirConfig& dc, - const std::string& configDefName, - const std::string& fileName) -{ - vdstestlib::DirConfig::Config* config; - config = &dc.getConfig(configDefName, true); - config->clear(); - std::ifstream in(fileName.c_str()); - std::string line; - while (std::getline(in, line, '\n')) { - std::string::size_type pos = line.find(' '); - if (pos == std::string::npos) { - config->set(line); - } else { - config->set(line.substr(0, pos), line.substr(pos + 1)); - } - } - in.close(); -} - -TestName::TestName(const std::string& n) - : name(n) -{ - LOG(debug, "Starting test %s", name.c_str()); -} - -TestName::~TestName() { - LOG(debug, "Done with test %s", name.c_str()); -} - -} // storage diff --git a/storage/src/tests/common/testhelper.h b/storage/src/tests/common/testhelper.h index 1f83e938409..9f9b50652f9 100644 --- a/storage/src/tests/common/testhelper.h +++ b/storage/src/tests/common/testhelper.h @@ -1,39 +1,15 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once + #include <vespa/config/helper/configgetter.h> -#include <vespa/messagebus/testlib/slobrok.h> -#include <vespa/vdstestlib/config/dirconfig.h> -#include <fstream> -#include <sstream> +#include <vespa/config/subscription/configuri.h> namespace storage { -void addFileConfig(vdstestlib::DirConfig& dc, - const std::string& configDefName, - const std::string& fileName); - - -void addStorageDistributionConfig(vdstestlib::DirConfig& dc); - -vdstestlib::DirConfig getStandardConfig(bool storagenode, const std::string & rootFolder = "todo-make-unique"); - -std::string getRootFolder(vdstestlib::DirConfig & dc); - -void addSlobrokConfig(vdstestlib::DirConfig& dc, - const mbus::Slobrok& slobrok); - template <typename ConfigT> std::unique_ptr<ConfigT> config_from(const ::config::ConfigUri& cfg_uri) { return ::config::ConfigGetter<ConfigT>::getConfig(cfg_uri.getConfigId(), cfg_uri.getContext()); } -// Class used to print start and end of test. Enable debug when you want to see -// which test creates what output or where we get stuck -struct TestName { - std::string name; - TestName(const std::string& n); - ~TestName(); -}; - } // storage diff --git a/storage/src/tests/common/teststorageapp.cpp b/storage/src/tests/common/teststorageapp.cpp index e2e2de10702..d811f100aec 100644 --- a/storage/src/tests/common/teststorageapp.cpp +++ b/storage/src/tests/common/teststorageapp.cpp @@ -26,31 +26,31 @@ namespace storage { TestStorageApp::TestStorageApp(StorageComponentRegisterImpl::UP compReg, const lib::NodeType& type, NodeIndex index, - vespalib::stringref configId) + const config::ConfigUri& config_uri) : TestComponentRegister(ComponentRegisterImpl::UP(std::move(compReg))), _compReg(dynamic_cast<StorageComponentRegisterImpl&>(TestComponentRegister::getComponentRegister())), _docMan(), _nodeStateUpdater(type), - _configId(configId), + _configId(config_uri.getConfigId()), _node_identity("test_cluster", type, index), _initialized(false) { - // Use config to adjust values + // Use config to adjust values vespalib::string clusterName = "mycluster"; uint32_t redundancy = 2; uint32_t nodeCount = 10; - if (!configId.empty()) { - config::ConfigUri uri(configId); - std::unique_ptr<vespa::config::content::core::StorServerConfig> serverConfig = config::ConfigGetter<vespa::config::content::core::StorServerConfig>::getConfig(uri.getConfigId(), uri.getContext()); - clusterName = serverConfig->clusterName; - if (index == 0xffff) index = serverConfig->nodeIndex; - redundancy = config::ConfigGetter<vespa::config::content::StorDistributionConfig>::getConfig(uri.getConfigId(), uri.getContext())->redundancy; - nodeCount = config::ConfigGetter<vespa::config::content::FleetcontrollerConfig>::getConfig(uri.getConfigId(), uri.getContext())->totalStorageCount; - } else { - if (index == 0xffff) index = 0; + auto serverConfig = config::ConfigGetter<vespa::config::content::core::StorServerConfig>::getConfig(config_uri.getConfigId(), config_uri.getContext()); + clusterName = serverConfig->clusterName; + if (index == 0xffff) { + index = serverConfig->nodeIndex; + } + redundancy = config::ConfigGetter<vespa::config::content::StorDistributionConfig>::getConfig(config_uri.getConfigId(), config_uri.getContext())->redundancy; + if (index >= nodeCount) { + nodeCount = index + 1; + } + if (redundancy > nodeCount) { + redundancy = nodeCount; } - if (index >= nodeCount) nodeCount = index + 1; - if (redundancy > nodeCount) redundancy = nodeCount; _compReg.setNodeInfo(clusterName, type, index); _compReg.setNodeStateUpdater(_nodeStateUpdater); @@ -84,21 +84,17 @@ TestStorageApp::setClusterState(const lib::ClusterState& c) } namespace { -NodeIndex getIndexFromConfig(vespalib::stringref configId) { - if (!configId.empty()) { - config::ConfigUri uri(configId); - return NodeIndex( - config::ConfigGetter<vespa::config::content::core::StorServerConfig>::getConfig(uri.getConfigId(), uri.getContext())->nodeIndex); - } - return NodeIndex(0); + +NodeIndex node_index_from_config(const config::ConfigUri& uri) { + return NodeIndex(config::ConfigGetter<vespa::config::content::core::StorServerConfig>::getConfig(uri.getConfigId(), uri.getContext())->nodeIndex); } VESPA_THREAD_STACK_TAG(test_executor) } -TestServiceLayerApp::TestServiceLayerApp(vespalib::stringref configId) +TestServiceLayerApp::TestServiceLayerApp(NodeIndex index, const config::ConfigUri& config_uri) : TestStorageApp(std::make_unique<ServiceLayerComponentRegisterImpl>(ContentBucketDbOptions()), - lib::NodeType::STORAGE, getIndexFromConfig(configId), configId), + lib::NodeType::STORAGE, index, config_uri), _compReg(dynamic_cast<ServiceLayerComponentRegisterImpl&>(TestStorageApp::getComponentRegister())), _persistenceProvider(), _executor(vespalib::SequencedTaskExecutor::create(test_executor, 1)), @@ -108,17 +104,9 @@ TestServiceLayerApp::TestServiceLayerApp(vespalib::stringref configId) _nodeStateUpdater.setReportedNodeState(ns); } -TestServiceLayerApp::TestServiceLayerApp(NodeIndex index, - vespalib::stringref configId) - : TestStorageApp(std::make_unique<ServiceLayerComponentRegisterImpl>(ContentBucketDbOptions()), - lib::NodeType::STORAGE, index, configId), - _compReg(dynamic_cast<ServiceLayerComponentRegisterImpl&>(TestStorageApp::getComponentRegister())), - _persistenceProvider(), - _executor(vespalib::SequencedTaskExecutor::create(test_executor, 1)), - _host_info() +TestServiceLayerApp::TestServiceLayerApp(const config::ConfigUri& config_uri) + : TestServiceLayerApp(node_index_from_config(config_uri), config_uri) { - lib::NodeState ns(*_nodeStateUpdater.getReportedNodeState()); - _nodeStateUpdater.setReportedNodeState(ns); } TestServiceLayerApp::~TestServiceLayerApp() = default; @@ -147,45 +135,37 @@ TestServiceLayerApp::getPersistenceProvider() } namespace { - template<typename T> - T getConfig(vespalib::stringref configId) { - config::ConfigUri uri(configId); - return *config::ConfigGetter<T>::getConfig(uri.getConfigId(), uri.getContext()); - } + +template<typename T> +[[nodiscard]] T get_config(const config::ConfigUri& uri) { + return *config::ConfigGetter<T>::getConfig(uri.getConfigId(), uri.getContext()); +} + } void -TestDistributorApp::configure(vespalib::stringref id) +TestDistributorApp::configure(const config::ConfigUri& config_uri) { - if (id.empty()) return; - auto dc(getConfig<vespa::config::content::core::StorDistributormanagerConfig>(id)); + auto dc = get_config<vespa::config::content::core::StorDistributormanagerConfig>(config_uri); _compReg.setDistributorConfig(dc); - auto vc(getConfig<vespa::config::content::core::StorVisitordispatcherConfig>(id)); + auto vc = get_config<vespa::config::content::core::StorVisitordispatcherConfig>(config_uri); _compReg.setVisitorConfig(vc); } -TestDistributorApp::TestDistributorApp(vespalib::stringref configId) - : TestStorageApp( - std::make_unique<DistributorComponentRegisterImpl>(), - lib::NodeType::DISTRIBUTOR, getIndexFromConfig(configId), configId), +TestDistributorApp::TestDistributorApp(NodeIndex index, const config::ConfigUri& config_uri) + : TestStorageApp(std::make_unique<DistributorComponentRegisterImpl>(), + lib::NodeType::DISTRIBUTOR, index, config_uri), _compReg(dynamic_cast<DistributorComponentRegisterImpl&>(TestStorageApp::getComponentRegister())), _lastUniqueTimestampRequested(0), _uniqueTimestampCounter(0) { _compReg.setTimeCalculator(*this); - configure(configId); + configure(config_uri); } -TestDistributorApp::TestDistributorApp(NodeIndex index, vespalib::stringref configId) - : TestStorageApp( - std::make_unique<DistributorComponentRegisterImpl>(), - lib::NodeType::DISTRIBUTOR, index, configId), - _compReg(dynamic_cast<DistributorComponentRegisterImpl&>(TestStorageApp::getComponentRegister())), - _lastUniqueTimestampRequested(0), - _uniqueTimestampCounter(0) +TestDistributorApp::TestDistributorApp(const config::ConfigUri& config_uri) + : TestDistributorApp(node_index_from_config(config_uri), config_uri) { - _compReg.setTimeCalculator(*this); - configure(configId); } TestDistributorApp::~TestDistributorApp() = default; diff --git a/storage/src/tests/common/teststorageapp.h b/storage/src/tests/common/teststorageapp.h index fb91145c66a..04fa6996e15 100644 --- a/storage/src/tests/common/teststorageapp.h +++ b/storage/src/tests/common/teststorageapp.h @@ -1,9 +1,6 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** - * \class storage::TestServiceLayerApp - * \ingroup common - * - * \brief Helper class for tests involving service layer. + * Helper class for tests involving service layer. * * Some components need some dependencies injected in order to work correctly. * This test class simplifies the process of creating these dependencies. @@ -34,6 +31,8 @@ #include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <atomic> +namespace config { class ConfigUri; } + namespace storage { namespace spi { struct PersistenceProvider; } @@ -66,8 +65,8 @@ public: * from config themselves. */ TestStorageApp(StorageComponentRegisterImpl::UP compReg, - const lib::NodeType&, NodeIndex = NodeIndex(0xffff), - vespalib::stringref configId = ""); + const lib::NodeType&, NodeIndex index, + const config::ConfigUri& config_uri); ~TestStorageApp() override; // Set functions, to be able to modify content while running. @@ -110,8 +109,8 @@ class TestServiceLayerApp : public TestStorageApp HostInfo _host_info; public: - explicit TestServiceLayerApp(vespalib::stringref configId); - explicit TestServiceLayerApp(NodeIndex = NodeIndex(0xffff), vespalib::stringref configId = ""); + TestServiceLayerApp(NodeIndex node_index, const config::ConfigUri& config_uri); + explicit TestServiceLayerApp(const config::ConfigUri& config_uri); ~TestServiceLayerApp() override; void setupDummyPersistence(); @@ -140,11 +139,11 @@ class TestDistributorApp : public TestStorageApp, uint64_t _lastUniqueTimestampRequested; uint32_t _uniqueTimestampCounter; - void configure(vespalib::stringref configId); + void configure(const config::ConfigUri& config_uri); public: - explicit TestDistributorApp(vespalib::stringref configId = ""); - explicit TestDistributorApp(NodeIndex index, vespalib::stringref configId = ""); + TestDistributorApp(NodeIndex index, const config::ConfigUri& config_uri); + explicit TestDistributorApp(const config::ConfigUri& config_uri); ~TestDistributorApp() override; DistributorComponentRegisterImpl& getComponentRegister() override { diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt index 250cb872223..8f8c1b68de3 100644 --- a/storage/src/tests/distributor/CMakeLists.txt +++ b/storage/src/tests/distributor/CMakeLists.txt @@ -61,8 +61,14 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST GTest::gmock_main ) -vespa_add_test( - NAME storage_distributor_gtest_runner_app - COMMAND storage_distributor_gtest_runner_app - COST 350 -) +set(TOTAL_SHARDS 5) +math(EXPR MAX_SHARD_INDEX "${TOTAL_SHARDS} - 1") +foreach(SHARD_INDEX RANGE ${MAX_SHARD_INDEX}) + string(REGEX MATCH "...$" FMT_SHARD_INDEX "00" ${SHARD_INDEX}) + vespa_add_test( + NAME storage_distributor_gtest_runner_app_${FMT_SHARD_INDEX} + COMMAND storage_distributor_gtest_runner_app + ENVIRONMENT "GTEST_SHARD_INDEX=${SHARD_INDEX};GTEST_TOTAL_SHARDS=${TOTAL_SHARDS}" + COST 350 + ) +endforeach() diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.cpp b/storage/src/tests/distributor/distributor_stripe_test_util.cpp index 923c7d1730b..0628b62bdfe 100644 --- a/storage/src/tests/distributor/distributor_stripe_test_util.cpp +++ b/storage/src/tests/distributor/distributor_stripe_test_util.cpp @@ -32,7 +32,7 @@ DistributorStripeTestUtil::DistributorStripeTestUtil() _done_initializing(true), _messageSender(_sender, _senderDown) { - _config = getStandardConfig(false); + _config = StorageConfigSet::make_distributor_node_config(); } DistributorStripeTestUtil::~DistributorStripeTestUtil() = default; @@ -40,7 +40,7 @@ DistributorStripeTestUtil::~DistributorStripeTestUtil() = default; void DistributorStripeTestUtil::createLinks() { - _node = std::make_unique<TestDistributorApp>(_config.getConfigId()); + _node = std::make_unique<TestDistributorApp>(_config->config_uri()); _metrics = std::make_shared<DistributorMetricSet>(); _ideal_state_metrics = std::make_shared<IdealStateMetricSet>(); _stripe = std::make_unique<DistributorStripe>(_node->getComponentRegister(), *_metrics, *_ideal_state_metrics, @@ -77,7 +77,7 @@ DistributorStripeTestUtil::setup_stripe(int redundancy, int node_count, const li // trigger_distribution_change(). // This isn't pretty, folks, but it avoids breaking the world for now, // as many tests have implicit assumptions about this being the behavior. - auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution)); + auto new_configs = lib::BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution)); _stripe->update_distribution_config(new_configs); } @@ -95,7 +95,7 @@ void DistributorStripeTestUtil::trigger_distribution_change(lib::Distribution::SP distr) { _node->getComponentRegister().setDistribution(distr); - auto new_config = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distr)); + auto new_config = lib::BucketSpaceDistributionConfigs::from_default_distribution(std::move(distr)); _stripe->update_distribution_config(new_config); } @@ -184,8 +184,8 @@ DistributorStripeTestUtil::close() { _stripe->flush_and_close(); _sender.clear(); - _node.reset(0); - _config = getStandardConfig(false); + _node.reset(); + _config = StorageConfigSet::make_distributor_node_config(); } namespace { diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.h b/storage/src/tests/distributor/distributor_stripe_test_util.h index 801320e2bf8..862d9bfbfba 100644 --- a/storage/src/tests/distributor/distributor_stripe_test_util.h +++ b/storage/src/tests/distributor/distributor_stripe_test_util.h @@ -5,7 +5,9 @@ #include <tests/common/dummystoragelink.h> #include <tests/common/testhelper.h> #include <tests/common/teststorageapp.h> +#include <tests/common/storage_config_set.h> #include <vespa/storage/common/hostreporter/hostinfo.h> +#include <vespa/storage/config/config-stor-distributormanager.h> #include <vespa/storage/distributor/stripe_host_info_notifier.h> #include <vespa/storage/storageutil/utils.h> @@ -132,8 +134,8 @@ public: const DistributorConfiguration& getConfig(); - vdstestlib::DirConfig& getDirConfig() { - return _config; + vespa::config::content::core::StorDistributormanagerConfigBuilder& backing_config() noexcept { + return _config->distributor_manager_config(); } // TODO explicit notion of bucket spaces for tests @@ -237,7 +239,7 @@ public: void tag_content_node_supports_condition_probing(uint16_t index, bool supported); protected: - vdstestlib::DirConfig _config; + std::unique_ptr<StorageConfigSet> _config; std::unique_ptr<TestDistributorApp> _node; std::shared_ptr<DistributorMetricSet> _metrics; std::shared_ptr<IdealStateMetricSet> _ideal_state_metrics; diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp index 634e4993d53..33da4727017 100644 --- a/storage/src/tests/distributor/externaloperationhandlertest.cpp +++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp @@ -94,7 +94,7 @@ struct ExternalOperationHandlerTest : Test, DistributorStripeTestUtil { TEST_F(ExternalOperationHandlerTest, bucket_split_mask) { { createLinks(); - getDirConfig().getConfig("stor-distributormanager").set("minsplitcount", "16"); + backing_config().minsplitcount = 16; EXPECT_EQ(document::BucketId(16, 0xffff), operation_context().make_split_bit_constrained_bucket_id(document::DocumentId( @@ -115,7 +115,7 @@ TEST_F(ExternalOperationHandlerTest, bucket_split_mask) { close(); } { - getDirConfig().getConfig("stor-distributormanager").set("minsplitcount", "20"); + backing_config().minsplitcount = 20; createLinks(); EXPECT_EQ(document::BucketId(20, 0x11111), operation_context().make_split_bit_constrained_bucket_id(document::DocumentId( diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp index 0cadaa3fc9f..4639a154e74 100644 --- a/storage/src/tests/distributor/idealstatemanagertest.cpp +++ b/storage/src/tests/distributor/idealstatemanagertest.cpp @@ -74,10 +74,10 @@ TEST_F(IdealStateManagerTest, sibling) { TEST_F(IdealStateManagerTest, status_page) { close(); - getDirConfig().getConfig("stor-distributormanager").set("splitsize", "100"); - getDirConfig().getConfig("stor-distributormanager").set("splitcount", "1000000"); - getDirConfig().getConfig("stor-distributormanager").set("joinsize", "0"); - getDirConfig().getConfig("stor-distributormanager").set("joincount", "0"); + backing_config().splitsize = 100; + backing_config().splitcount = 1000000; + backing_config().joinsize = 0; + backing_config().joincount = 0; createLinks(); setup_stripe(1, 1, "distributor:1 storage:1"); diff --git a/storage/src/tests/distributor/mock_tickable_stripe.h b/storage/src/tests/distributor/mock_tickable_stripe.h index 2fb486bab28..3a9ead6ae10 100644 --- a/storage/src/tests/distributor/mock_tickable_stripe.h +++ b/storage/src/tests/distributor/mock_tickable_stripe.h @@ -10,7 +10,7 @@ struct MockTickableStripe : TickableStripe { bool tick() override { abort(); } void flush_and_close() override { abort(); } void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration>) override { abort(); } - void update_distribution_config(const BucketSpaceDistributionConfigs&) override { abort(); } + void update_distribution_config(const lib::BucketSpaceDistributionConfigs&) override { abort(); } void set_pending_cluster_state_bundle(const lib::ClusterStateBundle&) override { abort(); } void clear_pending_cluster_state_bundle() override { abort(); } void enable_cluster_state_bundle(const lib::ClusterStateBundle&, bool) override { abort(); } diff --git a/storage/src/tests/distributor/operationtargetresolvertest.cpp b/storage/src/tests/distributor/operationtargetresolvertest.cpp index f0f8a4359fd..171dc5a42c0 100644 --- a/storage/src/tests/distributor/operationtargetresolvertest.cpp +++ b/storage/src/tests/distributor/operationtargetresolvertest.cpp @@ -27,8 +27,7 @@ struct OperationTargetResolverTest : Test, DistributorStripeTestUtil { const document::DocumentType* _html_type; std::unique_ptr<Operation> op; - BucketInstanceList getInstances(const BucketId& bid, - bool stripToRedundancy); + BucketInstanceList getInstances(const BucketId& bid, bool stripToRedundancy, bool symmetry_mode); void SetUp() override { _repo.reset(new document::DocumentTypeRepo( @@ -62,7 +61,7 @@ namespace { TestTargets::createTest(id, *this, *_asserters.back()) struct Asserter { - virtual ~Asserter() {} + virtual ~Asserter() = default; virtual void assertEqualMsg(std::string t1, OperationTargetList t2, OperationTargetList t3) = 0; @@ -73,21 +72,29 @@ struct TestTargets { OperationTargetList _expected; OperationTargetResolverTest& _test; Asserter& _asserter; + bool _symmetry_mode; TestTargets(const BucketId& id, OperationTargetResolverTest& test, Asserter& asserter) - : _id(id), _test(test), _asserter(asserter) {} + : _id(id), _test(test), _asserter(asserter), _symmetry_mode(true) + { + } ~TestTargets() { - BucketInstanceList result(_test.getInstances(_id, true)); - BucketInstanceList all(_test.getInstances(_id, false)); + BucketInstanceList result(_test.getInstances(_id, true, _symmetry_mode)); + BucketInstanceList all(_test.getInstances(_id, false, _symmetry_mode)); _asserter.assertEqualMsg( all.toString(), _expected, result.createTargets(makeBucketSpace())); delete _asserters.back(); _asserters.pop_back(); } + TestTargets& with_symmetric_replica_selection(bool symmetry) noexcept { + _symmetry_mode = symmetry; + return *this; + } + TestTargets& sendsTo(const BucketId& id, uint16_t node) { _expected.push_back(OperationTarget( makeDocumentBucket(id), lib::Node(lib::NodeType::STORAGE, node), false)); @@ -110,7 +117,7 @@ struct TestTargets { } // anonymous BucketInstanceList -OperationTargetResolverTest::getInstances(const BucketId& id, bool stripToRedundancy) +OperationTargetResolverTest::getInstances(const BucketId& id, bool stripToRedundancy, bool symmetry_mode) { auto &bucketSpaceRepo(operation_context().bucket_space_repo()); auto &distributorBucketSpace(bucketSpaceRepo.get(makeBucketSpace())); @@ -118,6 +125,7 @@ OperationTargetResolverTest::getInstances(const BucketId& id, bool stripToRedund distributorBucketSpace, distributorBucketSpace.getBucketDatabase(), 16, distributorBucketSpace.getDistribution().getRedundancy(), makeBucketSpace()); + resolver.use_symmetric_replica_selection(symmetry_mode); if (stripToRedundancy) { return resolver.getInstances(OperationTargetResolver::PUT, id); } else { @@ -143,14 +151,48 @@ TEST_F(OperationTargetResolverTest, choose_ideal_state_when_many_copies) { .sendsTo(BucketId(16, 0), 3); } -TEST_F(OperationTargetResolverTest, trusted_over_ideal_state) { +TEST_F(OperationTargetResolverTest, legacy_prefers_trusted_over_ideal_state) { setup_stripe(2, 4, "storage:4 distributor:1"); addNodesToBucketDB(BucketId(16, 0), "0=0/0/0/t,1=0,2=0/0/0/t,3=0"); // ideal nodes: 1, 3 + MY_ASSERT_THAT(BucketId(32, 0)).with_symmetric_replica_selection(false) + .sendsTo(BucketId(16, 0), 0) + .sendsTo(BucketId(16, 0), 2); +} + +TEST_F(OperationTargetResolverTest, prefer_ready_over_ideal_state_order) { + setup_stripe(2, 4, "storage:4 distributor:1"); + addNodesToBucketDB(BucketId(16, 0), "0=1/2/3/u/i/r,1=1/2/3,2=1/2/3/u/i/r,3=1/2/3"); + // ideal nodes: 1, 3. 0 and 2 are ready. MY_ASSERT_THAT(BucketId(32, 0)).sendsTo(BucketId(16, 0), 0) .sendsTo(BucketId(16, 0), 2); } +TEST_F(OperationTargetResolverTest, prefer_ready_over_ideal_state_order_also_when_retired) { + setup_stripe(2, 4, "storage:4 .0.s:r distributor:1"); + addNodesToBucketDB(BucketId(16, 0), "0=1/2/3/u/i/r,1=1/2/3,2=1/2/3/u/i/r,3=1/2/3"); + // ideal nodes: 1, 3. 0 and 2 are ready. + MY_ASSERT_THAT(BucketId(32, 0)).sendsTo(BucketId(16, 0), 0) + .sendsTo(BucketId(16, 0), 2); +} + +TEST_F(OperationTargetResolverTest, prefer_replicas_with_more_docs_over_replicas_with_fewer_docs) { + setup_stripe(2, 4, "storage:4 distributor:1"); + addNodesToBucketDB(BucketId(16, 0), "0=2/3/4,1=1/2/3,2=3/4/5,3=1/2/3"); + // ideal nodes: 1, 3. 0 and 2 have more docs. + MY_ASSERT_THAT(BucketId(32, 0)).sendsTo(BucketId(16, 0), 2) + .sendsTo(BucketId(16, 0), 0); +} + +TEST_F(OperationTargetResolverTest, fall_back_to_active_state_and_db_index_if_all_other_fields_equal) { + // All replica nodes tagged as retired, which means none are part of the ideal state order + setup_stripe(2, 4, "storage:4 .0.s:r .2.s:r .3.s:r distributor:1"); + addNodesToBucketDB(BucketId(16, 0), "0=2/3/4/u/a,3=2/3/4,2=2/3/4"); + // ideal nodes: 1, 3. 0 is active and 3 is the remaining replica with the lowest DB order. + MY_ASSERT_THAT(BucketId(32, 0)).sendsTo(BucketId(16, 0), 0) + .sendsTo(BucketId(16, 0), 3); +} + TEST_F(OperationTargetResolverTest, choose_highest_split_bucket) { setup_stripe(2, 2, "storage:2 distributor:1"); // 0, 1 are both in ideal state for both buckets. diff --git a/storage/src/tests/distributor/statusreporterdelegatetest.cpp b/storage/src/tests/distributor/statusreporterdelegatetest.cpp index cc23fa7a22e..c70ab533af6 100644 --- a/storage/src/tests/distributor/statusreporterdelegatetest.cpp +++ b/storage/src/tests/distributor/statusreporterdelegatetest.cpp @@ -1,5 +1,6 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <tests/common/storage_config_set.h> #include <tests/common/testhelper.h> #include <tests/common/teststorageapp.h> #include <vespa/storage/distributor/statusreporterdelegate.h> @@ -45,8 +46,8 @@ public: } TEST(StatusReporterDelegateTest, delegate_invokes_delegator_on_status_request) { - vdstestlib::DirConfig config(getStandardConfig(false)); - TestDistributorApp app(config.getConfigId()); + auto config = StorageConfigSet::make_distributor_node_config(); + TestDistributorApp app(config->config_uri()); MockDelegator mockDelegator; MockStatusReporter reporter; diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.cpp b/storage/src/tests/distributor/top_level_distributor_test_util.cpp index 94031c6d71e..e1b2bc93f62 100644 --- a/storage/src/tests/distributor/top_level_distributor_test_util.cpp +++ b/storage/src/tests/distributor/top_level_distributor_test_util.cpp @@ -24,7 +24,7 @@ TopLevelDistributorTestUtil::TopLevelDistributorTestUtil() : _message_sender(_sender, _sender_down), _num_distributor_stripes(4) { - _config = getStandardConfig(false); + _config = StorageConfigSet::make_distributor_node_config(); } TopLevelDistributorTestUtil::~TopLevelDistributorTestUtil() = default; @@ -32,7 +32,7 @@ TopLevelDistributorTestUtil::~TopLevelDistributorTestUtil() = default; void TopLevelDistributorTestUtil::create_links() { - _node = std::make_unique<TestDistributorApp>(_config.getConfigId()); + _node = std::make_unique<TestDistributorApp>(_config->config_uri()); _thread_pool = framework::TickingThreadPool::createDefault("distributor", 100ms); _stripe_pool = DistributorStripePool::make_non_threaded_pool_for_testing(); _distributor.reset(new TopLevelDistributor( @@ -123,7 +123,7 @@ TopLevelDistributorTestUtil::close() } _sender.clear(); _node.reset(); - _config = getStandardConfig(false); + _config = StorageConfigSet::make_distributor_node_config(); } void diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.h b/storage/src/tests/distributor/top_level_distributor_test_util.h index 1d4c81a5bfb..51f0739e3e6 100644 --- a/storage/src/tests/distributor/top_level_distributor_test_util.h +++ b/storage/src/tests/distributor/top_level_distributor_test_util.h @@ -3,6 +3,7 @@ #include "distributor_message_sender_stub.h" #include <tests/common/dummystoragelink.h> +#include <tests/common/storage_config_set.h> #include <tests/common/testhelper.h> #include <tests/common/teststorageapp.h> #include <vespa/storage/common/hostreporter/hostinfo.h> @@ -140,7 +141,7 @@ public: static std::vector<document::BucketSpace> bucket_spaces(); protected: - vdstestlib::DirConfig _config; + std::unique_ptr<StorageConfigSet> _config; std::unique_ptr<TestDistributorApp> _node; std::unique_ptr<framework::TickingThreadPool> _thread_pool; std::unique_ptr<DistributorStripePool> _stripe_pool; diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp index 31ebbe19cbb..e00ce249298 100644 --- a/storage/src/tests/distributor/updateoperationtest.cpp +++ b/storage/src/tests/distributor/updateoperationtest.cpp @@ -49,13 +49,13 @@ struct UpdateOperationTest : Test, DistributorStripeTestUtil { const api::ReturnCode& result = api::ReturnCode()); std::shared_ptr<UpdateOperation> - sendUpdate(const std::string& bucketState, bool create_if_missing = false); + sendUpdate(const std::string& bucketState, bool create_if_missing = false, bool cache_create_flag = false); document::BucketId _bId; }; std::shared_ptr<UpdateOperation> -UpdateOperationTest::sendUpdate(const std::string& bucketState, bool create_if_missing) +UpdateOperationTest::sendUpdate(const std::string& bucketState, bool create_if_missing, bool cache_create_flag) { auto update = std::make_shared<document::DocumentUpdate>( *_repo, *_html_type, @@ -67,6 +67,9 @@ UpdateOperationTest::sendUpdate(const std::string& bucketState, bool create_if_m addNodesToBucketDB(_bId, bucketState); auto msg = std::make_shared<api::UpdateCommand>(makeDocumentBucket(document::BucketId(0)), update, 100); + if (cache_create_flag) { + msg->set_cached_create_if_missing(create_if_missing); + } return std::make_shared<UpdateOperation>( node_context(), operation_context(), getDistributorBucketSpace(), msg, std::vector<BucketDatabase::Entry>(), @@ -271,4 +274,20 @@ TEST_F(UpdateOperationTest, cancelled_nodes_are_not_updated_in_db) { dumpBucket(_bId)); } +TEST_F(UpdateOperationTest, cached_create_if_missing_is_propagated_to_fanout_requests) { + setup_stripe(1, 1, "distributor:1 storage:1"); + for (bool cache_flag : {false, true}) { + for (bool create_if_missing : {false, true}) { + std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3", create_if_missing, cache_flag)); + DistributorMessageSenderStub sender; + cb->start(sender); + + ASSERT_EQ("Update => 0", sender.getCommands(true)); + auto& cmd = dynamic_cast<api::UpdateCommand&>(*sender.command(0)); + EXPECT_EQ(cmd.has_cached_create_if_missing(), cache_flag); + EXPECT_EQ(cmd.create_if_missing(), create_if_missing); + } + } +} + } diff --git a/storage/src/tests/frameworkimpl/status/statustest.cpp b/storage/src/tests/frameworkimpl/status/statustest.cpp index bd28297e108..8592a332f0c 100644 --- a/storage/src/tests/frameworkimpl/status/statustest.cpp +++ b/storage/src/tests/frameworkimpl/status/statustest.cpp @@ -1,10 +1,11 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <tests/common/storage_config_set.h> +#include <tests/common/teststorageapp.h> #include <vespa/storageframework/defaultimplementation/component/componentregisterimpl.h> #include <vespa/storage/frameworkimpl/status/statuswebserver.h> #include <vespa/storageframework/generic/status/htmlstatusreporter.h> #include <vespa/storageframework/generic/status/xmlstatusreporter.h> -#include <tests/common/teststorageapp.h> #include <vespa/document/util/stringutil.h> #include <vespa/vespalib/net/crypto_engine.h> #include <vespa/vespalib/net/socket_spec.h> @@ -39,6 +40,7 @@ vespalib::string fetch(int port, const vespalib::string &path) { namespace storage { struct StatusTest : Test { + std::unique_ptr<StorageConfigSet> _config; std::unique_ptr<TestServiceLayerApp> _node; void SetUp() override; @@ -97,7 +99,8 @@ namespace { } void StatusTest::SetUp() { - _node = std::make_unique<TestServiceLayerApp>(); + _config = StorageConfigSet::make_storage_node_config(); + _node = std::make_unique<TestServiceLayerApp>(_config->config_uri()); } namespace { diff --git a/storage/src/tests/persistence/bucketownershipnotifiertest.cpp b/storage/src/tests/persistence/bucketownershipnotifiertest.cpp index 129cac34c68..717a79b030d 100644 --- a/storage/src/tests/persistence/bucketownershipnotifiertest.cpp +++ b/storage/src/tests/persistence/bucketownershipnotifiertest.cpp @@ -1,6 +1,7 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <tests/common/message_sender_stub.h> +#include <tests/common/storage_config_set.h> #include <tests/common/teststorageapp.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/storage/persistence/bucketownershipnotifier.h> @@ -14,6 +15,7 @@ using namespace ::testing; namespace storage { struct BucketOwnershipNotifierTest : public Test { + std::unique_ptr<StorageConfigSet> _config; std::unique_ptr<TestServiceLayerApp> _app; lib::ClusterState _clusterState; @@ -58,7 +60,8 @@ struct BucketOwnershipNotifierTest : public Test { void BucketOwnershipNotifierTest::SetUp() { - _app = std::make_unique<TestServiceLayerApp>(); + _config = StorageConfigSet::make_storage_node_config(); + _app = std::make_unique<TestServiceLayerApp>(_config->config_uri()); _app->setDistribution(Redundancy(1), NodeCount(2)); _app->setClusterState(_clusterState); } diff --git a/storage/src/tests/persistence/common/filestortestfixture.cpp b/storage/src/tests/persistence/common/filestortestfixture.cpp index 6581fb9f7b1..6c7b09c4736 100644 --- a/storage/src/tests/persistence/common/filestortestfixture.cpp +++ b/storage/src/tests/persistence/common/filestortestfixture.cpp @@ -25,14 +25,11 @@ const uint32_t FileStorTestFixture::MSG_WAIT_TIME; void FileStorTestFixture::setupPersistenceThreads(uint32_t threads) { - std::string rootOfRoot = "todo-make-unique-filestorefixture"; - _config = std::make_unique<vdstestlib::DirConfig>(getStandardConfig(true, rootOfRoot)); - _config->getConfig("stor-server").set("root_folder", (rootOfRoot + "-vdsroot.2")); - _config->getConfig("stor-devices").set("root_folder", (rootOfRoot + "-vdsroot.2")); - _config->getConfig("stor-server").set("node_index", "1"); - _config->getConfig("stor-filestor").set("num_threads", std::to_string(threads)); - - _node = std::make_unique<TestServiceLayerApp>(NodeIndex(1), _config->getConfigId()); + _config = StorageConfigSet::make_storage_node_config(); + _config->set_node_index(1); + _config->filestor_config().numThreads = threads; + + _node = std::make_unique<TestServiceLayerApp>(NodeIndex(1), _config->config_uri()); _testdoctype1 = _node->getTypeRepo()->getDocumentType("testdoctype1"); } @@ -77,7 +74,7 @@ FileStorTestFixture::TestFileStorComponents::TestFileStorComponents( { injector.inject(top); using StorFilestorConfig = vespa::config::content::internal::InternalStorFilestorType; - auto config = config_from<StorFilestorConfig>(config::ConfigUri(fixture._config->getConfigId())); + auto config = config_from<StorFilestorConfig>(fixture._config->config_uri()); auto fsm = std::make_unique<FileStorManager>(*config, fixture._node->getPersistenceProvider(), fixture._node->getComponentRegister(), *fixture._node, fixture._node->get_host_info()); manager = fsm.get(); diff --git a/storage/src/tests/persistence/common/filestortestfixture.h b/storage/src/tests/persistence/common/filestortestfixture.h index e4776f393ae..d4fb94101cc 100644 --- a/storage/src/tests/persistence/common/filestortestfixture.h +++ b/storage/src/tests/persistence/common/filestortestfixture.h @@ -2,6 +2,7 @@ #pragma once #include <tests/common/dummystoragelink.h> +#include <tests/common/storage_config_set.h> #include <tests/common/testhelper.h> #include <tests/common/teststorageapp.h> #include <vespa/storage/persistence/filestorage/filestorhandlerimpl.h> @@ -14,8 +15,8 @@ namespace storage { class FileStorTestFixture : public ::testing::Test { public: + std::unique_ptr<StorageConfigSet> _config; std::unique_ptr<TestServiceLayerApp> _node; - std::unique_ptr<vdstestlib::DirConfig> _config; const document::DocumentType* _testdoctype1; static const uint32_t MSG_WAIT_TIME = 60 * 1000; diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index f12b85eb2ea..bdc3f17e576 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -1,6 +1,7 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <tests/common/dummystoragelink.h> +#include <tests/common/storage_config_set.h> #include <tests/common/testhelper.h> #include <tests/common/teststorageapp.h> #include <tests/persistence/filestorage/forwardingmessagesender.h> @@ -41,7 +42,6 @@ #include <vespa/log/log.h> LOG_SETUP(".filestormanagertest"); -using std::unique_ptr; using document::Document; using document::BucketId; using namespace storage::api; @@ -91,10 +91,8 @@ make_bucket_for_doc(const document::DocumentId& docid) struct FileStorTestBase : Test { enum {LONG_WAITTIME=60}; - unique_ptr<TestServiceLayerApp> _node; - std::unique_ptr<vdstestlib::DirConfig> config; - std::unique_ptr<vdstestlib::DirConfig> config2; - std::unique_ptr<vdstestlib::DirConfig> smallConfig; + std::unique_ptr<StorageConfigSet> _config; + std::unique_ptr<TestServiceLayerApp> _node; const int32_t _waitTime; const document::DocumentType* _testdoctype1; @@ -165,29 +163,10 @@ struct FileStorTestBase : Test { void setupDisks() { std::string rootOfRoot = "filestormanagertest"; - config = std::make_unique<vdstestlib::DirConfig>(getStandardConfig(true, rootOfRoot)); - - config2 = std::make_unique<vdstestlib::DirConfig>(*config); - config2->getConfig("stor-server").set("root_folder", rootOfRoot + "-vdsroot.2"); - config2->getConfig("stor-devices").set("root_folder", rootOfRoot + "-vdsroot.2"); - config2->getConfig("stor-server").set("node_index", "1"); - - smallConfig = std::make_unique<vdstestlib::DirConfig>(*config); - vdstestlib::DirConfig::Config& c(smallConfig->getConfig("stor-filestor", true)); - c.set("initial_index_read", "128"); - c.set("use_direct_io", "false"); - c.set("maximum_gap_to_read_through", "64"); - - assert(system(vespalib::make_string("rm -rf %s", getRootFolder(*config).c_str()).c_str()) == 0); - assert(system(vespalib::make_string("rm -rf %s", getRootFolder(*config2).c_str()).c_str()) == 0); - assert(system(vespalib::make_string("mkdir -p %s/disks/d0", getRootFolder(*config).c_str()).c_str()) == 0); - assert(system(vespalib::make_string("mkdir -p %s/disks/d0", getRootFolder(*config2).c_str()).c_str()) == 0); - try { - _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), config->getConfigId()); - _node->setupDummyPersistence(); - } catch (config::InvalidConfigException& e) { - fprintf(stderr, "%s\n", e.what()); - } + _config = StorageConfigSet::make_storage_node_config(); + + _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), _config->config_uri()); + _node->setupDummyPersistence(); _testdoctype1 = _node->getTypeRepo()->getDocumentType("testdoctype1"); } @@ -227,11 +206,10 @@ struct TestFileStorComponents { DummyStorageLink top; FileStorManager* manager; - explicit TestFileStorComponents(FileStorTestBase& test, bool use_small_config = false) + explicit TestFileStorComponents(FileStorTestBase& test) : manager(nullptr) { - auto config_uri = config::ConfigUri((use_small_config ? test.smallConfig : test.config)->getConfigId()); - auto config = config_from<StorFilestorConfig>(config_uri); + auto config = config_from<StorFilestorConfig>(test._config->config_uri()); auto fsm = std::make_unique<FileStorManager>(*config, test._node->getPersistenceProvider(), test._node->getComponentRegister(), *test._node, test._node->get_host_info()); manager = fsm.get(); @@ -1255,7 +1233,7 @@ createIterator(DummyStorageLink& link, } TEST_F(FileStorManagerTest, visiting) { - TestFileStorComponents c(*this, true); + TestFileStorComponents c(*this); auto& top = c.top; // Adding documents to two buckets which we are going to visit // We want one bucket in one slotfile, and one bucket with a file split @@ -1409,8 +1387,7 @@ TEST_F(FileStorManagerTest, remove_location) { TEST_F(FileStorManagerTest, delete_bucket) { TestFileStorComponents c(*this); - auto config_uri = config::ConfigUri(config->getConfigId()); - StorFilestorConfigBuilder my_config(*config_from<StorFilestorConfig>(config_uri)); + auto my_config = *config_from<StorFilestorConfig>(_config->config_uri()); c.manager->on_configure(my_config); auto& top = c.top; diff --git a/storage/src/tests/persistence/filestorage/filestormodifiedbucketstest.cpp b/storage/src/tests/persistence/filestorage/filestormodifiedbucketstest.cpp index 710da80972f..38acc4a18b8 100644 --- a/storage/src/tests/persistence/filestorage/filestormodifiedbucketstest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormodifiedbucketstest.cpp @@ -38,7 +38,7 @@ struct BucketCheckerInjector : FileStorTestFixture::StorageLinkInjector {} void inject(DummyStorageLink& link) const override { using vespa::config::content::core::StorServerConfig; - auto cfg = config_from<StorServerConfig>(config::ConfigUri(_fixture._config->getConfigId())); + auto cfg = config_from<StorServerConfig>(_fixture._config->config_uri()); link.push_back(std::make_unique<ModifiedBucketChecker>( _node.getComponentRegister(), _node.getPersistenceProvider(), *cfg)); } diff --git a/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp b/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp index f96ff9c012e..9fc6ddff268 100644 --- a/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp +++ b/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp @@ -1,6 +1,7 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <tests/common/dummystoragelink.h> +#include <tests/common/storage_config_set.h> #include <tests/common/testhelper.h> #include <tests/common/teststorageapp.h> #include <vespa/config/common/exceptions.h> @@ -34,20 +35,20 @@ struct ModifiedBucketCheckerTest : Test { ModifiedBucketChecker* _handler; DummyStorageLink* _bottom; + std::unique_ptr<StorageConfigSet> _config; std::unique_ptr<TestServiceLayerApp> _node; - std::unique_ptr<vdstestlib::DirConfig> _config; }; void ModifiedBucketCheckerTest::SetUp() { - _config.reset(new vdstestlib::DirConfig(getStandardConfig(true))); - _node.reset(new TestServiceLayerApp(NodeIndex(0), _config->getConfigId())); + _config = StorageConfigSet::make_storage_node_config(); + _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), _config->config_uri()); _node->setupDummyPersistence(); - _top.reset(new DummyStorageLink); + _top = std::make_unique<DummyStorageLink>(); using vespa::config::content::core::StorServerConfig; - auto bootstrap_cfg = config_from<StorServerConfig>(config::ConfigUri(_config->getConfigId())); + auto bootstrap_cfg = config_from<StorServerConfig>(_config->config_uri()); _handler = new ModifiedBucketChecker(_node->getComponentRegister(), _node->getPersistenceProvider(), *bootstrap_cfg); diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index 4bd0570efa8..524f5bae392 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -220,11 +220,11 @@ void MergeHandlerTest::setUpChain(ChainPos pos) { _nodes.clear(); if (pos != FRONT) { - _nodes.push_back(api::MergeBucketCommand::Node(2, false)); + _nodes.emplace_back(2, false); } - _nodes.push_back(api::MergeBucketCommand::Node(0, false)); + _nodes.emplace_back(0, false); if (pos != BACK) { - _nodes.push_back(api::MergeBucketCommand::Node(1, false)); + _nodes.emplace_back(1, false); } } @@ -1439,4 +1439,52 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) LOG(debug, "got mergebucket reply"); } +TEST_F(MergeHandlerTest, multiple_versions_in_apply_diff_only_writes_newest_version) { + setUpChain(BACK); + + document::TestDocMan doc_mgr; + document::Document::SP doc(doc_mgr.createRandomDocumentAtLocation(_location, 1)); + spi::Timestamp ts_old(10'000); + spi::Timestamp ts_new(20'000); + + PersistenceProviderWrapper provider_wrapper(getPersistenceProvider()); + MergeHandler handler = createHandler(provider_wrapper); + std::vector<api::ApplyBucketDiffCommand::Entry> apply_diff; + // Diff contains two entries for the same document; one old Remove and one newer Put that + // subsumes the Remove operation. We should only schedule the Put to the SPI. + { + api::ApplyBucketDiffCommand::Entry e; + e._entry._timestamp = ts_old; + e._entry._hasMask = 0x1; + e._docName = doc->getId().toString(); + e._entry._flags = MergeHandler::IN_USE | MergeHandler::DELETED; + apply_diff.push_back(e); + } + { + api::ApplyBucketDiffCommand::Entry e; + e._entry._timestamp = ts_new; + e._entry._hasMask = 0x1; + e._entry._flags = MergeHandler::IN_USE; + fill_entry(e, *doc, doc_mgr.getTypeRepo()); + apply_diff.push_back(e); + } + + auto apply_bucket_diff_cmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes); + apply_bucket_diff_cmd->getDiff() = std::move(apply_diff); + + provider_wrapper.clearOperationLog(); + auto tracker = handler.handleApplyBucketDiff(*apply_bucket_diff_cmd, createTracker(apply_bucket_diff_cmd, _bucket)); + ASSERT_FALSE(tracker); + handler.drain_async_writes(); + + // There should be no remove at time=ts_old, only a put at time=ts_new. + // TODO ideally we shouldn't have to know about the other operations... + EXPECT_EQ(provider_wrapper.toString(), + "createIterator(Bucket(0x40000000000004d2), ALL_VERSIONS)\n" + "iterate(1, 18446744073709551615)\n" + "destroyIterator(1)\n" + "put(Bucket(0x40000000000004d2), 20000, id:mail:testdoctype1:n=1234:9380.html)\n" + "getBucketInfo(Bucket(0x40000000000004d2))\n"); +} + } // storage diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index a599b1d380a..b63df53781d 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -25,14 +25,6 @@ namespace storage { namespace { -vdstestlib::DirConfig initialize(const std::string & rootOfRoot) { - vdstestlib::DirConfig config(getStandardConfig(true, rootOfRoot)); - std::string rootFolder = getRootFolder(config); - std::filesystem::remove_all(std::filesystem::path(rootFolder)); - std::filesystem::create_directories(std::filesystem::path(vespalib::make_string("%s/disks/d0", rootFolder.c_str()))); - return config; -} - template<typename T> struct ConfigReader : public T::Subscriber { @@ -49,10 +41,10 @@ constexpr uint32_t MERGE_CHUNK_SIZE = 4_Mi; } -PersistenceTestEnvironment::PersistenceTestEnvironment(const std::string & rootOfRoot) - : _config(initialize(rootOfRoot)), +PersistenceTestEnvironment::PersistenceTestEnvironment() + : _config(StorageConfigSet::make_distributor_node_config()), _messageKeeper(), - _node(NodeIndex(0), _config.getConfigId()), + _node(NodeIndex(0), _config->config_uri()), _component(_node.getComponentRegister(), "persistence test env"), _metrics() { @@ -106,7 +98,7 @@ PersistenceTestUtils::MockBucketLocks::unlock(document::Bucket bucket) } PersistenceTestUtils::PersistenceTestUtils() - : _env(std::make_unique<PersistenceTestEnvironment>("todo-make-unique-persistencetestutils")), + : _env(std::make_unique<PersistenceTestEnvironment>()), _replySender(), _bucketOwnershipNotifier(getEnv()._component, getEnv()._fileStorHandler), _mock_bucket_locks(), diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h index d03974855ad..0125bd7aa79 100644 --- a/storage/src/tests/persistence/persistencetestutils.h +++ b/storage/src/tests/persistence/persistencetestutils.h @@ -1,6 +1,7 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include <tests/common/storage_config_set.h> #include <tests/common/teststorageapp.h> #include <tests/common/testhelper.h> #include <vespa/storage/persistence/persistencethread.h> @@ -25,11 +26,11 @@ struct MessageKeeper : public MessageSender { }; struct PersistenceTestEnvironment { - PersistenceTestEnvironment(const std::string & rootOfRoot); + PersistenceTestEnvironment(); ~PersistenceTestEnvironment(); document::TestDocMan _testDocMan; - vdstestlib::DirConfig _config; + std::unique_ptr<StorageConfigSet> _config; MessageKeeper _messageKeeper; TestServiceLayerApp _node; ServiceLayerComponent _component; diff --git a/storage/src/tests/persistence/provider_error_wrapper_test.cpp b/storage/src/tests/persistence/provider_error_wrapper_test.cpp index fc88428f915..d5ce8400b25 100644 --- a/storage/src/tests/persistence/provider_error_wrapper_test.cpp +++ b/storage/src/tests/persistence/provider_error_wrapper_test.cpp @@ -1,5 +1,6 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <tests/common/storage_config_set.h> #include <vespa/persistence/spi/test.h> #include <tests/persistence/persistencetestutils.h> #include <tests/persistence/common/persistenceproviderwrapper.h> @@ -33,13 +34,15 @@ struct MockErrorListener : ProviderErrorListener { struct Fixture { // We wrap the wrapper. It's turtles all the way down! PersistenceProviderWrapper providerWrapper; + std::unique_ptr<StorageConfigSet> config; TestServiceLayerApp app; ServiceLayerComponent component; ProviderErrorWrapper errorWrapper; Fixture(spi::PersistenceProvider& provider) : providerWrapper(provider), - app(), + config(StorageConfigSet::make_storage_node_config()), + app(config->config_uri()), component(app.getComponentRegister(), "dummy"), errorWrapper(providerWrapper) { diff --git a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp index 698d8dee573..231f41ffd21 100644 --- a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp +++ b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp @@ -262,7 +262,6 @@ TEST_P(StorageProtocolTest, response_metadata_is_propagated) { TEST_P(StorageProtocolTest, update) { auto update = std::make_shared<document::DocumentUpdate>(_docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId()); update->addUpdate(FieldUpdate(_testDoc->getField("headerval")).addUpdate(std::make_unique<AssignValueUpdate>(std::make_unique<IntFieldValue>(17)))); - update->addFieldPathUpdate(std::make_unique<RemoveFieldPathUpdate>("headerval", "testdoctype1.headerval > 0")); auto cmd = std::make_shared<UpdateCommand>(_bucket, update, 14); @@ -284,6 +283,37 @@ TEST_P(StorageProtocolTest, update) { EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2)); } +TEST_P(StorageProtocolTest, update_request_create_if_missing_flag_is_propagated) { + auto make_update_cmd = [&](bool create_if_missing, bool cached) { + auto update = std::make_shared<document::DocumentUpdate>( + _docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId()); + update->addUpdate(FieldUpdate(_testDoc->getField("headerval")).addUpdate( + std::make_unique<AssignValueUpdate>(std::make_unique<IntFieldValue>(17)))); + update->addFieldPathUpdate(std::make_unique<RemoveFieldPathUpdate>("headerval", "testdoctype1.headerval > 0")); + update->setCreateIfNonExistent(create_if_missing); + auto cmd = std::make_shared<UpdateCommand>(_bucket, update, 14); + if (cached) { + cmd->set_cached_create_if_missing(create_if_missing); + } + return cmd; + }; + + auto check_flag_propagation = [&](bool create_if_missing, bool cached) { + auto cmd = make_update_cmd(create_if_missing, cached); + EXPECT_EQ(cmd->has_cached_create_if_missing(), cached); + EXPECT_EQ(cmd->create_if_missing(), create_if_missing); + + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(cmd2->has_cached_create_if_missing(), cached); + EXPECT_EQ(cmd2->create_if_missing(), create_if_missing); + }; + + check_flag_propagation(false, false); + check_flag_propagation(true, false); + check_flag_propagation(false, true); + check_flag_propagation(true, true); +} + TEST_P(StorageProtocolTest, get) { auto cmd = std::make_shared<GetCommand>(_bucket, _testDocId, "foo,bar,vekterli", 123); auto cmd2 = copyCommand(cmd); @@ -880,7 +910,7 @@ TEST_P(StorageProtocolTest, track_memory_footprint_for_some_messages) { EXPECT_EQ(sizeof(BucketInfoCommand), sizeof(BucketCommand)); EXPECT_EQ(sizeof(TestAndSetCommand), sizeof(BucketInfoCommand) + sizeof(vespalib::string)); EXPECT_EQ(sizeof(PutCommand), sizeof(TestAndSetCommand) + 40); - EXPECT_EQ(sizeof(UpdateCommand), sizeof(TestAndSetCommand) + 32); + EXPECT_EQ(sizeof(UpdateCommand), sizeof(TestAndSetCommand) + 40); EXPECT_EQ(sizeof(RemoveCommand), sizeof(TestAndSetCommand) + 112); EXPECT_EQ(sizeof(GetCommand), sizeof(BucketInfoCommand) + sizeof(TestAndSetCondition) + 184); } diff --git a/storage/src/tests/storageserver/bouncertest.cpp b/storage/src/tests/storageserver/bouncertest.cpp index 296ed6d23bc..11742dd658f 100644 --- a/storage/src/tests/storageserver/bouncertest.cpp +++ b/storage/src/tests/storageserver/bouncertest.cpp @@ -1,6 +1,7 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <tests/common/dummystoragelink.h> +#include <tests/common/storage_config_set.h> #include <tests/common/testhelper.h> #include <tests/common/teststorageapp.h> #include <vespa/config/common/exceptions.h> @@ -26,6 +27,7 @@ using namespace ::testing; namespace storage { struct BouncerTest : public Test { + std::unique_ptr<StorageConfigSet> _config; std::unique_ptr<TestStorageApp> _node; std::unique_ptr<DummyStorageLink> _upper; Bouncer* _manager; @@ -57,15 +59,15 @@ BouncerTest::BouncerTest() } void BouncerTest::setUpAsNode(const lib::NodeType& type) { - vdstestlib::DirConfig config(getStandardConfig(type == lib::NodeType::STORAGE)); + _config = StorageConfigSet::make_node_config(type == lib::NodeType::STORAGE); if (type == lib::NodeType::STORAGE) { - _node = std::make_unique<TestServiceLayerApp>(NodeIndex(2), config.getConfigId()); + _node = std::make_unique<TestServiceLayerApp>(NodeIndex(2), _config->config_uri()); } else { - _node = std::make_unique<TestDistributorApp>(NodeIndex(2), config.getConfigId()); + _node = std::make_unique<TestDistributorApp>(NodeIndex(2), _config->config_uri()); } _upper = std::make_unique<DummyStorageLink>(); using StorBouncerConfig = vespa::config::content::core::StorBouncerConfig; - auto cfg_uri = config::ConfigUri(config.getConfigId()); + auto& cfg_uri = _config->config_uri(); auto cfg = config::ConfigGetter<StorBouncerConfig>::getConfig(cfg_uri.getConfigId(), cfg_uri.getContext()); _manager = new Bouncer(_node->getComponentRegister(), *cfg); _lower = new DummyStorageLink(); diff --git a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp index 50977b5ec8b..8982b02f2b7 100644 --- a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp +++ b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp @@ -3,6 +3,7 @@ #include <tests/common/teststorageapp.h> #include <tests/common/testhelper.h> #include <tests/common/dummystoragelink.h> +#include <tests/common/storage_config_set.h> #include <vespa/config/helper/configgetter.hpp> #include <vespa/document/base/testdocman.h> #include <vespa/storage/bucketdb/storbucketdb.h> @@ -28,11 +29,12 @@ using namespace ::testing; namespace storage { struct ChangedBucketOwnershipHandlerTest : Test { + std::unique_ptr<StorageConfigSet> _config; std::unique_ptr<TestServiceLayerApp> _app; - std::unique_ptr<DummyStorageLink> _top; - ChangedBucketOwnershipHandler* _handler; - DummyStorageLink* _bottom; - document::TestDocMan _testDocRepo; + std::unique_ptr<DummyStorageLink> _top; + ChangedBucketOwnershipHandler* _handler; + DummyStorageLink* _bottom; + document::TestDocMan _testDocRepo; // TODO test: down edge triggered on cluster state with cluster down? @@ -126,11 +128,12 @@ void ChangedBucketOwnershipHandlerTest::SetUp() { using vespa::config::content::PersistenceConfig; - vdstestlib::DirConfig config(getStandardConfig(true)); - _app.reset(new TestServiceLayerApp); - _top.reset(new DummyStorageLink); - _handler = new ChangedBucketOwnershipHandler(*config_from<PersistenceConfig>(config::ConfigUri(config.getConfigId())), + _config = StorageConfigSet::make_storage_node_config(); + _app = std::make_unique<TestServiceLayerApp>(NodeIndex(0), _config->config_uri()); + _top = std::make_unique<DummyStorageLink>(); + + _handler = new ChangedBucketOwnershipHandler(*config_from<PersistenceConfig>(_config->config_uri()), _app->getComponentRegister()); _top->push_back(std::unique_ptr<StorageLink>(_handler)); _bottom = new DummyStorageLink; diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp index 04322562d08..b741d79582f 100644 --- a/storage/src/tests/storageserver/communicationmanagertest.cpp +++ b/storage/src/tests/storageserver/communicationmanagertest.cpp @@ -1,6 +1,7 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <tests/common/dummystoragelink.h> +#include <tests/common/storage_config_set.h> #include <tests/common/testhelper.h> #include <tests/common/teststorageapp.h> #include <vespa/config/helper/configgetter.hpp> @@ -65,20 +66,20 @@ wait_for_slobrok_visibility(const CommunicationManager& mgr, TEST_F(CommunicationManagerTest, simple) { mbus::Slobrok slobrok; - vdstestlib::DirConfig distConfig(getStandardConfig(false)); - vdstestlib::DirConfig storConfig(getStandardConfig(true)); - distConfig.getConfig("stor-server").set("node_index", "1"); - storConfig.getConfig("stor-server").set("node_index", "1"); - addSlobrokConfig(distConfig, slobrok); - addSlobrokConfig(storConfig, slobrok); + auto dist_config = StorageConfigSet::make_distributor_node_config(); + auto stor_config = StorageConfigSet::make_storage_node_config(); + dist_config->set_node_index(1); + stor_config->set_node_index(1); + dist_config->set_slobrok_config_port(slobrok.port()); + stor_config->set_slobrok_config_port(slobrok.port()); + + auto& dist_cfg_uri = dist_config->config_uri(); + auto& stor_cfg_uri = stor_config->config_uri(); // Set up a "distributor" and a "storage" node with communication // managers and a dummy storage link below we can use for testing. - TestServiceLayerApp storNode(storConfig.getConfigId()); - TestDistributorApp distNode(distConfig.getConfigId()); - - auto dist_cfg_uri = config::ConfigUri(distConfig.getConfigId()); - auto stor_cfg_uri = config::ConfigUri(storConfig.getConfigId()); + TestServiceLayerApp storNode(stor_cfg_uri); + TestDistributorApp distNode(dist_cfg_uri); CommunicationManager distributor(distNode.getComponentRegister(), dist_cfg_uri, *config_from<CommunicationManagerConfig>(dist_cfg_uri)); @@ -123,23 +124,22 @@ void CommunicationManagerTest::doTestConfigPropagation(bool isContentNode) { mbus::Slobrok slobrok; - vdstestlib::DirConfig config(getStandardConfig(isContentNode)); - config.getConfig("stor-server").set("node_index", "1"); - auto& cfg = config.getConfig("stor-communicationmanager"); - cfg.set("mbus_content_node_max_pending_count", "12345"); - cfg.set("mbus_content_node_max_pending_size", "555666"); - cfg.set("mbus_distributor_node_max_pending_count", "6789"); - cfg.set("mbus_distributor_node_max_pending_size", "777888"); - addSlobrokConfig(config, slobrok); + auto config = StorageConfigSet::make_node_config(isContentNode); + config->set_node_index(1); + config->set_slobrok_config_port(slobrok.port()); + config->communication_manager_config().mbusContentNodeMaxPendingCount = 12345; + config->communication_manager_config().mbusContentNodeMaxPendingSize = 555666; + config->communication_manager_config().mbusDistributorNodeMaxPendingCount = 6789; + config->communication_manager_config().mbusDistributorNodeMaxPendingSize = 777888; + auto& cfg_uri = config->config_uri(); std::unique_ptr<TestStorageApp> node; if (isContentNode) { - node = std::make_unique<TestServiceLayerApp>(config.getConfigId()); + node = std::make_unique<TestServiceLayerApp>(cfg_uri); } else { - node = std::make_unique<TestDistributorApp>(config.getConfigId()); + node = std::make_unique<TestDistributorApp>(cfg_uri); } - auto cfg_uri = config::ConfigUri(config.getConfigId()); CommunicationManager commMgr(node->getComponentRegister(), cfg_uri, *config_from<CommunicationManagerConfig>(cfg_uri)); auto* storageLink = new DummyStorageLink(); @@ -180,12 +180,12 @@ TEST_F(CommunicationManagerTest, stor_pending_limit_configs_are_propagated_to_me TEST_F(CommunicationManagerTest, commands_are_dequeued_in_fifo_order) { mbus::Slobrok slobrok; - vdstestlib::DirConfig storConfig(getStandardConfig(true)); - storConfig.getConfig("stor-server").set("node_index", "1"); - addSlobrokConfig(storConfig, slobrok); - TestServiceLayerApp storNode(storConfig.getConfigId()); + auto config = StorageConfigSet::make_storage_node_config(); + config->set_node_index(1); + config->set_slobrok_config_port(slobrok.port()); + auto& cfg_uri = config->config_uri(); + TestServiceLayerApp storNode(cfg_uri); - auto cfg_uri = config::ConfigUri(storConfig.getConfigId()); CommunicationManager storage(storNode.getComponentRegister(), cfg_uri, *config_from<CommunicationManagerConfig>(cfg_uri)); auto* storageLink = new DummyStorageLink(); @@ -214,12 +214,12 @@ TEST_F(CommunicationManagerTest, commands_are_dequeued_in_fifo_order) { TEST_F(CommunicationManagerTest, replies_are_dequeued_in_fifo_order) { mbus::Slobrok slobrok; - vdstestlib::DirConfig storConfig(getStandardConfig(true)); - storConfig.getConfig("stor-server").set("node_index", "1"); - addSlobrokConfig(storConfig, slobrok); - TestServiceLayerApp storNode(storConfig.getConfigId()); + auto config = StorageConfigSet::make_storage_node_config(); + config->set_node_index(1); + config->set_slobrok_config_port(slobrok.port()); + auto& cfg_uri = config->config_uri(); + TestServiceLayerApp storNode(cfg_uri); - auto cfg_uri = config::ConfigUri(storConfig.getConfigId()); CommunicationManager storage(storNode.getComponentRegister(), cfg_uri, *config_from<CommunicationManagerConfig>(cfg_uri)); auto* storageLink = new DummyStorageLink(); @@ -249,19 +249,21 @@ struct MockMbusReplyHandler : mbus::IReplyHandler { }; struct CommunicationManagerFixture { + std::unique_ptr<StorageConfigSet> config; MockMbusReplyHandler reply_handler; mbus::Slobrok slobrok; std::unique_ptr<TestServiceLayerApp> node; std::unique_ptr<CommunicationManager> comm_mgr; DummyStorageLink* bottom_link; - CommunicationManagerFixture() { - vdstestlib::DirConfig stor_config(getStandardConfig(true)); - stor_config.getConfig("stor-server").set("node_index", "1"); - addSlobrokConfig(stor_config, slobrok); + CommunicationManagerFixture() + : config(StorageConfigSet::make_storage_node_config()) + { + config->set_node_index(1); + config->set_slobrok_config_port(slobrok.port()); + auto& cfg_uri = config->config_uri(); - node = std::make_unique<TestServiceLayerApp>(stor_config.getConfigId()); - auto cfg_uri = config::ConfigUri(stor_config.getConfigId()); + node = std::make_unique<TestServiceLayerApp>(cfg_uri); comm_mgr = std::make_unique<CommunicationManager>(node->getComponentRegister(), cfg_uri, *config_from<CommunicationManagerConfig>(cfg_uri)); bottom_link = new DummyStorageLink(); diff --git a/storage/src/tests/storageserver/documentapiconvertertest.cpp b/storage/src/tests/storageserver/documentapiconvertertest.cpp index eb4789b25d4..1eb6bf5dd9a 100644 --- a/storage/src/tests/storageserver/documentapiconvertertest.cpp +++ b/storage/src/tests/storageserver/documentapiconvertertest.cpp @@ -159,28 +159,46 @@ TEST_F(DocumentApiConverterTest, forwarded_put) { } TEST_F(DocumentApiConverterTest, update) { - auto update = std::make_shared<document::DocumentUpdate>(*_repo, _html_type, defaultDocId); - documentapi::UpdateDocumentMessage updateMsg(update); - updateMsg.setOldTimestamp(1234); - updateMsg.setNewTimestamp(5678); - updateMsg.setCondition(my_condition); - - auto updateCmd = toStorageAPI<api::UpdateCommand>(updateMsg); - EXPECT_EQ(defaultBucket, updateCmd->getBucket()); - ASSERT_EQ(update.get(), updateCmd->getUpdate().get()); - EXPECT_EQ(api::Timestamp(1234), updateCmd->getOldTimestamp()); - EXPECT_EQ(api::Timestamp(5678), updateCmd->getTimestamp()); - EXPECT_EQ(my_condition, updateCmd->getCondition()); - - auto mbusReply = updateMsg.createReply(); - ASSERT_TRUE(mbusReply.get()); - toStorageAPI<api::UpdateReply>(*mbusReply, *updateCmd); - - auto mbusUpdate = toDocumentAPI<documentapi::UpdateDocumentMessage>(*updateCmd); - ASSERT_EQ((&mbusUpdate->getDocumentUpdate()), update.get()); - EXPECT_EQ(api::Timestamp(1234), mbusUpdate->getOldTimestamp()); - EXPECT_EQ(api::Timestamp(5678), mbusUpdate->getNewTimestamp()); - EXPECT_EQ(my_condition, mbusUpdate->getCondition()); + auto do_test_update = [&](bool create_if_missing) { + auto update = std::make_shared<document::DocumentUpdate>(*_repo, _html_type, defaultDocId); + update->setCreateIfNonExistent(create_if_missing); + documentapi::UpdateDocumentMessage updateMsg(update); + updateMsg.setOldTimestamp(1234); + updateMsg.setNewTimestamp(5678); + updateMsg.setCondition(my_condition); + EXPECT_FALSE(updateMsg.has_cached_create_if_missing()); + EXPECT_EQ(updateMsg.create_if_missing(), create_if_missing); + + auto updateCmd = toStorageAPI<api::UpdateCommand>(updateMsg); + EXPECT_EQ(defaultBucket, updateCmd->getBucket()); + ASSERT_EQ(update.get(), updateCmd->getUpdate().get()); + EXPECT_EQ(api::Timestamp(1234), updateCmd->getOldTimestamp()); + EXPECT_EQ(api::Timestamp(5678), updateCmd->getTimestamp()); + EXPECT_EQ(my_condition, updateCmd->getCondition()); + EXPECT_FALSE(updateCmd->has_cached_create_if_missing()); + EXPECT_EQ(updateCmd->create_if_missing(), create_if_missing); + + auto mbusReply = updateMsg.createReply(); + ASSERT_TRUE(mbusReply.get()); + toStorageAPI<api::UpdateReply>(*mbusReply, *updateCmd); + + auto mbusUpdate = toDocumentAPI<documentapi::UpdateDocumentMessage>(*updateCmd); + ASSERT_EQ((&mbusUpdate->getDocumentUpdate()), update.get()); + EXPECT_EQ(api::Timestamp(1234), mbusUpdate->getOldTimestamp()); + EXPECT_EQ(api::Timestamp(5678), mbusUpdate->getNewTimestamp()); + EXPECT_EQ(my_condition, mbusUpdate->getCondition()); + EXPECT_EQ(mbusUpdate->create_if_missing(), create_if_missing); + + // Cached value of create_if_missing should override underlying update's value + updateCmd->set_cached_create_if_missing(!create_if_missing); + EXPECT_TRUE(updateCmd->has_cached_create_if_missing()); + EXPECT_EQ(updateCmd->create_if_missing(), !create_if_missing); + mbusUpdate = toDocumentAPI<documentapi::UpdateDocumentMessage>(*updateCmd); + EXPECT_TRUE(mbusUpdate->has_cached_create_if_missing()); + EXPECT_EQ(mbusUpdate->create_if_missing(), !create_if_missing); + }; + do_test_update(false); + do_test_update(true); } TEST_F(DocumentApiConverterTest, remove) { diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index cdf203b8a39..bc1c7f60706 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -1,10 +1,12 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <tests/common/dummystoragelink.h> +#include <tests/common/storage_config_set.h> #include <tests/common/testhelper.h> #include <tests/common/teststorageapp.h> #include <vespa/config/helper/configgetter.hpp> #include <vespa/document/test/make_document_bucket.h> #include <vespa/messagebus/dynamicthrottlepolicy.h> +#include <vespa/storage/config/config-stor-server.h> #include <vespa/storage/persistence/messages.h> #include <vespa/storage/storageserver/mergethrottler.h> #include <vespa/storageapi/message/bucket.h> @@ -36,9 +38,8 @@ using StorServerConfigBuilder = vespa::config::content::core::StorServerConfigBu vespalib::string _storage("storage"); std::unique_ptr<StorServerConfig> default_server_config() { - vdstestlib::DirConfig dir_config(getStandardConfig(true)); - auto cfg_uri = ::config::ConfigUri(dir_config.getConfigId()); - return config_from<StorServerConfig>(cfg_uri); + auto config = StorageConfigSet::make_storage_node_config(); + return config_from<StorServerConfig>(config->config_uri()); } struct MergeBuilder { @@ -153,8 +154,9 @@ struct MergeThrottlerTest : Test { static constexpr int _messageWaitTime = 100; // Using n storage node links and dummy servers - std::vector<std::shared_ptr<DummyStorageLink> > _topLinks; - std::vector<std::shared_ptr<TestServiceLayerApp> > _servers; + std::unique_ptr<StorageConfigSet> _config; + std::vector<std::shared_ptr<DummyStorageLink>> _topLinks; + std::vector<std::shared_ptr<TestServiceLayerApp>> _servers; std::vector<MergeThrottler*> _throttlers; std::vector<DummyStorageLink*> _bottomLinks; @@ -198,14 +200,14 @@ MergeThrottlerTest::~MergeThrottlerTest() = default; void MergeThrottlerTest::SetUp() { - auto config = default_server_config(); + _config = StorageConfigSet::make_storage_node_config(); for (int i = 0; i < _storageNodeCount; ++i) { - auto server = std::make_unique<TestServiceLayerApp>(NodeIndex(i)); + auto server = std::make_unique<TestServiceLayerApp>(NodeIndex(i), _config->config_uri()); server->setClusterState(lib::ClusterState("distributor:100 storage:100 version:1")); std::unique_ptr<DummyStorageLink> top; top = std::make_unique<DummyStorageLink>(); - auto* throttler = new MergeThrottler(*config, server->getComponentRegister(), vespalib::HwInfo()); + auto* throttler = new MergeThrottler(_config->server_config(), server->getComponentRegister(), vespalib::HwInfo()); // MergeThrottler will be sandwiched in between two dummy links top->push_back(std::unique_ptr<StorageLink>(throttler)); auto* bottom = new DummyStorageLink; diff --git a/storage/src/tests/storageserver/rpc/cluster_controller_rpc_api_service_test.cpp b/storage/src/tests/storageserver/rpc/cluster_controller_rpc_api_service_test.cpp index 6e9485e24d4..c3641b9bc56 100644 --- a/storage/src/tests/storageserver/rpc/cluster_controller_rpc_api_service_test.cpp +++ b/storage/src/tests/storageserver/rpc/cluster_controller_rpc_api_service_test.cpp @@ -1,5 +1,6 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <tests/common/storage_config_set.h> #include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/fnet/frt/rpcrequest.h> #include <vespa/messagebus/testlib/slobrok.h> @@ -11,7 +12,6 @@ #include <vespa/storageapi/message/state.h> #include <vespa/vdslib/state/clusterstate.h> #include <vespa/vespalib/stllike/asciistream.h> -#include <tests/common/testhelper.h> #include <vespa/vespalib/gtest/gtest.h> #include <vector> @@ -43,7 +43,7 @@ struct DummyReturnHandler : FRT_IReturnHandler { struct FixtureBase { mbus::Slobrok slobrok; - vdstestlib::DirConfig config; + std::unique_ptr<StorageConfigSet> config; MockOperationDispatcher dispatcher; std::unique_ptr<SharedRpcResources> shared_rpc_resources; std::unique_ptr<ClusterControllerApiRpcService> cc_service; @@ -52,12 +52,12 @@ struct FixtureBase { FRT_RPCRequest* bound_request{nullptr}; FixtureBase() - : config(getStandardConfig(true)) + : config(StorageConfigSet::make_storage_node_config()) { - config.getConfig("stor-server").set("node_index", "1"); - addSlobrokConfig(config, slobrok); + config->set_node_index(1); + config->set_slobrok_config_port(slobrok.port()); - shared_rpc_resources = std::make_unique<SharedRpcResources>(config::ConfigUri(config.getConfigId()), 0, 1, 1); + shared_rpc_resources = std::make_unique<SharedRpcResources>(config->config_uri(), 0, 1, 1); cc_service = std::make_unique<ClusterControllerApiRpcService>(dispatcher, *shared_rpc_resources); shared_rpc_resources->start_server_and_register_slobrok("my_cool_rpc_test"); } diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp index 72ddc89f9d3..010f2b441ef 100644 --- a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp +++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp @@ -1,6 +1,6 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <tests/common/testhelper.h> +#include <tests/common/storage_config_set.h> #include <vespa/document/base/testdocman.h> #include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/fieldvalue/stringfieldvalue.h> @@ -102,7 +102,7 @@ vespalib::string to_slobrok_id(const api::StorageMessageAddress& address) { class RpcNode { protected: - vdstestlib::DirConfig _config; + std::unique_ptr<StorageConfigSet> _config; std::shared_ptr<const document::DocumentTypeRepo> _doc_type_repo; LockingMockOperationDispatcher _messages; std::unique_ptr<MessageCodecProvider> _codec_provider; @@ -111,17 +111,15 @@ protected: vespalib::string _slobrok_id; public: RpcNode(uint16_t node_index, bool is_distributor, const mbus::Slobrok& slobrok) - : _config(getStandardConfig(true)), + : _config(StorageConfigSet::make_node_config(!is_distributor)), _doc_type_repo(document::TestDocRepo().getTypeRepoSp()), _node_address(make_address(node_index, is_distributor)), _slobrok_id(to_slobrok_id(_node_address)) { - auto& cfg = _config.getConfig("stor-server"); - cfg.set("node_index", std::to_string(node_index)); - cfg.set("is_distributor", is_distributor ? "true" : "false"); - addSlobrokConfig(_config, slobrok); + _config->set_node_index(node_index); + _config->set_slobrok_config_port(slobrok.port()); - _shared_rpc_resources = std::make_unique<SharedRpcResources>(config::ConfigUri(_config.getConfigId()), 0, 1, 1); + _shared_rpc_resources = std::make_unique<SharedRpcResources>(_config->config_uri(), 0, 1, 1); // TODO make codec provider into interface so we can test decode-failures more easily? _codec_provider = std::make_unique<MessageCodecProvider>(_doc_type_repo); } diff --git a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp index 63d8eec6dc3..b84f96dd847 100644 --- a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp +++ b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp @@ -1,12 +1,12 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <tests/common/storage_config_set.h> #include <tests/common/testhelper.h> #include <tests/common/teststorageapp.h> #include <vespa/config/helper/configgetter.hpp> #include <vespa/storage/storageserver/mergethrottler.h> #include <vespa/storage/storageserver/service_layer_error_listener.h> #include <vespa/storageframework/defaultimplementation/component/componentregisterimpl.h> -#include <vespa/vdstestlib/config/dirconfig.h> #include <vespa/vespalib/gtest/gtest.h> using namespace ::testing; @@ -37,10 +37,10 @@ private: struct Fixture { using StorServerConfig = vespa::config::content::core::StorServerConfig; - vdstestlib::DirConfig config{getStandardConfig(true)}; - TestServiceLayerApp app; + std::unique_ptr<StorageConfigSet> config{StorageConfigSet::make_storage_node_config()}; + TestServiceLayerApp app{config->config_uri()}; ServiceLayerComponent component{app.getComponentRegister(), "dummy"}; - MergeThrottler merge_throttler{*config_from<StorServerConfig>(config::ConfigUri(config.getConfigId())), + MergeThrottler merge_throttler{*config_from<StorServerConfig>(config->config_uri()), app.getComponentRegister(), vespalib::HwInfo()}; TestShutdownListener shutdown_listener; ServiceLayerErrorListener error_listener{component, merge_throttler}; diff --git a/storage/src/tests/storageserver/statemanagertest.cpp b/storage/src/tests/storageserver/statemanagertest.cpp index 2a5af397aca..79246cb3ce1 100644 --- a/storage/src/tests/storageserver/statemanagertest.cpp +++ b/storage/src/tests/storageserver/statemanagertest.cpp @@ -1,13 +1,14 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <tests/common/dummystoragelink.h> +#include <tests/common/storage_config_set.h> +#include <tests/common/teststorageapp.h> +#include <tests/common/testhelper.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/state.h> #include <vespa/vdslib/state/cluster_state_bundle.h> #include <vespa/vdslib/state/clusterstate.h> #include <vespa/storage/storageserver/statemanager.h> -#include <tests/common/teststorageapp.h> -#include <tests/common/testhelper.h> -#include <tests/common/dummystoragelink.h> #include <vespa/vespalib/data/slime/slime.h> #include <vespa/vespalib/gtest/gtest.h> @@ -20,6 +21,7 @@ using namespace ::testing; namespace storage { struct StateManagerTest : Test, NodeStateReporter { + std::unique_ptr<StorageConfigSet> _config; std::unique_ptr<TestServiceLayerApp> _node; std::unique_ptr<DummyStorageLink> _upper; StateManager* _manager; @@ -30,7 +32,18 @@ struct StateManagerTest : Test, NodeStateReporter { void SetUp() override; void TearDown() override; - void force_current_cluster_state_version(uint32_t version); + static std::shared_ptr<api::SetSystemStateCommand> make_set_state_cmd(vespalib::stringref state_str, uint16_t cc_index) { + auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(state_str)); + cmd->setSourceIndex(cc_index); + return cmd; + } + + void get_single_reply(std::shared_ptr<api::StorageReply>& reply_out); + void get_only_ok_reply(std::shared_ptr<api::StorageReply>& reply_out); + void force_current_cluster_state_version(uint32_t version, uint16_t cc_index); + void force_current_cluster_state_version(uint32_t version) { + force_current_cluster_state_version(version, 0); + } void mark_reported_node_state_up(); void send_down_get_node_state_request(uint16_t controller_index); void assert_ok_get_node_state_reply_sent_and_clear(); @@ -46,7 +59,8 @@ struct StateManagerTest : Test, NodeStateReporter { }; StateManagerTest::StateManagerTest() - : _node(), + : _config(), + _node(), _upper(), _manager(nullptr), _lower(nullptr) @@ -56,7 +70,8 @@ StateManagerTest::StateManagerTest() void StateManagerTest::SetUp() { - _node = std::make_unique<TestServiceLayerApp>(NodeIndex(2)); + _config = StorageConfigSet::make_storage_node_config(); + _node = std::make_unique<TestServiceLayerApp>(NodeIndex(2), _config->config_uri()); // Clock will increase 1 sec per call. _node->getClock().setAbsoluteTimeInSeconds(1); _upper = std::make_unique<DummyStorageLink>(); @@ -82,11 +97,30 @@ StateManagerTest::TearDown() { } void -StateManagerTest::force_current_cluster_state_version(uint32_t version) +StateManagerTest::get_single_reply(std::shared_ptr<api::StorageReply>& reply_out) +{ + ASSERT_EQ(_upper->getNumReplies(), 1); + ASSERT_TRUE(_upper->getReply(0)->getType().isReply()); + reply_out = std::dynamic_pointer_cast<api::StorageReply>(_upper->getReply(0)); + ASSERT_TRUE(reply_out.get() != nullptr); + _upper->reset(); +} + +void +StateManagerTest::get_only_ok_reply(std::shared_ptr<api::StorageReply>& reply_out) +{ + ASSERT_NO_FATAL_FAILURE(get_single_reply(reply_out)); + ASSERT_EQ(reply_out->getResult(), api::ReturnCode(api::ReturnCode::OK)); +} + +void +StateManagerTest::force_current_cluster_state_version(uint32_t version, uint16_t cc_index) { ClusterState state(*_manager->getClusterStateBundle()->getBaselineClusterState()); state.setVersion(version); - _manager->setClusterStateBundle(lib::ClusterStateBundle(state)); + const auto maybe_rejected_by_ver = _manager->try_set_cluster_state_bundle( + std::make_shared<const lib::ClusterStateBundle>(state), cc_index); + ASSERT_EQ(maybe_rejected_by_ver, std::nullopt); } void @@ -114,23 +148,12 @@ StateManagerTest::extract_cluster_state_version_from_host_info(uint32_t& version version_out = clusterStateVersionCursor.asLong(); } -#define GET_ONLY_OK_REPLY(varname) \ -{ \ - ASSERT_EQ(size_t(1), _upper->getNumReplies()); \ - ASSERT_TRUE(_upper->getReply(0)->getType().isReply()); \ - varname = std::dynamic_pointer_cast<api::StorageReply>( \ - _upper->getReply(0)); \ - ASSERT_TRUE(varname.get() != nullptr); \ - _upper->reset(); \ - ASSERT_EQ(api::ReturnCode(api::ReturnCode::OK), \ - varname->getResult()); \ -} - TEST_F(StateManagerTest, cluster_state) { std::shared_ptr<api::StorageReply> reply; // Verify initial state on startup auto currentState = _manager->getClusterStateBundle()->getBaselineClusterState(); EXPECT_EQ("cluster:d", currentState->toString(false)); + EXPECT_EQ(currentState->getVersion(), 0); auto currentNodeState = _manager->getCurrentNodeState(); EXPECT_EQ("s:d", currentNodeState->toString(false)); @@ -138,7 +161,7 @@ TEST_F(StateManagerTest, cluster_state) { ClusterState sendState("storage:4 .2.s:m"); auto cmd = std::make_shared<api::SetSystemStateCommand>(sendState); _upper->sendDown(cmd); - GET_ONLY_OK_REPLY(reply); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); currentState = _manager->getClusterStateBundle()->getBaselineClusterState(); EXPECT_EQ(sendState, *currentState); @@ -147,13 +170,64 @@ TEST_F(StateManagerTest, cluster_state) { EXPECT_EQ("s:m", currentNodeState->toString(false)); } +TEST_F(StateManagerTest, accept_lower_state_versions_if_strict_requirement_disabled) { + _manager->set_require_strictly_increasing_cluster_state_versions(false); + + ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123, 1)); // CC 1 + ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 123); + + _upper->sendDown(make_set_state_cmd("version:122 distributor:1 storage:1", 0)); // CC 0 + std::shared_ptr<api::StorageReply> reply; + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); + EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 122); +} + +TEST_F(StateManagerTest, reject_lower_state_versions_if_strict_requirement_enabled) { + _manager->set_require_strictly_increasing_cluster_state_versions(true); + + ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123, 1)); // CC 1 + ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 123); + + _upper->sendDown(make_set_state_cmd("version:122 distributor:1 storage:1", 0)); // CC 0 + std::shared_ptr<api::StorageReply> reply; + ASSERT_NO_FATAL_FAILURE(get_single_reply(reply)); + api::ReturnCode expected_res(api::ReturnCode::REJECTED, "Cluster state version 122 rejected; node already has " + "a higher cluster state version (123)"); + EXPECT_EQ(reply->getResult(), expected_res); + EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 123); +} + +// Observing a lower cluster state version from the same CC index directly implies that the ZooKeeper +// state has been lost, at which point we pragmatically (but begrudgingly) accept the state version +// to avoid stalling the entire cluster for an indeterminate amount of time. +TEST_F(StateManagerTest, accept_lower_state_versions_from_same_cc_index_even_if_strict_requirement_enabled) { + _manager->set_require_strictly_increasing_cluster_state_versions(true); + + ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123, 1)); // CC 1 + ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 123); + + ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(124, 2)); // CC 2 + ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 124); + + // CC 1 restarts from scratch with previous ZK state up in smoke. + _upper->sendDown(make_set_state_cmd("version:3 distributor:1 storage:1", 1)); + std::shared_ptr<api::StorageReply> reply; + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); + EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 3); + + // CC 2 restarts and continues from where CC 1 left off. + _upper->sendDown(make_set_state_cmd("version:4 distributor:1 storage:1", 2)); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); + EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 4); +} + namespace { struct MyStateListener : public StateListener { const NodeStateUpdater& updater; lib::NodeState current; std::ostringstream ost; - MyStateListener(const NodeStateUpdater& upd); + explicit MyStateListener(const NodeStateUpdater& upd); ~MyStateListener() override; void handleNewState() noexcept override { @@ -190,7 +264,7 @@ TEST_F(StateManagerTest, reported_node_state) { // And get node state command (no expected state) auto cmd = std::make_shared<api::GetNodeStateCommand>(lib::NodeState::UP()); _upper->sendDown(cmd); - GET_ONLY_OK_REPLY(reply); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType()); nodeState = std::make_shared<NodeState>( dynamic_cast<api::GetNodeStateReply&>(*reply).getNodeState()); @@ -199,7 +273,7 @@ TEST_F(StateManagerTest, reported_node_state) { cmd = std::make_shared<api::GetNodeStateCommand>( std::make_unique<NodeState>(NodeType::STORAGE, State::INITIALIZING)); _upper->sendDown(cmd); - GET_ONLY_OK_REPLY(reply); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType()); nodeState = std::make_unique<NodeState>( dynamic_cast<api::GetNodeStateReply&>(*reply).getNodeState()); @@ -218,7 +292,7 @@ TEST_F(StateManagerTest, reported_node_state) { _manager->setReportedNodeState(ns); } - GET_ONLY_OK_REPLY(reply); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType()); nodeState = std::make_unique<NodeState>( dynamic_cast<api::GetNodeStateReply&>(*reply).getNodeState()); @@ -240,7 +314,7 @@ TEST_F(StateManagerTest, reported_node_state) { } TEST_F(StateManagerTest, current_cluster_state_version_is_included_in_host_info_json) { - force_current_cluster_state_version(123); + ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123)); uint32_t version; ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version)); EXPECT_EQ(version, 123); @@ -262,7 +336,7 @@ void StateManagerTest::send_down_get_node_state_request(uint16_t controller_inde void StateManagerTest::assert_ok_get_node_state_reply_sent_and_clear() { ASSERT_EQ(1, _upper->getNumReplies()); std::shared_ptr<api::StorageReply> reply; - GET_ONLY_OK_REPLY(reply); // Implicitly clears messages from _upper + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); // Implicitly clears messages from _upper ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType()); } @@ -340,7 +414,7 @@ TEST_F(StateManagerTest, request_almost_immediate_replies_triggers_fast_reply) } TEST_F(StateManagerTest, activation_command_is_bounced_with_current_cluster_state_version) { - force_current_cluster_state_version(12345); + ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(12345)); auto cmd = std::make_shared<api::ActivateClusterStateVersionCommand>(12340); cmd->setTimeout(10000000ms); @@ -349,7 +423,7 @@ TEST_F(StateManagerTest, activation_command_is_bounced_with_current_cluster_stat ASSERT_EQ(1, _upper->getNumReplies()); std::shared_ptr<api::StorageReply> reply; - GET_ONLY_OK_REPLY(reply); // Implicitly clears messages from _upper + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); // Implicitly clears messages from _upper ASSERT_EQ(api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_REPLY, reply->getType()); auto& activate_reply = dynamic_cast<api::ActivateClusterStateVersionReply&>(*reply); EXPECT_EQ(12340, activate_reply.activateVersion()); @@ -362,7 +436,7 @@ TEST_F(StateManagerTest, non_deferred_cluster_state_sets_reported_cluster_state_ cmd->setSourceIndex(0); _upper->sendDown(cmd); std::shared_ptr<api::StorageReply> reply; - GET_ONLY_OK_REPLY(reply); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); uint32_t version; ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version)); @@ -370,7 +444,7 @@ TEST_F(StateManagerTest, non_deferred_cluster_state_sets_reported_cluster_state_ } TEST_F(StateManagerTest, deferred_cluster_state_does_not_update_state_until_activation_edge) { - force_current_cluster_state_version(100); + ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(100)); lib::ClusterStateBundle deferred_bundle(lib::ClusterState("version:101 distributor:1 storage:1"), {}, true); auto state_cmd = std::make_shared<api::SetSystemStateCommand>(deferred_bundle); @@ -378,7 +452,7 @@ TEST_F(StateManagerTest, deferred_cluster_state_does_not_update_state_until_acti state_cmd->setSourceIndex(0); _upper->sendDown(state_cmd); std::shared_ptr<api::StorageReply> reply; - GET_ONLY_OK_REPLY(reply); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); uint32_t version; ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version)); @@ -388,7 +462,7 @@ TEST_F(StateManagerTest, deferred_cluster_state_does_not_update_state_until_acti activation_cmd->setTimeout(1000s); activation_cmd->setSourceIndex(0); _upper->sendDown(activation_cmd); - GET_ONLY_OK_REPLY(reply); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version)); EXPECT_EQ(version, 101); diff --git a/storage/src/tests/storageserver/statereportertest.cpp b/storage/src/tests/storageserver/statereportertest.cpp index 43eb37afe15..29d3daf9b86 100644 --- a/storage/src/tests/storageserver/statereportertest.cpp +++ b/storage/src/tests/storageserver/statereportertest.cpp @@ -1,14 +1,15 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <tests/common/dummystoragelink.h> +#include <tests/common/storage_config_set.h> +#include <tests/common/teststorageapp.h> +#include <tests/common/testhelper.h> #include <vespa/storageframework/defaultimplementation/clock/fakeclock.h> #include <vespa/storage/persistence/filestorage/filestormanager.h> #include <vespa/storage/persistence/filestorage/filestormetrics.h> #include <vespa/storage/storageserver/applicationgenerationfetcher.h> #include <vespa/storage/storageserver/statereporter.h> #include <vespa/metrics/metricmanager.h> -#include <tests/common/teststorageapp.h> -#include <tests/common/testhelper.h> -#include <tests/common/dummystoragelink.h> #include <vespa/config/common/exceptions.h> #include <vespa/vespalib/data/slime/slime.h> #include <vespa/vespalib/data/simple_buffer.h> @@ -35,7 +36,7 @@ struct StateReporterTest : Test { std::unique_ptr<DummyStorageLink> _top; DummyApplicationGenerationFether _generationFetcher; std::unique_ptr<StateReporter> _stateReporter; - std::unique_ptr<vdstestlib::DirConfig> _config; + std::unique_ptr<StorageConfigSet> _config; std::unique_ptr<metrics::MetricSet> _topSet; std::unique_ptr<metrics::MetricManager> _metricManager; std::shared_ptr<FileStorMetrics> _filestorMetrics; @@ -68,10 +69,8 @@ StateReporterTest::StateReporterTest() StateReporterTest::~StateReporterTest() = default; void StateReporterTest::SetUp() { - _config = std::make_unique<vdstestlib::DirConfig>(getStandardConfig(true, "statereportertest")); - assert(system(("rm -rf " + getRootFolder(*_config)).c_str()) == 0); - - _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), _config->getConfigId()); + _config = StorageConfigSet::make_storage_node_config(); + _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), _config->config_uri()); _node->setupDummyPersistence(); _clock = &_node->getClock(); _clock->setAbsoluteTimeInSeconds(1000000); @@ -91,7 +90,7 @@ void StateReporterTest::SetUp() { _filestorMetrics->initDiskMetrics(1, 1); _topSet->registerMetric(*_filestorMetrics); - _metricManager->init(config::ConfigUri(_config->getConfigId())); + _metricManager->init(_config->config_uri()); } void StateReporterTest::TearDown() { diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp index 09457038b70..29f065c0157 100644 --- a/storage/src/tests/visiting/visitormanagertest.cpp +++ b/storage/src/tests/visiting/visitormanagertest.cpp @@ -1,5 +1,9 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <tests/common/dummystoragelink.h> +#include <tests/common/storage_config_set.h> +#include <tests/common/teststorageapp.h> +#include <tests/common/testhelper.h> #include <vespa/config/helper/configgetter.hpp> #include <vespa/document/fieldvalue/intfieldvalue.h> #include <vespa/document/fieldvalue/stringfieldvalue.h> @@ -9,9 +13,6 @@ #include <vespa/storage/persistence/filestorage/filestormanager.h> #include <vespa/storage/visiting/visitormanager.h> #include <vespa/storageframework/defaultimplementation/clock/realclock.h> -#include <tests/common/teststorageapp.h> -#include <tests/common/testhelper.h> -#include <tests/common/dummystoragelink.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/document/test/make_bucket_space.h> #include <tests/storageserver/testvisitormessagesession.h> @@ -45,7 +46,9 @@ api::StorageMessageAddress _address(&_storage, lib::NodeType::STORAGE, 0); struct VisitorManagerTest : Test { protected: static uint32_t docCount; - std::vector<document::Document::SP > _documents; + + std::unique_ptr<StorageConfigSet> _config; + std::vector<document::Document::SP> _documents; std::unique_ptr<TestVisitorMessageSessionFactory> _messageSessionFactory; std::unique_ptr<TestServiceLayerApp> _node; std::unique_ptr<DummyStorageLink> _top; @@ -82,16 +85,16 @@ uint32_t VisitorManagerTest::docCount = 10; void VisitorManagerTest::initializeTest(bool defer_manager_thread_start) { - vdstestlib::DirConfig config(getStandardConfig(true)); - config.getConfig("stor-visitor").set("visitorthreads", "1"); + _config = StorageConfigSet::make_storage_node_config(); + _config->visitor_config().visitorthreads = 1; _messageSessionFactory = std::make_unique<TestVisitorMessageSessionFactory>(); - _node = std::make_unique<TestServiceLayerApp>(config.getConfigId()); + _node = std::make_unique<TestServiceLayerApp>(_config->config_uri()); _node->setupDummyPersistence(); _node->getStateUpdater().setClusterState(std::make_shared<lib::ClusterState>("storage:1 distributor:1")); _top = std::make_unique<DummyStorageLink>(); using vespa::config::content::core::StorVisitorConfig; - auto bootstrap_cfg = config_from<StorVisitorConfig>(config::ConfigUri(config.getConfigId())); + auto bootstrap_cfg = config_from<StorVisitorConfig>(_config->config_uri()); auto vm = std::make_unique<VisitorManager>(*bootstrap_cfg, _node->getComponentRegister(), *_messageSessionFactory, @@ -100,7 +103,7 @@ VisitorManagerTest::initializeTest(bool defer_manager_thread_start) _manager = vm.get(); _top->push_back(std::move(vm)); using StorFilestorConfig = vespa::config::content::internal::InternalStorFilestorType; - auto filestor_cfg = config_from<StorFilestorConfig>(config::ConfigUri(config.getConfigId())); + auto filestor_cfg = config_from<StorFilestorConfig>(_config->config_uri()); _top->push_back(std::make_unique<FileStorManager>(*filestor_cfg, _node->getPersistenceProvider(), _node->getComponentRegister(), *_node, _node->get_host_info())); _manager->setTimeBetweenTicks(10); diff --git a/storage/src/tests/visiting/visitortest.cpp b/storage/src/tests/visiting/visitortest.cpp index f83b6c99d64..075ebd13741 100644 --- a/storage/src/tests/visiting/visitortest.cpp +++ b/storage/src/tests/visiting/visitortest.cpp @@ -1,5 +1,9 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <tests/common/dummystoragelink.h> +#include <tests/common/storage_config_set.h> +#include <tests/common/testhelper.h> +#include <tests/common/teststorageapp.h> #include <vespa/config/common/exceptions.h> #include <vespa/config/helper/configgetter.hpp> #include <vespa/document/fieldvalue/intfieldvalue.h> @@ -14,9 +18,6 @@ #include <vespa/storage/visiting/visitormanager.h> #include <vespa/storageapi/message/datagram.h> #include <vespa/storageapi/message/persistence.h> -#include <tests/common/testhelper.h> -#include <tests/common/teststorageapp.h> -#include <tests/common/dummystoragelink.h> #include <tests/storageserver/testvisitormessagesession.h> #include <vespa/persistence/spi/docentry.h> #include <vespa/vespalib/gtest/gtest.h> @@ -59,6 +60,8 @@ struct TestParams { struct VisitorTest : Test { static uint32_t docCount; + + std::unique_ptr<StorageConfigSet> _config; std::vector<Document::SP> _documents; std::unique_ptr<TestVisitorMessageSessionFactory> _messageSessionFactory; std::unique_ptr<TestServiceLayerApp> _node; @@ -146,31 +149,20 @@ VisitorTest::~VisitorTest() = default; void VisitorTest::initializeTest(const TestParams& params) { - vdstestlib::DirConfig config(getStandardConfig(true, "visitortest")); - config.getConfig("stor-visitor").set("visitorthreads", "1"); - config.getConfig("stor-visitor").set( - "defaultparalleliterators", - std::to_string(params._parallelBuckets)); - config.getConfig("stor-visitor").set( - "visitor_memory_usage_limit", - std::to_string(params._maxVisitorMemoryUsage)); - - std::string rootFolder = getRootFolder(config); - - ::chmod(rootFolder.c_str(), 0755); - std::filesystem::remove_all(std::filesystem::path(rootFolder)); - std::filesystem::create_directories(std::filesystem::path(vespalib::make_string("%s/disks/d0", rootFolder.c_str()))); - std::filesystem::create_directories(std::filesystem::path(vespalib::make_string("%s/disks/d1", rootFolder.c_str()))); + _config = StorageConfigSet::make_storage_node_config(); + _config->visitor_config().visitorthreads = 1; + _config->visitor_config().defaultparalleliterators = params._parallelBuckets; + _config->visitor_config().visitorMemoryUsageLimit = params._maxVisitorMemoryUsage; _messageSessionFactory = std::make_unique<TestVisitorMessageSessionFactory>(); if (params._autoReplyError.getCode() != mbus::ErrorCode::NONE) { _messageSessionFactory->_autoReplyError = params._autoReplyError; _messageSessionFactory->_createAutoReplyVisitorSessions = true; } - _node = std::make_unique<TestServiceLayerApp>(config.getConfigId()); + _node = std::make_unique<TestServiceLayerApp>(_config->config_uri()); _top = std::make_unique<DummyStorageLink>(); using vespa::config::content::core::StorVisitorConfig; - auto bootstrap_cfg = config_from<StorVisitorConfig>(config::ConfigUri(config.getConfigId())); + auto bootstrap_cfg = config_from<StorVisitorConfig>(_config->config_uri()); _top->push_back(std::unique_ptr<StorageLink>(_manager = new VisitorManager(*bootstrap_cfg, _node->getComponentRegister(), *_messageSessionFactory))); _bottom = new DummyStorageLink(); @@ -217,14 +209,12 @@ VisitorTest::initializeTest(const TestParams& params) _documents.clear(); for (uint32_t i=0; i<docCount; ++i) { std::ostringstream uri; - uri << "id:test:testdoctype1:n=" << i % 10 << ":http://www.ntnu.no/" - << i << ".html"; + uri << "id:test:testdoctype1:n=" << i % 10 << ":http://www.ntnu.no/" << i << ".html"; _documents.push_back(Document::SP( _node->getTestDocMan().createDocument(content, uri.str()))); const document::DocumentType& type(_documents.back()->getType()); - _documents.back()->setValue(type.getField("headerval"), - document::IntFieldValue(i % 4)); + _documents.back()->setValue(type.getField("headerval"), document::IntFieldValue(i % 4)); } } diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp index 5337be6d79f..1f20a19ec51 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp @@ -2,26 +2,26 @@ #include "bucketmanager.h" #include "minimumusedbitstracker.h" -#include <iomanip> +#include <vespa/config/helper/configgetter.hpp> +#include <vespa/document/bucket/fixed_bucket_spaces.h> +#include <vespa/metrics/jsonwriter.h> #include <vespa/storage/common/content_bucket_space_repo.h> #include <vespa/storage/common/nodestateupdater.h> -#include <vespa/storage/common/global_bucket_space_distribution_converter.h> -#include <vespa/vdslib/state/cluster_state_bundle.h> #include <vespa/storage/config/config-stor-server.h> #include <vespa/storage/storageutil/distributorstatecache.h> +#include <vespa/storageapi/message/bucketsplitting.h> +#include <vespa/storageapi/message/persistence.h> +#include <vespa/storageapi/message/stat.h> +#include <vespa/storageapi/message/state.h> +#include <vespa/storageframework/generic/clock/timer.h> #include <vespa/storageframework/generic/status/htmlstatusreporter.h> #include <vespa/storageframework/generic/status/xmlstatusreporter.h> #include <vespa/storageframework/generic/thread/thread.h> -#include <vespa/storageframework/generic/clock/timer.h> -#include <vespa/storageapi/message/persistence.h> -#include <vespa/storageapi/message/state.h> -#include <vespa/storageapi/message/bucketsplitting.h> -#include <vespa/storageapi/message/stat.h> -#include <vespa/metrics/jsonwriter.h> -#include <vespa/document/bucket/fixed_bucket_spaces.h> +#include <vespa/vdslib/distribution/global_bucket_space_distribution_converter.h> +#include <vespa/vdslib/state/cluster_state_bundle.h> #include <vespa/vespalib/util/stringfmt.h> +#include <iomanip> #include <ranges> -#include <vespa/config/helper/configgetter.hpp> #include <chrono> #include <thread> @@ -526,6 +526,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac using RBISP = std::shared_ptr<api::RequestBucketInfoCommand>; std::map<uint16_t, RBISP> requests; + // TODO fetch distribution from bundle as well auto distribution(_component.getBucketSpaceRepo().get(bucketSpace).getDistribution()); auto clusterStateBundle(_component.getStateUpdater().getClusterStateBundle()); assert(clusterStateBundle); diff --git a/storage/src/vespa/storage/common/CMakeLists.txt b/storage/src/vespa/storage/common/CMakeLists.txt index 708f3dd05b9..a385867ac98 100644 --- a/storage/src/vespa/storage/common/CMakeLists.txt +++ b/storage/src/vespa/storage/common/CMakeLists.txt @@ -5,7 +5,6 @@ vespa_add_library(storage_common OBJECT content_bucket_space.cpp content_bucket_space_repo.cpp distributorcomponent.cpp - global_bucket_space_distribution_converter.cpp messagebucket.cpp message_guard.cpp messagesender.cpp diff --git a/storage/src/vespa/storage/common/content_bucket_space.cpp b/storage/src/vespa/storage/common/content_bucket_space.cpp index 0cedb78cfe6..92b5257b991 100644 --- a/storage/src/vespa/storage/common/content_bucket_space.cpp +++ b/storage/src/vespa/storage/common/content_bucket_space.cpp @@ -4,44 +4,76 @@ namespace storage { +ClusterStateAndDistribution::ClusterStateAndDistribution( + std::shared_ptr<const lib::ClusterState> cluster_state, + std::shared_ptr<const lib::Distribution> distribution) noexcept + : _cluster_state(std::move(cluster_state)), + _distribution(std::move(distribution)) +{ +} + +ClusterStateAndDistribution::~ClusterStateAndDistribution() = default; + +std::shared_ptr<const ClusterStateAndDistribution> +ClusterStateAndDistribution::with_new_state(std::shared_ptr<const lib::ClusterState> cluster_state) const { + return std::make_shared<const ClusterStateAndDistribution>(std::move(cluster_state), _distribution); +} + +std::shared_ptr<const ClusterStateAndDistribution> +ClusterStateAndDistribution::with_new_distribution(std::shared_ptr<const lib::Distribution> distribution) const { + return std::make_shared<const ClusterStateAndDistribution>(_cluster_state, std::move(distribution)); +} + ContentBucketSpace::ContentBucketSpace(document::BucketSpace bucketSpace, const ContentBucketDbOptions& db_opts) : _bucketSpace(bucketSpace), _bucketDatabase(db_opts), _lock(), - _clusterState(), - _distribution(), + _state_and_distribution(std::make_shared<ClusterStateAndDistribution>()), _nodeUpInLastNodeStateSeenByProvider(false), _nodeMaintenanceInLastNodeStateSeenByProvider(false) { } void +ContentBucketSpace::set_state_and_distribution(std::shared_ptr<const ClusterStateAndDistribution> state_and_distr) noexcept { + assert(state_and_distr); + std::lock_guard guard(_lock); + _state_and_distribution = std::move(state_and_distr); +} + +std::shared_ptr<const ClusterStateAndDistribution> +ContentBucketSpace::state_and_distribution() const noexcept { + std::lock_guard guard(_lock); + return _state_and_distribution; +} + +void ContentBucketSpace::setClusterState(std::shared_ptr<const lib::ClusterState> clusterState) { std::lock_guard guard(_lock); - _clusterState = std::move(clusterState); + _state_and_distribution = _state_and_distribution->with_new_state(std::move(clusterState)); } std::shared_ptr<const lib::ClusterState> ContentBucketSpace::getClusterState() const { std::lock_guard guard(_lock); - return _clusterState; + return _state_and_distribution->_cluster_state; } void ContentBucketSpace::setDistribution(std::shared_ptr<const lib::Distribution> distribution) { std::lock_guard guard(_lock); - _distribution = std::move(distribution); + _state_and_distribution = _state_and_distribution->with_new_distribution(std::move(distribution)); } std::shared_ptr<const lib::Distribution> ContentBucketSpace::getDistribution() const { std::lock_guard guard(_lock); - return _distribution; + return _state_and_distribution->_distribution; } bool diff --git a/storage/src/vespa/storage/common/content_bucket_space.h b/storage/src/vespa/storage/common/content_bucket_space.h index 93b171bd48e..eb48640c97b 100644 --- a/storage/src/vespa/storage/common/content_bucket_space.h +++ b/storage/src/vespa/storage/common/content_bucket_space.h @@ -12,6 +12,27 @@ class ClusterState; class Distribution; } +struct ClusterStateAndDistribution { + std::shared_ptr<const lib::ClusterState> _cluster_state; + std::shared_ptr<const lib::Distribution> _distribution; + + ClusterStateAndDistribution() = default; + ClusterStateAndDistribution(std::shared_ptr<const lib::ClusterState> cluster_state, + std::shared_ptr<const lib::Distribution> distribution) noexcept; + ~ClusterStateAndDistribution(); + + [[nodiscard]] bool valid() const noexcept { return _cluster_state && _distribution; } + + // Precondition: valid() == true + [[nodiscard]] const lib::ClusterState& cluster_state() const noexcept { return *_cluster_state; } + [[nodiscard]] const lib::Distribution& distribution() const noexcept { return *_distribution; } + + [[nodiscard]] std::shared_ptr<const ClusterStateAndDistribution> with_new_state( + std::shared_ptr<const lib::ClusterState> cluster_state) const; + [[nodiscard]] std::shared_ptr<const ClusterStateAndDistribution> with_new_distribution( + std::shared_ptr<const lib::Distribution> distribution) const; +}; + /** * Class representing a bucket space (with associated bucket database) on a content node. */ @@ -20,8 +41,7 @@ private: document::BucketSpace _bucketSpace; StorBucketDatabase _bucketDatabase; mutable std::mutex _lock; - std::shared_ptr<const lib::ClusterState> _clusterState; - std::shared_ptr<const lib::Distribution> _distribution; + std::shared_ptr<const ClusterStateAndDistribution> _state_and_distribution; bool _nodeUpInLastNodeStateSeenByProvider; bool _nodeMaintenanceInLastNodeStateSeenByProvider; @@ -31,10 +51,18 @@ public: document::BucketSpace bucketSpace() const noexcept { return _bucketSpace; } StorBucketDatabase &bucketDatabase() { return _bucketDatabase; } + + void set_state_and_distribution(std::shared_ptr<const ClusterStateAndDistribution> state_and_distr) noexcept; + [[nodiscard]] std::shared_ptr<const ClusterStateAndDistribution> state_and_distribution() const noexcept; + // TODO deprecate; only use atomic state+distribution setter void setClusterState(std::shared_ptr<const lib::ClusterState> clusterState); + // TODO deprecate; only use atomic state+distribution getter std::shared_ptr<const lib::ClusterState> getClusterState() const; + // TODO deprecate; only use atomic state+distribution setter void setDistribution(std::shared_ptr<const lib::Distribution> distribution); + // TODO deprecate; only use atomic state+distribution getter std::shared_ptr<const lib::Distribution> getDistribution() const; + bool getNodeUpInLastNodeStateSeenByProvider() const; void setNodeUpInLastNodeStateSeenByProvider(bool nodeUpInLastNodeStateSeenByProvider); bool getNodeMaintenanceInLastNodeStateSeenByProvider() const; diff --git a/storage/src/vespa/storage/common/global_bucket_space_distribution_converter.cpp b/storage/src/vespa/storage/common/global_bucket_space_distribution_converter.cpp deleted file mode 100644 index eb42f19a5e8..00000000000 --- a/storage/src/vespa/storage/common/global_bucket_space_distribution_converter.cpp +++ /dev/null @@ -1,181 +0,0 @@ -// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "global_bucket_space_distribution_converter.h" -#include <vespa/vdslib/distribution/distribution.h> -#include <vespa/config/print/asciiconfigwriter.h> -#include <vespa/config/print/asciiconfigreader.hpp> -#include <vespa/vdslib/distribution/distribution_config_util.h> -#include <vespa/vespalib/stllike/asciistream.h> -#include <cassert> -#include <map> - -namespace storage { - -using DistributionConfig = vespa::config::content::StorDistributionConfig; -using DistributionConfigBuilder = vespa::config::content::StorDistributionConfigBuilder; - -namespace { - -struct Group { - uint16_t nested_leaf_count{0}; - std::map<uint16_t, std::unique_ptr<Group>> sub_groups; -}; - -void set_distribution_invariant_config_fields(DistributionConfigBuilder& builder) { - builder.activePerLeafGroup = true; - // TODO consider how to best support n-of-m replication for global docs - builder.ensurePrimaryPersisted = true; - builder.initialRedundancy = 0; -} - -const Group& find_non_root_group_by_index(const vespalib::string& index, const Group& root) { - auto path = lib::DistributionConfigUtil::getGroupPath(index); - auto* node = &root; - for (auto idx : path) { - auto child_iter = node->sub_groups.find(idx); - assert(child_iter != node->sub_groups.end()); - node = child_iter->second.get(); - } - return *node; -} - -vespalib::string sub_groups_to_partition_spec(const Group& parent) { - if (parent.sub_groups.empty()) { - return "*"; - } - vespalib::asciistream spec; - // We simplify the generated partition spec by only emitting wildcard entries. - // These will have replicas evenly divided amongst them. - for (size_t i = 0; i < parent.sub_groups.size(); ++i) { - if (i != 0) { - spec << '|'; - } - spec << '*'; - } - return spec.str(); -} - -bool is_leaf_group(const DistributionConfigBuilder::Group& g) noexcept { - return !g.nodes.empty(); -} - -void insert_new_group_into_tree( - std::unique_ptr<Group> new_group, - const DistributionConfigBuilder::Group& config_source_group, - Group& root) { - const auto path = lib::DistributionConfigUtil::getGroupPath(config_source_group.index); - assert(!path.empty()); - - Group* parent = &root; - for (size_t i = 0; i < path.size(); ++i) { - const auto idx = path[i]; - parent->nested_leaf_count += config_source_group.nodes.size(); // Empty if added group is not a leaf. - auto g_iter = parent->sub_groups.find(idx); - if (g_iter != parent->sub_groups.end()) { - assert(i != path.size() - 1); - parent = g_iter->second.get(); - } else { - assert(i == path.size() - 1); // Only valid case for last item in path. - parent->sub_groups.emplace(path.back(), std::move(new_group)); - } - } -} - -void build_transformed_root_group(DistributionConfigBuilder& builder, - const DistributionConfigBuilder::Group& config_source_root, - const Group& parsed_root) { - DistributionConfigBuilder::Group new_root(config_source_root); - new_root.partitions = sub_groups_to_partition_spec(parsed_root); - builder.group.emplace_back(std::move(new_root)); -} - -void build_transformed_non_root_group(DistributionConfigBuilder& builder, - const DistributionConfigBuilder::Group& config_source_group, - const Group& parsed_root) { - DistributionConfigBuilder::Group new_group(config_source_group); - if (!is_leaf_group(config_source_group)) { // Partition specs only apply to inner nodes - const auto& g = find_non_root_group_by_index(config_source_group.index, parsed_root); - new_group.partitions = sub_groups_to_partition_spec(g); - } - builder.group.emplace_back(std::move(new_group)); -} - -std::unique_ptr<Group> create_group_tree_from_config(const DistributionConfig& source) { - std::unique_ptr<Group> root; - for (auto& g : source.group) { - auto new_group = std::make_unique<Group>(); - assert(g.nodes.size() < UINT16_MAX); - new_group->nested_leaf_count = static_cast<uint16_t>(g.nodes.size()); - if (root) { - insert_new_group_into_tree(std::move(new_group), g, *root); - } else { - root = std::move(new_group); - } - } - return root; -} - -/* Even though groups are inherently hierarchical, the config is a flat array with a - * hierarchy bolted on through the use of (more or less) "multi-dimensional" index strings. - * Index string of root group is always "invalid" (or possibly some other string that cannot - * be interpreted as a dot-separated tree node path). Other groups have an index of the - * form "X.Y.Z", where Z is the group's immediate parent index, Y is Z's parent and so on. Just - * stating Z itself is not sufficient to uniquely identify the group, as group indices are - * not unique _across_ groups. For indices "0.1" and "1.1", the trailing "1" refers to 2 - * distinct groups, as they have different parents. - * - * It may be noted that the group index strings do _not_ include the root group, so we - * have to always implicitly include it ourselves. - * - * Config groups are ordered so that when a group is encountered, all its parents (and - * transitively, its parents again etc) have already been processed. This directly - * implies that the root group is always the first group present in the config. - */ -void build_global_groups(DistributionConfigBuilder& builder, const DistributionConfig& source) { - assert(!source.group.empty()); // TODO gracefully handle empty config? - auto root = create_group_tree_from_config(source); - - auto g_iter = source.group.begin(); - const auto g_end = source.group.end(); - build_transformed_root_group(builder, *g_iter, *root); - ++g_iter; - for (; g_iter != g_end; ++g_iter) { - build_transformed_non_root_group(builder, *g_iter, *root); - } - - builder.redundancy = root->nested_leaf_count; - builder.readyCopies = builder.redundancy; -} - -} // anon ns - -std::shared_ptr<DistributionConfig> -GlobalBucketSpaceDistributionConverter::convert_to_global(const DistributionConfig& source) { - DistributionConfigBuilder builder; - set_distribution_invariant_config_fields(builder); - build_global_groups(builder, source); - return std::make_shared<DistributionConfig>(builder); -} - -std::shared_ptr<lib::Distribution> -GlobalBucketSpaceDistributionConverter::convert_to_global(const lib::Distribution& distr) { - const auto src_config = distr.serialize(); - auto global_config = convert_to_global(*string_to_config(src_config)); - return std::make_shared<lib::Distribution>(*global_config); -} - -std::unique_ptr<DistributionConfig> -GlobalBucketSpaceDistributionConverter::string_to_config(const vespalib::string& cfg) { - vespalib::asciistream iss(cfg); - config::AsciiConfigReader<vespa::config::content::StorDistributionConfig> reader(iss); - return reader.read(); -} - -vespalib::string GlobalBucketSpaceDistributionConverter::config_to_string(const DistributionConfig& cfg) { - vespalib::asciistream ost; - config::AsciiConfigWriter writer(ost); - writer.write(cfg); - return ost.str(); -} - -} diff --git a/storage/src/vespa/storage/common/global_bucket_space_distribution_converter.h b/storage/src/vespa/storage/common/global_bucket_space_distribution_converter.h deleted file mode 100644 index c530922ad18..00000000000 --- a/storage/src/vespa/storage/common/global_bucket_space_distribution_converter.h +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include <vespa/config-stor-distribution.h> -#include <memory> - -namespace storage::lib { class Distribution; } -namespace storage { - -struct GlobalBucketSpaceDistributionConverter { - using DistributionConfig = vespa::config::content::StorDistributionConfig; - - static std::shared_ptr<DistributionConfig> convert_to_global(const DistributionConfig&); - static std::shared_ptr<lib::Distribution> convert_to_global(const lib::Distribution&); - - // Helper functions which may be of use outside this class - static std::unique_ptr<DistributionConfig> string_to_config(const vespalib::string&); - static vespalib::string config_to_string(const DistributionConfig&); -}; - -} diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp index 6957e541d6b..cfbc4caf82d 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.cpp +++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp @@ -46,6 +46,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component) _use_weak_internal_read_consistency_for_client_gets(false), _enable_metadata_only_fetch_phase_for_inconsistent_updates(true), _enable_operation_cancellation(false), + _symmetric_put_and_activate_replica_selection(false), _minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED) { } @@ -150,6 +151,7 @@ DistributorConfiguration::configure(const DistributorManagerConfig & config) _max_activation_inhibited_out_of_sync_groups = config.maxActivationInhibitedOutOfSyncGroups; _enable_operation_cancellation = config.enableOperationCancellation; _minimumReplicaCountingMode = deriveReplicaCountingMode(config.minimumReplicaCountingMode); + _symmetric_put_and_activate_replica_selection = config.symmetricPutAndActivateReplicaSelection; if (config.maxClusterClockSkewSec >= 0) { _maxClusterClockSkew = std::chrono::seconds(config.maxClusterClockSkewSec); diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h index 38fac13150c..2b73fdc0fa1 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.h +++ b/storage/src/vespa/storage/config/distributorconfiguration.h @@ -234,8 +234,11 @@ public: [[nodiscard]] bool enable_operation_cancellation() const noexcept { return _enable_operation_cancellation; } + [[nodiscard]] bool symmetric_put_and_activate_replica_selection() const noexcept { + return _symmetric_put_and_activate_replica_selection; + } - bool containsTimeStatement(const std::string& documentSelection) const; + [[nodiscard]] bool containsTimeStatement(const std::string& documentSelection) const; private: StorageComponent& _component; @@ -276,6 +279,7 @@ private: bool _use_weak_internal_read_consistency_for_client_gets; bool _enable_metadata_only_fetch_phase_for_inconsistent_updates; //TODO Rewrite tests and GC bool _enable_operation_cancellation; + bool _symmetric_put_and_activate_replica_selection; ReplicaCountingMode _minimumReplicaCountingMode; }; diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def index 3f6028d7fa1..a4d4461ba68 100644 --- a/storage/src/vespa/storage/config/stor-distributormanager.def +++ b/storage/src/vespa/storage/config/stor-distributormanager.def @@ -159,6 +159,13 @@ num_distributor_stripes int default=0 restart ## requests partially or fully "invalidated" by such a change. enable_operation_cancellation bool default=false +## Iff true there will be an 1-1 symmetry between the replicas chosen as feed targets +## for Put operations and the replica selection logic for bucket activation. In particular, +## the most preferred replica for feed will be the most preferred bucket for activation. +## This helps ensure that new versions of documents are routed to replicas that are most +## likely to reflect these changes as part of visible search results. +symmetric_put_and_activate_replica_selection bool default=false + ## TODO GC very soon, it has no effect. priority_merge_out_of_sync_copies int default=120 diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def index 8cd204bcf9f..49ce0b678d0 100644 --- a/storage/src/vespa/storage/config/stor-server.def +++ b/storage/src/vespa/storage/config/stor-server.def @@ -114,3 +114,15 @@ simulated_bucket_request_latency_msec int default=0 ## a disjoint subset of the node's buckets, in order to reduce locking contention. ## Max value is unspecified, but will be clamped internally. content_node_bucket_db_stripe_bits int default=4 restart + +## Iff set, a special `pidfile` file is written under the node's root directory upon +## startup containing the PID of the running process. +write_pid_file_on_startup bool default=true + +## Iff true, received cluster state versions that are lower than the current active +## (or pending to be active) version on the node will be explicitly rejected. This +## prevents race conditions caused by multiple cluster controllers believing they +## are the leader during overlapping time intervals, as only the most recent leader +## is able to increment the current state version in ZooKeeper, but the old controller +## may still attempt to publish its old state. +require_strictly_increasing_cluster_state_versions bool default=false diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt index 195410cbe03..212570a3033 100644 --- a/storage/src/vespa/storage/distributor/CMakeLists.txt +++ b/storage/src/vespa/storage/distributor/CMakeLists.txt @@ -5,7 +5,6 @@ vespa_add_library(storage_distributor OBJECT blockingoperationstarter.cpp bucket_db_prune_elision.cpp bucket_ownership_calculator.cpp - bucket_space_distribution_configs.cpp bucket_space_distribution_context.cpp bucket_space_state_map.cpp bucket_spaces_stats_provider.cpp diff --git a/storage/src/vespa/storage/distributor/activecopy.cpp b/storage/src/vespa/storage/distributor/activecopy.cpp index 35070bcee3b..b823978a0cc 100644 --- a/storage/src/vespa/storage/distributor/activecopy.cpp +++ b/storage/src/vespa/storage/distributor/activecopy.cpp @@ -108,6 +108,7 @@ buildNodeList(const BucketDatabase::Entry& e,vespalib::ConstArrayRef<uint16_t> n struct ActiveStateOrder { bool operator()(const ActiveCopy & e1, const ActiveCopy & e2) noexcept { + // Replica selection order should be kept in sync with OperationTargetResolverImpl's InstanceOrder. if (e1._ready != e2._ready) { return e1._ready; } @@ -120,7 +121,9 @@ struct ActiveStateOrder { if (e1._active != e2._active) { return e1._active; } - return e1.nodeIndex() < e2.nodeIndex(); + // Use _entry_ order instead of node index, as it is in ideal state order (even for retired + // nodes), which avoids unintentional affinities towards lower node indexes. + return e1.entryIndex() < e2.entryIndex(); } }; diff --git a/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.cpp b/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.cpp deleted file mode 100644 index 37bf8f01752..00000000000 --- a/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.cpp +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "bucket_space_distribution_configs.h" -#include <vespa/document/bucket/fixed_bucket_spaces.h> -#include <vespa/storage/common/global_bucket_space_distribution_converter.h> -#include <vespa/vdslib/distribution/distribution.h> - -namespace storage::distributor { - -BucketSpaceDistributionConfigs -BucketSpaceDistributionConfigs::from_default_distribution(std::shared_ptr<const lib::Distribution> distribution) { - BucketSpaceDistributionConfigs ret; - ret.space_configs.emplace(document::FixedBucketSpaces::global_space(), GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution)); - ret.space_configs.emplace(document::FixedBucketSpaces::default_space(), std::move(distribution)); - return ret; -} - -} diff --git a/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.h b/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.h deleted file mode 100644 index cddd21d579f..00000000000 --- a/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.h +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <vespa/document/bucket/bucketspace.h> -#include <map> -#include <memory> - -namespace storage::lib { class Distribution; } - -namespace storage::distributor { - -/** - * Represents a complete mapping of all known bucket spaces to their appropriate, - * (possibly derived) distribution config. - */ -struct BucketSpaceDistributionConfigs { - std::map<document::BucketSpace, std::shared_ptr<const lib::Distribution>> space_configs; - - std::shared_ptr<const lib::Distribution> get_or_nullptr(document::BucketSpace space) const noexcept { - auto iter = space_configs.find(space); - return (iter != space_configs.end()) ? iter->second : std::shared_ptr<const lib::Distribution>(); - } - - static BucketSpaceDistributionConfigs from_default_distribution(std::shared_ptr<const lib::Distribution>); -}; - -} diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index c00ab7080da..f8abdf78c2b 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -12,7 +12,6 @@ #include "stripe_host_info_notifier.h" #include "throttlingoperationstarter.h" #include <vespa/document/bucket/fixed_bucket_spaces.h> -#include <vespa/storage/common/global_bucket_space_distribution_converter.h> #include <vespa/storage/common/node_identity.h> #include <vespa/storage/common/nodestateupdater.h> #include <vespa/storage/distributor/maintenance/simplebucketprioritydatabase.h> @@ -21,6 +20,7 @@ #include <vespa/storage/config/distributorconfiguration.h> #include <vespa/storageframework/generic/status/xmlstatusreporter.h> #include <vespa/vdslib/distribution/distribution.h> +#include <vespa/vdslib/distribution/global_bucket_space_distribution_converter.h> #include <vespa/vespalib/util/memoryusage.h> #include <algorithm> @@ -545,7 +545,7 @@ DistributorStripe::checkBucketForSplit(document::BucketSpace bucketSpace, const void DistributorStripe::propagateDefaultDistribution(std::shared_ptr<const lib::Distribution> distribution) { - auto global_distr = GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution); + auto global_distr = lib::GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution); for (auto* repo : {_bucketSpaceRepo.get(), _readOnlyBucketSpaceRepo.get()}) { repo->get(document::FixedBucketSpaces::default_space()).setDistribution(distribution); repo->get(document::FixedBucketSpaces::global_space()).setDistribution(global_distr); @@ -554,7 +554,7 @@ DistributorStripe::propagateDefaultDistribution(std::shared_ptr<const lib::Distr // Only called when stripe is in rendezvous freeze void -DistributorStripe::update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) { +DistributorStripe::update_distribution_config(const lib::BucketSpaceDistributionConfigs& new_configs) { auto default_distr = new_configs.get_or_nullptr(document::FixedBucketSpaces::default_space()); auto global_distr = new_configs.get_or_nullptr(document::FixedBucketSpaces::global_space()); assert(default_distr && global_distr); diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h index d782432ab35..f5793e4d39b 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe.h @@ -283,7 +283,7 @@ private: void propagate_config_snapshot_to_internal_components(); // Additional implementations of TickableStripe: - void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) override; + void update_distribution_config(const lib::BucketSpaceDistributionConfigs& new_configs) override; void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) override; void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) override; void clear_pending_cluster_state_bundle() override; diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp index f1cce40ee8b..991a73ec5c6 100644 --- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp +++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp @@ -33,7 +33,7 @@ void MultiThreadedStripeAccessGuard::update_total_distributor_config(std::shared }); } -void MultiThreadedStripeAccessGuard::update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) { +void MultiThreadedStripeAccessGuard::update_distribution_config(const lib::BucketSpaceDistributionConfigs& new_configs) { for_each_stripe([&](TickableStripe& stripe) { stripe.update_distribution_config(new_configs); }); diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h index a4392416025..3ce22a3e1a7 100644 --- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h +++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h @@ -31,7 +31,7 @@ public: void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) override; - void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) override; + void update_distribution_config(const lib::BucketSpaceDistributionConfigs& new_configs) override; void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) override; void clear_pending_cluster_state_bundle() override; void enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state, diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp index 54087850e1b..a92896279b0 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp @@ -180,6 +180,8 @@ void PutOperation::start_direct_put_dispatch(DistributorStripeMessageSender& sen _op_ctx.distributor_config().getMinimalBucketSplit(), _bucket_space.getDistribution().getRedundancy(), _msg->getBucket().getBucketSpace()); + targetResolver.use_symmetric_replica_selection( + _op_ctx.distributor_config().symmetric_put_and_activate_replica_selection()); OperationTargetList targets(targetResolver.getTargets(OperationTargetResolver::PUT, _doc_id_bucket_id)); for (const auto& target : targets) { diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp index 84e9ab71bcb..849746416d6 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp @@ -668,7 +668,7 @@ TwoPhaseUpdateOperation::applyUpdateToDocument(document::Document& doc) const bool TwoPhaseUpdateOperation::shouldCreateIfNonExistent() const { - return _updateCmd->getUpdate()->getCreateIfNonExistent(); + return _updateCmd->create_if_missing(); } bool diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp index 7b6833cc299..2b47d53363f 100644 --- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp @@ -29,7 +29,7 @@ UpdateOperation::UpdateOperation(const DistributorNodeContext& node_ctx, _msg(msg), _entries(std::move(entries)), _new_timestamp(_msg->getTimestamp()), - _is_auto_create_update(_msg->getUpdate()->getCreateIfNonExistent()), + _is_auto_create_update(_msg->create_if_missing()), _node_ctx(node_ctx), _op_ctx(op_ctx), _bucketSpace(bucketSpace), @@ -112,6 +112,9 @@ UpdateOperation::onStart(DistributorStripeMessageSender& sender) copyMessageSettings(*_msg, *command); command->setOldTimestamp(_msg->getOldTimestamp()); command->setCondition(_msg->getCondition()); + if (_msg->has_cached_create_if_missing()) { + command->set_cached_create_if_missing(_msg->create_if_missing()); + } messages.emplace_back(std::move(command), node); } diff --git a/storage/src/vespa/storage/distributor/operationtargetresolverimpl.cpp b/storage/src/vespa/storage/distributor/operationtargetresolverimpl.cpp index 394c13c2bad..618cfb56359 100644 --- a/storage/src/vespa/storage/distributor/operationtargetresolverimpl.cpp +++ b/storage/src/vespa/storage/distributor/operationtargetresolverimpl.cpp @@ -10,9 +10,12 @@ namespace storage::distributor { BucketInstance::BucketInstance(const document::BucketId& id, const api::BucketInfo& info, lib::Node node, - uint16_t idealLocationPriority, bool trusted, bool exist) noexcept + uint16_t ideal_location_priority, uint16_t db_entry_order, + bool trusted, bool exist) noexcept : _bucket(id), _info(info), _node(node), - _idealLocationPriority(idealLocationPriority), _trusted(trusted), _exist(exist) + _ideal_location_priority(ideal_location_priority), + _db_entry_order(db_entry_order), + _trusted(trusted), _exists(exist) { } @@ -24,8 +27,8 @@ BucketInstance::print(vespalib::asciistream& out, const PrintProperties&) const std::ostringstream ost; ost << std::hex << _bucket.getId(); - out << "(" << ost.str() << ", " << infoString << ", node " << _node.getIndex() << ", ideal " << _idealLocationPriority - << (_trusted ? ", trusted" : "") << (_exist ? "" : ", new copy") << ")"; + out << "(" << ost.str() << ", " << infoString << ", node " << _node.getIndex() << ", ideal " << _ideal_location_priority + << (_trusted ? ", trusted" : "") << (_exists ? "" : ", new copy") << ")"; } bool @@ -42,7 +45,7 @@ BucketInstanceList::add(const BucketDatabase::Entry& e, const IdealServiceLayerN for (uint32_t i = 0; i < e.getBucketInfo().getNodeCount(); ++i) { const BucketCopy& copy(e.getBucketInfo().getNodeRef(i)); lib::Node node(lib::NodeType::STORAGE, copy.getNode()); - _instances.emplace_back(e.getBucketId(), copy.getBucketInfo(), node, idealState.lookup(copy.getNode()), copy.trusted(), true); + _instances.emplace_back(e.getBucketId(), copy.getBucketInfo(), node, idealState.lookup(copy.getNode()), i, copy.trusted(), true); } } @@ -106,7 +109,8 @@ BucketInstanceList::extendToEnoughCopies(const DistributorBucketSpace& distribut for (uint32_t i=0; i<idealNodes.size(); ++i) { lib::Node node(lib::NodeType::STORAGE, idealNodes[i]); if (!contains(node)) { - _instances.emplace_back(newTarget, api::BucketInfo(), node, i, false, false); + // We don't sort `_instances` after extending, so just reuse `i` as dummy DB entry order. + _instances.emplace_back(newTarget, api::BucketInfo(), node, i, i, false, false); } } } @@ -116,7 +120,7 @@ BucketInstanceList::createTargets(document::BucketSpace bucketSpace) { OperationTargetList result; for (const auto& bi : _instances) { - result.emplace_back(document::Bucket(bucketSpace, bi._bucket), bi._node, !bi._exist); + result.emplace_back(document::Bucket(bucketSpace, bi._bucket), bi._node, !bi._exists); } return result; } @@ -129,6 +133,49 @@ BucketInstanceList::print(vespalib::asciistream& out, const PrintProperties& p) namespace { /** + * To maintain a symmetry between which replicas receive Puts and which versions are + * preferred for activation, use an identical ordering predicate for both (for the case + * where replicas are for the same concrete bucket). + * + * Must only be used with BucketInstances that have a distinct _db_entry_order set per instance. + */ +struct ActiveReplicaSymmetricInstanceOrder { + bool operator()(const BucketInstance& a, const BucketInstance& b) noexcept { + if (a._bucket == b._bucket) { + if (a._info.isReady() != b._info.isReady()) { + return a._info.isReady(); + } + if (a._info.getDocumentCount() != b._info.getDocumentCount()) { + return a._info.getDocumentCount() > b._info.getDocumentCount(); + } + if (a._ideal_location_priority != b._ideal_location_priority) { + return a._ideal_location_priority < b._ideal_location_priority; + } + if (a._info.isActive() != b._info.isActive()) { + return a._info.isActive(); + } + // If all else is equal, this implies both A and B are on retired nodes, which is unlikely + // but possible. Fall back to the existing DB _entry order_, which is equal to an ideal + // state order where retired nodes are considered part of the ideal state (which is not the + // case for most ideal state operations). Since the DB entry order is in ideal state order, + // using this instead of node _index_ avoids affinities to lower indexes in such edge cases. + return a._db_entry_order < b._db_entry_order; + } else { + // TODO this inconsistent split case is equal to the legacy logic (aside from the tie-breaking), + // but is considered to be extremely unlikely in practice, so not worth optimizing for. + if ((a._info.getMetaCount() == 0) ^ (b._info.getMetaCount() == 0)) { + return (a._info.getMetaCount() == 0); + } + if (a._bucket.getUsedBits() != b._bucket.getUsedBits()) { + return (a._bucket.getUsedBits() > b._bucket.getUsedBits()); + } + return a._db_entry_order < b._db_entry_order; + } + return false; + } +}; + +/** * - Trusted copies should be preferred over non-trusted copies for the same bucket. * - Buckets in ideal locations should be preferred over non-ideal locations for the * same bucket across several nodes. @@ -137,14 +184,14 @@ namespace { * - Right after split/join, bucket is often not in ideal location, but should be * preferred instead of source anyhow. */ -struct InstanceOrder { - bool operator()(const BucketInstance& a, const BucketInstance& b) { +struct LegacyInstanceOrder { + bool operator()(const BucketInstance& a, const BucketInstance& b) noexcept { if (a._bucket == b._bucket) { - // Trusted only makes sense within same bucket - // Prefer trusted buckets over non-trusted ones. + // Trusted only makes sense within same bucket + // Prefer trusted buckets over non-trusted ones. if (a._trusted != b._trusted) return a._trusted; - if (a._idealLocationPriority != b._idealLocationPriority) { - return a._idealLocationPriority < b._idealLocationPriority; + if (a._ideal_location_priority != b._ideal_location_priority) { + return a._ideal_location_priority < b._ideal_location_priority; } } else { if ((a._info.getMetaCount() == 0) ^ (b._info.getMetaCount() == 0)) { @@ -164,7 +211,11 @@ OperationTargetResolverImpl::getAllInstances(OperationType type, const document: BucketInstanceList instances; if (type == PUT) { instances.populate(id, _distributor_bucket_space, _bucketDatabase); - instances.sort(InstanceOrder()); + if (_symmetric_replica_selection) { + instances.sort(ActiveReplicaSymmetricInstanceOrder()); + } else { + instances.sort(LegacyInstanceOrder()); + } instances.removeNodeDuplicates(); instances.extendToEnoughCopies(_distributor_bucket_space, _bucketDatabase, _bucketDatabase.getAppropriateBucket(_minUsedBucketBits, id), id); diff --git a/storage/src/vespa/storage/distributor/operationtargetresolverimpl.h b/storage/src/vespa/storage/distributor/operationtargetresolverimpl.h index 9f367a89cba..6ab38928200 100644 --- a/storage/src/vespa/storage/distributor/operationtargetresolverimpl.h +++ b/storage/src/vespa/storage/distributor/operationtargetresolverimpl.h @@ -15,15 +15,17 @@ struct BucketInstance : public vespalib::AsciiPrintable { document::BucketId _bucket; api::BucketInfo _info; lib::Node _node; - uint16_t _idealLocationPriority; - bool _trusted; - bool _exist; + uint16_t _ideal_location_priority; + uint16_t _db_entry_order; + bool _trusted; // TODO remove + bool _exists; BucketInstance() noexcept - : _idealLocationPriority(0xffff), _trusted(false), _exist(false) {} + : _ideal_location_priority(0xffff), _db_entry_order(0xffff), _trusted(false), _exists(false) + {} BucketInstance(const document::BucketId& id, const api::BucketInfo& info, - lib::Node node, uint16_t idealLocationPriority, bool trusted, - bool exist) noexcept; + lib::Node node, uint16_t ideal_location_priority, + uint16_t db_entry_order, bool trusted, bool exist) noexcept; void print(vespalib::asciistream& out, const PrintProperties&) const override; }; @@ -83,6 +85,7 @@ class OperationTargetResolverImpl : public OperationTargetResolver { uint32_t _minUsedBucketBits; uint16_t _redundancy; document::BucketSpace _bucketSpace; + bool _symmetric_replica_selection; public: OperationTargetResolverImpl(const DistributorBucketSpace& distributor_bucket_space, @@ -94,9 +97,14 @@ public: _bucketDatabase(bucketDatabase), _minUsedBucketBits(minUsedBucketBits), _redundancy(redundancy), - _bucketSpace(bucketSpace) + _bucketSpace(bucketSpace), + _symmetric_replica_selection(true) {} + void use_symmetric_replica_selection(bool symmetry) noexcept { + _symmetric_replica_selection = symmetry; + } + BucketInstanceList getAllInstances(OperationType type, const document::BucketId& id); BucketInstanceList getInstances(OperationType type, const document::BucketId& id) { BucketInstanceList result(getAllInstances(type, id)); diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp index b756c2e421b..f2b5fa62d1e 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp @@ -5,7 +5,6 @@ #include "pendingclusterstate.h" #include "top_level_bucket_db_updater.h" #include <vespa/document/bucket/fixed_bucket_spaces.h> -#include <vespa/storage/common/global_bucket_space_distribution_converter.h> #include <vespa/storageframework/defaultimplementation/clock/realclock.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vespalib/util/xmlstream.hpp> diff --git a/storage/src/vespa/storage/distributor/stripe_access_guard.h b/storage/src/vespa/storage/distributor/stripe_access_guard.h index 1618bc9be9d..8a930ed3305 100644 --- a/storage/src/vespa/storage/distributor/stripe_access_guard.h +++ b/storage/src/vespa/storage/distributor/stripe_access_guard.h @@ -1,11 +1,11 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "bucket_space_distribution_configs.h" #include "pending_bucket_space_db_transition_entry.h" #include "potential_data_loss_report.h" #include "outdated_nodes.h" #include <vespa/document/bucket/bucketspace.h> +#include <vespa/vdslib/distribution/bucket_space_distribution_configs.h> #include <vespa/storageapi/defs.h> #include <unordered_set> // TODO use hash_set instead @@ -38,7 +38,7 @@ public: virtual void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) = 0; - virtual void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) = 0; + virtual void update_distribution_config(const lib::BucketSpaceDistributionConfigs& new_configs) = 0; virtual void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) = 0; virtual void clear_pending_cluster_state_bundle() = 0; virtual void enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state, diff --git a/storage/src/vespa/storage/distributor/tickable_stripe.h b/storage/src/vespa/storage/distributor/tickable_stripe.h index ab1cd570089..2605c24639e 100644 --- a/storage/src/vespa/storage/distributor/tickable_stripe.h +++ b/storage/src/vespa/storage/distributor/tickable_stripe.h @@ -39,7 +39,7 @@ public: virtual void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) = 0; - virtual void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) = 0; + virtual void update_distribution_config(const lib::BucketSpaceDistributionConfigs& new_configs) = 0; virtual void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) = 0; virtual void clear_pending_cluster_state_bundle() = 0; virtual void enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state, diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp index 1f7d11362e2..1991c9aaf58 100644 --- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp +++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp @@ -2,7 +2,6 @@ #include "top_level_bucket_db_updater.h" #include "bucket_db_prune_elision.h" -#include "bucket_space_distribution_configs.h" #include "bucket_space_distribution_context.h" #include "top_level_distributor.h" #include "distributor_bucket_space.h" @@ -11,11 +10,12 @@ #include "simpleclusterinformation.h" #include "stripe_access_guard.h" #include <vespa/document/bucket/fixed_bucket_spaces.h> -#include <vespa/storage/common/global_bucket_space_distribution_converter.h> #include <vespa/storage/config/distributorconfiguration.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/removelocation.h> +#include <vespa/vdslib/distribution/bucket_space_distribution_configs.h> #include <vespa/vdslib/distribution/distribution.h> +#include <vespa/vdslib/distribution/global_bucket_space_distribution_converter.h> #include <vespa/vdslib/state/clusterstate.h> #include <vespa/vespalib/util/xmlstream.h> #include <thread> @@ -54,6 +54,7 @@ TopLevelBucketDBUpdater::TopLevelBucketDBUpdater(const DistributorNodeContext& n { // FIXME STRIPE top-level Distributor needs a proper way to track the current cluster state bundle! propagate_active_state_bundle_internally(true); // We're just starting up so assume ownership transfer. + // TODO bootstrap cluster state bundle instead? version:0 cluster:d bootstrap_distribution_config(std::move(bootstrap_distribution)); } @@ -71,7 +72,7 @@ TopLevelBucketDBUpdater::propagate_active_state_bundle_internally(bool has_bucke void TopLevelBucketDBUpdater::bootstrap_distribution_config(std::shared_ptr<const lib::Distribution> distribution) { - auto global_distr = GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution); + auto global_distr = lib::GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution); _op_ctx.bucket_space_states().get(document::FixedBucketSpaces::default_space()).set_distribution(distribution); _op_ctx.bucket_space_states().get(document::FixedBucketSpaces::global_space()).set_distribution(global_distr); // TODO STRIPE do we need to bootstrap the stripes as well here? Or do they do this on their own volition? @@ -79,7 +80,7 @@ TopLevelBucketDBUpdater::bootstrap_distribution_config(std::shared_ptr<const lib } void -TopLevelBucketDBUpdater::propagate_distribution_config(const BucketSpaceDistributionConfigs& configs) { +TopLevelBucketDBUpdater::propagate_distribution_config(const lib::BucketSpaceDistributionConfigs& configs) { if (auto distr = configs.get_or_nullptr(document::FixedBucketSpaces::default_space())) { _op_ctx.bucket_space_states().get(document::FixedBucketSpaces::default_space()).set_distribution(distr); } @@ -183,13 +184,14 @@ TopLevelBucketDBUpdater::complete_transition_timer() } void -TopLevelBucketDBUpdater::storage_distribution_changed(const BucketSpaceDistributionConfigs& configs) +TopLevelBucketDBUpdater::storage_distribution_changed(const lib::BucketSpaceDistributionConfigs& configs) { propagate_distribution_config(configs); ensure_transition_timer_started(); auto guard = _stripe_accessor.rendezvous_and_hold_all(); // FIXME STRIPE might this cause a mismatch with the component stuff's own distribution config..?! + // TODO should be part of bundle only...!! guard->update_distribution_config(configs); remove_superfluous_buckets(*guard, _active_state_bundle, true); diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h index e76456329d4..87a408281a7 100644 --- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h +++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h @@ -23,9 +23,12 @@ class XmlOutputStream; class XmlAttribute; } +namespace storage::lib { +struct BucketSpaceDistributionConfigs; +} + namespace storage::distributor { -struct BucketSpaceDistributionConfigs; class BucketSpaceDistributionContext; class ClusterStateBundleActivationListener; class DistributorInterface; @@ -57,9 +60,9 @@ public: bool reportStatus(std::ostream&, const framework::HttpUrlPath&) const override; void resend_delayed_messages(); - void storage_distribution_changed(const BucketSpaceDistributionConfigs& configs); + void storage_distribution_changed(const lib::BucketSpaceDistributionConfigs& configs); void bootstrap_distribution_config(std::shared_ptr<const lib::Distribution>); - void propagate_distribution_config(const BucketSpaceDistributionConfigs& configs); + void propagate_distribution_config(const lib::BucketSpaceDistributionConfigs& configs); vespalib::string report_xml_status(vespalib::xml::XmlOutputStream& xos, const framework::HttpUrlPath&) const; diff --git a/storage/src/vespa/storage/distributor/top_level_distributor.cpp b/storage/src/vespa/storage/distributor/top_level_distributor.cpp index 7348dbd6409..f7ee89ae7c0 100644 --- a/storage/src/vespa/storage/distributor/top_level_distributor.cpp +++ b/storage/src/vespa/storage/distributor/top_level_distributor.cpp @@ -1,7 +1,6 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. // #include "blockingoperationstarter.h" -#include "bucket_space_distribution_configs.h" #include "top_level_bucket_db_updater.h" #include "top_level_distributor.h" #include "distributor_bucket_space.h" @@ -16,7 +15,6 @@ #include "throttlingoperationstarter.h" #include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/storage/common/bucket_stripe_utils.h> -#include <vespa/storage/common/global_bucket_space_distribution_converter.h> #include <vespa/storage/common/hostreporter/hostinfo.h> #include <vespa/storage/common/node_identity.h> #include <vespa/storage/common/nodestateupdater.h> @@ -25,7 +23,9 @@ #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/visitor.h> #include <vespa/storageframework/generic/status/xmlstatusreporter.h> +#include <vespa/vdslib/distribution/bucket_space_distribution_configs.h> #include <vespa/vdslib/distribution/distribution.h> +#include <vespa/vdslib/distribution/global_bucket_space_distribution_converter.h> #include <vespa/vespalib/util/memoryusage.h> #include <algorithm> @@ -323,7 +323,7 @@ TopLevelDistributor::enable_next_distribution_if_changed() if (_next_distribution) { _distribution = _next_distribution; _next_distribution = std::shared_ptr<lib::Distribution>(); - auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(_distribution); + auto new_configs = lib::BucketSpaceDistributionConfigs::from_default_distribution(_distribution); _bucket_db_updater->storage_distribution_changed(new_configs); // Transitively updates all stripes' configs } } @@ -334,7 +334,7 @@ TopLevelDistributor::propagate_default_distribution_thread_unsafe( { // Should only be called at ctor time, at which point the pool is not yet running. assert(_stripe_pool.stripe_count() == 0); - auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution)); + auto new_configs = lib::BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution)); for (auto& stripe : _stripes) { stripe->update_distribution_config(new_configs); } diff --git a/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp b/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp index 94853ec18d1..ab1cbf0b4d7 100644 --- a/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp +++ b/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp @@ -3,7 +3,7 @@ #include "servicelayercomponentregisterimpl.h" #include <vespa/vespalib/util/exceptions.h> #include <vespa/document/bucket/fixed_bucket_spaces.h> -#include <vespa/storage/common/global_bucket_space_distribution_converter.h> +#include <vespa/vdslib/distribution/global_bucket_space_distribution_converter.h> namespace storage { @@ -26,7 +26,7 @@ void ServiceLayerComponentRegisterImpl::setDistribution(std::shared_ptr<lib::Distribution> distribution) { _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).setDistribution(distribution); - auto global_distr = GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution); + auto global_distr = lib::GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution); _bucketSpaceRepo.get(document::FixedBucketSpaces::global_space()).setDistribution(global_distr); StorageComponentRegisterImpl::setDistribution(distribution); } diff --git a/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp b/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp index ee49b346d92..d5de11c7d6f 100644 --- a/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp +++ b/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp @@ -19,6 +19,7 @@ uint16_t BucketOwnershipNotifier::getOwnerDistributorForBucket(const document::Bucket &bucket) const { try { + // TODO use state updater bundle for everything? auto distribution(_component.getBucketSpaceRepo().get(bucket.getBucketSpace()).getDistribution()); const auto clusterStateBundle = _component.getStateUpdater().getClusterStateBundle(); const auto &clusterState = *clusterStateBundle->getDerivedClusterState(bucket.getBucketSpace()); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 23de39f7130..495497d507d 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -930,8 +930,9 @@ FileStorManager::updateState() for (const auto &elem : _component.getBucketSpaceRepo()) { BucketSpace bucketSpace(elem.first); ContentBucketSpace& contentBucketSpace = *elem.second; - auto derivedClusterState = contentBucketSpace.getClusterState(); - const bool node_up_in_space = derivedClusterState->getNodeState(node).getState().oneOf("uir"); + auto state_and_distr = contentBucketSpace.state_and_distribution(); + assert(state_and_distr->valid()); + const bool node_up_in_space = state_and_distr->cluster_state().getNodeState(node).getState().oneOf("uir"); if (should_deactivate_buckets(contentBucketSpace, node_up_in_space, in_maintenance)) { LOG(debug, "Received cluster state where this node is down; de-activating all buckets " "in database for bucket space %s", bucketSpace.toString().c_str()); @@ -941,9 +942,8 @@ FileStorManager::updateState() } contentBucketSpace.setNodeUpInLastNodeStateSeenByProvider(node_up_in_space); contentBucketSpace.setNodeMaintenanceInLastNodeStateSeenByProvider(in_maintenance); - spi::ClusterState spiState(*derivedClusterState, _component.getIndex(), - *contentBucketSpace.getDistribution(), - in_maintenance); + spi::ClusterState spiState(state_and_distr->cluster_state(), _component.getIndex(), + state_and_distr->distribution(), in_maintenance); _provider->setClusterState(bucketSpace, spiState); } } @@ -959,6 +959,7 @@ FileStorManager::propagateClusterStates() { auto clusterStateBundle = _component.getStateUpdater().getClusterStateBundle(); for (const auto &elem : _component.getBucketSpaceRepo()) { + // TODO also distribution! bundle and repo must be 1-1 elem.second->setClusterState(clusterStateBundle->getDerivedClusterState(elem.first)); } } diff --git a/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.cpp b/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.cpp index c3cb38bd7ac..582c69e943f 100644 --- a/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.cpp @@ -20,8 +20,8 @@ MergeHandlerMetrics::MergeHandlerMetrics(metrics::MetricSet* owner) "current node.", owner), mergeAverageDataReceivedNeeded("mergeavgdatareceivedneeded", {}, "Amount of data transferred from previous node " "in chain that we needed to apply locally.", owner), - put_latency("put_latency", {}, "Latency of individual puts that are part of merge operations", owner), - remove_latency("remove_latency", {}, "Latency of individual removes that are part of merge operations", owner) + merge_put_latency("merge_put_latency", {}, "Latency of individual puts that are part of merge operations", owner), + merge_remove_latency("merge_remove_latency", {}, "Latency of individual removes that are part of merge operations", owner) {} MergeHandlerMetrics::~MergeHandlerMetrics() = default; diff --git a/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.h b/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.h index 44b85570357..a2d68011695 100644 --- a/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.h +++ b/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.h @@ -21,8 +21,8 @@ struct MergeHandlerMetrics { metrics::DoubleAverageMetric mergeAverageDataReceivedNeeded; // Individual operation metrics. These capture both count and latency sum, so // no need for explicit count metric on the side. - metrics::DoubleAverageMetric put_latency; - metrics::DoubleAverageMetric remove_latency; + metrics::DoubleAverageMetric merge_put_latency; + metrics::DoubleAverageMetric merge_remove_latency; // Iteration over metadata and document payload data is already covered by // the merge[Meta]Data(Read|Write)Latency metrics, so not repeated here. Can be // explicitly added if deemed required. diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 7ee2d9f37bf..b3207428f5f 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -4,13 +4,14 @@ #include "persistenceutil.h" #include "apply_bucket_diff_entry_complete.h" #include "apply_bucket_diff_state.h" -#include <vespa/storage/persistence/filestorage/mergestatus.h> -#include <vespa/persistence/spi/persistenceprovider.h> -#include <vespa/persistence/spi/docentry.h> -#include <vespa/vdslib/distribution/distribution.h> #include <vespa/document/fieldset/fieldsets.h> #include <vespa/document/fieldvalue/document.h> +#include <vespa/persistence/spi/docentry.h> +#include <vespa/persistence/spi/persistenceprovider.h> +#include <vespa/storage/persistence/filestorage/mergestatus.h> +#include <vespa/vdslib/distribution/distribution.h> #include <vespa/vespalib/objects/nbostream.h> +#include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <algorithm> @@ -506,8 +507,18 @@ void MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results, const spi::Bucket& bucket, const api::ApplyBucketDiffCommand::Entry& e, - const document::DocumentTypeRepo& repo) const + const document::DocumentTypeRepo& repo, + const NewestDocumentVersionMapping& newest_per_doc) const { + if (!e._docName.empty()) { + auto version_iter = newest_per_doc.find(e._docName); + assert(version_iter != newest_per_doc.end()); + if (e._entry._timestamp != version_iter->second) { + LOG(spam, "ApplyBucketDiff(%s): skipping diff entry %s since it is subsumed by a newer timestamp %" PRIu64, + bucket.toString().c_str(), e.toString().c_str(), version_iter->second); + return; + } + } auto throttle_token = _env._fileStorHandler.operation_throttler().blocking_acquire_one(); spi::Timestamp timestamp(e._entry._timestamp); if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) { @@ -516,14 +527,14 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results document::DocumentId docId = doc->getId(); auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), std::move(docId), std::move(throttle_token), "put", - _clock, _env._metrics.merge_handler_metrics.put_latency); + _clock, _env._metrics.merge_handler_metrics.merge_put_latency); _spi.putAsync(bucket, timestamp, std::move(doc), std::move(complete)); } else { std::vector<spi::IdAndTimestamp> ids; ids.emplace_back(document::DocumentId(e._docName), timestamp); auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), ids[0].id, std::move(throttle_token), "remove", - _clock, _env._metrics.merge_handler_metrics.remove_latency); + _clock, _env._metrics.merge_handler_metrics.merge_remove_latency); _spi.removeAsync(bucket, std::move(ids), std::move(complete)); } } @@ -548,6 +559,7 @@ MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::Apply DocEntryList entries; populateMetaData(bucket, Timestamp::max(), entries, context); + const auto newest_versions = enumerate_newest_document_versions(diff); const document::DocumentTypeRepo & repo = _env.getDocumentTypeRepo(); uint32_t existingCount = entries.size(); @@ -580,7 +592,7 @@ MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::Apply ++i; LOG(spam, "ApplyBucketDiff(%s): Adding slot %s", bucket.toString().c_str(), e.toString().c_str()); - applyDiffEntry(async_results, bucket, e, repo); + applyDiffEntry(async_results, bucket, e, repo, newest_versions); } else { assert(spi::Timestamp(e._entry._timestamp) == existing.getTimestamp()); // Diffing for existing timestamp; should either both be put @@ -591,7 +603,7 @@ MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::Apply if ((e._entry._flags & DELETED) && !existing.isRemove()) { LOG(debug, "Slot in diff is remove for existing timestamp in %s. Diff slot: %s. Existing slot: %s", bucket.toString().c_str(), e.toString().c_str(), existing.toString().c_str()); - applyDiffEntry(async_results, bucket, e, repo); + applyDiffEntry(async_results, bucket, e, repo, newest_versions); } else { // Duplicate put, just ignore it. LOG(debug, "During diff apply, attempting to add slot whose timestamp already exists in %s, " @@ -619,7 +631,7 @@ MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::Apply LOG(spam, "ApplyBucketDiff(%s): Adding slot %s", bucket.toString().c_str(), e.toString().c_str()); - applyDiffEntry(async_results, bucket, e, repo); + applyDiffEntry(async_results, bucket, e, repo, newest_versions); byteCount += e._headerBlob.size() + e._bodyBlob.size(); } if (byteCount + notNeededByteCount != 0) { @@ -631,6 +643,27 @@ MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::Apply bucket.toString().c_str(), addedCount); } +MergeHandler::NewestDocumentVersionMapping +MergeHandler::enumerate_newest_document_versions(const std::vector<api::ApplyBucketDiffCommand::Entry>& diff) +{ + NewestDocumentVersionMapping newest_per_doc; + for (const auto& e : diff) { + // We expect the doc name to always be filled out, both for remove operations and for puts. + // But since the latter is technically redundant (ID is also found within the document), we + // guard on this to be forwards compatible in case this changes (e.g. to populate and use + // the GID field instead). Fallback to the legacy behavior if so. + if (e._docName.empty()) { + continue; + } + auto [existing_iter, inserted] = newest_per_doc.insert(std::make_pair(vespalib::stringref(e._docName), e._entry._timestamp)); + if (!inserted) { + assert(existing_iter != newest_per_doc.end()); + existing_iter->second = std::max(existing_iter->second, e._entry._timestamp); + } + } + return newest_per_doc; +} + void MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const { diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index f3bef802229..2be45e7bc8b 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -18,6 +18,7 @@ #include <vespa/storageapi/message/bucket.h> #include <vespa/storage/common/cluster_context.h> #include <vespa/storage/common/messagesender.h> +#include <vespa/vespalib/stllike/hash_map.h> #include <vespa/vespalib/util/monitored_refcount.h> #include <vespa/storageframework/generic/clock/time.h> @@ -42,6 +43,8 @@ private: using MessageTrackerUP = std::unique_ptr<MessageTracker>; using Timestamp = framework::MicroSecTime; public: + using NewestDocumentVersionMapping = vespalib::hash_map<vespalib::stringref, api::Timestamp>; + enum StateFlag { IN_USE = 0x01, DELETED = 0x02, @@ -72,6 +75,17 @@ public: void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const; void drain_async_writes(); + /** + * Returns a mapping that, for each document ID, contains the newest version of that document that + * is present in the diff. + * + * The returned hash_map keys point directly into the `ApplyBucketDiffCommand::Entry::_docName` memory + * owned by `diff`, so this memory must remain unchanged and stable for the duration of the returned + * mapping's lifetime. + */ + static NewestDocumentVersionMapping enumerate_newest_document_versions( + const std::vector<api::ApplyBucketDiffCommand::Entry>& diff); + private: using DocEntryList = std::vector<std::unique_ptr<spi::DocEntry>>; const framework::Clock &_clock; @@ -90,9 +104,13 @@ private: /** * Invoke either put, remove or unrevertable remove on the SPI * depending on the flags in the diff entry. + * + * If `newest_doc_version` indicates that the entry is not the newest version present in the + * diff, the entry is silently ignored and is _not_ invoked on the SPI. */ void applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results, const spi::Bucket&, - const api::ApplyBucketDiffCommand::Entry&, const document::DocumentTypeRepo& repo) const; + const api::ApplyBucketDiffCommand::Entry&, const document::DocumentTypeRepo& repo, + const NewestDocumentVersionMapping& newest_per_doc) const; /** * Fill entries-vector with metadata for bucket up to maxTimestamp, diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index bbd4e87cb40..1b119a0e631 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -622,7 +622,14 @@ CommunicationManager::sendDirectRPCReply(RPCRequestWrapper& request, request.addReturnString(ns.str().c_str()); LOGBP(debug, "Sending getnodestate2 reply with no host info."); } else if (requestName == "setsystemstate2" || requestName == "setdistributionstates") { - // No data to return + // No data to return, but the request must be failed iff we rejected the state version + // due to a higher version having been previously received. + auto& state_reply = dynamic_cast<api::SetSystemStateReply&>(*reply); + if (state_reply.getResult().getResult() == api::ReturnCode::REJECTED) { + vespalib::string err_msg = state_reply.getResult().getMessage(); // ReturnCode message is stringref + request.returnError(FRTE_RPC_METHOD_FAILED, err_msg.c_str()); + return; + } } else if (requestName == "activate_cluster_state_version") { auto& activate_reply(dynamic_cast<api::ActivateClusterStateVersionReply&>(*reply)); request.addReturnInt(activate_reply.actualVersion()); diff --git a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp index ca46e87285b..5b8052a05f8 100644 --- a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp +++ b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp @@ -54,6 +54,9 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentMessage& fromMsg) auto to = std::make_unique<api::UpdateCommand>(bucket, from.stealDocumentUpdate(), from.getNewTimestamp()); to->setOldTimestamp(from.getOldTimestamp()); to->setCondition(from.getCondition()); + if (from.has_cached_create_if_missing()) { + to->set_cached_create_if_missing(from.create_if_missing()); + } toMsg = std::move(to); break; } @@ -217,6 +220,9 @@ DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg) to->setOldTimestamp(from.getOldTimestamp()); to->setNewTimestamp(from.getTimestamp()); to->setCondition(from.getCondition()); + if (from.has_cached_create_if_missing()) { + to->set_cached_create_if_missing(from.create_if_missing()); + } toMsg = std::move(to); break; } diff --git a/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp index 4d74eb1974b..3b53c0c9584 100644 --- a/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp @@ -124,7 +124,8 @@ void ClusterControllerApiRpcService::RPC_setSystemState2(FRT_RPCRequest* req) { req->GetParams()->GetValue(0)._string._len); lib::ClusterState systemState(systemStateStr); - auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterStateBundle(systemState)); + auto bundle = std::make_shared<const lib::ClusterStateBundle>(systemState); + auto cmd = std::make_shared<api::SetSystemStateCommand>(std::move(bundle)); cmd->setPriority(api::StorageMessage::VERYHIGH); detach_and_forward_to_enqueuer(std::move(cmd), req); @@ -167,8 +168,7 @@ void ClusterControllerApiRpcService::RPC_setDistributionStates(FRT_RPCRequest* r } LOG(debug, "Got state bundle %s", state_bundle->toString().c_str()); - // TODO add constructor taking in shared_ptr directly instead? - auto cmd = std::make_shared<api::SetSystemStateCommand>(*state_bundle); + auto cmd = std::make_shared<api::SetSystemStateCommand>(std::move(state_bundle)); cmd->setPriority(api::StorageMessage::VERYHIGH); detach_and_forward_to_enqueuer(std::move(cmd), req); diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp index d18935afe24..98796ee6440 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp +++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp @@ -125,24 +125,28 @@ ServiceLayerNode::initializeNodeSpecific() void ServiceLayerNode::handleLiveConfigUpdate(const InitialGuard & initGuard) { - if (_server_config.staging) { - bool updated = false; - vespa::config::content::core::StorServerConfigBuilder oldC(*_server_config.active); - StorServerConfig& newC(*_server_config.staging); - { - updated = false; - NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock()); - lib::NodeState ns(*_component->getStateUpdater().getReportedNodeState()); - if (DIFFER(nodeCapacity)) { - LOG(info, "Live config update: Updating node capacity from %f to %f.", - oldC.nodeCapacity, newC.nodeCapacity); - ASSIGN(nodeCapacity); - ns.setCapacity(newC.nodeCapacity); - } - if (updated) { - // FIXME this always gets overwritten by StorageNode::handleLiveConfigUpdate...! Intentional? - _server_config.active = std::make_unique<vespa::config::content::core::StorServerConfig>(oldC); - _component->getStateUpdater().setReportedNodeState(ns); + { + std::lock_guard config_lock(_configLock); + // Live server config patching happens both here and in StorageNode::handleLiveConfigUpdate, + // which we have to delegate to afterward (_without_ holding _configLock at the time). + if (_server_config.staging) { + bool updated = false; + vespa::config::content::core::StorServerConfigBuilder oldC(*_server_config.active); + StorServerConfig& newC(*_server_config.staging); + { + NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock()); + lib::NodeState ns(*_component->getStateUpdater().getReportedNodeState()); + if (DIFFER(nodeCapacity)) { + LOG(info, "Live config update: Updating node capacity from %f to %f.", + oldC.nodeCapacity, newC.nodeCapacity); + ASSIGN(nodeCapacity); + ns.setCapacity(newC.nodeCapacity); + } + if (updated) { + // FIXME the patching of old config vs new config is confusing and error-prone. Redesign! + _server_config.active = std::make_unique<vespa::config::content::core::StorServerConfig>(oldC); + _component->getStateUpdater().setReportedNodeState(ns); + } } } } diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp index adebaa51c08..a2106dce8d2 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.cpp +++ b/storage/src/vespa/storage/storageserver/statemanager.cpp @@ -21,7 +21,7 @@ #include <fstream> #include <ranges> -#include <vespa/log/log.h> +#include <vespa/log/bufferedlogger.h> LOG_SETUP(".state.manager"); using vespalib::make_string_short::fmt; @@ -74,10 +74,13 @@ StateManager::StateManager(StorageComponentRegister& compReg, _health_ping_time(), _health_ping_warn_interval(5min), _health_ping_warn_time(_start_time + _health_ping_warn_interval), + _last_accepted_cluster_state_time(), + _last_observed_version_from_cc(), _hostInfo(std::move(hostInfo)), _controllers_observed_explicit_node_state(), _noThreadTestMode(testMode), _grabbedExternalLock(false), + _require_strictly_increasing_cluster_state_versions(false), _notifyingListeners(false), _requested_almost_immediate_node_state_replies(false) { @@ -436,21 +439,67 @@ StateManager::mark_controller_as_having_observed_explicit_node_state(const std:: _controllers_observed_explicit_node_state.emplace(controller_index); } -void -StateManager::setClusterStateBundle(const ClusterStateBundle& c) +std::optional<uint32_t> +StateManager::try_set_cluster_state_bundle(std::shared_ptr<const ClusterStateBundle> c, + uint16_t origin_controller_index) { { std::lock_guard lock(_stateLock); - _nextSystemState = std::make_shared<const ClusterStateBundle>(c); + uint32_t effective_active_version = (_nextSystemState ? _nextSystemState->getVersion() + : _systemState->getVersion()); + const auto now = _component.getClock().getMonotonicTime(); + const uint32_t last_ver_from_cc = _last_observed_version_from_cc[origin_controller_index]; + _last_observed_version_from_cc[origin_controller_index] = c->getVersion(); + + if (_require_strictly_increasing_cluster_state_versions && (c->getVersion() < effective_active_version)) { + if (c->getVersion() >= last_ver_from_cc) { + constexpr auto reject_warn_threshold = 30s; + if (now - _last_accepted_cluster_state_time <= reject_warn_threshold) { + LOG(debug, "Rejecting cluster state with version %u from cluster controller %u, as " + "we've already accepted version %u. Recently accepted another cluster state, " + "so assuming transient CC leadership period overlap.", + c->getVersion(), origin_controller_index, effective_active_version); + } else { + // Rejections have happened for some time. Make a bit of noise. + LOGBP(warning, "Rejecting cluster state with version %u from cluster controller %u, as " + "we've already accepted version %u.", + c->getVersion(), origin_controller_index, effective_active_version); + } + return {effective_active_version}; + } else { + // SetSystemState RPCs are FIFO-ordered and a particular CC should enforce strictly increasing + // cluster state versions through its ZooKeeper quorum (but commands may be resent for a given + // version). This means that commands should contain _monotonically increasing_ versions from + // a given CC origin index. + // If this is _not_ the case, it indicates ZooKeeper state on the CCs has been lost or wiped, + // at which point we have no other realistic choice than to accept the version, or the system + // will stall until an operator manually intervenes by restarting the content cluster. + LOG(error, "Received cluster state version %u from cluster controller %u, which is lower than " + "the current state version (%u) and the last received version (%u) from the same controller. " + "This indicates loss of cluster controller ZooKeeper state; accepting lower version to " + "prevent content cluster operations from stalling for an indeterminate amount of time.", + c->getVersion(), origin_controller_index, effective_active_version, last_ver_from_cc); + // Fall through to state acceptance. + } + } + _last_accepted_cluster_state_time = now; + _nextSystemState = std::move(c); } notifyStateListeners(); + return std::nullopt; } bool StateManager::onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd) { - setClusterStateBundle(cmd->getClusterStateBundle()); - sendUp(std::make_shared<api::SetSystemStateReply>(*cmd)); + auto reply = std::make_shared<api::SetSystemStateReply>(*cmd); + const auto maybe_rejected_by_ver = try_set_cluster_state_bundle(cmd->cluster_state_bundle_ptr(), cmd->getSourceIndex()); + if (maybe_rejected_by_ver) { + reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, + fmt("Cluster state version %u rejected; node already has a higher cluster state version (%u)", + cmd->getClusterStateBundle().getVersion(), *maybe_rejected_by_ver))); + } + sendUp(reply); return true; } @@ -520,6 +569,13 @@ StateManager::tick() { warn_on_missing_health_ping(); } +void +StateManager::set_require_strictly_increasing_cluster_state_versions(bool req) noexcept +{ + std::lock_guard guard(_stateLock); + _require_strictly_increasing_cluster_state_versions = req; +} + bool StateManager::sendGetNodeStateReplies() { return sendGetNodeStateReplies(0xffff); diff --git a/storage/src/vespa/storage/storageserver/statemanager.h b/storage/src/vespa/storage/storageserver/statemanager.h index 72b89dc4d65..d116f968731 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.h +++ b/storage/src/vespa/storage/storageserver/statemanager.h @@ -1,9 +1,6 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** - * @class storage::StateManager - * @ingroup storageserver - * - * @brief Keeps and updates node and system states. + * Keeps and updates node and system states. * * This component implements the NodeStateUpdater interface to handle states * for all components. See that interface for documentation. @@ -22,11 +19,13 @@ #include <vespa/storageapi/message/state.h> #include <vespa/storageapi/messageapi/storagemessage.h> #include <vespa/vespalib/objects/floatingpointtype.h> +#include <vespa/vespalib/stllike/hash_map.h> +#include <atomic> #include <deque> +#include <list> #include <map> +#include <optional> #include <unordered_set> -#include <list> -#include <atomic> namespace metrics { class MetricManager; @@ -69,6 +68,8 @@ class StateManager : public NodeStateUpdater, std::optional<vespalib::steady_time> _health_ping_time; vespalib::duration _health_ping_warn_interval; vespalib::steady_time _health_ping_warn_time; + vespalib::steady_time _last_accepted_cluster_state_time; + vespalib::hash_map<uint16_t, uint32_t> _last_observed_version_from_cc; std::unique_ptr<HostInfo> _hostInfo; std::unique_ptr<framework::Thread> _thread; // Controllers that have observed a GetNodeState response sent _after_ @@ -76,6 +77,7 @@ class StateManager : public NodeStateUpdater, std::unordered_set<uint16_t> _controllers_observed_explicit_node_state; bool _noThreadTestMode; bool _grabbedExternalLock; + bool _require_strictly_increasing_cluster_state_versions; std::atomic<bool> _notifyingListeners; std::atomic<bool> _requested_almost_immediate_node_state_replies; @@ -90,6 +92,9 @@ public: void tick(); void warn_on_missing_health_ping(); + // Precondition: internal state mutex must not be held + void set_require_strictly_increasing_cluster_state_versions(bool req) noexcept; + void print(std::ostream& out, bool verbose, const std::string& indent) const override; void reportHtmlStatus(std::ostream&, const framework::HttpUrlPath&) const override; @@ -102,7 +107,11 @@ public: Lock::SP grabStateChangeLock() override; void setReportedNodeState(const lib::NodeState& state) override; - void setClusterStateBundle(const ClusterStateBundle& c); + // Iff state was accepted, returns std::nullopt + // Otherwise (i.e. state was rejected due to a higher version already having been accepted) + // returns an optional containing the current, higher cluster state version. + [[nodiscard]] std::optional<uint32_t> try_set_cluster_state_bundle(std::shared_ptr<const ClusterStateBundle> c, + uint16_t origin_controller_index); HostInfo& getHostInfo() { return *_hostInfo; } void immediately_send_get_node_state_replies() override; diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp index f7a426a0527..5b665f830d3 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.cpp +++ b/storage/src/vespa/storage/storageserver/storagenode.cpp @@ -92,6 +92,7 @@ StorageNode::StorageNode( _statusMetrics(), _stateReporter(), _stateManager(), + _state_manager_ptr(nullptr), _chain(), _configLock(), _initial_config_mutex(), @@ -140,18 +141,20 @@ StorageNode::initialize(const NodeStateReporter & nodeStateReporter) // Initializing state manager early, as others use it init time to // update node state according min used bits etc. // Needs node type to be set right away. Needs thread pool, index and - // dead lock detector too, but not before open() + // deadlock detector too, but not before open() _stateManager = std::make_unique<StateManager>( _context.getComponentRegister(), std::move(_hostInfo), nodeStateReporter, _singleThreadedDebugMode); + _stateManager->set_require_strictly_increasing_cluster_state_versions(server_config().requireStrictlyIncreasingClusterStateVersions); + _state_manager_ptr = _stateManager.get(); _context.getComponentRegister().setNodeStateUpdater(*_stateManager); - // Create VDS root folder, in case it doesn't already exist. - // Maybe better to rather fail if it doesn't exist, but tests - // might break if we do that. Might alter later. - std::filesystem::create_directories(std::filesystem::path(_rootFolder)); + // Create storage root folder, in case it doesn't already exist. + if (!_rootFolder.empty()) { + std::filesystem::create_directories(std::filesystem::path(_rootFolder)); + } // else: running as part of unit tests initializeNodeSpecific(); @@ -192,13 +195,16 @@ StorageNode::initialize(const NodeStateReporter & nodeStateReporter) initializeStatusWebServer(); + if (server_config().writePidFileOnStartup) { + assert(!_rootFolder.empty()); // Write pid file as the last thing we do. If we fail initialization // due to an exception we won't run shutdown. Thus we won't remove the // pid file if something throws after writing it in initialization. // Initialize _pidfile here, such that we can know that we didn't create // it in shutdown code for shutdown during init. - _pidFile = _rootFolder + "/pidfile"; - writePidFile(_pidFile); + _pidFile = _rootFolder + "/pidfile"; + writePidFile(_pidFile); + } } void @@ -242,12 +248,21 @@ StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard) DIFFERWARN(clusterName, "Cannot alter cluster name of node live"); DIFFERWARN(nodeIndex, "Cannot alter node index of node live"); DIFFERWARN(isDistributor, "Cannot alter role of node live"); - _server_config.active = std::make_unique<StorServerConfig>(oldC); // TODO this overwrites from ServiceLayerNode + [[maybe_unused]] bool updated = false; // magically touched by ASSIGN() macro. TODO rewrite this fun stuff. + if (DIFFER(requireStrictlyIncreasingClusterStateVersions)) { + LOG(info, "Live config update: require strictly increasing cluster state versions: %s -> %s", + (oldC.requireStrictlyIncreasingClusterStateVersions ? "true" : "false"), + (newC.requireStrictlyIncreasingClusterStateVersions ? "true" : "false")); + ASSIGN(requireStrictlyIncreasingClusterStateVersions); + } + _server_config.active = std::make_unique<StorServerConfig>(oldC); _server_config.staging.reset(); _deadLockDetector->enableWarning(server_config().enableDeadLockDetectorWarnings); _deadLockDetector->enableShutdown(server_config().enableDeadLockDetector); _deadLockDetector->setProcessSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack)); _deadLockDetector->setWaitSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack)); + assert(_state_manager_ptr); + _state_manager_ptr->set_require_strictly_increasing_cluster_state_versions(server_config().requireStrictlyIncreasingClusterStateVersions); } if (_distribution_config.staging) { StorDistributionConfigBuilder oldC(*_distribution_config.active); @@ -437,11 +452,17 @@ StorageNode::stage_config_change(ConfigWrapper<ConfigT>& cfg, std::unique_ptr<Co // else is doing configuration work, and then we write the new config // to a variable where we can find it later when processing config // updates + // TODO bail if we're shutting down to avoid racing with chain destruction? + // - only relevant for distribution config since it's not pushed by main thread + // - or have some way of injecting config changes from that level...? must be done atomically! + // - ideally want to expose cluster state _bundles_ to relevant components, not config alone! + bool live_update; { std::lock_guard config_lock_guard(_configLock); cfg.staging = std::move(new_cfg); + live_update = static_cast<bool>(cfg.active); } - if (cfg.active) { + if (live_update) { InitialGuard concurrent_config_guard(_initial_config_mutex); handleLiveConfigUpdate(concurrent_config_guard); } diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h index a96f6b52a66..93265bece3c 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.h +++ b/storage/src/vespa/storage/storageserver/storagenode.h @@ -135,6 +135,10 @@ private: // Depends on metric manager std::unique_ptr<StateReporter> _stateReporter; std::unique_ptr<StateManager> _stateManager; + // Node subclasses may take ownership of _stateManager in order to infuse it into + // their own storage link chain, but they MUST ensure its lifetime is maintained. + // We need to remember the original pointer in order to update its config. + StateManager* _state_manager_ptr; // The storage chain can depend on anything. std::unique_ptr<StorageLink> _chain; diff --git a/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto b/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto index 55d516a017b..403752b0c84 100644 --- a/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto +++ b/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto @@ -31,11 +31,18 @@ message Update { } message UpdateRequest { - Bucket bucket = 1; - Update update = 2; - uint64 new_timestamp = 3; - uint64 expected_old_timestamp = 4; // If zero; no expectation. - TestAndSetCondition condition = 5; + enum CreateIfMissing { + UNSPECIFIED = 0; // Legacy fallback: must deserialize `update` to find flag value + TRUE = 1; + FALSE = 2; + } + + Bucket bucket = 1; + Update update = 2; + uint64 new_timestamp = 3; + uint64 expected_old_timestamp = 4; // If zero; no expectation. + TestAndSetCondition condition = 5; + CreateIfMissing create_if_missing = 6; } message UpdateResponse { diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp index 57047be6037..0f4a34cc775 100644 --- a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp +++ b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp @@ -465,6 +465,10 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::UpdateCommand& msg) if (msg.getCondition().isPresent()) { set_tas_condition(*req.mutable_condition(), msg.getCondition()); } + if (msg.has_cached_create_if_missing()) { + req.set_create_if_missing(msg.create_if_missing() ? protobuf::UpdateRequest_CreateIfMissing_TRUE + : protobuf::UpdateRequest_CreateIfMissing_FALSE); + } }); } @@ -482,6 +486,9 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeUpdateCommand(BBuf& buf) if (req.has_condition()) { cmd->setCondition(get_tas_condition(req.condition())); } + if (req.create_if_missing() != protobuf::UpdateRequest_CreateIfMissing_UNSPECIFIED) { + cmd->set_cached_create_if_missing(req.create_if_missing() == protobuf::UpdateRequest_CreateIfMissing_TRUE); + } return cmd; }); } diff --git a/storage/src/vespa/storageapi/message/persistence.cpp b/storage/src/vespa/storageapi/message/persistence.cpp index 4c24bb74faf..af054855bbe 100644 --- a/storage/src/vespa/storageapi/message/persistence.cpp +++ b/storage/src/vespa/storageapi/message/persistence.cpp @@ -105,13 +105,23 @@ UpdateCommand::UpdateCommand(const document::Bucket &bucket, const document::Doc : TestAndSetCommand(MessageType::UPDATE, bucket), _update(update), _timestamp(time), - _oldTimestamp(0) + _oldTimestamp(0), + _create_if_missing() { if ( ! _update) { throw vespalib::IllegalArgumentException("Cannot update a null update", VESPA_STRLOC); } } +bool +UpdateCommand::create_if_missing() const +{ + if (_create_if_missing.has_value()) { + return *_create_if_missing; + } + return _update->getCreateIfNonExistent(); +} + const document::DocumentType * UpdateCommand::getDocumentType() const { return &_update->getType(); diff --git a/storage/src/vespa/storageapi/message/persistence.h b/storage/src/vespa/storageapi/message/persistence.h index f44ab4e8280..0676e1d0f44 100644 --- a/storage/src/vespa/storageapi/message/persistence.h +++ b/storage/src/vespa/storageapi/message/persistence.h @@ -1,8 +1,6 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** - * @file persistence.h - * - * Persistence related commands, like put, get & remove + * Persistence related commands, like put, get & remove */ #pragma once @@ -10,6 +8,7 @@ #include <vespa/storageapi/defs.h> #include <vespa/document/base/documentid.h> #include <vespa/documentapi/messagebus/messages/testandsetcondition.h> +#include <optional> namespace document { class DocumentUpdate; @@ -117,20 +116,32 @@ class UpdateCommand : public TestAndSetCommand { std::shared_ptr<document::DocumentUpdate> _update; Timestamp _timestamp; Timestamp _oldTimestamp; + std::optional<bool> _create_if_missing; // caches the value held (possibly lazily deserialized) in _update public: UpdateCommand(const document::Bucket &bucket, const std::shared_ptr<document::DocumentUpdate>&, Timestamp); ~UpdateCommand() override; - void setTimestamp(Timestamp ts) { _timestamp = ts; } - void setOldTimestamp(Timestamp ts) { _oldTimestamp = ts; } + void setTimestamp(Timestamp ts) noexcept { _timestamp = ts; } + void setOldTimestamp(Timestamp ts) noexcept { _oldTimestamp = ts; } + + [[nodiscard]] bool has_cached_create_if_missing() const noexcept { + return _create_if_missing.has_value(); + } + // It is the caller's responsibility to ensure this value matches that of _update->getCreateIfNonExisting() + void set_cached_create_if_missing(bool create) noexcept { + _create_if_missing = create; + } const std::shared_ptr<document::DocumentUpdate>& getUpdate() const { return _update; } const document::DocumentId& getDocumentId() const override; Timestamp getTimestamp() const { return _timestamp; } Timestamp getOldTimestamp() const { return _oldTimestamp; } + // May throw iff has_cached_create_if_missing() == false, otherwise noexcept. + [[nodiscard]] bool create_if_missing() const; + const document::DocumentType * getDocumentType() const override; vespalib::string getSummary() const override; diff --git a/storage/src/vespa/storageapi/message/state.cpp b/storage/src/vespa/storageapi/message/state.cpp index 5a50167f584..b4e8655d783 100644 --- a/storage/src/vespa/storageapi/message/state.cpp +++ b/storage/src/vespa/storageapi/message/state.cpp @@ -5,8 +5,7 @@ #include <vespa/vdslib/state/clusterstate.h> #include <ostream> -namespace storage { -namespace api { +namespace storage::api { IMPLEMENT_COMMAND(GetNodeStateCommand, GetNodeStateReply) IMPLEMENT_REPLY(GetNodeStateReply) @@ -45,7 +44,7 @@ GetNodeStateReply::GetNodeStateReply(const GetNodeStateCommand& cmd) GetNodeStateReply::GetNodeStateReply(const GetNodeStateCommand& cmd, const lib::NodeState& state) : StorageReply(cmd), - _state(new lib::NodeState(state)) + _state(std::make_unique<lib::NodeState>(state)) { } @@ -64,23 +63,31 @@ GetNodeStateReply::print(std::ostream& out, bool verbose, } } +SetSystemStateCommand::SetSystemStateCommand(std::shared_ptr<const lib::ClusterStateBundle> state) + : StorageCommand(MessageType::SETSYSTEMSTATE), + _state(std::move(state)) +{ +} + SetSystemStateCommand::SetSystemStateCommand(const lib::ClusterStateBundle& state) : StorageCommand(MessageType::SETSYSTEMSTATE), - _state(state) + _state(std::make_shared<const lib::ClusterStateBundle>(state)) { } SetSystemStateCommand::SetSystemStateCommand(const lib::ClusterState& state) : StorageCommand(MessageType::SETSYSTEMSTATE), - _state(state) + _state(std::make_shared<const lib::ClusterStateBundle>(state)) { } +SetSystemStateCommand::~SetSystemStateCommand() = default; + void SetSystemStateCommand::print(std::ostream& out, bool verbose, const std::string& indent) const { - out << "SetSystemStateCommand(" << *_state.getBaselineClusterState() << ")"; + out << "SetSystemStateCommand(" << *_state->getBaselineClusterState() << ")"; if (verbose) { out << " : "; StorageCommand::print(out, verbose, indent); @@ -89,7 +96,7 @@ SetSystemStateCommand::print(std::ostream& out, bool verbose, SetSystemStateReply::SetSystemStateReply(const SetSystemStateCommand& cmd) : StorageReply(cmd), - _state(cmd.getClusterStateBundle()) + _state(cmd.cluster_state_bundle_ptr()) { } @@ -138,5 +145,4 @@ void ActivateClusterStateVersionReply::print(std::ostream& out, bool verbose, } } -} // api -} // storage +} // storage::api diff --git a/storage/src/vespa/storageapi/message/state.h b/storage/src/vespa/storageapi/message/state.h index 900355b12a2..afeb5ae9c11 100644 --- a/storage/src/vespa/storageapi/message/state.h +++ b/storage/src/vespa/storageapi/message/state.h @@ -9,12 +9,6 @@ namespace storage::api { -/** - * @class GetNodeStateCommand - * @ingroup message - * - * @brief Command for setting node state. No payload - */ class GetNodeStateCommand : public StorageCommand { lib::NodeState::UP _expectedState; @@ -27,12 +21,6 @@ public: DECLARE_STORAGECOMMAND(GetNodeStateCommand, onGetNodeState) }; -/** - * @class GetNodeStateReply - * @ingroup message - * - * @brief Reply to GetNodeStateCommand - */ class GetNodeStateReply : public StorageReply { lib::NodeState::UP _state; std::string _nodeInfo; @@ -53,41 +41,38 @@ public: }; /** - * @class SetSystemStateCommand - * @ingroup message - * - * @brief Command for telling a node about the system state - state of each node - * in the system and state of the system (all ok, no merging, block - * put/get/remove etx) + * Command for telling a node about the cluster state - state of each node + * in the cluster and state of the cluster itself (all ok, no merging, block + * put/get/remove etx) */ class SetSystemStateCommand : public StorageCommand { - lib::ClusterStateBundle _state; + std::shared_ptr<const lib::ClusterStateBundle> _state; public: + explicit SetSystemStateCommand(std::shared_ptr<const lib::ClusterStateBundle> state); explicit SetSystemStateCommand(const lib::ClusterStateBundle &state); explicit SetSystemStateCommand(const lib::ClusterState &state); - const lib::ClusterState& getSystemState() const { return *_state.getBaselineClusterState(); } - const lib::ClusterStateBundle& getClusterStateBundle() const { return _state; } + ~SetSystemStateCommand() override; + + [[nodiscard]] const lib::ClusterState& getSystemState() const { return *_state->getBaselineClusterState(); } + [[nodiscard]] const lib::ClusterStateBundle& getClusterStateBundle() const { return *_state; } + [[nodiscard]] std::shared_ptr<const lib::ClusterStateBundle> cluster_state_bundle_ptr() const noexcept { + return _state; + } void print(std::ostream& out, bool verbose, const std::string& indent) const override; DECLARE_STORAGECOMMAND(SetSystemStateCommand, onSetSystemState) }; -/** - * @class SetSystemStateReply - * @ingroup message - * - * @brief Reply received after a SetSystemStateCommand. - */ class SetSystemStateReply : public StorageReply { - lib::ClusterStateBundle _state; + std::shared_ptr<const lib::ClusterStateBundle> _state; public: explicit SetSystemStateReply(const SetSystemStateCommand& cmd); // Not serialized. Available locally - const lib::ClusterState& getSystemState() const { return *_state.getBaselineClusterState(); } - const lib::ClusterStateBundle& getClusterStateBundle() const { return _state; } + const lib::ClusterState& getSystemState() const { return *_state->getBaselineClusterState(); } + const lib::ClusterStateBundle& getClusterStateBundle() const { return *_state; } void print(std::ostream& out, bool verbose, const std::string& indent) const override; DECLARE_STORAGEREPLY(SetSystemStateReply, onSetSystemStateReply) |