summaryrefslogtreecommitdiffstats
path: root/storage/src
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-03-13 13:37:23 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-03-14 14:44:29 +0000
commit4a18ca637cff723bcc45acc425689a69bcf4db66 (patch)
tree4c91b293de883cf16a0d8f024536498fce2da70a /storage/src
parentb97b0a3cf981c20ac1c5b7733116c5a218aa2c9b (diff)
Move non-owned buckets to read-only DB and allow use for read-only ops
Diffstat (limited to 'storage/src')
-rw-r--r--storage/src/tests/distributor/bucketdbupdatertest.cpp96
-rw-r--r--storage/src/tests/distributor/externaloperationhandlertest.cpp114
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.cpp2
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.h8
-rw-r--r--storage/src/vespa/storage/config/stor-distributormanager.def7
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.cpp16
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.h3
-rw-r--r--storage/src/vespa/storage/distributor/distributormetricsset.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/distributormetricsset.h2
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.cpp121
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.h13
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.h3
12 files changed, 321 insertions, 66 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index bad6e80de47..6c70988cbc6 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -26,6 +26,7 @@ using document::test::makeDocumentBucket;
using document::test::makeBucketSpace;
using document::BucketSpace;
using document::FixedBucketSpaces;
+using document::BucketId;
namespace storage::distributor {
@@ -113,6 +114,7 @@ class BucketDBUpdaterTest : public CppUnit::TestFixture,
CPPUNIT_TEST(batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted);
CPPUNIT_TEST(global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection);
CPPUNIT_TEST(non_owned_buckets_moved_to_read_only_db_on_ownership_change);
+ CPPUNIT_TEST(non_owned_buckets_purged_when_read_only_support_is_config_disabled);
CPPUNIT_TEST_SUITE_END();
/*
@@ -186,6 +188,7 @@ protected:
void batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted();
void global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection();
void non_owned_buckets_moved_to_read_only_db_on_ownership_change();
+ void non_owned_buckets_purged_when_read_only_support_is_config_disabled();
auto &defaultDistributorBucketSpace() { return getBucketSpaceRepo().get(makeBucketSpace()); }
@@ -243,7 +246,7 @@ public:
uint32_t bucketCount,
uint32_t invalidBucketCount = 0)
{
- RequestBucketInfoReply* sreply = new RequestBucketInfoReply(cmd);
+ auto sreply = std::make_shared<RequestBucketInfoReply>(cmd);
sreply->setAddress(storageAddress(storageIndex));
api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo();
@@ -276,7 +279,7 @@ public:
}
}
- return std::shared_ptr<api::RequestBucketInfoReply>(sreply);
+ return sreply;
}
void fakeBucketReply(const lib::ClusterState &state,
@@ -619,19 +622,17 @@ public:
}
};
- auto createPendingStateFixtureForStateChange(
+ std::unique_ptr<PendingClusterStateFixture> createPendingStateFixtureForStateChange(
const std::string& oldClusterState,
const std::string& newClusterState)
{
- return std::make_unique<PendingClusterStateFixture>(
- *this, oldClusterState, newClusterState);
+ return std::make_unique<PendingClusterStateFixture>(*this, oldClusterState, newClusterState);
}
- auto createPendingStateFixtureForDistributionChange(
+ std::unique_ptr<PendingClusterStateFixture> createPendingStateFixtureForDistributionChange(
const std::string& oldClusterState)
{
- return std::make_unique<PendingClusterStateFixture>(
- *this, oldClusterState);
+ return std::make_unique<PendingClusterStateFixture>(*this, oldClusterState);
}
};
@@ -639,8 +640,8 @@ CPPUNIT_TEST_SUITE_REGISTRATION(BucketDBUpdaterTest);
BucketDBUpdaterTest::BucketDBUpdaterTest()
: CppUnit::TestFixture(),
- DistributorTestUtil(),
- _bucketSpaces()
+ DistributorTestUtil(),
+ _bucketSpaces()
{
}
@@ -2617,8 +2618,83 @@ void BucketDBUpdaterTest::global_distribution_hash_falls_back_to_legacy_format_u
CPPUNIT_ASSERT_EQUAL(current_hash, new_current_req.getDistributionHash());
}
+namespace {
+
+template <typename Func>
+void for_each_bucket(const BucketDatabase& db, Func f) {
+ BucketId last(0);
+ auto e = db.getNext(last);
+ while (e.valid()) {
+ f(e);
+ e = db.getNext(e.getBucketId());
+ }
+}
+
+}
+
+using ConfigBuilder = vespa::config::content::core::StorDistributormanagerConfigBuilder;
+
void BucketDBUpdaterTest::non_owned_buckets_moved_to_read_only_db_on_ownership_change() {
+ getConfig().setAllowStaleReadsDuringClusterStateTransitions(true);
+
+ lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition
+ setSystemState(initial_state);
+
+ CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size());
+ constexpr uint32_t n_buckets = 10;
+ completeBucketInfoGathering(initial_state, messageCount(4), n_buckets);
+ _sender.clear();
+
+ auto& default_db = mutable_repo().get(makeBucketSpace()).getBucketDatabase();
+ auto& read_only_db = read_only_repo().get(makeBucketSpace()).getBucketDatabase();
+
+ CPPUNIT_ASSERT_EQUAL(size_t(n_buckets), default_db.size());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_db.size());
+
+ lib::ClusterState pending_state("distributor:2 storage:4");
+
+ // TODO iterate over buckets with spaces instead
+ std::unordered_set<BucketId, BucketId::hash> buckets_not_owned_in_pending_state;
+ for_each_bucket(default_db, [&](const auto& entry) {
+ if (!getBucketDBUpdater().getDistributorComponent()
+ .ownsBucketInState(pending_state, makeDocumentBucket(entry.getBucketId()))) {
+ buckets_not_owned_in_pending_state.insert(entry.getBucketId());
+ }
+ });
+ CPPUNIT_ASSERT(!buckets_not_owned_in_pending_state.empty());
+
+ setSystemState(pending_state);
+
+ CPPUNIT_ASSERT_EQUAL(size_t(n_buckets - buckets_not_owned_in_pending_state.size()), default_db.size());
+ CPPUNIT_ASSERT_EQUAL(buckets_not_owned_in_pending_state.size(), read_only_db.size());
+
+ // TODO replace with gmock unordered set equality matcher
+ for_each_bucket(read_only_db, [&](const auto& entry) {
+ CPPUNIT_ASSERT(buckets_not_owned_in_pending_state.find(entry.getBucketId())
+ != buckets_not_owned_in_pending_state.end());
+ });
+
+ // TODO check global space too!
+}
+
+void BucketDBUpdaterTest::non_owned_buckets_purged_when_read_only_support_is_config_disabled() {
+ getConfig().setAllowStaleReadsDuringClusterStateTransitions(false);
+
+ lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition
+ setSystemState(initial_state);
+
+ CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size());
+ constexpr uint32_t n_buckets = 10;
+ completeBucketInfoGathering(initial_state, messageCount(4), n_buckets);
+ _sender.clear();
+
+ auto& read_only_db = read_only_repo().get(makeBucketSpace()).getBucketDatabase();
+ CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_db.size());
+ lib::ClusterState pending_state("distributor:2 storage:4");
+ setSystemState(pending_state);
+ // No buckets should be moved into read only db
+ CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_db.size());
}
}
diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp
index ddf88f50c36..ebf9d4e3bc8 100644
--- a/storage/src/tests/distributor/externaloperationhandlertest.cpp
+++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp
@@ -4,6 +4,7 @@
#include <vespa/storage/distributor/externaloperationhandler.h>
#include <vespa/storage/distributor/distributor.h>
#include <vespa/storage/distributor/distributormetricsset.h>
+#include <vespa/storage/distributor/operations/external/getoperation.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/update/documentupdate.h>
@@ -20,8 +21,9 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture,
CPPUNIT_TEST_SUITE(ExternalOperationHandlerTest);
CPPUNIT_TEST(testBucketSplitMask);
- CPPUNIT_TEST(testOperationRejectedOnWrongDistribution);
- CPPUNIT_TEST(testOperationRejectedOnPendingWrongDistribution);
+ CPPUNIT_TEST(mutating_operation_wdr_bounced_on_wrong_current_distribution);
+ CPPUNIT_TEST(mutating_operation_busy_bounced_on_wrong_pending_distribution);
+ CPPUNIT_TEST(read_only_operation_wdr_bounced_on_wrong_current_distribution);
CPPUNIT_TEST(reject_put_if_not_past_safe_time_point);
CPPUNIT_TEST(reject_remove_if_not_past_safe_time_point);
CPPUNIT_TEST(reject_update_if_not_past_safe_time_point);
@@ -37,6 +39,9 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture,
CPPUNIT_TEST(concurrent_get_and_mutation_do_not_conflict);
CPPUNIT_TEST(sequencing_works_across_mutation_types);
CPPUNIT_TEST(sequencing_can_be_explicitly_config_disabled);
+ CPPUNIT_TEST(gets_are_started_with_mutable_db_outside_transition_period);
+ CPPUNIT_TEST(gets_are_started_with_read_only_db_during_transition_period);
+ CPPUNIT_TEST(gets_are_busy_bounced_during_transition_period_if_stale_reads_disabled);
CPPUNIT_TEST_SUITE_END();
document::BucketId findNonOwnedUserBucketInState(vespalib::stringref state);
@@ -49,6 +54,7 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture,
std::shared_ptr<api::UpdateCommand> makeUpdateCommand(const vespalib::string& doc_type,
const vespalib::string& id) const;
std::shared_ptr<api::UpdateCommand> makeUpdateCommand() const;
+ std::shared_ptr<api::UpdateCommand> makeUpdateCommandForUser(uint64_t id) const;
std::shared_ptr<api::PutCommand> makePutCommand(const vespalib::string& doc_type,
const vespalib::string& id) const;
std::shared_ptr<api::RemoveCommand> makeRemoveCommand(const vespalib::string& id) const;
@@ -80,10 +86,14 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture,
const vespalib::string _dummy_id{"id:foo:testdoctype1::bar"};
+ // Returns an arbitrary bucket not owned in the pending state
+ document::BucketId set_up_pending_cluster_state_transition(bool read_only_enabled);
+
protected:
void testBucketSplitMask();
- void testOperationRejectedOnWrongDistribution();
- void testOperationRejectedOnPendingWrongDistribution();
+ void mutating_operation_wdr_bounced_on_wrong_current_distribution();
+ void read_only_operation_wdr_bounced_on_wrong_current_distribution();
+ void mutating_operation_busy_bounced_on_wrong_pending_distribution();
void reject_put_if_not_past_safe_time_point();
void reject_remove_if_not_past_safe_time_point();
void reject_update_if_not_past_safe_time_point();
@@ -99,6 +109,9 @@ protected:
void concurrent_get_and_mutation_do_not_conflict();
void sequencing_works_across_mutation_types();
void sequencing_can_be_explicitly_config_disabled();
+ void gets_are_started_with_mutable_db_outside_transition_period();
+ void gets_are_started_with_read_only_db_during_transition_period();
+ void gets_are_busy_bounced_during_transition_period_if_stale_reads_disabled();
void assert_rejection_due_to_unsafe_time(
std::shared_ptr<api::StorageCommand> cmd);
@@ -220,6 +233,11 @@ ExternalOperationHandlerTest::makeUpdateCommand() const {
return makeUpdateCommand("testdoctype1", "id:foo:testdoctype1::baz");
}
+std::shared_ptr<api::UpdateCommand>
+ExternalOperationHandlerTest::makeUpdateCommandForUser(uint64_t id) const {
+ return makeUpdateCommand("testdoctype1", vespalib::make_string("id::testdoctype1:n=%" PRIu64 ":bar", id));
+}
+
std::shared_ptr<api::PutCommand> ExternalOperationHandlerTest::makePutCommand(
const vespalib::string& doc_type,
const vespalib::string& id) const {
@@ -233,7 +251,27 @@ std::shared_ptr<api::RemoveCommand> ExternalOperationHandlerTest::makeRemoveComm
}
void
-ExternalOperationHandlerTest::testOperationRejectedOnWrongDistribution()
+ExternalOperationHandlerTest::mutating_operation_wdr_bounced_on_wrong_current_distribution()
+{
+ createLinks();
+ std::string state("distributor:2 storage:2");
+ setupDistributor(1, 2, state);
+
+ document::BucketId bucket(findNonOwnedUserBucketInState(state));
+ auto cmd = makeUpdateCommandForUser(bucket.withoutCountBits());
+
+ Operation::SP genOp;
+ CPPUNIT_ASSERT(getExternalOperationHandler().handleMessage(cmd, genOp));
+ CPPUNIT_ASSERT(!genOp.get());
+ CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size());
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("ReturnCode(WRONG_DISTRIBUTION, "
+ "distributor:2 storage:2)"),
+ _sender.replies[0]->getResult().toString());
+}
+
+void
+ExternalOperationHandlerTest::read_only_operation_wdr_bounced_on_wrong_current_distribution()
{
createLinks();
std::string state("distributor:2 storage:2");
@@ -253,35 +291,27 @@ ExternalOperationHandlerTest::testOperationRejectedOnWrongDistribution()
}
void
-ExternalOperationHandlerTest::testOperationRejectedOnPendingWrongDistribution()
+ExternalOperationHandlerTest::mutating_operation_busy_bounced_on_wrong_pending_distribution()
{
createLinks();
- std::string current("distributor:2 storage:2");
- std::string pending("distributor:3 storage:3");
+ std::string current("version:10 distributor:2 storage:2");
+ std::string pending("version:11 distributor:3 storage:3");
setupDistributor(1, 3, current);
document::BucketId b(findOwned1stNotOwned2ndInStates(current, pending));
// Trigger pending cluster state
- auto stateCmd = std::make_shared<api::SetSystemStateCommand>(
- lib::ClusterState(pending));
+ auto stateCmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(pending));
getBucketDBUpdater().onSetSystemState(stateCmd);
- auto cmd = makeGetCommandForUser(b.withoutCountBits());
+ auto cmd = makeUpdateCommandForUser(b.withoutCountBits());
Operation::SP genOp;
CPPUNIT_ASSERT(getExternalOperationHandler().handleMessage(cmd, genOp));
CPPUNIT_ASSERT(!genOp.get());
CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size());
- // Fail back with _pending_ cluster state so client can start trying
- // correct distributor immediately. If that distributor has not yet
- // completed processing its pending cluster state, it'll return the
- // old (current) cluster state, causing the client to bounce between
- // the two until the pending states have been resolved. This is pretty
- // much inevitable with the current design.
CPPUNIT_ASSERT_EQUAL(
- std::string("ReturnCode(WRONG_DISTRIBUTION, "
- "distributor:3 storage:3)"),
+ std::string("ReturnCode(BUSY, Currently pending cluster state transition from version 10 to 11)"),
_sender.replies[0]->getResult().toString());
}
@@ -486,6 +516,52 @@ void ExternalOperationHandlerTest::sequencing_can_be_explicitly_config_disabled(
start_operation_verify_not_rejected(makeRemoveCommand(_dummy_id));
}
+void ExternalOperationHandlerTest::gets_are_started_with_mutable_db_outside_transition_period() {
+ createLinks();
+ std::string current = "distributor:1 storage:3";
+ setupDistributor(1, 3, current);
+ getConfig().setAllowStaleReadsDuringClusterStateTransitions(true);
+
+ document::BucketId b(16, 1234); // Only 1 distributor (us), so doesn't matter
+
+ auto op = start_operation_verify_not_rejected(makeGetCommandForUser(b.withoutCountBits()));
+ auto& get_op = dynamic_cast<GetOperation&>(*op);
+ const auto* expected_space = &getBucketSpaceRepo().get(document::FixedBucketSpaces::default_space());
+ CPPUNIT_ASSERT_EQUAL(expected_space, &get_op.bucketSpace());
+}
+
+document::BucketId ExternalOperationHandlerTest::set_up_pending_cluster_state_transition(bool read_only_enabled) {
+ createLinks();
+ std::string current = "version:123 distributor:2 storage:2";
+ std::string pending = "version:321 distributor:3 storage:3";
+ setupDistributor(1, 3, current);
+ getConfig().setAllowStaleReadsDuringClusterStateTransitions(read_only_enabled);
+
+ // Trigger pending cluster state
+ auto stateCmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(pending));
+ getBucketDBUpdater().onSetSystemState(stateCmd);
+ return findOwned1stNotOwned2ndInStates(current, pending);
+}
+
+void ExternalOperationHandlerTest::gets_are_started_with_read_only_db_during_transition_period() {
+ auto non_owned_bucket = set_up_pending_cluster_state_transition(true);
+
+ auto op = start_operation_verify_not_rejected(makeGetCommandForUser(non_owned_bucket.withoutCountBits()));
+ auto& get_op = dynamic_cast<GetOperation&>(*op);
+ const auto* expected_space = &getReadOnlyBucketSpaceRepo().get(document::FixedBucketSpaces::default_space());
+ CPPUNIT_ASSERT_EQUAL(expected_space, &get_op.bucketSpace());
+}
+
+void ExternalOperationHandlerTest::gets_are_busy_bounced_during_transition_period_if_stale_reads_disabled() {
+ auto non_owned_bucket = set_up_pending_cluster_state_transition(false);
+
+ start_operation_verify_rejected(makeGetCommandForUser(non_owned_bucket.withoutCountBits()));
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("ReturnCode(BUSY, Currently pending cluster state transition from version 123 to 321)"),
+ _sender.replies[0]->getResult().toString());
+
+}
+
// TODO support sequencing of RemoveLocation? It's a mutating operation, but supporting it with
// the current approach is not trivial. A RemoveLocation operation covers the _entire_ bucket
// sub tree under a given location, while the sequencer works on individual GIDs. Mapping the
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index 44cf56fdff8..294ce56f536 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -34,6 +34,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_enableHostInfoReporting(true),
_disableBucketActivation(false),
_sequenceMutatingOperations(true),
+ _allowStaleReadsDuringClusterStateTransitions(false),
_minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED)
{ }
@@ -144,6 +145,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist
_enableHostInfoReporting = config.enableHostInfoReporting;
_disableBucketActivation = config.disableBucketActivation;
_sequenceMutatingOperations = config.sequenceMutatingOperations;
+ _allowStaleReadsDuringClusterStateTransitions = config.allowStaleReadsDuringClusterStateTransitions;
_minimumReplicaCountingMode = config.minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index 5dfc4f66cb8..8c84fef47b5 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -235,6 +235,13 @@ public:
void setSequenceMutatingOperations(bool sequenceMutations) noexcept {
_sequenceMutatingOperations = sequenceMutations;
}
+
+ bool allowStaleReadsDuringClusterStateTransitions() const noexcept {
+ return _allowStaleReadsDuringClusterStateTransitions;
+ }
+ void setAllowStaleReadsDuringClusterStateTransitions(bool allow) noexcept {
+ _allowStaleReadsDuringClusterStateTransitions = allow;
+ }
private:
DistributorConfiguration(const DistributorConfiguration& other);
@@ -274,6 +281,7 @@ private:
bool _enableHostInfoReporting;
bool _disableBucketActivation;
bool _sequenceMutatingOperations;
+ bool _allowStaleReadsDuringClusterStateTransitions;
DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def
index 89aad427ca9..d4f69073cc6 100644
--- a/storage/src/vespa/storage/config/stor-distributormanager.def
+++ b/storage/src/vespa/storage/config/stor-distributormanager.def
@@ -184,3 +184,10 @@ sequence_mutating_operations bool default=true
## towards a node if it has indicated that its merge queues are full or it is
## suffering from resource exhaustion.
inhibit_merge_sending_on_busy_node_duration_sec int default=10
+
+## If set, enables potentially stale reads during cluster state transitions where
+## buckets change ownership. This also implicitly enables support for two-phase
+## cluster state transitions on the distributor.
+## For this option to take effect, the cluster controller must also have two-phase
+## states enabled.
+allow_stale_reads_during_cluster_state_transitions bool default=false
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index 97028c20191..9c6ec4a7f0e 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -114,24 +114,33 @@ void
BucketDBUpdater::removeSuperfluousBuckets(
const lib::ClusterStateBundle& newState)
{
+ // TODO too many indirections to get at config.
+ const bool move_to_read_only_db = _distributorComponent.getDistributor().getConfig()
+ .allowStaleReadsDuringClusterStateTransitions();
for (auto &elem : _distributorComponent.getBucketSpaceRepo()) {
const auto &newDistribution(elem.second->getDistribution());
const auto &oldClusterState(elem.second->getClusterState());
auto &bucketDb(elem.second->getBucketDatabase());
+ auto& readOnlyDb(_distributorComponent.getReadOnlyBucketSpaceRepo().get(elem.first).getBucketDatabase());
// Remove all buckets not belonging to this distributor, or
// being on storage nodes that are no longer up.
NodeRemover proc(
oldClusterState,
*newState.getDerivedClusterState(elem.first),
- _distributorComponent.getBucketIdFactory(),
_distributorComponent.getIndex(),
newDistribution,
_distributorComponent.getDistributor().getStorageNodeUpStates());
bucketDb.forEach(proc);
- for (const auto & entry :proc.getBucketsToRemove()) {
- bucketDb.remove(entry);
+ // TODO vec of Entry instead to avoid lookup and remove? Uses more transient memory...
+ for (const auto & bucket : proc.getBucketsToRemove()) {
+ if (move_to_read_only_db) {
+ // TODO explicit transfer function?
+ auto db_entry = bucketDb.get(bucket);
+ readOnlyDb.update(db_entry); // TODO Entry move support
+ }
+ bucketDb.remove(bucket);
}
}
}
@@ -202,7 +211,6 @@ BucketDBUpdater::onSetSystemState(
}
ensureTransitionTimerStarted();
- // TODO
removeSuperfluousBuckets(cmd->getClusterStateBundle());
replyToPreviousPendingClusterStateIfAny();
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index a31b62d4e9b..18893edaeda 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -192,7 +192,6 @@ private:
public:
NodeRemover(const lib::ClusterState& oldState,
const lib::ClusterState& s,
- [[maybe_unused]] const document::BucketIdFactory& factory,
uint16_t localIndex,
const lib::Distribution& distribution,
const char* upStates)
@@ -202,7 +201,7 @@ private:
_distribution(distribution),
_upStates(upStates) {}
- ~NodeRemover();
+ ~NodeRemover() override;
bool process(BucketDatabase::Entry& e) override;
void logRemove(const document::BucketId& bucketId, const char* msg) const;
bool distributorOwnsBucket(const document::BucketId&) const;
diff --git a/storage/src/vespa/storage/distributor/distributormetricsset.cpp b/storage/src/vespa/storage/distributor/distributormetricsset.cpp
index 927dc06182d..83923a1f00e 100644
--- a/storage/src/vespa/storage/distributor/distributormetricsset.cpp
+++ b/storage/src/vespa/storage/distributor/distributormetricsset.cpp
@@ -17,7 +17,7 @@ DistributorMetricSet::DistributorMetricSet(const metrics::LoadTypeSet& lt)
removelocations(lt, PersistenceOperationMetricSet("removelocations"), this),
gets(lt, PersistenceOperationMetricSet("gets"), this),
stats(lt, PersistenceOperationMetricSet("stats"), this),
- multioperations(lt, PersistenceOperationMetricSet("multioperations"), this),
+ getbucketlists(lt, PersistenceOperationMetricSet("getbucketlists"), this),
visits(lt, VisitorMetricSet(), this),
stateTransitionTime("state_transition_time", {},
"Time it takes to complete a cluster state transition. If a "
diff --git a/storage/src/vespa/storage/distributor/distributormetricsset.h b/storage/src/vespa/storage/distributor/distributormetricsset.h
index 5a64027f500..dfe976a89ab 100644
--- a/storage/src/vespa/storage/distributor/distributormetricsset.h
+++ b/storage/src/vespa/storage/distributor/distributormetricsset.h
@@ -20,7 +20,7 @@ public:
metrics::LoadMetric<PersistenceOperationMetricSet> removelocations;
metrics::LoadMetric<PersistenceOperationMetricSet> gets;
metrics::LoadMetric<PersistenceOperationMetricSet> stats;
- metrics::LoadMetric<PersistenceOperationMetricSet> multioperations;
+ metrics::LoadMetric<PersistenceOperationMetricSet> getbucketlists;
metrics::LoadMetric<VisitorMetricSet> visits;
metrics::DoubleAverageMetric stateTransitionTime;
metrics::DoubleAverageMetric recoveryModeTime;
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index 447a5bb156c..5d3261a71f7 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -20,6 +20,8 @@
#include "distributor_bucket_space.h"
#include <vespa/log/log.h>
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
+
LOG_SETUP(".distributor.manager");
namespace storage::distributor {
@@ -70,19 +72,60 @@ ExternalOperationHandler::checkSafeTimeReached(api::StorageCommand& cmd)
return true;
}
+void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageCommand& cmd) {
+ // Distributor ownership is equal across cluster states, so always send back default state.
+ // This also helps client avoid getting confused by possibly observing different actual
+ // states for global/non-global document types for the same state version.
+ auto cluster_state_str = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space())
+ .getClusterState().toString();
+ LOG(debug, "Got message with wrong distribution, sending back state '%s'",
+ cluster_state_str.c_str());
+
+ api::StorageReply::UP reply(cmd.makeReply());
+ api::ReturnCode ret(api::ReturnCode::WRONG_DISTRIBUTION, cluster_state_str);
+ reply->setResult(ret);
+ sendUp(std::shared_ptr<api::StorageMessage>(reply.release()));
+}
+
+void ExternalOperationHandler::bounce_with_busy_during_state_transition(
+ api::StorageCommand& cmd,
+ const lib::ClusterState& current_state,
+ const lib::ClusterState& pending_state)
+{
+ auto status_str = vespalib::make_string("Currently pending cluster state transition"
+ " from version %u to %u",
+ current_state.getVersion(), pending_state.getVersion());
+
+ api::StorageReply::UP reply(cmd.makeReply());
+ api::ReturnCode ret(api::ReturnCode::BUSY, status_str);
+ reply->setResult(ret);
+ sendUp(std::shared_ptr<api::StorageMessage>(reply.release()));
+}
+
bool
ExternalOperationHandler::checkTimestampMutationPreconditions(api::StorageCommand& cmd,
const document::BucketId &bucketId,
PersistenceOperationMetricSet& persistenceMetrics)
{
document::Bucket bucket(cmd.getBucket().getBucketSpace(), bucketId);
- if (!checkDistribution(cmd, bucket)) {
+ if (!ownsBucketInCurrentState(bucket)) {
LOG(debug, "Distributor manager received %s, bucket %s with wrong distribution",
cmd.toString().c_str(), bucket.toString().c_str());
-
+ bounce_with_wrong_distribution(cmd);
persistenceMetrics.failures.wrongdistributor.inc();
return false;
}
+
+ auto pending = getDistributor().checkOwnershipInPendingState(bucket);
+ if (!pending.isOwned()) {
+ // We return BUSY here instead of WrongDistributionReply to avoid clients potentially
+ // ping-ponging between cluster state versions during a state transition.
+ auto& current_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState();
+ auto& pending_state = pending.getNonOwnedState();
+ bounce_with_busy_during_state_transition(cmd, current_state, pending_state);
+ return false;
+ }
+
if (!checkSafeTimeReached(cmd)) {
persistenceMetrics.failures.safe_time_not_reached.inc();
return false;
@@ -113,6 +156,35 @@ bool ExternalOperationHandler::allowMutation(const SequencingHandle& handle) con
return handle.valid();
}
+template <typename Func>
+void ExternalOperationHandler::bounce_or_invoke_read_only_op(
+ api::StorageCommand& cmd,
+ const document::Bucket& bucket,
+ PersistenceOperationMetricSet& metrics,
+ Func func)
+{
+ if (!ownsBucketInCurrentState(bucket)) {
+ LOG(debug, "Distributor manager received %s, bucket %s with wrong distribution",
+ cmd.toString().c_str(), bucket.toString().c_str());
+ bounce_with_wrong_distribution(cmd);
+ metrics.failures.wrongdistributor.inc();
+ return;
+ }
+
+ auto pending = getDistributor().checkOwnershipInPendingState(bucket);
+ if (pending.isOwned()) {
+ func(_bucketSpaceRepo);
+ } else {
+ if (getDistributor().getConfig().allowStaleReadsDuringClusterStateTransitions()) {
+ func(_readOnlyBucketSpaceRepo);
+ } else {
+ auto& current_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState();
+ auto& pending_state = pending.getNonOwnedState();
+ bounce_with_busy_during_state_transition(cmd, current_state, pending_state);
+ }
+ }
+}
+
IMPL_MSG_COMMAND_H(ExternalOperationHandler, Put)
{
auto& metrics = getMetrics().puts[cmd->getLoadType()];
@@ -188,10 +260,8 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, RemoveLocation)
RemoveLocationOperation::getBucketId(*this, *cmd, bid);
document::Bucket bucket(cmd->getBucket().getBucketSpace(), bid);
- if (!checkDistribution(*cmd, bucket)) {
- LOG(debug, "Distributor manager received %s with wrong distribution", cmd->toString().c_str());
-
- getMetrics().removelocations[cmd->getLoadType()].failures.wrongdistributor.inc();
+ auto& metrics = getMetrics().removelocations[cmd->getLoadType()];
+ if (!checkTimestampMutationPreconditions(*cmd, bucket.getBucketId(), metrics)) {
return true;
}
@@ -203,43 +273,38 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, RemoveLocation)
IMPL_MSG_COMMAND_H(ExternalOperationHandler, Get)
{
document::Bucket bucket(cmd->getBucket().getBucketSpace(), getBucketId(cmd->getDocumentId()));
- if (!checkDistribution(*cmd, bucket)) {
- LOG(debug, "Distributor manager received get for %s, bucket %s with wrong distribution",
- cmd->getDocumentId().toString().c_str(), bucket.toString().c_str());
-
- getMetrics().gets[cmd->getLoadType()].failures.wrongdistributor.inc();
- return true;
- }
-
- _op = std::make_shared<GetOperation>(*this, _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()),
- cmd, getMetrics().gets[cmd->getLoadType()]);
+ auto& metrics = getMetrics().gets[cmd->getLoadType()];
+ bounce_or_invoke_read_only_op(*cmd, bucket, metrics, [&](auto& bucket_space_repo) {
+ _op = std::make_shared<GetOperation>(*this, bucket_space_repo.get(cmd->getBucket().getBucketSpace()),
+ cmd, metrics);
+ });
return true;
}
IMPL_MSG_COMMAND_H(ExternalOperationHandler, StatBucket)
{
- if (!checkDistribution(*cmd, cmd->getBucket())) {
- return true;
- }
- auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()));
- _op = std::make_shared<StatBucketOperation>(*this, distributorBucketSpace, cmd);
+ auto& metrics = getMetrics().stats[cmd->getLoadType()];
+ bounce_or_invoke_read_only_op(*cmd, cmd->getBucket(), metrics, [&](auto& bucket_space_repo) {
+ auto& bucket_space = bucket_space_repo.get(cmd->getBucket().getBucketSpace());
+ _op = std::make_shared<StatBucketOperation>(*this, bucket_space, cmd);
+ });
return true;
}
IMPL_MSG_COMMAND_H(ExternalOperationHandler, GetBucketList)
{
- if (!checkDistribution(*cmd, cmd->getBucket())) {
- return true;
- }
- auto bucketSpace(cmd->getBucket().getBucketSpace());
- auto &distributorBucketSpace(_bucketSpaceRepo.get(bucketSpace));
- auto &bucketDatabase(distributorBucketSpace.getBucketDatabase());
- _op = std::make_shared<StatBucketListOperation>(bucketDatabase, _operationGenerator, getIndex(), cmd);
+ auto& metrics = getMetrics().getbucketlists[cmd->getLoadType()];
+ bounce_or_invoke_read_only_op(*cmd, cmd->getBucket(), metrics, [&](auto& bucket_space_repo) {
+ auto& bucket_space = bucket_space_repo.get(cmd->getBucket().getBucketSpace());
+ auto& bucket_database = bucket_space.getBucketDatabase();
+ _op = std::make_shared<StatBucketListOperation>(bucket_database, _operationGenerator, getIndex(), cmd);
+ });
return true;
}
IMPL_MSG_COMMAND_H(ExternalOperationHandler, CreateVisitor)
{
+ // TODO same handling as Gets (VisitorOperation needs to change)
const DistributorConfiguration& config(getDistributor().getConfig());
VisitorOperation::Config visitorConfig(config.getMinBucketsPerVisitor(), config.getMaxVisitorsPerNodePerClientVisitor());
auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()));
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h
index 67fff2a9f39..bd51a914ea8 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.h
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h
@@ -41,7 +41,7 @@ public:
const MaintenanceOperationGenerator&,
DistributorComponentRegister& compReg);
- ~ExternalOperationHandler();
+ ~ExternalOperationHandler() override;
bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg,
Operation::SP& operation);
@@ -56,6 +56,17 @@ private:
Operation::SP _op;
TimePoint _rejectFeedBeforeTimeReached;
+ template <typename Func>
+ void bounce_or_invoke_read_only_op(api::StorageCommand& cmd,
+ const document::Bucket& bucket,
+ PersistenceOperationMetricSet& metrics,
+ Func f);
+
+ void bounce_with_wrong_distribution(api::StorageCommand& cmd);
+ void bounce_with_busy_during_state_transition(api::StorageCommand& cmd,
+ const lib::ClusterState& current_state,
+ const lib::ClusterState& pending_state);
+
bool checkSafeTimeReached(api::StorageCommand& cmd);
api::ReturnCode makeSafeTimeRejectionResult(TimePoint unsafeTime);
bool checkTimestampMutationPreconditions(
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
index 198c588dfd1..3936f13077e 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
@@ -34,6 +34,9 @@ public:
bool hasConsistentCopies() const;
+ // Exposed for unit testing. TODO feels a bit dirty :I
+ const DistributorBucketSpace& bucketSpace() const noexcept { return _bucketSpace; }
+
private:
class GroupId {
public: