diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-10-04 14:50:55 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-04 14:50:55 +0200 |
commit | 7bb9233afcf04a82bf8210c910450f0efc5f83f5 (patch) | |
tree | 6479c293aa4bc6ee737ff0a6983a632aed31e7a4 | |
parent | 685704ab391abc6efdb9577c24bdf7ed48333f8e (diff) | |
parent | b2c66836548ffa4aabc380195026fdbd844d40e7 (diff) |
Merge pull request #10861 from vespa-engine/vekterli/add-distribution-state-snapshotting
Add distribution state snapshotting
21 files changed, 605 insertions, 44 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index 321b0cc3bba..8409bd60986 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -2,11 +2,10 @@ #include <vespa/storageapi/message/persistence.h> #include <vespa/storage/distributor/bucketdbupdater.h> +#include <vespa/storage/distributor/bucket_space_distribution_context.h> #include <vespa/storage/distributor/distributormetricsset.h> #include <vespa/storage/distributor/pending_bucket_space_db_transition.h> #include <vespa/storage/distributor/outdated_nodes_map.h> -#include <vespa/vespalib/io/fileutil.h> -#include <vespa/storageframework/defaultimplementation/clock/realclock.h> #include <vespa/storage/storageutil/distributorstatecache.h> #include <tests/distributor/distributortestutil.h> #include <vespa/document/test/make_document_bucket.h> @@ -124,7 +123,7 @@ public: createLinks(); _bucketSpaces = getBucketSpaces(); // Disable deferred activation by default (at least for now) to avoid breaking the entire world. - getConfig().setAllowStaleReadsDuringClusterStateTransitions(false); + getBucketDBUpdater().set_stale_reads_enabled(false); }; void TearDown() override { @@ -2415,7 +2414,7 @@ void for_each_bucket(const DistributorBucketSpaceRepo& repo, Func&& f) { } TEST_F(BucketDBUpdaterTest, non_owned_buckets_moved_to_read_only_db_on_ownership_change) { - getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); + getBucketDBUpdater().set_stale_reads_enabled(true); lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity @@ -2468,7 +2467,7 @@ TEST_F(BucketDBUpdaterTest, buckets_no_longer_available_are_not_moved_to_read_on } TEST_F(BucketDBUpdaterTest, non_owned_buckets_purged_when_read_only_support_is_config_disabled) { - getConfig().setAllowStaleReadsDuringClusterStateTransitions(false); + getBucketDBUpdater().set_stale_reads_enabled(false); lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity @@ -2497,7 +2496,6 @@ void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition( uint32_t pending_buckets, uint32_t pending_expected_msgs) { - getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); lib::ClusterState initial_state(initial_state_str); setSystemState(initial_state); ASSERT_EQ(messageCount(initial_expected_msgs), _sender.commands().size()); @@ -2514,6 +2512,7 @@ void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition( } TEST_F(BucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until_activation_received) { + getBucketDBUpdater().set_stale_reads_enabled(true); constexpr uint32_t n_buckets = 10; ASSERT_NO_FATAL_FAILURE( trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, @@ -2533,6 +2532,7 @@ TEST_F(BucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until } TEST_F(BucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_activated) { + getBucketDBUpdater().set_stale_reads_enabled(true); constexpr uint32_t n_buckets = 10; ASSERT_NO_FATAL_FAILURE( trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, @@ -2544,6 +2544,7 @@ TEST_F(BucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_activated } TEST_F(BucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_marked_down) { + getBucketDBUpdater().set_stale_reads_enabled(true); constexpr uint32_t n_buckets = 10; ASSERT_NO_FATAL_FAILURE( trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, @@ -2557,6 +2558,7 @@ TEST_F(BucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_marked_d } TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_with_mismatching_version_returns_actual_version) { + getBucketDBUpdater().set_stale_reads_enabled(true); constexpr uint32_t n_buckets = 10; ASSERT_NO_FATAL_FAILURE( trigger_completed_but_not_yet_activated_transition("version:4 distributor:1 storage:4", n_buckets, 4, @@ -2570,6 +2572,7 @@ TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_with_mismatching_vers } TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_without_pending_transition_passes_message_through) { + getBucketDBUpdater().set_stale_reads_enabled(true); constexpr uint32_t n_buckets = 10; ASSERT_NO_FATAL_FAILURE( trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, @@ -2727,4 +2730,136 @@ TEST_F(BucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_only_when_s EXPECT_TRUE(state == nullptr); } +struct BucketDBUpdaterSnapshotTest : BucketDBUpdaterTest { + lib::ClusterState empty_state; + std::shared_ptr<lib::ClusterState> initial_baseline; + std::shared_ptr<lib::ClusterState> initial_default; + lib::ClusterStateBundle initial_bundle; + Bucket default_bucket; + Bucket global_bucket; + + BucketDBUpdaterSnapshotTest() + : BucketDBUpdaterTest(), + empty_state(), + initial_baseline(std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d")), + initial_default(std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:m")), + initial_bundle(*initial_baseline, {{FixedBucketSpaces::default_space(), initial_default}, + {FixedBucketSpaces::global_space(), initial_baseline}}), + default_bucket(FixedBucketSpaces::default_space(), BucketId(16, 1234)), + global_bucket(FixedBucketSpaces::global_space(), BucketId(16, 1234)) + { + } + ~BucketDBUpdaterSnapshotTest() override; + + void SetUp() override { + BucketDBUpdaterTest::SetUp(); + getBucketDBUpdater().set_stale_reads_enabled(true); + }; + + // Assumes that the distributor owns all buckets, so it may choose any arbitrary bucket in the bucket space + uint32_t buckets_in_snapshot_matching_current_db(DistributorBucketSpaceRepo& repo, BucketSpace bucket_space) { + auto rs = getBucketDBUpdater().read_snapshot_for_bucket(Bucket(bucket_space, BucketId(16, 1234))); + if (!rs.is_routable()) { + return 0; + } + auto guard = rs.steal_read_guard(); + uint32_t found_buckets = 0; + for_each_bucket(repo, [&](const auto& space, const auto& entry) { + if (space == bucket_space) { + std::vector<BucketDatabase::Entry> entries; + guard->find_parents_and_self(entry.getBucketId(), entries); + if (entries.size() == 1) { + ++found_buckets; + } + } + }); + return found_buckets; + } +}; + +BucketDBUpdaterSnapshotTest::~BucketDBUpdaterSnapshotTest() = default; + +TEST_F(BucketDBUpdaterSnapshotTest, default_space_snapshot_prior_to_activated_state_is_non_routable) { + auto rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket); + EXPECT_FALSE(rs.is_routable()); +} + +TEST_F(BucketDBUpdaterSnapshotTest, global_space_snapshot_prior_to_activated_state_is_non_routable) { + auto rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket); + EXPECT_FALSE(rs.is_routable()); +} + +TEST_F(BucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_states) { + set_cluster_state_bundle(initial_bundle); + // State currently pending, empty initial state is active + + auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket); + EXPECT_EQ(def_rs.context().active_cluster_state()->toString(), empty_state.toString()); + EXPECT_EQ(def_rs.context().default_active_cluster_state()->toString(), empty_state.toString()); + ASSERT_TRUE(def_rs.context().has_pending_state_transition()); + EXPECT_EQ(def_rs.context().pending_cluster_state()->toString(), initial_default->toString()); + + auto global_rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket); + EXPECT_EQ(global_rs.context().active_cluster_state()->toString(), empty_state.toString()); + EXPECT_EQ(global_rs.context().default_active_cluster_state()->toString(), empty_state.toString()); + ASSERT_TRUE(global_rs.context().has_pending_state_transition()); + EXPECT_EQ(global_rs.context().pending_cluster_state()->toString(), initial_baseline->toString()); + + ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(*initial_baseline, messageCount(1), 0)); + // State now activated, no pending + + def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket); + EXPECT_EQ(def_rs.context().active_cluster_state()->toString(), initial_default->toString()); + EXPECT_EQ(def_rs.context().default_active_cluster_state()->toString(), initial_default->toString()); + EXPECT_FALSE(def_rs.context().has_pending_state_transition()); + + global_rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket); + EXPECT_EQ(global_rs.context().active_cluster_state()->toString(), initial_baseline->toString()); + EXPECT_EQ(global_rs.context().default_active_cluster_state()->toString(), initial_default->toString()); + EXPECT_FALSE(global_rs.context().has_pending_state_transition()); +} + +TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_no_pending_state_transition_returns_mutable_db_guard) { + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, + "version:2 distributor:1 storage:4", n_buckets, 4)); + EXPECT_FALSE(activate_cluster_state_version(2)); + EXPECT_EQ(buckets_in_snapshot_matching_current_db(mutable_repo(), FixedBucketSpaces::default_space()), + n_buckets); + EXPECT_EQ(buckets_in_snapshot_matching_current_db(mutable_repo(), FixedBucketSpaces::global_space()), + n_buckets); +} + +TEST_F(BucketDBUpdaterSnapshotTest, snapshot_returns_unroutable_for_non_owned_bucket_in_current_state) { + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, + "version:2 distributor:2 .0.s:d storage:4", 0, 0)); + EXPECT_FALSE(activate_cluster_state_version(2)); + // We're down in state 2 and therefore do not own any buckets + auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket); + EXPECT_FALSE(def_rs.is_routable()); +} + +TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_pending_state_returns_read_only_guard_for_bucket_only_owned_in_current_state) { + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, + "version:2 distributor:2 .0.s:d storage:4", 0, 0)); + EXPECT_EQ(buckets_in_snapshot_matching_current_db(read_only_repo(), FixedBucketSpaces::default_space()), + n_buckets); + EXPECT_EQ(buckets_in_snapshot_matching_current_db(read_only_repo(), FixedBucketSpaces::global_space()), + n_buckets); +} + +TEST_F(BucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabled_and_bucket_not_owned_in_pending_state) { + getBucketDBUpdater().set_stale_reads_enabled(false); + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, + "version:2 distributor:2 .0.s:d storage:4", 0, 0)); + auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket); + EXPECT_FALSE(def_rs.is_routable()); +} + } diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp index 4a9ef147741..15820b64ff9 100644 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ b/storage/src/tests/distributor/distributortestutil.cpp @@ -417,7 +417,8 @@ DistributorTestUtil::getBucketSpaces() const void DistributorTestUtil::enableDistributorClusterState(vespalib::stringref state) { - _distributor->enableClusterStateBundle(lib::ClusterStateBundle(lib::ClusterState(state))); + getBucketDBUpdater().simulate_cluster_state_bundle_activation( + lib::ClusterStateBundle(lib::ClusterState(state))); } } diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp index 600e56faf31..84f7d34d069 100644 --- a/storage/src/tests/distributor/externaloperationhandlertest.cpp +++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp @@ -505,6 +505,7 @@ document::BucketId ExternalOperationHandlerTest::set_up_pending_cluster_state_tr std::string current = "version:123 distributor:2 storage:2"; std::string pending = "version:321 distributor:3 storage:3"; setupDistributor(1, 3, current); + getBucketDBUpdater().set_stale_reads_enabled(read_only_enabled); getConfig().setAllowStaleReadsDuringClusterStateTransitions(read_only_enabled); // Trigger pending cluster state diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp index 7c308e152db..99d7c12551d 100644 --- a/storage/src/tests/distributor/getoperationtest.cpp +++ b/storage/src/tests/distributor/getoperationtest.cpp @@ -3,6 +3,8 @@ #include <vespa/config/helper/configgetter.h> #include <vespa/document/config/config-documenttypes.h> #include <vespa/document/repo/documenttyperepo.h> +#include <vespa/storage/bucketdb/bucketdatabase.h> +#include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/storage/distributor/externaloperationhandler.h> #include <vespa/storage/distributor/distributor.h> #include <vespa/storage/distributor/distributormetricsset.h> @@ -31,7 +33,7 @@ struct GetOperationTest : Test, DistributorTestUtil { std::unique_ptr<Operation> op; GetOperationTest(); - ~GetOperationTest(); + ~GetOperationTest() override; void SetUp() override { _repo.reset( @@ -53,6 +55,7 @@ struct GetOperationTest : Test, DistributorTestUtil { auto msg = std::make_shared<api::GetCommand>(makeDocumentBucket(document::BucketId(0)), docId, "[all]"); op = std::make_unique<GetOperation>( getExternalOperationHandler(), getDistributorBucketSpace(), + getDistributorBucketSpace().getBucketDatabase().acquire_read_guard(), msg, getDistributor().getMetrics(). gets[msg->getLoadType()]); op->start(_sender, framework::MilliSecTime(0)); } diff --git a/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp b/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp index c3ade3c2877..66d44a655e0 100644 --- a/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp +++ b/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp @@ -148,7 +148,9 @@ Entry BTreeBucketDatabase::entry_from_iterator(const BTree::ConstIterator& iter) if (!iter.valid()) { return Entry::createInvalid(); } - return entry_from_value(iter.getKey(), iter.getData()); + const auto value = iter.getData(); + std::atomic_thread_fence(std::memory_order_acquire); + return entry_from_value(iter.getKey(), value); } ConstEntryRef BTreeBucketDatabase::const_entry_ref_from_iterator(const BTree::ConstIterator& iter) const { @@ -156,6 +158,7 @@ ConstEntryRef BTreeBucketDatabase::const_entry_ref_from_iterator(const BTree::Co return ConstEntryRef::createInvalid(); } const auto value = iter.getData(); + std::atomic_thread_fence(std::memory_order_acquire); const auto replicas_ref = _store.get(entry_ref_from_value(value)); const auto bucket = BucketId(BucketId::keyToBucketId(iter.getKey())); return const_entry_ref_from_replica_array_ref(bucket, gc_timestamp_from_value(value), replicas_ref); diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt index 8c701033e67..944df6e1708 100644 --- a/storage/src/vespa/storage/distributor/CMakeLists.txt +++ b/storage/src/vespa/storage/distributor/CMakeLists.txt @@ -7,6 +7,7 @@ vespa_add_library(storage_distributor bucket_db_prune_elision.cpp bucketgctimecalculator.cpp bucketlistmerger.cpp + bucket_space_distribution_context.cpp clusterinformation.cpp distributor_bucket_space.cpp distributor_bucket_space_repo.cpp @@ -20,6 +21,7 @@ vespa_add_library(storage_distributor idealstatemetricsset.cpp messagetracker.cpp nodeinfo.cpp + operation_routing_snapshot.cpp operation_sequencer.cpp operationowner.cpp operationtargetresolver.cpp diff --git a/storage/src/vespa/storage/distributor/bucket_space_distribution_context.cpp b/storage/src/vespa/storage/distributor/bucket_space_distribution_context.cpp new file mode 100644 index 00000000000..53040bc42b1 --- /dev/null +++ b/storage/src/vespa/storage/distributor/bucket_space_distribution_context.cpp @@ -0,0 +1,81 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "bucket_space_distribution_context.h" + +namespace storage::distributor { + +BucketSpaceDistributionContext::~BucketSpaceDistributionContext() = default; + +BucketSpaceDistributionContext::BucketSpaceDistributionContext( + std::shared_ptr<const lib::ClusterState> active_cluster_state, + std::shared_ptr<const lib::ClusterState> default_active_cluster_state, + std::shared_ptr<const lib::ClusterState> pending_cluster_state, + std::shared_ptr<const lib::Distribution> distribution, + uint16_t this_node_index) + : _active_cluster_state(std::move(active_cluster_state)), + _default_active_cluster_state(std::move(default_active_cluster_state)), + _pending_cluster_state(std::move(pending_cluster_state)), + _distribution(std::move(distribution)), + _this_node_index(this_node_index) +{} + +std::shared_ptr<BucketSpaceDistributionContext> BucketSpaceDistributionContext::make_state_transition( + std::shared_ptr<const lib::ClusterState> active_cluster_state, + std::shared_ptr<const lib::ClusterState> default_active_cluster_state, + std::shared_ptr<const lib::ClusterState> pending_cluster_state, + std::shared_ptr<const lib::Distribution> distribution, + uint16_t this_node_index) +{ + return std::make_shared<BucketSpaceDistributionContext>( + std::move(active_cluster_state), std::move(default_active_cluster_state), + std::move(pending_cluster_state), std::move(distribution), + this_node_index); +} + +std::shared_ptr<BucketSpaceDistributionContext> BucketSpaceDistributionContext::make_stable_state( + std::shared_ptr<const lib::ClusterState> active_cluster_state, + std::shared_ptr<const lib::ClusterState> default_active_cluster_state, + std::shared_ptr<const lib::Distribution> distribution, + uint16_t this_node_index) +{ + return std::make_shared<BucketSpaceDistributionContext>( + std::move(active_cluster_state), std::move(default_active_cluster_state), + std::shared_ptr<const lib::ClusterState>(), + std::move(distribution), this_node_index); +} + +std::shared_ptr<BucketSpaceDistributionContext> +BucketSpaceDistributionContext::make_not_yet_initialized(uint16_t this_node_index) +{ + return std::make_shared<BucketSpaceDistributionContext>( + std::make_shared<const lib::ClusterState>(), + std::make_shared<const lib::ClusterState>(), + std::shared_ptr<const lib::ClusterState>(), + std::make_shared<const lib::Distribution>(), + this_node_index); +} + +bool BucketSpaceDistributionContext::bucket_owned_in_state(const lib::ClusterState& state, + const document::BucketId& id) const +{ + try { + uint16_t owner_idx = _distribution->getIdealDistributorNode(state, id); + return (owner_idx == _this_node_index); + } catch (lib::TooFewBucketBitsInUseException&) { + return false; + } catch (lib::NoDistributorsAvailableException&) { + return false; + } +} + +bool BucketSpaceDistributionContext::bucket_owned_in_active_state(const document::BucketId& id) const { + return bucket_owned_in_state(*_active_cluster_state, id); +} + +bool BucketSpaceDistributionContext::bucket_owned_in_pending_state(const document::BucketId& id) const { + if (_pending_cluster_state) { + return bucket_owned_in_state(*_pending_cluster_state, id); + } + return true; // No pending state, owned by default. +} + +} diff --git a/storage/src/vespa/storage/distributor/bucket_space_distribution_context.h b/storage/src/vespa/storage/distributor/bucket_space_distribution_context.h new file mode 100644 index 00000000000..7a9c0fcae60 --- /dev/null +++ b/storage/src/vespa/storage/distributor/bucket_space_distribution_context.h @@ -0,0 +1,70 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/vdslib/distribution/distribution.h> +#include <vespa/vdslib/state/clusterstate.h> +#include <memory> +#include <cstdint> + +namespace storage::distributor { + +/** + * Represents a consistent snapshot of cluster state and distribution config + * information at a particular point in time. This is sufficient to compute + * bucket ownership and distributions for the bucket space associated with + * the context. + * + * Since this is a snapshot in time, the context is immutable once created. + */ +class BucketSpaceDistributionContext { + std::shared_ptr<const lib::ClusterState> _active_cluster_state; + std::shared_ptr<const lib::ClusterState> _default_active_cluster_state; + std::shared_ptr<const lib::ClusterState> _pending_cluster_state; // May be null if no state is pending + std::shared_ptr<const lib::Distribution> _distribution; // TODO ideally should have a pending distribution as well + uint16_t _this_node_index; +public: + BucketSpaceDistributionContext() = delete; + // Public due to make_shared, prefer factory functions to instantiate instead. + BucketSpaceDistributionContext(std::shared_ptr<const lib::ClusterState> active_cluster_state, + std::shared_ptr<const lib::ClusterState> default_active_cluster_state, + std::shared_ptr<const lib::ClusterState> pending_cluster_state, + std::shared_ptr<const lib::Distribution> distribution, + uint16_t this_node_index); + ~BucketSpaceDistributionContext(); + + static std::shared_ptr<BucketSpaceDistributionContext> make_state_transition( + std::shared_ptr<const lib::ClusterState> active_cluster_state, + std::shared_ptr<const lib::ClusterState> default_active_cluster_state, + std::shared_ptr<const lib::ClusterState> pending_cluster_state, + std::shared_ptr<const lib::Distribution> distribution, + uint16_t this_node_index); + static std::shared_ptr<BucketSpaceDistributionContext> make_stable_state( + std::shared_ptr<const lib::ClusterState> active_cluster_state, + std::shared_ptr<const lib::ClusterState> default_active_cluster_state, + std::shared_ptr<const lib::Distribution> distribution, + uint16_t this_node_index); + static std::shared_ptr<BucketSpaceDistributionContext> make_not_yet_initialized(uint16_t this_node_index); + + const std::shared_ptr<const lib::ClusterState>& active_cluster_state() const noexcept { + return _active_cluster_state; + } + + const std::shared_ptr<const lib::ClusterState>& default_active_cluster_state() const noexcept { + return _default_active_cluster_state; + } + bool has_pending_state_transition() const noexcept { + return (_pending_cluster_state.get() != nullptr); + } + // Returned shared_ptr is nullptr iff has_pending_state_transition() == false. + const std::shared_ptr<const lib::ClusterState>& pending_cluster_state() const noexcept { + return _pending_cluster_state; + } + + bool bucket_owned_in_state(const lib::ClusterState& state, const document::BucketId& id) const; + bool bucket_owned_in_active_state(const document::BucketId& id) const; + bool bucket_owned_in_pending_state(const document::BucketId& id) const; + + uint16_t this_node_index() const noexcept { return _this_node_index; } +}; + +} diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index a901ac28a54..227165a0911 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -2,6 +2,7 @@ #include "bucketdbupdater.h" #include "bucket_db_prune_elision.h" +#include "bucket_space_distribution_context.h" #include "distributor.h" #include "distributor_bucket_space.h" #include "distributormetricsset.h" @@ -30,12 +31,44 @@ BucketDBUpdater::BucketDBUpdater(Distributor& owner, : framework::StatusReporter("bucketdb", "Bucket DB Updater"), _distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Bucket DB Updater"), _sender(sender), - _transitionTimer(_distributorComponent.getClock()) + _transitionTimer(_distributorComponent.getClock()), + _active_distribution_contexts(), + _distribution_context_mutex() { + for (auto& elem : _distributorComponent.getBucketSpaceRepo()) { + _active_distribution_contexts.emplace( + elem.first, + BucketSpaceDistributionContext::make_not_yet_initialized(_distributorComponent.getIndex())); + _explicit_transition_read_guard.emplace(elem.first, std::shared_ptr<BucketDatabase::ReadGuard>()); + } } BucketDBUpdater::~BucketDBUpdater() = default; +OperationRoutingSnapshot BucketDBUpdater::read_snapshot_for_bucket(const document::Bucket& bucket) const { + const auto bucket_space = bucket.getBucketSpace(); + std::lock_guard lock(_distribution_context_mutex); + auto active_state_iter = _active_distribution_contexts.find(bucket_space); + assert(active_state_iter != _active_distribution_contexts.cend()); + auto& state = *active_state_iter->second; + if (!state.bucket_owned_in_active_state(bucket.getBucketId())) { + return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second); + } + const bool bucket_present_in_mutable_db = state.bucket_owned_in_pending_state(bucket.getBucketId()); + if (!bucket_present_in_mutable_db && !stale_reads_enabled()) { + return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second); + } + const auto& space_repo = bucket_present_in_mutable_db + ? _distributorComponent.getBucketSpaceRepo() + : _distributorComponent.getReadOnlyBucketSpaceRepo(); + auto existing_guard_iter = _explicit_transition_read_guard.find(bucket_space); + assert(existing_guard_iter != _explicit_transition_read_guard.cend()); + auto db_guard = existing_guard_iter->second + ? existing_guard_iter-> second + : space_repo.get(bucket_space).getBucketDatabase().acquire_read_guard(); + return OperationRoutingSnapshot::make_routable_with_guard(active_state_iter->second, std::move(db_guard), space_repo); +} + void BucketDBUpdater::flush() { @@ -59,8 +92,7 @@ BucketDBUpdater::print(std::ostream& out, bool verbose, const std::string& inden bool BucketDBUpdater::shouldDeferStateEnabling() const noexcept { - return _distributorComponent.getDistributor().getConfig() - .allowStaleReadsDuringClusterStateTransitions(); + return stale_reads_enabled(); } bool @@ -258,6 +290,61 @@ BucketDBUpdater::replyToActivationWithActualVersion( _distributorComponent.sendUp(reply); // TODO let API accept rvalues } +void BucketDBUpdater::update_read_snapshot_before_db_pruning() { + std::lock_guard lock(_distribution_context_mutex); + for (auto& elem : _distributorComponent.getBucketSpaceRepo()) { + // At this point, we're still operating with a distribution context _without_ a + // pending state, i.e. anyone using the context will expect to find buckets + // in the DB that correspond to how the database looked like prior to pruning + // buckets from the DB. To ensure this is not violated, take a snapshot of the + // _mutable_ DB and expose this. This snapshot only lives until we atomically + // flip to expose a distribution context that includes the new, pending state. + // At that point, the read-only DB is known to contain the buckets that have + // been pruned away, so we can release the mutable DB snapshot safely. + // TODO test for, and handle, state preemption case! + _explicit_transition_read_guard[elem.first] = elem.second->getBucketDatabase().acquire_read_guard(); + } +} + + +void BucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) { + std::lock_guard lock(_distribution_context_mutex); + const auto old_default_state = _distributorComponent.getBucketSpaceRepo().get( + document::FixedBucketSpaces::default_space()).cluster_state_sp(); + for (auto& elem : _distributorComponent.getBucketSpaceRepo()) { + auto new_distribution = elem.second->distribution_sp(); + auto old_cluster_state = elem.second->cluster_state_sp(); + auto new_cluster_state = new_state.getDerivedClusterState(elem.first); + _active_distribution_contexts.insert_or_assign( + elem.first, + BucketSpaceDistributionContext::make_state_transition( + std::move(old_cluster_state), + old_default_state, + std::move(new_cluster_state), + std::move(new_distribution), + _distributorComponent.getIndex())); + // We can now remove the explicit mutable DB snapshot, as the buckets that have been + // pruned away are visible in the read-only DB. + _explicit_transition_read_guard[elem.first] = std::shared_ptr<BucketDatabase::ReadGuard>(); + } +} + +void BucketDBUpdater::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) { + std::lock_guard lock(_distribution_context_mutex); + const auto& default_cluster_state = activated_state.getDerivedClusterState(document::FixedBucketSpaces::default_space()); + for (auto& elem : _distributorComponent.getBucketSpaceRepo()) { + auto new_distribution = elem.second->distribution_sp(); + auto new_cluster_state = activated_state.getDerivedClusterState(elem.first); + _active_distribution_contexts.insert_or_assign( + elem.first, + BucketSpaceDistributionContext::make_stable_state( + std::move(new_cluster_state), + default_cluster_state, + std::move(new_distribution), + _distributorComponent.getIndex())); + } +} + bool BucketDBUpdater::onSetSystemState( const std::shared_ptr<api::SetSystemStateCommand>& cmd) @@ -275,8 +362,10 @@ BucketDBUpdater::onSetSystemState( ensureTransitionTimerStarted(); // Separate timer since _transitionTimer might span multiple pending states. framework::MilliSecTimer process_timer(_distributorComponent.getClock()); - - removeSuperfluousBuckets(cmd->getClusterStateBundle(), false); + update_read_snapshot_before_db_pruning(); + const auto& bundle = cmd->getClusterStateBundle(); + removeSuperfluousBuckets(bundle, false); + update_read_snapshot_after_db_pruning(bundle); replyToPreviousPendingClusterStateIfAny(); ClusterInformation::CSP clusterInfo( @@ -642,6 +731,7 @@ BucketDBUpdater::activatePendingClusterState() _distributorComponent.getDistributor().notifyDistributionChangeEnabled(); } + update_read_snapshot_after_activation(_pendingClusterState->getNewClusterStateBundle()); _pendingClusterState.reset(); _outdatedNodesMap.clear(); sendAllQueuedBucketRechecks(); @@ -665,6 +755,11 @@ BucketDBUpdater::enableCurrentClusterStateBundleInDistributor() _distributorComponent.getDistributor().enableClusterStateBundle(state); } +void BucketDBUpdater::simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state) { + update_read_snapshot_after_activation(activated_state); + _distributorComponent.getDistributor().enableClusterStateBundle(activated_state); +} + void BucketDBUpdater::addCurrentStateToClusterStateHistory() { diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h index e69d328d8bc..86ceab14486 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h @@ -6,6 +6,7 @@ #include "distributorcomponent.h" #include "distributormessagesender.h" #include "pendingclusterstate.h" +#include "operation_routing_snapshot.h" #include "outdated_nodes_map.h" #include <vespa/document/bucket/bucket.h> #include <vespa/storageapi/messageapi/returncode.h> @@ -15,7 +16,9 @@ #include <vespa/storageframework/generic/clock/timer.h> #include <vespa/storageframework/generic/status/statusreporter.h> #include <vespa/storageapi/messageapi/messagehandler.h> +#include <atomic> #include <list> +#include <mutex> namespace vespalib::xml { class XmlOutputStream; @@ -25,6 +28,7 @@ class XmlAttribute; namespace storage::distributor { class Distributor; +class BucketSpaceDistributionContext; class BucketDBUpdater : public framework::StatusReporter, public api::MessageHandler @@ -70,7 +74,14 @@ public: return ((_pendingClusterState.get() != nullptr) && _pendingClusterState->hasBucketOwnershipTransfer()); } + void set_stale_reads_enabled(bool enabled) noexcept { + _stale_reads_enabled.store(enabled, std::memory_order_relaxed); + } + bool stale_reads_enabled() const noexcept { + return _stale_reads_enabled.load(std::memory_order_relaxed); + } + OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const; private: DistributorComponent _distributorComponent; class MergeReplyGuard { @@ -129,6 +140,12 @@ private: } }; + friend class DistributorTestUtil; + // Only to be used by tests that want to ensure both the BucketDBUpdater _and_ the Distributor + // components agree on the currently active cluster state bundle. + // Transitively invokes Distributor::enableClusterStateBundle + void simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state); + bool shouldDeferStateEnabling() const noexcept; bool hasPendingClusterState() const; bool pendingClusterStateAccepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl); @@ -166,8 +183,11 @@ private: void updateState(const lib::ClusterState& oldState, const lib::ClusterState& newState); + void update_read_snapshot_before_db_pruning(); void removeSuperfluousBuckets(const lib::ClusterStateBundle& newState, bool is_distribution_config_change); + void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state); + void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state); void replyToPreviousPendingClusterStateIfAny(); void replyToActivationWithActualVersion( @@ -182,9 +202,6 @@ private: void maybe_inject_simulated_db_pruning_delay(); void maybe_inject_simulated_db_merging_delay(); - friend class BucketDBUpdater_Test; - friend class MergeOperation_Test; - /** Removes all copies of buckets that are on nodes that are down. */ @@ -235,6 +252,16 @@ private: std::set<EnqueuedBucketRecheck> _enqueuedRechecks; OutdatedNodesMap _outdatedNodesMap; framework::MilliSecTimer _transitionTimer; + std::atomic<bool> _stale_reads_enabled; + using DistributionContexts = std::unordered_map<document::BucketSpace, + std::shared_ptr<BucketSpaceDistributionContext>, + document::BucketSpace::hash>; + DistributionContexts _active_distribution_contexts; + using DbGuards = std::unordered_map<document::BucketSpace, + std::shared_ptr<BucketDatabase::ReadGuard>, + document::BucketSpace::hash>; + DbGuards _explicit_transition_read_guard; + mutable std::mutex _distribution_context_mutex; }; } diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 69b64ac8dc1..4adbdd32669 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -77,7 +77,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _distributorStatusDelegate(compReg, *this, *this), _bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater), _idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, manageActiveBucketCopies), - _externalOperationHandler(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, _idealStateManager, compReg), + _externalOperationHandler(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, + _idealStateManager, compReg, use_btree_database), _threadPool(threadPool), _initializingIsUp(true), _doneInitializeHandler(doneInitHandler), @@ -322,9 +323,7 @@ bool Distributor::handleMessage(const std::shared_ptr<api::StorageMessage>& msg) { if (msg->getType().isReply()) { - std::shared_ptr<api::StorageReply> reply = - std::dynamic_pointer_cast<api::StorageReply>(msg); - + auto reply = std::dynamic_pointer_cast<api::StorageReply>(msg); if (handleReply(reply)) { return true; } @@ -400,6 +399,10 @@ Distributor::enableClusterStateBundle(const lib::ClusterStateBundle& state) } } +OperationRoutingSnapshot Distributor::read_snapshot_for_bucket(const document::Bucket& bucket) const { + return _bucketDBUpdater.read_snapshot_for_bucket(bucket); +} + void Distributor::notifyDistributionChangeEnabled() { @@ -834,6 +837,7 @@ Distributor::enableNextConfig() _bucketDBMetricUpdater.setMinimumReplicaCountingMode(getConfig().getMinimumReplicaCountingMode()); _ownershipSafeTimeCalc->setMaxClusterClockSkew(getConfig().getMaxClusterClockSkew()); _pendingMessageTracker.setNodeBusyDuration(getConfig().getInhibitMergesOnBusyNodeDuration()); + _bucketDBUpdater.set_stale_reads_enabled(getConfig().allowStaleReadsDuringClusterStateTransitions()); } void diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index 638704adf24..48d9145eec7 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -170,6 +170,8 @@ public: return *_readOnlyBucketSpaceRepo; } + OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const override; + class Status; class MetricUpdateHook : public framework::MetricUpdateHook { diff --git a/storage/src/vespa/storage/distributor/distributor_bucket_space.h b/storage/src/vespa/storage/distributor/distributor_bucket_space.h index 26a0ee9098c..8fbb99dfe89 100644 --- a/storage/src/vespa/storage/distributor/distributor_bucket_space.h +++ b/storage/src/vespa/storage/distributor/distributor_bucket_space.h @@ -48,6 +48,9 @@ public: void setClusterState(std::shared_ptr<const lib::ClusterState> clusterState); const lib::ClusterState &getClusterState() const noexcept { return *_clusterState; } + const std::shared_ptr<const lib::ClusterState>& cluster_state_sp() const noexcept { + return _clusterState; + } void setDistribution(std::shared_ptr<const lib::Distribution> distribution); @@ -55,6 +58,9 @@ public: const lib::Distribution& getDistribution() const noexcept { return *_distribution; } + const std::shared_ptr<const lib::Distribution>& distribution_sp() const noexcept { + return _distribution; + } }; diff --git a/storage/src/vespa/storage/distributor/distributorinterface.h b/storage/src/vespa/storage/distributor/distributorinterface.h index d9f037bb8f1..aba58e112dc 100644 --- a/storage/src/vespa/storage/distributor/distributorinterface.h +++ b/storage/src/vespa/storage/distributor/distributorinterface.h @@ -4,6 +4,7 @@ #include "bucketgctimecalculator.h" #include "distributormessagesender.h" #include "bucketownership.h" +#include "operation_routing_snapshot.h" #include <vespa/storage/bucketdb/bucketdatabase.h> #include <vespa/document/bucket/bucket.h> @@ -49,6 +50,8 @@ public: */ virtual const lib::ClusterStateBundle& getClusterStateBundle() const = 0; + virtual OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const = 0; + /** * Returns true if the node is currently initializing. */ diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp index 1b88f02cac6..221c516a56e 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp @@ -1,5 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "bucket_space_distribution_context.h" #include "externaloperationhandler.h" #include "distributor.h" #include <vespa/document/base/documentid.h> @@ -12,7 +13,6 @@ #include <vespa/storage/distributor/operations/external/statbucketlistoperation.h> #include <vespa/storage/distributor/operations/external/removelocationoperation.h> #include <vespa/storage/distributor/operations/external/visitoroperation.h> -#include <vespa/document/util/stringutil.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/removelocation.h> #include <vespa/storageapi/message/stat.h> @@ -30,11 +30,16 @@ ExternalOperationHandler::ExternalOperationHandler(Distributor& owner, DistributorBucketSpaceRepo& bucketSpaceRepo, DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, const MaintenanceOperationGenerator& gen, - DistributorComponentRegister& compReg) + DistributorComponentRegister& compReg, + bool enable_concurrent_gets) : DistributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "External operation handler"), _operationGenerator(gen), - _rejectFeedBeforeTimeReached() // At epoch -{ } + _rejectFeedBeforeTimeReached(), // At epoch + _non_main_thread_ops_mutex(), + _non_main_thread_ops_owner(owner, getClock()), + _enable_concurrent_gets(enable_concurrent_gets) +{ +} ExternalOperationHandler::~ExternalOperationHandler() = default; @@ -78,24 +83,32 @@ void ExternalOperationHandler::bounce_with_result(api::StorageCommand& cmd, cons sendUp(std::shared_ptr<api::StorageMessage>(reply.release())); } -void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageCommand& cmd) { +void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageCommand& cmd, + const lib::ClusterState& cluster_state) +{ // Distributor ownership is equal across bucket spaces, so always send back default space state. // This also helps client avoid getting confused by possibly observing different actual // (derived) state strings for global/non-global document types for the same state version. // Similarly, if we've yet to activate any version at all we send back BUSY instead // of a suspiciously empty WrongDistributionReply. // TOOD consider NOT_READY instead of BUSY once we're sure this won't cause any other issues. - const auto& cluster_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState(); if (cluster_state.getVersion() != 0) { auto cluster_state_str = cluster_state.toString(); - LOG(debug, "Got message with wrong distribution, sending back state '%s'", cluster_state_str.c_str()); + LOG(debug, "Got %s with wrong distribution, sending back state '%s'", + cmd.toString().c_str(), cluster_state_str.c_str()); bounce_with_result(cmd, api::ReturnCode(api::ReturnCode::WRONG_DISTRIBUTION, cluster_state_str)); } else { // Only valid for empty startup state - LOG(debug, "Got message with wrong distribution, but no cluster state activated yet. Sending back BUSY"); + LOG(debug, "Got %s with wrong distribution, but no cluster state activated yet. Sending back BUSY", + cmd.toString().c_str()); bounce_with_result(cmd, api::ReturnCode(api::ReturnCode::BUSY, "No cluster state activated yet")); } } +void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageCommand& cmd) { + const auto& cluster_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState(); + bounce_with_wrong_distribution(cmd, cluster_state); +} + void ExternalOperationHandler::bounce_with_busy_during_state_transition( api::StorageCommand& cmd, const lib::ClusterState& current_state, @@ -283,10 +296,23 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Get) { document::Bucket bucket(cmd->getBucket().getBucketSpace(), getBucketId(cmd->getDocumentId())); auto& metrics = getMetrics().gets[cmd->getLoadType()]; - bounce_or_invoke_read_only_op(*cmd, bucket, metrics, [&](auto& bucket_space_repo) { - _op = std::make_shared<GetOperation>(*this, bucket_space_repo.get(cmd->getBucket().getBucketSpace()), - cmd, metrics); - }); + auto snapshot = getDistributor().read_snapshot_for_bucket(bucket); + if (!snapshot.is_routable()) { + const auto& ctx = snapshot.context(); + if (ctx.has_pending_state_transition()) { + bounce_with_busy_during_state_transition(*cmd, *ctx.default_active_cluster_state(), + *ctx.pending_cluster_state()); + } else { + bounce_with_wrong_distribution(*cmd, *snapshot.context().default_active_cluster_state()); + metrics.failures.wrongdistributor.inc(); // TODO thread safety for updates + } + return true; + } + // The snapshot is aware of whether stale reads are enabled, so we don't have to check that here. + const auto* space_repo = snapshot.bucket_space_repo(); + assert(space_repo != nullptr); + _op = std::make_shared<GetOperation>(*this, space_repo->get(bucket.getBucketSpace()), + snapshot.steal_read_guard(), cmd, metrics); return true; } diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h index 655feb5d00c..9db078af198 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.h +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h @@ -8,6 +8,7 @@ #include <vespa/storage/distributor/distributorcomponent.h> #include <vespa/storageapi/messageapi/messagehandler.h> #include <chrono> +#include <mutex> namespace storage { @@ -39,7 +40,8 @@ public: DistributorBucketSpaceRepo& bucketSpaceRepo, DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, const MaintenanceOperationGenerator&, - DistributorComponentRegister& compReg); + DistributorComponentRegister& compReg, + bool enable_concurrent_gets); ~ExternalOperationHandler() override; @@ -55,6 +57,9 @@ private: OperationSequencer _mutationSequencer; Operation::SP _op; TimePoint _rejectFeedBeforeTimeReached; + mutable std::mutex _non_main_thread_ops_mutex; + OperationOwner _non_main_thread_ops_owner; + bool _enable_concurrent_gets; template <typename Func> void bounce_or_invoke_read_only_op(api::StorageCommand& cmd, @@ -62,6 +67,8 @@ private: PersistenceOperationMetricSet& metrics, Func f); + void bounce_with_wrong_distribution(api::StorageCommand& cmd, const lib::ClusterState& cluster_state); + // Bounce with the current _default_ space cluster state. void bounce_with_wrong_distribution(api::StorageCommand& cmd); void bounce_with_busy_during_state_transition(api::StorageCommand& cmd, const lib::ClusterState& current_state, diff --git a/storage/src/vespa/storage/distributor/operation_routing_snapshot.cpp b/storage/src/vespa/storage/distributor/operation_routing_snapshot.cpp new file mode 100644 index 00000000000..ec97e51b66d --- /dev/null +++ b/storage/src/vespa/storage/distributor/operation_routing_snapshot.cpp @@ -0,0 +1,30 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "operation_routing_snapshot.h" + +namespace storage::distributor { + +OperationRoutingSnapshot::OperationRoutingSnapshot(std::shared_ptr<const BucketSpaceDistributionContext> context, + std::shared_ptr<BucketDatabase::ReadGuard> read_guard, + const DistributorBucketSpaceRepo* bucket_space_repo) + : _context(std::move(context)), + _read_guard(std::move(read_guard)), + _bucket_space_repo(bucket_space_repo) +{} + +OperationRoutingSnapshot::~OperationRoutingSnapshot() = default; + +OperationRoutingSnapshot OperationRoutingSnapshot::make_not_routable_in_state( + std::shared_ptr<const BucketSpaceDistributionContext> context) +{ + return OperationRoutingSnapshot(std::move(context), std::shared_ptr<BucketDatabase::ReadGuard>(), nullptr); +} + +OperationRoutingSnapshot OperationRoutingSnapshot::make_routable_with_guard( + std::shared_ptr<const BucketSpaceDistributionContext> context, + std::shared_ptr<BucketDatabase::ReadGuard> read_guard, + const DistributorBucketSpaceRepo& bucket_space_repo) +{ + return OperationRoutingSnapshot(std::move(context), std::move(read_guard), &bucket_space_repo); +} + +} diff --git a/storage/src/vespa/storage/distributor/operation_routing_snapshot.h b/storage/src/vespa/storage/distributor/operation_routing_snapshot.h new file mode 100644 index 00000000000..16ec8fef1c7 --- /dev/null +++ b/storage/src/vespa/storage/distributor/operation_routing_snapshot.h @@ -0,0 +1,60 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/storage/bucketdb/bucketdatabase.h> +#include <memory> + +namespace storage::distributor { + +class BucketSpaceDistributionContext; +class DistributorBucketSpaceRepo; + +/** + * An "operation routing snapshot" is intended to provide a stable means of computing + * bucket routing targets and performing database lookups for a particular bucket space + * in a potentially multi-threaded setting. When using multiple threads, both the current + * cluster/distribution state as well as the underlying bucket database may change + * independent of each other when observed from any other thread than the main distributor + * thread. Additionally, the bucket management system may operate with separate read-only + * databases during state transitions, complicating things further. + * + * By using an OperationRoutingSnapshot, a caller gets a consistent view of the world + * that stays valid throughout the operation's life time. + * + * Note that holding the DB read guard should be done for as short a time as possible to + * avoid elevated memory usage caused by data stores not being able to free on-hold items. + */ +class OperationRoutingSnapshot { + std::shared_ptr<const BucketSpaceDistributionContext> _context; + std::shared_ptr<BucketDatabase::ReadGuard> _read_guard; + const DistributorBucketSpaceRepo* _bucket_space_repo; +public: + OperationRoutingSnapshot(std::shared_ptr<const BucketSpaceDistributionContext> context, + std::shared_ptr<BucketDatabase::ReadGuard> read_guard, + const DistributorBucketSpaceRepo* bucket_space_repo); + + static OperationRoutingSnapshot make_not_routable_in_state(std::shared_ptr<const BucketSpaceDistributionContext> context); + static OperationRoutingSnapshot make_routable_with_guard(std::shared_ptr<const BucketSpaceDistributionContext> context, + std::shared_ptr<BucketDatabase::ReadGuard> read_guard, + const DistributorBucketSpaceRepo& bucket_space_repo); + + OperationRoutingSnapshot(const OperationRoutingSnapshot&) noexcept = default; + OperationRoutingSnapshot& operator=(const OperationRoutingSnapshot&) noexcept = default; + OperationRoutingSnapshot(OperationRoutingSnapshot&&) noexcept = default; + OperationRoutingSnapshot& operator=(OperationRoutingSnapshot&&) noexcept = default; + + ~OperationRoutingSnapshot(); + + const BucketSpaceDistributionContext& context() const noexcept { return *_context; } + std::shared_ptr<BucketDatabase::ReadGuard> steal_read_guard() noexcept { + return std::move(_read_guard); + } + bool is_routable() const noexcept { + return (_read_guard.get() != nullptr); + } + const DistributorBucketSpaceRepo* bucket_space_repo() const noexcept { + return _bucket_space_repo; + } +}; + +} diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp index 6cfc688db0e..7ff2e298791 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp @@ -45,7 +45,8 @@ GetOperation::GroupId::operator==(const GroupId& other) const } GetOperation::GetOperation(DistributorComponent& manager, - DistributorBucketSpace &bucketSpace, + const DistributorBucketSpace &bucketSpace, + std::shared_ptr<BucketDatabase::ReadGuard> read_guard, std::shared_ptr<api::GetCommand> msg, PersistenceOperationMetricSet& metric) : Operation(), @@ -58,7 +59,7 @@ GetOperation::GetOperation(DistributorComponent& manager, _metric(metric), _operationTimer(manager.getClock()) { - assignTargetNodeGroups(); + assignTargetNodeGroups(*read_guard); } void @@ -213,13 +214,13 @@ GetOperation::sendReply(DistributorMessageSender& sender) } void -GetOperation::assignTargetNodeGroups() +GetOperation::assignTargetNodeGroups(const BucketDatabase::ReadGuard& read_guard) { document::BucketIdFactory bucketIdFactory; document::BucketId bid = bucketIdFactory.getBucketId(_msg->getDocumentId()); std::vector<BucketDatabase::Entry> entries; - _bucketSpace.getBucketDatabase().acquire_read_guard()->find_parents_and_self(bid, entries); + read_guard.find_parents_and_self(bid, entries); for (uint32_t j = 0; j < entries.size(); ++j) { const BucketDatabase::Entry& e = entries[j]; diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h index 3936f13077e..fe4dab5e9f2 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h @@ -3,7 +3,7 @@ #include <vespa/storageapi/defs.h> #include <vespa/storage/distributor/operations/operation.h> -#include <vespa/storage/bucketdb/bucketcopy.h> +#include <vespa/storage/bucketdb/bucketdatabase.h> #include <vespa/storageapi/messageapi/storagemessage.h> #include <vespa/storageframework/generic/clock/timer.h> @@ -23,8 +23,11 @@ class DistributorBucketSpace; class GetOperation : public Operation { public: - GetOperation(DistributorComponent& manager, DistributorBucketSpace &bucketSpace, - std::shared_ptr<api::GetCommand> msg, PersistenceOperationMetricSet& metric); + GetOperation(DistributorComponent& manager, + const DistributorBucketSpace &bucketSpace, + std::shared_ptr<BucketDatabase::ReadGuard> read_guard, + std::shared_ptr<api::GetCommand> msg, + PersistenceOperationMetricSet& metric); void onClose(DistributorMessageSender& sender) override; void onStart(DistributorMessageSender& sender) override; @@ -74,7 +77,7 @@ private: std::map<GroupId, GroupVector> _responses; DistributorComponent& _manager; - DistributorBucketSpace &_bucketSpace; + const DistributorBucketSpace &_bucketSpace; std::shared_ptr<api::GetCommand> _msg; @@ -89,7 +92,7 @@ private: void sendReply(DistributorMessageSender& sender); bool sendForChecksum(DistributorMessageSender& sender, const document::BucketId& id, GroupVector& res); - void assignTargetNodeGroups(); + void assignTargetNodeGroups(const BucketDatabase::ReadGuard& read_guard); bool copyIsOnLocalNode(const BucketCopy&) const; /** * Returns the vector index of the target to send to, or -1 if none diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp index b7ebafc114c..b3326a43be2 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp @@ -178,7 +178,8 @@ TwoPhaseUpdateOperation::startSafePathUpdate(DistributorMessageSender& sender) document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), document::BucketId(0)); auto get = std::make_shared<api::GetCommand>(bucket, _updateCmd->getDocumentId(),"[all]"); copyMessageSettings(*_updateCmd, *get); - auto getOperation = std::make_shared<GetOperation>(_manager, _bucketSpace, get, _getMetric); + auto getOperation = std::make_shared<GetOperation>( + _manager, _bucketSpace, _bucketSpace.getBucketDatabase().acquire_read_guard(), get, _getMetric); GetOperation & op = *getOperation; IntermediateMessageSender intermediate(_sentMessageMap, std::move(getOperation), sender); op.start(intermediate, _manager.getClock().getTimeInMillis()); |