diff options
Diffstat (limited to 'storage/src/tests')
17 files changed, 219 insertions, 208 deletions
diff --git a/storage/src/tests/common/global_bucket_space_distribution_converter_test.cpp b/storage/src/tests/common/global_bucket_space_distribution_converter_test.cpp index 23a0df81bab..cd70aecd1bb 100644 --- a/storage/src/tests/common/global_bucket_space_distribution_converter_test.cpp +++ b/storage/src/tests/common/global_bucket_space_distribution_converter_test.cpp @@ -2,7 +2,6 @@ #include <vespa/storage/common/global_bucket_space_distribution_converter.h> #include <vespa/vdslib/distribution/distribution.h> -#include <vespa/config/config.h> #include <vespa/vdslib/state/clusterstate.h> #include <vespa/vespalib/gtest/gtest.h> diff --git a/storage/src/tests/common/teststorageapp.cpp b/storage/src/tests/common/teststorageapp.cpp index 7a19e84791d..91fdf5aa602 100644 --- a/storage/src/tests/common/teststorageapp.cpp +++ b/storage/src/tests/common/teststorageapp.cpp @@ -4,14 +4,13 @@ #include <vespa/storage/common/content_bucket_db_options.h> #include <vespa/storage/config/config-stor-server.h> #include <vespa/config-stor-distribution.h> -#include <vespa/config-load-type.h> #include <vespa/config-fleetcontroller.h> #include <vespa/persistence/dummyimpl/dummypersistence.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vdslib/state/clusterstate.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/time.h> -#include <vespa/config/config.h> +#include <vespa/config/subscription/configuri.h> #include <vespa/config/helper/configgetter.hpp> #include <thread> #include <sstream> diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt index bee7650aebd..501f9a18c47 100644 --- a/storage/src/tests/distributor/CMakeLists.txt +++ b/storage/src/tests/distributor/CMakeLists.txt @@ -9,6 +9,7 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST bucketdbmetricupdatertest.cpp bucketgctimecalculatortest.cpp bucketstateoperationtest.cpp + distributor_bucket_space_repo_test.cpp distributor_bucket_space_test.cpp distributor_host_info_reporter_test.cpp distributor_message_sender_stub.cpp diff --git a/storage/src/tests/distributor/distributor_bucket_space_repo_test.cpp b/storage/src/tests/distributor/distributor_bucket_space_repo_test.cpp new file mode 100644 index 00000000000..151dcff3d10 --- /dev/null +++ b/storage/src/tests/distributor/distributor_bucket_space_repo_test.cpp @@ -0,0 +1,72 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/document/bucket/fixed_bucket_spaces.h> +#include <vespa/storage/distributor/distributor_bucket_space.h> +#include <vespa/storage/distributor/distributor_bucket_space_repo.h> +#include <vespa/vdslib/state/cluster_state_bundle.h> +#include <vespa/vdslib/state/clusterstate.h> +#include <vespa/vespalib/gtest/gtest.h> +#include <memory> + +namespace storage::distributor { + +using document::FixedBucketSpaces; +using namespace ::testing; + +struct DistributorBucketSpaceRepoTest : Test { + DistributorBucketSpaceRepo _repo; + + DistributorBucketSpaceRepoTest() : _repo(123) {} +}; + +namespace { + +lib::ClusterStateBundle bundle_with_global_merges() { + auto global_state = std::make_shared<lib::ClusterState>("distributor:1 storage:2"); + auto default_state = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .1.s:m"); + return lib::ClusterStateBundle(*global_state, {{FixedBucketSpaces::default_space(), default_state}, + {FixedBucketSpaces::global_space(), global_state}}); +} + +lib::ClusterStateBundle bundle_without_global_merges() { + auto global_state = std::make_shared<lib::ClusterState>("distributor:1 storage:2"); + auto default_state = std::make_shared<lib::ClusterState>("distributor:1 storage:2"); + return lib::ClusterStateBundle(*global_state, {{FixedBucketSpaces::default_space(), default_state}, + {FixedBucketSpaces::global_space(), global_state}}); +} + +} + +TEST_F(DistributorBucketSpaceRepoTest, bucket_spaces_are_initially_not_tagged_as_merge_inhibited) { + EXPECT_FALSE(_repo.get(FixedBucketSpaces::default_space()).merges_inhibited()); + EXPECT_FALSE(_repo.get(FixedBucketSpaces::global_space()).merges_inhibited()); +} + +TEST_F(DistributorBucketSpaceRepoTest, enabled_bundle_with_pending_global_merges_tags_default_space_as_merge_inhibited) { + _repo.enable_cluster_state_bundle(bundle_with_global_merges()); + EXPECT_TRUE(_repo.get(FixedBucketSpaces::default_space()).merges_inhibited()); + EXPECT_FALSE(_repo.get(FixedBucketSpaces::global_space()).merges_inhibited()); +} + +TEST_F(DistributorBucketSpaceRepoTest, enabled_bundle_without_pending_global_merges_unsets_merge_inhibition) { + _repo.enable_cluster_state_bundle(bundle_with_global_merges()); + _repo.enable_cluster_state_bundle(bundle_without_global_merges()); + EXPECT_FALSE(_repo.get(FixedBucketSpaces::default_space()).merges_inhibited()); + EXPECT_FALSE(_repo.get(FixedBucketSpaces::global_space()).merges_inhibited()); +} + +TEST_F(DistributorBucketSpaceRepoTest, pending_bundle_with_pending_global_merges_tags_default_space_as_merge_inhibited) { + _repo.enable_cluster_state_bundle(bundle_without_global_merges()); + _repo.set_pending_cluster_state_bundle(bundle_with_global_merges()); + EXPECT_TRUE(_repo.get(FixedBucketSpaces::default_space()).merges_inhibited()); + EXPECT_FALSE(_repo.get(FixedBucketSpaces::global_space()).merges_inhibited()); +} + +TEST_F(DistributorBucketSpaceRepoTest, pending_bundle_without_pending_global_unsets_merge_inhibition) { + _repo.enable_cluster_state_bundle(bundle_with_global_merges()); + _repo.set_pending_cluster_state_bundle(bundle_without_global_merges()); + EXPECT_FALSE(_repo.get(FixedBucketSpaces::default_space()).merges_inhibited()); + EXPECT_FALSE(_repo.get(FixedBucketSpaces::global_space()).merges_inhibited()); +} + +} diff --git a/storage/src/tests/distributor/distributor_stripe_test.cpp b/storage/src/tests/distributor/distributor_stripe_test.cpp index 8c2ebc983fa..709f2e6cdc5 100644 --- a/storage/src/tests/distributor/distributor_stripe_test.cpp +++ b/storage/src/tests/distributor/distributor_stripe_test.cpp @@ -629,6 +629,19 @@ TEST_F(DistributorStripeTest, max_clock_skew_config_is_propagated_to_distributor EXPECT_EQ(getConfig().getMaxClusterClockSkew(), std::chrono::seconds(5)); } +TEST_F(DistributorStripeTest, inhibit_default_merge_if_global_merges_pending_config_is_propagated) +{ + setup_stripe(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); + ConfigBuilder builder; + builder.inhibitDefaultMergesWhenGlobalMergesPending = true; + configure_stripe(builder); + EXPECT_TRUE(getConfig().inhibit_default_merges_when_global_merges_pending()); + + builder.inhibitDefaultMergesWhenGlobalMergesPending = false; + configure_stripe(builder); + EXPECT_FALSE(getConfig().inhibit_default_merges_when_global_merges_pending()); +} + namespace { auto makeDummyRemoveCommand() { diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp index b02395717e0..53773a55826 100644 --- a/storage/src/tests/distributor/putoperationtest.cpp +++ b/storage/src/tests/distributor/putoperationtest.cpp @@ -1,7 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <tests/distributor/distributor_stripe_test_util.h> -#include <vespa/document/config/documenttypes_config_fwd.h> #include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/storage/distributor/top_level_distributor.h> @@ -13,6 +12,7 @@ #include <vespa/storageapi/message/state.h> #include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/text/stringtokenizer.h> +#include <vespa/config/helper/configgetter.h> using std::shared_ptr; using config::ConfigGetter; diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp index d481370b2c1..94f913a3325 100644 --- a/storage/src/tests/distributor/statecheckerstest.cpp +++ b/storage/src/tests/distributor/statecheckerstest.cpp @@ -175,6 +175,8 @@ struct StateCheckersTest : Test, DistributorStripeTestUtil { bool _includeSchedulingPriority {false}; bool _merge_operations_disabled {false}; bool _prioritize_global_bucket_merges {true}; + bool _config_enable_default_space_merge_inhibition {false}; + bool _merges_inhibited_in_bucket_space {false}; CheckerParams(); ~CheckerParams(); @@ -222,6 +224,14 @@ struct StateCheckersTest : Test, DistributorStripeTestUtil { _bucket_space = bucket_space; return *this; } + CheckerParams& config_enable_default_space_merge_inhibition(bool enabled) noexcept { + _config_enable_default_space_merge_inhibition = enabled; + return *this; + } + CheckerParams& merges_inhibited_in_bucket_space(bool inhibited) noexcept { + _merges_inhibited_in_bucket_space = inhibited; + return *this; + } }; template <typename CheckerImpl> @@ -236,10 +246,12 @@ struct StateCheckersTest : Test, DistributorStripeTestUtil { vespa::config::content::core::StorDistributormanagerConfigBuilder config; config.mergeOperationsDisabled = params._merge_operations_disabled; config.prioritizeGlobalBucketMerges = params._prioritize_global_bucket_merges; + config.inhibitDefaultMergesWhenGlobalMergesPending = params._config_enable_default_space_merge_inhibition; configure_stripe(config); if (!params._pending_cluster_state.empty()) { simulate_set_pending_cluster_state(params._pending_cluster_state); } + getBucketSpaceRepo().get(params._bucket_space).set_merges_inhibited(params._merges_inhibited_in_bucket_space); NodeMaintenanceStatsTracker statsTracker; StateChecker::Context c(node_context(), operation_context(), @@ -818,6 +830,32 @@ TEST_F(StateCheckersTest, no_merge_operation_generated_if_merges_explicitly_conf .merge_operations_disabled(true)); } +TEST_F(StateCheckersTest, no_merge_operation_generated_if_merges_inhibited_in_default_bucket_space_and_config_allowed) { + // Technically, the state checker doesn't look at global vs. non-global but instead defers + // to the distributor bucket space repo to set the inhibition flag on the correct bucket space. + // This particular logic is tested at a higher repo-level. + runAndVerify<SynchronizeAndMoveStateChecker>( + CheckerParams() + .expect("NO OPERATIONS GENERATED") // Would normally generate a merge op + .bucketInfo("0=1,2=2") + .config_enable_default_space_merge_inhibition(true) + .merges_inhibited_in_bucket_space(true) + .clusterState("distributor:1 storage:3")); +} + +TEST_F(StateCheckersTest, merge_operation_still_generated_if_merges_inhibited_in_default_bucket_space_but_config_disallowed) { + runAndVerify<SynchronizeAndMoveStateChecker>( + CheckerParams() + .expect("[Moving bucket to ideal node 1]" + "[Synchronizing buckets with different checksums " + "node(idx=0,crc=0x1,docs=1/1,bytes=1/1,trusted=false,active=false,ready=false), " + "node(idx=2,crc=0x2,docs=2/2,bytes=2/2,trusted=false,active=false,ready=false)]") + .bucketInfo("0=1,2=2") + .config_enable_default_space_merge_inhibition(false) + .merges_inhibited_in_bucket_space(true) + .clusterState("distributor:1 storage:3")); +} + std::string StateCheckersTest::testDeleteExtraCopies( const std::string& bucketInfo, uint32_t redundancy, diff --git a/storage/src/tests/distributor/top_level_distributor_test.cpp b/storage/src/tests/distributor/top_level_distributor_test.cpp index a0477e352d1..2c19652b4c2 100644 --- a/storage/src/tests/distributor/top_level_distributor_test.cpp +++ b/storage/src/tests/distributor/top_level_distributor_test.cpp @@ -253,7 +253,7 @@ TEST_F(TopLevelDistributorTest, tick_aggregates_status_requests_from_all_stripes FakeClock clock; ThreadPoolImpl pool(clock); int ticksBeforeWait = 1; - framework::Thread::UP tp(pool.startThread(thread, "statustest", 5ms, 5s, ticksBeforeWait)); + framework::Thread::UP tp(pool.startThread(thread, "statustest", 5ms, 5s, ticksBeforeWait, std::nullopt)); while (true) { std::this_thread::sleep_for(1ms); diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.cpp b/storage/src/tests/distributor/top_level_distributor_test_util.cpp index 2a61141865a..5dcc7f5c6ad 100644 --- a/storage/src/tests/distributor/top_level_distributor_test_util.cpp +++ b/storage/src/tests/distributor/top_level_distributor_test_util.cpp @@ -31,8 +31,8 @@ TopLevelDistributorTestUtil::~TopLevelDistributorTestUtil() = default; void TopLevelDistributorTestUtil::create_links() { - _node.reset(new TestDistributorApp(_config.getConfigId())); - _thread_pool = framework::TickingThreadPool::createDefault("distributor"); + _node = std::make_unique<TestDistributorApp>(_config.getConfigId()); + _thread_pool = framework::TickingThreadPool::createDefault("distributor", 100ms); _stripe_pool = DistributorStripePool::make_non_threaded_pool_for_testing(); _distributor.reset(new TopLevelDistributor( _node->getComponentRegister(), @@ -43,7 +43,7 @@ TopLevelDistributorTestUtil::create_links() _num_distributor_stripes, _host_info, &_message_sender)); - _component.reset(new storage::DistributorComponent(_node->getComponentRegister(), "distrtestutil")); + _component = std::make_unique<storage::DistributorComponent>(_node->getComponentRegister(), "distrtestutil"); }; void diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp index ea99186a434..6c597b620dd 100644 --- a/storage/src/tests/distributor/visitoroperationtest.cpp +++ b/storage/src/tests/distributor/visitoroperationtest.cpp @@ -252,6 +252,20 @@ TEST_F(VisitorOperationTest, no_bucket) { runEmptyVisitor(msg)); } +TEST_F(VisitorOperationTest, none_fieldset_is_rejected) { + enable_cluster_state("distributor:1 storage:1"); + auto msg = std::make_shared<api::CreateVisitorCommand>( + makeBucketSpace(), "dumpvisitor", "instance", ""); + msg->addBucketToBeVisited(document::BucketId(16, 1)); + msg->addBucketToBeVisited(nullId); + msg->setFieldSet("[none]"); + + EXPECT_EQ("CreateVisitorReply(last=BucketId(0x0000000000000000)) " + "ReturnCode(ILLEGAL_PARAMETERS, Field set '[none]' is not supported " + "for external visitor operations. Use '[id]' to return documents with no fields set.)", + runEmptyVisitor(msg)); +} + TEST_F(VisitorOperationTest, only_super_bucket_and_progress_allowed) { enable_cluster_state("distributor:1 storage:1"); diff --git a/storage/src/tests/frameworkimpl/status/statustest.cpp b/storage/src/tests/frameworkimpl/status/statustest.cpp index 2e136977026..97bfa41aece 100644 --- a/storage/src/tests/frameworkimpl/status/statustest.cpp +++ b/storage/src/tests/frameworkimpl/status/statustest.cpp @@ -2,7 +2,6 @@ #include <vespa/storageframework/defaultimplementation/component/componentregisterimpl.h> #include <vespa/storage/frameworkimpl/status/statuswebserver.h> -#include <vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h> #include <vespa/storageframework/generic/status/htmlstatusreporter.h> #include <vespa/storageframework/generic/status/xmlstatusreporter.h> #include <tests/common/teststorageapp.h> @@ -10,6 +9,7 @@ #include <vespa/vespalib/net/crypto_engine.h> #include <vespa/vespalib/net/socket_spec.h> #include <vespa/vespalib/net/sync_crypto_socket.h> +#include <vespa/config/subscription/configuri.h> #include <vespa/vespalib/gtest/gtest.h> #include <gmock/gmock.h> diff --git a/storage/src/tests/persistence/CMakeLists.txt b/storage/src/tests/persistence/CMakeLists.txt index fb8120210c1..7b165e11b66 100644 --- a/storage/src/tests/persistence/CMakeLists.txt +++ b/storage/src/tests/persistence/CMakeLists.txt @@ -12,7 +12,6 @@ vespa_add_executable(storage_persistence_gtest_runner_app TEST persistencethread_splittest.cpp processalltest.cpp provider_error_wrapper_test.cpp - shared_operation_throttler_test.cpp splitbitdetectortest.cpp testandsettest.cpp gtest_runner.cpp diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h index 1552a955221..e0538fb7ca7 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h @@ -72,8 +72,14 @@ public: /** * Set a mask for operations to fail with _result */ - void setFailureMask(uint32_t mask) { _failureMask = mask; } - uint32_t getFailureMask() const { return _failureMask; } + void setFailureMask(uint32_t mask) { + Guard guard(_lock); + _failureMask = mask; + } + uint32_t getFailureMask() const { + Guard guard(_lock); + return _failureMask; + } /** * Get a string representation of all the operations performed on the diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index 6288f86993d..e9d399d357f 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -21,11 +21,9 @@ using namespace ::testing; namespace storage { /* - * Class for testing merge handler taking async_apply_bucket_diff as - * parameter for the test. + * Class for testing merge handler. */ -struct MergeHandlerTest : PersistenceTestUtils, - public testing::WithParamInterface<bool> { +struct MergeHandlerTest : PersistenceTestUtils { uint32_t _location; // Location used for all merge tests document::Bucket _bucket; // Bucket used for all merge tests uint64_t _maxTimestamp; @@ -172,11 +170,11 @@ struct MergeHandlerTest : PersistenceTestUtils, MergeHandler createHandler(size_t maxChunkSize = 0x400000) { return MergeHandler(getEnv(), getPersistenceProvider(), - getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize, 64, GetParam()); + getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize, 64); } MergeHandler createHandler(spi::PersistenceProvider & spi) { return MergeHandler(getEnv(), spi, - getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208, 64, GetParam()); + getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208, 64); } std::shared_ptr<api::StorageMessage> get_queued_reply() { @@ -232,7 +230,7 @@ MergeHandlerTest::setUpChain(ChainPos pos) { // Test a regular merge bucket command fetching data, including // puts, removes, unrevertable removes & duplicates. -TEST_P(MergeHandlerTest, merge_bucket_command) { +TEST_F(MergeHandlerTest, merge_bucket_command) { MergeHandler handler = createHandler(); LOG(debug, "Handle a merge bucket command"); @@ -293,11 +291,11 @@ MergeHandlerTest::testGetBucketDiffChain(bool midChain) EXPECT_EQ(17, diff.size()); } -TEST_P(MergeHandlerTest, get_bucket_diff_mid_chain) { +TEST_F(MergeHandlerTest, get_bucket_diff_mid_chain) { testGetBucketDiffChain(true); } -TEST_P(MergeHandlerTest, get_bucket_diff_end_of_chain) { +TEST_F(MergeHandlerTest, get_bucket_diff_end_of_chain) { testGetBucketDiffChain(false); } @@ -344,17 +342,17 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain) EXPECT_EQ(0, diff.size()); } -TEST_P(MergeHandlerTest, apply_bucket_diff_mid_chain) { +TEST_F(MergeHandlerTest, apply_bucket_diff_mid_chain) { testApplyBucketDiffChain(true); } -TEST_P(MergeHandlerTest, apply_bucket_diff_end_of_chain) { +TEST_F(MergeHandlerTest, apply_bucket_diff_end_of_chain) { testApplyBucketDiffChain(false); } // Test that a simplistic merge with one thing to actually merge, // sends correct commands and finish. -TEST_P(MergeHandlerTest, master_message_flow) { +TEST_F(MergeHandlerTest, master_message_flow) { MergeHandler handler = createHandler(); LOG(debug, "Handle a merge bucket command"); @@ -448,7 +446,7 @@ getFilledDataSize(const std::vector<api::ApplyBucketDiffCommand::Entry>& diff) } -TEST_P(MergeHandlerTest, chunked_apply_bucket_diff) { +TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) { uint32_t docSize = 1024; uint32_t docCount = 10; uint32_t maxChunkSize = docSize * 3; @@ -512,7 +510,7 @@ TEST_P(MergeHandlerTest, chunked_apply_bucket_diff) { EXPECT_TRUE(reply->getResult().success()); } -TEST_P(MergeHandlerTest, chunk_limit_partially_filled_diff) { +TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) { setUpChain(FRONT); uint32_t docSize = 1024; @@ -548,7 +546,7 @@ TEST_P(MergeHandlerTest, chunk_limit_partially_filled_diff) { EXPECT_LE(getFilledDataSize(fwdDiffCmd->getDiff()), maxChunkSize); } -TEST_P(MergeHandlerTest, max_timestamp) { +TEST_F(MergeHandlerTest, max_timestamp) { doPut(1234, spi::Timestamp(_maxTimestamp + 10), 1024, 1024); MergeHandler handler = createHandler(); @@ -656,7 +654,7 @@ MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset, uint16_t hasMask return getBucketDiffCmd; } -TEST_P(MergeHandlerTest, spi_flush_guard) { +TEST_F(MergeHandlerTest, spi_flush_guard) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); @@ -671,16 +669,14 @@ TEST_P(MergeHandlerTest, spi_flush_guard) { try { auto cmd = createDummyApplyDiff(6000); handler.handleApplyBucketDiff(*cmd, createTracker(cmd, _bucket)); - if (GetParam()) { - convert_delayed_error_to_exception(handler); - } + convert_delayed_error_to_exception(handler); FAIL() << "No exception thrown on failing in-place remove"; } catch (const std::runtime_error& e) { EXPECT_TRUE(std::string(e.what()).find("Failed remove") != std::string::npos); } } -TEST_P(MergeHandlerTest, bucket_not_found_in_db) { +TEST_F(MergeHandlerTest, bucket_not_found_in_db) { MergeHandler handler = createHandler(); // Send merge for unknown bucket auto cmd = std::make_shared<api::MergeBucketCommand>(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp); @@ -688,7 +684,7 @@ TEST_P(MergeHandlerTest, bucket_not_found_in_db) { EXPECT_TRUE(tracker->getResult().isBucketDisappearance()); } -TEST_P(MergeHandlerTest, merge_progress_safe_guard) { +TEST_F(MergeHandlerTest, merge_progress_safe_guard) { MergeHandler handler = createHandler(); auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); @@ -711,7 +707,7 @@ TEST_P(MergeHandlerTest, merge_progress_safe_guard) { EXPECT_EQ(mergeReply->getResult().getResult(), api::ReturnCode::INTERNAL_FAILURE); } -TEST_P(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { +TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { MergeHandler handler = createHandler(); _nodes.clear(); _nodes.emplace_back(0, false); @@ -743,7 +739,7 @@ TEST_P(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { EXPECT_EQ(0x5, applyBucketDiffCmd2->getDiff()[0]._entry._hasMask); } -TEST_P(MergeHandlerTest, entry_removed_after_get_bucket_diff) { +TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) { MergeHandler handler = createHandler(); std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff; { @@ -799,9 +795,7 @@ MergeHandlerTest::doTestSPIException(MergeHandler& handler, providerWrapper.setFailureMask(failureMask); try { invoker.invoke(*this, handler, *_context); - if (GetParam()) { - convert_delayed_error_to_exception(handler); - } + convert_delayed_error_to_exception(handler); if (failureMask != 0) { return (std::string("No exception was thrown during handler " "invocation. Expected exception containing '") @@ -870,7 +864,7 @@ MergeHandlerTest::HandleMergeBucketInvoker::invoke( handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket)); } -TEST_P(MergeHandlerTest, merge_bucket_spi_failures) { +TEST_F(MergeHandlerTest, merge_bucket_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult( @@ -901,7 +895,7 @@ MergeHandlerTest::HandleGetBucketDiffInvoker::invoke( handler.handleGetBucketDiff(*cmd, test.createTracker(cmd, test._bucket)); } -TEST_P(MergeHandlerTest, get_bucket_diff_spi_failures) { +TEST_F(MergeHandlerTest, get_bucket_diff_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult(spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?")); @@ -933,7 +927,7 @@ MergeHandlerTest::HandleApplyBucketDiffInvoker::invoke( handler.handleApplyBucketDiff(*cmd, test.createTracker(cmd, test._bucket)); } -TEST_P(MergeHandlerTest, apply_bucket_diff_spi_failures) { +TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult( @@ -998,7 +992,7 @@ MergeHandlerTest::HandleGetBucketDiffReplyInvoker::afterInvoke( api::ReturnCode::INTERNAL_FAILURE); } -TEST_P(MergeHandlerTest, get_bucket_diff_reply_spi_failures) { +TEST_F(MergeHandlerTest, get_bucket_diff_reply_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult( @@ -1073,9 +1067,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke( test.fillDummyApplyDiff(reply->getDiff()); _stub.clear(); handler.handleApplyBucketDiffReply(*reply, _stub, test.createTracker(reply, test._bucket)); - if (test.GetParam()) { - convert_delayed_error_to_exception(test, handler); - } + convert_delayed_error_to_exception(test, handler); } std::string @@ -1099,7 +1091,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::afterInvoke( } } -TEST_P(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) { +TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); HandleApplyBucketDiffReplyInvoker invoker; for (int i = 0; i < 2; ++i) { @@ -1126,7 +1118,7 @@ TEST_P(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) { } } -TEST_P(MergeHandlerTest, remove_from_diff) { +TEST_F(MergeHandlerTest, remove_from_diff) { framework::defaultimplementation::FakeClock clock; MergeStatus status(clock, 0, 0); @@ -1192,7 +1184,7 @@ TEST_P(MergeHandlerTest, remove_from_diff) { } } -TEST_P(MergeHandlerTest, remove_put_on_existing_timestamp) { +TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) { setUpChain(BACK); document::TestDocMan docMan; @@ -1216,15 +1208,10 @@ TEST_P(MergeHandlerTest, remove_put_on_existing_timestamp) { auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket)); - if (GetParam()) { - ASSERT_FALSE(tracker); - handler.drain_async_writes(); - auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(get_queued_reply()); - ASSERT_TRUE(applyBucketDiffReply.get()); - } else { - auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP()); - ASSERT_TRUE(applyBucketDiffReply.get()); - } + ASSERT_FALSE(tracker); + handler.drain_async_writes(); + auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(get_queued_reply()); + ASSERT_TRUE(applyBucketDiffReply.get()); tracker.reset(); auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); @@ -1314,7 +1301,7 @@ std::ostream &operator<<(std::ostream &os, const GetBucketDiffCommand::Entry &en } -TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) +TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) { using NodeList = decltype(_nodes); // Redundancy is 2 and source only nodes 3 and 4 have doc1 and doc2 @@ -1446,14 +1433,10 @@ TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket)); LOG(debug, "handled fourth ApplyBucketDiffReply"); } - if (GetParam()) { - handler.drain_async_writes(); - } + handler.drain_async_writes(); ASSERT_EQ(6u, messageKeeper()._msgs.size()); ASSERT_EQ(api::MessageType::MERGEBUCKET_REPLY, messageKeeper()._msgs[5]->getType()); LOG(debug, "got mergebucket reply"); } -VESPA_GTEST_INSTANTIATE_TEST_SUITE_P(AsyncApplyBucketDiffParams, MergeHandlerTest, testing::Values(false, true)); - } // storage diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index c163f6de024..c35be2789da 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -127,7 +127,7 @@ VESPA_THREAD_STACK_TAG(test_executor) void PersistenceTestUtils::setupExecutor(uint32_t numThreads) { - _sequenceTaskExecutor = vespalib::SequencedTaskExecutor::create(test_executor, numThreads, 1000, vespalib::Executor::OptimizeFor::ADAPTIVE); + _sequenceTaskExecutor = vespalib::SequencedTaskExecutor::create(test_executor, numThreads, 1000, true, vespalib::Executor::OptimizeFor::ADAPTIVE); } StorBucketDatabase::WrappedEntry diff --git a/storage/src/tests/persistence/shared_operation_throttler_test.cpp b/storage/src/tests/persistence/shared_operation_throttler_test.cpp deleted file mode 100644 index 0ad380937c7..00000000000 --- a/storage/src/tests/persistence/shared_operation_throttler_test.cpp +++ /dev/null @@ -1,116 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/storage/persistence/shared_operation_throttler.h> -#include <vespa/vespalib/gtest/gtest.h> -#include <vespa/vespalib/util/barrier.h> -#include <chrono> -#include <thread> - -using namespace ::testing; - -namespace storage { - -using ThrottleToken = SharedOperationThrottler::Token; - -TEST(SharedOperationThrottlerTest, unlimited_throttler_does_not_throttle) { - // We technically can't test that the unlimited throttler _never_ throttles, but at - // least check that it doesn't throttle _twice_, and then induce from this ;) - auto throttler = SharedOperationThrottler::make_unlimited_throttler(); - auto token1 = throttler->try_acquire_one(); - EXPECT_TRUE(token1.valid()); - auto token2 = throttler->blocking_acquire_one(); - EXPECT_TRUE(token2.valid()); - // Window size should be zero (i.e. unlimited) for unlimited throttler - EXPECT_EQ(throttler->current_window_size(), 0); -} - -TEST(SharedOperationThrottlerTest, dynamic_throttler_respects_initial_window_size) { - auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); - auto token1 = throttler->try_acquire_one(); - EXPECT_TRUE(token1.valid()); - auto token2 = throttler->try_acquire_one(); - EXPECT_FALSE(token2.valid()); - - EXPECT_EQ(throttler->current_window_size(), 1); -} - -TEST(SharedOperationThrottlerTest, blocking_acquire_returns_immediately_if_slot_available) { - auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); - auto token = throttler->blocking_acquire_one(); - EXPECT_TRUE(token.valid()); - token.reset(); - token = throttler->blocking_acquire_one(600s); // Should never block. - EXPECT_TRUE(token.valid()); -} - -TEST(SharedOperationThrottlerTest, blocking_call_woken_up_if_throttle_slot_available) { - auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); - vespalib::Barrier barrier(2); - std::thread t([&] { - auto token = throttler->try_acquire_one(); - assert(token.valid()); - barrier.await(); - while (throttler->waiting_threads() != 1) { - std::this_thread::sleep_for(100us); - } - // Implicit token release at thread scope exit - }); - barrier.await(); - auto token = throttler->blocking_acquire_one(); - EXPECT_TRUE(token.valid()); - t.join(); -} - -TEST(SharedOperationThrottlerTest, time_bounded_blocking_acquire_waits_for_timeout) { - auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); - auto window_filling_token = throttler->try_acquire_one(); - auto before = std::chrono::steady_clock::now(); - // Will block for at least 1ms. Since no window slot will be available by that time, - // an invalid token should be returned. - auto token = throttler->blocking_acquire_one(1ms); - auto after = std::chrono::steady_clock::now(); - EXPECT_TRUE((after - before) >= 1ms); - EXPECT_FALSE(token.valid()); -} - -TEST(SharedOperationThrottlerTest, default_constructed_token_is_invalid) { - ThrottleToken token; - EXPECT_FALSE(token.valid()); - token.reset(); // no-op - EXPECT_FALSE(token.valid()); -} - -TEST(SharedOperationThrottlerTest, token_destruction_frees_up_throttle_window_slot) { - auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); - { - auto token = throttler->try_acquire_one(); - EXPECT_TRUE(token.valid()); - } - auto token = throttler->try_acquire_one(); - EXPECT_TRUE(token.valid()); -} - -TEST(SharedOperationThrottlerTest, token_can_be_moved_and_reset) { - auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); - auto token1 = throttler->try_acquire_one(); - auto token2 = std::move(token1); // move ctor - EXPECT_TRUE(token2.valid()); - EXPECT_FALSE(token1.valid()); - ThrottleToken token3; - token3 = std::move(token2); // move assignment op - EXPECT_TRUE(token3.valid()); - EXPECT_FALSE(token2.valid()); - - // Trying to fetch new token should not succeed due to active token and win size of 1 - token1 = throttler->try_acquire_one(); - EXPECT_FALSE(token1.valid()); - // Resetting the token should free up the slot in the window - token3.reset(); - token1 = throttler->try_acquire_one(); - EXPECT_TRUE(token1.valid()); -} - -// TODO ideally we'd test that the dynamic throttler has a window size that is actually -// dynamic, but the backing DynamicThrottlePolicy implementation is a black box so -// it's not trivial to know how to do this reliably. - -} diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index 0f844ab6b4f..89b769078cc 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -1,18 +1,19 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/vespalib/util/document_runnable.h> -#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> #include <tests/common/testhelper.h> #include <tests/common/dummystoragelink.h> #include <tests/common/teststorageapp.h> #include <tests/common/dummystoragelink.h> #include <vespa/document/test/make_document_bucket.h> +#include <vespa/messagebus/dynamicthrottlepolicy.h> +#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> #include <vespa/storage/storageserver/mergethrottler.h> -#include <vespa/vdslib/state/clusterstate.h> #include <vespa/storage/persistence/messages.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/state.h> -#include <vespa/vespalib/util/exceptions.h> +#include <vespa/vdslib/state/clusterstate.h> #include <vespa/vespalib/gtest/gtest.h> +#include <vespa/vespalib/util/document_runnable.h> +#include <vespa/vespalib/util/exceptions.h> #include <unordered_set> #include <memory> #include <iterator> @@ -159,6 +160,8 @@ struct MergeThrottlerTest : Test { assert(merge); return merge; } + + [[nodiscard]] uint32_t throttler_max_merges_pending(uint16_t throttler_index) const noexcept; }; MergeThrottlerTest::MergeThrottlerTest() = default; @@ -212,13 +215,10 @@ bool checkChain(const StorageMessage::SP& msg, Iterator first, Iterator end) { - const MergeBucketCommand& cmd = - dynamic_cast<const MergeBucketCommand&>(*msg); - + auto& cmd = dynamic_cast<const MergeBucketCommand&>(*msg); if (cmd.getChain().size() != static_cast<size_t>(std::distance(first, end))) { return false; } - return std::equal(cmd.getChain().begin(), cmd.getChain().end(), first); } @@ -247,11 +247,17 @@ void waitUntilMergeQueueIs(MergeThrottler& throttler, size_t sz, int timeout) } +uint32_t +MergeThrottlerTest::throttler_max_merges_pending(uint16_t throttler_index) const noexcept +{ + return static_cast<uint32_t>(_throttlers[throttler_index]->getThrottlePolicy().getMaxWindowSize()); +} + // Extremely simple test that just checks that (min|max)_merges_per_node // under the stor-server config gets propagated to all the nodes TEST_F(MergeThrottlerTest, merges_config) { for (int i = 0; i < _storageNodeCount; ++i) { - EXPECT_EQ(25, _throttlers[i]->getThrottlePolicy().getMaxPendingCount()); + EXPECT_EQ(25, throttler_max_merges_pending(i)); EXPECT_EQ(20, _throttlers[i]->getMaxQueueSize()); } } @@ -636,7 +642,7 @@ TEST_F(MergeThrottlerTest, resend_handling) { TEST_F(MergeThrottlerTest, priority_queuing) { // Fill up all active merges - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(0); std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); ASSERT_GE(maxPending, 4u); for (size_t i = 0; i < maxPending; ++i) { @@ -691,7 +697,7 @@ TEST_F(MergeThrottlerTest, priority_queuing) { // in the queue for a merge that is already known. TEST_F(MergeThrottlerTest, command_in_queue_duplicate_of_known_merge) { // Fill up all active merges and 1 queued one - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(0); ASSERT_LT(maxPending, 100); for (size_t i = 0; i < maxPending + 1; ++i) { std::vector<MergeBucketCommand::Node> nodes({{0}, {uint16_t(2 + i)}, {uint16_t(5 + i)}}); @@ -786,7 +792,7 @@ TEST_F(MergeThrottlerTest, invalid_receiver_node) { // order. TEST_F(MergeThrottlerTest, forward_queued_merge) { // Fill up all active merges and then 3 queued ones - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(0); ASSERT_LT(maxPending, 100); for (size_t i = 0; i < maxPending + 3; ++i) { std::vector<MergeBucketCommand::Node> nodes({{0}, {uint16_t(2 + i)}, {uint16_t(5 + i)}}); @@ -846,7 +852,7 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) { DummyStorageLink& bottomLink(*_bottomLinks[1]); // Fill up all active merges and then 3 queued ones - size_t maxPending = throttler.getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(1); ASSERT_LT(maxPending, 100); for (size_t i = 0; i < maxPending + 3; ++i) { std::vector<MergeBucketCommand::Node> nodes({{1}, {uint16_t(5 + i)}, {uint16_t(7 + i)}}); @@ -914,7 +920,7 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) { TEST_F(MergeThrottlerTest, flush) { // Fill up all active merges and then 3 queued ones - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(0); ASSERT_LT(maxPending, 100); for (size_t i = 0; i < maxPending + 3; ++i) { std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); @@ -972,14 +978,11 @@ TEST_F(MergeThrottlerTest, unseen_merge_with_node_in_chain) { // Second, test that we get rejected before queueing up. This is to // avoid a hypothetical deadlock scenario. // Fill up all active merges - { - - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); - for (size_t i = 0; i < maxPending; ++i) { - auto fillCmd = std::make_shared<MergeBucketCommand>( - makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234); - _topLinks[0]->sendDown(fillCmd); - } + size_t maxPending = throttler_max_merges_pending(0); + for (size_t i = 0; i < maxPending; ++i) { + auto fillCmd = std::make_shared<MergeBucketCommand>( + makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234); + _topLinks[0]->sendDown(fillCmd); } _topLinks[0]->sendDown(cmd); @@ -993,7 +996,7 @@ TEST_F(MergeThrottlerTest, unseen_merge_with_node_in_chain) { TEST_F(MergeThrottlerTest, merge_with_newer_cluster_state_flushes_outdated_queued){ // Fill up all active merges and then 3 queued ones with the same // system state - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(0); ASSERT_LT(maxPending, 100); std::vector<api::StorageMessage::Id> ids; for (size_t i = 0; i < maxPending + 3; ++i) { @@ -1035,7 +1038,7 @@ TEST_F(MergeThrottlerTest, merge_with_newer_cluster_state_flushes_outdated_queue TEST_F(MergeThrottlerTest, updated_cluster_state_flushes_outdated_queued) { // State is version 1. Send down several merges with state version 2. - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(0); ASSERT_LT(maxPending, 100); std::vector<api::StorageMessage::Id> ids; for (size_t i = 0; i < maxPending + 3; ++i) { @@ -1074,7 +1077,7 @@ TEST_F(MergeThrottlerTest, updated_cluster_state_flushes_outdated_queued) { // TODO remove functionality and test TEST_F(MergeThrottlerTest, legacy_42_merges_do_not_trigger_flush) { // Fill up all active merges and then 1 queued one - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(0); ASSERT_LT(maxPending, 100); for (size_t i = 0; i < maxPending + 1; ++i) { std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); @@ -1156,7 +1159,7 @@ TEST_F(MergeThrottlerTest, unknown_merge_with_self_in_chain) { } TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_distributors) { - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(0); size_t maxQueue = _throttlers[0]->getMaxQueueSize(); ASSERT_EQ(20, maxQueue); ASSERT_LT(maxPending, 100); @@ -1205,7 +1208,7 @@ MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_lim { // Note: uses node with index 1 to not be the first node in chain _throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits); - size_t max_pending = _throttlers[1]->getThrottlePolicy().getMaxPendingCount(); + size_t max_pending = throttler_max_merges_pending(1); size_t max_enqueued = _throttlers[1]->getMaxQueueSize(); for (size_t i = 0; i < max_pending + max_enqueued; ++i) { std::vector<MergeBucketCommand::Node> nodes({{1}, {2}, {3}}); @@ -1452,7 +1455,7 @@ TEST_F(MergeThrottlerTest, source_only_merges_are_not_affected_by_backpressure) } void MergeThrottlerTest::fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count) { - size_t max_pending = _throttlers[throttler_index]->getThrottlePolicy().getMaxPendingCount(); + size_t max_pending = throttler_max_merges_pending(throttler_index); for (size_t i = 0; i < max_pending + queued_count; ++i) { _topLinks[throttler_index]->sendDown(MergeBuilder(document::BucketId(16, i)) .nodes(throttler_index, throttler_index + 1) |