aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
Diffstat (limited to 'storage')
-rw-r--r--storage/CMakeLists.txt1
-rw-r--r--storage/src/tests/bucketdb/bucketmanagertest.cpp51
-rw-r--r--storage/src/tests/common/CMakeLists.txt3
-rw-r--r--storage/src/tests/common/global_bucket_space_distribution_converter_test.cpp350
-rw-r--r--storage/src/tests/common/metricstest.cpp29
-rw-r--r--storage/src/tests/common/storage_config_set.cpp153
-rw-r--r--storage/src/tests/common/storage_config_set.h122
-rw-r--r--storage/src/tests/common/testhelper.cpp148
-rw-r--r--storage/src/tests/common/testhelper.h28
-rw-r--r--storage/src/tests/common/teststorageapp.cpp92
-rw-r--r--storage/src/tests/common/teststorageapp.h21
-rw-r--r--storage/src/tests/distributor/CMakeLists.txt16
-rw-r--r--storage/src/tests/distributor/distributor_stripe_test_util.cpp12
-rw-r--r--storage/src/tests/distributor/distributor_stripe_test_util.h8
-rw-r--r--storage/src/tests/distributor/externaloperationhandlertest.cpp4
-rw-r--r--storage/src/tests/distributor/idealstatemanagertest.cpp8
-rw-r--r--storage/src/tests/distributor/mock_tickable_stripe.h2
-rw-r--r--storage/src/tests/distributor/operationtargetresolvertest.cpp58
-rw-r--r--storage/src/tests/distributor/statusreporterdelegatetest.cpp5
-rw-r--r--storage/src/tests/distributor/top_level_distributor_test_util.cpp6
-rw-r--r--storage/src/tests/distributor/top_level_distributor_test_util.h3
-rw-r--r--storage/src/tests/distributor/updateoperationtest.cpp23
-rw-r--r--storage/src/tests/frameworkimpl/status/statustest.cpp7
-rw-r--r--storage/src/tests/persistence/bucketownershipnotifiertest.cpp5
-rw-r--r--storage/src/tests/persistence/common/filestortestfixture.cpp15
-rw-r--r--storage/src/tests/persistence/common/filestortestfixture.h3
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp45
-rw-r--r--storage/src/tests/persistence/filestorage/filestormodifiedbucketstest.cpp2
-rw-r--r--storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp11
-rw-r--r--storage/src/tests/persistence/mergehandlertest.cpp54
-rw-r--r--storage/src/tests/persistence/persistencetestutils.cpp16
-rw-r--r--storage/src/tests/persistence/persistencetestutils.h5
-rw-r--r--storage/src/tests/persistence/provider_error_wrapper_test.cpp5
-rw-r--r--storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp34
-rw-r--r--storage/src/tests/storageserver/bouncertest.cpp10
-rw-r--r--storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp19
-rw-r--r--storage/src/tests/storageserver/communicationmanagertest.cpp78
-rw-r--r--storage/src/tests/storageserver/documentapiconvertertest.cpp62
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp18
-rw-r--r--storage/src/tests/storageserver/rpc/cluster_controller_rpc_api_service_test.cpp12
-rw-r--r--storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp14
-rw-r--r--storage/src/tests/storageserver/service_layer_error_listener_test.cpp8
-rw-r--r--storage/src/tests/storageserver/statemanagertest.cpp140
-rw-r--r--storage/src/tests/storageserver/statereportertest.cpp17
-rw-r--r--storage/src/tests/visiting/visitormanagertest.cpp21
-rw-r--r--storage/src/tests/visiting/visitortest.cpp38
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketmanager.cpp23
-rw-r--r--storage/src/vespa/storage/common/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/common/content_bucket_space.cpp44
-rw-r--r--storage/src/vespa/storage/common/content_bucket_space.h32
-rw-r--r--storage/src/vespa/storage/common/global_bucket_space_distribution_converter.cpp181
-rw-r--r--storage/src/vespa/storage/common/global_bucket_space_distribution_converter.h22
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.cpp2
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.h6
-rw-r--r--storage/src/vespa/storage/config/stor-distributormanager.def7
-rw-r--r--storage/src/vespa/storage/config/stor-server.def12
-rw-r--r--storage/src/vespa/storage/distributor/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/distributor/activecopy.cpp5
-rw-r--r--storage/src/vespa/storage/distributor/bucket_space_distribution_configs.cpp17
-rw-r--r--storage/src/vespa/storage/distributor/bucket_space_distribution_configs.h27
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.h2
-rw-r--r--storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h2
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp5
-rw-r--r--storage/src/vespa/storage/distributor/operationtargetresolverimpl.cpp79
-rw-r--r--storage/src/vespa/storage/distributor/operationtargetresolverimpl.h22
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.cpp1
-rw-r--r--storage/src/vespa/storage/distributor/stripe_access_guard.h4
-rw-r--r--storage/src/vespa/storage/distributor/tickable_stripe.h2
-rw-r--r--storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp12
-rw-r--r--storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h9
-rw-r--r--storage/src/vespa/storage/distributor/top_level_distributor.cpp8
-rw-r--r--storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp4
-rw-r--r--storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp1
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp11
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.cpp4
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.h4
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp53
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h20
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp9
-rw-r--r--storage/src/vespa/storage/storageserver/documentapiconverter.cpp6
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp6
-rw-r--r--storage/src/vespa/storage/storageserver/servicelayernode.cpp40
-rw-r--r--storage/src/vespa/storage/storageserver/statemanager.cpp68
-rw-r--r--storage/src/vespa/storage/storageserver/statemanager.h23
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.cpp39
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.h4
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto17
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp7
-rw-r--r--storage/src/vespa/storageapi/message/persistence.cpp12
-rw-r--r--storage/src/vespa/storageapi/message/persistence.h21
-rw-r--r--storage/src/vespa/storageapi/message/state.cpp24
-rw-r--r--storage/src/vespa/storageapi/message/state.h45
96 files changed, 1378 insertions, 1360 deletions
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index eeec705b13f..c59d095fb4c 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -54,7 +54,6 @@ vespa_define_module(
TEST_DEPENDS
messagebus_messagebus-test
- vdstestlib
TEST_EXTERNAL_DEPENDS
${VESPA_ATOMIC_LIB}
diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp
index 92547a83d25..6aff5c52598 100644
--- a/storage/src/tests/bucketdb/bucketmanagertest.cpp
+++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp
@@ -3,6 +3,7 @@
#include <tests/common/dummystoragelink.h>
#include <tests/common/testhelper.h>
#include <tests/common/teststorageapp.h>
+#include <tests/common/storage_config_set.h>
#include <vespa/config/helper/configgetter.hpp>
#include <vespa/document/config/config-documenttypes.h>
#include <vespa/document/datatype/documenttype.h>
@@ -13,17 +14,17 @@
#include <vespa/document/update/documentupdate.h>
#include <vespa/metrics/updatehook.h>
#include <vespa/storage/bucketdb/bucketmanager.h>
-#include <vespa/storage/common/global_bucket_space_distribution_converter.h>
#include <vespa/storage/persistence/filestorage/filestormanager.h>
#include <vespa/storageapi/message/bucketsplitting.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/state.h>
#include <vespa/vdslib/distribution/distribution.h>
+#include <vespa/vdslib/distribution/global_bucket_space_distribution_converter.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vdslib/state/random.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/stllike/asciistream.h>
-#include <vespa/config-stor-filestor.h>
+#include <vespa/config-stor-distribution.h>
#include <future>
#include <vespa/log/log.h>
@@ -59,6 +60,7 @@ struct TestParams;
struct BucketManagerTest : public Test {
public:
+ std::unique_ptr<StorageConfigSet> _config;
std::unique_ptr<TestServiceLayerApp> _node;
std::unique_ptr<DummyStorageLink> _top;
BucketManager *_manager;
@@ -69,8 +71,7 @@ public:
~BucketManagerTest() override;
- void setupTestEnvironment(bool fakePersistenceLayer = true,
- bool noDelete = false);
+ void setupTestEnvironment();
void addBucketsToDB(uint32_t count);
bool wasBlockedDueToLastModified(api::StorageMessage* msg, uint64_t lastModified);
void insertSingleBucket(const document::BucketId& bucket, const api::BucketInfo& info);
@@ -125,46 +126,24 @@ BucketManagerTest::~BucketManagerTest() = default;
FAIL() << ost.str(); \
}
-std::string getMkDirDisk(const std::string & rootFolder, int disk) {
- std::ostringstream os;
- os << "mkdir -p " << rootFolder << "/disks/d" << disk;
- return os.str();
-}
-
-void BucketManagerTest::setupTestEnvironment(bool fakePersistenceLayer, bool noDelete)
+void BucketManagerTest::setupTestEnvironment()
{
- vdstestlib::DirConfig config(getStandardConfig(true, "bucketmanagertest"));
- std::string rootFolder = getRootFolder(config);
- if (!noDelete) {
- assert(system(("rm -rf " + rootFolder).c_str()) == 0);
- }
- assert(system(getMkDirDisk(rootFolder, 0).c_str()) == 0);
- assert(system(getMkDirDisk(rootFolder, 1).c_str()) == 0);
-
+ _config = StorageConfigSet::make_storage_node_config();
auto repo = std::make_shared<const DocumentTypeRepo>(
*ConfigGetter<DocumenttypesConfig>::getConfig("config-doctypes", FileSpec("../config-doctypes.cfg")));
_top = std::make_unique<DummyStorageLink>();
- _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), config.getConfigId());
+ _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), _config->config_uri());
_node->setTypeRepo(repo);
_node->setupDummyPersistence();
// Set up the 3 links
- auto config_uri = config::ConfigUri(config.getConfigId());
using vespa::config::content::core::StorServerConfig;
- auto manager = std::make_unique<BucketManager>(*config_from<StorServerConfig>(config_uri), _node->getComponentRegister());
+ auto manager = std::make_unique<BucketManager>(*config_from<StorServerConfig>(_config->config_uri()), _node->getComponentRegister());
_manager = manager.get();
_top->push_back(std::move(manager));
- if (fakePersistenceLayer) {
- auto bottom = std::make_unique<DummyStorageLink>();
- _bottom = bottom.get();
- _top->push_back(std::move(bottom));
- } else {
- using StorFilestorConfig = vespa::config::content::internal::InternalStorFilestorType;
- auto bottom = std::make_unique<FileStorManager>(*config_from<StorFilestorConfig>(config_uri),
- _node->getPersistenceProvider(), _node->getComponentRegister(),
- *_node, _node->get_host_info());
- _top->push_back(std::move(bottom));
- }
- // Generate a doc to use for testing..
+ auto bottom = std::make_unique<DummyStorageLink>();
+ _bottom = bottom.get();
+ _top->push_back(std::move(bottom));
+
const DocumentType &type(*_node->getTypeRepo()->getDocumentType("text/html"));
_document = std::make_shared<document::Document>(*_node->getTypeRepo(), type, document::DocumentId("id:ns:text/html::ntnu"));
}
@@ -689,7 +668,7 @@ public:
static std::unique_ptr<lib::Distribution> default_grouped_distribution() {
return std::make_unique<lib::Distribution>(
- lib::Distribution::ConfigWrapper(GlobalBucketSpaceDistributionConverter::string_to_config(vespalib::string(
+ lib::Distribution::ConfigWrapper(lib::GlobalBucketSpaceDistributionConverter::string_to_config(vespalib::string(
R"(redundancy 2
group[3]
group[0].name "invalid"
@@ -713,7 +692,7 @@ group[2].nodes[2].index 5
static std::shared_ptr<lib::Distribution> derived_global_grouped_distribution() {
auto default_distr = default_grouped_distribution();
- return GlobalBucketSpaceDistributionConverter::convert_to_global(*default_distr);
+ return lib::GlobalBucketSpaceDistributionConverter::convert_to_global(*default_distr);
}
private:
diff --git a/storage/src/tests/common/CMakeLists.txt b/storage/src/tests/common/CMakeLists.txt
index 36487660cce..42f2ac7b40b 100644
--- a/storage/src/tests/common/CMakeLists.txt
+++ b/storage/src/tests/common/CMakeLists.txt
@@ -3,7 +3,7 @@ vespa_add_library(storage_testcommon TEST
SOURCES
dummystoragelink.cpp
message_sender_stub.cpp
- testhelper.cpp
+ storage_config_set.cpp
testnodestateupdater.cpp
teststorageapp.cpp
DEPENDS
@@ -14,7 +14,6 @@ vespa_add_executable(storage_common_gtest_runner_app TEST
SOURCES
bucket_stripe_utils_test.cpp
bucket_utils_test.cpp
- global_bucket_space_distribution_converter_test.cpp
gtest_runner.cpp
metricstest.cpp
storagelinktest.cpp
diff --git a/storage/src/tests/common/global_bucket_space_distribution_converter_test.cpp b/storage/src/tests/common/global_bucket_space_distribution_converter_test.cpp
deleted file mode 100644
index 774f90821fa..00000000000
--- a/storage/src/tests/common/global_bucket_space_distribution_converter_test.cpp
+++ /dev/null
@@ -1,350 +0,0 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include <vespa/storage/common/global_bucket_space_distribution_converter.h>
-#include <vespa/vdslib/distribution/distribution.h>
-#include <vespa/vdslib/state/clusterstate.h>
-#include <vespa/vespalib/gtest/gtest.h>
-
-using namespace ::testing;
-
-namespace storage {
-
-using DistributionConfig = vespa::config::content::StorDistributionConfig;
-
-namespace {
-
-vespalib::string default_to_global_config(const vespalib::string& default_config) {
- auto default_cfg = GlobalBucketSpaceDistributionConverter::string_to_config(default_config);
- auto as_global = GlobalBucketSpaceDistributionConverter::convert_to_global(*default_cfg);
- return GlobalBucketSpaceDistributionConverter::config_to_string(*as_global);
-}
-
-vespalib::string default_flat_config(
-R"(redundancy 1
-group[1]
-group[0].name "invalid"
-group[0].index "invalid"
-group[0].partitions 1|*
-group[0].nodes[3]
-group[0].nodes[0].index 0
-group[0].nodes[1].index 1
-group[0].nodes[2].index 2
-)");
-
-vespalib::string expected_flat_global_config(
-R"(redundancy 3
-initial_redundancy 0
-ensure_primary_persisted true
-ready_copies 3
-active_per_leaf_group true
-group[0].index "invalid"
-group[0].name "invalid"
-group[0].capacity 1
-group[0].partitions "*"
-group[0].nodes[0].index 0
-group[0].nodes[0].retired false
-group[0].nodes[1].index 1
-group[0].nodes[1].retired false
-group[0].nodes[2].index 2
-group[0].nodes[2].retired false
-)");
-
-}
-
-TEST(GlobalBucketSpaceDistributionConverterTest, can_transform_flat_cluster_config) {
- EXPECT_EQ(expected_flat_global_config, default_to_global_config(default_flat_config));
-}
-
-
-TEST(GlobalBucketSpaceDistributionConverterTest, can_transform_single_level_multi_group_config) {
- vespalib::string default_config(
-R"(redundancy 2
-group[3]
-group[0].name "invalid"
-group[0].index "invalid"
-group[0].partitions 1|*
-group[0].nodes[0]
-group[1].name rack0
-group[1].index 0
-group[1].nodes[3]
-group[1].nodes[0].index 0
-group[1].nodes[1].index 1
-group[1].nodes[2].index 2
-group[2].name rack1
-group[2].index 1
-group[2].nodes[3]
-group[2].nodes[0].index 3
-group[2].nodes[1].index 4
-group[2].nodes[2].index 5
-)");
-
- // The config converter cannot distinguish between default values
- // and explicitly set ones, so we get a few more entries in our output
- // config string.
- // Most crucial parts of the transformed config is the root redundancy
- // and the new partition config. We test _all_ config fields here so that
- // we catch anything we miss transferring state of.
- vespalib::string expected_global_config(
-R"(redundancy 6
-initial_redundancy 0
-ensure_primary_persisted true
-ready_copies 6
-active_per_leaf_group true
-group[0].index "invalid"
-group[0].name "invalid"
-group[0].capacity 1
-group[0].partitions "*|*"
-group[1].index "0"
-group[1].name "rack0"
-group[1].capacity 1
-group[1].partitions ""
-group[1].nodes[0].index 0
-group[1].nodes[0].retired false
-group[1].nodes[1].index 1
-group[1].nodes[1].retired false
-group[1].nodes[2].index 2
-group[1].nodes[2].retired false
-group[2].index "1"
-group[2].name "rack1"
-group[2].capacity 1
-group[2].partitions ""
-group[2].nodes[0].index 3
-group[2].nodes[0].retired false
-group[2].nodes[1].index 4
-group[2].nodes[1].retired false
-group[2].nodes[2].index 5
-group[2].nodes[2].retired false
-)");
- EXPECT_EQ(expected_global_config, default_to_global_config(default_config));
-}
-
-TEST(GlobalBucketSpaceDistributionConverterTest, can_transform_multi_level_multi_group_config) {
- vespalib::string default_config(
-R"(redundancy 2
-group[5]
-group[0].name "invalid"
-group[0].index "invalid"
-group[0].partitions *|*
-group[0].nodes[0]
-group[1].name switch0
-group[1].index 0
-group[1].partitions 1|*
-group[1].nodes[0]
-group[2].name rack0
-group[2].index 0.0
-group[2].nodes[1]
-group[2].nodes[0].index 0
-group[3].name rack1
-group[3].index 0.1
-group[3].nodes[1]
-group[3].nodes[0].index 1
-group[4].name switch1
-group[4].index 1
-group[4].partitions *
-group[4].nodes[0]
-group[5].name rack0
-group[5].index 1.0
-group[5].nodes[1]
-group[5].nodes[0].index 2
-group[6].name rack1
-group[6].index 1.1
-group[6].nodes[1]
-group[6].nodes[0].index 3
-)");
-
- // Note: leaf groups do not have a partition spec, only inner groups.
- vespalib::string expected_global_config(
-R"(redundancy 4
-initial_redundancy 0
-ensure_primary_persisted true
-ready_copies 4
-active_per_leaf_group true
-group[0].index "invalid"
-group[0].name "invalid"
-group[0].capacity 1
-group[0].partitions "*|*"
-group[1].index "0"
-group[1].name "switch0"
-group[1].capacity 1
-group[1].partitions "*|*"
-group[2].index "0.0"
-group[2].name "rack0"
-group[2].capacity 1
-group[2].partitions ""
-group[2].nodes[0].index 0
-group[2].nodes[0].retired false
-group[3].index "0.1"
-group[3].name "rack1"
-group[3].capacity 1
-group[3].partitions ""
-group[3].nodes[0].index 1
-group[3].nodes[0].retired false
-group[4].index "1"
-group[4].name "switch1"
-group[4].capacity 1
-group[4].partitions "*|*"
-group[5].index "1.0"
-group[5].name "rack0"
-group[5].capacity 1
-group[5].partitions ""
-group[5].nodes[0].index 2
-group[5].nodes[0].retired false
-group[6].index "1.1"
-group[6].name "rack1"
-group[6].capacity 1
-group[6].partitions ""
-group[6].nodes[0].index 3
-group[6].nodes[0].retired false
-)");
- EXPECT_EQ(expected_global_config, default_to_global_config(default_config));
-}
-
-// FIXME partition specs are order-invariant with regards to groups, so heterogenous
-// setups will not produce the expected replica distribution.
-// TODO Consider disallowing entirely when using global docs.
-TEST(GlobalBucketSpaceDistributionConverterTest, can_transform_heterogenous_multi_group_config) {
- vespalib::string default_config(
-R"(redundancy 2
-ready_copies 2
-group[3]
-group[0].name "invalid"
-group[0].index "invalid"
-group[0].partitions "1|*"
-group[0].nodes[0]
-group[1].name rack0
-group[1].index 0
-group[1].nodes[2]
-group[1].nodes[0].index 0
-group[1].nodes[1].index 1
-group[2].name rack1
-group[2].index 1
-group[2].nodes[1]
-group[2].nodes[1].index 2
-)");
-
- vespalib::string expected_global_config(
-R"(redundancy 3
-initial_redundancy 0
-ensure_primary_persisted true
-ready_copies 3
-active_per_leaf_group true
-group[0].index "invalid"
-group[0].name "invalid"
-group[0].capacity 1
-group[0].partitions "*|*"
-group[1].index "0"
-group[1].name "rack0"
-group[1].capacity 1
-group[1].partitions ""
-group[1].nodes[0].index 0
-group[1].nodes[0].retired false
-group[1].nodes[1].index 1
-group[1].nodes[1].retired false
-group[2].index "1"
-group[2].name "rack1"
-group[2].capacity 1
-group[2].partitions ""
-group[2].nodes[0].index 2
-group[2].nodes[0].retired false
-)");
- EXPECT_EQ(expected_global_config, default_to_global_config(default_config));
-}
-
-TEST(GlobalBucketSpaceDistributionConverterTest, can_transform_concrete_distribution_instance) {
- auto default_cfg = GlobalBucketSpaceDistributionConverter::string_to_config(default_flat_config);
- lib::Distribution flat_distr(*default_cfg);
- auto global_distr = GlobalBucketSpaceDistributionConverter::convert_to_global(flat_distr);
- EXPECT_EQ(expected_flat_global_config, global_distr->serialize());
-}
-
-TEST(GlobalBucketSpaceDistributionConverterTest, config_retired_state_is_propagated) {
- vespalib::string default_config(
-R"(redundancy 1
-group[1]
-group[0].name "invalid"
-group[0].index "invalid"
-group[0].partitions 1|*
-group[0].nodes[3]
-group[0].nodes[0].index 0
-group[0].nodes[0].retired false
-group[0].nodes[1].index 1
-group[0].nodes[1].retired true
-group[0].nodes[2].index 2
-group[0].nodes[2].retired true
-)");
-
- auto default_cfg = GlobalBucketSpaceDistributionConverter::string_to_config(default_config);
- auto as_global = GlobalBucketSpaceDistributionConverter::convert_to_global(*default_cfg);
-
- ASSERT_EQ(1, as_global->group.size());
- ASSERT_EQ(3, as_global->group[0].nodes.size());
- EXPECT_FALSE(as_global->group[0].nodes[0].retired);
- EXPECT_TRUE(as_global->group[0].nodes[1].retired);
- EXPECT_TRUE(as_global->group[0].nodes[2].retired);
-}
-
-TEST(GlobalBucketSpaceDistributionConverterTest, group_capacities_are_propagated) {
- vespalib::string default_config(
-R"(redundancy 2
-group[3]
-group[0].name "invalid"
-group[0].index "invalid"
-group[0].partitions 1|*
-group[0].capacity 5
-group[0].nodes[0]
-group[1].name rack0
-group[1].index 0
-group[1].capacity 2
-group[1].nodes[1]
-group[1].nodes[0].index 0
-group[2].name rack1
-group[2].capacity 3
-group[2].index 1
-group[2].nodes[1]
-group[2].nodes[0].index 1
-)");
- auto default_cfg = GlobalBucketSpaceDistributionConverter::string_to_config(default_config);
- auto as_global = GlobalBucketSpaceDistributionConverter::convert_to_global(*default_cfg);
-
- ASSERT_EQ(3, as_global->group.size());
- EXPECT_DOUBLE_EQ(5.0, as_global->group[0].capacity);
- EXPECT_DOUBLE_EQ(2.0, as_global->group[1].capacity);
- EXPECT_DOUBLE_EQ(3.0, as_global->group[2].capacity);
-}
-
-TEST(GlobalBucketSpaceDistributionConverterTest, global_distribution_has_same_owner_distributors_as_default) {
- vespalib::string default_config(
-R"(redundancy 2
-ready_copies 2
-group[3]
-group[0].name "invalid"
-group[0].index "invalid"
-group[0].partitions 1|*
-group[0].nodes[0]
-group[1].name rack0
-group[1].index 0
-group[1].nodes[1]
-group[1].nodes[0].index 0
-group[2].name rack1
-group[2].index 1
-group[2].nodes[2]
-group[2].nodes[0].index 1
-group[2].nodes[1].index 2
-)");
-
- auto default_cfg = GlobalBucketSpaceDistributionConverter::string_to_config(default_config);
- auto global_cfg = GlobalBucketSpaceDistributionConverter::convert_to_global(*default_cfg);
-
- lib::Distribution default_distr(*default_cfg);
- lib::Distribution global_distr(*global_cfg);
- lib::ClusterState state("distributor:6 storage:6");
-
- for (unsigned int i = 0; i < UINT16_MAX; ++i) {
- document::BucketId bucket(16, i);
- const auto default_index = default_distr.getIdealDistributorNode(state, bucket, "ui");
- const auto global_index = global_distr.getIdealDistributorNode(state, bucket, "ui");
- ASSERT_EQ(default_index, global_index);
- }
-}
-
-}
diff --git a/storage/src/tests/common/metricstest.cpp b/storage/src/tests/common/metricstest.cpp
index 899c1979e86..6ca84f0304a 100644
--- a/storage/src/tests/common/metricstest.cpp
+++ b/storage/src/tests/common/metricstest.cpp
@@ -1,5 +1,9 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <tests/common/dummystoragelink.h>
+#include <tests/common/storage_config_set.h>
+#include <tests/common/teststorageapp.h>
+#include <tests/common/testhelper.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h>
#include <vespa/storage/bucketdb/bucketmanager.h>
@@ -7,9 +11,6 @@
#include <vespa/storage/persistence/filestorage/filestormanager.h>
#include <vespa/storage/persistence/filestorage/filestormetrics.h>
#include <vespa/storage/visiting/visitormetrics.h>
-#include <tests/common/teststorageapp.h>
-#include <tests/common/testhelper.h>
-#include <tests/common/dummystoragelink.h>
#include <vespa/metrics/metricmanager.h>
#include <vespa/config/common/exceptions.h>
#include <vespa/vespalib/gtest/gtest.h>
@@ -26,10 +27,10 @@ namespace storage {
struct MetricsTest : public Test {
framework::defaultimplementation::FakeClock* _clock;
+ std::unique_ptr<StorageConfigSet> _config;
std::unique_ptr<TestServiceLayerApp> _node;
std::unique_ptr<DummyStorageLink> _top;
std::unique_ptr<StatusMetricConsumer> _metricsConsumer;
- std::unique_ptr<vdstestlib::DirConfig> _config;
std::unique_ptr<metrics::MetricSet> _topSet;
std::unique_ptr<metrics::MetricManager> _metricManager;
std::shared_ptr<FileStorMetrics> _filestorMetrics;
@@ -66,17 +67,13 @@ MetricsTest::MetricsTest()
MetricsTest::~MetricsTest() = default;
void MetricsTest::SetUp() {
- _config = std::make_unique<vdstestlib::DirConfig>(getStandardConfig(true, "metricstest"));
- std::filesystem::remove_all(std::filesystem::path(getRootFolder(*_config)));
- try {
- _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), _config->getConfigId());
- _node->setupDummyPersistence();
- _clock = &_node->getClock();
- _clock->setAbsoluteTimeInSeconds(1000000);
- _top = std::make_unique<DummyStorageLink>();
- } catch (config::InvalidConfigException& e) {
- fprintf(stderr, "%s\n", e.what());
- }
+ _config = StorageConfigSet::make_storage_node_config();
+ _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), _config->config_uri());
+ _node->setupDummyPersistence();
+ _clock = &_node->getClock();
+ _clock->setAbsoluteTimeInSeconds(1000000);
+ _top = std::make_unique<DummyStorageLink>();
+
_metricManager = std::make_unique<metrics::MetricManager>(std::make_unique<MetricClock>(*_clock));
_topSet.reset(new metrics::MetricSet("vds", {}, ""));
{
@@ -96,7 +93,7 @@ void MetricsTest::SetUp() {
_visitorMetrics = std::make_shared<VisitorMetrics>();
_visitorMetrics->initThreads(4);
_topSet->registerMetric(*_visitorMetrics);
- _metricManager->init(config::ConfigUri(_config->getConfigId()));
+ _metricManager->init(_config->config_uri());
}
void MetricsTest::TearDown() {
diff --git a/storage/src/tests/common/storage_config_set.cpp b/storage/src/tests/common/storage_config_set.cpp
new file mode 100644
index 00000000000..ed10e113867
--- /dev/null
+++ b/storage/src/tests/common/storage_config_set.cpp
@@ -0,0 +1,153 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "storage_config_set.h"
+#include <vespa/config-bucketspaces.h>
+#include <vespa/config-persistence.h>
+#include <vespa/config-slobroks.h>
+#include <vespa/config-stor-distribution.h>
+#include <vespa/config-stor-filestor.h>
+#include <vespa/config-upgrading.h>
+#include <vespa/document/repo/configbuilder.h>
+#include <vespa/document/repo/document_type_repo_factory.h>
+#include <vespa/document/repo/documenttyperepo.h>
+#include <vespa/messagebus/config-messagebus.h>
+#include <vespa/messagebus/testlib/slobrok.h>
+#include <vespa/metrics/config-metricsmanager.h>
+#include <vespa/storage/config/config-stor-bouncer.h>
+#include <vespa/storage/config/config-stor-communicationmanager.h>
+#include <vespa/storage/config/config-stor-distributormanager.h>
+#include <vespa/storage/config/config-stor-prioritymapping.h>
+#include <vespa/storage/config/config-stor-server.h>
+#include <vespa/storage/config/config-stor-status.h>
+#include <vespa/storage/config/config-stor-visitordispatcher.h>
+#include <vespa/storage/visiting/config-stor-visitor.h>
+#include <vespa/document/base/testdocrepo.h>
+#include <vespa/vespalib/util/stringfmt.h>
+
+namespace storage {
+
+StorageConfigSet::StorageConfigSet(vespalib::string config_id_str, bool is_storage_node)
+ : _document_type_config(std::make_unique<DocumenttypesConfigBuilder>()),
+ _slobroks_config(std::make_unique<SlobroksConfigBuilder>()),
+ _messagebus_config(std::make_unique<MessagebusConfigBuilder>()),
+ _metrics_config(std::make_unique<MetricsmanagerConfigBuilder>()),
+ _persistence_config(std::make_unique<PersistenceConfigBuilder>()),
+ _distribution_config(std::make_unique<StorDistributionConfigBuilder>()),
+ _filestor_config(std::make_unique<StorFilestorConfigBuilder>()),
+ _upgrading_config(std::make_unique<UpgradingConfigBuilder>()),
+ _bucket_spaces_config(std::make_unique<BucketspacesConfigBuilder>()),
+ _bouncer_config(std::make_unique<StorBouncerConfigBuilder>()),
+ _communication_manager_config(std::make_unique<StorCommunicationmanagerConfigBuilder>()),
+ _distributor_manager_config(std::make_unique<StorDistributormanagerConfigBuilder>()),
+ _priority_mapping_config(std::make_unique<StorPrioritymappingConfigBuilder>()),
+ _server_config(std::make_unique<StorServerConfigBuilder>()),
+ _status_config(std::make_unique<StorStatusConfigBuilder>()),
+ _visitor_config(std::make_unique<StorVisitorConfigBuilder>()),
+ _visitor_dispatcher_config(std::make_unique<StorVisitordispatcherConfigBuilder>()),
+ _config_id_str(std::move(config_id_str)),
+ _config_ctx(std::make_shared<config::ConfigContext>(_config_set)),
+ _config_uri(_config_id_str, _config_ctx)
+{
+ _config_set.addBuilder(_config_id_str, _document_type_config.get());
+ _config_set.addBuilder(_config_id_str, _slobroks_config.get());
+ _config_set.addBuilder(_config_id_str, _messagebus_config.get());
+ _config_set.addBuilder(_config_id_str, _metrics_config.get());
+ _config_set.addBuilder(_config_id_str, _persistence_config.get());
+ _config_set.addBuilder(_config_id_str, _distribution_config.get());
+ _config_set.addBuilder(_config_id_str, _filestor_config.get());
+ _config_set.addBuilder(_config_id_str, _upgrading_config.get());
+ _config_set.addBuilder(_config_id_str, _bucket_spaces_config.get());
+ _config_set.addBuilder(_config_id_str, _bouncer_config.get());
+ _config_set.addBuilder(_config_id_str, _communication_manager_config.get());
+ _config_set.addBuilder(_config_id_str, _distributor_manager_config.get());
+ _config_set.addBuilder(_config_id_str, _priority_mapping_config.get());
+ _config_set.addBuilder(_config_id_str, _server_config.get());
+ _config_set.addBuilder(_config_id_str, _status_config.get());
+ _config_set.addBuilder(_config_id_str, _visitor_config.get());
+ _config_set.addBuilder(_config_id_str, _visitor_dispatcher_config.get());
+
+ init_default_configs(is_storage_node);
+ _config_ctx->reload();
+}
+
+StorageConfigSet::~StorageConfigSet() = default;
+
+void StorageConfigSet::init_default_configs(bool is_storage_node) {
+ // Most configs are left with their default values, with explicit values being a
+ // union of the legacy DirConfig test helpers.
+ *_document_type_config = document::TestDocRepo().getTypeConfig();
+
+ add_metric_consumer("status", {"*"});
+ add_metric_consumer("statereporter", {"*"});
+
+ add_distribution_config(50);
+ add_bucket_space_mapping("testdoctype1", "default");
+
+ _communication_manager_config->rpcport = 0;
+ _communication_manager_config->mbusport = 0;
+
+ _distributor_manager_config->splitcount = 1000;
+ _distributor_manager_config->splitsize = 10000000;
+ _distributor_manager_config->joincount = 500;
+ _distributor_manager_config->joinsize = 5000000;
+ _distributor_manager_config->maxClusterClockSkewSec = 0;
+
+ _filestor_config->numThreads = 1;
+ _filestor_config->numResponseThreads = 1;
+
+ _persistence_config->abortOperationsWithChangedBucketOwnership = true;
+
+ _server_config->clusterName = "storage";
+ _server_config->nodeIndex = 0;
+ _server_config->isDistributor = !is_storage_node;
+ _server_config->maxMergesPerNode = 25;
+ _server_config->maxMergeQueueSize = 20;
+ _server_config->resourceExhaustionMergeBackPressureDurationSecs = 15.0;
+ _server_config->writePidFileOnStartup = false;
+
+ _status_config->httpport = 0;
+
+ _visitor_config->maxconcurrentvisitorsFixed = 4;
+ _visitor_config->maxconcurrentvisitorsVariable = 0;
+}
+
+void StorageConfigSet::add_bucket_space_mapping(vespalib::string doc_type, vespalib::string bucket_space_name) {
+ BucketspacesConfigBuilder::Documenttype type;
+ type.name = std::move(doc_type);
+ type.bucketspace = std::move(bucket_space_name);
+ _bucket_spaces_config->documenttype.emplace_back(std::move(type));
+}
+
+void StorageConfigSet::add_distribution_config(uint16_t nodes_in_top_level_group) {
+ StorDistributionConfigBuilder::Group group;
+ group.name = "invalid";
+ group.index = "invalid";
+ for (uint16_t i = 0; i < nodes_in_top_level_group; ++i) {
+ StorDistributionConfigBuilder::Group::Nodes node;
+ node.index = i;
+ group.nodes.emplace_back(std::move(node));
+ }
+ _distribution_config->group.clear();
+ _distribution_config->group.emplace_back(std::move(group));
+ _distribution_config->redundancy = 2;
+}
+
+void StorageConfigSet::add_metric_consumer(vespalib::string name, const std::vector<vespalib::string>& added_metrics) {
+ MetricsmanagerConfigBuilder::Consumer consumer;
+ consumer.name = std::move(name);
+ consumer.addedmetrics.assign(added_metrics.begin(), added_metrics.end());
+ _metrics_config->consumer.emplace_back(std::move(consumer));
+}
+
+void StorageConfigSet::set_node_index(uint16_t node_index) {
+ _server_config->nodeIndex = node_index;
+}
+
+void StorageConfigSet::set_slobrok_config_port(int slobrok_port) {
+ SlobroksConfigBuilder::Slobrok slobrok;
+ slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port);
+ _slobroks_config->slobrok.clear();
+ _slobroks_config->slobrok.emplace_back(std::move(slobrok));
+}
+
+} // storage
diff --git a/storage/src/tests/common/storage_config_set.h b/storage/src/tests/common/storage_config_set.h
new file mode 100644
index 00000000000..66cdeaf527f
--- /dev/null
+++ b/storage/src/tests/common/storage_config_set.h
@@ -0,0 +1,122 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/config/common/configcontext.h>
+#include <vespa/config/subscription/configuri.h>
+#include <vespa/config/subscription/sourcespec.h>
+#include <memory>
+
+// FIXME "internal" here is very counter-productive since it precludes easy fwd decls.
+// Currently have to punch holes in internal abstractions to make this work at all.
+namespace cloud::config::internal { class InternalSlobroksType; }
+namespace messagebus::internal { class InternalMessagebusType; }
+namespace metrics::internal { class InternalMetricsmanagerType; }
+namespace document::config::internal { class InternalDocumenttypesType; }
+namespace vespa::config::content::internal {
+class InternalPersistenceType;
+class InternalStorDistributionType;
+class InternalStorFilestorType;
+class InternalUpgradingType;
+}
+namespace vespa::config::content::core::internal {
+class InternalBucketspacesType;
+class InternalStorBouncerType;
+class InternalStorCommunicationmanagerType;
+class InternalStorDistributormanagerType;
+class InternalStorPrioritymappingType;
+class InternalStorServerType;
+class InternalStorStatusType;
+class InternalStorVisitorType;
+class InternalStorVisitordispatcherType;
+}
+
+namespace storage {
+
+class StorageConfigSet {
+ using SlobroksConfigBuilder = cloud::config::internal::InternalSlobroksType;
+ using MessagebusConfigBuilder = messagebus::internal::InternalMessagebusType;
+ using MetricsmanagerConfigBuilder = metrics::internal::InternalMetricsmanagerType;
+ using DocumenttypesConfigBuilder = document::config::internal::InternalDocumenttypesType;
+ using PersistenceConfigBuilder = vespa::config::content::internal::InternalPersistenceType;
+ using StorDistributionConfigBuilder = vespa::config::content::internal::InternalStorDistributionType;
+ using StorFilestorConfigBuilder = vespa::config::content::internal::InternalStorFilestorType;
+ using UpgradingConfigBuilder = vespa::config::content::internal::InternalUpgradingType;
+ using BucketspacesConfigBuilder = vespa::config::content::core::internal::InternalBucketspacesType;
+ using StorBouncerConfigBuilder = vespa::config::content::core::internal::InternalStorBouncerType;
+ using StorCommunicationmanagerConfigBuilder = vespa::config::content::core::internal::InternalStorCommunicationmanagerType;
+ using StorDistributormanagerConfigBuilder = vespa::config::content::core::internal::InternalStorDistributormanagerType;
+ using StorPrioritymappingConfigBuilder = vespa::config::content::core::internal::InternalStorPrioritymappingType;
+ using StorServerConfigBuilder = vespa::config::content::core::internal::InternalStorServerType;
+ using StorStatusConfigBuilder = vespa::config::content::core::internal::InternalStorStatusType;
+ using StorVisitorConfigBuilder = vespa::config::content::core::internal::InternalStorVisitorType;
+ using StorVisitordispatcherConfigBuilder = vespa::config::content::core::internal::InternalStorVisitordispatcherType;
+
+ std::unique_ptr<DocumenttypesConfigBuilder> _document_type_config;
+ std::unique_ptr<SlobroksConfigBuilder> _slobroks_config;
+ std::unique_ptr<MessagebusConfigBuilder> _messagebus_config;
+ std::unique_ptr<MetricsmanagerConfigBuilder> _metrics_config;
+ std::unique_ptr<PersistenceConfigBuilder> _persistence_config;
+ std::unique_ptr<StorDistributionConfigBuilder> _distribution_config;
+ std::unique_ptr<StorFilestorConfigBuilder> _filestor_config;
+ std::unique_ptr<UpgradingConfigBuilder> _upgrading_config;
+ std::unique_ptr<BucketspacesConfigBuilder> _bucket_spaces_config;
+ std::unique_ptr<StorBouncerConfigBuilder> _bouncer_config;
+ std::unique_ptr<StorCommunicationmanagerConfigBuilder> _communication_manager_config;
+ std::unique_ptr<StorDistributormanagerConfigBuilder> _distributor_manager_config;
+ std::unique_ptr<StorPrioritymappingConfigBuilder> _priority_mapping_config; // TODO removable?
+ std::unique_ptr<StorServerConfigBuilder> _server_config;
+ std::unique_ptr<StorStatusConfigBuilder> _status_config;
+ std::unique_ptr<StorVisitorConfigBuilder> _visitor_config;
+ std::unique_ptr<StorVisitordispatcherConfigBuilder> _visitor_dispatcher_config;
+
+ vespalib::string _config_id_str;
+ config::ConfigSet _config_set;
+ std::shared_ptr<config::ConfigContext> _config_ctx;
+ config::ConfigUri _config_uri;
+
+public:
+ StorageConfigSet(vespalib::string config_id_str, bool is_storage_node);
+ ~StorageConfigSet();
+
+ void init_default_configs(bool is_storage_node);
+ void add_bucket_space_mapping(vespalib::string doc_type, vespalib::string bucket_space_name);
+ void add_metric_consumer(vespalib::string name, const std::vector<vespalib::string>& added_metrics);
+ void add_distribution_config(uint16_t nodes_in_top_level_group);
+ void set_slobrok_config_port(int slobrok_port);
+ void set_node_index(uint16_t node_index);
+
+ [[nodiscard]] const config::ConfigUri& config_uri() const noexcept {
+ return _config_uri;
+ }
+
+ DocumenttypesConfigBuilder& document_type_config() noexcept { return *_document_type_config; }
+ SlobroksConfigBuilder& slobroks_config() noexcept { return *_slobroks_config; }
+ MessagebusConfigBuilder& messagebus_config() noexcept {return *_messagebus_config; }
+ MetricsmanagerConfigBuilder& metrics_config() noexcept { return *_metrics_config; }
+ PersistenceConfigBuilder& persistence_config() noexcept { return *_persistence_config; }
+ StorDistributionConfigBuilder& distribution_config() noexcept { return *_distribution_config; }
+ StorFilestorConfigBuilder& filestor_config() noexcept { return *_filestor_config; }
+ BucketspacesConfigBuilder& bucket_spaces_config() noexcept { return *_bucket_spaces_config; }
+ StorBouncerConfigBuilder& bouncer_config() noexcept { return *_bouncer_config; };
+ StorCommunicationmanagerConfigBuilder& communication_manager_config() noexcept { return *_communication_manager_config; }
+ StorDistributormanagerConfigBuilder& distributor_manager_config() noexcept { return *_distributor_manager_config; }
+ StorServerConfigBuilder& server_config() noexcept { return *_server_config; }
+ StorStatusConfigBuilder& status_config() noexcept { return *_status_config; }
+ StorVisitorConfigBuilder& visitor_config() noexcept { return *_visitor_config; }
+ StorVisitordispatcherConfigBuilder& visitor_dispatcher_config() noexcept { return *_visitor_dispatcher_config; }
+
+ [[nodiscard]] static std::unique_ptr<StorageConfigSet> make_node_config(bool is_storage_node) {
+ return std::make_unique<StorageConfigSet>("my-node", is_storage_node);
+ }
+
+ [[nodiscard]] static std::unique_ptr<StorageConfigSet> make_storage_node_config() {
+ return make_node_config(true);
+ }
+
+ [[nodiscard]] static std::unique_ptr<StorageConfigSet> make_distributor_node_config() {
+ return make_node_config(false);
+ }
+};
+
+}
diff --git a/storage/src/tests/common/testhelper.cpp b/storage/src/tests/common/testhelper.cpp
deleted file mode 100644
index 4ca935b7904..00000000000
--- a/storage/src/tests/common/testhelper.cpp
+++ /dev/null
@@ -1,148 +0,0 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <tests/common/testhelper.h>
-
-#include <vespa/log/log.h>
-#include <vespa/vespalib/io/fileutil.h>
-#include <vespa/vespalib/testkit/test_kit.h>
-
-LOG_SETUP(".testhelper");
-
-namespace storage {
-
-void addStorageDistributionConfig(vdstestlib::DirConfig& dc)
-{
- vdstestlib::DirConfig::Config* config;
- config = &dc.getConfig("stor-distribution", true);
- config->clear();
- config->set("group[1]");
- config->set("group[0].name", "invalid");
- config->set("group[0].index", "invalid");
- config->set("group[0].nodes[50]");
- config->set("redundancy", "2");
-
- for (uint32_t i = 0; i < 50; i++) {
- std::ostringstream key; key << "group[0].nodes[" << i << "].index";
- std::ostringstream val; val << i;
- config->set(key.str(), val.str());
- }
-}
-
-std::string getRootFolder(vdstestlib::DirConfig & dc) {
- std::string defaultValue("");
- return dc.getConfig("stor-server").getValue("root_folder", defaultValue);
-}
-
-vdstestlib::DirConfig getStandardConfig(bool storagenode, const std::string & rootOfRoot) {
- std::string clusterName("storage");
- vdstestlib::DirConfig dc;
- vdstestlib::DirConfig::Config* config;
- config = &dc.addConfig("fleetcontroller");
- config->set("cluster_name", clusterName);
- config->set("index", "0");
- config->set("zookeeper_server", "\"\"");
- config->set("total_distributor_count", "10");
- config->set("total_storage_count", "10");
- config = &dc.addConfig("upgrading");
- config = &dc.addConfig("load-type");
- config = &dc.addConfig("bucket");
- config = &dc.addConfig("messagebus");
- config = &dc.addConfig("stor-prioritymapping");
- config = &dc.addConfig("stor-bucketdbupdater");
- config = &dc.addConfig("metricsmanager");
- config->set("consumer[2]");
- config->set("consumer[0].name", "\"status\"");
- config->set("consumer[0].addedmetrics[1]");
- config->set("consumer[0].addedmetrics[0]", "\"*\"");
- config->set("consumer[1].name", "\"statereporter\"");
- config->set("consumer[1].addedmetrics[1]");
- config->set("consumer[1].addedmetrics[0]", "\"*\"");
- config = &dc.addConfig("stor-communicationmanager");
- config->set("rpcport", "0");
- config->set("mbusport", "0");
- config = &dc.addConfig("stor-distributormanager");
- config->set("splitcount", "1000");
- config->set("splitsize", "10000000");
- config->set("joincount", "500");
- config->set("joinsize", "5000000");
- config->set("max_clock_skew_sec", "0");
- config = &dc.addConfig("persistence");
- config->set("abort_operations_with_changed_bucket_ownership", "true");
- config = &dc.addConfig("stor-filestor");
- // Easier to see what goes wrong with only 1 thread per disk.
- config->set("num_threads", "1");
- config->set("num_response_threads", "1");
- config->set("maximum_versions_of_single_document_stored", "0");
- config->set("keep_remove_time_period", "2000000000");
- config->set("revert_time_period", "2000000000");
- // Don't want test to call exit()
- config->set("fail_disk_after_error_count", "0");
- config = &dc.addConfig("stor-bouncer");
- config = &dc.addConfig("stor-server");
- config->set("cluster_name", clusterName);
- config->set("enable_dead_lock_detector", "false");
- config->set("enable_dead_lock_detector_warnings", "false");
- config->set("max_merges_per_node", "25");
- config->set("max_merge_queue_size", "20");
- config->set("resource_exhaustion_merge_back_pressure_duration_secs", "15.0");
- vespalib::string rootFolder = rootOfRoot + "_";
- rootFolder += (storagenode ? "vdsroot" : "vdsroot.distributor");
- config->set("root_folder", rootFolder);
- config->set("is_distributor", (storagenode ? "false" : "true"));
- config = &dc.addConfig("stor-devices");
- config->set("root_folder", rootFolder);
- config = &dc.addConfig("stor-status");
- config->set("httpport", "0");
- config = &dc.addConfig("stor-visitor");
- config->set("defaultdocblocksize", "8192");
- // By default, need "old" behaviour of maxconcurrent
- config->set("maxconcurrentvisitors_fixed", "4");
- config->set("maxconcurrentvisitors_variable", "0");
- config = &dc.addConfig("stor-visitordispatcher");
- addFileConfig(dc, "documenttypes", TEST_PATH("config-doctypes.cfg"));
- addStorageDistributionConfig(dc);
- return dc;
-}
-
-void addSlobrokConfig(vdstestlib::DirConfig& dc,
- const mbus::Slobrok& slobrok)
-{
- std::ostringstream ost;
- ost << "tcp/localhost:" << slobrok.port();
- vdstestlib::DirConfig::Config* config;
- config = &dc.getConfig("slobroks", true);
- config->clear();
- config->set("slobrok[1]");
- config->set("slobrok[0].connectionspec", ost.str());
-}
-
-void addFileConfig(vdstestlib::DirConfig& dc,
- const std::string& configDefName,
- const std::string& fileName)
-{
- vdstestlib::DirConfig::Config* config;
- config = &dc.getConfig(configDefName, true);
- config->clear();
- std::ifstream in(fileName.c_str());
- std::string line;
- while (std::getline(in, line, '\n')) {
- std::string::size_type pos = line.find(' ');
- if (pos == std::string::npos) {
- config->set(line);
- } else {
- config->set(line.substr(0, pos), line.substr(pos + 1));
- }
- }
- in.close();
-}
-
-TestName::TestName(const std::string& n)
- : name(n)
-{
- LOG(debug, "Starting test %s", name.c_str());
-}
-
-TestName::~TestName() {
- LOG(debug, "Done with test %s", name.c_str());
-}
-
-} // storage
diff --git a/storage/src/tests/common/testhelper.h b/storage/src/tests/common/testhelper.h
index 1f83e938409..9f9b50652f9 100644
--- a/storage/src/tests/common/testhelper.h
+++ b/storage/src/tests/common/testhelper.h
@@ -1,39 +1,15 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+
#include <vespa/config/helper/configgetter.h>
-#include <vespa/messagebus/testlib/slobrok.h>
-#include <vespa/vdstestlib/config/dirconfig.h>
-#include <fstream>
-#include <sstream>
+#include <vespa/config/subscription/configuri.h>
namespace storage {
-void addFileConfig(vdstestlib::DirConfig& dc,
- const std::string& configDefName,
- const std::string& fileName);
-
-
-void addStorageDistributionConfig(vdstestlib::DirConfig& dc);
-
-vdstestlib::DirConfig getStandardConfig(bool storagenode, const std::string & rootFolder = "todo-make-unique");
-
-std::string getRootFolder(vdstestlib::DirConfig & dc);
-
-void addSlobrokConfig(vdstestlib::DirConfig& dc,
- const mbus::Slobrok& slobrok);
-
template <typename ConfigT>
std::unique_ptr<ConfigT> config_from(const ::config::ConfigUri& cfg_uri) {
return ::config::ConfigGetter<ConfigT>::getConfig(cfg_uri.getConfigId(), cfg_uri.getContext());
}
-// Class used to print start and end of test. Enable debug when you want to see
-// which test creates what output or where we get stuck
-struct TestName {
- std::string name;
- TestName(const std::string& n);
- ~TestName();
-};
-
} // storage
diff --git a/storage/src/tests/common/teststorageapp.cpp b/storage/src/tests/common/teststorageapp.cpp
index e2e2de10702..d811f100aec 100644
--- a/storage/src/tests/common/teststorageapp.cpp
+++ b/storage/src/tests/common/teststorageapp.cpp
@@ -26,31 +26,31 @@ namespace storage {
TestStorageApp::TestStorageApp(StorageComponentRegisterImpl::UP compReg,
const lib::NodeType& type, NodeIndex index,
- vespalib::stringref configId)
+ const config::ConfigUri& config_uri)
: TestComponentRegister(ComponentRegisterImpl::UP(std::move(compReg))),
_compReg(dynamic_cast<StorageComponentRegisterImpl&>(TestComponentRegister::getComponentRegister())),
_docMan(),
_nodeStateUpdater(type),
- _configId(configId),
+ _configId(config_uri.getConfigId()),
_node_identity("test_cluster", type, index),
_initialized(false)
{
- // Use config to adjust values
+ // Use config to adjust values
vespalib::string clusterName = "mycluster";
uint32_t redundancy = 2;
uint32_t nodeCount = 10;
- if (!configId.empty()) {
- config::ConfigUri uri(configId);
- std::unique_ptr<vespa::config::content::core::StorServerConfig> serverConfig = config::ConfigGetter<vespa::config::content::core::StorServerConfig>::getConfig(uri.getConfigId(), uri.getContext());
- clusterName = serverConfig->clusterName;
- if (index == 0xffff) index = serverConfig->nodeIndex;
- redundancy = config::ConfigGetter<vespa::config::content::StorDistributionConfig>::getConfig(uri.getConfigId(), uri.getContext())->redundancy;
- nodeCount = config::ConfigGetter<vespa::config::content::FleetcontrollerConfig>::getConfig(uri.getConfigId(), uri.getContext())->totalStorageCount;
- } else {
- if (index == 0xffff) index = 0;
+ auto serverConfig = config::ConfigGetter<vespa::config::content::core::StorServerConfig>::getConfig(config_uri.getConfigId(), config_uri.getContext());
+ clusterName = serverConfig->clusterName;
+ if (index == 0xffff) {
+ index = serverConfig->nodeIndex;
+ }
+ redundancy = config::ConfigGetter<vespa::config::content::StorDistributionConfig>::getConfig(config_uri.getConfigId(), config_uri.getContext())->redundancy;
+ if (index >= nodeCount) {
+ nodeCount = index + 1;
+ }
+ if (redundancy > nodeCount) {
+ redundancy = nodeCount;
}
- if (index >= nodeCount) nodeCount = index + 1;
- if (redundancy > nodeCount) redundancy = nodeCount;
_compReg.setNodeInfo(clusterName, type, index);
_compReg.setNodeStateUpdater(_nodeStateUpdater);
@@ -84,21 +84,17 @@ TestStorageApp::setClusterState(const lib::ClusterState& c)
}
namespace {
-NodeIndex getIndexFromConfig(vespalib::stringref configId) {
- if (!configId.empty()) {
- config::ConfigUri uri(configId);
- return NodeIndex(
- config::ConfigGetter<vespa::config::content::core::StorServerConfig>::getConfig(uri.getConfigId(), uri.getContext())->nodeIndex);
- }
- return NodeIndex(0);
+
+NodeIndex node_index_from_config(const config::ConfigUri& uri) {
+ return NodeIndex(config::ConfigGetter<vespa::config::content::core::StorServerConfig>::getConfig(uri.getConfigId(), uri.getContext())->nodeIndex);
}
VESPA_THREAD_STACK_TAG(test_executor)
}
-TestServiceLayerApp::TestServiceLayerApp(vespalib::stringref configId)
+TestServiceLayerApp::TestServiceLayerApp(NodeIndex index, const config::ConfigUri& config_uri)
: TestStorageApp(std::make_unique<ServiceLayerComponentRegisterImpl>(ContentBucketDbOptions()),
- lib::NodeType::STORAGE, getIndexFromConfig(configId), configId),
+ lib::NodeType::STORAGE, index, config_uri),
_compReg(dynamic_cast<ServiceLayerComponentRegisterImpl&>(TestStorageApp::getComponentRegister())),
_persistenceProvider(),
_executor(vespalib::SequencedTaskExecutor::create(test_executor, 1)),
@@ -108,17 +104,9 @@ TestServiceLayerApp::TestServiceLayerApp(vespalib::stringref configId)
_nodeStateUpdater.setReportedNodeState(ns);
}
-TestServiceLayerApp::TestServiceLayerApp(NodeIndex index,
- vespalib::stringref configId)
- : TestStorageApp(std::make_unique<ServiceLayerComponentRegisterImpl>(ContentBucketDbOptions()),
- lib::NodeType::STORAGE, index, configId),
- _compReg(dynamic_cast<ServiceLayerComponentRegisterImpl&>(TestStorageApp::getComponentRegister())),
- _persistenceProvider(),
- _executor(vespalib::SequencedTaskExecutor::create(test_executor, 1)),
- _host_info()
+TestServiceLayerApp::TestServiceLayerApp(const config::ConfigUri& config_uri)
+ : TestServiceLayerApp(node_index_from_config(config_uri), config_uri)
{
- lib::NodeState ns(*_nodeStateUpdater.getReportedNodeState());
- _nodeStateUpdater.setReportedNodeState(ns);
}
TestServiceLayerApp::~TestServiceLayerApp() = default;
@@ -147,45 +135,37 @@ TestServiceLayerApp::getPersistenceProvider()
}
namespace {
- template<typename T>
- T getConfig(vespalib::stringref configId) {
- config::ConfigUri uri(configId);
- return *config::ConfigGetter<T>::getConfig(uri.getConfigId(), uri.getContext());
- }
+
+template<typename T>
+[[nodiscard]] T get_config(const config::ConfigUri& uri) {
+ return *config::ConfigGetter<T>::getConfig(uri.getConfigId(), uri.getContext());
+}
+
}
void
-TestDistributorApp::configure(vespalib::stringref id)
+TestDistributorApp::configure(const config::ConfigUri& config_uri)
{
- if (id.empty()) return;
- auto dc(getConfig<vespa::config::content::core::StorDistributormanagerConfig>(id));
+ auto dc = get_config<vespa::config::content::core::StorDistributormanagerConfig>(config_uri);
_compReg.setDistributorConfig(dc);
- auto vc(getConfig<vespa::config::content::core::StorVisitordispatcherConfig>(id));
+ auto vc = get_config<vespa::config::content::core::StorVisitordispatcherConfig>(config_uri);
_compReg.setVisitorConfig(vc);
}
-TestDistributorApp::TestDistributorApp(vespalib::stringref configId)
- : TestStorageApp(
- std::make_unique<DistributorComponentRegisterImpl>(),
- lib::NodeType::DISTRIBUTOR, getIndexFromConfig(configId), configId),
+TestDistributorApp::TestDistributorApp(NodeIndex index, const config::ConfigUri& config_uri)
+ : TestStorageApp(std::make_unique<DistributorComponentRegisterImpl>(),
+ lib::NodeType::DISTRIBUTOR, index, config_uri),
_compReg(dynamic_cast<DistributorComponentRegisterImpl&>(TestStorageApp::getComponentRegister())),
_lastUniqueTimestampRequested(0),
_uniqueTimestampCounter(0)
{
_compReg.setTimeCalculator(*this);
- configure(configId);
+ configure(config_uri);
}
-TestDistributorApp::TestDistributorApp(NodeIndex index, vespalib::stringref configId)
- : TestStorageApp(
- std::make_unique<DistributorComponentRegisterImpl>(),
- lib::NodeType::DISTRIBUTOR, index, configId),
- _compReg(dynamic_cast<DistributorComponentRegisterImpl&>(TestStorageApp::getComponentRegister())),
- _lastUniqueTimestampRequested(0),
- _uniqueTimestampCounter(0)
+TestDistributorApp::TestDistributorApp(const config::ConfigUri& config_uri)
+ : TestDistributorApp(node_index_from_config(config_uri), config_uri)
{
- _compReg.setTimeCalculator(*this);
- configure(configId);
}
TestDistributorApp::~TestDistributorApp() = default;
diff --git a/storage/src/tests/common/teststorageapp.h b/storage/src/tests/common/teststorageapp.h
index fb91145c66a..04fa6996e15 100644
--- a/storage/src/tests/common/teststorageapp.h
+++ b/storage/src/tests/common/teststorageapp.h
@@ -1,9 +1,6 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
- * \class storage::TestServiceLayerApp
- * \ingroup common
- *
- * \brief Helper class for tests involving service layer.
+ * Helper class for tests involving service layer.
*
* Some components need some dependencies injected in order to work correctly.
* This test class simplifies the process of creating these dependencies.
@@ -34,6 +31,8 @@
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <atomic>
+namespace config { class ConfigUri; }
+
namespace storage {
namespace spi { struct PersistenceProvider; }
@@ -66,8 +65,8 @@ public:
* from config themselves.
*/
TestStorageApp(StorageComponentRegisterImpl::UP compReg,
- const lib::NodeType&, NodeIndex = NodeIndex(0xffff),
- vespalib::stringref configId = "");
+ const lib::NodeType&, NodeIndex index,
+ const config::ConfigUri& config_uri);
~TestStorageApp() override;
// Set functions, to be able to modify content while running.
@@ -110,8 +109,8 @@ class TestServiceLayerApp : public TestStorageApp
HostInfo _host_info;
public:
- explicit TestServiceLayerApp(vespalib::stringref configId);
- explicit TestServiceLayerApp(NodeIndex = NodeIndex(0xffff), vespalib::stringref configId = "");
+ TestServiceLayerApp(NodeIndex node_index, const config::ConfigUri& config_uri);
+ explicit TestServiceLayerApp(const config::ConfigUri& config_uri);
~TestServiceLayerApp() override;
void setupDummyPersistence();
@@ -140,11 +139,11 @@ class TestDistributorApp : public TestStorageApp,
uint64_t _lastUniqueTimestampRequested;
uint32_t _uniqueTimestampCounter;
- void configure(vespalib::stringref configId);
+ void configure(const config::ConfigUri& config_uri);
public:
- explicit TestDistributorApp(vespalib::stringref configId = "");
- explicit TestDistributorApp(NodeIndex index, vespalib::stringref configId = "");
+ TestDistributorApp(NodeIndex index, const config::ConfigUri& config_uri);
+ explicit TestDistributorApp(const config::ConfigUri& config_uri);
~TestDistributorApp() override;
DistributorComponentRegisterImpl& getComponentRegister() override {
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt
index 250cb872223..8f8c1b68de3 100644
--- a/storage/src/tests/distributor/CMakeLists.txt
+++ b/storage/src/tests/distributor/CMakeLists.txt
@@ -61,8 +61,14 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST
GTest::gmock_main
)
-vespa_add_test(
- NAME storage_distributor_gtest_runner_app
- COMMAND storage_distributor_gtest_runner_app
- COST 350
-)
+set(TOTAL_SHARDS 5)
+math(EXPR MAX_SHARD_INDEX "${TOTAL_SHARDS} - 1")
+foreach(SHARD_INDEX RANGE ${MAX_SHARD_INDEX})
+ string(REGEX MATCH "...$" FMT_SHARD_INDEX "00" ${SHARD_INDEX})
+ vespa_add_test(
+ NAME storage_distributor_gtest_runner_app_${FMT_SHARD_INDEX}
+ COMMAND storage_distributor_gtest_runner_app
+ ENVIRONMENT "GTEST_SHARD_INDEX=${SHARD_INDEX};GTEST_TOTAL_SHARDS=${TOTAL_SHARDS}"
+ COST 350
+ )
+endforeach()
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.cpp b/storage/src/tests/distributor/distributor_stripe_test_util.cpp
index 923c7d1730b..0628b62bdfe 100644
--- a/storage/src/tests/distributor/distributor_stripe_test_util.cpp
+++ b/storage/src/tests/distributor/distributor_stripe_test_util.cpp
@@ -32,7 +32,7 @@ DistributorStripeTestUtil::DistributorStripeTestUtil()
_done_initializing(true),
_messageSender(_sender, _senderDown)
{
- _config = getStandardConfig(false);
+ _config = StorageConfigSet::make_distributor_node_config();
}
DistributorStripeTestUtil::~DistributorStripeTestUtil() = default;
@@ -40,7 +40,7 @@ DistributorStripeTestUtil::~DistributorStripeTestUtil() = default;
void
DistributorStripeTestUtil::createLinks()
{
- _node = std::make_unique<TestDistributorApp>(_config.getConfigId());
+ _node = std::make_unique<TestDistributorApp>(_config->config_uri());
_metrics = std::make_shared<DistributorMetricSet>();
_ideal_state_metrics = std::make_shared<IdealStateMetricSet>();
_stripe = std::make_unique<DistributorStripe>(_node->getComponentRegister(), *_metrics, *_ideal_state_metrics,
@@ -77,7 +77,7 @@ DistributorStripeTestUtil::setup_stripe(int redundancy, int node_count, const li
// trigger_distribution_change().
// This isn't pretty, folks, but it avoids breaking the world for now,
// as many tests have implicit assumptions about this being the behavior.
- auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution));
+ auto new_configs = lib::BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution));
_stripe->update_distribution_config(new_configs);
}
@@ -95,7 +95,7 @@ void
DistributorStripeTestUtil::trigger_distribution_change(lib::Distribution::SP distr)
{
_node->getComponentRegister().setDistribution(distr);
- auto new_config = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distr));
+ auto new_config = lib::BucketSpaceDistributionConfigs::from_default_distribution(std::move(distr));
_stripe->update_distribution_config(new_config);
}
@@ -184,8 +184,8 @@ DistributorStripeTestUtil::close()
{
_stripe->flush_and_close();
_sender.clear();
- _node.reset(0);
- _config = getStandardConfig(false);
+ _node.reset();
+ _config = StorageConfigSet::make_distributor_node_config();
}
namespace {
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.h b/storage/src/tests/distributor/distributor_stripe_test_util.h
index 801320e2bf8..862d9bfbfba 100644
--- a/storage/src/tests/distributor/distributor_stripe_test_util.h
+++ b/storage/src/tests/distributor/distributor_stripe_test_util.h
@@ -5,7 +5,9 @@
#include <tests/common/dummystoragelink.h>
#include <tests/common/testhelper.h>
#include <tests/common/teststorageapp.h>
+#include <tests/common/storage_config_set.h>
#include <vespa/storage/common/hostreporter/hostinfo.h>
+#include <vespa/storage/config/config-stor-distributormanager.h>
#include <vespa/storage/distributor/stripe_host_info_notifier.h>
#include <vespa/storage/storageutil/utils.h>
@@ -132,8 +134,8 @@ public:
const DistributorConfiguration& getConfig();
- vdstestlib::DirConfig& getDirConfig() {
- return _config;
+ vespa::config::content::core::StorDistributormanagerConfigBuilder& backing_config() noexcept {
+ return _config->distributor_manager_config();
}
// TODO explicit notion of bucket spaces for tests
@@ -237,7 +239,7 @@ public:
void tag_content_node_supports_condition_probing(uint16_t index, bool supported);
protected:
- vdstestlib::DirConfig _config;
+ std::unique_ptr<StorageConfigSet> _config;
std::unique_ptr<TestDistributorApp> _node;
std::shared_ptr<DistributorMetricSet> _metrics;
std::shared_ptr<IdealStateMetricSet> _ideal_state_metrics;
diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp
index 634e4993d53..33da4727017 100644
--- a/storage/src/tests/distributor/externaloperationhandlertest.cpp
+++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp
@@ -94,7 +94,7 @@ struct ExternalOperationHandlerTest : Test, DistributorStripeTestUtil {
TEST_F(ExternalOperationHandlerTest, bucket_split_mask) {
{
createLinks();
- getDirConfig().getConfig("stor-distributormanager").set("minsplitcount", "16");
+ backing_config().minsplitcount = 16;
EXPECT_EQ(document::BucketId(16, 0xffff),
operation_context().make_split_bit_constrained_bucket_id(document::DocumentId(
@@ -115,7 +115,7 @@ TEST_F(ExternalOperationHandlerTest, bucket_split_mask) {
close();
}
{
- getDirConfig().getConfig("stor-distributormanager").set("minsplitcount", "20");
+ backing_config().minsplitcount = 20;
createLinks();
EXPECT_EQ(document::BucketId(20, 0x11111),
operation_context().make_split_bit_constrained_bucket_id(document::DocumentId(
diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp
index 0cadaa3fc9f..4639a154e74 100644
--- a/storage/src/tests/distributor/idealstatemanagertest.cpp
+++ b/storage/src/tests/distributor/idealstatemanagertest.cpp
@@ -74,10 +74,10 @@ TEST_F(IdealStateManagerTest, sibling) {
TEST_F(IdealStateManagerTest, status_page) {
close();
- getDirConfig().getConfig("stor-distributormanager").set("splitsize", "100");
- getDirConfig().getConfig("stor-distributormanager").set("splitcount", "1000000");
- getDirConfig().getConfig("stor-distributormanager").set("joinsize", "0");
- getDirConfig().getConfig("stor-distributormanager").set("joincount", "0");
+ backing_config().splitsize = 100;
+ backing_config().splitcount = 1000000;
+ backing_config().joinsize = 0;
+ backing_config().joincount = 0;
createLinks();
setup_stripe(1, 1, "distributor:1 storage:1");
diff --git a/storage/src/tests/distributor/mock_tickable_stripe.h b/storage/src/tests/distributor/mock_tickable_stripe.h
index 2fb486bab28..3a9ead6ae10 100644
--- a/storage/src/tests/distributor/mock_tickable_stripe.h
+++ b/storage/src/tests/distributor/mock_tickable_stripe.h
@@ -10,7 +10,7 @@ struct MockTickableStripe : TickableStripe {
bool tick() override { abort(); }
void flush_and_close() override { abort(); }
void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration>) override { abort(); }
- void update_distribution_config(const BucketSpaceDistributionConfigs&) override { abort(); }
+ void update_distribution_config(const lib::BucketSpaceDistributionConfigs&) override { abort(); }
void set_pending_cluster_state_bundle(const lib::ClusterStateBundle&) override { abort(); }
void clear_pending_cluster_state_bundle() override { abort(); }
void enable_cluster_state_bundle(const lib::ClusterStateBundle&, bool) override { abort(); }
diff --git a/storage/src/tests/distributor/operationtargetresolvertest.cpp b/storage/src/tests/distributor/operationtargetresolvertest.cpp
index f0f8a4359fd..171dc5a42c0 100644
--- a/storage/src/tests/distributor/operationtargetresolvertest.cpp
+++ b/storage/src/tests/distributor/operationtargetresolvertest.cpp
@@ -27,8 +27,7 @@ struct OperationTargetResolverTest : Test, DistributorStripeTestUtil {
const document::DocumentType* _html_type;
std::unique_ptr<Operation> op;
- BucketInstanceList getInstances(const BucketId& bid,
- bool stripToRedundancy);
+ BucketInstanceList getInstances(const BucketId& bid, bool stripToRedundancy, bool symmetry_mode);
void SetUp() override {
_repo.reset(new document::DocumentTypeRepo(
@@ -62,7 +61,7 @@ namespace {
TestTargets::createTest(id, *this, *_asserters.back())
struct Asserter {
- virtual ~Asserter() {}
+ virtual ~Asserter() = default;
virtual void assertEqualMsg(std::string t1,
OperationTargetList t2,
OperationTargetList t3) = 0;
@@ -73,21 +72,29 @@ struct TestTargets {
OperationTargetList _expected;
OperationTargetResolverTest& _test;
Asserter& _asserter;
+ bool _symmetry_mode;
TestTargets(const BucketId& id,
OperationTargetResolverTest& test,
Asserter& asserter)
- : _id(id), _test(test), _asserter(asserter) {}
+ : _id(id), _test(test), _asserter(asserter), _symmetry_mode(true)
+ {
+ }
~TestTargets() {
- BucketInstanceList result(_test.getInstances(_id, true));
- BucketInstanceList all(_test.getInstances(_id, false));
+ BucketInstanceList result(_test.getInstances(_id, true, _symmetry_mode));
+ BucketInstanceList all(_test.getInstances(_id, false, _symmetry_mode));
_asserter.assertEqualMsg(
all.toString(), _expected, result.createTargets(makeBucketSpace()));
delete _asserters.back();
_asserters.pop_back();
}
+ TestTargets& with_symmetric_replica_selection(bool symmetry) noexcept {
+ _symmetry_mode = symmetry;
+ return *this;
+ }
+
TestTargets& sendsTo(const BucketId& id, uint16_t node) {
_expected.push_back(OperationTarget(
makeDocumentBucket(id), lib::Node(lib::NodeType::STORAGE, node), false));
@@ -110,7 +117,7 @@ struct TestTargets {
} // anonymous
BucketInstanceList
-OperationTargetResolverTest::getInstances(const BucketId& id, bool stripToRedundancy)
+OperationTargetResolverTest::getInstances(const BucketId& id, bool stripToRedundancy, bool symmetry_mode)
{
auto &bucketSpaceRepo(operation_context().bucket_space_repo());
auto &distributorBucketSpace(bucketSpaceRepo.get(makeBucketSpace()));
@@ -118,6 +125,7 @@ OperationTargetResolverTest::getInstances(const BucketId& id, bool stripToRedund
distributorBucketSpace, distributorBucketSpace.getBucketDatabase(), 16,
distributorBucketSpace.getDistribution().getRedundancy(),
makeBucketSpace());
+ resolver.use_symmetric_replica_selection(symmetry_mode);
if (stripToRedundancy) {
return resolver.getInstances(OperationTargetResolver::PUT, id);
} else {
@@ -143,14 +151,48 @@ TEST_F(OperationTargetResolverTest, choose_ideal_state_when_many_copies) {
.sendsTo(BucketId(16, 0), 3);
}
-TEST_F(OperationTargetResolverTest, trusted_over_ideal_state) {
+TEST_F(OperationTargetResolverTest, legacy_prefers_trusted_over_ideal_state) {
setup_stripe(2, 4, "storage:4 distributor:1");
addNodesToBucketDB(BucketId(16, 0), "0=0/0/0/t,1=0,2=0/0/0/t,3=0");
// ideal nodes: 1, 3
+ MY_ASSERT_THAT(BucketId(32, 0)).with_symmetric_replica_selection(false)
+ .sendsTo(BucketId(16, 0), 0)
+ .sendsTo(BucketId(16, 0), 2);
+}
+
+TEST_F(OperationTargetResolverTest, prefer_ready_over_ideal_state_order) {
+ setup_stripe(2, 4, "storage:4 distributor:1");
+ addNodesToBucketDB(BucketId(16, 0), "0=1/2/3/u/i/r,1=1/2/3,2=1/2/3/u/i/r,3=1/2/3");
+ // ideal nodes: 1, 3. 0 and 2 are ready.
MY_ASSERT_THAT(BucketId(32, 0)).sendsTo(BucketId(16, 0), 0)
.sendsTo(BucketId(16, 0), 2);
}
+TEST_F(OperationTargetResolverTest, prefer_ready_over_ideal_state_order_also_when_retired) {
+ setup_stripe(2, 4, "storage:4 .0.s:r distributor:1");
+ addNodesToBucketDB(BucketId(16, 0), "0=1/2/3/u/i/r,1=1/2/3,2=1/2/3/u/i/r,3=1/2/3");
+ // ideal nodes: 1, 3. 0 and 2 are ready.
+ MY_ASSERT_THAT(BucketId(32, 0)).sendsTo(BucketId(16, 0), 0)
+ .sendsTo(BucketId(16, 0), 2);
+}
+
+TEST_F(OperationTargetResolverTest, prefer_replicas_with_more_docs_over_replicas_with_fewer_docs) {
+ setup_stripe(2, 4, "storage:4 distributor:1");
+ addNodesToBucketDB(BucketId(16, 0), "0=2/3/4,1=1/2/3,2=3/4/5,3=1/2/3");
+ // ideal nodes: 1, 3. 0 and 2 have more docs.
+ MY_ASSERT_THAT(BucketId(32, 0)).sendsTo(BucketId(16, 0), 2)
+ .sendsTo(BucketId(16, 0), 0);
+}
+
+TEST_F(OperationTargetResolverTest, fall_back_to_active_state_and_db_index_if_all_other_fields_equal) {
+ // All replica nodes tagged as retired, which means none are part of the ideal state order
+ setup_stripe(2, 4, "storage:4 .0.s:r .2.s:r .3.s:r distributor:1");
+ addNodesToBucketDB(BucketId(16, 0), "0=2/3/4/u/a,3=2/3/4,2=2/3/4");
+ // ideal nodes: 1, 3. 0 is active and 3 is the remaining replica with the lowest DB order.
+ MY_ASSERT_THAT(BucketId(32, 0)).sendsTo(BucketId(16, 0), 0)
+ .sendsTo(BucketId(16, 0), 3);
+}
+
TEST_F(OperationTargetResolverTest, choose_highest_split_bucket) {
setup_stripe(2, 2, "storage:2 distributor:1");
// 0, 1 are both in ideal state for both buckets.
diff --git a/storage/src/tests/distributor/statusreporterdelegatetest.cpp b/storage/src/tests/distributor/statusreporterdelegatetest.cpp
index cc23fa7a22e..c70ab533af6 100644
--- a/storage/src/tests/distributor/statusreporterdelegatetest.cpp
+++ b/storage/src/tests/distributor/statusreporterdelegatetest.cpp
@@ -1,5 +1,6 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <tests/common/storage_config_set.h>
#include <tests/common/testhelper.h>
#include <tests/common/teststorageapp.h>
#include <vespa/storage/distributor/statusreporterdelegate.h>
@@ -45,8 +46,8 @@ public:
}
TEST(StatusReporterDelegateTest, delegate_invokes_delegator_on_status_request) {
- vdstestlib::DirConfig config(getStandardConfig(false));
- TestDistributorApp app(config.getConfigId());
+ auto config = StorageConfigSet::make_distributor_node_config();
+ TestDistributorApp app(config->config_uri());
MockDelegator mockDelegator;
MockStatusReporter reporter;
diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.cpp b/storage/src/tests/distributor/top_level_distributor_test_util.cpp
index 94031c6d71e..e1b2bc93f62 100644
--- a/storage/src/tests/distributor/top_level_distributor_test_util.cpp
+++ b/storage/src/tests/distributor/top_level_distributor_test_util.cpp
@@ -24,7 +24,7 @@ TopLevelDistributorTestUtil::TopLevelDistributorTestUtil()
: _message_sender(_sender, _sender_down),
_num_distributor_stripes(4)
{
- _config = getStandardConfig(false);
+ _config = StorageConfigSet::make_distributor_node_config();
}
TopLevelDistributorTestUtil::~TopLevelDistributorTestUtil() = default;
@@ -32,7 +32,7 @@ TopLevelDistributorTestUtil::~TopLevelDistributorTestUtil() = default;
void
TopLevelDistributorTestUtil::create_links()
{
- _node = std::make_unique<TestDistributorApp>(_config.getConfigId());
+ _node = std::make_unique<TestDistributorApp>(_config->config_uri());
_thread_pool = framework::TickingThreadPool::createDefault("distributor", 100ms);
_stripe_pool = DistributorStripePool::make_non_threaded_pool_for_testing();
_distributor.reset(new TopLevelDistributor(
@@ -123,7 +123,7 @@ TopLevelDistributorTestUtil::close()
}
_sender.clear();
_node.reset();
- _config = getStandardConfig(false);
+ _config = StorageConfigSet::make_distributor_node_config();
}
void
diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.h b/storage/src/tests/distributor/top_level_distributor_test_util.h
index 1d4c81a5bfb..51f0739e3e6 100644
--- a/storage/src/tests/distributor/top_level_distributor_test_util.h
+++ b/storage/src/tests/distributor/top_level_distributor_test_util.h
@@ -3,6 +3,7 @@
#include "distributor_message_sender_stub.h"
#include <tests/common/dummystoragelink.h>
+#include <tests/common/storage_config_set.h>
#include <tests/common/testhelper.h>
#include <tests/common/teststorageapp.h>
#include <vespa/storage/common/hostreporter/hostinfo.h>
@@ -140,7 +141,7 @@ public:
static std::vector<document::BucketSpace> bucket_spaces();
protected:
- vdstestlib::DirConfig _config;
+ std::unique_ptr<StorageConfigSet> _config;
std::unique_ptr<TestDistributorApp> _node;
std::unique_ptr<framework::TickingThreadPool> _thread_pool;
std::unique_ptr<DistributorStripePool> _stripe_pool;
diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp
index 31ebbe19cbb..e00ce249298 100644
--- a/storage/src/tests/distributor/updateoperationtest.cpp
+++ b/storage/src/tests/distributor/updateoperationtest.cpp
@@ -49,13 +49,13 @@ struct UpdateOperationTest : Test, DistributorStripeTestUtil {
const api::ReturnCode& result = api::ReturnCode());
std::shared_ptr<UpdateOperation>
- sendUpdate(const std::string& bucketState, bool create_if_missing = false);
+ sendUpdate(const std::string& bucketState, bool create_if_missing = false, bool cache_create_flag = false);
document::BucketId _bId;
};
std::shared_ptr<UpdateOperation>
-UpdateOperationTest::sendUpdate(const std::string& bucketState, bool create_if_missing)
+UpdateOperationTest::sendUpdate(const std::string& bucketState, bool create_if_missing, bool cache_create_flag)
{
auto update = std::make_shared<document::DocumentUpdate>(
*_repo, *_html_type,
@@ -67,6 +67,9 @@ UpdateOperationTest::sendUpdate(const std::string& bucketState, bool create_if_m
addNodesToBucketDB(_bId, bucketState);
auto msg = std::make_shared<api::UpdateCommand>(makeDocumentBucket(document::BucketId(0)), update, 100);
+ if (cache_create_flag) {
+ msg->set_cached_create_if_missing(create_if_missing);
+ }
return std::make_shared<UpdateOperation>(
node_context(), operation_context(), getDistributorBucketSpace(), msg, std::vector<BucketDatabase::Entry>(),
@@ -271,4 +274,20 @@ TEST_F(UpdateOperationTest, cancelled_nodes_are_not_updated_in_db) {
dumpBucket(_bId));
}
+TEST_F(UpdateOperationTest, cached_create_if_missing_is_propagated_to_fanout_requests) {
+ setup_stripe(1, 1, "distributor:1 storage:1");
+ for (bool cache_flag : {false, true}) {
+ for (bool create_if_missing : {false, true}) {
+ std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3", create_if_missing, cache_flag));
+ DistributorMessageSenderStub sender;
+ cb->start(sender);
+
+ ASSERT_EQ("Update => 0", sender.getCommands(true));
+ auto& cmd = dynamic_cast<api::UpdateCommand&>(*sender.command(0));
+ EXPECT_EQ(cmd.has_cached_create_if_missing(), cache_flag);
+ EXPECT_EQ(cmd.create_if_missing(), create_if_missing);
+ }
+ }
+}
+
}
diff --git a/storage/src/tests/frameworkimpl/status/statustest.cpp b/storage/src/tests/frameworkimpl/status/statustest.cpp
index bd28297e108..8592a332f0c 100644
--- a/storage/src/tests/frameworkimpl/status/statustest.cpp
+++ b/storage/src/tests/frameworkimpl/status/statustest.cpp
@@ -1,10 +1,11 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <tests/common/storage_config_set.h>
+#include <tests/common/teststorageapp.h>
#include <vespa/storageframework/defaultimplementation/component/componentregisterimpl.h>
#include <vespa/storage/frameworkimpl/status/statuswebserver.h>
#include <vespa/storageframework/generic/status/htmlstatusreporter.h>
#include <vespa/storageframework/generic/status/xmlstatusreporter.h>
-#include <tests/common/teststorageapp.h>
#include <vespa/document/util/stringutil.h>
#include <vespa/vespalib/net/crypto_engine.h>
#include <vespa/vespalib/net/socket_spec.h>
@@ -39,6 +40,7 @@ vespalib::string fetch(int port, const vespalib::string &path) {
namespace storage {
struct StatusTest : Test {
+ std::unique_ptr<StorageConfigSet> _config;
std::unique_ptr<TestServiceLayerApp> _node;
void SetUp() override;
@@ -97,7 +99,8 @@ namespace {
}
void StatusTest::SetUp() {
- _node = std::make_unique<TestServiceLayerApp>();
+ _config = StorageConfigSet::make_storage_node_config();
+ _node = std::make_unique<TestServiceLayerApp>(_config->config_uri());
}
namespace {
diff --git a/storage/src/tests/persistence/bucketownershipnotifiertest.cpp b/storage/src/tests/persistence/bucketownershipnotifiertest.cpp
index 129cac34c68..717a79b030d 100644
--- a/storage/src/tests/persistence/bucketownershipnotifiertest.cpp
+++ b/storage/src/tests/persistence/bucketownershipnotifiertest.cpp
@@ -1,6 +1,7 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <tests/common/message_sender_stub.h>
+#include <tests/common/storage_config_set.h>
#include <tests/common/teststorageapp.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/storage/persistence/bucketownershipnotifier.h>
@@ -14,6 +15,7 @@ using namespace ::testing;
namespace storage {
struct BucketOwnershipNotifierTest : public Test {
+ std::unique_ptr<StorageConfigSet> _config;
std::unique_ptr<TestServiceLayerApp> _app;
lib::ClusterState _clusterState;
@@ -58,7 +60,8 @@ struct BucketOwnershipNotifierTest : public Test {
void
BucketOwnershipNotifierTest::SetUp()
{
- _app = std::make_unique<TestServiceLayerApp>();
+ _config = StorageConfigSet::make_storage_node_config();
+ _app = std::make_unique<TestServiceLayerApp>(_config->config_uri());
_app->setDistribution(Redundancy(1), NodeCount(2));
_app->setClusterState(_clusterState);
}
diff --git a/storage/src/tests/persistence/common/filestortestfixture.cpp b/storage/src/tests/persistence/common/filestortestfixture.cpp
index 6581fb9f7b1..6c7b09c4736 100644
--- a/storage/src/tests/persistence/common/filestortestfixture.cpp
+++ b/storage/src/tests/persistence/common/filestortestfixture.cpp
@@ -25,14 +25,11 @@ const uint32_t FileStorTestFixture::MSG_WAIT_TIME;
void
FileStorTestFixture::setupPersistenceThreads(uint32_t threads)
{
- std::string rootOfRoot = "todo-make-unique-filestorefixture";
- _config = std::make_unique<vdstestlib::DirConfig>(getStandardConfig(true, rootOfRoot));
- _config->getConfig("stor-server").set("root_folder", (rootOfRoot + "-vdsroot.2"));
- _config->getConfig("stor-devices").set("root_folder", (rootOfRoot + "-vdsroot.2"));
- _config->getConfig("stor-server").set("node_index", "1");
- _config->getConfig("stor-filestor").set("num_threads", std::to_string(threads));
-
- _node = std::make_unique<TestServiceLayerApp>(NodeIndex(1), _config->getConfigId());
+ _config = StorageConfigSet::make_storage_node_config();
+ _config->set_node_index(1);
+ _config->filestor_config().numThreads = threads;
+
+ _node = std::make_unique<TestServiceLayerApp>(NodeIndex(1), _config->config_uri());
_testdoctype1 = _node->getTypeRepo()->getDocumentType("testdoctype1");
}
@@ -77,7 +74,7 @@ FileStorTestFixture::TestFileStorComponents::TestFileStorComponents(
{
injector.inject(top);
using StorFilestorConfig = vespa::config::content::internal::InternalStorFilestorType;
- auto config = config_from<StorFilestorConfig>(config::ConfigUri(fixture._config->getConfigId()));
+ auto config = config_from<StorFilestorConfig>(fixture._config->config_uri());
auto fsm = std::make_unique<FileStorManager>(*config, fixture._node->getPersistenceProvider(),
fixture._node->getComponentRegister(), *fixture._node, fixture._node->get_host_info());
manager = fsm.get();
diff --git a/storage/src/tests/persistence/common/filestortestfixture.h b/storage/src/tests/persistence/common/filestortestfixture.h
index e4776f393ae..d4fb94101cc 100644
--- a/storage/src/tests/persistence/common/filestortestfixture.h
+++ b/storage/src/tests/persistence/common/filestortestfixture.h
@@ -2,6 +2,7 @@
#pragma once
#include <tests/common/dummystoragelink.h>
+#include <tests/common/storage_config_set.h>
#include <tests/common/testhelper.h>
#include <tests/common/teststorageapp.h>
#include <vespa/storage/persistence/filestorage/filestorhandlerimpl.h>
@@ -14,8 +15,8 @@ namespace storage {
class FileStorTestFixture : public ::testing::Test
{
public:
+ std::unique_ptr<StorageConfigSet> _config;
std::unique_ptr<TestServiceLayerApp> _node;
- std::unique_ptr<vdstestlib::DirConfig> _config;
const document::DocumentType* _testdoctype1;
static const uint32_t MSG_WAIT_TIME = 60 * 1000;
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index f12b85eb2ea..bdc3f17e576 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -1,6 +1,7 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <tests/common/dummystoragelink.h>
+#include <tests/common/storage_config_set.h>
#include <tests/common/testhelper.h>
#include <tests/common/teststorageapp.h>
#include <tests/persistence/filestorage/forwardingmessagesender.h>
@@ -41,7 +42,6 @@
#include <vespa/log/log.h>
LOG_SETUP(".filestormanagertest");
-using std::unique_ptr;
using document::Document;
using document::BucketId;
using namespace storage::api;
@@ -91,10 +91,8 @@ make_bucket_for_doc(const document::DocumentId& docid)
struct FileStorTestBase : Test {
enum {LONG_WAITTIME=60};
- unique_ptr<TestServiceLayerApp> _node;
- std::unique_ptr<vdstestlib::DirConfig> config;
- std::unique_ptr<vdstestlib::DirConfig> config2;
- std::unique_ptr<vdstestlib::DirConfig> smallConfig;
+ std::unique_ptr<StorageConfigSet> _config;
+ std::unique_ptr<TestServiceLayerApp> _node;
const int32_t _waitTime;
const document::DocumentType* _testdoctype1;
@@ -165,29 +163,10 @@ struct FileStorTestBase : Test {
void setupDisks() {
std::string rootOfRoot = "filestormanagertest";
- config = std::make_unique<vdstestlib::DirConfig>(getStandardConfig(true, rootOfRoot));
-
- config2 = std::make_unique<vdstestlib::DirConfig>(*config);
- config2->getConfig("stor-server").set("root_folder", rootOfRoot + "-vdsroot.2");
- config2->getConfig("stor-devices").set("root_folder", rootOfRoot + "-vdsroot.2");
- config2->getConfig("stor-server").set("node_index", "1");
-
- smallConfig = std::make_unique<vdstestlib::DirConfig>(*config);
- vdstestlib::DirConfig::Config& c(smallConfig->getConfig("stor-filestor", true));
- c.set("initial_index_read", "128");
- c.set("use_direct_io", "false");
- c.set("maximum_gap_to_read_through", "64");
-
- assert(system(vespalib::make_string("rm -rf %s", getRootFolder(*config).c_str()).c_str()) == 0);
- assert(system(vespalib::make_string("rm -rf %s", getRootFolder(*config2).c_str()).c_str()) == 0);
- assert(system(vespalib::make_string("mkdir -p %s/disks/d0", getRootFolder(*config).c_str()).c_str()) == 0);
- assert(system(vespalib::make_string("mkdir -p %s/disks/d0", getRootFolder(*config2).c_str()).c_str()) == 0);
- try {
- _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), config->getConfigId());
- _node->setupDummyPersistence();
- } catch (config::InvalidConfigException& e) {
- fprintf(stderr, "%s\n", e.what());
- }
+ _config = StorageConfigSet::make_storage_node_config();
+
+ _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), _config->config_uri());
+ _node->setupDummyPersistence();
_testdoctype1 = _node->getTypeRepo()->getDocumentType("testdoctype1");
}
@@ -227,11 +206,10 @@ struct TestFileStorComponents {
DummyStorageLink top;
FileStorManager* manager;
- explicit TestFileStorComponents(FileStorTestBase& test, bool use_small_config = false)
+ explicit TestFileStorComponents(FileStorTestBase& test)
: manager(nullptr)
{
- auto config_uri = config::ConfigUri((use_small_config ? test.smallConfig : test.config)->getConfigId());
- auto config = config_from<StorFilestorConfig>(config_uri);
+ auto config = config_from<StorFilestorConfig>(test._config->config_uri());
auto fsm = std::make_unique<FileStorManager>(*config, test._node->getPersistenceProvider(),
test._node->getComponentRegister(), *test._node, test._node->get_host_info());
manager = fsm.get();
@@ -1255,7 +1233,7 @@ createIterator(DummyStorageLink& link,
}
TEST_F(FileStorManagerTest, visiting) {
- TestFileStorComponents c(*this, true);
+ TestFileStorComponents c(*this);
auto& top = c.top;
// Adding documents to two buckets which we are going to visit
// We want one bucket in one slotfile, and one bucket with a file split
@@ -1409,8 +1387,7 @@ TEST_F(FileStorManagerTest, remove_location) {
TEST_F(FileStorManagerTest, delete_bucket) {
TestFileStorComponents c(*this);
- auto config_uri = config::ConfigUri(config->getConfigId());
- StorFilestorConfigBuilder my_config(*config_from<StorFilestorConfig>(config_uri));
+ auto my_config = *config_from<StorFilestorConfig>(_config->config_uri());
c.manager->on_configure(my_config);
auto& top = c.top;
diff --git a/storage/src/tests/persistence/filestorage/filestormodifiedbucketstest.cpp b/storage/src/tests/persistence/filestorage/filestormodifiedbucketstest.cpp
index 710da80972f..38acc4a18b8 100644
--- a/storage/src/tests/persistence/filestorage/filestormodifiedbucketstest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormodifiedbucketstest.cpp
@@ -38,7 +38,7 @@ struct BucketCheckerInjector : FileStorTestFixture::StorageLinkInjector
{}
void inject(DummyStorageLink& link) const override {
using vespa::config::content::core::StorServerConfig;
- auto cfg = config_from<StorServerConfig>(config::ConfigUri(_fixture._config->getConfigId()));
+ auto cfg = config_from<StorServerConfig>(_fixture._config->config_uri());
link.push_back(std::make_unique<ModifiedBucketChecker>(
_node.getComponentRegister(), _node.getPersistenceProvider(), *cfg));
}
diff --git a/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp b/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp
index f96ff9c012e..9fc6ddff268 100644
--- a/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp
+++ b/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp
@@ -1,6 +1,7 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <tests/common/dummystoragelink.h>
+#include <tests/common/storage_config_set.h>
#include <tests/common/testhelper.h>
#include <tests/common/teststorageapp.h>
#include <vespa/config/common/exceptions.h>
@@ -34,20 +35,20 @@ struct ModifiedBucketCheckerTest : Test {
ModifiedBucketChecker* _handler;
DummyStorageLink* _bottom;
+ std::unique_ptr<StorageConfigSet> _config;
std::unique_ptr<TestServiceLayerApp> _node;
- std::unique_ptr<vdstestlib::DirConfig> _config;
};
void
ModifiedBucketCheckerTest::SetUp()
{
- _config.reset(new vdstestlib::DirConfig(getStandardConfig(true)));
- _node.reset(new TestServiceLayerApp(NodeIndex(0), _config->getConfigId()));
+ _config = StorageConfigSet::make_storage_node_config();
+ _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), _config->config_uri());
_node->setupDummyPersistence();
- _top.reset(new DummyStorageLink);
+ _top = std::make_unique<DummyStorageLink>();
using vespa::config::content::core::StorServerConfig;
- auto bootstrap_cfg = config_from<StorServerConfig>(config::ConfigUri(_config->getConfigId()));
+ auto bootstrap_cfg = config_from<StorServerConfig>(_config->config_uri());
_handler = new ModifiedBucketChecker(_node->getComponentRegister(),
_node->getPersistenceProvider(),
*bootstrap_cfg);
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 4bd0570efa8..524f5bae392 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -220,11 +220,11 @@ void
MergeHandlerTest::setUpChain(ChainPos pos) {
_nodes.clear();
if (pos != FRONT) {
- _nodes.push_back(api::MergeBucketCommand::Node(2, false));
+ _nodes.emplace_back(2, false);
}
- _nodes.push_back(api::MergeBucketCommand::Node(0, false));
+ _nodes.emplace_back(0, false);
if (pos != BACK) {
- _nodes.push_back(api::MergeBucketCommand::Node(1, false));
+ _nodes.emplace_back(1, false);
}
}
@@ -1439,4 +1439,52 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
LOG(debug, "got mergebucket reply");
}
+TEST_F(MergeHandlerTest, multiple_versions_in_apply_diff_only_writes_newest_version) {
+ setUpChain(BACK);
+
+ document::TestDocMan doc_mgr;
+ document::Document::SP doc(doc_mgr.createRandomDocumentAtLocation(_location, 1));
+ spi::Timestamp ts_old(10'000);
+ spi::Timestamp ts_new(20'000);
+
+ PersistenceProviderWrapper provider_wrapper(getPersistenceProvider());
+ MergeHandler handler = createHandler(provider_wrapper);
+ std::vector<api::ApplyBucketDiffCommand::Entry> apply_diff;
+ // Diff contains two entries for the same document; one old Remove and one newer Put that
+ // subsumes the Remove operation. We should only schedule the Put to the SPI.
+ {
+ api::ApplyBucketDiffCommand::Entry e;
+ e._entry._timestamp = ts_old;
+ e._entry._hasMask = 0x1;
+ e._docName = doc->getId().toString();
+ e._entry._flags = MergeHandler::IN_USE | MergeHandler::DELETED;
+ apply_diff.push_back(e);
+ }
+ {
+ api::ApplyBucketDiffCommand::Entry e;
+ e._entry._timestamp = ts_new;
+ e._entry._hasMask = 0x1;
+ e._entry._flags = MergeHandler::IN_USE;
+ fill_entry(e, *doc, doc_mgr.getTypeRepo());
+ apply_diff.push_back(e);
+ }
+
+ auto apply_bucket_diff_cmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes);
+ apply_bucket_diff_cmd->getDiff() = std::move(apply_diff);
+
+ provider_wrapper.clearOperationLog();
+ auto tracker = handler.handleApplyBucketDiff(*apply_bucket_diff_cmd, createTracker(apply_bucket_diff_cmd, _bucket));
+ ASSERT_FALSE(tracker);
+ handler.drain_async_writes();
+
+ // There should be no remove at time=ts_old, only a put at time=ts_new.
+ // TODO ideally we shouldn't have to know about the other operations...
+ EXPECT_EQ(provider_wrapper.toString(),
+ "createIterator(Bucket(0x40000000000004d2), ALL_VERSIONS)\n"
+ "iterate(1, 18446744073709551615)\n"
+ "destroyIterator(1)\n"
+ "put(Bucket(0x40000000000004d2), 20000, id:mail:testdoctype1:n=1234:9380.html)\n"
+ "getBucketInfo(Bucket(0x40000000000004d2))\n");
+}
+
} // storage
diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp
index a599b1d380a..b63df53781d 100644
--- a/storage/src/tests/persistence/persistencetestutils.cpp
+++ b/storage/src/tests/persistence/persistencetestutils.cpp
@@ -25,14 +25,6 @@ namespace storage {
namespace {
-vdstestlib::DirConfig initialize(const std::string & rootOfRoot) {
- vdstestlib::DirConfig config(getStandardConfig(true, rootOfRoot));
- std::string rootFolder = getRootFolder(config);
- std::filesystem::remove_all(std::filesystem::path(rootFolder));
- std::filesystem::create_directories(std::filesystem::path(vespalib::make_string("%s/disks/d0", rootFolder.c_str())));
- return config;
-}
-
template<typename T>
struct ConfigReader : public T::Subscriber
{
@@ -49,10 +41,10 @@ constexpr uint32_t MERGE_CHUNK_SIZE = 4_Mi;
}
-PersistenceTestEnvironment::PersistenceTestEnvironment(const std::string & rootOfRoot)
- : _config(initialize(rootOfRoot)),
+PersistenceTestEnvironment::PersistenceTestEnvironment()
+ : _config(StorageConfigSet::make_distributor_node_config()),
_messageKeeper(),
- _node(NodeIndex(0), _config.getConfigId()),
+ _node(NodeIndex(0), _config->config_uri()),
_component(_node.getComponentRegister(), "persistence test env"),
_metrics()
{
@@ -106,7 +98,7 @@ PersistenceTestUtils::MockBucketLocks::unlock(document::Bucket bucket)
}
PersistenceTestUtils::PersistenceTestUtils()
- : _env(std::make_unique<PersistenceTestEnvironment>("todo-make-unique-persistencetestutils")),
+ : _env(std::make_unique<PersistenceTestEnvironment>()),
_replySender(),
_bucketOwnershipNotifier(getEnv()._component, getEnv()._fileStorHandler),
_mock_bucket_locks(),
diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h
index d03974855ad..0125bd7aa79 100644
--- a/storage/src/tests/persistence/persistencetestutils.h
+++ b/storage/src/tests/persistence/persistencetestutils.h
@@ -1,6 +1,7 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include <tests/common/storage_config_set.h>
#include <tests/common/teststorageapp.h>
#include <tests/common/testhelper.h>
#include <vespa/storage/persistence/persistencethread.h>
@@ -25,11 +26,11 @@ struct MessageKeeper : public MessageSender {
};
struct PersistenceTestEnvironment {
- PersistenceTestEnvironment(const std::string & rootOfRoot);
+ PersistenceTestEnvironment();
~PersistenceTestEnvironment();
document::TestDocMan _testDocMan;
- vdstestlib::DirConfig _config;
+ std::unique_ptr<StorageConfigSet> _config;
MessageKeeper _messageKeeper;
TestServiceLayerApp _node;
ServiceLayerComponent _component;
diff --git a/storage/src/tests/persistence/provider_error_wrapper_test.cpp b/storage/src/tests/persistence/provider_error_wrapper_test.cpp
index fc88428f915..d5ce8400b25 100644
--- a/storage/src/tests/persistence/provider_error_wrapper_test.cpp
+++ b/storage/src/tests/persistence/provider_error_wrapper_test.cpp
@@ -1,5 +1,6 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <tests/common/storage_config_set.h>
#include <vespa/persistence/spi/test.h>
#include <tests/persistence/persistencetestutils.h>
#include <tests/persistence/common/persistenceproviderwrapper.h>
@@ -33,13 +34,15 @@ struct MockErrorListener : ProviderErrorListener {
struct Fixture {
// We wrap the wrapper. It's turtles all the way down!
PersistenceProviderWrapper providerWrapper;
+ std::unique_ptr<StorageConfigSet> config;
TestServiceLayerApp app;
ServiceLayerComponent component;
ProviderErrorWrapper errorWrapper;
Fixture(spi::PersistenceProvider& provider)
: providerWrapper(provider),
- app(),
+ config(StorageConfigSet::make_storage_node_config()),
+ app(config->config_uri()),
component(app.getComponentRegister(), "dummy"),
errorWrapper(providerWrapper)
{
diff --git a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
index 698d8dee573..231f41ffd21 100644
--- a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
+++ b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
@@ -262,7 +262,6 @@ TEST_P(StorageProtocolTest, response_metadata_is_propagated) {
TEST_P(StorageProtocolTest, update) {
auto update = std::make_shared<document::DocumentUpdate>(_docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId());
update->addUpdate(FieldUpdate(_testDoc->getField("headerval")).addUpdate(std::make_unique<AssignValueUpdate>(std::make_unique<IntFieldValue>(17))));
-
update->addFieldPathUpdate(std::make_unique<RemoveFieldPathUpdate>("headerval", "testdoctype1.headerval > 0"));
auto cmd = std::make_shared<UpdateCommand>(_bucket, update, 14);
@@ -284,6 +283,37 @@ TEST_P(StorageProtocolTest, update) {
EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2));
}
+TEST_P(StorageProtocolTest, update_request_create_if_missing_flag_is_propagated) {
+ auto make_update_cmd = [&](bool create_if_missing, bool cached) {
+ auto update = std::make_shared<document::DocumentUpdate>(
+ _docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId());
+ update->addUpdate(FieldUpdate(_testDoc->getField("headerval")).addUpdate(
+ std::make_unique<AssignValueUpdate>(std::make_unique<IntFieldValue>(17))));
+ update->addFieldPathUpdate(std::make_unique<RemoveFieldPathUpdate>("headerval", "testdoctype1.headerval > 0"));
+ update->setCreateIfNonExistent(create_if_missing);
+ auto cmd = std::make_shared<UpdateCommand>(_bucket, update, 14);
+ if (cached) {
+ cmd->set_cached_create_if_missing(create_if_missing);
+ }
+ return cmd;
+ };
+
+ auto check_flag_propagation = [&](bool create_if_missing, bool cached) {
+ auto cmd = make_update_cmd(create_if_missing, cached);
+ EXPECT_EQ(cmd->has_cached_create_if_missing(), cached);
+ EXPECT_EQ(cmd->create_if_missing(), create_if_missing);
+
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(cmd2->has_cached_create_if_missing(), cached);
+ EXPECT_EQ(cmd2->create_if_missing(), create_if_missing);
+ };
+
+ check_flag_propagation(false, false);
+ check_flag_propagation(true, false);
+ check_flag_propagation(false, true);
+ check_flag_propagation(true, true);
+}
+
TEST_P(StorageProtocolTest, get) {
auto cmd = std::make_shared<GetCommand>(_bucket, _testDocId, "foo,bar,vekterli", 123);
auto cmd2 = copyCommand(cmd);
@@ -880,7 +910,7 @@ TEST_P(StorageProtocolTest, track_memory_footprint_for_some_messages) {
EXPECT_EQ(sizeof(BucketInfoCommand), sizeof(BucketCommand));
EXPECT_EQ(sizeof(TestAndSetCommand), sizeof(BucketInfoCommand) + sizeof(vespalib::string));
EXPECT_EQ(sizeof(PutCommand), sizeof(TestAndSetCommand) + 40);
- EXPECT_EQ(sizeof(UpdateCommand), sizeof(TestAndSetCommand) + 32);
+ EXPECT_EQ(sizeof(UpdateCommand), sizeof(TestAndSetCommand) + 40);
EXPECT_EQ(sizeof(RemoveCommand), sizeof(TestAndSetCommand) + 112);
EXPECT_EQ(sizeof(GetCommand), sizeof(BucketInfoCommand) + sizeof(TestAndSetCondition) + 184);
}
diff --git a/storage/src/tests/storageserver/bouncertest.cpp b/storage/src/tests/storageserver/bouncertest.cpp
index 296ed6d23bc..11742dd658f 100644
--- a/storage/src/tests/storageserver/bouncertest.cpp
+++ b/storage/src/tests/storageserver/bouncertest.cpp
@@ -1,6 +1,7 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <tests/common/dummystoragelink.h>
+#include <tests/common/storage_config_set.h>
#include <tests/common/testhelper.h>
#include <tests/common/teststorageapp.h>
#include <vespa/config/common/exceptions.h>
@@ -26,6 +27,7 @@ using namespace ::testing;
namespace storage {
struct BouncerTest : public Test {
+ std::unique_ptr<StorageConfigSet> _config;
std::unique_ptr<TestStorageApp> _node;
std::unique_ptr<DummyStorageLink> _upper;
Bouncer* _manager;
@@ -57,15 +59,15 @@ BouncerTest::BouncerTest()
}
void BouncerTest::setUpAsNode(const lib::NodeType& type) {
- vdstestlib::DirConfig config(getStandardConfig(type == lib::NodeType::STORAGE));
+ _config = StorageConfigSet::make_node_config(type == lib::NodeType::STORAGE);
if (type == lib::NodeType::STORAGE) {
- _node = std::make_unique<TestServiceLayerApp>(NodeIndex(2), config.getConfigId());
+ _node = std::make_unique<TestServiceLayerApp>(NodeIndex(2), _config->config_uri());
} else {
- _node = std::make_unique<TestDistributorApp>(NodeIndex(2), config.getConfigId());
+ _node = std::make_unique<TestDistributorApp>(NodeIndex(2), _config->config_uri());
}
_upper = std::make_unique<DummyStorageLink>();
using StorBouncerConfig = vespa::config::content::core::StorBouncerConfig;
- auto cfg_uri = config::ConfigUri(config.getConfigId());
+ auto& cfg_uri = _config->config_uri();
auto cfg = config::ConfigGetter<StorBouncerConfig>::getConfig(cfg_uri.getConfigId(), cfg_uri.getContext());
_manager = new Bouncer(_node->getComponentRegister(), *cfg);
_lower = new DummyStorageLink();
diff --git a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
index 50977b5ec8b..8982b02f2b7 100644
--- a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
+++ b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
@@ -3,6 +3,7 @@
#include <tests/common/teststorageapp.h>
#include <tests/common/testhelper.h>
#include <tests/common/dummystoragelink.h>
+#include <tests/common/storage_config_set.h>
#include <vespa/config/helper/configgetter.hpp>
#include <vespa/document/base/testdocman.h>
#include <vespa/storage/bucketdb/storbucketdb.h>
@@ -28,11 +29,12 @@ using namespace ::testing;
namespace storage {
struct ChangedBucketOwnershipHandlerTest : Test {
+ std::unique_ptr<StorageConfigSet> _config;
std::unique_ptr<TestServiceLayerApp> _app;
- std::unique_ptr<DummyStorageLink> _top;
- ChangedBucketOwnershipHandler* _handler;
- DummyStorageLink* _bottom;
- document::TestDocMan _testDocRepo;
+ std::unique_ptr<DummyStorageLink> _top;
+ ChangedBucketOwnershipHandler* _handler;
+ DummyStorageLink* _bottom;
+ document::TestDocMan _testDocRepo;
// TODO test: down edge triggered on cluster state with cluster down?
@@ -126,11 +128,12 @@ void
ChangedBucketOwnershipHandlerTest::SetUp()
{
using vespa::config::content::PersistenceConfig;
- vdstestlib::DirConfig config(getStandardConfig(true));
- _app.reset(new TestServiceLayerApp);
- _top.reset(new DummyStorageLink);
- _handler = new ChangedBucketOwnershipHandler(*config_from<PersistenceConfig>(config::ConfigUri(config.getConfigId())),
+ _config = StorageConfigSet::make_storage_node_config();
+ _app = std::make_unique<TestServiceLayerApp>(NodeIndex(0), _config->config_uri());
+ _top = std::make_unique<DummyStorageLink>();
+
+ _handler = new ChangedBucketOwnershipHandler(*config_from<PersistenceConfig>(_config->config_uri()),
_app->getComponentRegister());
_top->push_back(std::unique_ptr<StorageLink>(_handler));
_bottom = new DummyStorageLink;
diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp
index 04322562d08..b741d79582f 100644
--- a/storage/src/tests/storageserver/communicationmanagertest.cpp
+++ b/storage/src/tests/storageserver/communicationmanagertest.cpp
@@ -1,6 +1,7 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <tests/common/dummystoragelink.h>
+#include <tests/common/storage_config_set.h>
#include <tests/common/testhelper.h>
#include <tests/common/teststorageapp.h>
#include <vespa/config/helper/configgetter.hpp>
@@ -65,20 +66,20 @@ wait_for_slobrok_visibility(const CommunicationManager& mgr,
TEST_F(CommunicationManagerTest, simple) {
mbus::Slobrok slobrok;
- vdstestlib::DirConfig distConfig(getStandardConfig(false));
- vdstestlib::DirConfig storConfig(getStandardConfig(true));
- distConfig.getConfig("stor-server").set("node_index", "1");
- storConfig.getConfig("stor-server").set("node_index", "1");
- addSlobrokConfig(distConfig, slobrok);
- addSlobrokConfig(storConfig, slobrok);
+ auto dist_config = StorageConfigSet::make_distributor_node_config();
+ auto stor_config = StorageConfigSet::make_storage_node_config();
+ dist_config->set_node_index(1);
+ stor_config->set_node_index(1);
+ dist_config->set_slobrok_config_port(slobrok.port());
+ stor_config->set_slobrok_config_port(slobrok.port());
+
+ auto& dist_cfg_uri = dist_config->config_uri();
+ auto& stor_cfg_uri = stor_config->config_uri();
// Set up a "distributor" and a "storage" node with communication
// managers and a dummy storage link below we can use for testing.
- TestServiceLayerApp storNode(storConfig.getConfigId());
- TestDistributorApp distNode(distConfig.getConfigId());
-
- auto dist_cfg_uri = config::ConfigUri(distConfig.getConfigId());
- auto stor_cfg_uri = config::ConfigUri(storConfig.getConfigId());
+ TestServiceLayerApp storNode(stor_cfg_uri);
+ TestDistributorApp distNode(dist_cfg_uri);
CommunicationManager distributor(distNode.getComponentRegister(), dist_cfg_uri,
*config_from<CommunicationManagerConfig>(dist_cfg_uri));
@@ -123,23 +124,22 @@ void
CommunicationManagerTest::doTestConfigPropagation(bool isContentNode)
{
mbus::Slobrok slobrok;
- vdstestlib::DirConfig config(getStandardConfig(isContentNode));
- config.getConfig("stor-server").set("node_index", "1");
- auto& cfg = config.getConfig("stor-communicationmanager");
- cfg.set("mbus_content_node_max_pending_count", "12345");
- cfg.set("mbus_content_node_max_pending_size", "555666");
- cfg.set("mbus_distributor_node_max_pending_count", "6789");
- cfg.set("mbus_distributor_node_max_pending_size", "777888");
- addSlobrokConfig(config, slobrok);
+ auto config = StorageConfigSet::make_node_config(isContentNode);
+ config->set_node_index(1);
+ config->set_slobrok_config_port(slobrok.port());
+ config->communication_manager_config().mbusContentNodeMaxPendingCount = 12345;
+ config->communication_manager_config().mbusContentNodeMaxPendingSize = 555666;
+ config->communication_manager_config().mbusDistributorNodeMaxPendingCount = 6789;
+ config->communication_manager_config().mbusDistributorNodeMaxPendingSize = 777888;
+ auto& cfg_uri = config->config_uri();
std::unique_ptr<TestStorageApp> node;
if (isContentNode) {
- node = std::make_unique<TestServiceLayerApp>(config.getConfigId());
+ node = std::make_unique<TestServiceLayerApp>(cfg_uri);
} else {
- node = std::make_unique<TestDistributorApp>(config.getConfigId());
+ node = std::make_unique<TestDistributorApp>(cfg_uri);
}
- auto cfg_uri = config::ConfigUri(config.getConfigId());
CommunicationManager commMgr(node->getComponentRegister(), cfg_uri,
*config_from<CommunicationManagerConfig>(cfg_uri));
auto* storageLink = new DummyStorageLink();
@@ -180,12 +180,12 @@ TEST_F(CommunicationManagerTest, stor_pending_limit_configs_are_propagated_to_me
TEST_F(CommunicationManagerTest, commands_are_dequeued_in_fifo_order) {
mbus::Slobrok slobrok;
- vdstestlib::DirConfig storConfig(getStandardConfig(true));
- storConfig.getConfig("stor-server").set("node_index", "1");
- addSlobrokConfig(storConfig, slobrok);
- TestServiceLayerApp storNode(storConfig.getConfigId());
+ auto config = StorageConfigSet::make_storage_node_config();
+ config->set_node_index(1);
+ config->set_slobrok_config_port(slobrok.port());
+ auto& cfg_uri = config->config_uri();
+ TestServiceLayerApp storNode(cfg_uri);
- auto cfg_uri = config::ConfigUri(storConfig.getConfigId());
CommunicationManager storage(storNode.getComponentRegister(), cfg_uri,
*config_from<CommunicationManagerConfig>(cfg_uri));
auto* storageLink = new DummyStorageLink();
@@ -214,12 +214,12 @@ TEST_F(CommunicationManagerTest, commands_are_dequeued_in_fifo_order) {
TEST_F(CommunicationManagerTest, replies_are_dequeued_in_fifo_order) {
mbus::Slobrok slobrok;
- vdstestlib::DirConfig storConfig(getStandardConfig(true));
- storConfig.getConfig("stor-server").set("node_index", "1");
- addSlobrokConfig(storConfig, slobrok);
- TestServiceLayerApp storNode(storConfig.getConfigId());
+ auto config = StorageConfigSet::make_storage_node_config();
+ config->set_node_index(1);
+ config->set_slobrok_config_port(slobrok.port());
+ auto& cfg_uri = config->config_uri();
+ TestServiceLayerApp storNode(cfg_uri);
- auto cfg_uri = config::ConfigUri(storConfig.getConfigId());
CommunicationManager storage(storNode.getComponentRegister(), cfg_uri,
*config_from<CommunicationManagerConfig>(cfg_uri));
auto* storageLink = new DummyStorageLink();
@@ -249,19 +249,21 @@ struct MockMbusReplyHandler : mbus::IReplyHandler {
};
struct CommunicationManagerFixture {
+ std::unique_ptr<StorageConfigSet> config;
MockMbusReplyHandler reply_handler;
mbus::Slobrok slobrok;
std::unique_ptr<TestServiceLayerApp> node;
std::unique_ptr<CommunicationManager> comm_mgr;
DummyStorageLink* bottom_link;
- CommunicationManagerFixture() {
- vdstestlib::DirConfig stor_config(getStandardConfig(true));
- stor_config.getConfig("stor-server").set("node_index", "1");
- addSlobrokConfig(stor_config, slobrok);
+ CommunicationManagerFixture()
+ : config(StorageConfigSet::make_storage_node_config())
+ {
+ config->set_node_index(1);
+ config->set_slobrok_config_port(slobrok.port());
+ auto& cfg_uri = config->config_uri();
- node = std::make_unique<TestServiceLayerApp>(stor_config.getConfigId());
- auto cfg_uri = config::ConfigUri(stor_config.getConfigId());
+ node = std::make_unique<TestServiceLayerApp>(cfg_uri);
comm_mgr = std::make_unique<CommunicationManager>(node->getComponentRegister(), cfg_uri,
*config_from<CommunicationManagerConfig>(cfg_uri));
bottom_link = new DummyStorageLink();
diff --git a/storage/src/tests/storageserver/documentapiconvertertest.cpp b/storage/src/tests/storageserver/documentapiconvertertest.cpp
index eb4789b25d4..1eb6bf5dd9a 100644
--- a/storage/src/tests/storageserver/documentapiconvertertest.cpp
+++ b/storage/src/tests/storageserver/documentapiconvertertest.cpp
@@ -159,28 +159,46 @@ TEST_F(DocumentApiConverterTest, forwarded_put) {
}
TEST_F(DocumentApiConverterTest, update) {
- auto update = std::make_shared<document::DocumentUpdate>(*_repo, _html_type, defaultDocId);
- documentapi::UpdateDocumentMessage updateMsg(update);
- updateMsg.setOldTimestamp(1234);
- updateMsg.setNewTimestamp(5678);
- updateMsg.setCondition(my_condition);
-
- auto updateCmd = toStorageAPI<api::UpdateCommand>(updateMsg);
- EXPECT_EQ(defaultBucket, updateCmd->getBucket());
- ASSERT_EQ(update.get(), updateCmd->getUpdate().get());
- EXPECT_EQ(api::Timestamp(1234), updateCmd->getOldTimestamp());
- EXPECT_EQ(api::Timestamp(5678), updateCmd->getTimestamp());
- EXPECT_EQ(my_condition, updateCmd->getCondition());
-
- auto mbusReply = updateMsg.createReply();
- ASSERT_TRUE(mbusReply.get());
- toStorageAPI<api::UpdateReply>(*mbusReply, *updateCmd);
-
- auto mbusUpdate = toDocumentAPI<documentapi::UpdateDocumentMessage>(*updateCmd);
- ASSERT_EQ((&mbusUpdate->getDocumentUpdate()), update.get());
- EXPECT_EQ(api::Timestamp(1234), mbusUpdate->getOldTimestamp());
- EXPECT_EQ(api::Timestamp(5678), mbusUpdate->getNewTimestamp());
- EXPECT_EQ(my_condition, mbusUpdate->getCondition());
+ auto do_test_update = [&](bool create_if_missing) {
+ auto update = std::make_shared<document::DocumentUpdate>(*_repo, _html_type, defaultDocId);
+ update->setCreateIfNonExistent(create_if_missing);
+ documentapi::UpdateDocumentMessage updateMsg(update);
+ updateMsg.setOldTimestamp(1234);
+ updateMsg.setNewTimestamp(5678);
+ updateMsg.setCondition(my_condition);
+ EXPECT_FALSE(updateMsg.has_cached_create_if_missing());
+ EXPECT_EQ(updateMsg.create_if_missing(), create_if_missing);
+
+ auto updateCmd = toStorageAPI<api::UpdateCommand>(updateMsg);
+ EXPECT_EQ(defaultBucket, updateCmd->getBucket());
+ ASSERT_EQ(update.get(), updateCmd->getUpdate().get());
+ EXPECT_EQ(api::Timestamp(1234), updateCmd->getOldTimestamp());
+ EXPECT_EQ(api::Timestamp(5678), updateCmd->getTimestamp());
+ EXPECT_EQ(my_condition, updateCmd->getCondition());
+ EXPECT_FALSE(updateCmd->has_cached_create_if_missing());
+ EXPECT_EQ(updateCmd->create_if_missing(), create_if_missing);
+
+ auto mbusReply = updateMsg.createReply();
+ ASSERT_TRUE(mbusReply.get());
+ toStorageAPI<api::UpdateReply>(*mbusReply, *updateCmd);
+
+ auto mbusUpdate = toDocumentAPI<documentapi::UpdateDocumentMessage>(*updateCmd);
+ ASSERT_EQ((&mbusUpdate->getDocumentUpdate()), update.get());
+ EXPECT_EQ(api::Timestamp(1234), mbusUpdate->getOldTimestamp());
+ EXPECT_EQ(api::Timestamp(5678), mbusUpdate->getNewTimestamp());
+ EXPECT_EQ(my_condition, mbusUpdate->getCondition());
+ EXPECT_EQ(mbusUpdate->create_if_missing(), create_if_missing);
+
+ // Cached value of create_if_missing should override underlying update's value
+ updateCmd->set_cached_create_if_missing(!create_if_missing);
+ EXPECT_TRUE(updateCmd->has_cached_create_if_missing());
+ EXPECT_EQ(updateCmd->create_if_missing(), !create_if_missing);
+ mbusUpdate = toDocumentAPI<documentapi::UpdateDocumentMessage>(*updateCmd);
+ EXPECT_TRUE(mbusUpdate->has_cached_create_if_missing());
+ EXPECT_EQ(mbusUpdate->create_if_missing(), !create_if_missing);
+ };
+ do_test_update(false);
+ do_test_update(true);
}
TEST_F(DocumentApiConverterTest, remove) {
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index cdf203b8a39..bc1c7f60706 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -1,10 +1,12 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <tests/common/dummystoragelink.h>
+#include <tests/common/storage_config_set.h>
#include <tests/common/testhelper.h>
#include <tests/common/teststorageapp.h>
#include <vespa/config/helper/configgetter.hpp>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/messagebus/dynamicthrottlepolicy.h>
+#include <vespa/storage/config/config-stor-server.h>
#include <vespa/storage/persistence/messages.h>
#include <vespa/storage/storageserver/mergethrottler.h>
#include <vespa/storageapi/message/bucket.h>
@@ -36,9 +38,8 @@ using StorServerConfigBuilder = vespa::config::content::core::StorServerConfigBu
vespalib::string _storage("storage");
std::unique_ptr<StorServerConfig> default_server_config() {
- vdstestlib::DirConfig dir_config(getStandardConfig(true));
- auto cfg_uri = ::config::ConfigUri(dir_config.getConfigId());
- return config_from<StorServerConfig>(cfg_uri);
+ auto config = StorageConfigSet::make_storage_node_config();
+ return config_from<StorServerConfig>(config->config_uri());
}
struct MergeBuilder {
@@ -153,8 +154,9 @@ struct MergeThrottlerTest : Test {
static constexpr int _messageWaitTime = 100;
// Using n storage node links and dummy servers
- std::vector<std::shared_ptr<DummyStorageLink> > _topLinks;
- std::vector<std::shared_ptr<TestServiceLayerApp> > _servers;
+ std::unique_ptr<StorageConfigSet> _config;
+ std::vector<std::shared_ptr<DummyStorageLink>> _topLinks;
+ std::vector<std::shared_ptr<TestServiceLayerApp>> _servers;
std::vector<MergeThrottler*> _throttlers;
std::vector<DummyStorageLink*> _bottomLinks;
@@ -198,14 +200,14 @@ MergeThrottlerTest::~MergeThrottlerTest() = default;
void
MergeThrottlerTest::SetUp()
{
- auto config = default_server_config();
+ _config = StorageConfigSet::make_storage_node_config();
for (int i = 0; i < _storageNodeCount; ++i) {
- auto server = std::make_unique<TestServiceLayerApp>(NodeIndex(i));
+ auto server = std::make_unique<TestServiceLayerApp>(NodeIndex(i), _config->config_uri());
server->setClusterState(lib::ClusterState("distributor:100 storage:100 version:1"));
std::unique_ptr<DummyStorageLink> top;
top = std::make_unique<DummyStorageLink>();
- auto* throttler = new MergeThrottler(*config, server->getComponentRegister(), vespalib::HwInfo());
+ auto* throttler = new MergeThrottler(_config->server_config(), server->getComponentRegister(), vespalib::HwInfo());
// MergeThrottler will be sandwiched in between two dummy links
top->push_back(std::unique_ptr<StorageLink>(throttler));
auto* bottom = new DummyStorageLink;
diff --git a/storage/src/tests/storageserver/rpc/cluster_controller_rpc_api_service_test.cpp b/storage/src/tests/storageserver/rpc/cluster_controller_rpc_api_service_test.cpp
index 6e9485e24d4..c3641b9bc56 100644
--- a/storage/src/tests/storageserver/rpc/cluster_controller_rpc_api_service_test.cpp
+++ b/storage/src/tests/storageserver/rpc/cluster_controller_rpc_api_service_test.cpp
@@ -1,5 +1,6 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <tests/common/storage_config_set.h>
#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/fnet/frt/rpcrequest.h>
#include <vespa/messagebus/testlib/slobrok.h>
@@ -11,7 +12,6 @@
#include <vespa/storageapi/message/state.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/stllike/asciistream.h>
-#include <tests/common/testhelper.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vector>
@@ -43,7 +43,7 @@ struct DummyReturnHandler : FRT_IReturnHandler {
struct FixtureBase {
mbus::Slobrok slobrok;
- vdstestlib::DirConfig config;
+ std::unique_ptr<StorageConfigSet> config;
MockOperationDispatcher dispatcher;
std::unique_ptr<SharedRpcResources> shared_rpc_resources;
std::unique_ptr<ClusterControllerApiRpcService> cc_service;
@@ -52,12 +52,12 @@ struct FixtureBase {
FRT_RPCRequest* bound_request{nullptr};
FixtureBase()
- : config(getStandardConfig(true))
+ : config(StorageConfigSet::make_storage_node_config())
{
- config.getConfig("stor-server").set("node_index", "1");
- addSlobrokConfig(config, slobrok);
+ config->set_node_index(1);
+ config->set_slobrok_config_port(slobrok.port());
- shared_rpc_resources = std::make_unique<SharedRpcResources>(config::ConfigUri(config.getConfigId()), 0, 1, 1);
+ shared_rpc_resources = std::make_unique<SharedRpcResources>(config->config_uri(), 0, 1, 1);
cc_service = std::make_unique<ClusterControllerApiRpcService>(dispatcher, *shared_rpc_resources);
shared_rpc_resources->start_server_and_register_slobrok("my_cool_rpc_test");
}
diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
index 72ddc89f9d3..010f2b441ef 100644
--- a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
+++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
@@ -1,6 +1,6 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <tests/common/testhelper.h>
+#include <tests/common/storage_config_set.h>
#include <vespa/document/base/testdocman.h>
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/fieldvalue/stringfieldvalue.h>
@@ -102,7 +102,7 @@ vespalib::string to_slobrok_id(const api::StorageMessageAddress& address) {
class RpcNode {
protected:
- vdstestlib::DirConfig _config;
+ std::unique_ptr<StorageConfigSet> _config;
std::shared_ptr<const document::DocumentTypeRepo> _doc_type_repo;
LockingMockOperationDispatcher _messages;
std::unique_ptr<MessageCodecProvider> _codec_provider;
@@ -111,17 +111,15 @@ protected:
vespalib::string _slobrok_id;
public:
RpcNode(uint16_t node_index, bool is_distributor, const mbus::Slobrok& slobrok)
- : _config(getStandardConfig(true)),
+ : _config(StorageConfigSet::make_node_config(!is_distributor)),
_doc_type_repo(document::TestDocRepo().getTypeRepoSp()),
_node_address(make_address(node_index, is_distributor)),
_slobrok_id(to_slobrok_id(_node_address))
{
- auto& cfg = _config.getConfig("stor-server");
- cfg.set("node_index", std::to_string(node_index));
- cfg.set("is_distributor", is_distributor ? "true" : "false");
- addSlobrokConfig(_config, slobrok);
+ _config->set_node_index(node_index);
+ _config->set_slobrok_config_port(slobrok.port());
- _shared_rpc_resources = std::make_unique<SharedRpcResources>(config::ConfigUri(_config.getConfigId()), 0, 1, 1);
+ _shared_rpc_resources = std::make_unique<SharedRpcResources>(_config->config_uri(), 0, 1, 1);
// TODO make codec provider into interface so we can test decode-failures more easily?
_codec_provider = std::make_unique<MessageCodecProvider>(_doc_type_repo);
}
diff --git a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp
index 63d8eec6dc3..b84f96dd847 100644
--- a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp
+++ b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp
@@ -1,12 +1,12 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <tests/common/storage_config_set.h>
#include <tests/common/testhelper.h>
#include <tests/common/teststorageapp.h>
#include <vespa/config/helper/configgetter.hpp>
#include <vespa/storage/storageserver/mergethrottler.h>
#include <vespa/storage/storageserver/service_layer_error_listener.h>
#include <vespa/storageframework/defaultimplementation/component/componentregisterimpl.h>
-#include <vespa/vdstestlib/config/dirconfig.h>
#include <vespa/vespalib/gtest/gtest.h>
using namespace ::testing;
@@ -37,10 +37,10 @@ private:
struct Fixture {
using StorServerConfig = vespa::config::content::core::StorServerConfig;
- vdstestlib::DirConfig config{getStandardConfig(true)};
- TestServiceLayerApp app;
+ std::unique_ptr<StorageConfigSet> config{StorageConfigSet::make_storage_node_config()};
+ TestServiceLayerApp app{config->config_uri()};
ServiceLayerComponent component{app.getComponentRegister(), "dummy"};
- MergeThrottler merge_throttler{*config_from<StorServerConfig>(config::ConfigUri(config.getConfigId())),
+ MergeThrottler merge_throttler{*config_from<StorServerConfig>(config->config_uri()),
app.getComponentRegister(), vespalib::HwInfo()};
TestShutdownListener shutdown_listener;
ServiceLayerErrorListener error_listener{component, merge_throttler};
diff --git a/storage/src/tests/storageserver/statemanagertest.cpp b/storage/src/tests/storageserver/statemanagertest.cpp
index 2a5af397aca..79246cb3ce1 100644
--- a/storage/src/tests/storageserver/statemanagertest.cpp
+++ b/storage/src/tests/storageserver/statemanagertest.cpp
@@ -1,13 +1,14 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <tests/common/dummystoragelink.h>
+#include <tests/common/storage_config_set.h>
+#include <tests/common/teststorageapp.h>
+#include <tests/common/testhelper.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/state.h>
#include <vespa/vdslib/state/cluster_state_bundle.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/storage/storageserver/statemanager.h>
-#include <tests/common/teststorageapp.h>
-#include <tests/common/testhelper.h>
-#include <tests/common/dummystoragelink.h>
#include <vespa/vespalib/data/slime/slime.h>
#include <vespa/vespalib/gtest/gtest.h>
@@ -20,6 +21,7 @@ using namespace ::testing;
namespace storage {
struct StateManagerTest : Test, NodeStateReporter {
+ std::unique_ptr<StorageConfigSet> _config;
std::unique_ptr<TestServiceLayerApp> _node;
std::unique_ptr<DummyStorageLink> _upper;
StateManager* _manager;
@@ -30,7 +32,18 @@ struct StateManagerTest : Test, NodeStateReporter {
void SetUp() override;
void TearDown() override;
- void force_current_cluster_state_version(uint32_t version);
+ static std::shared_ptr<api::SetSystemStateCommand> make_set_state_cmd(vespalib::stringref state_str, uint16_t cc_index) {
+ auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(state_str));
+ cmd->setSourceIndex(cc_index);
+ return cmd;
+ }
+
+ void get_single_reply(std::shared_ptr<api::StorageReply>& reply_out);
+ void get_only_ok_reply(std::shared_ptr<api::StorageReply>& reply_out);
+ void force_current_cluster_state_version(uint32_t version, uint16_t cc_index);
+ void force_current_cluster_state_version(uint32_t version) {
+ force_current_cluster_state_version(version, 0);
+ }
void mark_reported_node_state_up();
void send_down_get_node_state_request(uint16_t controller_index);
void assert_ok_get_node_state_reply_sent_and_clear();
@@ -46,7 +59,8 @@ struct StateManagerTest : Test, NodeStateReporter {
};
StateManagerTest::StateManagerTest()
- : _node(),
+ : _config(),
+ _node(),
_upper(),
_manager(nullptr),
_lower(nullptr)
@@ -56,7 +70,8 @@ StateManagerTest::StateManagerTest()
void
StateManagerTest::SetUp()
{
- _node = std::make_unique<TestServiceLayerApp>(NodeIndex(2));
+ _config = StorageConfigSet::make_storage_node_config();
+ _node = std::make_unique<TestServiceLayerApp>(NodeIndex(2), _config->config_uri());
// Clock will increase 1 sec per call.
_node->getClock().setAbsoluteTimeInSeconds(1);
_upper = std::make_unique<DummyStorageLink>();
@@ -82,11 +97,30 @@ StateManagerTest::TearDown() {
}
void
-StateManagerTest::force_current_cluster_state_version(uint32_t version)
+StateManagerTest::get_single_reply(std::shared_ptr<api::StorageReply>& reply_out)
+{
+ ASSERT_EQ(_upper->getNumReplies(), 1);
+ ASSERT_TRUE(_upper->getReply(0)->getType().isReply());
+ reply_out = std::dynamic_pointer_cast<api::StorageReply>(_upper->getReply(0));
+ ASSERT_TRUE(reply_out.get() != nullptr);
+ _upper->reset();
+}
+
+void
+StateManagerTest::get_only_ok_reply(std::shared_ptr<api::StorageReply>& reply_out)
+{
+ ASSERT_NO_FATAL_FAILURE(get_single_reply(reply_out));
+ ASSERT_EQ(reply_out->getResult(), api::ReturnCode(api::ReturnCode::OK));
+}
+
+void
+StateManagerTest::force_current_cluster_state_version(uint32_t version, uint16_t cc_index)
{
ClusterState state(*_manager->getClusterStateBundle()->getBaselineClusterState());
state.setVersion(version);
- _manager->setClusterStateBundle(lib::ClusterStateBundle(state));
+ const auto maybe_rejected_by_ver = _manager->try_set_cluster_state_bundle(
+ std::make_shared<const lib::ClusterStateBundle>(state), cc_index);
+ ASSERT_EQ(maybe_rejected_by_ver, std::nullopt);
}
void
@@ -114,23 +148,12 @@ StateManagerTest::extract_cluster_state_version_from_host_info(uint32_t& version
version_out = clusterStateVersionCursor.asLong();
}
-#define GET_ONLY_OK_REPLY(varname) \
-{ \
- ASSERT_EQ(size_t(1), _upper->getNumReplies()); \
- ASSERT_TRUE(_upper->getReply(0)->getType().isReply()); \
- varname = std::dynamic_pointer_cast<api::StorageReply>( \
- _upper->getReply(0)); \
- ASSERT_TRUE(varname.get() != nullptr); \
- _upper->reset(); \
- ASSERT_EQ(api::ReturnCode(api::ReturnCode::OK), \
- varname->getResult()); \
-}
-
TEST_F(StateManagerTest, cluster_state) {
std::shared_ptr<api::StorageReply> reply;
// Verify initial state on startup
auto currentState = _manager->getClusterStateBundle()->getBaselineClusterState();
EXPECT_EQ("cluster:d", currentState->toString(false));
+ EXPECT_EQ(currentState->getVersion(), 0);
auto currentNodeState = _manager->getCurrentNodeState();
EXPECT_EQ("s:d", currentNodeState->toString(false));
@@ -138,7 +161,7 @@ TEST_F(StateManagerTest, cluster_state) {
ClusterState sendState("storage:4 .2.s:m");
auto cmd = std::make_shared<api::SetSystemStateCommand>(sendState);
_upper->sendDown(cmd);
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
currentState = _manager->getClusterStateBundle()->getBaselineClusterState();
EXPECT_EQ(sendState, *currentState);
@@ -147,13 +170,64 @@ TEST_F(StateManagerTest, cluster_state) {
EXPECT_EQ("s:m", currentNodeState->toString(false));
}
+TEST_F(StateManagerTest, accept_lower_state_versions_if_strict_requirement_disabled) {
+ _manager->set_require_strictly_increasing_cluster_state_versions(false);
+
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123, 1)); // CC 1
+ ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 123);
+
+ _upper->sendDown(make_set_state_cmd("version:122 distributor:1 storage:1", 0)); // CC 0
+ std::shared_ptr<api::StorageReply> reply;
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
+ EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 122);
+}
+
+TEST_F(StateManagerTest, reject_lower_state_versions_if_strict_requirement_enabled) {
+ _manager->set_require_strictly_increasing_cluster_state_versions(true);
+
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123, 1)); // CC 1
+ ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 123);
+
+ _upper->sendDown(make_set_state_cmd("version:122 distributor:1 storage:1", 0)); // CC 0
+ std::shared_ptr<api::StorageReply> reply;
+ ASSERT_NO_FATAL_FAILURE(get_single_reply(reply));
+ api::ReturnCode expected_res(api::ReturnCode::REJECTED, "Cluster state version 122 rejected; node already has "
+ "a higher cluster state version (123)");
+ EXPECT_EQ(reply->getResult(), expected_res);
+ EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 123);
+}
+
+// Observing a lower cluster state version from the same CC index directly implies that the ZooKeeper
+// state has been lost, at which point we pragmatically (but begrudgingly) accept the state version
+// to avoid stalling the entire cluster for an indeterminate amount of time.
+TEST_F(StateManagerTest, accept_lower_state_versions_from_same_cc_index_even_if_strict_requirement_enabled) {
+ _manager->set_require_strictly_increasing_cluster_state_versions(true);
+
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123, 1)); // CC 1
+ ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 123);
+
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(124, 2)); // CC 2
+ ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 124);
+
+ // CC 1 restarts from scratch with previous ZK state up in smoke.
+ _upper->sendDown(make_set_state_cmd("version:3 distributor:1 storage:1", 1));
+ std::shared_ptr<api::StorageReply> reply;
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
+ EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 3);
+
+ // CC 2 restarts and continues from where CC 1 left off.
+ _upper->sendDown(make_set_state_cmd("version:4 distributor:1 storage:1", 2));
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
+ EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 4);
+}
+
namespace {
struct MyStateListener : public StateListener {
const NodeStateUpdater& updater;
lib::NodeState current;
std::ostringstream ost;
- MyStateListener(const NodeStateUpdater& upd);
+ explicit MyStateListener(const NodeStateUpdater& upd);
~MyStateListener() override;
void handleNewState() noexcept override {
@@ -190,7 +264,7 @@ TEST_F(StateManagerTest, reported_node_state) {
// And get node state command (no expected state)
auto cmd = std::make_shared<api::GetNodeStateCommand>(lib::NodeState::UP());
_upper->sendDown(cmd);
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType());
nodeState = std::make_shared<NodeState>(
dynamic_cast<api::GetNodeStateReply&>(*reply).getNodeState());
@@ -199,7 +273,7 @@ TEST_F(StateManagerTest, reported_node_state) {
cmd = std::make_shared<api::GetNodeStateCommand>(
std::make_unique<NodeState>(NodeType::STORAGE, State::INITIALIZING));
_upper->sendDown(cmd);
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType());
nodeState = std::make_unique<NodeState>(
dynamic_cast<api::GetNodeStateReply&>(*reply).getNodeState());
@@ -218,7 +292,7 @@ TEST_F(StateManagerTest, reported_node_state) {
_manager->setReportedNodeState(ns);
}
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType());
nodeState = std::make_unique<NodeState>(
dynamic_cast<api::GetNodeStateReply&>(*reply).getNodeState());
@@ -240,7 +314,7 @@ TEST_F(StateManagerTest, reported_node_state) {
}
TEST_F(StateManagerTest, current_cluster_state_version_is_included_in_host_info_json) {
- force_current_cluster_state_version(123);
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123));
uint32_t version;
ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version));
EXPECT_EQ(version, 123);
@@ -262,7 +336,7 @@ void StateManagerTest::send_down_get_node_state_request(uint16_t controller_inde
void StateManagerTest::assert_ok_get_node_state_reply_sent_and_clear() {
ASSERT_EQ(1, _upper->getNumReplies());
std::shared_ptr<api::StorageReply> reply;
- GET_ONLY_OK_REPLY(reply); // Implicitly clears messages from _upper
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); // Implicitly clears messages from _upper
ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType());
}
@@ -340,7 +414,7 @@ TEST_F(StateManagerTest, request_almost_immediate_replies_triggers_fast_reply)
}
TEST_F(StateManagerTest, activation_command_is_bounced_with_current_cluster_state_version) {
- force_current_cluster_state_version(12345);
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(12345));
auto cmd = std::make_shared<api::ActivateClusterStateVersionCommand>(12340);
cmd->setTimeout(10000000ms);
@@ -349,7 +423,7 @@ TEST_F(StateManagerTest, activation_command_is_bounced_with_current_cluster_stat
ASSERT_EQ(1, _upper->getNumReplies());
std::shared_ptr<api::StorageReply> reply;
- GET_ONLY_OK_REPLY(reply); // Implicitly clears messages from _upper
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); // Implicitly clears messages from _upper
ASSERT_EQ(api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_REPLY, reply->getType());
auto& activate_reply = dynamic_cast<api::ActivateClusterStateVersionReply&>(*reply);
EXPECT_EQ(12340, activate_reply.activateVersion());
@@ -362,7 +436,7 @@ TEST_F(StateManagerTest, non_deferred_cluster_state_sets_reported_cluster_state_
cmd->setSourceIndex(0);
_upper->sendDown(cmd);
std::shared_ptr<api::StorageReply> reply;
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
uint32_t version;
ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version));
@@ -370,7 +444,7 @@ TEST_F(StateManagerTest, non_deferred_cluster_state_sets_reported_cluster_state_
}
TEST_F(StateManagerTest, deferred_cluster_state_does_not_update_state_until_activation_edge) {
- force_current_cluster_state_version(100);
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(100));
lib::ClusterStateBundle deferred_bundle(lib::ClusterState("version:101 distributor:1 storage:1"), {}, true);
auto state_cmd = std::make_shared<api::SetSystemStateCommand>(deferred_bundle);
@@ -378,7 +452,7 @@ TEST_F(StateManagerTest, deferred_cluster_state_does_not_update_state_until_acti
state_cmd->setSourceIndex(0);
_upper->sendDown(state_cmd);
std::shared_ptr<api::StorageReply> reply;
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
uint32_t version;
ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version));
@@ -388,7 +462,7 @@ TEST_F(StateManagerTest, deferred_cluster_state_does_not_update_state_until_acti
activation_cmd->setTimeout(1000s);
activation_cmd->setSourceIndex(0);
_upper->sendDown(activation_cmd);
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version));
EXPECT_EQ(version, 101);
diff --git a/storage/src/tests/storageserver/statereportertest.cpp b/storage/src/tests/storageserver/statereportertest.cpp
index 43eb37afe15..29d3daf9b86 100644
--- a/storage/src/tests/storageserver/statereportertest.cpp
+++ b/storage/src/tests/storageserver/statereportertest.cpp
@@ -1,14 +1,15 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <tests/common/dummystoragelink.h>
+#include <tests/common/storage_config_set.h>
+#include <tests/common/teststorageapp.h>
+#include <tests/common/testhelper.h>
#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h>
#include <vespa/storage/persistence/filestorage/filestormanager.h>
#include <vespa/storage/persistence/filestorage/filestormetrics.h>
#include <vespa/storage/storageserver/applicationgenerationfetcher.h>
#include <vespa/storage/storageserver/statereporter.h>
#include <vespa/metrics/metricmanager.h>
-#include <tests/common/teststorageapp.h>
-#include <tests/common/testhelper.h>
-#include <tests/common/dummystoragelink.h>
#include <vespa/config/common/exceptions.h>
#include <vespa/vespalib/data/slime/slime.h>
#include <vespa/vespalib/data/simple_buffer.h>
@@ -35,7 +36,7 @@ struct StateReporterTest : Test {
std::unique_ptr<DummyStorageLink> _top;
DummyApplicationGenerationFether _generationFetcher;
std::unique_ptr<StateReporter> _stateReporter;
- std::unique_ptr<vdstestlib::DirConfig> _config;
+ std::unique_ptr<StorageConfigSet> _config;
std::unique_ptr<metrics::MetricSet> _topSet;
std::unique_ptr<metrics::MetricManager> _metricManager;
std::shared_ptr<FileStorMetrics> _filestorMetrics;
@@ -68,10 +69,8 @@ StateReporterTest::StateReporterTest()
StateReporterTest::~StateReporterTest() = default;
void StateReporterTest::SetUp() {
- _config = std::make_unique<vdstestlib::DirConfig>(getStandardConfig(true, "statereportertest"));
- assert(system(("rm -rf " + getRootFolder(*_config)).c_str()) == 0);
-
- _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), _config->getConfigId());
+ _config = StorageConfigSet::make_storage_node_config();
+ _node = std::make_unique<TestServiceLayerApp>(NodeIndex(0), _config->config_uri());
_node->setupDummyPersistence();
_clock = &_node->getClock();
_clock->setAbsoluteTimeInSeconds(1000000);
@@ -91,7 +90,7 @@ void StateReporterTest::SetUp() {
_filestorMetrics->initDiskMetrics(1, 1);
_topSet->registerMetric(*_filestorMetrics);
- _metricManager->init(config::ConfigUri(_config->getConfigId()));
+ _metricManager->init(_config->config_uri());
}
void StateReporterTest::TearDown() {
diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp
index 09457038b70..29f065c0157 100644
--- a/storage/src/tests/visiting/visitormanagertest.cpp
+++ b/storage/src/tests/visiting/visitormanagertest.cpp
@@ -1,5 +1,9 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <tests/common/dummystoragelink.h>
+#include <tests/common/storage_config_set.h>
+#include <tests/common/teststorageapp.h>
+#include <tests/common/testhelper.h>
#include <vespa/config/helper/configgetter.hpp>
#include <vespa/document/fieldvalue/intfieldvalue.h>
#include <vespa/document/fieldvalue/stringfieldvalue.h>
@@ -9,9 +13,6 @@
#include <vespa/storage/persistence/filestorage/filestormanager.h>
#include <vespa/storage/visiting/visitormanager.h>
#include <vespa/storageframework/defaultimplementation/clock/realclock.h>
-#include <tests/common/teststorageapp.h>
-#include <tests/common/testhelper.h>
-#include <tests/common/dummystoragelink.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/document/test/make_bucket_space.h>
#include <tests/storageserver/testvisitormessagesession.h>
@@ -45,7 +46,9 @@ api::StorageMessageAddress _address(&_storage, lib::NodeType::STORAGE, 0);
struct VisitorManagerTest : Test {
protected:
static uint32_t docCount;
- std::vector<document::Document::SP > _documents;
+
+ std::unique_ptr<StorageConfigSet> _config;
+ std::vector<document::Document::SP> _documents;
std::unique_ptr<TestVisitorMessageSessionFactory> _messageSessionFactory;
std::unique_ptr<TestServiceLayerApp> _node;
std::unique_ptr<DummyStorageLink> _top;
@@ -82,16 +85,16 @@ uint32_t VisitorManagerTest::docCount = 10;
void
VisitorManagerTest::initializeTest(bool defer_manager_thread_start)
{
- vdstestlib::DirConfig config(getStandardConfig(true));
- config.getConfig("stor-visitor").set("visitorthreads", "1");
+ _config = StorageConfigSet::make_storage_node_config();
+ _config->visitor_config().visitorthreads = 1;
_messageSessionFactory = std::make_unique<TestVisitorMessageSessionFactory>();
- _node = std::make_unique<TestServiceLayerApp>(config.getConfigId());
+ _node = std::make_unique<TestServiceLayerApp>(_config->config_uri());
_node->setupDummyPersistence();
_node->getStateUpdater().setClusterState(std::make_shared<lib::ClusterState>("storage:1 distributor:1"));
_top = std::make_unique<DummyStorageLink>();
using vespa::config::content::core::StorVisitorConfig;
- auto bootstrap_cfg = config_from<StorVisitorConfig>(config::ConfigUri(config.getConfigId()));
+ auto bootstrap_cfg = config_from<StorVisitorConfig>(_config->config_uri());
auto vm = std::make_unique<VisitorManager>(*bootstrap_cfg,
_node->getComponentRegister(),
*_messageSessionFactory,
@@ -100,7 +103,7 @@ VisitorManagerTest::initializeTest(bool defer_manager_thread_start)
_manager = vm.get();
_top->push_back(std::move(vm));
using StorFilestorConfig = vespa::config::content::internal::InternalStorFilestorType;
- auto filestor_cfg = config_from<StorFilestorConfig>(config::ConfigUri(config.getConfigId()));
+ auto filestor_cfg = config_from<StorFilestorConfig>(_config->config_uri());
_top->push_back(std::make_unique<FileStorManager>(*filestor_cfg, _node->getPersistenceProvider(),
_node->getComponentRegister(), *_node, _node->get_host_info()));
_manager->setTimeBetweenTicks(10);
diff --git a/storage/src/tests/visiting/visitortest.cpp b/storage/src/tests/visiting/visitortest.cpp
index f83b6c99d64..075ebd13741 100644
--- a/storage/src/tests/visiting/visitortest.cpp
+++ b/storage/src/tests/visiting/visitortest.cpp
@@ -1,5 +1,9 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <tests/common/dummystoragelink.h>
+#include <tests/common/storage_config_set.h>
+#include <tests/common/testhelper.h>
+#include <tests/common/teststorageapp.h>
#include <vespa/config/common/exceptions.h>
#include <vespa/config/helper/configgetter.hpp>
#include <vespa/document/fieldvalue/intfieldvalue.h>
@@ -14,9 +18,6 @@
#include <vespa/storage/visiting/visitormanager.h>
#include <vespa/storageapi/message/datagram.h>
#include <vespa/storageapi/message/persistence.h>
-#include <tests/common/testhelper.h>
-#include <tests/common/teststorageapp.h>
-#include <tests/common/dummystoragelink.h>
#include <tests/storageserver/testvisitormessagesession.h>
#include <vespa/persistence/spi/docentry.h>
#include <vespa/vespalib/gtest/gtest.h>
@@ -59,6 +60,8 @@ struct TestParams {
struct VisitorTest : Test {
static uint32_t docCount;
+
+ std::unique_ptr<StorageConfigSet> _config;
std::vector<Document::SP> _documents;
std::unique_ptr<TestVisitorMessageSessionFactory> _messageSessionFactory;
std::unique_ptr<TestServiceLayerApp> _node;
@@ -146,31 +149,20 @@ VisitorTest::~VisitorTest() = default;
void
VisitorTest::initializeTest(const TestParams& params)
{
- vdstestlib::DirConfig config(getStandardConfig(true, "visitortest"));
- config.getConfig("stor-visitor").set("visitorthreads", "1");
- config.getConfig("stor-visitor").set(
- "defaultparalleliterators",
- std::to_string(params._parallelBuckets));
- config.getConfig("stor-visitor").set(
- "visitor_memory_usage_limit",
- std::to_string(params._maxVisitorMemoryUsage));
-
- std::string rootFolder = getRootFolder(config);
-
- ::chmod(rootFolder.c_str(), 0755);
- std::filesystem::remove_all(std::filesystem::path(rootFolder));
- std::filesystem::create_directories(std::filesystem::path(vespalib::make_string("%s/disks/d0", rootFolder.c_str())));
- std::filesystem::create_directories(std::filesystem::path(vespalib::make_string("%s/disks/d1", rootFolder.c_str())));
+ _config = StorageConfigSet::make_storage_node_config();
+ _config->visitor_config().visitorthreads = 1;
+ _config->visitor_config().defaultparalleliterators = params._parallelBuckets;
+ _config->visitor_config().visitorMemoryUsageLimit = params._maxVisitorMemoryUsage;
_messageSessionFactory = std::make_unique<TestVisitorMessageSessionFactory>();
if (params._autoReplyError.getCode() != mbus::ErrorCode::NONE) {
_messageSessionFactory->_autoReplyError = params._autoReplyError;
_messageSessionFactory->_createAutoReplyVisitorSessions = true;
}
- _node = std::make_unique<TestServiceLayerApp>(config.getConfigId());
+ _node = std::make_unique<TestServiceLayerApp>(_config->config_uri());
_top = std::make_unique<DummyStorageLink>();
using vespa::config::content::core::StorVisitorConfig;
- auto bootstrap_cfg = config_from<StorVisitorConfig>(config::ConfigUri(config.getConfigId()));
+ auto bootstrap_cfg = config_from<StorVisitorConfig>(_config->config_uri());
_top->push_back(std::unique_ptr<StorageLink>(_manager
= new VisitorManager(*bootstrap_cfg, _node->getComponentRegister(), *_messageSessionFactory)));
_bottom = new DummyStorageLink();
@@ -217,14 +209,12 @@ VisitorTest::initializeTest(const TestParams& params)
_documents.clear();
for (uint32_t i=0; i<docCount; ++i) {
std::ostringstream uri;
- uri << "id:test:testdoctype1:n=" << i % 10 << ":http://www.ntnu.no/"
- << i << ".html";
+ uri << "id:test:testdoctype1:n=" << i % 10 << ":http://www.ntnu.no/" << i << ".html";
_documents.push_back(Document::SP(
_node->getTestDocMan().createDocument(content, uri.str())));
const document::DocumentType& type(_documents.back()->getType());
- _documents.back()->setValue(type.getField("headerval"),
- document::IntFieldValue(i % 4));
+ _documents.back()->setValue(type.getField("headerval"), document::IntFieldValue(i % 4));
}
}
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
index 5337be6d79f..1f20a19ec51 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
@@ -2,26 +2,26 @@
#include "bucketmanager.h"
#include "minimumusedbitstracker.h"
-#include <iomanip>
+#include <vespa/config/helper/configgetter.hpp>
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/metrics/jsonwriter.h>
#include <vespa/storage/common/content_bucket_space_repo.h>
#include <vespa/storage/common/nodestateupdater.h>
-#include <vespa/storage/common/global_bucket_space_distribution_converter.h>
-#include <vespa/vdslib/state/cluster_state_bundle.h>
#include <vespa/storage/config/config-stor-server.h>
#include <vespa/storage/storageutil/distributorstatecache.h>
+#include <vespa/storageapi/message/bucketsplitting.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <vespa/storageapi/message/stat.h>
+#include <vespa/storageapi/message/state.h>
+#include <vespa/storageframework/generic/clock/timer.h>
#include <vespa/storageframework/generic/status/htmlstatusreporter.h>
#include <vespa/storageframework/generic/status/xmlstatusreporter.h>
#include <vespa/storageframework/generic/thread/thread.h>
-#include <vespa/storageframework/generic/clock/timer.h>
-#include <vespa/storageapi/message/persistence.h>
-#include <vespa/storageapi/message/state.h>
-#include <vespa/storageapi/message/bucketsplitting.h>
-#include <vespa/storageapi/message/stat.h>
-#include <vespa/metrics/jsonwriter.h>
-#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/vdslib/distribution/global_bucket_space_distribution_converter.h>
+#include <vespa/vdslib/state/cluster_state_bundle.h>
#include <vespa/vespalib/util/stringfmt.h>
+#include <iomanip>
#include <ranges>
-#include <vespa/config/helper/configgetter.hpp>
#include <chrono>
#include <thread>
@@ -526,6 +526,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac
using RBISP = std::shared_ptr<api::RequestBucketInfoCommand>;
std::map<uint16_t, RBISP> requests;
+ // TODO fetch distribution from bundle as well
auto distribution(_component.getBucketSpaceRepo().get(bucketSpace).getDistribution());
auto clusterStateBundle(_component.getStateUpdater().getClusterStateBundle());
assert(clusterStateBundle);
diff --git a/storage/src/vespa/storage/common/CMakeLists.txt b/storage/src/vespa/storage/common/CMakeLists.txt
index 708f3dd05b9..a385867ac98 100644
--- a/storage/src/vespa/storage/common/CMakeLists.txt
+++ b/storage/src/vespa/storage/common/CMakeLists.txt
@@ -5,7 +5,6 @@ vespa_add_library(storage_common OBJECT
content_bucket_space.cpp
content_bucket_space_repo.cpp
distributorcomponent.cpp
- global_bucket_space_distribution_converter.cpp
messagebucket.cpp
message_guard.cpp
messagesender.cpp
diff --git a/storage/src/vespa/storage/common/content_bucket_space.cpp b/storage/src/vespa/storage/common/content_bucket_space.cpp
index 0cedb78cfe6..92b5257b991 100644
--- a/storage/src/vespa/storage/common/content_bucket_space.cpp
+++ b/storage/src/vespa/storage/common/content_bucket_space.cpp
@@ -4,44 +4,76 @@
namespace storage {
+ClusterStateAndDistribution::ClusterStateAndDistribution(
+ std::shared_ptr<const lib::ClusterState> cluster_state,
+ std::shared_ptr<const lib::Distribution> distribution) noexcept
+ : _cluster_state(std::move(cluster_state)),
+ _distribution(std::move(distribution))
+{
+}
+
+ClusterStateAndDistribution::~ClusterStateAndDistribution() = default;
+
+std::shared_ptr<const ClusterStateAndDistribution>
+ClusterStateAndDistribution::with_new_state(std::shared_ptr<const lib::ClusterState> cluster_state) const {
+ return std::make_shared<const ClusterStateAndDistribution>(std::move(cluster_state), _distribution);
+}
+
+std::shared_ptr<const ClusterStateAndDistribution>
+ClusterStateAndDistribution::with_new_distribution(std::shared_ptr<const lib::Distribution> distribution) const {
+ return std::make_shared<const ClusterStateAndDistribution>(_cluster_state, std::move(distribution));
+}
+
ContentBucketSpace::ContentBucketSpace(document::BucketSpace bucketSpace,
const ContentBucketDbOptions& db_opts)
: _bucketSpace(bucketSpace),
_bucketDatabase(db_opts),
_lock(),
- _clusterState(),
- _distribution(),
+ _state_and_distribution(std::make_shared<ClusterStateAndDistribution>()),
_nodeUpInLastNodeStateSeenByProvider(false),
_nodeMaintenanceInLastNodeStateSeenByProvider(false)
{
}
void
+ContentBucketSpace::set_state_and_distribution(std::shared_ptr<const ClusterStateAndDistribution> state_and_distr) noexcept {
+ assert(state_and_distr);
+ std::lock_guard guard(_lock);
+ _state_and_distribution = std::move(state_and_distr);
+}
+
+std::shared_ptr<const ClusterStateAndDistribution>
+ContentBucketSpace::state_and_distribution() const noexcept {
+ std::lock_guard guard(_lock);
+ return _state_and_distribution;
+}
+
+void
ContentBucketSpace::setClusterState(std::shared_ptr<const lib::ClusterState> clusterState)
{
std::lock_guard guard(_lock);
- _clusterState = std::move(clusterState);
+ _state_and_distribution = _state_and_distribution->with_new_state(std::move(clusterState));
}
std::shared_ptr<const lib::ClusterState>
ContentBucketSpace::getClusterState() const
{
std::lock_guard guard(_lock);
- return _clusterState;
+ return _state_and_distribution->_cluster_state;
}
void
ContentBucketSpace::setDistribution(std::shared_ptr<const lib::Distribution> distribution)
{
std::lock_guard guard(_lock);
- _distribution = std::move(distribution);
+ _state_and_distribution = _state_and_distribution->with_new_distribution(std::move(distribution));
}
std::shared_ptr<const lib::Distribution>
ContentBucketSpace::getDistribution() const
{
std::lock_guard guard(_lock);
- return _distribution;
+ return _state_and_distribution->_distribution;
}
bool
diff --git a/storage/src/vespa/storage/common/content_bucket_space.h b/storage/src/vespa/storage/common/content_bucket_space.h
index 93b171bd48e..eb48640c97b 100644
--- a/storage/src/vespa/storage/common/content_bucket_space.h
+++ b/storage/src/vespa/storage/common/content_bucket_space.h
@@ -12,6 +12,27 @@ class ClusterState;
class Distribution;
}
+struct ClusterStateAndDistribution {
+ std::shared_ptr<const lib::ClusterState> _cluster_state;
+ std::shared_ptr<const lib::Distribution> _distribution;
+
+ ClusterStateAndDistribution() = default;
+ ClusterStateAndDistribution(std::shared_ptr<const lib::ClusterState> cluster_state,
+ std::shared_ptr<const lib::Distribution> distribution) noexcept;
+ ~ClusterStateAndDistribution();
+
+ [[nodiscard]] bool valid() const noexcept { return _cluster_state && _distribution; }
+
+ // Precondition: valid() == true
+ [[nodiscard]] const lib::ClusterState& cluster_state() const noexcept { return *_cluster_state; }
+ [[nodiscard]] const lib::Distribution& distribution() const noexcept { return *_distribution; }
+
+ [[nodiscard]] std::shared_ptr<const ClusterStateAndDistribution> with_new_state(
+ std::shared_ptr<const lib::ClusterState> cluster_state) const;
+ [[nodiscard]] std::shared_ptr<const ClusterStateAndDistribution> with_new_distribution(
+ std::shared_ptr<const lib::Distribution> distribution) const;
+};
+
/**
* Class representing a bucket space (with associated bucket database) on a content node.
*/
@@ -20,8 +41,7 @@ private:
document::BucketSpace _bucketSpace;
StorBucketDatabase _bucketDatabase;
mutable std::mutex _lock;
- std::shared_ptr<const lib::ClusterState> _clusterState;
- std::shared_ptr<const lib::Distribution> _distribution;
+ std::shared_ptr<const ClusterStateAndDistribution> _state_and_distribution;
bool _nodeUpInLastNodeStateSeenByProvider;
bool _nodeMaintenanceInLastNodeStateSeenByProvider;
@@ -31,10 +51,18 @@ public:
document::BucketSpace bucketSpace() const noexcept { return _bucketSpace; }
StorBucketDatabase &bucketDatabase() { return _bucketDatabase; }
+
+ void set_state_and_distribution(std::shared_ptr<const ClusterStateAndDistribution> state_and_distr) noexcept;
+ [[nodiscard]] std::shared_ptr<const ClusterStateAndDistribution> state_and_distribution() const noexcept;
+ // TODO deprecate; only use atomic state+distribution setter
void setClusterState(std::shared_ptr<const lib::ClusterState> clusterState);
+ // TODO deprecate; only use atomic state+distribution getter
std::shared_ptr<const lib::ClusterState> getClusterState() const;
+ // TODO deprecate; only use atomic state+distribution setter
void setDistribution(std::shared_ptr<const lib::Distribution> distribution);
+ // TODO deprecate; only use atomic state+distribution getter
std::shared_ptr<const lib::Distribution> getDistribution() const;
+
bool getNodeUpInLastNodeStateSeenByProvider() const;
void setNodeUpInLastNodeStateSeenByProvider(bool nodeUpInLastNodeStateSeenByProvider);
bool getNodeMaintenanceInLastNodeStateSeenByProvider() const;
diff --git a/storage/src/vespa/storage/common/global_bucket_space_distribution_converter.cpp b/storage/src/vespa/storage/common/global_bucket_space_distribution_converter.cpp
deleted file mode 100644
index eb42f19a5e8..00000000000
--- a/storage/src/vespa/storage/common/global_bucket_space_distribution_converter.cpp
+++ /dev/null
@@ -1,181 +0,0 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "global_bucket_space_distribution_converter.h"
-#include <vespa/vdslib/distribution/distribution.h>
-#include <vespa/config/print/asciiconfigwriter.h>
-#include <vespa/config/print/asciiconfigreader.hpp>
-#include <vespa/vdslib/distribution/distribution_config_util.h>
-#include <vespa/vespalib/stllike/asciistream.h>
-#include <cassert>
-#include <map>
-
-namespace storage {
-
-using DistributionConfig = vespa::config::content::StorDistributionConfig;
-using DistributionConfigBuilder = vespa::config::content::StorDistributionConfigBuilder;
-
-namespace {
-
-struct Group {
- uint16_t nested_leaf_count{0};
- std::map<uint16_t, std::unique_ptr<Group>> sub_groups;
-};
-
-void set_distribution_invariant_config_fields(DistributionConfigBuilder& builder) {
- builder.activePerLeafGroup = true;
- // TODO consider how to best support n-of-m replication for global docs
- builder.ensurePrimaryPersisted = true;
- builder.initialRedundancy = 0;
-}
-
-const Group& find_non_root_group_by_index(const vespalib::string& index, const Group& root) {
- auto path = lib::DistributionConfigUtil::getGroupPath(index);
- auto* node = &root;
- for (auto idx : path) {
- auto child_iter = node->sub_groups.find(idx);
- assert(child_iter != node->sub_groups.end());
- node = child_iter->second.get();
- }
- return *node;
-}
-
-vespalib::string sub_groups_to_partition_spec(const Group& parent) {
- if (parent.sub_groups.empty()) {
- return "*";
- }
- vespalib::asciistream spec;
- // We simplify the generated partition spec by only emitting wildcard entries.
- // These will have replicas evenly divided amongst them.
- for (size_t i = 0; i < parent.sub_groups.size(); ++i) {
- if (i != 0) {
- spec << '|';
- }
- spec << '*';
- }
- return spec.str();
-}
-
-bool is_leaf_group(const DistributionConfigBuilder::Group& g) noexcept {
- return !g.nodes.empty();
-}
-
-void insert_new_group_into_tree(
- std::unique_ptr<Group> new_group,
- const DistributionConfigBuilder::Group& config_source_group,
- Group& root) {
- const auto path = lib::DistributionConfigUtil::getGroupPath(config_source_group.index);
- assert(!path.empty());
-
- Group* parent = &root;
- for (size_t i = 0; i < path.size(); ++i) {
- const auto idx = path[i];
- parent->nested_leaf_count += config_source_group.nodes.size(); // Empty if added group is not a leaf.
- auto g_iter = parent->sub_groups.find(idx);
- if (g_iter != parent->sub_groups.end()) {
- assert(i != path.size() - 1);
- parent = g_iter->second.get();
- } else {
- assert(i == path.size() - 1); // Only valid case for last item in path.
- parent->sub_groups.emplace(path.back(), std::move(new_group));
- }
- }
-}
-
-void build_transformed_root_group(DistributionConfigBuilder& builder,
- const DistributionConfigBuilder::Group& config_source_root,
- const Group& parsed_root) {
- DistributionConfigBuilder::Group new_root(config_source_root);
- new_root.partitions = sub_groups_to_partition_spec(parsed_root);
- builder.group.emplace_back(std::move(new_root));
-}
-
-void build_transformed_non_root_group(DistributionConfigBuilder& builder,
- const DistributionConfigBuilder::Group& config_source_group,
- const Group& parsed_root) {
- DistributionConfigBuilder::Group new_group(config_source_group);
- if (!is_leaf_group(config_source_group)) { // Partition specs only apply to inner nodes
- const auto& g = find_non_root_group_by_index(config_source_group.index, parsed_root);
- new_group.partitions = sub_groups_to_partition_spec(g);
- }
- builder.group.emplace_back(std::move(new_group));
-}
-
-std::unique_ptr<Group> create_group_tree_from_config(const DistributionConfig& source) {
- std::unique_ptr<Group> root;
- for (auto& g : source.group) {
- auto new_group = std::make_unique<Group>();
- assert(g.nodes.size() < UINT16_MAX);
- new_group->nested_leaf_count = static_cast<uint16_t>(g.nodes.size());
- if (root) {
- insert_new_group_into_tree(std::move(new_group), g, *root);
- } else {
- root = std::move(new_group);
- }
- }
- return root;
-}
-
-/* Even though groups are inherently hierarchical, the config is a flat array with a
- * hierarchy bolted on through the use of (more or less) "multi-dimensional" index strings.
- * Index string of root group is always "invalid" (or possibly some other string that cannot
- * be interpreted as a dot-separated tree node path). Other groups have an index of the
- * form "X.Y.Z", where Z is the group's immediate parent index, Y is Z's parent and so on. Just
- * stating Z itself is not sufficient to uniquely identify the group, as group indices are
- * not unique _across_ groups. For indices "0.1" and "1.1", the trailing "1" refers to 2
- * distinct groups, as they have different parents.
- *
- * It may be noted that the group index strings do _not_ include the root group, so we
- * have to always implicitly include it ourselves.
- *
- * Config groups are ordered so that when a group is encountered, all its parents (and
- * transitively, its parents again etc) have already been processed. This directly
- * implies that the root group is always the first group present in the config.
- */
-void build_global_groups(DistributionConfigBuilder& builder, const DistributionConfig& source) {
- assert(!source.group.empty()); // TODO gracefully handle empty config?
- auto root = create_group_tree_from_config(source);
-
- auto g_iter = source.group.begin();
- const auto g_end = source.group.end();
- build_transformed_root_group(builder, *g_iter, *root);
- ++g_iter;
- for (; g_iter != g_end; ++g_iter) {
- build_transformed_non_root_group(builder, *g_iter, *root);
- }
-
- builder.redundancy = root->nested_leaf_count;
- builder.readyCopies = builder.redundancy;
-}
-
-} // anon ns
-
-std::shared_ptr<DistributionConfig>
-GlobalBucketSpaceDistributionConverter::convert_to_global(const DistributionConfig& source) {
- DistributionConfigBuilder builder;
- set_distribution_invariant_config_fields(builder);
- build_global_groups(builder, source);
- return std::make_shared<DistributionConfig>(builder);
-}
-
-std::shared_ptr<lib::Distribution>
-GlobalBucketSpaceDistributionConverter::convert_to_global(const lib::Distribution& distr) {
- const auto src_config = distr.serialize();
- auto global_config = convert_to_global(*string_to_config(src_config));
- return std::make_shared<lib::Distribution>(*global_config);
-}
-
-std::unique_ptr<DistributionConfig>
-GlobalBucketSpaceDistributionConverter::string_to_config(const vespalib::string& cfg) {
- vespalib::asciistream iss(cfg);
- config::AsciiConfigReader<vespa::config::content::StorDistributionConfig> reader(iss);
- return reader.read();
-}
-
-vespalib::string GlobalBucketSpaceDistributionConverter::config_to_string(const DistributionConfig& cfg) {
- vespalib::asciistream ost;
- config::AsciiConfigWriter writer(ost);
- writer.write(cfg);
- return ost.str();
-}
-
-}
diff --git a/storage/src/vespa/storage/common/global_bucket_space_distribution_converter.h b/storage/src/vespa/storage/common/global_bucket_space_distribution_converter.h
deleted file mode 100644
index c530922ad18..00000000000
--- a/storage/src/vespa/storage/common/global_bucket_space_distribution_converter.h
+++ /dev/null
@@ -1,22 +0,0 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include <vespa/config-stor-distribution.h>
-#include <memory>
-
-namespace storage::lib { class Distribution; }
-namespace storage {
-
-struct GlobalBucketSpaceDistributionConverter {
- using DistributionConfig = vespa::config::content::StorDistributionConfig;
-
- static std::shared_ptr<DistributionConfig> convert_to_global(const DistributionConfig&);
- static std::shared_ptr<lib::Distribution> convert_to_global(const lib::Distribution&);
-
- // Helper functions which may be of use outside this class
- static std::unique_ptr<DistributionConfig> string_to_config(const vespalib::string&);
- static vespalib::string config_to_string(const DistributionConfig&);
-};
-
-}
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index 6957e541d6b..cfbc4caf82d 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -46,6 +46,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_use_weak_internal_read_consistency_for_client_gets(false),
_enable_metadata_only_fetch_phase_for_inconsistent_updates(true),
_enable_operation_cancellation(false),
+ _symmetric_put_and_activate_replica_selection(false),
_minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED)
{
}
@@ -150,6 +151,7 @@ DistributorConfiguration::configure(const DistributorManagerConfig & config)
_max_activation_inhibited_out_of_sync_groups = config.maxActivationInhibitedOutOfSyncGroups;
_enable_operation_cancellation = config.enableOperationCancellation;
_minimumReplicaCountingMode = deriveReplicaCountingMode(config.minimumReplicaCountingMode);
+ _symmetric_put_and_activate_replica_selection = config.symmetricPutAndActivateReplicaSelection;
if (config.maxClusterClockSkewSec >= 0) {
_maxClusterClockSkew = std::chrono::seconds(config.maxClusterClockSkewSec);
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index 38fac13150c..2b73fdc0fa1 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -234,8 +234,11 @@ public:
[[nodiscard]] bool enable_operation_cancellation() const noexcept {
return _enable_operation_cancellation;
}
+ [[nodiscard]] bool symmetric_put_and_activate_replica_selection() const noexcept {
+ return _symmetric_put_and_activate_replica_selection;
+ }
- bool containsTimeStatement(const std::string& documentSelection) const;
+ [[nodiscard]] bool containsTimeStatement(const std::string& documentSelection) const;
private:
StorageComponent& _component;
@@ -276,6 +279,7 @@ private:
bool _use_weak_internal_read_consistency_for_client_gets;
bool _enable_metadata_only_fetch_phase_for_inconsistent_updates; //TODO Rewrite tests and GC
bool _enable_operation_cancellation;
+ bool _symmetric_put_and_activate_replica_selection;
ReplicaCountingMode _minimumReplicaCountingMode;
};
diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def
index 3f6028d7fa1..a4d4461ba68 100644
--- a/storage/src/vespa/storage/config/stor-distributormanager.def
+++ b/storage/src/vespa/storage/config/stor-distributormanager.def
@@ -159,6 +159,13 @@ num_distributor_stripes int default=0 restart
## requests partially or fully "invalidated" by such a change.
enable_operation_cancellation bool default=false
+## Iff true there will be an 1-1 symmetry between the replicas chosen as feed targets
+## for Put operations and the replica selection logic for bucket activation. In particular,
+## the most preferred replica for feed will be the most preferred bucket for activation.
+## This helps ensure that new versions of documents are routed to replicas that are most
+## likely to reflect these changes as part of visible search results.
+symmetric_put_and_activate_replica_selection bool default=false
+
## TODO GC very soon, it has no effect.
priority_merge_out_of_sync_copies int default=120
diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def
index 8cd204bcf9f..49ce0b678d0 100644
--- a/storage/src/vespa/storage/config/stor-server.def
+++ b/storage/src/vespa/storage/config/stor-server.def
@@ -114,3 +114,15 @@ simulated_bucket_request_latency_msec int default=0
## a disjoint subset of the node's buckets, in order to reduce locking contention.
## Max value is unspecified, but will be clamped internally.
content_node_bucket_db_stripe_bits int default=4 restart
+
+## Iff set, a special `pidfile` file is written under the node's root directory upon
+## startup containing the PID of the running process.
+write_pid_file_on_startup bool default=true
+
+## Iff true, received cluster state versions that are lower than the current active
+## (or pending to be active) version on the node will be explicitly rejected. This
+## prevents race conditions caused by multiple cluster controllers believing they
+## are the leader during overlapping time intervals, as only the most recent leader
+## is able to increment the current state version in ZooKeeper, but the old controller
+## may still attempt to publish its old state.
+require_strictly_increasing_cluster_state_versions bool default=false
diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt
index 195410cbe03..212570a3033 100644
--- a/storage/src/vespa/storage/distributor/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/CMakeLists.txt
@@ -5,7 +5,6 @@ vespa_add_library(storage_distributor OBJECT
blockingoperationstarter.cpp
bucket_db_prune_elision.cpp
bucket_ownership_calculator.cpp
- bucket_space_distribution_configs.cpp
bucket_space_distribution_context.cpp
bucket_space_state_map.cpp
bucket_spaces_stats_provider.cpp
diff --git a/storage/src/vespa/storage/distributor/activecopy.cpp b/storage/src/vespa/storage/distributor/activecopy.cpp
index 35070bcee3b..b823978a0cc 100644
--- a/storage/src/vespa/storage/distributor/activecopy.cpp
+++ b/storage/src/vespa/storage/distributor/activecopy.cpp
@@ -108,6 +108,7 @@ buildNodeList(const BucketDatabase::Entry& e,vespalib::ConstArrayRef<uint16_t> n
struct ActiveStateOrder {
bool operator()(const ActiveCopy & e1, const ActiveCopy & e2) noexcept {
+ // Replica selection order should be kept in sync with OperationTargetResolverImpl's InstanceOrder.
if (e1._ready != e2._ready) {
return e1._ready;
}
@@ -120,7 +121,9 @@ struct ActiveStateOrder {
if (e1._active != e2._active) {
return e1._active;
}
- return e1.nodeIndex() < e2.nodeIndex();
+ // Use _entry_ order instead of node index, as it is in ideal state order (even for retired
+ // nodes), which avoids unintentional affinities towards lower node indexes.
+ return e1.entryIndex() < e2.entryIndex();
}
};
diff --git a/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.cpp b/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.cpp
deleted file mode 100644
index 37bf8f01752..00000000000
--- a/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.cpp
+++ /dev/null
@@ -1,17 +0,0 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "bucket_space_distribution_configs.h"
-#include <vespa/document/bucket/fixed_bucket_spaces.h>
-#include <vespa/storage/common/global_bucket_space_distribution_converter.h>
-#include <vespa/vdslib/distribution/distribution.h>
-
-namespace storage::distributor {
-
-BucketSpaceDistributionConfigs
-BucketSpaceDistributionConfigs::from_default_distribution(std::shared_ptr<const lib::Distribution> distribution) {
- BucketSpaceDistributionConfigs ret;
- ret.space_configs.emplace(document::FixedBucketSpaces::global_space(), GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution));
- ret.space_configs.emplace(document::FixedBucketSpaces::default_space(), std::move(distribution));
- return ret;
-}
-
-}
diff --git a/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.h b/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.h
deleted file mode 100644
index cddd21d579f..00000000000
--- a/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.h
+++ /dev/null
@@ -1,27 +0,0 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <vespa/document/bucket/bucketspace.h>
-#include <map>
-#include <memory>
-
-namespace storage::lib { class Distribution; }
-
-namespace storage::distributor {
-
-/**
- * Represents a complete mapping of all known bucket spaces to their appropriate,
- * (possibly derived) distribution config.
- */
-struct BucketSpaceDistributionConfigs {
- std::map<document::BucketSpace, std::shared_ptr<const lib::Distribution>> space_configs;
-
- std::shared_ptr<const lib::Distribution> get_or_nullptr(document::BucketSpace space) const noexcept {
- auto iter = space_configs.find(space);
- return (iter != space_configs.end()) ? iter->second : std::shared_ptr<const lib::Distribution>();
- }
-
- static BucketSpaceDistributionConfigs from_default_distribution(std::shared_ptr<const lib::Distribution>);
-};
-
-}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index c00ab7080da..f8abdf78c2b 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -12,7 +12,6 @@
#include "stripe_host_info_notifier.h"
#include "throttlingoperationstarter.h"
#include <vespa/document/bucket/fixed_bucket_spaces.h>
-#include <vespa/storage/common/global_bucket_space_distribution_converter.h>
#include <vespa/storage/common/node_identity.h>
#include <vespa/storage/common/nodestateupdater.h>
#include <vespa/storage/distributor/maintenance/simplebucketprioritydatabase.h>
@@ -21,6 +20,7 @@
#include <vespa/storage/config/distributorconfiguration.h>
#include <vespa/storageframework/generic/status/xmlstatusreporter.h>
#include <vespa/vdslib/distribution/distribution.h>
+#include <vespa/vdslib/distribution/global_bucket_space_distribution_converter.h>
#include <vespa/vespalib/util/memoryusage.h>
#include <algorithm>
@@ -545,7 +545,7 @@ DistributorStripe::checkBucketForSplit(document::BucketSpace bucketSpace, const
void
DistributorStripe::propagateDefaultDistribution(std::shared_ptr<const lib::Distribution> distribution)
{
- auto global_distr = GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution);
+ auto global_distr = lib::GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution);
for (auto* repo : {_bucketSpaceRepo.get(), _readOnlyBucketSpaceRepo.get()}) {
repo->get(document::FixedBucketSpaces::default_space()).setDistribution(distribution);
repo->get(document::FixedBucketSpaces::global_space()).setDistribution(global_distr);
@@ -554,7 +554,7 @@ DistributorStripe::propagateDefaultDistribution(std::shared_ptr<const lib::Distr
// Only called when stripe is in rendezvous freeze
void
-DistributorStripe::update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) {
+DistributorStripe::update_distribution_config(const lib::BucketSpaceDistributionConfigs& new_configs) {
auto default_distr = new_configs.get_or_nullptr(document::FixedBucketSpaces::default_space());
auto global_distr = new_configs.get_or_nullptr(document::FixedBucketSpaces::global_space());
assert(default_distr && global_distr);
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index d782432ab35..f5793e4d39b 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -283,7 +283,7 @@ private:
void propagate_config_snapshot_to_internal_components();
// Additional implementations of TickableStripe:
- void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) override;
+ void update_distribution_config(const lib::BucketSpaceDistributionConfigs& new_configs) override;
void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) override;
void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) override;
void clear_pending_cluster_state_bundle() override;
diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
index f1cce40ee8b..991a73ec5c6 100644
--- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
+++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
@@ -33,7 +33,7 @@ void MultiThreadedStripeAccessGuard::update_total_distributor_config(std::shared
});
}
-void MultiThreadedStripeAccessGuard::update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) {
+void MultiThreadedStripeAccessGuard::update_distribution_config(const lib::BucketSpaceDistributionConfigs& new_configs) {
for_each_stripe([&](TickableStripe& stripe) {
stripe.update_distribution_config(new_configs);
});
diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
index a4392416025..3ce22a3e1a7 100644
--- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
+++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
@@ -31,7 +31,7 @@ public:
void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) override;
- void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) override;
+ void update_distribution_config(const lib::BucketSpaceDistributionConfigs& new_configs) override;
void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) override;
void clear_pending_cluster_state_bundle() override;
void enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state,
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
index 54087850e1b..a92896279b0 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
@@ -180,6 +180,8 @@ void PutOperation::start_direct_put_dispatch(DistributorStripeMessageSender& sen
_op_ctx.distributor_config().getMinimalBucketSplit(),
_bucket_space.getDistribution().getRedundancy(),
_msg->getBucket().getBucketSpace());
+ targetResolver.use_symmetric_replica_selection(
+ _op_ctx.distributor_config().symmetric_put_and_activate_replica_selection());
OperationTargetList targets(targetResolver.getTargets(OperationTargetResolver::PUT, _doc_id_bucket_id));
for (const auto& target : targets) {
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index 84e9ab71bcb..849746416d6 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -668,7 +668,7 @@ TwoPhaseUpdateOperation::applyUpdateToDocument(document::Document& doc) const
bool
TwoPhaseUpdateOperation::shouldCreateIfNonExistent() const
{
- return _updateCmd->getUpdate()->getCreateIfNonExistent();
+ return _updateCmd->create_if_missing();
}
bool
diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
index 7b6833cc299..2b47d53363f 100644
--- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
@@ -29,7 +29,7 @@ UpdateOperation::UpdateOperation(const DistributorNodeContext& node_ctx,
_msg(msg),
_entries(std::move(entries)),
_new_timestamp(_msg->getTimestamp()),
- _is_auto_create_update(_msg->getUpdate()->getCreateIfNonExistent()),
+ _is_auto_create_update(_msg->create_if_missing()),
_node_ctx(node_ctx),
_op_ctx(op_ctx),
_bucketSpace(bucketSpace),
@@ -112,6 +112,9 @@ UpdateOperation::onStart(DistributorStripeMessageSender& sender)
copyMessageSettings(*_msg, *command);
command->setOldTimestamp(_msg->getOldTimestamp());
command->setCondition(_msg->getCondition());
+ if (_msg->has_cached_create_if_missing()) {
+ command->set_cached_create_if_missing(_msg->create_if_missing());
+ }
messages.emplace_back(std::move(command), node);
}
diff --git a/storage/src/vespa/storage/distributor/operationtargetresolverimpl.cpp b/storage/src/vespa/storage/distributor/operationtargetresolverimpl.cpp
index 394c13c2bad..618cfb56359 100644
--- a/storage/src/vespa/storage/distributor/operationtargetresolverimpl.cpp
+++ b/storage/src/vespa/storage/distributor/operationtargetresolverimpl.cpp
@@ -10,9 +10,12 @@
namespace storage::distributor {
BucketInstance::BucketInstance(const document::BucketId& id, const api::BucketInfo& info, lib::Node node,
- uint16_t idealLocationPriority, bool trusted, bool exist) noexcept
+ uint16_t ideal_location_priority, uint16_t db_entry_order,
+ bool trusted, bool exist) noexcept
: _bucket(id), _info(info), _node(node),
- _idealLocationPriority(idealLocationPriority), _trusted(trusted), _exist(exist)
+ _ideal_location_priority(ideal_location_priority),
+ _db_entry_order(db_entry_order),
+ _trusted(trusted), _exists(exist)
{
}
@@ -24,8 +27,8 @@ BucketInstance::print(vespalib::asciistream& out, const PrintProperties&) const
std::ostringstream ost;
ost << std::hex << _bucket.getId();
- out << "(" << ost.str() << ", " << infoString << ", node " << _node.getIndex() << ", ideal " << _idealLocationPriority
- << (_trusted ? ", trusted" : "") << (_exist ? "" : ", new copy") << ")";
+ out << "(" << ost.str() << ", " << infoString << ", node " << _node.getIndex() << ", ideal " << _ideal_location_priority
+ << (_trusted ? ", trusted" : "") << (_exists ? "" : ", new copy") << ")";
}
bool
@@ -42,7 +45,7 @@ BucketInstanceList::add(const BucketDatabase::Entry& e, const IdealServiceLayerN
for (uint32_t i = 0; i < e.getBucketInfo().getNodeCount(); ++i) {
const BucketCopy& copy(e.getBucketInfo().getNodeRef(i));
lib::Node node(lib::NodeType::STORAGE, copy.getNode());
- _instances.emplace_back(e.getBucketId(), copy.getBucketInfo(), node, idealState.lookup(copy.getNode()), copy.trusted(), true);
+ _instances.emplace_back(e.getBucketId(), copy.getBucketInfo(), node, idealState.lookup(copy.getNode()), i, copy.trusted(), true);
}
}
@@ -106,7 +109,8 @@ BucketInstanceList::extendToEnoughCopies(const DistributorBucketSpace& distribut
for (uint32_t i=0; i<idealNodes.size(); ++i) {
lib::Node node(lib::NodeType::STORAGE, idealNodes[i]);
if (!contains(node)) {
- _instances.emplace_back(newTarget, api::BucketInfo(), node, i, false, false);
+ // We don't sort `_instances` after extending, so just reuse `i` as dummy DB entry order.
+ _instances.emplace_back(newTarget, api::BucketInfo(), node, i, i, false, false);
}
}
}
@@ -116,7 +120,7 @@ BucketInstanceList::createTargets(document::BucketSpace bucketSpace)
{
OperationTargetList result;
for (const auto& bi : _instances) {
- result.emplace_back(document::Bucket(bucketSpace, bi._bucket), bi._node, !bi._exist);
+ result.emplace_back(document::Bucket(bucketSpace, bi._bucket), bi._node, !bi._exists);
}
return result;
}
@@ -129,6 +133,49 @@ BucketInstanceList::print(vespalib::asciistream& out, const PrintProperties& p)
namespace {
/**
+ * To maintain a symmetry between which replicas receive Puts and which versions are
+ * preferred for activation, use an identical ordering predicate for both (for the case
+ * where replicas are for the same concrete bucket).
+ *
+ * Must only be used with BucketInstances that have a distinct _db_entry_order set per instance.
+ */
+struct ActiveReplicaSymmetricInstanceOrder {
+ bool operator()(const BucketInstance& a, const BucketInstance& b) noexcept {
+ if (a._bucket == b._bucket) {
+ if (a._info.isReady() != b._info.isReady()) {
+ return a._info.isReady();
+ }
+ if (a._info.getDocumentCount() != b._info.getDocumentCount()) {
+ return a._info.getDocumentCount() > b._info.getDocumentCount();
+ }
+ if (a._ideal_location_priority != b._ideal_location_priority) {
+ return a._ideal_location_priority < b._ideal_location_priority;
+ }
+ if (a._info.isActive() != b._info.isActive()) {
+ return a._info.isActive();
+ }
+ // If all else is equal, this implies both A and B are on retired nodes, which is unlikely
+ // but possible. Fall back to the existing DB _entry order_, which is equal to an ideal
+ // state order where retired nodes are considered part of the ideal state (which is not the
+ // case for most ideal state operations). Since the DB entry order is in ideal state order,
+ // using this instead of node _index_ avoids affinities to lower indexes in such edge cases.
+ return a._db_entry_order < b._db_entry_order;
+ } else {
+ // TODO this inconsistent split case is equal to the legacy logic (aside from the tie-breaking),
+ // but is considered to be extremely unlikely in practice, so not worth optimizing for.
+ if ((a._info.getMetaCount() == 0) ^ (b._info.getMetaCount() == 0)) {
+ return (a._info.getMetaCount() == 0);
+ }
+ if (a._bucket.getUsedBits() != b._bucket.getUsedBits()) {
+ return (a._bucket.getUsedBits() > b._bucket.getUsedBits());
+ }
+ return a._db_entry_order < b._db_entry_order;
+ }
+ return false;
+ }
+};
+
+/**
* - Trusted copies should be preferred over non-trusted copies for the same bucket.
* - Buckets in ideal locations should be preferred over non-ideal locations for the
* same bucket across several nodes.
@@ -137,14 +184,14 @@ namespace {
* - Right after split/join, bucket is often not in ideal location, but should be
* preferred instead of source anyhow.
*/
-struct InstanceOrder {
- bool operator()(const BucketInstance& a, const BucketInstance& b) {
+struct LegacyInstanceOrder {
+ bool operator()(const BucketInstance& a, const BucketInstance& b) noexcept {
if (a._bucket == b._bucket) {
- // Trusted only makes sense within same bucket
- // Prefer trusted buckets over non-trusted ones.
+ // Trusted only makes sense within same bucket
+ // Prefer trusted buckets over non-trusted ones.
if (a._trusted != b._trusted) return a._trusted;
- if (a._idealLocationPriority != b._idealLocationPriority) {
- return a._idealLocationPriority < b._idealLocationPriority;
+ if (a._ideal_location_priority != b._ideal_location_priority) {
+ return a._ideal_location_priority < b._ideal_location_priority;
}
} else {
if ((a._info.getMetaCount() == 0) ^ (b._info.getMetaCount() == 0)) {
@@ -164,7 +211,11 @@ OperationTargetResolverImpl::getAllInstances(OperationType type, const document:
BucketInstanceList instances;
if (type == PUT) {
instances.populate(id, _distributor_bucket_space, _bucketDatabase);
- instances.sort(InstanceOrder());
+ if (_symmetric_replica_selection) {
+ instances.sort(ActiveReplicaSymmetricInstanceOrder());
+ } else {
+ instances.sort(LegacyInstanceOrder());
+ }
instances.removeNodeDuplicates();
instances.extendToEnoughCopies(_distributor_bucket_space, _bucketDatabase,
_bucketDatabase.getAppropriateBucket(_minUsedBucketBits, id), id);
diff --git a/storage/src/vespa/storage/distributor/operationtargetresolverimpl.h b/storage/src/vespa/storage/distributor/operationtargetresolverimpl.h
index 9f367a89cba..6ab38928200 100644
--- a/storage/src/vespa/storage/distributor/operationtargetresolverimpl.h
+++ b/storage/src/vespa/storage/distributor/operationtargetresolverimpl.h
@@ -15,15 +15,17 @@ struct BucketInstance : public vespalib::AsciiPrintable {
document::BucketId _bucket;
api::BucketInfo _info;
lib::Node _node;
- uint16_t _idealLocationPriority;
- bool _trusted;
- bool _exist;
+ uint16_t _ideal_location_priority;
+ uint16_t _db_entry_order;
+ bool _trusted; // TODO remove
+ bool _exists;
BucketInstance() noexcept
- : _idealLocationPriority(0xffff), _trusted(false), _exist(false) {}
+ : _ideal_location_priority(0xffff), _db_entry_order(0xffff), _trusted(false), _exists(false)
+ {}
BucketInstance(const document::BucketId& id, const api::BucketInfo& info,
- lib::Node node, uint16_t idealLocationPriority, bool trusted,
- bool exist) noexcept;
+ lib::Node node, uint16_t ideal_location_priority,
+ uint16_t db_entry_order, bool trusted, bool exist) noexcept;
void print(vespalib::asciistream& out, const PrintProperties&) const override;
};
@@ -83,6 +85,7 @@ class OperationTargetResolverImpl : public OperationTargetResolver {
uint32_t _minUsedBucketBits;
uint16_t _redundancy;
document::BucketSpace _bucketSpace;
+ bool _symmetric_replica_selection;
public:
OperationTargetResolverImpl(const DistributorBucketSpace& distributor_bucket_space,
@@ -94,9 +97,14 @@ public:
_bucketDatabase(bucketDatabase),
_minUsedBucketBits(minUsedBucketBits),
_redundancy(redundancy),
- _bucketSpace(bucketSpace)
+ _bucketSpace(bucketSpace),
+ _symmetric_replica_selection(true)
{}
+ void use_symmetric_replica_selection(bool symmetry) noexcept {
+ _symmetric_replica_selection = symmetry;
+ }
+
BucketInstanceList getAllInstances(OperationType type, const document::BucketId& id);
BucketInstanceList getInstances(OperationType type, const document::BucketId& id) {
BucketInstanceList result(getAllInstances(type, id));
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
index b756c2e421b..f2b5fa62d1e 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
@@ -5,7 +5,6 @@
#include "pendingclusterstate.h"
#include "top_level_bucket_db_updater.h"
#include <vespa/document/bucket/fixed_bucket_spaces.h>
-#include <vespa/storage/common/global_bucket_space_distribution_converter.h>
#include <vespa/storageframework/defaultimplementation/clock/realclock.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/util/xmlstream.hpp>
diff --git a/storage/src/vespa/storage/distributor/stripe_access_guard.h b/storage/src/vespa/storage/distributor/stripe_access_guard.h
index 1618bc9be9d..8a930ed3305 100644
--- a/storage/src/vespa/storage/distributor/stripe_access_guard.h
+++ b/storage/src/vespa/storage/distributor/stripe_access_guard.h
@@ -1,11 +1,11 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include "bucket_space_distribution_configs.h"
#include "pending_bucket_space_db_transition_entry.h"
#include "potential_data_loss_report.h"
#include "outdated_nodes.h"
#include <vespa/document/bucket/bucketspace.h>
+#include <vespa/vdslib/distribution/bucket_space_distribution_configs.h>
#include <vespa/storageapi/defs.h>
#include <unordered_set> // TODO use hash_set instead
@@ -38,7 +38,7 @@ public:
virtual void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) = 0;
- virtual void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) = 0;
+ virtual void update_distribution_config(const lib::BucketSpaceDistributionConfigs& new_configs) = 0;
virtual void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) = 0;
virtual void clear_pending_cluster_state_bundle() = 0;
virtual void enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state,
diff --git a/storage/src/vespa/storage/distributor/tickable_stripe.h b/storage/src/vespa/storage/distributor/tickable_stripe.h
index ab1cd570089..2605c24639e 100644
--- a/storage/src/vespa/storage/distributor/tickable_stripe.h
+++ b/storage/src/vespa/storage/distributor/tickable_stripe.h
@@ -39,7 +39,7 @@ public:
virtual void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) = 0;
- virtual void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) = 0;
+ virtual void update_distribution_config(const lib::BucketSpaceDistributionConfigs& new_configs) = 0;
virtual void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) = 0;
virtual void clear_pending_cluster_state_bundle() = 0;
virtual void enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state,
diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp
index 1f7d11362e2..1991c9aaf58 100644
--- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp
+++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp
@@ -2,7 +2,6 @@
#include "top_level_bucket_db_updater.h"
#include "bucket_db_prune_elision.h"
-#include "bucket_space_distribution_configs.h"
#include "bucket_space_distribution_context.h"
#include "top_level_distributor.h"
#include "distributor_bucket_space.h"
@@ -11,11 +10,12 @@
#include "simpleclusterinformation.h"
#include "stripe_access_guard.h"
#include <vespa/document/bucket/fixed_bucket_spaces.h>
-#include <vespa/storage/common/global_bucket_space_distribution_converter.h>
#include <vespa/storage/config/distributorconfiguration.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/removelocation.h>
+#include <vespa/vdslib/distribution/bucket_space_distribution_configs.h>
#include <vespa/vdslib/distribution/distribution.h>
+#include <vespa/vdslib/distribution/global_bucket_space_distribution_converter.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/util/xmlstream.h>
#include <thread>
@@ -54,6 +54,7 @@ TopLevelBucketDBUpdater::TopLevelBucketDBUpdater(const DistributorNodeContext& n
{
// FIXME STRIPE top-level Distributor needs a proper way to track the current cluster state bundle!
propagate_active_state_bundle_internally(true); // We're just starting up so assume ownership transfer.
+ // TODO bootstrap cluster state bundle instead? version:0 cluster:d
bootstrap_distribution_config(std::move(bootstrap_distribution));
}
@@ -71,7 +72,7 @@ TopLevelBucketDBUpdater::propagate_active_state_bundle_internally(bool has_bucke
void
TopLevelBucketDBUpdater::bootstrap_distribution_config(std::shared_ptr<const lib::Distribution> distribution) {
- auto global_distr = GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution);
+ auto global_distr = lib::GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution);
_op_ctx.bucket_space_states().get(document::FixedBucketSpaces::default_space()).set_distribution(distribution);
_op_ctx.bucket_space_states().get(document::FixedBucketSpaces::global_space()).set_distribution(global_distr);
// TODO STRIPE do we need to bootstrap the stripes as well here? Or do they do this on their own volition?
@@ -79,7 +80,7 @@ TopLevelBucketDBUpdater::bootstrap_distribution_config(std::shared_ptr<const lib
}
void
-TopLevelBucketDBUpdater::propagate_distribution_config(const BucketSpaceDistributionConfigs& configs) {
+TopLevelBucketDBUpdater::propagate_distribution_config(const lib::BucketSpaceDistributionConfigs& configs) {
if (auto distr = configs.get_or_nullptr(document::FixedBucketSpaces::default_space())) {
_op_ctx.bucket_space_states().get(document::FixedBucketSpaces::default_space()).set_distribution(distr);
}
@@ -183,13 +184,14 @@ TopLevelBucketDBUpdater::complete_transition_timer()
}
void
-TopLevelBucketDBUpdater::storage_distribution_changed(const BucketSpaceDistributionConfigs& configs)
+TopLevelBucketDBUpdater::storage_distribution_changed(const lib::BucketSpaceDistributionConfigs& configs)
{
propagate_distribution_config(configs);
ensure_transition_timer_started();
auto guard = _stripe_accessor.rendezvous_and_hold_all();
// FIXME STRIPE might this cause a mismatch with the component stuff's own distribution config..?!
+ // TODO should be part of bundle only...!!
guard->update_distribution_config(configs);
remove_superfluous_buckets(*guard, _active_state_bundle, true);
diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h
index e76456329d4..87a408281a7 100644
--- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h
+++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h
@@ -23,9 +23,12 @@ class XmlOutputStream;
class XmlAttribute;
}
+namespace storage::lib {
+struct BucketSpaceDistributionConfigs;
+}
+
namespace storage::distributor {
-struct BucketSpaceDistributionConfigs;
class BucketSpaceDistributionContext;
class ClusterStateBundleActivationListener;
class DistributorInterface;
@@ -57,9 +60,9 @@ public:
bool reportStatus(std::ostream&, const framework::HttpUrlPath&) const override;
void resend_delayed_messages();
- void storage_distribution_changed(const BucketSpaceDistributionConfigs& configs);
+ void storage_distribution_changed(const lib::BucketSpaceDistributionConfigs& configs);
void bootstrap_distribution_config(std::shared_ptr<const lib::Distribution>);
- void propagate_distribution_config(const BucketSpaceDistributionConfigs& configs);
+ void propagate_distribution_config(const lib::BucketSpaceDistributionConfigs& configs);
vespalib::string report_xml_status(vespalib::xml::XmlOutputStream& xos, const framework::HttpUrlPath&) const;
diff --git a/storage/src/vespa/storage/distributor/top_level_distributor.cpp b/storage/src/vespa/storage/distributor/top_level_distributor.cpp
index 7348dbd6409..f7ee89ae7c0 100644
--- a/storage/src/vespa/storage/distributor/top_level_distributor.cpp
+++ b/storage/src/vespa/storage/distributor/top_level_distributor.cpp
@@ -1,7 +1,6 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
//
#include "blockingoperationstarter.h"
-#include "bucket_space_distribution_configs.h"
#include "top_level_bucket_db_updater.h"
#include "top_level_distributor.h"
#include "distributor_bucket_space.h"
@@ -16,7 +15,6 @@
#include "throttlingoperationstarter.h"
#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/storage/common/bucket_stripe_utils.h>
-#include <vespa/storage/common/global_bucket_space_distribution_converter.h>
#include <vespa/storage/common/hostreporter/hostinfo.h>
#include <vespa/storage/common/node_identity.h>
#include <vespa/storage/common/nodestateupdater.h>
@@ -25,7 +23,9 @@
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/visitor.h>
#include <vespa/storageframework/generic/status/xmlstatusreporter.h>
+#include <vespa/vdslib/distribution/bucket_space_distribution_configs.h>
#include <vespa/vdslib/distribution/distribution.h>
+#include <vespa/vdslib/distribution/global_bucket_space_distribution_converter.h>
#include <vespa/vespalib/util/memoryusage.h>
#include <algorithm>
@@ -323,7 +323,7 @@ TopLevelDistributor::enable_next_distribution_if_changed()
if (_next_distribution) {
_distribution = _next_distribution;
_next_distribution = std::shared_ptr<lib::Distribution>();
- auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(_distribution);
+ auto new_configs = lib::BucketSpaceDistributionConfigs::from_default_distribution(_distribution);
_bucket_db_updater->storage_distribution_changed(new_configs); // Transitively updates all stripes' configs
}
}
@@ -334,7 +334,7 @@ TopLevelDistributor::propagate_default_distribution_thread_unsafe(
{
// Should only be called at ctor time, at which point the pool is not yet running.
assert(_stripe_pool.stripe_count() == 0);
- auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution));
+ auto new_configs = lib::BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution));
for (auto& stripe : _stripes) {
stripe->update_distribution_config(new_configs);
}
diff --git a/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp b/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp
index 94853ec18d1..ab1cbf0b4d7 100644
--- a/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp
+++ b/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp
@@ -3,7 +3,7 @@
#include "servicelayercomponentregisterimpl.h"
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/document/bucket/fixed_bucket_spaces.h>
-#include <vespa/storage/common/global_bucket_space_distribution_converter.h>
+#include <vespa/vdslib/distribution/global_bucket_space_distribution_converter.h>
namespace storage {
@@ -26,7 +26,7 @@ void
ServiceLayerComponentRegisterImpl::setDistribution(std::shared_ptr<lib::Distribution> distribution)
{
_bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).setDistribution(distribution);
- auto global_distr = GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution);
+ auto global_distr = lib::GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution);
_bucketSpaceRepo.get(document::FixedBucketSpaces::global_space()).setDistribution(global_distr);
StorageComponentRegisterImpl::setDistribution(distribution);
}
diff --git a/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp b/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp
index ee49b346d92..d5de11c7d6f 100644
--- a/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp
+++ b/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp
@@ -19,6 +19,7 @@ uint16_t
BucketOwnershipNotifier::getOwnerDistributorForBucket(const document::Bucket &bucket) const
{
try {
+ // TODO use state updater bundle for everything?
auto distribution(_component.getBucketSpaceRepo().get(bucket.getBucketSpace()).getDistribution());
const auto clusterStateBundle = _component.getStateUpdater().getClusterStateBundle();
const auto &clusterState = *clusterStateBundle->getDerivedClusterState(bucket.getBucketSpace());
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 23de39f7130..495497d507d 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -930,8 +930,9 @@ FileStorManager::updateState()
for (const auto &elem : _component.getBucketSpaceRepo()) {
BucketSpace bucketSpace(elem.first);
ContentBucketSpace& contentBucketSpace = *elem.second;
- auto derivedClusterState = contentBucketSpace.getClusterState();
- const bool node_up_in_space = derivedClusterState->getNodeState(node).getState().oneOf("uir");
+ auto state_and_distr = contentBucketSpace.state_and_distribution();
+ assert(state_and_distr->valid());
+ const bool node_up_in_space = state_and_distr->cluster_state().getNodeState(node).getState().oneOf("uir");
if (should_deactivate_buckets(contentBucketSpace, node_up_in_space, in_maintenance)) {
LOG(debug, "Received cluster state where this node is down; de-activating all buckets "
"in database for bucket space %s", bucketSpace.toString().c_str());
@@ -941,9 +942,8 @@ FileStorManager::updateState()
}
contentBucketSpace.setNodeUpInLastNodeStateSeenByProvider(node_up_in_space);
contentBucketSpace.setNodeMaintenanceInLastNodeStateSeenByProvider(in_maintenance);
- spi::ClusterState spiState(*derivedClusterState, _component.getIndex(),
- *contentBucketSpace.getDistribution(),
- in_maintenance);
+ spi::ClusterState spiState(state_and_distr->cluster_state(), _component.getIndex(),
+ state_and_distr->distribution(), in_maintenance);
_provider->setClusterState(bucketSpace, spiState);
}
}
@@ -959,6 +959,7 @@ FileStorManager::propagateClusterStates()
{
auto clusterStateBundle = _component.getStateUpdater().getClusterStateBundle();
for (const auto &elem : _component.getBucketSpaceRepo()) {
+ // TODO also distribution! bundle and repo must be 1-1
elem.second->setClusterState(clusterStateBundle->getDerivedClusterState(elem.first));
}
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.cpp b/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.cpp
index c3cb38bd7ac..582c69e943f 100644
--- a/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.cpp
@@ -20,8 +20,8 @@ MergeHandlerMetrics::MergeHandlerMetrics(metrics::MetricSet* owner)
"current node.", owner),
mergeAverageDataReceivedNeeded("mergeavgdatareceivedneeded", {}, "Amount of data transferred from previous node "
"in chain that we needed to apply locally.", owner),
- put_latency("put_latency", {}, "Latency of individual puts that are part of merge operations", owner),
- remove_latency("remove_latency", {}, "Latency of individual removes that are part of merge operations", owner)
+ merge_put_latency("merge_put_latency", {}, "Latency of individual puts that are part of merge operations", owner),
+ merge_remove_latency("merge_remove_latency", {}, "Latency of individual removes that are part of merge operations", owner)
{}
MergeHandlerMetrics::~MergeHandlerMetrics() = default;
diff --git a/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.h b/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.h
index 44b85570357..a2d68011695 100644
--- a/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.h
+++ b/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.h
@@ -21,8 +21,8 @@ struct MergeHandlerMetrics {
metrics::DoubleAverageMetric mergeAverageDataReceivedNeeded;
// Individual operation metrics. These capture both count and latency sum, so
// no need for explicit count metric on the side.
- metrics::DoubleAverageMetric put_latency;
- metrics::DoubleAverageMetric remove_latency;
+ metrics::DoubleAverageMetric merge_put_latency;
+ metrics::DoubleAverageMetric merge_remove_latency;
// Iteration over metadata and document payload data is already covered by
// the merge[Meta]Data(Read|Write)Latency metrics, so not repeated here. Can be
// explicitly added if deemed required.
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 7ee2d9f37bf..b3207428f5f 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -4,13 +4,14 @@
#include "persistenceutil.h"
#include "apply_bucket_diff_entry_complete.h"
#include "apply_bucket_diff_state.h"
-#include <vespa/storage/persistence/filestorage/mergestatus.h>
-#include <vespa/persistence/spi/persistenceprovider.h>
-#include <vespa/persistence/spi/docentry.h>
-#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/document/fieldset/fieldsets.h>
#include <vespa/document/fieldvalue/document.h>
+#include <vespa/persistence/spi/docentry.h>
+#include <vespa/persistence/spi/persistenceprovider.h>
+#include <vespa/storage/persistence/filestorage/mergestatus.h>
+#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/objects/nbostream.h>
+#include <vespa/vespalib/stllike/hash_map.hpp>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <algorithm>
@@ -506,8 +507,18 @@ void
MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results,
const spi::Bucket& bucket,
const api::ApplyBucketDiffCommand::Entry& e,
- const document::DocumentTypeRepo& repo) const
+ const document::DocumentTypeRepo& repo,
+ const NewestDocumentVersionMapping& newest_per_doc) const
{
+ if (!e._docName.empty()) {
+ auto version_iter = newest_per_doc.find(e._docName);
+ assert(version_iter != newest_per_doc.end());
+ if (e._entry._timestamp != version_iter->second) {
+ LOG(spam, "ApplyBucketDiff(%s): skipping diff entry %s since it is subsumed by a newer timestamp %" PRIu64,
+ bucket.toString().c_str(), e.toString().c_str(), version_iter->second);
+ return;
+ }
+ }
auto throttle_token = _env._fileStorHandler.operation_throttler().blocking_acquire_one();
spi::Timestamp timestamp(e._entry._timestamp);
if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) {
@@ -516,14 +527,14 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results
document::DocumentId docId = doc->getId();
auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), std::move(docId),
std::move(throttle_token), "put",
- _clock, _env._metrics.merge_handler_metrics.put_latency);
+ _clock, _env._metrics.merge_handler_metrics.merge_put_latency);
_spi.putAsync(bucket, timestamp, std::move(doc), std::move(complete));
} else {
std::vector<spi::IdAndTimestamp> ids;
ids.emplace_back(document::DocumentId(e._docName), timestamp);
auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), ids[0].id,
std::move(throttle_token), "remove",
- _clock, _env._metrics.merge_handler_metrics.remove_latency);
+ _clock, _env._metrics.merge_handler_metrics.merge_remove_latency);
_spi.removeAsync(bucket, std::move(ids), std::move(complete));
}
}
@@ -548,6 +559,7 @@ MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::Apply
DocEntryList entries;
populateMetaData(bucket, Timestamp::max(), entries, context);
+ const auto newest_versions = enumerate_newest_document_versions(diff);
const document::DocumentTypeRepo & repo = _env.getDocumentTypeRepo();
uint32_t existingCount = entries.size();
@@ -580,7 +592,7 @@ MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::Apply
++i;
LOG(spam, "ApplyBucketDiff(%s): Adding slot %s",
bucket.toString().c_str(), e.toString().c_str());
- applyDiffEntry(async_results, bucket, e, repo);
+ applyDiffEntry(async_results, bucket, e, repo, newest_versions);
} else {
assert(spi::Timestamp(e._entry._timestamp) == existing.getTimestamp());
// Diffing for existing timestamp; should either both be put
@@ -591,7 +603,7 @@ MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::Apply
if ((e._entry._flags & DELETED) && !existing.isRemove()) {
LOG(debug, "Slot in diff is remove for existing timestamp in %s. Diff slot: %s. Existing slot: %s",
bucket.toString().c_str(), e.toString().c_str(), existing.toString().c_str());
- applyDiffEntry(async_results, bucket, e, repo);
+ applyDiffEntry(async_results, bucket, e, repo, newest_versions);
} else {
// Duplicate put, just ignore it.
LOG(debug, "During diff apply, attempting to add slot whose timestamp already exists in %s, "
@@ -619,7 +631,7 @@ MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::Apply
LOG(spam, "ApplyBucketDiff(%s): Adding slot %s",
bucket.toString().c_str(), e.toString().c_str());
- applyDiffEntry(async_results, bucket, e, repo);
+ applyDiffEntry(async_results, bucket, e, repo, newest_versions);
byteCount += e._headerBlob.size() + e._bodyBlob.size();
}
if (byteCount + notNeededByteCount != 0) {
@@ -631,6 +643,27 @@ MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::Apply
bucket.toString().c_str(), addedCount);
}
+MergeHandler::NewestDocumentVersionMapping
+MergeHandler::enumerate_newest_document_versions(const std::vector<api::ApplyBucketDiffCommand::Entry>& diff)
+{
+ NewestDocumentVersionMapping newest_per_doc;
+ for (const auto& e : diff) {
+ // We expect the doc name to always be filled out, both for remove operations and for puts.
+ // But since the latter is technically redundant (ID is also found within the document), we
+ // guard on this to be forwards compatible in case this changes (e.g. to populate and use
+ // the GID field instead). Fallback to the legacy behavior if so.
+ if (e._docName.empty()) {
+ continue;
+ }
+ auto [existing_iter, inserted] = newest_per_doc.insert(std::make_pair(vespalib::stringref(e._docName), e._entry._timestamp));
+ if (!inserted) {
+ assert(existing_iter != newest_per_doc.end());
+ existing_iter->second = std::max(existing_iter->second, e._entry._timestamp);
+ }
+ }
+ return newest_per_doc;
+}
+
void
MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const
{
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index f3bef802229..2be45e7bc8b 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -18,6 +18,7 @@
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storage/common/cluster_context.h>
#include <vespa/storage/common/messagesender.h>
+#include <vespa/vespalib/stllike/hash_map.h>
#include <vespa/vespalib/util/monitored_refcount.h>
#include <vespa/storageframework/generic/clock/time.h>
@@ -42,6 +43,8 @@ private:
using MessageTrackerUP = std::unique_ptr<MessageTracker>;
using Timestamp = framework::MicroSecTime;
public:
+ using NewestDocumentVersionMapping = vespalib::hash_map<vespalib::stringref, api::Timestamp>;
+
enum StateFlag {
IN_USE = 0x01,
DELETED = 0x02,
@@ -72,6 +75,17 @@ public:
void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const;
void drain_async_writes();
+ /**
+ * Returns a mapping that, for each document ID, contains the newest version of that document that
+ * is present in the diff.
+ *
+ * The returned hash_map keys point directly into the `ApplyBucketDiffCommand::Entry::_docName` memory
+ * owned by `diff`, so this memory must remain unchanged and stable for the duration of the returned
+ * mapping's lifetime.
+ */
+ static NewestDocumentVersionMapping enumerate_newest_document_versions(
+ const std::vector<api::ApplyBucketDiffCommand::Entry>& diff);
+
private:
using DocEntryList = std::vector<std::unique_ptr<spi::DocEntry>>;
const framework::Clock &_clock;
@@ -90,9 +104,13 @@ private:
/**
* Invoke either put, remove or unrevertable remove on the SPI
* depending on the flags in the diff entry.
+ *
+ * If `newest_doc_version` indicates that the entry is not the newest version present in the
+ * diff, the entry is silently ignored and is _not_ invoked on the SPI.
*/
void applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results, const spi::Bucket&,
- const api::ApplyBucketDiffCommand::Entry&, const document::DocumentTypeRepo& repo) const;
+ const api::ApplyBucketDiffCommand::Entry&, const document::DocumentTypeRepo& repo,
+ const NewestDocumentVersionMapping& newest_per_doc) const;
/**
* Fill entries-vector with metadata for bucket up to maxTimestamp,
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index bbd4e87cb40..1b119a0e631 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -622,7 +622,14 @@ CommunicationManager::sendDirectRPCReply(RPCRequestWrapper& request,
request.addReturnString(ns.str().c_str());
LOGBP(debug, "Sending getnodestate2 reply with no host info.");
} else if (requestName == "setsystemstate2" || requestName == "setdistributionstates") {
- // No data to return
+ // No data to return, but the request must be failed iff we rejected the state version
+ // due to a higher version having been previously received.
+ auto& state_reply = dynamic_cast<api::SetSystemStateReply&>(*reply);
+ if (state_reply.getResult().getResult() == api::ReturnCode::REJECTED) {
+ vespalib::string err_msg = state_reply.getResult().getMessage(); // ReturnCode message is stringref
+ request.returnError(FRTE_RPC_METHOD_FAILED, err_msg.c_str());
+ return;
+ }
} else if (requestName == "activate_cluster_state_version") {
auto& activate_reply(dynamic_cast<api::ActivateClusterStateVersionReply&>(*reply));
request.addReturnInt(activate_reply.actualVersion());
diff --git a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp
index ca46e87285b..5b8052a05f8 100644
--- a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp
+++ b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp
@@ -54,6 +54,9 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentMessage& fromMsg)
auto to = std::make_unique<api::UpdateCommand>(bucket, from.stealDocumentUpdate(), from.getNewTimestamp());
to->setOldTimestamp(from.getOldTimestamp());
to->setCondition(from.getCondition());
+ if (from.has_cached_create_if_missing()) {
+ to->set_cached_create_if_missing(from.create_if_missing());
+ }
toMsg = std::move(to);
break;
}
@@ -217,6 +220,9 @@ DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg)
to->setOldTimestamp(from.getOldTimestamp());
to->setNewTimestamp(from.getTimestamp());
to->setCondition(from.getCondition());
+ if (from.has_cached_create_if_missing()) {
+ to->set_cached_create_if_missing(from.create_if_missing());
+ }
toMsg = std::move(to);
break;
}
diff --git a/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp
index 4d74eb1974b..3b53c0c9584 100644
--- a/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp
@@ -124,7 +124,8 @@ void ClusterControllerApiRpcService::RPC_setSystemState2(FRT_RPCRequest* req) {
req->GetParams()->GetValue(0)._string._len);
lib::ClusterState systemState(systemStateStr);
- auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterStateBundle(systemState));
+ auto bundle = std::make_shared<const lib::ClusterStateBundle>(systemState);
+ auto cmd = std::make_shared<api::SetSystemStateCommand>(std::move(bundle));
cmd->setPriority(api::StorageMessage::VERYHIGH);
detach_and_forward_to_enqueuer(std::move(cmd), req);
@@ -167,8 +168,7 @@ void ClusterControllerApiRpcService::RPC_setDistributionStates(FRT_RPCRequest* r
}
LOG(debug, "Got state bundle %s", state_bundle->toString().c_str());
- // TODO add constructor taking in shared_ptr directly instead?
- auto cmd = std::make_shared<api::SetSystemStateCommand>(*state_bundle);
+ auto cmd = std::make_shared<api::SetSystemStateCommand>(std::move(state_bundle));
cmd->setPriority(api::StorageMessage::VERYHIGH);
detach_and_forward_to_enqueuer(std::move(cmd), req);
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
index d18935afe24..98796ee6440 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
@@ -125,24 +125,28 @@ ServiceLayerNode::initializeNodeSpecific()
void
ServiceLayerNode::handleLiveConfigUpdate(const InitialGuard & initGuard)
{
- if (_server_config.staging) {
- bool updated = false;
- vespa::config::content::core::StorServerConfigBuilder oldC(*_server_config.active);
- StorServerConfig& newC(*_server_config.staging);
- {
- updated = false;
- NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock());
- lib::NodeState ns(*_component->getStateUpdater().getReportedNodeState());
- if (DIFFER(nodeCapacity)) {
- LOG(info, "Live config update: Updating node capacity from %f to %f.",
- oldC.nodeCapacity, newC.nodeCapacity);
- ASSIGN(nodeCapacity);
- ns.setCapacity(newC.nodeCapacity);
- }
- if (updated) {
- // FIXME this always gets overwritten by StorageNode::handleLiveConfigUpdate...! Intentional?
- _server_config.active = std::make_unique<vespa::config::content::core::StorServerConfig>(oldC);
- _component->getStateUpdater().setReportedNodeState(ns);
+ {
+ std::lock_guard config_lock(_configLock);
+ // Live server config patching happens both here and in StorageNode::handleLiveConfigUpdate,
+ // which we have to delegate to afterward (_without_ holding _configLock at the time).
+ if (_server_config.staging) {
+ bool updated = false;
+ vespa::config::content::core::StorServerConfigBuilder oldC(*_server_config.active);
+ StorServerConfig& newC(*_server_config.staging);
+ {
+ NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock());
+ lib::NodeState ns(*_component->getStateUpdater().getReportedNodeState());
+ if (DIFFER(nodeCapacity)) {
+ LOG(info, "Live config update: Updating node capacity from %f to %f.",
+ oldC.nodeCapacity, newC.nodeCapacity);
+ ASSIGN(nodeCapacity);
+ ns.setCapacity(newC.nodeCapacity);
+ }
+ if (updated) {
+ // FIXME the patching of old config vs new config is confusing and error-prone. Redesign!
+ _server_config.active = std::make_unique<vespa::config::content::core::StorServerConfig>(oldC);
+ _component->getStateUpdater().setReportedNodeState(ns);
+ }
}
}
}
diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp
index adebaa51c08..a2106dce8d2 100644
--- a/storage/src/vespa/storage/storageserver/statemanager.cpp
+++ b/storage/src/vespa/storage/storageserver/statemanager.cpp
@@ -21,7 +21,7 @@
#include <fstream>
#include <ranges>
-#include <vespa/log/log.h>
+#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".state.manager");
using vespalib::make_string_short::fmt;
@@ -74,10 +74,13 @@ StateManager::StateManager(StorageComponentRegister& compReg,
_health_ping_time(),
_health_ping_warn_interval(5min),
_health_ping_warn_time(_start_time + _health_ping_warn_interval),
+ _last_accepted_cluster_state_time(),
+ _last_observed_version_from_cc(),
_hostInfo(std::move(hostInfo)),
_controllers_observed_explicit_node_state(),
_noThreadTestMode(testMode),
_grabbedExternalLock(false),
+ _require_strictly_increasing_cluster_state_versions(false),
_notifyingListeners(false),
_requested_almost_immediate_node_state_replies(false)
{
@@ -436,21 +439,67 @@ StateManager::mark_controller_as_having_observed_explicit_node_state(const std::
_controllers_observed_explicit_node_state.emplace(controller_index);
}
-void
-StateManager::setClusterStateBundle(const ClusterStateBundle& c)
+std::optional<uint32_t>
+StateManager::try_set_cluster_state_bundle(std::shared_ptr<const ClusterStateBundle> c,
+ uint16_t origin_controller_index)
{
{
std::lock_guard lock(_stateLock);
- _nextSystemState = std::make_shared<const ClusterStateBundle>(c);
+ uint32_t effective_active_version = (_nextSystemState ? _nextSystemState->getVersion()
+ : _systemState->getVersion());
+ const auto now = _component.getClock().getMonotonicTime();
+ const uint32_t last_ver_from_cc = _last_observed_version_from_cc[origin_controller_index];
+ _last_observed_version_from_cc[origin_controller_index] = c->getVersion();
+
+ if (_require_strictly_increasing_cluster_state_versions && (c->getVersion() < effective_active_version)) {
+ if (c->getVersion() >= last_ver_from_cc) {
+ constexpr auto reject_warn_threshold = 30s;
+ if (now - _last_accepted_cluster_state_time <= reject_warn_threshold) {
+ LOG(debug, "Rejecting cluster state with version %u from cluster controller %u, as "
+ "we've already accepted version %u. Recently accepted another cluster state, "
+ "so assuming transient CC leadership period overlap.",
+ c->getVersion(), origin_controller_index, effective_active_version);
+ } else {
+ // Rejections have happened for some time. Make a bit of noise.
+ LOGBP(warning, "Rejecting cluster state with version %u from cluster controller %u, as "
+ "we've already accepted version %u.",
+ c->getVersion(), origin_controller_index, effective_active_version);
+ }
+ return {effective_active_version};
+ } else {
+ // SetSystemState RPCs are FIFO-ordered and a particular CC should enforce strictly increasing
+ // cluster state versions through its ZooKeeper quorum (but commands may be resent for a given
+ // version). This means that commands should contain _monotonically increasing_ versions from
+ // a given CC origin index.
+ // If this is _not_ the case, it indicates ZooKeeper state on the CCs has been lost or wiped,
+ // at which point we have no other realistic choice than to accept the version, or the system
+ // will stall until an operator manually intervenes by restarting the content cluster.
+ LOG(error, "Received cluster state version %u from cluster controller %u, which is lower than "
+ "the current state version (%u) and the last received version (%u) from the same controller. "
+ "This indicates loss of cluster controller ZooKeeper state; accepting lower version to "
+ "prevent content cluster operations from stalling for an indeterminate amount of time.",
+ c->getVersion(), origin_controller_index, effective_active_version, last_ver_from_cc);
+ // Fall through to state acceptance.
+ }
+ }
+ _last_accepted_cluster_state_time = now;
+ _nextSystemState = std::move(c);
}
notifyStateListeners();
+ return std::nullopt;
}
bool
StateManager::onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd)
{
- setClusterStateBundle(cmd->getClusterStateBundle());
- sendUp(std::make_shared<api::SetSystemStateReply>(*cmd));
+ auto reply = std::make_shared<api::SetSystemStateReply>(*cmd);
+ const auto maybe_rejected_by_ver = try_set_cluster_state_bundle(cmd->cluster_state_bundle_ptr(), cmd->getSourceIndex());
+ if (maybe_rejected_by_ver) {
+ reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED,
+ fmt("Cluster state version %u rejected; node already has a higher cluster state version (%u)",
+ cmd->getClusterStateBundle().getVersion(), *maybe_rejected_by_ver)));
+ }
+ sendUp(reply);
return true;
}
@@ -520,6 +569,13 @@ StateManager::tick() {
warn_on_missing_health_ping();
}
+void
+StateManager::set_require_strictly_increasing_cluster_state_versions(bool req) noexcept
+{
+ std::lock_guard guard(_stateLock);
+ _require_strictly_increasing_cluster_state_versions = req;
+}
+
bool
StateManager::sendGetNodeStateReplies() {
return sendGetNodeStateReplies(0xffff);
diff --git a/storage/src/vespa/storage/storageserver/statemanager.h b/storage/src/vespa/storage/storageserver/statemanager.h
index 72b89dc4d65..d116f968731 100644
--- a/storage/src/vespa/storage/storageserver/statemanager.h
+++ b/storage/src/vespa/storage/storageserver/statemanager.h
@@ -1,9 +1,6 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
- * @class storage::StateManager
- * @ingroup storageserver
- *
- * @brief Keeps and updates node and system states.
+ * Keeps and updates node and system states.
*
* This component implements the NodeStateUpdater interface to handle states
* for all components. See that interface for documentation.
@@ -22,11 +19,13 @@
#include <vespa/storageapi/message/state.h>
#include <vespa/storageapi/messageapi/storagemessage.h>
#include <vespa/vespalib/objects/floatingpointtype.h>
+#include <vespa/vespalib/stllike/hash_map.h>
+#include <atomic>
#include <deque>
+#include <list>
#include <map>
+#include <optional>
#include <unordered_set>
-#include <list>
-#include <atomic>
namespace metrics {
class MetricManager;
@@ -69,6 +68,8 @@ class StateManager : public NodeStateUpdater,
std::optional<vespalib::steady_time> _health_ping_time;
vespalib::duration _health_ping_warn_interval;
vespalib::steady_time _health_ping_warn_time;
+ vespalib::steady_time _last_accepted_cluster_state_time;
+ vespalib::hash_map<uint16_t, uint32_t> _last_observed_version_from_cc;
std::unique_ptr<HostInfo> _hostInfo;
std::unique_ptr<framework::Thread> _thread;
// Controllers that have observed a GetNodeState response sent _after_
@@ -76,6 +77,7 @@ class StateManager : public NodeStateUpdater,
std::unordered_set<uint16_t> _controllers_observed_explicit_node_state;
bool _noThreadTestMode;
bool _grabbedExternalLock;
+ bool _require_strictly_increasing_cluster_state_versions;
std::atomic<bool> _notifyingListeners;
std::atomic<bool> _requested_almost_immediate_node_state_replies;
@@ -90,6 +92,9 @@ public:
void tick();
void warn_on_missing_health_ping();
+ // Precondition: internal state mutex must not be held
+ void set_require_strictly_increasing_cluster_state_versions(bool req) noexcept;
+
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
void reportHtmlStatus(std::ostream&, const framework::HttpUrlPath&) const override;
@@ -102,7 +107,11 @@ public:
Lock::SP grabStateChangeLock() override;
void setReportedNodeState(const lib::NodeState& state) override;
- void setClusterStateBundle(const ClusterStateBundle& c);
+ // Iff state was accepted, returns std::nullopt
+ // Otherwise (i.e. state was rejected due to a higher version already having been accepted)
+ // returns an optional containing the current, higher cluster state version.
+ [[nodiscard]] std::optional<uint32_t> try_set_cluster_state_bundle(std::shared_ptr<const ClusterStateBundle> c,
+ uint16_t origin_controller_index);
HostInfo& getHostInfo() { return *_hostInfo; }
void immediately_send_get_node_state_replies() override;
diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp
index f7a426a0527..5b665f830d3 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.cpp
+++ b/storage/src/vespa/storage/storageserver/storagenode.cpp
@@ -92,6 +92,7 @@ StorageNode::StorageNode(
_statusMetrics(),
_stateReporter(),
_stateManager(),
+ _state_manager_ptr(nullptr),
_chain(),
_configLock(),
_initial_config_mutex(),
@@ -140,18 +141,20 @@ StorageNode::initialize(const NodeStateReporter & nodeStateReporter)
// Initializing state manager early, as others use it init time to
// update node state according min used bits etc.
// Needs node type to be set right away. Needs thread pool, index and
- // dead lock detector too, but not before open()
+ // deadlock detector too, but not before open()
_stateManager = std::make_unique<StateManager>(
_context.getComponentRegister(),
std::move(_hostInfo),
nodeStateReporter,
_singleThreadedDebugMode);
+ _stateManager->set_require_strictly_increasing_cluster_state_versions(server_config().requireStrictlyIncreasingClusterStateVersions);
+ _state_manager_ptr = _stateManager.get();
_context.getComponentRegister().setNodeStateUpdater(*_stateManager);
- // Create VDS root folder, in case it doesn't already exist.
- // Maybe better to rather fail if it doesn't exist, but tests
- // might break if we do that. Might alter later.
- std::filesystem::create_directories(std::filesystem::path(_rootFolder));
+ // Create storage root folder, in case it doesn't already exist.
+ if (!_rootFolder.empty()) {
+ std::filesystem::create_directories(std::filesystem::path(_rootFolder));
+ } // else: running as part of unit tests
initializeNodeSpecific();
@@ -192,13 +195,16 @@ StorageNode::initialize(const NodeStateReporter & nodeStateReporter)
initializeStatusWebServer();
+ if (server_config().writePidFileOnStartup) {
+ assert(!_rootFolder.empty());
// Write pid file as the last thing we do. If we fail initialization
// due to an exception we won't run shutdown. Thus we won't remove the
// pid file if something throws after writing it in initialization.
// Initialize _pidfile here, such that we can know that we didn't create
// it in shutdown code for shutdown during init.
- _pidFile = _rootFolder + "/pidfile";
- writePidFile(_pidFile);
+ _pidFile = _rootFolder + "/pidfile";
+ writePidFile(_pidFile);
+ }
}
void
@@ -242,12 +248,21 @@ StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard)
DIFFERWARN(clusterName, "Cannot alter cluster name of node live");
DIFFERWARN(nodeIndex, "Cannot alter node index of node live");
DIFFERWARN(isDistributor, "Cannot alter role of node live");
- _server_config.active = std::make_unique<StorServerConfig>(oldC); // TODO this overwrites from ServiceLayerNode
+ [[maybe_unused]] bool updated = false; // magically touched by ASSIGN() macro. TODO rewrite this fun stuff.
+ if (DIFFER(requireStrictlyIncreasingClusterStateVersions)) {
+ LOG(info, "Live config update: require strictly increasing cluster state versions: %s -> %s",
+ (oldC.requireStrictlyIncreasingClusterStateVersions ? "true" : "false"),
+ (newC.requireStrictlyIncreasingClusterStateVersions ? "true" : "false"));
+ ASSIGN(requireStrictlyIncreasingClusterStateVersions);
+ }
+ _server_config.active = std::make_unique<StorServerConfig>(oldC);
_server_config.staging.reset();
_deadLockDetector->enableWarning(server_config().enableDeadLockDetectorWarnings);
_deadLockDetector->enableShutdown(server_config().enableDeadLockDetector);
_deadLockDetector->setProcessSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack));
_deadLockDetector->setWaitSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack));
+ assert(_state_manager_ptr);
+ _state_manager_ptr->set_require_strictly_increasing_cluster_state_versions(server_config().requireStrictlyIncreasingClusterStateVersions);
}
if (_distribution_config.staging) {
StorDistributionConfigBuilder oldC(*_distribution_config.active);
@@ -437,11 +452,17 @@ StorageNode::stage_config_change(ConfigWrapper<ConfigT>& cfg, std::unique_ptr<Co
// else is doing configuration work, and then we write the new config
// to a variable where we can find it later when processing config
// updates
+ // TODO bail if we're shutting down to avoid racing with chain destruction?
+ // - only relevant for distribution config since it's not pushed by main thread
+ // - or have some way of injecting config changes from that level...? must be done atomically!
+ // - ideally want to expose cluster state _bundles_ to relevant components, not config alone!
+ bool live_update;
{
std::lock_guard config_lock_guard(_configLock);
cfg.staging = std::move(new_cfg);
+ live_update = static_cast<bool>(cfg.active);
}
- if (cfg.active) {
+ if (live_update) {
InitialGuard concurrent_config_guard(_initial_config_mutex);
handleLiveConfigUpdate(concurrent_config_guard);
}
diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h
index a96f6b52a66..93265bece3c 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.h
+++ b/storage/src/vespa/storage/storageserver/storagenode.h
@@ -135,6 +135,10 @@ private:
// Depends on metric manager
std::unique_ptr<StateReporter> _stateReporter;
std::unique_ptr<StateManager> _stateManager;
+ // Node subclasses may take ownership of _stateManager in order to infuse it into
+ // their own storage link chain, but they MUST ensure its lifetime is maintained.
+ // We need to remember the original pointer in order to update its config.
+ StateManager* _state_manager_ptr;
// The storage chain can depend on anything.
std::unique_ptr<StorageLink> _chain;
diff --git a/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto b/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto
index 55d516a017b..403752b0c84 100644
--- a/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto
+++ b/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto
@@ -31,11 +31,18 @@ message Update {
}
message UpdateRequest {
- Bucket bucket = 1;
- Update update = 2;
- uint64 new_timestamp = 3;
- uint64 expected_old_timestamp = 4; // If zero; no expectation.
- TestAndSetCondition condition = 5;
+ enum CreateIfMissing {
+ UNSPECIFIED = 0; // Legacy fallback: must deserialize `update` to find flag value
+ TRUE = 1;
+ FALSE = 2;
+ }
+
+ Bucket bucket = 1;
+ Update update = 2;
+ uint64 new_timestamp = 3;
+ uint64 expected_old_timestamp = 4; // If zero; no expectation.
+ TestAndSetCondition condition = 5;
+ CreateIfMissing create_if_missing = 6;
}
message UpdateResponse {
diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
index 57047be6037..0f4a34cc775 100644
--- a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
+++ b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
@@ -465,6 +465,10 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::UpdateCommand& msg)
if (msg.getCondition().isPresent()) {
set_tas_condition(*req.mutable_condition(), msg.getCondition());
}
+ if (msg.has_cached_create_if_missing()) {
+ req.set_create_if_missing(msg.create_if_missing() ? protobuf::UpdateRequest_CreateIfMissing_TRUE
+ : protobuf::UpdateRequest_CreateIfMissing_FALSE);
+ }
});
}
@@ -482,6 +486,9 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeUpdateCommand(BBuf& buf)
if (req.has_condition()) {
cmd->setCondition(get_tas_condition(req.condition()));
}
+ if (req.create_if_missing() != protobuf::UpdateRequest_CreateIfMissing_UNSPECIFIED) {
+ cmd->set_cached_create_if_missing(req.create_if_missing() == protobuf::UpdateRequest_CreateIfMissing_TRUE);
+ }
return cmd;
});
}
diff --git a/storage/src/vespa/storageapi/message/persistence.cpp b/storage/src/vespa/storageapi/message/persistence.cpp
index 4c24bb74faf..af054855bbe 100644
--- a/storage/src/vespa/storageapi/message/persistence.cpp
+++ b/storage/src/vespa/storageapi/message/persistence.cpp
@@ -105,13 +105,23 @@ UpdateCommand::UpdateCommand(const document::Bucket &bucket, const document::Doc
: TestAndSetCommand(MessageType::UPDATE, bucket),
_update(update),
_timestamp(time),
- _oldTimestamp(0)
+ _oldTimestamp(0),
+ _create_if_missing()
{
if ( ! _update) {
throw vespalib::IllegalArgumentException("Cannot update a null update", VESPA_STRLOC);
}
}
+bool
+UpdateCommand::create_if_missing() const
+{
+ if (_create_if_missing.has_value()) {
+ return *_create_if_missing;
+ }
+ return _update->getCreateIfNonExistent();
+}
+
const document::DocumentType *
UpdateCommand::getDocumentType() const {
return &_update->getType();
diff --git a/storage/src/vespa/storageapi/message/persistence.h b/storage/src/vespa/storageapi/message/persistence.h
index f44ab4e8280..0676e1d0f44 100644
--- a/storage/src/vespa/storageapi/message/persistence.h
+++ b/storage/src/vespa/storageapi/message/persistence.h
@@ -1,8 +1,6 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
- * @file persistence.h
- *
- * Persistence related commands, like put, get & remove
+ * Persistence related commands, like put, get & remove
*/
#pragma once
@@ -10,6 +8,7 @@
#include <vespa/storageapi/defs.h>
#include <vespa/document/base/documentid.h>
#include <vespa/documentapi/messagebus/messages/testandsetcondition.h>
+#include <optional>
namespace document {
class DocumentUpdate;
@@ -117,20 +116,32 @@ class UpdateCommand : public TestAndSetCommand {
std::shared_ptr<document::DocumentUpdate> _update;
Timestamp _timestamp;
Timestamp _oldTimestamp;
+ std::optional<bool> _create_if_missing; // caches the value held (possibly lazily deserialized) in _update
public:
UpdateCommand(const document::Bucket &bucket,
const std::shared_ptr<document::DocumentUpdate>&, Timestamp);
~UpdateCommand() override;
- void setTimestamp(Timestamp ts) { _timestamp = ts; }
- void setOldTimestamp(Timestamp ts) { _oldTimestamp = ts; }
+ void setTimestamp(Timestamp ts) noexcept { _timestamp = ts; }
+ void setOldTimestamp(Timestamp ts) noexcept { _oldTimestamp = ts; }
+
+ [[nodiscard]] bool has_cached_create_if_missing() const noexcept {
+ return _create_if_missing.has_value();
+ }
+ // It is the caller's responsibility to ensure this value matches that of _update->getCreateIfNonExisting()
+ void set_cached_create_if_missing(bool create) noexcept {
+ _create_if_missing = create;
+ }
const std::shared_ptr<document::DocumentUpdate>& getUpdate() const { return _update; }
const document::DocumentId& getDocumentId() const override;
Timestamp getTimestamp() const { return _timestamp; }
Timestamp getOldTimestamp() const { return _oldTimestamp; }
+ // May throw iff has_cached_create_if_missing() == false, otherwise noexcept.
+ [[nodiscard]] bool create_if_missing() const;
+
const document::DocumentType * getDocumentType() const override;
vespalib::string getSummary() const override;
diff --git a/storage/src/vespa/storageapi/message/state.cpp b/storage/src/vespa/storageapi/message/state.cpp
index 5a50167f584..b4e8655d783 100644
--- a/storage/src/vespa/storageapi/message/state.cpp
+++ b/storage/src/vespa/storageapi/message/state.cpp
@@ -5,8 +5,7 @@
#include <vespa/vdslib/state/clusterstate.h>
#include <ostream>
-namespace storage {
-namespace api {
+namespace storage::api {
IMPLEMENT_COMMAND(GetNodeStateCommand, GetNodeStateReply)
IMPLEMENT_REPLY(GetNodeStateReply)
@@ -45,7 +44,7 @@ GetNodeStateReply::GetNodeStateReply(const GetNodeStateCommand& cmd)
GetNodeStateReply::GetNodeStateReply(const GetNodeStateCommand& cmd,
const lib::NodeState& state)
: StorageReply(cmd),
- _state(new lib::NodeState(state))
+ _state(std::make_unique<lib::NodeState>(state))
{
}
@@ -64,23 +63,31 @@ GetNodeStateReply::print(std::ostream& out, bool verbose,
}
}
+SetSystemStateCommand::SetSystemStateCommand(std::shared_ptr<const lib::ClusterStateBundle> state)
+ : StorageCommand(MessageType::SETSYSTEMSTATE),
+ _state(std::move(state))
+{
+}
+
SetSystemStateCommand::SetSystemStateCommand(const lib::ClusterStateBundle& state)
: StorageCommand(MessageType::SETSYSTEMSTATE),
- _state(state)
+ _state(std::make_shared<const lib::ClusterStateBundle>(state))
{
}
SetSystemStateCommand::SetSystemStateCommand(const lib::ClusterState& state)
: StorageCommand(MessageType::SETSYSTEMSTATE),
- _state(state)
+ _state(std::make_shared<const lib::ClusterStateBundle>(state))
{
}
+SetSystemStateCommand::~SetSystemStateCommand() = default;
+
void
SetSystemStateCommand::print(std::ostream& out, bool verbose,
const std::string& indent) const
{
- out << "SetSystemStateCommand(" << *_state.getBaselineClusterState() << ")";
+ out << "SetSystemStateCommand(" << *_state->getBaselineClusterState() << ")";
if (verbose) {
out << " : ";
StorageCommand::print(out, verbose, indent);
@@ -89,7 +96,7 @@ SetSystemStateCommand::print(std::ostream& out, bool verbose,
SetSystemStateReply::SetSystemStateReply(const SetSystemStateCommand& cmd)
: StorageReply(cmd),
- _state(cmd.getClusterStateBundle())
+ _state(cmd.cluster_state_bundle_ptr())
{
}
@@ -138,5 +145,4 @@ void ActivateClusterStateVersionReply::print(std::ostream& out, bool verbose,
}
}
-} // api
-} // storage
+} // storage::api
diff --git a/storage/src/vespa/storageapi/message/state.h b/storage/src/vespa/storageapi/message/state.h
index 900355b12a2..afeb5ae9c11 100644
--- a/storage/src/vespa/storageapi/message/state.h
+++ b/storage/src/vespa/storageapi/message/state.h
@@ -9,12 +9,6 @@
namespace storage::api {
-/**
- * @class GetNodeStateCommand
- * @ingroup message
- *
- * @brief Command for setting node state. No payload
- */
class GetNodeStateCommand : public StorageCommand {
lib::NodeState::UP _expectedState;
@@ -27,12 +21,6 @@ public:
DECLARE_STORAGECOMMAND(GetNodeStateCommand, onGetNodeState)
};
-/**
- * @class GetNodeStateReply
- * @ingroup message
- *
- * @brief Reply to GetNodeStateCommand
- */
class GetNodeStateReply : public StorageReply {
lib::NodeState::UP _state;
std::string _nodeInfo;
@@ -53,41 +41,38 @@ public:
};
/**
- * @class SetSystemStateCommand
- * @ingroup message
- *
- * @brief Command for telling a node about the system state - state of each node
- * in the system and state of the system (all ok, no merging, block
- * put/get/remove etx)
+ * Command for telling a node about the cluster state - state of each node
+ * in the cluster and state of the cluster itself (all ok, no merging, block
+ * put/get/remove etx)
*/
class SetSystemStateCommand : public StorageCommand {
- lib::ClusterStateBundle _state;
+ std::shared_ptr<const lib::ClusterStateBundle> _state;
public:
+ explicit SetSystemStateCommand(std::shared_ptr<const lib::ClusterStateBundle> state);
explicit SetSystemStateCommand(const lib::ClusterStateBundle &state);
explicit SetSystemStateCommand(const lib::ClusterState &state);
- const lib::ClusterState& getSystemState() const { return *_state.getBaselineClusterState(); }
- const lib::ClusterStateBundle& getClusterStateBundle() const { return _state; }
+ ~SetSystemStateCommand() override;
+
+ [[nodiscard]] const lib::ClusterState& getSystemState() const { return *_state->getBaselineClusterState(); }
+ [[nodiscard]] const lib::ClusterStateBundle& getClusterStateBundle() const { return *_state; }
+ [[nodiscard]] std::shared_ptr<const lib::ClusterStateBundle> cluster_state_bundle_ptr() const noexcept {
+ return _state;
+ }
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
DECLARE_STORAGECOMMAND(SetSystemStateCommand, onSetSystemState)
};
-/**
- * @class SetSystemStateReply
- * @ingroup message
- *
- * @brief Reply received after a SetSystemStateCommand.
- */
class SetSystemStateReply : public StorageReply {
- lib::ClusterStateBundle _state;
+ std::shared_ptr<const lib::ClusterStateBundle> _state;
public:
explicit SetSystemStateReply(const SetSystemStateCommand& cmd);
// Not serialized. Available locally
- const lib::ClusterState& getSystemState() const { return *_state.getBaselineClusterState(); }
- const lib::ClusterStateBundle& getClusterStateBundle() const { return _state; }
+ const lib::ClusterState& getSystemState() const { return *_state->getBaselineClusterState(); }
+ const lib::ClusterStateBundle& getClusterStateBundle() const { return *_state; }
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
DECLARE_STORAGEREPLY(SetSystemStateReply, onSetSystemStateReply)