diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-10-01 11:09:36 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-10-01 12:20:51 +0000 |
commit | e368bcb60712305139377a97196b68f664aa2164 (patch) | |
tree | f9ca8888a4cb84f5aa105dd1e012ef8ce119f50d /storage | |
parent | 7295ea63246d69dc0b4c434c6f1773b8b5f8b5c4 (diff) |
Add support for snapshotting all state required for routing a bucket operation
Let BucketDBUpdater expose a snapshotting function which will handle
database routing based on the requested bucket and any pending
cluster state transition.
Diffstat (limited to 'storage')
10 files changed, 490 insertions, 11 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index 321b0cc3bba..30b5256cf13 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -2,11 +2,10 @@ #include <vespa/storageapi/message/persistence.h> #include <vespa/storage/distributor/bucketdbupdater.h> +#include <vespa/storage/distributor/cluster_distribution_context.h> #include <vespa/storage/distributor/distributormetricsset.h> #include <vespa/storage/distributor/pending_bucket_space_db_transition.h> #include <vespa/storage/distributor/outdated_nodes_map.h> -#include <vespa/vespalib/io/fileutil.h> -#include <vespa/storageframework/defaultimplementation/clock/realclock.h> #include <vespa/storage/storageutil/distributorstatecache.h> #include <tests/distributor/distributortestutil.h> #include <vespa/document/test/make_document_bucket.h> @@ -124,7 +123,7 @@ public: createLinks(); _bucketSpaces = getBucketSpaces(); // Disable deferred activation by default (at least for now) to avoid breaking the entire world. - getConfig().setAllowStaleReadsDuringClusterStateTransitions(false); + getBucketDBUpdater().set_stale_reads_enabled(false); }; void TearDown() override { @@ -2415,7 +2414,7 @@ void for_each_bucket(const DistributorBucketSpaceRepo& repo, Func&& f) { } TEST_F(BucketDBUpdaterTest, non_owned_buckets_moved_to_read_only_db_on_ownership_change) { - getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); + getBucketDBUpdater().set_stale_reads_enabled(true); lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity @@ -2468,7 +2467,7 @@ TEST_F(BucketDBUpdaterTest, buckets_no_longer_available_are_not_moved_to_read_on } TEST_F(BucketDBUpdaterTest, non_owned_buckets_purged_when_read_only_support_is_config_disabled) { - getConfig().setAllowStaleReadsDuringClusterStateTransitions(false); + getBucketDBUpdater().set_stale_reads_enabled(false); lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity @@ -2497,7 +2496,6 @@ void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition( uint32_t pending_buckets, uint32_t pending_expected_msgs) { - getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); lib::ClusterState initial_state(initial_state_str); setSystemState(initial_state); ASSERT_EQ(messageCount(initial_expected_msgs), _sender.commands().size()); @@ -2514,6 +2512,7 @@ void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition( } TEST_F(BucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until_activation_received) { + getBucketDBUpdater().set_stale_reads_enabled(true); constexpr uint32_t n_buckets = 10; ASSERT_NO_FATAL_FAILURE( trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, @@ -2533,6 +2532,7 @@ TEST_F(BucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until } TEST_F(BucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_activated) { + getBucketDBUpdater().set_stale_reads_enabled(true); constexpr uint32_t n_buckets = 10; ASSERT_NO_FATAL_FAILURE( trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, @@ -2544,6 +2544,7 @@ TEST_F(BucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_activated } TEST_F(BucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_marked_down) { + getBucketDBUpdater().set_stale_reads_enabled(true); constexpr uint32_t n_buckets = 10; ASSERT_NO_FATAL_FAILURE( trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, @@ -2557,6 +2558,7 @@ TEST_F(BucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_marked_d } TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_with_mismatching_version_returns_actual_version) { + getBucketDBUpdater().set_stale_reads_enabled(true); constexpr uint32_t n_buckets = 10; ASSERT_NO_FATAL_FAILURE( trigger_completed_but_not_yet_activated_transition("version:4 distributor:1 storage:4", n_buckets, 4, @@ -2570,6 +2572,7 @@ TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_with_mismatching_vers } TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_without_pending_transition_passes_message_through) { + getBucketDBUpdater().set_stale_reads_enabled(true); constexpr uint32_t n_buckets = 10; ASSERT_NO_FATAL_FAILURE( trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, @@ -2727,4 +2730,144 @@ TEST_F(BucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_only_when_s EXPECT_TRUE(state == nullptr); } +struct BucketDBUpdaterSnapshotTest : BucketDBUpdaterTest { + lib::ClusterState empty_state; + std::shared_ptr<lib::ClusterState> initial_baseline; + std::shared_ptr<lib::ClusterState> initial_default; + lib::ClusterStateBundle initial_bundle; + Bucket default_bucket; + Bucket global_bucket; + + BucketDBUpdaterSnapshotTest() + : BucketDBUpdaterTest(), + empty_state(), + initial_baseline(std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d")), + initial_default(std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:m")), + initial_bundle(*initial_baseline, {{FixedBucketSpaces::default_space(), initial_default}, + {FixedBucketSpaces::global_space(), initial_baseline}}), + default_bucket(FixedBucketSpaces::default_space(), BucketId(16, 1234)), + global_bucket(FixedBucketSpaces::global_space(), BucketId(16, 1234)) + { + } + ~BucketDBUpdaterSnapshotTest() override; + + void SetUp() override { + BucketDBUpdaterTest::SetUp(); + getBucketDBUpdater().set_stale_reads_enabled(true); + }; + + // Assumes that the distributor owns all buckets, so it may choose any arbitrary bucket in the bucket space + uint32_t buckets_in_snapshot_matching_current_db(DistributorBucketSpaceRepo& repo, BucketSpace bucket_space) { + auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(Bucket(bucket_space, BucketId(16, 1234))); + if (!def_rs.is_routable()) { + return 0; + } + auto guard = def_rs.steal_read_guard(); + uint32_t found_buckets = 0; + for_each_bucket(repo, [&](const auto& space, const auto& entry) { + if (space == bucket_space) { + std::vector<BucketDatabase::Entry> entries; + guard->find_parents_and_self(entry.getBucketId(), entries); + if (entries.size() == 1) { + ++found_buckets; + } + } + }); + return found_buckets; + } +}; + +BucketDBUpdaterSnapshotTest::~BucketDBUpdaterSnapshotTest() = default; + +TEST_F(BucketDBUpdaterSnapshotTest, default_space_snapshot_prior_to_activated_state_is_non_routable) { + auto rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket); + EXPECT_FALSE(rs.is_routable()); +} + +TEST_F(BucketDBUpdaterSnapshotTest, global_space_snapshot_prior_to_activated_state_is_non_routable) { + auto rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket); + EXPECT_FALSE(rs.is_routable()); +} + +TEST_F(BucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_states) { + set_cluster_state_bundle(initial_bundle); + // State currently pending, empty initial state is active + + auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket); + EXPECT_EQ(def_rs.context().active_cluster_state()->toString(), empty_state.toString()); + EXPECT_EQ(def_rs.context().baseline_active_cluster_state()->toString(), empty_state.toString()); + ASSERT_TRUE(def_rs.context().has_pending_state_transition()); + EXPECT_EQ(def_rs.context().pending_cluster_state()->toString(), initial_default->toString()); + + auto global_rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket); + EXPECT_EQ(global_rs.context().active_cluster_state()->toString(), empty_state.toString()); + EXPECT_EQ(global_rs.context().baseline_active_cluster_state()->toString(), empty_state.toString()); + ASSERT_TRUE(global_rs.context().has_pending_state_transition()); + EXPECT_EQ(global_rs.context().pending_cluster_state()->toString(), initial_baseline->toString()); + + ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(*initial_baseline, messageCount(1), 0)); + // State now activated, no pending + + def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket); + EXPECT_EQ(def_rs.context().active_cluster_state()->toString(), initial_default->toString()); + EXPECT_EQ(def_rs.context().baseline_active_cluster_state()->toString(), initial_baseline->toString()); + EXPECT_FALSE(def_rs.context().has_pending_state_transition()); + + global_rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket); + EXPECT_EQ(global_rs.context().active_cluster_state()->toString(), initial_baseline->toString()); + EXPECT_EQ(global_rs.context().baseline_active_cluster_state()->toString(), initial_baseline->toString()); + EXPECT_FALSE(global_rs.context().has_pending_state_transition()); +} + +TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_no_pending_state_transition_returns_mutable_db_guard) { + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, + "version:2 distributor:1 storage:4", n_buckets, 4)); + EXPECT_FALSE(activate_cluster_state_version(2)); + EXPECT_EQ(buckets_in_snapshot_matching_current_db(mutable_repo(), FixedBucketSpaces::default_space()), + n_buckets); + EXPECT_EQ(buckets_in_snapshot_matching_current_db(mutable_repo(), FixedBucketSpaces::global_space()), + n_buckets); +} + +TEST_F(BucketDBUpdaterSnapshotTest, snapshot_returns_unroutable_for_non_owned_bucket_in_current_state) { + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, + "version:2 distributor:2 .0.s:d storage:4", 0, 0)); + EXPECT_FALSE(activate_cluster_state_version(2)); + // We're down in state 2 and therefore do not own any buckets + auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket); + EXPECT_FALSE(def_rs.is_routable()); +} + +TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_pending_state_returns_read_only_guard_for_bucket_only_owned_in_current_state) { + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, + "version:2 distributor:2 .0.s:d storage:4", 0, 0)); + EXPECT_EQ(buckets_in_snapshot_matching_current_db(read_only_repo(), FixedBucketSpaces::default_space()), + n_buckets); + EXPECT_EQ(buckets_in_snapshot_matching_current_db(read_only_repo(), FixedBucketSpaces::global_space()), + n_buckets); +} + +TEST_F(BucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabled_and_bucket_not_owned_in_pending_state) { + getBucketDBUpdater().set_stale_reads_enabled(false); + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, + "version:2 distributor:2 .0.s:d storage:4", 0, 0)); + auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket); + EXPECT_FALSE(def_rs.is_routable()); +} + + +/* + * TODO test + * - is_routable for pending/no pending + * - explicit guard (how?) + * - snapshots for pending/no pending + */ + } diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt index 8c701033e67..01a4b72d411 100644 --- a/storage/src/vespa/storage/distributor/CMakeLists.txt +++ b/storage/src/vespa/storage/distributor/CMakeLists.txt @@ -7,6 +7,7 @@ vespa_add_library(storage_distributor bucket_db_prune_elision.cpp bucketgctimecalculator.cpp bucketlistmerger.cpp + cluster_distribution_context.cpp clusterinformation.cpp distributor_bucket_space.cpp distributor_bucket_space_repo.cpp @@ -20,6 +21,7 @@ vespa_add_library(storage_distributor idealstatemetricsset.cpp messagetracker.cpp nodeinfo.cpp + operation_routing_snapshot.cpp operation_sequencer.cpp operationowner.cpp operationtargetresolver.cpp diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index a901ac28a54..b15bda7b4ca 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -2,6 +2,7 @@ #include "bucketdbupdater.h" #include "bucket_db_prune_elision.h" +#include "cluster_distribution_context.h" #include "distributor.h" #include "distributor_bucket_space.h" #include "distributormetricsset.h" @@ -30,12 +31,43 @@ BucketDBUpdater::BucketDBUpdater(Distributor& owner, : framework::StatusReporter("bucketdb", "Bucket DB Updater"), _distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Bucket DB Updater"), _sender(sender), - _transitionTimer(_distributorComponent.getClock()) + _transitionTimer(_distributorComponent.getClock()), + _active_distribution_contexts(), + _distribution_context_mutex() { + for (auto& elem : _distributorComponent.getBucketSpaceRepo()) { + _active_distribution_contexts.emplace( + elem.first, + ClusterDistributionContext::make_not_yet_initialized(_distributorComponent.getIndex())); + _explicit_transition_read_guard.emplace(elem.first, std::shared_ptr<BucketDatabase::ReadGuard>()); + } } BucketDBUpdater::~BucketDBUpdater() = default; +OperationRoutingSnapshot BucketDBUpdater::read_snapshot_for_bucket(const document::Bucket& bucket) const { + std::lock_guard lock(_distribution_context_mutex); + auto active_state_iter = _active_distribution_contexts.find(bucket.getBucketSpace()); + assert(active_state_iter != _active_distribution_contexts.cend()); + auto& state = *active_state_iter->second; + if (!state.bucket_owned_in_active_state(bucket.getBucketId())) { + return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second); + } + const bool bucket_present_in_mutable_db = state.bucket_owned_in_pending_state(bucket.getBucketId()); + if (!bucket_present_in_mutable_db && !stale_reads_enabled()) { + return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second); + } + const auto& space = bucket_present_in_mutable_db + ? _distributorComponent.getBucketSpaceRepo().get(bucket.getBucketSpace()) + : _distributorComponent.getReadOnlyBucketSpaceRepo().get(bucket.getBucketSpace()); + auto existing_guard_iter = _explicit_transition_read_guard.find(bucket.getBucketSpace()); + assert(existing_guard_iter != _explicit_transition_read_guard.cend()); + auto db_guard = existing_guard_iter->second + ? existing_guard_iter-> second + : space.getBucketDatabase().acquire_read_guard(); + return OperationRoutingSnapshot::make_routable_with_guard(active_state_iter->second, std::move(db_guard)); +} + void BucketDBUpdater::flush() { @@ -59,8 +91,7 @@ BucketDBUpdater::print(std::ostream& out, bool verbose, const std::string& inden bool BucketDBUpdater::shouldDeferStateEnabling() const noexcept { - return _distributorComponent.getDistributor().getConfig() - .allowStaleReadsDuringClusterStateTransitions(); + return stale_reads_enabled(); } bool @@ -258,6 +289,61 @@ BucketDBUpdater::replyToActivationWithActualVersion( _distributorComponent.sendUp(reply); // TODO let API accept rvalues } +void BucketDBUpdater::update_read_snapshot_before_db_pruning() { + std::lock_guard lock(_distribution_context_mutex); + for (auto& elem : _distributorComponent.getBucketSpaceRepo()) { + // At this point, we're still operating with a distribution context _without_ a + // pending state, i.e. anyone using the context will expect to find buckets + // in the DB that correspond to how the database looked like prior to pruning + // buckets from the DB. To ensure this is not violated, take a snapshot of the + // _mutable_ DB and expose this. This snapshot only lives until we atomically + // flip to expose a distribution context that includes the new, pending state. + // At that point, the read-only DB is known to contain the buckets that have + // been pruned away, so we can release the mutable DB snapshot safely. + // TODO test for, and handle, state preemption case! + _explicit_transition_read_guard[elem.first] = elem.second->getBucketDatabase().acquire_read_guard(); + } +} + + +void BucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) { + std::lock_guard lock(_distribution_context_mutex); + const auto old_baseline_state = _distributorComponent.getBucketSpaceRepo().get( + document::FixedBucketSpaces::default_space()).cluster_state_sp(); + for (auto& elem : _distributorComponent.getBucketSpaceRepo()) { + auto new_distribution = elem.second->distribution_sp(); + auto old_cluster_state = elem.second->cluster_state_sp(); + auto new_cluster_state = new_state.getDerivedClusterState(elem.first); + _active_distribution_contexts.insert_or_assign( + elem.first, + ClusterDistributionContext::make_state_transition( + std::move(old_cluster_state), + old_baseline_state, + std::move(new_cluster_state), + std::move(new_distribution), + _distributorComponent.getIndex())); + // We can now remove the explicit mutable DB snapshot, as the buckets that have been + // pruned away are visible in the read-only DB. + _explicit_transition_read_guard[elem.first] = std::shared_ptr<BucketDatabase::ReadGuard>(); + } +} + +void BucketDBUpdater::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) { + std::lock_guard lock(_distribution_context_mutex); + const auto& baseline_cluster_state = activated_state.getBaselineClusterState(); + for (auto& elem : _distributorComponent.getBucketSpaceRepo()) { + auto new_distribution = elem.second->distribution_sp(); + auto new_cluster_state = activated_state.getDerivedClusterState(elem.first); + _active_distribution_contexts.insert_or_assign( + elem.first, + ClusterDistributionContext::make_stable_state( + std::move(new_cluster_state), + baseline_cluster_state, + std::move(new_distribution), + _distributorComponent.getIndex())); + } +} + bool BucketDBUpdater::onSetSystemState( const std::shared_ptr<api::SetSystemStateCommand>& cmd) @@ -275,8 +361,10 @@ BucketDBUpdater::onSetSystemState( ensureTransitionTimerStarted(); // Separate timer since _transitionTimer might span multiple pending states. framework::MilliSecTimer process_timer(_distributorComponent.getClock()); - - removeSuperfluousBuckets(cmd->getClusterStateBundle(), false); + update_read_snapshot_before_db_pruning(); + const auto& bundle = cmd->getClusterStateBundle(); + removeSuperfluousBuckets(bundle, false); + update_read_snapshot_after_db_pruning(bundle); replyToPreviousPendingClusterStateIfAny(); ClusterInformation::CSP clusterInfo( @@ -642,6 +730,7 @@ BucketDBUpdater::activatePendingClusterState() _distributorComponent.getDistributor().notifyDistributionChangeEnabled(); } + update_read_snapshot_after_activation(_pendingClusterState->getNewClusterStateBundle()); _pendingClusterState.reset(); _outdatedNodesMap.clear(); sendAllQueuedBucketRechecks(); diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h index e69d328d8bc..7de93774f22 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h @@ -6,6 +6,7 @@ #include "distributorcomponent.h" #include "distributormessagesender.h" #include "pendingclusterstate.h" +#include "operation_routing_snapshot.h" #include "outdated_nodes_map.h" #include <vespa/document/bucket/bucket.h> #include <vespa/storageapi/messageapi/returncode.h> @@ -15,7 +16,9 @@ #include <vespa/storageframework/generic/clock/timer.h> #include <vespa/storageframework/generic/status/statusreporter.h> #include <vespa/storageapi/messageapi/messagehandler.h> +#include <atomic> #include <list> +#include <mutex> namespace vespalib::xml { class XmlOutputStream; @@ -25,6 +28,7 @@ class XmlAttribute; namespace storage::distributor { class Distributor; +class ClusterDistributionContext; class BucketDBUpdater : public framework::StatusReporter, public api::MessageHandler @@ -70,7 +74,14 @@ public: return ((_pendingClusterState.get() != nullptr) && _pendingClusterState->hasBucketOwnershipTransfer()); } + void set_stale_reads_enabled(bool enabled) noexcept { + _stale_reads_enabled.store(enabled, std::memory_order_relaxed); + } + bool stale_reads_enabled() const noexcept { + return _stale_reads_enabled.load(std::memory_order_relaxed); + } + OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const; private: DistributorComponent _distributorComponent; class MergeReplyGuard { @@ -166,8 +177,11 @@ private: void updateState(const lib::ClusterState& oldState, const lib::ClusterState& newState); + void update_read_snapshot_before_db_pruning(); void removeSuperfluousBuckets(const lib::ClusterStateBundle& newState, bool is_distribution_config_change); + void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state); + void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state); void replyToPreviousPendingClusterStateIfAny(); void replyToActivationWithActualVersion( @@ -235,6 +249,16 @@ private: std::set<EnqueuedBucketRecheck> _enqueuedRechecks; OutdatedNodesMap _outdatedNodesMap; framework::MilliSecTimer _transitionTimer; + std::atomic<bool> _stale_reads_enabled; + using DistributionContexts = std::unordered_map<document::BucketSpace, + std::shared_ptr<ClusterDistributionContext>, + document::BucketSpace::hash>; + DistributionContexts _active_distribution_contexts; + using DbGuards = std::unordered_map<document::BucketSpace, + std::shared_ptr<BucketDatabase::ReadGuard>, + document::BucketSpace::hash>; + DbGuards _explicit_transition_read_guard; + mutable std::mutex _distribution_context_mutex; }; } diff --git a/storage/src/vespa/storage/distributor/cluster_distribution_context.cpp b/storage/src/vespa/storage/distributor/cluster_distribution_context.cpp new file mode 100644 index 00000000000..6ec7395c3f5 --- /dev/null +++ b/storage/src/vespa/storage/distributor/cluster_distribution_context.cpp @@ -0,0 +1,79 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "cluster_distribution_context.h" + +namespace storage::distributor { + +ClusterDistributionContext::~ClusterDistributionContext() = default; + +ClusterDistributionContext::ClusterDistributionContext( + std::shared_ptr<const lib::ClusterState> active_cluster_state, + std::shared_ptr<const lib::ClusterState> baseline_active_cluster_state, + std::shared_ptr<const lib::ClusterState> pending_cluster_state, + std::shared_ptr<const lib::Distribution> distribution, + uint16_t this_node_index) + : _active_cluster_state(std::move(active_cluster_state)), + _baseline_active_cluster_state(std::move(baseline_active_cluster_state)), + _pending_cluster_state(std::move(pending_cluster_state)), + _distribution(std::move(distribution)), + _this_node_index(this_node_index) +{} + +std::shared_ptr<ClusterDistributionContext> ClusterDistributionContext::make_state_transition( + std::shared_ptr<const lib::ClusterState> active_cluster_state, + std::shared_ptr<const lib::ClusterState> baseline_active_cluster_state, + std::shared_ptr<const lib::ClusterState> pending_cluster_state, + std::shared_ptr<const lib::Distribution> distribution, + uint16_t this_node_index) +{ + return std::make_shared<ClusterDistributionContext>( + std::move(active_cluster_state), std::move(baseline_active_cluster_state), + std::move(pending_cluster_state), std::move(distribution), + this_node_index); +} + +std::shared_ptr<ClusterDistributionContext> ClusterDistributionContext::make_stable_state( + std::shared_ptr<const lib::ClusterState> active_cluster_state, + std::shared_ptr<const lib::ClusterState> baseline_active_cluster_state, + std::shared_ptr<const lib::Distribution> distribution, + uint16_t this_node_index) +{ + return std::make_shared<ClusterDistributionContext>( + std::move(active_cluster_state), std::move(baseline_active_cluster_state), + std::shared_ptr<const lib::ClusterState>(), + std::move(distribution), this_node_index); +} + +std::shared_ptr<ClusterDistributionContext> ClusterDistributionContext::make_not_yet_initialized(uint16_t this_node_index) { + return std::make_shared<ClusterDistributionContext>( + std::make_shared<const lib::ClusterState>(), + std::make_shared<const lib::ClusterState>(), + std::shared_ptr<const lib::ClusterState>(), + std::make_shared<const lib::Distribution>(), + this_node_index); +} + +bool ClusterDistributionContext::bucket_owned_in_state(const lib::ClusterState& state, + const document::BucketId& id) const +{ + try { + uint16_t owner_idx = _distribution->getIdealDistributorNode(state, id); + return (owner_idx == _this_node_index); + } catch (lib::TooFewBucketBitsInUseException&) { + return false; + } catch (lib::NoDistributorsAvailableException&) { + return false; + } +} + +bool ClusterDistributionContext::bucket_owned_in_active_state(const document::BucketId& id) const { + return bucket_owned_in_state(*_active_cluster_state, id); +} + +bool ClusterDistributionContext::bucket_owned_in_pending_state(const document::BucketId& id) const { + if (_pending_cluster_state) { + return bucket_owned_in_state(*_pending_cluster_state, id); + } + return true; // No pending state, owned by default. +} + +} diff --git a/storage/src/vespa/storage/distributor/cluster_distribution_context.h b/storage/src/vespa/storage/distributor/cluster_distribution_context.h new file mode 100644 index 00000000000..f56a3f37cfa --- /dev/null +++ b/storage/src/vespa/storage/distributor/cluster_distribution_context.h @@ -0,0 +1,67 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/vdslib/distribution/distribution.h> +#include <vespa/vdslib/state/clusterstate.h> +#include <memory> +#include <cstdint> + +namespace storage::distributor { + +/** + * TODO desc + * TODO rename to be bucket space specific? + */ +class ClusterDistributionContext { + std::shared_ptr<const lib::ClusterState> _active_cluster_state; + std::shared_ptr<const lib::ClusterState> _baseline_active_cluster_state; + std::shared_ptr<const lib::ClusterState> _pending_cluster_state; // May be null if no state is pending + std::shared_ptr<const lib::Distribution> _distribution; // TODO ideally should have a pending distribution as well + uint16_t _this_node_index; +public: + // Public due to make_shared, prefer factory functions instead. + ClusterDistributionContext(std::shared_ptr<const lib::ClusterState> active_cluster_state, + std::shared_ptr<const lib::ClusterState> baseline_active_cluster_state, + std::shared_ptr<const lib::ClusterState> pending_cluster_state, + std::shared_ptr<const lib::Distribution> distribution, + uint16_t this_node_index); + + ClusterDistributionContext() = delete; + ~ClusterDistributionContext(); + + static std::shared_ptr<ClusterDistributionContext> make_state_transition( + std::shared_ptr<const lib::ClusterState> active_cluster_state, + std::shared_ptr<const lib::ClusterState> baseline_active_cluster_state, + std::shared_ptr<const lib::ClusterState> pending_cluster_state, + std::shared_ptr<const lib::Distribution> distribution, + uint16_t this_node_index); + static std::shared_ptr<ClusterDistributionContext> make_stable_state( + std::shared_ptr<const lib::ClusterState> active_cluster_state, + std::shared_ptr<const lib::ClusterState> baseline_active_cluster_state, + std::shared_ptr<const lib::Distribution> distribution, + uint16_t this_node_index); + static std::shared_ptr<ClusterDistributionContext> make_not_yet_initialized(uint16_t this_node_index); + + const std::shared_ptr<const lib::ClusterState>& active_cluster_state() const noexcept { + return _active_cluster_state; + } + + const std::shared_ptr<const lib::ClusterState>& baseline_active_cluster_state() const noexcept { + return _baseline_active_cluster_state; + } + bool has_pending_state_transition() const noexcept { + return (_pending_cluster_state.get() != nullptr); + } + // Returned shared_ptr is nullptr iff has_pending_state_transition() == false. + const std::shared_ptr<const lib::ClusterState>& pending_cluster_state() const noexcept { + return _pending_cluster_state; + } + + bool bucket_owned_in_state(const lib::ClusterState& state, const document::BucketId& id) const; + bool bucket_owned_in_active_state(const document::BucketId& id) const; + bool bucket_owned_in_pending_state(const document::BucketId& id) const; + + uint16_t this_node_index() const noexcept { return _this_node_index; } +}; + +} diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 69b64ac8dc1..5e2cd42bb46 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -834,6 +834,7 @@ Distributor::enableNextConfig() _bucketDBMetricUpdater.setMinimumReplicaCountingMode(getConfig().getMinimumReplicaCountingMode()); _ownershipSafeTimeCalc->setMaxClusterClockSkew(getConfig().getMaxClusterClockSkew()); _pendingMessageTracker.setNodeBusyDuration(getConfig().getInhibitMergesOnBusyNodeDuration()); + _bucketDBUpdater.set_stale_reads_enabled(getConfig().allowStaleReadsDuringClusterStateTransitions()); } void diff --git a/storage/src/vespa/storage/distributor/distributor_bucket_space.h b/storage/src/vespa/storage/distributor/distributor_bucket_space.h index 26a0ee9098c..8fbb99dfe89 100644 --- a/storage/src/vespa/storage/distributor/distributor_bucket_space.h +++ b/storage/src/vespa/storage/distributor/distributor_bucket_space.h @@ -48,6 +48,9 @@ public: void setClusterState(std::shared_ptr<const lib::ClusterState> clusterState); const lib::ClusterState &getClusterState() const noexcept { return *_clusterState; } + const std::shared_ptr<const lib::ClusterState>& cluster_state_sp() const noexcept { + return _clusterState; + } void setDistribution(std::shared_ptr<const lib::Distribution> distribution); @@ -55,6 +58,9 @@ public: const lib::Distribution& getDistribution() const noexcept { return *_distribution; } + const std::shared_ptr<const lib::Distribution>& distribution_sp() const noexcept { + return _distribution; + } }; diff --git a/storage/src/vespa/storage/distributor/operation_routing_snapshot.cpp b/storage/src/vespa/storage/distributor/operation_routing_snapshot.cpp new file mode 100644 index 00000000000..8300f715cac --- /dev/null +++ b/storage/src/vespa/storage/distributor/operation_routing_snapshot.cpp @@ -0,0 +1,27 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "operation_routing_snapshot.h" + +namespace storage::distributor { + +OperationRoutingSnapshot::OperationRoutingSnapshot(std::shared_ptr<ClusterDistributionContext> context, + std::shared_ptr<BucketDatabase::ReadGuard> read_guard) + : _context(std::move(context)), + _read_guard(std::move(read_guard)) +{} + +OperationRoutingSnapshot::~OperationRoutingSnapshot() = default; + +OperationRoutingSnapshot OperationRoutingSnapshot::make_not_routable_in_state( + std::shared_ptr<ClusterDistributionContext> context) +{ + return OperationRoutingSnapshot(std::move(context), std::shared_ptr<BucketDatabase::ReadGuard>()); +} + +OperationRoutingSnapshot OperationRoutingSnapshot::make_routable_with_guard( + std::shared_ptr<ClusterDistributionContext> context, + std::shared_ptr<BucketDatabase::ReadGuard> read_guard) +{ + return OperationRoutingSnapshot(std::move(context), std::move(read_guard)); +} + +} diff --git a/storage/src/vespa/storage/distributor/operation_routing_snapshot.h b/storage/src/vespa/storage/distributor/operation_routing_snapshot.h new file mode 100644 index 00000000000..0bd8c5d928e --- /dev/null +++ b/storage/src/vespa/storage/distributor/operation_routing_snapshot.h @@ -0,0 +1,41 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/storage/bucketdb/bucketdatabase.h> +#include <memory> + +namespace storage::distributor { + +class ClusterDistributionContext; + +/** + * TODO desc + */ +class OperationRoutingSnapshot { + std::shared_ptr<ClusterDistributionContext> _context; + std::shared_ptr<BucketDatabase::ReadGuard> _read_guard; +public: + OperationRoutingSnapshot(std::shared_ptr<ClusterDistributionContext> context, + std::shared_ptr<BucketDatabase::ReadGuard> read_guard); + + static OperationRoutingSnapshot make_not_routable_in_state(std::shared_ptr<ClusterDistributionContext> context); + static OperationRoutingSnapshot make_routable_with_guard(std::shared_ptr<ClusterDistributionContext> context, + std::shared_ptr<BucketDatabase::ReadGuard> read_guard); + + OperationRoutingSnapshot(const OperationRoutingSnapshot&) noexcept = default; + OperationRoutingSnapshot& operator=(const OperationRoutingSnapshot&) noexcept = default; + OperationRoutingSnapshot(OperationRoutingSnapshot&&) noexcept = default; + OperationRoutingSnapshot& operator=(OperationRoutingSnapshot&&) noexcept = default; + + ~OperationRoutingSnapshot(); + + const ClusterDistributionContext& context() const noexcept { return *_context; } + std::shared_ptr<BucketDatabase::ReadGuard> steal_read_guard() noexcept { + return std::move(_read_guard); + } + bool is_routable() const noexcept { + return (_read_guard.get() != nullptr); + } +}; + +} |