summaryrefslogtreecommitdiffstats
path: root/storage/src
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2021-01-19 11:33:43 +0000
committerGeir Storli <geirst@verizonmedia.com>2021-01-19 11:33:43 +0000
commit4b4938c47dd4c1cbb37ac22e7436f0ec2d2754fe (patch)
treec3103e3528f44d8fa8194414242ac224f46efa20 /storage/src
parent6e8f5059422bbdb36418f165c4933040ee0bfc72 (diff)
Reject puts if feed is blocked in the cluster.
Diffstat (limited to 'storage/src')
-rw-r--r--storage/src/tests/distributor/distributortestutil.cpp24
-rw-r--r--storage/src/tests/distributor/distributortestutil.h7
-rw-r--r--storage/src/tests/distributor/externaloperationhandlertest.cpp19
-rw-r--r--storage/src/vespa/storage/distributor/distributor_operation_context.h2
-rw-r--r--storage/src/vespa/storage/distributor/distributorcomponent.h3
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.cpp8
6 files changed, 59 insertions, 4 deletions
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index 2d2038b7cd0..b472ac1284e 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -43,14 +43,24 @@ DistributorTestUtil::setupDistributor(int redundancy,
uint32_t earlyReturn,
bool requirePrimaryToBeWritten)
{
+ setup_distributor(redundancy, nodeCount, lib::ClusterStateBundle(lib::ClusterState(systemState)), earlyReturn, requirePrimaryToBeWritten);
+}
+
+void
+DistributorTestUtil::setup_distributor(int redundancy,
+ int node_count,
+ const lib::ClusterStateBundle& state,
+ uint32_t early_return,
+ bool require_primary_to_be_written)
+{
lib::Distribution::DistributionConfigBuilder config(
- lib::Distribution::getDefaultDistributionConfig(redundancy, nodeCount).get());
+ lib::Distribution::getDefaultDistributionConfig(redundancy, node_count).get());
config.redundancy = redundancy;
- config.initialRedundancy = earlyReturn;
- config.ensurePrimaryPersisted = requirePrimaryToBeWritten;
+ config.initialRedundancy = early_return;
+ config.ensurePrimaryPersisted = require_primary_to_be_written;
auto distribution = std::make_shared<lib::Distribution>(config);
_node->getComponentRegister().setDistribution(distribution);
- enableDistributorClusterState(systemState);
+ enable_distributor_cluster_state(state);
// This is for all intents and purposes a hack to avoid having the
// distributor treat setting the distribution explicitly as a signal that
// it should send RequestBucketInfo to all configured nodes.
@@ -427,4 +437,10 @@ DistributorTestUtil::enableDistributorClusterState(vespalib::stringref state)
lib::ClusterStateBundle(lib::ClusterState(state)));
}
+void
+DistributorTestUtil::enable_distributor_cluster_state(const lib::ClusterStateBundle& state)
+{
+ getBucketDBUpdater().simulate_cluster_state_bundle_activation(state);
+}
+
}
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index 1bdb6e33512..ee0e1a9eb65 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -161,6 +161,12 @@ public:
uint32_t earlyReturn = false,
bool requirePrimaryToBeWritten = true);
+ void setup_distributor(int redundancy,
+ int node_count,
+ const lib::ClusterStateBundle& state,
+ uint32_t early_return = false,
+ bool require_primary_to_be_written = true);
+
void setRedundancy(uint32_t redundancy);
void notifyDoneInitializing() override {}
@@ -206,6 +212,7 @@ protected:
MessageSenderImpl _messageSender;
void enableDistributorClusterState(vespalib::stringref state);
+ void enable_distributor_cluster_state(const lib::ClusterStateBundle& state);
};
}
diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp
index e566f16dcdd..eae7ad7fcde 100644
--- a/storage/src/tests/distributor/externaloperationhandlertest.cpp
+++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp
@@ -66,6 +66,8 @@ struct ExternalOperationHandlerTest : Test, DistributorTestUtil {
void set_up_distributor_for_sequencing_test();
+ void set_up_distributor_with_feed_blocked_state();
+
const vespalib::string _dummy_id{"id:foo:testdoctype1::bar"};
// Returns an arbitrary bucket not owned in the pending state
@@ -355,6 +357,13 @@ void ExternalOperationHandlerTest::set_up_distributor_for_sequencing_test() {
setupDistributor(1, 2, "version:1 distributor:1 storage:1");
}
+void ExternalOperationHandlerTest::set_up_distributor_with_feed_blocked_state() {
+ createLinks();
+ setup_distributor(1, 2,
+ lib::ClusterStateBundle(lib::ClusterState("version:1 distributor:1 storage:1"),
+ {}, {true, "full disk"}, false));
+}
+
void ExternalOperationHandlerTest::start_operation_verify_not_rejected(
std::shared_ptr<api::StorageCommand> cmd,
Operation::SP& out_generated)
@@ -564,6 +573,16 @@ TEST_F(ExternalOperationHandlerTest, gets_are_sent_with_weak_consistency_if_conf
do_test_get_weak_consistency_is_propagated(true);
}
+TEST_F(ExternalOperationHandlerTest, puts_are_rejected_if_feed_is_blocked) {
+ set_up_distributor_with_feed_blocked_state();
+
+ ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(
+ makePutCommand("testdoctype1", "id:foo:testdoctype1::foo")));
+ EXPECT_EQ("ReturnCode(NO_SPACE, External feed is blocked due to resource exhaustion: full disk)",
+ _sender.reply(0)->getResult().toString());
+}
+
+
struct OperationHandlerSequencingTest : ExternalOperationHandlerTest {
void SetUp() override {
set_up_distributor_for_sequencing_test();
diff --git a/storage/src/vespa/storage/distributor/distributor_operation_context.h b/storage/src/vespa/storage/distributor/distributor_operation_context.h
index 083ffcdacf4..97a522a694a 100644
--- a/storage/src/vespa/storage/distributor/distributor_operation_context.h
+++ b/storage/src/vespa/storage/distributor/distributor_operation_context.h
@@ -12,6 +12,7 @@
namespace document { class Bucket; }
namespace storage { class DistributorConfiguration; }
+namespace storage::lib { class ClusterStateBundle; }
namespace storage::distributor {
@@ -48,6 +49,7 @@ public:
const document::Bucket& bucket,
uint32_t message_type) const = 0;
virtual const lib::ClusterState* pending_cluster_state_or_null(const document::BucketSpace& bucket_space) const = 0;
+ virtual const lib::ClusterStateBundle& cluster_state_bundle() const = 0;
// TODO: Move to being a free function instead.
virtual const char* storage_node_up_states() const = 0;
diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.h b/storage/src/vespa/storage/distributor/distributorcomponent.h
index 13c0a72d44e..fc5f1663cfe 100644
--- a/storage/src/vespa/storage/distributor/distributorcomponent.h
+++ b/storage/src/vespa/storage/distributor/distributorcomponent.h
@@ -201,6 +201,9 @@ public:
const lib::ClusterState* pending_cluster_state_or_null(const document::BucketSpace& bucket_space) const override {
return getDistributor().pendingClusterStateOrNull(bucket_space);
}
+ const lib::ClusterStateBundle& cluster_state_bundle() const override {
+ return getClusterStateBundle();
+ }
const char* storage_node_up_states() const override {
return getDistributor().getStorageNodeUpStates();
}
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index 4d960d69322..1e76e701a7e 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -284,6 +284,14 @@ std::string extract_reindexing_token(const api::PutCommand& cmd) {
}
bool ExternalOperationHandler::onPut(const std::shared_ptr<api::PutCommand>& cmd) {
+ if (_op_ctx.cluster_state_bundle().block_feed_in_cluster()) {
+ const auto& feed_block = _op_ctx.cluster_state_bundle().feed_block();
+ bounce_with_result(*cmd, api::ReturnCode(api::ReturnCode::NO_SPACE,
+ "External feed is blocked due to resource exhaustion: " +
+ feed_block->description()));
+ return true;
+ }
+
auto& metrics = getMetrics().puts;
if (!checkTimestampMutationPreconditions(*cmd, _op_ctx.make_split_bit_constrained_bucket_id(cmd->getDocumentId()), metrics)) {
return true;