summaryrefslogtreecommitdiffstats
path: root/storage/src/tests/distributor/externaloperationhandlertest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/tests/distributor/externaloperationhandlertest.cpp')
-rw-r--r--storage/src/tests/distributor/externaloperationhandlertest.cpp114
1 files changed, 95 insertions, 19 deletions
diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp
index ddf88f50c36..ebf9d4e3bc8 100644
--- a/storage/src/tests/distributor/externaloperationhandlertest.cpp
+++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp
@@ -4,6 +4,7 @@
#include <vespa/storage/distributor/externaloperationhandler.h>
#include <vespa/storage/distributor/distributor.h>
#include <vespa/storage/distributor/distributormetricsset.h>
+#include <vespa/storage/distributor/operations/external/getoperation.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/update/documentupdate.h>
@@ -20,8 +21,9 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture,
CPPUNIT_TEST_SUITE(ExternalOperationHandlerTest);
CPPUNIT_TEST(testBucketSplitMask);
- CPPUNIT_TEST(testOperationRejectedOnWrongDistribution);
- CPPUNIT_TEST(testOperationRejectedOnPendingWrongDistribution);
+ CPPUNIT_TEST(mutating_operation_wdr_bounced_on_wrong_current_distribution);
+ CPPUNIT_TEST(mutating_operation_busy_bounced_on_wrong_pending_distribution);
+ CPPUNIT_TEST(read_only_operation_wdr_bounced_on_wrong_current_distribution);
CPPUNIT_TEST(reject_put_if_not_past_safe_time_point);
CPPUNIT_TEST(reject_remove_if_not_past_safe_time_point);
CPPUNIT_TEST(reject_update_if_not_past_safe_time_point);
@@ -37,6 +39,9 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture,
CPPUNIT_TEST(concurrent_get_and_mutation_do_not_conflict);
CPPUNIT_TEST(sequencing_works_across_mutation_types);
CPPUNIT_TEST(sequencing_can_be_explicitly_config_disabled);
+ CPPUNIT_TEST(gets_are_started_with_mutable_db_outside_transition_period);
+ CPPUNIT_TEST(gets_are_started_with_read_only_db_during_transition_period);
+ CPPUNIT_TEST(gets_are_busy_bounced_during_transition_period_if_stale_reads_disabled);
CPPUNIT_TEST_SUITE_END();
document::BucketId findNonOwnedUserBucketInState(vespalib::stringref state);
@@ -49,6 +54,7 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture,
std::shared_ptr<api::UpdateCommand> makeUpdateCommand(const vespalib::string& doc_type,
const vespalib::string& id) const;
std::shared_ptr<api::UpdateCommand> makeUpdateCommand() const;
+ std::shared_ptr<api::UpdateCommand> makeUpdateCommandForUser(uint64_t id) const;
std::shared_ptr<api::PutCommand> makePutCommand(const vespalib::string& doc_type,
const vespalib::string& id) const;
std::shared_ptr<api::RemoveCommand> makeRemoveCommand(const vespalib::string& id) const;
@@ -80,10 +86,14 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture,
const vespalib::string _dummy_id{"id:foo:testdoctype1::bar"};
+ // Returns an arbitrary bucket not owned in the pending state
+ document::BucketId set_up_pending_cluster_state_transition(bool read_only_enabled);
+
protected:
void testBucketSplitMask();
- void testOperationRejectedOnWrongDistribution();
- void testOperationRejectedOnPendingWrongDistribution();
+ void mutating_operation_wdr_bounced_on_wrong_current_distribution();
+ void read_only_operation_wdr_bounced_on_wrong_current_distribution();
+ void mutating_operation_busy_bounced_on_wrong_pending_distribution();
void reject_put_if_not_past_safe_time_point();
void reject_remove_if_not_past_safe_time_point();
void reject_update_if_not_past_safe_time_point();
@@ -99,6 +109,9 @@ protected:
void concurrent_get_and_mutation_do_not_conflict();
void sequencing_works_across_mutation_types();
void sequencing_can_be_explicitly_config_disabled();
+ void gets_are_started_with_mutable_db_outside_transition_period();
+ void gets_are_started_with_read_only_db_during_transition_period();
+ void gets_are_busy_bounced_during_transition_period_if_stale_reads_disabled();
void assert_rejection_due_to_unsafe_time(
std::shared_ptr<api::StorageCommand> cmd);
@@ -220,6 +233,11 @@ ExternalOperationHandlerTest::makeUpdateCommand() const {
return makeUpdateCommand("testdoctype1", "id:foo:testdoctype1::baz");
}
+std::shared_ptr<api::UpdateCommand>
+ExternalOperationHandlerTest::makeUpdateCommandForUser(uint64_t id) const {
+ return makeUpdateCommand("testdoctype1", vespalib::make_string("id::testdoctype1:n=%" PRIu64 ":bar", id));
+}
+
std::shared_ptr<api::PutCommand> ExternalOperationHandlerTest::makePutCommand(
const vespalib::string& doc_type,
const vespalib::string& id) const {
@@ -233,7 +251,27 @@ std::shared_ptr<api::RemoveCommand> ExternalOperationHandlerTest::makeRemoveComm
}
void
-ExternalOperationHandlerTest::testOperationRejectedOnWrongDistribution()
+ExternalOperationHandlerTest::mutating_operation_wdr_bounced_on_wrong_current_distribution()
+{
+ createLinks();
+ std::string state("distributor:2 storage:2");
+ setupDistributor(1, 2, state);
+
+ document::BucketId bucket(findNonOwnedUserBucketInState(state));
+ auto cmd = makeUpdateCommandForUser(bucket.withoutCountBits());
+
+ Operation::SP genOp;
+ CPPUNIT_ASSERT(getExternalOperationHandler().handleMessage(cmd, genOp));
+ CPPUNIT_ASSERT(!genOp.get());
+ CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size());
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("ReturnCode(WRONG_DISTRIBUTION, "
+ "distributor:2 storage:2)"),
+ _sender.replies[0]->getResult().toString());
+}
+
+void
+ExternalOperationHandlerTest::read_only_operation_wdr_bounced_on_wrong_current_distribution()
{
createLinks();
std::string state("distributor:2 storage:2");
@@ -253,35 +291,27 @@ ExternalOperationHandlerTest::testOperationRejectedOnWrongDistribution()
}
void
-ExternalOperationHandlerTest::testOperationRejectedOnPendingWrongDistribution()
+ExternalOperationHandlerTest::mutating_operation_busy_bounced_on_wrong_pending_distribution()
{
createLinks();
- std::string current("distributor:2 storage:2");
- std::string pending("distributor:3 storage:3");
+ std::string current("version:10 distributor:2 storage:2");
+ std::string pending("version:11 distributor:3 storage:3");
setupDistributor(1, 3, current);
document::BucketId b(findOwned1stNotOwned2ndInStates(current, pending));
// Trigger pending cluster state
- auto stateCmd = std::make_shared<api::SetSystemStateCommand>(
- lib::ClusterState(pending));
+ auto stateCmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(pending));
getBucketDBUpdater().onSetSystemState(stateCmd);
- auto cmd = makeGetCommandForUser(b.withoutCountBits());
+ auto cmd = makeUpdateCommandForUser(b.withoutCountBits());
Operation::SP genOp;
CPPUNIT_ASSERT(getExternalOperationHandler().handleMessage(cmd, genOp));
CPPUNIT_ASSERT(!genOp.get());
CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size());
- // Fail back with _pending_ cluster state so client can start trying
- // correct distributor immediately. If that distributor has not yet
- // completed processing its pending cluster state, it'll return the
- // old (current) cluster state, causing the client to bounce between
- // the two until the pending states have been resolved. This is pretty
- // much inevitable with the current design.
CPPUNIT_ASSERT_EQUAL(
- std::string("ReturnCode(WRONG_DISTRIBUTION, "
- "distributor:3 storage:3)"),
+ std::string("ReturnCode(BUSY, Currently pending cluster state transition from version 10 to 11)"),
_sender.replies[0]->getResult().toString());
}
@@ -486,6 +516,52 @@ void ExternalOperationHandlerTest::sequencing_can_be_explicitly_config_disabled(
start_operation_verify_not_rejected(makeRemoveCommand(_dummy_id));
}
+void ExternalOperationHandlerTest::gets_are_started_with_mutable_db_outside_transition_period() {
+ createLinks();
+ std::string current = "distributor:1 storage:3";
+ setupDistributor(1, 3, current);
+ getConfig().setAllowStaleReadsDuringClusterStateTransitions(true);
+
+ document::BucketId b(16, 1234); // Only 1 distributor (us), so doesn't matter
+
+ auto op = start_operation_verify_not_rejected(makeGetCommandForUser(b.withoutCountBits()));
+ auto& get_op = dynamic_cast<GetOperation&>(*op);
+ const auto* expected_space = &getBucketSpaceRepo().get(document::FixedBucketSpaces::default_space());
+ CPPUNIT_ASSERT_EQUAL(expected_space, &get_op.bucketSpace());
+}
+
+document::BucketId ExternalOperationHandlerTest::set_up_pending_cluster_state_transition(bool read_only_enabled) {
+ createLinks();
+ std::string current = "version:123 distributor:2 storage:2";
+ std::string pending = "version:321 distributor:3 storage:3";
+ setupDistributor(1, 3, current);
+ getConfig().setAllowStaleReadsDuringClusterStateTransitions(read_only_enabled);
+
+ // Trigger pending cluster state
+ auto stateCmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(pending));
+ getBucketDBUpdater().onSetSystemState(stateCmd);
+ return findOwned1stNotOwned2ndInStates(current, pending);
+}
+
+void ExternalOperationHandlerTest::gets_are_started_with_read_only_db_during_transition_period() {
+ auto non_owned_bucket = set_up_pending_cluster_state_transition(true);
+
+ auto op = start_operation_verify_not_rejected(makeGetCommandForUser(non_owned_bucket.withoutCountBits()));
+ auto& get_op = dynamic_cast<GetOperation&>(*op);
+ const auto* expected_space = &getReadOnlyBucketSpaceRepo().get(document::FixedBucketSpaces::default_space());
+ CPPUNIT_ASSERT_EQUAL(expected_space, &get_op.bucketSpace());
+}
+
+void ExternalOperationHandlerTest::gets_are_busy_bounced_during_transition_period_if_stale_reads_disabled() {
+ auto non_owned_bucket = set_up_pending_cluster_state_transition(false);
+
+ start_operation_verify_rejected(makeGetCommandForUser(non_owned_bucket.withoutCountBits()));
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("ReturnCode(BUSY, Currently pending cluster state transition from version 123 to 321)"),
+ _sender.replies[0]->getResult().toString());
+
+}
+
// TODO support sequencing of RemoveLocation? It's a mutating operation, but supporting it with
// the current approach is not trivial. A RemoveLocation operation covers the _entire_ bucket
// sub tree under a given location, while the sequencer works on individual GIDs. Mapping the