diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-10-02 14:58:16 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-10-03 09:14:48 +0000 |
commit | b2c66836548ffa4aabc380195026fdbd844d40e7 (patch) | |
tree | 7dd2be2a23ab7c4384ecdec02b26cb4f2e651c9a /storage/src | |
parent | e368bcb60712305139377a97196b68f664aa2164 (diff) |
Rewrite Get operation starting to use explicit snapshotting
Diffstat (limited to 'storage/src')
17 files changed, 185 insertions, 114 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index 30b5256cf13..8409bd60986 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -2,7 +2,7 @@ #include <vespa/storageapi/message/persistence.h> #include <vespa/storage/distributor/bucketdbupdater.h> -#include <vespa/storage/distributor/cluster_distribution_context.h> +#include <vespa/storage/distributor/bucket_space_distribution_context.h> #include <vespa/storage/distributor/distributormetricsset.h> #include <vespa/storage/distributor/pending_bucket_space_db_transition.h> #include <vespa/storage/distributor/outdated_nodes_map.h> @@ -2758,11 +2758,11 @@ struct BucketDBUpdaterSnapshotTest : BucketDBUpdaterTest { // Assumes that the distributor owns all buckets, so it may choose any arbitrary bucket in the bucket space uint32_t buckets_in_snapshot_matching_current_db(DistributorBucketSpaceRepo& repo, BucketSpace bucket_space) { - auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(Bucket(bucket_space, BucketId(16, 1234))); - if (!def_rs.is_routable()) { + auto rs = getBucketDBUpdater().read_snapshot_for_bucket(Bucket(bucket_space, BucketId(16, 1234))); + if (!rs.is_routable()) { return 0; } - auto guard = def_rs.steal_read_guard(); + auto guard = rs.steal_read_guard(); uint32_t found_buckets = 0; for_each_bucket(repo, [&](const auto& space, const auto& entry) { if (space == bucket_space) { @@ -2795,13 +2795,13 @@ TEST_F(BucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_st auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket); EXPECT_EQ(def_rs.context().active_cluster_state()->toString(), empty_state.toString()); - EXPECT_EQ(def_rs.context().baseline_active_cluster_state()->toString(), empty_state.toString()); + EXPECT_EQ(def_rs.context().default_active_cluster_state()->toString(), empty_state.toString()); ASSERT_TRUE(def_rs.context().has_pending_state_transition()); EXPECT_EQ(def_rs.context().pending_cluster_state()->toString(), initial_default->toString()); auto global_rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket); EXPECT_EQ(global_rs.context().active_cluster_state()->toString(), empty_state.toString()); - EXPECT_EQ(global_rs.context().baseline_active_cluster_state()->toString(), empty_state.toString()); + EXPECT_EQ(global_rs.context().default_active_cluster_state()->toString(), empty_state.toString()); ASSERT_TRUE(global_rs.context().has_pending_state_transition()); EXPECT_EQ(global_rs.context().pending_cluster_state()->toString(), initial_baseline->toString()); @@ -2810,12 +2810,12 @@ TEST_F(BucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_st def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket); EXPECT_EQ(def_rs.context().active_cluster_state()->toString(), initial_default->toString()); - EXPECT_EQ(def_rs.context().baseline_active_cluster_state()->toString(), initial_baseline->toString()); + EXPECT_EQ(def_rs.context().default_active_cluster_state()->toString(), initial_default->toString()); EXPECT_FALSE(def_rs.context().has_pending_state_transition()); global_rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket); EXPECT_EQ(global_rs.context().active_cluster_state()->toString(), initial_baseline->toString()); - EXPECT_EQ(global_rs.context().baseline_active_cluster_state()->toString(), initial_baseline->toString()); + EXPECT_EQ(global_rs.context().default_active_cluster_state()->toString(), initial_default->toString()); EXPECT_FALSE(global_rs.context().has_pending_state_transition()); } @@ -2862,12 +2862,4 @@ TEST_F(BucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabl EXPECT_FALSE(def_rs.is_routable()); } - -/* - * TODO test - * - is_routable for pending/no pending - * - explicit guard (how?) - * - snapshots for pending/no pending - */ - } diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp index 4a9ef147741..15820b64ff9 100644 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ b/storage/src/tests/distributor/distributortestutil.cpp @@ -417,7 +417,8 @@ DistributorTestUtil::getBucketSpaces() const void DistributorTestUtil::enableDistributorClusterState(vespalib::stringref state) { - _distributor->enableClusterStateBundle(lib::ClusterStateBundle(lib::ClusterState(state))); + getBucketDBUpdater().simulate_cluster_state_bundle_activation( + lib::ClusterStateBundle(lib::ClusterState(state))); } } diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp index 600e56faf31..84f7d34d069 100644 --- a/storage/src/tests/distributor/externaloperationhandlertest.cpp +++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp @@ -505,6 +505,7 @@ document::BucketId ExternalOperationHandlerTest::set_up_pending_cluster_state_tr std::string current = "version:123 distributor:2 storage:2"; std::string pending = "version:321 distributor:3 storage:3"; setupDistributor(1, 3, current); + getBucketDBUpdater().set_stale_reads_enabled(read_only_enabled); getConfig().setAllowStaleReadsDuringClusterStateTransitions(read_only_enabled); // Trigger pending cluster state diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt index 01a4b72d411..944df6e1708 100644 --- a/storage/src/vespa/storage/distributor/CMakeLists.txt +++ b/storage/src/vespa/storage/distributor/CMakeLists.txt @@ -7,7 +7,7 @@ vespa_add_library(storage_distributor bucket_db_prune_elision.cpp bucketgctimecalculator.cpp bucketlistmerger.cpp - cluster_distribution_context.cpp + bucket_space_distribution_context.cpp clusterinformation.cpp distributor_bucket_space.cpp distributor_bucket_space_repo.cpp diff --git a/storage/src/vespa/storage/distributor/cluster_distribution_context.cpp b/storage/src/vespa/storage/distributor/bucket_space_distribution_context.cpp index 6ec7395c3f5..53040bc42b1 100644 --- a/storage/src/vespa/storage/distributor/cluster_distribution_context.cpp +++ b/storage/src/vespa/storage/distributor/bucket_space_distribution_context.cpp @@ -1,50 +1,52 @@ // Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "cluster_distribution_context.h" +#include "bucket_space_distribution_context.h" namespace storage::distributor { -ClusterDistributionContext::~ClusterDistributionContext() = default; +BucketSpaceDistributionContext::~BucketSpaceDistributionContext() = default; -ClusterDistributionContext::ClusterDistributionContext( +BucketSpaceDistributionContext::BucketSpaceDistributionContext( std::shared_ptr<const lib::ClusterState> active_cluster_state, - std::shared_ptr<const lib::ClusterState> baseline_active_cluster_state, + std::shared_ptr<const lib::ClusterState> default_active_cluster_state, std::shared_ptr<const lib::ClusterState> pending_cluster_state, std::shared_ptr<const lib::Distribution> distribution, uint16_t this_node_index) : _active_cluster_state(std::move(active_cluster_state)), - _baseline_active_cluster_state(std::move(baseline_active_cluster_state)), + _default_active_cluster_state(std::move(default_active_cluster_state)), _pending_cluster_state(std::move(pending_cluster_state)), _distribution(std::move(distribution)), _this_node_index(this_node_index) {} -std::shared_ptr<ClusterDistributionContext> ClusterDistributionContext::make_state_transition( +std::shared_ptr<BucketSpaceDistributionContext> BucketSpaceDistributionContext::make_state_transition( std::shared_ptr<const lib::ClusterState> active_cluster_state, - std::shared_ptr<const lib::ClusterState> baseline_active_cluster_state, + std::shared_ptr<const lib::ClusterState> default_active_cluster_state, std::shared_ptr<const lib::ClusterState> pending_cluster_state, std::shared_ptr<const lib::Distribution> distribution, uint16_t this_node_index) { - return std::make_shared<ClusterDistributionContext>( - std::move(active_cluster_state), std::move(baseline_active_cluster_state), + return std::make_shared<BucketSpaceDistributionContext>( + std::move(active_cluster_state), std::move(default_active_cluster_state), std::move(pending_cluster_state), std::move(distribution), this_node_index); } -std::shared_ptr<ClusterDistributionContext> ClusterDistributionContext::make_stable_state( +std::shared_ptr<BucketSpaceDistributionContext> BucketSpaceDistributionContext::make_stable_state( std::shared_ptr<const lib::ClusterState> active_cluster_state, - std::shared_ptr<const lib::ClusterState> baseline_active_cluster_state, + std::shared_ptr<const lib::ClusterState> default_active_cluster_state, std::shared_ptr<const lib::Distribution> distribution, uint16_t this_node_index) { - return std::make_shared<ClusterDistributionContext>( - std::move(active_cluster_state), std::move(baseline_active_cluster_state), + return std::make_shared<BucketSpaceDistributionContext>( + std::move(active_cluster_state), std::move(default_active_cluster_state), std::shared_ptr<const lib::ClusterState>(), std::move(distribution), this_node_index); } -std::shared_ptr<ClusterDistributionContext> ClusterDistributionContext::make_not_yet_initialized(uint16_t this_node_index) { - return std::make_shared<ClusterDistributionContext>( +std::shared_ptr<BucketSpaceDistributionContext> +BucketSpaceDistributionContext::make_not_yet_initialized(uint16_t this_node_index) +{ + return std::make_shared<BucketSpaceDistributionContext>( std::make_shared<const lib::ClusterState>(), std::make_shared<const lib::ClusterState>(), std::shared_ptr<const lib::ClusterState>(), @@ -52,8 +54,8 @@ std::shared_ptr<ClusterDistributionContext> ClusterDistributionContext::make_not this_node_index); } -bool ClusterDistributionContext::bucket_owned_in_state(const lib::ClusterState& state, - const document::BucketId& id) const +bool BucketSpaceDistributionContext::bucket_owned_in_state(const lib::ClusterState& state, + const document::BucketId& id) const { try { uint16_t owner_idx = _distribution->getIdealDistributorNode(state, id); @@ -65,11 +67,11 @@ bool ClusterDistributionContext::bucket_owned_in_state(const lib::ClusterState& } } -bool ClusterDistributionContext::bucket_owned_in_active_state(const document::BucketId& id) const { +bool BucketSpaceDistributionContext::bucket_owned_in_active_state(const document::BucketId& id) const { return bucket_owned_in_state(*_active_cluster_state, id); } -bool ClusterDistributionContext::bucket_owned_in_pending_state(const document::BucketId& id) const { +bool BucketSpaceDistributionContext::bucket_owned_in_pending_state(const document::BucketId& id) const { if (_pending_cluster_state) { return bucket_owned_in_state(*_pending_cluster_state, id); } diff --git a/storage/src/vespa/storage/distributor/cluster_distribution_context.h b/storage/src/vespa/storage/distributor/bucket_space_distribution_context.h index f56a3f37cfa..7a9c0fcae60 100644 --- a/storage/src/vespa/storage/distributor/cluster_distribution_context.h +++ b/storage/src/vespa/storage/distributor/bucket_space_distribution_context.h @@ -9,45 +9,48 @@ namespace storage::distributor { /** - * TODO desc - * TODO rename to be bucket space specific? + * Represents a consistent snapshot of cluster state and distribution config + * information at a particular point in time. This is sufficient to compute + * bucket ownership and distributions for the bucket space associated with + * the context. + * + * Since this is a snapshot in time, the context is immutable once created. */ -class ClusterDistributionContext { +class BucketSpaceDistributionContext { std::shared_ptr<const lib::ClusterState> _active_cluster_state; - std::shared_ptr<const lib::ClusterState> _baseline_active_cluster_state; + std::shared_ptr<const lib::ClusterState> _default_active_cluster_state; std::shared_ptr<const lib::ClusterState> _pending_cluster_state; // May be null if no state is pending std::shared_ptr<const lib::Distribution> _distribution; // TODO ideally should have a pending distribution as well uint16_t _this_node_index; public: - // Public due to make_shared, prefer factory functions instead. - ClusterDistributionContext(std::shared_ptr<const lib::ClusterState> active_cluster_state, - std::shared_ptr<const lib::ClusterState> baseline_active_cluster_state, - std::shared_ptr<const lib::ClusterState> pending_cluster_state, - std::shared_ptr<const lib::Distribution> distribution, - uint16_t this_node_index); + BucketSpaceDistributionContext() = delete; + // Public due to make_shared, prefer factory functions to instantiate instead. + BucketSpaceDistributionContext(std::shared_ptr<const lib::ClusterState> active_cluster_state, + std::shared_ptr<const lib::ClusterState> default_active_cluster_state, + std::shared_ptr<const lib::ClusterState> pending_cluster_state, + std::shared_ptr<const lib::Distribution> distribution, + uint16_t this_node_index); + ~BucketSpaceDistributionContext(); - ClusterDistributionContext() = delete; - ~ClusterDistributionContext(); - - static std::shared_ptr<ClusterDistributionContext> make_state_transition( + static std::shared_ptr<BucketSpaceDistributionContext> make_state_transition( std::shared_ptr<const lib::ClusterState> active_cluster_state, - std::shared_ptr<const lib::ClusterState> baseline_active_cluster_state, + std::shared_ptr<const lib::ClusterState> default_active_cluster_state, std::shared_ptr<const lib::ClusterState> pending_cluster_state, std::shared_ptr<const lib::Distribution> distribution, uint16_t this_node_index); - static std::shared_ptr<ClusterDistributionContext> make_stable_state( + static std::shared_ptr<BucketSpaceDistributionContext> make_stable_state( std::shared_ptr<const lib::ClusterState> active_cluster_state, - std::shared_ptr<const lib::ClusterState> baseline_active_cluster_state, + std::shared_ptr<const lib::ClusterState> default_active_cluster_state, std::shared_ptr<const lib::Distribution> distribution, uint16_t this_node_index); - static std::shared_ptr<ClusterDistributionContext> make_not_yet_initialized(uint16_t this_node_index); + static std::shared_ptr<BucketSpaceDistributionContext> make_not_yet_initialized(uint16_t this_node_index); const std::shared_ptr<const lib::ClusterState>& active_cluster_state() const noexcept { return _active_cluster_state; } - const std::shared_ptr<const lib::ClusterState>& baseline_active_cluster_state() const noexcept { - return _baseline_active_cluster_state; + const std::shared_ptr<const lib::ClusterState>& default_active_cluster_state() const noexcept { + return _default_active_cluster_state; } bool has_pending_state_transition() const noexcept { return (_pending_cluster_state.get() != nullptr); diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index b15bda7b4ca..227165a0911 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -2,7 +2,7 @@ #include "bucketdbupdater.h" #include "bucket_db_prune_elision.h" -#include "cluster_distribution_context.h" +#include "bucket_space_distribution_context.h" #include "distributor.h" #include "distributor_bucket_space.h" #include "distributormetricsset.h" @@ -38,7 +38,7 @@ BucketDBUpdater::BucketDBUpdater(Distributor& owner, for (auto& elem : _distributorComponent.getBucketSpaceRepo()) { _active_distribution_contexts.emplace( elem.first, - ClusterDistributionContext::make_not_yet_initialized(_distributorComponent.getIndex())); + BucketSpaceDistributionContext::make_not_yet_initialized(_distributorComponent.getIndex())); _explicit_transition_read_guard.emplace(elem.first, std::shared_ptr<BucketDatabase::ReadGuard>()); } } @@ -46,8 +46,9 @@ BucketDBUpdater::BucketDBUpdater(Distributor& owner, BucketDBUpdater::~BucketDBUpdater() = default; OperationRoutingSnapshot BucketDBUpdater::read_snapshot_for_bucket(const document::Bucket& bucket) const { + const auto bucket_space = bucket.getBucketSpace(); std::lock_guard lock(_distribution_context_mutex); - auto active_state_iter = _active_distribution_contexts.find(bucket.getBucketSpace()); + auto active_state_iter = _active_distribution_contexts.find(bucket_space); assert(active_state_iter != _active_distribution_contexts.cend()); auto& state = *active_state_iter->second; if (!state.bucket_owned_in_active_state(bucket.getBucketId())) { @@ -57,15 +58,15 @@ OperationRoutingSnapshot BucketDBUpdater::read_snapshot_for_bucket(const documen if (!bucket_present_in_mutable_db && !stale_reads_enabled()) { return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second); } - const auto& space = bucket_present_in_mutable_db - ? _distributorComponent.getBucketSpaceRepo().get(bucket.getBucketSpace()) - : _distributorComponent.getReadOnlyBucketSpaceRepo().get(bucket.getBucketSpace()); - auto existing_guard_iter = _explicit_transition_read_guard.find(bucket.getBucketSpace()); + const auto& space_repo = bucket_present_in_mutable_db + ? _distributorComponent.getBucketSpaceRepo() + : _distributorComponent.getReadOnlyBucketSpaceRepo(); + auto existing_guard_iter = _explicit_transition_read_guard.find(bucket_space); assert(existing_guard_iter != _explicit_transition_read_guard.cend()); auto db_guard = existing_guard_iter->second ? existing_guard_iter-> second - : space.getBucketDatabase().acquire_read_guard(); - return OperationRoutingSnapshot::make_routable_with_guard(active_state_iter->second, std::move(db_guard)); + : space_repo.get(bucket_space).getBucketDatabase().acquire_read_guard(); + return OperationRoutingSnapshot::make_routable_with_guard(active_state_iter->second, std::move(db_guard), space_repo); } void @@ -308,7 +309,7 @@ void BucketDBUpdater::update_read_snapshot_before_db_pruning() { void BucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) { std::lock_guard lock(_distribution_context_mutex); - const auto old_baseline_state = _distributorComponent.getBucketSpaceRepo().get( + const auto old_default_state = _distributorComponent.getBucketSpaceRepo().get( document::FixedBucketSpaces::default_space()).cluster_state_sp(); for (auto& elem : _distributorComponent.getBucketSpaceRepo()) { auto new_distribution = elem.second->distribution_sp(); @@ -316,9 +317,9 @@ void BucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterSt auto new_cluster_state = new_state.getDerivedClusterState(elem.first); _active_distribution_contexts.insert_or_assign( elem.first, - ClusterDistributionContext::make_state_transition( + BucketSpaceDistributionContext::make_state_transition( std::move(old_cluster_state), - old_baseline_state, + old_default_state, std::move(new_cluster_state), std::move(new_distribution), _distributorComponent.getIndex())); @@ -330,15 +331,15 @@ void BucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterSt void BucketDBUpdater::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) { std::lock_guard lock(_distribution_context_mutex); - const auto& baseline_cluster_state = activated_state.getBaselineClusterState(); + const auto& default_cluster_state = activated_state.getDerivedClusterState(document::FixedBucketSpaces::default_space()); for (auto& elem : _distributorComponent.getBucketSpaceRepo()) { auto new_distribution = elem.second->distribution_sp(); auto new_cluster_state = activated_state.getDerivedClusterState(elem.first); _active_distribution_contexts.insert_or_assign( elem.first, - ClusterDistributionContext::make_stable_state( + BucketSpaceDistributionContext::make_stable_state( std::move(new_cluster_state), - baseline_cluster_state, + default_cluster_state, std::move(new_distribution), _distributorComponent.getIndex())); } @@ -754,6 +755,11 @@ BucketDBUpdater::enableCurrentClusterStateBundleInDistributor() _distributorComponent.getDistributor().enableClusterStateBundle(state); } +void BucketDBUpdater::simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state) { + update_read_snapshot_after_activation(activated_state); + _distributorComponent.getDistributor().enableClusterStateBundle(activated_state); +} + void BucketDBUpdater::addCurrentStateToClusterStateHistory() { diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h index 7de93774f22..86ceab14486 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h @@ -28,7 +28,7 @@ class XmlAttribute; namespace storage::distributor { class Distributor; -class ClusterDistributionContext; +class BucketSpaceDistributionContext; class BucketDBUpdater : public framework::StatusReporter, public api::MessageHandler @@ -140,6 +140,12 @@ private: } }; + friend class DistributorTestUtil; + // Only to be used by tests that want to ensure both the BucketDBUpdater _and_ the Distributor + // components agree on the currently active cluster state bundle. + // Transitively invokes Distributor::enableClusterStateBundle + void simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state); + bool shouldDeferStateEnabling() const noexcept; bool hasPendingClusterState() const; bool pendingClusterStateAccepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl); @@ -196,9 +202,6 @@ private: void maybe_inject_simulated_db_pruning_delay(); void maybe_inject_simulated_db_merging_delay(); - friend class BucketDBUpdater_Test; - friend class MergeOperation_Test; - /** Removes all copies of buckets that are on nodes that are down. */ @@ -251,7 +254,7 @@ private: framework::MilliSecTimer _transitionTimer; std::atomic<bool> _stale_reads_enabled; using DistributionContexts = std::unordered_map<document::BucketSpace, - std::shared_ptr<ClusterDistributionContext>, + std::shared_ptr<BucketSpaceDistributionContext>, document::BucketSpace::hash>; DistributionContexts _active_distribution_contexts; using DbGuards = std::unordered_map<document::BucketSpace, diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 5e2cd42bb46..4adbdd32669 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -77,7 +77,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _distributorStatusDelegate(compReg, *this, *this), _bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater), _idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, manageActiveBucketCopies), - _externalOperationHandler(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, _idealStateManager, compReg), + _externalOperationHandler(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, + _idealStateManager, compReg, use_btree_database), _threadPool(threadPool), _initializingIsUp(true), _doneInitializeHandler(doneInitHandler), @@ -322,9 +323,7 @@ bool Distributor::handleMessage(const std::shared_ptr<api::StorageMessage>& msg) { if (msg->getType().isReply()) { - std::shared_ptr<api::StorageReply> reply = - std::dynamic_pointer_cast<api::StorageReply>(msg); - + auto reply = std::dynamic_pointer_cast<api::StorageReply>(msg); if (handleReply(reply)) { return true; } @@ -400,6 +399,10 @@ Distributor::enableClusterStateBundle(const lib::ClusterStateBundle& state) } } +OperationRoutingSnapshot Distributor::read_snapshot_for_bucket(const document::Bucket& bucket) const { + return _bucketDBUpdater.read_snapshot_for_bucket(bucket); +} + void Distributor::notifyDistributionChangeEnabled() { diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index 638704adf24..48d9145eec7 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -170,6 +170,8 @@ public: return *_readOnlyBucketSpaceRepo; } + OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const override; + class Status; class MetricUpdateHook : public framework::MetricUpdateHook { diff --git a/storage/src/vespa/storage/distributor/distributorinterface.h b/storage/src/vespa/storage/distributor/distributorinterface.h index d9f037bb8f1..aba58e112dc 100644 --- a/storage/src/vespa/storage/distributor/distributorinterface.h +++ b/storage/src/vespa/storage/distributor/distributorinterface.h @@ -4,6 +4,7 @@ #include "bucketgctimecalculator.h" #include "distributormessagesender.h" #include "bucketownership.h" +#include "operation_routing_snapshot.h" #include <vespa/storage/bucketdb/bucketdatabase.h> #include <vespa/document/bucket/bucket.h> @@ -49,6 +50,8 @@ public: */ virtual const lib::ClusterStateBundle& getClusterStateBundle() const = 0; + virtual OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const = 0; + /** * Returns true if the node is currently initializing. */ diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp index f1f786d84ac..221c516a56e 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp @@ -1,5 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "bucket_space_distribution_context.h" #include "externaloperationhandler.h" #include "distributor.h" #include <vespa/document/base/documentid.h> @@ -12,7 +13,6 @@ #include <vespa/storage/distributor/operations/external/statbucketlistoperation.h> #include <vespa/storage/distributor/operations/external/removelocationoperation.h> #include <vespa/storage/distributor/operations/external/visitoroperation.h> -#include <vespa/document/util/stringutil.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/removelocation.h> #include <vespa/storageapi/message/stat.h> @@ -30,11 +30,16 @@ ExternalOperationHandler::ExternalOperationHandler(Distributor& owner, DistributorBucketSpaceRepo& bucketSpaceRepo, DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, const MaintenanceOperationGenerator& gen, - DistributorComponentRegister& compReg) + DistributorComponentRegister& compReg, + bool enable_concurrent_gets) : DistributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "External operation handler"), _operationGenerator(gen), - _rejectFeedBeforeTimeReached() // At epoch -{ } + _rejectFeedBeforeTimeReached(), // At epoch + _non_main_thread_ops_mutex(), + _non_main_thread_ops_owner(owner, getClock()), + _enable_concurrent_gets(enable_concurrent_gets) +{ +} ExternalOperationHandler::~ExternalOperationHandler() = default; @@ -78,24 +83,32 @@ void ExternalOperationHandler::bounce_with_result(api::StorageCommand& cmd, cons sendUp(std::shared_ptr<api::StorageMessage>(reply.release())); } -void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageCommand& cmd) { +void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageCommand& cmd, + const lib::ClusterState& cluster_state) +{ // Distributor ownership is equal across bucket spaces, so always send back default space state. // This also helps client avoid getting confused by possibly observing different actual // (derived) state strings for global/non-global document types for the same state version. // Similarly, if we've yet to activate any version at all we send back BUSY instead // of a suspiciously empty WrongDistributionReply. // TOOD consider NOT_READY instead of BUSY once we're sure this won't cause any other issues. - const auto& cluster_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState(); if (cluster_state.getVersion() != 0) { auto cluster_state_str = cluster_state.toString(); - LOG(debug, "Got message with wrong distribution, sending back state '%s'", cluster_state_str.c_str()); + LOG(debug, "Got %s with wrong distribution, sending back state '%s'", + cmd.toString().c_str(), cluster_state_str.c_str()); bounce_with_result(cmd, api::ReturnCode(api::ReturnCode::WRONG_DISTRIBUTION, cluster_state_str)); } else { // Only valid for empty startup state - LOG(debug, "Got message with wrong distribution, but no cluster state activated yet. Sending back BUSY"); + LOG(debug, "Got %s with wrong distribution, but no cluster state activated yet. Sending back BUSY", + cmd.toString().c_str()); bounce_with_result(cmd, api::ReturnCode(api::ReturnCode::BUSY, "No cluster state activated yet")); } } +void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageCommand& cmd) { + const auto& cluster_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState(); + bounce_with_wrong_distribution(cmd, cluster_state); +} + void ExternalOperationHandler::bounce_with_busy_during_state_transition( api::StorageCommand& cmd, const lib::ClusterState& current_state, @@ -283,10 +296,23 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Get) { document::Bucket bucket(cmd->getBucket().getBucketSpace(), getBucketId(cmd->getDocumentId())); auto& metrics = getMetrics().gets[cmd->getLoadType()]; - bounce_or_invoke_read_only_op(*cmd, bucket, metrics, [&](auto& bucket_space_repo) { - auto& space = bucket_space_repo.get(cmd->getBucket().getBucketSpace()); - _op = std::make_shared<GetOperation>(*this, space, space.getBucketDatabase().acquire_read_guard(), cmd, metrics); - }); + auto snapshot = getDistributor().read_snapshot_for_bucket(bucket); + if (!snapshot.is_routable()) { + const auto& ctx = snapshot.context(); + if (ctx.has_pending_state_transition()) { + bounce_with_busy_during_state_transition(*cmd, *ctx.default_active_cluster_state(), + *ctx.pending_cluster_state()); + } else { + bounce_with_wrong_distribution(*cmd, *snapshot.context().default_active_cluster_state()); + metrics.failures.wrongdistributor.inc(); // TODO thread safety for updates + } + return true; + } + // The snapshot is aware of whether stale reads are enabled, so we don't have to check that here. + const auto* space_repo = snapshot.bucket_space_repo(); + assert(space_repo != nullptr); + _op = std::make_shared<GetOperation>(*this, space_repo->get(bucket.getBucketSpace()), + snapshot.steal_read_guard(), cmd, metrics); return true; } diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h index 655feb5d00c..9db078af198 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.h +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h @@ -8,6 +8,7 @@ #include <vespa/storage/distributor/distributorcomponent.h> #include <vespa/storageapi/messageapi/messagehandler.h> #include <chrono> +#include <mutex> namespace storage { @@ -39,7 +40,8 @@ public: DistributorBucketSpaceRepo& bucketSpaceRepo, DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, const MaintenanceOperationGenerator&, - DistributorComponentRegister& compReg); + DistributorComponentRegister& compReg, + bool enable_concurrent_gets); ~ExternalOperationHandler() override; @@ -55,6 +57,9 @@ private: OperationSequencer _mutationSequencer; Operation::SP _op; TimePoint _rejectFeedBeforeTimeReached; + mutable std::mutex _non_main_thread_ops_mutex; + OperationOwner _non_main_thread_ops_owner; + bool _enable_concurrent_gets; template <typename Func> void bounce_or_invoke_read_only_op(api::StorageCommand& cmd, @@ -62,6 +67,8 @@ private: PersistenceOperationMetricSet& metrics, Func f); + void bounce_with_wrong_distribution(api::StorageCommand& cmd, const lib::ClusterState& cluster_state); + // Bounce with the current _default_ space cluster state. void bounce_with_wrong_distribution(api::StorageCommand& cmd); void bounce_with_busy_during_state_transition(api::StorageCommand& cmd, const lib::ClusterState& current_state, diff --git a/storage/src/vespa/storage/distributor/operation_routing_snapshot.cpp b/storage/src/vespa/storage/distributor/operation_routing_snapshot.cpp index 8300f715cac..ec97e51b66d 100644 --- a/storage/src/vespa/storage/distributor/operation_routing_snapshot.cpp +++ b/storage/src/vespa/storage/distributor/operation_routing_snapshot.cpp @@ -3,25 +3,28 @@ namespace storage::distributor { -OperationRoutingSnapshot::OperationRoutingSnapshot(std::shared_ptr<ClusterDistributionContext> context, - std::shared_ptr<BucketDatabase::ReadGuard> read_guard) +OperationRoutingSnapshot::OperationRoutingSnapshot(std::shared_ptr<const BucketSpaceDistributionContext> context, + std::shared_ptr<BucketDatabase::ReadGuard> read_guard, + const DistributorBucketSpaceRepo* bucket_space_repo) : _context(std::move(context)), - _read_guard(std::move(read_guard)) + _read_guard(std::move(read_guard)), + _bucket_space_repo(bucket_space_repo) {} OperationRoutingSnapshot::~OperationRoutingSnapshot() = default; OperationRoutingSnapshot OperationRoutingSnapshot::make_not_routable_in_state( - std::shared_ptr<ClusterDistributionContext> context) + std::shared_ptr<const BucketSpaceDistributionContext> context) { - return OperationRoutingSnapshot(std::move(context), std::shared_ptr<BucketDatabase::ReadGuard>()); + return OperationRoutingSnapshot(std::move(context), std::shared_ptr<BucketDatabase::ReadGuard>(), nullptr); } OperationRoutingSnapshot OperationRoutingSnapshot::make_routable_with_guard( - std::shared_ptr<ClusterDistributionContext> context, - std::shared_ptr<BucketDatabase::ReadGuard> read_guard) + std::shared_ptr<const BucketSpaceDistributionContext> context, + std::shared_ptr<BucketDatabase::ReadGuard> read_guard, + const DistributorBucketSpaceRepo& bucket_space_repo) { - return OperationRoutingSnapshot(std::move(context), std::move(read_guard)); + return OperationRoutingSnapshot(std::move(context), std::move(read_guard), &bucket_space_repo); } } diff --git a/storage/src/vespa/storage/distributor/operation_routing_snapshot.h b/storage/src/vespa/storage/distributor/operation_routing_snapshot.h index 0bd8c5d928e..16ec8fef1c7 100644 --- a/storage/src/vespa/storage/distributor/operation_routing_snapshot.h +++ b/storage/src/vespa/storage/distributor/operation_routing_snapshot.h @@ -6,21 +6,37 @@ namespace storage::distributor { -class ClusterDistributionContext; +class BucketSpaceDistributionContext; +class DistributorBucketSpaceRepo; /** - * TODO desc + * An "operation routing snapshot" is intended to provide a stable means of computing + * bucket routing targets and performing database lookups for a particular bucket space + * in a potentially multi-threaded setting. When using multiple threads, both the current + * cluster/distribution state as well as the underlying bucket database may change + * independent of each other when observed from any other thread than the main distributor + * thread. Additionally, the bucket management system may operate with separate read-only + * databases during state transitions, complicating things further. + * + * By using an OperationRoutingSnapshot, a caller gets a consistent view of the world + * that stays valid throughout the operation's life time. + * + * Note that holding the DB read guard should be done for as short a time as possible to + * avoid elevated memory usage caused by data stores not being able to free on-hold items. */ class OperationRoutingSnapshot { - std::shared_ptr<ClusterDistributionContext> _context; + std::shared_ptr<const BucketSpaceDistributionContext> _context; std::shared_ptr<BucketDatabase::ReadGuard> _read_guard; + const DistributorBucketSpaceRepo* _bucket_space_repo; public: - OperationRoutingSnapshot(std::shared_ptr<ClusterDistributionContext> context, - std::shared_ptr<BucketDatabase::ReadGuard> read_guard); + OperationRoutingSnapshot(std::shared_ptr<const BucketSpaceDistributionContext> context, + std::shared_ptr<BucketDatabase::ReadGuard> read_guard, + const DistributorBucketSpaceRepo* bucket_space_repo); - static OperationRoutingSnapshot make_not_routable_in_state(std::shared_ptr<ClusterDistributionContext> context); - static OperationRoutingSnapshot make_routable_with_guard(std::shared_ptr<ClusterDistributionContext> context, - std::shared_ptr<BucketDatabase::ReadGuard> read_guard); + static OperationRoutingSnapshot make_not_routable_in_state(std::shared_ptr<const BucketSpaceDistributionContext> context); + static OperationRoutingSnapshot make_routable_with_guard(std::shared_ptr<const BucketSpaceDistributionContext> context, + std::shared_ptr<BucketDatabase::ReadGuard> read_guard, + const DistributorBucketSpaceRepo& bucket_space_repo); OperationRoutingSnapshot(const OperationRoutingSnapshot&) noexcept = default; OperationRoutingSnapshot& operator=(const OperationRoutingSnapshot&) noexcept = default; @@ -29,13 +45,16 @@ public: ~OperationRoutingSnapshot(); - const ClusterDistributionContext& context() const noexcept { return *_context; } + const BucketSpaceDistributionContext& context() const noexcept { return *_context; } std::shared_ptr<BucketDatabase::ReadGuard> steal_read_guard() noexcept { return std::move(_read_guard); } bool is_routable() const noexcept { return (_read_guard.get() != nullptr); } + const DistributorBucketSpaceRepo* bucket_space_repo() const noexcept { + return _bucket_space_repo; + } }; } diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp index 768da935fd2..7ff2e298791 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp @@ -45,7 +45,7 @@ GetOperation::GroupId::operator==(const GroupId& other) const } GetOperation::GetOperation(DistributorComponent& manager, - DistributorBucketSpace &bucketSpace, + const DistributorBucketSpace &bucketSpace, std::shared_ptr<BucketDatabase::ReadGuard> read_guard, std::shared_ptr<api::GetCommand> msg, PersistenceOperationMetricSet& metric) diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h index 8d0fdb0c2bb..fe4dab5e9f2 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h @@ -24,7 +24,7 @@ class GetOperation : public Operation { public: GetOperation(DistributorComponent& manager, - DistributorBucketSpace &bucketSpace, + const DistributorBucketSpace &bucketSpace, std::shared_ptr<BucketDatabase::ReadGuard> read_guard, std::shared_ptr<api::GetCommand> msg, PersistenceOperationMetricSet& metric); @@ -77,7 +77,7 @@ private: std::map<GroupId, GroupVector> _responses; DistributorComponent& _manager; - DistributorBucketSpace &_bucketSpace; + const DistributorBucketSpace &_bucketSpace; std::shared_ptr<api::GetCommand> _msg; |