Diffstat (limited to 'storage/src/tests')
 -rw-r--r--  storage/src/tests/common/global_bucket_space_distribution_converter_test.cpp | 1
 -rw-r--r--  storage/src/tests/common/teststorageapp.cpp | 3
 -rw-r--r--  storage/src/tests/distributor/CMakeLists.txt | 1
 -rw-r--r--  storage/src/tests/distributor/distributor_bucket_space_repo_test.cpp | 72
 -rw-r--r--  storage/src/tests/distributor/distributor_stripe_test.cpp | 13
 -rw-r--r--  storage/src/tests/distributor/putoperationtest.cpp | 2
 -rw-r--r--  storage/src/tests/distributor/statecheckerstest.cpp | 38
 -rw-r--r--  storage/src/tests/distributor/top_level_distributor_test.cpp | 2
 -rw-r--r--  storage/src/tests/distributor/top_level_distributor_test_util.cpp | 6
 -rw-r--r--  storage/src/tests/distributor/visitoroperationtest.cpp | 14
 -rw-r--r--  storage/src/tests/frameworkimpl/status/statustest.cpp | 2
 -rw-r--r--  storage/src/tests/persistence/CMakeLists.txt | 1
 -rw-r--r--  storage/src/tests/persistence/common/persistenceproviderwrapper.h | 10
 -rw-r--r--  storage/src/tests/persistence/mergehandlertest.cpp | 85
 -rw-r--r--  storage/src/tests/persistence/persistencetestutils.cpp | 2
 -rw-r--r--  storage/src/tests/persistence/shared_operation_throttler_test.cpp | 116
 -rw-r--r--  storage/src/tests/storageserver/mergethrottlertest.cpp | 59
 17 files changed, 219 insertions(+), 208 deletions(-)
diff --git a/storage/src/tests/common/global_bucket_space_distribution_converter_test.cpp b/storage/src/tests/common/global_bucket_space_distribution_converter_test.cpp
index 23a0df81bab..cd70aecd1bb 100644
--- a/storage/src/tests/common/global_bucket_space_distribution_converter_test.cpp
+++ b/storage/src/tests/common/global_bucket_space_distribution_converter_test.cpp
@@ -2,7 +2,6 @@
#include <vespa/storage/common/global_bucket_space_distribution_converter.h>
#include <vespa/vdslib/distribution/distribution.h>
-#include <vespa/config/config.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/gtest/gtest.h>
diff --git a/storage/src/tests/common/teststorageapp.cpp b/storage/src/tests/common/teststorageapp.cpp
index 7a19e84791d..91fdf5aa602 100644
--- a/storage/src/tests/common/teststorageapp.cpp
+++ b/storage/src/tests/common/teststorageapp.cpp
@@ -4,14 +4,13 @@
#include <vespa/storage/common/content_bucket_db_options.h>
#include <vespa/storage/config/config-stor-server.h>
#include <vespa/config-stor-distribution.h>
-#include <vespa/config-load-type.h>
#include <vespa/config-fleetcontroller.h>
#include <vespa/persistence/dummyimpl/dummypersistence.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/time.h>
-#include <vespa/config/config.h>
+#include <vespa/config/subscription/configuri.h>
#include <vespa/config/helper/configgetter.hpp>
#include <thread>
#include <sstream>
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt
index bee7650aebd..501f9a18c47 100644
--- a/storage/src/tests/distributor/CMakeLists.txt
+++ b/storage/src/tests/distributor/CMakeLists.txt
@@ -9,6 +9,7 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST
bucketdbmetricupdatertest.cpp
bucketgctimecalculatortest.cpp
bucketstateoperationtest.cpp
+ distributor_bucket_space_repo_test.cpp
distributor_bucket_space_test.cpp
distributor_host_info_reporter_test.cpp
distributor_message_sender_stub.cpp
diff --git a/storage/src/tests/distributor/distributor_bucket_space_repo_test.cpp b/storage/src/tests/distributor/distributor_bucket_space_repo_test.cpp
new file mode 100644
index 00000000000..151dcff3d10
--- /dev/null
+++ b/storage/src/tests/distributor/distributor_bucket_space_repo_test.cpp
@@ -0,0 +1,72 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/storage/distributor/distributor_bucket_space.h>
+#include <vespa/storage/distributor/distributor_bucket_space_repo.h>
+#include <vespa/vdslib/state/cluster_state_bundle.h>
+#include <vespa/vdslib/state/clusterstate.h>
+#include <vespa/vespalib/gtest/gtest.h>
+#include <memory>
+
+namespace storage::distributor {
+
+using document::FixedBucketSpaces;
+using namespace ::testing;
+
+struct DistributorBucketSpaceRepoTest : Test {
+ DistributorBucketSpaceRepo _repo;
+
+ DistributorBucketSpaceRepoTest() : _repo(123) {}
+};
+
+namespace {
+
+lib::ClusterStateBundle bundle_with_global_merges() {
+ auto global_state = std::make_shared<lib::ClusterState>("distributor:1 storage:2");
+ auto default_state = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .1.s:m");
+ return lib::ClusterStateBundle(*global_state, {{FixedBucketSpaces::default_space(), default_state},
+ {FixedBucketSpaces::global_space(), global_state}});
+}
+
+lib::ClusterStateBundle bundle_without_global_merges() {
+ auto global_state = std::make_shared<lib::ClusterState>("distributor:1 storage:2");
+ auto default_state = std::make_shared<lib::ClusterState>("distributor:1 storage:2");
+ return lib::ClusterStateBundle(*global_state, {{FixedBucketSpaces::default_space(), default_state},
+ {FixedBucketSpaces::global_space(), global_state}});
+}
+
+}
+
+TEST_F(DistributorBucketSpaceRepoTest, bucket_spaces_are_initially_not_tagged_as_merge_inhibited) {
+ EXPECT_FALSE(_repo.get(FixedBucketSpaces::default_space()).merges_inhibited());
+ EXPECT_FALSE(_repo.get(FixedBucketSpaces::global_space()).merges_inhibited());
+}
+
+TEST_F(DistributorBucketSpaceRepoTest, enabled_bundle_with_pending_global_merges_tags_default_space_as_merge_inhibited) {
+ _repo.enable_cluster_state_bundle(bundle_with_global_merges());
+ EXPECT_TRUE(_repo.get(FixedBucketSpaces::default_space()).merges_inhibited());
+ EXPECT_FALSE(_repo.get(FixedBucketSpaces::global_space()).merges_inhibited());
+}
+
+TEST_F(DistributorBucketSpaceRepoTest, enabled_bundle_without_pending_global_merges_unsets_merge_inhibition) {
+ _repo.enable_cluster_state_bundle(bundle_with_global_merges());
+ _repo.enable_cluster_state_bundle(bundle_without_global_merges());
+ EXPECT_FALSE(_repo.get(FixedBucketSpaces::default_space()).merges_inhibited());
+ EXPECT_FALSE(_repo.get(FixedBucketSpaces::global_space()).merges_inhibited());
+}
+
+TEST_F(DistributorBucketSpaceRepoTest, pending_bundle_with_pending_global_merges_tags_default_space_as_merge_inhibited) {
+ _repo.enable_cluster_state_bundle(bundle_without_global_merges());
+ _repo.set_pending_cluster_state_bundle(bundle_with_global_merges());
+ EXPECT_TRUE(_repo.get(FixedBucketSpaces::default_space()).merges_inhibited());
+ EXPECT_FALSE(_repo.get(FixedBucketSpaces::global_space()).merges_inhibited());
+}
+
+TEST_F(DistributorBucketSpaceRepoTest, pending_bundle_without_pending_global_unsets_merge_inhibition) {
+ _repo.enable_cluster_state_bundle(bundle_with_global_merges());
+ _repo.set_pending_cluster_state_bundle(bundle_without_global_merges());
+ EXPECT_FALSE(_repo.get(FixedBucketSpaces::default_space()).merges_inhibited());
+ EXPECT_FALSE(_repo.get(FixedBucketSpaces::global_space()).merges_inhibited());
+}
+
+}
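
The new test file above pins down how per-bucket-space merge inhibition is derived from a cluster state bundle: whenever the enabled or pending bundle implies pending global merges, the default space is flagged as merge-inhibited while the global space never is. A minimal, self-contained sketch of that flag-propagation pattern (illustrative names only, not the actual DistributorBucketSpaceRepo API):

    #include <map>

    // Sketch of per-bucket-space merge inhibition driven by a state bundle.
    enum class SpaceId { Default, Global };

    struct SpaceState {
        bool merges_inhibited = false;
    };

    struct BundleSketch {
        // True when the bundle indicates that global-space merges are still pending
        // (in the tests above this is signalled by a node being in maintenance in
        // the default space only).
        bool global_merges_pending = false;
    };

    class RepoSketch {
        std::map<SpaceId, SpaceState> _spaces{{SpaceId::Default, {}}, {SpaceId::Global, {}}};
    public:
        // Both the enabled and the pending bundle update the same flag, which is
        // why the tests exercise enable_* and set_pending_* symmetrically.
        void apply(const BundleSketch& bundle) {
            _spaces[SpaceId::Default].merges_inhibited = bundle.global_merges_pending;
            _spaces[SpaceId::Global].merges_inhibited = false; // the global space is never inhibited
        }
        bool merges_inhibited(SpaceId id) const { return _spaces.at(id).merges_inhibited; }
    };
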
diff --git a/storage/src/tests/distributor/distributor_stripe_test.cpp b/storage/src/tests/distributor/distributor_stripe_test.cpp
index 8c2ebc983fa..709f2e6cdc5 100644
--- a/storage/src/tests/distributor/distributor_stripe_test.cpp
+++ b/storage/src/tests/distributor/distributor_stripe_test.cpp
@@ -629,6 +629,19 @@ TEST_F(DistributorStripeTest, max_clock_skew_config_is_propagated_to_distributor
EXPECT_EQ(getConfig().getMaxClusterClockSkew(), std::chrono::seconds(5));
}
+TEST_F(DistributorStripeTest, inhibit_default_merge_if_global_merges_pending_config_is_propagated)
+{
+ setup_stripe(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
+ ConfigBuilder builder;
+ builder.inhibitDefaultMergesWhenGlobalMergesPending = true;
+ configure_stripe(builder);
+ EXPECT_TRUE(getConfig().inhibit_default_merges_when_global_merges_pending());
+
+ builder.inhibitDefaultMergesWhenGlobalMergesPending = false;
+ configure_stripe(builder);
+ EXPECT_FALSE(getConfig().inhibit_default_merges_when_global_merges_pending());
+}
+
namespace {
auto makeDummyRemoveCommand() {
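
The new stripe test above only has to verify that the added boolean survives the config builder to live-config round trip. As a rough, hypothetical illustration of such a propagation check (stand-in types, not the generated Vespa config classes):

    #include <cassert>

    // Hypothetical stand-ins for the generated config builder and the object it configures.
    struct BuilderSketch {
        bool inhibitDefaultMergesWhenGlobalMergesPending = false;
    };

    struct LiveConfigSketch {
        bool _inhibit = false;
        void configure(const BuilderSketch& b) {
            _inhibit = b.inhibitDefaultMergesWhenGlobalMergesPending;
        }
        bool inhibit_default_merges_when_global_merges_pending() const { return _inhibit; }
    };

    int main() {
        BuilderSketch builder;
        LiveConfigSketch config;
        builder.inhibitDefaultMergesWhenGlobalMergesPending = true;
        config.configure(builder);
        assert(config.inhibit_default_merges_when_global_merges_pending());
        builder.inhibitDefaultMergesWhenGlobalMergesPending = false;
        config.configure(builder);
        assert(!config.inhibit_default_merges_when_global_merges_pending());
    }
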
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp
index b02395717e0..53773a55826 100644
--- a/storage/src/tests/distributor/putoperationtest.cpp
+++ b/storage/src/tests/distributor/putoperationtest.cpp
@@ -1,7 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <tests/distributor/distributor_stripe_test_util.h>
-#include <vespa/document/config/documenttypes_config_fwd.h>
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/storage/distributor/top_level_distributor.h>
@@ -13,6 +12,7 @@
#include <vespa/storageapi/message/state.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/text/stringtokenizer.h>
+#include <vespa/config/helper/configgetter.h>
using std::shared_ptr;
using config::ConfigGetter;
diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp
index d481370b2c1..94f913a3325 100644
--- a/storage/src/tests/distributor/statecheckerstest.cpp
+++ b/storage/src/tests/distributor/statecheckerstest.cpp
@@ -175,6 +175,8 @@ struct StateCheckersTest : Test, DistributorStripeTestUtil {
bool _includeSchedulingPriority {false};
bool _merge_operations_disabled {false};
bool _prioritize_global_bucket_merges {true};
+ bool _config_enable_default_space_merge_inhibition {false};
+ bool _merges_inhibited_in_bucket_space {false};
CheckerParams();
~CheckerParams();
@@ -222,6 +224,14 @@ struct StateCheckersTest : Test, DistributorStripeTestUtil {
_bucket_space = bucket_space;
return *this;
}
+ CheckerParams& config_enable_default_space_merge_inhibition(bool enabled) noexcept {
+ _config_enable_default_space_merge_inhibition = enabled;
+ return *this;
+ }
+ CheckerParams& merges_inhibited_in_bucket_space(bool inhibited) noexcept {
+ _merges_inhibited_in_bucket_space = inhibited;
+ return *this;
+ }
};
template <typename CheckerImpl>
@@ -236,10 +246,12 @@ struct StateCheckersTest : Test, DistributorStripeTestUtil {
vespa::config::content::core::StorDistributormanagerConfigBuilder config;
config.mergeOperationsDisabled = params._merge_operations_disabled;
config.prioritizeGlobalBucketMerges = params._prioritize_global_bucket_merges;
+ config.inhibitDefaultMergesWhenGlobalMergesPending = params._config_enable_default_space_merge_inhibition;
configure_stripe(config);
if (!params._pending_cluster_state.empty()) {
simulate_set_pending_cluster_state(params._pending_cluster_state);
}
+ getBucketSpaceRepo().get(params._bucket_space).set_merges_inhibited(params._merges_inhibited_in_bucket_space);
NodeMaintenanceStatsTracker statsTracker;
StateChecker::Context c(node_context(),
operation_context(),
@@ -818,6 +830,32 @@ TEST_F(StateCheckersTest, no_merge_operation_generated_if_merges_explicitly_conf
.merge_operations_disabled(true));
}
+TEST_F(StateCheckersTest, no_merge_operation_generated_if_merges_inhibited_in_default_bucket_space_and_config_allowed) {
+ // Technically, the state checker doesn't look at global vs. non-global but instead defers
+ // to the distributor bucket space repo to set the inhibition flag on the correct bucket space.
+ // This particular logic is tested at a higher repo-level.
+ runAndVerify<SynchronizeAndMoveStateChecker>(
+ CheckerParams()
+ .expect("NO OPERATIONS GENERATED") // Would normally generate a merge op
+ .bucketInfo("0=1,2=2")
+ .config_enable_default_space_merge_inhibition(true)
+ .merges_inhibited_in_bucket_space(true)
+ .clusterState("distributor:1 storage:3"));
+}
+
+TEST_F(StateCheckersTest, merge_operation_still_generated_if_merges_inhibited_in_default_bucket_space_but_config_disallowed) {
+ runAndVerify<SynchronizeAndMoveStateChecker>(
+ CheckerParams()
+ .expect("[Moving bucket to ideal node 1]"
+ "[Synchronizing buckets with different checksums "
+ "node(idx=0,crc=0x1,docs=1/1,bytes=1/1,trusted=false,active=false,ready=false), "
+ "node(idx=2,crc=0x2,docs=2/2,bytes=2/2,trusted=false,active=false,ready=false)]")
+ .bucketInfo("0=1,2=2")
+ .config_enable_default_space_merge_inhibition(false)
+ .merges_inhibited_in_bucket_space(true)
+ .clusterState("distributor:1 storage:3"));
+}
+
std::string
StateCheckersTest::testDeleteExtraCopies(
const std::string& bucketInfo, uint32_t redundancy,
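
The two added state-checker cases above pin down the gating: a merge for the bucket is suppressed only when the bucket space is flagged as merge-inhibited and the config knob permits inhibition; with the knob off, the usual move/synchronize operation is still generated. A sketch of that guard in isolation (hypothetical context struct; the real check lives inside the state checker):

    // Hypothetical mirror of the two fields the tests toggle.
    struct InhibitionContextSketch {
        bool merges_inhibited_in_bucket_space;                   // set on the bucket space by the repo
        bool inhibit_default_merges_when_global_merges_pending;  // distributor config knob
    };

    // True when merge generation should be skipped for this bucket space;
    // both conditions must hold, matching the allowed/disallowed test pair.
    inline bool should_inhibit_merges(const InhibitionContextSketch& ctx) {
        return ctx.inhibit_default_merges_when_global_merges_pending
            && ctx.merges_inhibited_in_bucket_space;
    }
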
diff --git a/storage/src/tests/distributor/top_level_distributor_test.cpp b/storage/src/tests/distributor/top_level_distributor_test.cpp
index a0477e352d1..2c19652b4c2 100644
--- a/storage/src/tests/distributor/top_level_distributor_test.cpp
+++ b/storage/src/tests/distributor/top_level_distributor_test.cpp
@@ -253,7 +253,7 @@ TEST_F(TopLevelDistributorTest, tick_aggregates_status_requests_from_all_stripes
FakeClock clock;
ThreadPoolImpl pool(clock);
int ticksBeforeWait = 1;
- framework::Thread::UP tp(pool.startThread(thread, "statustest", 5ms, 5s, ticksBeforeWait));
+ framework::Thread::UP tp(pool.startThread(thread, "statustest", 5ms, 5s, ticksBeforeWait, std::nullopt));
while (true) {
std::this_thread::sleep_for(1ms);
diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.cpp b/storage/src/tests/distributor/top_level_distributor_test_util.cpp
index 2a61141865a..5dcc7f5c6ad 100644
--- a/storage/src/tests/distributor/top_level_distributor_test_util.cpp
+++ b/storage/src/tests/distributor/top_level_distributor_test_util.cpp
@@ -31,8 +31,8 @@ TopLevelDistributorTestUtil::~TopLevelDistributorTestUtil() = default;
void
TopLevelDistributorTestUtil::create_links()
{
- _node.reset(new TestDistributorApp(_config.getConfigId()));
- _thread_pool = framework::TickingThreadPool::createDefault("distributor");
+ _node = std::make_unique<TestDistributorApp>(_config.getConfigId());
+ _thread_pool = framework::TickingThreadPool::createDefault("distributor", 100ms);
_stripe_pool = DistributorStripePool::make_non_threaded_pool_for_testing();
_distributor.reset(new TopLevelDistributor(
_node->getComponentRegister(),
@@ -43,7 +43,7 @@ TopLevelDistributorTestUtil::create_links()
_num_distributor_stripes,
_host_info,
&_message_sender));
- _component.reset(new storage::DistributorComponent(_node->getComponentRegister(), "distrtestutil"));
+ _component = std::make_unique<storage::DistributorComponent>(_node->getComponentRegister(), "distrtestutil");
};
void
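
The utility cleanup above swaps reset(new T(...)) for std::make_unique<T>(...), keeping allocation and ownership transfer in a single expression. A minimal before/after for reference:

    #include <memory>

    struct Widget { explicit Widget(int) {} };

    void example() {
        std::unique_ptr<Widget> p;
        p.reset(new Widget(42));           // old style: bare new, then adopt the pointer
        p = std::make_unique<Widget>(42);  // new style: one expression, no explicit new
    }
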
diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp
index ea99186a434..6c597b620dd 100644
--- a/storage/src/tests/distributor/visitoroperationtest.cpp
+++ b/storage/src/tests/distributor/visitoroperationtest.cpp
@@ -252,6 +252,20 @@ TEST_F(VisitorOperationTest, no_bucket) {
runEmptyVisitor(msg));
}
+TEST_F(VisitorOperationTest, none_fieldset_is_rejected) {
+ enable_cluster_state("distributor:1 storage:1");
+ auto msg = std::make_shared<api::CreateVisitorCommand>(
+ makeBucketSpace(), "dumpvisitor", "instance", "");
+ msg->addBucketToBeVisited(document::BucketId(16, 1));
+ msg->addBucketToBeVisited(nullId);
+ msg->setFieldSet("[none]");
+
+ EXPECT_EQ("CreateVisitorReply(last=BucketId(0x0000000000000000)) "
+ "ReturnCode(ILLEGAL_PARAMETERS, Field set '[none]' is not supported "
+ "for external visitor operations. Use '[id]' to return documents with no fields set.)",
+ runEmptyVisitor(msg));
+}
+
TEST_F(VisitorOperationTest, only_super_bucket_and_progress_allowed) {
enable_cluster_state("distributor:1 storage:1");
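
The added visitor test asserts that an externally supplied '[none]' field set is rejected up front with ILLEGAL_PARAMETERS and that the error text steers callers towards '[id]'. A hedged sketch of that kind of validation step (illustrative free function, not the actual operation code):

    #include <optional>
    #include <string>

    // Returns an error message when the field set cannot be used for external
    // visitor operations, or std::nullopt when it is acceptable.
    inline std::optional<std::string> validate_external_field_set(const std::string& field_set) {
        if (field_set == "[none]") {
            return "Field set '[none]' is not supported for external visitor operations. "
                   "Use '[id]' to return documents with no fields set.";
        }
        return std::nullopt;
    }
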
diff --git a/storage/src/tests/frameworkimpl/status/statustest.cpp b/storage/src/tests/frameworkimpl/status/statustest.cpp
index 2e136977026..97bfa41aece 100644
--- a/storage/src/tests/frameworkimpl/status/statustest.cpp
+++ b/storage/src/tests/frameworkimpl/status/statustest.cpp
@@ -2,7 +2,6 @@
#include <vespa/storageframework/defaultimplementation/component/componentregisterimpl.h>
#include <vespa/storage/frameworkimpl/status/statuswebserver.h>
-#include <vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h>
#include <vespa/storageframework/generic/status/htmlstatusreporter.h>
#include <vespa/storageframework/generic/status/xmlstatusreporter.h>
#include <tests/common/teststorageapp.h>
@@ -10,6 +9,7 @@
#include <vespa/vespalib/net/crypto_engine.h>
#include <vespa/vespalib/net/socket_spec.h>
#include <vespa/vespalib/net/sync_crypto_socket.h>
+#include <vespa/config/subscription/configuri.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <gmock/gmock.h>
diff --git a/storage/src/tests/persistence/CMakeLists.txt b/storage/src/tests/persistence/CMakeLists.txt
index fb8120210c1..7b165e11b66 100644
--- a/storage/src/tests/persistence/CMakeLists.txt
+++ b/storage/src/tests/persistence/CMakeLists.txt
@@ -12,7 +12,6 @@ vespa_add_executable(storage_persistence_gtest_runner_app TEST
persistencethread_splittest.cpp
processalltest.cpp
provider_error_wrapper_test.cpp
- shared_operation_throttler_test.cpp
splitbitdetectortest.cpp
testandsettest.cpp
gtest_runner.cpp
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
index 1552a955221..e0538fb7ca7 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
@@ -72,8 +72,14 @@ public:
/**
* Set a mask for operations to fail with _result
*/
- void setFailureMask(uint32_t mask) { _failureMask = mask; }
- uint32_t getFailureMask() const { return _failureMask; }
+ void setFailureMask(uint32_t mask) {
+ Guard guard(_lock);
+ _failureMask = mask;
+ }
+ uint32_t getFailureMask() const {
+ Guard guard(_lock);
+ return _failureMask;
+ }
/**
* Get a string representation of all the operations performed on the
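
The wrapper change above makes the failure mask safe to set from a test thread while persistence threads read it, by taking the wrapper's lock in both accessors. The same pattern in isolation, assuming a plain std::mutex in place of the wrapper's own Guard/_lock types:

    #include <cstdint>
    #include <mutex>

    class FailureMaskSketch {
        mutable std::mutex _lock;   // mutable so the const getter can also lock
        uint32_t _failureMask = 0;
    public:
        void setFailureMask(uint32_t mask) {
            std::lock_guard<std::mutex> guard(_lock);
            _failureMask = mask;
        }
        uint32_t getFailureMask() const {
            std::lock_guard<std::mutex> guard(_lock);
            return _failureMask;
        }
    };
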
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 6288f86993d..e9d399d357f 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -21,11 +21,9 @@ using namespace ::testing;
namespace storage {
/*
- * Class for testing merge handler taking async_apply_bucket_diff as
- * parameter for the test.
+ * Class for testing merge handler.
*/
-struct MergeHandlerTest : PersistenceTestUtils,
- public testing::WithParamInterface<bool> {
+struct MergeHandlerTest : PersistenceTestUtils {
uint32_t _location; // Location used for all merge tests
document::Bucket _bucket; // Bucket used for all merge tests
uint64_t _maxTimestamp;
@@ -172,11 +170,11 @@ struct MergeHandlerTest : PersistenceTestUtils,
MergeHandler createHandler(size_t maxChunkSize = 0x400000) {
return MergeHandler(getEnv(), getPersistenceProvider(),
- getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize, 64, GetParam());
+ getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize, 64);
}
MergeHandler createHandler(spi::PersistenceProvider & spi) {
return MergeHandler(getEnv(), spi,
- getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208, 64, GetParam());
+ getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208, 64);
}
std::shared_ptr<api::StorageMessage> get_queued_reply() {
@@ -232,7 +230,7 @@ MergeHandlerTest::setUpChain(ChainPos pos) {
// Test a regular merge bucket command fetching data, including
// puts, removes, unrevertable removes & duplicates.
-TEST_P(MergeHandlerTest, merge_bucket_command) {
+TEST_F(MergeHandlerTest, merge_bucket_command) {
MergeHandler handler = createHandler();
LOG(debug, "Handle a merge bucket command");
@@ -293,11 +291,11 @@ MergeHandlerTest::testGetBucketDiffChain(bool midChain)
EXPECT_EQ(17, diff.size());
}
-TEST_P(MergeHandlerTest, get_bucket_diff_mid_chain) {
+TEST_F(MergeHandlerTest, get_bucket_diff_mid_chain) {
testGetBucketDiffChain(true);
}
-TEST_P(MergeHandlerTest, get_bucket_diff_end_of_chain) {
+TEST_F(MergeHandlerTest, get_bucket_diff_end_of_chain) {
testGetBucketDiffChain(false);
}
@@ -344,17 +342,17 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain)
EXPECT_EQ(0, diff.size());
}
-TEST_P(MergeHandlerTest, apply_bucket_diff_mid_chain) {
+TEST_F(MergeHandlerTest, apply_bucket_diff_mid_chain) {
testApplyBucketDiffChain(true);
}
-TEST_P(MergeHandlerTest, apply_bucket_diff_end_of_chain) {
+TEST_F(MergeHandlerTest, apply_bucket_diff_end_of_chain) {
testApplyBucketDiffChain(false);
}
// Test that a simplistic merge with one thing to actually merge,
// sends correct commands and finish.
-TEST_P(MergeHandlerTest, master_message_flow) {
+TEST_F(MergeHandlerTest, master_message_flow) {
MergeHandler handler = createHandler();
LOG(debug, "Handle a merge bucket command");
@@ -448,7 +446,7 @@ getFilledDataSize(const std::vector<api::ApplyBucketDiffCommand::Entry>& diff)
}
-TEST_P(MergeHandlerTest, chunked_apply_bucket_diff) {
+TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) {
uint32_t docSize = 1024;
uint32_t docCount = 10;
uint32_t maxChunkSize = docSize * 3;
@@ -512,7 +510,7 @@ TEST_P(MergeHandlerTest, chunked_apply_bucket_diff) {
EXPECT_TRUE(reply->getResult().success());
}
-TEST_P(MergeHandlerTest, chunk_limit_partially_filled_diff) {
+TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) {
setUpChain(FRONT);
uint32_t docSize = 1024;
@@ -548,7 +546,7 @@ TEST_P(MergeHandlerTest, chunk_limit_partially_filled_diff) {
EXPECT_LE(getFilledDataSize(fwdDiffCmd->getDiff()), maxChunkSize);
}
-TEST_P(MergeHandlerTest, max_timestamp) {
+TEST_F(MergeHandlerTest, max_timestamp) {
doPut(1234, spi::Timestamp(_maxTimestamp + 10), 1024, 1024);
MergeHandler handler = createHandler();
@@ -656,7 +654,7 @@ MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset, uint16_t hasMask
return getBucketDiffCmd;
}
-TEST_P(MergeHandlerTest, spi_flush_guard) {
+TEST_F(MergeHandlerTest, spi_flush_guard) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
@@ -671,16 +669,14 @@ TEST_P(MergeHandlerTest, spi_flush_guard) {
try {
auto cmd = createDummyApplyDiff(6000);
handler.handleApplyBucketDiff(*cmd, createTracker(cmd, _bucket));
- if (GetParam()) {
- convert_delayed_error_to_exception(handler);
- }
+ convert_delayed_error_to_exception(handler);
FAIL() << "No exception thrown on failing in-place remove";
} catch (const std::runtime_error& e) {
EXPECT_TRUE(std::string(e.what()).find("Failed remove") != std::string::npos);
}
}
-TEST_P(MergeHandlerTest, bucket_not_found_in_db) {
+TEST_F(MergeHandlerTest, bucket_not_found_in_db) {
MergeHandler handler = createHandler();
// Send merge for unknown bucket
auto cmd = std::make_shared<api::MergeBucketCommand>(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp);
@@ -688,7 +684,7 @@ TEST_P(MergeHandlerTest, bucket_not_found_in_db) {
EXPECT_TRUE(tracker->getResult().isBucketDisappearance());
}
-TEST_P(MergeHandlerTest, merge_progress_safe_guard) {
+TEST_F(MergeHandlerTest, merge_progress_safe_guard) {
MergeHandler handler = createHandler();
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
@@ -711,7 +707,7 @@ TEST_P(MergeHandlerTest, merge_progress_safe_guard) {
EXPECT_EQ(mergeReply->getResult().getResult(), api::ReturnCode::INTERNAL_FAILURE);
}
-TEST_P(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
+TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
MergeHandler handler = createHandler();
_nodes.clear();
_nodes.emplace_back(0, false);
@@ -743,7 +739,7 @@ TEST_P(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
EXPECT_EQ(0x5, applyBucketDiffCmd2->getDiff()[0]._entry._hasMask);
}
-TEST_P(MergeHandlerTest, entry_removed_after_get_bucket_diff) {
+TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) {
MergeHandler handler = createHandler();
std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff;
{
@@ -799,9 +795,7 @@ MergeHandlerTest::doTestSPIException(MergeHandler& handler,
providerWrapper.setFailureMask(failureMask);
try {
invoker.invoke(*this, handler, *_context);
- if (GetParam()) {
- convert_delayed_error_to_exception(handler);
- }
+ convert_delayed_error_to_exception(handler);
if (failureMask != 0) {
return (std::string("No exception was thrown during handler "
"invocation. Expected exception containing '")
@@ -870,7 +864,7 @@ MergeHandlerTest::HandleMergeBucketInvoker::invoke(
handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket));
}
-TEST_P(MergeHandlerTest, merge_bucket_spi_failures) {
+TEST_F(MergeHandlerTest, merge_bucket_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
@@ -901,7 +895,7 @@ MergeHandlerTest::HandleGetBucketDiffInvoker::invoke(
handler.handleGetBucketDiff(*cmd, test.createTracker(cmd, test._bucket));
}
-TEST_P(MergeHandlerTest, get_bucket_diff_spi_failures) {
+TEST_F(MergeHandlerTest, get_bucket_diff_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
@@ -933,7 +927,7 @@ MergeHandlerTest::HandleApplyBucketDiffInvoker::invoke(
handler.handleApplyBucketDiff(*cmd, test.createTracker(cmd, test._bucket));
}
-TEST_P(MergeHandlerTest, apply_bucket_diff_spi_failures) {
+TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
@@ -998,7 +992,7 @@ MergeHandlerTest::HandleGetBucketDiffReplyInvoker::afterInvoke(
api::ReturnCode::INTERNAL_FAILURE);
}
-TEST_P(MergeHandlerTest, get_bucket_diff_reply_spi_failures) {
+TEST_F(MergeHandlerTest, get_bucket_diff_reply_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
@@ -1073,9 +1067,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke(
test.fillDummyApplyDiff(reply->getDiff());
_stub.clear();
handler.handleApplyBucketDiffReply(*reply, _stub, test.createTracker(reply, test._bucket));
- if (test.GetParam()) {
- convert_delayed_error_to_exception(test, handler);
- }
+ convert_delayed_error_to_exception(test, handler);
}
std::string
@@ -1099,7 +1091,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::afterInvoke(
}
}
-TEST_P(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) {
+TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
HandleApplyBucketDiffReplyInvoker invoker;
for (int i = 0; i < 2; ++i) {
@@ -1126,7 +1118,7 @@ TEST_P(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) {
}
}
-TEST_P(MergeHandlerTest, remove_from_diff) {
+TEST_F(MergeHandlerTest, remove_from_diff) {
framework::defaultimplementation::FakeClock clock;
MergeStatus status(clock, 0, 0);
@@ -1192,7 +1184,7 @@ TEST_P(MergeHandlerTest, remove_from_diff) {
}
}
-TEST_P(MergeHandlerTest, remove_put_on_existing_timestamp) {
+TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) {
setUpChain(BACK);
document::TestDocMan docMan;
@@ -1216,15 +1208,10 @@ TEST_P(MergeHandlerTest, remove_put_on_existing_timestamp) {
auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket));
- if (GetParam()) {
- ASSERT_FALSE(tracker);
- handler.drain_async_writes();
- auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(get_queued_reply());
- ASSERT_TRUE(applyBucketDiffReply.get());
- } else {
- auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP());
- ASSERT_TRUE(applyBucketDiffReply.get());
- }
+ ASSERT_FALSE(tracker);
+ handler.drain_async_writes();
+ auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(get_queued_reply());
+ ASSERT_TRUE(applyBucketDiffReply.get());
tracker.reset();
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
@@ -1314,7 +1301,7 @@ std::ostream &operator<<(std::ostream &os, const GetBucketDiffCommand::Entry &en
}
-TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
+TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
{
using NodeList = decltype(_nodes);
// Redundancy is 2 and source only nodes 3 and 4 have doc1 and doc2
@@ -1446,14 +1433,10 @@ TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket));
LOG(debug, "handled fourth ApplyBucketDiffReply");
}
- if (GetParam()) {
- handler.drain_async_writes();
- }
+ handler.drain_async_writes();
ASSERT_EQ(6u, messageKeeper()._msgs.size());
ASSERT_EQ(api::MessageType::MERGEBUCKET_REPLY, messageKeeper()._msgs[5]->getType());
LOG(debug, "got mergebucket reply");
}
-VESPA_GTEST_INSTANTIATE_TEST_SUITE_P(AsyncApplyBucketDiffParams, MergeHandlerTest, testing::Values(false, true));
-
} // storage
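
With the asynchronous apply-bucket-diff path now the only one, the whole suite drops its bool parameterization: TEST_P fixtures with GetParam() branches and the INSTANTIATE macro collapse into plain TEST_F cases. In generic gtest terms (not Vespa code) the conversion looks like this:

    #include <gtest/gtest.h>

    // Before: a value-parameterized fixture instantiated for false/true.
    struct OldFixture : ::testing::TestWithParam<bool> {};
    TEST_P(OldFixture, behaves) {
        bool async = GetParam();          // tests branch on the parameter
        EXPECT_TRUE(async || !async);     // placeholder assertion
    }
    INSTANTIATE_TEST_SUITE_P(AsyncParams, OldFixture, ::testing::Values(false, true));

    // After: a plain fixture; the single remaining behavior needs no parameter.
    struct NewFixture : ::testing::Test {};
    TEST_F(NewFixture, behaves) {
        EXPECT_TRUE(true);                // placeholder assertion
    }
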
diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp
index c163f6de024..c35be2789da 100644
--- a/storage/src/tests/persistence/persistencetestutils.cpp
+++ b/storage/src/tests/persistence/persistencetestutils.cpp
@@ -127,7 +127,7 @@ VESPA_THREAD_STACK_TAG(test_executor)
void
PersistenceTestUtils::setupExecutor(uint32_t numThreads) {
- _sequenceTaskExecutor = vespalib::SequencedTaskExecutor::create(test_executor, numThreads, 1000, vespalib::Executor::OptimizeFor::ADAPTIVE);
+ _sequenceTaskExecutor = vespalib::SequencedTaskExecutor::create(test_executor, numThreads, 1000, true, vespalib::Executor::OptimizeFor::ADAPTIVE);
}
StorBucketDatabase::WrappedEntry
diff --git a/storage/src/tests/persistence/shared_operation_throttler_test.cpp b/storage/src/tests/persistence/shared_operation_throttler_test.cpp
deleted file mode 100644
index 0ad380937c7..00000000000
--- a/storage/src/tests/persistence/shared_operation_throttler_test.cpp
+++ /dev/null
@@ -1,116 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/storage/persistence/shared_operation_throttler.h>
-#include <vespa/vespalib/gtest/gtest.h>
-#include <vespa/vespalib/util/barrier.h>
-#include <chrono>
-#include <thread>
-
-using namespace ::testing;
-
-namespace storage {
-
-using ThrottleToken = SharedOperationThrottler::Token;
-
-TEST(SharedOperationThrottlerTest, unlimited_throttler_does_not_throttle) {
- // We technically can't test that the unlimited throttler _never_ throttles, but at
- // least check that it doesn't throttle _twice_, and then induce from this ;)
- auto throttler = SharedOperationThrottler::make_unlimited_throttler();
- auto token1 = throttler->try_acquire_one();
- EXPECT_TRUE(token1.valid());
- auto token2 = throttler->blocking_acquire_one();
- EXPECT_TRUE(token2.valid());
- // Window size should be zero (i.e. unlimited) for unlimited throttler
- EXPECT_EQ(throttler->current_window_size(), 0);
-}
-
-TEST(SharedOperationThrottlerTest, dynamic_throttler_respects_initial_window_size) {
- auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
- auto token1 = throttler->try_acquire_one();
- EXPECT_TRUE(token1.valid());
- auto token2 = throttler->try_acquire_one();
- EXPECT_FALSE(token2.valid());
-
- EXPECT_EQ(throttler->current_window_size(), 1);
-}
-
-TEST(SharedOperationThrottlerTest, blocking_acquire_returns_immediately_if_slot_available) {
- auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
- auto token = throttler->blocking_acquire_one();
- EXPECT_TRUE(token.valid());
- token.reset();
- token = throttler->blocking_acquire_one(600s); // Should never block.
- EXPECT_TRUE(token.valid());
-}
-
-TEST(SharedOperationThrottlerTest, blocking_call_woken_up_if_throttle_slot_available) {
- auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
- vespalib::Barrier barrier(2);
- std::thread t([&] {
- auto token = throttler->try_acquire_one();
- assert(token.valid());
- barrier.await();
- while (throttler->waiting_threads() != 1) {
- std::this_thread::sleep_for(100us);
- }
- // Implicit token release at thread scope exit
- });
- barrier.await();
- auto token = throttler->blocking_acquire_one();
- EXPECT_TRUE(token.valid());
- t.join();
-}
-
-TEST(SharedOperationThrottlerTest, time_bounded_blocking_acquire_waits_for_timeout) {
- auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
- auto window_filling_token = throttler->try_acquire_one();
- auto before = std::chrono::steady_clock::now();
- // Will block for at least 1ms. Since no window slot will be available by that time,
- // an invalid token should be returned.
- auto token = throttler->blocking_acquire_one(1ms);
- auto after = std::chrono::steady_clock::now();
- EXPECT_TRUE((after - before) >= 1ms);
- EXPECT_FALSE(token.valid());
-}
-
-TEST(SharedOperationThrottlerTest, default_constructed_token_is_invalid) {
- ThrottleToken token;
- EXPECT_FALSE(token.valid());
- token.reset(); // no-op
- EXPECT_FALSE(token.valid());
-}
-
-TEST(SharedOperationThrottlerTest, token_destruction_frees_up_throttle_window_slot) {
- auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
- {
- auto token = throttler->try_acquire_one();
- EXPECT_TRUE(token.valid());
- }
- auto token = throttler->try_acquire_one();
- EXPECT_TRUE(token.valid());
-}
-
-TEST(SharedOperationThrottlerTest, token_can_be_moved_and_reset) {
- auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
- auto token1 = throttler->try_acquire_one();
- auto token2 = std::move(token1); // move ctor
- EXPECT_TRUE(token2.valid());
- EXPECT_FALSE(token1.valid());
- ThrottleToken token3;
- token3 = std::move(token2); // move assignment op
- EXPECT_TRUE(token3.valid());
- EXPECT_FALSE(token2.valid());
-
- // Trying to fetch new token should not succeed due to active token and win size of 1
- token1 = throttler->try_acquire_one();
- EXPECT_FALSE(token1.valid());
- // Resetting the token should free up the slot in the window
- token3.reset();
- token1 = throttler->try_acquire_one();
- EXPECT_TRUE(token1.valid());
-}
-
-// TODO ideally we'd test that the dynamic throttler has a window size that is actually
-// dynamic, but the backing DynamicThrottlePolicy implementation is a black box so
-// it's not trivial to know how to do this reliably.
-
-}
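
The deleted suite exercised the RAII token semantics of the shared operation throttler; the file is simply removed here (and dropped from the persistence CMakeLists above), and this diff does not show where the throttler or its tests now live. The core behavior those tests relied on, as a standalone sketch with illustrative names:

    #include <cstddef>
    #include <mutex>

    // Minimal fixed-window throttler: at most `window` tokens may be live at once,
    // and a Token releases its slot when reset or destroyed, similar to the tokens
    // the deleted tests acquired, moved around and reset.
    class ThrottlerSketch {
        std::mutex _mutex;
        size_t _window;
        size_t _in_flight = 0;
    public:
        explicit ThrottlerSketch(size_t window) : _window(window) {}

        class Token {
            ThrottlerSketch* _owner = nullptr;
        public:
            Token() = default;                       // default-constructed tokens are invalid
            explicit Token(ThrottlerSketch* owner) : _owner(owner) {}
            Token(Token&& rhs) noexcept : _owner(rhs._owner) { rhs._owner = nullptr; }
            Token& operator=(Token&& rhs) noexcept {
                reset();
                _owner = rhs._owner;
                rhs._owner = nullptr;
                return *this;
            }
            ~Token() { reset(); }
            bool valid() const { return _owner != nullptr; }
            void reset() {                           // frees up the window slot
                if (_owner) {
                    std::lock_guard<std::mutex> g(_owner->_mutex);
                    --_owner->_in_flight;
                    _owner = nullptr;
                }
            }
        };

        Token try_acquire_one() {                    // non-blocking acquire
            std::lock_guard<std::mutex> g(_mutex);
            if (_in_flight < _window) {
                ++_in_flight;
                return Token(this);
            }
            return Token();
        }
    };
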
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 0f844ab6b4f..89b769078cc 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -1,18 +1,19 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/vespalib/util/document_runnable.h>
-#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
#include <tests/common/testhelper.h>
#include <tests/common/dummystoragelink.h>
#include <tests/common/teststorageapp.h>
#include <tests/common/dummystoragelink.h>
#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/messagebus/dynamicthrottlepolicy.h>
+#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
#include <vespa/storage/storageserver/mergethrottler.h>
-#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/storage/persistence/messages.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/state.h>
-#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/gtest/gtest.h>
+#include <vespa/vespalib/util/document_runnable.h>
+#include <vespa/vespalib/util/exceptions.h>
#include <unordered_set>
#include <memory>
#include <iterator>
@@ -159,6 +160,8 @@ struct MergeThrottlerTest : Test {
assert(merge);
return merge;
}
+
+ [[nodiscard]] uint32_t throttler_max_merges_pending(uint16_t throttler_index) const noexcept;
};
MergeThrottlerTest::MergeThrottlerTest() = default;
@@ -212,13 +215,10 @@ bool
checkChain(const StorageMessage::SP& msg,
Iterator first, Iterator end)
{
- const MergeBucketCommand& cmd =
- dynamic_cast<const MergeBucketCommand&>(*msg);
-
+ auto& cmd = dynamic_cast<const MergeBucketCommand&>(*msg);
if (cmd.getChain().size() != static_cast<size_t>(std::distance(first, end))) {
return false;
}
-
return std::equal(cmd.getChain().begin(), cmd.getChain().end(), first);
}
@@ -247,11 +247,17 @@ void waitUntilMergeQueueIs(MergeThrottler& throttler, size_t sz, int timeout)
}
+uint32_t
+MergeThrottlerTest::throttler_max_merges_pending(uint16_t throttler_index) const noexcept
+{
+ return static_cast<uint32_t>(_throttlers[throttler_index]->getThrottlePolicy().getMaxWindowSize());
+}
+
// Extremely simple test that just checks that (min|max)_merges_per_node
// under the stor-server config gets propagated to all the nodes
TEST_F(MergeThrottlerTest, merges_config) {
for (int i = 0; i < _storageNodeCount; ++i) {
- EXPECT_EQ(25, _throttlers[i]->getThrottlePolicy().getMaxPendingCount());
+ EXPECT_EQ(25, throttler_max_merges_pending(i));
EXPECT_EQ(20, _throttlers[i]->getMaxQueueSize());
}
}
@@ -636,7 +642,7 @@ TEST_F(MergeThrottlerTest, resend_handling) {
TEST_F(MergeThrottlerTest, priority_queuing) {
// Fill up all active merges
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
ASSERT_GE(maxPending, 4u);
for (size_t i = 0; i < maxPending; ++i) {
@@ -691,7 +697,7 @@ TEST_F(MergeThrottlerTest, priority_queuing) {
// in the queue for a merge that is already known.
TEST_F(MergeThrottlerTest, command_in_queue_duplicate_of_known_merge) {
// Fill up all active merges and 1 queued one
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
ASSERT_LT(maxPending, 100);
for (size_t i = 0; i < maxPending + 1; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{0}, {uint16_t(2 + i)}, {uint16_t(5 + i)}});
@@ -786,7 +792,7 @@ TEST_F(MergeThrottlerTest, invalid_receiver_node) {
// order.
TEST_F(MergeThrottlerTest, forward_queued_merge) {
// Fill up all active merges and then 3 queued ones
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
ASSERT_LT(maxPending, 100);
for (size_t i = 0; i < maxPending + 3; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{0}, {uint16_t(2 + i)}, {uint16_t(5 + i)}});
@@ -846,7 +852,7 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) {
DummyStorageLink& bottomLink(*_bottomLinks[1]);
// Fill up all active merges and then 3 queued ones
- size_t maxPending = throttler.getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(1);
ASSERT_LT(maxPending, 100);
for (size_t i = 0; i < maxPending + 3; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{1}, {uint16_t(5 + i)}, {uint16_t(7 + i)}});
@@ -914,7 +920,7 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) {
TEST_F(MergeThrottlerTest, flush) {
// Fill up all active merges and then 3 queued ones
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
ASSERT_LT(maxPending, 100);
for (size_t i = 0; i < maxPending + 3; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
@@ -972,14 +978,11 @@ TEST_F(MergeThrottlerTest, unseen_merge_with_node_in_chain) {
// Second, test that we get rejected before queueing up. This is to
// avoid a hypothetical deadlock scenario.
// Fill up all active merges
- {
-
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
- for (size_t i = 0; i < maxPending; ++i) {
- auto fillCmd = std::make_shared<MergeBucketCommand>(
- makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234);
- _topLinks[0]->sendDown(fillCmd);
- }
+ size_t maxPending = throttler_max_merges_pending(0);
+ for (size_t i = 0; i < maxPending; ++i) {
+ auto fillCmd = std::make_shared<MergeBucketCommand>(
+ makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234);
+ _topLinks[0]->sendDown(fillCmd);
}
_topLinks[0]->sendDown(cmd);
@@ -993,7 +996,7 @@ TEST_F(MergeThrottlerTest, unseen_merge_with_node_in_chain) {
TEST_F(MergeThrottlerTest, merge_with_newer_cluster_state_flushes_outdated_queued){
// Fill up all active merges and then 3 queued ones with the same
// system state
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
ASSERT_LT(maxPending, 100);
std::vector<api::StorageMessage::Id> ids;
for (size_t i = 0; i < maxPending + 3; ++i) {
@@ -1035,7 +1038,7 @@ TEST_F(MergeThrottlerTest, merge_with_newer_cluster_state_flushes_outdated_queue
TEST_F(MergeThrottlerTest, updated_cluster_state_flushes_outdated_queued) {
// State is version 1. Send down several merges with state version 2.
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
ASSERT_LT(maxPending, 100);
std::vector<api::StorageMessage::Id> ids;
for (size_t i = 0; i < maxPending + 3; ++i) {
@@ -1074,7 +1077,7 @@ TEST_F(MergeThrottlerTest, updated_cluster_state_flushes_outdated_queued) {
// TODO remove functionality and test
TEST_F(MergeThrottlerTest, legacy_42_merges_do_not_trigger_flush) {
// Fill up all active merges and then 1 queued one
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
ASSERT_LT(maxPending, 100);
for (size_t i = 0; i < maxPending + 1; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
@@ -1156,7 +1159,7 @@ TEST_F(MergeThrottlerTest, unknown_merge_with_self_in_chain) {
}
TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_distributors) {
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
size_t maxQueue = _throttlers[0]->getMaxQueueSize();
ASSERT_EQ(20, maxQueue);
ASSERT_LT(maxPending, 100);
@@ -1205,7 +1208,7 @@ MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_lim
{
// Note: uses node with index 1 to not be the first node in chain
_throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits);
- size_t max_pending = _throttlers[1]->getThrottlePolicy().getMaxPendingCount();
+ size_t max_pending = throttler_max_merges_pending(1);
size_t max_enqueued = _throttlers[1]->getMaxQueueSize();
for (size_t i = 0; i < max_pending + max_enqueued; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{1}, {2}, {3}});
@@ -1452,7 +1455,7 @@ TEST_F(MergeThrottlerTest, source_only_merges_are_not_affected_by_backpressure)
}
void MergeThrottlerTest::fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count) {
- size_t max_pending = _throttlers[throttler_index]->getThrottlePolicy().getMaxPendingCount();
+ size_t max_pending = throttler_max_merges_pending(throttler_index);
for (size_t i = 0; i < max_pending + queued_count; ++i) {
_topLinks[throttler_index]->sendDown(MergeBuilder(document::BucketId(16, i))
.nodes(throttler_index, throttler_index + 1)