diff options
author | Geir Storli <geirst@verizonmedia.com> | 2021-01-19 13:56:41 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-19 13:56:41 +0100 |
commit | 290114fcb01d2806dd533d288ba9ccc43925d7dc (patch) | |
tree | 560dcaf46fde8b94025aa36633fff2696718a806 /storage/src | |
parent | 4771c4573396c8faa4541fa4bea6f5861dd7bb65 (diff) | |
parent | a2a032e2cd01124298977abc7dc6796111b4e049 (diff) |
Merge pull request #16095 from vespa-engine/geirst/reject-feed-in-distributor
Reject feed in distributor when feed in cluster is blocked
Diffstat (limited to 'storage/src')
7 files changed, 111 insertions, 20 deletions
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp index 2d2038b7cd0..b472ac1284e 100644 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ b/storage/src/tests/distributor/distributortestutil.cpp @@ -43,14 +43,24 @@ DistributorTestUtil::setupDistributor(int redundancy, uint32_t earlyReturn, bool requirePrimaryToBeWritten) { + setup_distributor(redundancy, nodeCount, lib::ClusterStateBundle(lib::ClusterState(systemState)), earlyReturn, requirePrimaryToBeWritten); +} + +void +DistributorTestUtil::setup_distributor(int redundancy, + int node_count, + const lib::ClusterStateBundle& state, + uint32_t early_return, + bool require_primary_to_be_written) +{ lib::Distribution::DistributionConfigBuilder config( - lib::Distribution::getDefaultDistributionConfig(redundancy, nodeCount).get()); + lib::Distribution::getDefaultDistributionConfig(redundancy, node_count).get()); config.redundancy = redundancy; - config.initialRedundancy = earlyReturn; - config.ensurePrimaryPersisted = requirePrimaryToBeWritten; + config.initialRedundancy = early_return; + config.ensurePrimaryPersisted = require_primary_to_be_written; auto distribution = std::make_shared<lib::Distribution>(config); _node->getComponentRegister().setDistribution(distribution); - enableDistributorClusterState(systemState); + enable_distributor_cluster_state(state); // This is for all intents and purposes a hack to avoid having the // distributor treat setting the distribution explicitly as a signal that // it should send RequestBucketInfo to all configured nodes. @@ -427,4 +437,10 @@ DistributorTestUtil::enableDistributorClusterState(vespalib::stringref state) lib::ClusterStateBundle(lib::ClusterState(state))); } +void +DistributorTestUtil::enable_distributor_cluster_state(const lib::ClusterStateBundle& state) +{ + getBucketDBUpdater().simulate_cluster_state_bundle_activation(state); +} + } diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h index 1bdb6e33512..ee0e1a9eb65 100644 --- a/storage/src/tests/distributor/distributortestutil.h +++ b/storage/src/tests/distributor/distributortestutil.h @@ -161,6 +161,12 @@ public: uint32_t earlyReturn = false, bool requirePrimaryToBeWritten = true); + void setup_distributor(int redundancy, + int node_count, + const lib::ClusterStateBundle& state, + uint32_t early_return = false, + bool require_primary_to_be_written = true); + void setRedundancy(uint32_t redundancy); void notifyDoneInitializing() override {} @@ -206,6 +212,7 @@ protected: MessageSenderImpl _messageSender; void enableDistributorClusterState(vespalib::stringref state); + void enable_distributor_cluster_state(const lib::ClusterStateBundle& state); }; } diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp index e566f16dcdd..a95418b0b74 100644 --- a/storage/src/tests/distributor/externaloperationhandlertest.cpp +++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp @@ -1,19 +1,20 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <tests/distributor/distributortestutil.h> -#include <vespa/storage/distributor/externaloperationhandler.h> +#include <vespa/document/fieldset/fieldsets.h> +#include <vespa/document/repo/documenttyperepo.h> +#include <vespa/document/test/make_document_bucket.h> +#include <vespa/document/update/assignvalueupdate.h> +#include <vespa/document/update/documentupdate.h> +#include <vespa/storage/common/reindexing_constants.h> #include <vespa/storage/distributor/distributor.h> #include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/storage/distributor/distributormetricsset.h> +#include <vespa/storage/distributor/externaloperationhandler.h> #include <vespa/storage/distributor/operations/external/getoperation.h> #include <vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h> -#include <vespa/storage/common/reindexing_constants.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/visitor.h> -#include <vespa/document/repo/documenttyperepo.h> -#include <vespa/document/update/documentupdate.h> -#include <vespa/document/fieldset/fieldsets.h> -#include <vespa/document/test/make_document_bucket.h> #include <vespa/vespalib/gtest/gtest.h> using document::test::makeDocumentBucket; @@ -66,6 +67,8 @@ struct ExternalOperationHandlerTest : Test, DistributorTestUtil { void set_up_distributor_for_sequencing_test(); + void set_up_distributor_with_feed_blocked_state(); + const vespalib::string _dummy_id{"id:foo:testdoctype1::bar"}; // Returns an arbitrary bucket not owned in the pending state @@ -355,6 +358,13 @@ void ExternalOperationHandlerTest::set_up_distributor_for_sequencing_test() { setupDistributor(1, 2, "version:1 distributor:1 storage:1"); } +void ExternalOperationHandlerTest::set_up_distributor_with_feed_blocked_state() { + createLinks(); + setup_distributor(1, 2, + lib::ClusterStateBundle(lib::ClusterState("version:1 distributor:1 storage:1"), + {}, {true, "full disk"}, false)); +} + void ExternalOperationHandlerTest::start_operation_verify_not_rejected( std::shared_ptr<api::StorageCommand> cmd, Operation::SP& out_generated) @@ -564,6 +574,38 @@ TEST_F(ExternalOperationHandlerTest, gets_are_sent_with_weak_consistency_if_conf do_test_get_weak_consistency_is_propagated(true); } +TEST_F(ExternalOperationHandlerTest, puts_are_rejected_if_feed_is_blocked) { + set_up_distributor_with_feed_blocked_state(); + + ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected( + makePutCommand("testdoctype1", "id:foo:testdoctype1::foo"))); + EXPECT_EQ("ReturnCode(NO_SPACE, External feed is blocked due to resource exhaustion: full disk)", + _sender.reply(0)->getResult().toString()); +} + +TEST_F(ExternalOperationHandlerTest, non_trivial_updates_are_rejected_if_feed_is_blocked) { + set_up_distributor_with_feed_blocked_state(); + + auto cmd = makeUpdateCommand("testdoctype1", "id:foo:testdoctype1::foo"); + const auto* doc_type = _testDocMan.getTypeRepo().getDocumentType("testdoctype1"); + document::FieldUpdate upd(doc_type->getField("title")); + upd.addUpdate(document::AssignValueUpdate(document::StringFieldValue("new value"))); + cmd->getUpdate()->addUpdate(upd); + + ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(std::move(cmd))); + EXPECT_EQ("ReturnCode(NO_SPACE, External feed is blocked due to resource exhaustion: full disk)", + _sender.reply(0)->getResult().toString()); +} + +TEST_F(ExternalOperationHandlerTest, trivial_updates_are_not_rejected_if_feed_is_blocked) { + set_up_distributor_with_feed_blocked_state(); + + Operation::SP generated; + ASSERT_NO_FATAL_FAILURE(start_operation_verify_not_rejected( + makeUpdateCommand("testdoctype1", "id:foo:testdoctype1::foo"), generated)); +} + + struct OperationHandlerSequencingTest : ExternalOperationHandlerTest { void SetUp() override { set_up_distributor_for_sequencing_test(); diff --git a/storage/src/vespa/storage/distributor/distributor_operation_context.h b/storage/src/vespa/storage/distributor/distributor_operation_context.h index 083ffcdacf4..97a522a694a 100644 --- a/storage/src/vespa/storage/distributor/distributor_operation_context.h +++ b/storage/src/vespa/storage/distributor/distributor_operation_context.h @@ -12,6 +12,7 @@ namespace document { class Bucket; } namespace storage { class DistributorConfiguration; } +namespace storage::lib { class ClusterStateBundle; } namespace storage::distributor { @@ -48,6 +49,7 @@ public: const document::Bucket& bucket, uint32_t message_type) const = 0; virtual const lib::ClusterState* pending_cluster_state_or_null(const document::BucketSpace& bucket_space) const = 0; + virtual const lib::ClusterStateBundle& cluster_state_bundle() const = 0; // TODO: Move to being a free function instead. virtual const char* storage_node_up_states() const = 0; diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.h b/storage/src/vespa/storage/distributor/distributorcomponent.h index 13c0a72d44e..fc5f1663cfe 100644 --- a/storage/src/vespa/storage/distributor/distributorcomponent.h +++ b/storage/src/vespa/storage/distributor/distributorcomponent.h @@ -201,6 +201,9 @@ public: const lib::ClusterState* pending_cluster_state_or_null(const document::BucketSpace& bucket_space) const override { return getDistributor().pendingClusterStateOrNull(bucket_space); } + const lib::ClusterStateBundle& cluster_state_bundle() const override { + return getClusterStateBundle(); + } const char* storage_node_up_states() const override { return getDistributor().getStorageNodeUpStates(); } diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp index 4d960d69322..ebca3574eac 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp @@ -2,26 +2,27 @@ #include "bucket_space_distribution_context.h" #include "crypto_uuid_generator.h" -#include "externaloperationhandler.h" #include "distributor.h" +#include "distributor_bucket_space.h" +#include "distributor_bucket_space_repo.h" +#include "externaloperationhandler.h" #include "operation_sequencer.h" #include <vespa/document/base/documentid.h> -#include <vespa/storage/distributor/operations/external/putoperation.h> -#include <vespa/storage/distributor/operations/external/twophaseupdateoperation.h> -#include <vespa/storage/distributor/operations/external/updateoperation.h> -#include <vespa/storage/distributor/operations/external/removeoperation.h> +#include <vespa/document/util/feed_reject_helper.h> +#include <vespa/storage/common/reindexing_constants.h> #include <vespa/storage/distributor/operations/external/getoperation.h> -#include <vespa/storage/distributor/operations/external/statbucketoperation.h> -#include <vespa/storage/distributor/operations/external/statbucketlistoperation.h> +#include <vespa/storage/distributor/operations/external/putoperation.h> #include <vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h> #include <vespa/storage/distributor/operations/external/removelocationoperation.h> +#include <vespa/storage/distributor/operations/external/removeoperation.h> +#include <vespa/storage/distributor/operations/external/statbucketlistoperation.h> +#include <vespa/storage/distributor/operations/external/statbucketoperation.h> +#include <vespa/storage/distributor/operations/external/twophaseupdateoperation.h> +#include <vespa/storage/distributor/operations/external/updateoperation.h> #include <vespa/storage/distributor/operations/external/visitoroperation.h> -#include <vespa/storage/common/reindexing_constants.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/removelocation.h> #include <vespa/storageapi/message/stat.h> -#include "distributor_bucket_space_repo.h" -#include "distributor_bucket_space.h" #include <vespa/log/log.h> #include <vespa/document/bucket/fixed_bucket_spaces.h> @@ -136,6 +137,13 @@ void ExternalOperationHandler::bounce_with_result(api::StorageCommand& cmd, cons _msg_sender.sendUp(std::shared_ptr<api::StorageMessage>(reply.release())); } +void ExternalOperationHandler::bounce_with_feed_blocked(api::StorageCommand& cmd) { + const auto& feed_block = _op_ctx.cluster_state_bundle().feed_block(); + bounce_with_result(cmd, api::ReturnCode(api::ReturnCode::NO_SPACE, + "External feed is blocked due to resource exhaustion: " + + feed_block->description())); +} + void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageCommand& cmd, const lib::ClusterState& cluster_state) { @@ -284,6 +292,11 @@ std::string extract_reindexing_token(const api::PutCommand& cmd) { } bool ExternalOperationHandler::onPut(const std::shared_ptr<api::PutCommand>& cmd) { + if (_op_ctx.cluster_state_bundle().block_feed_in_cluster()) { + bounce_with_feed_blocked(*cmd); + return true; + } + auto& metrics = getMetrics().puts; if (!checkTimestampMutationPreconditions(*cmd, _op_ctx.make_split_bit_constrained_bucket_id(cmd->getDocumentId()), metrics)) { return true; @@ -327,6 +340,13 @@ bool ExternalOperationHandler::onPut(const std::shared_ptr<api::PutCommand>& cmd bool ExternalOperationHandler::onUpdate(const std::shared_ptr<api::UpdateCommand>& cmd) { + if (_op_ctx.cluster_state_bundle().block_feed_in_cluster() && + document::FeedRejectHelper::mustReject(*cmd->getUpdate())) + { + bounce_with_feed_blocked(*cmd); + return true; + } + auto& metrics = getMetrics().updates; if (!checkTimestampMutationPreconditions(*cmd, _op_ctx.make_split_bit_constrained_bucket_id(cmd->getDocumentId()), metrics)) { return true; diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h index 9127325702a..1d42f4b3ca8 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.h +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h @@ -118,6 +118,7 @@ private: const lib::ClusterState& current_state, const lib::ClusterState& pending_state); void bounce_with_result(api::StorageCommand& cmd, const api::ReturnCode& result); + void bounce_with_feed_blocked(api::StorageCommand& cmd); std::shared_ptr<Operation> try_generate_get_operation(const std::shared_ptr<api::GetCommand>&); bool checkSafeTimeReached(api::StorageCommand& cmd); |