summaryrefslogtreecommitdiffstats
path: root/storage/src
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2021-01-19 13:56:41 +0100
committerGitHub <noreply@github.com>2021-01-19 13:56:41 +0100
commit290114fcb01d2806dd533d288ba9ccc43925d7dc (patch)
tree560dcaf46fde8b94025aa36633fff2696718a806 /storage/src
parent4771c4573396c8faa4541fa4bea6f5861dd7bb65 (diff)
parenta2a032e2cd01124298977abc7dc6796111b4e049 (diff)
Merge pull request #16095 from vespa-engine/geirst/reject-feed-in-distributor
Reject feed in distributor when feed in cluster is blocked
Diffstat (limited to 'storage/src')
-rw-r--r--storage/src/tests/distributor/distributortestutil.cpp24
-rw-r--r--storage/src/tests/distributor/distributortestutil.h7
-rw-r--r--storage/src/tests/distributor/externaloperationhandlertest.cpp54
-rw-r--r--storage/src/vespa/storage/distributor/distributor_operation_context.h2
-rw-r--r--storage/src/vespa/storage/distributor/distributorcomponent.h3
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.cpp40
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.h1
7 files changed, 111 insertions, 20 deletions
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index 2d2038b7cd0..b472ac1284e 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -43,14 +43,24 @@ DistributorTestUtil::setupDistributor(int redundancy,
uint32_t earlyReturn,
bool requirePrimaryToBeWritten)
{
+ setup_distributor(redundancy, nodeCount, lib::ClusterStateBundle(lib::ClusterState(systemState)), earlyReturn, requirePrimaryToBeWritten);
+}
+
+void
+DistributorTestUtil::setup_distributor(int redundancy,
+ int node_count,
+ const lib::ClusterStateBundle& state,
+ uint32_t early_return,
+ bool require_primary_to_be_written)
+{
lib::Distribution::DistributionConfigBuilder config(
- lib::Distribution::getDefaultDistributionConfig(redundancy, nodeCount).get());
+ lib::Distribution::getDefaultDistributionConfig(redundancy, node_count).get());
config.redundancy = redundancy;
- config.initialRedundancy = earlyReturn;
- config.ensurePrimaryPersisted = requirePrimaryToBeWritten;
+ config.initialRedundancy = early_return;
+ config.ensurePrimaryPersisted = require_primary_to_be_written;
auto distribution = std::make_shared<lib::Distribution>(config);
_node->getComponentRegister().setDistribution(distribution);
- enableDistributorClusterState(systemState);
+ enable_distributor_cluster_state(state);
// This is for all intents and purposes a hack to avoid having the
// distributor treat setting the distribution explicitly as a signal that
// it should send RequestBucketInfo to all configured nodes.
@@ -427,4 +437,10 @@ DistributorTestUtil::enableDistributorClusterState(vespalib::stringref state)
lib::ClusterStateBundle(lib::ClusterState(state)));
}
+void
+DistributorTestUtil::enable_distributor_cluster_state(const lib::ClusterStateBundle& state)
+{
+ getBucketDBUpdater().simulate_cluster_state_bundle_activation(state);
+}
+
}
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index 1bdb6e33512..ee0e1a9eb65 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -161,6 +161,12 @@ public:
uint32_t earlyReturn = false,
bool requirePrimaryToBeWritten = true);
+ void setup_distributor(int redundancy,
+ int node_count,
+ const lib::ClusterStateBundle& state,
+ uint32_t early_return = false,
+ bool require_primary_to_be_written = true);
+
void setRedundancy(uint32_t redundancy);
void notifyDoneInitializing() override {}
@@ -206,6 +212,7 @@ protected:
MessageSenderImpl _messageSender;
void enableDistributorClusterState(vespalib::stringref state);
+ void enable_distributor_cluster_state(const lib::ClusterStateBundle& state);
};
}
diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp
index e566f16dcdd..a95418b0b74 100644
--- a/storage/src/tests/distributor/externaloperationhandlertest.cpp
+++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp
@@ -1,19 +1,20 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <tests/distributor/distributortestutil.h>
-#include <vespa/storage/distributor/externaloperationhandler.h>
+#include <vespa/document/fieldset/fieldsets.h>
+#include <vespa/document/repo/documenttyperepo.h>
+#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/document/update/assignvalueupdate.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/storage/common/reindexing_constants.h>
#include <vespa/storage/distributor/distributor.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/storage/distributor/distributormetricsset.h>
+#include <vespa/storage/distributor/externaloperationhandler.h>
#include <vespa/storage/distributor/operations/external/getoperation.h>
#include <vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h>
-#include <vespa/storage/common/reindexing_constants.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/visitor.h>
-#include <vespa/document/repo/documenttyperepo.h>
-#include <vespa/document/update/documentupdate.h>
-#include <vespa/document/fieldset/fieldsets.h>
-#include <vespa/document/test/make_document_bucket.h>
#include <vespa/vespalib/gtest/gtest.h>
using document::test::makeDocumentBucket;
@@ -66,6 +67,8 @@ struct ExternalOperationHandlerTest : Test, DistributorTestUtil {
void set_up_distributor_for_sequencing_test();
+ void set_up_distributor_with_feed_blocked_state();
+
const vespalib::string _dummy_id{"id:foo:testdoctype1::bar"};
// Returns an arbitrary bucket not owned in the pending state
@@ -355,6 +358,13 @@ void ExternalOperationHandlerTest::set_up_distributor_for_sequencing_test() {
setupDistributor(1, 2, "version:1 distributor:1 storage:1");
}
+void ExternalOperationHandlerTest::set_up_distributor_with_feed_blocked_state() {
+ createLinks();
+ setup_distributor(1, 2,
+ lib::ClusterStateBundle(lib::ClusterState("version:1 distributor:1 storage:1"),
+ {}, {true, "full disk"}, false));
+}
+
void ExternalOperationHandlerTest::start_operation_verify_not_rejected(
std::shared_ptr<api::StorageCommand> cmd,
Operation::SP& out_generated)
@@ -564,6 +574,38 @@ TEST_F(ExternalOperationHandlerTest, gets_are_sent_with_weak_consistency_if_conf
do_test_get_weak_consistency_is_propagated(true);
}
+TEST_F(ExternalOperationHandlerTest, puts_are_rejected_if_feed_is_blocked) {
+ set_up_distributor_with_feed_blocked_state();
+
+ ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(
+ makePutCommand("testdoctype1", "id:foo:testdoctype1::foo")));
+ EXPECT_EQ("ReturnCode(NO_SPACE, External feed is blocked due to resource exhaustion: full disk)",
+ _sender.reply(0)->getResult().toString());
+}
+
+TEST_F(ExternalOperationHandlerTest, non_trivial_updates_are_rejected_if_feed_is_blocked) {
+ set_up_distributor_with_feed_blocked_state();
+
+ auto cmd = makeUpdateCommand("testdoctype1", "id:foo:testdoctype1::foo");
+ const auto* doc_type = _testDocMan.getTypeRepo().getDocumentType("testdoctype1");
+ document::FieldUpdate upd(doc_type->getField("title"));
+ upd.addUpdate(document::AssignValueUpdate(document::StringFieldValue("new value")));
+ cmd->getUpdate()->addUpdate(upd);
+
+ ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(std::move(cmd)));
+ EXPECT_EQ("ReturnCode(NO_SPACE, External feed is blocked due to resource exhaustion: full disk)",
+ _sender.reply(0)->getResult().toString());
+}
+
+TEST_F(ExternalOperationHandlerTest, trivial_updates_are_not_rejected_if_feed_is_blocked) {
+ set_up_distributor_with_feed_blocked_state();
+
+ Operation::SP generated;
+ ASSERT_NO_FATAL_FAILURE(start_operation_verify_not_rejected(
+ makeUpdateCommand("testdoctype1", "id:foo:testdoctype1::foo"), generated));
+}
+
+
struct OperationHandlerSequencingTest : ExternalOperationHandlerTest {
void SetUp() override {
set_up_distributor_for_sequencing_test();
diff --git a/storage/src/vespa/storage/distributor/distributor_operation_context.h b/storage/src/vespa/storage/distributor/distributor_operation_context.h
index 083ffcdacf4..97a522a694a 100644
--- a/storage/src/vespa/storage/distributor/distributor_operation_context.h
+++ b/storage/src/vespa/storage/distributor/distributor_operation_context.h
@@ -12,6 +12,7 @@
namespace document { class Bucket; }
namespace storage { class DistributorConfiguration; }
+namespace storage::lib { class ClusterStateBundle; }
namespace storage::distributor {
@@ -48,6 +49,7 @@ public:
const document::Bucket& bucket,
uint32_t message_type) const = 0;
virtual const lib::ClusterState* pending_cluster_state_or_null(const document::BucketSpace& bucket_space) const = 0;
+ virtual const lib::ClusterStateBundle& cluster_state_bundle() const = 0;
// TODO: Move to being a free function instead.
virtual const char* storage_node_up_states() const = 0;
diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.h b/storage/src/vespa/storage/distributor/distributorcomponent.h
index 13c0a72d44e..fc5f1663cfe 100644
--- a/storage/src/vespa/storage/distributor/distributorcomponent.h
+++ b/storage/src/vespa/storage/distributor/distributorcomponent.h
@@ -201,6 +201,9 @@ public:
const lib::ClusterState* pending_cluster_state_or_null(const document::BucketSpace& bucket_space) const override {
return getDistributor().pendingClusterStateOrNull(bucket_space);
}
+ const lib::ClusterStateBundle& cluster_state_bundle() const override {
+ return getClusterStateBundle();
+ }
const char* storage_node_up_states() const override {
return getDistributor().getStorageNodeUpStates();
}
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index 4d960d69322..ebca3574eac 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -2,26 +2,27 @@
#include "bucket_space_distribution_context.h"
#include "crypto_uuid_generator.h"
-#include "externaloperationhandler.h"
#include "distributor.h"
+#include "distributor_bucket_space.h"
+#include "distributor_bucket_space_repo.h"
+#include "externaloperationhandler.h"
#include "operation_sequencer.h"
#include <vespa/document/base/documentid.h>
-#include <vespa/storage/distributor/operations/external/putoperation.h>
-#include <vespa/storage/distributor/operations/external/twophaseupdateoperation.h>
-#include <vespa/storage/distributor/operations/external/updateoperation.h>
-#include <vespa/storage/distributor/operations/external/removeoperation.h>
+#include <vespa/document/util/feed_reject_helper.h>
+#include <vespa/storage/common/reindexing_constants.h>
#include <vespa/storage/distributor/operations/external/getoperation.h>
-#include <vespa/storage/distributor/operations/external/statbucketoperation.h>
-#include <vespa/storage/distributor/operations/external/statbucketlistoperation.h>
+#include <vespa/storage/distributor/operations/external/putoperation.h>
#include <vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h>
#include <vespa/storage/distributor/operations/external/removelocationoperation.h>
+#include <vespa/storage/distributor/operations/external/removeoperation.h>
+#include <vespa/storage/distributor/operations/external/statbucketlistoperation.h>
+#include <vespa/storage/distributor/operations/external/statbucketoperation.h>
+#include <vespa/storage/distributor/operations/external/twophaseupdateoperation.h>
+#include <vespa/storage/distributor/operations/external/updateoperation.h>
#include <vespa/storage/distributor/operations/external/visitoroperation.h>
-#include <vespa/storage/common/reindexing_constants.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/removelocation.h>
#include <vespa/storageapi/message/stat.h>
-#include "distributor_bucket_space_repo.h"
-#include "distributor_bucket_space.h"
#include <vespa/log/log.h>
#include <vespa/document/bucket/fixed_bucket_spaces.h>
@@ -136,6 +137,13 @@ void ExternalOperationHandler::bounce_with_result(api::StorageCommand& cmd, cons
_msg_sender.sendUp(std::shared_ptr<api::StorageMessage>(reply.release()));
}
+void ExternalOperationHandler::bounce_with_feed_blocked(api::StorageCommand& cmd) {
+ const auto& feed_block = _op_ctx.cluster_state_bundle().feed_block();
+ bounce_with_result(cmd, api::ReturnCode(api::ReturnCode::NO_SPACE,
+ "External feed is blocked due to resource exhaustion: " +
+ feed_block->description()));
+}
+
void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageCommand& cmd,
const lib::ClusterState& cluster_state)
{
@@ -284,6 +292,11 @@ std::string extract_reindexing_token(const api::PutCommand& cmd) {
}
bool ExternalOperationHandler::onPut(const std::shared_ptr<api::PutCommand>& cmd) {
+ if (_op_ctx.cluster_state_bundle().block_feed_in_cluster()) {
+ bounce_with_feed_blocked(*cmd);
+ return true;
+ }
+
auto& metrics = getMetrics().puts;
if (!checkTimestampMutationPreconditions(*cmd, _op_ctx.make_split_bit_constrained_bucket_id(cmd->getDocumentId()), metrics)) {
return true;
@@ -327,6 +340,13 @@ bool ExternalOperationHandler::onPut(const std::shared_ptr<api::PutCommand>& cmd
bool ExternalOperationHandler::onUpdate(const std::shared_ptr<api::UpdateCommand>& cmd) {
+ if (_op_ctx.cluster_state_bundle().block_feed_in_cluster() &&
+ document::FeedRejectHelper::mustReject(*cmd->getUpdate()))
+ {
+ bounce_with_feed_blocked(*cmd);
+ return true;
+ }
+
auto& metrics = getMetrics().updates;
if (!checkTimestampMutationPreconditions(*cmd, _op_ctx.make_split_bit_constrained_bucket_id(cmd->getDocumentId()), metrics)) {
return true;
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h
index 9127325702a..1d42f4b3ca8 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.h
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h
@@ -118,6 +118,7 @@ private:
const lib::ClusterState& current_state,
const lib::ClusterState& pending_state);
void bounce_with_result(api::StorageCommand& cmd, const api::ReturnCode& result);
+ void bounce_with_feed_blocked(api::StorageCommand& cmd);
std::shared_ptr<Operation> try_generate_get_operation(const std::shared_ptr<api::GetCommand>&);
bool checkSafeTimeReached(api::StorageCommand& cmd);