diff options
author | Geir Storli <geirst@verizonmedia.com> | 2021-01-19 11:33:43 +0000 |
---|---|---|
committer | Geir Storli <geirst@verizonmedia.com> | 2021-01-19 11:33:43 +0000 |
commit | 4b4938c47dd4c1cbb37ac22e7436f0ec2d2754fe (patch) | |
tree | c3103e3528f44d8fa8194414242ac224f46efa20 /storage/src | |
parent | 6e8f5059422bbdb36418f165c4933040ee0bfc72 (diff) |
Reject puts if feed is blocked in the cluster.
Diffstat (limited to 'storage/src')
6 files changed, 59 insertions, 4 deletions
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp index 2d2038b7cd0..b472ac1284e 100644 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ b/storage/src/tests/distributor/distributortestutil.cpp @@ -43,14 +43,24 @@ DistributorTestUtil::setupDistributor(int redundancy, uint32_t earlyReturn, bool requirePrimaryToBeWritten) { + setup_distributor(redundancy, nodeCount, lib::ClusterStateBundle(lib::ClusterState(systemState)), earlyReturn, requirePrimaryToBeWritten); +} + +void +DistributorTestUtil::setup_distributor(int redundancy, + int node_count, + const lib::ClusterStateBundle& state, + uint32_t early_return, + bool require_primary_to_be_written) +{ lib::Distribution::DistributionConfigBuilder config( - lib::Distribution::getDefaultDistributionConfig(redundancy, nodeCount).get()); + lib::Distribution::getDefaultDistributionConfig(redundancy, node_count).get()); config.redundancy = redundancy; - config.initialRedundancy = earlyReturn; - config.ensurePrimaryPersisted = requirePrimaryToBeWritten; + config.initialRedundancy = early_return; + config.ensurePrimaryPersisted = require_primary_to_be_written; auto distribution = std::make_shared<lib::Distribution>(config); _node->getComponentRegister().setDistribution(distribution); - enableDistributorClusterState(systemState); + enable_distributor_cluster_state(state); // This is for all intents and purposes a hack to avoid having the // distributor treat setting the distribution explicitly as a signal that // it should send RequestBucketInfo to all configured nodes. @@ -427,4 +437,10 @@ DistributorTestUtil::enableDistributorClusterState(vespalib::stringref state) lib::ClusterStateBundle(lib::ClusterState(state))); } +void +DistributorTestUtil::enable_distributor_cluster_state(const lib::ClusterStateBundle& state) +{ + getBucketDBUpdater().simulate_cluster_state_bundle_activation(state); +} + } diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h index 1bdb6e33512..ee0e1a9eb65 100644 --- a/storage/src/tests/distributor/distributortestutil.h +++ b/storage/src/tests/distributor/distributortestutil.h @@ -161,6 +161,12 @@ public: uint32_t earlyReturn = false, bool requirePrimaryToBeWritten = true); + void setup_distributor(int redundancy, + int node_count, + const lib::ClusterStateBundle& state, + uint32_t early_return = false, + bool require_primary_to_be_written = true); + void setRedundancy(uint32_t redundancy); void notifyDoneInitializing() override {} @@ -206,6 +212,7 @@ protected: MessageSenderImpl _messageSender; void enableDistributorClusterState(vespalib::stringref state); + void enable_distributor_cluster_state(const lib::ClusterStateBundle& state); }; } diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp index e566f16dcdd..eae7ad7fcde 100644 --- a/storage/src/tests/distributor/externaloperationhandlertest.cpp +++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp @@ -66,6 +66,8 @@ struct ExternalOperationHandlerTest : Test, DistributorTestUtil { void set_up_distributor_for_sequencing_test(); + void set_up_distributor_with_feed_blocked_state(); + const vespalib::string _dummy_id{"id:foo:testdoctype1::bar"}; // Returns an arbitrary bucket not owned in the pending state @@ -355,6 +357,13 @@ void ExternalOperationHandlerTest::set_up_distributor_for_sequencing_test() { setupDistributor(1, 2, "version:1 distributor:1 storage:1"); } +void ExternalOperationHandlerTest::set_up_distributor_with_feed_blocked_state() { + createLinks(); + setup_distributor(1, 2, + lib::ClusterStateBundle(lib::ClusterState("version:1 distributor:1 storage:1"), + {}, {true, "full disk"}, false)); +} + void ExternalOperationHandlerTest::start_operation_verify_not_rejected( std::shared_ptr<api::StorageCommand> cmd, Operation::SP& out_generated) @@ -564,6 +573,16 @@ TEST_F(ExternalOperationHandlerTest, gets_are_sent_with_weak_consistency_if_conf do_test_get_weak_consistency_is_propagated(true); } +TEST_F(ExternalOperationHandlerTest, puts_are_rejected_if_feed_is_blocked) { + set_up_distributor_with_feed_blocked_state(); + + ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected( + makePutCommand("testdoctype1", "id:foo:testdoctype1::foo"))); + EXPECT_EQ("ReturnCode(NO_SPACE, External feed is blocked due to resource exhaustion: full disk)", + _sender.reply(0)->getResult().toString()); +} + + struct OperationHandlerSequencingTest : ExternalOperationHandlerTest { void SetUp() override { set_up_distributor_for_sequencing_test(); diff --git a/storage/src/vespa/storage/distributor/distributor_operation_context.h b/storage/src/vespa/storage/distributor/distributor_operation_context.h index 083ffcdacf4..97a522a694a 100644 --- a/storage/src/vespa/storage/distributor/distributor_operation_context.h +++ b/storage/src/vespa/storage/distributor/distributor_operation_context.h @@ -12,6 +12,7 @@ namespace document { class Bucket; } namespace storage { class DistributorConfiguration; } +namespace storage::lib { class ClusterStateBundle; } namespace storage::distributor { @@ -48,6 +49,7 @@ public: const document::Bucket& bucket, uint32_t message_type) const = 0; virtual const lib::ClusterState* pending_cluster_state_or_null(const document::BucketSpace& bucket_space) const = 0; + virtual const lib::ClusterStateBundle& cluster_state_bundle() const = 0; // TODO: Move to being a free function instead. virtual const char* storage_node_up_states() const = 0; diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.h b/storage/src/vespa/storage/distributor/distributorcomponent.h index 13c0a72d44e..fc5f1663cfe 100644 --- a/storage/src/vespa/storage/distributor/distributorcomponent.h +++ b/storage/src/vespa/storage/distributor/distributorcomponent.h @@ -201,6 +201,9 @@ public: const lib::ClusterState* pending_cluster_state_or_null(const document::BucketSpace& bucket_space) const override { return getDistributor().pendingClusterStateOrNull(bucket_space); } + const lib::ClusterStateBundle& cluster_state_bundle() const override { + return getClusterStateBundle(); + } const char* storage_node_up_states() const override { return getDistributor().getStorageNodeUpStates(); } diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp index 4d960d69322..1e76e701a7e 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp @@ -284,6 +284,14 @@ std::string extract_reindexing_token(const api::PutCommand& cmd) { } bool ExternalOperationHandler::onPut(const std::shared_ptr<api::PutCommand>& cmd) { + if (_op_ctx.cluster_state_bundle().block_feed_in_cluster()) { + const auto& feed_block = _op_ctx.cluster_state_bundle().feed_block(); + bounce_with_result(*cmd, api::ReturnCode(api::ReturnCode::NO_SPACE, + "External feed is blocked due to resource exhaustion: " + + feed_block->description())); + return true; + } + auto& metrics = getMetrics().puts; if (!checkTimestampMutationPreconditions(*cmd, _op_ctx.make_split_bit_constrained_bucket_id(cmd->getDocumentId()), metrics)) { return true; |